本文介绍了 trivago 的搜索后端团队在设计和实现我们众多 Java 后端服务之一时如何有效地使用 Java 中的反应式编程。与传统的命令式和函数式编程相比,反应式编程需要转变思维方式才能有效地应用概念和技术。我们获得的好处支持我们应对当今后端架构中每个工程师几乎每个(微)服务都面临的一些关键挑战:处理阻塞 IO、背压、管理高度变化的负载以及消息和错误传播。
文章重点关注以下几个关键方面:
- 展示我们在生产中可靠运行一年多的一个真实用例。
- 我们在开发过程中遇到的技术挑战。
- 我们实施的关键方面以及他们如何应对挑战。
由于已经有大量的优质资源可用于学习和理解反应式编程的基本概念,因此本文不涵盖反应式编程的基础知识,并假定您具备同一主题的中级知识。中提供了一些推荐学习资源的参考结论。
用例
trivago 以其酒店搜索产品这使我们的用户能够比较全球数以百万计的住宿和价格。除了我们自己的平台外,我们还出现在其他酒店广告平台上,例如谷歌酒店广告。
此类平台提供了应用程序编程接口使企业能够连接到其后端服务并提供要在目标平台上宣传的库存和定价。
trivago 的举措之一就是在这些平台上进行展示。为了实现这一目标,trivago 的搜索后端工程团队必须开发一种解决方案,以将可用价格从我们自己的平台提供给其他酒店广告平台。
在本文中,我们不会关注整体架构和业务问题的细节,而是关注我们实现的技术方面。因此,我们深入研究了一个微服务,它负责:
- 从 Kafka 主题读取目标酒店广告平台的价格数据
- 将数据转换为目标酒店广告平台所需的不同格式
- 将数据发送至目标酒店广告平台
- 将有关发送数据的信息写入业务日志
技术挑战
与往常一样,有许多技术挑战需要解决。通常,技术需求需要从要解决的用例中得出,以便评估其对整体解决方案的影响。与我们构建的反应式管道相关,必须解决以下关键挑战:
- 数据量波动:数据可以在任何时间、任何数量到达输入Kafka主题。可靠地处理大量突发以及一致的数据流非常重要。
- 价格数据具有(缓存)过期时间:由于价格会随着时间的推移而变化,我们的系统不应增加对价格新鲜度产生直接影响的主要处理延迟。交付到目标平台的速度越快越好。
- API 配额:需要谨慎处理数据爆发和高度变化的数据量,以保持在目标酒店广告平台给出的 API 报价范围内。
- 数据分析:系统需要记录发送到目标酒店广告平台的所有价格,以进行进一步的技术和业务分析。
- 数据转换和优化:从内部格式转换为所需的目标格式相当简单。一些平台提出了额外的要求,例如不得在同一请求中发送重复的价格。在这个过程中需要考虑到这一点。
除了这些具体挑战之外,我们还面临着每个(后端)在线服务所共有的挑战,例如可扩展性、弹性、资源需求和性能。
执行
概述的关键技术要求使得使用 Java 中的 Reactor 进行反应式编程成为支持我们解决以下问题的合适工具:
- 处理波动的处理量:非阻塞IO、背压
- 对于 API 配额:请求速率限制和数据缓冲。
- 弹性:简化的错误处理和传播。
- 可扩展性:垂直由于非阻塞性质并利用调度程序,水平使用您最喜欢的容器编排。
我们在服务中使用了以下技术:
- 春季启动作为我们的基础应用框架
- 反应堆(核心)、反应式API的实现
- Spring 的 WebFlux用于使用响应式 API 进行基于 HTTP 的通信
- 反应堆卡夫卡,一个构建在 Apache Kafka API 之上的反应式 API
这些使我们能够从头开始(即从 Kafka 主题读取输入数据)到结束(即将生成的 HTTP 请求发送到我们的目标外部酒店广告平台)构建完全反应式管道。
我们应用程序的主要流程/管道必须包括以下高级处理步骤,下面将进一步说明:
- 从输入 Kafka 主题消耗价格。
- 缓冲价格以创建更大批量的 HTTP 突发。
- 以下更多计算密集型和数据无关的步骤被调度到线程池以增加应用程序的吞吐量:
- 将批量价格转换为目标酒店广告平台期望的数据结构,例如 Google API 的交易
- 将结果对象发送到指定的 HTTP 端点
- 如果成功,将发送的价格记录到业务日志(Kafka 主题)
现在,是时候编写一些代码了。请注意,以下摘录经过简化以减少代码大小,例如没有 getter/setter 方法,并重点关注解决方案的关键方面。此外,接下来的部分仅描述最相关的部分,并对其进行详细阐述。片段的扩展版本可以在附录。
所需的处理步骤被分为管道的单独阶段。最后通过链接将它们组装起来。本文从深入研究每个构建块开始,自下而上地进行处理。
使用 Kafka 主题的价格数据
要检索要发送的价格数据,我们的应用程序需要从提供的输入 Kafka 主题中使用它。KafkaReceiver
我们使用了包中的reactive reactor.kafka
,并在一个名为的类中实现了所需的逻辑酒店价格消费者。让我们重点关注以下几行:
kafkaReceiver
.receive() // Flux<ReceiverRecord<String, HotelPrice>>
.subscribeOn(scheduler) // scheduler: Schedulers.newSingle("kafka-consumer")
.retryBackoff(Long.MAX_VALUE, Duration.ofSeconds(1), Duration.ofSeconds(10));
的实例KafkaReceiver
用于通过调用receive()
运算符来开始接收数据(第 2 行)。与非反应式接口相比KafkaConsumer
,它可以轻松地集成到整个反应流中,使我们能够从 Reactor 的功能(例如背压)中受益。
我们希望使用专用调度程序无限地运行它(第 3 行)。我们也尝试使用多个线程来安排调度程序。我们的结果表明,对于我们的用例来说,使用单线程没有任何好处。此外,单个线程可确保即将到来的价格批处理的内存复杂性不会增加,因为这将是每个线程独立执行的。请注意,调度程序必须仅创建一次,例如在初始化时,以确保在每次调用 Flux 时使用相同的调度程序。
如果从上游 Kafka 代理接收数据时出现问题,请给它一些时间来恢复,例如重新平衡。这是通过使用retryBackoff
应用随机指数退避的运算符来实现的(第 4 行)。持续轮询它们只会添加负载和垃圾邮件日志/错误指标,这些指标在我们期望它们恢复时没有用(如果没有,则需要有人在 Kafka 代理端调查问题)。
价格批量化
随着价格流流入我们的应用程序,我们开始考虑解决批量需求。这部分管道是在类中实现的价格批量创建器,重点摘录:
hotelPriceFlux // Flux<HotelPrice>
// Values determined by various load tests suiting our use-case
.bufferTimeout(1000, Duration.ofMillis(5000), scheduler)
.publishOn(scheduler); // scheduler: Schedulers.newParallel("main")
使用运算符bufferTimeout
(第 3 行),我们可以轻松实现缓冲要求。该运算符考虑使用缓冲元素发出 a 的两个触发器List
:最大缓冲区大小和最大持续时间。这使我们能够限制向目标酒店广告平台发出的每个请求的最大价格数量,并涵盖以下边缘情况:在我们的主平台上的流量较低的时段,需要一些时间才能有足够的价格到达目标酒店广告平台。缓冲区限制。由于所有价格都有到期时间,超时可确保它们不会在管道中停留太久。超时确保定期刷新它们。由于缓冲区大小无论如何都很小,因此这不会对目标酒店广告平台的 API 配额产生任何影响。
一旦缓冲区被 发出,除了在操作符上使用相同的调度程序之外,bufferTimeout
我们还需要使用操作符(第 4 行)将其发布到调度程序。这是必需的,因为仅在使用其提供的调度程序触发超时时才负责发出数据。达到缓冲区大小阈值时的排放需要由下游调度程序处理。publishOn
bufferTimeout
bufferTimeout
使用并行调度程序(第 4 行),我们可以将下游任何较繁重的处理分配给多个线程。通过这种方式,可以根据可用核心数量启用垂直应用程序扩展。这非常方便,因为应用程序的水平扩展与输入 Kafka 主题的分区数量绑定(一个消费者组的每个应用程序实例最大为 1 个分区)。或者,您也可以在此处使用parallel
运算符后跟 arunOn(scheduler)
来引入并发处理。在此用例中,两者都是实现预期结果的可行选择。
将 HotelPrice 批次转换为 Transactions
下一步非常简单,并在相当简短的代码片段中进行了概述价格批量到交易转换器班级:
hotelPriceBatchFlux // Flux<List<HotelPrice>> hotelPriceBatchFlux
.map(this::convert);
这是一个非常简单的“将一种数据类型映射到另一种”类,这里没有什么特别需要注意的。
目标平台施加的附加要求(例如每个请求的重复价格删除)也在此阶段实现,但此处省略。由于这些可能会根据 hotelads 平台的不同而有所不同,因此这部分可以通过应用程序配置进行交换。
将事务发送到目标 HTTP 端点
我们使用反应式WebClient
类(Spring 的 WebFlux 包的一部分)将任何Transaction
对象作为序列化 XML 数据发送到目标酒店广告平台,例如本例中的 Google,因为他们的 API 期望事务数据采用 XML 格式。以下片段摘自Http客户端类专注于发送交易:
Mono.just(transaction)
.map(this::serialize) // Object -> String (XML)
.transformDeferred(
xmlStringMono ->
webClient
.post()
.body(xmlStringMono, String.class)
.retrieve()
.onStatus(
HttpStatus::isError,
clientResponse -> responseStatusErrorHandler(transaction, clientResponse))
// Response doesn't provide any data in this case
.bodyToMono(String.class))
.then();
捕获端点可能使用该onStatus
方法返回的任何错误(第 9 行)。提供的回调responseStatusErrorHandler
(第 11 行)返回一个在出错时执行的 Mono。在该方法中,我们连接了一些日志记录和可观察性指标的记录。
返回的 HTTP 响应不包含与管道下游任何内容相关的任何数据。因此,我们可以通过使用then()
运算符(第 14 行)来传达这一点,该运算符仍然向下游传播成功或错误等相关信号,但不使用对象传递数据Void
。
该阶段可能出现的任何错误信号都会有意向下游传播。我们希望在组装主流程时使用“全局”错误处理。
记录已成功将价格发送到 Kafka 主题
作为我们要求的一部分,我们想知道哪些价格已成功发送到目标平台。这是由类处理的价格记录器它使用反应式KafkaSender
来达到这个目的。
Mono.just(transaction)
.map(this::toSenderRecord)
.transformDeferred(kafkaSender::send)
.then();
转换数据(第 2 行)后,使用KafkaSender
(第 3 行)进行记录。由于下游对 所返回的任何数据结果不感兴趣KafkaSender
,then()
因此运算符(第 4 行)仅用于传达下游操作的结果信号,即。成功或错误。
主流:将所有内容粘在一起
最后,我们精心制作了所有的零碎内容,并将它们粘在一起形成一个主流。Reactor 提供了各种运算符,使此任务变得非常简单。结果也很容易阅读,因为它列出了图中描述的流程的每个阶段推送管道班级。
hotelPriceConsumer
.consumeKafkaTopic() // Flux<HotelPrice>
.transformDeferred(priceBatchCreator::createBatches) // Flux<List<HotelPrice>>
.transformDeferred(priceBatchToTransactionConverter::convert) // Flux<Transaction>
.flatMap(transaction -> httpClient
.send(transaction) // Flux<Void>
.then(priceLogger.log(transaction))) // Flux<Void>
.onErrorContinue((t, o) -> {/* Some logging + metrics */}));
我们称hotelPriceConsumer.consumeKafkaTopic()
(第 2 行)将管道打开为Flux<HotelPrice>
. 价格由专用的单线程消耗和转发。
在下一步中,我们使用transformDeferred
(第 3 行)插入价格批处理。这会产生一个Flux<List<HotelPrice>>
在并行调度程序上调度的 N: #CPUs 线程,从而支持所有后续步骤的并发性。
List<HotelPrice>
将对象转换为 aTransaction
由我们的转换器类负责(第 4 行)。
该运算符flatMap()
用于公开 的发出元素Flux<Transaction>
。嵌套管道必须使用相同的对象来发送 HTTP 请求(第 6 行)并将其记录到 Kafka(第 7 行)。
链接在一起的子组件的任何错误都会向下游传播,由操作员全局处理onErrorContinue
(第 8 行)。这确保了管道在处理错误后保持无限运行。必须关心最高级别的错误处理完全符合我们希望管道的行为方式。流程会在管道中的任何点中止,除非consumeKafkaTopic()
有专门的错误处理。任何下游运算符都会被跳过,因为我们只关心“输入数据的处理和发送是否成功?” 全局处理任何错误就足够了。
结论
Java 中 Reactor 的响应式编程支持我们有效地处理阻塞 IO(例如 Kafka、gRPC)、背压、线程处理/调度和错误处理。
它需要我们在如何设计和处理应用程序流程方面进行范式和思维方式的转变。我们的团队花了一些时间来学习所需的技术,并在我们庞大且不断发展的后端架构中逐步实施它们。
作为这篇博文的结束语,以下是一些帮助我们的团队在该主题上有效学习和成长的建议:
- 学习资源
- 书:《Reactive Programming with RxJava》(虽然是为 RxJava 而不是 Reactor 编写的,但使用的运算符和教授的概念大多相同)
- 阅读 Project Reactor 的源代码:一旦您了解了基本原理并开始编写您的第一个管道,请检查事物是如何在幕后实现和工作的。这使我们能够更好地了解如何有效地使用不同的运算符
- 如果您缺乏实践,请不要从头开始编写完整的响应式服务。开始编写/重构现有服务的非响应部分。反应式代码可以内联在任何命令式代码旁边。这也是我们使用混合反应式和非反应式代码来处理各种服务的方式,目的是将它们转变为完全反应式。
- 经常与同事迭代并分享您的学习成果#FanaticLearning
附录:扩展代码片段
以下是本文中提供的代码片段的扩展版本,为您提供有关我们的实现的一些额外背景信息。
片段 1:HotelPriceConsumer
public class HotelPriceConsumer {
private KafkaReceiver<String, HotelPrice> kafkaReceiver: KafkaReceiver.create(/* setup params omitted */);
private Scheduler scheduler: Schedulers.newSingle("kafka-consumer");
private Flux<HotelPrice> consumeKafkaTopic() {
return kafkaReceiver
.receive() // Flux<ReceiverRecord<String, HotelPrice>>
.subscribeOn(scheduler)
.map(ConsumerRecord::value)
.doOnError(throwable -> errorLogging(throwable))
.retryBackoff(Long.MAX_VALUE, Duration.ofSeconds(1), Duration.ofSeconds(10));
}
private void errorLogging(Throwable t) {
// ... some logging for observability when bad things happen
}
}
片段 2:PriceBatchCreator
public class PriceBatchCreator {
private Scheduler scheduler: Schedulers.newParallel("main");
public Flux<List<HotelPrice>> createBatches(Flux<HotelPrice> hotelPriceFlux) {
return hotelPriceFlux
// Values determined by various load tests suiting our use-case
.bufferTimeout(1000, Duration.ofMillis(5000), scheduler)
.publishOn(scheduler);
}
}
片段 3:PriceBatchToTransactionConverter
public class PriceBatchToTransactionConverter {
public Flux<Transaction> convert(Flux<List<HotelPrice>> hotelPriceBatchFlux) {
return hotelPriceBatchFlux
.map(this::convert);
}
private Transaction convert(List<HotelPrice> hotelPriceBatch) {
// ... converts to target object format
}
}
片段 4:HttpClient
public class HttpClient {
private WebClient webClient;
public Mono<Void> send(Transaction transaction) {
return Mono.just(transaction)
.map(this::serialize)
.transformDeferred(
xmlStringMono ->
webClient
.post()
.body(xmlStringMono, String.class)
.retrieve()
.onStatus(
HttpStatus::isError,
clientResponse -> responseStatusErrorHandler(transaction, clientResponse))
// Response doesn't provide any data in this case
.bodyToMono(String.class))
.then();
}
private String serialize(Transaction transaction) {
// ... serialize to XML text
}
private Mono<Throwable> responseStatusErrorHandler(Transaction transaction, ClientResponse clientResponse) {
return Mono.just(Tuples.of(transaction, clientResponse))
.doOnNext(tuple -> responseStatusErrorMetrics(tuple))
.flatMap(response -> Mono.error(new PushClientException("Sending failed, HTTP status code: " + clientResponse.statusCode())));
}
private void responseStatusErrorMetrics(Tuple2<Transaction, ClientResponse> tuple) {
// ... record some metrics for observability
}
}
片段 5:PriceLogger
public class PriceLogger {
private KafkaSender<String, Transaction> kafkaSender: KafkaSender.create(/* setup params omitted */);
public Mono<Void> log(Transaction transaction) {
return Mono.just(transaction)
.map(this::toSenderRecord)
.transformDeferred(kafkaSender::send)
.then();
}
private SenderRecord<String, Transaction> toSenderRecord(Transaction transaction) {
// ... some converter logic
}
}
片段 6:PushPipeline
public class PushPipeline {
private HotelPriceConsumer hotelPriceConsumer;
private PriceBatchCreator priceBatchCreator;
private PriceBatchToTransactionConverter priceBatchToTransactionConverter;
private HttpClient httpClient;
private PriceLogger priceLogger;
private Disposable mainPipeline;
@PostConstruct
void initMainPipeline() {
// For bootstrapping the pipeline with Spring
// subscribe() does not block the calling thread here!
mainPipeline: mainPipeline().subscribe();
}
private Flux<Void> mainPipeline() {
hotelPriceConsumer
.consumeKafkaTopic()
.transformDeferred(priceBatchCreator::createBatches)
.transformDeferred(priceBatchToTransactionConverter::convert)
.flatMap(transaction -> httpClient
.send(transaction)
.then(priceLogger.log(transaction)))
.onErrorContinue((t, o) -> {/* Some logging + metrics */}));
}
}