Spring Boot+MyBatis+Atomikos+MySQL(附源码)

2023年 8月 28日 23.3k 0

我们在实际项目中,尽量规避分布式事务。但是,有些时候是真的需要做一些服务拆分从而会引出分布式事务问题。

同时,分布式事务也是面试中市场被问,可以拿着这个案例练练手,面试就可以说上个123了。

这里举个业务板栗:用户领取优惠券,需要扣减用户领取次数,然后记录一个用户领取优惠券记录。

Spring Boot+MyBatis+Atomikos+MySQL(附源码)拆分前Spring Boot+MyBatis+Atomikos+MySQL(附源码)拆分后

原本这里可以使用消息队列方式,采用异步化去新增用户领取记录。但是,这里需求是就是需要用户领完立马就能查看到自己的领取记录,那我们这里就引入了Atomikos来实现分布式事务问题。

分布式事务

分布式事务是指跨越多个计算机或数据库的事务,这些计算机或数据库之间可能存在网络延迟、故障或不一致性的情况。分布式事务需要保证所有操作的原子性、一致性、隔离性和持久性,以确保数据的正确性和完整性。

分布式事务协议有哪些?

分布式事务协议主要有两种:2PC(Two-Phase Commit)和3PC(Three-Phase Commit)。

2PC是目前最常用的分布式事务协议,其流程分为两个阶段:准备阶段和提交阶段。在准备阶段,事务协调者向所有参与者发出准备请求,参与者将本地事务执行到prepare状态,并将prepare结果返回给事务协调者。在提交阶段,如果所有参与者都执行成功,则事务协调者向所有参与者发出提交请求,参与者将本地事务提交,否则事务协调者向所有参与者发出回滚请求,参与者将本地事务回滚。

3PC是2PC的改进版,其在2PC的基础上增加了一个准备提交阶段。在准备提交阶段,协调者向参与者询问是否可以提交,如果参与者返回同意,则在提交阶段直接提交,否则在提交阶段回滚。

分布式事务常见解决方案有哪些?

分布式事务解决实现方案有:

  • 基于消息队列的分布式事务方案(如RocketMQ的开源方案)
  • 基于分布式事务框架的分布式事务方案(如Seata、TCC-Transaction等框架)
  • 基于XA协议的分布式事务方案(如JTA等)
  • 基于可靠消息最终一致性的分布式事务方案(如阿里巴巴的分布式事务中间件GTS)
  • 基于CAP原理的分布式事务方案(如CQRS架构中的事件溯源模式)

什么是JTA ?

JTA(Java Transaction API),是J2EE的编程接口规范,它是XA协议的JAVA实现。它主要定义了:

一个事务管理器的接口javax.transaction.TransactionManager,定义了有关事务的开始、提交、撤回等>操作。

一个满足XA规范的资源定义接口javax.transaction.xa.XAResource,一种资源如果要支持JTA事务,就需要让它的资源实现该XAResource接口,并实现该接口定义的两阶段提交相关的接口。
如果我们有一个应用,它使用JTA接口实现事务,应用在运行的时候,就需要一个实现JTA的容器,一般情况下,这是一个J2EE容器,像JBoss,Websphere等应用服务器。

但是,也有一些独立的框架实现了JTA,例如Atomikos, bitronix都提供了jar包方式的JTA实现框架。这样我们就能够在Tomcat或者Jetty之类的服务器上运行使用JTA实现事务的应用系统。

在上面的本地事务和外部事务的区别中说到,JTA事务是外部事务,可以用来实现对多个资源的事务性。它正是通过每个资源实现的XAResource来进行两阶段提交的控制。感兴趣的同学可以看看这个接口的方法,除了commit, rollback等方法以外,还有end(), forget(), isSameRM(), prepare()等等。光从这些接口就能够想象JTA在实现两阶段事务的复杂性。

什么是XA?

XA是由X/Open组织提出的分布式事务的架构(或者叫协议)。XA架构主要定义了(全局)事务管理器(Transaction Manager)和(局部)资源管理器(Resource Manager)之间的接口。XA接口是双向的系统接口,在事务管理器(Transaction Manager)以及一个或多个资源管理器(Resource Manager)之间形成通信桥梁。也就是说,在基于XA的一个事务中,我们可以针对多个资源进行事务管理,例如一个系统访问多个数据库,或即访问数据库、又访问像消息中间件这样的资源。这样我们就能够实现在多个数据库和消息中间件直接实现全部提交、或全部取消的事务。XA规范不是java的规范,而是一种通用的规范, 目前各种数据库、以及很多消息中间件都支持XA规范。

JTA是满足XA规范的、用于Java开发的规范。所以,当我们说,使用JTA实现分布式事务的时候,其实就是说,使用JTA规范,实现系统内多个数据库、消息中间件等资源的事务。

什么是Atomikos

Atomikos是一个非常流行的开源事务管理器,并且可以嵌入到你的Spring Boot应用中。Tomcat应用服务器没有实现JTA规范,当使用Tomcat作为应用服务器的时候,需要使用第三方的事务管理器类来作为全局的事务管理器,而Atomikos框架就是这个作用,将事务管理整合到应用中,而不依赖于application server。

Spring Boot 集成Atomikos

说一堆的理论没什么用,show me the code。

技术栈:Spring Boot+MyBatis+Atomikos+MySQL

如果你按照本文代码,注意你的mysql版本。

首先建好两个数据库(my-db_0和my-db_1),然后每个库里各建一张表。

数据库my-db_0中:

CREATE TABLE `t_user_0` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`age` int NOT NULL,
`gender` int NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8;

登录后复制

数据库my-db_1中:

CREATE TABLE `t_user_1` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`age` int NOT NULL,
`gender` int NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8;

登录后复制

这里只是为了演示分布式事务,不用在意表的具体含义。

整体项目结构

Spring Boot+MyBatis+Atomikos+MySQL(附源码)项目整体结构

maven配置

4.0.0

com.tian
spring-boot-atomikos
1.0-SNAPSHOT

jar

org.springframework.boot
spring-boot-starter-parent
2.0.0.RELEASE

spring-boot-atomikos

UTF-8

org.mybatis.spring.boot
mybatis-spring-boot-starter
1.3.1

org.springframework.boot
spring-boot-starter-web

mysql
mysql-connector-java
8.0.16

org.springframework.boot
spring-boot-starter-jta-atomikos

org.springframework.boot
spring-boot-maven-plugin

src/main/java

**/*.java

src/main/resources

**/*.*

登录后复制

properties配置

server.port=9001
spring.application.name=atomikos-demo

spring.datasource.user0.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.user0.url=jdbc:mysql://localhost:3306/my-db_0?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
spring.datasource.user0.user=root
spring.datasource.user0.password=123456

spring.datasource.user1.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.user1.url=jdbc:mysql://localhost:3306/my-db_1?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true
spring.datasource.user1.user=root
spring.datasource.user1.password=123456

mybatis.mapperLocations=classpath:/com/tian/mapper/*/*.xml
mybatis.typeAliasesPackage=com.tian.entity
mybatis.configuration.cache-enabled=true

登录后复制

数据源

/**
* @author tianwc 公众号:java后端技术全栈、面试专栏
* @version 1.0.0
* @date 2023年05月11日 19:38
* 博客地址:博客地址
*

* 配置好两个数据源
*/
@Configuration
public class DataSourceConfig {

// 将这个对象放入spring容器中(交给Spring管理)
@Bean
// 读取 application.yml 中的配置参数映射成为一个对象
@ConfigurationProperties(prefix = "spring.datasource.user0")
public XADataSource getDataSource0() {
// 创建XA连接池
return new MysqlXADataSource();
}

/**
* 创建Atomikos数据源
* 注解@DependsOn("druidXADataSourcePre"),在名为druidXADataSourcePre的bean实例化后加载当前bean
*/
@Bean
@DependsOn("getDataSource0")
@Primary
public DataSource dataSourcePre(@Qualifier("getDataSource0") XADataSource xaDataSource) {
//这里的AtomikosDataSourceBean使用的是spring提供的
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSource(xaDataSource);
atomikosDataSourceBean.setMaxPoolSize(20);
return atomikosDataSourceBean;
}

@Bean
@ConfigurationProperties(prefix = "spring.datasource.user1")
public XADataSource getDataSource1() {
// 创建XA连接池
return new MysqlXADataSource();
}

@Bean
@DependsOn("getDataSource1")
public DataSource dataSourceSit(@Qualifier("getDataSource1") XADataSource xaDataSource) {
//这里的AtomikosDataSourceBean使用的是spring提供的
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSource(xaDataSource);
return atomikosDataSourceBean;
}
}

登录后复制

MyBatis扫描

@Configuration
@MapperScan(basePackages = {"com.tian.mapper.user0"}, sqlSessionTemplateRef = "preSqlSessionTemplate")
public class MybatisPreConfig {

@Autowired
@Qualifier("dataSourcePre")
private DataSource dataSource;

/**
* 创建 SqlSessionFactory
*/
@Bean
@Primary
public SqlSessionFactory preSqlSessionFactory() throws Exception{
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
bean.setMapperLocations(new PathMatchingResourcePatternResolver().
getResources("classpath*:com/tian/mapper/user0/*.xml"));
return bean.getObject();
}

/**
* 通过 SqlSessionFactory 来创建 SqlSessionTemplate
*/
@Bean
@Primary
public SqlSessionTemplate preSqlSessionTemplate(@Qualifier("preSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
// SqlSessionTemplate是线程安全的,可以被多个DAO所共享使用
return new SqlSessionTemplate(sqlSessionFactory);
}
}

登录后复制

另外一个基本一样,就是扫描路径改成:

("classpath*:com/tian/mapper/user1/*.xml")

登录后复制

mapper.xml

id, user_name, age, gender

insert into t_user_0 (id, user_name,age, gender)
values (#{id,jdbcType=BIGINT}, #{userName,jdbcType=VARCHAR},#{age,jdbcType=INTEGER},#{gender,jdbcType=INTEGER})

登录后复制

另外一个基本完全一样,这里就贴出来了。

对应mapper接口 也非常简单,贴出一个:

public interface User0Mapper {

int insert(User0 record);
}

登录后复制

service

/**
* @author tianwc 公众号:java后端技术全栈、面试专栏
* @version 1.0.0
* @date 2023年05月11日 19:38
* 博客地址:博客地址
*

* 模拟三种场景:正常、制造异常、数据库异常
*/
@Service
public class UserServiceImpl implements UserService {

@Resource
private User0Mapper user0Mapper;
@Resource
private User1Mapper user1Mapper;
/**
* 正常逻辑 同时对两个数据库进行 插入数据
*/
@Transactional
@Override
public int transaction1() throws Exception {
User1 user1 = new User1();
user1.setUserName("22222");
user1.setAge(11);
user1.setGender(0);
user1Mapper.add(user1);
System.out.println("---------------------------");
// sit(数据源1)
User0 user0 = new User0();
user0.setUserName("111111");
user0.setAge(11);
user0.setGender(0);
user0Mapper.insert(user0);
return 1;
}
/**
* 正常逻辑 同时对两个数据库进行 插入数据
* 数据插入完后 出现异常
*/
@Transactional
@Override
public int transaction2() throws Exception {
User1 user1 = new User1();
user1.setUserName("22222");
user1.setAge(11);
user1.setGender(0);
user1Mapper.add(user1);
System.out.println("---------------------------");
// sit(数据源1)
User0 user0 = new User0();
user0.setUserName("111111");
user0.setAge(11);
user0.setGender(0);
user0Mapper.insert(user0);
//认为制造一个异常
int a=1/0;
return 1;
}

/**
* 第一个数据插入成功 第二个数据插入失败
*/
@Transactional
@Override
public int transaction3() throws Exception {
User1 user1 = new User1();
user1.setUserName("22222");
user1.setAge(11);
user1.setGender(0);
user1Mapper.add(user1);
System.out.println("---------------------------");
// sit(数据源1)
User0 user0 = new User0();
//故意搞长点,模拟插入失败 让前面的数据回滚 user0.setUserName("111110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001");
user0.setAge(11);
user0.setGender(0);
user0Mapper.insert(user0);
return 1;
}
}

登录后复制

controller

@RestController
@RequestMapping("/user")
public class UserController {

@Resource
private UserService userService;

@PostMapping("/test1")
public CommonResult test1() {
int i = 0;
try {
i = userService.transaction1();
return CommonResult.success(i);
} catch (Exception e) {
e.printStackTrace();
}
return CommonResult.success(i);
}

@PostMapping("/test2")
public CommonResult test2() {
int i = 0;
try {
i = userService.transaction2();
return CommonResult.success(i);
} catch (Exception e) {
e.printStackTrace();
}
return CommonResult.success(i);
}

@PostMapping("/test3")
public CommonResult test3() {
int i = 0;
try {
i = userService.transaction3();
return CommonResult.success(i);
} catch (Exception e) {
e.printStackTrace();
}
return CommonResult.success(i);
}
}

登录后复制

项目启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

/**
* @author tianwc 公众号:java后端技术全栈、面试专栏
* @version 1.0.0
* @date 2023年05月11日 19:38
* 博客地址:博客地址
*

* 项目启动类
*/
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
//@ComponentScan(basePackages = {"com.tian"})
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

登录后复制

测试

启动项目,分别测试下面三个:

http://localhost:9001/user/test1 结果:两个数据库中,表数据都新增一条

http://localhost:9001/user/test2 结果:抛出除数不能为Zero的异常,两个数据库都没有新增数据。

http://localhost:9001/user/test3 结果:抛出数据字段值太长异常,两个数据库都没有新增数据。

好了,到此我们已经实现了分布式事务。

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论