"""WorkflowEngine integration tests using a mock build_agent (no real OpenRouter calls).""" from __future__ import annotations import json import textwrap from pathlib import Path from typing import Any from unittest.mock import AsyncMock, MagicMock, patch from uuid import UUID, uuid4 import pytest from my_deepagent.artifact_schema import ArtifactSchemaRegistry from my_deepagent.binding import BackendAvailability, PersonaConsentStore from my_deepagent.config import load_config from my_deepagent.engine import WorkflowEngine, _render_report_md from my_deepagent.enums import ApprovalDecisionAction, Backend, RunState from my_deepagent.persistence.db import Database from my_deepagent.persona import load_personas_from_dir from my_deepagent.workflow import WorkflowTemplate # --------------------------------------------------------------------------- # Path constants # --------------------------------------------------------------------------- _DOCS = Path(__file__).resolve().parents[2] / "docs" / "schemas" _ARTIFACTS_ROOT = _DOCS / "artifacts" # --------------------------------------------------------------------------- # Helper: valid spec artifact # --------------------------------------------------------------------------- def _valid_spec_artifact(run_id: UUID) -> dict[str, Any]: return { "runId": str(run_id), "phaseKey": "spec", "requirements": "Implement feature X with full test coverage", "acceptance_criteria": ["All tests pass", "Coverage >= 90%"], "approach": "TDD: write tests first, then implement the feature", "risks": [], } # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def personas() -> list[Any]: return load_personas_from_dir(_DOCS / "personas") @pytest.fixture def artifact_registry() -> ArtifactSchemaRegistry: return ArtifactSchemaRegistry(roots=[_ARTIFACTS_ROOT]) @pytest.fixture def consent_store(tmp_path: Path) -> PersonaConsentStore: return PersonaConsentStore(tmp_path / "consents.json") @pytest.fixture def available_backends() -> BackendAvailability: return BackendAvailability(available_backends=frozenset(Backend)) @pytest.fixture async def db(tmp_path: Path) -> Database: url = f"sqlite+aiosqlite:///{tmp_path / 'test.sqlite3'}" database = Database(url) await database.init_schema() return database @pytest.fixture def governance(tmp_path: Path) -> Path: """Create governance consent file so require_consent passes.""" data_dir = tmp_path / "data" data_dir.mkdir(parents=True) (data_dir / "governance-accepted.json").write_text( '{"accepted_at":"2026-01-01T00:00:00+00:00"}' ) return data_dir def _minimal_workflow_yaml( tmp_path: Path, schema_id: str = "dev/spec@1", gates: list[str] | None = None ) -> WorkflowTemplate: """Build a single-phase workflow template (in-memory) for testing.""" phase_data: dict[str, object] = { "key": "spec", "title": "Write spec", "risk": "low", "role": "spec_writer", "instructions": "Write a detailed specification document with at least ten words here.", "timeout_seconds": 10, "expected_artifact": { "path": "artifacts/spec.json", "schema": schema_id, }, } if gates: phase_data["gates"] = gates raw = { "name": "test-workflow", "version": 1, "description": "unit test workflow", "roles": [ { "id": "spec_writer", "required_capabilities": ["spec_write", "phase_planning"], "preferred_backends": ["openrouter"], } ], "phases": [phase_data], } return WorkflowTemplate.model_validate(raw) def _make_engine( database: Database, tmp_path: Path, personas: list[Any], artifact_registry: ArtifactSchemaRegistry, consent_store: PersonaConsentStore, available_backends: BackendAvailability, approval_cb: Any, ) -> WorkflowEngine: cfg = load_config( workspace_root=tmp_path, data_dir=tmp_path / "data", database_url=f"sqlite+aiosqlite:///{tmp_path / 'test.sqlite3'}", ) return WorkflowEngine( db=database, config=cfg, persona_pool=personas, artifact_registry=artifact_registry, consent_store=consent_store, available_backends=available_backends, approval_callback=approval_cb, ) # --------------------------------------------------------------------------- # Unit-level tests (no DB, no agent) # --------------------------------------------------------------------------- class TestRunEventUtils: """Tests for run_event helpers.""" def test_run_idempotency_key_deterministic(self) -> None: from my_deepagent.run_event import RunEventType, run_idempotency_key run_id = uuid4() k1 = run_idempotency_key(RunEventType.PHASE_STARTED, run_id, phase_key="spec", attempt=1) k2 = run_idempotency_key(RunEventType.PHASE_STARTED, run_id, attempt=1, phase_key="spec") assert k1 == k2 def test_run_idempotency_key_contains_event_type(self) -> None: from my_deepagent.run_event import RunEventType, run_idempotency_key run_id = uuid4() key = run_idempotency_key(RunEventType.RUN_CREATED, run_id) assert "run.created" in key assert str(run_id) in key def test_run_idempotency_key_extra_sorted(self) -> None: from my_deepagent.run_event import RunEventType, run_idempotency_key run_id = uuid4() key = run_idempotency_key(RunEventType.PHASE_FAILED, run_id, z_key="z", a_key="a") # extra keys must be in sorted order assert key.index("a_key") < key.index("z_key") class TestBuildEnvelope: """Tests for _build_envelope output format.""" def test_envelope_contains_markers(self) -> None: import yaml raw = textwrap.dedent("""\ name: t version: 1 roles: - id: r required_capabilities: [spec_write, phase_planning] phases: - key: p title: T risk: low role: r instructions: Must be at least ten characters long here. expected_artifact: path: out.json schema: dev/spec@1 """) template = WorkflowTemplate.model_validate(yaml.safe_load(raw)) phase = template.phases[0] run_id = uuid4() phase_id = uuid4() from my_deepagent.engine import WorkflowEngine # Access internal _build_envelope via instance cfg = load_config() engine = WorkflowEngine.__new__(WorkflowEngine) engine._config = cfg envelope = engine._build_envelope(run_id, phase_id, phase, 1, Path("/tmp/out.json")) assert f"MYDEEPAGENT_PROMPT_BEGIN {phase_id}" in envelope assert f"MYDEEPAGENT_PROMPT_END {phase_id}" in envelope assert str(run_id) in envelope assert "dev/spec@1" in envelope def test_repair_note_appears_on_attempt_2(self) -> None: import yaml raw = textwrap.dedent("""\ name: t version: 1 roles: - id: r required_capabilities: [spec_write, phase_planning] phases: - key: p title: T risk: low role: r instructions: Must be at least ten characters long here. expected_artifact: path: out.json schema: dev/spec@1 """) template = WorkflowTemplate.model_validate(yaml.safe_load(raw)) phase = template.phases[0] run_id = uuid4() phase_id = uuid4() cfg = load_config() engine = WorkflowEngine.__new__(WorkflowEngine) engine._config = cfg envelope_1 = engine._build_envelope(run_id, phase_id, phase, 1, Path("/tmp/out.json")) envelope_2 = engine._build_envelope(run_id, phase_id, phase, 2, Path("/tmp/out.json")) assert "REPAIR ATTEMPT" not in envelope_1 assert "REPAIR ATTEMPT" in envelope_2 class TestRenderReportMd: """Tests for _render_report_md output format.""" def test_render_contains_run_id(self) -> None: run_id = str(uuid4()) report: dict[str, Any] = { "runId": run_id, "templateHash": "abc123", "status": "completed", "phases": [], "artifacts": [], "events": [], "unresolved": [], "endedAt": "2026-01-01T00:00:00+00:00", "error": None, } md = _render_report_md(report) assert run_id in md assert "completed" in md def test_render_includes_error_section(self) -> None: report = { "runId": str(uuid4()), "templateHash": "", "status": "failed", "phases": [], "artifacts": [], "events": [], "unresolved": [], "endedAt": "2026-01-01T00:00:00+00:00", "error": "something went wrong", } md = _render_report_md(report) assert "Error" in md assert "something went wrong" in md # --------------------------------------------------------------------------- # Integration tests (real DB, mock agent) # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_engine_phase_completes_with_valid_artifact( tmp_path: Path, personas: list[Any], artifact_registry: ArtifactSchemaRegistry, consent_store: PersonaConsentStore, available_backends: BackendAvailability, db: Database, ) -> None: """Engine: mock agent writes a valid artifact → RunState.COMPLETED + report written.""" template = _minimal_workflow_yaml(tmp_path) auto_approve = AsyncMock(return_value=ApprovalDecisionAction.APPROVE) engine = _make_engine( db, tmp_path, personas, artifact_registry, consent_store, available_backends, auto_approve ) def _fake_build_agent( persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any ) -> Any: run_id_placeholder = uuid4() # placeholder; overwritten by test side-effect below async def _ainvoke(messages: Any, **_kwargs: Any) -> Any: # Write a valid spec.json to the expected path expected = root_dir / "artifacts" / "spec.json" expected.parent.mkdir(parents=True, exist_ok=True) artifact = _valid_spec_artifact(run_id_placeholder) content = json.dumps(artifact) expected.write_text(content, encoding="utf-8") # Trigger artifact watcher middleware if present for mw in middleware: if hasattr(mw, "awrap_tool_call"): req = MagicMock() req.tool_call = { "name": "write_file", "args": {"file_path": str(expected), "content": content}, "id": "x", } await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock())) return {"messages": []} agent = MagicMock() agent.ainvoke = _ainvoke return agent with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent): result = await engine.run( template, repo_path=tmp_path, base_branch="main", requirements_md="test", ) assert result.state == RunState.COMPLETED assert result.error is None assert result.final_report_path is not None assert result.final_report_path.exists() @pytest.mark.asyncio async def test_engine_invalid_artifact_triggers_repair_then_fails( tmp_path: Path, personas: list[Any], artifact_registry: ArtifactSchemaRegistry, consent_store: PersonaConsentStore, available_backends: BackendAvailability, db: Database, ) -> None: """Engine: agent always writes invalid JSON → repair 1x → RunState.FAILED.""" template = _minimal_workflow_yaml(tmp_path) auto_approve = AsyncMock(return_value=ApprovalDecisionAction.APPROVE) engine = _make_engine( db, tmp_path, personas, artifact_registry, consent_store, available_backends, auto_approve ) call_count = 0 def _fake_build_agent( persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any ) -> Any: async def _ainvoke(messages: Any, **_kwargs: Any) -> Any: nonlocal call_count call_count += 1 expected = root_dir / "artifacts" / "spec.json" expected.parent.mkdir(parents=True, exist_ok=True) # Write invalid artifact (missing required fields) invalid = {"wrong_field": "bad data"} content = json.dumps(invalid) expected.write_text(content, encoding="utf-8") for mw in middleware: if hasattr(mw, "awrap_tool_call"): req = MagicMock() req.tool_call = { "name": "write_file", "args": {"file_path": str(expected), "content": content}, "id": "x", } await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock())) return {"messages": []} agent = MagicMock() agent.ainvoke = _ainvoke return agent with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent): result = await engine.run( template, repo_path=tmp_path, base_branch="main", requirements_md="test", ) assert result.state == RunState.FAILED assert result.error is not None # Agent was invoked twice (original + repair) assert call_count == 2 @pytest.mark.asyncio async def test_engine_agent_writes_nothing_exhausts_timeout( tmp_path: Path, personas: list[Any], artifact_registry: ArtifactSchemaRegistry, consent_store: PersonaConsentStore, available_backends: BackendAvailability, db: Database, ) -> None: """Engine: agent writes no artifact → timeout x2 → RunState.FAILED + timeout_exhausted.""" template = _minimal_workflow_yaml(tmp_path) auto_approve = AsyncMock(return_value=ApprovalDecisionAction.APPROVE) engine = _make_engine( db, tmp_path, personas, artifact_registry, consent_store, available_backends, auto_approve ) invoke_count = 0 def _fake_build_agent( persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any ) -> Any: async def _ainvoke(messages: Any, **_kwargs: Any) -> Any: nonlocal invoke_count invoke_count += 1 # Write NOTHING — simulate timeout by returning immediately return {"messages": []} agent = MagicMock() agent.ainvoke = _ainvoke return agent with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent): result = await engine.run( template, repo_path=tmp_path, base_branch="main", ) assert result.state == RunState.FAILED assert result.error is not None assert invoke_count == 2 @pytest.mark.asyncio async def test_engine_approval_reject_fails_run( tmp_path: Path, personas: list[Any], artifact_registry: ArtifactSchemaRegistry, consent_store: PersonaConsentStore, available_backends: BackendAvailability, db: Database, ) -> None: """Engine: approval callback returns REJECT → RunState.FAILED + approval_rejected.""" template = _minimal_workflow_yaml(tmp_path, gates=["human"]) reject_cb = AsyncMock(return_value=ApprovalDecisionAction.REJECT) engine = _make_engine( db, tmp_path, personas, artifact_registry, consent_store, available_backends, reject_cb ) def _fake_build_agent( persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any ) -> Any: async def _ainvoke(messages: Any, **_kwargs: Any) -> Any: expected = root_dir / "artifacts" / "spec.json" expected.parent.mkdir(parents=True, exist_ok=True) artifact = _valid_spec_artifact(uuid4()) content = json.dumps(artifact) expected.write_text(content, encoding="utf-8") for mw in middleware: if hasattr(mw, "awrap_tool_call"): req = MagicMock() req.tool_call = { "name": "write_file", "args": {"file_path": str(expected), "content": content}, "id": "x", } await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock())) return {"messages": []} agent = MagicMock() agent.ainvoke = _ainvoke return agent with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent): result = await engine.run( template, repo_path=tmp_path, base_branch="main", ) assert result.state == RunState.FAILED assert result.error is not None @pytest.mark.asyncio async def test_engine_approval_abort_aborts_run( tmp_path: Path, personas: list[Any], artifact_registry: ArtifactSchemaRegistry, consent_store: PersonaConsentStore, available_backends: BackendAvailability, db: Database, ) -> None: """Engine: approval callback returns ABORT → RunState.ABORTED.""" template = _minimal_workflow_yaml(tmp_path, gates=["human"]) abort_cb = AsyncMock(return_value=ApprovalDecisionAction.ABORT) engine = _make_engine( db, tmp_path, personas, artifact_registry, consent_store, available_backends, abort_cb ) def _fake_build_agent( persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any ) -> Any: async def _ainvoke(messages: Any, **_kwargs: Any) -> Any: expected = root_dir / "artifacts" / "spec.json" expected.parent.mkdir(parents=True, exist_ok=True) artifact = _valid_spec_artifact(uuid4()) content = json.dumps(artifact) expected.write_text(content, encoding="utf-8") for mw in middleware: if hasattr(mw, "awrap_tool_call"): req = MagicMock() req.tool_call = { "name": "write_file", "args": {"file_path": str(expected), "content": content}, "id": "x", } await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock())) return {"messages": []} agent = MagicMock() agent.ainvoke = _ainvoke return agent with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent): result = await engine.run( template, repo_path=tmp_path, base_branch="main", ) assert result.state == RunState.ABORTED assert result.error is not None