Java极客 | 作者 / 铿然一叶
这是Java极客的第 93 篇原创文章
相关阅读:
萌新快速成长之路
JAVA编程思想(一)通过依赖注入增加扩展性
JAVA编程思想(二)如何面向接口编程
JAVA编程思想(三)去掉别扭的if,自注册策略模式优雅满足开闭原则
JAVA编程思想(四)Builder模式经典范式以及和工厂模式如何选?
Java编程思想(五)事件通知模式解耦过程
Java编程思想(六)事件通知模式解耦过程
Java编程思想(七)使用组合和继承的场景
JAVA基础(一)简单、透彻理解内部类和静态内部类
JAVA基础(二)内存优化-使用Java引用做缓存
JAVA基础(三)ClassLoader实现热加载
JAVA基础(四)枚举(enum)和常量定义,工厂类使用对比
JAVA基础(五)函数式接口-复用,解耦之利刃
HikariPool源码(二)设计思想借鉴
【极客源码】JetCache源码(一)开篇
【极客源码】JetCache源码(二)顶层视图
如何编写软件设计文档
Seata源码(一)初始化
Seata源码(二)事务基础对象
Seata源码(三)事务处理类结构和流程
1. 概述
1.1 作用
对某条数据进行更新操作,如果全局事务正在进行,当某个本地事务需要更新该数据时,需要使用@GlobalLock确保其不会对全局事务正在操作的数据进行修改。防止的本地事务对全局事务的数据脏写。如果和select for update组合使用,还可以起到防止脏读的效果。
1.2 场景
1.在微服务A中,业务方法A和业务方法C都会修改同一个表T。
2.在全局事务执行过程中,如果业务方法A修改了表T,但全局事务还没执行完,此时业务方法C修改了表T,就会影响业务方法A的回退,因为UNDO镜像和实际数据对不上了。
如果业务方法C也使用全局事务,因全局事务要和TC交互那么代价会很大,而GlobalLock仅使用本地事务,不需要和TC交互,且会判断是否和全局事务冲突,会等待全局事务执行完之后才执行。
2. 类结构
类 | 描述 |
---|---|
GlobalLock | 全局锁注解,控制业务方法是否启用全局锁 |
GlobalTransactionalInterceptor | 全局事务拦截器,控制是走全局事务处理流程,还是全局锁处理流程 |
GlobalLockTemplate | 全局锁执行模版,编排全局锁的大流程 |
GlobalLockExecutor | 全员锁执行器,获取参数,调用业务方法,这个类的作用很弱,可要可不要 |
GlobalLockConfig | 全局锁配置,参数来自GlobalLock注解 |
GlobalLockConfigHolder | 全局锁Holder,缓存GlobalLockConfig到线程变量中 |
ConnectionProxy | DB Connection代理,增加事务和全局锁处理逻辑 |
DefaultResourceManager | 默认资源管理类,用于查询全局事务是否开启 |
LockRetryPolicy | 全局锁重试策略 |
LockRetryController | 全局锁重试控制器,根据重试次数,重试间隔参数进行重试 |
3. 源码
3.1 入口
GlobalTransactionalInterceptor.java
有GlobalLock注解则调用如下方法:
Object handleGlobalLock(final MethodInvocation methodInvocation,
final GlobalLock globalLockAnno) throws Throwable {
return globalLockTemplate.execute(new GlobalLockExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
@Override
public GlobalLockConfig getGlobalLockConfig() {
GlobalLockConfig config = new GlobalLockConfig();
config.setLockRetryInternal(globalLockAnno.lockRetryInternal());
config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
return config;
}
});
}
3.2 全局锁控制流
GlobalLockTemplate.java
public Object execute(GlobalLockExecutor executor) throws Throwable {
// 判断全局锁是否存在,不存在则绑定全局锁标志
boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
if (!alreadyInGlobalLock) {
RootContext.bindGlobalLockFlag();
}
// set my config to config holder so that it can be access in further execution
// for example, LockRetryController can access it with config holder
GlobalLockConfig myConfig = executor.getGlobalLockConfig();
GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
try {
return executor.execute();
} finally {
// only unbind when this is the root caller.
// otherwise, the outer caller would lose global lock flag
if (!alreadyInGlobalLock) {
RootContext.unbindGlobalLockFlag();
}
// if previous config is not null, we need to set it back
// so that the outer logic can still use their config
if (previousConfig != null) {
GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
} else {
GlobalLockConfigHolder.remove();
}
}
}
3.3 全局锁执行者
在GlobalTransactionalInterceptor类中创建匿名GlobalLockExecutor类来执行业务方法。
GlobalLockExecutor这个类非常简单,也没有提供扩展能力,看起了可以不需要此类,直接在globalLockTemplate中调用业务方法则可。
GlobalTransactionalInterceptor.java
return globalLockTemplate.execute(new GlobalLockExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
@Override
public GlobalLockConfig getGlobalLockConfig() {
GlobalLockConfig config = new GlobalLockConfig();
config.setLockRetryInternal(globalLockAnno.lockRetryInternal());
config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
return config;
}
});
3.4 全局锁的使用
GlobalLockExecutor类调用了GlobalLock注解的业务方法,最终调用ConnectionProxy提交事务或回滚,在ConnectionProxy中使用全局锁。
3.4.1 根据是全局锁还是全局事务走不同分支:
ConnectionProxy.java
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
private void processLocalCommitWithGlobalLocks() throws SQLException {
checkLock(context.buildLockKeys());
try {
targetConnection.commit();
} catch (Throwable ex) {
throw new SQLException(ex);
}
context.reset();
}
3.4.2 检查全局锁
检查全局锁,AT模式的的全局事务会参与一起判断,获取不到锁则抛出锁冲突异常:
ConnectionProxy.java
public void checkLock(String lockKeys) throws SQLException {
if (StringUtils.isBlank(lockKeys)) {
return;
}
// Just check lock without requiring lock by now.
try {
// 检查AT模式的全局事务是否存在
boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT,
getDataSourceProxy().getResourceId(), context.getXid(), lockKeys);
if (!lockable) {
throw new LockConflictException();
}
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, lockKeys);
}
}
3.4.3 锁重试策略入口
全局锁检查抛出的异常在LockRetryPolicy的execute方法中被捕捉并重试:
ConnectionProxy.java
public void commit() throws SQLException {
try {
// 锁重试策略,全局锁检查抛出的异常在execute方法中被捕捉并重试
LOCK_RETRY_POLICY.execute(() -> {
doCommit();
return null;
});
} catch (SQLException e) {
if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
rollback();
}
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}
3.4.4 锁重试策略
根据配置参数lock.retryPolicyBranchRollbackOnConflict决定是否重试。
LockRetryPolicy.java
public T execute(Callable callable) throws Exception {
// 根据配置参数lock.retryPolicyBranchRollbackOnConflict决定是否重试。
if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {
return callable.call();
} else {
// 走重试逻辑
return doRetryOnLockConflict(callable);
}
}
protected T doRetryOnLockConflict(Callable callable) throws Exception {
LockRetryController lockRetryController = new LockRetryController();
while (true) {
try {
return callable.call();
} catch (LockConflictException lockConflict) {
onException(lockConflict);
lockRetryController.sleep(lockConflict);
} catch (Exception e) {
onException(e);
throw e;
}
}
}
3.4.5 锁重试控制器
根据重试次数和重试间隔决定是否重试:
参数 | 配置参数 |
---|---|
lockRetryInternal | lock.retryInterval |
lockRetryTimes | lock.retryTimes |
LockRetryController.java
public void sleep(Exception e) throws LockWaitTimeoutException {
if (--lockRetryTimes < 0) {
throw new LockWaitTimeoutException("Global lock wait timeout", e);
}
try {
Thread.sleep(lockRetryInternal);
} catch (InterruptedException ignore) {
}
}
end.