Python rewrite of the agent harness on top of deepagents 0.6.1 + langchain 1.x, replacing the abandoned TS attempt in packages/. 388 unit/integration tests pass. Steps ----- 0. Scaffolding — uv workspace, ruff/mypy/pre-commit/alembic, src/tests/docs trees with docs/schemas/ seeded from my-deepagent-seed/. 1. Core — config (pydantic-settings with MYDEEPAGENT_ env prefix and TOML source), enums (Backend, Capability, RiskLevel, ApprovalDecisionAction, ApprovalState, RunState, RunPhaseState, SessionState, ErrorClass), errors (MyDeepAgentError + BudgetExhaustedError with PEP-3134 cause + context suppression), hash (canonical JSON + sha256). 2. Persona/Workflow/Binding — pydantic v2 schemas with tuple-based deep immutability (post-construction hash drift prevented), YAML loaders, deterministic auto-select (preferred_backends → version → name → hash), override resolution with ineligibility diagnostics, PersonaConsentStore with fcntl.flock + tmp+fsync+rename atomic write. 3. Artifact schema registry — Draft202012Validator, multi-root resolution, structured ValidationFinding output. 4. Persistence — 18 SQLAlchemy 2.0 async ORM models with FK CASCADE/RESTRICT, WAL + busy_timeout + foreign_keys PRAGMA, alembic baseline + ux_active_run_repo_base partial unique index, LangGraph SqliteSaver as context manager only (lifecycle safety). 5. DeepAgent session — build_agent wires Persona → create_deep_agent with LocalShellBackend / FilesystemBackend / StateBackend / CompositeBackend, ChatOpenAI(base_url=openrouter) for openrouter: model strings, and 4 middleware classes (cost / audit-tool / safety-shell / fallback-model). Critical workarounds -------------------- - deepagents 0.6.1 rejects FilesystemPermission together with backends that implement SandboxBackendProtocol (LocalShellBackend). 
SafetyShellMiddleware enforces destructive-command and secret-path policy at the tool layer instead, and build_agent strips the permissions kwarg when the persona's deepagents_backend is local_shell. - FilesystemOperation in deepagents is Literal['read', 'write'] only; _map_operations collapses our richer schema (read/write/edit/ls) safely. Real OpenRouter smoke --------------------- test_openrouter_deepagents_local_shell_smoke calls DeepSeek via deepagents + LocalShellBackend + SafetyShellMiddleware end-to-end. PASS, ~$0.000001 cost, input=9 / output=1 tokens with content "OK". Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
392 lines
15 KiB
Python
392 lines
15 KiB
Python
"""Unit tests for src/my_deepagent/artifact_schema.py."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
|
|
from my_deepagent.artifact_schema import (
|
|
ArtifactSchemaRegistry,
|
|
ValidationFinding,
|
|
ValidationResult,
|
|
)
|
|
from my_deepagent.errors import MyDeepAgentError
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Directory reached by walking three ``.parent`` hops up from this test module.
REPO_ROOT = Path(__file__).parent.parent.parent
# Location of the seeded artifact JSON schemas inside the repository.
SEED_ROOT = REPO_ROOT.joinpath("docs", "schemas", "artifacts")

# Schema ids the seed directory is expected to provide (listed in sorted order).
SEED_SCHEMA_IDS = [
    "common/final-report@1",
    "dev/phase-plan@1",
    "dev/review-finding-batch@1",
    "dev/spec@1",
]
|
|
|
|
|
|
@pytest.fixture
def seed_registry() -> ArtifactSchemaRegistry:
    """Provide a registry whose only root is the seeded schema directory."""
    roots = [SEED_ROOT]
    return ArtifactSchemaRegistry(roots=roots)
|
|
|
|
|
|
@pytest.fixture
def valid_spec() -> dict[str, Any]:
    """Provide a payload that the tests below expect ``dev/spec@1`` to accept."""
    payload: dict[str, Any] = {
        "runId": "00000000-0000-4000-8000-000000000000",
        "phaseKey": "spec",
        "requirements": "User wants a CLI tool that analyzes log files.",
        "acceptance_criteria": ["parses .log files", "outputs JSON summary"],
        "approach": "Build a typer-based CLI using regex and json output.",
        "risks": ["log format variations may break parser"],
    }
    return payload
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 1. Seed schema load success (4 schemas)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.parametrize("schema_id", SEED_SCHEMA_IDS)
def test_seed_schema_loads(seed_registry: ArtifactSchemaRegistry, schema_id: str) -> None:
    """Each seed schema id loads as a dict whose ``$id`` equals that id."""
    loaded = seed_registry.load(schema_id)
    assert isinstance(loaded, dict)
    declared_id = loaded.get("$id")
    assert declared_id == schema_id
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 2. Load result caching — same dict object on second call
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_load_caches_same_object(seed_registry: ArtifactSchemaRegistry) -> None:
    """Loading the same schema id twice yields the identical object."""
    schema_id = "dev/spec@1"
    initial, repeated = seed_registry.load(schema_id), seed_registry.load(schema_id)
    assert initial is repeated
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 3. Unknown schema_id → artifact_schema_unknown
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_unknown_schema_id_raises(seed_registry: ArtifactSchemaRegistry) -> None:
    """A well-formed id with no matching schema fails as ``artifact_schema_unknown``."""
    with pytest.raises(MyDeepAgentError) as raised:
        seed_registry.load("dev/nonexistent@99")
    error = raised.value
    assert error.code == "artifact_schema_unknown"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4. Invalid schema_id format (no slash) → artifact_schema_unknown
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_invalid_schema_id_no_slash(seed_registry: ArtifactSchemaRegistry) -> None:
    """An id without any ``/`` separator fails as ``artifact_schema_unknown``."""
    with pytest.raises(MyDeepAgentError) as raised:
        seed_registry.load("foo")
    error = raised.value
    assert error.code == "artifact_schema_unknown"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 5. schema_id starting with "/" → rejected (leading slash leaves the domain segment empty)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_invalid_schema_id_leading_slash(seed_registry: ArtifactSchemaRegistry) -> None:
    """An id that begins with ``/`` fails as ``artifact_schema_unknown``."""
    # The id below contains slashes, but nothing precedes the first one, so
    # there is no domain segment in front of the name.
    # NOTE(review): the original author's note says the registry regards this
    # as a path-traversal risk because an absolute-looking path can never be a
    # file under a root directory — confirm against the registry code. Only
    # the resulting error code is pinned by this test.
    with pytest.raises(MyDeepAgentError) as raised:
        seed_registry.load("/dev/spec@1")
    error = raised.value
    assert error.code == "artifact_schema_unknown"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 6. Empty schema_id → artifact_schema_unknown
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_empty_schema_id_raises(seed_registry: ArtifactSchemaRegistry) -> None:
    """The empty string is not a usable id and fails as ``artifact_schema_unknown``."""
    with pytest.raises(MyDeepAgentError) as raised:
        seed_registry.load("")
    error = raised.value
    assert error.code == "artifact_schema_unknown"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 7. Fallback: schema absent in first root, present in second
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_fallback_to_second_root(tmp_path: Path) -> None:
    """A schema missing from the first root is still found in the second root."""
    # First root exists but stays empty.
    first_root = tmp_path / "first"
    first_root.mkdir()

    # Second root carries the only copy of dev/thing@1.
    second_root = tmp_path / "second"
    domain_dir = second_root / "dev"
    domain_dir.mkdir(parents=True)
    schema: dict[str, Any] = {
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "$id": "dev/thing@1",
        "type": "object",
    }
    schema_file = domain_dir / "thing@1.json"
    schema_file.write_text(json.dumps(schema), encoding="utf-8")

    registry = ArtifactSchemaRegistry(roots=[first_root, second_root])
    found = registry.load("dev/thing@1")
    assert found["$id"] == "dev/thing@1"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 8. validate with valid data → ok=True
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_validate_valid_spec(
    seed_registry: ArtifactSchemaRegistry, valid_spec: dict[str, Any]
) -> None:
    """The ``valid_spec`` fixture validates against ``dev/spec@1`` with no errors."""
    outcome = seed_registry.validate("dev/spec@1", valid_spec)
    assert outcome.ok is True
    assert outcome.errors == ()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 9. validate with invalid data → ok=False, findings non-empty
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_validate_invalid_data_returns_findings(
    seed_registry: ArtifactSchemaRegistry,
) -> None:
    """An unrelated payload fails validation and every error is a ``ValidationFinding``."""
    outcome = seed_registry.validate("dev/spec@1", {"wrong": "data"})
    assert outcome.ok is False
    assert len(outcome.errors) > 0
    assert all(isinstance(item, ValidationFinding) for item in outcome.errors)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 10. Missing required field → validator="required", message names the missing field
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_validate_missing_required_field(
    seed_registry: ArtifactSchemaRegistry, valid_spec: dict[str, Any]
) -> None:
    """Dropping ``requirements`` yields a ``required`` finding that names the field."""
    # Work on a shallow copy so the fixture value itself is left untouched.
    data = dict(valid_spec)
    data.pop("requirements", None)

    outcome = seed_registry.validate("dev/spec@1", data)
    assert outcome.ok is False

    required_messages = [
        item.message for item in outcome.errors if item.validator == "required"
    ]
    assert any("requirements" in message for message in required_messages)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 11. Invalid enum value → validator="enum", expected has enum list
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_validate_invalid_enum_severity(seed_registry: ArtifactSchemaRegistry) -> None:
    """An out-of-enum ``severity`` is reported by the ``enum`` validator.

    The first such finding must expose ``expected`` as a list that does not
    contain the rejected value.
    """
    bad_finding = {
        "severity": "bogus",
        "category": "correctness",
        "summary": "something is wrong here",
    }
    payload = {
        "runId": "00000000-0000-4000-8000-000000000000",
        "phaseKey": "review",
        "reviewerRole": "code-reviewer",
        "findings": [bad_finding],
        "summary": "Overall review summary with enough length.",
    }

    outcome = seed_registry.validate("dev/review-finding-batch@1", payload)
    assert outcome.ok is False

    enum_findings = [item for item in outcome.errors if item.validator == "enum"]
    assert len(enum_findings) > 0

    first_enum = enum_findings[0]
    assert isinstance(first_enum.expected, list)
    assert "bogus" not in first_enum.expected
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 12. Wrong type → validator="type", expected has type name
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_validate_wrong_type(
    seed_registry: ArtifactSchemaRegistry, valid_spec: dict[str, Any]
) -> None:
    """A string where a list is required yields a ``type`` finding expecting ``array``."""
    # Shallow copy of the fixture with a single field replaced by a string.
    data = {**valid_spec, "acceptance_criteria": "should be a list, not a string"}

    outcome = seed_registry.validate("dev/spec@1", data)
    assert outcome.ok is False

    type_findings = [item for item in outcome.errors if item.validator == "type"]
    assert len(type_findings) > 0
    first_type = type_findings[0]
    assert first_type.expected == "array"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 13. Nested error path — /findings/0/severity format
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_validate_nested_error_path(seed_registry: ArtifactSchemaRegistry) -> None:
    """An error inside the first finding is reported under a ``/findings/0/`` path."""
    bad_finding = {
        "severity": "not-valid",
        "category": "correctness",
        "summary": "a finding summary",
    }
    payload = {
        "runId": "00000000-0000-4000-8000-000000000000",
        "phaseKey": "review",
        "reviewerRole": "code-reviewer",
        "findings": [bad_finding],
        "summary": "Overall review summary with enough length.",
    }

    outcome = seed_registry.validate("dev/review-finding-batch@1", payload)
    assert outcome.ok is False

    # Only the path prefix is pinned; the trailing segment is not checked.
    assert any(item.path.startswith("/findings/0/") for item in outcome.errors)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 14. known_schema_ids() returns all 4 seed schemas, sorted
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_known_schema_ids_returns_seeds(seed_registry: ArtifactSchemaRegistry) -> None:
    """``known_schema_ids()`` includes every seed id and is already sorted."""
    listed = seed_registry.known_schema_ids()
    absent = [schema_id for schema_id in SEED_SCHEMA_IDS if schema_id not in listed]
    assert absent == []
    assert listed == sorted(listed)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 15. Empty roots list → config_invalid
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_empty_roots_raises() -> None:
    """Constructing a registry with no roots fails as ``config_invalid``."""
    no_roots: list[Path] = []
    with pytest.raises(MyDeepAgentError) as raised:
        ArtifactSchemaRegistry(roots=no_roots)
    error = raised.value
    assert error.code == "config_invalid"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 16. Corrupted JSON file → artifact_schema_load_failed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_corrupted_json_raises(tmp_path: Path) -> None:
    """A schema file holding truncated JSON fails as ``artifact_schema_load_failed``."""
    domain_dir = tmp_path / "dev"
    domain_dir.mkdir()
    broken_file = domain_dir / "broken@1.json"
    broken_file.write_text("{", encoding="utf-8")

    registry = ArtifactSchemaRegistry(roots=[tmp_path])
    with pytest.raises(MyDeepAgentError) as raised:
        registry.load("dev/broken@1")
    error = raised.value
    assert error.code == "artifact_schema_load_failed"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 17. Valid JSON but not a dict → artifact_schema_load_failed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_non_dict_json_raises(tmp_path: Path) -> None:
    """A schema file whose JSON is an array fails as ``artifact_schema_load_failed``."""
    domain_dir = tmp_path / "dev"
    domain_dir.mkdir()
    array_file = domain_dir / "array@1.json"
    array_file.write_text("[1, 2, 3]", encoding="utf-8")

    registry = ArtifactSchemaRegistry(roots=[tmp_path])
    with pytest.raises(MyDeepAgentError) as raised:
        registry.load("dev/array@1")
    error = raised.value
    assert error.code == "artifact_schema_load_failed"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 18. Schema itself is invalid Draft 2020-12 → artifact_schema_load_failed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_invalid_draft_schema_raises(tmp_path: Path) -> None:
    """A schema declaring an unknown ``type`` fails as ``artifact_schema_load_failed``."""
    domain_dir = tmp_path / "dev"
    domain_dir.mkdir()
    # Parses as JSON, but "not_a_type" is not a JSON Schema type name.
    bad_schema = {"type": "not_a_type"}
    bad_file = domain_dir / "bad@1.json"
    bad_file.write_text(json.dumps(bad_schema), encoding="utf-8")

    registry = ArtifactSchemaRegistry(roots=[tmp_path])
    with pytest.raises(MyDeepAgentError) as raised:
        registry.load("dev/bad@1")
    error = raised.value
    assert error.code == "artifact_schema_load_failed"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 19. Validator caching: _validator called twice returns same instance
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_validator_instance_cached(seed_registry: ArtifactSchemaRegistry) -> None:
    """Two ``_validator`` calls for one schema id return the identical instance."""
    # Deliberately reaches into the private helper: identity of the returned
    # validator is the observable being pinned here.
    schema_id = "dev/spec@1"
    initial = seed_registry._validator(schema_id)
    repeated = seed_registry._validator(schema_id)
    assert initial is repeated
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 20. dev/spec@1 valid example produces ok=True (full fixture check)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_spec_valid_example_ok(seed_registry: ArtifactSchemaRegistry) -> None:
    """An inline, self-contained spec example validates against ``dev/spec@1``."""
    # Spelled out inline (not taken from a fixture) so the example is fully
    # visible next to the assertions.
    example: dict[str, Any] = {
        "runId": "00000000-0000-4000-8000-000000000000",
        "phaseKey": "spec",
        "requirements": "User wants a CLI tool that analyzes log files.",
        "acceptance_criteria": ["parses .log files", "outputs JSON summary"],
        "approach": "Build a typer-based CLI using regex and json output.",
        "risks": ["log format variations may break parser"],
    }
    outcome = seed_registry.validate("dev/spec@1", example)
    assert outcome.ok is True
    assert outcome.errors == ()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bonus: ValidationResult and ValidationFinding are frozen dataclasses
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_validation_result_frozen() -> None:
    """Assigning to a ``ValidationResult`` attribute must raise."""
    frozen_result = ValidationResult(ok=True)
    rejected_with = (AttributeError, TypeError)
    with pytest.raises(rejected_with):
        frozen_result.ok = False  # type: ignore[misc]
|
|
|
|
|
|
def test_validation_finding_frozen() -> None:
    """Assigning to a ``ValidationFinding`` attribute must raise."""
    frozen_finding = ValidationFinding(
        path="/foo",
        message="err",
        validator="type",
        expected="string",
    )
    rejected_with = (AttributeError, TypeError)
    with pytest.raises(rejected_with):
        frozen_finding.path = "/bar"  # type: ignore[misc]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bonus: known_schema_ids with nonexistent root dir is silently skipped
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_known_schema_ids_skips_nonexistent_root(tmp_path: Path) -> None:
    """A root directory that does not exist contributes no schema ids."""
    absent_root = tmp_path / "does_not_exist"
    registry = ArtifactSchemaRegistry(roots=[absent_root])
    listed = registry.known_schema_ids()
    assert listed == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bonus: validate with non-dict top-level data
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_validate_non_dict_data_returns_error(
    seed_registry: ArtifactSchemaRegistry,
) -> None:
    """A top-level list fails ``dev/spec@1`` with at least one ``type`` finding."""
    outcome = seed_registry.validate("dev/spec@1", [1, 2, 3])
    assert outcome.ok is False
    type_findings = [item for item in outcome.errors if item.validator == "type"]
    assert len(type_findings) > 0
|