首页 > 学院 > 开发设计 > 正文

基于Netty的RPC架构笔记5之心跳

2019-11-08 02:39:48
字体:
来源:转载
供稿:网友

1、学习IdleStateHandler,用来检测会话(连接)的空闲状态

2、心跳其实就是一个普通的请求,特点数据简单,业务也简单

3、心跳对于服务端来说,用来定时清除闲置会话;会话关闭后的回调在netty5中是channelInactive,在netty3中是channelClosed

4、心跳对客户端来说,用来检测会话是否断开,是否重连! 用来检测网络延时!

5、心跳检测有的是通过定时任务来实现的,netty提供了现成的API(IdleStateHandler)来解决此问题

下面是netty3针对心跳检测的API测试

package com.heart;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.jboss.netty.util.HashedWheelTimer;

/**
 * Minimal Netty 3 server used to try out idle (heartbeat) detection.
 *
 * <p>Every accepted connection gets a pipeline of: an {@link IdleStateHandler},
 * a string decoder, a string encoder and a {@code HelloHandler}. The server
 * listens on port 10101.
 */
public class Server {

    /**
     * Configures the bootstrap, binds the listening port and prints a start-up message.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Server bootstrap.
        ServerBootstrap bootstrap = new ServerBootstrap();

        // Boss threads listen on the port; worker threads handle reading and writing.
        ExecutorService boss = Executors.newCachedThreadPool();
        ExecutorService worker = Executors.newCachedThreadPool();

        // NIO socket channel factory.
        bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));

        // One timer instance is shared by the idle handlers of all pipelines.
        final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();

        // Pipeline factory: invoked to build the pipeline of each channel.
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                // Arguments after the timer are the reader-idle, writer-idle and
                // all-idle timeouts, in seconds (see the Netty 3 IdleStateHandler docs).
                pipeline.addLast("idle", new IdleStateHandler(hashedWheelTimer, 5, 5, 10));
                pipeline.addLast("decoder", new StringDecoder());
                pipeline.addLast("encoder", new StringEncoder());
                pipeline.addLast("helloHandler", new HelloHandler());
                return pipeline;
            }
        });

        bootstrap.bind(new InetSocketAddress(10101));

        // Fixed: the original called System.out.PRintln, which does not compile.
        System.out.println("start!!!");
    }
}
package com.heart;import org.jboss.netty.channel.ChannelEvent;import org.jboss.netty.channel.ChannelFuture;import org.jboss.netty.channel.ChannelFutureListener;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelHandler;import org.jboss.netty.handler.timeout.IdleState;import org.jboss.netty.handler.timeout.IdleStateEvent;public class HelloHandler extends SimpleChannelHandler {	@Override	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {		System.out.println(e.getMessage());	}	@Override	public void handleUpstream(final ChannelHandlerContext ctx, ChannelEvent e) throws Exception {		if (e instanceof IdleStateEvent) {			if(((IdleStateEvent)e).getState() == IdleState.ALL_IDLE){				System.out.println("需要提醒玩家下线");				//关闭会话,踢玩家下线				ChannelFuture write = ctx.getChannel().write("hi  time out, you will close");				write.addListener(new ChannelFutureListener() {										@Override					public void OperationComplete(ChannelFuture future) throws Exception {						 ctx.getChannel().close();					}				});			}		} else {			super.handleUpstream(ctx, e);		}	}}下面是netty5针对心跳检测的API测试
package com.heart;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.handler.timeout.IdleStateHandler;/** * netty5服务端 * */public class Server {	public static void main(String[] args) {		//服务类		ServerBootstrap bootstrap = new ServerBootstrap();				//boss和worker		EventLoopGroup boss = new NioEventLoopGroup();		EventLoopGroup worker = new NioEventLoopGroup();				try {			//设置线程池			bootstrap.group(boss, worker);						//设置socket工厂、			bootstrap.channel(NioServerSocketChannel.class);						//设置管道工厂			bootstrap.childHandler(new ChannelInitializer<Channel>() {				@Override				protected void initChannel(Channel ch) throws Exception {					ch.pipeline().addLast(new IdleStateHandler(5, 5, 10));					ch.pipeline().addLast(new StringDecoder());					ch.pipeline().addLast(new StringEncoder());					ch.pipeline().addLast(new ServerHandler());				}			});						//netty3中对应设置如下			//bootstrap.setOption("backlog", 1024);			//bootstrap.setOption("tcpNoDelay", true);			//bootstrap.setOption("keepAlive", true);			//设置参数,TCP参数			bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置,链接缓冲池的大小			bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接			bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送						//绑定端口			ChannelFuture future = bootstrap.bind(10101);						System.out.println("start");						//等待服务端关闭			future.channel().closeFuture().sync();					} catch (Exception e) {			e.printStackTrace();		} finally{			//释放资源			boss.shutdownGracefully();			worker.shutdownGracefully();		}	}}
package com.heart;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.timeout.IdleState;import io.netty.handler.timeout.IdleStateEvent;/** * 服务端消息处理 * */public class ServerHandler extends SimpleChannelInboundHandler<String> {	@Override	protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {		System.out.println(msg);				ctx.channel().writeAndFlush("hi");		ctx.writeAndFlush("hi");	}			@Override	public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {		if(evt instanceof IdleStateEvent){			IdleStateEvent event = (IdleStateEvent)evt;			if(event.state() == IdleState.ALL_IDLE){								//清除超时会话				ChannelFuture writeAndFlush = ctx.writeAndFlush("you will close");				writeAndFlush.addListener(new ChannelFutureListener() {										@Override					public void operationComplete(ChannelFuture future) throws Exception {						ctx.channel().close();					}				});			}		}else{			super.userEventTriggered(ctx, evt);		}	}	/**	 * 新客户端接入	 */	@Override	public void channelActive(ChannelHandlerContext ctx) throws Exception {		System.out.println("channelActive");	}	/**	 * 客户端断开	 */	@Override	public void channelInactive(ChannelHandlerContext ctx) throws Exception {		System.out.println("channelInactive");	}	/**	 * 异常	 */	@Override	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {		cause.printStackTrace();	}		}


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表