在Golang中实现任务队列系统,核心是利用goroutine的轻量级并发特性和channel的线程安全通信能力,搭建一套可以异步处理任务、控制并发数量、支持任务重试的调度体系,适配项目中各类耗时操作的后台处理需求。
任务队列核心设计思路
一个基础的任务队列系统需要包含几个核心模块:任务定义模块、任务队列存储模块、worker工作池模块、调度控制模块。其中任务定义用来规范每个异步任务的结构,队列存储用来临时存放待处理的任务,worker池负责消费队列中的任务,调度模块负责将新任务放入队列并协调worker的工作状态。
任务结构定义
首先我们需要定义一个通用的任务结构体,包含任务ID、任务执行函数、任务参数、重试次数等基础字段,这样不同类型的任务都可以统一放到队列中处理。
package main
import (
"context"
"fmt"
"time"
)
// Task 定义任务基础结构
type Task struct {
ID string // 任务唯一ID
Handler func(ctx context.Context, params map[string]interface{}) error // 任务执行函数
Params map[string]interface{} // 任务参数
Retry int // 剩余重试次数
MaxRetry int // 最大重试次数
}
任务队列与worker池实现
我们可以使用带缓冲的channel作为任务队列的存储载体,同时创建固定数量的goroutine作为worker,持续从channel中读取任务并执行。为了避免无限制创建goroutine导致系统资源耗尽,worker的数量需要可配置。
// TaskQueue 任务队列结构体
type TaskQueue struct {
taskChan chan *Task // 任务通道,作为队列存储
workerNum int // worker数量
ctx context.Context
cancel context.CancelFunc
}
// NewTaskQueue 创建任务队列实例
func NewTaskQueue(workerNum int, queueSize int) *TaskQueue {
ctx, cancel := context.WithCancel(context.Background())
return &TaskQueue{
taskChan: make(chan *Task, queueSize),
workerNum: workerNum,
ctx: ctx,
cancel: cancel,
}
}
// Start 启动任务队列,创建worker池
func (q *TaskQueue) Start() {
for i := 0; i < q.workerNum; i++ {
go q.worker(i)
}
}
// worker 单个worker的处理逻辑
func (q *TaskQueue) worker(workerID int) {
for {
select {
case <-q.ctx.Done():
fmt.Printf("worker %d 停止工作n", workerID)
return
case task, ok := <-q.taskChan:
if !ok {
fmt.Printf("worker %d 任务通道已关闭,停止工作n", workerID)
return
}
q.executeTask(workerID, task)
}
}
}
// executeTask 执行单个任务,包含重试逻辑
func (q *TaskQueue) executeTask(workerID int, task *Task) {
fmt.Printf("worker %d 开始执行任务 %sn", workerID, task.ID)
err := task.Handler(q.ctx, task.Params)
if err != nil {
fmt.Printf("worker %d 执行任务 %s 失败:%vn", workerID, task.ID, err)
// 如果还有重试次数,将任务重新放回队列
if task.Retry > 0 {
task.Retry--
fmt.Printf("任务 %s 剩余重试次数:%d,重新入队n", task.ID, task.Retry)
go func(t *Task) {
time.Sleep(time.Second * 2) // 重试前等待2秒
q.taskChan <- t
}(task)
}
return
}
fmt.Printf("worker %d 执行任务 %s 成功n", workerID, task.ID)
}
任务提交与队列关闭
任务队列需要提供提交任务的方法,同时还要支持优雅关闭,等待所有正在执行的任务完成后再退出,避免任务丢失。
// Submit 提交新任务到队列
func (q *TaskQueue) Submit(task *Task) error {
select {
case <-q.ctx.Done():
return fmt.Errorf("任务队列已关闭,无法提交任务 %s", task.ID)
case q.taskChan <- task:
fmt.Printf("任务 %s 已提交到队列n", task.ID)
return nil
}
}
// Stop 优雅关闭任务队列
func (q *TaskQueue) Stop() {
q.cancel() // 通知所有worker停止工作
close(q.taskChan) // 关闭任务通道,不再接收新任务
}
实际使用示例
我们可以定义一个模拟的耗时任务,比如发送邮件或者处理文件,然后提交到任务队列中,测试整个系统的运行效果。
// 模拟发送邮件的任务处理函数
func sendEmailTask(ctx context.Context, params map[string]interface{}) error {
email, ok := params["email"].(string)
if !ok {
return fmt.Errorf("邮件地址参数错误")
}
// 模拟耗时操作
time.Sleep(time.Second * 1)
fmt.Printf("向邮箱 %s 发送邮件成功n", email)
return nil
}
func main() {
// 创建任务队列,设置3个worker,队列容量为10
queue := NewTaskQueue(3, 10)
// 启动队列
queue.Start()
// 提交5个测试任务
for i := 0; i < 5; i++ {
taskID := fmt.Sprintf("task_%d", i)
task := &Task{
ID: taskID,
Handler: sendEmailTask,
Params: map[string]interface{}{"email": fmt.Sprintf("user%d@ipipp.com", i)},
Retry: 2,
MaxRetry: 2,
}
_ = queue.Submit(task)
}
// 等待任务执行完成
time.Sleep(time.Second * 5)
// 关闭队列
queue.Stop()
}
注意事项
- 任务队列的容量需要根据实际业务场景设置,过小的缓冲容量会导致提交任务时阻塞,过大的容量可能会占用过多内存。
- worker数量不是越多越好,需要根据CPU核心数和任务类型调整,IO密集型任务可以适当增加worker数量,CPU密集型任务建议和CPU核心数持平。
- 如果任务需要持久化存储,避免服务重启后任务丢失,可以将channel替换为Redis、数据库等外部存储,只需要在提交和获取任务的逻辑中对接存储即可。
- 任务执行函数的ctx参数可以用来传递上下文信息,比如超时控制、链路追踪ID等,方便后续问题排查。