【深度挖掘Java并发编程底层源码「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理

2023年 8月 16日 25.7k 0

FutureTask的基本介绍

FutureTask是Java中的一个类,它实现了Future接口和Runnable接口,并且被用作线程执行的任务。FutureTask可以在多线程环境下异步执行一个任务并获取其结果。

FutureTask的特点用法

  • 异步执行:通过将耗时的任务交给FutureTask,在一个单独的线程中执行,当前线程可以继续执行其他任务,不会被阻塞,从而提高程序的并发性。

  • 获取结果:FutureTask提供了get()方法,用于在任务完成后获取其执行结果。如果任务尚未完成,get()方法将阻塞当前线程,直到任务完成并返回结果。

  • 取消任务:可以使用cancel()方法取消任务的执行。如果任务尚未开始执行,则会取消任务的执行;如果任务已经开始执行,则根据传入的参数决定是否中断正在执行的任务。

  • 线程安全:FutureTask是线程安全的,可以在多个线程中共享使用。多个线程可以同时调用FutureTask的get()方法等待任务结果。

  • FutureTask常见的应用场景是在并发编程中,将计算密集型的任务交给FutureTask后台执行,并在需要时获取结果。这样可以充分利用系统资源,提高程序的性能和响应能力。

    FutureTask的执行流程

    在这里插入图片描述

    任务提交

    在之前的线程池分析中,我们介绍了AbstractExecutorService的实现。AbstractExecutorService是Java中的一个抽象类,它实现了ExecutorService接口,并提供了一些通用的逻辑和方法来简化线程池的实现。

    AbstractExecutorService的主要特点和用法

  • 线程池管理:AbstractExecutorService提供了默认的线程池管理逻辑。我们可以通过继承AbstractExecutorService并实现其中的抽象方法来定制自己的线程池。

  • 任务执行:通过调用submit()或invokeAll()等方法,可以将任务提交给AbstractExecutorService来执行。它会在适当的时候自动创建线程并执行任务。

  • 异常处理:AbstractExecutorService提供了一些默认的异常处理机制,比如可以捕获任务执行过程中抛出的异常,并进行相应的处理操作。我们也可以在继承AbstractExecutorService时自定义异常处理逻辑。

  • 生命周期管理:AbstractExecutorService定义了线程池的生命周期方法,如shutdown()和isShutdown()等,用于管理线程池的启动和关闭。

  • 对象模型转换传递流程

    在这里插入图片描述

    submit方法提交任务

    AbstractExecutorService主要用于实现ExecutorService接口中的submit()和invokeAll()等方法。它提供了一些默认的实现,使得我们可以更方便地创建和管理线程池,并执行任务。

    public Future submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    
    public  Future submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    
    public  Future submit(Callable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    

    RunnableFuture任务模型

    通过使用RunnableFuture,我们可以更加灵活地管理线程池,处理任务的提交和执行,并在需要时进行异常处理和线程池的关闭。最终进行构建一个FutureTask对象。

    protected  RunnableFuture newTaskFor(Runnable runnable, T value) {
        return new FutureTask(runnable, value);
    }
    
    protected  RunnableFuture newTaskFor(Callable callable) {
        return new FutureTask(callable);
    }
    

    FutureTask

    对于通过submit方法提交的任务,无论是Runnable还是Callable接口实现,都会被统一封装为FutureTask对象,并传递给execute方法。

    public FutureTask(Callable callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // 确保 callable 的可见性
    }
    
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;      // 确保可调用的可见性
    }
    

    注意,AbstractExecutorService是一个抽象类,不能直接实例化。我们可以继承AbstractExecutorService类并实现其中的抽象方法,或者使用Java提供的具体实现类如ThreadPoolExecutor来创建并使用线程池。

    RunnableAdapter适配器模型

    RunnableAdapter的作用是将一个Runnable对象包装成一个Callable对象。在适配器的call()方法中,会调用任务的run()方法来执行任务,并返回预先指定的结果。

     public static  Callable callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter(task, result);
     }
    

    通过使用RunnableAdapter,我们可以将Runnable任务在提交给ExecutorService时,以Callable的方式进行处理和执行。

    static final class RunnableAdapter implements Callable {
        final Runnable task;
        final T result;
        
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        
        public T call() {
            task.run();
            return result;
        }
    }
    

    该适配器的设计使得我们可以更加方便地将Runnable任务与Callable任务一同处理,充分利用ExecutorService的统一接口,并且无需对Runnable任务进行额外的修改。

    任务状态

    FutureTask的状态定义,说明了FutureTask对象可能处于的不同状态。初始状态为NEW,后续可能会演化为COMPLETING、NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTING、INTERRUPTED等状态。

    private static final int NEW = 0;
    private static final int COMPLETING = 1;
    private static final int NORMAL = 2;
    private static final int EXCEPTIONAL = 3;
    private static final int CANCELLED = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED = 6;
    

    在这里插入图片描述
    这些状态描述了FutureTask在执行过程中的不同情况,包括正常完成、遇到异常、被取消、被中断等。了解FutureTask的状态有助于我们理解和处理FutureTask对象在多线程环境下的行为。

    任务执行

    首先,方法会检查FutureTask的状态是否为NEW,并尝试使用compareAndSwapObject方法将执行线程设置为当前线程。然后,方法将执行并处理相关任务,并根据任务是否成功完成设置相应的结果。最后,清除执行线程(runner)并重新获取FutureTask的状态,根据状态处理可能的取消操作。

    public void run() {
        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) {
            return;
        }
        try {
            Callable c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran) {
                    set(result);
                }
            }
        } finally {
            // 让runner非空直到state被稳定设置,以防止并发调用run()
            runner = null;
            // 在将runner设置为空之后必须重新读取state以防止泄漏中断
            int s = state;
            if (s >= INTERRUPTING) {
                handlePossibleCancellationInterrupt(s);
            }
        }
    }
    

    在这里插入图片描述

    设置线程任务数据结果

    对于Callable对象,FutureTask会直接执行它的call方法。如果call方法执行成功,FutureTask会调用set方法来设置执行结果;如果遇到异常,FutureTask会调用setException方法来设置异常。

    protected void set(V v) {
        // 首先使用CAS(比较并交换)将状态设置为中间状态COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            // 将状态设置为正常状态
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终状态
            finishCompletion();
        }
    }
    
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            // 将状态设置为异常状态
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最终状态
            finishCompletion();
        }
    }
    

    下面是对于Callable直接执行其call方法的处理过程。
    在这里插入图片描述

  • 通过CAS操作将FutureTask的状态设置为中间状态COMPLETING。
  • 根据执行的结果(成功或异常),将结果或异常存储在outcome变量中,并将状态设置为相应的最终状态(正常状态或异常状态)。
  • 完成执行并处理相关的完成操作。
  • 获取线程任务数据结果

    下面的这两个方法都是用来对全局变量outcome进行赋值的。而当我们通过get方法在另一个线程中获取结果时,将会执行以下过程:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s  COMPLETING) {
                // WaitNode已经创建,当前可以设置为null了
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // 不能超时
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                    q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos = CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable) x);
    }
    

    如果状态s等于NORMAL,表示任务正常完成,将会返回实际的结果。如果状态s大于等于CANCELLED,则抛出CancellationException异常。否则,抛出ExecutionException异常。通过这样的设计,无论是任务正常完成还是抛出异常,都能够在线程池中执行的任务中被感知到。

    任务取消

    通过调用cancel方法可以取消FutureTask的执行,并在任务完成后通过finishCompletion方法来唤醒等待的线程。被唤醒的线程在awaitDone方法中继续循环,检查任务的状态以获取结果。

    public boolean cancel(boolean mayInterruptIfRunning) {
        // 如果任务不是刚创建或者是刚创建但是更改为指定状态失败则返回 false
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
    
  • FutureTask类中有一个cancel方法,接受一个布尔类型参数mayInterruptIfRunning,用于表示在执行中的任务是否可以中断。
  • cancel方法首先检查任务的状态,如果状态不是NEW或者尝试将状态更改为INTERRUPTING或CANCELLED失败,则返回false。
  • 如果允许中断正在运行的任务(mayInterruptIfRunning为true),则调用Thread的interrupt方法来中断任务的线程,并将任务的状态设置为INTERRUPTED。
  • 然后,调用finishCompletion方法来完成任务的完成操作。
  • 在finishCompletion方法中,首先尝试将waiters设置为null,以便其他线程无法再加入等待队列。然后循环遍历等待队列,唤醒等待的线程,并将等待队列中节点的thread属性设置为null。
  • 在唤醒线程后,被唤醒的线程会在awaitDone方法中继续循环,检查任务的状态。
  • 如果任务的状态大于COMPLETING,则表示任务已完成或抛出异常,直接返回状态。
  • 总结说明

    • FutureTask是一个用于异步执行任务并获取结果的Java类。它提供了获取结果、取消任务等功能,帮助我们更好地处理多线程编程中的任务执行和结果获取。

    • AbstractExecutorService是Java中用于简化线程池实现的一个抽象类。通过继承并实现其中的方法,我们可以更方便地管理线程池的创建、任务的提交和执行。它提供了默认的异常处理和生命周期管理,并可以根据需要进行定制和扩展。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论