前言
CompletableFuture是jdk8提供的新属性,用于提供异步逻辑,提高代码执行效率,里面提供了丰富的方法可以使用
CompletableFuture注意
CompletableFuture是一个守护线程来着,也就是说,如果main方法执行结束,它也会跟着结束,例如:
import java.util.concurrent.CompletableFuture;
public class CompletableFuture1Demo {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
System.out.println("hello world");
return "result";
});
}
}
多执行几次,会发现,有可能这串代码,压根就不输出
CompletableFuture方法使用
supplyAsync,返回值
在以前线程中,要拿到线程的返回值,一般可以使用Future获取线程的返回值,CompletableFuture也可以实现
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("hello world");
return "success";
});
String result = completableFuture.get();
System.out.println(result);
}
}
可以拿到线程的返回值
备注,其中get()方法可以设置等待时间,如
public class CompletableFuture4Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
System.out.println("哈哈哈========");
return "result";
});
try {
String result = cf.get(1000, TimeUnit.NANOSECONDS);
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
}
runAsync,不带返回值
public class CompletableFuture2Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {
System.out.println("hello world");
});
completableFuture.join();
}
}
runAsync()方法实现是不带返回值的,但是CompletableFuture是守护线程,所以可以使用join()等待线程执行结束
runAsync支持传入自定义线程池
对于runAsync方法,提供了传入自定义线程池,没传的话,默认使用ForkJoinPool.commonPool()线程池
public class CompletableFuture3Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {
System.out.println("hello world");
}, executorService);
completableFuture.join();
}
}
不建议使用jdk使用默认线程池
public class ThenApplyDemo {
public static void main(String[] args) {
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
System.out.println("test");
return "hello";
}).thenApply((x) -> {
System.out.println(x);
x = x + " world";
return x;
});
try {
String result = cf.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
thenAcceptAsync使用
thenAcceptAsync是没有返回值的
import java.util.concurrent.CompletableFuture;
public class ThenApplyAsyncDemo {
public static void main(String[] args) {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("test");
return "hello";
}).thenAcceptAsync((x) -> {
x = x + " world";
System.out.println(x);
});
completableFuture.join();
}
}
thenAcceptAsync传入线程池
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThenCompose1Demo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("test");
return "hello";
}).thenAcceptAsync(x -> {
x = x + ":aaa";
System.out.println(x);
}, executorService);
completableFuture.join();
}
}
thenCompose使用
thenCompose用于连接两个CompletableFuture,例
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ThenComposeDemo {
public static void main(String[] args) {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("test");
return "hello";
}).thenCompose(x -> CompletableFuture.supplyAsync(() -> {
System.out.println(x);
return "world";
}));
try {
String result = completableFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
thenRun使用
thenRun表示CompletableFuture执行结束之后要执行的动作,不用任何传参和返回
import java.util.concurrent.CompletableFuture;
public class ThenRunDemo {
public static void main(String[] args) {
CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行1");
return 1;
});
CompletableFuture completableFuture2 = completableFuture1.thenRun(() -> {
System.out.println("执行2");
});
completableFuture2.join();
}
}
thenRunAsync使用
同理,thenRunAsync不传入线程池,使用默认线程池
import java.util.concurrent.CompletableFuture;
public class ThenRunAsyncDemo {
public static void main(String[] args) {
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(1);
return 1;
});
CompletableFuture cf2 = cf1.thenRunAsync(() -> {
System.out.println(2);
});
cf2.join();
}
}
thenRunAsync传入自定义线程池
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThenRunAsync1Demo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(1);
return 1;
});
CompletableFuture completableFuture2 = completableFuture.thenRunAsync(() -> {
System.out.println(2);
}, executorService);
completableFuture2.join();
}
}
exceptionally使用
exceptionally用于执行异常处理,例
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ExceptionDemo {
public static void main(String[] args) {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("哈哈哈========");
int a = 1 / 0;
return "result";
}).exceptionally(e -> {
System.out.println(e.getMessage());
return "xxx";
});
try {
String result = completableFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
whenComplete使用
CompletableFuture执行时,无论任务正常还是异常,它都会调用whenComplete
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class WhenCompleteDemo {
public static void main(String[] args) {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("======111===========");
return "success";
}).whenComplete((s, throwable) -> {
System.out.println("结果为:" + s);
if (Objects.nonNull(throwable)) {
System.out.println(throwable.getMessage());
}
});
try {
String result = completableFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
异常为:
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class WhenCompleteDemo {
public static void main(String[] args) {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("======111===========");
int a = 1 / 0;
return "success";
}).whenComplete((s, throwable) -> {
System.out.println("结果为:" + s);
if (Objects.nonNull(throwable)) {
System.out.println(throwable.getMessage());
}
});
try {
String result = completableFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
whenCompleteAsync使用
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class WhenCompleteAsyncDemo {
public static void main(String[] args) {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("======111===========");
int a = 1 / 0;
return "success";
}).whenCompleteAsync((s, throwable) -> {
System.out.println("结果为:" + s);
if (Objects.nonNull(throwable)) {
System.out.println(throwable.getMessage());
}
});
try {
String result = completableFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
也能传入自定义线程池
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class WhenCompleteAsync1Demo {
public static void main(String[] args) {
ExecutorService executorService= Executors.newSingleThreadExecutor();
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("======111===========");
int a = 1 / 0;
return "success";
}).whenCompleteAsync((s, throwable) -> {
System.out.println("结果为:" + s);
if (Objects.nonNull(throwable)) {
System.out.println(throwable.getMessage());
}
}, executorService);
try {
String result = completableFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
handle()使用
handle()是执行任务完成时对结果的处理
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class HandleDemo {
public static void main(String[] args) {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("============1111============");
return "aaaa";
}).handle((s, throwable) -> {
System.out.println(s);
if (Objects.nonNull(throwable)) {
System.out.println(throwable.getMessage());
}
return "bbb";
});
try {
String result = completableFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
异常执行为
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class HandleDemo {
public static void main(String[] args) {
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("============1111============");
int c = 1 / 0;
return "aaaa";
}).handle((s, throwable) -> {
System.out.println(s);
if (Objects.nonNull(throwable)) {
System.out.println(throwable.getMessage());
}
return "bbb";
});
try {
String result = completableFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
handle()方法在执行异常时,也会调用
thenAcceptBoth整合CompletableFuture多任务
import java.util.concurrent.CompletableFuture;
public class ThenAcceptBothDemo {
public static void main(String[] args) {
CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("completableFuture1执行");
return 1;
});
CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println("completableFuture2执行");
return 2;
});
CompletableFuture completableFuture3 = completableFuture1.thenAcceptBoth(completableFuture2, (a, b) -> {
System.out.println("completableFuture3执行");
System.out.println(a + b);
});
completableFuture3.join();
}
}
CompletableFuture中allOf()方法
allOf()是等待所有CompletableFuture方法执行完毕,然后再接着往下执行,没有返回值
import java.util.concurrent.CompletableFuture;
public class AllOfDemo {
public static void main(String[] args) {
CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("completableFuture1执行");
return 1;
});
CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println("completableFuture2执行");
return 2;
});
CompletableFuture completableFuture3 = CompletableFuture.supplyAsync(() -> {
System.out.println("completableFuture3执行");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 3;
});
CompletableFuture completableFuture = CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3);
completableFuture.join();
System.out.println("主线程执行结束");
}
}
allOf()参数支持传入一个数组
anyOf()方法使用
anyOf()是等待所有CompletableFuture方法任何一个返回就执行结束,有返回值
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class AnyOfDemo {
public static void main(String[] args) {
CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("completableFuture1执行");
return 1;
});
CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println("completableFuture2执行");
return 2;
});
CompletableFuture completableFuture3 = CompletableFuture.supplyAsync(() -> {
System.out.println("completableFuture3执行");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("completableFuture3执行结束");
return 3;
});
CompletableFuture completableFuture = CompletableFuture.anyOf(completableFuture1, completableFuture2, completableFuture3);
try {
Object result = completableFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
System.out.println("主线程执行结束");
}
}
applyToEither()方法使用
applyToEither()说白了,就是哪个先返回就用哪个结果作为执行
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class ApplyToEitherDemo1 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(111);
// try {
// Thread.sleep(500);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
return "hello1";
});
CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(222);
return "hello2";
});
CompletableFuture future = completableFuture1.applyToEither(completableFuture2, x -> {
System.out.println(x);
return x;
});
System.out.println(future.get());
}
}
试着把sleep那段代码放开再试试,会发现哪个先返回用哪个
acceptEither()使用
acceptEither()跟applyToEither()差不多,只是没有返回值
import java.util.concurrent.CompletableFuture;
public class AcceptEitherDemo {
public static void main(String[] args) {
CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(111);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "hello1";
});
CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println(222);
return "hello2";
});
CompletableFuture future = completableFuture1.acceptEither(completableFuture2, x -> {
x = x + ":" + "aa";
System.out.println(x);
});
future.join();
}
}
总结
CompletableFuture提供了丰富的api供我们使用,以此来提高开发效率,至于在开发中也没必要强行用CompletableFuture,还是有很多异步方法可供选择的