在C#多线程编程中,多个线程之间需要安全地传递数据时,普通的集合类型往往无法满足线程安全的要求,而BlockingCollection是.NET框架提供的线程安全阻塞集合,能够很好地解决这类问题,它内部封装了多种线程安全集合实现,默认使用ConcurrentQueue作为底层存储,同时提供了阻塞等待、集合上限控制等实用功能。

BlockingCollection基础初始化
BlockingCollection的初始化方式比较灵活,我们可以指定底层使用的线程安全集合类型,也可以设置集合的最大容量,避免内存占用过高。
using System.Collections.Concurrent; // 不指定参数,默认使用ConcurrentQueue,无上限 BlockingCollection<int> defaultCollection = new BlockingCollection<int>(); // 指定最大容量为10,超过容量时添加操作会阻塞 BlockingCollection<string> boundedCollection = new BlockingCollection<string>(10); // 指定底层使用ConcurrentStack,先进后出 BlockingCollection<int> stackBasedCollection = new BlockingCollection<int>(new ConcurrentStack<int>());
核心数据操作方法
添加数据
添加数据可以使用Add方法,如果集合设置了上限且当前已满,该方法会阻塞当前线程直到有空间可用,也可以使用TryAdd方法设置超时时间,避免无限阻塞。
BlockingCollection<int> collection = new BlockingCollection<int>(2); // 阻塞添加,集合满时当前线程会等待 collection.Add(1); collection.Add(2); // 尝试添加,最多等待1秒,返回是否添加成功 bool success = collection.TryAdd(3, TimeSpan.FromSeconds(1));
获取数据
获取数据可以使用Take方法,如果集合为空,该方法会阻塞当前线程直到有数据可用,同样也有TryTake方法支持超时设置。
BlockingCollection<string> dataCollection = new BlockingCollection<string>();
// 启动一个线程添加数据
Task.Run(() =>
{
Thread.Sleep(1000);
dataCollection.Add("测试数据");
});
// 阻塞获取数据,等待1秒后拿到数据
string result = dataCollection.Take();
Console.WriteLine(result);
// 尝试获取数据,最多等待500毫秒
bool hasData = dataCollection.TryTake(out string tempData, TimeSpan.FromMilliseconds(500));
完成添加标记
当生产者不再添加数据时,需要调用CompleteAdding方法标记集合已完成添加,此时消费者端的阻塞获取操作会正常结束,不会一直等待新数据。
BlockingCollection<int> markCollection = new BlockingCollection<int>();
// 生产者线程
Task.Run(() =>
{
for (int i = 0; i < 5; i++)
{
markCollection.Add(i);
}
// 标记完成添加
markCollection.CompleteAdding();
});
// 消费者线程
Task.Run(() =>
{
// IsCompleted为true时表示已完成添加且集合为空
while (!markCollection.IsCompleted)
{
try
{
int value = markCollection.Take();
Console.WriteLine($"获取到数据:{value}");
}
catch (InvalidOperationException)
{
// Take在CompleteAdding后调用会抛异常,这里捕获处理
break;
}
}
});
生产者消费者模式实战
BlockingCollection最常见的应用场景就是生产者消费者模式,下面通过一个完整的示例展示多个生产者和多个消费者协同工作的场景。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class ProducerConsumerDemo
{
static void Main()
{
// 创建容量为5的阻塞集合
BlockingCollection<int> queue = new BlockingCollection<int>(5);
CancellationTokenSource cts = new CancellationTokenSource();
// 启动2个生产者
Task producer1 = Task.Run(() => ProduceData(queue, 1, cts.Token));
Task producer2 = Task.Run(() => ProduceData(queue, 2, cts.Token));
// 启动3个消费者
Task consumer1 = Task.Run(() => ConsumeData(queue, 1, cts.Token));
Task consumer2 = Task.Run(() => ConsumeData(queue, 2, cts.Token));
Task consumer3 = Task.Run(() => ConsumeData(queue, 3, cts.Token));
// 运行5秒后取消操作
Thread.Sleep(5000);
cts.Cancel();
// 标记完成添加
queue.CompleteAdding();
Task.WaitAll(producer1, producer2, consumer1, consumer2, consumer3);
Console.WriteLine("所有任务执行完成");
}
static void ProduceData(BlockingCollection<int> collection, int producerId, CancellationToken token)
{
int count = 0;
while (!token.IsCancellationRequested)
{
try
{
// 尝试添加数据,超时100毫秒
bool added = collection.TryAdd(count, TimeSpan.FromMilliseconds(100), token);
if (added)
{
Console.WriteLine($"生产者{producerId}添加数据:{count}");
count++;
}
}
catch (OperationCanceledException)
{
// 取消时退出循环
break;
}
}
Console.WriteLine($"生产者{producerId}停止工作");
}
static void ConsumeData(BlockingCollection<int> collection, int consumerId, CancellationToken token)
{
while (!collection.IsCompleted)
{
try
{
// 尝试获取数据,超时100毫秒
bool hasData = collection.TryTake(out int data, TimeSpan.FromMilliseconds(100), token);
if (hasData)
{
Console.WriteLine($"消费者{consumerId}获取数据:{data}");
// 模拟处理耗时
Thread.Sleep(50);
}
}
catch (OperationCanceledException)
{
// 取消时退出循环
break;
}
}
Console.WriteLine($"消费者{consumerId}停止工作");
}
}
使用注意事项
- 调用
CompleteAdding后不能再调用Add方法添加数据,否则会抛出InvalidOperationException异常。 - 如果有多个消费者,
Take方法会自动分配数据,不需要额外做负载均衡处理。 - 使用
GetConsumingEnumerable方法可以遍历集合中的数据,遍历会在CompleteAdding调用且集合为空时自动结束。 - 如果需要支持取消操作,尽量使用带
CancellationToken参数的重载方法,避免使用Thread.Sleep等方式做等待。
BlockingCollection<int> enumCollection = new BlockingCollection<int>();
Task.Run(() =>
{
for (int i = 0; i < 3; i++)
{
enumCollection.Add(i);
}
enumCollection.CompleteAdding();
});
// 遍历消费数据,集合完成添加且为空时遍历结束
foreach (int item in enumCollection.GetConsumingEnumerable())
{
Console.WriteLine($"遍历获取数据:{item}");
}
BlockingCollectionC#线程安全队列阻塞集合修改时间:2026-07-05 08:30:33