结合源码聊一聊为何线上RocketMQ偶尔出现system busy

2023年 10月 12日 44.8k 0

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

RocketMQ 版本

  • 5.1.0

背景

继之前研究过的RocketMQ发送消息还有这种坑?遇到SYSTEM_BUSY不重试?

今天我们来分析分析RocketMQ什么情况下会出现system busy,因为线上的RocketMQ集群偶尔会出现[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while。所以打算详细研究下为何会出现这种情况

总共有几种system busy

我们通过查看SYSTEM_BUSY状态码调用的方式可以看到总共有如下几种system busy

  • [REJECTREQUEST]system busy, start flow control for a while
  • too many requests and system thread pool busy, RejectedExecutionException
  • [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue
  • [PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d
  • Grpc这边的异常我们暂时不讨论,因为只是简单的超时判断,和其他四种不太一样

    下面我们结合源码来详细分析这四种情况是如何出现的

    [REJECTREQUEST]system busy

    何时会抛出[REJECTREQUEST]system busy

    先说结论

  • 操作系统页面缓存(Os PageCache)繁忙
  • 如果开启堆外内存存储消息,堆外内存不够也会抛出[REJECTREQUEST]system busy
  • 下面我们来结合源码具体分析下

    我们直接全局搜索找到对应的源码,然后看看方法

    这里看不到实际的逻辑,所以我们继续往上看看,NettyRequestProcessor接口的实现类有很多,我们主要看SendMessageProcessor

    这里第一个if else我们不用管,我们现在不是使用的这种模式部署的,主要看下面的代码

    这里可以看到是否抛出[REJECTREQUEST]system busy异常主要有两个条件

  • 检查操作系统页面缓存(Os PageCache)是否繁忙
  • 检查暂存器池(transientStorePool)是否不足
  • 如何检查操作系统页面缓存(Os PageCache)是否繁忙

    @Override
        public boolean isOSPageCacheBusy() {
            long begin = this.getCommitLog().getBeginTimeInLock();
            long diff = this.systemClock.now() - begin;
    
            return diff  this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
        }
    
    • begin: (将消息写入Commitlog文件所持有锁的时间),beginbeginTimeInLock,在asyncPutMessages中赋值

    也就是消息追加到MappedFile的开始时间

    • diff: (一次消息追加过程中持有锁的总时长,即往内存映射文件或pageCache追加一条消息所耗时间)

    diff 大于 配置的osPageCacheBusyTimeOutMills时间,默认1s就表示Os PageCache Busy

    如何检查暂存器池(transientStorePool)是否不足

    这里的transientStorePool实际是对外内存,判断条件的代码我们来看看

        @Override
        public boolean isTransientStorePoolDeficient() {
            return remainTransientStoreBufferNumbs() == 0;
        }
    
        public int remainTransientStoreBufferNumbs() {
            return this.transientStorePool.availableBufferNums();
        }
    
        public int availableBufferNums() {
            if (messageStore.isTransientStorePoolEnable()) {
                return availableBuffers.size();
            }
            return Integer.MAX_VALUE;
        }
    

    首先判断是否开启transientStorePoolEnable堆外内存存储消息,默认为false

    如果没开启返回Integer.MAX_VALUE一定不为0.

    所以如果没有开启transientStorePoolEnable,则只会判断操作系统页面缓存(Os PageCache)是否繁忙

    开启了transientStorePoolEnable还会判断堆外内存是否够用

    堆外内存的相关配置通过transientStorePoolSizemappedFileSizeCommitLog配置,默认为5个ByteBuffer,每个1G
    所以开启堆外内存存储消息,会额外消耗5G内存。

    如果对外内存不够则建议调整transientStorePoolSize的值

    too many requests and system thread pool busy

    这里结合源码分析主要是在线程池提交任务被拒绝的时候会给client返回这个异常,我们具体来看看这个线程池

    我们看看队列的容量是多少

    this.sendThreadPoolQueue = new LinkedBlockingQueue(this.brokerConfig.getSendThreadPoolQueueCapacity());
    

    可以看到默认是1w,也就是如果发送消息的请求超过了1w未处理,就会直接给client抛出这个异常

    [TIMEOUT_CLEAN_QUEUE]broker busy

    我们全局搜索[TIMEOUT_CLEAN_QUEUE]broker busy

    有两处地方,我们先看看RemotingProtocolServer

    this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10, 10, TimeUnit.SECONDS);
    

    可以看到使用启动器启动扫描去处理过期的请求,如何处理呢

    这里就是检验线程池中的任务是否超过执行时间,如果超过则移除,然后给client返回[TIMEOUT_CLEAN_QUEUE]broker busy,具体的超时时间不同的线程池有不同的配置,

    消息发送的3s

    我们再看看BrokerFastFailure

    这里其实可以看到也是清理线程池的BlockingQueue,和上面的RemotingProtocolServer类似,不同的是这里的这些BlockingQueue主要是给broker使用的,上面的主要是给远程通信用的。

    [PCBUSY_CLEAN_QUEUE]broker busy

    这里可以看到主要是如果发生了 OS page cache is busy,每隔10毫秒执行一次,直接去将broker中等待处理的消息queue全部丢弃,返回失败

    这里和上面[TIMEOUT_CLEAN_QUEUE]broker busy不同的是如果发生OS page cache is busy是直接丢弃,不是等待超时

    两者都需要broker开启快速失败即brokerFastFailureEnable为true,默认为true

    总结

    本次我们结合源码分析了出现system busy的四种情况

  • OS page cache is busy
  • broker中消息发送线程池(sendMessageExecutor)处理任务超时
  • OS page cache is busy,开始清理送线程池(sendMessageExecutor)中的queue任务
  • 消息发送线程池(sendMessageExecutor)队列容量满了(默认1w)
  • 如何缓解这几种异常呢?本质都是broker的压力太大了,处理不过来。

    最好的解决方式就是提升broker的处理速度,即RocketMQ的 QPS。
    有以下几种方式

  • 开启读写分离(即transientStorePoolEnable = true)
  • 扩容broker集群
  • 优化broker qps(实际1中的方法也属于这条)
  • 不太可取方案:

  • 关闭快速失败(brokerFastFailureEnable = true)
  • 扩大sendMessageExecutor线程池的队列容量
  • 增大osPageCacheBusyTimeOutMills超时时间
  • 参考

    • 博客
    • RocketMQ源码

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论