并发编程 CompletableFuture

2023年 9月 12日 75.6k 0

Future 接口

Future 接口 (FutureTask 实现类)定义了一个异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务是否完成等。

比如:主线程让一个子线程去执行任务,子线程可能比较耗时,启动了子线程开始执行任务后,
主线程就去做其他的事情去了,忙其他的事情或者先执行完,过可以回才会去获取子任务的执行结果或者变更任务状态。

Future 是 Java5 新加的一个接口,它提供了一种异步并行计算的功能。
如果住县城需要执行一个耗时的计算任务,我们就可以通过 futrue 把这个任务放异步线程中去执行。
如果住县城继续处理其他的任务或者先行结束,再通过 future 获取计算结果。

三种线程创建方式
Runnable 接口
Callable 接口
Future 接口和 FutureTask 接口

FutureTask 案例:

public class FutureTaskTest {

    public static void main(String[] args) {
        long start = System.currentTimeMillis();

        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i  {
                Thread.sleep(1000);
                System.out.println("task " + taskId + " over !");
                return null;
            });
            threadPool.submit(futureTask);
        }
        threadPool.shutdown();
        // 如果没有执行完就一直循环
        while (!threadPool.isTerminated()) {
        }
        System.out.println("cost:" + (System.currentTimeMillis() - start));
    }
}

输出信息:
image.png

get() 阻塞方法

一旦调用 get 方法,如果计算没有完成容易造成程序阻塞。
同步获取返回值

futureTask.get();

设置返回的超时时间


futureTask.get(5, TimeUnit.SECONDS);

isDone 轮询

轮询的方式会耗费更多的 cpu 资源,而且也不见得能及时得到计算结果。
如果想要一步获取结果,通常都会以轮询的方式去获取结果,尽量不要阻塞。

FutureTask futureTask = new FutureTask(() -> {
    Thread.sleep(5000);
    System.out.println("task 1 over !");
    return null;
});
Thread t = new Thread(futureTask, "t1");
t.start();
while (true) {
    System.out.println("task running ....");
    if (futureTask.isDone()) {
        System.out.println("task over ....");
        break;
    }
}

结论

future 对于获取结果不是很友好,只能通过阻塞或值轮询的方式来得到任务的结果。

CompletableFuture

适用场景

对于简单的业务场景使用 Future 完成 OK
回调通知

  • 对应 futrue 的完成时间,完成了可以告诉我,也就是我们的回调通知
  • 通过轮询的方式去判断任务是否完成这样非常占 CPU 并且代码也不优雅

创建异步任务

  • Futrue + 线程池的配合

依赖组合处理

  • 多个异步任务计算组合起来,后一个异步任务的计算结果需要前一个异步任务的值
  • 将两个或多个异步计算合成一个异步计算,这几个异步计算相互独立,同时这个又依赖前一个处理的结果。

计算速度最快

  • 当 Futrue 集合中某一个任务最快结束时,返回结果,返回第一名处理结果。

背景介绍

get() 方法在 Future 计算完成之前会一直处于阻塞状态下,
idDone() 方法容易耗费 CPU 资源,
对于真正的异步处理我们希望可以通过传入回调函数,在 futrue 结束自动调用该回调函数,这样,我们就不用等待结果。

阻塞方式和异步编程的设计理念相违背,而轮询的方式会增加无谓的 CPU 资源,因此。
JDK8 设计出 CompletableFuture

CompletableFuture 提供了一种观察者类似的机制,可以让任务执行完成后通知监听一方。
image.png

CompletionStage

  • CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成过后可能会触发另一个阶段。
  • 一个阶段的计算执行可以是一个 Funcation , Consumer 或值 Runnable , 比如 stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(()-> System.out.println())
  • 一个阶段的执行可能被单个阶段的完成触发,也可能是多个阶段一起触发

代表异步计算过程中的某一个阶段,一个阶段完成后可能会触发另外一个阶段,有些类似 Linux 系统的管道分隔符传参数。

四个静态方法

CompletableFuture提供了四个静态方法用来创建CompletableFuture对象:

public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)

public static  CompletableFuture supplyAsync(Supplier supplier)
public static  CompletableFuture supplyAsync(Supplier supplier, Executor executor)
  • supplyAsync 与 runAsync 不同在与前者异步返回一个结果,后者是 void;
  • supply 是一个函数值接口

参数说明:没有指定 Executor 的方法,直接使用默认的 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
如果指定线程池,则使用我们自定义的活着特别指定的线程池执行异步代码。

测试代码:

public class CompletableFutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService executors = Executors.newFixedThreadPool(10);

        //测试1: 异步返回一个结果
        /*CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName());
            return "hello supplyAsync";
        });
        System.out.println(completableFuture.get());*/

        //测试2 :返回 void
        CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName());
        }, executors);
        System.out.println(completableFuture.get());
    }
}

使用案例

从 java8 开始引入 CompletableFuture, 它是 Future 的功能增强版本,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
综合使用


public class CompletableFutureTest2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService executors = Executors.newFixedThreadPool(10);

        //异步返回一个结果
        CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + ": 任务启动。。。。");
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println(Thread.currentThread().getName() + ": 1s 后返回结果");
            return "hello supplyAsync";
        }, executors).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("--- 任务处理完成,执行返回结果 v=" + v);
            }
        }).exceptionally(ex -> {
            System.out.println("出错了。。。。。");
            return null;
        });
        System.out.println("主线程去忙别的任务 。。。。");
    }
}

输出结果:
image.png

电商比价案例

package io.pipi.task;

import lombok.Getter;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class CompletableFutureMall {

	// 构造三家购物网站
	static List list = Arrays.asList(
			new NetMall("jd"),
			new NetMall("taobao"),
			new NetMall("dangdang")
	);

	/**
	 * 从每个网站依次获取商品价格
	 * ProductName:书名
	 */
	public static List getPrice(List list, String productName){
		return list
				.stream()
				.map(netMall ->
						String.format("%s in %s price is %.2f",
								productName,
								netMall.getNetMallName(),
								netMall.calcPrice(productName)))
				.collect(Collectors.toList());

	}


	public static List getPriceByCompletableFuture(List list, String productName){
		return list.stream()
				.map(netMall -> CompletableFuture.supplyAsync(()-> String.format("%s in %s price is %.2f",
				productName,
				netMall.getNetMallName(),
				netMall.calcPrice(productName))))
				.collect(Collectors.toList())
				.stream()
				.map(s -> s.join())
				.collect(Collectors.toList());
	}


	public static void main(String[] args) {
		long start = System.currentTimeMillis();
		List prices = getPrice(list, "mysql");
		for (String price : prices) {
			System.out.println(price);
		}
		long end = System.currentTimeMillis();
		System.out.println(end - start);

		start = System.currentTimeMillis();
		prices = getPriceByCompletableFuture(list, "mysql");
		for (String price : prices) {
			System.out.println(price);
		}
		end = System.currentTimeMillis();
		System.out.println(end - start);
		
	}
}


// 模拟一个网站的价格
class NetMall{
	@Getter
	private final String netMallName;

	public NetMall(String netMallName){
		this.netMallName = netMallName;
	}

	public double calcPrice(String productName){
		try {
			TimeUnit.SECONDS.sleep(1);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
	}
}


输出结果:
image.png

CompletableFuture 常用方法

thenRun、thenAccept、thenApply方法区别

/**
 *thenRun、thenAccept、thenApply方法区别
 */
@Test
public void test9() {
    //thenRun的结果与上一步无关
    System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(()->{}).join());
    //thenAccept 是消费型函数式接口
    System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(System.out::println).join());
    //thenApply 串行处理前面的结果 有返回值
    System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(v-> v + " resultB").join());

}

thenAccept方法测试

该方法是消费型函数式接口,这里打印了一下结果

	/**
	 *thenAccept方法测试  该方法是消费型函数式接口,这里打印了一下结果
	 */
	@Test
	public void test8() {
		CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
			return 1;
		}).thenApply(integer -> {
			return integer +2;
		}).thenApply(integer -> {
			return integer + 3;
		}).thenAccept(System.out::println);

	}

handle方法测试

其计算是串行化的 即使当前步骤有异常也可以继续往下计算


	/**
	 * handle方法测试  其计算是串行化的  即使当前步骤有异常也可以继续往下计算
	 */
	@Test
	public void test7() throws ExecutionException, InterruptedException {
		CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
			try {
				TimeUnit.SECONDS.sleep(1);
				System.out.println("第一步");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			return 1;
		}).handle((integer, e) -> {
			int i = 10/0;
			System.out.println("第二步");
			return integer +2;
		}).handle((integer, e) -> {
			System.out.println("第三步");
			return integer + 3;
		}).whenComplete((value, exception) -> {
			if (exception == null){
				System.out.println("计算结果:" + value);
			}
		}).exceptionally(e -> {
			e.printStackTrace();
			System.out.println(e.getMessage());
			return null;
		});

		System.out.println(completableFuture.get());

	}


	

thenApply方法测试

其计算是串行化的 但是由于存在依赖关系,当前步骤有异常的话就终止计算了

/**
	 * thenApply方法测试  其计算是串行化的  但是由于存在依赖关系,当前步骤有异常的话就终止计算了
	 */
	@Test
	public void test6() throws ExecutionException, InterruptedException {
		CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
			try {
				TimeUnit.SECONDS.sleep(1);
				System.out.println("第一步");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			return 1;
		}).thenApply(integer -> {
			System.out.println("第二步");
			return integer +2;
		}).thenApply(integer -> {
			System.out.println("第三步");
			return integer + 3;
		}).whenComplete((value, exception) -> {
			if (exception == null){
				System.out.println("计算结果:" + value);
			}
		}).exceptionally(e -> {
			e.printStackTrace();
			System.out.println(e.getMessage());
			return null;
		});

		System.out.println(completableFuture.get());

	}



complete(T value)方法测试


	/**
	 * complete(T value)方法测试
	 * 如果尚未完成,则将get()和相关方法返回的值设置为给定值
	 */
	@Test
	public void test5(){
		CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
			try {
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			return "abc";
		});
		// complete()返回true表示打断  false表示未打断
		System.out.println(completableFuture.complete("completeValue") + "   " + completableFuture.join());
	}


getNow 、join 获取结果


/**
 * getNow(T valueIfAbsent)方法测试  表示立即要结果  如果完成了则返回正确的结果,否则返回传入的结果 如"xxx"
 * 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的valueIfAbsent。
 */
@Test
public void test4(){
    CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "abc";
    });
    System.out.println(completableFuture.getNow("xxx"));
}


/**
 * join()方法测试  和get方法一样
 */
@Test
public void test3() {
    CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "abc";
    });
    System.out.println(completableFuture.join());
}

/**
 * get(long timeout, TimeUnit unit)方法测试  表示等待timeout秒 超时则报异常
 */
@Test
public void test2() throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "abc";
    });
    System.out.println(completableFuture.get(1, TimeUnit.SECONDS));
}


/**
 * get()方法测试  表示获取结果
 */
@Test
public void test1() throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "abc";
    });
    System.out.println(completableFuture.get());
}

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论