Tomat组件研究之ThreadPool
2024-07-21 02:14:09
供稿:网友
 
tomat组件研究之threadpool
   前几天曾向大家承诺,要完成thredpool,tomcat ssl的文章,今天终于有时间可以写一点。tomcat的thradpool不同于apache的common thradpool,tomcat的threadpool是专门为tomcat服务的,确切说是为tomcat处理http连接服务地。经过研究发现,apache用了及其难懂而又隐晦的方法写了这几个threadpool类,虽然仅简单的几个类,但其难理解的程度却是惊人的。在理解之后看,里面确实又值得我们学习的东西,但也有好多无用的东西。看来我们也不要盲目崇拜apache。废话少说,下面直入正题.
threadpool的class图及整体结构:
一.每个类的说明:
1.   org.apache.tomcat.util.threads.threadpool
   线程池类,本类仅维护一定数量的线程处理对象,而把具体执行操作的任务委派给其他对象(controlrunnable),apache并没有把过多的功能交给这个类,而仅只是让这个类维护线程的创建,销毁,取出,归还等功能。下面我们看该类的代码:
public class threadpool  {
(1)      线程池常量,对应的变量设置就不再列出了
    //最大线程数
    public static final int max_threads = 200;
//最大线程数的最小数(最大线程的数量不能小于这个数)
    public static final int max_threads_min = 10;
//最大空闲线程数
    public static final int max_spare_threads = 50;
//最小空闲线程数(当线程池初始化时就启动这么多线程)
    public static final int min_spare_threads = 4;
//最大等待时间(1分钟)
    public static final int work_wait_timeout = 60*1000;
(2)      start方法
 
 
//对每个线程实例本方法仅被调用一次
    public synchronized void start() {
        //是否停止线程
        stopthepool=false;
        //当前生成线程的数量
        currentthreadcount  = 0;
        //当前使用线程的数量
        currentthreadsbusy  = 0;
        //如果当前设置的各个参数不正确,调整一下
        adjustlimits();
        //生成空的线程池
        pool = new controlrunnable[maxthreads];
        //启动最小线数线程
        openthreads(minsparethreads);
        //启动监视线程,监视线程池内部状态
        monitor = new monitorrunnable(this);
}
(3)      openthreads方法
/**
* 启动指定数量(toopen)的线程
* 这个方法很是奇怪,这个toopen并不是本次打开的的线程数
* 而是本次要打开的和以前已经打开的线程数总和
*/
    protected void openthreads(int toopen) {
 
 
        if(toopen > maxthreads) {
            toopen = maxthreads;
        }
        //新打开的线程数放在已经存在的空闲线程后面(用数组存放)
        for(int i = currentthreadcount ; i < toopen ; i++) {
            pool[i - currentthreadsbusy] = new 
controlrunnable(this);
        }
        currentthreadcount = toopen;
    }
到这里我们感觉apache的做法好生奇怪,首先这个toopen,还有一点,以前我们写连接池时,都时用list作为容器,一般有个当前的空闲线程数,但apache偏偏用数组作为容器来存放线程,用数组就要维护每种线程(新的,使用的,空闲的)在数组中的下标,若用list这些问题就没了,我们只要get,add,remove一下就一切ok,非常方便。因为有了currentthreadsbusy,apache的当前的空闲线程数就必须用currentthreadcount- currentthreadsbusy计算得来。这就时我们为什么会看到上面那个奇怪得小循环。但用数组到底有什么好处呢,还是apache的人是猪头(靠,他们不可能是猪头)?,我们可能发现上面有个常量:
 
 
//最大线程数
    public static final int max_threads = 200;
           也就是说,默认最多池可以有200个线程,就是说有很多线程可能频繁地从池中
            取,放线程,如果用list效率将大打折扣,因此才用了数组。
(4)      findcontrolrunnable方法,取得一个可用线程
private controlrunnable findcontrolrunnable() {
        controlrunnable c=null;
 
 
        if ( stopthepool ) {
            throw new illegalstateexception();
        }
 
 
        //从池中取一个可用的线程.
        synchronized(this) {
            //当前所有的线程都被使用
            while (currentthreadsbusy == currentthreadcount) {
                 //当前的线程数量小于最大线程数
                if (currentthreadcount < maxthreads) {
                      //生成一定数量(minsparethreads)线程,这一点与
                   //我们做连接池时非常不一样,我们往往只生成一个新连
                   //接,看来apache的东西真是值得我们学习
                    int toopen = currentthreadcount + 
minsparethreads;
                    openthreads(toopen);
                } else {
                    logfull(log, currentthreadcount, 
                            maxthreads);
                      //如果所有线程(已到最大数)都被使用,则等待其他线
                   //程释放.
                    try {
                        this.wait();
                    }catch(interruptedexception e) {
                        log.error("unexpected exception", e);
                    }
                    // pool was stopped. get away of the pool.
                    if( stopthepool) {
                        break;
                    }
                }
            }
            // pool was stopped. get away of the pool.
            if(0 == currentthreadcount || stopthepool) {
                throw new illegalstateexception();
            }
                    
           
            int pos = currentthreadcount - currentthreadsbusy - 1;
            //经过上面一番折腾,在线程池里终于又有了空闲线程,下面取数
            //组里最后一个线程
            c = pool[pos];
 
 
              //释放当前线程池对该线程的引用
pool[pos] = null;
 
 
             //当然,使用线程的数量也要加1
       currentthreadsbusy++;
 
 
        }
        return c;
}
 
 
这个方法我们可以看出:
ø         线程池里存放的都是空闲线程
ø         新生成的线程放在已存在的线程后面(队列)
ø         当取一个线程时,取队列上最后一个可用线程
ø         当线程被取出去时,队列释放对该线程的引用,同时使用线程变量加1
ø         线程池对使用线程的维护仅通过currentthreadsbusy变量得已实现
 
 
(5)      returncontroller方法,归还一个线程对象到池中
这个方法还算好理解
    protected synchronized void returncontroller(controlrunnable c) {
        if(0 == currentthreadcount || stopthepool) {
            c.terminate();
            return;
        }
 
 
        //使用线程减1
        currentthreadsbusy--;
       //把释放得线程放在空闲线程队列得最后
        pool[currentthreadcount - currentthreadsbusy - 1] = c;
       //唤醒等待线程,告诉他们有可用的线程了,不要在那傻等
        notify();
}
到这里我们看到,用数组作为容器存放线程真复杂,总让我们小心翼翼地操作数组的下标,但没办法,还是这玩意效率高。
(6)      runit方法
这个看起来丑陋的小方法,却是tomat threadpool精华部分的入口,它是线程池提供给其他组件有能力参与线程池内部操作的接口。当有http请求进来的时候,tomcat会生成一个工作线程,然后传入到这个方法这行具体操作。
    public void runit(threadpoolrunnable r) {
        if(null == r) {
            throw new nullpointerexception();
        }
        //查找一个可用空闲线程处理具体任务
        controlrunnable c = findcontrolrunnable();
        c.runit(r);
}
(7)      shutdown方法,关闭线程池,释放资源
    public synchronized void shutdown() {
        if(!stopthepool) {
            stopthepool = true;
           //停止监听器
            monitor.terminate();
            monitor = null;
           //释放空闲线程
            for(int i = 0 ; i < (currentthreadcount - currentthreadsbusy - 1) ; i++) {
                try {
                    pool[i].terminate();
                } catch(throwable t) {
           log.error("ignored exception while shutting down thread pool", t);
                }
            }
           //重置使用线程使用标志
            currentthreadsbusy = currentthreadcount = 0;
            pool = null;
            notifyall();
        }
    }
前面我们说过,使用线程的维护仅通过currentthreadsbusy变量,因此对已经被使用的线程对象根本无法回收,只能简单地置currentthreadsbusy=0
 
 
2.   org.apache.tomcat.util.threads.monitorrunnable
这个类仅有的目的就是维护线程池的线程数量,看到这里我们不仅又对apache的做法怪了起来,为什么要大动干戈做一个线程类去维护线程数量?其实线程池中线程数量的维护完全可以放在findcontrolrunnable及returncontroller方法中,但因为这两个方法的频繁调用,就对效率产生了影响,因此,归根结底还是从效率方面作考虑。
 
 
(1)      run方法
        public void run() {
            while(true) {
                try {
                      //等待一段指定的时间,或有线程归还时唤醒本线程
                    synchronized(this) {
                        this.wait(work_wait_timeout);
                    }
                      //停止.
                    if(shouldterminate) {
                        break;
                    }
 
 
                    //调用线程池的方法进行线程维护.
                    p.checksparecontrollers();
 
 
                } catch(throwable t) {
           threadpool.log.error("unexpected exception", t);
                }
            }
}
(2)      checksparecontrollers方法
该方法属于threadpool类,由monitorrunnable类的run方法调用。
    protected synchronized void checksparecontrollers() {
 
 
        if(stopthepool) {
            return;
        }
       //当前空闲线程数量大于最大空闲线程,释放多余线程
        if((currentthreadcount - currentthreadsbusy) > maxsparethreads) {
           //应该释放的线程数
            int tofree = currentthreadcount -
                         currentthreadsbusy -
                         maxsparethreads;
 
 
            for(int i = 0 ; i < tofree ; i++) {
                controlrunnable c = pool[currentthreadcount - currentthreadsbusy - 1];
                c.terminate();
              //从后向前释放
                pool[currentthreadcount - currentthreadsbusy - 1] = null;
 
 
              //线程数量减1
                currentthreadcount --;
            }
        }
}
通过这个方法,我们要把握住两点:
ø         释放空闲线程时按从后向前的顺序
ø         释放线程时总线程的数量要随之减少
3.   org.apache.tomcat.util.threads.controlrunnable
本类是一个静态线程类,线程池里存放该类的实例。这个类主要被用来在线程池内部执行各种各样的操作。
(1)      构造函数controlrunnable
        controlrunnable(threadpool p) {
            
torun = null;
            //停止标志
            shouldterminate = false;
           //运行标志,构造函数时该线程不运行
            shouldrun = false;
            this.p = p;
           //类似线程本地数据操作,类threadwithattributes将稍后介绍
            t = new threadwithattributes(p, this);
            t.setdaemon(true);
            t.setname(p.getname() + "-processor" + p.getsequence());
            //启动线程threadwithattributes
t.start();
           //向池中增加线程
            p.addthread( t, this );
            nothdata=true;
}
可以看出该构造函数完成了以下几个功能:
ø         用线程池对象p和本身(this)构造了threadwithattributes对象
threadwithattributes是用来代替threadlocal对象的,它的作用是把线程数据的本地化,避免了线程之间数据的访问冲突,令一方面,它对线程属性的访问加以控制,阻止非信任的代码访问线程数据,我们将在下面作具体讲解。
ø         启动了threadwithattributes线程
ø         向池中增加线程
到这里我们可以看出,线程池里的线程存放在两个不同的地方:用数组维护的线程池和用hashtable维护的threads对象:
protected hashtable threads=new hashtable();
key:threadwithattributes对象
value: controlrunnable对象
向池里增加线程将引起下面方法的调用:
(2)      addthread,removethread方法
    public void addthread( thread t, controlrunnable cr ) {
        threads.put( t, cr );
        for( int i=0; i<listeners.size(); i++ ) {
            threadpoollistener tpl=(threadpoollistener)listeners.elementat(i);
            //通知监听器,有线程加入
            tpl.threadstart(this, t);
        }
}
    public void removethread( thread t ) {
        threads.remove(t);
        for( int i=0; i<listeners.size(); i++ ) {
            threadpoollistener tpl=(threadpoollistener)listeners.elementat(i);
           //通知监听器,有线程被删除
            tpl.threadend(this, t);
        }
}
看到这个方法,我们可能会回想起好多地方都在用listener进行一些处理,listener到底为何物?其实我们仔细观察一下就会发现,用listener处理其实是使用了gof23种模式种的observer模式。
(3)      关于observer模式
上面是一般的observer模式class图,如果subject不使用接口而用一个类,并且把subject的含义扩展一下,不是对其所有的属性而是部分属性作观察,则subject其实就是我们的threadpool类,observer其实就是listener接口。被观察的对象就是threadpool类的threads,当对threads作put,remove时就会调用所有被注册到threadpool类的listener方法,即上面的addthread,removethread。
observer模式的用意是:在对象间建立一个一对多的依赖关系,当一个对象的状态发生变化是,所有依赖于它的对象能获得通知并且能被随之自动更新。
(4)      java的observer模式
其实java已经将observer模式集成到语言里面,类java.util.observable相当于subject,java.util.observer就时observer接口,若要使用只要作简单的继承即可。但为了更好的扩展性及更明确的逻辑意义,threadpool类并无继承observable类,而是用了自己的实现方式。
(5)      runit方法
        public synchronized void runit(threadpoolrunnable 
torun) {
           this.torun = 
torun;
            shouldrun = true;
            this.notify();
}
该方法主要是运行一个指定的任务,具体的任务都被封装在threadpoolrunnable接口里,该方法要注意以下几点:
ø         该方法对每个线程仅被调用一次
ø         调用该方法不是马上运行threadpoolrunnable指定的任务,而是通知controlrunnable”可以执行任务”。
具体的任务执行在下面的run方法里。
(6)      run方法
        public void run() {
          try {
            while(true) {
                try {
                    synchronized(this) {
                     //当既不运行也不停止时,等待
                        if(!shouldrun && !shouldterminate) {
                            this.wait();
                        }
                    }
                  //停止
                    if( shouldterminate ) {
                            if( threadpool.log.isdebugenabled())
                            threadpool.log.debug( "terminate");
                            break;
                    }
                    try {
                     //初始化线程数据,仅一次
                        if(nothdata) {
                            if( 
torun != null ) {
                                object thdata[]=torun.getinitdata();
                                t.setthreaddata(p, thdata);
                            }
                            nothdata = false;
                        }
                  //执行操作
                  if(shouldrun) {
                     //运行threadrunnalbe接口
                     if( 
torun != null ) { 
                         torun.runit(t.getthreaddata(p));
                     //controlrunnable也提供一般runnable接口参与处理的机会
                     } else if( torunrunnable != null ) { 
                         torunrunnable.run();
                     } else {
                         if( threadpool.log.isdebugenabled())
                            threadpool.log.debug( "no 
torun ???");
                         }
                     }
                  } catch(throwable t) {      
                      //发生致命错误,从池中删除线程
                        shouldterminate = true;
                        shouldrun = false;
                        p.notifythreadend(this);
                    } finally {
                     //运行结束回收线程
                        if(shouldrun) {
                            shouldrun = false;
                            p.returncontroller(this);
                        }
                    }
 
 
                    if(shouldterminate) {
                        break;
                    }
                } catch(interruptedexception ie) {       
                  //当执行wait时可能发生的异常(尽管这种异常不太可能发生)
p.log.error("unexpected exception", ie);
                }
            }
          } finally {
              //线程池停止或线程运行中发生错误时,从池中删除线程
              p.removethread(thread.currentthread());
          }
}
        结合runit方法,run方法能很容易看懂。
4.   org.apache.tomcat.util.threads.threadpoollistener
前面我们曾提到过,该接口时observer模式的observer对象,该接口定义了两个方法:
        //当线程被创建时执行的方法
public void threadstart( threadpool tp, thread t);
       //当线程被停止时执行的方法
public void threadend( threadpool tp, thread t);
关于该接口的详细使用可以参考上面提到的observer模式。
5.   org.apache.tomcat.util.threads.threadwithattributes
threadwithattributes是一个特殊的线程,该线程用来存放其他线程的属性和数据,并且该类提供了类似threadlocal的功能,但比threadlocal效率更高。
(1)      构造函数threadwithattributes
    public threadwithattributes(object control, runnable r) {
        super(r);
        this.control=control;
}
用control(threadpool)和r(controlrunnable)构造实例(具体可参见controlrunnable的构造方法)
(2)      setnote方法
    public final void setnote( object control, int id, object value ) {
        if( this.control != control ) return;
        notes[id]=value;
}
ø         用controlrunnable构造一个新的threadwithattributes对象避免了线程公用数据的争夺
ø         根据control设置线程属性,通过control可以阻止非信任的代码操作线程属性。
对其他操作线程属性的方法都比较简单就不再一一列出。
(3)      java的threadlocal
java.lang.threadlocal是在java1.2中出现的“线程局部变量”,它为每个使用它的线程提供单独的线程局部变量值的副本。每个线程只能看到与自己相联系的值,而不知道别的线程可能正在使用或修改它们自己的副本。“线程局部变量”是一种能简化多线程编程的好方法,可惜的是多数开发者可能不了解它。具体的信息可以参考:
http://www-900.ibm.com/developerworks/cn/java/j-threads/index3.shtml
6.   org.apache.tomcat.util.threads.threadpoolrunnable
前面我们提到过,如果想把自己的代码嵌入到线程池内部被执行,就必须实现该接口。具体可以参照controlrunnable的run方法。这个接口定义了下面两个方法:
(1)      getinitdata方法
public object[] getinitdata();
取得运行该对象所需要的初始化数据,对池中所有的线程来说应该返回相同类型的数据,否则处理机制将变的很复杂。
(2)      runit方法
public void runit(object thdata[]);
嵌入执行的代码将在这个方法里得以体现,以后我们将会看到,对tcp connection得处理也是在这里进行的。
至此,tomcat threadpool的介绍就算基本结束,对tomcat threadpool始终要把握住下面几点:
ø         tomcat threadpool仅提供了对线程的管理维护功能
ø         池所执行的操作有外部组件去实现
ø         从池的设计可以看出一点面向组件(cop)编程的痕迹
二.threadpool在处理tcp connection中的应用
在接下来的内容中我们将演示tomat是如何在指定的端口监听http连接,并利用threadpool生成一个线程处理接受的请求。
1.   org.apache.tomcat.util.net.pooltcpendpoint
类pooltcpendpoint主要是被用来处理接受到的http连接,处理方式是处理原始的socket,下面我们看几个重要的方法:
(1)      initendpoint方法
对该方法,现在我们可以暂时不要考虑太多,只要知道在初始化serversocket的工作就足够了。
    public void initendpoint() throws ioexception, instantiationexception {
    try {
        //创建serversocket工厂
        if(factory==null)
            factory=serversocketfactory.getdefault();
        
        //创建serversocket,将被用于在指定的端口(8080)监听连接
        if(serversocket==null) {
                try {
                    if (inet == null) {
                        serversocket = factory.createsocket(port, backlog);
                    } else {
                        serversocket = factory.createsocket(port, backlog, inet);
                    }
                } catch ( bindexception be ) {
                    throw new bindexception(be.getmessage() + ":" + port);
                }
        }
        //设定连接的超时限制时间
         if( servertimeout >= 0 )
            serversocket.setsotimeout( servertimeout );
    } catch( ioexception ex ) {
            throw ex;
    } catch( instantiationexception ex1 ) {
            throw ex1;
    }
       //保证初始化一次
        initialized = true;
}
(2)      startendpoint方法
该方法将在tocmat启动时被调用,主要作用时启动线程池并生成监听线程。
    public void startendpoint() throws ioexception, instantiationexception {
        if (!initialized) {
            initendpoint();
        }
     //tp是外部组件传进来的threadpool对象,这里tomcat启动了该线程池
  if(ispool) {
      tp.start();
  }
      running = true;
        //生成工作线程监听http连接
if(ispool) {
          listener = new tcpworkerthread(this);
          tp.runit(listener);
        } else {
      log.error("xxx error - need pool !");
  }
}
下面将向大家描述,工作线程是如何监听http连接的:
2.   org.apache.tomcat.util.net.tcpworkerthread
该类是pooltcpendpoint的内部类,它实现了threadpoolrunnable接口执行http连接监听和请求处理。(class tcpworkerthread implements threadpoolrunnable)
(1)      构造函数tcpworkerthread
该方法的主要目的是通过pooltcpendpoint对象生成一个实例,并且在缓存中生成一定数量的tcpconnection对象。
    public tcpworkerthread(pooltcpendpoint endpoint) {
    this.endpoint = endpoint;
    if( usepool ) {
       //缓存初始化simplepool为缓存对象,可先不理会其实现细节
        connectioncache = new simplepool(endpoint.getmaxthreads());
        for(int i = 0;i< endpoint.getmaxthreads()/2 ; i++) {
       connectioncache.put(new tcpconnection());
        }
      }
}
我们目的是先弄清楚http的监听及处理,对其他细节可先不于深究。
(2)      getinitdata方法
对该方法的描述前面已经说过,大家还记得否?本方法主要是取得线程的初始化数据。
    public object[] getinitdata() {
    if( usepool ) {
        return endpoint.getconnectionhandler().init();
    } else {
        object obj[]=new object[2];
       //第二个参数存放http请求处理器(可先不考虑细节)
        obj[1]= endpoint.getconnectionhandler().init();
       //第一个参数存放tcpconnection对象
        obj[0]=new tcpconnection();
        return obj;
    }
}
关于第二个参数,其实是初始化了http请求处理器及其他的信息,大家可先不究其细节。只要能认识到这个方法是返回线程初始化数据即可。
(3)      runit方法
前面我们说过,嵌入到线程池执行的代码要写在这个方法里,这个方法是http监听的核心,我们看具体实现:
    public void runit(object perthrdata[]) {
       if (endpoint.isrunning()) {
           socket s = null;
           //在指定的端口(8080)监听客户端连接
           try {
               s = endpoint.acceptsocket();
           } finally {
              //当接受到一个连接后继续启动下一个线程进行监听
               if (endpoint.isrunning()) {
                   endpoint.tp.runit(this);
               }
           }
           if (null != s) {
           try {
               if(endpoint.getserversocketfactory()!=null) {
    //客户端与服务器第一次握手,主要用于ssi连接(即https)              endpoint.getserversocketfactory().handshake(s);
               }             
            } catch (throwable t) {
           pooltcpendpoint.log.debug("handshake failed", t);
                try {
                    s.close();
                } catch (ioexception e) {
                }
                return;
            }
           tcpconnection con = null;
           try {
               if( usepool ) {
                  //从缓存中取一个tcpconnection对象
                  con=(tcpconnection)connectioncache.get();
                  
                  if( con == null ) {
                      con = new tcpconnection();
                  }
               } else {
                  //若不使用缓存从初始化数据中取一个tcpconnection对象
                          con = (tcpconnection) perthrdata[0];
                          perthrdata = (object []) perthrdata[1];
               }
               //设定刚生成tcpconnection对象
               con.setendpoint(endpoint);
               con.setsocket(s);
               endpoint.setsocketoptions( s );
//把tcpconnection及所需要的初始化数据传给http处理器处理
//在process处理中将把原始的socket流解析成request对象传
//给容器调用
endpoint.getconnectionhandler().processconnection(con, perthrdata);
             } catch (socketexception se) {
               try {
                   s.close();
               } catch (ioexception e) {}
            } catch (throwable t) {
               try {
                   s.close();
               } catch (ioexception e) {}
            } finally {
               if (con != null) {
                   con.recycle();
                   if (usepool) {
                       connectioncache.put(con);
                   }
               }
            }
         }
       }
}
请大家仔细而反复的多看一下上面带阴影的注释。通过上面我们看到工作线程作了如下的工作:
ø         启动了线程池(线程池启动时将生成指定数量的线程及监视线程)
ø         如果使用缓冲处理则预先生成指定数量的tcpconnection对象
ø         在指定的端口(默认是8080)监听http连接
ø         当接收的一个连接时再启动一个线程继续监听连接
ø         用接收的连接生成tcpconnection对象,即tomcat对http的处理是以tcpconnection对象为基础的
ø         把生成的tcpconnection对象交由http process进行socket解析,最终生成request对象
要注意的是:tomcat并不是事先用指定数量的线程在端口监听,而是当一个监听完成后再启动下一个监听线程。
,欢迎访问网页设计爱好者web开发。