Rocketmq发送消息原理(含事务消息)
前言
上一篇文章已经介绍了RocketMQ的功能,架构,从本文开始,我们将开始深入源码层面,一步一步学习RocketMQ设计原理。
在消息队列中,生产者负责发送消息到Broker,本文分享RocketMQ发送消息的实现原理以及一些注意的事项。
一、生产者端的发送流程
一般来说我们的业务应用端是生产者,负责和Broker和nameserver通信完成消息投递的功能。
在源码中,发送消息的主逻辑在
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
中,阅读起来不难,逻辑比较清晰
我把发送端的主线流程理了一张流程图。
主要步骤
1、根据Topic从nameserver或本地获取路由信息,包含topic的队列信息,broker信息等
在RocketMQ的生产者端,使用ConcurrentHashMap
将topic关联的队列信息进行缓存
private final ConcurrentMap topicPublishInfoTable = new ConcurrentHashMap();
2、根据重试次数,循环发送消息
3、使用生产者负载均衡策略(默认轮训),查找需要把消息发送哪个队列
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } }
这里设计比较巧妙,使用到ThreadLocal
记录上次发送的队列索引,消费者轮训负载均衡策略也使用到了该技巧。
4、消息内容组装成RemotingCommand对象,包括请求头和请求体
5、分oneway,sync,async的方式进行发送
6、如果是async,oneway会获取令牌再发送
7、组装请求头,调用netty组件发送消息,最终调用Channel
将消息内容写到socket
8、发送结果处理,这里只有同步发送模式
才直接处理结果
如果是异步,会在发送时指定一个回调函数
,在回调函数中处理结果。
- 回调接口
public interface SendCallback { void onSuccess(final SendResult sendResult); void onException(final Throwable e); }
二、Broker端接收发送消息请求与处理流程
broker这边入口是netty监听客户端消息的地方,在NettyServerHandler
类
class NettyServerHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { //接收客户端消息入口 processMessageReceived(ctx, msg); } } public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { case REQUEST_COMMAND: //消息请求 processRequestCommand(ctx, cmd); break; case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; default: break; } } }
紧接着发送消息的请求处理器
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: return this.consumerSendMsgBack(ctx, request); default: //解析消息头 SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return null; } mqtraceContext = buildMsgContext(ctx, requestHeader); //存储消息前回调函数 this.executeSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response; //消息处理 if (requestHeader.isBatch()) { response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); } //存储消息后回调函数 this.executeSendMessageHookAfter(response, mqtraceContext); return response; } }
消息存储逻辑在org.apache.rocketmq.store.CommitLog#putMessage
最终会将消息写入到commitlog
org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()
public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // 记录消息存储时间 msg.setStoreTimestamp(System.currentTimeMillis()); // Back to Results AppendMessageResult result = null; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); //写消息加锁 putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { //..... //消息内容存储到commitlog result = mappedFile.appendMessage(msg, this.appendMessageCallback); //.... } finally { putMessageLock.unlock(); } PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); //刷盘处理 handleDiskFlush(result, putMessageResult, msg); //高可用处理,将消息同步到从节点 handleHA(result, putMessageResult, msg); return putMessageResult; }
上面追加消息到commitlog,其实还没有真正的持久化
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
而是通过刷盘机制刷新数据到磁盘,判断刷盘方式,如果是同步刷盘,立即刷新缓冲数据到磁盘
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush 同步刷盘 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } } // Asynchronous flush 异步刷盘 else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } } }
这个地方,同步刷盘,设计者使用了多个线程来实现刷盘功能,具有超时的能力,如果指定时间没有刷盘完成会立即返回,不会阻塞请求。
最终调用
java.nio.MappedByteBuffer#force0
或sun.nio.ch.FileDispatcherImpl#force0
从缓冲写入磁盘。
public int flush(final int flushLeastPages) { if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition(); try { //We only append data to fileChannel or mappedByteBuffer, never both. if (writeBuffer != null || this.fileChannel.position() != 0) { //真正的刷新到磁盘 this.fileChannel.force(false); } else { //真正的刷新到磁盘 this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } this.flushedPosition.set(value); this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } return this.getFlushedPosition(); }
通过同步树刷盘异步刷盘可用在一定程度上保证消息不丢失,rocketmq还支持集群模式,主从同步模式支持同步或异步,实现数据在多个节点上备份。
//等5s,如果slave未返回,则超时 public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { HAService service = this.defaultMessageStore.getHaService(); if (messageExt.isWaitStoreMsgOK()) {//判断开关有没有开 // Determine whether to wait 判断一下slave是否正常 if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { //master里面最新的消息偏移量 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); service.getWaitNotifyObject().wakeupAll(); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); } } // Slave problem else { //从节点与主节点同步的数据 超过256M了,slave不行了 // Tell the producer, slave not available putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); } } } }
我将broker端接收发送消息请求后的总体处理过程整理到下面流程图了
核心步骤总结如下:
1、netty接收到请求,转发到SendMessageProcessor
处理器
2、消息头解码
3、判断是是重试消息
,并且判断是否达到最大重试次数
,如果到了则转换topic和队列
,加入死信队列
4、是否是事务消息,转换内置的事务消息topic和queueId
5、不是事务消息,判断是否是延迟消息,是延迟消息
,转换成延迟消息topic(SCHEDULE_TOPIC_XXXX)和队列
(延迟等级-1,延迟等级从1开始到18)
6、创建或获取消息文件,bytebuffer
7、通过bytebuffer写入缓冲
8、如果是SYNC_FLUSH刷盘方式,立即刷盘 ,刷盘类型有同步和异步两种
9、如果Broker的角色设置主从同步是SYNC_MASTER
, 需要同步到从节点,这里是RocketMQ实现高可用
的关键
如果是事务消息,发送流程是怎样的?
在Rocketmq中,事务消息
是用来保证本地事务和发送消息逻辑同时成功的一种机制。
事务消息标记存在消息的properties,第一步是将properties解码
public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
然后将消息发送到一个内置的topic里
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { //暂存真实topic和队列id到properties MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); //topic 转换成内置事务消息topic msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }
在客户端发送事务消息
首先需要实现一个事务消息监听器,实现TransactionListener接口,分别实现本地事务逻辑,检查本地事务状态的逻辑
public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap localTrans = new ConcurrentHashMap(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { //执行本地事务逻辑 int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } //查询本地事务执行结果 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("check msg" + msg.getMsgId() +"===" + LocalDateTime.now().format( DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; default: return LocalTransactionState.COMMIT_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }
然后通过事务消息发送器发送消息
public static void main(String[] args) throws MQClientException, InterruptedException { TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); //指定事务消息发送监听器 TransactionListener transactionListener = new TransactionListenerImpl(); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; try { Message msg = new Message("TopicTest1234", tags[ tags.length], "KEY", ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } producer.shutdown(); }
看完源码,我们再看发送事务消息流程图就容易理解了。
事务消息流程总结
1、客户端使用同步发送半消息
,broker将半消息存储到内置的事务消息topic和队列
,这个时候事务消息不能被消费者消费到。
2、客户端接收发送消息返回,执行本地事务
,然后发送本地事务执行结果到broker
3、broker如果发现成功,将半消息转移到真实topic和队列
,删除半消息
,这个时候事务消息可被消费者消费,如果回滚,直接删除半消息
4、broker启用一个线程
,扫描事务消息topic里的队列里面的消息,判断是否需要检查事务状态(最大检查15次
)
5、通过oneway
方式向客户端发起查询事务状态请求,客户端查询状态,客户端通过oneway发送事务状态到broker,broker执行第3步骤。 oneway的请求是不等待结果的,只管发请求
, 这里使用oneway是为了提高回查效率,避免阻塞,同时客户端接收到事务状态查询请求后,会主动将事务状态发送给broker。
消息发送篇总结
本文分析了消息发送和broker处理消息发送请求的实现,得出结论
1、生产者发送消息会发送到指定的topic队列,默认采用轮训算法
实现发送的负载均衡
2、发送消息类型有3种,分别是同步,异步,单次(oneway)
,其中同步会重试3次
。
3、broker存储消息采用mmap
机制实现零拷贝,刷盘机制支持同步刷盘和异步刷盘
4、broker主从复制模式支持同步复制,异步复制
5、事务消息采用内置topic+消息回查机制实现本地事务和发送逻辑的事务一致性。