feat: isolate agentic worktrees and surface execution evidence

This commit is contained in:
chungyeong
2026-03-13 22:50:46 +09:00
parent 3fb19e90c0
commit b19d174c98
7 changed files with 758 additions and 14 deletions

View File

@@ -32,20 +32,33 @@ _NO_CHANGE_ACK_MARKERS = (
_CHANGE_CLAIM_MARKERS = ( _CHANGE_CLAIM_MARKERS = (
"summary of all changes made", "summary of all changes made",
"here's a summary of all changes made", "here's a summary of all changes made",
"here is a summary of all changes",
"implemented", "implemented",
"i implemented", "i implemented",
"i've implemented",
"added", "added",
"i added", "i added",
"i've added",
"updated", "updated",
"i updated", "i updated",
"i've updated",
"modified", "modified",
"i modified", "i modified",
"i've modified",
"created", "created",
"i created", "i created",
"i've created",
"fixed", "fixed",
"i fixed", "i fixed",
"i've fixed",
"completed the changes", "completed the changes",
"finished the changes", "finished the changes",
"made the following changes",
"applied the fix",
"changes have been applied",
"wrote the code",
"refactored",
"i refactored",
) )
@@ -134,6 +147,29 @@ def _classify_agent_failure(detail: str) -> tuple[str, str]:
) )
_WRITE_FAILURE_MARKERS = (
"permission denied",
"read-only file system",
"read only file system",
"operation not permitted",
"cannot write",
"failed to write",
"could not write",
"unable to write",
"sandbox",
"eacces",
"erofs",
)
def _has_write_failure_indicators(stderr: str) -> bool:
"""Detect stderr patterns indicating the agent could not write files."""
if not stderr.strip():
return False
normalized = stderr.lower()
return any(marker in normalized for marker in _WRITE_FAILURE_MARKERS)
def _claims_file_changes(output: str) -> bool: def _claims_file_changes(output: str) -> bool:
"""Heuristic for agent text that claims code changes were made.""" """Heuristic for agent text that claims code changes were made."""
normalized = output.lower() normalized = output.lower()
@@ -406,7 +442,8 @@ def invoke_agent_agentic(
# (avoids OS arg length limits for large prompts) # (avoids OS arg length limits for large prompts)
cmd.append( cmd.append(
f"Read the task file at {task_file} and execute all instructions in it. " f"Read the task file at {task_file} and execute all instructions in it. "
f"Work in the current directory." f"Work only inside the current directory and do not modify files "
f"outside it."
) )
cmd_preview = " ".join(cmd[:6]) cmd_preview = " ".join(cmd[:6])
@@ -467,7 +504,14 @@ def invoke_agent_agentic(
if not diff_output: if not diff_output:
stdout_excerpt = (result.stdout or "").strip() stdout_excerpt = (result.stdout or "").strip()
stderr_excerpt = (result.stderr or "").strip() stderr_excerpt = (result.stderr or "").strip()
if _claims_file_changes(stdout_excerpt):
# Detect two failure modes:
# 1. Agent claims changes in stdout but produced no diff
# 2. Agent stderr contains permission or write-failure indicators
claims_changes = _claims_file_changes(stdout_excerpt)
has_write_failure = _has_write_failure_indicators(stderr_excerpt)
if claims_changes or has_write_failure:
if spinner: if spinner:
spinner.stop(f"[{step_name}] FAILED (empty diff)") spinner.stop(f"[{step_name}] FAILED (empty diff)")
raw_error = stdout_excerpt or "(stdout empty)" raw_error = stdout_excerpt or "(stdout empty)"
@@ -475,16 +519,27 @@ def invoke_agent_agentic(
raw_error = f"{raw_error}\n\n[stderr]\n{stderr_excerpt}" raw_error = f"{raw_error}\n\n[stderr]\n{stderr_excerpt}"
if len(raw_error) > 2000: if len(raw_error) > 2000:
raw_error = raw_error[:2000] + "..." raw_error = raw_error[:2000] + "..."
if has_write_failure:
failure_type = "WRITE_FAILURE"
suggested_action = (
"Agent encountered file write errors (permission denied, read-only, "
"or sandbox restriction). Check agent permissions and worktree state."
)
else:
failure_type = "EMPTY_DIFF"
suggested_action = (
"Agent reported code changes but produced no git diff. "
"Treat this run as failed and require a real worktree diff before continuing."
)
raise AgentInvocationError( raise AgentInvocationError(
agent_name=agent.name, agent_name=agent.name,
step_name=step_name, step_name=step_name,
cmd_preview=cmd_preview, cmd_preview=cmd_preview,
raw_error=raw_error, raw_error=raw_error,
failure_type="EMPTY_DIFF", failure_type=failure_type,
suggested_action=( suggested_action=suggested_action,
"Agent reported code changes but produced no git diff. "
"Treat this run as failed and require a real worktree diff before continuing."
),
) )
diff_output = "(no changes)" diff_output = "(no changes)"

View File

@@ -6,6 +6,7 @@ import os
import re import re
import subprocess import subprocess
import time import time
from hashlib import sha256
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
@@ -92,15 +93,110 @@ def _setup_worktree(cwd: Path, run_dir: Path, preset_name: str) -> tuple[Path, s
Returns (worktree_path, branch_name). Returns (worktree_path, branch_name).
""" """
from cross_eval.worktree import create_worktree, make_branch_name from cross_eval.worktree import create_worktree, make_branch_name, make_worktree_dir
branch_name = make_branch_name(preset_name) branch_name = make_branch_name(preset_name)
worktree_dir = run_dir / "work" worktree_dir = make_worktree_dir(cwd, branch_name)
worktree_path = create_worktree( worktree_path = create_worktree(
base_cwd=cwd, work_dir=worktree_dir, branch_name=branch_name, base_cwd=cwd, work_dir=worktree_dir, branch_name=branch_name,
) )
(run_dir / "worktree_path.txt").write_text(f"{worktree_path}\n", encoding="utf-8")
(run_dir / "worktree_branch.txt").write_text(f"{branch_name}\n", encoding="utf-8")
return worktree_path, branch_name return worktree_path, branch_name
def _snapshot_repo_state(cwd: Path) -> str:
"""Capture the base repository working-tree state.
This is used to detect agentic runs that accidentally modify the original
checkout instead of the isolated worktree.
"""
status = subprocess.run(
["git", "status", "--short", "--untracked-files=all"],
cwd=cwd,
capture_output=True,
text=True,
)
if status.returncode != 0:
return ""
diff = subprocess.run(
["git", "diff", "--no-ext-diff", "--binary", "HEAD"],
cwd=cwd,
capture_output=True,
text=True,
)
cached_diff = subprocess.run(
["git", "diff", "--no-ext-diff", "--binary", "--cached"],
cwd=cwd,
capture_output=True,
text=True,
)
untracked = subprocess.run(
["git", "ls-files", "--others", "--exclude-standard", "-z"],
cwd=cwd,
capture_output=True,
)
parts = [
status.stdout,
diff.stdout,
cached_diff.stdout,
]
if untracked.returncode == 0 and untracked.stdout:
for rel_path in untracked.stdout.decode("utf-8", errors="replace").split("\0"):
if not rel_path:
continue
file_path = cwd / rel_path
if file_path.is_file():
digest = sha256(file_path.read_bytes()).hexdigest()
parts.append(f"UNTRACKED {rel_path} {digest}")
else:
parts.append(f"UNTRACKED {rel_path} (non-file)")
return "\n".join(parts)
def _snapshot_repo_status(cwd: Path) -> str:
"""Capture a human-readable status summary for error reporting."""
result = subprocess.run(
["git", "status", "--short", "--untracked-files=all"],
cwd=cwd,
capture_output=True,
text=True,
)
if result.returncode != 0:
return ""
return result.stdout.strip()
def _assert_base_repo_isolation(
    cwd: Path,
    baseline_state: str,
    *,
    step_name: str,
    agent_name: str,
    worktree_path: Path,
    baseline_status: str,
) -> None:
    """Raise if the base repository no longer matches its baseline snapshot.

    Args:
        cwd: Base repository directory to re-snapshot.
        baseline_state: Earlier ``_snapshot_repo_state`` result to compare to.
        step_name: Step label included in the error message.
        agent_name: Agent label included in the error message.
        worktree_path: Isolated worktree location, reported in the message.
        baseline_status: Earlier ``_snapshot_repo_status`` text; shown as
            ``(clean)`` when empty.

    Raises:
        WorktreeError: When the freshly captured state differs from
            *baseline_state*.
    """
    if _snapshot_repo_state(cwd) == baseline_state:
        return

    # Only the human-readable status goes into the message; the full state
    # fingerprint is used for the comparison alone.
    status_now = _snapshot_repo_status(cwd)
    shown_before = baseline_status if baseline_status else "(clean)"
    shown_after = status_now if status_now else "(clean)"

    message = "".join(
        [
            "Agent modified the base repository instead of the isolated worktree.\n\n",
            f"Step: {step_name}\n",
            f"Agent: {agent_name}\n",
            f"Worktree: {worktree_path}\n\n",
            f"Baseline status:\n{shown_before}\n\n",
            f"Current status:\n{shown_after}",
        ]
    )
    raise WorktreeError(message)
def _finalize_worktree( def _finalize_worktree(
cwd: Path, cwd: Path,
worktree_path: Path, worktree_path: Path,
@@ -172,10 +268,14 @@ def _run_simple_pipeline(
# Setup shared worktree for agentic mode # Setup shared worktree for agentic mode
worktree_path: Path | None = None worktree_path: Path | None = None
agentic_branch_name: str | None = None agentic_branch_name: str | None = None
base_repo_state: str | None = None
base_repo_status: str | None = None
if not dry_run and _has_agentic_steps(config, config.pipeline): if not dry_run and _has_agentic_steps(config, config.pipeline):
worktree_path, agentic_branch_name = _setup_worktree( worktree_path, agentic_branch_name = _setup_worktree(
cwd, run_dir, config.preset_name, cwd, run_dir, config.preset_name,
) )
base_repo_state = _snapshot_repo_state(cwd)
base_repo_status = _snapshot_repo_status(cwd)
feedback = "(no feedback — first iteration)" feedback = "(no feedback — first iteration)"
iterations: list[IterationResult] = [] iterations: list[IterationResult] = []
@@ -203,6 +303,8 @@ def _run_simple_pipeline(
run_dir=run_dir, output_iter=i, run_dir=run_dir, output_iter=i,
worktree_path=worktree_path, worktree_path=worktree_path,
runtime_env=runtime_env, runtime_env=runtime_env,
base_repo_state=base_repo_state,
base_repo_status=base_repo_status,
) )
# Intermediate commit so next iteration's diff only shows new changes # Intermediate commit so next iteration's diff only shows new changes
@@ -332,10 +434,14 @@ def _run_phased_pipeline(
all_phase_steps = [s for p in config.phases for s in p.steps] all_phase_steps = [s for p in config.phases for s in p.steps]
worktree_path: Path | None = None worktree_path: Path | None = None
agentic_branch_name: str | None = None agentic_branch_name: str | None = None
base_repo_state: str | None = None
base_repo_status: str | None = None
if not dry_run and _has_agentic_steps(config, all_phase_steps): if not dry_run and _has_agentic_steps(config, all_phase_steps):
worktree_path, agentic_branch_name = _setup_worktree( worktree_path, agentic_branch_name = _setup_worktree(
cwd, run_dir, config.preset_name, cwd, run_dir, config.preset_name,
) )
base_repo_state = _snapshot_repo_state(cwd)
base_repo_status = _snapshot_repo_status(cwd)
iterations: list[IterationResult] = [] iterations: list[IterationResult] = []
feedback = "(no feedback — first iteration)" feedback = "(no feedback — first iteration)"
@@ -384,6 +490,8 @@ def _run_phased_pipeline(
run_dir=run_dir, output_iter=global_iter, phase_name=phase.name, run_dir=run_dir, output_iter=global_iter, phase_name=phase.name,
worktree_path=worktree_path, worktree_path=worktree_path,
runtime_env=runtime_env, runtime_env=runtime_env,
base_repo_state=base_repo_state,
base_repo_status=base_repo_status,
) )
# Intermediate commit so next iteration's diff only shows new changes # Intermediate commit so next iteration's diff only shows new changes
@@ -626,6 +734,8 @@ def _run_steps(
phase_name: str | None = None, phase_name: str | None = None,
worktree_path: Path | None = None, worktree_path: Path | None = None,
runtime_env: dict[str, str] | None = None, runtime_env: dict[str, str] | None = None,
base_repo_state: str | None = None,
base_repo_status: str | None = None,
) -> tuple[dict[str, str], dict[str, AgentResult], str | None]: ) -> tuple[dict[str, str], dict[str, AgentResult], str | None]:
"""Execute all steps in one iteration, parallelizing where possible.""" """Execute all steps in one iteration, parallelizing where possible."""
step_outputs: dict[str, str] = {} step_outputs: dict[str, str] = {}
@@ -644,6 +754,8 @@ def _run_steps(
run_dir=run_dir, output_iter=output_iter, run_dir=run_dir, output_iter=output_iter,
phase_name=phase_name, worktree_path=worktree_path, phase_name=phase_name, worktree_path=worktree_path,
runtime_env=runtime_env, runtime_env=runtime_env,
base_repo_state=base_repo_state,
base_repo_status=base_repo_status,
) )
else: else:
_execute_parallel_batch( _execute_parallel_batch(
@@ -653,6 +765,8 @@ def _run_steps(
run_dir=run_dir, output_iter=output_iter, run_dir=run_dir, output_iter=output_iter,
phase_name=phase_name, worktree_path=worktree_path, phase_name=phase_name, worktree_path=worktree_path,
runtime_env=runtime_env, runtime_env=runtime_env,
base_repo_state=base_repo_state,
base_repo_status=base_repo_status,
) )
# Extract verdict from all verdict steps (ALL must PASS; ESCALATE wins over all) # Extract verdict from all verdict steps (ALL must PASS; ESCALATE wins over all)
@@ -709,6 +823,8 @@ def _execute_step(
quiet: bool = False, quiet: bool = False,
worktree_path: Path | None = None, worktree_path: Path | None = None,
runtime_env: dict[str, str] | None = None, runtime_env: dict[str, str] | None = None,
base_repo_state: str | None = None,
base_repo_status: str | None = None,
) -> None: ) -> None:
"""Execute a single step, updating step_outputs and step_results in place.""" """Execute a single step, updating step_outputs and step_results in place."""
if not quiet: if not quiet:
@@ -717,9 +833,10 @@ def _execute_step(
# 1. Resolve template # 1. Resolve template
template = resolve_template(step.prompt_template) template = resolve_template(step.prompt_template)
# 2. Build context # 2. Build context (include prior step results for evidence)
context = _build_context( context = _build_context(
input_contents, step_outputs, feedback, iteration, max_iterations, input_contents, step_outputs, feedback, iteration, max_iterations,
step_results=step_results,
) )
# 3. Apply context overrides # 3. Apply context overrides
@@ -794,6 +911,16 @@ def _execute_step(
raise raise
# 7. Store output # 7. Store output
if worktree_path is not None and base_repo_state is not None:
_assert_base_repo_isolation(
cwd,
base_repo_state,
step_name=step.name,
agent_name=step.agent,
worktree_path=worktree_path,
baseline_status=base_repo_status or "",
)
step_outputs[step.output_key] = result.output step_outputs[step.output_key] = result.output
step_results[step.output_key] = result step_results[step.output_key] = result
@@ -826,6 +953,8 @@ def _execute_parallel_batch(
phase_name: str | None = None, phase_name: str | None = None,
worktree_path: Path | None = None, worktree_path: Path | None = None,
runtime_env: dict[str, str] | None = None, runtime_env: dict[str, str] | None = None,
base_repo_state: str | None = None,
base_repo_status: str | None = None,
) -> None: ) -> None:
"""Execute multiple steps in parallel using threads.""" """Execute multiple steps in parallel using threads."""
agent_names = ", ".join(s.agent for s in batch) agent_names = ", ".join(s.agent for s in batch)
@@ -838,6 +967,8 @@ def _execute_parallel_batch(
iteration, max_iterations, cwd, timeout, dry_run, iteration, max_iterations, cwd, timeout, dry_run,
step_outputs, step_results, step_outputs, step_results,
run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, run_dir=run_dir, output_iter=output_iter, phase_name=phase_name,
base_repo_state=base_repo_state,
base_repo_status=base_repo_status,
) )
return return
@@ -858,12 +989,15 @@ def _execute_parallel_batch(
step_outputs, step_results, step_outputs, step_results,
run_dir=run_dir, output_iter=output_iter, run_dir=run_dir, output_iter=output_iter,
phase_name=phase_name, worktree_path=worktree_path, phase_name=phase_name, worktree_path=worktree_path,
base_repo_state=base_repo_state,
base_repo_status=base_repo_status,
) )
return return
# Snapshot context before parallel execution (all steps see same state) # Snapshot context before parallel execution (all steps see same state)
context_snapshot = dict(input_contents) context_snapshot = dict(input_contents)
context_snapshot.update(step_outputs) context_snapshot.update(step_outputs)
results_snapshot = dict(step_results)
# Collect results from parallel threads # Collect results from parallel threads
local_outputs: dict[str, str] = {} local_outputs: dict[str, str] = {}
@@ -883,6 +1017,7 @@ def _execute_parallel_batch(
template = resolve_template(step.prompt_template) template = resolve_template(step.prompt_template)
context = _build_context( context = _build_context(
context_snapshot, {}, feedback, iteration, max_iterations, context_snapshot, {}, feedback, iteration, max_iterations,
step_results=results_snapshot,
) )
if step.context_override: if step.context_override:
context = _apply_context_override(context, step.context_override) context = _apply_context_override(context, step.context_override)
@@ -919,6 +1054,16 @@ def _execute_parallel_batch(
batch_elapsed = round(time.monotonic() - batch_start, 1) batch_elapsed = round(time.monotonic() - batch_start, 1)
# Persist successful outputs even if a sibling step failed. # Persist successful outputs even if a sibling step failed.
if worktree_path is not None and base_repo_state is not None:
_assert_base_repo_isolation(
cwd,
base_repo_state,
step_name=phase_name or "parallel-batch",
agent_name=agent_names,
worktree_path=worktree_path,
baseline_status=base_repo_status or "",
)
for step in batch: for step in batch:
key = step.output_key key = step.output_key
if key not in local_outputs: if key not in local_outputs:
@@ -986,6 +1131,7 @@ def _build_context(
feedback: str, feedback: str,
iteration: int, iteration: int,
max_iterations: int, max_iterations: int,
step_results: dict[str, AgentResult] | None = None,
) -> dict[str, str]: ) -> dict[str, str]:
"""Build the template context dict.""" """Build the template context dict."""
context: dict[str, str] = {} context: dict[str, str] = {}
@@ -994,9 +1140,42 @@ def _build_context(
context["feedback"] = feedback context["feedback"] = feedback
context["iteration"] = str(iteration) context["iteration"] = str(iteration)
context["max_iterations"] = str(max_iterations) context["max_iterations"] = str(max_iterations)
# Surface execution evidence from prior steps so reviewers can inspect it
if step_results:
context["execution_evidence"] = _format_execution_evidence(step_results)
return context return context
def _format_execution_evidence(
step_results: dict[str, AgentResult],
) -> str:
"""Format execution evidence from prior steps for reviewer consumption.
Produces a compact summary of command, exit code, duration, and a truncated
transcript excerpt for each completed step so that reviewers and seniors
can verify claims against real execution data.
"""
if not step_results:
return "(no prior execution evidence)"
parts: list[str] = []
for key, result in step_results.items():
section = [
f"### Step: {result.step_name} ({result.agent_name})",
f"- Command: `{result.command_preview}`" if result.command_preview else "",
f"- Exit code: {result.exit_code}",
f"- Duration: {result.duration_seconds}s",
]
section = [line for line in section if line]
if result.transcript:
# Include a truncated transcript excerpt for debugging
excerpt = result.transcript[:2000]
if len(result.transcript) > 2000:
excerpt += "\n... (truncated)"
section.append(f"\n<details>\n<summary>Transcript excerpt</summary>\n\n{excerpt}\n</details>")
parts.append("\n".join(section))
return "\n\n---\n\n".join(parts)
def _build_runtime_inputs( def _build_runtime_inputs(
config: PipelineConfig, config: PipelineConfig,
input_contents: dict[str, str], input_contents: dict[str, str],

View File

@@ -59,9 +59,14 @@ You are tasked with reviewing code against a plan and checklist.
## Previous Review Feedback ## Previous Review Feedback
{feedback} {feedback}
## Execution Evidence
{execution_evidence}
## Review Instructions ## Review Instructions
Explore the project directory to understand the full codebase context, \ Explore the project directory to understand the full codebase context, \
then evaluate the code against ONLY the plan and checklist above. then evaluate the code against ONLY the plan and checklist above. \
Use the execution evidence above to verify agent claims against actual \
command outputs and exit codes.
For each issue found, classify it with BOTH severity AND category: For each issue found, classify it with BOTH severity AND category:
@@ -164,9 +169,13 @@ REVIEW_TEMPLATE_KO = """\
## 이전 리뷰 피드백 ## 이전 리뷰 피드백
{feedback} {feedback}
## 실행 증거
{execution_evidence}
## 검토 지침 ## 검토 지침
프로젝트 디렉토리를 직접 탐색하여 전체 코드베이스 맥락을 파악한 뒤, \ 프로젝트 디렉토리를 직접 탐색하여 전체 코드베이스 맥락을 파악한 뒤, \
위 기획서와 체크리스트 기준으로만 코드를 평가하세요. 위 기획서와 체크리스트 기준으로만 코드를 평가하세요. \
위 실행 증거를 활용하여 에이전트의 주장을 실제 명령어 출력과 종료 코드로 검증하세요.
발견된 각 이슈에 심각도와 카테고리를 모두 부여하세요: 발견된 각 이슈에 심각도와 카테고리를 모두 부여하세요:
@@ -525,8 +534,13 @@ You are adjudicating multiple review results and turning them into an actionable
## Previous Issue Tracker ## Previous Issue Tracker
{previous_senior_tracker} {previous_senior_tracker}
## Execution Evidence
{execution_evidence}
## Instructions ## Instructions
Explore the project directory to confirm the current codebase state. Then: Explore the project directory to confirm the current codebase state. \
Use the execution evidence above to verify claims against actual command \
outputs and exit codes. Then:
1. Deduplicate overlapping issues across reviewers. 1. Deduplicate overlapping issues across reviewers.
2. Resolve disagreements explicitly. 2. Resolve disagreements explicitly.
3. Keep only issues supported by the plan, checklist, code, or reviewer evidence. 3. Keep only issues supported by the plan, checklist, code, or reviewer evidence.
@@ -592,8 +606,13 @@ AGGREGATE_REVIEW_TEMPLATE_KO = """\
## 이전 이슈 트래커 ## 이전 이슈 트래커
{previous_senior_tracker} {previous_senior_tracker}
## 실행 증거
{execution_evidence}
## 지침 ## 지침
프로젝트 디렉토리를 탐색하여 현재 코드베이스 상태를 확인한 뒤 다음을 수행하세요. 프로젝트 디렉토리를 탐색하여 현재 코드베이스 상태를 확인한 뒤, \
위 실행 증거를 활용하여 에이전트의 주장을 실제 명령어 출력과 종료 코드로 검증하세요. \
그런 다음 아래를 수행하세요.
1. 리뷰어들 사이에 중복되는 이슈를 합치세요. 1. 리뷰어들 사이에 중복되는 이슈를 합치세요.
2. 의견 충돌은 명시적으로 정리하세요. 2. 의견 충돌은 명시적으로 정리하세요.
3. 기획서, 체크리스트, 코드, 리뷰 근거로 뒷받침되는 이슈만 남기세요. 3. 기획서, 체크리스트, 코드, 리뷰 근거로 뒷받침되는 이슈만 남기세요.

View File

@@ -386,6 +386,11 @@ def _append_iteration_steps(
lines.append(f"### {_t(config, 'step')}: {step.name} ({agent_name}){duration}\n") lines.append(f"### {_t(config, 'step')}: {step.name} ({agent_name}){duration}\n")
# Show command preview and exit code for execution evidence
if agent_result and agent_result.command_preview:
lines.append(f"**Command**: `{agent_result.command_preview}`")
lines.append(f"**Exit code**: {agent_result.exit_code}\n")
if step.verdict and iter_result.verdict: if step.verdict and iter_result.verdict:
lines.append(f"**{_t(config, 'verdict')}: {iter_result.verdict}**\n") lines.append(f"**{_t(config, 'verdict')}: {iter_result.verdict}**\n")
@@ -400,6 +405,16 @@ def _append_iteration_steps(
lines.append(output) lines.append(output)
lines.append("") lines.append("")
# Include transcript excerpt for execution evidence visibility
if agent_result and agent_result.transcript:
transcript_preview = agent_result.transcript[:1500]
if len(agent_result.transcript) > 1500:
transcript_preview += "\n... (truncated)"
lines.append("<details>")
lines.append("<summary>Execution transcript</summary>\n")
lines.append(transcript_preview)
lines.append("\n</details>\n")
if not skip_extraction and step.role == "review": if not skip_extraction and step.role == "review":
oos = _extract_out_of_scope(output) oos = _extract_out_of_scope(output)
if oos: if oos:

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import logging import logging
import shutil import shutil
import subprocess import subprocess
import tempfile
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
@@ -20,6 +21,22 @@ def make_branch_name(preset_name: str) -> str:
return f"cross-eval/{preset_name}_{ts}" return f"cross-eval/{preset_name}_{ts}"
def make_worktree_dir(base_cwd: Path, branch_name: str) -> Path:
    """Build the worktree path for *branch_name* under the system temp dir.

    The result is ``<tempdir>/cross-eval-worktrees/<repo>/<branch-slug>``,
    where ``<repo>`` is the final component of the resolved *base_cwd*
    (``"repo"`` when that component is empty) and the slug is the branch name
    with every ``/`` replaced by ``__``. Placing worktrees outside the source
    checkout avoids tools that walk up to the outer repo and write into the
    base worktree. The directory itself is not created here.
    """
    container = Path(tempfile.gettempdir()) / "cross-eval-worktrees"

    repo_component = base_cwd.resolve().name
    if not repo_component:
        repo_component = "repo"

    slug = "__".join(branch_name.split("/"))
    return container.joinpath(repo_component, slug)
def create_worktree(base_cwd: Path, work_dir: Path, branch_name: str) -> Path: def create_worktree(base_cwd: Path, work_dir: Path, branch_name: str) -> Path:
"""Create a git worktree on a new branch from HEAD. """Create a git worktree on a new branch from HEAD.

View File

@@ -23,6 +23,7 @@ from cross_eval.models import (
StepConfig, StepConfig,
) )
from cross_eval.pipeline import ( from cross_eval.pipeline import (
_assert_base_repo_isolation,
_commit_iteration, _commit_iteration,
_finalize_worktree, _finalize_worktree,
_has_agentic_steps, _has_agentic_steps,
@@ -34,6 +35,7 @@ from cross_eval.worktree import (
commit_worktree, commit_worktree,
create_worktree, create_worktree,
make_branch_name, make_branch_name,
make_worktree_dir,
remove_worktree, remove_worktree,
) )
@@ -191,6 +193,41 @@ class TestMakeBranchName(unittest.TestCase):
self.assertEqual(len(ts_part), 15) # YYYYMMDD_HHMMSS self.assertEqual(len(ts_part), 15) # YYYYMMDD_HHMMSS
class TestMakeWorktreeDir(unittest.TestCase):
    """make_worktree_dir must pick a temp location outside the repository."""

    def test_uses_tmp_dir_outside_repo(self) -> None:
        with tempfile.TemporaryDirectory() as td:
            repo_dir = Path(td) / "repo"
            repo_dir.mkdir()

            chosen = str(
                make_worktree_dir(repo_dir, "cross-eval/review-fix_20260313_123456")
            )

            # Lives under the dedicated worktree folder ...
            self.assertIn("cross-eval-worktrees", chosen)
            # ... and the repository path does not occur anywhere in it.
            self.assertNotIn(str(repo_dir), chosen)
class TestBaseRepoIsolation(unittest.TestCase):
    """A base repo that drifts from its baseline must abort the run."""

    def test_raises_when_base_repo_status_changes(self) -> None:
        # Fabricated baseline. The base directory below is never initialised
        # as a git repo, so the recomputed state is expected to differ from it
        # (assumes the temp dir is not itself inside a git checkout).
        fabricated = "M cross_eval/agent.py"

        with tempfile.TemporaryDirectory() as td:
            root = Path(td)
            base = root / "repo"
            worktree = root / "worktree"
            for directory in (base, worktree):
                directory.mkdir()

            with self.assertRaises(RuntimeError) as ctx:
                _assert_base_repo_isolation(
                    base,
                    fabricated,
                    step_name="coding",
                    agent_name="claude-coder",
                    worktree_path=worktree,
                    baseline_status=fabricated,
                )

            self.assertIn("base repository", str(ctx.exception))
# =================================================================== # ===================================================================
# 2. agent.py agentic tests (mocking subprocess) # 2. agent.py agentic tests (mocking subprocess)
# =================================================================== # ===================================================================
@@ -513,6 +550,33 @@ class TestSetupWorktreeCalledForAgentic(unittest.TestCase):
mock_setup.assert_called_once() mock_setup.assert_called_once()
class TestSetupWorktreeLocation(unittest.TestCase):
    """_setup_worktree must create the worktree outside the base repository."""

    def test_worktree_is_created_outside_repo(self) -> None:
        with tempfile.TemporaryDirectory() as td:
            base = Path(td) / "repo"
            run_dir = base / ".cross-eval" / "output" / "smoke"
            base.mkdir()
            run_dir.mkdir(parents=True)
            _init_git_repo(base)

            def recorded(file_name: str) -> str:
                # Contents of a bookkeeping file written into the run dir.
                return (run_dir / file_name).read_text(encoding="utf-8").strip()

            worktree_path, branch_name = _setup_worktree(base, run_dir, "review-fix")
            try:
                self.assertTrue(worktree_path.exists())
                self.assertNotIn(
                    str(base.resolve()),
                    str(worktree_path.resolve()),
                )
                self.assertEqual(recorded("worktree_path.txt"), str(worktree_path))
                self.assertEqual(recorded("worktree_branch.txt"), branch_name)
            finally:
                # Always unregister the worktree so the temp repo can be
                # cleaned up, even when an assertion above failed.
                remove_worktree(base, worktree_path)
class TestReviewerRunsInWorktreeCwd(unittest.TestCase): class TestReviewerRunsInWorktreeCwd(unittest.TestCase):
"""Reviewer runs with worktree cwd (not original cwd) when worktree exists.""" """Reviewer runs with worktree cwd (not original cwd) when worktree exists."""

395
tests/test_evidence.py Normal file
View File

@@ -0,0 +1,395 @@
"""Regression tests for runtime evidence propagation and report visibility.
Covers:
1. Execution evidence is surfaced in reviewer/senior prompt context.
2. Reports include command preview and transcript excerpts.
3. Claude agentic failure detection (empty diff, write failure, expanded markers).
4. _format_execution_evidence produces expected output.
"""
from __future__ import annotations
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
from cross_eval.agent import (
AgentInvocationError,
_claims_file_changes,
_has_write_failure_indicators,
invoke_agent_agentic,
)
from cross_eval.config import BUILTIN_AGENTS
from cross_eval.models import (
AgentConfig,
AgentResult,
IterationResult,
PipelineConfig,
PipelineResult,
ReviewMetrics,
StepConfig,
)
from cross_eval.pipeline import _format_execution_evidence, run_pipeline
from cross_eval.report import build_report
# ---------------------------------------------------------------------------
# 1. Execution evidence formatting
# ---------------------------------------------------------------------------
class TestFormatExecutionEvidence(unittest.TestCase):
    """Checks on the reviewer-facing summary from _format_execution_evidence."""

    def test_empty_results_returns_placeholder(self) -> None:
        rendered = _format_execution_evidence({})
        self.assertIn("no prior execution evidence", rendered)

    def test_single_result_includes_key_fields(self) -> None:
        coding = AgentResult(
            output="some diff",
            exit_code=0,
            agent_name="claude-coder",
            step_name="coding",
            duration_seconds=12.3,
            transcript="# Agent Execution Transcript\n\n## Command\nclaude ...",
            command_preview="claude --setting-sources user",
        )

        evidence = _format_execution_evidence({"coding_output": coding})

        # Every headline field of the result must show up in the summary.
        expected_fragments = (
            "claude-coder",
            "coding",
            "Exit code: 0",
            "12.3s",
            "claude --setting-sources user",
            "Transcript excerpt",
        )
        for fragment in expected_fragments:
            self.assertIn(fragment, evidence)

    def test_multiple_results_separated(self) -> None:
        results = {
            "coding_output": AgentResult(
                output="diff1", exit_code=0, agent_name="coder",
                step_name="coding", duration_seconds=1.0,
                command_preview="cmd1",
            ),
            "review_result": AgentResult(
                output="review text", exit_code=0, agent_name="reviewer",
                step_name="review", duration_seconds=2.0,
                command_preview="cmd2",
            ),
        }

        evidence = _format_execution_evidence(results)

        for fragment in ("coder", "reviewer", "---"):
            self.assertIn(fragment, evidence)

    def test_transcript_truncated_at_2000_chars(self) -> None:
        oversized = "x" * 3000
        result = AgentResult(
            output="out", exit_code=0, agent_name="agent",
            step_name="step", duration_seconds=1.0,
            transcript=oversized,
        )

        evidence = _format_execution_evidence({"key": result})

        self.assertIn("truncated", evidence)
        # The full 3000-char transcript should NOT appear
        self.assertNotIn(oversized, evidence)
# ---------------------------------------------------------------------------
# 2. Evidence in reviewer prompts (integration)
# ---------------------------------------------------------------------------
class TestEvidenceInReviewerPrompt(unittest.TestCase):
    """The review prompt must carry evidence from the preceding coding step."""

    def test_reviewer_receives_evidence(self) -> None:
        coding_step = StepConfig(
            name="coding", agent="claude-coder", role="coding",
            prompt_template="default:coding", output_key="coding_output",
        )
        review_step = StepConfig(
            name="review", agent="claude-reviewer", role="review",
            prompt_template="default:review", output_key="review_result",
            verdict=True,
        )

        seen_calls: list[dict] = []

        # Parameter names are kept as-is in case the pipeline passes them by
        # keyword; only the recorded prompt and the canned results matter.
        def _fake_invoke(agent_config, prompt, step_name, **kwargs):
            seen_calls.append({
                "step_name": step_name,
                "prompt": prompt,
            })
            if step_name != "coding":
                return AgentResult(
                    output="VERDICT: PASS",
                    exit_code=0,
                    agent_name=agent_config.name,
                    step_name=step_name,
                    duration_seconds=2.0,
                )
            return AgentResult(
                output="Implemented feature X",
                exit_code=0,
                agent_name=agent_config.name,
                step_name=step_name,
                duration_seconds=5.0,
                transcript="# Transcript\nclaude ran...",
                command_preview="claude --setting-sources user",
            )

        with tempfile.TemporaryDirectory() as tmpdir:
            config = PipelineConfig(
                output_dir=Path(tmpdir),
                max_iterations=1,
                min_iterations=1,
                language="en",
                inputs={"plan": "Test plan", "checklist": "Test checklist"},
                agents=dict(BUILTIN_AGENTS),
                coders=["claude-coder"],
                reviewers=["claude-reviewer"],
                pipeline=[coding_step, review_step],
                preset_name="simple",
            )

            with patch("cross_eval.pipeline.invoke_agent", side_effect=_fake_invoke):
                result = run_pipeline(config)

            self.assertEqual(result.final_verdict, "PASS")

            # The reviewer prompt should contain execution evidence
            review_prompts = [
                call["prompt"]
                for call in seen_calls
                if call["step_name"] == "review"
            ]
            self.assertGreaterEqual(len(review_prompts), 1)

            first_review_prompt = review_prompts[0]
            # Evidence section should reference the coding step's agent
            self.assertIn("Execution Evidence", first_review_prompt)
            self.assertIn("claude-coder", first_review_prompt)
# ---------------------------------------------------------------------------
# 3. Report includes evidence
# ---------------------------------------------------------------------------
class TestReportIncludesEvidence(unittest.TestCase):
    """Report generation includes command preview and transcript excerpts."""

    def _make_pipeline_result(self) -> tuple[PipelineConfig, PipelineResult]:
        """Return a config and a single-iteration PASS result for coding + review."""
        config = PipelineConfig(
            max_iterations=1,
            language="en",
            inputs={"plan": "Plan", "checklist": "CL"},
            agents=dict(BUILTIN_AGENTS),
            pipeline=[
                StepConfig(
                    name="coding",
                    agent="claude-coder",
                    role="coding",
                    prompt_template="default:coding",
                    output_key="coding_output",
                ),
                StepConfig(
                    name="review",
                    agent="claude-reviewer",
                    role="review",
                    prompt_template="default:review",
                    output_key="review_result",
                    verdict=True,
                ),
            ],
            preset_name="simple",
        )

        # The same text is used for each AgentResult and its step_outputs entry.
        coding_text = "diff --git a/file ..."
        review_text = "All good.\n\nVERDICT: PASS"

        step_results = {
            "coding_output": AgentResult(
                output=coding_text,
                exit_code=0,
                agent_name="claude-coder",
                step_name="coding",
                duration_seconds=10.0,
                transcript="# Agent Execution Transcript\n## Command\nclaude ...\n## Stdout\nok",
                command_preview="claude --setting-sources user",
            ),
            "review_result": AgentResult(
                output=review_text,
                exit_code=0,
                agent_name="claude-reviewer",
                step_name="review",
                duration_seconds=5.0,
                transcript="# Agent Execution Transcript\n## Command\nclaude -p ...\n## Stdout\nAll good.",
                command_preview="claude -p --setting-sources user",
            ),
        }
        only_iteration = IterationResult(
            iteration=1,
            step_results=step_results,
            step_outputs={
                "coding_output": coding_text,
                "review_result": review_text,
            },
            verdict="PASS",
        )
        pipeline_result = PipelineResult(
            iterations=[only_iteration],
            final_verdict="PASS",
            total_duration=15.0,
        )
        return config, pipeline_result

    def _render_report(self):
        """Run ``build_report`` over the canned config/result pair."""
        config, result = self._make_pipeline_result()
        return build_report(config, result)

    def test_report_contains_command_preview(self) -> None:
        """The coder's command preview and a "**Command**" label appear in the report."""
        report = self._render_report()
        for expected in ("claude --setting-sources user", "**Command**"):
            self.assertIn(expected, report)

    def test_report_contains_transcript_excerpt(self) -> None:
        """The report has a transcript heading and text from the transcript itself."""
        report = self._render_report()
        for expected in ("Execution transcript", "Agent Execution Transcript"):
            self.assertIn(expected, report)

    def test_report_contains_exit_code(self) -> None:
        """The report prints the zero exit code of the canned results."""
        self.assertIn("**Exit code**: 0", self._render_report())
# ---------------------------------------------------------------------------
# 4. Claude agentic hardened failure detection
# ---------------------------------------------------------------------------
class TestClaimsFileChangesExpanded(unittest.TestCase):
    """Expanded change-claim markers detect more Claude output patterns."""

    def _expect_claim(self, agent_output: str) -> None:
        """Assert that *agent_output* is classified as claiming file changes."""
        self.assertTrue(_claims_file_changes(agent_output))

    def _expect_no_claim(self, agent_output: str) -> None:
        """Assert that *agent_output* is not classified as claiming file changes."""
        self.assertFalse(_claims_file_changes(agent_output))

    def test_ive_implemented(self) -> None:
        self._expect_claim("I've implemented the feature")

    def test_ive_updated(self) -> None:
        self._expect_claim("I've updated the config")

    def test_made_the_following_changes(self) -> None:
        self._expect_claim("I made the following changes to the file")

    def test_applied_the_fix(self) -> None:
        self._expect_claim("Applied the fix for the bug")

    def test_changes_have_been_applied(self) -> None:
        self._expect_claim("Changes have been applied successfully")

    def test_wrote_the_code(self) -> None:
        self._expect_claim("Wrote the code for the new module")

    def test_refactored(self) -> None:
        self._expect_claim("I refactored the pipeline")

    def test_no_changes_still_returns_false(self) -> None:
        self._expect_no_claim("No changes were necessary")

    def test_empty_string_returns_false(self) -> None:
        self._expect_no_claim("")
class TestWriteFailureIndicators(unittest.TestCase):
    """_has_write_failure_indicators detects stderr patterns.

    Each positive case uses a message that contains exactly one marker, so a
    test only passes when the marker it is named after is actually recognised.
    """

    def test_permission_denied(self) -> None:
        self.assertTrue(_has_write_failure_indicators("Error: Permission denied"))

    def test_read_only_filesystem(self) -> None:
        self.assertTrue(_has_write_failure_indicators("read-only file system"))

    def test_sandbox_restriction(self) -> None:
        self.assertTrue(_has_write_failure_indicators("Blocked by sandbox policy"))

    def test_eacces(self) -> None:
        # Deliberately avoids "operation not permitted" / "permission denied"
        # so that only the "eacces" marker can satisfy this check.
        self.assertTrue(_has_write_failure_indicators("EACCES: open '/src/main.py'"))

    def test_operation_not_permitted(self) -> None:
        self.assertTrue(
            _has_write_failure_indicators("EPERM: operation not permitted")
        )

    def test_erofs(self) -> None:
        self.assertTrue(_has_write_failure_indicators("EROFS: open '/src/main.py'"))

    def test_empty_stderr_returns_false(self) -> None:
        self.assertFalse(_has_write_failure_indicators(""))

    def test_whitespace_only_stderr_returns_false(self) -> None:
        # Stderr that is blank after stripping is treated the same as empty.
        self.assertFalse(_has_write_failure_indicators("  \n\t"))

    def test_normal_stderr_returns_false(self) -> None:
        self.assertFalse(_has_write_failure_indicators("Downloading model..."))
class TestAgenticWriteFailureRaisesError(unittest.TestCase):
    """Agentic mode raises AgentInvocationError on stderr write-failure indicators."""

    @patch("cross_eval.worktree.capture_diff", return_value="")
    @patch("subprocess.run")
    def test_write_failure_detected_from_stderr(
        self, mock_run: MagicMock, mock_diff: MagicMock,
    ) -> None:
        """A zero exit code with a write error on stderr is reported as WRITE_FAILURE.

        ``subprocess.run`` is patched for the whole test and ``capture_diff``
        is patched to return an empty diff, so no real process is started.
        """
        mock_run.return_value = MagicMock(
            returncode=0,
            stdout="Done.",
            stderr="Error: Permission denied writing to /src/main.py",
        )
        agent = AgentConfig(
            name="claude-coder", command="claude",
            args=["--setting-sources", "user"], agentic=True,
        )
        # No git bootstrap here: with ``subprocess.run`` patched, any
        # ``git init`` / ``git commit`` call would only hit the mock and never
        # create a repository. A plain scratch directory serves as the worktree.
        with tempfile.TemporaryDirectory() as td:
            with self.assertRaises(AgentInvocationError) as ctx:
                invoke_agent_agentic(
                    agent, "implement feature", "coding",
                    worktree_path=Path(td), quiet=True,
                )
        # Checked outside the assertRaises block so these lines always execute.
        self.assertEqual(ctx.exception.failure_type, "WRITE_FAILURE")
        self.assertIn("Permission denied", ctx.exception.raw_error)
class TestAgenticExpandedClaimMarkers(unittest.TestCase):
    """Agentic mode detects expanded claim markers in empty diff scenarios."""

    @patch("cross_eval.worktree.capture_diff", return_value="")
    @patch("subprocess.run")
    def test_ive_implemented_triggers_empty_diff_error(
        self, mock_run: MagicMock, mock_diff: MagicMock,
    ) -> None:
        """An "I've implemented ..." reply with an empty diff is reported as EMPTY_DIFF.

        ``subprocess.run`` is patched for the whole test and ``capture_diff``
        is patched to return an empty diff, so no real process is started.
        """
        mock_run.return_value = MagicMock(
            returncode=0,
            stdout="I've implemented the requested changes to the pipeline.",
            stderr="",
        )
        agent = AgentConfig(
            name="claude-coder", command="claude",
            args=["--setting-sources", "user"], agentic=True,
        )
        # No git bootstrap here: with ``subprocess.run`` patched, any
        # ``git init`` / ``git commit`` call would only hit the mock and never
        # create a repository. A plain scratch directory serves as the worktree.
        with tempfile.TemporaryDirectory() as td:
            with self.assertRaises(AgentInvocationError) as ctx:
                invoke_agent_agentic(
                    agent, "implement feature", "coding",
                    worktree_path=Path(td), quiet=True,
                )
        # Checked outside the assertRaises block so this line always executes.
        self.assertEqual(ctx.exception.failure_type, "EMPTY_DIFF")
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()