RocketMQ4源码(四)生产者特性

2023年 8月 14日 60.7k 0

前言

本章基于rocketmq4.6.0,分析两个生产者特性:延迟消息和事务消息。

一、延迟消息

案例

延迟消息的生产和消费,对于Producer和Consumer实例并没有特殊的操作。

仅仅通过设置Message延迟级别,即Message#setDelayTimeLevel,发送延迟消息。

DefaultMQProducer producer = new DefaultMQProducer("producer-group-1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message(
    "MyTestTopicA", // topic
    "TagA", // tag
    ("Hello Delay " + i).getBytes(StandardCharsets.UTF_8) // body
);
msg.setDelayTimeLevel(3); // 延迟级别=3 10s
SendResult sendResult = producer.send(msg); // 发送消息

Message用properties.DELAY存储延迟级别。

默认延迟级别有18个,在broker侧可配置,即MessageStoreConfig#messageDelayLevel。

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

延迟消息主要逻辑都在broker侧,直接分析broker即可。

写CommitLog

CommitLog#putMessage:写commitlog前替换原始topic和queue

  • 替换message.topic=SCHEDULE_TOPIC_XXXX,message.queueId=delayLevel-1;
  • 设置properties.REAL_TOPIC=原topic,properties.REAL_QID=原queueId;
  • 写ConsumeQueue

    CommitLog#checkMessageAndReturnSize:

    构建DispatchRequest时,原消息的tag.hashCode会被修改为投递时间,即存储时间+延迟时间。

    进而影响consumequeue的20字节结构中的最后8字节tagHash变为投递时间。

    消费延迟Topic

    ScheduleMessageService负责消费当前broker管理的topic=SCHEDULE_TOPIC_XXXX的所有queue。

    初始化

    ScheduleMessageService#parseDelayLevel加载延迟级别和延迟时长的映射关系到delayLevelTable。

    延迟topic下所有queue的消费进度都存储在delayOffset.json,加载到内存offsetTable。

    启动

    ScheduleMessageService#start:

    开启一个线程(jdk的timer)做两类事情。

    一个是消费延迟topic,一个是每隔10s持久化消费进度到delayOffset.json。

    由于延迟级别-1=queueId,所以为每个延迟级别开启一个DeliverDelayedMessageTimerTask。

    DeliverDelayedMessageTimerTask是个不可变对象。

    delayLevel对应延迟级别,也对应消费queue;

    offset对应从哪个逻辑offset开始消费。

    消费

    DeliverDelayedMessageTimerTask#executeOnTimeup:

    消费延迟topic=SCHEDULE_TOPIC_XXXX,queueId=delayLevel-1。

    消费逻辑和broker处理PullMessage逻辑一致,根据逻辑offset可以定位到一段consumequeue的buffer,然后循环每个consumequeue记录处理。

    注意这里tagsCode不是原始消息的tag的哈希,在写ConsumeQueue阶段已经修改为目标投递时间戳。

    对于每条consumequeue记录,计算目标投递时间和当前时间的差countdown毫秒。

    如果countdown大于0,代表还未到目标时间,即延迟topic的当前queue中,后续消息都还未到时间,因为延迟级别都相同。

    重新提交一个DeliverDelayedMessageTimerTask,延迟时间为countdown,下次执行即可投递真实消息,更新内存中的消费进度,当前任务直接结束。

    如果countdown小于等于0,代表到了目标时间,当前消息可以给消费者消费。

    根据consumequeue的commitlog物理offset+消息长度,从commitlog中读到topic=SCHEDULE_TOPIC_XXXX延迟消息。

    将延迟消息转换为真实消息,写入commitlog(写内存+异步刷盘),最终写入真实的consumequeue,对消费者可见。

    DeliverDelayedMessageTimerTask#messageTimeup:将延迟消息转换为真实消息

    使用REAL_TOPIC和REAL_QID替换topic和queueId;

    移除properties.DELAY;

    设置真实tag的hashCode;

    设置为异步刷盘;

    CommitLog和ConsumeQueue的变化关系大致如下:

    其他情况:

    如果写CommitLog失败,延迟10s再次执行Task。

    如果当前delayLevel下没有新消息到达,延迟100ms再次执行Task。

    二、事务消息

    案例

    案例来源于rocketmq-example,对于事务消息,逻辑都在producer和broker中。

    首先定义一个TransactionListener,有两个方法:

  • executeLocalTransaction:执行本地事务;
  • checkLocalTransaction:响应broker回查本地事务执行状态;
  • public class TransactionListenerImpl implements TransactionListener {
        private AtomicInteger transactionIndex = new AtomicInteger(0);
    
        private ConcurrentHashMap localTrans = new ConcurrentHashMap();
    
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            int value = transactionIndex.getAndIncrement();
            int status = value % 3;
            localTrans.put(msg.getTransactionId(), status);
            return LocalTransactionState.UNKNOW;
        }
    
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            Integer status = localTrans.get(msg.getTransactionId());
            if (null != status) {
                switch (status) {
                    case 0:
                        return LocalTransactionState.UNKNOW;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                    default:
                        return LocalTransactionState.COMMIT_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
    

    生产者:

    使用TransactionMQProducer,注入TransactionListener实现。

    调用TransactionMQProducer#sendMessageInTransaction(String,Object)发送事务消息,这里第二个入参,对应TransactionListener#executeLocalTransaction执行本地事务的第二个入参,一般是业务模型。

    public static void main(String[] args) {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("my_tx_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.setTransactionListener(transactionListener);
        producer.start();
        Message msg = 
            new Message("TopicTest1234", "TagA", "hello".getBytes(StandardCharsets.UTF_8));
        producer.sendMessageInTransaction(msg, null);
    }
    

    TransactionMQProducer

    TransactionMQProducer继承普通生产者DefaultMQProducer。

    相较于普通生产者多了两个属性:

  • transactionListener,用户代码逻辑,负责执行本地事务和本地事务状态回查;
  • executorService,线程池,负责执行本地事务状态回查;
  • 默认本地事务状态回查线程池采用1线程2000队列。

    TransactionMQProducer#sendMessageInTransaction:

    这是事务消息生产者相较于普通消息生产者唯一新增的api,发送事务消息。

    其底层还是调用内部生产实现DefaultMQProducerImpl。

    producer发送半消息

    TransactionMQProducer#sendMessageInTransaction:

    producer侧,发送半消息主要是设置Message的属性。

  • 不支持延迟消息;
  • 设置properties.TRAN_MSG=true,代表这是一条事务半消息;
  • 设置properties.PGROUP=生产者组,用于消息回查;
  • 后面send方法基本同普通消息,除了请求头的sysFlag设置事务PREPARE标志位。

    borker接收半消息

    接着,半消息来到broker。

    SendMessageProcessor#sendMessage:

    将客户端请求转换为MessageExeBrokerInner后,

    识别到properties.TRAN_MSG=true,进入TransactionalMessageService#prepareMessage。

    TransactionalMessageServiceImpl#prepareMessage:

    TransactionalMessageBridge#putHalfMessage:

    事务消息和延迟消息类似,将topic替换为RMQ_SYS_TRANS_HALF_TOPIC,queueId替换为0。

    真实topic存储到REAL_TOPIC和REAL_QID。

    区别在于事务消息并没有对ConsumeQueue做特殊处理,即tagHash不会变。

    producer本地事务

    DefaultMQProducerImpl#sendMessageInTransaction:

    如果发送broker异常,直接抛出MQClientException。

    如果发送broker成功,且SEND_OK,执行本地事务TransactionListener#executeLocalTransaction,如果本地事务执行抛出异常,state=UNKNOWN,localException非null;

    如果发送broker成功,且非SEND_OK,包括刷盘超时、同步SLAVE超时、SLAVE不可用,不执行本地事务,state=ROLLBACK;

    Message的transactionId就是生产者生成的messageId。

    如果用户代码返回null,state=UNKNOWN。

    producer提交/回滚/未知

    DefaultMQProducerImpl#sendMessageInTransaction:

    这是生产者的最后一步,不会抛出异常到用户代码。

    即如果本地事务执行成功,生产者不用关心后续流程,由broker回查来确认最终事务状态。

    DefaultMQProducerImpl#endTransaction:

    首先通过broker返回的SendResult#offsetMsgId解析得到MessageId。

    在第二章说到,broker会生成一个messageId给producer,这个messageId包含两部分:

  • broker的ip和port,即MessageId#address;
  • 消息在这个broker的commitlog的物理offset;
  • EndTransactionRequestHeader包含多个重要属性:

  • commitLogOffset:commitlog物理offset;
  • tranStateTableOffset:逻辑offset;
  • producerGroup:生产组;
  • commitOrRollback:二阶段提交/回滚/未知标志;
  • MQClientAPIImpl#endTransactionOneway:

    最终EndTransactionRequestHeader以oneway请求的方式发送给broker。

    对生产者来说,sendMessageInTransaction api主要耗时在发送半消息和执行本地事务,二阶段结果发送broker只需要提交到io线程即可。

    broker提交/回滚/未知

    对于END_TRANSACTION请求,broker用单独的业务线程池处理。

    线程数:8+2*核数,队列10w。

    EndTransactionProcessor#processRequest对于本地事务UNKNWON状态,不做任何处理。

    EndTransactionProcessor#processRequest提交分为四步:

  • commitMessage,根据commitlog的物理offset,找到HALF消息;
  • 组装真实消息MessageExtBrokerInner;
  • 调用MessageStore将真实消息写入commitlog(最终写入consumequeue被消费者感知到);
  • 删除HALF消息;
  • EndTransactionProcessor#processRequest回滚分为两步:

  • rollbackMessage,虽然方法名不同,但是和提交一致,根据commitlog的物理offset,找到HALF消息;
  • 删除HALF消息;
  • 综上,提交HALF消息,涉及一个真实消息和删除HALF消息;回滚HALF消息,仅涉及删除HALF消息。

    并且整个流程没有对异常做任何处理,无论哪一步失败,都将由消息回查流程处理。

    真实消息

    EndTransactionProcessor#endMessageTransaction:

    提交真实消息,和延迟消息类似,用真实topic和queue替换HALF消息的topic和queue。

    删除HALF消息(Op消息)

    为什么要删除HALF消息?

    众所周知,broker会启动一个定时任务,判断HALF消息是否已经被二阶段commit或rollback。

    如果HALF消息还未收到二阶段指令,那么需要回查。

    如果不删除HALF消息,那么会一直回查。

    TransactionalMessageServiceImpl#deletePrepareMessage:

    但rocketmq不支持删除消息,因为所有消息相关文件都是顺序写的。

    所以对于二阶段提交/回滚,实际底层是发送一个Op消息。

    如果HALF消息有匹配的OP消息,代表HALF消息已经成功处理(收到二阶段指令);

    反之代表HALF消息还未处理完成,可能需要回查producer。

    TransactionalMessageBridge#putOpMessage:

    通过HALF消息,反向构建一个MessageQueue模型,topic=HALF消息topic,queueId=0。

    TransactionalMessageBridge#addRemoveTagInTransactionOp:

    和客户端一样,构建了一个Message。

    topic=RMQ_SYS_TRANS_OP_HALF_TOPIC(Op消息topic);

    tag=d;

    body=HALF消息的逻辑offset。

    TransactionalMessageBridge#writeOp:

    针对每个HALF消息topic的MessageQueue,映射到一个OP消息topic的MessageQueue。

    实际上,HALF消息topic的队列数量为1,所以这里opQueueMap只有一个kv对。

    最终还是将消息转换为MessaageExtBrokerInner写入MessageStore。

    broker回查

    如果在一定时间内,未收到producer发来的二阶段明确指示,即commit/rollback,broker会回查producer本地事务执行情况。

    TransactionalMessageCheckService线程每隔60秒进行一次回查逻辑。

    TransactionalMessageServiceImpl#check:

    首先找到HALF_TOPIC下所有MessageQueue,前面broker接收半消息,queueId写死0,所以只有一个MessageQueue。

    查询HALF&OP消费进度

    TransactionalMessageServiceImpl#check:

    HALF_TOPIC的queue可以映射到一个OP_TOPIC的queue。

    查询系统消费组CID_RMQ_SYS_TRANS对于两个queue的消费进度。

    拉取Op消息32条

    TransactionalMessageServiceImpl#check:

    接下来,根据OP_TOPIC的消费进度,拉取OP消息,组装两个集合:

  • doneOpOffset:op消息的body(op对应half消息逻辑offset)小于half消费进度,即op消息已处理;
  • removeMap:op消息的body(op对应half消息逻辑offset)大于等于half消费进度,代表这些half消息,已经收到二阶段明确指令(commit/rollback);
  • TransactionalMessageServiceImpl#fillOpRemoveMap:

    拉取op消息,一次最多32条op,比较op消息的body和当前half消息消费进度的大小。

    op消息的body就是commit/rollback指令对应的half消息的逻辑offset。

    如果op消息的body(queueOffset)小于half消费进度(miniOffset),加入doneOpOffset,即这些op消息已经没有用了(client重复或超时commit或rollback一个half消息,见后面HALF回查),对应half消息已经处理了。

    如果op消息的body(queueOffset)大于等于half消费进度(miniOffset),加入removeMap,这些op消息mapping到了half消息,即这些half消息已经被commit/rollback。

    通过fillOpRemoveMap,removeMap中是所有二阶段处理完成的half消息,反之不在removeMap里的就是还未收到二阶段指令的half消息。

    如下图,假设现在half和op消费进度都是0,half消息已经发送了5条,而op消息只收到4条,可以看到half逻辑offset=3的这条消息还未收到二阶段指令。

    消费HALF消息

    TransactionalMessageServiceImpl#check:

    while-true循环处理half消息。

    如果half消息进度在removeMap中,代表收到二阶段指令,直接将op逻辑offset加入doneOpOffset中;

    如果half消息进度不在removeMap中,可能需要执行回查,为什么说是可能,因为op一次才拉32条,不一定足够匹配当前half消息。

    TransactionalMessageServiceImpl#check:

    Step1,根据half消费进度(逻辑offset)查询消息。

    如果没有找到,代表还没有新的half消息到达,直接退出。

    Step2,校验回查次数。

    如果half消息回查超过15次,投递到topic=TRANS_CHECK_MAXTIME_TOPIC。

    如果half消息回查未超过15次,needDiscard将Message的properties的回查次数+1。

    Step3,如果这条half消息存储时间大于本轮检查的开始时间,直接退出。

    Step4,如果这条half消息,还未到超时时间6s,直接退出,后面的half消息也不可能到超时时间。

    Step5,

    如果需要回查,将half消息重新写入HALF_TOPIC,重入次数+1,调用producer回查事务状态;

    如果不需要回查,fillOpRemoveMap再拉32条op消息,进行下一轮判断。

    HALF回查

    回写half消息

    在向producer发送回查请求前,先要将当前的half消息重新写一份到commitlog。

    TransactionalMessageServiceImpl#putBackHalfMsgQueue:

    注意,新half消息的逻辑offset和commitlog物理offset会填充到老half消息中。

    这意味着客户端收到的回查请求,一定是针对新half消息的,老half消息将被消费,老op消息将被视为失效后续进入doneOpOffset。

    TransactionalMessageBridge#renewHalfMessageInner:根据老half消息,生成新half消息。

    重新写入HALF_TOPIC的消息的生成时间戳是不会变的。

    (TransactionalMessageServiceImpl#needDiscard校验回查次数的时候,已经将回查次数+1了)

    发送CHECK_TRANSACTION_STATE请求

    AbstractTransactionalMessageCheckListener#resolveHalfMsg:

    half消息重新写入commitlog之后,才会异步发送回查请求给producer。

    线程池:2核心线程,5最大线程,2000队列,CallerRuns拒绝策略。

    AbstractTransactionalMessageCheckListener#sendCheckMessage:

    broker根据生产组,找连接broker的一个生产者实例,发送CHECK_TRANSACTION_STATE请求。

    其中包含transactionId(生产者messageId),producer可以用transactionId确认本地事务情况。

    更新HALF&OP消费进度

    当没有更新的half消息或half头部消息未超时,结束本轮校验,更新两个topic的消费进度。

    half和op提交的情况非常相似:

  • half匹配到op,正常收到producer二阶段commit/rollback;
  • half未匹配到op且half超时,创建新half,老half在本轮提交,而后续收到的op会被视为无效也会被提交;
  • TransactionalMessageServiceImpl#calculateOpOffset:

    需要注意的是,op消息并非只要匹配到half消息/half超时,op消息offset就可以提交到该位置。

    op提交只能提交到第一个未被half匹配的op消息。

    假设目前一共收到4条half消息,收到3个commit/rollback,关系如下。

    其中half2尚未收到对应op且还未超时,于是结束本轮循环(即使half3能匹配op1,half的消费进度同样也不能跳)。

    doneOffset中会包含0和2,此时opOffset只能前进到1,而不是3。

    如果opOffset前进到3,下一次half3消息就匹配不到op消息了。

    producer回查

    ClientRemotingProcessor#checkTransactionState:

    producer收到CHECK_TRANSACTION_STATE。

    DefaultMQProducerImpl#checkTransactionState:

    提交回查请求到checkExecutor。(默认1线程2000队列)

    执行用户TransactionListener#checkLocalTransaction。

    最终还是走END_TRANSACTION响应broker。

    总结

    延迟消息

    producer通过设置Message.properties的DELAY为指定延迟级别,发送延迟消息。

    默认broker支持18个延迟级别,从1s到2h,可以通过设置messageDelayLevel修改。

    broker收到消息后,判断DELAY非空,识别为延迟消息。

    写入commitlog前,修改原始消息:

  • 替换message.topic=SCHEDULE_TOPIC_XXXX,message.queueId=delayLevel-1;
  • 设置properties.REAL_TOPIC=原topic,properties.REAL_QID=原queueId;
  • commitlog写入后,DispatchRequest中的tag的hashCode会被修改为目标投递时间。

    最终ConsumeQueue中的最后8字节变为目标投递时间戳。

    broker针对每个delayLevel开启一个DeliverDelayedMessageTimerTask消费queueId=delayLevel-1中的延迟消息。

    需要注意的是,在4.x版本,所有延迟级别的task都使用同一个线程,用jdk的Timer实现。

    Task拉取consumequeue记录后,进行判断:

  • 如果consumequeue为空,延迟100ms再执行task;
  • 如果consumequeue非空,判断consumequeue记录中的目标投递时间戳;
  • 如果目标投递时间戳小于等于当前时间,则写入真实消息(从REAL_TOPIC和REAL_QID提取真实topic和queue)到commitlog,继续处理下一个consumequeue记录;
  • 如果目标投递时间戳大于当前时间,则延迟(目标时间-当前时间)再执行task;
  • SCHEDULE_TOPIC_XXXX的消费进度存储在delayOffset.json中,每隔10s对内存中的消费进度进行持久化(和消费使用同一个timer线程)。

    事务消息

    producer需要指定生产者组和TransactionListener实现。

    TransactionListener负责本地事务执行和本地事务状态查询。

    正常流程

    producer端使用TransactionMQProducer#sendMessageInTransaction发送事务消息。

    Step1:producer发送half消息给broker。

    设置properties.TRAN_MSG=true,代表这是一条事务半消息;

    设置properties.PGROUP=生产者组,用于消息回查;

    producer等待broker响应。

    Step2:broker收到half消息,判断TRAN_MSG=true,处理事务消息。

    broker将topic替换为RMQ_SYS_TRANS_HALF_TOPIC,queueId替换为0;真实topic存储到REAL_TOPIC和REAL_QID。

    最终存储到commitlog,构建consumequeue。

    Step3:producer收到broker响应,执行本地事务。

    如果发送broker异常,直接抛出MQClientException,sendMessageInTransaction结束。

    如果发送broker成功,但返回刷盘超时、同步SLAVE超时、SLAVE不可用,不执行本地事务,state=ROLLBACK;

    如果发送broker成功,且SEND_OK,执行本地事务TransactionListener#executeLocalTransaction

  • 如果本地事务执行抛出异常,state=UNKNOWN;
  • 如果本地事务执行返回null,state=UNKNOWN;
  • 其他,state=本地事务执行返回;
  • Step4:producer发送END_TRANSACTION,告知broker本地事务执行状态state。

    END_TRANSACTION请求是个oneway请求,且try-catch所有异常,如果有问题由broker补偿回查。

    END_TRANSACTION绝大多数参数来源于half消息响应,包含half消息的物理offset、逻辑offset等。

    至此TransactionMQProducer#sendMessageInTransaction结束。

    broker处理END_TRANSACTION

    对于state=UNKNOWN不处理;

    对于state=ROLLBACK,根据物理offset查询half消息,写op消息;

    对于state=COMMIT,根据物理offset查询half消息,写真实消息(REAL_TOPIC和REAL_QID替换),写op消息;

    op消息相关属性:

  • topic=RMQ_SYS_TRANS_OP_HALF_TOPIC;
  • queueId=0;(一个half消息queue映射到一个op消息queue,而half消息只有一个queue)
  • tag=d;
  • body=half消息的逻辑offset。
  • op消息用于标记一个half消息是否收到二阶段commit/rollback,

    如果half消息超时未收到对应op消息会由broker发起回查。

    回查

    TransactionalMessageCheckService线程每隔60秒进行一次回查逻辑。

    broker以group=CID_RMQ_SYS_TRANS的名义,消费half消息(RMQ_SYS_TRANS_HALF_TOPIC)和op消息(RMQ_SYS_TRANS_OP_HALF_TOPIC)。

    Step1,查询消费进度。

    分别查询half消息和op消息的消费进度。

    Step2,首次拉取32条op消息。

    如果op消息对应half消息(body中的half消息逻辑offset)已经被处理(body中的half消息offset小于当前half消费进度),op消息被忽略,加入doneOpOffset集合;

    如果op消息对应half消息还未处理,op消息加入removeMap,key是half消息逻辑offset,value是op消息逻辑offset;

    Step3,消费HALF消息。

    根据当前half消费进度,进行while-true循环。

    如果half消息在removeMap中,代表half消息已经匹配到op,收到二阶段指令,不用回查,将half消息加入doneOpOffset集合;

    如果half消息不在removeMap中,可能代表half消息还未收到二阶段指令:

  • 根据half消息逻辑offset,查询half消息,如果为空,代表还没有新的half消息到达,直接Step5;
  • 判断half消息是否超过回查次数15,如果超出,将half消息投递到TRANS_CHECK_MAXTIME_TOPIC,继续Step3;
  • 判断half消息的存储时间,如果还未到6s超时,直接Step5,因为后续half消息都未超时;
  • 如果需要回查Step4后继续Step3,如果不回查同拉取更多op消息Step3;
  • Step4,消息回查。

    broker侧:

  • 重新写一份half消息到commitlog,老half消息将被消费,新half消息需要匹配新的op消息;
  • 异步发送CHECK_TRANSACTION_STATE给producer组内的producer实例,请求中是新half消息 ;
  • producer侧:回调TransactionListener查询本地事务状态,返回END_TRANSACTION请求给broker(同上)。

    Step5,上述while-true执行half消息消费完毕,提交half和op消息的消费进度。

    half和op提交offset的情况非常相似:

  • half匹配到op,正常收到producer二阶段commit/rollback;
  • half未匹配到op且half超时,创建新half,老half在本轮提交,而后续收到的op会被视为无效也会被提交;
  • 需要注意的是,op消息并非只要匹配到half消息/half超时,op消息offset就可以提交到该位置。

    op提交只能提交到第一个未被half匹配的op消息。

    欢迎大家评论或私信讨论问题。

    本文原创,未经许可不得转载。

    欢迎关注公众号【程序猿阿越】。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论