在高并发服务开发中,定时任务调度是非常常见的需求,比如连接超时检测、周期性任务执行、延迟消息处理等。传统的定时器实现通常基于最小堆或者红黑树,添加和删除任务的时间复杂度为O(log n),当定时任务数量达到十万甚至百万级别时,性能瓶颈会非常明显。时间轮调度算法通过分层的槽位设计,能够将任务添加、触发的时间复杂度降低到接近O(1),非常适合高并发场景下的定时器需求。

时间轮核心设计思路
时间轮的核心结构和时钟类似,由一个环形数组构成,每个数组元素称为一个时间槽,每个槽位存放当前时刻需要执行的定时任务。时间轮有一个指针,按照固定的时间间隔(比如1毫秒)向前移动,指针指向的槽位中的所有任务都会被触发执行。
单层时间轮的局限性
单层时间轮的槽位数量是固定的,比如设置1024个槽位,每个槽位代表1毫秒,那么时间轮只能覆盖1024毫秒的时间范围,超过这个时间范围的定时任务无法被直接存放。为了解决这个问题,通常采用多层时间轮的设计,类似时钟的时、分、秒结构,上层时间轮的一个槽位对应下层时间轮的整个轮次。
多层时间轮的工作逻辑
- 最底层的时间轮(秒轮)每个槽位代表最小时间单位,比如1毫秒
- 上层时间轮(分轮)每个槽位代表底层时间轮的完整一轮时间,比如1秒
- 当底层时间轮的指针转完一轮后,会从上层时间轮取出对应槽位的任务,重新分配到下层时间轮中
- 定时任务添加时,根据任务的触发时间和当前时间的差值,计算应该存放到哪一层哪个槽位
C++时间轮实现核心代码
定时任务结构体定义
首先需要定义定时任务的基本结构,包含任务的唯一标识、触发时间、回调函数以及任务的轮次信息。
#include <iostream>
#include <vector>
#include <functional>
#include <unordered_map>
#include <chrono>
#include <thread>
#include <mutex>
#include <atomic>
// 定时任务结构体
struct TimerTask {
uint64_t task_id; // 任务唯一ID
uint64_t trigger_time; // 任务触发时间,单位毫秒
std::function<void()> callback; // 任务回调函数
int32_t round; // 任务所在轮次,用于多层时间轮
int32_t slot; // 任务所在槽位索引
TimerTask(uint64_t id, uint64_t time, std::function<void()> cb)
: task_id(id), trigger_time(time), callback(cb), round(0), slot(0) {}
};
单层时间轮实现
先实现基础的单层时间轮,包含时间轮的初始化、任务添加、任务触发等核心方法。
class SingleLevelTimeWheel {
private:
int32_t slot_count; // 时间槽数量
int32_t current_slot; // 当前指针指向的槽位
uint64_t tick_interval; // 每个槽位的时间间隔,单位毫秒
uint64_t start_time; // 时间轮启动时间
std::vector<std::vector<TimerTask>> slots; // 所有槽位的任务列表
std::unordered_map<uint64_t, TimerTask> task_map; // 任务ID到任务的映射,用于删除任务
std::mutex mutex; // 互斥锁,保证线程安全
std::atomic<bool> running; // 时间轮是否运行中
std::thread tick_thread; // 时间轮转动线程
public:
// 构造函数,初始化槽位数量和时间间隔
SingleLevelTimeWheel(int32_t slot_cnt, uint64_t interval)
: slot_count(slot_cnt), current_slot(0), tick_interval(interval), running(false) {
slots.resize(slot_count);
// 获取当前时间戳,单位毫秒
start_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()
).count();
}
// 启动时间轮
void start() {
running = true;
tick_thread = std::thread(&SingleLevelTimeWheel::tick, this);
}
// 停止时间轮
void stop() {
running = false;
if (tick_thread.joinable()) {
tick_thread.join();
}
}
// 添加定时任务,delay为延迟时间,单位毫秒
uint64_t add_timer(uint64_t delay, std::function<void()> callback) {
std::lock_guard<std::mutex> lock(mutex);
uint64_t now = get_current_time();
uint64_t trigger_time = now + delay;
static uint64_t task_id_counter = 0;
uint64_t task_id = ++task_id_counter;
TimerTask task(task_id, trigger_time, callback);
// 计算任务应该存放的槽位
uint64_t diff = trigger_time - start_time;
uint64_t slot_index = (diff / tick_interval) % slot_count;
task.slot = slot_index;
slots[slot_index].push_back(task);
task_map[task_id] = task;
return task_id;
}
// 删除定时任务
void remove_timer(uint64_t task_id) {
std::lock_guard<std::mutex> lock(mutex);
auto it = task_map.find(task_id);
if (it != task_map.end()) {
int32_t slot = it->second.slot;
// 遍历对应槽位的任务,找到并删除
auto& slot_tasks = slots[slot];
for (auto iter = slot_tasks.begin(); iter != slot_tasks.end(); ++iter) {
if (iter->task_id == task_id) {
slot_tasks.erase(iter);
break;
}
}
task_map.erase(it);
}
}
private:
// 获取当前时间戳,单位毫秒
uint64_t get_current_time() {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()
).count();
}
// 时间轮转动逻辑
void tick() {
while (running) {
std::this_thread::sleep_for(std::chrono::milliseconds(tick_interval));
std::lock_guard<std::mutex> lock(mutex);
// 触发当前槽位的所有任务
auto& current_tasks = slots[current_slot];
for (auto& task : current_tasks) {
if (task.callback) {
task.callback();
}
}
current_tasks.clear();
// 指针移动到下一个槽位
current_slot = (current_slot + 1) % slot_count;
}
}
};
多层时间轮实现
多层时间轮需要扩展单层时间轮的逻辑,支持任务的层级迁移,这里实现一个两层时间轮作为示例,包含毫秒轮和秒轮。
class MultiLevelTimeWheel {
private:
// 毫秒轮,每个槽位1毫秒,共1024个槽位,覆盖1秒时间
int32_t ms_slot_count = 1024;
int32_t ms_current_slot = 0;
uint64_t ms_tick_interval = 1; // 1毫秒
// 秒轮,每个槽位1秒,共60个槽位,覆盖60秒时间
int32_t sec_slot_count = 60;
int32_t sec_current_slot = 0;
uint64_t sec_tick_interval = 1000; // 1000毫秒
std::vector<std::vector<TimerTask>> ms_slots;
std::vector<std::vector<TimerTask>> sec_slots;
std::unordered_map<uint64_t, TimerTask> task_map;
std::mutex mutex;
std::atomic<bool> running;
std::thread tick_thread;
uint64_t start_time;
public:
MultiLevelTimeWheel() : running(false) {
ms_slots.resize(ms_slot_count);
sec_slots.resize(sec_slot_count);
start_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()
).count();
}
void start() {
running = true;
tick_thread = std::thread(&MultiLevelTimeWheel::tick, this);
}
void stop() {
running = false;
if (tick_thread.joinable()) {
tick_thread.join();
}
}
uint64_t add_timer(uint64_t delay, std::function<void()> callback) {
std::lock_guard<std::mutex> lock(mutex);
uint64_t now = get_current_time();
uint64_t trigger_time = now + delay;
static uint64_t task_id_counter = 0;
uint64_t task_id = ++task_id_counter;
TimerTask task(task_id, trigger_time, callback);
uint64_t diff = trigger_time - start_time;
// 如果延迟时间在1秒内,放到毫秒轮
if (diff < ms_slot_count) {
uint64_t slot_index = diff % ms_slot_count;
task.slot = slot_index;
task.round = 0;
ms_slots[slot_index].push_back(task);
} else {
// 否则放到秒轮
uint64_t sec_diff = diff / 1000;
uint64_t slot_index = sec_diff % sec_slot_count;
task.slot = slot_index;
task.round = sec_diff / sec_slot_count;
sec_slots[slot_index].push_back(task);
}
task_map[task_id] = task;
return task_id;
}
private:
uint64_t get_current_time() {
return std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()
).count();
}
void tick() {
while (running) {
std::this_thread::sleep_for(std::chrono::milliseconds(ms_tick_interval));
std::lock_guard<std::mutex> lock(mutex);
// 处理毫秒轮当前槽位的任务
auto& ms_current_tasks = ms_slots[ms_current_slot];
for (auto& task : ms_current_tasks) {
if (task.callback) {
task.callback();
}
}
ms_current_tasks.clear();
// 毫秒轮指针移动
ms_current_slot = (ms_current_slot + 1) % ms_slot_count;
// 如果毫秒轮转完一轮,处理秒轮
if (ms_current_slot == 0) {
// 触发秒轮当前槽位的任务,将任务迁移到毫秒轮
auto& sec_current_tasks = sec_slots[sec_current_slot];
std::vector<TimerTask> new_tasks;
for (auto& task : sec_current_tasks) {
if (task.round > 0) {
// 轮次减1,重新放回秒轮
task.round--;
sec_slots[sec_current_slot].push_back(task);
} else {
// 轮次为0,迁移到毫秒轮
uint64_t diff = task.trigger_time - get_current_time();
if (diff < ms_slot_count) {
uint64_t ms_slot = diff % ms_slot_count;
task.slot = ms_slot;
ms_slots[ms_slot].push_back(task);
}
}
}
sec_current_tasks.clear();
sec_current_slot = (sec_current_slot + 1) % sec_slot_count;
}
}
}
};
时间轮性能对比与适用场景
和传统定时器实现方式相比,时间轮的优势非常明显:
| 定时器实现方式 | 添加任务复杂度 | 删除任务复杂度 | 触发任务复杂度 | 适用场景 |
|---|---|---|---|---|
| 最小堆 | O(log n) | O(log n) | O(1) | 任务数量较少的场景 |
| 红黑树 | O(log n) | O(log n) | O(log n) | 任务数量中等的场景 |
| 时间轮 | O(1) | O(1) | O(1) | 高并发、大量定时任务的场景 |
时间轮非常适合以下场景:
- 网络服务中的连接超时管理,比如TCP连接空闲超时、请求超时检测
- 消息队列中的延迟消息投递,比如订单超时关闭、延迟通知
- 周期性任务的调度,比如定时统计数据、定时清理缓存
使用注意事项
在实际使用时间轮时需要注意几个问题:
- 时间槽的数量需要根据业务场景设置,槽位过少会导致任务冲突过多,槽位过多会浪费内存
- 定时任务的回调函数应该尽量简短,避免阻塞时间轮的转动线程,如果需要执行耗时操作,可以将任务抛到独立的线程池中执行
- 多线程操作时间轮时需要做好线程安全保护,上面的实现中使用了互斥锁保证操作的安全性
- 如果需要的定时时间范围超过多层时间轮的覆盖范围,可以继续扩展更多层的时间轮,比如增加分轮、时轮等
通过上述C++实现的时间轮调度模块,可以轻松支撑百万级别的定时任务调度,有效解决高并发场景下的定时器性能瓶颈问题。开发者可以根据自身的业务需求,对时间轮的层级、槽位数量进行调整,也可以扩展任务持久化、任务取消回调等功能。