diff --git a/cross_eval/agent.py b/cross_eval/agent.py
index 0af5949..b52acf8 100644
--- a/cross_eval/agent.py
+++ b/cross_eval/agent.py
@@ -32,20 +32,33 @@ _NO_CHANGE_ACK_MARKERS = (
_CHANGE_CLAIM_MARKERS = (
"summary of all changes made",
"here's a summary of all changes made",
+ "here is a summary of all changes",
"implemented",
"i implemented",
+ "i've implemented",
"added",
"i added",
+ "i've added",
"updated",
"i updated",
+ "i've updated",
"modified",
"i modified",
+ "i've modified",
"created",
"i created",
+ "i've created",
"fixed",
"i fixed",
+ "i've fixed",
"completed the changes",
"finished the changes",
+ "made the following changes",
+ "applied the fix",
+ "changes have been applied",
+ "wrote the code",
+ "refactored",
+ "i refactored",
)
@@ -134,6 +147,29 @@ def _classify_agent_failure(detail: str) -> tuple[str, str]:
)
+_WRITE_FAILURE_MARKERS = (
+ "permission denied",
+ "read-only file system",
+ "read only file system",
+ "operation not permitted",
+ "cannot write",
+ "failed to write",
+ "could not write",
+ "unable to write",
+ "sandbox",
+ "eacces",
+ "erofs",
+)
+
+
+def _has_write_failure_indicators(stderr: str) -> bool:
+ """Detect stderr patterns indicating the agent could not write files."""
+ if not stderr.strip():
+ return False
+ normalized = stderr.lower()
+ return any(marker in normalized for marker in _WRITE_FAILURE_MARKERS)
+
+
def _claims_file_changes(output: str) -> bool:
"""Heuristic for agent text that claims code changes were made."""
normalized = output.lower()
@@ -406,7 +442,8 @@ def invoke_agent_agentic(
# (avoids OS arg length limits for large prompts)
cmd.append(
f"Read the task file at {task_file} and execute all instructions in it. "
- f"Work in the current directory."
+ f"Work only inside the current directory and do not modify files "
+ f"outside it."
)
cmd_preview = " ".join(cmd[:6])
@@ -467,7 +504,14 @@ def invoke_agent_agentic(
if not diff_output:
stdout_excerpt = (result.stdout or "").strip()
stderr_excerpt = (result.stderr or "").strip()
- if _claims_file_changes(stdout_excerpt):
+
+ # Detect two failure modes:
+ # 1. Agent claims changes in stdout but produced no diff
+ # 2. Agent stderr contains permission or write-failure indicators
+ claims_changes = _claims_file_changes(stdout_excerpt)
+ has_write_failure = _has_write_failure_indicators(stderr_excerpt)
+
+ if claims_changes or has_write_failure:
if spinner:
spinner.stop(f"[{step_name}] FAILED (empty diff)")
raw_error = stdout_excerpt or "(stdout empty)"
@@ -475,16 +519,27 @@ def invoke_agent_agentic(
raw_error = f"{raw_error}\n\n[stderr]\n{stderr_excerpt}"
if len(raw_error) > 2000:
raw_error = raw_error[:2000] + "..."
+
+ if has_write_failure:
+ failure_type = "WRITE_FAILURE"
+ suggested_action = (
+ "Agent encountered file write errors (permission denied, read-only, "
+ "or sandbox restriction). Check agent permissions and worktree state."
+ )
+ else:
+ failure_type = "EMPTY_DIFF"
+ suggested_action = (
+ "Agent reported code changes but produced no git diff. "
+ "Treat this run as failed and require a real worktree diff before continuing."
+ )
+
raise AgentInvocationError(
agent_name=agent.name,
step_name=step_name,
cmd_preview=cmd_preview,
raw_error=raw_error,
- failure_type="EMPTY_DIFF",
- suggested_action=(
- "Agent reported code changes but produced no git diff. "
- "Treat this run as failed and require a real worktree diff before continuing."
- ),
+ failure_type=failure_type,
+ suggested_action=suggested_action,
)
diff_output = "(no changes)"
diff --git a/cross_eval/pipeline.py b/cross_eval/pipeline.py
index 7047318..f42e681 100644
--- a/cross_eval/pipeline.py
+++ b/cross_eval/pipeline.py
@@ -6,6 +6,7 @@ import os
import re
import subprocess
import time
+from hashlib import sha256
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
@@ -92,15 +93,110 @@ def _setup_worktree(cwd: Path, run_dir: Path, preset_name: str) -> tuple[Path, s
Returns (worktree_path, branch_name).
"""
- from cross_eval.worktree import create_worktree, make_branch_name
+ from cross_eval.worktree import create_worktree, make_branch_name, make_worktree_dir
branch_name = make_branch_name(preset_name)
- worktree_dir = run_dir / "work"
+ worktree_dir = make_worktree_dir(cwd, branch_name)
worktree_path = create_worktree(
base_cwd=cwd, work_dir=worktree_dir, branch_name=branch_name,
)
+ (run_dir / "worktree_path.txt").write_text(f"{worktree_path}\n", encoding="utf-8")
+ (run_dir / "worktree_branch.txt").write_text(f"{branch_name}\n", encoding="utf-8")
return worktree_path, branch_name
+def _snapshot_repo_state(cwd: Path) -> str:
+ """Capture the base repository working-tree state.
+
+ This is used to detect agentic runs that accidentally modify the original
+ checkout instead of the isolated worktree.
+ """
+ status = subprocess.run(
+ ["git", "status", "--short", "--untracked-files=all"],
+ cwd=cwd,
+ capture_output=True,
+ text=True,
+ )
+ if status.returncode != 0:
+ return ""
+
+ diff = subprocess.run(
+ ["git", "diff", "--no-ext-diff", "--binary", "HEAD"],
+ cwd=cwd,
+ capture_output=True,
+ text=True,
+ )
+ cached_diff = subprocess.run(
+ ["git", "diff", "--no-ext-diff", "--binary", "--cached"],
+ cwd=cwd,
+ capture_output=True,
+ text=True,
+ )
+ untracked = subprocess.run(
+ ["git", "ls-files", "--others", "--exclude-standard", "-z"],
+ cwd=cwd,
+ capture_output=True,
+ )
+
+ parts = [
+ status.stdout,
+ diff.stdout,
+ cached_diff.stdout,
+ ]
+
+ if untracked.returncode == 0 and untracked.stdout:
+ for rel_path in untracked.stdout.decode("utf-8", errors="replace").split("\0"):
+ if not rel_path:
+ continue
+ file_path = cwd / rel_path
+ if file_path.is_file():
+ digest = sha256(file_path.read_bytes()).hexdigest()
+ parts.append(f"UNTRACKED {rel_path} {digest}")
+ else:
+ parts.append(f"UNTRACKED {rel_path} (non-file)")
+
+ return "\n".join(parts)
+
+
+def _snapshot_repo_status(cwd: Path) -> str:
+ """Capture a human-readable status summary for error reporting."""
+ result = subprocess.run(
+ ["git", "status", "--short", "--untracked-files=all"],
+ cwd=cwd,
+ capture_output=True,
+ text=True,
+ )
+ if result.returncode != 0:
+ return ""
+ return result.stdout.strip()
+
+
+def _assert_base_repo_isolation(
+ cwd: Path,
+ baseline_state: str,
+ *,
+ step_name: str,
+ agent_name: str,
+ worktree_path: Path,
+ baseline_status: str,
+) -> None:
+ """Fail fast if an agentic run leaked changes into the base repo."""
+ current_state = _snapshot_repo_state(cwd)
+ if current_state == baseline_state:
+ return
+
+ current_status = _snapshot_repo_status(cwd)
+ before = baseline_status or "(clean)"
+ after = current_status or "(clean)"
+ raise WorktreeError(
+ "Agent modified the base repository instead of the isolated worktree.\n\n"
+ f"Step: {step_name}\n"
+ f"Agent: {agent_name}\n"
+ f"Worktree: {worktree_path}\n\n"
+ f"Baseline status:\n{before}\n\n"
+ f"Current status:\n{after}"
+ )
+
+
def _finalize_worktree(
cwd: Path,
worktree_path: Path,
@@ -172,10 +268,14 @@ def _run_simple_pipeline(
# Setup shared worktree for agentic mode
worktree_path: Path | None = None
agentic_branch_name: str | None = None
+ base_repo_state: str | None = None
+ base_repo_status: str | None = None
if not dry_run and _has_agentic_steps(config, config.pipeline):
worktree_path, agentic_branch_name = _setup_worktree(
cwd, run_dir, config.preset_name,
)
+ base_repo_state = _snapshot_repo_state(cwd)
+ base_repo_status = _snapshot_repo_status(cwd)
feedback = "(no feedback — first iteration)"
iterations: list[IterationResult] = []
@@ -203,6 +303,8 @@ def _run_simple_pipeline(
run_dir=run_dir, output_iter=i,
worktree_path=worktree_path,
runtime_env=runtime_env,
+ base_repo_state=base_repo_state,
+ base_repo_status=base_repo_status,
)
# Intermediate commit so next iteration's diff only shows new changes
@@ -332,10 +434,14 @@ def _run_phased_pipeline(
all_phase_steps = [s for p in config.phases for s in p.steps]
worktree_path: Path | None = None
agentic_branch_name: str | None = None
+ base_repo_state: str | None = None
+ base_repo_status: str | None = None
if not dry_run and _has_agentic_steps(config, all_phase_steps):
worktree_path, agentic_branch_name = _setup_worktree(
cwd, run_dir, config.preset_name,
)
+ base_repo_state = _snapshot_repo_state(cwd)
+ base_repo_status = _snapshot_repo_status(cwd)
iterations: list[IterationResult] = []
feedback = "(no feedback — first iteration)"
@@ -384,6 +490,8 @@ def _run_phased_pipeline(
run_dir=run_dir, output_iter=global_iter, phase_name=phase.name,
worktree_path=worktree_path,
runtime_env=runtime_env,
+ base_repo_state=base_repo_state,
+ base_repo_status=base_repo_status,
)
# Intermediate commit so next iteration's diff only shows new changes
@@ -626,6 +734,8 @@ def _run_steps(
phase_name: str | None = None,
worktree_path: Path | None = None,
runtime_env: dict[str, str] | None = None,
+ base_repo_state: str | None = None,
+ base_repo_status: str | None = None,
) -> tuple[dict[str, str], dict[str, AgentResult], str | None]:
"""Execute all steps in one iteration, parallelizing where possible."""
step_outputs: dict[str, str] = {}
@@ -644,6 +754,8 @@ def _run_steps(
run_dir=run_dir, output_iter=output_iter,
phase_name=phase_name, worktree_path=worktree_path,
runtime_env=runtime_env,
+ base_repo_state=base_repo_state,
+ base_repo_status=base_repo_status,
)
else:
_execute_parallel_batch(
@@ -653,6 +765,8 @@ def _run_steps(
run_dir=run_dir, output_iter=output_iter,
phase_name=phase_name, worktree_path=worktree_path,
runtime_env=runtime_env,
+ base_repo_state=base_repo_state,
+ base_repo_status=base_repo_status,
)
# Extract verdict from all verdict steps (ALL must PASS; ESCALATE wins over all)
@@ -709,6 +823,8 @@ def _execute_step(
quiet: bool = False,
worktree_path: Path | None = None,
runtime_env: dict[str, str] | None = None,
+ base_repo_state: str | None = None,
+ base_repo_status: str | None = None,
) -> None:
"""Execute a single step, updating step_outputs and step_results in place."""
if not quiet:
@@ -717,9 +833,10 @@ def _execute_step(
# 1. Resolve template
template = resolve_template(step.prompt_template)
- # 2. Build context
+ # 2. Build context (include prior step results for evidence)
context = _build_context(
input_contents, step_outputs, feedback, iteration, max_iterations,
+ step_results=step_results,
)
# 3. Apply context overrides
@@ -794,6 +911,16 @@ def _execute_step(
raise
# 7. Store output
+ if worktree_path is not None and base_repo_state is not None:
+ _assert_base_repo_isolation(
+ cwd,
+ base_repo_state,
+ step_name=step.name,
+ agent_name=step.agent,
+ worktree_path=worktree_path,
+ baseline_status=base_repo_status or "",
+ )
+
step_outputs[step.output_key] = result.output
step_results[step.output_key] = result
@@ -826,6 +953,8 @@ def _execute_parallel_batch(
phase_name: str | None = None,
worktree_path: Path | None = None,
runtime_env: dict[str, str] | None = None,
+ base_repo_state: str | None = None,
+ base_repo_status: str | None = None,
) -> None:
"""Execute multiple steps in parallel using threads."""
agent_names = ", ".join(s.agent for s in batch)
@@ -838,6 +967,8 @@ def _execute_parallel_batch(
iteration, max_iterations, cwd, timeout, dry_run,
step_outputs, step_results,
run_dir=run_dir, output_iter=output_iter, phase_name=phase_name,
+ base_repo_state=base_repo_state,
+ base_repo_status=base_repo_status,
)
return
@@ -858,12 +989,15 @@ def _execute_parallel_batch(
step_outputs, step_results,
run_dir=run_dir, output_iter=output_iter,
phase_name=phase_name, worktree_path=worktree_path,
+ base_repo_state=base_repo_state,
+ base_repo_status=base_repo_status,
)
return
# Snapshot context before parallel execution (all steps see same state)
context_snapshot = dict(input_contents)
context_snapshot.update(step_outputs)
+ results_snapshot = dict(step_results)
# Collect results from parallel threads
local_outputs: dict[str, str] = {}
@@ -883,6 +1017,7 @@ def _execute_parallel_batch(
template = resolve_template(step.prompt_template)
context = _build_context(
context_snapshot, {}, feedback, iteration, max_iterations,
+ step_results=results_snapshot,
)
if step.context_override:
context = _apply_context_override(context, step.context_override)
@@ -919,6 +1054,16 @@ def _execute_parallel_batch(
batch_elapsed = round(time.monotonic() - batch_start, 1)
# Persist successful outputs even if a sibling step failed.
+ if worktree_path is not None and base_repo_state is not None:
+ _assert_base_repo_isolation(
+ cwd,
+ base_repo_state,
+ step_name=phase_name or "parallel-batch",
+ agent_name=agent_names,
+ worktree_path=worktree_path,
+ baseline_status=base_repo_status or "",
+ )
+
for step in batch:
key = step.output_key
if key not in local_outputs:
@@ -986,6 +1131,7 @@ def _build_context(
feedback: str,
iteration: int,
max_iterations: int,
+ step_results: dict[str, AgentResult] | None = None,
) -> dict[str, str]:
"""Build the template context dict."""
context: dict[str, str] = {}
@@ -994,9 +1140,42 @@ def _build_context(
context["feedback"] = feedback
context["iteration"] = str(iteration)
context["max_iterations"] = str(max_iterations)
+ # Surface execution evidence from prior steps so reviewers can inspect it
+ if step_results:
+ context["execution_evidence"] = _format_execution_evidence(step_results)
return context
+def _format_execution_evidence(
+ step_results: dict[str, AgentResult],
+) -> str:
+ """Format execution evidence from prior steps for reviewer consumption.
+
+ Produces a compact summary of command, exit code, duration, and a truncated
+ transcript excerpt for each completed step so that reviewers and seniors
+ can verify claims against real execution data.
+ """
+ if not step_results:
+ return "(no prior execution evidence)"
+ parts: list[str] = []
+ for key, result in step_results.items():
+ section = [
+ f"### Step: {result.step_name} ({result.agent_name})",
+ f"- Command: `{result.command_preview}`" if result.command_preview else "",
+ f"- Exit code: {result.exit_code}",
+ f"- Duration: {result.duration_seconds}s",
+ ]
+ section = [line for line in section if line]
+ if result.transcript:
+ # Include a truncated transcript excerpt for debugging
+ excerpt = result.transcript[:2000]
+ if len(result.transcript) > 2000:
+ excerpt += "\n... (truncated)"
+ section.append(f"\n\nTranscript excerpt
\n\n{excerpt}\n ")
+ parts.append("\n".join(section))
+ return "\n\n---\n\n".join(parts)
+
+
def _build_runtime_inputs(
config: PipelineConfig,
input_contents: dict[str, str],
diff --git a/cross_eval/prompts.py b/cross_eval/prompts.py
index 48f1183..e6daa74 100644
--- a/cross_eval/prompts.py
+++ b/cross_eval/prompts.py
@@ -59,9 +59,14 @@ You are tasked with reviewing code against a plan and checklist.
## Previous Review Feedback
{feedback}
+## Execution Evidence
+{execution_evidence}
+
## Review Instructions
Explore the project directory to understand the full codebase context, \
-then evaluate the code against ONLY the plan and checklist above.
+then evaluate the code against ONLY the plan and checklist above. \
+Use the execution evidence above to verify agent claims against actual \
+command outputs and exit codes.
For each issue found, classify it with BOTH severity AND category:
@@ -164,9 +169,13 @@ REVIEW_TEMPLATE_KO = """\
## 이전 리뷰 피드백
{feedback}
+## 실행 증거
+{execution_evidence}
+
## 검토 지침
프로젝트 디렉토리를 직접 탐색하여 전체 코드베이스 맥락을 파악한 뒤, \
-위 기획서와 체크리스트 기준으로만 코드를 평가하세요.
+위 기획서와 체크리스트 기준으로만 코드를 평가하세요. \
+위 실행 증거를 활용하여 에이전트의 주장을 실제 명령어 출력과 종료 코드로 검증하세요.
발견된 각 이슈에 심각도와 카테고리를 모두 부여하세요:
@@ -525,8 +534,13 @@ You are adjudicating multiple review results and turning them into an actionable
## Previous Issue Tracker
{previous_senior_tracker}
+## Execution Evidence
+{execution_evidence}
+
## Instructions
-Explore the project directory to confirm the current codebase state. Then:
+Explore the project directory to confirm the current codebase state. \
+Use the execution evidence above to verify claims against actual command \
+outputs and exit codes. Then:
1. Deduplicate overlapping issues across reviewers.
2. Resolve disagreements explicitly.
3. Keep only issues supported by the plan, checklist, code, or reviewer evidence.
@@ -592,8 +606,13 @@ AGGREGATE_REVIEW_TEMPLATE_KO = """\
## 이전 이슈 트래커
{previous_senior_tracker}
+## 실행 증거
+{execution_evidence}
+
## 지침
-프로젝트 디렉토리를 탐색하여 현재 코드베이스 상태를 확인한 뒤 다음을 수행하세요.
+프로젝트 디렉토리를 탐색하여 현재 코드베이스 상태를 확인한 뒤, \
+위 실행 증거를 활용하여 에이전트의 주장을 실제 명령어 출력과 종료 코드로 검증하세요. \
+그런 다음 아래를 수행하세요.
1. 리뷰어들 사이에 중복되는 이슈를 합치세요.
2. 의견 충돌은 명시적으로 정리하세요.
3. 기획서, 체크리스트, 코드, 리뷰 근거로 뒷받침되는 이슈만 남기세요.
diff --git a/cross_eval/report.py b/cross_eval/report.py
index eda32ea..a7cbc6d 100644
--- a/cross_eval/report.py
+++ b/cross_eval/report.py
@@ -386,6 +386,11 @@ def _append_iteration_steps(
lines.append(f"### {_t(config, 'step')}: {step.name} ({agent_name}){duration}\n")
+ # Show command preview and exit code for execution evidence
+ if agent_result and agent_result.command_preview:
+ lines.append(f"**Command**: `{agent_result.command_preview}`")
+ lines.append(f"**Exit code**: {agent_result.exit_code}\n")
+
if step.verdict and iter_result.verdict:
lines.append(f"**{_t(config, 'verdict')}: {iter_result.verdict}**\n")
@@ -400,6 +405,16 @@ def _append_iteration_steps(
lines.append(output)
lines.append("")
+ # Include transcript excerpt for execution evidence visibility
+ if agent_result and agent_result.transcript:
+ transcript_preview = agent_result.transcript[:1500]
+ if len(agent_result.transcript) > 1500:
+ transcript_preview += "\n... (truncated)"
+ lines.append("")
+ lines.append("Execution transcript
\n")
+ lines.append(transcript_preview)
+ lines.append("\n \n")
+
if not skip_extraction and step.role == "review":
oos = _extract_out_of_scope(output)
if oos:
diff --git a/cross_eval/worktree.py b/cross_eval/worktree.py
index dda710f..7fd0932 100644
--- a/cross_eval/worktree.py
+++ b/cross_eval/worktree.py
@@ -4,6 +4,7 @@ from __future__ import annotations
import logging
import shutil
import subprocess
+import tempfile
from datetime import datetime
from pathlib import Path
@@ -20,6 +21,22 @@ def make_branch_name(preset_name: str) -> str:
return f"cross-eval/{preset_name}_{ts}"
+def make_worktree_dir(base_cwd: Path, branch_name: str) -> Path:
+ """Choose a worktree directory outside the base repo.
+
+ Keeping agentic worktrees outside the source checkout avoids tools that
+ incorrectly walk up to the outer repo and write into the base worktree.
+ """
+ repo_name = base_cwd.resolve().name or "repo"
+ branch_slug = branch_name.replace("/", "__")
+ return (
+ Path(tempfile.gettempdir())
+ / "cross-eval-worktrees"
+ / repo_name
+ / branch_slug
+ )
+
+
def create_worktree(base_cwd: Path, work_dir: Path, branch_name: str) -> Path:
"""Create a git worktree on a new branch from HEAD.
diff --git a/tests/test_agentic.py b/tests/test_agentic.py
index dc3768f..178bf63 100644
--- a/tests/test_agentic.py
+++ b/tests/test_agentic.py
@@ -23,6 +23,7 @@ from cross_eval.models import (
StepConfig,
)
from cross_eval.pipeline import (
+ _assert_base_repo_isolation,
_commit_iteration,
_finalize_worktree,
_has_agentic_steps,
@@ -34,6 +35,7 @@ from cross_eval.worktree import (
commit_worktree,
create_worktree,
make_branch_name,
+ make_worktree_dir,
remove_worktree,
)
@@ -191,6 +193,41 @@ class TestMakeBranchName(unittest.TestCase):
self.assertEqual(len(ts_part), 15) # YYYYMMDD_HHMMSS
+class TestMakeWorktreeDir(unittest.TestCase):
+ """make_worktree_dir chooses an external temp location."""
+
+ def test_uses_tmp_dir_outside_repo(self) -> None:
+ with tempfile.TemporaryDirectory() as td:
+ base = Path(td) / "repo"
+ base.mkdir()
+ path = make_worktree_dir(base, "cross-eval/review-fix_20260313_123456")
+ self.assertIn("cross-eval-worktrees", str(path))
+ self.assertNotIn(str(base), str(path))
+
+
+class TestBaseRepoIsolation(unittest.TestCase):
+ """Base repo mutations should fail fast during agentic execution."""
+
+ def test_raises_when_base_repo_status_changes(self) -> None:
+ with tempfile.TemporaryDirectory() as td:
+ base = Path(td) / "repo"
+ worktree = Path(td) / "worktree"
+ base.mkdir()
+ worktree.mkdir()
+
+ with self.assertRaises(RuntimeError) as ctx:
+ _assert_base_repo_isolation(
+ base,
+ "M cross_eval/agent.py",
+ step_name="coding",
+ agent_name="claude-coder",
+ worktree_path=worktree,
+ baseline_status="M cross_eval/agent.py",
+ )
+
+ self.assertIn("base repository", str(ctx.exception))
+
+
# ===================================================================
# 2. agent.py agentic tests (mocking subprocess)
# ===================================================================
@@ -513,6 +550,33 @@ class TestSetupWorktreeCalledForAgentic(unittest.TestCase):
mock_setup.assert_called_once()
+class TestSetupWorktreeLocation(unittest.TestCase):
+ """_setup_worktree places agentic worktrees outside the base repo."""
+
+ def test_worktree_is_created_outside_repo(self) -> None:
+ with tempfile.TemporaryDirectory() as td:
+ base = Path(td) / "repo"
+ run_dir = base / ".cross-eval" / "output" / "smoke"
+ base.mkdir()
+ run_dir.mkdir(parents=True)
+ _init_git_repo(base)
+
+ worktree_path, branch_name = _setup_worktree(base, run_dir, "review-fix")
+ try:
+ self.assertTrue(worktree_path.exists())
+ self.assertNotIn(str(base.resolve()), str(worktree_path.resolve()))
+ self.assertEqual(
+ (run_dir / "worktree_path.txt").read_text(encoding="utf-8").strip(),
+ str(worktree_path),
+ )
+ self.assertEqual(
+ (run_dir / "worktree_branch.txt").read_text(encoding="utf-8").strip(),
+ branch_name,
+ )
+ finally:
+ remove_worktree(base, worktree_path)
+
+
class TestReviewerRunsInWorktreeCwd(unittest.TestCase):
"""Reviewer runs with worktree cwd (not original cwd) when worktree exists."""
diff --git a/tests/test_evidence.py b/tests/test_evidence.py
new file mode 100644
index 0000000..fc66682
--- /dev/null
+++ b/tests/test_evidence.py
@@ -0,0 +1,395 @@
+"""Regression tests for runtime evidence propagation and report visibility.
+
+Covers:
+ 1. Execution evidence is surfaced in reviewer/senior prompt context.
+ 2. Reports include command preview and transcript excerpts.
+ 3. Claude agentic failure detection (empty diff, write failure, expanded markers).
+ 4. _format_execution_evidence produces expected output.
+"""
+from __future__ import annotations
+
+import tempfile
+import unittest
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+from cross_eval.agent import (
+ AgentInvocationError,
+ _claims_file_changes,
+ _has_write_failure_indicators,
+ invoke_agent_agentic,
+)
+from cross_eval.config import BUILTIN_AGENTS
+from cross_eval.models import (
+ AgentConfig,
+ AgentResult,
+ IterationResult,
+ PipelineConfig,
+ PipelineResult,
+ ReviewMetrics,
+ StepConfig,
+)
+from cross_eval.pipeline import _format_execution_evidence, run_pipeline
+from cross_eval.report import build_report
+
+
+# ---------------------------------------------------------------------------
+# 1. Execution evidence formatting
+# ---------------------------------------------------------------------------
+
+class TestFormatExecutionEvidence(unittest.TestCase):
+ """_format_execution_evidence produces a compact summary for reviewers."""
+
+ def test_empty_results_returns_placeholder(self) -> None:
+ self.assertIn("no prior execution evidence", _format_execution_evidence({}))
+
+ def test_single_result_includes_key_fields(self) -> None:
+ result = AgentResult(
+ output="some diff",
+ exit_code=0,
+ agent_name="claude-coder",
+ step_name="coding",
+ duration_seconds=12.3,
+ transcript="# Agent Execution Transcript\n\n## Command\nclaude ...",
+ command_preview="claude --setting-sources user",
+ )
+ evidence = _format_execution_evidence({"coding_output": result})
+ self.assertIn("claude-coder", evidence)
+ self.assertIn("coding", evidence)
+ self.assertIn("Exit code: 0", evidence)
+ self.assertIn("12.3s", evidence)
+ self.assertIn("claude --setting-sources user", evidence)
+ self.assertIn("Transcript excerpt", evidence)
+
+ def test_multiple_results_separated(self) -> None:
+ r1 = AgentResult(
+ output="diff1", exit_code=0, agent_name="coder",
+ step_name="coding", duration_seconds=1.0,
+ command_preview="cmd1",
+ )
+ r2 = AgentResult(
+ output="review text", exit_code=0, agent_name="reviewer",
+ step_name="review", duration_seconds=2.0,
+ command_preview="cmd2",
+ )
+ evidence = _format_execution_evidence({
+ "coding_output": r1,
+ "review_result": r2,
+ })
+ self.assertIn("coder", evidence)
+ self.assertIn("reviewer", evidence)
+ self.assertIn("---", evidence)
+
+ def test_transcript_truncated_at_2000_chars(self) -> None:
+ long_transcript = "x" * 3000
+ result = AgentResult(
+ output="out", exit_code=0, agent_name="agent",
+ step_name="step", duration_seconds=1.0,
+ transcript=long_transcript,
+ )
+ evidence = _format_execution_evidence({"key": result})
+ self.assertIn("truncated", evidence)
+ # The full 3000-char transcript should NOT appear
+ self.assertNotIn("x" * 3000, evidence)
+
+
+# ---------------------------------------------------------------------------
+# 2. Evidence in reviewer prompts (integration)
+# ---------------------------------------------------------------------------
+
+class TestEvidenceInReviewerPrompt(unittest.TestCase):
+ """Reviewer prompts include execution evidence from prior coding step."""
+
+ def test_reviewer_receives_evidence(self) -> None:
+ with tempfile.TemporaryDirectory() as tmpdir:
+ steps = [
+ StepConfig(
+ name="coding", agent="claude-coder", role="coding",
+ prompt_template="default:coding", output_key="coding_output",
+ ),
+ StepConfig(
+ name="review", agent="claude-reviewer", role="review",
+ prompt_template="default:review", output_key="review_result",
+ verdict=True,
+ ),
+ ]
+ config = PipelineConfig(
+ output_dir=Path(tmpdir),
+ max_iterations=1,
+ min_iterations=1,
+ language="en",
+ inputs={"plan": "Test plan", "checklist": "Test checklist"},
+ agents=dict(BUILTIN_AGENTS),
+ coders=["claude-coder"],
+ reviewers=["claude-reviewer"],
+ pipeline=steps,
+ preset_name="simple",
+ )
+
+ captured_prompts: list[dict] = []
+
+ def _mock(agent_config, prompt, step_name, **kwargs):
+ captured_prompts.append({
+ "step_name": step_name,
+ "prompt": prompt,
+ })
+ if step_name == "coding":
+ return AgentResult(
+ output="Implemented feature X",
+ exit_code=0,
+ agent_name=agent_config.name,
+ step_name=step_name,
+ duration_seconds=5.0,
+ transcript="# Transcript\nclaude ran...",
+ command_preview="claude --setting-sources user",
+ )
+ return AgentResult(
+ output="VERDICT: PASS",
+ exit_code=0,
+ agent_name=agent_config.name,
+ step_name=step_name,
+ duration_seconds=2.0,
+ )
+
+ with patch("cross_eval.pipeline.invoke_agent", side_effect=_mock):
+ result = run_pipeline(config)
+
+ self.assertEqual(result.final_verdict, "PASS")
+
+ # The reviewer prompt should contain execution evidence
+ review_prompts = [
+ p for p in captured_prompts if p["step_name"] == "review"
+ ]
+ self.assertTrue(len(review_prompts) >= 1)
+ review_prompt = review_prompts[0]["prompt"]
+ # Evidence section should reference the coding step's command
+ self.assertIn("Execution Evidence", review_prompt)
+ self.assertIn("claude-coder", review_prompt)
+
+
+# ---------------------------------------------------------------------------
+# 3. Report includes evidence
+# ---------------------------------------------------------------------------
+
+class TestReportIncludesEvidence(unittest.TestCase):
+ """Report generation includes command preview and transcript excerpts."""
+
+ def _make_pipeline_result(self) -> tuple[PipelineConfig, PipelineResult]:
+ steps = [
+ StepConfig(
+ name="coding", agent="claude-coder", role="coding",
+ prompt_template="default:coding", output_key="coding_output",
+ ),
+ StepConfig(
+ name="review", agent="claude-reviewer", role="review",
+ prompt_template="default:review", output_key="review_result",
+ verdict=True,
+ ),
+ ]
+ config = PipelineConfig(
+ max_iterations=1,
+ language="en",
+ inputs={"plan": "Plan", "checklist": "CL"},
+ agents=dict(BUILTIN_AGENTS),
+ pipeline=steps,
+ preset_name="simple",
+ )
+
+ coding_result = AgentResult(
+ output="diff --git a/file ...",
+ exit_code=0,
+ agent_name="claude-coder",
+ step_name="coding",
+ duration_seconds=10.0,
+ transcript="# Agent Execution Transcript\n## Command\nclaude ...\n## Stdout\nok",
+ command_preview="claude --setting-sources user",
+ )
+ review_result = AgentResult(
+ output="All good.\n\nVERDICT: PASS",
+ exit_code=0,
+ agent_name="claude-reviewer",
+ step_name="review",
+ duration_seconds=5.0,
+ transcript="# Agent Execution Transcript\n## Command\nclaude -p ...\n## Stdout\nAll good.",
+ command_preview="claude -p --setting-sources user",
+ )
+
+ iteration = IterationResult(
+ iteration=1,
+ step_results={
+ "coding_output": coding_result,
+ "review_result": review_result,
+ },
+ step_outputs={
+ "coding_output": "diff --git a/file ...",
+ "review_result": "All good.\n\nVERDICT: PASS",
+ },
+ verdict="PASS",
+ )
+
+ pipeline_result = PipelineResult(
+ iterations=[iteration],
+ final_verdict="PASS",
+ total_duration=15.0,
+ )
+
+ return config, pipeline_result
+
+ def test_report_contains_command_preview(self) -> None:
+ config, result = self._make_pipeline_result()
+ report = build_report(config, result)
+ self.assertIn("claude --setting-sources user", report)
+ self.assertIn("**Command**", report)
+
+ def test_report_contains_transcript_excerpt(self) -> None:
+ config, result = self._make_pipeline_result()
+ report = build_report(config, result)
+ self.assertIn("Execution transcript", report)
+ self.assertIn("Agent Execution Transcript", report)
+
+ def test_report_contains_exit_code(self) -> None:
+ config, result = self._make_pipeline_result()
+ report = build_report(config, result)
+ self.assertIn("**Exit code**: 0", report)
+
+
+# ---------------------------------------------------------------------------
+# 4. Claude agentic hardened failure detection
+# ---------------------------------------------------------------------------
+
+class TestClaimsFileChangesExpanded(unittest.TestCase):
+ """Expanded change-claim markers detect more Claude output patterns."""
+
+ def test_ive_implemented(self) -> None:
+ self.assertTrue(_claims_file_changes("I've implemented the feature"))
+
+ def test_ive_updated(self) -> None:
+ self.assertTrue(_claims_file_changes("I've updated the config"))
+
+ def test_made_the_following_changes(self) -> None:
+ self.assertTrue(_claims_file_changes("I made the following changes to the file"))
+
+ def test_applied_the_fix(self) -> None:
+ self.assertTrue(_claims_file_changes("Applied the fix for the bug"))
+
+ def test_changes_have_been_applied(self) -> None:
+ self.assertTrue(_claims_file_changes("Changes have been applied successfully"))
+
+ def test_wrote_the_code(self) -> None:
+ self.assertTrue(_claims_file_changes("Wrote the code for the new module"))
+
+ def test_refactored(self) -> None:
+ self.assertTrue(_claims_file_changes("I refactored the pipeline"))
+
+ def test_no_changes_still_returns_false(self) -> None:
+ self.assertFalse(_claims_file_changes("No changes were necessary"))
+
+ def test_empty_string_returns_false(self) -> None:
+ self.assertFalse(_claims_file_changes(""))
+
+
+class TestWriteFailureIndicators(unittest.TestCase):
+ """_has_write_failure_indicators detects stderr patterns."""
+
+ def test_permission_denied(self) -> None:
+ self.assertTrue(_has_write_failure_indicators("Error: Permission denied"))
+
+ def test_read_only_filesystem(self) -> None:
+ self.assertTrue(_has_write_failure_indicators("read-only file system"))
+
+ def test_sandbox_restriction(self) -> None:
+ self.assertTrue(_has_write_failure_indicators("Blocked by sandbox policy"))
+
+ def test_eacces(self) -> None:
+ self.assertTrue(_has_write_failure_indicators("EACCES: operation not permitted"))
+
+ def test_empty_stderr_returns_false(self) -> None:
+ self.assertFalse(_has_write_failure_indicators(""))
+
+ def test_normal_stderr_returns_false(self) -> None:
+ self.assertFalse(_has_write_failure_indicators("Downloading model..."))
+
+
+class TestAgenticWriteFailureRaisesError(unittest.TestCase):
+ """Agentic mode raises AgentInvocationError on stderr write-failure indicators."""
+
+ @patch("cross_eval.worktree.capture_diff", return_value="")
+ @patch("subprocess.run")
+ def test_write_failure_detected_from_stderr(
+ self, mock_run: MagicMock, mock_diff: MagicMock,
+ ) -> None:
+ mock_run.return_value = MagicMock(
+ returncode=0,
+ stdout="Done.",
+ stderr="Error: Permission denied writing to /src/main.py",
+ )
+
+ agent = AgentConfig(
+ name="claude-coder", command="claude",
+ args=["--setting-sources", "user"], agentic=True,
+ )
+
+ import subprocess as _sp
+ import tempfile as _tf
+
+ with _tf.TemporaryDirectory() as td:
+ wt = Path(td)
+ _sp.run(["git", "init"], cwd=wt, capture_output=True, check=True)
+ _sp.run(["git", "config", "user.email", "t@t.com"], cwd=wt, capture_output=True)
+ _sp.run(["git", "config", "user.name", "T"], cwd=wt, capture_output=True)
+ (wt / "README.md").write_text("# init\n")
+ _sp.run(["git", "add", "."], cwd=wt, capture_output=True, check=True)
+ _sp.run(["git", "commit", "-m", "init"], cwd=wt, capture_output=True, check=True)
+
+ with self.assertRaises(AgentInvocationError) as ctx:
+ invoke_agent_agentic(
+ agent, "implement feature", "coding",
+ worktree_path=wt, quiet=True,
+ )
+
+ self.assertEqual(ctx.exception.failure_type, "WRITE_FAILURE")
+ self.assertIn("Permission denied", ctx.exception.raw_error)
+
+
+class TestAgenticExpandedClaimMarkers(unittest.TestCase):
+ """Agentic mode detects expanded claim markers in empty diff scenarios."""
+
+ @patch("cross_eval.worktree.capture_diff", return_value="")
+ @patch("subprocess.run")
+ def test_ive_implemented_triggers_empty_diff_error(
+ self, mock_run: MagicMock, mock_diff: MagicMock,
+ ) -> None:
+ mock_run.return_value = MagicMock(
+ returncode=0,
+ stdout="I've implemented the requested changes to the pipeline.",
+ stderr="",
+ )
+
+ agent = AgentConfig(
+ name="claude-coder", command="claude",
+ args=["--setting-sources", "user"], agentic=True,
+ )
+
+ import subprocess as _sp
+ import tempfile as _tf
+
+ with _tf.TemporaryDirectory() as td:
+ wt = Path(td)
+ _sp.run(["git", "init"], cwd=wt, capture_output=True, check=True)
+ _sp.run(["git", "config", "user.email", "t@t.com"], cwd=wt, capture_output=True)
+ _sp.run(["git", "config", "user.name", "T"], cwd=wt, capture_output=True)
+ (wt / "README.md").write_text("# init\n")
+ _sp.run(["git", "add", "."], cwd=wt, capture_output=True, check=True)
+ _sp.run(["git", "commit", "-m", "init"], cwd=wt, capture_output=True, check=True)
+
+ with self.assertRaises(AgentInvocationError) as ctx:
+ invoke_agent_agentic(
+ agent, "implement feature", "coding",
+ worktree_path=wt, quiet=True,
+ )
+
+ self.assertEqual(ctx.exception.failure_type, "EMPTY_DIFF")
+
+
+if __name__ == "__main__":
+ unittest.main()