简介
1、Netty 是一个高性能网络通信框架,广泛用于互联网中间件领域网络通信层
2、项目:Netty实现单聊和群聊(控制台交互,不写UI)
目录
[TOC]
项目参考:https://juejin.im/book/5b4bc28bf265da0f60130116
原作者源码位置:https://github.com/lightningMan/flash-netty
我的代码地址:https://gitee.com/machine4869/note_netty
项目简介
单聊与群聊的流程和指令集
单聊流程
单聊的指令
群聊流程
群聊要实现的指令集
具体内容:略
Netty
涉及的核心知识点:
- 服务端如何启动
- 客户端如何启动
- 数据载体 ByteBuf
- 长连自定义协议如何设计
- 粘包拆包原理与实践
- 如何实现自定义编解码
- pipeline 与 channelHandler
- 定时发心跳怎么做
- 如何进行连接空闲检测
客户端使用 Netty 的程序逻辑结构
服务端使用 Netty 的程序逻辑结构
具体内容:略
Netty 基础
见我的blog【Netty 基础】,用netty实现了简单的c/s通信
略
Netty 环境配置
使用分支功能看源码
服务端启动流程
切换分支查看代码
NettyServer.java
1、最小化参数配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| - 两大线程组: bossGroup(接活的老板): 监听端口,accept新连接 workerGroup(干活的员工): 处理每一条连接的数据读写
- serverBootstrap 引导类,引导我们进行服务端的启动工作,直接new出来开搞。 - .group(bossGroup, workerGroup) 给引导类配置两大线程组,这个引导类的线程模型也就定型了
-.channel(NioServerSocketChannel.class) 来指定 IO 模型,当然,这里也有其他的选择,如果你想指定 IO 模型为 BIO,那么这里配置上OioServerSocketChannel.class类型即可,当然通常我们也不会这么做,因为Netty的优势就在于NIO。
- childHandler() 给这个引导类创建一个ChannelInitializer,这里主要就是定义后续每条连接的数据读写,业务处理逻辑。
ChannelInitializer这个类中,我们注意到有一个泛型参数NioSocketChannel,这个类,就是 Netty 对 NIO 类型的连接的抽象,而我们前面NioServerSocketChannel也是对 NIO 类型的连接的抽象
NioServerSocketChannel和NioSocketChannel的概念可以和 BIO 编程模型中(IOServer)的ServerSocket以及Socket两个概念对应上
- 总结: 要启动一个Netty服务端,必须要指定三类属性,分别是线程模型(.group)、IO 模型(.channel)、连接读写处理逻辑(childHandler()),有了这三者,之后在调用bind(8000),我们就可以在本地绑定一个 8000 端口启动起来
|
2、自动绑定递增端口
1 2 3 4 5 6 7 8 9 10
| NettyServer.bind()
递归绑定端口,直到成功
结果: ... 端口[1021]绑定失败! 端口[1022]绑定失败! 端口[1023]绑定失败! 端口[1024]绑定成功!
|
3、服务端启动其他方法
1 2 3 4 5 6
| - handler() - attr() - childAttr() - childOption() - option() 解释:略
|
客户端启动流程
切换分支查看代码
NettyClient.java
1、标准流程
1 2 3 4 5 6 7
| bootstrap .group(workerGroup) .channel(NioSocketChannel.class) // 3.IO 处理逻辑 .handler(xxx)
|
2、失败重连
1 2 3 4 5
| bootstrap.connect(host, port).addListener(xxx)
- 指数退避逻辑:1 秒、2 秒、4 秒、8 秒,以 2 的幂次来建立连接,默认重试 5 次 - schedule 可实现定时任务逻辑,延迟delay时间执行connect方法
|
3、客户端启动其他方法
1 2 3
| - attr() - option() 解释:略
|
实战:客户端与服务端双向通信
切换分支查看代码
功能:客户端连接成功之后,向服务端写一段数据 ,服务端收到数据之后打印,并向客户端回一段数据。
1、客户端发数据到服务端
1 2 3 4 5
| - ch.pipeline() 返回的是和这条连接相关的逻辑处理链 - addLast() 方法 添加一个逻辑处理器。为的就是在客户端建立连接成功之后,向服务端写数据 - ChannelInboundHandlerAdapter.channelActive 在客户端连接建立成功之后被调用 - Netty 里面数据是以 ByteBuf 为单位的
|
demo
1 2 3 4 5 6 7
| .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new FirstClientHandler()); } });
|
2、服务端读取客户端数据
1 2 3 4
| - ChannelInboundHandlerAdapter.channelRead 在接收到客户端发来的数据之后被回调 - msg 参数 这里强转之后,然后调用 byteBuf.toString() 就能够拿到我们客户端发过来的字符串数据
|
demo
1 2 3 4 5
| .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new FirstServerHandler()); } });
|
3、服务端回数据给客户端
1 2 3
| - 覆写channelRead() - 创建填充ByteBuf - writeAndFlush()
|
4、客户端读取数据
数据传输载体 ByteBuf 介绍
1、ByteBuf结构
1 2 3
| - readerIndex、writerIndex - capacity、maxCapacity - 有效地区分可读数据和可写数据,读写之间相互没有冲突
|
2、ByteBuf 常用的 API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| 解释:略
容量 APIAPI - capacity() - maxCapacity() - readableBytes() 与 isReadable() - writableBytes()、 isWritable() 与 maxWritableBytes()
读写指针相关的 API - readerIndex() 与 readerIndex(int) - writeIndex() 与 writeIndex(int) - markReaderIndex() 与 resetReaderIndex() // 推荐使用
读写 API - writeBytes(byte[] src) 与 buffer.readBytes(byte[] dst) - writeByte(byte b) 与 buffer.readByte() - release() 与 retain() Netty 使用了堆外内存,申请到的内存无法被垃圾回收器直接回收,所以需要我们手动回收。 Netty 的 ByteBuf 是通过引用计数的方式管理的。
- slice()、duplicate()、copy() - retainedSlice() 与 retainedDuplicate() 内存共享,引用计数共享,读写指针不共享
错误demo 1、多次释放 2、不释放造成内存泄漏
要点:在一个函数体里面,只要增加了引用计数(包括 ByteBuf 的创建和手动调用 retain() 方法),就必须调用 release() 方法
|
3、实战
切换分支查看代码
ByteBufTest.java
一个具体的例子说明 ByteBuf 的实际使用
客户端与服务端通信协议编解码
如何设计并实现客户端与服务端的通信协议
1、什么是服务端与客户端的通信协议
例:一个简单的登录指令数据包格式
2、通信协议的设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| 1、魔数 通常情况下为固定的几个字节。识别该数据包是自定义协议
2、版本号 通常情况下是预留字段,用于协议升级的时候用到
3、序列化算法 Java对象转换二进制数据是如何转换的。比如 Java 自带的序列化,json,hessian 等序列化方式
4、指令 服务端或者客户端每收到一种指令都会有相应的处理逻辑,用一个字节来表示,最高支持256种指令,对于我们这个 IM 系统来说已经完全足够了。
5、数据长度
6、数据内容
这样一套标准的协议能够适配大多数情况下的服务端与客户端的通信场景
|
3、通信协议的实现
如何使用 Netty 来实现这套协议
定义协议对象
1 2 3 4 5 6 7 8 9 10 11 12
| @Data public abstract class Packet {
private Byte version = 1;
|
例:定义登录请求数据包
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public interface Command {
Byte LOGIN_REQUEST = 1; }
@Data public class LoginRequestPacket extends Packet { private Integer userId;
private String username;
private String password;
@Override public Byte getCommand() { return LOGIN_REQUEST; } }
|
序列化/反序列化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public interface Serializer {
byte getSerializerAlgorithm();
byte[] serialize(Object object);
<T> T deserialize(Class<T> clazz, byte[] bytes); }
|
本项目方案:
使用最简单的 json 序列化方式,使用阿里巴巴的 fastjson 作为序列化框架。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public interface SerializerAlgorithm {
byte JSON = 1; }
public class JSONSerializer implements Serializer { @Override public byte getSerializerAlgorithm() { return SerializerAlgorithm.JSON; }
@Override public byte[] serialize(Object object) { return JSON.toJSONBytes(object); }
@Override public <T> T deserialize(Class<T> clazz, byte[] bytes) { return JSON.parseObject(bytes, clazz); } }
|
使用的默认值
1 2 3 4 5 6 7 8 9 10
| public interface Serializer {
byte JSON_SERIALIZER = 1;
Serializer DEFAULT = new JSONSerializer();
}
|
编码:封装成二进制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
private static final int MAGIC_NUMBER = 0x12345678;
public ByteBuf encode(Packet packet) { ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer(); byte[] bytes = Serializer.DEFAULT.serialize(packet);
byteBuf.writeInt(MAGIC_NUMBER); byteBuf.writeByte(packet.getVersion()); byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm()); byteBuf.writeByte(packet.getCommand()); byteBuf.writeInt(bytes.length); byteBuf.writeBytes(bytes);
return byteBuf; }
|
解码:解析 Java 对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public Packet decode(ByteBuf byteBuf) { byteBuf.skipBytes(4);
byteBuf.skipBytes(1);
byte serializeAlgorithm = byteBuf.readByte();
byte command = byteBuf.readByte();
int length = byteBuf.readInt();
byte[] bytes = new byte[length]; byteBuf.readBytes(bytes);
Class<? extends Packet> requestType = getRequestType(command); Serializer serializer = getSerializer(serializeAlgorithm);
if (requestType != null && serializer != null) { return serializer.deserialize(requestType, bytes); }
return null; }
|
PacketCodeCTest.java 是对编解码过程的测试用例
实战:Netty 实现客户端登录
1、登录流程
实现客户端登录到服务端的过程
2、逻辑处理器
客户端
1 2 3 4 5 6 7 8 9
| public class ClientHandler extends ChannelInboundHandlerAdapter { }
bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new ClientHandler()); } });
|
服务端
1 2 3 4 5 6 7 8
| public class ServerHandler extends ChannelInboundHandlerAdapter { }
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new ServerHandler()); } }
|
3、客户端发送登录请求
客户端处理登录请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
public void channelActive(ChannelHandlerContext ctx) { System.out.println(new Date() + ": 客户端开始登录");
LoginRequestPacket loginRequestPacket = new LoginRequestPacket(); loginRequestPacket.setUserId(UUID.randomUUID().toString()); loginRequestPacket.setUsername("flash"); loginRequestPacket.setPassword("pwd");
ByteBuf buffer = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginRequestPacket);
ctx.channel().writeAndFlush(buffer); }
|
服务端处理登录请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf requestByteBuf = (ByteBuf) msg;
Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);
if (packet instanceof LoginRequestPacket) { LoginRequestPacket loginRequestPacket = (LoginRequestPacket) packet;
if (valid(loginRequestPacket)) { } else { } } }
private boolean valid(LoginRequestPacket loginRequestPacket) { return true; }
|
4、服务端发送登录响应
服务端处理登录响应
1 2 3 4 5 6 7 8 9 10 11 12 13
|
LoginResponsePacket loginResponsePacket = new LoginResponsePacket(); loginResponsePacket.setVersion(packet.getVersion()); if (valid(loginRequestPacket)) { loginResponsePacket.setSuccess(true); } else { loginResponsePacket.setReason("账号密码校验失败"); loginResponsePacket.setSuccess(false); }
ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginResponsePacket); ctx.channel().writeAndFlush(responseByteBuf);
|
客户端处理登录响应
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg;
Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);
if (packet instanceof LoginResponsePacket) { LoginResponsePacket loginResponsePacket = (LoginResponsePacket) packet;
if (loginResponsePacket.isSuccess()) { System.out.println(new Date() + ": 客户端登录成功"); } else { System.out.println(new Date() + ": 客户端登录失败,原因:" + loginResponsePacket.getReason()); } } }
|
测试
1
| 分别启动 NettyServer.java 与 NettyClient.java 即可看到效果
|
实战:实现客户端与服务端收发消息
具体功能是:在控制台输入一条消息之后按回车,校验完客户端的登录状态之后,把消息发送到服务端,服务端收到消息之后打印并且向客户端发送一条消息,客户端收到之后打印。
1、收发消息对象
c>s
1 2 3 4 5 6 7 8 9 10
| @Data public class MessageRequestPacket extends Packet {
private String message;
@Override public Byte getCommand() { return MESSAGE_REQUEST; } }
|
s>c
1 2 3 4 5 6 7 8 9 10 11
| @Data public class MessageResponsePacket extends Packet {
private String message;
@Override public Byte getCommand() {
return MESSAGE_RESPONSE; } }
|
加入PacketCodeC
1 2 3 4 5 6 7 8 9 10 11 12
| private PacketCodeC() { packetTypeMap = new HashMap<>(); packetTypeMap.put(LOGIN_REQUEST, LoginRequestPacket.class); packetTypeMap.put(LOGIN_RESPONSE, LoginResponsePacket.class); packetTypeMap.put(MESSAGE_REQUEST, MessageRequestPacket.class); packetTypeMap.put(MESSAGE_RESPONSE, MessageResponsePacket.class);
serializerMap = new HashMap<>(); Serializer serializer = new JSONSerializer(); serializerMap.put(serializer.getSerializerAlogrithm(), serializer); }
|
2、判断客户端是否登录成功
避免重复登录。可以在登录成功之后,给客户端连接 Channel 绑定一个登录成功的标志位。
定义一下是否登录成功的标志位
1 2 3
| public interface Attributes { AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login"); }
|
登录成功之后,给客户端绑定登录成功的标志位
1 2 3 4 5 6 7 8 9 10
| public void channelRead(ChannelHandlerContext ctx, Object msg) { if (loginResponsePacket.isSuccess()) { LoginUtil.markAsLogin(ctx.channel()); System.out.println(new Date() + ": 客户端登录成功"); } else { System.out.println(new Date() + ": 客户端登录失败,原因:" + loginResponsePacket.getReason()); } }
|
2、控制台输入消息并发送
在客户端连接上服务端之后启动控制台线程,从控制台获取消息,然后发送至服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| private static void connect(Bootstrap bootstrap, String host, int port, int retry) { bootstrap.connect(host, port).addListener(future -> { if (future.isSuccess()) { Channel channel = ((ChannelFuture) future).channel(); startConsoleThread(channel); } }); }
private static void startConsoleThread(Channel channel) { new Thread(() -> { while (!Thread.interrupted()) { if (LoginUtil.hasLogin(channel)) { System.out.println("输入消息发送至服务端: "); Scanner sc = new Scanner(System.in); String line = sc.nextLine(); MessageRequestPacket packet = new MessageRequestPacket(); packet.setMessage(line); ByteBuf byteBuf = PacketCodeC.INSTANCE.encode(channel.alloc(), packet); channel.writeAndFlush(byteBuf); } } }).start(); }
|
3、服务端收发消息处理
ServerHandler.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf requestByteBuf = (ByteBuf) msg;
Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);
if (packet instanceof LoginRequestPacket) { } else if (packet instanceof MessageRequestPacket) { MessageRequestPacket messageRequestPacket = ((MessageRequestPacket) packet); System.out.println(new Date() + ": 收到客户端消息: " + messageRequestPacket.getMessage());
MessageResponsePacket messageResponsePacket = new MessageResponsePacket(); messageResponsePacket.setMessage("服务端回复【" + messageRequestPacket.getMessage() + "】"); ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), messageResponsePacket); ctx.channel().writeAndFlush(responseByteBuf); } }
|
4、客户端收消息处理
ClientHandler.java
1 2 3 4 5 6 7 8 9 10 11 12
| public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg;
Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);
if (packet instanceof LoginResponsePacket) { } else if (packet instanceof MessageResponsePacket) { MessageResponsePacket messageResponsePacket = (MessageResponsePacket) packet; System.out.println(new Date() + ": 收到服务端的消息: " + messageResponsePacket.getMessage()); } }
|
测试
1
| 分别启动 NettyServer.java 与 NettyClient.java 即可看到效果。
|
控制台输出
客户端
1 2 3 4 5 6 7 8 9 10
| Sun Jul 28 16:03:56 CST 2019: 连接成功,启动控制台线程…… Sun Jul 28 16:03:56 CST 2019: 客户端开始登录 输入消息发送至服务端: Sun Jul 28 16:03:57 CST 2019: 客户端登录成功 你好,我是Machine 输入消息发送至服务端: Sun Jul 28 16:04:12 CST 2019: 收到服务端的消息: 服务端回复【你好,我是Machine】 我在学Netty通信 输入消息发送至服务端: Sun Jul 28 16:04:25 CST 2019: 收到服务端的消息: 服务端回复【我在学Netty通信】
|
服务端
1 2 3 4 5
| 端口[8000]绑定成功! Sun Jul 28 16:03:57 CST 2019: 收到客户端登录请求…… Sun Jul 28 16:03:57 CST 2019: 登录成功! Sun Jul 28 16:04:12 CST 2019: 收到客户端消息: 你好,我是Machine Sun Jul 28 16:04:25 CST 2019: 收到客户端消息: 我在学Netty通信
|