在Golang的并发编程场景中,经常会遇到不同优先级任务需要同时处理的情况,比如系统中有紧急的配置更新任务和普通的数据统计任务,需要保证高优先级任务优先被执行,同时兼顾并发处理的效率。实现这个需求的核心是优先级队列的设计以及合理的并发调度逻辑。

优先级队列的设计思路
优先级队列是一种特殊的队列,队列中的元素会按照优先级高低进行排序,出队时优先取出优先级最高的元素。在Golang中,我们可以基于container/heap包实现最小堆结构的优先级队列,通过自定义元素的优先级比较规则,让优先级高的元素(比如优先级数值小的元素)先出队。
首先需要定义任务的结构体,包含任务内容、优先级、任务ID等必要字段:
package main
import (
"container/heap"
"fmt"
"sync"
"time"
)
// 任务结构体
type Task struct {
Id int // 任务ID
Priority int // 优先级,数值越小优先级越高
Content string // 任务内容
}
// 优先级队列,基于Task切片实现
type PriorityQueue []*Task
// 实现heap.Interface的Len方法
func (pq PriorityQueue) Len() int {
return len(pq)
}
// 实现heap.Interface的Less方法,定义优先级比较规则
func (pq PriorityQueue) Less(i, j int) bool {
// 优先级数值越小,优先级越高
return pq[i].Priority < pq[j].Priority
}
// 实现heap.Interface的Swap方法
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}
// 实现heap.Interface的Push方法
func (pq *PriorityQueue) Push(x interface{}) {
*pq = append(*pq, x.(*Task))
}
// 实现heap.Interface的Pop方法
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
task := old[n-1]
*pq = old[0 : n-1]
return task
}
并发调度逻辑实现
有了优先级队列之后,需要结合goroutine和channel实现并发的任务处理。我们可以设计一个调度器,包含任务入队通道、结果返回通道、优先级队列实例以及并发控制相关的锁和等待组。
调度器结构体定义
调度器需要管理优先级队列的并发安全访问,同时控制处理任务的goroutine数量:
// 任务调度器
type TaskScheduler struct {
taskQueue PriorityQueue // 优先级队列
taskChan chan *Task // 任务入队通道
resultChan chan string // 结果返回通道
workerNum int // 工作goroutine数量
wg sync.WaitGroup // 等待组,用于等待所有任务处理完成
mu sync.Mutex // 互斥锁,保证优先级队列并发安全
}
// 初始化调度器
func NewTaskScheduler(workerNum int, queueCapacity int) *TaskScheduler {
scheduler := &TaskScheduler{
taskQueue: make(PriorityQueue, 0, queueCapacity),
taskChan: make(chan *Task, 100),
resultChan: make(chan string, 100),
workerNum: workerNum,
}
// 初始化堆
heap.Init(&scheduler.taskQueue)
return scheduler
}
任务入队与调度逻辑
调度器需要启动一个单独的goroutine监听任务入队通道,将接收到的任务放入优先级队列,同时唤醒工作goroutine处理队列中的任务:
// 启动调度器
func (s *TaskScheduler) Start() {
// 启动任务接收goroutine,处理任务入队
go func() {
for task := range s.taskChan {
s.mu.Lock()
heap.Push(&s.taskQueue, task)
s.mu.Unlock()
fmt.Printf("任务%d入队,优先级:%dn", task.Id, task.Priority)
}
}()
// 启动工作goroutine
for i := 0; i < s.workerNum; i++ {
s.wg.Add(1)
go func(workerId int) {
defer s.wg.Done()
for {
s.mu.Lock()
// 如果队列为空,释放锁等待新任务
if s.taskQueue.Len() == 0 {
s.mu.Unlock()
time.Sleep(100 * time.Millisecond)
continue
}
// 取出优先级最高的任务
task := heap.Pop(&s.taskQueue).(*Task)
s.mu.Unlock()
// 处理任务
fmt.Printf("工作goroutine%d开始处理任务%d,优先级:%d,内容:%sn", workerId, task.Id, task.Priority, task.Content)
// 模拟任务处理耗时
time.Sleep(time.Duration(task.Priority+1) * 200 * time.Millisecond)
result := fmt.Sprintf("任务%d处理完成,优先级:%d", task.Id, task.Priority)
s.resultChan <- result
}
}(i)
}
}
提交任务与获取结果
提供提交任务和获取处理结果的方法,方便外部调用:
// 提交任务
func (s *TaskScheduler) SubmitTask(task *Task) {
s.taskChan <- task
}
// 获取处理结果
func (s *TaskScheduler) GetResult() string {
return <-s.resultChan
}
// 关闭调度器
func (s *TaskScheduler) Stop() {
close(s.taskChan)
s.wg.Wait()
close(s.resultChan)
}
完整测试示例
下面通过一个完整的测试代码验证优先级任务并发处理的效果,提交不同优先级的任务,观察处理顺序是否符合预期:
func main() {
// 创建调度器,设置2个工作goroutine,队列初始容量20
scheduler := NewTaskScheduler(2, 20)
// 启动调度器
scheduler.Start()
// 提交不同优先级的任务,优先级数值越小优先级越高
tasks := []*Task{
{Id: 1, Priority: 3, Content: "普通数据统计任务"},
{Id: 2, Priority: 1, Content: "紧急配置更新任务"},
{Id: 3, Priority: 2, Content: "日志归档任务"},
{Id: 4, Priority: 1, Content: "紧急接口熔断任务"},
{Id: 5, Priority: 4, Content: "历史数据清理任务"},
}
for _, task := range tasks {
scheduler.SubmitTask(task)
}
// 获取所有任务的处理结果
for i := 0; i < len(tasks); i++ {
result := scheduler.GetResult()
fmt.Println(result)
}
// 关闭调度器
scheduler.Stop()
}
运行上述代码后可以看到,优先级为1的任务会被优先处理,其次是优先级2、3、4的任务,符合优先级调度的预期。如果工作goroutine数量足够,高优先级任务会在入队后尽快被处理,不用等待低优先级任务完成。这种实现方式既利用了Golang的并发特性,又通过优先级队列保证了任务的处理顺序,适合各类需要优先级调度的并发场景。