Files
cross-eval/cross_eval/pipeline.py
2026-03-13 21:52:13 +09:00

1277 lines
45 KiB
Python

"""Main pipeline execution engine."""
from __future__ import annotations
import logging
import os
import re
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from cross_eval.agent import AgentInvocationError, invoke_agent, invoke_agent_agentic
from cross_eval.worktree import WorktreeError
from cross_eval.config import try_reload_config
from cross_eval.discovery import discover_repo, format_repo_discovery
from cross_eval.models import (
AgentConfig,
AgentResult,
IterationResult,
PipelineConfig,
PipelineResult,
StepConfig,
)
from cross_eval.prompts import render_template, resolve_template, set_language
from cross_eval.report import build_report
from cross_eval.runtime_env import (
build_execution_policy,
build_runtime_environment,
summarize_environment,
)
logger = logging.getLogger(__name__)
def run_pipeline(
config: PipelineConfig,
cwd: Path | None = None,
dry_run: bool = False,
timeout: int | None = None,
) -> PipelineResult:
"""Execute the full cross-eval pipeline."""
# Create run directory: output/{preset}_{datetime}/
run_dir = _make_run_dir(config)
if config.phases:
return _run_phased_pipeline(config, run_dir, cwd, dry_run, timeout)
return _run_simple_pipeline(config, run_dir, cwd, dry_run, timeout)
def _make_run_dir(config: PipelineConfig) -> Path:
"""Create timestamped run directory under output_dir."""
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
run_dir = config.output_dir / f"{config.preset_name}_{ts}"
run_dir.mkdir(parents=True, exist_ok=True)
return run_dir
def _commit_iteration(
worktree_path: Path,
label: str,
iteration: int,
verdict: str | None,
) -> None:
"""Intermediate commit after each agentic iteration.
This resets the diff baseline so the next iteration only captures new changes.
"""
from cross_eval.worktree import commit_worktree
committed = commit_worktree(
worktree_path,
f"cross-eval: {label} v{iteration} ({verdict or 'no-verdict'})",
)
if committed:
logger.debug(" Intermediate commit: v%d (%s)", iteration, verdict)
def _has_agentic_steps(config: PipelineConfig, steps: list[StepConfig]) -> bool:
"""Check if any step uses an agentic agent."""
return any(
config.agents.get(s.agent, AgentConfig(name="", command="")).agentic
for s in steps
)
def _setup_worktree(cwd: Path, run_dir: Path, preset_name: str) -> tuple[Path, str]:
"""Create a shared worktree for the entire pipeline run.
1. Generate branch name (cross-eval/<preset>_<timestamp>)
2. Create branch from HEAD
3. Create worktree on that branch
Returns (worktree_path, branch_name).
"""
from cross_eval.worktree import create_worktree, make_branch_name
branch_name = make_branch_name(preset_name)
worktree_dir = run_dir / "work"
worktree_path = create_worktree(
base_cwd=cwd, work_dir=worktree_dir, branch_name=branch_name,
)
return worktree_path, branch_name
def _finalize_worktree(
cwd: Path,
worktree_path: Path,
branch_name: str,
preset_name: str,
final_verdict: str,
) -> str | None:
"""Commit changes on the branch, then remove the worktree.
The branch survives worktree removal and stays in the original repo.
Returns the branch name if changes were committed, None otherwise.
"""
from cross_eval.worktree import commit_worktree, remove_worktree
committed = False
try:
committed = commit_worktree(
worktree_path,
f"cross-eval: {preset_name} ({final_verdict})",
)
if committed:
logger.info(" Agentic changes committed on branch: %s", branch_name)
else:
logger.warning(" No agentic changes to commit (empty diff)")
except Exception:
logger.warning(" Failed to commit agentic changes", exc_info=True)
try:
remove_worktree(base_cwd=cwd, work_dir=worktree_path)
except Exception:
logger.warning("Failed to clean up worktree: %s", worktree_path)
# Check if branch has any commits beyond the base — if not, delete it
if not committed:
try:
# Check if branch has diverged from its base
result = subprocess.run(
["git", "log", "--oneline", f"HEAD..{branch_name}"],
cwd=cwd, capture_output=True, text=True,
)
if not result.stdout.strip():
# No commits on branch beyond base — clean up
subprocess.run(
["git", "branch", "-D", branch_name],
cwd=cwd, capture_output=True,
)
logger.info(" Deleted empty branch: %s", branch_name)
except Exception:
pass # best-effort cleanup
return branch_name if committed else None
def _run_simple_pipeline(
config: PipelineConfig,
run_dir: Path,
cwd: Path | None = None,
dry_run: bool = False,
timeout: int | None = None,
) -> PipelineResult:
"""Execute a simple (non-phased) pipeline."""
if cwd is None:
cwd = Path(os.getcwd())
set_language(config.language)
input_contents = _load_inputs(config)
runtime_env = _build_runtime_inputs(config, input_contents, cwd or Path(os.getcwd()))
# Setup shared worktree for agentic mode
worktree_path: Path | None = None
agentic_branch_name: str | None = None
if not dry_run and _has_agentic_steps(config, config.pipeline):
worktree_path, agentic_branch_name = _setup_worktree(
cwd, run_dir, config.preset_name,
)
feedback = "(no feedback — first iteration)"
iterations: list[IterationResult] = []
start_time = time.monotonic()
final_verdict = "MAX_ITERATIONS_REACHED"
aggregate_history: dict[str, int] = {}
aggregate_warnings: list[str] = []
escalated_issues: list[str] = []
all_feedbacks: list[str] = []
try:
for i in range(1, config.max_iterations + 1):
config = try_reload_config(config)
set_language(config.language)
_refresh_inputs(config, input_contents)
runtime_env = _build_runtime_inputs(config, input_contents, cwd)
logger.info("=" * 50)
logger.info(" Iteration %d/%d", i, config.max_iterations)
logger.info("=" * 50)
step_outputs, step_results, verdict = _run_steps(
config.pipeline, config, input_contents, feedback,
i, config.max_iterations, cwd, timeout, dry_run,
run_dir=run_dir, output_iter=i,
worktree_path=worktree_path,
runtime_env=runtime_env,
)
# Intermediate commit so next iteration's diff only shows new changes
if worktree_path is not None:
_commit_iteration(worktree_path, config.preset_name, i, verdict)
iter_result = IterationResult(
iteration=i,
step_results=step_results,
step_outputs=step_outputs,
verdict=verdict,
)
warning = _detect_repeated_aggregate(
config.pipeline, step_outputs, aggregate_history, iteration=i,
)
if warning:
iter_result.repeated_aggregate_warning = warning
aggregate_warnings.append(warning)
logger.warning(" %s", warning)
iter_result.feedback = _collect_feedback(config.pipeline, step_outputs)
feedback = iter_result.feedback or feedback
all_feedbacks.append(feedback)
# Extract tracker from verdict/review steps for next iteration
for step in config.pipeline:
if step.verdict or step.role == "review":
tracker = _extract_senior_tracker(
step_outputs.get(step.output_key, ""),
)
if tracker:
input_contents["previous_senior_tracker"] = tracker
iterations.append(iter_result)
# ESCALATE check (highest priority)
if verdict == "ESCALATE":
final_verdict = "ESCALATE"
for step in config.pipeline:
if step.verdict:
esc = _extract_escalated_issues(
step_outputs.get(step.output_key, ""),
)
if esc:
escalated_issues.append(esc)
iter_result.escalated_issues = esc
logger.info(" ESCALATE at iteration %d — stopping loop.", i)
break
if verdict == "PASS":
final_verdict = "PASS"
if i >= config.min_iterations:
logger.info(" PASS at iteration %d (min=%d reached)!", i, config.min_iterations)
break
else:
logger.info(
" PASS at iteration %d, but min_iterations=%d — continuing",
i, config.min_iterations,
)
# Auto-escalate: no senior/aggregator + repeated FAIL
has_aggregator = config.seniors or any(
s.prompt_template == "default:aggregate-review" for s in config.pipeline
)
if (
verdict == "FAIL"
and not has_aggregator
and i >= 2
and _detect_auto_escalate(all_feedbacks[:-1], feedback)
):
final_verdict = "ESCALATE"
auto_msg = (
f"Auto-escalated: same issues detected across {i} iterations "
f"without resolution (no senior reviewer configured)."
)
escalated_issues.append(auto_msg)
iter_result.escalated_issues = auto_msg
logger.info(" AUTO-ESCALATE at iteration %d", i)
break
if dry_run:
logger.info(" (dry-run: stopping after iteration 1)")
break
finally:
agentic_branch: str | None = None
if worktree_path is not None and agentic_branch_name is not None:
agentic_branch = _finalize_worktree(
cwd, worktree_path, agentic_branch_name,
config.preset_name, final_verdict,
)
total_duration = time.monotonic() - start_time
pipeline_result = PipelineResult(
iterations=iterations,
final_verdict=final_verdict,
total_duration=round(total_duration, 1),
run_dir=run_dir,
repeated_aggregate_warnings=aggregate_warnings,
escalated_issues=escalated_issues,
agentic_branch=agentic_branch,
)
if not dry_run:
_save_report(run_dir, config, pipeline_result)
return pipeline_result
def _run_phased_pipeline(
config: PipelineConfig,
run_dir: Path,
cwd: Path | None = None,
dry_run: bool = False,
timeout: int | None = None,
) -> PipelineResult:
"""Execute a multi-phase pipeline (e.g. review-fix)."""
if cwd is None:
cwd = Path(os.getcwd())
set_language(config.language)
input_contents = _load_inputs(config)
runtime_env = _build_runtime_inputs(config, input_contents, cwd)
# Setup shared worktree for agentic mode
all_phase_steps = [s for p in config.phases for s in p.steps]
worktree_path: Path | None = None
agentic_branch_name: str | None = None
if not dry_run and _has_agentic_steps(config, all_phase_steps):
worktree_path, agentic_branch_name = _setup_worktree(
cwd, run_dir, config.preset_name,
)
iterations: list[IterationResult] = []
feedback = "(no feedback — first iteration)"
start_time = time.monotonic()
final_verdict = "MAX_ITERATIONS_REACHED"
global_iter = 0
aggregate_history_by_phase: dict[str, dict[str, int]] = {}
aggregate_warnings: list[str] = []
escalated_issues: list[str] = []
all_feedbacks: list[str] = []
escalated = False
try:
for phase_idx, phase in enumerate(config.phases):
if escalated:
break
logger.info("=" * 60)
logger.info(
" Phase: %s (max_iter=%d, consecutive_pass=%d)",
phase.name, phase.max_iterations, phase.consecutive_pass,
)
logger.info("=" * 60)
consecutive_passes = 0
phase_converged = False
for pi in range(1, phase.max_iterations + 1):
global_iter += 1
config = try_reload_config(config)
set_language(config.language)
_refresh_inputs(config, input_contents)
runtime_env = _build_runtime_inputs(config, input_contents, cwd)
logger.info("-" * 50)
logger.info(
" [%s] Iteration %d/%d (global: v%d)",
phase.name, pi, phase.max_iterations, global_iter,
)
logger.info("-" * 50)
step_outputs, step_results, verdict = _run_steps(
phase.steps, config, input_contents, feedback,
pi, phase.max_iterations, cwd, timeout, dry_run,
run_dir=run_dir, output_iter=global_iter, phase_name=phase.name,
worktree_path=worktree_path,
runtime_env=runtime_env,
)
# Intermediate commit so next iteration's diff only shows new changes
if worktree_path is not None:
_commit_iteration(
worktree_path, f"{config.preset_name}/{phase.name}",
global_iter, verdict,
)
iter_result = IterationResult(
iteration=global_iter,
step_results=step_results,
step_outputs=step_outputs,
verdict=verdict,
phase_name=phase.name,
)
phase_history = aggregate_history_by_phase.setdefault(phase.name, {})
warning = _detect_repeated_aggregate(
phase.steps, step_outputs, phase_history, iteration=global_iter,
phase_name=phase.name,
)
if warning:
iter_result.repeated_aggregate_warning = warning
aggregate_warnings.append(warning)
logger.warning(" %s", warning)
iter_result.feedback = _collect_feedback(phase.steps, step_outputs)
feedback = iter_result.feedback or feedback
all_feedbacks.append(feedback)
# Extract tracker from verdict/review steps
for step in phase.steps:
if step.verdict or step.role == "review":
tracker = _extract_senior_tracker(
step_outputs.get(step.output_key, ""),
)
if tracker:
input_contents["previous_senior_tracker"] = tracker
iterations.append(iter_result)
# ESCALATE check
if verdict == "ESCALATE":
final_verdict = "ESCALATE"
for step in phase.steps:
if step.verdict:
esc = _extract_escalated_issues(
step_outputs.get(step.output_key, ""),
)
if esc:
escalated_issues.append(esc)
iter_result.escalated_issues = esc
logger.info(
" [%s] ESCALATE at iteration %d — stopping.",
phase.name, pi,
)
escalated = True
break
if verdict is None:
logger.info(
" [%s] completed (no verdict step; single-pass phase)",
phase.name,
)
phase_converged = True
break
if verdict == "PASS":
consecutive_passes += 1
logger.info(
" [%s] PASS (%d/%d consecutive)",
phase.name, consecutive_passes, phase.consecutive_pass,
)
if consecutive_passes >= phase.consecutive_pass:
logger.info(
" [%s] Converged! %d consecutive PASSes.",
phase.name, phase.consecutive_pass,
)
phase_converged = True
break
else:
consecutive_passes = 0
# Auto-escalate in phased pipeline
has_aggregator = config.seniors or any(
s.prompt_template == "default:aggregate-review" for s in phase.steps
)
if (
verdict == "FAIL"
and not has_aggregator
and pi >= 2
and _detect_auto_escalate(all_feedbacks[:-1], feedback)
):
final_verdict = "ESCALATE"
auto_msg = (
f"Auto-escalated: same issues detected across {pi} iterations "
f"in phase '{phase.name}' without resolution."
)
escalated_issues.append(auto_msg)
iter_result.escalated_issues = auto_msg
logger.info(" [%s] AUTO-ESCALATE at iteration %d", phase.name, pi)
escalated = True
break
if dry_run:
break
if escalated:
break
if phase_converged:
logger.info(" Phase '%s' completed: CONVERGED", phase.name)
else:
logger.info(
" Phase '%s' completed: max iterations (%d) reached",
phase.name, phase.max_iterations,
)
if phase_idx == len(config.phases) - 1:
final_verdict = "PASS" if phase_converged else "MAX_ITERATIONS_REACHED"
finally:
agentic_branch: str | None = None
if worktree_path is not None and agentic_branch_name is not None:
agentic_branch = _finalize_worktree(
cwd, worktree_path, agentic_branch_name,
config.preset_name, final_verdict,
)
total_duration = time.monotonic() - start_time
pipeline_result = PipelineResult(
iterations=iterations,
final_verdict=final_verdict,
total_duration=round(total_duration, 1),
run_dir=run_dir,
repeated_aggregate_warnings=aggregate_warnings,
escalated_issues=escalated_issues,
agentic_branch=agentic_branch,
)
if not dry_run:
_save_report(run_dir, config, pipeline_result)
return pipeline_result
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _load_inputs(config: PipelineConfig) -> dict[str, str]:
"""Load input file contents from config."""
input_contents: dict[str, str] = {}
for key, val in config.inputs.items():
if isinstance(val, str):
input_contents[key] = val
else:
input_contents[key] = val.read_text(encoding="utf-8")
return input_contents
def _refresh_inputs(
config: PipelineConfig, input_contents: dict[str, str],
) -> None:
"""Re-read input files (they may have changed on disk)."""
for key, val in config.inputs.items():
if isinstance(val, str):
input_contents[key] = val
elif isinstance(val, Path) and val.exists():
input_contents[key] = val.read_text(encoding="utf-8")
# ---------------------------------------------------------------------------
# Parallel step grouping
# ---------------------------------------------------------------------------
def _get_step_dependencies(step: StepConfig) -> set[str]:
"""Extract output_key references from context_override values."""
deps: set[str] = set()
for val in step.context_override.values():
for match in re.finditer(r"\{(\w+)\}", val):
deps.add(match.group(1))
return deps
def _group_parallel_steps(steps: list[StepConfig]) -> list[list[StepConfig]]:
"""Group consecutive parallel steps into batches.
Consecutive steps with parallel=True are grouped together,
but a new batch starts when a step depends on an output_key
from a step in the current batch (dependency breaking).
"""
batches: list[list[StepConfig]] = []
current: list[StepConfig] = []
current_output_keys: set[str] = set()
for step in steps:
if not step.parallel:
if current:
batches.append(current)
current = []
current_output_keys = set()
batches.append([step])
continue
# Check if this step depends on any output from the current batch
deps = _get_step_dependencies(step)
if deps & current_output_keys:
batches.append(current)
current = []
current_output_keys = set()
current.append(step)
current_output_keys.add(step.output_key)
if current:
batches.append(current)
return batches
# ---------------------------------------------------------------------------
# Step execution
# ---------------------------------------------------------------------------
def _run_steps(
steps: list[StepConfig],
config: PipelineConfig,
input_contents: dict[str, str],
feedback: str,
iteration: int,
max_iterations: int,
cwd: Path,
timeout: int | None,
dry_run: bool,
*,
run_dir: Path,
output_iter: int,
phase_name: str | None = None,
worktree_path: Path | None = None,
runtime_env: dict[str, str] | None = None,
) -> tuple[dict[str, str], dict[str, AgentResult], str | None]:
"""Execute all steps in one iteration, parallelizing where possible."""
step_outputs: dict[str, str] = {}
step_results: dict[str, AgentResult] = {}
verdict: str | None = None
batches = _group_parallel_steps(steps)
for batch in batches:
if len(batch) == 1:
step = batch[0]
_execute_step(
step, config, input_contents, feedback,
iteration, max_iterations, cwd, timeout, dry_run,
step_outputs, step_results,
run_dir=run_dir, output_iter=output_iter,
phase_name=phase_name, worktree_path=worktree_path,
runtime_env=runtime_env,
)
else:
_execute_parallel_batch(
batch, config, input_contents, feedback,
iteration, max_iterations, cwd, timeout, dry_run,
step_outputs, step_results,
run_dir=run_dir, output_iter=output_iter,
phase_name=phase_name, worktree_path=worktree_path,
runtime_env=runtime_env,
)
# Extract verdict from all verdict steps (ALL must PASS; ESCALATE wins over all)
for step in steps:
if step.verdict:
output = step_outputs.get(step.output_key, "")
step_verdict = _extract_verdict(output, step.verdict_pattern)
logger.info(" [%s] verdict: %s", step.name, step_verdict)
if step_verdict == "ESCALATE":
verdict = "ESCALATE"
elif verdict is None:
verdict = step_verdict
elif verdict != "ESCALATE" and step_verdict == "FAIL":
verdict = "FAIL"
return step_outputs, step_results, verdict
def _invoke_agentic(
agent_config: AgentConfig,
prompt: str,
step_name: str,
*,
worktree_path: Path,
env: dict[str, str] | None = None,
timeout: int | None = None,
quiet: bool = False,
) -> AgentResult:
"""Run an agent in agentic mode using an existing worktree."""
return invoke_agent_agentic(
agent_config, prompt, step_name,
worktree_path=worktree_path,
env=env,
timeout=timeout, quiet=quiet,
)
def _execute_step(
step: StepConfig,
config: PipelineConfig,
input_contents: dict[str, str],
feedback: str,
iteration: int,
max_iterations: int,
cwd: Path,
timeout: int | None,
dry_run: bool,
step_outputs: dict[str, str],
step_results: dict[str, AgentResult],
*,
run_dir: Path,
output_iter: int,
phase_name: str | None = None,
quiet: bool = False,
worktree_path: Path | None = None,
runtime_env: dict[str, str] | None = None,
) -> None:
"""Execute a single step, updating step_outputs and step_results in place."""
if not quiet:
logger.info(" [%s] agent='%s' role='%s'", step.name, step.agent, step.role)
# 1. Resolve template
template = resolve_template(step.prompt_template)
# 2. Build context
context = _build_context(
input_contents, step_outputs, feedback, iteration, max_iterations,
)
# 3. Apply context overrides
if step.context_override:
context = _apply_context_override(context, step.context_override)
# 4. Render prompt
prompt = render_template(template, context)
prompt = _augment_prompt_with_runtime_context(prompt, context)
# 5. Dry run: print and skip
if dry_run:
phase_label = f" phase={phase_name}" if phase_name else ""
print(f"\n--- Step: {step.name} (agent={step.agent}{phase_label}) ---")
print(prompt)
print(f"--- end {step.name} ---\n")
step_outputs[step.output_key] = f"(dry-run: no output for {step.output_key})"
return
# 6. Invoke agent
agent_config = config.agents[step.agent]
try:
if agent_config.agentic and worktree_path:
result = _invoke_agentic(
agent_config, prompt, step.name,
worktree_path=worktree_path,
env=runtime_env,
timeout=timeout, quiet=quiet,
)
else:
# When worktree exists, run non-agentic agents (reviewers) in
# the worktree too so they can inspect the modified files.
effective_cwd = worktree_path if worktree_path else cwd
result = invoke_agent(
agent_config, prompt, step.name,
cwd=effective_cwd, env=runtime_env, timeout=timeout, quiet=quiet,
)
except subprocess.TimeoutExpired as e:
stdout = (e.stdout or b"") if isinstance(e.stdout, bytes) else (e.stdout or "")
stderr = (e.stderr or b"") if isinstance(e.stderr, bytes) else (e.stderr or "")
if isinstance(stdout, bytes):
stdout = stdout.decode("utf-8", errors="replace")
if isinstance(stderr, bytes):
stderr = stderr.decode("utf-8", errors="replace")
phase_info = f"- **Phase**: {phase_name}\n" if phase_name else ""
error_msg = (
f"# Agent Timeout\n\n"
f"{phase_info}"
f"- **Step**: {step.name}\n"
f"- **Agent**: {step.agent}\n"
f"- **Timeout**: {timeout}s\n\n"
f"Partial stdout ({len(stdout)} chars):\n"
f"```\n{stdout[:2000] or '(none)'}\n```\n\n"
f"Stderr:\n```\n{stderr[:2000] or '(none)'}\n```\n"
)
_save_step_output(run_dir, output_iter, f"{step.name}_error", error_msg)
logger.error(" [%s] TIMEOUT after %ss — saved to output", step.name, timeout)
raise RuntimeError(
f"Agent '{step.agent}' timed out after {timeout}s at step '{step.name}'. "
f"Error saved to {run_dir}/v{output_iter}/{step.name}_error.md. "
f"Try --timeout 0 (unlimited)"
)
except RuntimeError as e:
error_msg = _format_runtime_error_markdown(
e,
step_name=step.name,
agent_name=step.agent,
phase_name=phase_name,
)
_save_step_output(run_dir, output_iter, f"{step.name}_error", error_msg)
logger.error(" [%s] FAILED — saved to output", step.name)
raise
# 7. Store output
step_outputs[step.output_key] = result.output
step_results[step.output_key] = result
if not quiet:
logger.info(
" [%s] completed (%.1fs, %d chars)",
step.name, result.duration_seconds, len(result.output),
)
# 8. Save to disk
_save_step_output(run_dir, output_iter, step.name, result.output)
_maybe_save_step_transcript(run_dir, output_iter, step.name, result)
def _execute_parallel_batch(
batch: list[StepConfig],
config: PipelineConfig,
input_contents: dict[str, str],
feedback: str,
iteration: int,
max_iterations: int,
cwd: Path,
timeout: int | None,
dry_run: bool,
step_outputs: dict[str, str],
step_results: dict[str, AgentResult],
*,
run_dir: Path,
output_iter: int,
phase_name: str | None = None,
worktree_path: Path | None = None,
runtime_env: dict[str, str] | None = None,
) -> None:
"""Execute multiple steps in parallel using threads."""
agent_names = ", ".join(s.agent for s in batch)
logger.info(" [parallel] %d agents: %s", len(batch), agent_names)
if dry_run:
for step in batch:
_execute_step(
step, config, input_contents, feedback,
iteration, max_iterations, cwd, timeout, dry_run,
step_outputs, step_results,
run_dir=run_dir, output_iter=output_iter, phase_name=phase_name,
)
return
# Agentic steps cannot run in parallel (they share a worktree)
agentic_in_batch = [
s for s in batch
if config.agents.get(s.agent, AgentConfig(name="", command="")).agentic
]
if len(agentic_in_batch) > 1:
logger.warning(
" [parallel] %d agentic steps cannot run concurrently — running sequentially",
len(agentic_in_batch),
)
for step in batch:
_execute_step(
step, config, input_contents, feedback,
iteration, max_iterations, cwd, timeout, dry_run,
step_outputs, step_results,
run_dir=run_dir, output_iter=output_iter,
phase_name=phase_name, worktree_path=worktree_path,
)
return
# Snapshot context before parallel execution (all steps see same state)
context_snapshot = dict(input_contents)
context_snapshot.update(step_outputs)
# Collect results from parallel threads
local_outputs: dict[str, str] = {}
local_results: dict[str, AgentResult] = {}
errors: list[tuple[StepConfig, Exception]] = []
# Show a single spinner for the batch
from cross_eval.agent import _Spinner
spinner = _Spinner(
f"[parallel] {len(batch)} agents running ({agent_names})..."
)
spinner.start()
batch_start = time.monotonic()
def _run_one(step: StepConfig) -> tuple[str, str, AgentResult]:
"""Run one step, return (output_key, output, result)."""
template = resolve_template(step.prompt_template)
context = _build_context(
context_snapshot, {}, feedback, iteration, max_iterations,
)
if step.context_override:
context = _apply_context_override(context, step.context_override)
prompt = render_template(template, context)
prompt = _augment_prompt_with_runtime_context(prompt, context)
agent_config = config.agents[step.agent]
if agent_config.agentic and worktree_path:
result = _invoke_agentic(
agent_config, prompt, step.name,
worktree_path=worktree_path,
env=runtime_env,
timeout=timeout, quiet=True,
)
else:
effective_cwd = worktree_path if worktree_path else cwd
result = invoke_agent(
agent_config, prompt, step.name,
cwd=effective_cwd, env=runtime_env, timeout=timeout, quiet=True,
)
return step.output_key, result.output, result
with ThreadPoolExecutor(max_workers=len(batch)) as executor:
futures = {executor.submit(_run_one, step): step for step in batch}
for future in as_completed(futures):
step = futures[future]
try:
output_key, output, result = future.result()
local_results[output_key] = result
local_outputs[output_key] = output
except Exception as e:
errors.append((step, e))
batch_elapsed = round(time.monotonic() - batch_start, 1)
# Persist successful outputs even if a sibling step failed.
for step in batch:
key = step.output_key
if key not in local_outputs:
continue
step_outputs[key] = local_outputs[key]
step_results[key] = local_results[key]
r = local_results[key]
logger.info(
" [%s] completed (%.1fs, %d chars)",
step.name, r.duration_seconds, len(r.output),
)
_save_step_output(run_dir, output_iter, step.name, r.output)
_maybe_save_step_transcript(run_dir, output_iter, step.name, r)
if errors:
spinner.stop(f"[parallel] FAILED ({batch_elapsed}s)")
for failed_step, exc in errors:
if isinstance(exc, subprocess.TimeoutExpired):
stdout = (exc.stdout or b"") if isinstance(exc.stdout, bytes) else (exc.stdout or "")
stderr = (exc.stderr or b"") if isinstance(exc.stderr, bytes) else (exc.stderr or "")
if isinstance(stdout, bytes):
stdout = stdout.decode("utf-8", errors="replace")
if isinstance(stderr, bytes):
stderr = stderr.decode("utf-8", errors="replace")
phase_info = f"- **Phase**: {phase_name}\n" if phase_name else ""
error_msg = (
f"# Agent Timeout\n\n"
f"{phase_info}"
f"- **Step**: {failed_step.name}\n"
f"- **Agent**: {failed_step.agent}\n"
f"- **Timeout**: {timeout}s\n\n"
f"Partial stdout ({len(stdout)} chars):\n"
f"```\n{stdout[:2000] or '(none)'}\n```\n\n"
f"Stderr:\n```\n{stderr[:2000] or '(none)'}\n```\n"
)
else:
error_msg = _format_runtime_error_markdown(
exc,
step_name=failed_step.name,
agent_name=failed_step.agent,
phase_name=phase_name,
)
_save_step_output(run_dir, output_iter, f"{failed_step.name}_error", error_msg)
logger.error(" [%s] FAILED — saved to output", failed_step.name)
failed_steps = ", ".join(step.name for step, _ in errors)
saved_steps = ", ".join(step.name for step in batch if step.output_key in local_outputs)
first_error = errors[0][1]
saved_note = f" Successful outputs were saved for: {saved_steps}." if saved_steps else ""
raise RuntimeError(
f"Parallel batch failed: {len(errors)}/{len(batch)} steps failed ({failed_steps})."
f"{saved_note} First error:\n{first_error}"
)
spinner.stop(f"[parallel] {len(batch)} agents done ({batch_elapsed}s)")
# ---------------------------------------------------------------------------
# Context and template helpers
# ---------------------------------------------------------------------------
def _build_context(
input_contents: dict[str, str],
step_outputs: dict[str, str],
feedback: str,
iteration: int,
max_iterations: int,
) -> dict[str, str]:
"""Build the template context dict."""
context: dict[str, str] = {}
context.update(input_contents)
context.update(step_outputs)
context["feedback"] = feedback
context["iteration"] = str(iteration)
context["max_iterations"] = str(max_iterations)
return context
def _build_runtime_inputs(
config: PipelineConfig,
input_contents: dict[str, str],
cwd: Path,
) -> dict[str, str]:
"""Load runtime env and expose safe execution hints to prompts."""
env, loaded_files, loaded_values = build_runtime_environment(config.execution, cwd)
discovery = discover_repo(cwd, set(loaded_values) | set(env))
input_contents["execution_policy"] = build_execution_policy(config.execution)
input_contents["environment_context"] = summarize_environment(
config.execution, loaded_files, env, loaded_values,
)
input_contents["repo_discovery"] = format_repo_discovery(discovery)
return env
def _augment_prompt_with_runtime_context(
prompt: str,
context: dict[str, str],
) -> str:
"""Append execution/env guidance without requiring every template to include placeholders."""
extras: list[str] = []
if context.get("execution_policy"):
extras.append("## Execution Policy\n" + context["execution_policy"])
if context.get("environment_context"):
extras.append("## Environment Context\n" + context["environment_context"])
if context.get("repo_discovery"):
extras.append("## Repository Discovery\n" + context["repo_discovery"])
if not extras:
return prompt
return prompt.rstrip() + "\n\n" + "\n\n".join(extras) + "\n"
def _apply_context_override(
context: dict[str, str],
overrides: dict[str, str],
) -> dict[str, str]:
"""Apply context_override mappings for cross-review scenarios."""
result = dict(context)
for key, value_template in overrides.items():
result[key] = render_template(value_template, context)
return result
def _collect_feedback(
steps: list[StepConfig],
step_outputs: dict[str, str],
) -> str:
"""Collect feedback from all verdict steps.
Single verdict step → raw output (backward compatible).
Multiple verdict steps → combined with agent headers for cross-referencing.
"""
verdict_steps = [s for s in steps if s.verdict]
if len(verdict_steps) == 1:
return step_outputs.get(verdict_steps[0].output_key, "")
parts: list[str] = []
for s in verdict_steps:
output = step_outputs.get(s.output_key, "")
if output:
parts.append(f"## Review by {s.agent} ({s.name})\n{output}")
return "\n\n---\n\n".join(parts)
def _detect_repeated_aggregate(
steps: list[StepConfig],
step_outputs: dict[str, str],
history: dict[str, int],
*,
iteration: int,
phase_name: str | None = None,
) -> str | None:
"""Detect repeated aggregate-review outputs across iterations."""
for step in steps:
if step.prompt_template != "default:aggregate-review":
continue
output = step_outputs.get(step.output_key, "")
normalized = _normalize_aggregate_output(output)
if not normalized:
return None
if normalized in history:
prev_iter = history[normalized]
phase_prefix = f"[{phase_name}] " if phase_name else ""
return (
f"{phase_prefix}Repeated aggregate_review detected at iteration {iteration} "
f"(same as iteration {prev_iter})."
)
history[normalized] = iteration
return None
return None
def _normalize_aggregate_output(output: str) -> str:
"""Normalize aggregate output for repeat detection."""
return " ".join(output.lower().split())
_ESCALATE_PATTERN = re.compile(r"VERDICT:\s*ESCALATE", re.IGNORECASE)
_TRACKER_TABLE_PATTERN = re.compile(
r"(##+ Issue Tracker[^\n]*\n(?:\|[^\n]+\|\n?)+)", re.DOTALL,
)
def _extract_verdict(output: str, pattern: str) -> str:
"""Extract PASS, FAIL, or ESCALATE from output using regex pattern."""
if re.search(_ESCALATE_PATTERN, output):
return "ESCALATE" # highest priority
if re.search(pattern, output):
return "PASS"
return "FAIL"
def _extract_senior_tracker(output: str) -> str:
"""Extract Issue Tracker table from senior review output."""
match = _TRACKER_TABLE_PATTERN.search(output)
return match.group(0) if match else ""
def _extract_escalated_issues(output: str) -> str:
"""Extract escalation details from senior review output."""
# Look for content between VERDICT: ESCALATE and end, or an escalation section
pattern = r"(?:###?\s*Escalat(?:ed|ion).*?\n)(.*?)(?=\n###|\Z)"
match = re.search(pattern, output, re.DOTALL | re.IGNORECASE)
if match:
return match.group(1).strip()
# Fallback: grab the Action Items section
pattern2 = r"(?:###?\s*Action Items.*?\n)(.*?)(?=\n###|\Z)"
match2 = re.search(pattern2, output, re.DOTALL | re.IGNORECASE)
if match2:
return match2.group(1).strip()
return ""
_FP_PATTERN = re.compile(r"[\w/\\]+\.\w{1,5}")
_ISSUE_KEYWORDS = re.compile(
r"\b(missing|validation|error[\s_-]?handling|unused|import|"
r"injection|auth(?:entication|orization)?|deprecated|"
r"leak|overflow|null|undefined|timeout|deadlock|race[\s_-]?condition|"
r"security|permission|encoding|format|parsing|connection|"
r"boundary|initialization|cleanup|resource|concurrency|"
r"exception|crash|hang|corrupt|truncat|duplicat|inconsisten|"
r"omission|over[\s_-]?engineer|refactor|naming|docstring|"
r"type[\s_-]?hint|test|coverage|logging|config|performance)\w*",
re.IGNORECASE,
)
def _issue_fingerprints(text: str) -> set[tuple[str, str]]:
"""Extract (file_path, issue_keyword) pairs from feedback text.
For each file path found, look for issue keywords within a window of
~120 characters around the file path mention and create composite keys.
"""
lower = text.lower()
paths = list(_FP_PATTERN.finditer(lower))
if not paths:
return set()
pairs: set[tuple[str, str]] = set()
for m in paths:
fp = m.group()
# Search a window around the file path for issue keywords
window_start = max(0, m.start() - 60)
window_end = min(len(lower), m.end() + 60)
window = lower[window_start:window_end]
for kw_match in _ISSUE_KEYWORDS.finditer(window):
pairs.add((fp, kw_match.group().lower()))
return pairs
def _detect_auto_escalate(
feedbacks: list[str],
current_feedback: str,
threshold: int = 2,
) -> bool:
"""Detect repeated identical issues across iterations (for auto-escalation).
Extracts (file_path, issue_keyword) fingerprints from feedback and checks
if any identical pair appears in >= *threshold* previous iterations.
This avoids false positives when the same file is mentioned for completely
different issues across iterations.
"""
current_fps = _issue_fingerprints(current_feedback)
if not current_fps:
return False
repeat_count = 0
for prev in feedbacks:
prev_fps = _issue_fingerprints(prev)
if current_fps & prev_fps:
repeat_count += 1
return repeat_count >= threshold
def _save_step_output(
run_dir: Path,
iteration: int,
step_name: str,
content: str,
) -> Path:
"""Save step output to run_dir/v{iteration}/{step_name}.md"""
path = run_dir / f"v{iteration}" / f"{step_name}.md"
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
return path
def _maybe_save_step_transcript(
run_dir: Path,
iteration: int,
step_name: str,
result: AgentResult,
) -> Path | None:
"""Persist raw stdout/stderr transcript when available."""
if not result.transcript:
return None
return _save_step_output(
run_dir, iteration, f"{step_name}_transcript", result.transcript,
)
def _format_runtime_error_markdown(
exc: Exception,
*,
step_name: str,
agent_name: str,
phase_name: str | None = None,
) -> str:
"""Render a structured markdown error report for a failed step."""
phase_info = f"- **Phase**: {phase_name}\n" if phase_name else ""
lines = [
"# Agent Error",
"",
phase_info.rstrip(),
f"- **Step**: {step_name}",
f"- **Agent**: {agent_name}",
]
lines = [line for line in lines if line]
if isinstance(exc, AgentInvocationError):
lines.extend(
[
f"- **Failure Type**: {exc.failure_type}",
f"- **Suggested Action**: {exc.suggested_action}",
"",
"## Command",
f"```",
exc.cmd_preview,
"```",
"",
"## Raw Error",
"```",
exc.raw_error,
"```",
],
)
else:
lines.extend(
[
"",
"```",
str(exc),
"```",
],
)
return "\n".join(lines) + "\n"
def _save_report(run_dir: Path, config: PipelineConfig, result: PipelineResult) -> None:
"""Build and save the final markdown report."""
report = build_report(config, result)
report_path = run_dir / "final-report.md"
report_path.parent.mkdir(parents=True, exist_ok=True)
report_path.write_text(report, encoding="utf-8")
logger.info("Report saved: %s", report_path)