在复杂的C++业务系统中,经常会遇到需要异步执行且具有不同优先级和延迟时间的任务场景,比如高优先级的实时告警任务需要优先处理,低优先级的日志上报任务可以延迟执行。设计一个合理的异步延迟任务优先级调度分发引擎,能够有效提升系统的资源利用率和响应速度。

核心设计思路
整个引擎的核心由四个部分组成:任务封装结构、优先级延迟队列、工作线程池、调度管理模块。任务封装结构需要包含任务的执行函数、延迟时间、优先级、唯一标识等信息;优先级延迟队列需要支持按优先级排序,同时能快速获取到期可执行的任务;工作线程池负责实际执行任务;调度管理模块负责协调队列和线程池,完成任务的入队、到期检测、分发执行全流程。
任务结构定义
首先定义任务的基础结构,包含任务的必要属性和执行入口:
#include <functional>
#include <chrono>
#include <string>
// 任务优先级枚举,数值越小优先级越高
enum class TaskPriority {
HIGH = 0,
NORMAL = 1,
LOW = 2
};
// 异步延迟任务封装结构
struct AsyncDelayTask {
std::string task_id; // 任务唯一ID
std::function<void()> execute_func; // 任务执行函数
TaskPriority priority; // 任务优先级
std::chrono::steady_clock::time_point trigger_time; // 任务触发时间
int64_t delay_ms; // 延迟时间,单位毫秒
// 构造函数,默认延迟0毫秒,优先级为普通
AsyncDelayTask(std::string id, std::function<void()> func,
TaskPriority pri = TaskPriority::NORMAL, int64_t delay = 0)
: task_id(std::move(id)), execute_func(std::move(func)), priority(pri), delay_ms(delay) {
// 计算触发时间,当前时间加上延迟时间
trigger_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(delay_ms);
}
};
// 优先级比较函数,用于队列排序:优先级高的在前,相同优先级触发时间早的在前
struct TaskCompare {
bool operator()(const AsyncDelayTask* a, const AsyncDelayTask* b) const {
if (a->priority != b->priority) {
return a->priority > b->priority;
}
return a->trigger_time > b->trigger_time;
}
};
优先级延迟队列实现
这里使用std::priority_queue结合自定义比较器实现优先级队列,同时需要处理延迟任务的到期检测:
#include <queue>
#include <mutex>
#include <condition_variable>
// 线程安全的优先级延迟任务队列
class PriorityDelayTaskQueue {
public:
PriorityDelayTaskQueue() = default;
// 入队任务
void push(AsyncDelayTask* task) {
std::unique_lock<std::mutex> lock(queue_mutex_);
task_queue_.push(task);
lock.unlock();
// 唤醒等待的调度线程
cv_.notify_one();
}
// 获取下一个到期的任务,如果没有到期任务则阻塞等待
AsyncDelayTask* pop() {
std::unique_lock<std::mutex> lock(queue_mutex_);
// 循环等待直到有可执行的任务
cv_.wait(lock, [this]() {
if (task_queue_.empty()) {
return false;
}
auto now = std::chrono::steady_clock::now();
return task_queue_.top()->trigger_time <= now;
});
AsyncDelayTask* task = task_queue_.top();
task_queue_.pop();
return task;
}
// 判断队列是否为空
bool empty() const {
std::unique_lock<std::mutex> lock(queue_mutex_);
return task_queue_.empty();
}
private:
std::priority_queue<AsyncDelayTask*, std::vector<AsyncDelayTask*>, TaskCompare> task_queue_;
mutable std::mutex queue_mutex_;
std::condition_variable cv_;
};
工作线程池实现
工作线程池负责执行从队列中取出的到期任务,支持动态调整线程数量:
#include <thread>
#include <vector>
#include <atomic>
class WorkerThreadPool {
public:
explicit WorkerThreadPool(size_t thread_num) : stop_flag_(false) {
// 创建指定数量的工作线程
for (size_t i = 0; i < thread_num; ++i) {
workers_.emplace_back([this]() {
// 线程循环,不断从任务队列取任务执行
while (true) {
AsyncDelayTask* task = task_queue_.pop();
if (stop_flag_.load()) {
delete task;
break;
}
if (task) {
try {
// 执行任务函数
task->execute_func();
} catch (...) {
// 捕获任务执行中的异常,避免线程退出
}
delete task;
}
}
});
}
}
// 提交任务到队列
void submit(AsyncDelayTask* task) {
task_queue_.push(task);
}
// 停止线程池,等待所有线程退出
~WorkerThreadPool() {
stop_flag_.store(true);
// 唤醒所有等待的线程
for (auto& worker : workers_) {
if (worker.joinable()) {
worker.join();
}
}
}
private:
std::vector<std::thread> workers_;
PriorityDelayTaskQueue task_queue_;
std::atomic<bool> stop_flag_;
};
调度分发引擎整合
最后将各个模块整合,形成完整的调度分发引擎:
#include <iostream>
#include <memory>
class AsyncDelayTaskScheduler {
public:
// 初始化调度引擎,指定工作线程数量
explicit AsyncDelayTaskScheduler(size_t worker_num = 4)
: thread_pool_(std::make_unique<WorkerThreadPool>(worker_num)) {}
// 提交异步延迟任务
void schedule_task(std::unique_ptr<AsyncDelayTask> task) {
// 将任务所有权转移到线程池
thread_pool_->submit(task.release());
}
// 提交普通任务(无延迟,默认普通优先级)
void schedule_task(const std::string& task_id, std::function<void()> func) {
auto task = std::make_unique<AsyncDelayTask>(task_id, std::move(func));
schedule_task(std::move(task));
}
// 提交带优先级和延迟的任务
void schedule_task(const std::string& task_id, std::function<void()> func,
TaskPriority priority, int64_t delay_ms) {
auto task = std::make_unique<AsyncDelayTask>(task_id, std::move(func), priority, delay_ms);
schedule_task(std::move(task));
}
private:
std::unique_ptr<WorkerThreadPool> thread_pool_;
};
// 测试示例
int main() {
// 创建调度引擎,开启4个工作线程
AsyncDelayTaskScheduler scheduler(4);
// 提交高优先级延迟任务,延迟500毫秒执行
scheduler.schedule_task("high_task_1", []() {
std::cout << "高优先级延迟任务执行,时间:"
<< std::chrono::system_clock::now().time_since_epoch().count() << std::endl;
}, TaskPriority::HIGH, 500);
// 提交普通优先级无延迟任务
scheduler.schedule_task("normal_task_1", []() {
std::cout << "普通优先级无延迟任务执行" << std::endl;
});
// 提交低优先级延迟任务,延迟1000毫秒执行
scheduler.schedule_task("low_task_1", []() {
std::cout << "低优先级延迟任务执行,时间:"
<< std::chrono::system_clock::now().time_since_epoch().count() << std::endl;
}, TaskPriority::LOW, 1000);
// 等待任务执行完成,实际使用中可以根据业务需求调整等待逻辑
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}
关键注意事项
- 任务内存管理:示例中任务通过
new分配,执行完成后在worker线程中delete,实际生产环境可以结合智能指针优化,避免内存泄漏。 - 线程安全:队列操作都加了互斥锁,条件变量用于线程间通信,避免忙等浪费CPU资源。
- 优先级调整:如果需要支持运行中修改任务优先级,需要额外设计任务更新逻辑,先移除旧任务再重新入队。
- 异常处理:任务执行函数需要做好异常捕获,避免单个任务异常导致整个工作线程退出。