Java 中常见并发容器最强总结
1、常见的并发容器有哪些?
JDK 提供的并发容器大部分在 java.util.concurrent
包中:
ConcurrentHashMap
: 线程安全的HashMap
。CopyOnWriteArrayList
: 线程安全的List
,在读多写少的场合性能非常好,远远好于Vector
。ConcurrentLinkedQueue
: 非阻塞队列,高效的并发队列,使用链表实现,可以看做是线程安全的LinkedList
。ConcurrentSkipListMap
: 跳表的实现,这是一个 Map,使用跳表的数据结构进行快速查找。BlockingQueue
: 阻塞队列,这是一个接口,ArrayBlockingQueue、LinkedBlockingQueue等都是它的实现,非常适合作为数据共享的容器。
接下来挨个儿介绍!
2、ConcurrentHashMap
我们知道 HashMap
不是线程安全的!
在并发场景下如果要保证一种可行的方式是使用 Collections.synchronizedMap()
方法来包装我们的 HashMap
;
但这是通过使用一个 全局锁 来同步不同线程间的并发访问,因此会带来不可忽视的性能问题!
所以就有了 HashMap
的线程安全版本—— ConcurrentHashMap
的诞生!
重点: 在 ConcurrentHashMap
中,无论是读操作还是写操作都能保证很高的性能:在读操作时(几乎)不需要加锁,而在写操作时通过 锁技术 只对所操作的数据加锁而不影响客户端对其它数据的访问!
JDK1.7中的锁: 在 JDK1.7 的时候,ConcurrentHashMap
对整个桶数组进行了分割分段(Segment
,分段锁),每一把锁只锁容器其中一个段的数据(下面有示意图)而不影响客户端对其它段的访问,因此多线程访问容器里不同数据段的数据就不会存在锁竞争,提高并发访问率!
JDK1.8中的锁: 到了 JDK1.8 的时候,ConcurrentHashMap
已经摒弃了 Segment
的概念,而是直接用 Node 数组+链表+红黑树
的数据结构来替代Segment,并发控制使用 synchronized
和 CAS
来实现(JDK1.6 以后的 synchronized
锁做了很多优化),因此整体看起来就像是优化过且线程安全的 HashMap
!
关于 ConcurrentHashMap
的详细介绍,前文已经介绍过、此处不再赘述:ConcurrentHashMap及其常见面试题
案例1:遍历的同时删除元素
说明:ConcurrentHashMap 线程安全的,允许一边更新、一边遍历,也就是说在对象遍历的时候,也可以进行remove、put操作,且遍历的数据会随着remove、put操作而产生变化!
使用HashMap进行遍历的同时删除:
public class ConcurrentHashMapDemo {
public static void main(String[] args) {
Map<String, Integer> map = new HashMap<>();
map.put("a", 1);
map.put("b", 1);
map.put("c", 1);
for (Map.Entry<String, Integer> entry : map.entrySet()) {
map.remove(entry.getKey());
}
System.out.println(map.size());
}
}
HashMap不能一边遍历一边更新,否则报异常ConcurrentModificationException
[
使用ConcurrentHashMap进行遍历的同时删除:
public class ConcurrentHashMapDemo1 {
public static void main(String[] args) {
Map<String, Integer> map = new HashMap<>();
Map<String, Integer> map = new ConcurrentHashMap<>();
map.put("a", 1);
map.put("b", 1);
map.put("c", 1);
for (Map.Entry<String, Integer> entry : map.entrySet()) {
map.remove(entry.getKey());
}
System.out.println(map.size());
}
}
而ConcurrentHashMap不会报并发修改异常(ConcurrentModificationException
),输出结果为0!
案例2:多线程环境下更新数据
说明:线程安全的容器,特别是Map,很多情况下一个业务中涉及容器的操作有多个(get、put、remove),即复合操作,而在并发执行时,线程安全的容器只能保证自身的数据不被破坏和 数据在多个线程间是可见的,但无法保证业务的行为是否正确,即 ConcurrentHashMap多线程操作不能保证数据同步!
以下分别使用HashMap、ConcurrentHashMap一边遍历一边 更新 元素,运行了3个线程,理论上最后得到6000:
public class ConcurrentHashMapDemo2 {
public static void main(String[] args) {
// final Map<String, Integer> count = new HashMap<>();
final Map<String, Integer> count = new ConcurrentHashMap<>();
count.put("count", 0);
Runnable task = new Runnable() {
@Override
public void run() {
int value;
for (int i = 0; i < 2000; i++) {
value = count.get("count");
count.put("count", value + 1);
}
}
};
new Thread(task).start();
new Thread(task).start();
new Thread(task).start();
try {
Thread.sleep(1000l);
System.out.println(count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果如下:
使用HashMap
{count=3426}
使用ConcurrentHashMap,也不能保证结果的正确,因为只能保证对容器的操作是没问题的(不报ConcurrentModificationException
等异常),但是不能保证业务的数据没有问题,因为是复和操作且并发执行!
{count=3814}
如果非要在这种情况下保证线程安全问题,就需要加锁,保证读写是同步的:
public class ConcurrentHashMapDemo2 {
public static void main(String[] args) {
// final Map<String, Integer> count = new HashMap<>();
// final Map<String, Integer> count = new Hashtable<>();
final Map<String, Integer> count = new ConcurrentHashMap<>();
count.put("count", 0);
Runnable task = new Runnable() {
@Override
public void run() {
synchronized (count) {
int value;
for (int i = 0; i < 2000; i++) {
value = count.get("count");
count.put("count", value + 1);
}
}
}
};
new Thread(task).start();
new Thread(task).start();
new Thread(task).start();
try {
Thread.sleep(1000l);
System.out.println(count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果如下:
{count=6000}
案例3:多线程环境下删除部分数据
public class ConcurrentHashMapDemo3 {
public static void main(String[] args) {
// final Map<String, Integer> count = new HashMap<>();
final Map<String, Integer> count = new ConcurrentHashMap<>();
for (int i = 0; i < 2000; i++) {
count.put("count" + i, 1);
}
Runnable task1 = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 500; i++) {
count.remove("count" + i);
}
}
};
Runnable task2 = new Runnable() {
@Override
public void run() {
for (int i = 1000; i < 1500; i++) {
count.remove("count" + i);
}
}
};
new Thread(task1).start();
new Thread(task2).start();
try {
Thread.sleep(1000l);
System.out.println(count.size());
} catch (Exception e) {
e.printStackTrace();
}
}
}
使用HashMap
1024
使用ConcurrentHashMap,保证了数据在多线程之间的可见性,因此可以得到正确的结果
1000
案例4:多线程环境下遍历的同时删除不同的数据
public class ConcurrentHashMapDemo4 {
public static void main(String[] args) {
final Map<Integer, Integer> count = new ConcurrentHashMap<>();
for (int i = 0; i < 2000; i++) {
count.put(i, 1);
}
Runnable task1 = new Runnable() {
@Override
public void run() {
Iterator<Map.Entry<Integer, Integer>> it = count.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Integer, Integer> entry = it.next();
if (entry.getKey() < 5) {
count.remove(entry.getKey());
// 多线程遍历的时候删除
System.out.println(entry.getKey());
}
}
}
};
Runnable task2 = new Runnable() {
@Override
public void run() {
Iterator<Map.Entry<Integer, Integer>> it = count.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Integer, Integer> entry = it.next();
if (entry.getKey() >= 1995) {
count.remove(entry.getKey());
// 多线程遍历的时候删除
System.out.println(entry.getKey());
}
}
}
};
new Thread(task1).start();
new Thread(task2).start();
try {
Thread.sleep(1000l);
System.out.println("map中键值对的数量" + count.size());
} catch (Exception e) {
e.printStackTrace();
}
}
}
结果如下:
0
1
2
3
4
1995
1996
1997
1998
1999
map中键值对的数量1990
最后再说说HashMap应用场景:
1、单线程运行环境,并且不需要 遍历时进行添加/删除操作 的场景
这个推荐场景不是硬性条件,通过加入同步控制依然能正常应用HashMap!
ConcurrentHashMap推荐应用场景:
1、多线程环境下一切场景的添加/删除操作
注意 ConcurrentHashMap 多线程环境下的更新场景不能保证数据同步,此时可以加同步代码块进行同步操作!
3、CopyOnWriteArrayList
在 JDK1.5 之前,如果想要使用并发安全的 List
只能选择 Vector
,而 Vector
是一种老旧的集合,已经被淘汰。Vector
对于增删改查等方法基本都加了 synchronized
,这种方式虽然能够保证同步,但这相当于对整个 Vector
加上了一把大锁,使得每个方法执行的时候都要去获得锁,导致性能非常低下!
JDK1.5 引入了 Java.util.concurrent
(JUC)包,其中提供了很多线程安全且并发性能良好的容器,其中唯一的线程安全 List
实现就是 CopyOnWriteArrayList
:
public class CopyOnWriteArrayList<E> extends Object implements List<E>, RandomAccess, Cloneable, Serializable
在很多应用场景中,读操作可能会远远大于写操作!
由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费!
我们应该允许多个线程同时访问 List
的内部数据,毕竟读取操作是安全的!
在Java的并发编程中,CopyOnWriteArrayList是一个线程安全的实现了List接口的类。它通过每次写操作(如增加、修改或删除元素)时创建并使用底层数组的副本来保证数据的一致性。这个类主要适用于读多写少的场景,其中读操作可以获得较高的性能,并且不会阻塞写操作!
适用场景:
读多写少的情况:当应用程序的读操作频率远高于写操作时,CopyOnWriteArrayList是一个很好的选择,因为每次修改操作都会创建一个底层数组的副本,从而避免了读取操作受到写入操作的干扰。
保证数据一致性:CopyOnWriteArrayList适用于需要保证读取操作获取到最新数据的场景。由于每个线程都在自己的副本上进行操作,因此不存在读取过程中数据被其他线程修改的问题。
为了将读取的性能发挥到极致,CopyOnWriteArrayList
读取是完全没加锁的! 更厉害的是:写也不会阻塞读操作;只有 写写 需要进行同步等待。这样一来,读操作的性能就会大幅度提升。那它是怎么做到的呢?
重点:CopyOnWriteArrayList
类的所有可变操作(add、set、remove 等等)都是通过创建 底层数组的新副本 来实现的!
CopyOnWriteArrayList
线程安全的核心在于其采用了 写时复制(Copy-On-Write) 的策略,从 CopyOnWriteArrayList
的名字就能看出了。
所谓 Copy-On-Write
也就是说:如果你想要对一块内存进行修改时,我们不在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后再将指向原来内存的指针指向新的内存,原来的内存就可以被回收掉了!
当需要修改( add
,set
、remove
等操作) CopyOnWriteArrayList
的内容时,不会直接修改原数组,而是会先创建底层数组的副本,对副本数组进行修改,修改完之后再将修改后的数组赋值回去,这样就可以保证写操作不会影响读操作了!
案例1:单线程环境中遍历的同时修改数据
- 使用ArrayList:
public class MyTest {
private static List<String> list = new ArrayList<>();
static {
list.add("aaa");
list.add("bbb");
list.add("ccc");
}
public static void main(String[] args) {
Iterator<String> it = list.iterator();
while (it.hasNext()) {
String next = it.next();
System.out.println(next);
list.add("AA");
}
Iterator<String> it2 = list.iterator();
while (it2.hasNext()) {
String next = it2.next();
System.out.println(next);
}
}
}
结果会发生并发修改异常:
aaa
Exception in thread "main" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
at java.util.ArrayList$Itr.next(ArrayList.java:859)
at com.leetcode.MyTest.main(MyTest.java:29)
Process finished with exit code 1
- 使用CopyOnWriteArrayList:
public class MyTest {
private static List<String> list = new CopyOnWriteArrayList<>();
static {
list.add("aaa");
list.add("bbb");
list.add("ccc");
}
public static void main(String[] args) {
Iterator<String> it = list.iterator();
while (it.hasNext()) {
String next = it.next();
System.out.println(next);
list.add("AA");
}
Iterator<String> it2 = list.iterator();
while (it2.hasNext()) {
String next = it2.next();
System.out.println(next);
}
}
}
使用CopyOnWriteArrayList可以正常的输出结果,并且可以看到保证了读取时数据的一致性(即读不受写的影响):
aaa
bbb
ccc
aaa
bbb
ccc
AA
AA
AA
Process finished with exit code 0
案例2:多线程环境中修改数据
public class MyTest {
private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
public static void main(String[] args) throws InterruptedException {
// 添加元素
list.add("Apple");
list.add("Banana");
list.add("Orange");
Runnable reader = () -> {
for (String item : list) {
System.out.println(Thread.currentThread().getName() + ": " + item);
}
};
Runnable writer = () -> {
list.add("Mango");
list.remove("Apple");
};
// 启动多个读线程和一个写线程
Thread readerThread1 = new Thread(reader);
Thread readerThread2 = new Thread(reader);
Thread writerThread = new Thread(writer);
try {
readerThread1.start();
Thread.sleep(3000);
writerThread.start();
readerThread2.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 等待所有线程执行完毕
try {
readerThread1.join();
writerThread.join();
readerThread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
可以看到,在readerThread1执行完毕后执行了writerThread,readerThread2也拿到了正确结果:
Thread-0: Apple
Thread-0: Banana
Thread-0: Orange
Thread-1: Banana
Thread-1: Orange
Thread-1: Mango
Process finished with exit code 0
最后再说说Vector、ArrayList、CopyOnWriteArrayList,这三个集合类都继承List接口:
- ArrayList是线程不安全的
- Vector是比较古老的线程安全的,但性能不行
- CopyOnWriteArrayList在兼顾了线程安全的同时,又提高了并发能力,性能比Vector要高
4、ConcurrentLinkedQueue
Java 提供的线程安全的 Queue
可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是 BlockingQueue
,非阻塞队列的典型例子是ConcurrentLinkedQueue
。
从名字可以看出,ConcurrentLinkedQueue
这个队列使用链表作为其数据结构.ConcurrentLinkedQueue
应该算是在高并发环境中性能最好的队列了,它之所有能有很好的性能,是因为其内部复杂的实现。
ConcurrentLinkedQueue
适合在对性能要求相对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁的 ConcurrentLinkedQueue
来替代!
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable
说明: ConcurrentLinkedQueue继承了抽象类AbstractQueue,AbstractQueue定义了对队列的基本操作;同时实现了Queue接口,Queue定义了对队列的基本操作,同时,还实现了Serializable接口,表示可以被序列化。
请带着如下问题继续阅读后文:
- 要想用线程安全的队列有哪些选择? Vector,
Collections.synchronizedList(List<T> list)
, ConcurrentLinkedQueue等 - ConcurrentLinkedQueue实现的数据结构?
- ConcurrentLinkedQueue底层原理? 全程无锁(CAS)
- ConcurrentLinkedQueue的核心方法有哪些? offer(),poll(),peek(),isEmpty()等队列常用方法
- ConcurrentLinkedQueue适合什么样的使用场景?
ConcurrentLinkedQueue数据结构:
ConcurrentLinkedQueue的数据结构与LinkedBlockingQueue的数据结构相同,都是使用的链表结构:
说明: ConcurrentLinkedQueue采用的链表结构,并且包含有一个头节点和一个尾结点
ConcurrentLinkedQueue底层原理:
ConcurrentLinkedQueue
是基于链表实现的非阻塞并发队列。它的底层原理主要基于 CAS(Compare and Swap) 操作和 volatile 变量来实现线程安全。在ConcurrentLinkedQueue
中,每个节点都包含一个元素以及指向下一个节点的引用。在进行插入和删除操作时,通过CAS操作来保证并发线程安全性,避免使用锁。这样可以实现高效的并发操作,避免了传统锁机制可能引起的性能问题。链表的结构也使得元素的插入和删除操作更加高效,尤其在多线程并发环境下表现良好。由于ConcurrentLinkedQueue
采用了非阻塞算法,因此在高并发情况下能够提供较好的性能表现!
ConcurrentLinkedQueue的底层原理较为复杂,并且这个队列在平时开发中使用的也不多,这里就不做详细介绍了!感兴趣的小伙伴可以参考这篇:Java ConcurrentLinkedQueue非阻塞式同步队列源码深度解析【一万字】
ConcurrentLinkedQueue中的核心方法:
public boolean add(E e); //将指定元素添加到队列的尾部。注意:ConcurrentLinkedQueue不支持null元素
public boolean offer(E e); //将指定元素添加到队列的尾部,如果队列已满则返回false。注意:ConcurrentLinkedQueue不支持null元素
public E poll(); //检索并移除队列的头部元素,如果队列为空则返回null。
public E peek(); //检索但不移除队列的头部元素,如果队列为空则返回null。
public boolean remove(Object o); //从队列中移除指定元素。
public boolean isEmpty(); //检查队列是否为空。
public int size(); //返回队列中的元素数量。
public Iterator<E> iterator(); //返回在此队列元素上进行迭代的迭代器。
我们用一个例子来看看ConcurrentLinkedQueue使用方法:
import java.util.concurrent.ConcurrentLinkedQueue;
class PutThread extends Thread {
private ConcurrentLinkedQueue<Integer> clq;
public PutThread(ConcurrentLinkedQueue<Integer> clq) {
this.clq = clq;
}
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("add " + i);
clq.add(i);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class GetThread extends Thread {
private ConcurrentLinkedQueue<Integer> clq;
public GetThread(ConcurrentLinkedQueue<Integer> clq) {
this.clq = clq;
}
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("poll " + clq.poll());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ConcurrentLinkedQueueDemo {
public static void main(String[] args) {
ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<Integer>();
PutThread p1 = new PutThread(clq);
GetThread g1 = new GetThread(clq);
p1.start();
g1.start();
}
}
运行结果:
add 0
poll null
add 1
poll 0
add 2
poll 1
add 3
poll 2
add 4
poll 3
add 5
poll 4
poll 5
add 6
add 7
poll 6
poll 7
add 8
add 9
poll 8
说明: GetThread线程不会因为ConcurrentLinkedQueue队列为空而等待,而是直接返回null,所以当实现队列为空而需要等待时,则需要用户自己实现等待逻辑!
ConcurrentLinkedQueue适合的场景:
ConcurrentLinkedQueue
适用于需要在多线程环境下 安全地进行元素插入和删除操作的场景。
- 生产者-消费者模式:当有多个生产者和消费者同时操作队列时,
ConcurrentLinkedQueue
可以保证线程安全,避免数据竞争和死锁。 - 任务调度:在任务调度系统中,多个线程需要安全地添加和获取任务。
ConcurrentLinkedQueue
可以作为任务队列使用,保证任务的安全添加和获取。 - 并发数据处理:在需要并发处理大量数据的情况下,
ConcurrentLinkedQueue
可以作为数据缓冲区,多个线程可以安全地向队列中添加数据,并由其他线程处理这些数据。
总的来说,ConcurrentLinkedQueue
适用于需要高效并发操作的场景,可以提供线程安全的队列操作,避免使用显式的同步机制!
5、BlockingQueue
阻塞队列(BlockingQueue
)被广泛使用在 “生产者-消费者” 问题中,其原因是 BlockingQueue
提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空!
首先,最基本的来说, BlockingQueue 是一个先进先出的队列(Queue),为什么说是阻塞(Blocking)的呢?是因为 BlockingQueue 支持当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。
BlockingQueue 只是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口! 提供的操作总结如下:
Operation | Throws exception | Special value | Block | Time out |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
获取 | element() | peek() | — | — |
BlockingQueue继承于Queue接口,因此基本操作有:
插入元素
- add(E e) :往队列插入数据,当队列满时,插入元素时会抛出IllegalStateException异常
- offer(E e):当往队列插入数据时,插入成功返回
true
,否则则返回false
。当队列满时不会抛出异常
删除元素
- remove(Object o):从队列中删除数据,成功则返回
true
,否则为false
- poll:删除数据,当队列为空时,返回null
查看元素
- element:获取队头元素,如果队列为空时则抛出NoSuchElementException异常
- peek:获取队头元素,如果队列为空则抛出NoSuchElementException异常
BlockingQueue具有的特殊操作有:
插入数据
- put:当阻塞队列容量已经满时,往阻塞队列插入数据的线程会被阻塞,直至阻塞队列已经有空余的容量可供使用
- offer(E e, long timeout, TimeUnit unit):若阻塞队列已经满时,同样会阻塞插入数据的线程,直至阻塞队列已经有空余的地方,与put方法不同的是,该方法会有一个超时时间,若超过当前给定的超时时间,插入数据的线程会退出
删除数据
- take:当阻塞队列为空时,获取队头数据的线程会被阻塞
- poll(long timeout, TimeUnit unit):当阻塞队列为空时,获取数据的线程会被阻塞,另外,如果被阻塞的线程超过了给定的时长,该线程会退出
BlockingQueue 不接受 null 值的插入! 相应的插入方法在碰到 null 的插入时会抛出 NullPointerException 异常。null 值在这里通常用于作为特殊值返回(表格中的第三列),代表 poll 失败。所以,如果允许插入 null 值的话,那获取的时候,就不能很好地用 null 来判断到底是代表插入失败,还是获取的值就是 null 值!
一个 BlockingQueue 可能是有界的! 如果在插入的时候,发现队列满了,那么插入操作将会阻塞。通常,在这里我们说的无界队列也不是说真正的无界 ,而是它的容量是 Integer.MAX_VALUE(21亿多)!
BlockingQueue 的实现都是线程安全的! 但是批量的集合操作如 addAll
, containsAll
, retainAll
和 removeAll
不一定能保证原子操作!如 addAll(c) 有可能在添加了一些元素后中途抛出异常,此时 BlockingQueue 中已经添加了部分元素,不会因为异常而将这些已添加的元素再移除!
BlockingQueue 的各个实现(ArrayBlockingQueue、LinkedBlockingQueue 、SynchronousQueue、PriorityBlockingQueue等
)都遵循了这些规则,当然我们也不用死记这个表格,知道有这么回事,然后写代码的时候根据自己的需要去看方法的注释来选取合适的方法即可!
BlockingQueue
只是一个接口,继承自 Queue
,所以其实现类也可以作为 Queue
的实现来使用,而 Queue
又继承自 Collection
接口:
下面逐个介绍 4 个常见的 BlockingQueue
的实现类:ArrayBlockingQueue
、LinkedBlockingQueue
、SynchronousQueue
、PriorityBlockingQueue
!
①ArrayBlockingQueue:数组阻塞队列
ArrayBlockingQueue
是 BlockingQueue
接口的有界队列 实现类,底层采用数组 来实现!
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable{}
ArrayBlockingQueue
一旦创建,容量不能改变(因为它是基于数组实现的,也就具有数组的特性: 一旦初始化,大小就无法修改)!
它的并发控制采用了可重入锁 ReentrantLock
,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。
当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞!
我们用1个示意图来描述其同步机制:
ArrayBlockingQueue 实现并发同步的原理就是,读操作和写操作都需要获取到 AQS 独占锁 才能进行操作!
如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。
如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。
对于 ArrayBlockingQueue,我们可以在构造的时候指定以下3个参数:
- 队列容量:其限制了队列中最多允许的元素个数
- 指定独占锁的公平性:默认是非公平锁,非公平锁的吞吐量比较高;公平锁可以保证每次都是等待最久的线程获取到锁
- 可以指定用一个集合:将此集合中的元素在构造方法期间就先添加到队列中
ArrayBlockingQueue初始化示例1:
//初始化时指定容量大小为10,即有界队列
//默认保持非公平性
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
ArrayBlockingQueue初始化示例2:
//初始化时指定容量大小为10,即有界队列
//true表示保持公平性
//初始化放入时先队列中的集合
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10, true, new ArrayList<>(Arrays.asList(1, 2, 3)));
②LinkedBlockingQueue:链表阻塞队列
LinkedBlockingQueue
底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列 来使用,同样满足 FIFO 的特性!
它与 ArrayBlockingQueue
相比起来,具有更高的吞吐量,为了防止 LinkedBlockingQueue
容量迅速增,损耗大量内存,通常在创建 LinkedBlockingQueue
对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE(21亿多)!
来看它的两个构造方法:
// 传说中的无界队列
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 传说中的有界队列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
初始化示例:
BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>(); //初始化时不指定容量大小,即无界队列
BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(1024); //初始化时指定容量大小,即有界队列
我们看看这个类有哪些属性:
// 队列容量
private final int capacity;
// 队列中的元素数量
private final AtomicInteger count = new AtomicInteger(0);
// 队头
private transient Node<E> head;
// 队尾
private transient Node<E> last;
// take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();
// 如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();
// put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();
// 如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();
这里用了两个ReentrantLock、两个 Condition:
takeLock 和 notEmpty 怎么搭配: 如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。
putLock 需要和 notFull 搭配: 如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。
这里用一个示意图来看看 LinkedBlockingQueue 的并发读写控制,然后再进行源码分析:
读操作是排好队的,写操作也是排好队的,唯一的并发问题在于一个写操作和一个读操作同时进行,只要控制好这个(通过前面提到的两个ReentrantLock、两个 Condition配合使用达到控制目的)就可以了!
③SynchronousQueue:同步阻塞队列
SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。 据此,把这个类称作一个队列显然是夸大其词了,它更多像是一个汇合点!
它是一个特殊的队列,它的名字其实就蕴含了它的特征—同步的队列。为什么说是同步的呢?这里说的并不是多线程的并发问题,而是因为当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。这里的 Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。
我们比较少使用到 SynchronousQueue 这个类,不过它在线程池的实现类 ThreadPoolExecutor 中得到了应用,感兴趣的读者可以自行上网搜索相关文章!
虽然说 SynchronousQueue 是队列,但它其实是虚队列,因为其不提供任何空间(一个都没有)来存储元素。数据只能从某个写线程交给另外一个读线程,而不是写到某个队列中等待被消费!
你不能调用 SynchronousQueue的 peek 方法(会直接返回 null),peek 方法的语义是只读取不移除,显然,peek 方法的语义是不符合 SynchronousQueue 的特征!
SynchronousQueue 也不能被迭代,因为根本就没有元素可以拿来迭代!
虽然 SynchronousQueue 间接地实现了 Collection 接口,但是如果你将其当做 Collection 来用的话,那么集合是空的!
当然,这个类也是不允许传递 null 值 (并发包中的容器类好像都不支持插入 null 值,因为 null 值往往用作其他用途,比如用于方法的返回值代表操作失败)!
SynchronousQueue 的使用方式举例:
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueExample {
public static SynchronousQueue<Integer> queue = new SynchronousQueue<>();
public static void main(String[] args) {
// 线程1往队列中插入元素
new Thread(() -> {
try {
int element = 42;
System.out.println("Thread 1 putting: " + element);
queue.put(element); // 等待另一个线程删除元素
System.out.println("Thread 1 put done");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 线程2从队列中删除元素
new Thread(() -> {
try {
System.out.println("Thread 2 taking...");
int element = queue.take(); // 等待另一个线程插入元素
System.out.println("Thread 2 took: " + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
这个示例创建了一个SynchronousQueue
对象,线程1向队列插入元素,线程2从队列中删除元素。由于SynchronousQueue
的特性,线程1和线程2都会被阻塞,直到另一个线程执行相反的操作。这种方式可以用于实现线程间的一对一的数据交换!
总结: SynchronousQueue内部采用的就是ArrayBlockingQueue的阻塞原语,所以在功能上完全可以用ArrayBlockingQueue替换之,但是SynchronousQueue 是轻量级的,SynchronousQueue 不具有任何内部容量,我们可以用来在线程间安全的交换单一元素。所以功能比较单一,优势应该就在于轻量吧!
④PriorityBlockingQueue:带优先级的阻塞队列
PriorityBlockingQueue
是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo()
方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator
来指定排序规则。
PriorityBlockingQueue
并发控制采用的是可重入锁 ReentrantLock
,队列为无界队列。
ArrayBlockingQueue
是有界队列,LinkedBlockingQueue
也可以通过在构造函数中传入capacity
指定队列最大的容量,但是PriorityBlockingQueue
只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话它会自动扩容!
简单地说,它就是 PriorityQueue
的线程安全版本。
不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException
异常!
它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)!
同时注意,如果你从一个 PriorityBlockingQueue 获得一个 Iterator 的话,使用 Iterator遍历该队列,遍历的顺序不会按照优先级顺序,并且顺序是不确定的,可能会是任意顺序!
PriorityBlockingQueue 使用了基于数组的**二叉堆(完全二叉树)**来存放元素,所有的 public 方法采用同一个 lock 进行并发控制。
二叉堆中最小的值就是根节点,但是删除根节点是比较麻烦的,因为需要调整树结构!
用个图解释一下什么是二叉堆,这种数据结构的优点是一目了然的,最小的元素一定是根元素,它是一棵满的树,除了最后一层未必是满的,但是最后一层的节点从左到右紧密排列:
PriorityBlockingQueue 的使用方式举例:
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
class Person {
private String name;
private int age;
public Person(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public int getAge() {
return age;
}
}
class PersonComparator implements Comparator<Person> {
@Override
public int compare(Person person1, Person person2) {
return person1.getAge() - person2.getAge();
}
}
public class CustomPriorityBlockingQueueExample {
public static void main(String[] args) {
PriorityBlockingQueue<Person> queue = new PriorityBlockingQueue<>(3, new PersonComparator());
// 插入元素
queue.offer(new Person("Alice", 25));
queue.offer(new Person("Bob", 20));
queue.offer(new Person("Charlie", 30);
// 从队列中取出并打印元素(按年龄从小到大排序)
while (!queue.isEmpty()) {
Person person = queue.poll();
System.out.println("Person taken: " + person.getName() + " (age: " + person.getAge() + ")");
}
}
}
输出结果:
Person taken: Bob (age: 20)
Person taken: Alice (age: 25)
Person taken: Charlie (age: 30)
BlockingQueue总结及适用场景:
ArrayBlockingQueue 底层是数组,有界队列,如果我们遇到生产者-消费者场景 ,这是非常好的选择。
- 生产者-消费者模式:常用来实现生产者-消费者模式,生产者线程向队列中放入元素,消费者线程从队列中取出元素进行处理。
- 线程池任务队列:在线程池中使用
ArrayBlockingQueue
作为任务队列,控制任务的执行顺序和数量,避免任务堆积过多。- 固定大小的任务缓冲区:在并发编程中,有时需要使用固定大小的任务缓冲区,可以选择
ArrayBlockingQueue
来实现。
LinkedBlockingQueue 底层是链表,可以当做无界和有界队列来使用,所以大家不要以为它就是无界队列。
- 生产者-消费者模式:
LinkedBlockingQueue
常用于实现生产者-消费者模式,生产者线程将数据放入队列,消费者线程从队列中取出数据进行处理。- 限流、任务调度:可以利用
LinkedBlockingQueue
的阻塞特性来实现系统的流量控制,如果队列已满则阻塞新任务的加入,以避免系统过载。- 事件驱动系统:在事件驱动的系统中,可以使用
LinkedBlockingQueue
来作为事件队列,将事件按顺序放入队列,并由后台线程依次处理事件。
SynchronousQueue 本身不带有空间来存储任何元素,可以用于实现线程间的一对一的数据交换。
- 生产者-消费者模式:由于
SynchronousQueue
没有存储的能力,适合用于在生产者和消费者之间一对一传递数据的场景。- 异步消息传递:可以用于异步消息传递机制,一个线程发送消息到
SynchronousQueue
,另一个线程接收处理消息。- 线程协作:可以用于线程之间的协作,实现一种同步的线程通信方式,例如确保某个线程在另一个线程完成某项操作之后才能继续执行。
PriorityBlockingQueue 是无界队列,基于数组,数据结构为二叉堆,数组第一个也是树的根节点总是最小值。
- 任务调度: 可以用于实现任务调度,根据任务的优先级来进行调度。
- 事件处理: 当需要按照事件的优先级来处理事件时,可以使用
PriorityBlockingQueue
来实现事件处理。- 数据结构排序: 适用于需要按照一定规则对数据进行排序的场景,可以使用
PriorityBlockingQueue
来维护有序的数据集合。
6、ConcurrentSkipListMap
为了引出 ConcurrentSkipListMap
,先带着大家简单理解一下什么是跳表!
对于一个单链表,即使链表是有序的,如果我们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率自然就会很低,跳表就不一样了。跳表是一种可以用来快速查找的数据结构,有点类似于平衡树,它可以对元素进行快速的查找!
但一个重要的区别是:对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整,而对跳表的插入和删除只需要对局部进行操作即可!
这样带来的好处是:在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全;而对于跳表,你只需要局部锁 即可!
这样,在高并发环境下,就可以拥有更高的性能。就查询的性能而言,跳表的时间复杂度是 O(logn) ,所以在并发数据结构中,JDK 使用跳表来实现了一个 ConcurrentSkipListMap!
跳表的本质是同时维护了多个链表,并且链表是分层的:
最低层的链表维护了跳表内所有的元素,每上面一层链表都是下面一层的子集!
跳表内的所有链表的元素都是排好序的,查找时,可以从顶层链表开始,一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续找。这也就是说在查找过程中,搜索是跳跃式的!
如上图所示,在跳表中查找元素 18 的过程如下:
原来查找 18 的时候原来需要遍历 18 次,现在通过跳表只需要 7 次即可。针对链表长度比较大的时候,构建索引查找效率的提升就会非常明显!
从上面很容易看出,跳表是一种利用空间换时间的算法!
我们可以认为ConcurrentSkipListMap是 “线程安全” 的HashMap,此外,使用跳表实现的 ConcurrentSkipListMap
和使用哈希算法实现的 HashMap
的另外一个不同之处是:哈希并不会保存元素的顺序,而跳表内所有的元素都是排好序的!
因此在对跳表进行遍历时,你会得到一个有序的结果。所以,如果你的应用需要有序性,那么ConcurrentSkipListMap
就是你不二之选!
/**
* ConcurrentSkipListMap是“线程安全”的哈希表,而 HashMap 是非线程安全的!
* 1、当map是ConcurrentSkipListMap对象时,程序能正常运行
* 2、当map是HashMap对象时,程序会产生ConcurrentModificationException异常
*/
public class ConcurrentSkipListMapDemo {
//private static Map<String, Integer> map = new HashMap<String, Integer>();
private static Map<String, Integer> map = new ConcurrentSkipListMap<String, Integer>();
public static void main(String[] args) {
// 同时启动两个线程对map进行操作
new MyThread("a").start();
new MyThread("b").start();
}
/**
* 修改map的同时遍历map
*/
private static class MyThread extends Thread {
MyThread(String name) {
super(name);
}
@Override
public void run() {
int i = 0;
while (i++ < 6) {
// 修改map
map.put(Thread.currentThread().getName() + i, i);
// 遍历map
Iterator<Map.Entry<String, Integer>> iter = map.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, Integer> entry = iter.next();
System.out.print("(" + entry.getKey() + ", " + entry.getValue() + "), ");
}
System.out.println();
}
}
}
}
某一次运行的结果:
(a1, 1), (b1, 1),
(a1, 1), (a1, 1), (a2, 2), (b1, 1),
(b1, 1),
(a1, 1), (a1, 1), (a2, 2), (a2, 2), (a3, 3), (a3, 3), (b1, 1), (b2, 2),
(b1, 1), (a1, 1), (a2, 2), (a3, 3), (b1, 1), (b2, 2), (b3, 3),
(b2, 2), (a1, 1), (b3, 3), (b4, 4),
(a2, 2), (a1, 1), (a3, 3), (a2, 2), (a4, 4), (a3, 3), (b1, 1), (a4, 4), (b2, 2), (b1, 1), (b3, 3), (b2, 2), (b4, 4),
(b3, 3), (b4, 4), (a1, 1), (b5, 5),
(a2, 2), (a3, 3), (a4, 4), (a5, 5), (b1, 1), (b2, 2), (b3, 3), (b4, 4), (b5, 5),
(a1, 1), (a1, 1), (a2, 2), (a2, 2), (a3, 3), (a3, 3), (a4, 4), (a4, 4), (a5, 5), (a5, 5), (b1, 1), (b1, 1), (b2, 2), (b2, 2), (b3, 3), (b4, 4), (b3, 3), (b5, 5), (b6, 6),
(b4, 4), (b5, 5), (b6, 6),
(a1, 1), (a2, 2), (a3, 3), (a4, 4), (a5, 5), (a6, 6), (b1, 1), (b2, 2), (b3, 3), (b4, 4), (b5, 5), (b6, 6),
忽略中间的临时态,可以看到,最终的结果(最后一行)是有序的!
7、总结
1. ConcurrentHashMap
- 是线程安全的HashMap实现
- JDK 1.7版本使用分段锁提高并发访问率
- JDK 1.8版本使用Node数组、链表和红黑树,通过synchronized和CAS操作实现并发控制
- 适用于多线程、高并发环境下的添加/删除操作
2. CopyOnWriteArrayList
- 写操作通过创建底层数组的副本来实现,不影响读取操作
- 适用于读多写少的场景,提供高性能的线程安全List实现
3. ConcurrentLinkedQueue
- 基于链表的非阻塞并发队列
- 通过CAS操作和volatile变量实现线程安全
- 适用于多线程环境下的安全元素插入和删除
4. BlockingQueue
- 阻塞队列接口,包括
ArrayBlockingQueue
、LinkedBlockingQueue
、SynchronousQueue
和PriorityBlockingQueue
等实现 - 适用于生产者-消费者问题,提供阻塞的插入和移除方法
5. ConcurrentSkipListMap
- 使用跳表实现的线程安全的Map
- 提供有序的键值存储,适合于需要有序性的数据存储和检索的场景
总之,这些容器在多线程编程中非常有用,它们通过不同的并发控制技术,如锁分段、写时复制、CAS、跳表等 ,来保证线程安全的同时,也提供了较高的并发性能!