环境:projectreactor2020.0.14
1. 前言
在响应式编程中,Project Reactor提供了两个核心的概念:Mono和Flux。Mono和Flux都是Reactor中的Publisher,它们可以产生并发布数据,然后可以被订阅和消费。这两个概念在WebFlux中有着广泛的应用,帮助我们实现异步和非阻塞的编程模型。
在这个主题中,我们将深入探讨Mono和Flux的基本使用。我们将了解它们如何被创建,如何订阅它们的事件,以及如何处理错误和完成通知。通过学习这些内容,你将能够更好地理解WebFlux的响应式编程模型,并能够在你的项目中有效地使用Mono和Flux。
让我们开始吧!
2. 环境依赖
io.projectreactor
reactor-core
io.projectreactor
reactor-bom
${reactor.version}
pom
import
3. Mono & Flux介绍
Flux
Flux表示了0到N个元素序列,下图展示了Flux如何转换元素
Flux
一个Flux是一个标准的Publisher,它表示一个由0到N个发射项目组成的异步序列,可选地由一个完成信号或一个错误终止。在响应式流规范中,这三种类型的信号转换为对下游订阅者的onNext、onComplete和onError方法的调用。
由于可能信号的范围很大,Flux是通用的反应式类型。请注意,所有事件,甚至是终止事件,都是可选的:只有onComplete事件才能表示一个空的有限序列,但删除onComplete事件就会得到一个无限的空序列(没什么用处,除了关于取消的测试)。类似地,无限序列不一定是空的。例如,Flux.interval(Duration)产生一个无限长的Flux,并从时钟发出规则的时标。
Mono
Mono表示了0个或1个元素序列,下图展示了Mono如何转换元素
图片
Mono
Mono是一个专门的发布者,它通过onNext信号发出最多一个项目,然后以onComplete信号终止(Mono成功,有或没有值),或只发出一个onError信号(Mono失败)。
大多数Mono实现都希望在调用onNext之后立即对其订阅者调用onComplete。Mono.never()是一个异常值:它不会发出任何信号,这在技术上并没有被禁止,但在测试之外并不是特别有用。另一方面,onNext和onError的组合是明确禁止的。
Mono只提供了可用于` Flux `的操作符子集,有些操作符(特别是那些将Mono与另一个`Publisher`结合的操作符)会切换到`Flux`。例如,Mono#concatWith(Publisher)返回一个Flux,而Mono#then(Mono)返回另一个Mono。
注意,你可以使用Mono来表示只有完成概念的无值异步进程(类似于Runnable)。要创建一个,可以使用一个空的Mono。
4. Mono & Flux常用操作
Mono常用操作
- 创建元素
Mono.just(T value)方法:创建一个包含指定值的Mono对象。
Mono.just(10).subscribe(System.out::println) ;
Mono.empty()方法:创建一个空的Mono对象,即不包含任何元素。
Mono.justOrEmpty(T value)方法:如果指定值不为null,则创建一个包含该值的Mono对象;否则创建一个空的Mono对象。
// 输出10
Mono.justOrEmpty(10).subscribe(System.out::println) ;
// 如果值为null,没有任何输出
Mono.justOrEmpty(null).subscribe(System.out::println) ;
图片
Mono.fromCallable(Callable