"""Main pipeline execution engine.""" from __future__ import annotations import logging import os import re import subprocess import time from hashlib import sha256 from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from pathlib import Path from cross_eval.agent import AgentInvocationError, invoke_agent, invoke_agent_agentic from cross_eval.worktree import WorktreeError from cross_eval.config import try_reload_config from cross_eval.discovery import discover_repo, format_repo_discovery from cross_eval.models import ( AgentConfig, AgentResult, IterationResult, PipelineConfig, PipelineResult, StepConfig, ) from cross_eval.prompts import render_template, resolve_template, set_language from cross_eval.report import build_report from cross_eval.runtime_env import ( build_execution_policy, build_runtime_environment, summarize_environment, ) logger = logging.getLogger(__name__) def run_pipeline( config: PipelineConfig, cwd: Path | None = None, dry_run: bool = False, timeout: int | None = None, ) -> PipelineResult: """Execute the full cross-eval pipeline.""" # Create run directory: output/{preset}_{datetime}/ run_dir = _make_run_dir(config) if config.phases: return _run_phased_pipeline(config, run_dir, cwd, dry_run, timeout) return _run_simple_pipeline(config, run_dir, cwd, dry_run, timeout) def _make_run_dir(config: PipelineConfig) -> Path: """Create timestamped run directory under output_dir.""" ts = datetime.now().strftime("%Y%m%d_%H%M%S") run_dir = config.output_dir / f"{config.preset_name}_{ts}" run_dir.mkdir(parents=True, exist_ok=True) return run_dir def _commit_iteration( worktree_path: Path, label: str, iteration: int, verdict: str | None, ) -> str: """Intermediate commit after each agentic iteration. This resets the diff baseline so the next iteration only captures new changes. Returns the new HEAD SHA to use as the base_commit for the next iteration. """ from cross_eval.worktree import commit_worktree, get_current_head committed = commit_worktree( worktree_path, f"cross-eval: {label} v{iteration} ({verdict or 'no-verdict'})", ) if committed: logger.debug(" Intermediate commit: v%d (%s)", iteration, verdict) return get_current_head(worktree_path) def _has_agentic_steps(config: PipelineConfig, steps: list[StepConfig]) -> bool: """Check if any step uses an agentic agent.""" return any( config.agents.get(s.agent, AgentConfig(name="", command="")).agentic for s in steps ) def _setup_worktree(cwd: Path, run_dir: Path, preset_name: str) -> tuple[Path, str, str]: """Create a shared worktree for the entire pipeline run. 1. Generate branch name (cross-eval/_) 2. Create branch from HEAD 3. Create worktree on that branch Returns (worktree_path, branch_name, base_commit). """ from cross_eval.worktree import create_worktree, make_branch_name, make_worktree_dir branch_name = make_branch_name(preset_name) worktree_dir = make_worktree_dir(cwd, branch_name) worktree_path, base_commit = create_worktree( base_cwd=cwd, work_dir=worktree_dir, branch_name=branch_name, ) (run_dir / "worktree_path.txt").write_text(f"{worktree_path}\n", encoding="utf-8") (run_dir / "worktree_branch.txt").write_text(f"{branch_name}\n", encoding="utf-8") (run_dir / "worktree_base.txt").write_text(f"{base_commit}\n", encoding="utf-8") return worktree_path, branch_name, base_commit def _copy_inputs_to_worktree( config: PipelineConfig, worktree_path: Path, *, base_cwd: Path, ) -> None: """Copy input files (plan, checklist, etc.) into the worktree. Repo-local inputs are remapped to the corresponding path inside the worktree so agentic edits produce a real git diff. External inputs are copied into a dedicated inputs directory. For ``plan-review`` these external copies remain tracked so document edits can survive on the branch; other presets keep them ignored to avoid polluting code diffs. Updates ``config.inputs`` in-place so subsequent reference refreshes use worktree-local paths. """ import shutil base_root = base_cwd.resolve() track_external_inputs = config.preset_name == "plan-review" inputs_dir = worktree_path / ".cross-eval-inputs" inputs_dir.mkdir(exist_ok=True) if not track_external_inputs: # Exclude read-only input copies from git so they don't pollute code diffs. (inputs_dir / ".gitignore").write_text("*\n", encoding="utf-8") for key, val in list(config.inputs.items()): if key.endswith("_ref") or not isinstance(val, Path): continue if not val.exists(): continue resolved = val.resolve() try: rel_path = resolved.relative_to(base_root) except ValueError: dest = inputs_dir / val.name shutil.copy2(resolved, dest) config.inputs[key] = dest continue worktree_target = worktree_path / rel_path if not worktree_target.exists(): worktree_target.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(resolved, worktree_target) config.inputs[key] = worktree_target def _snapshot_repo_state(cwd: Path) -> dict[str, str]: """Capture the base repository working-tree state. This is used to detect agentic runs that accidentally modify the original checkout instead of the isolated worktree. We intentionally use only two components: - ``diff``: ``git diff HEAD`` — all tracked changes vs HEAD, combining staged and unstaged so that staging-state changes don't cause false positives. - ``untracked``: SHA-256 hashes of untracked files — detects new or modified untracked files appearing in the base repo. ``git status --short`` and ``git diff --cached`` are NOT used because external tools (IDEs, git hooks) can change staging state during a long-running pipeline, causing spurious failures. """ # Refresh index stat cache to prevent false positives from mtime drift subprocess.run( ["git", "update-index", "--refresh", "-q"], cwd=cwd, capture_output=True, ) # git diff HEAD: all changes (staged + unstaged) vs HEAD diff = subprocess.run( ["git", "diff", "--no-ext-diff", "--binary", "HEAD"], cwd=cwd, capture_output=True, text=True, ) if diff.returncode != 0: return {} untracked = subprocess.run( ["git", "ls-files", "--others", "--exclude-standard", "-z"], cwd=cwd, capture_output=True, ) untracked_parts: list[str] = [] if untracked.returncode == 0 and untracked.stdout: for rel_path in untracked.stdout.decode("utf-8", errors="replace").split("\0"): if not rel_path: continue file_path = cwd / rel_path if file_path.is_file(): digest = sha256(file_path.read_bytes()).hexdigest() untracked_parts.append(f"UNTRACKED {rel_path} {digest}") else: untracked_parts.append(f"UNTRACKED {rel_path} (non-file)") return { "diff": diff.stdout, "untracked": "\n".join(untracked_parts), } def _snapshot_repo_status(cwd: Path) -> str: """Capture a human-readable status summary for error reporting.""" result = subprocess.run( ["git", "status", "--short", "--untracked-files=all"], cwd=cwd, capture_output=True, text=True, ) if result.returncode != 0: return "" return result.stdout.strip() def _assert_base_repo_isolation( cwd: Path, baseline_state: dict[str, str], *, step_name: str, agent_name: str, worktree_path: Path, baseline_status: str, ) -> None: """Fail fast if an agentic run leaked changes into the base repo.""" current_state = _snapshot_repo_state(cwd) if current_state == baseline_state: return # Identify which component(s) actually changed changed: list[str] = [] for key in ("diff", "untracked"): if baseline_state.get(key, "") != current_state.get(key, ""): changed.append(key) if not changed: # State dicts differ only in keys we no longer track — benign. return # untracked-only change: new files appeared — real leak # diff-only change: tracked file content changed — real leak current_status = _snapshot_repo_status(cwd) before = baseline_status or "(clean)" after = current_status or "(clean)" raise WorktreeError( "Agent modified the base repository instead of the isolated worktree.\n\n" f"Step: {step_name}\n" f"Agent: {agent_name}\n" f"Worktree: {worktree_path}\n" f"Changed components: {', '.join(changed)}\n\n" f"Baseline status:\n{before}\n\n" f"Current status:\n{after}" ) def _finalize_worktree( cwd: Path, worktree_path: Path, branch_name: str, preset_name: str, final_verdict: str, ) -> str | None: """Commit changes on the branch, then remove the worktree. The branch survives worktree removal and stays in the original repo. Returns the branch name if changes were committed, None otherwise. """ from cross_eval.worktree import commit_worktree, remove_worktree committed = False try: committed = commit_worktree( worktree_path, f"cross-eval: {preset_name} ({final_verdict})", ) if committed: logger.info(" Agentic changes committed on branch: %s", branch_name) except Exception: logger.warning(" Failed to commit agentic changes", exc_info=True) try: remove_worktree(base_cwd=cwd, work_dir=worktree_path) except Exception: logger.warning("Failed to clean up worktree: %s", worktree_path) # Check if branch has any commits beyond the base (including intermediate # commits from _commit_iteration, not just the final commit). has_branch_commits = False try: result = subprocess.run( ["git", "log", "--oneline", f"HEAD..{branch_name}"], cwd=cwd, capture_output=True, text=True, ) has_branch_commits = bool(result.stdout.strip()) except Exception: pass if has_branch_commits: if not committed: logger.info(" Agentic changes on branch: %s (from intermediate commits)", branch_name) return branch_name # No commits on branch at all — clean up try: subprocess.run( ["git", "branch", "-D", branch_name], cwd=cwd, capture_output=True, ) logger.info(" Deleted empty branch: %s", branch_name) except Exception: pass # best-effort cleanup if not committed: logger.warning(" No agentic changes to commit (empty diff)") return None def _run_simple_pipeline( config: PipelineConfig, run_dir: Path, cwd: Path | None = None, dry_run: bool = False, timeout: int | None = None, ) -> PipelineResult: """Execute a simple (non-phased) pipeline.""" if cwd is None: cwd = Path(os.getcwd()) set_language(config.language) input_contents = _load_inputs(config) runtime_env = _build_runtime_inputs(config, input_contents, cwd or Path(os.getcwd())) # Setup shared worktree for agentic mode worktree_path: Path | None = None agentic_branch_name: str | None = None agentic_base_commit: str | None = None base_repo_state: dict[str, str] | None = None base_repo_status: str | None = None if not dry_run and _has_agentic_steps(config, config.pipeline): worktree_path, agentic_branch_name, agentic_base_commit = _setup_worktree( cwd, run_dir, config.preset_name, ) _copy_inputs_to_worktree(config, worktree_path, base_cwd=cwd) _refresh_input_references(config, input_contents) base_repo_state = _snapshot_repo_state(cwd) base_repo_status = _snapshot_repo_status(cwd) feedback = "(no feedback — first iteration)" iterations: list[IterationResult] = [] start_time = time.monotonic() final_verdict = "MAX_ITERATIONS_REACHED" aggregate_history: dict[str, int] = {} aggregate_warnings: list[str] = [] escalated_issues: list[str] = [] all_feedbacks: list[str] = [] try: for i in range(1, config.max_iterations + 1): config = try_reload_config(config) set_language(config.language) _refresh_inputs(config, input_contents) runtime_env = _build_runtime_inputs(config, input_contents, cwd) logger.info("=" * 50) logger.info(" Iteration %d/%d", i, config.max_iterations) logger.info("=" * 50) step_outputs, step_results, verdict = _run_steps( config.pipeline, config, input_contents, feedback, i, config.max_iterations, cwd, timeout, dry_run, run_dir=run_dir, output_iter=i, worktree_path=worktree_path, runtime_env=runtime_env, base_repo_state=base_repo_state, base_repo_status=base_repo_status, base_commit=agentic_base_commit, ) # Intermediate commit so next iteration's diff only shows new changes if worktree_path is not None: agentic_base_commit = _commit_iteration(worktree_path, config.preset_name, i, verdict) iter_result = IterationResult( iteration=i, step_results=step_results, step_outputs=step_outputs, verdict=verdict, ) warning = _detect_repeated_aggregate( config.pipeline, step_outputs, aggregate_history, iteration=i, ) if warning: iter_result.repeated_aggregate_warning = warning aggregate_warnings.append(warning) logger.warning(" %s", warning) iter_result.feedback = _collect_feedback(config.pipeline, step_outputs) feedback = iter_result.feedback or feedback all_feedbacks.append(feedback) # Extract tracker from verdict/review steps for next iteration for step in config.pipeline: if step.verdict or step.role == "review": tracker = _extract_senior_tracker( step_outputs.get(step.output_key, ""), ) if tracker: input_contents["previous_senior_tracker"] = tracker # Carry execution evidence forward so subsequent iterations' # reviewer/senior prompts can inspect prior transcript data. if step_results: input_contents["execution_evidence"] = _format_execution_evidence( step_results, run_dir=run_dir, iteration=i, ) iterations.append(iter_result) # ESCALATE check (highest priority) if verdict == "ESCALATE": final_verdict = "ESCALATE" for step in config.pipeline: if step.verdict: esc = _extract_escalated_issues( step_outputs.get(step.output_key, ""), ) if esc: escalated_issues.append(esc) iter_result.escalated_issues = esc logger.info(" ESCALATE at iteration %d — stopping loop.", i) break if verdict == "PASS": final_verdict = "PASS" if i >= config.min_iterations: logger.info(" PASS at iteration %d (min=%d reached)!", i, config.min_iterations) break else: logger.info( " PASS at iteration %d, but min_iterations=%d — continuing", i, config.min_iterations, ) # Auto-escalate: no senior/aggregator + repeated FAIL has_aggregator = config.seniors or any( s.prompt_template == "default:aggregate-review" for s in config.pipeline ) if ( verdict == "FAIL" and not has_aggregator and i >= 2 and _detect_auto_escalate(all_feedbacks[:-1], feedback) ): final_verdict = "ESCALATE" auto_msg = ( f"Auto-escalated: same issues detected across {i} iterations " f"without resolution (no senior reviewer configured)." ) escalated_issues.append(auto_msg) iter_result.escalated_issues = auto_msg logger.info(" AUTO-ESCALATE at iteration %d", i) break if dry_run: logger.info(" (dry-run: stopping after iteration 1)") break finally: agentic_branch: str | None = None if worktree_path is not None and agentic_branch_name is not None: agentic_branch = _finalize_worktree( cwd, worktree_path, agentic_branch_name, config.preset_name, final_verdict, ) total_duration = time.monotonic() - start_time pipeline_result = PipelineResult( iterations=iterations, final_verdict=final_verdict, total_duration=round(total_duration, 1), run_dir=run_dir, repeated_aggregate_warnings=aggregate_warnings, escalated_issues=escalated_issues, agentic_branch=agentic_branch, ) if not dry_run: _save_report(run_dir, config, pipeline_result) return pipeline_result def _run_phased_pipeline( config: PipelineConfig, run_dir: Path, cwd: Path | None = None, dry_run: bool = False, timeout: int | None = None, ) -> PipelineResult: """Execute a multi-phase pipeline (e.g. review-fix).""" if cwd is None: cwd = Path(os.getcwd()) set_language(config.language) input_contents = _load_inputs(config) runtime_env = _build_runtime_inputs(config, input_contents, cwd) # Setup shared worktree for agentic mode all_phase_steps = [s for p in config.phases for s in p.steps] worktree_path: Path | None = None agentic_branch_name: str | None = None agentic_base_commit: str | None = None base_repo_state: dict[str, str] | None = None base_repo_status: str | None = None if not dry_run and _has_agentic_steps(config, all_phase_steps): worktree_path, agentic_branch_name, agentic_base_commit = _setup_worktree( cwd, run_dir, config.preset_name, ) _copy_inputs_to_worktree(config, worktree_path, base_cwd=cwd) _refresh_input_references(config, input_contents) base_repo_state = _snapshot_repo_state(cwd) base_repo_status = _snapshot_repo_status(cwd) iterations: list[IterationResult] = [] feedback = "(no feedback — first iteration)" start_time = time.monotonic() final_verdict = "MAX_ITERATIONS_REACHED" global_iter = 0 aggregate_history_by_phase: dict[str, dict[str, int]] = {} aggregate_warnings: list[str] = [] escalated_issues: list[str] = [] all_feedbacks: list[str] = [] escalated = False try: for phase_idx, phase in enumerate(config.phases): if escalated: break logger.info("=" * 60) logger.info( " Phase: %s (max_iter=%d, consecutive_pass=%d)", phase.name, phase.max_iterations, phase.consecutive_pass, ) logger.info("=" * 60) consecutive_passes = 0 phase_converged = False for pi in range(1, phase.max_iterations + 1): global_iter += 1 config = try_reload_config(config) set_language(config.language) _refresh_inputs(config, input_contents) runtime_env = _build_runtime_inputs(config, input_contents, cwd) logger.info("-" * 50) logger.info( " [%s] Iteration %d/%d (global: v%d)", phase.name, pi, phase.max_iterations, global_iter, ) logger.info("-" * 50) step_outputs, step_results, verdict = _run_steps( phase.steps, config, input_contents, feedback, pi, phase.max_iterations, cwd, timeout, dry_run, run_dir=run_dir, output_iter=global_iter, phase_name=phase.name, worktree_path=worktree_path, runtime_env=runtime_env, base_repo_state=base_repo_state, base_repo_status=base_repo_status, base_commit=agentic_base_commit, ) # Intermediate commit so next iteration's diff only shows new changes if worktree_path is not None: agentic_base_commit = _commit_iteration( worktree_path, f"{config.preset_name}/{phase.name}", global_iter, verdict, ) iter_result = IterationResult( iteration=global_iter, step_results=step_results, step_outputs=step_outputs, verdict=verdict, phase_name=phase.name, ) phase_history = aggregate_history_by_phase.setdefault(phase.name, {}) warning = _detect_repeated_aggregate( phase.steps, step_outputs, phase_history, iteration=global_iter, phase_name=phase.name, ) if warning: iter_result.repeated_aggregate_warning = warning aggregate_warnings.append(warning) logger.warning(" %s", warning) iter_result.feedback = _collect_feedback(phase.steps, step_outputs) feedback = iter_result.feedback or feedback all_feedbacks.append(feedback) # Extract tracker from verdict/review steps for step in phase.steps: if step.verdict or step.role == "review": tracker = _extract_senior_tracker( step_outputs.get(step.output_key, ""), ) if tracker: input_contents["previous_senior_tracker"] = tracker # Carry execution evidence forward so subsequent iterations' # reviewer/senior prompts can inspect prior transcript data. if step_results: input_contents["execution_evidence"] = _format_execution_evidence( step_results, run_dir=run_dir, iteration=global_iter, ) iterations.append(iter_result) # ESCALATE check if verdict == "ESCALATE": final_verdict = "ESCALATE" for step in phase.steps: if step.verdict: esc = _extract_escalated_issues( step_outputs.get(step.output_key, ""), ) if esc: escalated_issues.append(esc) iter_result.escalated_issues = esc logger.info( " [%s] ESCALATE at iteration %d — stopping.", phase.name, pi, ) escalated = True break if verdict is None: logger.info( " [%s] completed (no verdict step; single-pass phase)", phase.name, ) phase_converged = True break if verdict == "PASS": consecutive_passes += 1 logger.info( " [%s] PASS (%d/%d consecutive)", phase.name, consecutive_passes, phase.consecutive_pass, ) if consecutive_passes >= phase.consecutive_pass: logger.info( " [%s] Converged! %d consecutive PASSes.", phase.name, phase.consecutive_pass, ) phase_converged = True break else: consecutive_passes = 0 # Auto-escalate in phased pipeline has_aggregator = config.seniors or any( s.prompt_template == "default:aggregate-review" for s in phase.steps ) if ( verdict == "FAIL" and not has_aggregator and pi >= 2 and _detect_auto_escalate(all_feedbacks[:-1], feedback) ): final_verdict = "ESCALATE" auto_msg = ( f"Auto-escalated: same issues detected across {pi} iterations " f"in phase '{phase.name}' without resolution." ) escalated_issues.append(auto_msg) iter_result.escalated_issues = auto_msg logger.info(" [%s] AUTO-ESCALATE at iteration %d", phase.name, pi) escalated = True break if dry_run: break if escalated: break if phase_converged: logger.info(" Phase '%s' completed: CONVERGED", phase.name) else: logger.info( " Phase '%s' completed: max iterations (%d) reached", phase.name, phase.max_iterations, ) if phase_idx == len(config.phases) - 1: final_verdict = "PASS" if phase_converged else "MAX_ITERATIONS_REACHED" finally: agentic_branch: str | None = None if worktree_path is not None and agentic_branch_name is not None: agentic_branch = _finalize_worktree( cwd, worktree_path, agentic_branch_name, config.preset_name, final_verdict, ) total_duration = time.monotonic() - start_time pipeline_result = PipelineResult( iterations=iterations, final_verdict=final_verdict, total_duration=round(total_duration, 1), run_dir=run_dir, repeated_aggregate_warnings=aggregate_warnings, escalated_issues=escalated_issues, agentic_branch=agentic_branch, ) if not dry_run: _save_report(run_dir, config, pipeline_result) return pipeline_result # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- def _load_inputs(config: PipelineConfig) -> dict[str, str]: """Load input file contents from config.""" input_contents: dict[str, str] = {} for key, val in config.inputs.items(): if key.endswith("_ref"): input_contents[key] = str(val) elif isinstance(val, str): input_contents[key] = val else: input_contents[key] = val.read_text(encoding="utf-8") _refresh_input_references(config, input_contents) return input_contents def _refresh_inputs( config: PipelineConfig, input_contents: dict[str, str], ) -> None: """Re-read input files (they may have changed on disk).""" for key, val in config.inputs.items(): if key.endswith("_ref"): input_contents[key] = str(val) elif isinstance(val, str): input_contents[key] = val elif isinstance(val, Path) and val.exists(): input_contents[key] = val.read_text(encoding="utf-8") _refresh_input_references(config, input_contents) def _refresh_input_references( config: PipelineConfig, input_contents: dict[str, str], ) -> None: """Expose stable file references for canonical planning inputs.""" for key, val in config.inputs.items(): if key.endswith("_ref"): input_contents[key] = str(val) continue ref_key = f"{key}_ref" if isinstance(val, Path): input_contents[ref_key] = str(val.resolve()) else: input_contents.setdefault(ref_key, f"(inline {key}; no file path available)") def _git_ref(cwd: Path, *args: str) -> str: """Best-effort git metadata lookup.""" result = subprocess.run( ["git", *args], cwd=cwd, capture_output=True, text=True, ) if result.returncode != 0: return "(unknown)" return result.stdout.strip() or "(unknown)" def _collect_markdown_refs(run_dir: Path, iteration: int) -> list[Path]: """Collect prior markdown artifacts available to the current step.""" refs: list[Path] = [] for idx in range(1, iteration + 1): iter_dir = run_dir / f"v{idx}" if not iter_dir.exists(): continue refs.extend(sorted(iter_dir.glob("*.md"))) return refs def _build_artifact_references( context: dict[str, str], *, cwd: Path, run_dir: Path, iteration: int, worktree_path: Path | None, step_results: dict[str, AgentResult] | None = None, ) -> str: """Build a compact reference-only handoff for agentic steps.""" repo_cwd = worktree_path or cwd branch = _git_ref(repo_cwd, "rev-parse", "--abbrev-ref", "HEAD") commit_hash = _git_ref(repo_cwd, "rev-parse", "HEAD") lines = [ "### Canonical References", f"- Plan: {context.get('plan_ref', '(missing)')}", f"- Checklist: {context.get('checklist_ref', '(missing)')}", f"- Docs: {context.get('docs_ref', '(none)')}", f"- Run directory: {run_dir}", f"- Current iteration directory: {run_dir / f'v{iteration}'}", f"- Target repository: {repo_cwd}", f"- Git branch: {branch}", f"- Git commit: {commit_hash}", "", "Use git/cat to inspect the referenced files directly instead of relying on inline summaries.", f"Suggested git commands: `git -C {repo_cwd} show {commit_hash}` and `git -C {repo_cwd} diff HEAD`", ] markdown_refs = _collect_markdown_refs(run_dir, iteration) if markdown_refs: lines.extend(["", "### Markdown Artifacts"]) lines.extend(f"- {path}" for path in markdown_refs) if step_results: lines.extend(["", "### Current Step Artifacts"]) for result in step_results.values(): lines.append(f"- Output: {run_dir / f'v{iteration}' / f'{result.step_name}.md'}") if result.transcript: lines.append( f"- Transcript: {run_dir / f'v{iteration}' / f'{result.step_name}_transcript.md'}" ) return "\n".join(lines) # --------------------------------------------------------------------------- # Parallel step grouping # --------------------------------------------------------------------------- def _get_step_dependencies(step: StepConfig) -> set[str]: """Extract output_key references from context_override values.""" deps: set[str] = set() for val in step.context_override.values(): for match in re.finditer(r"\{(\w+)\}", val): deps.add(match.group(1)) return deps def _group_parallel_steps(steps: list[StepConfig]) -> list[list[StepConfig]]: """Group consecutive parallel steps into batches. Consecutive steps with parallel=True are grouped together, but a new batch starts when a step depends on an output_key from a step in the current batch (dependency breaking). """ batches: list[list[StepConfig]] = [] current: list[StepConfig] = [] current_output_keys: set[str] = set() for step in steps: if not step.parallel: if current: batches.append(current) current = [] current_output_keys = set() batches.append([step]) continue # Check if this step depends on any output from the current batch deps = _get_step_dependencies(step) if deps & current_output_keys: batches.append(current) current = [] current_output_keys = set() current.append(step) current_output_keys.add(step.output_key) if current: batches.append(current) return batches # --------------------------------------------------------------------------- # Step execution # --------------------------------------------------------------------------- def _run_steps( steps: list[StepConfig], config: PipelineConfig, input_contents: dict[str, str], feedback: str, iteration: int, max_iterations: int, cwd: Path, timeout: int | None, dry_run: bool, *, run_dir: Path, output_iter: int, phase_name: str | None = None, worktree_path: Path | None = None, runtime_env: dict[str, str] | None = None, base_repo_state: dict[str, str] | None = None, base_repo_status: str | None = None, base_commit: str | None = None, ) -> tuple[dict[str, str], dict[str, AgentResult], str | None]: """Execute all steps in one iteration, parallelizing where possible.""" step_outputs: dict[str, str] = {} step_results: dict[str, AgentResult] = {} verdict: str | None = None batches = _group_parallel_steps(steps) for batch in batches: if len(batch) == 1: step = batch[0] _execute_step( step, config, input_contents, feedback, iteration, max_iterations, cwd, timeout, dry_run, step_outputs, step_results, run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, worktree_path=worktree_path, runtime_env=runtime_env, base_repo_state=base_repo_state, base_repo_status=base_repo_status, base_commit=base_commit, ) else: _execute_parallel_batch( batch, config, input_contents, feedback, iteration, max_iterations, cwd, timeout, dry_run, step_outputs, step_results, run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, worktree_path=worktree_path, runtime_env=runtime_env, base_repo_state=base_repo_state, base_repo_status=base_repo_status, base_commit=base_commit, ) # Extract verdict from all verdict steps (ALL must PASS; ESCALATE wins over all) for step in steps: if step.verdict: output = step_outputs.get(step.output_key, "") step_verdict = _extract_verdict(output, step.verdict_pattern) logger.info(" [%s] verdict: %s", step.name, step_verdict) if step_verdict == "ESCALATE": verdict = "ESCALATE" elif verdict is None: verdict = step_verdict elif verdict != "ESCALATE" and step_verdict == "FAIL": verdict = "FAIL" return step_outputs, step_results, verdict def _invoke_agentic( agent_config: AgentConfig, prompt: str, step_name: str, *, worktree_path: Path, env: dict[str, str] | None = None, timeout: int | None = None, quiet: bool = False, base_commit: str | None = None, ) -> AgentResult: """Run an agent in agentic mode using an existing worktree.""" return invoke_agent_agentic( agent_config, prompt, step_name, worktree_path=worktree_path, env=env, timeout=timeout, quiet=quiet, base_commit=base_commit, ) def _execute_step( step: StepConfig, config: PipelineConfig, input_contents: dict[str, str], feedback: str, iteration: int, max_iterations: int, cwd: Path, timeout: int | None, dry_run: bool, step_outputs: dict[str, str], step_results: dict[str, AgentResult], *, run_dir: Path, output_iter: int, phase_name: str | None = None, quiet: bool = False, worktree_path: Path | None = None, runtime_env: dict[str, str] | None = None, base_repo_state: dict[str, str] | None = None, base_repo_status: str | None = None, base_commit: str | None = None, ) -> None: """Execute a single step, updating step_outputs and step_results in place.""" if not quiet: logger.info(" [%s] agent='%s' role='%s'", step.name, step.agent, step.role) # 1. Resolve template template = resolve_template(step.prompt_template) # 2. Build context (include prior step results for evidence) context = _build_context( input_contents, step_outputs, feedback, iteration, max_iterations, cwd=cwd, run_dir=run_dir, worktree_path=worktree_path, step_results=step_results, ) # 3. Apply context overrides if step.context_override: context = _apply_context_override(context, step.context_override) # 4. Render prompt prompt = render_template(template, context) prompt = _augment_prompt_with_runtime_context(prompt, context) # 5. Dry run: print and skip if dry_run: phase_label = f" phase={phase_name}" if phase_name else "" print(f"\n--- Step: {step.name} (agent={step.agent}{phase_label}) ---") print(prompt) print(f"--- end {step.name} ---\n") step_outputs[step.output_key] = f"(dry-run: no output for {step.output_key})" return # 6. Invoke agent agent_config = config.agents[step.agent] try: if agent_config.agentic and worktree_path: result = _invoke_agentic( agent_config, prompt, step.name, worktree_path=worktree_path, env=runtime_env, timeout=timeout, quiet=quiet, base_commit=base_commit, ) else: # When worktree exists, run non-agentic agents (reviewers) in # the worktree too so they can inspect the modified files. effective_cwd = worktree_path if worktree_path else cwd result = invoke_agent( agent_config, prompt, step.name, cwd=effective_cwd, env=runtime_env, timeout=timeout, quiet=quiet, ) except subprocess.TimeoutExpired as e: stdout = (e.stdout or b"") if isinstance(e.stdout, bytes) else (e.stdout or "") stderr = (e.stderr or b"") if isinstance(e.stderr, bytes) else (e.stderr or "") if isinstance(stdout, bytes): stdout = stdout.decode("utf-8", errors="replace") if isinstance(stderr, bytes): stderr = stderr.decode("utf-8", errors="replace") phase_info = f"- **Phase**: {phase_name}\n" if phase_name else "" error_msg = ( f"# Agent Timeout\n\n" f"{phase_info}" f"- **Step**: {step.name}\n" f"- **Agent**: {step.agent}\n" f"- **Timeout**: {timeout}s\n\n" f"Partial stdout ({len(stdout)} chars):\n" f"```\n{stdout[:2000] or '(none)'}\n```\n\n" f"Stderr:\n```\n{stderr[:2000] or '(none)'}\n```\n" ) _save_step_output(run_dir, output_iter, f"{step.name}_error", error_msg) logger.error(" [%s] TIMEOUT after %ss — saved to output", step.name, timeout) raise RuntimeError( f"Agent '{step.agent}' timed out after {timeout}s at step '{step.name}'. " f"Error saved to {run_dir}/v{output_iter}/{step.name}_error.md. " f"Try --timeout 0 (unlimited)" ) except RuntimeError as e: error_msg = _format_runtime_error_markdown( e, step_name=step.name, agent_name=step.agent, phase_name=phase_name, ) _save_step_output(run_dir, output_iter, f"{step.name}_error", error_msg) logger.error(" [%s] FAILED — saved to output", step.name) raise # 7. Store output if worktree_path is not None and base_repo_state is not None: _assert_base_repo_isolation( cwd, base_repo_state, step_name=step.name, agent_name=step.agent, worktree_path=worktree_path, baseline_status=base_repo_status or "", ) step_outputs[step.output_key] = result.output step_results[step.output_key] = result if not quiet: logger.info( " [%s] completed (%.1fs, %d chars)", step.name, result.duration_seconds, len(result.output), ) # 8. Save to disk _save_step_output(run_dir, output_iter, step.name, result.output) _maybe_save_step_transcript(run_dir, output_iter, step.name, result) def _execute_parallel_batch( batch: list[StepConfig], config: PipelineConfig, input_contents: dict[str, str], feedback: str, iteration: int, max_iterations: int, cwd: Path, timeout: int | None, dry_run: bool, step_outputs: dict[str, str], step_results: dict[str, AgentResult], *, run_dir: Path, output_iter: int, phase_name: str | None = None, worktree_path: Path | None = None, runtime_env: dict[str, str] | None = None, base_repo_state: dict[str, str] | None = None, base_repo_status: str | None = None, base_commit: str | None = None, ) -> None: """Execute multiple steps in parallel using threads.""" agent_names = ", ".join(s.agent for s in batch) logger.info(" [parallel] %d agents: %s", len(batch), agent_names) if dry_run: for step in batch: _execute_step( step, config, input_contents, feedback, iteration, max_iterations, cwd, timeout, dry_run, step_outputs, step_results, run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, base_repo_state=base_repo_state, base_repo_status=base_repo_status, base_commit=base_commit, ) return # Agentic steps cannot run in parallel (they share a worktree) agentic_in_batch = [ s for s in batch if config.agents.get(s.agent, AgentConfig(name="", command="")).agentic ] if len(agentic_in_batch) > 1: logger.warning( " [parallel] %d agentic steps cannot run concurrently — running sequentially", len(agentic_in_batch), ) for step in batch: _execute_step( step, config, input_contents, feedback, iteration, max_iterations, cwd, timeout, dry_run, step_outputs, step_results, run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, worktree_path=worktree_path, base_repo_state=base_repo_state, base_repo_status=base_repo_status, base_commit=base_commit, ) return # Snapshot context before parallel execution (all steps see same state) context_snapshot = dict(input_contents) context_snapshot.update(step_outputs) results_snapshot = dict(step_results) # Collect results from parallel threads local_outputs: dict[str, str] = {} local_results: dict[str, AgentResult] = {} errors: list[tuple[StepConfig, Exception]] = [] # Show a single spinner for the batch from cross_eval.agent import _Spinner spinner = _Spinner( f"[parallel] {len(batch)} agents running ({agent_names})..." ) spinner.start() batch_start = time.monotonic() def _run_one(step: StepConfig) -> tuple[str, str, AgentResult]: """Run one step, return (output_key, output, result).""" template = resolve_template(step.prompt_template) context = _build_context( context_snapshot, {}, feedback, iteration, max_iterations, cwd=cwd, run_dir=run_dir, worktree_path=worktree_path, step_results=results_snapshot, ) if step.context_override: context = _apply_context_override(context, step.context_override) prompt = render_template(template, context) prompt = _augment_prompt_with_runtime_context(prompt, context) agent_config = config.agents[step.agent] if agent_config.agentic and worktree_path: result = _invoke_agentic( agent_config, prompt, step.name, worktree_path=worktree_path, env=runtime_env, timeout=timeout, quiet=True, base_commit=base_commit, ) else: effective_cwd = worktree_path if worktree_path else cwd result = invoke_agent( agent_config, prompt, step.name, cwd=effective_cwd, env=runtime_env, timeout=timeout, quiet=True, ) return step.output_key, result.output, result with ThreadPoolExecutor(max_workers=len(batch)) as executor: futures = {executor.submit(_run_one, step): step for step in batch} for future in as_completed(futures): step = futures[future] try: output_key, output, result = future.result() local_results[output_key] = result local_outputs[output_key] = output except Exception as e: errors.append((step, e)) batch_elapsed = round(time.monotonic() - batch_start, 1) # Persist successful outputs even if a sibling step failed. if worktree_path is not None and base_repo_state is not None: _assert_base_repo_isolation( cwd, base_repo_state, step_name=phase_name or "parallel-batch", agent_name=agent_names, worktree_path=worktree_path, baseline_status=base_repo_status or "", ) for step in batch: key = step.output_key if key not in local_outputs: continue step_outputs[key] = local_outputs[key] step_results[key] = local_results[key] r = local_results[key] logger.info( " [%s] completed (%.1fs, %d chars)", step.name, r.duration_seconds, len(r.output), ) _save_step_output(run_dir, output_iter, step.name, r.output) _maybe_save_step_transcript(run_dir, output_iter, step.name, r) if errors: spinner.stop(f"[parallel] FAILED ({batch_elapsed}s)") for failed_step, exc in errors: if isinstance(exc, subprocess.TimeoutExpired): stdout = (exc.stdout or b"") if isinstance(exc.stdout, bytes) else (exc.stdout or "") stderr = (exc.stderr or b"") if isinstance(exc.stderr, bytes) else (exc.stderr or "") if isinstance(stdout, bytes): stdout = stdout.decode("utf-8", errors="replace") if isinstance(stderr, bytes): stderr = stderr.decode("utf-8", errors="replace") phase_info = f"- **Phase**: {phase_name}\n" if phase_name else "" error_msg = ( f"# Agent Timeout\n\n" f"{phase_info}" f"- **Step**: {failed_step.name}\n" f"- **Agent**: {failed_step.agent}\n" f"- **Timeout**: {timeout}s\n\n" f"Partial stdout ({len(stdout)} chars):\n" f"```\n{stdout[:2000] or '(none)'}\n```\n\n" f"Stderr:\n```\n{stderr[:2000] or '(none)'}\n```\n" ) else: error_msg = _format_runtime_error_markdown( exc, step_name=failed_step.name, agent_name=failed_step.agent, phase_name=phase_name, ) _save_step_output(run_dir, output_iter, f"{failed_step.name}_error", error_msg) logger.error(" [%s] FAILED — saved to output", failed_step.name) failed_steps = ", ".join(step.name for step, _ in errors) saved_steps = ", ".join(step.name for step in batch if step.output_key in local_outputs) first_error = errors[0][1] saved_note = f" Successful outputs were saved for: {saved_steps}." if saved_steps else "" raise RuntimeError( f"Parallel batch failed: {len(errors)}/{len(batch)} steps failed ({failed_steps})." f"{saved_note} First error:\n{first_error}" ) spinner.stop(f"[parallel] {len(batch)} agents done ({batch_elapsed}s)") # --------------------------------------------------------------------------- # Context and template helpers # --------------------------------------------------------------------------- def _build_context( input_contents: dict[str, str], step_outputs: dict[str, str], feedback: str, iteration: int, max_iterations: int, *, cwd: Path | None = None, run_dir: Path | None = None, worktree_path: Path | None = None, step_results: dict[str, AgentResult] | None = None, ) -> dict[str, str]: """Build the template context dict. Execution evidence from prior iterations is carried forward in ``input_contents["execution_evidence"]``. When the current iteration has its own step results, the evidence is merged so reviewers/seniors see both prior and current data. """ context: dict[str, str] = {} context.update(input_contents) context.update(step_outputs) context["feedback"] = feedback context["iteration"] = str(iteration) context["max_iterations"] = str(max_iterations) ref_cwd = cwd or Path.cwd() ref_run_dir = run_dir or ref_cwd / ".cross-eval" / "output" / "ad-hoc" context["artifact_references"] = _build_artifact_references( context, cwd=ref_cwd, run_dir=ref_run_dir, iteration=iteration, worktree_path=worktree_path, step_results=step_results, ) # Surface execution evidence from prior steps so reviewers can inspect it. # Prior-iteration evidence may already live in context via input_contents. prior_evidence = context.get("execution_evidence", "") if step_results: current_evidence = _format_execution_evidence( step_results, run_dir=ref_run_dir, iteration=iteration, ) if prior_evidence and prior_evidence != "(no prior execution evidence)": context["execution_evidence"] = ( "# Prior Iteration Evidence\n" + prior_evidence + "\n\n# Current Iteration Evidence\n" + current_evidence ) else: context["execution_evidence"] = current_evidence return context def _format_execution_evidence( step_results: dict[str, AgentResult], *, run_dir: Path | None = None, iteration: int | None = None, ) -> str: """Format execution evidence from prior steps for reviewer consumption. Produces a compact summary of command, exit code, duration, and artifact paths so that later agents can read markdown/git state directly. """ if not step_results: return "(no prior execution evidence)" parts: list[str] = [] for key, result in step_results.items(): section = [ f"### Step: {result.step_name} ({result.agent_name})", f"- Command: `{result.command_preview}`" if result.command_preview else "", f"- Exit code: {result.exit_code}", f"- Duration: {result.duration_seconds}s", f"- Output size: {len(result.output)} chars", ] section = [line for line in section if line] if run_dir is not None and iteration is not None: section.append(f"- Output artifact: {run_dir / f'v{iteration}' / f'{result.step_name}.md'}") if result.transcript: section.append( f"- Transcript artifact: {run_dir / f'v{iteration}' / f'{result.step_name}_transcript.md'}" ) parts.append("\n".join(section)) return "\n\n---\n\n".join(parts) def _build_runtime_inputs( config: PipelineConfig, input_contents: dict[str, str], cwd: Path, ) -> dict[str, str]: """Load runtime env and expose safe execution hints to prompts.""" env, loaded_files, loaded_values = build_runtime_environment(config.execution, cwd) discovery = discover_repo(cwd, set(loaded_values) | set(env)) input_contents["execution_policy"] = build_execution_policy(config.execution) input_contents["environment_context"] = summarize_environment( config.execution, loaded_files, env, loaded_values, ) input_contents["repo_discovery"] = format_repo_discovery(discovery) return env def _augment_prompt_with_runtime_context( prompt: str, context: dict[str, str], ) -> str: """Append execution/env guidance without requiring every template to include placeholders.""" extras: list[str] = [] if context.get("execution_policy"): extras.append("## Execution Policy\n" + context["execution_policy"]) if context.get("environment_context"): extras.append("## Environment Context\n" + context["environment_context"]) if context.get("repo_discovery"): extras.append("## Repository Discovery\n" + context["repo_discovery"]) if not extras: return prompt return prompt.rstrip() + "\n\n" + "\n\n".join(extras) + "\n" def _apply_context_override( context: dict[str, str], overrides: dict[str, str], ) -> dict[str, str]: """Apply context_override mappings for cross-review scenarios.""" result = dict(context) for key, value_template in overrides.items(): result[key] = render_template(value_template, context) return result def _collect_feedback( steps: list[StepConfig], step_outputs: dict[str, str], ) -> str: """Collect feedback from all verdict steps. Single verdict step → raw output (backward compatible). Multiple verdict steps → combined with agent headers for cross-referencing. """ verdict_steps = [s for s in steps if s.verdict] if len(verdict_steps) == 1: return step_outputs.get(verdict_steps[0].output_key, "") parts: list[str] = [] for s in verdict_steps: output = step_outputs.get(s.output_key, "") if output: parts.append(f"## Review by {s.agent} ({s.name})\n{output}") return "\n\n---\n\n".join(parts) def _detect_repeated_aggregate( steps: list[StepConfig], step_outputs: dict[str, str], history: dict[str, int], *, iteration: int, phase_name: str | None = None, ) -> str | None: """Detect repeated aggregate-review outputs across iterations.""" for step in steps: if step.prompt_template != "default:aggregate-review": continue output = step_outputs.get(step.output_key, "") normalized = _normalize_aggregate_output(output) if not normalized: return None if normalized in history: prev_iter = history[normalized] phase_prefix = f"[{phase_name}] " if phase_name else "" return ( f"{phase_prefix}Repeated aggregate_review detected at iteration {iteration} " f"(same as iteration {prev_iter})." ) history[normalized] = iteration return None return None def _normalize_aggregate_output(output: str) -> str: """Normalize aggregate output for repeat detection.""" return " ".join(output.lower().split()) _ESCALATE_PATTERN = re.compile(r"VERDICT:\s*ESCALATE", re.IGNORECASE) _TRACKER_TABLE_PATTERN = re.compile( r"(##+ Issue Tracker[^\n]*\n(?:\|[^\n]+\|\n?)+)", re.DOTALL, ) def _extract_verdict(output: str, pattern: str) -> str: """Extract PASS, FAIL, or ESCALATE from output using regex pattern.""" if re.search(_ESCALATE_PATTERN, output): return "ESCALATE" # highest priority if re.search(pattern, output): return "PASS" return "FAIL" def _extract_senior_tracker(output: str) -> str: """Extract Issue Tracker table from senior review output.""" match = _TRACKER_TABLE_PATTERN.search(output) return match.group(0) if match else "" def _extract_escalated_issues(output: str) -> str: """Extract escalation details from senior review output.""" # Look for content between VERDICT: ESCALATE and end, or an escalation section pattern = r"(?:###?\s*Escalat(?:ed|ion).*?\n)(.*?)(?=\n###|\Z)" match = re.search(pattern, output, re.DOTALL | re.IGNORECASE) if match: return match.group(1).strip() # Fallback: grab the Action Items section pattern2 = r"(?:###?\s*Action Items.*?\n)(.*?)(?=\n###|\Z)" match2 = re.search(pattern2, output, re.DOTALL | re.IGNORECASE) if match2: return match2.group(1).strip() return "" _FP_PATTERN = re.compile(r"[\w/\\]+\.\w{1,5}") _ISSUE_KEYWORDS = re.compile( r"\b(missing|validation|error[\s_-]?handling|unused|import|" r"injection|auth(?:entication|orization)?|deprecated|" r"leak|overflow|null|undefined|timeout|deadlock|race[\s_-]?condition|" r"security|permission|encoding|format|parsing|connection|" r"boundary|initialization|cleanup|resource|concurrency|" r"exception|crash|hang|corrupt|truncat|duplicat|inconsisten|" r"omission|over[\s_-]?engineer|refactor|naming|docstring|" r"type[\s_-]?hint|test|coverage|logging|config|performance)\w*", re.IGNORECASE, ) def _issue_fingerprints(text: str) -> set[tuple[str, str]]: """Extract (file_path, issue_keyword) pairs from feedback text. For each file path found, look for issue keywords within a window of ~120 characters around the file path mention and create composite keys. """ lower = text.lower() paths = list(_FP_PATTERN.finditer(lower)) if not paths: return set() pairs: set[tuple[str, str]] = set() for m in paths: fp = m.group() # Search a window around the file path for issue keywords window_start = max(0, m.start() - 60) window_end = min(len(lower), m.end() + 60) window = lower[window_start:window_end] for kw_match in _ISSUE_KEYWORDS.finditer(window): pairs.add((fp, kw_match.group().lower())) return pairs def _detect_auto_escalate( feedbacks: list[str], current_feedback: str, threshold: int = 2, ) -> bool: """Detect repeated identical issues across iterations (for auto-escalation). Extracts (file_path, issue_keyword) fingerprints from feedback and checks if any identical pair appears in >= *threshold* previous iterations. This avoids false positives when the same file is mentioned for completely different issues across iterations. """ current_fps = _issue_fingerprints(current_feedback) if not current_fps: return False repeat_count = 0 for prev in feedbacks: prev_fps = _issue_fingerprints(prev) if current_fps & prev_fps: repeat_count += 1 return repeat_count >= threshold def _save_step_output( run_dir: Path, iteration: int, step_name: str, content: str, ) -> Path: """Save step output to run_dir/v{iteration}/{step_name}.md""" path = run_dir / f"v{iteration}" / f"{step_name}.md" path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") return path def _maybe_save_step_transcript( run_dir: Path, iteration: int, step_name: str, result: AgentResult, ) -> Path | None: """Persist raw stdout/stderr transcript when available.""" if not result.transcript: return None return _save_step_output( run_dir, iteration, f"{step_name}_transcript", result.transcript, ) def _format_runtime_error_markdown( exc: Exception, *, step_name: str, agent_name: str, phase_name: str | None = None, ) -> str: """Render a structured markdown error report for a failed step.""" phase_info = f"- **Phase**: {phase_name}\n" if phase_name else "" lines = [ "# Agent Error", "", phase_info.rstrip(), f"- **Step**: {step_name}", f"- **Agent**: {agent_name}", ] lines = [line for line in lines if line] if isinstance(exc, AgentInvocationError): lines.extend( [ f"- **Failure Type**: {exc.failure_type}", f"- **Suggested Action**: {exc.suggested_action}", "", "## Command", "```", exc.cmd_preview, "```", "", "## Raw Error", "```", exc.raw_error, "```", ], ) else: lines.extend( [ "", "```", str(exc), "```", ], ) return "\n".join(lines) + "\n" def _save_report(run_dir: Path, config: PipelineConfig, result: PipelineResult) -> None: """Build and save the final markdown report.""" report = build_report(config, result) report_path = run_dir / "final-report.md" report_path.parent.mkdir(parents=True, exist_ok=True) report_path.write_text(report, encoding="utf-8") logger.info("Report saved: %s", report_path)