首页 > 编程 > Java > 正文

深入Java NIO

2019-11-08 02:52:34
字体:
来源:转载
供稿:网友

在此之前如果你对nio一点都不了解,前先看完这篇文章

一、直接缓冲区与非直接缓冲区

字节缓冲区(ByteBuffer)要么是直接的,要么是非直接的。如果为直接字节缓冲区,则 java 虚拟机会尽最大努力直接在此缓冲区上执行本机 I/O 操作。也就是说,在每次调用基础操作系统的一个本机 I/O 操作之前(或之后),虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)。显然,直接缓冲区的操作效率要比非直接缓冲区要高,特别对于大内容的操作。

直接字节缓冲区可以通过调用此类的 allocateDirect() 工厂方法来创建。将缓冲区建立在物理内存中,可以提高效率。此方法返回的缓冲区进行分配和取消分配所需成本通常高于非直接缓冲区。直接字节缓冲区还可以通过 FileChannel 的 map() 方法将文件区域直接映射到内存中来创建。该方法返回MappedByteBuffer 。

字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其 isDirect() 方法来确定。

直接缓冲区: 这里写图片描述

复制文件的例子:

//使用直接缓冲区完成文件复制(内存映射文件) try { FileChannel inChannel = FileChannel.open(Paths.get("mina.png"), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get("mina3.png"), StandardOpenOption.WRITE,StandardOpenOption.READ, StandardOpenOption.CREATE); //内存映射文件 MappedByteBuffer inMappedBuf = inChannel.map(FileChannel.MapMode.READ_ONLY,0, inChannel.size()); MappedByteBuffer outMappedBuf = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size()); byte[] bytes = new byte[inMappedBuf.limit()]; inMappedBuf.get(bytes); outMappedBuf.put(bytes); inChannel.close(); outChannel.close(); } catch (IOException e) { e.PRintStackTrace(); }//利用通道完成文件的复制 try { FileInputStream fis = new FileInputStream("mina.png"); FileOutputStream fos = new FileOutputStream("mina4.png"); //获取通道 FileChannel inChannel = fis.getChannel(); FileChannel outChannel = fos.getChannel(); ByteBuffer buf = ByteBuffer.allocate(1024); while (inChannel.read(buf) != -1) { buf.flip(); outChannel.write(buf); buf.clear(); } inChannel.close(); outChannel.close(); } catch (IOException e) { e.printStackTrace(); }

二、分散读取与聚集写入

分散读取( Scattering Reads)是指从 Channel 中读取的数据“分散” 到多个 Buffer 中。 聚集写入( Gathering Writes)是指将多个 Buffer 中的数据“聚集”到 Channel。 这里写图片描述 这里写图片描述 注意:读取的时候,按照缓冲区的顺序,从 Channel 中读取的数据依次将 Buffer 填满。写入时,按照缓冲区的顺序,写入 position 和 limit 之间的数据到 Channel 。

示例代码:

try { RandomaccessFile raf = new RandomAccessFile("test.txt", "rw"); FileChannel fileChannel = raf.getChannel(); ByteBuffer buf1 = ByteBuffer.allocate((int) (fileChannel.size() / 2)); ByteBuffer buf2 = ByteBuffer.allocate((int) (fileChannel.size() / 2) + 1); ByteBuffer[] bufs = {buf1, buf2}; //分散读取 fileChannel.read(bufs); for (ByteBuffer buf : bufs) { buf.flip(); System.out.println(new String(buf.array(), 0, buf.limit())); buf.clear(); } fileChannel.close(); RandomAccessFile raf2 = new RandomAccessFile("test2.txt", "rw"); FileChannel outChannel = raf2.getChannel(); //聚集写入 outChannel.write(bufs); outChannel.close(); } catch (IOException e) { e.printStackTrace(); }

阻塞IO与非阻塞IO

NIO的另一个称呼是 non-blocking IO。接下来看下如何实现非阻塞IO。在此之间我先实现一个阻塞IO的示例。

阻塞的IO:实现一个客户端向服务端传输一个图片功能。

client端:

try { SocketChannel inChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8989)); FileChannel fileChannel = FileChannel.open(Paths.get("mina.png"), StandardOpenOption.READ); ByteBuffer buf = ByteBuffer.allocate(1024); while (fileChannel.read(buf) != -1) { buf.flip(); inChannel.write(buf); buf.clear(); } fileChannel.close(); inChannel.close(); } catch (IOException e) { e.printStackTrace(); }

server端:

try { ServerSocketChannel outChannel = ServerSocketChannel.open(); FileChannel fileChannel = FileChannel.open(Paths.get("mina2.png"),StandardOpenOption.WRITE, StandardOpenOption.CREATE); outChannel.bind(new InetSocketAddress(8989)); //获取客户端连接通道,会发生阻塞 SocketChannel socketChannel = outChannel.accept(); ByteBuffer buf = ByteBuffer.allocate(1024); while (socketChannel.read(buf) != -1) { buf.flip(); fileChannel.write(buf); buf.clear(); } socketChannel.close(); fileChannel.close(); outChannel.close(); } catch (IOException e) { e.printStackTrace(); }

非阻塞IO:向服务端发送字符串。

client端:

try { SocketChannel inChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8989)); inChannel.configureBlocking(false); ByteBuffer buf = ByteBuffer.allocate(1024); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String str = scanner.nextLine(); buf.put((LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)+"/n"+str).getBytes()); buf.flip(); inChannel.write(buf); buf.clear(); } inChannel.close(); } catch (IOException e) { e.printStackTrace(); }

server端:

try { ServerSocketChannel ssChannel = ServerSocketChannel.open(); ssChannel.configureBlocking(false); ssChannel.bind(new InetSocketAddress(8989)); //获取选择器 Selector selector = Selector.open(); ssChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { //获取选择器中已就绪的监听事件 Set<SelectionKey> set= selector.selectedKeys(); Iterator<SelectionKey> iterator = set.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); if (selectionKey.isAcceptable()) { SocketChannel socketChannel = ssChannel.accept(); socketChannel.configureBlocking(false); //读取通道数据,监控读就绪状态 socketChannel.register(selector, SelectionKey.OP_READ); }else if (selectionKey.isReadable()) { //获取选择器上准备就绪的通道 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); ByteBuffer buf = ByteBuffer.allocate(1024); while (socketChannel.read(buf) > 0) { buf.flip(); System.out.println(new String(buf.array())); buf.clear(); } } //取消选择键SelectionKey iterator.remove(); } } } catch (IOException e) { e.printStackTrace(); }

关于第30行read方法返回值大于0的判断要说明的是如果判断改为不等于-1,客户端发生一条字符串,服务端会不断收到该字符串。其中这时read方法返回的一直是0,这里对read方法加以说明:

1、read什么时候返回-1 read返回-1说明客户端的数据发送完毕,并且主动的close socket。所以在这种场景下,你需要关闭socketChannel并且取消key,最好是退出当前函数。

2、read什么时候返回0 其实read返回0有3种情况,一是某一时刻socketChannel中当前(注意是当前)没有数据可以读,这时会返回0,其次是bytebuffer的position等于limit了,即bytebuffer的remaining等于0,这个时候也会返回0,最后一种情况就是客户端的数据发送完毕了,这个时候客户端想获取服务端的反馈调用了recv函数,若服务端继续read,这个时候就会返回0。

四、管道Pipe

Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。 这里写图片描述

示例代码:

public void testPipe() { Thread thread1 = new Thread(() -> { try { pipe = Pipe.open(); Pipe.SinkChannel sinkChannel = pipe.sink(); ByteBuffer buf = ByteBuffer.allocate(1024); buf.clear(); buf.put("hello pip".getBytes()); buf.flip(); while (buf.hasRemaining()) { sinkChannel.write(buf); } } catch (IOException e) { e.printStackTrace(); } }); Thread thread2 = new Thread(() -> { try { thread1.join(); } catch (InterruptedException e) { e.printStackTrace(); } Pipe.SourceChannel sourceChannel = pipe.source(); ByteBuffer buf = ByteBuffer.allocate(1024);// buf.flip(); try { int len = sourceChannel.read(buf); System.out.println(new String(buf.array(), 0, len)); sourceChannel.close(); } catch (IOException e) { e.printStackTrace(); } }); thread1.start(); thread2.start(); }
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表