首页 > 学院 > 开发设计 > 正文

基于Netty5的RPC架构笔记4之案例讲解

2019-11-08 02:40:07
字体:
来源:转载
供稿:网友

    一个thread + 队列 == 一个单线程线程池   =====> 线程安全的,任务是线性串行执行的

线程安全,不会产生阻塞效应,使用对象组,下图是原理图

线程不安全,会产生阻塞效应, 使用对象池,下图是对象池的原理图

再看下线程池的原理图

下面是netty5的服务端代码参考

package com.server;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/** * netty5服务端 * */public class Server {	public static void main(String[] args) {		//服务类		ServerBootstrap bootstrap = new ServerBootstrap();				//boss和worker		EventLoopGroup boss = new NioEventLoopGroup();		EventLoopGroup worker = new NioEventLoopGroup();				try {			//设置线程池			bootstrap.group(boss, worker);						//设置socket工厂、			bootstrap.channel(NioServerSocketChannel.class);						//设置管道工厂			bootstrap.childHandler(new ChannelInitializer<Channel>() {				@Override				PRotected void initChannel(Channel ch) throws Exception {					ch.pipeline().addLast(new StringDecoder());					ch.pipeline().addLast(new StringEncoder());					ch.pipeline().addLast(new ServerHandler());				}			});						//netty3中对应设置如下			//bootstrap.setOption("backlog", 1024);			//bootstrap.setOption("tcpNoDelay", true);			//bootstrap.setOption("keepAlive", true);			//设置参数,TCP参数			bootstrap.option(ChannelOption.SO_BACKLOG, 1);//serverSocketchannel的设置,链接缓冲池的大小			bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接			bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送						//绑定端口			ChannelFuture future = bootstrap.bind(10101);						System.out.println("start");						//等待服务端关闭			future.channel().closeFuture().sync();					} catch (Exception e) {			e.printStackTrace();		} finally{			//释放资源			boss.shutdownGracefully();			worker.shutdownGracefully();		}	}}
package com.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Server-side message handler: prints every received string and answers with
 * "hi".
 */
public class ServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the incoming message and replies "hi" twice: once through the
     * channel and once through the handler context.
     *
     * @param ctx context of this handler
     * @param msg decoded message text
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);

        // Reply written through the channel.
        ctx.channel().writeAndFlush("hi");
        // Reply written through this handler's context.
        ctx.writeAndFlush("hi");
    }

    /**
     * A new client has connected.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
    }

    /**
     * A client has disconnected.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive");
    }

    /**
     * An exception was raised; only the stack trace is printed.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}

再看看普通客户端的代码

package com.client;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Netty 5 client: opens one connection to 127.0.0.1:10101 and sends every
 * line typed on standard input.
 */
public class Client {

    /**
     * Connects to the server and forwards console lines until standard input
     * reaches end-of-file.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Bootstrap (the "service" class).
        Bootstrap bootstrap = new Bootstrap();

        // Worker group.
        EventLoopGroup worker = new NioEventLoopGroup();

        try {
            // Thread pool.
            bootstrap.group(worker);

            // Socket factory.
            bootstrap.channel(NioSocketChannel.class);

            // Pipeline.
            bootstrap.handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new StringEncoder());
                    ch.pipeline().addLast(new ClientHandler());
                }
            });

            // Wait for the connection to be established; otherwise the first
            // writes could be issued on a channel that is not connected yet,
            // and a refused connection would go unnoticed.
            ChannelFuture connect = bootstrap.connect("127.0.0.1", 10101).sync();
            Channel channel = connect.channel();

            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                System.out.println("请输入:");
                String msg = bufferedReader.readLine();
                if (msg == null) {
                    // End of input: stop instead of writing null to the channel.
                    break;
                }
                channel.writeAndFlush(msg);
            }

        } catch (InterruptedException e) {
            // Restore the interrupt flag for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
        }
    }
}
package com.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Client-side message handler: prints every string received from the server.
 */
public class ClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the received message to standard output.
     *
     * @param ctx context of this handler
     * @param msg decoded message text
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        String line = "客户端收到消息:" + msg;
        System.out.println(line);
    }
}

再看看单客户端多连接程序

package com.client;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 * Launcher for the multi-connection client: opens five connections and sends
 * each console line over the channel returned by
 * {@link MultClient#nextChannel()}.
 */
public class Start {

    /**
     * Reads lines from standard input and sends them until end-of-file.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MultClient client = new MultClient();
        client.init(5);

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            try {
                System.out.println("请输入:");
                String msg = bufferedReader.readLine();
                if (msg == null) {
                    // End of input. Without this check the null message makes
                    // the write fail on every iteration, and because the
                    // exception is caught inside the loop it would spin forever.
                    break;
                }
                client.nextChannel().writeAndFlush(msg);
            } catch (Exception e) {
                // A failed send must not stop the prompt loop.
                e.printStackTrace();
            }
        }
    }
}
package com.client;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Client that keeps several connections to the same server and hands them out
 * in round-robin order, replacing connections that are found inactive.
 */
public class MultClient {

    /** Server address used for the initial connections and for reconnects. */
    private static final String HOST = "127.0.0.1";

    /** Server port used for the initial connections and for reconnects. */
    private static final int PORT = 10101;

    /**
     * Bootstrap (the "service" class) shared by all connections.
     */
    private final Bootstrap bootstrap = new Bootstrap();

    /**
     * Sessions, one slot per connection.
     */
    private final List<Channel> channels = new ArrayList<>();

    /**
     * Round-robin counter used to pick the next slot.
     */
    private final AtomicInteger index = new AtomicInteger();

    /**
     * Configures the bootstrap and opens {@code count} connections.
     *
     * <p>The method waits for every connect attempt to finish before it
     * returns, so that a call to {@link #nextChannel()} right afterwards does
     * not mistake a connection that is still being established for a dead one
     * and open a duplicate. A failed attempt does not throw; its slot stays
     * inactive and is replaced when it is next selected. Because it blocks,
     * this method should be called from an application thread (NOTE(review):
     * Netty rejects blocking waits on its own event loop threads).
     *
     * @param count number of connections to open
     */
    public void init(int count) {

        // Worker group.
        EventLoopGroup worker = new NioEventLoopGroup();

        // Thread pool.
        bootstrap.group(worker);

        // Socket factory.
        bootstrap.channel(NioSocketChannel.class);

        // Pipeline.
        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(new StringEncoder());
                ch.pipeline().addLast(new ClientHandler());
            }
        });

        // Start all connects first so they proceed in parallel ...
        List<ChannelFuture> pending = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            ChannelFuture future = bootstrap.connect(HOST, PORT);
            channels.add(future.channel());
            pending.add(future);
        }
        // ... then wait for each one to complete, successfully or not.
        for (ChannelFuture future : pending) {
            future.awaitUninterruptibly();
        }
    }

    /**
     * Returns the next active session in round-robin order. Every inactive
     * session encountered on the way is replaced by a fresh connection
     * attempt.
     *
     * @return an active channel
     * @throws IllegalStateException if no connection slot exists because
     *         {@link #init(int)} has not been called with a positive count
     * @throws RuntimeException if no active channel is found after
     *         {@code channels.size() + 1} attempts
     */
    public Channel nextChannel() {
        return getFirstActiveChannel(0);
    }

    /**
     * Walks the slots starting at the current round-robin position until an
     * active channel is found or the attempts are used up.
     *
     * @param count number of attempts already made
     * @return an active channel
     */
    private Channel getFirstActiveChannel(int count) {
        int size = channels.size();
        if (size == 0) {
            // Guard against a division by zero in the modulo below.
            throw new IllegalStateException("no channels available; call init(count) with count > 0 first");
        }
        for (int attempt = count; attempt <= size; attempt++) {
            // The remainder lies strictly between -size and size, so its
            // absolute value is a valid index even after the counter overflows.
            Channel channel = channels.get(Math.abs(index.getAndIncrement() % size));
            if (channel.isActive()) {
                return channel;
            }
            // Replace the dead session, then move on to the next slot.
            reconnect(channel);
        }
        throw new RuntimeException("no can use channel");
    }

    /**
     * Replaces an inactive channel with a new connection attempt in the same
     * slot. Does nothing if the channel is no longer in the list, which is the
     * case when it has already been replaced.
     *
     * @param channel the inactive channel to replace
     */
    private void reconnect(Channel channel) {
        synchronized (channel) {
            int slot = channels.indexOf(channel);
            if (slot == -1) {
                return;
            }

            // Close the channel being dropped so that a connection that is
            // still being established, or only half closed, is not leaked.
            channel.close();

            Channel newChannel = bootstrap.connect(HOST, PORT).channel();
            channels.set(slot, newChannel);
        }
    }
}


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表