导读:本期聚焦于小伙伴创作的《C++如何实现高性能时间轮任务调度解决高并发定时器瓶颈》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《C++如何实现高性能时间轮任务调度解决高并发定时器瓶颈》有用,将其分享出去将是对创作者最好的鼓励。

在高并发服务开发中,定时任务调度是非常常见的需求,比如连接超时检测、周期性任务执行、延迟消息处理等。传统的定时器实现通常基于最小堆或者红黑树,添加和删除任务的时间复杂度为O(log n),当定时任务数量达到十万甚至百万级别时,性能瓶颈会非常明显。时间轮调度算法通过分层的槽位设计,能够将任务添加、触发的时间复杂度降低到接近O(1),非常适合高并发场景下的定时器需求。

C++如何实现高性能时间轮任务调度解决高并发定时器瓶颈

时间轮核心设计思路

时间轮的核心结构和时钟类似,由一个环形数组构成,每个数组元素称为一个时间槽,每个槽位存放当前时刻需要执行的定时任务。时间轮有一个指针,按照固定的时间间隔(比如1毫秒)向前移动,指针指向的槽位中的所有任务都会被触发执行。

单层时间轮的局限性

单层时间轮的槽位数量是固定的,比如设置1024个槽位,每个槽位代表1毫秒,那么时间轮只能覆盖1024毫秒的时间范围,超过这个时间范围的定时任务无法被直接存放。为了解决这个问题,通常采用多层时间轮的设计,类似时钟的时、分、秒结构,上层时间轮的一个槽位对应下层时间轮的整个轮次。

多层时间轮的工作逻辑

  • 最底层的时间轮(秒轮)每个槽位代表最小时间单位,比如1毫秒
  • 上层时间轮(分轮)每个槽位代表底层时间轮的完整一轮时间,比如1秒
  • 当底层时间轮的指针转完一轮后,会从上层时间轮取出对应槽位的任务,重新分配到下层时间轮中
  • 定时任务添加时,根据任务的触发时间和当前时间的差值,计算应该存放到哪一层哪个槽位

C++时间轮实现核心代码

定时任务结构体定义

首先需要定义定时任务的基本结构,包含任务的唯一标识、触发时间、回调函数以及任务的轮次信息。

#include <iostream>
#include <vector>
#include <functional>
#include <unordered_map>
#include <chrono>
#include <thread>
#include <mutex>
#include <atomic>

// 定时任务结构体
struct TimerTask {
    uint64_t task_id;               // 任务唯一ID
    uint64_t trigger_time;          // 任务触发时间,单位毫秒
    std::function<void()> callback; // 任务回调函数
    int32_t round;                  // 任务所在轮次,用于多层时间轮
    int32_t slot;                   // 任务所在槽位索引

    TimerTask(uint64_t id, uint64_t time, std::function<void()> cb)
        : task_id(id), trigger_time(time), callback(cb), round(0), slot(0) {}
};

单层时间轮实现

先实现基础的单层时间轮,包含时间轮的初始化、任务添加、任务触发等核心方法。

class SingleLevelTimeWheel {
private:
    int32_t slot_count;                     // 时间槽数量
    int32_t current_slot;                   // 当前指针指向的槽位
    uint64_t tick_interval;                 // 每个槽位的时间间隔,单位毫秒
    uint64_t start_time;                    // 时间轮启动时间
    std::vector<std::vector<TimerTask>> slots; // 所有槽位的任务列表
    std::unordered_map<uint64_t, TimerTask> task_map; // 任务ID到任务的映射,用于删除任务
    std::mutex mutex;                       // 互斥锁,保证线程安全
    std::atomic<bool> running;              // 时间轮是否运行中
    std::thread tick_thread;                // 时间轮转动线程

public:
    // 构造函数,初始化槽位数量和时间间隔
    SingleLevelTimeWheel(int32_t slot_cnt, uint64_t interval)
        : slot_count(slot_cnt), current_slot(0), tick_interval(interval), running(false) {
        slots.resize(slot_count);
        // 获取当前时间戳,单位毫秒
        start_time = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now().time_since_epoch()
        ).count();
    }

    // 启动时间轮
    void start() {
        running = true;
        tick_thread = std::thread(&SingleLevelTimeWheel::tick, this);
    }

    // 停止时间轮
    void stop() {
        running = false;
        if (tick_thread.joinable()) {
            tick_thread.join();
        }
    }

    // 添加定时任务,delay为延迟时间,单位毫秒
    uint64_t add_timer(uint64_t delay, std::function<void()> callback) {
        std::lock_guard<std::mutex> lock(mutex);
        uint64_t now = get_current_time();
        uint64_t trigger_time = now + delay;
        static uint64_t task_id_counter = 0;
        uint64_t task_id = ++task_id_counter;

        TimerTask task(task_id, trigger_time, callback);
        // 计算任务应该存放的槽位
        uint64_t diff = trigger_time - start_time;
        uint64_t slot_index = (diff / tick_interval) % slot_count;
        task.slot = slot_index;
        slots[slot_index].push_back(task);
        task_map[task_id] = task;
        return task_id;
    }

    // 删除定时任务
    void remove_timer(uint64_t task_id) {
        std::lock_guard<std::mutex> lock(mutex);
        auto it = task_map.find(task_id);
        if (it != task_map.end()) {
            int32_t slot = it->second.slot;
            // 遍历对应槽位的任务,找到并删除
            auto& slot_tasks = slots[slot];
            for (auto iter = slot_tasks.begin(); iter != slot_tasks.end(); ++iter) {
                if (iter->task_id == task_id) {
                    slot_tasks.erase(iter);
                    break;
                }
            }
            task_map.erase(it);
        }
    }

private:
    // 获取当前时间戳,单位毫秒
    uint64_t get_current_time() {
        return std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now().time_since_epoch()
        ).count();
    }

    // 时间轮转动逻辑
    void tick() {
        while (running) {
            std::this_thread::sleep_for(std::chrono::milliseconds(tick_interval));
            std::lock_guard<std::mutex> lock(mutex);
            // 触发当前槽位的所有任务
            auto& current_tasks = slots[current_slot];
            for (auto& task : current_tasks) {
                if (task.callback) {
                    task.callback();
                }
            }
            current_tasks.clear();
            // 指针移动到下一个槽位
            current_slot = (current_slot + 1) % slot_count;
        }
    }
};

多层时间轮实现

多层时间轮需要扩展单层时间轮的逻辑,支持任务的层级迁移,这里实现一个两层时间轮作为示例,包含毫秒轮和秒轮。

class MultiLevelTimeWheel {
private:
    // 毫秒轮,每个槽位1毫秒,共1024个槽位,覆盖1秒时间
    int32_t ms_slot_count = 1024;
    int32_t ms_current_slot = 0;
    uint64_t ms_tick_interval = 1; // 1毫秒

    // 秒轮,每个槽位1秒,共60个槽位,覆盖60秒时间
    int32_t sec_slot_count = 60;
    int32_t sec_current_slot = 0;
    uint64_t sec_tick_interval = 1000; // 1000毫秒

    std::vector<std::vector<TimerTask>> ms_slots;
    std::vector<std::vector<TimerTask>> sec_slots;
    std::unordered_map<uint64_t, TimerTask> task_map;
    std::mutex mutex;
    std::atomic<bool> running;
    std::thread tick_thread;
    uint64_t start_time;

public:
    MultiLevelTimeWheel() : running(false) {
        ms_slots.resize(ms_slot_count);
        sec_slots.resize(sec_slot_count);
        start_time = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now().time_since_epoch()
        ).count();
    }

    void start() {
        running = true;
        tick_thread = std::thread(&MultiLevelTimeWheel::tick, this);
    }

    void stop() {
        running = false;
        if (tick_thread.joinable()) {
            tick_thread.join();
        }
    }

    uint64_t add_timer(uint64_t delay, std::function<void()> callback) {
        std::lock_guard<std::mutex> lock(mutex);
        uint64_t now = get_current_time();
        uint64_t trigger_time = now + delay;
        static uint64_t task_id_counter = 0;
        uint64_t task_id = ++task_id_counter;

        TimerTask task(task_id, trigger_time, callback);
        uint64_t diff = trigger_time - start_time;

        // 如果延迟时间在1秒内,放到毫秒轮
        if (diff < ms_slot_count) {
            uint64_t slot_index = diff % ms_slot_count;
            task.slot = slot_index;
            task.round = 0;
            ms_slots[slot_index].push_back(task);
        } else {
            // 否则放到秒轮
            uint64_t sec_diff = diff / 1000;
            uint64_t slot_index = sec_diff % sec_slot_count;
            task.slot = slot_index;
            task.round = sec_diff / sec_slot_count;
            sec_slots[slot_index].push_back(task);
        }
        task_map[task_id] = task;
        return task_id;
    }

private:
    uint64_t get_current_time() {
        return std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now().time_since_epoch()
        ).count();
    }

    void tick() {
        while (running) {
            std::this_thread::sleep_for(std::chrono::milliseconds(ms_tick_interval));
            std::lock_guard<std::mutex> lock(mutex);
            // 处理毫秒轮当前槽位的任务
            auto& ms_current_tasks = ms_slots[ms_current_slot];
            for (auto& task : ms_current_tasks) {
                if (task.callback) {
                    task.callback();
                }
            }
            ms_current_tasks.clear();

            // 毫秒轮指针移动
            ms_current_slot = (ms_current_slot + 1) % ms_slot_count;

            // 如果毫秒轮转完一轮,处理秒轮
            if (ms_current_slot == 0) {
                // 触发秒轮当前槽位的任务,将任务迁移到毫秒轮
                auto& sec_current_tasks = sec_slots[sec_current_slot];
                std::vector<TimerTask> new_tasks;
                for (auto& task : sec_current_tasks) {
                    if (task.round > 0) {
                        // 轮次减1,重新放回秒轮
                        task.round--;
                        sec_slots[sec_current_slot].push_back(task);
                    } else {
                        // 轮次为0,迁移到毫秒轮
                        uint64_t diff = task.trigger_time - get_current_time();
                        if (diff < ms_slot_count) {
                            uint64_t ms_slot = diff % ms_slot_count;
                            task.slot = ms_slot;
                            ms_slots[ms_slot].push_back(task);
                        }
                    }
                }
                sec_current_tasks.clear();
                sec_current_slot = (sec_current_slot + 1) % sec_slot_count;
            }
        }
    }
};

时间轮性能对比与适用场景

和传统定时器实现方式相比,时间轮的优势非常明显:

定时器实现方式添加任务复杂度删除任务复杂度触发任务复杂度适用场景
最小堆O(log n)O(log n)O(1)任务数量较少的场景
红黑树O(log n)O(log n)O(log n)任务数量中等的场景
时间轮O(1)O(1)O(1)高并发、大量定时任务的场景

时间轮非常适合以下场景:

  • 网络服务中的连接超时管理,比如TCP连接空闲超时、请求超时检测
  • 消息队列中的延迟消息投递,比如订单超时关闭、延迟通知
  • 周期性任务的调度,比如定时统计数据、定时清理缓存

使用注意事项

在实际使用时间轮时需要注意几个问题:

  • 时间槽的数量需要根据业务场景设置,槽位过少会导致任务冲突过多,槽位过多会浪费内存
  • 定时任务的回调函数应该尽量简短,避免阻塞时间轮的转动线程,如果需要执行耗时操作,可以将任务抛到独立的线程池中执行
  • 多线程操作时间轮时需要做好线程安全保护,上面的实现中使用了互斥锁保证操作的安全性
  • 如果需要的定时时间范围超过多层时间轮的覆盖范围,可以继续扩展更多层的时间轮,比如增加分轮、时轮等

通过上述C++实现的时间轮调度模块,可以轻松支撑百万级别的定时任务调度,有效解决高并发场景下的定时器性能瓶颈问题。开发者可以根据自身的业务需求,对时间轮的层级、槽位数量进行调整,也可以扩展任务持久化、任务取消回调等功能。

C++时间轮任务调度高并发定时器修改时间:2026-06-28 21:54:30

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。