在C#多线程编程场景中,生产者消费者模式是处理异步数据流转的常用设计模式,传统实现需要手动处理线程同步、队列阻塞等复杂逻辑,而标准库提供的BlockingCollection类已经封装了这些能力,能快速实现该模式。

BlockingCollection核心特性
BlockingCollection是一个线程安全的集合包装类,默认基于ConcurrentQueue<T>实现,核心特性如下:
- 支持多线程并发添加和移除元素,无需手动加锁
- 当集合为空时,消费线程调用获取方法会自动阻塞,直到有新元素加入
- 当集合达到指定上限时,生产线程调用添加方法会自动阻塞,直到有元素被移除
- 内置完成标记机制,可通知消费线程不再有新的生产数据
基础使用步骤
1. 创建BlockingCollection实例
可以指定底层集合类型和最大容量,不指定容量则为无上限模式:
using System.Collections.Concurrent; // 创建最大容量为10的BlockingCollection,默认底层是ConcurrentQueue BlockingCollection<int> collection = new BlockingCollection<int>(10); // 也可以指定底层集合类型,比如使用ConcurrentStack // BlockingCollection<int> stackCollection = new BlockingCollection<int>(new ConcurrentStack<int>(), 10);
2. 实现生产者逻辑
生产者线程调用Add方法添加元素,当集合满时会自动阻塞:
using System;
using System.Collections.Concurrent;
using System.Threading;
class Producer
{
private BlockingCollection<int> _collection;
private int _produceCount;
public Producer(BlockingCollection<int> collection, int produceCount)
{
_collection = collection;
_produceCount = produceCount;
}
public void Run()
{
for (int i = 0; i < _produceCount; i++)
{
// 添加元素,集合满时会自动阻塞等待
_collection.Add(i);
Console.WriteLine($"生产者生产了数据:{i}");
Thread.Sleep(100); // 模拟生产耗时
}
// 标记生产完成,通知消费者不再有新数据
_collection.CompleteAdding();
Console.WriteLine("生产者已完成所有生产任务");
}
}
3. 实现消费者逻辑
消费者线程调用Take方法获取元素,集合空时会自动阻塞,也可以通过GetConsumingEnumerable遍历消费:
using System;
using System.Collections.Concurrent;
using System.Threading;
class Consumer
{
private BlockingCollection<int> _collection;
private string _consumerName;
public Consumer(BlockingCollection<int> collection, string consumerName)
{
_collection = collection;
_consumerName = consumerName;
}
public void Run()
{
// 使用GetConsumingEnumerable遍历,集合空且未完成添加时会阻塞,完成后自动结束遍历
foreach (var item in _collection.GetConsumingEnumerable())
{
Console.WriteLine($"{_consumerName}消费了数据:{item}");
Thread.Sleep(200); // 模拟消费耗时
}
Console.WriteLine($"{_consumerName}消费完成,无更多数据");
}
}
4. 完整运行示例
启动生产者和消费者线程,验证模式运行效果:
using System;
using System.Collections.Concurrent;
using System.Threading;
class Program
{
static void Main()
{
// 创建容量为5的BlockingCollection
BlockingCollection<int> collection = new BlockingCollection<int>(5);
// 创建生产者,生产20条数据
Producer producer = new Producer(collection, 20);
// 创建两个消费者
Consumer consumer1 = new Consumer(collection, "消费者1");
Consumer consumer2 = new Consumer(collection, "消费者2");
// 启动线程
Thread producerThread = new Thread(producer.Run);
Thread consumerThread1 = new Thread(consumer1.Run);
Thread consumerThread2 = new Thread(consumer2.Run);
producerThread.Start();
consumerThread1.Start();
consumerThread2.Start();
// 等待所有线程执行完成
producerThread.Join();
consumerThread1.Join();
consumerThread2.Join();
Console.WriteLine("所有任务执行完毕");
}
}
使用注意事项
- 调用
CompleteAdding后不能再向集合中添加元素,否则会抛出异常 - 如果不需要限流能力,可以不指定集合容量,此时
Add方法不会阻塞 - 消费时优先使用
GetConsumingEnumerable遍历,比手动循环调用Take更简洁,也能自动处理完成标记 - 多线程场景下不需要额外加锁,BlockingCollection内部已经实现了所有线程同步逻辑
BlockingCollection是.NET标准库中实现生产者消费者模式的首选方案,相比手动使用锁和队列实现,代码量更少,稳定性更高,适合大部分多线程数据流转场景。
C#BlockingCollection生产者消费者模式多线程修改时间:2026-07-05 22:51:29