Java中创建并同步包含多个阻塞队列的列表
在多线程编程场景中,我们经常会遇到需要管理多个阻塞队列的需求,比如不同业务模块对应独立的任务队列,或者实现生产者-消费者模型时拆分不同优先级的任务通道。此时如果直接将多个阻塞队列放在普通列表中,会存在线程安全问题,比如多个线程同时修改列表结构、访问队列时可能出现竞态条件。本文将介绍如何创建包含多个阻塞队列的列表,并对其进行同步处理,保证多线程环境下的安全性。
核心需求分析
我们需要实现的目标主要包含三点:
- 创建多个阻塞队列实例,用于存储不同类型的任务或数据
- 将这些阻塞队列统一管理到一个列表中,方便批量操作
- 对列表的访问、队列的操作进行同步控制,避免多线程环境下的并发问题
同步方案设计与实现
Java中阻塞队列本身(如ArrayBlockingQueue、LinkedBlockingQueue)是线程安全的,但是存储这些队列的列表如果不是线程安全的容器,在多线程增删队列、遍历列表时依然会出现问题。我们可以结合Collections.synchronizedList包装列表,同时配合同步代码块控制对列表和队列的复合操作。
1. 基础实现:创建同步的阻塞队列列表
下面的代码演示了如何创建包含多个ArrayBlockingQueue的同步列表,同时实现了向指定队列添加元素、从指定队列获取元素的同步方法:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class SynchronizedBlockingQueueList {
// 同步的阻塞队列列表,使用final修饰避免引用被修改
private final List<BlockingQueue<String>> queueList;
public SynchronizedBlockingQueueList(int queueCount, int singleQueueCapacity) {
// 先创建普通列表,再包装为同步列表
List<BlockingQueue<String>> tempList = new ArrayList<>();
for (int i = 0; i < queueCount; i++) {
// 创建指定容量的有界阻塞队列,也可以根据需求换成LinkedBlockingQueue等
tempList.add(new ArrayBlockingQueue<>(singleQueueCapacity));
}
// 包装为线程安全的同步列表
this.queueList = Collections.synchronizedList(tempList);
}
/**
* 向指定索引的阻塞队列中添加元素,索引越界或队列满时会抛出异常
* @param queueIndex 队列索引
* @param element 要添加的元素
*/
public void addElementToQueue(int queueIndex, String element) {
// 同步代码块,保证对列表的访问和队列操作的整体原子性
synchronized (queueList) {
if (queueIndex < 0 || queueIndex >= queueList.size()) {
throw new IllegalArgumentException("队列索引超出范围,当前列表大小为:" + queueList.size());
}
BlockingQueue<String> targetQueue = queueList.get(queueIndex);
// 阻塞队列的add方法是线程安全的,这里在同步块内进一步保证整体操作的原子性
targetQueue.add(element);
}
}
/**
* 从指定索引的阻塞队列中获取并移除元素,队列为空时阻塞等待
* @param queueIndex 队列索引
* @return 获取到的元素
* @throws InterruptedException 阻塞等待时被中断抛出
*/
public String takeElementFromQueue(int queueIndex) throws InterruptedException {
synchronized (queueList) {
if (queueIndex < 0 || queueIndex >= queueList.size()) {
throw new IllegalArgumentException("队列索引超出范围,当前列表大小为:" + queueList.size());
}
BlockingQueue<String> targetQueue = queueList.get(queueIndex);
// take方法是阻塞方法,线程安全,同步块保证操作原子性
return targetQueue.take();
}
}
/**
* 获取当前列表中的所有阻塞队列(注意返回的是同步列表本身,外部操作需自行同步或避免修改)
* @return 同步的阻塞队列列表
*/
public List<BlockingQueue<String>> getQueueList() {
return queueList;
}
public static void main(String[] args) throws InterruptedException {
// 创建包含3个阻塞队列的列表,每个队列容量为10
SynchronizedBlockingQueueList queueManager = new SynchronizedBlockingQueueList(3, 10);
// 测试向第一个队列添加元素
queueManager.addElementToQueue(0, "任务1");
queueManager.addElementToQueue(0, "任务2");
// 测试从第一个队列获取元素
System.out.println("从队列0获取元素:" + queueManager.takeElementFromQueue(0));
System.out.println("从队列0获取元素:" + queueManager.takeElementFromQueue(0));
// 测试遍历所有队列(遍历同步列表需要手动同步,避免遍历过程中列表被修改)
synchronized (queueManager.getQueueList()) {
for (int i = 0; i < queueManager.getQueueList().size(); i++) {
BlockingQueue<String> queue = queueManager.getQueueList().get(i);
System.out.println("队列" + i + "当前元素数量:" + queue.size());
}
}
}
}2. 实现说明
上述代码的实现细节值得注意:
- 阻塞队列选择了ArrayBlockingQueue,这是有界阻塞队列,构造函数需要指定容量,也可以根据场景替换为LinkedBlockingQueue(默认无界,也可指定容量)等其他BlockingQueue实现类。
- 列表通过
Collections.synchronizedList包装后,单个的增删改查操作是线程安全的,但是如果需要执行复合操作(比如先判断索引再获取队列),依然需要额外的同步,因此我们在addElementToQueue和takeElementFromQueue方法中使用了同步代码块,同步对象就是queueList本身,保证操作的原子性。 - 遍历同步列表时,官方文档明确要求需要手动在同步块内执行,否则可能抛出ConcurrentModificationException,因此main方法的遍历逻辑也添加了同步控制。
进阶优化:使用并发容器替代同步列表
如果场景中需要频繁对列表进行增删操作,或者需要更高的并发性能,可以使用CopyOnWriteArrayList来存储阻塞队列,它是Java并发包中的线程安全列表,适合读多写少的场景,遍历时不需要额外同步:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
public class ConcurrentBlockingQueueList {
// 使用CopyOnWriteArrayList存储阻塞队列,本身线程安全
private final CopyOnWriteArrayList<BlockingQueue<String>> queueList;
public ConcurrentBlockingQueueList(int queueCount, int singleQueueCapacity) {
queueList = new CopyOnWriteArrayList<>();
for (int i = 0; i < queueCount; i++) {
queueList.add(new ArrayBlockingQueue<>(singleQueueCapacity));
}
}
/**
* 向指定索引队列添加元素
*/
public void addElementToQueue(int queueIndex, String element) {
// CopyOnWriteArrayList的get操作是线程安全的,但是复合操作仍需同步
synchronized (queueList) {
if (queueIndex < 0 || queueIndex >= queueList.size()) {
throw new IllegalArgumentException("队列索引超出范围");
}
queueList.get(queueIndex).add(element);
}
}
/**
* 从指定索引队列获取元素
*/
public String takeElementFromQueue(int queueIndex) throws InterruptedException {
synchronized (queueList) {
if (queueIndex < 0 || queueIndex >= queueList.size()) {
throw new IllegalArgumentException("队列索引超出范围");
}
return queueList.get(queueIndex).take();
}
}
/**
* 获取所有队列,CopyOnWriteArrayList遍历不需要额外同步
*/
public void printAllQueueSize() {
for (int i = 0; i < queueList.size(); i++) {
System.out.println("队列" + i + "元素数量:" + queueList.get(i).size());
}
}
public static void main(String[] args) throws InterruptedException {
ConcurrentBlockingQueueList manager = new ConcurrentBlockingQueueList(2, 5);
manager.addElementToQueue(0, "测试任务");
manager.printAllQueueSize();
}
}注意事项
在实际使用过程中,还需要注意以下几点:
- 阻塞队列本身的操作(如put、take、add等)是线程安全的,但是操作多个队列的复合逻辑(比如从队列A取元素放到队列B)需要额外同步,避免中间状态不一致。
- 如果列表中的阻塞队列会被动态增删,一定要保证增删操作和后续访问队列的操作在同一个同步块内,避免获取到已经移除的队列实例。
- 同步代码块的粒度要尽可能小,只包裹需要原子性的操作部分,避免长时间阻塞其他线程,影响并发性能。
Java多线程阻塞队列同步列表线程安全CopyOnWriteArrayList修改时间:2026-05-24 12:30:43