缓存你知道,堆外缓存池你知道吗?

2023年 7月 25日 72.7k 0

1.引言

Hi,大家好,我是有清

缓存我们都知道,在我们日常的工作过程中一般都有使用,一般我们使用的都是堆内缓存,但是我一提到堆外缓存,想必

疑惑.png

堆外缓存顾名思义,这部分数据是放在堆内存以外,因为堆外内存并不在 GC 的工作范围之内,所以可以有效的避免缓存过大对于 GC 的影响,听起来好像很棒的样子

但是我们今天讲的堆外缓存池更棒,它可以使我们的 RocketMQ 性能加倍,那它具体是如何助力 RocketMQ,让我来 see 一 see

2.设计背景

先来个 🌰 : RocketMQ 中的消息文件,我们如何设计可以提升它的写入速度呢?

第一步:暴力写入本地文件:我们将消息直接写到本地文件,然后用户需要消费信息的时候,去文件进行 IO 读写即可

我们知道在 Java 程序中,进行读取文件这样的 IO 操作的时候,一般都会涉及到用户态和内核态的切换,在我们第一次读取这个消息文件的时候,发现缓存中没有数据,那么系统就会切换到内核态,操作系统程序,将文件拷贝到缓存中。然后再切换回用户态,执行用户程序将该内容拷贝到 Java 内存里,这样 Java 程序就能读到文件内容

弊端在于用户态和内核态的切换过于消耗 CPU 资源,并且 RocketMQ 中存在大量这样的 IO 操作,所以这个方法不是最优雅的实现

第二步:借助 Linux 中的 mmap 机制:mmap 简单来说,就是将文件信息映射到内存中,进程可以通过操作内存的方式,实现对文件的操作

这样做的好处直接避免了用户态和内核态的切换,用户操作文件会变的非常丝滑,但是尽管 mmap 机制使用的也是堆外缓存,但是在某些高并发的场景下,进程会将堆外内存交换到磁盘

可恶.png

第三步:借助堆外缓存池:单独创建一个内存缓存池,用来临时存储数据,数据首先写入该池子,然后由 commit 线程定时借助 mmap 机制进行虚拟内存和文件映射,这样的话就将我们的堆外内存一直牢牢锁定在堆外,避免被交换到磁盘上

3.代码分析

设计背景已经清晰了,我们就看看这个堆外缓存池具体是怎么实现的

代码地址:org.apache.rocketmq.store.TransientStorePool
(差评代码上没有注释,只有版权声明)

TransientStorePool 实现的很简单,我们先整体看一下

初始化方法

/**  
 * It's a heavy init method. 
 * 它自己也承认了 它自己是一个很重的方法,这个初始化方法会默认情况下会向系统索要 5g 的堆外内存, poolSize 默认为 5,fileSize 默认为 1g
 */
 public void init() {
    // 这个 poolSize 默认是 5,由 MessageStoreConfig() 决定
    for (int i = 0; i < poolSize; i++) {  
        // 申请直接缓冲区(这部分位于堆外)
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);  
        // 获取直接缓冲区的内存地址
        final long address = ((DirectBuffer) byteBuffer).address();
        // 构造一个本地指针对象 
        Pointer pointer = new Pointer(address);  
        // 调用 mlock 锁定该文件的 Page Cache,防止其被交换到 swap 空间
        LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));  
        // 放入队列,取的时候从 availableBuffers 中取,同样哪里取的哪里还
        availableBuffers.offer(byteBuffer);  
    }  
}

销毁方法

/**  
 * 销毁方法:队列中有多少数据,就解锁多少,然后系统会将剩下的数据归还到 swap 空间,
 * 一般只有 shuwDown 的时候调用该方法 */
 public void destroy() {  
    for (ByteBuffer byteBuffer : availableBuffers) {  
        final long address = ((DirectBuffer) byteBuffer).address();  
        Pointer pointer = new Pointer(address);  
        LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));  
    }  
}

申请缓冲区

  
/**  
 * 借出缓冲区的数据来写数据 */
 public ByteBuffer borrowBuffer() {  
    ByteBuffer buffer = availableBuffers.pollFirst();  
    if (availableBuffers.size() < poolSize * 0.4) {  
        log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());  
    }  
    return buffer;  
}

归还缓冲区

/**  
 * 对应 DefaultMappedFile 中的 commit,刷盘成功后归还数据 */
 public void returnBuffer(ByteBuffer byteBuffer) {  
    byteBuffer.position(0);  
    byteBuffer.limit(fileSize);  
    this.availableBuffers.offerFirst(byteBuffer);  
}

DefaultMappedFile 使用

接下来我们看看在实际中,是如何使用 TransientStorePool 的

org.apache.rocketmq.store.logfile.DefaultMappedFile

在 DefaultMappedFile 的 init 中,如果你指定来使用 TransientStorePool,那么 DefaultMappedFile 中的 writeBuffer,将默认调用 transientStorePool.borrowBuffer() ,即上文中的申请缓冲区方法,使用堆外缓冲区

具体使用我们可以参考 DefaultMappedFile 中 appendMessage,

代码示例一.png

开启堆外缓存主从同步,异步刷盘 rt 变高?

这一个 Issue 还蛮有意思的, 所以单独拎出来讲讲

github 上的一个老哥反馈自己的机器32g内存 -Xms16g -Xmx16g -Xmn8g,模式是主从同步异步刷盘,TransientStorePool 不开启时生产 rt 不到2ms,开启后 > 100ms

莫非我们的 TransientStorePool 是一把双刃剑?降低了 GC,拉高了 RT

另外一位老哥给出了回答:

其实核心代码在于同步 slave 中耗时过长,其中 HAService#run中selector.select(1000)花费了大量时间,大概200ms

        public void run() {
            log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    if (this.connectMaster()) {

                        if (this.isTimeToReportOffset()) {
                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                            if (!result) {
                                this.closeMaster();
                            }
                        }

                        this.selector.select(1000);

                        boolean ok = this.processReadEvent();
                        if (!ok) {
                            this.closeMaster();
                        }

                        if (!reportSlaveMaxOffsetPlus()) {
                            continue;
                        }
                        // 省略....
        }

简单来说就是,开启TransientStorePool后,master写入变成 先写堆外内存 ,然后批量 commit 到 FileChannel写入,而主从同步判断能同步到的消息是已经 commit 到 FileChannel 的消息,而消息由堆外内存 commit 到 PageCache 是有一定频率的,是受commitIntervalCommitLog,commitCommitLogThoroughInterval两个参数影响,默认值是200ms。
我们可以做的是将c ommitIntervalCommitLog,commitCommitLogThoroughInterval 调小,然后通过不断的压测去找到最合适的参数即可

4.闲言碎语

打个广告:飞猪旅行 p5、p6 内推,流程快,免费简历指导,实时进度跟踪

今天看了b站里的一个视频

一个 up 主,在北京海拔最高的山顶,开了一家“一日快闪”咖啡店,一杯咖啡售价为 “1个故事”

其中有一个开面馆的小哥分享的故事是,他最早想开一个面馆,然后觉得当时开面馆的契机不是最好的,他想晚两年,等自己成熟一点再去开这个面馆

其实我感觉这个小哥的状态和大多数人的状态其实都挺像的,想要等事情成熟了、自己有信心了,准备好了再上手,但其实有时候生活中的一些机会就是会这样过去,或许只有想做的时候就勇敢的去做,像尝试的时候就勇敢的去尝试,这样生活才能叫做生活

结尾图.png

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论