在C++20之前,终止多线程协作的生产者消费者任务时,开发者通常会使用共享的布尔标志位或者强制中断线程的方式,前者容易出现内存可见性问题,后者可能导致资源未释放、数据损坏等严重问题。std::stop_source配合std::stop_token提供了协作式的停止机制,让线程可以主动检查停止请求,在合适的时机安全退出,非常适合复杂的生产者消费者协作场景。

std::stop_source基础概念
std::stop_source是停止状态的所有者,它可以发出停止请求,与之关联的std::stop_token可以被传递给其他线程,用于检查是否收到了停止请求。核心特性如下:
- 一个std::stop_source可以关联多个std::stop_token,所有关联的token都能感知到停止请求
- 停止请求发出后,无法撤销,所有关联的token的stop_requested()都会返回true
- 支持注册停止回调,当停止请求发出时,会自动执行注册的回调函数
传统终止方式的问题
先看一个使用共享布尔标志位终止生产者消费者任务的示例,这种方式存在明显缺陷:
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable cv;
// 共享的停止标志,使用atomic保证可见性
std::atomic<bool> stop_flag(false);
void producer() {
int count = 0;
while (!stop_flag.load()) {
std::unique_lock<std::mutex> lock(mtx);
data_queue.push(count++);
cv.notify_one();
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// 终止时可能还有未处理的数据
std::cout << "生产者退出" << std::endl;
}
void consumer() {
while (!stop_flag.load()) {
std::unique_lock<std::mutex> lock(mtx);
if (cv.wait_for(lock, std::chrono::milliseconds(50), []{ return !data_queue.empty() || stop_flag.load(); })) {
if (stop_flag.load()) break;
int val = data_queue.front();
data_queue.pop();
lock.unlock();
std::cout << "消费数据:" << val << std::endl;
}
}
std::cout << "消费者退出" << std::endl;
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
std::this_thread::sleep_for(std::chrono::seconds(1));
// 发出停止请求
stop_flag.store(true);
cv.notify_all();
t1.join();
t2.join();
return 0;
}
上述代码虽然使用了atomic的布尔标志位,但是终止时如果队列中还有未处理的数据,生产者可能来不及把剩余数据全部推入队列就退出,消费者也可能漏处理部分数据,而且停止标志和队列、条件变量的状态没有联动,复杂场景下很容易出现逻辑漏洞。
使用std::stop_source实现优雅终止
下面用std::stop_source改造上述生产者消费者场景,实现安全的协作式终止:
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <stop_token>
#include <chrono>
std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable cv;
// 生产者函数,接收stop_token用于检查停止请求
void producer(std::stop_token stoken) {
int count = 0;
while (!stoken.stop_requested()) {
std::unique_lock<std::mutex> lock(mtx);
// 检查停止请求,避免推送多余数据
if (stoken.stop_requested()) break;
data_queue.push(count++);
cv.notify_one();
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// 终止前可以处理收尾逻辑,比如输出剩余队列大小
std::unique_lock<std::mutex> lock(mtx);
std::cout << "生产者退出,剩余队列大小:" << data_queue.size() << std::endl;
}
// 消费者函数,接收stop_token用于检查停止请求
void consumer(std::stop_token stoken) {
while (!stoken.stop_requested()) {
std::unique_lock<std::mutex> lock(mtx);
// 等待队列非空或者停止请求
if (cv.wait_for(lock, std::chrono::milliseconds(50), [&stoken]{
return !data_queue.empty() || stoken.stop_requested();
})) {
if (stoken.stop_requested()) break;
int val = data_queue.front();
data_queue.pop();
lock.unlock();
std::cout << "消费数据:" << val << std::endl;
}
}
// 终止前处理剩余数据
std::unique_lock<std::mutex> lock(mtx);
while (!data_queue.empty()) {
int val = data_queue.front();
data_queue.pop();
std::cout << "终止前处理剩余数据:" << val << std::endl;
}
std::cout << "消费者退出" << std::endl;
}
int main() {
// 创建stop_source,用于发出停止请求
std::stop_source stop_src;
// 启动线程时传入stop_source的stop_token
std::thread t1(producer, stop_src.get_token());
std::thread t2(consumer, stop_src.get_token());
// 运行1秒后发出停止请求
std::this_thread::sleep_for(std::chrono::seconds(1));
stop_src.request_stop();
// 唤醒所有等待的条件变量,避免线程一直阻塞
cv.notify_all();
t1.join();
t2.join();
return 0;
}
关键点说明
停止请求的传递
通过stop_src.get_token()获取与stop_source关联的stop_token,传递给生产者和消费者线程,线程内部通过stop_requested()方法检查是否收到了停止请求,实现协作式退出。
避免阻塞问题
发出停止请求后,需要调用cv.notify_all()唤醒所有阻塞在条件变量上的线程,否则线程可能一直等待条件变量,无法检查到停止请求,导致无法退出。
收尾逻辑处理
生产者和消费者在检测到停止请求后,可以执行自定义的收尾逻辑,比如生产者输出剩余队列大小,消费者处理队列中剩余的数据,保证数据不丢失,资源正确释放。
停止回调的使用
std::stop_source还支持注册停止回调,当停止请求发出时,会自动执行注册的回调函数,适合需要在停止时执行特定操作的场景:
#include <iostream>
#include <stop_token>
#include <thread>
#include <chrono>
void worker(std::stop_token stoken) {
// 注册停止回调,停止请求发出时自动执行
std::stop_callback cb(stoken, []{
std::cout << "收到停止请求,执行清理操作" << std::endl;
});
while (!stoken.stop_requested()) {
std::cout << "工作中..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
std::cout << "工作线程退出" << std::endl;
}
int main() {
std::stop_source stop_src;
std::thread t(worker, stop_src.get_token());
std::this_thread::sleep_for(std::chrono::seconds(1));
stop_src.request_stop();
t.join();
return 0;
}
上述代码中,当stop_src发出停止请求时,注册的回调函数会自动执行,不需要在线程主逻辑中额外处理清理逻辑,让代码结构更清晰。
注意事项
- std::stop_source是C++20引入的特性,需要编译器支持C++20及以上标准,比如GCC 10+、Clang 11+、MSVC 2019+。
- 停止请求是协作式的,线程需要主动检查stop_token的状态,如果线程内部有长时间阻塞且不检查停止请求的逻辑,还是可能无法及时退出。
- 不要混合使用std::stop_source和传统停止标志,避免状态混乱,增加代码维护难度。
std::stop_sourceC++生产者消费者线程终止修改时间:2026-07-04 20:12:42