没研究过SynchronousQueue源码,就别写精通线程池

2024年 2月 4日 49.5k 0

引言

前面文章我们讲解了ArrayBlockingQueue和LinkedBlockingQueue源码,这篇文章开始讲解SynchronousQueue源码。从名字上就能看到ArrayBlockingQueue是基于数组实现的,而LinkedBlockingQueue是基于链表实现,而SynchronousQueue是基于什么数据结构实现的,看不来。

无论是ArrayBlockingQueue还是LinkedBlockingQueue都是起到缓冲队列的作用,当消费者的消费速度跟不上时,任务就在队列中堆积,需要等待消费者慢慢消费。

如果我们想要自己的任务快速执行,不要积压在队列中,该怎么办? 今天的主角SynchronousQueue就派上用场了。

SynchronousQueue被称为同步队列,当生产者往队列中放元素的时候,必须等待消费者把这个元素取走,否则一直阻塞。消费者取元素的时候,同理也必须等待生产者放队列中放元素。

由于SynchronousQueue实现了BlockingQueue接口,而BlockingQueue接口中定义了几组放数据和取数据的方法,来满足不同的场景。

操作

抛出异常

返回特定值

一直阻塞

阻塞指定时间

放数据

add()

offer()

put()

offer(e, time, unit)

取数据(同时删除数据)

remove()

poll()

take()

poll(time, unit)

取数据(不删除)

element()

peek()

不支持

不支持

SynchronousQueue也会有针对这几组放数据和取数据方法的具体实现。

Java线程池中的带缓存的线程池就是基于SynchronousQueue实现的:

// 创建带缓存的线程池
ExecutorService executorService = Executors.newCachedThreadPool();

对应的源码实现:

// 底层使用SynchronousQueue队列处理任务
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue());
}

类结构

先看一下SynchronousQueue类里面有哪些属性:

public class SynchronousQueue
        extends AbstractQueue
        implements BlockingQueue, java.io.Serializable {

    /**
     * 转接器(栈和队列的父类)
     */
    abstract static class Transferer {
        
        /**
         * 转移(put和take都用这一个方法)
         *
         * @param e     元素
         * @param timed 是否超时
         * @param nanos 纳秒
         */
        abstract E transfer(E e, boolean timed, long nanos);
        
    }

    /**
     * 栈实现类
     */
    static final class TransferStack extends Transferer {
    }

    /**
     * 队列实现类
     */
    static final class TransferQueue extends Transferer {
    }

}

SynchronousQueue底层是基于Transferer抽象类实现的,放数据和取数据的逻辑都耦合在transfer()方法中。而Transferer抽象类又有两个实现类,分别是基于栈结构实现和基于队列实现。

初始化

SynchronousQueue常用的初始化方法有两个:

  • 无参构造方法
  • 指定容量大小的有参构造方法
  • /**
     * 无参构造方法
     */
    BlockingQueue blockingQueue1 = new SynchronousQueue();
    
    /**
     * 有参构造方法,指定是否使用公平锁(默认使用非公平锁)
     */
    BlockingQueue blockingQueue2 = new SynchronousQueue(true);

    再看一下对应的源码实现:

    /**
     * 无参构造方法
     */
    public SynchronousQueue() {
        this(false);
    }
    
    /**
     * 有参构造方法,指定是否使用公平锁
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue() : new TransferStack();
    }

    可以看出SynchronousQueue的无参构造方法默认使用的非公平策略,有参构造方法可以指定使用公平策略。操作策略:

  • 公平策略,基于队列实现的是公平策略,先进先出。
  • 非公平策略,基于栈实现的是非公平策略,先进后出。
  • 栈实现

    栈的类结构

    /**
     * 栈实现
     */
    static final class TransferStack extends Transferer {
    
        /**
         * 头节点(也是栈顶节点)
         */
        volatile SNode head;
    
        /**
         * 栈节点类
         */
        static final class SNode {
    
            /**
             * 当前操作的线程
             */
            volatile Thread waiter;
    
            /**
             * 节点值(取数据的时候,该字段为null)
             */
            Object item;
    
            /**
             * 节点模式(也叫操作类型)
             */
            int mode;
    
            /**
             * 后继节点
             */
            volatile SNode next;
    
            /**
             * 匹配到的节点
             */
            volatile SNode match;
    
        }
    }

    节点模式有以下三种:

    类型值

    类型描述

    作用

    0

    REQUEST

    表示取数据

    1

    DATA

    表示放数据

    2

    FULFILLING

    表示正在执行中(比如取数据的线程正在匹配放数据的线程)

    图片图片

    栈的transfer方法实现

    transfer()方法中,把放数据和取数据的逻辑耦合在一块了,逻辑有点绕,不过核心逻辑就四点,把握住就能豁然开朗。其实就是从栈顶压入,从栈顶弹出。

    详细流程如下:

  • 首先判断当前线程的操作类型与栈顶节点的操作类型是否一致,比如都是放数据,或者都是取数据。
  • 如果是一致,把当前操作包装成SNode节点,压入栈顶,并挂起当前线程。
  • 如果不一致,表示相互匹配(比如当前操作是放数据,而栈顶节点是取数据,或者相反)。然后也把当前操作包装成SNode节点压入栈顶,并使用tryMatch()方法匹配两个节点,匹配成功后,弹出两个这两个节点,并唤醒栈顶节点线程,同时把数据传递给栈顶节点线程,最后返回。
  • 栈顶节点线程被唤醒,继续执行,然后返回传递过来的数据。
  • /**
     * 转移(put和take都用这一个方法)
     *
     * @param e     元素(取数据的时候,元素为null)
     * @param timed 是否超时
     * @param nanos 纳秒
     */
    E transfer(E e, boolean timed, long nanos) {
        SNode s = null;
        // 1. e为null,表示要取数据,否则是放数据
        int mode = (e == null) ? REQUEST : DATA;
        for (; ; ) {
            SNode h = head;
            // 2. 如果本次操作跟栈顶节点模式相同(都是取数据,或者都是放数据),就把本次操作包装成SNode,压入栈顶
            if (h == null || h.mode == mode) {
                if (timed && nanos  spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }

    再看一下匹配节点的tryMatch()方法逻辑: 作用就是唤醒栈顶节点,并当前节点传递给栈顶节点。

    /**
     * 匹配节点
     *
     * @param s 当前节点
     */
    boolean tryMatch(SNode s) {
        if (match == null &&
                UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
            Thread w = waiter;
            if (w != null) {
                waiter = null;
                // 1. 唤醒栈顶节点
                LockSupport.unpark(w);
            }
            return true;
        }
        // 2. 把当前节点传递给栈顶节点
        return match == s;
    }

    队列实现

    队列的类结构

    /**
     * 队列实现
     */
    static final class TransferQueue extends Transferer {
    
        /**
         * 头节点
         */
        transient volatile QNode head;
    
        /**
         * 尾节点
         */
        transient volatile QNode tail;
    
        /**
         * 队列节点类
         */
        static final class QNode {
    
            /**
             * 当前操作的线程
             */
            volatile Thread waiter;
    
            /**
             * 节点值
             */
            volatile Object item;
    
            /**
             * 后继节点
             */
            volatile QNode next;
    
            /**
             * 当前节点是否为数据节点
             */
            final boolean isData;
        }
    }

    可以看出TransferQueue队列是使用带有头尾节点的单链表实现的。 还有一点需要提一下,TransferQueue默认构造方法,会初始化头尾节点,默认是空节点。

    /**
     * TransferQueue默认的构造方法
     */
    TransferQueue() {
        QNode h = new QNode(null, false);
        head = h;
        tail = h;
    }

    队列的transfer方法实现

    队列使用的公平策略,体现在,每次操作的时候,都是从队尾压入,从队头弹出。 详细流程如下:

  • 首先判断当前线程的操作类型与队尾节点的操作类型是否一致,比如都是放数据,或者都是取数据。
  • 如果是一致,把当前操作包装成QNode节点,压入队尾,并挂起当前线程。
  • 如果不一致,表示相互匹配(比如当前操作是放数据,而队尾节点是取数据,或者相反)。然后在队头节点开始遍历,找到与当前操作类型相匹配的节点,把当前操作的节点值传递给这个节点,并弹出这个节点,唤醒这个节点的线程,最后返回。
  • 队头节点线程被唤醒,继续执行,然后返回传递过来的数据。
  • /**
    * 转移(put和take都用这一个方法)
    *
    * @param e 元素(取数据的时候,元素为null)
    * @param timed 是否超时
    * @param nanos 超时时间
    */
    E transfer(E e, boolean timed, long nanos) {
    QNode s = null;
    // 1. e不为null,表示要放数据,否则是取数据
    boolean isData = (e != null);
    for (; ; ) {
    QNode t = tail;
    QNode h = head;
    if (t == null || h == null) {
    continue;
    }

    // 2. 如果本次操作跟队尾节点模式相同(都是取数据,或者都是放数据),就把本次操作包装成QNode,压入队尾
    if (h == t || t.isData == isData) {
    QNode tn = t.next;
    if (t != tail) {
    continue;
    }
    if (tn != null) {
    advanceTail(t, tn);
    continue;
    }
    if (timed && nanos

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论