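Transactional messages are one of RocketMQ's most distinctive advanced features. Their basic purpose is to use RocketMQ's transaction mechanism to keep data consistent between upstream and downstream systems.
On a single machine, adding a transaction to the business method is enough. In a distributed setting, however, several systems must cooperate, and you cannot know which one will execute first and which later. Microservice stacks do offer the Seata framework for distributed transactions, but guaranteeing those transactions remains a constant headache; it is the classic case of not being able to have it both ways.
RocketMQ transactional messages meet this requirement while improving performance. Their core job is to guarantee, in a distributed setting, eventual consistency between producing a message and executing a local transaction. Message production itself can be done with ordered messages, so we only need to guarantee the transaction across these two steps.
Implementation flow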
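Prepare phase: the producer sends the message to the Broker, and the Broker replies with an ack to signal that the send succeeded. At this point the message is in a pending (half) state and will not be consumed. The producer then continues executing the code that follows.
Confirm phase: once all the code has run, the local transaction has either committed or rolled back. Knowing that outcome, the producer reports it to the Broker as a second confirmation. On Commit, the Broker makes the message deliverable to consumers. On Rollback, the Broker discards the message.
Compensation: if an exception prevents the second confirmation from arriving, the Broker checks back after a fixed interval. It asks for the status of the local transaction behind the message and then re-applies Commit or Rollback accordingly.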
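The three phases can be sketched as a toy in-memory model. Everything here (ToyBroker, ToyTxState and their methods) is hypothetical and unrelated to RocketMQ's API. It only shows three behaviours:

- A half message stays invisible until a commit.
- It disappears on rollback.
- It is resolved later by a check when the outcome is unknown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the prepare / confirm / compensate flow.
// None of these types belong to RocketMQ.
enum ToyTxState { COMMIT, ROLLBACK, UNKNOWN }

class ToyBroker {
    // Half messages: stored, but invisible to consumers.
    private final Map<String, String> halfMessages = new HashMap<>();
    // Messages consumers are allowed to see.
    private final Map<String, String> visible = new HashMap<>();

    // Prepare phase: store the half message and ack the producer.
    boolean prepare(String txId, String body) {
        halfMessages.put(txId, body);
        return true;
    }

    // Confirm phase: apply the producer's decision.
    void end(String txId, ToyTxState state) {
        if (state == ToyTxState.COMMIT) {
            String body = halfMessages.remove(txId);
            if (body != null) {
                visible.put(txId, body);
            }
        } else if (state == ToyTxState.ROLLBACK) {
            halfMessages.remove(txId);
        }
        // UNKNOWN: keep the half message so a later check can resolve it.
    }

    // Compensation: ask the producer about every unresolved half message.
    void checkPending(Function<String, ToyTxState> checker) {
        // Iterate over a copy because end() may remove entries.
        for (String txId : new HashMap<>(halfMessages).keySet()) {
            end(txId, checker.apply(txId));
        }
    }

    boolean isVisible(String txId) { return visible.containsKey(txId); }

    boolean isPending(String txId) { return halfMessages.containsKey(txId); }
}
```

The check callback stands in for the producer's answer during compensation.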
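States and caveats
A transactional message can be in one of three states:
CommitTransaction: the transaction is committed; consumers may consume the message.
RollbackTransaction: the transaction is rolled back; the message is deleted.
Unknown: an intermediate state; the Broker waits for the local transaction's outcome and then acts accordingly.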
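As a quick reference, the sketch below mirrors the three states. It uses the constant names of RocketMQ's client enum org.apache.rocketmq.client.producer.LocalTransactionState: COMMIT_MESSAGE, ROLLBACK_MESSAGE and UNKNOW. MirrorTxState and its two methods are hypothetical helpers that only encode the rules listed above.

```java
// Hypothetical mirror of LocalTransactionState; not the RocketMQ enum itself.
enum MirrorTxState {
    COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW;

    // Only a committed message becomes visible to consumers.
    boolean visibleToConsumers() {
        return this == COMMIT_MESSAGE;
    }

    // Only an unknown outcome leaves the half message waiting for a check.
    boolean awaitsCheck() {
        return this == UNKNOW;
    }
}
```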
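Caveats:
A half message is invisible to consumers: its content is stored in the system topic RMQ_SYS_TRANS_HALF_TOPIC.
The related parameters can be tuned. Examples are the maximum number of transaction checks, transactionCheckMax (default 15), and the interval between checks, transactionCheckInterval (default 60s). A message still unresolved after the maximum number of checks is discarded.
In short, a RocketMQ transactional message places the application's local transaction and the message send in one global transaction that either succeeds or fails as a whole. This lets you build a reliable-message eventual-consistency scheme on top of RocketMQ.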
Source code walkthrough
On the Producer side, you create a TransactionMQProducer and bind a transaction listener to it.
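import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

// producerGroupTemp, message and arg are assumed to be defined by the caller
TransactionListener transactionListener = new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // Run the local transaction here; this demo always commits
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // Called when the broker checks back; this demo always commits
        return LocalTransactionState.COMMIT_MESSAGE;
    }
};
TransactionMQProducer producer = new TransactionMQProducer(producerGroupTemp);
producer.setTransactionListener(transactionListener);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
SendResult result = producer.sendMessageInTransaction(message, arg);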
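The listener above always returns COMMIT_MESSAGE, which hides the point of checkLocalTransaction. One common approach is an assumption here, not something the RocketMQ API enforces:

- Record each successful local transaction in a log table keyed by transaction ID.
- Answer broker checks from that table.

OrderTxLogic, its State enum and the in-memory txLog map are hypothetical stand-ins for the listener and a database table.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical: execute() plays the role of executeLocalTransaction,
// check() plays the role of checkLocalTransaction.
class OrderTxLogic {
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Stand-in for a transaction-log table keyed by transaction ID. In a real
    // system this row would be written in the same DB transaction as the
    // business change.
    private final Map<String, Boolean> txLog = new ConcurrentHashMap<>();

    State execute(String transactionId, int amount) {
        try {
            if (amount <= 0) {
                throw new IllegalArgumentException("invalid amount");
            }
            // ... the business update would happen here ...
            txLog.put(transactionId, Boolean.TRUE); // record success
            return State.COMMIT_MESSAGE;
        } catch (IllegalArgumentException e) {
            return State.ROLLBACK_MESSAGE;
        }
    }

    // Answer from durable state, not from what execute() happened to return.
    State check(String transactionId) {
        return txLog.containsKey(transactionId) ? State.COMMIT_MESSAGE : State.UNKNOW;
    }
}
```

Returning UNKNOW for an ID with no log entry lets the broker ask again later, up to transactionCheckMax times. Returning ROLLBACK_MESSAGE would be appropriate once you know the local transaction can no longer commit.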
sendMessageInTransaction then sends the message:
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
// Check that a TransactionListener exists; throw an exception if there is none
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// Transactional messages do not support delayed delivery, so clear any delay level
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// Mark the message as a half (prepared) transactional message
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
// Record the producer group.
// When the broker checks a transaction, it uses this group to find the channel,
// so any producer instance in the same group can answer the check
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
// Send synchronously
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
// Act on the result of the half-message send
switch (sendResult.getSendStatus()) {
// Phase-one send succeeded
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
// Set the transaction ID property
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
// Execute the local transaction (legacy LocalTransactionExecuter API)
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
// The half message was sent; now run the local transaction
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
// Local transaction finished: tell the broker to commit or roll back
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
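The method works in three steps:

1. It sends the phase-one (half) message, which leaves the message in a half-committed state.
2. It runs the local transaction, which returns one of three states: unknown, rollback, or commit.
3. It calls endTransaction to report that state to the Broker.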
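The state handed to endTransaction can be isolated into a small pure function. This is a hypothetical mirror of the switch above:

- HalfSendStatus copies the four values of RocketMQ's SendStatus.
- LocalState copies LocalTransactionState.
- TxDecision.decide is not a RocketMQ method.

```java
import java.util.function.Supplier;

// Hypothetical mirrors of RocketMQ's SendStatus and LocalTransactionState.
enum HalfSendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

final class TxDecision {
    private TxDecision() {}

    // Returns the state that would be passed to endTransaction.
    static LocalState decide(HalfSendStatus status, Supplier<LocalState> localTx) {
        LocalState state = LocalState.UNKNOW;
        switch (status) {
            case SEND_OK:
                try {
                    LocalState result = localTx.get();
                    // A null result is treated as UNKNOW, as in the source above.
                    state = (result == null) ? LocalState.UNKNOW : result;
                } catch (Throwable t) {
                    // An exception leaves the state at UNKNOW; the broker will check back.
                    state = LocalState.UNKNOW;
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // The half message may not be safely stored: roll back without
                // running the local transaction at all.
                state = LocalState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
        return state;
    }
}
```

Keeping this decision separate from the network calls makes the three outcomes easy to test without a broker.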
The endTransaction method
It builds an EndTransactionRequestHeader from the local transaction state and performs the second-phase commit.
public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
// Decode the MessageId from the send result
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
// Look up the broker address
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
// Build the EndTransactionRequestHeader
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
// The commit log offset comes from the offsetMsgId of the prepare message
requestHeader.setCommitLogOffset(id.getOffset());
requestHeader.setBname(sendResult.getMessageQueue().getBrokerName());
// Set the commit/rollback state
switch (localTransactionState) {
case COMMIT_MESSAGE:
// Commit
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
// Roll back
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
// Unknown
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
// Send to the broker (one-way)
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
This sends the result of the local transaction to the Broker. endTransactionOneway creates a request with the code END_TRANSACTION, and the Broker dispatches it to the matching Processor.
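On the Broker, the commit/rollback flag ends up in bits 2 and 3 of the message's sysFlag; EndTransactionProcessor writes it with MessageSysFlag.resetTransactionValue. The sketch reproduces those constants and the two bit operations as we understand MessageSysFlag. TxFlags is a hypothetical class, and the values should be checked against your RocketMQ version.

```java
// Hypothetical copy of the transaction-related bits of MessageSysFlag.
final class TxFlags {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;  // 4
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;    // 8
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;  // 12, also the mask for both bits

    private TxFlags() {}

    // Extract the two transaction bits.
    static int getTransactionValue(int flag) {
        return flag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Clear the two transaction bits, keep every other bit, then set the new type.
    static int resetTransactionValue(int flag, int type) {
        return (flag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }
}
```

Because both bits are cleared first, switching a prepared message (4) to committed yields 8 rather than 12, and unrelated flag bits survive.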
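Broker-side handling
The Broker does two kinds of processing:

- It handles the half message sent in phase one.
- It later handles the transaction state submitted through endTransaction, acting on each of the three states.

Receiving the phase-one half message
Transactional messages are handled in the sendMessage method of SendMessageProcessor.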
// When sending the half message, the producer set PROPERTY_TRANSACTION_PREPARED to true; this property identifies a transactional message
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
// Transactional messages land here: the topic is switched to RMQ_SYS_TRANS_HALF_TOPIC and the message is written synchronously to the store
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
}
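If the message carries the transaction flag, TransactionalMessageService.prepareMessage handles it. This method rewrites the half message through parseHalfMessageInner before storing it: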
// Parse the half message
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// Save the real topic and real queueId in the message properties
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
// Set the default transaction state to TRANSACTION_NOT_TYPE (unknown)
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// Switch the topic to RMQ_SYS_TRANS_HALF_TOPIC, which consumers cannot see
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
// Use queueId 0
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
This performs the topic switch: the original topic is stored in the message properties, and the message's topic is set to RMQ_SYS_TRANS_HALF_TOPIC.
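The topic switch and its reversal on commit can be sketched with a simplified message type. ToyMsg and HalfTopicSwap are hypothetical. The property keys REAL_TOPIC and REAL_QID are assumed to match the values of MessageConst.PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified message: just a topic, a queue id and properties.
class ToyMsg {
    String topic;
    int queueId;
    final Map<String, String> props = new HashMap<>();

    ToyMsg(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }
}

final class HalfTopicSwap {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    static final String REAL_TOPIC = "REAL_TOPIC"; // assumed property key
    static final String REAL_QID = "REAL_QID";     // assumed property key

    private HalfTopicSwap() {}

    // Prepare: stash the real destination, then park the message in the half topic.
    static void toHalf(ToyMsg m) {
        m.props.put(REAL_TOPIC, m.topic);
        m.props.put(REAL_QID, String.valueOf(m.queueId));
        m.topic = HALF_TOPIC;
        m.queueId = 0;
    }

    // Commit: restore the real destination and drop the stashed properties.
    static void restore(ToyMsg m) {
        m.topic = m.props.remove(REAL_TOPIC);
        m.queueId = Integer.parseInt(m.props.remove(REAL_QID));
    }
}
```

Because all half messages share one topic and queue 0, consumers subscribed to the real topic never see them until the commit restores the original destination.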
Handling endTransaction
endTransaction sends the Broker a request with the code END_TRANSACTION. The Broker dispatches it to the matching processor, EndTransactionProcessor, whose processRequest method handles it.
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// Look up the half message by commitLogOffset; success if it is found
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// Check that the half message matches the request
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// Build the message to store
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
// Store the message under its real topic in the CommitLog
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
// Remove the prepare message by recording it in the op queue (opQueueMap)
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
// Roll back
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// Success if the half message is found
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// Check that the half message matches the request
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// Remove the prepare message by recording it in the op queue (opQueueMap)
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
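Only the core code is shown. The main logic first determines whether the request is a commit or a rollback:

- For a commit, it looks up the half message, restores its original topic and stores it, then removes the message from the half topic by recording it in opQueueMap.
- For a rollback, it simply removes the message from the half topic and records it in opQueueMap.

Note: the CommitLog is append-only, so a half message cannot be deleted in place. "Removing" it means writing an Op message to the system topic RMQ_SYS_TRANS_OP_HALF_TOPIC, which records the half message as already committed or rolled back. When the check service scans the half topic, it compares it against these Op records. It only checks half messages without an Op record, that is, those whose outcome is still unknown.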
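Matching half messages against Op records can be sketched as a set difference over queue offsets. PendingHalf is hypothetical. The real check service works incrementally on the half and op queues rather than on in-memory lists; the sketch only shows which half messages remain candidates for a check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical: half messages are identified by their offset in the half
// queue, and each Op record holds the offset of a half message that has
// already been committed or rolled back.
final class PendingHalf {
    private PendingHalf() {}

    static List<Long> pending(List<Long> halfOffsets, Set<Long> opOffsets) {
        List<Long> result = new ArrayList<>();
        for (Long offset : halfOffsets) {
            // No matching Op record: the outcome is unknown, so check it.
            if (!opOffsets.contains(offset)) {
                result.add(offset);
            }
        }
        return result;
    }
}
```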
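Transaction checks (check-back)
A network outage, a response timeout or another failure can keep the second-phase result from reaching the Broker. To recover, the Broker runs a check thread with these defaults:

- It runs once a minute (transactionCheckInterval).
- It checks half messages that have been waiting longer than 6s (transactionTimeOut).
- It discards a message after 15 checks (transactionCheckMax).

When BrokerController starts, it calls startProcessorByHa. On a Master node this starts the transaction check service thread, which wakes up every transactionCheckInterval and calls check with timeout = 6s and checkMax = 15.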
public void run() {
log.info("Start transaction check service thread!");
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
// Check half messages (defaults: timeout = 6s, checkMax = 15)
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
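The per-message decision the check service makes can be summarized with the defaults above: transactionTimeOut 6s, transactionCheckMax 15, and a check round every transactionCheckInterval of 60s. CheckPolicy is a hypothetical helper. The real TransactionalMessageServiceImpl.check handles more cases than shown here.

```java
// Hypothetical summary of the decision for one half message.
final class CheckPolicy {
    enum Action { SKIP_TOO_YOUNG, CHECK, DISCARD }

    private CheckPolicy() {}

    static Action decide(long nowMs, long bornMs, int checkedTimes, long timeoutMs, int checkMax) {
        if (checkedTimes >= checkMax) {
            // Checked too many times already: give up on this half message.
            return Action.DISCARD;
        }
        if (nowMs - bornMs < timeoutMs) {
            // The local transaction may still be running: wait for a later round.
            return Action.SKIP_TOO_YOUNG;
        }
        return Action.CHECK;
    }
}
```

With the defaults, a message that keeps answering UNKNOW is checked over roughly 15 rounds, so about 15 minutes, before it is discarded.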
Inside check, listener.resolveHalfMsg(msgExt) is called for each half message that needs a check.
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}
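sendCheckMessage sends the producer a request with the code CHECK_TRANSACTION_STATE. When the client-side MQClientAPIImpl is initialized, it registers a Processor for this code. That Processor eventually calls checkTransactionState to determine the local transaction's state, which then calls endTransactionOneway to send an END_TRANSACTION request.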
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
// Runs on the check thread pool
@Override
public void run() {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
// Check the local transaction
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}
// Report the resulting state to the broker
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
// Build the END_TRANSACTION request for the checked state and send it to the broker
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);
thisHeader.setBname(checkRequestHeader.getBname());
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}
thisHeader.setMsgId(uniqueKey);
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
switch (localTransactionState) {
// Committed
case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
// Rolled back
case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break;
// Unknown
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break;
default:
break;
}
String remark = null;
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}
doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);
try {
// Send END_TRANSACTION again via endTransactionOneway
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
}
};
this.checkExecutor.submit(request);
}
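The check round trip is asynchronous on both sides. resolveHalfMsg hands each half message to an executor, and checkTransactionState submits the client-side work to checkExecutor. The simulation below compresses both sides into one process; CheckRoundTrip and its State enum are hypothetical. An exception in the check callback becomes UNKNOW, as in the client code above. Mapping a null answer to UNKNOW is a defensive choice of this sketch only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical simulation of the broker -> producer -> broker check round trip.
final class CheckRoundTrip {
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    private CheckRoundTrip() {}

    // Returns the END_TRANSACTION state recorded for each half message.
    static Map<String, State> run(Iterable<String> halfTxIds, Function<String, State> producerCheck) {
        Map<String, State> endTransactions = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (String txId : halfTxIds) {
            // Like resolveHalfMsg: each check runs on the pool, not on the scanning thread.
            pool.execute(() -> {
                State state;
                try {
                    state = producerCheck.apply(txId);
                } catch (RuntimeException e) {
                    state = State.UNKNOW;
                }
                endTransactions.put(txId, state == null ? State.UNKNOW : state);
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return endTransactions;
    }
}
```

Running checks on a pool keeps one slow producer from stalling the scan of the whole half queue.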
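Summary
First, the client Producer sends a transactional message through sendMessageInTransaction. The Broker recognizes it as transactional, stores it under the topic RMQ_SYS_TRANS_HALF_TOPIC and replies to the client, which continues with its own logic.
The client then calls endTransaction, which reports the local transaction's outcome to the Broker through endTransactionOneway. On the Broker, the END_TRANSACTION processor's processRequest method handles the request.
If the result is lost for any reason, the check task started together with BrokerController takes over. By default it runs every 60s, checks half messages that have waited longer than 6s, and checks each message at most 15 times. In short, sendCheckMessage sends a CHECK_TRANSACTION_STATE request to the client, which registered a handler for that code at startup. The client determines the transaction state and handles it in processTransactionState. In the normal case this again ends with endTransactionOneway, completing the transactional message.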