1.引言
Hi,大家好,我是有清
缓存我们都知道,在我们日常的工作过程中一般都有使用,一般我们使用的都是堆内缓存,但是我一提到堆外缓存,想必
堆外缓存顾名思义,这部分数据是放在堆内存以外,因为堆外内存并不在 GC 的工作范围之内,所以可以有效的避免缓存过大对于 GC 的影响,听起来好像很棒的样子
但是我们今天讲的堆外缓存池更棒,它可以使我们的 RocketMQ 性能加倍,那它具体是如何助力 RocketMQ,让我来 see 一 see
2.设计背景
先来个 🌰 : RocketMQ 中的消息文件,我们如何设计可以提升它的写入速度呢?
第一步:暴力写入本地文件:我们将消息直接写到本地文件,然后用户需要消费信息的时候,去文件进行 IO 读写即可
我们知道在 Java 程序中,进行读取文件这样的 IO 操作的时候,一般都会涉及到用户态和内核态的切换,在我们第一次读取这个消息文件的时候,发现缓存中没有数据,那么系统就会切换到内核态,操作系统程序,将文件拷贝到缓存中。然后再切换回用户态,执行用户程序将该内容拷贝到 Java 内存里,这样 Java 程序就能读到文件内容
弊端在于用户态和内核态的切换过于消耗 CPU 资源,并且 RocketMQ 中存在大量这样的 IO 操作,所以这个方法不是最优雅的实现
第二步:借助 Linux 中的 mmap 机制:mmap 简单来说,就是将文件信息映射到内存中,进程可以通过操作内存的方式,实现对文件的操作
这样做的好处直接避免了用户态和内核态的切换,用户操作文件会变的非常丝滑,但是尽管 mmap 机制使用的也是堆外缓存,但是在某些高并发的场景下,进程会将堆外内存交换到磁盘
第三步:借助堆外缓存池:单独创建一个内存缓存池,用来临时存储数据,数据首先写入该池子,然后由 commit 线程定时借助 mmap 机制进行虚拟内存和文件映射,这样的话就将我们的堆外内存一直牢牢锁定在堆外,避免被交换到磁盘上
3.代码分析
设计背景已经清晰了,我们就看看这个堆外缓存池具体是怎么实现的
代码地址:org.apache.rocketmq.store.TransientStorePool
(差评代码上没有注释,只有版权声明)
TransientStorePool 实现的很简单,我们先整体看一下
初始化方法
/**
* It's a heavy init method.
* 它自己也承认了 它自己是一个很重的方法,这个初始化方法会默认情况下会向系统索要 5g 的堆外内存, poolSize 默认为 5,fileSize 默认为 1g
*/
public void init() {
// 这个 poolSize 默认是 5,由 MessageStoreConfig() 决定
for (int i = 0; i < poolSize; i++) {
// 申请直接缓冲区(这部分位于堆外)
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
// 获取直接缓冲区的内存地址
final long address = ((DirectBuffer) byteBuffer).address();
// 构造一个本地指针对象
Pointer pointer = new Pointer(address);
// 调用 mlock 锁定该文件的 Page Cache,防止其被交换到 swap 空间
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
// 放入队列,取的时候从 availableBuffers 中取,同样哪里取的哪里还
availableBuffers.offer(byteBuffer);
}
}
销毁方法
/**
* 销毁方法:队列中有多少数据,就解锁多少,然后系统会将剩下的数据归还到 swap 空间,
* 一般只有 shuwDown 的时候调用该方法 */
public void destroy() {
for (ByteBuffer byteBuffer : availableBuffers) {
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
}
}
申请缓冲区
/**
* 借出缓冲区的数据来写数据 */
public ByteBuffer borrowBuffer() {
ByteBuffer buffer = availableBuffers.pollFirst();
if (availableBuffers.size() < poolSize * 0.4) {
log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
}
return buffer;
}
归还缓冲区
/**
* 对应 DefaultMappedFile 中的 commit,刷盘成功后归还数据 */
public void returnBuffer(ByteBuffer byteBuffer) {
byteBuffer.position(0);
byteBuffer.limit(fileSize);
this.availableBuffers.offerFirst(byteBuffer);
}
DefaultMappedFile 使用
接下来我们看看在实际中,是如何使用 TransientStorePool 的
org.apache.rocketmq.store.logfile.DefaultMappedFile
在 DefaultMappedFile 的 init 中,如果你指定来使用 TransientStorePool,那么 DefaultMappedFile 中的 writeBuffer,将默认调用 transientStorePool.borrowBuffer() ,即上文中的申请缓冲区方法,使用堆外缓冲区
具体使用我们可以参考 DefaultMappedFile 中 appendMessage,
开启堆外缓存主从同步,异步刷盘 rt 变高?
这一个 Issue 还蛮有意思的, 所以单独拎出来讲讲
github 上的一个老哥反馈自己的机器32g内存 -Xms16g -Xmx16g -Xmn8g,模式是主从同步异步刷盘,TransientStorePool 不开启时生产 rt 不到2ms,开启后 > 100ms
莫非我们的 TransientStorePool 是一把双刃剑?降低了 GC,拉高了 RT
另外一位老哥给出了回答:
其实核心代码在于同步 slave 中耗时过长,其中 HAService#run中selector.select(1000)花费了大量时间,大概200ms
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
if (this.connectMaster()) {
if (this.isTimeToReportOffset()) {
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
this.selector.select(1000);
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
// 省略....
}
简单来说就是,开启TransientStorePool后,master写入变成 先写堆外内存 ,然后批量 commit 到 FileChannel写入,而主从同步判断能同步到的消息是已经 commit 到 FileChannel 的消息,而消息由堆外内存 commit 到 PageCache 是有一定频率的,是受commitIntervalCommitLog,commitCommitLogThoroughInterval两个参数影响,默认值是200ms。
我们可以做的是将c ommitIntervalCommitLog,commitCommitLogThoroughInterval 调小,然后通过不断的压测去找到最合适的参数即可
4.闲言碎语
打个广告:飞猪旅行 p5、p6 内推,流程快,免费简历指导,实时进度跟踪
今天看了b站里的一个视频
一个 up 主,在北京海拔最高的山顶,开了一家“一日快闪”咖啡店,一杯咖啡售价为 “1个故事”
其中有一个开面馆的小哥分享的故事是,他最早想开一个面馆,然后觉得当时开面馆的契机不是最好的,他想晚两年,等自己成熟一点再去开这个面馆
其实我感觉这个小哥的状态和大多数人的状态其实都挺像的,想要等事情成熟了、自己有信心了,准备好了再上手,但其实有时候生活中的一些机会就是会这样过去,或许只有想做的时候就勇敢的去做,像尝试的时候就勇敢的去尝试,这样生活才能叫做生活