首页 > 学院 > 开发设计 > 正文

ThreadPoolExecutor机制探索-我们到底能走多远系列(41)

2019-11-14 23:03:25
字体:
来源:转载
供稿:网友
ThreadPoolExecutor机制探索-我们到底能走多远系列(41)我们到底能走多远系列(41)

扯淡:

  这一年过的不匆忙,也颇多感受,成长的路上难免弯路,这个世界上没人关心你有没有变强,只有自己时刻提醒自己,不要忘记最初出发的原因。

  其实这个世界上比我们聪明的人无数,很多人都比我们努力,当我门奇怪为什么他们可以如此轻松的时候,是不会问他们付出过什么。怨天尤人是无用的,使自己变好,哪怕是变好一点点,我觉得生活着就是有意义的。

  未来,太远。唯有不停的积累,不要着急,抓得住的才能叫机会。

  羊年,一定要不做被动的人。大家加油!

目录留白:

  * ArrayBlockingQueue

主题:

直接进ThreadPoolExecutor源码看一看:(版本是1.7.0)首先,这个线程池的状态是怎么样的呢?我们看下面的字段定义,ctl作为ThreadPoolExecutor的核心状态控制字段,包含来两个信息: 1,工作线程总数 workerCount 2,线程池状态RUNNING SHUTDOWNSTOPTIDYINGTERMINATED下面代码解释一下: COUNT_BITS 是32减去3 就是29,下面的线程池状态就是-1 到 3 分别向左移动29位。 如此,int的右侧29位,代表着线程数量,总数可以达到2的29次,29位后的3位代表线程池的状态这样,线程池增加一个线程,只需吧ctl加1即可,而我们也发现实际这个线程池的最高线程数量是2的29次减1。并不是先前我们现象的2的32次减1。这个作者在注释中也提到了,说如果后续需要增大这个值,可以吧ctl定义成AtomicLong。这个关键的控制字段的理解,对阅读源码很有帮助。
    PRivate final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));    private static final int COUNT_BITS = Integer.SIZE - 3;    private static final int CAPACITY  = (1 << COUNT_BITS) - 1;    // runState is stored in the high-order bits    private static final int RUNNING    = -1 << COUNT_BITS;// 111 00000000000000000000000000000    private static final int SHUTDOWN  =  0 << COUNT_BITS;// 000 00000000000000000000000000000    private static final int STOP      =  1 << COUNT_BITS;// 001 00000000000000000000000000000    private static final int TIDYING    =  2 << COUNT_BITS;// 010 00000000000000000000000000000    private static final int TERMINATED =  3 << COUNT_BITS;// 100 00000000000000000000000000000    // Packing and unpacking ctl    private static int runStateOf(int c)     { return c & ~CAPACITY; }//最高3位    private static int workerCountOf(int c)  { return c & CAPACITY; }//后29位    private static int ctlOf(int rs, int wc) { return rs | wc; }

代码里我们可能会这样使用ThreadPoolExecutor的方法:

Future<?> future = this.threadPoolExecutor.submit(runnable);

那么就从submit方法入手,这个submit的代码在 AbstractExecutorService,因为 ThreadPoolExecutor继承了它。

    public Future<?> submit(Runnable task) {        if (task == null) throw new NullPointerException();        RunnableFuture<Void> ftask = newTaskFor(task, null);        execute(ftask);        return ftask;    }
把task包装成RunnableFuture,然后执行execute,下面是ThreadPoolExecutor的execute方法:这个方法就是我们把任务提交给线程池去完成,至于线程池按照怎样的一个管理机制来完成这个task我们不关心,task关系的是run方法中的逻辑。如此,对于开发来说是极其方便的,配置一个线程池,只需一句代码,然后专心完成task的逻辑。那么,了解这个线程池的机制,我感觉只需要看下这个execute方法大概也明白了。特别是方法中的注释。1,当一个task被安排进来的时候,再确定不是空值后,直接判断在池中已经有工作的线程是否小于corePoolSize,小于则增加一个线程来负责这个task。2,如果池中已经工作的线程大于等于corePoolSize,就向队列里存task,而不是继续增加线程。3,当workQueue.offer失败时,也就是说task不能再向队列里放的时候,而此时工作线程大于等于corePoolSize,那么新进的task,就要新开一个线程来接待了。根据代码分析诸多判断和逻辑,而对于使用这个线程池的外部来说,机制是这样:a、如果正在运行的线程数 < corePoolSize,那就马上创建线程并运行这个任务,而不会进行排队。b、如果正在运行的线程数 >= corePoolSize,那就把这个任务放入队列。c、如果队列满了,并且正在运行的线程数 < maximumPoolSize,那么还是要创建线程并运行这个任务。d、如果队列满了,并且正在运行的线程数 >= maximumPoolSize,那么线程池就会调用handler里方法。(采用LinkedBlockingDeque就不会出现队列满情况)
/**     * Executes the given task sometime in the future.  The task     * may execute in a new thread or in an existing pooled thread.     *     * If the task cannot be submitted for execution, either because this     * executor has been shutdown or because its capacity has been reached,     * the task is handled by the current {@code RejectedExecutionHandler}.     *     * @param command the task to execute     * @throws RejectedExecutionException at discretion of     *         {@code RejectedExecutionHandler}, if the task     *         cannot be accepted for execution     * @throws NullPointerException if {@code command} is null     */    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*         * Proceed in 3 steps:         *         * 1. If fewer than corePoolSize threads are running, try to         * start a new thread with the given command as its first         * task.  The call to addWorker atomically checks runState and         * workerCount, and so prevents false alarms that would add         * threads when it shouldn't, by returning false.         *         * 2. If a task can be successfully queued, then we still need         * to double-check whether we should have added a thread         * (because existing ones died since last checking) or that         * the pool shut down since entry into this method. So we         * recheck state and if necessary roll back the enqueuing if         * stopped, or start a new thread if there are none.         *         * 3. If we cannot queue task, then we try to add a new         * thread.  If it fails, we know we are shut down or saturated         * and so reject the task.         */        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }
单从execute方法,大概能了解整个线程池的工作机制。那么,全局的观看以下,我们一定明白这个ThreadPoolExecutor维护着一个池:
    /**     * Set containing all worker threads in pool. accessed only when     * holding mainLock.     */    private final HashSet<Worker> workers = new HashSet<Worker>();
猜测execute方法中的addWorker应该是向这个set中add一个worker,而这里面的worker里有一个线程,这个线程执行完成时,就会从这个set中remove掉。看一下开进程开始工作的addWorker方法:
  /*     * Methods for creating, running and cleaning up after workers     */    /**     * Checks if a new worker can be added with respect to current     * pool state and the given bound (either core or maximum). If so,     * the worker count is adjusted accordingly, and, if possible, a     * new worker is created and started, running firstTask as its     * first task. This method returns false if the pool is stopped or     * eligible to shut down. It also returns false if the thread     * factory fails to create a thread when asked.  If the thread     * creation fails, either due to the thread factory returning     * null, or due to an exception (typically OutOfMemoryError in     * Thread#start), we roll back cleanly.     *     * @param firstTask the task the new thread should run first (or     * null if none). Workers are created with an initial first task     * (in method execute()) to bypass queuing when there are fewer     * than corePoolSize threads (in which case we always start one),     * or when the queue is full (in which case we must bypass queue).     * Initially idle threads are usually created via     * prestartCoreThread or to replace other dying workers.     *     * @param core if true use corePoolSize as bound, else     * maximumPoolSize. (A boolean indicator is used here rather than a     * value to ensure reads of fresh values after checking other pool     * state).     * @return true if successful     */    private boolean addWorker(Runnable firstTask, boolean core) {        retry:        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);            // Check if queue empty only if necessary.            if (rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                   firstTask == null &&                   ! workQueue.isEmpty()))                return false;            for (;;) {                int wc = workerCountOf(c);                if (wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize))                    return false;                if (compareAndIncrementWorkerCount(c))                    break retry;                c = ctl.get();  // Re-read ctl                if (runStateOf(c) != rs)                    continue retry;                // else CAS failed due to workerCount change; retry inner loop            }        }        boolean workerStarted = false;        boolean workerAdded = false;        Worker w = null;        try {            final ReentrantLock mainLock = this.mainLock;            w = new Worker(firstTask);            final Thread t = w.thread;            if (t != null) {                mainLock.lock();                try {                    // Recheck while holding lock.                    // Back out on ThreadFactory failure or if                    // shut down before lock acquired.                    int c = ctl.get();                    int rs = runStateOf(c);                    if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive()) // precheck that t is startable                            throw new IllegalThreadStateException();                        workers.add(w);                        int s = workers.size();                        if (s > largestPoolSize)                            largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                if (workerAdded) {                    t.start();                    workerStarted = true;                }            }        } finally {            if (! workerStarted)                addWorkerFailed(w);        }        return workerStarted;    }
View Code

方法前面的retry循环,最终break的时候,执行compareAndIncrementWorkerCount(c),是的,最前面提到的ctl加1啦!这里利用CAS原则,可以参考先前的文章:摸我

    /**     * Attempt to CAS-increment the workerCount field of ctl.     */    private boolean compareAndIncrementWorkerCount(int expect) {        return ctl.compareAndSet(expect, expect + 1);    }
retry循环break之后,就是做核心的事,new一个worker出来然后add进set,然后启动worker里的thread。 我们注意到做把worker放入set这个操作前,先获取了锁,这个mainLock是类静态成员变量,是一个公用的可重入锁:
    /**     * Lock held on access to workers set and related bookkeeping.     * While we could use a concurrent set of some sort, it turns out     * to be generally preferable to use a lock. Among the reasons is     * that this serializes interruptIdleWorkers, which avoids     * unnecessary interrupt storms, especially during shutdown.     * Otherwise exiting threads would concurrently interrupt those     * that have not yet interrupted. It also simplifies some of the     * associated statistics bookkeeping of largestPoolSize etc. We     * also hold mainLock on shutdown and shutdownNow, for the sake of     * ensuring workers set is stable while separately checking     * permission to interrupt and actually interrupting.     */    private final ReentrantLock mainLock = new ReentrantLock();

其实调用这个addWorker方法有4种传参的方式:  1, addWorker(command, true);  2, addWorker(command, false);  3, addWorker(null, false);  4, addWorker(null, true);在execute方法中就使用了前3种,结合这个核心方法我们先进行一下分析。第一个:线程数小于corePoolSize时,放一个需要处理的task进worker set。如果worker set长度超过corePoolSize,就返回false。 第二个:当队列被放满时,就尝试将这个新来的task直接放入workerset,而此时workerset 的长度限制是maximumPoolSize。如果线程池也满了的话就返回false。 第三个:放入一个空的task进set,比较的的长度限制是maximumPoolSize。这样一个task为空的worker在线程执行的时候会判断出后去任务队列里拿任务,这样就相当于世创建了一个新的线程,只是没有马上分配任务。 第四个:这个方法就是放一个null的task进set,而且是在小于corePoolSize时。实际使用中是在 prestartCoreThread() 方法。这个方法用来为线程池先启动一个worker等待在那边,如果此时set中的数量已经达到corePoolSize那就返回false,什么也不干。还有是 prestartAllCoreThreads()方法,准备corePoolSize个worker:
   /**     * Starts all core threads, causing them to idly wait for work. This     * overrides the default policy of starting core threads only when     * new tasks are executed.     *     * @return the number of threads started     */    public int prestartAllCoreThreads() {        int n = 0;        while (addWorker(null, true))            ++n;        return n;    }
在addWorker中 t.start()使线程就绪,thread是怎么来的,就看下Worker的代码Worker类的源码:
/**     * Class Worker mainly maintains interrupt control state for     * threads running tasks, along with other minor bookkeeping.     * This class opportunistically extends AbstractQueuedSynchronizer     * to simplify acquiring and releasing a lock surrounding each     * task execution.  This protects against interrupts that are     * intended to wake up a worker thread waiting for a task from     * instead interrupting a task being run.  We implement a simple     * non-reentrant mutual exclusion lock rather than use     * ReentrantLock because we do not want worker tasks to be able to     * reacquire the lock when they invoke pool control methods like     * setCorePoolSize.  Additionally, to suppress interrupts until     * the thread actually starts running tasks, we initialize lock     * state to a negative value, and clear it upon start (in     * runWorker).     */    private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable    {        /**         * This class will never be serialized, but we provide a         * serialVersionUID to suppress a javac warning.         */        private static final long serialVersionUID = 6138294804551838833L;        /** Thread this worker is running in.  Null if factory fails. */        final Thread thread;        /** Initial task to run.  Possibly null. */        Runnable firstTask;        /** Per-thread task counter */        volatile long completedTasks;        /**         * Creates with given first task and thread from ThreadFactory.         * @param firstTask the first task (null if none)         */        Worker(Runnable firstTask) {            setState(-1); // inhibit interrupts until runWorker            this.firstTask = firstTask;            this.thread = getThreadFactory().newThread(this);        }        /** Delegates main run loop to outer runWorker  */        public void run() {            runWorker(this);        }        // Lock methods        //        // The value 0 represents the unlocked state.        // The value 1 represents the locked state.        protected boolean isHeldExclusively() {            return getState() != 0;        }        protected boolean tryAcquire(int unused) {            if (compareAndSetState(0, 1)) {                setExclusiveOwnerThread(Thread.currentThread());                return true;            }            return false;        }        protected boolean tryRelease(int unused) {            setExclusiveOwnerThread(null);            setState(0);            return true;        }        public void lock()        { acquire(1); }        public boolean tryLock()  { return tryAcquire(1); }        public void unlock()      { release(1); }        public boolean isLocked() { return isHeldExclusively(); }        void interruptIfStarted() {            Thread t;            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {                try {                    t.interrupt();                } catch (SecurityException ignore) {                }            }        }    }
View Code线程启动后就会调用run方法,也就是调用runWorker(Worker w),核心代码了,英文注释十分详细。在执行task之前会先执行beforeExecute,task结束后执行afterExecute,pool的扩展性利用:摸我
/**     * Main worker run loop.  Repeatedly gets tasks from queue and     * executes them, while coping with a number of issues:     *     * 1. We may start out with an initial task, in which case we     * don't need to get the first one. Otherwise, as long as pool is     * running, we get tasks from getTask. If it returns null then the     * worker exits due to changed pool state or configuration     * parameters.  Other exits result from exception throws in     * external code, in which case completedAbruptly holds, which     * usually leads processWorkerExit to replace this thread.     *     * 2. Before running any task, the lock is acquired to prevent     * other pool interrupts while the task is executing, and     * clearInterruptsForTaskRun called to ensure that unless pool is     * stopping, this thread does not have its interrupt set.     *     * 3. Each task run is preceded by a call to beforeExecute, which     * might throw an exception, in which case we cause thread to die     * (breaking loop with completedAbruptly true) without processing     * the task.     *     * 4. Assuming beforeExecute completes normally, we run the task,     * gathering any of its thrown exceptions to send to     * afterExecute. We separately handle RuntimeException, Error     * (both of which the specs guarantee that we trap) and arbitrary     * Throwables.  Because we cannot rethrow Throwables within     * Runnable.run, we wrap them within Errors on the way out (to the     * thread's UncaughtExceptionHandler).  Any thrown exception also     * conservatively causes thread to die.     *     * 5. After task.run completes, we call afterExecute, which may     * also throw an exception, which will also cause thread to     * die. According to JLS Sec 14.20, this exception is the one that     * will be in effect even if task.run throws.     *     * The net effect of the exception mechanics is that afterExecute     * and the thread's UncaughtExceptionHandler have as accurate     * information as we can provide about any problems encountered by     * user code.     *     * @param w the worker     */    final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {                w.lock();                // If pool is stopping, ensure thread is interrupted;                // if not, ensure thread is not interrupted.  This                // requires a recheck in second case to deal with                // shutdownNow race while clearing interrupt                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }
View Code

while循环条件:先取worker自己的task,如果没有,就是上面提到addWorker时task放null的那种,就调用getTask方法。

 /**     * Performs blocking or timed wait for a task, depending on     * current configuration settings, or returns null if this worker     * must exit because of any of:     * 1. There are more than maximumPoolSize workers (due to     *    a call to setMaximumPoolSize).     * 2. The pool is stopped.     * 3. The pool is shutdown and the queue is empty.     * 4. This worker timed out waiting for a task, and timed-out     *    workers are subject to termination (that is,     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})     *    both before and after the timed wait.     *     * @return task, or null if the worker must exit, in which case     *         workerCount is decremented     */    private Runnable getTask() {        boolean timedOut = false; // Did the last poll() time out?        retry:        for (;;) {            int c = ctl.get();            int rs = runStateOf(c);            // Check if queue empty only if necessary.            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {                decrementWorkerCount();                return null;            }            boolean timed;      // Are workers subject to culling?            for (;;) {                int wc = workerCountOf(c);                timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果线程池允许线程  timeout或者当前线程数大于核心线程数,则会进行timeout的处理                if (wc <= maximumPoolSize && ! (timedOut && timed))//如果线程小于最大值,也不需要timeout判断的,就直接退出                    break;                if (compareAndDecrementWorkerCount(c))//削减线程                    return null;                c = ctl.get();  // Re-read ctl                if (runStateOf(c) != rs)//状态再判断是否变化,发生变化需要重新再来                    continue retry;                // else CAS failed due to workerCount change; retry inner loop            }            try {               //keepAliveTime来控制获取queue中元素时的等待时间                Runnable r = timed ?                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                    workQueue.take();                if (r != null)                    return r;                timedOut = true;            } catch (InterruptedException retry) {                timedOut = false;            }        }    }

至此基本了解了ThreadPoolExecutor源码。在使用是也会更明了一些。

让我们继续前行

----------------------------------------------------------------------

努力不一定成功,但不努力肯定不会成功。


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表