Java极客 | 作者 / 铿然一叶
这是Java极客的第 98 篇原创文章
相关阅读:
萌新快速成长之路
JAVA编程思想(一)通过依赖注入增加扩展性
JAVA编程思想(二)如何面向接口编程
JAVA编程思想(三)去掉别扭的if,自注册策略模式优雅满足开闭原则
JAVA编程思想(四)Builder模式经典范式以及和工厂模式如何选?
Java编程思想(五)事件通知模式解耦过程
Java编程思想(六)事件通知模式解耦过程
Java编程思想(七)使用组合和继承的场景
JAVA基础(一)简单、透彻理解内部类和静态内部类
JAVA基础(二)内存优化-使用Java引用做缓存
JAVA基础(三)ClassLoader实现热加载
JAVA基础(四)枚举(enum)和常量定义,工厂类使用对比
JAVA基础(五)函数式接口-复用,解耦之利刃
HikariPool源码(二)设计思想借鉴
【极客源码】JetCache源码(一)开篇
【极客源码】JetCache源码(二)顶层视图
如何编写软件设计文档
Seata源码(一)初始化
Seata源码(二)事务基础对象
Seata源码(三)事务处理类结构和流程
Seata源码(四)全局锁GlobalLock
Seata源码(五)Seata数据库操作
Seata源码(六)Seata的undo日志操作
1. 概述
Seata提供了故障处理接口和默认实现类,在发生故障时可以处理。
2. 核心类结构
引入不同的库实例化FailureHandler子类的入口类不同:
groupId | artifactId | 入口类 |
---|---|---|
io.seata | seata-spring-boot-starter | SeataAutoConfiguration |
com.alibaba.cloud | spring-cloud-alibaba-seata | GlobalTransactionScanner |
3. 代码
3.1 故障处理类实例化
3.1.1 seata-spring-boot-starter库
SeataAutoConfiguration.java
由SeataAutoConfiguration实例化默认实现类DefaultFailureHandlerImpl后通过构造器传入GlobalTransactionScanner:
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class SeataAutoConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);
@Bean(BEAN_NAME_FAILURE_HANDLER)
@ConditionalOnMissingBean(FailureHandler.class)
public FailureHandler failureHandler() {
return new DefaultFailureHandlerImpl();
}
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}
}
3.1.2 spring-cloud-alibaba-seata库
GlobalTransactionAutoConfiguration.java
创建GlobalTransactionScanner,但并没有实例化FailureHandler的子类,调用的构造器为return new GlobalTransactionScanner(applicationName, txServiceGroup):
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
String applicationName = this.applicationContext.getEnvironment().getProperty("spring.application.name");
String txServiceGroup = this.seataProperties.getTxServiceGroup();
if (StringUtils.isEmpty(txServiceGroup)) {
txServiceGroup = applicationName + "-fescar-service-group";
this.seataProperties.setTxServiceGroup(txServiceGroup);
}
return new GlobalTransactionScanner(applicationName, txServiceGroup);
}
在GlobalTransactionScanner类中也没有实例化FailureHandler的子类,传给GlobalTransactionalInterceptor构造器的是个空对象:
if (globalTransactionalInterceptor == null) {
// 这种场景下failureHandlerHook为null
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
GlobalTransactionalInterceptor.java
在此类中实例化了默认实现类DefaultFailureHandlerImpl:
// 常量定义默认实现类
private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
// 构造器中判断没有实例化则使用默认实现类
public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
3.2 故障处理调用点
GlobalTransactionalInterceptor.java
在方法handleGlobalTransaction中对不同异常场景调用对应的接口:
} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
succeed = false;
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
succeed = false;
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
3.3 默认实现类
DefaultFailureHandlerImpl.java
从实现来看,仅仅是实现了日志打印能力:
public class DefaultFailureHandlerImpl implements FailureHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultFailureHandlerImpl.class);
/**
* Retry 1 hours by default
*/
private static final int RETRY_MAX_TIMES = 6 * 60;
private static final long SCHEDULE_INTERVAL_SECONDS = 10;
private static final long TICK_DURATION = 1;
private static final int TICKS_PER_WHEEL = 8;
private HashedWheelTimer timer = new HashedWheelTimer(
new NamedThreadFactory("failedTransactionRetry", 1),
TICK_DURATION, TimeUnit.SECONDS, TICKS_PER_WHEEL);
@Override
public void onBeginFailure(GlobalTransaction tx, Throwable cause) {
LOGGER.warn("Failed to begin transaction. ", cause);
}
@Override
public void onCommitFailure(GlobalTransaction tx, Throwable cause) {
LOGGER.warn("Failed to commit transaction[" + tx.getXid() + "]", cause);
timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.Committed), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
@Override
public void onRollbackFailure(GlobalTransaction tx, Throwable originalException) {
LOGGER.warn("Failed to rollback transaction[" + tx.getXid() + "]", originalException);
timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.Rollbacked), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
@Override
public void onRollbackRetrying(GlobalTransaction tx, Throwable originalException) {
StackTraceLogger.warn(LOGGER, originalException, "Retrying to rollback transaction[{}]", new String[] {tx.getXid()});
timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.RollbackRetrying), SCHEDULE_INTERVAL_SECONDS,
TimeUnit.SECONDS);
}
protected class CheckTimerTask implements TimerTask {
private final GlobalTransaction tx;
private final GlobalStatus required;
private int count = 0;
private boolean isStopped = false;
protected CheckTimerTask(final GlobalTransaction tx, GlobalStatus required) {
this.tx = tx;
this.required = required;
}
@Override
public void run(Timeout timeout) throws Exception {
if (!isStopped) {
if (++count > RETRY_MAX_TIMES) {
LOGGER.error("transaction [{}] retry fetch status times exceed the limit [{} times]", tx.getXid(), RETRY_MAX_TIMES);
return;
}
isStopped = shouldStop(tx, required);
timer.newTimeout(this, SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
}
}
private boolean shouldStop(final GlobalTransaction tx, GlobalStatus required) {
try {
GlobalStatus status = tx.getStatus();
LOGGER.info("transaction [{}] current status is [{}]", tx.getXid(), status);
if (status == required || status == GlobalStatus.Finished) {
return true;
}
} catch (TransactionException e) {
LOGGER.error("fetch GlobalTransaction status error", e);
}
return false;
}
}
4. 扩展性
当使用spring-cloud-alibaba-seata库时,FailureHandler的子类实现是在GlobalTransactionalInterceptor类中使用默认的DefaultFailureHandlerImpl,没有办法扩展自定义实现。
private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
当使用seata-spring-boot-starter库时,默认的FailureHandler按照如下方式创建:
SeataAutoConfiguration.java
@Bean(BEAN_NAME_FAILURE_HANDLER)
@ConditionalOnMissingBean(FailureHandler.class)
public FailureHandler failureHandler() {
return new DefaultFailureHandlerImpl();
}
此时可以动态创建自定义的FailureHandler实现类来重载默认实现类实现扩展。
end.