Seata源码(三)事务处理类结构和流程

2023年 10月 13日 18.2k 0

Java极客  |  作者  /  铿然一叶
这是Java极客的第 93 篇原创文章

相关阅读:

萌新快速成长之路
JAVA编程思想(一)通过依赖注入增加扩展性
JAVA编程思想(二)如何面向接口编程
JAVA编程思想(三)去掉别扭的if,自注册策略模式优雅满足开闭原则
JAVA编程思想(四)Builder模式经典范式以及和工厂模式如何选?
Java编程思想(五)事件通知模式解耦过程
Java编程思想(六)事件通知模式解耦过程
Java编程思想(七)使用组合和继承的场景
JAVA基础(一)简单、透彻理解内部类和静态内部类
JAVA基础(二)内存优化-使用Java引用做缓存
JAVA基础(三)ClassLoader实现热加载
JAVA基础(四)枚举(enum)和常量定义,工厂类使用对比
JAVA基础(五)函数式接口-复用,解耦之利刃
HikariPool源码(二)设计思想借鉴
【极客源码】JetCache源码(一)开篇
【极客源码】JetCache源码(二)顶层视图
如何编写软件设计文档
Seata源码(一)初始化
Seata源码(二)事务基础对象

1. 核心类结构

image.png

描述
GlobalTransactionalInterceptor 全局事务拦截器,拦截GlobalTransactional注解和GlobalLock注解的方法做相应处理
TransactionalTemplate 事务模版,全局事务处理的控制流逻辑
TransactionalExecutor 事务执行接口,定义执行方法,封装了执行异常
GlobalLockTemplate 全局锁模版,全局锁处理的控制流逻辑
GlobalLockExecutor 全局事务执行接口,定义执行方法
GlobalTransaction 全局事务接口,定义事务处理的核心接口,例如begin、commit、rollback
DefaultGlobalTransaction 全局事务实现类
GlobalTransactionContext 全局事务上下文,用户创建全局事务,获取全局事务
RootContext 根上下文,用于绑定XID,全局锁,事务分支类型和获取这些信息
TransactionManager 事务管理接口,DefaultGlobalTransaction调用它进行事务管理
DefaultTransactionManager 事务管理默认实现类
TransactionManagerHolder 单例模式,将TransactionManager缓存在线程变量中,不需要通过方法传入传递,而直接获取

2. 拦截器入口流程

GlobalTransactionalInterceptor类invoke方法入口流程:

image.png

2.1 入口方法

识别GlobalTransactional注解和GlobalLock注解调用不同方法:

    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            if (!localDisable) {
                if (globalTransactionalAnnotation != null) {
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }
        return methodInvocation.proceed();
    }

2.2 未启用全局事务或满足降级条件判断

只要满足条件就不启用全局事务。

boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);

判断条件的参数配置:

参数 配置参数
disable service.disableGlobalTransaction
degradeCheck client.tm.degradeCheck
degradeCheckAllowTimes client.tm.degradeCheckAllowTimes

2.3 degradeNum参数更新

其基本思路是启动一个定时任务模拟发起全局事务,根据返回结果是成功还是失败来更新degradeNum值(类似健康检查)。

启动一个定时任务模拟执行全局事务获取结果并发送事件通知:

    private static void startDegradeCheck() {
        executor.scheduleAtFixedRate(() -> {
            if (degradeCheck) {
                try {
                    String xid = TransactionManagerHolder.get().begin(null, null, "degradeCheck", 60000);
                    TransactionManagerHolder.get().commit(xid);
                    // 发送事件通知
                    EVENT_BUS.post(new DegradeCheckEvent(true));
                } catch (Exception e) {
                    EVENT_BUS.post(new DegradeCheckEvent(false));
                }
            }
        }, degradeCheckPeriod, degradeCheckPeriod, TimeUnit.MILLISECONDS);
    }

将模拟全局事务的结果通知到如下方法修改降级次数:

    // Subscribe注解的方法可以接收事件通知(方法参数类型和发送事件通知时的一致)
    @Subscribe
    public static void onDegradeCheck(DegradeCheckEvent event) {
        if (event.isRequestSuccess()) {
            if (degradeNum >= degradeCheckAllowTimes) {
                reachNum++;
                if (reachNum >= degradeCheckAllowTimes) {
                    reachNum = 0;
                    degradeNum = 0;
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("the current global transaction has been restored");
                    }
                }
            } else if (degradeNum != 0) {
                degradeNum = 0;
            }
        } else {
            if (degradeNum = degradeCheckAllowTimes) {
                    if (LOGGER.isWarnEnabled()) {
                        LOGGER.warn("the current global transaction has been automatically downgraded");
                    }
                }
            } else if (reachNum != 0) {
                reachNum = 0;
            }
        }
    }

3. 全局事务处理

3.1 事务模版

事务模版的主要逻辑有:

1.根据不同的事务传播级别做相应处理,不同事务传播级别处理如下:

传播级别 描述
REQUIRED 如果事务存在,则执行当前事务,否则开启新事务
REQUIRES_NEW 如果事务存在,则挂起当前事务,开启一个新任务,类似事务嵌套
NOT_SUPPORTED 如果事务存在,则挂起当前事务,待执行的业务方法不开启事务
SUPPORTS 如果事务不存在,执行业务方法不开启事务,否则执行业务方法使用当前事务
NEVER 如果事务存在,则抛出异常,否则待执行业务方法不开启事务
MANDATORY 如果事务不存在,则抛出异常,否则使用当前事务执行业务方法

2.开启事务、执行业务方法、提交事务

3.业务异常处理

4.调用事务生命周期钩子方法

3.1.1 控制流

TransactionalTemplate.java

    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. Get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        // 1.2 Handle the transaction propagation.
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
                    // If transaction is existing, suspend it.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    // Execute without transaction and return.
                    return business.execute();
                case REQUIRES_NEW:
                    // If transaction is existing, suspend it, and then begin new transaction.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    // Continue and execute with new transaction
                    break;
                case SUPPORTS:
                    // If transaction is not existing, execute without transaction.
                    if (notExistingTransaction(tx)) {
                        return business.execute();
                    }
                    // Continue and execute with new transaction
                    break;
                case REQUIRED:
                    // If current transaction is existing, execute with current transaction,
                    // else continue and execute with new transaction.
                    break;
                case NEVER:
                    // If transaction is existing, throw exception.
                    if (existingTransaction(tx)) {
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                    , tx.getXid()));
                    } else {
                        // Execute without transaction and return.
                        return business.execute();
                    }
                case MANDATORY:
                    // If transaction is not existing, throw exception.
                    if (notExistingTransaction(tx)) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    // Continue and execute with current transaction.
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
                //    else do nothing. Of course, the hooks will still be triggered.
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit.
                commitTransaction(tx);

                return rs;
            } finally {
                //5. clear
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            // If the transaction is suspended, resume it.
            if (suspendedResourcesHolder != null) {
                tx.resume(suspendedResourcesHolder);
            }
        }
    }

3.1.2 事务开启

TransactionalTemplate.java

    private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            // 事务声明周期钩子方法调用
            triggerBeforeBegin();
            // 开启事务中使用Launcher角色
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            // 事务声明周期钩子方法调用
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }
    }

3.1.3 事务提交

TransactionalTemplate.java

    private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeCommit();
            tx.commit();
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
        }
    }

3.1.4 异常事务回滚

异常入口:
TransactionalTemplate.java

                try {
                    // Do Your Business
                    rs = business.execute();
                    // 捕捉根异常,所有异常都能捕捉到
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

异常处理控制逻辑:
TransactionalTemplate.java

    private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
        //roll back  txInfo.rollbackOn方法将抛出的异常和注解GlobalTransactional上配置的异常做比较
        if (txInfo != null && txInfo.rollbackOn(originalException)) {
            try {
                rollbackTransaction(tx, originalException);
            } catch (TransactionException txe) {
                // Failed to rollback
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                        TransactionalExecutor.Code.RollbackFailure, originalException);
            }
        } else {
            // not roll back on this exception, so commit
            commitTransaction(tx);
        }
    }

是否处理异常根据GlobalTransactional注解上配置rollback规则来决定,这些规则在GlobalTransactionalInterceptor中创建匿名TransactionalExecutor类时获取:

                public TransactionInfo getTransactionInfo() {
                    // reset the value of timeout
                    int timeout = globalTrxAnno.timeoutMills();
                    if (timeout  rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });

回滚规则判断,根据实际抛出的异常类型,和GlobalTransactional注解上配置的回滚规则中指定的异常类型比较:
TransactionInfo.java

    public boolean rollbackOn(Throwable ex) {

        RollbackRule winner = null;
        int deepest = Integer.MAX_VALUE;

        if (CollectionUtils.isNotEmpty(rollbackRules)) {
            winner = NoRollbackRule.DEFAULT_NO_ROLLBACK_RULE;
            for (RollbackRule rule : this.rollbackRules) {
                int depth = rule.getDepth(ex);
                if (depth >= 0 && depth < deepest) {
                    deepest = depth;
                    winner = rule;
                }
            }
        }

        return !(winner instanceof NoRollbackRule);
    }

实际回滚方法:
TransactionalTemplate.java

    private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
        triggerBeforeRollback();
        tx.rollback();
        triggerAfterRollback();
        // 3.1 Successfully rolled back
        throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
            ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
    }

end.

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论