在C#多线程编程中,线程间的数据传递和共享经常需要处理线程安全问题,手动编写锁逻辑不仅复杂还容易出错。BlockingCollection是.NET框架提供的线程安全集合类,专门用于处理生产者消费者场景下的数据流转,内置了阻塞和限流能力,大幅降低了多线程数据交互的实现难度。

BlockingCollection基础概念
BlockingCollection是对IProducerConsumerCollection<T>接口的包装,默认使用ConcurrentQueue<T>作为底层存储,也可以指定其他实现了该接口的集合作为底层容器。它的核心特性是支持阻塞操作:当集合为空时,消费者线程尝试获取元素会被阻塞,直到有新的元素加入;当集合达到指定容量上限时,生产者线程尝试添加元素会被阻塞,直到有元素被取出。
核心属性与方法
- Capacity:集合的最大容量,默认是无上限,设置后超过容量时添加操作会阻塞
- IsAddingCompleted:标记是否不再有新的元素添加,用于通知消费者所有元素已生产完毕
- Add(T item):添加元素到集合,若集合已满则阻塞当前线程
- TryAdd(T item, int millisecondsTimeout):尝试在指定超时时间内添加元素,超时返回false
- Take():从集合取出一个元素,若集合为空则阻塞当前线程
- TryTake(out T item, int millisecondsTimeout):尝试在指定超时时间内取出元素,超时返回false
- CompleteAdding():标记集合不再接受新的添加操作,触发消费者结束等待
入门示例:简单生产者消费者实现
下面通过一个简单的示例演示最基本的生产者消费者模式,生产者线程生成10个整数并添加到集合,消费者线程从集合取出元素并打印。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main()
{
// 创建无容量限制的BlockingCollection
BlockingCollection<int> collection = new BlockingCollection<int>();
// 启动生产者任务
Task producer = Task.Run(() =>
{
for (int i = 1; i <= 10; i++)
{
Console.WriteLine($"生产者生产元素:{i}");
collection.Add(i);
// 模拟生产耗时
Thread.Sleep(500);
}
// 标记生产完成,不再添加新元素
collection.CompleteAdding();
Console.WriteLine("生产者生产完毕");
});
// 启动消费者任务
Task consumer = Task.Run(() =>
{
// 当集合没有完成添加,或者集合中还有剩余元素时持续消费
while (!collection.IsAddingCompleted || collection.Count > 0)
{
try
{
// 取出元素,若集合为空会阻塞当前线程
int item = collection.Take();
Console.WriteLine($"消费者消费元素:{item}");
// 模拟消费耗时
Thread.Sleep(800);
}
catch (InvalidOperationException)
{
// 集合被标记完成添加且为空时,Take会抛出该异常,直接退出循环
break;
}
}
Console.WriteLine("消费者消费完毕");
});
// 等待两个任务执行完成
Task.WaitAll(producer, consumer);
}
}
进阶用法:限制集合容量与超时操作
实际场景中通常需要限制集合的最大容量,避免内存占用过高,同时可以使用超时方法避免线程无限阻塞。下面的示例设置集合容量为3,生产者生产速度大于消费者消费速度时,添加操作会阻塞。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main()
{
// 创建容量为3的BlockingCollection
BlockingCollection<string> collection = new BlockingCollection<string>(3);
// 生产者任务
Task producer = Task.Run(() =>
{
for (int i = 1; i <= 5; i++)
{
string item = $"数据{i}";
// 尝试在1秒内添加元素,超时则放弃
if (collection.TryAdd(item, 1000))
{
Console.WriteLine($"成功添加:{item},当前集合数量:{collection.Count}");
}
else
{
Console.WriteLine($"添加{item}超时,放弃添加");
}
Thread.Sleep(300);
}
collection.CompleteAdding();
});
// 消费者任务
Task consumer = Task.Run(() =>
{
while (!collection.IsAddingCompleted || collection.Count > 0)
{
if (collection.TryTake(out string item, 1000))
{
Console.WriteLine($"成功取出:{item},当前集合数量:{collection.Count}");
Thread.Sleep(1000);
}
else
{
Console.WriteLine("取出元素超时");
}
}
});
Task.WaitAll(producer, consumer);
}
}
高级场景:自定义底层集合与批量操作
BlockingCollection支持指定自定义底层集合,比如使用ConcurrentStack<T>实现后进先出的消费逻辑,同时提供了GetConsumingEnumerable()方法,可以遍历集合中的元素,自动处理阻塞和完成标记,简化消费者代码。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main()
{
// 使用ConcurrentStack作为底层集合,实现后进先出
BlockingCollection<int> collection = new BlockingCollection<int>(new ConcurrentStack<int>());
Task producer = Task.Run(() =>
{
for (int i = 1; i <= 5; i++)
{
collection.Add(i);
Console.WriteLine($"生产:{i}");
Thread.Sleep(200);
}
collection.CompleteAdding();
});
// 使用GetConsumingEnumerable遍历,集合为空时自动阻塞,标记完成后自动结束遍历
Task consumer = Task.Run(() =>
{
foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine($"消费:{item}");
Thread.Sleep(500);
}
Console.WriteLine("消费完成");
});
Task.WaitAll(producer, consumer);
}
}
使用注意事项
- 调用
CompleteAdding()后不能再调用Add()方法,否则会抛出InvalidOperationException - 多个消费者同时消费时,每个元素只会被一个消费者获取,不会出现重复消费的问题
- 不需要使用集合时,建议调用
Dispose()方法释放资源,避免内存泄漏 - 若不需要阻塞特性,可以直接使用底层的
ConcurrentQueue<T>等集合,性能会更高
常见问题解答
BlockingCollection和ConcurrentQueue有什么区别?
ConcurrentQueue是单纯的线程安全队列,没有阻塞和限流能力,当队列为空时取元素会返回默认值,不会阻塞线程。BlockingCollection包装了ConcurrentQueue,增加了阻塞等待、容量限制、完成标记等特性,更适合生产者消费者场景。
多个生产者同时添加元素是否安全?
BlockingCollection的所有添加和取出操作都是线程安全的,多个生产者和多个消费者同时操作不会出现数据错乱的问题,不需要额外加锁。
BlockingCollectionC#线程安全生产者消费者模式集合修改时间:2026-06-20 09:45:37