1456 lines
51 KiB
Python
1456 lines
51 KiB
Python
"""Main pipeline execution engine."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import time
|
|
from hashlib import sha256
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from cross_eval.agent import AgentInvocationError, invoke_agent, invoke_agent_agentic
|
|
from cross_eval.worktree import WorktreeError
|
|
from cross_eval.config import try_reload_config
|
|
from cross_eval.discovery import discover_repo, format_repo_discovery
|
|
from cross_eval.models import (
|
|
AgentConfig,
|
|
AgentResult,
|
|
IterationResult,
|
|
PipelineConfig,
|
|
PipelineResult,
|
|
StepConfig,
|
|
)
|
|
from cross_eval.prompts import render_template, resolve_template, set_language
|
|
from cross_eval.report import build_report
|
|
from cross_eval.runtime_env import (
|
|
build_execution_policy,
|
|
build_runtime_environment,
|
|
summarize_environment,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def run_pipeline(
    config: PipelineConfig,
    cwd: Path | None = None,
    dry_run: bool = False,
    timeout: int | None = None,
) -> PipelineResult:
    """Run a cross-eval pipeline end to end and return its result.

    A fresh run directory is created first; the configuration then decides
    which runner does the work: the phased runner when ``config.phases`` is
    truthy, the simple runner otherwise.  ``cwd``, ``dry_run`` and ``timeout``
    are forwarded to that runner unchanged.
    """
    # Run directory layout: output/{preset}_{datetime}/
    run_dir = _make_run_dir(config)

    runner = _run_phased_pipeline if config.phases else _run_simple_pipeline
    return runner(config, run_dir, cwd, dry_run, timeout)
|
|
|
|
|
|
def _make_run_dir(config: PipelineConfig) -> Path:
|
|
"""Create timestamped run directory under output_dir."""
|
|
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
run_dir = config.output_dir / f"{config.preset_name}_{ts}"
|
|
run_dir.mkdir(parents=True, exist_ok=True)
|
|
return run_dir
|
|
|
|
|
|
def _commit_iteration(
    worktree_path: Path,
    label: str,
    iteration: int,
    verdict: str | None,
) -> None:
    """Record an intermediate commit once an agentic iteration has finished.

    Committing moves the diff baseline forward, so the following iteration
    only captures the changes made after this point.  The commit message
    uses ``no-verdict`` when ``verdict`` is empty or ``None``.
    """
    from cross_eval.worktree import commit_worktree

    verdict_label = verdict or "no-verdict"
    message = f"cross-eval: {label} v{iteration} ({verdict_label})"
    if not commit_worktree(worktree_path, message):
        return
    logger.debug(" Intermediate commit: v%d (%s)", iteration, verdict)
|
|
|
|
|
|
def _has_agentic_steps(config: PipelineConfig, steps: list[StepConfig]) -> bool:
|
|
"""Check if any step uses an agentic agent."""
|
|
return any(
|
|
config.agents.get(s.agent, AgentConfig(name="", command="")).agentic
|
|
for s in steps
|
|
)
|
|
|
|
|
|
def _setup_worktree(cwd: Path, run_dir: Path, preset_name: str) -> tuple[Path, str]:
    """Prepare the single worktree shared by every step of a pipeline run.

    Steps:
        1. Generate the branch name (cross-eval/<preset>_<timestamp>).
        2. Create the branch from HEAD.
        3. Create the worktree on that branch.

    The resulting worktree path and branch name are also written to
    ``worktree_path.txt`` and ``worktree_branch.txt`` inside ``run_dir``.

    Returns:
        ``(worktree_path, branch_name)``.
    """
    from cross_eval.worktree import create_worktree, make_branch_name, make_worktree_dir

    branch_name = make_branch_name(preset_name)
    worktree_path = create_worktree(
        base_cwd=cwd,
        work_dir=make_worktree_dir(cwd, branch_name),
        branch_name=branch_name,
    )

    # Leave breadcrumbs in the run directory, one value per file.
    for filename, value in (
        ("worktree_path.txt", worktree_path),
        ("worktree_branch.txt", branch_name),
    ):
        (run_dir / filename).write_text(f"{value}\n", encoding="utf-8")

    return worktree_path, branch_name
|
|
|
|
|
|
def _snapshot_repo_state(cwd: Path) -> str:
    """Build a textual fingerprint of the base checkout's working tree.

    Comparing two fingerprints reveals agentic runs that accidentally
    modified the original checkout instead of the isolated worktree.

    The fingerprint joins, with newlines: the short ``git status`` listing,
    the diff against ``HEAD``, the staged diff, and one ``UNTRACKED`` line
    per untracked path (carrying a SHA-256 digest for regular files).

    Returns:
        The fingerprint, or an empty string when ``git status`` exits with a
        non-zero code.
    """

    def _git_text(*args: str) -> subprocess.CompletedProcess:
        # Text-mode git invocation rooted at the base checkout.
        return subprocess.run(
            ["git", *args],
            cwd=cwd,
            capture_output=True,
            text=True,
        )

    status = _git_text("status", "--short", "--untracked-files=all")
    if status.returncode != 0:
        return ""

    worktree_diff = _git_text("diff", "--no-ext-diff", "--binary", "HEAD")
    staged_diff = _git_text("diff", "--no-ext-diff", "--binary", "--cached")
    # Captured as raw bytes: the NUL-separated listing is decoded leniently below.
    listing = subprocess.run(
        ["git", "ls-files", "--others", "--exclude-standard", "-z"],
        cwd=cwd,
        capture_output=True,
    )

    pieces = [status.stdout, worktree_diff.stdout, staged_diff.stdout]

    if listing.returncode == 0 and listing.stdout:
        names = listing.stdout.decode("utf-8", errors="replace").split("\0")
        for name in filter(None, names):
            candidate = cwd / name
            if candidate.is_file():
                checksum = sha256(candidate.read_bytes()).hexdigest()
                pieces.append(f"UNTRACKED {name} {checksum}")
            else:
                pieces.append(f"UNTRACKED {name} (non-file)")

    return "\n".join(pieces)
|
|
|
|
|
|
def _snapshot_repo_status(cwd: Path) -> str:
    """Return the trimmed ``git status --short`` listing used in error reports.

    An empty string is returned when git exits with a non-zero code.
    """
    completed = subprocess.run(
        ["git", "status", "--short", "--untracked-files=all"],
        cwd=cwd,
        capture_output=True,
        text=True,
    )
    return completed.stdout.strip() if completed.returncode == 0 else ""
|
|
|
|
|
|
def _assert_base_repo_isolation(
    cwd: Path,
    baseline_state: str,
    *,
    step_name: str,
    agent_name: str,
    worktree_path: Path,
    baseline_status: str,
) -> None:
    """Abort the run when the base checkout no longer matches its baseline.

    A fresh state snapshot of ``cwd`` is compared with ``baseline_state``;
    nothing happens when they are equal.

    Raises:
        WorktreeError: the snapshots differ.  The message names the step,
            the agent and the worktree, and shows the baseline and current
            ``git status`` summaries (``(clean)`` when a summary is empty).
    """
    if _snapshot_repo_state(cwd) != baseline_state:
        status_now = _snapshot_repo_status(cwd)
        before = baseline_status if baseline_status else "(clean)"
        after = status_now if status_now else "(clean)"
        raise WorktreeError(
            "Agent modified the base repository instead of the isolated worktree.\n\n"
            f"Step: {step_name}\n"
            f"Agent: {agent_name}\n"
            f"Worktree: {worktree_path}\n\n"
            f"Baseline status:\n{before}\n\n"
            f"Current status:\n{after}"
        )
|
|
|
|
|
|
def _finalize_worktree(
    cwd: Path,
    worktree_path: Path,
    branch_name: str,
    preset_name: str,
    final_verdict: str,
) -> str | None:
    """Commit outstanding changes on the branch, then remove the worktree.

    The branch survives worktree removal and stays in the original repo.
    Every stage is best-effort: failures are logged and never raised, since
    the pipeline runners call this from their ``finally`` blocks.

    Args:
        cwd: Base repository checkout.
        worktree_path: Worktree to commit and remove.
        branch_name: Branch the worktree was created on.
        preset_name: Preset name, used in the commit message.
        final_verdict: Final pipeline verdict, used in the commit message.

    Returns:
        The branch name when the branch carries agentic work: either the
        final commit succeeded, or earlier intermediate commits already put
        the branch ahead of ``HEAD``.  ``None`` when the branch was empty
        (it is then deleted) or its state could not be determined.
    """
    from cross_eval.worktree import commit_worktree, remove_worktree

    committed = False
    try:
        committed = commit_worktree(
            worktree_path,
            f"cross-eval: {preset_name} ({final_verdict})",
        )
        if committed:
            logger.info(" Agentic changes committed on branch: %s", branch_name)
        else:
            logger.warning(" No agentic changes to commit (empty diff)")
    except Exception:
        logger.warning(" Failed to commit agentic changes", exc_info=True)

    try:
        remove_worktree(base_cwd=cwd, work_dir=worktree_path)
    except Exception:
        # Keep the traceback: a leftover worktree needs manual cleanup.
        logger.warning("Failed to clean up worktree: %s", worktree_path, exc_info=True)

    if committed:
        return branch_name

    # Nothing was committed just now, but per-iteration commits may already
    # live on the branch.  Only delete the branch when it is provably empty.
    try:
        ahead = subprocess.run(
            ["git", "log", "--oneline", f"HEAD..{branch_name}"],
            cwd=cwd, capture_output=True, text=True,
        )
        if ahead.returncode != 0:
            # Unknown state: never force-delete a branch we could not inspect.
            logger.debug(
                " Could not inspect branch %s: %s",
                branch_name, ahead.stderr.strip(),
            )
            return None

        if ahead.stdout.strip():
            # Intermediate commits exist, so the branch holds agentic work.
            logger.info(" Agentic changes available on branch: %s", branch_name)
            return branch_name

        # No commits on branch beyond base — clean up
        deleted = subprocess.run(
            ["git", "branch", "-D", branch_name],
            cwd=cwd, capture_output=True, text=True,
        )
        if deleted.returncode == 0:
            logger.info(" Deleted empty branch: %s", branch_name)
        else:
            logger.debug(
                " Could not delete empty branch %s: %s",
                branch_name, deleted.stderr.strip(),
            )
    except Exception:
        # Best-effort cleanup: record the failure instead of hiding it.
        logger.debug(" Branch cleanup failed for %s", branch_name, exc_info=True)

    return None
|
|
|
|
|
|
def _run_simple_pipeline(
    config: PipelineConfig,
    run_dir: Path,
    cwd: Path | None = None,
    dry_run: bool = False,
    timeout: int | None = None,
) -> PipelineResult:
    """Execute a simple (non-phased) pipeline.

    The steps in ``config.pipeline`` are run up to ``config.max_iterations``
    times.  The loop stops early on an ESCALATE verdict, on a PASS once
    ``config.min_iterations`` has been reached, on auto-escalation (repeated
    FAIL feedback with no senior/aggregator configured), or after the first
    iteration of a dry run.

    Args:
        config: Pipeline configuration; reloaded at the start of each iteration.
        run_dir: Directory receiving step outputs and the report.
        cwd: Base repository directory; defaults to the process working directory.
        dry_run: Skip worktree setup and report saving; stop after one iteration.
        timeout: Per-agent timeout forwarded to the step runner.

    Returns:
        The aggregated ``PipelineResult``.  ``final_verdict`` reflects the
        outcome of the last executed iteration.

    Exceptions raised while running steps propagate to the caller after the
    shared worktree (if any) has been finalized.
    """
    if cwd is None:
        cwd = Path(os.getcwd())

    set_language(config.language)
    input_contents = _load_inputs(config)
    # cwd is guaranteed non-None here, so no further fallback is needed.
    runtime_env = _build_runtime_inputs(config, input_contents, cwd)

    # Setup shared worktree for agentic mode
    worktree_path: Path | None = None
    agentic_branch_name: str | None = None
    base_repo_state: str | None = None
    base_repo_status: str | None = None
    if not dry_run and _has_agentic_steps(config, config.pipeline):
        worktree_path, agentic_branch_name = _setup_worktree(
            cwd, run_dir, config.preset_name,
        )
        # Baseline of the base checkout, used to detect leaked modifications.
        base_repo_state = _snapshot_repo_state(cwd)
        base_repo_status = _snapshot_repo_status(cwd)

    feedback = "(no feedback — first iteration)"
    iterations: list[IterationResult] = []
    start_time = time.monotonic()
    final_verdict = "MAX_ITERATIONS_REACHED"
    aggregate_history: dict[str, int] = {}
    aggregate_warnings: list[str] = []
    escalated_issues: list[str] = []
    all_feedbacks: list[str] = []

    try:
        for i in range(1, config.max_iterations + 1):
            # Pick up config and input edits made on disk between iterations.
            config = try_reload_config(config)
            set_language(config.language)
            _refresh_inputs(config, input_contents)
            runtime_env = _build_runtime_inputs(config, input_contents, cwd)

            logger.info("=" * 50)
            logger.info(" Iteration %d/%d", i, config.max_iterations)
            logger.info("=" * 50)

            step_outputs, step_results, verdict = _run_steps(
                config.pipeline, config, input_contents, feedback,
                i, config.max_iterations, cwd, timeout, dry_run,
                run_dir=run_dir, output_iter=i,
                worktree_path=worktree_path,
                runtime_env=runtime_env,
                base_repo_state=base_repo_state,
                base_repo_status=base_repo_status,
            )

            # Intermediate commit so next iteration's diff only shows new changes
            if worktree_path is not None:
                _commit_iteration(worktree_path, config.preset_name, i, verdict)

            iter_result = IterationResult(
                iteration=i,
                step_results=step_results,
                step_outputs=step_outputs,
                verdict=verdict,
            )
            warning = _detect_repeated_aggregate(
                config.pipeline, step_outputs, aggregate_history, iteration=i,
            )
            if warning:
                iter_result.repeated_aggregate_warning = warning
                aggregate_warnings.append(warning)
                logger.warning(" %s", warning)

            iter_result.feedback = _collect_feedback(config.pipeline, step_outputs)
            # Keep the previous feedback when this iteration produced none.
            feedback = iter_result.feedback or feedback
            all_feedbacks.append(feedback)

            # Extract tracker from verdict/review steps for next iteration
            for step in config.pipeline:
                if step.verdict or step.role == "review":
                    tracker = _extract_senior_tracker(
                        step_outputs.get(step.output_key, ""),
                    )
                    if tracker:
                        input_contents["previous_senior_tracker"] = tracker

            iterations.append(iter_result)

            # ESCALATE check (highest priority)
            if verdict == "ESCALATE":
                final_verdict = "ESCALATE"
                for step in config.pipeline:
                    if step.verdict:
                        esc = _extract_escalated_issues(
                            step_outputs.get(step.output_key, ""),
                        )
                        if esc:
                            escalated_issues.append(esc)
                            iter_result.escalated_issues = esc
                logger.info(" ESCALATE at iteration %d — stopping loop.", i)
                break

            if verdict == "PASS":
                final_verdict = "PASS"
                if i >= config.min_iterations:
                    logger.info(" PASS at iteration %d (min=%d reached)!", i, config.min_iterations)
                    break
                else:
                    logger.info(
                        " PASS at iteration %d, but min_iterations=%d — continuing",
                        i, config.min_iterations,
                    )
            else:
                # A PASS recorded while still below min_iterations must not
                # outlive a later non-PASS iteration; otherwise a run that
                # ends on FAIL would still be reported as PASS.
                final_verdict = "MAX_ITERATIONS_REACHED"

            # Auto-escalate: no senior/aggregator + repeated FAIL
            has_aggregator = config.seniors or any(
                s.prompt_template == "default:aggregate-review" for s in config.pipeline
            )
            if (
                verdict == "FAIL"
                and not has_aggregator
                and i >= 2
                and _detect_auto_escalate(all_feedbacks[:-1], feedback)
            ):
                final_verdict = "ESCALATE"
                auto_msg = (
                    f"Auto-escalated: same issues detected across {i} iterations "
                    f"without resolution (no senior reviewer configured)."
                )
                escalated_issues.append(auto_msg)
                iter_result.escalated_issues = auto_msg
                logger.info(" AUTO-ESCALATE at iteration %d", i)
                break

            if dry_run:
                logger.info(" (dry-run: stopping after iteration 1)")
                break

    finally:
        # Always commit and remove the worktree, even when a step raised.
        agentic_branch: str | None = None
        if worktree_path is not None and agentic_branch_name is not None:
            agentic_branch = _finalize_worktree(
                cwd, worktree_path, agentic_branch_name,
                config.preset_name, final_verdict,
            )

    total_duration = time.monotonic() - start_time

    pipeline_result = PipelineResult(
        iterations=iterations,
        final_verdict=final_verdict,
        total_duration=round(total_duration, 1),
        run_dir=run_dir,
        repeated_aggregate_warnings=aggregate_warnings,
        escalated_issues=escalated_issues,
        agentic_branch=agentic_branch,
    )

    if not dry_run:
        _save_report(run_dir, config, pipeline_result)

    return pipeline_result
|
|
|
|
|
|
def _run_phased_pipeline(
    config: PipelineConfig,
    run_dir: Path,
    cwd: Path | None = None,
    dry_run: bool = False,
    timeout: int | None = None,
) -> PipelineResult:
    """Execute a multi-phase pipeline (e.g. review-fix).

    Phases from ``config.phases`` run in order.  Each phase repeats its steps
    up to ``phase.max_iterations`` times and ends early when:

    * a step yields ESCALATE, or auto-escalation triggers (this also stops
      all remaining phases);
    * the steps produce no verdict at all (single-pass phase);
    * ``phase.consecutive_pass`` PASS verdicts are seen in a row;
    * the run is a dry run (one iteration per phase).

    Unless the run escalated, the final verdict is decided by the last
    phase: PASS when it converged, MAX_ITERATIONS_REACHED otherwise.

    Args:
        config: Pipeline configuration; reloaded at the start of each iteration.
        run_dir: Directory receiving step outputs and the report.
        cwd: Base repository directory; defaults to the process working directory.
        dry_run: Skip worktree setup and report saving.
        timeout: Per-agent timeout forwarded to the step runner.

    Returns:
        The aggregated ``PipelineResult`` covering every iteration of every phase.
    """
    if cwd is None:
        cwd = Path(os.getcwd())

    set_language(config.language)
    input_contents = _load_inputs(config)
    runtime_env = _build_runtime_inputs(config, input_contents, cwd)

    # Setup shared worktree for agentic mode
    all_phase_steps = [s for p in config.phases for s in p.steps]
    worktree_path: Path | None = None
    agentic_branch_name: str | None = None
    base_repo_state: str | None = None
    base_repo_status: str | None = None
    if not dry_run and _has_agentic_steps(config, all_phase_steps):
        worktree_path, agentic_branch_name = _setup_worktree(
            cwd, run_dir, config.preset_name,
        )
        # Baseline of the base checkout, used to detect leaked modifications.
        base_repo_state = _snapshot_repo_state(cwd)
        base_repo_status = _snapshot_repo_status(cwd)

    iterations: list[IterationResult] = []
    feedback = "(no feedback — first iteration)"
    start_time = time.monotonic()
    final_verdict = "MAX_ITERATIONS_REACHED"
    # Iteration counter across all phases; used for output numbering.
    global_iter = 0
    # Repeated-aggregate history is tracked separately per phase name.
    aggregate_history_by_phase: dict[str, dict[str, int]] = {}
    aggregate_warnings: list[str] = []
    escalated_issues: list[str] = []
    # Feedback history is shared across phases.
    all_feedbacks: list[str] = []
    escalated = False

    try:
        for phase_idx, phase in enumerate(config.phases):
            if escalated:
                break

            logger.info("=" * 60)
            logger.info(
                " Phase: %s (max_iter=%d, consecutive_pass=%d)",
                phase.name, phase.max_iterations, phase.consecutive_pass,
            )
            logger.info("=" * 60)

            consecutive_passes = 0
            phase_converged = False

            # pi is the 1-based iteration number within the current phase.
            for pi in range(1, phase.max_iterations + 1):
                global_iter += 1

                # Pick up config and input edits made on disk between iterations.
                config = try_reload_config(config)
                set_language(config.language)
                _refresh_inputs(config, input_contents)
                runtime_env = _build_runtime_inputs(config, input_contents, cwd)

                logger.info("-" * 50)
                logger.info(
                    " [%s] Iteration %d/%d (global: v%d)",
                    phase.name, pi, phase.max_iterations, global_iter,
                )
                logger.info("-" * 50)

                step_outputs, step_results, verdict = _run_steps(
                    phase.steps, config, input_contents, feedback,
                    pi, phase.max_iterations, cwd, timeout, dry_run,
                    run_dir=run_dir, output_iter=global_iter, phase_name=phase.name,
                    worktree_path=worktree_path,
                    runtime_env=runtime_env,
                    base_repo_state=base_repo_state,
                    base_repo_status=base_repo_status,
                )

                # Intermediate commit so next iteration's diff only shows new changes
                if worktree_path is not None:
                    _commit_iteration(
                        worktree_path, f"{config.preset_name}/{phase.name}",
                        global_iter, verdict,
                    )

                iter_result = IterationResult(
                    iteration=global_iter,
                    step_results=step_results,
                    step_outputs=step_outputs,
                    verdict=verdict,
                    phase_name=phase.name,
                )
                phase_history = aggregate_history_by_phase.setdefault(phase.name, {})
                warning = _detect_repeated_aggregate(
                    phase.steps, step_outputs, phase_history, iteration=global_iter,
                    phase_name=phase.name,
                )
                if warning:
                    iter_result.repeated_aggregate_warning = warning
                    aggregate_warnings.append(warning)
                    logger.warning(" %s", warning)

                iter_result.feedback = _collect_feedback(phase.steps, step_outputs)
                # Keep the previous feedback when this iteration produced none.
                feedback = iter_result.feedback or feedback
                all_feedbacks.append(feedback)

                # Extract tracker from verdict/review steps
                for step in phase.steps:
                    if step.verdict or step.role == "review":
                        tracker = _extract_senior_tracker(
                            step_outputs.get(step.output_key, ""),
                        )
                        if tracker:
                            input_contents["previous_senior_tracker"] = tracker

                iterations.append(iter_result)

                # ESCALATE check
                if verdict == "ESCALATE":
                    final_verdict = "ESCALATE"
                    for step in phase.steps:
                        if step.verdict:
                            esc = _extract_escalated_issues(
                                step_outputs.get(step.output_key, ""),
                            )
                            if esc:
                                escalated_issues.append(esc)
                                iter_result.escalated_issues = esc
                    logger.info(
                        " [%s] ESCALATE at iteration %d — stopping.",
                        phase.name, pi,
                    )
                    escalated = True
                    break

                # No verdict came back from the steps: treat the phase as a
                # one-shot and move on.
                if verdict is None:
                    logger.info(
                        " [%s] completed (no verdict step; single-pass phase)",
                        phase.name,
                    )
                    phase_converged = True
                    break

                if verdict == "PASS":
                    consecutive_passes += 1
                    logger.info(
                        " [%s] PASS (%d/%d consecutive)",
                        phase.name, consecutive_passes, phase.consecutive_pass,
                    )
                    if consecutive_passes >= phase.consecutive_pass:
                        logger.info(
                            " [%s] Converged! %d consecutive PASSes.",
                            phase.name, phase.consecutive_pass,
                        )
                        phase_converged = True
                        break
                else:
                    # Any non-PASS verdict breaks the streak.
                    consecutive_passes = 0

                # Auto-escalate in phased pipeline
                has_aggregator = config.seniors or any(
                    s.prompt_template == "default:aggregate-review" for s in phase.steps
                )
                if (
                    verdict == "FAIL"
                    and not has_aggregator
                    and pi >= 2
                    and _detect_auto_escalate(all_feedbacks[:-1], feedback)
                ):
                    final_verdict = "ESCALATE"
                    auto_msg = (
                        f"Auto-escalated: same issues detected across {pi} iterations "
                        f"in phase '{phase.name}' without resolution."
                    )
                    escalated_issues.append(auto_msg)
                    iter_result.escalated_issues = auto_msg
                    logger.info(" [%s] AUTO-ESCALATE at iteration %d", phase.name, pi)
                    escalated = True
                    break

                # Dry run: a single iteration per phase is enough.
                if dry_run:
                    break

            # Escalation ends the whole pipeline, not just the current phase.
            if escalated:
                break

            if phase_converged:
                logger.info(" Phase '%s' completed: CONVERGED", phase.name)
            else:
                logger.info(
                    " Phase '%s' completed: max iterations (%d) reached",
                    phase.name, phase.max_iterations,
                )

            # Only the last phase decides the final (non-escalated) verdict.
            if phase_idx == len(config.phases) - 1:
                final_verdict = "PASS" if phase_converged else "MAX_ITERATIONS_REACHED"

    finally:
        # Always commit and remove the worktree, even when a step raised.
        agentic_branch: str | None = None
        if worktree_path is not None and agentic_branch_name is not None:
            agentic_branch = _finalize_worktree(
                cwd, worktree_path, agentic_branch_name,
                config.preset_name, final_verdict,
            )

    total_duration = time.monotonic() - start_time

    pipeline_result = PipelineResult(
        iterations=iterations,
        final_verdict=final_verdict,
        total_duration=round(total_duration, 1),
        run_dir=run_dir,
        repeated_aggregate_warnings=aggregate_warnings,
        escalated_issues=escalated_issues,
        agentic_branch=agentic_branch,
    )

    if not dry_run:
        _save_report(run_dir, config, pipeline_result)

    return pipeline_result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _load_inputs(config: PipelineConfig) -> dict[str, str]:
|
|
"""Load input file contents from config."""
|
|
input_contents: dict[str, str] = {}
|
|
for key, val in config.inputs.items():
|
|
if isinstance(val, str):
|
|
input_contents[key] = val
|
|
else:
|
|
input_contents[key] = val.read_text(encoding="utf-8")
|
|
return input_contents
|
|
|
|
|
|
def _refresh_inputs(
|
|
config: PipelineConfig, input_contents: dict[str, str],
|
|
) -> None:
|
|
"""Re-read input files (they may have changed on disk)."""
|
|
for key, val in config.inputs.items():
|
|
if isinstance(val, str):
|
|
input_contents[key] = val
|
|
elif isinstance(val, Path) and val.exists():
|
|
input_contents[key] = val.read_text(encoding="utf-8")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Parallel step grouping
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _get_step_dependencies(step: StepConfig) -> set[str]:
|
|
"""Extract output_key references from context_override values."""
|
|
deps: set[str] = set()
|
|
for val in step.context_override.values():
|
|
for match in re.finditer(r"\{(\w+)\}", val):
|
|
deps.add(match.group(1))
|
|
return deps
|
|
|
|
|
|
def _group_parallel_steps(steps: list[StepConfig]) -> list[list[StepConfig]]:
|
|
"""Group consecutive parallel steps into batches.
|
|
|
|
Consecutive steps with parallel=True are grouped together,
|
|
but a new batch starts when a step depends on an output_key
|
|
from a step in the current batch (dependency breaking).
|
|
"""
|
|
batches: list[list[StepConfig]] = []
|
|
current: list[StepConfig] = []
|
|
current_output_keys: set[str] = set()
|
|
|
|
for step in steps:
|
|
if not step.parallel:
|
|
if current:
|
|
batches.append(current)
|
|
current = []
|
|
current_output_keys = set()
|
|
batches.append([step])
|
|
continue
|
|
|
|
# Check if this step depends on any output from the current batch
|
|
deps = _get_step_dependencies(step)
|
|
if deps & current_output_keys:
|
|
batches.append(current)
|
|
current = []
|
|
current_output_keys = set()
|
|
|
|
current.append(step)
|
|
current_output_keys.add(step.output_key)
|
|
|
|
if current:
|
|
batches.append(current)
|
|
|
|
return batches
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Step execution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _run_steps(
    steps: list[StepConfig],
    config: PipelineConfig,
    input_contents: dict[str, str],
    feedback: str,
    iteration: int,
    max_iterations: int,
    cwd: Path,
    timeout: int | None,
    dry_run: bool,
    *,
    run_dir: Path,
    output_iter: int,
    phase_name: str | None = None,
    worktree_path: Path | None = None,
    runtime_env: dict[str, str] | None = None,
    base_repo_state: str | None = None,
    base_repo_status: str | None = None,
) -> tuple[dict[str, str], dict[str, AgentResult], str | None]:
    """Run every step of one iteration and derive the iteration verdict.

    Steps are grouped into batches; a batch of one goes through the
    single-step runner, larger batches through the parallel runner.

    Returns:
        ``(step_outputs, step_results, verdict)``.  The verdict combines all
        steps flagged with ``verdict``: ESCALATE wins over everything, a FAIL
        overrides an earlier non-ESCALATE verdict, and ``None`` is returned
        when no verdict was obtained.
    """
    step_outputs: dict[str, str] = {}
    step_results: dict[str, AgentResult] = {}

    # Keyword arguments common to both runners.
    shared_kwargs = dict(
        run_dir=run_dir,
        output_iter=output_iter,
        phase_name=phase_name,
        worktree_path=worktree_path,
        runtime_env=runtime_env,
        base_repo_state=base_repo_state,
        base_repo_status=base_repo_status,
    )

    for batch in _group_parallel_steps(steps):
        if len(batch) != 1:
            _execute_parallel_batch(
                batch, config, input_contents, feedback,
                iteration, max_iterations, cwd, timeout, dry_run,
                step_outputs, step_results,
                **shared_kwargs,
            )
        else:
            _execute_step(
                batch[0], config, input_contents, feedback,
                iteration, max_iterations, cwd, timeout, dry_run,
                step_outputs, step_results,
                **shared_kwargs,
            )

    # Fold the verdict steps together (ALL must PASS; ESCALATE wins over all)
    verdict: str | None = None
    for step in steps:
        if not step.verdict:
            continue
        step_verdict = _extract_verdict(
            step_outputs.get(step.output_key, ""), step.verdict_pattern,
        )
        logger.info(" [%s] verdict: %s", step.name, step_verdict)
        if step_verdict == "ESCALATE":
            verdict = "ESCALATE"
        elif verdict is None:
            verdict = step_verdict
        elif verdict != "ESCALATE" and step_verdict == "FAIL":
            verdict = "FAIL"

    return step_outputs, step_results, verdict
|
|
|
|
|
|
def _invoke_agentic(
    agent_config: AgentConfig,
    prompt: str,
    step_name: str,
    *,
    worktree_path: Path,
    env: dict[str, str] | None = None,
    timeout: int | None = None,
    quiet: bool = False,
) -> AgentResult:
    """Delegate to ``invoke_agent_agentic`` against an already-created worktree.

    All arguments are forwarded unchanged; the agent's result is returned as is.
    """
    options = {
        "worktree_path": worktree_path,
        "env": env,
        "timeout": timeout,
        "quiet": quiet,
    }
    return invoke_agent_agentic(agent_config, prompt, step_name, **options)
|
|
|
|
|
|
def _execute_step(
    step: StepConfig,
    config: PipelineConfig,
    input_contents: dict[str, str],
    feedback: str,
    iteration: int,
    max_iterations: int,
    cwd: Path,
    timeout: int | None,
    dry_run: bool,
    step_outputs: dict[str, str],
    step_results: dict[str, AgentResult],
    *,
    run_dir: Path,
    output_iter: int,
    phase_name: str | None = None,
    quiet: bool = False,
    worktree_path: Path | None = None,
    runtime_env: dict[str, str] | None = None,
    base_repo_state: str | None = None,
    base_repo_status: str | None = None,
) -> None:
    """Execute a single step, updating step_outputs and step_results in place.

    The prompt is rendered from the step's template and the current context,
    the agent is invoked (inside the worktree when one exists), and the
    output is stored under ``step.output_key`` and saved to ``run_dir``.
    In a dry run the prompt is printed and a placeholder output is stored.

    Raises:
        RuntimeError: the agent timed out (chained to the original
            ``subprocess.TimeoutExpired``), or the invocation itself raised
            ``RuntimeError``.  In both cases an error document is saved to
            ``run_dir`` before raising.
        WorktreeError: the base repository changed while a worktree was in use.
    """

    def _as_text(stream: bytes | str | None) -> str:
        # TimeoutExpired may carry bytes, str or None for captured streams.
        if isinstance(stream, bytes):
            return stream.decode("utf-8", errors="replace")
        return stream or ""

    if not quiet:
        logger.info(" [%s] agent='%s' role='%s'", step.name, step.agent, step.role)

    # 1. Resolve template
    template = resolve_template(step.prompt_template)

    # 2. Build context (include prior step results for evidence)
    context = _build_context(
        input_contents, step_outputs, feedback, iteration, max_iterations,
        step_results=step_results,
    )

    # 3. Apply context overrides
    if step.context_override:
        context = _apply_context_override(context, step.context_override)

    # 4. Render prompt
    prompt = render_template(template, context)
    prompt = _augment_prompt_with_runtime_context(prompt, context)

    # 5. Dry run: print and skip
    if dry_run:
        phase_label = f" phase={phase_name}" if phase_name else ""
        print(f"\n--- Step: {step.name} (agent={step.agent}{phase_label}) ---")
        print(prompt)
        print(f"--- end {step.name} ---\n")
        step_outputs[step.output_key] = f"(dry-run: no output for {step.output_key})"
        return

    # 6. Invoke agent
    agent_config = config.agents[step.agent]
    try:
        if agent_config.agentic and worktree_path:
            result = _invoke_agentic(
                agent_config, prompt, step.name,
                worktree_path=worktree_path,
                env=runtime_env,
                timeout=timeout, quiet=quiet,
            )
        else:
            # When worktree exists, run non-agentic agents (reviewers) in
            # the worktree too so they can inspect the modified files.
            effective_cwd = worktree_path if worktree_path else cwd
            result = invoke_agent(
                agent_config, prompt, step.name,
                cwd=effective_cwd, env=runtime_env, timeout=timeout, quiet=quiet,
            )
    except subprocess.TimeoutExpired as e:
        stdout = _as_text(e.stdout)
        stderr = _as_text(e.stderr)
        phase_info = f"- **Phase**: {phase_name}\n" if phase_name else ""
        error_msg = (
            f"# Agent Timeout\n\n"
            f"{phase_info}"
            f"- **Step**: {step.name}\n"
            f"- **Agent**: {step.agent}\n"
            f"- **Timeout**: {timeout}s\n\n"
            f"Partial stdout ({len(stdout)} chars):\n"
            f"```\n{stdout[:2000] or '(none)'}\n```\n\n"
            f"Stderr:\n```\n{stderr[:2000] or '(none)'}\n```\n"
        )
        _save_step_output(run_dir, output_iter, f"{step.name}_error", error_msg)
        logger.error(" [%s] TIMEOUT after %ss — saved to output", step.name, timeout)
        # Chain explicitly so the traceback shows the timeout as the cause.
        raise RuntimeError(
            f"Agent '{step.agent}' timed out after {timeout}s at step '{step.name}'. "
            f"Error saved to {run_dir}/v{output_iter}/{step.name}_error.md. "
            f"Try --timeout 0 (unlimited)"
        ) from e
    except RuntimeError as e:
        error_msg = _format_runtime_error_markdown(
            e,
            step_name=step.name,
            agent_name=step.agent,
            phase_name=phase_name,
        )
        _save_step_output(run_dir, output_iter, f"{step.name}_error", error_msg)
        logger.error(" [%s] FAILED — saved to output", step.name)
        raise

    # 7. Store output (after verifying the base checkout was left untouched)
    if worktree_path is not None and base_repo_state is not None:
        _assert_base_repo_isolation(
            cwd,
            base_repo_state,
            step_name=step.name,
            agent_name=step.agent,
            worktree_path=worktree_path,
            baseline_status=base_repo_status or "",
        )

    step_outputs[step.output_key] = result.output
    step_results[step.output_key] = result

    if not quiet:
        logger.info(
            " [%s] completed (%.1fs, %d chars)",
            step.name, result.duration_seconds, len(result.output),
        )

    # 8. Save to disk
    _save_step_output(run_dir, output_iter, step.name, result.output)
    _maybe_save_step_transcript(run_dir, output_iter, step.name, result)
|
|
|
|
|
|
def _execute_parallel_batch(
|
|
batch: list[StepConfig],
|
|
config: PipelineConfig,
|
|
input_contents: dict[str, str],
|
|
feedback: str,
|
|
iteration: int,
|
|
max_iterations: int,
|
|
cwd: Path,
|
|
timeout: int | None,
|
|
dry_run: bool,
|
|
step_outputs: dict[str, str],
|
|
step_results: dict[str, AgentResult],
|
|
*,
|
|
run_dir: Path,
|
|
output_iter: int,
|
|
phase_name: str | None = None,
|
|
worktree_path: Path | None = None,
|
|
runtime_env: dict[str, str] | None = None,
|
|
base_repo_state: str | None = None,
|
|
base_repo_status: str | None = None,
|
|
) -> None:
|
|
"""Execute multiple steps in parallel using threads."""
|
|
agent_names = ", ".join(s.agent for s in batch)
|
|
logger.info(" [parallel] %d agents: %s", len(batch), agent_names)
|
|
|
|
if dry_run:
|
|
for step in batch:
|
|
_execute_step(
|
|
step, config, input_contents, feedback,
|
|
iteration, max_iterations, cwd, timeout, dry_run,
|
|
step_outputs, step_results,
|
|
run_dir=run_dir, output_iter=output_iter, phase_name=phase_name,
|
|
base_repo_state=base_repo_state,
|
|
base_repo_status=base_repo_status,
|
|
)
|
|
return
|
|
|
|
# Agentic steps cannot run in parallel (they share a worktree)
|
|
agentic_in_batch = [
|
|
s for s in batch
|
|
if config.agents.get(s.agent, AgentConfig(name="", command="")).agentic
|
|
]
|
|
if len(agentic_in_batch) > 1:
|
|
logger.warning(
|
|
" [parallel] %d agentic steps cannot run concurrently — running sequentially",
|
|
len(agentic_in_batch),
|
|
)
|
|
for step in batch:
|
|
_execute_step(
|
|
step, config, input_contents, feedback,
|
|
iteration, max_iterations, cwd, timeout, dry_run,
|
|
step_outputs, step_results,
|
|
run_dir=run_dir, output_iter=output_iter,
|
|
phase_name=phase_name, worktree_path=worktree_path,
|
|
base_repo_state=base_repo_state,
|
|
base_repo_status=base_repo_status,
|
|
)
|
|
return
|
|
|
|
# Snapshot context before parallel execution (all steps see same state)
|
|
context_snapshot = dict(input_contents)
|
|
context_snapshot.update(step_outputs)
|
|
results_snapshot = dict(step_results)
|
|
|
|
# Collect results from parallel threads
|
|
local_outputs: dict[str, str] = {}
|
|
local_results: dict[str, AgentResult] = {}
|
|
errors: list[tuple[StepConfig, Exception]] = []
|
|
|
|
# Show a single spinner for the batch
|
|
from cross_eval.agent import _Spinner
|
|
spinner = _Spinner(
|
|
f"[parallel] {len(batch)} agents running ({agent_names})..."
|
|
)
|
|
spinner.start()
|
|
batch_start = time.monotonic()
|
|
|
|
def _run_one(step: StepConfig) -> tuple[str, str, AgentResult]:
|
|
"""Run one step, return (output_key, output, result)."""
|
|
template = resolve_template(step.prompt_template)
|
|
context = _build_context(
|
|
context_snapshot, {}, feedback, iteration, max_iterations,
|
|
step_results=results_snapshot,
|
|
)
|
|
if step.context_override:
|
|
context = _apply_context_override(context, step.context_override)
|
|
prompt = render_template(template, context)
|
|
prompt = _augment_prompt_with_runtime_context(prompt, context)
|
|
|
|
agent_config = config.agents[step.agent]
|
|
if agent_config.agentic and worktree_path:
|
|
result = _invoke_agentic(
|
|
agent_config, prompt, step.name,
|
|
worktree_path=worktree_path,
|
|
env=runtime_env,
|
|
timeout=timeout, quiet=True,
|
|
)
|
|
else:
|
|
effective_cwd = worktree_path if worktree_path else cwd
|
|
result = invoke_agent(
|
|
agent_config, prompt, step.name,
|
|
cwd=effective_cwd, env=runtime_env, timeout=timeout, quiet=True,
|
|
)
|
|
return step.output_key, result.output, result
|
|
|
|
with ThreadPoolExecutor(max_workers=len(batch)) as executor:
|
|
futures = {executor.submit(_run_one, step): step for step in batch}
|
|
for future in as_completed(futures):
|
|
step = futures[future]
|
|
try:
|
|
output_key, output, result = future.result()
|
|
local_results[output_key] = result
|
|
local_outputs[output_key] = output
|
|
except Exception as e:
|
|
errors.append((step, e))
|
|
|
|
batch_elapsed = round(time.monotonic() - batch_start, 1)
|
|
|
|
# Persist successful outputs even if a sibling step failed.
|
|
if worktree_path is not None and base_repo_state is not None:
|
|
_assert_base_repo_isolation(
|
|
cwd,
|
|
base_repo_state,
|
|
step_name=phase_name or "parallel-batch",
|
|
agent_name=agent_names,
|
|
worktree_path=worktree_path,
|
|
baseline_status=base_repo_status or "",
|
|
)
|
|
|
|
for step in batch:
|
|
key = step.output_key
|
|
if key not in local_outputs:
|
|
continue
|
|
step_outputs[key] = local_outputs[key]
|
|
step_results[key] = local_results[key]
|
|
r = local_results[key]
|
|
logger.info(
|
|
" [%s] completed (%.1fs, %d chars)",
|
|
step.name, r.duration_seconds, len(r.output),
|
|
)
|
|
_save_step_output(run_dir, output_iter, step.name, r.output)
|
|
_maybe_save_step_transcript(run_dir, output_iter, step.name, r)
|
|
|
|
if errors:
|
|
spinner.stop(f"[parallel] FAILED ({batch_elapsed}s)")
|
|
for failed_step, exc in errors:
|
|
if isinstance(exc, subprocess.TimeoutExpired):
|
|
stdout = (exc.stdout or b"") if isinstance(exc.stdout, bytes) else (exc.stdout or "")
|
|
stderr = (exc.stderr or b"") if isinstance(exc.stderr, bytes) else (exc.stderr or "")
|
|
if isinstance(stdout, bytes):
|
|
stdout = stdout.decode("utf-8", errors="replace")
|
|
if isinstance(stderr, bytes):
|
|
stderr = stderr.decode("utf-8", errors="replace")
|
|
phase_info = f"- **Phase**: {phase_name}\n" if phase_name else ""
|
|
error_msg = (
|
|
f"# Agent Timeout\n\n"
|
|
f"{phase_info}"
|
|
f"- **Step**: {failed_step.name}\n"
|
|
f"- **Agent**: {failed_step.agent}\n"
|
|
f"- **Timeout**: {timeout}s\n\n"
|
|
f"Partial stdout ({len(stdout)} chars):\n"
|
|
f"```\n{stdout[:2000] or '(none)'}\n```\n\n"
|
|
f"Stderr:\n```\n{stderr[:2000] or '(none)'}\n```\n"
|
|
)
|
|
else:
|
|
error_msg = _format_runtime_error_markdown(
|
|
exc,
|
|
step_name=failed_step.name,
|
|
agent_name=failed_step.agent,
|
|
phase_name=phase_name,
|
|
)
|
|
_save_step_output(run_dir, output_iter, f"{failed_step.name}_error", error_msg)
|
|
logger.error(" [%s] FAILED — saved to output", failed_step.name)
|
|
|
|
failed_steps = ", ".join(step.name for step, _ in errors)
|
|
saved_steps = ", ".join(step.name for step in batch if step.output_key in local_outputs)
|
|
first_error = errors[0][1]
|
|
saved_note = f" Successful outputs were saved for: {saved_steps}." if saved_steps else ""
|
|
raise RuntimeError(
|
|
f"Parallel batch failed: {len(errors)}/{len(batch)} steps failed ({failed_steps})."
|
|
f"{saved_note} First error:\n{first_error}"
|
|
)
|
|
|
|
spinner.stop(f"[parallel] {len(batch)} agents done ({batch_elapsed}s)")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Context and template helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _build_context(
    input_contents: dict[str, str],
    step_outputs: dict[str, str],
    feedback: str,
    iteration: int,
    max_iterations: int,
    step_results: dict[str, AgentResult] | None = None,
) -> dict[str, str]:
    """Assemble the variables a prompt template is rendered with.

    Entries of *input_contents* are laid down first and then overlaid by
    *step_outputs*, so a step output wins on a name clash. The bookkeeping
    keys ``feedback``, ``iteration`` and ``max_iterations`` are written last
    and therefore always take precedence; the two counters are stored as
    strings.

    When *step_results* is non-empty, an ``execution_evidence`` entry holding
    the formatted summary of those results is added as well.
    """
    merged: dict[str, str] = {
        **input_contents,
        **step_outputs,
        "feedback": feedback,
        "iteration": str(iteration),
        "max_iterations": str(max_iterations),
    }
    if step_results:
        # Lets reviewers check claims against what earlier steps really ran.
        merged["execution_evidence"] = _format_execution_evidence(step_results)
    return merged
|
|
|
|
|
|
def _format_execution_evidence(
    step_results: dict[str, AgentResult],
) -> str:
    """Summarise what earlier steps actually executed, as markdown.

    Each result becomes one section listing the step and agent name, the
    command preview (left out when empty), the exit code and the duration.
    If the result carries a transcript, its first 2000 characters follow in a
    collapsible ``<details>`` element, with a truncation marker appended when
    the transcript is longer than that. Sections are separated by horizontal
    rules.

    Returns a fixed placeholder sentence when *step_results* is empty.
    """
    if not step_results:
        return "(no prior execution evidence)"

    excerpt_limit = 2000
    sections: list[str] = []
    # Only the results matter here; the dict keys are not part of the output.
    for res in step_results.values():
        lines = [f"### Step: {res.step_name} ({res.agent_name})"]
        if res.command_preview:
            lines.append(f"- Command: `{res.command_preview}`")
        lines.append(f"- Exit code: {res.exit_code}")
        lines.append(f"- Duration: {res.duration_seconds}s")

        transcript = res.transcript
        if transcript:
            snippet = transcript[:excerpt_limit]
            if len(transcript) > excerpt_limit:
                snippet += "\n... (truncated)"
            lines.append(
                f"\n<details>\n<summary>Transcript excerpt</summary>\n\n{snippet}\n</details>"
            )
        sections.append("\n".join(lines))

    return "\n\n---\n\n".join(sections)
|
|
|
|
|
|
def _build_runtime_inputs(
    config: PipelineConfig,
    input_contents: dict[str, str],
    cwd: Path,
) -> dict[str, str]:
    """Prepare the agent runtime environment and publish prompt hints for it.

    Builds the environment from ``config.execution`` relative to *cwd*, runs
    repository discovery, and stores three entries in *input_contents* (which
    is modified in place): ``execution_policy``, ``environment_context`` and
    ``repo_discovery``.

    Returns the environment produced by ``build_runtime_environment``.
    """
    execution = config.execution
    runtime_env, env_files, env_values = build_runtime_environment(execution, cwd)

    # Discovery receives the union of what iterating the loaded values and the
    # environment yields (presumably variable names; confirm against
    # discover_repo).
    known_names = set(env_values) | set(runtime_env)
    repo_info = discover_repo(cwd, known_names)

    input_contents["execution_policy"] = build_execution_policy(execution)
    input_contents["environment_context"] = summarize_environment(
        execution, env_files, runtime_env, env_values,
    )
    input_contents["repo_discovery"] = format_repo_discovery(repo_info)
    return runtime_env
|
|
|
|
|
|
def _augment_prompt_with_runtime_context(
    prompt: str,
    context: dict[str, str],
) -> str:
    """Attach runtime guidance sections to the end of a rendered prompt.

    For each of ``execution_policy``, ``environment_context`` and
    ``repo_discovery`` that is present and non-empty in *context*, a headed
    section is appended (in that order), so templates do not need their own
    placeholders for them.

    When none of the keys has a value the prompt is returned exactly as given;
    otherwise its trailing whitespace is trimmed before the sections are added
    and the result ends with a single newline.
    """
    headed_keys = (
        ("## Execution Policy\n", "execution_policy"),
        ("## Environment Context\n", "environment_context"),
        ("## Repository Discovery\n", "repo_discovery"),
    )
    blocks = [
        heading + context[key]
        for heading, key in headed_keys
        if context.get(key)
    ]
    if blocks:
        return prompt.rstrip() + "\n\n" + "\n\n".join(blocks) + "\n"
    # Nothing to add: hand the prompt back untouched (no trimming).
    return prompt
|
|
|
|
|
|
def _apply_context_override(
    context: dict[str, str],
    overrides: dict[str, str],
) -> dict[str, str]:
    """Return a copy of *context* with rendered override entries laid on top.

    Every value in *overrides* is treated as a template and rendered against
    the original *context*, so one override never sees the result of another.
    The *context* dict passed in is not modified by this function.
    """
    merged = dict(context)
    merged.update(
        {
            name: render_template(template, context)
            for name, template in overrides.items()
        }
    )
    return merged
|
|
|
|
|
|
def _collect_feedback(
    steps: list[StepConfig],
    step_outputs: dict[str, str],
) -> str:
    """Gather the output of every verdict step into one feedback string.

    With exactly one verdict step its output is returned as-is (an empty
    string if it produced none). Otherwise each verdict step that has a
    non-empty output contributes a section headed by its agent and step name,
    and the sections are joined by horizontal rules; no verdict steps, or no
    outputs at all, give an empty string.
    """
    reviewers = [step for step in steps if step.verdict]

    if len(reviewers) == 1:
        (only,) = reviewers
        return step_outputs.get(only.output_key, "")

    labelled = (
        (step, step_outputs.get(step.output_key, "")) for step in reviewers
    )
    return "\n\n---\n\n".join(
        f"## Review by {step.agent} ({step.name})\n{text}"
        for step, text in labelled
        if text
    )
|
|
|
|
|
|
def _detect_repeated_aggregate(
    steps: list[StepConfig],
    step_outputs: dict[str, str],
    history: dict[str, int],
    *,
    iteration: int,
    phase_name: str | None = None,
) -> str | None:
    """Report when the aggregate review repeats an earlier iteration.

    Only the first step whose prompt template is ``default:aggregate-review``
    is considered. Its output is normalized and looked up in *history*, which
    maps normalized outputs to the iteration that produced them:

    * already present: return a message naming both iterations (prefixed
      with ``[phase_name]`` when a phase name is given); *history* is left
      unchanged;
    * not present: record it under *iteration* and return ``None``.

    ``None`` is also returned, without touching *history*, when there is no
    such step or its normalized output is empty.
    """
    aggregate = next(
        (s for s in steps if s.prompt_template == "default:aggregate-review"),
        None,
    )
    if aggregate is None:
        return None

    fingerprint = _normalize_aggregate_output(
        step_outputs.get(aggregate.output_key, "")
    )
    if not fingerprint:
        return None

    if fingerprint not in history:
        history[fingerprint] = iteration
        return None

    seen_at = history[fingerprint]
    prefix = f"[{phase_name}] " if phase_name else ""
    return (
        f"{prefix}Repeated aggregate_review detected at iteration {iteration} "
        f"(same as iteration {seen_at})."
    )
|
|
|
|
|
|
def _normalize_aggregate_output(output: str) -> str:
    """Canonicalize aggregate output so trivially different copies compare equal.

    Lower-cases the text, collapses every run of whitespace to a single space
    and drops leading and trailing whitespace.
    """
    words = output.lower().split()
    return " ".join(words)
|
|
|
|
|
|
# Matches an explicit escalation verdict, in any letter case.
_ESCALATE_PATTERN = re.compile(r"VERDICT:\s*ESCALATE", flags=re.IGNORECASE)

# Captures an "Issue Tracker" heading (two or more '#') together with the
# pipe-delimited table rows that directly follow it.
_TRACKER_TABLE_PATTERN = re.compile(
    r"(##+ Issue Tracker[^\n]*\n(?:\|[^\n]+\|\n?)+)",
    flags=re.DOTALL,
)
|
|
|
|
|
|
def _extract_verdict(output: str, pattern: str) -> str:
    """Classify review output as ``ESCALATE``, ``PASS`` or ``FAIL``.

    An escalation marker wins over everything else. Otherwise the output is a
    ``PASS`` when the regex *pattern* is found anywhere in it, and a ``FAIL``
    when it is not.
    """
    if _ESCALATE_PATTERN.search(output) is not None:
        # Escalation outranks a pass marker that may also be present.
        return "ESCALATE"
    return "PASS" if re.search(pattern, output) else "FAIL"
|
|
|
|
|
|
def _extract_senior_tracker(output: str) -> str:
    """Return the Issue Tracker heading and table found in *output*.

    Gives back the first match of the tracker-table pattern, or an empty
    string when the output contains none.
    """
    found = _TRACKER_TABLE_PATTERN.search(output)
    if found is None:
        return ""
    return found.group(0)
|
|
|
|
|
|
def _extract_escalated_issues(output: str) -> str:
    """Extract escalation details from senior review output.

    Looks for a level-2 or level-3 markdown section whose heading contains
    "Escalated" or "Escalation" (any letter case) and returns its body. If no
    such section exists, the body of an "Action Items" section is used
    instead.

    A section body runs from the line after its heading up to the next line
    that starts with two or more ``#`` characters, or to the end of the text,
    and is returned with surrounding whitespace stripped.

    Returns an empty string when neither section is present.
    """
    # The terminator must accept "##" as well as "###": headings of either
    # level are recognised as the section start, so a following level-2
    # heading has to end the section too instead of being swallowed into it.
    for heading in (r"Escalat(?:ed|ion)", r"Action Items"):
        pattern = rf"(?:###?\s*{heading}.*?\n)(.*?)(?=\n##|\Z)"
        match = re.search(pattern, output, re.DOTALL | re.IGNORECASE)
        if match:
            return match.group(1).strip()
    return ""
|
|
|
|
|
|
# Path-like token: word characters and slashes/backslashes, a dot, then a
# 1-5 character extension.
_FP_PATTERN = re.compile(r"[\w/\\]+\.\w{1,5}")

# Stems of words that name a kind of issue; trailing word characters are
# matched too, so inflected forms ("truncated", "duplicates") are caught.
_ISSUE_KEYWORDS = re.compile(
    r"\b(missing|validation|error[\s_-]?handling|unused|import|"
    r"injection|auth(?:entication|orization)?|deprecated|"
    r"leak|overflow|null|undefined|timeout|deadlock|race[\s_-]?condition|"
    r"security|permission|encoding|format|parsing|connection|"
    r"boundary|initialization|cleanup|resource|concurrency|"
    r"exception|crash|hang|corrupt|truncat|duplicat|inconsisten|"
    r"omission|over[\s_-]?engineer|refactor|naming|docstring|"
    r"type[\s_-]?hint|test|coverage|logging|config|performance)\w*",
    re.IGNORECASE,
)


def _issue_fingerprints(text: str) -> set[tuple[str, str]]:
    """Extract (file_path, issue_keyword) pairs from feedback text.

    The text is lower-cased first. For every path-like token, issue keywords
    are searched in a window reaching 60 characters before the token's start
    and 60 characters past its end (the token itself included), and each
    keyword found is paired with that path.

    Returns an empty set when the text contains no path-like token.
    """
    lower = text.lower()
    pairs: set[tuple[str, str]] = set()
    for path_match in _FP_PATTERN.finditer(lower):
        # Slicing clamps at the end of the string, so only the start needs
        # an explicit lower bound.
        window = lower[max(0, path_match.start() - 60):path_match.end() + 60]
        pairs.update(
            (path_match.group(), kw.group())
            for kw in _ISSUE_KEYWORDS.finditer(window)
        )
    return pairs


def _detect_auto_escalate(
    feedbacks: list[str],
    current_feedback: str,
    threshold: int = 2,
) -> bool:
    """Detect an issue that keeps recurring across iterations (for auto-escalation).

    Extracts (file_path, issue_keyword) fingerprints from the current feedback
    and from every entry of *feedbacks*, and reports whether at least one
    fingerprint of the current feedback also occurs in >= *threshold* of the
    previous feedbacks. Matching on the whole pair avoids false positives when
    the same file is mentioned for completely different issues, and counting
    per fingerprint avoids escalating when each previous iteration merely
    shares a *different* issue with the current one.

    Returns False when the current feedback yields no fingerprints.
    """
    current_fps = _issue_fingerprints(current_feedback)
    if not current_fps:
        return False

    previous_fps = [_issue_fingerprints(prev) for prev in feedbacks]
    # Count recurrences per fingerprint: the same issue has to come back,
    # not just "some overlap" with each earlier iteration.
    return any(
        sum(fp in prev for prev in previous_fps) >= threshold
        for fp in current_fps
    )
|
|
|
|
|
|
def _save_step_output(
    run_dir: Path,
    iteration: int,
    step_name: str,
    content: str,
) -> Path:
    """Write *content* to ``run_dir/v{iteration}/{step_name}.md`` as UTF-8.

    The file's parent directory is created if needed (including missing
    ancestors), and an existing file of the same name is overwritten.

    Returns the path of the written file.
    """
    target = run_dir.joinpath(f"v{iteration}", f"{step_name}.md")
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(content, encoding="utf-8")
    return target
|
|
|
|
|
|
def _maybe_save_step_transcript(
    run_dir: Path,
    iteration: int,
    step_name: str,
    result: AgentResult,
) -> Path | None:
    """Store the result's transcript next to the step output, if it has one.

    A non-empty ``result.transcript`` is saved through ``_save_step_output``
    under the name ``{step_name}_transcript`` and the written path is
    returned. Without a transcript nothing is written and ``None`` is
    returned.
    """
    transcript = result.transcript
    if transcript:
        return _save_step_output(
            run_dir, iteration, f"{step_name}_transcript", transcript,
        )
    return None
|
|
|
|
|
|
def _format_runtime_error_markdown(
    exc: Exception,
    *,
    step_name: str,
    agent_name: str,
    phase_name: str | None = None,
) -> str:
    """Render a structured markdown error report for a failed step.

    The report starts with an ``# Agent Error`` heading, a blank line, and a
    bullet list naming the phase (only when *phase_name* is given), the step
    and the agent. For an ``AgentInvocationError`` the failure type, suggested
    action, command preview and raw error follow; for any other exception its
    ``str()`` is shown in a fenced block.

    Returns the report text, terminated by a newline.
    """
    # Build the header explicitly instead of filtering out empty strings: a
    # blanket falsy filter would also drop the blank line after the heading.
    lines = ["# Agent Error", ""]
    if phase_name:
        lines.append(f"- **Phase**: {phase_name}".rstrip())
    lines.append(f"- **Step**: {step_name}")
    lines.append(f"- **Agent**: {agent_name}")

    if isinstance(exc, AgentInvocationError):
        lines.extend(
            [
                f"- **Failure Type**: {exc.failure_type}",
                f"- **Suggested Action**: {exc.suggested_action}",
                "",
                "## Command",
                "```",
                exc.cmd_preview,
                "```",
                "",
                "## Raw Error",
                "```",
                exc.raw_error,
                "```",
            ],
        )
    else:
        lines.extend(["", "```", str(exc), "```"])

    return "\n".join(lines) + "\n"
|
|
|
|
|
|
def _save_report(run_dir: Path, config: PipelineConfig, result: PipelineResult) -> None:
    """Render the final markdown report and store it as ``final-report.md``.

    The report is built from *config* and *result*, written as UTF-8 into
    *run_dir* (whose directory is created first if missing), and the file's
    location is logged at INFO level.
    """
    markdown = build_report(config, result)
    destination = run_dir / "final-report.md"
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(markdown, encoding="utf-8")
    logger.info("Report saved: %s", destination)
|