"""Shared helpers for verify_v04 scripts. - session_factory: persist a fresh InteractiveSessionRow + return an InteractiveSession ready for ``_invoke_and_stream``. - result accumulator: every script appends ``(id, ok, note)`` to a shared JSON file under ``scripts/verify_v04/results/.json`` and the orchestrator stitches them into ``verify_report_v04.md``. """ from __future__ import annotations import json import sys import uuid from datetime import UTC, datetime from pathlib import Path from typing import Any # Ensure the repo's src/ is importable. _REPO = Path(__file__).resolve().parents[2] sys.path.insert(0, str(_REPO / "src")) _RESULTS_DIR = _REPO / "scripts" / "verify_v04" / "results" _RESULTS_DIR.mkdir(parents=True, exist_ok=True) def _now() -> str: return datetime.now(UTC).isoformat(timespec="seconds") def record(scenario_id: str, ok: bool, note: str, **extras: Any) -> None: """Persist a single scenario outcome as JSON. Idempotent — overwrites.""" payload: dict[str, Any] = { "id": scenario_id, "ok": ok, "note": note, "ts": _now(), **extras, } target = _RESULTS_DIR / f"{scenario_id}.json" target.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") marker = "✅" if ok else "❌" print(f" {marker} {scenario_id}: {note}", flush=True) def load_results() -> list[dict[str, Any]]: """Return all saved results sorted by id.""" rows: list[dict[str, Any]] = [] for p in sorted(_RESULTS_DIR.glob("*.json")): try: rows.append(json.loads(p.read_text(encoding="utf-8"))) except Exception: continue return rows def repo_root() -> Path: return _REPO # --------------------------------------------------------------------------- # Session factory — shared by verify_c / verify_m / verify_q etc. 
# ---------------------------------------------------------------------------


async def mk_session(
    db: Any,
    config: Any,
    personas: Any,
    saver: Any,
    session_id: uuid.UUID,
    persona_name: str = "default-interactive",
) -> Any:
    """Ensure the session row exists and build an InteractiveSession for it.

    Looks up the persona called ``persona_name`` in ``personas``, inserts an
    ``AgentPersonaRow`` for its hash when none is stored yet, inserts an
    ``InteractiveSessionRow`` for ``session_id`` when none is stored yet, and
    finally constructs the ``InteractiveSession`` with a fixed pricing table
    and the workflows loaded from ``docs/schemas/workflows``.

    Args:
        db: Object whose ``session()`` yields an async database session.
        config: Passed through to the workflow loader and the session.
        personas: Iterable of persona objects exposing ``name``.
        saver: Passed through to the session constructor.
        session_id: Id of the session row to create or reuse.
        persona_name: Name of the persona to attach to the session.

    Returns:
        The constructed ``InteractiveSession`` instance.

    Raises:
        RuntimeError: If no persona named ``persona_name`` is in ``personas``.
    """
    # Project imports stay local: they resolve only after the module-level
    # sys.path tweak has made src/ importable.
    from sqlalchemy import select

    from my_deepagent.cli.interactive import InteractiveSession
    from my_deepagent.hash import sha256
    from my_deepagent.persistence.models import AgentPersonaRow, InteractiveSessionRow
    from my_deepagent.user_dirs import load_combined_workflows

    persona = None
    for candidate in personas:
        if candidate.name == persona_name:
            persona = candidate
            break
    if persona is None:
        raise RuntimeError(f"persona {persona_name!r} not loaded")

    project_key = sha256(str(Path.cwd().resolve()))[:16]

    async with db.session() as sess:
        persona_hash = persona.compute_hash()
        persona_query = select(AgentPersonaRow).where(AgentPersonaRow.hash == persona_hash)
        persona_row = (await sess.execute(persona_query)).scalar_one_or_none()
        if persona_row is None:
            persona_row = AgentPersonaRow(
                id=str(uuid.uuid4()),
                name=persona.name,
                version=persona.version,
                hash=persona_hash,
                definition=persona.model_dump(by_alias=True),
                created_at=_now(),
            )
            sess.add(persona_row)
            await sess.flush()

        session_row = await sess.get(InteractiveSessionRow, str(session_id))
        if session_row is None:
            new_row = InteractiveSessionRow(
                id=str(session_id),
                persona_id=persona_row.id,
                persona_hash=persona_hash,
                started_at=_now(),
                last_message_at=None,
                state="active",
                total_input_tokens=0,
                total_output_tokens=0,
                model=persona.model,
                project_key=project_key,
                title=None,
                plan_mode=False,
                parent_session_id=None,
                depth=0,
            )
            sess.add(new_row)
        # NOTE(review): the flattened source does not show whether this commit
        # was nested under the `if` above; committing at block level also
        # persists a freshly added persona row — confirm against the original.
        await sess.commit()

    from my_deepagent.monitoring.pricing import ModelPrice, PricingCache

    # Fixed price table handed to the pricing cache, one tuple per model.
    price_table = (
        ("anthropic/claude-sonnet-4-6", 0.003, 0.015, 200_000),
        ("anthropic/claude-haiku-4-5", 0.001, 0.005, 200_000),
        ("anthropic/claude-opus-4-1", 0.015, 0.075, 200_000),
        ("deepseek/deepseek-chat", 0.00028, 0.00112, 64_000),
    )
    pricing = PricingCache()
    pricing.set([ModelPrice(*entry) for entry in price_table])

    working_dir = Path.cwd()
    workflows = load_combined_workflows(config, _REPO / "docs" / "schemas" / "workflows")
    return InteractiveSession(
        config,
        personas,
        db,
        pricing,
        working_dir,
        session_id,
        saver,
        project_key,
        workflows=workflows,
    )


async def last_assistant_text(db: Any, session_id: uuid.UUID) -> str:
    """Return the newest non-archived assistant message body, or '' if none.

    Args:
        db: Object whose ``session()`` yields an async database session.
        session_id: Id of the session whose messages are searched.

    Returns:
        The ``content`` of the assistant message with the highest ``seq``
        that is not archived, or an empty string when there is no such row.
    """
    from sqlalchemy import desc, select

    from my_deepagent.persistence.models import MessageRow

    async with db.session() as sess:
        newest_first = (
            select(MessageRow)
            .where(
                MessageRow.session_id == str(session_id),
                MessageRow.role == "assistant",
                MessageRow.archived.is_(False),
            )
            .order_by(desc(MessageRow.seq))
            .limit(1)
        )
        latest = (await sess.execute(newest_first)).scalar_one_or_none()
        if latest is None:
            return ""
        return latest.content