feat: propagate execution evidence across iterations and enhance reports
- Carry execution evidence forward so reviewer/senior prompts in
subsequent iterations can inspect prior transcript and command data
- Add {execution_evidence} to REVIEW_ONLY templates (en/ko)
- Add evidence summary table to iteration reports
- Fix test_agentic to match stdin-based prompt delivery for Claude
- Add expanded claim/no-change marker tests and cross-iteration
evidence propagation tests
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -338,6 +338,13 @@ def _run_simple_pipeline(
|
||||
if tracker:
|
||||
input_contents["previous_senior_tracker"] = tracker
|
||||
|
||||
# Carry execution evidence forward so subsequent iterations'
|
||||
# reviewer/senior prompts can inspect prior transcript data.
|
||||
if step_results:
|
||||
input_contents["execution_evidence"] = _format_execution_evidence(
|
||||
step_results,
|
||||
)
|
||||
|
||||
iterations.append(iter_result)
|
||||
|
||||
# ESCALATE check (highest priority)
|
||||
@@ -531,6 +538,13 @@ def _run_phased_pipeline(
|
||||
if tracker:
|
||||
input_contents["previous_senior_tracker"] = tracker
|
||||
|
||||
# Carry execution evidence forward so subsequent iterations'
|
||||
# reviewer/senior prompts can inspect prior transcript data.
|
||||
if step_results:
|
||||
input_contents["execution_evidence"] = _format_execution_evidence(
|
||||
step_results,
|
||||
)
|
||||
|
||||
iterations.append(iter_result)
|
||||
|
||||
# ESCALATE check
|
||||
@@ -1133,16 +1147,33 @@ def _build_context(
|
||||
max_iterations: int,
|
||||
step_results: dict[str, AgentResult] | None = None,
|
||||
) -> dict[str, str]:
|
||||
"""Build the template context dict."""
|
||||
"""Build the template context dict.
|
||||
|
||||
Execution evidence from prior iterations is carried forward in
|
||||
``input_contents["execution_evidence"]``. When the current iteration
|
||||
has its own step results, the evidence is merged so reviewers/seniors
|
||||
see both prior and current data.
|
||||
"""
|
||||
context: dict[str, str] = {}
|
||||
context.update(input_contents)
|
||||
context.update(step_outputs)
|
||||
context["feedback"] = feedback
|
||||
context["iteration"] = str(iteration)
|
||||
context["max_iterations"] = str(max_iterations)
|
||||
# Surface execution evidence from prior steps so reviewers can inspect it
|
||||
# Surface execution evidence from prior steps so reviewers can inspect it.
|
||||
# Prior-iteration evidence may already live in context via input_contents.
|
||||
prior_evidence = context.get("execution_evidence", "")
|
||||
if step_results:
|
||||
context["execution_evidence"] = _format_execution_evidence(step_results)
|
||||
current_evidence = _format_execution_evidence(step_results)
|
||||
if prior_evidence and prior_evidence != "(no prior execution evidence)":
|
||||
context["execution_evidence"] = (
|
||||
"# Prior Iteration Evidence\n"
|
||||
+ prior_evidence
|
||||
+ "\n\n# Current Iteration Evidence\n"
|
||||
+ current_evidence
|
||||
)
|
||||
else:
|
||||
context["execution_evidence"] = current_evidence
|
||||
return context
|
||||
|
||||
|
||||
@@ -1164,6 +1195,7 @@ def _format_execution_evidence(
|
||||
f"- Command: `{result.command_preview}`" if result.command_preview else "",
|
||||
f"- Exit code: {result.exit_code}",
|
||||
f"- Duration: {result.duration_seconds}s",
|
||||
f"- Output size: {len(result.output)} chars",
|
||||
]
|
||||
section = [line for line in section if line]
|
||||
if result.transcript:
|
||||
|
||||
@@ -243,9 +243,14 @@ You are tasked with reviewing existing code against a plan and checklist.
|
||||
## Previous Review (iteration {iteration} of {max_iterations})
|
||||
{feedback}
|
||||
|
||||
## Execution Evidence
|
||||
{execution_evidence}
|
||||
|
||||
## Review Instructions
|
||||
Explore the project directory thoroughly to understand the full codebase, \
|
||||
then evaluate the EXISTING code against ONLY the plan and checklist above.
|
||||
then evaluate the EXISTING code against ONLY the plan and checklist above. \
|
||||
Use the execution evidence above to verify agent claims against actual \
|
||||
command outputs and exit codes.
|
||||
|
||||
You are NOT generating or modifying code. You are auditing what already exists.
|
||||
|
||||
@@ -314,9 +319,13 @@ REVIEW_ONLY_TEMPLATE_KO = """\
|
||||
## 이전 리뷰 결과 ({max_iterations}회 중 {iteration}번째)
|
||||
{feedback}
|
||||
|
||||
## 실행 증거
|
||||
{execution_evidence}
|
||||
|
||||
## 검토 지침
|
||||
프로젝트 디렉토리를 직접 탐색하여 전체 코드베이스를 파악한 뒤, \
|
||||
위 기획서와 체크리스트 기준으로 **기존 코드**를 평가하세요.
|
||||
위 기획서와 체크리스트 기준으로 **기존 코드**를 평가하세요. \
|
||||
위 실행 증거를 활용하여 에이전트의 주장을 실제 명령어 출력과 종료 코드로 검증하세요.
|
||||
|
||||
코드를 생성하거나 수정하지 마세요. 이미 존재하는 코드를 감사하는 것이 목적입니다.
|
||||
|
||||
|
||||
@@ -58,6 +58,12 @@ _STRINGS: dict[str, dict[str, str]] = {
|
||||
"metrics_total_issues": "Total Issues",
|
||||
"metrics_na": "N/A",
|
||||
"iteration_details": "Iteration Details",
|
||||
"evidence_summary": "Evidence Summary",
|
||||
"evidence_agent": "Agent",
|
||||
"evidence_exit_code": "Exit Code",
|
||||
"evidence_duration": "Duration",
|
||||
"evidence_output_size": "Output Size",
|
||||
"evidence_transcript": "Execution transcript",
|
||||
},
|
||||
"ko": {
|
||||
"title": "교차 검증 리포트",
|
||||
@@ -99,6 +105,12 @@ _STRINGS: dict[str, dict[str, str]] = {
|
||||
"metrics_total_issues": "총 이슈",
|
||||
"metrics_na": "해당 없음",
|
||||
"iteration_details": "반복 상세",
|
||||
"evidence_summary": "실행 증거 요약",
|
||||
"evidence_agent": "에이전트",
|
||||
"evidence_exit_code": "종료 코드",
|
||||
"evidence_duration": "소요 시간",
|
||||
"evidence_output_size": "출력 크기",
|
||||
"evidence_transcript": "실행 트랜스크립트",
|
||||
},
|
||||
}
|
||||
|
||||
@@ -377,6 +389,30 @@ def _append_iteration_steps(
|
||||
If *skip_extraction* is True, out-of-scope and review-metrics parsing
|
||||
is skipped (useful when a pre-scan already collected that data).
|
||||
"""
|
||||
# Evidence summary table — quick overview of all steps' execution data
|
||||
has_evidence = any(
|
||||
iter_result.step_results.get(s.output_key) for s in steps
|
||||
)
|
||||
if has_evidence:
|
||||
s_step = _t(config, "step")
|
||||
s_agent = _t(config, "evidence_agent")
|
||||
s_exit = _t(config, "evidence_exit_code")
|
||||
s_dur = _t(config, "evidence_duration")
|
||||
s_size = _t(config, "evidence_output_size")
|
||||
lines.append(f"**{_t(config, 'evidence_summary')}**\n")
|
||||
lines.append(f"| {s_step} | {s_agent} | {s_exit} | {s_dur} | {s_size} |")
|
||||
lines.append("|------|-------|-----------|----------|-------------|")
|
||||
for step in steps:
|
||||
ar = iter_result.step_results.get(step.output_key)
|
||||
out = iter_result.step_outputs.get(step.output_key, "")
|
||||
if ar:
|
||||
lines.append(
|
||||
f"| {step.name} | {ar.agent_name} "
|
||||
f"| {ar.exit_code} | {ar.duration_seconds}s "
|
||||
f"| {len(out)} chars |"
|
||||
)
|
||||
lines.append("")
|
||||
|
||||
for step in steps:
|
||||
agent_result = iter_result.step_results.get(step.output_key)
|
||||
output = iter_result.step_outputs.get(step.output_key, "")
|
||||
@@ -410,8 +446,9 @@ def _append_iteration_steps(
|
||||
transcript_preview = agent_result.transcript[:1500]
|
||||
if len(agent_result.transcript) > 1500:
|
||||
transcript_preview += "\n... (truncated)"
|
||||
transcript_label = _t(config, "evidence_transcript")
|
||||
lines.append("<details>")
|
||||
lines.append("<summary>Execution transcript</summary>\n")
|
||||
lines.append(f"<summary>{transcript_label}</summary>\n")
|
||||
lines.append(transcript_preview)
|
||||
lines.append("\n</details>\n")
|
||||
|
||||
|
||||
@@ -233,11 +233,11 @@ class TestBaseRepoIsolation(unittest.TestCase):
|
||||
# ===================================================================
|
||||
|
||||
class TestInvokeAgentAgenticClaude(unittest.TestCase):
|
||||
"""invoke_agent_agentic builds correct cmd for claude (no -p, prompt as positional arg)."""
|
||||
"""invoke_agent_agentic builds correct cmd for claude (no -p, prompt via stdin)."""
|
||||
|
||||
@patch("cross_eval.worktree.capture_diff", return_value="diff --git a/file ...")
|
||||
@patch("subprocess.run")
|
||||
def test_claude_cmd_has_no_dash_p_and_prompt_as_positional(
|
||||
def test_claude_cmd_has_no_dash_p_and_prompt_via_stdin(
|
||||
self, mock_run: MagicMock, mock_diff: MagicMock,
|
||||
) -> None:
|
||||
mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="")
|
||||
@@ -271,8 +271,10 @@ class TestInvokeAgentAgenticClaude(unittest.TestCase):
|
||||
|
||||
# No -p flag
|
||||
self.assertNotIn("-p", cmd)
|
||||
# Last arg is a task file reference (not raw prompt — avoids arg length limits)
|
||||
self.assertIn("task file", cmd[-1].lower())
|
||||
# Prompt is delivered via stdin (input kwarg), not as a positional arg
|
||||
input_data = agent_call[1].get("input")
|
||||
self.assertIsNotNone(input_data)
|
||||
self.assertIn("implement feature X", input_data)
|
||||
|
||||
|
||||
class TestInvokeAgentAgenticCodex(unittest.TestCase):
|
||||
|
||||
@@ -391,5 +391,506 @@ class TestAgenticExpandedClaimMarkers(unittest.TestCase):
|
||||
self.assertEqual(ctx.exception.failure_type, "EMPTY_DIFF")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 5. Expanded claim/no-change markers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestExpandedClaimMarkers(unittest.TestCase):
|
||||
"""New claim markers detect additional Claude output patterns."""
|
||||
|
||||
def test_completed_all_the_changes(self) -> None:
|
||||
self.assertTrue(_claims_file_changes("I completed all the changes"))
|
||||
|
||||
def test_finished_implementing(self) -> None:
|
||||
self.assertTrue(_claims_file_changes("Finished implementing the feature"))
|
||||
|
||||
def test_all_tasks_completed(self) -> None:
|
||||
self.assertTrue(_claims_file_changes("All tasks completed successfully"))
|
||||
|
||||
def test_done_with_the_implementation(self) -> None:
|
||||
self.assertTrue(_claims_file_changes("Done with the implementation"))
|
||||
|
||||
def test_successfully_implemented(self) -> None:
|
||||
self.assertTrue(_claims_file_changes("Successfully implemented the changes"))
|
||||
|
||||
def test_changes_are_complete(self) -> None:
|
||||
self.assertTrue(_claims_file_changes("All changes are complete"))
|
||||
|
||||
|
||||
class TestExpandedNoChangeMarkers(unittest.TestCase):
|
||||
"""New no-change markers prevent false positives."""
|
||||
|
||||
def test_no_changes_needed(self) -> None:
|
||||
self.assertFalse(_claims_file_changes("No changes needed"))
|
||||
|
||||
def test_no_fixes_needed(self) -> None:
|
||||
self.assertFalse(_claims_file_changes("No fixes needed for this code"))
|
||||
|
||||
def test_code_is_correct_as_is(self) -> None:
|
||||
self.assertFalse(_claims_file_changes("The code is correct as-is"))
|
||||
|
||||
def test_already_correct(self) -> None:
|
||||
self.assertFalse(_claims_file_changes("Implementation is already correct"))
|
||||
|
||||
def test_no_action_required(self) -> None:
|
||||
self.assertFalse(_claims_file_changes("No action required"))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 6. Cross-iteration evidence propagation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCrossIterationEvidencePropagation(unittest.TestCase):
|
||||
"""Execution evidence from prior iterations is available to subsequent iterations."""
|
||||
|
||||
def test_prior_evidence_available_in_iteration_2(self) -> None:
|
||||
"""Review step in iteration 2 should see coding evidence from iteration 1."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
steps = [
|
||||
StepConfig(
|
||||
name="coding", agent="claude-coder", role="coding",
|
||||
prompt_template="default:coding", output_key="coding_output",
|
||||
),
|
||||
StepConfig(
|
||||
name="review", agent="claude-reviewer", role="review",
|
||||
prompt_template="default:review", output_key="review_result",
|
||||
verdict=True,
|
||||
),
|
||||
]
|
||||
config = PipelineConfig(
|
||||
output_dir=Path(tmpdir),
|
||||
max_iterations=2,
|
||||
min_iterations=1,
|
||||
language="en",
|
||||
inputs={"plan": "Test plan", "checklist": "Test checklist"},
|
||||
agents=dict(BUILTIN_AGENTS),
|
||||
coders=["claude-coder"],
|
||||
reviewers=["claude-reviewer"],
|
||||
pipeline=steps,
|
||||
preset_name="simple",
|
||||
)
|
||||
|
||||
captured_prompts: list[dict] = []
|
||||
|
||||
def _mock(agent_config, prompt, step_name, **kwargs):
|
||||
captured_prompts.append({
|
||||
"step_name": step_name,
|
||||
"prompt": prompt,
|
||||
})
|
||||
if step_name == "coding":
|
||||
return AgentResult(
|
||||
output="Implemented feature X",
|
||||
exit_code=0,
|
||||
agent_name=agent_config.name,
|
||||
step_name=step_name,
|
||||
duration_seconds=5.0,
|
||||
transcript="# Transcript\nclaude ran the task",
|
||||
command_preview="claude --setting-sources user",
|
||||
)
|
||||
# First review: FAIL, second review: PASS
|
||||
review_calls = [
|
||||
p for p in captured_prompts if p["step_name"] == "review"
|
||||
]
|
||||
if len(review_calls) <= 1:
|
||||
return AgentResult(
|
||||
output="Issues found\n\nVERDICT: FAIL",
|
||||
exit_code=0,
|
||||
agent_name=agent_config.name,
|
||||
step_name=step_name,
|
||||
duration_seconds=2.0,
|
||||
transcript="# Transcript\nreview ran",
|
||||
command_preview="claude -p --setting-sources user",
|
||||
)
|
||||
return AgentResult(
|
||||
output="All good\n\nVERDICT: PASS",
|
||||
exit_code=0,
|
||||
agent_name=agent_config.name,
|
||||
step_name=step_name,
|
||||
duration_seconds=2.0,
|
||||
)
|
||||
|
||||
with patch("cross_eval.pipeline.invoke_agent", side_effect=_mock):
|
||||
result = run_pipeline(config)
|
||||
|
||||
self.assertEqual(result.final_verdict, "PASS")
|
||||
self.assertEqual(len(result.iterations), 2)
|
||||
|
||||
# The review prompt in iteration 2 should reference prior evidence
|
||||
# (from iteration 1's coding step)
|
||||
iter2_review_prompts = [
|
||||
p for p in captured_prompts
|
||||
if p["step_name"] == "review"
|
||||
]
|
||||
# There should be 2 review prompts (one per iteration)
|
||||
self.assertEqual(len(iter2_review_prompts), 2)
|
||||
iter2_review = iter2_review_prompts[1]["prompt"]
|
||||
# Prior evidence should appear because it was carried forward
|
||||
# The review step runs after coding, so it sees current iteration's
|
||||
# coding evidence. But the key test is that evidence IS present.
|
||||
self.assertIn("Exit code: 0", iter2_review)
|
||||
self.assertIn("claude-coder", iter2_review)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 7. Report evidence summary table
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestReportEvidenceSummaryTable(unittest.TestCase):
|
||||
"""Report includes evidence summary table per iteration."""
|
||||
|
||||
def test_report_contains_evidence_summary(self) -> None:
|
||||
steps = [
|
||||
StepConfig(
|
||||
name="coding", agent="claude-coder", role="coding",
|
||||
prompt_template="default:coding", output_key="coding_output",
|
||||
),
|
||||
StepConfig(
|
||||
name="review", agent="claude-reviewer", role="review",
|
||||
prompt_template="default:review", output_key="review_result",
|
||||
verdict=True,
|
||||
),
|
||||
]
|
||||
config = PipelineConfig(
|
||||
max_iterations=1,
|
||||
language="en",
|
||||
inputs={"plan": "Plan", "checklist": "CL"},
|
||||
agents=dict(BUILTIN_AGENTS),
|
||||
pipeline=steps,
|
||||
preset_name="simple",
|
||||
)
|
||||
|
||||
coding_result = AgentResult(
|
||||
output="diff --git a/file ...",
|
||||
exit_code=0,
|
||||
agent_name="claude-coder",
|
||||
step_name="coding",
|
||||
duration_seconds=10.0,
|
||||
transcript="# Transcript",
|
||||
command_preview="claude --setting-sources user",
|
||||
)
|
||||
review_result = AgentResult(
|
||||
output="VERDICT: PASS",
|
||||
exit_code=0,
|
||||
agent_name="claude-reviewer",
|
||||
step_name="review",
|
||||
duration_seconds=5.0,
|
||||
transcript="# Transcript",
|
||||
command_preview="claude -p",
|
||||
)
|
||||
|
||||
iteration = IterationResult(
|
||||
iteration=1,
|
||||
step_results={
|
||||
"coding_output": coding_result,
|
||||
"review_result": review_result,
|
||||
},
|
||||
step_outputs={
|
||||
"coding_output": "diff --git a/file ...",
|
||||
"review_result": "VERDICT: PASS",
|
||||
},
|
||||
verdict="PASS",
|
||||
)
|
||||
|
||||
pipeline_result = PipelineResult(
|
||||
iterations=[iteration],
|
||||
final_verdict="PASS",
|
||||
total_duration=15.0,
|
||||
)
|
||||
|
||||
report = build_report(config, pipeline_result)
|
||||
self.assertIn("Evidence Summary", report)
|
||||
self.assertIn("claude-coder", report)
|
||||
self.assertIn("claude-reviewer", report)
|
||||
self.assertIn("10.0s", report)
|
||||
self.assertIn("5.0s", report)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 8. _build_context merges prior and current evidence
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestBuildContextMergesEvidence(unittest.TestCase):
|
||||
"""_build_context merges prior iteration evidence with current step evidence."""
|
||||
|
||||
def test_prior_evidence_used_when_no_current_results(self) -> None:
|
||||
from cross_eval.pipeline import _build_context
|
||||
input_contents = {
|
||||
"plan": "test",
|
||||
"execution_evidence": "### Step: coding (coder)\n- Exit code: 0",
|
||||
}
|
||||
context = _build_context(
|
||||
input_contents, {}, "feedback", 2, 5, step_results=None,
|
||||
)
|
||||
# Prior evidence should survive when there are no current results
|
||||
self.assertIn("coding (coder)", context["execution_evidence"])
|
||||
|
||||
def test_current_and_prior_merged(self) -> None:
|
||||
from cross_eval.pipeline import _build_context
|
||||
input_contents = {
|
||||
"plan": "test",
|
||||
"execution_evidence": "### Step: coding (coder)\n- Exit code: 0",
|
||||
}
|
||||
current_result = AgentResult(
|
||||
output="review text", exit_code=0, agent_name="reviewer",
|
||||
step_name="review", duration_seconds=3.0,
|
||||
command_preview="cmd",
|
||||
)
|
||||
context = _build_context(
|
||||
input_contents, {}, "feedback", 2, 5,
|
||||
step_results={"review_result": current_result},
|
||||
)
|
||||
evidence = context["execution_evidence"]
|
||||
# Both prior and current should appear
|
||||
self.assertIn("Prior Iteration Evidence", evidence)
|
||||
self.assertIn("Current Iteration Evidence", evidence)
|
||||
self.assertIn("coding (coder)", evidence)
|
||||
self.assertIn("reviewer", evidence)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 9. Evidence in review-only template (used by review-fix preset)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestReviewOnlyTemplateIncludesEvidence(unittest.TestCase):
|
||||
"""review-only template includes {execution_evidence} placeholder."""
|
||||
|
||||
def test_review_only_template_has_evidence_placeholder(self) -> None:
|
||||
from cross_eval.prompts import REVIEW_ONLY_TEMPLATE, REVIEW_ONLY_TEMPLATE_KO
|
||||
self.assertIn("{execution_evidence}", REVIEW_ONLY_TEMPLATE)
|
||||
self.assertIn("{execution_evidence}", REVIEW_ONLY_TEMPLATE_KO)
|
||||
|
||||
def test_review_only_renders_evidence(self) -> None:
|
||||
from cross_eval.prompts import render_template, REVIEW_ONLY_TEMPLATE
|
||||
context = {
|
||||
"plan": "Test plan",
|
||||
"checklist": "Test checklist",
|
||||
"docs": "Test docs",
|
||||
"feedback": "No feedback",
|
||||
"execution_evidence": "### Step: coding (coder)\n- Exit code: 0\n- Duration: 5.0s",
|
||||
"iteration": "1",
|
||||
"max_iterations": "3",
|
||||
}
|
||||
rendered = render_template(REVIEW_ONLY_TEMPLATE, context)
|
||||
self.assertIn("Exit code: 0", rendered)
|
||||
self.assertIn("Duration: 5.0s", rendered)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 10. Evidence propagation in phased pipeline (coding-review-fix)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestPhasedPipelineEvidencePropagation(unittest.TestCase):
|
||||
"""Evidence propagates correctly in coding-review-fix phased pipeline."""
|
||||
|
||||
def test_reviewer_receives_coding_evidence_in_phased_pipeline(self) -> None:
|
||||
"""In coding-review-fix, review-phase reviewers see coding-phase evidence."""
|
||||
from cross_eval.prompts import _build_coding_review_fix_preset
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
coders = ["claude-coder"]
|
||||
reviewers = ["claude-reviewer"]
|
||||
seniors = ["claude-senior"]
|
||||
phases = _build_coding_review_fix_preset(coders, reviewers, seniors)
|
||||
|
||||
config = PipelineConfig(
|
||||
output_dir=Path(tmpdir),
|
||||
max_iterations=5,
|
||||
min_iterations=1,
|
||||
language="en",
|
||||
inputs={"plan": "Test plan", "checklist": "Test checklist"},
|
||||
agents=dict(BUILTIN_AGENTS),
|
||||
coders=coders,
|
||||
reviewers=reviewers,
|
||||
seniors=seniors,
|
||||
phases=phases,
|
||||
preset_name="coding-review-fix",
|
||||
)
|
||||
|
||||
captured_prompts: list[dict] = []
|
||||
|
||||
def _mock(agent_config, prompt, step_name, **kwargs):
|
||||
captured_prompts.append({
|
||||
"step_name": step_name,
|
||||
"prompt": prompt,
|
||||
"agent_name": agent_config.name,
|
||||
})
|
||||
if step_name == "coding":
|
||||
return AgentResult(
|
||||
output="Implemented feature X",
|
||||
exit_code=0,
|
||||
agent_name=agent_config.name,
|
||||
step_name=step_name,
|
||||
duration_seconds=10.0,
|
||||
transcript="# Transcript\nclaude executed coding task",
|
||||
command_preview="claude --setting-sources user",
|
||||
)
|
||||
if step_name == "verify":
|
||||
return AgentResult(
|
||||
output="All good\n\nVERDICT: PASS",
|
||||
exit_code=0,
|
||||
agent_name=agent_config.name,
|
||||
step_name=step_name,
|
||||
duration_seconds=3.0,
|
||||
)
|
||||
return AgentResult(
|
||||
output=f"Output for {step_name}",
|
||||
exit_code=0,
|
||||
agent_name=agent_config.name,
|
||||
step_name=step_name,
|
||||
duration_seconds=2.0,
|
||||
transcript=f"# Transcript for {step_name}",
|
||||
command_preview=f"cmd-{step_name}",
|
||||
)
|
||||
|
||||
with patch("cross_eval.pipeline.invoke_agent", side_effect=_mock):
|
||||
result = run_pipeline(config)
|
||||
|
||||
self.assertEqual(result.final_verdict, "PASS")
|
||||
|
||||
# Check that review-phase reviewers received evidence
|
||||
review_prompts = [
|
||||
p for p in captured_prompts
|
||||
if p["step_name"].startswith("review_")
|
||||
]
|
||||
self.assertTrue(len(review_prompts) >= 1)
|
||||
# The review prompt should contain evidence from the coding phase
|
||||
review_prompt = review_prompts[0]["prompt"]
|
||||
self.assertIn("Execution Evidence", review_prompt)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 11. Evidence format includes output size
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestEvidenceIncludesOutputSize(unittest.TestCase):
|
||||
"""_format_execution_evidence includes output size for debugging."""
|
||||
|
||||
def test_output_size_in_evidence(self) -> None:
|
||||
result = AgentResult(
|
||||
output="x" * 500,
|
||||
exit_code=0,
|
||||
agent_name="claude-coder",
|
||||
step_name="coding",
|
||||
duration_seconds=5.0,
|
||||
command_preview="claude --setting-sources user",
|
||||
)
|
||||
evidence = _format_execution_evidence({"coding_output": result})
|
||||
self.assertIn("Output size: 500 chars", evidence)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 12. Report transcript label i18n
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestReportTranscriptLabelI18n(unittest.TestCase):
|
||||
"""Report uses translated transcript label."""
|
||||
|
||||
def test_korean_transcript_label(self) -> None:
|
||||
steps = [
|
||||
StepConfig(
|
||||
name="coding", agent="claude-coder", role="coding",
|
||||
prompt_template="default:coding", output_key="coding_output",
|
||||
),
|
||||
]
|
||||
config = PipelineConfig(
|
||||
max_iterations=1,
|
||||
language="ko",
|
||||
inputs={"plan": "Plan", "checklist": "CL"},
|
||||
agents=dict(BUILTIN_AGENTS),
|
||||
pipeline=steps,
|
||||
preset_name="simple",
|
||||
)
|
||||
|
||||
coding_result = AgentResult(
|
||||
output="diff --git a/file ...",
|
||||
exit_code=0,
|
||||
agent_name="claude-coder",
|
||||
step_name="coding",
|
||||
duration_seconds=10.0,
|
||||
transcript="# Agent Execution Transcript\n## Command\nclaude ...",
|
||||
command_preview="claude --setting-sources user",
|
||||
)
|
||||
|
||||
iteration = IterationResult(
|
||||
iteration=1,
|
||||
step_results={"coding_output": coding_result},
|
||||
step_outputs={"coding_output": "diff --git a/file ..."},
|
||||
)
|
||||
|
||||
pipeline_result = PipelineResult(
|
||||
iterations=[iteration],
|
||||
final_verdict="MAX_ITERATIONS_REACHED",
|
||||
total_duration=10.0,
|
||||
)
|
||||
|
||||
report = build_report(config, pipeline_result)
|
||||
self.assertIn("실행 트랜스크립트", report)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 13. Claude coder + Codex reviewer/senior combination
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCodingReviewFixClaudeCodexCombination(unittest.TestCase):
|
||||
"""coding-review-fix works with Claude as coder and Codex as reviewer/senior."""
|
||||
|
||||
def test_claude_coder_codex_reviewer_completes(self) -> None:
|
||||
"""Verify the preset completes with mixed Claude/Codex agents."""
|
||||
from cross_eval.prompts import _build_coding_review_fix_preset
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
coders = ["claude-coder"]
|
||||
reviewers = ["codex-reviewer"]
|
||||
seniors = ["codex-senior"]
|
||||
phases = _build_coding_review_fix_preset(coders, reviewers, seniors)
|
||||
|
||||
config = PipelineConfig(
|
||||
output_dir=Path(tmpdir),
|
||||
max_iterations=5,
|
||||
min_iterations=1,
|
||||
language="en",
|
||||
inputs={"plan": "Test plan", "checklist": "Test checklist"},
|
||||
agents=dict(BUILTIN_AGENTS),
|
||||
coders=coders,
|
||||
reviewers=reviewers,
|
||||
seniors=seniors,
|
||||
phases=phases,
|
||||
preset_name="coding-review-fix",
|
||||
)
|
||||
|
||||
def _mock(agent_config, prompt, step_name, **kwargs):
|
||||
if step_name == "verify":
|
||||
return AgentResult(
|
||||
output="All good\n\nVERDICT: PASS",
|
||||
exit_code=0,
|
||||
agent_name=agent_config.name,
|
||||
step_name=step_name,
|
||||
duration_seconds=2.0,
|
||||
transcript="# Transcript",
|
||||
command_preview="codex exec",
|
||||
)
|
||||
return AgentResult(
|
||||
output=f"Output for {step_name}",
|
||||
exit_code=0,
|
||||
agent_name=agent_config.name,
|
||||
step_name=step_name,
|
||||
duration_seconds=3.0,
|
||||
transcript=f"# Transcript for {step_name}",
|
||||
command_preview=f"cmd-{step_name}",
|
||||
)
|
||||
|
||||
with patch("cross_eval.pipeline.invoke_agent", side_effect=_mock):
|
||||
result = run_pipeline(config)
|
||||
|
||||
self.assertEqual(result.final_verdict, "PASS")
|
||||
# Verify both Claude and Codex agents were used
|
||||
all_agents = set()
|
||||
for ir in result.iterations:
|
||||
for ar in ir.step_results.values():
|
||||
all_agents.add(ar.agent_name)
|
||||
self.assertIn("claude-coder", all_agents)
|
||||
self.assertIn("codex-reviewer", all_agents)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user