生产者消费者模型通过解耦生产任务的模块和消费任务的模块,提升系统的并发处理能力和稳定性,在Golang中可以利用channel的阻塞特性和goroutine的轻量特性高效实现该模型,同时协调任务队列的流转。

基础实现思路
核心逻辑分为三个部分:任务定义、生产者逻辑、消费者逻辑。生产者负责生成任务并发送到channel,消费者从channel读取任务并处理,channel作为任务队列的中间载体协调两者的通信。
任务定义
首先定义需要处理的任务结构,这里以简单的整数任务为例:
package main
import "fmt"
// 定义任务结构
type Task struct {
ID int // 任务ID
}
生产者实现
生产者会循环生成任务,并将任务发送到任务队列channel中,生成完成后关闭channel通知消费者没有新任务:
// 生产者函数,参数分别为任务队列通道、需要生成的任务数量
func producer(taskChan chan<- Task, taskNum int) {
for i := 1; i <= taskNum; i++ {
task := Task{ID: i}
fmt.Printf("生产者生成任务: %dn", i)
taskChan <- task // 将任务发送到通道
}
close(taskChan) // 关闭通道,标识任务发送完毕
fmt.Println("生产者完成任务发送,关闭通道")
}
消费者实现
消费者从任务队列channel中读取任务,处理完成后输出结果,当channel关闭且没有剩余任务时自动退出:
// 消费者函数,参数分别为任务队列通道、消费者ID
func consumer(taskChan <-chan Task, consumerID int) {
for task := range taskChan { // 循环读取通道中的任务,通道关闭后自动退出循环
fmt.Printf("消费者%d处理任务: %dn", consumerID, task.ID)
}
fmt.Printf("消费者%d处理完所有任务,退出n", consumerID)
}
主函数调用
在主函数中创建任务队列channel,启动生产者和消费者goroutine,等待所有任务处理完成:
func main() {
taskChan := make(chan Task, 10) // 创建带缓冲的任务队列,缓冲大小为10
// 启动1个生产者
go producer(taskChan, 20)
// 启动2个消费者
go consumer(taskChan, 1)
go consumer(taskChan, 2)
// 阻塞主goroutine,等待所有消费者处理完任务
// 这里简单使用select阻塞,实际场景可以用sync.WaitGroup
select {}
}
多生产者多消费者扩展
实际场景中可能存在多个生产者和多个消费者,此时需要注意所有生产者完成任务后统一关闭channel,避免消费者读取到已关闭的通道导致panic。可以使用sync.WaitGroup等待所有生产者完成:
package main
import (
"fmt"
"sync"
)
type Task struct {
ID int
}
func producer(taskChan chan<- Task, wg *sync.WaitGroup, producerID int, taskNum int) {
defer wg.Done()
for i := 1; i <= taskNum; i++ {
task := Task{ID: i + producerID*100} // 不同生产者的任务ID做区分
fmt.Printf("生产者%d生成任务: %dn", producerID, task.ID)
taskChan <- task
}
fmt.Printf("生产者%d完成任务发送n", producerID)
}
func consumer(taskChan <-chan Task, consumerID int, wg *sync.WaitGroup) {
defer wg.Done()
for task := range taskChan {
fmt.Printf("消费者%d处理任务: %dn", consumerID, task.ID)
}
fmt.Printf("消费者%d处理完所有任务,退出n", consumerID)
}
func main() {
taskChan := make(chan Task, 20)
var producerWg, consumerWg sync.WaitGroup
// 启动3个生产者,每个生成10个任务
for i := 0; i < 3; i++ {
producerWg.Add(1)
go producer(taskChan, &producerWg, i, 10)
}
// 启动4个消费者
for i := 0; i < 4; i++ {
consumerWg.Add(1)
go consumer(taskChan, i, &consumerWg)
}
// 等待所有生产者完成,然后关闭通道
go func() {
producerWg.Wait()
close(taskChan)
fmt.Println("所有生产者完成,关闭任务通道")
}()
// 等待所有消费者完成
consumerWg.Wait()
fmt.Println("所有任务处理完成,程序退出")
}
注意事项
- channel的关闭操作只能执行一次,多次关闭会触发panic,因此建议由生产方统一关闭,或者在确认所有生产方都完成后关闭。
- 带缓冲的channel可以缓解生产者和消费者的速度差异,缓冲大小需要根据实际任务处理速度合理设置,避免过大占用过多内存。
- 如果消费者处理逻辑中出现panic,需要添加recover机制,避免单个消费者异常导致整个程序崩溃。
- 当任务处理完成后,需要正确等待所有消费者退出,避免主程序提前退出导致任务未处理完成。
场景适配
该模型适合处理异步任务分发、流量削峰、模块解耦等场景,比如接口请求异步处理、批量数据异步导出、消息队列本地简化实现等。如果任务处理过程中需要返回结果,可以在任务结构中添加结果字段,或者额外创建一个结果channel用于传递处理结果。