ByteBuf源码分析(三十三)

更新时间:2023-05-12 01:55:49 阅读: 评论:0

ByteBuf源码分析(三⼗三)
今天进⾏ByteBuf源码分析:
⼀、前⾔简介:
当我们进⾏数据传输的时候,往往需要使⽤到缓冲区,常⽤的缓冲区就是 JDK NIO 类库提供的 java.nio.Buffer。实际上,7 种基础类型(Boolean 除外)都有⾃⼰的缓冲区实现。对于 NIO 编程⽽⾔,我们主要使⽤的是 ByteBuffer。从功能⾓度⽽⾔,ByteBuffer 完全可以满⾜ NIO 编程的需要,但是由于 NIO 编程的复杂性,ByteBuffer 也有其局限性,它的主要缺点如下。
( 1) ByteBuffer 长度固定,⼀旦分配完成,它的容量不能动态扩展和收缩,当需要编码的 POJO 对象⼤于 ByteBuffer 的容量时,会发⽣索引越界异常;
( 2) ByteBuffer 只有⼀个标识位置的指针 position,读写的时候需要⼿⼯调⽤ flip()和 rewind()等,使⽤者必须⼩⼼谨慎地处理这些API,否则很容易导致程序处理失败;
(3)ByteBuffer 的 API 功能有限,⼀些⾼级和实⽤的特性它不⽀持,需要使⽤者⾃⼰编程实现。
为了弥补这些不⾜,Netty 提供了⾃⼰的 ByteBuffer 实现——ByteBuf
⼆、ByteBuf 的简介
1、⾸先,ByteBuf 依然是个 Byte 数组的缓冲区,它的基本功能应该与 JDK 的 ByteBuffer ⼀致,提供以下⼏类基本功能。
7 种 Java 基础类型、byte 数组、ByteBuffer (ByteBuf〉等的读写;缓冲区⾃⾝的 copy 和 slice 等; 设置⽹络字节序; 构造缓冲区实例;操作位置指针等⽅法。
2、由于 JDK 的 ByteBuffer 已经提供了这些基础能⼒的实现,因此,Netty ByteBuf 的实现可以有两种策略。
1)、参考 JDK ByteBuffer 的实现,增加额外的功能,解决原 ByteBuffer 的缺点;
2)、聚合 JDK ByteBuffer,通过门⾯模式对其进⾏包装,可以减少⾃⾝的代码量,降低实现成本。
Netty ⾥两种⽅式都⽤了。
三、ByteBuf 的类层次关系
类所在的包⽬录:
由于 ByteBuf 的实现⾮常繁杂,因此我们不会对其所有⼦类都进⾏穷举分析,我们挑选ByteBuf 的主要接⼝实现类和主要⽅法进⾏分析说明。
1、从内存分配的⾓度看,ByteBuf 可以分为两类。
(1)堆内存(HeapByteBuf)字节缓冲区:特点是内存的分配和回收速度快,可以被 JVM ⾃动回收;缺点就是如果进⾏ Socket 的 I/O 读写,需要额外做⼀次内存复制,将堆内存对应的缓冲区复制到内核 Channel 中,性能会有⼀定程度的下降。
(2)直接内存(DirectByteBuf)字节缓冲区:⾮堆内存,它在堆外进⾏内存分配,相⽐于堆内存,它的分配和回收速度会慢⼀些,但是将它写⼊或者从 Socket Channel 中读取时,由于少了⼀次内存复制,速度⽐堆内存快。
正是因为各有利弊,所以 Netty 提供了多种 ByteBuf 供开发者使⽤,经验表明,ByteBuf 的最佳实践是在 IO 通信线程的读写缓冲区使⽤DirectByteBuf,后端业务消息的编解码模块使⽤ HeapByteBuf,这样组合可以达到性能最优。
2、从内存回收⾓度看,ByteBuf 也分为两类:
基于对象池的 ByteBuf 和普通 ByteBuf。两者的主要区别就是基于对象池的 ByteBuf 可以重⽤ ByteBuf 对象,它⾃⼰维护了⼀个内存池,可以循环利⽤创建的 ByteBuf,提升内存的使⽤效率,降低由于⾼负载导致的频繁 GC。测试表明使⽤内存池后的 Netty 在⾼负载、⼤并发的冲击下内存和 GC
更加平稳。 尽管推荐使⽤基于内存池的 ByteBuf,但是内存池的管理和维护更加复杂,使⽤起来也需要更加谨慎,因此,Netty 提供了灵活的策略供使⽤者来做选择。
其实,ByteBuf 的分类还有⼀种,对内存的操作⽅式:我们知道,jdk 提供了底层的 Unsafe 类进⾏读写,通过 Unsafe 类可以直接拿到对象的内存地址,并且基于这个内存地址进⾏读写操作。Netty 也提供了相关的 ByteBuf 的实现,UnpooledUnsafeDirectByteBuf、PooledUnsafeDirectByteBuf、UnpooledUnsafeHeapByteBuf、PooledUnsafeHeapByteBuf,基本上都是直接调⽤ unsafe 对象的 native API,根据数组对象与偏移量来获取数据,效率相对⽽⾔较⾼。
通过前⾯ Netty 的使⽤中,我们已经知道,在我们的程序中获得 ByteBuf 的实例 Netty提供了两种⽅式 ByteBufAllocator 接⼝和Unpooled ⼯具类,对它们的了解也是必须的。
四、ByteBufAllocator 的类层次关系
ByteBufAllocator 类的层次关系如下:
1、 在具体的实现类 UnpooledByteBufAllocator 内部使⽤了 ByteBuf 的实例化⽅法。⽽ Unpooled 实际上在内部进⾏ ByteBuf 的实例化时,也是使⽤的 UnpooledByteBufAllocator 来进⾏的。
public final class Unpooled {
private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;
/**
* Big endian byte order.
*/
public static final ByteOrder BIG_ENDIAN = ByteOrder.BIG_ENDIAN;
/**
* Little endian byte order.
*/
public static final ByteOrder LITTLE_ENDIAN = ByteOrder.LITTLE_ENDIAN;
}
PooledByteBufAllocator 相对来说要特殊⼀点。
2、AbstractByteBuf 的源码
ByteBuf ⾥都是抽象⽅法,所以我们⾸先看 AbstractByteBuf, ByteBuf 的⼀些公共属性和功能会在 AbstractByteBuf 中实现,下⾯我们对其属性和重要代码进⾏分析解读。
1)成员变量
⾸先,像读索引、写索引、mark、最⼤容量等公共属性需要定义,相关源码:
static final ResourceLeakDetector<ByteBuf> leakDetector =
ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ByteBuf.class);
int readerIndex;
int writerIndex;
private int markedReaderIndex;
private int markedWriterIndex;
private int maxCapacity;
我们发现,在 AbstractByteBuf 中并没有定义 ByteBuf 的缓冲区实现,例如 byte 数组或者 DirectByteBuffer。原因显⽽易见,因为AbstractByteBuf 并不清楚⼦类到底是基于堆内存还是直接内存,因此⽆法提前定义。
2)读操作
⽆论⼦类如何实现 ByteBuf,例如 UnpooledHeapByteBuf 使⽤ byte 数组表⽰字节缓冲区,UnpooledDirectByteBuf 直接使⽤ByteBuffer,它们的功能都是相同的,操作的结果是等价的。因此,读操作以及其他的⼀些公共功能都由⽗类实现,差异化功能由⼦类实现,这也就是抽象和继承的价值所在。当然与读操作相关的⽅法很多,我们选择性的看看 readBytes(byte[] dst, int dstIndex, int length)
@Override
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
checkReadableBytes(length);
getBytes(readerIndex, dst, dstIndex, length);
readerIndex += length;
return this;
}
在读之前,⾸先对缓冲区的可⽤空间进⾏校验,校验通过之后,调⽤ getBytes ⽅法,从当前的读索引开始,复制 length 个字节到⽬标byte 数组中。由于不同的⼦类复制操作的技术实现细节不同,因此该⽅法由⼦类实现:
如果读取成功,需要对读索引进⾏递增:readerIndex += length。其他类型的读取操作与之类似。
3)写操作
实例化⼀个 ByteBuf 对象的时候,是可以设置⼀个 capacity 和⼀个 maxCapacity,当writerIndex 达到 capacity 的时候,再往⾥⾯写⼊内容,ByteBuf 就会进⾏扩容。与读取操作类似,写操作的公共⾏为在 AbstractByteBuf 中实现。我们选择与读取配套的
writeBytes(byte[ ] src, int srcIndex, int length)进⾏分析,它的功能是将源字节数组中从 srcIndex 开始,到 srcIndex + length 截⽌的字节数组写⼊到当前的ByteBuf 中。
@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
ensureWritable(length);
tBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}
⾸先对写⼊字节数组的长度进⾏合法性校验以及扩容处理。
@Override
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
ensureWritable0(minWritableBytes);
return this;
}
这⾥的 minWritableBytes 代表的是写的长度,校验 length 是否 >= 0, 不满⾜则报错。
final void ensureWritable0(int minWritableBytes) {
ensureAccessible();
if (minWritableBytes <= writableBytes()) {
return;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// Normalize the current capacity to the power of 2.
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
// Adjust to the new capacity.
capacity(newCapacity);
}
⾸先计算并检查容量⼤⼩是否超过了 maxCapacity,如果超过了则抛出⼀个 IndexOutOfBoundsException 异常。calculateNewCapacity()⽅法则是实现扩容的核⼼, 这个⽅法的整体思路是:有⼀个控制阈值,系统定义为 4M(这是个经验值)。⾸先还是进⾏各种检查,如果容量较⼩,2 的指数倍递增是较为合理的,因为不会造成很⼤的内存浪费,⽽且可以减少反复扩容的次数;如果容量⽐较⼤时,⽐如 10M 的空间,进⾏ 2 的指数倍新增后容量为 20M,所以这⾥的就造成了空间的浪费,⼀般都是先进⾏指数倍递增(64->128->256->512…..),到达阀值后 4M,按阀值的步进递增。
五、AbstractReferenceCountedByteBuf
从类的名字就可以看出该类主要是对引⽤进⾏计数,类似于JVM 内存回收的对象引⽤计数器,⽤于跟踪对象的分配和销毁,做⾃动内存回收。在具体的实现上,使⽤了Java 并发编程⾥的 CAS 原⼦类型。
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
private volatile int refCnt;
protected AbstractReferenceCountedByteBuf(int maxCapacity) {
super(maxCapacity);
refCntUpdater.t(this, 1);
}
@Override
public int refCnt() {
return refCnt;
}
⾸先看第⼀个字段 refCntUpdater,它是 AtomicIntegerFieldUpdater 类型变量,通过原⼦的⽅式对成员变量进⾏更新等操作,以实现线程安全,消除锁。最后定义了⼀个 volatile 修饰的 refCnt 字段⽤于跟踪对象的引⽤次数,使⽤ volatile 是为了解决多线程并发访问的可见性问题,此处不对 volatile 的⽤法展开说明。每调⽤⼀次 retain ⽅法,引⽤计数器就会增加,具体的增加则是使⽤了原⼦类型的getAndAdd,这是个线程安全的⽅法。每调⽤⼀次 relea ⽅法,引⽤计数器就会减少,同时,如果发现引⽤计数为 0,则会进⾏缓存的释放 deallocate,这个⽅法则是由⼦类负责实现的。
六、UnpooledHeapByteBuf
UnpooledHeapByteBuf 是基于堆内存进⾏内存分配的字节缓冲区,它没有基于对象池技术实现,这就意味着每次 IO 的读写都会创建⼀个新的 UnpooledHeapByteBuf,频繁进⾏⼤块内存的分配和回收对性能会造成⼀定影响,但是相⽐于堆外内存的申请和释放,它的成本还是会低⼀些。相⽐于 PooledHeapByteBuf,UnpooledHeapByteBuf 的实现原理更加简单,也不容易出现 内存管理⽅⾯的
问题,因此在满⾜性能的情况下,推荐使⽤ UnpooledHeapByteBuf。⾸先看下 UnpooledHeapByteBuf 的成员变量定义
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
private final ByteBufAllocator alloc;
byte[] array;
private ByteBuffer tmpNioBuf;
⾸先,它聚合了⼀个 ByteBufAllocator,⽤于 UnpooledHeapByteBuf 的内存分配,紧接着定义了⼀个 byte 数组作为缓冲区,最后定义了⼀个 ByteBuffer 类型的 tmpNioBuf 变量⽤于实现 Netty ByteBuf 到 JDK NIO ByteBuffer 的转换。事实上,如果使⽤ JDK 的ByteBuffer 替换 byte 数组也是可⾏的,直接使⽤ byte 数组的根本原因就是提升性能和更加便捷地进⾏位操作。JDK 的 ByteBuffer 底层实现也是 byte 数组。知道了这个类由哪些成员变量,那么成员⽅法基本上都是围绕着这个 byte 数组进⾏操作,包括在⽗类AbstractByteBuf 中没有实现的 tBytes,getBytes 等等。 转换成 JDK ByteBuffer,其实也很简单。因为 ByteBuf 基于 byte 数组实现,NIO 的 ByteBuffer 提供了 wrap ⽅法,可以将 byte 数组转换成 ByteBuffer 对象。
@Override
public ByteBuffer nioBuffer(int index, int length) {
ensureAccessible();
return ByteBuffer.wrap(array, index, length).slice();
}
七、UnpooledDirectByteBuf 源码:

本文发布于:2023-05-12 01:55:49,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/593229.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:内存   实现   缓冲区   操作   分配   数组   对象
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图