1. 问题&分析
基于 MQ 进行系统间的解耦真的是太香了,小艾还沉浸在喜悦中久久不能自拔。但,打脸的事已经在路上了。。。。
1.1. 案例
昨天下班,在电梯里,物流组的晨姐偶遇了小艾,就一个技术问题向小艾进行了反馈。具体来说,物流系统有一项功能,即实时监控订单的支付成功事件,一旦检测到,便会为顾客准备物资,进而安排快递发货。今天系统出现了几次空指针异常。查阅日志,似乎是在反查订单信息时,没有获取到预期的订单数据。但查询物流系统,物流单已经成功生成,对业务操作并未造成实际影响,但这个问题还是值得注意。由于这个问题并没有立即影响到业务流程,所以晨姐没有在第一时间联系小艾进行确认。
在小艾正准备启动IDEA寻找线索的时候,算法组的负责人龙哥急匆匆地走了过来,向小艾反映了他们团队遇到的一个重要问题。为了提升推荐效果,算法组也会实时监控订单支付成功事件,并以此为依据重新计算用户的推荐商品。然而,今天早上,他们突然收到了一系列的报警信息,问题同样是无法查询到订单信息,这个现象与物流系统的问题高度相似。
小艾随口问道:“这个问题会自己修复吗?”龙哥愣了一下,回答说:“以前会自动修复,但刚刚那条数据还在报错。”随后,龙哥提供了报错的订单ID,小艾立即去数据库中查询,却惊讶地发现,这条数据竟然不存在!
看到这种场景,小艾有些慌神,连龙哥什么时候走的都没有注意到。目光直勾勾的盯着电脑屏幕发呆:
@Transactional
public void paySuccess(String orderId, String token){
// 验证 token,保障有效性
checkToke(token);
// 加载订单信息
Order order = this.orderRepository.getById(orderId);
if (order == null){
throw new RuntimeException("订单不存在");
}
// 支付成功,更新订单状态
order.paySuccess();
// 将变更更新到数据库
this.orderRepository.update(order);
// 发送支付成功事件
this.eventPublisher.publishEvent(new OrderPaidEvent(order));
// 执行其他业务逻辑
doSomething();
}
// 监听变更,发布 MQ
@EventListener
public void handle(OrderPaidEvent event){
rocketMQTemplate.convertAndSend("order_event", event);
}
1.2. 问题分析
两个问题看起来一样,但又有区别。当下游在收到 MQ 消息时
小艾无意间看到 paySuccess 方法上的 @Transactional 顿时茅塞顿开。
图片
正如上图所示:
这就完美的解释了物流问题,那为什么算法组收到消息里的订单ID在数据库不存在呢?
图片
如上图所示:
小艾终于锁定了问题所在,深深地吸了一口气,释放了紧绷的神经。就在这时,晨姐的电话打了进来。小艾喃喃自语:“毫无疑问,和算法部门遇到的情况一样,被XXX订单给堵住了。”说罢,她信心满满地接起了电话…
本质:该问题根本原因是==没有保障 更新数据库操作 与 发送消息操作这两个业务单元之间的一致性。==
2. 解决方案
定位后,解决方案就变的非常清晰。
2.1. 方案1:使用 @TransactionalEventListener
最简的方案就是将 @EventListener 注解 换成 @TransactionalEventListener。
2.1.1. EventListener 和 TransactionalEventListener
EventListener 和 TransactionalEventListener 都是 Spring 中用于处理事件的监听器。它们之间的主要区别在于它们处理事件的方式和事务管理。
总之,EventListener 和 TransactionalEventListener 的主要区别在于它们处理事件的方式和事务管理。在选择使用哪种监听器时,需要根据实际需求和事务一致性的要求来决定。
2.1.2. 源码示例
了解两者的区别后,只需做一点调整便可以解决这个问题,调整如下:
/**
* 使用 @TransactionalEventListener 替代 @EventListener 监听订单支付事件,然后发送消息到 RocketMQ
* @param event
*/
@TransactionalEventListener
public void handle(OrderPaidEvent event){
rocketMQTemplate.convertAndSend("order_event", event);
}
如果没有使用 Spring 的 Event 机制,但仍想实现 @TransactionEventlistner 的效果,可以直接使用 Spring API:
private void doIfCommitted(Runnable task) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronization transactionSynchronization = new TransactionSynchronizationAdapter(){
@Override
public void afterCommit() {
task.run();
}
};
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
}else {
task.run();
}
}
2.1.3. 问题&挑战
这个方案确实解决了上述问题,但从一致性角度分析,还是存在设计缺陷,只是发生的概率变低而已,没有从根本上解决问题。
在事务提交后发送 MQ 时,可能会遇到以下几种情况,导致两个操作(数据库操作和 MQ 发送操作)之间的一致性问题:
这个方案极为简单,但大幅降低了错误概率,主要应用于要求并不严格的业务场景。
2.2 方案2:RocketMQ事务消息
RocketMQ 的事务消息就是针对这个问题设计的,可以非常高效的解决这个问题。
2.2.1. 半消息以及工作原理
RocketMQ事务消息是一种支持分布式事务的消息模型,将消息生产和消费与业务逻辑绑定在一起,确保消息发送和事务执行的原子性,保证消息的可靠性。
事务消息分为两个阶段:发送消息和确认消息,确认消息分为提交和回滚两个操作。在提交操作执行完毕后,消息才会被消费端消费,而在回滚操作执行完毕后,消息会被删除,从而达到了事务的一致性和可靠性。
事务消息的发生流程如下:
图片
如果生成者发送 prepare 消息后,未在规定时间内发送 commit 或 rollback 消息,RocketMQ 将进入恢复流程,具体如下:
图片
2.2.2. 源码示例
一个简单的示例代码如下:
// 编写事务监听器类
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
// 执行本地事务
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
System.out.println("executeLocalTransaction " + value);
// TODO 执行本地事务,并返回事务状态
// 本例假定 index 为偶数的消息执行成功,奇数的消息执行失败
if (value % 2 == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// 检查本地事务状态
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("checkLocalTransaction " + msg.getTransactionId());
// 模拟检查本地事务状态,返回事务状态
boolean committed = prepare(true);
if (committed) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
// 模拟操作预处理逻辑
private boolean prepare(boolean commit) {
System.out.println("prepare " + (commit ? "commit" : "rollback"));
return commit;
}
}
// 编写发送消息的代码
public class Producer {
private static final String NAME_SERVER_ADDR = "localhost:9876";
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("MyGroup");
producer.setNamesrvAddr(NAME_SERVER_ADDR);
// 注册事务监听器
producer.setTransactionListener(new TransactionListenerImpl());
producer.start();
// 发送事务消息
String[] tags = {"TagA", "TagB", "TagC"};
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicTest", tags[i], ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));
// 在消息发送时传递给事务监听器的参数
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
}
// 关闭生产者
producer.shutdown();
}
}
单看代码很难理解,简单画了张图,具体如下:
图片
2.2.3. 问题&挑战
事务消息并不完美,存在一定的问题:
2.3. 方案3:本地消息表
事务消息表方案是一种常用的保证消息发送与业务操作一致性的方法。该方案基于数据库事务和消息队列,将消息发送和业务操作放入同一个事务中,并将业务操作和消息发送的状态记录在数据库的消息表中,以实现消息的可靠性和幂等性。
2.3.1. 设计&核心流程
整体如下图所示:
图片
image
核心流程如下:
通过事务消息表方案,可以保证消息的可靠性。即使在消息发送失败或应用程序崩溃的情况下,也可以通过重新发送消息将业务操作和消息发送的状态同步。同时,该方案可以避免消息重复发送和漏发的情况。
2.3.2. 功能封装
清晰的流程为复用打下了基础,lego 对其做了封装。
2.3.2.1. 环境准备
首先,需要引入 lego 相关依赖:
com.geekhalo.lego
lego-starter
0.1.12 以上版本
其次,在业务数据库上新建一张表用于存储消息,示例如下:
create table test_message
(
id bigint auto_increment primary key,
orderly tinyint not null comment '是否为顺序消息',
topic varchar(64) not null comment 'MQ topic',
sharding_key varchar(128) not null comment 'ShardingKey,用于选择不同的 partition',
tag varchar(128) not null comment 'Message Tag 信息',
msg_id varchar(64) not null comment 'Msg ID 只有发送成功后才有数据',
msg_key varchar(64) not null comment 'MSG Key,用于查询数据',
msg longtext not null comment '要发送的消息',
retry_time tinyint not null comment '重试次数',
status tinyint not null comment '发送状态:0-初始化,1-发送成功,2-发送失败',
create_time datetime not null,
update_time datetime not null,
index idx_update_time_status(update_time, status)
);
为了兼容多种MQ类型,对发送者进行了抽象,因此需要实现自己的 MessageSender。
@Component
@Getter
@Slf4j
public class TestMessageSender implements MessageSender {
@Override
public String send(Message message) {
// 发送消息
}
}
最后,就是对所有的组件进行配置,示例代码如下:
@Configuration
@Slf4j
public class LocalTableBasedReliableMessageConfiguration
extends LocalTableBasedReliableMessageConfigurationSupport {
@Autowired
private DataSource dataSource;
@Autowired
private MessageSender messageSender;
@Override
protected DataSource dataSource() {
return this.dataSource;
}
@Override
protected String messageTable() {
return "test_message";
}
@Override
protected MessageSender createMessageSend() {
return this.messageSender;
}
}
其中,包括:
2.2.3.2. 具体使用
ReliableMessageSender#send 在业务方法中使用,执行可靠消息发送;
@Transactional
public void testSuccess(){
// 业务逻辑
Message message = buildMessage();
// 业务逻辑
this.reliableMessageSender.send(message);
}
除发送流程外,还需要配置补充机制。
ReliableMessageCompensator#compensate 周期性调度,对未发送或发送失败的消息进行补充;
4. 示例&源码
代码仓库:https://gitee.com/litao851025/learnFromBug
代码地址:https://gitee.com/litao851025/learnFromBug/tree/master/src/main/java/com/geekhalo/demo/mq/sender