目录
[TOC]
实战:客户端互聊原理与实现
1、最终效果
- 客户端启动之后,我们在控制台输入用户名,服务端随机分配一个 userId 给客户端,这里我们省去了通过账号密码注册的过程,userId 就在服务端随机生成了,生产环境中可能会持久化在数据库,然后每次通过账号密码去“捞”。
- 当有两个客户端登录成功之后,在控制台输入
userId + 空格 + 消息,这里的 userId 是消息接收方的标识, 消息接收方的控制台接着就会显示另外一个客户端发来的消息。
2、 一对一单聊原理
- A 要和 B 聊天,首先 A 和 B 需要与服务器建立连接,然后进行一次登录流程,服务端保存用户标识和 TCP 连接的映射关系。
- A 发消息给 B,首先需要将带有 B 标识的消息数据包发送到服务器,然后服务器从消息数据包中拿到 B 的标识,找到对应的 B 的连接,将消息发送给 B。
3、一对一单聊实现
总思路:
- 我们定义一个会话类
Session用户维持用户的登录信息,用户登录的时候绑定 Session 与 channel,用户登出或者断线的时候解绑 Session 与 channel。- 服务端处理消息的时候,通过消息接收方的标识,拿到消息接收方的 channel,调用
writeAndFlush()将消息发送给消息接收方。
- 用户登录状态与 channel 的绑定
登录的时候保存会话信息,登出的时候删除会话信息。
1 | LoginRequestHandler.java |
- 服务端接收消息并转发的实现
- 服务端在收到客户端发来的消息之后,首先拿到当前用户,也就是消息发送方的会话信息。
- 拿到消息发送方的会话信息之后,构造一个发送给客户端的消息对象
MessageResponsePacket,填上发送消息方的用户标识、昵称、消息内容。- 通过消息接收方的标识拿到对应的 channel。
- 如果消息接收方当前是登录状态,直接发送,如果不在线,控制台打印出一条警告消息。
1 | MessageRequestPacket.java |
- 客户端收消息的逻辑处理
客户端收到消息之后,只是把当前消息打印出来
1 | MessageResponseHandler.java |
- 客户端控制台登录和发送消息
我们在客户端启动的时候,起一个线程
- 如果当前用户还未登录,我们在控制台输入一个用户名,然后构造一个登录数据包发送给服务器,发完之后,我们等待一个超时时间,可以当做是登录逻辑的最大处理时间,这里就简单粗暴点了。
- 如果当前用户已经是登录状态,我们可以在控制台输入消息接收方的 userId,然后输入一个空格,再输入消息的具体内容,然后,我们就可以构建一个消息数据包,发送到服务端。
1 | NettyClient.java |
测试:略
实战:群聊的发起与通知
如何创建一个群聊,并通知到群聊中的各位成员
1、最终效果
- 首先,依然是三位用户依次登录到服务器,分别是闪电侠、极速、萨维塔。
- 然后,我们在闪电侠的控制台输入
createGroup指令,提示创建群聊需要输入 userId 列表,然后我们输入以英文逗号分隔的 userId。- 群聊创建成功之后,分别在服务端和三个客户端弹出提示消息,包括群的 ID 以及群里各位用户的昵称。
2、群聊原理
- A,B,C 依然会经历登录流程,服务端保存用户标识对应的 TCP 连接
- A 发起群聊的时候,将 A,B,C 的标识发送至服务端,服务端拿到之后建立一个群聊 ID,然后把这个 ID 与 A,B,C 的标识绑定
- 群聊里面任意一方在群里聊天的时候,将群聊 ID 发送至服务端,服务端拿到群聊 ID 之后,取出对应的用户标识,遍历用户标识对应的 TCP 连接,就可以将消息发送至每一个群聊成员
3、控制台程序重构
由于控制台输入的指令越来越多,需要重构
- 我们在这个管理类中,把所有要管理的控制台指令都塞到一个 map 中。
- 执行具体操作的时候,我们先获取控制台第一个输入的指令,这里以字符串代替,比较清晰(这里我们已经实现了上小节课后思考题中的登出操作),然后通过这个指令拿到对应的控制台命令执行器执行。
- 创建控制台命令执行器
1 | ConsoleCommand.java |
- 管理控制台命令执行器
1 | ConsoleCommandManager.java |
举个栗子:
首先,我们在控制台输入
createGroup,然后我们按下回车,就会进入CreateGroupConsoleCommand这个类进行处理进入到
CreateGroupConsoleCommand的逻辑之后,我们创建了一个群聊创建请求的数据包,然后提示输入以英文逗号分隔的 userId 的列表,填充完这个数据包之后,调用writeAndFlush()我们就可以发送一个创建群聊的指令到服务端。
最后,客户端的控制台线程相关的代码。
1 | NettyClient.java startConsoleThread |
4、创建群聊的实现
- 客户端发送创建群聊请求
发送一个
CreateGroupRequestPacket数据包到服务端,它只包含了一个列表,这个列表就是需要拉取群聊的用户列表
1 | CreateGroupRequestPacket.java |
- 服务端处理创建群聊请求
整个过程可以分为以下几个过程
- 首先,我们这里创建一个
ChannelGroup。这里简单介绍一下ChannelGroup:它可以把多个 chanel 的操作聚合在一起,可以往它里面添加删除 channel,可以进行 channel 的批量读写,关闭等操作,详细的功能读者可以自行翻看这个接口的方法。这里我们一个群组其实就是一个 channel 的分组集合,使用ChannelGroup非常方便。- 接下来,我们遍历待加入群聊的 userId,如果存在该用户,就把对应的 channel 添加到
ChannelGroup中,用户昵称也添加到昵称列表中。- 然后,我们创建一个创建群聊响应的对象,其中
groupId是随机生成的,群聊创建结果一共三个字段,这里就不展开对这个类进行说明了。- 最后,我们调用
ChannelGroup的聚合发送功能,将拉群的通知批量地发送到客户端,接着在服务端控制台打印创建群聊成功的信息,至此,服务端处理创建群聊请求的逻辑结束。
1 | // --NettyServer.java |
- 客户端处理创建群聊响应
仅仅是把创建群聊成功之后的具体信息打印出来
1 | // --NettyClient.java |
实战:群聊的成员管理(加入与退出,获取成员列表)
加入群聊、退出群聊、获取群成员列表
1、最终效果
- 闪电侠先拉逆闪和极速加入了群聊,控制台输出群创建成功的消息。
- 随后在萨维塔的控制台输入 “joinGroup” 之后再输入群聊的 id,加入群聊,控制台显示加入群成功。
- 在闪电侠的控制台输入 “listGroupMembers” 之后再输入群聊的 id,展示了当前群聊成员包括了极速、萨维塔、闪电侠、逆闪。
- 萨维塔的控制台输入 “quitGroup” 之后再输入群聊的 id,退出群聊,控制台显示退群成功。
- 最后在闪电侠的控制台输入 “listGroupMembers” 之后再输入群聊的 ID,展示了当前群聊成员已无萨维塔。
2、群的加入
- 控制台添加群加入命令处理器
我们在控制台先添加群加入命令处理器
JoinGroupConsoleCommand,在这个处理器中,我们创建一个指令对象JoinGroupRequestPacket,填上群 id 之后,将数据包发送至服务端。之后,我们将该控制台指令添加到ConsoleCommandManager。
1 | // --JoinGroupConsoleCommand.java |
- 服务端处理群加入请求
- 首先,通过 groupId 拿到对应的
ChannelGroup,之后,只需要调用ChannelGroup.add()方法,将加入群聊的用户的 channel 添加进去,服务端即完成了加入群聊的逻辑。- 然后,构造一个加群响应,填入 groupId 之后,调用
writeAndFlush()发送给加入群聊的客户端。
1 | // --NettyServer.java |
- 客户端处理群加入响应
该处理器的逻辑很简单,只是简单的将加群的结果输出到控制台
1 | // --NettyClient.java |
3、群的退出
QuitGroupRequestHandler和JoinGroupRequestHandler其实是一个逆向的过程
- 首先,通过 groupId 拿到对应的
ChannelGroup,之后,只需要调用ChannelGroup.remove()方法,将当前用户的 channel 删除,服务端即完成了退群的逻辑。- 然后,构造一个退群响应,填入 groupId 之后,调用
writeAndFlush()发送给退群的客户端。
1 | // --QuitGroupRequestHandler.java |
4、获取成员列表
- 控制台添加获取群列表命令处理器
我们在控制台先添加获取群列表命令处理器
ListGroupMembersConsoleCommand,在这个处理器中,我们创建一个指令对象ListGroupMembersRequestPacket,填上群 id 之后,将数据包发送至服务端。之后,我们将该控制台指令添加到ConsoleCommandManager。
1 | // --ListGroupMembersConsoleCommand.java |
- 服务端处理获取成员列表请求
- 首先,我们通过 groupId 拿到对应的
ChannelGroup。- 接着,我们创建一个 sessionList 用来装载群成员信息,我们遍历 channel 的每个 session,把对应的用户信息装到 sessionList 中,实际生产环境中,这里可能会构造另外一个对象来装载用户信息而非 Session,这里我们就简单粗暴点了,改造起来不难。
- 最后,我们构造一个获取成员列表的响应指令数据包,填入 groupId 和群成员的信息之后,调用
writeAndFlush()发送给发起获取成员列表的客户端。
1 | // --NettyServer.java |
- 客户端处理获取成员列表响应
1 | NettyClient.java |
5、总结
添加一个服务端和客户端交互的新功能只需要遵循以下的步骤:
- 创建控制台指令对应的
ConsoleCommand并添加到ConsoleCommandManager。 - 控制台输入指令和数据之后填入协议对应的指令数据包 -
xxxRequestPacket,将请求写到服务端。 - 服务端创建对应的
xxxRequestPacketHandler并添加到服务端的 pipeline 中,在xxxRequestPacketHandler处理完之后构造对应的xxxResponsePacket发送给客户端。 - 客户端创建对应的
xxxResponsePacketHandler并添加到客户端的 pipeline 中,最后在xxxResponsePacketHandler完成响应的处理。 - 最后,最容易忽略的一点就是,新添加
xxxPacket别忘了完善编解码器PacketCodec中的packetTypeMap!
实战:群聊消息的收发及 Netty 性能优化
1、群聊消息最终效果
- 在闪电侠的控制台,输入 “sendToGroup” 指令之后,再输入 groupId + 空格 + 消息内容,发送消息给群里各位用户,随后,群组里的所有用户的控制台都显示了群消息。
- 随后,陆续在逆闪和极速的控制台做做相同的操作,群组里的所有用户的控制台陆续展示了群消息
2、群聊消息的收发的实现
- 首先,通过 groupId 构造群聊响应
GroupMessageResponsePacket,然后再把发送群聊的用户信息填入,这里的用户信息我们就直接复用与 channel 绑定的 session了。- 然后,我们拿到对应群组的
ChannelGroup,通过writeAndFlush()写到客户端。
1 | GroupMessageRequestHandler.java |
3、共享 handler
问题引入:
- 服务端的 pipeline 链里面已经有 12 个 handler,其中,与指令相关的 handler 有 9 个。
- Netty 在这里的逻辑是:每次有新连接到来的时候,都会调用
ChannelInitializer的initChannel()方法,然后这里 9 个指令相关的 handler 都会被 new 一次。- 其实这里的每一个指令 handler,他们内部都是没有成员变量的,也就是说是无状态的,我们完全可以使用单例模式,即调用
pipeline().addLast()方法的时候,都直接使用单例,不需要每次都 new,提高效率,也避免了创建很多小的对象。
拿 LoginRequestHandler 举例,来看一下如何改造
- 首先,非常重要的一点,如果一个 handler 要被多个 channel 进行共享,必须要加上
@ChannelHandler.Sharable显示地告诉 Netty,这个 handler 是支持多个 channel 共享的,否则会报错,读者可以自行尝试一下。- 然后,我们仿照 Netty 源码里面单例模式的写法,构造一个单例模式的类。
1 | LoginRequestHandler.java |
服务端的代理里面就可以这么写
1 | // --NettyServer.java |
4、压缩 handler - 合并编解码器
问题引入:
pipeline 中第一个 handler -
Spliter,我们是无法动它的,因为他内部实现是与每个channel有关,每个Spliter需要维持每个 channel 当前读到的数据,也就是说他是有状态的。而
PacketDecoder与PacketEncoder我们是可以继续改造的,Netty 内部提供了一个类,叫做MessageToMessageCodec,使用它可以让我们的编解码操作放到一个类里面去实现,首先我们定义一个PacketCodecHandlerPacketCodecHandler
- 首先,这里
PacketCodecHandler,他是一个无状态的 handler,因此,同样可以使用单例模式来实现。- 我们看到,我们需要实现
decode()和encode()方法,decode 是将二进制数据 ByteBuf 转换为 java 对象 Packet,而 encode 操作是一个相反的过程,在encode()方法里面,我们调用了 channel 的 内存分配器手工分配了ByteBuf。
1 | // --PacketCodecHandler.java |
5、缩短事件传播路径
问题引入:
随着指令相关的 handler 越来越多,handler 链越来越长,在事件传播过程中性能损耗会被逐渐放大,因为解码器解出来的每个 Packet 对象都要在每个 handler 上经过一遍,我们接下来来看一下如何缩短这个事件传播的路径。
- 压缩 handler - 合并平行 handler
对我们这个应用程序来说,每次 decode 出来一个指令对象之后,其实只会在一个指令 handler 上进行处理,因此,我们其实可以把这么多的指令 handler 压缩为一个 handler
IMHandler:
- 首先,IMHandler 是无状态的,依然是可以写成一个单例模式的类。
- 我们定义一个 map,存放指令到各个指令处理器的映射。
- 每次回调到 IMHandler 的
channelRead0()方法的时候,我们通过指令找到具体的 handler,然后调用指令 handler 的channelRead,他内部会做指令类型转换,最终调用到每个指令 handler 的channelRead0()方法。效果:
所有的平行指令处理 handler,我们都压缩到了一个
IMHandler,并且IMHandler和指令 handler 均为单例模式,在单机十几万甚至几十万的连接情况下,性能能得到一定程度的提升
1 | // --IMHandler.java |
- 更改事件传播源
ctx.writeAndFlush()事件传播路径
ctx.channel().writeAndFlush()事件传播路径?
在我们的应用程序中,当我们没有改造编解码之前,我们必须调用
ctx.channel().writeAndFlush(), 而经过改造之后,我们的编码器(既属于 inBound, 又属于 outBound 类型的 handler)已处于 pipeline 的最前面,因此,可以大胆使用ctx.writeAndFlush()。
6、减少阻塞主线程的操作
问题引入:
只要有一个 channel 的一个 handler 中的
channelRead0()方法阻塞了 NIO 线程,最终都会拖慢绑定在该 NIO 线程上的其他所有的 channel
而我们需要怎么做?对于耗时的操作,我们需要把这些耗时的操作丢到我们的业务线程池中去处理。下面是解决方案的伪代码:
1 | ThreadPool threadPool = xxx; |
这样,就可以避免一些耗时的操作影响 Netty 的 NIO 线程,从而影响其他的 channel。
7、如何准确统计处理时长
writeAndFlush()这个方法如果在非 NIO 线程(这里,我们其实是在业务线程中调用了该方法)中执行,它是一个异步的操作,调用之后,其实是会立即返回的,剩下的所有的操作,都是 Netty 内部有一个任务队列异步执行的
那么如何才能判断 writeAndFlush() 执行完毕呢?我们可以这么做
1 | protected void channelRead0(ChannelHandlerContext ctx, T packet) { |
实战:心跳与空闲检测
1、网络问题
连接假死
连接假死的现象是:在某一端(服务端或者客户端)看来,底层的 TCP 连接已经断开了,但是应用程序并没有捕获到,因此会认为这条连接仍然是存在的,从 TCP 层面来说,只有收到四次握手数据包或者一个 RST 数据包,连接的状态才表示已断开。
连接假死会带来以下两大问题
- 对于服务端来说,因为每条连接都会耗费 cpu 和内存资源,大量假死的连接会逐渐耗光服务器的资源,最终导致性能逐渐下降,程序奔溃。
- 对于客户端来说,连接假死会造成发送数据超时,影响用户体验。
通常,连接假死由以下几个原因造成的
- 应用程序出现线程堵塞,无法进行数据的读写。
- 客户端或者服务端网络相关的设备出现故障,比如网卡,机房故障。
- 公网丢包。公网环境相对内网而言,非常容易出现丢包,网络抖动等现象,如果在一段时间内用户接入的网络连续出现丢包现象,那么对客户端来说数据一直发送不出去,而服务端也是一直收不到客户端来的数据,连接就一直耗着。
接下来,我们分别从服务端和客户端的角度来解决连接假死的问题。
2、服务端空闲检测
对于服务端来说,客户端的连接如果出现假死,那么服务端将无法收到客户端的数据,也就是说,如果能一直收到客户端发来的数据,那么可以说明这条连接还是活的,因此,服务端对于连接假死的应对策略就是空闲检测。
何为空闲检测?简化一下,我们的服务端只需要检测一段时间内,是否收到过客户端发来的数据即可,Netty 自带的 IdleStateHandler 就可以实现这个功能。
1 | IMIdleStateHandler.java |
- 首先,我们观察一下
IMIdleStateHandler的构造函数,他调用父类IdleStateHandler的构造函数,有四个参数,其中第一个表示读空闲时间,指的是在这段时间内如果没有数据读到,就表示连接假死;第二个是写空闲时间,指的是 在这段时间如果没有写数据,就表示连接假死;第三个参数是读写空闲时间,表示在这段时间内如果没有产生数据读或者写,就表示连接假死。写空闲和读写空闲为0,表示我们不关心者两类条件;最后一个参数表示时间单位。在我们的例子中,表示的是:如果 15 秒内没有读到数据,就表示连接假死。- 连接假死之后会回调
channelIdle()方法,我们这个方法里面打印消息,并手动关闭连接。
接下来,我们把这个 handler 插入到服务端 pipeline 的最前面
1 | NettyServer.java |
为什么要插入到最前面?是因为如果插入到最后面的话,如果这条连接读到了数据,但是在 inBound 传播的过程中出错了或者数据处理完完毕就不往后传递了(我们的应用程序属于这类),那么最终
IMIdleStateHandler就不会读到数据,最终导致误判。
思考,在一段时间之内没有读到客户端的数据,是否一定能判断连接假死呢?并不能,如果在这段时间之内客户端确实是没有发送数据过来,但是连接是 ok 的,那么这个时候服务端也是不能关闭这条连接的,为了防止服务端误判,我们还需要在客户端做点什么。
3、客户端定时发心跳
可以在客户端定期发送数据到服务端,通常这个数据包称为心跳数据包,接下来,我们定义一个 handler,定期发送心跳给服务端
1 | HeartBeatTimerHandler.java |
schedule(),类似 jdk 的延时任务机制,可以隔一段时间之后执行一个任务,而我们这边是实现了每隔 5 秒(实现方法是方法回调自己,每回调一次自己,都会触发计时器延时执行,回调会造成栈溢出吗??),向服务端发送一个心跳数据包实际在生产环境中,我们的发送心跳间隔时间和空闲检测时间可以略长一些,可以设置为几分钟级别
上面其实解决了服务端的空闲检测问题,服务端这个时候是能够在一定时间段之内关掉假死的连接,释放连接的资源了,但是对于客户端来说,我们也需要检测到假死的连接。
4、 服务端回复心跳与客户端空闲检测
客户端的空闲检测其实和服务端一样,依旧是在客户端 pipeline 的最前方插入 IMIdleStateHandler
1 | NettyClient.java |
服务端也要定期发送心跳给客户端。
而其实在前面我们已经实现了客户端向服务端定期发送心跳,服务端这边其实只要在收到心跳之后回复客户端,给客户端发送一个心跳响应包即可。如果在一段时间之内客户端没有收到服务端发来的数据,也可以判定这条连接为假死状态。
因此,服务端的 pipeline 中需要再加上如下一个 handler - HeartBeatRequestHandler,由于这个 handler 的处理其实是无需登录的,所以,我们将该 handler 放置在 AuthHandler 前面
1 | NettyServer.java |
实现非常简单,只是简单地回复一个
HeartBeatResponsePacket数据包。客户端在检测到假死连接之后,断开连接,然后可以有一定的策略去重连,重新登录等等…
思考
IMIdleStateHandler能否实现为单例模式,为什么?
不能,因为每一个连接都需维持一个属于自己的15秒空闲检测
总结
1. Netty 是什么?
经过我们整个小册的学习,我们可以了解到,Netty 其实可以看做是对 BIO 和 NIO 的封装,并提供良好的 IO 读写相关的 API,另外还提供了非常多的开箱即用的 handler,工具类等等。
2. 服务端和客户端启动
Netty 提供了两大启动辅助类,ServerBootstrap 和 Bootstrap, 他们的启动参数类似,都是分为
- 配置 IO 类型,配置线程模型。
- 配置 TCP 参数,attr 属性。
- 配置 handler。server 端除了配置 handler,还需要配置 childHandler,他是定义每条连接的处理器。
3. ByteBuf
接着,我们又学习了 Netty 对二进制数据的抽象类 ByteBuf,ByteBuf 底层又可以细分为堆内存和堆外内存,它的 API 要比 jdk 提供的 ByteBuffer 要更好用,ByteBuf 所有的操作其实都是基于读指针和写指针来进行操作的,把申请到的一块内存划分为可读区、可写区,另外还提供了自动扩容的功能。
4. 自定义协议拆包与编解码
通常,我们要实现客户端与服务端的通信,需要自定义协议,说白了就是双方商量在字节流里面,对应位置的字节段分别表示什么含义。
我们用的最多的协议呢就是基于长度的协议,一个协议数据包里面包含了一个长度字段,我们在解析的时候,首先第一步就是从字节流里面根据自定义协议截取出一个个数据包,使用的最多的拆包器就是 LengthFieldBasedFrameDecoder,只需要给他配置一些参数,即可实现自动拆包。
拆包之后呢,我们就拿到了代表字节流区段的一个个 ByteBuf,我们的解码器的作用就是把这些个 ByteBuf 变成一个个 java 对象,这样我们后续的 handler 就可以进行相应的逻辑的处理。
5. handler 与 pipeline
Netty 对逻辑处理流的处理其实和 TCP 协议栈的思路非常类似,分为输入和输出,也就是 inBound 和 outBound 类型的 handler,inBound 类 handler 的添加顺序与事件传播的顺序相同,而 outBound 类 handler 的添加顺序与事件传播的顺序相反,这里一定要注意。
无状态的 handler 可以改造为单例模式,但是千万记得要加 @ChannelHandler.Sharable 注解,平行等价的 handler 可以使用压缩的方式减少事件传播路径,调用 ctx.xxx() 而不是 ctx.channel().xxx() 也可以减少事件传播路径,不过要看应用场景。
另外,每个 handler 都有自己的生命周期,Netty 会在 channel 或者 channelHandler 处于不同状态的情况下回调相应的方法,channelHandler 也可以动态添加,特别适用于一次性处理的 handler,用完即删除,干干净净。
6. 耗时操作的处理与统计
对于耗时的操作,不要直接在 NIO 线程里做,比如,不要在 channelRead0() 方法里做一些访问数据库或者网络相关的逻辑,要扔到自定义线程池里面去做,然后要注意这个时候,writeAndFlush() 的执行是异步的,需要通过添加监听回调的方式来判断是否执行完毕,进而进行延时的统计。
关于进阶学习 Netty
参考原作者:扩展:进阶学习 Netty 的方向与资料




