java框架中例如Tomcat、Dubbo等都离不开线程池,这些框架用到线程的地方,都会用线程池来负责。我们在使用这些框架的时候,会设置线程池参数,用于提高性能。那么开多少线程合适?今天我们将围绕这个问题来学习一下线程池。
为什么使用线程池
平常我们使用java线程的时候,都是直接创建一个Thread
对象,java线程的创建和销毁都会涉及到Thread
对象的创建和销毁,线程切换等问题。创建Thread
对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了。所以线程是一个重量级的对象,应该避免频繁创建和销毁。
一般可以通过“池化”思想来解决上述的问题,而JDK中提供的线程池实现是基于ThreadPoolExecutor
。
使用线程池可以带来一系列好处:
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池
ScheduledThreadPoolExecutor
,就允许任务延期执行或定期执行。
线程池核心设计与实现
总体设计
-
顶层接口是
Executor
,java.util.concurrent.Executor#execute
,用户只需提供Runnable
对象,将任务的运行逻辑提交到执行器(Executor
)中,由Executor
框架完成线程的调配和任务的执行部分。 -
ExecutorService
接口扩展了Executor
并增加了一些能力:- 扩充执行任务的能力,通过调用
submit()
或者invokeAll()
方法可以为一个或一批异步任务生成Future的方法; - 提供了管控线程池的方法,比如调用
shutdown()
等方法停止线程池的运行。
- 扩充执行任务的能力,通过调用
-
AbstractExecutorService
则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。 -
具体实现类是
ThreadPoolExecutor
,ThreadPoolExecutor
将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。 -
ScheduledThreadPoolExecutor
又扩展了ThreadPoolExecutor
和ScheduledExecutorService
接口,增加了调度能力,使任务可以延时定时执行。 -
另外还有一个提供了线程池创建的工厂方法的类
Executors
,用来创建线程池。
本章主要说明ThreadPoolExecutor
的实现原理,ScheduledThreadPoolExecutor
下篇会讨论。
ThreadPoolExecutor实现原理
ThreadPoolExecutor构造参数说明
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize:表示线程池保有的最小线程数。核心线程数,这些核心线程一旦被创建,就不会被销毁。相反,如果是非核心线程,等任务执行完并长时间未被使用则会被销毁。
-
maximumPoolSize:表示线程池创建的最大线程数。
-
keepAliveTime&unit:一个线程如果在一段时间内,都没有执行任务,说明很闲,
keepAliveTime
和unit
就是用来定义这个一段时间的参数。也就是说,如果线程已经空闲了keepAliveTime
和unit
这么久了,而且线程数大于corePoolSize
,那么这个空闲线程就要被回收。 -
workQueue:用来存储任务,当有新的任务请求线程处理时,如果核心线程池已满,那么新来的任务会加入workQueue队列中,workQueue是一个阻塞队列。
-
threadFactory:通过这个参数可以自定义如何创建线程。
-
handler:通过这个参数可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,可以通过这个参数来指定
ThreadPoolExecutor
已经提供了四种策略。 - CallerRunsPolicy:提交任务的线程自己去执行该任务。
- AbortPolicy:默认的拒绝策略,会throws RejectedExecutionException.
- DiscardPolicy:直接丢弃任务,没有任何异常输出。
- DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
ThreadPoolExecutor执行流程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
线程池运行状态
线程池的运行状态,由线程池内部维护,线程池内部使用AtomicInteger
变量,用于维护运行状态runState
和工作线程数workerCount
,高3位保存runState
,低29位保存workerCount
,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS=29,(对于int长度为32来说)表示线程数量的字节位数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 状态掩码,高三位是1,低29位全是0,可以通过 ctl&COUNT_MASK 运算来获取线程池状态
private static final int COUNT_MASK = (1