在Golang的并发编程场景中,多个协程同时操作同一个队列时,如果没有做同步控制,很容易出现数据竞争、队列状态不一致的问题,因此实现并发安全队列是很多业务开发的基础需求。不同的实现方案适用场景不同,开发者可以根据业务特性选择最合适的方案。

基于channel实现并发安全队列
channel是Golang天生支持并发安全的通信机制,利用其特性可以快速实现一个简单的并发安全队列,不需要额外加锁,使用起来非常便捷。
package main
import "fmt"
// ChannelQueue 基于channel实现的并发安全队列
type ChannelQueue struct {
ch chan interface{} // 存储队列元素的channel
}
// NewChannelQueue 初始化队列,指定队列容量
func NewChannelQueue(capacity int) *ChannelQueue {
return &ChannelQueue{
ch: make(chan interface{}, capacity),
}
}
// Enqueue 入队操作
func (q *ChannelQueue) Enqueue(val interface{}) {
q.ch <- val
}
// Dequeue 出队操作,返回元素和是否成功
func (q *ChannelQueue) Dequeue() (interface{}, bool) {
select {
case val := <-q.ch:
return val, true
default:
// 队列为空时返回失败
return nil, false
}
}
// Size 获取队列当前长度
func (q *ChannelQueue) Size() int {
return len(q.ch)
}
func main() {
queue := NewChannelQueue(10)
// 启动协程入队
go func() {
for i := 0; i < 5; i++ {
queue.Enqueue(i)
fmt.Printf("入队元素: %dn", i)
}
}()
// 主协程出队
for i := 0; i < 5; i++ {
val, ok := queue.Dequeue()
if ok {
fmt.Printf("出队元素: %dn", val)
}
}
}
这种实现方式的优势是代码简洁,不需要手动处理锁逻辑,channel内部已经做好了并发控制。但是缺点也比较明显,队列的容量在初始化时就固定了,无法动态扩容,而且不支持获取队列头部元素但不移除的操作。
基于mutex实现基础并发安全队列
如果需要更灵活的队列操作,可以基于切片和互斥锁sync.Mutex实现并发安全队列,支持动态扩容,也可以自定义更多操作方法。
package main
import (
"fmt"
"sync"
)
// MutexQueue 基于互斥锁实现的并发安全队列
type MutexQueue struct {
mu sync.Mutex // 互斥锁,保证并发安全
items []interface{} // 存储队列元素的切片
}
// NewMutexQueue 初始化队列
func NewMutexQueue() *MutexQueue {
return &MutexQueue{
items: make([]interface{}, 0),
}
}
// Enqueue 入队操作
func (q *MutexQueue) Enqueue(val interface{}) {
q.mu.Lock()
defer q.mu.Unlock()
q.items = append(q.items, val)
}
// Dequeue 出队操作,返回元素和是否成功
func (q *MutexQueue) Dequeue() (interface{}, bool) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.items) == 0 {
return nil, false
}
// 取第一个元素
val := q.items[0]
// 移除第一个元素,重新切片
q.items = q.items[1:]
return val, true
}
// Peek 获取队首元素但不移除
func (q *MutexQueue) Peek() (interface{}, bool) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.items) == 0 {
return nil, false
}
return q.items[0], true
}
// Size 获取队列长度
func (q *MutexQueue) Size() int {
q.mu.Lock()
defer q.mu.Unlock()
return len(q.items)
}
func main() {
queue := NewMutexQueue()
var wg sync.WaitGroup
// 启动两个协程入队
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 3; i++ {
queue.Enqueue(fmt.Sprintf("协程1-元素%d", i))
}
}()
go func() {
defer wg.Done()
for i := 0; i < 3; i++ {
queue.Enqueue(fmt.Sprintf("协程2-元素%d", i))
}
}()
wg.Wait()
// 出队所有元素
for queue.Size() > 0 {
val, _ := queue.Dequeue()
fmt.Printf("出队元素: %sn", val)
}
}
这种实现方式支持动态扩容,也可以自定义更多操作接口,但是每次操作都需要加锁,高并发场景下锁竞争会比较明显,性能会有所下降。
基于sync.Cond实现阻塞式并发安全队列
如果业务需要队列在为空时出队操作阻塞等待,直到有元素入队,可以使用sync.Cond条件变量配合互斥锁实现阻塞式队列。
package main
import (
"fmt"
"sync"
)
// CondQueue 基于条件变量实现的阻塞式并发安全队列
type CondQueue struct {
mu sync.Mutex
cond *sync.Cond // 条件变量,用于阻塞等待
items []interface{}
}
// NewCondQueue 初始化队列
func NewCondQueue() *CondQueue {
q := &CondQueue{
items: make([]interface{}, 0),
}
q.cond = sync.NewCond(&q.mu)
return q
}
// Enqueue 入队操作,入队后唤醒等待的协程
func (q *CondQueue) Enqueue(val interface{}) {
q.mu.Lock()
defer q.mu.Unlock()
q.items = append(q.items, val)
// 唤醒一个等待出队的协程
q.cond.Signal()
}
// Dequeue 出队操作,队列为空时阻塞等待
func (q *CondQueue) Dequeue() interface{} {
q.mu.Lock()
defer q.mu.Unlock()
// 队列为空时等待
for len(q.items) == 0 {
q.cond.Wait()
}
val := q.items[0]
q.items = q.items[1:]
return val
}
func main() {
queue := NewCondQueue()
// 启动消费者协程,先阻塞等待
go func() {
for i := 0; i < 3; i++ {
val := queue.Dequeue()
fmt.Printf("消费者拿到元素: %vn", val)
}
}()
// 主协程延迟入队,模拟生产逻辑
for i := 0; i < 3; i++ {
queue.Enqueue(fmt.Sprintf("生产元素%d", i))
fmt.Printf("生产入队元素: 生产元素%dn", i)
}
// 防止主协程提前退出
select {}
}
这种实现方式可以满足阻塞等待的需求,适合生产者消费者模型的场景,但是使用条件变量需要注意等待条件的判断要用for循环,避免虚假唤醒的问题。
不同实现方案对比
三种常见的并发安全队列实现方案各有适用场景,具体对比如下:
| 实现方案 | 优势 | 劣势 | 适用场景 |
|---|---|---|---|
| channel实现 | 代码简洁,无需手动处理锁,并发安全由runtime保证 | 容量固定,功能有限,不支持动态扩容 | 简单的队列场景,容量可预估,不需要复杂操作 |
| mutex实现 | 支持动态扩容,可自定义操作接口,灵活度高 | 高并发下锁竞争明显,性能一般 | 需要灵活操作队列,并发量不高的场景 |
| sync.Cond实现 | 支持阻塞等待,适合生产者消费者模型 | 使用复杂度高,需要注意虚假唤醒问题 | 需要阻塞等待元素的消费者场景,生产者消费者模型 |
实际应用注意事项
在实际使用并发安全队列时,还需要注意几个问题:第一是队列的元素类型如果是引用类型,需要注意元素本身的并发安全,队列只保证入队出队操作的原子性,不保证元素内部状态的并发安全;第二是如果队列使用场景是极高并发,建议优先选择channel实现,或者根据压测结果选择性能更优的方案;第三是如果队列需要支持关闭操作,需要在实现中增加关闭标记,避免关闭后继续入队导致异常。