Future 接口
Future 接口 (FutureTask 实现类)定义了一个异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务是否完成等。
比如:主线程让一个子线程去执行任务,子线程可能比较耗时,启动了子线程开始执行任务后,
主线程就去做其他的事情去了,忙其他的事情或者先执行完,过可以回才会去获取子任务的执行结果或者变更任务状态。
Future 是 Java5 新加的一个接口,它提供了一种异步并行计算的功能。
如果住县城需要执行一个耗时的计算任务,我们就可以通过 futrue 把这个任务放异步线程中去执行。
如果住县城继续处理其他的任务或者先行结束,再通过 future 获取计算结果。
三种线程创建方式
Runnable 接口
Callable 接口
Future 接口和 FutureTask 接口
FutureTask 案例:
public class FutureTaskTest {
public static void main(String[] args) {
long start = System.currentTimeMillis();
ExecutorService threadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i {
Thread.sleep(1000);
System.out.println("task " + taskId + " over !");
return null;
});
threadPool.submit(futureTask);
}
threadPool.shutdown();
// 如果没有执行完就一直循环
while (!threadPool.isTerminated()) {
}
System.out.println("cost:" + (System.currentTimeMillis() - start));
}
}
输出信息:
get() 阻塞方法
一旦调用 get 方法,如果计算没有完成容易造成程序阻塞。
同步获取返回值
futureTask.get();
设置返回的超时时间
futureTask.get(5, TimeUnit.SECONDS);
isDone 轮询
轮询的方式会耗费更多的 cpu 资源,而且也不见得能及时得到计算结果。
如果想要一步获取结果,通常都会以轮询的方式去获取结果,尽量不要阻塞。
FutureTask futureTask = new FutureTask(() -> {
Thread.sleep(5000);
System.out.println("task 1 over !");
return null;
});
Thread t = new Thread(futureTask, "t1");
t.start();
while (true) {
System.out.println("task running ....");
if (futureTask.isDone()) {
System.out.println("task over ....");
break;
}
}
结论
future 对于获取结果不是很友好,只能通过阻塞或值轮询的方式来得到任务的结果。
CompletableFuture
适用场景
对于简单的业务场景使用 Future 完成 OK
回调通知
- 对应 futrue 的完成时间,完成了可以告诉我,也就是我们的回调通知
- 通过轮询的方式去判断任务是否完成这样非常占 CPU 并且代码也不优雅
创建异步任务
- Futrue + 线程池的配合
依赖组合处理
- 多个异步任务计算组合起来,后一个异步任务的计算结果需要前一个异步任务的值
- 将两个或多个异步计算合成一个异步计算,这几个异步计算相互独立,同时这个又依赖前一个处理的结果。
计算速度最快
- 当 Futrue 集合中某一个任务最快结束时,返回结果,返回第一名处理结果。
背景介绍
get() 方法在 Future 计算完成之前会一直处于阻塞状态下,
idDone() 方法容易耗费 CPU 资源,
对于真正的异步处理我们希望可以通过传入回调函数,在 futrue 结束自动调用该回调函数,这样,我们就不用等待结果。
阻塞方式和异步编程的设计理念相违背,而轮询的方式会增加无谓的 CPU 资源,因此。
JDK8 设计出 CompletableFuture
CompletableFuture 提供了一种观察者类似的机制,可以让任务执行完成后通知监听一方。
CompletionStage
- CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成过后可能会触发另一个阶段。
- 一个阶段的计算执行可以是一个 Funcation , Consumer 或值 Runnable , 比如 stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(()-> System.out.println())
- 一个阶段的执行可能被单个阶段的完成触发,也可能是多个阶段一起触发
代表异步计算过程中的某一个阶段,一个阶段完成后可能会触发另外一个阶段,有些类似 Linux 系统的管道分隔符传参数。
四个静态方法
CompletableFuture提供了四个静态方法用来创建CompletableFuture对象:
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
- supplyAsync 与 runAsync 不同在与前者异步返回一个结果,后者是 void;
- supply 是一个函数值接口
参数说明:没有指定 Executor 的方法,直接使用默认的 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
如果指定线程池,则使用我们自定义的活着特别指定的线程池执行异步代码。
测试代码:
public class CompletableFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executors = Executors.newFixedThreadPool(10);
//测试1: 异步返回一个结果
/*CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
return "hello supplyAsync";
});
System.out.println(completableFuture.get());*/
//测试2 :返回 void
CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName());
}, executors);
System.out.println(completableFuture.get());
}
}
使用案例
从 java8 开始引入 CompletableFuture, 它是 Future 的功能增强版本,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
综合使用
public class CompletableFutureTest2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executors = Executors.newFixedThreadPool(10);
//异步返回一个结果
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + ": 任务启动。。。。");
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + ": 1s 后返回结果");
return "hello supplyAsync";
}, executors).whenComplete((v, e) -> {
if (e == null) {
System.out.println("--- 任务处理完成,执行返回结果 v=" + v);
}
}).exceptionally(ex -> {
System.out.println("出错了。。。。。");
return null;
});
System.out.println("主线程去忙别的任务 。。。。");
}
}
输出结果:
电商比价案例
package io.pipi.task;
import lombok.Getter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class CompletableFutureMall {
// 构造三家购物网站
static List list = Arrays.asList(
new NetMall("jd"),
new NetMall("taobao"),
new NetMall("dangdang")
);
/**
* 从每个网站依次获取商品价格
* ProductName:书名
*/
public static List getPrice(List list, String productName){
return list
.stream()
.map(netMall ->
String.format("%s in %s price is %.2f",
productName,
netMall.getNetMallName(),
netMall.calcPrice(productName)))
.collect(Collectors.toList());
}
public static List getPriceByCompletableFuture(List list, String productName){
return list.stream()
.map(netMall -> CompletableFuture.supplyAsync(()-> String.format("%s in %s price is %.2f",
productName,
netMall.getNetMallName(),
netMall.calcPrice(productName))))
.collect(Collectors.toList())
.stream()
.map(s -> s.join())
.collect(Collectors.toList());
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
List prices = getPrice(list, "mysql");
for (String price : prices) {
System.out.println(price);
}
long end = System.currentTimeMillis();
System.out.println(end - start);
start = System.currentTimeMillis();
prices = getPriceByCompletableFuture(list, "mysql");
for (String price : prices) {
System.out.println(price);
}
end = System.currentTimeMillis();
System.out.println(end - start);
}
}
// 模拟一个网站的价格
class NetMall{
@Getter
private final String netMallName;
public NetMall(String netMallName){
this.netMallName = netMallName;
}
public double calcPrice(String productName){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
}
}
输出结果:
CompletableFuture 常用方法
thenRun、thenAccept、thenApply方法区别
/**
*thenRun、thenAccept、thenApply方法区别
*/
@Test
public void test9() {
//thenRun的结果与上一步无关
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(()->{}).join());
//thenAccept 是消费型函数式接口
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(System.out::println).join());
//thenApply 串行处理前面的结果 有返回值
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(v-> v + " resultB").join());
}
thenAccept方法测试
该方法是消费型函数式接口,这里打印了一下结果
/**
*thenAccept方法测试 该方法是消费型函数式接口,这里打印了一下结果
*/
@Test
public void test8() {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(integer -> {
return integer +2;
}).thenApply(integer -> {
return integer + 3;
}).thenAccept(System.out::println);
}
handle方法测试
其计算是串行化的 即使当前步骤有异常也可以继续往下计算
/**
* handle方法测试 其计算是串行化的 即使当前步骤有异常也可以继续往下计算
*/
@Test
public void test7() throws ExecutionException, InterruptedException {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("第一步");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}).handle((integer, e) -> {
int i = 10/0;
System.out.println("第二步");
return integer +2;
}).handle((integer, e) -> {
System.out.println("第三步");
return integer + 3;
}).whenComplete((value, exception) -> {
if (exception == null){
System.out.println("计算结果:" + value);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});
System.out.println(completableFuture.get());
}
thenApply方法测试
其计算是串行化的 但是由于存在依赖关系,当前步骤有异常的话就终止计算了
/**
* thenApply方法测试 其计算是串行化的 但是由于存在依赖关系,当前步骤有异常的话就终止计算了
*/
@Test
public void test6() throws ExecutionException, InterruptedException {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("第一步");
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}).thenApply(integer -> {
System.out.println("第二步");
return integer +2;
}).thenApply(integer -> {
System.out.println("第三步");
return integer + 3;
}).whenComplete((value, exception) -> {
if (exception == null){
System.out.println("计算结果:" + value);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});
System.out.println(completableFuture.get());
}
complete(T value)方法测试
/**
* complete(T value)方法测试
* 如果尚未完成,则将get()和相关方法返回的值设置为给定值
*/
@Test
public void test5(){
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
// complete()返回true表示打断 false表示未打断
System.out.println(completableFuture.complete("completeValue") + " " + completableFuture.join());
}
getNow 、join 获取结果
/**
* getNow(T valueIfAbsent)方法测试 表示立即要结果 如果完成了则返回正确的结果,否则返回传入的结果 如"xxx"
* 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的valueIfAbsent。
*/
@Test
public void test4(){
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
System.out.println(completableFuture.getNow("xxx"));
}
/**
* join()方法测试 和get方法一样
*/
@Test
public void test3() {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
System.out.println(completableFuture.join());
}
/**
* get(long timeout, TimeUnit unit)方法测试 表示等待timeout秒 超时则报异常
*/
@Test
public void test2() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
System.out.println(completableFuture.get(1, TimeUnit.SECONDS));
}
/**
* get()方法测试 表示获取结果
*/
@Test
public void test1() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});
System.out.println(completableFuture.get());
}