在多线程编程中,阻塞队列是一种常用的线程安全数据结构,当队列为空时,获取元素的操作会阻塞等待直到有元素入队;当队列满时,插入元素的操作会阻塞等待直到队列有空闲空间。下面介绍C++中阻塞队列的实现方式。

阻塞队列的核心设计思路
要实现线程安全的阻塞队列,需要解决两个核心问题:一是保证队列内部数据操作的线程安全,避免多线程同时修改队列导致数据错乱;二是实现阻塞等待和唤醒机制,让线程在条件不满足时进入等待状态,条件满足时被唤醒继续执行。
在C++中,我们可以借助标准库提供的std::mutex实现互斥访问,用std::condition_variable实现线程间的条件通知,再结合std::queue作为底层容器,就可以完成阻塞队列的基础实现。
核心组件说明
互斥锁std::mutex
std::mutex用于保护队列的共享数据,每次对队列进行插入、删除、判空、判满等操作时,都需要先获取互斥锁,操作完成后再释放锁,保证同一时间只有一个线程能修改队列状态。
条件变量std::condition_variable
std::condition_variable需要和互斥锁配合使用,用于实现线程的阻塞和唤醒。当队列为空时,获取元素的线程会在条件变量上等待,直到有其他线程插入元素后通过条件变量通知等待的线程;当队列满时,插入元素的线程会等待,直到有其他线程取出元素后通知。
底层容器
这里选择std::queue作为底层存储容器,也可以选择std::deque或者自定义环形缓冲区,根据需求调整即可。
完整实现代码
下面是一个支持固定容量的阻塞队列实现,包含了入队、出队、停止队列等核心功能:
#include <queue>
#include <mutex>
#include <condition_variable>
#include <iostream>
template <typename T>
class BlockingQueue {
private:
std::queue<T> queue_; // 底层存储队列
std::mutex mutex_; // 互斥锁,保护队列操作
std::condition_variable not_empty_; // 条件变量:队列不为空时通知
std::condition_variable not_full_; // 条件变量:队列不满时通知
size_t max_size_; // 队列最大容量
bool stop_flag_; // 停止标志,用于唤醒所有等待线程
public:
// 构造函数,指定队列最大容量
explicit BlockingQueue(size_t max_size)
: max_size_(max_size), stop_flag_(false) {}
// 入队操作,队列满时阻塞
void push(const T& value) {
std::unique_lock<std::mutex> lock(mutex_);
// 等待队列不满或者停止标志被设置
not_full_.wait(lock, [this]() {
return queue_.size() < max_size_ || stop_flag_;
});
// 如果设置了停止标志,直接返回
if (stop_flag_) {
return;
}
queue_.push(value);
// 通知等待取元素的线程,队列现在不为空了
not_empty_.notify_one();
}
// 出队操作,队列空时阻塞,返回是否成功取出元素
bool pop(T& value) {
std::unique_lock<std::mutex> lock(mutex_);
// 等待队列不为空或者停止标志被设置
not_empty_.wait(lock, [this]() {
return !queue_.empty() || stop_flag_;
});
// 如果队列为空且停止标志被设置,返回失败
if (queue_.empty() && stop_flag_) {
return false;
}
value = queue_.front();
queue_.pop();
// 通知等待入队的线程,队列现在有空闲空间了
not_full_.notify_one();
return true;
}
// 停止队列,唤醒所有等待的线程
void stop() {
std::unique_lock<std::mutex> lock(mutex_);
stop_flag_ = true;
// 唤醒所有等待的线程,让它们检查停止标志
not_empty_.notify_all();
not_full_.notify_all();
}
// 获取当前队列大小
size_t size() {
std::unique_lock<std::mutex> lock(mutex_);
return queue_.size();
}
// 判断队列是否为空
bool empty() {
std::unique_lock<std::mutex> lock(mutex_);
return queue_.empty();
}
};
// 测试代码示例
void producer(BlockingQueue<int>* queue, int start, int end) {
for (int i = start; i <= end; ++i) {
queue->push(i);
std::cout << "生产者插入元素: " << i << std::endl;
}
}
void consumer(BlockingQueue<int>* queue, int id) {
int value;
while (queue->pop(value)) {
std::cout << "消费者" << id << "取出元素: " << value << std::endl;
}
std::cout << "消费者" << id << "退出" << std::endl;
}
int main() {
BlockingQueue<int> queue(5); // 创建容量为5的阻塞队列
// 创建生产者线程
std::thread prod1(producer, &queue, 1, 5);
std::thread prod2(producer, &queue, 6, 10);
// 创建消费者线程
std::thread cons1(consumer, &queue, 1);
std::thread cons2(consumer, &queue, 2);
// 等待生产者线程完成
prod1.join();
prod2.join();
// 停止队列,唤醒所有消费者线程
queue.stop();
// 等待消费者线程完成
cons1.join();
cons2.join();
return 0;
}
代码关键点解析
1. 条件变量的等待必须使用带谓词的wait方法,避免虚假唤醒问题。比如not_empty_.wait(lock, [this]() { return !queue_.empty() || stop_flag_; });,这样即使线程被虚假唤醒,也会重新检查条件是否满足,只有条件满足才会继续执行。
2. stop方法的作用是优雅关闭阻塞队列,当程序需要退出时,调用stop可以唤醒所有等待的线程,让它们检查到停止标志后主动退出,避免出现线程一直阻塞无法退出的问题。
3. 互斥锁使用std::unique_lock而不是std::lock_guard,因为std::condition_variable的wait方法需要接收std::unique_lock对象,它会在等待时自动释放锁,被唤醒后重新获取锁,符合我们的使用需求。
使用场景与扩展
这个阻塞队列可以用于生产者消费者模型,实现多线程之间的数据传递和解耦。如果需要无界队列,可以去掉容量限制,去掉not_full_条件变量相关的逻辑,入队操作不需要等待队列不满。如果需要支持超时等待,可以把wait替换为wait_for或者wait_until,指定最大等待时间,超时后直接返回。
C++阻塞队列C++并发队列std_mutexstd_condition_variable线程同步修改时间:2026-06-19 18:06:41