CQRS模式的核心思想是将系统的读操作(查询)和写操作(命令)分离,让两者可以使用不同的模型、存储甚至服务来实现,从而提升系统的可维护性和扩展性。很多C#开发者实现CQRS时会选择MediatR库,但手动实现可以让我们更清晰地理解模式本身的运行逻辑,也能更灵活地适配特殊业务场景。

CQRS的核心概念
手动实现CQRS前,我们需要先明确几个核心组件的定义:
- 命令(Command):代表一次写操作请求,比如创建用户、修改订单状态,只包含执行写操作需要的参数,不包含返回值。
- 命令处理器(CommandHandler):负责处理对应的命令,执行具体的写逻辑,比如操作数据库完成数据变更。
- 查询(Query):代表一次读操作请求,比如获取用户列表、查询订单详情,包含查询需要的过滤参数。
- 查询处理器(QueryHandler):负责处理对应的查询,执行具体的读逻辑,返回对应的查询结果。
- 分发器(Dispatcher):负责根据传入的命令或查询,找到对应的处理器并执行,是整个流程的调度中心。
基础模型定义
首先我们定义命令和查询的基础接口,所有具体的命令、查询都需要实现这些接口,方便后续分发器统一处理。
// 命令基础接口,没有返回值
public interface ICommand
{
}
// 带返回值的命令基础接口,TResult是返回值类型
public interface ICommand<TResult>
{
}
// 查询基础接口,TResult是查询结果类型
public interface IQuery<TResult>
{
}
接下来定义命令处理器和查询处理器的接口,约束处理器的实现规范。
// 无返回值命令处理器接口
public interface ICommandHandler<TCommand> where TCommand : ICommand
{
Task Handle(TCommand command, CancellationToken cancellationToken = default);
}
// 带返回值命令处理器接口
public interface ICommandHandler<TCommand, TResult> where TCommand : ICommand<TResult>
{
Task<TResult> Handle(TCommand command, CancellationToken cancellationToken = default);
}
// 查询处理器接口
public interface IQueryHandler<TQuery, TResult> where TQuery : IQuery<TResult>
{
Task<TResult> Handle(TQuery query, CancellationToken cancellationToken = default);
}
实现具体命令和查询
我们以一个简单的用户管理场景为例,实现创建用户的命令和查询用户列表的查询。
创建用户命令
// 创建用户的命令,携带用户名和邮箱参数
public class CreateUserCommand : ICommand<int>
{
public string UserName { get; set; }
public string Email { get; set; }
}
查询用户列表查询
// 查询用户列表的查询,支持按用户名模糊搜索
public class GetUserListQuery : IQuery<List<UserDto>>
{
public string KeyWord { get; set; }
}
// 用户查询返回的数据传输对象
public class UserDto
{
public int Id { get; set; }
public string UserName { get; set; }
public string Email { get; set; }
}
实现处理器逻辑
接下来为上面的命令和查询编写对应的处理器,这里我们假设有一个简单的内存用户存储来模拟数据库操作。
// 模拟用户存储
public static class UserStorage
{
public static List<UserDto> Users = new List<UserDto>();
public static int NextId = 1;
}
// 创建用户命令处理器
public class CreateUserCommandHandler : ICommandHandler<CreateUserCommand, int>
{
public Task<int> Handle(CreateUserCommand command, CancellationToken cancellationToken = default)
{
var user = new UserDto
{
Id = UserStorage.NextId,
UserName = command.UserName,
Email = command.Email
};
UserStorage.Users.Add(user);
UserStorage.NextId++;
return Task.FromResult(user.Id);
}
}
// 查询用户列表处理器
public class GetUserListQueryHandler : IQueryHandler<GetUserListQuery, List<UserDto>>
{
public Task<List<UserDto>> Handle(GetUserListQuery query, CancellationToken cancellationToken = default)
{
var result = string.IsNullOrEmpty(query.KeyWord)
? UserStorage.Users
: UserStorage.Users.Where(u => u.UserName.Contains(query.KeyWord)).ToList();
return Task.FromResult(result);
}
}
实现分发器
分发器的作用是根据传入的命令或查询,找到对应的处理器并执行。我们可以通过字典维护命令/查询类型到处理器的映射关系来实现。
public class Dispatcher
{
// 存储命令类型到处理器的映射
private readonly Dictionary<Type, object> _handlers = new Dictionary<Type, object>();
// 注册命令处理器(无返回值)
public void RegisterCommandHandler<TCommand>(ICommandHandler<TCommand> handler) where TCommand : ICommand
{
_handlers[typeof(TCommand)] = handler;
}
// 注册带返回值的命令处理器
public void RegisterCommandHandler<TCommand, TResult>(ICommandHandler<TCommand, TResult> handler)
where TCommand : ICommand<TResult>
{
_handlers[typeof(TCommand)] = handler;
}
// 注册查询处理器
public void RegisterQueryHandler<TQuery, TResult>(IQueryHandler<TQuery, TResult> handler)
where TQuery : IQuery<TResult>
{
_handlers[typeof(TQuery)] = handler;
}
// 执行无返回值命令
public async Task Send(ICommand command, CancellationToken cancellationToken = default)
{
var commandType = command.GetType();
if (!_handlers.ContainsKey(commandType))
{
throw new Exception($"未找到命令{commandType.Name}对应的处理器");
}
dynamic handler = _handlers[commandType];
await handler.Handle((dynamic)command, cancellationToken);
}
// 执行带返回值的命令
public async Task<TResult> Send<TResult>(ICommand<TResult> command, CancellationToken cancellationToken = default)
{
var commandType = command.GetType();
if (!_handlers.ContainsKey(commandType))
{
throw new Exception($"未找到命令{commandType.Name}对应的处理器");
}
dynamic handler = _handlers[commandType];
return await handler.Handle((dynamic)command, cancellationToken);
}
// 执行查询
public async Task<TResult> Query<TResult>(IQuery<TResult> query, CancellationToken cancellationToken = default)
{
var queryType = query.GetType();
if (!_handlers.ContainsKey(queryType))
{
throw new Exception($"未找到查询{queryType.Name}对应的处理器");
}
dynamic handler = _handlers[queryType];
return await handler.Handle((dynamic)query, cancellationToken);
}
}
整体流程验证
最后我们把所有组件串起来,验证手动实现的CQRS是否正常工作。
class Program
{
static async Task Main(string[] args)
{
// 初始化分发器
var dispatcher = new Dispatcher();
// 注册处理器
dispatcher.RegisterCommandHandler<CreateUserCommand, int>(new CreateUserCommandHandler());
dispatcher.RegisterQueryHandler<GetUserListQuery, List<UserDto>>(new GetUserListQueryHandler());
// 执行创建用户命令
var createCommand = new CreateUserCommand
{
UserName = "张三",
Email = "zhangsan@ipipp.com"
};
var userId = await dispatcher.Send(createCommand);
Console.WriteLine($"创建用户成功,用户ID为:{userId}");
// 执行查询用户列表查询
var query = new GetUserListQuery();
var userList = await dispatcher.Query(query);
Console.WriteLine($"当前用户总数:{userList.Count}");
foreach (var user in userList)
{
Console.WriteLine($"用户ID:{user.Id},用户名:{user.UserName},邮箱:{user.Email}");
}
}
}
扩展建议
上面的实现是最基础的版本,实际项目中可以根据需求扩展:
- 可以结合依赖注入框架,自动扫描并注册所有命令和查询处理器,不需要手动调用注册方法。
- 可以为命令和查询添加验证逻辑,比如参数合法性校验,在处理器执行前先完成验证。
- 如果写操作需要发消息通知其他服务,可以在命令处理器中添加消息发布的逻辑,实现事件驱动架构的扩展。
- 查询侧可以使用专门的读库,和写库分离,进一步提升查询性能,这也是CQRS模式的常见落地方式。