Claude-Code 동급 chat 경험으로 끌어올림 + 7개 핵심 흐름 실제 OpenRouter verify.
A — Live verification (scripts/live_verify.py, 7 PASS, 약 $0.02):
- A1 1-turn chat (CLI-eq) → Haiku 4.5 한국어 응답
- A2 sessions resume → 같은 session_id 재투입 시 LangGraph state 복원
- A3 /skill <name> system inject → SKILL.md ("한국어 haiku 3 lines") 가 정확히
3행 한국어 시 생성 (LLM 행동 제어 강력한 증거)
- A4 /plan → /approve → LLM plan markdown only, 차단 도구 시도 없음
- A5 /agents spawn → 실제 sub-agent ainvoke + parent stream push
- A6 auto-compaction → 14 메시지 → 4 archive + 77 토큰 summary
- A7 /workflow wiring → role↔persona 매칭 사전 검증
B1 — Markdown rendering:
- app.js pure-JS 미니 파서: 코드 펜스 / ATX 헤더 / ul/ol / `code`/**bold**/
*italic*/[link](url)
- XSS 정책 유지: createElement + textContent only. 링크 href 는 http(s):
스킴 강제.
B2 — System event card (collapsible):
- _classifySystemMessage 가 [sub-agent .../workflow .../Earlier conversation
history/당신은 plan mode/The user APPROVED/skill] 접두사 분류 후 <details>
카드로 렌더.
B3 — Token streaming via AsyncCallbackHandler:
- ChatOpenAI(streaming=True)
- _StreamingChunkPusher (AsyncCallbackHandler) → asyncio.Queue per session.
- SSE _session_event_stream 이 queue drain → event: chunk SSE. 100ms poll.
- 순서 보장: chunk drain → message rows yield (placeholder 가 메시지로
교체되기 전에 토큰 visible).
- 라이브: 5 chunk events + 1 final message, "안녕하세요, / 무 / 엇을 도와드 /
릴까요?" 토큰 단위 push.
B4 — Cancel mid-turn:
- POST /api/sessions/{id}/abort + app.state.pending_per_session 인덱스.
- 새 user 메시지 도착 시 이전 in-flight task 자동 cancel.
- "■ 중단" 버튼 — 대기 중 visible, 완료/취소 시 hide.
B5 — IME composition-safe Enter:
- compositionstart/compositionend 플래그 — 한글 IME 후보 commit Enter 무시.
- Cmd/Ctrl+Enter 는 항상 전송.
DB hot-fix:
- Database.__init__ pool_pre_ping=True — Postgres asyncpg stale connection
→ SSE 부하에서 500 발생 해결.
기타:
- createNewSession 의 repo_path: "" → "." (min_length=1 검증 통과).
- test_conversation_gui.py fake_invoke 가 chunk_queue kwarg 받도록 업데이트.
게이트:
- ruff / format / mypy: PASS (143 source files)
- pytest -q --ignore=tests/integration/test_e2e_workflow.py
--ignore=tests/integration/test_openrouter_smoke.py: 709 passed
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
243 lines
8.0 KiB
Python
243 lines
8.0 KiB
Python
"""v0.3 PR #8 — Conversation Web GUI tests.
|
|
|
|
Covers:
|
|
1. GET /conversation.html serves the static file (200).
|
|
2. POST /api/sessions/{id}/messages still returns 200 + queues a background
|
|
task (the agent_runner is stubbed so we never hit OpenRouter).
|
|
3. The background task persists an assistant MessageRow that the SSE stream
|
|
then surfaces.
|
|
4. The background task is awaited correctly (asyncio.Task ref held on
|
|
app.state so RUF006 doesn't drop it mid-flight).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from collections.abc import AsyncIterator
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
from fastapi import FastAPI
|
|
from httpx import ASGITransport, AsyncClient
|
|
from sqlalchemy import select
|
|
|
|
from my_deepagent.api.app import create_app
|
|
from my_deepagent.config import load_config
|
|
from my_deepagent.persistence.db import Database
|
|
from my_deepagent.persistence.models import InteractiveSessionRow, MessageRow
|
|
|
|
|
|
@pytest.fixture
|
|
async def app_client(
|
|
tmp_path: Path,
|
|
) -> AsyncIterator[tuple[AsyncClient, Database, FastAPI]]:
|
|
db_url = f"sqlite+aiosqlite:///{tmp_path / 'conv.sqlite3'}"
|
|
cfg = load_config(
|
|
workspace_root=tmp_path,
|
|
data_dir=tmp_path / "data",
|
|
database_url=db_url,
|
|
)
|
|
db = Database(db_url)
|
|
await db.init_schema()
|
|
await db.dispose()
|
|
app = create_app(cfg)
|
|
transport = ASGITransport(app=app)
|
|
async with app.router.lifespan_context(app):
|
|
# Tests get their own Database instance for direct row inspection.
|
|
external_db = Database(db_url)
|
|
async with AsyncClient(transport=transport, base_url="http://test", timeout=10.0) as client:
|
|
yield (client, external_db, app)
|
|
await external_db.dispose()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Static file serving
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_conversation_page_served(
|
|
app_client: tuple[AsyncClient, Database, FastAPI],
|
|
) -> None:
|
|
client, _db, _app = app_client
|
|
r = await client.get("/conversation.html")
|
|
assert r.status_code == 200
|
|
assert 'data-page="conversation"' in r.text
|
|
assert "message-input" in r.text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /messages still 200 + background task fires
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_post_message_returns_ack_and_persists_user_row(
|
|
app_client: tuple[AsyncClient, Database, FastAPI], monkeypatch: pytest.MonkeyPatch
|
|
) -> None:
|
|
client, db, _app = app_client
|
|
|
|
invocations: list[tuple[str, str]] = []
|
|
|
|
async def fake_invoke(
|
|
_db: Any,
|
|
_config: Any,
|
|
_personas: Any,
|
|
session_id: Any,
|
|
user_message: str,
|
|
*,
|
|
saver: Any = None,
|
|
chunk_queue: Any = None,
|
|
) -> None:
|
|
invocations.append((str(session_id), user_message))
|
|
|
|
monkeypatch.setattr("my_deepagent.api.routes.sessions.invoke_session_agent", fake_invoke)
|
|
|
|
# Create a session.
|
|
r = await client.post(
|
|
"/api/sessions",
|
|
json={"persona_name": "default-interactive", "repo_path": str(Path.cwd())},
|
|
)
|
|
assert r.status_code == 200
|
|
sid = r.json()["session_id"]
|
|
|
|
# POST a message.
|
|
r2 = await client.post(f"/api/sessions/{sid}/messages", json={"content": "hello agent"})
|
|
assert r2.status_code == 200
|
|
assert r2.json()["state"] == "active"
|
|
|
|
# User row persisted synchronously.
|
|
async with db.session() as s:
|
|
rows = (
|
|
(
|
|
await s.execute(
|
|
select(MessageRow).where(MessageRow.session_id == sid).order_by(MessageRow.seq)
|
|
)
|
|
)
|
|
.scalars()
|
|
.all()
|
|
)
|
|
assert len(rows) == 1
|
|
assert rows[0].role == "user"
|
|
assert rows[0].content == "hello agent"
|
|
|
|
# Give the event loop one cycle so the background task can fire.
|
|
await asyncio.sleep(0.05)
|
|
assert invocations == [(sid, "hello agent")]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_post_message_holds_task_ref_on_app_state(
|
|
app_client: tuple[AsyncClient, Database, FastAPI], monkeypatch: pytest.MonkeyPatch
|
|
) -> None:
|
|
"""Background task must be held on app.state.pending_invocations so the
|
|
GC + RUF006 don't drop it before completion."""
|
|
client, _db, app = app_client
|
|
|
|
started = asyncio.Event()
|
|
can_finish = asyncio.Event()
|
|
|
|
async def slow_invoke(*_a: Any, **_k: Any) -> None:
|
|
started.set()
|
|
await can_finish.wait()
|
|
|
|
monkeypatch.setattr("my_deepagent.api.routes.sessions.invoke_session_agent", slow_invoke)
|
|
|
|
r = await client.post(
|
|
"/api/sessions",
|
|
json={"persona_name": "default-interactive", "repo_path": str(Path.cwd())},
|
|
)
|
|
sid = r.json()["session_id"]
|
|
await client.post(f"/api/sessions/{sid}/messages", json={"content": "x"})
|
|
|
|
# Wait for the task to start.
|
|
await asyncio.wait_for(started.wait(), timeout=2.0)
|
|
# The pending_invocations set on the app should hold a reference.
|
|
pending = app.state.pending_invocations
|
|
assert len(pending) == 1
|
|
# Release the task and let the discard callback fire.
|
|
can_finish.set()
|
|
await asyncio.sleep(0.05)
|
|
assert len(app.state.pending_invocations) == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# End-to-end: assistant message materializes for SSE
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_background_invocation_persists_assistant_row(
|
|
app_client: tuple[AsyncClient, Database, FastAPI], monkeypatch: pytest.MonkeyPatch
|
|
) -> None:
|
|
"""When the runner finishes, an assistant MessageRow should be visible."""
|
|
client, db, _app = app_client
|
|
|
|
async def fake_invoke(
|
|
passed_db: Any,
|
|
_config: Any,
|
|
_personas: Any,
|
|
session_id: Any,
|
|
_user_message: str,
|
|
*,
|
|
saver: Any = None,
|
|
chunk_queue: Any = None,
|
|
) -> None:
|
|
# Simulate what the real runner does: write an assistant MessageRow.
|
|
from datetime import UTC, datetime
|
|
|
|
from sqlalchemy import desc
|
|
|
|
async with passed_db.session() as s:
|
|
last = (
|
|
await s.execute(
|
|
select(MessageRow.seq)
|
|
.where(MessageRow.session_id == str(session_id))
|
|
.order_by(desc(MessageRow.seq))
|
|
.limit(1)
|
|
)
|
|
).scalar_one_or_none() or 0
|
|
s.add(
|
|
MessageRow(
|
|
session_id=str(session_id),
|
|
seq=last + 1,
|
|
role="assistant",
|
|
content="(stubbed assistant reply)",
|
|
tool_calls=None,
|
|
token_count=5,
|
|
is_summary=False,
|
|
archived=False,
|
|
ts=datetime.now(UTC).isoformat(timespec="seconds"),
|
|
)
|
|
)
|
|
await s.commit()
|
|
|
|
monkeypatch.setattr("my_deepagent.api.routes.sessions.invoke_session_agent", fake_invoke)
|
|
|
|
r = await client.post(
|
|
"/api/sessions",
|
|
json={"persona_name": "default-interactive", "repo_path": str(Path.cwd())},
|
|
)
|
|
sid = r.json()["session_id"]
|
|
await client.post(f"/api/sessions/{sid}/messages", json={"content": "ping"})
|
|
# Let the background task complete.
|
|
await asyncio.sleep(0.1)
|
|
|
|
# Verify the conversation now has both user + assistant rows.
|
|
async with db.session() as s:
|
|
rows = (
|
|
(
|
|
await s.execute(
|
|
select(MessageRow).where(MessageRow.session_id == sid).order_by(MessageRow.seq)
|
|
)
|
|
)
|
|
.scalars()
|
|
.all()
|
|
)
|
|
sess_row = await s.get(InteractiveSessionRow, sid)
|
|
assert [r.role for r in rows] == ["user", "assistant"]
|
|
assert rows[1].content == "(stubbed assistant reply)"
|
|
assert sess_row is not None
|
|
assert sess_row.title is not None # set from first user message
|