From 941304398de8ee87b927da13bce6e166c698f86c Mon Sep 17 00:00:00 2001 From: chungyeong Date: Fri, 13 Mar 2026 21:47:54 +0900 Subject: [PATCH] release: cut 0.2.0 baseline --- cross_eval.egg-info/PKG-INFO | 2 +- cross_eval.egg-info/SOURCES.txt | 5 +- cross_eval/__init__.py | 2 +- cross_eval/agent.py | 231 ++++++++++- cross_eval/cli.py | 49 ++- cross_eval/config.py | 135 ++++-- cross_eval/demo.py | 2 +- cross_eval/models.py | 18 +- cross_eval/pipeline.py | 680 +++++++++++++++++++++---------- cross_eval/report.py | 4 + cross_eval/runtime_env.py | 152 +++++++ cross_eval/worktree.py | 135 ++++++ pyproject.toml | 2 +- tests/test_agentic.py | 701 ++++++++++++++++++++++++++++++++ tests/test_config.py | 82 +++- 15 files changed, 1930 insertions(+), 270 deletions(-) create mode 100644 cross_eval/runtime_env.py create mode 100644 cross_eval/worktree.py create mode 100644 tests/test_agentic.py diff --git a/cross_eval.egg-info/PKG-INFO b/cross_eval.egg-info/PKG-INFO index 1eeed19..0f402eb 100644 --- a/cross_eval.egg-info/PKG-INFO +++ b/cross_eval.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.4 Name: cross-eval -Version: 0.1.0 +Version: 0.2.0 Summary: AI agent cross-evaluation CLI tool Requires-Python: >=3.9 Requires-Dist: pyyaml>=6.0 diff --git a/cross_eval.egg-info/SOURCES.txt b/cross_eval.egg-info/SOURCES.txt index 8272bb0..26a3503 100644 --- a/cross_eval.egg-info/SOURCES.txt +++ b/cross_eval.egg-info/SOURCES.txt @@ -10,12 +10,15 @@ cross_eval/models.py cross_eval/pipeline.py cross_eval/prompts.py cross_eval/report.py +cross_eval/runtime_env.py +cross_eval/worktree.py cross_eval.egg-info/PKG-INFO cross_eval.egg-info/SOURCES.txt cross_eval.egg-info/dependency_links.txt cross_eval.egg-info/entry_points.txt cross_eval.egg-info/requires.txt cross_eval.egg-info/top_level.txt +tests/test_agentic.py tests/test_config.py tests/test_onboarding.py -tests/test_pipeline_integration.py \ No newline at end of file +tests/test_pipeline_integration.py diff --git 
a/cross_eval/__init__.py b/cross_eval/__init__.py index 3dc1f76..d3ec452 100644 --- a/cross_eval/__init__.py +++ b/cross_eval/__init__.py @@ -1 +1 @@ -__version__ = "0.1.0" +__version__ = "0.2.0" diff --git a/cross_eval/agent.py b/cross_eval/agent.py index 8fb6ef4..243c4a9 100644 --- a/cross_eval/agent.py +++ b/cross_eval/agent.py @@ -3,8 +3,10 @@ from __future__ import annotations import itertools import logging +import os import subprocess import sys +import tempfile import threading import time from pathlib import Path @@ -142,11 +144,17 @@ class _Spinner: sys.stderr.flush() +def _is_print_mode(args: list[str]) -> bool: + """Check if the agent args include -p / --print flag.""" + return "-p" in args or "--print" in args + + def invoke_agent( agent: AgentConfig, prompt: str, step_name: str, cwd: Optional[Path] = None, + env: Optional[dict[str, str]] = None, timeout: int | None = None, quiet: bool = False, ) -> AgentResult: @@ -155,30 +163,67 @@ def invoke_agent( Args: quiet: If True, suppress spinner (for parallel execution). 
""" + is_claude = "claude" in agent.command + is_interactive = is_claude and not _is_print_mode(agent.args) + cmd = [agent.command] if agent.reasoning_effort and _supports_reasoning_effort(agent.command): cmd.extend(["-c", f'model_reasoning_effort="{agent.reasoning_effort}"']) cmd.extend(agent.args) - # Build the full prompt (system prompt + user prompt) - if agent.system_prompt and _supports_system_prompt_flag(agent.command): - # claude: --system-prompt flag supported natively - cmd.extend(["--system-prompt", agent.system_prompt]) - input_data = prompt - elif agent.system_prompt: - # codex, others: no --system-prompt flag, prepend to prompt - input_data = ( - f"\n{agent.system_prompt}\n\n\n" - f"{prompt}" + # --- Temp files for interactive (non -p) claude --- + task_file: Optional[Path] = None + output_file: Optional[Path] = None + + if is_interactive: + # Write prompt + output instruction to temp task file + task_fd, task_path = tempfile.mkstemp(suffix=".md", prefix="cross_eval_task_") + task_file = Path(task_path) + os.close(task_fd) + + out_fd, out_path = tempfile.mkstemp(suffix=".md", prefix="cross_eval_out_") + output_file = Path(out_path) + os.close(out_fd) + # Clear the output file so we can detect if agent wrote to it + output_file.write_text("", encoding="utf-8") + + wrapped_prompt = ( + f"{prompt}\n\n" + f"---\n" + f"IMPORTANT: Write your COMPLETE response to this file: {output_file}\n" + f"Do NOT modify any other files in the project." ) + task_file.write_text(wrapped_prompt, encoding="utf-8") + + # System prompt via flag + if agent.system_prompt and _supports_system_prompt_flag(agent.command): + cmd.extend(["--system-prompt", agent.system_prompt]) + + # Positional arg: point claude to the task file + cmd.append( + f"Read the task file at {task_file} and follow all instructions in it. " + f"Write your complete output to {output_file}." 
+ ) + input_data: str | None = None else: - input_data = prompt + # Print mode (-p) or non-claude: deliver prompt via stdin + if agent.system_prompt and _supports_system_prompt_flag(agent.command): + cmd.extend(["--system-prompt", agent.system_prompt]) + input_data = prompt + elif agent.system_prompt: + input_data = ( + f"\n{agent.system_prompt}\n\n\n" + f"{prompt}" + ) + else: + input_data = prompt logger.debug("Invoking agent '%s': %s", agent.name, " ".join(cmd[:5]) + " ...") spinner: Optional[_Spinner] = None if not quiet: - logger.info(" cmd: %s", " ".join(cmd[:6])) + mode_label = "interactive" if is_interactive else "" + logger.info(" cmd: %s %s", " ".join(cmd[:6]), f"({mode_label})" if mode_label else "") spinner = _Spinner(f"[{step_name}] {agent.name} running...") spinner.start() @@ -191,6 +236,7 @@ def invoke_agent( text=True, timeout=timeout, cwd=cwd, + env=env, ) duration = time.monotonic() - start except subprocess.TimeoutExpired: @@ -201,10 +247,154 @@ def invoke_agent( if spinner: spinner.stop(f"[{step_name}] ERROR") raise + finally: + if task_file: + task_file.unlink(missing_ok=True) + + if result.returncode != 0: + if spinner: + spinner.stop(f"[{step_name}] FAILED (exit {result.returncode})") + if output_file: + output_file.unlink(missing_ok=True) + err_detail = result.stderr.strip() or result.stdout.strip() + if err_detail and len(err_detail) > 500: + err_detail = err_detail[:500] + "..." 
+ cmd_preview = " ".join(cmd[:6]) + failure_type, suggested_action = _classify_agent_failure(err_detail or "") + raise AgentInvocationError( + agent_name=agent.name, + step_name=step_name, + cmd_preview=cmd_preview, + raw_error=err_detail or "(no output)", + failure_type=failure_type, + suggested_action=suggested_action, + ) + + # --- Capture output --- + if output_file: + output = output_file.read_text(encoding="utf-8").strip() + output_file.unlink(missing_ok=True) + if not output: + # Fallback to stdout if agent didn't write to the file + output = result.stdout.strip() + else: + output = result.stdout.strip() - output = result.stdout.strip() chars = len(output) + if spinner: + spinner.stop(f"[{step_name}] done — {chars} chars") + + if not output: + stderr_info = result.stderr.strip() + if stderr_info: + logger.warning( + "Agent '%s' produced empty output at step '%s'. stderr: %s", + agent.name, step_name, stderr_info[:500], + ) + else: + logger.warning( + "Agent '%s' produced empty output at step '%s' (no stderr either)", + agent.name, step_name, + ) + + return AgentResult( + output=output, + exit_code=result.returncode, + agent_name=agent.name, + step_name=step_name, + duration_seconds=round(duration, 1), + ) + + +def invoke_agent_agentic( + agent: AgentConfig, + prompt: str, + step_name: str, + worktree_path: Path, + env: Optional[dict[str, str]] = None, + timeout: int | None = None, + quiet: bool = False, +) -> AgentResult: + """Invoke an agent in agentic mode (no -p, runs in worktree, captures git diff). + + The agent runs without print mode so it can modify files directly. + After the agent exits, git diff (since last commit) is captured as the output. 
+ """ + from cross_eval.worktree import capture_diff + + # Write prompt to a temp file (outside worktree, won't appear in diffs) + import tempfile + task_fd, task_path = tempfile.mkstemp(suffix=".md", prefix="cross_eval_task_") + task_file = Path(task_path) + task_file.write_text(prompt, encoding="utf-8") + os.close(task_fd) + + cmd = [agent.command] + if agent.reasoning_effort and _supports_reasoning_effort(agent.command): + cmd.extend(["-c", f'model_reasoning_effort="{agent.reasoning_effort}"']) + + # Strip stdin sentinel ("-") from args for agentic mode + args = [a for a in agent.args if a != "-"] + cmd.extend(args) + + # System prompt via flag if supported + if agent.system_prompt and _supports_system_prompt_flag(agent.command): + cmd.extend(["--system-prompt", agent.system_prompt]) + + # Deliver the prompt differently per agent type + is_codex = "codex" in agent.command + input_data: str | None = None + if is_codex: + # codex: stdin mode + cmd.append("-") + if agent.system_prompt and not _supports_system_prompt_flag(agent.command): + input_data = f"\n{agent.system_prompt}\n\n\n{prompt}" + else: + input_data = prompt + else: + # claude: use positional arg with a pointer to the task file + # (avoids OS arg length limits for large prompts) + cmd.append( + f"Read the task file at {task_file} and execute all instructions in it. " + f"Work in the current directory." 
+ ) + + logger.debug( + "Invoking agent '%s' (agentic) in worktree: %s", + agent.name, worktree_path, + ) + + spinner: Optional[_Spinner] = None + if not quiet: + logger.info(" cmd: %s (agentic)", " ".join(cmd[:6])) + spinner = _Spinner(f"[{step_name}] {agent.name} (agentic) running...") + spinner.start() + + try: + start = time.monotonic() + result = subprocess.run( + cmd, + input=input_data, + capture_output=True, + text=True, + timeout=timeout, + cwd=worktree_path, + env=env, + ) + duration = time.monotonic() - start + except subprocess.TimeoutExpired: + if spinner: + spinner.stop(f"[{step_name}] TIMEOUT after {timeout}s") + raise + except Exception: + if spinner: + spinner.stop(f"[{step_name}] ERROR") + raise + finally: + # Clean up temp task file (it's in /tmp, not in worktree) + task_file.unlink(missing_ok=True) + if result.returncode != 0: if spinner: spinner.stop(f"[{step_name}] FAILED (exit {result.returncode})") @@ -222,17 +412,22 @@ def invoke_agent( suggested_action=suggested_action, ) - if spinner: - spinner.stop(f"[{step_name}] done — {chars} chars") + # Capture git diff as the output (changes since last commit on the branch) + diff_output = capture_diff(worktree_path) - if not output: + if not diff_output: + diff_output = "(no changes)" logger.warning( - "Agent '%s' produced empty output at step '%s'", + "Agent '%s' made no file changes at step '%s'", agent.name, step_name, ) + chars = len(diff_output) + if spinner: + spinner.stop(f"[{step_name}] done — {chars} chars (agentic)") + return AgentResult( - output=output, + output=diff_output, exit_code=result.returncode, agent_name=agent.name, step_name=step_name, diff --git a/cross_eval/cli.py b/cross_eval/cli.py index 45d424a..7d10bb8 100644 --- a/cross_eval/cli.py +++ b/cross_eval/cli.py @@ -49,7 +49,7 @@ max_iterations: 3 language: {language} # 결과 저장 경로 -output_dir: output +output_dir: .cross-eval/output # ─── 커스텀 에이전트 (선택) ──────────────────────────────────── # 기본 제공 에이전트를 덮어쓰거나 새 에이전트를 정의할 수 있습니다. 
@@ -372,6 +372,14 @@ def main(argv: list[str] | None = None) -> int: "--input", action="append", dest="inputs", metavar="KEY=PATH", help="추가 입력 파일 (예: --input spec=./api-spec.md)", ) + input_group.add_argument( + "--env-file", action="append", dest="env_files", type=Path, default=None, + help="에이전트 subprocess에 주입할 추가 .env 파일 (여러 개 가능)", + ) + input_group.add_argument( + "--target", action="append", dest="execution_targets", default=None, + help="에이전트에게 강조할 실행 대상 힌트 (예: clickhouse, postgres)", + ) # -- 에이전트 설정 -- agent_group = run_parser.add_argument_group( @@ -410,6 +418,10 @@ def main(argv: list[str] | None = None) -> int: choices=REASONING_EFFORT_CHOICES + ("extra-high", "extra_high", "x-high"), help="Senior용 reasoning effort", ) + agent_group.add_argument( + "--agentic", action="store_true", default=False, + help="Coder를 agentic 모드로 실행 (worktree에서 파일 직접 수정, git diff로 결과 캡처)", + ) agent_group.add_argument( "--model", default=None, metavar="MODEL", help="모든 에이전트의 모델을 한번에 변경 (예: sonnet, opus)", @@ -761,7 +773,7 @@ def _generate_guided_config( "", f"max_iterations: {settings['max_iter']}", f"language: {lang}", - "output_dir: output", + "output_dir: .cross-eval/output", "", ]) @@ -799,20 +811,19 @@ def _apply_model_override(config, agent_name: str, model: str) -> None: def _apply_phased_iteration_override(config, max_iter: int | None) -> None: """Apply CLI max-iter to converging phases while preserving setup phases.""" - if max_iter is None: - return + from cross_eval.config import sync_phased_iterations - for phase in config.phases: - if any(step.verdict for step in phase.steps): - phase.max_iterations = max_iter + sync_phased_iterations(config, max_iter) def cmd_run(args: argparse.Namespace) -> int: """Load config, validate, and execute the pipeline.""" from cross_eval.config import ( + ensure_fix_preset_agentic, apply_input_overrides, default_config, load_config, + sync_phased_iterations, validate_config, ) from cross_eval.prompts import PIPELINE_PRESETS @@ -917,6 
+928,10 @@ def cmd_run(args: argparse.Namespace) -> int: if preset in {"plan-review", "review-only"} and args.max_iter is None and args.min_iter is None: config.max_iterations = 1 + sync_phased_iterations(config) + if args.max_iter is not None: + sync_phased_iterations(config, args.max_iter) + apply_reasoning_effort_settings( config, reasoning_effort=args.reasoning_effort, @@ -925,6 +940,15 @@ def cmd_run(args: argparse.Namespace) -> int: senior_effort=args.senior_effort, ) + # --agentic: convert coder agents to agentic mode + if args.agentic: + from cross_eval.config import _make_agentic + for coder_name in config.coders: + if coder_name in config.agents: + _make_agentic(config.agents[coder_name]) + + ensure_fix_preset_agentic(config) + # --model: apply to ALL agents if args.model is not None: for agent_name in config.agents: @@ -958,6 +982,17 @@ def cmd_run(args: argparse.Namespace) -> int: return 1 config.inputs["docs"] = docs_content + if args.env_files: + for env_file in args.env_files: + resolved = env_file.resolve() + if not resolved.exists(): + print(f"Env file not found: {resolved}", file=sys.stderr) + return 1 + config.execution.env_files.append(str(resolved)) + + if args.execution_targets: + config.execution.auto_context_targets = list(args.execution_targets) + if args.inputs: overrides = {} for item in args.inputs: diff --git a/cross_eval/config.py b/cross_eval/config.py index c9751f8..3fa73d4 100644 --- a/cross_eval/config.py +++ b/cross_eval/config.py @@ -1,6 +1,7 @@ """Configuration loading, validation, and preset resolution.""" from __future__ import annotations +import copy import logging import re from pathlib import Path @@ -8,7 +9,13 @@ from typing import Any import yaml -from cross_eval.models import AgentConfig, PhaseConfig, PipelineConfig, StepConfig +from cross_eval.models import ( + AgentConfig, + ExecutionConfig, + PhaseConfig, + PipelineConfig, + StepConfig, +) from cross_eval.prompts import PHASED_PRESETS, PIPELINE_PRESETS logger = 
logging.getLogger(__name__) @@ -24,6 +31,7 @@ DEFAULT_ROLE_REASONING_EFFORTS = { "reviewer": "medium", "senior": "high", } +FIX_STYLE_PRESETS = {"review-fix", "coding-review-fix"} # --------------------------------------------------------------------------- @@ -54,7 +62,12 @@ _CLAUDE_CODER_ARGS = list(_CLAUDE_BASE_ARGS) + [ "bypassPermissions", ] -_CLAUDE_REVIEW_ARGS = list(_CLAUDE_BASE_ARGS) + [ +_CLAUDE_REVIEW_ARGS = [ + "--setting-sources", + "user", + "--disable-slash-commands", + "--model", + "opus", "--permission-mode", "plan", ] @@ -64,29 +77,37 @@ _CODER_SYSTEM_PROMPT = ( "Rules:\n" "1. FIRST explore the project directory to understand the existing codebase, " "patterns, and conventions before writing any code.\n" - "2. Implement ONLY what the plan specifies. Do NOT add extra features, " + "2. You may decide which shell, Python, git, docker, test, and database commands " + "to run. The user does not need to pre-specify exact commands.\n" + "3. Environment variables from configured .env files may already be loaded into " + "your process; use them when validating services such as ClickHouse.\n" + "4. Implement ONLY what the plan specifies. Do NOT add extra features, " "unnecessary abstractions, premature optimizations, or \"nice-to-have\" improvements.\n" - "3. Follow the project's existing coding style, naming conventions, and directory structure.\n" - "4. If previous review feedback is provided, fix ONLY the specific issues mentioned. " + "5. Follow the project's existing coding style, naming conventions, and directory structure.\n" + "6. If previous review feedback is provided, fix ONLY the specific issues mentioned. " "Do NOT refactor unrelated code.\n" - "5. Ignore any items from previous feedback that were marked as DISMISSED or false positive.\n" - "6. When in doubt about scope, do LESS, not more." + "7. Ignore any items from previous feedback that were marked as DISMISSED or false positive.\n" + "8. When in doubt about scope, do LESS, not more." 
) _REVIEWER_SYSTEM_PROMPT = ( "You are a code reviewer. You MUST NOT create, modify, or delete any files.\n" "Rules:\n" "1. Explore the project directory to understand the full codebase context.\n" - "2. Compare the implementation against the plan and checklist ONLY.\n" - "3. Classify every issue with BOTH severity AND category:\n" + "2. You may decide which shell, Python, test, git, docker, and database read commands " + "to run in order to verify behavior. The user does not need to pre-specify exact commands.\n" + "3. Environment variables from configured .env files may already be loaded into " + "your process; use them for verification when relevant.\n" + "4. Compare the implementation against the plan and checklist ONLY.\n" + "5. Classify every issue with BOTH severity AND category:\n" " - Severity: Critical (breaks functionality/security) > Major (requirement mismatch) > Minor (convention/style)\n" " - Category: Over-engineering / Omission\n" - "4. When reviewing with previous feedback, mark items as CONFIRMED (still an issue) " + "6. When reviewing with previous feedback, mark items as CONFIRMED (still an issue) " "or DISMISSED (false positive) with rationale.\n" - "5. Report out-of-scope issues separately — problems found outside plan/checklist scope.\n" - "6. Order issues by severity (Critical first).\n" - "7. Do NOT suggest improvements beyond the plan scope.\n" - "8. End with VERDICT: PASS (all requirements met, no over-engineering) " + "7. Report out-of-scope issues separately — problems found outside plan/checklist scope.\n" + "8. Order issues by severity (Critical first).\n" + "9. Do NOT suggest improvements beyond the plan scope.\n" + "10. End with VERDICT: PASS (all requirements met, no over-engineering) " "or VERDICT: FAIL (issues found)." ) @@ -94,16 +115,20 @@ _SENIOR_SYSTEM_PROMPT = ( "You are a senior technical reviewer coordinating a review-fix-verification loop.\n" "Rules:\n" "1. 
Explore the project directory to understand the full codebase context.\n" - "2. In aggregation mode, deduplicate overlaps, resolve disagreements, and keep only " + "2. You may decide which shell, Python, test, git, docker, and database read commands " + "to run to verify disputed issues. The user does not need to pre-specify exact commands.\n" + "3. Environment variables from configured .env files may already be loaded into " + "your process; use them when validating service integrations.\n" + "4. In aggregation mode, deduplicate overlaps, resolve disagreements, and keep only " "evidence-backed issues. Categorize dismissed findings as [False positive] or [Already fixed].\n" - "3. In verification mode, judge the current implementation directly against ONLY the " + "5. In verification mode, judge the current implementation directly against ONLY the " "plan and checklist.\n" - "4. Be skeptical of false positives, but do not lower the bar on real requirement " + "6. Be skeptical of false positives, but do not lower the bar on real requirement " "gaps.\n" - "5. When issues remain, produce a concise prioritized action list the coder can act on.\n" - "6. Maintain an Issue Tracker table across iterations to track issue status.\n" - "7. Do NOT invent new requirements beyond the plan and checklist.\n" - "8. End with one of three verdicts:\n" + "7. When issues remain, produce a concise prioritized action list the coder can act on.\n" + "8. Maintain an Issue Tracker table across iterations to track issue status.\n" + "9. Do NOT invent new requirements beyond the plan and checklist.\n" + "10. End with one of three verdicts:\n" " - VERDICT: PASS — all requirements met, no issues remain.\n" " - VERDICT: FAIL — issues found that the coder can fix.\n" " - VERDICT: ESCALATE — issues that require human intervention. 
Use ESCALATE when:\n" @@ -263,7 +288,7 @@ def _resolve_agents( for name in all_referenced: if name not in result and name in BUILTIN_AGENTS: - result[name] = BUILTIN_AGENTS[name] + result[name] = copy.deepcopy(BUILTIN_AGENTS[name]) return result @@ -354,15 +379,16 @@ def _apply_role_effort( def default_config() -> PipelineConfig: """Return a PipelineConfig with sensible defaults (no YAML needed).""" - agents = dict(BUILTIN_AGENTS) + agents = copy.deepcopy(BUILTIN_AGENTS) coders = ["claude-coder"] reviewers = ["claude-reviewer"] seniors: list[str] = [] pipeline = PIPELINE_PRESETS["simple"](coders, reviewers, seniors) return PipelineConfig( - output_dir=Path("output"), + output_dir=Path(".cross-eval/output"), max_iterations=3, language="ko", + execution=ExecutionConfig(), inputs={}, agents=agents, coders=coders, @@ -406,6 +432,7 @@ def _parse_raw(raw: dict[str, Any], config_path: Path) -> PipelineConfig: system_prompt=agent_data.get("system_prompt"), reasoning_effort=agent_data.get("reasoning_effort"), stdin_mode=agent_data.get("stdin_mode", False), + agentic=agent_data.get("agentic", False), ) # --- roles: explicit or inferred --- @@ -445,6 +472,17 @@ def _parse_raw(raw: dict[str, Any], config_path: Path) -> PipelineConfig: p = config_dir / p inputs[key] = p + execution_raw = raw.get("execution", {}) or {} + execution = ExecutionConfig( + mode=execution_raw.get("mode", "agent-decides"), + command_policy=execution_raw.get("command_policy", "broad"), + inherit_env=bool(execution_raw.get("inherit_env", True)), + auto_env_files=list(execution_raw.get("auto_env_files", [".env", ".env.local"])), + env_files=list(execution_raw.get("env_files", [])), + expose_env_names=bool(execution_raw.get("expose_env_names", True)), + auto_context_targets=list(execution_raw.get("auto_context_targets", [])), + ) + # --- pipeline (preset or custom) --- steps, phases = _resolve_pipeline(pipeline_raw, coders, reviewers, seniors) @@ -453,12 +491,13 @@ def _parse_raw(raw: dict[str, Any], 
config_path: Path) -> PipelineConfig: if isinstance(pipeline_raw, str) and pipeline_raw.startswith("preset:"): preset_name = pipeline_raw.split(":", 1)[1] - return PipelineConfig( - output_dir=Path(raw.get("output_dir", "output")), + config = PipelineConfig( + output_dir=Path(raw.get("output_dir", ".cross-eval/output")), max_iterations=int(raw.get("max_iterations", 3)), min_iterations=int(raw.get("min_iterations", 1)), verbose=bool(raw.get("verbose", False)), language=raw.get("language", "en"), + execution=execution, inputs=inputs, agents=agents, coders=coders, @@ -470,6 +509,9 @@ def _parse_raw(raw: dict[str, Any], config_path: Path) -> PipelineConfig: _config_path=config_path, _config_mtime=config_path.stat().st_mtime, ) + sync_phased_iterations(config) + ensure_fix_preset_agentic(config) + return config def try_reload_config(config: PipelineConfig) -> PipelineConfig: @@ -619,6 +661,16 @@ def validate_config(config: PipelineConfig) -> list[str]: if config.language not in ("en", "ko"): errors.append(f"Unsupported language '{config.language}'. Use 'en' or 'ko'.") + if config.execution.mode not in {"agent-decides"}: + errors.append( + f"Unsupported execution.mode '{config.execution.mode}'. Use 'agent-decides'." + ) + if config.execution.command_policy not in {"broad", "restricted"}: + errors.append( + "Unsupported execution.command_policy " + f"'{config.execution.command_policy}'. Use 'broad' or 'restricted'." 
+ ) + return errors @@ -642,6 +694,37 @@ def _validate_unique_step_fields( seen_output_keys.add(step.output_key) +def _make_agentic(agent: AgentConfig) -> None: + """Convert an agent to agentic mode in-place (remove -p, set agentic=True).""" + agent.agentic = True + agent.args = [a for a in agent.args if a != "-p"] + + +def sync_phased_iterations( + config: PipelineConfig, + max_iter: int | None = None, +) -> None: + """Apply effective max iterations to converging phases while preserving setup phases.""" + if not config.phases: + return + + effective_max_iter = config.max_iterations if max_iter is None else max_iter + for phase in config.phases: + if any(step.verdict for step in phase.steps): + phase.max_iterations = effective_max_iter + + +def ensure_fix_preset_agentic(config: PipelineConfig) -> None: + """Fix-style presets should modify code, so coders run agentically by default.""" + if config.preset_name not in FIX_STYLE_PRESETS: + return + + for coder_name in config.coders: + agent = config.agents.get(coder_name) + if agent is not None and not agent.agentic: + _make_agentic(agent) + + def apply_input_overrides( config: PipelineConfig, overrides: dict[str, str] ) -> None: diff --git a/cross_eval/demo.py b/cross_eval/demo.py index f02ce9f..ee8ffa2 100644 --- a/cross_eval/demo.py +++ b/cross_eval/demo.py @@ -265,7 +265,7 @@ def run_live_demo( checklist_path.write_text(DEMO_CHECKLIST, encoding="utf-8") config = PipelineConfig( - output_dir=Path("output"), + output_dir=Path(".cross-eval/output"), max_iterations=3, language="en", inputs={"plan": plan_path, "checklist": checklist_path}, diff --git a/cross_eval/models.py b/cross_eval/models.py index 8fa29ad..45b4066 100644 --- a/cross_eval/models.py +++ b/cross_eval/models.py @@ -16,6 +16,7 @@ class AgentConfig: system_prompt: Optional[str] = None reasoning_effort: Optional[str] = None stdin_mode: bool = False + agentic: bool = False # run in worktree, capture git diff instead of stdout @dataclass @@ -43,15 +44,29 @@ 
class PhaseConfig: consecutive_pass: int = 1 # stop after N consecutive PASSes +@dataclass +class ExecutionConfig: + """Runtime execution policy for agent subprocesses.""" + + mode: str = "agent-decides" + command_policy: str = "broad" + inherit_env: bool = True + auto_env_files: list[str] = field(default_factory=lambda: [".env", ".env.local"]) + env_files: list[str] = field(default_factory=list) + expose_env_names: bool = True + auto_context_targets: list[str] = field(default_factory=list) + + @dataclass class PipelineConfig: """Full cross-eval configuration.""" - output_dir: Path = field(default_factory=lambda: Path("output")) + output_dir: Path = field(default_factory=lambda: Path(".cross-eval/output")) max_iterations: int = 3 min_iterations: int = 1 verbose: bool = False language: str = "en" # "en" or "ko" + execution: ExecutionConfig = field(default_factory=ExecutionConfig) inputs: dict[str, Path | str] = field(default_factory=dict) agents: dict[str, AgentConfig] = field(default_factory=dict) coders: list[str] = field(default_factory=list) @@ -118,3 +133,4 @@ class PipelineResult: run_dir: Optional[Path] = None repeated_aggregate_warnings: list[str] = field(default_factory=list) escalated_issues: list[str] = field(default_factory=list) + agentic_branch: Optional[str] = None diff --git a/cross_eval/pipeline.py b/cross_eval/pipeline.py index 7981cfe..b31fc8d 100644 --- a/cross_eval/pipeline.py +++ b/cross_eval/pipeline.py @@ -10,9 +10,11 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from pathlib import Path -from cross_eval.agent import AgentInvocationError, invoke_agent +from cross_eval.agent import AgentInvocationError, invoke_agent, invoke_agent_agentic +from cross_eval.worktree import WorktreeError from cross_eval.config import try_reload_config from cross_eval.models import ( + AgentConfig, AgentResult, IterationResult, PipelineConfig, @@ -21,6 +23,11 @@ from cross_eval.models import ( ) from 
cross_eval.prompts import render_template, resolve_template, set_language from cross_eval.report import build_report +from cross_eval.runtime_env import ( + build_execution_policy, + build_runtime_environment, + summarize_environment, +) logger = logging.getLogger(__name__) @@ -48,6 +55,104 @@ def _make_run_dir(config: PipelineConfig) -> Path: return run_dir +def _commit_iteration( + worktree_path: Path, + label: str, + iteration: int, + verdict: str | None, +) -> None: + """Intermediate commit after each agentic iteration. + + This resets the diff baseline so the next iteration only captures new changes. + """ + from cross_eval.worktree import commit_worktree + committed = commit_worktree( + worktree_path, + f"cross-eval: {label} v{iteration} ({verdict or 'no-verdict'})", + ) + if committed: + logger.debug(" Intermediate commit: v%d (%s)", iteration, verdict) + + +def _has_agentic_steps(config: PipelineConfig, steps: list[StepConfig]) -> bool: + """Check if any step uses an agentic agent.""" + return any( + config.agents.get(s.agent, AgentConfig(name="", command="")).agentic + for s in steps + ) + + +def _setup_worktree(cwd: Path, run_dir: Path, preset_name: str) -> tuple[Path, str]: + """Create a shared worktree for the entire pipeline run. + + 1. Generate branch name (cross-eval/<preset_name>_<timestamp>) + 2. Create branch from HEAD + 3. Create worktree on that branch + + Returns (worktree_path, branch_name). + """ + from cross_eval.worktree import create_worktree, make_branch_name + branch_name = make_branch_name(preset_name) + worktree_dir = run_dir / "work" + worktree_path = create_worktree( + base_cwd=cwd, work_dir=worktree_dir, branch_name=branch_name, + ) + return worktree_path, branch_name + + +def _finalize_worktree( + cwd: Path, + worktree_path: Path, + branch_name: str, + preset_name: str, + final_verdict: str, +) -> str | None: + """Commit changes on the branch, then remove the worktree. + + The branch survives worktree removal and stays in the original repo.
+ Returns the branch name if the branch carries any new commits, None otherwise. + """ + from cross_eval.worktree import commit_worktree, remove_worktree + + committed = False + try: + committed = commit_worktree( + worktree_path, + f"cross-eval: {preset_name} ({final_verdict})", + ) + if committed: + logger.info(" Agentic changes committed on branch: %s", branch_name) + else: + logger.warning(" No agentic changes to commit (empty diff)") + except Exception: + logger.warning(" Failed to commit agentic changes", exc_info=True) + + try: + remove_worktree(base_cwd=cwd, work_dir=worktree_path) + except Exception: + logger.warning("Failed to clean up worktree: %s", worktree_path) + + # Check if branch has any commits beyond the base — if not, delete it + if not committed: + try: + result = subprocess.run( + ["git", "log", "--oneline", f"HEAD..{branch_name}"], + cwd=cwd, capture_output=True, text=True, + ) + committed = bool(result.stdout.strip()) # per-iteration commits count too + if not committed: + # No commits on branch beyond base — clean up + subprocess.run( + ["git", "branch", "-D", branch_name], + cwd=cwd, capture_output=True, + ) + logger.info(" Deleted empty branch: %s", branch_name) + except Exception: + pass # best-effort cleanup + + return branch_name if committed else None + + def _run_simple_pipeline( config: PipelineConfig, run_dir: Path, @@ -61,6 +166,15 @@ def _run_simple_pipeline( set_language(config.language) input_contents = _load_inputs(config) + runtime_env = _build_runtime_inputs(config, input_contents, cwd or Path(os.getcwd())) + + # Setup shared worktree for agentic mode + worktree_path: Path | None = None + agentic_branch_name: str | None = None + if not dry_run and _has_agentic_steps(config, config.pipeline): + worktree_path, agentic_branch_name = _setup_worktree( + cwd, run_dir, config.preset_name, + ) feedback = "(no feedback — first iteration)" iterations: list[IterationResult] = [] @@ -71,99 +185,114 @@ def _run_simple_pipeline( escalated_issues: list[str] = []
all_feedbacks: list[str] = [] - for i in range(1, config.max_iterations + 1): - config = try_reload_config(config) - set_language(config.language) - _refresh_inputs(config, input_contents) + try: + for i in range(1, config.max_iterations + 1): + config = try_reload_config(config) + set_language(config.language) + _refresh_inputs(config, input_contents) + runtime_env = _build_runtime_inputs(config, input_contents, cwd) - logger.info("=" * 50) - logger.info(" Iteration %d/%d", i, config.max_iterations) - logger.info("=" * 50) + logger.info("=" * 50) + logger.info(" Iteration %d/%d", i, config.max_iterations) + logger.info("=" * 50) - step_outputs, step_results, verdict = _run_steps( - config.pipeline, config, input_contents, feedback, - i, config.max_iterations, cwd, timeout, dry_run, - run_dir=run_dir, output_iter=i, - ) + step_outputs, step_results, verdict = _run_steps( + config.pipeline, config, input_contents, feedback, + i, config.max_iterations, cwd, timeout, dry_run, + run_dir=run_dir, output_iter=i, + worktree_path=worktree_path, + runtime_env=runtime_env, + ) - iter_result = IterationResult( - iteration=i, - step_results=step_results, - step_outputs=step_outputs, - verdict=verdict, - ) - warning = _detect_repeated_aggregate( - config.pipeline, step_outputs, aggregate_history, iteration=i, - ) - if warning: - iter_result.repeated_aggregate_warning = warning - aggregate_warnings.append(warning) - logger.warning(" %s", warning) + # Intermediate commit so next iteration's diff only shows new changes + if worktree_path is not None: + _commit_iteration(worktree_path, config.preset_name, i, verdict) - iter_result.feedback = _collect_feedback(config.pipeline, step_outputs) - feedback = iter_result.feedback or feedback - all_feedbacks.append(feedback) + iter_result = IterationResult( + iteration=i, + step_results=step_results, + step_outputs=step_outputs, + verdict=verdict, + ) + warning = _detect_repeated_aggregate( + config.pipeline, step_outputs, 
aggregate_history, iteration=i, + ) + if warning: + iter_result.repeated_aggregate_warning = warning + aggregate_warnings.append(warning) + logger.warning(" %s", warning) - # Extract tracker from verdict/review steps for next iteration - for step in config.pipeline: - if step.verdict or step.role == "review": - tracker = _extract_senior_tracker( - step_outputs.get(step.output_key, ""), - ) - if tracker: - input_contents["previous_senior_tracker"] = tracker + iter_result.feedback = _collect_feedback(config.pipeline, step_outputs) + feedback = iter_result.feedback or feedback + all_feedbacks.append(feedback) - iterations.append(iter_result) - - # ESCALATE check (highest priority) - if verdict == "ESCALATE": - final_verdict = "ESCALATE" - # Extract escalation details from verdict step outputs + # Extract tracker from verdict/review steps for next iteration for step in config.pipeline: - if step.verdict: - esc = _extract_escalated_issues( + if step.verdict or step.role == "review": + tracker = _extract_senior_tracker( step_outputs.get(step.output_key, ""), ) - if esc: - escalated_issues.append(esc) - iter_result.escalated_issues = esc - logger.info(" ESCALATE at iteration %d — stopping loop.", i) - break + if tracker: + input_contents["previous_senior_tracker"] = tracker - if verdict == "PASS": - final_verdict = "PASS" - if i >= config.min_iterations: - logger.info(" PASS at iteration %d (min=%d reached)!", i, config.min_iterations) + iterations.append(iter_result) + + # ESCALATE check (highest priority) + if verdict == "ESCALATE": + final_verdict = "ESCALATE" + for step in config.pipeline: + if step.verdict: + esc = _extract_escalated_issues( + step_outputs.get(step.output_key, ""), + ) + if esc: + escalated_issues.append(esc) + iter_result.escalated_issues = esc + logger.info(" ESCALATE at iteration %d — stopping loop.", i) break - else: - logger.info( - " PASS at iteration %d, but min_iterations=%d — continuing", - i, config.min_iterations, - ) - # Auto-escalate: no 
senior/aggregator + repeated FAIL - has_aggregator = config.seniors or any( - s.prompt_template == "default:aggregate-review" for s in config.pipeline - ) - if ( - verdict == "FAIL" - and not has_aggregator - and i >= 2 - and _detect_auto_escalate(all_feedbacks[:-1], feedback) - ): - final_verdict = "ESCALATE" - auto_msg = ( - f"Auto-escalated: same issues detected across {i} iterations " - f"without resolution (no senior reviewer configured)." + if verdict == "PASS": + final_verdict = "PASS" + if i >= config.min_iterations: + logger.info(" PASS at iteration %d (min=%d reached)!", i, config.min_iterations) + break + else: + logger.info( + " PASS at iteration %d, but min_iterations=%d — continuing", + i, config.min_iterations, + ) + + # Auto-escalate: no senior/aggregator + repeated FAIL + has_aggregator = config.seniors or any( + s.prompt_template == "default:aggregate-review" for s in config.pipeline ) - escalated_issues.append(auto_msg) - iter_result.escalated_issues = auto_msg - logger.info(" AUTO-ESCALATE at iteration %d", i) - break + if ( + verdict == "FAIL" + and not has_aggregator + and i >= 2 + and _detect_auto_escalate(all_feedbacks[:-1], feedback) + ): + final_verdict = "ESCALATE" + auto_msg = ( + f"Auto-escalated: same issues detected across {i} iterations " + f"without resolution (no senior reviewer configured)." 
+ ) + escalated_issues.append(auto_msg) + iter_result.escalated_issues = auto_msg + logger.info(" AUTO-ESCALATE at iteration %d", i) + break - if dry_run: - logger.info(" (dry-run: stopping after iteration 1)") - break + if dry_run: + logger.info(" (dry-run: stopping after iteration 1)") + break + + finally: + agentic_branch: str | None = None + if worktree_path is not None and agentic_branch_name is not None: + agentic_branch = _finalize_worktree( + cwd, worktree_path, agentic_branch_name, + config.preset_name, final_verdict, + ) total_duration = time.monotonic() - start_time @@ -174,6 +303,7 @@ def _run_simple_pipeline( run_dir=run_dir, repeated_aggregate_warnings=aggregate_warnings, escalated_issues=escalated_issues, + agentic_branch=agentic_branch, ) if not dry_run: @@ -195,6 +325,16 @@ def _run_phased_pipeline( set_language(config.language) input_contents = _load_inputs(config) + runtime_env = _build_runtime_inputs(config, input_contents, cwd) + + # Setup shared worktree for agentic mode + all_phase_steps = [s for p in config.phases for s in p.steps] + worktree_path: Path | None = None + agentic_branch_name: str | None = None + if not dry_run and _has_agentic_steps(config, all_phase_steps): + worktree_path, agentic_branch_name = _setup_worktree( + cwd, run_dir, config.preset_name, + ) iterations: list[IterationResult] = [] feedback = "(no feedback — first iteration)" @@ -207,152 +347,171 @@ def _run_phased_pipeline( all_feedbacks: list[str] = [] escalated = False - for phase_idx, phase in enumerate(config.phases): - if escalated: - break + try: + for phase_idx, phase in enumerate(config.phases): + if escalated: + break - logger.info("=" * 60) - logger.info( - " Phase: %s (max_iter=%d, consecutive_pass=%d)", - phase.name, phase.max_iterations, phase.consecutive_pass, - ) - logger.info("=" * 60) - - consecutive_passes = 0 - phase_converged = False - - for pi in range(1, phase.max_iterations + 1): - global_iter += 1 - - config = try_reload_config(config) - 
set_language(config.language) - _refresh_inputs(config, input_contents) - - logger.info("-" * 50) + logger.info("=" * 60) logger.info( - " [%s] Iteration %d/%d (global: v%d)", - phase.name, pi, phase.max_iterations, global_iter, + " Phase: %s (max_iter=%d, consecutive_pass=%d)", + phase.name, phase.max_iterations, phase.consecutive_pass, ) - logger.info("-" * 50) + logger.info("=" * 60) - step_outputs, step_results, verdict = _run_steps( - phase.steps, config, input_contents, feedback, - pi, phase.max_iterations, cwd, timeout, dry_run, - run_dir=run_dir, output_iter=global_iter, phase_name=phase.name, - ) + consecutive_passes = 0 + phase_converged = False - iter_result = IterationResult( - iteration=global_iter, - step_results=step_results, - step_outputs=step_outputs, - verdict=verdict, - phase_name=phase.name, - ) - phase_history = aggregate_history_by_phase.setdefault(phase.name, {}) - warning = _detect_repeated_aggregate( - phase.steps, step_outputs, phase_history, iteration=global_iter, - phase_name=phase.name, - ) - if warning: - iter_result.repeated_aggregate_warning = warning - aggregate_warnings.append(warning) - logger.warning(" %s", warning) + for pi in range(1, phase.max_iterations + 1): + global_iter += 1 - iter_result.feedback = _collect_feedback(phase.steps, step_outputs) - feedback = iter_result.feedback or feedback - all_feedbacks.append(feedback) + config = try_reload_config(config) + set_language(config.language) + _refresh_inputs(config, input_contents) + runtime_env = _build_runtime_inputs(config, input_contents, cwd) - # Extract tracker from verdict/review steps - for step in phase.steps: - if step.verdict or step.role == "review": - tracker = _extract_senior_tracker( - step_outputs.get(step.output_key, ""), + logger.info("-" * 50) + logger.info( + " [%s] Iteration %d/%d (global: v%d)", + phase.name, pi, phase.max_iterations, global_iter, + ) + logger.info("-" * 50) + + step_outputs, step_results, verdict = _run_steps( + phase.steps, config, 
input_contents, feedback, + pi, phase.max_iterations, cwd, timeout, dry_run, + run_dir=run_dir, output_iter=global_iter, phase_name=phase.name, + worktree_path=worktree_path, + runtime_env=runtime_env, + ) + + # Intermediate commit so next iteration's diff only shows new changes + if worktree_path is not None: + _commit_iteration( + worktree_path, f"{config.preset_name}/{phase.name}", + global_iter, verdict, ) - if tracker: - input_contents["previous_senior_tracker"] = tracker - iterations.append(iter_result) + iter_result = IterationResult( + iteration=global_iter, + step_results=step_results, + step_outputs=step_outputs, + verdict=verdict, + phase_name=phase.name, + ) + phase_history = aggregate_history_by_phase.setdefault(phase.name, {}) + warning = _detect_repeated_aggregate( + phase.steps, step_outputs, phase_history, iteration=global_iter, + phase_name=phase.name, + ) + if warning: + iter_result.repeated_aggregate_warning = warning + aggregate_warnings.append(warning) + logger.warning(" %s", warning) - # ESCALATE check - if verdict == "ESCALATE": - final_verdict = "ESCALATE" + iter_result.feedback = _collect_feedback(phase.steps, step_outputs) + feedback = iter_result.feedback or feedback + all_feedbacks.append(feedback) + + # Extract tracker from verdict/review steps for step in phase.steps: - if step.verdict: - esc = _extract_escalated_issues( + if step.verdict or step.role == "review": + tracker = _extract_senior_tracker( step_outputs.get(step.output_key, ""), ) - if esc: - escalated_issues.append(esc) - iter_result.escalated_issues = esc - logger.info( - " [%s] ESCALATE at iteration %d — stopping.", - phase.name, pi, - ) - escalated = True - break + if tracker: + input_contents["previous_senior_tracker"] = tracker - if verdict is None: - logger.info( - " [%s] completed (no verdict step; single-pass phase)", - phase.name, - ) - phase_converged = True - break + iterations.append(iter_result) - if verdict == "PASS": - consecutive_passes += 1 - logger.info( - 
" [%s] PASS (%d/%d consecutive)", - phase.name, consecutive_passes, phase.consecutive_pass, - ) - if consecutive_passes >= phase.consecutive_pass: + # ESCALATE check + if verdict == "ESCALATE": + final_verdict = "ESCALATE" + for step in phase.steps: + if step.verdict: + esc = _extract_escalated_issues( + step_outputs.get(step.output_key, ""), + ) + if esc: + escalated_issues.append(esc) + iter_result.escalated_issues = esc logger.info( - " [%s] Converged! %d consecutive PASSes.", - phase.name, phase.consecutive_pass, + " [%s] ESCALATE at iteration %d — stopping.", + phase.name, pi, + ) + escalated = True + break + + if verdict is None: + logger.info( + " [%s] completed (no verdict step; single-pass phase)", + phase.name, ) phase_converged = True break - else: - consecutive_passes = 0 - # Auto-escalate in phased pipeline - has_aggregator = config.seniors or any( - s.prompt_template == "default:aggregate-review" for s in phase.steps - ) - if ( - verdict == "FAIL" - and not has_aggregator - and pi >= 2 - and _detect_auto_escalate(all_feedbacks[:-1], feedback) - ): - final_verdict = "ESCALATE" - auto_msg = ( - f"Auto-escalated: same issues detected across {pi} iterations " - f"in phase '{phase.name}' without resolution." + if verdict == "PASS": + consecutive_passes += 1 + logger.info( + " [%s] PASS (%d/%d consecutive)", + phase.name, consecutive_passes, phase.consecutive_pass, + ) + if consecutive_passes >= phase.consecutive_pass: + logger.info( + " [%s] Converged! 
%d consecutive PASSes.", + phase.name, phase.consecutive_pass, + ) + phase_converged = True + break + else: + consecutive_passes = 0 + + # Auto-escalate in phased pipeline + has_aggregator = config.seniors or any( + s.prompt_template == "default:aggregate-review" for s in phase.steps ) - escalated_issues.append(auto_msg) - iter_result.escalated_issues = auto_msg - logger.info(" [%s] AUTO-ESCALATE at iteration %d", phase.name, pi) - escalated = True + if ( + verdict == "FAIL" + and not has_aggregator + and pi >= 2 + and _detect_auto_escalate(all_feedbacks[:-1], feedback) + ): + final_verdict = "ESCALATE" + auto_msg = ( + f"Auto-escalated: same issues detected across {pi} iterations " + f"in phase '{phase.name}' without resolution." + ) + escalated_issues.append(auto_msg) + iter_result.escalated_issues = auto_msg + logger.info(" [%s] AUTO-ESCALATE at iteration %d", phase.name, pi) + escalated = True + break + + if dry_run: + break + + if escalated: break - if dry_run: - break + if phase_converged: + logger.info(" Phase '%s' completed: CONVERGED", phase.name) + else: + logger.info( + " Phase '%s' completed: max iterations (%d) reached", + phase.name, phase.max_iterations, + ) - if escalated: - break + if phase_idx == len(config.phases) - 1: + final_verdict = "PASS" if phase_converged else "MAX_ITERATIONS_REACHED" - if phase_converged: - logger.info(" Phase '%s' completed: CONVERGED", phase.name) - else: - logger.info( - " Phase '%s' completed: max iterations (%d) reached", - phase.name, phase.max_iterations, + finally: + agentic_branch: str | None = None + if worktree_path is not None and agentic_branch_name is not None: + agentic_branch = _finalize_worktree( + cwd, worktree_path, agentic_branch_name, + config.preset_name, final_verdict, ) - if phase_idx == len(config.phases) - 1: - final_verdict = "PASS" if phase_converged else "MAX_ITERATIONS_REACHED" - total_duration = time.monotonic() - start_time pipeline_result = PipelineResult( @@ -362,6 +521,7 @@ def 
_run_phased_pipeline( run_dir=run_dir, repeated_aggregate_warnings=aggregate_warnings, escalated_issues=escalated_issues, + agentic_branch=agentic_branch, ) if not dry_run: @@ -463,6 +623,8 @@ def _run_steps( run_dir: Path, output_iter: int, phase_name: str | None = None, + worktree_path: Path | None = None, + runtime_env: dict[str, str] | None = None, ) -> tuple[dict[str, str], dict[str, AgentResult], str | None]: """Execute all steps in one iteration, parallelizing where possible.""" step_outputs: dict[str, str] = {} @@ -473,21 +635,23 @@ def _run_steps( for batch in batches: if len(batch) == 1: - # Single step — run directly step = batch[0] _execute_step( step, config, input_contents, feedback, iteration, max_iterations, cwd, timeout, dry_run, step_outputs, step_results, - run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, + run_dir=run_dir, output_iter=output_iter, + phase_name=phase_name, worktree_path=worktree_path, + runtime_env=runtime_env, ) else: - # Parallel batch — run with ThreadPoolExecutor _execute_parallel_batch( batch, config, input_contents, feedback, iteration, max_iterations, cwd, timeout, dry_run, step_outputs, step_results, - run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, + run_dir=run_dir, output_iter=output_iter, + phase_name=phase_name, worktree_path=worktree_path, + runtime_env=runtime_env, ) # Extract verdict from all verdict steps (ALL must PASS; ESCALATE wins over all) @@ -506,6 +670,25 @@ def _run_steps( return step_outputs, step_results, verdict +def _invoke_agentic( + agent_config: AgentConfig, + prompt: str, + step_name: str, + *, + worktree_path: Path, + env: dict[str, str] | None = None, + timeout: int | None = None, + quiet: bool = False, +) -> AgentResult: + """Run an agent in agentic mode using an existing worktree.""" + return invoke_agent_agentic( + agent_config, prompt, step_name, + worktree_path=worktree_path, + env=env, + timeout=timeout, quiet=quiet, + ) + + def _execute_step( step: 
StepConfig, config: PipelineConfig, @@ -523,6 +706,8 @@ def _execute_step( output_iter: int, phase_name: str | None = None, quiet: bool = False, + worktree_path: Path | None = None, + runtime_env: dict[str, str] | None = None, ) -> None: """Execute a single step, updating step_outputs and step_results in place.""" if not quiet: @@ -542,6 +727,7 @@ def _execute_step( # 4. Render prompt prompt = render_template(template, context) + prompt = _augment_prompt_with_runtime_context(prompt, context) # 5. Dry run: print and skip if dry_run: @@ -555,10 +741,21 @@ def _execute_step( # 6. Invoke agent agent_config = config.agents[step.agent] try: - result = invoke_agent( - agent_config, prompt, step.name, - cwd=cwd, timeout=timeout, quiet=quiet, - ) + if agent_config.agentic and worktree_path: + result = _invoke_agentic( + agent_config, prompt, step.name, + worktree_path=worktree_path, + env=runtime_env, + timeout=timeout, quiet=quiet, + ) + else: + # When worktree exists, run non-agentic agents (reviewers) in + # the worktree too so they can inspect the modified files. 
+ effective_cwd = worktree_path if worktree_path else cwd + result = invoke_agent( + agent_config, prompt, step.name, + cwd=effective_cwd, env=runtime_env, timeout=timeout, quiet=quiet, + ) except subprocess.TimeoutExpired as e: stdout = (e.stdout or b"") if isinstance(e.stdout, bytes) else (e.stdout or "") stderr = (e.stderr or b"") if isinstance(e.stderr, bytes) else (e.stderr or "") @@ -625,6 +822,8 @@ def _execute_parallel_batch( run_dir: Path, output_iter: int, phase_name: str | None = None, + worktree_path: Path | None = None, + runtime_env: dict[str, str] | None = None, ) -> None: """Execute multiple steps in parallel using threads.""" agent_names = ", ".join(s.agent for s in batch) @@ -640,6 +839,26 @@ def _execute_parallel_batch( ) return + # Agentic steps cannot run in parallel (they share a worktree) + agentic_in_batch = [ + s for s in batch + if config.agents.get(s.agent, AgentConfig(name="", command="")).agentic + ] + if len(agentic_in_batch) > 1: + logger.warning( + " [parallel] %d agentic steps cannot run concurrently — running sequentially", + len(agentic_in_batch), + ) + for step in batch: + _execute_step( + step, config, input_contents, feedback, + iteration, max_iterations, cwd, timeout, dry_run, + step_outputs, step_results, + run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, + worktree_path=worktree_path, runtime_env=runtime_env, + ) + return + # Snapshot context before parallel execution (all steps see same state) context_snapshot = dict(input_contents) context_snapshot.update(step_outputs) @@ -666,12 +885,22 @@ def _execute_parallel_batch( if step.context_override: context = _apply_context_override(context, step.context_override) prompt = render_template(template, context) + prompt = _augment_prompt_with_runtime_context(prompt, context) agent_config = config.agents[step.agent] - result = invoke_agent( - agent_config, prompt, step.name, - cwd=cwd, timeout=timeout, quiet=True, - ) + if agent_config.agentic and worktree_path: + result = _invoke_agentic( +
agent_config, prompt, step.name, + worktree_path=worktree_path, + env=runtime_env, + timeout=timeout, quiet=True, + ) + else: + effective_cwd = worktree_path if worktree_path else cwd + result = invoke_agent( + agent_config, prompt, step.name, + cwd=effective_cwd, env=runtime_env, timeout=timeout, quiet=True, + ) return step.output_key, result.output, result with ThreadPoolExecutor(max_workers=len(batch)) as executor: @@ -765,6 +994,35 @@ def _build_context( return context +def _build_runtime_inputs( + config: PipelineConfig, + input_contents: dict[str, str], + cwd: Path, +) -> dict[str, str]: + """Load runtime env and expose safe execution hints to prompts.""" + env, loaded_files, loaded_values = build_runtime_environment(config.execution, cwd) + input_contents["execution_policy"] = build_execution_policy(config.execution) + input_contents["environment_context"] = summarize_environment( + config.execution, loaded_files, env, loaded_values, + ) + return env + + +def _augment_prompt_with_runtime_context( + prompt: str, + context: dict[str, str], +) -> str: + """Append execution/env guidance without requiring every template to include placeholders.""" + extras: list[str] = [] + if context.get("execution_policy"): + extras.append("## Execution Policy\n" + context["execution_policy"]) + if context.get("environment_context"): + extras.append("## Environment Context\n" + context["environment_context"]) + if not extras: + return prompt + return prompt.rstrip() + "\n\n" + "\n\n".join(extras) + "\n" + + def _apply_context_override( context: dict[str, str], overrides: dict[str, str], diff --git a/cross_eval/report.py b/cross_eval/report.py index 9b29e2e..eda32ea 100644 --- a/cross_eval/report.py +++ b/cross_eval/report.py @@ -535,6 +535,10 @@ def _append_final_verdict( lines.append("---\n") lines.append(f"## {_t(config, 'final_verdict_title')}: {result.final_verdict}\n") + if result.agentic_branch: + lines.append(f"**Agentic branch**: `{result.agentic_branch}`") + 
lines.append(f"```bash\ngit checkout {result.agentic_branch}\n```\n") + if result.final_verdict == "PASS": lines.append(_t(config, "pass_msg")) elif result.final_verdict == "ESCALATE": diff --git a/cross_eval/runtime_env.py b/cross_eval/runtime_env.py new file mode 100644 index 0000000..5604585 --- /dev/null +++ b/cross_eval/runtime_env.py @@ -0,0 +1,152 @@ +"""Helpers for building agent runtime environments from .env files.""" +from __future__ import annotations + +import os +from pathlib import Path + +from cross_eval.models import ExecutionConfig + +_SUMMARY_PREFIXES = ( + "CLICKHOUSE", + "CH_", + "DB_", + "DATABASE", + "PG", + "POSTGRES", + "MYSQL", + "REDIS", + "AWS", + "S3", +) + + +def _strip_quotes(value: str) -> str: + if len(value) >= 2 and value[0] == value[-1] and value[0] in {"'", '"'}: + unwrapped = value[1:-1] + if value[0] == '"': # expand \n, \t, ... escapes without mangling non-ASCII text + return unwrapped.encode("latin-1", "backslashreplace").decode("unicode_escape") + return unwrapped + return value + + +def parse_dotenv(path: Path) -> dict[str, str]: + """Parse a simple dotenv file into key/value pairs.""" + values: dict[str, str] = {} + for raw_line in path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + if line.startswith("export "): + line = line[len("export ") :].strip() + if "=" not in line: + continue + key, value = line.split("=", 1) + key = key.strip() + if not key: + continue + values[key] = _strip_quotes(value.strip()) + return values + + +def resolve_env_files(execution: ExecutionConfig, project_root: Path) -> list[Path]: + """Resolve and deduplicate configured env files under the project root.""" + candidates: list[Path] = [] + for raw in execution.env_files: + path = Path(raw) + if not path.is_absolute(): + path = project_root / path + candidates.append(path) + + for raw in execution.auto_env_files: + path = project_root / raw + candidates.append(path) + + resolved: list[Path] = [] + seen: set[Path] = set() + for path in candidates: + try: +
normalized = path.resolve() + except OSError: + normalized = path + if normalized in seen or not normalized.exists() or not normalized.is_file(): + continue + seen.add(normalized) + resolved.append(normalized) + return resolved + + +def build_runtime_environment( + execution: ExecutionConfig, + project_root: Path, +) -> tuple[dict[str, str], list[Path], dict[str, str]]: + """Build subprocess env plus metadata about loaded files and names.""" + env = os.environ.copy() if execution.inherit_env else {} + loaded_files = resolve_env_files(execution, project_root) + loaded_values: dict[str, str] = {} + for path in loaded_files: + file_values = parse_dotenv(path) + loaded_values.update(file_values) + env.update(file_values) + return env, loaded_files, loaded_values + + +def summarize_environment( + execution: ExecutionConfig, + loaded_files: list[Path], + env: dict[str, str], + loaded_values: dict[str, str], +) -> str: + """Generate a safe environment summary for prompts without leaking secrets.""" + lines: list[str] = [] + if loaded_files: + joined = ", ".join(str(path) for path in loaded_files) + lines.append(f"Loaded env files into the agent process: {joined}") + else: + lines.append("No .env file was auto-loaded into the agent process.") + + if execution.auto_context_targets: + lines.append( + "Execution targets hinted by the user: " + + ", ".join(execution.auto_context_targets) + ) + + if execution.expose_env_names: + visible_names = sorted( + { + key + for key in set(loaded_values) | set(env) + if key.startswith(_SUMMARY_PREFIXES) + or any(prefix in key for prefix in ("CLICKHOUSE", "DATABASE", "DB_")) + } + ) + if visible_names: + lines.append("Relevant env var names available to commands: " + ", ".join(visible_names)) + else: + lines.append("No DB/service env var names matched the default summary filters.") + else: + lines.append("Environment variable values are loaded but names are hidden from the prompt.") + + wants_clickhouse = "clickhouse" in {target.lower() 
for target in execution.auto_context_targets} + clickhouse_keys = [key for key in env if "CLICKHOUSE" in key or key.startswith("CH_")] + if wants_clickhouse or clickhouse_keys: + if clickhouse_keys: + lines.append("ClickHouse-related environment variables are available to the agent.") + else: + lines.append("No ClickHouse-specific env vars were detected in the loaded environment.") + + return "\n".join(lines) + + +def build_execution_policy(execution: ExecutionConfig) -> str: + """Describe the execution latitude granted to agentic coders/reviewers.""" + lines = [ + f"Execution mode: {execution.mode}", + f"Command policy: {execution.command_policy}", + "The agent may choose shell, Python, git, docker, test, and database commands on its own when needed.", + "The user does not need to pre-specify exact commands.", + ] + if execution.command_policy == "broad": + lines.append("Prefer direct validation by running the minimum set of commands needed to prove a fix.") + else: + lines.append("Keep command usage minimal and focused on validation.") + return "\n".join(lines) diff --git a/cross_eval/worktree.py b/cross_eval/worktree.py new file mode 100644 index 0000000..dda710f --- /dev/null +++ b/cross_eval/worktree.py @@ -0,0 +1,135 @@ +"""Git worktree lifecycle management for agentic mode.""" +from __future__ import annotations + +import logging +import shutil +import subprocess +from datetime import datetime +from pathlib import Path + +logger = logging.getLogger(__name__) + + +class WorktreeError(RuntimeError): + """Error during worktree operations.""" + + +def make_branch_name(preset_name: str) -> str: + """Generate a branch name for agentic results.""" + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + return f"cross-eval/{preset_name}_{ts}" + + +def create_worktree(base_cwd: Path, work_dir: Path, branch_name: str) -> Path: + """Create a git worktree on a new branch from HEAD. + + 1. Create branch from HEAD + 2. 
Create worktree checked out to that branch + + The branch lives in the original repo, so it survives worktree removal. + """ + work_dir = work_dir.resolve() + if work_dir.exists(): + shutil.rmtree(work_dir) + + # Create the branch at HEAD + try: + subprocess.run( + ["git", "branch", branch_name, "HEAD"], + cwd=base_cwd, + capture_output=True, + text=True, + check=True, + ) + except subprocess.CalledProcessError as e: + raise WorktreeError( + f"Failed to create branch '{branch_name}': {e.stderr.strip()}" + ) from e + + # Create worktree on that branch + try: + subprocess.run( + ["git", "worktree", "add", str(work_dir), branch_name], + cwd=base_cwd, + capture_output=True, + text=True, + check=True, + ) + except subprocess.CalledProcessError as e: + # Clean up the branch if worktree creation fails + subprocess.run( + ["git", "branch", "-D", branch_name], + cwd=base_cwd, + capture_output=True, + ) + raise WorktreeError( + f"Failed to create worktree at {work_dir}: {e.stderr.strip()}" + ) from e + + logger.debug("Created worktree on branch '%s': %s", branch_name, work_dir) + return work_dir + + +def capture_diff(worktree_path: Path) -> str: + """Capture all changes made in the worktree as a unified diff. + + Includes both tracked modifications and new untracked files. + """ + subprocess.run( + ["git", "add", "-A"], + cwd=worktree_path, + capture_output=True, + check=True, + ) + + result = subprocess.run( + ["git", "diff", "--cached", "HEAD"], + cwd=worktree_path, + capture_output=True, + text=True, + ) + return result.stdout.strip() + + +def commit_worktree(worktree_path: Path, message: str) -> bool: + """Stage and commit all changes in the worktree. + + Returns True if a commit was made, False if nothing to commit. 
+ """ + subprocess.run( + ["git", "add", "-A"], + cwd=worktree_path, + capture_output=True, + check=True, + ) + + result = subprocess.run( + ["git", "commit", "-m", message], + cwd=worktree_path, + capture_output=True, + text=True, + ) + # exit code 1 = nothing to commit + return result.returncode == 0 + + +def remove_worktree(base_cwd: Path, work_dir: Path) -> None: + """Remove a git worktree (branch is preserved in the original repo).""" + work_dir = work_dir.resolve() + try: + subprocess.run( + ["git", "worktree", "remove", "--force", str(work_dir)], + cwd=base_cwd, + capture_output=True, + text=True, + check=True, + ) + except subprocess.CalledProcessError: + if work_dir.exists(): + shutil.rmtree(work_dir, ignore_errors=True) + subprocess.run( + ["git", "worktree", "prune"], + cwd=base_cwd, + capture_output=True, + ) + logger.debug("Removed worktree: %s (branch preserved)", work_dir) diff --git a/pyproject.toml b/pyproject.toml index 24d3f55..896a8f4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "cross-eval" -version = "0.1.0" +version = "0.2.0" description = "AI agent cross-evaluation CLI tool" requires-python = ">=3.9" dependencies = [ diff --git a/tests/test_agentic.py b/tests/test_agentic.py new file mode 100644 index 0000000..7b3ea70 --- /dev/null +++ b/tests/test_agentic.py @@ -0,0 +1,701 @@ +"""Comprehensive tests for the agentic worktree flow. + +Covers: + 1. worktree.py unit tests (real temp git repo) + 2. agent.py agentic tests (mocking subprocess) + 3. config.py _make_agentic tests + 4. 
pipeline integration tests (mock invoke_agent / invoke_agent_agentic) +""" +from __future__ import annotations + +import subprocess +import tempfile +import unittest +from pathlib import Path +from unittest.mock import MagicMock, call, patch + +from cross_eval.agent import invoke_agent_agentic +from cross_eval.config import BUILTIN_AGENTS, _make_agentic +from cross_eval.models import ( + AgentConfig, + AgentResult, + PipelineConfig, + StepConfig, +) +from cross_eval.pipeline import ( + _commit_iteration, + _finalize_worktree, + _has_agentic_steps, + _setup_worktree, + run_pipeline, +) +from cross_eval.worktree import ( + capture_diff, + commit_worktree, + create_worktree, + make_branch_name, + remove_worktree, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _init_git_repo(path: Path) -> None: + """Initialise a minimal git repo with one commit.""" + subprocess.run(["git", "init"], cwd=path, capture_output=True, check=True) + subprocess.run( + ["git", "config", "user.email", "test@test.com"], + cwd=path, capture_output=True, check=True, + ) + subprocess.run( + ["git", "config", "user.name", "Test"], + cwd=path, capture_output=True, check=True, + ) + (path / "README.md").write_text("# init\n") + subprocess.run(["git", "add", "."], cwd=path, capture_output=True, check=True) + subprocess.run( + ["git", "commit", "-m", "initial"], + cwd=path, capture_output=True, check=True, + ) + + +# =================================================================== +# 1. 
worktree.py unit tests (real temp git repo) +# =================================================================== + +class TestCreateWorktree(unittest.TestCase): + """create_worktree creates a worktree on a named branch.""" + + def test_creates_worktree_and_branch(self) -> None: + with tempfile.TemporaryDirectory() as td: + base = Path(td) / "repo" + base.mkdir() + _init_git_repo(base) + + wt_dir = Path(td) / "wt" + branch = "cross-eval/test_branch" + result_path = create_worktree(base, wt_dir, branch) + + # Worktree directory exists + self.assertTrue(result_path.exists()) + # Branch was created in the original repo + branches = subprocess.run( + ["git", "branch", "--list", branch], + cwd=base, capture_output=True, text=True, + ) + self.assertIn(branch, branches.stdout) + + # Clean up + remove_worktree(base, wt_dir) + + +class TestCaptureDiff(unittest.TestCase): + """capture_diff captures changes correctly.""" + + def test_captures_new_and_modified_files(self) -> None: + with tempfile.TemporaryDirectory() as td: + base = Path(td) / "repo" + base.mkdir() + _init_git_repo(base) + + wt_dir = Path(td) / "wt" + branch = "cross-eval/diff_test" + create_worktree(base, wt_dir, branch) + + # Make changes in the worktree + (wt_dir / "new_file.txt").write_text("hello\n") + (wt_dir / "README.md").write_text("# modified\n") + + diff = capture_diff(wt_dir) + self.assertIn("new_file.txt", diff) + self.assertIn("hello", diff) + self.assertIn("modified", diff) + + remove_worktree(base, wt_dir) + + +class TestCommitWorktree(unittest.TestCase): + """commit_worktree commits changes and returns True; False when nothing to commit.""" + + def test_commit_returns_true_on_changes(self) -> None: + with tempfile.TemporaryDirectory() as td: + base = Path(td) / "repo" + base.mkdir() + _init_git_repo(base) + + wt_dir = Path(td) / "wt" + branch = "cross-eval/commit_test" + create_worktree(base, wt_dir, branch) + + (wt_dir / "file.txt").write_text("data\n") + result = commit_worktree(wt_dir, 
"test commit") + self.assertTrue(result) + + remove_worktree(base, wt_dir) + + def test_commit_returns_false_when_nothing_to_commit(self) -> None: + with tempfile.TemporaryDirectory() as td: + base = Path(td) / "repo" + base.mkdir() + _init_git_repo(base) + + wt_dir = Path(td) / "wt" + branch = "cross-eval/empty_commit" + create_worktree(base, wt_dir, branch) + + result = commit_worktree(wt_dir, "empty") + self.assertFalse(result) + + remove_worktree(base, wt_dir) + + +class TestRemoveWorktree(unittest.TestCase): + """remove_worktree removes worktree but branch survives.""" + + def test_branch_survives_worktree_removal(self) -> None: + with tempfile.TemporaryDirectory() as td: + base = Path(td) / "repo" + base.mkdir() + _init_git_repo(base) + + wt_dir = Path(td) / "wt" + branch = "cross-eval/remove_test" + create_worktree(base, wt_dir, branch) + + remove_worktree(base, wt_dir) + + # Worktree directory should be gone + self.assertFalse(wt_dir.exists()) + + # Branch should still exist in the original repo + branches = subprocess.run( + ["git", "branch", "--list", branch], + cwd=base, capture_output=True, text=True, + ) + self.assertIn(branch, branches.stdout) + + +class TestMakeBranchName(unittest.TestCase): + """make_branch_name generates expected format.""" + + def test_format(self) -> None: + name = make_branch_name("review-fix") + self.assertTrue(name.startswith("cross-eval/review-fix_")) + # Should contain a timestamp-like suffix + parts = name.split("_", 1) + self.assertEqual(len(parts), 2) + # Timestamp portion should be like 20260313_123456 + ts_part = parts[1] # after "cross-eval/review-fix_" + self.assertEqual(len(ts_part), 15) # YYYYMMDD_HHMMSS + + +# =================================================================== +# 2. 
agent.py agentic tests (mocking subprocess) +# =================================================================== + +class TestInvokeAgentAgenticClaude(unittest.TestCase): + """invoke_agent_agentic builds correct cmd for claude (no -p, prompt as positional arg).""" + + @patch("cross_eval.worktree.capture_diff", return_value="diff --git a/file ...") + @patch("subprocess.run") + def test_claude_cmd_has_no_dash_p_and_prompt_as_positional( + self, mock_run: MagicMock, mock_diff: MagicMock, + ) -> None: + mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="") + + agent = AgentConfig( + name="claude-coder", + command="claude", + args=["--setting-sources", "user", "--dangerously-skip-permissions"], + agentic=True, + ) + + with tempfile.TemporaryDirectory() as td: + wt = Path(td) + _init_git_repo(wt) + + invoke_agent_agentic( + agent, "implement feature X", "coding", + worktree_path=wt, quiet=True, + ) + + # Find the subprocess.run call that actually runs the agent + agent_call = None + for c in mock_run.call_args_list: + cmd = c[0][0] if c[0] else c[1].get("args", []) + if cmd and cmd[0] == "claude": + agent_call = c + break + + self.assertIsNotNone(agent_call, "Expected a subprocess.run call with 'claude'") + cmd = agent_call[0][0] + + # No -p flag + self.assertNotIn("-p", cmd) + # Last arg is a task file reference (not raw prompt — avoids arg length limits) + self.assertIn("task file", cmd[-1].lower()) + + +class TestInvokeAgentAgenticCodex(unittest.TestCase): + """invoke_agent_agentic builds correct cmd for codex (stdin mode, - sentinel).""" + + @patch("cross_eval.worktree.capture_diff", return_value="diff --git a/file ...") + @patch("subprocess.run") + def test_codex_cmd_uses_stdin_with_dash_sentinel( + self, mock_run: MagicMock, mock_diff: MagicMock, + ) -> None: + mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="") + + agent = AgentConfig( + name="codex-coder", + command="codex", + args=["exec", "--full-auto", 
"--skip-git-repo-check"], + agentic=True, + ) + + with tempfile.TemporaryDirectory() as td: + wt = Path(td) + _init_git_repo(wt) + + invoke_agent_agentic( + agent, "implement feature Y", "coding", + worktree_path=wt, quiet=True, + ) + + agent_call = None + for c in mock_run.call_args_list: + cmd = c[0][0] if c[0] else c[1].get("args", []) + if cmd and cmd[0] == "codex": + agent_call = c + break + + self.assertIsNotNone(agent_call, "Expected a subprocess.run call with 'codex'") + cmd = agent_call[0][0] + + # Should have "-" sentinel at the end for stdin + self.assertEqual(cmd[-1], "-") + # Stdin input should contain the prompt + input_data = agent_call[1].get("input") + self.assertIsNotNone(input_data) + self.assertIn("implement feature Y", input_data) + + +class TestTaskFileCleanup(unittest.TestCase): + """Task file is cleaned up before capture_diff.""" + + @patch("cross_eval.worktree.capture_diff", return_value="(no changes)") + @patch("subprocess.run") + def test_task_file_in_tmp_not_worktree( + self, mock_run: MagicMock, mock_diff: MagicMock, + ) -> None: + mock_run.return_value = MagicMock(returncode=0, stdout="ok", stderr="") + + agent = AgentConfig( + name="claude-coder", command="claude", args=[], agentic=True, + ) + + with tempfile.TemporaryDirectory() as td: + wt = Path(td) + _init_git_repo(wt) + + invoke_agent_agentic( + agent, "do stuff", "coding", + worktree_path=wt, quiet=True, + ) + + # Task file should NOT be in the worktree (it's in /tmp) + self.assertFalse((wt / "CROSS_EVAL_TASK.md").exists()) + + +# =================================================================== +# 3. 
config.py tests +# =================================================================== + +class TestMakeAgenticClaude(unittest.TestCase): + """_make_agentic strips -p from claude args and sets agentic=True.""" + + def test_strips_dash_p_and_sets_agentic(self) -> None: + agent = AgentConfig( + name="claude-coder", + command="claude", + args=["-p", "--setting-sources", "user", "--model", "opus"], + ) + self.assertFalse(agent.agentic) + _make_agentic(agent) + self.assertTrue(agent.agentic) + self.assertNotIn("-p", agent.args) + self.assertIn("--setting-sources", agent.args) + + def test_idempotent_when_no_dash_p(self) -> None: + agent = AgentConfig( + name="claude-coder", + command="claude", + args=["--setting-sources", "user"], + ) + _make_agentic(agent) + self.assertTrue(agent.agentic) + self.assertEqual(agent.args, ["--setting-sources", "user"]) + + +class TestMakeAgenticCodex(unittest.TestCase): + """_make_agentic on codex agent still works (no -p to strip).""" + + def test_codex_agentic_works(self) -> None: + agent = AgentConfig( + name="codex-coder", + command="codex", + args=["exec", "--full-auto", "-"], + ) + _make_agentic(agent) + self.assertTrue(agent.agentic) + # -p was never there so args are unchanged + self.assertIn("exec", agent.args) + self.assertIn("--full-auto", agent.args) + + +# =================================================================== +# 4. 
pipeline integration tests +# =================================================================== + +def _make_agentic_config( + run_dir: Path, + agentic_coder: bool = True, +) -> PipelineConfig: + """Build a config with an agentic coder + non-agentic reviewer.""" + coder = AgentConfig( + name="claude-coder", command="claude", + args=["--setting-sources", "user"], + agentic=agentic_coder, + ) + reviewer = AgentConfig( + name="claude-reviewer", command="claude", + args=["-p", "--setting-sources", "user"], + agentic=False, + ) + steps = [ + StepConfig( + name="coding", + agent="claude-coder", + role="coding", + prompt_template="default:coding", + output_key="coding_output", + ), + StepConfig( + name="review", + agent="claude-reviewer", + role="review", + prompt_template="default:review", + output_key="review_result", + verdict=True, + ), + ] + return PipelineConfig( + output_dir=run_dir, + max_iterations=2, + min_iterations=1, + language="en", + inputs={"plan": "Test plan", "checklist": "Test checklist"}, + agents={"claude-coder": coder, "claude-reviewer": reviewer}, + coders=["claude-coder"], + reviewers=["claude-reviewer"], + pipeline=steps, + preset_name="simple", + ) + + +class TestSetupWorktreeCalledForAgentic(unittest.TestCase): + """When agentic agent is configured, _setup_worktree is called.""" + + @patch("cross_eval.pipeline._finalize_worktree", return_value="cross-eval/test") + @patch("cross_eval.pipeline._commit_iteration") + @patch("cross_eval.pipeline._setup_worktree") + @patch("cross_eval.pipeline.invoke_agent_agentic") + @patch("cross_eval.pipeline.invoke_agent") + def test_setup_worktree_called( + self, + mock_invoke: MagicMock, + mock_invoke_agentic: MagicMock, + mock_setup: MagicMock, + mock_commit_iter: MagicMock, + mock_finalize: MagicMock, + ) -> None: + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + config = _make_agentic_config(run_dir) + + wt_path = run_dir / "work" + wt_path.mkdir() + mock_setup.return_value = (wt_path, 
"cross-eval/test") + + mock_invoke_agentic.return_value = AgentResult( + output="diff output", exit_code=0, + agent_name="claude-coder", step_name="coding", + duration_seconds=0.1, + ) + mock_invoke.return_value = AgentResult( + output="VERDICT: PASS", exit_code=0, + agent_name="claude-reviewer", step_name="review", + duration_seconds=0.1, + ) + + run_pipeline(config, cwd=Path(td)) + + mock_setup.assert_called_once() + + +class TestReviewerRunsInWorktreeCwd(unittest.TestCase): + """Reviewer runs with worktree cwd (not original cwd) when worktree exists.""" + + @patch("cross_eval.pipeline._finalize_worktree", return_value="cross-eval/test") + @patch("cross_eval.pipeline._commit_iteration") + @patch("cross_eval.pipeline._setup_worktree") + @patch("cross_eval.pipeline.invoke_agent_agentic") + @patch("cross_eval.pipeline.invoke_agent") + def test_reviewer_uses_worktree_cwd( + self, + mock_invoke: MagicMock, + mock_invoke_agentic: MagicMock, + mock_setup: MagicMock, + mock_commit_iter: MagicMock, + mock_finalize: MagicMock, + ) -> None: + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + config = _make_agentic_config(run_dir) + + wt_path = run_dir / "work" + wt_path.mkdir() + mock_setup.return_value = (wt_path, "cross-eval/test") + + mock_invoke_agentic.return_value = AgentResult( + output="diff output", exit_code=0, + agent_name="claude-coder", step_name="coding", + duration_seconds=0.1, + ) + mock_invoke.return_value = AgentResult( + output="VERDICT: PASS", exit_code=0, + agent_name="claude-reviewer", step_name="review", + duration_seconds=0.1, + ) + + run_pipeline(config, cwd=Path(td)) + + # The reviewer (non-agentic) should have been called with cwd=worktree_path + reviewer_call = mock_invoke.call_args + self.assertEqual(reviewer_call[1].get("cwd") or reviewer_call[0][3], wt_path) + + +class TestCommitIterationCalled(unittest.TestCase): + """_commit_iteration is called after each iteration when worktree exists.""" + + 
@patch("cross_eval.pipeline._finalize_worktree", return_value="cross-eval/test") + @patch("cross_eval.pipeline._commit_iteration") + @patch("cross_eval.pipeline._setup_worktree") + @patch("cross_eval.pipeline.invoke_agent_agentic") + @patch("cross_eval.pipeline.invoke_agent") + def test_commit_iteration_called( + self, + mock_invoke: MagicMock, + mock_invoke_agentic: MagicMock, + mock_setup: MagicMock, + mock_commit_iter: MagicMock, + mock_finalize: MagicMock, + ) -> None: + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + config = _make_agentic_config(run_dir) + + wt_path = run_dir / "work" + wt_path.mkdir() + mock_setup.return_value = (wt_path, "cross-eval/test") + + mock_invoke_agentic.return_value = AgentResult( + output="diff output", exit_code=0, + agent_name="claude-coder", step_name="coding", + duration_seconds=0.1, + ) + mock_invoke.return_value = AgentResult( + output="VERDICT: PASS", exit_code=0, + agent_name="claude-reviewer", step_name="review", + duration_seconds=0.1, + ) + + run_pipeline(config, cwd=Path(td)) + + mock_commit_iter.assert_called_once() + call_args = mock_commit_iter.call_args + self.assertEqual(call_args[0][0], wt_path) + + +class TestFinalizeWorktreeCalled(unittest.TestCase): + """_finalize_worktree commits and cleans up at end.""" + + @patch("cross_eval.pipeline._finalize_worktree", return_value="cross-eval/test") + @patch("cross_eval.pipeline._commit_iteration") + @patch("cross_eval.pipeline._setup_worktree") + @patch("cross_eval.pipeline.invoke_agent_agentic") + @patch("cross_eval.pipeline.invoke_agent") + def test_finalize_called( + self, + mock_invoke: MagicMock, + mock_invoke_agentic: MagicMock, + mock_setup: MagicMock, + mock_commit_iter: MagicMock, + mock_finalize: MagicMock, + ) -> None: + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + config = _make_agentic_config(run_dir) + + wt_path = run_dir / "work" + wt_path.mkdir() + mock_setup.return_value = (wt_path, "cross-eval/test") + + 
mock_invoke_agentic.return_value = AgentResult( + output="diff output", exit_code=0, + agent_name="claude-coder", step_name="coding", + duration_seconds=0.1, + ) + mock_invoke.return_value = AgentResult( + output="VERDICT: PASS", exit_code=0, + agent_name="claude-reviewer", step_name="review", + duration_seconds=0.1, + ) + + run_pipeline(config, cwd=Path(td)) + + mock_finalize.assert_called_once() + call_args = mock_finalize.call_args + # Should pass cwd, worktree_path, branch_name, preset_name, verdict + self.assertEqual(call_args[0][1], wt_path) + self.assertEqual(call_args[0][2], "cross-eval/test") + + +class TestParallelAgenticFallsBackToSequential(unittest.TestCase): + """Multiple agentic steps in parallel batch fall back to sequential.""" + + def test_has_agentic_steps_detects_agentic(self) -> None: + coder = AgentConfig( + name="claude-coder", command="claude", args=[], agentic=True, + ) + reviewer = AgentConfig( + name="claude-reviewer", command="claude", args=[], agentic=False, + ) + config = PipelineConfig( + agents={"claude-coder": coder, "claude-reviewer": reviewer}, + ) + steps = [ + StepConfig(name="a", agent="claude-coder", role="coding", + prompt_template="default:coding", output_key="a"), + ] + self.assertTrue(_has_agentic_steps(config, steps)) + + def test_has_agentic_steps_returns_false_without_agentic(self) -> None: + reviewer = AgentConfig( + name="claude-reviewer", command="claude", args=[], agentic=False, + ) + config = PipelineConfig( + agents={"claude-reviewer": reviewer}, + ) + steps = [ + StepConfig(name="r", agent="claude-reviewer", role="review", + prompt_template="default:review", output_key="r", verdict=True), + ] + self.assertFalse(_has_agentic_steps(config, steps)) + + @patch("cross_eval.pipeline._finalize_worktree", return_value="cross-eval/test") + @patch("cross_eval.pipeline._commit_iteration") + @patch("cross_eval.pipeline._setup_worktree") + @patch("cross_eval.pipeline.invoke_agent_agentic") + 
@patch("cross_eval.pipeline.invoke_agent") + def test_parallel_agentic_runs_sequentially( + self, + mock_invoke: MagicMock, + mock_invoke_agentic: MagicMock, + mock_setup: MagicMock, + mock_commit_iter: MagicMock, + mock_finalize: MagicMock, + ) -> None: + """When multiple agentic steps are parallel, they should run sequentially.""" + with tempfile.TemporaryDirectory() as td: + run_dir = Path(td) + + coder_a = AgentConfig( + name="coder-a", command="claude", args=[], agentic=True, + ) + coder_b = AgentConfig( + name="coder-b", command="claude", args=[], agentic=True, + ) + reviewer = AgentConfig( + name="reviewer", command="claude", args=["-p"], agentic=False, + ) + + steps = [ + StepConfig( + name="code_a", agent="coder-a", role="coding", + prompt_template="default:coding", output_key="code_a", + parallel=True, + ), + StepConfig( + name="code_b", agent="coder-b", role="coding", + prompt_template="default:coding", output_key="code_b", + parallel=True, + ), + StepConfig( + name="review", agent="reviewer", role="review", + prompt_template="default:review", output_key="review_result", + verdict=True, + ), + ] + + config = PipelineConfig( + output_dir=run_dir, + max_iterations=1, + min_iterations=1, + language="en", + inputs={"plan": "Test plan", "checklist": "Test checklist"}, + agents={ + "coder-a": coder_a, + "coder-b": coder_b, + "reviewer": reviewer, + }, + coders=["coder-a", "coder-b"], + reviewers=["reviewer"], + pipeline=steps, + preset_name="custom", + ) + + wt_path = run_dir / "work" + wt_path.mkdir() + mock_setup.return_value = (wt_path, "cross-eval/test") + + call_order: list[str] = [] + + def _track_agentic(agent_config, prompt, step_name, **kwargs): + call_order.append(step_name) + return AgentResult( + output="diff", exit_code=0, + agent_name=agent_config.name, step_name=step_name, + duration_seconds=0.1, + ) + + mock_invoke_agentic.side_effect = _track_agentic + mock_invoke.return_value = AgentResult( + output="VERDICT: PASS", exit_code=0, + 
agent_name="reviewer", step_name="review", + duration_seconds=0.1, + ) + + run_pipeline(config, cwd=Path(td)) + + # Both agentic steps should have been called (sequentially) + agentic_calls = [c for c in call_order if c.startswith("code_")] + self.assertEqual(len(agentic_calls), 2) + # They should appear in order (sequential, not concurrent) + self.assertEqual(agentic_calls, ["code_a", "code_b"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_config.py b/tests/test_config.py index ba61b92..95f2944 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -6,12 +6,14 @@ from pathlib import Path from unittest.mock import patch from cross_eval.agent import AgentInvocationError, _supports_reasoning_effort -from cross_eval.cli import _apply_phased_iteration_override +from cross_eval.cli import _apply_phased_iteration_override, main from cross_eval.agent import invoke_agent from cross_eval.config import ( BUILTIN_AGENTS, + _SENIOR_SYSTEM_PROMPT, _default_seniors_for_preset, apply_reasoning_effort_settings, + load_config, normalize_reasoning_effort, normalize_prompt_template, normalize_step_role, @@ -52,7 +54,6 @@ from cross_eval.prompts import ( _build_review_only_preset, _build_simple_preset, ) -from cross_eval.config import _SENIOR_SYSTEM_PROMPT from cross_eval.report import build_report, parse_review_metrics, print_escalation_report class BuiltinAgentConfigTest(unittest.TestCase): @@ -954,5 +955,82 @@ class EscalateVerdictTest(unittest.TestCase): self.assertIn("VERDICT: ESCALATE", AGGREGATE_REVIEW_TEMPLATE_KO) +class FixPresetBehaviorTest(unittest.TestCase): + def _write_fix_config(self, root: Path, *, max_iterations: int = 7) -> Path: + (root / "plan.md").write_text("# plan\n", encoding="utf-8") + (root / "checklist.md").write_text("# checklist\n", encoding="utf-8") + config_path = root / "config.yaml" + config_path.write_text( + ( + "inputs:\n" + " plan: plan.md\n" + " checklist: checklist.md\n" + "coders: [claude-coder]\n" + 
"reviewers: [claude-reviewer]\n" + "pipeline: preset:review-fix\n" + f"max_iterations: {max_iterations}\n" + "language: en\n" + ), + encoding="utf-8", + ) + return config_path + + def test_load_config_syncs_phased_iterations_and_enables_agentic(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + config = load_config(self._write_fix_config(Path(tmpdir), max_iterations=7)) + + self.assertEqual(config.preset_name, "review-fix") + self.assertEqual(config.phases[0].max_iterations, 7) + self.assertTrue(config.agents["claude-coder"].agentic) + self.assertNotIn("-p", config.agents["claude-coder"].args) + + def test_run_config_max_iter_updates_existing_phased_pipeline(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + config_path = self._write_fix_config(Path(tmpdir), max_iterations=7) + captured: dict[str, object] = {} + + def _fake_run_pipeline(config, **kwargs): + captured["phase_max"] = config.phases[0].max_iterations + captured["agentic"] = config.agents[config.coders[0]].agentic + return PipelineResult( + iterations=[], + final_verdict="PASS", + run_dir=Path(tmpdir) / "output", + ) + + with patch("cross_eval.pipeline.run_pipeline", side_effect=_fake_run_pipeline): + exit_code = main([ + "run", + "--config", str(config_path), + "--max-iter", "9", + "--dry-run", + ]) + + self.assertEqual(exit_code, 0) + self.assertEqual(captured["phase_max"], 9) + self.assertTrue(captured["agentic"]) + + def test_run_preset_review_fix_auto_enables_agentic_without_flag(self) -> None: + captured: dict[str, object] = {} + + def _fake_run_pipeline(config, **kwargs): + captured["preset"] = config.preset_name + captured["agentic"] = config.agents[config.coders[0]].agentic + captured["phase_max"] = config.phases[0].max_iterations + return PipelineResult( + iterations=[], + final_verdict="PASS", + run_dir=Path(".cross-eval/output"), + ) + + with patch("cross_eval.pipeline.run_pipeline", side_effect=_fake_run_pipeline): + exit_code = main(["run", "--preset", 
"review-fix", "--dry-run"]) + + self.assertEqual(exit_code, 0) + self.assertEqual(captured["preset"], "review-fix") + self.assertTrue(captured["agentic"]) + self.assertEqual(captured["phase_max"], 3) + + if __name__ == "__main__": unittest.main()