思路
借助python当中threading模块与Queue模块组合可以方便的实现基于生产者-消费者模型的多线程模型。Jimmy大神的tushare一直是广大python数据分析以及业余量化爱好者喜爱的免费、开源的python财经数据接口包。
平时一直有在用阿里云服务器通过tushare的接口自动落地相关财经数据,但日复权行情数据以往在串行下载的过程当中,速度比较慢,有时遇到网络原因还需要重下。每只股票的行情下载过程中都需要完成下载、落地2个步骤,一个可能需要网络开销、一个需要数据库mysql的存取开销。2者原本就可以独立并行执行,是个典型的“生产者-消费者”模型。
基于queue与threading模块的线程使用一般采用以下的套路:
producerQueue=Queue()consumerQueue=Queue()lock = threading.Lock()class producerThead(threading.Thread): def __init__(self, producerQueue,consumerQueue): self.producerQueue=producerQueue self.consumerQueue=consumerQueue def run(self): while not self.thread_stop: try: #接收任务,如果连续20秒没有新的任务,线程退出,否则会一直执行 item=self.producerQueue.get(block=True, timeout=20) #阻塞调用进程直到有数据可用。如果timeout是个正整数, #阻塞调用进程最多timeout秒, #如果一直无数据可用,抛出Empty异常(带超时的阻塞调用) except Queue.Empty: print("Nothing to do!thread exit!") self.thread_stop=True break #实现生产者逻辑,生成消费者需要处理的内容 consumerQueue.put(someItem) #还可以边处理,边生成新的生产任务 doSomethingAboutProducing() self.producerQueue.task_done() def stop(self): self.thread_stop = Trueclass consumerThead(threading.Thread): def __init__(self,lock, consumerQueue): self.consumerQueue=consumerQueue def run(self): while true: try: #接收任务,如果连续20秒没有新的任务,线程退出,否则会一直执行 item=self.consumerQueue.get(block=True, timeout=20) #阻塞调用进程直到有数据可用。如果timeout是个正整数, #阻塞调用进程最多timeout秒, #如果一直无数据可用,抛出Empty异常(带超时的阻塞调用) except Queue.Empty: print("Nothing to do!thread exit!") self.thread_stop=True break doSomethingAboutConsuming(lock)# 处理消费者逻辑,必要时使用线程锁 ,如文件操作等 self.consumerQueue.task_done()#定义主线程def main(): for i in range(n):#定义n个i消费者线程 t = ThreadRead(producerQueue, consumerQueue) t.setDaemon(True) t.start() producerTasks=[] #定义初始化生产者任务队列 producerQueue.put(producerTasks) for i in range(n):#定义n个生产者钱程 t = ThreadWrite(consumerQueue, lock) t.setDaemon(True) t.start() stock_queue.join() data_queue.join()
相关接口
1,股票列表信息接口
作用
获取沪深上市公司基本情况。属性包括:
新闻热点
疑难解答