C#中的Queue是System.Collections.Generic命名空间下的泛型集合类,它严格遵循先进先出FIFO的规则,即最先进入队列的元素会最先被取出,这种特性非常适合消息缓冲场景,能够暂存生产端产生的消息,再按序提供给消费端处理。

C# Queue队列基础操作
Queue提供了几个核心方法用于元素的入队、出队和查看,使用前需要先引入对应的命名空间。
using System;
using System.Collections.Generic;
class Program
{
static void Main()
{
// 初始化一个存储字符串类型的Queue队列
Queue<string> messageQueue = new Queue<string>();
// 入队操作:将元素添加到队列末尾
messageQueue.Enqueue("消息1");
messageQueue.Enqueue("消息2");
messageQueue.Enqueue("消息3");
// 查看队首元素,不会移除该元素
string peekMsg = messageQueue.Peek();
Console.WriteLine("队首消息:" + peekMsg);
// 出队操作:移除并返回队首元素
string dequeueMsg = messageQueue.Dequeue();
Console.WriteLine("出队消息:" + dequeueMsg);
// 查看当前队列元素数量
Console.WriteLine("剩余队列长度:" + messageQueue.Count);
}
}
Queue先进先出FIFO的实现原理
Queue内部使用循环数组来存储元素,它会维护两个指针分别指向队首和队尾的位置。当执行Enqueue操作时,元素会被添加到队尾指针指向的位置,队尾指针后移;执行Dequeue操作时,会取出队首指针指向的元素,队首指针后移。这种实现方式保证了元素的处理顺序严格按照进入队列的顺序,不会出现乱序的情况。
需要注意的是,当队列的元素数量超过内部数组的容量时,Queue会自动扩容,扩容时会创建一个新的更大的数组,并将原有元素复制到新数组中,这个过程会有一定的性能开销,如果可以预估消息的最大数量,建议在初始化时指定队列的初始容量。
Queue在消息缓冲中的应用实现
在实际的消息缓冲场景中,通常会有消息生产者和消息消费者两个角色,生产者产生消息的速度可能快于消费者处理消息的速度,这时候就可以用Queue作为缓冲区暂存消息,避免消息丢失或者阻塞生产者。
基础消息缓冲实现
下面是一个简单的消息缓冲示例,模拟生产者不断产生消息,消费者按序处理消息的逻辑:
using System;
using System.Collections.Generic;
using System.Threading;
class MessageBuffer
{
// 消息缓冲队列
private Queue<string> _bufferQueue = new Queue<string>();
// 队列最大容量,避免内存溢出
private readonly int _maxCapacity;
public MessageBuffer(int maxCapacity = 100)
{
_maxCapacity = maxCapacity;
}
// 生产者入队方法,超过最大容量时等待
public void EnqueueMessage(string message)
{
while (_bufferQueue.Count >= _maxCapacity)
{
Console.WriteLine("队列已满,等待消费后再入队");
Thread.Sleep(100);
}
_bufferQueue.Enqueue(message);
Console.WriteLine($"消息入队:{message},当前队列长度:{_bufferQueue.Count}");
}
// 消费者出队方法,队列为空时等待
public string DequeueMessage()
{
while (_bufferQueue.Count == 0)
{
Console.WriteLine("队列为空,等待生产消息");
Thread.Sleep(100);
}
string message = _bufferQueue.Dequeue();
Console.WriteLine($"消息出队:{message},当前队列长度:{_bufferQueue.Count}");
return message;
}
}
class Program
{
static void Main()
{
MessageBuffer buffer = new MessageBuffer(5);
// 启动生产者线程
Thread producer = new Thread(() =>
{
for (int i = 1; i <= 10; i++)
{
buffer.EnqueueMessage($"生产消息_{i}");
Thread.Sleep(50);
}
});
// 启动消费者线程
Thread consumer = new Thread(() =>
{
for (int i = 1; i <= 10; i++)
{
buffer.DequeueMessage();
Thread.Sleep(150);
}
});
producer.Start();
consumer.Start();
producer.Join();
consumer.Join();
}
}
线程安全的消息缓冲优化
上面的示例在单线程场景下没有问题,但如果多个生产者或者多个消费者同时操作队列,就会出现线程安全问题,这时候可以使用ConcurrentQueue来替代普通的Queue,它是线程安全的队列类型,不需要手动加锁就可以在多线程场景下使用。
using System;
using System.Collections.Concurrent;
using System.Threading;
class SafeMessageBuffer
{
// 线程安全的队列
private ConcurrentQueue<string> _concurrentQueue = new ConcurrentQueue<string>();
// 入队操作
public void EnqueueMessage(string message)
{
_concurrentQueue.Enqueue(message);
Console.WriteLine($"消息入队:{message},当前队列长度:{_concurrentQueue.Count}");
}
// 出队操作
public bool TryDequeueMessage(out string message)
{
return _concurrentQueue.TryDequeue(out message);
}
}
class Program
{
static void Main()
{
SafeMessageBuffer safeBuffer = new SafeMessageBuffer();
// 多个生产者线程
for (int i = 0; i < 3; i++)
{
int producerId = i;
Thread producer = new Thread(() =>
{
for (int j = 1; j <= 5; j++)
{
safeBuffer.EnqueueMessage($"生产者{producerId}_消息{j}");
Thread.Sleep(30);
}
});
producer.Start();
}
// 多个消费者线程
for (int i = 0; i < 2; i++)
{
int consumerId = i;
Thread consumer = new Thread(() =>
{
for (int j = 0; j < 7; j++)
{
if (safeBuffer.TryDequeueMessage(out string msg))
{
Console.WriteLine($"消费者{consumerId}处理消息:{msg}");
}
else
{
Console.WriteLine($"消费者{consumerId}未获取到消息");
}
Thread.Sleep(100);
}
});
consumer.Start();
}
}
}
使用Queue实现消息缓冲的注意事项
- 合理设置队列的最大容量,避免无限制入队导致内存溢出,尤其是消息生产速度远快于消费速度的场景。
- 多线程场景下优先使用
ConcurrentQueue,避免手动加锁带来的复杂度和死锁风险。 - 如果消息需要持久化,不能只依赖内存中的Queue,需要结合文件或者数据库存储,避免程序重启后消息丢失。
- 处理出队消息时要注意异常捕获,避免单个消息处理失败导致整个消费流程阻塞。