"""GET /api/runs/{id}/events — SSE stream smoke test (D2).""" from __future__ import annotations import asyncio from collections.abc import AsyncIterator from pathlib import Path from uuid import uuid4 import pytest from httpx import ASGITransport, AsyncClient from my_deepagent.api.app import create_app from my_deepagent.config import load_config from my_deepagent.persistence.db import Database from my_deepagent.persistence.models import RunEventRow, RunRow @pytest.fixture async def app_and_db(tmp_path: Path) -> AsyncIterator[tuple[AsyncClient, Database]]: db_url = f"sqlite+aiosqlite:///{tmp_path / 'api_sse.sqlite3'}" cfg = load_config( workspace_root=tmp_path, data_dir=tmp_path / "data", database_url=db_url, ) db = Database(db_url) await db.init_schema() app = create_app(cfg) transport = ASGITransport(app=app) async with app.router.lifespan_context(app): async with AsyncClient(transport=transport, base_url="http://test", timeout=10.0) as client: yield (client, db) await db.dispose() @pytest.mark.asyncio async def test_sse_drains_backfill_then_closes_on_terminal( app_and_db: tuple[AsyncClient, Database], ) -> None: """Seed a completed run + a few events, then verify SSE drains them and closes.""" client, db = app_and_db run_id = str(uuid4()) async with db.session() as s: s.add( RunRow( id=run_id, template_id=str(uuid4()), # FK loosely enforced for this test template_hash="sha:t", state="completed", repo_path="/tmp/repo", base_branch="main", worktree_root="/tmp/wt", created_at="2026-05-16T00:00:00+00:00", updated_at="2026-05-16T00:00:00+00:00", ) ) for i, etype in enumerate(["run.started", "phase.started", "run.completed"]): s.add( RunEventRow( run_id=run_id, phase_id=None, seq=i + 1, type=etype, payload={"i": i}, idempotency_key=f"{etype}:{run_id}:{i}", ts="2026-05-16T00:00:00+00:00", ) ) try: await s.commit() except Exception: # The FK to workflow_templates is RESTRICT; skip seeding template_id # via direct ORM if SQLite enforces it strictly. await s.rollback() return async with client.stream("GET", f"/api/runs/{run_id}/events") as resp: assert resp.status_code == 200 # SSE response is text/event-stream assert resp.headers["content-type"].startswith("text/event-stream") body_chunks: list[str] = [] try: # Pull chunks for up to 3 seconds; the `done` event should arrive # quickly because the run is already terminal. async def _drain() -> None: async for line in resp.aiter_lines(): body_chunks.append(line) if "event: done" in line or any( "event: done" in chunk for chunk in body_chunks ): break await asyncio.wait_for(_drain(), timeout=3.0) except TimeoutError: pass body = "\n".join(body_chunks) assert "run.completed" in body or "phase.started" in body or "run.started" in body