Netty黏包半包解决方案

1. 短连接

service

package com.wuhm.netty.part5;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * @author wuhuaming
 * @description
 * @date 2022-06-24 16:42
 **/
@Slf4j
public class Server {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            // 调整netty的接收缓冲区大小
            serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));
            serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel serverChannel) throws Exception {
                    serverChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (InterruptedException e){
            log.error("server error:", e);
        }finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

client

package com.wuhm.netty.part5;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * @author wuhuaming
 * @description
 * @date 2022-06-24 16:50
 **/
@Slf4j
public class Client {
    public static void main(String[] args) {
        for(int i = 0; i < 10; i++){
            send();
        }
        log.info("client finish!");
    }

    public static void send(){
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            ByteBuf buf = ctx.alloc().buffer(16);
                            buf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18});
                            ctx.writeAndFlush(buf);
                            // 短连接的方式
                            ctx.channel().close();
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (InterruptedException e){
            log.error("client error: ", e);
        }finally {
            worker.shutdownGracefully();
        }
    }
}

2. 定长解码器

service

......
  
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel serverChannel) throws Exception {
                    serverChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    serverChannel.pipeline().addLast(new FixedLengthFrameDecoder(10));
                }
            });

......

3. 行解码器

......
  serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel serverChannel) throws Exception {
                    serverChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    serverChannel.pipeline().addLast(new LineBasedFrameDecoder(10));
                }
            });
  
......

LTC解码器

public class TestLengthFileDecoder {

    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel(
                new LengthFieldBasedFrameDecoder(1024, 0, 4, 0 ,4),
                new LoggingHandler(LogLevel.DEBUG)
        );

        // 发送4个字节的长度
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
        send(byteBuf, "hello word");
        send(byteBuf, "hi!");
        channel.writeInbound(byteBuf);
    }

    private static void send(ByteBuf byteBuf, String msg) {
        byte[] bytes = msg.getBytes();
        int length = bytes.length;
        byteBuf.writeInt(length);
        byteBuf.writeBytes(bytes);
    }
}

http协议

@Slf4j
public class TestHttp {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, work);
            serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new HttpServerCodec());
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                            log.debug(msg.uri());
                            DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
                            String res = "<h1>hello word</h1>";
                            response.content().writeBytes(res.getBytes());
                            response.headers().setInt(CONTENT_LENGTH, res.length());
                            ctx.writeAndFlush(response);
                        }
                    });
//                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
//                        @Override
//                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//                            log.debug("{}", msg.getClass());
//
//                        }
//                    });
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            log.error("error: ", e);
        }
    }
}

自定义协议的要素(推荐)

  • 魔数:用来在第一时间判断是否是无效数据包
  • 版本号,可以支持协议的升级
  • 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json,protobuf,hessian,jdk
  • 指令类型:和业务相关联,如登录,注册,单聊,群聊等等
  • 请求序号,为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

创建基础协议常量

public interface ProtocolConstants {

    /**
     * Magic code
     */
    byte[] MAGIC_CODE_BYTES = {(byte) 0xda, (byte) 0xda};

    /**
     * Protocol version
     */
    byte VERSION = 1;

    /**
     * Max frame length
     */
    int MAX_FRAME_LENGTH = 8 * 1024 * 1024;

    /**
     * HEAD_LENGTH of protocol v1
     */
    int V1_HEAD_LENGTH = 16;

    /**
     * 数据长度字节数
     */
    int V1_FIELD_LENGTH = 4;
    
    /**
     * Message type: Request
     */
    byte MSGTYPE_RESQUEST_SYNC = 0;
    /**
     * Message type: Response
     */
    byte MSGTYPE_RESPONSE = 1;
    /**
     * Message type: Request which no need response
     */
    byte MSGTYPE_RESQUEST_ONEWAY = 2;
    /**
     * Message type: Heartbeat Request
     */
    byte MSGTYPE_HEARTBEAT_REQUEST = 3;
    /**
     * Message type: Heartbeat Response
     */
    byte MSGTYPE_HEARTBEAT_RESPONSE = 4;

}

创建编解码器

@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, AbstractMessage> {

    @Override
    protected void encode(ChannelHandlerContext ctx, AbstractMessage msg, List<Object> out) throws Exception {
        ByteBuf byteBuf = ctx.alloc().buffer();
        // 1. 2个字节的魔数
        byteBuf.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
        // 2. 1个字节的版本号
        byteBuf.writeByte(ProtocolConstants.VERSION);
        // 3. 1字节序列化方式  0-jdk;2-fst;4-kryo
        int serializerType = NettySerializerConfig.getSerializerCode();
        byteBuf.writeByte(serializerType);
        // 4. 4字节指令类型
        if(msg.getMessageType() == null){
            byteBuf.writeInt(0);
        }else{
            byteBuf.writeInt(msg.getMessageType());
        }
        // 5. 8字节请求
        if(msg.getSequenceId() == null){
            byteBuf.writeLong(0L);
        }else{
            byteBuf.writeLong(msg.getSequenceId());
        }
        // 前面一共是 2 + 1 + 1 + 4 + 8  = 16

        // 6. 采用序列化方式
        Serializer serializer = NettySerializerConfig.getSerializerAlgorithm();
        byte[] bytes = serializer.serialize(msg);
        byteBuf.writeInt(bytes.length);
        // 7. 正文内容
        byteBuf.writeBytes(bytes);
        out.add(byteBuf);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {

        byte b0 = byteBuf.readByte();
        byte b1 = byteBuf.readByte();
        if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
                || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
            throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1);
        }

        byte version = byteBuf.readByte();

        byte serializerType = byteBuf.readByte();

        int messageType = byteBuf.readInt();

        Long sequenceId = byteBuf.readLong();

        int length = byteBuf.readInt();
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes, 0, length);
        // 反序列化对象
        Serializer serializer = NettySerializerConfig.getSerializerAlgorithm();
        AbstractMessage message = (AbstractMessage) serializer.deserialize(bytes);
        out.add(message);

    }
}

创建序列化工具枚举类

@Getter
@AllArgsConstructor
public enum SerializerTypeEnum {

    JDK(0, "jdk"),
    FST(2, "fst"),
    KRYO(2 << 1, "kryo"),
    HESSIAN(2 << 2, "hessian"),
    FURY(2 << 3, "fury"),
    ;

    /**
     * 序列化方式编码
     */
    private final int code;

    /**
     * 序列化方式名称
     */
    private final String name;

    public static int getCodeByName(String name){
        for(SerializerTypeEnum serializerTypeEnum : SerializerTypeEnum.values()){
            if(serializerTypeEnum.getName().equals(name)){
                return serializerTypeEnum.code;
            }
        }
        return 0;
    }

    public static String getNameByCode(int code){
        for(SerializerTypeEnum serializerTypeEnum : SerializerTypeEnum.values()){
            if(serializerTypeEnum.getCode() == code){
                return serializerTypeEnum.name;
            }
        }
        return null;
    }

    public static SerializerTypeEnum getInstance(int code){
        for(SerializerTypeEnum serializerTypeEnum : SerializerTypeEnum.values()){
            if(serializerTypeEnum.getCode() == code){
                return serializerTypeEnum;
            }
        }
        return null;
    }

}

这里通过配置文件中的配置来选择序列化和反序列化的方式

netty.serializer.type=fst

定义序列化接口

public interface Serializer {

    /**
     * Encode object to byte[].
     *
     * @param <T> the type parameter
     * @param t   the t
     * @return the byte [ ]
     */
    <T> byte[] serialize(T t);

    /**
     * Decode t from byte[].
     *
     * @param <T>   the type parameter
     * @param bytes the bytes
     * @return the t
     */
    <T> T deserialize(byte[] bytes);

}

序列化框架的实现类,这里只列举两种,更多序列化方式可自行扩展

jdk序列化方式

public class JdkSerializerImpl implements Serializer {

    @Override
    public <T> T deserialize(byte[] bytes) {
        try {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            return (T) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("jdk反序列化失败", e);
        }
    }

    @Override
    public <T> byte[] serialize(T object) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(object);
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("jdk序列化失败", e);
        }
    }

}

fst序列化方式

public class FstSerializerFactory {

    private static final FstSerializerFactory FACTORY = new FstSerializerFactory();

    private final FSTConfiguration conf = FSTConfiguration.createDefaultConfiguration();


    public static FstSerializerFactory getDefaultFactory() {
        return FACTORY;
    }

    public FstSerializerFactory() {
        SerializerSecurityRegistry.getAllowClassType().forEach(conf::registerClass);
    }

    public <T> byte[] serialize(T t) {
        return conf.asByteArray(t);
    }

    public <T> T deserialize(byte[] bytes) {
        return (T)conf.asObject(bytes);
    }
}
public class FstSerializerImpl implements Serializer, Initialize {

    private final FstSerializerFactory fstFactory = FstSerializerFactory.getDefaultFactory();

    @Override
    public <T> byte[] serialize(T t) {
        return fstFactory.serialize(t);
    }

    @Override
    public <T> T deserialize(byte[] bytes) {
        return fstFactory.deserialize(bytes);
    }

    @Override
    public void init() {

    }
}

可自行扩展hessiankryofury序列化方式

通过配置文件获取序列化方式配置类

public class NettySerializerConfig {

    static Properties properties;

    static {
        try (InputStream in = NettySerializerConfig.class.getResourceAsStream("/application.properties")) {
            properties = new Properties();
            properties.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static int getSerializerCode(){
        String value = properties.getProperty("netty.serializer.type");
        if(value == null){
            return SerializerTypeEnum.JDK.getCode();
        }
        return SerializerTypeEnum.getCodeByName(value);
    }


    public static Serializer getSerializerAlgorithm() {
        String value = properties.getProperty("netty.serializer.type");
        if(value == null) {
            log.info("use default serializer type: jdk");
            return new JdkSerializerImpl();
        } else if(SerializerTypeEnum.JDK.getName().equals(value)){
            log.info("use jdk serializer type.");
            return new JdkSerializerImpl();
        }else if(SerializerTypeEnum.FST.getName().equals(value)){
            log.info("use fst serializer type.");
            return new FstSerializerImpl();
        }else{
            throw new RuntimeException("please check serializer type: [netty.serializer.type][jdk/fst/kryo/hessian/fury]");
        }
    }

定义抽象message对象

@Data
@NoArgsConstructor
@AllArgsConstructor
public class AbstractMessage implements Serializable {

    /**
     * 消息类型
     */
    private Integer messageType;

    /**
     * 消息序列id
     */
    private Long sequenceId;

}

测试实现类

@Data
@AllArgsConstructor
@NoArgsConstructor
public class TestCodecMessage extends AbstractMessage {

    private String name;

    private Integer age;

    private Double height;

    private Date birthday;

    private String desc;

}

测试

server

@Slf4j
public class ChatServer {

    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup();
        MessageCodecSharable messageCodecSharable = new MessageCodecSharable();
        LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        try {
            ChannelFuture channelFuture = new ServerBootstrap()
                    .group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // 添加 handler
                            ch.pipeline().addLast(new ProtocolFrameDecoder());
                            ch.pipeline().addLast(loggingHandler);
                            ch.pipeline().addLast(messageCodecSharable);
                            ch.pipeline().addLast(new SimpleChannelInboundHandler<TestCodecMessage>(){
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, TestCodecMessage msg) throws Exception {
                                    log.info("收到消息:{}",JSON.toJSONString(msg));
                                }
                            });

                        }
                    }).bind(8000);
            channelFuture.sync();
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            log.error("服务器错误:", e);
        }finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}

client

@Slf4j
public class ChatClient {
    public static void main(String[] args) {
        NioEventLoopGroup work = new NioEventLoopGroup();
        MessageCodecSharable messageCodecSharable = new MessageCodecSharable();
        LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        try{
            Bootstrap bootstrap = new Bootstrap()
                    .group(work)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ProtocolFrameDecoder())
                                    .addLast(loggingHandler)
                                    .addLast(messageCodecSharable);
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    log.info("连接成功");
                                    TestCodecMessage message = new TestCodecMessage("江山待有才人出,各领风骚数百年。",
                                            18, 170.50, new Date(),
                                            "不积跬步无以至千里,不积小流无以至江海。莫愁前路无知己,天下谁人不识君。");
                                    message.setMessageType(4);
                                    message.setSequenceId(1024L);
                                    ctx.writeAndFlush(message);
                                }
                            });
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8000).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            log.error("客户端连接异常", e);
        }finally {
            work.shutdownGracefully();
        }
    }
}

结果

server console

1:13:32.057 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x748725e5, L:/127.0.0.1:8000 - R:/127.0.0.1:56681] REGISTERED
11:13:32.057 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x748725e5, L:/127.0.0.1:8000 - R:/127.0.0.1:56681] ACTIVE
11:13:32.316 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x748725e5, L:/127.0.0.1:8000 - R:/127.0.0.1:56681] READ: 263B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| da da 01 10 00 00 00 04 00 00 00 00 00 00 04 00 |................|
|00000010| 00 00 00 f3 02 ff 00 00 01 36 06 5e b9 9f ec a5 |.........6.^....|
|00000020| 30 00 63 6f 6d 2e 65 78 61 6d 70 6c 65 2e 73 70 |0.com.example.sp|
|00000030| 72 69 6e 67 2e 62 6f 6f 74 2e 74 65 73 74 2e 6e |ring.boot.test.n|
|00000040| 65 74 74 79 2e 6d 6f 64 65 6c 2e 6d 65 73 73 61 |etty.model.messa|
|00000050| 67 65 00 c7 05 98 30 a6 02 da 98 10 00 54 65 73 |ge....0......Tes|
|00000060| 74 43 6f 64 65 63 4d 65 73 73 61 67 65 98 e4 b0 |tCodecMessage...|
|00000070| 34 ff 00 00 00 00 00 50 65 40 ff 00 04 00 00 00 |4......Pe@......|
|00000080| 00 00 00 ff 12 00 00 00 ff 04 00 00 00 ff 01 48 |...............H|
|00000090| 0d 4e ef 79 ec 8d 65 6b e0 65 e5 4e f3 81 43 53 |.N.y..ek.e.N..CS|
|000000a0| cc 91 0c ff 0d 4e ef 79 0f 5c 41 6d e0 65 e5 4e |.....N.y.\Am.e.N|
|000000b0| f3 81 5f 6c 77 6d 02 30 ab 83 01 61 4d 52 ef 8d |.._lwm.0...aMR..|
|000000c0| e0 65 e5 77 f1 5d 0c ff 29 59 0b 4e 01 8c ba 4e |.e.w.]..)Y.N...N|
|000000d0| 0d 4e c6 8b 1b 54 02 30 ff 01 20 5f 6c 71 5c 85 |.N...T.0.. _lq\.|
|000000e0| 5f 09 67 4d 62 ba 4e fa 51 0c ff 04 54 86 98 ce |_.gMb.N.Q...T...|
|000000f0| 98 9a 9a 70 65 7e 76 74 5e 02 30 ff 01 31 00 9b |...pe~vt^.0..1..|
|00000100| ab b9 1c 8b 01 00 00                            |.......         |
+--------+-------------------------------------------------+----------------+
11:13:32.316 [nioEventLoopGroup-3-2] INFO com.example.spring.boot.test.netty.model.server.ChatServer - 收到消息:{"age":18,"birthday":1696994012059,"desc":"不积跬步无以至千里,不积小流无以至江海。莫愁前路无知己,天下谁人不识君。","height":170.5,"messageType":4,"name":"江山待有才人出,各领风骚数百年。","sequenceId":1024}
11:13:32.316 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x748725e5, L:/127.0.0.1:8000 - R:/127.0.0.1:56681] READ COMPLETE
0%