生产者消费者模式是并发编程中非常经典的设计模式,Golang通过goroutine和channel可以非常简洁地实现这个模式,下面我们就一步步讲解具体的实现方法。

基础实现:无缓冲channel版本
最基础的实现方式是使用无缓冲channel,生产者协程生产数据后发送到channel,消费者协程从channel接收数据进行处理,两者会同步阻塞等待对方操作。
package main
import (
"fmt"
"time"
)
// 生产者函数,向channel发送数据
func producer(ch chan<- int, count int) {
for i := 1; i <= count; i++ {
fmt.Printf("生产者生产数据: %d\n", i)
ch <- i // 发送数据到channel,无缓冲时会阻塞直到消费者接收
time.Sleep(time.Millisecond * 500) // 模拟生产耗时
}
close(ch) // 生产完成后关闭channel,通知消费者没有更多数据
}
// 消费者函数,从channel接收数据并处理
func consumer(ch <-chan int, id int) {
for data := range ch { // 循环接收channel数据,channel关闭后会自动退出循环
fmt.Printf("消费者%d处理数据: %d\n", id, data)
time.Sleep(time.Millisecond * 800) // 模拟消费耗时
}
fmt.Printf("消费者%d结束工作\n", id)
}
func main() {
ch := make(chan int) // 创建无缓冲channel
go producer(ch, 5) // 启动生产者协程,生产5条数据
go consumer(ch, 1) // 启动消费者协程
time.Sleep(time.Second * 6) // 等待所有任务完成
fmt.Println("所有任务处理完成")
}优化版本:带缓冲channel与多消费者
无缓冲channel的生产和消费是严格同步的,效率较低,我们可以使用带缓冲的channel提升效率,同时启动多个消费者协程并行处理数据。
package main
import (
"fmt"
"sync"
"time"
)
// 生产者函数
func producer(ch chan<- int, count int, wg *sync.WaitGroup) {
defer wg.Done() // 生产者完成后通知WaitGroup
for i := 1; i <= count; i++ {
fmt.Printf("生产者生产数据: %d\n", i)
ch <- i
time.Sleep(time.Millisecond * 300)
}
close(ch)
}
// 消费者函数
func consumer(ch <-chan int, id int, wg *sync.WaitGroup) {
defer wg.Done()
for data := range ch {
fmt.Printf("消费者%d处理数据: %d\n", id, data)
time.Sleep(time.Millisecond * 600)
}
fmt.Printf("消费者%d结束工作\n", id)
}
func main() {
ch := make(chan int, 3) // 创建缓冲大小为3的channel
var wg sync.WaitGroup
// 启动1个生产者
wg.Add(1)
go producer(ch, 10, &wg)
// 启动3个消费者
for i := 1; i <= 3; i++ {
wg.Add(1)
go consumer(ch, i, &wg)
}
wg.Wait() // 等待所有生产者和消费者完成
fmt.Println("所有任务处理完成")
}关键注意点
- channel的关闭只能由发送方(生产者)执行,多次关闭channel会导致panic,接收方不能关闭channel。
- 使用
for range遍历channel时,channel关闭后会自动退出循环,不需要额外判断。 - 如果启动多个协程,建议使用
sync.WaitGroup来等待所有协程完成,避免主协程提前退出。 - 缓冲channel的大小需要根据实际生产消费的速度调整,过大会浪费内存,过小起不到缓冲作用。
常见场景适配
如果生产者和消费者的数量都是动态的,我们可以结合context来实现优雅退出,当收到退出信号时,生产者停止生产,消费者处理完剩余数据后退出。
package main
import (
"context"
"fmt"
"sync"
"time"
)
func producer(ctx context.Context, ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
i := 1
for {
select {
case <-ctx.Done(): // 收到退出信号,停止生产
fmt.Println("生产者收到退出信号,停止生产")
return
case ch <- i:
fmt.Printf("生产者生产数据: %d\n", i)
i++
time.Sleep(time.Millisecond * 400)
}
}
}
func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for data := range ch {
fmt.Printf("消费者%d处理数据: %d\n", id, data)
time.Sleep(time.Millisecond * 700)
}
fmt.Printf("消费者%d结束工作\n", id)
}
func main() {
ch := make(chan int, 2)
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
// 启动1个生产者
wg.Add(1)
go producer(ctx, ch, &wg)
// 启动2个消费者
for i := 1; i <= 2; i++ {
wg.Add(1)
go consumer(i, ch, &wg)
}
// 运行3秒后发送退出信号
time.Sleep(time.Second * 3)
cancel()
close(ch) // 关闭channel,通知消费者没有更多数据
wg.Wait()
fmt.Println("程序退出")
}