首页 > 编程 > Java > 正文

Java Socket编程实例(三)- TCP服务端线程池

2019-11-26 14:11:37
字体:
来源:转载
供稿:网友

一、服务端回传服务类:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Echo handler for a single client connection: every chunk of bytes read from the socket is
 * written straight back to it until the client closes its side of the connection.
 *
 * <p>Can be used as a {@link Runnable} (one instance per accepted socket) or directly through
 * the static {@link #handleEchoClient(Socket, Logger)} entry point.
 */
public class EchoProtocol implements Runnable {
  private static final int BUFSIZE = 32; // Size (in bytes) of I/O buffer

  private final Socket clientSocket; // Socket connected to the client
  private final Logger logger; // Server logger

  /**
   * @param clientSocket connected client socket; it is closed when {@link #run()} finishes
   * @param logger logger that receives the per-connection summary and any I/O errors
   */
  public EchoProtocol(Socket clientSocket, Logger logger) {
    this.clientSocket = clientSocket;
    this.logger = logger;
  }

  /**
   * Echoes everything received on {@code clientSocket} back to the client, then closes the
   * socket.
   *
   * <p>I/O failures are not propagated: an error while echoing is logged at WARNING, and an
   * error while closing the socket is logged at FINE. The socket close is attempted in every
   * case.
   *
   * @param clientSocket connected client socket to serve and close
   * @param logger logger that receives the summary line and any I/O errors
   */
  public static void handleEchoClient(Socket clientSocket, Logger logger) {
    try {
      // Get the input and output I/O streams from socket
      InputStream in = clientSocket.getInputStream();
      OutputStream out = clientSocket.getOutputStream();

      int recvMsgSize; // Size of received message
      int totalBytesEchoed = 0; // Bytes received from client
      byte[] echoBuffer = new byte[BUFSIZE]; // Receive buffer

      // Receive until client closes connection, indicated by -1
      while ((recvMsgSize = in.read(echoBuffer)) != -1) {
        out.write(echoBuffer, 0, recvMsgSize);
        totalBytesEchoed += recvMsgSize;
      }

      logger.info("Client " + clientSocket.getRemoteSocketAddress() + ", echoed "
          + totalBytesEchoed + " bytes.");
    } catch (IOException ex) {
      logger.log(Level.WARNING, "Exception in echo protocol", ex);
    } finally {
      try {
        clientSocket.close();
      } catch (IOException e) {
        // Nothing more can be done with this connection, but do not hide the failure.
        logger.log(Level.FINE, "Failed to close client socket", e);
      }
    }
  }

  /** Serves the socket given at construction time via {@link #handleEchoClient}. */
  @Override
  public void run() {
    handleEchoClient(this.clientSocket, this.logger);
  }
}

二、每个客户端请求都新启一个线程的Tcp服务端:

import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.logging.Logger;  public class TCPEchoServerThread {    public static void main(String[] args) throws IOException {     // Create a server socket to accept client connection requests     ServerSocket servSock = new ServerSocket(5500);      Logger logger = Logger.getLogger("practical");      // Run forever, accepting and spawning a thread for each connection     while (true) {       Socket clntSock = servSock.accept(); // Block waiting for connection       // Spawn thread to handle new connection       Thread thread = new Thread(new EchoProtocol(clntSock, logger));       thread.start();       logger.info("Created and started Thread " + thread.getName());     }     /* NOT REACHED */   } } 

三、固定线程数的Tcp服务端:

import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.logging.Level; import java.util.logging.Logger;  public class TCPEchoServerPool {   public static void main(String[] args) throws IOException {     int threadPoolSize = 3; // Fixed ThreadPoolSize      final ServerSocket servSock = new ServerSocket(5500);     final Logger logger = Logger.getLogger("practical");      // Spawn a fixed number of threads to service clients     for (int i = 0; i < threadPoolSize; i++) {       Thread thread = new Thread() {         public void run() {           while (true) {             try {               Socket clntSock = servSock.accept(); // Wait for a connection               EchoProtocol.handleEchoClient(clntSock, logger); // Handle it             } catch (IOException ex) {               logger.log(Level.WARNING, "Client accept failed", ex);             }           }         }       };       thread.start();       logger.info("Created and started Thread = " + thread.getName());     }   } } 

四、使用线程池(使用Spring的线程池会有队列、最大线程数、最小线程数和超时时间的概念)

1.线程池工具类:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Task runner: static helpers that submit a task to a shared cached thread pool and then wait,
 * for at most a caller-supplied timeout, until the task has finished.
 *
 * <p>The task itself runs on a pool thread; the calling thread blocks inside {@code invoke}
 * until the task completes or the timeout elapses. A task that times out is NOT cancelled by
 * this class.
 *
 * @author Watson Xu
 * @since 1.0.0 <p>2013-6-8 10:33:09 AM</p>
 */
public final class ThreadPoolTaskExecutor {

  private ThreadPoolTaskExecutor() {

  }

  private static final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
    // Atomic because tasks may be submitted, and threads therefore created, from several
    // caller threads at once.
    private final AtomicInteger count = new AtomicInteger();

    /*
     * The executor calls this when it needs to run a task and no pooled thread is available.
     * Callable submissions are wrapped into a Runnable before they get here.
     */
    public Thread newThread(Runnable r) {
      Thread invokeThread = new Thread(r);
      invokeThread.setName("Courser Thread-" + count.incrementAndGet());
      invokeThread.setDaemon(false); // non-daemon: a live pool thread keeps the JVM running

      return invokeThread;
    }
  });

  /**
   * Runs {@code task} on the pool and waits for it to finish.
   *
   * @param task work to run
   * @param unit unit of {@code timeout}
   * @param timeout maximum time to wait for completion
   * @throws TimeoutException if the task has not finished within the timeout
   * @throws RuntimeException if the task threw, or the calling thread was interrupted while
   *     waiting; the original exception is the cause
   */
  public static void invoke(Runnable task, TimeUnit unit, long timeout) throws TimeoutException, RuntimeException {
    invoke(task, null, unit, timeout);
  }

  /**
   * Runs {@code task} on the pool, waits for it to finish and returns {@code result}.
   *
   * @param task work to run
   * @param result value handed back once the task has completed
   * @param unit unit of {@code timeout}
   * @param timeout maximum time to wait for completion
   * @return {@code result}
   * @throws TimeoutException if the task has not finished within the timeout
   * @throws RuntimeException if the task threw, or the calling thread was interrupted while
   *     waiting; the original exception is the cause
   */
  public static <T> T invoke(Runnable task, T result, TimeUnit unit, long timeout) throws TimeoutException,
      RuntimeException {
    return awaitResult(executor.submit(task, result), unit, timeout);
  }

  /**
   * Runs {@code task} on the pool, waits for it to finish and returns the value it produced.
   *
   * @param task work to run
   * @param unit unit of {@code timeout}
   * @param timeout maximum time to wait for completion
   * @return the value returned by {@code task}
   * @throws TimeoutException if the task has not finished within the timeout
   * @throws RuntimeException if the task threw, or the calling thread was interrupted while
   *     waiting; the original exception is the cause
   */
  public static <T> T invoke(Callable<T> task, TimeUnit unit, long timeout) throws TimeoutException, RuntimeException {
    // Submitting hands the task to the executor; from here on it runs asynchronously.
    Future<T> future = executor.submit(task);
    /*
     * Waiting on the future means that
     * 1) the thread calling invoke() blocks until the task is done, and
     * 2) the caller receives the result computed on the pool thread.
     */
    return awaitResult(future, unit, timeout);
  }

  /** Waits for {@code future} and maps its failure modes onto the exceptions invoke() declares. */
  private static <T> T awaitResult(Future<T> future, TimeUnit unit, long timeout) throws TimeoutException {
    try {
      return future.get(timeout, unit);
    } catch (TimeoutException e) {
      throw new TimeoutException("Thread invoke timeout ...");
    } catch (InterruptedException e) {
      // Restore the flag so code further up the stack can still see the interruption.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

2.具有伸缩性的Tcp服务端:

import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.TimeUnit; import java.util.logging.Logger;  import demo.callable.ThreadPoolTaskExecutor;   public class TCPEchoServerExecutor {    public static void main(String[] args) throws IOException {     // Create a server socket to accept client connection requests     ServerSocket servSock = new ServerSocket(5500);      Logger logger = Logger.getLogger("practical");          // Run forever, accepting and spawning threads to service each connection     while (true) {       Socket clntSock = servSock.accept(); // Block waiting for connection       //executorService.submit(new EchoProtocol(clntSock, logger));       try {         ThreadPoolTaskExecutor.invoke(new EchoProtocol(clntSock, logger), TimeUnit.SECONDS, 3);       } catch (Exception e) {       }        //service.execute(new TimelimitEchoProtocol(clntSock, logger));     }     /* NOT REACHED */   } } 

以上就是本文的全部内容,查看更多Java的语法,大家可以关注:《Thinking in Java 中文手册》、《JDK 1.7 参考手册官方英文版》、《JDK 1.6 API java 中文参考手册》、《JDK 1.5 API java 中文参考手册》,也希望大家多多支持武林网。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表