"""v0.3 PR #8 — Conversation Web GUI tests. Covers: 1. GET /conversation.html serves the static file (200). 2. POST /api/sessions/{id}/messages still returns 200 + queues a background task (the agent_runner is stubbed so we never hit OpenRouter). 3. The background task persists an assistant MessageRow that the SSE stream then surfaces. 4. The background task is awaited correctly (asyncio.Task ref held on app.state so RUF006 doesn't drop it mid-flight). """ from __future__ import annotations import asyncio from collections.abc import AsyncIterator from pathlib import Path from typing import Any import pytest from fastapi import FastAPI from httpx import ASGITransport, AsyncClient from sqlalchemy import select from my_deepagent.api.app import create_app from my_deepagent.config import load_config from my_deepagent.persistence.db import Database from my_deepagent.persistence.models import InteractiveSessionRow, MessageRow @pytest.fixture async def app_client( tmp_path: Path, ) -> AsyncIterator[tuple[AsyncClient, Database, FastAPI]]: db_url = f"sqlite+aiosqlite:///{tmp_path / 'conv.sqlite3'}" cfg = load_config( workspace_root=tmp_path, data_dir=tmp_path / "data", database_url=db_url, ) db = Database(db_url) await db.init_schema() await db.dispose() app = create_app(cfg) transport = ASGITransport(app=app) async with app.router.lifespan_context(app): # Tests get their own Database instance for direct row inspection. external_db = Database(db_url) async with AsyncClient(transport=transport, base_url="http://test", timeout=10.0) as client: yield (client, external_db, app) await external_db.dispose() # --------------------------------------------------------------------------- # Static file serving # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_conversation_page_served( app_client: tuple[AsyncClient, Database, FastAPI], ) -> None: client, _db, _app = app_client r = await client.get("/conversation.html") assert r.status_code == 200 assert 'data-page="conversation"' in r.text assert "message-input" in r.text # --------------------------------------------------------------------------- # POST /messages still 200 + background task fires # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_post_message_returns_ack_and_persists_user_row( app_client: tuple[AsyncClient, Database, FastAPI], monkeypatch: pytest.MonkeyPatch ) -> None: client, db, _app = app_client invocations: list[tuple[str, str]] = [] async def fake_invoke( _db: Any, _config: Any, _personas: Any, session_id: Any, user_message: str, *, saver: Any = None, ) -> None: invocations.append((str(session_id), user_message)) monkeypatch.setattr("my_deepagent.api.routes.sessions.invoke_session_agent", fake_invoke) # Create a session. r = await client.post( "/api/sessions", json={"persona_name": "default-interactive", "repo_path": str(Path.cwd())}, ) assert r.status_code == 200 sid = r.json()["session_id"] # POST a message. r2 = await client.post(f"/api/sessions/{sid}/messages", json={"content": "hello agent"}) assert r2.status_code == 200 assert r2.json()["state"] == "active" # User row persisted synchronously. async with db.session() as s: rows = ( ( await s.execute( select(MessageRow).where(MessageRow.session_id == sid).order_by(MessageRow.seq) ) ) .scalars() .all() ) assert len(rows) == 1 assert rows[0].role == "user" assert rows[0].content == "hello agent" # Give the event loop one cycle so the background task can fire. await asyncio.sleep(0.05) assert invocations == [(sid, "hello agent")] @pytest.mark.asyncio async def test_post_message_holds_task_ref_on_app_state( app_client: tuple[AsyncClient, Database, FastAPI], monkeypatch: pytest.MonkeyPatch ) -> None: """Background task must be held on app.state.pending_invocations so the GC + RUF006 don't drop it before completion.""" client, _db, app = app_client started = asyncio.Event() can_finish = asyncio.Event() async def slow_invoke(*_a: Any, **_k: Any) -> None: started.set() await can_finish.wait() monkeypatch.setattr("my_deepagent.api.routes.sessions.invoke_session_agent", slow_invoke) r = await client.post( "/api/sessions", json={"persona_name": "default-interactive", "repo_path": str(Path.cwd())}, ) sid = r.json()["session_id"] await client.post(f"/api/sessions/{sid}/messages", json={"content": "x"}) # Wait for the task to start. await asyncio.wait_for(started.wait(), timeout=2.0) # The pending_invocations set on the app should hold a reference. pending = app.state.pending_invocations assert len(pending) == 1 # Release the task and let the discard callback fire. can_finish.set() await asyncio.sleep(0.05) assert len(app.state.pending_invocations) == 0 # --------------------------------------------------------------------------- # End-to-end: assistant message materializes for SSE # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_background_invocation_persists_assistant_row( app_client: tuple[AsyncClient, Database, FastAPI], monkeypatch: pytest.MonkeyPatch ) -> None: """When the runner finishes, an assistant MessageRow should be visible.""" client, db, _app = app_client async def fake_invoke( passed_db: Any, _config: Any, _personas: Any, session_id: Any, _user_message: str, *, saver: Any = None, ) -> None: # Simulate what the real runner does: write an assistant MessageRow. from datetime import UTC, datetime from sqlalchemy import desc async with passed_db.session() as s: last = ( await s.execute( select(MessageRow.seq) .where(MessageRow.session_id == str(session_id)) .order_by(desc(MessageRow.seq)) .limit(1) ) ).scalar_one_or_none() or 0 s.add( MessageRow( session_id=str(session_id), seq=last + 1, role="assistant", content="(stubbed assistant reply)", tool_calls=None, token_count=5, is_summary=False, archived=False, ts=datetime.now(UTC).isoformat(timespec="seconds"), ) ) await s.commit() monkeypatch.setattr("my_deepagent.api.routes.sessions.invoke_session_agent", fake_invoke) r = await client.post( "/api/sessions", json={"persona_name": "default-interactive", "repo_path": str(Path.cwd())}, ) sid = r.json()["session_id"] await client.post(f"/api/sessions/{sid}/messages", json={"content": "ping"}) # Let the background task complete. await asyncio.sleep(0.1) # Verify the conversation now has both user + assistant rows. async with db.session() as s: rows = ( ( await s.execute( select(MessageRow).where(MessageRow.session_id == sid).order_by(MessageRow.seq) ) ) .scalars() .all() ) sess_row = await s.get(InteractiveSessionRow, sid) assert [r.role for r in rows] == ["user", "assistant"] assert rows[1].content == "(stubbed assistant reply)" assert sess_row is not None assert sess_row.title is not None # set from first user message