首页 > 编程 > Python > 正文

Python并发机制(三)——异步I/O(select & poll)

2019-11-11 03:33:17
字体:
来源:转载
供稿:网友

python异步机制(三)——I/O多路复用

select

select通过一个select()系统调用来监视多个文件描述符组成的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,select本质上是通过设置或检查存放fd标志位的数据结构来进行下一步处理

优点:

具有良好的跨平台性

缺点:

单个进程可监视的fd数量被限制(在linux上一般为1024)需要维护一个用来存放大量fd的数据结构(数组:线性表),使得用户空间和消耗大

服务器端:

#!/usr/bin/python#coding=utf-8import socket,sys,select,Queuehost = (sys.argv[1], int(sys.argv[2]))sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)#阻塞与端口复用sock.setblocking(False)#SOL_SOCKET(套接字描述符),SO_REUSEADDR(端口复用,让端口释放后可以被立即使用)sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)sock.bind(host)sock.listen(5)rlist = [sock]wlist = []msg_que = {}timeout = 60while True: rs, ws ,es = select.select(rlist, wlist, rlist, timeout) if not (rs or ws or es): PRint "timeout..." continue for r in rs: if r is sock: conn,addr = r.accept() print "connected by %s"%addr[1] conn.setblocking(False) rlist.append(conn) msg_que[conn] = Queue.Queue() else: data = r.recv(1024) if data: print data msg_que[r].put(data) if r not in wlist: wlist.append(r) else: if r in wlist: wlist.remove(r) rlist.remove(r) r.close del msg_que[r] for w in ws: try: #等同于get(0) msg = msg_que[w].get_nowait() except Queue.Empty: print 'msg empty' wlist.remove(w) else: w.send(msg) for e in es: print 'except:',s.getpeername() if e in rlist: rlist.remove(e) if e in wlist: wlist.remove(e) s.close del msg_que[e]

客户端:

#!/usr/bin/python#coding=utf-8import socket,sys,timehost = (sys.argv[1],int(sys.argv[2]))sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)sock.connect(host)data = 1while data: data = raw_input('please input:') sock.send(data)

运行结果:

linux@linux:~$ python test18.py "" 8880connected by 44980fdsconnected by 44982afdsafds

poll

poll与select类似,其注册时需要带有一个eventmask,eventmask是一些按位或标记.

POLLIN: 用于读取数据 POLLPRI: 用于读取紧急数据 POLLOUT: 准备写入 POLLERR: 错误情况 POLLHUP: 保持状态 POLLNVAL: 无效请求

下面是一个多路文件复制例子:

#! /usr/bin/python# -*- coding: utf-8 -*-import selectblksize = 8192def read_write(fromfd, tofd): readbuf = fromfd.read(blksize) if readbuf: tofd.write(readbuf) tofd.flush() return len(readbuf)def copy_poll(fromfd1, tofd1, fromfd2, tofd2): # 定义只读事件 READ_ONLY = (select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR ) totalbytes = 0 if not (fromfd1 or fromfd2 or tofd1 or tofd2): return 0 fd_dict = {fromfd1.fileno():fromfd1, fromfd2.fileno():fromfd2} p = select.poll() # 注册文件描述符 p.register(fromfd1, READ_ONLY) p.register(fromfd2, READ_ONLY) while True: # 轮询已注册事件是否准备好 result = p.poll() if len(result) != 0: for fd, events in result: if fd_dict[fd] is fromfd1: if events & (select.POLLIN | select.POLLPRI): bytesread = read_write(fromfd1, tofd1) totalbytes += bytesread elif events & (select.POLLERR): p.unregister(fd_dict[fd]) if fd_dict[fd] is fromfd2: if events & (select.POLLIN | select.POLLPRI): bytesread = read_write(fromfd2, tofd2) totalbytes += bytesread elif events & (select.POLLERR): p.unregister(fd_dict[fd]) if bytesread <= 0: break return totalbytesdef main(): fromfd1 = open('/home/linux/nginx-1.11.7.tar.gz', 'r') fromfd2 = open('/home/linux/wow.tar.gz', 'r') tofd1 = open('/home/linux/blogtest1', 'a') tofd2 = open('/home/linux/blogtest2', 'a') totalbytes = copy_poll(fromfd1, tofd1, fromfd2, tofd2) print "already copy %d" % totalbytes return 0if __name__ == '__main__': main()
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表