CountDownLatch原理深度解析

2023年 7月 18日 22.1k 0

大家好,我是小趴菜,在高并发编程中,AbstractQueuedSynchronizer(简称AQS)抽象的队列同步器是我们必须掌握的,AQS底层提供了二种锁模式

  • 独占锁:ReentrantLock就是基于独占锁模式实现的
  • 共享锁:CountDownLatch,ReadWriteLock,Semplere都是基于共享锁模式实现的

接下来我们通过CountDownLatch底层实现原理来了解AQS共享锁模式的实现原理

CountDownLatch用法

CountDownLatch一般是在需要等待多个线程全部执行完毕之后才继续执行剩下的业务逻辑,举个例子,比如你现在去餐厅吃饭点了份辣子鸡。

这时候餐厅有处理鸡块的,有配置调料的,还有烧菜的等多个厨师一起协作最后才能完成一道辣子鸡,而且这几个步骤可以是一起执行的。一个厨师在配置调料的同时,另外一个厨师正在处理鸡块,还有一个厨师正在热油等。

但是作为顾客的我们来说,我们必须等到这几个厨师全部执行完毕之后我们才能吃到辣子鸡


public static void main(String[] args) throws Exception{

    CountDownLatch countDownLatch = new CountDownLatch(3);
    
    new Thread(() -> {
        System.out.println("处理鸡块");
        try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
        countDownLatch.countDown();
    }).start();

    new Thread(() -> {
        System.out.println("配置调料");
        try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
        countDownLatch.countDown();
    }).start();

    new Thread(() -> {
        System.out.println("起锅热油");
        try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
        countDownLatch.countDown();
    }).start();
    
    //会阻塞,等待所有的线程执行结束之后才会继续执行剩下的逻辑
    countDownLatch.await();
    //执行剩下业务逻辑
}

首先我们看 countDownLatch.await(); 这段阻塞的代码,看下底层是如何让线程进入阻塞等待的

进入之后到CountDownLatch类中,然后继续这个方法

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

此时就会进去AQS的内部实现中

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
      
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

首先我们看下 tryAcquireShared(arg) < 0 这个判断是干嘛的,他是进入到CountDownLatch的类中,这里判断 state的值是否等于0,在初始化 CountDownLatch 的时候,我们将state的值初始化成了3,只有当执行一次 countDownLatch.countDown(); 的时候,这个值才会减1,但是此时我们的线程还没有执行结束,所以这个值不会等于0,那么这时候就会返回 -1

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

返回-1以后,就会执行 doAcquireSharedInterruptibly(arg); 这个业务逻辑了

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //创建一个新的共享的Node节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                //尝试判断state是否已经等于0了,如果是,那么主线程就不用阻塞了,
                //可以继续执行了,以此来提高程序性能
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())  //这里才是真正让主线程阻塞的核心方法
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

我们看一下这里的 addWaiter(Node.SHARED)方法

private Node addWaiter(Node mode) {
    //把当前线程,也就是main线程封装成一个Node,并设置成共享模式
    Node node = new Node(Thread.currentThread(), mode);
    
    //在第一次的时候,这个tail节点是为null的
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //初始化链表
    enq(node);
    return node;
}

分析下初始化双向链表逻辑

private Node enq(final Node node) {
    for (;;) {  //注意:这里是死循环
        Node t = tail;
        //第一次进来,因为tail=null,所以会进入到if里面去
        if (t == null) { // Must initialize
            //这里新创建一个空的Node节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
  • 第一次进来:因为第一次进来的时候tail=null,所以会进入到if中去,然后创建一个新的空的节点,然后将头节点和尾节点都指向这个节点

image.png

  • 然后进入第二次循环:这时候tail已经不为空了,所以会进入到else分支里面去,所以的操作就是将当前线程封装成的Node设置尾巴节点,然后设置前置节点和后置节点的关系

image.png

现在回头addWaiter()方法已经清楚了,继续分析剩下的逻辑

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //创建一个新的共享的Node节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) { //这里还是个死循环
            //拿到头节点
            final Node p = node.predecessor();
            if (p == head) {
                //继续判断state的值是否等于0,如果已经等于0了,那么主线程就不需要阻塞等待了,可以继续执行了
                int r = tryAcquireShared(arg);
                //如果state的值等于0,这里r=1,不等于0,r=-1
                //我们假设现在就是不等于0,也就是其它线程还没有执行结束,所以不会进入到if
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

进入 shouldParkAfterFailedAcquire()方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //这里获取Node的waitStatus,在Node初始化之后,默认是是0,
    //所以会进入到 else 分支里面去,将Node的waitStatus的值修改成Node.SIGNAL
    //但是在上一步中是一个死循环,所以会再次进入到这个方法中,这时候waitStatus的值是Node.SIGNAL
    //所以会进入到第一个if分支里面去,最后返回true
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else { 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

这时候shouldParkAfterFailedAcquire()方法返回了true,就会执行 parkAndCheckInterrupt()方法了

private final boolean parkAndCheckInterrupt() {
    //真正让线程阻塞的核心方法
    LockSupport.park(this);
    return Thread.interrupted();
}

当主线程挂起之后,只有全部线程执行结束了,才会继续执行,所以我们来分析下 countDownLatch.countDown();

public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

进入tryReleaseShared(arg)方法,是判断state是否等于0的,

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        //第一次进来,因为state=3,所以不会进入if,只有在初始化的时候将state设置成0,
        //或者你有10个资源,但是有11个线程来获取资源,最后一个线程进来的时候也会等于0
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

一直到第三次进来之后,nextc就会等于0,因为一共减了三次1,也就是最后一个线程执行到这里来了,最后返回true,返回true以后就会执行doReleaseShared();方法了

private void doReleaseShared() {
    
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 核心方法,唤醒阻塞线程,这里传入的是头节点  
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }
        if (h == head)                   
            break;
    }
}
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
if (ws 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论