生产者消费者模型是多线程编程中处理线程间数据交互的经典设计模式,核心是通过一个共享的缓冲区,让生产者线程向缓冲区写入数据,消费者线程从缓冲区读取数据,同时保证多线程操作下的数据安全和线程协作效率。在C++中实现该模型,需要借助标准库提供的多线程同步工具来完成。

核心同步工具介绍
实现生产者消费者模型需要用到三个核心的同步组件,分别是互斥锁、条件变量和共享队列,各自的作用如下:
- 互斥锁(std::mutex):保证同一时间只有一个线程可以操作共享缓冲区,避免数据竞争问题。
- 条件变量(std::condition_variable):用于线程间的等待和唤醒通知,当缓冲区满时生产者等待,缓冲区空时消费者等待,有空闲或有数据时对应线程被唤醒。
- 共享队列(std::queue):作为生产者和消费者之间的数据缓冲区,存储生产者生成的数据供消费者读取。
完整实现代码示例
下面的代码实现了一个简单的生产者消费者模型,包含一个生产者线程和三个消费者线程,生产者每隔100毫秒生成一个随机整数放入队列,消费者从队列中取出数据并处理,当生产者生成10个数据后结束生产,所有消费者处理完剩余数据后退出。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <random>
#include <chrono>
// 共享队列,作为数据缓冲区
std::queue<int> data_queue;
// 互斥锁,保护共享队列的操作
std::mutex queue_mutex;
// 条件变量,用于生产者消费者之间的通知
std::condition_variable queue_cv;
// 标记生产者是否结束生产
bool producer_finished = false;
// 生产者需要生成的数据总量
const int TOTAL_DATA_COUNT = 10;
// 生产者线程函数
void producer() {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 100);
for (int i = 0; i < TOTAL_DATA_COUNT; ++i) {
// 模拟生产耗时
std::this_thread::sleep_for(std::chrono::milliseconds(100));
int data = dis(gen);
{
// 加锁,操作共享队列
std::lock_guard<std::mutex> lock(queue_mutex);
data_queue.push(data);
std::cout << "生产者生成数据: " << data << ", 当前队列大小: " << data_queue.size() << std::endl;
}
// 唤醒一个等待的消费者线程
queue_cv.notify_one();
}
{
// 生产结束,加锁修改标记
std::lock_guard<std::mutex> lock(queue_mutex);
producer_finished = true;
}
// 唤醒所有消费者线程,让它们检查生产结束标记
queue_cv.notify_all();
std::cout << "生产者结束生产" << std::endl;
}
// 消费者线程函数
void consumer(int consumer_id) {
while (true) {
std::unique_lock<std::mutex> lock(queue_mutex);
// 等待条件:队列不为空 或者 生产者已经结束生产
queue_cv.wait(lock, []() {
return !data_queue.empty() || producer_finished;
});
// 如果队列为空且生产者已经结束,退出循环
if (data_queue.empty() && producer_finished) {
break;
}
// 从队列取出数据
int data = data_queue.front();
data_queue.pop();
std::cout << "消费者" << consumer_id << "消费数据: " << data << ", 剩余队列大小: " << data_queue.size() << std::endl;
// 解锁,让其他线程可以操作队列
lock.unlock();
// 模拟消费耗时
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}
std::cout << "消费者" << consumer_id << "退出" << std::endl;
}
int main() {
// 创建生产者线程
std::thread prod_thread(producer);
// 创建三个消费者线程
std::thread cons_threads[3];
for (int i = 0; i < 3; ++i) {
cons_threads[i] = std::thread(consumer, i + 1);
}
// 等待生产者线程结束
prod_thread.join();
// 等待所有消费者线程结束
for (int i = 0; i < 3; ++i) {
cons_threads[i].join();
}
std::cout << "所有线程执行完毕" << std::endl;
return 0;
}
代码关键点解析
互斥锁的使用
代码中使用了std::lock_guard和std::unique_lock两种锁管理工具,std::lock_guard适合简单的加锁场景,在作用域结束时自动解锁;std::unique_lock更灵活,可以手动解锁,适合需要配合条件变量使用的场景,因为条件变量的wait函数需要接收一个std::unique_lock对象,并且会在等待时自动释放锁,被唤醒后重新加锁。
条件变量的等待逻辑
消费者线程的等待使用了带谓词的wait函数,即queue_cv.wait(lock, [](){ return !data_queue.empty() || producer_finished; });,这样可以避免虚假唤醒问题。虚假唤醒是指线程在没有收到明确通知的情况下被唤醒,使用谓词可以让线程被唤醒后再次检查条件是否满足,不满足则继续等待。
生产结束的处理
生产者生产完所有数据后,会修改producer_finished标记并调用notify_all唤醒所有等待的消费者,消费者被唤醒后会检查队列是否为空且生产已经结束,如果是则退出循环,避免消费者线程一直等待。
实际应用场景扩展
上述示例是最基础的生产者消费者模型实现,在实际项目中可以根据需求扩展,比如可以设置缓冲区的最大容量,当队列大小达到最大值时生产者等待,避免队列无限增长占用过多内存;也可以支持多个生产者线程,只需要保证生产者的操作同样受互斥锁保护即可;还可以给数据加上优先级,使用优先队列代替普通队列作为缓冲区。