avatar

目录
用netty实现一个简单的rpc

目录

[TOC]

参考文章:https://www.cnblogs.com/stateis0/p/8960791.html

我的代码地址:netty案例:用netty实现一个简单的rpc

1、需求

客户端使用接口透明调用服务端:

image-20190813174733039

2、设计与实现

接口设计

java
1
2
3
public interface HelloService {
2 String hello(String msg);
3 }

消费者相关实现

思路:使用动态代理实现接口的代理对象,当调用代理方法时,使用netty客户端发送请求,等待返回。

客户端调用接口方法

客户端调用代理方法,返回一个实现了 HelloService 接口的代理对象,调用代理对象的方法,返回结果。

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ClientBootstrap {
public static final String providerName = "HelloService#hello#";

public static void main(String[] args) throws InterruptedException {
RpcConsumer consumer = new RpcConsumer();
// 创建一个代理对象
HelloService service = (HelloService) consumer
.createProxy(HelloService.class, providerName);

// 每秒调用一次hello方法
for (; ; ) {
Thread.sleep(1000);
System.out.println(service.hello("are you ok ?"));
}
}
}

那createProxy()是如何实现接口的代理对象的呢?

接口的代理方法实现

思路:使用JDK 的动态代理(Proxy.newProxyInstance)创建HelloService的代理对象,当调用代理方法时,使用netty客户端向服务端发送相应请求,等待返回结果。

代码:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class RpcConsumer {
private static ExecutorService executor = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

private static HelloClientHandler helloClientHandler;
/**
* 创建一个代理对象
*/
public Object createProxy(final Class<?> serviceClass,
final String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]{serviceClass}, (proxy, method, args) -> {
// 当代理对象被创建时就会执行该内容

if (helloClientHandler == null) {
initClient();
}
// 设置参数
helloClientHandler.setPara(providerName + args[0]);
return executor.submit(helloClientHandler).get();
});
}
/**
* 初始化客户端
*/
private static void initClient() {
helloClientHandler = new HelloClientHandler();
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(helloClientHandler);
}
});
try {
b.connect("localhost", 8088).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

详细过程:

  1. 当客户端调用代理类的方法时(hello() ),会触发InvocationHandler方法执行,其返回值即调用方法的返回值。

    java
    1
    2
    Proxy.newProxyInstance(ClassLoader loader,Class<?>[] interfaces,
    InvocationHandler h)
  2. 可在InvocationHandler方法里做如下操作:初始化 Netty 客户端,还需要向服务端请求数据,并返回数据

  3. 初始化客户端逻辑(initClient()): 创建一个 Netty 的客户端,并连接提供者(服务端),并设置一个自定义 handler(HelloClientHandler),和一些 String 类型的编解码器。

  4. 创建代理逻辑:使用 JDK 的动态代理技术,代理对象中的 invoke 方法实现如下:
    如果 helloClientHandler 没有初始化,则初始化 helloClientHandler,这个 helloClientHandler 既是 handler (),也是一个 Callback。

    java
    1
    public class HelloClientHandler extends ChannelInboundHandlerAdapter implements Callable

    将参数设置进 client ,使用线程池调用 helloClientHandler 的 call 方法并阻塞等待数据返回(阻塞会在call里体现)。

    java
    1
    2
    3
    4
    5
    // 该返回即代理对象调用的代理方法的返回值
    return executor.submit(helloClientHandler).get();

    // ExecutorService.submit(Callable<T> task)
    // submit方法会触发HelloClientHandler.call方法执行

接下来是HelloClientHandler 的实现

HelloClientHandler 的实现

代码:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class HelloClientHandler extends ChannelInboundHandlerAdapter implements Callable {

private ChannelHandlerContext context;
private String result;
private String para;

@Override
public void channelActive(ChannelHandlerContext ctx) {
context = ctx;
}

/**
* 收到服务端数据,唤醒等待线程
*/
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) {
result = msg.toString();
notify();
}

/**
* 写出数据,开始等待唤醒
*/
@Override
public synchronized Object call() throws InterruptedException {
context.writeAndFlush(para);
wait();
return result;
}

public void setPara(String para) {
this.para = para;
}
}

详细过程:

  1. 当成功连接后,触发channelActive(),缓存 ChannelHandlerContext,用于下次使用(call方法每次要使用context发消息到服务端),该类有两个属性:返回结果和请求参数。

  2. 当调用 call 方法的时候,将请求参数发送到服务端,等待服务端返回(wait())。

  3. 当服务端收到并返回数据后,触发 channelRead方法,将返回值赋值给 result,并唤醒等待在 call 方法上的线程(notify())。此时,call方法返回数据,即代理对象返回数据。

提供者相关实现

主要就是创建一个 netty 服务端,实现一个自定义的 handler,自定义 handler 判断是否符合之间的约定(算是协议吧),如果符合,就创建一个接口的实现类,并调用他的方法返回字符串

实现接口

实现约定接口,用于返回客户端数据

java
1
2
3
4
5
6
7
8
/**
* 实现类
*/
public class HelloServiceImpl implements HelloService {
public String hello(String msg) {
return msg != null ? msg + " -----> I am fine." : "I am fine.";
}
}

实现 Netty 服务端及自定义handler

服务端启动类

java
1
2
3
4
5
6
7
public class ServerBootstrap {
public static void main(String[] args) {
int port = 8088;
NettyServer.startServer("localhost", port);
System.out.println("服务端已开启,端口:"+port);
}
}

NettyServer的代码:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 服务端
*/
public class NettyServer {
/**
* 启动客户端
*/
public static void startServer(String hostName, int port) {
startServer0(hostName, port);
}
private static void startServer0(String hostName, int port) {
try {
ServerBootstrap bootstrap = new ServerBootstrap();
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
bootstrap.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(new HelloServerHandler());
}
});
bootstrap.bind(hostName, port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

上面的代码中添加了 String类型的编解码 handler,添加了一个自定义 handler。HelloServerHandler用于处理客户端的请求。

HelloServerHandler的实现

代码:

java
1
2
3
4
5
6
7
8
9
10
11
public class HelloServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 如何符合约定,则调用本地方法,返回数据
if (msg.toString().startsWith(ClientBootstrap.providerName)) {
String result = new HelloServiceImpl()
.hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(result);
}
}
}

这里判断是否符合约定(并没有使用复杂的协议,只是一个字符串判断),然后创建一个具体实现类,并调用方法写回客户端。

3、测试结果

启动ServerBootstrap

启动ClientBootstrap

控制台每秒打印:

Code
1
2
3
4
5
6
are you ok ? -----> I am fine.
are you ok ? -----> I am fine.
are you ok ? -----> I am fine.
are you ok ? -----> I am fine.
are you ok ? -----> I am fine.
...
文章作者: Machine
文章链接: https://machine4869.gitee.io/2019/08/13/20190813154158451/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 哑舍
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论