C++怎么实现一个简单的Actor并发模型

来源:网络学院作者:阿亮头衔:草根站长
导读:本期聚焦于小伙伴创作的《C++怎么实现一个简单的Actor并发模型》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《C++怎么实现一个简单的Actor并发模型》有用,将其分享出去将是对创作者最好的鼓励。

Actor并发模型是经典的并发设计模式之一,核心思想是将并发单元抽象为独立的Actor,每个Actor拥有自己的状态,且状态不对外暴露,Actor之间仅通过发送消息进行通信,所有消息处理都是串行的,从根源上避免了多线程共享内存的锁竞争问题。C++本身没有内置Actor模型的实现,但可以通过标准库提供的线程、队列、互斥量等组件自行实现简单的Actor模型。

C++怎么实现一个简单的Actor并发模型

Actor模型的核心组件

要实现一个简单的Actor模型,需要明确几个核心组成部分:

  • Actor主体:每个Actor是一个独立的运行单元,拥有自己的运行线程和内部状态。
  • 消息队列:每个Actor持有一个线程安全的消息队列,用于接收其他Actor发送的消息。
  • 消息类型:定义统一的消息结构,支持不同的消息内容和类型标识。
  • 消息循环:Actor启动后持续从自己的消息队列中取出消息并处理,直到收到停止信号。

基础消息结构设计

首先定义通用的消息基类,所有具体消息都继承自该基类,方便消息队列统一存储和分发:

#include <string>
#include <memory>
#include <variant>

// 消息基类
class Message {
public:
    virtual ~Message() = default;
    // 消息类型标识
    virtual std::string type() const = 0;
};

// 具体消息类型示例:字符串消息
class StringMessage : public Message {
public:
    explicit StringMessage(std::string content) : content_(std::move(content)) {}
    std::string type() const override {
        return "StringMessage";
    }
    const std::string& content() const {
        return content_;
    }
private:
    std::string content_;
};

// 具体消息类型示例:整数消息
class IntMessage : public Message {
public:
    explicit IntMessage(int value) : value_(value) {}
    std::string type() const override {
        return "IntMessage";
    }
    int value() const {
        return value_;
    }
private:
    int value_;
};

线程安全消息队列实现

Actor的消息队列需要支持多线程下的安全入队和出队操作,这里使用std::queue配合互斥量和条件变量实现:

#include <queue>
#include <mutex>
#include <condition_variable>

template <typename T>
class ThreadSafeQueue {
public:
    ThreadSafeQueue() = default;
    ~ThreadSafeQueue() = default;

    // 禁止拷贝
    ThreadSafeQueue(const ThreadSafeQueue&) = delete;
    ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;

    // 入队操作
    void push(T item) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(std::move(item));
        cond_var_.notify_one();
    }

    // 出队操作,队列为空时阻塞等待
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_var_.wait(lock, [this]() { return !queue_.empty() || stop_; });
        if (stop_) {
            return nullptr;
        }
        T item = std::move(queue_.front());
        queue_.pop();
        return item;
    }

    // 停止队列,唤醒所有等待的线程
    void stop() {
        std::lock_guard<std::mutex> lock(mutex_);
        stop_ = true;
        cond_var_.notify_all();
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_var_;
    bool stop_ = false;
};

简单Actor类实现

接下来实现Actor类,包含启动、发送消息、停止等核心方法,每个Actor运行在独立的线程中:

#include <thread>
#include <functional>
#include <unordered_map>

class Actor {
public:
    using MessageHandler = std::function<void(const std::shared_ptr<Message>&)>;

    Actor() : running_(false) {}

    virtual ~Actor() {
        stop();
    }

    // 启动Actor,开始消息循环
    void start() {
        if (running_) {
            return;
        }
        running_ = true;
        thread_ = std::thread([this]() {
            message_loop();
        });
    }

    // 发送消息到当前Actor的消息队列
    void send(std::shared_ptr<Message> message) {
        if (!running_) {
            return;
        }
        message_queue_.push(std::move(message));
    }

    // 停止Actor
    void stop() {
        if (!running_) {
            return;
        }
        running_ = false;
        message_queue_.stop();
        if (thread_.joinable()) {
            thread_.join();
        }
    }

    // 注册消息处理函数
    void register_handler(const std::string& message_type, MessageHandler handler) {
        handlers_[message_type] = std::move(handler);
    }

protected:
    // 消息循环逻辑
    virtual void message_loop() {
        while (true) {
            auto message = message_queue_.pop();
            if (!message) {
                break;
            }
            auto it = handlers_.find(message->type());
            if (it != handlers_.end()) {
                it->second(message);
            }
        }
    }

private:
    ThreadSafeQueue<std::shared_ptr<Message>> message_queue_;
    std::thread thread_;
    std::unordered_map<std::string, MessageHandler> handlers_;
    bool running_;
};

示例:两个Actor之间的消息通信

下面实现一个简单的示例,创建两个Actor,一个发送消息,一个接收并处理消息:

#include <iostream>
#include <chrono>

// 接收者Actor,处理接收到的消息
class ReceiverActor : public Actor {
public:
    ReceiverActor() {
        // 注册字符串消息处理函数
        register_handler("StringMessage", [this](const std::shared_ptr<Message>& msg) {
            auto string_msg = std::dynamic_pointer_cast<StringMessage>(msg);
            if (string_msg) {
                std::cout << "ReceiverActor received string message: " << string_msg->content() << std::endl;
            }
        });
        // 注册整数消息处理函数
        register_handler("IntMessage", [this](const std::shared_ptr<Message>& msg) {
            auto int_msg = std::dynamic_pointer_cast<IntMessage>(msg);
            if (int_msg) {
                std::cout << "ReceiverActor received int message: " << int_msg->value() << std::endl;
            }
        });
    }
};

// 发送者Actor,定时发送消息
class SenderActor : public Actor {
public:
    void set_receiver(Actor* receiver) {
        receiver_ = receiver;
    }

    void send_string(const std::string& content) {
        if (receiver_) {
            receiver_->send(std::make_shared<StringMessage>(content));
        }
    }

    void send_int(int value) {
        if (receiver_) {
            receiver_->send(std::make_shared<IntMessage>(value));
        }
    }

private:
    Actor* receiver_ = nullptr;
};

int main() {
    // 创建接收者和发送者Actor
    ReceiverActor receiver;
    SenderActor sender;
    sender.set_receiver(&receiver);

    // 启动两个Actor
    receiver.start();
    sender.start();

    // 发送测试消息
    sender.send_string("Hello Actor Model");
    sender.send_int(12345);
    sender.send_string("C++ Actor Implementation");

    // 等待消息处理完成
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // 停止Actor
    sender.stop();
    receiver.stop();

    return 0;
}

实现注意事项

上述实现是一个极简的Actor模型示例,实际使用中还需要考虑更多场景:

  • 消息的所有权管理:示例中使用shared_ptr管理消息生命周期,实际可以根据需求选择更合适的智能指针方案。
  • 错误处理:消息处理过程中出现异常时需要有对应的兜底逻辑,避免单个Actor崩溃影响整个系统。
  • Actor生命周期管理:需要明确Actor的创建、启动、停止、销毁的完整流程,避免线程泄漏。
  • 消息优先级:如果业务需要,可以给消息队列增加优先级支持,让高优先级消息优先被处理。

这种基于消息传递的Actor模型在C++中可以有效规避多线程开发中的死锁、数据竞争等问题,适合高并发、分布式场景下的模块解耦设计。

C++Actor_concurrent_modelmessage_passingconcurrent_design_pattern修改时间:2026-06-20 04:48:28

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。