从源码角度聊聊RocketMQ 消息(文件)删除机制

2023年 7月 25日 54.7k 0

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

RocketMQ版本

  • 5.1.0

入口

这里消息删除的源码入口其实不太好直接确定,我们可以通过启动broker的代码开始即BrokerStartupmain方法

这里我们直接进入到controller.start();方法看看

里面的代码很长,我们可以看到有一个方法startBasicService();

进入startBasicService();方法后我们可以看到有一个this.messageStore.start();

这个messageStore就是封在了我们的一些消息文件的操作 可以简单看看他的一些方法

由于方法众多,我们回到今天的主线,继续查看start()方法的实现类即DefaultMessageStore

start方法里面我们可以看到一个比较核心的方法this.addScheduleTask(); 看着似乎是启动一些定时任务,没错,我们的消息过期删除就是通过定时任务扫描然后删除的

进入定时任务我们就能看到我们的文件删除定时任务了

可以看到这个定时任务是broker启动后 60s后再启动,每次执行的时间间隔是可以配置的,默认10s

实际的过期文件删除逻辑还是被封在再DefaultMessageStore.this.cleanFilesPeriodically();

cleanFilesPeriodically

我们进入cleanFilesPeriodically方法看看

private void cleanFilesPeriodically() {
        this.cleanCommitLogService.run();
        this.cleanConsumeQueueService.run();
        this.correctLogicOffsetService.run();
    }

可以看到有三个清理任务

  • 清理CommitLog
  • 清理ConsumeQueueindex
  • 更新逻辑偏移量即ConsumeQueueStore
  • cleanCommitLogService

    这里我们先看看CommitLog是如何清理的

    
    public void run() {
                try {
                    this.deleteExpiredFiles();
                    this.reDeleteHangedFile();
                } catch (Throwable e) {
                    DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
                }
            }
    

    这里可以看到又分两步

  • 删除已经失效的文件
  • 再次删除第一次未删除掉的文件(比如文件被其他线程占用)
  • 这里我们先看看deleteExpiredFiles方法

    deleteExpiredFiles

    
    private void deleteExpiredFiles() {
                int deleteCount = 0;
                // 获取配置的过期时间 默认72小时 3天
                long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
                // 获取删除文件的时间间隔 默认100毫秒
                int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
                // 1000 * 120 120s 强制删除文件映射
                int destroyMappedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();         
                // 每次删除文件的最大数量 默认10
                int deleteFileBatchMax = DefaultMessageStore.this.getMessageStoreConfig().getDeleteFileBatchMax();
                //是否是删除时间 默认凌晨 4点(04) 04:00~04:59
                boolean isTimeUp = this.isTimeToDelete();
                // 磁盘是否已满
                boolean isUsageExceedsThreshold = this.isSpaceToDelete();
                boolean isManualDelete = this.manualDeleteFileSeveralTimes > 0;
    
                if (isTimeUp || isUsageExceedsThreshold || isManualDelete) {
    
                    if (isManualDelete) {
                        this.manualDeleteFileSeveralTimes--;
                    }
    
                    boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
    
                    LOGGER.info("begin to delete before {} hours file. isTimeUp: {} isUsageExceedsThreshold: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {} deleteFileBatchMax: {}",
                        fileReservedTime,
                        isTimeUp,
                        isUsageExceedsThreshold,
                        manualDeleteFileSeveralTimes,
                        cleanAtOnce,
                        deleteFileBatchMax);
    
                    fileReservedTime *= 60 * 60 * 1000;
                    // 实际清理逻辑
                    deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                        destroyMappedFileIntervalForcibly, cleanAtOnce, deleteFileBatchMax);
                    if (deleteCount > 0) {
                        // 如果是 controller模式,还要清理 EpochFile
                        // If in the controller mode, we should notify the AutoSwitchHaService to truncateEpochFile
                        if (DefaultMessageStore.this.brokerConfig.isEnableControllerMode()) {
                            if (DefaultMessageStore.this.haService instanceof AutoSwitchHAService) {
                                final long minPhyOffset = getMinPhyOffset();
                                ((AutoSwitchHAService) DefaultMessageStore.this.haService).truncateEpochFilePrefix(minPhyOffset - 1);
                            }
                        }
                    } else if (isUsageExceedsThreshold) {
                        LOGGER.warn("disk space will be full soon, but delete file failed.");
                    }
                }
            }
    

    上面有一些代码都已经注释了,可以看到这里主要是做一些判断,是否徐要清理commitLog 有如下几个条件

  • 是否在配置的时间内,默认4点-5点
  • 是否磁盘已满
  • 是否手动调度,默认执行20次
  • 3这里的条件我们可以在这里确定

    可以看到这个参数主要是给外部控制台去使用的

    三个条件任意满足一个就去执行删除逻辑

    实际的文件删除逻辑是在方法

                    deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                        destroyMappedFileIntervalForcibly, cleanAtOnce, deleteFileBatchMax);
    

    commitLog.deleteExpiredFile

    最终是调用的MappedFileQueuedeleteExpiredFileByTime方法

    
    public int deleteExpiredFileByTime(final long expiredTime,
            final int deleteFilesInterval,
            final long intervalForcibly,
            final boolean cleanImmediately,
            final int deleteFileBatchMax) {
            //获取映射文件列表 commitlog文件可能随时有写入,copy一份不影响写入
            Object[] mfs = this.copyMappedFiles(0);
    
            if (null == mfs)
                return 0;
    
            int mfsLength = mfs.length - 1;
            // 删除的文件数量
            int deleteCount = 0;
            // 需要删除的 MappedFile
            List files = new ArrayList();
            int skipFileNum = 0;
            if (null != mfs) {
                //do check before deleting
                checkSelf();
                for (int i = 0; i = liveMaxTimestamp || cleanImmediately) {
                        if (skipFileNum > 0) {
                            log.info("Delete CommitLog {} but skip {} files", mappedFile.getFileName(), skipFileNum);
                        }
                        // 真正的删除逻辑
                        if (mappedFile.destroy(intervalForcibly)) {
                            files.add(mappedFile);
                            deleteCount++;
    
                            if (files.size() >= deleteFileBatchMax) {
                                break;
                            }
    
                            if (deleteFilesInterval > 0 && (i + 1)  this.lastPhysicalMinOffset) {
                    this.lastPhysicalMinOffset = minOffset;
                     // 获取消费队列表
                    ConcurrentMap tables = DefaultMessageStore.this.getConsumeQueueTable();
    
                    for (ConcurrentMap maps : tables.values()) {
                        for (ConsumeQueueInterface logic : maps.values()) {
                            //删除过期文件,并返回删除的文件数量
                            int deleteCount = DefaultMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minOffset);
                            if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
                                try {
                                    //当上一个ConsumeQueue成功删除之后,下一个ConsumeQueue删除需要等待0.1s
                                    Thread.sleep(deleteLogicsFilesInterval);
                                } catch (InterruptedException ignored) {
                                // 忽略中断异常
                                }
                            }
                        }
                    }
    
                    DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
                }
            }
    

    这里最终的删除逻辑还是在deleteExpiredFile中封在的

    DefaultMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minOffset);
    

    实际和上面commitLog删除类似,这里我们不过多深究MappedByteBuffer这种偏底层的细节。主要是了解主流程,后面再单独聊聊一些有深度的细节

    idnex删除

    实际我们在queue里面删除逻辑中会看到这个么一个方法

    DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
    

    没错这里面就是删除索引index的文件

    public void deleteExpiredFile(long offset) {
            Object[] files = null;
            try {
                // 获取读锁
                this.readWriteLock.readLock().lock();
                // 如果索引文件列表为空,则直接返回
                if (this.indexFileList.isEmpty()) {
                    return;
                }
                // 获取第一个索引文件的结束物理偏移量
                long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();    
                // 如果第一个索引文件的结束物理偏移量小于commitLog的最小偏移量,说明索引无效需要删除
                if (endPhyOffset 

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论