在C++框架开发中,并发与多线程处理是优化程序执行效率、适配高并发场景的核心能力,合理的多线程设计可以充分释放多核CPU的性能潜力,减少任务等待时间。很多框架在设计初期就需要考虑线程模型的选型,避免后续出现资源竞争、死锁等难以排查的问题。

C++并发多线程核心基础工具
在展开案例之前,先明确C++标准库中提供的核心并发工具,这些是后续案例实现的基础:
std::thread:用于创建和管理线程,支持传入可调用对象作为线程执行体std::mutex:互斥锁,用于保护共享资源,同一时间只允许一个线程持有锁std::condition_variable:条件变量,用于线程间的同步通知,配合互斥锁使用std::atomic:原子类型,保证操作的原子性,避免多线程下的数据竞争std::future/std::promise:用于线程间的结果传递,获取异步任务的返回值
案例一:通用线程池框架实现
线程池是C++框架中最常见的并发组件,通过预先创建一定数量的线程,复用线程资源处理提交的任务,避免频繁创建销毁线程的开销。以下是一个简单的通用线程池实现案例:
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
class ThreadPool {
private:
std::vector<std::thread> workers; // 工作线程集合
std::queue<std::function<void()>> tasks; // 任务队列
std::mutex queue_mutex; // 保护任务队列的互斥锁
std::condition_variable condition; // 条件变量,用于唤醒工作线程
std::atomic<bool> stop; // 线程池停止标志
public:
// 构造函数,初始化指定数量的工作线程
ThreadPool(size_t thread_num) : stop(false) {
for (size_t i = 0; i < thread_num; ++i) {
workers.emplace_back([this]() {
// 线程执行体,循环获取任务执行
while (true) {
std::function<void()> task;
{
// 加锁获取任务
std::unique_lock<std::mutex> lock(this->queue_mutex);
// 等待条件:线程池停止 或者 任务队列不为空
this->condition.wait(lock, [this]() {
return this->stop.load() || !this->tasks.empty();
});
// 如果线程池停止且任务队列为空,退出线程
if (this->stop.load() && this->tasks.empty()) {
return;
}
// 取出队首任务
task = std::move(this->tasks.front());
this->tasks.pop();
}
// 执行任务
task();
}
});
}
}
// 提交任务到线程池
template<class F, class... Args>
auto submit(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
// 将任务和参数绑定,封装为packaged_task
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// 如果线程池已停止,不允许提交任务
if (stop.load()) {
throw std::runtime_error("submit task to stopped thread pool");
}
// 将任务加入队列
tasks.emplace([task]() { (*task)(); });
}
// 唤醒一个等待的工作线程
condition.notify_one();
return res;
}
// 析构函数,停止线程池并等待所有线程退出
~ThreadPool() {
stop.store(true);
condition.notify_all(); // 唤醒所有工作线程
for (std::thread& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
}
};
// 测试示例
int main() {
ThreadPool pool(4); // 创建4个工作线程的线程池
// 提交10个任务
for (int i = 0; i < 10; ++i) {
auto result = pool.submit([i]() {
std::cout << "Task " << i << " is running on thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return i * i;
});
// 获取任务返回值
std::cout << "Task " << i << " result: " << result.get() << std::endl;
}
return 0;
}
这个线程池框架支持提交任意参数的任务,并且可以获取任务的返回值,适用于大多数需要异步处理任务的C++框架场景,比如网络框架的请求处理、计算框架的任务分发等。
案例二:线程安全阻塞队列实现
在生产者-消费者模型的框架场景中,线程安全的阻塞队列是核心组件,用于解耦生产者和消费者线程,平衡两者的处理速度差异。以下是基于条件变量实现的线程安全阻塞队列:
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
template<typename T>
class BlockingQueue {
private:
std::queue<T> queue; // 底层队列
mutable std::mutex mtx; // 互斥锁
std::condition_variable not_empty; // 非空条件变量,消费者等待该条件
std::condition_variable not_full; // 非满条件变量,生产者等待该条件
size_t max_size; // 队列最大容量
public:
explicit BlockingQueue(size_t max_size = 1024) : max_size(max_size) {}
// 向队列中放入元素,队列满时阻塞
void push(const T& val) {
std::unique_lock<std::mutex> lock(mtx);
// 等待队列非满
not_full.wait(lock, [this]() { return queue.size() < max_size; });
queue.push(val);
// 唤醒一个等待非空条件的消费者线程
not_empty.notify_one();
}
// 从队列中取出元素,队列空时阻塞
std::shared_ptr<T> pop() {
std::unique_lock<std::mutex> lock(mtx);
// 等待队列非空
not_empty.wait(lock, [this]() { return !queue.empty(); });
std::shared_ptr<T> val = std::make_shared<T>(std::move(queue.front()));
queue.pop();
// 唤醒一个等待非满条件的生产者线程
not_full.notify_one();
return val;
}
// 获取队列当前大小
size_t size() const {
std::unique_lock<std::mutex> lock(mtx);
return queue.size();
}
// 判断队列是否为空
bool empty() const {
std::unique_lock<std::mutex> lock(mtx);
return queue.empty();
}
};
这个阻塞队列可以作为框架中不同模块之间的数据传递通道,比如网络框架中,IO线程负责接收数据放入队列,工作线程从队列中取出数据进行处理,两者通过队列解耦,不需要直接交互。
案例三:基于原子操作的轻量计数器框架
在一些需要统计框架运行指标的场景中,比如请求数量、任务处理数量,需要高并发下的计数器,使用互斥锁开销较大,此时可以使用std::atomic实现无锁的轻量计数器:
#include <iostream>
#include <atomic>
#include <vector>
#include <thread>
class AtomicCounter {
private:
std::atomic<int> count; // 原子计数器
public:
AtomicCounter() : count(0) {}
// 增加计数,线程安全
void increment() {
count.fetch_add(1, std::memory_order_relaxed);
}
// 减少计数,线程安全
void decrement() {
count.fetch_sub(1, std::memory_order_relaxed);
}
// 获取当前计数
int get() const {
return count.load(std::memory_order_relaxed);
}
};
// 测试示例:10个线程每个线程增加1000次计数
int main() {
AtomicCounter counter;
std::vector<std::thread> threads;
for (int i = 0; i < 10; ++i) {
threads.emplace_back([&counter]() {
for (int j = 0; j < 1000; ++j) {
counter.increment();
}
});
}
for (auto& t : threads) {
t.join();
}
std::cout << "Final count: " << counter.get() << std::endl; // 输出10000
return 0;
}
这种基于原子操作的计数器没有锁的开销,性能远高于使用互斥锁的计数器,适合高频更新的统计场景,比如框架的QPS统计、在线连接数统计等。
并发多线程编程的注意事项
在C++框架中实现并发多线程处理时,需要注意以下几点:
- 避免死锁:获取多个锁时,保证所有线程获取锁的顺序一致,或者使用
std::lock同时获取多个锁 - 尽量减少锁的持有时间:锁的保护范围越小越好,只保护共享资源的操作,不要将无关逻辑放在锁内部
- 优先使用标准库工具:尽量使用
std::thread、std::mutex等标准库组件,避免直接使用平台相关的线程API,提升代码可移植性 - 注意内存序:使用原子操作时,根据场景选择合适的内存序,不需要强一致性的场景可以使用
std::memory_order_relaxed提升性能
以上三个案例覆盖了C++框架中大部分并发多线程的使用场景,开发者可以根据实际框架的需求选择合适的方案,或者基于这些案例进行扩展,适配更复杂的业务场景。
C++concurrent_programmingmulti_threadingthread_pool修改时间:2026-06-14 19:18:34