feat(my-deepagent): v0.4 chat UX boost + A/B live verification
Claude-Code 동급 chat 경험으로 끌어올림 + 7개 핵심 흐름 실제 OpenRouter verify.
A — Live verification (scripts/live_verify.py, 7 PASS, 약 $0.02):
- A1 1-turn chat (CLI-eq) → Haiku 4.5 한국어 응답
- A2 sessions resume → 같은 session_id 재투입 시 LangGraph state 복원
- A3 /skill <name> system inject → SKILL.md ("한국어 haiku 3 lines") 가 정확히
3행 한국어 시 생성 (LLM 행동 제어 강력한 증거)
- A4 /plan → /approve → LLM plan markdown only, 차단 도구 시도 없음
- A5 /agents spawn → 실제 sub-agent ainvoke + parent stream push
- A6 auto-compaction → 14 메시지 → 4 archive + 77 토큰 summary
- A7 /workflow wiring → role↔persona 매칭 사전 검증
B1 — Markdown rendering:
- app.js pure-JS 미니 파서: 코드 펜스 / ATX 헤더 / ul/ol / `code`/**bold**/
*italic*/[link](url)
- XSS 정책 유지: createElement + textContent only. 링크 href 는 http(s):
스킴 강제.
B2 — System event card (collapsible):
- _classifySystemMessage 가 [sub-agent .../workflow .../Earlier conversation
history/당신은 plan mode/The user APPROVED/skill] 접두사 분류 후 <details>
카드로 렌더.
B3 — Token streaming via AsyncCallbackHandler:
- ChatOpenAI(streaming=True)
- _StreamingChunkPusher (AsyncCallbackHandler) → asyncio.Queue per session.
- SSE _session_event_stream 이 queue drain → event: chunk SSE. 100ms poll.
- 순서 보장: chunk drain → message rows yield (placeholder 가 메시지로
교체되기 전에 토큰 visible).
- 라이브: 5 chunk events + 1 final message, "안녕하세요, / 무 / 엇을 도와드 /
릴까요?" 토큰 단위 push.
B4 — Cancel mid-turn:
- POST /api/sessions/{id}/abort + app.state.pending_per_session 인덱스.
- 새 user 메시지 도착 시 이전 in-flight task 자동 cancel.
- "■ 중단" 버튼 — 대기 중 visible, 완료/취소 시 hide.
B5 — IME composition-safe Enter:
- compositionstart/compositionend 플래그 — 한글 IME 후보 commit Enter 무시.
- Cmd/Ctrl+Enter 는 항상 전송.
DB hot-fix:
- Database.__init__ pool_pre_ping=True — Postgres asyncpg stale connection
→ SSE 부하에서 500 발생 해결.
기타:
- createNewSession 의 repo_path: "" → "." (min_length=1 검증 통과).
- test_conversation_gui.py fake_invoke 가 chunk_queue kwarg 받도록 업데이트.
게이트:
- ruff / format / mypy: PASS (143 source files)
- pytest -q --ignore=tests/integration/test_e2e_workflow.py
--ignore=tests/integration/test_openrouter_smoke.py: 709 passed
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
"""Background agent invocation for the Web GUI (v0.3 PR #8).
|
||||
"""Background agent invocation for the Web GUI (v0.3 PR #8 + v0.4 B3 streaming).
|
||||
|
||||
The Web GUI POSTs user messages to ``/api/sessions/{id}/messages`` and expects
|
||||
an assistant response to appear via the SSE stream shortly after. The route
|
||||
@@ -6,6 +6,12 @@ handler persists the user message and kicks off this runner as a fire-and-
|
||||
forget asyncio task — same fundamentals as :mod:`cli.interactive` but without
|
||||
the prompt-toolkit REPL loop.
|
||||
|
||||
v0.4 B3 adds token streaming: a ``chunk_queue`` (per-session ``asyncio.Queue``)
|
||||
can be passed in. We attach a ``BaseAsyncCallbackHandler`` to the ainvoke
|
||||
config so every new token the LLM emits lands on the queue as
|
||||
``{"type": "delta", "text": "..."}``. The SSE stream loop drains the queue
|
||||
and pushes each chunk as an ``event: chunk`` SSE.
|
||||
|
||||
This runner is **single-uvicorn-worker** by design (see ``api/app.py``'s
|
||||
docstring): the saver is held on ``app.state.saver`` and shared across all
|
||||
background invocations. Multi-worker support would require Postgres
|
||||
@@ -14,10 +20,12 @@ background invocations. Multi-worker support would require Postgres
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Any
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
from langchain_core.callbacks import AsyncCallbackHandler
|
||||
from sqlalchemy import desc, select
|
||||
|
||||
from ..audit import make_audit_recorder
|
||||
@@ -92,6 +100,74 @@ async def _bootstrap_session_dirs(config: Config, project_key: str) -> None:
|
||||
ensure_global_instructions_initialized(config)
|
||||
|
||||
|
||||
class _StreamingChunkPusher(AsyncCallbackHandler):
|
||||
"""Push every `on_llm_new_token` onto a session-bound asyncio.Queue.
|
||||
|
||||
The SSE stream consumes the queue and pushes each chunk as an SSE
|
||||
``event: chunk`` so the browser can render typing-style streaming.
|
||||
"""
|
||||
|
||||
def __init__(self, queue: asyncio.Queue[dict[str, Any]]) -> None:
|
||||
self._queue = queue
|
||||
|
||||
async def on_llm_new_token(self, token: str, **_kwargs: Any) -> None:
|
||||
if not token:
|
||||
return
|
||||
try:
|
||||
await self._queue.put({"type": "delta", "text": token})
|
||||
except Exception:
|
||||
# Never let a streaming push failure abort the LLM call.
|
||||
_LOG.debug("chunk-queue put failed (queue likely closed)", exc_info=True)
|
||||
|
||||
|
||||
def _build_session_agent(
|
||||
db: Database,
|
||||
config: Config,
|
||||
persona: Persona,
|
||||
session_id: UUID,
|
||||
row: InteractiveSessionRow,
|
||||
*,
|
||||
saver: Any | None,
|
||||
) -> Any:
|
||||
"""Assemble the deepagents CompiledStateGraph for one session invocation.
|
||||
|
||||
Extracted from :func:`invoke_session_agent` to keep that function under
|
||||
the C901 complexity threshold. Pure construction — no side effects on the
|
||||
DB beyond what `build_agent` itself does.
|
||||
"""
|
||||
pricing = _static_pricing_seed()
|
||||
budget = make_budget_tracker_from_config(db, config)
|
||||
cost_mw = CostMiddleware(
|
||||
pricing=pricing,
|
||||
model_name=row.model or persona.model,
|
||||
interactive_session_id=session_id,
|
||||
persona_name=persona.name,
|
||||
budget_tracker=budget,
|
||||
)
|
||||
audit_mw = AuditToolMiddleware(
|
||||
interactive_session_id=session_id,
|
||||
file_recorder=make_audit_recorder(config.state_dir),
|
||||
)
|
||||
is_plan = bool(row.plan_mode)
|
||||
plan_mw = PlanModeMiddleware(is_active=lambda: is_plan)
|
||||
|
||||
project_key = row.project_key or sha256(str(config.workspace_root.resolve()))[:16]
|
||||
memory_dir = project_memory_dir(config, project_key)
|
||||
instruction_paths = resolve_instruction_paths(config, config.workspace_root)
|
||||
memory_paths = list_memory_paths(memory_dir)
|
||||
skill_sources = resolve_skill_sources(config)
|
||||
|
||||
return build_agent(
|
||||
persona,
|
||||
config,
|
||||
root_dir=config.workspace_root,
|
||||
middleware=[plan_mw, cost_mw, audit_mw],
|
||||
checkpointer=saver,
|
||||
memory_paths_override=[*instruction_paths, *memory_paths],
|
||||
skills_sources_override=skill_sources,
|
||||
)
|
||||
|
||||
|
||||
async def invoke_session_agent(
|
||||
db: Database,
|
||||
config: Config,
|
||||
@@ -100,6 +176,7 @@ async def invoke_session_agent(
|
||||
user_message: str,
|
||||
*,
|
||||
saver: Any | None = None,
|
||||
chunk_queue: asyncio.Queue[dict[str, Any]] | None = None,
|
||||
) -> None:
|
||||
"""Run one ainvoke + persist the assistant reply for the given session.
|
||||
|
||||
@@ -127,50 +204,12 @@ async def invoke_session_agent(
|
||||
project_key = row.project_key or sha256(str(config.workspace_root.resolve()))[:16]
|
||||
await _bootstrap_session_dirs(config, project_key)
|
||||
|
||||
# Build agent. No model override at the API layer — `/model` slash is REPL-only.
|
||||
pricing = _static_pricing_seed()
|
||||
budget = make_budget_tracker_from_config(db, config)
|
||||
cost_mw = CostMiddleware(
|
||||
pricing=pricing,
|
||||
model_name=row.model or persona.model,
|
||||
interactive_session_id=session_id,
|
||||
persona_name=persona.name,
|
||||
budget_tracker=budget,
|
||||
)
|
||||
audit_mw = AuditToolMiddleware(
|
||||
interactive_session_id=session_id,
|
||||
file_recorder=make_audit_recorder(config.state_dir),
|
||||
)
|
||||
# Plan-mode is read from the session row — API users can toggle via a
|
||||
# future API endpoint; REPL toggles via /plan slash.
|
||||
is_plan = bool(row.plan_mode)
|
||||
plan_mw = PlanModeMiddleware(is_active=lambda: is_plan)
|
||||
|
||||
memory_dir = project_memory_dir(config, project_key)
|
||||
instruction_paths = resolve_instruction_paths(config, config.workspace_root)
|
||||
memory_paths = list_memory_paths(memory_dir)
|
||||
skill_sources = resolve_skill_sources(config)
|
||||
|
||||
agent = build_agent(
|
||||
persona,
|
||||
config,
|
||||
root_dir=config.workspace_root,
|
||||
middleware=[plan_mw, cost_mw, audit_mw],
|
||||
checkpointer=saver,
|
||||
memory_paths_override=[*instruction_paths, *memory_paths],
|
||||
skills_sources_override=skill_sources,
|
||||
)
|
||||
agent = _build_session_agent(db, config, persona, session_id, row, saver=saver)
|
||||
|
||||
thread_id = f"{session_id}:0"
|
||||
try:
|
||||
result = await agent.ainvoke(
|
||||
{"messages": [{"role": "user", "content": user_message}]},
|
||||
config={"configurable": {"thread_id": thread_id}},
|
||||
)
|
||||
except Exception:
|
||||
_LOG.exception("agent.ainvoke failed for session %s", session_id)
|
||||
result = await _run_ainvoke(agent, user_message, thread_id, chunk_queue, session_id)
|
||||
if result is None:
|
||||
return
|
||||
|
||||
messages = result.get("messages", []) if isinstance(result, dict) else []
|
||||
if not messages:
|
||||
return
|
||||
@@ -187,6 +226,42 @@ async def invoke_session_agent(
|
||||
await compact_session(db, config, str(session_id))
|
||||
|
||||
|
||||
async def _run_ainvoke(
|
||||
agent: Any,
|
||||
user_message: str,
|
||||
thread_id: str,
|
||||
chunk_queue: asyncio.Queue[dict[str, Any]] | None,
|
||||
session_id: UUID,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Wrapper around agent.ainvoke that emits chunk_queue lifecycle events.
|
||||
|
||||
Returns the raw result dict on success, ``None`` on any failure (logged).
|
||||
Re-raises ``CancelledError`` so the asyncio task is correctly marked
|
||||
cancelled and the route's done-callback can clean up.
|
||||
"""
|
||||
invoke_config: dict[str, Any] = {"configurable": {"thread_id": thread_id}}
|
||||
if chunk_queue is not None:
|
||||
invoke_config["callbacks"] = [_StreamingChunkPusher(chunk_queue)]
|
||||
try:
|
||||
return await agent.ainvoke( # type: ignore[no-any-return]
|
||||
{"messages": [{"role": "user", "content": user_message}]},
|
||||
config=invoke_config,
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
_LOG.info("agent.ainvoke cancelled for session %s", session_id)
|
||||
if chunk_queue is not None:
|
||||
await chunk_queue.put({"type": "cancelled"})
|
||||
raise
|
||||
except Exception:
|
||||
_LOG.exception("agent.ainvoke failed for session %s", session_id)
|
||||
if chunk_queue is not None:
|
||||
await chunk_queue.put({"type": "error"})
|
||||
return None
|
||||
finally:
|
||||
if chunk_queue is not None:
|
||||
await chunk_queue.put({"type": "done"})
|
||||
|
||||
|
||||
def _resolve_persona(personas: list[Persona], persona_hash: str) -> Persona | None:
|
||||
for p in personas:
|
||||
if p.compute_hash() == persona_hash:
|
||||
|
||||
@@ -42,7 +42,9 @@ from ..models import (
|
||||
)
|
||||
|
||||
_LOG = logging.getLogger(__name__)
|
||||
_POLL_INTERVAL_S: float = 0.5
|
||||
# v0.4 B3: 100ms poll keeps token-streaming UX snappy. At idle the loop just
|
||||
# does two cheap selects — well within asyncpg + SSE budgets.
|
||||
_POLL_INTERVAL_S: float = 0.1
|
||||
_TERMINAL_STATES: frozenset[str] = frozenset({"ended"})
|
||||
|
||||
router = APIRouter()
|
||||
@@ -274,6 +276,15 @@ async def post_message(
|
||||
saver = getattr(request.app.state, "saver", None)
|
||||
from uuid import UUID
|
||||
|
||||
# v0.4 B3: per-session token chunk queue. agent_runner pushes deltas
|
||||
# via AsyncCallbackHandler; the SSE stream below drains the queue.
|
||||
chunk_queues: dict[str, asyncio.Queue[Any]] = getattr(
|
||||
request.app.state, "token_chunk_queues", {}
|
||||
)
|
||||
queue: asyncio.Queue[Any] = asyncio.Queue()
|
||||
chunk_queues[session_id] = queue
|
||||
request.app.state.token_chunk_queues = chunk_queues
|
||||
|
||||
task = asyncio.create_task(
|
||||
invoke_session_agent(
|
||||
db,
|
||||
@@ -282,26 +293,101 @@ async def post_message(
|
||||
UUID(session_id),
|
||||
body.content,
|
||||
saver=saver,
|
||||
chunk_queue=queue,
|
||||
)
|
||||
)
|
||||
pending: set[asyncio.Task[Any]] = getattr(request.app.state, "pending_invocations", set())
|
||||
pending.add(task)
|
||||
request.app.state.pending_invocations = pending
|
||||
task.add_done_callback(pending.discard)
|
||||
# v0.4 B4: index the task by session_id so a subsequent POST /abort can
|
||||
# cancel mid-flight. We deliberately overwrite an earlier task if one is
|
||||
# still in flight — the new user message implicitly cancels the previous
|
||||
# turn (Claude Code parity).
|
||||
per_session: dict[str, asyncio.Task[Any]] = getattr(
|
||||
request.app.state, "pending_per_session", {}
|
||||
)
|
||||
prev = per_session.get(session_id)
|
||||
if prev is not None and not prev.done():
|
||||
prev.cancel()
|
||||
per_session[session_id] = task
|
||||
request.app.state.pending_per_session = per_session
|
||||
|
||||
def _remove_from_session_map(_t: asyncio.Task[Any], sid: str = session_id) -> None:
|
||||
per_session.pop(sid, None)
|
||||
|
||||
task.add_done_callback(_remove_from_session_map)
|
||||
|
||||
return SessionAck(session_id=session_id, state="active", message="queued")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# POST /api/sessions/{id}/abort — cancel an in-flight turn (v0.4 B4)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@router.post("/{session_id}/abort", response_model=SessionAck)
|
||||
async def abort_turn(session_id: str, request: Request, db: DbDep) -> SessionAck:
|
||||
"""Cancel the in-flight ainvoke for this session, if any.
|
||||
|
||||
Idempotent — returns ok even when no task is in flight. The cancelled
|
||||
task's ``finally`` clauses still run, so the LangGraph checkpoint stays
|
||||
consistent. The next POST /messages reuses the same thread.
|
||||
"""
|
||||
async with db.session() as s:
|
||||
row = await s.get(InteractiveSessionRow, session_id)
|
||||
if row is None:
|
||||
raise HTTPException(status_code=404, detail=f"session {session_id} not found")
|
||||
|
||||
per_session: dict[str, asyncio.Task[Any]] = getattr(
|
||||
request.app.state, "pending_per_session", {}
|
||||
)
|
||||
task = per_session.get(session_id)
|
||||
if task is not None and not task.done():
|
||||
task.cancel()
|
||||
return SessionAck(session_id=session_id, state="active", message="aborted")
|
||||
return SessionAck(session_id=session_id, state="active", message="no-in-flight-task")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GET /api/sessions/{id}/stream — SSE
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _session_event_stream(db: Database, session_id: str, last_seq: int = 0) -> Any:
|
||||
"""Yield ServerSentEvent per new MessageRow. Closes when session ends."""
|
||||
async def _session_event_stream(
|
||||
db: Database, session_id: str, last_seq: int = 0, app: Any = None
|
||||
) -> Any:
|
||||
"""Yield ServerSentEvent per new MessageRow + token chunk. Closes on terminal.
|
||||
|
||||
Three event types emitted:
|
||||
- ``message`` (existing): one row per new MessageRow.
|
||||
- ``chunk`` (v0.4 B3): token delta from the in-flight ainvoke. Drained
|
||||
from ``app.state.token_chunk_queues[session_id]`` if present.
|
||||
- ``done`` (existing): session terminal or deleted.
|
||||
"""
|
||||
seen = last_seq
|
||||
|
||||
while True:
|
||||
# v0.4 B3: drain queued token chunks FIRST so streaming visibly
|
||||
# precedes the final `message` SSE. Without this the placeholder
|
||||
# is replaced by the persisted MessageRow before any chunk reaches
|
||||
# the browser — Claude-Code-style typing would never appear.
|
||||
if app is not None:
|
||||
queues: dict[str, asyncio.Queue[Any]] = getattr(app.state, "token_chunk_queues", {})
|
||||
queue = queues.get(session_id)
|
||||
if queue is not None:
|
||||
drained = 0
|
||||
while not queue.empty() and drained < 200:
|
||||
chunk = queue.get_nowait()
|
||||
yield ServerSentEvent(
|
||||
data=json.dumps(chunk, ensure_ascii=False),
|
||||
event="chunk",
|
||||
)
|
||||
drained += 1
|
||||
if chunk.get("type") in ("done", "cancelled", "error"):
|
||||
queues.pop(session_id, None)
|
||||
break
|
||||
|
||||
async with db.session() as s:
|
||||
message_rows = (
|
||||
(
|
||||
@@ -365,7 +451,7 @@ async def stream_session(
|
||||
row = await s.get(InteractiveSessionRow, session_id)
|
||||
if row is None:
|
||||
raise HTTPException(status_code=404, detail=f"session {session_id} not found")
|
||||
return EventSourceResponse(_session_event_stream(db, session_id, last_seq))
|
||||
return EventSourceResponse(_session_event_stream(db, session_id, last_seq, app=request.app))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -153,6 +153,9 @@ def resolve_model_instance(
|
||||
max_tokens=params.get("max_tokens", 4096),
|
||||
temperature=params.get("temperature", 0.2),
|
||||
top_p=params.get("top_p", 1.0),
|
||||
# v0.4 B3: enable token streaming so AsyncCallbackHandler.on_llm_new_token
|
||||
# receives chunks during ainvoke. Final response is unchanged.
|
||||
streaming=True,
|
||||
)
|
||||
return model_spec
|
||||
|
||||
|
||||
Reference in New Issue
Block a user