fastapi 中断异步执行的程序
时间: 2025-07-19 15:16:07 浏览: 0
在 FastAPI 中,中断正在执行的异步任务是一个较为复杂的问题,因为 Python 的异步模型并不直接支持任务的强制中断。然而,可以通过一些机制来实现任务的取消或优雅地中止任务。
### 1. 使用 `asyncio` 的 `Task` 对象取消任务
FastAPI 基于 `asyncio` 实现异步特性[^4],因此可以使用 `asyncio.create_task()` 创建任务,并通过 `task.cancel()` 方法尝试取消任务。需要注意的是,任务的取消是协作式的,即任务本身需要能够响应取消请求并主动退出。
```python
from fastapi import FastAPI
import asyncio
app = FastAPI()
tasks = {}
@app.post("/start-task/{task_id}")
async def start_task(task_id: str):
task = asyncio.create_task(long_running_task(task_id))
tasks[task_id] = task
return {"message": f"Task {task_id} started"}
@app.post("/cancel-task/{task_id}")
async def cancel_task(task_id: str):
task = tasks.get(task_id)
if task:
task.cancel()
try:
await task # 确保任务完成或被取消
except asyncio.CancelledError:
return {"message": f"Task {task_id} cancelled"}
return {"message": f"Task {task_id} not found"}
async def long_running_task(task_id: str):
try:
for i in range(10):
await asyncio.sleep(1)
print(f"Task {task_id} is running step {i}")
return {"status": "completed"}
except asyncio.CancelledError:
print(f"Task {task_id} is being cancelled")
raise
```
### 2. 使用后台任务(Background Tasks)
FastAPI 提供了 `BackgroundTasks` 机制,允许在请求完成后执行一些清理或后续操作。虽然 `BackgroundTasks` 本身不支持中断,但可以结合 `asyncio` 的 `Task` 对象实现类似功能。
```python
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
tasks = {}
@app.post("/start-bg-task/{task_id}")
async def start_bg_task(task_id: str, background_tasks: BackgroundTasks):
task = asyncio.create_task(long_running_task(task_id))
tasks[task_id] = task
background_tasks.add_task(wait_for_task_completion, task_id, task)
return {"message": f"Background task {task_id} started"}
async def wait_for_task_completion(task_id: str, task: asyncio.Task):
try:
result = await task
print(f"Task {task_id} completed with result: {result}")
except asyncio.CancelledError:
print(f"Task {task_id} was cancelled")
@app.post("/cancel-bg-task/{task_id}")
async def cancel_bg_task(task_id: str):
task = tasks.get(task_id)
if task:
task.cancel()
try:
await task
except asyncio.CancelledError:
return {"message": f"Background task {task_id} cancelled"}
return {"message": f"Background task {task_id} not found"}
```
### 3. 使用 Celery 等任务队列进行任务管理
如果任务较为复杂且需要持久化或跨服务管理,可以考虑使用 Celery 等分布式任务队列。Celery 支持任务的取消和状态查询,适合处理长时间运行的任务。
```python
from celery import Celery
from fastapi import FastAPI
app = FastAPI()
celery_app = Celery("tasks", broker="redis://localhost:6379/0")
@celery_app.task(name="long_running_task")
def long_running_task(task_id: str):
for i in range(10):
time.sleep(1)
print(f"Task {task_id} is running step {i}")
return {"status": "completed"}
@app.post("/start-celery-task/{task_id}")
async def start_celery_task(task_id: str):
task = long_running_task.delay(task_id)
return {"task_id": task.id}
@app.post("/cancel-celery-task/{task_id}")
async def cancel_celery_task(task_id: str):
from celery.result import AsyncResult
async_result = AsyncResult(id=task_id, app=celery_app)
if async_result.state in ["PENDING", "STARTED"]:
async_result.revoke(terminate=True)
return {"message": f"Celery task {task_id} revoked"}
return {"message": f"Celery task {task_id} not found"}
```
### 4. 异步与同步混合任务的处理
在某些情况下,可能需要处理同步 I/O 操作。FastAPI 推荐使用异步路由函数,并在同步 I/O 操作中使用线程池进行封装,以避免阻塞主线程[^2]。
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI
app = FastAPI()
executor = ThreadPoolExecutor()
def sync_io_operation():
time.sleep(5)
return "IO operation completed"
@app.get("/run-sync-io")
async def run_sync_io():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(executor, sync_io_operation)
return {"result": result}
```
---
###
阅读全文
相关推荐


















