导读:本期聚焦于小伙伴创作的《C#如何结合EF Core和Outbox模式实现事务性发件箱模式方法》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《C#如何结合EF Core和Outbox模式实现事务性发件箱模式方法》有用,将其分享出去将是对创作者最好的鼓励。

在分布式系统中,当业务操作需要同时更新数据库和发送消息到消息队列时,很容易出现数据库提交成功但消息发送失败,或者消息发送成功但数据库回滚的不一致问题。事务性发件箱模式(Outbox模式)通过将消息先持久化到本地数据库的发件箱表中,和业务操作在同一个事务内完成,之后再异步将发件箱中的消息投递到消息队列,从而保证了业务操作和消息的原子性。

C#如何结合EF Core和Outbox模式实现事务性发件箱模式方法

核心设计思路

结合EF Core实现Outbox模式的核心流程可以分为三个步骤:

  • 定义发件箱表结构,用于存储待发送的消息内容和状态
  • 在业务操作的事务中,同时向业务表和发件箱表插入数据,保证两者在同一个事务内提交
  • 启动独立的后台任务,定期扫描发件箱表中未发送的消息,投递到消息队列后更新消息状态

发件箱表结构定义

首先我们需要定义发件箱实体和对应的数据库表,包含消息唯一标识、消息类型、消息内容、发送状态、创建时间、重试次数等字段:

using System;
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;

namespace OutboxDemo.Entities
{
    /// <summary>
    /// 发件箱消息实体
    /// </summary>
    public class OutboxMessage
    {
        /// <summary>
        /// 消息唯一标识
        /// </summary>
        [Key]
        public Guid Id { get; set; }

        /// <summary>
        /// 消息类型,用于区分不同业务的消息
        /// </summary>
        [Required]
        [MaxLength(200)]
        public string MessageType { get; set; }

        /// <summary>
        /// 消息内容,通常为序列化后的JSON字符串
        /// </summary>
        [Required]
        public string Payload { get; set; }

        /// <summary>
        /// 消息是否已发送
        /// </summary>
        public bool IsSent { get; set; }

        /// <summary>
        /// 消息创建时间
        /// </summary>
        public DateTime CreatedAt { get; set; }

        /// <summary>
        /// 消息发送时间
        /// </summary>
        public DateTime? SentAt { get; set; }

        /// <summary>
        /// 重试次数
        /// </summary>
        public int RetryCount { get; set; }
    }
}

然后在EF Core的DbContext中配置该实体对应的表:

using Microsoft.EntityFrameworkCore;
using OutboxDemo.Entities;

namespace OutboxDemo.Data
{
    public class AppDbContext : DbContext
    {
        public AppDbContext(DbContextOptions<AppDbContext> options) : base(options)
        {
        }

        public DbSet<OutboxMessage> OutboxMessages { get; set; }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            base.OnModelCreating(modelBuilder);
            // 配置发件箱表
            modelBuilder.Entity<OutboxMessage>(entity =>
            {
                entity.ToTable("OutboxMessages");
                entity.HasKey(e => e.Id);
                entity.Property(e => e.MessageType).IsRequired().HasMaxLength(200);
                entity.Property(e => e.Payload).IsRequired();
                entity.Property(e => e.IsSent).HasDefaultValue(false);
                entity.Property(e => e.CreatedAt).HasDefaultValueSql("GETDATE()");
                entity.Property(e => e.RetryCount).HasDefaultValue(0);
                // 为未发送的消息创建索引,提升查询效率
                entity.HasIndex(e => new { e.IsSent, e.CreatedAt });
            });
        }
    }
}

业务操作中写入发件箱

接下来在业务处理逻辑中,我们将业务数据更新和发件箱消息插入放在同一个事务中,确保两者要么同时成功,要么同时失败:

using Microsoft.EntityFrameworkCore;
using OutboxDemo.Entities;
using System;
using System.Text.Json;
using System.Threading.Tasks;

namespace OutboxDemo.Services
{
    public class OrderService
    {
        private readonly AppDbContext _dbContext;

        public OrderService(AppDbContext dbContext)
        {
            _dbContext = dbContext;
        }

        /// <summary>
        /// 创建订单并发送订单创建消息
        /// </summary>
        public async Task CreateOrderAsync(string orderNo, decimal amount)
        {
            // 开启事务,保证业务操作和发件箱写入原子性
            using var transaction = await _dbContext.Database.BeginTransactionAsync();
            try
            {
                // 1. 处理业务操作,这里模拟订单表插入
                // 假设已有Order实体和对应的DbSet
                var order = new Order
                {
                    Id = Guid.NewGuid(),
                    OrderNo = orderNo,
                    Amount = amount,
                    CreatedAt = DateTime.Now
                };
                _dbContext.Orders.Add(order);

                // 2. 构造发件箱消息
                var outboxMessage = new OutboxMessage
                {
                    Id = Guid.NewGuid(),
                    MessageType = "OrderCreated",
                    // 序列化消息内容
                    Payload = JsonSerializer.Serialize(new
                    {
                        OrderId = order.Id,
                        OrderNo = order.OrderNo,
                        Amount = order.Amount
                    }),
                    IsSent = false,
                    CreatedAt = DateTime.Now,
                    RetryCount = 0
                };
                _dbContext.OutboxMessages.Add(outboxMessage);

                // 3. 提交事务,业务数据和发件箱消息同时入库
                await _dbContext.SaveChangesAsync();
                await transaction.CommitAsync();
            }
            catch (Exception)
            {
                // 出现异常回滚事务
                await transaction.RollbackAsync();
                throw;
            }
        }
    }

    // 模拟订单实体
    public class Order
    {
        public Guid Id { get; set; }
        public string OrderNo { get; set; }
        public decimal Amount { get; set; }
        public DateTime CreatedAt { get; set; }
    }
}

异步投递发件箱消息

发件箱消息写入数据库后,我们需要一个后台任务来定期扫描未发送的消息,投递到消息队列(这里以模拟消息队列发送为例),然后更新消息状态:

using Microsoft.EntityFrameworkCore;
using OutboxDemo.Entities;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace OutboxDemo.BackgroundServices
{
    public class OutboxDispatcher : BackgroundService
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly PeriodicTimer _timer;

        public OutboxDispatcher(IServiceProvider serviceProvider)
        {
            _serviceProvider = serviceProvider;
            // 每5秒执行一次扫描
            _timer = new PeriodicTimer(TimeSpan.FromSeconds(5));
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (await _timer.WaitForNextTickAsync(stoppingToken) && !stoppingToken.IsCancellationRequested)
            {
                await ProcessOutboxMessagesAsync();
            }
        }

        private async Task ProcessOutboxMessagesAsync()
        {
            // 创建作用域获取DbContext,避免线程安全问题
            using var scope = _serviceProvider.CreateScope();
            var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();

            // 查询未发送的消息,每次最多处理100条
            var unsentMessages = await dbContext.OutboxMessages
                .Where(m => !m.IsSent)
                .OrderBy(m => m.CreatedAt)
                .Take(100)
                .ToListAsync();

            foreach (var message in unsentMessages)
            {
                try
                {
                    // 模拟消息投递到消息队列的逻辑
                    // 实际项目中可以替换为RabbitMQ、Kafka等客户端的发送代码
                    await SendToMessageQueueAsync(message.MessageType, message.Payload);

                    // 投递成功,更新消息状态
                    message.IsSent = true;
                    message.SentAt = DateTime.Now;
                    await dbContext.SaveChangesAsync();
                }
                catch (Exception)
                {
                    // 投递失败,增加重试次数
                    message.RetryCount += 1;
                    await dbContext.SaveChangesAsync();
                    // 实际项目中可以添加重试次数上限的判断,超过上限后记录日志或转入死信队列
                }
            }
        }

        private Task SendToMessageQueueAsync(string messageType, string payload)
        {
            // 模拟消息发送,实际项目中替换为真实的消息队列发送逻辑
            Console.WriteLine($"发送消息,类型:{messageType},内容:{payload}");
            return Task.CompletedTask;
        }
    }
}

注册后台服务

最后需要在Program.cs中注册DbContext和后台服务,确保程序启动时自动运行消息投递任务:

using Microsoft.EntityFrameworkCore;
using OutboxDemo.BackgroundServices;
using OutboxDemo.Data;

var builder = WebApplication.CreateBuilder(args);

// 注册DbContext,使用SQL Server数据库
builder.Services.AddDbContext<AppDbContext>(options =>
    options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection")));

// 注册订单服务
builder.Services.AddScoped<OrderService>();

// 注册Outbox后台投递服务
builder.Services.AddHostedService<OutboxDispatcher>();

var app = builder.Build();

// 省略其他中间件配置

app.Run();

注意事项

在实际落地过程中,还需要注意以下几点:

  • 发件箱表的索引优化,针对未发送消息的查询需要建立合适的索引,避免全表扫描影响性能
  • 消息投递的幂等性处理,消息队列的消费端需要支持幂等,避免消息重复投递导致业务异常
  • 重试策略设计,对于发送失败的消息需要有合理的重试机制,超过重试上限后需要转入死信队列人工处理
  • 大消息处理,如果消息内容过大,可以考虑将消息内容存储到对象存储,发件箱只存储消息的引用地址

C#EF_CoreOutbox模式事务性发件箱模式修改时间:2026-06-26 14:54:50

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。