在C#应用开发中,组件之间的直接调用往往会导致代码耦合度升高,后期维护成本增加。进程内事件发布订阅的消息总线EventBus可以有效解决这个问题,通过统一的事件分发机制,让事件的发布者和订阅者不需要直接依赖对方,只需要关注事件本身即可。下面我们通过完整的实现过程来掌握EventBus的构建方法。

核心设计思路
EventBus的核心功能是管理事件的订阅关系,并在事件发布时通知所有对应的订阅者。整体设计可以分为几个核心部分:事件基类、订阅管理器、EventBus核心类、订阅处理委托。事件基类是所有自定义事件的父类,订阅管理器负责维护事件类型和对应订阅者的映射关系,EventBus对外提供订阅、取消订阅、发布事件的接口,订阅处理委托用于定义订阅者处理事件的方法签名。
基础类型定义
首先定义事件基类,所有自定义事件都需要继承这个基类,这样可以统一事件的类型处理:
// 事件基类,所有自定义事件继承此类
public abstract class EventBase
{
// 事件唯一标识,可用于跟踪事件
public string EventId { get; set; } = Guid.NewGuid().ToString();
// 事件发生时间
public DateTime OccurredTime { get; set; } = DateTime.Now;
}
接着定义订阅者的处理委托,支持同步和异步两种处理方式:
// 同步事件处理委托 public delegate void EventHandler<TEvent>(TEvent @event) where TEvent : EventBase; // 异步事件处理委托 public delegate Task AsyncEventHandler<TEvent>(TEvent @event) where TEvent : EventBase;
订阅管理器实现
订阅管理器负责维护事件类型和对应处理方法的映射,需要支持添加订阅、移除订阅、获取订阅列表的功能:
public class SubscriptionManager
{
// 存储事件类型对应的同步处理委托列表
private readonly Dictionary<Type, List<Delegate>> _syncSubscriptions = new Dictionary<Type, List<Delegate>>();
// 存储事件类型对应的异步处理委托列表
private readonly Dictionary<Type, List<Delegate>> _asyncSubscriptions = new Dictionary<Type, List<Delegate>>();
// 添加同步订阅
public void AddSubscription<TEvent>(EventHandler<TEvent> handler) where TEvent : EventBase
{
var eventType = typeof(TEvent);
if (!_syncSubscriptions.ContainsKey(eventType))
{
_syncSubscriptions[eventType] = new List<Delegate>();
}
// 避免重复订阅同一个处理方法
if (!_syncSubscriptions[eventType].Contains(handler))
{
_syncSubscriptions[eventType].Add(handler);
}
}
// 添加异步订阅
public void AddSubscription<TEvent>(AsyncEventHandler<TEvent> handler) where TEvent : EventBase
{
var eventType = typeof(TEvent);
if (!_asyncSubscriptions.ContainsKey(eventType))
{
_asyncSubscriptions[eventType] = new List<Delegate>();
}
if (!_asyncSubscriptions[eventType].Contains(handler))
{
_asyncSubscriptions[eventType].Add(handler);
}
}
// 移除同步订阅
public void RemoveSubscription<TEvent>(EventHandler<TEvent> handler) where TEvent : EventBase
{
var eventType = typeof(TEvent);
if (_syncSubscriptions.ContainsKey(eventType))
{
_syncSubscriptions[eventType].Remove(handler);
if (_syncSubscriptions[eventType].Count == 0)
{
_syncSubscriptions.Remove(eventType);
}
}
}
// 移除异步订阅
public void RemoveSubscription<TEvent>(AsyncEventHandler<TEvent> handler) where TEvent : EventBase
{
var eventType = typeof(TEvent);
if (_asyncSubscriptions.ContainsKey(eventType))
{
_asyncSubscriptions[eventType].Remove(handler);
if (_asyncSubscriptions[eventType].Count == 0)
{
_asyncSubscriptions.Remove(eventType);
}
}
}
// 获取事件对应的所有同步处理委托
public List<Delegate> GetSyncHandlers<TEvent>() where TEvent : EventBase
{
var eventType = typeof(TEvent);
return _syncSubscriptions.ContainsKey(eventType) ? _syncSubscriptions[eventType] : new List<Delegate>();
}
// 获取事件对应的所有异步处理委托
public List<Delegate> GetAsyncHandlers<TEvent>() where TEvent : EventBase
{
var eventType = typeof(TEvent);
return _asyncSubscriptions.ContainsKey(eventType) ? _asyncSubscriptions[eventType] : new List<Delegate>();
}
}
EventBus核心类实现
EventBus核心类封装订阅、取消订阅、发布事件的方法,内部使用订阅管理器维护订阅关系:
public class EventBus
{
private readonly SubscriptionManager _subscriptionManager = new SubscriptionManager();
// 订阅同步事件
public void Subscribe<TEvent>(EventHandler<TEvent> handler) where TEvent : EventBase
{
_subscriptionManager.AddSubscription(handler);
}
// 订阅异步事件
public void Subscribe<TEvent>(AsyncEventHandler<TEvent> handler) where TEvent : EventBase
{
_subscriptionManager.AddSubscription(handler);
}
// 取消订阅同步事件
public void Unsubscribe<TEvent>(EventHandler<TEvent> handler) where TEvent : EventBase
{
_subscriptionManager.RemoveSubscription(handler);
}
// 取消订阅异步事件
public void Unsubscribe<TEvent>(AsyncEventHandler<TEvent> handler) where TEvent : EventBase
{
_subscriptionManager.RemoveSubscription(handler);
}
// 发布同步事件
public void Publish<TEvent>(TEvent @event) where TEvent : EventBase
{
var handlers = _subscriptionManager.GetSyncHandlers<TEvent>();
foreach (var handler in handlers)
{
if (handler is EventHandler<TEvent> eventHandler)
{
eventHandler(@event);
}
}
}
// 发布异步事件,等待所有订阅者处理完成
public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : EventBase
{
// 先执行所有同步处理
Publish(@event);
// 再执行所有异步处理
var asyncHandlers = _subscriptionManager.GetAsyncHandlers<TEvent>();
var tasks = new List<Task>();
foreach (var handler in asyncHandlers)
{
if (handler is AsyncEventHandler<TEvent> asyncHandler)
{
tasks.Add(asyncHandler(@event));
}
}
await Task.WhenAll(tasks);
}
}
使用示例
首先定义一个自定义事件,继承EventBase:
// 用户注册事件
public class UserRegisteredEvent : EventBase
{
public string UserName { get; set; }
public string Email { get; set; }
}
然后定义事件订阅者处理方法:
public class UserEventHandler
{
// 同步处理用户注册事件,比如记录日志
public void HandleUserRegistered(UserRegisteredEvent @event)
{
Console.WriteLine($"同步处理:用户{@event.UserName}注册成功,邮箱:{@event.Email},事件ID:{@event.EventId}");
}
// 异步处理用户注册事件,比如发送欢迎邮件
public async Task AsyncHandleUserRegistered(UserRegisteredEvent @event)
{
await Task.Delay(1000); // 模拟异步操作耗时
Console.WriteLine($"异步处理:已向用户{@event.UserName}的邮箱{@event.Email}发送欢迎邮件");
}
}
最后在业务代码中使用EventBus:
class Program
{
static async Task Main(string[] args)
{
var eventBus = new EventBus();
var userEventHandler = new UserEventHandler();
// 订阅事件
eventBus.Subscribe<UserRegisteredEvent>(userEventHandler.HandleUserRegistered);
eventBus.Subscribe<UserRegisteredEvent>(userEventHandler.AsyncHandleUserRegistered);
// 发布事件
var registerEvent = new UserRegisteredEvent
{
UserName = "张三",
Email = "test@ipipp.com"
};
await eventBus.PublishAsync(registerEvent);
// 取消订阅
eventBus.Unsubscribe<UserRegisteredEvent>(userEventHandler.HandleUserRegistered);
eventBus.Unsubscribe<UserRegisteredEvent>(userEventHandler.AsyncHandleUserRegistered);
}
}
注意事项
- 订阅的处理方法如果抛出异常,会影响后续订阅者的执行,实际使用中可以在EventBus中添加异常处理逻辑,避免单个订阅者异常导致整个事件发布流程中断。
- 如果事件处理逻辑比较耗时,建议优先使用异步订阅,避免阻塞事件发布的线程。
- 订阅管理器中的委托存储需要注意内存泄漏问题,如果订阅者是短生命周期对象,需要及时取消订阅,避免对象无法被垃圾回收。
- 可以根据需求扩展EventBus的功能,比如添加事件过滤、事件优先级、事件持久化等特性。