简单认识CompletableFuture

geepair

技术分享|2023-4-19|最后更新: 2023-4-21|
type
Post
status
Published
date
Apr 19, 2023
slug
summary
tags
开发
category
技术分享
icon
password

1.什么是CompletableFuture

CompletableFuture是jdk8引入的实现类。扩展了Future和CompletionStage,是一个可以在任务完成阶段触发一些操作Future,简单的来讲就是可以实现异步回调。

2.使用场景

CompletableFuture可以在很多场景下使用
  • 异步执行任务并处理任务结果
  • 处理多个异步任务的结果
  • 对多个异步任务进行组合
  • 处理异步任务的异常
I/O密集型任务)当需要执行一些耗时的操作时,我们可以用CompletableFuture来实现异步回调,从而释放主线程继续执行其他任务。还可以同时处理多个异步任务的结果,对它们进行一些组合操作,或者处理它们的异常情况。例如需要对外提供功能接口,对内调度各个下游服务获取数据进行聚合。

3.并行加载的实现方式

3.1同步模型

从各个服务获取数据,顺序执行
notion image
  • 简单直观
  • 但是在同步调用的场景下,接口耗时长、性能差,接口响应时长T > (T1+T2+T3+……+Tn)
 
为了缩短接口的响应时间,一般会使用线程池的方式并行获取数据。
notion image
导致资源利用率比较低
  • CPU资源大量浪费在阻塞等待上
  • 会引入更多额外的线程池
同步模型下,会导致硬件资源无法充分利用,系统吞吐量容易达到瓶颈。

3.2NIO异步模型

  • 通过RPC NIO异步调用的方式可以降低线程数,从而降低调度(上下文切换)开销,如Dubbo的异步调用
  • 通过引入CompletableFuture(下文简称CF)对业务流程进行编排,降低依赖之间的阻塞。

3.3比较

业界广泛流行的解决方案,主要包括Future、CompletableFuture、RxJava、Reactor(响应式编程)Mono/Flux
Future
CompletableFuture
RxJava
Reactor
Composable(可组合)
✔️
✔️
✔️
Asynchronous(异步)
✔️
✔️
✔️
✔️
Operator fusion(操作融合)
✔️
✔️
Lazy(延迟执行)
✔️
✔️
Backpressure(回压)
✔️
✔️
  • 可组合:可以将多个依赖操作通过不同的方式进行编排,例如CompletableFuture提供thenCompose、thenCombine等各种then开头的方法,这些方法就是对“可组合”特性的支持。
  • 操作融合:将数据流中使用的多个操作符以某种方式结合起来,进而降低开销(时间、内存)。
  • 延迟执行:操作不会立即执行,当收到明确指示时操作才会触发。例如Reactor只有当有订阅者订阅时,才会触发操作。
  • 回压:某些异步阶段的处理速度跟不上,直接失败会导致大量数据的丢失,对业务来说是不能接受的,这时需要反馈上游生产者降低调用量。

4.CompletableFuture使用与原理

 

4.1解决的问题

CompletableFuture是由Java 8引入的,在Java8之前我们一般通过Future实现异步。
  • Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法。
  • CompletableFuture对Future进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,同时一定程度解决了回调地狱的问题。
 
Futures和ListenableFuture实现方式
 
CompletableFuture的实现方式
 

4.2CompletableFuture的定义

notion image
CompletableFuture实现了两个接口(如上图所示):FutureCompletionStage
  • Future表示异步计算的结果
  • CompletionStage用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个CompletionStage触发的,随着当前步骤的完成,也可能会触发其他一系列CompletionStage的执行。
从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,我们可以通过其提供的thenAppythenCompose等函数式编程方法来组合编排这些步骤。
 

4.3CompletableFuture的使用

使用CompletableFuture也是构建依赖树的过程。一个CompletableFuture的完成会触发另外一系列依赖它的CompletableFuture的执行
notion image
一个业务接口的流程,其中包括CF1\CF2\CF3\CF4\CF5共5个步骤,并描绘了这些步骤之间的依赖关系,每个步骤可以是一次RPC调用、一次数据库操作或者是一次本地方法调用等
根据依赖数量,可以分为以下几类:零依赖、一元依赖、二元依赖和多元依赖。

4.3.1零依赖(创建)

 
notion image
  • 使用runAsync或supplyAsync发起异步调用
  • 直接创建一个已完成状态的CompletableFuture
  • 先初始化一个未完成的CompletableFuture,然后通过complete()、completeExceptionally(),完成该CompletableFuture
 
将回调方法转为CompletableFuture,然后再依赖CompletableFure的能力进行调用编排

4.3.2一元依赖

notion image
CF3,CF5分别依赖于CF1和CF2,这种对于单个CompletableFuture的依赖可以通过thenApply、thenAccept、thenCompose等方法来实现

4.3.3二元依赖:依赖两个CF

notion image
CF4同时依赖于两个CF1和CF2,这种二元依赖可以通过thenCombine等回调来实现

4.3.4多元依赖:依赖多个CF

notion image
整个流程的结束依赖于三个步骤CF3、CF4、CF5,这种多元依赖可以通过allOf或anyOf方法来实现,区别是当需要多个依赖全部完成时使用allOf,当多个依赖中的任意一个完成即可时使用anyOf

4.4CompletableFuture原理

CompletableFuture中包含两个字段:resultstack。result用于存储当前CF的结果。stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。
notion image
依赖动作(Dependency Action)都封装在一个单独Completion子类中。下面是Completion类关系结构图。CompletableFuture中的每个方法都对应了图中的一个Completion的子类,Completion本身是观察者的基类。
  • UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。
  • BiCompletion继承了UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。
notion image

4.4.1设计思想

notion image
被观察者
  • 每个CompletableFuture都可以被看作一个被观察者,其内部有一个Completion类型的链表成员变量stack,用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈stack属性,依次通知注册到其中的观察者。
  • 被观察者CF中的result属性,用来存储返回结果数据。这里可能是一次RPC调用的返回值,也可能是任意对象。
观察者
CompletableFuture支持很多回调方法,例如thenAccept、thenApply、exceptionally等,这些方法接收一个函数类型的参数f,生成一个Completion类型的对象(即观察者),并将入参函数f赋值给Completion的成员变量fn,然后检查当前CF是否已处于完成状态(即result != null),如果已完成直接触发fn,否则将观察者Completion加入到CF的观察者链stack中,再次尝试触发,如果被观察者未执行完则其执行完毕之后通知触发。
  • 观察者中的dep属性:指向其对应的CompletableFuture
  • 观察者中的src属性:指向其依赖的CompletableFuture
  • 观察者Completion中的fn属性:用来存储具体的等待被回调的函数

4.4.2流程

一元依赖
  • 将观察者Completion注册到CF1,此时CF1将Completion压栈
  • 当CF1的操作运行完成时,会将结果赋值给CF1中的result属性
  • 依次弹栈,通知观察者尝试运行
notion image
二元依赖
thenCombine操作表示依赖两个CompletableFuture
BiApply通过src和snd两个属性关联被依赖的两个CF,fn属性的类型为BiFunction。与单个依赖不同的是,在依赖的CF未完成的情况下,thenCombine会尝试将BiApply压入这两个被依赖的CF的栈中,每个被依赖的CF完成时都会尝试触发观察者BiApply,BiApply会检查两个依赖是否都完成,如果完成则开始执行。
notion image
多元依赖
依赖多个CompletableFuture的回调方法包括allOfanyOfallOf观察者实现类为BiRelay,需要所有被依赖的CF完成后才会执行回调;而anyOf观察者实现类为OrRelay,任意一个被依赖的CF完成后就会触发
notion image

总结

同步方法
  • 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行
  • 如果注册时被依赖的操作还未执行完,则由回调线程执行
异步方法
  • CompletableFuture默认使用ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈
  • 异步回调传入自定义线程池,做好线程池隔离
Loading...