如何用C++实现异步延迟任务的优先级调度分发引擎

来源:PHP编程网作者:北京SEO公司头衔:草根站长
导读:本期聚焦于小伙伴创作的《如何用C++实现异步延迟任务的优先级调度分发引擎》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《如何用C++实现异步延迟任务的优先级调度分发引擎》有用,将其分享出去将是对创作者最好的鼓励。

在复杂的C++业务系统中,经常会遇到需要异步执行且具有不同优先级和延迟时间的任务场景,比如高优先级的实时告警任务需要优先处理,低优先级的日志上报任务可以延迟执行。设计一个合理的异步延迟任务优先级调度分发引擎,能够有效提升系统的资源利用率和响应速度。

如何用C++实现异步延迟任务的优先级调度分发引擎

核心设计思路

整个引擎的核心由四个部分组成:任务封装结构、优先级延迟队列、工作线程池、调度管理模块。任务封装结构需要包含任务的执行函数、延迟时间、优先级、唯一标识等信息;优先级延迟队列需要支持按优先级排序,同时能快速获取到期可执行的任务;工作线程池负责实际执行任务;调度管理模块负责协调队列和线程池,完成任务的入队、到期检测、分发执行全流程。

任务结构定义

首先定义任务的基础结构,包含任务的必要属性和执行入口:

#include <functional>
#include <chrono>
#include <string>

// 任务优先级枚举,数值越小优先级越高
enum class TaskPriority {
    HIGH = 0,
    NORMAL = 1,
    LOW = 2
};

// 异步延迟任务封装结构
struct AsyncDelayTask {
    std::string task_id;               // 任务唯一ID
    std::function<void()> execute_func; // 任务执行函数
    TaskPriority priority;             // 任务优先级
    std::chrono::steady_clock::time_point trigger_time; // 任务触发时间
    int64_t delay_ms;                  // 延迟时间,单位毫秒

    // 构造函数,默认延迟0毫秒,优先级为普通
    AsyncDelayTask(std::string id, std::function<void()> func, 
                   TaskPriority pri = TaskPriority::NORMAL, int64_t delay = 0)
        : task_id(std::move(id)), execute_func(std::move(func)), priority(pri), delay_ms(delay) {
        // 计算触发时间,当前时间加上延迟时间
        trigger_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(delay_ms);
    }
};

// 优先级比较函数,用于队列排序:优先级高的在前,相同优先级触发时间早的在前
struct TaskCompare {
    bool operator()(const AsyncDelayTask* a, const AsyncDelayTask* b) const {
        if (a->priority != b->priority) {
            return a->priority > b->priority;
        }
        return a->trigger_time > b->trigger_time;
    }
};

优先级延迟队列实现

这里使用std::priority_queue结合自定义比较器实现优先级队列,同时需要处理延迟任务的到期检测:

#include <queue>
#include <mutex>
#include <condition_variable>

// 线程安全的优先级延迟任务队列
class PriorityDelayTaskQueue {
public:
    PriorityDelayTaskQueue() = default;

    // 入队任务
    void push(AsyncDelayTask* task) {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        task_queue_.push(task);
        lock.unlock();
        // 唤醒等待的调度线程
        cv_.notify_one();
    }

    // 获取下一个到期的任务,如果没有到期任务则阻塞等待
    AsyncDelayTask* pop() {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        // 循环等待直到有可执行的任务
        cv_.wait(lock, [this]() {
            if (task_queue_.empty()) {
                return false;
            }
            auto now = std::chrono::steady_clock::now();
            return task_queue_.top()->trigger_time <= now;
        });

        AsyncDelayTask* task = task_queue_.top();
        task_queue_.pop();
        return task;
    }

    // 判断队列是否为空
    bool empty() const {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        return task_queue_.empty();
    }

private:
    std::priority_queue<AsyncDelayTask*, std::vector<AsyncDelayTask*>, TaskCompare> task_queue_;
    mutable std::mutex queue_mutex_;
    std::condition_variable cv_;
};

工作线程池实现

工作线程池负责执行从队列中取出的到期任务,支持动态调整线程数量:

#include <thread>
#include <vector>
#include <atomic>

class WorkerThreadPool {
public:
    explicit WorkerThreadPool(size_t thread_num) : stop_flag_(false) {
        // 创建指定数量的工作线程
        for (size_t i = 0; i < thread_num; ++i) {
            workers_.emplace_back([this]() {
                // 线程循环,不断从任务队列取任务执行
                while (true) {
                    AsyncDelayTask* task = task_queue_.pop();
                    if (stop_flag_.load()) {
                        delete task;
                        break;
                    }
                    if (task) {
                        try {
                            // 执行任务函数
                            task->execute_func();
                        } catch (...) {
                            // 捕获任务执行中的异常,避免线程退出
                        }
                        delete task;
                    }
                }
            });
        }
    }

    // 提交任务到队列
    void submit(AsyncDelayTask* task) {
        task_queue_.push(task);
    }

    // 停止线程池,等待所有线程退出
    ~WorkerThreadPool() {
        stop_flag_.store(true);
        // 唤醒所有等待的线程
        for (auto& worker : workers_) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }

private:
    std::vector<std::thread> workers_;
    PriorityDelayTaskQueue task_queue_;
    std::atomic<bool> stop_flag_;
};

调度分发引擎整合

最后将各个模块整合,形成完整的调度分发引擎:

#include <iostream>
#include <memory>

class AsyncDelayTaskScheduler {
public:
    // 初始化调度引擎,指定工作线程数量
    explicit AsyncDelayTaskScheduler(size_t worker_num = 4) 
        : thread_pool_(std::make_unique<WorkerThreadPool>(worker_num)) {}

    // 提交异步延迟任务
    void schedule_task(std::unique_ptr<AsyncDelayTask> task) {
        // 将任务所有权转移到线程池
        thread_pool_->submit(task.release());
    }

    // 提交普通任务(无延迟,默认普通优先级)
    void schedule_task(const std::string& task_id, std::function<void()> func) {
        auto task = std::make_unique<AsyncDelayTask>(task_id, std::move(func));
        schedule_task(std::move(task));
    }

    // 提交带优先级和延迟的任务
    void schedule_task(const std::string& task_id, std::function<void()> func, 
                      TaskPriority priority, int64_t delay_ms) {
        auto task = std::make_unique<AsyncDelayTask>(task_id, std::move(func), priority, delay_ms);
        schedule_task(std::move(task));
    }

private:
    std::unique_ptr<WorkerThreadPool> thread_pool_;
};

// 测试示例
int main() {
    // 创建调度引擎,开启4个工作线程
    AsyncDelayTaskScheduler scheduler(4);

    // 提交高优先级延迟任务,延迟500毫秒执行
    scheduler.schedule_task("high_task_1", []() {
        std::cout << "高优先级延迟任务执行,时间:" 
                  << std::chrono::system_clock::now().time_since_epoch().count() << std::endl;
    }, TaskPriority::HIGH, 500);

    // 提交普通优先级无延迟任务
    scheduler.schedule_task("normal_task_1", []() {
        std::cout << "普通优先级无延迟任务执行" << std::endl;
    });

    // 提交低优先级延迟任务,延迟1000毫秒执行
    scheduler.schedule_task("low_task_1", []() {
        std::cout << "低优先级延迟任务执行,时间:" 
                  << std::chrono::system_clock::now().time_since_epoch().count() << std::endl;
    }, TaskPriority::LOW, 1000);

    // 等待任务执行完成,实际使用中可以根据业务需求调整等待逻辑
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 0;
}

关键注意事项

  • 任务内存管理:示例中任务通过new分配,执行完成后在worker线程中delete,实际生产环境可以结合智能指针优化,避免内存泄漏。
  • 线程安全:队列操作都加了互斥锁,条件变量用于线程间通信,避免忙等浪费CPU资源。
  • 优先级调整:如果需要支持运行中修改任务优先级,需要额外设计任务更新逻辑,先移除旧任务再重新入队。
  • 异常处理:任务执行函数需要做好异常捕获,避免单个任务异常导致整个工作线程退出。

C++异步延迟任务优先级调度任务分发引擎修改时间:2026-06-25 10:54:29

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。