引言
前面文章我们讲解了ArrayBlockingQueue和LinkedBlockingQueue源码,这篇文章开始讲解SynchronousQueue源码。从名字上就能看到ArrayBlockingQueue是基于数组实现的,而LinkedBlockingQueue是基于链表实现,而SynchronousQueue是基于什么数据结构实现的,看不来。
无论是ArrayBlockingQueue还是LinkedBlockingQueue都是起到缓冲队列的作用,当消费者的消费速度跟不上时,任务就在队列中堆积,需要等待消费者慢慢消费。
如果我们想要自己的任务快速执行,不要积压在队列中,该怎么办? 今天的主角SynchronousQueue就派上用场了。
SynchronousQueue被称为同步队列,当生产者往队列中放元素的时候,必须等待消费者把这个元素取走,否则一直阻塞。消费者取元素的时候,同理也必须等待生产者放队列中放元素。
由于SynchronousQueue实现了BlockingQueue接口,而BlockingQueue接口中定义了几组放数据和取数据的方法,来满足不同的场景。
操作 |
抛出异常 |
返回特定值 |
一直阻塞 |
阻塞指定时间 |
放数据 |
add() |
offer() |
put() |
offer(e, time, unit) |
取数据(同时删除数据) |
remove() |
poll() |
take() |
poll(time, unit) |
取数据(不删除) |
element() |
peek() |
不支持 |
不支持 |
SynchronousQueue也会有针对这几组放数据和取数据方法的具体实现。
Java线程池中的带缓存的线程池就是基于SynchronousQueue实现的:
// 创建带缓存的线程池
ExecutorService executorService = Executors.newCachedThreadPool();
对应的源码实现:
// 底层使用SynchronousQueue队列处理任务
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
类结构
先看一下SynchronousQueue类里面有哪些属性:
public class SynchronousQueue
extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
/**
* 转接器(栈和队列的父类)
*/
abstract static class Transferer {
/**
* 转移(put和take都用这一个方法)
*
* @param e 元素
* @param timed 是否超时
* @param nanos 纳秒
*/
abstract E transfer(E e, boolean timed, long nanos);
}
/**
* 栈实现类
*/
static final class TransferStack extends Transferer {
}
/**
* 队列实现类
*/
static final class TransferQueue extends Transferer {
}
}
SynchronousQueue底层是基于Transferer抽象类实现的,放数据和取数据的逻辑都耦合在transfer()方法中。而Transferer抽象类又有两个实现类,分别是基于栈结构实现和基于队列实现。
初始化
SynchronousQueue常用的初始化方法有两个:
/**
* 无参构造方法
*/
BlockingQueue blockingQueue1 = new SynchronousQueue();
/**
* 有参构造方法,指定是否使用公平锁(默认使用非公平锁)
*/
BlockingQueue blockingQueue2 = new SynchronousQueue(true);
再看一下对应的源码实现:
/**
* 无参构造方法
*/
public SynchronousQueue() {
this(false);
}
/**
* 有参构造方法,指定是否使用公平锁
*/
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue() : new TransferStack();
}
可以看出SynchronousQueue的无参构造方法默认使用的非公平策略,有参构造方法可以指定使用公平策略。操作策略:
栈实现
栈的类结构
/**
* 栈实现
*/
static final class TransferStack extends Transferer {
/**
* 头节点(也是栈顶节点)
*/
volatile SNode head;
/**
* 栈节点类
*/
static final class SNode {
/**
* 当前操作的线程
*/
volatile Thread waiter;
/**
* 节点值(取数据的时候,该字段为null)
*/
Object item;
/**
* 节点模式(也叫操作类型)
*/
int mode;
/**
* 后继节点
*/
volatile SNode next;
/**
* 匹配到的节点
*/
volatile SNode match;
}
}
节点模式有以下三种:
类型值 |
类型描述 |
作用 |
0 |
REQUEST |
表示取数据 |
1 |
DATA |
表示放数据 |
2 |
FULFILLING |
表示正在执行中(比如取数据的线程正在匹配放数据的线程) |
图片
栈的transfer方法实现
transfer()方法中,把放数据和取数据的逻辑耦合在一块了,逻辑有点绕,不过核心逻辑就四点,把握住就能豁然开朗。其实就是从栈顶压入,从栈顶弹出。
详细流程如下:
/**
* 转移(put和take都用这一个方法)
*
* @param e 元素(取数据的时候,元素为null)
* @param timed 是否超时
* @param nanos 纳秒
*/
E transfer(E e, boolean timed, long nanos) {
SNode s = null;
// 1. e为null,表示要取数据,否则是放数据
int mode = (e == null) ? REQUEST : DATA;
for (; ; ) {
SNode h = head;
// 2. 如果本次操作跟栈顶节点模式相同(都是取数据,或者都是放数据),就把本次操作包装成SNode,压入栈顶
if (h == null || h.mode == mode) {
if (timed && nanos spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
再看一下匹配节点的tryMatch()方法逻辑: 作用就是唤醒栈顶节点,并当前节点传递给栈顶节点。
/**
* 匹配节点
*
* @param s 当前节点
*/
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) {
waiter = null;
// 1. 唤醒栈顶节点
LockSupport.unpark(w);
}
return true;
}
// 2. 把当前节点传递给栈顶节点
return match == s;
}
队列实现
队列的类结构
/**
* 队列实现
*/
static final class TransferQueue extends Transferer {
/**
* 头节点
*/
transient volatile QNode head;
/**
* 尾节点
*/
transient volatile QNode tail;
/**
* 队列节点类
*/
static final class QNode {
/**
* 当前操作的线程
*/
volatile Thread waiter;
/**
* 节点值
*/
volatile Object item;
/**
* 后继节点
*/
volatile QNode next;
/**
* 当前节点是否为数据节点
*/
final boolean isData;
}
}
可以看出TransferQueue队列是使用带有头尾节点的单链表实现的。 还有一点需要提一下,TransferQueue默认构造方法,会初始化头尾节点,默认是空节点。
/**
* TransferQueue默认的构造方法
*/
TransferQueue() {
QNode h = new QNode(null, false);
head = h;
tail = h;
}
队列的transfer方法实现
队列使用的公平策略,体现在,每次操作的时候,都是从队尾压入,从队头弹出。 详细流程如下:
/**
* 转移(put和take都用这一个方法)
*
* @param e 元素(取数据的时候,元素为null)
* @param timed 是否超时
* @param nanos 超时时间
*/
E transfer(E e, boolean timed, long nanos) {
QNode s = null;
// 1. e不为null,表示要放数据,否则是取数据
boolean isData = (e != null);
for (; ; ) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) {
continue;
}
// 2. 如果本次操作跟队尾节点模式相同(都是取数据,或者都是放数据),就把本次操作包装成QNode,压入队尾
if (h == t || t.isData == isData) {
QNode tn = t.next;
if (t != tail) {
continue;
}
if (tn != null) {
advanceTail(t, tn);
continue;
}
if (timed && nanos