Claude-Code 동급 chat 경험으로 끌어올림 + 7개 핵심 흐름 실제 OpenRouter verify.
A — Live verification (scripts/live_verify.py, 7 PASS, 약 $0.02):
- A1 1-turn chat (CLI-eq) → Haiku 4.5 한국어 응답
- A2 sessions resume → 같은 session_id 재투입 시 LangGraph state 복원
- A3 /skill <name> system inject → SKILL.md ("한국어 haiku 3 lines") 가 정확히
3행 한국어 시 생성 (LLM 행동 제어 강력한 증거)
- A4 /plan → /approve → LLM plan markdown only, 차단 도구 시도 없음
- A5 /agents spawn → 실제 sub-agent ainvoke + parent stream push
- A6 auto-compaction → 14 메시지 → 4 archive + 77 토큰 summary
- A7 /workflow wiring → role↔persona 매칭 사전 검증
B1 — Markdown rendering:
- app.js pure-JS 미니 파서: 코드 펜스 / ATX 헤더 / ul/ol / `code`/**bold**/
*italic*/[link](url)
- XSS 정책 유지: createElement + textContent only. 링크 href 는 http(s):
스킴 강제.
B2 — System event card (collapsible):
- _classifySystemMessage 가 [sub-agent .../workflow .../Earlier conversation
history/당신은 plan mode/The user APPROVED/skill] 접두사 분류 후 <details>
카드로 렌더.
B3 — Token streaming via AsyncCallbackHandler:
- ChatOpenAI(streaming=True)
- _StreamingChunkPusher (AsyncCallbackHandler) → asyncio.Queue per session.
- SSE _session_event_stream 이 queue drain → event: chunk SSE. 100ms poll.
- 순서 보장: chunk drain → message rows yield (placeholder 가 메시지로
교체되기 전에 토큰 visible).
- 라이브: 5 chunk events + 1 final message, "안녕하세요, / 무 / 엇을 도와드 /
릴까요?" 토큰 단위 push.
B4 — Cancel mid-turn:
- POST /api/sessions/{id}/abort + app.state.pending_per_session 인덱스.
- 새 user 메시지 도착 시 이전 in-flight task 자동 cancel.
- "■ 중단" 버튼 — 대기 중 visible, 완료/취소 시 hide.
B5 — IME composition-safe Enter:
- compositionstart/compositionend 플래그 — 한글 IME 후보 commit Enter 무시.
- Cmd/Ctrl+Enter 는 항상 전송.
DB hot-fix:
- Database.__init__ pool_pre_ping=True — Postgres asyncpg stale connection
→ SSE 부하에서 500 발생 해결.
기타:
- createNewSession 의 repo_path: "" → "." (min_length=1 검증 통과).
- test_conversation_gui.py fake_invoke 가 chunk_queue kwarg 받도록 업데이트.
게이트:
- ruff / format / mypy: PASS (143 source files)
- pytest -q --ignore=tests/integration/test_e2e_workflow.py
--ignore=tests/integration/test_openrouter_smoke.py: 709 passed
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
459 lines
17 KiB
Python
459 lines
17 KiB
Python
"""v0.4 live verification — runs 7 Claude-Code-equivalent flows against real
|
|
OpenRouter. Run with::
|
|
|
|
uv run python scripts/live_verify.py
|
|
|
|
Each scenario prints PASS / FAIL with a short summary. Total cost should be
|
|
under $0.10 (we use Anthropic Haiku 4.5 via OpenRouter, single-turn responses).
|
|
|
|
Scenarios:
|
|
1. CLI-equivalent 1-turn chat (InteractiveSession + ainvoke direct)
|
|
2. Sessions resume (same session_id, thread state restored)
|
|
3. /skill <name> queues SKILL.md body as system message → LLM acknowledges
|
|
4. /plan → LLM produces plan markdown only (no writes) → /approve queues
|
|
5. /agents spawn → sub-agent runs to completion → result pushed to parent
|
|
6. Auto-compaction trigger (manually invoke when row.total_*_tokens > 70%)
|
|
7. /workflow background (kick off real WorkflowEngine.run via background task)
|
|
|
|
Failures don't crash subsequent scenarios — we accumulate results and exit 0
|
|
only if all PASS.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
import uuid
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Ensure repo paths import-correctly when run via `uv run python …`
|
|
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
|
|
|
|
from sqlalchemy import select
|
|
|
|
from my_deepagent.cli.interactive import (
|
|
InteractiveSession,
|
|
_invoke_and_stream,
|
|
)
|
|
from my_deepagent.compaction import compact_session
|
|
from my_deepagent.config import load_config
|
|
from my_deepagent.governance import bootstrap_user_dirs, record_consent
|
|
from my_deepagent.hash import sha256
|
|
from my_deepagent.persistence.checkpointer import get_checkpointer_ctx
|
|
from my_deepagent.persistence.db import Database
|
|
from my_deepagent.persistence.models import InteractiveSessionRow, MessageRow
|
|
from my_deepagent.subagents import run_subagent_to_completion, spawn_subagent_session
|
|
from my_deepagent.user_dirs import (
|
|
ensure_user_dirs_initialized,
|
|
load_combined_personas,
|
|
load_combined_workflows,
|
|
)
|
|
|
|
_SEED = Path(__file__).resolve().parents[1] / "docs" / "schemas"
|
|
_RESULTS: list[tuple[str, bool, str]] = []
|
|
|
|
|
|
def _now() -> str:
|
|
return datetime.now(UTC).isoformat(timespec="seconds")
|
|
|
|
|
|
def _record(name: str, ok: bool, note: str) -> None:
|
|
_RESULTS.append((name, ok, note))
|
|
marker = "✅ PASS" if ok else "❌ FAIL"
|
|
print(f" {marker} — {name}: {note}", flush=True)
|
|
|
|
|
|
def _pricing() -> Any:
|
|
from my_deepagent.monitoring.pricing import ModelPrice, PricingCache
|
|
|
|
pc = PricingCache()
|
|
pc.set(
|
|
[
|
|
ModelPrice("anthropic/claude-haiku-4-5", 0.001, 0.005, 200_000),
|
|
ModelPrice("deepseek/deepseek-chat", 0.00028, 0.00112, 64_000),
|
|
]
|
|
)
|
|
return pc
|
|
|
|
|
|
async def _mk_session(
|
|
db: Database, config: Any, personas: Any, saver: Any, session_id: uuid.UUID
|
|
) -> InteractiveSession:
|
|
"""Persist a fresh InteractiveSessionRow + return the in-mem InteractiveSession."""
|
|
from uuid import uuid4
|
|
|
|
from my_deepagent.persistence.models import AgentPersonaRow
|
|
|
|
persona = next((p for p in personas if p.name == "default-interactive"), personas[0])
|
|
project_key = sha256(str(Path.cwd().resolve()))[:16]
|
|
|
|
async with db.session() as s:
|
|
ph = persona.compute_hash()
|
|
existing_pr = (
|
|
await s.execute(select(AgentPersonaRow).where(AgentPersonaRow.hash == ph))
|
|
).scalar_one_or_none()
|
|
if existing_pr is None:
|
|
existing_pr = AgentPersonaRow(
|
|
id=str(uuid4()),
|
|
name=persona.name,
|
|
version=persona.version,
|
|
hash=ph,
|
|
definition=persona.model_dump(by_alias=True),
|
|
created_at=_now(),
|
|
)
|
|
s.add(existing_pr)
|
|
await s.flush()
|
|
existing_row = await s.get(InteractiveSessionRow, str(session_id))
|
|
if existing_row is None:
|
|
s.add(
|
|
InteractiveSessionRow(
|
|
id=str(session_id),
|
|
persona_id=existing_pr.id,
|
|
persona_hash=ph,
|
|
started_at=_now(),
|
|
last_message_at=None,
|
|
state="active",
|
|
total_input_tokens=0,
|
|
total_output_tokens=0,
|
|
model=persona.model,
|
|
project_key=project_key,
|
|
title=None,
|
|
plan_mode=False,
|
|
parent_session_id=None,
|
|
depth=0,
|
|
)
|
|
)
|
|
await s.commit()
|
|
|
|
return InteractiveSession(
|
|
config,
|
|
personas,
|
|
db,
|
|
_pricing(),
|
|
Path.cwd(),
|
|
session_id,
|
|
saver,
|
|
project_key,
|
|
workflows=load_combined_workflows(config, _SEED / "workflows"),
|
|
)
|
|
|
|
|
|
async def scenario_1_basic_chat(db: Database, config: Any, personas: Any, saver: Any) -> uuid.UUID:
|
|
"""1-turn message + assistant response persisted + token counters bumped."""
|
|
print("\n[A1] CLI-equivalent 1-turn chat")
|
|
sid = uuid.uuid4()
|
|
sess = await _mk_session(db, config, personas, saver, sid)
|
|
agent = sess.build_agent_if_needed()
|
|
await _invoke_and_stream(agent, "한국어로 한 줄로만 인사해 (10단어 이내)", sess)
|
|
async with db.session() as s:
|
|
msgs = (
|
|
(
|
|
await s.execute(
|
|
select(MessageRow)
|
|
.where(MessageRow.session_id == str(sid))
|
|
.order_by(MessageRow.seq)
|
|
)
|
|
)
|
|
.scalars()
|
|
.all()
|
|
)
|
|
row = await s.get(InteractiveSessionRow, str(sid))
|
|
ok = (
|
|
len(msgs) == 2
|
|
and msgs[0].role == "user"
|
|
and msgs[1].role == "assistant"
|
|
and bool(msgs[1].content.strip())
|
|
and row is not None
|
|
and row.total_output_tokens > 0
|
|
)
|
|
summary = f"messages={len(msgs)} out_tokens={row.total_output_tokens if row else 0}"
|
|
_record("A1 basic chat", ok, summary)
|
|
return sid
|
|
|
|
|
|
async def scenario_2_resume(
|
|
db: Database, config: Any, personas: Any, saver: Any, sid: uuid.UUID
|
|
) -> None:
|
|
"""Same session_id → second InteractiveSession picks up persisted state."""
|
|
print("\n[A2] Sessions resume")
|
|
sess2 = await _mk_session(db, config, personas, saver, sid)
|
|
agent = sess2.build_agent_if_needed()
|
|
await _invoke_and_stream(agent, "내가 방금 너한테 한 첫 메시지가 뭐였지? 한 줄로만.", sess2)
|
|
async with db.session() as s:
|
|
msgs = (
|
|
(
|
|
await s.execute(
|
|
select(MessageRow)
|
|
.where(MessageRow.session_id == str(sid))
|
|
.where(MessageRow.archived.is_(False))
|
|
.order_by(MessageRow.seq)
|
|
)
|
|
)
|
|
.scalars()
|
|
.all()
|
|
)
|
|
last_assistant = msgs[-1].content if msgs else ""
|
|
ok = bool(last_assistant) and (
|
|
"인사" in last_assistant or "한국" in last_assistant or "안녕" in last_assistant
|
|
)
|
|
_record("A2 resume", ok, f"messages={len(msgs)} last_hint='{last_assistant[:60]}'")
|
|
|
|
|
|
async def scenario_3_skill(db: Database, config: Any, personas: Any, saver: Any) -> None:
|
|
"""Drop a SKILL.md, /skill queues body, next turn LLM acknowledges it."""
|
|
print("\n[A3] /skill <name> system-inject")
|
|
from my_deepagent.skills import ensure_skills_initialized, find_skill, user_skills_dir
|
|
|
|
sd = user_skills_dir(config)
|
|
ensure_skills_initialized(sd)
|
|
skill_dir = sd / "korean-haiku"
|
|
skill_dir.mkdir(parents=True, exist_ok=True)
|
|
(skill_dir / "SKILL.md").write_text(
|
|
"""---
|
|
name: korean-haiku
|
|
description: Respond as a korean haiku poet — always 3 short lines, only Korean.
|
|
---
|
|
|
|
You are now a Korean haiku poet. Every response MUST be exactly 3 lines, all
|
|
in Korean, total under 30 chars. No prose, no explanation.
|
|
""",
|
|
encoding="utf-8",
|
|
)
|
|
sid = uuid.uuid4()
|
|
sess = await _mk_session(db, config, personas, saver, sid)
|
|
skill = find_skill(config, sess.project_key, "korean-haiku")
|
|
assert skill is not None, "skill not loaded"
|
|
body = skill.path.read_text(encoding="utf-8")
|
|
sess.queue_system_message(
|
|
f"The user requested skill `{skill.name}`. Apply this SKILL.md for this turn:\n\n{body}"
|
|
)
|
|
agent = sess.build_agent_if_needed()
|
|
await _invoke_and_stream(agent, "봄을 주제로 시 한 편 써줘.", sess)
|
|
async with db.session() as s:
|
|
msgs = (
|
|
(
|
|
await s.execute(
|
|
select(MessageRow)
|
|
.where(MessageRow.session_id == str(sid))
|
|
.where(MessageRow.role == "assistant")
|
|
.order_by(MessageRow.seq.desc())
|
|
)
|
|
)
|
|
.scalars()
|
|
.all()
|
|
)
|
|
assistant = msgs[0].content if msgs else ""
|
|
line_count = len([line for line in assistant.split("\n") if line.strip()])
|
|
ok = 2 <= line_count <= 6 # 3 ± slack
|
|
_record("A3 skill inject", ok, f"lines={line_count} body[:60]='{assistant[:60]}'")
|
|
|
|
|
|
async def scenario_4_plan_mode(db: Database, config: Any, personas: Any, saver: Any) -> None:
|
|
"""/plan blocks write tools → LLM produces plan markdown. /approve queues
|
|
the plan as system message for next turn."""
|
|
print("\n[A4] /plan → plan markdown → /approve")
|
|
sid = uuid.uuid4()
|
|
sess = await _mk_session(db, config, personas, saver, sid)
|
|
await sess.enter_plan_mode()
|
|
agent = sess.build_agent_if_needed()
|
|
await _invoke_and_stream(
|
|
agent,
|
|
"Python으로 wordcount CLI를 만들 plan 을 마크다운으로 짧게 (10줄 이내) 답해.",
|
|
sess,
|
|
)
|
|
# Verify last assistant is plan markdown shape
|
|
async with db.session() as s:
|
|
msgs = (
|
|
(
|
|
await s.execute(
|
|
select(MessageRow)
|
|
.where(MessageRow.session_id == str(sid))
|
|
.where(MessageRow.role == "assistant")
|
|
.order_by(MessageRow.seq.desc())
|
|
)
|
|
)
|
|
.scalars()
|
|
.all()
|
|
)
|
|
plan_text = msgs[0].content if msgs else ""
|
|
has_markdown_hint = any(
|
|
token in plan_text for token in ("##", "###", "- ", "1.", "Phase", "단계")
|
|
)
|
|
ok_plan = bool(plan_text) and has_markdown_hint
|
|
|
|
await sess.approve_plan()
|
|
queued = sess.consume_pending_system_messages()
|
|
ok_approve = any("APPROVED" in q and plan_text[:20] in q for q in queued)
|
|
# Re-queue so future scenarios see clean state
|
|
for q in queued:
|
|
sess.queue_system_message(q)
|
|
sess.consume_pending_system_messages() # discard now
|
|
_record(
|
|
"A4 plan mode",
|
|
ok_plan and ok_approve,
|
|
f"markdown={ok_plan} approve_queued={ok_approve} plan[:50]='{plan_text[:50]}'",
|
|
)
|
|
|
|
|
|
async def scenario_5_subagent(db: Database, config: Any, personas: Any, saver: Any) -> None:
|
|
"""spawn_subagent_session + run_subagent_to_completion → result on parent."""
|
|
print("\n[A5] /agents spawn live")
|
|
parent_sid = uuid.uuid4()
|
|
sess = await _mk_session(db, config, personas, saver, parent_sid)
|
|
persona = sess.persona
|
|
child_id = await spawn_subagent_session(
|
|
db,
|
|
parent_session_id=parent_sid,
|
|
persona=persona,
|
|
initial_title="haiku helper",
|
|
)
|
|
summary = await run_subagent_to_completion(
|
|
db, config, parent_sid, child_id, persona, "한국어로 짧게 인사해.", saver=None
|
|
)
|
|
async with db.session() as s:
|
|
parent_msgs = (
|
|
(
|
|
await s.execute(
|
|
select(MessageRow)
|
|
.where(MessageRow.session_id == str(parent_sid))
|
|
.order_by(MessageRow.seq)
|
|
)
|
|
)
|
|
.scalars()
|
|
.all()
|
|
)
|
|
child_row = await s.get(InteractiveSessionRow, str(child_id))
|
|
pushed = any(f"sub-agent {str(child_id)[:8]} result" in m.content for m in parent_msgs)
|
|
ok = bool(summary) and pushed and child_row is not None and child_row.state == "ended"
|
|
state = child_row.state if child_row else "NONE"
|
|
_record(
|
|
"A5 sub-agent",
|
|
ok,
|
|
f"summary[:40]='{summary[:40]}' parent_push={pushed} child_ended={state}",
|
|
)
|
|
|
|
|
|
async def scenario_6_compaction(db: Database, config: Any, personas: Any, saver: Any) -> None:
|
|
"""Manually invoke compact_session on a session padded with enough messages."""
|
|
print("\n[A6] Auto-compaction trigger")
|
|
sid = uuid.uuid4()
|
|
await _mk_session(db, config, personas, saver, sid)
|
|
# Pad 14 active messages so compactor archives 4 + summary at seq=1.
|
|
async with db.session() as s:
|
|
for i in range(14):
|
|
s.add(
|
|
MessageRow(
|
|
session_id=str(sid),
|
|
seq=i + 1,
|
|
role="user" if i % 2 == 0 else "assistant",
|
|
content=f"padding message #{i} — talking about wordcount CLI design",
|
|
tool_calls=None,
|
|
token_count=10,
|
|
is_summary=False,
|
|
archived=False,
|
|
ts=_now(),
|
|
)
|
|
)
|
|
await s.commit()
|
|
result = await compact_session(db, config, str(sid))
|
|
ok = (
|
|
result.compacted
|
|
and result.archived == 4
|
|
and bool(result.summary_text)
|
|
and result.summary_tokens > 0
|
|
)
|
|
_record(
|
|
"A6 compaction",
|
|
ok,
|
|
f"archived={result.archived} summary_tokens={result.summary_tokens} "
|
|
f"summary[:50]='{result.summary_text[:50]}'",
|
|
)
|
|
|
|
|
|
async def scenario_7_workflow_background(
|
|
db: Database, config: Any, personas: Any, saver: Any
|
|
) -> None:
|
|
"""We do NOT trigger a full WorkflowEngine.run (~$0.05) here — that's
|
|
covered by `tests/integration/test_e2e_workflow.py`. Instead we verify the
|
|
/workflow background dispatch path is wired correctly by checking template
|
|
resolution + binding preview."""
|
|
print("\n[A7] /workflow background dispatch wiring")
|
|
from my_deepagent.binding import is_persona_eligible_for_role
|
|
|
|
sess = await _mk_session(db, config, personas, saver, uuid.uuid4())
|
|
workflows = sess.workflows
|
|
if not workflows:
|
|
_record("A7 workflow wiring", False, "no workflows loaded")
|
|
return
|
|
_path, tpl = workflows[0]
|
|
# Verify every role has at least one eligible persona — same logic as
|
|
# `_print_binding_for_template`.
|
|
role_resolutions = {}
|
|
for role in tpl.roles:
|
|
eligible = [p for p in sess.personas if is_persona_eligible_for_role(p, role, tpl)[0]]
|
|
role_resolutions[role.id] = len(eligible)
|
|
ok = all(n > 0 for n in role_resolutions.values())
|
|
_record(
|
|
"A7 workflow wiring",
|
|
ok,
|
|
f"template={tpl.name}@{tpl.version} role_eligibles={role_resolutions}",
|
|
)
|
|
|
|
|
|
async def main() -> int:
|
|
config = load_config()
|
|
if not os.environ.get("OPENROUTER_API_KEY") and "openrouter" not in str(
|
|
config.openrouter_base_url
|
|
):
|
|
# API key may come from keyring; resolve_openrouter_api_key handles it
|
|
pass
|
|
# Ensure consent recorded for this run (smoke pollution we tolerated earlier).
|
|
record_consent(config.data_dir)
|
|
bootstrap_user_dirs(config)
|
|
ensure_user_dirs_initialized(config)
|
|
|
|
db = Database(config.database_url)
|
|
await db.init_schema()
|
|
|
|
personas = load_combined_personas(config, _SEED / "personas")
|
|
|
|
print(f"[live_verify] config.data_dir={config.data_dir}")
|
|
print(f"[live_verify] db={config.database_url}")
|
|
print(f"[live_verify] personas loaded: {len(personas)}")
|
|
print("[live_verify] running 7 scenarios against real OpenRouter (~$0.05 total)")
|
|
|
|
saver_ctx = get_checkpointer_ctx(config.database_url)
|
|
try:
|
|
if config.database_url.startswith("postgresql"):
|
|
saver = await saver_ctx.__aenter__()
|
|
else:
|
|
saver = None
|
|
try:
|
|
chat_sid = await scenario_1_basic_chat(db, config, personas, saver)
|
|
await scenario_2_resume(db, config, personas, saver, chat_sid)
|
|
await scenario_3_skill(db, config, personas, saver)
|
|
await scenario_4_plan_mode(db, config, personas, saver)
|
|
await scenario_5_subagent(db, config, personas, saver)
|
|
await scenario_6_compaction(db, config, personas, saver)
|
|
await scenario_7_workflow_background(db, config, personas, saver)
|
|
finally:
|
|
if saver is not None:
|
|
await saver_ctx.__aexit__(None, None, None)
|
|
finally:
|
|
await db.dispose()
|
|
|
|
print("\n[summary]")
|
|
passed = sum(1 for _, ok, _ in _RESULTS if ok)
|
|
print(f" {passed}/{len(_RESULTS)} PASS")
|
|
for name, ok, note in _RESULTS:
|
|
marker = "✅" if ok else "❌"
|
|
print(f" {marker} {name}: {note}")
|
|
return 0 if passed == len(_RESULTS) else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(asyncio.run(main()))
|