StreamingResponse with FastAPI returns ContextVar error #435

Closed
passacagllia opened this issue Apr 4, 2025 · 2 comments · Fixed by #540
Labels
bug Something isn't working

Comments

passacagllia commented Apr 4, 2025

Describe the bug

When using the stream endpoint in the normal_v2 router, the server throws an exception during the streaming process. The error occurs when processing the stream_events method in the Agent's Runner. The client receives partial responses before the connection is abruptly closed with the error: curl: (18) transfer closed with outstanding read data remaining.

Debug information

  • Agents SDK version: (e.g. v0.0.7)
  • Python version (e.g. Python 3.11)

Repro steps

Set up the normal_v2 router with the provided code.
Send a POST request to the /normal_v2/stream endpoint with the following payload:

```json
{
    "chat_history": [{"role": "system", "content": "You are a helpful assistant."}],
    "user_query": "What is the weather today?"
}
```

Observe the server logs and client response.

Expected behavior

The server should stream the response without errors, and the client should receive the full response in a properly formatted streaming format (e.g., application/x-ndjson).

Actual behavior

Server logs: the server throws the following exception:

```
ERROR:    Exception in ASGI application
+ Exception Group Traceback (most recent call last):
...lib/python3.12/site-packages/agents/tracing/scope.py", line 45, in reset_current_trace
    |     _current_trace.reset(token)
ValueError: <Token var=<ContextVar name='current_trace' default=None at 0x1462a79c0> at 0x169dfec80> was created in a different Context
```

Client response: The client receives partial responses:

```
{"event_type":"data","data":{"answer":" those"}}
{"event_type":"data","data":{"answer":"!"}}
```

Followed by the error:

```
curl: (18) transfer closed with outstanding read data remaining
```
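The `ValueError` comes from a rule in CPython's `contextvars`: a `Token` can only be reset in the same `Context` in which it was created. Starlette iterates the `StreamingResponse` generator in a different task context from the endpoint coroutine that called `Runner.run_streamed`, so the tracing SDK's reset fails. A stdlib-only sketch of the underlying rule (the variable name `current_trace` mirrors the one in the traceback; everything else is illustrative):

```python
import contextvars

# Mirrors the SDK's tracing variable; the name is illustrative.
current_trace = contextvars.ContextVar("current_trace", default=None)

def start_trace():
    # The returned Token is bound to the Context this call runs in.
    return current_trace.set("trace-1")

# Run the set() in a *copied* Context, the way a separate task would.
ctx = contextvars.copy_context()
token = ctx.run(start_trace)

# Resetting from the original Context fails, just like in the traceback.
try:
    current_trace.reset(token)
except ValueError as exc:
    print(exc)  # ... was created in a different Context
```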

Additional context

Code snippets
Router code:

```python
import uuid
from fastapi import APIRouter
from agents import Agent, Runner
from app.core.chat_interface.base import ChatInterface
from app.core.models.human_input import NormalQueryInput
from app.core.services.openai.model import openai_model, openai_model_settings
from starlette.responses import StreamingResponse

router = APIRouter(prefix="/normal_v2", tags=["normal_v2"])

agent = Agent(
    name="Assistant",
    instructions="You are a helpful assistant.",
    model=openai_model,
    model_settings=openai_model_settings,
)

@router.post("/stream")
async def stream(query_input: NormalQueryInput):
    full_history = query_input.chat_history + [{"role": "user", "content": query_input.user_query}]
    result = Runner.run_streamed(agent, input=full_history)
    chat_interface = ChatInterface(result.stream_events(), trace_id=str(uuid.uuid4()))
    return StreamingResponse(chat_interface.process_stream(), media_type="application/x-ndjson")
```
ChatInterface code:

```python
from enum import Enum
from typing import Optional, AsyncIterator
from openai.types.responses import ResponseTextDeltaEvent
from pydantic import BaseModel

class EventType(Enum):
    DATA = "data"
    METADATA = "metadata"

class EventData(BaseModel):
    answer: Optional[str] = None
    trace_id: Optional[str] = None

class EventStreamOutput(BaseModel):
    event_type: EventType
    data: EventData

class ChatInterface:
    def __init__(self, result: AsyncIterator, trace_id: Optional[str]):
        self.stream_result = result
        self.trace_id = trace_id

    async def process_stream(self):
        if self.trace_id:
            ted = EventStreamOutput(
                event_type=EventType.METADATA,
                data=EventData(trace_id=self.trace_id)
            )
            yield f"{ted.json(exclude_none=True)}\n\n"
        async for event in self.stream_result:
            if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
                eed = EventStreamOutput(
                    event_type=EventType.DATA,
                    data=EventData(answer=event.data.delta)
                )
                yield f"{eed.json(exclude_none=True)}\n\n"
```


passacagllia added the bug label on Apr 4, 2025
rm-openai (Collaborator) commented Apr 4, 2025

Looking into it. I haven't yet tested anything, but one thing that should probably work is to call Runner.run_streamed(agent, input=full_history) inside ChatInterface. But I'll get back to you.

Edit: confirming that my suggestion does resolve the issue. So you can use that as a workaround while I investigate a better long-term fix.
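The workaround works because a ContextVar token must be reset in the same Context it was set in: when `Runner.run_streamed` is called inside the generator that `StreamingResponse` iterates, the trace is started and ended in the iterating task's Context rather than the endpoint's. A stdlib-only sketch of the pattern (the generator and names are illustrative, not the SDK's internals):

```python
import asyncio
import contextvars

current_trace = contextvars.ContextVar("current_trace", default=None)

async def process_stream():
    # Both set() and reset() run inside the generator, i.e. in whatever
    # Context the consumer drives it from, so the token always matches.
    token = current_trace.set("trace-1")
    try:
        for chunk in ("partial", "answer"):
            yield chunk
    finally:
        current_trace.reset(token)

async def consume():
    return [chunk async for chunk in process_stream()]

chunks = asyncio.run(consume())
print(chunks)  # no "different Context" error
```

Applied to the router above, this means passing `agent` and `full_history` into `ChatInterface` and calling `Runner.run_streamed` at the top of `process_stream`, as rm-openai suggests.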

passacagllia (Author) commented

Confirmed it worked. Thank you for your help.
