Netty入门
基础概念
-
channel理解为数据通道
-
把MSG理解为流动数据,最开始输入的事ByteBuffer,但是进过pipeline的加工,会变成其他类型或对象,最后编程ByteBuffer
-
把handler理解为数据的处理工序
- 工序有多道,合在一起就是pipeline,
- handle分为Inbound 和OutBound两类(即服务端读,写)
-
把EventLoop理解为处理数据的工人
- 工人可以管理多个channel的io操作;并且一旦工人负责了某个channel,就要负责到底(存在绑定关系)
- 工人既可以执行io操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个channel的待处理任务,任务分为普通任务,定时任务
- 工人安装pipeline的顺序,依次按照handle的规划处理数据,可以为每道工序指定不同的工人
案例:
server
public class HelloNettyServer {
public static void main(String[] args) {
// 服务器端的启动器:负责组装netty组件,协调组件的工作
new ServerBootstrap()
// 2. 事件组,里面包含了线程和selector
.group(new NioEventLoopGroup())
// 3. 选择服务器的 ServerSocketChannel 的实现类有NIO和BIO等等
.channel(NioServerSocketChannel.class)
// 4. 事件循环组 需要关注的事件与具体的业务逻辑处理工作
.childHandler(
// 5. NioSocketChannel:连接建立后,数据读写的通道
new ChannelInitializer<NioSocketChannel>() {
/**
* 负责添加别的handler
* @param nioSocketChannel
* @throws Exception
*/
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 将bytebuffer转为字符串
nioSocketChannel.pipeline().addLast(new StringDecoder());
// 自定义的handler
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
/**
* 处理读事件
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
.bind(8000);
}
}
client
public class HelloNettyClient {
public static void main(String[] args) throws InterruptedException {
// 1. 启动类
new Bootstrap()
// 2. 添加循环事件组
.group(new NioEventLoopGroup())
// 3.选择 SocketChannel 的实现类
.channel(NioSocketChannel.class)
// 4. 添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
/**
* 连接建立后被调用,做初始化工作
* @param nioSocketChannel
* @throws Exception
*/
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 将字符串转为bytebuffer
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
}).connect(new InetSocketAddress("localhost", 8000))
.sync()// 阻塞方法,直到连接建立
.channel()// 获取SocketChannel
.writeAndFlush("hello netty server,i am netty client");// 发送数据
}
}
netty 组件
EventLoop
事件循环对象
EventLoop
本质是一个单线程执行器(同时维护了一个selector
),里面有run方法处理channel
上的源源不断的io事件
他的继承关系比较复杂
- 他集成了
java.util.concurrent.ScheduledExecutorService
,因此包含了线程池中所有的方法 - 继承了
netty
的OrderedEventExecutor
- 提供了
Boolean inEventLoop(Thread thread)
方法,判断一个线程是否属于此EventLoop
- 提供了
parent
方法,查看自己属于哪个EventLoopGroup
- 提供了
事件循环组
EventLoopGroup
是一组EventLoop
,Channel一般会调用EventLoopGroup
的register
方法来绑定其中一个EventLoop
,后续这个channel
上的io事件都由此EventLoop
来处理(保证io事件处理时的线程安全)
案例:
package com.wuhm.netty.part2;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.NettyRuntime;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* @author
* @description
* @date 2022-06-11 21:31
**/
@Slf4j
public class TestEventLoopGroup {
public static void main(String[] args) {
System.out.println(NettyRuntime.availableProcessors());
// 1. 创建EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup(2);
// 获取下一个事件循环对象
/**
* io.netty.channel.nio.NioEventLoop@1b40d5f0
* io.netty.channel.nio.NioEventLoop@ea4a92b
* io.netty.channel.nio.NioEventLoop@1b40d5f0
*/
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
group.next().submit(()->{
log.debug("submit is ok");
});
log.debug("main is ok");
group.next().scheduleAtFixedRate(()->{
log.debug("schedule is ok");
}, 0,1, TimeUnit.SECONDS);
}
}
结果
io.netty.channel.nio.NioEventLoop@1b40d5f0
io.netty.channel.nio.NioEventLoop@ea4a92b
io.netty.channel.nio.NioEventLoop@1b40d5f0
21:46:12.319 [main] DEBUG com.wuhm.netty.part2.TestEventLoopGroup - main is ok
21:46:12.320 [nioEventLoopGroup-2-2] DEBUG com.wuhm.netty.part2.TestEventLoopGroup - schedule is ok
21:46:12.320 [nioEventLoopGroup-2-1] DEBUG com.wuhm.netty.part2.TestEventLoopGroup - submit is ok
21:46:13.320 [nioEventLoopGroup-2-2] DEBUG com.wuhm.netty.part2.TestEventLoopGroup - schedule is ok
21:46:14.320 [nioEventLoopGroup-2-2] DEBUG com.wuhm.netty.part2.TestEventLoopGroup - schedule is ok
21:46:15.321 [nioEventLoopGroup-2-2] DEBUG com.wuhm.netty.part2.TestEventLoopGroup - schedule is ok
21:46:16.320 [nioEventLoopGroup-2-2] DEBUG com.wuhm.netty.part2.TestEventLoopGroup - schedule is ok
21:46:17.321 [nioEventLoopGroup-2-2] DEBUG com.wuhm.netty.part2.TestEventLoopGroup - schedule is ok
21:46:18.320 [nioEventLoopGroup-2-2] DEBUG com.wuhm.netty.part2.TestEventLoopGroup - schedule is ok
21:46:19.321 [nioEventLoopGroup-2-2] DEBUG com.wuhm.netty.part2.TestEventLoopGroup - schedule is ok
21:46:20.321 [nioEventLoopGroup-2-2] DEBUG com.wuhm.netty.part2.TestEventLoopGroup - schedule is ok
EventLoopGroupServer
package com.wuhm.netty.part2;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
/**
* @author
* @description
* @date 2022-06-11 21:48
**/
@Slf4j
public class TestEventGroupServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
EventLoopGroupClient
package com.wuhm.netty.part2;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
/**
* @author
* @description
* @date 2022-06-11 21:53
**/
public class TestEventLoopGroupClient {
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
// 在连接建立后被调用,初始化
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
channel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
System.out.println(channel);
System.out.println("");
}
}
分工细化
package com.wuhm.netty.part2;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
/**
* @author
* @description
* @date 2022-06-11 21:48
**/
@Slf4j
public class TestEventGroupServer {
public static void main(String[] args) {
// 自定义事件循环组
DefaultEventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
channel.pipeline().addLast( "handle1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
log.debug(buf.toString(Charset.defaultCharset()));
// 将消交给下一个handle处理
ctx.fireChannelRead(msg);
}
});
// 利用我们自定义的事件循环组来处理数据
channel.pipeline().addLast(group, "handle2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
EventLoop线程切换的原理
channel
channel的主要作用:
- close()可以用来关闭channel
- close()Future() 用来处理channel的关闭
- sync()方法作用是同步等待channel关闭
- addListener()是异步等待channel关闭
- pipeline()方法添加处理器
- write()方法将数据写入
- writeAndFlush方法将数据写入并刷新
ChannelFuture
io.netty.bootstrap.Bootstrap#connect(java.lang.String, int)
:该方法是异步非阻塞的,main线程发起了调用,但是真正执行连接操作的是nio线程
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
}).connect("localhost", 8000);
// channelFuture.sync();
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello netty");
}
}
服务端并不能接收到:hello netty.
采用ChannelFuture处理结果
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
}).connect("localhost", 8000);
// 方式一:sync 阻塞线程,直到nio线程连接建立完毕
// channelFuture.sync();
// Channel channel = channelFuture.channel();
// channel.writeAndFlush("hello netty");
// 方式二,通过addListener 方法异步处理结果
channelFuture.addListener(new ChannelFutureListener() {
/**
* 在Nio线程中,连接建立好了之后会调用该方法
* @param future
* @throws Exception
*/
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello netty");
}
});
}
}
channelFuture关闭问题
@Slf4j
public class ChannelFutureServer {
public static void main(String[] args) {
// 定义一个普通事件,
EventLoop eventLoop = new DefaultEventLoop();
new ServerBootstrap()
// 第一个事件循环中只负责处理连接事件,第二个事件循环组负责SocketChannel上的读写事件
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))// 缺省或获取系统核心数*2
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast("handle1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.info(byteBuf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg);// 将msg传递给后面的handler
}
}).addLast(eventLoop, "handle2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.info(byteBuf.toString(Charset.defaultCharset()));
}
});
}
}).bind(8000);
}
}
client
@Slf4j
public class ChannelFutureClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
}).connect("localhost", 8000);
//sync 阻塞线程,直到nio线程连接建立完毕
channelFuture.sync();
Channel channel = channelFuture.channel();
// 定义一个线程循环接收输入,发送数据到服务器
new Thread(()->{
while(true){
Scanner scanner = new Scanner(System.in);
String nextLine = scanner.nextLine();
if("q".equals(nextLine)){
channel.close();
// log.info("连接关闭后的操作......");
break;
}
channel.writeAndFlush(nextLine);
}
}, "input").start();
// // 获取channel关闭对象
// ChannelFuture closedFuture = channel.closeFuture();
// // 同步等待关闭
// closedFuture.sync();
// log.info("连接关闭后的操作......");
ChannelFuture closedFuture = channel.closeFuture();
closedFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
log.info("连接关闭后的操作......");
// 优雅的终止
group.shutdownGracefully();
}
});
}
}
netty为什么要用异步
场景:4个医生给人看病,每个病人花费20分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人,假设病人源源不断的来,4个医生一天工作8小时,处理病人的总数是:(4 * 8 * (60 /20)) = 96
一个病人看病可以拆分为4步,每一步花费5分钟
4个医生每个人只负责一件事,只有一开始的时候,医生2,3,4分别要等待5,10,15分钟才能执行任务,后面源源不断的病人来了以后,他们就能够满负荷的工作,4 * 8 * (60 / 5) = 384
Future & Promise
在异步处理时,经常用到这两个接口,netty
中的Future
和jdk中的Future同名
,但是是两个接口,netty的Future
继承自jdk的Future
,而Promise
又对netty的Future
进行了扩展
- jdk的Future:只能同步等待任务结束(或成功,或失败)才能得到结果
- netty Future:可以同步等待任务结束得到结果,也可以异步的方式得到结果,但是都要等待任务结束
- netty Promise:不仅有netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
方法名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回null | - |
await | - | 等待任务结束,如果任务失败,不会抛出异常,而是通过isSuccess判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
cause | - | 获取失败信息,非阻塞,如果没有失败信息,返回null | - |
addLinstener | - | 添加回调,异步接收结果 | - |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
案例:netty通过addLinstener异步获取结果
public class NettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(2);
EventLoop eventLoop = eventExecutors.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return 10;
}
});
// System.out.println(future.get());
// 异步获取数据
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
System.out.println(future.getNow());
}
});
}
}
Promise 对象使用
public class PromiseTest {
public static void main(String[] args) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
EventLoop next = eventLoopGroup.next();
// 存储结果的容器
DefaultPromise<Integer> defaultPromise = new DefaultPromise<>(next);
// 存放结果的线程
new Thread(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
defaultPromise.setSuccess(30);
}, "input").start();
// 获取结果的线程
new Thread(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
Integer integer = defaultPromise.get();
System.out.println(integer);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}, "output").start();
}
}
bytebuf
直接内存 & 堆内存
可以使用下面代码来创建池化基于堆的Bytebuf
Bytebuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
也可以使用下面的代码来创建池化基于直接内存的ByteBuf
ByteBuf bytebuf = ByteBufAllocator.DEFAULT.directBuffer(10);
- 直接内存创建和销毁的代价高,但是读写性能高(少一次内存复制),适合配合池化功能一起使用
- 直接内存GC压力小,因为这部分内存不受JVM垃圾回收的管理,但也要注意及时主动释放
池化 & 非池化
池化的最大意义在于可以重用ByteBuf
- 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是创建的堆内存,也会增加GC压力
- 池化以后,可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
-Dio.netty.allocator.type={unpooled|pooled}
- 在netty4.1版本后:非Android平台默认启用池化实现,
组成:
写入
部分方法列表
方法 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) | 写入boolean值 | 用一字节01|00表示true|false |
writeByte(int value) | 写入ByteBuf值 | |
writeInt(int value) | 写入int值 | Big Endian,从低位写入:0x250 -> 00 00 02 50 |
writeIntLE(int value) | 写入int值 | Little Endian,即0x250,写入后50 02 00 00 |
writeLong(long value) | 写入long值 | |
writeChar(long value) | 写入char值 | |
writeFloat(float value) | 写入float值 | |
writeDouble(double value) | 写入double值 | |
writeBytes(ByteBuf[] value) | 写入netty的ByteBuf | |
writeBytes(byte[] value) | 写入byte[] | |
int writeCharSequence(CharSequence sequence, Charset charset) | 写入字符串 |
扩容
扩容规则:(初始化容量是10)
- 如果写入后的数据大小未超过512,则选择下一个16的整数倍,例如写入后大小为12,则扩容后capacity是16
- 如果写入后数据大小超过512,则选择下一个2^n,例如写入后大小为513,则扩容后capacity是2^10=1024
内存释放
由于netty中有堆外内存的ByteBuf实现,堆外内存最好是手动来释放,而不是等GC垃圾回收。
- UnpooledHeapByteBuf使用的是JVM内存,只需要等待GC回收内存即可
- UnpooledDirectByteBuf使用的是直接内存,需要特殊的方法来回收内存
- PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存
protected abstract void deallocate()
Netty这里采用了引用计数法来控制回收内存,每ByteBuf都实现了ReferenceCounted接口
- 每个ByteBuf对象的初始计数为1
- 调用release方法计数减一,如果计数为0,ByteBuf内存被回收
- 调用retain方法计数加一,表示调用者没有用完之前,其它handler即便调用了release也不会造成回收
- 当计数为0时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用
零拷贝slice
零拷贝体现之一,对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发生内存复制,还是使用原始ByteBuf的内存,切片后的ByteBuf维护独立的read,write指针
图示
现在有一个Bytebuf,需平均分成2分
public class BytebufTest {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
log(buffer);
// 切割成两个Bytebuf
ByteBuf slice1 = buffer.slice(0, 5);
ByteBuf slice2 = buffer.slice(5, 5);
log(slice1);
log(slice2);
System.out.println("------------------------");
// 验证
slice1.setByte(1, 'w');
log(slice1);
log(buffer);
}
private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
}
结果:
read index:0 write index:10 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 66 67 68 69 6a |abcdefghij |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 |abcde |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 67 68 69 6a |fghij |
+--------+-------------------------------------------------+----------------+
------------------------
read index:0 write index:5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 77 63 64 65 |awcde |
+--------+-------------------------------------------------+----------------+
read index:0 write index:10 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 77 63 64 65 66 67 68 69 6a |awcdefghij |
+--------+-------------------------------------------------+----------------+