fastapi websocket 流式输出
时间: 2025-06-01 20:58:38 浏览: 27
<think>好的,用户的问题是关于如何在FastAPI中实现WebSocket的流式输出功能,需要示例代码。首先,我需要回忆一下FastAPI的WebSocket相关文档。FastAPI支持WebSocket通信,通过WebSocketEndpoint或者直接的路由装饰器来处理。流式输出通常指的是通过WebSocket持续发送数据块,而不是一次性发送全部内容。接下来,用户提到了示例代码,所以需要提供一个简单但完整的例子。可能需要包括服务器端和客户端的代码片段。服务器端部分,使用FastAPI的WebSocket路由,接收连接,然后在一个循环中生成数据并发送。例如,模拟实时数据流或者处理LLM的逐词生成。然后,考虑到流式输出的常见应用场景,比如聊天应用或实时数据推送,代码应该展示如何分次发送消息。同时,需要注意WebSocket的生命周期管理,确保连接正确关闭,处理异常情况,比如客户端断开连接。
还要检查是否需要依赖项,比如websockets库,但FastAPI本身已经内置了对WebSocket的支持,可能不需要额外安装。不过,可能需要提醒用户安装uvicorn来运行服务器。
另外,用户提供的引用中有关于流式HTTP响应和SSE的内容,但这里的问题是关于WebSocket的,所以需要区分两者。WebSocket是双向通信,而SSE是单向的。需要确保示例代码正确使用WebSocket协议。然后,考虑代码的正确性。例如,在FastAPI中,WebSocket路由使用@websocket装饰器,处理函数接收WebSocket对象,需要先accept连接,然后在循环中接收和发送数据。可能需要使用asyncdef,因为WebSocket操作是异步的。可能的示例结构:1.导入FastAPI和WebSocket相关模块。
2.创建FastAPI应用实例。3.定义WebSocket路由处理函数。4.在函数内接受连接,进入循环,生成数据并发送。5.添加客户端代码,比如JavaScript的WebSocket连接示例。还需要处理可能的错误,比如在发送数据时捕获异常,确保连接关闭。此外,流式输出可能需要间隔时间,比如使用awaitasyncio.sleep来控制发送频率。
最后,生成相关问题,可能需要涉及性能优化、异常处理、并发控制等,参考用户之前的引用,比如并发控制防止服务器过载,可以作为一个相关问题。</think>在 FastAPI 中实现 WebSocket 流式输出功能,可以通过以下示例代码演示实时分块数据传输的实现过程:
### 服务端代码
```python
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
import asyncio
app = FastAPI()
# 简易测试页面
@app.get("/")
async def get():
return HTMLResponse("""
<html>
<body>
<script>
const ws = new WebSocket('ws://' + window.location.host + '/ws');
ws.onmessage = (event) => {
document.body.innerHTML += event.data + '<br>';
};
</script>
</body>
</html>
""")
# WebSocket 流式输出实现
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
# 模拟流式数据生成(例如LLM逐词生成)
for i in range(1, 6):
message = f"数据块 {i}/5"
await websocket.send_text(message)
await asyncio.sleep(1) # 模拟处理延迟
# 发送结束标识
await websocket.send_text("[STREAM_END]")
except Exception as e:
print(f"连接异常: {e}")
finally:
await websocket.close()
```
### 核心实现要点
1. **异步通信架构**
使用`async/await`语法处理并发请求,每个 WebSocket 连接独立运行在事件循环中[^3]
2. **分块传输机制**
通过`send_text()`方法实现数据分块发送,配合`asyncio.sleep()`模拟实时处理延迟
3. **生命周期管理**
`try/finally`代码块确保连接异常时能正确关闭,避免资源泄漏
4. **协议控制标识**
发送特殊标记`[STREAM_END]`告知客户端流式传输结束
### 进阶优化方向
```python
# 带背压控制的改进版
@app.websocket("/ws_advanced")
async def websocket_advanced(websocket: WebSocket):
await websocket.accept()
try:
while True:
# 接收客户端控制指令
data = await websocket.receive_text()
if data == "START_STREAM":
# 动态生成数据流
for chunk in generate_stream_data():
if await websocket.send_text(chunk):
await asyncio.sleep(0.1)
else:
break
await websocket.send_text("[STREAM_END]")
except Exception as e:
print(f"连接异常: {e}")
finally:
await websocket.close()
```
阅读全文
相关推荐


















