diff --git a/my-deepagent/CHANGELOG.md b/my-deepagent/CHANGELOG.md index b7ce6f0..c010f9e 100644 --- a/my-deepagent/CHANGELOG.md +++ b/my-deepagent/CHANGELOG.md @@ -3,6 +3,45 @@ ## [Unreleased] ### Added +- **v0.2 PR #2b — `mydeepagent runs resume ` real implementation**. + Closes the v0.1.0 KNOWN LIMIT where resume was an exit-2 stub. Reuses + v0.2 PR #2a's LangGraph wiring + sweep_orphan_runs's DB state machine, + no Temporal (DR-3). + - `src/my_deepagent/engine.py`: + - New `WorkflowEngine.resume(run_id)` async method. Loads `RunRow`, + rejects terminal states with `MyDeepAgentError.human_required("run_already_terminal")`, + reloads `worktree_root` + `WorkflowTemplate` (via `_reload_template`) + + bindings (via `_reload_bindings`) from DB. Does **not** call + `bind_personas` again — locks in the original binding so consent / + pool changes don't silently shift roles. + - New `_execute_run` helper (shared phase loop) extracted from `run()`. + Skips already-`completed` phases (emits `phase.skipped` event) and + re-executes the rest. Both `run` (new) and `resume` dispatch through + it. + - New helpers: `_get_run_or_raise`, `_reload_template`, + `_reload_bindings` (rebuilds `{role_id: Binding}` from + `run_bindings` ⨝ `agent_personas`; corrupt persona rows are logged + and skipped, surfacing as `run_metadata_missing` if no bindings remain), + `_get_completed_phase_keys`. + - New `RunEventType.RUN_RESUMED` and `RunEventType.PHASE_SKIPPED` are + now actually emitted (the enum members existed already from v0.1.0). + - `src/my_deepagent/cli/runs.py` `_runs_resume_async`: stub → real impl. + Validates run exists + non-terminal, loads seed personas + artifact + schemas (`docs/schemas/`), constructs `WorkflowEngine` with a + "abort-on-new-approval" callback, calls `engine.resume(UUID(id))`, + prints final state + report path. Catches `MyDeepAgentError` and prints + a red error with exit 1. + - `tests/integration/test_resume.py` (new, 5 scenarios): + 1. 2-phase workflow: phase 1 succeeds, phase 2 fails → flip run row + back to executing → resume → phase 2 completes; assert phase 1 was + skipped (`phase.skipped` event present) and `run.resumed` event emitted. + 2. Terminal run → `resume()` raises `MyDeepAgentError(code="run_already_terminal")`. + 3. Unknown run id → raises `MyDeepAgentError(code="run_not_found")`. + 4. RunBindingRow rows missing → raises `MyDeepAgentError(code="run_metadata_missing")`. + 5. workflow_templates.definition is malformed → raises `MyDeepAgentError(code="template_load_failed")`. + - E2E real OpenRouter regression PASS 78.52 s (baseline 71–122 s); + within DR-3 acceptance threshold (+20%). + - **v0.2 PR #2a — LangGraph `AsyncPostgresSaver` engine wiring** (foundation for `runs resume`). v0.2 PR #1 added the dependency; this commit actually uses it. diff --git a/my-deepagent/src/my_deepagent/cli/runs.py b/my-deepagent/src/my_deepagent/cli/runs.py index a2ef914..3b089a4 100644 --- a/my-deepagent/src/my_deepagent/cli/runs.py +++ b/my-deepagent/src/my_deepagent/cli/runs.py @@ -135,35 +135,91 @@ async def _runs_show_async(run_id: str) -> None: async def _runs_resume_async(run_id: str) -> None: - """v0.1.0: resume is not implemented. + """Resume a non-terminal run from its first non-completed phase. - Surfaces the run state and hints at next steps. Future v0.2 implementation: - rehydrate the workflow template by template_hash, replay phase loop from the - first non-completed phase using the existing checkpointer. + v0.2 PR #2b: actually re-enters `WorkflowEngine.resume(run_id)`. Reloads + bindings + template + worktree from DB; skips already-completed phases; + runs the remaining phases under the LangGraph saver wired in v0.2 PR #2a. """ + from uuid import UUID + + from ..artifact_schema import ArtifactSchemaRegistry + from ..binding import BackendAvailability, PersonaConsentStore + from ..budget import make_budget_tracker_from_config + from ..engine import WorkflowEngine + from ..enums import Backend + from ..errors import MyDeepAgentError + from ..persona import load_personas_from_dir + full_id = await _resolve_run_id(run_id) config = load_config() + db = Database(config.database_url) await db.init_schema() + + # Fail fast on missing / terminal runs before constructing the engine. try: async with db.session() as s: run = await s.get(RunRow, full_id) if run is None: _CONSOLE.print(f"[red]run not found:[/] {run_id}") raise typer.Exit(code=1) - if run.state in ("completed", "failed", "aborted"): - _CONSOLE.print( - f"[yellow]Run {run.id} is already terminal ({run.state}). " - "Start a fresh run with `mydeepagent run `.[/]" - ) - raise typer.Exit(code=1) - _CONSOLE.print( - "[yellow]Resume is not implemented in v0.1.0. The crash-recovery sweep at startup " - "marked this run as failed; relaunch the workflow with `mydeepagent run`.[/]" - ) - raise typer.Exit(code=2) - finally: + if run.state in ("completed", "failed", "aborted"): + _CONSOLE.print( + f"[yellow]Run {run.id} is already terminal ({run.state}). " + "Start a fresh run with `mydeepagent run `.[/]" + ) + raise typer.Exit(code=1) + except typer.Exit: await db.dispose() + raise + + # Seed assets needed by WorkflowEngine. Personas / artifact schemas come + # from the seed directory, not from DB — they're language-neutral immutable + # fixtures versioned in `docs/schemas/`. + seed_root = Path(__file__).resolve().parents[3] / "docs" / "schemas" + personas = load_personas_from_dir(seed_root / "personas") + registry = ArtifactSchemaRegistry(roots=[seed_root / "artifacts"]) + consent = PersonaConsentStore(config.data_dir / "consents.json") + backends = BackendAvailability(available_backends=frozenset(Backend)) + + async def _no_op_approval(_payload: dict[str, object], _gates: list[str]) -> object: + # Resume in CLI mode does not currently re-prompt for approval — the + # original run already passed any gates it had reached. Reaching this + # callback means a phase we *didn't* pass before is now hitting an + # approval gate; treat that as REQUEST_CHANGES so the user knows. + from ..enums import ApprovalDecisionAction + + _CONSOLE.print("[yellow]A new phase needs human approval; aborting resume.[/]") + return ApprovalDecisionAction.REQUEST_CHANGES + + budget = make_budget_tracker_from_config(db, config) + await budget.init() + + engine = WorkflowEngine( + db=db, + config=config, + persona_pool=personas, + artifact_registry=registry, + consent_store=consent, + available_backends=backends, + approval_callback=_no_op_approval, + budget_tracker=budget, + ) + + try: + result = await engine.resume(UUID(full_id)) + except MyDeepAgentError as e: + _CONSOLE.print(f"[red]resume failed:[/] {e.code} — {e}") + await db.dispose() + raise typer.Exit(code=1) from e + + _CONSOLE.print(f"[green]Resume complete:[/] run={result.run_id} state={result.state.value}") + if result.final_report_path is not None: + _CONSOLE.print(f" report: {result.final_report_path}") + if result.error: + _CONSOLE.print(f" error: {result.error}") + await db.dispose() async def _resolve_run_id(prefix_or_full: str) -> str: diff --git a/my-deepagent/src/my_deepagent/engine.py b/my-deepagent/src/my_deepagent/engine.py index 7a05bb0..9e25452 100644 --- a/my-deepagent/src/my_deepagent/engine.py +++ b/my-deepagent/src/my_deepagent/engine.py @@ -4,6 +4,7 @@ from __future__ import annotations import asyncio import json +import logging import signal from collections.abc import AsyncIterator from contextlib import asynccontextmanager, suppress @@ -58,6 +59,8 @@ ApprovalCallback = Any # Callable[[dict, list[str]], Awaitable[ApprovalDecision _DEFAULT_PHASE_TIMEOUT_SECONDS = 300 # 5 minutes +_LOG_CORRUPT_PERSONA = logging.getLogger(__name__ + ".resume") + @dataclass(frozen=True) class RunResult: @@ -165,6 +168,11 @@ class WorkflowEngine: requirements_md: str = "", override: BindingOverride | None = None, ) -> RunResult: + """Start a brand-new run. Allocates a new `run_id`, binds personas, persists + skeleton metadata, and dispatches to the shared `_execute_run` phase loop. + + For resuming an existing non-terminal run, use :meth:`resume` instead. + """ run_id = uuid4() worktree_root = self._config.workspace_root / str(run_id) worktree_root.mkdir(parents=True, exist_ok=True) @@ -186,6 +194,59 @@ class WorkflowEngine: await self._append_event(run_id, None, RunEventType.RUN_CREATED, {}) await self._append_event(run_id, None, RunEventType.RUN_STARTED, {}) + + return await self._execute_run(run_id, template, worktree_root, bindings) + + async def resume(self, run_id: UUID) -> RunResult: + """Resume a non-terminal run from its first non-completed phase. + + Reloads worktree_root, template, and bindings from the DB — does **not** + re-run `bind_personas`, so consent/pool changes since the original run + do not silently shift the binding. Phases whose `RunPhaseRow.state` is + already ``completed`` are skipped; the rest re-execute and (when a + LangGraph saver is wired) replay deepagents from the last checkpoint + for that phase's thread_id. + + Raises: + MyDeepAgentError: if the run is missing, terminal, or its bindings + / template metadata cannot be reloaded. + """ + run_row = await self._get_run_or_raise(run_id) + if run_row.state in { + RunState.COMPLETED.value, + RunState.FAILED.value, + RunState.ABORTED.value, + }: + raise MyDeepAgentError.human_required( + "run_already_terminal", + message=( + f"run {run_id} is already {run_row.state}; start a fresh run " + f"with `mydeepagent run`" + ), + ) + + worktree_root = Path(run_row.worktree_root) + template = await self._reload_template(run_row.template_id) + bindings = await self._reload_bindings(run_id) + if not bindings: + raise MyDeepAgentError.human_required( + "run_metadata_missing", + message=( + f"run {run_id} has no binding rows; cannot resume — start a fresh run instead" + ), + ) + + await self._append_event(run_id, None, RunEventType.RUN_RESUMED, {}) + return await self._execute_run(run_id, template, worktree_root, bindings) + + async def _execute_run( + self, + run_id: UUID, + template: WorkflowTemplate, + worktree_root: Path, + bindings: dict[str, Binding], + ) -> RunResult: + """Shared phase loop used by both `run` (new) and `resume`.""" await self._set_run_state(run_id, RunState.EXECUTING) # Open the LangGraph AsyncPostgresSaver once per run; all phases share it. @@ -195,8 +256,17 @@ class WorkflowEngine: # checkpointer=None and runs without resume support. async with self._maybe_open_saver() as saver: self._saver = saver + completed_keys = await self._get_completed_phase_keys(run_id) try: for phase_def in template.phases: + if phase_def.key in completed_keys: + await self._append_event( + run_id, + None, + RunEventType.PHASE_SKIPPED, + {"phase_key": phase_def.key, "reason": "already_completed"}, + ) + continue role_binding = bindings[phase_def.role] await self._run_phase(run_id, worktree_root, template, phase_def, role_binding) await self._set_run_state(run_id, RunState.COMPLETED) @@ -933,6 +1003,90 @@ class WorkflowEngine: except Exception: await s.rollback() + # ------------------------------------------------------------------ + # Resume helpers (used by `resume` to rehydrate state from DB) + # ------------------------------------------------------------------ + + async def _get_run_or_raise(self, run_id: UUID) -> RunRow: + async with self._db.session() as s: + row = await s.get(RunRow, str(run_id)) + if row is None: + raise MyDeepAgentError.human_required( + "run_not_found", + message=f"run {run_id} not found in DB", + ) + return row + + async def _reload_template(self, template_id: str) -> WorkflowTemplate: + async with self._db.session() as s: + row = await s.get(WorkflowTemplateRow, template_id) + if row is None: + raise MyDeepAgentError.fatal( + "template_load_failed", + message=f"workflow_templates row {template_id} not found", + ) + try: + return WorkflowTemplate.model_validate(row.definition) + except Exception as e: + raise MyDeepAgentError.fatal( + "template_load_failed", + message=f"workflow_templates.definition for {template_id} is malformed: {e}", + ) from e + + async def _reload_bindings(self, run_id: UUID) -> dict[str, Binding]: + """Rebuild the `{role_id: Binding}` dict from `run_bindings` + `agent_personas`. + + Empty result means the run was never fully persisted — caller raises + `run_metadata_missing`. We do NOT re-run `bind_personas` here on purpose: + consent / pool state could have shifted since the original run. + """ + from .binding import Binding as _Binding # local import to avoid cycle hint + + async with self._db.session() as s: + binding_rows = ( + (await s.execute(select(RunBindingRow).where(RunBindingRow.run_id == str(run_id)))) + .scalars() + .all() + ) + persona_rows: dict[str, AgentPersonaRow] = {} + for br in binding_rows: + pr = await s.get(AgentPersonaRow, br.persona_id) + if pr is not None: + persona_rows[br.persona_id] = pr + + out: dict[str, Binding] = {} + for br in binding_rows: + pr = persona_rows.get(br.persona_id) + if pr is None: + continue + try: + persona = Persona.model_validate(pr.definition) + except Exception as e: + # Corrupt persona JSON: skip the binding; an empty bindings dict + # surfaces as `run_metadata_missing` in `resume`. + _LOG_CORRUPT_PERSONA.warning("corrupt persona row %s during resume: %s", pr.id, e) + continue + out[br.role_id] = _Binding( + role_id=br.role_id, persona=persona, binding_hash=br.binding_hash + ) + return out + + async def _get_completed_phase_keys(self, run_id: UUID) -> set[str]: + """Return the set of phase_keys that already reached `completed` state.""" + async with self._db.session() as s: + rows = ( + ( + await s.execute( + select(RunPhaseRow.phase_key) + .where(RunPhaseRow.run_id == str(run_id)) + .where(RunPhaseRow.state == RunPhaseState.COMPLETED.value) + ) + ) + .scalars() + .all() + ) + return set(rows) + # ------------------------------------------------------------------ # Module-level helpers diff --git a/my-deepagent/tests/integration/test_resume.py b/my-deepagent/tests/integration/test_resume.py new file mode 100644 index 0000000..3a6b763 --- /dev/null +++ b/my-deepagent/tests/integration/test_resume.py @@ -0,0 +1,539 @@ +"""WorkflowEngine.resume — v0.2 PR #2b integration tests. + +Covers 5 scenarios from the plan: + 1. 2-phase workflow with phase 1 already completed → resume picks up phase 2. + 2. Terminal run (state=completed) → resume raises MyDeepAgentError.human_required. + 3. Unknown run_id → raises MyDeepAgentError.human_required("run_not_found"). + 4. RunBindingRow rows missing → raises MyDeepAgentError.human_required. + 5. Workflow template definition is corrupt → raises MyDeepAgentError.fatal. + +All scenarios use sqlite tmp_path + mock build_agent (no real OpenRouter). +""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch +from uuid import UUID, uuid4 + +import pytest +from sqlalchemy import select + +from my_deepagent.artifact_schema import ArtifactSchemaRegistry +from my_deepagent.binding import BackendAvailability, PersonaConsentStore +from my_deepagent.config import load_config +from my_deepagent.engine import WorkflowEngine +from my_deepagent.enums import ( + ApprovalDecisionAction, + Backend, + RunPhaseState, + RunState, +) +from my_deepagent.errors import MyDeepAgentError +from my_deepagent.persistence.db import Database +from my_deepagent.persistence.models import ( + AgentPersonaRow, + RunBindingRow, + RunEventRow, + RunPhaseRow, + RunRow, + WorkflowTemplateRow, +) +from my_deepagent.persona import load_personas_from_dir +from my_deepagent.workflow import WorkflowTemplate + +_DOCS = Path(__file__).resolve().parents[2] / "docs" / "schemas" +_ARTIFACTS_ROOT = _DOCS / "artifacts" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _valid_spec(run_id: UUID) -> dict[str, Any]: + return { + "runId": str(run_id), + "phaseKey": "spec", + "requirements": "Implement feature X with full test coverage", + "acceptance_criteria": ["All tests pass", "Coverage >= 90%"], + "approach": "TDD: write tests first, then implement.", + "risks": [], + } + + +def _two_phase_workflow() -> WorkflowTemplate: + raw = { + "name": "test-resume-wf", + "version": 1, + "description": "two-phase workflow for resume tests", + "roles": [ + { + "id": "spec_writer", + "required_capabilities": ["spec_write", "phase_planning"], + "preferred_backends": ["openrouter"], + } + ], + "phases": [ + { + "key": "spec", + "title": "Write spec phase 1", + "risk": "low", + "role": "spec_writer", + "instructions": "Write the spec with enough words.", + "timeout_seconds": 10, + "expected_artifact": {"path": "artifacts/spec.json", "schema": "dev/spec@1"}, + }, + { + "key": "spec2", + "title": "Write spec phase 2", + "risk": "low", + "role": "spec_writer", + "instructions": "Write the second spec with enough words.", + "timeout_seconds": 10, + "expected_artifact": {"path": "artifacts/spec2.json", "schema": "dev/spec@1"}, + }, + ], + } + return WorkflowTemplate.model_validate(raw) + + +@pytest.fixture +def personas() -> list[Any]: + return load_personas_from_dir(_DOCS / "personas") + + +@pytest.fixture +def registry() -> ArtifactSchemaRegistry: + return ArtifactSchemaRegistry(roots=[_ARTIFACTS_ROOT]) + + +@pytest.fixture +def consent_store(tmp_path: Path) -> PersonaConsentStore: + return PersonaConsentStore(tmp_path / "consents.json") + + +@pytest.fixture +def available_backends() -> BackendAvailability: + return BackendAvailability(available_backends=frozenset(Backend)) + + +@pytest.fixture +async def db(tmp_path: Path) -> Database: + url = f"sqlite+aiosqlite:///{tmp_path / 'test.sqlite3'}" + database = Database(url) + await database.init_schema() + return database + + +def _make_engine( + database: Database, + tmp_path: Path, + personas: list[Any], + registry: ArtifactSchemaRegistry, + consent_store: PersonaConsentStore, + available_backends: BackendAvailability, +) -> WorkflowEngine: + cfg = load_config( + workspace_root=tmp_path, + data_dir=tmp_path / "data", + database_url=f"sqlite+aiosqlite:///{tmp_path / 'test.sqlite3'}", + ) + auto_approve = AsyncMock(return_value=ApprovalDecisionAction.APPROVE) + return WorkflowEngine( + db=database, + config=cfg, + persona_pool=personas, + artifact_registry=registry, + consent_store=consent_store, + available_backends=available_backends, + approval_callback=auto_approve, + ) + + +def _fake_agent_writes( + expected_artifact_path: Path, run_id: UUID, on_call: list[str] | None = None +) -> Any: + """A build_agent stub: the returned agent writes the expected artifact then returns.""" + + def _build( + persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any + ) -> Any: + async def _ainvoke(messages: Any, **_kwargs: Any) -> Any: + if on_call is not None: + on_call.append(str(expected_artifact_path)) + expected_artifact_path.parent.mkdir(parents=True, exist_ok=True) + artifact = _valid_spec(run_id) + content = json.dumps(artifact) + expected_artifact_path.write_text(content, encoding="utf-8") + # Drive watcher middleware so engine sees the artifact write. + for mw in middleware: + if hasattr(mw, "awrap_tool_call"): + req = MagicMock() + req.tool_call = { + "name": "write_file", + "args": { + "file_path": str(expected_artifact_path), + "content": content, + }, + "id": "x", + } + await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock())) + return {"messages": []} + + agent = MagicMock() + agent.ainvoke = _ainvoke + return agent + + return _build + + +# --------------------------------------------------------------------------- +# Scenario 1: phase 1 completed, phase 2 pending → resume picks up phase 2 +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_resume_completes_remaining_phase( # noqa: C901 — 2-phase scenario inherently spans 2 build_agent stubs + DB seam + tmp_path: Path, + personas: list[Any], + registry: ArtifactSchemaRegistry, + consent_store: PersonaConsentStore, + available_backends: BackendAvailability, + db: Database, +) -> None: + """Run phase 1 to completion via engine.run, then truncate phase 2 by aborting + the agent the first time around, then resume and verify phase 2 finishes.""" + template = _two_phase_workflow() + engine = _make_engine( + db, tmp_path, personas, registry, consent_store, available_backends + ) + + # First run: phase 1 succeeds, phase 2 deliberately fails (agent never writes). + phase2_calls: list[int] = [] + + def _build_first( + persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any + ) -> Any: + async def _ainvoke(messages: Any, **_kwargs: Any) -> Any: + # Decide which phase by the envelope content (phaseKey "spec" vs "spec2"). + envelope = messages["messages"][0]["content"] if "messages" in messages else "" + is_phase2 = "spec2" in envelope + if is_phase2: + phase2_calls.append(1) + # Do NOT write spec2.json → phase will time out + fail. + return {"messages": []} + expected = root_dir / "artifacts" / "spec.json" + expected.parent.mkdir(parents=True, exist_ok=True) + # Use a stable placeholder run_id; the schema only checks UUID format + content = json.dumps(_valid_spec(uuid4())) + expected.write_text(content, encoding="utf-8") + for mw in middleware: + if hasattr(mw, "awrap_tool_call"): + req = MagicMock() + req.tool_call = { + "name": "write_file", + "args": {"file_path": str(expected), "content": content}, + "id": "x", + } + await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock())) + return {"messages": []} + + agent = MagicMock() + agent.ainvoke = _ainvoke + return agent + + with patch("my_deepagent.engine.build_agent", side_effect=_build_first): + first_result = await engine.run( + template, repo_path=tmp_path, base_branch="main", requirements_md="test" + ) + assert first_result.state == RunState.FAILED, "phase 2 should have failed first run" + assert len(phase2_calls) >= 1 + + run_id = first_result.run_id + + # The first run is now state=failed (terminal). Reopen it as non-terminal so + # resume is willing to pick it up. (In a real crash, the run would have been + # left mid-state by SIGKILL; sweep_orphan_runs marks it failed at next start. + # Here we mimic the *pre-sweep* state by flipping it back to executing.) + async with db.session() as s: + run = await s.get(RunRow, str(run_id)) + assert run is not None + run.state = RunState.EXECUTING.value + # Reset phase 2 row so resume sees it as non-completed + phases = ( + (await s.execute(select(RunPhaseRow).where(RunPhaseRow.run_id == str(run_id)))) + .scalars() + .all() + ) + for p in phases: + if p.phase_key == "spec2": + p.state = RunPhaseState.PENDING.value + p.ended_at = None + await s.commit() + + # Second run: phase 2 succeeds this time. + phase2_calls.clear() + + def _build_second( + persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any + ) -> Any: + async def _ainvoke(messages: Any, **_kwargs: Any) -> Any: + envelope = messages["messages"][0]["content"] if "messages" in messages else "" + is_phase2 = "spec2" in envelope + phase2_calls.append(1 if is_phase2 else 0) + expected_name = "spec2.json" if is_phase2 else "spec.json" + expected = root_dir / "artifacts" / expected_name + expected.parent.mkdir(parents=True, exist_ok=True) + content = json.dumps(_valid_spec(uuid4())) + expected.write_text(content, encoding="utf-8") + for mw in middleware: + if hasattr(mw, "awrap_tool_call"): + req = MagicMock() + req.tool_call = { + "name": "write_file", + "args": {"file_path": str(expected), "content": content}, + "id": "y", + } + await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock())) + return {"messages": []} + + agent = MagicMock() + agent.ainvoke = _ainvoke + return agent + + with patch("my_deepagent.engine.build_agent", side_effect=_build_second): + resume_result = await engine.resume(run_id) + + assert resume_result.state == RunState.COMPLETED, ( + f"resume should complete: state={resume_result.state}, error={resume_result.error}" + ) + # Phase 1 should be skipped on resume (only phase 2 invoked). + assert phase2_calls == [1], f"phase 2 should be the only re-invoked phase, got {phase2_calls}" + + # RUN_RESUMED event must have been emitted. + async with db.session() as s: + events = ( + ( + await s.execute( + select(RunEventRow.type).where(RunEventRow.run_id == str(run_id)) + ) + ) + .scalars() + .all() + ) + assert "run.resumed" in events + assert "phase.skipped" in events # phase 1 skipped + + +# --------------------------------------------------------------------------- +# Scenario 2: terminal run rejected +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_resume_terminal_run_raises( + tmp_path: Path, + personas: list[Any], + registry: ArtifactSchemaRegistry, + consent_store: PersonaConsentStore, + available_backends: BackendAvailability, + db: Database, +) -> None: + """A run in a terminal state (e.g. completed) cannot be resumed.""" + template = _two_phase_workflow() + engine = _make_engine( + db, tmp_path, personas, registry, consent_store, available_backends + ) + + def _build( + persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any + ) -> Any: + async def _ainvoke(messages: Any, **_kwargs: Any) -> Any: + envelope = messages["messages"][0]["content"] if "messages" in messages else "" + phase_key = "spec2" if "spec2" in envelope else "spec" + expected = root_dir / "artifacts" / f"{phase_key}.json" + expected.parent.mkdir(parents=True, exist_ok=True) + content = json.dumps(_valid_spec(uuid4())) + expected.write_text(content, encoding="utf-8") + for mw in middleware: + if hasattr(mw, "awrap_tool_call"): + req = MagicMock() + req.tool_call = { + "name": "write_file", + "args": {"file_path": str(expected), "content": content}, + "id": "x", + } + await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock())) + return {"messages": []} + + agent = MagicMock() + agent.ainvoke = _ainvoke + return agent + + with patch("my_deepagent.engine.build_agent", side_effect=_build): + result = await engine.run(template, repo_path=tmp_path, base_branch="main") + assert result.state == RunState.COMPLETED + + with pytest.raises(MyDeepAgentError) as exc: + await engine.resume(result.run_id) + assert exc.value.code == "run_already_terminal" + + +# --------------------------------------------------------------------------- +# Scenario 3: unknown run id +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_resume_unknown_run_raises( + tmp_path: Path, + personas: list[Any], + registry: ArtifactSchemaRegistry, + consent_store: PersonaConsentStore, + available_backends: BackendAvailability, + db: Database, +) -> None: + """resume(unknown_uuid) → MyDeepAgentError(code=run_not_found).""" + engine = _make_engine( + db, tmp_path, personas, registry, consent_store, available_backends + ) + with pytest.raises(MyDeepAgentError) as exc: + await engine.resume(uuid4()) + assert exc.value.code == "run_not_found" + + +# --------------------------------------------------------------------------- +# Scenario 4: RunBindingRow rows missing +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_resume_missing_bindings_raises( + tmp_path: Path, + personas: list[Any], + registry: ArtifactSchemaRegistry, + consent_store: PersonaConsentStore, + available_backends: BackendAvailability, + db: Database, +) -> None: + """A run whose RunBindingRow rows were never persisted cannot be resumed.""" + template = _two_phase_workflow() + engine = _make_engine( + db, tmp_path, personas, registry, consent_store, available_backends + ) + + # Seed a RunRow + WorkflowTemplateRow but NO RunBindingRow. + run_id = uuid4() + tpl_id = uuid4() + async with db.session() as s: + s.add( + WorkflowTemplateRow( + id=str(tpl_id), + name=template.name, + version=template.version, + hash="sha:fake", + definition=template.model_dump(by_alias=True), + created_at="2026-05-16T00:00:00+00:00", + ) + ) + await s.flush() + s.add( + RunRow( + id=str(run_id), + template_id=str(tpl_id), + template_hash="sha:fake", + state=RunState.EXECUTING.value, + repo_path=str(tmp_path / "repo"), + base_branch="main", + worktree_root=str(tmp_path / "wt"), + created_at="2026-05-16T00:00:00+00:00", + updated_at="2026-05-16T00:00:00+00:00", + ) + ) + await s.commit() + + with pytest.raises(MyDeepAgentError) as exc: + await engine.resume(run_id) + assert exc.value.code == "run_metadata_missing" + + +# --------------------------------------------------------------------------- +# Scenario 5: workflow_templates.definition is corrupt +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_resume_corrupt_template_definition_raises( + tmp_path: Path, + personas: list[Any], + registry: ArtifactSchemaRegistry, + consent_store: PersonaConsentStore, + available_backends: BackendAvailability, + db: Database, +) -> None: + """A run whose workflow_templates.definition is malformed → fatal.""" + engine = _make_engine( + db, tmp_path, personas, registry, consent_store, available_backends + ) + + run_id = uuid4() + tpl_id = uuid4() + persona_id = uuid4() + async with db.session() as s: + # Corrupt definition (missing required `roles` / `phases`) + s.add( + WorkflowTemplateRow( + id=str(tpl_id), + name="corrupt", + version=1, + hash="sha:corrupt", + definition={"name": "corrupt"}, + created_at="2026-05-16T00:00:00+00:00", + ) + ) + await s.flush() + s.add( + RunRow( + id=str(run_id), + template_id=str(tpl_id), + template_hash="sha:corrupt", + state=RunState.EXECUTING.value, + repo_path=str(tmp_path / "repo"), + base_branch="main", + worktree_root=str(tmp_path / "wt"), + created_at="2026-05-16T00:00:00+00:00", + updated_at="2026-05-16T00:00:00+00:00", + ) + ) + s.add( + AgentPersonaRow( + id=str(persona_id), + name="openrouter-deepseek-spec-writer", + version=1, + hash="sha:p", + definition=personas[0].model_dump(by_alias=True), + created_at="2026-05-16T00:00:00+00:00", + ) + ) + # Flush RunRow + AgentPersonaRow before adding RunBindingRow so the FKs + # are satisfied at INSERT time (SQLite with PRAGMA foreign_keys=ON). + await s.flush() + s.add( + RunBindingRow( + id=str(uuid4()), + run_id=str(run_id), + role_id="spec_writer", + persona_id=str(persona_id), + persona_hash="sha:p", + backend="openrouter", + binding_hash="sha:b", + ) + ) + await s.commit() + + with pytest.raises(MyDeepAgentError) as exc: + await engine.resume(run_id) + assert exc.value.code == "template_load_failed"