fastapi连接多个数据库
时间: 2025-06-16 08:24:07 浏览: 14
### FastAPI 连接和管理多个数据库的实现方法
在 FastAPI 中连接和管理多个数据库可以通过自定义中间件或依赖注入来实现。以下是一个完整的解决方案,包含多数据库配置、动态数据库会话管理和示例代码。
#### 1. 配置多个数据库连接
通过 SQLAlchemy 的 `create_engine` 方法为每个数据库创建独立的引擎,并使用 `sessionmaker` 创建对应的会话工厂[^2]。
```python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.pool import QueuePool
# 数据库URL配置
DATABASE_URL_1 = "mysql+pymysql://user:password@localhost:3306/db1"
DATABASE_URL_2 = "mysql+pymysql://user:password@localhost:3306/db2"
# 创建数据库引擎
engine_db1 = create_engine(
DATABASE_URL_1,
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_timeout=30,
pool_recycle=1800,
pool_pre_ping=True,
echo=False
)
engine_db2 = create_engine(
DATABASE_URL_2,
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_timeout=30,
pool_recycle=1800,
pool_pre_ping=True,
echo=False
)
# 创建会话工厂
SessionLocalDB1 = sessionmaker(autocommit=False, autoflush=False, bind=engine_db1)
SessionLocalDB2 = sessionmaker(autocommit=False, autoflush=False, bind=engine_db2)
# 声明基类
BaseDB1 = declarative_base()
BaseDB2 = declarative_base()
```
#### 2. 动态数据库会话管理
为了在不同的路由中动态选择数据库会话,可以创建一个通用的依赖注入函数 `get_db`,并通过参数指定需要使用的数据库[^1]。
```python
from fastapi import Depends, HTTPException, status
from typing import Generator
def get_db(db_name: str) -> Generator:
try:
if db_name == "db1":
db = SessionLocalDB1()
elif db_name == "db2":
db = SessionLocalDB2()
else:
raise ValueError("Invalid database name")
yield db
except Exception as e:
logger.error(f"Database error: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Database connection error")
finally:
db.close()
```
#### 3. 路由中的依赖注入
在不同的路由中,通过传递 `db_name` 参数来动态选择数据库会话。
```python
from fastapi import FastAPI, Depends
app = FastAPI()
@app.get("/data-from-db1/")
async def read_data_from_db1(db=Depends(lambda: get_db("db1"))):
# 示例查询
result = db.query(SomeModelDB1).all()
return result
@app.get("/data-from-db2/")
async def read_data_from_db2(db=Depends(lambda: get_db("db2"))):
# 示例查询
result = db.query(SomeModelDB2).all()
return result
```
#### 4. 自定义中间件(可选)
如果需要更复杂的动态数据库选择逻辑,可以创建自定义中间件,在请求处理前根据特定条件选择数据库[^1]。
```python
from fastapi import Request, Response
@app.middleware("http")
async def dynamic_db_selection(request: Request, call_next):
# 根据请求头或路径动态选择数据库
db_name = request.headers.get("X-Database", "db1")
request.state.db = get_db(db_name)
response = await call_next(request)
return response
```
---
### 注意事项
- 确保每个数据库的连接池配置合理,避免因连接耗尽导致性能问题。
- 使用 `pool_pre_ping=True` 可以自动检测并替换失效的连接。
- 在生产环境中,建议启用日志记录以便监控数据库连接状态。
---
阅读全文
相关推荐


















