消息存储文件
rocketMQ的消息持久化在我们在搭建集群时都特意指定的文件存储路径,进入指定的store目录下就可以看到。
下面介绍各文件含义
CommitLog
存储消息的元数据。produce发出的所有消息都会顺序存入到CommitLog文件当中。 CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。
ConsumerQueue
对CommitLog做索引,把消息按照Topic、队列进行归类并存储在ConsumerQueue中,但是存储的并不是消息本身,而是消息在CommitLog的索引。ConsumerQueue中存储的有消息的offset、size、Tag等等,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog,方便消费者组快速定位到对应的消息!
Index
类似于ConsumerQueue,也是对CommitLog做索引,与ConsumerQueue不同的是:为消息查询提供了一种通过key或时间区间来查询消息的方法,也是记录消息的offset、size、Tag等等
abort
这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建abort文件,在正常关闭服务时删除。
但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除。
因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作。
checkpoint
数据存盘检查点,存储CommitLog文件最后一次刷盘时间戳、consumerquueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。
config/*.json:
这些文件是将RocketMQ的一些关键配置信息以能看懂的json形式进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等一些信息。
其中consumerOffset.json是集群模式下的消费者的消费进度。
{
"offsetTable":{
"ojbk@ojbkConsumer":{0:1,1:1,2:1,3:1,4:1,5:1,6:1,7:1
},
"%RETRY%1048@1048":{0:0
},
"%RETRY%rnmConsumer@rnmConsumer":{0:293
},
"tyrant@group1":{0:11,1:9,2:8,3:11,4:11,5:8,6:9,7:9
},
"tyrant@1048":{0:15,1:13,2:12,3:15,4:15,5:12,6:13,7:13
},
"%RETRY%group1@group1":{0:1608
},
"ok@okConsumer":{0:1,1:1,2:1,3:1,4:1,5:1,6:1,7:1
},
"%RETRY%ojbkConsumer@ojbkConsumer":{0:6
},
"rnm@rnmConsumer":{0:20,1:20,2:20,3:20,4:20,5:20,6:20,7:20
},
"%RETRY%okConsumer@okConsumer":{0:0
}
}
}
.rocketmq_offsets
C:Users彭方亮.rocketmq_offsets172.18.95.180@DEFAULT
其中consumerOffset.json是广播模式下的消费者的消费进度。
比如C:Users彭方亮.rocketmq_offsets172.18.95.180@DEFAULTtyrantConsumeroffsets.json
{
"offsetTable":{{
"brokerName":"broker-a",
"queueId":2,
"topic":"tyrantor"
}:2,{
"brokerName":"broker-a",
"queueId":3,
"topic":"tyrantor"
}:2,{
"brokerName":"broker-a",
"queueId":4,
"topic":"tyrantor"
}:2,{
"brokerName":"broker-a",
"queueId":5,
"topic":"tyrantor"
}:2,{
"brokerName":"broker-a",
"queueId":6,
"topic":"tyrantor"
}:2,{
"brokerName":"broker-a",
"queueId":7,
"topic":"tyrantor"
}:2,{
"brokerName":"broker-a",
"queueId":0,
"topic":"tyrantor"
}:2,{
"brokerName":"broker-a",
"queueId":1,
"topic":"tyrantor"
}:2
}
}
消息存储流程分析
消息存储流程
1.要发送的消息,会按顺序写入commitlog中,这里所有topic和queue共享一个文件
2.存入commitlog后,由于消息会按照topic纬度来消费,会异步构建consumeQueue(逻辑队列)和index(索引文件),consumeQueue存储消息的commitlogOffset/messageSize/tagHashCode, 方便定位commitlog中的消息实体。
3.每个 Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。索引文件(Index)提供消息检索的能力,主要在问题排查和数据统计等场景应用
4.消费者会从consumeQueue取到msgOffset,方便快速取出消息
RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题消息,保证消息存储是完全的顺序写,但这样给文件读取带来了不便。
为此RocketMQ为了方便消息消费构建了消息消费队列文件ConsumeQueue,基于主题与队列进行组织。
同时RocketMQ为消息实现了Hash索引文件IndexFile,可以为消息设置索引键,根据所以能够快速从CommitLog文件中检索消息。
当消息达到CommitLog后,会通过ReputMessageService线程接近实时地将消息转发给消息消费队列文件ConsumeQueue与索引文件IndexFile。
对于生产者将消息写入broker的时候,broker会直接把消息写入到磁盘的commitLog文件,那么broker是如何提升整个过程的性能的呢?
问题分析下:因为这个部分性能会直接提升broker处理消息写入的吞吐量,比如写入一条消息到commitLog磁盘文件假设需要10ms,那么每个线程每秒可以处理100个写入消息,假设有100个线程,每秒钟只能处理1万个写入消息请求。 但是如果将消息写入commitLog磁盘文件的性能优化为只需要1ms,那么每个线程每秒可以处理1000个消息写入,此时100个线程可以处理10万个写入请求,所以明显的可以看到,broker将接收到的消息写入commitLog磁盘文件的性能,对他的TPS有很大的影响.
铺垫【broker是基于OS操作系统的pageCache和顺序写两个机制,来提升commitLog文件的性能的】;
首先broker是以顺序的方式将消息写入commitLog磁盘文件的,也就是每次写入就是在文件的末尾追加一条数据就可以了,文件顺序写的性能要比随机写的性能提升很多,另外,数据写入commitLog文件的时候,其实不是直接写入底层的物理磁盘文件的,而是先进入OS的pagecache内存缓存中,然后后续由OS的后台线程选择一个时间,异步化的将OSPageCache内存缓冲中的数据刷入底层的磁盘文件。
commitLog优化思路总结:采用磁盘文件顺序写+OSPageCache缓存写入+OS异步刷盘的策略,基本上可以让消息写入commitLog的性能和直接写入内存是差不多的,所以broker才可以让broker高吞吐的处理每秒大量的消息写入。异步刷盘可能会导致消息数据丢失,简单提一嘴同步刷盘的机制:同步刷盘就是生产者发送一条消息出去,broker接收到了消息,必须直接强制的将这条消息刷入底层的物理磁盘中,然后返回ack给producer生产者,此时才知道消息写入成功了,只要消息进入了物理磁盘,数据就一定不会丢失,但是性能受了极大的影响;
通过上述优化,就实现了一个效果,写磁盘文件的时候都是进入pageCache的,保证写入的高性能,同时尽可能多的通过map+madvise的映射后预热机制,将磁盘文件中的数据尽可能多的加载到pageCache中来,对consumequeue,commitLog进行读取的时候,才能达到尽可能从内存中读取数据;
实际上在broker读写磁盘的时候,大量的将mmap技术和pagecache技术结合使用,通过mmap技术减少数据拷贝次数,利用pagecache技术尽可能有限读写内存,而不是物理磁盘;
消息顺序存储好处
1.CommitLog 顺序写 ,可以大大提高写人效率,提高堆积能力
2.虽然是随机读,但是利用操作系统的pagecache机制,可以批量地从磁盘读取,作为cache存到内存中,加速后续的读取速度
3.在实际情况中,大部分的 ConsumeQueue能够被全部读入内存,所以这个中间结构的操作速度很快, 可以认为是内存读取的速度
commitlog文件
- 存放该broke所有topic的消息
- 默认1G大小
- 以偏移量为文件名,当一个文件写满时则创建新文件,这样的设计主要是方便根据消息的物理偏移量,快速定位到消息所在的物理文件
- 一个消息存储单元是不定长的
- 顺序写但是支持随机读
消息单元的存储结构
下面的表格说明了,每个消息体不是定长的,会存储消息的哪些内容,包括物理偏移量、consumeQueue的偏移量、消息体等信息
顺序 字段名 说明
1 totalSize(4Byte) 消息大小
2 magicCode(4) 设置为daa320a7 (这个不太明白)
3 bodyCRC(4) 当broker重启recover时会校验
4 queueId(4) 消息对应的consumeQueueId
5 flag(4) rocketmq不做处理,只存储后透传
6 queueOffset(8) 消息在consumeQueue中的偏移量
7 physicalOffset(8) 消息在commitlog中的偏移量
8 sysFlg(4) 事务类型标识
9 bronTimestamp(8) 消息产生端(producer)的时间戳
10 bronHost(8) 消息产生端(producer)地址(address:port)
11 storeTimestamp(8) 消息在broker存储时间
12 storeHostAddress(8) 消息存储到broker的地址(address:port)
13 reconsumeTimes(4) 消息重试次数
14 preparedTransactionOffset(8) 事务消息的物理偏移量
15 bodyLength(4) 消息长度,最长不超过4MB
16 body(body length Bytes) 消息体内容
17 topicLength(1) 主题长度,最长不超过255Byte
18 topic(topic length Bytes) 主题内容
19 propertiesLength(2) 消息属性长度,最长不超过65535Bytes
20 properties(消息属性长度个字节) 消息属性内容
NOT_TYPE/PREPARED_TYPE/COMMIT_TYPE/ROLLBACK_TYPE
consumequeue文件
- 按topic和queueId纬度分别存储消息commitLogOffset、size、tagHashCode
- 以偏移量为文件名
- 一个存储单元是20个字节的定长的
- 顺序读顺序写
- 每个ConsumeQueue文件大小约5.72M
每个Topic下的每个MessageQueue都有一个对应的ConsumeQueue文件 该结构对应于消费者逻辑队列,为什么要将一个topic抽象出很多的queue呢?
这样的话,对集群模式更有好处,可以使多个消费者共同消费,而不用上锁。
消息单元的存储结构
顺序 | 字段名 | 说明 |
---|---|---|
1 | offset(8) | commitlog的偏移量 |
2 | size(4) | commitlog消息大小 |
3 | tagHashCode | tag的哈希值 |
index索引文件
- 以时间作为文件名
- 一个index存储单元是20个字节定长的
索引文件(Index)提供消息检索的能力,主要在问题排查和数据统计等场景应用
存储单元的结构
顺序 | 字段名 | 说明 |
---|---|---|
1 | keyHash(4) | key的hashcode |
2 | phyOffset(8) | commitLog真实的物理位移 |
3 | timeOffset(4) | 时间偏移量 |
4 | slotValue(4) | 下一个记录的slot值 |
消息存储源码解析
commitLog写入
参考:【RocketMQ源码学习】消息存储机制
DefaultMessageStore#putMessage
在RocketMQ的Broker启动时,会初始化一个核心组件messageStore.start();这个组件作为消息的存储组件,负责接受Produce发来的消息并保存到commitLog文件中,这个组件最终会调用DefaultMessageStore类中的putMessage()方法,这个方法是消息存储的核心!
// DefaultMessageStore#putMessage
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
// 判断该服务是否shutdown,不可用直接返回【代码省略】
// 判断broke的角色,如果是从节点直接返回【代码省略】
// 判断runningFlags是否是可写状态,不可写直接返回,可写把printTimes设为0【代码省略】
// 判断topic名字是否大于byte字节127, 大于则直接返回【代码省略】
// 判断msg中properties属性长度是否大于short最大长度32767,大于则直接返回【代码省略】
if (this.isOSPageCacheBusy()) { // 判断操作系统页写入是否繁忙
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessage(msg); // $2 查看下方代码,写msg核心
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500) {
log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
}
// 记录写commitlog时间,大于最大时间则设置为这个最新的时间
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk()) {
// 记录写commitlog 失败次数
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
commitLog.putMessage
在方法内部又会调用commitLog.putMessage(msg)方法,如下:
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
//获取消息的事务类型
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); // $1
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
//对于事务消息中UNKNOW、COMMIT消息,处理topic和queueId,
//同时备份real_topic,real_queueId
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // $2
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
//获取最新的mappedFile文件,有可能为空
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // $3
//给写mappedFile加锁,默认使用的是自旋锁PutMessageSpinLock
//AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
//死循环加锁: this.putMessageSpinLock.compareAndSet(true, false);
//解锁: this.putMessageSpinLock.compareAndSet(false, true);
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config // $4
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
msg.setStoreTimestamp(beginLockTimestamp);
//mappedFile为空时创建mappedFile文件, 创建的mappedFile文件offset为0
//文件名是文件大小
if (null == mappedFile || mappedFile.isFull()) { // $5
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
}
if (null == mappedFile) {
log.error("create mapped file1 error");
beginTimeInLock = 0;
return new PutMessageResult
(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
//在mappedFile中append消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback); // $6
//转换写入结果
switch (result.getStatus()) { // $7
//写入成功直接break
case PUT_OK:
break;
//文件剩下的空间不够写了,重新创建一个mappedFile文件, 重新写消息
case END_OF_FILE:
unlockMappedFile = mappedFile;
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
log.error("create mapped file2 error");
beginTimeInLock = 0;
return new PutMessageResult
(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
//msg超过大小统一返回MESSAGE_ILLEGAL
case MESSAGE_SIZE_EXCEEDED:
//properties超出大小统一返回MESSAGE_ILLEGAL
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
//未知错误,返回错误类型
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
elapsedTimeInLock = this.defaultMessageStore
.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
//解锁
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
//执行刷盘
handleDiskFlush(result, putMessageResult, msg); // $8
//执行主从同步
handleHA(result, putMessageResult, msg); // $9
return putMessageResult;
}
进入commitLog类中的putMessage()方法,方法中先对延时消息进行处理。
然后拿到虚拟内存中的文件(使用零拷贝实现),使用顺序写入的方式把消息追加到虚拟内存里,
在追加时使用lock保证同时只有一个线程往OScache内存写入消息!
mappedFile.appendMessage方法中是真正的写入逻辑
进入mappedFile.appendMessage方法中看一下具体的写入逻辑:就是包装消息的各种附加信息,例如msgId、offset等等,并把这些信息一并写入虚拟内存
由于CommitLog文件有1G的大小限制,当虚拟内存中的CommitLog被写满时,会创建一个新CommitLog文件继续写入。写入的只是虚拟内存,还要进行文件刷盘和主从同步
分发ConsumeQueue和IndexFile
Broker启动时会启动一个消息存储的核心组件messageStore。当CommitLog写入一条消息后,在DefaultMessageStore的start方法中,会启动一个后台线程reputMessageService每隔1毫秒就会去拉取CommitLog中最新更新的一批消息,然后分别转发到ComsumeQueue和IndexFile里去。
过期文件删除
默认情况下, Broker会启动后台线程,每60秒,检查CommitLog、ConsumeQueue文件。然后对超过72小时的数据进行删除。也就是说,默认情况下, RocketMQ只会保存3天内的数据。这个时间可以通过fileReservedTime来配置。注意删除时,并不会检查消息是否被消费了。
刷盘机制
我们之前简单提过一次,写入CommitLog的数据进入到MappedFile映射的一块内存里之后,后续会执行刷盘策略
比如是同步刷盘还是异步刷盘,如果是同步刷盘,那么此时就会直接把内存里的数据写入磁盘文件,如果是异步刷盘,那么就是过一段时间之后,再把数据刷入磁盘文件里去
那么今天我们来看看底层到底是如何执行不同的刷盘策略的。
大家应该还记得之前我们说过,往CommitLog里写数的时候,是调用的CommitLog类的putMessage0这个方法
没错的,其实在这个方法的末尾有两行代码,很关键的,大家看一下下面的源码片段.
大家会发现在末尾有两个方法调用,一个是handleDishFlush0,一个是handleHA0
顾名思义,一个就是用于决定如何进行刷盘的,一个是用于决定如何把消息同步给Slave Broker的。
关于消息如何同步给Slave Broker,这个我们就不看了,因为涉及到Broker高可用机制,这里展开说就太多了,其实大家有兴趣可以自己慢慢去研究,我们这里主要就是讲解一些RocketMQ的核心源码原理。
所以我们重点进入到handleDiskFlush0方法里去,看看他是如何处理刷盘的。
刷盘即存盘,刷盘机制是指生产者生产消息到rocketMQ后存入硬盘的方式,RocketMQ需要将消息存储到磁盘上,这样才能保证断电后消息不会丢失。同时这样才可以让存储的消息量可以超出内存的限制。RocketMQ为了提高性能,会尽量保证磁盘的顺序写。消息在写入磁盘时,有两种写磁盘的方式。
同步刷盘:只有在消息真正持久化至磁盘后,RocketMQ的Broker端才会真正地返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用领域。
异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
异步和同步刷盘的区别在于,异步刷盘时,主线程并不会阻塞,在将刷盘线程唤醒后,就会继续往下执行。
上面代码我们就看的很清晰了,同步刷盘的策略是如何处理的.
其实上面就是构建了一个GroupCommitRequest,然后提交给了GroupCommitService去进行处理,然后调用request.waitForFlush0方法等待同步刷盘成功
万一刷盘失败了,就打印日志。具体剧盘是由GroupCommitService执行的,他的doCommit0方法最终会执行同步刷盘的逻辑,里面有如下代码。
上面那行代码一层一层调用下去,最终刷盘其实是靠的MappedByteBuffer的force0方法,如下所示
这个MappedByteBuffer就是JDK NIO包下的API,他的force0方法就是强追把你写入内存的数据刷入到磁盘文件里去,到此就是同步刷盘成功了
那么如果是异步刷盘呢? 我们先看CommitLog.handleDiskFlush里的的代码片段
其实这里就是唤醒了一个flushCommitLogService组件,那么他是什么呢?
FlushCommitLogService其实是一个线程,他是个抽象父类,他的子类是CommitRealTimeService,所以真正唤醒的是他的子类代表的线程。
具体在子类线程的run0方法里就有定时刷新的逻辑,这里就不赘述了。
其实简单来说,就是每隔一定时间执行一次刷盘,最大间隔是10s,所以一且执行异步刷盘,那么最多就是10秒就会执行一次刷盘。
好了,到此为止,我们把Commitlog的同步刷盘和异步刷盘两种策略的核心源码也讲解完了。我们主要是讲解的核心
源码,而源码里很多细节不可能一行一行进行分析,大家可以顺着文中的思路继续探究
主从复制
如果Broker以一个集群的方式部署,会有一个master节点和多个slave节点,消息需要从Master复制到Slave上。而消息复制的方式分为同步复制和异步复制。
同步复制: 同步复制是等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态。在同步复制下,如果Master节点故障,Slave上有全部的数据备份,这样容易恢复数据。但是同步复制会增大数据写入的延迟,降低系统的吞吐量。
异步复制: 异步复制是只要master写入消息成功,就反馈给客户端写入成功的状态。然后再异步的将消息复制给Slave节点。在异步复制下,系统拥有较低的延迟和较高的吞吐量。但是如果master节点故障,而有些数据没有完成复制,就会造成数据丢失。
配置方式如下:
参考链接:blog.csdn.net/qq_45076180…
mq消息存储相关链接
深入研究一下Broker是如何持久化存储消息
32张图带你解决RocketMQ所有场景问题
putMessage为什么要加锁
broker处理消息commit时加锁应该使用自旋锁还是重入锁