前言
JDK 5 引入了 Future 模式。Future 接口是 Java 多线程 Future 模式的实现,在 java.util.concurrent 包中,可以来进行异步计算。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?如 Netty、Guava 分别扩展了 Java 的 Future 接口,方便异步编程。
为了解决这个问题,自 JDK8 开始,吸收了 Guava 的设计思想,加入了 Future 的诸多扩展功能形成了 CompletableFuture,让 Java 拥有了完整的非阻塞编程模型。CompletableFuture 它提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力。CompletableFuture 能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。
CompletableFuture 弥补了 Future 模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过 thenAccept、thenApply、thenCompose 等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。
CompletableFuture 简介
CompletableFuture 类实现了 CompletionStage 和 Future 接口,所以你还是可以像以前一样通过阻塞或者轮询的方式获得结果,尽管这种方式不推荐使用。
1 | public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { |
创建
supplyAsync / runAsync 异步计算结果
在该类中提供了四个静态方法创建 CompletableFuture 对象:
1 | // 使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码,异步操作有返回值 |
- 以 Async 结尾并且没有指定 Executor 的方法会使用 ForkJoinPool.commonPool() 作为线程池执行异步代码。
- runAsync 方法用于没有返回值的任务,它以 Runnable 函数式接口类型为参数,所以 CompletableFuture 的计算结果为空。
- supplyAsync 方法用于有返回值的任务,以 Supplier<U> 函数式接口类型为参数,CompletableFuture 的计算结果类型为 U。
执行
complete / completeExceptionally
1 | // 完成异步执行,并返回 future 的结果 |
转换
我们可以通过 CompletableFuture 来异步获取一组数据,并对数据进行一些转换,类似 RxJava、Scala 的 map、flatMap 操作。
thenApply 转换结果(map)
我们可以将操作串联起来,或者将 CompletableFuture 组合起来。它的入参是上一个阶段计算后的结果,返回值是经过转化后结果。
1 | // 接受一个 Function<? super T, ? extends U> 参数用来转换 CompletableFuture |
- thenApply 的功能相当于将 CompletableFuture<T> 转换成 CompletableFuture<U>。
- thenApply 函数的功能是当原来的 CompletableFuture 计算完后,将结果传递给函数 fn,将 fn 的结果作为新的 CompletableFuture 计算结果,这些转换并不是马上执行的,也不会阻塞,而是在前一个 stage 完成后继续执行。
- 它们与 handle 方法的区别在于 handle 方法会处理正常计算值和异常,因此它可以屏蔽异常,避免异常继续抛出。而 thenApply 方法只是用来处理正常值,因此一旦有异常就会抛出。
thenCompose 非嵌套整合(flatMap)
thenCompose 可以用于组合多个 CompletableFuture,将前一个结果作为下一个计算的参数,它们之间存在着先后顺序。
1 | // 在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回 CompletableFuture 类型。 |
- thenCompose 可以用于组合多个 CompletableFuture,将前一个结果作为下一个计算的参数,它们之间存在着先后顺序。
- thenapply() 是接受一个
Function<? super T,? extends U>
参数用来转换 CompletableFuture,相当于流的 map 操作,返回的是非 CompletableFuture 类型,它的功能相当于将 CompletableFuture<T> 转换成 CompletableFuture<U>。 - thenCompose() 在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回 CompletableFuture 类型,相当于 flatMap,用来连接两个 CompletableFuture。
组合
thenCombine
thenCombine 方法主要作用:结合两个 CompletionStage 的结果,进行转化后返回。
1 | // 当两个 CompletableFuture 都正常完成后,执行提供的 fn,用它来组合另外一个 CompletableFuture 的结果。 |
- 现在有 CompletableFuture<T>、CompletableFuture<U> 和一个函数 (T, U) -> V,thenCompose 就是将 CompletableFuture<T> 和 CompletableFuture<U> 变为 CompletableFuture<V>。
- 使用 thenCombine() 之后 future1、future2 之间是并行执行的,最后再将结果汇总。
thenAcceptBoth
thenAcceptBoth 方法主要作用:结合两个 CompletionStage 的结果,进行消耗,返回CompletableFuture<Void> 类型。
1 | // 当两个 CompletableFuture 都正常完成后,执行提供的 action,用它来组合另外一个 CompletableFuture 的结果。 |
- thenAcceptBoth 跟 thenCombine 类似,但是返回 CompletableFuture<Void> 类型。
- thenAcceptBoth 以及相关方法提供了类似的功能,当两个 CompletionStage 都正常完成计算的时候,就会执行提供的 action,它用来组合另外一个异步的结果。
计算结果完成时的处理
当 CompletableFuture 完成计算结果后,我们可能需要对结果进行一些处理。
whenComplete 计算结果完成时的处理
whenComplete 方法主要作用:当运行完成时,对结果的记录。
当 CompletableFuture 的计算结果完成,或者抛出异常的时候,有如下四个方法:
1 | // 当 CompletableFuture 完成计算结果时对结果进行处理,或者当 CompletableFuture 产生异常的时候对异常进行处理。 |
- 可以看到 Action 的类型是
BiConsumer<? super T, ? super Throwable>
它可以处理正常的计算结果,或者异常情况。 - 方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。
- exceptionally 方法返回一个新的 CompletableFuture,当原始的 CompletableFuture 抛出异常的时候,就会触发这个 CompletableFuture 的计算,调用 function 计算值,也就是这个 exceptionally 方法用来处理异常的情况。
handle 执行完可以做转换
handle 方法主要作用:运行完成时,对结果的处理。
除了上述四个方法之外,一组 handle 方法也可用于处理计算结果。当原先的 CompletableFuture 的值计算完成或者抛出异常的时候,会触发这个 CompletableFuture 对象的计算,结果由 BiFunction 参数计算而得。因此这组方法兼有 whenComplete 和转换的两个功能。
1 | // 当 CompletableFuture 完成计算结果或者抛出异常的时候,执行提供的 fn |
thenAccept 纯消费结果
上面的方法是当计算完成的时候,会生成新的计算结果 (thenApply, handle),或者返回同样的计算结果 whenComplete。我们可以在每个 CompletableFuture 上注册一个操作,该操作会在 CompletableFuture 完成执行后调用它。
1 | // 当 CompletableFuture 完成计算结果,只对结果执行 Action,而不返回新的计算值。 |
- CompletableFuture 通过 thenAccept 方法提供了这一功能,它接收CompletableFuture 执行完毕后的返回值做参数,只对结果执行Action,而不返回新的计算值。
或者
Either 表示的是两个 CompletableFuture,当其中任意一个 CompletableFuture 计算完成的时候就会执行。
acceptEither
applyToEither 方法主要作用:两个 CompletionStage,谁计算的快,我就用那个 CompletionStage 的结果进行下一步的消耗操作。
1 | // 当任意一个 CompletableFuture 完成的时候,action 这个消费者就会被执行。 |
applyToEither
applyToEither 方法主要作用:两个 CompletionStage,谁计算的快,我就用那个 CompletionStage 的结果进行下一步的转化操作。
1 | // 当任意一个 CompletableFuture 完成的时候,fn 会被执行,它的返回值会当作新的 CompletableFuture<U> 的计算结果。 |
- acceptEither 跟 applyToEither 类似,但是返回 CompletableFuture<Void> 类型。
其他方法
allOf、anyOf 是 CompletableFuture 的静态方法。
1 | // 在所有 Future 对象完成后结束,并返回一个 future |
- allOf() 方法所返回的 CompletableFuture,并不能组合前面多个 CompletableFuture 的计算结果。
- anyOf 和 acceptEither、applyToEither 的区别在于,后两者只能使用在两个 future 中,而 anyOf 可以使用在多个 future 中。
CompletableFuture 异常处理
CompletableFuture 在运行时如果遇到异常,可以使用 get() 并抛出异常进行处理,但这并不是一个最好的方法。CompletableFuture 本身也提供了几种方式来处理异常。
1 | // 只有当 CompletableFuture 抛出异常的时候,才会触发这个 exceptionally 的计算,调用 function 计算值。 |
- exceptionally 方法主要作用在于当运行时出现了异常,可以通过 exceptionally 进行补偿。
使用 JAVA CompletableFuture 的 20 例子
新建一个完成的 CompletableFuture
这个简单的示例中创建了一个已经完成的预先设置好结果的 CompletableFuture。通常作为计算的起点阶段。
1 | static void completedFutureExample() { |
运行一个简单的异步 stage
下面的例子解释了如何创建一个异步运行 Runnable 的 stage。
1 | static void runAsyncExample() { |
将方法作用于前一个 Stage
下面的例子引用了第一个例子中已经完成的 CompletableFuture,它将引用生成的字符串结果并将该字符串大写。
1 | static void thenApplyExample() { |
- Function 是阻塞的,这意味着只有当大写操作执行完成之后才会执行 getNow() 方法。
异步的将方法作用于前一个 Stage
1 | static void thenApplyAsyncExample() { |
使用一个自定义的 Executor 来异步执行该方法
异步方法的一个好处是可以提供一个 Executor 来执行 CompletableStage。这个例子展示了如何使用一个固定大小的线程池来实现大写操作。
1 | static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() { |
消费(Consume)前一个 Stage 的结果
果下一个 Stage 接收了当前 Stage 的结果但是在计算中无需返回值(比如其返回值为 void),那么它将使用方法 thenAccept 并传入一个 Consumer 接口。
1 | static void thenAcceptExample() { |
Consumer 将会同步执行,所以我们无需在返回的 CompletableFuture 上执行 join 操作。
异步执行 Comsume
1 | static void thenAcceptAsyncExample() { |
计算出现异常时
为了简洁性,我们还是将一个字符串大写,但是我们会模拟延时进行该操作。我们会使用 thenApplyAsyn(Function, Executor),第一个参数是大写转化方法,第二个参数是一个延时 executor,它会延时一秒钟再将操作提交给 ForkJoinPool。
1 | static void completeExceptionallyExample() { |
注意:handle 方法返回一个新的 CompletionStage,无论之前的 Stage 是否正常运行完毕。传入的参数包括上一个阶段的结果和抛出异常。
取消计算
和计算时异常处理很相似,我们可以通过 Future 接口中的 cancel(boolean mayInterruptIfRunning) 来取消计算。
1 | static void cancelExample() { |
注意:exceptionally 方法返回一个新的 CompletableFuture,如果出现异常,则为该方法中执行的结果,否则就是正常执行的结果。
将 Function 作用于两个已完成 Stage 的结果之一
下面的例子创建了一个 CompletableFuture 对象并将 Function 作用于已完成的两个 Stage 中的任意一个(没有保证哪一个将会传递给 Function)。这两个阶段分别如下:一个将字符串大写,另一个小写。
1 | static void applyToEitherExample() { |
消费两个阶段的任意一个结果
和前一个例子类似,将 Function 替换为 Consumer
1 | static void acceptEitherExample() { |
在两个阶段都完成后运行 Runnable
注意这里的两个 Stage 都是同步运行的,第一个 stage 将字符串转化为大写之后,第二个 stage 将其转化为小写。
1 | static void runAfterBothExample() { |
用 Biconsumer 接收两个 stage 的结果
Biconsumer 支持同时对两个 Stage 的结果进行操作。
1 | static void thenAcceptBothExample() { |
将 Bifunction 同时作用于两个阶段的结果
如果 CompletableFuture 想要合并两个阶段的结果并且返回值,我们可以使用方法 thenCombine。这里的计算流都是同步的,所以最后的 getNow() 方法会获得最终结果,即大写操作和小写操作的结果的拼接。
1 | static void thenCombineExample() { |
异步将 Bifunction 同时作用于两个阶段的结果
1 | static void thenCombineAsyncExample() { |
Compose CompletableFuture
我们可以使用 thenCompose 来完成前两个例子中的操作。
1 | static void thenComposeExample() { |
当多个阶段中有有何一个完成,即新建一个完成阶段
1 | static void anyOfExample() { |
当所有的阶段完成,新建一个完成阶段
1 | static void allOfExample() { |
当所有阶段完成以后,新建一个异步完成阶段
1 | static void allOfAsyncExample() { |
真实场景
下面展示了一个实践 CompletableFuture 的场景:
1、先通过调用 cars() 方法异步获得 Car 列表。它将会返回一个 CompletionStage<List<Car>>。cars() 方法应当使用一个远程的 REST 端点来实现。
2、我们将该 Stage 和另一个 Stage 组合,另一个 Stage 会通过调用 rating(manufactureId) 来异步获取每辆车的评分。
3、当所有的 Car 对象都填入评分后,我们调用 allOf() 来进入最终 Stage,它将在这两个阶段完成后执行
4、 在最终 Stage 上使用 whenComplete(),打印出车辆的评分。
1 | // cars()返回一个汽车列表 |
总结
Java 8 提供了一种函数风格的异步和事件驱动编程模型 CompletableFuture,它不会造成堵塞。CompletableFuture 背后依靠的是 fork/join 框架来启动新的线程实现异步与并发。当然,我们也能通过指定线程池来做这些事情。
CompletableFuture 特别是对微服务架构而言,会有很大的作为。举一个具体的场景,电商的商品页面可能会涉及到商品详情服务、商品评论服务、相关商品推荐服务等等。获取商品的信息时,需要调用多个服务来处理这一个请求并返回结果。这里可能会涉及到并发编程,我们完全可以使用 Java 8 的 CompletableFuture 或者 RxJava 来实现。事实证明,只有当每个操作很复杂需要花费相对很长的时间(比如,调用多个其它的系统的接口;比如,商品详情页面这种需要从多个系统中查数据显示)的时候用 CompletableFuture 才合适,不然区别真的不大,还不如顺序同步执行。
参考博文
[1]. 猫头鹰的深夜翻译:使用JAVA CompletableFuture的20例子
[2]. Java8学习笔记之CompletableFuture组合式异步编程
Java8 那些事儿系列
- Java8 那些事儿(一):Stream 函数式编程
- Java8 那些事儿(二):Optional 类解决空指针异常
- Java8 那些事儿(三):Date/Time API(JSR 310)
- Java8 那些事儿(四):增强的 Map 集合
- Java8 那些事儿(五):函数式接口
- Java8 那些事儿(六):从 CompletableFuture 到异步编程