RocketMQ磁盘存储

2023年 10月 10日 48.5k 0

RocketMQ简介

首先简单介绍一下RocketMQ的整体架构,一个RocketMQ集群主要由四个部分组成:

NameServer集群,Producer,Consumer集群,Broker集群。

NameServer是消息队列中的状态服务器,维护整个RocketMQ集群的状态信息,可以类比成Zookeeper在Kafka集群中的作用。Broker,Producer,Consumer都会和RocketMQ集群连接通信。NameServer可以集群部署,不过NameServer彼此之间是没有进行任何网络通信,整个NameServer集群是无状态的。

Producer和Consumer是消息生产者和消费者,属于客户端,使用Producer向Broker集群发送消息,Consumer从Broker消费消息。

Broker是以一个独立的进程,一般是集群式部署,可以理解为RocketMQ服务端,Producer发送的消息最终会存储到Broker集群上的CommitLog日志上,Consumer消费的位移信息存在再ConsumerQueue日志,RocketMQ客户端支持通过关键字搜索消息,RocketMQ搜索索引数据存储在Broker服务的IndexFile中。

磁盘存储文件

RocketMQ Broker上主要有三种文件类型:

ComimitLog日志: 存储消息日志,每个Broker上所有的Topic的日志都会存储在同一个CommitLog文件中

ConsumerQueue日志:存储每个Topic下某个队列的位移,通过位移可以查找到具体消息日志

IndexFile日志:存储RocketMQ的搜索索引数据

底层存储MappedFile

ComimitLog,ConsumerQueue,IndexFile底层都依赖MappedFile,所有的读写操作都被封装在MappedFile中。

// 内存页大小,默认4K
public static final int OS_PAGE_SIZE = 1024 * 4;

// 映射文件的内存大小,CommitLog default is 1G,ConsumeQueue default is 30W
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// mappedFile文件计数,初始化后+1,摧毁后-1
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// 文件写入位置
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// 文件提交位置
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// 文件刷新位置
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// 文件大小
protected int fileSize;
// 该MappedFile对应的channel
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
// 如果启用了TransientStorePool,则writeBuffer为TransientStorePool获取,
// 此时消息会先写入该writeBuffer,commit到fileChannel,对fileChannel进行flush刷盘
protected ByteBuffer writeBuffer = null;
// ByteBuffer暂存池,没启用则为null
protected TransientStorePool transientStorePool = null;
// 文件名,就是该文件内容默认起始位置
private String fileName;
// 该文件中内容相于整个文件的偏移,和文件名相同
private long fileFromOffset;
// MappedFile对应的实际文件
private File file;
// fileChannel.map得到的可读写的内存映射buffer,如果没有启用
// TransientStorePool则写数据时会写到该缓冲中,刷盘时直接调用该
// 映射buffer的force函数,而不需要进行commit操作
private MappedByteBuffer mappedByteBuffer;
// 存储时间戳
private volatile long storeTimestamp = 0;
// mappedFile是是否是ConsumerQueue的创建的第一个文件
private boolean firstCreateInQueue = false;

MappedFile本质上是对文件读写的封装,每个MappedFile对象对应磁盘上一个具体的File文件对象。RocketMQ日志都是顺序存储,所以写入性能非常高,MappedFile底层基于FileChannel(sendfile)和MappedByteBuffer(mmap)实现顺序读写。

MappedFile写入磁盘

方式一 写入内存字节缓冲区(writeBuffer) 从内存字节缓冲区(writebuffer)提交(commit)到文件通道(fileChannel) 文件通道(fileChannel)flush
方式二 写入映射文件字节缓冲区(mappedByteBuffer) 映射文件字节缓冲区(mappedByteBuffer)flush

MappedFile刷盘方式

考虑到写入性能,满足 flushLeastPages * OS_PAGE_SIZE (默认4KB)才进行 flush刷新操作,在Linux系统下flush是指从PageCache刷新到磁盘上(非mmap方式)。flushLeastPages默认是0,即立即刷新到磁盘,目前只有ConsumerQueue文件刷新是2。

同磁盘写入方式一样,刷盘方式与之对应也有两种方式:

  • 如果使用FileChannel(优先使用),调用FileChannel.force()方法强制刷盘
  • 如果使用MappedByteBuffer,调用MappedByteBuffer.force()方法强制刷盘
  • 其实二者的区别就是在于使用FileChannel还是MappedByteBuffer:

    MappedByteBuffer底层基于内存映射mmap,FileChannel底层基于Linux的"零拷贝"sendfile"。MappedByteBuffer在一次写入很小量数据的场景下才能表现出比 FileChannel 稍微优异的性能。很多人认为:读 4kb 以下的数据请使用 MappedByteBuffer,大于 4kb 以上请使用 FileChannel。

    关于FileChannel和MappedByteBuffer可以看下:论最强IO:MappedByteBuffer VS FileChannel

    索引文件ComimitLog

    CommitLog概述

    和Kafka不同,RocketMQ Broker下不同的Topic都存储在同一个CommitLog,Producer发送到Broker的消息日志会顺序写入到CommitLog中,CommitLog是MQ消息真正的物理存储文件。RocketMQ存储目录下有个commitlog的文件夹,Broker中的日志文件会顺序写入该文件夹下,默认存储路径为:

    ${user.home}/store/commitlog/${fileName}
    

    文件名是以索引偏移量命名的,每台Broker上的CommitLog偏移量都是从0开始的,所以第一个文件名就是000000000000000000。CommitLog文件默认大小是1GB,也就是 10024 * 1024 * 1024B = 1073741824B,第二个文件名为00000000001073741824,偏移量是从1073741824开始的。

    这里的偏移量都是指的物理偏移量,也就是在CommitLog中的真实的物理位置,使用物理偏移量的好处是可以直接定位到消息的位置。除此之外还有一个是逻辑偏移量,可以理解为这条消息是CommitLog中的第几条日志,由于消息是不定长的,给出逻辑地址是不方便直接寻址的。

    CommitLog文件架构

    一个CommitLog下面使用MappedFileQueue持有和管理MappedFile,每个CommitLog都是有一个或多个MappedFile首尾连接形成的,由此可见RocketMQ存储日志的思想:追求极致的顺序读写。

    CommitLog结构

    // Message's MAGIC CODE daa320a7
    public final static int MESSAGE_MAGIC_CODE = -626843481;
    
    // End of file empty MAGIC CODE cbd43194
    protected final static int BLANK_MAGIC_CODE = -875286124;
    /**
         * 用于管理MappedFile
         */
    protected final MappedFileQueue mappedFileQueue;
    protected final DefaultMessageStore defaultMessageStore;
    private final FlushCommitLogService flushCommitLogService;
    
    //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
    private final FlushCommitLogService commitLogService;
    
    private final AppendMessageCallback appendMessageCallback;
    private final ThreadLocal putMessageThreadLocal;
    protected HashMap topicQueueTable = new HashMap(1024);
    protected volatile long confirmOffset = -1L;
    
    private volatile long beginTimeInLock = 0;
    
    protected final PutMessageLock putMessageLock;
    
    序号 字段 说明 数据类型 字节数
    1 TOTALSIZE 消息总长度 int 4
    2 MAGICCODE 魔数,普通:daa320a7空白:cbd43194 int 4
    3 BODYCRC 消息体的crc值 int 4
    4 QUEUEID 消息队列编号 int 4
    5 FLAG MQ请求消息的FLAG值 int 4
    6 QUEUEOFFSET 消息队列中的位移 long 8
    7 PHYSICALOFFSET 物理位移,在 CommitLog 的顺序存储位置 long 8
    8 SYSFLAG int 4
    9 BORNTIMESTAMP long 8
    10 BORNHOST long 8
    11 STORETIMESTAMP long 8
    12 STOREHOSTADDRESS int + int 4+4
    13 RECONSUMETIMES int 4
    14 PreparedTransationOffset long 8
    15 BODYLEN 消息内容长度 int 4
    16 BODY 消息内容 bytes bodyLength
    17 TOPICLEN 话题长度(字节数) int 4
    18 TOPIC RocketMQ当前话题 bytes topicLength
    19 PROPERTIESLEN 属性长度(字节数) int 4
    20 PROPERTIES 属性内容 bytes propertiesLength

    CommitLog存储的是原始消息,长度是不固定的。

    持久化机制

    在 RocketMQ 中提供了三种方式来持久化,对应了三个不同的线程实现,实际使用中只会选择一个。

    • 同步持久化,使用 GroupCommitService。
    • 异步持久化且未开启 TransientStorePool 缓存,使用 FlushRealTimeService。
    • 异步持久化且开启 TransientStorePool 缓存,使用 CommitRealService。

    同步刷盘,写入线程仅负责唤醒落盘线程,将消息转交给存储线程,不会等待消息存储完成就立刻返回。

    从具体实现来看看,落盘线程每隔 10 ms 会检查一次,如果有数据未持久化,便将 page cache 中的数据刷入磁盘。此时操作系统 crash 或者断电,只要生产者使用了可靠发送 (指非 oneway 的 rpc 调用),由于此时对于发送者来说还没有收到成功的响应,那么客户端会进行重试,将消息写入其他可用的节点。

    异步持久化对应的线程是 FlushRealTimeService,实现上又分为固定频率和非固定频率,核心区别是线程是否响应中断。所谓的固定频率是指每次有新的消息到来的时候不管,不响应中断,每隔 500ms(可配置)flush 一次,如果发现未落盘数据不足(默认 16K),直接进入下一个循环,如果数据写入量很少,一直没有填充满16K,就不会落盘了吗?这里还有一个基于时间的兜底方案,即线程发现距离上次写入已经很久了(默认 10 秒),也会执行一次 flush。

    但事实上 FileChannel 还是 MappedByteBuffer 的 force() 方法都不能精确控制写入的数据量,这里的写行为也只是对内核的一种建议。对于非固定频率实现,即每次有新的消息到来的时候,都会发送唤醒信号,当唤醒动作在数据量较大时,存在性能损耗,但消息量较少且情况下实时性好,更省资源。在生产中,具体选择哪种持久化实现由具体的场景决定。是同步写还是多副本异步写来保证数据存储的可靠性,本质上是读写延迟和和成本之间的权衡。

    消费队列ConsumerQueue

    ConsumerQueue概述

    RocketMQ的消息都是按照先来后到,顺序的存储在CommitLog中的,而消费者通常只关心某个Topic下的消息。顺序的查找CommitLog肯定是不现实的,我们可以构建一个索引文件,里面存放着某个Topic下面所有消息在CommitLog中的位置,这样消费者获取消息的时候,只需要先查找这个索引文件,然后再去CommitLog中获取消息就 可以了,在RocketMQ中这个索引文件就是ComsumerQueue。

    同一个Broker下可能会有多个Topic,每个Topic可能会有多个队列,每个队列就对应一个ConsumerQueue,同一个Broker多个ConsumerQueue共享一个CommitLog。与CommitLog类似,每个ConsumerQueue也是使用MappedFileQueue持有和管理多个MappedFile。

    ConsumerQueue整体架构

    ConsumerQueue整体架构如上,ConsumerQueue和下面的IndexFIle本质上可以认为是CommitLog的两种索引:

    ConsumerQueue是通过位移找到原始消息日志,IndexFIle是通过key找到原始消息日志。

    ConsumerQueue日志结构

    public static final int CQ_STORE_UNIT_SIZE = 20;
    // 存储服务
    private final DefaultMessageStore defaultMessageStore;
    
    // 用于管理MappedFile
    private final MappedFileQueue mappedFileQueue;
    // 话题
    private final String topic;
    private final int queueId;
    // 用于数据写入,是否会有线程安全问题
    private final ByteBuffer byteBufferIndex;
    
    private final String storePath;
    private final int mappedFileSize;
    private long maxPhysicOffset = -1;
    private volatile long minLogicOffset = 0;
    private ConsumeQueueExt consumeQueueExt = null;
    
    序号 字段 说明 数据类型 字节数
    1 offset 消息位移,对应commitLog的PHYSICALOFFSET long 8
    2 totalsize 对应commitLog的TOTALSIZE,该条消息的总长度 int 4
    3 tagsCode 消息哈希,基于commitLogPROPERTIES计算哈希值 long 8

    ConsumerQueue的日志是定长的CQ_STORE_UNIT_SIZE = 20字节。整体而言ConsumerQueue是最简单的。

    索引文件IndexFile

    public static final int INDEX_HEADER_SIZE = 40;
    private static int beginTimestampIndex = 0;
    private static int endTimestampIndex = 8;
    private static int beginPhyoffsetIndex = 16;
    private static int endPhyoffsetIndex = 24;
    private static int hashSlotcountIndex = 32;
    private static int indexCountIndex = 36;
    private final ByteBuffer byteBuffer;
    private AtomicLong beginTimestamp = new AtomicLong(0);
    private AtomicLong endTimestamp = new AtomicLong(0);
    private AtomicLong beginPhyOffset = new AtomicLong(0);
    private AtomicLong endPhyOffset = new AtomicLong(0);
    private AtomicInteger hashSlotCount = new AtomicInteger(0);
    
    private AtomicInteger indexCount = new AtomicInteger(1);
    
     // 每个hash slot的大小
    private static int hashSlotSize = 4;
    // 每个index的大小
    private static int indexSize = 20;
    private static int invalidIndex = 0;
    private final int hashSlotNum;
    private final int indexNum;
    private final MappedFile mappedFile;
    // private final FileChannel fileChannel;
    private final MappedByteBuffer mappedByteBuffer;
    private final IndexHeader indexHeader;
    

    IndexHeader结构

    序号 字段 说明 数据类型 字节数
    1 beginTimestamp 当前索引文件中第一条消息存储时间戳 long 8
    2 endTimestamp 最新消息存储时间戳 long 8
    3 beginPhyOffset 第一条消息在commitlog中的偏移量,即commitlog offset long 8
    4 endPhyOffset 最新消息在commitlog中偏移量 long 8
    5 hashSlotCount 含有index的slot数量 int 4
    6 indexCount 包含的索引单元的个数 int 4

    IndexHeader存储的是IndexFile整个索引文件的元数据信息,总长度8 + 8 + 8 + 8 + 4 + 4 = 40B。

    IndexFile结构

    IndexFile索引域

    序号 字段 说明 数据类型 字节数
    1 slotIndex 该哈希表采用头插法,索引槽会指向最新的元素的索引位置,默认配置下slot数量为5000000 int 4

    哈希槽的数量默认配置最大数量为5000000,可以理解为Java中HashMap的数组长度,每个IndexFile创建完成后确定后续生命周期中不能修改。

    IndexFile数据域

    序号 字段 说明 数据类型 字节数
    1 keyHash 本条数据的哈希 int 4
    2 phyOffset 物理位移,commitLog中的位移数据 long 8
    3 timeDiff 当前commitLog的存储时间STORETIMESTAMP和当前IndexHeader中beginTimestamp的差值 int 4
    4 slotIndex 指向上一条数据的索引位置 int 4

    IndexFile整体架构如上图,哈希槽索引数量默认配置为500w个,支持的索引数据最大数量为为2000w个。

    索引数据写入

  • 计算hash值
  • 取索引key,然后用hashCode()计算哈希值keyHash
  • 计算哈希槽位置
  • slotPos = keyHash % hashSlotNum
  • hashSlotNum就是哈希槽的默认配置数量即500w
  • 哈希槽的实际物理位置
  • absSlotPos = INDEX_HEADER_SIZE + slotPos * hashSlotSize
  • INDEX_HEADER_SIZE固定为40B,hashSlotSize固定为4B
  • 哈希槽存储的数据域索引指针
  • 取absSlotPos位置的数据slotValue(为一个int类型)
  • 如果slotValue = 0,说明当前哈希槽没有指向任何数据域
  • 如果slotValue > 0, 说明当前哈希槽已经有指向的数据域
  • 索引数据物理位置
  • absIndexPos = INDEX_HEADER_SIZE + hashSlotNum * hashSlotSize + indexCount * indexSize
  • indexCount是当前哈希表存储的是索引数量,indexSize为索引大小,固定大小为20B
  • 存储索引数据
  • 在absIndexPos存储索引数据,分别存储keyHash,phyOffset,timeDiff,slotIndex(slotValue)
  • 更新IndexHeader中的hashSlotCount,indexCount等字段
  • 索引数据读取

  • 通过key计算索引实际数据物理位置absIndexPos
  • 计算方式与上面的写入逻辑相同
  • 哈希槽存储的数据域索引指针
  • 取absSlotPos位置的数据slotValue(为一个int类型)
  • 如果slotValue = 0,说明当前哈希槽没有指向任何数据域,则该key在当前IndexFile不存在
  • 如果slotValue > 0, 说明当前哈希槽已经有指向的数据域,继续第3步
  • 循环读取每个slot下挂载的的全部item
  • 性能优化

    总结

  • 三种主要的日志文件都是顺序读写,磁盘读写速度远胜随机读写;
  • 每个Broker上全部消息写到同一个CommitLog,这个是借鉴Kafka的失败经验,Kafka每个Topic独占一个消息文件,如果Topic数量过多写入消息时磁头竞争非常激烈,原本的顺序读写也会退化成随机读写;
  • 基于FileChannel和MappedByteBuffer实现零拷贝;
  • 支持消息批处理操作提高性能
  • 等等等
  • ByteBuffer MappedByteBuffer FileChannel
    mmap senfile pagecache
    DirectIO

    相关文章

    服务器端口转发,带你了解服务器端口转发
    服务器开放端口,服务器开放端口的步骤
    产品推荐:7月受欢迎AI容器镜像来了,有Qwen系列大模型镜像
    如何使用 WinGet 下载 Microsoft Store 应用
    百度搜索:蓝易云 – 熟悉ubuntu apt-get命令详解
    百度搜索:蓝易云 – 域名解析成功但ping不通解决方案

    发布评论