"""v0.4 live verification — runs 7 Claude-Code-equivalent flows against real OpenRouter. Run with:: uv run python scripts/live_verify.py Each scenario prints PASS / FAIL with a short summary. Total cost should be under $0.10 (we use Anthropic Haiku 4.5 via OpenRouter, single-turn responses). Scenarios: 1. CLI-equivalent 1-turn chat (InteractiveSession + ainvoke direct) 2. Sessions resume (same session_id, thread state restored) 3. /skill queues SKILL.md body as system message → LLM acknowledges 4. /plan → LLM produces plan markdown only (no writes) → /approve queues 5. /agents spawn → sub-agent runs to completion → result pushed to parent 6. Auto-compaction trigger (manually invoke when row.total_*_tokens > 70%) 7. /workflow background (kick off real WorkflowEngine.run via background task) Failures don't crash subsequent scenarios — we accumulate results and exit 0 only if all PASS. """ from __future__ import annotations import asyncio import os import sys import uuid from datetime import UTC, datetime from pathlib import Path from typing import Any # Ensure repo paths import-correctly when run via `uv run python …` sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) from sqlalchemy import select from my_deepagent.cli.interactive import ( InteractiveSession, _invoke_and_stream, ) from my_deepagent.compaction import compact_session from my_deepagent.config import load_config from my_deepagent.governance import bootstrap_user_dirs, record_consent from my_deepagent.hash import sha256 from my_deepagent.persistence.checkpointer import get_checkpointer_ctx from my_deepagent.persistence.db import Database from my_deepagent.persistence.models import InteractiveSessionRow, MessageRow from my_deepagent.subagents import run_subagent_to_completion, spawn_subagent_session from my_deepagent.user_dirs import ( ensure_user_dirs_initialized, load_combined_personas, load_combined_workflows, ) _SEED = Path(__file__).resolve().parents[1] / "docs" / "schemas" _RESULTS: list[tuple[str, bool, str]] = [] def _now() -> str: return datetime.now(UTC).isoformat(timespec="seconds") def _record(name: str, ok: bool, note: str) -> None: _RESULTS.append((name, ok, note)) marker = "✅ PASS" if ok else "❌ FAIL" print(f" {marker} — {name}: {note}", flush=True) def _pricing() -> Any: from my_deepagent.monitoring.pricing import ModelPrice, PricingCache pc = PricingCache() pc.set( [ ModelPrice("anthropic/claude-haiku-4-5", 0.001, 0.005, 200_000), ModelPrice("deepseek/deepseek-chat", 0.00028, 0.00112, 64_000), ] ) return pc async def _mk_session( db: Database, config: Any, personas: Any, saver: Any, session_id: uuid.UUID ) -> InteractiveSession: """Persist a fresh InteractiveSessionRow + return the in-mem InteractiveSession.""" from uuid import uuid4 from my_deepagent.persistence.models import AgentPersonaRow persona = next((p for p in personas if p.name == "default-interactive"), personas[0]) project_key = sha256(str(Path.cwd().resolve()))[:16] async with db.session() as s: ph = persona.compute_hash() existing_pr = ( await s.execute(select(AgentPersonaRow).where(AgentPersonaRow.hash == ph)) ).scalar_one_or_none() if existing_pr is None: existing_pr = AgentPersonaRow( id=str(uuid4()), name=persona.name, version=persona.version, hash=ph, definition=persona.model_dump(by_alias=True), created_at=_now(), ) s.add(existing_pr) await s.flush() existing_row = await s.get(InteractiveSessionRow, str(session_id)) if existing_row is None: s.add( InteractiveSessionRow( id=str(session_id), persona_id=existing_pr.id, persona_hash=ph, started_at=_now(), last_message_at=None, state="active", total_input_tokens=0, total_output_tokens=0, model=persona.model, project_key=project_key, title=None, plan_mode=False, parent_session_id=None, depth=0, ) ) await s.commit() return InteractiveSession( config, personas, db, _pricing(), Path.cwd(), session_id, saver, project_key, workflows=load_combined_workflows(config, _SEED / "workflows"), ) async def scenario_1_basic_chat(db: Database, config: Any, personas: Any, saver: Any) -> uuid.UUID: """1-turn message + assistant response persisted + token counters bumped.""" print("\n[A1] CLI-equivalent 1-turn chat") sid = uuid.uuid4() sess = await _mk_session(db, config, personas, saver, sid) agent = sess.build_agent_if_needed() await _invoke_and_stream(agent, "한국어로 한 줄로만 인사해 (10단어 이내)", sess) async with db.session() as s: msgs = ( ( await s.execute( select(MessageRow) .where(MessageRow.session_id == str(sid)) .order_by(MessageRow.seq) ) ) .scalars() .all() ) row = await s.get(InteractiveSessionRow, str(sid)) ok = ( len(msgs) == 2 and msgs[0].role == "user" and msgs[1].role == "assistant" and bool(msgs[1].content.strip()) and row is not None and row.total_output_tokens > 0 ) summary = f"messages={len(msgs)} out_tokens={row.total_output_tokens if row else 0}" _record("A1 basic chat", ok, summary) return sid async def scenario_2_resume( db: Database, config: Any, personas: Any, saver: Any, sid: uuid.UUID ) -> None: """Same session_id → second InteractiveSession picks up persisted state.""" print("\n[A2] Sessions resume") sess2 = await _mk_session(db, config, personas, saver, sid) agent = sess2.build_agent_if_needed() await _invoke_and_stream(agent, "내가 방금 너한테 한 첫 메시지가 뭐였지? 한 줄로만.", sess2) async with db.session() as s: msgs = ( ( await s.execute( select(MessageRow) .where(MessageRow.session_id == str(sid)) .where(MessageRow.archived.is_(False)) .order_by(MessageRow.seq) ) ) .scalars() .all() ) last_assistant = msgs[-1].content if msgs else "" ok = bool(last_assistant) and ( "인사" in last_assistant or "한국" in last_assistant or "안녕" in last_assistant ) _record("A2 resume", ok, f"messages={len(msgs)} last_hint='{last_assistant[:60]}'") async def scenario_3_skill(db: Database, config: Any, personas: Any, saver: Any) -> None: """Drop a SKILL.md, /skill queues body, next turn LLM acknowledges it.""" print("\n[A3] /skill system-inject") from my_deepagent.skills import ensure_skills_initialized, find_skill, user_skills_dir sd = user_skills_dir(config) ensure_skills_initialized(sd) skill_dir = sd / "korean-haiku" skill_dir.mkdir(parents=True, exist_ok=True) (skill_dir / "SKILL.md").write_text( """--- name: korean-haiku description: Respond as a korean haiku poet — always 3 short lines, only Korean. --- You are now a Korean haiku poet. Every response MUST be exactly 3 lines, all in Korean, total under 30 chars. No prose, no explanation. """, encoding="utf-8", ) sid = uuid.uuid4() sess = await _mk_session(db, config, personas, saver, sid) skill = find_skill(config, sess.project_key, "korean-haiku") assert skill is not None, "skill not loaded" body = skill.path.read_text(encoding="utf-8") sess.queue_system_message( f"The user requested skill `{skill.name}`. Apply this SKILL.md for this turn:\n\n{body}" ) agent = sess.build_agent_if_needed() await _invoke_and_stream(agent, "봄을 주제로 시 한 편 써줘.", sess) async with db.session() as s: msgs = ( ( await s.execute( select(MessageRow) .where(MessageRow.session_id == str(sid)) .where(MessageRow.role == "assistant") .order_by(MessageRow.seq.desc()) ) ) .scalars() .all() ) assistant = msgs[0].content if msgs else "" line_count = len([line for line in assistant.split("\n") if line.strip()]) ok = 2 <= line_count <= 6 # 3 ± slack _record("A3 skill inject", ok, f"lines={line_count} body[:60]='{assistant[:60]}'") async def scenario_4_plan_mode(db: Database, config: Any, personas: Any, saver: Any) -> None: """/plan blocks write tools → LLM produces plan markdown. /approve queues the plan as system message for next turn.""" print("\n[A4] /plan → plan markdown → /approve") sid = uuid.uuid4() sess = await _mk_session(db, config, personas, saver, sid) await sess.enter_plan_mode() agent = sess.build_agent_if_needed() await _invoke_and_stream( agent, "Python으로 wordcount CLI를 만들 plan 을 마크다운으로 짧게 (10줄 이내) 답해.", sess, ) # Verify last assistant is plan markdown shape async with db.session() as s: msgs = ( ( await s.execute( select(MessageRow) .where(MessageRow.session_id == str(sid)) .where(MessageRow.role == "assistant") .order_by(MessageRow.seq.desc()) ) ) .scalars() .all() ) plan_text = msgs[0].content if msgs else "" has_markdown_hint = any( token in plan_text for token in ("##", "###", "- ", "1.", "Phase", "단계") ) ok_plan = bool(plan_text) and has_markdown_hint await sess.approve_plan() queued = sess.consume_pending_system_messages() ok_approve = any("APPROVED" in q and plan_text[:20] in q for q in queued) # Re-queue so future scenarios see clean state for q in queued: sess.queue_system_message(q) sess.consume_pending_system_messages() # discard now _record( "A4 plan mode", ok_plan and ok_approve, f"markdown={ok_plan} approve_queued={ok_approve} plan[:50]='{plan_text[:50]}'", ) async def scenario_5_subagent(db: Database, config: Any, personas: Any, saver: Any) -> None: """spawn_subagent_session + run_subagent_to_completion → result on parent.""" print("\n[A5] /agents spawn live") parent_sid = uuid.uuid4() sess = await _mk_session(db, config, personas, saver, parent_sid) persona = sess.persona child_id = await spawn_subagent_session( db, parent_session_id=parent_sid, persona=persona, initial_title="haiku helper", ) summary = await run_subagent_to_completion( db, config, parent_sid, child_id, persona, "한국어로 짧게 인사해.", saver=None ) async with db.session() as s: parent_msgs = ( ( await s.execute( select(MessageRow) .where(MessageRow.session_id == str(parent_sid)) .order_by(MessageRow.seq) ) ) .scalars() .all() ) child_row = await s.get(InteractiveSessionRow, str(child_id)) pushed = any(f"sub-agent {str(child_id)[:8]} result" in m.content for m in parent_msgs) ok = bool(summary) and pushed and child_row is not None and child_row.state == "ended" state = child_row.state if child_row else "NONE" _record( "A5 sub-agent", ok, f"summary[:40]='{summary[:40]}' parent_push={pushed} child_ended={state}", ) async def scenario_6_compaction(db: Database, config: Any, personas: Any, saver: Any) -> None: """Manually invoke compact_session on a session padded with enough messages.""" print("\n[A6] Auto-compaction trigger") sid = uuid.uuid4() await _mk_session(db, config, personas, saver, sid) # Pad 14 active messages so compactor archives 4 + summary at seq=1. async with db.session() as s: for i in range(14): s.add( MessageRow( session_id=str(sid), seq=i + 1, role="user" if i % 2 == 0 else "assistant", content=f"padding message #{i} — talking about wordcount CLI design", tool_calls=None, token_count=10, is_summary=False, archived=False, ts=_now(), ) ) await s.commit() result = await compact_session(db, config, str(sid)) ok = ( result.compacted and result.archived == 4 and bool(result.summary_text) and result.summary_tokens > 0 ) _record( "A6 compaction", ok, f"archived={result.archived} summary_tokens={result.summary_tokens} " f"summary[:50]='{result.summary_text[:50]}'", ) async def scenario_7_workflow_background( db: Database, config: Any, personas: Any, saver: Any ) -> None: """We do NOT trigger a full WorkflowEngine.run (~$0.05) here — that's covered by `tests/integration/test_e2e_workflow.py`. Instead we verify the /workflow background dispatch path is wired correctly by checking template resolution + binding preview.""" print("\n[A7] /workflow background dispatch wiring") from my_deepagent.binding import is_persona_eligible_for_role sess = await _mk_session(db, config, personas, saver, uuid.uuid4()) workflows = sess.workflows if not workflows: _record("A7 workflow wiring", False, "no workflows loaded") return _path, tpl = workflows[0] # Verify every role has at least one eligible persona — same logic as # `_print_binding_for_template`. role_resolutions = {} for role in tpl.roles: eligible = [p for p in sess.personas if is_persona_eligible_for_role(p, role, tpl)[0]] role_resolutions[role.id] = len(eligible) ok = all(n > 0 for n in role_resolutions.values()) _record( "A7 workflow wiring", ok, f"template={tpl.name}@{tpl.version} role_eligibles={role_resolutions}", ) async def main() -> int: config = load_config() if not os.environ.get("OPENROUTER_API_KEY") and "openrouter" not in str( config.openrouter_base_url ): # API key may come from keyring; resolve_openrouter_api_key handles it pass # Ensure consent recorded for this run (smoke pollution we tolerated earlier). record_consent(config.data_dir) bootstrap_user_dirs(config) ensure_user_dirs_initialized(config) db = Database(config.database_url) await db.init_schema() personas = load_combined_personas(config, _SEED / "personas") print(f"[live_verify] config.data_dir={config.data_dir}") print(f"[live_verify] db={config.database_url}") print(f"[live_verify] personas loaded: {len(personas)}") print("[live_verify] running 7 scenarios against real OpenRouter (~$0.05 total)") saver_ctx = get_checkpointer_ctx(config.database_url) try: if config.database_url.startswith("postgresql"): saver = await saver_ctx.__aenter__() else: saver = None try: chat_sid = await scenario_1_basic_chat(db, config, personas, saver) await scenario_2_resume(db, config, personas, saver, chat_sid) await scenario_3_skill(db, config, personas, saver) await scenario_4_plan_mode(db, config, personas, saver) await scenario_5_subagent(db, config, personas, saver) await scenario_6_compaction(db, config, personas, saver) await scenario_7_workflow_background(db, config, personas, saver) finally: if saver is not None: await saver_ctx.__aexit__(None, None, None) finally: await db.dispose() print("\n[summary]") passed = sum(1 for _, ok, _ in _RESULTS if ok) print(f" {passed}/{len(_RESULTS)} PASS") for name, ok, note in _RESULTS: marker = "✅" if ok else "❌" print(f" {marker} {name}: {note}") return 0 if passed == len(_RESULTS) else 1 if __name__ == "__main__": sys.exit(asyncio.run(main()))