首页 > 学院 > 开发设计 > 正文

epoll+线程池+信号量

2019-11-08 01:15:42
字体:
来源:转载
供稿:网友

epoll+线程池+信号量

该程序是对上一篇程序进行简单的修改。主要改动如下: 1.创建了多个线程来模拟线程池处理客户端的效果,上个程序是只有一个线程处理客户端。 2.如果用到多个线程处理客户端,那就需要将条件变量锁pthread_cond_t改成信号量sem_t sem。这样多个线程才能同时处理多个客户端事件。 3.另外封装了两个函数epoll_add(int epollfd, int fd, int events) 和 acceptAll(int server) 如图: 这里写图片描述

示例代码

#include <semaphore.h>#include <pthread.h>#include<arpa/inet.h>#include <stdio.h>#include <sys/socket.h>#include <sys/types.h>#include <netinet/in.h>#include <errno.h>#include <string.h>#include <stdlib.h>#include<sys/stat.h>#include<fcntl.h>#include <unistd.h>#include<sys/epoll.h>#include <list>using namespace std;void epoll_add(int epollfd, int fd, int events){ struct epoll_event ev; ev.data.fd = fd; ev.events = events; // 先尝试修改,如果修改失败,再进行add操作 if(epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) == -1) epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);}// 所有有消息的socket文件描述符,由子线程对其进行处理list<int> socks;// 防止全局变量被多线程访问导致不一致的情况pthread_mutex_t mutex;// 用来等待条件成立// pthread_cond_t cond;sem_t sem;int epollfd;int quit = 0;void* thread_func(void* ptr){ while(1) { sem_wait(&sem); // 主线程叫我退出,并且我的任务处理完成了,就可以安全退出了 if(quit && socks.size() == 0) // 可以不加锁 break; pthread_mutex_lock(&mutex); // 获取一个有消息的socket文件描述符 int fd = *socks.begin(); socks.pop_front(); pthread_mutex_unlock(&mutex); // 保证数据都被读完了 while(1) { char buf[1024]; int ret = read(fd, buf, sizeof(buf)); if(ret > 0) { // 处理数据 PRintf("%s/n", buf); } else if(ret == 0) { close(fd); break; } else // <0 { if(errno == EAGAIN) // 数据已经被读完了,应该把oneshot属性去掉 { epoll_add(epollfd, fd, EPOLLIN|EPOLLONESHOT); } if(errno != EAGAIN) close(fd); break; } } }}int acceptAll(int server){ while(1) { int newfd = accept(server, NULL, NULL); if(newfd < 0 && errno == EAGAIN) break; set_nonblock(newfd); epoll_add(epollfd, newfd, EPOLLIN|EPOLLONESHOT); }}int main(){ // 创建服务器 int server = create_server(9988, "0.0.0.0", 250); epollfd = epoll_create(1024); // 创建epoll // 创建线程 pthread_mutex_init(&mutex, NULL); sem_init(&sem, 0, 0); pthread_t tid1; pthread_create(&tid1, NULL, thread_func, NULL); pthread_t tid2; pthread_create(&tid2, NULL, thread_func, NULL); pthread_t tid3; pthread_create(&tid3, NULL, thread_func, NULL); // 将服务器非阻塞,并且将其加入epoll集合 set_nonblock(server); // 非阻塞 epoll_add(epollfd, server, EPOLLIN); struct epoll_event ev_out[8]; while(1) { // 等待消息 int ret = epoll_wait(epollfd, ev_out, 8, 2000); if(ret > 0) { int i; for(i=0; i<ret; ++i) { int fd = ev_out[i].data.fd; if(fd == server) { acceptAll(fd); } else // 一个不是server的文件描符有消息 // 应该将它发送给线程去处理 { // 要不要加锁?肯定要的 pthread_mutex_lock(&mutex); socks.push_back(fd); pthread_mutex_unlock(&mutex); sem_post(&sem); } } } else if(ret == 0 || (ret < 0 && errno == EINTR)) { continue; } else { exit(1); } } quit = 1; sem_post(&sem); sem_post(&sem); sem_post(&sem); pthread_join(tid1, NULL); pthread_join(tid2, NULL); pthread_join(tid3, NULL); return 0;}
上一篇:PAT A1100. Mars Numbers (20)

下一篇:莫队

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表