首页 > 学院 > 开发设计 > 正文

epoll EPOLLONESHOT,非阻塞,子线程处理客户端事件

2019-11-08 01:17:31
字体:
来源:转载
供稿:网友

epoll EPOLLONESHOT,非阻塞,子线程处理客户端事件

非阻塞

(1)默认情况下,服务器在accept等待客户端连接时,会发生阻塞。直到客户端发生连接才不阻塞。当客户端同时有多个连接请求的时候,想通过**一次**epoll_wait的返回,就循环处理到所有的连接请求。这样会导致循环处理完所有连接请求后,会阻塞在accept,无法运行到epoll_wait继续进行监听。所以需要将服务器描述符lfd设置非阻塞。通过EAGAIN错误表明所有连接事件已经处理完了就退出循环,进行下次监听。 (2)同样在默认情况下,read函数在读完所有数据同样也会阻塞等待数据发送过来。如果想通过一次监听事件就处理完所有数据,但又希望在读完数据后不会阻塞。就需要将对应的客户端描述符设置为非阻塞。同样通过EAGAIN进行判断。

EPOLLONESHOT

ev.events=EPOLLIN|EPOLLONESHOT 如果将监听的事件设置为EPOLLONESHOT,表明该ev对应的文件描述符事件只能被epoll_wait监听到一次。如果需要再次被监听到需要重新进行注册修改: epoll_ctl(efd,EPOLL_CTL_MOD,fd,&ev)。 问题提出:为什么要有这个参数呢? 已如下代码为例,如下代码采用的是主线程负责监听事件的发生。而子线程进行客户端事件的处理。假如子线程在处理一个客户端时,该客户端据没办法短时间内处理完。那么主线程会一直监听到该客户端有读事件发生。这样epoll_wait会一直返回该客户端事件,并且将其重复放入消息描述符list中。影响效率。如果设置了EPOLLONESHOT,就会只监听到该客户端事件一次,等待将该客户端数据全部处理完后,再通过epoll_ctl(efd,EPOLL_CTL_MOD,fd,&ev)。重新注册,就可以进行下次监听。

补充:ev.events=EPOLLIN|EPOLLET 边沿触发也可以处理同样的问题,如此设置。当客户端每次发生一次读事件时,**才会触发一次**epoll_wait(),不考虑有没有读完所有数据。避免了客户端只要有数据,会一直触发epoll_wait的问题。

子线程处理客户端事件

如果只有一个进程负责客户端的连接和客户端数据的处理,这样会影响服务器的效率。在处理客户端数据时,不能处理客户端的连接。反之,一样。 所以,如下代码,主线程负责监听客户端连接,子线程负责处理客户端数据。这样服务器就能高效的进行工作。 主线程将需要处理的客户端描述符放入list sock消息队列,子线程从中取出进行事件处理。多线程之间操作全局消息队列。要注意线程间同步问题,进行加锁。 在处理完一个客户端所有数据后,如果该客户端还没退出,注意需要再重新挂载一下。这样才能对该客户端进行再一次监听

示例代码

#include<stdio.h>#include<sys/epoll.h>#include<netinet/in.h>#include<strings.h>#include<sys/types.h>#include<sys/socket.h>#include<unistd.h>#include<stdlib.h>#include<iostream>#include<arpa/inet.h>#include<sys/stat.h>#include<fcntl.h>#include<errno.h>#include<list>using namespace std;list<int> socks;//所有有消息的socket文件描述符,由子线程进行处理pthread_mutex_t mutex;pthread_cond_t cond;int quit;//线程退出的标记 1退出int efd;void setNonblock(int fd){//设置文件描述符非阻塞 int flags=fcntl(fd,F_GETFL); flags|=O_NONBLOCK; fcntl(fd,F_SETFL,flags);}void *thread_func(void *ptr){ while(1){//条件信号为不可靠信号,加上循环,一个信号可能需要处理多个客户端事件 //条件变量阻塞,有信号来表示要读客户端了。 pthread_mutex_lock(&mutex); pthread_cond_wait(&cond,&mutex); pthread_mutex_unlock(&mutex); if(quit&&socks.size()==0)//主线程要求退出,并且任务处理都已经完成了 break; while(1){//一个循环处理一个客户端事件 if(socks.size()==0) //没有事件可以处理 break; pthread_mutex_lock(&mutex); int fd=*socks.begin(); socks.pop_front(); pthread_mutex_unlock(&mutex); while(1){//循环客户端的所有数据 char buf[1024]; int n=read(fd,buf,sizeof(buf)); if(n>0){ PRintf("%s/n",buf); }else if(n==0){//数据读完,并且对方客户端已经断开连接。 close(fd); break; }else{ if(errno==EAGAIN){//非阻塞状态下,数据读完了。 struct epoll_event ev; ev.events=EPOLLIN|EPOLLONESHOT; ev.data.fd=fd; epoll_ctl(efd,EPOLL_CTL_MOD,fd,&ev);//重新挂载一下事件 break; } close(fd);//发生真正错误断开连接 break; } } } }}int main(int argc,char *argv[]){ int lfd,cfd; pthread_mutex_init(&mutex,NULL); pthread_cond_init(&cond,NULL); pthread_t tid; pthread_create(&tid,NULL,thread_func,NULL);//线程处理客户端事件 lfd=socket(AF_INET,SOCK_STREAM,0); if(lfd<0){ perror("socket"); return -1; } struct sockaddr_in servaddr; bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family=AF_INET; servaddr.sin_port=htons(6666); servaddr.sin_addr.s_addr=inet_addr("0.0.0.0"); bind(lfd,(struct sockaddr*)&servaddr,sizeof(servaddr)); listen(lfd,250); efd=epoll_create(1024); setNonblock(lfd);//设置监听为非阻塞 struct epoll_event ev; ev.events=EPOLLIN; ev.data.fd=lfd; int ret=epoll_ctl(efd,EPOLL_CTL_ADD,lfd,&ev); if(ret<0){ perror("epoll_ctl"); return -1; } struct epoll_event events[10];//存放监听到事件的集合 while(1){ ret=epoll_wait(efd,events,10,2000);//等待监听 int i; if(ret<=0){ if(ret==0||errno==EINTR)//监听被打断 continue; return -1; } for(i=0;i<ret;i++){//处理监听事件 int newfd=events[i].data.fd; if(newfd==lfd){//监听到服务器事件,处理完所有请求连接 while(1){ cfd=accept(lfd,NULL,NULL); if(cfd<0){ if(errno==EAGAIN) break; exit(1); } setNonblock(cfd); ev.data.fd=cfd; ev.events=EPOLLIN|EPOLLONESHOT; epoll_ctl(efd,EPOLL_CTL_ADD,cfd,&ev);//挂载客户端读事件 } } else{//监听到客户端读事件,发信号给线程 pthread_mutex_lock(&mutex); socks.push_back(newfd); pthread_mutex_unlock(&mutex); pthread_cond_signal(&cond); } } } quit=1; pthread_join(tid,NULL); return 0;}
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表