前言
CompletableFuture继承于java.util.concurrent.Future,它本身具备Future的所有特性,并且基于JDK1.8的流式编程以及Lambda表达式等实现一元操作符、异步性以及事件驱动编程模型,可以用来实现多线程的串行关系,并行关系,聚合关系。它的灵活性和更强大的功能是Future无法比拟的。
获取返回结果
get():阻塞获取返回结果
get()方式是阻塞的获取返回结果,它会一直等到有返回结果为止,程序才会继续往下执行,所以建议放在程序最后执行
public static void main(String[] args) {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
return 1;
});
try {
completableFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
completableFuture.get(1,TimeUnit.SECONDS):阻塞限时获取返回结果
也是阻塞获取返回结果,但是在1秒后没有获取到返回结果就会抛出一个超时异常,工作中建议使用这个,避免get()无限期阻塞等待
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
return 1;
});
try {
completableFuture.get(1,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
completableFuture.getNow(1):立即返回
立即返回一个结果,如果程序完成了就返回,如果程序没有完成那么就返回一个默认值
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
return 1;
});
completableFuture.getNow(1);
completableFuture.join():阻塞获取返回结果
join()与get()一样,区别是join()不会抛出异常
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
return 1;
});
completableFuture.join();
completableFuture.complete()
complete()要与get()方法一起使用
complete是打断的意思,如果打断成功,那么就返回默认值44,如果打断失败就获取到真实的返回结果
当执行complete()方法时,我们异步方法已经执行完成了,那么就打断失败,可以获取到真实结果
如果这时候异步方法还在执行,那么直接打断,最后返回一个默认值
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
return 1;
});
completableFuture.complete(44);
completableFuture.get();
对计算结果进行处理
thenApply
有输入,也有输出,它会将上一步的输出作为这一步的输入,然后顺序的执行下去,直到最后返回结果,但是如果其中一步出现了异常,那么就直接执行最后的抛出异常步骤exceptionally,不会继续执行剩下的业务逻辑
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return 1;
}).thenApply(f -> {
System.out.println(Thread.currentThread().getName());
return f + 1;
}).thenApply(f -> {
System.out.println(Thread.currentThread().getName());
return f + 1;
}).whenComplete((r,e) -> {
System.out.println(Thread.currentThread().getName());
if (e == null) {
System.out.println(r);
}
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
handle
handle和thenApply一样,区别就是即使其中某一个步骤出现了异常,程序还是会执行剩下的步骤,还会将异常信息作为参数一起传递到下一个步骤中,e:就是异常信息
CompletableFuture.supplyAsync(() -> {
return 1;
}).handle((f,e) -> {
return f + 1;
}).handle((f,e) -> {
return f + 1;
}).whenComplete((f,e) -> {
if(e == null) {
System.out.println("result = " + f);
}
}).exceptionally(e -> {
e.printStackTrace();
return null;
});
thenAccept
thenAccept与 thenApply和handle不同,它是没有返回结果的,是对计算结果直接进行了消费
CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(f -> {
return f + 1;
}).thenApply(f -> {
return f + 1;
}).thenAccept(f -> {
System.out.println("accept = " + f);
});
总结
- thenRun:任务A执行完继续执行任务B,并且任务B不需要任务A的返回结果
- thenApply:任务A执行完继续执行任务B,并且任务B需要任务A的返回结果,同时任务B有返回值
- thenAccept:任务A执行完继续执行任务B,并且任务B需要任务A的返回结果,但是任务B没有返回值
谁先快就返回谁的结果:applyToEither()
在一些游戏中,我们可以看到就是谁先完成了谁就赢了的游戏,其实这就是一个获取最快的返回结果的问题。applyToEither就是一个谁先快就返回谁的结果
我们让两个线程一个休眠5秒,一个休眠3秒,那么最后就会返回这个休眠3秒的结果
Integer result = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 5;
}).applyToEither(CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 3;
}), r -> {
return r;
}).join();
三个甚至多个的线程
CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
return 5;
}).applyToEither(CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
return 3;
}), r -> {
return r;
}).applyToEither(CompletableFuture.supplyAsync(() -> {
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
return 2;
}),r -> {
return r;
}).join();
对计算结果进行合并
就类似CountDownlatch一样,等所有线程都完成了,然后对所有结果进行合并计算,如果有线程先完成了,那么就只能等着
CompletableFuture.supplyAsync(() -> {
return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
return 20;
}),(r1,r2) -> {
return r1 + r2;
}).join();
三个甚至多个线程
CompletableFuture.supplyAsync(() -> {
return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
return 20;
}),(r1,r2) -> {
return r1 + r2;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
return 30;
}),(r1,r2) -> {
return r1 + r2;
}).join();