如何用Python操作ActiveMQ实现消息队列集成

来源:微信开发网作者:香港程序员头衔:程序员
导读:本期聚焦于小伙伴创作的《如何用Python操作ActiveMQ实现消息队列集成》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《如何用Python操作ActiveMQ实现消息队列集成》有用,将其分享出去将是对创作者最好的鼓励。

ActiveMQ是Apache开源的消息中间件,支持多种消息协议,常用于分布式系统间的异步通信、解耦服务以及流量削峰场景。Python作为常用的后端开发语言,可以通过STOMP协议与ActiveMQ进行交互,实现消息的发送与接收。

如何用Python操作ActiveMQ实现消息队列集成

环境准备

要使用Python操作ActiveMQ,首先需要启动ActiveMQ服务,默认情况下ActiveMQ会开启STOMP协议的61613端口。然后需要安装Python的STOMP客户端依赖,推荐使用stomp.py库,安装命令如下:

pip install stomp.py

实现消息生产者

消息生产者负责向ActiveMQ的指定队列发送消息,核心步骤包括建立连接、创建会话、发送消息、关闭连接。以下是完整的生产者代码示例:

import stomp

# 定义消息生产者类
class ActiveMQProducer:
    def __init__(self, host="127.0.0.1", port=61613):
        # 创建连接对象
        self.conn = stomp.Connection([(host, port)])
        # 设置消息监听器,处理连接相关事件
        self.conn.set_listener("", self.ProducerListener())
        # 连接ActiveMQ,默认用户名密码均为admin
        self.conn.connect("admin", "admin", wait=True)

    # 定义内部监听器类,处理连接状态变化
    class ProducerListener(stomp.ConnectionListener):
        def on_error(self, headers, message):
            print(f"接收错误消息: {message}")

        def on_connected(self, headers, body):
            print("生产者成功连接到ActiveMQ")

    # 发送消息方法
    def send_message(self, queue_name, message):
        # 发送消息到指定队列,设置消息持久化
        self.conn.send(
            body=message,
            destination=f"/queue/{queue_name}",
            headers={"persistent": "true"}
        )
        print(f"消息已发送到队列 {queue_name}: {message}")

    # 关闭连接方法
    def close(self):
        if self.conn.is_connected():
            self.conn.disconnect()
            print("生产者连接已关闭")

if __name__ == "__main__":
    # 初始化生产者
    producer = ActiveMQProducer(host="127.0.0.1", port=61613)
    # 发送测试消息
    producer.send_message("test_queue", "这是一条来自Python生产者的测试消息")
    # 关闭连接
    producer.close()

实现消息消费者

消息消费者负责从ActiveMQ的指定队列订阅并接收消息,需要注册消息处理函数来处理接收到的消息。以下是完整的消费者代码示例:

import stomp
import time

# 定义消息消费者类
class ActiveMQConsumer:
    def __init__(self, host="127.0.0.1", port=61613):
        self.conn = stomp.Connection([(host, port)])
        self.conn.set_listener("", self.ConsumerListener())
        self.conn.connect("admin", "admin", wait=True)

    # 定义内部监听器类,处理接收的消息
    class ConsumerListener(stomp.ConnectionListener):
        def on_message(self, headers, message):
            print(f"接收到消息: {message}")
            # 手动确认消息,确保消息被正确处理
            # 这里的ack方法需要根据协议版本调整,STOMP 1.2默认自动确认
            # 如果需要手动确认,连接时可以指定协议版本
            # self.conn.ack(headers["message-id"], headers["subscription"])

        def on_error(self, headers, message):
            print(f"接收错误消息: {message}")

    # 订阅队列方法
    def subscribe_queue(self, queue_name):
        # 订阅指定队列,默认使用客户端确认模式
        self.conn.subscribe(
            destination=f"/queue/{queue_name}",
            id=1,
            ack="auto"
        )
        print(f"已订阅队列: {queue_name}")

    # 保持连接运行,持续接收消息
    def run(self):
        while True:
            time.sleep(1)

    # 关闭连接方法
    def close(self):
        if self.conn.is_connected():
            self.conn.disconnect()
            print("消费者连接已关闭")

if __name__ == "__main__":
    # 初始化消费者
    consumer = ActiveMQConsumer(host="127.0.0.1", port=61613)
    # 订阅测试队列
    consumer.subscribe_queue("test_queue")
    try:
        # 保持程序运行,持续接收消息
        consumer.run()
    except KeyboardInterrupt:
        consumer.close()

常见场景扩展

消息持久化

如果需要消息在服务重启后不丢失,可以在发送消息时设置persistenttrue,如上面生产者代码中的设置,ActiveMQ会将消息存储到磁盘中。

主题订阅模式

除了队列的点对点模式,ActiveMQ还支持发布订阅模式,只需要将目的地从/queue/队列名改为/topic/主题名即可,生产者发送消息到主题,所有订阅该主题的消费者都会收到消息。

连接异常处理

实际生产环境中需要添加重连机制,可以在监听器的on_disconnected方法中实现自动重连逻辑,避免因为网络波动导致连接断开后无法接收消息。

PythonActiveMQ消息队列stomp_protocol修改时间:2026-06-12 15:12:15

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。