RocketMQ4源码(二)普通消息发送

2023年 8月 7日 25.6k 0

前言

本章基于rocketmq4.6.0分析普通消息的发送流程,仅涉及producer和broker两个角色。

主要包括:

  • producer侧相关后台任务
  • producer侧发送消息
  • broker侧自动创建topic
  • broker侧写消息
  • broker侧构建consumequeue和index
  • 其他特性及HA后续再看。

    Producer

    继承关系、构造、属性

    MQAdmin:基础的管理功能,忽略。

    MQProducer:提供给客户端的api,下面列举了常用的方法。

    public interface MQProducer extends MQAdmin {
        // 启停
        void start() throws MQClientException;
        void shutdown();
    	// 获取topic下所有queue
        List fetchPublishMessageQueues(final String topic) throws MQClientException;
        // 发送普通消息
        SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;
        // 指定queue发送消息
        SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
            RemotingException, MQBrokerException, InterruptedException;
    	// 发送oneway消息
        void sendOneway(final Message msg) throws MQClientException, RemotingException,
        InterruptedException;
        // 选择MessageQueue发送消息,顺序消息
        SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
        // 发送事务消息
        TransactionSendResult sendMessageInTransaction(final Message msg,
                                                       final Object arg) throws MQClientException;
    	// 批量发送消息
        SendResult send(final Collection msgs) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;
        // rpc调用消息
        Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
        RemotingException, MQBrokerException, InterruptedException;
    }
    

    ClientConfig:客户端配置,consumer和producer都继承这个类。

    包含比如nameserver地址、客户端心跳间隔30s、从nameserver拉取路由间隔30s等等。

    DefaultMQProducer:包含两类属性

    • 配置:生产者特有配置,如发送消息超时时间、发送消息重试次数等等;
    • 组件:DefaultMQProducerImpl是生产者的底层实现,TraceDispatcher消息轨迹组件;
    public class DefaultMQProducer extends ClientConfig implements MQProducer {
        // 生产者组,事务消息
        private String producerGroup;
        // 自动创建topic模板topic TBW102
        private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
        // 默认创建topic队列数量 4
        private volatile int defaultTopicQueueNums = 4;
        // 发送消息超时时间 3s
        private int sendMsgTimeout = 3000;
        // 消息体超过多少,开启压缩,4kb
        private int compressMsgBodyOverHowmuch = 1024 * 4;
        // 同步发送重试次数 2
        private int retryTimesWhenSendFailed = 2;
        // 异步发送重试次数 2
        private int retryTimesWhenSendAsyncFailed = 2;
        // 当发送消息失败,是否重试其他broker false
        private boolean retryAnotherBrokerWhenNotStoreOK = false;
        // 消息最大4M
        private int maxMessageSize = 1024 * 1024 * 4;
        // 底层实际生产者实现
        protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
        // 消息轨迹组件
        private TraceDispatcher traceDispatcher = null;
    }
    

    DefaultMQProducer构造入参:

    • namespace:命名空间,默认空,逻辑隔离;
    • producerGroup:生产者组,事务消息需要;
    • rpcHook:远程调用钩子,目前只有acl权限实现;
    • enableMsgTrace:是否开启消息轨迹;
    • customizedTraceTopic:存储消息轨迹的topic;

    对于普通消息发送来说,这些属性都不重要,唯一重要的是底层DefaultMQProducerImpl。

    DefaultMQProducerImpl构造仅创建了一个线程池,用于异步发送消息。

    启动

    DefaultMQProducer#start:启动所有组件,主要看DefaultMQProducerImpl。

    DefaultMQProducerImpl#start(true):重点在于第一步,后两步省略

  • 开启底层通讯客户端;
  • 向所有broker发送一次心跳;
  • 开启定时任务,每1秒扫描超时请求(RequestFutureTable存储rpc的id和future);
  • DefaultMQProducerImpl#start(true):构建MQClientInstance并启动。

    构建MQClientInstance

    MQClientManager#getOrCreateMQClientInstance:

    对于一个clientId存在一个MQClientInstance实例。

    ClientConfig#buildMQClientId:默认instanceName=pid,对于用户而言,一个进程中只有一个MQClientInstance实例。

    producer和consumer底层通讯nettyclient都是同一个

    MQClientInstance构造创建了很多组件,其中和producer相关的主要是MQClientAPIImpl。

    MQClientAPIImpl实际维护底层通讯客户端NettyRemotingClient。

    启动MQClientInstance

    MQClientInstance#start:启动多个后台线程执行不同任务,这里和producer相关的是startScheduledTask部分。

    MQClientInstance#startScheduledTask:

    这里包含两个和生产者相关的任务,这两个任务也和消费者相关。

    路由刷新

    30s一次。

    MQClientInstance#updateTopicRouteInfoFromNameServer():

    收集consumer和producer需要用到的topic,循环每个topic从NameServer查询路由信息。

    每个MQProducerInner会在运行阶段,把发送过的topic存储到topicPublishInfoTable,这样定时路由刷新能收集到这些topic,持续更新路由。

    MQClientInstance#updateTopicRouteInfoFromNameServer:

  • 根据topic获取TopicRouteData,上一章看过数据结构(这里省略获取模板topic=TBW102的路由数据,作用是为了自动创建topic时,根据模板topic决定队列数量);
  • 存储brokerName->brokerAddrs的关系;
  • 更新所有MQProducerInner的发布路由表,即topic->TopicPublishInfo;
  • 关键在于nameserver的TopicRouteData模型转换为TopicPublishInfo模型。

    MQClientInstance#topicRouteData2TopicPublishInfo:

    上一章提到过,如果nameserver支持orderTopic,可以配死路由,这里优先取orderTopicConf,其实这段可以忽略。

    一般都会走第二段逻辑,匹配有写权限且master broker存活的QueueData,将QueueData转换为MessageQueue模型(后文简称MQ),放入TopicPublishInfo。

    MessageQueue模型如下,将QueueData中可写队列数量拆分为从0-n。

    清理下线broker

    30s一次。

    根据路由,MQClientInstance会维护brokerName->brokerId->brokerAddr的映射关系,即brokerAddrTable。

    MQClientInstance#cleanOfflineBroker:broker地址的映射关系,通过另外一个任务定期清理。

    MQClientInstance#isBrokerAddrExistInTopicRouteTable:

    根据所有topic的原始路由TopicRouteData,判断是否存在对应BrokerData,不存在将被移除。

    发送心跳给broker

    30s一次。

    MQClientInstance#sendHeartbeatToAllBrokerWithLock:客户端发送心跳给所有broker。

    MQClientInstance#sendHeartbeatToAllBroker:

  • 准备心跳数据包HeartbeatData;
  • 扫描broker地址table;
  • 跳过非master broker;
  • 发送心跳
  • MQClientInstance#prepareHeartbeatData:

    clientId一般是ip+pid,生产者数据只包含所有生产者组,所以生产者的心跳主要服务事务消息。

    Message

    对于生产者用户侧来说,一个消息对应一个Message。

    普通消息只需要关注topic和body属性。

    更多的扩展点在于properties这个扩展map。

    比如延迟消息,key=DELAY,value=延迟级别。

    比如broker存储相关,是否等待刷盘结束返回,key=WAIT,value=true/false,默认true。

    比如Tag过滤,key=Tags。

    比如设置业务key,key=KEYS,value=业务key。

    可以通过业务key定位到消息id。

    发送消息

    DefaultMQProducerImpl#sendDefaultImpl是发送消息的核心方法,大致分为四步:

  • 获取发布路由;
  • 根据发布路由选择Queue;
  • 发送;
  • 结果处理;
  • 获取发布路由

    DefaultMQProducerImpl#tryToFindTopicPublishInfo:

    根据topic查询TopicPublishInfo,如果发布路由还未获取到,手动从nameserver同步一次topic路由,和后台自动刷新逻辑一致。

    选择MessageQueue

    MQFaultStrategy#selectOneMessageQueue:

    MQFaultStrategy针对生产者的容错策略。

    目前只有一种容错措施,即LatencyFaultTolerance延迟容错。

    默认sendLatencyFaultEnable=false,所以不会开启延迟容错,简单了解一下。

    大致意思是,每次发送消息后,都记录发送消息的耗时,匹配耗时到latencyMax数组中的某一区间,再对应到notAvailableDuration对应下标代表的不可用时间。

    每次发送消息前,选择MQ,都尽量避开选择这些由于延迟而标记为不可用的broker。

    TopicPublishInfo#selectOneMessageQueue(String) :

    如果当前不在重试,走正常选择一个MQ;

    如果当前正在重试,即lastBrokerName非空,绕过本次处理失败broker的所有MQ,根据一个topic纬度自增int(sendWhichQueue)选择下一个MQ,如果找不到,走正常选择一个MQ。

    TopicPublishInfo#selectOneMessageQueue:

    正常选择一个MQ就是根据topic纬度自增int,每次轮询下一个MQ。

    发送

    DefaultMQProducerImpl#sendKernelImpl:

    Step1,对Message对象做一些特殊处理,比如生成messageId、body压缩等。

    DefaultMQProducerImpl#sendKernelImpl:

    Step2,构造业务请求头SendMessageRequestHeader。

    注意这里的defaultTopic(TBW102)和defaultTopicQueueNums(4),之后在broker侧用于自动创建topic。

    DefaultMQProducerImpl#sendKernelImpl:

    Step3,调用通讯层发送消息

    MQClientAPIImpl#sendMessage:

    封装RemotingCommand,将Message.body作为通讯请求体,注意请求编码为SEND_MESSAGE_V2(310)。

    结果处理

    MQClientAPIImpl#processSendResponse:

    通讯层对于broker返回的响应码只会处理四种,其他都属于Exception。

    这四种ResponseCode,都一一映射到SendStatus。

    SendResult里包含一些broker返回的数据:

    • uniqMsgId:生产者msgId;
    • responseHeader#msgId:broker生成的msgId;
    • responseHeader#queueOffset:broker-topic-queue纬度的逻辑offset,从0开始自增;

    DefaultMQProducerImpl#sendDefaultImpl:

    业务侧这四种都归类为NotStoreOk,默认是不支持重试的,直接返回用户SendResult。

    DefaultMQProducerImpl#sendDefaultImpl:

    业务侧对于异常的处理如下,continue代表可以重试;

    对于通讯异常、客户端异常支持重试;

    对于broker正常返回response,而不属于NotStoreOk的,有特殊处理,部分支持重试,部分不支持。

    Broker

    启动

    先来梳理一下Broker启动的大致流程。

    BrokerStartup,先创建BrokerController,然后启动BrokerController。

    BrokerStartup#createBrokerController:创建BrokerController分四步。

    • 读配置
    • 构建BrokerController,内部都是new对象
    • BrokerController初始化
    • 注册ShutdownHook

    BrokerController#initialize:初始化阶段,从磁盘加载数据到内存,构建通讯server、开启后台线程。

    BrokerController#start:启动阶段,执行各组件的start方法,开始向nameserver发送心跳,可以对外提供服务。

    通讯层

    BrokerController#registerProcessor:

    SEND_MESSAGE_V2由SendMessageProcessor处理。

    BrokerController#initialize:

    处理生产消息的业务线程池,默认核心和最大线程数都是1,队列大小为1万。

    同步刷盘sendMessageThreadPoolNums官方建议调优。

    异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁,调整Broker配置项useReentrantLockWhenPutMessage,默认为false;异步刷盘建议开启TransientStorePoolEnable;建议关闭transferMsgByHeap,提高拉消息效率;同步刷盘建议适当增大sendMessageThreadPoolNums,具体配置需要经过压测。

    新版本,sendMessageThreadPoolNums默认改为min(核数,4)了,基本就是4。

    SendMessageProcessor#sendMessage:

    Step1,msgCheck,对消息做一些校验,核心逻辑是自动创建topic。

    Step2,broker侧将Message转换为MessageExtBrokerInner。

    Step3,调用MessageStore存储消息,返回PutMessageResult。

    Step4,根据PutMessageResult,响应客户端。

    自动创建Topic

    TopicConfigManager

    Broker侧由TopicConfigManager管理所有topic配置,即TopicConfig。

    运行时,所有Topic配置表现为TopicConfigManager.topicConfigTable。

    模板topic=TBW102在TopicConfigManager构建时放入topicConfigTable。

    BrokerController#initialize阶段,首先会装载topic配置。

    TopicConfigManager#decode:将~/store/config/topics.json装载到TopicConfigManager内存。

    {
    	"dataVersion":{
    		"counter":17,
    		"stateVersion":0,
    		"timestamp":1686643918215
    	},
    	"topicConfigTable":{
    		"TopicTest1":{
    			"attributes":{},
    			"order":false,
    			"perm":6,
    			"readQueueNums":4,
    			"topicFilterType":"SINGLE_TAG",
    			"topicName":"TopicTest1",
    			"topicSysFlag":0,
    			"writeQueueNums":4
    		},
    		"TBW102":{
    			"attributes":{},
    			"order":false,
    			"perm":6,
    			"readQueueNums":8,
    			"topicFilterType":"SINGLE_TAG",
    			"topicName":"TBW102",
    			"topicSysFlag":0,
    			"writeQueueNums":8
    		}
    	}
    }
    

    TopicConfig

    TopicConfig代表每个topic在当前broker的配置,包含读写队列数量、权限等。

    自动创建Topic

    AbstractSendMessageProcessor#msgCheck:

    消息处理前,除了一些校验之外,如果TopicConfigManager中没有topic对应TopicConfig,走自动创建topic逻辑。

    TopicConfigManager#createTopicInSendMessageMethod:

    入参topic是需要自动创建的topic,defaultTopic是客户端传入模板topic(TBW102),clientDefaultTopicQueueNums是客户端传入队列数量(4)。

    Step1,构建TopicConfig。

    最终TopicConfig的队列数量取min(broker默认队列数量,client默认队列数量),默认为4。

    Step2,Topic配置写内存+写磁盘。

    Step3,向所有nameserver立即发送一次注册请求,目的是刷新topic路由。

    消息存储基础

    DefaultMessageStore

    DefaultMessageStore是broker的默认MessageStore实现,管理消息存储组件。

    MappedFile(通用服务)

    MappedFile,代表一个文件通过mmap映射到内存。

    使用mmap内存映射能够避免内核空间到用户空间的数据拷贝,但是要求文件定长。

    MappedFile#init(fileName fileSize):MappedFile构造时将磁盘文件mmap映射到内存buffer。

    MappedFileQueue(通用服务)

    MappedFileQueue管理多个通过mmap内存映射的文件MappedFile。

    比如commitlog,在磁盘上表现为commitlog目录下的多个文件。

    MappedFileQueue#load:

    在broker启动时,MappedFile往往会通过MappedFileQueue,从磁盘mmap映射到内存。

    比如commitlog加载commitlog路径下的所有1G文件映射到内存,按照文件名排序。

    写消息

    DefaultMessageStore#putMessage:写消息,调用底层CommitLog。

    CommitLog(主流程)

    众所周知,rocketmq所有消息,不区分topic和queue,都会写入逻辑上的一个commitlog文件。

    CommitLog#putMessage:

    • 获取最后一个MappedFile
    • 全局锁(putMessageLock),写消息到MappedFile
    • 刷盘
    • HA(本章忽略)

    Step1 找最后一个MappedFile

    对于CommitLog来说,每个文件1G,第一个文件是00000000000000000000,第二个文件是00000000001073741824,依此类推。

    MappedFileQueue#getLastMappedFile():只需要从MappedFile列表中取最后一个返回即可。

    Step2 写内存(pagecache)

    注意,这一步由CommitLog.putMessageLock保护,同一时间只有一个线程能写内存。

    MappedFile#appendMessagesInner:组装数据调用上层传入的AppendMessageCallback。

    AppendMessageCallback只有commitlog实现,即CommitLog.DefaultAppendMessageCallback。

    CommitLog在每个broker单例,每个CommitLog也只有一个DefaultAppendMessageCallback。

    DefaultAppendMessageCallback主要存储一些中间buffer。

    写commitlog是加锁(自旋或reentrant)单线程写,所以buffer都可以复用。

    CommitLog.DefaultAppendMessageCallback#doAppend:消息写入底层MappedByteBuffer。

    入参:

    • fileFromOffset:文件起始offset,其实就是文件名;
    • byteBuffer:mmap底层MappedByteBuffer;
    • maxBlank:文件剩余容量;
    • msgInner:消息;

    出参:AppendMessageResult。

    本质是将MessageExtBrokerInner写入bytebuffer(写pagecache),关注AppendMessageResult关键字段。

    wroteOffset,物理offset,代表这条消息相对于当前broker的所有commitlog文件中的写入位置。

    wroteOffset=当前文件起始offset(文件名)+当前文件已经写入的字节数(MappedFile#wrotePosition)。

    msgId,由客户端ip+port和物理offset两部分组成,没有记录在commitlog中,仅作为出参返回。

    这个msgId对于单broker节点来说是唯一的,因为物理offset在单个broker唯一。

    logicsOffset(queueOffset) ,逻辑offset,消息在broker-topic-queue纬度的偏移量,从0开始,每次处理完后增加1。

    消息大小+8字节,超出mmap文件剩余容量,处理逻辑如下:

  • 4个字节写入消息大小为剩余总容量
  • 4个字节写入BLANK_MAGIC_CODE魔数,标记这条消息是空
  • 返回END_OF_FILE
  • 读commitlog时,通过第一个4字节能判断这条消息的长度,第二个4字节能判断当前这条消息是空消息,还是正常消息。

    CommitLog#putMessage:如果写MappedFile内存返回END_OF_FILE,创建一个新的MappedFile继续写入。

    创建MappedFile步骤忽略,文件名=上一个文件名+1G,由AllocateMappedFileService实际操作。

    Step3 刷盘

    CommitLog#handleDiskFlush:根据不同的策略,执行刷盘。

    case1:broker同步刷盘,且消息设置WAIT=true,这是默认情况,提交GroupCommitRequest到GroupCommitService,等待刷盘结束,超时时间5s;request的入参是下一个写入的物理offset;

    case2:broker同步刷盘,但消息设置WAIT=false,支持客户端针对消息级别设置为异步刷盘,唤醒GroupCommitService执行刷盘;

    case3:broker异步刷盘,唤醒GroupCommitService执行刷盘;

    rocketmq里有很多ServiceThread都使用了多线程生产单线程消费的生产消费模型,GroupCommitService也一样。

    GroupCommitService中有两个队列。

    putRequest方法:requestsWrite支持多线程写入刷盘请求;

    swapRequests方法:当GroupCommitService被唤醒后,会执行swapRequests交换两个队列,单线程处理requestsRead中的刷盘请求;

    CommitLog.GroupCommitService#doCommit:

    同步刷盘和异步刷盘,最终都会调用MappedFileQueue#flush。

    之所以叫GroupCommit组提交,是因为实际刷盘offset可能会超出请求刷盘offset,支持对于多个offset一并刷盘。

    由于写pagecache线程(sendMessageThreadPoolNums)和GroupCommitService刷盘线程是两个线程,所以GroupCommitService可能实际在跑的时候已经有多个消息被写入pagecache了,可以一起刷盘。

    注意,写内存pagecache是被CommitLog.putMessageLock锁的,同一时间只有一个线程在写pagecache,但是提交刷盘请求并不在锁范围内。

    MappedFileQueue#flush:

    MappedFileQueue根据上次刷盘的全局commitlog物理offset(flushedWhere),找到单个MappedFile,执行MappedFile#flush,更新flushedWhere。

    MappedFile#flush:处理单个文件的刷盘。

    如果写内存的进度(wrotePosition)大于刷盘的进度(flushedPosition),调用force api执行刷盘,并更新刷盘进度。

    二级索引

    Reput

    消息存储完成后(不考虑HA),直接就能响应producer客户端了,但是发送消息还有一些其他任务需要处理。

    ReputMessageService是一个后台运行的线程,当commitlog的物理offset(pagecache)发生变化,这里会做一些事情。

    DefaultMessageStore.ReputMessageService#doReput:

    当reputFromOffset(ReputMessageService对于物理offset的处理进度)小于已经写入pagecache的最大物理offset,持续处理这些新写入的消息。

  • 查询新写入的buffer,即最大物理offset - reputFromOffset这段buffer;
  • 构建DispatchRequest;
  • 分发DispatchRequest;
  • MessageArrivingListener通知消息到达,消费相关忽略;
  • 找新写入的buffer

    CommitLog#getData:根据物理offset定位MappedFile,最后定位到具体的buffer。

    MappedFile#selectMappedBuffer(int):buffer中有三个参数,物理offset、buffer大小、buffer。

    构建DispatchRequest

    CommitLog#checkMessageAndReturnSize:

    首先呼应写pagecache那段逻辑,这里如果读到魔数是BLANK_MAGIC_CODE,代表当前buffer所属MappedFile已经处理完了。

    在ReputMessageService#doReput中会判断size=0,更新reputFromOffset,去找下一段buffer。

    接下来就是从buffer中读取一条完整的消息,构建成DispatchRequest。除了tagsCode是特殊处理的(取tag.hashCode),其他都是原来commitlog中的。

    分发DispatchRequest

    DefaultMessageStore#doDispatch:将DispatchRequest分发到所有CommitLogDispatcher处理。

    DefaultMessageStore构造时加入了两个CommitLogDispatcher。

    忽略BrokerController加入的CommitLogDispatcherCalcBitMap,和消费者SQL92过滤相关。

    构建ConsumeQueue

    众所周知,消息写入commitlog对consumer并不可见,需要写到ConsumeQueue才能被consumer感知。

    CommitLogDispatcherBuildConsumeQueue构建ConsumeQueue。

    物理存储上,ConsumeQueue由consumequeue/{topic}/{queueId}下的多个MappedFile组成,还是用了MappedFileQueue来管理这多个MappedFile。

    每个MappedFile默认存储30w条消息,每条20byte,共600wbyte。

    DefaultMessageStore#putMessagePositionInfo:

  • 获取或创建ConsumeQueue
  • ConsumeQueue处理DispatchRequest
  • DefaultMessageStore#findConsumeQueue:

    查询ConsumeQueue是否存在,如果不存在放入内存table,纬度是topic+queueId。

    ConsumeQueue#putMessagePositionInfo:构建ConsumeQueue的核心方法。

    第一,ConsumeQueue针对每条消息,用20个字节存储。

    8字节物理offset,4字节消息长度,8字节tag的hashCode。

    根据前8个字节能定位到CommitLog中的某个MappedFile的某段buffer,4字节就能取出一条消息。

    第二,这里MappedFile#appendMessage仅仅写入pagecache,并未flush。

    DefaultMessageStore.FlushConsumeQueueService:

    后台线程默认每1秒扫描所有ConsumeQueue,执行刷盘。

    具体流程省略,调用MappedFile里的FileChannel.force。

    构建IndexFile

    rocketmq支持根据messageId、业务key查询消息。

    这些查询通过一组index文件实现。

    CommitLogDispatcherBuildIndex构建index索引文件。

    IndexFile

    每个IndexFile也是个定长MappedFile,包含40字节头+500w个slot4字节+2kw个索引20字节。

    IndexHeader

    每个IndexFile开始有40个字节的IndexHeader,代表这段索引的元数据:

    • begin/endTimestamp:这段索引对应消息存储的开始时间/结束时间
    • begin/endPhyOffset:这段索引对应消息的物理offset开始/结束
    • hashSlotCount:暂时没什么用
    • indexCount:索引数量

    IndexService

    IndexService管理所有IndexFile。

    这里不用MappedFileQueue是因为IndexFile的文件名不是物理offset,而是时间戳。

    构建索引

    IndexService#buildIndex:获取IndexFile、写messageId索引、写业务key索引。

    IndexService#getAndCreateLastIndexFile:

    由于IndexFile是按照时间戳命名排序的,所以最后一个IndexFile就是时间最晚的一个。

    如果不存在IndexFile,用当前时间作为文件名创建一个IndexFile。

    IndexService#putKey:控制IndexFile的滚动,当IndexFile#putKey写满,创建新IndexFile继续写入。

    IndexFile分为三段:header、slot、index。

    IndexFile#putKey:写索引的核心逻辑。

    第一步,根据key(messageId、业务key)的hashCode模slot数量(500w),得到一个下标absSlotPos。

    第二步,从这个absSlotPos上读取4字节的slotValue,后面看是啥。

    第三步,计算这条消息,和这个index文件中最早的消息的时间差timeDiff。

    第四步,找到index写入位置=header长度+slot总长度+索引数量*20索引长度,写入index结构包含20字节:

    • 4字节key.hashCode;
    • 8字节物理offset,用于定位commitlog中的一条消息;
    • 4字节timeDiff,和当前index文件最早的消息的时间差;
    • 4字节slotValue;

    第五步,更新absSlotPos上的slotValue为当前索引总数。

    所以slotValue代表上一个落在同一个slot的索引在当前index文件中的索引数量计数。

    用链表法解决了哈希冲突。

    第六步,更新IndexFile的IndexHeader元数据,主要索引数量要+1。

    总得来说,IndexFile的存储结构还是hashtable+链表,只不过通过一个连续的字节数组来存储。

    比如先收到消息A,key为hello,后收到消息B,key为world。

    对于消息A,hello.hashCode%500w定位到一个slotX,slotX当前是0,代表还没有产生哈希冲突。

    消息A是这个indexFile中的第222个索引,消息A的索引位置=20+500w4+22220,写入20字节索引数据,索引A的slotValue是0,更新当前slotX为222。

    对于消息B,world.hashCode%500w定位到同一个slot,此时slotX是222,其实对应消息A的位置。

    消息B是这个indexFile中的第666个索引,消息A的索引位置=20+500w4+66620,写入20字节索引数据,索引B的slotValue是222,更新当前slotX为666。

    假设查询IndexFile中key=hello的索引:

  • hello.hashCode%500w,定位到slot
  • 通过20+500w4+66620,定位到world的index
  • world.hash不等于hello.hash,根据world的slotValue继续查
  • 通过20+500w4+22220,定位到hello的index
  • IndexService#queryOffset:

    但是实际查询时,并没有那么理想,因为IndexFile是通过时间戳命名的,在没有提供时间范围查询条件的情况下,需要从最新的IndexFile开始往前遍历。

    总结

    Producer

    每个客户端有一个MQClientInstance,持有通讯客户端NettyRemotingClient。

    对于producer而言有三个相对重要的后台任务,每30s执行一次:

  • 路由刷新:从NameServer获取TopicRouteData路由数据,存储brokerName->brokerAddrs的关系,更新所有MQProducerInner的发布路由表,即topic->TopicPublishInfo;
  • broker清理:扫描brokerName->brokerAddrs的关系,清理路由表中不存在的broker;
  • 心跳:发送HeartbeatData给所有broker,其中包含所有producer的group,服务于事务消息;
  • 对于producer,每条消息包含topic、properties、body三部分。

    其中properties包含众多特性,比如延迟消息、刷盘策略、TAG、KEY、messageId等。

    producer发送消息经过四个步骤:

  • 获取发布topic的路由TopicPublishInfo,如果不存在,从nameserver同步一次;
  • 选择MessageQueue,同一个topic下采用轮询的方式选择,支持延迟容错(默认未开启);
  • 发送消息给broker;
  • 处理响应结果,默认刷盘超时、HA相关超时不会重试,通讯异常、客户端异常支持重试;
  • Broker

    自动创建Topic

    topic在broker侧由TopicConfigManager管理,每个topic对应一个TopicConfig。

    进入消息处理流程前,需要确保topic存在,默认broker支持自动创建topic。

    消息存储

    DefaultMessageStore,broker的默认MessageStore实现,管理消息存储组件。

    MappedFile,代表一个文件通过mmap映射到内存。

    MappedFileQueue,管理多个通过mmap内存映射的文件MappedFile。

    写消息

    对于一个broker,所有消息都会写入逻辑上的一个commitlog,在物理上体现为commitlog目录下的N个1G的文件。每个文件名为当前文件的第一条消息的物理offset。

    写消息流程如下:

  • 获取commitlog目录下最后一个文件MappedFile;
  • 获取putMessageLock锁,单线程写MappedFile内存pagecache,释放putMessageLock锁;
  • GroupCommitService将commitlog刷盘,同步刷盘情况下,主线程最多等待5s;异步刷盘,主线程直接返回;
  • HA(本章忽略);
  • 默认broker处理客户端发送消息SEND_MESSAGE_V2只有一个线程,同步刷盘的情况下可调优。

    针对commitlog有两个offset需要区分:

    • logicsOffset/queueOffset:逻辑offset,消息在broker-topic-queue纬度的offset,从0开始,每个消息+1;
    • phyOffset/wroteOffset:物理offset,消息在broker的所有commitlog文件下的物理偏移量,与内存映射息息相关。可以认为是消息的一级索引,consumequeue和index通过物理offset+消息长度,从commitlog中定位到唯一一条消息;

    二级索引

    当消息写入commitlog后,ReputMessageService线程会分发这些新消息buffer。

    主要用于构建consumequeue和index两个索引。

    无论构建哪个索引,ReputMessageService线程仅仅会将数据写入pagecache。

    Consumequeue

    众所周知,消息写入commitlog对consumer并不可见,需要写到ConsumeQueue才能被consumer感知。

    物理存储上,ConsumeQueue由consumequeue/{topic}/{queueId}下的多个文件组成,还是用了MappedFileQueue来管理多个MappedFile。每个文件名是consumequeue的物理offset。

    每个MappedFile默认存储30w条consumequeue数据,每条20byte,共600wbyte。

    consumequeue包含8字节物理offset、4字节消息长度、8字节tagHashCode。

    FlushConsumeQueueService线程会将consumequeue刷盘。

    Index

    Index提供根据时间、消息key、消息id查询消息的能力。

    物理存储上,每个IndexFile包含40字节头+500w个slot4字节+2kw个索引20字节,也是个定长MappedFile。

    区别是,IndexFile的文件名并不是什么物理offset,而是时间戳。

    每个IndexFile分为三段:header、slot、index。

    header部分是这个IndexFile的元数据,比如这段索引对应消息存储的开始时间/结束时间,这段索引对应消息的物理offset开始/结束等等。

    slot部分默认包含500个槽,每个槽存储4字节的int,代表上次发生哈希冲突的key的相对位置(这个key在IndexFile中的计数)。

    index部分是实际的索引数据,每个索引包含20字节的结构。

    总得来说,IndexFile的存储结构还是hashtable+链表,只不过通过一个连续的字节数组来存储。

    欢迎大家评论或私信讨论问题。

    本文原创,未经许可不得转载。

    欢迎关注公众号【程序猿阿越】。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论