Java极客 | 作者 / 铿然一叶
这是Java极客的第 93 篇原创文章
相关阅读:
萌新快速成长之路
JAVA编程思想(一)通过依赖注入增加扩展性
JAVA编程思想(二)如何面向接口编程
JAVA编程思想(三)去掉别扭的if,自注册策略模式优雅满足开闭原则
JAVA编程思想(四)Builder模式经典范式以及和工厂模式如何选?
Java编程思想(五)事件通知模式解耦过程
Java编程思想(六)事件通知模式解耦过程
Java编程思想(七)使用组合和继承的场景
JAVA基础(一)简单、透彻理解内部类和静态内部类
JAVA基础(二)内存优化-使用Java引用做缓存
JAVA基础(三)ClassLoader实现热加载
JAVA基础(四)枚举(enum)和常量定义,工厂类使用对比
JAVA基础(五)函数式接口-复用,解耦之利刃
HikariPool源码(二)设计思想借鉴
【极客源码】JetCache源码(一)开篇
【极客源码】JetCache源码(二)顶层视图
如何编写软件设计文档
Seata源码(一)初始化
Seata源码(二)事务基础对象
1. 核心类结构
类 | 描述 |
---|---|
GlobalTransactionalInterceptor | 全局事务拦截器,拦截GlobalTransactional注解和GlobalLock注解的方法做相应处理 |
TransactionalTemplate | 事务模版,全局事务处理的控制流逻辑 |
TransactionalExecutor | 事务执行接口,定义执行方法,封装了执行异常 |
GlobalLockTemplate | 全局锁模版,全局锁处理的控制流逻辑 |
GlobalLockExecutor | 全局事务执行接口,定义执行方法 |
GlobalTransaction | 全局事务接口,定义事务处理的核心接口,例如begin、commit、rollback |
DefaultGlobalTransaction | 全局事务实现类 |
GlobalTransactionContext | 全局事务上下文,用户创建全局事务,获取全局事务 |
RootContext | 根上下文,用于绑定XID,全局锁,事务分支类型和获取这些信息 |
TransactionManager | 事务管理接口,DefaultGlobalTransaction调用它进行事务管理 |
DefaultTransactionManager | 事务管理默认实现类 |
TransactionManagerHolder | 单例模式,将TransactionManager缓存在线程变量中,不需要通过方法传入传递,而直接获取 |
2. 拦截器入口流程
GlobalTransactionalInterceptor类invoke方法入口流程:
2.1 入口方法
识别GlobalTransactional注解和GlobalLock注解调用不同方法:
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();
}
2.2 未启用全局事务或满足降级条件判断
只要满足条件就不启用全局事务。
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
判断条件的参数配置:
参数 | 配置参数 |
---|---|
disable | service.disableGlobalTransaction |
degradeCheck | client.tm.degradeCheck |
degradeCheckAllowTimes | client.tm.degradeCheckAllowTimes |
2.3 degradeNum参数更新
其基本思路是启动一个定时任务模拟发起全局事务,根据返回结果是成功还是失败来更新degradeNum值(类似健康检查)。
启动一个定时任务模拟执行全局事务获取结果并发送事件通知:
private static void startDegradeCheck() {
executor.scheduleAtFixedRate(() -> {
if (degradeCheck) {
try {
String xid = TransactionManagerHolder.get().begin(null, null, "degradeCheck", 60000);
TransactionManagerHolder.get().commit(xid);
// 发送事件通知
EVENT_BUS.post(new DegradeCheckEvent(true));
} catch (Exception e) {
EVENT_BUS.post(new DegradeCheckEvent(false));
}
}
}, degradeCheckPeriod, degradeCheckPeriod, TimeUnit.MILLISECONDS);
}
将模拟全局事务的结果通知到如下方法修改降级次数:
// Subscribe注解的方法可以接收事件通知(方法参数类型和发送事件通知时的一致)
@Subscribe
public static void onDegradeCheck(DegradeCheckEvent event) {
if (event.isRequestSuccess()) {
if (degradeNum >= degradeCheckAllowTimes) {
reachNum++;
if (reachNum >= degradeCheckAllowTimes) {
reachNum = 0;
degradeNum = 0;
if (LOGGER.isInfoEnabled()) {
LOGGER.info("the current global transaction has been restored");
}
}
} else if (degradeNum != 0) {
degradeNum = 0;
}
} else {
if (degradeNum = degradeCheckAllowTimes) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("the current global transaction has been automatically downgraded");
}
}
} else if (reachNum != 0) {
reachNum = 0;
}
}
}
3. 全局事务处理
3.1 事务模版
事务模版的主要逻辑有:
1.根据不同的事务传播级别做相应处理,不同事务传播级别处理如下:
传播级别 | 描述 |
---|---|
REQUIRED | 如果事务存在,则执行当前事务,否则开启新事务 |
REQUIRES_NEW | 如果事务存在,则挂起当前事务,开启一个新任务,类似事务嵌套 |
NOT_SUPPORTED | 如果事务存在,则挂起当前事务,待执行的业务方法不开启事务 |
SUPPORTS | 如果事务不存在,执行业务方法不开启事务,否则执行业务方法使用当前事务 |
NEVER | 如果事务存在,则抛出异常,否则待执行业务方法不开启事务 |
MANDATORY | 如果事务不存在,则抛出异常,否则使用当前事务执行业务方法 |
2.开启事务、执行业务方法、提交事务
3.业务异常处理
4.调用事务生命周期钩子方法
3.1.1 控制流
TransactionalTemplate.java
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 1.2 Handle the transaction propagation.
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
}
// Execute without transaction and return.
return business.execute();
case REQUIRES_NEW:
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
// Continue and execute with new transaction
break;
case SUPPORTS:
// If transaction is not existing, execute without transaction.
if (notExistingTransaction(tx)) {
return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
case NEVER:
// If transaction is existing, throw exception.
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// Execute without transaction and return.
return business.execute();
}
case MANDATORY:
// If transaction is not existing, throw exception.
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
} finally {
//5. clear
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally {
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
3.1.2 事务开启
TransactionalTemplate.java
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
// 事务声明周期钩子方法调用
triggerBeforeBegin();
// 开启事务中使用Launcher角色
tx.begin(txInfo.getTimeOut(), txInfo.getName());
// 事务声明周期钩子方法调用
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
3.1.3 事务提交
TransactionalTemplate.java
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeCommit();
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
}
3.1.4 异常事务回滚
异常入口:
TransactionalTemplate.java
try {
// Do Your Business
rs = business.execute();
// 捕捉根异常,所有异常都能捕捉到
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
异常处理控制逻辑:
TransactionalTemplate.java
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
//roll back txInfo.rollbackOn方法将抛出的异常和注解GlobalTransactional上配置的异常做比较
if (txInfo != null && txInfo.rollbackOn(originalException)) {
try {
rollbackTransaction(tx, originalException);
} catch (TransactionException txe) {
// Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, originalException);
}
} else {
// not roll back on this exception, so commit
commitTransaction(tx);
}
}
是否处理异常根据GlobalTransactional注解上配置rollback规则来决定,这些规则在GlobalTransactionalInterceptor中创建匿名TransactionalExecutor类时获取:
public TransactionInfo getTransactionInfo() {
// reset the value of timeout
int timeout = globalTrxAnno.timeoutMills();
if (timeout rbRule : globalTrxAnno.noRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
回滚规则判断,根据实际抛出的异常类型,和GlobalTransactional注解上配置的回滚规则中指定的异常类型比较:
TransactionInfo.java
public boolean rollbackOn(Throwable ex) {
RollbackRule winner = null;
int deepest = Integer.MAX_VALUE;
if (CollectionUtils.isNotEmpty(rollbackRules)) {
winner = NoRollbackRule.DEFAULT_NO_ROLLBACK_RULE;
for (RollbackRule rule : this.rollbackRules) {
int depth = rule.getDepth(ex);
if (depth >= 0 && depth < deepest) {
deepest = depth;
winner = rule;
}
}
}
return !(winner instanceof NoRollbackRule);
}
实际回滚方法:
TransactionalTemplate.java
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
triggerBeforeRollback();
tx.rollback();
triggerAfterRollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
}
end.