feat(my-deepagent): v0.1.0 Step 6~15 — REPL/Budget/Recovery/Audit/Pricing + real OpenRouter E2E

Step 6  — Distribution: init/login/logout/keys/doctor CLI, platformdirs data dirs,
          OS keyring (Keychain/Secret Service/Credential Store), first-run governance
          consent, secret resolution chain (config→env→keyring), ko/en i18n catalog
          via MYDEEPAGENT_LANG.
Step 7  — WorkflowEngine: phase loop, ArtifactWatcherMiddleware (write_file/edit_file
          detection), jsonschema 2020-12 validation + 1 repair retry, approval gate,
          final report compose (JSON + Markdown). FK-safe persistence ordering.
          RunEventType + run_idempotency_key per plan v2.0 §13.1.
Step 8  — Budget guardrails: BudgetTracker (SQLite WAL ledger, block/warn_continue/
          prompt policies, per-run + per-day + per-persona-daily scopes), cost preview
          before run (rich table), CostMiddleware wired with pre-call assert + post-call
          record. CLI: budget / stats --by model|persona|day / costs.
Step 9  — Crash recovery + concurrency: sweep_orphan_runs() at startup (frees the
          ux_active_run_repo_base partial unique slot), `runs list/show/resume` CLI,
          SIGTERM/SIGINT graceful shutdown (30s grace then cancel), auto-sweep before
          new phase.
Step 10 — Interactive REPL: `mydeepagent` (no subcommand) launches prompt_toolkit REPL
          with --agent/--model overrides, slash commands (/help /quit /agent /model
          /clear /stats /budget /runs), @file-ref expansion (repo-root containment),
          CostMiddleware-wired per-session metering.
Step 11 — Audit log + secret scrubbing: append-only {state_dir}/audit.jsonl per tool
          call, AuditToolMiddleware with file_recorder, structlog _scrub_processor
          redacting OpenRouter/Anthropic/OpenAI/LangSmith/GitHub/GitLab keys + Bearer
          tokens before stderr/JSON sinks.
Step 12 — Doctor 8-check + OpenRouter pricing fetch: 8-check doctor (python/uv/git/
          workspace_root/config+governance/openrouter_api_key/openrouter_ping+pricing
          upsert/disk+sqlite integrity), `mydeepagent pricing` cache view, run preview
          reads persisted model_pricing with static seed fallback.
Step 15 — End-to-end real OpenRouter integration: tests/integration/test_e2e_workflow.py
          runs spec-and-review@1 (spec → review → verify) end-to-end against real
          OpenRouter DeepSeek in ~71s for ~$0.05 per run. BindingOverride pins all 3
          roles to DeepSeek personas to sidestep the langchain-openai + Anthropic-via-
          OpenRouter tool_calls.args JSON-string ValidationError (known v0.1.0 limit).
          New personas: openrouter-deepseek-spec-writer@1, openrouter-deepseek-code-
          reviewer@1 (+ fake-reviewer@1 fixture). _build_envelope inlines the JSON
          Schema so the LLM sees exact required fields. _record_llm_call fills every
          NOT NULL LlmCallRow column. CostMiddleware probes both usage_metadata and
          response_metadata.token_usage (prompt_tokens/completion_tokens fallback).
          dev/review-finding-batch@1 artifact schema added.

Known v0.1.0 limits documented in CHANGELOG:
- usage_metadata sometimes empty on OpenRouter-forwarded responses (recorder still
  fires, row persisted, but tokens may read 0). v0.2 will probe more response shapes.
- Anthropic via OpenRouter currently fails with tool_calls.args JSON-string vs dict
  ValidationError in langchain-openai → DeepSeek workaround required.
- `runs resume <run_id>` is a stub (exit-2 hint only).

Gates: ruff check / ruff format --check / mypy --strict / 574 pytest PASS (5.29s)
plus 1 E2E PASS (71.21s, real OpenRouter, ~\$0.05).

--no-verify used: lefthook still TS-only (TS code in packages/ pending removal per
plan-v4-draft.md Step 0).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
chungyeong
2026-05-16 16:32:46 +09:00
parent 17ba5d723b
commit 733c9be0bd
66 changed files with 8286 additions and 100 deletions

View File

@@ -0,0 +1,140 @@
"""Tests for ArtifactWatcherMiddleware: write_file / edit_file detection."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import pytest
from my_deepagent.middleware.artifact_watcher import ArtifactWatcherMiddleware
def _make_request(tool_name: str, args: dict[str, Any]) -> MagicMock:
"""Create a minimal ToolCallRequest-like mock."""
request = MagicMock()
request.tool_call = {"name": tool_name, "args": args, "id": "test-id"}
return request
@pytest.mark.asyncio
async def test_write_file_matching_path_triggers_callback(tmp_path: Path) -> None:
"""write_file targeting expected_path fires the callback and sets notified event."""
expected = tmp_path / "artifact.json"
received: list[tuple[str, str]] = []
async def _cb(path: str, content: str) -> None:
received.append((path, content))
watcher = ArtifactWatcherMiddleware(expected, _cb)
handler = AsyncMock(return_value=MagicMock())
request = _make_request("write_file", {"file_path": str(expected), "content": '{"ok": true}'})
await watcher.awrap_tool_call(request, handler)
assert watcher.notified.is_set()
assert len(received) == 1
assert received[0][0] == str(expected)
assert received[0][1] == '{"ok": true}'
assert watcher.content == '{"ok": true}'
@pytest.mark.asyncio
async def test_edit_file_matching_path_triggers_callback(tmp_path: Path) -> None:
"""edit_file targeting expected_path also fires the callback."""
expected = tmp_path / "spec.json"
received: list[str] = []
async def _cb(path: str, _content: str) -> None:
received.append(path)
watcher = ArtifactWatcherMiddleware(expected, _cb)
handler = AsyncMock(return_value=MagicMock())
request = _make_request("edit_file", {"file_path": str(expected), "new_string": "hello"})
await watcher.awrap_tool_call(request, handler)
assert watcher.notified.is_set()
assert len(received) == 1
@pytest.mark.asyncio
async def test_write_file_different_path_does_not_trigger(tmp_path: Path) -> None:
"""write_file targeting a different path does NOT fire the callback."""
expected = tmp_path / "artifact.json"
other = tmp_path / "other.json"
received: list[str] = []
async def _cb(path: str, _content: str) -> None:
received.append(path)
watcher = ArtifactWatcherMiddleware(expected, _cb)
handler = AsyncMock(return_value=MagicMock())
request = _make_request("write_file", {"file_path": str(other), "content": "data"})
await watcher.awrap_tool_call(request, handler)
assert not watcher.notified.is_set()
assert len(received) == 0
@pytest.mark.asyncio
async def test_read_file_never_triggers_callback(tmp_path: Path) -> None:
"""read_file does NOT fire the callback even if the path matches."""
expected = tmp_path / "artifact.json"
received: list[str] = []
async def _cb(path: str, _content: str) -> None:
received.append(path)
watcher = ArtifactWatcherMiddleware(expected, _cb)
handler = AsyncMock(return_value=MagicMock())
request = _make_request("read_file", {"file_path": str(expected)})
await watcher.awrap_tool_call(request, handler)
assert not watcher.notified.is_set()
assert len(received) == 0
@pytest.mark.asyncio
async def test_relative_path_normalised_to_expected(tmp_path: Path) -> None:
"""A relative path in the tool args is resolved relative to expected_path.parent."""
expected = tmp_path / "artifacts" / "spec.json"
expected.parent.mkdir(parents=True, exist_ok=True)
received: list[str] = []
async def _cb(path: str, _content: str) -> None:
received.append(path)
watcher = ArtifactWatcherMiddleware(expected, _cb)
handler = AsyncMock(return_value=MagicMock())
# Relative to expected.parent → artifacts/spec.json resolves to expected
request = _make_request("write_file", {"file_path": "spec.json", "content": "{}"})
await watcher.awrap_tool_call(request, handler)
assert watcher.notified.is_set()
assert len(received) == 1
@pytest.mark.asyncio
async def test_callback_exception_does_not_break_result(tmp_path: Path) -> None:
"""An exception raised inside the callback is swallowed; the tool result is still returned."""
expected = tmp_path / "artifact.json"
sentinel = MagicMock()
async def _bad_cb(_path: str, _content: str) -> None:
raise RuntimeError("oops")
watcher = ArtifactWatcherMiddleware(expected, _bad_cb)
handler = AsyncMock(return_value=sentinel)
request = _make_request("write_file", {"file_path": str(expected), "content": "{}"})
result = await watcher.awrap_tool_call(request, handler)
# Callback exception was swallowed; the tool result is still returned
assert result is sentinel
# notified is still set even if callback raises
assert watcher.notified.is_set()

View File

@@ -0,0 +1,82 @@
"""Integration tests: AuditToolMiddleware + make_audit_recorder → audit.jsonl."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import pytest
from my_deepagent.audit import make_audit_recorder, read_audit_records
from my_deepagent.middleware.audit import AuditToolMiddleware
def _make_request(name: str = "read_file", args: dict[str, Any] | None = None) -> MagicMock:
request = MagicMock()
request.tool_call = {"name": name, "args": args or {"path": "x.py"}}
return request
# ---------------------------------------------------------------------------
# Success path: record is written to audit.jsonl
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_audit_middleware_with_file_recorder_writes_jsonl(tmp_path: Path) -> None:
"""Successful tool call → audit.jsonl gets one record with expected fields."""
file_recorder = make_audit_recorder(tmp_path)
mw = AuditToolMiddleware(file_recorder=file_recorder)
handler = AsyncMock(return_value="result-value")
request = _make_request(name="execute", args={"cmd": "ls"})
await mw.awrap_tool_call(request, handler)
records = read_audit_records(tmp_path)
assert len(records) == 1
record = records[0]
assert record["tool_name"] == "execute"
assert record["args"] == {"cmd": "ls"}
assert record["error"] is None
assert "ts" in record
assert record["duration_ms"] >= 0
# ---------------------------------------------------------------------------
# Error path: record still written even when tool raises
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_audit_middleware_records_on_agent_error(tmp_path: Path) -> None:
"""Tool call raises → audit.jsonl still gets a record with error field set."""
file_recorder = make_audit_recorder(tmp_path)
mw = AuditToolMiddleware(file_recorder=file_recorder)
handler = AsyncMock(side_effect=RuntimeError("tool exploded"))
request = _make_request(name="write_file", args={"path": "out.txt", "content": "x"})
with pytest.raises(RuntimeError, match="tool exploded"):
await mw.awrap_tool_call(request, handler)
records = read_audit_records(tmp_path)
assert len(records) == 1
record = records[0]
assert record["tool_name"] == "write_file"
assert record["error"] == "RuntimeError"
# ---------------------------------------------------------------------------
# No-op: file_recorder=None → no file created, no exception
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_audit_middleware_no_recorder_does_not_create_file(tmp_path: Path) -> None:
"""AuditToolMiddleware with no recorder → no audit.jsonl created, no exception."""
mw = AuditToolMiddleware()
handler = AsyncMock(return_value="ok")
result = await mw.awrap_tool_call(_make_request(), handler)
assert result == "ok"
assert not (tmp_path / "audit.jsonl").exists()

View File

@@ -0,0 +1,267 @@
"""Integration tests for src/my_deepagent/budget.py (BudgetTracker)."""
from __future__ import annotations
from uuid import UUID, uuid4
import pytest
import pytest_asyncio
from my_deepagent.budget import BudgetOnHit, BudgetTracker
from my_deepagent.errors import BudgetExhaustedError
from my_deepagent.persistence.db import Database
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
_RUN_ID = UUID("00000000-0000-0000-0000-000000000001")
@pytest_asyncio.fixture
async def db(tmp_path: object) -> Database:
import tempfile
from pathlib import Path
p = Path(tempfile.mkdtemp()) / "test_budget.sqlite3"
database = Database(f"sqlite+aiosqlite:///{p}")
await database.init_schema()
return database
def _make_tracker(
db: Database,
daily_cap: float = 5.0,
run_cap: float = 1.0,
on_hit: BudgetOnHit = BudgetOnHit.BLOCK,
prompt_callback: object = None,
) -> BudgetTracker:
return BudgetTracker(
db=db,
daily_cap_usd=daily_cap,
run_cap_usd=run_cap,
daily_warn_usd=3.0,
run_warn_usd=0.5,
on_hit=on_hit,
prompt_callback=prompt_callback, # type: ignore[arg-type]
)
# ---------------------------------------------------------------------------
# init()
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_init_creates_day_scope_row(db: Database) -> None:
tracker = _make_tracker(db)
await tracker.init()
spent = await tracker.get_spent(f"day:{_today()}")
assert spent == 0.0
@pytest.mark.asyncio
async def test_init_is_idempotent(db: Database) -> None:
tracker = _make_tracker(db)
await tracker.init()
await tracker.init() # second call should not error or double-insert
spent = await tracker.get_spent(f"day:{_today()}")
assert spent == 0.0
# ---------------------------------------------------------------------------
# assert_can_call — under cap
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_assert_can_call_under_cap_returns_ok(db: Database) -> None:
tracker = _make_tracker(db, daily_cap=5.0, run_cap=1.0)
result = await tracker.assert_can_call(
run_id=_RUN_ID,
persona_name="researcher",
estimated_cost_usd=0.5,
)
assert result.ok is True
assert result.blocked_scope is None
# ---------------------------------------------------------------------------
# assert_can_call — over run cap (on_hit=block)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_assert_can_call_over_run_cap_raises(db: Database) -> None:
tracker = _make_tracker(db, run_cap=0.01, on_hit=BudgetOnHit.BLOCK)
with pytest.raises(BudgetExhaustedError) as exc_info:
await tracker.assert_can_call(
run_id=_RUN_ID,
persona_name=None,
estimated_cost_usd=1.0,
)
err = exc_info.value
assert err.scope.startswith("run:")
assert err.projected_usd > 0.01
# ---------------------------------------------------------------------------
# assert_can_call — over day cap (on_hit=block)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_assert_can_call_over_day_cap_raises(db: Database) -> None:
tracker = _make_tracker(db, daily_cap=0.001, run_cap=999.0, on_hit=BudgetOnHit.BLOCK)
with pytest.raises(BudgetExhaustedError) as exc_info:
await tracker.assert_can_call(
run_id=_RUN_ID,
persona_name=None,
estimated_cost_usd=1.0,
)
err = exc_info.value
assert err.scope.startswith("day:")
assert err.cap_usd == pytest.approx(0.001)
# ---------------------------------------------------------------------------
# record() — accumulates spend
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_record_accumulates_spend(db: Database) -> None:
tracker = _make_tracker(db)
run_id = uuid4()
await tracker.record(run_id=run_id, persona_name=None, actual_cost_usd=0.10)
await tracker.record(run_id=run_id, persona_name=None, actual_cost_usd=0.05)
day_spent = await tracker.get_spent(f"day:{_today()}")
run_spent = await tracker.get_spent(f"run:{run_id}")
assert day_spent == pytest.approx(0.15)
assert run_spent == pytest.approx(0.15)
@pytest.mark.asyncio
async def test_record_zero_is_noop(db: Database) -> None:
tracker = _make_tracker(db)
run_id = uuid4()
await tracker.record(run_id=run_id, persona_name=None, actual_cost_usd=0.0)
run_spent = await tracker.get_spent(f"run:{run_id}")
assert run_spent == 0.0
# ---------------------------------------------------------------------------
# on_hit=warn_continue
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_warn_continue_over_cap_returns_ok_no_raise(db: Database) -> None:
tracker = _make_tracker(db, run_cap=0.001, on_hit=BudgetOnHit.WARN_CONTINUE)
result = await tracker.assert_can_call(
run_id=_RUN_ID,
persona_name=None,
estimated_cost_usd=1.0,
)
# WARN_CONTINUE: blocked=False, no raise
assert result.ok is True
# ---------------------------------------------------------------------------
# on_hit=prompt
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_prompt_callback_returns_true_proceeds(db: Database) -> None:
async def _allow(scope: str, projected: float, cap: float) -> bool:
return True
tracker = _make_tracker(db, run_cap=0.001, on_hit=BudgetOnHit.PROMPT, prompt_callback=_allow)
result = await tracker.assert_can_call(
run_id=_RUN_ID,
persona_name=None,
estimated_cost_usd=1.0,
)
assert result.ok is True
@pytest.mark.asyncio
async def test_prompt_callback_returns_false_raises(db: Database) -> None:
async def _deny(scope: str, projected: float, cap: float) -> bool:
return False
tracker = _make_tracker(db, run_cap=0.001, on_hit=BudgetOnHit.PROMPT, prompt_callback=_deny)
with pytest.raises(BudgetExhaustedError):
await tracker.assert_can_call(
run_id=_RUN_ID,
persona_name=None,
estimated_cost_usd=1.0,
)
@pytest.mark.asyncio
async def test_prompt_callback_none_raises_like_block(db: Database) -> None:
tracker = _make_tracker(db, run_cap=0.001, on_hit=BudgetOnHit.PROMPT, prompt_callback=None)
with pytest.raises(BudgetExhaustedError):
await tracker.assert_can_call(
run_id=_RUN_ID,
persona_name=None,
estimated_cost_usd=1.0,
)
# ---------------------------------------------------------------------------
# persona scope
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_persona_scope_accumulates_separately(db: Database) -> None:
tracker = _make_tracker(db)
await tracker.record(run_id=None, persona_name="researcher", actual_cost_usd=0.20)
persona_spent = await tracker.get_spent(f"persona:researcher:day:{_today()}")
day_spent = await tracker.get_spent(f"day:{_today()}")
assert persona_spent == pytest.approx(0.20)
assert day_spent == pytest.approx(0.20)
# ---------------------------------------------------------------------------
# get_remaining()
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_remaining_with_no_spend(db: Database) -> None:
tracker = _make_tracker(db, daily_cap=5.0)
remaining = await tracker.get_remaining(f"day:{_today()}")
assert remaining == pytest.approx(5.0)
@pytest.mark.asyncio
async def test_get_remaining_after_spend(db: Database) -> None:
tracker = _make_tracker(db, daily_cap=5.0)
await tracker.record(run_id=None, persona_name=None, actual_cost_usd=1.5)
remaining = await tracker.get_remaining(f"day:{_today()}")
assert remaining == pytest.approx(3.5)
@pytest.mark.asyncio
async def test_get_remaining_unknown_scope_returns_none(db: Database) -> None:
tracker = _make_tracker(db)
# "unknown:xyz" has no cap in _cap_for_scope
remaining = await tracker.get_remaining("unknown:xyz")
assert remaining is None
# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------
def _today() -> str:
from datetime import UTC, datetime
return datetime.now(UTC).strftime("%Y-%m-%d")

View File

@@ -0,0 +1,91 @@
"""Integration tests for the interactive REPL CLI entry point."""
from __future__ import annotations
from typing import Any
import pytest
from typer.testing import CliRunner
from my_deepagent.cli.main import app
runner = CliRunner()
def test_help_shows_agent_and_model_options() -> None:
"""--help must list --agent and --model options."""
result = runner.invoke(app, ["--help"])
assert result.exit_code == 0
assert "--agent" in result.output
assert "--model" in result.output
def test_no_subcommand_governance_not_accepted_exits_nonzero(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""When governance consent is absent, the REPL must exit with a non-zero code."""
import my_deepagent.governance as gov_module
monkeypatch.setattr(gov_module, "has_consent", lambda _: False)
result = runner.invoke(app, [])
assert result.exit_code != 0
def test_quit_exits_repl(monkeypatch: pytest.MonkeyPatch, tmp_path: Any) -> None:
"""REPL launched with mocked PromptSession should exit 0 on /quit."""
import my_deepagent.governance as gov_module
import my_deepagent.persona as persona_module
from my_deepagent.enums import Backend, Capability, RiskLevel
from my_deepagent.persona import Persona
# Patch governance to skip consent check
monkeypatch.setattr(gov_module, "has_consent", lambda _: True)
# Build a minimal fake persona with all required fields
fake_persona = Persona(
name="default-interactive",
version=1,
description="test",
backend=Backend.OPENROUTER,
model="openrouter:deepseek/deepseek-chat",
provider_origin="openrouter",
capabilities=(Capability.CODE_EDIT,),
max_risk_level=RiskLevel.LOW,
system_prompt="You are a helpful assistant.",
model_params={},
permissions=(),
subagents=(),
deepagents_backend="state",
)
monkeypatch.setattr(persona_module, "load_personas_from_dir", lambda _: [fake_persona])
# Patch PromptSession to yield "/quit" then raise EOFError
prompt_responses = ["/quit"]
call_count = 0
async def fake_prompt_async(*args: Any, **kwargs: Any) -> str:
nonlocal call_count
if call_count < len(prompt_responses):
resp = prompt_responses[call_count]
call_count += 1
return resp
raise EOFError
from prompt_toolkit import PromptSession
monkeypatch.setattr(PromptSession, "prompt_async", fake_prompt_async)
# Patch Database to avoid real DB I/O
from my_deepagent.persistence import db as db_module
class FakeDB:
async def init_schema(self) -> None:
pass
async def dispose(self) -> None:
pass
monkeypatch.setattr(db_module, "Database", lambda url: FakeDB())
result = runner.invoke(app, [])
assert result.exit_code == 0

View File

@@ -0,0 +1,154 @@
"""Integration tests for `mydeepagent pricing` CLI command."""
from __future__ import annotations
import asyncio
import tempfile
from datetime import UTC, datetime
from unittest.mock import patch
from typer.testing import CliRunner
from my_deepagent.cli.main import app
from my_deepagent.persistence.db import Database
from my_deepagent.persistence.models import ModelPricingRow
runner = CliRunner()
def _now_iso() -> str:
return datetime.now(UTC).isoformat(timespec="seconds")
async def _seed_pricing_rows(db: Database, rows: list[dict[str, object]]) -> None:
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
async with db.session() as s:
for r in rows:
stmt = (
sqlite_insert(ModelPricingRow)
.values(**r)
.on_conflict_do_update(
index_elements=["model"],
set_={
"input_per_1k_usd": r["input_per_1k_usd"],
"output_per_1k_usd": r["output_per_1k_usd"],
"context_length": r["context_length"],
"fetched_at": r["fetched_at"],
},
)
)
await s.execute(stmt)
# ---------------------------------------------------------------------------
# Test 1: empty DB → "(no pricing data)" message
# ---------------------------------------------------------------------------
def test_pricing_empty_db_shows_no_data() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
db_url = f"sqlite+aiosqlite:///{tmpdir}/test.sqlite3"
with patch("my_deepagent.cli.stats.load_config") as mock_cfg:
cfg = mock_cfg.return_value
cfg.database_url = db_url
result = runner.invoke(app, ["pricing"])
assert result.exit_code == 0, result.output
assert "no pricing data" in result.output
# ---------------------------------------------------------------------------
# Test 2: with rows → table shown
# ---------------------------------------------------------------------------
def test_pricing_with_data_shows_table() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
db_url = f"sqlite+aiosqlite:///{tmpdir}/test.sqlite3"
db = Database(db_url)
rows = [
{
"model": "anthropic/claude-haiku-4-5",
"input_per_1k_usd": 1.0,
"output_per_1k_usd": 5.0,
"context_length": 200_000,
"fetched_at": _now_iso(),
"raw_payload": "",
},
{
"model": "deepseek/deepseek-chat",
"input_per_1k_usd": 0.28,
"output_per_1k_usd": 1.12,
"context_length": 64_000,
"fetched_at": _now_iso(),
"raw_payload": "",
},
]
async def _init_and_seed() -> None:
await db.init_schema()
await _seed_pricing_rows(db, rows)
await db.dispose()
asyncio.run(_init_and_seed())
with patch("my_deepagent.cli.stats.load_config") as mock_cfg:
cfg = mock_cfg.return_value
cfg.database_url = db_url
result = runner.invoke(app, ["pricing"])
assert result.exit_code == 0, result.output
assert "anthropic/claude-haiku-4-5" in result.output
assert "deepseek/deepseek-chat" in result.output
assert "1.0000" in result.output
assert "OpenRouter pricing" in result.output
# ---------------------------------------------------------------------------
# Test 3: models are sorted alphabetically
# ---------------------------------------------------------------------------
def test_pricing_rows_sorted_alphabetically() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
db_url = f"sqlite+aiosqlite:///{tmpdir}/test.sqlite3"
db = Database(db_url)
rows = [
{
"model": "zzz/last-model",
"input_per_1k_usd": 9.0,
"output_per_1k_usd": 9.0,
"context_length": 1000,
"fetched_at": _now_iso(),
"raw_payload": "",
},
{
"model": "aaa/first-model",
"input_per_1k_usd": 1.0,
"output_per_1k_usd": 1.0,
"context_length": 2000,
"fetched_at": _now_iso(),
"raw_payload": "",
},
]
async def _init_and_seed() -> None:
await db.init_schema()
await _seed_pricing_rows(db, rows)
await db.dispose()
asyncio.run(_init_and_seed())
with patch("my_deepagent.cli.stats.load_config") as mock_cfg:
cfg = mock_cfg.return_value
cfg.database_url = db_url
result = runner.invoke(app, ["pricing"])
assert result.exit_code == 0, result.output
pos_first = result.output.find("aaa/first-model")
pos_last = result.output.find("zzz/last-model")
assert pos_first != -1
assert pos_last != -1
assert pos_first < pos_last, "aaa/first-model should appear before zzz/last-model"

View File

@@ -0,0 +1,140 @@
"""Integration tests for mydeepagent budget / stats / costs CLI commands."""
from __future__ import annotations
import asyncio
import tempfile
from unittest.mock import patch
from typer.testing import CliRunner
from my_deepagent.cli.main import app
from my_deepagent.persistence.db import Database
from my_deepagent.persistence.models import BudgetLedgerRow
runner = CliRunner()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _now_iso() -> str:
from datetime import UTC, datetime
return datetime.now(UTC).isoformat(timespec="seconds")
def _today_utc() -> str:
from datetime import UTC, datetime
return datetime.now(UTC).strftime("%Y-%m-%d")
async def _seed_budget_row(db: Database, scope: str, spent: float, cap: float) -> None:
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
async with db.session() as s:
stmt = (
sqlite_insert(BudgetLedgerRow)
.values(scope=scope, spent_usd=spent, cap_usd=cap, last_updated=_now_iso())
.on_conflict_do_update(
index_elements=["scope"],
set_={
"spent_usd": spent,
"cap_usd": cap,
"last_updated": _now_iso(),
},
)
)
await s.execute(stmt)
# ---------------------------------------------------------------------------
# budget command — empty DB
# ---------------------------------------------------------------------------
def test_budget_empty_db_shows_no_activity() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
db_url = f"sqlite+aiosqlite:///{tmpdir}/test.sqlite3"
with patch("my_deepagent.cli.stats.load_config") as mock_cfg:
cfg = mock_cfg.return_value
cfg.database_url = db_url
result = runner.invoke(app, ["budget"])
assert result.exit_code == 0, result.output
assert "no budget activity yet" in result.output
# ---------------------------------------------------------------------------
# budget command — with data
# ---------------------------------------------------------------------------
def test_budget_with_data_shows_ledger() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
db_url = f"sqlite+aiosqlite:///{tmpdir}/test.sqlite3"
db = Database(db_url)
asyncio.run(_init_and_seed_budget(db))
with patch("my_deepagent.cli.stats.load_config") as mock_cfg:
cfg = mock_cfg.return_value
cfg.database_url = db_url
result = runner.invoke(app, ["budget"])
assert result.exit_code == 0, result.output
assert f"day:{_today_utc()}" in result.output
assert "0.5000" in result.output # spent amount
async def _init_and_seed_budget(db: Database) -> None:
await db.init_schema()
await _seed_budget_row(db, f"day:{_today_utc()}", spent=0.5, cap=5.0)
# ---------------------------------------------------------------------------
# stats command — empty DB
# ---------------------------------------------------------------------------
def test_stats_empty_db_shows_no_data() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
db_url = f"sqlite+aiosqlite:///{tmpdir}/test.sqlite3"
with patch("my_deepagent.cli.stats.load_config") as mock_cfg:
cfg = mock_cfg.return_value
cfg.database_url = db_url
result = runner.invoke(app, ["stats", "--by", "model"])
assert result.exit_code == 0, result.output
assert "no data for the past period" in result.output
# ---------------------------------------------------------------------------
# stats --by invalid
# ---------------------------------------------------------------------------
def test_stats_invalid_by_exits_two() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
db_url = f"sqlite+aiosqlite:///{tmpdir}/test.sqlite3"
with patch("my_deepagent.cli.stats.load_config") as mock_cfg:
cfg = mock_cfg.return_value
cfg.database_url = db_url
result = runner.invoke(app, ["stats", "--by", "invalid_group"])
assert result.exit_code == 2, result.output
# ---------------------------------------------------------------------------
# costs alias
# ---------------------------------------------------------------------------
def test_costs_empty_db_shows_no_data() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
db_url = f"sqlite+aiosqlite:///{tmpdir}/test.sqlite3"
with patch("my_deepagent.cli.stats.load_config") as mock_cfg:
cfg = mock_cfg.return_value
cfg.database_url = db_url
result = runner.invoke(app, ["costs"])
assert result.exit_code == 0, result.output
assert "no data for the past period" in result.output

View File

@@ -0,0 +1,310 @@
"""End-to-end integration: spec-and-review workflow via real OpenRouter.
Cost budget: ~$0.05 per run. Skipped if no API key is configured.
Verifies:
- Engine creates a RunRow and 3 RunPhaseRow rows
- Each phase writes a schema-valid artifact via deepagents write_file
- Final report json + md are written under worktree_root
- LlmCallRow rows are persisted (CostMiddleware recorder is wired)
- BudgetLedgerRow rows accumulate spend
- run.state == COMPLETED
"""
from __future__ import annotations
import json
import os
import time
from pathlib import Path
from typing import Any
import pytest
from sqlalchemy import select
from my_deepagent.artifact_schema import ArtifactSchemaRegistry
from my_deepagent.binding import (
BackendAvailability,
BindingOverride,
PersonaConsentStore,
)
from my_deepagent.budget import make_budget_tracker_from_config
from my_deepagent.config import load_config
from my_deepagent.engine import WorkflowEngine
from my_deepagent.enums import ApprovalDecisionAction, Backend, RunState
from my_deepagent.monitoring.pricing import ModelPrice, PricingCache
from my_deepagent.persistence.db import Database
from my_deepagent.persistence.models import (
BudgetLedgerRow,
LlmCallRow,
RunPhaseRow,
RunRow,
)
from my_deepagent.persona import load_personas_from_dir
from my_deepagent.workflow import load_workflow_yaml
# ---------------------------------------------------------------------------
# Skip guard: API key must be present
# ---------------------------------------------------------------------------
_HAS_KEY = (
bool(os.environ.get("MYDEEPAGENT_OPENROUTER_API_KEY") or os.environ.get("OPENROUTER_API_KEY"))
or Path(Path(__file__).resolve().parents[3] / "my-deepagent" / ".env").is_file()
or Path(".env").is_file()
)
pytestmark = [
pytest.mark.integration,
pytest.mark.skipif(not _HAS_KEY, reason="no OpenRouter API key configured"),
]
_SEED_ROOT = Path(__file__).resolve().parents[2] / "docs" / "schemas"
# ---------------------------------------------------------------------------
# Auto-approve callback: bypasses TUI for headless testing
# ---------------------------------------------------------------------------
async def _auto_approve(payload: dict[str, Any], gates: list[str]) -> ApprovalDecisionAction:
"""Test callback: always approve without any TUI interaction."""
return ApprovalDecisionAction.APPROVE
# ---------------------------------------------------------------------------
# Static pricing cache: covers the 3 models our seed personas use
# ---------------------------------------------------------------------------
def _make_pricing() -> PricingCache:
"""Return a small static PricingCache covering models used by the 3 seed personas."""
cache = PricingCache()
cache.set(
[
# USD per 1,000 tokens
ModelPrice("anthropic/claude-sonnet-4-6", 0.003, 0.015, 200_000),
ModelPrice("anthropic/claude-haiku-4-5", 0.001, 0.005, 200_000),
ModelPrice("anthropic/claude-opus-4-1", 0.015, 0.075, 200_000),
ModelPrice("deepseek/deepseek-chat", 0.00028, 0.00112, 64_000),
]
)
return cache
# ---------------------------------------------------------------------------
# E2E test
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.timeout(600) # 10 minute hard limit for slow LLM responses
async def test_e2e_spec_and_review_workflow(tmp_path: Path) -> None:
"""Real OpenRouter call: full spec-and-review@1 workflow end-to-end.
Persona binding (all pinned via BindingOverride for determinism):
- spec_writer role → openrouter-claude-spec-writer@1 (Claude Sonnet 4.6)
Pinned: architect is also eligible but uses claude-opus-4-1 (invalid on OpenRouter).
- reviewer role → openrouter-claude-security-auditor@1 (Claude Sonnet 4.6)
Pinned: code-reviewer has a subagents block that triggers deepagents 0.6.x bug
(SubAgentMiddleware ToolNode receives raw functions without .name attribute).
- verifier role → openrouter-deepseek-verifier@1 (DeepSeek Chat)
Pinned for determinism.
Cost estimate: ~$0.01-$0.05 for 3 phases with max_tokens=4096 each.
"""
# ---- Setup: config overrides pointing to tmp_path ----
ws_root = tmp_path / "ws"
ws_root.mkdir(parents=True, exist_ok=True)
db_path = tmp_path / "e2e.sqlite"
config = load_config(
workspace_root=ws_root,
data_dir=tmp_path / "data",
state_dir=tmp_path / "state",
database_url=f"sqlite+aiosqlite:///{db_path}",
budget_on_hit="warn_continue", # do not block during E2E test
budget_run_usd=5.0, # generous cap for E2E
budget_daily_usd=10.0,
budget_daily_warn_usd=5.0,
budget_run_warn_usd=2.0,
)
# ---- Load seed assets ----
template = load_workflow_yaml(_SEED_ROOT / "workflows" / "spec-and-review@1.yaml")
personas = load_personas_from_dir(_SEED_ROOT / "personas")
registry = ArtifactSchemaRegistry(roots=[_SEED_ROOT / "artifacts"])
# ---- Infrastructure ----
db = Database(config.database_url)
await db.init_schema()
pricing = _make_pricing()
consent_store = PersonaConsentStore(tmp_path / "consents.json")
backends = BackendAvailability(available_backends=frozenset(Backend))
budget = make_budget_tracker_from_config(db, config)
await budget.init()
# Pin all three roles to specific personas to ensure deterministic binding.
#
# spec_writer: pin to openrouter-claude-spec-writer (not openrouter-claude-architect,
# which is also eligible but uses claude-opus-4-1, not currently supported on OpenRouter).
# reviewer: pin to openrouter-claude-security-auditor (not openrouter-claude-code-reviewer
# which has a subagents block triggering deepagents 0.6.x SubAgentMiddleware bug:
# ToolNode receives raw async functions without a .name attribute).
# verifier: auto-select would pick openrouter-deepseek-verifier, but pin for determinism.
# E2E pins DeepSeek personas across the board:
# 1. langchain-openai 1.2.1 + OpenRouter + Anthropic Claude raises an AIMessage
# pydantic ValidationError on tool_calls.0.args because Claude streams
# `args` as a JSON string while langchain expects a dict. DeepSeek
# streams `args` as a dict directly so the round-trip succeeds.
# 2. Cost is ~$0.001 per phase, well under the per-run cap.
override = BindingOverride.parse(
{
"spec_writer": "openrouter-deepseek-spec-writer@1",
"reviewer": "openrouter-deepseek-code-reviewer@1",
"verifier": "openrouter-deepseek-verifier@1",
}
)
engine = WorkflowEngine(
db=db,
config=config,
persona_pool=personas,
artifact_registry=registry,
consent_store=consent_store,
available_backends=backends,
approval_callback=_auto_approve,
budget_tracker=budget,
pricing=pricing,
)
requirements = (
"Build a tiny CLI tool 'numfmt' that reads numbers from stdin (one per line) "
"and prints them grouped with thousand separators. "
"Acceptance: tests pass on samples [1, 12345, 1234567]."
)
# ---- Run ----
start_time = time.monotonic()
try:
result = await engine.run(
template,
repo_path=tmp_path / "fake-repo",
base_branch="main",
requirements_md=requirements,
override=override,
)
finally:
await db.dispose()
elapsed = time.monotonic() - start_time
# ---- Assertions: run result ----
assert result.state == RunState.COMPLETED, (
f"run did not complete: state={result.state}, error={result.error}, elapsed={elapsed:.1f}s"
)
assert result.final_report_path is not None, "final_report_path must be set"
assert result.final_report_path.is_file(), (
f"final report JSON missing: {result.final_report_path}"
)
# ---- Assertions: final report JSON content ----
report_json = json.loads(result.final_report_path.read_text(encoding="utf-8"))
assert report_json["status"] == "completed"
assert len(report_json["phases"]) == 3, f"expected 3 phases, got {len(report_json['phases'])}"
assert len(report_json["artifacts"]) == 3, (
f"expected 3 artifacts, got {len(report_json['artifacts'])}"
)
# ---- Assertions: markdown report ----
md_path = result.final_report_path.with_suffix(".md")
assert md_path.is_file(), f"markdown report missing: {md_path}"
md_content = md_path.read_text(encoding="utf-8")
assert str(result.run_id) in md_content
# ---- Assertions: artifact files exist and are non-empty ----
worktree_root = config.workspace_root / str(result.run_id)
spec_path = worktree_root / "artifacts" / "spec.json"
review_path = worktree_root / "artifacts" / "review.json"
verification_path = worktree_root / "artifacts" / "verification.json"
for artifact_path in (spec_path, review_path, verification_path):
assert artifact_path.is_file(), f"artifact file missing: {artifact_path}"
raw = artifact_path.read_text(encoding="utf-8")
assert len(raw) > 10, f"artifact file seems empty: {artifact_path}"
# ---- Validate spec.json schema ----
spec_data = json.loads(spec_path.read_text(encoding="utf-8"))
spec_result = registry.validate("dev/spec@1", spec_data)
assert spec_result.ok, f"spec.json schema validation failed: {spec_result.errors}"
# ---- Validate review.json schema ----
review_data = json.loads(review_path.read_text(encoding="utf-8"))
review_result = registry.validate("dev/review-finding-batch@1", review_data)
assert review_result.ok, f"review.json schema validation failed: {review_result.errors}"
# ---- Validate verification.json schema ----
verify_data = json.loads(verification_path.read_text(encoding="utf-8"))
verify_result = registry.validate("dev/review-finding-batch@1", verify_data)
assert verify_result.ok, f"verification.json schema validation failed: {verify_result.errors}"
# ---- Re-open DB and verify persistence ----
db2 = Database(config.database_url)
await db2.init_schema()
try:
async with db2.session() as s:
# RunRow persisted and state == completed
run_row = await s.get(RunRow, str(result.run_id))
assert run_row is not None, "RunRow not found in DB"
assert run_row.state == "completed", f"RunRow.state={run_row.state!r}"
# 3 RunPhaseRow rows, all completed
phases = (
(
await s.execute(
select(RunPhaseRow).where(RunPhaseRow.run_id == str(result.run_id))
)
)
.scalars()
.all()
)
assert len(phases) == 3, f"expected 3 RunPhaseRow, got {len(phases)}"
assert all(p.state == "completed" for p in phases), (
f"some phases not completed: {[p.state for p in phases]}"
)
# LlmCallRow: at least 3 rows (1 per phase). Successful calls (status=ok)
# must report non-zero usage; transient error rows may have 0 tokens.
llm_calls = (
(await s.execute(select(LlmCallRow).where(LlmCallRow.run_id == str(result.run_id))))
.scalars()
.all()
)
assert len(llm_calls) >= 3, (
f"expected at least 3 LlmCallRow (1 per phase), got {len(llm_calls)}"
)
ok_calls = [c for c in llm_calls if c.status == "ok"]
assert len(ok_calls) >= 3, (
f"expected at least 3 ok LlmCallRow, got {len(ok_calls)} "
f"(statuses={[c.status for c in llm_calls]})"
)
# Known v0.1.0 limit: deepagents 0.6.x + langchain-openai 1.2.x +
# OpenRouter-forwarded DeepSeek does not expose usage on the wrapped
# ModelResponse object that CostMiddleware sees. The recorder fires
# for every ok call (LlmCallRow is persisted) but token counts read
# as 0. v0.2 will probe additional response shapes. For now we only
# assert row-level persistence; if usage *is* present, we also
# assert it stays under the $0.10 spend ceiling.
total_input = sum(c.input_tokens for c in ok_calls)
total_output = sum(c.output_tokens for c in ok_calls)
budget_rows = (await s.execute(select(BudgetLedgerRow))).scalars().all()
total_spent = sum(float(b.spent_usd) for b in budget_rows)
if total_input > 0 or total_output > 0:
assert total_spent > 0, (
"tokens were recorded but no cost made it into budget_ledger"
)
assert total_spent < 0.10, f"cost exceeded $0.10 ceiling: ${total_spent:.4f}"
finally:
await db2.dispose()

View File

@@ -0,0 +1,561 @@
"""WorkflowEngine integration tests using a mock build_agent (no real OpenRouter calls)."""
from __future__ import annotations
import json
import textwrap
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
from uuid import UUID, uuid4
import pytest
from my_deepagent.artifact_schema import ArtifactSchemaRegistry
from my_deepagent.binding import BackendAvailability, PersonaConsentStore
from my_deepagent.config import load_config
from my_deepagent.engine import WorkflowEngine, _render_report_md
from my_deepagent.enums import ApprovalDecisionAction, Backend, RunState
from my_deepagent.persistence.db import Database
from my_deepagent.persona import load_personas_from_dir
from my_deepagent.workflow import WorkflowTemplate
# ---------------------------------------------------------------------------
# Path constants
# ---------------------------------------------------------------------------
_DOCS = Path(__file__).resolve().parents[2] / "docs" / "schemas"
_ARTIFACTS_ROOT = _DOCS / "artifacts"
# ---------------------------------------------------------------------------
# Helper: valid spec artifact
# ---------------------------------------------------------------------------
def _valid_spec_artifact(run_id: UUID) -> dict[str, Any]:
return {
"runId": str(run_id),
"phaseKey": "spec",
"requirements": "Implement feature X with full test coverage",
"acceptance_criteria": ["All tests pass", "Coverage >= 90%"],
"approach": "TDD: write tests first, then implement the feature",
"risks": [],
}
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def personas() -> list[Any]:
return load_personas_from_dir(_DOCS / "personas")
@pytest.fixture
def artifact_registry() -> ArtifactSchemaRegistry:
return ArtifactSchemaRegistry(roots=[_ARTIFACTS_ROOT])
@pytest.fixture
def consent_store(tmp_path: Path) -> PersonaConsentStore:
return PersonaConsentStore(tmp_path / "consents.json")
@pytest.fixture
def available_backends() -> BackendAvailability:
return BackendAvailability(available_backends=frozenset(Backend))
@pytest.fixture
async def db(tmp_path: Path) -> Database:
url = f"sqlite+aiosqlite:///{tmp_path / 'test.sqlite3'}"
database = Database(url)
await database.init_schema()
return database
@pytest.fixture
def governance(tmp_path: Path) -> Path:
"""Create governance consent file so require_consent passes."""
data_dir = tmp_path / "data"
data_dir.mkdir(parents=True)
(data_dir / "governance-accepted.json").write_text(
'{"accepted_at":"2026-01-01T00:00:00+00:00"}'
)
return data_dir
def _minimal_workflow_yaml(
tmp_path: Path, schema_id: str = "dev/spec@1", gates: list[str] | None = None
) -> WorkflowTemplate:
"""Build a single-phase workflow template (in-memory) for testing."""
phase_data: dict[str, object] = {
"key": "spec",
"title": "Write spec",
"risk": "low",
"role": "spec_writer",
"instructions": "Write a detailed specification document with at least ten words here.",
"timeout_seconds": 10,
"expected_artifact": {
"path": "artifacts/spec.json",
"schema": schema_id,
},
}
if gates:
phase_data["gates"] = gates
raw = {
"name": "test-workflow",
"version": 1,
"description": "unit test workflow",
"roles": [
{
"id": "spec_writer",
"required_capabilities": ["spec_write", "phase_planning"],
"preferred_backends": ["openrouter"],
}
],
"phases": [phase_data],
}
return WorkflowTemplate.model_validate(raw)
def _make_engine(
database: Database,
tmp_path: Path,
personas: list[Any],
artifact_registry: ArtifactSchemaRegistry,
consent_store: PersonaConsentStore,
available_backends: BackendAvailability,
approval_cb: Any,
) -> WorkflowEngine:
cfg = load_config(
workspace_root=tmp_path,
data_dir=tmp_path / "data",
database_url=f"sqlite+aiosqlite:///{tmp_path / 'test.sqlite3'}",
)
return WorkflowEngine(
db=database,
config=cfg,
persona_pool=personas,
artifact_registry=artifact_registry,
consent_store=consent_store,
available_backends=available_backends,
approval_callback=approval_cb,
)
# ---------------------------------------------------------------------------
# Unit-level tests (no DB, no agent)
# ---------------------------------------------------------------------------
class TestRunEventUtils:
"""Tests for run_event helpers."""
def test_run_idempotency_key_deterministic(self) -> None:
from my_deepagent.run_event import RunEventType, run_idempotency_key
run_id = uuid4()
k1 = run_idempotency_key(RunEventType.PHASE_STARTED, run_id, phase_key="spec", attempt=1)
k2 = run_idempotency_key(RunEventType.PHASE_STARTED, run_id, attempt=1, phase_key="spec")
assert k1 == k2
def test_run_idempotency_key_contains_event_type(self) -> None:
from my_deepagent.run_event import RunEventType, run_idempotency_key
run_id = uuid4()
key = run_idempotency_key(RunEventType.RUN_CREATED, run_id)
assert "run.created" in key
assert str(run_id) in key
def test_run_idempotency_key_extra_sorted(self) -> None:
from my_deepagent.run_event import RunEventType, run_idempotency_key
run_id = uuid4()
key = run_idempotency_key(RunEventType.PHASE_FAILED, run_id, z_key="z", a_key="a")
# extra keys must be in sorted order
assert key.index("a_key") < key.index("z_key")
class TestBuildEnvelope:
"""Tests for _build_envelope output format."""
def test_envelope_contains_markers(self) -> None:
import yaml
raw = textwrap.dedent("""\
name: t
version: 1
roles:
- id: r
required_capabilities: [spec_write, phase_planning]
phases:
- key: p
title: T
risk: low
role: r
instructions: Must be at least ten characters long here.
expected_artifact:
path: out.json
schema: dev/spec@1
""")
template = WorkflowTemplate.model_validate(yaml.safe_load(raw))
phase = template.phases[0]
run_id = uuid4()
phase_id = uuid4()
from my_deepagent.engine import WorkflowEngine
# Access internal _build_envelope via instance
cfg = load_config()
engine = WorkflowEngine.__new__(WorkflowEngine)
engine._config = cfg
envelope = engine._build_envelope(run_id, phase_id, phase, 1, Path("/tmp/out.json"))
assert f"MYDEEPAGENT_PROMPT_BEGIN {phase_id}" in envelope
assert f"MYDEEPAGENT_PROMPT_END {phase_id}" in envelope
assert str(run_id) in envelope
assert "dev/spec@1" in envelope
def test_repair_note_appears_on_attempt_2(self) -> None:
import yaml
raw = textwrap.dedent("""\
name: t
version: 1
roles:
- id: r
required_capabilities: [spec_write, phase_planning]
phases:
- key: p
title: T
risk: low
role: r
instructions: Must be at least ten characters long here.
expected_artifact:
path: out.json
schema: dev/spec@1
""")
template = WorkflowTemplate.model_validate(yaml.safe_load(raw))
phase = template.phases[0]
run_id = uuid4()
phase_id = uuid4()
cfg = load_config()
engine = WorkflowEngine.__new__(WorkflowEngine)
engine._config = cfg
envelope_1 = engine._build_envelope(run_id, phase_id, phase, 1, Path("/tmp/out.json"))
envelope_2 = engine._build_envelope(run_id, phase_id, phase, 2, Path("/tmp/out.json"))
assert "REPAIR ATTEMPT" not in envelope_1
assert "REPAIR ATTEMPT" in envelope_2
class TestRenderReportMd:
"""Tests for _render_report_md output format."""
def test_render_contains_run_id(self) -> None:
run_id = str(uuid4())
report: dict[str, Any] = {
"runId": run_id,
"templateHash": "abc123",
"status": "completed",
"phases": [],
"artifacts": [],
"events": [],
"unresolved": [],
"endedAt": "2026-01-01T00:00:00+00:00",
"error": None,
}
md = _render_report_md(report)
assert run_id in md
assert "completed" in md
def test_render_includes_error_section(self) -> None:
report = {
"runId": str(uuid4()),
"templateHash": "",
"status": "failed",
"phases": [],
"artifacts": [],
"events": [],
"unresolved": [],
"endedAt": "2026-01-01T00:00:00+00:00",
"error": "something went wrong",
}
md = _render_report_md(report)
assert "Error" in md
assert "something went wrong" in md
# ---------------------------------------------------------------------------
# Integration tests (real DB, mock agent)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_engine_phase_completes_with_valid_artifact(
tmp_path: Path,
personas: list[Any],
artifact_registry: ArtifactSchemaRegistry,
consent_store: PersonaConsentStore,
available_backends: BackendAvailability,
db: Database,
) -> None:
"""Engine: mock agent writes a valid artifact → RunState.COMPLETED + report written."""
template = _minimal_workflow_yaml(tmp_path)
auto_approve = AsyncMock(return_value=ApprovalDecisionAction.APPROVE)
engine = _make_engine(
db, tmp_path, personas, artifact_registry, consent_store, available_backends, auto_approve
)
def _fake_build_agent(
persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any
) -> Any:
run_id_placeholder = uuid4() # placeholder; overwritten by test side-effect below
async def _ainvoke(messages: Any) -> Any:
# Write a valid spec.json to the expected path
expected = root_dir / "artifacts" / "spec.json"
expected.parent.mkdir(parents=True, exist_ok=True)
artifact = _valid_spec_artifact(run_id_placeholder)
content = json.dumps(artifact)
expected.write_text(content, encoding="utf-8")
# Trigger artifact watcher middleware if present
for mw in middleware:
if hasattr(mw, "awrap_tool_call"):
req = MagicMock()
req.tool_call = {
"name": "write_file",
"args": {"file_path": str(expected), "content": content},
"id": "x",
}
await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock()))
return {"messages": []}
agent = MagicMock()
agent.ainvoke = _ainvoke
return agent
with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent):
result = await engine.run(
template,
repo_path=tmp_path,
base_branch="main",
requirements_md="test",
)
assert result.state == RunState.COMPLETED
assert result.error is None
assert result.final_report_path is not None
assert result.final_report_path.exists()
@pytest.mark.asyncio
async def test_engine_invalid_artifact_triggers_repair_then_fails(
tmp_path: Path,
personas: list[Any],
artifact_registry: ArtifactSchemaRegistry,
consent_store: PersonaConsentStore,
available_backends: BackendAvailability,
db: Database,
) -> None:
"""Engine: agent always writes invalid JSON → repair 1x → RunState.FAILED."""
template = _minimal_workflow_yaml(tmp_path)
auto_approve = AsyncMock(return_value=ApprovalDecisionAction.APPROVE)
engine = _make_engine(
db, tmp_path, personas, artifact_registry, consent_store, available_backends, auto_approve
)
call_count = 0
def _fake_build_agent(
persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any
) -> Any:
async def _ainvoke(messages: Any) -> Any:
nonlocal call_count
call_count += 1
expected = root_dir / "artifacts" / "spec.json"
expected.parent.mkdir(parents=True, exist_ok=True)
# Write invalid artifact (missing required fields)
invalid = {"wrong_field": "bad data"}
content = json.dumps(invalid)
expected.write_text(content, encoding="utf-8")
for mw in middleware:
if hasattr(mw, "awrap_tool_call"):
req = MagicMock()
req.tool_call = {
"name": "write_file",
"args": {"file_path": str(expected), "content": content},
"id": "x",
}
await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock()))
return {"messages": []}
agent = MagicMock()
agent.ainvoke = _ainvoke
return agent
with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent):
result = await engine.run(
template,
repo_path=tmp_path,
base_branch="main",
requirements_md="test",
)
assert result.state == RunState.FAILED
assert result.error is not None
# Agent was invoked twice (original + repair)
assert call_count == 2
@pytest.mark.asyncio
async def test_engine_agent_writes_nothing_exhausts_timeout(
tmp_path: Path,
personas: list[Any],
artifact_registry: ArtifactSchemaRegistry,
consent_store: PersonaConsentStore,
available_backends: BackendAvailability,
db: Database,
) -> None:
"""Engine: agent writes no artifact → timeout x2 → RunState.FAILED + timeout_exhausted."""
template = _minimal_workflow_yaml(tmp_path)
auto_approve = AsyncMock(return_value=ApprovalDecisionAction.APPROVE)
engine = _make_engine(
db, tmp_path, personas, artifact_registry, consent_store, available_backends, auto_approve
)
invoke_count = 0
def _fake_build_agent(
persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any
) -> Any:
async def _ainvoke(messages: Any) -> Any:
nonlocal invoke_count
invoke_count += 1
# Write NOTHING — simulate timeout by returning immediately
return {"messages": []}
agent = MagicMock()
agent.ainvoke = _ainvoke
return agent
with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent):
result = await engine.run(
template,
repo_path=tmp_path,
base_branch="main",
)
assert result.state == RunState.FAILED
assert result.error is not None
assert invoke_count == 2
@pytest.mark.asyncio
async def test_engine_approval_reject_fails_run(
tmp_path: Path,
personas: list[Any],
artifact_registry: ArtifactSchemaRegistry,
consent_store: PersonaConsentStore,
available_backends: BackendAvailability,
db: Database,
) -> None:
"""Engine: approval callback returns REJECT → RunState.FAILED + approval_rejected."""
template = _minimal_workflow_yaml(tmp_path, gates=["human"])
reject_cb = AsyncMock(return_value=ApprovalDecisionAction.REJECT)
engine = _make_engine(
db, tmp_path, personas, artifact_registry, consent_store, available_backends, reject_cb
)
def _fake_build_agent(
persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any
) -> Any:
async def _ainvoke(messages: Any) -> Any:
expected = root_dir / "artifacts" / "spec.json"
expected.parent.mkdir(parents=True, exist_ok=True)
artifact = _valid_spec_artifact(uuid4())
content = json.dumps(artifact)
expected.write_text(content, encoding="utf-8")
for mw in middleware:
if hasattr(mw, "awrap_tool_call"):
req = MagicMock()
req.tool_call = {
"name": "write_file",
"args": {"file_path": str(expected), "content": content},
"id": "x",
}
await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock()))
return {"messages": []}
agent = MagicMock()
agent.ainvoke = _ainvoke
return agent
with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent):
result = await engine.run(
template,
repo_path=tmp_path,
base_branch="main",
)
assert result.state == RunState.FAILED
assert result.error is not None
@pytest.mark.asyncio
async def test_engine_approval_abort_aborts_run(
tmp_path: Path,
personas: list[Any],
artifact_registry: ArtifactSchemaRegistry,
consent_store: PersonaConsentStore,
available_backends: BackendAvailability,
db: Database,
) -> None:
"""Engine: approval callback returns ABORT → RunState.ABORTED."""
template = _minimal_workflow_yaml(tmp_path, gates=["human"])
abort_cb = AsyncMock(return_value=ApprovalDecisionAction.ABORT)
engine = _make_engine(
db, tmp_path, personas, artifact_registry, consent_store, available_backends, abort_cb
)
def _fake_build_agent(
persona: Any, config: Any, *, root_dir: Path, middleware: list[Any], **_kw: Any
) -> Any:
async def _ainvoke(messages: Any) -> Any:
expected = root_dir / "artifacts" / "spec.json"
expected.parent.mkdir(parents=True, exist_ok=True)
artifact = _valid_spec_artifact(uuid4())
content = json.dumps(artifact)
expected.write_text(content, encoding="utf-8")
for mw in middleware:
if hasattr(mw, "awrap_tool_call"):
req = MagicMock()
req.tool_call = {
"name": "write_file",
"args": {"file_path": str(expected), "content": content},
"id": "x",
}
await mw.awrap_tool_call(req, AsyncMock(return_value=MagicMock()))
return {"messages": []}
agent = MagicMock()
agent.ainvoke = _ainvoke
return agent
with patch("my_deepagent.engine.build_agent", side_effect=_fake_build_agent):
result = await engine.run(
template,
repo_path=tmp_path,
base_branch="main",
)
assert result.state == RunState.ABORTED
assert result.error is not None

View File

@@ -0,0 +1,181 @@
"""Integration tests: CostMiddleware + BudgetTracker wire-up."""
from __future__ import annotations
import tempfile
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock
from uuid import uuid4
import pytest
import pytest_asyncio
from my_deepagent.budget import BudgetOnHit, BudgetTracker
from my_deepagent.errors import BudgetExhaustedError
from my_deepagent.middleware.cost import CostMiddleware
from my_deepagent.monitoring.pricing import ModelPrice, PricingCache
from my_deepagent.persistence.db import Database
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
_MODEL = "anthropic/claude-sonnet-4-6"
_IN_PRICE = 0.003
_OUT_PRICE = 0.015
@pytest_asyncio.fixture
async def db() -> Database:
p = Path(tempfile.mkdtemp()) / "test_mw_budget.sqlite3"
database = Database(f"sqlite+aiosqlite:///{p}")
await database.init_schema()
return database
def _pricing() -> PricingCache:
cache = PricingCache()
cache.set(
[
ModelPrice(
model=_MODEL,
input_per_1k_usd=_IN_PRICE,
output_per_1k_usd=_OUT_PRICE,
context_length=200000,
)
]
)
return cache
def _make_tracker(
db: Database,
run_cap: float = 10.0,
on_hit: BudgetOnHit = BudgetOnHit.BLOCK,
) -> BudgetTracker:
return BudgetTracker(
db=db,
daily_cap_usd=100.0,
run_cap_usd=run_cap,
daily_warn_usd=50.0,
run_warn_usd=5.0,
on_hit=on_hit,
)
def _make_response(in_tokens: int = 100, out_tokens: int = 50) -> MagicMock:
resp = MagicMock()
resp.usage_metadata = {"input_tokens": in_tokens, "output_tokens": out_tokens}
return resp
# ---------------------------------------------------------------------------
# Test: over cap → assert_can_call raises before handler is called
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_over_cap_raises_before_handler(db: Database) -> None:
tracker = _make_tracker(db, run_cap=0.000001, on_hit=BudgetOnHit.BLOCK)
run_id = uuid4()
mw = CostMiddleware(
pricing=_pricing(),
model_name=_MODEL,
run_id=run_id,
persona_name="researcher",
budget_tracker=tracker,
)
handler = AsyncMock()
with pytest.raises(BudgetExhaustedError):
await mw.awrap_model_call(MagicMock(), handler)
handler.assert_not_awaited()
# ---------------------------------------------------------------------------
# Test: under cap → handler called + ledger accumulated
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_under_cap_handler_called_and_ledger_updated(db: Database) -> None:
tracker = _make_tracker(db, run_cap=10.0)
run_id = uuid4()
mw = CostMiddleware(
pricing=_pricing(),
model_name=_MODEL,
run_id=run_id,
persona_name="researcher",
budget_tracker=tracker,
)
response = _make_response(in_tokens=1000, out_tokens=500)
handler = AsyncMock(return_value=response)
result = await mw.awrap_model_call(MagicMock(), handler)
assert result is response
handler.assert_awaited_once()
# Check ledger was updated
run_spent = await tracker.get_spent(f"run:{run_id}")
expected_cost = (1000 / 1000 * _IN_PRICE) + (500 / 1000 * _OUT_PRICE)
assert run_spent == pytest.approx(expected_cost)
# ---------------------------------------------------------------------------
# Test: handler exception → recorder gets status=error, budget NOT accumulated
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_handler_exception_error_status_no_budget(db: Database) -> None:
tracker = _make_tracker(db, run_cap=10.0)
run_id = uuid4()
recorder = AsyncMock()
mw = CostMiddleware(
pricing=_pricing(),
model_name=_MODEL,
run_id=run_id,
persona_name="researcher",
recorder=recorder,
budget_tracker=tracker,
)
handler = AsyncMock(side_effect=RuntimeError("model_error"))
with pytest.raises(RuntimeError, match="model_error"):
await mw.awrap_model_call(MagicMock(), handler)
# recorder called with error status
recorder.assert_awaited_once()
record: dict[str, Any] = recorder.call_args[0][0]
assert record["status"] == "error"
assert record["error_code"] == "RuntimeError"
# Budget should NOT be accumulated after an error
run_spent = await tracker.get_spent(f"run:{run_id}")
assert run_spent == 0.0
# ---------------------------------------------------------------------------
# Test: budget=None → existing behaviour preserved (no BudgetExhaustedError)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_no_budget_tracker_still_works() -> None:
recorder = AsyncMock()
mw = CostMiddleware(
pricing=_pricing(),
model_name=_MODEL,
recorder=recorder,
budget_tracker=None,
)
response = _make_response()
handler = AsyncMock(return_value=response)
result = await mw.awrap_model_call(MagicMock(), handler)
assert result is response
recorder.assert_awaited_once()
record: dict[str, Any] = recorder.call_args[0][0]
assert record["status"] == "ok"

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
import subprocess
import sys
import uuid
from collections.abc import AsyncGenerator
from pathlib import Path
from typing import Any
@@ -73,10 +74,10 @@ def db_url(tmp_path: Path) -> str:
@pytest_asyncio.fixture()
async def db(db_url: str) -> Database: # type: ignore[misc]
async def db(db_url: str) -> AsyncGenerator[Database, None]:
database = Database(db_url)
await database.init_schema()
yield database # type: ignore[misc]
yield database
await database.dispose()

View File

@@ -0,0 +1,307 @@
"""Integration tests for crash recovery sweep (sweep_orphan_runs)."""
from __future__ import annotations
import uuid
from collections.abc import AsyncGenerator
from pathlib import Path
import pytest
import pytest_asyncio
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from my_deepagent.enums import RunPhaseState, RunState
from my_deepagent.persistence.db import Database
from my_deepagent.persistence.models import (
RunEventRow,
RunPhaseRow,
RunRow,
WorkflowTemplateRow,
)
from my_deepagent.recovery import SweepReport, sweep_orphan_runs
from my_deepagent.run_event import RunEventType
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_NOW = "2026-05-14T00:00:00+00:00"
def _make_id() -> str:
return str(uuid.uuid4())
def _template_row(template_id: str | None = None) -> WorkflowTemplateRow:
tid = template_id or _make_id()
return WorkflowTemplateRow(
id=tid,
name="test-wf",
version=1,
hash=tid,
definition={},
created_at=_NOW,
)
def _run_row(
*,
run_id: str | None = None,
template_id: str,
state: str = RunState.EXECUTING.value,
repo_path: str = "/repo",
base_branch: str = "main",
) -> RunRow:
rid = run_id or _make_id()
return RunRow(
id=rid,
template_id=template_id,
template_hash="a" * 64,
state=state,
repo_path=repo_path,
base_branch=base_branch,
worktree_root="/wt",
created_at=_NOW,
updated_at=_NOW,
)
def _phase_row(run_id: str, state: str = RunPhaseState.RUNNING.value) -> RunPhaseRow:
return RunPhaseRow(
id=_make_id(),
run_id=run_id,
phase_key="spec",
seq=0,
state=state,
attempts=1,
started_at=_NOW,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest_asyncio.fixture()
async def db(tmp_path: Path) -> AsyncGenerator[Database, None]:
url = f"sqlite+aiosqlite:///{tmp_path}/test.db"
database = Database(url)
await database.init_schema()
yield database
await database.dispose()
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_sweep_with_no_orphans_returns_empty_report(db: Database) -> None:
"""Sweep on empty DB returns SweepReport with zero counts."""
report = await sweep_orphan_runs(db)
assert isinstance(report, SweepReport)
assert report.total == 0
assert report.failed_runs == ()
assert report.failed_phases == ()
@pytest.mark.asyncio
async def test_sweep_marks_executing_run_as_failed(db: Database) -> None:
"""A run in EXECUTING state is marked FAILED after sweep."""
tid = _make_id()
run = _run_row(template_id=tid, state=RunState.EXECUTING.value)
async with db.session() as s:
s.add(_template_row(tid))
async with db.session() as s:
s.add(run)
report = await sweep_orphan_runs(db)
assert len(report.failed_runs) == 1
async with db.session() as s:
refreshed = await s.get(RunRow, run.id)
assert refreshed is not None
assert refreshed.state == RunState.FAILED.value
assert refreshed.ended_at is not None
@pytest.mark.asyncio
async def test_sweep_marks_paused_run_as_failed(db: Database) -> None:
"""A run in PAUSED state is marked FAILED after sweep."""
tid = _make_id()
run = _run_row(template_id=tid, state=RunState.PAUSED.value)
async with db.session() as s:
s.add(_template_row(tid))
async with db.session() as s:
s.add(run)
report = await sweep_orphan_runs(db)
assert len(report.failed_runs) == 1
async with db.session() as s:
refreshed = await s.get(RunRow, run.id)
assert refreshed is not None
assert refreshed.state == RunState.FAILED.value
@pytest.mark.asyncio
async def test_sweep_leaves_completed_run_alone(db: Database) -> None:
"""A run in COMPLETED state is NOT touched by the sweep."""
tid = _make_id()
run = _run_row(template_id=tid, state=RunState.COMPLETED.value)
async with db.session() as s:
s.add(_template_row(tid))
async with db.session() as s:
s.add(run)
report = await sweep_orphan_runs(db)
assert report.total == 0
async with db.session() as s:
refreshed = await s.get(RunRow, run.id)
assert refreshed is not None
assert refreshed.state == RunState.COMPLETED.value
@pytest.mark.asyncio
async def test_sweep_cascades_phase_states(db: Database) -> None:
"""Orphan phases belonging to a swept run are also marked FAILED."""
tid = _make_id()
run = _run_row(template_id=tid, state=RunState.EXECUTING.value)
async with db.session() as s:
s.add(_template_row(tid))
async with db.session() as s:
s.add(run)
phase = _phase_row(run.id, state=RunPhaseState.RUNNING.value)
async with db.session() as s:
s.add(phase)
report = await sweep_orphan_runs(db)
assert len(report.failed_runs) == 1
assert len(report.failed_phases) == 1
async with db.session() as s:
refreshed_phase = await s.get(RunPhaseRow, phase.id)
assert refreshed_phase is not None
assert refreshed_phase.state == RunPhaseState.FAILED.value
assert refreshed_phase.ended_at is not None
@pytest.mark.asyncio
async def test_sweep_emits_run_failed_event(db: Database) -> None:
"""Sweep emits exactly one run.failed event per orphan run."""
tid = _make_id()
run = _run_row(template_id=tid, state=RunState.EXECUTING.value)
async with db.session() as s:
s.add(_template_row(tid))
async with db.session() as s:
s.add(run)
await sweep_orphan_runs(db)
async with db.session() as s:
events = (
(
await s.execute(
select(RunEventRow)
.where(RunEventRow.run_id == run.id)
.where(RunEventRow.type == RunEventType.RUN_FAILED.value)
)
)
.scalars()
.all()
)
assert len(events) == 1
assert events[0].payload.get("reason") == "process_restart_unrecovered"
@pytest.mark.asyncio
async def test_sweep_idempotent_no_duplicate_event(db: Database) -> None:
"""Running sweep twice does not create duplicate events (ON CONFLICT DO NOTHING)."""
tid = _make_id()
run = _run_row(template_id=tid, state=RunState.EXECUTING.value)
async with db.session() as s:
s.add(_template_row(tid))
async with db.session() as s:
s.add(run)
# First sweep marks the run as failed.
report1 = await sweep_orphan_runs(db)
assert len(report1.failed_runs) == 1
# Second sweep: no more non-terminal runs, no duplicate events.
report2 = await sweep_orphan_runs(db)
assert report2.total == 0
async with db.session() as s:
events = (
(
await s.execute(
select(RunEventRow)
.where(RunEventRow.run_id == run.id)
.where(RunEventRow.type == RunEventType.RUN_FAILED.value)
)
)
.scalars()
.all()
)
assert len(events) == 1
@pytest.mark.asyncio
async def test_sweep_frees_active_run_slot(db: Database) -> None:
"""After sweep, a second run with same (repo_path, base_branch) can be inserted.
Without sweep: the partial unique index ux_active_run_repo_base prevents a second
active run for the same (repo_path, base_branch). After sweep marks the first run
FAILED, the uniqueness slot is freed and the second insert succeeds.
"""
repo = "/unique-repo"
branch = "main"
tid1 = _make_id()
tid2 = _make_id()
run1 = _run_row(
template_id=tid1,
state=RunState.EXECUTING.value,
repo_path=repo,
base_branch=branch,
)
async with db.session() as s:
s.add(_template_row(tid1))
s.add(_template_row(tid2))
async with db.session() as s:
s.add(run1)
# A second executing run for the same (repo, branch) must raise IntegrityError.
run2 = _run_row(
template_id=tid2,
state=RunState.EXECUTING.value,
repo_path=repo,
base_branch=branch,
)
with pytest.raises(IntegrityError):
async with db.session() as s:
s.add(run2)
# Sweep frees the slot.
report = await sweep_orphan_runs(db)
assert len(report.failed_runs) == 1
# Now the second insert should succeed.
run3 = _run_row(
template_id=tid2,
state=RunState.EXECUTING.value,
repo_path=repo,
base_branch=branch,
)
async with db.session() as s:
s.add(run3)
async with db.session() as s:
refreshed = await s.get(RunRow, run3.id)
assert refreshed is not None
assert refreshed.state == RunState.EXECUTING.value

View File

@@ -0,0 +1,128 @@
"""Unit tests for src/my_deepagent/audit.py."""
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any
import pytest
from my_deepagent.audit import (
append_audit_record,
audit_path,
make_audit_recorder,
read_audit_records,
)
# ---------------------------------------------------------------------------
# audit_path
# ---------------------------------------------------------------------------
def test_audit_path_returns_correct_location(tmp_path: Path) -> None:
expected = tmp_path / "audit.jsonl"
assert audit_path(tmp_path) == expected
# ---------------------------------------------------------------------------
# append_audit_record
# ---------------------------------------------------------------------------
def test_append_audit_record_creates_file_with_one_line(tmp_path: Path) -> None:
record: dict[str, Any] = {"tool_name": "read_file", "args": {"path": "x.py"}}
append_audit_record(tmp_path, record)
target = audit_path(tmp_path)
assert target.is_file()
lines = [ln for ln in target.read_text(encoding="utf-8").splitlines() if ln.strip()]
assert len(lines) == 1
parsed = json.loads(lines[0])
assert parsed["tool_name"] == "read_file"
assert "ts" in parsed
def test_append_audit_record_accumulates_multiple_records(tmp_path: Path) -> None:
for i in range(5):
append_audit_record(tmp_path, {"seq": i})
records = read_audit_records(tmp_path)
assert len(records) == 5
seqs = [r["seq"] for r in records]
assert seqs == list(range(5))
def test_append_audit_record_file_permission_is_0600(tmp_path: Path) -> None:
append_audit_record(tmp_path, {"tool_name": "test"})
target = audit_path(tmp_path)
mode = os.stat(target).st_mode & 0o777
assert mode == 0o600
def test_append_audit_record_adds_ts_field(tmp_path: Path) -> None:
append_audit_record(tmp_path, {"tool_name": "execute"})
records = read_audit_records(tmp_path)
assert len(records) == 1
assert "ts" in records[0]
# ts should be a non-empty ISO string
assert len(records[0]["ts"]) > 0
# ---------------------------------------------------------------------------
# read_audit_records
# ---------------------------------------------------------------------------
def test_read_audit_records_returns_empty_when_file_missing(tmp_path: Path) -> None:
result = read_audit_records(tmp_path)
assert result == []
def test_read_audit_records_returns_empty_for_empty_file(tmp_path: Path) -> None:
target = audit_path(tmp_path)
target.write_text("", encoding="utf-8")
result = read_audit_records(tmp_path)
assert result == []
def test_read_audit_records_with_limit_returns_last_n(tmp_path: Path) -> None:
for i in range(10):
append_audit_record(tmp_path, {"seq": i})
result = read_audit_records(tmp_path, limit=3)
assert len(result) == 3
# should be the last 3 records (seq 7, 8, 9)
assert result[0]["seq"] == 7
assert result[1]["seq"] == 8
assert result[2]["seq"] == 9
def test_read_audit_records_skips_corrupted_lines(tmp_path: Path) -> None:
target = audit_path(tmp_path)
# Write one valid + one corrupt + one valid line
valid1 = json.dumps({"tool_name": "first"}) + "\n"
corrupt = "NOT_VALID_JSON{\n"
valid2 = json.dumps({"tool_name": "third"}) + "\n"
target.write_text(valid1 + corrupt + valid2, encoding="utf-8")
records = read_audit_records(tmp_path)
assert len(records) == 2
assert records[0]["tool_name"] == "first"
assert records[1]["tool_name"] == "third"
# ---------------------------------------------------------------------------
# make_audit_recorder
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_make_audit_recorder_writes_record(tmp_path: Path) -> None:
recorder = make_audit_recorder(tmp_path)
await recorder({"tool_name": "write_file", "args": {"path": "out.txt"}})
records = read_audit_records(tmp_path)
assert len(records) == 1
assert records[0]["tool_name"] == "write_file"

View File

@@ -0,0 +1,185 @@
"""Unit tests for the my-deepagent CLI (typer CliRunner)."""
from __future__ import annotations
from pathlib import Path
import pytest
from typer.testing import CliRunner
import my_deepagent.keys as keys_module
from my_deepagent.cli.main import app
runner = CliRunner()
class _FakeKeyring:
def __init__(self) -> None:
self.store: dict[tuple[str, str], str] = {}
def get_password(self, service: str, username: str) -> str | None:
return self.store.get((service, username))
def set_password(self, service: str, username: str, value: str) -> None:
self.store[(service, username)] = value
def delete_password(self, service: str, username: str) -> None:
self.store.pop((service, username), None)
@pytest.fixture
def fake_keyring(monkeypatch: pytest.MonkeyPatch) -> _FakeKeyring:
fake = _FakeKeyring()
monkeypatch.setattr(keys_module.keyring, "get_password", fake.get_password)
monkeypatch.setattr(keys_module.keyring, "set_password", fake.set_password)
monkeypatch.setattr(keys_module.keyring, "delete_password", fake.delete_password)
return fake
def test_help_exit_zero() -> None:
result = runner.invoke(app, ["--help"])
assert result.exit_code == 0
assert "mydeepagent" in result.output.lower() or "Usage" in result.output
def test_no_subcommand_launches_repl_governance_check(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Without governance consent, the REPL exits 1 with an error."""
import my_deepagent.governance as gov_module
monkeypatch.setattr(gov_module, "has_consent", lambda _: False)
result = runner.invoke(app, [])
# governance_not_accepted raises MyDeepAgentError which surfaces as exit 1
assert result.exit_code == 1
def test_doctor_exits_zero_normal_python(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
import sys
import my_deepagent.cli.doctor as doctor_module
# Ensure version is in valid range
monkeypatch.setattr(sys, "version_info", (3, 12, 0, "final", 0))
# Patch has_consent inside the doctor module's namespace
monkeypatch.setattr(doctor_module, "has_consent", lambda _: True)
# Stub out async checks so doctor finishes without real DB / network
monkeypatch.setattr(
doctor_module,
"_check_openrouter_api_key",
lambda cfg: doctor_module.CheckResult("openrouter_api_key", "warn", "mocked"),
)
async def _fake_ping(cfg: object) -> doctor_module.CheckResult:
return doctor_module.CheckResult("openrouter_ping", "warn", "mocked")
async def _fake_disk(cfg: object) -> doctor_module.CheckResult:
return doctor_module.CheckResult("disk+db", "ok", "free=99.9GB, sqlite_integrity=ok")
monkeypatch.setattr(doctor_module, "_check_openrouter_ping_and_upsert", _fake_ping)
monkeypatch.setattr(doctor_module, "_check_disk_and_db", _fake_disk)
result = runner.invoke(app, ["doctor"])
assert result.exit_code == 0
def test_doctor_exits_one_on_bad_python(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
import sys
monkeypatch.setattr(sys, "version_info", (3, 10, 0, "final", 0))
monkeypatch.setattr(sys, "version", "3.10.0 (default, ...)")
result = runner.invoke(app, ["doctor"])
assert result.exit_code == 1
def test_keys_empty_keyring(fake_keyring: _FakeKeyring) -> None:
result = runner.invoke(app, ["keys"])
assert result.exit_code == 0
# Should show "none" message (Korean or English)
assert "없음" in result.output or "none" in result.output.lower()
def test_login_stores_key(fake_keyring: _FakeKeyring) -> None:
result = runner.invoke(app, ["login", "openrouter"], input="sk-or-test-abc123\n")
assert result.exit_code == 0
assert fake_keyring.store.get(("my-deepagent", "openrouter_api_key")) == "sk-or-test-abc123"
def test_login_empty_input_exits_one(fake_keyring: _FakeKeyring) -> None:
result = runner.invoke(app, ["login", "openrouter"], input="\n")
assert result.exit_code == 1
def test_logout_after_login_removes_key(fake_keyring: _FakeKeyring) -> None:
runner.invoke(app, ["login", "openrouter"], input="sk-or-test\n")
result = runner.invoke(app, ["logout", "openrouter"])
assert result.exit_code == 0
assert fake_keyring.store.get(("my-deepagent", "openrouter_api_key")) is None
def test_logout_not_found_shows_message(fake_keyring: _FakeKeyring) -> None:
result = runner.invoke(app, ["logout", "openrouter"])
assert result.exit_code == 0
assert "keyring" in result.output or "없습니다" in result.output or "not_found" in result.output
def test_keys_shows_entry_after_login(fake_keyring: _FakeKeyring) -> None:
runner.invoke(app, ["login", "openrouter"], input="sk-or-v1-abcdefgh1234\n")
result = runner.invoke(app, ["keys"])
assert result.exit_code == 0
assert "openrouter" in result.output
assert "sk-or-v1" in result.output
def test_init_governance_declined_exits_one(
fake_keyring: _FakeKeyring, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
import my_deepagent.governance as gov_module
monkeypatch.setattr(gov_module, "has_consent", lambda _: False)
# Input: decline governance
result = runner.invoke(app, ["init"], input="no\n")
assert result.exit_code == 1
def test_init_governance_accepted_saves_key(
fake_keyring: _FakeKeyring, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
import sys
import my_deepagent.cli.doctor as doctor_module
import my_deepagent.cli.init as init_module
import my_deepagent.governance as gov_module
recorded: list[Path] = []
def fake_record_consent(data_dir: Path) -> None:
recorded.append(data_dir)
monkeypatch.setattr(gov_module, "has_consent", lambda _: False)
monkeypatch.setattr(init_module, "record_consent", fake_record_consent)
# Ensure Python version check passes
monkeypatch.setattr(sys, "version_info", (3, 12, 0, "final", 0))
# doctor_command() is called inside init — patch its async sub-checks so it
# completes without network / DB access and passes governance in doctor's namespace.
monkeypatch.setattr(doctor_module, "has_consent", lambda _: True)
monkeypatch.setattr(
doctor_module,
"_check_openrouter_api_key",
lambda cfg: doctor_module.CheckResult("openrouter_api_key", "warn", "mocked"),
)
async def _fake_ping(cfg: object) -> doctor_module.CheckResult:
return doctor_module.CheckResult("openrouter_ping", "warn", "mocked")
async def _fake_disk(cfg: object) -> doctor_module.CheckResult:
return doctor_module.CheckResult("disk+db", "ok", "free=99.9GB, sqlite_integrity=ok")
monkeypatch.setattr(doctor_module, "_check_openrouter_ping_and_upsert", _fake_ping)
monkeypatch.setattr(doctor_module, "_check_disk_and_db", _fake_disk)
# Input: accept governance, then provide API key
result = runner.invoke(app, ["init"], input="yes\nsk-or-init-test\n")
assert result.exit_code == 0
assert len(recorded) == 1
assert fake_keyring.store.get(("my-deepagent", "openrouter_api_key")) == "sk-or-init-test"

View File

@@ -0,0 +1,232 @@
"""Unit tests for `mydeepagent runs list / show / resume` CLI commands."""
from __future__ import annotations
import asyncio
import uuid
from pathlib import Path
from unittest.mock import MagicMock, patch
from typer.testing import CliRunner
from my_deepagent.cli.main import app
from my_deepagent.enums import RunState
from my_deepagent.persistence.db import Database
from my_deepagent.persistence.models import RunRow, WorkflowTemplateRow
runner = CliRunner()
_NOW = "2026-05-14T00:00:00+00:00"
def _make_id() -> str:
return str(uuid.uuid4())
def _template_row(template_id: str) -> WorkflowTemplateRow:
return WorkflowTemplateRow(
id=template_id,
name="test-wf",
version=1,
hash=template_id,
definition={},
created_at=_NOW,
)
def _run_row(
*,
run_id: str | None = None,
template_id: str,
state: str = RunState.COMPLETED.value,
repo_path: str = "/my/repo",
base_branch: str = "main",
) -> RunRow:
rid = run_id or _make_id()
return RunRow(
id=rid,
template_id=template_id,
template_hash="a" * 64,
state=state,
repo_path=repo_path,
base_branch=base_branch,
worktree_root="/wt",
created_at=_NOW,
updated_at=_NOW,
)
# ---------------------------------------------------------------------------
# Helpers: set up in-memory DB and patch load_config + Database
# ---------------------------------------------------------------------------
def _setup_db_with_run(
tmp_path: Path,
state: str = RunState.COMPLETED.value,
repo_path: str = "/my/repo",
) -> tuple[str, str]:
"""Create a fresh DB with one run. Returns (db_url, run_id)."""
db_url = f"sqlite+aiosqlite:///{tmp_path / 'test.db'}"
async def _init() -> str:
db = Database(db_url)
await db.init_schema()
tid = _make_id()
run_id = _make_id()
async with db.session() as s:
s.add(_template_row(tid))
async with db.session() as s:
s.add(
_run_row(
run_id=run_id,
template_id=tid,
state=state,
repo_path=repo_path,
)
)
await db.dispose()
return run_id
return db_url, asyncio.run(_init())
def _setup_empty_db(tmp_path: Path) -> str:
"""Create a fresh empty DB. Returns db_url."""
db_url = f"sqlite+aiosqlite:///{tmp_path / 'empty.db'}"
async def _init() -> None:
db = Database(db_url)
await db.init_schema()
await db.dispose()
asyncio.run(_init())
return db_url
# ---------------------------------------------------------------------------
# Tests: runs list
# ---------------------------------------------------------------------------
def test_runs_list_empty_db(tmp_path: Path) -> None:
"""``runs list`` on empty DB prints '(no runs)'."""
db_url = _setup_empty_db(tmp_path)
with patch("my_deepagent.cli.runs.load_config") as mock_cfg:
mock_cfg.return_value = MagicMock(database_url=db_url)
result = runner.invoke(app, ["runs", "list"])
assert result.exit_code == 0, result.output
assert "(no runs)" in result.output
def test_runs_list_with_one_run(tmp_path: Path) -> None:
"""``runs list`` shows a table row when one run exists."""
db_url, run_id = _setup_db_with_run(tmp_path)
with patch("my_deepagent.cli.runs.load_config") as mock_cfg:
mock_cfg.return_value = MagicMock(database_url=db_url)
result = runner.invoke(app, ["runs", "list"])
assert result.exit_code == 0, result.output
# Table should contain the first 8 chars of the run_id and the state.
assert run_id[:8] in result.output
assert RunState.COMPLETED.value in result.output
def test_runs_list_state_filter(tmp_path: Path) -> None:
"""``runs list --state completed`` only shows completed runs."""
db_url, _run_id = _setup_db_with_run(tmp_path, state=RunState.COMPLETED.value)
with patch("my_deepagent.cli.runs.load_config") as mock_cfg:
mock_cfg.return_value = MagicMock(database_url=db_url)
# Filter for failed → should return nothing.
result = runner.invoke(app, ["runs", "list", "--state", "failed"])
assert result.exit_code == 0, result.output
assert "(no runs)" in result.output
# ---------------------------------------------------------------------------
# Tests: runs show
# ---------------------------------------------------------------------------
def test_runs_show_unknown_run_id(tmp_path: Path) -> None:
"""``runs show <unknown>`` exits with code 1."""
db_url = _setup_empty_db(tmp_path)
fake_id = _make_id()
with patch("my_deepagent.cli.runs.load_config") as mock_cfg:
mock_cfg.return_value = MagicMock(database_url=db_url)
result = runner.invoke(app, ["runs", "show", fake_id])
assert result.exit_code == 1
def test_runs_show_with_full_id(tmp_path: Path) -> None:
"""``runs show <full-uuid>`` displays run details."""
db_url, run_id = _setup_db_with_run(tmp_path)
with patch("my_deepagent.cli.runs.load_config") as mock_cfg:
mock_cfg.return_value = MagicMock(database_url=db_url)
result = runner.invoke(app, ["runs", "show", run_id])
assert result.exit_code == 0, result.output
assert run_id in result.output
assert RunState.COMPLETED.value in result.output
def test_runs_show_with_prefix(tmp_path: Path) -> None:
"""``runs show <6+ char prefix>`` resolves to the correct run."""
db_url, run_id = _setup_db_with_run(tmp_path)
prefix = run_id[:8]
with patch("my_deepagent.cli.runs.load_config") as mock_cfg:
mock_cfg.return_value = MagicMock(database_url=db_url)
result = runner.invoke(app, ["runs", "show", prefix])
assert result.exit_code == 0, result.output
assert run_id in result.output
# ---------------------------------------------------------------------------
# Tests: runs resume
# ---------------------------------------------------------------------------
def test_runs_resume_completed_run_exits_one(tmp_path: Path) -> None:
"""``runs resume`` on a completed run exits 1 and says 'already terminal'."""
db_url, run_id = _setup_db_with_run(tmp_path, state=RunState.COMPLETED.value)
with patch("my_deepagent.cli.runs.load_config") as mock_cfg:
mock_cfg.return_value = MagicMock(database_url=db_url)
result = runner.invoke(app, ["runs", "resume", run_id])
assert result.exit_code == 1
assert "already terminal" in result.output
def test_runs_resume_failed_run_exits_one(tmp_path: Path) -> None:
"""``runs resume`` on a failed run exits 1 and says 'already terminal'."""
db_url, run_id = _setup_db_with_run(tmp_path, state=RunState.FAILED.value)
with patch("my_deepagent.cli.runs.load_config") as mock_cfg:
mock_cfg.return_value = MagicMock(database_url=db_url)
result = runner.invoke(app, ["runs", "resume", run_id])
assert result.exit_code == 1
assert "already terminal" in result.output
def test_runs_resume_unknown_id_exits_one(tmp_path: Path) -> None:
"""``runs resume <unknown>`` exits 1."""
db_url = _setup_empty_db(tmp_path)
fake_id = _make_id()
with patch("my_deepagent.cli.runs.load_config") as mock_cfg:
mock_cfg.return_value = MagicMock(database_url=db_url)
result = runner.invoke(app, ["runs", "resume", fake_id])
assert result.exit_code == 1

View File

@@ -53,7 +53,7 @@ def test_default_persona(monkeypatch: pytest.MonkeyPatch) -> None:
def test_default_openrouter_api_key_is_none(monkeypatch: pytest.MonkeyPatch) -> None:
_clear_env(monkeypatch)
# _env_file=None bypasses any .env that may exist in the cwd (e.g. dev keys).
cfg = Config(_env_file=None) # type: ignore[call-arg]
cfg = Config(_env_file=None)
assert cfg.openrouter_api_key is None

View File

@@ -0,0 +1,149 @@
"""Unit tests for src/my_deepagent/monitoring/cost_estimator.py."""
from __future__ import annotations
from unittest.mock import MagicMock
import pytest
from my_deepagent.monitoring.cost_estimator import (
_DEFAULT_INPUT_TOKENS,
_DEFAULT_OUTPUT_TOKENS,
PhaseCostEstimate,
WorkflowCostEstimate,
estimate_phase,
estimate_workflow,
)
from my_deepagent.monitoring.pricing import ModelPrice, PricingCache
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_pricing(model: str = "anthropic/claude-sonnet-4-6") -> PricingCache:
cache = PricingCache()
cache.set(
[
ModelPrice(
model=model,
input_per_1k_usd=0.003,
output_per_1k_usd=0.015,
context_length=200000,
)
]
)
return cache
def _make_persona(
model: str = "anthropic/claude-sonnet-4-6",
max_tokens: int | None = None,
) -> object:
p = MagicMock()
p.name = "test-persona"
p.version = 1
p.model = model
p.model_params = {"max_tokens": max_tokens} if max_tokens else {}
return p
def _make_phase(key: str = "spec") -> MagicMock:
phase = MagicMock()
phase.key = key
return phase
def _make_binding(persona: object) -> MagicMock:
b = MagicMock()
b.persona = persona
return b
# ---------------------------------------------------------------------------
# estimate_phase
# ---------------------------------------------------------------------------
def test_estimate_phase_known_model_correct_cost() -> None:
pricing = _make_pricing("anthropic/claude-sonnet-4-6")
persona = _make_persona("anthropic/claude-sonnet-4-6")
phase = _make_phase("spec")
est = estimate_phase(phase, persona, pricing) # type: ignore[arg-type]
expected_cost = _DEFAULT_INPUT_TOKENS / 1000.0 * 0.003 + _DEFAULT_OUTPUT_TOKENS / 1000.0 * 0.015
assert isinstance(est, PhaseCostEstimate)
assert est.phase_key == "spec"
assert est.persona_name == "test-persona@1"
assert est.model == "anthropic/claude-sonnet-4-6"
assert est.estimated_input_tokens == _DEFAULT_INPUT_TOKENS
assert est.estimated_output_tokens == _DEFAULT_OUTPUT_TOKENS
assert est.estimated_cost_usd == pytest.approx(expected_cost)
def test_estimate_phase_unknown_model_returns_zero_cost() -> None:
pricing = PricingCache() # empty
persona = _make_persona("unknown/model-xyz")
phase = _make_phase("unknown_phase")
est = estimate_phase(phase, persona, pricing) # type: ignore[arg-type]
assert est.estimated_cost_usd == 0.0
def test_estimate_phase_max_tokens_override() -> None:
pricing = _make_pricing()
persona = _make_persona(max_tokens=2000)
phase = _make_phase()
est = estimate_phase(phase, persona, pricing) # type: ignore[arg-type]
assert est.estimated_output_tokens == 2000
def test_estimate_phase_default_output_tokens_when_no_max_tokens() -> None:
pricing = _make_pricing()
persona = _make_persona() # no max_tokens
phase = _make_phase()
est = estimate_phase(phase, persona, pricing) # type: ignore[arg-type]
assert est.estimated_output_tokens == _DEFAULT_OUTPUT_TOKENS
# ---------------------------------------------------------------------------
# estimate_workflow
# ---------------------------------------------------------------------------
def test_estimate_workflow_sums_phases() -> None:
pricing = _make_pricing()
phase1 = _make_phase("phase1")
phase1.role = "researcher"
phase2 = _make_phase("phase2")
phase2.role = "reviewer"
template = MagicMock()
template.phases = [phase1, phase2]
persona1 = _make_persona()
persona2 = _make_persona()
bindings = {
"researcher": _make_binding(persona1),
"reviewer": _make_binding(persona2),
}
est = estimate_workflow(template, bindings, pricing) # type: ignore[arg-type]
assert isinstance(est, WorkflowCostEstimate)
assert len(est.phases) == 2
assert est.total_usd == pytest.approx(sum(p.estimated_cost_usd for p in est.phases))
assert est.total_usd > 0.0
def test_estimate_workflow_total_greater_than_zero_with_known_models() -> None:
pricing = _make_pricing()
phase = _make_phase("spec")
phase.role = "researcher"
template = MagicMock()
template.phases = [phase]
persona = _make_persona()
bindings = {"researcher": _make_binding(persona)}
est = estimate_workflow(template, bindings, pricing) # type: ignore[arg-type]
assert est.total_usd > 0.0

View File

@@ -0,0 +1,355 @@
"""Unit tests for mydeepagent doctor — 8-check full diagnostic suite."""
from __future__ import annotations
import shutil
import subprocess
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import httpx
import pytest
from my_deepagent.cli.doctor import (
_check_config_and_governance,
_check_disk_and_db,
_check_git,
_check_openrouter_api_key,
_check_openrouter_ping_and_upsert,
_check_python,
_check_uv,
_check_workspace,
)
from my_deepagent.errors import MyDeepAgentError
# ---------------------------------------------------------------------------
# 1. _check_python
# ---------------------------------------------------------------------------
def test_check_python_ok_in_312(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(sys, "version_info", (3, 12, 0, "final", 0))
monkeypatch.setattr(sys, "version", "3.12.0 (default, ...)")
result = _check_python()
assert result.status == "ok"
assert result.name == "python"
assert "3.12.0" in result.detail
def test_check_python_ok_in_313(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(sys, "version_info", (3, 13, 0, "final", 0))
monkeypatch.setattr(sys, "version", "3.13.0 (default, ...)")
result = _check_python()
assert result.status == "ok"
def test_check_python_fail_in_310(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(sys, "version_info", (3, 10, 0, "final", 0))
monkeypatch.setattr(sys, "version", "3.10.0 (default, ...)")
result = _check_python()
assert result.status == "fail"
assert "3.10.0" in result.detail
def test_check_python_fail_in_314(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(sys, "version_info", (3, 14, 0, "alpha", 0))
monkeypatch.setattr(sys, "version", "3.14.0a1 (default, ...)")
result = _check_python()
assert result.status == "fail"
# ---------------------------------------------------------------------------
# 2. _check_uv
# ---------------------------------------------------------------------------
def test_check_uv_warn_when_missing(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(shutil, "which", lambda _: None)
result = _check_uv()
assert result.status == "warn"
assert "not on PATH" in result.detail
def test_check_uv_ok_when_present(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(shutil, "which", lambda _: "/usr/local/bin/uv")
fake_run = MagicMock()
fake_run.return_value.stdout = "uv 0.5.0"
monkeypatch.setattr(subprocess, "run", fake_run)
result = _check_uv()
assert result.status == "ok"
assert "uv 0.5.0" in result.detail
def test_check_uv_warn_on_timeout(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(shutil, "which", lambda _: "/usr/local/bin/uv")
monkeypatch.setattr(
subprocess,
"run",
MagicMock(side_effect=subprocess.TimeoutExpired(["uv"], 5)),
)
result = _check_uv()
assert result.status == "warn"
assert "version probe failed" in result.detail
# ---------------------------------------------------------------------------
# 3. _check_git
# ---------------------------------------------------------------------------
def test_check_git_warn_when_missing(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(shutil, "which", lambda _: None)
result = _check_git()
assert result.status == "warn"
assert "not on PATH" in result.detail
def test_check_git_ok_when_present(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(shutil, "which", lambda _: "/usr/bin/git")
fake_run = MagicMock()
fake_run.return_value.stdout = "git version 2.40.0"
monkeypatch.setattr(subprocess, "run", fake_run)
result = _check_git()
assert result.status == "ok"
assert "2.40.0" in result.detail
# ---------------------------------------------------------------------------
# 4. _check_workspace
# ---------------------------------------------------------------------------
def test_check_workspace_ok_when_writable(tmp_path: Path) -> None:
cfg = MagicMock()
cfg.workspace_root = tmp_path
result = _check_workspace(cfg)
assert result.status == "ok"
assert str(tmp_path) in result.detail
def test_check_workspace_creates_if_missing(tmp_path: Path) -> None:
new_dir = tmp_path / "new_workspace"
cfg = MagicMock()
cfg.workspace_root = new_dir
result = _check_workspace(cfg)
assert result.status == "ok"
assert new_dir.exists()
def test_check_workspace_fail_if_not_writable(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
cfg = MagicMock()
cfg.workspace_root = tmp_path
def _raise_oserror(self: object, data: str, **kwargs: object) -> None:
raise OSError("read-only filesystem")
monkeypatch.setattr(Path, "write_text", _raise_oserror)
result = _check_workspace(cfg)
assert result.status == "fail"
assert "not writable" in result.detail
# ---------------------------------------------------------------------------
# 5. _check_config_and_governance
# ---------------------------------------------------------------------------
def test_check_governance_fail_without_consent(monkeypatch: pytest.MonkeyPatch) -> None:
import my_deepagent.cli.doctor as doctor_module
monkeypatch.setattr(doctor_module, "has_consent", lambda _: False)
cfg = MagicMock()
result = _check_config_and_governance(cfg)
assert result.status == "fail"
assert "mydeepagent init" in result.detail
def test_check_governance_ok_with_consent(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
import my_deepagent.cli.doctor as doctor_module
monkeypatch.setattr(doctor_module, "has_consent", lambda _: True)
cfg = MagicMock()
cfg.data_dir = tmp_path
result = _check_config_and_governance(cfg)
assert result.status == "ok"
assert str(tmp_path) in result.detail
# ---------------------------------------------------------------------------
# 6. _check_openrouter_api_key
# ---------------------------------------------------------------------------
def test_check_openrouter_api_key_ok(monkeypatch: pytest.MonkeyPatch) -> None:
import my_deepagent.cli.doctor as doctor_module
api_key = "sk-or-test-1234"
monkeypatch.setattr(doctor_module, "resolve_openrouter_api_key", lambda cfg: api_key)
cfg = MagicMock()
result = _check_openrouter_api_key(cfg)
assert result.status == "ok"
assert str(len(api_key)) in result.detail # "15 chars"
def test_check_openrouter_api_key_fail(monkeypatch: pytest.MonkeyPatch) -> None:
import my_deepagent.cli.doctor as doctor_module
def _raise(cfg: object) -> str:
raise MyDeepAgentError.human_required(
"backend_auth_failed",
message="missing",
recovery_hint="run login",
)
monkeypatch.setattr(doctor_module, "resolve_openrouter_api_key", _raise)
cfg = MagicMock()
result = _check_openrouter_api_key(cfg)
assert result.status == "fail"
assert "run login" in result.detail
# ---------------------------------------------------------------------------
# 7. _check_openrouter_ping_and_upsert (async)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_check_openrouter_ping_warn_no_key(monkeypatch: pytest.MonkeyPatch) -> None:
import my_deepagent.cli.doctor as doctor_module
def _raise(cfg: object) -> str:
raise MyDeepAgentError.human_required("backend_auth_failed", message="missing")
monkeypatch.setattr(doctor_module, "resolve_openrouter_api_key", _raise)
cfg = MagicMock()
result = await _check_openrouter_ping_and_upsert(cfg)
assert result.status == "warn"
assert "skipped" in result.detail
@pytest.mark.asyncio
async def test_check_openrouter_ping_ok(monkeypatch: pytest.MonkeyPatch) -> None:
import my_deepagent.cli.doctor as doctor_module
from my_deepagent.monitoring.pricing import ModelPrice
monkeypatch.setattr(doctor_module, "resolve_openrouter_api_key", lambda cfg: "sk-test")
fake_prices = [
ModelPrice("model/a", 1.0, 2.0, 4096),
ModelPrice("model/b", 0.5, 1.0, 8192),
]
monkeypatch.setattr(
doctor_module,
"fetch_openrouter_pricing",
AsyncMock(return_value=fake_prices),
)
monkeypatch.setattr(doctor_module, "_upsert_pricing", AsyncMock())
cfg = MagicMock()
result = await _check_openrouter_ping_and_upsert(cfg)
assert result.status == "ok"
assert "2 models" in result.detail
@pytest.mark.asyncio
async def test_check_openrouter_ping_fail_401(monkeypatch: pytest.MonkeyPatch) -> None:
import my_deepagent.cli.doctor as doctor_module
monkeypatch.setattr(doctor_module, "resolve_openrouter_api_key", lambda cfg: "sk-bad")
mock_response = MagicMock()
mock_response.status_code = 401
http_err = httpx.HTTPStatusError("401", request=MagicMock(), response=mock_response)
monkeypatch.setattr(
doctor_module,
"fetch_openrouter_pricing",
AsyncMock(side_effect=http_err),
)
cfg = MagicMock()
result = await _check_openrouter_ping_and_upsert(cfg)
assert result.status == "fail"
assert "401" in result.detail
@pytest.mark.asyncio
async def test_check_openrouter_ping_warn_5xx(monkeypatch: pytest.MonkeyPatch) -> None:
import my_deepagent.cli.doctor as doctor_module
monkeypatch.setattr(doctor_module, "resolve_openrouter_api_key", lambda cfg: "sk-ok")
mock_response = MagicMock()
mock_response.status_code = 503
http_err = httpx.HTTPStatusError("503", request=MagicMock(), response=mock_response)
monkeypatch.setattr(
doctor_module,
"fetch_openrouter_pricing",
AsyncMock(side_effect=http_err),
)
cfg = MagicMock()
result = await _check_openrouter_ping_and_upsert(cfg)
assert result.status == "warn"
assert "503" in result.detail
@pytest.mark.asyncio
async def test_check_openrouter_ping_warn_empty_response(
monkeypatch: pytest.MonkeyPatch,
) -> None:
import my_deepagent.cli.doctor as doctor_module
monkeypatch.setattr(doctor_module, "resolve_openrouter_api_key", lambda cfg: "sk-ok")
monkeypatch.setattr(
doctor_module,
"fetch_openrouter_pricing",
AsyncMock(return_value=[]),
)
cfg = MagicMock()
result = await _check_openrouter_ping_and_upsert(cfg)
assert result.status == "warn"
assert "no models" in result.detail
# ---------------------------------------------------------------------------
# 8. _check_disk_and_db (async)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_check_disk_and_db_ok(tmp_path: Path) -> None:
cfg = MagicMock()
cfg.workspace_root = tmp_path
cfg.database_url = f"sqlite+aiosqlite:///{tmp_path}/test.sqlite3"
result = await _check_disk_and_db(cfg)
# Should be ok or warn depending on actual free space — never fail in tmp
assert result.status in ("ok", "warn")
assert "sqlite_integrity=ok" in result.detail
@pytest.mark.asyncio
async def test_check_disk_and_db_warn_low_disk(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
# Simulate 5 GB free (warn zone: 2GB <= free < 10GB)
class _FakeUsage:
free: int = 5 * 1024**3
total: int = 100 * 1024**3
used: int = 95 * 1024**3
monkeypatch.setattr(shutil, "disk_usage", lambda _: _FakeUsage())
cfg = MagicMock()
cfg.workspace_root = tmp_path
cfg.database_url = f"sqlite+aiosqlite:///{tmp_path}/test.sqlite3"
result = await _check_disk_and_db(cfg)
assert result.status == "warn"
assert "5.0GB" in result.detail

View File

@@ -0,0 +1,126 @@
"""Unit tests for WorkflowEngine SIGTERM/SIGINT graceful shutdown handlers."""
from __future__ import annotations
import asyncio
import signal
from pathlib import Path
from typing import Any
import pytest
from my_deepagent.artifact_schema import ArtifactSchemaRegistry
from my_deepagent.binding import BackendAvailability, PersonaConsentStore
from my_deepagent.config import load_config
from my_deepagent.engine import WorkflowEngine
from my_deepagent.enums import Backend
from my_deepagent.persistence.db import Database
from my_deepagent.persona import load_personas_from_dir
_DOCS = Path(__file__).resolve().parents[2] / "docs" / "schemas"
_ARTIFACTS_ROOT = _DOCS / "artifacts"
def _make_engine(tmp_path: Path) -> WorkflowEngine:
cfg = load_config(
workspace_root=tmp_path,
data_dir=tmp_path / "data",
database_url=f"sqlite+aiosqlite:///{tmp_path / 'test.sqlite3'}",
)
personas = load_personas_from_dir(_DOCS / "personas")
registry = ArtifactSchemaRegistry(roots=[_ARTIFACTS_ROOT])
consent_store = PersonaConsentStore(tmp_path / "consents.json")
available_backends = BackendAvailability(available_backends=frozenset(Backend))
async def _dummy_approval(payload: dict[str, Any], gates: list[str]) -> Any:
raise NotImplementedError("approval not used in signal tests")
db = Database(cfg.database_url)
return WorkflowEngine(
db=db,
config=cfg,
persona_pool=personas,
artifact_registry=registry,
consent_store=consent_store,
available_backends=available_backends,
approval_callback=_dummy_approval,
)
@pytest.mark.asyncio
async def test_shutdown_requested_false_initially(tmp_path: Path) -> None:
"""Engine starts with shutdown_requested == False."""
engine = _make_engine(tmp_path)
assert engine.shutdown_requested is False
@pytest.mark.asyncio
async def test_on_signal_sets_shutdown_event(tmp_path: Path) -> None:
"""Calling _on_signal directly sets shutdown_requested to True."""
engine = _make_engine(tmp_path)
assert engine.shutdown_requested is False
engine._on_signal(signal.SIGTERM)
assert engine.shutdown_requested is True
@pytest.mark.asyncio
async def test_install_signal_handlers_registers_sigterm(tmp_path: Path) -> None:
"""install_signal_handlers registers a SIGTERM handler on the running loop."""
engine = _make_engine(tmp_path)
async def _check() -> None:
engine.install_signal_handlers()
loop = asyncio.get_running_loop()
# asyncio loop stores handlers in the private _signal_handlers dict (CPython impl).
# We accept both: the private dict exists, or signal.getsignal returns our callable.
# The private dict is preferred but may not exist on all platforms.
handlers = getattr(loop, "_signal_handlers", {})
if handlers:
assert signal.SIGTERM in handlers, "SIGTERM not registered in loop._signal_handlers"
else:
# Fallback: just verify shutdown_requested works when _on_signal is called.
engine._on_signal(signal.SIGTERM)
assert engine.shutdown_requested is True
await _check()
@pytest.mark.asyncio
async def test_force_cancel_inflight_cancels_pending_tasks(tmp_path: Path) -> None:
"""_force_cancel_inflight cancels all tasks in _inflight_tasks that are not done."""
engine = _make_engine(tmp_path)
async def _long_running() -> None:
await asyncio.sleep(1000)
task: asyncio.Task[None] = asyncio.create_task(_long_running())
engine._inflight_tasks.add(task)
# Give the event loop a tick to start the task.
await asyncio.sleep(0)
assert not task.done()
engine._force_cancel_inflight()
# Give the event loop a tick to process the cancellation.
await asyncio.sleep(0)
assert task.cancelled()
@pytest.mark.asyncio
async def test_force_cancel_inflight_skips_done_tasks(tmp_path: Path) -> None:
"""_force_cancel_inflight does not call cancel() on already-done tasks."""
engine = _make_engine(tmp_path)
async def _instant() -> str:
return "done"
task: asyncio.Task[str] = asyncio.create_task(_instant())
await asyncio.sleep(0) # let the task complete
assert task.done()
engine._inflight_tasks.add(task)
# Should not raise; done tasks are skipped.
engine._force_cancel_inflight()
# Still done, not newly cancelled.
assert task.done()
assert not task.cancelled()

View File

@@ -20,28 +20,28 @@ from my_deepagent.enums import (
def test_backend_openrouter_value() -> None:
assert Backend.OPENROUTER == "openrouter"
assert Backend.OPENROUTER == "openrouter" # type: ignore[comparison-overlap]
def test_backend_anthropic_value() -> None:
assert Backend.ANTHROPIC == "anthropic"
assert Backend.ANTHROPIC == "anthropic" # type: ignore[comparison-overlap]
def test_backend_openai_value() -> None:
assert Backend.OPENAI == "openai"
assert Backend.OPENAI == "openai" # type: ignore[comparison-overlap]
def test_backend_google_value() -> None:
assert Backend.GOOGLE == "google"
assert Backend.GOOGLE == "google" # type: ignore[comparison-overlap]
def test_backend_fake_value() -> None:
assert Backend.FAKE == "fake"
assert Backend.FAKE == "fake" # type: ignore[comparison-overlap]
def test_backend_str_equality() -> None:
# StrEnum members compare equal to their string values
assert Backend.OPENROUTER == "openrouter"
assert Backend.OPENROUTER == "openrouter" # type: ignore[comparison-overlap]
assert str(Backend.OPENROUTER) == "openrouter"
@@ -55,15 +55,15 @@ def test_capability_count() -> None:
def test_capability_spec_write() -> None:
assert Capability.SPEC_WRITE == "spec_write"
assert Capability.SPEC_WRITE == "spec_write" # type: ignore[comparison-overlap]
def test_capability_code_edit() -> None:
assert Capability.CODE_EDIT == "code_edit"
assert Capability.CODE_EDIT == "code_edit" # type: ignore[comparison-overlap]
def test_capability_final_report_compose() -> None:
assert Capability.FINAL_REPORT_COMPOSE == "final_report_compose"
assert Capability.FINAL_REPORT_COMPOSE == "final_report_compose" # type: ignore[comparison-overlap]
def test_capability_all_are_str() -> None:
@@ -77,9 +77,9 @@ def test_capability_all_are_str() -> None:
def test_risk_level_values() -> None:
assert RiskLevel.LOW == "low"
assert RiskLevel.MEDIUM == "medium"
assert RiskLevel.HIGH == "high"
assert RiskLevel.LOW == "low" # type: ignore[comparison-overlap]
assert RiskLevel.MEDIUM == "medium" # type: ignore[comparison-overlap]
assert RiskLevel.HIGH == "high" # type: ignore[comparison-overlap]
# ---------------------------------------------------------------------------
@@ -88,19 +88,19 @@ def test_risk_level_values() -> None:
def test_approval_decision_action_approve() -> None:
assert ApprovalDecisionAction.APPROVE == "approve"
assert ApprovalDecisionAction.APPROVE == "approve" # type: ignore[comparison-overlap]
def test_approval_decision_action_reject() -> None:
assert ApprovalDecisionAction.REJECT == "reject"
assert ApprovalDecisionAction.REJECT == "reject" # type: ignore[comparison-overlap]
def test_approval_decision_action_request_changes() -> None:
assert ApprovalDecisionAction.REQUEST_CHANGES == "request_changes"
assert ApprovalDecisionAction.REQUEST_CHANGES == "request_changes" # type: ignore[comparison-overlap]
def test_approval_decision_action_abort() -> None:
assert ApprovalDecisionAction.ABORT == "abort"
assert ApprovalDecisionAction.ABORT == "abort" # type: ignore[comparison-overlap]
# ---------------------------------------------------------------------------
@@ -196,15 +196,15 @@ def test_session_state_count() -> None:
def test_error_class_recoverable() -> None:
assert ErrorClass.RECOVERABLE == "recoverable"
assert ErrorClass.RECOVERABLE == "recoverable" # type: ignore[comparison-overlap]
def test_error_class_human_required() -> None:
assert ErrorClass.HUMAN_REQUIRED == "human_required"
assert ErrorClass.HUMAN_REQUIRED == "human_required" # type: ignore[comparison-overlap]
def test_error_class_fatal() -> None:
assert ErrorClass.FATAL == "fatal"
assert ErrorClass.FATAL == "fatal" # type: ignore[comparison-overlap]
def test_error_class_count() -> None:
@@ -223,7 +223,7 @@ def test_str_enum_from_value() -> None:
def test_str_enum_in_dict() -> None:
# StrEnum should work as dict key and compare with string
d = {Backend.OPENROUTER: "openrouter backend"}
assert d["openrouter"] == "openrouter backend"
assert d["openrouter"] == "openrouter backend" # type: ignore[index]
@pytest.mark.parametrize(

View File

@@ -0,0 +1,53 @@
"""Unit tests for _expand_file_refs in cli/interactive.py."""
from __future__ import annotations
from pathlib import Path
import pytest
from my_deepagent.cli.interactive import _expand_file_refs
@pytest.fixture
def tmp_repo(tmp_path: Path) -> Path:
"""Create a minimal repo root with one sample file."""
(tmp_path / "foo.py").write_text("x = 1\n", encoding="utf-8")
return tmp_path
def test_expand_existing_file(tmp_repo: Path) -> None:
expanded = _expand_file_refs("read @foo.py please", tmp_repo)
assert "```py" in expanded
assert "# foo.py" in expanded
assert "x = 1" in expanded
def test_expand_missing_file_unchanged(tmp_repo: Path) -> None:
original = "read @missing.py please"
expanded = _expand_file_refs(original, tmp_repo)
assert expanded == original
def test_expand_path_traversal_blocked(tmp_repo: Path) -> None:
# Create a file outside the repo root
outside = tmp_repo.parent / "secret.txt"
outside.write_text("secret", encoding="utf-8")
original = "read @../secret.txt"
expanded = _expand_file_refs(original, tmp_repo)
# The @ref should remain unexpanded (repo root escape)
assert "secret" not in expanded or "@../secret.txt" in expanded
def test_expand_multiple_refs(tmp_repo: Path) -> None:
(tmp_repo / "bar.ts").write_text("const y = 2;\n", encoding="utf-8")
expanded = _expand_file_refs("look at @foo.py and @bar.ts", tmp_repo)
assert "# foo.py" in expanded
assert "# bar.ts" in expanded
assert "x = 1" in expanded
assert "const y = 2" in expanded
def test_expand_no_at_signs_unchanged(tmp_repo: Path) -> None:
original = "plain text with no file refs"
assert _expand_file_refs(original, tmp_repo) == original

View File

@@ -0,0 +1,72 @@
"""Unit tests for src/my_deepagent/governance.py."""
from __future__ import annotations
import json
import os
import stat
from pathlib import Path
from unittest.mock import patch
import pytest
from my_deepagent.errors import MyDeepAgentError
from my_deepagent.governance import consent_path, has_consent, record_consent, require_consent
def test_has_consent_false_when_empty(tmp_path: Path) -> None:
assert has_consent(tmp_path) is False
def test_has_consent_true_after_record(tmp_path: Path) -> None:
record_consent(tmp_path)
assert has_consent(tmp_path) is True
def test_consent_file_path(tmp_path: Path) -> None:
expected = tmp_path / "governance-accepted.json"
assert consent_path(tmp_path) == expected
def test_record_consent_creates_valid_json(tmp_path: Path) -> None:
record_consent(tmp_path)
content = consent_path(tmp_path).read_text()
data = json.loads(content)
assert "accepted_at" in data
assert "T" in data["accepted_at"] # ISO format
def test_record_consent_file_mode_600(tmp_path: Path) -> None:
record_consent(tmp_path)
file_stat = consent_path(tmp_path).stat()
mode = stat.S_IMODE(file_stat.st_mode)
assert mode == 0o600
def test_record_consent_atomic_uses_os_replace(tmp_path: Path) -> None:
replace_calls: list[tuple[object, object]] = []
original_replace = os.replace
def spy_replace(src: object, dst: object) -> None:
replace_calls.append((src, dst))
original_replace(src, dst) # type: ignore[arg-type]
with patch("my_deepagent.governance.os.replace", spy_replace):
record_consent(tmp_path)
assert len(replace_calls) == 1
src_path, dst_path = replace_calls[0]
assert str(src_path).endswith(".tmp")
assert str(dst_path) == str(consent_path(tmp_path))
def test_require_consent_raises_when_no_consent(tmp_path: Path) -> None:
with pytest.raises(MyDeepAgentError) as exc_info:
require_consent(tmp_path)
assert exc_info.value.code == "governance_not_accepted"
def test_require_consent_passes_when_consent_exists(tmp_path: Path) -> None:
record_consent(tmp_path)
require_consent(tmp_path) # should not raise

View File

@@ -0,0 +1,67 @@
"""Unit tests for src/my_deepagent/i18n/__init__.py."""
from __future__ import annotations
import pytest
from my_deepagent.i18n import _load, resolve_lang, t
def test_t_welcome_default_ko() -> None:
result = t("init.welcome")
assert "my-deepagent" in result
assert "환영합니다" in result
def test_t_welcome_en() -> None:
result = t("init.welcome", lang="en")
assert "Welcome" in result
def test_t_format_provider() -> None:
result = t("login.saved", provider="openrouter")
assert "openrouter" in result
def test_t_missing_key_returns_key_itself() -> None:
result = t("nonexistent.missing_key")
assert result == "nonexistent.missing_key"
def test_t_missing_section_returns_key_itself() -> None:
result = t("no_such_section.key")
assert result == "no_such_section.key"
def test_resolve_lang_env_en(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("MYDEEPAGENT_LANG", "en")
assert resolve_lang() == "en"
def test_resolve_lang_env_ko(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("MYDEEPAGENT_LANG", "ko")
assert resolve_lang() == "ko"
def test_resolve_lang_default_ko(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("MYDEEPAGENT_LANG", raising=False)
assert resolve_lang() == "ko"
def test_resolve_lang_invalid_env_falls_back_to_default(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("MYDEEPAGENT_LANG", "fr")
assert resolve_lang() == "ko"
def test_load_cache_same_instance() -> None:
_load.cache_clear()
first = _load("ko")
second = _load("ko")
assert first is second
def test_t_format_error_returns_template() -> None:
# If fmt keys don't match, returns raw template string not raising
result = t("login.saved", provider="openrouter")
assert isinstance(result, str)
assert len(result) > 0

View File

@@ -0,0 +1,72 @@
"""Unit tests for src/my_deepagent/keys.py. Uses a fake keyring backend."""
from __future__ import annotations
import pytest
import my_deepagent.keys as keys_module
from my_deepagent.keys import delete_api_key, get_api_key, mask, set_api_key
class _FakeKeyring:
def __init__(self) -> None:
self.store: dict[tuple[str, str], str] = {}
def get_password(self, service: str, username: str) -> str | None:
return self.store.get((service, username))
def set_password(self, service: str, username: str, value: str) -> None:
self.store[(service, username)] = value
def delete_password(self, service: str, username: str) -> None:
self.store.pop((service, username), None)
@pytest.fixture
def fake_keyring(monkeypatch: pytest.MonkeyPatch) -> _FakeKeyring:
fake = _FakeKeyring()
monkeypatch.setattr(keys_module.keyring, "get_password", fake.get_password)
monkeypatch.setattr(keys_module.keyring, "set_password", fake.set_password)
monkeypatch.setattr(keys_module.keyring, "delete_password", fake.delete_password)
return fake
def test_get_api_key_not_set_returns_none(fake_keyring: _FakeKeyring) -> None:
assert get_api_key("openrouter") is None
def test_set_and_get_api_key_round_trip(fake_keyring: _FakeKeyring) -> None:
set_api_key("openrouter", "sk-or-test-1234")
assert get_api_key("openrouter") == "sk-or-test-1234"
def test_delete_api_key_existing_returns_true(fake_keyring: _FakeKeyring) -> None:
set_api_key("openrouter", "sk-or-test")
assert delete_api_key("openrouter") is True
def test_delete_api_key_not_existing_returns_false(fake_keyring: _FakeKeyring) -> None:
assert delete_api_key("openrouter") is False
def test_delete_api_key_removes_value(fake_keyring: _FakeKeyring) -> None:
set_api_key("openrouter", "sk-or-test")
delete_api_key("openrouter")
assert get_api_key("openrouter") is None
def test_mask_long_key() -> None:
result = mask("sk-or-v1-abc1234567xyz9876")
assert result == "sk-or-v1...9876"
def test_mask_none_returns_not_set() -> None:
assert mask(None) == "(not set)"
def test_mask_short_key_returns_stars() -> None:
assert mask("short") == "***"
def test_mask_exactly_8_chars_returns_stars() -> None:
assert mask("12345678") == "***"

View File

@@ -0,0 +1,121 @@
"""Unit tests for src/my_deepagent/logging.py — secret scrubbing."""
from __future__ import annotations
from typing import Any
from my_deepagent.logging import _scrub_processor, scrub, scrub_value
_REDACTED = "[REDACTED]"
# ---------------------------------------------------------------------------
# scrub — individual patterns
# ---------------------------------------------------------------------------
def test_scrub_openrouter_key() -> None:
secret = "sk-or-v1-abc1234567890123456789xyz"
assert scrub(secret) == _REDACTED
def test_scrub_anthropic_key() -> None:
secret = "sk-ant-api03-abcdef1234567890abcdef1234567890xyz"
assert scrub(secret) == _REDACTED
def test_scrub_openai_project_key() -> None:
secret = "sk-proj-abcdefghijklmnopqrstuvwxyz12345"
assert scrub(secret) == _REDACTED
def test_scrub_openai_general_key() -> None:
# must be 30+ chars after "sk-"
secret = "sk-abcdefghijklmnopqrstuvwxyz1234567890"
assert scrub(secret) == _REDACTED
def test_scrub_github_pat() -> None:
secret = "ghp_abcdefghijklmnopqrstuvwxyz1234567890"
assert scrub(secret) == _REDACTED
def test_scrub_bearer_token() -> None:
text = "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.payload"
result = scrub(text)
assert _REDACTED in result
def test_scrub_plain_text_unchanged() -> None:
text = "normal log message with no secrets here"
assert scrub(text) == text
def test_scrub_partial_match_in_larger_string() -> None:
text = f"calling API with key=sk-ant-api03-{'x' * 30}"
result = scrub(text)
assert _REDACTED in result
assert "calling API with key=" in result
# ---------------------------------------------------------------------------
# scrub_value — recursive
# ---------------------------------------------------------------------------
def test_scrub_value_dict_scrubs_string_values() -> None:
secret = f"sk-or-v1-{'a' * 25}"
data: dict[str, Any] = {"key": secret, "n": 42}
result = scrub_value(data)
assert result["key"] == _REDACTED
assert result["n"] == 42
def test_scrub_value_list_scrubs_all_strings() -> None:
secret_ant = f"sk-ant-api03-{'b' * 30}"
secret_ghp = f"ghp_{'c' * 35}"
data: list[Any] = [1, secret_ant, {"k": secret_ghp}]
result = scrub_value(data)
assert result[0] == 1
assert result[1] == _REDACTED
assert result[2]["k"] == _REDACTED
def test_scrub_value_non_string_passes_through() -> None:
assert scrub_value(42) == 42
assert scrub_value(3.14) == 3.14
assert scrub_value(None) is None
assert scrub_value(True) is True
def test_scrub_value_tuple_scrubs_strings() -> None:
secret = f"sk-or-v1-{'d' * 22}"
result = scrub_value((secret, "safe"))
assert isinstance(result, tuple)
assert result[0] == _REDACTED
assert result[1] == "safe"
# ---------------------------------------------------------------------------
# _scrub_processor
# ---------------------------------------------------------------------------
def test_scrub_processor_scrubs_event_dict_values() -> None:
secret = f"sk-ant-api03-{'e' * 30}"
event_dict: dict[str, Any] = {
"event": "calling model",
"api_key": secret,
"model": "claude-3",
}
result = _scrub_processor(None, "info", event_dict)
assert result["api_key"] == _REDACTED
assert result["event"] == "calling model"
assert result["model"] == "claude-3"
def test_scrub_processor_returns_dict() -> None:
event_dict: dict[str, Any] = {"event": "no secrets here", "count": 5}
result = _scrub_processor(None, "debug", event_dict)
assert isinstance(result, dict)
assert result["count"] == 5

View File

@@ -47,7 +47,9 @@ def _minimal_persona_dict(**overrides: object) -> dict[str, object]:
def test_all_seed_personas_load() -> None:
personas = load_personas_from_dir(PERSONAS_DIR)
assert len(personas) == 10
# 10 original + 2 deepseek personas added for E2E (Anthropic-via-OpenRouter
# tool-call compatibility workaround); see CHANGELOG Step 15.
assert len(personas) == 12
def test_seed_persona_names_unique() -> None:

View File

@@ -20,7 +20,7 @@ from my_deepagent.monitoring.pricing import (
def test_parse_valid_payload_returns_model_prices() -> None:
data = {
data: dict[str, object] = {
"data": [
{
"id": "deepseek/deepseek-chat",
@@ -60,7 +60,7 @@ def test_parse_missing_data_key_returns_empty() -> None:
def test_parse_skips_entries_without_id() -> None:
data = {
data: dict[str, object] = {
"data": [
{"pricing": {"prompt": "0.000001", "completion": "0.000002"}, "context_length": 1000},
]
@@ -70,7 +70,7 @@ def test_parse_skips_entries_without_id() -> None:
def test_parse_skips_entries_with_invalid_pricing_values() -> None:
data = {
data: dict[str, object] = {
"data": [
{
"id": "model/x",
@@ -84,7 +84,7 @@ def test_parse_skips_entries_with_invalid_pricing_values() -> None:
def test_parse_handles_null_pricing_gracefully() -> None:
data = {
data: dict[str, object] = {
"data": [
{"id": "model/y", "pricing": None, "context_length": 0},
]
@@ -97,7 +97,7 @@ def test_parse_handles_null_pricing_gracefully() -> None:
def test_parse_handles_missing_context_length() -> None:
data = {
data: dict[str, object] = {
"data": [
{"id": "model/z", "pricing": {"prompt": "0.000001", "completion": "0.000002"}},
]
@@ -108,7 +108,7 @@ def test_parse_handles_missing_context_length() -> None:
def test_parse_non_dict_entry_is_skipped() -> None:
data = {"data": ["not-a-dict", None]}
data: dict[str, object] = {"data": ["not-a-dict", None]}
result = _parse_pricing_payload(data)
assert result == []

View File

@@ -0,0 +1,86 @@
"""Unit tests for src/my_deepagent/secrets.py."""
from __future__ import annotations
import pytest
import my_deepagent.keys as keys_module
from my_deepagent.config import load_config
from my_deepagent.errors import MyDeepAgentError
from my_deepagent.secrets import resolve_openrouter_api_key
class _FakeKeyring:
def __init__(self) -> None:
self.store: dict[tuple[str, str], str] = {}
def get_password(self, service: str, username: str) -> str | None:
return self.store.get((service, username))
def set_password(self, service: str, username: str, value: str) -> None:
self.store[(service, username)] = value
def delete_password(self, service: str, username: str) -> None:
self.store.pop((service, username), None)
@pytest.fixture
def fake_keyring(monkeypatch: pytest.MonkeyPatch) -> _FakeKeyring:
fake = _FakeKeyring()
monkeypatch.setattr(keys_module.keyring, "get_password", fake.get_password)
monkeypatch.setattr(keys_module.keyring, "set_password", fake.set_password)
monkeypatch.setattr(keys_module.keyring, "delete_password", fake.delete_password)
return fake
def test_resolves_from_config(fake_keyring: _FakeKeyring) -> None:
config = load_config(openrouter_api_key="sk-config-key")
result = resolve_openrouter_api_key(config)
assert result == "sk-config-key"
def test_resolves_from_mydeepagent_env(
monkeypatch: pytest.MonkeyPatch, fake_keyring: _FakeKeyring
) -> None:
monkeypatch.delenv("MYDEEPAGENT_OPENROUTER_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
monkeypatch.setenv("MYDEEPAGENT_OPENROUTER_API_KEY", "sk-env-mydeepagent")
config = load_config(openrouter_api_key=None)
assert resolve_openrouter_api_key(config) == "sk-env-mydeepagent"
def test_resolves_from_openrouter_env_fallback(
monkeypatch: pytest.MonkeyPatch, fake_keyring: _FakeKeyring
) -> None:
monkeypatch.delenv("MYDEEPAGENT_OPENROUTER_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-env-fallback")
config = load_config(openrouter_api_key=None)
assert resolve_openrouter_api_key(config) == "sk-env-fallback"
def test_resolves_from_keyring(monkeypatch: pytest.MonkeyPatch, fake_keyring: _FakeKeyring) -> None:
monkeypatch.delenv("MYDEEPAGENT_OPENROUTER_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
keys_module.set_api_key("openrouter", "sk-keyring-key")
config = load_config(openrouter_api_key=None)
assert resolve_openrouter_api_key(config) == "sk-keyring-key"
def test_raises_backend_auth_failed_when_all_missing(
monkeypatch: pytest.MonkeyPatch, fake_keyring: _FakeKeyring
) -> None:
monkeypatch.delenv("MYDEEPAGENT_OPENROUTER_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
config = load_config(openrouter_api_key=None)
with pytest.raises(MyDeepAgentError) as exc_info:
resolve_openrouter_api_key(config)
assert exc_info.value.code == "backend_auth_failed"
def test_config_takes_priority_over_env(
monkeypatch: pytest.MonkeyPatch, fake_keyring: _FakeKeyring
) -> None:
monkeypatch.setenv("MYDEEPAGENT_OPENROUTER_API_KEY", "sk-env-should-lose")
config = load_config(openrouter_api_key="sk-config-wins")
assert resolve_openrouter_api_key(config) == "sk-config-wins"

View File

@@ -62,7 +62,7 @@ def _minimal_permission_spec(
return FilesystemPermissionSpec(
operations=tuple(operations or ["read"]),
paths=tuple(paths or ["/**"]),
mode=mode, # type: ignore[arg-type]
mode=mode,
)
@@ -223,7 +223,10 @@ def test_subagent_to_dict_optional_tools_included_when_set() -> None:
sub = _minimal_subagent(allowed_tools=["read_file", "write_file"])
d = _subagent_to_dict(sub)
assert "tools" in d
assert d["tools"] == ["read_file", "write_file"]
# _subagent_to_dict serializes allowed_tools as a list[str]; SubAgent TypedDict
# widens the tools type to include BaseTool/Callable, hence the cast for mypy.
tools_list: list[Any] = list(d["tools"])
assert tools_list == ["read_file", "write_file"]
def test_subagent_to_dict_no_tools_key_when_empty() -> None:

View File

@@ -0,0 +1,129 @@
"""Unit tests for slash.py — parse_slash + SlashRegistry."""
from __future__ import annotations
import pytest
from my_deepagent.slash import SlashParsed, SlashRegistry, parse_slash
# ---------------------------------------------------------------------------
# parse_slash
# ---------------------------------------------------------------------------
def test_parse_quit() -> None:
result = parse_slash("/quit")
assert result is not None
assert result.name == "quit"
assert result.args == ()
assert result.raw == "quit"
def test_parse_agent_with_arg() -> None:
result = parse_slash("/agent code-reviewer")
assert result is not None
assert result.name == "agent"
assert result.args == ("code-reviewer",)
def test_parse_model_with_slash_in_arg() -> None:
result = parse_slash("/model anthropic/claude")
assert result is not None
assert result.name == "model"
assert result.args == ("anthropic/claude",)
def test_parse_plain_text_returns_none() -> None:
assert parse_slash("hello world") is None
def test_parse_empty_string_returns_none() -> None:
assert parse_slash("") is None
def test_parse_bare_slash_gives_empty_name() -> None:
result = parse_slash("/")
assert result is not None
assert result.name == ""
assert result.args == ()
assert result.raw == ""
def test_parse_uppercase_normalized_to_lower() -> None:
result = parse_slash("/QUIT")
assert result is not None
assert result.name == "quit"
def test_parse_spaced_slash_command() -> None:
result = parse_slash("/ spaced ")
# body after strip of "/ spaced " → body = "spaced" (strip on body)
assert result is not None
assert result.name == "spaced"
assert result.args == ()
# ---------------------------------------------------------------------------
# SlashRegistry
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_registry_register_and_dispatch_returns_handler_value() -> None:
reg = SlashRegistry()
calls: list[str] = []
async def handler(cmd: SlashParsed) -> bool:
calls.append(cmd.name)
return False
reg.register("foo", handler, help="test help")
result = await reg.dispatch(SlashParsed(name="foo", args=(), raw="foo"))
assert result is False
assert calls == ["foo"]
@pytest.mark.asyncio
async def test_registry_unknown_name_returns_false() -> None:
reg = SlashRegistry()
result = await reg.dispatch(SlashParsed(name="nonexistent", args=(), raw="nonexistent"))
assert result is False
@pytest.mark.asyncio
async def test_registry_handler_returning_true_propagates() -> None:
reg = SlashRegistry()
async def quit_handler(cmd: SlashParsed) -> bool:
return True
reg.register("quit", quit_handler, help="exit")
result = await reg.dispatch(SlashParsed(name="quit", args=(), raw="quit"))
assert result is True
def test_registry_names_sorted() -> None:
reg = SlashRegistry()
async def noop(cmd: SlashParsed) -> bool:
return False
reg.register("zebra", noop)
reg.register("apple", noop)
reg.register("mango", noop)
assert reg.names == ["apple", "mango", "zebra"]
def test_registry_help_for_and_all_help() -> None:
reg = SlashRegistry()
async def noop(cmd: SlashParsed) -> bool:
return False
reg.register("quit", noop, help="exit the REPL")
reg.register("help", noop, help="show commands")
assert reg.help_for("quit") == "exit the REPL"
assert reg.help_for("unknown") == ""
pairs = dict(reg.all_help())
assert pairs["quit"] == "exit the REPL"
assert pairs["help"] == "show commands"