引言
CyclicBarrier中文叫做循环栅栏,用来控制线程的执行速率。
适用场景:一组线程在到达栅栏之前,需要相互等待,到达栅栏之后(满足了特定条件),再一起执行。
适用场景好像跟CountDownLatch一样,前面介绍过CountDownLatch的适用场景,跟第二种场景很像,不过还是有点区别:
再提一下CountDownLatch的两个适用场景:
使用示例
CyclicBarrier常用的方法就一个await()方法,调用await()方法之后,会阻塞当前线程,直到栅栏前的所有线程都调用了await()方法,才会放行,并且一起执行。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author 一灯架构
* @apiNote CyclicBarrier测试类
**/
@Slf4j
public class CyclicBarrierTest {
public static void main(String[] args) throws InterruptedException {
// 1. 创建一个线程池,用来执行任务
ExecutorService executorService = Executors.newCachedThreadPool();
// 2. 创建一个循环栅栏,线程数是3
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
// 3. 提交9个任务,刚好可以循环3轮
for (int i = 0; i {
try {
// 5. 睡眠1秒,模拟任务准备阶段
Thread.sleep(1000);
log.info(Thread.currentThread().getName() + " 准备 " + cyclicBarrier.getNumberWaiting());
// 6. 阻塞当前任务,直到3个线程都到达栅栏
cyclicBarrier.await();
log.info(Thread.currentThread().getName() + " 执行完成");
} catch (Exception e) {
}
});
}
// 7. 关闭线程池
executorService.shutdown();
}
}
输出结果:
10:00:00.001 [pool-1-thread-1] INFO com.yideng.CyclicBarrierTest - pool-1-thread-1 准备 0
10:00:00.002 [pool-1-thread-2] INFO com.yideng.CyclicBarrierTest - pool-1-thread-2 准备 1
10:00:00.003 [pool-1-thread-3] INFO com.yideng.CyclicBarrierTest - pool-1-thread-3 准备 2
10:00:00.003 [pool-1-thread-3] INFO com.yideng.CyclicBarrierTest - pool-1-thread-3 执行完成
10:00:00.003 [pool-1-thread-1] INFO com.yideng.CyclicBarrierTest - pool-1-thread-1 执行完成
10:00:00.004 [pool-1-thread-2] INFO com.yideng.CyclicBarrierTest - pool-1-thread-2 执行完成
10:00:00.010 [pool-1-thread-4] INFO com.yideng.CyclicBarrierTest - pool-1-thread-4 准备 0
10:00:00.011 [pool-1-thread-5] INFO com.yideng.CyclicBarrierTest - pool-1-thread-5 准备 1
10:00:01.003 [pool-1-thread-6] INFO com.yideng.CyclicBarrierTest - pool-1-thread-6 准备 2
10:00:01.004 [pool-1-thread-6] INFO com.yideng.CyclicBarrierTest - pool-1-thread-6 执行完成
10:00:01.004 [pool-1-thread-4] INFO com.yideng.CyclicBarrierTest - pool-1-thread-4 执行完成
10:00:01.004 [pool-1-thread-5] INFO com.yideng.CyclicBarrierTest - pool-1-thread-5 执行完成
10:00:01.114 [pool-1-thread-7] INFO com.yideng.CyclicBarrierTest - pool-1-thread-7 准备 0
10:00:01.213 [pool-1-thread-8] INFO com.yideng.CyclicBarrierTest - pool-1-thread-8 准备 1
10:00:01.317 [pool-1-thread-9] INFO com.yideng.CyclicBarrierTest - pool-1-thread-9 准备 2
10:00:01.318 [pool-1-thread-9] INFO com.yideng.CyclicBarrierTest - pool-1-thread-9 执行完成
10:00:01.318 [pool-1-thread-7] INFO com.yideng.CyclicBarrierTest - pool-1-thread-7 执行完成
10:00:01.319 [pool-1-thread-8] INFO com.yideng.CyclicBarrierTest - pool-1-thread-8 执行完成
示例中CyclicBarrier包含3个线程,提交9个任务,每3个任务为一组,调用await()方法后会相互等待,直到3个线程都调用了await()方法,然后放行,并且一起执行,9个任务会循环3轮,从输出结果中可以看出。
示例中getNumberWaiting()方法可以查看CyclicBarrier中已经等待的线程数。
看完了CyclicBarrier的使用方式,再看一下CyclicBarrier的源码实现。
类属性
public class CyclicBarrier {
/**
* 互斥锁,用来保证线程安全
*/
private final ReentrantLock lock = new ReentrantLock();
/**
* 栅栏条件操作
*/
private final Condition trip = lock.newCondition();
/**
* 栅栏初始线程数
*/
private final int parties;
/**
* 到达栅栏后的操作
*/
private final Runnable barrierCommand;
/**
* 栅栏前未到达的线程数
*/
private int count;
/**
* 当前循环轮数
*/
private Generation generation = new Generation();
private static class Generation {
boolean broken = false;
}
}
CyclicBarrier内部使用了ReentrantLock来保证线程安全,又使用了Condition来实现线程的等待与唤醒操作。
初始化
CyclicBarrier初始化的可以指定线程数和到达栅栏后的操作。
/**
* 指定线程数
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
* 指定线程数和到达栅栏后的操作
* @param parties 线程数
* @param barrierAction 到达栅栏后的操作
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties executorService.shutdown());
看一下await()方法源码。
await方法源码
/**
* await方法入口
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/**
* await方法核心逻辑
* @param timed 是否允许超时,false表示不允许
* @param nanos 超时时间
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException, TimeoutException {
// 1. 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 2. 获取当前循环轮数
final Generation g = generation;
if (g.broken) {
throw new BrokenBarrierException();
}
// 3. 如果当前线程已中断,就打破栅栏
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 4. 计数器减一,如果计数器为零,表示所有线程都到达了栅栏
int index = --count;
if (index == 0) {
boolean ranAction = false;
try {
// 5. 如果初始化时指定了barrierCommand,就执行
final Runnable command = barrierCommand;
if (command != null) {
command.run();
}
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction) {
breakBarrier();
}
}
}
for (; ; ) {
try {
// 6. 如果不允许超时,就阻塞当前线程
if (!timed) {
trip.await();
} else if (nanos > 0L) {
nanos = trip.awaitNanos(nanos);
}
} catch (InterruptedException ie) {
if (g == generation && !g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken) {
throw new BrokenBarrierException();
}
if (g != generation) {
return index;
}
// 7. 如果已超时,就打破栅栏
if (timed && nanos