From b19d174c98957dade9fefa0d74f736474b97a7fd Mon Sep 17 00:00:00 2001 From: chungyeong Date: Fri, 13 Mar 2026 22:50:46 +0900 Subject: [PATCH] feat: isolate agentic worktrees and surface execution evidence --- cross_eval/agent.py | 69 ++++++- cross_eval/pipeline.py | 185 ++++++++++++++++++- cross_eval/prompts.py | 27 ++- cross_eval/report.py | 15 ++ cross_eval/worktree.py | 17 ++ tests/test_agentic.py | 64 +++++++ tests/test_evidence.py | 395 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 758 insertions(+), 14 deletions(-) create mode 100644 tests/test_evidence.py diff --git a/cross_eval/agent.py b/cross_eval/agent.py index 0af5949..b52acf8 100644 --- a/cross_eval/agent.py +++ b/cross_eval/agent.py @@ -32,20 +32,33 @@ _NO_CHANGE_ACK_MARKERS = ( _CHANGE_CLAIM_MARKERS = ( "summary of all changes made", "here's a summary of all changes made", + "here is a summary of all changes", "implemented", "i implemented", + "i've implemented", "added", "i added", + "i've added", "updated", "i updated", + "i've updated", "modified", "i modified", + "i've modified", "created", "i created", + "i've created", "fixed", "i fixed", + "i've fixed", "completed the changes", "finished the changes", + "made the following changes", + "applied the fix", + "changes have been applied", + "wrote the code", + "refactored", + "i refactored", ) @@ -134,6 +147,29 @@ def _classify_agent_failure(detail: str) -> tuple[str, str]: ) +_WRITE_FAILURE_MARKERS = ( + "permission denied", + "read-only file system", + "read only file system", + "operation not permitted", + "cannot write", + "failed to write", + "could not write", + "unable to write", + "sandbox", + "eacces", + "erofs", +) + + +def _has_write_failure_indicators(stderr: str) -> bool: + """Detect stderr patterns indicating the agent could not write files.""" + if not stderr.strip(): + return False + normalized = stderr.lower() + return any(marker in normalized for marker in _WRITE_FAILURE_MARKERS) + + def _claims_file_changes(output: str) -> bool: """Heuristic for agent text that claims code changes were made.""" normalized = output.lower() @@ -406,7 +442,8 @@ def invoke_agent_agentic( # (avoids OS arg length limits for large prompts) cmd.append( f"Read the task file at {task_file} and execute all instructions in it. " - f"Work in the current directory." + f"Work only inside the current directory and do not modify files " + f"outside it." ) cmd_preview = " ".join(cmd[:6]) @@ -467,7 +504,14 @@ def invoke_agent_agentic( if not diff_output: stdout_excerpt = (result.stdout or "").strip() stderr_excerpt = (result.stderr or "").strip() - if _claims_file_changes(stdout_excerpt): + + # Detect two failure modes: + # 1. Agent claims changes in stdout but produced no diff + # 2. Agent stderr contains permission or write-failure indicators + claims_changes = _claims_file_changes(stdout_excerpt) + has_write_failure = _has_write_failure_indicators(stderr_excerpt) + + if claims_changes or has_write_failure: if spinner: spinner.stop(f"[{step_name}] FAILED (empty diff)") raw_error = stdout_excerpt or "(stdout empty)" @@ -475,16 +519,27 @@ def invoke_agent_agentic( raw_error = f"{raw_error}\n\n[stderr]\n{stderr_excerpt}" if len(raw_error) > 2000: raw_error = raw_error[:2000] + "..." + + if has_write_failure: + failure_type = "WRITE_FAILURE" + suggested_action = ( + "Agent encountered file write errors (permission denied, read-only, " + "or sandbox restriction). Check agent permissions and worktree state." + ) + else: + failure_type = "EMPTY_DIFF" + suggested_action = ( + "Agent reported code changes but produced no git diff. " + "Treat this run as failed and require a real worktree diff before continuing." + ) + raise AgentInvocationError( agent_name=agent.name, step_name=step_name, cmd_preview=cmd_preview, raw_error=raw_error, - failure_type="EMPTY_DIFF", - suggested_action=( - "Agent reported code changes but produced no git diff. " - "Treat this run as failed and require a real worktree diff before continuing." - ), + failure_type=failure_type, + suggested_action=suggested_action, ) diff_output = "(no changes)" diff --git a/cross_eval/pipeline.py b/cross_eval/pipeline.py index 7047318..f42e681 100644 --- a/cross_eval/pipeline.py +++ b/cross_eval/pipeline.py @@ -6,6 +6,7 @@ import os import re import subprocess import time +from hashlib import sha256 from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from pathlib import Path @@ -92,15 +93,110 @@ def _setup_worktree(cwd: Path, run_dir: Path, preset_name: str) -> tuple[Path, s Returns (worktree_path, branch_name). """ - from cross_eval.worktree import create_worktree, make_branch_name + from cross_eval.worktree import create_worktree, make_branch_name, make_worktree_dir branch_name = make_branch_name(preset_name) - worktree_dir = run_dir / "work" + worktree_dir = make_worktree_dir(cwd, branch_name) worktree_path = create_worktree( base_cwd=cwd, work_dir=worktree_dir, branch_name=branch_name, ) + (run_dir / "worktree_path.txt").write_text(f"{worktree_path}\n", encoding="utf-8") + (run_dir / "worktree_branch.txt").write_text(f"{branch_name}\n", encoding="utf-8") return worktree_path, branch_name +def _snapshot_repo_state(cwd: Path) -> str: + """Capture the base repository working-tree state. + + This is used to detect agentic runs that accidentally modify the original + checkout instead of the isolated worktree. + """ + status = subprocess.run( + ["git", "status", "--short", "--untracked-files=all"], + cwd=cwd, + capture_output=True, + text=True, + ) + if status.returncode != 0: + return "" + + diff = subprocess.run( + ["git", "diff", "--no-ext-diff", "--binary", "HEAD"], + cwd=cwd, + capture_output=True, + text=True, + ) + cached_diff = subprocess.run( + ["git", "diff", "--no-ext-diff", "--binary", "--cached"], + cwd=cwd, + capture_output=True, + text=True, + ) + untracked = subprocess.run( + ["git", "ls-files", "--others", "--exclude-standard", "-z"], + cwd=cwd, + capture_output=True, + ) + + parts = [ + status.stdout, + diff.stdout, + cached_diff.stdout, + ] + + if untracked.returncode == 0 and untracked.stdout: + for rel_path in untracked.stdout.decode("utf-8", errors="replace").split("\0"): + if not rel_path: + continue + file_path = cwd / rel_path + if file_path.is_file(): + digest = sha256(file_path.read_bytes()).hexdigest() + parts.append(f"UNTRACKED {rel_path} {digest}") + else: + parts.append(f"UNTRACKED {rel_path} (non-file)") + + return "\n".join(parts) + + +def _snapshot_repo_status(cwd: Path) -> str: + """Capture a human-readable status summary for error reporting.""" + result = subprocess.run( + ["git", "status", "--short", "--untracked-files=all"], + cwd=cwd, + capture_output=True, + text=True, + ) + if result.returncode != 0: + return "" + return result.stdout.strip() + + +def _assert_base_repo_isolation( + cwd: Path, + baseline_state: str, + *, + step_name: str, + agent_name: str, + worktree_path: Path, + baseline_status: str, +) -> None: + """Fail fast if an agentic run leaked changes into the base repo.""" + current_state = _snapshot_repo_state(cwd) + if current_state == baseline_state: + return + + current_status = _snapshot_repo_status(cwd) + before = baseline_status or "(clean)" + after = current_status or "(clean)" + raise WorktreeError( + "Agent modified the base repository instead of the isolated worktree.\n\n" + f"Step: {step_name}\n" + f"Agent: {agent_name}\n" + f"Worktree: {worktree_path}\n\n" + f"Baseline status:\n{before}\n\n" + f"Current status:\n{after}" + ) + + def _finalize_worktree( cwd: Path, worktree_path: Path, @@ -172,10 +268,14 @@ def _run_simple_pipeline( # Setup shared worktree for agentic mode worktree_path: Path | None = None agentic_branch_name: str | None = None + base_repo_state: str | None = None + base_repo_status: str | None = None if not dry_run and _has_agentic_steps(config, config.pipeline): worktree_path, agentic_branch_name = _setup_worktree( cwd, run_dir, config.preset_name, ) + base_repo_state = _snapshot_repo_state(cwd) + base_repo_status = _snapshot_repo_status(cwd) feedback = "(no feedback — first iteration)" iterations: list[IterationResult] = [] @@ -203,6 +303,8 @@ def _run_simple_pipeline( run_dir=run_dir, output_iter=i, worktree_path=worktree_path, runtime_env=runtime_env, + base_repo_state=base_repo_state, + base_repo_status=base_repo_status, ) # Intermediate commit so next iteration's diff only shows new changes @@ -332,10 +434,14 @@ def _run_phased_pipeline( all_phase_steps = [s for p in config.phases for s in p.steps] worktree_path: Path | None = None agentic_branch_name: str | None = None + base_repo_state: str | None = None + base_repo_status: str | None = None if not dry_run and _has_agentic_steps(config, all_phase_steps): worktree_path, agentic_branch_name = _setup_worktree( cwd, run_dir, config.preset_name, ) + base_repo_state = _snapshot_repo_state(cwd) + base_repo_status = _snapshot_repo_status(cwd) iterations: list[IterationResult] = [] feedback = "(no feedback — first iteration)" @@ -384,6 +490,8 @@ def _run_phased_pipeline( run_dir=run_dir, output_iter=global_iter, phase_name=phase.name, worktree_path=worktree_path, runtime_env=runtime_env, + base_repo_state=base_repo_state, + base_repo_status=base_repo_status, ) # Intermediate commit so next iteration's diff only shows new changes @@ -626,6 +734,8 @@ def _run_steps( phase_name: str | None = None, worktree_path: Path | None = None, runtime_env: dict[str, str] | None = None, + base_repo_state: str | None = None, + base_repo_status: str | None = None, ) -> tuple[dict[str, str], dict[str, AgentResult], str | None]: """Execute all steps in one iteration, parallelizing where possible.""" step_outputs: dict[str, str] = {} @@ -644,6 +754,8 @@ def _run_steps( run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, worktree_path=worktree_path, runtime_env=runtime_env, + base_repo_state=base_repo_state, + base_repo_status=base_repo_status, ) else: _execute_parallel_batch( @@ -653,6 +765,8 @@ def _run_steps( run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, worktree_path=worktree_path, runtime_env=runtime_env, + base_repo_state=base_repo_state, + base_repo_status=base_repo_status, ) # Extract verdict from all verdict steps (ALL must PASS; ESCALATE wins over all) @@ -709,6 +823,8 @@ def _execute_step( quiet: bool = False, worktree_path: Path | None = None, runtime_env: dict[str, str] | None = None, + base_repo_state: str | None = None, + base_repo_status: str | None = None, ) -> None: """Execute a single step, updating step_outputs and step_results in place.""" if not quiet: @@ -717,9 +833,10 @@ def _execute_step( # 1. Resolve template template = resolve_template(step.prompt_template) - # 2. Build context + # 2. Build context (include prior step results for evidence) context = _build_context( input_contents, step_outputs, feedback, iteration, max_iterations, + step_results=step_results, ) # 3. Apply context overrides @@ -794,6 +911,16 @@ def _execute_step( raise # 7. Store output + if worktree_path is not None and base_repo_state is not None: + _assert_base_repo_isolation( + cwd, + base_repo_state, + step_name=step.name, + agent_name=step.agent, + worktree_path=worktree_path, + baseline_status=base_repo_status or "", + ) + step_outputs[step.output_key] = result.output step_results[step.output_key] = result @@ -826,6 +953,8 @@ def _execute_parallel_batch( phase_name: str | None = None, worktree_path: Path | None = None, runtime_env: dict[str, str] | None = None, + base_repo_state: str | None = None, + base_repo_status: str | None = None, ) -> None: """Execute multiple steps in parallel using threads.""" agent_names = ", ".join(s.agent for s in batch) @@ -838,6 +967,8 @@ def _execute_parallel_batch( iteration, max_iterations, cwd, timeout, dry_run, step_outputs, step_results, run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, + base_repo_state=base_repo_state, + base_repo_status=base_repo_status, ) return @@ -858,12 +989,15 @@ def _execute_parallel_batch( step_outputs, step_results, run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, worktree_path=worktree_path, + base_repo_state=base_repo_state, + base_repo_status=base_repo_status, ) return # Snapshot context before parallel execution (all steps see same state) context_snapshot = dict(input_contents) context_snapshot.update(step_outputs) + results_snapshot = dict(step_results) # Collect results from parallel threads local_outputs: dict[str, str] = {} @@ -883,6 +1017,7 @@ def _execute_parallel_batch( template = resolve_template(step.prompt_template) context = _build_context( context_snapshot, {}, feedback, iteration, max_iterations, + step_results=results_snapshot, ) if step.context_override: context = _apply_context_override(context, step.context_override) @@ -919,6 +1054,16 @@ def _execute_parallel_batch( batch_elapsed = round(time.monotonic() - batch_start, 1) # Persist successful outputs even if a sibling step failed. + if worktree_path is not None and base_repo_state is not None: + _assert_base_repo_isolation( + cwd, + base_repo_state, + step_name=phase_name or "parallel-batch", + agent_name=agent_names, + worktree_path=worktree_path, + baseline_status=base_repo_status or "", + ) + for step in batch: key = step.output_key if key not in local_outputs: @@ -986,6 +1131,7 @@ def _build_context( feedback: str, iteration: int, max_iterations: int, + step_results: dict[str, AgentResult] | None = None, ) -> dict[str, str]: """Build the template context dict.""" context: dict[str, str] = {} @@ -994,9 +1140,42 @@ def _build_context( context["feedback"] = feedback context["iteration"] = str(iteration) context["max_iterations"] = str(max_iterations) + # Surface execution evidence from prior steps so reviewers can inspect it + if step_results: + context["execution_evidence"] = _format_execution_evidence(step_results) return context +def _format_execution_evidence( + step_results: dict[str, AgentResult], +) -> str: + """Format execution evidence from prior steps for reviewer consumption. + + Produces a compact summary of command, exit code, duration, and a truncated + transcript excerpt for each completed step so that reviewers and seniors + can verify claims against real execution data. + """ + if not step_results: + return "(no prior execution evidence)" + parts: list[str] = [] + for key, result in step_results.items(): + section = [ + f"### Step: {result.step_name} ({result.agent_name})", + f"- Command: `{result.command_preview}`" if result.command_preview else "", + f"- Exit code: {result.exit_code}", + f"- Duration: {result.duration_seconds}s", + ] + section = [line for line in section if line] + if result.transcript: + # Include a truncated transcript excerpt for debugging + excerpt = result.transcript[:2000] + if len(result.transcript) > 2000: + excerpt += "\n... (truncated)" + section.append(f"\n
\nTranscript excerpt\n\n{excerpt}\n
") + parts.append("\n".join(section)) + return "\n\n---\n\n".join(parts) + + def _build_runtime_inputs( config: PipelineConfig, input_contents: dict[str, str], diff --git a/cross_eval/prompts.py b/cross_eval/prompts.py index 48f1183..e6daa74 100644 --- a/cross_eval/prompts.py +++ b/cross_eval/prompts.py @@ -59,9 +59,14 @@ You are tasked with reviewing code against a plan and checklist. ## Previous Review Feedback {feedback} +## Execution Evidence +{execution_evidence} + ## Review Instructions Explore the project directory to understand the full codebase context, \ -then evaluate the code against ONLY the plan and checklist above. +then evaluate the code against ONLY the plan and checklist above. \ +Use the execution evidence above to verify agent claims against actual \ +command outputs and exit codes. For each issue found, classify it with BOTH severity AND category: @@ -164,9 +169,13 @@ REVIEW_TEMPLATE_KO = """\ ## 이전 리뷰 피드백 {feedback} +## 실행 증거 +{execution_evidence} + ## 검토 지침 프로젝트 디렉토리를 직접 탐색하여 전체 코드베이스 맥락을 파악한 뒤, \ -위 기획서와 체크리스트 기준으로만 코드를 평가하세요. +위 기획서와 체크리스트 기준으로만 코드를 평가하세요. \ +위 실행 증거를 활용하여 에이전트의 주장을 실제 명령어 출력과 종료 코드로 검증하세요. 발견된 각 이슈에 심각도와 카테고리를 모두 부여하세요: @@ -525,8 +534,13 @@ You are adjudicating multiple review results and turning them into an actionable ## Previous Issue Tracker {previous_senior_tracker} +## Execution Evidence +{execution_evidence} + ## Instructions -Explore the project directory to confirm the current codebase state. Then: +Explore the project directory to confirm the current codebase state. \ +Use the execution evidence above to verify claims against actual command \ +outputs and exit codes. Then: 1. Deduplicate overlapping issues across reviewers. 2. Resolve disagreements explicitly. 3. Keep only issues supported by the plan, checklist, code, or reviewer evidence. @@ -592,8 +606,13 @@ AGGREGATE_REVIEW_TEMPLATE_KO = """\ ## 이전 이슈 트래커 {previous_senior_tracker} +## 실행 증거 +{execution_evidence} + ## 지침 -프로젝트 디렉토리를 탐색하여 현재 코드베이스 상태를 확인한 뒤 다음을 수행하세요. +프로젝트 디렉토리를 탐색하여 현재 코드베이스 상태를 확인한 뒤, \ +위 실행 증거를 활용하여 에이전트의 주장을 실제 명령어 출력과 종료 코드로 검증하세요. \ +그런 다음 아래를 수행하세요. 1. 리뷰어들 사이에 중복되는 이슈를 합치세요. 2. 의견 충돌은 명시적으로 정리하세요. 3. 기획서, 체크리스트, 코드, 리뷰 근거로 뒷받침되는 이슈만 남기세요. diff --git a/cross_eval/report.py b/cross_eval/report.py index eda32ea..a7cbc6d 100644 --- a/cross_eval/report.py +++ b/cross_eval/report.py @@ -386,6 +386,11 @@ def _append_iteration_steps( lines.append(f"### {_t(config, 'step')}: {step.name} ({agent_name}){duration}\n") + # Show command preview and exit code for execution evidence + if agent_result and agent_result.command_preview: + lines.append(f"**Command**: `{agent_result.command_preview}`") + lines.append(f"**Exit code**: {agent_result.exit_code}\n") + if step.verdict and iter_result.verdict: lines.append(f"**{_t(config, 'verdict')}: {iter_result.verdict}**\n") @@ -400,6 +405,16 @@ def _append_iteration_steps( lines.append(output) lines.append("") + # Include transcript excerpt for execution evidence visibility + if agent_result and agent_result.transcript: + transcript_preview = agent_result.transcript[:1500] + if len(agent_result.transcript) > 1500: + transcript_preview += "\n... (truncated)" + lines.append("
") + lines.append("Execution transcript\n") + lines.append(transcript_preview) + lines.append("\n
\n") + if not skip_extraction and step.role == "review": oos = _extract_out_of_scope(output) if oos: diff --git a/cross_eval/worktree.py b/cross_eval/worktree.py index dda710f..7fd0932 100644 --- a/cross_eval/worktree.py +++ b/cross_eval/worktree.py @@ -4,6 +4,7 @@ from __future__ import annotations import logging import shutil import subprocess +import tempfile from datetime import datetime from pathlib import Path @@ -20,6 +21,22 @@ def make_branch_name(preset_name: str) -> str: return f"cross-eval/{preset_name}_{ts}" +def make_worktree_dir(base_cwd: Path, branch_name: str) -> Path: + """Choose a worktree directory outside the base repo. + + Keeping agentic worktrees outside the source checkout avoids tools that + incorrectly walk up to the outer repo and write into the base worktree. + """ + repo_name = base_cwd.resolve().name or "repo" + branch_slug = branch_name.replace("/", "__") + return ( + Path(tempfile.gettempdir()) + / "cross-eval-worktrees" + / repo_name + / branch_slug + ) + + def create_worktree(base_cwd: Path, work_dir: Path, branch_name: str) -> Path: """Create a git worktree on a new branch from HEAD. diff --git a/tests/test_agentic.py b/tests/test_agentic.py index dc3768f..178bf63 100644 --- a/tests/test_agentic.py +++ b/tests/test_agentic.py @@ -23,6 +23,7 @@ from cross_eval.models import ( StepConfig, ) from cross_eval.pipeline import ( + _assert_base_repo_isolation, _commit_iteration, _finalize_worktree, _has_agentic_steps, @@ -34,6 +35,7 @@ from cross_eval.worktree import ( commit_worktree, create_worktree, make_branch_name, + make_worktree_dir, remove_worktree, ) @@ -191,6 +193,41 @@ class TestMakeBranchName(unittest.TestCase): self.assertEqual(len(ts_part), 15) # YYYYMMDD_HHMMSS +class TestMakeWorktreeDir(unittest.TestCase): + """make_worktree_dir chooses an external temp location.""" + + def test_uses_tmp_dir_outside_repo(self) -> None: + with tempfile.TemporaryDirectory() as td: + base = Path(td) / "repo" + base.mkdir() + path = make_worktree_dir(base, "cross-eval/review-fix_20260313_123456") + self.assertIn("cross-eval-worktrees", str(path)) + self.assertNotIn(str(base), str(path)) + + +class TestBaseRepoIsolation(unittest.TestCase): + """Base repo mutations should fail fast during agentic execution.""" + + def test_raises_when_base_repo_status_changes(self) -> None: + with tempfile.TemporaryDirectory() as td: + base = Path(td) / "repo" + worktree = Path(td) / "worktree" + base.mkdir() + worktree.mkdir() + + with self.assertRaises(RuntimeError) as ctx: + _assert_base_repo_isolation( + base, + "M cross_eval/agent.py", + step_name="coding", + agent_name="claude-coder", + worktree_path=worktree, + baseline_status="M cross_eval/agent.py", + ) + + self.assertIn("base repository", str(ctx.exception)) + + # =================================================================== # 2. agent.py agentic tests (mocking subprocess) # =================================================================== @@ -513,6 +550,33 @@ class TestSetupWorktreeCalledForAgentic(unittest.TestCase): mock_setup.assert_called_once() +class TestSetupWorktreeLocation(unittest.TestCase): + """_setup_worktree places agentic worktrees outside the base repo.""" + + def test_worktree_is_created_outside_repo(self) -> None: + with tempfile.TemporaryDirectory() as td: + base = Path(td) / "repo" + run_dir = base / ".cross-eval" / "output" / "smoke" + base.mkdir() + run_dir.mkdir(parents=True) + _init_git_repo(base) + + worktree_path, branch_name = _setup_worktree(base, run_dir, "review-fix") + try: + self.assertTrue(worktree_path.exists()) + self.assertNotIn(str(base.resolve()), str(worktree_path.resolve())) + self.assertEqual( + (run_dir / "worktree_path.txt").read_text(encoding="utf-8").strip(), + str(worktree_path), + ) + self.assertEqual( + (run_dir / "worktree_branch.txt").read_text(encoding="utf-8").strip(), + branch_name, + ) + finally: + remove_worktree(base, worktree_path) + + class TestReviewerRunsInWorktreeCwd(unittest.TestCase): """Reviewer runs with worktree cwd (not original cwd) when worktree exists.""" diff --git a/tests/test_evidence.py b/tests/test_evidence.py new file mode 100644 index 0000000..fc66682 --- /dev/null +++ b/tests/test_evidence.py @@ -0,0 +1,395 @@ +"""Regression tests for runtime evidence propagation and report visibility. + +Covers: + 1. Execution evidence is surfaced in reviewer/senior prompt context. + 2. Reports include command preview and transcript excerpts. + 3. Claude agentic failure detection (empty diff, write failure, expanded markers). + 4. _format_execution_evidence produces expected output. +""" +from __future__ import annotations + +import tempfile +import unittest +from pathlib import Path +from unittest.mock import MagicMock, patch + +from cross_eval.agent import ( + AgentInvocationError, + _claims_file_changes, + _has_write_failure_indicators, + invoke_agent_agentic, +) +from cross_eval.config import BUILTIN_AGENTS +from cross_eval.models import ( + AgentConfig, + AgentResult, + IterationResult, + PipelineConfig, + PipelineResult, + ReviewMetrics, + StepConfig, +) +from cross_eval.pipeline import _format_execution_evidence, run_pipeline +from cross_eval.report import build_report + + +# --------------------------------------------------------------------------- +# 1. Execution evidence formatting +# --------------------------------------------------------------------------- + +class TestFormatExecutionEvidence(unittest.TestCase): + """_format_execution_evidence produces a compact summary for reviewers.""" + + def test_empty_results_returns_placeholder(self) -> None: + self.assertIn("no prior execution evidence", _format_execution_evidence({})) + + def test_single_result_includes_key_fields(self) -> None: + result = AgentResult( + output="some diff", + exit_code=0, + agent_name="claude-coder", + step_name="coding", + duration_seconds=12.3, + transcript="# Agent Execution Transcript\n\n## Command\nclaude ...", + command_preview="claude --setting-sources user", + ) + evidence = _format_execution_evidence({"coding_output": result}) + self.assertIn("claude-coder", evidence) + self.assertIn("coding", evidence) + self.assertIn("Exit code: 0", evidence) + self.assertIn("12.3s", evidence) + self.assertIn("claude --setting-sources user", evidence) + self.assertIn("Transcript excerpt", evidence) + + def test_multiple_results_separated(self) -> None: + r1 = AgentResult( + output="diff1", exit_code=0, agent_name="coder", + step_name="coding", duration_seconds=1.0, + command_preview="cmd1", + ) + r2 = AgentResult( + output="review text", exit_code=0, agent_name="reviewer", + step_name="review", duration_seconds=2.0, + command_preview="cmd2", + ) + evidence = _format_execution_evidence({ + "coding_output": r1, + "review_result": r2, + }) + self.assertIn("coder", evidence) + self.assertIn("reviewer", evidence) + self.assertIn("---", evidence) + + def test_transcript_truncated_at_2000_chars(self) -> None: + long_transcript = "x" * 3000 + result = AgentResult( + output="out", exit_code=0, agent_name="agent", + step_name="step", duration_seconds=1.0, + transcript=long_transcript, + ) + evidence = _format_execution_evidence({"key": result}) + self.assertIn("truncated", evidence) + # The full 3000-char transcript should NOT appear + self.assertNotIn("x" * 3000, evidence) + + +# --------------------------------------------------------------------------- +# 2. Evidence in reviewer prompts (integration) +# --------------------------------------------------------------------------- + +class TestEvidenceInReviewerPrompt(unittest.TestCase): + """Reviewer prompts include execution evidence from prior coding step.""" + + def test_reviewer_receives_evidence(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + steps = [ + StepConfig( + name="coding", agent="claude-coder", role="coding", + prompt_template="default:coding", output_key="coding_output", + ), + StepConfig( + name="review", agent="claude-reviewer", role="review", + prompt_template="default:review", output_key="review_result", + verdict=True, + ), + ] + config = PipelineConfig( + output_dir=Path(tmpdir), + max_iterations=1, + min_iterations=1, + language="en", + inputs={"plan": "Test plan", "checklist": "Test checklist"}, + agents=dict(BUILTIN_AGENTS), + coders=["claude-coder"], + reviewers=["claude-reviewer"], + pipeline=steps, + preset_name="simple", + ) + + captured_prompts: list[dict] = [] + + def _mock(agent_config, prompt, step_name, **kwargs): + captured_prompts.append({ + "step_name": step_name, + "prompt": prompt, + }) + if step_name == "coding": + return AgentResult( + output="Implemented feature X", + exit_code=0, + agent_name=agent_config.name, + step_name=step_name, + duration_seconds=5.0, + transcript="# Transcript\nclaude ran...", + command_preview="claude --setting-sources user", + ) + return AgentResult( + output="VERDICT: PASS", + exit_code=0, + agent_name=agent_config.name, + step_name=step_name, + duration_seconds=2.0, + ) + + with patch("cross_eval.pipeline.invoke_agent", side_effect=_mock): + result = run_pipeline(config) + + self.assertEqual(result.final_verdict, "PASS") + + # The reviewer prompt should contain execution evidence + review_prompts = [ + p for p in captured_prompts if p["step_name"] == "review" + ] + self.assertTrue(len(review_prompts) >= 1) + review_prompt = review_prompts[0]["prompt"] + # Evidence section should reference the coding step's command + self.assertIn("Execution Evidence", review_prompt) + self.assertIn("claude-coder", review_prompt) + + +# --------------------------------------------------------------------------- +# 3. Report includes evidence +# --------------------------------------------------------------------------- + +class TestReportIncludesEvidence(unittest.TestCase): + """Report generation includes command preview and transcript excerpts.""" + + def _make_pipeline_result(self) -> tuple[PipelineConfig, PipelineResult]: + steps = [ + StepConfig( + name="coding", agent="claude-coder", role="coding", + prompt_template="default:coding", output_key="coding_output", + ), + StepConfig( + name="review", agent="claude-reviewer", role="review", + prompt_template="default:review", output_key="review_result", + verdict=True, + ), + ] + config = PipelineConfig( + max_iterations=1, + language="en", + inputs={"plan": "Plan", "checklist": "CL"}, + agents=dict(BUILTIN_AGENTS), + pipeline=steps, + preset_name="simple", + ) + + coding_result = AgentResult( + output="diff --git a/file ...", + exit_code=0, + agent_name="claude-coder", + step_name="coding", + duration_seconds=10.0, + transcript="# Agent Execution Transcript\n## Command\nclaude ...\n## Stdout\nok", + command_preview="claude --setting-sources user", + ) + review_result = AgentResult( + output="All good.\n\nVERDICT: PASS", + exit_code=0, + agent_name="claude-reviewer", + step_name="review", + duration_seconds=5.0, + transcript="# Agent Execution Transcript\n## Command\nclaude -p ...\n## Stdout\nAll good.", + command_preview="claude -p --setting-sources user", + ) + + iteration = IterationResult( + iteration=1, + step_results={ + "coding_output": coding_result, + "review_result": review_result, + }, + step_outputs={ + "coding_output": "diff --git a/file ...", + "review_result": "All good.\n\nVERDICT: PASS", + }, + verdict="PASS", + ) + + pipeline_result = PipelineResult( + iterations=[iteration], + final_verdict="PASS", + total_duration=15.0, + ) + + return config, pipeline_result + + def test_report_contains_command_preview(self) -> None: + config, result = self._make_pipeline_result() + report = build_report(config, result) + self.assertIn("claude --setting-sources user", report) + self.assertIn("**Command**", report) + + def test_report_contains_transcript_excerpt(self) -> None: + config, result = self._make_pipeline_result() + report = build_report(config, result) + self.assertIn("Execution transcript", report) + self.assertIn("Agent Execution Transcript", report) + + def test_report_contains_exit_code(self) -> None: + config, result = self._make_pipeline_result() + report = build_report(config, result) + self.assertIn("**Exit code**: 0", report) + + +# --------------------------------------------------------------------------- +# 4. Claude agentic hardened failure detection +# --------------------------------------------------------------------------- + +class TestClaimsFileChangesExpanded(unittest.TestCase): + """Expanded change-claim markers detect more Claude output patterns.""" + + def test_ive_implemented(self) -> None: + self.assertTrue(_claims_file_changes("I've implemented the feature")) + + def test_ive_updated(self) -> None: + self.assertTrue(_claims_file_changes("I've updated the config")) + + def test_made_the_following_changes(self) -> None: + self.assertTrue(_claims_file_changes("I made the following changes to the file")) + + def test_applied_the_fix(self) -> None: + self.assertTrue(_claims_file_changes("Applied the fix for the bug")) + + def test_changes_have_been_applied(self) -> None: + self.assertTrue(_claims_file_changes("Changes have been applied successfully")) + + def test_wrote_the_code(self) -> None: + self.assertTrue(_claims_file_changes("Wrote the code for the new module")) + + def test_refactored(self) -> None: + self.assertTrue(_claims_file_changes("I refactored the pipeline")) + + def test_no_changes_still_returns_false(self) -> None: + self.assertFalse(_claims_file_changes("No changes were necessary")) + + def test_empty_string_returns_false(self) -> None: + self.assertFalse(_claims_file_changes("")) + + +class TestWriteFailureIndicators(unittest.TestCase): + """_has_write_failure_indicators detects stderr patterns.""" + + def test_permission_denied(self) -> None: + self.assertTrue(_has_write_failure_indicators("Error: Permission denied")) + + def test_read_only_filesystem(self) -> None: + self.assertTrue(_has_write_failure_indicators("read-only file system")) + + def test_sandbox_restriction(self) -> None: + self.assertTrue(_has_write_failure_indicators("Blocked by sandbox policy")) + + def test_eacces(self) -> None: + self.assertTrue(_has_write_failure_indicators("EACCES: operation not permitted")) + + def test_empty_stderr_returns_false(self) -> None: + self.assertFalse(_has_write_failure_indicators("")) + + def test_normal_stderr_returns_false(self) -> None: + self.assertFalse(_has_write_failure_indicators("Downloading model...")) + + +class TestAgenticWriteFailureRaisesError(unittest.TestCase): + """Agentic mode raises AgentInvocationError on stderr write-failure indicators.""" + + @patch("cross_eval.worktree.capture_diff", return_value="") + @patch("subprocess.run") + def test_write_failure_detected_from_stderr( + self, mock_run: MagicMock, mock_diff: MagicMock, + ) -> None: + mock_run.return_value = MagicMock( + returncode=0, + stdout="Done.", + stderr="Error: Permission denied writing to /src/main.py", + ) + + agent = AgentConfig( + name="claude-coder", command="claude", + args=["--setting-sources", "user"], agentic=True, + ) + + import subprocess as _sp + import tempfile as _tf + + with _tf.TemporaryDirectory() as td: + wt = Path(td) + _sp.run(["git", "init"], cwd=wt, capture_output=True, check=True) + _sp.run(["git", "config", "user.email", "t@t.com"], cwd=wt, capture_output=True) + _sp.run(["git", "config", "user.name", "T"], cwd=wt, capture_output=True) + (wt / "README.md").write_text("# init\n") + _sp.run(["git", "add", "."], cwd=wt, capture_output=True, check=True) + _sp.run(["git", "commit", "-m", "init"], cwd=wt, capture_output=True, check=True) + + with self.assertRaises(AgentInvocationError) as ctx: + invoke_agent_agentic( + agent, "implement feature", "coding", + worktree_path=wt, quiet=True, + ) + + self.assertEqual(ctx.exception.failure_type, "WRITE_FAILURE") + self.assertIn("Permission denied", ctx.exception.raw_error) + + +class TestAgenticExpandedClaimMarkers(unittest.TestCase): + """Agentic mode detects expanded claim markers in empty diff scenarios.""" + + @patch("cross_eval.worktree.capture_diff", return_value="") + @patch("subprocess.run") + def test_ive_implemented_triggers_empty_diff_error( + self, mock_run: MagicMock, mock_diff: MagicMock, + ) -> None: + mock_run.return_value = MagicMock( + returncode=0, + stdout="I've implemented the requested changes to the pipeline.", + stderr="", + ) + + agent = AgentConfig( + name="claude-coder", command="claude", + args=["--setting-sources", "user"], agentic=True, + ) + + import subprocess as _sp + import tempfile as _tf + + with _tf.TemporaryDirectory() as td: + wt = Path(td) + _sp.run(["git", "init"], cwd=wt, capture_output=True, check=True) + _sp.run(["git", "config", "user.email", "t@t.com"], cwd=wt, capture_output=True) + _sp.run(["git", "config", "user.name", "T"], cwd=wt, capture_output=True) + (wt / "README.md").write_text("# init\n") + _sp.run(["git", "add", "."], cwd=wt, capture_output=True, check=True) + _sp.run(["git", "commit", "-m", "init"], cwd=wt, capture_output=True, check=True) + + with self.assertRaises(AgentInvocationError) as ctx: + invoke_agent_agentic( + agent, "implement feature", "coding", + worktree_path=wt, quiet=True, + ) + + self.assertEqual(ctx.exception.failure_type, "EMPTY_DIFF") + + +if __name__ == "__main__": + unittest.main()