一、前情提要
在上一篇文章中,我们使用双异步后,从 191s 优化到 2s,有个小伙伴在评论区问我,如何保证插入后数据的一致性呢?
很简单,通过对比Excel文件行数和入库数量是否相等即可。
那么,如何获取异步线程的返回值呢?
二、通过Future获取异步返回值
我们可以通过给异步方法添加Future返回值的方式获取结果。
FutureTask 除了实现 Future 接口外,还实现了 Runnable 接口。因此,FutureTask 可以交给 Executor 执行,也可以由调用线程直接执行FutureTask.run()。
1、FutureTask 是基于 AbstractQueuedSynchronizer实现的
AbstractQueuedSynchronizer简称AQS,它是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及 维护被阻塞线程的队列。基于 AQS 实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。
基于 AQS实现的同步器包含两种操作:
- acquire,阻塞调用线程,直到AQS的状态允许这个线程继续执行,在FutureTask中,get()就是这个方法;
- release,改变AQS的状态,使state变为非阻塞状态,在FutureTask中,可以通过run()和cancel()实现。
2、FutureTask执行流程
执行@Async异步方法。
建立新线程async-executor-X,执行Runnable的run()方法,(FutureTask实现RunnableFuture,RunnableFuture实现Runnable)。
判断状态state。
- 如果未新建或者不处于AQS,直接返回。
- 否则进入COMPLETING状态,执行异步线程代码。
如果执行cancel()方法改变AQS的状态时,会唤醒AQS等待队列中的第一个线程线程async-executor-1。
线程async-executor-1被唤醒后
- 将自己从AQS队列中移除。
- 然后唤醒next线程async-executor-2。
- 改变线程async-executor-1的state。
- 等待get()线程取值。
next等待线程被唤醒后,循环线程async-executor-1的步骤。
- 被唤醒。
- 从AQS队列中移除。
- 唤醒next线程。
- 改变异步线程状态。
新建线程async-executor-N,监听异步方法的state。
- 如果处于EXCEPTIONAL以上状态,抛出异常。
- 如果处于COMPLETING状态,加入AQS队列等待。
- 如果处于NORMAL状态,返回结果。
3、get()方法执行流程
get()方法通过判断状态state观测异步线程是否已结束,如果结束直接将结果返回,否则会将等待节点扔进等待队列自旋,阻塞住线程。
自旋直至异步线程执行完毕,获取另一边的线程计算出结果或取消后,将等待队列里的所有节点依次唤醒并移除队列。
如果state小于等于COMPLETING,表示任务还在执行中。
- 计算超时时间。
- 如果超时,则从等待队列中移除等待节点WaitNode,返回当前状态state。
- 阻塞队列nanos毫秒。
- 如果已有等待节点WaitNode,将线程置空。
- 返回当前状态。
- 如果线程被中断,从等待队列中移除等待节点WaitNode,抛出中断异常。
- 如果state大于COMPLETING。
- 如果任务正在执行,让出时间片。
- 如果还未构造等待节点,则new一个新的等待节点。
- 如果未入队列,CAS尝试入队。
- 如果有超时时间参数。
- 否则阻塞队列。
如果state大于COMPLETING。
- 如果执行完毕,返回结果。
- 如果大于等于取消状态,则抛出异常。
很多小朋友对读源码,嗤之以鼻,工作3年、5年,还是没认真读过任何源码,觉得读了也没啥用,或者读了也看不懂~
其实,只要把源码的执行流程通过画图的形式呈现出来,你就会幡然醒悟,原来是这样的~
简而言之:
- 如果异步线程还没执行完,则进入CAS自旋。
- 其它线程获取结果或取消后,重新唤醒CAS队列中等待的线程。
- 再通过get()判断状态state;。
- 直至返回结果或(取消、超时、异常)为止。
三、FutureTask源码具体分析
1、FutureTask源码
通过定义整形状态值,判断state大小,这个思想很有意思,值得学习。
public interface RunnableFuture extends Runnable, Future {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
public class FutureTask implements RunnableFuture {
// 最初始的状态是new 新建状态
private volatile int state;
private static final int NEW = 0; // 新建状态
private static final int COMPLETING = 1; // 完成中
private static final int NORMAL = 2; // 正常执行完
private static final int EXCEPTIONAL = 3; // 异常
private static final int CANCELLED = 4; // 取消
private static final int INTERRUPTING = 5; // 正在中断
private static final int INTERRUPTED = 6; // 已中断
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 任务还在执行中
if (s COMPLETING) {
// 如果已有等待节点WaitNode,将线程置空
if (q != null)
q.thread = null;
return s;
}
// 任务正在执行,让出时间片
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 还未构造等待节点,则new一个新的等待节点
else if (q == null)
q = new WaitNode();
// 未入队列,CAS尝试入队
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果有超时时间参数
else if (timed) {
// 计算超时时间
nanos = deadline - System.nanoTime();
// 如果超时,则从等待队列中移除等待节点WaitNode,返回当前状态state
if (nanos = CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
}
2、将异步方法的返回值改为Future,将返回值放到new AsyncResult();中
@Async("async-executor")
public void readXls(String filePath, String filename) {
try {
// 此代码为简化关键性代码
List futureList = new ArrayList();
for (int time = 0; time < times; time++) {
Future sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();
futureList.add(sumFuture);
}
}catch (Exception e){
logger.error("readXlsCacheAsync---插入数据异常:",e);
}
}
@Async("async-executor")
public Future readXlsCacheAsync() {
try {
// 此代码为简化关键性代码
return new AsyncResult(sum);
}catch (Exception e){
return new AsyncResult(0);
}
}
3、通过Future.get()获取返回值:
public static boolean getFutureResult(List futureList, int excelRow){
int[] futureSumArr = new int[futureList.size()];
for (int i = 0;i