Java并发编程:深入剖析CyclicBarrier源码

2024年 4月 29日 30.2k 0

引言

CyclicBarrier中文叫做循环栅栏,用来控制线程的执行速率。

适用场景:一组线程在到达栅栏之前,需要相互等待,到达栅栏之后(满足了特定条件),再一起执行。

适用场景好像跟CountDownLatch一样,前面介绍过CountDownLatch的适用场景,跟第二种场景很像,不过还是有点区别:

  • CountDownLatch需要手动调用countDown()方法,这组线程才能一起执行,而CyclicBarrier无需调用调用任何方法,线程会自动执行。
  • CountDownLatch只能使用一次,而CyclicBarrier可以循环使用。
  • 再提一下CountDownLatch的两个适用场景:

  • 当前线程等待其他线程都执行完成之后,再执行。
  • 所有线程满足条件后,再一起执行。
  • 使用示例

    CyclicBarrier常用的方法就一个await()方法,调用await()方法之后,会阻塞当前线程,直到栅栏前的所有线程都调用了await()方法,才会放行,并且一起执行。

    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @author 一灯架构
     * @apiNote CyclicBarrier测试类
     **/
    @Slf4j
    public class CyclicBarrierTest {
    
        public static void main(String[] args) throws InterruptedException {
    
            // 1. 创建一个线程池,用来执行任务
            ExecutorService executorService = Executors.newCachedThreadPool();
    
            // 2. 创建一个循环栅栏,线程数是3
            CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    
            // 3. 提交9个任务,刚好可以循环3轮
            for (int i = 0; i  {
                    try {
                        // 5. 睡眠1秒,模拟任务准备阶段
                        Thread.sleep(1000);
                        log.info(Thread.currentThread().getName() + " 准备 " + cyclicBarrier.getNumberWaiting());
                        // 6. 阻塞当前任务,直到3个线程都到达栅栏
                        cyclicBarrier.await();
    
                        log.info(Thread.currentThread().getName() + " 执行完成");
                    } catch (Exception e) {
                    }
                });
            }
    
            // 7. 关闭线程池
            executorService.shutdown();
        }
    }

    输出结果:

    10:00:00.001 [pool-1-thread-1] INFO com.yideng.CyclicBarrierTest - pool-1-thread-1 准备 0
    10:00:00.002 [pool-1-thread-2] INFO com.yideng.CyclicBarrierTest - pool-1-thread-2 准备 1
    10:00:00.003 [pool-1-thread-3] INFO com.yideng.CyclicBarrierTest - pool-1-thread-3 准备 2
    10:00:00.003 [pool-1-thread-3] INFO com.yideng.CyclicBarrierTest - pool-1-thread-3 执行完成
    10:00:00.003 [pool-1-thread-1] INFO com.yideng.CyclicBarrierTest - pool-1-thread-1 执行完成
    10:00:00.004 [pool-1-thread-2] INFO com.yideng.CyclicBarrierTest - pool-1-thread-2 执行完成
    10:00:00.010 [pool-1-thread-4] INFO com.yideng.CyclicBarrierTest - pool-1-thread-4 准备 0
    10:00:00.011 [pool-1-thread-5] INFO com.yideng.CyclicBarrierTest - pool-1-thread-5 准备 1
    10:00:01.003 [pool-1-thread-6] INFO com.yideng.CyclicBarrierTest - pool-1-thread-6 准备 2
    10:00:01.004 [pool-1-thread-6] INFO com.yideng.CyclicBarrierTest - pool-1-thread-6 执行完成
    10:00:01.004 [pool-1-thread-4] INFO com.yideng.CyclicBarrierTest - pool-1-thread-4 执行完成
    10:00:01.004 [pool-1-thread-5] INFO com.yideng.CyclicBarrierTest - pool-1-thread-5 执行完成
    10:00:01.114 [pool-1-thread-7] INFO com.yideng.CyclicBarrierTest - pool-1-thread-7 准备 0
    10:00:01.213 [pool-1-thread-8] INFO com.yideng.CyclicBarrierTest - pool-1-thread-8 准备 1
    10:00:01.317 [pool-1-thread-9] INFO com.yideng.CyclicBarrierTest - pool-1-thread-9 准备 2
    10:00:01.318 [pool-1-thread-9] INFO com.yideng.CyclicBarrierTest - pool-1-thread-9 执行完成
    10:00:01.318 [pool-1-thread-7] INFO com.yideng.CyclicBarrierTest - pool-1-thread-7 执行完成
    10:00:01.319 [pool-1-thread-8] INFO com.yideng.CyclicBarrierTest - pool-1-thread-8 执行完成

    示例中CyclicBarrier包含3个线程,提交9个任务,每3个任务为一组,调用await()方法后会相互等待,直到3个线程都调用了await()方法,然后放行,并且一起执行,9个任务会循环3轮,从输出结果中可以看出。

    示例中getNumberWaiting()方法可以查看CyclicBarrier中已经等待的线程数。

    看完了CyclicBarrier的使用方式,再看一下CyclicBarrier的源码实现。

    类属性

    public class CyclicBarrier {
    
        /**
         * 互斥锁,用来保证线程安全
         */
        private final ReentrantLock lock = new ReentrantLock();
    
        /**
         * 栅栏条件操作
         */
        private final Condition trip = lock.newCondition();
        
        /**
         * 栅栏初始线程数
         */
        private final int parties;
        
        /**
         * 到达栅栏后的操作
         */
        private final Runnable barrierCommand;
    
        /**
         * 栅栏前未到达的线程数
         */
        private int count;
    
        /**
         * 当前循环轮数
         */
        private Generation generation = new Generation();
    
        
        private static class Generation {
            boolean broken = false;
        }
    }

    CyclicBarrier内部使用了ReentrantLock来保证线程安全,又使用了Condition来实现线程的等待与唤醒操作。

    初始化

    CyclicBarrier初始化的可以指定线程数和到达栅栏后的操作。

    /**
     * 指定线程数
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    
    /**
     * 指定线程数和到达栅栏后的操作
     * @param parties 线程数
     * @param barrierAction 到达栅栏后的操作
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties  executorService.shutdown());

    看一下await()方法源码。

    await方法源码

    /**
    * await方法入口
    */
    public int await() throws InterruptedException, BrokenBarrierException {
    try {
    return dowait(false, 0L);
    } catch (TimeoutException toe) {
    throw new Error(toe); // cannot happen
    }
    }

    /**
    * await方法核心逻辑
    * @param timed 是否允许超时,false表示不允许
    * @param nanos 超时时间
    */
    private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException, TimeoutException {
    // 1. 加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    // 2. 获取当前循环轮数
    final Generation g = generation;
    if (g.broken) {
    throw new BrokenBarrierException();
    }

    // 3. 如果当前线程已中断,就打破栅栏
    if (Thread.interrupted()) {
    breakBarrier();
    throw new InterruptedException();
    }

    // 4. 计数器减一,如果计数器为零,表示所有线程都到达了栅栏
    int index = --count;
    if (index == 0) {
    boolean ranAction = false;
    try {
    // 5. 如果初始化时指定了barrierCommand,就执行
    final Runnable command = barrierCommand;
    if (command != null) {
    command.run();
    }
    ranAction = true;
    nextGeneration();
    return 0;
    } finally {
    if (!ranAction) {
    breakBarrier();
    }
    }
    }

    for (; ; ) {
    try {
    // 6. 如果不允许超时,就阻塞当前线程
    if (!timed) {
    trip.await();
    } else if (nanos > 0L) {
    nanos = trip.awaitNanos(nanos);
    }
    } catch (InterruptedException ie) {
    if (g == generation && !g.broken) {
    breakBarrier();
    throw ie;
    } else {
    Thread.currentThread().interrupt();
    }
    }

    if (g.broken) {
    throw new BrokenBarrierException();
    }

    if (g != generation) {
    return index;
    }

    // 7. 如果已超时,就打破栅栏
    if (timed && nanos

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论