Python的asyncio是标准库中用于编写异步IO代码的模块,能够在单线程内通过事件循环实现并发操作,特别适合处理大量网络请求场景,避免同步请求时线程阻塞带来的性能损耗。搭配aiohttp这类异步HTTP客户端库,可以快速实现高并发的网络请求逻辑。
asyncio核心概念
要理解asyncio实现并发请求的逻辑,首先需要掌握几个核心概念:
- 事件循环:asyncio的运行核心,负责调度和执行所有的异步任务,监听IO事件并触发对应的回调。
- 协程:通过async def定义的函数,是异步执行的基本单元,调用协程不会立即执行,而是返回一个协程对象。
- 任务:将协程包装成任务后,会被事件循环调度执行,任务可以跟踪协程的执行状态,也能获取执行结果。
- await:用于挂起当前协程的执行,等待后面的可等待对象(协程、任务、Future)完成后再继续往下执行。
基础并发请求实现
首先需要安装aiohttp库,它是基于asyncio的异步HTTP客户端,支持异步发送GET、POST等请求。安装命令如下:
pip install aiohttp
下面是一个简单的并发请求示例,同时请求三个不同的接口并获取结果:
import asyncio
import aiohttp
# 定义单个异步请求协程
async def fetch_url(session, url):
# 发送GET请求,await挂起等待响应
async with session.get(url) as response:
# 等待响应内容读取完成
content = await response.text()
return {"url": url, "status": response.status, "content_length": len(content)}
# 定义主协程,管理所有请求任务
async def main():
# 待请求的URL列表
urls = [
"http://ipipp.com/api/test1",
"http://ipipp.com/api/test2",
"http://ipipp.com/api/test3"
]
# 创建异步HTTP会话
async with aiohttp.ClientSession() as session:
# 创建所有请求任务
tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls]
# 等待所有任务完成,获取结果
results = await asyncio.gather(*tasks)
# 遍历输出结果
for res in results:
print(f"URL: {res['url']}, 状态码: {res['status']}, 内容长度: {res['content_length']}")
# 运行事件循环
if __name__ == "__main__":
asyncio.run(main())
上述代码中,asyncio.gather会将多个任务同时提交到事件循环,所有请求会并发执行,而不是顺序等待每个请求完成,大幅提升了执行效率。
并发请求的控制与优化
限制并发数量
如果同时发起的请求数量过多,可能会给目标服务器带来压力,也可能触发对方的反爬限制,此时可以使用信号量控制并发数:
import asyncio
import aiohttp
# 定义最大并发数为3
MAX_CONCURRENCY = 3
semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
async def fetch_url(session, url):
# 获取信号量,超过并发数时当前协程会挂起等待
async with semaphore:
async with session.get(url) as response:
content = await response.text()
return {"url": url, "status": response.status}
async def main():
urls = [f"http://ipipp.com/api/test?id={i}" for i in range(10)]
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls]
results = await asyncio.gather(*tasks)
print(f"共完成{len(results)}个请求")
if __name__ == "__main__":
asyncio.run(main())
异常处理
网络请求过程中可能会出现超时、连接失败等异常,需要为请求逻辑添加异常处理,避免单个请求失败导致整个程序崩溃:
import asyncio
import aiohttp
async def fetch_url(session, url):
try:
# 设置请求超时时间为5秒
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
content = await response.text()
return {"url": url, "status": response.status, "success": True}
except asyncio.TimeoutError:
return {"url": url, "status": None, "success": False, "error": "请求超时"}
except Exception as e:
return {"url": url, "status": None, "success": False, "error": str(e)}
async def main():
urls = [
"http://ipipp.com/api/normal",
"http://ipipp.com/api/timeout", # 模拟超时接口
"http://invalid.url" # 模拟无效地址
]
async with aiohttp.ClientSession() as session:
tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls]
results = await asyncio.gather(*tasks)
for res in results:
if res["success"]:
print(f"{res['url']} 请求成功,状态码: {res['status']}")
else:
print(f"{res['url']} 请求失败,原因: {res['error']}")
if __name__ == "__main__":
asyncio.run(main())
常见问题说明
1. 不要在异步代码中使用同步阻塞操作,比如time.sleep(),会阻塞整个事件循环,应该使用asyncio.sleep()替代。
2. asyncio.run()只能调用一次,多次调用需要放在不同的程序入口中,或者在已有的事件循环中嵌套运行。
3. 如果需要和同步代码结合使用,可以使用asyncio.run_in_executor将同步操作放到线程池或进程池中执行,避免阻塞事件循环。
注意:异步请求仅适用于IO密集型场景,如果是CPU密集型任务,asyncio无法提升执行效率,反而可能因为单线程调度带来额外开销。