导读:本期聚焦于小伙伴创作的《C++如何利用std::stop_source优雅终止复杂的生产者消费者任务协作》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《C++如何利用std::stop_source优雅终止复杂的生产者消费者任务协作》有用,将其分享出去将是对创作者最好的鼓励。

在C++20之前,终止多线程协作的生产者消费者任务时,开发者通常会使用共享的布尔标志位或者强制中断线程的方式,前者容易出现内存可见性问题,后者可能导致资源未释放、数据损坏等严重问题。std::stop_source配合std::stop_token提供了协作式的停止机制,让线程可以主动检查停止请求,在合适的时机安全退出,非常适合复杂的生产者消费者协作场景。

C++如何利用std::stop_source优雅终止复杂的生产者消费者任务协作

std::stop_source基础概念

std::stop_source是停止状态的所有者,它可以发出停止请求,与之关联的std::stop_token可以被传递给其他线程,用于检查是否收到了停止请求。核心特性如下:

  • 一个std::stop_source可以关联多个std::stop_token,所有关联的token都能感知到停止请求
  • 停止请求发出后,无法撤销,所有关联的token的stop_requested()都会返回true
  • 支持注册停止回调,当停止请求发出时,会自动执行注册的回调函数

传统终止方式的问题

先看一个使用共享布尔标志位终止生产者消费者任务的示例,这种方式存在明显缺陷:

#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>

std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable cv;
// 共享的停止标志,使用atomic保证可见性
std::atomic<bool> stop_flag(false);

void producer() {
    int count = 0;
    while (!stop_flag.load()) {
        std::unique_lock<std::mutex> lock(mtx);
        data_queue.push(count++);
        cv.notify_one();
        lock.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    // 终止时可能还有未处理的数据
    std::cout << "生产者退出" << std::endl;
}

void consumer() {
    while (!stop_flag.load()) {
        std::unique_lock<std::mutex> lock(mtx);
        if (cv.wait_for(lock, std::chrono::milliseconds(50), []{ return !data_queue.empty() || stop_flag.load(); })) {
            if (stop_flag.load()) break;
            int val = data_queue.front();
            data_queue.pop();
            lock.unlock();
            std::cout << "消费数据:" << val << std::endl;
        }
    }
    std::cout << "消费者退出" << std::endl;
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    // 发出停止请求
    stop_flag.store(true);
    cv.notify_all();
    t1.join();
    t2.join();
    return 0;
}

上述代码虽然使用了atomic的布尔标志位,但是终止时如果队列中还有未处理的数据,生产者可能来不及把剩余数据全部推入队列就退出,消费者也可能漏处理部分数据,而且停止标志和队列、条件变量的状态没有联动,复杂场景下很容易出现逻辑漏洞。

使用std::stop_source实现优雅终止

下面用std::stop_source改造上述生产者消费者场景,实现安全的协作式终止:

#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <stop_token>
#include <chrono>

std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable cv;

// 生产者函数,接收stop_token用于检查停止请求
void producer(std::stop_token stoken) {
    int count = 0;
    while (!stoken.stop_requested()) {
        std::unique_lock<std::mutex> lock(mtx);
        // 检查停止请求,避免推送多余数据
        if (stoken.stop_requested()) break;
        data_queue.push(count++);
        cv.notify_one();
        lock.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    // 终止前可以处理收尾逻辑,比如输出剩余队列大小
    std::unique_lock<std::mutex> lock(mtx);
    std::cout << "生产者退出,剩余队列大小:" << data_queue.size() << std::endl;
}

// 消费者函数,接收stop_token用于检查停止请求
void consumer(std::stop_token stoken) {
    while (!stoken.stop_requested()) {
        std::unique_lock<std::mutex> lock(mtx);
        // 等待队列非空或者停止请求
        if (cv.wait_for(lock, std::chrono::milliseconds(50), [&stoken]{ 
            return !data_queue.empty() || stoken.stop_requested(); 
        })) {
            if (stoken.stop_requested()) break;
            int val = data_queue.front();
            data_queue.pop();
            lock.unlock();
            std::cout << "消费数据:" << val << std::endl;
        }
    }
    // 终止前处理剩余数据
    std::unique_lock<std::mutex> lock(mtx);
    while (!data_queue.empty()) {
        int val = data_queue.front();
        data_queue.pop();
        std::cout << "终止前处理剩余数据:" << val << std::endl;
    }
    std::cout << "消费者退出" << std::endl;
}

int main() {
    // 创建stop_source,用于发出停止请求
    std::stop_source stop_src;
    // 启动线程时传入stop_source的stop_token
    std::thread t1(producer, stop_src.get_token());
    std::thread t2(consumer, stop_src.get_token());
    // 运行1秒后发出停止请求
    std::this_thread::sleep_for(std::chrono::seconds(1));
    stop_src.request_stop();
    // 唤醒所有等待的条件变量,避免线程一直阻塞
    cv.notify_all();
    t1.join();
    t2.join();
    return 0;
}

关键点说明

停止请求的传递

通过stop_src.get_token()获取与stop_source关联的stop_token,传递给生产者和消费者线程,线程内部通过stop_requested()方法检查是否收到了停止请求,实现协作式退出。

避免阻塞问题

发出停止请求后,需要调用cv.notify_all()唤醒所有阻塞在条件变量上的线程,否则线程可能一直等待条件变量,无法检查到停止请求,导致无法退出。

收尾逻辑处理

生产者和消费者在检测到停止请求后,可以执行自定义的收尾逻辑,比如生产者输出剩余队列大小,消费者处理队列中剩余的数据,保证数据不丢失,资源正确释放。

停止回调的使用

std::stop_source还支持注册停止回调,当停止请求发出时,会自动执行注册的回调函数,适合需要在停止时执行特定操作的场景:

#include <iostream>
#include <stop_token>
#include <thread>
#include <chrono>

void worker(std::stop_token stoken) {
    // 注册停止回调,停止请求发出时自动执行
    std::stop_callback cb(stoken, []{ 
        std::cout << "收到停止请求,执行清理操作" << std::endl; 
    });
    while (!stoken.stop_requested()) {
        std::cout << "工作中..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
    std::cout << "工作线程退出" << std::endl;
}

int main() {
    std::stop_source stop_src;
    std::thread t(worker, stop_src.get_token());
    std::this_thread::sleep_for(std::chrono::seconds(1));
    stop_src.request_stop();
    t.join();
    return 0;
}

上述代码中,当stop_src发出停止请求时,注册的回调函数会自动执行,不需要在线程主逻辑中额外处理清理逻辑,让代码结构更清晰。

注意事项

  • std::stop_source是C++20引入的特性,需要编译器支持C++20及以上标准,比如GCC 10+、Clang 11+、MSVC 2019+。
  • 停止请求是协作式的,线程需要主动检查stop_token的状态,如果线程内部有长时间阻塞且不检查停止请求的逻辑,还是可能无法及时退出。
  • 不要混合使用std::stop_source和传统停止标志,避免状态混乱,增加代码维护难度。

std::stop_sourceC++生产者消费者线程终止修改时间:2026-07-04 20:12:42

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。