Python asyncio是标准库中用于编写异步IO代码的核心模块,默认情况下通过asyncio.gather等方式提交的任务会并发执行,但部分场景需要任务按照指定顺序依次完成,比如依赖前一个任务输出结果的后续流程,就需要实现任务的顺序执行。
asyncio异步任务的默认执行逻辑
asyncio的核心是事件循环,当我们在事件循环中创建多个协程任务时,这些任务会在遇到IO等待点时主动让出执行权,事件循环会切换到其他可执行的任务,因此多个无依赖的任务默认是并发运行的。下面的示例可以展示默认的并发执行效果:
import asyncio
import time
async def task(name, delay):
print(f"任务{name}开始执行,预计耗时{delay}秒")
start = time.time()
await asyncio.sleep(delay)
end = time.time()
print(f"任务{name}执行完成,实际耗时{round(end-start, 2)}秒")
return f"任务{name}的结果"
async def main():
# 同时创建三个任务,默认会并发执行
task1 = asyncio.create_task(task("A", 2))
task2 = asyncio.create_task(task("B", 1))
task3 = asyncio.create_task(task("C", 3))
# 等待所有任务完成
results = await asyncio.gather(task1, task2, task3)
print(f"所有任务完成,结果:{results}")
if __name__ == "__main__":
asyncio.run(main())
运行上述代码可以看到,三个任务的开始时间是接近的,总耗时约等于最长任务的耗时3秒,说明任务是在并发执行的。
实现任务顺序执行的三种方案
方案一:依次await单个任务
最简单的顺序执行方式是不提前创建所有任务,而是按照顺序逐个创建并等待任务完成,再执行下一个任务。这种方式的逻辑最直观,适合任务数量较少且顺序固定的场景。
import asyncio
import time
async def task(name, delay):
print(f"任务{name}开始执行,预计耗时{delay}秒")
start = time.time()
await asyncio.sleep(delay)
end = time.time()
print(f"任务{name}执行完成,实际耗时{round(end-start, 2)}秒")
return f"任务{name}的结果"
async def main():
# 依次执行任务,前一个完成后再执行下一个
result_a = await task("A", 2)
result_b = await task("B", 1)
result_c = await task("C", 3)
print(f"所有任务顺序执行完成,结果依次是:{result_a}, {result_b}, {result_c}")
if __name__ == "__main__":
asyncio.run(main())
运行后可以看到任务的开始时间依次间隔,总耗时是三个任务耗时之和6秒,实现了严格的顺序执行。
方案二:提前创建任务后按顺序await
如果我们需要先批量创建所有任务,再按照指定顺序获取执行结果,可以提前把任务存入列表,然后按照列表顺序逐个await任务。这种方式可以在等待任务执行的同时提前做好其他准备工作,比方案一更灵活。
import asyncio
import time
async def task(name, delay):
print(f"任务{name}开始执行,预计耗时{delay}秒")
start = time.time()
await asyncio.sleep(delay)
end = time.time()
print(f"任务{name}执行完成,实际耗时{round(end-start, 2)}秒")
return f"任务{name}的结果"
async def main():
# 提前创建所有任务
tasks = [
asyncio.create_task(task("A", 2)),
asyncio.create_task(task("B", 1)),
asyncio.create_task(task("C", 3))
]
# 按照任务创建顺序依次等待完成
results = []
for t in tasks:
res = await t
results.append(res)
print(f"所有任务顺序执行完成,结果依次是:{results}")
if __name__ == "__main__":
asyncio.run(main())
需要注意这里虽然任务是并发创建的,但我们按照顺序逐个等待,所以获取结果的顺序是固定的,不过任务的实际执行仍然是并发的,总耗时还是约等于最长任务的耗时,只是结果的返回顺序被我们强制按列表顺序整理了。
方案三:使用asyncio.wait按顺序等待
当任务数量较多,且需要动态控制等待顺序时,可以使用asyncio.wait配合集合来逐个等待任务完成。这种方式适合需要中途根据任务执行结果调整后续流程的场景。
import asyncio
import time
async def task(name, delay):
print(f"任务{name}开始执行,预计耗时{delay}秒")
start = time.time()
await asyncio.sleep(delay)
end = time.time()
print(f"任务{name}执行完成,实际耗时{round(end-start, 2)}秒")
return f"任务{name}的结果"
async def main():
# 创建任务集合
tasks = {
asyncio.create_task(task("A", 2)),
asyncio.create_task(task("B", 1)),
asyncio.create_task(task("C", 3))
}
# 定义期望的执行顺序
order = ["A", "B", "C"]
results = {}
# 按照顺序逐个等待对应任务完成
for name in order:
# 从任务集合中找出对应名称的任务
target_task = None
for t in tasks:
# 通过任务的协程对象获取任务名称
if t.get_coro().__name__ == "task":
# 这里简单通过打印信息匹配,实际开发可以给任务绑定自定义属性来区分
pass
# 简化示例,直接按顺序等待集合中的任务,实际场景可以结合任务标识匹配
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for t in done:
results[name] = t.result()
tasks.remove(t)
print(f"所有任务顺序执行完成,结果:{results}")
if __name__ == "__main__":
asyncio.run(main())
这种方式更适合复杂的任务调度场景,不过实现逻辑相对复杂,需要根据实际需求调整任务匹配的逻辑。
不同方案的适用场景对比
我们可以通过下面的表格快速选择适合自己场景的方案:
| 方案 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 依次await单个任务 | 任务数量少、顺序固定、无提前准备需求 | 逻辑简单,代码易读 | 无法提前创建任务,总耗时是任务耗时之和 |
| 提前创建任务后按顺序await | 需要提前批量创建任务、固定结果返回顺序 | 任务可以并发执行,总耗时更短 | 任务实际执行是并发的,不是严格串行执行 |
| asyncio.wait按顺序等待 | 任务数量多、需要动态调度、依赖执行结果调整流程 | 调度灵活,支持复杂流程控制 | 实现逻辑复杂,代码维护成本高 |
注意事项
- 如果需要的是严格的一个任务执行完再启动下一个任务,应该选择方案一,不要使用方案二和方案三,因为后两种方案任务会提前并发启动。
- 使用
asyncio.create_task创建任务后,任务会立即加入事件循环等待执行,不会等到await的时候才启动。 - 顺序执行会损失异步编程的并发优势,如果不是强依赖顺序的场景,尽量还是使用并发执行提升程序效率。