在C++并发编程中,传统的共享内存加锁模式常常会带来死锁、数据竞争等难以排查的问题,Actor模型作为一种基于消息传递的并发范式,为解决这类问题提供了新的思路。每个Actor都是独立的执行单元,拥有自己的状态,不共享任何可变数据,仅通过异步消息进行通信,从根源上避免了共享状态引发的并发问题。本文将介绍如何在C++中实现一个简单的Actor模型。

Actor模型核心概念
实现Actor模型前需要先明确其核心组成,每个Actor实例都包含以下几个部分:
- 私有状态:Actor内部维护的变量,只有自身可以修改,外部无法直接访问
- 邮箱:用于接收其他Actor发送的消息,通常是一个线程安全的消息队列
- 消息处理逻辑:接收到消息后执行的处理函数,根据消息类型修改自身状态或发送新消息
- 执行循环:持续从邮箱中取出消息并处理的逻辑,一般运行在独立的线程中
基础组件实现
1. 消息基类设计
首先需要一个通用的消息基类,所有具体的消息类型都继承自这个类,方便邮箱统一存储和分发:
#include <iostream>
#include <memory>
#include <string>
// 消息基类,所有消息的父类
class Message {
public:
virtual ~Message() = default;
// 获取消息类型的标识,方便后续处理逻辑分发
virtual std::string type() const = 0;
};
// 示例:字符串消息类型
class StringMessage : public Message {
public:
explicit StringMessage(std::string content) : content_(std::move(content)) {}
std::string type() const override {
return "StringMessage";
}
const std::string& content() const {
return content_;
}
private:
std::string content_;
};
// 示例:整数消息类型
class IntMessage : public Message {
public:
explicit IntMessage(int value) : value_(value) {}
std::string type() const override {
return "IntMessage";
}
int value() const {
return value_;
}
private:
int value_;
};
2. 线程安全邮箱实现
邮箱需要支持多线程下的消息入队和出队操作,这里使用std::queue配合互斥锁和条件变量实现:
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class ThreadSafeQueue {
public:
// 入队操作,支持多线程调用
void push(T item) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::move(item));
// 通知等待的消费者有新消息到来
cond_var_.notify_one();
}
// 出队操作,队列为空时会阻塞等待
T pop() {
std::unique_lock<std::mutex> lock(mutex_);
// 等待队列非空
cond_var_.wait(lock, [this]() { return !queue_.empty(); });
T item = std::move(queue_.front());
queue_.pop();
return item;
}
// 判断队列是否为空
bool empty() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.empty();
}
private:
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable cond_var_;
};
简单Actor类实现
接下来实现核心的Actor类,整合邮箱、执行循环和消息处理逻辑:
#include <thread>
#include <functional>
#include <unordered_map>
class Actor {
public:
Actor() : running_(false) {}
virtual ~Actor() {
stop();
}
// 启动Actor的执行循环,在独立线程中运行
void start() {
if (running_) return;
running_ = true;
thread_ = std::thread(&Actor::run, this);
}
// 停止Actor,等待执行线程退出
void stop() {
if (!running_) return;
running_ = false;
// 发送一个空消息唤醒可能阻塞的pop操作
mailbox_.push(nullptr);
if (thread_.joinable()) {
thread_.join();
}
}
// 向当前Actor发送消息
void send(std::shared_ptr<Message> msg) {
if (!running_) return;
mailbox_.push(std::move(msg));
}
protected:
// 注册消息处理函数,子类可以调用这个方法绑定不同消息类型的处理逻辑
template <typename MsgType>
void register_handler(std::function<void(const MsgType&)> handler) {
handlers_[MsgType().type()] = [handler](const std::shared_ptr<Message>& msg) {
// 将基类消息指针转换为具体类型
auto derived_msg = std::dynamic_pointer_cast<MsgType>(msg);
if (derived_msg) {
handler(*derived_msg);
}
};
}
private:
// Actor的执行循环,不断从邮箱取消息处理
void run() {
while (running_) {
auto msg = mailbox_.pop();
// 收到空消息表示需要退出
if (!msg) break;
// 根据消息类型调用对应的处理函数
auto it = handlers_.find(msg->type());
if (it != handlers_.end()) {
it->second(msg);
} else {
// 没有对应处理函数的消息,可自定义默认处理逻辑
std::cout << "Actor received unknown message type: " << msg->type() << std::endl;
}
}
}
// 邮箱,存储接收到的消息
ThreadSafeQueue<std::shared_ptr<Message>> mailbox_;
// 消息类型到处理函数的映射
std::unordered_map<std::string, std::function<void(const std::shared_ptr<Message>&)>> handlers_;
// Actor运行的线程
std::thread thread_;
// 运行标志
bool running_;
};
示例:实现一个计数Actor
下面通过一个具体的子类示例,展示如何使用上面的Actor基类实现一个简单的计数Actor,它可以接收整数消息累加数值,也可以接收字符串消息打印当前状态:
class CounterActor : public Actor {
public:
CounterActor() : count_(0) {
// 注册整数消息的处理逻辑:累加计数
register_handler<IntMessage>([this](const IntMessage& msg) {
count_ += msg.value();
std::cout << "CounterActor add " << msg.value() << ", current count: " << count_ << std::endl;
});
// 注册字符串消息的处理逻辑:打印当前计数
register_handler<StringMessage>([this](const StringMessage& msg) {
std::cout << "CounterActor receive string: " << msg.content() << ", current count: " << count_ << std::endl;
});
}
private:
int count_;
};
测试代码
最后编写测试代码,验证整个Actor模型的运行效果:
int main() {
// 创建计数Actor实例并启动
CounterActor counter;
counter.start();
// 向Actor发送不同类型的消息
counter.send(std::make_shared<IntMessage>(10));
counter.send(std::make_shared<StringMessage>("check current count"));
counter.send(std::make_shared<IntMessage>(5));
counter.send(std::make_shared<StringMessage>("check again"));
// 等待消息处理完成,实际使用中可以根据需求调整等待逻辑
std::this_thread::sleep_for(std::chrono::seconds(1));
// 停止Actor
counter.stop();
return 0;
}
实现注意事项
上述实现是一个最简化的Actor模型,实际生产使用中还需要考虑更多场景:
- 消息处理的异常捕获,避免单个消息处理失败导致整个Actor退出
- 支持Actor的地址管理,方便不同Actor之间互相发送消息
- 邮箱的容量限制,避免消息过多导致内存溢出
- 支持同步消息请求,即发送消息后等待返回结果的需求
这种基于消息传递的Actor模型,在C++中可以有效减少锁的使用,降低并发编程的复杂度,适合构建高并发、分布式的程序场景。