34.RocketMQ之Broker端消息存储流程详解

2023年 7月 13日 62.8k 0

Broker消息存储概要设计

RocketMQ主要存储的文件包括Commitlog文件,ConsumeQueue文件,IndexFile文件。

RMQ把所有主题的消息存储在同一个文件中,确保消息发送时顺序写文件。

为了提高消费效率引入了ConsumeQueue消息队列文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。

IndexFile索引文件,其主要设计理念就是为了加速消息的检索性能,根据消息的属性快速从Commitlog文件中检索消息。

初识Broker消息存储

消息存储实现类:org.apache.rocketmq.store.DefaultMessageStore,它是存储模块里面最重要的一个类,包含了很多对存储文件的操作API。

    // 消息存储配置属性    
    private final MessageStoreConfig messageStoreConfig;
    // CommitLog 文件的存储实现类
    private final CommitLog commitLog;
    // 消息队列存储缓存表,按消息主题分组
    private final ConcurrentMap consumeQueueTable;
    // ConsumeQueue刷盘线程
    private final FlushConsumeQueueService flushConsumeQueueService;
    // 清除CommitLog文件服务
    private final CleanCommitLogService cleanCommitLogService;
    // 清除ConsumeQueue文件服务
    private final CleanConsumeQueueService cleanConsumeQueueService;
    // 索引文件实现类
    private final IndexService indexService;
    // MappedFile分配服务
    private final AllocateMappedFileService allocateMappedFileService;
    // CommitLog消息分发,根据CommitLog文件构建ConsumeQueue,IndexFile文件
    private final ReputMessageService reputMessageService;
    // 存储HA机制
    private final HAService haService;
    // 消息堆内存缓存
    private final TransientStorePool transientStorePool;
    // 消息拉取长轮询模式消息达到监听器
    private final MessageArrivingListener messageArrivingListener;
    // Broker属性配置类
    private final BrokerConfig brokerConfig;
    // 文件刷盘监测点
    private StoreCheckpoint storeCheckpoint;
    // CommitLog文件转发请求
    private final LinkedList dispatcherList;

消息发送存储流程

Step1:如果Broker停止工作或者为Slave,则拒绝消息写入。

如果消息长度超过256个字符,消息属性长度超过65536个字符将拒绝改消息的写入。

Step2:如果消息的延迟级别大于0,将消息的原主题名称与原消息队列ID存入消息属性中,用延迟消息主题SCHEDULE_TOPIC、消息队列ID更新原先消息的主题与队列,这是并发消息消费重试关键的一步。

Step3:获取当前可以写入的Commitlog文件。每一个文件默认1G,一个文件写满后再创建另外一个,以该文件中第一个偏移量为文件名,偏移量小于20位用0补齐。第一个文件初始偏移量为0,第二个文件的偏移量是1073741824,代表该文件中的第一条消息的物理偏移量为1073741824,这样就能快速定位到消息。

Step4:对于rocketmq来说,存储消息的主要文件被称为CommitLog,因此就从该类入手。处理存储请求的入口方法是asyncPutMessage,主要流程如下:

    public CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) { 	
    	//可能会有多个线程并发请求,虽然支持集群,但是对于每个单独的broker都是本地存储,所以内存锁就足够了
        putMessageLock.lock();
        try {
        	//获取最新的文件
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            ...
            //如果文件为空,或者已经存满,则创建一个新的commitlog文件
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
            }
            ...
            //调用底层的mappedFile进行出处,但是注意此时还没有刷盘
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            ...
        } finally {
            putMessageLock.unlock();
        }
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    }

在写入CommitLog之前,先申请putMessageLock,也就是将消息存储到CommitLog文件中是串行的。

可能会有多个线程并发请求,虽然支持集群,但是对于每个单独的broker都是本地存储,所以单机内存锁就足够。

接口PutMessageLock

    public interface PutMessageLock {
        void lock();
        void unlock();
    }

重入实现PutMessageReentrantLock

    public class PutMessageReentrantLock implements PutMessageLock {
        private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync

        @Override
        public void lock() {
            putMessageNormalLock.lock();
        }

        @Override
        public void unlock() {
            putMessageNormalLock.unlock();
        }
    }

自旋实现PutMessageSpinLock

    public class PutMessageSpinLock implements PutMessageLock {
        //true: Can lock, false : in lock.
        private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

        @Override
        public void lock() {
            boolean flag;
            do {
                flag = this.putMessageSpinLock.compareAndSet(true, false);
            }
            while (!flag);
        }

        @Override
        public void unlock() {
            this.putMessageSpinLock.compareAndSet(false, true);
        }
    }

异步刷盘建议用自旋锁,同步刷盘建议用重入锁:useReentrantLockWhenPutMessage默认false使用自旋锁;

异步刷盘建议开启TransientStorePoolEnable;建议关闭transferMsgByHeap,提高拉消息效率;

同步刷盘建议适当增大sendMessageThreadPoolNums,具体配置需要经过压测。

自旋锁

互斥同步对性能最大的影响是阻塞的实现,挂起线程和恢复线程的操作都需要转入内核态中完成,这些操作给Java虚拟机的并发性能带来了很大的压力,共享数据的锁定状态只会持续很短的一段时间,为了这段时间去挂起和恢复线程并不值得。

如果物理机器有一个以上的处理器或者处理器核心,能让两个或以上的线程同时并行执行,我们就可以让后面请求锁的那个线程“稍等一会”,但不放弃处理器的执行时间,看看持有锁的线程是否很快就会释放锁。为了让线程等待,我们只须让线程执行一个循环(自旋),这项技术就是所谓的自旋锁。

自旋等待本身虽然避免了线程切换的开销,但它是要占用处理器时间的,所以如果锁被占用的时间很短,自旋等待的效果就会非常好,反之如果锁被占用的时间很长,那么自旋的线程只会白白消耗处理器资源,而不会做任何有价值的工作,这就会带来性能的浪费。

也就是说我们要权衡自旋等待、线程的用户态与内核态切换的开销,哪个更大?

重入锁

当一个线程要获取一个被其他线程持有的独占锁时,该线程会被阻塞,那么当一个线程再次获取它自己己经获取的锁时是否会被阻塞呢?如果不被阻塞,那么我们说该锁是可重入的。

了解了自旋锁和可重入锁,我们在看看如何回答:为什么建议“异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁”?

同步刷盘时,锁竞争激烈,会有较多的线程处于等待阻塞等待锁的状态,如果采用自旋锁会浪费很多的CPU时间,所以“同步刷盘建议使用重入锁”。

异步刷盘是间隔一定的时间刷一次盘,锁竞争不激烈,不会存在大量阻塞等待锁的线程,偶尔锁等待就自旋等待一下很短的时间,不要进行上下文切换了,所以采用自旋锁更合适。

Step5:设置消息的存储时间,如果mappedFile为空,表明 /commitlog 目录下不存在任何文件,说明本次消息是第一次消息发送,用偏移量0创建第一个 commit 文件。

messageExtBatch.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
}
if (null == mappedFile) {
    log.error("Create mapped file1 error");
    beginTimeInLock = 0;
 return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}

Step6:将消息追加到 MappedFile 中。首先先获取 MappedFile 当前写指针,如果currentPos大于或等于文件大小则表明文件已写满,抛出UNKNOWN_ERROR。如果currentPos小于文件大小,通过 slice() 方法创建一个与MappedFile的贡献内存区,并设置position为当前指针。

int currentPos = this.wrotePosition.get();

if (currentPos < this.fileSize) {
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    byteBuffer.position(currentPos);
    AppendMessageResult result;
    if (messageExt instanceof MessageExtBrokerInner) {
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
    } else if (messageExt instanceof MessageExtBatch) {
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
    } else {
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
    this.wrotePosition.addAndGet(result.getWroteBytes());
    this.storeTimestamp = result.getStoreTimestamp();
    return result;
}

Step7:创建全局唯一消息ID,消息ID有16字节,消息ID前4字节是IP,中间4字节是端口号,最后8字节是消息偏移量。

    long wroteOffset = fileFromOffset + byteBuffer.position();
    this.resetByteBuffer(storeHostHolder, storeHostLength);
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);

但是为了消息ID可读性,返回给应用程序的msgId为字符类型,可以通过UtilAll.byte2string方法将msgId字节数组转换成字符串,通过UtilAll.string2bytes方法将msgId字符串还原成16个字节的字节数组,从而根据提取消息偏移量,可以快速通过msgId找到消息内容。

Step8:获取该消息在消息队列的偏移量,CommitLog中保存了当前所有消息队列的当前待写入偏移量

keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
    queueOffset = 0L;
    CommitLog.this.topicQueueTable.put(key, queueOffset);
}

Step9:根据消息体的长度、主题的长度、属性的长度结合消息存储格式计算消息的总长度。

    protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
        	
            final int msgLen =
                4 	//TOTALSIZE:该消息条目总长度,4字节
                + 4 //MAGICCODE:魔数,4字节。固定值0xdaa320a7
                + 4 //BODYCRC:消息体crc校验码,4字节
                + 4 //QUEUEID:消息消息队列ID,4字节
                + 4 //FLAG:消息FLAG,RMQ不做处理,供应用程序使用,默认4字节
                + 8 //QUEUEOFFSET:消息在消息消费队列的偏移量,8字节
                + 8 //PHYSICALOFFSET:消息在CommitLog文件中偏移量,8字节
                + 4 //SYSFLAG:消息系统Flag,例如是否压缩,是否是事务消息等,4字节
                + 8 //BORNTIMESTAMP:消息生产者调用消息发送API的时间戳,8字节
                + bornhostLength //BORNHOST:发送者IP,端口,8字节
                + 8 //STORETIMESTAMP:消息存储时间戳,8字节
                + storehostAddressLength //STOREHOSTADDRESS:BrokerIP和端口,8字节
                + 4 //RECONSUMETIMES:消息重试次数,4字节
                + 8 //Prepared Transaction Offset:事务消息物理偏移量,8字节
                + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY:消息体长度,4字节
                //TOPIC:主题存储长度,1字节
                + 1 + topicLength
                 //propertiesLength:消息属性,长度为PropertiesLength中存储的值
                + 2 + (propertiesLength > 0 ? propertiesLength : 0)
                + 0;
            return msgLen;
        }

上述表明CommitLog条目是不定长的,每一个条目的长度存储在前4个字节中。

Step10:如果消息长度 + END_FILE_MIN_BLANK_LENGTH 大于 CommitLog文件的空闲空间,则返回AppendMessageStatus.END_OF_FILE,Broker会创建一个新的CommitLog文件来存储该消息。

从这里可以看出,每个CommitLog文件最少会空闲8字节,高4字节存储当前文件剩余空间,低四字节存储魔数:CommitLog.BLANK_MAGIC_CODE。

final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

Step11:将消息内容存储到ByteBuffer中,然后创建AppendMessageResult。把消息存储在MappedFile对应的内存映射Buffer中,并没有刷写到磁盘。

       // Return code
        private AppendMessageStatus status;
        // Where to start writing
        private long wroteOffset;
        // Write Bytes
        private int wroteBytes;
        // Message ID
        private String msgId;
        // Message storage timestamp
        private long storeTimestamp;
        // Consume queue's offset(step by one)
        private long logicsOffset;
        private long pagecacheRT = 0;
        private int msgNum = 1; //批量发送消息条数


    /**
     * When write a message to the commit log, returns code
     */
    public enum AppendMessageStatus {
        PUT_OK,
        END_OF_FILE, //超过文件大小
        MESSAGE_SIZE_EXCEEDED, //消息长度超过最大允许长度
        PROPERTIES_SIZE_EXCEEDED, //消息属性超过最大允许长度
        UNKNOWN_ERROR,
    }

Step12:更新消息队列逻辑偏移量

Step13:处理完消息追加逻辑后将释放putMessageLock锁

Step14:DefaultAppendMessageCallback#doAppend只是将消息追加在内存中,需要根据同步刷盘还是异步刷盘方式,将内存中的数据持久化道磁盘,然后执行HA主从同步复制。

存储文件组织与内存映射 RMQ通过使用内存映射文件来提高IO访问性能,无论是CommitLog、ConsumeQueue还是IndexFile,单个文件都被设计成固定长度,如果一个文件写满后再创建一个新文件,文件名就是第一条消息的偏移量。

RocketMQ使用MappedFile、MappedFileQueue来封装存储文件,一个MappedFileQueue可以包含多个MappedFile。

Broker收到消息之后的处理流程:简单来说,roker通过Netty网络服务器获取到条消息,接着就会把这条消息写入到一个Commitlog文件里去,一个Broker机器上就只有一个CommitLog文件,所有Topic的消息都会写入到一个文件里去。

然后还会以异步的方式把消息写入到ConsumeQueue文件里去,因为一个Topic有多个MessageQueue,任何条消息都是写入一个MessageQueue的,那个MessaqeQueue其实就是对应了一个ConsumeQueue文件
所以一条写入MessageQueue的消息,必然会异步进入对应的ConsumeQueue文件。

同时还会异步把消息写入一个ndexFile里,在里面主要就是把每条消息的key和消息在CommitLog中的offset偏移量做一个索引,这样后续如果要根据消息key从CommitLog文件里查询消息,就可以根据IndexFile的索引来了。

接着我们来一步一步分析一下这里写入这几个文件的一个流程。

首先Broker收到一个消息之后,必然是先写入CommitLog文件的,那么这个CommitLog文件在磁盘上的目录结构大致如何呢? 看下面
CommitLog文件的存储目录是在SROCKETMQ HOME/store/commitlog下的,里面会有很多的CommitLog文件,每个文件默认是1GB大小,一个文件写满了就创建一个新的文件,文件名的话,就是文件中的第一个偏移量,如人面所示。文件名如果不足20位的话,就用0来补齐就可以了。
000000000000000000000
000000000003052631924
在把消息写入CommitLog文件的时候,会申请 个putMessageLock锁
也就是说,在Broker上写入消息到CommitLog文件的时候,都是事行的,不会让你并发的写入,并发写入文件必然会有数据错乱的问题,下面是源码片段

protected final PutMessageLock putMessageLock;
putMessageLock .lock()

接着其实会对消息做出一通处理,包括设置消息的存储时间、创建全局唯一的消息D、计算消息的总长度,然后会走段很关键的源码,把消息写入到MappedFile里去,这个其实我们之前还讲解过里面的黑科技,看下面的源码。

if (messageExt instanceof MessapeExtBrokerInner) {
    result = cb.doAppend(this.getFileFromoffset(), 
                        byteBuffer, 
                        this.filesize - currentpos,
                         (MessapeExtBrokerInner) messageExt);
}

上面源码片段中,其实最关键的是cb.doAppend这行代码,这行代码其实是把消息追加到MappedFile映射的一块内存里去,并没有直接刷入磁盘中.

至于具体什么时候才会把内存里的数据刷入磁盘,其实要看我们配置的刷盘策略,另外就是不管是同步刷盘还是异步刷盘,假设你配置了主从同步,一旦你写入完消息到CommitLog之后,接下来都会进行主从同步复制的。

Broker收到一条消息之后,其实就会直接把消息写入到Commitlod里去,但是他写入刚开始仅仅是写入到MappedFile映射的一块内存里去,后续是根据刷盘策略去决定是否立即把数据从内存刷入磁盘的.

这个消息写入CommitLog之后,然后消息是如何进入ConsumeQueue和IndexFile的。
实际上,Broker启动的时候会开启一个线程,ReputMessageService,他会把CommitLog更新事件转发出去,然后让任务处理器去更新ConsumeQueue和IndexFile.

我们看下面的源码片段,在DefaultMessageStore的start方法里,在里面就是启动了这个ReputMessageService线程。
这个DefaultMessageStore的start方法就是在Broker启动的时候调用的,所以相当于是Broker启动就会启动这个线程.

image.png

下面我们看这个ReputlMessageService线程的运行逻辑,源码片段如下所示

image.png

也就是说,在这个线程里,每隔1毫秒,就会把最近写入CommitLog的消息进行一次转发,转发到ConsumeQueue和ndexFile里去,通过的是doReput0方法来实现的,我们再看doReput方法里的实现逻辑,先看下面源码片段.

image.png

这段代码意思非常的清晰明了,就是从commitLog中去获取到一DispatchRequest拿到了一份需要进行转发的消息,也就是从CommitLog中读取的。

接着他就会通过下面的代码,调用doDispatch0方法去把消息进行转发,一个是转发到ConsumeQueue里去,一个是转发到IndexFile里去
大家看下面的源码片段,里面走了CommitLogDispatcher的循环

image.png

实际上F常来说这个CommitLoaDispatcher的实现类有两,分别是CommitLoaDispatcherBuildConsumeQueue和CommitLoaDispatcherBuildlindex,他们俩分别会负责把消息转发到ConsumeQueue和IndexFile。

接着我们看一下ConsumeQueueDispatche的源码实现逻辑,其实非常的简单,就是找到当前Topic的messageQueueld对应的一个ConsumeQueue文件
个MessageQueue会对应多个ConsumeQueue文件,找到一个即可,然后消息写入其中

image.png

再来看看indexFile的写入逻辑,其实也很简单,无非就是在ndexFile里去构建对应的索引罢了,如下面的源码片段

image.png

因此到这里为止,我想大家基本就看明白了,当我们把消息写入到CommitLog之后,有一个后台线程每隔1毫秒就会去拉取CommitLog中最新更新的一批消息,然后分别转发到ConsumeQueue和ndexFile里去,这就是他底层的实现原理

ConsumerQueue 消息消费队列是专门为消息订阅构建的索引文件,提高根据主题与消息队列检索消息的速度。

IndexFile主要用于根据Message Key和Unique Key检索消息

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论