SpringBoot分布式事务之可靠消息最终一致性

2023年 7月 25日 46.8k 0

环境:springboot2.3.9 + RocketMQ4.8.0

可靠消息最终一致性原理

图片

  • 执行流程
  • Producer发送Prepare message到broker。
  • Prepare Message发送成功后开始执行本地事务。
  • 如果本地事务执行成功的话则返回commit,如果执行失败则返回rollback。(这个是在事务消息的回调方法里由开发者自己决定commit or rollback)
  • Producer发送上一步的commit还是rollback到broker,这里有以下两种情况:
  • 1、如果broker收到了commit/rollback消息 :

    如果收到了commit,则broker认为整个事务是没问题的,执行成功的。那么会下发消息给Consumer端消费。

    如果收到了rollback,则broker认为本地事务执行失败了,broker将会删除Half Message,不下发给Consumer端。

    2、如果broker未收到消息(如果执行本地事务突然宕机了,相当执行本地事务(executeLocalTransaction)执行结果返回unknow,则和broker未收到确认消息的情况一样处理。):

    broker会定时回查本地事务的执行结果:如果回查结果是本地事务已经执行则返回commit,若未执行,则返回unknow。

    Producer端回查的结果发送给Broker。Broker接收到的如果是commit,则broker视为整个事务执行成功,如果是rollback,则broker视为本地事务执行失败,broker删除Half Message,不下发给consumer。如果broker未接收到回查的结果(或者查到的是unknow),则broker会定时进行重复回查,以确保查到最终的事务结果。重复回查的时间间隔和次数都可配。

    工程结构

    图片图片

    建立父子工程,两个子项目account-manager,integral-manager。

    依赖

    
      org.springframework.boot
      spring-boot-starter-data-jpa
    
    
      org.springframework.boot
      spring-boot-starter-web
    
    
      org.apache.rocketmq
      rocketmq-spring-boot-starter
      2.2.0
    

    Account子模块

    • 配置文件
    server:
      port: 8081
    ---
    rocketmq:
      nameServer: localhost:9876
      producer:
        group: pack-mq
    ---
    spring:
      jpa:
        generateDdl: false
        hibernate:
          ddlAuto: update
        openInView: true
        show-sql: true
    ---
    spring:
      datasource:
        driverClassName: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/account?serverTimeznotallow=GMT%2B8
        username: root
        password: ******
        type: com.zaxxer.hikari.HikariDataSource
        hikari:
          minimumIdle: 10
          maximumPoolSize: 200
          autoCommit: true
          idleTimeout: 30000
          poolName: MasterDatabookHikariCP
          maxLifetime: 1800000
          connectionTimeout: 30000
          connectionTestQuery: SELECT 1
    • 业务实体类
    // 用户表
    @Entity
    @Table(name = "t_account")
    public class Account {
      @Id
      private Long id;
      private String name ;
    }
    // 业务记录表(用来查询去重)
    @Entity
    @Table(name = "t_account_log")
    public class AccountLog {
      @Id
      private Long txid;
      private Date createTime ;
    }
    • DAO相关类
    public interface AccountRepository extends JpaRepository {
    }
    public interface AccountLogRepository extends JpaRepository {
    }
    • Service相关类
    @Resource
    private AccountRepository accountRepository ;
    @Resource
    private AccountLogRepository accountLogRepository ;
      
    // 该方法保存业务数据,同时保存操作记录;操作记录用来回查。
    @Transactional
    public boolean register(Account account) {
      accountRepository.save(account) ;
      AccountLog accountLog = new AccountLog(account.getId(), new Date()) ;
      accountLogRepository.save(accountLog) ;
      return true ;
    }
      
    public AccountLog existsTxId(Long txid) {
      return accountLogRepository.findById(txid).orElse(null) ;
    }
    • 发送消息方法
    @Resource
    private RocketMQTemplate rocketMQTemplate ;
      
    public String sendTx(String topic, String tags, Account account) {
      String uuid = UUID.randomUUID().toString().replaceAll("-", "") ;
      TransactionSendResult result =rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(account).
          setHeader("tx_id", uuid).build(), uuid) ;
      return result.getSendStatus().name() ;
    }
    • 消息监听(生产者监听)
    @RocketMQTransactionListener
    public class ProducerMessageListener implements RocketMQLocalTransactionListener {
      
      @Resource
      private AccountService accountService ;
    
    
      @Override
      public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
          Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ;
          accountService.register(account) ;
        } catch (Exception e) {
          e.printStackTrace() ;
          return RocketMQLocalTransactionState.ROLLBACK ;
        }
        return RocketMQLocalTransactionState.COMMIT ;
      }
    
    
      @Override
      public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
      // 这里检查本地事务是否执行成功
        try {
          Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ;
          System.out.println("执行查询ID为:" + account.getId() + " 的数据是否存在") ;
          AccountLog accountLog = accountService.existsTxId(account.getId()) ;
          if (accountLog == null) {
            return RocketMQLocalTransactionState.UNKNOWN ;
          }
        } catch (Exception e) {
          e.printStackTrace() ;
          return RocketMQLocalTransactionState.UNKNOWN ;
        }
        return RocketMQLocalTransactionState.COMMIT ;
      }
    
    
    }
    • Controller接口
    @RestController
    @RequestMapping("/accounts")
    public class AccountController {
      @Resource
      private ProducerMessageService messageService ;
      @PostMapping("/send")
      public Object sendMessage(@RequestBody Account account) {
        return messageService.sendTx("tx-topic", "mks", account) ;
      }
    }

    Integral子模块

    • 业务实体类
    @Entity
    @Table(name = "t_integral")
    public class Integral {
      @Id
      private Long id;
      private Integer score ;
      private Long acccountId ;
    }
    • DAO相关类
    public interface IntegralRepository extends JpaRepository {
    }
    • Service相关类
    @Resource
    private IntegralRepository integralRepository ;
      
    @Transactional
    public Integral saveIntegral(Integral integral) {
      return integralRepository.save(integral) ;
    }
    • 消息监听
    @RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "mks")
    @Component
    public class IntegralMessageListener implements RocketMQListener {
    
    
      @Resource
      private IntegralService integralService ;
      
      @SuppressWarnings("unchecked")
      @Override
      public void onMessage(String message) {
        System.out.println("Integral接收到消息:" + message) ;
        try {
          Map jsonMap = new JsonMapper().readValue(message, Map.class) ;
          Integer id = (Integer) jsonMap.get("id") ;
          integralService.saveIntegral(new Integral(1L, 1000, id + 0L)) ;
        } catch (Exception e) {
          throw new RuntimeException(e) ;
        }
      }
    
    
    }

    测试

    分别启动两个子模块

    • 初始数据表

    图片图片

    • Postman测试

    图片图片

    Account模块

    图片图片

    Integral模块

    图片图片

    当子模块Account执行本地事务发生错误时,事务会回滚并且删除消息。子模块Integral并不会收到消息。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论