C#如何实现进程内事件发布订阅的消息总线EventBus

来源:Python编程网作者:日本程序员头衔:程序员
导读:本期聚焦于小伙伴创作的《C#如何实现进程内事件发布订阅的消息总线EventBus》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《C#如何实现进程内事件发布订阅的消息总线EventBus》有用,将其分享出去将是对创作者最好的鼓励。

在C#应用开发中,组件之间的直接调用往往会导致代码耦合度升高,后期维护成本增加。进程内事件发布订阅的消息总线EventBus可以有效解决这个问题,通过统一的事件分发机制,让事件的发布者和订阅者不需要直接依赖对方,只需要关注事件本身即可。下面我们通过完整的实现过程来掌握EventBus的构建方法。

C#如何实现进程内事件发布订阅的消息总线EventBus

核心设计思路

EventBus的核心功能是管理事件的订阅关系,并在事件发布时通知所有对应的订阅者。整体设计可以分为几个核心部分:事件基类、订阅管理器、EventBus核心类、订阅处理委托。事件基类是所有自定义事件的父类,订阅管理器负责维护事件类型和对应订阅者的映射关系,EventBus对外提供订阅、取消订阅、发布事件的接口,订阅处理委托用于定义订阅者处理事件的方法签名。

基础类型定义

首先定义事件基类,所有自定义事件都需要继承这个基类,这样可以统一事件的类型处理:

// 事件基类,所有自定义事件继承此类
public abstract class EventBase
{
    // 事件唯一标识,可用于跟踪事件
    public string EventId { get; set; } = Guid.NewGuid().ToString();
    // 事件发生时间
    public DateTime OccurredTime { get; set; } = DateTime.Now;
}

接着定义订阅者的处理委托,支持同步和异步两种处理方式:

// 同步事件处理委托
public delegate void EventHandler<TEvent>(TEvent @event) where TEvent : EventBase;
// 异步事件处理委托
public delegate Task AsyncEventHandler<TEvent>(TEvent @event) where TEvent : EventBase;

订阅管理器实现

订阅管理器负责维护事件类型和对应处理方法的映射,需要支持添加订阅、移除订阅、获取订阅列表的功能:

public class SubscriptionManager
{
    // 存储事件类型对应的同步处理委托列表
    private readonly Dictionary<Type, List<Delegate>> _syncSubscriptions = new Dictionary<Type, List<Delegate>>();
    // 存储事件类型对应的异步处理委托列表
    private readonly Dictionary<Type, List<Delegate>> _asyncSubscriptions = new Dictionary<Type, List<Delegate>>();

    // 添加同步订阅
    public void AddSubscription<TEvent>(EventHandler<TEvent> handler) where TEvent : EventBase
    {
        var eventType = typeof(TEvent);
        if (!_syncSubscriptions.ContainsKey(eventType))
        {
            _syncSubscriptions[eventType] = new List<Delegate>();
        }
        // 避免重复订阅同一个处理方法
        if (!_syncSubscriptions[eventType].Contains(handler))
        {
            _syncSubscriptions[eventType].Add(handler);
        }
    }

    // 添加异步订阅
    public void AddSubscription<TEvent>(AsyncEventHandler<TEvent> handler) where TEvent : EventBase
    {
        var eventType = typeof(TEvent);
        if (!_asyncSubscriptions.ContainsKey(eventType))
        {
            _asyncSubscriptions[eventType] = new List<Delegate>();
        }
        if (!_asyncSubscriptions[eventType].Contains(handler))
        {
            _asyncSubscriptions[eventType].Add(handler);
        }
    }

    // 移除同步订阅
    public void RemoveSubscription<TEvent>(EventHandler<TEvent> handler) where TEvent : EventBase
    {
        var eventType = typeof(TEvent);
        if (_syncSubscriptions.ContainsKey(eventType))
        {
            _syncSubscriptions[eventType].Remove(handler);
            if (_syncSubscriptions[eventType].Count == 0)
            {
                _syncSubscriptions.Remove(eventType);
            }
        }
    }

    // 移除异步订阅
    public void RemoveSubscription<TEvent>(AsyncEventHandler<TEvent> handler) where TEvent : EventBase
    {
        var eventType = typeof(TEvent);
        if (_asyncSubscriptions.ContainsKey(eventType))
        {
            _asyncSubscriptions[eventType].Remove(handler);
            if (_asyncSubscriptions[eventType].Count == 0)
            {
                _asyncSubscriptions.Remove(eventType);
            }
        }
    }

    // 获取事件对应的所有同步处理委托
    public List<Delegate> GetSyncHandlers<TEvent>() where TEvent : EventBase
    {
        var eventType = typeof(TEvent);
        return _syncSubscriptions.ContainsKey(eventType) ? _syncSubscriptions[eventType] : new List<Delegate>();
    }

    // 获取事件对应的所有异步处理委托
    public List<Delegate> GetAsyncHandlers<TEvent>() where TEvent : EventBase
    {
        var eventType = typeof(TEvent);
        return _asyncSubscriptions.ContainsKey(eventType) ? _asyncSubscriptions[eventType] : new List<Delegate>();
    }
}

EventBus核心类实现

EventBus核心类封装订阅、取消订阅、发布事件的方法,内部使用订阅管理器维护订阅关系:

public class EventBus
{
    private readonly SubscriptionManager _subscriptionManager = new SubscriptionManager();

    // 订阅同步事件
    public void Subscribe<TEvent>(EventHandler<TEvent> handler) where TEvent : EventBase
    {
        _subscriptionManager.AddSubscription(handler);
    }

    // 订阅异步事件
    public void Subscribe<TEvent>(AsyncEventHandler<TEvent> handler) where TEvent : EventBase
    {
        _subscriptionManager.AddSubscription(handler);
    }

    // 取消订阅同步事件
    public void Unsubscribe<TEvent>(EventHandler<TEvent> handler) where TEvent : EventBase
    {
        _subscriptionManager.RemoveSubscription(handler);
    }

    // 取消订阅异步事件
    public void Unsubscribe<TEvent>(AsyncEventHandler<TEvent> handler) where TEvent : EventBase
    {
        _subscriptionManager.RemoveSubscription(handler);
    }

    // 发布同步事件
    public void Publish<TEvent>(TEvent @event) where TEvent : EventBase
    {
        var handlers = _subscriptionManager.GetSyncHandlers<TEvent>();
        foreach (var handler in handlers)
        {
            if (handler is EventHandler<TEvent> eventHandler)
            {
                eventHandler(@event);
            }
        }
    }

    // 发布异步事件,等待所有订阅者处理完成
    public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : EventBase
    {
        // 先执行所有同步处理
        Publish(@event);
        // 再执行所有异步处理
        var asyncHandlers = _subscriptionManager.GetAsyncHandlers<TEvent>();
        var tasks = new List<Task>();
        foreach (var handler in asyncHandlers)
        {
            if (handler is AsyncEventHandler<TEvent> asyncHandler)
            {
                tasks.Add(asyncHandler(@event));
            }
        }
        await Task.WhenAll(tasks);
    }
}

使用示例

首先定义一个自定义事件,继承EventBase

// 用户注册事件
public class UserRegisteredEvent : EventBase
{
    public string UserName { get; set; }
    public string Email { get; set; }
}

然后定义事件订阅者处理方法:

public class UserEventHandler
{
    // 同步处理用户注册事件,比如记录日志
    public void HandleUserRegistered(UserRegisteredEvent @event)
    {
        Console.WriteLine($"同步处理:用户{@event.UserName}注册成功,邮箱:{@event.Email},事件ID:{@event.EventId}");
    }

    // 异步处理用户注册事件,比如发送欢迎邮件
    public async Task AsyncHandleUserRegistered(UserRegisteredEvent @event)
    {
        await Task.Delay(1000); // 模拟异步操作耗时
        Console.WriteLine($"异步处理:已向用户{@event.UserName}的邮箱{@event.Email}发送欢迎邮件");
    }
}

最后在业务代码中使用EventBus:

class Program
{
    static async Task Main(string[] args)
    {
        var eventBus = new EventBus();
        var userEventHandler = new UserEventHandler();

        // 订阅事件
        eventBus.Subscribe<UserRegisteredEvent>(userEventHandler.HandleUserRegistered);
        eventBus.Subscribe<UserRegisteredEvent>(userEventHandler.AsyncHandleUserRegistered);

        // 发布事件
        var registerEvent = new UserRegisteredEvent
        {
            UserName = "张三",
            Email = "test@ipipp.com"
        };
        await eventBus.PublishAsync(registerEvent);

        // 取消订阅
        eventBus.Unsubscribe<UserRegisteredEvent>(userEventHandler.HandleUserRegistered);
        eventBus.Unsubscribe<UserRegisteredEvent>(userEventHandler.AsyncHandleUserRegistered);
    }
}

注意事项

  • 订阅的处理方法如果抛出异常,会影响后续订阅者的执行,实际使用中可以在EventBus中添加异常处理逻辑,避免单个订阅者异常导致整个事件发布流程中断。
  • 如果事件处理逻辑比较耗时,建议优先使用异步订阅,避免阻塞事件发布的线程。
  • 订阅管理器中的委托存储需要注意内存泄漏问题,如果订阅者是短生命周期对象,需要及时取消订阅,避免对象无法被垃圾回收。
  • 可以根据需求扩展EventBus的功能,比如添加事件过滤、事件优先级、事件持久化等特性。

EventBusC#事件发布订阅进程内消息总线修改时间:2026-06-26 03:21:38

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。