在分布式系统中,当业务操作需要同时更新数据库和发送消息到消息队列时,很容易出现数据库提交成功但消息发送失败,或者消息发送成功但数据库回滚的不一致问题。事务性发件箱模式(Outbox模式)通过将消息先持久化到本地数据库的发件箱表中,和业务操作在同一个事务内完成,之后再异步将发件箱中的消息投递到消息队列,从而保证了业务操作和消息的原子性。

核心设计思路
结合EF Core实现Outbox模式的核心流程可以分为三个步骤:
- 定义发件箱表结构,用于存储待发送的消息内容和状态
- 在业务操作的事务中,同时向业务表和发件箱表插入数据,保证两者在同一个事务内提交
- 启动独立的后台任务,定期扫描发件箱表中未发送的消息,投递到消息队列后更新消息状态
发件箱表结构定义
首先我们需要定义发件箱实体和对应的数据库表,包含消息唯一标识、消息类型、消息内容、发送状态、创建时间、重试次数等字段:
using System;
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
namespace OutboxDemo.Entities
{
/// <summary>
/// 发件箱消息实体
/// </summary>
public class OutboxMessage
{
/// <summary>
/// 消息唯一标识
/// </summary>
[Key]
public Guid Id { get; set; }
/// <summary>
/// 消息类型,用于区分不同业务的消息
/// </summary>
[Required]
[MaxLength(200)]
public string MessageType { get; set; }
/// <summary>
/// 消息内容,通常为序列化后的JSON字符串
/// </summary>
[Required]
public string Payload { get; set; }
/// <summary>
/// 消息是否已发送
/// </summary>
public bool IsSent { get; set; }
/// <summary>
/// 消息创建时间
/// </summary>
public DateTime CreatedAt { get; set; }
/// <summary>
/// 消息发送时间
/// </summary>
public DateTime? SentAt { get; set; }
/// <summary>
/// 重试次数
/// </summary>
public int RetryCount { get; set; }
}
}
然后在EF Core的DbContext中配置该实体对应的表:
using Microsoft.EntityFrameworkCore;
using OutboxDemo.Entities;
namespace OutboxDemo.Data
{
public class AppDbContext : DbContext
{
public AppDbContext(DbContextOptions<AppDbContext> options) : base(options)
{
}
public DbSet<OutboxMessage> OutboxMessages { get; set; }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
// 配置发件箱表
modelBuilder.Entity<OutboxMessage>(entity =>
{
entity.ToTable("OutboxMessages");
entity.HasKey(e => e.Id);
entity.Property(e => e.MessageType).IsRequired().HasMaxLength(200);
entity.Property(e => e.Payload).IsRequired();
entity.Property(e => e.IsSent).HasDefaultValue(false);
entity.Property(e => e.CreatedAt).HasDefaultValueSql("GETDATE()");
entity.Property(e => e.RetryCount).HasDefaultValue(0);
// 为未发送的消息创建索引,提升查询效率
entity.HasIndex(e => new { e.IsSent, e.CreatedAt });
});
}
}
}
业务操作中写入发件箱
接下来在业务处理逻辑中,我们将业务数据更新和发件箱消息插入放在同一个事务中,确保两者要么同时成功,要么同时失败:
using Microsoft.EntityFrameworkCore;
using OutboxDemo.Entities;
using System;
using System.Text.Json;
using System.Threading.Tasks;
namespace OutboxDemo.Services
{
public class OrderService
{
private readonly AppDbContext _dbContext;
public OrderService(AppDbContext dbContext)
{
_dbContext = dbContext;
}
/// <summary>
/// 创建订单并发送订单创建消息
/// </summary>
public async Task CreateOrderAsync(string orderNo, decimal amount)
{
// 开启事务,保证业务操作和发件箱写入原子性
using var transaction = await _dbContext.Database.BeginTransactionAsync();
try
{
// 1. 处理业务操作,这里模拟订单表插入
// 假设已有Order实体和对应的DbSet
var order = new Order
{
Id = Guid.NewGuid(),
OrderNo = orderNo,
Amount = amount,
CreatedAt = DateTime.Now
};
_dbContext.Orders.Add(order);
// 2. 构造发件箱消息
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
MessageType = "OrderCreated",
// 序列化消息内容
Payload = JsonSerializer.Serialize(new
{
OrderId = order.Id,
OrderNo = order.OrderNo,
Amount = order.Amount
}),
IsSent = false,
CreatedAt = DateTime.Now,
RetryCount = 0
};
_dbContext.OutboxMessages.Add(outboxMessage);
// 3. 提交事务,业务数据和发件箱消息同时入库
await _dbContext.SaveChangesAsync();
await transaction.CommitAsync();
}
catch (Exception)
{
// 出现异常回滚事务
await transaction.RollbackAsync();
throw;
}
}
}
// 模拟订单实体
public class Order
{
public Guid Id { get; set; }
public string OrderNo { get; set; }
public decimal Amount { get; set; }
public DateTime CreatedAt { get; set; }
}
}
异步投递发件箱消息
发件箱消息写入数据库后,我们需要一个后台任务来定期扫描未发送的消息,投递到消息队列(这里以模拟消息队列发送为例),然后更新消息状态:
using Microsoft.EntityFrameworkCore;
using OutboxDemo.Entities;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace OutboxDemo.BackgroundServices
{
public class OutboxDispatcher : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly PeriodicTimer _timer;
public OutboxDispatcher(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
// 每5秒执行一次扫描
_timer = new PeriodicTimer(TimeSpan.FromSeconds(5));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (await _timer.WaitForNextTickAsync(stoppingToken) && !stoppingToken.IsCancellationRequested)
{
await ProcessOutboxMessagesAsync();
}
}
private async Task ProcessOutboxMessagesAsync()
{
// 创建作用域获取DbContext,避免线程安全问题
using var scope = _serviceProvider.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
// 查询未发送的消息,每次最多处理100条
var unsentMessages = await dbContext.OutboxMessages
.Where(m => !m.IsSent)
.OrderBy(m => m.CreatedAt)
.Take(100)
.ToListAsync();
foreach (var message in unsentMessages)
{
try
{
// 模拟消息投递到消息队列的逻辑
// 实际项目中可以替换为RabbitMQ、Kafka等客户端的发送代码
await SendToMessageQueueAsync(message.MessageType, message.Payload);
// 投递成功,更新消息状态
message.IsSent = true;
message.SentAt = DateTime.Now;
await dbContext.SaveChangesAsync();
}
catch (Exception)
{
// 投递失败,增加重试次数
message.RetryCount += 1;
await dbContext.SaveChangesAsync();
// 实际项目中可以添加重试次数上限的判断,超过上限后记录日志或转入死信队列
}
}
}
private Task SendToMessageQueueAsync(string messageType, string payload)
{
// 模拟消息发送,实际项目中替换为真实的消息队列发送逻辑
Console.WriteLine($"发送消息,类型:{messageType},内容:{payload}");
return Task.CompletedTask;
}
}
}
注册后台服务
最后需要在Program.cs中注册DbContext和后台服务,确保程序启动时自动运行消息投递任务:
using Microsoft.EntityFrameworkCore;
using OutboxDemo.BackgroundServices;
using OutboxDemo.Data;
var builder = WebApplication.CreateBuilder(args);
// 注册DbContext,使用SQL Server数据库
builder.Services.AddDbContext<AppDbContext>(options =>
options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection")));
// 注册订单服务
builder.Services.AddScoped<OrderService>();
// 注册Outbox后台投递服务
builder.Services.AddHostedService<OutboxDispatcher>();
var app = builder.Build();
// 省略其他中间件配置
app.Run();
注意事项
在实际落地过程中,还需要注意以下几点:
- 发件箱表的索引优化,针对未发送消息的查询需要建立合适的索引,避免全表扫描影响性能
- 消息投递的幂等性处理,消息队列的消费端需要支持幂等,避免消息重复投递导致业务异常
- 重试策略设计,对于发送失败的消息需要有合理的重试机制,超过重试上限后需要转入死信队列人工处理
- 大消息处理,如果消息内容过大,可以考虑将消息内容存储到对象存储,发件箱只存储消息的引用地址