CQRS全称Command Query Responsibility Segregation,核心思想是将系统的写操作(命令)和读操作(查询)拆分为两个独立的模型,避免读写逻辑耦合带来的复杂度。在C#项目中实现CQRS,不需要依赖复杂的第三方框架,通过合理的分层设计就能完成基础落地。

CQRS的核心概念
首先需要明确CQRS中的两个核心角色:
- 命令(Command):代表对系统状态的修改操作,比如创建订单、更新用户信息等,命令只负责执行写操作,不返回业务数据,仅返回执行结果状态。
- 查询(Query):代表对系统数据的读取操作,比如获取订单列表、查询用户详情等,查询只返回数据,不修改系统状态。
两者完全分离后,读模型和写模型可以独立优化,比如写模型可以专注于业务逻辑校验和状态变更,读模型可以针对性做数据冗余、缓存优化,提升查询效率。
C#实现CQRS的基础分层
在C#项目中,我们可以将项目分为以下几个核心层:
- 领域层:存放核心业务实体、值对象、领域事件等,是读写模型共享的基础。
- 命令层:包含命令定义、命令处理器,负责处理所有写操作。
- 查询层:包含查询定义、查询处理器,负责处理所有读操作。
- 基础设施层:提供数据库访问、消息总线等通用能力。
命令模型的实现
定义命令
命令是一个简单的数据载体,只包含执行写操作所需的参数,不包含业务逻辑。以下是一个创建用户的命令示例:
// 创建用户命令
public class CreateUserCommand
{
// 用户姓名
public string UserName { get; set; }
// 用户邮箱
public string Email { get; set; }
// 用户密码
public string Password { get; set; }
}
定义命令处理器
命令处理器负责接收命令,执行对应的业务逻辑和状态变更操作。我们需要定义一个通用的命令处理器接口,再实现具体的处理逻辑:
// 命令处理器接口
public interface ICommandHandler<TCommand> where TCommand : class
{
// 处理命令,返回执行结果
Task<bool> HandleAsync(TCommand command);
}
// 创建用户命令处理器
public class CreateUserCommandHandler : ICommandHandler<CreateUserCommand>
{
private readonly UserWriteDbContext _writeDbContext;
public CreateUserCommandHandler(UserWriteDbContext writeDbContext)
{
_writeDbContext = writeDbContext;
}
public async Task<bool> HandleAsync(CreateUserCommand command)
{
// 业务逻辑校验:检查邮箱是否已存在
var existUser = await _writeDbContext.Users.FirstOrDefaultAsync(u => u.Email == command.Email);
if (existUser != null)
{
return false;
}
// 创建用户实体
var user = new User
{
Id = Guid.NewGuid(),
UserName = command.UserName,
Email = command.Email,
Password = command.Password,
CreateTime = DateTime.Now
};
// 保存到写库
_writeDbContext.Users.Add(user);
await _writeDbContext.SaveChangesAsync();
return true;
}
}
命令分发器
为了让命令能够路由到对应的处理器,我们可以实现一个简单的命令分发器:
// 命令分发器
public class CommandDispatcher
{
private readonly IServiceProvider _serviceProvider;
public CommandDispatcher(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public async Task<bool> DispatchAsync<TCommand>(TCommand command) where TCommand : class
{
// 从依赖注入容器中获取对应的命令处理器
var handler = _serviceProvider.GetService<ICommandHandler<TCommand>>();
if (handler == null)
{
throw new ArgumentException($"未找到{typeof(TCommand).Name}对应的处理器");
}
return await handler.HandleAsync(command);
}
}
查询模型的实现
定义查询
查询同样是一个数据载体,包含查询所需的参数,比如分页参数、筛选条件等:
// 获取用户列表查询
public class GetUserListQuery
{
// 页码
public int PageIndex { get; set; } = 1;
// 每页条数
public int PageSize { get; set; } = 10;
// 姓名筛选关键字
public string Keyword { get; set; }
}
定义查询处理器
查询处理器负责接收查询参数,从读库中读取数据并返回结果,不包含写逻辑:
// 查询处理器接口
public interface IQueryHandler<TQuery, TResult> where TQuery : class
{
// 处理查询,返回对应结果
Task<TResult> HandleAsync(TQuery query);
}
// 用户列表查询结果
public class UserListResult
{
public List<UserDto> Users { get; set; }
public int TotalCount { get; set; }
}
// 获取用户列表查询处理器
public class GetUserListQueryHandler : IQueryHandler<GetUserListQuery, UserListResult>
{
private readonly UserReadDbContext _readDbContext;
public GetUserListQueryHandler(UserReadDbContext readDbContext)
{
_readDbContext = readDbContext;
}
public async Task<UserListResult> HandleAsync(GetUserListQuery query)
{
// 构建查询条件
var queryable = _readDbContext.UserDtos.AsQueryable();
if (!string.IsNullOrEmpty(query.Keyword))
{
queryable = queryable.Where(u => u.UserName.Contains(query.Keyword));
}
// 获取总条数
var totalCount = await queryable.CountAsync();
// 分页查询
var users = await queryable
.Skip((query.PageIndex - 1) * query.PageSize)
.Take(query.PageSize)
.ToListAsync();
return new UserListResult
{
Users = users,
TotalCount = totalCount
};
}
}
查询分发器
和命令分发器类似,查询也需要一个分发器来路由到对应的处理器:
// 查询分发器
public class QueryDispatcher
{
private readonly IServiceProvider _serviceProvider;
public QueryDispatcher(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public async Task<TResult> DispatchAsync<TQuery, TResult>(TQuery query) where TQuery : class
{
// 获取对应的查询处理器
var handler = _serviceProvider.GetService<IQueryHandler<TQuery, TResult>>();
if (handler == null)
{
throw new ArgumentException($"未找到{typeof(TQuery).Name}对应的处理器");
}
return await handler.HandleAsync(query);
}
}
读写模型的数据同步
当写模型更新数据后,需要把变更同步到读模型,保证查询数据的准确性。常见的同步方式有两种:
- 简单场景:直接在命令处理器中写完写库后,同步更新读库,适合数据一致性要求高的场景。
- 复杂场景:通过领域事件异步同步,写模型执行完成后发布领域事件,事件处理器监听事件后更新读库,适合高并发、读写库分离的场景。
以下是基于领域事件的同步示例:
// 用户创建领域事件
public class UserCreatedEvent
{
public Guid UserId { get; set; }
public string UserName { get; set; }
public string Email { get; set; }
public DateTime CreateTime { get; set; }
}
// 修改创建用户命令处理器,执行完成后发布事件
public class CreateUserCommandHandler : ICommandHandler<CreateUserCommand>
{
private readonly UserWriteDbContext _writeDbContext;
private readonly IEventPublisher _eventPublisher;
public CreateUserCommandHandler(UserWriteDbContext writeDbContext, IEventPublisher eventPublisher)
{
_writeDbContext = writeDbContext;
_eventPublisher = eventPublisher;
}
public async Task<bool> HandleAsync(CreateUserCommand command)
{
// 原有校验和创建逻辑...
var user = new User
{
Id = Guid.NewGuid(),
UserName = command.UserName,
Email = command.Email,
Password = command.Password,
CreateTime = DateTime.Now
};
_writeDbContext.Users.Add(user);
await _writeDbContext.SaveChangesAsync();
// 发布用户创建事件
await _eventPublisher.PublishAsync(new UserCreatedEvent
{
UserId = user.Id,
UserName = user.UserName,
Email = user.Email,
CreateTime = user.CreateTime
});
return true;
}
}
// 用户创建事件处理器,更新读库
public class UserCreatedEventHandler : IEventHandler<UserCreatedEvent>
{
private readonly UserReadDbContext _readDbContext;
public UserCreatedEventHandler(UserReadDbContext readDbContext)
{
_readDbContext = readDbContext;
}
public async Task HandleAsync(UserCreatedEvent @event)
{
// 向读库插入用户DTO
_readDbContext.UserDtos.Add(new UserDto
{
Id = @event.UserId,
UserName = @event.UserName,
Email = @event.Email,
CreateTime = @event.CreateTime
});
await _readDbContext.SaveChangesAsync();
}
}
实际使用的注意事项
- 不是所有项目都需要CQRS,小型简单项目使用CQRS会增加不必要的复杂度,适合中大型、读写逻辑差异大、并发高的项目。
- 读写模型可以共用同一个数据库,也可以分离为不同的数据库,根据项目需求选择,初期可以共用数据库降低部署成本。
- 命令和查询的命名要清晰,命令以动词开头比如Create、Update、Delete,查询以Get、Query开头,方便后续维护。
- 命令处理器中不要包含查询逻辑,查询处理器中不要包含写逻辑,严格遵守职责分离的原则。
总结
在C#中实现CQRS架构,核心是将命令和查询的逻辑、模型完全分离,通过命令处理器处理写操作,查询处理器处理读操作,再通过合适的方式同步读写数据。这种架构能有效降低系统复杂度,提升读写性能,适合需要长期迭代的中大型项目。开发者可以根据项目的实际需求,灵活调整实现的细节,不需要完全照搬复杂的CQRS实现方案。