/*************************************************************************
    > File Name: command_define.h
    > Author: cjj
    > Created Time: Sunday, 27 October 2013, 00:57:04
 ************************************************************************/
// Constants and plain data types shared by the TCP server and its callers.
#ifndef COMMAND_DEFINE_H
#define COMMAND_DEFINE_H

#include <stddef.h>

// Backlog value handed to listen() for the server socket.
const int MAX_LISTEN = 3000;
// TCP port the server is expected to listen on.
const int SERVER_LISTEN_PORT = 8768;
// Capacity, in bytes, of the payload buffer in ServerData.
const int MAX_DATA_LEN = 4096;

// One chunk of data received from a client.
typedef struct
{
    unsigned char buf[MAX_DATA_LEN]; // raw bytes; only the first nLen are meaningful
    size_t nLen;                     // number of valid bytes stored in buf
    int socket;                      // descriptor of the client the bytes came from
} ServerData, *pServerData;

// Callback invoked for each received chunk.
//   szBuf  - start of the data handed to the callback
//   nLen   - size in bytes of the data at szBuf
//   socket - descriptor of the originating client
typedef void (*pCallBack)(const char * szBuf, size_t nLen, int socket);

#endif
| 12345678910111213141516171819202122232425262728293031323334353637383940 | /************************************************************************* > File Name: TcpServer.h > Author: cjj > Created Time: 2013年10月27日 星期日 01时01分23秒 ************************************************************************/#ifndef TCP_SERVER_H#define TCP_SERVER_H #include "command_define.h"#include <set>#include <list>#include <sys/select.h>#include <pthread.h> classTcpServer{public: TcpServer(); virtual~TcpServer(); boolInitialize(unsigned intnPort, unsigned longrecvFunc); boolSendData(constunsigned char* szBuf, size_tnLen, intsocket); boolUnInitialize(); PRivate: staticvoid * AcceptThread(void* pParam); staticvoid * OperatorThread(void* pParam); staticvoid * ManageThread(void* pParam);private: intm_server_socket; fd_set m_fdReads; pthread_mutex_t m_mutex; pCallBack m_operaFunc; //int m_client_socket[MAX_LISTEN]; std::set<int> m_client_socket; std::list<ServerData> m_data; pthread_t m_pidAccept; pthread_t m_pidManage;}; #endif |
| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 | /************************************************************************* > File Name: TcpServer.cpp > Author: cjj > Created Time: 2013年10月27日 星期日 01时29分54秒 ************************************************************************/#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include "TcpServer.h"#include <stdio.h>#include <unistd.h>#include <string.h> TcpServer::TcpServer(){ pthread_mutex_init(&m_mutex, NULL); FD_ZERO(&m_fdReads); m_client_socket.clear(); m_data.clear(); m_operaFunc = 0; m_pidAccept = 0; m_pidManage = 0;} TcpServer::~TcpServer(){ FD_ZERO(&m_fdReads); m_client_socket.clear(); m_data.clear(); m_operaFunc = NULL; pthread_mutex_destroy(&m_mutex);} boolTcpServer::Initialize(unsigned intnPort, unsigned longrecvFunc){ if(0 != recvFunc) { //设置回调函数 m_operaFunc = (pCallBack)recvFunc; } //先反初始化 UnInitialize(); //创建socket m_server_socket = socket(AF_INET, SOCK_STREAM, 0); if(-1 == m_server_socket) { printf("socket error:%m/n"); returnfalse; } //绑定ip和端口 sockaddr_in serverAddr = {0}; serverAddr.sin_family = AF_INET; serverAddr.sin_port = htons(nPort); serverAddr.sin_addr.s_addr = htonl(INADDR_ANY); intres = bind(m_server_socket, (sockaddr*)&serverAddr, sizeof(serverAddr)); if(-1 == res) { printf("bind error:%m/n"); returnfalse; } //监听 res = listen(m_server_socket, MAX_LISTEN); if(-1 == res) { printf("listen 
error:%m/n"); returnfalse; } //创建线程接收socket连接 if(0 != pthread_create(&m_pidAccept, NULL, AcceptThread, this)) { printf("create accept thread failed/n"); returnfalse; } //创建管理线程 if(0 != pthread_create(&m_pidManage, NULL, ManageThread, this)) { printf("create manage thread failed/n"); returnfalse; } returntrue;} //接收socket连接线程void* TcpServer::AcceptThread(void* pParam){ if(!pParam) { printf("param is null/n"); return0; } TcpServer * pThis = (TcpServer*)pParam; intnMax_fd = 0; inti = 0; while(1) { FD_ZERO(&pThis->m_fdReads); //把服务器监听的socket添加到监听的文件描述符集合 FD_SET(pThis->m_server_socket, &pThis->m_fdReads); //设置监听的最大文件描述符 nMax_fd = nMax_fd > pThis->m_server_socket ? nMax_fd : pThis->m_server_socket; std::set<int>::iterator iter = pThis->m_client_socket.begin(); //把客户端对应的socket添加到监听的文件描述符集合 for(; iter != pThis->m_client_socket.end(); ++iter) { FD_SET(*iter, &pThis->m_fdReads); } //判断最大的文件描述符 if(!pThis->m_client_socket.empty()) { --iter; nMax_fd = nMax_fd > (*iter) ? nMax_fd : (*iter); } //调用select监听所有文件描述符 intres = select(nMax_fd + 1, &pThis->m_fdReads, 0, 0, NULL); if(-1 == res) { printf("select error:%m/n"); continue; } printf("select success/n"); //判断服务器socket是否可读 if(FD_ISSET(pThis->m_server_socket, &pThis->m_fdReads)) { //接收新的连接 intfd = accept(pThis->m_server_socket, 0,0); if(-1 == fd) { printf("accept error:%m/n"); continue; } //添加新连接的客户 pThis->m_client_socket.insert(fd); printf("连接成功/n"); } for(iter = pThis->m_client_socket.begin(); iter != pThis->m_client_socket.end(); ++iter) { //判断客户是否可读 if(-1 != *iter && FD_ISSET(*iter, &pThis->m_fdReads)) { unsignedcharbuf[MAX_DATA_LEN] = {0}; res = recv(*iter, buf, sizeof(buf), 0); if(res > 0) { ServerData serverData = {0}; memcpy(serverData.buf, buf, res); serverData.nLen = res; serverData.socket = *iter; pthread_mutex_lock(&pThis->m_mutex); pThis->m_data.push_back(serverData); pthread_mutex_unlock(&pThis->m_mutex); } elseif(0 == res) { printf("客户端退出/n"); pThis->m_client_socket.erase(iter); } else { printf("recv error/n"); } } 
} }} //发送数据到指定的socketboolTcpServer::SendData(constunsigned char* buf, size_tlen, intsock){ if(NULL == buf) { returnfalse; } intres = send(sock, buf, len, 0); if(-1 == res) { printf("send error:%m/n"); returnfalse; } returntrue;} //管理线程,用于创建处理线程void* TcpServer::ManageThread(void* pParam){ if(!pParam) { return0; } pthread_t pid; TcpServer * pThis = (TcpServer *)pParam; while(1) { //使用互斥量 pthread_mutex_lock(&pThis->m_mutex); intnCount = pThis->m_data.size(); pthread_mutex_unlock(&pThis->m_mutex); if(nCount > 0) { pid = 0; //创建处理线程 if( 0 != pthread_create(&pid, NULL, OperatorThread, pParam)) { printf("creae operator thread failed/n"); } } //防止抢占CPU资源 usleep(100); }} //数据处理线程void* TcpServer::OperatorThread(void* pParam){ if(!pParam) { return0; } TcpServer * pThis = (TcpServer*)pParam; pthread_mutex_lock(&pThis->m_mutex); if(!pThis->m_data.empty()) { ServerData data = pThis->m_data.front(); pThis->m_data.pop_front(); if(pThis->m_operaFunc) { //把数据交给回调函数处理 pThis->m_operaFunc((char*)&data, sizeof(data), data.socket); } } pthread_mutex_unlock(&pThis->m_mutex);} boolTcpServer::UnInitialize(){ close(m_server_socket); for(std::set<int>::iterator iter = m_client_socket.begin(); iter != m_client_socket.end(); ++iter) { close(*iter); } m_client_socket.clear(); if(0 != m_pidAccept) { pthread_cancel(m_pidAccept); } if(0 != m_pidManage) { pthread_cancel(m_pidManage); }} |
| 12345678910111213141516171819202122232425262728293031323334353637 | /************************************************************************* > File Name: test.cpp > Author: cjj > Created Time: 2013年10月27日 星期日 02时23分51秒 ************************************************************************/#include "TcpServer.h"#include <stdio.h>#include <unistd.h> TcpServer tcpServer;voidprintMessage(constchar * buf, size_tnlen, intsock){ if(NULL == buf) { return; } ServerData * serverData = (ServerData*)buf; printf("Message:%s/n", serverData->buf); tcpServer.SendData(serverData->buf,sizeof(serverData->buf), serverData->socket);} intmain(){ if(false== tcpServer.Initialize(8768, (unsigned long)printMessage)) { printf("Initialize failed/n"); return-1; } printf("tcpserver:%d/n",sizeof(tcpServer)); while(1) { sleep(2); } return0;} |
新闻热点
疑难解答