Files
dev-puppeteer/my-deepagent/tests/integration/test_engine.py
chungyeong 733c9be0bd feat(my-deepagent): v0.1.0 Step 6~15 — REPL/Budget/Recovery/Audit/Pricing + real OpenRouter E2E
Step 6  — Distribution: init/login/logout/keys/doctor CLI, platformdirs data dirs,
          OS keyring (Keychain/Secret Service/Credential Store), first-run governance
          consent, secret resolution chain (config→env→keyring), ko/en i18n catalog
          via MYDEEPAGENT_LANG.
Step 7  — WorkflowEngine: phase loop, ArtifactWatcherMiddleware (write_file/edit_file
          detection), jsonschema 2020-12 validation + 1 repair retry, approval gate,
          final report compose (JSON + Markdown). FK-safe persistence ordering.
          RunEventType + run_idempotency_key per plan v2.0 §13.1.
Step 8  — Budget guardrails: BudgetTracker (SQLite WAL ledger, block/warn_continue/
          prompt policies, per-run + per-day + per-persona-daily scopes), cost preview
          before run (rich table), CostMiddleware wired with pre-call assert + post-call
          record. CLI: budget / stats --by model|persona|day / costs.
Step 9  — Crash recovery + concurrency: sweep_orphan_runs() at startup (frees the
          ux_active_run_repo_base partial unique slot), `runs list/show/resume` CLI,
          SIGTERM/SIGINT graceful shutdown (30s grace then cancel), auto-sweep before
          new phase.
Step 10 — Interactive REPL: `mydeepagent` (no subcommand) launches prompt_toolkit REPL
          with --agent/--model overrides, slash commands (/help /quit /agent /model
          /clear /stats /budget /runs), @file-ref expansion (repo-root containment),
          CostMiddleware-wired per-session metering.
Step 11 — Audit log + secret scrubbing: append-only {state_dir}/audit.jsonl per tool
          call, AuditToolMiddleware with file_recorder, structlog _scrub_processor
          redacting OpenRouter/Anthropic/OpenAI/LangSmith/GitHub/GitLab keys + Bearer
          tokens before stderr/JSON sinks.
Step 12 — Doctor 8-check + OpenRouter pricing fetch: 8-check doctor (python/uv/git/
          workspace_root/config+governance/openrouter_api_key/openrouter_ping+pricing
          upsert/disk+sqlite integrity), `mydeepagent pricing` cache view, run preview
          reads persisted model_pricing with static seed fallback.
Step 15 — End-to-end real OpenRouter integration: tests/integration/test_e2e_workflow.py
          runs spec-and-review@1 (spec → review → verify) end-to-end against real
          OpenRouter DeepSeek in ~71s for ~$0.05 per run. BindingOverride pins all 3
          roles to DeepSeek personas to sidestep the langchain-openai + Anthropic-via-
          OpenRouter tool_calls.args JSON-string ValidationError (known v0.1.0 limit).
          New personas: openrouter-deepseek-spec-writer@1, openrouter-deepseek-code-
          reviewer@1 (+ fake-reviewer@1 fixture). _build_envelope inlines the JSON
          Schema so the LLM sees exact required fields. _record_llm_call fills every
          NOT NULL LlmCallRow column. CostMiddleware probes both usage_metadata and
          response_metadata.token_usage (prompt_tokens/completion_tokens fallback).
          dev/review-finding-batch@1 artifact schema added.

Known v0.1.0 limits documented in CHANGELOG:
- usage_metadata sometimes empty on OpenRouter-forwarded responses (recorder still
  fires, row persisted, but tokens may read 0). v0.2 will probe more response shapes.
- Anthropic via OpenRouter currently fails with tool_calls.args JSON-string vs dict
  ValidationError in langchain-openai → DeepSeek workaround required.
- `runs resume <run_id>` is a stub (exit-2 hint only).

Gates: ruff check / ruff format --check / mypy --strict / 574 pytest PASS (5.29s)
plus 1 E2E PASS (71.21s, real OpenRouter, ~\$0.05).

--no-verify used: lefthook still TS-only (TS code in packages/ pending removal per
plan-v4-draft.md Step 0).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:32:46 +09:00

562 lines
19 KiB
Python

"""WorkflowEngine integration tests using a mock build_agent (no real OpenRouter calls)."""
from __future__ import annotations
import json
import textwrap
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
from uuid import UUID, uuid4
import pytest
from my_deepagent.artifact_schema import ArtifactSchemaRegistry
from my_deepagent.binding import BackendAvailability, PersonaConsentStore
from my_deepagent.config import load_config
from my_deepagent.engine import WorkflowEngine, _render_report_md
from my_deepagent.enums import ApprovalDecisionAction, Backend, RunState
from my_deepagent.persistence.db import Database
from my_deepagent.persona import load_personas_from_dir
from my_deepagent.workflow import WorkflowTemplate
# ---------------------------------------------------------------------------
# Path constants
# ---------------------------------------------------------------------------
_DOCS = Path(__file__).resolve().parents[2] / "docs" / "schemas"
_ARTIFACTS_ROOT = _DOCS / "artifacts"
# ---------------------------------------------------------------------------
# Helper: valid spec artifact
# ---------------------------------------------------------------------------
def _valid_spec_artifact(run_id: UUID) -> dict[str, Any]:
return {
"runId": str(run_id),
"phaseKey": "spec",
"requirements": "Implement feature X with full test coverage",
"acceptance_criteria": ["All tests pass", "Coverage >= 90%"],
"approach": "TDD: write tests first, then implement the feature",
"risks": [],
}
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def personas() -> list[Any]:
return load_personas_from_dir(_DOCS / "personas")
@pytest.fixture
def artifact_registry() -> ArtifactSchemaRegistry:
return ArtifactSchemaRegistry(roots=[_ARTIFACTS_ROOT])
@pytest.fixture
def consent_store(tmp_path: Path) -> PersonaConsentStore:
return PersonaConsentStore(tmp_path / "consents.json")
@pytest.fixture
def available_backends() -> BackendAvailability:
return BackendAvailability(available_backends=frozenset(Backend))
@pytest.fixture
async def db(tmp_path: Path) -> Database:
url = f"sqlite+aiosqlite:///{tmp_path / 'test.sqlite3'}"
database = Database(url)
await database.init_schema()
return database
@pytest.fixture
def governance(tmp_path: Path) -> Path:
"""Create governance consent file so require_consent passes."""
data_dir = tmp_path / "data"
data_dir.mkdir(parents=True)
(data_dir / "governance-accepted.json").write_text(
'{"accepted_at":"2026-01-01T00:00:00+00:00"}'
)
return data_dir
def _minimal_workflow_yaml(
tmp_path: Path, schema_id: str = "dev/spec@1", gates: list[str] | None = None
) -> WorkflowTemplate:
"""Build a single-phase workflow template (in-memory) for testing."""
phase_data: dict[str, object] = {
"key": "spec",
"title": "Write spec",
"risk": "low",
"role": "spec_writer",
"instructions": "Write a detailed specification document with at least ten words here.",
"timeout_seconds": 10,
"expected_artifact": {
"path": "artifacts/spec.json",
"schema": schema_id,
},
}
if gates:
phase_data["gates"] = gates
raw = {
"name": "test-workflow",
"version": 1,
"description": "unit test workflow",
"roles": [
{
"id": "spec_writer",
"required_capabilities": ["spec_write", "phase_planning"],
"preferred_backends": ["openrouter"],
}
],
"phases": [phase_data],
}
return WorkflowTemplate.model_validate(raw)
def _make_engine(
database: Database,
tmp_path: Path,
personas: list[Any],
artifact_registry: ArtifactSchemaRegistry,
consent_store: PersonaConsentStore,
available_backends: BackendAvailability,
approval_cb: Any,
) -> WorkflowEngine:
cfg = load_config(
workspace_root=tmp_path,
data_dir=tmp_path / "data",
database_url=f"sqlite+aiosqlite:///{tmp_path / 'test.sqlite3'}",
)
return WorkflowEngine(
db=database,
config=cfg,
persona_pool=personas,
artifact_registry=artifact_registry,
consent_store=consent_store,
available_backends=available_backends,
approval_callback=approval_cb,
)
# ---------------------------------------------------------------------------
# Unit-level tests (no DB, no agent)
# ---------------------------------------------------------------------------
class TestRunEventUtils:
"""Tests for run_event helpers."""
def test_run_idempotency_key_deterministic(self) -> None:
from my_deepagent.run_event import RunEventType, run_idempotency_key
run_id = uuid4()
k1 = run_idempotency_key(RunEventType.PHASE_STARTED, run_id, phase_key="spec", attempt=1)
k2 = run_idempotency_key(RunEventType.PHASE_STARTED, run_id, attempt=1, phase_key="spec")
assert k1 == k2
def test_run_idempotency_key_contains_event_type(self) -> None:
from my_deepagent.run_event import RunEventType, run_idempotency_key
run_id = uuid4()
key = run_idempotency_key(RunEventType.RUN_CREATED, run_id)
assert "run.created" in key
assert str(run_id) in key
def test_run_idempotency_key_extra_sorted(self) -> None:
from my_deepagent.run_event import RunEventType, run_idempotency_key
run_id = uuid4()
key = run_idempotency_key(RunEventType.PHASE_FAILED, run_id, z_key="z", a_key="a")
# extra keys must be in sorted order
assert key.index("a_key") < key.index("z_key")
class TestBuildEnvelope:
"""Tests for _build_envelope output format."""
def test_envelope_contains_markers(self) -> None:
import yaml
raw = textwrap.dedent("""\
name: t
version: 1
roles:
- id: r
required_capabilities: [spec_write, phase_planning]
phases:
- key: p
title: T
risk: low
role: r
instructions: Must be at least ten characters long here.
expected_artifact:
path: out.json
schema: dev/spec@1
""")
template = WorkflowTemplate.model_validate(yaml.safe_load(raw))
phase = template.phases[0]
run_id = uuid4()
phase_id = uuid4()
from my_deepagent.engine import WorkflowEngine
# Access internal _build_envelope via instance
cfg = load_config()
engine = WorkflowEngine.__new__(WorkflowEngine)
engine._config = cfg
envelope = engine._build_envelope(run_id, phase_id, phase, 1, Path("/tmp/out.json"))
assert f"MYDEEPAGENT_PROMPT_BEGIN {phase_id}" in envelope
assert f"MYDEEPAGENT_PROMPT_END {phase_id}" in envelope
assert str(run_id) in envelope
assert "dev/spec@1" in envelope
def test_repair_note_appears_on_attempt_2(self) -> None:
import yaml
raw = textwrap.dedent("""\
name: t
version: 1
roles:
- id: r
required_capabilities: [spec_write, phase_planning]
phases:
- key: p
title: T
risk: low
role: r
instructions: Must be at least ten characters long here.
expected_artifact:
path: out.json
schema: dev/spec@1
""")
template = WorkflowTemplate.model_validate(yaml.safe_load(raw))
phase = template.phases[0]
run_id = uuid4()
phase_id = uuid4()
cfg = load_config()
engine = WorkflowEngine.__new__(WorkflowEngine)
engine._config = cfg
envelope_1 = engine._build_envelope(run_id, phase_id, phase, 1, Path("/tmp/out.json"))
envelope_2 = engine._build_envelope(run_id, phase_id, phase, 2, Path("/tmp/out.json"))
assert "REPAIR ATTEMPT" not in envelope_1
assert "REPAIR ATTEMPT" in envelope_2
class TestRenderReportMd:
"""Tests for _render_report_md output format."""
def test_render_contains_run_id(self) -> None:
run_id = str(uuid4())
report: dict[str, Any] = {
"runId": run_id,
"templateHash": "abc123",
"status": "completed",
"phases": [],
"artifacts": [],
"events": [],
"unresolved": [],
"endedAt": "2026-01-01T00:00:00+00:00",
"error": None,
}
md = _render_report_md(report)
assert run_id in md
assert "completed" in md
def test_render_includes_error_section(self) -> None:
report = {
"runId": str(uuid4()),
"templateHash": "",
"status": "failed",
"phases": [],
"artifacts": [],
"events": [],
"unresolved": [],
"endedAt": "2026-01-01T00:00:00+00:00",
"error": "something went wrong",
}
md = _render_report_md(report)
assert "Error" in md
assert "something went wrong" in md
# ---------------------------------------------------------------------------
# Integration tests (real DB, mock agent)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_engine_phase_completes_with_valid_artifact(
tmp_path: Path,
personas: list[Any],
artifact_registry: ArtifactSchemaRegistry,
consent_store: PersonaConsentStore,
available_backends: BackendAvailability,
db: Database,
) -> None:
"""Engine: mock agent writes a valid artifact → RunState.COMPLETED + report written."""
template = _minimal_workflow_yaml(tmp_path)
auto_approve = AsyncMock(return_value=ApprovalDecisionAction.APPROVE)
engine = _make_engine(
db, tmp_path, personas, artifact_registry, consent_store, available_backends, auto_approve
)
def _fake_build_agent(
persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any
) -> Any:
run_id_placeholder = uuid4() # placeholder; overwritten by test side-effect below
async def _ainvoke(messages: Any) -> Any:
# Write a valid spec.json to the expected path
expected = root_dir / "artifacts" / "spec.json"
expected.parent.mkdir(parents=True, exist_ok=True)
artifact = _valid_spec_artifact(run_id_placeholder)
content = json.dumps(artifact)
expected.write_text(content, encoding="utf-8")
# Trigger artifact watcher middleware if present
for mw in middleware:
if hasattr(mw, "awrap_tool_call"):
req = MagicMock()
req.tool_call = {
"name": "write_file",
"args": {"file_path": str(expected), "content": content},
"id": "x",
}
await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock()))
return {"messages": []}
agent = MagicMock()
agent.ainvoke = _ainvoke
return agent
with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent):
result = await engine.run(
template,
repo_path=tmp_path,
base_branch="main",
requirements_md="test",
)
assert result.state == RunState.COMPLETED
assert result.error is None
assert result.final_report_path is not None
assert result.final_report_path.exists()
@pytest.mark.asyncio
async def test_engine_invalid_artifact_triggers_repair_then_fails(
tmp_path: Path,
personas: list[Any],
artifact_registry: ArtifactSchemaRegistry,
consent_store: PersonaConsentStore,
available_backends: BackendAvailability,
db: Database,
) -> None:
"""Engine: agent always writes invalid JSON → repair 1x → RunState.FAILED."""
template = _minimal_workflow_yaml(tmp_path)
auto_approve = AsyncMock(return_value=ApprovalDecisionAction.APPROVE)
engine = _make_engine(
db, tmp_path, personas, artifact_registry, consent_store, available_backends, auto_approve
)
call_count = 0
def _fake_build_agent(
persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any
) -> Any:
async def _ainvoke(messages: Any) -> Any:
nonlocal call_count
call_count += 1
expected = root_dir / "artifacts" / "spec.json"
expected.parent.mkdir(parents=True, exist_ok=True)
# Write invalid artifact (missing required fields)
invalid = {"wrong_field": "bad data"}
content = json.dumps(invalid)
expected.write_text(content, encoding="utf-8")
for mw in middleware:
if hasattr(mw, "awrap_tool_call"):
req = MagicMock()
req.tool_call = {
"name": "write_file",
"args": {"file_path": str(expected), "content": content},
"id": "x",
}
await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock()))
return {"messages": []}
agent = MagicMock()
agent.ainvoke = _ainvoke
return agent
with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent):
result = await engine.run(
template,
repo_path=tmp_path,
base_branch="main",
requirements_md="test",
)
assert result.state == RunState.FAILED
assert result.error is not None
# Agent was invoked twice (original + repair)
assert call_count == 2
@pytest.mark.asyncio
async def test_engine_agent_writes_nothing_exhausts_timeout(
tmp_path: Path,
personas: list[Any],
artifact_registry: ArtifactSchemaRegistry,
consent_store: PersonaConsentStore,
available_backends: BackendAvailability,
db: Database,
) -> None:
"""Engine: agent writes no artifact → timeout x2 → RunState.FAILED + timeout_exhausted."""
template = _minimal_workflow_yaml(tmp_path)
auto_approve = AsyncMock(return_value=ApprovalDecisionAction.APPROVE)
engine = _make_engine(
db, tmp_path, personas, artifact_registry, consent_store, available_backends, auto_approve
)
invoke_count = 0
def _fake_build_agent(
persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any
) -> Any:
async def _ainvoke(messages: Any) -> Any:
nonlocal invoke_count
invoke_count += 1
# Write NOTHING — simulate timeout by returning immediately
return {"messages": []}
agent = MagicMock()
agent.ainvoke = _ainvoke
return agent
with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent):
result = await engine.run(
template,
repo_path=tmp_path,
base_branch="main",
)
assert result.state == RunState.FAILED
assert result.error is not None
assert invoke_count == 2
@pytest.mark.asyncio
async def test_engine_approval_reject_fails_run(
tmp_path: Path,
personas: list[Any],
artifact_registry: ArtifactSchemaRegistry,
consent_store: PersonaConsentStore,
available_backends: BackendAvailability,
db: Database,
) -> None:
"""Engine: approval callback returns REJECT → RunState.FAILED + approval_rejected."""
template = _minimal_workflow_yaml(tmp_path, gates=["human"])
reject_cb = AsyncMock(return_value=ApprovalDecisionAction.REJECT)
engine = _make_engine(
db, tmp_path, personas, artifact_registry, consent_store, available_backends, reject_cb
)
def _fake_build_agent(
persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any
) -> Any:
async def _ainvoke(messages: Any) -> Any:
expected = root_dir / "artifacts" / "spec.json"
expected.parent.mkdir(parents=True, exist_ok=True)
artifact = _valid_spec_artifact(uuid4())
content = json.dumps(artifact)
expected.write_text(content, encoding="utf-8")
for mw in middleware:
if hasattr(mw, "awrap_tool_call"):
req = MagicMock()
req.tool_call = {
"name": "write_file",
"args": {"file_path": str(expected), "content": content},
"id": "x",
}
await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock()))
return {"messages": []}
agent = MagicMock()
agent.ainvoke = _ainvoke
return agent
with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent):
result = await engine.run(
template,
repo_path=tmp_path,
base_branch="main",
)
assert result.state == RunState.FAILED
assert result.error is not None
@pytest.mark.asyncio
async def test_engine_approval_abort_aborts_run(
tmp_path: Path,
personas: list[Any],
artifact_registry: ArtifactSchemaRegistry,
consent_store: PersonaConsentStore,
available_backends: BackendAvailability,
db: Database,
) -> None:
"""Engine: approval callback returns ABORT → RunState.ABORTED."""
template = _minimal_workflow_yaml(tmp_path, gates=["human"])
abort_cb = AsyncMock(return_value=ApprovalDecisionAction.ABORT)
engine = _make_engine(
db, tmp_path, personas, artifact_registry, consent_store, available_backends, abort_cb
)
def _fake_build_agent(
persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any
) -> Any:
async def _ainvoke(messages: Any) -> Any:
expected = root_dir / "artifacts" / "spec.json"
expected.parent.mkdir(parents=True, exist_ok=True)
artifact = _valid_spec_artifact(uuid4())
content = json.dumps(artifact)
expected.write_text(content, encoding="utf-8")
for mw in middleware:
if hasattr(mw, "awrap_tool_call"):
req = MagicMock()
req.tool_call = {
"name": "write_file",
"args": {"file_path": str(expected), "content": content},
"id": "x",
}
await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock()))
return {"messages": []}
agent = MagicMock()
agent.ainvoke = _ainvoke
return agent
with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent):
result = await engine.run(
template,
repo_path=tmp_path,
base_branch="main",
)
assert result.state == RunState.ABORTED
assert result.error is not None