Skip to content

Start and finish streaming trace in impl metod #540

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,5 @@ cython_debug/
.ruff_cache/

# PyPI configuration file
.pypirc
.pypirc
.aider*
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ dev = [
"graphviz",
"mkdocs-static-i18n>=1.3.0",
"eval-type-backport>=0.2.2",
"fastapi >= 0.110.0, <1",
]

[tool.uv.workspace]
Expand Down
5 changes: 1 addition & 4 deletions src/agents/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class RunResultStreaming(RunResultBase):

_current_agent_output_schema: AgentOutputSchema | None = field(repr=False)

_trace: Trace | None = field(repr=False)
trace: Trace | None = field(repr=False)

is_complete: bool = False
"""Whether the agent has finished running."""
Expand Down Expand Up @@ -185,9 +185,6 @@ async def stream_events(self) -> AsyncIterator[StreamEvent]:
yield item
self._event_queue.task_done()

if self._trace:
self._trace.finish(reset_current=True)

self._cleanup_tasks()

if self._stored_exception:
Expand Down
11 changes: 6 additions & 5 deletions src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,10 +404,6 @@ def run_streamed(
disabled=run_config.tracing_disabled,
)
)
# Need to start the trace here, because the current trace contextvar is captured at
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be wrong but won't this break the following use case:

    result = Runner.run_streamed(
        agent, "Write ten haikus about recursion in programming."
    )

    i = 0
    async for event in result.stream_events():
        with custom_span("Processing event " + str(i)):
            print(event)
            i += 1

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, you're right. Talked on slack, the fix for this would be to manually attach to the trace via:

  async for event in result.stream_events():
        with custom_span("Processing event " + str(i), parent=result.trace):
            print(event)
            i += 1

# asyncio.create_task time
if new_trace:
new_trace.start(mark_as_current=True)

output_schema = cls._get_output_schema(starting_agent)
context_wrapper: RunContextWrapper[TContext] = RunContextWrapper(
Expand All @@ -426,7 +422,7 @@ def run_streamed(
input_guardrail_results=[],
output_guardrail_results=[],
_current_agent_output_schema=output_schema,
_trace=new_trace,
trace=new_trace,
)

# Kick off the actual agent loop in the background and return the streamed result object.
Expand Down Expand Up @@ -499,6 +495,9 @@ async def _run_streamed_impl(
run_config: RunConfig,
previous_response_id: str | None,
):
if streamed_result.trace:
streamed_result.trace.start(mark_as_current=True)

current_span: Span[AgentSpanData] | None = None
current_agent = starting_agent
current_turn = 0
Expand Down Expand Up @@ -625,6 +624,8 @@ async def _run_streamed_impl(
finally:
if current_span:
current_span.finish(reset_current=True)
if streamed_result.trace:
streamed_result.trace.finish(reset_current=True)

@classmethod
async def _run_single_turn_streamed(
Expand Down
Empty file added tests/fastapi/__init__.py
Empty file.
30 changes: 30 additions & 0 deletions tests/fastapi/streaming_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from collections.abc import AsyncIterator

from fastapi import FastAPI
from starlette.responses import StreamingResponse

from agents import Agent, Runner, RunResultStreaming

agent = Agent(
name="Assistant",
instructions="You are a helpful assistant.",
)


app = FastAPI()


@app.post("/stream")
async def stream():
result = Runner.run_streamed(agent, input="Tell me a joke")
stream_handler = StreamHandler(result)
return StreamingResponse(stream_handler.stream_events(), media_type="application/x-ndjson")


class StreamHandler:
def __init__(self, result: RunResultStreaming):
self.result = result

async def stream_events(self) -> AsyncIterator[str]:
async for event in self.result.stream_events():
yield f"{event.type}\n\n"
29 changes: 29 additions & 0 deletions tests/fastapi/test_streaming_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import pytest
from httpx import ASGITransport, AsyncClient
from inline_snapshot import snapshot

from ..fake_model import FakeModel
from ..test_responses import get_text_message
from .streaming_app import agent, app


@pytest.mark.asyncio
async def test_streaming_context():
"""This ensures that FastAPI streaming works. The context for this test is that the Runner
method was called in one async context, and the streaming was ended in another context,
leading to a tracing error because the context was closed in the wrong context. This test
ensures that this actually works.
"""
model = FakeModel()
agent.model = model
model.set_next_output([get_text_message("done")])

transport = ASGITransport(app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
async with ac.stream("POST", "/stream") as r:
assert r.status_code == 200
body = (await r.aread()).decode("utf-8")
lines = [line for line in body.splitlines() if line]
assert lines == snapshot(
["agent_updated_stream_event", "raw_response_event", "run_item_stream_event"]
)
19 changes: 18 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.