SpringBoot分布式事务之可靠消息最终一致性
环境:springboot2.3.9 + RocketMQ4.8.0
可靠消息最终一致性原理
- 执行流程
1、如果broker收到了commit/rollback消息 :
如果收到了commit,则broker认为整个事务是没问题的,执行成功的。那么会下发消息给Consumer端消费。
如果收到了rollback,则broker认为本地事务执行失败了,broker将会删除Half Message,不下发给Consumer端。
2、如果broker未收到消息(如果执行本地事务突然宕机了,相当执行本地事务(executeLocalTransaction)执行结果返回unknow,则和broker未收到确认消息的情况一样处理。):
broker会定时回查本地事务的执行结果:如果回查结果是本地事务已经执行则返回commit,若未执行,则返回unknow。
Producer端回查的结果发送给Broker。Broker接收到的如果是commit,则broker视为整个事务执行成功,如果是rollback,则broker视为本地事务执行失败,broker删除Half Message,不下发给consumer。如果broker未接收到回查的结果(或者查到的是unknow),则broker会定时进行重复回查,以确保查到最终的事务结果。重复回查的时间间隔和次数都可配。
工程结构
图片
建立父子工程,两个子项目account-manager,integral-manager。
依赖
org.springframework.boot spring-boot-starter-data-jpa org.springframework.boot spring-boot-starter-web org.apache.rocketmq rocketmq-spring-boot-starter 2.2.0
Account子模块
- 配置文件
server: port: 8081 --- rocketmq: nameServer: localhost:9876 producer: group: pack-mq --- spring: jpa: generateDdl: false hibernate: ddlAuto: update openInView: true show-sql: true --- spring: datasource: driverClassName: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/account?serverTimeznotallow=GMT%2B8 username: root password: ****** type: com.zaxxer.hikari.HikariDataSource hikari: minimumIdle: 10 maximumPoolSize: 200 autoCommit: true idleTimeout: 30000 poolName: MasterDatabookHikariCP maxLifetime: 1800000 connectionTimeout: 30000 connectionTestQuery: SELECT 1
- 业务实体类
// 用户表 @Entity @Table(name = "t_account") public class Account { @Id private Long id; private String name ; } // 业务记录表(用来查询去重) @Entity @Table(name = "t_account_log") public class AccountLog { @Id private Long txid; private Date createTime ; }
- DAO相关类
public interface AccountRepository extends JpaRepository { } public interface AccountLogRepository extends JpaRepository { }
- Service相关类
@Resource private AccountRepository accountRepository ; @Resource private AccountLogRepository accountLogRepository ; // 该方法保存业务数据,同时保存操作记录;操作记录用来回查。 @Transactional public boolean register(Account account) { accountRepository.save(account) ; AccountLog accountLog = new AccountLog(account.getId(), new Date()) ; accountLogRepository.save(accountLog) ; return true ; } public AccountLog existsTxId(Long txid) { return accountLogRepository.findById(txid).orElse(null) ; }
- 发送消息方法
@Resource private RocketMQTemplate rocketMQTemplate ; public String sendTx(String topic, String tags, Account account) { String uuid = UUID.randomUUID().toString().replaceAll("-", "") ; TransactionSendResult result =rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(account). setHeader("tx_id", uuid).build(), uuid) ; return result.getSendStatus().name() ; }
- 消息监听(生产者监听)
@RocketMQTransactionListener public class ProducerMessageListener implements RocketMQLocalTransactionListener { @Resource private AccountService accountService ; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ; accountService.register(account) ; } catch (Exception e) { e.printStackTrace() ; return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 这里检查本地事务是否执行成功 try { Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ; System.out.println("执行查询ID为:" + account.getId() + " 的数据是否存在") ; AccountLog accountLog = accountService.existsTxId(account.getId()) ; if (accountLog == null) { return RocketMQLocalTransactionState.UNKNOWN ; } } catch (Exception e) { e.printStackTrace() ; return RocketMQLocalTransactionState.UNKNOWN ; } return RocketMQLocalTransactionState.COMMIT ; } }
- Controller接口
@RestController @RequestMapping("/accounts") public class AccountController { @Resource private ProducerMessageService messageService ; @PostMapping("/send") public Object sendMessage(@RequestBody Account account) { return messageService.sendTx("tx-topic", "mks", account) ; } }
Integral子模块
- 业务实体类
@Entity @Table(name = "t_integral") public class Integral { @Id private Long id; private Integer score ; private Long acccountId ; }
- DAO相关类
public interface IntegralRepository extends JpaRepository { }
- Service相关类
@Resource private IntegralRepository integralRepository ; @Transactional public Integral saveIntegral(Integral integral) { return integralRepository.save(integral) ; }
- 消息监听
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "mks") @Component public class IntegralMessageListener implements RocketMQListener { @Resource private IntegralService integralService ; @SuppressWarnings("unchecked") @Override public void onMessage(String message) { System.out.println("Integral接收到消息:" + message) ; try { Map jsonMap = new JsonMapper().readValue(message, Map.class) ; Integer id = (Integer) jsonMap.get("id") ; integralService.saveIntegral(new Integral(1L, 1000, id + 0L)) ; } catch (Exception e) { throw new RuntimeException(e) ; } } }
测试
分别启动两个子模块
- 初始数据表
图片
- Postman测试
图片
Account模块
图片
Integral模块
图片
当子模块Account执行本地事务发生错误时,事务会回滚并且删除消息。子模块Integral并不会收到消息。