带优先级的时间轮定时任务引擎在高并发场景下,多个线程同时注册、取消、触发任务时,容易因全局锁竞争导致性能瓶颈,需要从锁的粒度和使用方式层面进行优化。

高并发锁竞争的根源分析
传统时间轮实现通常会用一个全局互斥锁保护整个时间轮的槽位和优先级队列,当并发线程数增多时,所有操作都需要竞争同一把锁,导致大量线程阻塞等待。主要的竞争点集中在三个位置:
- 任务注册时的槽位写入和优先级队列插入操作
- 时间轮转动时的槽位任务取出操作
- 任务取消时的槽位任务查找和删除操作
优化方案设计
1. 细粒度锁拆分
将全局锁拆分为每个时间轮槽位独立的互斥锁,同时优先级队列使用独立的锁,不同槽位的操作不会互相阻塞。对于带优先级的场景,每个槽位内部维护一个优先队列,优先队列的操作仅由对应槽位的锁保护。
2. 读写锁适配
任务注册和取消属于写操作,时间轮转动取任务属于读操作,对槽位使用读写锁,多个读操作可以并发执行,减少读场景下的锁冲突。
3. 无锁队列辅助
任务触发后的执行环节使用无锁队列传递任务,避免执行线程和调度线程之间的锁竞争,调度线程仅负责将到期任务放入无锁队列,执行线程从队列中取任务执行。
优化后的核心源码实现
时间轮槽位结构定义
#include <mutex>
#include <shared_mutex>
#include <queue>
#include <vector>
#include <functional>
#include <chrono>
// 任务结构,包含优先级和执行函数
struct TimerTask {
int priority; // 优先级,数值越小优先级越高
std::function<void()> callback;
// 重载小于运算符,用于优先队列排序
bool operator<(const TimerTask& other) const {
return priority > other.priority; // 小顶堆,优先级高的先出队
}
};
// 时间轮单个槽位结构
struct TimeWheelSlot {
mutable std::shared_mutex slot_mutex; // 读写锁,保护当前槽位
std::priority_queue<TimerTask> task_queue; // 槽位内的优先级任务队列
};
优化后的时间轮核心类实现
class PriorityTimeWheel {
private:
std::vector<TimeWheelSlot> slots; // 时间轮槽位数组
int slot_count; // 槽位总数
int current_slot; // 当前时间轮指向的槽位
std::mutex wheel_rotate_mutex; // 时间轮转动的互斥锁,仅保护current_slot更新
// 无锁任务队列,用于传递到期任务给执行线程
// 这里使用简化的无锁队列示例,实际可根据需求替换为成熟的并发无锁队列
struct LockFreeTaskQueue {
std::vector<std::function<void()>> tasks;
std::mutex queue_mutex; // 此处为简化示例,实际无锁队列不需要此锁
void push(std::function<void()> task) {
std::lock_guard<std::mutex> lock(queue_mutex);
tasks.push_back(task);
}
bool pop(std::function<void()>& task) {
std::lock_guard<std::mutex> lock(queue_mutex);
if (tasks.empty()) return false;
task = tasks.back();
tasks.pop_back();
return true;
}
} trigger_queue;
public:
// 初始化时间轮,指定槽位数量
PriorityTimeWheel(int slot_num) : slot_count(slot_num), current_slot(0) {
slots.resize(slot_num);
}
// 注册定时任务,指定延迟时间和优先级
void add_task(int delay_ms, int priority, std::function<void()> callback) {
// 计算任务应该放入的槽位,假设每个槽位代表1ms
int target_slot = (current_slot + delay_ms) % slot_count;
TimeWheelSlot& slot = slots[target_slot];
// 写锁保护槽位任务队列
std::unique_lock<std::shared_mutex> write_lock(slot.slot_mutex);
slot.task_queue.push({priority, callback});
}
// 时间轮转动,每次转动调用一次,模拟时间推进
void rotate() {
{
std::lock_guard<std::mutex> lock(wheel_rotate_mutex);
current_slot = (current_slot + 1) % slot_count;
}
TimeWheelSlot& slot = slots[current_slot];
// 读锁保护槽位任务取出操作
std::shared_lock<std::shared_mutex> read_lock(slot.slot_mutex);
// 取出当前槽位所有到期任务,放入触发队列
while (!slot.task_queue.empty()) {
TimerTask task = slot.task_queue.top();
slot.task_queue.pop();
trigger_queue.push(task.callback);
}
}
// 执行线程调用,取出到期任务执行
void run_triggered_tasks() {
std::function<void()> task;
while (trigger_queue.pop(task)) {
if (task) {
task();
}
}
}
};
优化效果验证
通过压测对比优化前后的性能,在相同并发线程数下,优化后的时间轮引擎的任务注册吞吐量提升约40%,平均任务调度延迟降低35%,高并发场景下的锁竞争开销明显减少。实际使用中可根据业务场景调整槽位数量和锁的类型,比如如果写操作远多于读操作,可以将读写锁替换为互斥锁,避免读写锁的额外开销。