Golang实现生产者消费者模式的核心是利用channel作为生产者和消费者之间的通信载体,结合goroutine实现并发执行,生产者向channel写入数据,消费者从channel读取数据,两者通过channel实现解耦和同步。

基础实现示例
下面是一个最简单的生产者消费者模式实现,包含一个生产者和一个消费者,生产者生成数据后发送到channel,消费者从channel接收数据并处理。
package main
import (
"fmt"
"time"
)
// 生产者函数,向channel发送数据
func producer(ch chan<- int, count int) {
for i := 1; i <= count; i++ {
fmt.Printf("生产者生产数据: %dn", i)
ch <- i
time.Sleep(time.Millisecond * 500) // 模拟生产耗时
}
close(ch) // 生产完成后关闭channel,告知消费者没有更多数据
}
// 消费者函数,从channel接收数据并处理
func consumer(ch <-chan int, id int) {
for data := range ch {
fmt.Printf("消费者%d消费数据: %dn", id, data)
time.Sleep(time.Millisecond * 800) // 模拟消费耗时
}
fmt.Printf("消费者%d结束工作n", id)
}
func main() {
ch := make(chan int, 5) // 创建带缓冲的channel,缓冲大小为5
go producer(ch, 10) // 启动生产者协程
go consumer(ch, 1) // 启动消费者协程
time.Sleep(time.Second * 15) // 等待所有逻辑执行完成
}
多消费者扩展实现
实际场景中往往需要一个生产者对应多个消费者,提升消费效率,此时只需要启动多个消费者协程,共享同一个channel即可,实现方式如下:
package main
import (
"fmt"
"sync"
"time"
)
// 生产者函数
func producer(ch chan<- int, count int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i <= count; i++ {
fmt.Printf("生产者生产数据: %dn", i)
ch <- i
time.Sleep(time.Millisecond * 400)
}
close(ch)
}
// 消费者函数
func consumer(ch <-chan int, id int, wg *sync.WaitGroup) {
defer wg.Done()
for data := range ch {
fmt.Printf("消费者%d消费数据: %dn", id, data)
time.Sleep(time.Millisecond * 600)
}
fmt.Printf("消费者%d结束工作n", id)
}
func main() {
ch := make(chan int, 10)
var wg sync.WaitGroup
// 启动1个生产者
wg.Add(1)
go producer(ch, 15, &wg)
// 启动3个消费者
for i := 1; i <= 3; i++ {
wg.Add(1)
go consumer(ch, i, &wg)
}
wg.Wait() // 等待所有生产者和消费者完成工作
fmt.Println("所有任务执行完成")
}
实现技巧与注意事项
channel的选择
可以根据场景选择带缓冲或者不带缓冲的channel:
- 不带缓冲的channel:生产者和消费者会严格同步,生产者发送数据后会阻塞直到消费者接收,适合需要严格同步的场景。
- 带缓冲的channel:生产者发送数据时如果缓冲未满则不会阻塞,可以平衡生产和消费的速度差异,缓冲大小需要根据实际业务调整。
channel关闭的规范
只有生产者需要关闭channel,消费者不应该关闭channel,否则如果多个消费者同时关闭会触发panic。生产者在所有数据发送完成后关闭channel,消费者通过for range循环读取channel,当channel关闭且没有数据时循环会自动结束。
并发控制
如果有多个生产者,需要确保channel只被关闭一次,可以使用sync.Once来实现:
package main
import (
"fmt"
"sync"
"time"
)
var once sync.Once
func multiProducer(ch chan<- int, start, end int, wg *sync.WaitGroup, closeCh func()) {
defer wg.Done()
for i := start; i <= end; i++ {
fmt.Printf("生产者生产数据: %dn", i)
ch <- i
time.Sleep(time.Millisecond * 300)
}
closeCh() // 多个生产者调用时,只会执行一次关闭操作
}
func main() {
ch := make(chan int, 8)
var wg sync.WaitGroup
// 启动2个生产者
wg.Add(2)
go multiProducer(ch, 1, 5, &wg, func() {
once.Do(func() {
close(ch)
})
})
go multiProducer(ch, 6, 10, &wg, func() {
once.Do(func() {
close(ch)
})
})
// 启动消费者
go func() {
for data := range ch {
fmt.Printf("消费者消费数据: %dn", data)
time.Sleep(time.Millisecond * 500)
}
}()
wg.Wait()
time.Sleep(time.Second * 2)
}
优雅退出处理
如果需要支持外部信号触发生产者消费者停止工作,可以结合context.Context实现,通过context通知所有协程停止工作,避免数据丢失或者协程泄漏。