Go语言原生支持并发特性,Goroutine作为轻量级线程被广泛用于并发任务处理,但无限制创建Goroutine会占用大量内存资源,甚至引发程序崩溃。Goroutine工作池通过预先创建固定数量的Goroutine来处理任务队列,既能控制并发规模,又能复用Goroutine资源,是Go并发编程中常用的优化方案。

工作池核心设计思路
一个基础的Goroutine工作池通常包含三个核心部分:任务队列、工作Goroutine集合、结果收集机制。任务队列用于存放待处理的任务,工作Goroutine从队列中取出任务执行,执行完成后将结果传递到结果通道,同时需要控制所有任务完成后的流程关闭,避免资源泄漏。
核心组件说明
- 任务通道:用于传递待执行的任务,通常定义为函数类型的通道,方便传递不同的任务逻辑
- 结果通道:用于收集任务执行后的返回结果,根据实际需求可以选择是否使用
- WaitGroup:用于等待所有工作Goroutine完成任务,确保主流程不会提前退出
- 关闭信号:用于通知工作Goroutine退出循环,避免Goroutine空转占用资源
基础工作池实现示例
下面实现一个支持自定义任务、固定工作Goroutine数量的基础工作池,包含任务分发、结果收集、优雅关闭的完整逻辑。
任务与结果定义
首先定义任务函数和结果的结构,方便统一处理不同类型的任务:
package main
import (
"fmt"
"sync"
)
// 定义任务类型,接受整型参数,返回整型和错误
type Task func(int) (int, error)
// 定义结果结构,包含任务参数、返回结果和错误信息
type Result struct {
TaskParam int
Value int
Err error
}
工作池结构体实现
定义工作池的结构体,包含核心的通道和配置参数:
// WorkerPool 工作池结构体
type WorkerPool struct {
taskChan chan Task // 任务通道
resultChan chan Result // 结果通道
workerNum int // 工作Goroutine数量
wg sync.WaitGroup
stopChan chan struct{} // 停止信号通道
}
工作池初始化与启动
实现工作池的初始化方法,创建对应数量的Goroutine并启动:
// NewWorkerPool 创建工作池实例
func NewWorkerPool(workerNum int, taskBuffer int, resultBuffer int) *WorkerPool {
return &WorkerPool{
taskChan: make(chan Task, taskBuffer),
resultChan: make(chan Result, resultBuffer),
workerNum: workerNum,
stopChan: make(chan struct{}),
}
}
// Start 启动工作池,创建指定数量的工作Goroutine
func (p *WorkerPool) Start() {
for i := 0; i < p.workerNum; i++ {
p.wg.Add(1)
go p.worker(i)
}
}
// worker 工作Goroutine逻辑,循环从任务通道取任务执行
func (p *WorkerPool) worker(workerID int) {
defer p.wg.Done()
for {
select {
case task, ok := <-p.taskChan:
if !ok {
// 任务通道关闭,退出循环
fmt.Printf("worker %d exit, task channel closedn", workerID)
return
}
// 执行任务,这里默认任务参数为workerID,实际可以根据需求调整
val, err := task(workerID)
// 将结果发送到结果通道
p.resultChan <- Result{
TaskParam: workerID,
Value: val,
Err: err,
}
case <-p.stopChan:
// 收到停止信号,退出循环
fmt.Printf("worker %d exit, receive stop signaln", workerID)
return
}
}
}
任务分发与结果收集
实现添加任务和收集结果的方法:
// AddTask 向工作池添加任务
func (p *WorkerPool) AddTask(task Task) {
p.taskChan <- task
}
// GetResult 从结果通道获取结果
func (p *WorkerPool) GetResult() <-chan Result {
return p.resultChan
}
// Close 关闭工作池,先关闭任务通道,等待所有worker完成后再关闭结果通道
func (p *WorkerPool) Close() {
close(p.taskChan)
// 等待所有worker完成
p.wg.Wait()
close(p.resultChan)
}
完整调用示例
下面演示如何使用上述工作池处理一批计算任务:
func main() {
// 创建包含3个工作Goroutine的工作池,任务通道缓冲10,结果通道缓冲10
pool := NewWorkerPool(3, 10, 10)
// 启动工作池
pool.Start()
// 定义任务:计算参数的平方
task := func(param int) (int, error) {
return param * param, nil
}
// 添加5个任务到工作池
for i := 1; i <= 5; i++ {
curr := i
pool.AddTask(func(int) (int, error) {
return curr * curr, nil
})
}
// 启动协程收集结果
go func() {
for res := range pool.GetResult() {
if res.Err != nil {
fmt.Printf("task param %d failed, err: %vn", res.TaskParam, res.Err)
} else {
fmt.Printf("task param %d result: %dn", res.TaskParam, res.Value)
}
}
}()
// 关闭工作池
pool.Close()
}
工作池优化方向
上述基础工作池可以满足简单的并发任务需求,实际业务场景中可以根据需求做进一步优化:
动态扩容缩容
基础工作池的工作Goroutine数量是固定的,如果任务量波动较大,可以结合任务队列的长度动态调整工作Goroutine数量,任务多时扩容,任务少时缩容,提升资源利用率。
错误重试机制
如果任务执行失败,可以在工作Goroutine中加入重试逻辑,设置最大重试次数,避免临时故障导致任务失败。
超时控制
为单个任务或者整个工作池设置超时时间,避免某个任务长时间阻塞导致整个工作池卡住,可以结合context包实现超时控制逻辑。
任务优先级
如果任务有优先级区分,可以使用多个优先级队列替换单个任务通道,高优先级任务优先被Goroutine处理。
注意事项
- 任务通道和结果通道的缓冲大小需要根据实际任务量设置,避免通道满导致任务添加阻塞
- 关闭工作池时需要先关闭任务通道,等待所有Goroutine处理完剩余任务后再关闭结果通道,避免向已关闭的通道发送数据引发panic
- 如果任务中可能出现panic,需要在worker中加入recover逻辑,避免单个任务的panic导致整个Goroutine退出
- 工作Goroutine数量不是越多越好,需要根据CPU核心数和任务类型(CPU密集型/IO密集型)合理设置,IO密集型任务可以适当调大Goroutine数量
Goroutine工作池Go并发channelsync_WaitGroup修改时间:2026-06-14 08:30:16