Seata源码(七)Seata事务故障处理

2023年 10月 15日 43.3k 0

Java极客  |  作者  /  铿然一叶
这是Java极客的第 98 篇原创文章

相关阅读:

萌新快速成长之路
JAVA编程思想(一)通过依赖注入增加扩展性
JAVA编程思想(二)如何面向接口编程
JAVA编程思想(三)去掉别扭的if,自注册策略模式优雅满足开闭原则
JAVA编程思想(四)Builder模式经典范式以及和工厂模式如何选?
Java编程思想(五)事件通知模式解耦过程
Java编程思想(六)事件通知模式解耦过程
Java编程思想(七)使用组合和继承的场景
JAVA基础(一)简单、透彻理解内部类和静态内部类
JAVA基础(二)内存优化-使用Java引用做缓存
JAVA基础(三)ClassLoader实现热加载
JAVA基础(四)枚举(enum)和常量定义,工厂类使用对比
JAVA基础(五)函数式接口-复用,解耦之利刃
HikariPool源码(二)设计思想借鉴
【极客源码】JetCache源码(一)开篇
【极客源码】JetCache源码(二)顶层视图
如何编写软件设计文档
Seata源码(一)初始化
Seata源码(二)事务基础对象
Seata源码(三)事务处理类结构和流程
Seata源码(四)全局锁GlobalLock
Seata源码(五)Seata数据库操作
Seata源码(六)Seata的undo日志操作

1. 概述

Seata提供了故障处理接口和默认实现类,在发生故障时可以处理。

2. 核心类结构

image.png

引入不同的库实例化FailureHandler子类的入口类不同:

groupId artifactId 入口类
io.seata seata-spring-boot-starter SeataAutoConfiguration
com.alibaba.cloud spring-cloud-alibaba-seata GlobalTransactionScanner

3. 代码

3.1 故障处理类实例化

3.1.1 seata-spring-boot-starter库

SeataAutoConfiguration.java

由SeataAutoConfiguration实例化默认实现类DefaultFailureHandlerImpl后通过构造器传入GlobalTransactionScanner:

@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }
}

3.1.2 spring-cloud-alibaba-seata库

GlobalTransactionAutoConfiguration.java

创建GlobalTransactionScanner,但并没有实例化FailureHandler的子类,调用的构造器为return new GlobalTransactionScanner(applicationName, txServiceGroup):

    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        String applicationName = this.applicationContext.getEnvironment().getProperty("spring.application.name");
        String txServiceGroup = this.seataProperties.getTxServiceGroup();
        if (StringUtils.isEmpty(txServiceGroup)) {
            txServiceGroup = applicationName + "-fescar-service-group";
            this.seataProperties.setTxServiceGroup(txServiceGroup);
        }

        return new GlobalTransactionScanner(applicationName, txServiceGroup);
    }

在GlobalTransactionScanner类中也没有实例化FailureHandler的子类,传给GlobalTransactionalInterceptor构造器的是个空对象:

                    if (globalTransactionalInterceptor == null) {
                        // 这种场景下failureHandlerHook为null
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);

GlobalTransactionalInterceptor.java
在此类中实例化了默认实现类DefaultFailureHandlerImpl:

    // 常量定义默认实现类
    private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();

    // 构造器中判断没有实例化则使用默认实现类
    public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
        this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;

3.2 故障处理调用点

GlobalTransactionalInterceptor.java

在方法handleGlobalTransaction中对不同异常场景调用对应的接口:

        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    succeed = false;
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    succeed = false;
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                case RollbackRetrying:
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
            }

3.3 默认实现类

DefaultFailureHandlerImpl.java

从实现来看,仅仅是实现了日志打印能力:

public class DefaultFailureHandlerImpl implements FailureHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultFailureHandlerImpl.class);

    /**
     * Retry 1 hours by default
     */
    private static final int RETRY_MAX_TIMES = 6 * 60;

    private static final long SCHEDULE_INTERVAL_SECONDS = 10;

    private static final long TICK_DURATION = 1;

    private static final int TICKS_PER_WHEEL = 8;

    private HashedWheelTimer timer = new HashedWheelTimer(
        new NamedThreadFactory("failedTransactionRetry", 1),
        TICK_DURATION, TimeUnit.SECONDS, TICKS_PER_WHEEL);

    @Override
    public void onBeginFailure(GlobalTransaction tx, Throwable cause) {
        LOGGER.warn("Failed to begin transaction. ", cause);
    }

    @Override
    public void onCommitFailure(GlobalTransaction tx, Throwable cause) {
        LOGGER.warn("Failed to commit transaction[" + tx.getXid() + "]", cause);
        timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.Committed), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    @Override
    public void onRollbackFailure(GlobalTransaction tx, Throwable originalException) {
        LOGGER.warn("Failed to rollback transaction[" + tx.getXid() + "]", originalException);
        timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.Rollbacked), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    @Override
    public void onRollbackRetrying(GlobalTransaction tx, Throwable originalException) {
        StackTraceLogger.warn(LOGGER, originalException, "Retrying to rollback transaction[{}]", new String[] {tx.getXid()});
        timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.RollbackRetrying), SCHEDULE_INTERVAL_SECONDS,
            TimeUnit.SECONDS);
    }

    protected class CheckTimerTask implements TimerTask {

        private final GlobalTransaction tx;

        private final GlobalStatus required;

        private int count = 0;

        private boolean isStopped = false;

        protected CheckTimerTask(final GlobalTransaction tx, GlobalStatus required) {
            this.tx = tx;
            this.required = required;
        }

        @Override
        public void run(Timeout timeout) throws Exception {
            if (!isStopped) {
                if (++count > RETRY_MAX_TIMES) {
                    LOGGER.error("transaction [{}] retry fetch status times exceed the limit [{} times]", tx.getXid(), RETRY_MAX_TIMES);
                    return;
                }
                isStopped = shouldStop(tx, required);
                timer.newTimeout(this, SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
            }
        }
    }

    private boolean shouldStop(final GlobalTransaction tx, GlobalStatus required) {
        try {
            GlobalStatus status = tx.getStatus();
            LOGGER.info("transaction [{}] current status is [{}]", tx.getXid(), status);
            if (status == required || status == GlobalStatus.Finished) {
                return true;
            }
        } catch (TransactionException e) {
            LOGGER.error("fetch GlobalTransaction status error", e);
        }
        return false;
    }
}

4. 扩展性

当使用spring-cloud-alibaba-seata库时,FailureHandler的子类实现是在GlobalTransactionalInterceptor类中使用默认的DefaultFailureHandlerImpl,没有办法扩展自定义实现。

    private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();

当使用seata-spring-boot-starter库时,默认的FailureHandler按照如下方式创建:
SeataAutoConfiguration.java

    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

此时可以动态创建自定义的FailureHandler实现类来重载默认实现类实现扩展。

end.

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论