"""WorkflowEngine.resume — v0.2 PR #2b integration tests.

Covers 5 scenarios from the plan:

1. 2-phase workflow with phase 1 already completed → resume picks up phase 2.
2. Terminal run (state=completed) → resume raises MyDeepAgentError.human_required.
3. Unknown run_id → raises MyDeepAgentError.human_required("run_not_found").
4. RunBindingRow rows missing → raises MyDeepAgentError.human_required.
5. Workflow template definition is corrupt → raises MyDeepAgentError.fatal.

All scenarios use sqlite tmp_path + mock build_agent (no real OpenRouter).
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
from uuid import UUID, uuid4

import pytest
from sqlalchemy import select

from my_deepagent.artifact_schema import ArtifactSchemaRegistry
from my_deepagent.binding import BackendAvailability, PersonaConsentStore
from my_deepagent.config import load_config
from my_deepagent.engine import WorkflowEngine
from my_deepagent.enums import (
    ApprovalDecisionAction,
    Backend,
    RunPhaseState,
    RunState,
)
from my_deepagent.errors import MyDeepAgentError
from my_deepagent.persistence.db import Database
from my_deepagent.persistence.models import (
    AgentPersonaRow,
    RunBindingRow,
    RunEventRow,
    RunPhaseRow,
    RunRow,
    WorkflowTemplateRow,
)
from my_deepagent.persona import load_personas_from_dir
from my_deepagent.workflow import WorkflowTemplate

_DOCS = Path(__file__).resolve().parents[2] / "docs" / "schemas"
_ARTIFACTS_ROOT = _DOCS / "artifacts"


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _valid_spec(run_id: UUID) -> dict[str, Any]:
    """Return a spec artifact payload for ``run_id``.

    The ``phaseKey`` is always ``"spec"``, including when the payload is
    written as the second phase's artifact.
    """
    return {
        "runId": str(run_id),
        "phaseKey": "spec",
        "requirements": "Implement feature X with full test coverage",
        "acceptance_criteria": ["All tests pass", "Coverage >= 90%"],
        "approach": "TDD: write tests first, then implement.",
        "risks": [],
    }


def _two_phase_workflow() -> WorkflowTemplate:
    """Build a two-phase template (``spec`` then ``spec2``) sharing one role."""
    raw = {
        "name": "test-resume-wf",
        "version": 1,
        "description": "two-phase workflow for resume tests",
        "roles": [
            {
                "id": "spec_writer",
                "required_capabilities": ["spec_write", "phase_planning"],
                "preferred_backends": ["openrouter"],
            }
        ],
        "phases": [
            {
                "key": "spec",
                "title": "Write spec phase 1",
                "risk": "low",
                "role": "spec_writer",
                "instructions": "Write the spec with enough words.",
                "timeout_seconds": 10,
                "expected_artifact": {"path": "artifacts/spec.json", "schema": "dev/spec@1"},
            },
            {
                "key": "spec2",
                "title": "Write spec phase 2",
                "risk": "low",
                "role": "spec_writer",
                "instructions": "Write the second spec with enough words.",
                "timeout_seconds": 10,
                "expected_artifact": {"path": "artifacts/spec2.json", "schema": "dev/spec@1"},
            },
        ],
    }
    return WorkflowTemplate.model_validate(raw)


@pytest.fixture
def personas() -> list[Any]:
    """Personas loaded from the ``personas`` directory under ``_DOCS``."""
    return load_personas_from_dir(_DOCS / "personas")


@pytest.fixture
def registry() -> ArtifactSchemaRegistry:
    """Artifact schema registry rooted at ``_ARTIFACTS_ROOT``."""
    return ArtifactSchemaRegistry(roots=[_ARTIFACTS_ROOT])


@pytest.fixture
def consent_store(tmp_path: Path) -> PersonaConsentStore:
    """Consent store backed by a per-test ``consents.json``."""
    return PersonaConsentStore(tmp_path / "consents.json")


@pytest.fixture
def available_backends() -> BackendAvailability:
    """Availability object that reports every ``Backend`` member as available."""
    return BackendAvailability(available_backends=frozenset(Backend))


@pytest.fixture
async def db(tmp_path: Path) -> Database:
    """Per-test sqlite database under ``tmp_path`` with its schema initialised."""
    url = f"sqlite+aiosqlite:///{tmp_path / 'test.sqlite3'}"
    database = Database(url)
    await database.init_schema()
    return database


def _make_engine(
    database: Database,
    tmp_path: Path,
    personas: list[Any],
    registry: ArtifactSchemaRegistry,
    consent_store: PersonaConsentStore,
    available_backends: BackendAvailability,
) -> WorkflowEngine:
    """Build a ``WorkflowEngine`` on the test database with an auto-approving callback."""
    cfg = load_config(
        workspace_root=tmp_path,
        data_dir=tmp_path / "data",
        database_url=f"sqlite+aiosqlite:///{tmp_path / 'test.sqlite3'}",
    )
    auto_approve = AsyncMock(return_value=ApprovalDecisionAction.APPROVE)
    return WorkflowEngine(
        db=database,
        config=cfg,
        persona_pool=personas,
        artifact_registry=registry,
        consent_store=consent_store,
        available_backends=available_backends,
        approval_callback=auto_approve,
    )


def _envelope_text(messages: Any) -> Any:
    """Return the first message's content from an ``ainvoke`` payload, or ``""`` if absent."""
    return messages["messages"][0]["content"] if "messages" in messages else ""


def _stub_agent(ainvoke: Any) -> Any:
    """Wrap an ``ainvoke`` coroutine function in a ``MagicMock`` agent."""
    agent = MagicMock()
    agent.ainvoke = ainvoke
    return agent


async def _write_artifact_and_notify(
    expected: Path,
    run_id: UUID,
    middleware: list[Any],
    *,
    tool_call_id: str = "x",
) -> None:
    """Write a valid spec artifact to ``expected`` and replay it as a ``write_file`` tool call.

    Args:
        expected: Destination file; parent directories are created as needed.
        run_id: Value placed in the artifact's ``runId`` field.
        middleware: Middleware list handed to the ``build_agent`` stub.
        tool_call_id: ``id`` field of the synthetic tool call.
    """
    expected.parent.mkdir(parents=True, exist_ok=True)
    content = json.dumps(_valid_spec(run_id))
    expected.write_text(content, encoding="utf-8")
    # Drive watcher middleware so engine sees the artifact write.
    for mw in middleware:
        if hasattr(mw, "awrap_tool_call"):
            req = MagicMock()
            req.tool_call = {
                "name": "write_file",
                "args": {"file_path": str(expected), "content": content},
                "id": tool_call_id,
            }
            await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock()))


def _fake_agent_writes(
    expected_artifact_path: Path, run_id: UUID, on_call: list[str] | None = None
) -> Any:
    """A build_agent stub: the returned agent writes the expected artifact then returns.

    When ``on_call`` is given, each invocation appends the artifact path (as a
    string) to it before the artifact is written.
    """

    def _build(
        persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any
    ) -> Any:
        async def _ainvoke(messages: Any, **_kwargs: Any) -> Any:
            if on_call is not None:
                on_call.append(str(expected_artifact_path))
            await _write_artifact_and_notify(expected_artifact_path, run_id, middleware)
            return {"messages": []}

        return _stub_agent(_ainvoke)

    return _build


# ---------------------------------------------------------------------------
# Scenario 1: phase 1 completed, phase 2 pending → resume picks up phase 2
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_resume_completes_remaining_phase(
    tmp_path: Path,
    personas: list[Any],
    registry: ArtifactSchemaRegistry,
    consent_store: PersonaConsentStore,
    available_backends: BackendAvailability,
    db: Database,
) -> None:
    """Run phase 1 to completion via engine.run, then truncate phase 2 by
    aborting the agent the first time around, then resume and verify phase 2
    finishes."""
    template = _two_phase_workflow()
    engine = _make_engine(db, tmp_path, personas, registry, consent_store, available_backends)

    # First run: phase 1 succeeds, phase 2 deliberately fails (agent never writes).
    phase2_calls: list[int] = []

    def _build_first(
        persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any
    ) -> Any:
        async def _ainvoke(messages: Any, **_kwargs: Any) -> Any:
            # Decide which phase by the envelope content (phaseKey "spec" vs "spec2").
            if "spec2" in _envelope_text(messages):
                phase2_calls.append(1)
                # Do NOT write spec2.json → phase will time out + fail.
                return {"messages": []}
            # Placeholder run id: a fresh uuid4(), not the engine's run id. This
            # relies on the schema checking only the UUID format (TODO confirm).
            await _write_artifact_and_notify(
                root_dir / "artifacts" / "spec.json", uuid4(), middleware
            )
            return {"messages": []}

        return _stub_agent(_ainvoke)

    with patch("my_deepagent.engine.build_agent", side_effect=_build_first):
        first_result = await engine.run(
            template, repo_path=tmp_path, base_branch="main", requirements_md="test"
        )
    assert first_result.state == RunState.FAILED, "phase 2 should have failed first run"
    assert len(phase2_calls) >= 1
    run_id = first_result.run_id

    # The first run is now state=failed (terminal). Reopen it as non-terminal so
    # resume is willing to pick it up. (In a real crash, the run would have been
    # left mid-state by SIGKILL; sweep_orphan_runs marks it failed at next start.
    # Here we mimic the *pre-sweep* state by flipping it back to executing.)
    async with db.session() as s:
        run = await s.get(RunRow, str(run_id))
        assert run is not None
        run.state = RunState.EXECUTING.value
        # Reset phase 2 row so resume sees it as non-completed
        phases = (
            (await s.execute(select(RunPhaseRow).where(RunPhaseRow.run_id == str(run_id))))
            .scalars()
            .all()
        )
        for p in phases:
            if p.phase_key == "spec2":
                p.state = RunPhaseState.PENDING.value
                p.ended_at = None
        await s.commit()

    # Second run: phase 2 succeeds this time.
    phase2_calls.clear()

    def _build_second(
        persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any
    ) -> Any:
        async def _ainvoke(messages: Any, **_kwargs: Any) -> Any:
            is_phase2 = "spec2" in _envelope_text(messages)
            # Record every invocation (1 = phase 2, 0 = phase 1) so the test can
            # assert that phase 1 was not re-invoked.
            phase2_calls.append(1 if is_phase2 else 0)
            expected_name = "spec2.json" if is_phase2 else "spec.json"
            await _write_artifact_and_notify(
                root_dir / "artifacts" / expected_name,
                uuid4(),
                middleware,
                tool_call_id="y",
            )
            return {"messages": []}

        return _stub_agent(_ainvoke)

    with patch("my_deepagent.engine.build_agent", side_effect=_build_second):
        resume_result = await engine.resume(run_id)

    assert resume_result.state == RunState.COMPLETED, (
        f"resume should complete: state={resume_result.state}, error={resume_result.error}"
    )
    # Phase 1 should be skipped on resume (only phase 2 invoked).
    assert phase2_calls == [1], f"phase 2 should be the only re-invoked phase, got {phase2_calls}"

    # RUN_RESUMED event must have been emitted.
    async with db.session() as s:
        events = (
            (await s.execute(select(RunEventRow.type).where(RunEventRow.run_id == str(run_id))))
            .scalars()
            .all()
        )
    assert "run.resumed" in events
    assert "phase.skipped" in events  # phase 1 skipped


# ---------------------------------------------------------------------------
# Scenario 2: terminal run rejected
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_resume_terminal_run_raises(
    tmp_path: Path,
    personas: list[Any],
    registry: ArtifactSchemaRegistry,
    consent_store: PersonaConsentStore,
    available_backends: BackendAvailability,
    db: Database,
) -> None:
    """A run in a terminal state (e.g. completed) cannot be resumed."""
    template = _two_phase_workflow()
    engine = _make_engine(db, tmp_path, personas, registry, consent_store, available_backends)

    def _build(
        persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any
    ) -> Any:
        async def _ainvoke(messages: Any, **_kwargs: Any) -> Any:
            # Both phases succeed: write whichever artifact the envelope asks for.
            phase_key = "spec2" if "spec2" in _envelope_text(messages) else "spec"
            await _write_artifact_and_notify(
                root_dir / "artifacts" / f"{phase_key}.json", uuid4(), middleware
            )
            return {"messages": []}

        return _stub_agent(_ainvoke)

    with patch("my_deepagent.engine.build_agent", side_effect=_build):
        result = await engine.run(template, repo_path=tmp_path, base_branch="main")
    assert result.state == RunState.COMPLETED

    with pytest.raises(MyDeepAgentError) as exc:
        await engine.resume(result.run_id)
    assert exc.value.code == "run_already_terminal"


# ---------------------------------------------------------------------------
# Scenario 3: unknown run id
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_resume_unknown_run_raises(
    tmp_path: Path,
    personas: list[Any],
    registry: ArtifactSchemaRegistry,
    consent_store: PersonaConsentStore,
    available_backends: BackendAvailability,
    db: Database,
) -> None:
    """resume(unknown_uuid) → MyDeepAgentError(code=run_not_found)."""
    engine = _make_engine(db, tmp_path, personas, registry, consent_store, available_backends)
    with pytest.raises(MyDeepAgentError) as exc:
        await engine.resume(uuid4())
    assert exc.value.code == "run_not_found"


# ---------------------------------------------------------------------------
# Scenario 4: RunBindingRow rows missing
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_resume_missing_bindings_raises(
    tmp_path: Path,
    personas: list[Any],
    registry: ArtifactSchemaRegistry,
    consent_store: PersonaConsentStore,
    available_backends: BackendAvailability,
    db: Database,
) -> None:
    """A run whose RunBindingRow rows were never persisted cannot be resumed."""
    template = _two_phase_workflow()
    engine = _make_engine(db, tmp_path, personas, registry, consent_store, available_backends)

    # Seed a RunRow + WorkflowTemplateRow but NO RunBindingRow.
    run_id = uuid4()
    tpl_id = uuid4()
    async with db.session() as s:
        s.add(
            WorkflowTemplateRow(
                id=str(tpl_id),
                name=template.name,
                version=template.version,
                hash="sha:fake",
                definition=template.model_dump(by_alias=True),
                created_at="2026-05-16T00:00:00+00:00",
            )
        )
        # Flush the template row before adding the run row that references it.
        await s.flush()
        s.add(
            RunRow(
                id=str(run_id),
                template_id=str(tpl_id),
                template_hash="sha:fake",
                state=RunState.EXECUTING.value,
                repo_path=str(tmp_path / "repo"),
                base_branch="main",
                worktree_root=str(tmp_path / "wt"),
                created_at="2026-05-16T00:00:00+00:00",
                updated_at="2026-05-16T00:00:00+00:00",
            )
        )
        await s.commit()

    with pytest.raises(MyDeepAgentError) as exc:
        await engine.resume(run_id)
    assert exc.value.code == "run_metadata_missing"


# ---------------------------------------------------------------------------
# Scenario 5: workflow_templates.definition is corrupt
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_resume_corrupt_template_definition_raises(
    tmp_path: Path,
    personas: list[Any],
    registry: ArtifactSchemaRegistry,
    consent_store: PersonaConsentStore,
    available_backends: BackendAvailability,
    db: Database,
) -> None:
    """A run whose workflow_templates.definition is malformed → fatal."""
    engine = _make_engine(db, tmp_path, personas, registry, consent_store, available_backends)

    run_id = uuid4()
    tpl_id = uuid4()
    persona_id = uuid4()
    async with db.session() as s:
        # Corrupt definition (missing required `roles` / `phases`)
        s.add(
            WorkflowTemplateRow(
                id=str(tpl_id),
                name="corrupt",
                version=1,
                hash="sha:corrupt",
                definition={"name": "corrupt"},
                created_at="2026-05-16T00:00:00+00:00",
            )
        )
        await s.flush()
        s.add(
            RunRow(
                id=str(run_id),
                template_id=str(tpl_id),
                template_hash="sha:corrupt",
                state=RunState.EXECUTING.value,
                repo_path=str(tmp_path / "repo"),
                base_branch="main",
                worktree_root=str(tmp_path / "wt"),
                created_at="2026-05-16T00:00:00+00:00",
                updated_at="2026-05-16T00:00:00+00:00",
            )
        )
        s.add(
            AgentPersonaRow(
                id=str(persona_id),
                name="openrouter-deepseek-spec-writer",
                version=1,
                hash="sha:p",
                definition=personas[0].model_dump(by_alias=True),
                created_at="2026-05-16T00:00:00+00:00",
            )
        )
        # Flush RunRow + AgentPersonaRow before adding RunBindingRow so the FKs
        # are satisfied at INSERT time (SQLite with PRAGMA foreign_keys=ON).
        await s.flush()
        s.add(
            RunBindingRow(
                id=str(uuid4()),
                run_id=str(run_id),
                role_id="spec_writer",
                persona_id=str(persona_id),
                persona_hash="sha:p",
                backend="openrouter",
                binding_hash="sha:b",
            )
        )
        await s.commit()

    with pytest.raises(MyDeepAgentError) as exc:
        await engine.resume(run_id)
    assert exc.value.code == "template_load_failed"