目录
[TOC]
pipeline 与 channelHandler
问题:如何避免 else 泛滥?
一个大致处理流程:
缺陷:ClientHandler、ServerHandler类臃肿,每增加新命令都要添加分支。
解决:模块化处理,不同的逻辑放置到单独的类来处理,最后形成一个完整的逻辑处理链。
Netty 中的 pipeline 和 channelHandler 正是用来解决这个问题的:责任链设计模式
pipeline 与 channelHandler 的构成
1、一条连接对应着一个 Channel,这条 Channel 所有的处理逻辑都在一个叫做 ChannelPipeline 的对象里面
2、ChannelPipeline 里面每个节点都是一个 ChannelHandlerContext 对象,这个对象能够拿到和 Channel 相关的所有的上下文信息,然后这个对象包着一个重要的对象,那就是逻辑处理器 ChannelHandler。
channelHandler 的分类
ChannelHandler 有两大子接口:
1、 ChannelInboundHandler,处理读数据的逻辑。我们在一端读到一段数据,要做解析、处理、响应, 在开始组装响应之前的所有的逻辑,都可以放置在 ChannelInboundHandler 里处理
2、 ChannelOutBoundHandler ,处理写数据的逻辑。它是定义我们一端在组装完响应之后,把数据写到对端的逻辑。比如,我们封装好一个 response 对象,接下来我们有可能对这个 response 做一些其他的特殊逻辑,然后,再编码成 ByteBuf,最终写到对端(response>ByteBuf之间的一些公共操作)。 write()。
3、这两个子接口分别有对应的默认实现,ChannelInboundHandlerAdapter,和 ChanneloutBoundHandlerAdapter,它们分别实现了两大接口的所有功能,默认情况下会把读写事件传播到下一个 handler。
ChannelInboundHandler 的事件传播
1 | serverBootstrap |
InBoundHandlerA
1 | public class InBoundHandlerA extends ChannelInboundHandlerAdapter { |
1、父类的 channelRead() 方法会自动调用到下一个 inBoundHandler 的 channelRead() 方法,并且会把当前 inBoundHandler 里处理完毕的对象传递到下一个 inBoundHandler,例子中传递的对象都是同一个 msg
2、inBoundHandler 的执行顺序与我们通过 addLast() 方法 添加的顺序保持一致
ChannelOutboundHandler 的事件传播
1 | // outBound,处理写数据的逻辑链 |
1 | public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter { |
1、父类的 write() 方法会自动调用到下一个 outBoundHandler 的 write() 方法,并且会把当前 outBoundHandler 里处理完毕的对象传递到下一个 outBoundHandler
2、可以看到,outBoundHandler 的执行顺序与我们添加的顺序相反
为什么?
pipeline 的结构
1、不管我们定义的是哪种类型的 handler, 最终它们都是以双向链表的方式连接,这里实际链表的节点是 ChannelHandlerContext。
2、虽然两种类型的 handler 在一个双向链表里,但是这两类 handler 的分工是不一样的。inBoundHandler 的事件通常只会传播到下一个 inBoundHandler,outBoundHandler 的事件通常只会传播到下一个 outBoundHandler,两者相互不受干扰。
在下一小节,我们会了解到几种特殊的 channelHandler,并且使用这几种特殊的 channelHandler 来改造我们的客户端和服务端逻辑,解决掉 if else 泛滥的问题。
实战:构建客户端与服务端 pipeline
把复杂的逻辑从单独的一个 channelHandler 中抽取出来。
ChannelInboundHandlerAdapter 与 ChannelOutboundHandlerAdapter
ChannelInboundHandlerAdapter.java
1 |
|
1、这里的 msg 就是上一个 handler 的输出
2、默认情况下 adapter 会通过 fireChannelRead() 方法直接把上一个 handler 的输出结果传递到下一个 handler。
ChannelOutboundHandlerAdapter.java
1 |
|
我们往 pipeline 添加的第一个 handler 中的 channelRead 方法中,msg 对象其实就是 ByteBuf。
服务端在接受到数据之后,应该首先要做的第一步逻辑就是把这个 ByteBuf 进行解码,然后把解码后的结果传递到下一个 handler,如下:
1 |
|
ByteToMessageDecoder
把二进制数据转换到我们的一个 Java 对象
使用:
1 | public class PacketDecoder extends ByteToMessageDecoder { |
1、我们通过往这个 List 里面添加解码后的结果对象,就可以自动实现结果往下一个 handler 进行传递
2、使用 ByteToMessageDecoder,Netty 会自动进行内存的释放,不用操心太多的内存管理方面的逻辑(ByteBuf是堆外内存需要自行释放)
SimpleChannelInboundHandler
XXXHandler.java
1 | if (packet instanceof XXXPacket) { |
SimpleChannelInboundHandler 对象, 类型判断和对象传递的活都自动帮我们实现了,而我们可以专注于处理我们所关心的指令即可。
使用:
LoginRequestHandler.java
1 | public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> { |
MessageRequestHandler.java
1 | public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> { |
MessageToByteEncoder
写响应的时候处理逻辑类似,都需要进行编码,然后调用 writeAndFlush() 将数据写到客户端,这个编码的过程其实也是重复的逻辑。
而Netty 提供了一个特殊的 channelHandler 来专门处理编码逻辑,我们不需要每一次将响应写到对端的时候调用一次编码逻辑进行编码,也不需要自行创建 ByteBuf,这个类叫做 MessageToByteEncoder
实现编码逻辑:
1 | public class PacketEncoder extends MessageToByteEncoder<Packet> { |
PacketCodeC.encode需要修改
构建客户端与服务端 pipeline
服务端
1 | serverBootstrap |
客户端
1 | bootstrap |
实战:拆包粘包理论与解决方案
拆包粘包例子
客户端在连接建立成功之后,使用一个 for 循环,不断向服务端写一串数据。服务端收到数据之后,仅仅把数据打印出来。出现了服务端粘包半包现象。
见 FirstClientHandler 、FirstServerHandler
- 一种是正常的字符串输出。
- 一种是多个字符串“粘”在了一起,我们定义这种 ByteBuf 为粘包。
- 一种是一个字符串被“拆”开,形成一个破碎的包,我们定义这种 ByteBuf 为半包。
为什么会有粘包半包现象?
应用层是按照 ByteBuf 为 单位来发送数据,数据到了服务端,也是按照字节流的方式读入,然后到了 Netty 应用层面,重新拼装成 ByteBuf,而这里的 ByteBuf 与客户端按顺序发送的 ByteBuf 可能是不对等的。
因此,我们需要在客户端根据自定义协议来组装我们应用层的数据包,然后在服务端根据我们的应用层的协议来组装数据包,这个过程通常在服务端称为拆包,而在客户端称为粘包。
举个栗子,发送端将三个数据包粘成两个 TCP 数据包发送到接收端,接收端就需要根据应用协议将两个数据包重新组装成三个数据包。
拆包的原理
用户如果自己需要拆包:略
Netty 自带的拆包器
固定长度的拆包器 FixedLengthFrameDecoder
行拆包器 LineBasedFrameDecoder
分隔符拆包器 DelimiterBasedFrameDecoder
基于长度域拆包器 LengthFieldBasedFrameDecoder
只要你的自定义协议中包含长度域字段,均可以使用这个拆包器来实现应用层拆包。
如何使用 LengthFieldBasedFrameDecoder
1、在自定义协议中,得到长度域偏移量和长度域的长度(7,4),构造一个拆包器。
1 | new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4); |
2、这样一个拆包器写好之后,只需要在 pipeline 的最前面加上这个拆包器。
服务端
1 | ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4)); |
客户端
1 | ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4)); |
这样,在后续 PacketDecoder 进行 decode 操作的时候,ByteBuf 就是一个完整的自定义协议数据包。
拒绝非本协议连接
每个客户端发过来的数据包都做一次快速判断,判断当前发来的数据包是否是满足我的自定义协议, 我们只需要继承自 LengthFieldBasedFrameDecoder 的
decode()方法,然后在 decode 之前判断前四个字节是否是等于我们定义的魔数0x12345678
Spliter
1 | public class Spliter extends LengthFieldBasedFrameDecoder { |
实验一下:
1 | ~ telnet 127.0.0.1 8000 |
向服务端发送一段字符串,由于这段字符串是不符合我们的自定义协议的,于是在第一时间,我们的服务端就关闭了这条连接。
服务端和客户端的 pipeline 结构
至此
channelHandler 的生命周期
回调方法的执行是有顺序的,而这个执行顺序可以称为 ChannelHandler 的生命周期
ChannelHandler 的生命周期详解
ChannelInBoundHandler为例
1、LifeCyCleTestHandler
运行 NettyServer,控制台输出:
1 | 逻辑处理器被添加:handlerAdded() |
2、回调方法的执行顺序
3、每个回调方法的含义
略
4、一幅图来标识 ChannelHandler 的生命周期
ChannelHandler 生命周期各回调方法用法举例
ChannelInitializer 的实现原理
initChannel()方法是利用handlerAdded()启动的
handlerAdded() 与 handlerRemoved()
这两个方法通常可以用在一些资源的申请和释放
channelActive() 与 channelInActive()
- 通常我们在这两个回调里面统计单机的连接数,
channelActive()被调用,连接数加一,channelInActive()被调用,连接数减一 - 也可以在
channelActive()方法中,实现对客户端连接 ip 黑白名单的过滤
- 通常我们在这两个回调里面统计单机的连接数,
channelRead()
案例:拆包器,可参考阅读:netty源码分析之拆包器的奥秘
channelReadComplete()
在每次向客户端写数据的时候,都通过
writeAndFlush()的方法写并刷新到底层,其实这种方式不是特别高效。可以在之前调用
writeAndFlush()的地方都换成write()方法,然后在这个方面里面调用ctx.channel().flush()方法,相当于一个批量刷新的机制如果对性能要求没那么高,
writeAndFlush()足矣。
实战:使用 channelHandler 的热插拔实现客户端身份校验
- 如果有很多业务逻辑的 handler 都要进行某些相同的操作,我们完全可以抽取出一个 handler 来单独处理
- 如果某一个独立的逻辑在执行几次之后(这里是一次)不需要再执行了,那么我们可以通过 ChannelHandler 的热插拔机制来实现动态删除逻辑,应用程序性能处理更为高效
- 很多操作在执行前都要进行登录校验,所以将其抽取为单独的handler;只要连接未断开,客户端只要成功登录过,后续就不需要每次都进行客户端的身份校验。
1、身份校验
1 | // --LoginRequestHandler.java |
2、移除校验逻辑
客户端只要成功登录过,后续就不需要再进行客户端的身份校验
1 | // --AuthHandler.java |
3、 身份校验演示
1 | // --NettyClient.java |
有身份认证的演示
无身份认证的演示:令用户名密码错误
1 | // --LoginResponseHandler.java |
未完待续…












