RocketMQ源码系列(6) — 消息存储之核心架构

2023年 9月 12日 74.2k 0

消息存储

上一篇文章主要介绍了 Broker 的基础架构设计和功能,Broker中还有消息生产、消息消费、Broker管理、事务管理、消息存储等核心功能,这篇文章我们先研究消息存储模块的设计,之后在这个基础上去研究生产者、消费者等相关功能就很容易了。通过研究RocketMQ的消息存储设计,我们也能掌握如何通过文件来高性能存储数据以及访问数据。

MessageStore

Broker 端是通过 MessageStore 来存储、读取消息,可以看到 MessageStore 主要提供了如下的一些写入消息、读取消息、读取消息偏移量等相关的接口。

public interface MessageStore {
    // 加载已存储的数据  
    boolean load();

    // 异步将消息写入存储  
    default CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg); 

    // 异步批量写入消息  
    default CompletableFuture asyncPutMessages(final MessageExtBatch messageExtBatch); 

    // 同步写入消息  
    PutMessageResult putMessage(final MessageExtBrokerInner msg);  

    // 同步批量写入消息  
    PutMessageResult putMessages(final MessageExtBatch messageExtBatch);

    // 查询消息
    GetMessageResult getMessage(final String group, String topic, int queueId, long offset, int maxMsgNums, MessageFilter messageFilter);  

    // 获取消息队列最大偏移量
    long getMaxOffsetInQueue(final String topic, final int queueId);

    // 通过偏移量读取一条消息
    MessageExt lookMessageByOffset(final long commitLogOffset);  
    
    // ......
}

MessageStore 的实现类是 DefaultMessageStore,消息存储的实现逻辑也非常复杂,有消息存储 CommitLog,磁盘文件映射 MappedFile,消费队列 ConsumeQueue,消息检索 IndexService 等核心组件。

image.png

从下图可以看到 DefaultMessageStore 的初始化以及加载过程。BrokerStartup 中创建了 BrokerController,BrokerController在初始化方法中创建了 DefaultMessageStore,然后调用其 load() 方法加载磁盘文件数据。之后 BrokerStartup 启动 BrokerController,BrokerController 的启动方法中,又启动了 DefaultMessageStore,其 start() 方法中启动了消息存储相关的组件。

image.png

文件锁定

在启动 BrokerStartup 时,DefaultMessageStore 构造方法中会创建 commitlog、consumequeue 等目录,期初目录下还没有文件。

File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));  
// /store  
MappedFile.ensureDirOK(file.getParent());  
// /store/commitlog  
MappedFile.ensureDirOK(getStorePathPhysic());  
// /store/consumequeue  
MappedFile.ensureDirOK(getStorePathLogic());
// /store/lock
lockFile = new RandomAccessFile(file, "rw");

image.png

最后可以看到一直创建了一个锁文件 lockFile,在 DefaultMessageStore 启动的时候,就会通过 lockFile 获取的 FileChannel 来锁定 /store/lock 文件。如果无法锁定,就直接抛出异常,这说明 /store 下的目录只会被一个 Broker 独占,其它进程都不可以再占用 /store 目录下的文件。它会锁定直到 DefaultMessageStore 停止的时候才会释放。

通过这里可以了解到 FileChannel 的一个特性,其支持对文件进行锁定,以避免多个进程或线程同时访问文件的问题。可以使用 lock() 和 tryLock() 方法来实现文件的独占锁定。

public void start() throws Exception {
    lock = lockFile.getChannel().tryLock(0, 1, false);
    if (lock == null || lock.isShared() || !lock.isValid()) {
        throw new RuntimeException("Lock failed,MQ already started");
    }

    lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
    lockFile.getChannel().force(true);
    
    this.commitLog.start();
    ....
}

public void shutdown() {
    this.commitLog.shutdown();
    ......

    if (lockFile != null && lock != null) {
        lock.release();
        lockFile.close();
    }
}

CommitLog

这篇文章我们就主要来研究消息存储核心组件之一的 CommitLog,DefaultMessageStore 写入、读取消息主要就是通过CommitLog来完成。

CommitLog

CommitLog 主要有如下一些组件和属性,大部分的组件都在 CommitLog 构造方法中做了初始化。

image.png

从 CommitLog 的构造方法中可以得知,commitlog 文件默认存储在 ${storePathRootDir}/commitlog 目录下。在构造方法中,创建了 commitlog 目录的映射对象 MappedFileQueue,然后在 CommitLog 加载的时候就会加载已存在的 commitlog 文件,映射为 MappedFile 对象。可以看到 commitlog 文件默认大小固定是 1GB。加载已存在的 commitlog 我们最后再来研究。

public CommitLog(final DefaultMessageStore defaultMessageStore) {
    // CommitLog 存储路径,默认在 ${storePathRootDir}/commitlog
    String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();

    // 将磁盘中的 CommitLog 做 MappedFile 内存映射
    this.mappedFileQueue = new MappedFileQueue(
             storePath,  // /commitlog 目录
             // commitlog 文件大小默认为 1GB
             defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
             // AllocateMappedFileService
             defaultMessageStore.getAllocateMappedFileService());
}

public boolean load() {  
    boolean result = this.mappedFileQueue.load();  
    log.info("load commit log " + (result ? "OK" : "Failed"));  
    return result;  
}

启动 CommitLog 的时候就会去启动一些辅助组件。

public void start() {
    this.flushCommitLogService.start();

    // 刷盘监控组件,守护线程
    flushDiskWatcher.setDaemon(true);
    flushDiskWatcher.start();

    if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        this.commitLogService.start();
    }
}

消息结构

生产者向 Broker 发送消息时,DefaultMessageStore 会调用 CommitLog 的 asyncPutMessage 来写入消息,这节就来研究下消息写入过程以及如何存储的。

首先 asyncPutMessage 的入参是 MessageExtBrokerInner,它继承自 MessageExt,MessageExt 又继承自 Message。消息主要有如下的一些属性字段,最基础的便是消息投递到那个 Broker(brokerName)、哪个主题(topic)、主题下的哪个队列(queueId),以及消息内容(body)。

public class Message implements Serializable {
    // 消息投递到哪个 topic
    private String topic;
    // flag
    private int flag;
    // 消息属性
    private Map properties;
    // 消息内容
    private byte[] body;
    // 事务消息ID
    private String transactionId;
}

public class MessageExt extends Message {
    // Broker组
    private String brokerName;
    // Topic里的queue
    private int queueId;
    // 消息存储大小
    private int storeSize;
    // 消息队列偏移量
    private long queueOffset;
    // 系统标识
    private int sysFlag;
    // 消息诞生时间
    private long bornTimestamp;
    // 消息诞生的客户端网络连接地址
    private SocketAddress bornHost;
    // 消息存储时间
    private long storeTimestamp;
    // 消息存储的机器地址
    private SocketAddress storeHost;
    // 消息ID
    private String msgId;
    // 消息在commitlog里的偏移量
    private long commitLogOffset;
    // 消息体crc校验和
    private int bodyCRC;
    // 重新消费的次数
    private int reconsumeTimes;
    // 预准备事务偏移量
    private long preparedTransactionOffset;
}

public class MessageExtBrokerInner extends MessageExt {
    // 消息属性
    private String propertiesString;
    // tags code
    private long tagsCode;
    // 消息编码后的 buffer
    private ByteBuffer encodedBuff;
}

写入消息

我们可以在本地调试,首先启动 NamesrvStartupBrokerStartup,然后运行 rocketmq-example 下面的 org.apache.rocketmq.example.quickstart.Producer 来启动生产者发送一个消息,我们就可以在 asyncPutMessage 方法中打断点来调试消息的写入过程。

消息写入的主流程如下:

  • 设置消息存储时间戳、设置消息体 crc32 校验和,避免消息篡改

  • 设置消息来源服务器以及存储服务器的 IPv6 地址标识

  • 获取线程副本 PutMessageThreadLocal,它的内部有一个 MessageExtEncoder 的编码器用于编码消息。

    MessageExtEncoder 在消息编码时,就是将消息的一个个属性写入到一个 ByteBuffer 里。正常情况下 PutMessageResult 返回为 null,如果编码失败,比如消息默认不能超过4MB,超长就会认为消息非法然后返回一个 PutMessageResult,这个时候就会直接返回。

  • 如果消息编码成功,就会将编码得到的 ByteBuffer 设置到 MessageExtBrokerInner 里面,然后创建写入消息上下文对象 PutMessageContext

  • 接下来开始准备写入消息,先用 putMessageLock 加写锁,保证同一时刻只有一个线程能写入消息。从这里可以看出, CommitLog 写入消息是串行的,但后面的 IO 机制能保证串行写入的高性能。

    加锁之后,通过 MappedFileQueue 获取最后一个 MappedFile。前面已经大概了解到,commitlog 目录对应 MappedFileQueue,目录下的文件就对应多个 MappedFile,写满一个切换下一个,所以每次都应该写入最后一个。

    如果获取最后一个 MappedFile 返回 null,那么说明是程序初次启动,还没有 commitlog 文件。这时就会去获取偏移量为 0 的 MappedFile,也就是会创建第一个 commitlog 文件。

    接着就是向这个 MappedFile 追加消息 appendMessage,可以看到参数还传入了 AppendMessageCallback,这是在 MappedFile 写入消息后,就会执行这个回调。

  • 消息追加后,如果消息写满了,将会创建一个写的 MappedFile,继续写入消息。消息写入完成后,最后在 finally 中释放 putMessageLock 锁。

  • 如果MappedFile写满了,且启用了预热机制的情况下,就会调用 unlockMappedFile 解锁文件。这个其实是创建 MappedFile 的时候,如果启用了预热机制,就会提前把磁盘数据加载到内存区域,并调用mlock系统调用锁定 MappedFile 对应的内存区域。这里就是在写满 MappedFile 后,调用munlock系统调用解锁之前锁定的内存区域。这个我们讲 MappedFile 的时候再详细看。

  • 写入消息完了之后就是增加统计信息,统计topic写入次数以及写入消息总量等。

  • 最后就是同步提交 flush 请求和 replica 请求,就是强制刷盘,并将数据同步到 slave 节点,最后返回追加消息结果。

  • public CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) {
        // 设置存储时间戳
        msg.setStoreTimestamp(System.currentTimeMillis());
        // 设置消息体 crc32 校验和
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    
        // 消息诞生机器 IPv6 地址标识
        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setBornHostV6Flag();
        }
        // 消息存储机器 IPv6 地址标识
        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setStoreHostAddressV6Flag();
        }
    
        // 获取线程副本中的消息编码器对消息编码
        PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
        if (encodeResult != null) {
            return CompletableFuture.completedFuture(encodeResult);
        }
        
        // 设置消息编码后的 ByteBuffer
        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
        // 创建写入消息上下文
        PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
    
        // 写入消息返回结果
        AppendMessageResult result = null;
        MappedFile unlockMappedFile = null;
        // 写入消息时加锁
        putMessageLock.lock();
        try {
            // 获取 MappedFile
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            
            // 设置消息存储时间戳
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            msg.setStoreTimestamp(beginLockTimestamp);
    
            if (null == mappedFile || mappedFile.isFull()) {
                // 第一个文件的起始偏移量就是 0,参数 startOffset 起始偏移量
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
            }
    
            // 向 MappedFile 追加消息
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:  // 文件写满了
                    unlockMappedFile = mappedFile;
                    // 一个文件写满了之后,创建一个新的文件,继续写这条消息
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                ......
                default:
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            }
        } finally {
            // 释放锁
            putMessageLock.unlock();
        }
    
        // munlock 解锁文件
        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }
    
        // 消息写入结果
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    
        // Statistics
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
    
        // 每次写完一条消息后,就会提交 flush 请求和 replica 请求
        CompletableFuture flushResultFuture = submitFlushRequest(result, msg);
        CompletableFuture replicaResultFuture = submitReplicaRequest(result, msg);
    
        // 等待 flush 和 replica 请求完成
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
            }
            return putMessageResult;
        });
    }
    

    一个整数存多个标识

    写入消息过程可以看到有如下两行代码,就是设置消息诞生和消息存储服务器IP地址是否为 IPv6 的标识。

    msg.setBornHostV6Flag();
    msg.setStoreHostAddressV6Flag();
    

    可以看到,这两个方法都是更新同一个整数 sysFlag,那么它是如何用一个整数存储多个标识的呢?

    private int sysFlag;
    
    public void setStoreHostAddressV6Flag() { 
        this.sysFlag = this.sysFlag | MessageSysFlag.STOREHOSTADDRESS_V6_FLAG; 
    }
    public void setBornHostV6Flag() { 
        this.sysFlag = this.sysFlag | MessageSysFlag.BORNHOST_V6_FLAG; 
    }
    

    首先可以看到 MessageSysFlag 中有很多标识,如果每个标识都在消息中单独用一个字段来存,那么这条消息就会多出几十个字节,消息存储在磁盘文件中就会占用更多存储空间。所以这里用一个 int 整数就存了10几个标识,将节约很多存储空间。

    public class MessageSysFlag {
    /**
    * Meaning of each bit in the system flag
    *
    * | bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
    * |--------|---|---|-----------|----------|-------------|------------------|------------------|------------------|
    * | byte 1 | | | STOREHOST | BORNHOST | TRANSACTION | TRANSACTION | MULTI_TAGS | COMPRESSED |
    * | byte 2 | | | | | | COMPRESSION_TYPE | COMPRESSION_TYPE | COMPRESSION_TYPE |
    * | byte 3 | | | | | | | | |
    * | byte 4 | | | | | | | | |
    */
    public final static int COMPRESSED_FLAG = 0x1;
    public final static int MULTI_TAGS_FLAG = 0x1

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论