引言
上篇文章我们讲解了ArrayBlockingQueue源码,这篇文章开始讲解LinkedBlockingQueue源码。从名字上就能看到ArrayBlockingQueue是基于数组实现的,而LinkedBlockingQueue是基于链表实现。
那么,LinkedBlockingQueue底层源码实现是什么样的?跟ArrayBlockingQueue有何不同?
LinkedBlockingQueue的应用场景跟ArrayBlockingQueue有什么不一样?
看完这篇文章,可以轻松解答这些问题。
由于LinkedBlockingQueue实现了BlockingQueue接口,而BlockingQueue接口中定义了几组放数据和取数据的方法,来满足不同的场景。
操作 |
抛出异常 |
返回特定值 |
一直阻塞 |
阻塞指定时间 |
放数据 |
add() |
offer() |
put() |
offer(e, time, unit) |
取数据(同时删除数据) |
remove() |
poll() |
take() |
poll(time, unit) |
取数据(不删除) |
element() |
peek() |
不支持 |
不支持 |
这四组方法的区别是:
LinkedBlockingQueue也会有针对这几组放数据和取数据方法的具体实现。 Java线程池中的固定大小线程池就是基于LinkedBlockingQueue实现的:
# 创建固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
对应的源码实现:
# 底层使用LinkedBlockingQueue队列存储任务
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
类结构
先看一下LinkedBlockingQueue类里面有哪些属性:
public class LinkedBlockingQueue
extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
/**
* 容量大小
*/
private final int capacity;
/**
* 元素个数
*/
private final AtomicInteger count = new AtomicInteger();
/**
* 头节点
*/
transient Node head;
/**
* 尾节点
*/
private transient Node last;
/**
* 取数据的锁
*/
private final ReentrantLock takeLock = new ReentrantLock();
/**
* 取数据的条件(队列非空)
*/
private final Condition notEmpty = takeLock.newCondition();
/**
* 放数据的锁
*/
private final ReentrantLock putLock = new ReentrantLock();
/**
* 放数据的条件(队列非满)
*/
private final Condition notFull = putLock.newCondition();
/**
* 链表节点类
*/
static class Node {
/**
* 节点元素
*/
E item;
/**
* 后继节点
*/
Node next;
Node(E x) {
item = x;
}
}
}
图片
可以看出LinkedBlockingQueue底层是基于链表实现的,定义了头节点head和尾节点last,由链表节点类Node可以看出是个单链表。 发现个问题,ArrayBlockingQueue中只使用了一把锁,入队出队操作共用这把锁。而LinkedBlockingQueue则使用了两把锁,分别是出队锁takeLock和入队锁putLock,为什么要这么设计呢?
LinkedBlockingQueue把两把锁分开,性能更好,为什么ArrayBlockingQueue不这样设计呢?
原因是ArrayBlockingQueue是基于数组实现的,所有数据都存储在同一个数组对象里面,对同一个对象没办法使用两把锁,会有数据可见性的问题。而LinkedBlockingQueue底层是基于链表实现的,从头节点删除,尾节点插入,头尾节点分别是两个对象,可以分别使用两把锁,提升操作性能。
另外也定义了两个条件notEmpty和notFull,当条件满足的时候才允许放数据或者取数据,下面会详细讲。
初始化
LinkedBlockingQueue常用的初始化方法有两个:
/**
* 无参构造方法
*/
BlockingQueue blockingQueue1 = new LinkedBlockingQueue();
/**
* 指定容量大小的构造方法
*/
BlockingQueue blockingQueue2 = new LinkedBlockingQueue(10);
再看一下对应的源码实现:
/**
* 无参构造方法
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 指定容量大小的构造方法
*/
public LinkedBlockingQueue(int capacity) {
if (capacity = 0;
}
/**
* 入队
*
* @param node 节点
*/
private void enqueue(LinkedBlockingQueue.Node node) {
// 直接追加到链表末尾
last = last.next = node;
}
/**
* 唤醒因为队列为空而等待取数据的线程
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
offer()方法逻辑也很简单,追加元素到链表末尾,如果是第一次添加元素,就唤醒因为队列为空而等待取数据的线程。
再看一下另外三个添加元素方法源码:
add方法源码
add()方法在数组满的时候,会抛出异常,底层基于offer()实现。
/**
* add方法入口
*
* @param e 元素
* @return 是否添加成功
*/
public boolean add(E e) {
if (offer(e)) {
return true;
} else {
throw new IllegalStateException("Queue full");
}
}
put方法源码
put()方法在数组满的时候,会一直阻塞,直到有其他线程取走数据,空出位置,才能添加成功。
/**
* put方法入口
*
* @param e 元素
*/
public void put(E e) throws InterruptedException {
// 1. 判空,传参不允许为null
if (e == null) {
throw new NullPointerException();
}
int c = -1;
Node node = new Node(e);
// 2. 加可中断的锁,防止一直阻塞
final ReentrantLock putLock = this.putLock;
putLock.lockInterruptibly();
final AtomicInteger count = this.count;
try {
// 3. 如果队列已满,就一直阻塞,直到被唤醒
while (count.get() == capacity) {
notFull.await();
}
// 4. 如果队列未满,则直接入队
enqueue(node);
c = count.getAndIncrement();
// 5. 如果队列未满,则唤醒因为队列已满而等待放数据的线程(用来补偿,不加也行)
if (c + 1 < capacity) {
notFull.signal();
}
} finally {
// 6. 释放锁
putLock.unlock();
}
// 7. c等于0,表示插入前,队列为空,是第一次插入,需要唤醒因为队列为空而等待取数据的线程
if (c == 0) {
signalNotEmpty();
}
}
offer(e, time, unit)源码
再看一下offer(e, time, unit)方法源码,在数组满的时候, offer(e, time, unit)方法会阻塞一段时间。
/**
* offer方法入口
*
* @param e 元素
* @param timeout 超时时间
* @param unit 时间单位
* @return 是否添加成功
*/
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 1. 判空,传参不允许为null
if (e == null) {
throw new NullPointerException();
}
// 2. 把超时时间转换为纳秒
long nanos = unit.toNanos(timeout);
int c = -1;
final AtomicInteger count = this.count;
// 2. 加可中断的锁,防止一直阻塞
final ReentrantLock putLock = this.putLock;
putLock.lockInterruptibly();
try {
// 4. 循环判断队列是否已满
while (count.get() == capacity) {
if (nanos 0) {
x = dequeue();
// 4. 元素个数减一
c = count.getAndDecrement();
// 5. 如果队列不为空,则唤醒因为队列为空而等待取数据的线程
if (c > 1) {
notEmpty.signal();
}
}
} finally {
// 6. 释放锁
takeLock.unlock();
}
// 7. 如果取数据之前,队列已满,取数据之后队列肯定不满了,则唤醒因为队列已满而等待放数据的线程
if (c == capacity) {
signalNotFull();
}
return x;
}
/**
* 取出队头元素
*/
private E dequeue() {
Node h = head;
Node first = h.next;
h.next = h;
head = first;
E x = first.item;
first.item = null;
return x;
}
/**
* 唤醒因为队列已满而等待放数据的线程
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
remove方法源码
再看一下remove()方法源码,如果队列为空,remove()会抛出异常。
/**
* remove方法入口
*/
public E remove() {
// 1. 直接调用poll方法
E x = poll();
// 2. 如果取到数据,直接返回,否则抛出异常
if (x != null) {
return x;
} else {
throw new NoSuchElementException();
}
}
take方法源码
再看一下take()方法源码,如果队列为空,take()方法就一直阻塞,直到被唤醒。
/**
* take方法入口
*/
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
// 1. 加可中断的锁,防止一直阻塞
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 2. 如果队列为空,就一直阻塞,直到被唤醒
while (count.get() == 0) {
notEmpty.await();
}
// 3. 如果队列不为空,则取出队头元素
x = dequeue();
// 4. 队列元素个数减一
c = count.getAndDecrement();
// 5. 如果队列不为空,则唤醒因为队列为空而等待取数据的线程
if (c > 1) {
notEmpty.signal();
}
} finally {
// 6. 释放锁
takeLock.unlock();
}
// 7. 如果取数据之前,队列已满,取数据之后队列肯定不满了,则唤醒因为队列已满而等待放数据的线程
if (c == capacity) {
signalNotFull();
}
return x;
}
poll(time, unit)源码
再看一下poll(time, unit)方法源码,在队列满的时候, poll(time, unit)方法会阻塞指定时间,然后然后null。
/**
* poll方法入口
*
* @param timeout 超时时间
* @param unit 时间单位
* @return 元素
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
// 1. 把超时时间转换成纳秒
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
// 2. 加可中断的锁,防止一直阻塞
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 3. 循环判断队列是否为空
while (count.get() == 0) {
if (nanos 1) {
notEmpty.signal();
}
} finally {
// 9. 释放锁
takeLock.unlock();
}
// 7. 如果取数据之前,队列已满,取数据之后队列肯定不满了,则唤醒因为队列已满而等待放数据的线程
if (c == capacity) {
signalNotFull();
}
return x;
}
查看数据源码
再看一下查看数据源码,查看数据,并不删除数据。
操作 |
抛出异常 |
返回特定值 |
阻塞 |
阻塞一段时间 |
取数据(不删除) |
element() |
peek() |
不支持 |
不支持 |
peek方法源码
先看一下peek()方法源码,如果数组为空,直接返回null。
/**
* peek方法入口
*/
public E peek() {
// 1. 如果队列为空,则返回null
if (count.get() == 0) {
return null;
}
// 2. 加锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 3. 取出队头元素
Node first = head.next;
if (first == null) {
return null;
} else {
return first.item;
}
} finally {
// 4. 释放锁
takeLock.unlock();
}
}
element方法源码
再看一下element()方法源码,如果队列为空,则抛出异常。
/**
* element方法入口
*/
public E element() {
// 1. 调用peek方法查询数据
E x = peek();
// 2. 如果查到数据,直接返回
if (x != null) {
return x;
} else {
// 3. 如果没找到,则抛出异常
throw new NoSuchElementException();
}
}
总结
这篇文章讲解了LinkedBlockingQueue阻塞队列的核心源码,了解到LinkedBlockingQueue队列具有以下特点:
那么ArrayBlockingQueue与LinkedBlockingQueue区别是什么?相同点:
不同点:
今天一起分析了LinkedBlockingQueue队列的源码,可以看到LinkedBlockingQueue的源码非常简单,没有什么神秘复杂的东西,下篇文章再一起接着分析其他的阻塞队列源码。