
A Brief Look at How Python's threadpool Thread Pool Is Implemented

2020-02-16 10:45:22
Source: reposted
Contributed by: a site user

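First, the terms used in this article:

Worker thread (worker): when the thread pool is created, the specified number of worker threads are created; each one waits to get tasks from the task queue.

Task (requests): the unit of work that a worker thread processes. There may be thousands of tasks but only a handful of worker threads. Tasks are created with makeRequests.

Task queue (request_queue): the queue that holds pending tasks, implemented with Queue. Worker threads get tasks from this queue and process them.

Task handler (callable): after getting a task, a worker thread calls the task's handler function (request.callable_) to do the actual work and return the result.

Result queue (result_queue): when a task finishes, its return value is put on the result queue; if the handler raised, the exception information is put there instead.

Exception callback (exc_callback): when a result is taken from the result queue and the task is flagged as having raised an exception, the exception callback is called to handle it.

Result callback (callback): when a result is taken from the result queue, the callback does any further processing of the result.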
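To make the relationship between these terms concrete, here is a minimal standard-library sketch. It is not the threadpool module's code: `WorkRequest`, `worker` and `drain_results` are hypothetical names chosen for illustration, and the `None` sentinel used to stop the worker is an assumption of this sketch.

```python
import queue
import sys
import threading


class WorkRequest:
    """Hypothetical stand-in for a task: a callable plus optional callbacks."""

    def __init__(self, callable_, args=(), callback=None, exc_callback=None):
        self.callable_ = callable_
        self.args = args
        self.callback = callback
        self.exc_callback = exc_callback
        self.exception = False  # becomes True if callable_ raised


def worker(request_queue, result_queue):
    """Worker thread body: get tasks until a None sentinel arrives."""
    while True:
        request = request_queue.get()
        if request is None:  # sentinel (an assumption of this sketch)
            break
        try:
            result = request.callable_(*request.args)
        except Exception:
            request.exception = True
            result = sys.exc_info()  # exceptions travel through the same queue
        result_queue.put((request, result))


def drain_results(result_queue):
    """Hand each queued result to callback or exc_callback."""
    while True:
        try:
            request, result = result_queue.get_nowait()
        except queue.Empty:
            break
        if request.exception and request.exc_callback:
            request.exc_callback(request, result)
        elif request.callback and not request.exception:
            request.callback(request, result)


successes, failures = [], []
request_queue, result_queue = queue.Queue(), queue.Queue()

request_queue.put(WorkRequest(
    lambda x: 10 // x, args=(2,),
    callback=lambda req, res: successes.append(res)))
request_queue.put(WorkRequest(
    lambda x: 10 // x, args=(0,),
    exc_callback=lambda req, exc_info: failures.append(exc_info[0])))
request_queue.put(None)

t = threading.Thread(target=worker, args=(request_queue, result_queue))
t.start()
t.join()
drain_results(result_queue)
print(successes, failures)
```

The first task ends up in the result callback, while the second (a division by zero) is routed to the exception callback.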

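The previous part covered installing and using the threadpool module. This part walks through the main stages of how the thread pool works:

(1) Creating the thread pool
(2) Starting the worker threads
(3) Creating tasks
(4) Pushing tasks to the thread pool
(5) Worker threads processing tasks
(6) Handling finished tasks
(7) Worker threads exiting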
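The seven stages can be traced in a small standard-library sketch. `MiniPool` and its methods are hypothetical and much simpler than threadpool's `ThreadPool`; the use of a `threading.Event` per worker and a short polling timeout is an assumption made here to show how workers can be asked to exit.

```python
import queue
import threading


class MiniPool:
    """Hypothetical, simplified pool; not the threadpool library itself."""

    def __init__(self, num_workers, poll_timeout=0.05):
        # (1) create the pool: two queues plus a list of workers
        self.requests = queue.Queue()
        self.results = queue.Queue()
        self.pending = 0
        self.workers = []
        # (2) start the worker threads
        for _ in range(num_workers):
            dismissed = threading.Event()
            thread = threading.Thread(
                target=self._run, args=(dismissed, poll_timeout), daemon=True)
            self.workers.append((thread, dismissed))
            thread.start()

    def _run(self, dismissed, poll_timeout):
        # (5) each worker polls the request queue until it is dismissed
        while not dismissed.is_set():
            try:
                func, arg = self.requests.get(timeout=poll_timeout)
            except queue.Empty:
                continue
            try:
                self.results.put((arg, func(arg), None))
            except Exception as exc:
                self.results.put((arg, None, exc))

    def put(self, func, arg):
        # (4) push one task into the pool
        self.pending += 1
        self.requests.put((func, arg))

    def wait(self):
        # (6) collect one result per submitted task
        collected = []
        while self.pending:
            collected.append(self.results.get())
            self.pending -= 1
        return collected

    def dismiss(self):
        # (7) signal every worker to exit, then join them
        for _, dismissed in self.workers:
            dismissed.set()
        for thread, _ in self.workers:
            thread.join()
        self.workers = []


pool = MiniPool(num_workers=3)
for n in range(5):  # (3) create the tasks
    pool.put(lambda x: x * x, n)
squares = sorted(value for _, value, _ in pool.wait())
pool.dismiss()
print(squares)
```

Polling with a timeout, rather than blocking forever on `get()`, is what lets a worker notice its dismissal flag even when the request queue is empty.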

Below is the definition of ThreadPool:

class ThreadPool:
    """A thread pool, distributing work requests and collecting results.

    See the module docstring for more information.

    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        pass

    def createWorkers(self, num_workers, poll_timeout=5):
        pass

    def dismissWorkers(self, num_workers, do_join=False):
        pass

    def joinAllDismissedWorkers(self):
        pass

    def putRequest(self, request, block=True, timeout=None):
        pass

    def poll(self, block=False):
        pass

    def wait(self):
        pass

1. Creating the thread pool (ThreadPool(args))

task_pool=threadpool.ThreadPool(num_works)

def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
    """Set up the thread pool and start num_workers worker threads.

    ``num_workers`` is the number of worker threads to start initially.

    If ``q_size > 0`` the size of the work *request queue* is limited and
    the thread pool blocks when the queue is full and it tries to put
    more work requests in it (see ``putRequest`` method), unless you also
    use a positive ``timeout`` value for ``putRequest``.

    If ``resq_size > 0`` the size of the *results queue* is limited and the
    worker threads will block when the queue is full and they try to put
    new results in it.

    .. warning:
        If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
        the possibility of a deadlock, when the results queue is not pulled
        regularly and too many jobs are put in the work requests queue.
        To prevent this, always set ``timeout > 0`` when calling
        ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.

    """
    # Task queue: tasks created with threadpool.makeRequests(args) end up here.
    self._requests_queue = Queue.Queue(q_size)
    # Result queue: holds the result of each executed task.
    self._results_queue = Queue.Queue(resq_size)
    # List of worker threads; threads created in self.createWorkers() are added here.
    self.workers = []
    # Workers whose dismiss event has been set but that have not been joined yet.
    self.dismissedWorkers = []
    # Dict that keeps track of the tasks handed to the pool.
    self.workRequests = {}
    self.createWorkers(num_workers, poll_timeout)
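The docstring's advice about bounded queues can be tried out with the standard library alone. The sketch below is only an illustration: it uses the Python 3 module name `queue` (the excerpt above uses the Python 2 name `Queue`), the queue size of 2 and the task names are made up, and nothing here calls threadpool itself.

```python
import queue

# A request queue limited to two entries, analogous to q_size=2.
requests_queue = queue.Queue(maxsize=2)
requests_queue.put("task-1")
requests_queue.put("task-2")

rejected = []
try:
    # Nothing is consuming the queue, so a put() without a timeout
    # would block forever; with a timeout it raises queue.Full instead.
    requests_queue.put("task-3", block=True, timeout=0.1)
except queue.Full:
    rejected.append("task-3")

print(requests_queue.qsize(), rejected)
```

Catching the full-queue exception gives the caller a chance to drain results or retry later instead of hanging, which is the situation the warning describes.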