前言
本章基于rocketmq4.6.0,分析两个生产者特性:延迟消息和事务消息。
一、延迟消息
案例
延迟消息的生产和消费,对于Producer和Consumer实例并没有特殊的操作。
仅仅通过设置Message延迟级别,即Message#setDelayTimeLevel,发送延迟消息。
DefaultMQProducer producer = new DefaultMQProducer("producer-group-1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message(
"MyTestTopicA", // topic
"TagA", // tag
("Hello Delay " + i).getBytes(StandardCharsets.UTF_8) // body
);
msg.setDelayTimeLevel(3); // 延迟级别=3 10s
SendResult sendResult = producer.send(msg); // 发送消息
Message用properties.DELAY存储延迟级别。
默认延迟级别有18个,在broker侧可配置,即MessageStoreConfig#messageDelayLevel。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
延迟消息主要逻辑都在broker侧,直接分析broker即可。
写CommitLog
CommitLog#putMessage:写commitlog前替换原始topic和queue
写ConsumeQueue
CommitLog#checkMessageAndReturnSize:
构建DispatchRequest时,原消息的tag.hashCode会被修改为投递时间,即存储时间+延迟时间。
进而影响consumequeue的20字节结构中的最后8字节tagHash变为投递时间。
消费延迟Topic
ScheduleMessageService负责消费当前broker管理的topic=SCHEDULE_TOPIC_XXXX的所有queue。
初始化
ScheduleMessageService#parseDelayLevel加载延迟级别和延迟时长的映射关系到delayLevelTable。
延迟topic下所有queue的消费进度都存储在delayOffset.json,加载到内存offsetTable。
启动
ScheduleMessageService#start:
开启一个线程(jdk的timer)做两类事情。
一个是消费延迟topic,一个是每隔10s持久化消费进度到delayOffset.json。
由于延迟级别-1=queueId,所以为每个延迟级别开启一个DeliverDelayedMessageTimerTask。
DeliverDelayedMessageTimerTask是个不可变对象。
delayLevel对应延迟级别,也对应消费queue;
offset对应从哪个逻辑offset开始消费。
消费
DeliverDelayedMessageTimerTask#executeOnTimeup:
消费延迟topic=SCHEDULE_TOPIC_XXXX,queueId=delayLevel-1。
消费逻辑和broker处理PullMessage逻辑一致,根据逻辑offset可以定位到一段consumequeue的buffer,然后循环每个consumequeue记录处理。
注意这里tagsCode不是原始消息的tag的哈希,在写ConsumeQueue阶段已经修改为目标投递时间戳。
对于每条consumequeue记录,计算目标投递时间和当前时间的差countdown毫秒。
如果countdown大于0,代表还未到目标时间,即延迟topic的当前queue中,后续消息都还未到时间,因为延迟级别都相同。
重新提交一个DeliverDelayedMessageTimerTask,延迟时间为countdown,下次执行即可投递真实消息,更新内存中的消费进度,当前任务直接结束。
如果countdown小于等于0,代表到了目标时间,当前消息可以给消费者消费。
根据consumequeue的commitlog物理offset+消息长度,从commitlog中读到topic=SCHEDULE_TOPIC_XXXX延迟消息。
将延迟消息转换为真实消息,写入commitlog(写内存+异步刷盘),最终写入真实的consumequeue,对消费者可见。
DeliverDelayedMessageTimerTask#messageTimeup:将延迟消息转换为真实消息
使用REAL_TOPIC和REAL_QID替换topic和queueId;
移除properties.DELAY;
设置真实tag的hashCode;
设置为异步刷盘;
CommitLog和ConsumeQueue的变化关系大致如下:
其他情况:
如果写CommitLog失败,延迟10s再次执行Task。
如果当前delayLevel下没有新消息到达,延迟100ms再次执行Task。
二、事务消息
案例
案例来源于rocketmq-example,对于事务消息,逻辑都在producer和broker中。
首先定义一个TransactionListener,有两个方法:
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap localTrans = new ConcurrentHashMap();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
生产者:
使用TransactionMQProducer,注入TransactionListener实现。
调用TransactionMQProducer#sendMessageInTransaction(String,Object)发送事务消息,这里第二个入参,对应TransactionListener#executeLocalTransaction执行本地事务的第二个入参,一般是业务模型。
public static void main(String[] args) {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("my_tx_group");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(transactionListener);
producer.start();
Message msg =
new Message("TopicTest1234", "TagA", "hello".getBytes(StandardCharsets.UTF_8));
producer.sendMessageInTransaction(msg, null);
}
TransactionMQProducer
TransactionMQProducer继承普通生产者DefaultMQProducer。
相较于普通生产者多了两个属性:
默认本地事务状态回查线程池采用1线程2000队列。
TransactionMQProducer#sendMessageInTransaction:
这是事务消息生产者相较于普通消息生产者唯一新增的api,发送事务消息。
其底层还是调用内部生产实现DefaultMQProducerImpl。
producer发送半消息
TransactionMQProducer#sendMessageInTransaction:
producer侧,发送半消息主要是设置Message的属性。
后面send方法基本同普通消息,除了请求头的sysFlag设置事务PREPARE标志位。
borker接收半消息
接着,半消息来到broker。
SendMessageProcessor#sendMessage:
将客户端请求转换为MessageExeBrokerInner后,
识别到properties.TRAN_MSG=true,进入TransactionalMessageService#prepareMessage。
TransactionalMessageServiceImpl#prepareMessage:
TransactionalMessageBridge#putHalfMessage:
事务消息和延迟消息类似,将topic替换为RMQ_SYS_TRANS_HALF_TOPIC,queueId替换为0。
真实topic存储到REAL_TOPIC和REAL_QID。
区别在于事务消息并没有对ConsumeQueue做特殊处理,即tagHash不会变。
producer本地事务
DefaultMQProducerImpl#sendMessageInTransaction:
如果发送broker异常,直接抛出MQClientException。
如果发送broker成功,且SEND_OK,执行本地事务TransactionListener#executeLocalTransaction,如果本地事务执行抛出异常,state=UNKNOWN,localException非null;
如果发送broker成功,且非SEND_OK,包括刷盘超时、同步SLAVE超时、SLAVE不可用,不执行本地事务,state=ROLLBACK;
Message的transactionId就是生产者生成的messageId。
如果用户代码返回null,state=UNKNOWN。
producer提交/回滚/未知
DefaultMQProducerImpl#sendMessageInTransaction:
这是生产者的最后一步,不会抛出异常到用户代码。
即如果本地事务执行成功,生产者不用关心后续流程,由broker回查来确认最终事务状态。
DefaultMQProducerImpl#endTransaction:
首先通过broker返回的SendResult#offsetMsgId解析得到MessageId。
在第二章说到,broker会生成一个messageId给producer,这个messageId包含两部分:
EndTransactionRequestHeader包含多个重要属性:
MQClientAPIImpl#endTransactionOneway:
最终EndTransactionRequestHeader以oneway请求的方式发送给broker。
对生产者来说,sendMessageInTransaction api主要耗时在发送半消息和执行本地事务,二阶段结果发送broker只需要提交到io线程即可。
broker提交/回滚/未知
对于END_TRANSACTION请求,broker用单独的业务线程池处理。
线程数:8+2*核数,队列10w。
EndTransactionProcessor#processRequest对于本地事务UNKNWON状态,不做任何处理。
EndTransactionProcessor#processRequest提交分为四步:
EndTransactionProcessor#processRequest回滚分为两步:
综上,提交HALF消息,涉及一个真实消息和删除HALF消息;回滚HALF消息,仅涉及删除HALF消息。
并且整个流程没有对异常做任何处理,无论哪一步失败,都将由消息回查流程处理。
真实消息
EndTransactionProcessor#endMessageTransaction:
提交真实消息,和延迟消息类似,用真实topic和queue替换HALF消息的topic和queue。
删除HALF消息(Op消息)
为什么要删除HALF消息?
众所周知,broker会启动一个定时任务,判断HALF消息是否已经被二阶段commit或rollback。
如果HALF消息还未收到二阶段指令,那么需要回查。
如果不删除HALF消息,那么会一直回查。
TransactionalMessageServiceImpl#deletePrepareMessage:
但rocketmq不支持删除消息,因为所有消息相关文件都是顺序写的。
所以对于二阶段提交/回滚,实际底层是发送一个Op消息。
如果HALF消息有匹配的OP消息,代表HALF消息已经成功处理(收到二阶段指令);
反之代表HALF消息还未处理完成,可能需要回查producer。
TransactionalMessageBridge#putOpMessage:
通过HALF消息,反向构建一个MessageQueue模型,topic=HALF消息topic,queueId=0。
TransactionalMessageBridge#addRemoveTagInTransactionOp:
和客户端一样,构建了一个Message。
topic=RMQ_SYS_TRANS_OP_HALF_TOPIC(Op消息topic);
tag=d;
body=HALF消息的逻辑offset。
TransactionalMessageBridge#writeOp:
针对每个HALF消息topic的MessageQueue,映射到一个OP消息topic的MessageQueue。
实际上,HALF消息topic的队列数量为1,所以这里opQueueMap只有一个kv对。
最终还是将消息转换为MessaageExtBrokerInner写入MessageStore。
broker回查
如果在一定时间内,未收到producer发来的二阶段明确指示,即commit/rollback,broker会回查producer本地事务执行情况。
TransactionalMessageCheckService线程每隔60秒进行一次回查逻辑。
TransactionalMessageServiceImpl#check:
首先找到HALF_TOPIC下所有MessageQueue,前面broker接收半消息,queueId写死0,所以只有一个MessageQueue。
查询HALF&OP消费进度
TransactionalMessageServiceImpl#check:
HALF_TOPIC的queue可以映射到一个OP_TOPIC的queue。
查询系统消费组CID_RMQ_SYS_TRANS对于两个queue的消费进度。
拉取Op消息32条
TransactionalMessageServiceImpl#check:
接下来,根据OP_TOPIC的消费进度,拉取OP消息,组装两个集合:
TransactionalMessageServiceImpl#fillOpRemoveMap:
拉取op消息,一次最多32条op,比较op消息的body和当前half消息消费进度的大小。
op消息的body就是commit/rollback指令对应的half消息的逻辑offset。
如果op消息的body(queueOffset)小于half消费进度(miniOffset),加入doneOpOffset,即这些op消息已经没有用了(client重复或超时commit或rollback一个half消息,见后面HALF回查),对应half消息已经处理了。
如果op消息的body(queueOffset)大于等于half消费进度(miniOffset),加入removeMap,这些op消息mapping到了half消息,即这些half消息已经被commit/rollback。
通过fillOpRemoveMap,removeMap中是所有二阶段处理完成的half消息,反之不在removeMap里的就是还未收到二阶段指令的half消息。
如下图,假设现在half和op消费进度都是0,half消息已经发送了5条,而op消息只收到4条,可以看到half逻辑offset=3的这条消息还未收到二阶段指令。
消费HALF消息
TransactionalMessageServiceImpl#check:
while-true循环处理half消息。
如果half消息进度在removeMap中,代表收到二阶段指令,直接将op逻辑offset加入doneOpOffset中;
如果half消息进度不在removeMap中,可能需要执行回查,为什么说是可能,因为op一次才拉32条,不一定足够匹配当前half消息。
TransactionalMessageServiceImpl#check:
Step1,根据half消费进度(逻辑offset)查询消息。
如果没有找到,代表还没有新的half消息到达,直接退出。
Step2,校验回查次数。
如果half消息回查超过15次,投递到topic=TRANS_CHECK_MAXTIME_TOPIC。
如果half消息回查未超过15次,needDiscard将Message的properties的回查次数+1。
Step3,如果这条half消息存储时间大于本轮检查的开始时间,直接退出。
Step4,如果这条half消息,还未到超时时间6s,直接退出,后面的half消息也不可能到超时时间。
Step5,
如果需要回查,将half消息重新写入HALF_TOPIC,重入次数+1,调用producer回查事务状态;
如果不需要回查,fillOpRemoveMap再拉32条op消息,进行下一轮判断。
HALF回查
回写half消息
在向producer发送回查请求前,先要将当前的half消息重新写一份到commitlog。
TransactionalMessageServiceImpl#putBackHalfMsgQueue:
注意,新half消息的逻辑offset和commitlog物理offset会填充到老half消息中。
这意味着客户端收到的回查请求,一定是针对新half消息的,老half消息将被消费,老op消息将被视为失效后续进入doneOpOffset。
TransactionalMessageBridge#renewHalfMessageInner:根据老half消息,生成新half消息。
重新写入HALF_TOPIC的消息的生成时间戳是不会变的。
(TransactionalMessageServiceImpl#needDiscard校验回查次数的时候,已经将回查次数+1了)
发送CHECK_TRANSACTION_STATE请求
AbstractTransactionalMessageCheckListener#resolveHalfMsg:
half消息重新写入commitlog之后,才会异步发送回查请求给producer。
线程池:2核心线程,5最大线程,2000队列,CallerRuns拒绝策略。
AbstractTransactionalMessageCheckListener#sendCheckMessage:
broker根据生产组,找连接broker的一个生产者实例,发送CHECK_TRANSACTION_STATE请求。
其中包含transactionId(生产者messageId),producer可以用transactionId确认本地事务情况。
更新HALF&OP消费进度
当没有更新的half消息或half头部消息未超时,结束本轮校验,更新两个topic的消费进度。
half和op提交的情况非常相似:
TransactionalMessageServiceImpl#calculateOpOffset:
需要注意的是,op消息并非只要匹配到half消息/half超时,op消息offset就可以提交到该位置。
op提交只能提交到第一个未被half匹配的op消息。
假设目前一共收到4条half消息,收到3个commit/rollback,关系如下。
其中half2尚未收到对应op且还未超时,于是结束本轮循环(即使half3能匹配op1,half的消费进度同样也不能跳)。
doneOffset中会包含0和2,此时opOffset只能前进到1,而不是3。
如果opOffset前进到3,下一次half3消息就匹配不到op消息了。
producer回查
ClientRemotingProcessor#checkTransactionState:
producer收到CHECK_TRANSACTION_STATE。
DefaultMQProducerImpl#checkTransactionState:
提交回查请求到checkExecutor。(默认1线程2000队列)
执行用户TransactionListener#checkLocalTransaction。
最终还是走END_TRANSACTION响应broker。
总结
延迟消息
producer通过设置Message.properties的DELAY为指定延迟级别,发送延迟消息。
默认broker支持18个延迟级别,从1s到2h,可以通过设置messageDelayLevel修改。
broker收到消息后,判断DELAY非空,识别为延迟消息。
写入commitlog前,修改原始消息:
commitlog写入后,DispatchRequest中的tag的hashCode会被修改为目标投递时间。
最终ConsumeQueue中的最后8字节变为目标投递时间戳。
broker针对每个delayLevel开启一个DeliverDelayedMessageTimerTask消费queueId=delayLevel-1中的延迟消息。
需要注意的是,在4.x版本,所有延迟级别的task都使用同一个线程,用jdk的Timer实现。
Task拉取consumequeue记录后,进行判断:
SCHEDULE_TOPIC_XXXX的消费进度存储在delayOffset.json中,每隔10s对内存中的消费进度进行持久化(和消费使用同一个timer线程)。
事务消息
producer需要指定生产者组和TransactionListener实现。
TransactionListener负责本地事务执行和本地事务状态查询。
正常流程
producer端使用TransactionMQProducer#sendMessageInTransaction发送事务消息。
Step1:producer发送half消息给broker。
设置properties.TRAN_MSG=true,代表这是一条事务半消息;
设置properties.PGROUP=生产者组,用于消息回查;
producer等待broker响应。
Step2:broker收到half消息,判断TRAN_MSG=true,处理事务消息。
broker将topic替换为RMQ_SYS_TRANS_HALF_TOPIC,queueId替换为0;真实topic存储到REAL_TOPIC和REAL_QID。
最终存储到commitlog,构建consumequeue。
Step3:producer收到broker响应,执行本地事务。
如果发送broker异常,直接抛出MQClientException,sendMessageInTransaction结束。
如果发送broker成功,但返回刷盘超时、同步SLAVE超时、SLAVE不可用,不执行本地事务,state=ROLLBACK;
如果发送broker成功,且SEND_OK,执行本地事务TransactionListener#executeLocalTransaction
Step4:producer发送END_TRANSACTION,告知broker本地事务执行状态state。
END_TRANSACTION请求是个oneway请求,且try-catch所有异常,如果有问题由broker补偿回查。
END_TRANSACTION绝大多数参数来源于half消息响应,包含half消息的物理offset、逻辑offset等。
至此TransactionMQProducer#sendMessageInTransaction结束。
broker处理END_TRANSACTION
对于state=UNKNOWN不处理;
对于state=ROLLBACK,根据物理offset查询half消息,写op消息;
对于state=COMMIT,根据物理offset查询half消息,写真实消息(REAL_TOPIC和REAL_QID替换),写op消息;
op消息相关属性:
op消息用于标记一个half消息是否收到二阶段commit/rollback,
如果half消息超时未收到对应op消息会由broker发起回查。
回查
TransactionalMessageCheckService线程每隔60秒进行一次回查逻辑。
broker以group=CID_RMQ_SYS_TRANS的名义,消费half消息(RMQ_SYS_TRANS_HALF_TOPIC)和op消息(RMQ_SYS_TRANS_OP_HALF_TOPIC)。
Step1,查询消费进度。
分别查询half消息和op消息的消费进度。
Step2,首次拉取32条op消息。
如果op消息对应half消息(body中的half消息逻辑offset)已经被处理(body中的half消息offset小于当前half消费进度),op消息被忽略,加入doneOpOffset集合;
如果op消息对应half消息还未处理,op消息加入removeMap,key是half消息逻辑offset,value是op消息逻辑offset;
Step3,消费HALF消息。
根据当前half消费进度,进行while-true循环。
如果half消息在removeMap中,代表half消息已经匹配到op,收到二阶段指令,不用回查,将half消息加入doneOpOffset集合;
如果half消息不在removeMap中,可能代表half消息还未收到二阶段指令:
Step4,消息回查。
broker侧:
producer侧:回调TransactionListener查询本地事务状态,返回END_TRANSACTION请求给broker(同上)。
Step5,上述while-true执行half消息消费完毕,提交half和op消息的消费进度。
half和op提交offset的情况非常相似:
需要注意的是,op消息并非只要匹配到half消息/half超时,op消息offset就可以提交到该位置。
op提交只能提交到第一个未被half匹配的op消息。
欢迎大家评论或私信讨论问题。
本文原创,未经许可不得转载。
欢迎关注公众号【程序猿阿越】。