029从零搭建微服务消息队列(一)

2023年 9月 27日 53.6k 0

写在最前

如果这个项目让你有所收获,记得 Star 关注哦,这对我是非常不错的鼓励与支持。

源码地址(后端):gitee.com/csps/mingyu…

源码地址(前端):gitee.com/csps/mingyu…

文档地址:gitee.com/csps/mingyu…

消息队列

消息队列(Message Queue)是一种用于在分布式系统中进行异步通信的通信模式和技术。它允许不同的组件或服务之间通过发送和接收消息来进行通信,而无需直接耦合它们的实现细节。消息队列通常用于解耦系统的不同部分,提高系统的可伸缩性、可靠性和灵活性。

以下是消息队列的一些关键特点和概念:

  • 消息生产者(Producer): 这是向消息队列发送消息的组件或应用程序。生产者将消息发送到队列中,通常包括一些有关消息内容的元数据。
  • 消息队列(Queue): 这是用于存储消息的中间件组件,消息在这里排队等待被处理。消息队列通常支持不同的消息传递模式,例如先进先出(FIFO)或发布/订阅模式。
  • 消息消费者(Consumer): 这是从消息队列接收消息并进行处理的组件或应用程序。消费者订阅特定队列,并在有新消息可用时接收并处理它们。
  • 消息代理(Message Broker): 这是协调消息的发送和接收的中间件服务。消息代理通常负责消息的路由、传递和确保消息的可靠性。
  • 消息确认(Acknowledgment): 消费者在成功处理消息后,通常会向消息队列发送确认,以告知队列消息已被处理。这确保了消息不会被重复处理。
  • 消息持久性(Message Durability): 消息队列通常支持消息的持久性,这意味着即使在消息被传递给消费者之后,消息仍然会在系统中存储,以确保不会丢失。
  • 消息超时(Message Timeout): 有时候,消息队列会设置消息的超时时间,以确保消息在一定时间内被处理,否则可能会被认为是过期消息。
  • 发布/订阅模式(Publish/Subscribe): 这是一种消息传递模式,其中生产者将消息发布到一个主题(topic),而不是特定的队列,然后多个消费者订阅该主题以接收消息。这种模式支持广播消息。
  • 使用场景

    • 异步通信:允许不同的系统组件异步通信,提高系统的响应性能。
    • 解耦组件:降低系统中不同组件之间的耦合,使得系统更容易维护和扩展。
    • 负载均衡:通过分发消息给多个消费者来平衡工作负载。
    • 消息传递可靠性:确保消息的可靠传递,即使在系统中的故障情况下也能保证不丢失消息。
    • 日志和审计:用于记录和审计系统活动,以便后续分析和故障排除。

    技术选型

    一些常见的消息队列实现包括 RabbitMQ、RocketMQ、Kafka等,选择适合特定应用场景的消息队列是关键,因为它会影响系统的性能、可靠性和可扩展性。不同的场景可能更适合不同的消息队列系统。

    基础对比

    RabbitMQ RocketMQ Kafka
    推出时间 2007年 2012年 2012年
    所属 Pivotal开源,Mozilla 阿里开源,Apache Linkin开源,Apache
    社区活跃度
    开发语言 Erlang Java Scala、Java
    支持的协议 AMQP 自己定义一套 自行定义一套(基于TCP)
    吞吐量 万级(5.95w/s) 十万级(11.6w/s) 十万级(17.3w/s)
    topic数量对吞吐量的影响 topic达到几百,几千个时,吞吐量会有较小幅度的下降 topic达到几十,几百个时,吞吐量会大幅度下降
    时效性 微秒级 毫秒级 毫秒级
    可用性 高(主从架构) 非常高(分布式架构) 非常高(分布式架构)
    使用场景 适用于各种规模的应用程序,尤其适合需要多语言支持的场景。 适用于大规模的企业应用和互联网场景,尤其在阿里巴巴等大型公司中得到广泛应用。 适用于大数据处理、实时数据流分析、事件溯源等高吞吐量场景。

    功能对比

    RabbitMQ RocketMQ Kafka
    延迟队列
    死信队列
    优先级队列
    消息回溯
    消焦持久化
    消魚确认机制 单条 Offset Offset
    消息TTL
    消息重复 支持at least once、at most once 支持at least once 支持at least once、at most once
    消息顺序性 消费者加锁 分区有序
    消息事务
    消息过滤
    消息查询
    消息重新消费
    消费模式 队列模式 广播模式+集群模式 流模式
    消费推拉模式 Pull、Push Pull、Push Pull
    批量发送

    选型总结

    通过对RabbitMQ、RocketMQ、Kafka 基础与功能两个维度对比,本项目将采用 RocketMQ、Kafka 两个消息队列。

    RocketMQ 适用场景

    • 高性能、高可用性的消息传递场景,例如实时数据分析、电商秒杀等。
    • 需要强大的消息过滤和消息追踪功能的场景,例如广告投放、用户推送等。
    • 需要分布式事务支持的场景,RocketMQ提供了分布式事务消息特性。

    Kafka 适用场景

    • 需要高吞吐量和低延迟的实时数据处理场景,例如用户行为日志分析、实时监控等。
    • 需要保留大量历史数据并支持数据回溯的场景,例如大数据分析、数据仓库等。
    • 需要构建事件驱动架构的场景,Kafka可以作为事件源和消息总线。

    Docker 安装 RocketMQ

    创建目录结构

    具体内容可以参考:mingyue/docker/rocketmq

    rocketmq
      /broker1
        /conf
          broker.conf
        /logs
          README.md
        /store
          README.md
      /namesrv
        /logs
          README.md
    docker-compose.yml
    

    编写 docker-compose rocketmq 服务

    version: '3.8'
    services:
      mingyue-mqnamesrv:
        image: apache/rocketmq:4.9.4
        container_name: mingyue-mqnamesrv
        ports:
          - "9876:9876"
        environment:
          JAVA_OPT: -server -Xms512m -Xmx512m
        command: sh mqnamesrv
        volumes:
          - ./rocketmq/namesrv/logs:/home/rocketmq/logs/rocketmqlogs
    ​
      mingyue-mqbroker1:
        image: apache/rocketmq:4.9.4
        container_name: mingyue-mqbroker1
        ports:
          - "10911:10911"
          - "10909:10909"
          - "10912:10912"
        environment:
          JAVA_OPT_EXT: -server -Xms512M -Xmx512M -Xmn256m
        command: sh mqbroker -c /home/rocketmq/rocketmq-4.9.4/conf/broker.conf
        depends_on:
          - mingyue-mqnamesrv
        volumes:
          - ./rocketmq/broker1/conf/broker.conf:/home/rocketmq/rocketmq-4.9.4/conf/broker.conf
          - ./rocketmq/broker1/logs:/home/rocketmq/logs/rocketmqlogs
          - ./rocketmq/broker1/store:/home/rocketmq/store
    ​
      mingyue-mqconsole:
        image: styletang/rocketmq-console-ng
        container_name: mingyue-mqconsole
        ports:
          - "19876:19876"
        links:
          - mingyue-mqnamesrv:mqnamesrv #可以用mqnamesrv这个域名访问rocketmq服务
        environment:
          JAVA_OPTS: -Dserver.port=19876 -Drocketmq.namesrv.addr=mqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false
        depends_on:
          - mingyue-mqnamesrv
    

    启动测试

    启动前先执行部分目录赋予读写权限,例:chmod 777 /docker/rocketmq/broker1/logs

    访问 mingyue-mqconsole 可以打开 Dashboard 页面即可:http://ip:19876/#/

    Docker 安装 Kafka

    创建目录结构

    具体内容可以参考:mingyue/docker/kafka

    kafka
      /data
        README.md
    docker-compose.yml
    

    编写 docker-compose kafka 服务

    version: '3.8'
    services:
      mingyue-zookeeper:
        image: 'bitnami/zookeeper:3.8.0'
        container_name: mingyue-zookeeper
        ports:
          - "2181:2181"
        environment:
          TZ: Asia/Shanghai
          ALLOW_ANONYMOUS_LOGIN: "yes"
          ZOO_SERVER_ID: 1
          ZOO_PORT_NUMBER: 2181
          # 自带的控制台 一般用不上可自行开启
          ZOO_ENABLE_ADMIN_SERVER: "no"
          # 自带控制台的端口
          ZOO_ADMIN_SERVER_PORT_NUMBER: 8080
    ​
      mingyue-kafka:
        image: 'bitnami/kafka:3.2.0'
        container_name: mingyue-kafka
        ports:
          - "9092:9092"
        environment:
          TZ: Asia/Shanghai
          # 更多变量 查看文档 https://github.com/bitnami/bitnami-docker-kafka/blob/master/README.md
          KAFKA_BROKER_ID: 1
          # 监听端口
          KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
          # 实际访问ip 本地用 127 内网用 192 外网用 外网ip
          KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://宿主机IP:9092
          KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
          ALLOW_PLAINTEXT_LISTENER: "yes"
        volumes:
          - /docker/kafka/data:/bitnami/kafka/data
        depends_on:
          - mingyue-zookeeper
        links:
          - mingyue-zookeeper:zookeeper #可以用zookeeper这个域名访问zookeeper服务
    ​
      mingyue-kafka-manager:
        image: sheepkiller/kafka-manager:latest
        container_name: mingyue-kafka-manager
        ports:
          - "19092:19092"
        environment:
          ZK_HOSTS: mingyue-zookeeper:2181
          APPLICATION_SECRET: letmein
          KAFKA_MANAGER_USERNAME: mingyue
          KAFKA_MANAGER_PASSWORD: mingyue123
          KM_ARGS: -Dhttp.port=19092
        depends_on:
          - mingyue-kafka
        links:
          - mingyue-zookeeper:zookeeper #可以用zookeeper这个域名访问zookeeper服务
    

    启动测试

    启动前先执行部分目录赋予读写权限,例:chmod 777 /docker/kafka/data`

    访问 mingyue-kafka-manager 可以打开 Clusters 页面即可:http://mingyue-mq:19092/

    Spring Cloud Stream

    Spring Cloud Stream 是一个用于构建与共享消息系统连接的高度可扩展的事件驱动微服务的框架。该框架提供了一个基于已经建立和熟悉的 Spring 成语和最佳实践的灵活编程模型,包括支持持久的 pub/sub 语义、消费者组和有状态分区。

    说人话:Spring Cloud Stream 是 Spring 用来整合各种 MQ 中间件的框架。

    Spring Cloud Stream的核心构建块

    • Destination Binders(目标绑定器) :目标指的是 Kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。
    • Destination Bindings(目标绑定) :MQ 中间件与最终用户提供的应用程序代码(生产者/消费者)之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)
    • Message(消息) :一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。

    Spring Cloud Stream 架构图

    Spring Cloud Stream 应用程序由中间件中立的核心组成。该应用程序通过在外部代理暴露的目的地和代码中的输入/输出参数之间建立绑定,与外部世界进行通信。建立绑定所需的经纪人特定细节由特定于中间件的 Binder 实现处理。

    • Middleware:消息中间件,如RabbitMQ、Kafka、RocketMQ等。
    • Binder:可以认为是适配器,用来将Stream与中间件连接起来,不同的Binder对应不同的中间件,需要我们配置。
    • Application:由Stream封装的消息机制,很少自定义开发。
    • Inputs:输入,可以自定义开发。
    • Outputs:输出,可以自定义开发。

    小结

    本节介绍了什么是消息队列、以及选择什么样的消息队列,如何对比,最终选择了 Kafka 与 RocketMQ。然后给出了 Docker 一件部署 Kafka 与 RocketMQ 的 docker-compose 脚本。阐述了什么是 Spring Cloud Stream,未来将会使用 Spring Cloud Stream 作为 MQ 中间价的框架。

    下面我们就使用 Spring Cloud Stream 来搭建代码与 MQ 之间的桥梁~~~

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论