Python异步迭代器是支持异步遍历的可迭代对象,它可以在迭代过程中执行异步操作,避免阻塞事件循环,在处理异步数据流、异步分页请求等场景中有重要作用。

异步迭代器的核心概念
异步迭代器和普通迭代器类似,都需要实现特定的魔术方法,但异步迭代器的方法都是异步的,需要配合async和await语法使用。其核心是两个魔术方法:
__aiter__:异步迭代器必须实现的方法,返回异步迭代器对象本身,该方法需要定义为异步方法。__anext__:异步迭代器必须实现的方法,返回下一个异步结果,如果没有更多元素则抛出StopAsyncIteration异常,该方法也需要定义为异步方法。
手动实现异步迭代器
我们可以通过自定义类来实现一个简单的异步迭代器,比如实现一个每隔1秒返回一个数的异步迭代器,总共返回3个数:
import asyncio
class AsyncCounter:
def __init__(self, start=0, end=3):
self.current = start
self.end = end
def __aiter__(self):
# 返回异步迭代器本身
return self
async def __anext__(self):
if self.current >= self.end:
# 没有更多元素时抛出StopAsyncIteration异常
raise StopAsyncIteration
# 模拟异步操作,比如等待IO
await asyncio.sleep(1)
self.current += 1
return self.current - 1
async def main():
# 使用async for遍历异步迭代器
async for num in AsyncCounter():
print(num)
if __name__ == "__main__":
asyncio.run(main())
运行上述代码,会每隔1秒打印一个数字,依次输出0、1、2,和预期效果一致。
使用async for遍历异步迭代器
遍历异步迭代器不能直接使用普通的for循环,必须使用async for语法,async for会自动调用异步迭代器的__aiter__和__anext__方法,并且自动处理StopAsyncIteration异常,使用方式和普通for循环类似,只是需要放在异步函数内部。
异步迭代器的常见应用场景
异步分页数据获取
在调用异步API获取分页数据时,异步迭代器可以很好地封装分页逻辑,让调用方只需要用async for就能遍历所有页的数据:
import asyncio
import aiohttp
class AsyncPageFetcher:
def __init__(self, base_url, page_size=10):
self.base_url = base_url
self.page_size = page_size
self.current_page = 1
self.has_more = True
def __aiter__(self):
return self
async def __anext__(self):
if not self.has_more:
raise StopAsyncIteration
# 模拟异步请求获取当前页数据
await asyncio.sleep(0.5)
# 假设这里是请求返回的数据,实际场景用aiohttp发送请求
data = [f"item_{self.current_page}_{i}" for i in range(self.page_size)]
self.current_page += 1
# 假设第3页之后没有更多数据
if self.current_page > 3:
self.has_more = False
return data
async def fetch_all_data():
fetcher = AsyncPageFetcher("https://ipipp.com/api/list")
async for page_data in fetcher:
print(f"获取到一页数据:{page_data}")
if __name__ == "__main__":
asyncio.run(fetch_all_data())
异步数据流处理
处理异步生成的数据流时,异步迭代器可以逐条处理数据,不需要等待所有数据全部生成后再处理,提升处理效率:
import asyncio
class AsyncDataStream:
def __init__(self, data_source):
self.data_source = data_source
self.index = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.index >= len(self.data_source):
raise StopAsyncIteration
# 模拟异步获取单条数据
await asyncio.sleep(0.3)
item = self.data_source[self.index]
self.index += 1
return item
async def process_stream():
stream = AsyncDataStream([10, 20, 30, 40, 50])
async for data in stream:
# 处理单条数据
print(f"处理数据:{data * 2}")
if __name__ == "__main__":
asyncio.run(process_stream())
注意事项
- 异步迭代器的
__aiter__和__anext__方法都必须是异步方法,否则会抛出类型错误。 - 遍历异步迭代器的
async for语法只能用在异步函数内部,不能在同步函数中使用。 - 如果异步迭代器需要释放资源,可以实现
__aenter__和__aexit__方法,让它支持异步上下文管理器,方便资源回收。
Python异步迭代器async_await__aiter____anext__修改时间:2026-06-15 23:12:31