在FastAPI的WebSocket接口开发中,经常会出现ws.send_text("1")先于load_dataset("beans")执行的奇怪现象,很多刚接触异步编程的开发者对此感到困惑,这背后和FastAPI基于的异步事件循环机制密切相关。

问题复现场景
我们先看一段典型的触发该问题的代码,这是很多开发者会写出的WebSocket接口逻辑:
from fastapi import FastAPI, WebSocket
from datasets import load_dataset
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# 先发送初始消息
await websocket.send_text("1")
# 再加载数据集
dataset = load_dataset("beans")
await websocket.send_text(f"数据集加载完成,共{len(dataset)}条数据")按照代码的书写顺序,我们预期是先发送"1",再加载数据集,最后发送完成提示。但实际运行时,经常会出现ws.send_text("1")已经执行完毕,load_dataset还在运行中,甚至有时候发送"1"的操作会等到load_dataset执行一部分后才触发,这是为什么呢?
核心原因分析
1. 异步事件循环的调度规则
FastAPI基于Python的asyncio事件循环运行,所有的异步任务都需要交给事件循环调度执行。当代码执行到await websocket.send_text("1")时,这个操作会向事件循环注册一个发送任务,然后当前协程会暂停,让出执行权给事件循环。如果此时事件循环中没有其他待执行的任务,发送任务会立刻执行,看起来符合预期。
但如果load_dataset("beans")是一个同步阻塞操作,它不会主动让出事件循环的执行权,这时候就会出现问题:如果发送任务的IO等待时间稍长,或者事件循环在调度时优先处理了其他任务,就可能出现发送操作被阻塞的情况。不过更常见的原因是下面的第二点。
2. load_dataset的阻塞特性
很多开发者会忽略load_dataset默认是同步操作,它内部涉及网络请求、文件读取、数据解析等耗时步骤,这些步骤都是阻塞式的,不会主动触发await让出执行权。当代码执行到dataset = load_dataset("beans")时,如果前面的send_text操作还没有完全完成,事件循环会被这个阻塞操作卡住,看起来就像send_text在load_dataset之后才执行。
还有一种情况是,send_text本身是异步操作,需要等待WebSocket的写入缓冲区就绪,如果此时客户端还没有完全建立好接收状态,send_text的await会暂时挂起,事件循环就会先去执行后面的load_dataset,导致执行顺序和书写顺序不一致。
解决方案
1. 确保发送操作完全完成后再执行耗时任务
可以在send_text之后增加一个确认步骤,确保消息已经发送成功:
from fastapi import FastAPI, WebSocket
from datasets import load_dataset
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# 发送初始消息并等待完成
await websocket.send_text("1")
# 等待客户端确认收到,可选步骤
# confirm = await websocket.receive_text()
# 再执行耗时的数据集加载,将同步操作放到线程池中避免阻塞事件循环
from asyncio import get_event_loop
loop = get_event_loop()
dataset = await loop.run_in_executor(None, load_dataset, "beans")
await websocket.send_text(f"数据集加载完成,共{len(dataset)}条数据")2. 将阻塞操作转为异步执行
如果希望load_dataset不阻塞事件循环,可以使用asyncio.run_in_executor将同步的load_dataset放到线程池或者进程池中执行,这样事件循环可以在等待load_dataset完成的同时,正常处理send_text的回调。
from fastapi import FastAPI, WebSocket
from datasets import load_dataset
import asyncio
app = FastAPI()
async def async_load_dataset(dataset_name):
# 将同步的load_dataset放到默认线程池中执行
return await asyncio.get_event_loop().run_in_executor(None, load_dataset, dataset_name)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# 先发送消息,await确保发送完成
await websocket.send_text("1")
# 异步加载数据集,不会阻塞事件循环
dataset = await async_load_dataset("beans")
await websocket.send_text(f"数据集加载完成,共{len(dataset)}条数据")总结
ws.send_text先于load_dataset执行的问题,本质是FastAPI的异步事件循环调度和同步阻塞操作冲突导致的。只要明确异步操作的await语义,将耗时的同步操作转为异步执行,就能保证代码的执行顺序符合预期。在实际开发中,需要注意区分异步和同步操作,避免同步阻塞操作影响事件循环的正常调度。
FastAPIWebSocketws.send_textload_dataset异步执行修改时间:2026-06-06 05:57:23