没看过ArrayBlockingQueue源码,就别说精通线程池

2023年 12月 28日 72.3k 0

引言

在日常开发中,我们好像很少用到BlockingQueue(阻塞队列),BlockingQueue到底有什么作用?应用场景是什么样的?

如果使用过线程池或者阅读过线程池源码,就会知道线程池的核心功能都是基于BlockingQueue实现的。

大家用过消息队列(MessageQueue),就知道消息队列作用是解耦、异步、削峰。同样BlockingQueue的作用也是这三种,区别是BlockingQueue只作用于本机器,而消息队列相当于分布式BlockingQueue。

BlockingQueue作为阻塞队列,主要应用于生产者-消费者模式的场景,在并发多线程中尤其常用。

  • 比如像线程池中的任务调度场景,提交任务和拉取并执行任务。
  • 生产者与消费者解耦的场景,生产者把数据放到队列中,消费者从队列中取数据进行消费。两者进行解耦,不用感知对方的存在。
  • 应对突发流量的场景,业务高峰期突然来了很多请求,可以放到队列中缓存起来,消费者以正常的频率从队列中拉取并消费数据,起到削峰的作用。
  • BlockingQueue是个接口,定义了几组放数据和取数据的方法,来满足不同的场景。

    操作

    抛出异常

    返回特定值

    阻塞

    阻塞一段时间

    放数据

    add()

    offer()

    put()

    offer(e, time, unit)

    取数据(同时删除数据)

    remove()

    poll()

    take()

    poll(time, unit)

    取数据(不删除)

    element()

    peek()

    不支持

    不支持

    BlockingQueue有5个常见的实现类,应用场景不同。

    • ArrayBlockingQueue

    基于数组实现的阻塞队列,创建队列时需指定容量大小,是有界队列。

    • LinkedBlockingQueue

    基于链表实现的阻塞队列,默认是无界队列,创建可以指定容量大小

    • SynchronousQueue

    一种没有缓冲的阻塞队列,生产出的数据需要立刻被消费

    • PriorityBlockingQueue

    实现了优先级的阻塞队列,基于数据显示,是无界队列

    • DelayQueue

    实现了延迟功能的阻塞队列,基于PriorityQueue实现的,是无界队列

    今天重点讲一下ArrayBlockingQueue的底层实现原理,在接下来的文章中再讲一下其他队列实现。

    ArrayBlockingQueue类结构

    先看一下ArrayBlockingQueue类里面有哪些属性:

    public class ArrayBlockingQueue
            extends AbstractQueue
            implements BlockingQueue, java.io.Serializable {
    
    
        /**
         * 用来存放数据的数组
         */
        final Object[] items;
    
        /**
         * 下次取数据的数组下标位置
         */
        int takeIndex;
    
        /**
         * 下次放数据的数组下标位置
         */
        int putIndex;
    
        /**
         * 元素个数
         */
        int count;
    
        /**
         * 独占锁,用来保证存取数据安全
         */
        final ReentrantLock lock;
    
        /**
         * 取数据的条件
         */
        private final Condition notEmpty;
    
        /**
         * 放数据的条件
         */
        private final Condition notFull;
    
    }

    可以看出ArrayBlockingQueue底层是基于数组实现的,使用对象数组items存储元素。为了实现队列特性(一端插入,另一端删除),定义了两个指针,takeIndex表示下次取数据的位置,putIndex表示下次放数据的位置。 另外ArrayBlockingQueue还使用ReentrantLock保证线程安全,并且定义了两个条件,当条件满足的时候才允许放数据或者取数据,下面会详细讲。

    初始化

    ArrayBlockingQueue常用的初始化方法有两个:

  • 指定容量大小
  • 指定容量大小和是否是公平锁
  • /**
     * 指定容量大小的构造方法
     */
    BlockingQueue blockingDeque1 = new ArrayBlockingQueue(1);
    /**
     * 指定容量大小、公平锁的构造方法
     */
    BlockingQueue blockingDeque1 = new ArrayBlockingQueue(1, true);

    再看一下对应的源码实现:

    /**
    * 指定容量大小的构造方法(默认是非公平锁)
    */
    public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
    }

    /**
    * 指定容量大小、公平锁的构造方法
    *
    * @param capacity 数组容量
    * @param fair 是否是公平锁
    */
    public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论