导读:本期聚焦于小伙伴创作的《C#中BlockingCollection怎么用?阻塞集合线程安全队列使用教程》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《C#中BlockingCollection怎么用?阻塞集合线程安全队列使用教程》有用,将其分享出去将是对创作者最好的鼓励。

在C#多线程编程中,多个线程之间需要安全地传递数据时,普通的集合类型往往无法满足线程安全的要求,而BlockingCollection是.NET框架提供的线程安全阻塞集合,能够很好地解决这类问题,它内部封装了多种线程安全集合实现,默认使用ConcurrentQueue作为底层存储,同时提供了阻塞等待、集合上限控制等实用功能。

C#中BlockingCollection怎么用?阻塞集合线程安全队列使用教程

BlockingCollection基础初始化

BlockingCollection的初始化方式比较灵活,我们可以指定底层使用的线程安全集合类型,也可以设置集合的最大容量,避免内存占用过高。

using System.Collections.Concurrent;

// 不指定参数,默认使用ConcurrentQueue,无上限
BlockingCollection<int> defaultCollection = new BlockingCollection<int>();

// 指定最大容量为10,超过容量时添加操作会阻塞
BlockingCollection<string> boundedCollection = new BlockingCollection<string>(10);

// 指定底层使用ConcurrentStack,先进后出
BlockingCollection<int> stackBasedCollection = new BlockingCollection<int>(new ConcurrentStack<int>());

核心数据操作方法

添加数据

添加数据可以使用Add方法,如果集合设置了上限且当前已满,该方法会阻塞当前线程直到有空间可用,也可以使用TryAdd方法设置超时时间,避免无限阻塞。

BlockingCollection<int> collection = new BlockingCollection<int>(2);

// 阻塞添加,集合满时当前线程会等待
collection.Add(1);
collection.Add(2);

// 尝试添加,最多等待1秒,返回是否添加成功
bool success = collection.TryAdd(3, TimeSpan.FromSeconds(1));

获取数据

获取数据可以使用Take方法,如果集合为空,该方法会阻塞当前线程直到有数据可用,同样也有TryTake方法支持超时设置。

BlockingCollection<string> dataCollection = new BlockingCollection<string>();

// 启动一个线程添加数据
Task.Run(() =>
{
    Thread.Sleep(1000);
    dataCollection.Add("测试数据");
});

// 阻塞获取数据,等待1秒后拿到数据
string result = dataCollection.Take();
Console.WriteLine(result);

// 尝试获取数据,最多等待500毫秒
bool hasData = dataCollection.TryTake(out string tempData, TimeSpan.FromMilliseconds(500));

完成添加标记

当生产者不再添加数据时,需要调用CompleteAdding方法标记集合已完成添加,此时消费者端的阻塞获取操作会正常结束,不会一直等待新数据。

BlockingCollection<int> markCollection = new BlockingCollection<int>();

// 生产者线程
Task.Run(() =>
{
    for (int i = 0; i < 5; i++)
    {
        markCollection.Add(i);
    }
    // 标记完成添加
    markCollection.CompleteAdding();
});

// 消费者线程
Task.Run(() =>
{
    // IsCompleted为true时表示已完成添加且集合为空
    while (!markCollection.IsCompleted)
    {
        try
        {
            int value = markCollection.Take();
            Console.WriteLine($"获取到数据:{value}");
        }
        catch (InvalidOperationException)
        {
            // Take在CompleteAdding后调用会抛异常,这里捕获处理
            break;
        }
    }
});

生产者消费者模式实战

BlockingCollection最常见的应用场景就是生产者消费者模式,下面通过一个完整的示例展示多个生产者和多个消费者协同工作的场景。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class ProducerConsumerDemo
{
    static void Main()
    {
        // 创建容量为5的阻塞集合
        BlockingCollection<int> queue = new BlockingCollection<int>(5);
        CancellationTokenSource cts = new CancellationTokenSource();

        // 启动2个生产者
        Task producer1 = Task.Run(() => ProduceData(queue, 1, cts.Token));
        Task producer2 = Task.Run(() => ProduceData(queue, 2, cts.Token));

        // 启动3个消费者
        Task consumer1 = Task.Run(() => ConsumeData(queue, 1, cts.Token));
        Task consumer2 = Task.Run(() => ConsumeData(queue, 2, cts.Token));
        Task consumer3 = Task.Run(() => ConsumeData(queue, 3, cts.Token));

        // 运行5秒后取消操作
        Thread.Sleep(5000);
        cts.Cancel();
        // 标记完成添加
        queue.CompleteAdding();

        Task.WaitAll(producer1, producer2, consumer1, consumer2, consumer3);
        Console.WriteLine("所有任务执行完成");
    }

    static void ProduceData(BlockingCollection<int> collection, int producerId, CancellationToken token)
    {
        int count = 0;
        while (!token.IsCancellationRequested)
        {
            try
            {
                // 尝试添加数据,超时100毫秒
                bool added = collection.TryAdd(count, TimeSpan.FromMilliseconds(100), token);
                if (added)
                {
                    Console.WriteLine($"生产者{producerId}添加数据:{count}");
                    count++;
                }
            }
            catch (OperationCanceledException)
            {
                // 取消时退出循环
                break;
            }
        }
        Console.WriteLine($"生产者{producerId}停止工作");
    }

    static void ConsumeData(BlockingCollection<int> collection, int consumerId, CancellationToken token)
    {
        while (!collection.IsCompleted)
        {
            try
            {
                // 尝试获取数据,超时100毫秒
                bool hasData = collection.TryTake(out int data, TimeSpan.FromMilliseconds(100), token);
                if (hasData)
                {
                    Console.WriteLine($"消费者{consumerId}获取数据:{data}");
                    // 模拟处理耗时
                    Thread.Sleep(50);
                }
            }
            catch (OperationCanceledException)
            {
                // 取消时退出循环
                break;
            }
        }
        Console.WriteLine($"消费者{consumerId}停止工作");
    }
}

使用注意事项

  • 调用CompleteAdding后不能再调用Add方法添加数据,否则会抛出InvalidOperationException异常。
  • 如果有多个消费者,Take方法会自动分配数据,不需要额外做负载均衡处理。
  • 使用GetConsumingEnumerable方法可以遍历集合中的数据,遍历会在CompleteAdding调用且集合为空时自动结束。
  • 如果需要支持取消操作,尽量使用带CancellationToken参数的重载方法,避免使用Thread.Sleep等方式做等待。
BlockingCollection<int> enumCollection = new BlockingCollection<int>();

Task.Run(() =>
{
    for (int i = 0; i < 3; i++)
    {
        enumCollection.Add(i);
    }
    enumCollection.CompleteAdding();
});

// 遍历消费数据,集合完成添加且为空时遍历结束
foreach (int item in enumCollection.GetConsumingEnumerable())
{
    Console.WriteLine($"遍历获取数据:{item}");
}

BlockingCollectionC#线程安全队列阻塞集合修改时间:2026-07-05 08:30:33

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。