在PHP项目里,队列常用于处理耗时、非实时的异步任务,比如发送邮件、生成报表、处理图片等。当系统中同时存在不同重要程度的任务时,普通队列的先进先出逻辑会导致核心任务被无关紧要的任务阻塞,这时就需要为队列任务设置优先级,让高优先级任务优先被消费。

优先级队列的核心实现思路
要实现任务优先级,核心是在任务入队时标记优先级等级,出队时按照优先级从高到低的顺序提取任务。常见的优先级划分方式有两种:
- 数值型优先级:用数字表示优先级,通常数字越小优先级越高,比如1代表最高优先级,5代表最低优先级
- 枚举型优先级:用明确的等级名称划分,比如
high、medium、low三个等级
基于Redis的实现方案
Redis的有序集合(ZSET)天然适合实现优先级队列,它的score字段可以用来存储优先级数值,成员就是任务内容,ZSET会按照score从小到大排序,正好符合优先级数值越小越优先的逻辑。
任务入队代码
<?php
class RedisPriorityQueue {
private $redis;
private $queueKey = 'task_priority_queue';
public function __construct() {
$this->redis = new Redis();
$this->redis->connect('127.0.0.1', 6379);
}
/**
* 任务入队
* @param string $taskData 任务数据
* @param int $priority 优先级,数值越小优先级越高
* @return int
*/
public function push($taskData, $priority = 5) {
// score使用优先级数值,相同优先级会按照入队时间排序
return $this->redis->zAdd($this->queueKey, $priority, $taskData);
}
}
// 使用示例
$queue = new RedisPriorityQueue();
// 低优先级任务
$queue->push('发送普通通知邮件', 5);
// 高优先级任务
$queue->push('处理用户支付回调', 1);
$queue->push('生成月度报表', 3);
?>任务出队代码
<?php
class RedisPriorityQueue {
// 省略构造方法和push方法
/**
* 取出最高优先级的任务
* @return string|false
*/
public function pop() {
// 取出score最小(优先级最高)的一个成员,并移除该成员
$tasks = $this->redis->zRange($this->queueKey, 0, 0, true);
if (empty($tasks)) {
return false;
}
$taskData = key($tasks);
$this->redis->zRem($this->queueKey, $taskData);
return $taskData;
}
}
// 消费端使用示例
$queue = new RedisPriorityQueue();
while (true) {
$task = $queue->pop();
if ($task === false) {
sleep(1);
continue;
}
// 处理任务逻辑
echo "处理任务:{$task}" . PHP_EOL;
// 模拟任务处理耗时
sleep(2);
}
?>基于数据库的实现方案
如果项目没有引入Redis,也可以使用数据库表来存储队列任务,通过查询时排序实现优先级调度。
任务表结构设计
| 字段名 | 类型 | 说明 |
|---|---|---|
| id | int | 主键自增 |
| task_data | text | 任务内容 |
| priority | int | 优先级,数值越小优先级越高 |
| status | tinyint | 任务状态:0待处理,1处理中,2已完成 |
| create_time | int | 创建时间戳 |
入队和出队代码
<?php
class DbPriorityQueue {
private $pdo;
public function __construct() {
$this->pdo = new PDO('mysql:host=127.0.0.1;dbname=test;charset=utf8', 'root', 'root');
}
/**
* 任务入队
* @param string $taskData
* @param int $priority
* @return bool
*/
public function push($taskData, $priority = 5) {
$sql = "INSERT INTO task_queue (task_data, priority, status, create_time) VALUES (?, ?, 0, ?)";
$stmt = $this->pdo->prepare($sql);
return $stmt->execute([$taskData, $priority, time()]);
}
/**
* 取出最高优先级的任务
* @return array|false
*/
public function pop() {
$this->pdo->beginTransaction();
try {
// 先锁住符合要求的任务,避免并发重复消费
$sql = "SELECT id, task_data FROM task_queue WHERE status = 0 ORDER BY priority ASC, create_time ASC LIMIT 1 FOR UPDATE";
$stmt = $this->pdo->prepare($sql);
$stmt->execute();
$task = $stmt->fetch(PDO::FETCH_ASSOC);
if (!$task) {
$this->pdo->rollBack();
return false;
}
// 更新任务状态为处理中
$updateSql = "UPDATE task_queue SET status = 1 WHERE id = ?";
$updateStmt = $this->pdo->prepare($updateSql);
$updateStmt->execute([$task['id']]);
$this->pdo->commit();
return $task;
} catch (Exception $e) {
$this->pdo->rollBack();
return false;
}
}
}
?>常见调度策略对比
除了基本的优先级优先调度,还可以结合业务需求选择不同的调度策略:
- 严格优先级调度:严格按照优先级顺序执行,只有高优先级任务全部处理完才会处理低优先级任务,适合核心任务绝对优先的场景
- 带权重的优先级调度:给不同优先级分配处理权重的占比,比如高优先级占60%的处理资源,中优先级占30%,低优先级占10%,避免低优先级任务长期饥饿
- 优先级老化调度:低优先级任务等待时间越长,优先级自动提升,防止低优先级任务永远无法被执行
注意事项
在实际使用优先级队列时,需要注意几个问题:
1. 优先级等级不要划分过多,通常3到5个等级足够,过多的等级会增加调度复杂度,也不利于维护
2. 消费端需要做好异常处理,任务处理失败后可以根据情况重新入队,避免重要任务丢失
3. 如果使用Redis方案,要注意Redis的持久化配置,避免服务重启后队列数据丢失
通过上述方案,开发者可以根据项目的实际技术栈和需求,选择合适的PHP队列优先级实现方式,让任务调度更符合业务场景的要求。