高并发场景下,大量任务同时涌入系统,如果直接让单个处理单元承接所有任务,很容易出现处理不及时、资源耗尽的问题。任务分流结合负载均衡算法可以将任务合理分配到多个处理单元,提升整体处理效率。本文通过C++实现一个基础的高并发任务分流与负载均衡示例,采用轮询算法完成任务分配。
核心实现思路
整个实现包含三个核心部分:任务队列、工作线程组、负载均衡调度器。任务队列用于存储待处理的任务,工作线程组负责实际执行任务,调度器负责按照轮询算法将任务分配到不同的工作线程对应的任务队列中,实现分流效果。
任务结构设计
首先定义基础的任务结构,包含任务ID和任务需要处理的模拟数据:
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
// 任务结构体
struct Task {
int task_id; // 任务ID
int data; // 任务携带的数据
Task(int id, int d) : task_id(id), data(d) {}
};
线程安全的任务队列
每个工作线程对应一个独立的任务队列,队列需要支持线程安全的入队和出队操作,避免多线程操作时的数据竞争问题:
// 线程安全的任务队列
class SafeTaskQueue {
private:
std::queue<Task> task_queue;
std::mutex mtx;
std::condition_variable cv;
public:
// 入队操作
void push(const Task& task) {
std::lock_guard<std::mutex> lock(mtx);
task_queue.push(task);
cv.notify_one(); // 通知等待的线程有新任务
}
// 出队操作,队列为空时阻塞等待
bool pop(Task& task) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this]() { return !task_queue.empty(); });
task = task_queue.front();
task_queue.pop();
return true;
}
// 判断队列是否为空
bool empty() {
std::lock_guard<std::mutex> lock(mtx);
return task_queue.empty();
}
};
工作线程实现
工作线程会循环从自己对应的任务队列中获取任务并执行,执行完成后输出处理结果:
// 工作线程函数
void worker_thread(int worker_id, SafeTaskQueue* task_queue, std::atomic<bool>* stop_flag) {
while (true) {
if (*stop_flag && task_queue->empty()) {
break; // 停止标志触发且队列无任务时退出线程
}
Task task(-1, -1);
if (task_queue->pop(task)) {
// 模拟任务处理,这里简单输出任务信息
std::cout << "工作线程" << worker_id << "处理任务,ID:" << task.task_id << ",数据:" << task.data << std::endl;
// 模拟处理耗时
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
std::cout << "工作线程" << worker_id << "退出" << std::endl;
}
轮询负载均衡调度器
调度器采用轮询算法,依次将任务分配到不同的工作线程队列中,实现简单的负载均衡效果:
// 负载均衡调度器
class LoadBalancer {
private:
std::vector<SafeTaskQueue*> worker_queues; // 所有工作线程的任务队列
std::atomic<int> current_index; // 轮询索引
int worker_num; // 工作线程数量
public:
LoadBalancer(int num) : worker_num(num), current_index(0) {
for (int i = 0; i < num; ++i) {
worker_queues.push_back(new SafeTaskQueue());
}
}
~LoadBalancer() {
for (auto queue : worker_queues) {
delete queue;
}
}
// 添加任务到调度器,按照轮询算法分配
void add_task(const Task& task) {
int index = current_index.fetch_add(1) % worker_num;
worker_queues[index]->push(task);
}
// 获取指定工作线程的任务队列
SafeTaskQueue* get_worker_queue(int index) {
return worker_queues[index];
}
int get_worker_num() {
return worker_num;
}
};
完整主函数示例
主函数负责初始化调度器、启动工作线程、提交任务、等待所有任务处理完成后退出:
int main() {
const int WORKER_NUM = 3; // 工作线程数量
const int TASK_NUM = 10; // 总任务数量
std::atomic<bool> stop_flag(false);
// 初始化负载均衡调度器
LoadBalancer balancer(WORKER_NUM);
// 启动工作线程
std::vector<std::thread> workers;
for (int i = 0; i < WORKER_NUM; ++i) {
workers.emplace_back(worker_thread, i, balancer.get_worker_queue(i), &stop_flag);
}
// 提交任务
for (int i = 0; i < TASK_NUM; ++i) {
Task task(i, i * 10);
balancer.add_task(task);
std::cout << "提交任务ID:" << i << std::endl;
}
// 等待所有任务处理完成
std::this_thread::sleep_for(std::chrono::seconds(2));
stop_flag = true;
// 通知所有工作线程退出
for (int i = 0; i < WORKER_NUM; ++i) {
balancer.get_worker_queue(i)->push(Task(-1, -1)); // 推送空任务唤醒阻塞的线程
}
// 等待所有工作线程退出
for (auto& t : workers) {
if (t.joinable()) {
t.join();
}
}
std::cout << "所有任务处理完成,程序退出" << std::endl;
return 0;
}
实现说明
上述代码实现了基础的高并发任务分流和轮询负载均衡逻辑,轮询算法保证了任务会依次分配到不同的工作线程,避免单个线程负载过高。实际场景中可以根据需求替换负载均衡算法,比如加权轮询、最小连接数等,也可以扩展任务队列的容量限制、任务优先级等功能。需要注意的是,示例中使用了简单的原子变量控制线程退出,实际生产环境可以根据需求优化线程生命周期管理逻辑。