avatar

目录
Netty项目-仿写微信 IM 即时通讯系统(2)

目录

[TOC]

pipeline 与 channelHandler

问题:如何避免 else 泛滥?

一个大致处理流程:

16545510d7b4f970

缺陷:ClientHandler、ServerHandler类臃肿,每增加新命令都要添加分支。

解决:模块化处理,不同的逻辑放置到单独的类来处理,最后形成一个完整的逻辑处理链。

Netty 中的 pipeline 和 channelHandler 正是用来解决这个问题的:责任链设计模式

pipeline 与 channelHandler 的构成

1654526f0a67bb52

1、一条连接对应着一个 Channel,这条 Channel 所有的处理逻辑都在一个叫做 ChannelPipeline 的对象里面

2、ChannelPipeline 里面每个节点都是一个 ChannelHandlerContext 对象,这个对象能够拿到和 Channel 相关的所有的上下文信息,然后这个对象包着一个重要的对象,那就是逻辑处理器 ChannelHandler

channelHandler 的分类

1654526f0a8f2890

ChannelHandler 有两大子接口:

1、 ChannelInboundHandler,处理读数据的逻辑。我们在一端读到一段数据,要做解析、处理、响应, 在开始组装响应之前的所有的逻辑,都可以放置在 ChannelInboundHandler 里处理

2、 ChannelOutBoundHandler ,处理写数据的逻辑。它是定义我们一端在组装完响应之后,把数据写到对端的逻辑。比如,我们封装好一个 response 对象,接下来我们有可能对这个 response 做一些其他的特殊逻辑,然后,再编码成 ByteBuf,最终写到对端(response>ByteBuf之间的一些公共操作)。 write()

3、这两个子接口分别有对应的默认实现,ChannelInboundHandlerAdapter,和 ChanneloutBoundHandlerAdapter,它们分别实现了两大接口的所有功能,默认情况下会把读写事件传播到下一个 handler。

ChannelInboundHandler 的事件传播

java
1
2
3
4
5
6
7
8
serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
}
});

InBoundHandlerA

java
1
2
3
4
5
6
7
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerA: " + msg);
super.channelRead(ctx, msg);
}
}

1、父类的 channelRead() 方法会自动调用到下一个 inBoundHandler 的 channelRead() 方法,并且会把当前 inBoundHandler 里处理完毕的对象传递到下一个 inBoundHandler,例子中传递的对象都是同一个 msg

2、inBoundHandler 的执行顺序与我们通过 addLast() 方法 添加的顺序保持一致

ChannelOutboundHandler 的事件传播

java
1
2
3
4
// outBound,处理写数据的逻辑链
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
java
1
2
3
4
5
6
7
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerA: " + msg);
super.write(ctx, msg, promise);
}
}

1、父类的 write() 方法会自动调用到下一个 outBoundHandler 的 write() 方法,并且会把当前 outBoundHandler 里处理完毕的对象传递到下一个 outBoundHandler

2、可以看到,outBoundHandler 的执行顺序与我们添加的顺序相反

为什么?

pipeline 的结构

1654526f0a73d8c3

1、不管我们定义的是哪种类型的 handler, 最终它们都是以双向链表的方式连接,这里实际链表的节点是 ChannelHandlerContext

2、虽然两种类型的 handler 在一个双向链表里,但是这两类 handler 的分工是不一样的。inBoundHandler 的事件通常只会传播到下一个 inBoundHandler,outBoundHandler 的事件通常只会传播到下一个 outBoundHandler,两者相互不受干扰。

在下一小节,我们会了解到几种特殊的 channelHandler,并且使用这几种特殊的 channelHandler 来改造我们的客户端和服务端逻辑,解决掉 if else 泛滥的问题。

实战:构建客户端与服务端 pipeline

把复杂的逻辑从单独的一个 channelHandler 中抽取出来。

ChannelInboundHandlerAdapter 与 ChannelOutboundHandlerAdapter

ChannelInboundHandlerAdapter.java

java
1
2
3
4
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}

1、这里的 msg 就是上一个 handler 的输出

2、默认情况下 adapter 会通过 fireChannelRead() 方法直接把上一个 handler 的输出结果传递到下一个 handler。

ChannelOutboundHandlerAdapter.java

java
1
2
3
4
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}

我们往 pipeline 添加的第一个 handler 中的 channelRead 方法中,msg 对象其实就是 ByteBuf

服务端在接受到数据之后,应该首先要做的第一步逻辑就是把这个 ByteBuf 进行解码,然后把解码后的结果传递到下一个 handler,如下:

java
1
2
3
4
5
6
7
8
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf requestByteBuf = (ByteBuf) msg;
// 解码
Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);
// 解码后的对象传递到下一个 handler 处理
ctx.fireChannelRead(packet)
}

ByteToMessageDecoder

把二进制数据转换到我们的一个 Java 对象

使用:

java
1
2
3
4
5
6
7
public class PacketDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {
out.add(PacketCodeC.INSTANCE.decode(in));
}
}

1、我们通过往这个 List 里面添加解码后的结果对象,就可以自动实现结果往下一个 handler 进行传递

2、使用 ByteToMessageDecoder,Netty 会自动进行内存的释放,不用操心太多的内存管理方面的逻辑(ByteBuf是堆外内存需要自行释放)

SimpleChannelInboundHandler

XXXHandler.java

java
1
2
3
4
5
if (packet instanceof XXXPacket) {
// ...处理
} else {
ctx.fireChannelRead(packet);
}

SimpleChannelInboundHandler 对象, 类型判断和对象传递的活都自动帮我们实现了,而我们可以专注于处理我们所关心的指令即可。

使用:

LoginRequestHandler.java

java
1
2
3
4
5
6
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
// 登录逻辑
}
}

MessageRequestHandler.java

java
1
2
3
4
5
6
public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageRequestPacket messageRequestPacket) {

}
}

MessageToByteEncoder

写响应的时候处理逻辑类似,都需要进行编码,然后调用 writeAndFlush() 将数据写到客户端,这个编码的过程其实也是重复的逻辑。

而Netty 提供了一个特殊的 channelHandler 来专门处理编码逻辑,我们不需要每一次将响应写到对端的时候调用一次编码逻辑进行编码,也不需要自行创建 ByteBuf,这个类叫做 MessageToByteEncoder

实现编码逻辑:

java
1
2
3
4
5
6
7
public class PacketEncoder extends MessageToByteEncoder<Packet> {

@Override
protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf out) {
PacketCodeC.INSTANCE.encode(out, packet);
}
}

PacketCodeC.encode需要修改

构建客户端与服务端 pipeline

1666fd9cc2b9c089

服务端

java
1
2
3
4
5
6
7
8
9
serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});

客户端

java
1
2
3
4
5
6
7
8
9
10
bootstrap
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginResponseHandler());
ch.pipeline().addLast(new MessageResponseHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});

实战:拆包粘包理论与解决方案

拆包粘包例子

客户端在连接建立成功之后,使用一个 for 循环,不断向服务端写一串数据。服务端收到数据之后,仅仅把数据打印出来。出现了服务端粘包半包现象。

见 FirstClientHandler 、FirstServerHandler

  1. 一种是正常的字符串输出。
  2. 一种是多个字符串“粘”在了一起,我们定义这种 ByteBuf 为粘包。
  3. 一种是一个字符串被“拆”开,形成一个破碎的包,我们定义这种 ByteBuf 为半包。

为什么会有粘包半包现象?

  1. 应用层是按照 ByteBuf 为 单位来发送数据,数据到了服务端,也是按照字节流的方式读入,然后到了 Netty 应用层面,重新拼装成 ByteBuf,而这里的 ByteBuf 与客户端按顺序发送的 ByteBuf 可能是不对等的。

  2. 因此,我们需要在客户端根据自定义协议来组装我们应用层的数据包,然后在服务端根据我们的应用层的协议来组装数据包,这个过程通常在服务端称为拆包,而在客户端称为粘包。

  3. 举个栗子,发送端将三个数据包粘成两个 TCP 数据包发送到接收端,接收端就需要根据应用协议将两个数据包重新组装成三个数据包。

拆包的原理

用户如果自己需要拆包:略

Netty 自带的拆包器

  1. 固定长度的拆包器 FixedLengthFrameDecoder

  2. 行拆包器 LineBasedFrameDecoder

  3. 分隔符拆包器 DelimiterBasedFrameDecoder

  4. 基于长度域拆包器 LengthFieldBasedFrameDecoder

    只要你的自定义协议中包含长度域字段,均可以使用这个拆包器来实现应用层拆包。

如何使用 LengthFieldBasedFrameDecoder

1、在自定义协议中,得到长度域偏移量和长度域的长度(7,4),构造一个拆包器。

java
1
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4);

2、这样一个拆包器写好之后,只需要在 pipeline 的最前面加上这个拆包器。

可参考的文章: netty源码分析之LengthFieldBasedFrameDecoder

服务端

java
1
2
3
4
5
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4));
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new PacketEncoder());

客户端

java
1
2
3
4
5
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4));
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginResponseHandler());
ch.pipeline().addLast(new MessageResponseHandler());
ch.pipeline().addLast(new PacketEncoder());

这样,在后续 PacketDecoder 进行 decode 操作的时候,ByteBuf 就是一个完整的自定义协议数据包。

拒绝非本协议连接

每个客户端发过来的数据包都做一次快速判断,判断当前发来的数据包是否是满足我的自定义协议, 我们只需要继承自 LengthFieldBasedFrameDecoder 的 decode() 方法,然后在 decode 之前判断前四个字节是否是等于我们定义的魔数 0x12345678

Spliter

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Spliter extends LengthFieldBasedFrameDecoder {
private static final int LENGTH_FIELD_OFFSET = 7;
private static final int LENGTH_FIELD_LENGTH = 4;

public Spliter() {
super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH);
}

@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 屏蔽非本协议的客户端
if (in.getInt(in.readerIndex()) != PacketCodeC.MAGIC_NUMBER) {
ctx.channel().close();
return null;
}

return super.decode(ctx, in);
}
}

实验一下:

shell
1
2
3
4
~ telnet 127.0.0.1 8000
# ...
telnet> send ayt # 发送字符串“Are you There”
Connection closed by foreign host. # 被关闭了

向服务端发送一段字符串,由于这段字符串是不符合我们的自定义协议的,于是在第一时间,我们的服务端就关闭了这条连接。

服务端和客户端的 pipeline 结构

至此

1657e014321e00b0

channelHandler 的生命周期

回调方法的执行是有顺序的,而这个执行顺序可以称为 ChannelHandler 的生命周期

ChannelHandler 的生命周期详解

ChannelInBoundHandler为例

1、LifeCyCleTestHandler

运行 NettyServer,控制台输出:

Code
1
2
3
4
5
6
7
8
9
10
11
逻辑处理器被添加:handlerAdded()
channel 绑定到线程(NioEventLoop):channelRegistered()
channel 准备就绪:channelActive()
channel 有数据可读:channelRead()
Thu Aug 01 21:12:46 CST 2019: 收到客户端登录请求……
Thu Aug 01 21:12:46 CST 2019: 登录成功!
channel 某次数据读完:channelReadComplete()
channel 某次数据读完:channelReadComplete()
channel 被关闭:channelInactive()
channel 取消线程(NioEventLoop) 的绑定: channelUnregistered()
逻辑处理器被移除:handlerRemoved()

2、回调方法的执行顺序

3、每个回调方法的含义

4、一幅图来标识 ChannelHandler 的生命周期

1666fdc2bdcf3f9e

ChannelHandler 生命周期各回调方法用法举例

  1. ChannelInitializer 的实现原理

    initChannel()方法是利用handlerAdded()启动的

  2. handlerAdded() 与 handlerRemoved()

    这两个方法通常可以用在一些资源的申请和释放

  3. channelActive() 与 channelInActive()

    1. 通常我们在这两个回调里面统计单机的连接数,channelActive() 被调用,连接数加一,channelInActive() 被调用,连接数减一
    2. 也可以在 channelActive() 方法中,实现对客户端连接 ip 黑白名单的过滤
  4. channelRead()

    案例:拆包器,可参考阅读:netty源码分析之拆包器的奥秘

  5. channelReadComplete()

    在每次向客户端写数据的时候,都通过 writeAndFlush() 的方法写并刷新到底层,其实这种方式不是特别高效。

    可以在之前调用 writeAndFlush() 的地方都换成 write() 方法,然后在这个方面里面调用 ctx.channel().flush() 方法,相当于一个批量刷新的机制

    如果对性能要求没那么高,writeAndFlush() 足矣。

实战:使用 channelHandler 的热插拔实现客户端身份校验

  1. 如果有很多业务逻辑的 handler 都要进行某些相同的操作,我们完全可以抽取出一个 handler 来单独处理
  2. 如果某一个独立的逻辑在执行几次之后(这里是一次)不需要再执行了,那么我们可以通过 ChannelHandler 的热插拔机制来实现动态删除逻辑,应用程序性能处理更为高效
  3. 很多操作在执行前都要进行登录校验,所以将其抽取为单独的handler;只要连接未断开,客户端只要成功登录过,后续就不需要每次都进行客户端的身份校验。

1、身份校验

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// --LoginRequestHandler.java
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
if (valid(loginRequestPacket)) {
// ...
// 基于我们前面小节的代码,添加如下一行代码
LoginUtil.markAsLogin(ctx.channel());
}
// ...
}

// --NettyServer.java
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
// 新增加用户认证handler
ch.pipeline().addLast(new AuthHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new PacketEncoder());

// --AuthHandler.java
public class AuthHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!LoginUtil.hasLogin(ctx.channel())) {
ctx.channel().close();
} else {
super.channelRead(ctx, msg);
}
}
}

2、移除校验逻辑

客户端只要成功登录过,后续就不需要再进行客户端的身份校验

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// --AuthHandler.java
public class AuthHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!LoginUtil.hasLogin(ctx.channel())) {
ctx.channel().close();
} else {
// 一行代码实现逻辑的删除
ctx.pipeline().remove(this);
super.channelRead(ctx, msg);
}
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
if (LoginUtil.hasLogin(ctx.channel())) {
System.out.println("当前连接登录验证完毕,无需再次验证, AuthHandler 被移除");
} else {
System.out.println("无登录验证,强制关闭连接!");
}
}
}

3、 身份校验演示

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// --NettyClient.java
private static void startConsoleThread(Channel channel) {
new Thread(() -> {
while (!Thread.interrupted()) {
// 这里注释掉,因为已经有了校验handler
// if (LoginUtil.hasLogin(channel)) {
System.out.println("输入消息发送至服务端: ");
Scanner sc = new Scanner(System.in);
String line = sc.nextLine();

channel.writeAndFlush(new MessageRequestPacket(line));
// }
}
}).start();
}

有身份认证的演示

无身份认证的演示:令用户名密码错误

java
1
2
3
4
5
// --LoginResponseHandler.java
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("客户端连接被关闭!");
}

未完待续…

文章作者: Machine
文章链接: https://machine4869.gitee.io/2019/07/28/20190728161454796/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 哑舍
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论