引言
在日常开发中,我们好像很少用到BlockingQueue(阻塞队列),BlockingQueue到底有什么作用?应用场景是什么样的?
如果使用过线程池或者阅读过线程池源码,就会知道线程池的核心功能都是基于BlockingQueue实现的。
大家用过消息队列(MessageQueue),就知道消息队列作用是解耦、异步、削峰。同样BlockingQueue的作用也是这三种,区别是BlockingQueue只作用于本机器,而消息队列相当于分布式BlockingQueue。
BlockingQueue作为阻塞队列,主要应用于生产者-消费者模式的场景,在并发多线程中尤其常用。
BlockingQueue是个接口,定义了几组放数据和取数据的方法,来满足不同的场景。
操作 |
抛出异常 |
返回特定值 |
阻塞 |
阻塞一段时间 |
放数据 |
add() |
offer() |
put() |
offer(e, time, unit) |
取数据(同时删除数据) |
remove() |
poll() |
take() |
poll(time, unit) |
取数据(不删除) |
element() |
peek() |
不支持 |
不支持 |
BlockingQueue有5个常见的实现类,应用场景不同。
- ArrayBlockingQueue
基于数组实现的阻塞队列,创建队列时需指定容量大小,是有界队列。
- LinkedBlockingQueue
基于链表实现的阻塞队列,默认是无界队列,创建可以指定容量大小
- SynchronousQueue
一种没有缓冲的阻塞队列,生产出的数据需要立刻被消费
- PriorityBlockingQueue
实现了优先级的阻塞队列,基于数据显示,是无界队列
- DelayQueue
实现了延迟功能的阻塞队列,基于PriorityQueue实现的,是无界队列
今天重点讲一下ArrayBlockingQueue的底层实现原理,在接下来的文章中再讲一下其他队列实现。
ArrayBlockingQueue类结构
先看一下ArrayBlockingQueue类里面有哪些属性:
public class ArrayBlockingQueue
extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
/**
* 用来存放数据的数组
*/
final Object[] items;
/**
* 下次取数据的数组下标位置
*/
int takeIndex;
/**
* 下次放数据的数组下标位置
*/
int putIndex;
/**
* 元素个数
*/
int count;
/**
* 独占锁,用来保证存取数据安全
*/
final ReentrantLock lock;
/**
* 取数据的条件
*/
private final Condition notEmpty;
/**
* 放数据的条件
*/
private final Condition notFull;
}
可以看出ArrayBlockingQueue底层是基于数组实现的,使用对象数组items存储元素。为了实现队列特性(一端插入,另一端删除),定义了两个指针,takeIndex表示下次取数据的位置,putIndex表示下次放数据的位置。 另外ArrayBlockingQueue还使用ReentrantLock保证线程安全,并且定义了两个条件,当条件满足的时候才允许放数据或者取数据,下面会详细讲。
初始化
ArrayBlockingQueue常用的初始化方法有两个:
/**
* 指定容量大小的构造方法
*/
BlockingQueue blockingDeque1 = new ArrayBlockingQueue(1);
/**
* 指定容量大小、公平锁的构造方法
*/
BlockingQueue blockingDeque1 = new ArrayBlockingQueue(1, true);
再看一下对应的源码实现:
/**
* 指定容量大小的构造方法(默认是非公平锁)
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* 指定容量大小、公平锁的构造方法
*
* @param capacity 数组容量
* @param fair 是否是公平锁
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity