TCP是面向字节流的传输协议,本身没有消息边界概念,在Python异步编程的高并发场景下,发送端多次发送的小数据包可能被合并成一个包接收,或者一个大数据包被拆分成多个部分接收,这就是粘包和拆包问题,需要开发者自行设计处理逻辑保证数据完整解析。

粘包拆包问题产生的原因
主要有三类常见原因:
- 发送端待发送数据小于套接字发送缓冲区大小,TCP会合并多个小数据包一起发送,产生粘包
- 接收端读取数据的速度慢于发送端发送速度,缓冲区中堆积多个数据包,读取时一次性拿到多个包内容,产生粘包
- 发送端单个数据包大小超过TCP最大报文段长度,会被拆分成多个片段传输,接收端需要多次读取才能拿到完整数据,产生拆包
异步TCP处理的核心思路
解决粘包拆包的核心是定义明确的数据边界规则,常见的方案有三种:
- 固定长度协议:每个数据包长度固定,接收端按固定长度读取即可
- 长度前缀协议:每个数据包开头添加固定字节的长度字段,先读长度再读对应内容
- 分隔符协议:用特殊字符作为数据包结束标记,接收端按分隔符拆分数据
其中长度前缀协议在异步场景下性能最优,适配性最强,下面基于asyncio实现该方案的处理逻辑。
完整实现示例
协议定义
我们定义简单协议:数据包前4个字节为无符号整数,表示后续数据体的字节长度,数据体为UTF-8编码的字符串内容。
异步服务端实现
服务端负责接收客户端连接,解析粘包拆包数据并返回响应:
import asyncio
import struct
# 定义协议解析类
class TcpProtocolParser:
def __init__(self):
# 缓冲区存储未处理的数据
self.buffer = b''
# 长度字段占4字节
self.header_size = 4
def feed_data(self, data):
"""将新接收的数据加入缓冲区"""
self.buffer += data
def get_packets(self):
"""从缓冲区中提取完整数据包"""
packets = []
while len(self.buffer) >= self.header_size:
# 解析长度字段
body_len = struct.unpack('!I', self.buffer[:self.header_size])[0]
total_len = self.header_size + body_len
# 如果缓冲区数据不足一个完整包,等待后续数据
if len(self.buffer) < total_len:
break
# 提取数据体
body = self.buffer[self.header_size:total_len]
packets.append(body.decode('utf-8'))
# 移除已处理的数据
self.buffer = self.buffer[total_len:]
return packets
async def handle_client(reader, writer):
"""处理单个客户端连接"""
parser = TcpProtocolParser()
addr = writer.get_extra_info('peername')
print(f"客户端连接: {addr}")
try:
while True:
# 异步读取数据,每次最多读1024字节
data = await reader.read(1024)
if not data:
print(f"客户端断开连接: {addr}")
break
# 将数据喂给解析器
parser.feed_data(data)
# 获取所有完整数据包
packets = parser.get_packets()
for packet in packets:
print(f"收到客户端 {addr} 的数据: {packet}")
# 构造响应,同样添加4字节长度头
response_body = f"已收到你的消息: {packet}"
response_bytes = response_body.encode('utf-8')
response_len = len(response_bytes)
response_header = struct.pack('!I', response_len)
writer.write(response_header + response_bytes)
await writer.drain()
except Exception as e:
print(f"处理客户端 {addr} 时发生错误: {e}")
finally:
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(handle_client, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f"服务端启动,监听地址: {addr}")
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main())
异步客户端实现
客户端负责发送数据,同时处理服务端返回数据的粘包拆包:
import asyncio
import struct
class TcpProtocolParser:
def __init__(self):
self.buffer = b''
self.header_size = 4
def feed_data(self, data):
self.buffer += data
def get_packets(self):
packets = []
while len(self.buffer) >= self.header_size:
body_len = struct.unpack('!I', self.buffer[:self.header_size])[0]
total_len = self.header_size + body_len
if len(self.buffer) < total_len:
break
body = self.buffer[self.header_size:total_len]
packets.append(body.decode('utf-8'))
self.buffer = self.buffer[total_len:]
return packets
async def tcp_client():
reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
# 发送三条测试数据,模拟粘包场景
messages = ["第一条消息", "第二条消息", "第三条消息"]
for msg in messages:
body = msg.encode('utf-8')
header = struct.pack('!I', len(body))
# 一次性发送多个包,模拟粘包
writer.write(header + body)
await writer.drain()
# 接收服务端响应
parser = TcpProtocolParser()
try:
while True:
data = await reader.read(1024)
if not data:
break
parser.feed_data(data)
packets = parser.get_packets()
for packet in packets:
print(f"收到服务端响应: {packet}")
except Exception as e:
print(f"客户端发生错误: {e}")
finally:
writer.close()
await writer.wait_closed()
if __name__ == '__main__':
asyncio.run(tcp_client())
优化建议
实际生产环境中可以针对场景做进一步优化:
- 缓冲区大小可以根据业务数据包平均大小调整,避免频繁内存拷贝
- 如果数据体是二进制内容,可以扩展协议支持校验和字段,保证数据完整性
- 高并发场景下可以将解析逻辑做成协程池任务,避免单个连接解析耗时影响其他连接
- 异常处理中可以增加重连机制,提升程序健壮性
常见问题说明
很多开发者会疑惑为什么不能直接用reader.readline()处理,这是因为readline()只适配换行符分隔的场景,而且异步读取时如果数据包中没有换行符,会一直阻塞等待,无法处理自定义协议的拆包问题。另外struct模块的格式字符串!I表示网络字节序的无符号4字节整数,保证不同架构的设备之间传输长度字段不会出现解析错误。