Python FastAPI作为高性能的异步Web框架,和SQLAlchemy的异步模式结合可以大幅提升数据库操作的效率,避免接口请求时因数据库IO阻塞导致性能下降。下面我们将一步步实现两者的异步集成。
环境准备
首先需要安装对应的依赖包,FastAPI本身需要uvicorn作为服务器,同时需要安装支持异步的SQLAlchemy和对应的数据库驱动,这里以PostgreSQL为例,驱动使用asyncpg:
pip install fastapi uvicorn sqlalchemy asyncpg
配置异步SQLAlchemy引擎与会话
异步连接的核心是创建异步引擎和对应的异步会话工厂,和传统同步SQLAlchemy的配置略有不同,需要注意使用异步的驱动和对应的创建方法。
定义数据库连接配置
先定义数据库的连接参数,包括地址、端口、数据库名、用户名和密码:
# 数据库连接配置
DB_HOST = "127.0.0.1"
DB_PORT = 5432
DB_USER = "postgres"
DB_PASSWORD = "123456"
DB_NAME = "test_db"
# 异步数据库连接字符串,使用asyncpg驱动
DATABASE_URL = f"postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
创建异步引擎和会话
使用sqlalchemy.ext.asyncio模块下的create_async_engine和async_sessionmaker创建异步引擎和会话工厂:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
# 创建异步引擎,echo=True可以打印执行的SQL语句,调试时使用
async_engine = create_async_engine(DATABASE_URL, echo=True)
# 创建异步会话工厂,后续通过这个工厂生成会话实例
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
autocommit=False,
autoflush=False,
expire_on_commit=False
)
定义数据模型
使用SQLAlchemy的声明式基类定义数据表对应的模型,需要注意使用异步版本的基类:
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
# 声明式基类
class Base(DeclarativeBase):
pass
# 定义用户表模型
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(nullable=False)
age: Mapped[int] = mapped_column(nullable=False)
初始化数据库表
需要在项目启动时创建对应的数据库表,这里可以通过FastAPI的启动事件实现:
from fastapi import FastAPI
app = FastAPI()
# 应用启动事件,初始化数据库表
@app.on_event("startup")
async def init_db():
async with async_engine.begin() as conn:
# 创建所有定义的表,如果表已存在不会重复创建
await conn.run_sync(Base.metadata.create_all)
实现数据库会话依赖
FastAPI推荐使用依赖注入的方式管理数据库会话,确保每个请求都有独立的会话,并且在请求结束后自动关闭会话:
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
# 依赖函数,生成数据库会话
async def get_db() -> AsyncSession:
async with AsyncSessionLocal() as session:
yield session
编写接口调用数据库
接下来编写具体的接口,通过依赖注入获取数据库会话,执行异步的数据库操作,比如新增用户和查询用户列表:
新增用户接口
from fastapi import APIRouter
user_router = APIRouter(prefix="/users", tags=["用户管理"])
@user_router.post("/")
async def create_user(name: str, age: int, db: AsyncSession = Depends(get_db)):
# 创建用户实例
new_user = User(name=name, age=age)
# 添加到会话
db.add(new_user)
# 异步提交事务
await db.commit()
# 刷新实例,获取数据库生成的id
await db.refresh(new_user)
return {"code": 0, "msg": "创建成功", "data": {"id": new_user.id, "name": new_user.name, "age": new_user.age}}
查询用户列表接口
@user_router.get("/")
async def get_user_list(db: AsyncSession = Depends(get_db)):
# 异步执行查询语句
result = await db.execute(User.__table__.select())
# 获取所有结果
users = result.fetchall()
# 转换为字典列表返回
user_list = [{"id": u.id, "name": u.name, "age": u.age} for u in users]
return {"code": 0, "msg": "查询成功", "data": user_list}
注册路由并启动服务
最后将用户路由注册到FastAPI应用中,就可以启动服务测试接口了:
# 注册路由
app.include_router(user_router)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
常见问题说明
- 如果使用的是MySQL数据库,驱动需要换成aiomysql,连接字符串格式为
mysql+aiomysql://用户:密码@地址:端口/数据库名 - 异步操作数据库时,所有的数据库操作都需要加
await关键字,否则会报协程未执行的错误 - 会话的
expire_on_commit参数设置为False,可以避免提交后访问实例属性时再次查询数据库
FastAPISQLAlchemy异步连接Python修改时间:2026-06-29 01:03:44