diff --git a/cross_eval.egg-info/PKG-INFO b/cross_eval.egg-info/PKG-INFO
index 1eeed19..0f402eb 100644
--- a/cross_eval.egg-info/PKG-INFO
+++ b/cross_eval.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 2.4
Name: cross-eval
-Version: 0.1.0
+Version: 0.2.0
Summary: AI agent cross-evaluation CLI tool
Requires-Python: >=3.9
Requires-Dist: pyyaml>=6.0
diff --git a/cross_eval.egg-info/SOURCES.txt b/cross_eval.egg-info/SOURCES.txt
index 8272bb0..26a3503 100644
--- a/cross_eval.egg-info/SOURCES.txt
+++ b/cross_eval.egg-info/SOURCES.txt
@@ -10,12 +10,15 @@ cross_eval/models.py
cross_eval/pipeline.py
cross_eval/prompts.py
cross_eval/report.py
+cross_eval/runtime_env.py
+cross_eval/worktree.py
cross_eval.egg-info/PKG-INFO
cross_eval.egg-info/SOURCES.txt
cross_eval.egg-info/dependency_links.txt
cross_eval.egg-info/entry_points.txt
cross_eval.egg-info/requires.txt
cross_eval.egg-info/top_level.txt
+tests/test_agentic.py
tests/test_config.py
tests/test_onboarding.py
-tests/test_pipeline_integration.py
\ No newline at end of file
+tests/test_pipeline_integration.py
diff --git a/cross_eval/__init__.py b/cross_eval/__init__.py
index 3dc1f76..d3ec452 100644
--- a/cross_eval/__init__.py
+++ b/cross_eval/__init__.py
@@ -1 +1 @@
-__version__ = "0.1.0"
+__version__ = "0.2.0"
diff --git a/cross_eval/agent.py b/cross_eval/agent.py
index 8fb6ef4..243c4a9 100644
--- a/cross_eval/agent.py
+++ b/cross_eval/agent.py
@@ -3,8 +3,10 @@ from __future__ import annotations
import itertools
import logging
+import os
import subprocess
import sys
+import tempfile
import threading
import time
from pathlib import Path
@@ -142,11 +144,17 @@ class _Spinner:
sys.stderr.flush()
+def _is_print_mode(args: list[str]) -> bool:
+ """Check if the agent args include -p / --print flag."""
+ return "-p" in args or "--print" in args
+
+
def invoke_agent(
agent: AgentConfig,
prompt: str,
step_name: str,
cwd: Optional[Path] = None,
+ env: Optional[dict[str, str]] = None,
timeout: int | None = None,
quiet: bool = False,
) -> AgentResult:
@@ -155,30 +163,67 @@ def invoke_agent(
Args:
quiet: If True, suppress spinner (for parallel execution).
"""
+ is_claude = "claude" in agent.command
+ is_interactive = is_claude and not _is_print_mode(agent.args)
+
cmd = [agent.command]
if agent.reasoning_effort and _supports_reasoning_effort(agent.command):
cmd.extend(["-c", f'model_reasoning_effort="{agent.reasoning_effort}"'])
cmd.extend(agent.args)
- # Build the full prompt (system prompt + user prompt)
- if agent.system_prompt and _supports_system_prompt_flag(agent.command):
- # claude: --system-prompt flag supported natively
- cmd.extend(["--system-prompt", agent.system_prompt])
- input_data = prompt
- elif agent.system_prompt:
- # codex, others: no --system-prompt flag, prepend to prompt
- input_data = (
- f"\n{agent.system_prompt}\n\n\n"
- f"{prompt}"
+ # --- Temp files for interactive (non -p) claude ---
+ task_file: Optional[Path] = None
+ output_file: Optional[Path] = None
+
+ if is_interactive:
+ # Write prompt + output instruction to temp task file
+ task_fd, task_path = tempfile.mkstemp(suffix=".md", prefix="cross_eval_task_")
+ task_file = Path(task_path)
+ os.close(task_fd)
+
+ out_fd, out_path = tempfile.mkstemp(suffix=".md", prefix="cross_eval_out_")
+ output_file = Path(out_path)
+ os.close(out_fd)
+ # Clear the output file so we can detect if agent wrote to it
+ output_file.write_text("", encoding="utf-8")
+
+ wrapped_prompt = (
+ f"{prompt}\n\n"
+ f"---\n"
+ f"IMPORTANT: Write your COMPLETE response to this file: {output_file}\n"
+ f"Do NOT modify any other files in the project."
)
+ task_file.write_text(wrapped_prompt, encoding="utf-8")
+
+ # System prompt via flag
+ if agent.system_prompt and _supports_system_prompt_flag(agent.command):
+ cmd.extend(["--system-prompt", agent.system_prompt])
+
+ # Positional arg: point claude to the task file
+ cmd.append(
+ f"Read the task file at {task_file} and follow all instructions in it. "
+ f"Write your complete output to {output_file}."
+ )
+ input_data: str | None = None
else:
- input_data = prompt
+ # Print mode (-p) or non-claude: deliver prompt via stdin
+ if agent.system_prompt and _supports_system_prompt_flag(agent.command):
+ cmd.extend(["--system-prompt", agent.system_prompt])
+ input_data = prompt
+ elif agent.system_prompt:
+ input_data = (
+ f"\n{agent.system_prompt}\n\n\n"
+ f"{prompt}"
+ )
+ else:
+ input_data = prompt
logger.debug("Invoking agent '%s': %s", agent.name, " ".join(cmd[:5]) + " ...")
spinner: Optional[_Spinner] = None
if not quiet:
- logger.info(" cmd: %s", " ".join(cmd[:6]))
+ mode_label = "interactive" if is_interactive else ""
+ logger.info(" cmd: %s %s", " ".join(cmd[:6]), f"({mode_label})" if mode_label else "")
spinner = _Spinner(f"[{step_name}] {agent.name} running...")
spinner.start()
@@ -191,6 +236,7 @@ def invoke_agent(
text=True,
timeout=timeout,
cwd=cwd,
+ env=env,
)
duration = time.monotonic() - start
except subprocess.TimeoutExpired:
@@ -201,10 +247,154 @@ def invoke_agent(
if spinner:
spinner.stop(f"[{step_name}] ERROR")
raise
+ finally:
+ if task_file:
+ task_file.unlink(missing_ok=True)
+
+ if result.returncode != 0:
+ if spinner:
+ spinner.stop(f"[{step_name}] FAILED (exit {result.returncode})")
+ if output_file:
+ output_file.unlink(missing_ok=True)
+ err_detail = result.stderr.strip() or result.stdout.strip()
+ if err_detail and len(err_detail) > 500:
+ err_detail = err_detail[:500] + "..."
+ cmd_preview = " ".join(cmd[:6])
+ failure_type, suggested_action = _classify_agent_failure(err_detail or "")
+ raise AgentInvocationError(
+ agent_name=agent.name,
+ step_name=step_name,
+ cmd_preview=cmd_preview,
+ raw_error=err_detail or "(no output)",
+ failure_type=failure_type,
+ suggested_action=suggested_action,
+ )
+
+ # --- Capture output ---
+ if output_file:
+ output = output_file.read_text(encoding="utf-8").strip()
+ output_file.unlink(missing_ok=True)
+ if not output:
+ # Fallback to stdout if agent didn't write to the file
+ output = result.stdout.strip()
+ else:
+ output = result.stdout.strip()
- output = result.stdout.strip()
chars = len(output)
+ if spinner:
+ spinner.stop(f"[{step_name}] done — {chars} chars")
+
+ if not output:
+ stderr_info = result.stderr.strip()
+ if stderr_info:
+ logger.warning(
+ "Agent '%s' produced empty output at step '%s'. stderr: %s",
+ agent.name, step_name, stderr_info[:500],
+ )
+ else:
+ logger.warning(
+ "Agent '%s' produced empty output at step '%s' (no stderr either)",
+ agent.name, step_name,
+ )
+
+ return AgentResult(
+ output=output,
+ exit_code=result.returncode,
+ agent_name=agent.name,
+ step_name=step_name,
+ duration_seconds=round(duration, 1),
+ )
+
+
+def invoke_agent_agentic(
+ agent: AgentConfig,
+ prompt: str,
+ step_name: str,
+ worktree_path: Path,
+ env: Optional[dict[str, str]] = None,
+ timeout: int | None = None,
+ quiet: bool = False,
+) -> AgentResult:
+ """Invoke an agent in agentic mode (no -p, runs in worktree, captures git diff).
+
+ The agent runs without print mode so it can modify files directly.
+ After the agent exits, git diff (since last commit) is captured as the output.
+ """
+ from cross_eval.worktree import capture_diff
+
+ # Write prompt to a temp file (outside worktree, won't appear in diffs)
+ import tempfile
+ task_fd, task_path = tempfile.mkstemp(suffix=".md", prefix="cross_eval_task_")
+ task_file = Path(task_path)
+ task_file.write_text(prompt, encoding="utf-8")
+ os.close(task_fd)
+
+ cmd = [agent.command]
+ if agent.reasoning_effort and _supports_reasoning_effort(agent.command):
+ cmd.extend(["-c", f'model_reasoning_effort="{agent.reasoning_effort}"'])
+
+ # Strip stdin sentinel ("-") from args for agentic mode
+ args = [a for a in agent.args if a != "-"]
+ cmd.extend(args)
+
+ # System prompt via flag if supported
+ if agent.system_prompt and _supports_system_prompt_flag(agent.command):
+ cmd.extend(["--system-prompt", agent.system_prompt])
+
+ # Deliver the prompt differently per agent type
+ is_codex = "codex" in agent.command
+ input_data: str | None = None
+ if is_codex:
+ # codex: stdin mode
+ cmd.append("-")
+ if agent.system_prompt and not _supports_system_prompt_flag(agent.command):
+ input_data = f"\n{agent.system_prompt}\n\n\n{prompt}"
+ else:
+ input_data = prompt
+ else:
+ # claude: use positional arg with a pointer to the task file
+ # (avoids OS arg length limits for large prompts)
+ cmd.append(
+ f"Read the task file at {task_file} and execute all instructions in it. "
+ f"Work in the current directory."
+ )
+
+ logger.debug(
+ "Invoking agent '%s' (agentic) in worktree: %s",
+ agent.name, worktree_path,
+ )
+
+ spinner: Optional[_Spinner] = None
+ if not quiet:
+ logger.info(" cmd: %s (agentic)", " ".join(cmd[:6]))
+ spinner = _Spinner(f"[{step_name}] {agent.name} (agentic) running...")
+ spinner.start()
+
+ try:
+ start = time.monotonic()
+ result = subprocess.run(
+ cmd,
+ input=input_data,
+ capture_output=True,
+ text=True,
+ timeout=timeout,
+ cwd=worktree_path,
+ env=env,
+ )
+ duration = time.monotonic() - start
+ except subprocess.TimeoutExpired:
+ if spinner:
+ spinner.stop(f"[{step_name}] TIMEOUT after {timeout}s")
+ raise
+ except Exception:
+ if spinner:
+ spinner.stop(f"[{step_name}] ERROR")
+ raise
+ finally:
+ # Clean up temp task file (it's in /tmp, not in worktree)
+ task_file.unlink(missing_ok=True)
+
if result.returncode != 0:
if spinner:
spinner.stop(f"[{step_name}] FAILED (exit {result.returncode})")
@@ -222,17 +412,22 @@ def invoke_agent(
suggested_action=suggested_action,
)
- if spinner:
- spinner.stop(f"[{step_name}] done — {chars} chars")
+ # Capture git diff as the output (changes since last commit on the branch)
+ diff_output = capture_diff(worktree_path)
- if not output:
+ if not diff_output:
+ diff_output = "(no changes)"
logger.warning(
- "Agent '%s' produced empty output at step '%s'",
+ "Agent '%s' made no file changes at step '%s'",
agent.name, step_name,
)
+ chars = len(diff_output)
+ if spinner:
+ spinner.stop(f"[{step_name}] done — {chars} chars (agentic)")
+
return AgentResult(
- output=output,
+ output=diff_output,
exit_code=result.returncode,
agent_name=agent.name,
step_name=step_name,
diff --git a/cross_eval/cli.py b/cross_eval/cli.py
index 45d424a..7d10bb8 100644
--- a/cross_eval/cli.py
+++ b/cross_eval/cli.py
@@ -49,7 +49,7 @@ max_iterations: 3
language: {language}
# 결과 저장 경로
-output_dir: output
+output_dir: .cross-eval/output
# ─── 커스텀 에이전트 (선택) ────────────────────────────────────
# 기본 제공 에이전트를 덮어쓰거나 새 에이전트를 정의할 수 있습니다.
@@ -372,6 +372,14 @@ def main(argv: list[str] | None = None) -> int:
"--input", action="append", dest="inputs", metavar="KEY=PATH",
help="추가 입력 파일 (예: --input spec=./api-spec.md)",
)
+ input_group.add_argument(
+ "--env-file", action="append", dest="env_files", type=Path, default=None,
+ help="에이전트 subprocess에 주입할 추가 .env 파일 (여러 개 가능)",
+ )
+ input_group.add_argument(
+ "--target", action="append", dest="execution_targets", default=None,
+ help="에이전트에게 강조할 실행 대상 힌트 (예: clickhouse, postgres)",
+ )
# -- 에이전트 설정 --
agent_group = run_parser.add_argument_group(
@@ -410,6 +418,10 @@ def main(argv: list[str] | None = None) -> int:
choices=REASONING_EFFORT_CHOICES + ("extra-high", "extra_high", "x-high"),
help="Senior용 reasoning effort",
)
+ agent_group.add_argument(
+ "--agentic", action="store_true", default=False,
+ help="Coder를 agentic 모드로 실행 (worktree에서 파일 직접 수정, git diff로 결과 캡처)",
+ )
agent_group.add_argument(
"--model", default=None, metavar="MODEL",
help="모든 에이전트의 모델을 한번에 변경 (예: sonnet, opus)",
@@ -761,7 +773,7 @@ def _generate_guided_config(
"",
f"max_iterations: {settings['max_iter']}",
f"language: {lang}",
- "output_dir: output",
+ "output_dir: .cross-eval/output",
"",
])
@@ -799,20 +811,19 @@ def _apply_model_override(config, agent_name: str, model: str) -> None:
def _apply_phased_iteration_override(config, max_iter: int | None) -> None:
"""Apply CLI max-iter to converging phases while preserving setup phases."""
- if max_iter is None:
- return
+ from cross_eval.config import sync_phased_iterations
- for phase in config.phases:
- if any(step.verdict for step in phase.steps):
- phase.max_iterations = max_iter
+ sync_phased_iterations(config, max_iter)
def cmd_run(args: argparse.Namespace) -> int:
"""Load config, validate, and execute the pipeline."""
from cross_eval.config import (
+ ensure_fix_preset_agentic,
apply_input_overrides,
default_config,
load_config,
+ sync_phased_iterations,
validate_config,
)
from cross_eval.prompts import PIPELINE_PRESETS
@@ -917,6 +928,10 @@ def cmd_run(args: argparse.Namespace) -> int:
if preset in {"plan-review", "review-only"} and args.max_iter is None and args.min_iter is None:
config.max_iterations = 1
+ sync_phased_iterations(config)
+ if args.max_iter is not None:
+ sync_phased_iterations(config, args.max_iter)
+
apply_reasoning_effort_settings(
config,
reasoning_effort=args.reasoning_effort,
@@ -925,6 +940,15 @@ def cmd_run(args: argparse.Namespace) -> int:
senior_effort=args.senior_effort,
)
+ # --agentic: convert coder agents to agentic mode
+ if args.agentic:
+ from cross_eval.config import _make_agentic
+ for coder_name in config.coders:
+ if coder_name in config.agents:
+ _make_agentic(config.agents[coder_name])
+
+ ensure_fix_preset_agentic(config)
+
# --model: apply to ALL agents
if args.model is not None:
for agent_name in config.agents:
@@ -958,6 +982,17 @@ def cmd_run(args: argparse.Namespace) -> int:
return 1
config.inputs["docs"] = docs_content
+ if args.env_files:
+ for env_file in args.env_files:
+ resolved = env_file.resolve()
+ if not resolved.exists():
+ print(f"Env file not found: {resolved}", file=sys.stderr)
+ return 1
+ config.execution.env_files.append(str(resolved))
+
+ if args.execution_targets:
+ config.execution.auto_context_targets = list(args.execution_targets)
+
if args.inputs:
overrides = {}
for item in args.inputs:
diff --git a/cross_eval/config.py b/cross_eval/config.py
index c9751f8..3fa73d4 100644
--- a/cross_eval/config.py
+++ b/cross_eval/config.py
@@ -1,6 +1,7 @@
"""Configuration loading, validation, and preset resolution."""
from __future__ import annotations
+import copy
import logging
import re
from pathlib import Path
@@ -8,7 +9,13 @@ from typing import Any
import yaml
-from cross_eval.models import AgentConfig, PhaseConfig, PipelineConfig, StepConfig
+from cross_eval.models import (
+ AgentConfig,
+ ExecutionConfig,
+ PhaseConfig,
+ PipelineConfig,
+ StepConfig,
+)
from cross_eval.prompts import PHASED_PRESETS, PIPELINE_PRESETS
logger = logging.getLogger(__name__)
@@ -24,6 +31,7 @@ DEFAULT_ROLE_REASONING_EFFORTS = {
"reviewer": "medium",
"senior": "high",
}
+FIX_STYLE_PRESETS = {"review-fix", "coding-review-fix"}
# ---------------------------------------------------------------------------
@@ -54,7 +62,12 @@ _CLAUDE_CODER_ARGS = list(_CLAUDE_BASE_ARGS) + [
"bypassPermissions",
]
-_CLAUDE_REVIEW_ARGS = list(_CLAUDE_BASE_ARGS) + [
+_CLAUDE_REVIEW_ARGS = [
+ "--setting-sources",
+ "user",
+ "--disable-slash-commands",
+ "--model",
+ "opus",
"--permission-mode",
"plan",
]
@@ -64,29 +77,37 @@ _CODER_SYSTEM_PROMPT = (
"Rules:\n"
"1. FIRST explore the project directory to understand the existing codebase, "
"patterns, and conventions before writing any code.\n"
- "2. Implement ONLY what the plan specifies. Do NOT add extra features, "
+ "2. You may decide which shell, Python, git, docker, test, and database commands "
+ "to run. The user does not need to pre-specify exact commands.\n"
+ "3. Environment variables from configured .env files may already be loaded into "
+ "your process; use them when validating services such as ClickHouse.\n"
+ "4. Implement ONLY what the plan specifies. Do NOT add extra features, "
"unnecessary abstractions, premature optimizations, or \"nice-to-have\" improvements.\n"
- "3. Follow the project's existing coding style, naming conventions, and directory structure.\n"
- "4. If previous review feedback is provided, fix ONLY the specific issues mentioned. "
+ "5. Follow the project's existing coding style, naming conventions, and directory structure.\n"
+ "6. If previous review feedback is provided, fix ONLY the specific issues mentioned. "
"Do NOT refactor unrelated code.\n"
- "5. Ignore any items from previous feedback that were marked as DISMISSED or false positive.\n"
- "6. When in doubt about scope, do LESS, not more."
+ "7. Ignore any items from previous feedback that were marked as DISMISSED or false positive.\n"
+ "8. When in doubt about scope, do LESS, not more."
)
_REVIEWER_SYSTEM_PROMPT = (
"You are a code reviewer. You MUST NOT create, modify, or delete any files.\n"
"Rules:\n"
"1. Explore the project directory to understand the full codebase context.\n"
- "2. Compare the implementation against the plan and checklist ONLY.\n"
- "3. Classify every issue with BOTH severity AND category:\n"
+ "2. You may decide which shell, Python, test, git, docker, and database read commands "
+ "to run in order to verify behavior. The user does not need to pre-specify exact commands.\n"
+ "3. Environment variables from configured .env files may already be loaded into "
+ "your process; use them for verification when relevant.\n"
+ "4. Compare the implementation against the plan and checklist ONLY.\n"
+ "5. Classify every issue with BOTH severity AND category:\n"
" - Severity: Critical (breaks functionality/security) > Major (requirement mismatch) > Minor (convention/style)\n"
" - Category: Over-engineering / Omission\n"
- "4. When reviewing with previous feedback, mark items as CONFIRMED (still an issue) "
+ "6. When reviewing with previous feedback, mark items as CONFIRMED (still an issue) "
"or DISMISSED (false positive) with rationale.\n"
- "5. Report out-of-scope issues separately — problems found outside plan/checklist scope.\n"
- "6. Order issues by severity (Critical first).\n"
- "7. Do NOT suggest improvements beyond the plan scope.\n"
- "8. End with VERDICT: PASS (all requirements met, no over-engineering) "
+ "7. Report out-of-scope issues separately — problems found outside plan/checklist scope.\n"
+ "8. Order issues by severity (Critical first).\n"
+ "9. Do NOT suggest improvements beyond the plan scope.\n"
+ "10. End with VERDICT: PASS (all requirements met, no over-engineering) "
"or VERDICT: FAIL (issues found)."
)
@@ -94,16 +115,20 @@ _SENIOR_SYSTEM_PROMPT = (
"You are a senior technical reviewer coordinating a review-fix-verification loop.\n"
"Rules:\n"
"1. Explore the project directory to understand the full codebase context.\n"
- "2. In aggregation mode, deduplicate overlaps, resolve disagreements, and keep only "
+ "2. You may decide which shell, Python, test, git, docker, and database read commands "
+ "to run to verify disputed issues. The user does not need to pre-specify exact commands.\n"
+ "3. Environment variables from configured .env files may already be loaded into "
+ "your process; use them when validating service integrations.\n"
+ "4. In aggregation mode, deduplicate overlaps, resolve disagreements, and keep only "
"evidence-backed issues. Categorize dismissed findings as [False positive] or [Already fixed].\n"
- "3. In verification mode, judge the current implementation directly against ONLY the "
+ "5. In verification mode, judge the current implementation directly against ONLY the "
"plan and checklist.\n"
- "4. Be skeptical of false positives, but do not lower the bar on real requirement "
+ "6. Be skeptical of false positives, but do not lower the bar on real requirement "
"gaps.\n"
- "5. When issues remain, produce a concise prioritized action list the coder can act on.\n"
- "6. Maintain an Issue Tracker table across iterations to track issue status.\n"
- "7. Do NOT invent new requirements beyond the plan and checklist.\n"
- "8. End with one of three verdicts:\n"
+ "7. When issues remain, produce a concise prioritized action list the coder can act on.\n"
+ "8. Maintain an Issue Tracker table across iterations to track issue status.\n"
+ "9. Do NOT invent new requirements beyond the plan and checklist.\n"
+ "10. End with one of three verdicts:\n"
" - VERDICT: PASS — all requirements met, no issues remain.\n"
" - VERDICT: FAIL — issues found that the coder can fix.\n"
" - VERDICT: ESCALATE — issues that require human intervention. Use ESCALATE when:\n"
@@ -263,7 +288,7 @@ def _resolve_agents(
for name in all_referenced:
if name not in result and name in BUILTIN_AGENTS:
- result[name] = BUILTIN_AGENTS[name]
+ result[name] = copy.deepcopy(BUILTIN_AGENTS[name])
return result
@@ -354,15 +379,16 @@ def _apply_role_effort(
def default_config() -> PipelineConfig:
"""Return a PipelineConfig with sensible defaults (no YAML needed)."""
- agents = dict(BUILTIN_AGENTS)
+ agents = copy.deepcopy(BUILTIN_AGENTS)
coders = ["claude-coder"]
reviewers = ["claude-reviewer"]
seniors: list[str] = []
pipeline = PIPELINE_PRESETS["simple"](coders, reviewers, seniors)
return PipelineConfig(
- output_dir=Path("output"),
+ output_dir=Path(".cross-eval/output"),
max_iterations=3,
language="ko",
+ execution=ExecutionConfig(),
inputs={},
agents=agents,
coders=coders,
@@ -406,6 +432,7 @@ def _parse_raw(raw: dict[str, Any], config_path: Path) -> PipelineConfig:
system_prompt=agent_data.get("system_prompt"),
reasoning_effort=agent_data.get("reasoning_effort"),
stdin_mode=agent_data.get("stdin_mode", False),
+ agentic=agent_data.get("agentic", False),
)
# --- roles: explicit or inferred ---
@@ -445,6 +472,17 @@ def _parse_raw(raw: dict[str, Any], config_path: Path) -> PipelineConfig:
p = config_dir / p
inputs[key] = p
+ execution_raw = raw.get("execution", {}) or {}
+ execution = ExecutionConfig(
+ mode=execution_raw.get("mode", "agent-decides"),
+ command_policy=execution_raw.get("command_policy", "broad"),
+ inherit_env=bool(execution_raw.get("inherit_env", True)),
+ auto_env_files=list(execution_raw.get("auto_env_files", [".env", ".env.local"])),
+ env_files=list(execution_raw.get("env_files", [])),
+ expose_env_names=bool(execution_raw.get("expose_env_names", True)),
+ auto_context_targets=list(execution_raw.get("auto_context_targets", [])),
+ )
+
# --- pipeline (preset or custom) ---
steps, phases = _resolve_pipeline(pipeline_raw, coders, reviewers, seniors)
@@ -453,12 +491,13 @@ def _parse_raw(raw: dict[str, Any], config_path: Path) -> PipelineConfig:
if isinstance(pipeline_raw, str) and pipeline_raw.startswith("preset:"):
preset_name = pipeline_raw.split(":", 1)[1]
- return PipelineConfig(
- output_dir=Path(raw.get("output_dir", "output")),
+ config = PipelineConfig(
+ output_dir=Path(raw.get("output_dir", ".cross-eval/output")),
max_iterations=int(raw.get("max_iterations", 3)),
min_iterations=int(raw.get("min_iterations", 1)),
verbose=bool(raw.get("verbose", False)),
language=raw.get("language", "en"),
+ execution=execution,
inputs=inputs,
agents=agents,
coders=coders,
@@ -470,6 +509,9 @@ def _parse_raw(raw: dict[str, Any], config_path: Path) -> PipelineConfig:
_config_path=config_path,
_config_mtime=config_path.stat().st_mtime,
)
+ sync_phased_iterations(config)
+ ensure_fix_preset_agentic(config)
+ return config
def try_reload_config(config: PipelineConfig) -> PipelineConfig:
@@ -619,6 +661,16 @@ def validate_config(config: PipelineConfig) -> list[str]:
if config.language not in ("en", "ko"):
errors.append(f"Unsupported language '{config.language}'. Use 'en' or 'ko'.")
+ if config.execution.mode not in {"agent-decides"}:
+ errors.append(
+ f"Unsupported execution.mode '{config.execution.mode}'. Use 'agent-decides'."
+ )
+ if config.execution.command_policy not in {"broad", "restricted"}:
+ errors.append(
+ "Unsupported execution.command_policy "
+ f"'{config.execution.command_policy}'. Use 'broad' or 'restricted'."
+ )
+
return errors
@@ -642,6 +694,37 @@ def _validate_unique_step_fields(
seen_output_keys.add(step.output_key)
+def _make_agentic(agent: AgentConfig) -> None:
+ """Convert an agent to agentic mode in-place (remove -p, set agentic=True)."""
+ agent.agentic = True
+ agent.args = [a for a in agent.args if a != "-p"]
+
+
+def sync_phased_iterations(
+ config: PipelineConfig,
+ max_iter: int | None = None,
+) -> None:
+ """Apply effective max iterations to converging phases while preserving setup phases."""
+ if not config.phases:
+ return
+
+ effective_max_iter = config.max_iterations if max_iter is None else max_iter
+ for phase in config.phases:
+ if any(step.verdict for step in phase.steps):
+ phase.max_iterations = effective_max_iter
+
+
+def ensure_fix_preset_agentic(config: PipelineConfig) -> None:
+ """Fix-style presets should modify code, so coders run agentically by default."""
+ if config.preset_name not in FIX_STYLE_PRESETS:
+ return
+
+ for coder_name in config.coders:
+ agent = config.agents.get(coder_name)
+ if agent is not None and not agent.agentic:
+ _make_agentic(agent)
+
+
def apply_input_overrides(
config: PipelineConfig, overrides: dict[str, str]
) -> None:
diff --git a/cross_eval/demo.py b/cross_eval/demo.py
index f02ce9f..ee8ffa2 100644
--- a/cross_eval/demo.py
+++ b/cross_eval/demo.py
@@ -265,7 +265,7 @@ def run_live_demo(
checklist_path.write_text(DEMO_CHECKLIST, encoding="utf-8")
config = PipelineConfig(
- output_dir=Path("output"),
+ output_dir=Path(".cross-eval/output"),
max_iterations=3,
language="en",
inputs={"plan": plan_path, "checklist": checklist_path},
diff --git a/cross_eval/models.py b/cross_eval/models.py
index 8fa29ad..45b4066 100644
--- a/cross_eval/models.py
+++ b/cross_eval/models.py
@@ -16,6 +16,7 @@ class AgentConfig:
system_prompt: Optional[str] = None
reasoning_effort: Optional[str] = None
stdin_mode: bool = False
+ agentic: bool = False # run in worktree, capture git diff instead of stdout
@dataclass
@@ -43,15 +44,29 @@ class PhaseConfig:
consecutive_pass: int = 1 # stop after N consecutive PASSes
+@dataclass
+class ExecutionConfig:
+ """Runtime execution policy for agent subprocesses."""
+
+ mode: str = "agent-decides"
+ command_policy: str = "broad"
+ inherit_env: bool = True
+ auto_env_files: list[str] = field(default_factory=lambda: [".env", ".env.local"])
+ env_files: list[str] = field(default_factory=list)
+ expose_env_names: bool = True
+ auto_context_targets: list[str] = field(default_factory=list)
+
+
@dataclass
class PipelineConfig:
"""Full cross-eval configuration."""
- output_dir: Path = field(default_factory=lambda: Path("output"))
+ output_dir: Path = field(default_factory=lambda: Path(".cross-eval/output"))
max_iterations: int = 3
min_iterations: int = 1
verbose: bool = False
language: str = "en" # "en" or "ko"
+ execution: ExecutionConfig = field(default_factory=ExecutionConfig)
inputs: dict[str, Path | str] = field(default_factory=dict)
agents: dict[str, AgentConfig] = field(default_factory=dict)
coders: list[str] = field(default_factory=list)
@@ -118,3 +133,4 @@ class PipelineResult:
run_dir: Optional[Path] = None
repeated_aggregate_warnings: list[str] = field(default_factory=list)
escalated_issues: list[str] = field(default_factory=list)
+ agentic_branch: Optional[str] = None
diff --git a/cross_eval/pipeline.py b/cross_eval/pipeline.py
index 7981cfe..b31fc8d 100644
--- a/cross_eval/pipeline.py
+++ b/cross_eval/pipeline.py
@@ -10,9 +10,11 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
-from cross_eval.agent import AgentInvocationError, invoke_agent
+from cross_eval.agent import AgentInvocationError, invoke_agent, invoke_agent_agentic
+from cross_eval.worktree import WorktreeError
from cross_eval.config import try_reload_config
from cross_eval.models import (
+ AgentConfig,
AgentResult,
IterationResult,
PipelineConfig,
@@ -21,6 +23,11 @@ from cross_eval.models import (
)
from cross_eval.prompts import render_template, resolve_template, set_language
from cross_eval.report import build_report
+from cross_eval.runtime_env import (
+ build_execution_policy,
+ build_runtime_environment,
+ summarize_environment,
+)
logger = logging.getLogger(__name__)
@@ -48,6 +55,104 @@ def _make_run_dir(config: PipelineConfig) -> Path:
return run_dir
+def _commit_iteration(
+ worktree_path: Path,
+ label: str,
+ iteration: int,
+ verdict: str | None,
+) -> None:
+ """Intermediate commit after each agentic iteration.
+
+ This resets the diff baseline so the next iteration only captures new changes.
+ """
+ from cross_eval.worktree import commit_worktree
+ committed = commit_worktree(
+ worktree_path,
+ f"cross-eval: {label} v{iteration} ({verdict or 'no-verdict'})",
+ )
+ if committed:
+ logger.debug(" Intermediate commit: v%d (%s)", iteration, verdict)
+
+
+def _has_agentic_steps(config: PipelineConfig, steps: list[StepConfig]) -> bool:
+ """Check if any step uses an agentic agent."""
+ return any(
+ config.agents.get(s.agent, AgentConfig(name="", command="")).agentic
+ for s in steps
+ )
+
+
+def _setup_worktree(cwd: Path, run_dir: Path, preset_name: str) -> tuple[Path, str]:
+ """Create a shared worktree for the entire pipeline run.
+
+ 1. Generate branch name (cross-eval/_)
+ 2. Create branch from HEAD
+ 3. Create worktree on that branch
+
+ Returns (worktree_path, branch_name).
+ """
+ from cross_eval.worktree import create_worktree, make_branch_name
+ branch_name = make_branch_name(preset_name)
+ worktree_dir = run_dir / "work"
+ worktree_path = create_worktree(
+ base_cwd=cwd, work_dir=worktree_dir, branch_name=branch_name,
+ )
+ return worktree_path, branch_name
+
+
+def _finalize_worktree(
+ cwd: Path,
+ worktree_path: Path,
+ branch_name: str,
+ preset_name: str,
+ final_verdict: str,
+) -> str | None:
+ """Commit changes on the branch, then remove the worktree.
+
+ The branch survives worktree removal and stays in the original repo.
+ Returns the branch name if changes were committed, None otherwise.
+ """
+ from cross_eval.worktree import commit_worktree, remove_worktree
+
+ committed = False
+ try:
+ committed = commit_worktree(
+ worktree_path,
+ f"cross-eval: {preset_name} ({final_verdict})",
+ )
+ if committed:
+ logger.info(" Agentic changes committed on branch: %s", branch_name)
+ else:
+ logger.warning(" No agentic changes to commit (empty diff)")
+ except Exception:
+ logger.warning(" Failed to commit agentic changes", exc_info=True)
+
+ try:
+ remove_worktree(base_cwd=cwd, work_dir=worktree_path)
+ except Exception:
+ logger.warning("Failed to clean up worktree: %s", worktree_path)
+
+ # Check if branch has any commits beyond the base — if not, delete it
+ if not committed:
+ try:
+ # Check if branch has diverged from its base
+ result = subprocess.run(
+ ["git", "log", "--oneline", f"HEAD..{branch_name}"],
+ cwd=cwd, capture_output=True, text=True,
+ )
+ if not result.stdout.strip():
+ # No commits on branch beyond base — clean up
+ subprocess.run(
+ ["git", "branch", "-D", branch_name],
+ cwd=cwd, capture_output=True,
+ )
+ logger.info(" Deleted empty branch: %s", branch_name)
+ except Exception:
+ pass # best-effort cleanup
+
+ return branch_name if committed else None
+
+
def _run_simple_pipeline(
config: PipelineConfig,
run_dir: Path,
@@ -61,6 +166,15 @@ def _run_simple_pipeline(
set_language(config.language)
input_contents = _load_inputs(config)
+ runtime_env = _build_runtime_inputs(config, input_contents, cwd or Path(os.getcwd()))
+
+ # Setup shared worktree for agentic mode
+ worktree_path: Path | None = None
+ agentic_branch_name: str | None = None
+ if not dry_run and _has_agentic_steps(config, config.pipeline):
+ worktree_path, agentic_branch_name = _setup_worktree(
+ cwd, run_dir, config.preset_name,
+ )
feedback = "(no feedback — first iteration)"
iterations: list[IterationResult] = []
@@ -71,99 +185,114 @@ def _run_simple_pipeline(
escalated_issues: list[str] = []
all_feedbacks: list[str] = []
- for i in range(1, config.max_iterations + 1):
- config = try_reload_config(config)
- set_language(config.language)
- _refresh_inputs(config, input_contents)
+ try:
+ for i in range(1, config.max_iterations + 1):
+ config = try_reload_config(config)
+ set_language(config.language)
+ _refresh_inputs(config, input_contents)
+ runtime_env = _build_runtime_inputs(config, input_contents, cwd)
- logger.info("=" * 50)
- logger.info(" Iteration %d/%d", i, config.max_iterations)
- logger.info("=" * 50)
+ logger.info("=" * 50)
+ logger.info(" Iteration %d/%d", i, config.max_iterations)
+ logger.info("=" * 50)
- step_outputs, step_results, verdict = _run_steps(
- config.pipeline, config, input_contents, feedback,
- i, config.max_iterations, cwd, timeout, dry_run,
- run_dir=run_dir, output_iter=i,
- )
+ step_outputs, step_results, verdict = _run_steps(
+ config.pipeline, config, input_contents, feedback,
+ i, config.max_iterations, cwd, timeout, dry_run,
+ run_dir=run_dir, output_iter=i,
+ worktree_path=worktree_path,
+ runtime_env=runtime_env,
+ )
- iter_result = IterationResult(
- iteration=i,
- step_results=step_results,
- step_outputs=step_outputs,
- verdict=verdict,
- )
- warning = _detect_repeated_aggregate(
- config.pipeline, step_outputs, aggregate_history, iteration=i,
- )
- if warning:
- iter_result.repeated_aggregate_warning = warning
- aggregate_warnings.append(warning)
- logger.warning(" %s", warning)
+ # Intermediate commit so next iteration's diff only shows new changes
+ if worktree_path is not None:
+ _commit_iteration(worktree_path, config.preset_name, i, verdict)
- iter_result.feedback = _collect_feedback(config.pipeline, step_outputs)
- feedback = iter_result.feedback or feedback
- all_feedbacks.append(feedback)
+ iter_result = IterationResult(
+ iteration=i,
+ step_results=step_results,
+ step_outputs=step_outputs,
+ verdict=verdict,
+ )
+ warning = _detect_repeated_aggregate(
+ config.pipeline, step_outputs, aggregate_history, iteration=i,
+ )
+ if warning:
+ iter_result.repeated_aggregate_warning = warning
+ aggregate_warnings.append(warning)
+ logger.warning(" %s", warning)
- # Extract tracker from verdict/review steps for next iteration
- for step in config.pipeline:
- if step.verdict or step.role == "review":
- tracker = _extract_senior_tracker(
- step_outputs.get(step.output_key, ""),
- )
- if tracker:
- input_contents["previous_senior_tracker"] = tracker
+ iter_result.feedback = _collect_feedback(config.pipeline, step_outputs)
+ feedback = iter_result.feedback or feedback
+ all_feedbacks.append(feedback)
- iterations.append(iter_result)
-
- # ESCALATE check (highest priority)
- if verdict == "ESCALATE":
- final_verdict = "ESCALATE"
- # Extract escalation details from verdict step outputs
+ # Extract tracker from verdict/review steps for next iteration
for step in config.pipeline:
- if step.verdict:
- esc = _extract_escalated_issues(
+ if step.verdict or step.role == "review":
+ tracker = _extract_senior_tracker(
step_outputs.get(step.output_key, ""),
)
- if esc:
- escalated_issues.append(esc)
- iter_result.escalated_issues = esc
- logger.info(" ESCALATE at iteration %d — stopping loop.", i)
- break
+ if tracker:
+ input_contents["previous_senior_tracker"] = tracker
- if verdict == "PASS":
- final_verdict = "PASS"
- if i >= config.min_iterations:
- logger.info(" PASS at iteration %d (min=%d reached)!", i, config.min_iterations)
+ iterations.append(iter_result)
+
+ # ESCALATE check (highest priority)
+ if verdict == "ESCALATE":
+ final_verdict = "ESCALATE"
+ for step in config.pipeline:
+ if step.verdict:
+ esc = _extract_escalated_issues(
+ step_outputs.get(step.output_key, ""),
+ )
+ if esc:
+ escalated_issues.append(esc)
+ iter_result.escalated_issues = esc
+ logger.info(" ESCALATE at iteration %d — stopping loop.", i)
break
- else:
- logger.info(
- " PASS at iteration %d, but min_iterations=%d — continuing",
- i, config.min_iterations,
- )
- # Auto-escalate: no senior/aggregator + repeated FAIL
- has_aggregator = config.seniors or any(
- s.prompt_template == "default:aggregate-review" for s in config.pipeline
- )
- if (
- verdict == "FAIL"
- and not has_aggregator
- and i >= 2
- and _detect_auto_escalate(all_feedbacks[:-1], feedback)
- ):
- final_verdict = "ESCALATE"
- auto_msg = (
- f"Auto-escalated: same issues detected across {i} iterations "
- f"without resolution (no senior reviewer configured)."
+ if verdict == "PASS":
+ final_verdict = "PASS"
+ if i >= config.min_iterations:
+ logger.info(" PASS at iteration %d (min=%d reached)!", i, config.min_iterations)
+ break
+ else:
+ logger.info(
+ " PASS at iteration %d, but min_iterations=%d — continuing",
+ i, config.min_iterations,
+ )
+
+ # Auto-escalate: no senior/aggregator + repeated FAIL
+ has_aggregator = config.seniors or any(
+ s.prompt_template == "default:aggregate-review" for s in config.pipeline
)
- escalated_issues.append(auto_msg)
- iter_result.escalated_issues = auto_msg
- logger.info(" AUTO-ESCALATE at iteration %d", i)
- break
+ if (
+ verdict == "FAIL"
+ and not has_aggregator
+ and i >= 2
+ and _detect_auto_escalate(all_feedbacks[:-1], feedback)
+ ):
+ final_verdict = "ESCALATE"
+ auto_msg = (
+ f"Auto-escalated: same issues detected across {i} iterations "
+ f"without resolution (no senior reviewer configured)."
+ )
+ escalated_issues.append(auto_msg)
+ iter_result.escalated_issues = auto_msg
+ logger.info(" AUTO-ESCALATE at iteration %d", i)
+ break
- if dry_run:
- logger.info(" (dry-run: stopping after iteration 1)")
- break
+ if dry_run:
+ logger.info(" (dry-run: stopping after iteration 1)")
+ break
+
+ finally:
+ agentic_branch: str | None = None
+ if worktree_path is not None and agentic_branch_name is not None:
+ agentic_branch = _finalize_worktree(
+ cwd, worktree_path, agentic_branch_name,
+ config.preset_name, final_verdict,
+ )
total_duration = time.monotonic() - start_time
@@ -174,6 +303,7 @@ def _run_simple_pipeline(
run_dir=run_dir,
repeated_aggregate_warnings=aggregate_warnings,
escalated_issues=escalated_issues,
+ agentic_branch=agentic_branch,
)
if not dry_run:
@@ -195,6 +325,16 @@ def _run_phased_pipeline(
set_language(config.language)
input_contents = _load_inputs(config)
+ runtime_env = _build_runtime_inputs(config, input_contents, cwd)
+
+ # Setup shared worktree for agentic mode
+ all_phase_steps = [s for p in config.phases for s in p.steps]
+ worktree_path: Path | None = None
+ agentic_branch_name: str | None = None
+ if not dry_run and _has_agentic_steps(config, all_phase_steps):
+ worktree_path, agentic_branch_name = _setup_worktree(
+ cwd, run_dir, config.preset_name,
+ )
iterations: list[IterationResult] = []
feedback = "(no feedback — first iteration)"
@@ -207,152 +347,171 @@ def _run_phased_pipeline(
all_feedbacks: list[str] = []
escalated = False
- for phase_idx, phase in enumerate(config.phases):
- if escalated:
- break
+ try:
+ for phase_idx, phase in enumerate(config.phases):
+ if escalated:
+ break
- logger.info("=" * 60)
- logger.info(
- " Phase: %s (max_iter=%d, consecutive_pass=%d)",
- phase.name, phase.max_iterations, phase.consecutive_pass,
- )
- logger.info("=" * 60)
-
- consecutive_passes = 0
- phase_converged = False
-
- for pi in range(1, phase.max_iterations + 1):
- global_iter += 1
-
- config = try_reload_config(config)
- set_language(config.language)
- _refresh_inputs(config, input_contents)
-
- logger.info("-" * 50)
+ logger.info("=" * 60)
logger.info(
- " [%s] Iteration %d/%d (global: v%d)",
- phase.name, pi, phase.max_iterations, global_iter,
+ " Phase: %s (max_iter=%d, consecutive_pass=%d)",
+ phase.name, phase.max_iterations, phase.consecutive_pass,
)
- logger.info("-" * 50)
+ logger.info("=" * 60)
- step_outputs, step_results, verdict = _run_steps(
- phase.steps, config, input_contents, feedback,
- pi, phase.max_iterations, cwd, timeout, dry_run,
- run_dir=run_dir, output_iter=global_iter, phase_name=phase.name,
- )
+ consecutive_passes = 0
+ phase_converged = False
- iter_result = IterationResult(
- iteration=global_iter,
- step_results=step_results,
- step_outputs=step_outputs,
- verdict=verdict,
- phase_name=phase.name,
- )
- phase_history = aggregate_history_by_phase.setdefault(phase.name, {})
- warning = _detect_repeated_aggregate(
- phase.steps, step_outputs, phase_history, iteration=global_iter,
- phase_name=phase.name,
- )
- if warning:
- iter_result.repeated_aggregate_warning = warning
- aggregate_warnings.append(warning)
- logger.warning(" %s", warning)
+ for pi in range(1, phase.max_iterations + 1):
+ global_iter += 1
- iter_result.feedback = _collect_feedback(phase.steps, step_outputs)
- feedback = iter_result.feedback or feedback
- all_feedbacks.append(feedback)
+ config = try_reload_config(config)
+ set_language(config.language)
+ _refresh_inputs(config, input_contents)
+ runtime_env = _build_runtime_inputs(config, input_contents, cwd)
- # Extract tracker from verdict/review steps
- for step in phase.steps:
- if step.verdict or step.role == "review":
- tracker = _extract_senior_tracker(
- step_outputs.get(step.output_key, ""),
+ logger.info("-" * 50)
+ logger.info(
+ " [%s] Iteration %d/%d (global: v%d)",
+ phase.name, pi, phase.max_iterations, global_iter,
+ )
+ logger.info("-" * 50)
+
+ step_outputs, step_results, verdict = _run_steps(
+ phase.steps, config, input_contents, feedback,
+ pi, phase.max_iterations, cwd, timeout, dry_run,
+ run_dir=run_dir, output_iter=global_iter, phase_name=phase.name,
+ worktree_path=worktree_path,
+ runtime_env=runtime_env,
+ )
+
+ # Intermediate commit so next iteration's diff only shows new changes
+ if worktree_path is not None:
+ _commit_iteration(
+ worktree_path, f"{config.preset_name}/{phase.name}",
+ global_iter, verdict,
)
- if tracker:
- input_contents["previous_senior_tracker"] = tracker
- iterations.append(iter_result)
+ iter_result = IterationResult(
+ iteration=global_iter,
+ step_results=step_results,
+ step_outputs=step_outputs,
+ verdict=verdict,
+ phase_name=phase.name,
+ )
+ phase_history = aggregate_history_by_phase.setdefault(phase.name, {})
+ warning = _detect_repeated_aggregate(
+ phase.steps, step_outputs, phase_history, iteration=global_iter,
+ phase_name=phase.name,
+ )
+ if warning:
+ iter_result.repeated_aggregate_warning = warning
+ aggregate_warnings.append(warning)
+ logger.warning(" %s", warning)
- # ESCALATE check
- if verdict == "ESCALATE":
- final_verdict = "ESCALATE"
+ iter_result.feedback = _collect_feedback(phase.steps, step_outputs)
+ feedback = iter_result.feedback or feedback
+ all_feedbacks.append(feedback)
+
+ # Extract tracker from verdict/review steps
for step in phase.steps:
- if step.verdict:
- esc = _extract_escalated_issues(
+ if step.verdict or step.role == "review":
+ tracker = _extract_senior_tracker(
step_outputs.get(step.output_key, ""),
)
- if esc:
- escalated_issues.append(esc)
- iter_result.escalated_issues = esc
- logger.info(
- " [%s] ESCALATE at iteration %d — stopping.",
- phase.name, pi,
- )
- escalated = True
- break
+ if tracker:
+ input_contents["previous_senior_tracker"] = tracker
- if verdict is None:
- logger.info(
- " [%s] completed (no verdict step; single-pass phase)",
- phase.name,
- )
- phase_converged = True
- break
+ iterations.append(iter_result)
- if verdict == "PASS":
- consecutive_passes += 1
- logger.info(
- " [%s] PASS (%d/%d consecutive)",
- phase.name, consecutive_passes, phase.consecutive_pass,
- )
- if consecutive_passes >= phase.consecutive_pass:
+ # ESCALATE check
+ if verdict == "ESCALATE":
+ final_verdict = "ESCALATE"
+ for step in phase.steps:
+ if step.verdict:
+ esc = _extract_escalated_issues(
+ step_outputs.get(step.output_key, ""),
+ )
+ if esc:
+ escalated_issues.append(esc)
+ iter_result.escalated_issues = esc
logger.info(
- " [%s] Converged! %d consecutive PASSes.",
- phase.name, phase.consecutive_pass,
+ " [%s] ESCALATE at iteration %d — stopping.",
+ phase.name, pi,
+ )
+ escalated = True
+ break
+
+ if verdict is None:
+ logger.info(
+ " [%s] completed (no verdict step; single-pass phase)",
+ phase.name,
)
phase_converged = True
break
- else:
- consecutive_passes = 0
- # Auto-escalate in phased pipeline
- has_aggregator = config.seniors or any(
- s.prompt_template == "default:aggregate-review" for s in phase.steps
- )
- if (
- verdict == "FAIL"
- and not has_aggregator
- and pi >= 2
- and _detect_auto_escalate(all_feedbacks[:-1], feedback)
- ):
- final_verdict = "ESCALATE"
- auto_msg = (
- f"Auto-escalated: same issues detected across {pi} iterations "
- f"in phase '{phase.name}' without resolution."
+ if verdict == "PASS":
+ consecutive_passes += 1
+ logger.info(
+ " [%s] PASS (%d/%d consecutive)",
+ phase.name, consecutive_passes, phase.consecutive_pass,
+ )
+ if consecutive_passes >= phase.consecutive_pass:
+ logger.info(
+ " [%s] Converged! %d consecutive PASSes.",
+ phase.name, phase.consecutive_pass,
+ )
+ phase_converged = True
+ break
+ else:
+ consecutive_passes = 0
+
+ # Auto-escalate in phased pipeline
+ has_aggregator = config.seniors or any(
+ s.prompt_template == "default:aggregate-review" for s in phase.steps
)
- escalated_issues.append(auto_msg)
- iter_result.escalated_issues = auto_msg
- logger.info(" [%s] AUTO-ESCALATE at iteration %d", phase.name, pi)
- escalated = True
+ if (
+ verdict == "FAIL"
+ and not has_aggregator
+ and pi >= 2
+ and _detect_auto_escalate(all_feedbacks[:-1], feedback)
+ ):
+ final_verdict = "ESCALATE"
+ auto_msg = (
+ f"Auto-escalated: same issues detected across {pi} iterations "
+ f"in phase '{phase.name}' without resolution."
+ )
+ escalated_issues.append(auto_msg)
+ iter_result.escalated_issues = auto_msg
+ logger.info(" [%s] AUTO-ESCALATE at iteration %d", phase.name, pi)
+ escalated = True
+ break
+
+ if dry_run:
+ break
+
+ if escalated:
break
- if dry_run:
- break
+ if phase_converged:
+ logger.info(" Phase '%s' completed: CONVERGED", phase.name)
+ else:
+ logger.info(
+ " Phase '%s' completed: max iterations (%d) reached",
+ phase.name, phase.max_iterations,
+ )
- if escalated:
- break
+ if phase_idx == len(config.phases) - 1:
+ final_verdict = "PASS" if phase_converged else "MAX_ITERATIONS_REACHED"
- if phase_converged:
- logger.info(" Phase '%s' completed: CONVERGED", phase.name)
- else:
- logger.info(
- " Phase '%s' completed: max iterations (%d) reached",
- phase.name, phase.max_iterations,
+ finally:
+ agentic_branch: str | None = None
+ if worktree_path is not None and agentic_branch_name is not None:
+ agentic_branch = _finalize_worktree(
+ cwd, worktree_path, agentic_branch_name,
+ config.preset_name, final_verdict,
)
- if phase_idx == len(config.phases) - 1:
- final_verdict = "PASS" if phase_converged else "MAX_ITERATIONS_REACHED"
-
total_duration = time.monotonic() - start_time
pipeline_result = PipelineResult(
@@ -362,6 +521,7 @@ def _run_phased_pipeline(
run_dir=run_dir,
repeated_aggregate_warnings=aggregate_warnings,
escalated_issues=escalated_issues,
+ agentic_branch=agentic_branch,
)
if not dry_run:
@@ -463,6 +623,8 @@ def _run_steps(
run_dir: Path,
output_iter: int,
phase_name: str | None = None,
+ worktree_path: Path | None = None,
+ runtime_env: dict[str, str] | None = None,
) -> tuple[dict[str, str], dict[str, AgentResult], str | None]:
"""Execute all steps in one iteration, parallelizing where possible."""
step_outputs: dict[str, str] = {}
@@ -473,21 +635,23 @@ def _run_steps(
for batch in batches:
if len(batch) == 1:
- # Single step — run directly
step = batch[0]
_execute_step(
step, config, input_contents, feedback,
iteration, max_iterations, cwd, timeout, dry_run,
step_outputs, step_results,
- run_dir=run_dir, output_iter=output_iter, phase_name=phase_name,
+ run_dir=run_dir, output_iter=output_iter,
+ phase_name=phase_name, worktree_path=worktree_path,
+ runtime_env=runtime_env,
)
else:
- # Parallel batch — run with ThreadPoolExecutor
_execute_parallel_batch(
batch, config, input_contents, feedback,
iteration, max_iterations, cwd, timeout, dry_run,
step_outputs, step_results,
- run_dir=run_dir, output_iter=output_iter, phase_name=phase_name,
+ run_dir=run_dir, output_iter=output_iter,
+ phase_name=phase_name, worktree_path=worktree_path,
+ runtime_env=runtime_env,
)
# Extract verdict from all verdict steps (ALL must PASS; ESCALATE wins over all)
@@ -506,6 +670,25 @@ def _run_steps(
return step_outputs, step_results, verdict
+def _invoke_agentic(
+ agent_config: AgentConfig,
+ prompt: str,
+ step_name: str,
+ *,
+ worktree_path: Path,
+ env: dict[str, str] | None = None,
+ timeout: int | None = None,
+ quiet: bool = False,
+) -> AgentResult:
+ """Run an agent in agentic mode using an existing worktree."""
+ return invoke_agent_agentic(
+ agent_config, prompt, step_name,
+ worktree_path=worktree_path,
+ env=env,
+ timeout=timeout, quiet=quiet,
+ )
+
+
def _execute_step(
step: StepConfig,
config: PipelineConfig,
@@ -523,6 +706,8 @@ def _execute_step(
output_iter: int,
phase_name: str | None = None,
quiet: bool = False,
+ worktree_path: Path | None = None,
+ runtime_env: dict[str, str] | None = None,
) -> None:
"""Execute a single step, updating step_outputs and step_results in place."""
if not quiet:
@@ -542,6 +727,7 @@ def _execute_step(
# 4. Render prompt
prompt = render_template(template, context)
+ prompt = _augment_prompt_with_runtime_context(prompt, context)
# 5. Dry run: print and skip
if dry_run:
@@ -555,10 +741,21 @@ def _execute_step(
# 6. Invoke agent
agent_config = config.agents[step.agent]
try:
- result = invoke_agent(
- agent_config, prompt, step.name,
- cwd=cwd, timeout=timeout, quiet=quiet,
- )
+ if agent_config.agentic and worktree_path:
+ result = _invoke_agentic(
+ agent_config, prompt, step.name,
+ worktree_path=worktree_path,
+ env=runtime_env,
+ timeout=timeout, quiet=quiet,
+ )
+ else:
+ # When worktree exists, run non-agentic agents (reviewers) in
+ # the worktree too so they can inspect the modified files.
+ effective_cwd = worktree_path if worktree_path else cwd
+ result = invoke_agent(
+ agent_config, prompt, step.name,
+ cwd=effective_cwd, env=runtime_env, timeout=timeout, quiet=quiet,
+ )
except subprocess.TimeoutExpired as e:
stdout = (e.stdout or b"") if isinstance(e.stdout, bytes) else (e.stdout or "")
stderr = (e.stderr or b"") if isinstance(e.stderr, bytes) else (e.stderr or "")
@@ -625,6 +822,8 @@ def _execute_parallel_batch(
run_dir: Path,
output_iter: int,
phase_name: str | None = None,
+ worktree_path: Path | None = None,
+ runtime_env: dict[str, str] | None = None,
) -> None:
"""Execute multiple steps in parallel using threads."""
agent_names = ", ".join(s.agent for s in batch)
@@ -640,6 +839,26 @@ def _execute_parallel_batch(
)
return
+ # Agentic steps cannot run in parallel (they share a worktree)
+ agentic_in_batch = [
+ s for s in batch
+ if config.agents.get(s.agent, AgentConfig(name="", command="")).agentic
+ ]
+ if len(agentic_in_batch) > 1:
+ logger.warning(
+ " [parallel] %d agentic steps cannot run concurrently — running sequentially",
+ len(agentic_in_batch),
+ )
+ for step in batch:
+ _execute_step(
+ step, config, input_contents, feedback,
+ iteration, max_iterations, cwd, timeout, dry_run,
+ step_outputs, step_results,
+ run_dir=run_dir, output_iter=output_iter,
+ phase_name=phase_name, worktree_path=worktree_path,
+ )
+ return
+
# Snapshot context before parallel execution (all steps see same state)
context_snapshot = dict(input_contents)
context_snapshot.update(step_outputs)
@@ -666,12 +885,22 @@ def _execute_parallel_batch(
if step.context_override:
context = _apply_context_override(context, step.context_override)
prompt = render_template(template, context)
+ prompt = _augment_prompt_with_runtime_context(prompt, context)
agent_config = config.agents[step.agent]
- result = invoke_agent(
- agent_config, prompt, step.name,
- cwd=cwd, timeout=timeout, quiet=True,
- )
+ if agent_config.agentic and worktree_path:
+ result = _invoke_agentic(
+ agent_config, prompt, step.name,
+ worktree_path=worktree_path,
+ env=runtime_env,
+ timeout=timeout, quiet=True,
+ )
+ else:
+ effective_cwd = worktree_path if worktree_path else cwd
+ result = invoke_agent(
+ agent_config, prompt, step.name,
+ cwd=effective_cwd, env=runtime_env, timeout=timeout, quiet=True,
+ )
return step.output_key, result.output, result
with ThreadPoolExecutor(max_workers=len(batch)) as executor:
@@ -765,6 +994,35 @@ def _build_context(
return context
+def _build_runtime_inputs(
+ config: PipelineConfig,
+ input_contents: dict[str, str],
+ cwd: Path,
+) -> dict[str, str]:
+ """Load runtime env and expose safe execution hints to prompts."""
+ env, loaded_files, loaded_values = build_runtime_environment(config.execution, cwd)
+ input_contents["execution_policy"] = build_execution_policy(config.execution)
+ input_contents["environment_context"] = summarize_environment(
+ config.execution, loaded_files, env, loaded_values,
+ )
+ return env
+
+
+def _augment_prompt_with_runtime_context(
+ prompt: str,
+ context: dict[str, str],
+) -> str:
+ """Append execution/env guidance without requiring every template to include placeholders."""
+ extras: list[str] = []
+ if context.get("execution_policy"):
+ extras.append("## Execution Policy\n" + context["execution_policy"])
+ if context.get("environment_context"):
+ extras.append("## Environment Context\n" + context["environment_context"])
+ if not extras:
+ return prompt
+ return prompt.rstrip() + "\n\n" + "\n\n".join(extras) + "\n"
+
+
def _apply_context_override(
context: dict[str, str],
overrides: dict[str, str],
diff --git a/cross_eval/report.py b/cross_eval/report.py
index 9b29e2e..eda32ea 100644
--- a/cross_eval/report.py
+++ b/cross_eval/report.py
@@ -535,6 +535,10 @@ def _append_final_verdict(
lines.append("---\n")
lines.append(f"## {_t(config, 'final_verdict_title')}: {result.final_verdict}\n")
+ if result.agentic_branch:
+ lines.append(f"**Agentic branch**: `{result.agentic_branch}`")
+ lines.append(f"```bash\ngit checkout {result.agentic_branch}\n```\n")
+
if result.final_verdict == "PASS":
lines.append(_t(config, "pass_msg"))
elif result.final_verdict == "ESCALATE":
diff --git a/cross_eval/runtime_env.py b/cross_eval/runtime_env.py
new file mode 100644
index 0000000..5604585
--- /dev/null
+++ b/cross_eval/runtime_env.py
@@ -0,0 +1,152 @@
+"""Helpers for building agent runtime environments from .env files."""
+from __future__ import annotations
+
+import os
+from pathlib import Path
+
+from cross_eval.models import ExecutionConfig
+
+_SUMMARY_PREFIXES = (
+ "CLICKHOUSE",
+ "CH_",
+ "DB_",
+ "DATABASE",
+ "PG",
+ "POSTGRES",
+ "MYSQL",
+ "REDIS",
+ "AWS",
+ "S3",
+)
+
+
+def _strip_quotes(value: str) -> str:
+ if len(value) >= 2 and value[0] == value[-1] and value[0] in {"'", '"'}:
+ unwrapped = value[1:-1]
+ if value[0] == '"':
+ return bytes(unwrapped, "utf-8").decode("unicode_escape")
+ return unwrapped
+ return value
+
+
+def parse_dotenv(path: Path) -> dict[str, str]:
+ """Parse a simple dotenv file into key/value pairs."""
+ values: dict[str, str] = {}
+ for raw_line in path.read_text(encoding="utf-8").splitlines():
+ line = raw_line.strip()
+ if not line or line.startswith("#"):
+ continue
+ if line.startswith("export "):
+ line = line[len("export ") :].strip()
+ if "=" not in line:
+ continue
+ key, value = line.split("=", 1)
+ key = key.strip()
+ if not key:
+ continue
+ values[key] = _strip_quotes(value.strip())
+ return values
+
+
+def resolve_env_files(execution: ExecutionConfig, project_root: Path) -> list[Path]:
+ """Resolve and deduplicate configured env files under the project root."""
+ candidates: list[Path] = []
+ for raw in execution.env_files:
+ path = Path(raw)
+ if not path.is_absolute():
+ path = project_root / path
+ candidates.append(path)
+
+ for raw in execution.auto_env_files:
+ path = project_root / raw
+ candidates.append(path)
+
+ resolved: list[Path] = []
+ seen: set[Path] = set()
+ for path in candidates:
+ try:
+ normalized = path.resolve()
+ except OSError:
+ normalized = path
+ if normalized in seen or not normalized.exists() or not normalized.is_file():
+ continue
+ seen.add(normalized)
+ resolved.append(normalized)
+ return resolved
+
+
+def build_runtime_environment(
+ execution: ExecutionConfig,
+ project_root: Path,
+) -> tuple[dict[str, str], list[Path], dict[str, str]]:
+ """Build subprocess env plus metadata about loaded files and names."""
+ env = os.environ.copy() if execution.inherit_env else {}
+ loaded_files = resolve_env_files(execution, project_root)
+ loaded_values: dict[str, str] = {}
+ for path in loaded_files:
+ file_values = parse_dotenv(path)
+ loaded_values.update(file_values)
+ env.update(file_values)
+ return env, loaded_files, loaded_values
+
+
+def summarize_environment(
+ execution: ExecutionConfig,
+ loaded_files: list[Path],
+ env: dict[str, str],
+ loaded_values: dict[str, str],
+) -> str:
+ """Generate a safe environment summary for prompts without leaking secrets."""
+ lines: list[str] = []
+ if loaded_files:
+ joined = ", ".join(str(path) for path in loaded_files)
+ lines.append(f"Loaded env files into the agent process: {joined}")
+ else:
+ lines.append("No .env file was auto-loaded into the agent process.")
+
+ if execution.auto_context_targets:
+ lines.append(
+ "Execution targets hinted by the user: "
+ + ", ".join(execution.auto_context_targets)
+ )
+
+ if execution.expose_env_names:
+ visible_names = sorted(
+ {
+ key
+ for key in set(loaded_values) | set(env)
+ if key.startswith(_SUMMARY_PREFIXES)
+ or any(prefix in key for prefix in ("CLICKHOUSE", "DATABASE", "DB_"))
+ }
+ )
+ if visible_names:
+ lines.append("Relevant env var names available to commands: " + ", ".join(visible_names))
+ else:
+ lines.append("No DB/service env var names matched the default summary filters.")
+ else:
+ lines.append("Environment variable values are loaded but names are hidden from the prompt.")
+
+ wants_clickhouse = "clickhouse" in {target.lower() for target in execution.auto_context_targets}
+ clickhouse_keys = [key for key in env if "CLICKHOUSE" in key or key.startswith("CH_")]
+ if wants_clickhouse or clickhouse_keys:
+ if clickhouse_keys:
+ lines.append("ClickHouse-related environment variables are available to the agent.")
+ else:
+ lines.append("No ClickHouse-specific env vars were detected in the loaded environment.")
+
+ return "\n".join(lines)
+
+
+def build_execution_policy(execution: ExecutionConfig) -> str:
+ """Describe the execution latitude granted to agentic coders/reviewers."""
+ lines = [
+ f"Execution mode: {execution.mode}",
+ f"Command policy: {execution.command_policy}",
+ "The agent may choose shell, Python, git, docker, test, and database commands on its own when needed.",
+ "The user does not need to pre-specify exact commands.",
+ ]
+ if execution.command_policy == "broad":
+ lines.append("Prefer direct validation by running the minimum set of commands needed to prove a fix.")
+ else:
+ lines.append("Keep command usage minimal and focused on validation.")
+ return "\n".join(lines)
diff --git a/cross_eval/worktree.py b/cross_eval/worktree.py
new file mode 100644
index 0000000..dda710f
--- /dev/null
+++ b/cross_eval/worktree.py
@@ -0,0 +1,135 @@
+"""Git worktree lifecycle management for agentic mode."""
+from __future__ import annotations
+
+import logging
+import shutil
+import subprocess
+from datetime import datetime
+from pathlib import Path
+
+logger = logging.getLogger(__name__)
+
+
+class WorktreeError(RuntimeError):
+ """Error during worktree operations."""
+
+
+def make_branch_name(preset_name: str) -> str:
+ """Generate a branch name for agentic results."""
+ ts = datetime.now().strftime("%Y%m%d_%H%M%S")
+ return f"cross-eval/{preset_name}_{ts}"
+
+
+def create_worktree(base_cwd: Path, work_dir: Path, branch_name: str) -> Path:
+ """Create a git worktree on a new branch from HEAD.
+
+ 1. Create branch from HEAD
+ 2. Create worktree checked out to that branch
+
+ The branch lives in the original repo, so it survives worktree removal.
+ """
+ work_dir = work_dir.resolve()
+ if work_dir.exists():
+ shutil.rmtree(work_dir)
+
+ # Create the branch at HEAD
+ try:
+ subprocess.run(
+ ["git", "branch", branch_name, "HEAD"],
+ cwd=base_cwd,
+ capture_output=True,
+ text=True,
+ check=True,
+ )
+ except subprocess.CalledProcessError as e:
+ raise WorktreeError(
+ f"Failed to create branch '{branch_name}': {e.stderr.strip()}"
+ ) from e
+
+ # Create worktree on that branch
+ try:
+ subprocess.run(
+ ["git", "worktree", "add", str(work_dir), branch_name],
+ cwd=base_cwd,
+ capture_output=True,
+ text=True,
+ check=True,
+ )
+ except subprocess.CalledProcessError as e:
+ # Clean up the branch if worktree creation fails
+ subprocess.run(
+ ["git", "branch", "-D", branch_name],
+ cwd=base_cwd,
+ capture_output=True,
+ )
+ raise WorktreeError(
+ f"Failed to create worktree at {work_dir}: {e.stderr.strip()}"
+ ) from e
+
+ logger.debug("Created worktree on branch '%s': %s", branch_name, work_dir)
+ return work_dir
+
+
+def capture_diff(worktree_path: Path) -> str:
+ """Capture all changes made in the worktree as a unified diff.
+
+ Includes both tracked modifications and new untracked files.
+ """
+ subprocess.run(
+ ["git", "add", "-A"],
+ cwd=worktree_path,
+ capture_output=True,
+ check=True,
+ )
+
+ result = subprocess.run(
+ ["git", "diff", "--cached", "HEAD"],
+ cwd=worktree_path,
+ capture_output=True,
+ text=True,
+ )
+ return result.stdout.strip()
+
+
+def commit_worktree(worktree_path: Path, message: str) -> bool:
+ """Stage and commit all changes in the worktree.
+
+ Returns True if a commit was made, False if nothing to commit.
+ """
+ subprocess.run(
+ ["git", "add", "-A"],
+ cwd=worktree_path,
+ capture_output=True,
+ check=True,
+ )
+
+ result = subprocess.run(
+ ["git", "commit", "-m", message],
+ cwd=worktree_path,
+ capture_output=True,
+ text=True,
+ )
+ # exit code 1 = nothing to commit
+ return result.returncode == 0
+
+
+def remove_worktree(base_cwd: Path, work_dir: Path) -> None:
+ """Remove a git worktree (branch is preserved in the original repo)."""
+ work_dir = work_dir.resolve()
+ try:
+ subprocess.run(
+ ["git", "worktree", "remove", "--force", str(work_dir)],
+ cwd=base_cwd,
+ capture_output=True,
+ text=True,
+ check=True,
+ )
+ except subprocess.CalledProcessError:
+ if work_dir.exists():
+ shutil.rmtree(work_dir, ignore_errors=True)
+ subprocess.run(
+ ["git", "worktree", "prune"],
+ cwd=base_cwd,
+ capture_output=True,
+ )
+ logger.debug("Removed worktree: %s (branch preserved)", work_dir)
diff --git a/pyproject.toml b/pyproject.toml
index 24d3f55..896a8f4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "cross-eval"
-version = "0.1.0"
+version = "0.2.0"
description = "AI agent cross-evaluation CLI tool"
requires-python = ">=3.9"
dependencies = [
diff --git a/tests/test_agentic.py b/tests/test_agentic.py
new file mode 100644
index 0000000..7b3ea70
--- /dev/null
+++ b/tests/test_agentic.py
@@ -0,0 +1,701 @@
+"""Comprehensive tests for the agentic worktree flow.
+
+Covers:
+ 1. worktree.py unit tests (real temp git repo)
+ 2. agent.py agentic tests (mocking subprocess)
+ 3. config.py _make_agentic tests
+ 4. pipeline integration tests (mock invoke_agent / invoke_agent_agentic)
+"""
+from __future__ import annotations
+
+import subprocess
+import tempfile
+import unittest
+from pathlib import Path
+from unittest.mock import MagicMock, call, patch
+
+from cross_eval.agent import invoke_agent_agentic
+from cross_eval.config import BUILTIN_AGENTS, _make_agentic
+from cross_eval.models import (
+ AgentConfig,
+ AgentResult,
+ PipelineConfig,
+ StepConfig,
+)
+from cross_eval.pipeline import (
+ _commit_iteration,
+ _finalize_worktree,
+ _has_agentic_steps,
+ _setup_worktree,
+ run_pipeline,
+)
+from cross_eval.worktree import (
+ capture_diff,
+ commit_worktree,
+ create_worktree,
+ make_branch_name,
+ remove_worktree,
+)
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _init_git_repo(path: Path) -> None:
+ """Initialise a minimal git repo with one commit."""
+ subprocess.run(["git", "init"], cwd=path, capture_output=True, check=True)
+ subprocess.run(
+ ["git", "config", "user.email", "test@test.com"],
+ cwd=path, capture_output=True, check=True,
+ )
+ subprocess.run(
+ ["git", "config", "user.name", "Test"],
+ cwd=path, capture_output=True, check=True,
+ )
+ (path / "README.md").write_text("# init\n")
+ subprocess.run(["git", "add", "."], cwd=path, capture_output=True, check=True)
+ subprocess.run(
+ ["git", "commit", "-m", "initial"],
+ cwd=path, capture_output=True, check=True,
+ )
+
+
+# ===================================================================
+# 1. worktree.py unit tests (real temp git repo)
+# ===================================================================
+
+class TestCreateWorktree(unittest.TestCase):
+ """create_worktree creates a worktree on a named branch."""
+
+ def test_creates_worktree_and_branch(self) -> None:
+ with tempfile.TemporaryDirectory() as td:
+ base = Path(td) / "repo"
+ base.mkdir()
+ _init_git_repo(base)
+
+ wt_dir = Path(td) / "wt"
+ branch = "cross-eval/test_branch"
+ result_path = create_worktree(base, wt_dir, branch)
+
+ # Worktree directory exists
+ self.assertTrue(result_path.exists())
+ # Branch was created in the original repo
+ branches = subprocess.run(
+ ["git", "branch", "--list", branch],
+ cwd=base, capture_output=True, text=True,
+ )
+ self.assertIn(branch, branches.stdout)
+
+ # Clean up
+ remove_worktree(base, wt_dir)
+
+
+class TestCaptureDiff(unittest.TestCase):
+ """capture_diff captures changes correctly."""
+
+ def test_captures_new_and_modified_files(self) -> None:
+ with tempfile.TemporaryDirectory() as td:
+ base = Path(td) / "repo"
+ base.mkdir()
+ _init_git_repo(base)
+
+ wt_dir = Path(td) / "wt"
+ branch = "cross-eval/diff_test"
+ create_worktree(base, wt_dir, branch)
+
+ # Make changes in the worktree
+ (wt_dir / "new_file.txt").write_text("hello\n")
+ (wt_dir / "README.md").write_text("# modified\n")
+
+ diff = capture_diff(wt_dir)
+ self.assertIn("new_file.txt", diff)
+ self.assertIn("hello", diff)
+ self.assertIn("modified", diff)
+
+ remove_worktree(base, wt_dir)
+
+
+class TestCommitWorktree(unittest.TestCase):
+ """commit_worktree commits changes and returns True; False when nothing to commit."""
+
+ def test_commit_returns_true_on_changes(self) -> None:
+ with tempfile.TemporaryDirectory() as td:
+ base = Path(td) / "repo"
+ base.mkdir()
+ _init_git_repo(base)
+
+ wt_dir = Path(td) / "wt"
+ branch = "cross-eval/commit_test"
+ create_worktree(base, wt_dir, branch)
+
+ (wt_dir / "file.txt").write_text("data\n")
+ result = commit_worktree(wt_dir, "test commit")
+ self.assertTrue(result)
+
+ remove_worktree(base, wt_dir)
+
+ def test_commit_returns_false_when_nothing_to_commit(self) -> None:
+ with tempfile.TemporaryDirectory() as td:
+ base = Path(td) / "repo"
+ base.mkdir()
+ _init_git_repo(base)
+
+ wt_dir = Path(td) / "wt"
+ branch = "cross-eval/empty_commit"
+ create_worktree(base, wt_dir, branch)
+
+ result = commit_worktree(wt_dir, "empty")
+ self.assertFalse(result)
+
+ remove_worktree(base, wt_dir)
+
+
+class TestRemoveWorktree(unittest.TestCase):
+ """remove_worktree removes worktree but branch survives."""
+
+ def test_branch_survives_worktree_removal(self) -> None:
+ with tempfile.TemporaryDirectory() as td:
+ base = Path(td) / "repo"
+ base.mkdir()
+ _init_git_repo(base)
+
+ wt_dir = Path(td) / "wt"
+ branch = "cross-eval/remove_test"
+ create_worktree(base, wt_dir, branch)
+
+ remove_worktree(base, wt_dir)
+
+ # Worktree directory should be gone
+ self.assertFalse(wt_dir.exists())
+
+ # Branch should still exist in the original repo
+ branches = subprocess.run(
+ ["git", "branch", "--list", branch],
+ cwd=base, capture_output=True, text=True,
+ )
+ self.assertIn(branch, branches.stdout)
+
+
+class TestMakeBranchName(unittest.TestCase):
+ """make_branch_name generates expected format."""
+
+ def test_format(self) -> None:
+ name = make_branch_name("review-fix")
+ self.assertTrue(name.startswith("cross-eval/review-fix_"))
+ # Should contain a timestamp-like suffix
+ parts = name.split("_", 1)
+ self.assertEqual(len(parts), 2)
+ # Timestamp portion should be like 20260313_123456
+ ts_part = parts[1] # after "cross-eval/review-fix_"
+ self.assertEqual(len(ts_part), 15) # YYYYMMDD_HHMMSS
+
+
+# ===================================================================
+# 2. agent.py agentic tests (mocking subprocess)
+# ===================================================================
+
+class TestInvokeAgentAgenticClaude(unittest.TestCase):
+ """invoke_agent_agentic builds correct cmd for claude (no -p, prompt as positional arg)."""
+
+ @patch("cross_eval.worktree.capture_diff", return_value="diff --git a/file ...")
+ @patch("subprocess.run")
+ def test_claude_cmd_has_no_dash_p_and_prompt_as_positional(
+ self, mock_run: MagicMock, mock_diff: MagicMock,
+ ) -> None:
+ mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="")
+
+ agent = AgentConfig(
+ name="claude-coder",
+ command="claude",
+ args=["--setting-sources", "user", "--dangerously-skip-permissions"],
+ agentic=True,
+ )
+
+ with tempfile.TemporaryDirectory() as td:
+ wt = Path(td)
+ _init_git_repo(wt)
+
+ invoke_agent_agentic(
+ agent, "implement feature X", "coding",
+ worktree_path=wt, quiet=True,
+ )
+
+ # Find the subprocess.run call that actually runs the agent
+ agent_call = None
+ for c in mock_run.call_args_list:
+ cmd = c[0][0] if c[0] else c[1].get("args", [])
+ if cmd and cmd[0] == "claude":
+ agent_call = c
+ break
+
+ self.assertIsNotNone(agent_call, "Expected a subprocess.run call with 'claude'")
+ cmd = agent_call[0][0]
+
+ # No -p flag
+ self.assertNotIn("-p", cmd)
+ # Last arg is a task file reference (not raw prompt — avoids arg length limits)
+ self.assertIn("task file", cmd[-1].lower())
+
+
+class TestInvokeAgentAgenticCodex(unittest.TestCase):
+ """invoke_agent_agentic builds correct cmd for codex (stdin mode, - sentinel)."""
+
+ @patch("cross_eval.worktree.capture_diff", return_value="diff --git a/file ...")
+ @patch("subprocess.run")
+ def test_codex_cmd_uses_stdin_with_dash_sentinel(
+ self, mock_run: MagicMock, mock_diff: MagicMock,
+ ) -> None:
+ mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="")
+
+ agent = AgentConfig(
+ name="codex-coder",
+ command="codex",
+ args=["exec", "--full-auto", "--skip-git-repo-check"],
+ agentic=True,
+ )
+
+ with tempfile.TemporaryDirectory() as td:
+ wt = Path(td)
+ _init_git_repo(wt)
+
+ invoke_agent_agentic(
+ agent, "implement feature Y", "coding",
+ worktree_path=wt, quiet=True,
+ )
+
+ agent_call = None
+ for c in mock_run.call_args_list:
+ cmd = c[0][0] if c[0] else c[1].get("args", [])
+ if cmd and cmd[0] == "codex":
+ agent_call = c
+ break
+
+ self.assertIsNotNone(agent_call, "Expected a subprocess.run call with 'codex'")
+ cmd = agent_call[0][0]
+
+ # Should have "-" sentinel at the end for stdin
+ self.assertEqual(cmd[-1], "-")
+ # Stdin input should contain the prompt
+ input_data = agent_call[1].get("input")
+ self.assertIsNotNone(input_data)
+ self.assertIn("implement feature Y", input_data)
+
+
+class TestTaskFileCleanup(unittest.TestCase):
+ """Task file is cleaned up before capture_diff."""
+
+ @patch("cross_eval.worktree.capture_diff", return_value="(no changes)")
+ @patch("subprocess.run")
+ def test_task_file_in_tmp_not_worktree(
+ self, mock_run: MagicMock, mock_diff: MagicMock,
+ ) -> None:
+ mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="")
+
+ agent = AgentConfig(
+ name="claude-coder", command="claude", args=[], agentic=True,
+ )
+
+ with tempfile.TemporaryDirectory() as td:
+ wt = Path(td)
+ _init_git_repo(wt)
+
+ invoke_agent_agentic(
+ agent, "do stuff", "coding",
+ worktree_path=wt, quiet=True,
+ )
+
+ # Task file should NOT be in the worktree (it's in /tmp)
+ self.assertFalse((wt / "CROSS_EVAL_TASK.md").exists())
+
+
+# ===================================================================
+# 3. config.py tests
+# ===================================================================
+
+class TestMakeAgenticClaude(unittest.TestCase):
+ """_make_agentic strips -p from claude args and sets agentic=True."""
+
+ def test_strips_dash_p_and_sets_agentic(self) -> None:
+ agent = AgentConfig(
+ name="claude-coder",
+ command="claude",
+ args=["-p", "--setting-sources", "user", "--model", "opus"],
+ )
+ self.assertFalse(agent.agentic)
+ _make_agentic(agent)
+ self.assertTrue(agent.agentic)
+ self.assertNotIn("-p", agent.args)
+ self.assertIn("--setting-sources", agent.args)
+
+ def test_idempotent_when_no_dash_p(self) -> None:
+ agent = AgentConfig(
+ name="claude-coder",
+ command="claude",
+ args=["--setting-sources", "user"],
+ )
+ _make_agentic(agent)
+ self.assertTrue(agent.agentic)
+ self.assertEqual(agent.args, ["--setting-sources", "user"])
+
+
+class TestMakeAgenticCodex(unittest.TestCase):
+ """_make_agentic on codex agent still works (no -p to strip)."""
+
+ def test_codex_agentic_works(self) -> None:
+ agent = AgentConfig(
+ name="codex-coder",
+ command="codex",
+ args=["exec", "--full-auto", "-"],
+ )
+ _make_agentic(agent)
+ self.assertTrue(agent.agentic)
+ # -p was never there so args are unchanged
+ self.assertIn("exec", agent.args)
+ self.assertIn("--full-auto", agent.args)
+
+
+# ===================================================================
+# 4. pipeline integration tests
+# ===================================================================
+
+def _make_agentic_config(
+ run_dir: Path,
+ agentic_coder: bool = True,
+) -> PipelineConfig:
+ """Build a config with an agentic coder + non-agentic reviewer."""
+ coder = AgentConfig(
+ name="claude-coder", command="claude",
+ args=["--setting-sources", "user"],
+ agentic=agentic_coder,
+ )
+ reviewer = AgentConfig(
+ name="claude-reviewer", command="claude",
+ args=["-p", "--setting-sources", "user"],
+ agentic=False,
+ )
+ steps = [
+ StepConfig(
+ name="coding",
+ agent="claude-coder",
+ role="coding",
+ prompt_template="default:coding",
+ output_key="coding_output",
+ ),
+ StepConfig(
+ name="review",
+ agent="claude-reviewer",
+ role="review",
+ prompt_template="default:review",
+ output_key="review_result",
+ verdict=True,
+ ),
+ ]
+ return PipelineConfig(
+ output_dir=run_dir,
+ max_iterations=2,
+ min_iterations=1,
+ language="en",
+ inputs={"plan": "Test plan", "checklist": "Test checklist"},
+ agents={"claude-coder": coder, "claude-reviewer": reviewer},
+ coders=["claude-coder"],
+ reviewers=["claude-reviewer"],
+ pipeline=steps,
+ preset_name="simple",
+ )
+
+
+class TestSetupWorktreeCalledForAgentic(unittest.TestCase):
+ """When agentic agent is configured, _setup_worktree is called."""
+
+ @patch("cross_eval.pipeline._finalize_worktree", return_value="cross-eval/test")
+ @patch("cross_eval.pipeline._commit_iteration")
+ @patch("cross_eval.pipeline._setup_worktree")
+ @patch("cross_eval.pipeline.invoke_agent_agentic")
+ @patch("cross_eval.pipeline.invoke_agent")
+ def test_setup_worktree_called(
+ self,
+ mock_invoke: MagicMock,
+ mock_invoke_agentic: MagicMock,
+ mock_setup: MagicMock,
+ mock_commit_iter: MagicMock,
+ mock_finalize: MagicMock,
+ ) -> None:
+ with tempfile.TemporaryDirectory() as td:
+ run_dir = Path(td)
+ config = _make_agentic_config(run_dir)
+
+ wt_path = run_dir / "work"
+ wt_path.mkdir()
+ mock_setup.return_value = (wt_path, "cross-eval/test")
+
+ mock_invoke_agentic.return_value = AgentResult(
+ output="diff output", exit_code=0,
+ agent_name="claude-coder", step_name="coding",
+ duration_seconds=0.1,
+ )
+ mock_invoke.return_value = AgentResult(
+ output="VERDICT: PASS", exit_code=0,
+ agent_name="claude-reviewer", step_name="review",
+ duration_seconds=0.1,
+ )
+
+ run_pipeline(config, cwd=Path(td))
+
+ mock_setup.assert_called_once()
+
+
+class TestReviewerRunsInWorktreeCwd(unittest.TestCase):
+ """Reviewer runs with worktree cwd (not original cwd) when worktree exists."""
+
+ @patch("cross_eval.pipeline._finalize_worktree", return_value="cross-eval/test")
+ @patch("cross_eval.pipeline._commit_iteration")
+ @patch("cross_eval.pipeline._setup_worktree")
+ @patch("cross_eval.pipeline.invoke_agent_agentic")
+ @patch("cross_eval.pipeline.invoke_agent")
+ def test_reviewer_uses_worktree_cwd(
+ self,
+ mock_invoke: MagicMock,
+ mock_invoke_agentic: MagicMock,
+ mock_setup: MagicMock,
+ mock_commit_iter: MagicMock,
+ mock_finalize: MagicMock,
+ ) -> None:
+ with tempfile.TemporaryDirectory() as td:
+ run_dir = Path(td)
+ config = _make_agentic_config(run_dir)
+
+ wt_path = run_dir / "work"
+ wt_path.mkdir()
+ mock_setup.return_value = (wt_path, "cross-eval/test")
+
+ mock_invoke_agentic.return_value = AgentResult(
+ output="diff output", exit_code=0,
+ agent_name="claude-coder", step_name="coding",
+ duration_seconds=0.1,
+ )
+ mock_invoke.return_value = AgentResult(
+ output="VERDICT: PASS", exit_code=0,
+ agent_name="claude-reviewer", step_name="review",
+ duration_seconds=0.1,
+ )
+
+ run_pipeline(config, cwd=Path(td))
+
+ # The reviewer (non-agentic) should have been called with cwd=worktree_path
+ reviewer_call = mock_invoke.call_args
+ self.assertEqual(reviewer_call[1].get("cwd") or reviewer_call[0][3], wt_path)
+
+
+class TestCommitIterationCalled(unittest.TestCase):
+ """_commit_iteration is called after each iteration when worktree exists."""
+
+ @patch("cross_eval.pipeline._finalize_worktree", return_value="cross-eval/test")
+ @patch("cross_eval.pipeline._commit_iteration")
+ @patch("cross_eval.pipeline._setup_worktree")
+ @patch("cross_eval.pipeline.invoke_agent_agentic")
+ @patch("cross_eval.pipeline.invoke_agent")
+ def test_commit_iteration_called(
+ self,
+ mock_invoke: MagicMock,
+ mock_invoke_agentic: MagicMock,
+ mock_setup: MagicMock,
+ mock_commit_iter: MagicMock,
+ mock_finalize: MagicMock,
+ ) -> None:
+ with tempfile.TemporaryDirectory() as td:
+ run_dir = Path(td)
+ config = _make_agentic_config(run_dir)
+
+ wt_path = run_dir / "work"
+ wt_path.mkdir()
+ mock_setup.return_value = (wt_path, "cross-eval/test")
+
+ mock_invoke_agentic.return_value = AgentResult(
+ output="diff output", exit_code=0,
+ agent_name="claude-coder", step_name="coding",
+ duration_seconds=0.1,
+ )
+ mock_invoke.return_value = AgentResult(
+ output="VERDICT: PASS", exit_code=0,
+ agent_name="claude-reviewer", step_name="review",
+ duration_seconds=0.1,
+ )
+
+ run_pipeline(config, cwd=Path(td))
+
+ mock_commit_iter.assert_called_once()
+ call_args = mock_commit_iter.call_args
+ self.assertEqual(call_args[0][0], wt_path)
+
+
+class TestFinalizeWorktreeCalled(unittest.TestCase):
+ """_finalize_worktree commits and cleans up at end."""
+
+ @patch("cross_eval.pipeline._finalize_worktree", return_value="cross-eval/test")
+ @patch("cross_eval.pipeline._commit_iteration")
+ @patch("cross_eval.pipeline._setup_worktree")
+ @patch("cross_eval.pipeline.invoke_agent_agentic")
+ @patch("cross_eval.pipeline.invoke_agent")
+ def test_finalize_called(
+ self,
+ mock_invoke: MagicMock,
+ mock_invoke_agentic: MagicMock,
+ mock_setup: MagicMock,
+ mock_commit_iter: MagicMock,
+ mock_finalize: MagicMock,
+ ) -> None:
+ with tempfile.TemporaryDirectory() as td:
+ run_dir = Path(td)
+ config = _make_agentic_config(run_dir)
+
+ wt_path = run_dir / "work"
+ wt_path.mkdir()
+ mock_setup.return_value = (wt_path, "cross-eval/test")
+
+ mock_invoke_agentic.return_value = AgentResult(
+ output="diff output", exit_code=0,
+ agent_name="claude-coder", step_name="coding",
+ duration_seconds=0.1,
+ )
+ mock_invoke.return_value = AgentResult(
+ output="VERDICT: PASS", exit_code=0,
+ agent_name="claude-reviewer", step_name="review",
+ duration_seconds=0.1,
+ )
+
+ run_pipeline(config, cwd=Path(td))
+
+ mock_finalize.assert_called_once()
+ call_args = mock_finalize.call_args
+ # Should pass cwd, worktree_path, branch_name, preset_name, verdict
+ self.assertEqual(call_args[0][1], wt_path)
+ self.assertEqual(call_args[0][2], "cross-eval/test")
+
+
+class TestParallelAgenticFallsBackToSequential(unittest.TestCase):
+ """Multiple agentic steps in parallel batch fall back to sequential."""
+
+ def test_has_agentic_steps_detects_agentic(self) -> None:
+ coder = AgentConfig(
+ name="claude-coder", command="claude", args=[], agentic=True,
+ )
+ reviewer = AgentConfig(
+ name="claude-reviewer", command="claude", args=[], agentic=False,
+ )
+ config = PipelineConfig(
+ agents={"claude-coder": coder, "claude-reviewer": reviewer},
+ )
+ steps = [
+ StepConfig(name="a", agent="claude-coder", role="coding",
+ prompt_template="default:coding", output_key="a"),
+ ]
+ self.assertTrue(_has_agentic_steps(config, steps))
+
+ def test_has_agentic_steps_returns_false_without_agentic(self) -> None:
+ reviewer = AgentConfig(
+ name="claude-reviewer", command="claude", args=[], agentic=False,
+ )
+ config = PipelineConfig(
+ agents={"claude-reviewer": reviewer},
+ )
+ steps = [
+ StepConfig(name="r", agent="claude-reviewer", role="review",
+ prompt_template="default:review", output_key="r", verdict=True),
+ ]
+ self.assertFalse(_has_agentic_steps(config, steps))
+
+ @patch("cross_eval.pipeline._finalize_worktree", return_value="cross-eval/test")
+ @patch("cross_eval.pipeline._commit_iteration")
+ @patch("cross_eval.pipeline._setup_worktree")
+ @patch("cross_eval.pipeline.invoke_agent_agentic")
+ @patch("cross_eval.pipeline.invoke_agent")
+ def test_parallel_agentic_runs_sequentially(
+ self,
+ mock_invoke: MagicMock,
+ mock_invoke_agentic: MagicMock,
+ mock_setup: MagicMock,
+ mock_commit_iter: MagicMock,
+ mock_finalize: MagicMock,
+ ) -> None:
+ """When multiple agentic steps are parallel, they should run sequentially."""
+ with tempfile.TemporaryDirectory() as td:
+ run_dir = Path(td)
+
+ coder_a = AgentConfig(
+ name="coder-a", command="claude", args=[], agentic=True,
+ )
+ coder_b = AgentConfig(
+ name="coder-b", command="claude", args=[], agentic=True,
+ )
+ reviewer = AgentConfig(
+ name="reviewer", command="claude", args=["-p"], agentic=False,
+ )
+
+ steps = [
+ StepConfig(
+ name="code_a", agent="coder-a", role="coding",
+ prompt_template="default:coding", output_key="code_a",
+ parallel=True,
+ ),
+ StepConfig(
+ name="code_b", agent="coder-b", role="coding",
+ prompt_template="default:coding", output_key="code_b",
+ parallel=True,
+ ),
+ StepConfig(
+ name="review", agent="reviewer", role="review",
+ prompt_template="default:review", output_key="review_result",
+ verdict=True,
+ ),
+ ]
+
+ config = PipelineConfig(
+ output_dir=run_dir,
+ max_iterations=1,
+ min_iterations=1,
+ language="en",
+ inputs={"plan": "Test plan", "checklist": "Test checklist"},
+ agents={
+ "coder-a": coder_a,
+ "coder-b": coder_b,
+ "reviewer": reviewer,
+ },
+ coders=["coder-a", "coder-b"],
+ reviewers=["reviewer"],
+ pipeline=steps,
+ preset_name="custom",
+ )
+
+ wt_path = run_dir / "work"
+ wt_path.mkdir()
+ mock_setup.return_value = (wt_path, "cross-eval/test")
+
+ call_order: list[str] = []
+
+ def _track_agentic(agent_config, prompt, step_name, **kwargs):
+ call_order.append(step_name)
+ return AgentResult(
+ output="diff", exit_code=0,
+ agent_name=agent_config.name, step_name=step_name,
+ duration_seconds=0.1,
+ )
+
+ mock_invoke_agentic.side_effect = _track_agentic
+ mock_invoke.return_value = AgentResult(
+ output="VERDICT: PASS", exit_code=0,
+ agent_name="reviewer", step_name="review",
+ duration_seconds=0.1,
+ )
+
+ run_pipeline(config, cwd=Path(td))
+
+ # Both agentic steps should have been called (sequentially)
+ agentic_calls = [c for c in call_order if c.startswith("code_")]
+ self.assertEqual(len(agentic_calls), 2)
+ # They should appear in order (sequential, not concurrent)
+ self.assertEqual(agentic_calls, ["code_a", "code_b"])
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_config.py b/tests/test_config.py
index ba61b92..95f2944 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -6,12 +6,14 @@ from pathlib import Path
from unittest.mock import patch
from cross_eval.agent import AgentInvocationError, _supports_reasoning_effort
-from cross_eval.cli import _apply_phased_iteration_override
+from cross_eval.cli import _apply_phased_iteration_override, main
from cross_eval.agent import invoke_agent
from cross_eval.config import (
BUILTIN_AGENTS,
+ _SENIOR_SYSTEM_PROMPT,
_default_seniors_for_preset,
apply_reasoning_effort_settings,
+ load_config,
normalize_reasoning_effort,
normalize_prompt_template,
normalize_step_role,
@@ -52,7 +54,6 @@ from cross_eval.prompts import (
_build_review_only_preset,
_build_simple_preset,
)
-from cross_eval.config import _SENIOR_SYSTEM_PROMPT
from cross_eval.report import build_report, parse_review_metrics, print_escalation_report
class BuiltinAgentConfigTest(unittest.TestCase):
@@ -954,5 +955,82 @@ class EscalateVerdictTest(unittest.TestCase):
self.assertIn("VERDICT: ESCALATE", AGGREGATE_REVIEW_TEMPLATE_KO)
+class FixPresetBehaviorTest(unittest.TestCase):
+ def _write_fix_config(self, root: Path, *, max_iterations: int = 7) -> Path:
+ (root / "plan.md").write_text("# plan\n", encoding="utf-8")
+ (root / "checklist.md").write_text("# checklist\n", encoding="utf-8")
+ config_path = root / "config.yaml"
+ config_path.write_text(
+ (
+ "inputs:\n"
+ " plan: plan.md\n"
+ " checklist: checklist.md\n"
+ "coders: [claude-coder]\n"
+ "reviewers: [claude-reviewer]\n"
+ "pipeline: preset:review-fix\n"
+ f"max_iterations: {max_iterations}\n"
+ "language: en\n"
+ ),
+ encoding="utf-8",
+ )
+ return config_path
+
+ def test_load_config_syncs_phased_iterations_and_enables_agentic(self) -> None:
+ with tempfile.TemporaryDirectory() as tmpdir:
+ config = load_config(self._write_fix_config(Path(tmpdir), max_iterations=7))
+
+ self.assertEqual(config.preset_name, "review-fix")
+ self.assertEqual(config.phases[0].max_iterations, 7)
+ self.assertTrue(config.agents["claude-coder"].agentic)
+ self.assertNotIn("-p", config.agents["claude-coder"].args)
+
+ def test_run_config_max_iter_updates_existing_phased_pipeline(self) -> None:
+ with tempfile.TemporaryDirectory() as tmpdir:
+ config_path = self._write_fix_config(Path(tmpdir), max_iterations=7)
+ captured: dict[str, object] = {}
+
+ def _fake_run_pipeline(config, **kwargs):
+ captured["phase_max"] = config.phases[0].max_iterations
+ captured["agentic"] = config.agents[config.coders[0]].agentic
+ return PipelineResult(
+ iterations=[],
+ final_verdict="PASS",
+ run_dir=Path(tmpdir) / "output",
+ )
+
+ with patch("cross_eval.pipeline.run_pipeline", side_effect=_fake_run_pipeline):
+ exit_code = main([
+ "run",
+ "--config", str(config_path),
+ "--max-iter", "9",
+ "--dry-run",
+ ])
+
+ self.assertEqual(exit_code, 0)
+ self.assertEqual(captured["phase_max"], 9)
+ self.assertTrue(captured["agentic"])
+
+ def test_run_preset_review_fix_auto_enables_agentic_without_flag(self) -> None:
+ captured: dict[str, object] = {}
+
+ def _fake_run_pipeline(config, **kwargs):
+ captured["preset"] = config.preset_name
+ captured["agentic"] = config.agents[config.coders[0]].agentic
+ captured["phase_max"] = config.phases[0].max_iterations
+ return PipelineResult(
+ iterations=[],
+ final_verdict="PASS",
+ run_dir=Path(".cross-eval/output"),
+ )
+
+ with patch("cross_eval.pipeline.run_pipeline", side_effect=_fake_run_pipeline):
+ exit_code = main(["run", "--preset", "review-fix", "--dry-run"])
+
+ self.assertEqual(exit_code, 0)
+ self.assertEqual(captured["preset"], "review-fix")
+ self.assertTrue(captured["agentic"])
+ self.assertEqual(captured["phase_max"], 3)
+
+
if __name__ == "__main__":
unittest.main()