本地docker搭建RocketMQ用于测试参考# Docker 部署 RocketMQ 5.x
RocketMQ官方的Python客户端是 rocketmq-client-python
这个客户端只支持Linux和macOS,并且需要安装额外的C++库支持。
以centos为例,安装rocketmq-client-cpp库:
wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
安装rocketmq-client-python:
pip install rocketmq-client-python
RocketMQ生产者
from rocketmq.client import Producer, Message
producer = Producer('PID-XXX') # 创建生产者, 生产者组PID-XXX, 生产消息无序
producer.set_name_server_address('127.0.0.1:9876') # 连接到RocketMQ名称服务
# 设置会话凭证, ACL认证用户名, ACL认证密码, 通道名标识连接来源(一般为消费者应用程序名)
producer.set_session_credentials("access_key", "access_secret", "channel")
producer.start() # 启动生产者
msg = Message('YOUR-TOPIC') # 指定目标Topic
msg.set_keys('XXX') # 设置消息key
msg.set_tags('XXX') # 设置消息标签, 用于服务端过滤消息
msg.set_body('XXXX') # 设置消息体, 一般二进制传输
ret = producer.send_sync(msg) # 阻塞等待消息发送完成
print(ret.status, ret.msg_id, ret.offset) # 打印生产结果
producer.shutdown() # 关闭生产者
RocketMQ消费者
import time
from rocketmq.client import PushConsumer, ConsumeStatus, ReceivedMessage
# 消息处理回调函数
def callback(msg: ReceivedMessage):
print(msg.id, msg.body, msg.tags, msg.keys)
return ConsumeStatus.CONSUME_SUCCESS
# 创建消费者, 消费者组PUBLIC_GROUP_APP1, 有序消费消息
consumer = PushConsumer('PUBLIC_GROUP_APP1', orderly=True)
# 连接到RocketMQ名称服务
consumer.set_name_server_address('127.0.0.1:9876')
# 设置订阅主题TOP_COMMON, 消息处理回调函数callback, 消息过滤标签ALARM_TAG
consumer.subscribe('TOP_COMMON', callback, "ALARM_TAG")
# 设置会话凭证, ACL认证用户名, ACL认证密码, 通道名标识连接来源(一般为消费者应用程序名)
consumer.set_session_credentials("MQPublicUser", "12345678", "ALARM_CENTER")
# 启动消费者, 此时consumer创建子线程持续消费消息并调用callback处理
consumer.start()
# 主线程阻塞1s后停止消费者
time.sleep(1)
consumer.shutdown()
参考资料
Apache RocketMQ
rocketmq-client-python