dev-puppeteer/my-deepagent/tests/integration/test_e2e_workflow.py
chungyeong e21a5241bf feat(my-deepagent): v0.2 PR #1 — Postgres migration (ahead of M8-Py FastAPI)
Switches the production backing store from SQLite to PostgreSQL 16, per DR-2.
The trigger for the migration is two concurrent writers on the my-deepagent ORM
tables, which first appear with FastAPI (M8-Py). Cutting over now keeps the
surface area small while M8-Py is still in planning.

Production deps: `asyncpg`, `psycopg[binary]`, `langgraph-checkpoint-postgres`.
Test deps: `aiosqlite` (the bulk of unit + integration tests stay on sqlite
tmp_path for speed; the E2E suite and the new checkpointer tests exercise
the live Postgres path).

Highlights
- `persistence/db.py`: dialect-aware connect listener. SQLite still gets
  WAL + busy_timeout=5000 + foreign_keys=ON; Postgres gets `SET TIME ZONE 'UTC'`.
  Added `Database.dialect_name` + `drop_schema` (test-only).
- `persistence/checkpointer.py`: SqliteSaver → AsyncPostgresSaver. API is
  now async (`async with`) and takes a connection string. SQLAlchemy URL
  prefixes (`+asyncpg`, `+psycopg`) are auto-stripped to a plain libpq DSN
  (`_to_psycopg_dsn` helper, 4 unit tests).
- `persistence/upsert.py` (new): `insert_for(session)` — dialect-aware UPSERT
  helper. Picks `postgresql.insert` or `sqlite.insert` based on the bound
  engine. Replaces 5 hardcoded `sqlite_insert` call sites in `budget.py`,
  `recovery.py`, `cli/doctor.py`.
- `persistence/models.py`: `RunRow` partial unique index declares both
  `postgresql_where=` and `sqlite_where=` for cross-dialect correctness.
- `config.py`: default `database_url` now
  `postgresql+asyncpg://devflow:devflow@localhost:55432/mydeepagent`. v3
  `devflow` DB preserved untouched; v4 lives in a fresh `mydeepagent` DB.
- `cli/doctor.py` check 8: dialect-aware DB liveness probe. Postgres path
  runs `SELECT 1` (pg_isready equivalent); SQLite keeps `PRAGMA integrity_check`.
- `alembic/env.py`: env-aware URL resolution (`MYDEEPAGENT_DATABASE_URL` >
  `DATABASE_URL` > default). Async driver prefixes are mapped to the sync
  equivalents alembic needs.
- `alembic/versions/9f2a6c79667e_v0_2_baseline_schema_postgres.py` (new):
  fresh baseline autogenerated against live Postgres. Old SQLite migrations
  (`79945fdc2649`, `839f2233e346`) deleted — v0.2 starts a clean history.
- `tests/conftest.py` (new): `pg_db_url` async fixture creates a fresh DB
  per test against docker-compose `devflow-postgres` and drops it on
  teardown after terminating lingering backends.
- `tests/integration/test_checkpointer.py`: rewritten for AsyncPostgresSaver
  (4 pure DSN-converter unit tests + 3 async context-manager integration tests).
- `tests/integration/test_e2e_workflow.py`: switched to `pg_db_url`. Real
  OpenRouter E2E now exercises the production Postgres path end-to-end.
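
The URL handling described for `persistence/checkpointer.py` and `alembic/env.py` can be sketched with the standard library. This is a minimal sketch, not the project's code. `to_libpq_dsn` and `resolve_database_url` are hypothetical stand-ins for `_to_psycopg_dsn` and the alembic env logic. Rejecting non-Postgres schemes and treating empty env values as unset are assumptions of this sketch.

```python
from collections.abc import Mapping

# Default copied from the config.py highlight above.
DEFAULT_DATABASE_URL = "postgresql+asyncpg://devflow:devflow@localhost:55432/mydeepagent"


def to_libpq_dsn(url: str) -> str:
    """Strip a SQLAlchemy driver suffix ("+asyncpg", "+psycopg", ...) from a Postgres URL.

    Hypothetical stand-in for `_to_psycopg_dsn`: libpq only understands the
    bare "postgresql://" / "postgres://" URI schemes.
    """
    scheme, sep, rest = url.partition("://")
    if not sep:
        raise ValueError(f"not a URL: {url!r}")
    dialect = scheme.split("+", 1)[0]  # "postgresql+asyncpg" -> "postgresql"
    if dialect not in ("postgresql", "postgres"):
        # Assumption: non-Postgres URLs are rejected rather than passed through.
        raise ValueError(f"not a Postgres URL: {url!r}")
    return f"{dialect}://{rest}"


def resolve_database_url(environ: Mapping[str, str]) -> str:
    """Precedence: MYDEEPAGENT_DATABASE_URL > DATABASE_URL > built-in default.

    Empty values fall through to the next source (an assumption of this sketch).
    """
    return (
        environ.get("MYDEEPAGENT_DATABASE_URL")
        or environ.get("DATABASE_URL")
        or DEFAULT_DATABASE_URL
    )
```

With neither variable set, `to_libpq_dsn(resolve_database_url(os.environ))` returns `postgresql://devflow:devflow@localhost:55432/mydeepagent`.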

Recovery
- Previous SQLite database at the platformdirs data_dir is NOT auto-migrated;
  v0.1.0 was the only release that wrote to it. Set
  `MYDEEPAGENT_DATABASE_URL=sqlite+aiosqlite:///<path>` to read it.
- The v3 `devflow` Postgres DB is preserved untouched (separate database
  name); to inspect: `psql -h localhost -p 55432 -U devflow -d devflow`.
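
Reading the old SQLite file means pointing `MYDEEPAGENT_DATABASE_URL` at it. SQLAlchemy SQLite URLs put three slashes before the path, so an absolute POSIX path ends up with four in a row. A small sketch follows. `sqlite_aiosqlite_url` is a hypothetical helper, and the exact platformdirs location and file name are not given here, so the path below is made up.

```python
import shlex
from pathlib import Path


def sqlite_aiosqlite_url(db_path: Path) -> str:
    """Build a SQLAlchemy async SQLite URL for an existing file (hypothetical helper)."""
    # "sqlite+aiosqlite:///" followed by an absolute POSIX path such as "/x/y.db"
    # gives four slashes in a row, which is the absolute-path form SQLAlchemy expects.
    return f"sqlite+aiosqlite:///{db_path.expanduser().absolute().as_posix()}"


if __name__ == "__main__":
    # Made-up location; substitute the real file under the platformdirs data_dir.
    legacy_db = Path("~/legacy-mydeepagent/mydeepagent.db")
    # Print a shell line that exports the override for the next CLI invocation.
    print("export MYDEEPAGENT_DATABASE_URL=" + shlex.quote(sqlite_aiosqlite_url(legacy_db)))
```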

Gates
- ruff check + ruff format --check + mypy --strict: PASS (102 source files)
- pytest non-E2E: 576 PASS (5.46 s)
- pytest E2E real OpenRouter on Postgres: 1 PASS (122.93 s, ~$0.05/run)

--no-verify: lefthook still TS-only (deleted in 0e61b2d but still queryable
in git history).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 18:11:19 +09:00


"""End-to-end integration: spec-and-review workflow via real OpenRouter.

Cost budget: ~$0.05 per run. Skipped if no API key is configured.

Verifies:
- Engine creates a RunRow and 3 RunPhaseRow rows
- Each phase writes a schema-valid artifact via deepagents write_file
- Final report json + md are written under worktree_root
- LlmCallRow rows are persisted (CostMiddleware recorder is wired)
- BudgetLedgerRow rows accumulate spend
- run.state == COMPLETED
"""

from __future__ import annotations

import json
import os
import time
from pathlib import Path
from typing import Any

import pytest
from sqlalchemy import select

from my_deepagent.artifact_schema import ArtifactSchemaRegistry
from my_deepagent.binding import (
    BackendAvailability,
    BindingOverride,
    PersonaConsentStore,
)
from my_deepagent.budget import make_budget_tracker_from_config
from my_deepagent.config import load_config
from my_deepagent.engine import WorkflowEngine
from my_deepagent.enums import ApprovalDecisionAction, Backend, RunState
from my_deepagent.monitoring.pricing import ModelPrice, PricingCache
from my_deepagent.persistence.db import Database
from my_deepagent.persistence.models import (
    BudgetLedgerRow,
    LlmCallRow,
    RunPhaseRow,
    RunRow,
)
from my_deepagent.persona import load_personas_from_dir
from my_deepagent.workflow import load_workflow_yaml

# ---------------------------------------------------------------------------
# Skip guard: API key must be present
# ---------------------------------------------------------------------------

_HAS_KEY = (
    bool(os.environ.get("MYDEEPAGENT_OPENROUTER_API_KEY") or os.environ.get("OPENROUTER_API_KEY"))
    or (Path(__file__).resolve().parents[3] / "my-deepagent" / ".env").is_file()
    or Path(".env").is_file()
)

pytestmark = [
    pytest.mark.integration,
    pytest.mark.skipif(not _HAS_KEY, reason="no OpenRouter API key configured"),
]

_SEED_ROOT = Path(__file__).resolve().parents[2] / "docs" / "schemas"


# ---------------------------------------------------------------------------
# Auto-approve callback: bypasses TUI for headless testing
# ---------------------------------------------------------------------------


async def _auto_approve(payload: dict[str, Any], gates: list[str]) -> ApprovalDecisionAction:
    """Test callback: always approve without any TUI interaction."""
    return ApprovalDecisionAction.APPROVE


# ---------------------------------------------------------------------------
# Static pricing cache: covers the models used by the seed personas
# ---------------------------------------------------------------------------


def _make_pricing() -> PricingCache:
    """Return a small static PricingCache covering the models used by the seed personas."""
    cache = PricingCache()
    cache.set(
        [
            # USD per 1,000 tokens
            ModelPrice("anthropic/claude-sonnet-4-6", 0.003, 0.015, 200_000),
            ModelPrice("anthropic/claude-haiku-4-5", 0.001, 0.005, 200_000),
            ModelPrice("anthropic/claude-opus-4-1", 0.015, 0.075, 200_000),
            ModelPrice("deepseek/deepseek-chat", 0.00028, 0.00112, 64_000),
        ]
    )
    return cache


# ---------------------------------------------------------------------------
# E2E test
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
@pytest.mark.timeout(600)  # 10 minute hard limit for slow LLM responses
async def test_e2e_spec_and_review_workflow(tmp_path: Path, pg_db_url: str) -> None:
    """Real OpenRouter call: full spec-and-review@1 workflow end-to-end.

    Persona binding (all pinned via BindingOverride for determinism):
    - spec_writer role → openrouter-deepseek-spec-writer@1
    - reviewer role → openrouter-deepseek-code-reviewer@1
    - verifier role → openrouter-deepseek-verifier@1
    Claude personas are deliberately avoided; the comment above ``override``
    explains why.

    Cost estimate: ~$0.01-$0.05 for 3 phases with max_tokens=4096 each.
    """
    # ---- Setup: config overrides pointing to tmp_path + isolated Postgres DB ----
    # `pg_db_url` is the v0.2-PR-1 conftest fixture that creates a fresh
    # Postgres DB per test (against docker-compose `devflow-postgres`) and
    # drops it on teardown. This is the only test in the suite that exercises
    # the production Postgres path end-to-end; the bulk of unit + integration
    # tests still use sqlite+aiosqlite tmp_path for speed.
    ws_root = tmp_path / "ws"
    ws_root.mkdir(parents=True, exist_ok=True)
    config = load_config(
        workspace_root=ws_root,
        data_dir=tmp_path / "data",
        state_dir=tmp_path / "state",
        database_url=pg_db_url,
        budget_on_hit="warn_continue",  # do not block during E2E test
        budget_run_usd=5.0,  # generous cap for E2E
        budget_daily_usd=10.0,
        budget_daily_warn_usd=5.0,
        budget_run_warn_usd=2.0,
    )

    # ---- Load seed assets ----
    template = load_workflow_yaml(_SEED_ROOT / "workflows" / "spec-and-review@1.yaml")
    personas = load_personas_from_dir(_SEED_ROOT / "personas")
    registry = ArtifactSchemaRegistry(roots=[_SEED_ROOT / "artifacts"])

    # ---- Infrastructure ----
    db = Database(config.database_url)
    await db.init_schema()
    pricing = _make_pricing()
    consent_store = PersonaConsentStore(tmp_path / "consents.json")
    backends = BackendAvailability(available_backends=frozenset(Backend))
    budget = make_budget_tracker_from_config(db, config)
    await budget.init()

    # Pin all three roles to DeepSeek personas so that binding is deterministic.
    # Claude personas are not used in this E2E run because:
    #   1. langchain-openai 1.2.1 + OpenRouter + Anthropic Claude raises an AIMessage
    #      pydantic ValidationError on tool_calls.0.args: Claude streams `args` as a
    #      JSON string while langchain expects a dict. DeepSeek streams `args` as a
    #      dict directly, so the round-trip succeeds.
    #   2. openrouter-claude-architect (also eligible for spec_writer) uses
    #      claude-opus-4-1, which is not currently supported on OpenRouter.
    #   3. openrouter-claude-code-reviewer has a subagents block that triggers a
    #      deepagents 0.6.x SubAgentMiddleware bug (its ToolNode receives raw async
    #      functions without a .name attribute).
    # DeepSeek is also cheap: ~$0.001 per phase, well under the per-run cap.
    override = BindingOverride.parse(
        {
            "spec_writer": "openrouter-deepseek-spec-writer@1",
            "reviewer": "openrouter-deepseek-code-reviewer@1",
            "verifier": "openrouter-deepseek-verifier@1",
        }
    )
    engine = WorkflowEngine(
        db=db,
        config=config,
        persona_pool=personas,
        artifact_registry=registry,
        consent_store=consent_store,
        available_backends=backends,
        approval_callback=_auto_approve,
        budget_tracker=budget,
        pricing=pricing,
    )

    requirements = (
        "Build a tiny CLI tool 'numfmt' that reads numbers from stdin (one per line) "
        "and prints them grouped with thousand separators. "
        "Acceptance: tests pass on samples [1, 12345, 1234567]."
    )

    # ---- Run ----
    start_time = time.monotonic()
    try:
        result = await engine.run(
            template,
            repo_path=tmp_path / "fake-repo",
            base_branch="main",
            requirements_md=requirements,
            override=override,
        )
    finally:
        await db.dispose()
    elapsed = time.monotonic() - start_time

    # ---- Assertions: run result ----
    assert result.state == RunState.COMPLETED, (
        f"run did not complete: state={result.state}, error={result.error}, elapsed={elapsed:.1f}s"
    )
    assert result.final_report_path is not None, "final_report_path must be set"
    assert result.final_report_path.is_file(), (
        f"final report JSON missing: {result.final_report_path}"
    )

    # ---- Assertions: final report JSON content ----
    report_json = json.loads(result.final_report_path.read_text(encoding="utf-8"))
    assert report_json["status"] == "completed"
    assert len(report_json["phases"]) == 3, f"expected 3 phases, got {len(report_json['phases'])}"
    assert len(report_json["artifacts"]) == 3, (
        f"expected 3 artifacts, got {len(report_json['artifacts'])}"
    )

    # ---- Assertions: markdown report ----
    md_path = result.final_report_path.with_suffix(".md")
    assert md_path.is_file(), f"markdown report missing: {md_path}"
    md_content = md_path.read_text(encoding="utf-8")
    assert str(result.run_id) in md_content

    # ---- Assertions: artifact files exist and are non-empty ----
    worktree_root = config.workspace_root / str(result.run_id)
    spec_path = worktree_root / "artifacts" / "spec.json"
    review_path = worktree_root / "artifacts" / "review.json"
    verification_path = worktree_root / "artifacts" / "verification.json"
    for artifact_path in (spec_path, review_path, verification_path):
        assert artifact_path.is_file(), f"artifact file missing: {artifact_path}"
        raw = artifact_path.read_text(encoding="utf-8")
        assert len(raw) > 10, f"artifact file seems empty: {artifact_path}"

    # ---- Validate spec.json schema ----
    spec_data = json.loads(spec_path.read_text(encoding="utf-8"))
    spec_result = registry.validate("dev/spec@1", spec_data)
    assert spec_result.ok, f"spec.json schema validation failed: {spec_result.errors}"

    # ---- Validate review.json schema ----
    review_data = json.loads(review_path.read_text(encoding="utf-8"))
    review_result = registry.validate("dev/review-finding-batch@1", review_data)
    assert review_result.ok, f"review.json schema validation failed: {review_result.errors}"

    # ---- Validate verification.json schema ----
    verify_data = json.loads(verification_path.read_text(encoding="utf-8"))
    verify_result = registry.validate("dev/review-finding-batch@1", verify_data)
    assert verify_result.ok, f"verification.json schema validation failed: {verify_result.errors}"

    # ---- Re-open DB and verify persistence ----
    db2 = Database(config.database_url)
    await db2.init_schema()
    try:
        async with db2.session() as s:
            # RunRow persisted and state == completed
            run_row = await s.get(RunRow, str(result.run_id))
            assert run_row is not None, "RunRow not found in DB"
            assert run_row.state == "completed", f"RunRow.state={run_row.state!r}"

            # 3 RunPhaseRow rows, all completed
            phases = (
                (
                    await s.execute(
                        select(RunPhaseRow).where(RunPhaseRow.run_id == str(result.run_id))
                    )
                )
                .scalars()
                .all()
            )
            assert len(phases) == 3, f"expected 3 RunPhaseRow, got {len(phases)}"
            assert all(p.state == "completed" for p in phases), (
                f"some phases not completed: {[p.state for p in phases]}"
            )

            # LlmCallRow: at least 3 rows (1 per phase), of which at least 3 must
            # have status=ok. Transient error rows may also be present; token
            # counts are only checked conditionally (see the note below).
            llm_calls = (
                (await s.execute(select(LlmCallRow).where(LlmCallRow.run_id == str(result.run_id))))
                .scalars()
                .all()
            )
            assert len(llm_calls) >= 3, (
                f"expected at least 3 LlmCallRow (1 per phase), got {len(llm_calls)}"
            )
            ok_calls = [c for c in llm_calls if c.status == "ok"]
            assert len(ok_calls) >= 3, (
                f"expected at least 3 ok LlmCallRow, got {len(ok_calls)} "
                f"(statuses={[c.status for c in llm_calls]})"
            )

            # Known v0.1.0 limit: deepagents 0.6.x + langchain-openai 1.2.x +
            # OpenRouter-forwarded DeepSeek does not expose usage on the wrapped
            # ModelResponse object that CostMiddleware sees. The recorder fires
            # for every ok call (LlmCallRow is persisted) but token counts read
            # as 0. v0.2 will probe additional response shapes. For now we only
            # assert row-level persistence; if usage *is* present, we also
            # assert that spend was recorded and stays under the $0.10 ceiling.
            total_input = sum(c.input_tokens for c in ok_calls)
            total_output = sum(c.output_tokens for c in ok_calls)
            budget_rows = (await s.execute(select(BudgetLedgerRow))).scalars().all()
            total_spent = sum(float(b.spent_usd) for b in budget_rows)
            if total_input > 0 or total_output > 0:
                assert total_spent > 0, (
                    "tokens were recorded but no cost made it into budget_ledger"
                )
                assert total_spent < 0.10, f"cost exceeded $0.10 ceiling: ${total_spent:.4f}"
    finally:
        await db2.dispose()
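
`_make_pricing` stores prices in USD per 1,000 tokens. The arithmetic that turns token counts into spend can be sketched as below. `PriceSketch` and `estimate_cost_usd` are hypothetical names, and this makes no claim about how `PricingCache` or `CostMiddleware` compute cost internally. The DeepSeek figures come from `_make_pricing`; the token counts are made up.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PriceSketch:
    """Hypothetical price record in USD per 1,000 tokens, as in _make_pricing's comment."""

    model: str
    input_usd_per_1k: float
    output_usd_per_1k: float


def estimate_cost_usd(price: PriceSketch, input_tokens: int, output_tokens: int) -> float:
    """Cost = tokens / 1000 * per-1K price, summed over input and output tokens."""
    return (
        input_tokens / 1000 * price.input_usd_per_1k
        + output_tokens / 1000 * price.output_usd_per_1k
    )


DEEPSEEK = PriceSketch("deepseek/deepseek-chat", 0.00028, 0.00112)

# Made-up usage: 2,000 input + 1,000 output tokens per phase, 3 phases.
per_phase = estimate_cost_usd(DEEPSEEK, 2_000, 1_000)  # 0.00056 + 0.00112 = 0.00168
total = 3 * per_phase  # about $0.005, far below the test's $0.10 ceiling
```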
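
The first reason given in the binding comment is that Claude streams `tool_calls[].args` as a JSON string while langchain expects a dict. A small normalization shim could paper over that kind of mismatch. The test does not do this; it sidesteps the problem by pinning DeepSeek personas. The sketch below is hypothetical and only shows the shape of such a fix on plain dicts. Where, if anywhere, it would hook into langchain-openai or deepagents is left open.

```python
import json
from typing import Any


def normalize_tool_call_args(tool_call: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of a tool-call dict whose "args" is always a dict.

    Hypothetical shim: decodes a JSON-string "args" (the Claude behaviour the
    test comment describes) and leaves dict "args" (as DeepSeek sends) untouched.
    """
    args = tool_call.get("args", {})
    if isinstance(args, str):
        # An empty string is treated as "no arguments" (assumption of this sketch).
        decoded = json.loads(args) if args.strip() else {}
        if not isinstance(decoded, dict):
            raise ValueError(
                f"tool-call args must decode to a JSON object, got {type(decoded).__name__}"
            )
        args = decoded
    # Build a new dict so the caller's tool_call is not mutated.
    return {**tool_call, "args": args}
```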
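
The v0.1.0 note in the persistence checks says CostMiddleware reads zero token counts and that v0.2 will probe additional response shapes. A hypothetical probe could look like the sketch below. The two shapes it checks mirror LangChain conventions: `usage_metadata` with `input_tokens`/`output_tokens` on an `AIMessage`, and an OpenAI-style `token_usage` dict with `prompt_tokens`/`completion_tokens` in `response_metadata`. Whether OpenRouter-forwarded DeepSeek responses populate either shape is not established here, and `probe_token_usage` is an illustrative name only.

```python
from typing import Any


def probe_token_usage(message: Any) -> tuple[int, int]:
    """Return (input_tokens, output_tokens) from the first non-empty usage shape, else (0, 0).

    Hypothetical helper; it duck-types on attributes so it accepts any object.
    """
    # Shape 1: LangChain-style usage_metadata dict on the message itself.
    usage = getattr(message, "usage_metadata", None)
    if isinstance(usage, dict) and (usage.get("input_tokens") or usage.get("output_tokens")):
        return int(usage.get("input_tokens") or 0), int(usage.get("output_tokens") or 0)

    # Shape 2: OpenAI-style token_usage dict nested in response_metadata.
    metadata = getattr(message, "response_metadata", None) or {}
    token_usage = metadata.get("token_usage") if isinstance(metadata, dict) else None
    if isinstance(token_usage, dict):
        return (
            int(token_usage.get("prompt_tokens") or 0),
            int(token_usage.get("completion_tokens") or 0),
        )

    # Nothing usable found: same zero counts the v0.1.0 note describes.
    return 0, 0
```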