在Python的asyncio异步编程场景中,asyncio.create_task()用于快速创建并调度一个协程任务,让它可以和当前协程并发执行。如果创建任务后没有对其进行await操作,就会出现任务泄漏的情况,这会引发一系列潜在问题。

未await的任务是否还会执行
首先需要明确的是,即使没有对asyncio.create_task()创建的任务进行await,只要事件循环还在运行,这个任务依然会被正常调度执行。因为create_task()调用后就会把任务加入到事件循环的任务队列中,事件循环会按顺序执行队列里的任务,和是否被await没有直接关系。
我们可以通过下面的示例代码验证这个现象:
import asyncio
async def background_task():
print("后台任务开始执行")
await asyncio.sleep(1)
print("后台任务执行完成")
async def main():
# 创建任务但不await
asyncio.create_task(background_task())
print("主协程执行中")
await asyncio.sleep(2)
print("主协程执行完成")
if __name__ == "__main__":
asyncio.run(main())
运行上面的代码,输出结果如下:
主协程执行中 后台任务开始执行 后台任务执行完成 主协程执行完成
可以看到,即使没有await background_task对应的任务对象,后台任务依然正常完成了执行。
未await的任务会引发什么问题
1. 任务异常无法被捕获
如果未await的任务执行过程中抛出了异常,这个异常不会被主协程捕获,也不会直接导致程序崩溃,而是会被事件循环静默忽略,只会在日志中输出警告信息。这会导致开发者很难发现任务执行过程中的错误,给问题排查带来很大困难。
示例代码如下:
import asyncio
async def error_task():
print("错误任务开始执行")
await asyncio.sleep(0.5)
raise ValueError("任务执行出错")
async def main():
asyncio.create_task(error_task())
await asyncio.sleep(1)
print("主协程结束")
if __name__ == "__main__":
asyncio.run(main())
运行后会看到事件循环输出的警告,但是程序不会崩溃,主协程会正常结束。
2. 任务对象被垃圾回收引发警告
如果创建的任务没有被任何变量引用,也没有被await,当任务对象被垃圾回收时,asyncio会抛出RuntimeWarning警告,提示存在未等待的任务。这是因为任务可能还没有执行完成就被回收,会导致不可预期的行为。
示例代码如下:
import asyncio
import gc
async def temp_task():
await asyncio.sleep(0.1)
print("临时任务执行完成")
async def main():
# 任务没有被保存引用
asyncio.create_task(temp_task())
await asyncio.sleep(0.05)
# 手动触发垃圾回收
gc.collect()
await asyncio.sleep(0.2)
if __name__ == "__main__":
asyncio.run(main())
运行后会出现类似RuntimeWarning: coroutine 'temp_task' was never awaited的警告信息。
3. 资源无法及时释放
如果未await的任务中持有文件句柄、网络连接、数据库连接等资源,当任务没有被正确等待结束时,这些资源可能无法及时释放,长期运行会导致资源耗尽,引发程序性能下降甚至崩溃。
如何避免任务泄漏
1. 保存任务引用并适时await
创建任务后,将任务对象保存到列表中,在合适的时机统一await所有任务,确保任务完成并且异常可以被处理。
import asyncio
async def task_func(n):
print(f"任务{n}开始")
await asyncio.sleep(1)
print(f"任务{n}完成")
async def main():
tasks = []
for i in range(3):
task = asyncio.create_task(task_func(i))
tasks.append(task)
# 等待所有任务完成
await asyncio.gather(*tasks)
print("所有任务执行完成")
if __name__ == "__main__":
asyncio.run(main())
2. 使用asyncio.TaskGroup管理任务
Python 3.11及以上版本引入了asyncio.TaskGroup,它可以自动管理任务的生命周期,确保所有任务都被正确等待,任务出现异常时也会统一抛出,避免任务泄漏。
import asyncio
async def task_func(n):
print(f"任务{n}开始")
await asyncio.sleep(1)
print(f"任务{n}完成")
async def main():
async with asyncio.TaskGroup() as tg:
for i in range(3):
tg.create_task(task_func(i))
print("所有任务执行完成")
if __name__ == "__main__":
asyncio.run(main())
3. 对不需要等待结果的任务添加回调
如果有些后台任务不需要等待结果,也不需要捕获异常,可以给任务添加回调,在回调中处理可能的异常,避免警告输出。
import asyncio
async def background_task():
await asyncio.sleep(1)
raise ValueError("后台任务出错")
def handle_task_result(task):
try:
task.result()
except Exception as e:
print(f"任务执行出错: {e}")
async def main():
task = asyncio.create_task(background_task())
task.add_done_callback(handle_task_result)
await asyncio.sleep(2)
if __name__ == "__main__":
asyncio.run(main())
总结
asyncio.create_task()创建的任务即使没有被await也会执行,但是会带来异常无法捕获、垃圾回收警告、资源无法及时释放等问题。实际开发中,我们应该尽量保存任务引用,使用asyncio.gather或者TaskGroup来管理任务,避免任务泄漏,保证异步程序的稳定性和可维护性。
asynciocreate_task任务泄漏Python异步修改时间:2026-06-25 10:27:30