在C++多线程编程中,生产者消费者模型是非常经典的应用场景,它解耦了数据的生产和消费过程,提升程序的整体运行效率。条件变量作为C++标准库提供的线程同步工具,能够配合互斥锁实现线程间的精准通信,是构建该模型的核心组件。

C++条件变量基础概念
条件变量是<condition_variable>头文件中提供的同步原语,需要和互斥锁配合使用,主要作用是让线程在某个条件不满足时进入阻塞等待状态,当其他线程修改了共享数据并通知条件变量时,等待的线程会被唤醒重新检查条件。
核心的常用方法有两个:
- wait(lock, pred):让当前线程进入阻塞等待,释放传入的互斥锁,当被唤醒时会重新获取锁并检查pred谓词,如果pred返回false则继续等待,避免虚假唤醒问题。
- notify_one()/notify_all():唤醒一个或者所有等待在该条件变量上的线程。
生产者消费者模型设计思路
我们需要先明确模型的核心组成部分:
- 一个线程安全的共享队列,用来存储生产者生产的数据
- 生产者线程:不断生成数据放入队列,当队列满时阻塞等待
- 消费者线程:不断从队列中取数据消费,当队列空时阻塞等待
- 两个互斥锁:一个保护共享队列的访问,一个配合条件变量使用
- 两个条件变量:一个用于通知队列非空(消费者等待),一个用于通知队列非满(生产者等待)
完整代码实现
下面是使用C++条件变量实现生产者消费者模型的完整代码:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
// 定义队列最大容量
const int QUEUE_MAX_SIZE = 5;
// 共享队列
std::queue<int> data_queue;
// 互斥锁,保护共享队列
std::mutex queue_mutex;
// 条件变量:队列非空,通知消费者
std::condition_variable not_empty;
// 条件变量:队列非满,通知生产者
std::condition_variable not_full;
// 控制线程退出的标志
bool stop_flag = false;
// 生产者函数
void producer(int producer_id) {
int data = 0;
while (true) {
// 模拟生产耗时
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock(queue_mutex);
// 等待队列非满,避免虚假唤醒
not_full.wait(lock, []() {
return data_queue.size() < QUEUE_MAX_SIZE || stop_flag;
});
// 如果收到退出信号,结束生产者线程
if (stop_flag) {
break;
}
// 生产数据并放入队列
data_queue.push(data);
std::cout << "生产者" << producer_id << "生产数据:" << data << std::endl;
data++;
// 通知消费者队列非空
not_empty.notify_one();
}
}
// 消费者函数
void consumer(int consumer_id) {
while (true) {
std::unique_lock<std::mutex> lock(queue_mutex);
// 等待队列非空,避免虚假唤醒
not_empty.wait(lock, []() {
return !data_queue.empty() || stop_flag;
});
// 如果收到退出信号且队列为空,结束消费者线程
if (stop_flag && data_queue.empty()) {
break;
}
// 取出队首数据消费
int data = data_queue.front();
data_queue.pop();
std::cout << "消费者" << consumer_id << "消费数据:" << data << std::endl;
// 释放锁,模拟消费耗时
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(150));
// 通知生产者队列非满
not_full.notify_one();
}
}
int main() {
// 创建2个生产者线程和3个消费者线程
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
for (int i = 0; i < 2; i++) {
producers.emplace_back(producer, i + 1);
}
for (int i = 0; i < 3; i++) {
consumers.emplace_back(consumer, i + 1);
}
// 运行3秒后停止所有线程
std::this_thread::sleep_for(std::chrono::seconds(3));
{
std::lock_guard<std::mutex> lock(queue_mutex);
stop_flag = true;
}
// 唤醒所有等待的线程,让它们检查退出标志
not_empty.notify_all();
not_full.notify_all();
// 等待所有线程退出
for (auto& p : producers) {
p.join();
}
for (auto& c : consumers) {
c.join();
}
std::cout << "所有线程已退出" << std::endl;
return 0;
}
实现注意事项
避免虚假唤醒
条件变量的wait方法可能会在没有收到通知的情况下被唤醒,这就是虚假唤醒。因此我们必须在wait时传入谓词函数,每次被唤醒后都重新检查条件是否满足,上述代码中wait(lock, [](){ return ...; })的写法就是标准的避免虚假唤醒的方式。
锁的正确使用
条件变量的wait方法必须在持有互斥锁的情况下调用,传入的锁对象会被wait临时释放,唤醒后会重新获取锁,这样才能保证条件检查和数据操作的原子性,避免数据竞争。
退出机制处理
如果需要让线程优雅退出,需要设置全局的退出标志,修改标志后唤醒所有等待的条件变量,让等待的线程能够检查到退出条件,避免线程永久阻塞。
运行结果说明
运行上述代码后,会看到生产者和消费者交替输出生产和消费的数据,队列满时生产者会阻塞,队列空时消费者会阻塞,整体运行符合生产者消费者模型的逻辑。