ThreadPoolExecutor应用&源码剖析
1.1 为什么要自定义线程池
首先ThreadPoolExecutor中,一共提供了7个参数,每个参数都是非常核心的属性,在线程池去执行任务时,每个参数都有决定性的作用。
但是如果直接采用JDK提供的方式去构建,可以设置的核心参数最多就两个,这样就会导致对线程池的控制粒度很粗。所以在阿里规范中也推荐自己去自定义线程池。手动的去new ThreadPoolExecutor设置他的一些核心属性。
自定义构建线程池,可以细粒度的控制线程池,去管理内存的属性,并且针对一些参数的设置可能更好的在后期排查问题。
查看一下ThreadPoolExecutor提供的七个核心参数
public ThreadPoolExecutor(
int corePoolSize, // 核心工作线程(当前任务执行结束后,不会被销毁)
int maximumPoolSize, // 最大工作线程(代表当前线程池中,一共可以有多少个工作线程)
long keepAliveTime, // 非核心工作线程在阻塞队列位置等待的时间
TimeUnit unit, // 非核心工作线程在阻塞队列位置等待时间的单位
BlockingQueue workQueue, // 任务在没有核心工作线程处理时,任务先扔到阻塞队列中
ThreadFactory threadFactory, // 构建线程的线程工作,可以设置thread的一些信息
RejectedExecutionHandler handler) { // 当线程池无法处理投递过来的任务时,执行当前的拒绝策略
// 初始化线程池的操作
}
1.2 ThreadPoolExecutor应用
手动new一下,处理的方式还是执行execute或者submit方法。
JDK提供的几种拒绝策略:
-
AbortPolicy:当前拒绝策略会在无法处理任务时,直接抛出一个异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
-
CallerRunsPolicy:当前拒绝策略会在线程池无法处理任务时,将任务交给调用者处理
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
-
DiscardPolicy:当前拒绝策略会在线程池无法处理任务时,直接将任务丢弃掉
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
-
DiscardOldestPolicy:当前拒绝策略会在线程池无法处理任务时,将队列中最早的任务丢弃掉,将当前任务再次尝试交给线程池处理
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
-
自定义Policy:根据自己的业务,可以将任务扔到数据库,也可以做其他操作。
private static class MyRejectedExecution implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("根据自己的业务情况,决定编写的代码!"); } }
代码构建线程池,并处理有无返回结果的任务
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1. 构建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
2,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue(5),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("test-ThreadPoolExecutor");
return thread;
}
},
new MyRejectedExecution()
);
//2. 让线程池处理任务,没返回结果
threadPool.execute(() -> {
System.out.println("没有返回结果的任务");
});
//3. 让线程池处理有返回结果的任务
Future future = threadPool.submit(new Callable() {
@Override
public Object call() throws Exception {
System.out.println("我有返回结果!");
return "返回结果";
}
});
Object result = future.get();
System.out.println(result);
//4. 如果是局部变量的线程池,记得用完要shutdown
threadPool.shutdown();
}
private static class MyRejectedExecution implements RejectedExecutionHandler{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("根据自己的业务情况,决定编写的代码!");
}
}
1.3 ThreadPoolExecutor源码剖析
线程池的源码内容会比较多一点,需要一点一点的去查看,内部比较多。
1.1.1 ThreadPoolExecutor的核心属性
核心属性主要就是ctl,基于ctl拿到线程池的状态以及工作线程个数
在整个线程池的执行流程中,会基于ctl判断上述两个内容
// 当前是线程池的核心属性
// 当前的ctl其实就是一个int类型的数值,内部是基于AtomicInteger套了一层,进行运算时,是原子性的。
// ctl表示着线程池中的2个核心状态:
// 线程池的状态:ctl的高3位,表示线程池状态
// 工作线程的数量:ctl的低29位,表示工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.SIZE:在获取Integer的bit位个数
// 声明了一个常量:COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
00000000 00000000 00000000 00000001
00100000 00000000 00000000 00000000
00011111 11111111 11111111 11111111
// CAPACITY就是当前工作线程能记录的工作线程的最大个数
private static final int CAPACITY = (1 = STOP,如果满足找个状态,说明线程池已经到了STOP状态甚至已经要凉凉了
// 线程池到STOP状态,并且当前线程还没有中断,确保线程是中断的,进到if内部执行中断方法
// if(runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()) {中断线程}
// 如果线程池状态不是STOP,确保线程不是中断的。
// 如果发现线程中断标记位是true了,再次查看线程池状态是大于STOP了,再次中断线程
// 这里其实就是做了一个事情,如果线程池状态 >= STOP,确保线程中断了。
if (
(
runStateAtLeast(ctl.get(), STOP) ||
( Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) )
)
&& !wt.isInterrupted())
wt.interrupt();
try {
// 勾子函数在线程池中没有做任何的实现,如果需要在线程池执行任务前后做一些额外的处理,可以重写勾子函数
// 前置勾子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 前后置勾子函数
afterExecute(task, thrown);
}
} finally {
// 任务执行完,丢掉任务
task = null;
// 当前工作线程处理的任务数+1
w.completedTasks++;
// 执行unlock方法,此时shutdown方法才可以中断当前线程
w.unlock();
}
}
// 如果while循环结束,正常走到这,说明是正常结束
// 正常结束的话,在getTask中就会做一个额外的处理,将ctl - 1,代表工作线程没一个。
completedAbruptly = false;
} finally {
// 考虑干掉工作线程
processWorkerExit(w, completedAbruptly);
}
}
// 工作线程结束前,要执行当前方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果是异常结束
if (completedAbruptly)
// 将ctl - 1,扣掉一个工作线程
decrementWorkerCount();
// 操作Worker,为了线程安全,加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 当前工作线程处理的任务个数累加到线程池处理任务的个数属性中
completedTaskCount += w.completedTasks;
// 将工作线程从hashSet中移除
workers.remove(w);
} finally {
// 释放锁
mainLock.unlock();
}
// 只要工作线程凉了,查看是不是线程池状态改变了。
tryTerminate();
// 获取ctl
int c = ctl.get();
// 判断线程池状态,当前线程池要么是RUNNING,要么是SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 如果正常结束工作线程
if (!completedAbruptly) {
// 如果核心线程允许超时,min = 0,否则就是核心线程个数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min == 0,可能会出现没有工作线程,并且阻塞队列有任务没有线程处理
if (min == 0 && ! workQueue.isEmpty())
// 至少要有一个工作线程处理阻塞队列任务
min = 1;
// 如果工作线程个数 大于等于1,不怕没线程处理,正常return
if (workerCountOf(c) >= min)
return;
}
// 异常结束,为了避免出现问题,添加一个空任务的非核心线程来填补上刚刚异常结束的工作线程
addWorker(null, false);
}
}
1.1.7 ThreadPoolExecutor的getTask方法
工作线程在去阻塞队列获取任务前,要先查看线程池状态
如果状态没问题,去阻塞队列take或者是poll任务
第二个循环时,不但要判断线程池状态,还要判断当前工作线程是否可以被干掉
// 当前方法就在阻塞队列中获取任务
// 前面半部分是判断当前工作线程是否可以返回null,结束。
// 后半部分就是从阻塞队列中拿任务
private Runnable getTask() {
// timeOut默认值是false。
boolean timedOut = false;
// 死循环
for (;;) {
// 拿到ctl
int c = ctl.get();
// 拿到线程池的状态
int rs = runStateOf(c);
// 如果线程池状态是STOP,没有必要处理阻塞队列任务,直接返回null
// 如果线程池状态是SHUTDOWN,并且阻塞队列是空的,直接返回null
if (rs >= SHUTDOWN &&
(rs >= STOP || workQueue.isEmpty())) {
// 如果可以返回null,先扣减工作线程个数
decrementWorkerCount();
// 返回null,结束runWorker的while循环
return null;
}
// 基于ctl拿到工作线程个数
int wc = workerCountOf(c);
// 核心线程允许超时,timed为true
// 工作线程个数大于核心线程数,timed为true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (
// 如果工作线程个数,大于最大线程数。(一般情况不会满足),把他看成false
// 第二个判断代表,只要工作线程数小于等于核心线程数,必然为false
// 即便工作线程个数大于核心线程数了,此时第一次循环也不会为true,因为timedOut默认值是false
// 考虑第二次循环了,因为循环内部必然有修改timeOut的位置
(wc > maximumPoolSize || (timed && timedOut))
&&
// 要么工作线程还有,要么阻塞队列为空,并且满足上述条件后,工作线程才会走到if内部,结束工作线程
(wc > 1 || workQueue.isEmpty())
) {
// 第二次循环才有可能到这。
// 正常结束,工作线程 - 1,因为是CAS操作,如果失败了,重新走for循环
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 工作线程从阻塞队列拿任务
try {
// 如果是核心线程,timed是false,如果是非核心线程,timed就是true
Runnable r = timed ?
// 如果是非核心,走poll方法,拿任务,等待一会
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 如果是核心,走take方法,死等。
workQueue.take();
// 从阻塞队列拿到的任务不为null,这边就正常返回任务,去执行
if (r != null)
return r;
// 说明当前线程没拿到任务,将timeOut设置为true,在上面就可以返回null退出了。
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
1.1.8 ThreadPoolExecutor的关闭方法
首先查看shutdownNow方法,可以从RUNNING状态转变为STOP
// shutDownNow方法,shutdownNow不会处理阻塞队列的任务,将任务全部给你返回了。
public List shutdownNow() {
// 声明返回结果
List tasks;
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 不关注这个方法……
checkShutdownAccess();
// 将线程池状态修改为STOP
advanceRunState(STOP);
// 无论怎么,直接中断工作线程。
interruptWorkers();
// 将阻塞队列的任务全部扔到List集合中。
tasks = drainQueue();
} finally {
// 释放锁
mainLock.unlock();
}
tryTerminate();
return tasks;
}
// 将线程池状态修改为STOP
private void advanceRunState(int STOP) {
// 死循环。
for (;;) {
// 获取ctl属性的值
int c = ctl.get();
// 第一个判断:如果当前线程池状态已经大于等于STOP了,不管了,告辞。
if (runStateAtLeast(c, STOP) ||
// 基于CAS,将ctl从c修改为STOP状态,不修改工作线程个数,但是状态变为了STOP
// 如果修改成功结束
ctl.compareAndSet(c, ctlOf(STOP, workerCountOf(c))))
break;
}
}
// 无论怎么,直接中断工作线程。
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历HashSet,拿到所有的工作线程,直接中断。
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
// 移除阻塞队列,内容全部扔到List集合中
private List drainQueue() {
BlockingQueue q = workQueue;
ArrayList taskList = new ArrayList();
// 阻塞队列自带的,直接清空阻塞队列,内容扔到List集合
q.drainTo(taskList);
// 为了避免任务丢失,重新判断,是否需要编辑阻塞队列,重新扔到List
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
// 查看当前线程池是否可以变为TERMINATED状态
final void tryTerminate() {
// 死循环。
for (;;) {
// 拿到ctl
int c = ctl.get();
// 如果是RUNNING,直接告辞。
// 如果状态已经大于等于TIDYING,马上就要凉凉,直接告辞。
// 如果状态是SHUTDOWN,但是阻塞队列还有任务,直接告辞。
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果还有工作线程
if (workerCountOf(c) != 0) {
// 再次中断工作线程
interruptIdleWorkers(ONLY_ONE);
// 告辞,等你工作线程全完事,我这再尝试进入到TERMINATED状态
return;
}
// 加锁,为了可以执行Condition的释放操作
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将线程池状态修改为TIDYING状态,如果成功,继续往下走
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 这个方法是空的,如果你需要在线程池关闭后做一些额外操作,这里你可以自行实现
terminated();
} finally {
// 最终修改为TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
// 线程池提供了一个方法,主线程在提交任务到线程池后,是可以继续做其他操作的。
// 咱们也可以让主线程提交任务后,等待线程池处理完毕,再做后续操作
// 这里线程池凉凉后,要唤醒哪些调用了awaitTermination方法的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
再次shutdown方法,可以从RUNNING状态转变为SHUTDOWN
shutdown状态下,不会中断正在干活的线程,而且会处理阻塞队列中的任务
public void shutdown() {
// 加锁。。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 不看。
checkShutdownAccess();
// 里面是一个死循环,将线程池状态修改为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
// 说了,这个是为了ScheduleThreadPoolExecutor准备的,不管
onShutdown();
} finally {
mainLock.unlock();
}
// 尝试结束线程
tryTerminate();
}
// 中断空闲线程
private void interruptIdleWorkers(boolean onlyOne) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 如果线程没有中断,那么就去获取Worker的锁,基于tryLock可知,不会中断正在干活的线程
if (!t.isInterrupted() && w.tryLock()) {
try {
// 会中断空闲线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
1.4 线程池的核心参数设计规则
线程池的使用难度不大,难度在于线程池的参数并不好配置。
主要难点在于任务类型无法控制,比如任务有CPU密集型,还有IO密集型,甚至还有混合型的。
因为IO咱们无法直接控制,所以很多时间按照一些书上提供的一些方法,是无法解决问题的。
《Java并发编程实践》
想调试出一个符合当前任务情况的核心参数,最好的方式就是测试。
需要将项目部署到测试环境或者是沙箱环境中,结果各种压测得到一个相对符合的参数。
如果每次修改项目都需要重新部署,成本太高了。
此时咱们可以实现一个动态监控以及修改线程池的方案。
因为线程池的核心参数无非就是:
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
- workQueue:工作队列
线程池中提供了获取核心信息的get方法,同时也提供了动态修改核心属性的set方法。
也可以采用一些开源项目提供的方式去做监控和修改
比如hippo4j就可以对线程池进行监控,而且可以和SpringBoot整合。
Github地址:github.com/opengoofy/h…
官方文档:hippo4j.cn/docs/user_d…