整理者:郑昀@ultrapower
我们采用的是i/o complete port(以下简称iocp)处理机制。
简单的讲,当服务应用程序初始化时,它应该先创建一个i/o cp。我们在请求到来后,将得到的数据打包用postqueuedcompletionstatus发送到iocp中。这时需要创建一些个线程(7个线程/每cpu,再多就没有意义了)来处理发送到iocp端口的消息。实现步骤大致如下:
1 先在主线程中调用createiocompletionport创建iocp。
createiocompletionport的前三个参数只在把设备同complete port相关联时才有用。
此时我们只需传递invalid_handle_value,null和0即可。
第四个参数告诉端口同时能运行的最多线程数,这里设置为0,表示默认为当前计算机的cpu数目。
2 我们的threadfun线程函数执行一些初始化之后,将进入一个循环,该循环会在服务进程终止时才结束。
在循环中,调用getqueuedcompletionstatus,这样就把当前线程的id放入一个等待线程队列中,i/o cp内核对象就总能知道哪个线程在等待处理完成的i/o请求。
如果在idle_thread_timeout规定的时间内i/o cp上还没有出现一个completion packet,则转入下一次循环。在这里我们设置的idle_thread_timeout为1秒。
当端口的i/o完成队列中出现一项时,完成端口就唤醒等待线程队列中的这个线程,该线程将得到完成的i/o项中的信息: 传输的字节数、完成键和overlapped结构的地址。
在我们的程序中可以用智能指针或者bstr或者int来接受这个overlapped结构的地址的值,从而得到消息;然后在这个线程中处理消息。
getqueuedcompletionstatus的第一个参数hcompletionport指出了要监视哪一个端口,这里我们传送先前从createiocompletionport返回的端口句柄。
需要注意的是:
第一, 线程池的数目是有限制的,和cpu数目有关系。
第二, iocp是一种较为完美的睡眠/唤醒 线程机制;线程当前没有任务要处理时,就进入睡眠状态,从而不占用cpu资源,直到被内核唤醒;
第三, 最近一次刚执行完的线程,下次任务来的时候还会唤醒它;所以有可能比较少被调用的线程以后被调用的几率也少。
 
测试代码: 
 
using system; 
using system.threading;  // included for the thread.sleep call 
using continuum.threading; 
using system.runtime.interopservices; 
 
namespace iocpdemo ![]()
![]()
{ 
    //============================================================================= ![]()
    /**//// <summary> sample class for the threading class </summary> 
    public class utilthreadingsample ![]()
    
{ 
        //*****************************************************************************    ![]()
        /**//// <summary> test method </summary> 
        static void main() ![]()
        
{ 
            // create the mssql iocp thread pool 
            iocpthreadpool pthreadpool = new iocpthreadpool(0, 10, 20, new iocpthreadpool.user_function(iocpthreadfunction)); 
       
            //for(int i =1;i<10000;i++) ![]()
            
{ 
                pthreadpool.postevent(1234); 
            } 
       
            thread.sleep(100); 
       
            pthreadpool.dispose(); 
        } 
     
        //******************************************************************** ![]()
        /**//// <summary> function to be called by the iocp thread pool.  called when 
        ///           a command is posted for processing by the socketmanager </summary> 
        /// <param name="ivalue"> the value provided by the thread posting the event </param> 
        static public void iocpthreadfunction(int ivalue) ![]()
        
{ 
            try ![]()
            
{ 
                console.writeline("value: {0}", ivalue.tostring()); 
                thread.sleep(3000); 
            } 
       
            catch (exception pexception) ![]()
            
{ 
                console.writeline(pexception.message); 
            } 
        } 
    } 
 
} 
 
类代码: 
using system; 
using system.threading; 
using system.runtime.interopservices; 
 
namespace iocpthreading ![]()
![]()
{ 
    [structlayout(layoutkind.sequential, charset=charset.auto)] 
 
    public sealed class iocpthreadpool ![]()
    
{ 
        [dllimport("kernel32", charset=charset.auto)] 
        private unsafe static extern uint32 createiocompletionport(uint32 hfile, uint32 hexistingcompletionport, uint32* puicompletionkey, uint32 uinumberofconcurrentthreads); 
 
        [dllimport("kernel32", charset=charset.auto)] 
        private unsafe static extern boolean closehandle(uint32 hobject); 
 
        [dllimport("kernel32", charset=charset.auto)] 
        private unsafe static extern boolean postqueuedcompletionstatus(uint32 hcompletionport, uint32 uisizeofargument, uint32* puiuserarg, system.threading.nativeoverlapped* poverlapped); 
 
        [dllimport("kernel32", charset=charset.auto)] 
        private unsafe static extern boolean getqueuedcompletionstatus(uint32 hcompletionport, uint32* psizeofargument, uint32* puiuserarg, system.threading.nativeoverlapped** ppoverlapped, uint32 uimilliseconds); 
 
        private const uint32 invalid_handle_value = 0xffffffff; 
        private const uint32 inifinite = 0xffffffff; 
        private const int32 shutdown_iocpthread = 0x7fffffff; 
        public delegate void user_function(int ivalue); 
        private uint32 m_hhandle; ![]()
        private uint32 gethandle 
{ get 
{ return m_hhandle; } set 
{ m_hhandle = value; } } 
 
        private int32 m_uimaxconcurrency; 
 ![]()
        private int32 getmaxconcurrency 
{ get 
{ return m_uimaxconcurrency; } set 
{ m_uimaxconcurrency = value; } } 
 
 
        private int32 m_iminthreadsinpool; 
 ![]()
        private int32 getminthreadsinpool 
{ get 
{ return m_iminthreadsinpool; } set 
{ m_iminthreadsinpool = value; } } 
 
        private int32 m_imaxthreadsinpool; 
 ![]()
        private int32 getmaxthreadsinpool 
{ get 
{ return m_imaxthreadsinpool; } set 
{ m_imaxthreadsinpool = value; } } 
 
 
        private object m_pcriticalsection; 
 ![]()
        private object getcriticalsection 
{ get 
{ return m_pcriticalsection; } set 
{ m_pcriticalsection = value; } } 
 
 
        private user_function m_pfnuserfunction; 
 ![]()
        private user_function getuserfunction 
{ get 
{ return m_pfnuserfunction; } set 
{ m_pfnuserfunction = value; } } 
 
 
        private boolean m_bdisposeflag; 
 ![]()
        /**//// <summary> simtype: flag to indicate if the class is disposing </summary> 
 ![]()
        private boolean isdisposed 
{ get 
{ return m_bdisposeflag; } set 
{ m_bdisposeflag = value; } } 
 
        private int32 m_icurthreadsinpool; 
 ![]()
        /**//// <summary> simtype: the current number of threads in the thread pool </summary> 
 ![]()
        public int32 getcurthreadsinpool 
{ get 
{ return m_icurthreadsinpool; } set 
{ m_icurthreadsinpool = value; } } 
 ![]()
        /**//// <summary> simtype: increment current number of threads in the thread pool </summary> 
 ![]()
        private int32 inccurthreadsinpool() 
{ return interlocked.increment(ref m_icurthreadsinpool); } 
 ![]()
        /**//// <summary> simtype: decrement current number of threads in the thread pool </summary> 
 ![]()
        private int32 deccurthreadsinpool() 
{ return interlocked.decrement(ref m_icurthreadsinpool); } 
 
 
        private int32 m_iactthreadsinpool; 
 ![]()
        /**//// <summary> simtype: the current number of active threads in the thread pool </summary> 
 ![]()
        public int32 getactthreadsinpool 
{ get 
{ return m_iactthreadsinpool; } set 
{ m_iactthreadsinpool = value; } } 
 ![]()
        /**//// <summary> simtype: increment current number of active threads in the thread pool </summary> 
 ![]()
        private int32 incactthreadsinpool() 
{ return interlocked.increment(ref m_iactthreadsinpool); } 
 ![]()
        /**//// <summary> simtype: decrement current number of active threads in the thread pool </summary> 
 ![]()
        private int32 decactthreadsinpool() 
{ return interlocked.decrement(ref m_iactthreadsinpool); } 
 
 
        private int32 m_icurworkinpool; 
 ![]()
        /**//// <summary> simtype: the current number of work posted in the thread pool </summary> 
 ![]()
        public int32 getcurworkinpool 
{ get 
{ return m_icurworkinpool; } set 
{ m_icurworkinpool = value; } } 
 ![]()
        /**//// <summary> simtype: increment current number of work posted in the thread pool </summary> 
 ![]()
        private int32 inccurworkinpool() 
{ return interlocked.increment(ref m_icurworkinpool); } 
 ![]()
        /**//// <summary> simtype: decrement current number of work posted in the thread pool </summary> 
 ![]()
        private int32 deccurworkinpool() 
{ return interlocked.decrement(ref m_icurworkinpool); } 
 
        public iocpthreadpool(int32 imaxconcurrency, int32 iminthreadsinpool, int32 imaxthreadsinpool, user_function pfnuserfunction) ![]()
        
{ 
            try ![]()
            
{ 
                // set initial class state 
 
                getmaxconcurrency   = imaxconcurrency; 
 
                getminthreadsinpool = iminthreadsinpool; 
 
                getmaxthreadsinpool = imaxthreadsinpool; 
 
                getuserfunction     = pfnuserfunction; 
 
 
                // init the thread counters 
 
                getcurthreadsinpool = 0; 
 
                getactthreadsinpool = 0; 
 
                getcurworkinpool    = 0; 
 
 
                // initialize the monitor object 
 
                getcriticalsection = new object(); 
 
 
                // set the disposing flag to false 
 
                isdisposed = false; 
 
 
                unsafe ![]()
                
{ 
 
                    // create an io completion port for thread pool use 
                    gethandle = createiocompletionport(invalid_handle_value, 0, null, (uint32) getmaxconcurrency); 
 
                } 
 
 
                // test to make sure the io completion port was created 
 
                if (gethandle == 0) 
 
                    throw new exception("unable to create io completion port"); 
 
 
                // allocate and start the minimum number of threads specified 
 
                int32 istartingcount = getcurthreadsinpool; 
 
         
 
                threadstart tsthread = new threadstart(iocpfunction); 
 
                for (int32 ithread = 0; ithread < getminthreadsinpool; ++ithread) ![]()
                
{ 
 
                    // create a thread and start it 
 
                    thread ththread = new thread(tsthread); 
 
                    ththread.name = "iocp " + ththread.gethashcode(); 
 
                    ththread.start(); 
 
 
                    // increment the thread pool count 
 
                    inccurthreadsinpool(); 
 
                } 
 
            } 
 
 
            catch ![]()
            
{ 
 
                throw new exception("unhandled exception"); 
 
            } 
 
        } 
 
        ~iocpthreadpool() ![]()
        
{ 
 
            if (!isdisposed) 
 
                dispose(); 
 
        } 
 
        public void dispose() ![]()
        
{ 
 
            try ![]()
            
{ 
 
                // flag that we are disposing this object 
 
                isdisposed = true; 
 
 
                // get the current number of threads in the pool 
 
                int32 icurthreadsinpool = getcurthreadsinpool; 
 
 
                // shutdown all thread in the pool 
 
                for (int32 ithread = 0; ithread < icurthreadsinpool; ++ithread) ![]()
                
{ 
                    unsafe ![]()
                    
{ 
 
                        bool bret = postqueuedcompletionstatus(gethandle, 4, (uint32*) shutdown_iocpthread, null); 
 
                    } 
 
                } 
 
 
                // wait here until all the threads are gone 
 
                while (getcurthreadsinpool != 0) thread.sleep(100); 
 
 
                unsafe ![]()
                
{ 
 
                    // close the iocp handle 
                    closehandle(gethandle); 
 
                } 
 
            } 
 
            catch ![]()
            
{ 
 
            } 
 
        } 
        private void iocpfunction() ![]()
        
{ 
            uint32 uinumberofbytes; 
 
            int32  ivalue; 
 
            try ![]()
            
{ 
                while (true) ![]()
                
{ 
 
                    unsafe ![]()
                    
{ 
 
                        system.threading.nativeoverlapped* pov; 
 
 
                        // wait for an event 
 
                        getqueuedcompletionstatus(gethandle, &uinumberofbytes, (uint32*) &ivalue, &pov, inifinite); 
                    } 
 
                    // decrement the number of events in queue 
 
                    deccurworkinpool(); 
 
 
                    // was this thread told to shutdown 
 
                    if (ivalue == shutdown_iocpthread) 
 
                        break; 
 
 
                    // increment the number of active threads 
 
                    incactthreadsinpool(); 
 
 
                    try ![]()
                    
{ 
                        // call the user function 
                        getuserfunction(ivalue); 
 
                    } 
 
                    catch(exception ex) ![]()
                    
{ 
                        throw ex; 
                    } 
 
 
                    // get a lock 
 
                    monitor.enter(getcriticalsection); 
 
 
                    try ![]()
                    
{ 
 
                        // if we have less than max threads currently in the pool 
 
                        if (getcurthreadsinpool < getmaxthreadsinpool) ![]()
                        
{ 
 
                            // should we add a new thread to the pool 
 
                            if (getactthreadsinpool == getcurthreadsinpool) ![]()
                            
{ 
 
                                if (isdisposed == false) ![]()
                                
{ 
 
                                    // create a thread and start it 
 
                                    threadstart tsthread = new threadstart(iocpfunction); 
 
                                    thread ththread = new thread(tsthread); 
 
                                    ththread.name = "iocp " + ththread.gethashcode(); 
 
                                    ththread.start(); 
 
 
                                    // increment the thread pool count 
 
                                    inccurthreadsinpool(); 
 
                                } 
 
                            } 
 
                        } 
 
                    } 
 
                    catch ![]()
                    
{ 
 
                    } 
 
 
                    // relase the lock 
 
                    monitor.exit(getcriticalsection); 
 
 
                    // increment the number of active threads 
 
                    decactthreadsinpool(); 
 
                } 
 
            } 
 
 
            catch(exception ex) ![]()
            
{ 
                string str=ex.message; 
 
            } 
 
 
            // decrement the thread pool count 
 
            deccurthreadsinpool(); 
 
        } 
 
        //public void postevent(int32 ivalue 
        public void postevent(int ivalue) ![]()
        
{ 
 
            try ![]()
            
{ 
 
                // only add work if we are not disposing 
 
                if (isdisposed == false) ![]()
                
{ 
 
                    unsafe ![]()
                    
{ 
 
                        // post an event into the iocp thread pool 
 
                        postqueuedcompletionstatus(gethandle, 4, (uint32*) ivalue, null); 
 
                    } 
 
 
                    // increment the number of item of work 
 
                    inccurworkinpool(); 
 
 
                    // get a lock 
 
                    monitor.enter(getcriticalsection); 
 
 
                    try ![]()
                    
{ 
 
                        // if we have less than max threads currently in the pool 
 
                        if (getcurthreadsinpool < getmaxthreadsinpool) ![]()
                        
{ 
 
                            // should we add a new thread to the pool 
 
                            if (getactthreadsinpool == getcurthreadsinpool) ![]()
                            
{ 
 
                                if (isdisposed == false) ![]()
                                
{ 
 
                                    // create a thread and start it 
 
                                    threadstart tsthread = new threadstart(iocpfunction); 
 
                                    thread ththread = new thread(tsthread); 
 
                                    ththread.name = "iocp " + ththread.gethashcode(); 
 
                                    ththread.start(); 
 
 
                                    // increment the thread pool count 
 
                                    inccurthreadsinpool(); 
 
                                } 
 
                            } 
 
                        } 
 
                    } 
 
 
                    catch ![]()
                    
{ 
 
                    } 
 
 
                    // release the lock 
 
                    monitor.exit(getcriticalsection); 
 
                } 
 
            } 
 
 
            catch (exception e) ![]()
            
{ 
 
                throw e; 
 
            } 
 
 
            catch ![]()
            
{ 
 
                throw new exception("unhandled exception"); 
 
            } 
 
        }   
 
        public void postevent() ![]()
        
{ 
 
            try ![]()
            
{ 
 
                // only add work if we are not disposing 
 
                if (isdisposed == false) ![]()
                
{ 
 
                    unsafe ![]()
                    
{ 
 
                        // post an event into the iocp thread pool 
 
                        postqueuedcompletionstatus(gethandle, 0, null, null); 
 
                    } 
 
 
                    // increment the number of item of work 
 
                    inccurworkinpool(); 
 
 
                    // get a lock 
 
                    monitor.enter(getcriticalsection); 
 
 
                    try 
 ![]()
                    
{ 
 
                        // if we have less than max threads currently in the pool 
 
                        if (getcurthreadsinpool < getmaxthreadsinpool) 
 ![]()
                        
{ 
 
                            // should we add a new thread to the pool 
 
                            if (getactthreadsinpool == getcurthreadsinpool) 
 ![]()
                            
{ 
 
                                if (isdisposed == false) 
 ![]()
                                
{ 
 
                                    // create a thread and start it 
 
                                    threadstart tsthread = new threadstart(iocpfunction); 
 
                                    thread ththread = new thread(tsthread); 
 
                                    ththread.name = "iocp " + ththread.gethashcode(); 
 
                                    ththread.start(); 
 
 
                                    // increment the thread pool count 
 
                                    inccurthreadsinpool(); 
 
                                } 
 
                            } 
 
                        } 
 
                    } 
 
 
                    catch 
 ![]()
                    
{ 
 
                    } 
 
 
                    // release the lock 
 
                    monitor.exit(getcriticalsection); 
 
                } 
 
            } 
 
            catch 
 ![]()
            
{ 
 
                throw new exception("unhandled exception"); 
 
            } 
 
        } 
 
    } 
 
} 
,欢迎访问网页设计爱好者web开发。
新闻热点
疑难解答