写在最前
如果这个项目让你有所收获,记得 Star 关注哦,这对我是非常不错的鼓励与支持。
源码地址(后端):gitee.com/csps/mingyu…
源码地址(前端):gitee.com/csps/mingyu…
文档地址:gitee.com/csps/mingyu…
消息队列
消息队列(Message Queue)是一种用于在分布式系统中进行异步通信的通信模式和技术。它允许不同的组件或服务之间通过发送和接收消息来进行通信,而无需直接耦合它们的实现细节。消息队列通常用于解耦系统的不同部分,提高系统的可伸缩性、可靠性和灵活性。
以下是消息队列的一些关键特点和概念:
使用场景
- 异步通信:允许不同的系统组件异步通信,提高系统的响应性能。
- 解耦组件:降低系统中不同组件之间的耦合,使得系统更容易维护和扩展。
- 负载均衡:通过分发消息给多个消费者来平衡工作负载。
- 消息传递可靠性:确保消息的可靠传递,即使在系统中的故障情况下也能保证不丢失消息。
- 日志和审计:用于记录和审计系统活动,以便后续分析和故障排除。
技术选型
一些常见的消息队列实现包括 RabbitMQ、RocketMQ、Kafka等,选择适合特定应用场景的消息队列是关键,因为它会影响系统的性能、可靠性和可扩展性。不同的场景可能更适合不同的消息队列系统。
基础对比
RabbitMQ | RocketMQ | Kafka | |
---|---|---|---|
推出时间 | 2007年 | 2012年 | 2012年 |
所属 | Pivotal开源,Mozilla | 阿里开源,Apache | Linkin开源,Apache |
社区活跃度 | 高 | 高 | 高 |
开发语言 | Erlang | Java | Scala、Java |
支持的协议 | AMQP | 自己定义一套 | 自行定义一套(基于TCP) |
吞吐量 | 万级(5.95w/s) | 十万级(11.6w/s) | 十万级(17.3w/s) |
topic数量对吞吐量的影响 | topic达到几百,几千个时,吞吐量会有较小幅度的下降 | topic达到几十,几百个时,吞吐量会大幅度下降 | |
时效性 | 微秒级 | 毫秒级 | 毫秒级 |
可用性 | 高(主从架构) | 非常高(分布式架构) | 非常高(分布式架构) |
使用场景 | 适用于各种规模的应用程序,尤其适合需要多语言支持的场景。 | 适用于大规模的企业应用和互联网场景,尤其在阿里巴巴等大型公司中得到广泛应用。 | 适用于大数据处理、实时数据流分析、事件溯源等高吞吐量场景。 |
功能对比
RabbitMQ | RocketMQ | Kafka | |
---|---|---|---|
延迟队列 | ✅ | ✅ | ❌ |
死信队列 | ✅ | ✅ | ❌ |
优先级队列 | ✅ | ❌ | ❌ |
消息回溯 | ❌ | ✅ | ✅ |
消焦持久化 | ✅ | ✅ | ✅ |
消魚确认机制 | 单条 | Offset | Offset |
消息TTL | ✅ | ✅ | ❌ |
消息重复 | 支持at least once、at most once | 支持at least once | 支持at least once、at most once |
消息顺序性 | ❌ | 消费者加锁 | 分区有序 |
消息事务 | ❌ | ✅ | ❌ |
消息过滤 | ❌ | ✅ | ❌ |
消息查询 | ✅ | ✅ | ❌ |
消息重新消费 | ❌ | ✅ | ✅ |
消费模式 | 队列模式 | 广播模式+集群模式 | 流模式 |
消费推拉模式 | Pull、Push | Pull、Push | Pull |
批量发送 | ❌ | ✅ | ✅ |
选型总结
通过对RabbitMQ、RocketMQ、Kafka 基础与功能两个维度对比,本项目将采用 RocketMQ、Kafka 两个消息队列。
RocketMQ 适用场景
- 高性能、高可用性的消息传递场景,例如实时数据分析、电商秒杀等。
- 需要强大的消息过滤和消息追踪功能的场景,例如广告投放、用户推送等。
- 需要分布式事务支持的场景,RocketMQ提供了分布式事务消息特性。
Kafka 适用场景
- 需要高吞吐量和低延迟的实时数据处理场景,例如用户行为日志分析、实时监控等。
- 需要保留大量历史数据并支持数据回溯的场景,例如大数据分析、数据仓库等。
- 需要构建事件驱动架构的场景,Kafka可以作为事件源和消息总线。
Docker 安装 RocketMQ
创建目录结构
具体内容可以参考:mingyue/docker/rocketmq
rocketmq
/broker1
/conf
broker.conf
/logs
README.md
/store
README.md
/namesrv
/logs
README.md
docker-compose.yml
编写 docker-compose rocketmq 服务
version: '3.8'
services:
mingyue-mqnamesrv:
image: apache/rocketmq:4.9.4
container_name: mingyue-mqnamesrv
ports:
- "9876:9876"
environment:
JAVA_OPT: -server -Xms512m -Xmx512m
command: sh mqnamesrv
volumes:
- ./rocketmq/namesrv/logs:/home/rocketmq/logs/rocketmqlogs
mingyue-mqbroker1:
image: apache/rocketmq:4.9.4
container_name: mingyue-mqbroker1
ports:
- "10911:10911"
- "10909:10909"
- "10912:10912"
environment:
JAVA_OPT_EXT: -server -Xms512M -Xmx512M -Xmn256m
command: sh mqbroker -c /home/rocketmq/rocketmq-4.9.4/conf/broker.conf
depends_on:
- mingyue-mqnamesrv
volumes:
- ./rocketmq/broker1/conf/broker.conf:/home/rocketmq/rocketmq-4.9.4/conf/broker.conf
- ./rocketmq/broker1/logs:/home/rocketmq/logs/rocketmqlogs
- ./rocketmq/broker1/store:/home/rocketmq/store
mingyue-mqconsole:
image: styletang/rocketmq-console-ng
container_name: mingyue-mqconsole
ports:
- "19876:19876"
links:
- mingyue-mqnamesrv:mqnamesrv #可以用mqnamesrv这个域名访问rocketmq服务
environment:
JAVA_OPTS: -Dserver.port=19876 -Drocketmq.namesrv.addr=mqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false
depends_on:
- mingyue-mqnamesrv
启动测试
启动前先执行部分目录赋予读写权限,例:
chmod 777 /docker/rocketmq/broker1/logs
访问 mingyue-mqconsole 可以打开 Dashboard 页面即可:http://ip:19876/#/
Docker 安装 Kafka
创建目录结构
具体内容可以参考:mingyue/docker/kafka
kafka
/data
README.md
docker-compose.yml
编写 docker-compose kafka 服务
version: '3.8'
services:
mingyue-zookeeper:
image: 'bitnami/zookeeper:3.8.0'
container_name: mingyue-zookeeper
ports:
- "2181:2181"
environment:
TZ: Asia/Shanghai
ALLOW_ANONYMOUS_LOGIN: "yes"
ZOO_SERVER_ID: 1
ZOO_PORT_NUMBER: 2181
# 自带的控制台 一般用不上可自行开启
ZOO_ENABLE_ADMIN_SERVER: "no"
# 自带控制台的端口
ZOO_ADMIN_SERVER_PORT_NUMBER: 8080
mingyue-kafka:
image: 'bitnami/kafka:3.2.0'
container_name: mingyue-kafka
ports:
- "9092:9092"
environment:
TZ: Asia/Shanghai
# 更多变量 查看文档 https://github.com/bitnami/bitnami-docker-kafka/blob/master/README.md
KAFKA_BROKER_ID: 1
# 监听端口
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
# 实际访问ip 本地用 127 内网用 192 外网用 外网ip
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://宿主机IP:9092
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
ALLOW_PLAINTEXT_LISTENER: "yes"
volumes:
- /docker/kafka/data:/bitnami/kafka/data
depends_on:
- mingyue-zookeeper
links:
- mingyue-zookeeper:zookeeper #可以用zookeeper这个域名访问zookeeper服务
mingyue-kafka-manager:
image: sheepkiller/kafka-manager:latest
container_name: mingyue-kafka-manager
ports:
- "19092:19092"
environment:
ZK_HOSTS: mingyue-zookeeper:2181
APPLICATION_SECRET: letmein
KAFKA_MANAGER_USERNAME: mingyue
KAFKA_MANAGER_PASSWORD: mingyue123
KM_ARGS: -Dhttp.port=19092
depends_on:
- mingyue-kafka
links:
- mingyue-zookeeper:zookeeper #可以用zookeeper这个域名访问zookeeper服务
启动测试
启动前先执行部分目录赋予读写权限,例:chmod 777 /docker/kafka/data`
访问 mingyue-kafka-manager 可以打开 Clusters 页面即可:http://mingyue-mq:19092/
Spring Cloud Stream
Spring Cloud Stream 是一个用于构建与共享消息系统连接的高度可扩展的事件驱动微服务的框架。该框架提供了一个基于已经建立和熟悉的 Spring 成语和最佳实践的灵活编程模型,包括支持持久的 pub/sub 语义、消费者组和有状态分区。
说人话:Spring Cloud Stream 是 Spring 用来整合各种 MQ 中间件的框架。
Spring Cloud Stream的核心构建块
- Destination Binders(目标绑定器) :目标指的是 Kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。
- Destination Bindings(目标绑定) :MQ 中间件与最终用户提供的应用程序代码(生产者/消费者)之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)
- Message(消息) :一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。
Spring Cloud Stream 架构图
Spring Cloud Stream 应用程序由中间件中立的核心组成。该应用程序通过在外部代理暴露的目的地和代码中的输入/输出参数之间建立绑定,与外部世界进行通信。建立绑定所需的经纪人特定细节由特定于中间件的 Binder 实现处理。
- Middleware:消息中间件,如RabbitMQ、Kafka、RocketMQ等。
- Binder:可以认为是适配器,用来将Stream与中间件连接起来,不同的Binder对应不同的中间件,需要我们配置。
- Application:由Stream封装的消息机制,很少自定义开发。
- Inputs:输入,可以自定义开发。
- Outputs:输出,可以自定义开发。
小结
本节介绍了什么是消息队列、以及选择什么样的消息队列,如何对比,最终选择了 Kafka 与 RocketMQ。然后给出了 Docker 一件部署 Kafka 与 RocketMQ 的 docker-compose 脚本。阐述了什么是 Spring Cloud Stream,未来将会使用 Spring Cloud Stream 作为 MQ 中间价的框架。
下面我们就使用 Spring Cloud Stream 来搭建代码与 MQ 之间的桥梁~~~