RocketMQ简介
首先简单介绍一下RocketMQ的整体架构,一个RocketMQ集群主要由四个部分组成:
NameServer集群,Producer,Consumer集群,Broker集群。
NameServer是消息队列中的状态服务器,维护整个RocketMQ集群的状态信息,可以类比成Zookeeper在Kafka集群中的作用。Broker,Producer,Consumer都会和RocketMQ集群连接通信。NameServer可以集群部署,不过NameServer彼此之间是没有进行任何网络通信,整个NameServer集群是无状态的。
Producer和Consumer是消息生产者和消费者,属于客户端,使用Producer向Broker集群发送消息,Consumer从Broker消费消息。
Broker是以一个独立的进程,一般是集群式部署,可以理解为RocketMQ服务端,Producer发送的消息最终会存储到Broker集群上的CommitLog日志上,Consumer消费的位移信息存在再ConsumerQueue日志,RocketMQ客户端支持通过关键字搜索消息,RocketMQ搜索索引数据存储在Broker服务的IndexFile中。
磁盘存储文件
RocketMQ Broker上主要有三种文件类型:
ComimitLog日志: 存储消息日志,每个Broker上所有的Topic的日志都会存储在同一个CommitLog文件中
ConsumerQueue日志:存储每个Topic下某个队列的位移,通过位移可以查找到具体消息日志
IndexFile日志:存储RocketMQ的搜索索引数据
底层存储MappedFile
ComimitLog,ConsumerQueue,IndexFile底层都依赖MappedFile,所有的读写操作都被封装在MappedFile中。
// 内存页大小,默认4K
public static final int OS_PAGE_SIZE = 1024 * 4;
// 映射文件的内存大小,CommitLog default is 1G,ConsumeQueue default is 30W
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// mappedFile文件计数,初始化后+1,摧毁后-1
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// 文件写入位置
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// 文件提交位置
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// 文件刷新位置
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// 文件大小
protected int fileSize;
// 该MappedFile对应的channel
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
// 如果启用了TransientStorePool,则writeBuffer为TransientStorePool获取,
// 此时消息会先写入该writeBuffer,commit到fileChannel,对fileChannel进行flush刷盘
protected ByteBuffer writeBuffer = null;
// ByteBuffer暂存池,没启用则为null
protected TransientStorePool transientStorePool = null;
// 文件名,就是该文件内容默认起始位置
private String fileName;
// 该文件中内容相于整个文件的偏移,和文件名相同
private long fileFromOffset;
// MappedFile对应的实际文件
private File file;
// fileChannel.map得到的可读写的内存映射buffer,如果没有启用
// TransientStorePool则写数据时会写到该缓冲中,刷盘时直接调用该
// 映射buffer的force函数,而不需要进行commit操作
private MappedByteBuffer mappedByteBuffer;
// 存储时间戳
private volatile long storeTimestamp = 0;
// mappedFile是是否是ConsumerQueue的创建的第一个文件
private boolean firstCreateInQueue = false;
MappedFile本质上是对文件读写的封装,每个MappedFile对象对应磁盘上一个具体的File文件对象。RocketMQ日志都是顺序存储,所以写入性能非常高,MappedFile底层基于FileChannel(sendfile)和MappedByteBuffer(mmap)实现顺序读写。
MappedFile写入磁盘
方式一 | 写入内存字节缓冲区(writeBuffer) | 从内存字节缓冲区(writebuffer)提交(commit)到文件通道(fileChannel) | 文件通道(fileChannel)flush |
---|---|---|---|
方式二 | 写入映射文件字节缓冲区(mappedByteBuffer) | 映射文件字节缓冲区(mappedByteBuffer)flush |
MappedFile刷盘方式
考虑到写入性能,满足 flushLeastPages * OS_PAGE_SIZE (默认4KB)才进行 flush刷新操作,在Linux系统下flush是指从PageCache刷新到磁盘上(非mmap方式)。flushLeastPages默认是0,即立即刷新到磁盘,目前只有ConsumerQueue文件刷新是2。
同磁盘写入方式一样,刷盘方式与之对应也有两种方式:
其实二者的区别就是在于使用FileChannel还是MappedByteBuffer:
MappedByteBuffer底层基于内存映射mmap,FileChannel底层基于Linux的"零拷贝"sendfile"。MappedByteBuffer在一次写入很小量数据的场景下才能表现出比 FileChannel 稍微优异的性能。很多人认为:读 4kb 以下的数据请使用 MappedByteBuffer,大于 4kb 以上请使用 FileChannel。
关于FileChannel和MappedByteBuffer可以看下:论最强IO:MappedByteBuffer VS FileChannel
索引文件ComimitLog
CommitLog概述
和Kafka不同,RocketMQ Broker下不同的Topic都存储在同一个CommitLog,Producer发送到Broker的消息日志会顺序写入到CommitLog中,CommitLog是MQ消息真正的物理存储文件。RocketMQ存储目录下有个commitlog的文件夹,Broker中的日志文件会顺序写入该文件夹下,默认存储路径为:
${user.home}/store/commitlog/${fileName}
文件名是以索引偏移量命名的,每台Broker上的CommitLog偏移量都是从0开始的,所以第一个文件名就是000000000000000000。CommitLog文件默认大小是1GB,也就是 10024 * 1024 * 1024B = 1073741824B,第二个文件名为00000000001073741824,偏移量是从1073741824开始的。
这里的偏移量都是指的物理偏移量,也就是在CommitLog中的真实的物理位置,使用物理偏移量的好处是可以直接定位到消息的位置。除此之外还有一个是逻辑偏移量,可以理解为这条消息是CommitLog中的第几条日志,由于消息是不定长的,给出逻辑地址是不方便直接寻址的。
CommitLog文件架构
一个CommitLog下面使用MappedFileQueue持有和管理MappedFile,每个CommitLog都是有一个或多个MappedFile首尾连接形成的,由此可见RocketMQ存储日志的思想:追求极致的顺序读写。
CommitLog结构
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = -626843481;
// End of file empty MAGIC CODE cbd43194
protected final static int BLANK_MAGIC_CODE = -875286124;
/**
* 用于管理MappedFile
*/
protected final MappedFileQueue mappedFileQueue;
protected final DefaultMessageStore defaultMessageStore;
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
private final FlushCommitLogService commitLogService;
private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal putMessageThreadLocal;
protected HashMap topicQueueTable = new HashMap(1024);
protected volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0;
protected final PutMessageLock putMessageLock;
序号 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | TOTALSIZE | 消息总长度 | int | 4 |
2 | MAGICCODE | 魔数,普通:daa320a7空白:cbd43194 | int | 4 |
3 | BODYCRC | 消息体的crc值 | int | 4 |
4 | QUEUEID | 消息队列编号 | int | 4 |
5 | FLAG | MQ请求消息的FLAG值 | int | 4 |
6 | QUEUEOFFSET | 消息队列中的位移 | long | 8 |
7 | PHYSICALOFFSET | 物理位移,在 CommitLog 的顺序存储位置 | long | 8 |
8 | SYSFLAG | int | 4 | |
9 | BORNTIMESTAMP | long | 8 | |
10 | BORNHOST | long | 8 | |
11 | STORETIMESTAMP | long | 8 | |
12 | STOREHOSTADDRESS | int + int | 4+4 | |
13 | RECONSUMETIMES | int | 4 | |
14 | PreparedTransationOffset | long | 8 | |
15 | BODYLEN | 消息内容长度 | int | 4 |
16 | BODY | 消息内容 | bytes | bodyLength |
17 | TOPICLEN | 话题长度(字节数) | int | 4 |
18 | TOPIC | RocketMQ当前话题 | bytes | topicLength |
19 | PROPERTIESLEN | 属性长度(字节数) | int | 4 |
20 | PROPERTIES | 属性内容 | bytes | propertiesLength |
CommitLog存储的是原始消息,长度是不固定的。
持久化机制
在 RocketMQ 中提供了三种方式来持久化,对应了三个不同的线程实现,实际使用中只会选择一个。
- 同步持久化,使用 GroupCommitService。
- 异步持久化且未开启 TransientStorePool 缓存,使用 FlushRealTimeService。
- 异步持久化且开启 TransientStorePool 缓存,使用 CommitRealService。
同步刷盘,写入线程仅负责唤醒落盘线程,将消息转交给存储线程,不会等待消息存储完成就立刻返回。
从具体实现来看看,落盘线程每隔 10 ms 会检查一次,如果有数据未持久化,便将 page cache 中的数据刷入磁盘。此时操作系统 crash 或者断电,只要生产者使用了可靠发送 (指非 oneway 的 rpc 调用),由于此时对于发送者来说还没有收到成功的响应,那么客户端会进行重试,将消息写入其他可用的节点。
异步持久化对应的线程是 FlushRealTimeService,实现上又分为固定频率和非固定频率,核心区别是线程是否响应中断。所谓的固定频率是指每次有新的消息到来的时候不管,不响应中断,每隔 500ms(可配置)flush 一次,如果发现未落盘数据不足(默认 16K),直接进入下一个循环,如果数据写入量很少,一直没有填充满16K,就不会落盘了吗?这里还有一个基于时间的兜底方案,即线程发现距离上次写入已经很久了(默认 10 秒),也会执行一次 flush。
但事实上 FileChannel 还是 MappedByteBuffer 的 force() 方法都不能精确控制写入的数据量,这里的写行为也只是对内核的一种建议。对于非固定频率实现,即每次有新的消息到来的时候,都会发送唤醒信号,当唤醒动作在数据量较大时,存在性能损耗,但消息量较少且情况下实时性好,更省资源。在生产中,具体选择哪种持久化实现由具体的场景决定。是同步写还是多副本异步写来保证数据存储的可靠性,本质上是读写延迟和和成本之间的权衡。
消费队列ConsumerQueue
ConsumerQueue概述
RocketMQ的消息都是按照先来后到,顺序的存储在CommitLog中的,而消费者通常只关心某个Topic下的消息。顺序的查找CommitLog肯定是不现实的,我们可以构建一个索引文件,里面存放着某个Topic下面所有消息在CommitLog中的位置,这样消费者获取消息的时候,只需要先查找这个索引文件,然后再去CommitLog中获取消息就 可以了,在RocketMQ中这个索引文件就是ComsumerQueue。
同一个Broker下可能会有多个Topic,每个Topic可能会有多个队列,每个队列就对应一个ConsumerQueue,同一个Broker多个ConsumerQueue共享一个CommitLog。与CommitLog类似,每个ConsumerQueue也是使用MappedFileQueue持有和管理多个MappedFile。
ConsumerQueue整体架构
ConsumerQueue整体架构如上,ConsumerQueue和下面的IndexFIle本质上可以认为是CommitLog的两种索引:
ConsumerQueue是通过位移找到原始消息日志,IndexFIle是通过key找到原始消息日志。
ConsumerQueue日志结构
public static final int CQ_STORE_UNIT_SIZE = 20;
// 存储服务
private final DefaultMessageStore defaultMessageStore;
// 用于管理MappedFile
private final MappedFileQueue mappedFileQueue;
// 话题
private final String topic;
private final int queueId;
// 用于数据写入,是否会有线程安全问题
private final ByteBuffer byteBufferIndex;
private final String storePath;
private final int mappedFileSize;
private long maxPhysicOffset = -1;
private volatile long minLogicOffset = 0;
private ConsumeQueueExt consumeQueueExt = null;
序号 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | offset | 消息位移,对应commitLog的PHYSICALOFFSET | long | 8 |
2 | totalsize | 对应commitLog的TOTALSIZE,该条消息的总长度 | int | 4 |
3 | tagsCode | 消息哈希,基于commitLogPROPERTIES计算哈希值 | long | 8 |
ConsumerQueue的日志是定长的CQ_STORE_UNIT_SIZE = 20字节。整体而言ConsumerQueue是最简单的。
索引文件IndexFile
public static final int INDEX_HEADER_SIZE = 40;
private static int beginTimestampIndex = 0;
private static int endTimestampIndex = 8;
private static int beginPhyoffsetIndex = 16;
private static int endPhyoffsetIndex = 24;
private static int hashSlotcountIndex = 32;
private static int indexCountIndex = 36;
private final ByteBuffer byteBuffer;
private AtomicLong beginTimestamp = new AtomicLong(0);
private AtomicLong endTimestamp = new AtomicLong(0);
private AtomicLong beginPhyOffset = new AtomicLong(0);
private AtomicLong endPhyOffset = new AtomicLong(0);
private AtomicInteger hashSlotCount = new AtomicInteger(0);
private AtomicInteger indexCount = new AtomicInteger(1);
// 每个hash slot的大小
private static int hashSlotSize = 4;
// 每个index的大小
private static int indexSize = 20;
private static int invalidIndex = 0;
private final int hashSlotNum;
private final int indexNum;
private final MappedFile mappedFile;
// private final FileChannel fileChannel;
private final MappedByteBuffer mappedByteBuffer;
private final IndexHeader indexHeader;
IndexHeader结构
序号 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | beginTimestamp | 当前索引文件中第一条消息存储时间戳 | long | 8 |
2 | endTimestamp | 最新消息存储时间戳 | long | 8 |
3 | beginPhyOffset | 第一条消息在commitlog中的偏移量,即commitlog offset | long | 8 |
4 | endPhyOffset | 最新消息在commitlog中偏移量 | long | 8 |
5 | hashSlotCount | 含有index的slot数量 | int | 4 |
6 | indexCount | 包含的索引单元的个数 | int | 4 |
IndexHeader存储的是IndexFile整个索引文件的元数据信息,总长度8 + 8 + 8 + 8 + 4 + 4 = 40B。
IndexFile结构
IndexFile索引域
序号 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | slotIndex | 该哈希表采用头插法,索引槽会指向最新的元素的索引位置,默认配置下slot数量为5000000 | int | 4 |
哈希槽的数量默认配置最大数量为5000000,可以理解为Java中HashMap的数组长度,每个IndexFile创建完成后确定后续生命周期中不能修改。
IndexFile数据域
序号 | 字段 | 说明 | 数据类型 | 字节数 |
---|---|---|---|---|
1 | keyHash | 本条数据的哈希 | int | 4 |
2 | phyOffset | 物理位移,commitLog中的位移数据 | long | 8 |
3 | timeDiff | 当前commitLog的存储时间STORETIMESTAMP和当前IndexHeader中beginTimestamp的差值 | int | 4 |
4 | slotIndex | 指向上一条数据的索引位置 | int | 4 |
IndexFile整体架构如上图,哈希槽索引数量默认配置为500w个,支持的索引数据最大数量为为2000w个。
索引数据写入
索引数据读取
性能优化
总结
ByteBuffer | MappedByteBuffer | FileChannel |
---|---|---|
mmap | senfile | pagecache |
DirectIO |