聊聊我对CompletableFuture的理解

2023年 10月 8日 41.0k 0

Java提供了许多工具来处理并发编程,而本文将重点介绍Java8中的CompletableFuture。在本文中,笔者通过查阅资料和实践经验,避免了重复已有优秀文章的内容和思路,而是用更简单明了的示例和语言来介绍CompletableFuture,并提供自己的思考。最后,本文还会附上其他一些优秀文章的链接,供读者更深入学习和理解。

1 .理解 Future

在处理一个任务时,通常会经历以下几个阶段:

  • 提交任务:将任务交给生产者线程来构造并提交。
  • 执行任务:由消费者线程来执行任务。
  • 任务完成的后置处理:由后置消费者线程来处理任务完成后的操作。
  • 根据任务的特性,会有各种不同的线程模型,其中包括 Future 模式。接下来我们先通过一个简单的例子快速理解 Future 模式。

    例1.1

         ExecutorService executor = Executors.newFixedThreadPool(3);
         Future future = executor.submit(new Callable() {
        
             @Override
             public String call() throws Exception {
                 //do some thing
                 Thread.sleep(100);
                 return "i am ok";
             }
         });
         println(future.isDone());
         println(future.get());
    

    在本例中,首先创建了一个线程池,并向线程池中提交了一个任务。使用submit方法提交任务后,会立即返回,并不会等到任务实际处理完成才返回。而任务提交后返回的值便是Future,我们可以通过future调用get()方法来阻塞式地获取返回结果,也可以使用isDone方法来判断任务是否完成。

    生产者线程在提交完任务后,有两个选择,可以关注或者不关注处理结果。处理结果包括任务的返回值以及任务是否正确完成和中途是否抛出异常等等。Future模式提供了一种机制,在消费者异步处理生产者提交的任务的同时,生产者线程也能够获取到消费者线程的处理结果,并且通过future也可以取消正在处理中的任务。在实际的开发中,我们经常会遇到类似的需求,即任务需要异步处理,同时还关心任务的处理结果。使用future是再合适不过的选择。

    1.2 future 如何被构建的

    future 是如何被创建的呢?当生产者线程将任务提交给消费者线程池时,线程池会创建一个实现了Future接口的FutureTask对象。这个对象充当着消费者和生产者之间的桥梁。消费者通过FutureTask对象来存储任务的处理结果,并更新任务的状态,包括未开始、正在处理和已完成等状态。而生产者拿到的FutureTask对象可以转型为Future接口,从而可以以阻塞或非阻塞的方式获取任务的处理结果和状态。如果想更详细地了解具体的实现机制,读者可以参考JDK中的FutureTask类。

    1.3 java之外的一些思考

    我一直将Future视为消费者线程和生产者线程之间的通信桥梁。在Java中,我们可以通过共享对象来实现线程间的通信,并提供各种工具来确保共享对象的线程安全性。Future是一个通过共享内存实现通信的典型例子。熟悉Go语言的读者可能会想到,协程之间的通信方式是通过通道。正如Go语言所倡导的,不要通过共享内存来通信,而应该通过通信来共享内存,Go语言通过通道实现了这一点,从而实现了数据共享。

    通过对Future的示例,我们了解到Future在任务生产者和消费者之间起到了桥梁作用。但是本文要解决的问题是任务处理三个阶段中的最后一个阶段,即任务完成后的后置处理。而本文介绍的重点是CompletableFuture提供的编程模型,它可以让我们优雅地处理后置结果。

    2. 任务结果的花式处理

    有两种不需要生产者关注的任务处理结果。第一种是处理结果对后续业务逻辑没有影响。另一种是任务在结束前会将自身的处理结果上报到其他结构中,如mq、db、redis等,这些结果会被其他协调者或调度者跟踪和监控,不再需要生产者关心。这种编程模型实现了解耦,但也增加了系统的复杂度,需要额外监控和管控任务状态和结果。在复杂场景和高吞吐量的分布式系统中广泛应用。

    然而,另一种更简单的系统设计要求生产者需要关注处理结果,并根据结果执行后续任务处理。为此,CompletableFuture应运而生。

    2.1 Future 的改造

    在第一节中,我们简单介绍了Future。然而,我们给出的例子都使用了阻塞式的get方法来获取结果。在实际开发中,我们并不希望生产者线程被阻塞,但我们又希望可以通过Future来处理结果。那么如何实现呢?在1.2中,我们讨论了Future是如何构造的,我们曾说过在消费者线程执行Task后,会将处理结果set到Future中。那么为什么我们不利用set方法,为我们提供一种后置处理的机制呢?我们的思路是在调用set方法后执行一系列后置处理,这些后置处理是生产者在提交Task时指定的。虽然执行后置处理的线程并不是生产者线程,但实际上处理逻辑是由生产者指定的。CompletableFuture基于这种机制为我们提供了很多后置处理的执行方式,同时还提供了很多整合多个Future的方法。使用CompletableFuture,我们可以灵活处理多个Task协同处理的问题。

    2.2 复杂任务的示例说明

    在某些业务场景中,执行任务并不仅仅是简单地执行一条SQL语句。有些长期任务需要拆分成多个小任务,其中一些小任务可以并行处理,而其他小任务则需要按照一定的顺序进行依赖。让我们假设有以下一个长期任务:

    任务A包括以下子任务:

  • 可并行处理的子任务1.1、子任务1.2、子任务1.3
  • 根据子任务1.1、1.2和1.3的结果,执行子任务2
  • 根据子任务2的结果,异步执行子任务3.1
  • 任务A是一个相对复杂的任务,需要将其拆分成多个子任务。同时,这些子任务中可能还会包含其他子任务。那么,我们应该如何确保这些任务之间的依赖关系,并且保证它们可以异步处理呢?

      例2.2      
              future1.thenCombine(future2, (args1, args2) -> {      ### Task 1
                  println(args1);
                  println(args2);
                  return "3";
              }).thenApply((res) -> {                               ### Task 2
                  println(res);
                  return "4";
              }).thenApplyAsync((res) -> {                          ### Task 3
                  println(res);
                  return "5";
              });
    

    在例2.2中,当future1和future2都完成时,执行了Combine动作,combine会生成新的Future。新的future完成后将执行thenApply,对合并产生的结果再次处理,最后再次对结果处理,而此次处理则是异步执行,即后置处理的线程和任务的消费者线程不是同一个线程。

    例2.2只是一个使用CompletableFuture的简单使用,CompletableFuture为我们提供了非常多的方法,笔者将其所有方法按照功能分类如下:

    • 对一个或多个Future合并操作,生成一个新的Future,例如allOf,anyOf,runAsync,supplyAsync。

    • 为Future添加后置处理动作,例如thenAccept,thenApply,thenRun。

    • 当两个或全部Future完成时,执行后置动作,例如applyToEither,acceptEither,thenAcceptBothAsync,runAfterBoth,runAfterEither等。

    • 当Future完成条件满足时,异步或同步执行后置处理动作,例如thenApplyAsync,thenRunAsync。所有异步后置处理都会添加Async后缀。

    • 定义Future的处理顺序,例如thenCompose协同存在依赖关系的Future,thenCombine。合并多个Future的处理结果返回新的处理结果。

    • 异常处理exceptionally,如果任务处理过程中抛出了异常。

      我们需要了解 CompletableFuture 能够提供一些方法,以组合新的 Future。这些方法可以按照条件依赖的顺序执行,也可以并行执行。此外,还有其他方法可以指定 Future 的完成条件,并在完成后执行一些后置处理操作。这些后置处理操作可以是 apply、accept、run 或者其他带有返回值的操作,用于生成新的处理结果。其中,accept 用于对处理结果进行消费,但不会产生新的处理结果;run 则更加简单,既没有上一次处理结果的输入,也没有返回处理结果。

    通过这三种类型的后置处理,我们可以构建一个链式处理的后置处理流。后置处理可以在消费者线程之外执行,也可以在一个独立的线程池中执行。这样一来,生产者、消费者和后置处理这三个阶段的操作都可以异步执行。

    CompletableFuture实现了Future和CompletableFuture接口。实现Future接口代表它可以充当"桥梁",既可以是生产者也可以是消费者。而CompletableFuture接口定义了各种组合条件、完成条件和后置处理类型等多种API。可以说,Future接口只描述了单个任务的处理方式,而CompletableFuture接口进一步从实际编程需求出发,满足了多个任务协同处理的场景需求,包括多个任务中任一完成和全部完成的情况。任务可以串行顺序执行,也可以并行执行。此外,CompletableFuture还创造性地提出了apply、accept、run三种后置处理器的类型,本质上后置处理仍然是链式顺序执行的。在众多子任务的场景需求中,CompletableFuture可以很好地胜任。

    由于CompletableFuture的API很多,笔者根据自己的理解进行了分类。如果读者想要更深入地理解,仍然需要动手实践。文末还提供了一份非常不错的相关API使用教程。

    具体 API 的使用还需要读者慢慢的摸索 www.importnew.com/28319.html

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论