最后,向大家介绍muduo库中对于线程池的封装的理解。
最重要的想法就是线程池将线程看为自己可执行的最小并且可随时增加的单位。
整个线程池对象维持两个任务队列,threads_表示目前正在运行中的线程池,queue_表示位于存储队列中的等待线程。
thread_在运行的过程中使用while循环+条件变量判断当前的活动线程池中是否有空位,以及新的等待线程进入线程池。
线程池从一开始就确定了自己将要运行的线程数目,不能在后面的运行中更改。
/***ThreadPool.h***/class ThreadPool : boost::noncopyable{ public: typedef boost::function<void ()> Task;//将线程池中的线程作为可替换的任务,以线程为基本单位放在线程池中运行 explicit ThreadPool(const string& nameArg = string("ThreadPool")); ~ThreadPool(); // Must be called before start(). // 设置线程池运行的最大的负载以及线程池中将要运行的线程 void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }// void setThreadInitCallback(const Task& cb) { threadInitCallback_ = cb; } void start(int numThreads);//启动一定数量的线程 void stop();//线程池运算停止 const string& name() const { return name_; } size_t queueSize() const;//返回正在排队等待的线程任务 // Could block if maxQueueSize > 0 void run(const Task& f);//将一个想要运行的线程放入线程池的任务队列#ifdef __GXX_EXPERIMENTAL_CXX0X__ void run(Task&& f);//C++11的移动方法,用于节省资源#endif PRivate: bool isFull() const;//判断线程队列是否已经满了 void runInThread();//真正让线程跑起来的函数 Task take();//获得任务队列的首个线程 mutable MutexLock mutex_;//互斥锁 Condition notEmpty_;//条件变量 Condition notFull_; string name_; Task threadInitCallback_;//线程池中执行的线程对象 boost::ptr_vector<muduo::Thread> threads_;//线程池 std::deque<Task> queue_;//排队执行的线程对象队列 size_t maxQueueSize_;//等待队列的最大数 bool running_;//是否已经启动};每一个加入线程池的线程都带有一个while循环,保证线程等待队列中的线程不会等待太久。即所有将加入线程池的线程都会进入线程等待队列接受检查。
start():线程池启动函数保证在调用时启动一定数量的线程。
stop():保证所有的正在运行的线程停止
queueSize():返回此时线程等待队列中的个数,用于判断线程等待队列是否为空
run():如果线程池为空,直接跑传入的线程。如果线程池等待队列满了,则当前控制流(线程)在notFull_上等待;否则将传入的线程加入线程等待队列,并且使用条件变量notEmpty_通知一条阻塞在该条件变量上的控制流(线程)。
take():如果当前线程等待队列为空并且线程池正在跑,则控制流(线程)阻塞在notEmpty_条件变量上。当条件变量被激活时(有线程对象加入呆线程等待队列),判断是否可以从线程等待队列中拿出一个线程对象,如果可以,则将使用条件变量notFull_通知run()中阻塞在–想加入队列但是队列没有空余位置的变量上。
isFull():返回在线程等待队列中的个数,用于判断是否可以将想要运行的线程放到线程等待队列中。
runInThread():如果线程启动函数不为空,则在此将线程的控制流交给用于初始化线程池的线程对象。当此线程对象运行结束的时候,并且此时的线程池还在运行,则线程池离开初始化模式,进入线程池的循环线程补充模式。这种模式控制着线程池中的线程数量:当有新的线程对象进入线程池,则当前的线程控制流交给将要执行的线程对象。也就是说线程池中的线程对象要么主动结束自己的‘life’,然后由线程池的线程补充模式决定将要进入线程池运行的线程对象。然后在后面的take()中使用条件变量完成新的线程进入线程池的同步。
/***ThreadPool.cc***/ThreadPool::ThreadPool(const string& nameArg) : mutex_(), notEmpty_(mutex_), notFull_(mutex_), name_(nameArg), maxQueueSize_(0), running_(false){}ThreadPool::~ThreadPool(){ if (running_) { stop(); }}void ThreadPool::start(int numThreads){ assert(threads_.empty());//首次启动,断言线程池为空 running_ = true; threads_.reserve(numThreads);//预分配空间 for (int i = 0; i < numThreads; ++i) { char id[32]; snprintf(id, sizeof id, "%d", i+1); threads_.push_back(new muduo::Thread( boost::bind(&ThreadPool::runInThread, this), name_+id)); threads_[i].start();//直接启动线程 } if (numThreads == 0 && threadInitCallback_)//只启动一条线程 { threadInitCallback_(); }}void ThreadPool::stop(){ { MutexLockGuard lock(mutex_); running_ = false; notEmpty_.notifyAll();//目的在于通知notempty条件变量 } for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::join, _1));//使用STL算法依次关闭线程池中运行的线程}size_t ThreadPool::queueSize() const{//获得排队任务执行队列的队列长度 MutexLockGuard lock(mutex_); return queue_.size();}void ThreadPool::run(const Task& task){ if (threads_.empty())//如果线程池为空,直接跑这条线程 { task(); } else { MutexLockGuard lock(mutex_); while (isFull())//如果线程等待队列满了,在notfull条件变量上等待 { notFull_.wait(); } assert(!isFull()); queue_.push_back(task);//现在线程池执行任务队列中有空位了 notEmpty_.notify();//notempty条件变量通知信息 }}#ifdef __GXX_EXPERIMENTAL_CXX0X__void ThreadPool::run(Task&& task){ if (threads_.empty()) { task(); } else { MutexLockGuard lock(mutex_); while (isFull()) { notFull_.wait(); } assert(!isFull()); queue_.push_back(std::move(task)); notEmpty_.notify(); }}#endifThreadPool::Task ThreadPool::take(){ MutexLockGuard lock(mutex_); // always use a while-loop, due to spurious wakeup while (queue_.empty() && running_)//如果线程队列为空并且线程池正在跑 {//在notempty条件变量上等待 notEmpty_.wait();//当前线程停下来等待,当队列不为空了继续跑 }//然后获得新任务 Task task;//创建一个新的任务 if (!queue_.empty()) { task = queue_.front();//获得队列中的头任务 queue_.pop_front();//弹出队列中的头任务 if (maxQueueSize_ > 0)//如果队列最大长度大于0 { notFull_.notify();//通知线程可以跑了 } } return task;//返回任务}bool ThreadPool::isFull() const{//用来判断线程队列是否已经满了 mutex_.assertLocked(); return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;}void ThreadPool::runInThread()//生成一个threadFunc对象{ try { if (threadInitCallback_)//如果线程启动函数不为空,直接启动 { threadInitCallback_(); } while (running_)//while循环保证线程任务等待队列中没有多余的任务 { Task task(take()); if (task) { task(); } } } catch (const Exception& ex) //异常捕捉过程 { fprintf(stderr, "exception caught in ThreadPool %s/n", name_.c_str()); fprintf(stderr, "reason: %s/n", ex.what()); fprintf(stderr, "stack trace: %s/n", ex.stackTrace()); abort(); } catch (const std::exception& ex) { fprintf(stderr, "exception caught in ThreadPool %s/n", name_.c_str()); fprintf(stderr, "reason: %s/n", ex.what()); abort(); } catch (...) { fprintf(stderr, "unknown exception caught in ThreadPool %s/n", name_.c_str()); throw; // rethrow }}硕爷的条件变量用的很精髓,非常精髓。
新闻热点
疑难解答