在分布式系统开发中,Kafka作为高性能的消息队列组件,被广泛用于解耦服务、削峰填谷、异步处理等场景。Python生态中提供了kafka-python库,能够便捷地实现Kafka的生产者和消费者功能,满足大部分业务场景的消息处理需求。
环境准备
首先需要在本地或服务器安装Kafka服务,同时安装Python的kafka-python库,执行以下命令即可完成库的安装:
pip install kafka-python
Kafka生产者实现
生产者负责向Kafka的指定主题发送消息,以下是发送字符串消息的基础实现代码:
from kafka import KafkaProducer
# 初始化生产者,指定Kafka服务地址
producer = KafkaProducer(
bootstrap_servers=['127.0.0.1:9092'],
# 消息序列化方式,将字符串转为字节
value_serializer=lambda v: v.encode('utf-8')
)
# 向test_topic主题发送消息
for i in range(5):
msg = f'测试消息_{i}'
# 发送消息,topic为目标主题,value为消息内容
future = producer.send('test_topic', value=msg)
# 获取发送结果,等待消息发送完成
record_metadata = future.get(timeout=10)
print(f'消息发送成功,主题:{record_metadata.topic},分区:{record_metadata.partition},偏移量:{record_metadata.offset}')
# 关闭生产者
producer.close()
生产者常用参数说明
- bootstrap_servers:Kafka集群的地址列表,格式为[host:port]
- value_serializer:消息内容的序列化函数,默认消息需要为字节类型
- acks:消息确认机制,0表示不需要确认,1表示leader确认,all表示所有副本确认
- retries:消息发送失败时的重试次数
Kafka消费者实现
消费者负责从Kafka主题中拉取消息进行处理,以下是基础消费者的实现代码:
from kafka import KafkaConsumer
# 初始化消费者,指定服务地址、消费的主题、消费者组
consumer = KafkaConsumer(
'test_topic',
bootstrap_servers=['127.0.0.1:9092'],
# 消费者组ID,相同组内的消费者会均分消息
group_id='test_group',
# 消息反序列化方式,将字节转为字符串
value_deserializer=lambda v: v.decode('utf-8'),
# 关闭自动提交偏移量,手动控制提交时机
enable_auto_commit=False
)
# 循环拉取消息
for msg in consumer:
print(f'接收到消息:主题:{msg.topic},分区:{msg.partition},偏移量:{msg.offset},内容:{msg.value}')
# 手动提交偏移量,避免消息重复消费
consumer.commit()
# 关闭消费者
consumer.close()
消费者核心概念说明
- group_id:消费者组标识,同一主题的消息会被同一组内的消费者均分,不同组的消费者会各自消费全量消息
- enable_auto_commit:是否自动提交偏移量,自动提交可能存在消息丢失或重复消费的问题,生产环境建议手动提交
- auto_offset_reset:当没有初始偏移量或偏移量不存在时的策略,earliest表示从最早消息开始消费,latest表示从最新消息开始消费
常见问题与注意事项
1. 主题不存在时,生产者发送消息默认会自动创建主题,但是自动创建的主题分区数和副本数可能不符合业务需求,建议提前手动创建主题。
2. 消费者消费消息时,如果处理消息的业务逻辑抛出异常,不要提交偏移量,避免消息丢失,可以在异常处理中记录日志,后续重试处理。
3. 如果发送的消息是复杂对象,可以先将对象序列化为JSON字符串,再发送,消费者端反序列化JSON即可,示例代码如下:
import json
from kafka import KafkaProducer, KafkaConsumer
# 发送JSON消息的生产者
producer = KafkaProducer(
bootstrap_servers=['127.0.0.1:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
producer.send('test_topic', value={'id': 1, 'name': '测试数据'})
producer.close()
# 消费JSON消息的消费者
consumer = KafkaConsumer(
'test_topic',
bootstrap_servers=['127.0.0.1:9092'],
group_id='test_group',
value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
for msg in consumer:
print(f'接收到JSON消息:{msg.value}')
consumer.close()
PythonKafkakafka_python消息队列修改时间:2026-06-23 13:39:36