详解Spring多线程下如何保证事务的一致性

2023年 10月 8日 67.6k 0

环境:Spring5.3.23

1. 事务原理

首先,我们先来大概的了解下Spring事务的工作原理,核心技术是通过AOP实现,将获取的Connection对象绑定到当前线程上下文中(ThreadLocal)。

事务核心拦截器TransactionInterceptor对象,如下(以下只会列出核心代码):

public class TransactionInterceptor {
  public Object invoke(MethodInvocation invocation) {
    // 该方法调用为核心方法,该方法在父类中
    return invokeWithinTransaction(...) ;
  }
}

父类TransactionAspectSupport

public abstract class TransactionAspectSupport {
  protected Object invokeWithinTransaction(...) {
    // 1.1.创建事务对象
    TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
    try {
        // 调用下一个拦截器或者是目标方法
      retVal = invocation.proceedWithInvocation();
    }
    catch (Throwable ex) {
      // 1.2.回滚事务
      completeTransactionAfterThrowing(txInfo, ex);
      throw ex;
    } finally {
      // 重置ThreadLocal中的TransactionInfo对象
      cleanupTransactionInfo(txInfo);
    }
    // 1.3.提交或者回滚事务
    commitTransactionAfterReturning(txInfo);
    return retVal;
  }  
}

上面代码列出了主要的事务执行流程及动作,我们主要是关心数据库连接对象Connection在当前线程中是如何使用的。

创建事务对象

protected TransactionInfo createTransactionIfNecessary(
    @Nullable PlatformTransactionManager tm,
    @Nullable TransactionAttribute txAttr, 
    final String joinpointIdentification) {
  TransactionStatus status = null;
  if (txAttr != null) {
    if (tm != null) {
      // 创建事务状态对象
      status = tm.getTransaction(txAttr);
    }
  }
  // 将事务状态对象包装到TransactionInfo中,然后将这个对象绑定到当前线程中
  return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

创建事务状态对象

public abstract class AbstractPlatformTransactionManager {
  public final TransactionStatus getTransaction(...) {
    if (isExistingTransaction(transaction)) {
      // Existing transaction found -> check propagation behavior to find out how to behave.
      return handleExistingTransaction(def, transaction, debugEnabled);
    }


    // 如果超时时间 < -1则抛出异常
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
      throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }


    // 当前不存在事务,则抛出异常
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
      throw new IllegalTransactionStateException(
          "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // 其它的传播特性,开启事务功能
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      try {
        // 开始事务
        return startTransaction(def, transaction, debugEnabled, suspendedResources);
      }
    }
  }
}

开始事务

private TransactionStatus startTransaction(
    TransactionDefinition definition, 
    Object transaction,
    boolean debugEnabled, 
    @Nullable SuspendedResourcesHolder suspendedResources) {


  boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  DefaultTransactionStatus status = newTransactionStatus(
      definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  //     
  doBegin(transaction, definition);
  prepareSynchronization(status, definition);
  return status;
}

创建Connection对象,并绑定到当前线程

public class DataSourceTransactionManager {
  protected void doBegin(
      Object transaction, 
      TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;
    try {
      if (!txObject.hasConnectionHolder() ||
          txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        // 获取数据库连接对象  
        Connection newCon = obtainDataSource().getConnection();
        txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
      }
      // 将连接对象绑定到当前的线程
      if (txObject.isNewConnectionHolder()) {
        TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
      }
    }
  }
}

到此,已经清楚了当开始一个新的事务时,Spring会将获取的Connection绑定到当前的Thread中。

当我们使用通过JdbcTemplate操作数据库时,如下:

public class JdbcTemplate {
  // 核心执行方法
  private  T execute(...) {
    // 获取数据库连接对象
    Connection con = DataSourceUtils.getConnection(obtainDataSource());
  }
}

DataSourceUtils

public abstract class DataSourceUtils {
  public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
    try {
      return doGetConnection(dataSource) ;
    }
  }
  public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    // 通过TransactionSynchronizationManager从当前线程上下文中获取连接对象
    // 在上面我们也是通过这个对象将连接对象绑定到当前的Thread中
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
      conHolder.requested() ;
      if (!conHolder.hasConnection()) {
        conHolder.setConnection(fetchConnection(dataSource)) ;
      }
      return conHolder.getConnection() ;
    }
  }
}

原理相信你应该非常清楚了,每个线程都会绑定自己的Connection。那在多线程下每个线程都使用的是自己的Connection对象,所以要想保证事务的一致性,单靠传统的方式一个@Transaction是肯定无法解决的,接下来我们就来实现一个多线程下的事务一致性的处理。

2.多线程事务

多线程下要实现事务的一致性,我们需要借助JUC下的相关类来实现。

这里直接给出代码示例:

static class PersonService {
  @Resource
  private JdbcTemplate jdbcTemplate;
  @Resource
  private DataSource dataSource ; 


  @Transactional
  public void save() throws Exception {
    CountDownLatch cdl = new CountDownLatch(2) ;
    AtomicBoolean txRollback = new AtomicBoolean(false) ;
    CompletableFuture.runAsync(() -> {
      Person person = new Person();
      person.setAge(1);
      person.setName("张三");
      transactionTemplate.execute(status -> {
        int result = 0 ;
        try {
          result = jdbcTemplate.update("insert into t_person (age, name) values (?, ?)", person.getAge(), person.getName()) ;
          // TODO
          // System.out.println(1 / 0) ;
        } catch (Exception e) {
            // 当发生异常后将状态该为true
          txRollback.set(true) ;
        }
        try {
            // 计数减一
          cdl.countDown() ;
          // 继续等待其它线程结束
          cdl.await() ;
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
          // 如果回滚状态为true说明有线程发生了异常,需要事务回滚
        if (txRollback.get()) {
          // 标记当前事务回滚
          status.setRollbackOnly() ;
        }
        System.out.printf("%s Insert Operator Result: %d 次%n", Thread.currentThread().getName(), result);
        return result ;
      }) ;
      }) ;
    transactionTemplate.execute(status -> {
      Person person = new Person();
      person.setAge(2);
      person.setName("李四");
      int result = 0 ;
      try {
        result = jdbcTemplate.update("insert into t_person (age, name) values (?, ?)", person.getAge(), person.getName()) ;
        // TODO
        TimeUnit.SECONDS.sleep(3) ;
      } catch (Exception e) {
        txRollback.set(true) ; 
      }
      try {
        cdl.countDown() ;
        cdl.await() ;
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      if (txRollback.get()) {
        // 回滚
        status.setRollbackOnly() ;
      }
      System.out.printf("%s Insert Operator Result: %d 次%n", Thread.currentThread().getName(), result);
      return result ;
    }) ;
    cdl.await() ;
    System.err.println("Operator Complete...") ;
  }
}

以上就是借助JUC来实现多线程下的事务一致性问题。

其实如果你真的理解了事务的原理,其实这里还有更加简单的实现方式,大家可以先思考,咱们下期再说这种简单的实现方法。

完毕!!!

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论