这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
RocketMQ版本
- 5.1.0
入口
这里消息删除的源码入口其实不太好直接确定,我们可以通过启动broker
的代码开始即BrokerStartup
的main
方法
这里我们直接进入到controller.start();
方法看看
里面的代码很长,我们可以看到有一个方法startBasicService();
进入startBasicService();
方法后我们可以看到有一个this.messageStore.start();
这个messageStore
就是封在了我们的一些消息文件的操作 可以简单看看他的一些方法
由于方法众多,我们回到今天的主线,继续查看start()
方法的实现类即DefaultMessageStore
在start
方法里面我们可以看到一个比较核心的方法this.addScheduleTask();
看着似乎是启动一些定时任务,没错,我们的消息过期删除就是通过定时任务扫描然后删除的
进入定时任务我们就能看到我们的文件删除定时任务了
可以看到这个定时任务是broker启动后 60s后再启动,每次执行的时间间隔是可以配置的,默认10s
实际的过期文件删除逻辑还是被封在再DefaultMessageStore.this.cleanFilesPeriodically();
中
cleanFilesPeriodically
我们进入cleanFilesPeriodically
方法看看
private void cleanFilesPeriodically() {
this.cleanCommitLogService.run();
this.cleanConsumeQueueService.run();
this.correctLogicOffsetService.run();
}
可以看到有三个清理任务
CommitLog
ConsumeQueue
、index
ConsumeQueueStore
cleanCommitLogService
这里我们先看看CommitLog
是如何清理的
public void run() {
try {
this.deleteExpiredFiles();
this.reDeleteHangedFile();
} catch (Throwable e) {
DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e);
}
}
这里可以看到又分两步
这里我们先看看deleteExpiredFiles
方法
deleteExpiredFiles
private void deleteExpiredFiles() {
int deleteCount = 0;
// 获取配置的过期时间 默认72小时 3天
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
// 获取删除文件的时间间隔 默认100毫秒
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
// 1000 * 120 120s 强制删除文件映射
int destroyMappedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// 每次删除文件的最大数量 默认10
int deleteFileBatchMax = DefaultMessageStore.this.getMessageStoreConfig().getDeleteFileBatchMax();
//是否是删除时间 默认凌晨 4点(04) 04:00~04:59
boolean isTimeUp = this.isTimeToDelete();
// 磁盘是否已满
boolean isUsageExceedsThreshold = this.isSpaceToDelete();
boolean isManualDelete = this.manualDeleteFileSeveralTimes > 0;
if (isTimeUp || isUsageExceedsThreshold || isManualDelete) {
if (isManualDelete) {
this.manualDeleteFileSeveralTimes--;
}
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
LOGGER.info("begin to delete before {} hours file. isTimeUp: {} isUsageExceedsThreshold: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {} deleteFileBatchMax: {}",
fileReservedTime,
isTimeUp,
isUsageExceedsThreshold,
manualDeleteFileSeveralTimes,
cleanAtOnce,
deleteFileBatchMax);
fileReservedTime *= 60 * 60 * 1000;
// 实际清理逻辑
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMappedFileIntervalForcibly, cleanAtOnce, deleteFileBatchMax);
if (deleteCount > 0) {
// 如果是 controller模式,还要清理 EpochFile
// If in the controller mode, we should notify the AutoSwitchHaService to truncateEpochFile
if (DefaultMessageStore.this.brokerConfig.isEnableControllerMode()) {
if (DefaultMessageStore.this.haService instanceof AutoSwitchHAService) {
final long minPhyOffset = getMinPhyOffset();
((AutoSwitchHAService) DefaultMessageStore.this.haService).truncateEpochFilePrefix(minPhyOffset - 1);
}
}
} else if (isUsageExceedsThreshold) {
LOGGER.warn("disk space will be full soon, but delete file failed.");
}
}
}
上面有一些代码都已经注释了,可以看到这里主要是做一些判断,是否徐要清理commitLog
有如下几个条件
3这里的条件我们可以在这里确定
可以看到这个参数主要是给外部控制台去使用的
三个条件任意满足一个就去执行删除逻辑
实际的文件删除逻辑是在方法
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMappedFileIntervalForcibly, cleanAtOnce, deleteFileBatchMax);
commitLog.deleteExpiredFile
最终是调用的MappedFileQueue
的deleteExpiredFileByTime
方法
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately,
final int deleteFileBatchMax) {
//获取映射文件列表 commitlog文件可能随时有写入,copy一份不影响写入
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return 0;
int mfsLength = mfs.length - 1;
// 删除的文件数量
int deleteCount = 0;
// 需要删除的 MappedFile
List files = new ArrayList();
int skipFileNum = 0;
if (null != mfs) {
//do check before deleting
checkSelf();
for (int i = 0; i = liveMaxTimestamp || cleanImmediately) {
if (skipFileNum > 0) {
log.info("Delete CommitLog {} but skip {} files", mappedFile.getFileName(), skipFileNum);
}
// 真正的删除逻辑
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++;
if (files.size() >= deleteFileBatchMax) {
break;
}
if (deleteFilesInterval > 0 && (i + 1) this.lastPhysicalMinOffset) {
this.lastPhysicalMinOffset = minOffset;
// 获取消费队列表
ConcurrentMap tables = DefaultMessageStore.this.getConsumeQueueTable();
for (ConcurrentMap maps : tables.values()) {
for (ConsumeQueueInterface logic : maps.values()) {
//删除过期文件,并返回删除的文件数量
int deleteCount = DefaultMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minOffset);
if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
try {
//当上一个ConsumeQueue成功删除之后,下一个ConsumeQueue删除需要等待0.1s
Thread.sleep(deleteLogicsFilesInterval);
} catch (InterruptedException ignored) {
// 忽略中断异常
}
}
}
}
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
}
}
这里最终的删除逻辑还是在deleteExpiredFile
中封在的
DefaultMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minOffset);
实际和上面commitLog
删除类似,这里我们不过多深究MappedByteBuffer
这种偏底层的细节。主要是了解主流程,后面再单独聊聊一些有深度的细节
idnex删除
实际我们在queue里面删除逻辑中会看到这个么一个方法
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
没错这里面就是删除索引index的文件
public void deleteExpiredFile(long offset) {
Object[] files = null;
try {
// 获取读锁
this.readWriteLock.readLock().lock();
// 如果索引文件列表为空,则直接返回
if (this.indexFileList.isEmpty()) {
return;
}
// 获取第一个索引文件的结束物理偏移量
long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
// 如果第一个索引文件的结束物理偏移量小于commitLog的最小偏移量,说明索引无效需要删除
if (endPhyOffset