基于消息队列的任务解耦系统通过引入中间消息载体,让生产者模块和消费者模块不需要直接依赖对方,只需要和消息队列交互,从而降低模块间的耦合度,提升系统的可扩展性和容错能力。这种架构模式在异步处理、流量削峰、分布式系统通信等场景中应用非常广泛。

核心架构组成
整个系统主要包含三个核心部分:
- 消息队列:负责存储生产者发送的消息,提供消息的入队、出队、持久化等能力,保证消息的可靠传递。
- 生产者:产生业务任务的模块,不需要关心任务由谁处理,只需要将任务封装成消息发送到消息队列即可。
- 消费者:处理任务的模块,从消息队列中获取消息,解析后执行对应的业务逻辑,不需要知道任务由谁产生。
基础消息队列封装
首先我们实现一个线程安全的简单消息队列类,作为整个系统的基础组件,支持消息的入队和出队操作,内部使用互斥锁和条件变量保证多线程场景下的安全性。
#include <queue>
#include <mutex>
#include <condition_variable>
#include <string>
#include <iostream>
// 消息结构体,包含消息类型和消息内容
struct Message {
int type; // 消息类型,用于区分不同业务
std::string content; // 消息内容
};
class MessageQueue {
private:
std::queue<Message> queue_; // 存储消息的队列
mutable std::mutex mutex_; // 互斥锁,保护队列操作
std::condition_variable cond_; // 条件变量,用于消费者等待消息
bool stop_flag_; // 停止标志,用于控制队列关闭
public:
MessageQueue() : stop_flag_(false) {}
// 发送消息到队列
void push(const Message& msg) {
std::lock_guard<std::mutex> lock(mutex_);
if (stop_flag_) {
std::cout << "消息队列已停止,无法发送消息" << std::endl;
return;
}
queue_.push(msg);
cond_.notify_one(); // 通知一个等待的消费者
}
// 从队列获取消息,队列为空时阻塞等待
bool pop(Message& msg) {
std::unique_lock<std::mutex> lock(mutex_);
// 等待直到队列非空或者停止标志被设置
cond_.wait(lock, [this]() { return !queue_.empty() || stop_flag_; });
if (stop_flag_ && queue_.empty()) {
return false; // 队列停止且没有消息,返回失败
}
msg = queue_.front();
queue_.pop();
return true;
}
// 停止消息队列,唤醒所有等待的消费者
void stop() {
std::lock_guard<std::mutex> lock(mutex_);
stop_flag_ = true;
cond_.notify_all();
}
};
生产者模块实现
生产者模块负责生成业务任务,将任务封装成消息后发送到消息队列,不需要依赖任何消费者相关的逻辑。
#include <thread>
#include <chrono>
#include <random>
// 生产者类,模拟产生不同类型的任务消息
class Producer {
private:
MessageQueue& mq_; // 引用消息队列,生产者向该队列发送消息
int producer_id_; // 生产者唯一标识
public:
Producer(MessageQueue& mq, int id) : mq_(mq), producer_id_(id) {}
// 启动生产者,持续生成消息
void run() {
std::default_random_engine generator(producer_id_);
std::uniform_int_distribution<int> type_dist(1, 3); // 消息类型1-3
std::uniform_int_distribution<int> sleep_dist(100, 500); // 随机休眠时间
for (int i = 0; i < 5; ++i) { // 每个生产者生成5条消息
Message msg;
msg.type = type_dist(generator);
msg.content = "生产者" + std::to_string(producer_id_) + "生成的任务" + std::to_string(i + 1);
mq_.push(msg);
std::cout << "生产者" << producer_id_ << "发送消息:类型" << msg.type << ",内容:" << msg.content << std::endl;
// 随机休眠一段时间,模拟业务产生的间隔
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_dist(generator)));
}
std::cout << "生产者" << producer_id_ << "任务生成完成" << std::endl;
}
};
消费者模块实现
消费者模块从消息队列中获取消息,根据消息类型执行对应的业务逻辑,不需要关心消息由哪个生产者产生。
// 消费者类,从消息队列获取消息并处理
class Consumer {
private:
MessageQueue& mq_; // 引用消息队列,消费者从该队列获取消息
int consumer_id_; // 消费者唯一标识
public:
Consumer(MessageQueue& mq, int id) : mq_(mq), consumer_id_(id) {}
// 启动消费者,持续处理消息
void run() {
Message msg;
while (mq_.pop(msg)) { // 循环获取消息,队列停止且为空时退出
std::cout << "消费者" << consumer_id_ << "收到消息:类型" << msg.type << ",内容:" << msg.content << std::endl;
// 根据消息类型处理不同业务逻辑
switch (msg.type) {
case 1:
std::cout << "消费者" << consumer_id_ << "处理类型1任务:数据校验" << std::endl;
break;
case 2:
std::cout << "消费者" << consumer_id_ << "处理类型2任务:文件写入" << std::endl;
break;
case 3:
std::cout << "消费者" << consumer_id_ << "处理类型3任务:日志上报" << std::endl;
break;
default:
std::cout << "消费者" << consumer_id_ << "收到未知类型消息" << std::endl;
}
// 模拟任务处理耗时
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
std::cout << "消费者" << consumer_id_ << "任务处理完成" << std::endl;
}
};
系统整合与运行
我们将消息队列、生产者和消费者整合起来,启动多个生产者和消费者线程,验证整个解耦系统的运行效果。
int main() {
MessageQueue mq; // 创建消息队列实例
// 创建2个生产者
Producer producer1(mq, 1);
Producer producer2(mq, 2);
// 创建3个消费者
Consumer consumer1(mq, 1);
Consumer consumer2(mq, 2);
Consumer consumer3(mq, 3);
// 启动生产者和消费者线程
std::thread producer_thread1([&producer1]() { producer1.run(); });
std::thread producer_thread2([&producer2]() { producer2.run(); });
std::thread consumer_thread1([&consumer1]() { consumer1.run(); });
std::thread consumer_thread2([&consumer2]() { consumer2.run(); });
std::thread consumer_thread3([&consumer3]() { consumer3.run(); });
// 等待生产者线程完成
producer_thread1.join();
producer_thread2.join();
// 所有生产者完成后,停止消息队列,让消费者处理完剩余消息后退出
mq.stop();
// 等待消费者线程完成
consumer_thread1.join();
consumer_thread2.join();
consumer_thread3.join();
std::cout << "基于消息队列的任务解耦系统运行结束" << std::endl;
return 0;
}
架构模式优势与注意事项
这种基于消息队列的任务解耦架构模式有以下明显优势:
- 模块间解耦:生产者和消费者不需要互相依赖,修改其中一个模块不会影响另一个模块。
- 支持异步处理:生产者发送消息后不需要等待消费者处理完成,提升系统响应速度。
- 易于扩展:可以增加生产者或者消费者的数量来应对业务量的变化,不需要修改核心逻辑。
实际使用中需要注意几个问题:消息队列需要考虑持久化避免消息丢失,要处理消息重复消费的问题,同时要根据业务场景合理设置消费者的数量,避免资源浪费或者处理不及时的情况。