首页 > 编程 > Java > 正文

Java NIO服务器端开发详解

2019-11-26 10:43:46
字体:
来源:转载
供稿:网友

一、NIO类库简介

  1、缓冲区Buffer

  Buffer是一个对象,包含一些要写入和读出的数据。

  在NIO中,所有的数据都是用缓冲区处理的,读取数据时,它是从通道(Channel)直接读到缓冲区中,在写入数据时,也是从缓冲区写入到通道。

  缓冲区实质上是一个数组,通常是一个字节数组(ByteBuffer),也可以是其它类型的数组,此外缓冲区还提供了对数据的结构化访问以及维护读写位置等信息。

  Buffer类的继承关系如下图所示:

2、通道Channel

  Channel是一个通道,网络数据通过Channel读取和写入。通道和流的不同之处在于通道是双向的(通道可以用于读、写后者二者同时进行),流只是在一个方向上移动。

  Channel大体上可以分为两类:用于网络读写的SelectableChannel(ServerSocketChannel和SocketChannel就是其子类)、用于文件操作的FileChannel。

  下面的例子给出通过FileChannel来向文件中写入数据、从文件中读取数据,将文件数据拷贝到另一个文件中:

public class NioTest{  public static void main(String[] args) throws IOException  {    copyFile();  }  //拷贝文件  private static void copyFile()  {    FileInputStream in=null;    FileOutputStream out=null;    try    {      in=new FileInputStream("src/main/java/data/in-data.txt");      out=new FileOutputStream("src/main/java/data/out-data.txt");      FileChannel inChannel=in.getChannel();      FileChannel outChannel=out.getChannel();      ByteBuffer buffer=ByteBuffer.allocate(1024);      int bytesRead = inChannel.read(buffer);      while (bytesRead!=-1)      {        buffer.flip();        outChannel.write(buffer);        buffer.clear();        bytesRead = inChannel.read(buffer);      }    }    catch (FileNotFoundException e)    {      // TODO Auto-generated catch block      e.printStackTrace();    } catch (IOException e)    {      // TODO Auto-generated catch block      e.printStackTrace();    }    }  //写文件  private static void writeFileNio()  {    try    {      RandomAccessFile fout = new RandomAccessFile("src/main/java/data/nio-data.txt", "rw");      FileChannel fc=fout.getChannel();      ByteBuffer buffer=ByteBuffer.allocate(1024);      buffer.put("hi123".getBytes());      buffer.flip();      try      {        fc.write(buffer);      } catch (IOException e)      {        // TODO Auto-generated catch block        e.printStackTrace();      }    }     catch (FileNotFoundException e)    {      // TODO Auto-generated catch block      e.printStackTrace();    }  }  //读文件  private static void readFileNio()  {    FileInputStream fileInputStream;    try    {      fileInputStream = new FileInputStream("src/main/java/data/nio-data.txt");      FileChannel fileChannel=fileInputStream.getChannel();//从 FileInputStream 获取通道      ByteBuffer byteBuffer=ByteBuffer.allocate(1024);//创建缓冲区      int bytesRead=fileChannel.read(byteBuffer);//将数据读到缓冲区      while(bytesRead!=-1)      {        /*limit=position         * position=0;         */        byteBuffer.flip();        //hasRemaining():告知在当前位置和限制之间是否有元素        while (byteBuffer.hasRemaining())        {          System.out.print((char) byteBuffer.get());        }        /*         * 清空缓冲区         * position=0;         * limit=capacity;         */        byteBuffer.clear();        bytesRead = fileChannel.read(byteBuffer);      }    } catch (FileNotFoundException e)    {      // TODO Auto-generated catch block      e.printStackTrace();    } catch (IOException e)    {      // TODO Auto-generated catch block      e.printStackTrace();    }  }}

3、多路复用器Selector

  多路复用器提供选择已经就绪的任务的能力。Selector会不断的轮询注册在其上的Channel,如果某个Channel上面发送读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。

  一个多路复用器Selector可以同时轮询多个Channel,由于JDK使用了epoll代替了传统的select实现,所以它没有最大连接句柄1024/2048的限制,意味着只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。其模型如下图所示:

用单线程处理一个Selector。要使用Selector,得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等。

  注:

  1、什么select模型?

  select是事件触发机制,当等待的事件发生就触发进行处理,多用于Linux实现的服务器对客户端的处理。

  可以阻塞地同时探测一组支持非阻塞的IO设备,是否有事件发生(如可读、可写,有高优先级错误输出等),直至某一个设备触发了事件或者超过了指定的等待时间。也就是它们的职责不是做IO,而是帮助调用者寻找当前就绪的设备。

  2、什么是epoll模型?

  epoll的设计思路,是把select/poll单个的操作拆分为1个epoll_create+多个epoll_ctrl+一个wait。此外,内核针对epoll操作添加了一个文件系统”eventpollfs”,每一个或者多个要监视的文件描述符都有一个对应的eventpollfs文件系统的inode节点,主要信息保存在eventpoll结构体中。而被监视的文件的重要信息则保存在epitem结构体中。所以他们是一对多的关系。

二、NIO服务器端开发

  功能说明:开启服务器端,对每一个接入的客户端都向其发送hello字符串。

  使用NIO进行服务器端开发主要有以下几个步骤:

  1、创建ServerSocketChannel,配置它为非阻塞模式

  serverSocketChannel = ServerSocketChannel.open();  serverSocketChannel.configureBlocking(false);

  2、绑定监听,配置TCP参数,如backlog大小

  serverSocketChannel.socket().bind(new InetSocketAddress(8080));

  3、创建一个独立的I/O线程,用于轮询多路复用器Selector

  4、创建Selector,将之前创建的ServerSocketChannel注册到Selector上,监听SelectionKey.ACCEPT

  selector=Selector.open();  serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

  5、启动I/O线程,在循环体内执行Selector.select()方法,轮询就绪的Channel

while(true)  {    try    {      //select()阻塞到至少有一个通道在你注册的事件上就绪了      //如果没有准备好的channel,就在这一直阻塞      //select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。      selector.select();    }     catch (IOException e)    {      // TODO Auto-generated catch block      e.printStackTrace();      break;     } }

  6、当轮询到了处于就绪状态的Channel时,需对其进行判断,如果是OP_ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端

//返回已经就绪的SelectionKey,然后迭代执行      Set<SelectionKey> readKeys=selector.selectedKeys();      for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();)      {        SelectionKey key=it.next();        it.remove();        try        {          if(key.isAcceptable())          {            ServerSocketChannel server=(ServerSocketChannel) key.channel();            SocketChannel client=server.accept();            client.configureBlocking(false);            client.register(selector,SelectionKey.OP_WRITE);          }          else if(key.isWritable())          {            SocketChannel client=(SocketChannel) key.channel();            ByteBuffer buffer=ByteBuffer.allocate(20);            String str="hello";            buffer=ByteBuffer.wrap(str.getBytes());            client.write(buffer);            key.cancel();          }         }catch(IOException e)        {          e.printStackTrace();          key.cancel();          try          {            key.channel().close();          } catch (IOException e1)          {            // TODO Auto-generated catch block            e1.printStackTrace();          }                  }      }

 7、设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数

if(key.isAcceptable())  {    ServerSocketChannel server=(ServerSocketChannel) key.channel();    SocketChannel client=server.accept();    client.configureBlocking(false);    ...  }

  8、将SocketChannel注册到Selector,监听OP_WRITE

  client.register(selector,SelectionKey.OP_WRITE);

  9、如果轮询的Channel为OP_WRITE,则说明要向SockChannel中写入数据,则构造ByteBuffer对象,写入数据包

else if(key.isWritable())  {    SocketChannel client=(SocketChannel) key.channel();    ByteBuffer buffer=ByteBuffer.allocate(20);    String str="hello";    buffer=ByteBuffer.wrap(str.getBytes());    client.write(buffer);    key.cancel();  }

 完整代码如下:

import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class ServerSocketChannelDemo{	public static void main(String[] args)	  {		ServerSocketChannel serverSocketChannel;		Selector selector=null;		try		    {			serverSocketChannel = ServerSocketChannel.open();			serverSocketChannel.configureBlocking(false);			serverSocketChannel.socket().bind(new InetSocketAddress(8080));			selector=Selector.open();			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);		}		catch (IOException e)		    {			// TODO Auto-generated catch block			e.printStackTrace();		}		while(true)		    {			try			      {				//select()阻塞到至少有一个通道在你注册的事件上就绪了				//如果没有准备好的channel,就在这一直阻塞				//select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。				selector.select();			}			catch (IOException e)			      {				// TODO Auto-generated catch block				e.printStackTrace();				break;			}			//返回已经就绪的SelectionKey,然后迭代执行			Set<SelectionKey> readKeys=selector.selectedKeys();			for (Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();)			      {				SelectionKey key=it.next();				it.remove();				try				        {					if(key.isAcceptable())					          {						ServerSocketChannel server=(ServerSocketChannel) key.channel();						SocketChannel client=server.accept();						client.configureBlocking(false);						client.register(selector,SelectionKey.OP_WRITE);					} else if(key.isWritable())					          {						SocketChannel client=(SocketChannel) key.channel();						ByteBuffer buffer=ByteBuffer.allocate(20);						String str="hello";						buffer=ByteBuffer.wrap(str.getBytes());						client.write(buffer);						key.cancel();					}				}				catch(IOException e)				        {					e.printStackTrace();					key.cancel();					try					          {						key.channel().close();					}					catch (IOException e1)					          {						// TODO Auto-generated catch block						e1.printStackTrace();					}				}			}		}	}}

我们用telnet localhost 8080模拟出多个客户端:

程序运行结果如下:

总结

以上就是本文关于Java NIO服务器端开发详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表