ActiveMQ是Apache开源的消息中间件,支持多种消息协议,常用于分布式系统间的异步通信、解耦服务以及流量削峰场景。Python作为常用的后端开发语言,可以通过STOMP协议与ActiveMQ进行交互,实现消息的发送与接收。

环境准备
要使用Python操作ActiveMQ,首先需要启动ActiveMQ服务,默认情况下ActiveMQ会开启STOMP协议的61613端口。然后需要安装Python的STOMP客户端依赖,推荐使用stomp.py库,安装命令如下:
pip install stomp.py
实现消息生产者
消息生产者负责向ActiveMQ的指定队列发送消息,核心步骤包括建立连接、创建会话、发送消息、关闭连接。以下是完整的生产者代码示例:
import stomp
# 定义消息生产者类
class ActiveMQProducer:
def __init__(self, host="127.0.0.1", port=61613):
# 创建连接对象
self.conn = stomp.Connection([(host, port)])
# 设置消息监听器,处理连接相关事件
self.conn.set_listener("", self.ProducerListener())
# 连接ActiveMQ,默认用户名密码均为admin
self.conn.connect("admin", "admin", wait=True)
# 定义内部监听器类,处理连接状态变化
class ProducerListener(stomp.ConnectionListener):
def on_error(self, headers, message):
print(f"接收错误消息: {message}")
def on_connected(self, headers, body):
print("生产者成功连接到ActiveMQ")
# 发送消息方法
def send_message(self, queue_name, message):
# 发送消息到指定队列,设置消息持久化
self.conn.send(
body=message,
destination=f"/queue/{queue_name}",
headers={"persistent": "true"}
)
print(f"消息已发送到队列 {queue_name}: {message}")
# 关闭连接方法
def close(self):
if self.conn.is_connected():
self.conn.disconnect()
print("生产者连接已关闭")
if __name__ == "__main__":
# 初始化生产者
producer = ActiveMQProducer(host="127.0.0.1", port=61613)
# 发送测试消息
producer.send_message("test_queue", "这是一条来自Python生产者的测试消息")
# 关闭连接
producer.close()
实现消息消费者
消息消费者负责从ActiveMQ的指定队列订阅并接收消息,需要注册消息处理函数来处理接收到的消息。以下是完整的消费者代码示例:
import stomp
import time
# 定义消息消费者类
class ActiveMQConsumer:
def __init__(self, host="127.0.0.1", port=61613):
self.conn = stomp.Connection([(host, port)])
self.conn.set_listener("", self.ConsumerListener())
self.conn.connect("admin", "admin", wait=True)
# 定义内部监听器类,处理接收的消息
class ConsumerListener(stomp.ConnectionListener):
def on_message(self, headers, message):
print(f"接收到消息: {message}")
# 手动确认消息,确保消息被正确处理
# 这里的ack方法需要根据协议版本调整,STOMP 1.2默认自动确认
# 如果需要手动确认,连接时可以指定协议版本
# self.conn.ack(headers["message-id"], headers["subscription"])
def on_error(self, headers, message):
print(f"接收错误消息: {message}")
# 订阅队列方法
def subscribe_queue(self, queue_name):
# 订阅指定队列,默认使用客户端确认模式
self.conn.subscribe(
destination=f"/queue/{queue_name}",
id=1,
ack="auto"
)
print(f"已订阅队列: {queue_name}")
# 保持连接运行,持续接收消息
def run(self):
while True:
time.sleep(1)
# 关闭连接方法
def close(self):
if self.conn.is_connected():
self.conn.disconnect()
print("消费者连接已关闭")
if __name__ == "__main__":
# 初始化消费者
consumer = ActiveMQConsumer(host="127.0.0.1", port=61613)
# 订阅测试队列
consumer.subscribe_queue("test_queue")
try:
# 保持程序运行,持续接收消息
consumer.run()
except KeyboardInterrupt:
consumer.close()
常见场景扩展
消息持久化
如果需要消息在服务重启后不丢失,可以在发送消息时设置persistent为true,如上面生产者代码中的设置,ActiveMQ会将消息存储到磁盘中。
主题订阅模式
除了队列的点对点模式,ActiveMQ还支持发布订阅模式,只需要将目的地从/queue/队列名改为/topic/主题名即可,生产者发送消息到主题,所有订阅该主题的消费者都会收到消息。
连接异常处理
实际生产环境中需要添加重连机制,可以在监听器的on_disconnected方法中实现自动重连逻辑,避免因为网络波动导致连接断开后无法接收消息。
PythonActiveMQ消息队列stomp_protocol修改时间:2026-06-12 15:12:15