并发编程:你真的了解FutureTask吗?
FutureTask是什么
public interface Runnable { public abstract void run(); } Student implements Runnable Thread xiaoming = new Thread(new Student()); xiaoming.start();
我们知道Runnable是一个接口,它用来承载的是具体的任务,交给Thread执行。
有这样一个场景:主线程执行一个任务(理解为调用一个方法)需要获取这个任务的返回值,然后再去执行另一个任务获取返回值,最后将两个返回值做其他处理,一般情况下耗时为两个任务的耗时和,如果想提高性能,可以用异步的方式,可以将任务一包装为Runnable,交给子线程执行,主线程执行第二个任务,这样就节省了一部分耗时,但是Runnable不支持返回值,所以行不通,而FutureTask就是解决这个问题的。
FutureTask的类图如下:
看图可以知道两个事:
- 实现了Runnable接口,所以FutureTask也是一个可以承载任务的任务类。
- 实现了Future接口,Future接口提供了任务操作和获取返回值等方法,利用Future接口可以获取任务执行状态,取消任务,获取任务返回值等。
所以说FutureTask就是一个实现了一套获取返回值机制的任务体,请注意它依然是任务体,自身不能执行任务,依然需要借助Thread执行。
FutureTask应用例子
Callable callableTask = () -> { // 一些耗时的计算 return 42; }; FutureTask futureTask = new FutureTask(callableTask); Thread thread = new Thread(futureTask); thread.start(); try { Integer result = futureTask.get(); // 阻塞直到任务完成并返回结果 System.out.println("任务结果:" + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
上面就是使用案例,看起来写法比较复杂,其实一旦理解就不复杂了,首先我们上面说了FutureTask实现了Runnable,所以它是任务体,任务体需要依赖Thread执行,所以这里FutureTask和Runnable的写法一样都是作为Thread的入参。
代码中可以看到还有一个Callable类,这个其实也是任务体,它的源码如下:
public interface Callable { V call() throws Exception; }
它是一个有返回值的任务体,但是它和Runnable不同,Runnable是和Thread绑定的官方任务体,而Callable只是为达到实现获取任务返回值而衍生出来的一个任务体,它主要作为FutureTask类中的一个成员属性,充当真正被执行的任务体。
FutureTask源码分析
先看下FutureTask主要的属性和构造方法:
public class FutureTask implements RunnableFuture { private volatile int state; //等待执行状态 初始状态 private static final int NEW = 0; //执行完毕状态,但是结果还没有填充到outcome private static final int COMPLETING = 1; //返回值填充outcome 正常结束任务状态 private static final int NORMAL = 2; //因异常而结束 private static final int EXCEPTIONAL = 3; //任务还未执行之前就调用了cancel(true)方法 private static final int CANCELLED = 4; //中断的中间状态 private static final int INTERRUPTING = 5; //中断的状态 private static final int INTERRUPTED = 6; private Callable callable; private Object outcome; private volatile Thread runner; private volatile WaitNode waiters; static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } } protected void done() { } public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; }
- state是状态,没到一个阶段都是不同的状态。
- outcome是存放返回值的,当任务执行完返回的值会赋值给outcome
- callable是真正的任务体,run方法在运行的时候实际上是运行callable对象的call方法。
- waiters是线程阻塞等待结果的时候存放的队列,它的类型是WaitNode,WaitNode里面有一个next属性,可见它是一个单向链表结构。
- runner是当前正在执行任务的线程
- done方法在FutureTask里面没有具体实现,它是任务处理完的一个后置方法,是留给子类用的。
- 构造方法一是接收一个Callable对象赋值给成员变量callable。- 构造方法二是接收一个Runnable对象和一个result值,同样是赋值给callable变量。
构造方法一很容易理解,构造方法二则是接收一个Runnable任务体,但是Runnable没有返回值,所以再接收一个result值作为默认的返回值,也就是无论任务体的任务执行的结果是什么,都以result值作为固定的返回值。
public static Callable callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter(task, result); } static final class RunnableAdapter implements Callable { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
构造方法二中调用了Executors.callable()方法,这个方法就是将Runnable和result值封装为RunnableAdapter对象,RunnableAdapter实现了Callable,所以它也是一个Callable对象,所以可以赋值给成员变量callable.
接下来按照应用案例中的步骤跟一下源码,构造方法我们看了,接下来就是启动线程,启动线程就会调用任务体的run方法,我们从run方法开始跟进源码。
run方法源码分析
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
private void finishCompletion() { for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; q = next; } break; } } done(); callable = null; }
将所有的等待线程唤醒后,最后执行done方法,done方法在FutureTask没有被实现,所以也就相当于什么都不做,那么done存在的意义是什么呢?它存在的意义在于在任务处理的完毕后线程结束前做一些事情,是留给子类扩展用的。
再看futureTask.get()的源码分析
当前线程是等待任务结果的线程不同于上面执行任务的线程:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos