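Preface:
Yesterday I wrote up on this blog the initial design of a Socket chat program I have been building in my spare time. That post covered the overall design of the program; for completeness, today I am recording the server-side design in more detail. First, here is the high-level design diagram of the chat program's server: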

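Functional overview:
The server does two main things: it blocks waiting for client sockets and handles their requests, and it monitors client heartbeats, removing any client that has not sent a heartbeat within a given period. Server creates the ServerSocket and then starts two thread pools (newFixedThreadPool and newScheduledThreadPool) for these two jobs; the corresponding classes are SocketDispatcher and SocketSchedule. SocketDispatcher dispatches each socket request to a different SocketHandler according to its type. SocketWrapper is a thin wrapper around a socket that records the time of the socket's most recent interaction in lastAliveTime, and SocketHolder stores the set of sockets currently interacting with the server.
Implementation: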
[Server.java]
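Server is the entry point of the server side. Its constructor creates the ServerSocket, and start() then blocks accepting client connections and hands each one to a SocketDispatcher. The SocketDispatcher instances run on a newFixedThreadPool-type pool, so once the number of connections exceeds the pool's maximum size, further connections wait in the queue. SocketSchedule is started with scheduleAtFixedRate and periodically checks the clients' heartbeats. Both classes implement the Runnable interface. The server code is given below: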
    package yaolin.chat.server;

    import java.io.IOException;
    import java.net.ServerSocket;
    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import yaolin.chat.common.ConstantValue;
    import yaolin.chat.util.LoggerUtil;

    /**
     * Server
     * @author yaolin
     */
    public class Server {

        private final ServerSocket server;
        private final ExecutorService pool;

        public Server() throws IOException {
            server = new ServerSocket(ConstantValue.SERVER_PORT);
            pool = Executors.newFixedThreadPool(ConstantValue.MAX_POOL_SIZE);
        }

        public void start() {
            try {
                ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1);
                // Watch dog. Exception??
                schedule.scheduleAtFixedRate(new SocketSchedule(), 10, ConstantValue.TIME_OUT, TimeUnit.SECONDS);

                while (true) {
                    // accept() blocks until a client connects
                    pool.execute(new SocketDispatcher(server.accept()));
                    LoggerUtil.info("ACCEPT A CLIENT AT " + new Date());
                }
            } catch (IOException e) {
                pool.shutdown();
            }
        }

        public static void main(String[] args) {
            try {
                new Server().start();
            } catch (IOException e) {
                LoggerUtil.error("Server start failed! -> " + e.getMessage(), e);
            }
        }
    }
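The queueing behaviour of a newFixedThreadPool can be seen in isolation with a small sketch. It is not part of the chat program: the class FixedPoolDemo and the method peakConcurrency are hypothetical names, and the short sleep merely stands in for a long-running SocketDispatcher. It submits more tasks than the pool has threads and records how many ever run at the same time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the chat server.
class FixedPoolDemo {

    /** Runs `tasks` short jobs on a pool of `poolSize` threads and returns the peak number running at once. */
    static int peakConcurrency(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest value seen
                try {
                    Thread.sleep(50); // stands in for a dispatcher serving one client
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown(); // no new tasks; queued ones still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // With 2 threads and 6 tasks, the extra tasks wait in the pool's queue.
        System.out.println("peak concurrency = " + peakConcurrency(2, 6));
    }
}
```

Because each SocketDispatcher in the server keeps running until its socket is closed, a queued connection in the real program would presumably wait until an earlier client disconnects.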
[SocketDispatcher.java]
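Server is only the entry point of the server side, not its command center; SocketDispatcher is the real command center. It dispatches client requests by message type so that a different SocketHandler handles each kind of message. The server and client exchange messages as JSON, and every message class extends BaseMessage, so the received data is first converted to BaseMessage and then its type is checked (the message types belong to the common module). One thing worth mentioning: when the message is a file message, the dispatcher sleeps for the configured polling interval so that FileHandler has time to read the file stream and forward it to the target client, instead of immediately entering the next loop iteration and checking the message type again (this part of the design may be flawed, but it will do for now). The code of SocketDispatcher is given below: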
    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.Socket;
    import java.util.Date;

    /**
     * SocketDispatcher
     *
     * @author yaolin
     */
    public class SocketDispatcher implements Runnable {

        private final Socket socket;

        public SocketDispatcher(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            if (socket != null) {
                while (!socket.isClosed()) {
                    try {
                        InputStream is = socket.getInputStream();
                        String line = null;
                        StringBuffer sb = null;

                        if (is.available() > 0) {
                            BufferedReader bufr = new BufferedReader(new InputStreamReader(is));
                            sb = new StringBuffer();
                            while (is.available() > 0 && (line = bufr.readLine()) != null) {
                                sb.append(line);
                            }
                            LoggerUtil.trach("RECEIVE [" + sb.toString() + "] AT " + new Date());

                            // Parse as the base type first, then dispatch on the message type
                            BaseMessage message = JSON.parseObject(sb.toString(), BaseMessage.class);

                            switch (message.getType()) {
                            case MessageType.ALIVE:
                                HandlerFactory.getHandler(MessageType.ALIVE).handle(socket, sb.toString());
                                break;
                            case MessageType.CHAT:
                                HandlerFactory.getHandler(MessageType.CHAT).handle(socket, sb.toString());
                                break;
                            case MessageType.FILE:
                                HandlerFactory.getHandler(MessageType.FILE).handle(socket, sb.toString());
                                LoggerUtil.trach("SEVER:PAUSE TO RECEIVE FILE");
                                Thread.sleep(ConstantValue.MESSAGE_PERIOD);
                                break;
                            case MessageType.LOGIN:
                                HandlerFactory.getHandler(MessageType.LOGIN).handle(socket, sb.toString());
                                break;
                            case MessageType.LOGOUT:
                                break;
                            case MessageType.REGISTER:
                                HandlerFactory.getHandler(MessageType.REGISTER).handle(socket, sb.toString());
                                break;
                            }
                        } else {
                            // Nothing to read yet: wait one polling interval
                            Thread.sleep(ConstantValue.MESSAGE_PERIOD);
                        }
                    } catch (Exception e) { // catch all handler exception
                        LoggerUtil.error("SocketDispatcher Error!" + e.getMessage(), e);
                    }
                }
            }
        }
    }
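As a possible alternative to polling is.available(), text messages could be read with a blocking readLine(). The sketch below is only an illustration under one assumption that this post does not confirm: every JSON message is sent on a single line terminated by a newline. LineFramingDemo and readMessages are hypothetical names, and a ByteArrayInputStream stands in for the socket's input stream.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the chat server.
class LineFramingDemo {

    /** Reads newline-terminated messages until the stream ends; each non-empty line is one message. */
    static List<String> readMessages(InputStream in) {
        List<String> messages = new ArrayList<>();
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
            String line;
            // readLine() blocks until a full line arrives and returns null at end of stream
            while ((line = reader.readLine()) != null) {
                if (!line.isEmpty()) {
                    messages.add(line);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return messages;
    }

    public static void main(String[] args) {
        byte[] wire = "{\"type\":1}\n{\"type\":2}\n".getBytes(StandardCharsets.UTF_8);
        for (String message : readMessages(new ByteArrayInputStream(wire))) {
            System.out.println("message: " + message);
        }
    }
}
```

This only covers the text messages. Raw file bytes sent on the same socket would be swallowed by the BufferedReader, so a file transfer would still need its own framing (for example a length prefix) or a separate connection.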
[SocketSchedule.java]
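The other class (component) directly related to Server is SocketSchedule. It checks whether the time since each client's most recent interaction with the server exceeds the maximum allowed by the system configuration. If it does, the client's socket is removed from the server; otherwise the socket is kept. The last-interaction time itself is refreshed by AliveHandler whenever a heartbeat arrives. The implementation is as follows: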
    import java.util.Date;

    /**
     * Remove socket from SocketHolder if lastAliveTime > TIME_OUT
     * @author yaolin
     *
     */
    public class SocketSchedule implements Runnable {

        @Override
        public void run() {
            for (String key : SocketHolder.keySet()) {
                SocketWrapper wrapper = SocketHolder.get(key);
                if (wrapper != null && wrapper.getLastAliveTime() != null) {
                    // idle time in seconds compared against the configured timeout
                    if (((new Date().getTime() - wrapper.getLastAliveTime().getTime()) / 1000) > ConstantValue.TIME_OUT) {
                        // remove socket if timeout
                        SocketHolder.remove(key);
                    }
                }
            }
        }
    }
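The timeout rule can be checked on its own with a minimal sketch that uses plain timestamps instead of sockets. HeartbeatSweepDemo and sweep are hypothetical names, and the map of username to last-heartbeat time in milliseconds is a simplified stand-in for SocketHolder, not the author's class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class, not part of the chat server.
class HeartbeatSweepDemo {

    /**
     * Removes every client whose last heartbeat is more than timeoutSeconds old.
     * Returns the number of clients removed.
     */
    static int sweep(Map<String, Long> lastAliveMillis, long nowMillis, long timeoutSeconds) {
        int removed = 0;
        for (Map.Entry<String, Long> entry : lastAliveMillis.entrySet()) {
            long idleSeconds = (nowMillis - entry.getValue()) / 1000;
            // remove(key, value) only succeeds if no newer heartbeat replaced the value meanwhile
            if (idleSeconds > timeoutSeconds && lastAliveMillis.remove(entry.getKey(), entry.getValue())) {
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, Long> clients = new ConcurrentHashMap<>();
        long now = System.currentTimeMillis();
        clients.put("alice", now - 5_000L);  // heartbeat 5 seconds ago
        clients.put("bob", now - 40_000L);   // heartbeat 40 seconds ago
        int removed = sweep(clients, now, 30);
        System.out.println("removed " + removed + ", remaining " + clients.keySet());
    }
}
```

A ConcurrentHashMap can be modified while it is being iterated, which is why removing entries inside the loop is safe here, as it is in SocketSchedule.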
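[SocketHolder.java, SocketWrapper.java]
As the code above shows, SocketSchedule#run() merely performs a simple time check; the really meaningful parts are SocketHolder and SocketWrapper. SocketWrapper is a thin wrapper around a socket. SocketHolder stores all clients that have interacted with the server within the valid time window, as key-value pairs: the key is the client's unique identifier (the username here) and the value is the SocketWrapper holding that client's socket. SocketHolder#flushClientStatus() notifies the other clients when a client comes online or goes offline. The two classes are implemented as follows: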
    import java.net.Socket;
    import java.util.Date;

    /**
     * Wrap Socket, SocketSchedule remove socket if lastAliveTime > TIME_OUT
     * @author yaolin
     *
     */
    public class SocketWrapper {

        private Socket socket;
        private Date lastAliveTime;

        // full constructor
        public SocketWrapper(Socket socket, Date lastAliveTime) {
            this.socket = socket;
            this.lastAliveTime = lastAliveTime;
        }

        public Socket getSocket() {
            return socket;
        }

        public void setSocket(Socket socket) {
            this.socket = socket;
        }

        public Date getLastAliveTime() {
            return lastAliveTime;
        }

        public void setLastAliveTime(Date lastAliveTime) {
            this.lastAliveTime = lastAliveTime;
        }
    }
| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 | /** * SocketHolder * @author yaolin */public class SocketHolder { privatestatic ConcurrentMap<String, SocketWrapper> listSocketWrap =new ConcurrentHashMap<String, SocketWrapper>(); publicstatic Set<String> keySet() { returnlistSocketWrap.keySet(); } publicstatic SocketWrapper get(String key) { returnlistSocketWrap.get(key); } publicstatic void put(String key, SocketWrapper value) { listSocketWrap.put(key, value); flushClientStatus(key,true); } publicstatic SocketWrapper remove(String key) { flushClientStatus(key,false); returnlistSocketWrap.remove(key); } publicstatic void clear() { listSocketWrap.clear(); } /** * <pre>content:{username:"",flag:false}</pre> * @param flag true:put,false:remove; */ privatestatic void flushClientStatus(String key, boolean flag) { ClientNotifyDTO dto =new ClientNotifyDTO(flag, key); ReturnMessage rm =new ReturnMessage().setKey(Key.NOTIFY).setSuccess(true).setContent(dto); rm.setFrom(ConstantValue.SERVER_NAME); for(String toKey : listSocketWrap.keySet()) { if(!toKey.equals(key)) { // not send to self rm.setTo(toKey); SocketWrapper wrap = listSocketWrap.get(toKey); if(wrap != null) { SendHelper.send(wrap.getSocket(), rm); } } } }} |
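[SocketHandler.java, HandlerFactory.java, OtherHandlerImpl.java]
SocketDispatcher lets different SocketHandlers handle the corresponding message requests. The SocketHandler design is really just a simple factory. ReturnHandler is not used for now because SendHelper currently does the message sending; it has been marked @Deprecated but is still listed here. The complete class diagram is as follows:

The code for this part is given below: the SocketHandler interface, HandlerFactory, and then each Handler implementation.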
    import java.net.Socket;

    /**
     * SocketHandler
     * @author yaolin
     */
    public interface SocketHandler {
        /**
         * Handle Client Socket
         */
        public Object handle(Socket client, Object data);
    }
    /**
     * SocketHandlerFactory
     * @author yaolin
     */
    public class HandlerFactory {

        // can not create instance
        private HandlerFactory() {}

        public static SocketHandler getHandler(int type) {
            switch (type) {
            case MessageType.ALIVE: // usually use
                return new AliveHandler();
            case MessageType.CHAT:
                return new ChatHandler();
            case MessageType.LOGIN:
                return new LoginHandler();
    //        case MessageType.RETURN:
    //            return new ReturnHandler();
            case MessageType.LOGOUT:
                return new LogoutHandler();
            case MessageType.REGISTER:
                return new RegisterHandler();
            case MessageType.FILE:
                return new FileHandler();
            }
            return null; // NullPointException
        }
    }
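The factory above returns null for an unknown type, which the author's own comment flags as a potential NullPointerException. One way to avoid that, shown here purely as a sketch and not as the author's design, is a lookup table that fails loudly on unknown types. HandlerRegistryDemo, its simplified Handler interface, and the constants ALIVE and CHAT are all hypothetical stand-ins for SocketHandler and MessageType.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical demo class, not part of the chat server.
class HandlerRegistryDemo {

    /** Simplified stand-in for SocketHandler: takes the raw message and returns a result. */
    interface Handler {
        String handle(String data);
    }

    // Hypothetical type codes standing in for MessageType constants.
    static final int ALIVE = 1;
    static final int CHAT = 2;

    // One supplier per message type, so every lookup yields a fresh handler.
    private static final Map<Integer, Supplier<Handler>> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put(ALIVE, () -> data -> "alive:" + data);
        REGISTRY.put(CHAT, () -> data -> "chat:" + data);
    }

    /** Returns a handler for the given type, or throws instead of returning null. */
    static Handler getHandler(int type) {
        Supplier<Handler> supplier = REGISTRY.get(type);
        if (supplier == null) {
            throw new IllegalArgumentException("No handler for message type " + type);
        }
        return supplier.get();
    }

    public static void main(String[] args) {
        System.out.println(getHandler(CHAT).handle("hi"));
    }
}
```

With this shape the caller gets a descriptive exception at the lookup, rather than a NullPointerException later when handle() is invoked on a null handler.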
    import java.net.Socket;
    import java.util.Date;

    /**
     * AliveSocketHandler
     * @author yaolin
     */
    public class AliveHandler implements SocketHandler {

        /**
         * @return null
         */
        @Override
        public Object handle(Socket client, Object data) {
            if (data != null) {
                BaseMessage message = JSON.parseObject(data.toString(), BaseMessage.class);
                if (StringUtil.isNotEmpty(message.getFrom())) {
                    SocketWrapper wrapper = SocketHolder.get(message.getFrom());
                    if (wrapper != null) {
                        wrapper.setLastAliveTime(new Date()); // KEEP SOCKET ...
                        SocketHolder.put(message.getFrom(), wrapper);
                    }
                }
            }
            return null;
        }

    }
    import java.net.Socket;

    /**
     * ChatHandler
     *
     * @author yaolin
     */
    public class ChatHandler implements SocketHandler {

        @Override
        public Object handle(Socket client, Object data) {
            if (data != null) {
                ChatMessage message = JSON.parseObject(data.toString(), ChatMessage.class);

                if (StringUtil.isNotEmpty(message.getFrom()) && StringUtil.isNotEmpty(message.getTo())) {
                    // exist & send
                    if (SocketHolder.keySet().contains(message.getFrom())) {
                        String owner = message.getFrom();
                        message.setOwner(owner); // owner will be display
                        if (ConstantValue.TO_ALL.equals(message.getTo())) { // one-to-all
                            // TO_ALL TAB will be select;
                            message.setFrom(ConstantValue.TO_ALL);
                            for (String key : SocketHolder.keySet()) {
                                // also send to self
                                SocketWrapper wrapper = SocketHolder.get(key);
                                if (wrapper != null) {
                                    SendHelper.send(wrapper.getSocket(), message);
                                }
                            }
                        } else { // one-to-one
                            SocketWrapper wrapper = SocketHolder.get(message.getTo());
                            if (wrapper != null) {
                                // owner = from
                                SendHelper.send(wrapper.getSocket(), message);

                                // also send to self
                                // TO TAB will be select;
                                message.setFrom(message.getTo()).setTo(owner);
                                SendHelper.send(client, message);
                            }
                        }
                    }
                }
            }
            return null;
        }
    }
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;

    public class FileHandler implements SocketHandler {

        @Override
        public Object handle(Socket client, Object data) {
            if (client != null) {
                FileMessage message = JSON.parseObject(data.toString(), FileMessage.class);
                if (StringUtil.isNotEmpty(message.getFrom()) && StringUtil.isNotEmpty(message.getTo())) {
                    // exist & send
                    if (SocketHolder.keySet().contains(message.getFrom())) {
                        if (!ConstantValue.TO_ALL.equals(message.getTo())) { // one-to-one only, files are not broadcast
                            SocketWrapper wrapper = SocketHolder.get(message.getTo());
                            if (wrapper != null) {
                                SendHelper.send(wrapper.getSocket(), message);
                                try {
                                    if (client != null && wrapper.getSocket() != null && message.getSize() > 0) {
                                        InputStream is = client.getInputStream();
                                        OutputStream os = wrapper.getSocket().getOutputStream();
                                        int total = 0;
                                        // Relay bytes from sender to receiver until the announced size is reached
                                        while (!client.isClosed() && !wrapper.getSocket().isClosed()) {
                                            if (is.available() > 0) {
                                                byte[] buff = new byte[ConstantValue.BUFF_SIZE];
                                                int len = -1;
                                                while (is.available() > 0 && (len = is.read(buff)) != -1) {
                                                    os.write(buff, 0, len);
                                                    total += len;
                                                    LoggerUtil.debug("SEND BUFF [" + len + "]");
                                                }
                                                os.flush();
                                                if (total >= message.getSize()) {
                                                    LoggerUtil.info("SEND BUFF [OK]");
                                                    break;
                                                }
                                            }
                                        }
                                        // AFTER SEND FILE
                                        // SEND SUCCESSFULLY
                                        ReturnMessage result = new ReturnMessage().setKey(Key.Tip)
                                                .setSuccess(true)
                                                .setContent(I18N.INFO_FILE_SEND_SUCCESSFULLY);
                                        result.setFrom(message.getTo()).setTo(message.getFrom())
                                                .setOwner(ConstantValue.SERVER_NAME);
                                        SendHelper.send(client, result);
                                        // RECEIVE SUCCESSFULLY
                                        result.setContent(I18N.INFO_FILE_RECEIVE_SUCCESSFULLY)
                                                .setFrom(message.getFrom())
                                                .setTo(message.getTo());
                                        SendHelper.send(wrapper.getSocket(), result);
                                    }
                                } catch (Exception e) {
                                    LoggerUtil.error("Handle file failed !" + e.getMessage(), e);
                                }
                            }
                        }
                    }
                }
            }
            return null;
        }
    }
    import java.net.Socket;
    import java.util.Date;

    /**
     * LoginHandler
     *
     * @author yaolin
     *
     */
    public class LoginHandler implements SocketHandler {

        private UsrService usrService = new UsrService();

        @Override
        public Object handle(Socket client, Object data) {
            ReturnMessage result = new ReturnMessage();
            result.setSuccess(false);
            if (data != null) {
                LoginMessage message = JSON.parseObject(data.toString(), LoginMessage.class);
                if (StringUtil.isNotEmpty(message.getUsername()) && StringUtil.isNotEmpty(message.getPassword())) {
                    if (usrService.login(message.getUsername(), message.getPassword()) != null) {
                        result.setSuccess(true);
                    } else {
                        result.setMessage(I18N.INFO_LOGIN_ERROR_DATA);
                    }
                    result.setFrom(ConstantValue.SERVER_NAME).setTo(message.getUsername());
                } else {
                    result.setMessage(I18N.INFO_LOGIN_EMPTY_DATA);
                }
                // AFTER LOGIN
                result.setKey(Key.LOGIN);
                if (result.isSuccess()) { // HOLD SOCKET
                    SocketHolder.put(result.getTo(), new SocketWrapper(client, new Date()));
                }
                SendHelper.send(client, result);
                if (result.isSuccess()) { // SEND LIST USER
                    ClientListUserDTO dto = new ClientListUserDTO();
                    dto.setListUser(SocketHolder.keySet());
                    result.setContent(dto).setKey(Key.LISTUSER);
                    SendHelper.send(client, result);
                }
            }
            return null;
        }

    }
    import java.net.Socket;

    public class LogoutHandler implements SocketHandler {

        @Override
        public Object handle(Socket client, Object data) {
            if (data != null) {
                LogoutMessage message = JSON.parseObject(data.toString(), LogoutMessage.class);
                if (message != null && StringUtil.isNotEmpty(message.getFrom())) {
                    SocketWrapper wrapper = SocketHolder.get(message.getFrom());
                    // wrapper is null if this user is not (or no longer) held by the server
                    Socket socket = wrapper != null ? wrapper.getSocket() : null;
                    if (socket != null) {
                        try {
                            socket.close();
                            socket = null;
                        } catch (Exception ignore) {
                        }
                    }
                    SocketHolder.remove(message.getFrom());
                }
            }
            return null;
        }

    }
    import java.net.Socket;

    public class RegisterHandler implements SocketHandler {

        private UsrService usrService = new UsrService();

        @Override
        public Object handle(Socket client, Object data) {
            ReturnMessage result = new ReturnMessage();
            result.setSuccess(false).setFrom(ConstantValue.SERVER_NAME);
            if (data != null) {
                RegisterMessage message = JSON.parseObject(data.toString(), RegisterMessage.class);
                if (StringUtil.isNotEmpty(message.getUsername()) && StringUtil.isNotEmpty(message.getPassword())) {
                    if (usrService.register(message.getUsername(), message.getPassword()) != null) {
                        result.setSuccess(true).setContent(I18N.INFO_REGISTER_OK);
                    } else {
                        result.setMessage(I18N.INFO_REGISTER_CLIENT_EXIST);
                    }
                } else {
                    result.setMessage(I18N.INFO_REGISTER_EMPTY_DATA);
                }

                if (StringUtil.isNotEmpty(message.getUsername())) {
                    result.setTo(message.getUsername());
                }
                // AFTER REGISTER
                result.setKey(Key.REGISTER);
                SendHelper.send(client, result);
            }
            return null;
        }

    }
    import java.net.Socket;

    /**
     * Use SendHelper to send ReturnMessage,
     * @see yaolin.chat.server.SocketDispatcher#run()
     * @author yaolin
     */
    @Deprecated
    public class ReturnHandler implements SocketHandler {

        /**
         * @param data ReturnMessage
         */
        @Override
        public Object handle(Socket client, Object data) {
            if (data != null) {
                ReturnMessage message = (ReturnMessage) data;
                if (StringUtil.isNotEmpty(message.getFrom()) && StringUtil.isNotEmpty(message.getTo())) {
                    SocketWrapper wrap = SocketHolder.get(message.getTo());
                    if (wrap != null) {
                        SendHelper.send(wrap.getSocket(), message);
                    }
                }
            }
            return null;
        }

    }
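User logic:
Besides the sockets, the server has a little concrete business logic: user registration, login, and so on. Here I simply list the two classes Usr and UsrService. This logic is barely implemented yet. I do not plan to bring an ORM framework into this program, so I wrote my own DBUtil (to be improved), which is also included here.

Only simple validation is done here; nothing is persisted to the DB. Below are Usr and UsrService: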
    public class Usr {

        private long id;
        private String username;
        private String password;

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getUsername() {
            return username;
        }

        public void setUsername(String username) {
            this.username = username;
        }

        public String getPassword() {
            return password;
        }

        public void setPassword(String password) {
            this.password = password;
        }
    }
    import java.util.HashMap;
    import java.util.Map;

    /**
     * // TODO
     * @see yaolin.chat.server.usr.repository.UsrRepository
     * @author yaolin
     *
     */
    public class UsrService {

        // TODO db
        private static Map<String, Usr> db = new HashMap<String, Usr>();

        public Usr register(String username, String password) {
            if (StringUtil.isEmpty(username) || StringUtil.isEmpty(password)) {
                return null;
            }
            if (db.containsKey(username)) {
                return null; // exist;
            }
            Usr usr = new Usr();
            usr.setUsername(username);
            // only the MD5 digest of the password is kept
            usr.setPassword(MD5Util.getMD5Code(password));
            db.put(username, usr);
            return usr;
        }

        public Usr login(String username, String password) {
            if (StringUtil.isEmpty(username) || StringUtil.isEmpty(password)) {
                return null;
            }
            if (db.containsKey(username)) {
                Usr usr = db.get(username);
                if (MD5Util.getMD5Code(password).equals(usr.getPassword())) {
                    return usr;
                }
            }
            return null;
        }
    }
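UsrService relies on MD5Util.getMD5Code, which is not listed in this post. As a rough idea of what such a helper might do, here is a minimal sketch using the JDK's MessageDigest. It assumes the helper returns the lowercase hexadecimal MD5 digest of the UTF-8 bytes of the input; Md5Demo and md5Hex are hypothetical names, not the author's code.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical demo class; the author's MD5Util is not shown in the post.
class Md5Demo {

    /** Returns the lowercase hex MD5 digest of the UTF-8 bytes of the input. */
    static String md5Hex(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // mask to an unsigned value so negative bytes format as two hex digits
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is a required algorithm on every Java platform, so this should not happen
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(md5Hex("abc")); // prints 900150983cd24fb0d6963f7d28e17f72
    }
}
```

Unsalted MD5 is generally considered too weak for storing real passwords; it fits a demo like this one, but a production system would normally use a dedicated password-hashing scheme.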
Below is the DBUtil utility:
    import java.io.InputStream;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Properties;

    /**
     * DBUtils // TODO to be adjusted & optimized!!
     * @author yaolin
     */
    public class DBUtil {
        // make connection used repeatedly
        private static final List<Connection> cache = new LinkedList<Connection>();
        private static String url;
        private static String driver;
        private static String user;
        private static String password;
        private static Boolean debug;

        static {
            InputStream is = DBUtil.class.getResourceAsStream("/db.properties");
            try {
                Properties p = new Properties();
                p.load(is);
                url = p.getProperty("url");
                driver = p.getProperty("driver");
                user = p.getProperty("user");
                password = p.getProperty("password");
                // just for debug
                try {
                    debug = Boolean.valueOf(p.getProperty("debug"));
                } catch (Exception ignore) {
                    debug = false;
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                if (is != null) {
                    try {
                        is.close();
                        is = null;
                    } catch (Exception ignore) {
                    }
                }
            }
        }

        public synchronized static Connection getConnection() {
            if (cache.isEmpty()) {
                cache.add(makeConnection());
            }
            Connection conn = null;
            int i = 0;
            try {
                // take cached connections from the head until an open one is found
                do {
                    conn = cache.remove(i);
                } while (conn != null && conn.isClosed() && i < cache.size());
            } catch (Exception ignore) {
            }

            try {
                if (conn == null || conn.isClosed()) {
                    cache.add(makeConnection());
                    conn = cache.remove(0);
                }
                return conn;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public synchronized static void close(Connection connection) {
            try {
                if (connection != null && !connection.isClosed()) {
                    if (debug)
                        debug("release connection!");
                    // return the connection to the cache instead of closing it
                    cache.add(connection);
                }
            } catch (SQLException ignore) {
            }
        }

        public static Object query(String sql, ResultSetMapper mapper, Object... args) {
            if (debug)
                debug(sql);
            Connection conn = getConnection();
            PreparedStatement ps = null;
            ResultSet rs = null;
            Object result = null;
            try {
                ps = conn.prepareStatement(sql);
                int i = 1;
                for (Object object : args) {
                    ps.setObject(i++, object);
                }
                rs = ps.executeQuery();
                result = mapper.mapper(rs);
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                try {
                    if (rs != null) {
                        rs.close();
                        rs = null;
                    }
                    if (ps != null) {
                        ps.close();
                        ps = null;
                    }
                } catch (Exception ignore) {
                }
            }
            close(conn);
            return result;
        }

        public static int modify(String sql, Object... args) {
            if (debug)
                debug(sql);
            Connection conn = getConnection();
            PreparedStatement ps = null;
            int row = 0;
            try {
                ps = conn.prepareStatement(sql);
                int i = 1;
                for (Object object : args) {
                    ps.setObject(i++, object);
                }
                row = ps.executeUpdate();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                try {
                    if (ps != null) {
                        ps.close();
                        ps = null;
                    }
                } catch (Exception ignore) {
                }
            }
            close(conn);
            return row;
        }

        public static int[] batch(List<String> sqls) {
            if (debug)
                debug(sqls.toString());
            Connection conn = getConnection();
            Statement stmt = null;
            int[] row;
            try {
                stmt = conn.createStatement();
                for (String sql : sqls) {
                    stmt.addBatch(sql);
                }
                row = stmt.executeBatch();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                try {
                    if (stmt != null) {
                        stmt.close();
                        stmt = null;
                    }
                } catch (Exception ignore) {
                }
            }
            close(conn);
            return row;
        }

        public static int[] batch(String sql, PreparedStatementSetter setter) {
            if (debug)
                debug(sql);
            Connection conn = getConnection();
            PreparedStatement ps = null;
            int[] row;
            try {
                ps = conn.prepareStatement(sql);
                setter.setter(ps);
                row = ps.executeBatch();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                try {
                    if (ps != null) {
                        ps.close();
                        ps = null;
                    }
                } catch (Exception ignore) {
                }
            }
            close(conn);
            return row;
        }

        private static Connection makeConnection() {
            try {
                Class.forName(driver).newInstance();
                Connection conn = DriverManager.getConnection(url, user, password);
                if (debug)
                    debug("create connection!");
                return conn;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private static void debug(String sqls) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            System.out.println(sdf.format(new Date())
                    + " DEBUG " + Thread.currentThread().getId()
                    + " --- [" + Thread.currentThread().getName() + "] " + "execute sqls : " + sqls);
        }
    }
    import java.sql.PreparedStatement;

    /**
     * PreparedStatementSetter
     * @author yaolin
     */
    public interface PreparedStatementSetter {
        public void setter(PreparedStatement ps);
    }
    import java.sql.ResultSet;

    /**
     * ResultSetMapper
     * @author yaolin
     */
    public interface ResultSetMapper {
        public Object mapper(ResultSet rs);
    }
Source code download: demo