前言
Java 8新增的CompletableFuture类正是吸收了所有Google Guava中ListenableFuture和SettableFuture的特征,还提供了其它强大的功能,让Java拥有了完整的非阻塞编程模型:Future、Promise 和 Callback(在Java8之前,只有无Callback 的Future)。
CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。
CompletableFuture弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept、thenApply、thenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理
一. CompletableFuture的静态方法
方法名 | 描述 |
---|---|
runAsync(Runnable runnable) | 使用ForkJoinPool.commonPool()作为它的线程池执行异步代码 |
runAsync(Runnable runnable, Executor executor) | 使用指定的线程池 |
supplyAsync(Supplier supplier) | 异步操作有返回值 |
supplyAsync(Supplier supplier, Executor executor) | 使用指定的线程池 |
allOf(CompletableFuture<?>... cfs) | allOf里的所有future执行完成前主线程会阻塞,等所有future执行成功后,主进程会被唤醒 |
anyOf(CompletableFuture<?>... cfs) | 线程队列只要有一个异步线程完成就触发 |
completedFuture(U value) | 传入一个对象,后续可以用对应的方法进行操作 |
二.Completable
方法名 | 描述 |
---|---|
complete(String t) | 完成异步执行,并返回future的结果 |
completeExceptionally(Throwable ex) | 异步执行不正常的结束 |
future.get()在等待执行结果时,程序会一直block,如果此时调用complete(T t)会立即执行。但是complete(T t)只能调用一次,后续的重复调用会失效。
使用completeExceptionally(Throwable ex)则抛出一个异常,而不是一个成功的结果。
三.转换操作
- map
我们可以通过CompletableFuture来异步获取一组数据,并对数据进行一些转换.
thenApply(Function<? super T,? extends U> fn) | 接受一个Function<? super T,? extends U>参数用来转换CompletableFuture |
---|
thenApply的功能相当于将CompletableFuture转换成CompletableFuture
2.flatMap
thenCompose(Function<? super T, ? extends CompletionStage fn) | 在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletableFuture类型 |
---|
thenCompose可以用于组合多个CompletableFuture,将前一个结果作为下一个计算的参数,它们之间存在着先后顺序
3. 组合
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | 当两个CompletableFuture都正常完成后,执行提供的fn,用它来组合另外一个CompletableFuture的结果。 |
---|
//例如:
CompletableFuture<String> future5 = CompletableFuture.completedFuture("SUCCESS");
CompletableFuture<String> error = CompletableFuture.completedFuture("ERROR");
future5.thenCombineAsync(error,(s,i) -> s.concat(i)).get(1000,TimeUnit.MILLISECONDS);
- 使用thenCombine()之后future1、future2之间是并行执行的,最后再将结果汇总。这一点跟thenCompose()不同。
thenAcceptBoth跟thenCombine类似,但是返回CompletableFuture
/*thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
当两个CompletableFuture都正常完成后,执行提供的action,用它来组合另外一个CompletableFuture的结果。
*/
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100");
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture<Void> future = future1.thenAcceptBoth(future2, (s, i) -> System.out.println(Double.parseDouble(s + i)));
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
四.计算结果完成时的处理
- 当CompletableFuture完成计算结果后,我们可能需要对结果进行一些处理。
//whenComplete(BiConsumer<? super T,? super Throwable> action)
//当CompletableFuture完成计算结果时对结果进行处理,或者当CompletableFuture产生异常的时候对异常进行处理。
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s->s+" World")
.thenApply(s->s+ "\nThis is CompletableFuture demo")
.thenApply(String::toLowerCase)
.whenComplete((result, throwable) -> System.out.println(result));
- handle方法可以在执行完Action可以做转换
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "100")
.thenApply(s->s+"100")
.handle((s, t) -> s != null ? Double.parseDouble(s) : 0);
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
handle()相当于whenComplete()+转换。
- 纯消费(执行Action)
//thenAccept()是只会对计算结果进行消费而不会返回任何结果的方法。
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s->s+" World")
.thenApply(s->s+ "\nThis is CompletableFuture demo")
.thenApply(String::toLowerCase)
.thenAccept(System.out::print);
五. Either
- Either 表示的是两个CompletableFuture,当其中任意一个CompletableFuture计算完成的时候就会执行
Random random = new Random();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "from future1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "from future2";
});
CompletableFuture<Void> future = future1.acceptEither(future2,str->System.out.println("The future is "+str));
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
- applyToEither 跟 acceptEither 类似。
applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
//当任意一个CompletableFuture完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。
Random random = new Random();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "from future1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "from future2";
});
CompletableFuture<String> future = future1.applyToEither(future2,str->"The future is "+str);
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
执行结果也跟上面的程序类似。
六. CompletableFuture异常处理
CompletableFuture在运行时如果遇到异常,可以使用get()并抛出异常进行处理,但这并不是一个最好的方法。CompletableFuture本身也提供了几种方式来处理异常。
//exceptionally(Function<Throwable,? extends T> fn)
CompletableFuture.supplyAsync(() -> "hello world")
.thenApply(s -> {
s = null;
int length = s.length();
return length;
}).thenAccept(i -> System.out.println(i))
.exceptionally(t -> {
System.out.println("Unexpected error:" + t);
return null;
});
总结
Java 8提供了一种函数风格的异步和事件驱动编程模型CompletableFuture,它不会造成堵塞。CompletableFuture背后依靠的是fork/join框架来启动新的线程实现异步与并发。当然,我们也能通过指定线程池来做这些事情。
评论区