线程池的源码及原理[JDK1.6实现]
1.线程池的包含的内容123456789101112131415161718192021222324 | //流程就是:没达到corePoolSize,创建worker执行,达到corePoolSize加入workQueue//workQueue满了且在maximumPoolSize下,创建新worker,达到maximumPoolSize,执行rejectpublicvoidexecute(Runnablecommand){if(command==null)thrownewNullPointerException();//1:poolSize达到corePoolSize,执行3把任务加入workQueue//2:poolSize没达到,执行addIfUnderCorePoolSize()在corePoolSize内创建新worker立即执行任务//如果达到corePoolSize,则同上执行3if(poolSize>=corePoolSize||!addIfUnderCorePoolSize(command)){//3:workQueue满了,执行5if(runState==RUNNING&&workQueue.offer(command)){if(runState!=RUNNING||poolSize==0){//4:如果线程池关闭,执行拒绝策略//如果poolSize==0,新启动一个线程执行队列内任务ensureQueuedTaskHandled(command);}//5:在maximumPoolSize内创建新worker立即执行任务//如果达到maximumPoolSize,执行6拒绝策略}elseif(!addIfUnderMaximumPoolSize(command))//6:拒绝策略reject(command);//isshutdownorsaturated}} |
1234567891011121314 | publicvoidrun(){try{Runnabletask=firstTask;firstTask=null;//getTask()是从workQueue里面阻塞获取任务,如果getTask()返回null则终结本线程while(task!=null||(task=getTask())!=null){runTask(task);task=null;}}finally{//走到这里代表这个worker或者说这个线程由于线程池关闭或超过aliveTime需要关闭了workerDone(this);}} |
123456789101112131415161718192021222324252627282930 | RunnablegetTask(){for(;;){try{intstate=runState;if(state>SHUTDOWN)returnnull;Runnabler;if(state==SHUTDOWN)//Helpdrainqueuer=workQueue.poll();elseif(poolSize>corePoolSize||allowCoreThreadTimeOut)//在poolSize大于corePoolSize或允许核心线程超时时//阻塞超时获取有可能获取到null,此时worker线程销毁r=workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS);elser=workQueue.take();if(r!=null)returnr;//这里是是否运行worker线程销毁的判断if(workerCanExit()){if(runState>=SHUTDOWN)//STOP或TERMINATED状态,终止空闲workerinterruptIdleWorkers();returnnull;// 这里返回null,代表工作线程worker销毁}//其他:retry,继续循环}catch(InterruptedExceptionie){//Oninterruption,re-checkrunState}}} |
taskCount:线程池需要执行的任务数量。completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不+ getActiveCount:获取活动的线程数。通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。8.线程池调优[更多可参考:线程池与工作队列]调整线程池的大小 - 线程池的最佳大小取决于可用处理器的数目以及工作队列中的任务的性质。调整线程池的大小基本上就是避免两类错误:线程太少或线程太多。a.CPU限制的任务,提高CPU利用率。 在运行于具有 N 个处理器机器上的计算限制的应用程序中,在线程数目接近 N 时添加额外的线程可能会改善总处理能力,而在线程数目超过 N 时添加额外的线程将不起作用。事实上,太多的线程甚至会降低性能,因为它会导致额外的环境切换开销。 若在一个具有 N 个处理器的系统上只有一个工作队列,其中全部是计算性质的任务,在线程池具有N 或 N+1个线程时一般会获得最大的 CPU 利用率。b.I/O限制的任务(例如,从套接字读取 HTTP 请求的任务) 需要让池的大小超过可用处理器的数目,因为并不是所有线程都一直在工作。通过使用概要分析,您可以或得一些数据,并计算出大概的线程池大小。 Amdahl 法则提供很好的近似公式。用 WT 表示每项任务的平均等待时间,ST 表示每项任务的平均服务时间(计算时间)。则 WT/ST 是每项任务等待所用时间的百分比。对于 N 处理器系统,池中可以近似有N*(1+WT/ST) 个线程。c.综合考虑线程池性能瓶颈 a.处理器利用率 b.随着线程池的增长,您可能会碰到调度程序、可用内存方面的限制,或者其它系统资源方面的限制,例如套接字、打开的文件句柄或数据库连接等的数目。9.线程池扩展 - 延时线程池ScheduledThreadPoolExecutor ScheduledThreadPoolExecutor是在普通线程池的基础上增加了两个功能,一是延时执行+定时执行,二是重复执行 定时Executor的流程在大体上与普通线程池一致,因此它继承于ThreadPoolExecutor,对于问题1,它采用了DelayedQueue来实现此功能。对于问题2,定时Executor每次执行完调用ThreadPoolExecutor.runAndReset()重置状态,然后重新把任务加入到Delayed队列中 定时Executor在外部Runnable的基础上套了一个ScheduledFutureTask,其核心源码如下:Java Code 普通任务的外部封装Future
1234567891011121314151617181920212223242526272829303132 | //加入的任务外部封装了ScheduledFutureTask,继承于FutureTask,因此也可以获取任务结果PRivateclassScheduledFutureTask<V>extendsFutureTask<V>implementsRunnableScheduledFuture<V>{//省略部分代码//周期性运行,执行完成就把任务加入到delay队列中privatevoidrunPeriodic(){//这里重置线程池状态booleanok=ScheduledFutureTask.super.runAndReset();booleandown=isShutdown();//Rescheduleifnotcancelledandnotshutdownorpolicyallowsif(ok&&(!down||(getContinueExistingPeriodicTasksAfterShutdownPolicy()&&!isTerminating()))){longp=period;if(p>0)time+=p;elsetime=now()-p;//重复把任务加入到线程池delay队列中ScheduledThreadPoolExecutor.super.getQueue().add(this);}elseif(down)interruptIdleWorkers();}//线程池调用的run方法publicvoidrun(){if(isPeriodic())runPeriodic();elseScheduledFutureTask.super.run();}} |
1234567891011121314151617181920212223242526272829 | publicclassExecutorCompletionService<V>implementsCompletionService<V>{//部分代码省略//外部Future的封装类privateclassQueueingFutureextendsFutureTask<Void>{QueueingFuture(RunnableFuture<V>task){super(task,null);this.task=task;}//这里把Future加入到completionQueueprotectedvoiddone(){completionQueue.add(task);}privatefinalFuture<V>task;}publicFuture<V>submit(Callable<V>task){if(task==null)thrownewNullPointerException();RunnableFuture<V>f=newTaskFor(task);//对f外层又包了一层QueueingFutureexecutor.execute(newQueueingFuture(f));returnf;}//外部则可通过completionQueue来获取已完成的任务FuturepublicFuture<V>take()throwsInterruptedException{returncompletionQueue.take();}} |
新闻热点
疑难解答