feat: tighten agentic runtime handoffs and quality gates

This commit is contained in:
chungyeong
2026-03-14 10:05:25 +09:00
parent 87bc0ffbfb
commit 7b95233edf
15 changed files with 1148 additions and 167 deletions

View File

@@ -12,10 +12,10 @@ import subprocess
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, call, patch
from unittest.mock import MagicMock, patch
from cross_eval.agent import AgentInvocationError, invoke_agent_agentic
from cross_eval.config import BUILTIN_AGENTS, _make_agentic
from cross_eval.config import _make_agentic
from cross_eval.models import (
AgentConfig,
AgentResult,
@@ -24,8 +24,6 @@ from cross_eval.models import (
)
from cross_eval.pipeline import (
_assert_base_repo_isolation,
_commit_iteration,
_finalize_worktree,
_has_agentic_steps,
_setup_worktree,
run_pipeline,
@@ -267,6 +265,7 @@ class TestInvokeAgentAgenticClaude(unittest.TestCase):
break
self.assertIsNotNone(agent_call, "Expected a subprocess.run call with 'claude'")
assert agent_call is not None
cmd = agent_call[0][0]
# No -p flag
@@ -274,6 +273,7 @@ class TestInvokeAgentAgenticClaude(unittest.TestCase):
# Prompt is delivered via stdin (input kwarg), not as a positional arg
input_data = agent_call[1].get("input")
self.assertIsNotNone(input_data)
assert input_data is not None
self.assertIn("implement feature X", input_data)
@@ -311,6 +311,7 @@ class TestInvokeAgentAgenticCodex(unittest.TestCase):
break
self.assertIsNotNone(agent_call, "Expected a subprocess.run call with 'codex'")
assert agent_call is not None
cmd = agent_call[0][0]
# Should have "-" sentinel at the end for stdin
@@ -318,6 +319,7 @@ class TestInvokeAgentAgenticCodex(unittest.TestCase):
# Stdin input should contain the prompt
input_data = agent_call[1].get("input")
self.assertIsNotNone(input_data)
assert input_data is not None
self.assertIn("implement feature Y", input_data)
@@ -435,6 +437,16 @@ class TestMakeAgenticClaude(unittest.TestCase):
self.assertNotIn("-p", agent.args)
self.assertIn("--setting-sources", agent.args)
def test_strips_dash_dash_print_alias(self) -> None:
agent = AgentConfig(
name="claude-coder",
command="claude",
args=["--print", "--setting-sources", "user"],
)
_make_agentic(agent)
self.assertTrue(agent.agentic)
self.assertNotIn("--print", agent.args)
def test_idempotent_when_no_dash_p(self) -> None:
agent = AgentConfig(
name="claude-coder",

View File

@@ -26,7 +26,6 @@ from cross_eval.models import (
PhaseConfig,
PipelineConfig,
PipelineResult,
ReviewMetrics,
StepConfig,
)
from cross_eval.pipeline import (
@@ -54,7 +53,7 @@ from cross_eval.prompts import (
_build_review_only_preset,
_build_simple_preset,
)
from cross_eval.report import build_report, parse_review_metrics, print_escalation_report
from cross_eval.report import build_report, parse_review_metrics
class BuiltinAgentConfigTest(unittest.TestCase):
def test_claude_builtin_agents_use_user_settings_and_disable_slash_commands(self) -> None:

View File

@@ -26,10 +26,9 @@ from cross_eval.models import (
IterationResult,
PipelineConfig,
PipelineResult,
ReviewMetrics,
StepConfig,
)
from cross_eval.pipeline import _format_execution_evidence, run_pipeline
from cross_eval.pipeline import _build_artifact_references, _format_execution_evidence, run_pipeline
from cross_eval.report import build_report
@@ -59,7 +58,7 @@ class TestFormatExecutionEvidence(unittest.TestCase):
self.assertIn("Exit code: 0", evidence)
self.assertIn("12.3s", evidence)
self.assertIn("claude --setting-sources user", evidence)
self.assertIn("Transcript excerpt", evidence)
self.assertNotIn("Transcript excerpt", evidence)
def test_multiple_results_separated(self) -> None:
r1 = AgentResult(
@@ -88,10 +87,60 @@ class TestFormatExecutionEvidence(unittest.TestCase):
transcript=long_transcript,
)
evidence = _format_execution_evidence({"key": result})
self.assertIn("truncated", evidence)
# The full 3000-char transcript should NOT appear
self.assertNotIn("x" * 3000, evidence)
def test_artifact_paths_included_when_run_dir_provided(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
result = AgentResult(
output="diff",
exit_code=0,
agent_name="coder",
step_name="coding",
duration_seconds=1.2,
transcript="stdout",
command_preview="claude ...",
)
evidence = _format_execution_evidence(
{"coding_output": result},
run_dir=Path(tmpdir),
iteration=2,
)
self.assertIn("v2/coding.md", evidence)
self.assertIn("v2/coding_transcript.md", evidence)
class TestArtifactReferences(unittest.TestCase):
"""Artifact references should prefer file paths and git state over inline text."""
def test_contains_input_refs_and_git_context(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
repo = Path(tmpdir) / "repo"
repo.mkdir()
(repo / "plan.md").write_text("plan", encoding="utf-8")
(repo / "checklist.md").write_text("checklist", encoding="utf-8")
import subprocess
subprocess.run(["git", "init"], cwd=repo, capture_output=True, check=True)
subprocess.run(["git", "config", "user.email", "test@test.com"], cwd=repo, capture_output=True, check=True)
subprocess.run(["git", "config", "user.name", "Test"], cwd=repo, capture_output=True, check=True)
subprocess.run(["git", "add", "."], cwd=repo, capture_output=True, check=True)
subprocess.run(["git", "commit", "-m", "init"], cwd=repo, capture_output=True, check=True)
refs = _build_artifact_references(
{
"plan_ref": str((repo / "plan.md").resolve()),
"checklist_ref": str((repo / "checklist.md").resolve()),
"docs_ref": "(none)",
},
cwd=repo,
run_dir=repo / ".cross-eval" / "output" / "run",
iteration=1,
worktree_path=None,
)
self.assertIn("Plan:", refs)
self.assertIn("Git commit:", refs)
self.assertIn("Suggested git commands", refs)
# ---------------------------------------------------------------------------
# 2. Evidence in reviewer prompts (integration)
@@ -162,7 +211,7 @@ class TestEvidenceInReviewerPrompt(unittest.TestCase):
]
self.assertTrue(len(review_prompts) >= 1)
review_prompt = review_prompts[0]["prompt"]
# Evidence section should reference the coding step's command
self.assertIn("Artifact References", review_prompt)
self.assertIn("Execution Evidence", review_prompt)
self.assertIn("claude-coder", review_prompt)

View File

@@ -11,7 +11,6 @@ from cross_eval.doctor import (
check_cli_installed,
check_config,
format_doctor_results,
run_doctor,
)
from cross_eval.demo import (
DEMO_CHECKLIST,

View File

@@ -8,9 +8,7 @@ from unittest.mock import patch
from cross_eval.config import BUILTIN_AGENTS
from cross_eval.models import (
AgentConfig,
AgentResult,
PhaseConfig,
PipelineConfig,
StepConfig,
)

View File

@@ -390,6 +390,7 @@ class TranscriptSavingRegressionTest(unittest.TestCase):
# Verify transcript files were saved
run_dir = result.run_dir
self.assertIsNotNone(run_dir)
assert run_dir is not None
coding_transcript = run_dir / "v1" / "coding_transcript.md"
review_transcript = run_dir / "v1" / "review_transcript.md"
self.assertTrue(

831
tests/test_runtime_misc.py Normal file
View File

@@ -0,0 +1,831 @@
from __future__ import annotations
import re
import subprocess
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
from cross_eval.agent import (
AgentInvocationError,
_build_transcript,
_classify_agent_failure,
invoke_agent,
invoke_agent_agentic,
)
from cross_eval.models import AgentConfig, AgentResult, ExecutionConfig, PipelineConfig, StepConfig
from cross_eval.pipeline import (
_commit_iteration,
_execute_parallel_batch,
_execute_step,
_finalize_worktree,
_format_runtime_error_markdown,
_maybe_save_step_transcript,
_snapshot_repo_state,
)
from cross_eval.runtime_env import (
build_execution_policy,
parse_dotenv,
resolve_env_files,
summarize_environment,
)
from cross_eval.worktree import WorktreeError, create_worktree, remove_worktree
def _init_git_repo(path: Path) -> None:
subprocess.run(["git", "init"], cwd=path, capture_output=True, check=True)
subprocess.run(
["git", "config", "user.email", "test@test.com"],
cwd=path,
capture_output=True,
check=True,
)
subprocess.run(
["git", "config", "user.name", "Test"],
cwd=path,
capture_output=True,
check=True,
)
(path / "README.md").write_text("# init\n", encoding="utf-8")
subprocess.run(["git", "add", "."], cwd=path, capture_output=True, check=True)
subprocess.run(
["git", "commit", "-m", "initial"],
cwd=path,
capture_output=True,
check=True,
)
class TestInvokeAgentRuntime(unittest.TestCase):
@patch("cross_eval.agent.subprocess.run")
def test_interactive_claude_reads_output_file(self, mock_run: MagicMock) -> None:
def _fake_run(cmd: list[str], **kwargs: object) -> MagicMock:
match = re.search(r"Write your complete output to (.+)\.$", cmd[-1])
self.assertIsNotNone(match)
assert match is not None
Path(match.group(1)).write_text("review result", encoding="utf-8")
return MagicMock(returncode=0, stdout="", stderr="")
mock_run.side_effect = _fake_run
agent = AgentConfig(
name="claude-reviewer",
command="claude",
args=["--model", "opus"],
system_prompt="system",
)
result = invoke_agent(agent, "inspect code", "review", quiet=True)
self.assertEqual(result.output, "review result")
called_cmd = mock_run.call_args[0][0]
self.assertIn("--system-prompt", called_cmd)
@patch("cross_eval.agent.subprocess.run")
def test_interactive_claude_falls_back_to_stdout(self, mock_run: MagicMock) -> None:
mock_run.return_value = MagicMock(returncode=0, stdout="stdout fallback", stderr="")
agent = AgentConfig(name="claude-reviewer", command="claude", args=["--model", "opus"])
result = invoke_agent(agent, "inspect code", "review", quiet=True)
self.assertEqual(result.output, "stdout fallback")
@patch("cross_eval.agent.subprocess.run")
def test_non_claude_wraps_system_prompt_in_stdin(self, mock_run: MagicMock) -> None:
mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="")
agent = AgentConfig(
name="custom-reviewer",
command="custom-cli",
args=["run"],
system_prompt="strict mode",
)
invoke_agent(agent, "check things", "review", quiet=True)
self.assertEqual(
mock_run.call_args.kwargs["input"],
"<system>\nstrict mode\n</system>\n\ncheck things",
)
@patch("cross_eval.agent.subprocess.run")
def test_failure_raises_structured_error(self, mock_run: MagicMock) -> None:
mock_run.return_value = MagicMock(returncode=1, stdout="", stderr="API Error: backend down")
agent = AgentConfig(name="codex-reviewer", command="codex", args=["exec", "-"])
with self.assertRaises(AgentInvocationError) as ctx:
invoke_agent(agent, "check", "review", quiet=True)
self.assertEqual(ctx.exception.failure_type, "API_ERROR")
self.assertIn("backend down", ctx.exception.raw_error)
def test_classify_unknown_failure(self) -> None:
failure_type, suggested_action = _classify_agent_failure("weird crash")
self.assertEqual(failure_type, "UNKNOWN")
self.assertIn("Inspect", suggested_action)
def test_build_transcript_includes_cwd_and_duration(self) -> None:
transcript = _build_transcript(
command_preview="claude -p",
stdout="ok",
stderr="",
exit_code=0,
duration_seconds=1.2,
cwd="/tmp/repo",
)
self.assertIn("## Working Directory", transcript)
self.assertIn("## Duration: 1.2s", transcript)
@patch("cross_eval.agent._Spinner")
@patch("cross_eval.agent.subprocess.run")
def test_timeout_stops_spinner(self, mock_run: MagicMock, mock_spinner: MagicMock) -> None:
spinner = mock_spinner.return_value
mock_run.side_effect = subprocess.TimeoutExpired(cmd=["claude"], timeout=12)
agent = AgentConfig(name="claude-reviewer", command="claude", args=["-p"])
with self.assertRaises(subprocess.TimeoutExpired):
invoke_agent(agent, "inspect code", "review", quiet=False, timeout=12)
spinner.stop.assert_called_once()
@patch("cross_eval.agent._Spinner")
@patch("cross_eval.agent.subprocess.run")
def test_generic_exception_stops_spinner(self, mock_run: MagicMock, mock_spinner: MagicMock) -> None:
spinner = mock_spinner.return_value
mock_run.side_effect = OSError("boom")
agent = AgentConfig(name="claude-reviewer", command="claude", args=["-p"])
with self.assertRaises(OSError):
invoke_agent(agent, "inspect code", "review", quiet=False)
spinner.stop.assert_called_once()
@patch("cross_eval.agent.logger.warning")
@patch("cross_eval.agent.subprocess.run")
def test_empty_output_logs_warning(self, mock_run: MagicMock, mock_warning: MagicMock) -> None:
mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
agent = AgentConfig(name="claude-reviewer", command="claude", args=["-p"])
result = invoke_agent(agent, "inspect code", "review", quiet=True)
self.assertEqual(result.output, "")
mock_warning.assert_called_once()
@patch("cross_eval.agent.subprocess.run")
def test_print_mode_claude_uses_native_system_prompt_flag(self, mock_run: MagicMock) -> None:
mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="")
agent = AgentConfig(
name="claude-reviewer",
command="claude",
args=["-p"],
system_prompt="be strict",
)
invoke_agent(agent, "review this", "review", quiet=True)
called_cmd = mock_run.call_args[0][0]
self.assertIn("--system-prompt", called_cmd)
self.assertEqual(mock_run.call_args.kwargs["input"], "review this")
@patch("cross_eval.agent.subprocess.run")
def test_interactive_failure_truncates_error_and_removes_output_file(
self,
mock_run: MagicMock,
) -> None:
seen_output_path: Path | None = None
def _fake_run(cmd: list[str], **kwargs: object) -> MagicMock:
nonlocal seen_output_path
match = re.search(r"Write your complete output to (.+)\.$", cmd[-1])
self.assertIsNotNone(match)
assert match is not None
seen_output_path = Path(match.group(1))
return MagicMock(returncode=1, stdout="", stderr="x" * 600)
mock_run.side_effect = _fake_run
agent = AgentConfig(name="claude-reviewer", command="claude", args=["--model", "opus"])
with self.assertRaises(AgentInvocationError) as ctx:
invoke_agent(agent, "inspect code", "review", quiet=True)
self.assertEqual(len(ctx.exception.raw_error), 503)
self.assertIsNotNone(seen_output_path)
assert seen_output_path is not None
self.assertFalse(seen_output_path.exists())
@patch("cross_eval.agent.logger.warning")
@patch("cross_eval.agent.subprocess.run")
def test_empty_output_with_stderr_logs_stderr_warning(
self,
mock_run: MagicMock,
mock_warning: MagicMock,
) -> None:
mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="stderr text")
agent = AgentConfig(name="claude-reviewer", command="claude", args=["-p"])
invoke_agent(agent, "inspect code", "review", quiet=True)
self.assertIn("stderr:", mock_warning.call_args[0][0])
class TestInvokeAgenticRuntime(unittest.TestCase):
@patch("cross_eval.worktree.capture_diff", return_value="diff --git a/file ...")
@patch("cross_eval.agent.subprocess.run")
def test_codex_agentic_adds_reasoning_and_system_wrapper(
self,
mock_run: MagicMock,
mock_diff: MagicMock,
) -> None:
mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="")
agent = AgentConfig(
name="codex-coder",
command="codex",
args=["exec", "--full-auto"],
system_prompt="strict mode",
reasoning_effort="high",
agentic=True,
)
with tempfile.TemporaryDirectory() as tmpdir:
repo = Path(tmpdir)
_init_git_repo(repo)
invoke_agent_agentic(agent, "fix bug", "coding", repo, quiet=True)
called_cmd = mock_run.call_args[0][0]
self.assertIn("-c", called_cmd)
self.assertEqual(called_cmd[-1], "-")
self.assertIn("<system>", mock_run.call_args.kwargs["input"])
@patch("cross_eval.agent._Spinner")
@patch("cross_eval.worktree.capture_diff", return_value="diff --git a/file ...")
@patch("cross_eval.agent.subprocess.run")
def test_agentic_claude_success_uses_system_prompt_and_spinner(
self,
mock_run: MagicMock,
mock_diff: MagicMock,
mock_spinner: MagicMock,
) -> None:
mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="")
agent = AgentConfig(
name="claude-coder",
command="claude",
args=["-p", "--print"],
system_prompt="stay in scope",
agentic=True,
)
with tempfile.TemporaryDirectory() as tmpdir:
repo = Path(tmpdir)
_init_git_repo(repo)
result = invoke_agent_agentic(agent, "fix bug", "coding", repo, quiet=False)
called_cmd = mock_run.call_args[0][0]
self.assertNotIn("-p", called_cmd)
self.assertIn("--system-prompt", called_cmd)
self.assertEqual(result.output, "diff --git a/file ...")
mock_spinner.return_value.stop.assert_called_once()
@patch("cross_eval.agent._Spinner")
def test_agentic_timeout_stops_spinner(self, mock_spinner: MagicMock) -> None:
spinner = mock_spinner.return_value
agent = AgentConfig(name="codex-coder", command="codex", args=["exec"], agentic=True)
with tempfile.TemporaryDirectory() as tmpdir:
repo = Path(tmpdir)
_init_git_repo(repo)
with patch(
"cross_eval.agent.subprocess.run",
side_effect=subprocess.TimeoutExpired(cmd=["codex"], timeout=20),
):
with self.assertRaises(subprocess.TimeoutExpired):
invoke_agent_agentic(agent, "fix bug", "coding", repo, quiet=False, timeout=20)
spinner.stop.assert_called_once()
@patch("cross_eval.agent.subprocess.run")
def test_agentic_nonzero_exit_raises_structured_error(self, mock_run: MagicMock) -> None:
mock_run.return_value = MagicMock(returncode=1, stdout="", stderr="unauthorized")
agent = AgentConfig(name="codex-coder", command="codex", args=["exec"], agentic=True)
with tempfile.TemporaryDirectory() as tmpdir:
repo = Path(tmpdir)
_init_git_repo(repo)
with self.assertRaises(AgentInvocationError) as ctx:
invoke_agent_agentic(agent, "fix bug", "coding", repo, quiet=True)
self.assertEqual(ctx.exception.failure_type, "AUTH")
@patch("cross_eval.agent._Spinner")
def test_agentic_generic_exception_stops_spinner(
self,
mock_spinner: MagicMock,
) -> None:
agent = AgentConfig(name="codex-coder", command="codex", args=["exec"], agentic=True)
with tempfile.TemporaryDirectory() as tmpdir:
repo = Path(tmpdir)
_init_git_repo(repo)
with patch("cross_eval.agent.subprocess.run", side_effect=OSError("boom")):
with self.assertRaises(OSError):
invoke_agent_agentic(agent, "fix bug", "coding", repo, quiet=False)
mock_spinner.return_value.stop.assert_called_once()
@patch("cross_eval.agent._Spinner")
@patch("cross_eval.agent.subprocess.run")
def test_agentic_failure_truncates_error(
self,
mock_run: MagicMock,
mock_spinner: MagicMock,
) -> None:
mock_run.return_value = MagicMock(returncode=1, stdout="", stderr="x" * 600)
agent = AgentConfig(name="codex-coder", command="codex", args=["exec"], agentic=True)
with tempfile.TemporaryDirectory() as tmpdir:
repo = Path(tmpdir)
_init_git_repo(repo)
with self.assertRaises(AgentInvocationError) as ctx:
invoke_agent_agentic(agent, "fix bug", "coding", repo, quiet=False)
self.assertEqual(len(ctx.exception.raw_error), 503)
mock_spinner.return_value.stop.assert_called_once()
@patch("cross_eval.agent._Spinner")
@patch("cross_eval.worktree.capture_diff", return_value="")
@patch("cross_eval.agent.subprocess.run")
def test_agentic_empty_diff_failure_truncates_error_and_stops_spinner(
self,
mock_run: MagicMock,
mock_diff: MagicMock,
mock_spinner: MagicMock,
) -> None:
mock_run.return_value = MagicMock(
returncode=0,
stdout="implemented",
stderr="permission denied " * 300,
)
agent = AgentConfig(name="codex-coder", command="codex", args=["exec"], agentic=True)
with tempfile.TemporaryDirectory() as tmpdir:
repo = Path(tmpdir)
_init_git_repo(repo)
with self.assertRaises(AgentInvocationError) as ctx:
invoke_agent_agentic(agent, "fix bug", "coding", repo, quiet=False)
self.assertLessEqual(len(ctx.exception.raw_error), 2003)
self.assertEqual(ctx.exception.failure_type, "WRITE_FAILURE")
mock_spinner.return_value.stop.assert_called_once()
class TestPipelineHelpers(unittest.TestCase):
@patch("cross_eval.worktree.commit_worktree", return_value=True)
def test_commit_iteration_logs_only_when_committed(self, mock_commit: MagicMock) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
_commit_iteration(Path(tmpdir), "review-fix", 2, "PASS")
mock_commit.assert_called_once()
def test_snapshot_repo_state_includes_untracked_digest(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
repo = Path(tmpdir)
_init_git_repo(repo)
(repo / "scratch.txt").write_text("draft", encoding="utf-8")
snapshot = _snapshot_repo_state(repo)
self.assertIn("UNTRACKED scratch.txt", snapshot)
def test_finalize_worktree_deletes_empty_branch(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
base = Path(tmpdir) / "repo"
base.mkdir()
_init_git_repo(base)
branch = "cross-eval/empty"
subprocess.run(
["git", "branch", branch, "HEAD"],
cwd=base,
capture_output=True,
check=True,
)
worktree = Path(tmpdir) / "wt"
subprocess.run(
["git", "worktree", "add", str(worktree), branch],
cwd=base,
capture_output=True,
check=True,
)
branch_result = _finalize_worktree(base, worktree, branch, "review-fix", "PASS")
self.assertIsNone(branch_result)
branches = subprocess.run(
["git", "branch", "--list", branch],
cwd=base,
capture_output=True,
text=True,
check=True,
)
self.assertEqual(branches.stdout.strip(), "")
def test_format_runtime_error_markdown_for_generic_exception(self) -> None:
markdown = _format_runtime_error_markdown(
RuntimeError("boom"),
step_name="review",
agent_name="claude-reviewer",
phase_name="review_fix",
)
self.assertIn("# Agent Error", markdown)
self.assertIn("review_fix", markdown)
self.assertIn("boom", markdown)
def test_maybe_save_step_transcript_returns_none_without_transcript(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
result = AgentResult(
output="ok",
exit_code=0,
agent_name="claude-reviewer",
step_name="review",
duration_seconds=0.1,
)
saved = _maybe_save_step_transcript(Path(tmpdir), 1, "review", result)
self.assertIsNone(saved)
@patch("cross_eval.pipeline.invoke_agent")
def test_execute_step_saves_timeout_markdown(self, mock_invoke: MagicMock) -> None:
mock_invoke.side_effect = subprocess.TimeoutExpired(
cmd=["claude"],
timeout=45,
output="partial output",
stderr="still running",
)
step = StepConfig(
name="review",
agent="claude-reviewer",
role="review",
prompt_template="default:review",
output_key="review_output",
)
config = PipelineConfig(
agents={
"claude-reviewer": AgentConfig(
name="claude-reviewer",
command="claude",
args=["-p"],
),
},
)
step_outputs: dict[str, str] = {}
step_results: dict[str, AgentResult] = {}
with tempfile.TemporaryDirectory() as tmpdir:
run_dir = Path(tmpdir)
with self.assertRaises(RuntimeError) as ctx:
_execute_step(
step,
config,
{"plan": "Plan", "checklist": "Checklist"},
"",
1,
3,
run_dir,
45,
False,
step_outputs,
step_results,
run_dir=run_dir,
output_iter=1,
)
self.assertIn("timed out after 45s", str(ctx.exception))
error_path = run_dir / "v1" / "review_error.md"
self.assertTrue(error_path.exists())
self.assertIn("# Agent Timeout", error_path.read_text(encoding="utf-8"))
@patch("cross_eval.pipeline.invoke_agent")
def test_execute_step_saves_runtime_error_markdown(self, mock_invoke: MagicMock) -> None:
mock_invoke.side_effect = AgentInvocationError(
agent_name="claude-reviewer",
step_name="review",
cmd_preview="claude -p",
raw_error="api broke",
failure_type="API_ERROR",
suggested_action="retry",
)
step = StepConfig(
name="review",
agent="claude-reviewer",
role="review",
prompt_template="default:review",
output_key="review_output",
)
config = PipelineConfig(
agents={
"claude-reviewer": AgentConfig(
name="claude-reviewer",
command="claude",
args=["-p"],
),
},
)
with tempfile.TemporaryDirectory() as tmpdir:
run_dir = Path(tmpdir)
with self.assertRaises(AgentInvocationError):
_execute_step(
step,
config,
{"plan": "Plan", "checklist": "Checklist"},
"",
1,
3,
run_dir,
45,
False,
{},
{},
run_dir=run_dir,
output_iter=1,
)
error_text = (run_dir / "v1" / "review_error.md").read_text(encoding="utf-8")
self.assertIn("API_ERROR", error_text)
self.assertIn("retry", error_text)
@patch("cross_eval.pipeline.invoke_agent")
def test_execute_parallel_batch_saves_success_and_timeout_error(self, mock_invoke: MagicMock) -> None:
def _fake_invoke(agent_config: AgentConfig, prompt: str, step_name: str, **kwargs: object) -> AgentResult:
if step_name == "review_ok":
return AgentResult(
output="VERDICT: PASS",
exit_code=0,
agent_name=agent_config.name,
step_name=step_name,
duration_seconds=0.1,
)
raise subprocess.TimeoutExpired(
cmd=["codex"],
timeout=30,
output="halfway",
stderr="timeout stderr",
)
mock_invoke.side_effect = _fake_invoke
batch = [
StepConfig(
name="review_ok",
agent="claude-reviewer",
role="review",
prompt_template="default:review",
output_key="review_ok",
parallel=True,
),
StepConfig(
name="review_slow",
agent="codex-reviewer",
role="review",
prompt_template="default:review",
output_key="review_slow",
parallel=True,
),
]
config = PipelineConfig(
agents={
"claude-reviewer": AgentConfig(name="claude-reviewer", command="claude", args=["-p"]),
"codex-reviewer": AgentConfig(name="codex-reviewer", command="codex", args=["exec", "-"]),
},
)
step_outputs: dict[str, str] = {}
step_results: dict[str, AgentResult] = {}
with tempfile.TemporaryDirectory() as tmpdir:
run_dir = Path(tmpdir)
with self.assertRaises(RuntimeError) as ctx:
_execute_parallel_batch(
batch,
config,
{"plan": "Plan", "checklist": "Checklist"},
"",
1,
3,
run_dir,
30,
False,
step_outputs,
step_results,
run_dir=run_dir,
output_iter=1,
)
self.assertIn("Successful outputs were saved for: review_ok", str(ctx.exception))
self.assertEqual(step_outputs["review_ok"], "VERDICT: PASS")
self.assertTrue((run_dir / "v1" / "review_ok.md").exists())
self.assertTrue((run_dir / "v1" / "review_slow_error.md").exists())
@patch("cross_eval.pipeline._execute_step")
def test_execute_parallel_batch_dry_run_uses_sequential_path(self, mock_step: MagicMock) -> None:
batch = [
StepConfig(
name="review_a",
agent="claude-reviewer",
role="review",
prompt_template="default:review",
output_key="review_a",
parallel=True,
),
StepConfig(
name="review_b",
agent="codex-reviewer",
role="review",
prompt_template="default:review",
output_key="review_b",
parallel=True,
),
]
config = PipelineConfig(agents={})
with tempfile.TemporaryDirectory() as tmpdir:
_execute_parallel_batch(
batch,
config,
{"plan": "Plan"},
"",
1,
3,
Path(tmpdir),
None,
True,
{},
{},
run_dir=Path(tmpdir),
output_iter=1,
)
self.assertEqual(mock_step.call_count, 2)
@patch("cross_eval.pipeline._execute_step")
def test_execute_parallel_batch_agentic_steps_fall_back_to_sequential(self, mock_step: MagicMock) -> None:
batch = [
StepConfig(
name="review_a",
agent="agentic-a",
role="review",
prompt_template="default:review",
output_key="review_a",
parallel=True,
),
StepConfig(
name="review_b",
agent="agentic-b",
role="review",
prompt_template="default:review",
output_key="review_b",
parallel=True,
),
]
config = PipelineConfig(
agents={
"agentic-a": AgentConfig(name="agentic-a", command="claude", agentic=True),
"agentic-b": AgentConfig(name="agentic-b", command="codex", agentic=True),
},
)
with tempfile.TemporaryDirectory() as tmpdir:
_execute_parallel_batch(
batch,
config,
{"plan": "Plan"},
"",
1,
3,
Path(tmpdir),
None,
False,
{},
{},
run_dir=Path(tmpdir),
output_iter=1,
worktree_path=Path(tmpdir),
)
self.assertEqual(mock_step.call_count, 2)
@patch("cross_eval.worktree.remove_worktree", side_effect=RuntimeError("cleanup failed"))
@patch("cross_eval.worktree.commit_worktree", side_effect=RuntimeError("commit failed"))
def test_finalize_worktree_handles_cleanup_failures(
self,
mock_commit: MagicMock,
mock_remove: MagicMock,
) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
branch = _finalize_worktree(
Path(tmpdir),
Path(tmpdir) / "wt",
"cross-eval/fail",
"review-fix",
"FAIL",
)
self.assertIsNone(branch)
class TestRuntimeEnvironmentHelpers(unittest.TestCase):
def test_parse_dotenv_handles_export_and_quotes(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
env_path = Path(tmpdir) / ".env"
env_path.write_text(
"export FOO='bar'\nBAR=\"line\\nvalue\"\nINVALID\n=skip\n",
encoding="utf-8",
)
values = parse_dotenv(env_path)
self.assertEqual(values["FOO"], "bar")
self.assertEqual(values["BAR"], "line\nvalue")
self.assertNotIn("INVALID", values)
def test_resolve_env_files_deduplicates_and_filters_missing(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
root = Path(tmpdir)
env_path = root / ".env"
env_path.write_text("FOO=bar\n", encoding="utf-8")
execution = ExecutionConfig(
env_files=[".env", str(env_path)],
auto_env_files=[".env", ".env.local"],
)
resolved = resolve_env_files(execution, root)
self.assertEqual(resolved, [env_path.resolve()])
def test_summarize_environment_hides_names_when_disabled(self) -> None:
execution = ExecutionConfig(expose_env_names=False, auto_context_targets=["postgres"])
summary = summarize_environment(
execution,
[],
{"DATABASE_URL": "postgres://localhost"},
{},
)
self.assertIn("names are hidden", summary)
self.assertIn("Execution targets hinted by the user: postgres", summary)
def test_build_execution_policy_for_minimal_mode(self) -> None:
policy = build_execution_policy(
ExecutionConfig(mode="agent-decides", command_policy="minimal"),
)
self.assertIn("Command policy: minimal", policy)
self.assertIn("Keep command usage minimal", policy)
class TestWorktreeFailures(unittest.TestCase):
@patch("cross_eval.worktree.subprocess.run")
def test_create_worktree_raises_when_branch_creation_fails(self, mock_run: MagicMock) -> None:
mock_run.side_effect = subprocess.CalledProcessError(
1,
["git", "branch"],
stderr="branch failed",
)
with tempfile.TemporaryDirectory() as tmpdir:
base = Path(tmpdir)
work_dir = base / "wt"
with self.assertRaises(WorktreeError) as ctx:
create_worktree(base, work_dir, "cross-eval/fail")
self.assertIn("Failed to create branch", str(ctx.exception))
@patch("cross_eval.worktree.subprocess.run")
def test_create_worktree_cleans_branch_on_worktree_failure(self, mock_run: MagicMock) -> None:
mock_run.side_effect = [
MagicMock(returncode=0),
subprocess.CalledProcessError(
1,
["git", "worktree", "add"],
stderr="worktree failed",
),
MagicMock(returncode=0),
]
with tempfile.TemporaryDirectory() as tmpdir:
base = Path(tmpdir)
work_dir = base / "wt"
with self.assertRaises(WorktreeError):
create_worktree(base, work_dir, "cross-eval/fail")
cleanup_call = mock_run.call_args_list[-1]
self.assertEqual(cleanup_call[0][0][:3], ["git", "branch", "-D"])
@patch("cross_eval.worktree.shutil.rmtree")
@patch("cross_eval.worktree.subprocess.run")
def test_remove_worktree_falls_back_to_prune(self, mock_run: MagicMock, mock_rmtree: MagicMock) -> None:
mock_run.side_effect = [
subprocess.CalledProcessError(1, ["git", "worktree", "remove"]),
MagicMock(returncode=0),
]
with tempfile.TemporaryDirectory() as tmpdir:
base = Path(tmpdir) / "repo"
work_dir = Path(tmpdir) / "wt"
base.mkdir()
work_dir.mkdir()
remove_worktree(base, work_dir)
resolved = work_dir.resolve()
mock_rmtree.assert_any_call(resolved, ignore_errors=True)
self.assertEqual(mock_run.call_args_list[-1][0][0], ["git", "worktree", "prune"])