Seata源码(四)全局锁GlobalLock

2023年 10月 13日 29.7k 0

Java极客  |  作者  /  铿然一叶
这是Java极客的第 93 篇原创文章

相关阅读:

萌新快速成长之路
JAVA编程思想(一)通过依赖注入增加扩展性
JAVA编程思想(二)如何面向接口编程
JAVA编程思想(三)去掉别扭的if,自注册策略模式优雅满足开闭原则
JAVA编程思想(四)Builder模式经典范式以及和工厂模式如何选?
Java编程思想(五)事件通知模式解耦过程
Java编程思想(六)事件通知模式解耦过程
Java编程思想(七)使用组合和继承的场景
JAVA基础(一)简单、透彻理解内部类和静态内部类
JAVA基础(二)内存优化-使用Java引用做缓存
JAVA基础(三)ClassLoader实现热加载
JAVA基础(四)枚举(enum)和常量定义,工厂类使用对比
JAVA基础(五)函数式接口-复用,解耦之利刃
HikariPool源码(二)设计思想借鉴
【极客源码】JetCache源码(一)开篇
【极客源码】JetCache源码(二)顶层视图
如何编写软件设计文档
Seata源码(一)初始化
Seata源码(二)事务基础对象
Seata源码(三)事务处理类结构和流程

1. 概述

1.1 作用

对某条数据进行更新操作,如果全局事务正在进行,当某个本地事务需要更新该数据时,需要使用@GlobalLock确保其不会对全局事务正在操作的数据进行修改。防止的本地事务对全局事务的数据脏写。如果和select for update组合使用,还可以起到防止脏读的效果。

1.2 场景

image.png

1.在微服务A中,业务方法A和业务方法C都会修改同一个表T。

2.在全局事务执行过程中,如果业务方法A修改了表T,但全局事务还没执行完,此时业务方法C修改了表T,就会影响业务方法A的回退,因为UNDO镜像和实际数据对不上了。

如果业务方法C也使用全局事务,因全局事务要和TC交互那么代价会很大,而GlobalLock仅使用本地事务,不需要和TC交互,且会判断是否和全局事务冲突,会等待全局事务执行完之后才执行。

2. 类结构

image.png

描述
GlobalLock 全局锁注解,控制业务方法是否启用全局锁
GlobalTransactionalInterceptor 全局事务拦截器,控制是走全局事务处理流程,还是全局锁处理流程
GlobalLockTemplate 全局锁执行模版,编排全局锁的大流程
GlobalLockExecutor 全员锁执行器,获取参数,调用业务方法,这个类的作用很弱,可要可不要
GlobalLockConfig 全局锁配置,参数来自GlobalLock注解
GlobalLockConfigHolder 全局锁Holder,缓存GlobalLockConfig到线程变量中
ConnectionProxy DB Connection代理,增加事务和全局锁处理逻辑
DefaultResourceManager 默认资源管理类,用于查询全局事务是否开启
LockRetryPolicy 全局锁重试策略
LockRetryController 全局锁重试控制器,根据重试次数,重试间隔参数进行重试

3. 源码

3.1 入口

GlobalTransactionalInterceptor.java

有GlobalLock注解则调用如下方法:

    Object handleGlobalLock(final MethodInvocation methodInvocation,
        final GlobalLock globalLockAnno) throws Throwable {
        return globalLockTemplate.execute(new GlobalLockExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }

            @Override
            public GlobalLockConfig getGlobalLockConfig() {
                GlobalLockConfig config = new GlobalLockConfig();
                config.setLockRetryInternal(globalLockAnno.lockRetryInternal());
                config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
                return config;
            }
        });
    }

3.2 全局锁控制流

GlobalLockTemplate.java

   public Object execute(GlobalLockExecutor executor) throws Throwable {
        // 判断全局锁是否存在,不存在则绑定全局锁标志
        boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
        if (!alreadyInGlobalLock) {
            RootContext.bindGlobalLockFlag();
        }

        // set my config to config holder so that it can be access in further execution
        // for example, LockRetryController can access it with config holder
        GlobalLockConfig myConfig = executor.getGlobalLockConfig();
        GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);

        try {
            return executor.execute();
        } finally {
            // only unbind when this is the root caller.
            // otherwise, the outer caller would lose global lock flag
            if (!alreadyInGlobalLock) {
                RootContext.unbindGlobalLockFlag();
            }

            // if previous config is not null, we need to set it back
            // so that the outer logic can still use their config
            if (previousConfig != null) {
                GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
            } else {
                GlobalLockConfigHolder.remove();
            }
        }
    }

3.3 全局锁执行者

在GlobalTransactionalInterceptor类中创建匿名GlobalLockExecutor类来执行业务方法。

GlobalLockExecutor这个类非常简单,也没有提供扩展能力,看起了可以不需要此类,直接在globalLockTemplate中调用业务方法则可。

GlobalTransactionalInterceptor.java

        return globalLockTemplate.execute(new GlobalLockExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }

            @Override
            public GlobalLockConfig getGlobalLockConfig() {
                GlobalLockConfig config = new GlobalLockConfig();
                config.setLockRetryInternal(globalLockAnno.lockRetryInternal());
                config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
                return config;
            }
        });

3.4 全局锁的使用

GlobalLockExecutor类调用了GlobalLock注解的业务方法,最终调用ConnectionProxy提交事务或回滚,在ConnectionProxy中使用全局锁。

3.4.1 根据是全局锁还是全局事务走不同分支:

ConnectionProxy.java

    private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

    private void processLocalCommitWithGlobalLocks() throws SQLException {
        checkLock(context.buildLockKeys());
        try {
            targetConnection.commit();
        } catch (Throwable ex) {
            throw new SQLException(ex);
        }
        context.reset();
    }

3.4.2 检查全局锁

检查全局锁,AT模式的的全局事务会参与一起判断,获取不到锁则抛出锁冲突异常:

ConnectionProxy.java

    public void checkLock(String lockKeys) throws SQLException {
        if (StringUtils.isBlank(lockKeys)) {
            return;
        }
        // Just check lock without requiring lock by now.
        try {
            // 检查AT模式的全局事务是否存在
            boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT,
                getDataSourceProxy().getResourceId(), context.getXid(), lockKeys);
            if (!lockable) {
                throw new LockConflictException();
            }
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, lockKeys);
        }
    }

3.4.3 锁重试策略入口

全局锁检查抛出的异常在LockRetryPolicy的execute方法中被捕捉并重试:

ConnectionProxy.java

    public void commit() throws SQLException {
        try {
            // 锁重试策略,全局锁检查抛出的异常在execute方法中被捕捉并重试
            LOCK_RETRY_POLICY.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

3.4.4 锁重试策略

根据配置参数lock.retryPolicyBranchRollbackOnConflict决定是否重试。

LockRetryPolicy.java

       public  T execute(Callable callable) throws Exception {
            // 根据配置参数lock.retryPolicyBranchRollbackOnConflict决定是否重试。
            if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {
                return callable.call();
            } else {
                // 走重试逻辑
                return doRetryOnLockConflict(callable);
            }
        }

        protected  T doRetryOnLockConflict(Callable callable) throws Exception {
            LockRetryController lockRetryController = new LockRetryController();
            while (true) {
                try {
                    return callable.call();
                } catch (LockConflictException lockConflict) {
                    onException(lockConflict);
                    lockRetryController.sleep(lockConflict);
                } catch (Exception e) {
                    onException(e);
                    throw e;
                }
            }
        }

3.4.5 锁重试控制器

根据重试次数和重试间隔决定是否重试:

参数 配置参数
lockRetryInternal lock.retryInterval
lockRetryTimes lock.retryTimes

LockRetryController.java

    public void sleep(Exception e) throws LockWaitTimeoutException {
        if (--lockRetryTimes < 0) {
            throw new LockWaitTimeoutException("Global lock wait timeout", e);
        }

        try {
            Thread.sleep(lockRetryInternal);
        } catch (InterruptedException ignore) {
        }
    }

end.

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论