聊聊TaskExecutorMetricsAutoConfiguration
序
springboot2.6.0版本提供了TaskExecutorMetricsAutoConfiguration,可以自动给线程池加上metrics
TaskExecutorMetricsAutoConfiguration
spring-boot-actuator-autoconfigure-2.7.14-sources.jar!/org/springframework/boot/actuate/autoconfigure/metrics/task/TaskExecutorMetricsAutoConfiguration.java
/**
 * {@link EnableAutoConfiguration Auto-configuration} for metrics on all available
 * {@link ThreadPoolTaskExecutor task executors} and {@link ThreadPoolTaskScheduler task
 * schedulers}.
 *
 * @author Stephane Nicoll
 * @author Scott Frederick
 * @since 2.6.0
 */
@AutoConfiguration(after = { MetricsAutoConfiguration.class, SimpleMetricsExportAutoConfiguration.class,
        TaskExecutionAutoConfiguration.class, TaskSchedulingAutoConfiguration.class })
@ConditionalOnClass(ExecutorServiceMetrics.class)
@ConditionalOnBean({ Executor.class, MeterRegistry.class })
public class TaskExecutorMetricsAutoConfiguration {

    /**
     * Binds executor metrics for every supported executor bean to the given registry.
     * Only {@link ThreadPoolTaskExecutor} and {@link ThreadPoolTaskScheduler} beans are
     * handled; any other {@link Executor} implementation is skipped.
     * @param executors the executor beans, keyed by bean name
     * @param registry the registry the meters are bound to
     */
    @Autowired
    public void bindTaskExecutorsToRegistry(Map<String, Executor> executors, MeterRegistry registry) {
        executors.forEach((beanName, executor) -> {
            if (executor instanceof ThreadPoolTaskExecutor) {
                monitor(registry, safeGetThreadPoolExecutor((ThreadPoolTaskExecutor) executor), beanName);
            }
            else if (executor instanceof ThreadPoolTaskScheduler) {
                monitor(registry, safeGetThreadPoolExecutor((ThreadPoolTaskScheduler) executor), beanName);
            }
        });
    }

    /**
     * Binds {@link ExecutorServiceMetrics} for the given pool, using {@code name} as the
     * executor service name and no additional tags. Does nothing when the pool is
     * {@code null}.
     */
    private void monitor(MeterRegistry registry, ThreadPoolExecutor threadPoolExecutor, String name) {
        if (threadPoolExecutor != null) {
            new ExecutorServiceMetrics(threadPoolExecutor, name, Collections.emptyList()).bindTo(registry);
        }
    }

    /**
     * Returns the underlying {@link ThreadPoolExecutor} of the task executor, or
     * {@code null} when the lookup throws {@link IllegalStateException}.
     */
    private ThreadPoolExecutor safeGetThreadPoolExecutor(ThreadPoolTaskExecutor taskExecutor) {
        try {
            return taskExecutor.getThreadPoolExecutor();
        }
        catch (IllegalStateException ex) {
            // Deliberate best effort: an executor that cannot expose its pool is not monitored.
            return null;
        }
    }

    /**
     * Returns the underlying scheduled {@link ThreadPoolExecutor} of the task scheduler,
     * or {@code null} when the lookup throws {@link IllegalStateException}.
     */
    private ThreadPoolExecutor safeGetThreadPoolExecutor(ThreadPoolTaskScheduler taskScheduler) {
        try {
            return taskScheduler.getScheduledThreadPoolExecutor();
        }
        catch (IllegalStateException ex) {
            // Deliberate best effort: a scheduler that cannot expose its pool is not monitored.
            return null;
        }
    }

}
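这里会遍历executors,只对ThreadPoolTaskExecutor和ThreadPoolTaskScheduler类型的bean取出底层的ThreadPoolExecutor并执行monitor方法(获取时抛出IllegalStateException则返回null并跳过),其他类型的Executor会被忽略;而monitor方法则是以beanName作为名称创建ExecutorServiceMetrics,然后绑定到meterRegistry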
ExecutorServiceMetrics
micrometer-core-1.9.13-sources.jar!/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetrics.java
@NonNullApi @NonNullFields public class ExecutorServiceMetrics implements MeterBinder { private static boolean allowIllegalReflectiveAccess = true; private static final InternalLogger log = InternalLoggerFactory.getInstance(ExecutorServiceMetrics.class); private static final String DEFAULT_EXECUTOR_METRIC_PREFIX = ""; @Nullable private final ExecutorService executorService; private final Iterable tags; private final String metricPrefix; public ExecutorServiceMetrics(@Nullable ExecutorService executorService, String executorServiceName, Iterable tags) { this(executorService, executorServiceName, DEFAULT_EXECUTOR_METRIC_PREFIX, tags); } /** * Create an {@code ExecutorServiceMetrics} instance. * @param executorService executor service * @param executorServiceName executor service name which will be used as * {@literal name} tag * @param metricPrefix metrics prefix which will be used to prefix metric name * @param tags additional tags * @since 1.5.0 */ public ExecutorServiceMetrics(@Nullable ExecutorService executorService, String executorServiceName, String metricPrefix, Iterable tags) { this.executorService = executorService; this.tags = Tags.concat(tags, "name", executorServiceName); this.metricPrefix = sanitizePrefix(metricPrefix); } @Override public void bindTo(MeterRegistry registry) { if (executorService == null) { return; } String className = executorService.getClass().getName(); if (executorService instanceof ThreadPoolExecutor) { monitor(registry, (ThreadPoolExecutor) executorService); } else if (executorService instanceof ForkJoinPool) { monitor(registry, (ForkJoinPool) executorService); } else if (allowIllegalReflectiveAccess) { if (className.equals("java.util.concurrent.Executors$DelegatedScheduledExecutorService")) { monitor(registry, unwrapThreadPoolExecutor(executorService, executorService.getClass())); } else if (className.equals("java.util.concurrent.Executors$FinalizableDelegatedExecutorService")) { monitor(registry, 
unwrapThreadPoolExecutor(executorService, executorService.getClass().getSuperclass())); } else { log.warn("Failed to bind as {} is unsupported.", className); } } else { log.warn("Failed to bind as {} is unsupported or reflective access is not allowed.", className); } } // ...... }
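这里主要是bindTo方法:executorService为null时直接返回;否则区分ThreadPoolExecutor及ForkJoinPool分别调用对应的monitor方法;若allowIllegalReflectiveAccess为true,还会对Executors的DelegatedScheduledExecutorService、FinalizableDelegatedExecutorService这两种包装类先通过unwrapThreadPoolExecutor解包再监控;其余类型只打印warn日志,不做绑定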
monitor ThreadPoolExecutor
/**
 * Registers the thread-pool meters for {@code tp} on {@code registry}. Each meter name
 * is prefixed with {@code metricPrefix} and carries {@code tags}. Does nothing when
 * {@code tp} is {@code null}.
 */
private void monitor(MeterRegistry registry, @Nullable ThreadPoolExecutor tp) {
    if (tp == null) {
        return;
    }

    // Completed-task counter.
    FunctionCounter
        .builder(metricPrefix + "executor.completed", tp, pool -> pool.getCompletedTaskCount())
        .tags(tags)
        .description("The approximate total number of tasks that have completed execution")
        .baseUnit(BaseUnits.TASKS)
        .register(registry);

    // Threads currently running tasks.
    Gauge
        .builder(metricPrefix + "executor.active", tp, pool -> pool.getActiveCount())
        .tags(tags)
        .description("The approximate number of threads that are actively executing tasks")
        .baseUnit(BaseUnits.THREADS)
        .register(registry);

    // Work queue: current size and remaining capacity.
    Gauge
        .builder(metricPrefix + "executor.queued", tp, pool -> pool.getQueue().size())
        .tags(tags)
        .description("The approximate number of tasks that are queued for execution")
        .baseUnit(BaseUnits.TASKS)
        .register(registry);

    Gauge
        .builder(metricPrefix + "executor.queue.remaining", tp, pool -> pool.getQueue().remainingCapacity())
        .tags(tags)
        .description("The number of additional elements that this queue can ideally accept without blocking")
        .baseUnit(BaseUnits.TASKS)
        .register(registry);

    // Pool sizing: current, core and maximum.
    Gauge
        .builder(metricPrefix + "executor.pool.size", tp, pool -> pool.getPoolSize())
        .tags(tags)
        .description("The current number of threads in the pool")
        .baseUnit(BaseUnits.THREADS)
        .register(registry);

    Gauge
        .builder(metricPrefix + "executor.pool.core", tp, pool -> pool.getCorePoolSize())
        .tags(tags)
        .description("The core number of threads for the pool")
        .baseUnit(BaseUnits.THREADS)
        .register(registry);

    Gauge
        .builder(metricPrefix + "executor.pool.max", tp, pool -> pool.getMaximumPoolSize())
        .tags(tags)
        .description("The maximum allowed number of threads in the pool")
        .baseUnit(BaseUnits.THREADS)
        .register(registry);
}
针对ThreadPoolExecutor主要是上报了executor.completed、executor.active、executor.queued、executor.queue.remaining、executor.pool.size、executor.pool.core、executor.pool.max
monitor ForkJoinPool
/**
 * Registers the fork-join meters for {@code fj} on {@code registry}. Each meter name is
 * prefixed with {@code metricPrefix} and carries {@code tags}.
 */
private void monitor(MeterRegistry registry, ForkJoinPool fj) {
    final String stealsDescription = "Estimate of the total number of tasks stolen from "
            + "one thread's work queue by another. The reported value "
            + "underestimates the actual total number of steals when the pool " + "is not quiescent";

    // Steal counter.
    FunctionCounter
        .builder(metricPrefix + "executor.steals", fj, pool -> pool.getStealCount())
        .tags(tags)
        .description(stealsDescription)
        .register(registry);

    // Tasks held in worker queues.
    Gauge
        .builder(metricPrefix + "executor.queued", fj, pool -> pool.getQueuedTaskCount())
        .tags(tags)
        .description("An estimate of the total number of tasks currently held in queues by worker threads")
        .register(registry);

    // Threads stealing or executing tasks.
    Gauge
        .builder(metricPrefix + "executor.active", fj, pool -> pool.getActiveThreadCount())
        .tags(tags)
        .description("An estimate of the number of threads that are currently stealing or executing tasks")
        .register(registry);

    // Worker threads that are not blocked.
    Gauge
        .builder(metricPrefix + "executor.running", fj, pool -> pool.getRunningThreadCount())
        .tags(tags)
        .description(
                "An estimate of the number of worker threads that are not blocked waiting to join tasks or for other managed synchronization threads")
        .register(registry);
}
针对ForkJoinPool主要是上报了executor.steals、executor.queued、executor.active、executor.running
小结
Spring Boot 2.6.0版本提供了TaskExecutorMetricsAutoConfiguration,它利用micrometer的ExecutorServiceMetrics,对注册为bean的ThreadPoolTaskExecutor和ThreadPoolTaskScheduler提供了metrics上报(其他类型的Executor bean不会被自动绑定)。升级到新版本的服务,对这两类线程池就不用再手工进行指标上报了。
doc
- TaskExecutorMetricsAutoConfiguration