消息通知系统需要支持短信、邮件、站内信、推送等多种通知方式,同时要保证高并发场景下的消息不丢失、发送效率稳定,使用Golang开发时可以借助其原生并发特性实现高效的系统设计。

核心模块设计
整个消息通知系统可以拆分为以下四个核心模块,各模块职责清晰,方便后续扩展和维护。
- 通知配置模块:管理不同通知类型的配置信息,包括发送通道、限流规则、重试策略等
- 消息队列模块:接收上游业务发送的通知请求,缓存消息避免高并发下系统过载
- 发送处理器模块:从队列中获取消息,根据通知类型调用对应的发送通道实现发送
- 监控统计模块:记录消息发送的成功率、失败原因、发送耗时等指标,方便后续排查问题
基础数据结构定义
首先定义通知消息的基础结构体,包含消息的核心属性,后续所有模块都基于该结构体进行处理。
package model
// NotifyMessage 通知消息结构体
type NotifyMessage struct {
ID string // 消息唯一ID
Type string // 通知类型:sms/email/station_message/push
Receiver string // 接收者标识,如手机号、邮箱、用户ID
Content string // 通知内容
Extra map[string]string // 扩展字段,存放不同通知类型的额外参数
RetryTime int // 已重试次数
}
消息队列模块实现
消息队列模块使用Golang的channel实现内存队列,同时支持配置队列容量,避免内存溢出。这里使用带缓冲的channel作为基础队列容器。
package queue
import (
"sync"
"your_project/model"
)
type MessageQueue struct {
queue chan model.NotifyMessage
lock sync.RWMutex
}
// NewMessageQueue 创建消息队列实例
func NewMessageQueue(capacity int) *MessageQueue {
return &MessageQueue{
queue: make(chan model.NotifyMessage, capacity),
}
}
// Push 向队列中推送消息
func (m *MessageQueue) Push(msg model.NotifyMessage) error {
select {
case m.queue <- msg:
return nil
default:
// 队列已满,返回错误让上游处理
return errors.New("message queue is full")
}
}
// Pop 从队列中取出消息,队列为空时阻塞
func (m *MessageQueue) Pop() model.NotifyMessage {
return <-m.queue
}
发送通道适配模块
不同通知类型的发送逻辑差异较大,这里使用接口抽象发送能力,新增通知类型时只需要实现对应的发送接口即可,不需要修改原有逻辑。
package sender
import "your_project/model"
// Sender 发送接口定义
type Sender interface {
Send(msg model.NotifyMessage) error
GetType() string
}
// SmsSender 短信发送实现
type SmsSender struct{}
func (s *SmsSender) GetType() string {
return "sms"
}
func (s *SmsSender) Send(msg model.NotifyMessage) error {
// 实际业务中调用短信服务商的API发送短信
// 这里模拟发送逻辑
println("send sms to " + msg.Receiver + ", content: " + msg.Content)
return nil
}
// EmailSender 邮件发送实现
type EmailSender struct{}
func (e *EmailSender) GetType() string {
return "email"
}
func (e *EmailSender) Send(msg model.NotifyMessage) error {
// 实际业务中调用邮件发送服务
println("send email to " + msg.Receiver + ", content: " + msg.Content)
return nil
}
发送处理器模块实现
发送处理器模块负责从队列中获取消息,匹配对应的发送器执行发送,同时处理发送失败的重试逻辑,使用goroutine并发处理提升发送效率。
package handler
import (
"errors"
"sync"
"your_project/model"
"your_project/queue"
"your_project/sender"
)
type SendHandler struct {
queue *queue.MessageQueue
senders map[string]sender.Sender
wg sync.WaitGroup
maxRetry int // 最大重试次数
}
// NewSendHandler 创建发送处理器
func NewSendHandler(q *queue.MessageQueue, maxRetry int) *SendHandler {
return &SendHandler{
queue: q,
senders: make(map[string]sender.Sender),
maxRetry: maxRetry,
}
}
// RegisterSender 注册发送器
func (h *SendHandler) RegisterSender(s sender.Sender) {
h.senders[s.GetType()] = s
}
// Start 启动处理器,启动指定数量的goroutine处理消息
func (h *SendHandler) Start(workerNum int) {
for i := 0; i < workerNum; i++ {
h.wg.Add(1)
go func() {
defer h.wg.Done()
for {
msg := h.queue.Pop()
h.processMsg(msg)
}
}()
}
}
// processMsg 处理单条消息
func (h *SendHandler) processMsg(msg model.NotifyMessage) {
s, ok := h.senders[msg.Type]
if !ok {
println("unsupported notify type: " + msg.Type)
return
}
err := s.Send(msg)
if err != nil {
// 发送失败,判断是否重试
if msg.RetryTime < h.maxRetry {
msg.RetryTime++
// 重新推入队列重试,实际业务中可以加延迟重试逻辑
h.queue.Push(msg)
} else {
println("message send failed after retry, id: " + msg.ID)
}
}
}
完整调用示例
以下是整合所有模块的完整调用示例,模拟上游业务发送通知请求的流程。
package main
import (
"your_project/handler"
"your_project/model"
"your_project/queue"
"your_project/sender"
"time"
)
func main() {
// 初始化消息队列,容量设置为1000
msgQueue := queue.NewMessageQueue(1000)
// 初始化发送处理器,最大重试3次
sendHandler := handler.NewSendHandler(msgQueue, 3)
// 注册发送器
sendHandler.RegisterSender(&sender.SmsSender{})
sendHandler.RegisterSender(&sender.EmailSender{})
// 启动3个处理协程
sendHandler.Start(3)
// 模拟上游业务发送通知
for i := 0; i < 10; i++ {
msg := model.NotifyMessage{
ID: string(rune(i)),
Type: "sms",
Receiver: "13800138000",
Content: "您的验证码是123456,5分钟内有效",
Extra: nil,
RetryTime: 0,
}
err := msgQueue.Push(msg)
if err != nil {
println("push message failed: " + err.Error())
}
}
// 等待消息处理完成,实际业务中不需要这样阻塞
time.Sleep(5 * time.Second)
}
扩展优化建议
上述实现是基础版本,实际生产环境中可以根据需求做以下优化:
- 消息队列可以替换为持久化的消息中间件,比如Kafka、RabbitMQ,避免服务重启后消息丢失
- 增加限流逻辑,避免短时间内大量发送通知导致第三方服务限流
- 发送处理器可以增加失败消息的持久化存储,方便后续手动重试
- 监控统计模块可以对接Prometheus等监控工具,实时查看系统运行状态