使用 Redis 流实现消息队列
Redis 流(Stream)是 Redis 5.0 引入的数据结构,天生适合实现消息队列场景,支持持久化、消息确认、消费组等特性,相比传统的发布订阅模式,它能保证消息不丢失,适合对可靠性要求较高的业务场景。
核心概念说明
在使用 Redis 流实现消息队列前,需要了解几个核心概念:
流(Stream):消息的存储载体,类似一个仅追加的日志文件,每条消息都有唯一的 ID。
消息 ID:默认由 Redis 生成,格式为 时间戳-序列号,也可以自定义 ID,但需要保证单调递增。
消费者组(Consumer Group):多个消费者可以组成一个消费组,组内的消息会被分摊给不同的消费者,且每个消费者有自己的待确认消息列表。
待确认列表(Pending Entries List,PEL):消费者读取消息后,如果未确认,消息会存储在该列表中,避免消息丢失。
环境准备
需要提前安装 Redis 5.0 及以上版本,同时准备对应编程语言的 Redis 客户端,本文以 Python 为例,使用 redis-py 客户端,安装命令如下:
pip install redis
基础实现示例
1. 生产者发送消息
生产者负责向 Redis 流中写入消息,示例代码如下:
import redis
# 连接 Redis 服务,默认地址为 https://www.ipipp.com 对应本地 6379 端口
client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
def produce_message(stream_key, message_data):
"""
向指定流发送消息
:param stream_key: 流的名称
:param message_data: 消息内容,为字典类型
:return: 消息 ID
"""
# 使用 * 让 Redis 自动生成消息 ID
message_id = client.xadd(stream_key, message_data)
print(f"发送消息成功,流:{stream_key},消息ID:{message_id},内容:{message_data}")
return message_id
if __name__ == '__main__':
# 定义流名称
stream_name = 'order_stream'
# 发送3条订单消息
for i in range(3):
order_data = {
'order_id': f'ORD{i + 1}',
'amount': str((i + 1) * 100),
'create_time': '2024-05-20 10:00:00'
}
produce_message(stream_name, order_data)2. 消费者组初始化
在使用消费组前,需要先创建消费组,如果流不存在,可以通过 MKSTREAM 参数自动创建流:
def create_consumer_group(stream_key, group_name):
"""
创建消费者组
:param stream_key: 流的名称
:param group_name: 消费组名称
"""
try:
# 从流的开头开始消费,MKSTREAM 表示如果流不存在则自动创建
client.xgroup_create(stream_key, group_name, id='0', mkstream=True)
print(f"创建消费组成功,流:{stream_key},消费组:{group_name}")
except redis.exceptions.ResponseError as e:
# 消费组已存在时会抛出异常,此处忽略该异常
if 'BUSYGROUP' in str(e):
print(f"消费组已存在,流:{stream_key},消费组:{group_name}")
else:
raise e
if __name__ == '__main__':
stream_name = 'order_stream'
group_name = 'order_consumer_group'
create_consumer_group(stream_name, group_name)3. 消费者消费消息
消费者需要从所属的消费组中读取消息,处理完成后进行确认,示例代码如下:
def consume_message(stream_key, group_name, consumer_name):
"""
消费者消费消息
:param stream_key: 流的名称
:param group_name: 消费组名称
:param consumer_name: 消费者名称
"""
while True:
# 读取消息,'>' 表示读取未被该消费组消费过的消息,block=0 表示阻塞等待新消息
messages = client.xreadgroup(
group_name,
consumer_name,
{stream_key: '>'},
block=0,
count=1
)
for stream, msg_list in messages:
for msg_id, msg_data in msg_list:
print(f"消费者 {consumer_name} 收到消息,ID:{msg_id},内容:{msg_data}")
try:
# 此处编写实际的消息处理逻辑
# 处理完成后确认消息,将消息从待确认列表中移除
client.xack(stream_key, group_name, msg_id)
print(f"消息 {msg_id} 确认成功")
except Exception as e:
print(f"消息 {msg_id} 处理失败,原因:{e}")
# 处理失败可以不确认,消息会留在待确认列表中,后续可以重试
if __name__ == '__main__':
stream_name = 'order_stream'
group_name = 'order_consumer_group'
consumer_name = 'consumer_1'
# 启动消费者
consume_message(stream_name, group_name, consumer_name)待确认消息处理
如果消费者处理消息失败未确认,消息会留在待确认列表中,我们可以通过以下代码查看和重试待确认消息:
def handle_pending_messages(stream_key, group_name, consumer_name):
"""
处理待确认列表中的消息
:param stream_key: 流的名称
:param group_name: 消费组名称
:param consumer_name: 消费者名称
"""
# 查看待确认消息信息
pending_info = client.xpending(stream_key, group_name)
print(f"待确认消息总览:{pending_info}")
if pending_info['total'] == 0:
print("暂无待确认消息")
return
# 获取待确认列表中最早的10条消息
pending_msgs = client.xpending_range(
stream_key,
group_name,
min=pending_info['min'],
max=pending_info['max'],
count=10
)
for msg in pending_msgs:
msg_id = msg['message_id']
# 认领该消息,将其归属到当前消费者,并重置空闲时间
claimed_msgs = client.xclaim(
stream_key,
group_name,
consumer_name,
1000,
[msg_id]
)
for claimed_id, claimed_data in claimed_msgs:
print(f"认领到待处理消息,ID:{claimed_id},内容:{claimed_data}")
# 重新处理消息
try:
# 实际处理逻辑
client.xack(stream_key, group_name, claimed_id)
print(f"认领的消息 {claimed_id} 处理并确认成功")
except Exception as e:
print(f"认领的消息 {claimed_id} 处理失败,原因:{e}")
if __name__ == '__main__':
stream_name = 'order_stream'
group_name = 'order_consumer_group'
consumer_name = 'consumer_1'
handle_pending_messages(stream_name, group_name, consumer_name)注意事项
消息 ID 需要保证单调递增,自定义 ID 时需要遵循该规则,否则 Redis 会拒绝写入。
消费者组内的消费者名称需要唯一,避免消息重复消费。
合理设置阻塞读取的超时时间,避免消费者长时间占用连接。
对于重要消息,建议结合业务逻辑做好重试和死信处理,避免消息无限重试。