dev-puppeteer/my-deepagent/tests/integration/test_conversation_gui.py
chungyeong 9a02f22acb feat(my-deepagent): v0.4 chat UX boost + A/B live verification
Raises the chat experience to Claude Code parity and verifies 7 core flows against live OpenRouter.

A - Live verification (scripts/live_verify.py, 7 PASS, about $0.02):
- A1 1-turn chat (CLI-eq) → Haiku 4.5 replies in Korean
- A2 sessions resume → re-submitting the same session_id restores LangGraph state
- A3 /skill <name> system inject → SKILL.md ("한국어 haiku 3 lines", i.e. a Korean
  haiku in 3 lines) yields a Korean poem of exactly 3 lines (strong evidence
  that the injected skill controls LLM behavior)
- A4 /plan → /approve → LLM emits plan markdown only, with no attempt to call blocked tools
- A5 /agents spawn → real sub-agent ainvoke + push to the parent stream
- A6 auto-compaction → 14 messages → 4 archived + 77-token summary
- A7 /workflow wiring → role↔persona matching validated up front

B1 - Markdown rendering:
- app.js pure-JS mini parser: code fences / ATX headers / ul/ol / `code`/**bold**/
  *italic*/[link](url)
- XSS policy unchanged: createElement + textContent only.  Link hrefs are
  restricted to the http(s): schemes.
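
The http(s)-only href rule above can be illustrated with a small Python sketch. The real parser is JavaScript in app.js and is not shown in this commit, so `is_safe_href` and the extra requirement of a non-empty host are assumptions made for illustration, not the project's code:

```python
from urllib.parse import urlsplit

# Allowlist rather than blocklist: anything that is not http/https is rejected.
_ALLOWED_SCHEMES = frozenset({"http", "https"})


def is_safe_href(href: str) -> bool:
    """Return True only for absolute http(s) URLs (hypothetical helper)."""
    try:
        parts = urlsplit(href.strip())
    except ValueError:
        # Malformed URLs (e.g. a broken IPv6 literal) are treated as unsafe.
        return False
    # Requiring a host is an extra assumption; it rejects oddities like "http:foo".
    return parts.scheme.lower() in _ALLOWED_SCHEMES and bool(parts.netloc)
```

An allowlist keeps `javascript:` and `data:` links out without having to enumerate every dangerous scheme; a rejected link would presumably be rendered as plain text.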

B2 - System event card (collapsible):
- _classifySystemMessage classifies messages by prefix [sub-agent .../workflow .../
  Earlier conversation history/당신은 plan mode/The user APPROVED/skill] and
  renders each match as a <details> card.
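
The prefix classification above can be sketched in Python as follows. The actual _classifySystemMessage is JavaScript and is not shown here, so the exact prefix strings, the kind labels, and the function name `classify_system_message` are assumptions based only on the list in this commit message:

```python
from __future__ import annotations

# (prefix, kind) pairs. Prefixes follow the commit message list; the kind
# labels on the right are hypothetical names chosen for this sketch.
_SYSTEM_PREFIXES: tuple[tuple[str, str], ...] = (
    ("sub-agent", "sub-agent"),
    ("workflow", "workflow"),
    ("Earlier conversation history", "compaction"),
    ("당신은 plan mode", "plan-mode"),
    ("The user APPROVED", "plan-approved"),
    ("skill", "skill"),
)


def classify_system_message(content: str) -> str | None:
    """Return a card kind for a system message, or None for ordinary text."""
    # Tolerate leading whitespace and an optional opening bracket (assumption).
    text = content.lstrip().lstrip("[")
    for prefix, kind in _SYSTEM_PREFIXES:
        if text.startswith(prefix):
            return kind
    return None
```

A message that returns None would be rendered as a normal bubble; anything else would go into a collapsible card labeled with its kind.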

B3 - Token streaming via AsyncCallbackHandler:
- ChatOpenAI(streaming=True)
- _StreamingChunkPusher (AsyncCallbackHandler) → one asyncio.Queue per session.
- SSE _session_event_stream drains the queue → event: chunk SSE.  100 ms poll.
- Ordering guarantee: chunks are drained before message rows are yielded
  (tokens are visible before the placeholder is replaced by the message).
- Live: 5 chunk events + 1 final message; "안녕하세요, / 무 / 엇을 도와드 /
  릴까요?" was pushed token by token.
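
A minimal, stdlib-only sketch of the queue-to-SSE flow described above. It is not the project's implementation: `ChunkPusher` only mirrors the `on_llm_new_token` hook of LangChain's AsyncCallbackHandler without subclassing it, and `session_event_stream`, the `final_message` future, and the JSON payload shape are all assumptions for illustration:

```python
from __future__ import annotations

import asyncio
import json
from collections.abc import AsyncIterator


class ChunkPusher:
    """Stand-in for _StreamingChunkPusher: forwards each token to a queue."""

    def __init__(self, queue: asyncio.Queue[str]) -> None:
        self._queue = queue

    async def on_llm_new_token(self, token: str, **_kwargs: object) -> None:
        await self._queue.put(token)


def drain(queue: asyncio.Queue[str]) -> list[str]:
    """Remove and return everything currently in the queue without blocking."""
    chunks: list[str] = []
    while True:
        try:
            chunks.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            return chunks


def sse_frame(event: str, payload: dict[str, str]) -> str:
    """Format one Server-Sent Events frame; JSON keeps newlines out of data."""
    return f"event: {event}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"


async def session_event_stream(
    queue: asyncio.Queue[str],
    final_message: asyncio.Future[str],
    poll_s: float = 0.1,
) -> AsyncIterator[str]:
    """Yield chunk frames first, then the final message frame."""
    while True:
        # Sample completion BEFORE draining so that tokens queued ahead of
        # the final message are always emitted ahead of it.
        done = final_message.done()
        for chunk in drain(queue):
            yield sse_frame("chunk", {"text": chunk})
        if done:
            yield sse_frame("message", {"content": final_message.result()})
            return
        await asyncio.sleep(poll_s)
```

Checking `done` before the drain is what gives the ordering guarantee in this sketch: a token can never be left in the queue behind an already-emitted final message.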

B4 - Cancel mid-turn:
- POST /api/sessions/{id}/abort + app.state.pending_per_session index.
- A newly arriving user message automatically cancels the previous in-flight task.
- "■ 중단" (Stop) button: visible while waiting, hidden on completion or cancel.
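
The per-session cancel behavior above could look roughly like this. It is a sketch under assumptions: `PendingIndex` and its methods are hypothetical names, and the real app.state.pending_per_session structure is not shown in this commit:

```python
from __future__ import annotations

import asyncio
from collections.abc import Coroutine
from typing import Any


class PendingIndex:
    """Tracks at most one in-flight task per session (hypothetical helper)."""

    def __init__(self) -> None:
        self._tasks: dict[str, asyncio.Task[None]] = {}

    def start(
        self, session_id: str, coro: Coroutine[Any, Any, None]
    ) -> asyncio.Task[None]:
        # A new message supersedes whatever is still running for this session.
        self.abort(session_id)
        task = asyncio.create_task(coro)
        self._tasks[session_id] = task

        def _forget(done: asyncio.Task[None], sid: str = session_id) -> None:
            # Only drop the entry if it still points at this exact task.
            if self._tasks.get(sid) is done:
                del self._tasks[sid]

        task.add_done_callback(_forget)
        return task

    def abort(self, session_id: str) -> bool:
        """Cancel the in-flight task, returning True if one was cancelled."""
        task = self._tasks.pop(session_id, None)
        if task is None or task.done():
            return False
        task.cancel()
        return True
```

An abort endpoint would then only need to call `abort(session_id)` and report whether anything was actually cancelled; the identity check in `_forget` keeps a late callback from a superseded task from evicting its replacement.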

B5 - IME composition-safe Enter:
- compositionstart/compositionend flag: the Enter that commits a Hangul IME
  candidate is ignored.
- Cmd/Ctrl+Enter always sends.

DB hot-fix:
- Database.__init__ pool_pre_ping=True: fixes the 500s caused by stale
  Postgres asyncpg connections under SSE load.

Other:
- createNewSession repo_path: "" → "." (passes the min_length=1 validation).
- test_conversation_gui.py fake_invoke updated to accept the chunk_queue kwarg.

Gates:
- ruff / format / mypy: PASS (143 source files)
- pytest -q --ignore=tests/integration/test_e2e_workflow.py
  --ignore=tests/integration/test_openrouter_smoke.py: 709 passed

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 01:08:40 +09:00

243 lines
8.0 KiB
Python

"""v0.3 PR #8 — Conversation Web GUI tests.
Covers:
1. GET /conversation.html serves the static file (200).
2. POST /api/sessions/{id}/messages still returns 200 + queues a background
   task (the agent_runner is stubbed so we never hit OpenRouter).
3. The background task persists an assistant MessageRow that the SSE stream
   then surfaces.
4. The background task is awaited correctly (asyncio.Task ref held on
   app.state so RUF006 doesn't drop it mid-flight).
"""

from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from pathlib import Path
from typing import Any

import pytest
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient
from sqlalchemy import select

from my_deepagent.api.app import create_app
from my_deepagent.config import load_config
from my_deepagent.persistence.db import Database
from my_deepagent.persistence.models import InteractiveSessionRow, MessageRow


@pytest.fixture
async def app_client(
    tmp_path: Path,
) -> AsyncIterator[tuple[AsyncClient, Database, FastAPI]]:
    db_url = f"sqlite+aiosqlite:///{tmp_path / 'conv.sqlite3'}"
    cfg = load_config(
        workspace_root=tmp_path,
        data_dir=tmp_path / "data",
        database_url=db_url,
    )
    db = Database(db_url)
    await db.init_schema()
    await db.dispose()
    app = create_app(cfg)
    transport = ASGITransport(app=app)
    async with app.router.lifespan_context(app):
        # Tests get their own Database instance for direct row inspection.
        external_db = Database(db_url)
        async with AsyncClient(transport=transport, base_url="http://test", timeout=10.0) as client:
            yield (client, external_db, app)
        await external_db.dispose()


# ---------------------------------------------------------------------------
# Static file serving
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_conversation_page_served(
    app_client: tuple[AsyncClient, Database, FastAPI],
) -> None:
    client, _db, _app = app_client
    r = await client.get("/conversation.html")
    assert r.status_code == 200
    assert 'data-page="conversation"' in r.text
    assert "message-input" in r.text


# ---------------------------------------------------------------------------
# POST /messages still 200 + background task fires
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_post_message_returns_ack_and_persists_user_row(
    app_client: tuple[AsyncClient, Database, FastAPI], monkeypatch: pytest.MonkeyPatch
) -> None:
    client, db, _app = app_client
    invocations: list[tuple[str, str]] = []

    async def fake_invoke(
        _db: Any,
        _config: Any,
        _personas: Any,
        session_id: Any,
        user_message: str,
        *,
        saver: Any = None,
        chunk_queue: Any = None,
    ) -> None:
        invocations.append((str(session_id), user_message))

    monkeypatch.setattr("my_deepagent.api.routes.sessions.invoke_session_agent", fake_invoke)

    # Create a session.
    r = await client.post(
        "/api/sessions",
        json={"persona_name": "default-interactive", "repo_path": str(Path.cwd())},
    )
    assert r.status_code == 200
    sid = r.json()["session_id"]

    # POST a message.
    r2 = await client.post(f"/api/sessions/{sid}/messages", json={"content": "hello agent"})
    assert r2.status_code == 200
    assert r2.json()["state"] == "active"

    # User row persisted synchronously.
    async with db.session() as s:
        rows = (
            (
                await s.execute(
                    select(MessageRow).where(MessageRow.session_id == sid).order_by(MessageRow.seq)
                )
            )
            .scalars()
            .all()
        )
    assert len(rows) == 1
    assert rows[0].role == "user"
    assert rows[0].content == "hello agent"

    # Give the event loop one cycle so the background task can fire.
    await asyncio.sleep(0.05)
    assert invocations == [(sid, "hello agent")]


@pytest.mark.asyncio
async def test_post_message_holds_task_ref_on_app_state(
    app_client: tuple[AsyncClient, Database, FastAPI], monkeypatch: pytest.MonkeyPatch
) -> None:
    """Background task must be held on app.state.pending_invocations so the
    GC + RUF006 don't drop it before completion."""
    client, _db, app = app_client
    started = asyncio.Event()
    can_finish = asyncio.Event()

    async def slow_invoke(*_a: Any, **_k: Any) -> None:
        started.set()
        await can_finish.wait()

    monkeypatch.setattr("my_deepagent.api.routes.sessions.invoke_session_agent", slow_invoke)

    r = await client.post(
        "/api/sessions",
        json={"persona_name": "default-interactive", "repo_path": str(Path.cwd())},
    )
    sid = r.json()["session_id"]
    await client.post(f"/api/sessions/{sid}/messages", json={"content": "x"})

    # Wait for the task to start.
    await asyncio.wait_for(started.wait(), timeout=2.0)

    # The pending_invocations set on the app should hold a reference.
    pending = app.state.pending_invocations
    assert len(pending) == 1

    # Release the task and let the discard callback fire.
    can_finish.set()
    await asyncio.sleep(0.05)
    assert len(app.state.pending_invocations) == 0


# ---------------------------------------------------------------------------
# End-to-end: assistant message materializes for SSE
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_background_invocation_persists_assistant_row(
    app_client: tuple[AsyncClient, Database, FastAPI], monkeypatch: pytest.MonkeyPatch
) -> None:
    """When the runner finishes, an assistant MessageRow should be visible."""
    client, db, _app = app_client

    async def fake_invoke(
        passed_db: Any,
        _config: Any,
        _personas: Any,
        session_id: Any,
        _user_message: str,
        *,
        saver: Any = None,
        chunk_queue: Any = None,
    ) -> None:
        # Simulate what the real runner does: write an assistant MessageRow.
        from datetime import UTC, datetime

        from sqlalchemy import desc

        async with passed_db.session() as s:
            last = (
                await s.execute(
                    select(MessageRow.seq)
                    .where(MessageRow.session_id == str(session_id))
                    .order_by(desc(MessageRow.seq))
                    .limit(1)
                )
            ).scalar_one_or_none() or 0
            s.add(
                MessageRow(
                    session_id=str(session_id),
                    seq=last + 1,
                    role="assistant",
                    content="(stubbed assistant reply)",
                    tool_calls=None,
                    token_count=5,
                    is_summary=False,
                    archived=False,
                    ts=datetime.now(UTC).isoformat(timespec="seconds"),
                )
            )
            await s.commit()

    monkeypatch.setattr("my_deepagent.api.routes.sessions.invoke_session_agent", fake_invoke)

    r = await client.post(
        "/api/sessions",
        json={"persona_name": "default-interactive", "repo_path": str(Path.cwd())},
    )
    sid = r.json()["session_id"]
    await client.post(f"/api/sessions/{sid}/messages", json={"content": "ping"})

    # Let the background task complete.
    await asyncio.sleep(0.1)

    # Verify the conversation now has both user + assistant rows.
    async with db.session() as s:
        rows = (
            (
                await s.execute(
                    select(MessageRow).where(MessageRow.session_id == sid).order_by(MessageRow.seq)
                )
            )
            .scalars()
            .all()
        )
        sess_row = await s.get(InteractiveSessionRow, sid)
    assert [r.role for r in rows] == ["user", "assistant"]
    assert rows[1].content == "(stubbed assistant reply)"
    assert sess_row is not None
    assert sess_row.title is not None  # set from first user message