没看过ReentrantLock源码,别说精通Java并发编程

2024年 3月 18日 103.6k 0

引言

高手程序员与新手程序员一个简单的判断标准,就是有没有使用过CountDownLatch,在互联网公司工作超过3年的程序员基本上应该都用过。CountDownLatch中文名称叫做闭锁,也叫计数锁,不过不是用来加锁的,而是通过计数实现条件等待的功能。CountDownLatch的使用场景有两个:

  • 当前线程等待其他线程都执行完成之后,再执行。
  • 所有线程满足条件后,再一起执行。
  • 使用示例

    CountDownLatch常用的方法就两个,countDown()方法用来将计数器减一,await()方法会阻塞当前线程,直到计数器值等于0。

    场景1:

    先看一下第一种场景,也是最常用的场景:

    • 当前线程等待其他线程都执行完成之后,再执行。

    在工作中什么时候会遇到这种场景呢?比如当前线程需要查询3个数据库,并且把查询结果汇总返回给前端。查询3个数据库的逻辑,可以分别使用3个线程加快查询速度。但是怎么判断3个线程都执行结束了呢?这时候就可以使用CountDownLatch了。

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @author 一灯架构
     * @apiNote CountDownLatch测试类(场景1)
     **/
    public class CountDownLatchTest1 {
    
        public static void main(String[] args) throws InterruptedException {
    
            // 1. 创建一个线程池,用来执行3个查询任务
            ExecutorService executorService = Executors.newFixedThreadPool(3);
    
            // 2. 创建一个计数锁,数量是3
            CountDownLatch countDownLatch = new CountDownLatch(3);
    
            // 3. 启动3个查询任务
            for (int i = 0; i  {
                    try {
                        // 4. 睡眠1秒,模拟任务执行过程
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getName() + " 执行完成");
                        // 5. 任务执行完成,计数器减一
                        countDownLatch.countDown();
                    } catch (InterruptedException e) {
                    }
                });
            }
    
            // 6. 等待所有任务执行完成
            countDownLatch.await();
            System.out.println("所有任务执行完成。");
    
            // 7. 关闭线程池
            executorService.shutdown();
        }
    }

    输出结果:

    pool-1-thread-2 执行完成 pool-1-thread-1 执行完成 pool-1-thread-3 执行完成 所有任务执行完成。

    需要注意的是,这里创建CountDownLatch计数器的时候,指定的数量是3,因为有3个任务。在3个任务没有执行完成之前,await()方法会一直阻塞,直到3个任务都执行完成。

    场景2

    再看一下第二种场景,有些情况用的也比较多:

    • 所有线程满足条件后,再一起执行。

    什么情况下会遇到这种场景呢?比如系统中多个任务线程存在先后依赖关系,必须等待其他线程启动完成后,才能一起执行。

    /**
     * @author 一灯架构
     * @apiNote CountDownLatch测试类(场景2)
     **/
    public class CountDownLatchTest {
    
        public static void main(String[] args) throws InterruptedException {
    
            // 1. 创建一个线程池,用来执行3个任务
            ExecutorService executorService = Executors.newFixedThreadPool(3);
    
            // 2. 创建一个计数锁,数量是1
            CountDownLatch countDownLatch = new CountDownLatch(1);
    
            // 3. 启动3个任务
            for (int i = 0; i  {
                    try {
                        System.out.println(Thread.currentThread().getName() + " 启动完成");
                        // 4. 等待其他任务启动完成
                        countDownLatch.await();
                        // 5. 睡眠1秒,模拟任务执行过程
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getName() + " 执行完成");
                    } catch (InterruptedException e) {
                    }
                });
            }
    
            // 6. 所有任务启动完成,计数器减一
            countDownLatch.countDown();
            System.out.println("所有任务启动完成,开始执行。");
    
            // 7. 关闭线程池
            executorService.shutdown();
        }
    }

    输出结果:

    pool-1-thread-1 启动完成 pool-1-thread-2 启动完成 pool-1-thread-3 启动完成 所有任务启动完成,开始执行。 pool-1-thread-1 执行完成 pool-1-thread-3 执行完成 pool-1-thread-2 执行完成

    需要注意的是,与场景1不同,这里创建CountDownLatch计数器的时候,指定的数量是1,因为3个任务需要满足同一个条件,就是都启动完成,也就是只需要调用一次countDown()方法。 看完了CountDownLatch的使用方式,再看一下CountDownLatch的源码实现。

    类属性

    public class CountDownLatch {
    
        // 只有一个Sync同步变量
        private final Sync sync;
    
        // Sync继承自AQS,主要逻辑都在这里面
        private static final class Sync extends AbstractQueuedSynchronizer {
    
            // 只有这一个构造方法,需要指定计数器数值
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    
    }

    跟ReentrantLock一样,CountDownLatch也没有直接继承AQS,也是采用组合的方式,使用Sync同步变量实现计数的功能,而Sync同步变量才是真正继承AQS的。

    countDown方法源码

    public void countDown() {
        // 底层调用父类AQS中的releaseShared()方法
        sync.releaseShared(1);
    }

    countDown()方法里面调用的是父类AQS中的releaseShared()方法,而releaseShared()方法又在调用子类Sync中tryReleaseShared()方法。

    /**
     * 父类AQS
     */
    public abstract class AbstractQueuedSynchronizer
            extends AbstractOwnableSynchronizer
            implements java.io.Serializable {
                
        public final boolean releaseShared(int arg) {
            // tryReleaseShared()由子类实现
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
        // 定义抽象方法,由子类实现
        protected boolean tryReleaseShared(int arg) {
            throw new UnsupportedOperationException();
        }
    }
    /**
     * 子类Sync
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        
        // 实现父类AQS中的tryReleaseShared()方法
        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false;
                }
                int nextc = c-1;
                if (compareAndSetState(c, nextc)) {
                    return nextc == 0;
                }
            }
        }
    }

    而Sync同步类中tryReleaseShared()方法逻辑也很简单,就是把同步状态state值减一。

    await源码

    await()方法底层也是调用父类中acquireSharedInterruptibly()方法,而父类AQS又需要调用子类Sync中的具体实现。

    public void await() throws InterruptedException {
        // 底层调用父类AQS中的releaseShared()方法
        sync.acquireSharedInterruptibly(1);
    }
    /**
     * 父类AQS
     */
    public abstract class AbstractQueuedSynchronizer
            extends AbstractOwnableSynchronizer
            implements java.io.Serializable {
    
        public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            // tryAcquireShared()由子类实现
            if (tryAcquireShared(arg) < 0) {
                doAcquireSharedInterruptibly(arg);
            }
        }
    
        // 定义抽象方法,由子类实现
        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    
    }

    子类Sync只需要实现tryAcquireShared()方法即可,而tryAcquireShared()方法的作用就是判断锁是否已经完全释放,即同步状态state=0。

    /**
     * 子类Sync
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
    
        // 实现父类AQS中的tryAcquireShared()方法
        @Override
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
    }

    总结

    看完了CountDownLatch的所有源码,是不是觉得CountDownLatch逻辑很简单。

    因为加锁流程的编排工作已经在父类AQS中实现,子类只需要实现具体的加锁逻辑即可,也就是实现tryReleaseShared()方法和tryAcquireShared()方法。而加锁逻辑也很简单,也就是修改同步状态state的值即可。想要详细了解父类AQS的流程,可以翻看前几篇文章。

    下篇文章再一块学习一下共享锁Semaphore的源码实现。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论