feat(my-deepagent): v0.3 PR #8 — conversation-centric Web GUI (/conversation.html)
Workflow run 페이지를 archive 로 격하시키고, 사용자가 처음 보는 화면을
chat-style 대화 thread 로 전환. Claude Code 의 Web GUI 와 동일한 UX.
핵심 동작:
- 새 페이지 `/conversation.html` 에서 세션을 picker 로 고르거나 "새 대화"
버튼으로 만들고 메시지 입력. Cmd/Ctrl+Enter 로 전송.
- POST /api/sessions/{id}/messages 가 user MessageRow 를 영속한 즉시 200 응답
후 `asyncio.create_task(invoke_session_agent(...))` 로 백그라운드 invoke 발사.
- 백그라운드 task 는 lifespan 에서 1회 열어둔 LangGraph saver 를 재사용하고
agent.ainvoke → assistant MessageRow 영속 → 자동 compaction 까지 처리.
- 기존 SSE 스트림 (`/api/sessions/{id}/stream`) 이 새 메시지를 push,
프론트엔드의 `EventSource` 가 받아 thread 에 렌더.
신규 / 수정 파일:
- `static/conversation.html` (신규): chat UI 마크업. data-page="conversation".
- `static/app.js`: 새 페이지 핸들러 `bootstrapConversationPage` +
세션 picker + 메시지 thread 렌더 + SSE 구독 + Cmd/Ctrl+Enter 단축키.
XSS 정책 동일: 모든 사용자 콘텐츠는 `textContent` 만 사용.
- `static/style.css`: `.messages-thread`, `.msg-bubble`, `.conv-topbar`,
`.conv-input-bar` 등 chat UI 스타일.
- `api/app.py`: lifespan 에서 LangGraph saver 를 1회 열어 `app.state.saver`
에 보관 (Postgres 일 때만).
- `api/agent_runner.py` (신규): `invoke_session_agent(...)` — REPL 의
`InteractiveSession + _invoke_and_stream` 와 동일한 stack 을 HTTP background
context 용으로 재구성. 실패는 로깅 + return.
- `api/routes/sessions.py`: POST /messages 가 background task 발사 + ref 를
`app.state.pending_invocations` set 에 보관 (RUF006 / GC drop 방지).
테스트 (`tests/integration/test_conversation_gui.py`, 4 케이스):
- GET /conversation.html → 200 + 필수 마크업
- POST /messages → 200 + user row 영속 + 스텁 runner 호출 확인
- 백그라운드 task ref 가 `pending_invocations` 에 잡혀있고 완료 후 자동 discard
- 스텁 runner 가 assistant row 영속 → user + assistant 시퀀스 검증
게이트:
- ruff check / format --check / mypy: PASS
- pytest -q --ignore=tests/integration/test_e2e_workflow.py
--ignore=tests/integration/test_openrouter_smoke.py: 675 passed (4 신규 포함)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
238
my-deepagent/src/my_deepagent/api/agent_runner.py
Normal file
238
my-deepagent/src/my_deepagent/api/agent_runner.py
Normal file
@@ -0,0 +1,238 @@
|
||||
"""Background agent invocation for the Web GUI (v0.3 PR #8).
|
||||
|
||||
The Web GUI POSTs user messages to ``/api/sessions/{id}/messages`` and expects
|
||||
an assistant response to appear via the SSE stream shortly after. The route
|
||||
handler persists the user message and kicks off this runner as a fire-and-
|
||||
forget asyncio task — same fundamentals as :mod:`cli.interactive` but without
|
||||
the prompt-toolkit REPL loop.
|
||||
|
||||
This runner is **single-uvicorn-worker** by design (see ``api/app.py``'s
|
||||
docstring): the saver is held on ``app.state.saver`` and shared across all
|
||||
background invocations. Multi-worker support would require Postgres
|
||||
``LISTEN/NOTIFY`` fanout — deferred per plan.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
from sqlalchemy import desc, select
|
||||
|
||||
from ..audit import make_audit_recorder
|
||||
from ..budget import make_budget_tracker_from_config
|
||||
from ..compaction import compact_session, should_compact
|
||||
from ..config import Config
|
||||
from ..hash import sha256
|
||||
from ..instructions import (
|
||||
ensure_global_instructions_initialized,
|
||||
resolve_instruction_paths,
|
||||
)
|
||||
from ..memory import (
|
||||
ensure_memory_initialized,
|
||||
list_memory_paths,
|
||||
project_memory_dir,
|
||||
)
|
||||
from ..middleware.audit import AuditToolMiddleware
|
||||
from ..middleware.cost import CostMiddleware
|
||||
from ..middleware.plan_mode import PlanModeMiddleware
|
||||
from ..monitoring.pricing import ModelPrice, PricingCache
|
||||
from ..monitoring.token_budget import count_tokens
|
||||
from ..persistence.db import Database
|
||||
from ..persistence.models import InteractiveSessionRow, MessageRow
|
||||
from ..persona import Persona
|
||||
from ..session import build_agent
|
||||
from ..skills import ensure_skills_initialized, resolve_skill_sources, user_skills_dir
|
||||
|
||||
# Module-level logger, named after this module's import path.
_LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _static_pricing_seed() -> PricingCache:
    """Return a fresh ``PricingCache`` pre-loaded with a fixed price table.

    The table is hard-coded to four models and is meant to match the REPL's
    helper of the same name.
    """
    seeded = PricingCache()
    # NOTE(review): positional fields are presumably (model id, input price,
    # output price, context window) — confirm against ``ModelPrice``.
    seed_prices = [
        ModelPrice("anthropic/claude-sonnet-4-6", 0.003, 0.015, 200_000),
        ModelPrice("anthropic/claude-haiku-4-5", 0.001, 0.005, 200_000),
        ModelPrice("anthropic/claude-opus-4-1", 0.015, 0.075, 200_000),
        ModelPrice("deepseek/deepseek-chat", 0.00028, 0.00112, 64_000),
    ]
    seeded.set(seed_prices)
    return seeded
|
||||
|
||||
|
||||
def _flatten_assistant_content(message: Any) -> str:
|
||||
"""Convert a langchain assistant message's content into a plain string.
|
||||
|
||||
LangChain may return a list of content blocks (text + tool_use); we
|
||||
concatenate the text-bearing pieces. Falls back to ``str(content)`` if
|
||||
the shape is unexpected.
|
||||
"""
|
||||
content = getattr(message, "content", "") or ""
|
||||
if isinstance(content, list):
|
||||
parts: list[str] = []
|
||||
for block in content:
|
||||
if isinstance(block, dict):
|
||||
parts.append(block.get("text", "") or "")
|
||||
else:
|
||||
parts.append(str(block))
|
||||
return "\n".join(p for p in parts if p)
|
||||
return str(content)
|
||||
|
||||
|
||||
async def _bootstrap_session_dirs(config: Config, project_key: str) -> None:
    """Run the memory, skills and global-instructions initializers.

    Called before every background invocation. The function is declared
    ``async`` but awaits nothing: all three helpers run synchronously.

    NOTE(review): intended to mirror
    ``cli.interactive.InteractiveSession.__init__`` and to be idempotent, so
    repeat calls are cheap — confirm against the ``ensure_*`` helpers.
    """
    memory_root = project_memory_dir(config, project_key)
    ensure_memory_initialized(memory_root)

    skills_root = user_skills_dir(config)
    ensure_skills_initialized(skills_root)

    ensure_global_instructions_initialized(config)
|
||||
|
||||
|
||||
async def invoke_session_agent(
    db: Database,
    config: Config,
    personas: list[Persona],
    session_id: UUID,
    user_message: str,
    *,
    saver: Any | None = None,
) -> None:
    """Run one ``ainvoke`` and persist the assistant reply for the session.

    The user message is assumed to be ALREADY persisted by the HTTP handler
    (POST /api/sessions/{id}/messages). This runner only adds the assistant
    response and runs the post-turn auto-compaction check.

    Args:
        db: Database handle used for every read and write in this turn.
        config: Application configuration (workspace root, state dir, ...).
        personas: Loaded personas; the session's persona is looked up by hash.
        session_id: Identifier of the interactive session to run.
        user_message: Text of the user message to send to the agent.
        saver: Optional checkpointer passed through to ``build_agent``.

    Failures are logged but never raised: the route handler has already
    returned 200, and this coroutine runs as a fire-and-forget task, so an
    escaping exception would only surface as asyncio's "Task exception was
    never retrieved". Cancellation (``asyncio.CancelledError``) is not an
    ``Exception`` subclass and still propagates.
    """
    try:
        await _run_session_turn(db, config, personas, session_id, user_message, saver=saver)
    except Exception:
        # Boundary of the background task: covers agent construction,
        # persistence and compaction, not just ``agent.ainvoke``.
        _LOG.exception("invoke_session_agent failed for session %s", session_id)


async def _run_session_turn(
    db: Database,
    config: Config,
    personas: list[Persona],
    session_id: UUID,
    user_message: str,
    *,
    saver: Any | None,
) -> None:
    """Load the session, build the agent, invoke it once, persist, then compact."""
    async with db.session() as s:
        row = await s.get(InteractiveSessionRow, str(session_id))
        if row is None:
            _LOG.warning("invoke_session_agent: session %s not found", session_id)
            return
        # Copy the columns we need while the session is still open so nothing
        # below reads from the ORM row after the session has been closed.
        persona_hash = row.persona_hash
        stored_project_key = row.project_key
        stored_model = row.model
        # Plan-mode is read from the session row — API users can toggle via a
        # future API endpoint; REPL toggles via /plan slash.
        is_plan = bool(row.plan_mode)

    persona = _resolve_persona(personas, persona_hash)
    if persona is None:
        _LOG.warning(
            "invoke_session_agent: persona hash %s not in loaded personas", persona_hash
        )
        return

    # Sessions without a stored project key fall back to a key derived from
    # the resolved workspace root.
    project_key = stored_project_key or sha256(str(config.workspace_root.resolve()))[:16]
    await _bootstrap_session_dirs(config, project_key)

    # Build agent. No model override at the API layer — `/model` slash is REPL-only.
    model_name = stored_model or persona.model
    pricing = _static_pricing_seed()
    budget = make_budget_tracker_from_config(db, config)
    cost_mw = CostMiddleware(
        pricing=pricing,
        model_name=model_name,
        interactive_session_id=session_id,
        persona_name=persona.name,
        budget_tracker=budget,
    )
    audit_mw = AuditToolMiddleware(
        interactive_session_id=session_id,
        file_recorder=make_audit_recorder(config.state_dir),
    )
    plan_mw = PlanModeMiddleware(is_active=lambda: is_plan)

    memory_dir = project_memory_dir(config, project_key)
    instruction_paths = resolve_instruction_paths(config, config.workspace_root)
    memory_paths = list_memory_paths(memory_dir)
    skill_sources = resolve_skill_sources(config)

    agent = build_agent(
        persona,
        config,
        root_dir=config.workspace_root,
        middleware=[plan_mw, cost_mw, audit_mw],
        checkpointer=saver,
        memory_paths_override=[*instruction_paths, *memory_paths],
        skills_sources_override=skill_sources,
    )

    thread_id = f"{session_id}:0"
    try:
        result = await agent.ainvoke(
            {"messages": [{"role": "user", "content": user_message}]},
            config={"configurable": {"thread_id": thread_id}},
        )
    except Exception:
        _LOG.exception("agent.ainvoke failed for session %s", session_id)
        return

    messages = result.get("messages", []) if isinstance(result, dict) else []
    if not messages:
        return
    # Only the final message of the turn is persisted as the reply.
    assistant_text = _flatten_assistant_content(messages[-1])
    if not assistant_text:
        return

    await _persist_assistant_message(db, session_id, assistant_text, model_name)

    # Post-turn auto-compaction (mirrors REPL behaviour).
    async with db.session() as s:
        refreshed = await s.get(InteractiveSessionRow, str(session_id))
        if refreshed is not None and should_compact(refreshed):
            await compact_session(db, config, str(session_id))
|
||||
|
||||
|
||||
def _resolve_persona(personas: list[Persona], persona_hash: str) -> Persona | None:
|
||||
for p in personas:
|
||||
if p.compute_hash() == persona_hash:
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
async def _persist_assistant_message(
    db: Database,
    session_id: UUID,
    content: str,
    model: str,
) -> None:
    """Append an assistant ``MessageRow`` and update the session's counters.

    The new row gets ``seq`` = highest existing ``seq`` for the session + 1
    (or 1 when the session has no messages). If the session row exists, its
    ``last_message_at`` is set to the current UTC timestamp and
    ``total_output_tokens`` grows by the token count of ``content``. Both
    changes are committed together.
    """
    from datetime import UTC, datetime

    sid = str(session_id)
    n_tokens = count_tokens(content, model)
    stamp = datetime.now(UTC).isoformat(timespec="seconds")

    newest_seq_query = (
        select(MessageRow.seq)
        .where(MessageRow.session_id == sid)
        .order_by(desc(MessageRow.seq))
        .limit(1)
    )

    async with db.session() as s:
        # NOTE(review): seq is derived by read-then-insert; concurrent writers
        # for the same session could presumably compute the same value —
        # confirm a constraint or serialization covers this.
        newest_seq = (await s.execute(newest_seq_query)).scalar_one_or_none()
        next_seq = (newest_seq or 0) + 1

        reply_row = MessageRow(
            session_id=sid,
            seq=next_seq,
            role="assistant",
            content=content,
            tool_calls=None,
            token_count=n_tokens,
            is_summary=False,
            archived=False,
            ts=stamp,
        )
        s.add(reply_row)

        session_row = await s.get(InteractiveSessionRow, sid)
        if session_row is not None:
            session_row.last_message_at = stamp
            session_row.total_output_tokens += n_tokens

        await s.commit()
|
||||
|
||||
|
||||
# Re-exported for tests that want to construct a fresh persona+session row
|
||||
# without going through the HTTP layer.
|
||||
# NOTE(review): ``uuid4`` is imported from the stdlib ``uuid`` module, not
# defined here; it is listed only so tests can import it from this module —
# confirm it is still needed.
__all__ = ["invoke_session_agent", "uuid4"]
|
||||
@@ -17,6 +17,7 @@ from fastapi.staticfiles import StaticFiles
|
||||
from starlette.responses import FileResponse
|
||||
|
||||
from ..config import Config, load_config
|
||||
from ..persistence.checkpointer import get_checkpointer_ctx
|
||||
from ..persistence.db import Database
|
||||
from ..persona import load_personas_from_dir
|
||||
from ..workflow import WorkflowTemplate, load_workflow_yaml
|
||||
@@ -54,7 +55,14 @@ def _load_seed_workflows() -> list[tuple[Path, WorkflowTemplate]]:
|
||||
|
||||
@asynccontextmanager
|
||||
async def _lifespan(app: FastAPI) -> AsyncIterator[None]:
|
||||
"""Initialize the shared Database, personas, workflows on startup; dispose on shutdown."""
|
||||
"""Initialize the shared Database, personas, workflows, LangGraph saver on
|
||||
startup; dispose on shutdown.
|
||||
|
||||
The saver is opened once per app lifetime and reused by background agent
|
||||
invocations from POST /api/sessions/{id}/messages (v0.3 PR #8). Opening
|
||||
per-request would be too expensive (each open establishes a Postgres
|
||||
connection + verifies the checkpoint schema).
|
||||
"""
|
||||
config: Config = app.state.config or load_config()
|
||||
db = Database(config.database_url)
|
||||
# init_schema is a no-op against an already-migrated DB; cheap to call.
|
||||
@@ -63,9 +71,23 @@ async def _lifespan(app: FastAPI) -> AsyncIterator[None]:
|
||||
app.state.db = db
|
||||
app.state.personas = load_personas_from_dir(_DOCS_SCHEMAS / "personas")
|
||||
app.state.workflows = _load_seed_workflows()
|
||||
saver_ctx = get_checkpointer_ctx(config.database_url)
|
||||
try:
|
||||
# AsyncPostgresSaver.from_conn_string only works for postgres; for sqlite
|
||||
# tests we silently fall back to None and let background ainvoke run
|
||||
# without checkpointing (acceptable: tests stub agents anyway).
|
||||
if config.database_url.startswith("postgresql"):
|
||||
saver = await saver_ctx.__aenter__()
|
||||
app.state.saver = saver
|
||||
else:
|
||||
app.state.saver = None
|
||||
yield
|
||||
finally:
|
||||
if app.state.saver is not None:
|
||||
try:
|
||||
await saver_ctx.__aexit__(None, None, None)
|
||||
except Exception:
|
||||
_LOG.exception("saver context exit failed during shutdown")
|
||||
await db.dispose()
|
||||
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ from ...persistence.models import (
|
||||
MessageRow,
|
||||
)
|
||||
from ...persona import Persona
|
||||
from ..agent_runner import invoke_session_agent
|
||||
from ..deps import get_config, get_db, get_personas
|
||||
from ..models import (
|
||||
CreateSessionRequest,
|
||||
@@ -218,8 +219,18 @@ async def create_session(
|
||||
async def post_message(
|
||||
session_id: str,
|
||||
body: PostMessageRequest,
|
||||
request: Request,
|
||||
db: DbDep,
|
||||
config: ConfigDep,
|
||||
personas: PersonasDep,
|
||||
) -> SessionAck:
|
||||
"""Persist a user message + fire the agent invocation in the background.
|
||||
|
||||
v0.3 PR #8: returns immediately after the user message is durably
|
||||
persisted. The background task fetches the saver from ``app.state`` (set
|
||||
up by the lifespan) and emits the assistant reply via the same SSE stream
|
||||
that the client is already subscribed to.
|
||||
"""
|
||||
async with db.session() as s:
|
||||
row = await s.get(InteractiveSessionRow, session_id)
|
||||
if row is None:
|
||||
@@ -257,6 +268,27 @@ async def post_message(
|
||||
row.title = body.content[:50]
|
||||
await s.commit()
|
||||
|
||||
# Fire-and-forget background invocation. We do NOT await it — the route
|
||||
# returns 200 immediately and the SSE stream picks up the assistant reply.
|
||||
# Hold a reference on app.state so RUF006 + GC don't kill the task mid-flight.
|
||||
saver = getattr(request.app.state, "saver", None)
|
||||
from uuid import UUID
|
||||
|
||||
task = asyncio.create_task(
|
||||
invoke_session_agent(
|
||||
db,
|
||||
config,
|
||||
personas,
|
||||
UUID(session_id),
|
||||
body.content,
|
||||
saver=saver,
|
||||
)
|
||||
)
|
||||
pending: set[asyncio.Task[Any]] = getattr(request.app.state, "pending_invocations", set())
|
||||
pending.add(task)
|
||||
request.app.state.pending_invocations = pending
|
||||
task.add_done_callback(pending.discard)
|
||||
|
||||
return SessionAck(session_id=session_id, state="active", message="queued")
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user