在并发编程场景中,定时任务调度是非常常见的需求,比如定时清理过期连接、定时执行重试任务等。当系统中存在大量定时任务时,普通的定时器实现方式往往会出现性能下降的问题,带优先级的时间轮定时器是解决这类问题的有效方案。

核心设计思路
带优先级的时间轮定时器主要由三个核心部分组成:分层时间轮、优先级队列、任务执行线程。分层时间轮用来管理不同时间粒度的定时任务,优先级队列用来存储同一时间槽内的不同优先级任务,任务执行线程负责取出到期任务并执行。
分层时间轮结构
时间轮采用分层设计,类似时钟的时、分、秒结构,每一层对应不同的时间精度。最底层的时间轮精度最高,比如1毫秒一个槽,上层时间轮的槽数更少,时间跨度更大。当底层时间轮转动一圈后,会触发上层时间轮的指针移动,将上层时间轮对应槽内的任务下沉到下层时间轮中。
优先级任务管理
每个时间槽对应一个优先级队列,队列中的元素是待执行的定时任务,任务按照优先级从高到低排序。当时间轮指针指向某个槽时,就取出该槽对应优先级队列中的所有到期任务,按照优先级顺序执行。
核心数据结构定义
首先定义定时任务的结构体,包含任务的执行时间、优先级、回调函数等核心信息。
#include <iostream>
#include <vector>
#include <queue>
#include <functional>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
// 定时任务结构体
struct TimerTask {
uint64_t execute_time; // 任务执行时间戳,单位毫秒
int priority; // 任务优先级,数值越大优先级越高
std::function<void()> callback; // 任务回调函数
// 优先级比较函数,优先级高的排在队列前面
bool operator<(const TimerTask& other) const {
if (priority != other.priority) {
return priority < other.priority;
}
// 优先级相同的情况下,先执行的任务排在前面
return execute_time > other.execute_time;
}
};
接下来定义分层时间轮的核心结构,这里实现两层时间轮,第一层精度为1毫秒,共1024个槽,第二层精度为1024毫秒,共64个槽。
// 时间轮类
class TimeWheel {
private:
static const int WHEEL_SLOT_NUM = 1024; // 每层时间轮的槽数
static const int WHEEL_LEVEL_NUM = 2; // 时间轮层数
// 每层时间轮的结构,每个槽对应一个优先级队列
using TaskQueue = std::priority_queue<TimerTask>;
std::vector<std::vector<TaskQueue>> wheels; // 第一维是层数,第二维是槽索引
int current_slots[WHEEL_LEVEL_NUM]; // 每层时间轮当前指针位置
uint64_t start_time; // 时间轮启动的时间戳
std::mutex wheel_mutex; // 保护时间轮数据的互斥锁
std::condition_variable cv; // 用于任务执行线程等待的条件变量
bool is_running; // 时间轮是否运行中
std::thread work_thread; // 任务执行线程
public:
TimeWheel() : is_running(false) {
// 初始化两层时间轮
wheels.resize(WHEEL_LEVEL_NUM);
for (int i = 0; i < WHEEL_LEVEL_NUM; i++) {
wheels[i].resize(WHEEL_SLOT_NUM);
current_slots[i] = 0;
}
// 记录启动时间
start_time = get_current_ms();
}
~TimeWheel() {
stop();
}
// 获取当前时间戳,单位毫秒
static uint64_t get_current_ms() {
auto now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
return std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
}
};
核心功能实现
添加定时任务
添加任务时需要计算任务应该放入哪一层时间轮的哪个槽中,如果任务的执行时间超过当前时间轮的最大覆盖范围,则放入上层时间轮。
// 添加定时任务,delay_ms是延迟执行的毫秒数
void add_task(int delay_ms, int priority, std::function<void()> callback) {
uint64_t execute_time = get_current_ms() + delay_ms;
TimerTask task{execute_time, priority, callback};
std::lock_guard<std::mutex> lock(wheel_mutex);
// 计算任务相对于时间轮启动时间的偏移
uint64_t offset = execute_time - start_time;
// 如果偏移为0,直接放入第一层第0个槽
if (offset == 0) {
wheels[0][0].push(task);
cv.notify_one();
return;
}
// 逐层计算槽位置
for (int level = 0; level < WHEEL_LEVEL_NUM; level++) {
int slot_index = (current_slots[level] + offset) % WHEEL_SLOT_NUM;
wheels[level][slot_index].push(task);
// 上层时间轮一个槽的时间跨度是下层时间轮的总时间跨度
offset = offset / WHEEL_SLOT_NUM;
if (offset == 0) {
break;
}
}
cv.notify_one();
}
时间轮转动与任务执行
时间轮需要有一个线程不断驱动转动,检查到期的任务并执行。转动时先更新最底层时间轮的指针,取出对应槽内的所有任务,执行到期任务,同时处理上层时间轮的任务下沉逻辑。
// 启动时间轮
void start() {
if (is_running) {
return;
}
is_running = true;
work_thread = std::thread(&TimeWheel::run, this);
}
// 停止时间轮
void stop() {
if (!is_running) {
return;
}
is_running = false;
cv.notify_one();
if (work_thread.joinable()) {
work_thread.join();
}
}
private:
// 时间轮运行逻辑
void run() {
while (is_running) {
uint64_t now = get_current_ms();
uint64_t current_offset = now - start_time;
// 计算当前应该转动到的时间偏移对应的槽
int target_slot = current_offset % WHEEL_SLOT_NUM;
// 如果指针还没到目标槽,等待一段时间
if (current_slots[0] != target_slot) {
// 取出当前槽的所有任务
std::vector<TimerTask> tasks_to_execute;
{
std::lock_guard<std::mutex> lock(wheel_mutex);
// 将优先级队列中的任务取出放到临时容器中
TaskQueue& queue = wheels[0][current_slots[0]];
while (!queue.empty()) {
TimerTask task = queue.top();
queue.pop();
if (task.execute_time <= now) {
tasks_to_execute.push_back(task);
} else {
// 还没到执行时间,重新放回队列
queue.push(task);
break;
}
}
}
// 执行到期任务
for (auto& task : tasks_to_execute) {
try {
task.callback();
} catch (...) {
std::cerr << "执行定时任务出现异常" << std::endl;
}
}
// 更新第一层时间轮指针
current_slots[0] = (current_slots[0] + 1) % WHEEL_SLOT_NUM;
// 如果第一层指针回到0,触发上层时间轮转动
if (current_slots[0] == 0) {
cascade(1);
}
} else {
// 还没到下一个槽的时间,等待1毫秒
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
}
// 上层时间轮级联转动
void cascade(int level) {
if (level >= WHEEL_LEVEL_NUM) {
return;
}
current_slots[level] = (current_slots[level] + 1) % WHEEL_SLOT_NUM;
// 取出上层时间轮当前槽的任务,下沉到下层时间轮
std::lock_guard<std::mutex> lock(wheel_mutex);
TaskQueue& queue = wheels[level][current_slots[level]];
std::vector<TimerTask> temp_tasks;
while (!queue.empty()) {
temp_tasks.push_back(queue.top());
queue.pop();
}
for (auto& task : temp_tasks) {
uint64_t offset = task.execute_time - start_time;
int slot_index = offset % WHEEL_SLOT_NUM;
wheels[0][slot_index].push(task);
}
// 如果上层时间轮指针回到0,继续触发更上层时间轮转动
if (current_slots[level] == 0) {
cascade(level + 1);
}
}
};
使用示例
下面是使用该时间轮定时器的示例代码,创建三个不同优先级和延迟时间的任务,观察执行顺序。
int main() {
TimeWheel time_wheel;
time_wheel.start();
// 添加三个定时任务
time_wheel.add_task(100, 1, []() {
std::cout << "执行低优先级任务,延迟100毫秒" << std::endl;
});
time_wheel.add_task(100, 3, []() {
std::cout << "执行高优先级任务,延迟100毫秒" << std::endl;
});
time_wheel.add_task(200, 2, []() {
std::cout << "执行中优先级任务,延迟200毫秒" << std::endl;
});
// 等待任务执行完成
std::this_thread::sleep_for(std::chrono::seconds(1));
time_wheel.stop();
return 0;
}
并发场景调优建议
在并发场景下使用该定时器时,可以从以下几个方面进行调优:
- 调整时间轮的层数和每层的槽数,根据业务中定时任务的时间分布选择合适的参数,减少任务下沉的次数。
- 任务回调函数尽量保证执行时间短,如果任务执行耗时较长,可以将任务投递到独立的线程池中执行,避免阻塞时间轮的转动。
- 如果定时任务数量非常多,可以考虑将优先级队列替换为更高效的并发优先级队列,减少锁的竞争。
- 对于重复执行的定时任务,可以在任务执行完成后重新计算下一次执行时间,再次添加到时间轮中,避免频繁创建新的任务对象。
该实现通过分层时间轮减少了遍历所有定时任务的开销,结合优先级队列保证了高优先级任务优先执行,在大量定时任务的并发场景下相比传统的排序链表定时器有更好的性能表现。