"""Agent invocation via subprocess with live spinner.""" from __future__ import annotations import itertools import logging import os import subprocess import sys import tempfile import threading import time from pathlib import Path from typing import Optional from cross_eval.models import AgentConfig, AgentResult logger = logging.getLogger(__name__) # CLI tools that support --system-prompt flag natively _SYSTEM_PROMPT_AGENTS = ("claude",) _REASONING_EFFORT_AGENTS = ("codex",) _NO_CHANGE_ACK_MARKERS = ( "no changes", "no code changes", "no file changes", "did not make any changes", "nothing to change", "no modifications were necessary", "no update was necessary", "already satisfied", "no changes needed", "no fixes needed", "everything is correct", "code is correct as-is", "already correct", "no action required", "변경 없음", "수정 없음", "수정할 필요 없음", "변경할 필요 없음", "이미 올바름", "조치 불필요", ) _CHANGE_CLAIM_MARKERS = ( "summary of all changes made", "here's a summary of all changes made", "here is a summary of all changes", "implemented", "i implemented", "i've implemented", "added", "i added", "i've added", "updated", "i updated", "i've updated", "modified", "i modified", "i've modified", "created", "i created", "i've created", "fixed", "i fixed", "i've fixed", "completed the changes", "finished the changes", "made the following changes", "applied the fix", "changes have been applied", "wrote the code", "refactored", "i refactored", "completed all the changes", "finished implementing", "all tasks completed", "done with the implementation", "successfully implemented", "completed the implementation", "all changes have been made", "changes are complete", "수정 완료", "모든 수정이 완료", "변경 요약", "변경 파일", "신규 생성", "기획서 수정", "체크리스트 수정", "문서를 수정", "문서 수정", ) class AgentInvocationError(RuntimeError): """Structured error for agent CLI failures.""" def __init__( self, *, agent_name: str, step_name: str, cmd_preview: str, raw_error: str, failure_type: str, suggested_action: str, ) -> None: self.agent_name = agent_name self.step_name = step_name self.cmd_preview = cmd_preview self.raw_error = raw_error self.failure_type = failure_type self.suggested_action = suggested_action super().__init__( f"Agent '{agent_name}' failed (exit code != 0) at step '{step_name}':\n" f" type: {failure_type}\n" f" cmd: {cmd_preview}\n" f" error: {raw_error or '(no output)'}\n" f" action: {suggested_action}" ) def _supports_system_prompt_flag(command: str) -> bool: """Check if the agent CLI supports --system-prompt flag.""" return any(name in command for name in _SYSTEM_PROMPT_AGENTS) def _supports_reasoning_effort(command: str) -> bool: """Check if the agent CLI supports reasoning effort overrides.""" return any(name in command for name in _REASONING_EFFORT_AGENTS) def _classify_agent_failure(detail: str) -> tuple[str, str]: """Classify a failed agent invocation into a user-actionable bucket.""" normalized = detail.lower() auth_markers = ( "not logged in", "please run /login", "auth", "authentication", "invalid api key", "api key", "unauthorized", "forbidden", ) usage_limit_markers = ( "quota", "rate limit", "credits", "credit balance", "budget", "insufficient funds", "usage limit", "token limit", "billing", ) if any(marker in normalized for marker in auth_markers): return ( "AUTH", "Agent CLI authentication is missing or expired. Re-authenticate the CLI, then rerun.", ) if any(marker in normalized for marker in usage_limit_markers): return ( "USAGE_LIMIT", "Agent CLI hit a quota, billing, or token budget limit. Refill or raise the limit, then rerun.", ) if "api error" in normalized: return ( "API_ERROR", "Agent CLI returned an API error. Inspect the saved error file for the raw response.", ) return ( "UNKNOWN", "Agent CLI failed for an unknown reason. Inspect the saved error file for details.", ) _WRITE_FAILURE_MARKERS = ( "permission denied", "read-only file system", "read only file system", "operation not permitted", "cannot write", "failed to write", "could not write", "unable to write", "sandbox", "eacces", "erofs", ) def _has_write_failure_indicators(stderr: str) -> bool: """Detect stderr patterns indicating the agent could not write files.""" if not stderr.strip(): return False normalized = stderr.lower() return any(marker in normalized for marker in _WRITE_FAILURE_MARKERS) def _claims_file_changes(output: str) -> bool: """Heuristic for agent text that claims code changes were made.""" normalized = output.lower() if not normalized.strip(): return False if any(marker in normalized for marker in _NO_CHANGE_ACK_MARKERS): return False return any(marker in normalized for marker in _CHANGE_CLAIM_MARKERS) class _Spinner: """Animated spinner for long-running agent calls.""" FRAMES = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" _CLEAR_LINE = "\r" + (" " * 160) + "\r" def __init__(self, message: str) -> None: self.message = message self._running = False self._thread: Optional[threading.Thread] = None self._start_time = 0.0 def start(self) -> None: self._running = True self._start_time = time.monotonic() self._thread = threading.Thread(target=self._spin, daemon=True) self._thread.start() def _spin(self) -> None: for frame in itertools.cycle(self.FRAMES): if not self._running: break elapsed = int(time.monotonic() - self._start_time) line = f"\r {frame} {self.message} ({elapsed}s)" sys.stderr.write(line) sys.stderr.flush() time.sleep(0.1) def stop(self, final: str) -> None: self._running = False if self._thread: self._thread.join(timeout=1) elapsed = round(time.monotonic() - self._start_time, 1) sys.stderr.write(self._CLEAR_LINE) sys.stderr.write(f" \u2713 {final} ({elapsed}s)\n") sys.stderr.flush() def _is_print_mode(args: list[str]) -> bool: """Check if the agent args include -p / --print flag.""" return "-p" in args or "--print" in args def invoke_agent( agent: AgentConfig, prompt: str, step_name: str, cwd: Optional[Path] = None, env: Optional[dict[str, str]] = None, timeout: int | None = None, quiet: bool = False, ) -> AgentResult: """Invoke an agent CLI with the given prompt. Args: quiet: If True, suppress spinner (for parallel execution). """ is_claude = "claude" in agent.command is_interactive = is_claude and not _is_print_mode(agent.args) cmd = [agent.command] if agent.reasoning_effort and _supports_reasoning_effort(agent.command): cmd.extend(["-c", f'model_reasoning_effort="{agent.reasoning_effort}"']) cmd.extend(agent.args) # --- Temp files for interactive (non -p) claude --- task_file: Optional[Path] = None output_file: Optional[Path] = None if is_interactive: # Write prompt + output instruction to temp task file task_fd, task_path = tempfile.mkstemp(suffix=".md", prefix="cross_eval_task_") task_file = Path(task_path) os.close(task_fd) out_fd, out_path = tempfile.mkstemp(suffix=".md", prefix="cross_eval_out_") output_file = Path(out_path) os.close(out_fd) # Clear the output file so we can detect if agent wrote to it output_file.write_text("", encoding="utf-8") wrapped_prompt = ( f"{prompt}\n\n" f"---\n" f"IMPORTANT: Write your COMPLETE response to this file: {output_file}\n" f"Do NOT modify any other files in the project." ) task_file.write_text(wrapped_prompt, encoding="utf-8") # System prompt via flag if agent.system_prompt and _supports_system_prompt_flag(agent.command): cmd.extend(["--system-prompt", agent.system_prompt]) # Positional arg: point claude to the task file cmd.append( f"Read the task file at {task_file} and follow all instructions in it. " f"Write your complete output to {output_file}." ) input_data: str | None = None else: # Print mode (-p) or non-claude: deliver prompt via stdin if agent.system_prompt and _supports_system_prompt_flag(agent.command): cmd.extend(["--system-prompt", agent.system_prompt]) input_data = prompt elif agent.system_prompt: input_data = ( f"\n{agent.system_prompt}\n\n\n" f"{prompt}" ) else: input_data = prompt cmd_preview = " ".join(cmd[:6]) logger.debug("Invoking agent '%s': %s", agent.name, " ".join(cmd[:5]) + " ...") spinner: Optional[_Spinner] = None if not quiet: mode_label = "interactive" if is_interactive else "" logger.info(" cmd: %s %s", " ".join(cmd[:6]), f"({mode_label})" if mode_label else "") spinner = _Spinner(f"[{step_name}] {agent.name} running...") spinner.start() try: start = time.monotonic() result = subprocess.run( cmd, input=input_data, capture_output=True, text=True, timeout=timeout, cwd=cwd, env=env, ) duration = time.monotonic() - start except subprocess.TimeoutExpired: if spinner: spinner.stop(f"[{step_name}] TIMEOUT after {timeout}s") raise except Exception: if spinner: spinner.stop(f"[{step_name}] ERROR") raise finally: if task_file: task_file.unlink(missing_ok=True) if result.returncode != 0: if spinner: spinner.stop(f"[{step_name}] FAILED (exit {result.returncode})") if output_file: output_file.unlink(missing_ok=True) err_detail = result.stderr.strip() or result.stdout.strip() if err_detail and len(err_detail) > 500: err_detail = err_detail[:500] + "..." failure_type, suggested_action = _classify_agent_failure(err_detail or "") raise AgentInvocationError( agent_name=agent.name, step_name=step_name, cmd_preview=cmd_preview, raw_error=err_detail or "(no output)", failure_type=failure_type, suggested_action=suggested_action, ) # --- Capture output --- if output_file: output = output_file.read_text(encoding="utf-8").strip() output_file.unlink(missing_ok=True) if not output: # Fallback to stdout if agent didn't write to the file output = result.stdout.strip() else: output = result.stdout.strip() chars = len(output) if spinner: spinner.stop(f"[{step_name}] done — {chars} chars") if not output: stderr_info = result.stderr.strip() if stderr_info: logger.warning( "Agent '%s' produced empty output at step '%s'. stderr: %s", agent.name, step_name, stderr_info[:500], ) else: logger.warning( "Agent '%s' produced empty output at step '%s' (no stderr either)", agent.name, step_name, ) transcript = _build_transcript( command_preview=cmd_preview, stdout=result.stdout, stderr=result.stderr, exit_code=result.returncode, duration_seconds=round(duration, 1), cwd=str(cwd) if cwd else "", ) return AgentResult( output=output, exit_code=result.returncode, agent_name=agent.name, step_name=step_name, duration_seconds=round(duration, 1), transcript=transcript, command_preview=cmd_preview, ) def invoke_agent_agentic( agent: AgentConfig, prompt: str, step_name: str, worktree_path: Path, env: Optional[dict[str, str]] = None, timeout: int | None = None, quiet: bool = False, base_commit: str | None = None, ) -> AgentResult: """Invoke an agent in agentic mode using the worktree as the source of truth.""" from cross_eval.worktree import capture_diff # Write prompt to a temp file (outside worktree, won't appear in diffs) import tempfile task_fd, task_path = tempfile.mkstemp(suffix=".md", prefix="cross_eval_task_") task_file = Path(task_path) task_file.write_text(prompt, encoding="utf-8") os.close(task_fd) cmd = [agent.command] if agent.reasoning_effort and _supports_reasoning_effort(agent.command): cmd.extend(["-c", f'model_reasoning_effort="{agent.reasoning_effort}"']) # Strip print-mode flags and stdin sentinels for agentic mode. # Agentic runs should operate on the worktree and return a real git diff, # not behave as a one-shot text completer. args = [a for a in agent.args if a not in {"-", "-p", "--print"}] cmd.extend(args) # System prompt via flag if supported if agent.system_prompt and _supports_system_prompt_flag(agent.command): cmd.extend(["--system-prompt", agent.system_prompt]) # Deliver the prompt differently per agent type is_codex = "codex" in agent.command input_data: str | None = None if is_codex: # codex: stdin mode cmd.append("-") if agent.system_prompt and not _supports_system_prompt_flag(agent.command): input_data = f"\n{agent.system_prompt}\n\n\n{prompt}" else: input_data = prompt else: # claude: deliver the task through stdin and let the worktree be the # canonical place where files are read/written. input_data = prompt cmd_preview = " ".join(cmd[:6]) logger.debug( "Invoking agent '%s' (agentic) in worktree: %s", agent.name, worktree_path, ) spinner: Optional[_Spinner] = None if not quiet: logger.info(" cmd: %s (agentic)", " ".join(cmd[:6])) spinner = _Spinner(f"[{step_name}] {agent.name} (agentic) running...") spinner.start() try: start = time.monotonic() result = subprocess.run( cmd, input=input_data, capture_output=True, text=True, timeout=timeout, cwd=worktree_path, env=env, ) duration = time.monotonic() - start except subprocess.TimeoutExpired: if spinner: spinner.stop(f"[{step_name}] TIMEOUT after {timeout}s") raise except Exception: if spinner: spinner.stop(f"[{step_name}] ERROR") raise finally: # Clean up temp task file (it's in /tmp, not in worktree) task_file.unlink(missing_ok=True) if result.returncode != 0: if spinner: spinner.stop(f"[{step_name}] FAILED (exit {result.returncode})") err_detail = result.stderr.strip() or result.stdout.strip() if err_detail and len(err_detail) > 500: err_detail = err_detail[:500] + "..." failure_type, suggested_action = _classify_agent_failure(err_detail or "") raise AgentInvocationError( agent_name=agent.name, step_name=step_name, cmd_preview=cmd_preview, raw_error=err_detail or "(no output)", failure_type=failure_type, suggested_action=suggested_action, ) # Capture git diff as the output (changes since the base commit) diff_output = capture_diff(worktree_path, base_commit=base_commit) if not diff_output: stdout_excerpt = (result.stdout or "").strip() stderr_excerpt = (result.stderr or "").strip() # Detect two failure modes: # 1. Agent claims changes in stdout but produced no diff # 2. Agent stderr contains permission or write-failure indicators claims_changes = _claims_file_changes(stdout_excerpt) has_write_failure = _has_write_failure_indicators(stderr_excerpt) if claims_changes or has_write_failure: if spinner: spinner.stop(f"[{step_name}] FAILED (empty diff)") raw_error = stdout_excerpt or "(stdout empty)" if stderr_excerpt: raw_error = f"{raw_error}\n\n[stderr]\n{stderr_excerpt}" if len(raw_error) > 2000: raw_error = raw_error[:2000] + "..." if has_write_failure: failure_type = "WRITE_FAILURE" suggested_action = ( "Agent encountered file write errors (permission denied, read-only, " "or sandbox restriction). Check agent permissions and worktree state." ) else: failure_type = "EMPTY_DIFF" suggested_action = ( "Agent reported code changes but produced no git diff. " "Treat this run as failed and require a real worktree diff before continuing." ) raise AgentInvocationError( agent_name=agent.name, step_name=step_name, cmd_preview=cmd_preview, raw_error=raw_error, failure_type=failure_type, suggested_action=suggested_action, ) diff_output = "(no changes)" logger.warning( "Agent '%s' made no file changes at step '%s'", agent.name, step_name, ) chars = len(diff_output) if spinner: spinner.stop(f"[{step_name}] done — {chars} chars (agentic)") transcript = _build_transcript( command_preview=cmd_preview, stdout=result.stdout, stderr=result.stderr, exit_code=result.returncode, duration_seconds=round(duration, 1), cwd=str(worktree_path), ) return AgentResult( output=diff_output, exit_code=result.returncode, agent_name=agent.name, step_name=step_name, duration_seconds=round(duration, 1), transcript=transcript, command_preview=cmd_preview, ) def _build_transcript( *, command_preview: str, stdout: str, stderr: str, exit_code: int = 0, duration_seconds: float = 0.0, cwd: str = "", ) -> str: """Build a compact execution transcript for debugging/audit output.""" sections = [ "# Agent Execution Transcript", "", "## Command", "```", command_preview or "(unknown command)", "```", "", ] if cwd: sections.extend(["## Working Directory", f"`{cwd}`", ""]) sections.extend([ f"## Exit Code: {exit_code}", "", ]) if duration_seconds > 0: sections.extend([f"## Duration: {duration_seconds}s", ""]) sections.extend([ "## Stdout", "```", (stdout or "(empty)").strip(), "```", "", "## Stderr", "```", (stderr or "(empty)").strip(), "```", "", ]) return "\n".join(sections)