阻塞队列知多少?
1、BlockingQueue初探
阻塞队列(BlockingQueue
)被广泛使用在 “生产者-消费者” 问题中,其原因是 BlockingQueue
提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空!
首先,最基本的来说, BlockingQueue 是一个先进先出的队列(Queue),为什么说是阻塞(Blocking)的呢?是因为 BlockingQueue 支持当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。
BlockingQueue 只是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口! 提供的操作总结如下:
Operation | Throws exception | Special value | Block | Time out |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
获取 | element() | peek() | — | — |
BlockingQueue继承于Queue接口,因此基本操作有:
插入元素
- add(E e) :往队列插入数据,当队列满时,插入元素时会抛出IllegalStateException异常
- offer(E e):当往队列插入数据时,插入成功返回
true
,否则则返回false
。当队列满时不会抛出异常
删除元素
- remove(Object o):从队列中删除数据,成功则返回
true
,否则为false
- poll:删除数据,当队列为空时,返回null
查看元素
- element:获取队头元素,如果队列为空时则抛出NoSuchElementException异常
- peek:获取队头元素,如果队列为空则抛出NoSuchElementException异常
BlockingQueue具有的特殊操作有:
插入数据
- put:当阻塞队列容量已经满时,往阻塞队列插入数据的线程会被阻塞,直至阻塞队列已经有空余的容量可供使用
- offer(E e, long timeout, TimeUnit unit):若阻塞队列已经满时,同样会阻塞插入数据的线程,直至阻塞队列已经有空余的地方,与put方法不同的是,该方法会有一个超时时间,若超过当前给定的超时时间,插入数据的线程会退出
删除数据
- take:当阻塞队列为空时,获取队头数据的线程会被阻塞
- poll(long timeout, TimeUnit unit):当阻塞队列为空时,获取数据的线程会被阻塞,另外,如果被阻塞的线程超过了给定的时长,该线程会退出
BlockingQueue 不接受 null 值的插入! 相应的插入方法在碰到 null 的插入时会抛出 NullPointerException 异常。null 值在这里通常用于作为特殊值返回(表格中的第三列),代表 poll 失败。所以,如果允许插入 null 值的话,那获取的时候,就不能很好地用 null 来判断到底是代表插入失败,还是获取的值就是 null 值!
一个 BlockingQueue 可能是有界的! 如果在插入的时候,发现队列满了,那么插入操作将会阻塞。通常,在这里我们说的无界队列也不是说真正的无界 ,而是它的容量是 Integer.MAX_VALUE(21亿多)!
BlockingQueue 的实现都是线程安全的! 但是批量的集合操作如 addAll
, containsAll
, retainAll
和 removeAll
不一定能保证原子操作!如 addAll(c) 有可能在添加了一些元素后中途抛出异常,此时 BlockingQueue 中已经添加了部分元素,不会因为异常而将这些已添加的元素再移除!
BlockingQueue 的各个实现(ArrayBlockingQueue、LinkedBlockingQueue 、SynchronousQueue、PriorityBlockingQueue等
)都遵循了这些规则,当然我们也不用死记这个表格,知道有这么回事,然后写代码的时候根据自己的需要去看方法的注释来选取合适的方法即可!
2、ArrayBlockingQueue
ArrayBlockingQueue
是 BlockingQueue
接口的有界队列 实现类,底层采用数组 来实现!
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable{}
ArrayBlockingQueue
一旦创建,容量不能改变(因为它是基于数组实现的,也就具有数组的特性: 一旦初始化,大小就无法修改)!
它的并发控制采用了可重入锁 ReentrantLock
,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。
当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞!
我们用1个示意图来描述其同步机制:
ArrayBlockingQueue 实现并发同步的原理就是,读操作和写操作都需要获取到 AQS 独占锁 才能进行操作!
如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。
如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。
对于 ArrayBlockingQueue,我们可以在构造的时候指定以下3个参数:
队列容量:其限制了队列中最多允许的元素个数
指定独占锁的公平性:默认是非公平锁,非公平锁的吞吐量比较高;公平锁可以保证每次都是等待最久的线程获取到锁
可以指定用一个集合:将此集合中的元素在构造方法期间就先添加到队列中
ArrayBlockingQueue初始化示例1:
//初始化时指定容量大小为10,即有界队列
//默认保持非公平性
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
ArrayBlockingQueue初始化示例2:
//初始化时指定容量大小为10,即有界队列
//true表示保持公平性
//初始化放入时先队列中的集合
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10, true, new ArrayList<>(Arrays.asList(1, 2, 3)));
3、LinkedBlockingQueue
LinkedBlockingQueue
底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列 来使用,同样满足 FIFO 的特性!
它与 ArrayBlockingQueue
相比起来,具有更高的吞吐量,为了防止 LinkedBlockingQueue
容量迅速增,损耗大量内存,通常在创建 LinkedBlockingQueue
对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE(21亿多)!
来看它的两个构造方法:
// 传说中的无界队列
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 传说中的有界队列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
初始化示例:
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>(); //初始化时不指定容量大小,即无界队列
BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024); //初始化时指定容量大小,即有界队列
我们看看这个类有哪些属性:
// 队列容量
private final int capacity;
// 队列中的元素数量
private final AtomicInteger count = new AtomicInteger(0);
// 队头
private transient Node<E> head;
// 队尾
private transient Node<E> last;
// take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();
// 如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();
// put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();
// 如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();
这里用了两个ReentrantLock、两个 Condition:
takeLock 和 notEmpty 怎么搭配: 如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。
putLock 需要和 notFull 搭配: 如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。
这里用一个示意图来看看 LinkedBlockingQueue 的并发读写控制,然后再进行源码分析:
读操作是排好队的,写操作也是排好队的,唯一的并发问题在于一个写操作和一个读操作同时进行,只要控制好这个(通过前面提到的两个ReentrantLock、两个 Condition配合使用达到控制目的)就可以了!
我们来看下 put 方法是怎么将元素插入到队尾的:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 如果你纠结这里为什么是 -1,可以看看 offer 方法。这就是个标识成功、失败的标志而已。
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 必须要获取到 putLock 才可以进行插入操作
putLock.lockInterruptibly();
try {
// 如果队列满,等待 notFull 的条件满足。
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
// count 原子加 1,c 还是加 1 前的值
c = count.getAndIncrement();
// 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。
// 哪些线程会等待在 notFull 这个 Condition 上呢?
if (c + 1 < capacity)
notFull.signal();
} finally {
// 入队后,释放掉 putLock
putLock.unlock();
}
// 如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),
// 那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作
if (c == 0)
signalNotEmpty();
}
// 入队的代码非常简单,就是将 last 属性指向这个新元素,并且让原队尾的 next 指向这个元素
// 这里入队没有并发问题,因为只有获取到 putLock 独占锁以后,才可以进行此操作
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
// 元素入队后,如果需要,调用这个方法唤醒读线程来读
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
我们再看看 take 方法时怎么取队头元素的:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 首先,需要获取到 takeLock 才能进行出队操作
takeLock.lockInterruptibly();
try {
// 如果队列为空,等待 notEmpty 这个条件满足再继续执行
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
// count 进行原子减 1
c = count.getAndDecrement();
// 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程
if (c > 1)
notEmpty.signal();
} finally {
// 出队后释放掉 takeLock
takeLock.unlock();
}
// 如果 c == capacity,那么说明在这个 take 方法发生的时候,队列是满的
// 既然出队了一个,那么意味着队列不满了,唤醒写线程去写
if (c == capacity)
signalNotFull();
return x;
}
// 取队头,出队
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
// 之前说了,头结点是空的
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
// 设置这个为新的头结点
head = first;
E x = first.item;
first.item = null;
return x;
}
// 元素出队后,如果需要,调用这个方法唤醒写线程来写
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
源码分析就到这里结束了,毕竟还是比较简单的源码,基本上只要读者认真点都看得懂!
4、SynchronousQueue
SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。 据此,把这个类称作一个队列显然是夸大其词了,它更多像是一个汇合点!
它是一个特殊的队列,它的名字其实就蕴含了它的特征—同步的队列。为什么说是同步的呢?这里说的并不是多线程的并发问题,而是因为当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。这里的 Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。
我们比较少使用到 SynchronousQueue 这个类,不过它在线程池的实现类 ThreadPoolExecutor 中得到了应用,感兴趣的读者可以自行上网搜索相关文章!
虽然说 SynchronousQueue 是队列,但它其实是虚队列,因为其不提供任何空间(一个都没有)来存储元素。数据只能从某个写线程交给另外一个读线程,而不是写到某个队列中等待被消费!
你不能调用 SynchronousQueue的 peek 方法(会直接返回 null),peek 方法的语义是只读取不移除,显然,peek 方法的语义是不符合 SynchronousQueue 的特征!
SynchronousQueue 也不能被迭代,因为根本就没有元素可以拿来迭代!
虽然 SynchronousQueue 间接地实现了 Collection 接口,但是如果你将其当做 Collection 来用的话,那么集合是空的!
当然,这个类也是不允许传递 null 值 (并发包中的容器类好像都不支持插入 null 值,因为 null 值往往用作其他用途,比如用于方法的返回值代表操作失败)!
SynchronousQueue 的使用方式举例:
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueExample {
public static SynchronousQueue<Integer> queue = new SynchronousQueue<>();
public static void main(String[] args) {
// 线程1往队列中插入元素
new Thread(() -> {
try {
int element = 42;
System.out.println("Thread 1 putting: " + element);
queue.put(element); // 等待另一个线程删除元素
System.out.println("Thread 1 put done");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 线程2从队列中删除元素
new Thread(() -> {
try {
System.out.println("Thread 2 taking...");
int element = queue.take(); // 等待另一个线程插入元素
System.out.println("Thread 2 took: " + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
这个示例创建了一个SynchronousQueue
对象,线程1向队列插入元素,线程2从队列中删除元素。由于SynchronousQueue
的特性,线程1和线程2都会被阻塞,直到另一个线程执行相反的操作。这种方式可以用于实现线程间的一对一的数据交换!
总结: SynchronousQueue内部采用的就是ArrayBlockingQueue的阻塞原语,所以在功能上完全可以用ArrayBlockingQueue替换之,但是SynchronousQueue 是轻量级的,SynchronousQueue 不具有任何内部容量,我们可以用来在线程间安全的交换单一元素。所以功能比较单一,优势应该就在于轻量吧!
5、PriorityBlockingQueue
PriorityBlockingQueue
是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo()
方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator
来指定排序规则。
PriorityBlockingQueue
并发控制采用的是可重入锁 ReentrantLock
,队列为无界队列。
ArrayBlockingQueue
是有界队列,LinkedBlockingQueue
也可以通过在构造函数中传入capacity
指定队列最大的容量,但是PriorityBlockingQueue
只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话它会自动扩容!
简单地说,它就是 PriorityQueue
的线程安全版本。
不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException
异常!
它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)!
同时注意,如果你从一个 PriorityBlockingQueue 获得一个 Iterator 的话,使用 Iterator遍历该队列,遍历的顺序不会按照优先级顺序,并且顺序是不确定的,可能会是任意顺序!
PriorityBlockingQueue 使用了基于数组的**二叉堆(完全二叉树)**来存放元素,所有的 public 方法采用同一个 lock 进行并发控制。
二叉堆中最小的值就是根节点,但是删除根节点是比较麻烦的,因为需要调整树结构!
用个图解释一下什么是二叉堆,这种数据结构的优点是一目了然的,最小的元素一定是根元素,它是一棵满的树,除了最后一层未必是满的,但是最后一层的节点从左到右紧密排列:
PriorityBlockingQueue 的使用举例:
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
class Person {
private String name;
private int age;
public Person(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public int getAge() {
return age;
}
}
class PersonComparator implements Comparator<Person> {
@Override
public int compare(Person person1, Person person2) {
return person1.getAge() - person2.getAge();
}
}
public class CustomPriorityBlockingQueueExample {
public static void main(String[] args) {
PriorityBlockingQueue<Person> queue = new PriorityBlockingQueue<>(3, new PersonComparator());
// 插入元素
queue.offer(new Person("Alice", 25));
queue.offer(new Person("Bob", 20));
queue.offer(new Person("Charlie", 30);
// 从队列中取出并打印元素(按年龄从小到大排序)
while (!queue.isEmpty()) {
Person person = queue.poll();
System.out.println("Person taken: " + person.getName() + " (age: " + person.getAge() + ")");
}
}
}
输出结果:
Person taken: Bob (age: 20)
Person taken: Alice (age: 25)
Person taken: Charlie (age: 30)
6、总结
ArrayBlockingQueue 底层是数组,有界队列,如果我们遇到生产者-消费者场景 ,这是非常好的选择。
- 生产者-消费者模式:常用来实现生产者-消费者模式,生产者线程向队列中放入元素,消费者线程从队列中取出元素进行处理。
- 线程池任务队列:在线程池中使用
ArrayBlockingQueue
作为任务队列,控制任务的执行顺序和数量,避免任务堆积过多。- 固定大小的任务缓冲区:在并发编程中,有时需要使用固定大小的任务缓冲区,可以选择
ArrayBlockingQueue
来实现。
LinkedBlockingQueue 底层是链表,可以当做无界和有界队列来使用,所以大家不要以为它就是无界队列。
- 生产者-消费者模式:
LinkedBlockingQueue
常用于实现生产者-消费者模式,生产者线程将数据放入队列,消费者线程从队列中取出数据进行处理。- 限流、任务调度:可以利用
LinkedBlockingQueue
的阻塞特性来实现系统的流量控制,如果队列已满则阻塞新任务的加入,以避免系统过载。- 事件驱动系统:在事件驱动的系统中,可以使用
LinkedBlockingQueue
来作为事件队列,将事件按顺序放入队列,并由后台线程依次处理事件。
SynchronousQueue 本身不带有空间来存储任何元素,可以用于实现线程间的一对一的数据交换。
- 生产者-消费者模式:由于
SynchronousQueue
没有存储的能力,适合用于在生产者和消费者之间一对一传递数据的场景。- 异步消息传递:可以用于异步消息传递机制,一个线程发送消息到
SynchronousQueue
,另一个线程接收处理消息。- 线程协作:可以用于线程之间的协作,实现一种同步的线程通信方式,例如确保某个线程在另一个线程完成某项操作之后才能继续执行。
PriorityBlockingQueue 是无界队列,基于数组,数据结构为二叉堆,数组第一个也是树的根节点总是最小值。
- 任务调度: 可以用于实现任务调度,根据任务的优先级来进行调度。
- 事件处理: 当需要按照事件的优先级来处理事件时,可以使用
PriorityBlockingQueue
来实现事件处理。- 数据结构排序: 适用于需要按照一定规则对数据进行排序的场景,可以使用
PriorityBlockingQueue
来维护有序的数据集合。