Actor并发模型是经典的并发设计模式之一,核心思想是将并发单元抽象为独立的Actor,每个Actor拥有自己的状态,且状态不对外暴露,Actor之间仅通过发送消息进行通信,所有消息处理都是串行的,从根源上避免了多线程共享内存的锁竞争问题。C++本身没有内置Actor模型的实现,但可以通过标准库提供的线程、队列、互斥量等组件自行实现简单的Actor模型。

Actor模型的核心组件
要实现一个简单的Actor模型,需要明确几个核心组成部分:
- Actor主体:每个Actor是一个独立的运行单元,拥有自己的运行线程和内部状态。
- 消息队列:每个Actor持有一个线程安全的消息队列,用于接收其他Actor发送的消息。
- 消息类型:定义统一的消息结构,支持不同的消息内容和类型标识。
- 消息循环:Actor启动后持续从自己的消息队列中取出消息并处理,直到收到停止信号。
基础消息结构设计
首先定义通用的消息基类,所有具体消息都继承自该基类,方便消息队列统一存储和分发:
#include <string>
#include <memory>
#include <variant>
// 消息基类
class Message {
public:
virtual ~Message() = default;
// 消息类型标识
virtual std::string type() const = 0;
};
// 具体消息类型示例:字符串消息
class StringMessage : public Message {
public:
explicit StringMessage(std::string content) : content_(std::move(content)) {}
std::string type() const override {
return "StringMessage";
}
const std::string& content() const {
return content_;
}
private:
std::string content_;
};
// 具体消息类型示例:整数消息
class IntMessage : public Message {
public:
explicit IntMessage(int value) : value_(value) {}
std::string type() const override {
return "IntMessage";
}
int value() const {
return value_;
}
private:
int value_;
};
线程安全消息队列实现
Actor的消息队列需要支持多线程下的安全入队和出队操作,这里使用std::queue配合互斥量和条件变量实现:
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class ThreadSafeQueue {
public:
ThreadSafeQueue() = default;
~ThreadSafeQueue() = default;
// 禁止拷贝
ThreadSafeQueue(const ThreadSafeQueue&) = delete;
ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;
// 入队操作
void push(T item) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::move(item));
cond_var_.notify_one();
}
// 出队操作,队列为空时阻塞等待
T pop() {
std::unique_lock<std::mutex> lock(mutex_);
cond_var_.wait(lock, [this]() { return !queue_.empty() || stop_; });
if (stop_) {
return nullptr;
}
T item = std::move(queue_.front());
queue_.pop();
return item;
}
// 停止队列,唤醒所有等待的线程
void stop() {
std::lock_guard<std::mutex> lock(mutex_);
stop_ = true;
cond_var_.notify_all();
}
private:
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable cond_var_;
bool stop_ = false;
};
简单Actor类实现
接下来实现Actor类,包含启动、发送消息、停止等核心方法,每个Actor运行在独立的线程中:
#include <thread>
#include <functional>
#include <unordered_map>
class Actor {
public:
using MessageHandler = std::function<void(const std::shared_ptr<Message>&)>;
Actor() : running_(false) {}
virtual ~Actor() {
stop();
}
// 启动Actor,开始消息循环
void start() {
if (running_) {
return;
}
running_ = true;
thread_ = std::thread([this]() {
message_loop();
});
}
// 发送消息到当前Actor的消息队列
void send(std::shared_ptr<Message> message) {
if (!running_) {
return;
}
message_queue_.push(std::move(message));
}
// 停止Actor
void stop() {
if (!running_) {
return;
}
running_ = false;
message_queue_.stop();
if (thread_.joinable()) {
thread_.join();
}
}
// 注册消息处理函数
void register_handler(const std::string& message_type, MessageHandler handler) {
handlers_[message_type] = std::move(handler);
}
protected:
// 消息循环逻辑
virtual void message_loop() {
while (true) {
auto message = message_queue_.pop();
if (!message) {
break;
}
auto it = handlers_.find(message->type());
if (it != handlers_.end()) {
it->second(message);
}
}
}
private:
ThreadSafeQueue<std::shared_ptr<Message>> message_queue_;
std::thread thread_;
std::unordered_map<std::string, MessageHandler> handlers_;
bool running_;
};
示例:两个Actor之间的消息通信
下面实现一个简单的示例,创建两个Actor,一个发送消息,一个接收并处理消息:
#include <iostream>
#include <chrono>
// 接收者Actor,处理接收到的消息
class ReceiverActor : public Actor {
public:
ReceiverActor() {
// 注册字符串消息处理函数
register_handler("StringMessage", [this](const std::shared_ptr<Message>& msg) {
auto string_msg = std::dynamic_pointer_cast<StringMessage>(msg);
if (string_msg) {
std::cout << "ReceiverActor received string message: " << string_msg->content() << std::endl;
}
});
// 注册整数消息处理函数
register_handler("IntMessage", [this](const std::shared_ptr<Message>& msg) {
auto int_msg = std::dynamic_pointer_cast<IntMessage>(msg);
if (int_msg) {
std::cout << "ReceiverActor received int message: " << int_msg->value() << std::endl;
}
});
}
};
// 发送者Actor,定时发送消息
class SenderActor : public Actor {
public:
void set_receiver(Actor* receiver) {
receiver_ = receiver;
}
void send_string(const std::string& content) {
if (receiver_) {
receiver_->send(std::make_shared<StringMessage>(content));
}
}
void send_int(int value) {
if (receiver_) {
receiver_->send(std::make_shared<IntMessage>(value));
}
}
private:
Actor* receiver_ = nullptr;
};
int main() {
// 创建接收者和发送者Actor
ReceiverActor receiver;
SenderActor sender;
sender.set_receiver(&receiver);
// 启动两个Actor
receiver.start();
sender.start();
// 发送测试消息
sender.send_string("Hello Actor Model");
sender.send_int(12345);
sender.send_string("C++ Actor Implementation");
// 等待消息处理完成
std::this_thread::sleep_for(std::chrono::seconds(1));
// 停止Actor
sender.stop();
receiver.stop();
return 0;
}
实现注意事项
上述实现是一个极简的Actor模型示例,实际使用中还需要考虑更多场景:
- 消息的所有权管理:示例中使用
shared_ptr管理消息生命周期,实际可以根据需求选择更合适的智能指针方案。 - 错误处理:消息处理过程中出现异常时需要有对应的兜底逻辑,避免单个Actor崩溃影响整个系统。
- Actor生命周期管理:需要明确Actor的创建、启动、停止、销毁的完整流程,避免线程泄漏。
- 消息优先级:如果业务需要,可以给消息队列增加优先级支持,让高优先级消息优先被处理。
这种基于消息传递的Actor模型在C++中可以有效规避多线程开发中的死锁、数据竞争等问题,适合高并发、分布式场景下的模块解耦设计。
C++Actor_concurrent_modelmessage_passingconcurrent_design_pattern修改时间:2026-06-20 04:48:28