Actor模型是一种经典的并发计算模型,核心思想是每个Actor都是独立的执行单元,拥有自己的内部状态和邮箱,Actor之间仅通过异步消息传递进行通信,不共享任何可变状态,从根源上避免了共享内存带来的竞态条件和锁开销问题。在高并发场景下,这种模型能有效提升程序的扩展性和稳定性。

Actor模型核心组件
要在C++中实现Actor模型,需要先明确几个核心组成部分:
- Actor实例:独立运行的实体,包含内部状态和执行逻辑
- 邮箱:存储其他Actor发送过来的消息,通常是线程安全的队列
- 消息:Actor之间传递的数据载体,需要支持不同的消息类型
- 消息循环:Actor不断从邮箱中取出消息并处理的逻辑
基础实现设计
消息基类定义
首先定义统一的消息基类,所有具体消息都继承自该类,方便邮箱统一存储:
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <thread>
#include <unordered_map>
// 消息基类
class Message {
public:
virtual ~Message() = default;
// 消息类型标识,用于区分不同消息
virtual int get_type() const = 0;
};
// 具体消息示例:整型消息
class IntMessage : public Message {
public:
IntMessage(int val) : value(val) {}
int get_type() const override { return 1; }
int value;
};
// 具体消息示例:字符串消息
class StringMessage : public Message {
public:
StringMessage(const std::string& val) : value(val) {}
int get_type() const override { return 2; }
std::string value;
};
线程安全邮箱实现
邮箱需要支持多线程下的消息入队和出队操作,因此要加锁保证线程安全:
// 线程安全邮箱
class Mailbox {
public:
// 推送消息到邮箱
void push(std::unique_ptr<Message> msg) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.emplace(std::move(msg));
cond_var_.notify_one();
}
// 从邮箱取出消息,邮箱为空时阻塞等待
std::unique_ptr<Message> pop() {
std::unique_lock<std::mutex> lock(mutex_);
cond_var_.wait(lock, [this] { return !queue_.empty(); });
auto msg = std::move(queue_.front());
queue_.pop();
return msg;
}
// 判断邮箱是否为空
bool empty() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.empty();
}
private:
std::queue<std::unique_ptr<Message>> queue_;
mutable std::mutex mutex_;
std::condition_variable cond_var_;
};
Actor基类实现
Actor基类封装了邮箱、消息循环和生命周期管理逻辑:
// Actor基类
class Actor {
public:
Actor() : running_(false) {}
virtual ~Actor() { stop(); }
// 启动Actor的消息循环线程
void start() {
if (running_) return;
running_ = true;
thread_ = std::thread(&Actor::run, this);
}
// 停止Actor
void stop() {
if (!running_) return;
running_ = false;
// 发送一个空消息唤醒等待的线程,方便退出
mailbox_.push(nullptr);
if (thread_.joinable()) {
thread_.join();
}
}
// 向当前Actor发送消息
void send(std::unique_ptr<Message> msg) {
if (!running_) return;
mailbox_.push(std::move(msg));
}
protected:
// 子类重写该方法处理不同类型的消息
virtual void handle_message(Message* msg) = 0;
private:
void run() {
while (running_) {
auto msg = mailbox_.pop();
if (!msg) continue; // 空消息跳过
handle_message(msg.get());
}
}
Mailbox mailbox_;
std::thread thread_;
bool running_;
};
具体Actor实现示例
下面实现一个简单的计数器Actor,支持接收整型消息进行累加,以及接收字符串消息打印当前计数:
// 计数器Actor示例
class CounterActor : public Actor {
protected:
void handle_message(Message* msg) override {
if (!msg) return;
switch (msg->get_type()) {
case 1: { // 处理整型消息
auto* int_msg = static_cast<IntMessage*>(msg);
count_ += int_msg->value;
std::cout << "CounterActor receive int: " << int_msg->value
<< ", current count: " << count_ << std::endl;
break;
}
case 2: { // 处理字符串消息
auto* str_msg = static_cast<StringMessage*>(msg);
std::cout << "CounterActor receive string: " << str_msg->value
<< ", current count: " << count_ << std::endl;
break;
}
default:
std::cout << "CounterActor receive unknown message type" << std::endl;
}
}
private:
int count_ = 0;
};
高并发场景优化
上述基础实现已经能满足简单的Actor模型需求,在高并发场景下还可以做以下优化:
- 使用无锁队列替代互斥锁实现的邮箱,减少锁竞争开销
- 引入Actor调度器,统一管理多个Actor的线程资源,避免创建过多线程
- 消息传递使用零拷贝技术,减少大消息的内存复制开销
- 支持Actor的优先级队列,让高优先级消息优先被处理
使用示例
以下是完整的测试代码,演示两个Actor的启动和消息发送:
int main() {
// 创建两个计数器Actor
CounterActor actor1;
CounterActor actor2;
// 启动Actor
actor1.start();
actor2.start();
// 向actor1发送消息
actor1.send(std::make_unique<IntMessage>(10));
actor1.send(std::make_unique<StringMessage>("test message for actor1"));
actor1.send(std::make_unique<IntMessage>(20));
// 向actor2发送消息
actor2.send(std::make_unique<IntMessage>(5));
actor2.send(std::make_unique<StringMessage>("hello actor2"));
// 等待消息处理完成
std::this_thread::sleep_for(std::chrono::seconds(1));
// 停止Actor
actor1.stop();
actor2.stop();
return 0;
}
通过这种实现方式,开发者可以在C++项目中快速搭建基于Actor模型的高并发系统,各个Actor之间状态隔离,仅通过消息通信,大幅降低了并发编程的复杂度。