哈喽,大家好,我是了不起。
在实际的企业开发中,消息中间件是至关重要的组件之一。如常见的RabbitMQ和Kafka,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,这时候 Spring Cloud Stream 给我们提供了一种解耦合的方式。
简介
Spring Cloud Stream 由一个中间件中立的核组成。
应用通过 Spring Cloud Stream 插入的Input(相当于消费者Consumer,它是从队列中接收消息的)和Output(相当于生产者Producer,它是从队列中发送消息的。)通道与外界交流。
通道通过指定中间件的Binder实现与外部代理连接。
业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。
详细介绍
核心概念
Spring Cloud Stream 为各大消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
Spring Cloud Stream 提供了很多抽象和基础组件来简化消息驱动型微服务应用。包含以下内容:
- Spring Cloud Stream的应用模型
- 绑定抽象
- 持久化发布/订阅支持
- 消费者组支持
- 分片支持(Partitioning Support)
- 可插拔API
应用模型
Spring Cloud Stream由一个中立的中间件内核组成。Spring Cloud Stream会注入输入和输出的channels,应用程序通过这些channels与外界通信,而channels则是通过一个明确的中间件Binder与外部brokers连接。
图片
各大消息中间件的绑定抽象
Spring Cloud Stream 提供对Kafka、Rabbit MQ、Redis、Gemfire的Binder实现。Spring Cloud Stream还包括了一个TestSupportBinder、TestSupportBinder预留一个未更改的channel以便于直接地、可靠地和channels通信。
分区支持
分区在有状态处理中是一个很重要的概念,其重要性体现在性能和一致性上,要确保所有相关数据被一并处理,例如,在时间窗平均计算的例子中,给定传感器测量结果应该都由同一应用实例进行计算。
Spring Cloud Stream支持在一个应用程序的多个实例之间数据分区,在分区的情况下,物理通信介质(例如,topic代理)被视为多分区结构。一个或多个生产者应用程序实例将数据发送给多个消费应用实例,并保证共同的特性的数据由相同的消费者实例处理。
Spring Cloud Stream 提供了一个通用的抽象,用于统一方式进行分区处理,因此分区可以用于自带分区的代理(如Kafka)或者不带分区的代理(如RabbieMQ)
编程模型
Spring Cloud Stream 提供了一些预定义的注解,用于绑定输入和输出channels,以及如何监听channels。
通过@EnableBinding触发绑定
将@EnableBinding注解添加到应用的配置类,就可以把一个spring应用转换成Spring Cloud Stream应用,@EnableBinding注解本身就包含@Configuration注解,会触发Spring Cloud Stream 基本配置。
@Import(...)
@Configuration
@EnableIntegration
public @interface EnableBinding {
...
Class[] value() default {};
}
@Input 与 @Output
一个Spring Cloud Stream应用可以有任意数目的input和output通道,后者通过@Input和@Output注解在接口中定义。
@StreamListener
定义在方法中,被修饰的方法注册为消息中间件上数据流的事件监听器,注解中属性值对应了监听的消息通道名。
Source、Sink和Processor
Spring Cloud Stream提供了三个开箱即用的预定义接口。
- Source用于有单个输出(outbound)通道的应用。
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
- Sink用于有单个输入(inbound)通道的应用。
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
- Processor用于单个应用同时包含输入和输出通道的情况。
public interface Processor extends Source, Sink {
}
极简实例
下面是一个非常简单的 SpringBootApplication应用,通过依赖Spring Cloud Stream,从Input通道监听消息然后返回应答到Output通道,只要添加配置文件就可以应用。
@SpringBootApplication
@EnableBinding(Processor.class)
public class ServiceApplication {
public static void main(String[] args) {
SpringApplication.run(MyLoggerServiceApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public LogMessage enrichLogMessage(LogMessage log) {
return new LogMessage(String.format("[1]: %s", log.getMessage()));
}
}
下面解释下这个示例中相关注解的应用:
- @EnableBinding声明了这个应用程序绑定了2个通道:INPUT和OUTPUT。这2个通道是在接口Processor中定义的(Spring Cloud Stream默认设置)。所有通道都是配置在一个具体的消息中间件或绑定器中。
- @StreamListener(Processor.INPUT)表明这里在input中提取消息,并且处理。
- @SendTo(Processor.OUTPUT)表明在output中返回消息。
其他特性
消息发送失败的处理
消息发送失败后悔发送到默认的一个“topic.errors"的channel中(topic是配置的destination)。要配置消息发送失败的处理,需要将错误消息的channel打开。
消费者配置如下
spring:
application:
name: spring-cloud-stream-producer
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
output:
producer:
group: test
sync: true
bindings:
output:
destination: stream-test-topic
content-type: text/plain # 内容格式。这里使用 JSON
producer:
errorChannelEnabled: true
在启动类中配置错误消息的Channel信息
@Bean("stream-test-topic.errors")
MessageChannel testoutPutErrorChannel(){
return new PublishSubscribeChannel();
}
新建异常处理service
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@Service
public class ErrorProducerService {
@ServiceActivator(inputChannel = "stream-test-topic.errors")
public void receiveProducerError(Message message){
System.out.println("receive error msg :"+message);
}
}
当发生异常时,由于测试类中已经将异常捕获,处理发送异常主要是在这里进行。
总结
这篇文章根据 Spring Cloud Stream 的官方文档,对Stream做了一个整体的介绍,包括设计目标,应用场景,业务模型以及对外开放的注解,希望大家能够学以致用。