RocketMQ源码系列(9) — 消息消费之消费队列和索引

2023年 10月 4日 27.1k 0

消息主题

在 RocketMQ 中,消息通过主题(Topic)进行分类和管理。主题(Topic)是 RocketMQ 中消息发布和订阅的基本单位。它类似于一个消息分类的概念,可以理解为一个消息的容器或者标签。在生产者发送消息时,需要指定消息所属的主题;而消费者在订阅消息时,也需要指定订阅哪个主题的消息。

主题配置

TopicConfigManager 就是Broker管理 Topic 元数据的组件,可以看到它就是用一个 topicConfigTable 表来存储 topic 元数据,它的 key 就是对应的 topic 名称,值就是 TopicConfig。

TopicConfigManager 还用了一个 DataVersion 来存储当前数据的版本,topic 数据变更后,版本号也会变更,这个就可以用于多个地方同步数据时判断数据是否变更。

public class TopicConfigManager extends ConfigManager {
    // topic 配置表读写锁
    private transient final Lock topicConfigTableLock = new ReentrantLock();
    // topic 元数据 数据结构
    private final ConcurrentMap topicConfigTable = new ConcurrentHashMap(1024);
    // 数据版本号
    private final DataVersion dataVersion = new DataVersion();
}

TopicConfig 有如下属性:

  • topicName:Topic 名称
  • readQueueNums:读消息的队列数量,用于创建消息队列时的队列数
  • writeQueueNums:写消息的队列数量,用于创建消息队列时的队列数
  • perm:权限,默认具有 READ 和 WRITE 的权限
  • topicFilterType:Topic 过滤类型
  • topicSysFlag:是否为系统 Topic,Broker 在启动时会创建一些系统 Topic 来处理一些特殊的情况。
public class TopicConfig {
    private String topicName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm = PermName.PERM_READ | PermName.PERM_WRITE;
    private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG;
    private int topicSysFlag = 0;
}

image.png

TopicConfigManager 是 BrokerController 中的管理组件,TopicConfigManager 创建时会初始化创建一些系统内置的topic,默认会创建如下一些内置的topic。

  • SELF_TEST_TOPIC:自我测试的Topic
  • TBW102:自动创建Topic相关
  • BenchmarkTest:进行压力测试相关的Topic
  • DefaultCluster:以Broker集群名称创建的topic,用于写入和消费Broker集群自身的一些元数据的topic
  • broker:以 Broker 组名称创建的 topic,对Broker组读写相关的Topic
  • OFFSET_MOVED_EVENT:偏移量移动事件topic
  • SCHEDULE_TOPIC_XXXX:调度相关topic
  • RMQ_SYS_TRACE_TOPIC:系统追踪相关topic
  • ClusterName_REPLY_TOPIC:集群重试相关

例如下面的一个代码块就是创建 TBW102,并添加为系统topic,然后设置了继承、可读、可写三种权限,读写队列数为 8 个;最后将这个 TopicConfig 放入 topicConfigTable 表中。

public class TopicConfigManager extends ConfigManager {
    public TopicConfigManager(BrokerController brokerController) {
        this.brokerController = brokerController;
        {
            if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                // 创建内置Topic: TBW102,自动创建Topic相关
                String topic = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
                TopicConfig topicConfig = new TopicConfig(topic);
                TopicValidator.addSystemTopic(topic);
                // 读写队列数,默认 8
                topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());
                topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig().getDefaultTopicQueueNums());
                // 三种权限都有
                int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
                topicConfig.setPerm(perm);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }
        }
        {......}
    }
}

TopicConfigManager 继承自 ConfigManager,ConfigManager 在前面的文章中已经分析过,主要就是用于将配置文件中的数据读取到内存和将内存数据持久化到配置文件,也就是说 ConfigManager 的子类都是基于文件的一种配置持久化方式。

那么 TopicConfigManager 要存储的配置数据就是 topicConfigTable,它的存储路径通过 configFilePath() 方法提供,可以看到默认的存储文件就是 /store/config/topics.json

@Override
public String configFilePath() {
    return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig()
            .getStorePathRootDir());
}

程序运行后,我们可以在 config 目录下看到很多类似的 json 配置文件,这些文件就是存储 RocketMQ 中用到的一些元数据。其中 .bak 结尾的文件是对应的备份文件。这些配置文件包含 Topic 配置数据、消费偏移量数据、订阅组数据等等,都是与消息消费相关的元数据文件,后面都会一一介绍。

image.png

创建主题

发送消息时如果Topic不存在,则会自动创建 Topic,我们直接来看 TopicConfigManager 中创建 Topic 的方法,看 topic 是如何被创建出来的。

在发送消息时创建topic的方法 createTopicInSendMessageMethod,有如下入参:

  • topic:要创建的 topic
  • defaultTopic:默认 topic,根据这个默认 topic 新建 topic,客户端发送过来的默认topic为 TBW102
  • remoteAddress:客户端机器地址
  • clientDefaultTopicQueueNums:默认的队列数,生产者发送过来的默认队列数为 4
  • topicSysFlag:是否为系统主题,默认 false

创建 topic 前先加同步锁,避免并发更新 topicConfigTable 表。接着判断 topicConfigTable 表中是否已经存在相同的 topic,存在则不创建。

不存在则获取 defaultTopic(TBW102),这个默认topic要可继承,然后就会根据这个默认topic来创建一个新的 TopicConfig,设置它的权限、队列数等。之后将其放入 topicConfigTable 表中,数据更新,数据版本 DataVersion 变更,然后将 TopicConfigManager 持久化到文件中。

最后,对于新建的 Topic,还要将元数据同步注册到所有的 NameServer 中,然后其它的 Broker 也能从 NameServer 拉取到最新的元数据信息。

public TopicConfig createTopicInSendMessageMethod(String topic, String defaultTopic,
            String remoteAddress, int clientDefaultTopicQueueNums, int topicSysFlag) {
        TopicConfig topicConfig = null;
        boolean createNew = false;
        if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                topicConfig = this.topicConfigTable.get(topic);
                if (topicConfig != null)
                    return topicConfig;

                // 获取默认 Topic
                TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
                if (defaultTopicConfig != null) {
                    if (PermName.isInherited(defaultTopicConfig.getPerm())) {
                        topicConfig = new TopicConfig(topic);
                        // 设置读写队列数
                        int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());
                        topicConfig.setReadQueueNums(queueNums);
                        topicConfig.setWriteQueueNums(queueNums);

                        int perm = defaultTopicConfig.getPerm();
                        // 去掉继承权限
                        perm &= ~PermName.PERM_INHERIT;
                        topicConfig.setPerm(perm);
                        topicConfig.setTopicSysFlag(topicSysFlag);
                        topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
                    }
                }
                if (topicConfig != null) {
                    this.topicConfigTable.put(topic, topicConfig);
                    // 更新配置表版本号
                    this.dataVersion.nextVersion();
                    createNew = true;
                    // 持久化配置
                    this.persist();
                }
            } finally {
                this.topicConfigTableLock.unlock();
            }
        }
        // 创建新的 TopicConfig 后,向其它Broker注册(Oneway请求)
        if (createNew) {
            this.brokerController.registerBrokerAll(false, true, true);
        }
        return topicConfig;
    }

消息重投递

前面的文章我们分析了消息通过 DefaultMessageStore 顺序写入 CommitLog,再追加到 MappedFile 中,最后刷到磁盘文件中。那消息怎么被高效的消费呢,怎么记录消费的进度?

1、消息重投递服务

DefaultMessageStore 中有一个 ReputMessageService 线程服务,它主要就是用来将 CommitLog 中的消息重新投递到消费队列和索引文件中的。

DefaultMessageStore 启动时,会启动 ReputMessageService,并计算设置重新投递的起始偏移量。计算方式是取 CommitLog 的最小偏移量和消费队列逻辑最大物理偏移量的较大值。之后就会从这个位置开始将 CommitLog 的数据进行重新投递。

public void start() throws Exception {
    // 第一个 MappedFile 的 fileFromOffset
    long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
    for (ConcurrentMap maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
            }
        }
    }
    if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
        // 如果consumequeue文件被删了或损坏了、新的broker同步commitlog之后
        maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
    }
    this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
    this.reputMessageService.start();
}

2、消息重新投递

ReputMessageService 是一个后台线程,它的 run() 方法在不断的运行 doReput() 执行重新投递的任务。

重新投递的起始偏移量最小是从第一个 MappedFile 开始重新投递。从 CommitLog 获取数据(getData)只传入了一个起始偏移量,它会读取从起始偏移量到当前 MappedFile 的最后一条数据。只要 reputFromOffset 小于 CommitLog 的最大偏移量,也就是当前写入位置,就会一直读取,这种情况出现在有多个 MappedFile(commitlog) 时,一个文件读完了接着读下一个文件。

读取出来的数据就会去把一条条完整的数据读出来,得到一个分发请求 DispatchRequest,分发请求有消息,就会将其分发出去(doDispatch)。分发之后,如果是master节点且启用了长轮询机制,就会发一个消息到达的通知,然后进行一个消息多路分发的处理(默认关闭)。最后再更新 reputFromOffset 加上当前处理的消息长度。

如果读出来的分发请求没有消息了,则说明已经读到文件的末尾(空魔数),这是就会切换到下一个 MappedFile 继续读数据出来重新投递。

private void doReput() {
    // 初始化重新投递的起始偏移量为第一个 MappedFile 的 fileFromOffset
    if (this.reputFromOffset < commitLog.getMinOffset()) {
        this.reputFromOffset = commitLog.getMinOffset();
    }
    // reputFromOffset < 当前写入偏移量
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
        // 从 CommitLog 读取所有消息
        SelectMappedBufferResult result = commitLog.getData(reputFromOffset);
        if (result != null) {
            this.reputFromOffset = result.getStartOffset();
            // 读取每一条消息
            for (int readSize = 0; readSize  0) {
                    // 消息分发,分到到 ConsumeQueue 和 IndexService
                    DefaultMessageStore.this.doDispatch(dispatchRequest);
                    // 通知消息到达
                    if (BrokerRole.SLAVE != getMessageStoreConfig().getBrokerRole()
                            && brokerConfig.isLongPollingEnable() && messageArrivingListener != null) {
                        // 非 slave,启用了长轮询,消息到达监听器不为空
                        DefaultMessageStore.this.messageArrivingListener.arriving(....);
                        // 多路分发,分发到多个队列里
                        notifyMessageArrive4MultiQueue(dispatchRequest);
                    }
                    // 投递偏移量增加
                    this.reputFromOffset += size;
                    readSize += size;
                }
                // 返回成功而且 size = 0,说明是读到文件末尾的空消息了,表示这个文件读到末尾了
                else if (size == 0) {
                    // 切换到下一个 MappedFile 继续
                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                    // 读了整个结果,可以停止for循环了
                    readSize = result.getSize();
                }
            }
            // 释放资源
            result.release();
        } else {
            // 消息读完了,停止循环
            doNext = false;
        }
    }
}

checkMessageAndReturnSize 中,就是在按写入的顺序读取出一条完整的消息来,如果读出来的魔数是 BLANK_MAGIC_CODE,说明读取到文件的末尾了,返回的消息大小就是 0,就会切换到下一个文件继续读取。

image.png

如果是正常的消息,就会继续读取剩余的数据,然后做一些校验,比如消息体的 CRC 校验,消息长度的校验等,最后将读出来的消息封装到 DispatchRequest 中。

return new DispatchRequest(
                    topic, // 主题
                    queueId, // 队列ID
                    physicOffset, // 物理偏移量
                    totalSize, // 消息总大小
                    tagsCode, //tag
                    storeTimestamp, // 消息存储时间
                    queueOffset, // 队列偏移量
                    keys, // properties 中的 KEYS,用来表示消息的唯一性
                    uniqKey, // properties 中的 UNIQ_KEY,用来表示消息的唯一性
                    sysFlag, // 系统标识
                    preparedTransactionOffset, // 事务偏移量
                    propertiesMap // 属性
            );

3、消息分发

读取出来的一条完整消息 DispatchRequest,会被分发(doDispatch)出去。分发处理的接口是 CommitLogDispatcher,DefaultMessageStore 创建时默认创建了两个分发处理器 CommitLogDispatcherBuildConsumeQueueCommitLogDispatcherBuildIndex,看名字就知道是分发出去构建消息队列和构建索引。

public class DefaultMessageStore {
    private final LinkedList dispatcherList;
    
    public DefaultMessageStore(...){
        this.dispatcherList = new LinkedList();
        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
    }
    
    public void doDispatch(DispatchRequest req) {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
    }
}

消费队列

创建消费队列

1、创建消费队列

消息分发到消费队列,会先获取主题(topic)和队列(queueId)所在的消费队列 ConsumeQueue,然后将消息写入消费队列中。

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    public void dispatch(DispatchRequest request) {
        DefaultMessageStore.this.putMessagePositionInfo(request);
    }
}

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    // 根据topic和queueId查找消费队列,没有则新建
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    // 向消费队列写入消息
    cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
}

根据 topic 和 queueId 查找消费队列,就是从 consumeQueueTable 表获取主题下的队列,如果没有就会创建一个新的Map放入 consumeQueueTable。接着从队列表里取出 ConsumeQueue,如果没有同样会创建一个新的 ConsumeQueue,从这里可以看出消费队列默认的存储路径是 ~/store/consumequeue/

public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentMap map = consumeQueueTable.get(topic);
    if (null == map) {
        map = new ConcurrentHashMap(128);
        consumeQueueTable.putIfAbsent(topic, newMap);
    }
    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        // 创建消费队列
        logic = new ConsumeQueue(
                topic,  queueId,
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // consumequeue
                this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(), // 消费队列文件大小,映射 MappedFile
                this);
        map.putIfAbsent(queueId, logic);
    }
    return logic;
}

而 ConsumeQueue 对应的 MappedFile 文件大小是通过计算得来的,ConsumeQueue 存储是以20字节为一个存储单元(CQ_STORE_UNIT_SIZE),MappedFile 文件大小也必须是 20 的倍数,默认情况下一个 ConsumeQueue 文件可以存 30万 个单位数据,那么一个 MappedFile 文件大小就是 300000*20 。

// ConsumeQueue file size,default is 30W
private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
    
public int getMappedFileSizeConsumeQueue() {
    int factor = (int) Math.ceil(this.mappedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
    return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
}

2、ConsumeQueue

来看下 ConsumeQueue 的数据结构,可以看出,ConsumeQueue 代表的是 topic 下的一个队列,前面分析过 Topic 元数据,一个 Topic 下有多个读写队列,消息写入的时候会指定写入的 topic 和 queueId。

ConsumeQueue 和 CommitLog 是类似的,也是基于文件的存储,存储路径是 ~/consumequeue/{topic}/{queueId},ConsumeQueue 一个文件默认最多存30万个单元20字节的数据,一个文件写满了之后就会继续写下一个文件。所以也会创建一个 MappedFileQueue 来管理队列下的文件,每个文件会映射为一个 MappedFile,而文件的名称同样是这个文件的起始偏移量。

public class ConsumeQueue {
    // 存储单元,一个单元20字节
    public static final int CQ_STORE_UNIT_SIZE = 20;

    // MappedFile 队列,ConsumeQueue 也是基于 MappedFile 内存映射来实现的
    private final MappedFileQueue mappedFileQueue;
    // 所属的topic
    private final String topic;
    // topic 里的 queueId
    private final int queueId;
    // 索引缓冲区
    private final ByteBuffer byteBufferIndex;

    // ConsumeQueue 存储路径
    private final String storePath;
    // MappedFile 文件大小
    private final int mappedFileSize;
    // 最大物理偏移量
    private long maxPhysicOffset = -1;
    // 最小逻辑偏移量
    private volatile long minLogicOffset = 0;

    public ConsumeQueue(String topic, int queueId, String storePath, int mappedFileSize, DefaultMessageStore defaultMessageStore) {
        this.storePath = storePath;
        this.mappedFileSize = mappedFileSize;
        this.defaultMessageStore = defaultMessageStore;
        // 所属 topic 和 queueId
        this.topic = topic;
        this.queueId = queueId;
        // ~/consumequeue/{topic}/{queueId}
        String queueDir = this.storePath + File.separator + topic + File.separator + queueId;
        // 创建 MappedFileQueue
        this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
        // 一个存储单元的缓冲区,20字节
        this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
    }
}

image.png

写入消息位置信息

再来看消息分发到 ConsumeQueue 的过程,首先会判断磁盘是否可以写入数据,比如磁盘快没有剩余空间时,就不能写入数据了。然后就是在写入消息位置信息,写入成功后,更新存储检查点中的物理消息时间戳(commitlog)和逻辑消息时间戳(consumequeue)。

public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) {
    final int maxRetries = 30;
    // 当前是否可写入,会有一个线程检测磁盘是否满了,是否可以写入数据
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    for (int i = 0; i < maxRetries && canWrite; i++) {
        // tag
        long tagsCode = request.getTagsCode();
        
        // 写入消息位置信息
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(), request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        if (result) {
            // slave 节点或者启用了 DLedger 技术,更新存储检查点钟物理消息时间戳(commitlog)
            if (getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE || getMessageStoreConfig().isEnableDLegerCommitLog()) {
                getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
            }
            // 更新逻辑消息时间戳(consumequeue)
            getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            return;
        } else {....}
    }
}

接着看写入消息位置,ConsumeQueue 是以 20 字节为一个存储单元,所以它用了一块 20字节的缓冲区 byteBufferIndex 来写数据,从这可以知道这20字节存储了消息的3个属性:

  • offset:消息的物理偏移量,表示的是 commitlog 中的物理偏移量
  • size:消息的总大小
  • tagsCode:消息tag hash 码

入参中有一个 cqOffset,这个表示消息在这个 topic 队列下的索引(0,1,2...),这个索引从0开始,每写入一条消息自增1。一条消息在 consumequeue 中占 20 字节,那么接下来要写入位置的偏移量 expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE,然后可以通过 expectLogicOffset 找到要写入的 MappedFile。

如果是第一次创建的 MappedFile 且没有写入数据,会按20字节为单元预填充空白数据(0),起到一个磁盘预热的目的。然后更新了当前最大的物理偏移量 maxPhysicOffset,可以看出它表示的是消息在 CommitLog 中的偏移量。最后再将位置信息写入 MappedFile。

private boolean putMessagePositionInfo(final long offset, // 消息物理偏移量
                                       final int size, // 消息大小
                                       final long tagsCode, // 消息 tags
                                       final long cqOffset // 这是从 topicQueueTable 来的,表示的是这个 topic 队列下的索引) {
    // 索引缓冲区
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);
    // 写入位置偏移量
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    // 通过偏移量来定位 MappedFile,满了会自动创建新的 MappedFile
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);

    if (mappedFile != null) {
        // 第一次创建,还没写入数据
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            // 对于一个新的 MappedFile ,当前最小逻辑偏移量
            this.minLogicOffset = expectLogicOffset;
            // flushedWhere
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            // committedWhere
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            // 新的 MappedFile 文件预填入空白消息
            this.fillPreBlank(mappedFile, expectLogicOffset);
        }
        // 最大的消息物理偏移量
        this.maxPhysicOffset = offset + size;
        // 追加消息
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

在 CommitLog 追加消息的时候,会从 topicQueueTable 表获取 topic-queueId 对应的一个偏移量 queueOffset,可以看到这个表的偏移量是从0开始自增,也就是说它其实表示的是这个 topic 队列下的消息数量或者下标索引。然后可以看到消息中会写入这个偏移量 queueOffset,以及消息的物理偏移量。

public class CommitLog {
    // topic-queueId -> offset
    protected HashMap topicQueueTable = new HashMap(1024);

    class DefaultAppendMessageCallback {
        public AppendMessageResult doAppend(...){
            // Record ConsumeQueue information
            // key: topic-queueId
            String key = putMessageContext.getTopicQueueTableKey();
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            // 如果是 null 就重置为 0
            if (null == queueOffset) {
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }

            // 前面的消息
            int pos = 4 + 4 + 4 + 4 + 4;
            // 6 QUEUEOFFSET  queue 对应的偏移量
            preEncodeBuffer.putLong(pos, queueOffset);
            pos += 8;
            // 7 PHYSICALOFFSET  物理偏移量,此时 byteBuffer 还未写入消息,表示的是当前写入消息的起始物理偏移量
            preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());

            ......
            // The next update ConsumeQueue information
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
        }
    }
}

到这里我们就知道了 ConsumeQueue 的结构,DefaultMessageStore 中有一个消费队列表 consumeQueueTable 存储了各个 topic 下的队列,每个队列对应着一个 ConsumeQueue,一个topic有几个队列是由 TopicConfig 决定的,一般在发送消息时会随机指定一个队列。

ConsumeQueue 以20字节为一个单元,存储了消息的位置信息,包括消息在 CommitLog 中的物理偏移量(8字节)、消息总大小(4字节),以及TAG信息(8字节)。ConsumeQueue 也是使用 MappedFileQueue 来管理磁盘文件,每个 consumequeue 会映射为一个 MappedFile。一个 consumequeue 文件默认最多存储30万条消息的位置信息,所以一个文件写满了之后会创建一个新的文件继续写入。

image.png

消息队列刷盘

DefaultMessageStore 会启动一个刷消费队列的线程服务来定时将消费队列的数据刷到磁盘中。

这个线程会每隔1秒执行一次刷盘,但每个消费队列至少有2个缓存页的脏数据才会刷盘,但每隔60秒,还会强制刷盘一次。

class FlushConsumeQueueService extends ServiceThread {
    private long lastFlushTimestamp = 0;
    
    private void doFlush(int retryTimes) {
        // 刷新缓冲区,默认至少刷 2 页
        int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
        
        // 刷消费队列间隔时间 60秒
        int flushConsumeQueueThoroughInterval = getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
        long currentTimeMillis = System.currentTimeMillis();
        // 每隔60秒强制刷盘
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushConsumeQueueLeastPages = 0;
        }

        ConcurrentMap tables = DefaultMessageStore.this.consumeQueueTable;
        // 消费队列刷盘
        for (ConcurrentMap maps : tables.values()) {
            for (ConsumeQueue cq : maps.values()) {
                cq.flush(flushConsumeQueueLeastPages);
            }
        }
    }
    public void run() {
        while (!this.isStopped()) {
            // 刷队列间隔时间 1秒
            int interval = getMessageStoreConfig().getFlushIntervalConsumeQueue();
            this.waitForRunning(interval);
            this.doFlush(1);
        }
        // 强制刷盘
        this.doFlush(RETRY_TIMES_OVER);
    }
}

消息索引

索引服务

1、IndexService

CommitLog 消息分发构建索引,默认配置是启用了消息索引的,所以会调用 IndexService 来构建索引。

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
    @Override
    public void dispatch(DispatchRequest request) {
        if (messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}

IndexService 也是在 DefaultMessageStore 创建时初始化的,从构造中可以知道,索引的设计有一个 hash slot(hash槽)的概念,默认 hash slot 的数量是500万个,然后还有一个索引数量,默认是 500万*4=2000万个。然后可能是由多个索引文件 IndexFile 构成,索引文件的存储路径默认是 ~/store/index

public class IndexService {
    private static final int MAX_TRY_IDX_CREATE = 3;
    private final DefaultMessageStore defaultMessageStore;

    private final int hashSlotNum;
    private final int indexNum;
    private final String storePath;
    private final ArrayList indexFileList = new ArrayList();
    // 读写锁
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    public IndexService(final DefaultMessageStore store) {
        this.defaultMessageStore = store;
        // 最大槽位数量(5000000)
        this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
        // 索引数量(5000000 * 4)
        this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
        // 存储路径 ~/store/index
        this.storePath = StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
    }
}

消息分发出来构建索引,核心就两步,获取或者创建索引文件 IndexFile,然后构建索引写入索引文件。

public void buildIndex(DispatchRequest req) {
    // 创建和获取索引文件 IndexFile
    IndexFile indexFile = retryGetAndCreateIndexFile();
    // 消息创建时会去设置消息的唯一ID
    indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
}

2、创建索引文件

获取或者创建索引文件时,会先获取 indexFileList 列表中最后一个 IndexFile,如果 IndexFile 存在且还未写满,说明这个 IndexFile 还可以继续写入。如果 indexFileList 中没有索引文件或者 IndexFile 已经写满了,就需要创建一个新的 IndexFile。

创建新的索引文件时,文件名称是以当前时间戳来命名,创建好新的 IndexFile 后,还会将上一个 IndexFile 刷盘,进去会发现 IndexFile 也是基于 MappedFile 来做文件映射,所以也需要将数据flush到磁盘。

IndexFile 的刷盘不像 ConsumeQueue 的刷盘,是由一个线程服务来定时刷,IndexFile 的刷盘是在自己写满之后,创建了一个新的 IndexFile 的时候才去执行刷盘操作。

public IndexFile getAndCreateLastIndexFile() {
    IndexFile indexFile = null;
    // 上一个索引文件
    IndexFile prevIndexFile = null;
    // 上一个文件的末尾物理偏移量
    long lastUpdateEndPhyOffset = 0;
    // 上一个文件的索引更新时间
    long lastUpdateIndexTimestamp = 0;

    if (!this.indexFileList.isEmpty()) {
        IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
        if (!tmp.isWriteFull()) { // 索引文件未写满
            indexFile = tmp;
        } else { // 索引文件写满了
            lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
            lastUpdateIndexTimestamp = tmp.getEndTimestamp();
            prevIndexFile = tmp;
        }
    }

    if (indexFile == null) {
        // 文件路径:${user.home}/store/index/timestamp
        String fileName = this.storePath + File.separator + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
        // 创建索引文件
        indexFile = new IndexFile(fileName, this.hashSlotNum, this.indexNum,
                lastUpdateEndPhyOffset, lastUpdateIndexTimestamp);
        this.indexFileList.add(indexFile);

        if (indexFile != null && prevIndexFile != null) {
            // 刷新上一个索引文件
            IndexService.this.flush(prevIndexFile);
        }
    }
    return indexFile;
}

而写入索引,其实就是调用 IndexFile 写入索引信息,而 IndexFile 写满了之后,会自动创建一个新的 IndexFile 来写入索引信息。

private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
    for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
        // 索引文件写满之后会返回 false,就创建一个新的 IndexFile,继续写入
        indexFile = retryGetAndCreateIndexFile();
        if (null == indexFile) {
            return null;
        }
        ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
    }
    return indexFile;
}

索引文件

1、IndexFile

我们初看索引文件 IndexFile,已经很熟悉了,它也是通过 MappedFile 来映射磁盘文件,然后也会拿到一块内存映射缓冲区 MappedByteBuffer 来读写数据。

索引文件的名称是以当前时间来命名的,而文件的大小是计算出来的,可以看到 IndexFile 分为三个部分:文件头、哈希槽、索引。

  • 文件头:固定占 40字节
  • 哈希槽:hash slot 默认是 500W 个,一个占 4字节,哈希槽占 500W * 4字节 ≈ 19MB
  • 索引:索引默认是 500W * 4 个,一个占 20字节,索引部分占 500W * 4 * 20字节 ≈ 381MB

也就是说一个index索引文件的数据由3部分组成,大小加起来约等于 400MB,然后基于此去创建 MappedFile。

哈希槽指向索引,主要解决 hash 冲突的问题,类似于 HashMap 的结构
一个索引固定占 20字节,索引有一个指针指向下一个索引,形成一个链表

public class IndexFile {
    private static final int hashSlotSize = 4;
    private static final int indexSize = 20;
    private static final int invalidIndex = 0;

    // hash 槽位数量
    private final int hashSlotNum;
    // 索引数量
    private final int indexNum;
    // 一个索引文件映射一个 MappedFile
    private final MappedFile mappedFile;
    // 内存映射区域
    private final MappedByteBuffer mappedByteBuffer;
    // 索引头
    private final IndexHeader indexHeader;

    public IndexFile(final String fileName, int hashSlotNum, int indexNum,
                     final long endPhyOffset, final long endTimestamp) throws IOException {
        // 文件大小             
        int fileTotalSize = IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
        // 创建内存映射文件
        this.mappedFile = new MappedFile(fileName, fileTotalSize);
        this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
        
        this.hashSlotNum = hashSlotNum;
        this.indexNum = indexNum;

        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        // 文件头
        this.indexHeader = new IndexHeader(byteBuffer);
        if (endPhyOffset > 0) {
            this.indexHeader.setBeginPhyOffset(endPhyOffset);
            this.indexHeader.setEndPhyOffset(endPhyOffset);
        }
        if (endTimestamp > 0) {
            this.indexHeader.setBeginTimestamp(endTimestamp);
            this.indexHeader.setEndTimestamp(endTimestamp);
        }
    }
}

2、IndexHeader

索引头 IndexHeader 固定占 40 字节,会存储在索引文件的开头,用来存一些重要的元数据,通过后文代码的分析,可以知道这些头信息的作用是什么。

  • beginTimestamp:占8字节,第一条消息的存储时间(storeTimestamp),不会变更
  • endTimestamp:占8字节,当前写入消息的存储时间(storeTimestamp),写入消息后变更
  • beginPhyOffset:占8字节,第一条消息的物理偏移量(physicOffset),不会变更
  • endPhyOffset:占8字节,当前写入消息的物理偏移量(physicOffset),写入消息后变更
  • hashSlotCount:占4字节,已使用的哈希槽数量,每使用一个哈希槽就会自增
  • indexCount:占4字节,已使用的索引数量,初始值为 1,每使用一个索引就会自增
public class IndexHeader {
    private final ByteBuffer byteBuffer;
    // 共40个字节
    // 开始时间 8字节
    private final AtomicLong beginTimestamp = new AtomicLong(0);
    // 结束时间 8字节
    private final AtomicLong endTimestamp = new AtomicLong(0);
    // 索引开始物理偏移量 8字节
    private final AtomicLong beginPhyOffset = new AtomicLong(0);
    // 索引结束物理偏移量 8字节
    private final AtomicLong endPhyOffset = new AtomicLong(0);
    // 哈希槽数量 4字节
    private final AtomicInteger hashSlotCount = new AtomicInteger(0);
    // 索引数量 4字节
    private final AtomicInteger indexCount = new AtomicInteger(1);
}

3、构建索引

我们来看索引是怎么构建,这样也就能了解索引的存储结构是怎样的。

  • 首先根据消息的唯一Key计算一个 hash 值,然后模运算除以哈希槽数量,等到哈希槽位置,也就是说先锁定某一个哈希槽。然后计算出哈希槽的偏移量 absSlotPos = 请求头大小 + 哈希槽索引 * 哈希槽大小。

  • 接着读取哈希槽里的值 slotValue(一个哈希槽4字节),然后计算了当前消息与这个文件的第一条消息的存储时间差 timeDiff。

    这里存时间差,而不是当前时间戳,我猜是因为时间差只占4个字节,而时间戳要占8个字节,主要是为了节省存储空间。

  • 再计算索引写入的偏移量 absIndexPos = 请求头大小 + 哈希槽数量 * 哈希槽大小 + 已添加索引数量 * 索引大小。

    因为 indexCount 初始值为 1,所以索引是从下标1开始写入数据的,下标0的数据库是没有写入索引数据的。

  • 接着就从 absIndexPos 开始写入索引信息,一个索引信息由四部分组成,共20字节:

    • keyHash:4字节,消息唯一Key的hash值
    • phyOffset:8字节,消息的物理偏移量
    • timeDiff:4字节,消息存储的时间差
    • slotValue:4字节,写入 hash slot 里的索引值
  • 索引写入后,再向哈希槽里写入上一次的索引数量,其实这个就代表了索引的下标(0,1,2...),通过这个下标能计算出索引的偏移量。

  • 最后就是在更新索引头信息,这个前面介绍 IndexHeader 已经说过。

  • public boolean putKey(final String key, // 消息唯一Key
                          final long phyOffset, // 消息物理偏移量
                          final long storeTimestamp // 消息存储时间
    ) {
        // key hash
        int keyHash = indexKeyHashMethod(key);
        // hash slot
        int slotPos = keyHash % this.hashSlotNum;
        // hash slot 偏移量,前面 500W显示slot,计算出slot的偏移量
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    
        try {
            // hash slot 里存的索引值(没有值就返回 0)
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            // 消息存储时间到第一个存储的时间的差值
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
            // 单位秒
            timeDiff = timeDiff / 1000;
            // 索引绝对位置,索引是顺序写入
            int absIndexPos = IndexHeader.INDEX_HEADER_SIZE 
                            + this.hashSlotNum * hashSlotSize
                            + this.indexHeader.getIndexCount() * indexSize;
    
            // 写入索引
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); // 写入 hash slot 里的索引(上一个索引)
    
            // 更新哈希槽
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
    
            // 第一个索引,写入开始时的偏移量,和开始存储的时间
            if (this.indexHeader.getIndexCount() = this.indexNum;  
    }
    

    4、索引查找物理偏移量

    再来看 IndexFile 是如果通过索引查询消息物理偏移量的,入参中 phyOffsets 就是要返回的消息物理偏移量,最多会读取 maxNum 条消息的物理偏移量。拿到物理偏移量之后,就可以从 CommitLog 读取到消息。

    同样的,先根据消息的 key 计算 hash 值,然后算出在哪个哈希槽,得到槽的偏移量 absSlotPos,然后读取这个哈希槽里面的值 slotValue,这个值就是指向索引的下标。

    有了索引下标,就可以计算出要读取的索引位置,再读出这条索引的信息。先根据消息的时间差和索引文件的存储开始时间计算出消息的存储时间,然后判断这条消息是否在查询的时间范围内(begin、end),如果是的才会添加到 phyOffsets 中。

    接着判断存储时间如果小于开始时间,或者索引中记录的上一个索引下标和当前索引下标相同,说明已经没有可读的索引了,否则,将继续读取上一个索引下标位置的索引数据。

    public void selectPhyOffset(final List phyOffsets,
    final String key, // 消息唯一Key
    final int maxNum, // 最多读取多少条消息的物理偏移量
    final long begin, // 查询消息存储的开始时间
    final long end // 查询消息存储的结束时间
    ) {
    if (this.mappedFile.hold()) {
    // key hash
    int keyHash = indexKeyHashMethod(key);
    // hash slot
    int slotPos = keyHash % this.hashSlotNum;
    // hash slot 位置
    int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

    // 先定位到 hash slot,再遍历其指向的索引链表
    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
    // 从这个索引开始读,一个hash slot 有多个索引
    for (int nextIndexToRead = slotValue; ; ) {
    if (phyOffsets.size() >= maxNum) {
    break;
    }
    // 索引开始的位置
    int absIndexPos = IndexHeader.INDEX_HEADER_SIZE
    + this.hashSlotNum * hashSlotSize
    + nextIndexToRead * indexSize;

    // 读取索引
    int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
    long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
    long timeDiff = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
    int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

    timeDiff *= 1000L;
    // 得到存储时的时间
    long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
    // 判断时间范围
    boolean timeMatched = (timeRead >= begin) && (timeRead

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论