完美解决,RocketMQ如何支持多事务消息?

2024年 2月 4日 93.2k 0

今天我们将解决使用RocketMQ事务消息时可能遇到的一个常见问题:如何让其支持多事务消息?

1. 问题背景

在实际开发中,我们常常会面临多事务消息的场景,例如在DailyMart的订单模块中,用户支付后需要调用库存服务进行库存扣减,而在订单确认收货后需要调用用户服务实现积分赠送。这两个业务逻辑都需要通过事务消息来保证分布式事务。

为了处理这种情况,我们可能会考虑在订单模块中创建两个事务消息监听器,分别用于处理库存扣减和积分赠送的事务处理和事务回查。

@Component
@Slf4j
//处理订单支付的事务监听器
public class OrderPaidTransactionListener implements RocketMQLocalTransactionListener {
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    ......
    //处理订单支付逻辑
   }

  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
      ......
      //检查订单处理逻辑
   }
}

@Component
@Slf4j
//处理订单收货的事务监听器
public class OrderReceivedTransactionListener implements RocketMQLocalTransactionListener {
  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
    ......
   }

  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
      ......
   }
}

然而,当我们信心满满地完成业务逻辑编写并启动服务时,可能会遇到如下错误:rocketMQTemplate already exists RocketMQLocalTransactionListener

图片图片

在rocketmq-spring-boot-starter版本低于2.1.0的项目中,可以使用多个 @RocketMQTransactionListener 监听不同的 txProducerGroup 来发送不同类型的事务消息到topic。然而,从 RocketMQ-Spring 2.1.0 版本开始,注解 @RocketMQTransactionListener 不能设置 txProducerGroup、ak、sk,这些值均需与对应的 RocketMQTemplate 保持一致。通过阅读源码 RocketMQTransactionConfiguration#registerTransactionListener() 方法,也可得知在RocketMQ如果已经存在了 RocketMQTransactionListener 则会出现上述错误。

图片图片

2. 如何解决

为了在保证系统只有一个 RocketMQTransactionListener 的前提下实现多事务消息,我们可以将 RocketMQLocalTransactionListener 不处理具体业务逻辑,而是将其作为一个分发器使用。

在生产者发送事务消息时指定对应的事务处理器 ,并将事务处理器放置在消息头上发送出去,在 RocketMQTransactionListener 中根据消息头选择具体的事务处理器来实现业务逻辑。

具体实现如下:

2.1 定义事务消息处理接口

首先,定义公共的事务消息处理接口,所有事务消息都实现此接口而非 RocketMQ 默认的 RocketMQLocalTransactionListener。

public interface TransactionMessageHandler {
    
    /**
    * 执行本地事务
    * @param payload 消息体
    * @param arg 参数
    */
    RocketMQLocalTransactionState executeLocalTransaction(Object payload, Object arg);
    
    /**
     * 检查本地执行状态
     * @param payload 消息体
     * @return 执行结果
     */
    RocketMQLocalTransactionState checkLocalTransaction(Object payload);
    
}

2.2 修改事务消息发送工具类,指定消息处理器

public  TransactionSendResult sendTransaction(String topic, String tag, T message, Class

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论