Netty黏包半包解决方案
目录
1. 短连接
server
package com.wuhm.netty.part5;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
/**
 * Short-connection demo server.
 *
 * <p>Binds port 8080 and installs a single DEBUG-level {@link LoggingHandler} on every
 * accepted channel. The receive-buffer allocator for accepted channels is created with
 * its minimum, initial and maximum size all set to 16.
 *
 * @author wuhuaming
 * @date 2022-06-24 16:42
 **/
@Slf4j
public class Server {

    /**
     * Starts the server and blocks until the listening channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ChannelInitializer<NioSocketChannel> childInitializer = new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel serverChannel) throws Exception {
                    serverChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                }
            };
            ChannelFuture bindFuture = new ServerBootstrap()
                    .channel(NioServerSocketChannel.class)
                    .group(boss, worker)
                    // Adjust the size of Netty's receive buffer.
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16))
                    .childHandler(childInitializer)
                    .bind(8080)
                    .sync();
            // Wait here until the listening channel is closed.
            bindFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error:", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
client
package com.wuhm.netty.part5;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
/**
 * Short-connection demo client.
 *
 * <p>Opens ten connections one after another; each connection writes a single 19-byte
 * message as soon as it becomes active and then closes itself.
 *
 * @author wuhuaming
 * @date 2022-06-24 16:50
 **/
@Slf4j
public class Client {

    /**
     * Runs {@link #send()} ten times in sequence.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int remaining = 10;
        while (remaining-- > 0) {
            send();
        }
        log.info("client finish!");
    }

    /**
     * Connects to 127.0.0.1:8080, sends one message and waits for the channel to close.
     * A fresh event loop group is created for the call and shut down afterwards.
     */
    public static void send() {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ChannelInboundHandlerAdapter sendOnActive = new ChannelInboundHandlerAdapter() {
                @Override
                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    byte[] payload = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18};
                    ByteBuf buf = ctx.alloc().buffer(16);
                    buf.writeBytes(payload);
                    ctx.writeAndFlush(buf);
                    // Short-connection approach: close the channel right after sending.
                    ctx.channel().close();
                }
            };
            Bootstrap bootstrap = new Bootstrap()
                    .channel(NioSocketChannel.class)
                    .group(worker)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(sendOnActive);
                        }
                    });
            ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8080).sync();
            connectFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("client error: ", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}
2. 定长解码器
server
......
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel serverChannel) throws Exception {
        // Inbound handlers run in the order they are added. The frame decoder goes first so
        // that the logging handler prints the decoded fixed-length (10-byte) frames rather
        // than the raw, arbitrarily split reads.
        serverChannel.pipeline().addLast(new FixedLengthFrameDecoder(10));
        serverChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
    }
});
......
3. 行解码器
......
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel serverChannel) throws Exception {
        // Inbound handlers run in the order they are added. The line decoder (max line
        // length 10) goes first so that the logging handler prints one decoded line per
        // event rather than the raw, arbitrarily split reads.
        serverChannel.pipeline().addLast(new LineBasedFrameDecoder(10));
        serverChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
    }
});
......
LTC解码器(LengthFieldBasedFrameDecoder,基于长度字段的帧解码器)
/**
 * Pushes two length-prefixed messages through a {@link LengthFieldBasedFrameDecoder}
 * hosted in an {@link EmbeddedChannel}, followed by a DEBUG logging handler.
 */
public class TestLengthFileDecoder {

    /**
     * Writes both messages into one buffer and hands that buffer to the channel as a
     * single inbound write.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Decoder arguments, in order: max frame length 1024, length field offset 0,
        // length field length 4, length adjustment 0, initial bytes to strip 4.
        LengthFieldBasedFrameDecoder frameDecoder = new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4);
        EmbeddedChannel channel = new EmbeddedChannel(frameDecoder, new LoggingHandler(LogLevel.DEBUG));

        ByteBuf inbound = ByteBufAllocator.DEFAULT.buffer();
        for (String text : new String[]{"hello word", "hi!"}) {
            send(inbound, text);
        }
        channel.writeInbound(inbound);
    }

    /**
     * Appends one message to the buffer: a 4-byte length followed by the message bytes.
     *
     * @param byteBuf buffer to append to
     * @param msg     message text
     */
    private static void send(ByteBuf byteBuf, String msg) {
        byte[] payload = msg.getBytes();
        byteBuf.writeInt(payload.length);
        byteBuf.writeBytes(payload);
    }
}
http协议
/**
 * Minimal HTTP server demo: answers every request head with a fixed HTML body.
 */
@Slf4j
public class TestHttp {

    /**
     * Starts the server on port 8080 and blocks until the listening channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, work);
            serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new HttpServerCodec());
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                            log.debug(msg.uri());
                            DefaultFullHttpResponse response =
                                    new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
                            String res = "<h1>hello word</h1>";
                            byte[] body = res.getBytes();
                            response.content().writeBytes(body);
                            // Content-Length counts bytes on the wire, so use the encoded
                            // length rather than the number of chars in the String.
                            response.headers().setInt(CONTENT_LENGTH, body.length);
                            ctx.writeAndFlush(response);
                        }
                    });
                    // Alternative handler that logs the class of each decoded message:
                    // ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                    // @Override
                    // public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    // log.debug("{}", msg.getClass());
                    //
                    // }
                    // });
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("error: ", e);
        } finally {
            // Release the event loop threads whether startup failed or the server was closed.
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}
自定义协议的要素(推荐)
- 魔数:用来在第一时间判断是否是无效数据包
- 版本号:可以支持协议的升级
- 序列化算法:消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json,protobuf,hessian,jdk
- 指令类型:和业务相关联,如登录,注册,单聊,群聊等等
- 请求序号:为了双工通信,提供异步能力
- 正文长度
- 消息正文
创建基础协议常量
/**
 * Constants of the custom wire protocol (version 1).
 */
public interface ProtocolConstants {

    /** Two magic bytes that open every frame. */
    byte[] MAGIC_CODE_BYTES = {(byte) 0xDA, (byte) 0xDA};

    /** Protocol version written into the header. */
    byte VERSION = 1;

    /** Upper bound for a frame, in bytes (8 MiB). */
    int MAX_FRAME_LENGTH = 8 << 20;

    /** Header length of protocol v1, in bytes. */
    int V1_HEAD_LENGTH = 16;

    /** Number of bytes used by the body-length field. */
    int V1_FIELD_LENGTH = 4;

    /** Message type: request. */
    byte MSGTYPE_RESQUEST_SYNC = 0;

    /** Message type: response. */
    byte MSGTYPE_RESPONSE = 1;

    /** Message type: request that needs no response. */
    byte MSGTYPE_RESQUEST_ONEWAY = 2;

    /** Message type: heartbeat request. */
    byte MSGTYPE_HEARTBEAT_REQUEST = 3;

    /** Message type: heartbeat response. */
    byte MSGTYPE_HEARTBEAT_RESPONSE = 4;
}
创建编解码器
/**
 * Codec between {@link AbstractMessage} objects and the custom binary frame.
 *
 * <p>Frame layout written by {@link #encode}: magic (2 bytes), version (1), serializer
 * type (1), message type (4), sequence id (8), body length (4), body.
 */
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, AbstractMessage> {

    /**
     * Serializes the message and writes header plus body into a single buffer.
     *
     * @param ctx channel context whose allocator provides the output buffer
     * @param msg message to encode; a null type or sequence id is written as 0
     * @param out receives the encoded buffer
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, AbstractMessage msg, List<Object> out) throws Exception {
        int serializerType = NettySerializerConfig.getSerializerCode();
        Serializer serializer = NettySerializerConfig.getSerializerAlgorithm();
        // Serialize before allocating: if the serializer lookup or serialization throws,
        // no buffer has been allocated yet, so nothing can leak.
        byte[] bytes = serializer.serialize(msg);

        ByteBuf byteBuf = ctx.alloc().buffer(
                ProtocolConstants.V1_HEAD_LENGTH + ProtocolConstants.V1_FIELD_LENGTH + bytes.length);
        // 1. Magic code, 2 bytes.
        byteBuf.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
        // 2. Version, 1 byte.
        byteBuf.writeByte(ProtocolConstants.VERSION);
        // 3. Serializer type, 1 byte (0-jdk; 2-fst; 4-kryo).
        byteBuf.writeByte(serializerType);
        // 4. Message type, 4 bytes.
        Integer messageType = msg.getMessageType();
        byteBuf.writeInt(messageType == null ? 0 : messageType);
        // 5. Request sequence id, 8 bytes.
        Long sequenceId = msg.getSequenceId();
        byteBuf.writeLong(sequenceId == null ? 0L : sequenceId);
        // Header so far: 2 + 1 + 1 + 4 + 8 = 16 bytes.
        // 6. Body length, 4 bytes.
        byteBuf.writeInt(bytes.length);
        // 7. Body.
        byteBuf.writeBytes(bytes);
        out.add(byteBuf);
    }

    /**
     * Validates the header, then deserializes the body into an {@link AbstractMessage}.
     *
     * @param ctx     channel context (unused)
     * @param byteBuf buffer positioned at the start of a frame
     * @param out     receives the decoded message
     * @throws IllegalArgumentException if the frame is shorter than the fixed header, the
     *                                  magic code does not match, or the declared body
     *                                  length is negative or exceeds the remaining bytes
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        int fixedLength = ProtocolConstants.V1_HEAD_LENGTH + ProtocolConstants.V1_FIELD_LENGTH;
        if (byteBuf.readableBytes() < fixedLength) {
            throw new IllegalArgumentException(
                    "Frame too short: " + byteBuf.readableBytes() + " bytes, need at least " + fixedLength);
        }
        byte b0 = byteBuf.readByte();
        byte b1 = byteBuf.readByte();
        if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
                || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
            throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1);
        }
        // version (1) + serializer type (1) + message type (4) + sequence id (8):
        // none of these header values is used below, so skip them.
        byteBuf.skipBytes(1 + 1 + 4 + 8);
        int length = byteBuf.readInt();
        if (length < 0 || length > byteBuf.readableBytes()) {
            throw new IllegalArgumentException(
                    "Invalid body length: " + length + ", readable bytes: " + byteBuf.readableBytes());
        }
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes, 0, length);
        // Deserialize the body with the locally configured serializer.
        Serializer serializer = NettySerializerConfig.getSerializerAlgorithm();
        AbstractMessage message = (AbstractMessage) serializer.deserialize(bytes);
        out.add(message);
    }
}
创建序列化工具枚举类
/**
 * Supported serialization types, each with a numeric code and a name.
 */
@Getter
@AllArgsConstructor
public enum SerializerTypeEnum {
    JDK(0, "jdk"),
    FST(1 << 1, "fst"),
    KRYO(1 << 2, "kryo"),
    HESSIAN(1 << 3, "hessian"),
    FURY(1 << 4, "fury"),
    ;

    /**
     * Numeric code of the serialization type.
     */
    private final int code;

    /**
     * Name of the serialization type.
     */
    private final String name;

    /**
     * Looks up the code for a type name.
     *
     * @param name type name to look up
     * @return the matching code, or 0 when no constant has that name
     */
    public static int getCodeByName(String name) {
        SerializerTypeEnum[] all = SerializerTypeEnum.values();
        for (int i = 0; i < all.length; i++) {
            if (all[i].getName().equals(name)) {
                return all[i].code;
            }
        }
        return 0;
    }

    /**
     * Looks up the type name for a code.
     *
     * @param code code to look up
     * @return the matching name, or {@code null} when no constant has that code
     */
    public static String getNameByCode(int code) {
        SerializerTypeEnum match = getInstance(code);
        if (match == null) {
            return null;
        }
        return match.name;
    }

    /**
     * Looks up the constant for a code.
     *
     * @param code code to look up
     * @return the matching constant, or {@code null} when no constant has that code
     */
    public static SerializerTypeEnum getInstance(int code) {
        SerializerTypeEnum found = null;
        SerializerTypeEnum[] all = SerializerTypeEnum.values();
        for (int i = 0; i < all.length && found == null; i++) {
            if (all[i].getCode() == code) {
                found = all[i];
            }
        }
        return found;
    }
}
这里通过配置文件中的配置来选择序列化和反序列化的方式
netty.serializer.type=fst
定义序列化接口
/**
* Converts objects to byte arrays and back.
*/
public interface Serializer {
/**
* Serializes an object into a byte array.
*
* @param <T> type of the object being serialized
* @param t the object to serialize
* @return the serialized bytes
*/
<T> byte[] serialize(T t);
/**
* Deserializes an object from a byte array.
*
* @param <T> type the caller expects back
* @param bytes the serialized bytes
* @return the deserialized object
*/
<T> T deserialize(byte[] bytes);
}
序列化框架的实现类,这里只列举两种,更多序列化方式可自行扩展
jdk序列化方式
/**
 * {@link Serializer} backed by Java's built-in object serialization.
 */
public class JdkSerializerImpl implements Serializer {

    /**
     * Reads one object from the given bytes.
     *
     * <p>NOTE(security): native Java deserialization can instantiate arbitrary classes
     * found on the classpath. When the bytes come from the network, consider restricting
     * the accepted classes (for example with an {@code ObjectInputFilter}) or using a
     * different serializer.
     *
     * @param bytes serialized form produced by {@link #serialize(Object)}
     * @param <T>   type the caller expects; the cast is unchecked
     * @return the deserialized object
     * @throws RuntimeException if the stream cannot be read or a class is missing
     */
    @Override
    public <T> T deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            // The caller chooses T; there is no way to verify it here.
            @SuppressWarnings("unchecked")
            T result = (T) ois.readObject();
            return result;
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("jdk反序列化失败", e);
        }
    }

    /**
     * Writes the object with an {@link ObjectOutputStream} and returns the bytes.
     *
     * @param object object to serialize
     * @param <T>    type of the object
     * @return the serialized bytes
     * @throws RuntimeException if the object cannot be written
     */
    @Override
    public <T> byte[] serialize(T object) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(object);
            // Push any buffered data into bos before taking the snapshot.
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("jdk序列化失败", e);
        }
    }
}
fst序列化方式
/**
 * Holds one {@link FSTConfiguration} and exposes serialize/deserialize on top of it.
 * A shared instance is available through {@link #getDefaultFactory()}.
 */
public class FstSerializerFactory {

    private final FSTConfiguration conf = FSTConfiguration.createDefaultConfiguration();

    private static final FstSerializerFactory FACTORY = new FstSerializerFactory();

    /**
     * Registers every class returned by {@code SerializerSecurityRegistry.getAllowClassType()}
     * with the FST configuration.
     */
    public FstSerializerFactory() {
        SerializerSecurityRegistry.getAllowClassType().forEach(type -> conf.registerClass(type));
    }

    /**
     * @return the shared factory instance
     */
    public static FstSerializerFactory getDefaultFactory() {
        return FACTORY;
    }

    /**
     * Serializes an object with the FST configuration.
     *
     * @param t   object to serialize
     * @param <T> type of the object
     * @return the serialized bytes
     */
    public <T> byte[] serialize(T t) {
        byte[] encoded = conf.asByteArray(t);
        return encoded;
    }

    /**
     * Deserializes an object with the FST configuration.
     *
     * @param bytes serialized bytes
     * @param <T>   type the caller expects; the cast is unchecked
     * @return the deserialized object
     */
    public <T> T deserialize(byte[] bytes) {
        Object decoded = conf.asObject(bytes);
        return (T) decoded;
    }
}
/**
 * {@link Serializer} that forwards to the shared {@link FstSerializerFactory}.
 */
public class FstSerializerImpl implements Serializer, Initialize {

    private final FstSerializerFactory delegate = FstSerializerFactory.getDefaultFactory();

    /** No initialization work is performed. */
    @Override
    public void init() {
    }

    /**
     * Deserializes through the shared factory.
     *
     * @param bytes serialized bytes
     * @param <T>   type the caller expects
     * @return the deserialized object
     */
    @Override
    public <T> T deserialize(byte[] bytes) {
        return delegate.deserialize(bytes);
    }

    /**
     * Serializes through the shared factory.
     *
     * @param t   object to serialize
     * @param <T> type of the object
     * @return the serialized bytes
     */
    @Override
    public <T> byte[] serialize(T t) {
        return delegate.serialize(t);
    }
}
可自行扩展hessian、kryo、fury等序列化方式
通过配置文件获取序列化方式配置类
public class NettySerializerConfig {
static Properties properties;
static {
try (InputStream in = NettySerializerConfig.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}
public static int getSerializerCode(){
String value = properties.getProperty("netty.serializer.type");
if(value == null){
return SerializerTypeEnum.JDK.getCode();
}
return SerializerTypeEnum.getCodeByName(value);
}
public static Serializer getSerializerAlgorithm() {
String value = properties.getProperty("netty.serializer.type");
if(value == null) {
log.info("use default serializer type: jdk");
return new JdkSerializerImpl();
} else if(SerializerTypeEnum.JDK.getName().equals(value)){
log.info("use jdk serializer type.");
return new JdkSerializerImpl();
}else if(SerializerTypeEnum.FST.getName().equals(value)){
log.info("use fst serializer type.");
return new FstSerializerImpl();
}else{
throw new RuntimeException("please check serializer type: [netty.serializer.type][jdk/fst/kryo/hessian/fury]");
}
}
定义抽象message对象
/**
* Base message holding the two values every message carries: a message type and a
* sequence id. Implements {@link Serializable}.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AbstractMessage implements Serializable {
/**
* Message type.
*/
private Integer messageType;
/**
* Message sequence id.
*/
private Long sequenceId;
}
测试实现类
/**
* Sample message used to exercise the codec; adds five fields on top of
* {@link AbstractMessage}.
*
* NOTE(review): Lombok's @Data generates equals/hashCode/toString from this class's own
* fields by default — confirm whether the inherited messageType/sequenceId should take part
* (callSuper = true).
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TestCodecMessage extends AbstractMessage {
private String name;
private Integer age;
private Double height;
private Date birthday;
private String desc;
}
测试
server
/**
 * Test server for the custom protocol: listens on port 8000 and logs every
 * {@link TestCodecMessage} it receives as JSON.
 */
@Slf4j
public class ChatServer {

    /**
     * Starts the server and blocks until the listening channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup();
        // One codec and one logging handler instance are reused for every accepted channel.
        MessageCodecSharable messageCodecSharable = new MessageCodecSharable();
        LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, work);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // Handlers, in inbound order: frame decoder, logging, codec, business.
                    ch.pipeline()
                            .addLast(new ProtocolFrameDecoder())
                            .addLast(loggingHandler)
                            .addLast(messageCodecSharable)
                            .addLast(new SimpleChannelInboundHandler<TestCodecMessage>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, TestCodecMessage msg)
                                        throws Exception {
                                    log.info("收到消息:{}", JSON.toJSONString(msg));
                                }
                            });
                }
            });
            ChannelFuture bindFuture = bootstrap.bind(8000);
            bindFuture.sync();
            bindFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("服务器错误:", e);
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}
client
/**
 * Test client for the custom protocol: connects to localhost:8000 and sends one
 * {@link TestCodecMessage} as soon as the channel becomes active.
 */
@Slf4j
public class ChatClient {

    /**
     * Connects, sends the message and blocks until the channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        NioEventLoopGroup work = new NioEventLoopGroup();
        MessageCodecSharable messageCodecSharable = new MessageCodecSharable();
        LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        try {
            ChannelInboundHandlerAdapter sendOnActive = new ChannelInboundHandlerAdapter() {
                @Override
                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    log.info("连接成功");
                    String name = "江山待有才人出,各领风骚数百年。";
                    String desc = "不积跬步无以至千里,不积小流无以至江海。莫愁前路无知己,天下谁人不识君。";
                    TestCodecMessage message = new TestCodecMessage(name, 18, 170.50, new Date(), desc);
                    message.setMessageType(4);
                    message.setSequenceId(1024L);
                    ctx.writeAndFlush(message);
                }
            };
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(work);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProtocolFrameDecoder());
                    ch.pipeline().addLast(loggingHandler);
                    ch.pipeline().addLast(messageCodecSharable);
                    ch.pipeline().addLast(sendOnActive);
                }
            });
            ChannelFuture connectFuture = bootstrap.connect("localhost", 8000).sync();
            connectFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("客户端连接异常", e);
        } finally {
            work.shutdownGracefully();
        }
    }
}
结果
server console
11:13:32.057 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x748725e5, L:/127.0.0.1:8000 - R:/127.0.0.1:56681] REGISTERED
11:13:32.057 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x748725e5, L:/127.0.0.1:8000 - R:/127.0.0.1:56681] ACTIVE
11:13:32.316 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x748725e5, L:/127.0.0.1:8000 - R:/127.0.0.1:56681] READ: 263B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| da da 01 10 00 00 00 04 00 00 00 00 00 00 04 00 |................|
|00000010| 00 00 00 f3 02 ff 00 00 01 36 06 5e b9 9f ec a5 |.........6.^....|
|00000020| 30 00 63 6f 6d 2e 65 78 61 6d 70 6c 65 2e 73 70 |0.com.example.sp|
|00000030| 72 69 6e 67 2e 62 6f 6f 74 2e 74 65 73 74 2e 6e |ring.boot.test.n|
|00000040| 65 74 74 79 2e 6d 6f 64 65 6c 2e 6d 65 73 73 61 |etty.model.messa|
|00000050| 67 65 00 c7 05 98 30 a6 02 da 98 10 00 54 65 73 |ge....0......Tes|
|00000060| 74 43 6f 64 65 63 4d 65 73 73 61 67 65 98 e4 b0 |tCodecMessage...|
|00000070| 34 ff 00 00 00 00 00 50 65 40 ff 00 04 00 00 00 |4......Pe@......|
|00000080| 00 00 00 ff 12 00 00 00 ff 04 00 00 00 ff 01 48 |...............H|
|00000090| 0d 4e ef 79 ec 8d 65 6b e0 65 e5 4e f3 81 43 53 |.N.y..ek.e.N..CS|
|000000a0| cc 91 0c ff 0d 4e ef 79 0f 5c 41 6d e0 65 e5 4e |.....N.y.\Am.e.N|
|000000b0| f3 81 5f 6c 77 6d 02 30 ab 83 01 61 4d 52 ef 8d |.._lwm.0...aMR..|
|000000c0| e0 65 e5 77 f1 5d 0c ff 29 59 0b 4e 01 8c ba 4e |.e.w.]..)Y.N...N|
|000000d0| 0d 4e c6 8b 1b 54 02 30 ff 01 20 5f 6c 71 5c 85 |.N...T.0.. _lq\.|
|000000e0| 5f 09 67 4d 62 ba 4e fa 51 0c ff 04 54 86 98 ce |_.gMb.N.Q...T...|
|000000f0| 98 9a 9a 70 65 7e 76 74 5e 02 30 ff 01 31 00 9b |...pe~vt^.0..1..|
|00000100| ab b9 1c 8b 01 00 00 |....... |
+--------+-------------------------------------------------+----------------+
11:13:32.316 [nioEventLoopGroup-3-2] INFO com.example.spring.boot.test.netty.model.server.ChatServer - 收到消息:{"age":18,"birthday":1696994012059,"desc":"不积跬步无以至千里,不积小流无以至江海。莫愁前路无知己,天下谁人不识君。","height":170.5,"messageType":4,"name":"江山待有才人出,各领风骚数百年。","sequenceId":1024}
11:13:32.316 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x748725e5, L:/127.0.0.1:8000 - R:/127.0.0.1:56681] READ COMPLETE