RocketMQ Python客户端: rocketclientpython

2023年 9月 12日 40.3k 0

本地docker搭建RocketMQ用于测试参考# Docker 部署 RocketMQ 5.x

RocketMQ官方的Python客户端是 rocketmq-client-python

这个客户端只支持Linux和macOS,并且需要安装额外的C++库支持。

以centos为例,安装rocketmq-client-cpp库:

wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm

安装rocketmq-client-python:

pip install rocketmq-client-python

RocketMQ生产者

from rocketmq.client import Producer, Message

producer = Producer('PID-XXX') # 创建生产者, 生产者组PID-XXX, 生产消息无序
producer.set_name_server_address('127.0.0.1:9876') # 连接到RocketMQ名称服务
# 设置会话凭证, ACL认证用户名, ACL认证密码, 通道名标识连接来源(一般为消费者应用程序名)
producer.set_session_credentials("access_key", "access_secret", "channel")
producer.start() # 启动生产者

msg = Message('YOUR-TOPIC') # 指定目标Topic
msg.set_keys('XXX') # 设置消息key
msg.set_tags('XXX') # 设置消息标签, 用于服务端过滤消息
msg.set_body('XXXX') # 设置消息体, 一般二进制传输
ret = producer.send_sync(msg) # 阻塞等待消息发送完成
print(ret.status, ret.msg_id, ret.offset) # 打印生产结果
producer.shutdown() # 关闭生产者

RocketMQ消费者

import time

from rocketmq.client import PushConsumer, ConsumeStatus, ReceivedMessage


# 消息处理回调函数
def callback(msg: ReceivedMessage):
    print(msg.id, msg.body, msg.tags, msg.keys)
    return ConsumeStatus.CONSUME_SUCCESS


# 创建消费者, 消费者组PUBLIC_GROUP_APP1, 有序消费消息
consumer = PushConsumer('PUBLIC_GROUP_APP1', orderly=True)
# 连接到RocketMQ名称服务
consumer.set_name_server_address('127.0.0.1:9876')
# 设置订阅主题TOP_COMMON, 消息处理回调函数callback, 消息过滤标签ALARM_TAG
consumer.subscribe('TOP_COMMON', callback, "ALARM_TAG")
# 设置会话凭证, ACL认证用户名, ACL认证密码, 通道名标识连接来源(一般为消费者应用程序名)
consumer.set_session_credentials("MQPublicUser", "12345678", "ALARM_CENTER")
# 启动消费者, 此时consumer创建子线程持续消费消息并调用callback处理
consumer.start()
# 主线程阻塞1s后停止消费者
time.sleep(1)
consumer.shutdown()

参考资料

Apache RocketMQ

rocketmq-client-python

相关文章

服务器端口转发,带你了解服务器端口转发
服务器开放端口,服务器开放端口的步骤
产品推荐:7月受欢迎AI容器镜像来了,有Qwen系列大模型镜像
如何使用 WinGet 下载 Microsoft Store 应用
百度搜索:蓝易云 – 熟悉ubuntu apt-get命令详解
百度搜索:蓝易云 – 域名解析成功但ping不通解决方案

发布评论