首页 > 学院 > 开发设计 > 正文

[netty核心类]--Channel和Unsafe类

2019-11-08 03:01:07
字体:
来源:转载
供稿:网友

主要内容: (1)Channel 功能说明 (2)Unsafe 功能说明 (3)Channel的主要实现子类源码分析 (4)Unsafe主要实现子类源码分析

1.Channel功能说明

channel是netty网络IO操作抽象出来的一个接口,主要功能有:网络IO的读写,客户端发起连接、主动关闭连接,链路关闭,获取通信双方的网络地址等。下面分类进行介绍:

网络IO操作 (1)Channel read(); 该函数将从当前的 Channel 中读取数据到第一个inbound缓冲区中,如果读取成功,就会触发ChannelHandler.channelRead(ChannelHandlerContext,Object)事件,紧接着触发ChannelHandler.channelReadComplete(ChannelHandlerContext)事件。

(2)ChannelFuture write(Object msg); 该函数将当前msg通过ChannelPipeline写入到目标channel中。这里需要注意write()函数只把数据写入到了消息发送缓冲区中,只有当调用flush()函数的时候,才真正写入到Channel中被发送出去。这里我们可以直接调用writeAndFlush()函数实现这两步。

(3)EventLoop eventLoop(); a)Channel 需要被注册到EventLoop的多路复用器上面的,用于处理IO事件,通过eventLoop()函数我们可以获取到Channel所注册的EventLoop。

b)EventLoop本质上就是处理网络读写事件的Reactor线程,不仅用来处理网络事件,也可以用来执行定时任务和自定义的NioTask等任务。

(4)metadata() 在netty中,每个Channel就对应一个物理连接,每个连接都有自己的TCP参数配置,我们通过这个函数就可以获得TCP的参数配置信息。

(5)parent() 对于服务端Channel而言,它的父Channel为空,对于客户端Channel而言,他的父Channel就是创建它的ServerSocketChannel。

Channel的子类非常多,这里我选择最重要的两个子类来分析:用于服务端的通道NioServerSocketChannel和用于客户端的通道NioSocketChannel

2. Channel子类NioServerSocketChannel和NioSocketChannel

NioServerSocketChannel是用于netty服务端的通道,它的继承类图如下: 这里写图片描述

NioSocketChannel是用于客户端的Channel,它的继承图如下: 这里写图片描述

分析一下这两个类的继承图就可以知道,他们都有公共的类,只是接口继承的不同。它们都继承于AbstractChannel、AbstractNioChannel类。后面不同的是NioServerSocketChannel继承于AbstractNioMessageChannel,NioSocketChannel继承于AbstractNioByteChannel。

下面就一步步来分析一下这些核心类: AbstractChannel (1)成员变量分析 AbstractChannel采用聚合的方式封装各种功能。这里我列出一些能说明核心聚合功能的成员属性如下:

//当前channel的父类channelPRivate final Channel parent;//当前channel的唯一idprivate final ChannelId id;//Unsafe实例private final Unsafe unsafe;//当前Channel对应的DefaultChannelPipelineprivate final DefaultChannelPipeline pipeline;//本地和远端地址private volatile SocketAddress localAddress;private volatile SocketAddress remoteAddress;//当前Channel注册所在的EventLoopprivate volatile EventLoop eventLoop;//当前Channel是否已被注册private volatile boolean registered;

从成员变量的定义就可以看出,AbstractChannel聚合了所有Channel使用到的能力对象,由AbstractChannel来初始化和统一封装,如果某些功能和子类强关联,那就定义成抽象方法,继承类自己实现。

(2)核心API源码分析 我们知道netty的核心就是网路IO操作,所以我们也主要看一下网络IO相关API的实现。在前面我们说到,当Channel进行网络IO操作时会触发ChannelPipeline中对应的事件方法。(这里不得不再次强调,netty是基于事件驱动的,我们可以理解为Channel进行网络IO操作时会产生对应的IO事件,然后IO驱动事件在ChannelPipeline中传播,由对应的ChannelHandler对事件进行处理)

netty基于事件驱动的模型,可以轻松通过事件定义来划分事件拦截切面,方便业务的定制和功能扩展,相比AOP,其性能更加高。

下面给出一些核心的网络IO相关的函数的源码:

@Overridepublic ChannelFuture bind(SocketAddress localAddress) { return pipeline.bind(localAddress);}@Overridepublic ChannelFuture connect(SocketAddress remoteAddress) { return pipeline.connect(remoteAddress);}@Overridepublic Channel read() { pipeline.read(); return this;}@Overridepublic ChannelFuture write(Object msg) { return pipeline.write(msg);}@Overridepublic ChannelFuture write(Object msg, ChannelPromise promise) { return pipeline.write(msg, promise);}@Overridepublic ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg);}

这里只给出了一部分,稍微看一下我们就能看到共性:他们都是通过调用pipeline对象的方式实现的,这个pipeline对象是通过构造器实例化的,我们看看在构造其中实例化的代码 pipeline = newChannelPipeline(); 继续深入看newChannelPipeline()函数:

protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this);}

可知pipeline其实就是DefaultChannelPipeline类。

此外AbstractChannel也提供了一些公共API的实现,比如localAddress()和 remoteAddress()方法,实现源码如下:

public SocketAddress localAddress() { SocketAddress localAddress = this.localAddress; if (localAddress == null) { try { this.localAddress = localAddress = unsafe().localAddress(); } catch (Throwable t) { // Sometimes fails on a closed socket in Windows. return null; } } return localAddress;}

首先从缓存的成员变量中获取,当是第一次调用时就通过unfase对象的localAddress()获取。

AbstractNioChannel (1)成员变量 同样只列出核心的成员变量:

//private final SelectableChannel ch;protected final int readInterestOp;volatile SelectionKey selectionKey;

由于AbstractNioChannel是NioSocketChannel和NioServerSocketChannel需要共用,所以定义了一个java.nio.SocketChannel和java.nio.ServerSocketChannel的公共父类SelectableChannel,用于设置SelectableChannel参数和进行IO操作。

第二个参数是readInterestOp,其实含义和JDK中的SelectionKey的OP_READ参数类似,表示读就绪。

第三个变量是一个volatile修饰的SelectionKey,该SelectionKey是Channel注册到EventLoop后返回的选择键。 由于一个通道可能会面临多个线程的并发写操作,所以当SelectionKey的状态改变了之后必须保证其他的业务线程能够感知到变化,所以需要使用volatile变量保证可视性。

(2)核心API的源码分析 下面看看AbstractNioChannel中的Channel的注册函数doRegister() :

protected void doRegister() throws Exception { //标识注册是否成功 boolean selected = false; for (;;) { try { //调用JDK的NIO中的SelectableChannel的register方法, //将当前的channel注册到EventLoop的多路复用器上 selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { //当当前注册的selectionKey已经被取消,则抛出异常并处理, //eventLoop().selectNow()方法:将已经取消的selectionKey从多路复用器中删除 eventLoop().selectNow(); selected = true; } else { // We forced a select Operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } }}

从上面的源码中可知,通过局部变量selected来标识注册操作是否成功,然后调用SelectableChannel的register方法将当前的channel注册到EventLoop的多路复用器上。我们知道,当我们注册channel到具体的多路复用选择器上的时候是需要指定监听的网络操作位来表示Channel对哪几种网络事件感兴趣。具体的定义在java.nio.channels.SelectionKey类中可以看到:

public static final int OP_READ = 1 << 0;//读操作位public static final int OP_WRITE = 1 << 2;//写操作位public static final int OP_CONNECT = 1 << 3;//客户端连接服务器操作位public static final int OP_ACCEPT = 1 << 4;//服务端接收客户端连接操作位

但是在AbstractNioChannel中注册的却是0,表示在注册时对任何事件都不感兴趣,仅仅完成注册操作。

在注册的时候可以指定附件,后续Channel接收到网络事件通知时,可以从SelectionKey中重新获取之前的附件进行处理,此处通过传入this指针将AbstractNioChannel的子类自身作为附件注册。如果当前Channel注册成功,则返回selectionKey,通过selectionKey可以从多路复用器中获取Channel对象。

AbstractNioByteChannel AbstractNioByteChannel只有一个成员变量,就是一个Runnable类型的flushTask来负责继续写半包消息,在这个Runnable的接口实现类里面,其实就是调用了flush()函数,保证缓冲区数据都写入到channel中。

private Runnable flushTask;

在这个类里面最重要的方法就是 doWrite(ChannelOutboundBuffer in)方法。这个方法的实现很长,我就把它拆分成几部分来讲:

第一部分

int writeSpinCount = -1;boolean setOpWrite = false;//写半包标识for (;;) { //弹出一条消息 Object msg = in.current(); if (msg == null) { //所有消息已经写完 clearOpWrite(); //直接返回,所以incompleteWrite(...) 方法不会被调用 return; }

ChannelOutboundBuffer是一个环形数组,其实也就是一个环形缓冲区,服务端的数据是先写入到缓冲区,然后再从缓冲区写入到通道中的。

in.current(); 该方法先从消息环形数组中弹出一条消息,然后判断该消息是否为空,如果是null说明缓冲区中所有待发送的消息全部都发送完毕了,调用clearOpWrite(); 方法清除半包标识然后直接返回退出循环。

下面看看清除半包标识做了什么:

protected final void clearOpWrite() { //获取当前通道的SelectionKey final SelectionKey key = selectionKey(); //如果SelectionKey无效就直接退出函数 if (!key.isValid()) { return; } //获取网络操作位 final int interestOps = key.interestOps(); //与SelectionKey.OP_WRITE按位与,如果非0就表示当前SelectionKey是可读的,也就是通道可读的,然后清除写操作位。 if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); }}

第二部分 其实也就是当所发送的消息时ByteBuf的时候,具体处理如下:

if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; //获取可读字节数 int readableBytes = buf.readableBytes(); //如果可读字节数为0,直接从环形缓冲数组中删除该消息,继续循环处理其余消息 if (readableBytes == 0) { in.remove(); continue; } boolean done = false;//消息是否已经全部发送完毕标识 long flushedAmount = 0;//发送的总消息字节数 //获取循环发送的次数 if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (!buf.isReadable()) { done = true; break; } } in.progress(flushedAmount); if (done) { in.remove(); } else { // Break the loop and so incompleteWrite(...) is called. break; }}

具体业务如下: (1)首先判断ByteBuf可读字节数是否为0,如果为0就直接从环形缓冲数组中删除该消息,继续循环处理其余消息

(2)创建标识消息是否完全发送标识done和发送的总消息字节数flushedAmount。之后从配置类中获取循环发送的次数。循环发送的次数指一次发送没有完成时继续循环发送的次数。这里设置最大循环次数原因是为了避免IO线程一直尝试写操作时,IO写线程无法处理其他IO操作,如果网络IO太慢或则对方接收太慢会造成IO线程假死。 (一次发送没有完成时称为写半包。)

调用doWriteBytes(buf);进行消息发送,不同的Channel有不同的实现,所以是抽象方法。如果本次发送的字节数是0,表示TCP缓冲区已满,此时自旋再发送任然可能是0,所以讲半包标识setOpWrite设置为true,退出循环,释放IO线程,防止Io线程假死。

AbstractNioMessageChannel AbstractNioMessageChannel是服务端的Channel继承的类,这个类里面主要的实现方法只有一个,就是doWrite(ChannelOutboundBuffer in); 源码如下:

protected void doWrite(ChannelOutboundBuffer in) throws Exception { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); for (;;) { Object msg = in.current(); if (msg == null) { // Wrote all messages. if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } break; } try { boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) { if (doWriteMessage(msg, in)) { done = true; break; } } if (done) { in.remove(); } else { // Did not write all messages. if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } break; } } catch (IOException e) { if (continueOnWriteError()) { in.remove(e); } else { throw e; } } }}

该方法的实现与AbstractNioByteChannel类似,区别在于: (1)AbstractNioByteChannel调用doWriteBytes( );发送的是ByteBuf或则是FileRegion。

(2)AbstractNioMessageChannel 调用doWriteMessage(msg, in) 发送的是POJO对象。

NioServerSocketChannel NioServerSocketChannel主要是继承了AbstractNioMessageChannel并且实现了io.netty.channel.socket.ServerSocketChannel接口。

成员属性 我们首先看成员属性和静态函数:

private static ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); }}private final ServerSocketChannelConfig config;

定义了一个ServerSocketChannelConfig用于配置ServerSocketChannel的TCP参数,该对象在NioServerSocketChannel中实例化。

静态的newSocket()方法用于通SelectorProvider.openServerSocketChannel(); 方法打开新ServerSocketChannel通道。

实现接口的方法实现 直接看源码:

@Overridepublic boolean isActive() { return javaChannel().socket().isBound();}@Overridepublic InetSocketAddress remoteAddress() { return null;}//该函数获取的是当前服务器的channel@Overrideprotected ServerSocketChannel javaChannel() { return (ServerSocketChannel) super.javaChannel();}@Overrideprotected SocketAddress localAddress0() { return javaChannel().socket().getLocalSocketAddress();}@Overrideprotected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); }}

java.net.ServerSocket的isBound()方法判断服务端监听端口是否处于绑定状态,它的remoteAddress为空。

javaChannel() 的实现是java.nio.ServerSocketChannel,服务端在进行端口绑定的时候,可以指定backlog,也就是允许客户端排队的最大长度。

下面继续看服务端Channel的doReadMessages()实现。

@Overrideprotected int doReadMessages(List<Object> buf) throws Exception { //1.获取服务端的ServerSocketChannel,然后调用accept()接受新的客户端; SocketChannel ch = javaChannel().accept(); try { //2.如果SocketChannel不为空,就利用当前的NioServerSocketChannel、EventLoop和SocketChannel创建新的NioSocketChannel,并加入到List<Object> if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0;}

客户端-NioSocketChannel

(1)连接操作 重点分析与客户端连接相关的操作:


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表