目录
[TOC]
参考文章:https://www.cnblogs.com/stateis0/p/8960791.html
我的代码地址:netty案例:用netty实现一个简单的rpc
1、需求 客户端使用接口透明调用服务端:
2、设计与实现 接口设计 1 2 3 public interface HelloService {2 String hello (String msg) ;3 }
消费者相关实现 思路:使用动态代理实现接口的代理对象,当调用代理方法时,使用netty客户端发送请求,等待返回。
客户端调用接口方法 客户端调用代理方法,返回一个实现了 HelloService 接口的代理对象,调用代理对象的方法,返回结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class ClientBootstrap { public static final String providerName = "HelloService#hello#" ; public static void main (String[] args) throws InterruptedException { RpcConsumer consumer = new RpcConsumer(); HelloService service = (HelloService) consumer .createProxy(HelloService.class , providerName ) ; for (; ; ) { Thread.sleep(1000 ); System.out.println(service.hello("are you ok ?" )); } } }
那createProxy()是如何实现接口的代理对象的呢?
接口的代理方法实现 思路:使用JDK 的动态代理(Proxy.newProxyInstance)创建HelloService的代理对象,当调用代理方法时,使用netty客户端向服务端发送相应请求,等待返回结果。
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class RpcConsumer { private static ExecutorService executor = Executors .newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static HelloClientHandler helloClientHandler; public Object createProxy (final Class<?> serviceClass, final String providerName) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, (proxy, method, args) -> { if (helloClientHandler == null ) { initClient(); } helloClientHandler.setPara(providerName + args[0 ]); return executor.submit(helloClientHandler).get(); }); } private static void initClient () { helloClientHandler = new HelloClientHandler(); EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class ) .option (ChannelOption .TCP_NODELAY , true ) .handler (new ChannelInitializer <SocketChannel >() { @Override public void initChannel (SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new StringDecoder()); p.addLast(new StringEncoder()); p.addLast(helloClientHandler); } }); try { b.connect("localhost" , 8088 ).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } }
详细过程:
当客户端调用代理类的方法时(hello() ),会触发InvocationHandler方法执行,其返回值即调用方法的返回值。
1 2 Proxy.newProxyInstance(ClassLoader loader,Class<?>[] interfaces, InvocationHandler h)
可在InvocationHandler方法里做如下操作:初始化 Netty 客户端,还需要向服务端请求数据,并返回数据
初始化客户端逻辑(initClient()): 创建一个 Netty 的客户端,并连接提供者(服务端),并设置一个自定义 handler(HelloClientHandler),和一些 String 类型的编解码器。
创建代理逻辑:使用 JDK 的动态代理技术,代理对象中的 invoke 方法实现如下: 如果 helloClientHandler 没有初始化,则初始化 helloClientHandler,这个 helloClientHandler 既是 handler (),也是一个 Callback。
1 public class HelloClientHandler extends ChannelInboundHandlerAdapter implements Callable
将参数设置进 client ,使用线程池调用 helloClientHandler 的 call 方法并阻塞等待数据返回(阻塞会在call里体现)。
1 2 3 4 5 return executor.submit(helloClientHandler).get();
接下来是HelloClientHandler 的实现
HelloClientHandler 的实现 代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class HelloClientHandler extends ChannelInboundHandlerAdapter implements Callable { private ChannelHandlerContext context; private String result; private String para; @Override public void channelActive (ChannelHandlerContext ctx) { context = ctx; } @Override public synchronized void channelRead (ChannelHandlerContext ctx, Object msg) { result = msg.toString(); notify(); } @Override public synchronized Object call () throws InterruptedException { context.writeAndFlush(para); wait(); return result; } public void setPara (String para) { this .para = para; } }
详细过程:
当成功连接后,触发channelActive(),缓存 ChannelHandlerContext,用于下次使用(call方法每次要使用context发消息到服务端),该类有两个属性:返回结果和请求参数。
当调用 call 方法的时候,将请求参数发送到服务端,等待服务端返回(wait())。
当服务端收到并返回数据后,触发 channelRead方法,将返回值赋值给 result,并唤醒等待在 call 方法上的线程(notify())。此时,call方法返回数据,即代理对象返回数据。
提供者相关实现 主要就是创建一个 netty 服务端,实现一个自定义的 handler,自定义 handler 判断是否符合之间的约定(算是协议吧),如果符合,就创建一个接口的实现类,并调用他的方法返回字符串
实现接口 实现约定接口,用于返回客户端数据
1 2 3 4 5 6 7 8 public class HelloServiceImpl implements HelloService { public String hello (String msg) { return msg != null ? msg + " -----> I am fine." : "I am fine." ; } }
实现 Netty 服务端及自定义handler 服务端启动类
1 2 3 4 5 6 7 public class ServerBootstrap { public static void main (String[] args) { int port = 8088 ; NettyServer.startServer("localhost" , port); System.out.println("服务端已开启,端口:" +port); } }
NettyServer的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class NettyServer { public static void startServer (String hostName, int port) { startServer0(hostName, port); } private static void startServer0 (String hostName, int port) { try { ServerBootstrap bootstrap = new ServerBootstrap(); NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); bootstrap.group(eventLoopGroup) .channel(NioServerSocketChannel.class ) .childHandler (new ChannelInitializer <SocketChannel >() { @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new StringDecoder()); p.addLast(new StringEncoder()); p.addLast(new HelloServerHandler()); } }); bootstrap.bind(hostName, port).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } }
上面的代码中添加了 String类型的编解码 handler,添加了一个自定义 handler。HelloServerHandler用于处理客户端的请求。
HelloServerHandler的实现
代码:
1 2 3 4 5 6 7 8 9 10 11 public class HelloServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { if (msg.toString().startsWith(ClientBootstrap.providerName)) { String result = new HelloServiceImpl() .hello(msg.toString().substring(msg.toString().lastIndexOf("#" ) + 1 )); ctx.writeAndFlush(result); } } }
这里判断是否符合约定(并没有使用复杂的协议,只是一个字符串判断),然后创建一个具体实现类,并调用方法写回客户端。
3、测试结果 启动ServerBootstrap
启动ClientBootstrap
控制台每秒打印:
1 2 3 4 5 6 are you ok ? -----> I am fine. are you ok ? -----> I am fine. are you ok ? -----> I am fine. are you ok ? -----> I am fine. are you ok ? -----> I am fine. ...