avatar

目录
Netty项目-仿写微信 IM 即时通讯系统(1)

简介

1、Netty 是一个高性能网络通信框架,广泛用于互联网中间件领域网络通信层

2、项目:Netty实现单聊和群聊(控制台交互,不写UI)

目录

[TOC]

项目参考:https://juejin.im/book/5b4bc28bf265da0f60130116

原作者源码位置:https://github.com/lightningMan/flash-netty

我的代码地址:https://gitee.com/machine4869/note_netty

项目简介

单聊与群聊的流程和指令集

单聊流程

单聊的指令

群聊流程

群聊要实现的指令集

具体内容:略

Netty

涉及的核心知识点:

  • 服务端如何启动
  • 客户端如何启动
  • 数据载体 ByteBuf
  • 长连自定义协议如何设计
  • 粘包拆包原理与实践
  • 如何实现自定义编解码
  • pipeline 与 channelHandler
  • 定时发心跳怎么做
  • 如何进行连接空闲检测

客户端使用 Netty 的程序逻辑结构

服务端使用 Netty 的程序逻辑结构

具体内容:略

Netty 基础

见我的blog【Netty 基础】,用netty实现了简单的c/s通信

Netty 环境配置

使用分支功能看源码

服务端启动流程

切换分支查看代码

NettyServer.java

1、最小化参数配置

Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
- 两大线程组:
bossGroup(接活的老板): 监听端口,accept新连接
workerGroup(干活的员工): 处理每一条连接的数据读写

- serverBootstrap
引导类,引导我们进行服务端的启动工作,直接new出来开搞。
- .group(bossGroup, workerGroup)
给引导类配置两大线程组,这个引导类的线程模型也就定型了

-.channel(NioServerSocketChannel.class)
来指定 IO 模型,当然,这里也有其他的选择,如果你想指定 IO 模型为 BIO,那么这里配置上OioServerSocketChannel.class类型即可,当然通常我们也不会这么做,因为Netty的优势就在于NIO。

- childHandler()
给这个引导类创建一个ChannelInitializer,这里主要就是定义后续每条连接的数据读写,业务处理逻辑。

ChannelInitializer这个类中,我们注意到有一个泛型参数NioSocketChannel,这个类,就是 Netty 对 NIO 类型的连接的抽象,而我们前面NioServerSocketChannel也是对 NIO 类型的连接的抽象

NioServerSocketChannel和NioSocketChannel的概念可以和 BIO 编程模型中(IOServer)的ServerSocket以及Socket两个概念对应上

- 总结:
要启动一个Netty服务端,必须要指定三类属性,分别是线程模型(.group)、IO 模型(.channel)、连接读写处理逻辑(childHandler()),有了这三者,之后在调用bind(8000),我们就可以在本地绑定一个 8000 端口启动起来

2、自动绑定递增端口

Code
1
2
3
4
5
6
7
8
9
10
NettyServer.bind()

递归绑定端口,直到成功

结果:
...
端口[1021]绑定失败!
端口[1022]绑定失败!
端口[1023]绑定失败!
端口[1024]绑定成功!

3、服务端启动其他方法

Code
1
2
3
4
5
6
- handler()
- attr()
- childAttr()
- childOption()
- option()
解释:略

客户端启动流程

切换分支查看代码

NettyClient.java

1、标准流程

java
1
2
3
4
5
6
7
bootstrap
// 1.指定线程模型
.group(workerGroup)
// 2.指定 IO 类型为 NIO
.channel(NioSocketChannel.class)
// 3.IO 处理逻辑
.handler(xxx)

2、失败重连

Code
1
2
3
4
5
bootstrap.connect(host, port).addListener(xxx)

- 指数退避逻辑:1 秒、2 秒、4 秒、8 秒,以 2 的幂次来建立连接,默认重试 5 次
- schedule
可实现定时任务逻辑,延迟delay时间执行connect方法

3、客户端启动其他方法

Code
1
2
3
- attr()
- option()
解释:略

实战:客户端与服务端双向通信

切换分支查看代码

功能:客户端连接成功之后,向服务端写一段数据 ,服务端收到数据之后打印,并向客户端回一段数据。

1、客户端发数据到服务端

Code
1
2
3
4
5
- ch.pipeline() 返回的是和这条连接相关的逻辑处理链
- addLast() 方法 添加一个逻辑处理器。为的就是在客户端建立连接成功之后,向服务端写数据
- ChannelInboundHandlerAdapter.channelActive
在客户端连接建立成功之后被调用
- Netty 里面数据是以 ByteBuf 为单位的

demo

java
1
2
3
4
5
6
7
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
// 自定义FirstClientHandler继承ChannelInboundHandlerAdapter
ch.pipeline().addLast(new FirstClientHandler());
}
});

2、服务端读取客户端数据

Code
1
2
3
4
- ChannelInboundHandlerAdapter.channelRead
在接收到客户端发来的数据之后被回调
- msg 参数
这里强转之后,然后调用 byteBuf.toString() 就能够拿到我们客户端发过来的字符串数据

demo

java
1
2
3
4
5
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new FirstServerHandler());
}
});

3、服务端回数据给客户端

Code
1
2
3
- 覆写channelRead()
- 创建填充ByteBuf
- writeAndFlush()

4、客户端读取数据

Code
1
2
- 覆盖 ChannelRead()
- 类似

数据传输载体 ByteBuf 介绍

1、ByteBuf结构

Code
1
2
3
- readerIndex、writerIndex
- capacity、maxCapacity
- 有效地区分可读数据和可写数据,读写之间相互没有冲突

2、ByteBuf 常用的 API

Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
解释:略

容量 APIAPI
- capacity()
- maxCapacity()
- readableBytes() 与 isReadable()
- writableBytes()、 isWritable() 与 maxWritableBytes()

读写指针相关的 API
- readerIndex() 与 readerIndex(int)
- writeIndex() 与 writeIndex(int)
- markReaderIndex() 与 resetReaderIndex() // 推荐使用

读写 API
- writeBytes(byte[] src) 与 buffer.readBytes(byte[] dst)
- writeByte(byte b) 与 buffer.readByte()
- release() 与 retain()
Netty 使用了堆外内存,申请到的内存无法被垃圾回收器直接回收,所以需要我们手动回收。
Netty 的 ByteBuf 是通过引用计数的方式管理的。

- slice()、duplicate()、copy()
- retainedSlice() 与 retainedDuplicate()
内存共享,引用计数共享,读写指针不共享

错误demo
1、多次释放
2、不释放造成内存泄漏

要点:在一个函数体里面,只要增加了引用计数(包括 ByteBuf 的创建和手动调用 retain() 方法),就必须调用 release() 方法

3、实战

切换分支查看代码

ByteBufTest.java

一个具体的例子说明 ByteBuf 的实际使用

客户端与服务端通信协议编解码

如何设计并实现客户端与服务端的通信协议

1、什么是服务端与客户端的通信协议

例:一个简单的登录指令数据包格式

2、通信协议的设计

1653028b36ee5d81

Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1、魔数
通常情况下为固定的几个字节。识别该数据包是自定义协议

2、版本号
通常情况下是预留字段,用于协议升级的时候用到

3、序列化算法
Java对象转换二进制数据是如何转换的。比如 Java 自带的序列化,json,hessian 等序列化方式

4、指令
服务端或者客户端每收到一种指令都会有相应的处理逻辑,用一个字节来表示,最高支持256种指令,对于我们这个 IM 系统来说已经完全足够了。

5、数据长度

6、数据内容

这样一套标准的协议能够适配大多数情况下的服务端与客户端的通信场景

3、通信协议的实现

如何使用 Netty 来实现这套协议

定义协议对象

java
1
2
3
4
5
6
7
8
9
10
11
12
@Data
public abstract class Packet {
/**
* 协议版本
*/
private Byte version = 1;

/**
* 指令: 抽象方法
* /
public abstract Byte getCommand();
}

例:定义登录请求数据包

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface Command {

Byte LOGIN_REQUEST = 1;
}

@Data
public class LoginRequestPacket extends Packet {
private Integer userId;

private String username;

private String password;

@Override
public Byte getCommand() {

return LOGIN_REQUEST;
}
}

序列化/反序列化

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface Serializer {

/**
* 序列化算法
*/
byte getSerializerAlgorithm();

/**
* java 对象转换成二进制
*/
byte[] serialize(Object object);

/**
* 二进制转换成 java 对象
*/
<T> T deserialize(Class<T> clazz, byte[] bytes);
}

本项目方案:

使用最简单的 json 序列化方式,使用阿里巴巴的 fastjson 作为序列化框架。

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public interface SerializerAlgorithm {
/**
* json 序列化标识
*/
byte JSON = 1;
}


public class JSONSerializer implements Serializer {

@Override
public byte getSerializerAlgorithm() {

return SerializerAlgorithm.JSON;
}

@Override
public byte[] serialize(Object object) {

return JSON.toJSONBytes(object);
}

@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {

return JSON.parseObject(bytes, clazz);
}
}

使用的默认值

java
1
2
3
4
5
6
7
8
9
10
public interface Serializer {
/**
* json 序列化
*/
byte JSON_SERIALIZER = 1;

Serializer DEFAULT = new JSONSerializer();

// ...
}

编码:封装成二进制

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// PacketCodeC.java

private static final int MAGIC_NUMBER = 0x12345678;

public ByteBuf encode(Packet packet) {
// 1. 创建 ByteBuf 对象
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();
// 2. 序列化 Java 对象
byte[] bytes = Serializer.DEFAULT.serialize(packet);

// 3. 实际编码过程
byteBuf.writeInt(MAGIC_NUMBER);
byteBuf.writeByte(packet.getVersion());
byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm());
byteBuf.writeByte(packet.getCommand());
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);

return byteBuf;
}

解码:解析 Java 对象

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public Packet decode(ByteBuf byteBuf) {
// 跳过 magic number
byteBuf.skipBytes(4);

// 跳过版本号
byteBuf.skipBytes(1);

// 序列化算法标识
byte serializeAlgorithm = byteBuf.readByte();

// 指令
byte command = byteBuf.readByte();

// 数据包长度
int length = byteBuf.readInt();

byte[] bytes = new byte[length];
byteBuf.readBytes(bytes);

Class<? extends Packet> requestType = getRequestType(command);
Serializer serializer = getSerializer(serializeAlgorithm);

if (requestType != null && serializer != null) {
return serializer.deserialize(requestType, bytes);
}

return null;
}

PacketCodeCTest.java 是对编解码过程的测试用例

实战:Netty 实现客户端登录

1、登录流程

实现客户端登录到服务端的过程

16535d7424e02d3a

2、逻辑处理器

客户端

java
1
2
3
4
5
6
7
8
9
public class ClientHandler extends ChannelInboundHandlerAdapter {
}

bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ClientHandler());
}
});

服务端

java
1
2
3
4
5
6
7
8
public class ServerHandler extends ChannelInboundHandlerAdapter {
}

serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ServerHandler());
}
}

3、客户端发送登录请求

客户端处理登录请求

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 在连接上服务端之后,Netty 会回调到 ClientHandler 的 channelActive()
// 把 PacketCodeC 变成单例模式
// ctx.alloc() 获取的就是与当前连接相关的 ByteBuf 分配器
public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ": 客户端开始登录");

// 创建登录对象
LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
loginRequestPacket.setUserId(UUID.randomUUID().toString());
loginRequestPacket.setUsername("flash");
loginRequestPacket.setPassword("pwd");

// 编码
ByteBuf buffer = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginRequestPacket);

// 写数据
ctx.channel().writeAndFlush(buffer);
}

服务端处理登录请求

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf requestByteBuf = (ByteBuf) msg;

// 解码
Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);

// 判断是否是登录请求数据包
if (packet instanceof LoginRequestPacket) {
LoginRequestPacket loginRequestPacket = (LoginRequestPacket) packet;

// 登录校验
if (valid(loginRequestPacket)) {
// 校验成功
} else {
// 校验失败
}
}
}

private boolean valid(LoginRequestPacket loginRequestPacket) {
return true;
}

4、服务端发送登录响应

服务端处理登录响应

java
1
2
3
4
5
6
7
8
9
10
11
12
13
// ServerHandler 的 channelRead() 方法里

LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
loginResponsePacket.setVersion(packet.getVersion());
if (valid(loginRequestPacket)) {
loginResponsePacket.setSuccess(true);
} else {
loginResponsePacket.setReason("账号密码校验失败");
loginResponsePacket.setSuccess(false);
}
// 编码
ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginResponsePacket);
ctx.channel().writeAndFlush(responseByteBuf);

客户端处理登录响应

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;

Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);

if (packet instanceof LoginResponsePacket) {
LoginResponsePacket loginResponsePacket = (LoginResponsePacket) packet;

if (loginResponsePacket.isSuccess()) {
System.out.println(new Date() + ": 客户端登录成功");
} else {
System.out.println(new Date() + ": 客户端登录失败,原因:" + loginResponsePacket.getReason());
}
}
}

测试

Code
1
分别启动 NettyServer.java 与 NettyClient.java 即可看到效果

实战:实现客户端与服务端收发消息

具体功能是:在控制台输入一条消息之后按回车,校验完客户端的登录状态之后,把消息发送到服务端,服务端收到消息之后打印并且向客户端发送一条消息,客户端收到之后打印。

1、收发消息对象

c>s

java
1
2
3
4
5
6
7
8
9
10
@Data
public class MessageRequestPacket extends Packet {

private String message;

@Override
public Byte getCommand() {
return MESSAGE_REQUEST;
}
}

s>c

java
1
2
3
4
5
6
7
8
9
10
11
@Data
public class MessageResponsePacket extends Packet {

private String message;

@Override
public Byte getCommand() {

return MESSAGE_RESPONSE;
}
}

加入PacketCodeC

java
1
2
3
4
5
6
7
8
9
10
11
12
private PacketCodeC() {
packetTypeMap = new HashMap<>();
packetTypeMap.put(LOGIN_REQUEST, LoginRequestPacket.class);
packetTypeMap.put(LOGIN_RESPONSE, LoginResponsePacket.class);
// 新加的
packetTypeMap.put(MESSAGE_REQUEST, MessageRequestPacket.class);
packetTypeMap.put(MESSAGE_RESPONSE, MessageResponsePacket.class);

serializerMap = new HashMap<>();
Serializer serializer = new JSONSerializer();
serializerMap.put(serializer.getSerializerAlogrithm(), serializer);
}

2、判断客户端是否登录成功

避免重复登录。可以在登录成功之后,给客户端连接 Channel 绑定一个登录成功的标志位。

定义一下是否登录成功的标志位

java
1
2
3
public interface Attributes {
AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login");
}

登录成功之后,给客户端绑定登录成功的标志位

java
1
2
3
4
5
6
7
8
9
10
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// ...
if (loginResponsePacket.isSuccess()) {
LoginUtil.markAsLogin(ctx.channel());
System.out.println(new Date() + ": 客户端登录成功");
} else {
System.out.println(new Date() + ": 客户端登录失败,原因:" + loginResponsePacket.getReason());
}
// ...
}

2、控制台输入消息并发送

在客户端连接上服务端之后启动控制台线程,从控制台获取消息,然后发送至服务端

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
Channel channel = ((ChannelFuture) future).channel();
// 连接成功之后,启动控制台线程
startConsoleThread(channel);
}
// ...
});
}

private static void startConsoleThread(Channel channel) {
new Thread(() -> {
while (!Thread.interrupted()) {
if (LoginUtil.hasLogin(channel)) {
System.out.println("输入消息发送至服务端: ");
Scanner sc = new Scanner(System.in);
String line = sc.nextLine();

MessageRequestPacket packet = new MessageRequestPacket();
packet.setMessage(line);
ByteBuf byteBuf = PacketCodeC.INSTANCE.encode(channel.alloc(), packet);
channel.writeAndFlush(byteBuf);
}
}
}).start();
}

3、服务端收发消息处理

ServerHandler.java

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf requestByteBuf = (ByteBuf) msg;

Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);

if (packet instanceof LoginRequestPacket) {
// 处理登录..
} else if (packet instanceof MessageRequestPacket) {
// 处理消息
MessageRequestPacket messageRequestPacket = ((MessageRequestPacket) packet);
System.out.println(new Date() + ": 收到客户端消息: " + messageRequestPacket.getMessage());

MessageResponsePacket messageResponsePacket = new MessageResponsePacket();
messageResponsePacket.setMessage("服务端回复【" + messageRequestPacket.getMessage() + "】");
ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), messageResponsePacket);
ctx.channel().writeAndFlush(responseByteBuf);
}
}

4、客户端收消息处理

ClientHandler.java

java
1
2
3
4
5
6
7
8
9
10
11
12
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;

Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);

if (packet instanceof LoginResponsePacket) {
// 登录逻辑...
} else if (packet instanceof MessageResponsePacket) {
MessageResponsePacket messageResponsePacket = (MessageResponsePacket) packet;
System.out.println(new Date() + ": 收到服务端的消息: " + messageResponsePacket.getMessage());
}
}

测试

java
1
分别启动 NettyServer.java 与 NettyClient.java 即可看到效果。

控制台输出

客户端

Code
1
2
3
4
5
6
7
8
9
10
Sun Jul 28 16:03:56 CST 2019: 连接成功,启动控制台线程……
Sun Jul 28 16:03:56 CST 2019: 客户端开始登录
输入消息发送至服务端:
Sun Jul 28 16:03:57 CST 2019: 客户端登录成功
你好,我是Machine
输入消息发送至服务端:
Sun Jul 28 16:04:12 CST 2019: 收到服务端的消息: 服务端回复【你好,我是Machine】
我在学Netty通信
输入消息发送至服务端:
Sun Jul 28 16:04:25 CST 2019: 收到服务端的消息: 服务端回复【我在学Netty通信】

服务端

Code
1
2
3
4
5
端口[8000]绑定成功!
Sun Jul 28 16:03:57 CST 2019: 收到客户端登录请求……
Sun Jul 28 16:03:57 CST 2019: 登录成功!
Sun Jul 28 16:04:12 CST 2019: 收到客户端消息: 你好,我是Machine
Sun Jul 28 16:04:25 CST 2019: 收到客户端消息: 我在学Netty通信
java
1
2


文章作者: Machine
文章链接: https://machine4869.gitee.io/2019/07/22/20190722161728334/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 哑舍
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论