commit ee4f1a07ef1101051cf61a6fd57780493b5e6bd2 Author: 이충영 에이닷서비스개발 Date: Wed Mar 11 21:53:14 2026 +0900 initial commit diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..ab1f416 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,10 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Ignored default folder with query files +/queries/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml +# Editor-based HTTP Client requests +/httpRequests/ diff --git a/.idea/cross-eval.iml b/.idea/cross-eval.iml new file mode 100644 index 0000000..b525243 --- /dev/null +++ b/.idea/cross-eval.iml @@ -0,0 +1,14 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..03d9549 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..f632e42 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..02ac596 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md new file mode 100644 index 0000000..524e6b9 --- /dev/null +++ b/DEVELOPMENT.md @@ -0,0 +1,169 @@ +# Cross-Eval CLI 구현 계획 + +## Context + +AI 에이전트 2개를 활용한 개발 워크플로우(기획→체크리스트→개발→리뷰→반복)에서 발생하는 **과최적화/오탐/누락** 문제를 잡기 위해, 에이전트 간 교차 검증 루프를 자동화하는 CLI 도구를 만든다. 현재 수동으로 2개 에이전트에 복붙하는 과정을 `cross-eval run` 한 줄로 대체한다. + +## 핵심 설계 결정 + +**에이전트가 코드베이스를 직접 탐색한다** — `claude -p`는 non-interactive지만 내장 도구(Read, Glob, Grep)는 사용 가능. 파일 내용을 프롬프트에 전부 넣는 대신, 에이전트가 프로젝트 디렉토리에서 직접 파일을 탐색하도록 한다. +- Generator: `--permission-mode auto` (파일 읽기/쓰기 가능) +- Reviewer: `--permission-mode plan` (읽기 전용 탐색) +- subprocess의 `cwd`를 현재 작업 디렉토리로 설정 + +## 사용자 경험 (UX Flow) + +```bash +# 1. 프로젝트 초기화 +cd my-project +cross-eval init +# → cross-eval.yaml, plan.md, checklist.md 생성 + +# 2. plan.md, checklist.md 작성 후 실행 +cross-eval run + +# 3. 옵션들 +cross-eval run --config custom.yaml --max-iter 5 --dry-run +cross-eval run --input plan=./docs/spec.md --input checklist=./docs/checks.md + +# 4. 결과 확인 +ls output/v1/ v2/ final-report.md +``` + +## 설정 파일 형식 (`cross-eval.yaml`) + +```yaml +output_dir: output +max_iterations: 3 + +inputs: + plan: plan.md + checklist: checklist.md + +agents: + generator: + command: claude + args: ["-p", "--model", "sonnet", "--permission-mode", "auto"] + system_prompt: "You are a senior software engineer. Follow the plan precisely." + reviewer: + command: claude + args: ["-p", "--model", "opus", "--permission-mode", "plan"] + system_prompt: "You are a meticulous code reviewer." + +# 방법 1: 프리셋 사용 (사용자가 pipeline YAML 직접 작성할 필요 없음) +pipeline: preset:simple # "A 생성 → B 리뷰" (기본값) +# pipeline: preset:cross-review # "둘 다 생성 → 서로 리뷰" + +# 방법 2: 직접 커스텀 (고급 사용자용) +# pipeline: +# - name: generate +# agent: generator +# role: generate +# prompt_template: "default:generate" +# output_key: generated_code +# - name: review +# agent: reviewer +# role: review +# prompt_template: "default:review" +# output_key: review_result +# verdict: true +``` + +### 파이프라인 프리셋 + +| 프리셋 | 설명 | 자동 생성되는 steps | +|--------|------|-------------------| +| `simple` | A 생성 → B 리뷰 | generate(agent1) → review(agent2) | +| `cross-review` | 둘 다 생성, 서로 리뷰 | gen_a → gen_b → review_of_b(agent_a) → review_of_a(agent_b) | + +프리셋은 내부적으로 적절한 pipeline steps + context_override를 자동 구성한다. agents에 정의된 순서대로 agent1, agent2가 배정된다. 프리셋이 불충분하면 직접 steps를 작성할 수 있다. + +## 모듈 구조 및 구현 순서 + +``` +cross_eval/ +├── __init__.py (exists) +├── models.py # 1. 모든 데이터클래스 +├── config.py # 2. YAML 로딩 + 검증 +├── prompts.py # 3. 프롬프트 템플릿 +├── agent.py # 4. subprocess 에이전트 호출 +├── pipeline.py # 5. 핵심 반복 루프 +├── report.py # 6. 마크다운 리포트 +└── cli.py # 7. argparse (init, run) +``` + +### 모듈별 핵심 내용 + +**models.py** — 순환 참조 방지, 모든 데이터클래스 집중: +- `AgentConfig` (command, args, system_prompt, stdin_mode) +- `StepConfig` (name, agent, role, prompt_template, output_key, verdict, verdict_pattern, context_override) +- `PipelineConfig` (output_dir, max_iterations, inputs, agents, pipeline) +- `AgentResult` (output, exit_code, agent_name, step_name, duration_seconds) +- `IterationResult` (iteration, step_outputs, verdict, feedback) +- `PipelineResult` (iterations, final_verdict, total_duration) + +**config.py** — YAML → PipelineConfig + 검증: +- step.agent가 agents에 정의되어 있는지 +- output_key 중복 없는지 +- input 파일 존재 여부 +- verdict_pattern 유효한 정규식인지 + +**prompts.py** — 기본 프롬프트 2종 + 파이프라인 프리셋 정의: +- `default:generate` — "기획서에 명시된 것만 구현하라, 과최적화 금지" + plan/checklist/feedback + **"프로젝트 디렉토리의 기존 코드를 탐색하여 컨텍스트를 파악하라"** 지시 +- `default:review` — 과최적화/오탐/누락 3기준 검토 + `VERDICT: PASS|FAIL` 출력 + **"프로젝트 디렉토리를 직접 탐색하여 코드를 검증하라"** 지시 +- `{variable}` 플레이스홀더, 누락 시 `(no {key} provided)` 출력 +- 사용자가 커스텀 .md 파일로 오버라이드 가능 +- `PIPELINE_PRESETS` dict: `simple`, `cross-review` 등 프리셋별 StepConfig 리스트 정의 + +**agent.py** — `invoke_agent(agent_config, prompt, cwd)`: +- `cwd` 파라미터로 프로젝트 디렉토리 지정 → 에이전트가 해당 디렉토리에서 파일 탐색 가능 +- `stdin_mode=false`: prompt를 마지막 인자로 전달 +- `stdin_mode=true`: stdin으로 파이프 (긴 프롬프트용) +- command가 "claude"이고 system_prompt 있으면 `--system-prompt` 자동 주입 +- timeout 600초, 비정상 종료 시 RuntimeError + +**pipeline.py** — 핵심 루프: +``` +for iteration 1..max_iterations: + for step in pipeline: + 1. 템플릿 resolve → context 구성 (inputs + 이전 step 출력 + feedback) + 2. context_override 적용 (교차 리뷰용 변수 매핑) + 3. 에이전트 호출 (cwd=현재 작업 디렉토리) + 4. output_dir/v{i}/{step.name}.md 저장 + 5. verdict step이면 PASS/FAIL 판정 + PASS면 종료, FAIL이면 review 결과를 feedback으로 다음 반복 +final-report.md 생성 +``` + +**report.py** — 최종 마크다운 리포트: +- 요약 테이블 (반복 횟수, 판정, 소요시간) +- 반복별 상세 (각 step 출력, 에이전트명, 소요시간) +- 최종 판정 + +**cli.py** — 서브커맨드: +- `cross-eval init [--dir .] [--preset simple|cross-review]` — 스캐폴딩 (기존 파일 안 덮어씀) +- `cross-eval run [-c config] [--max-iter N] [--dry-run] [--output-dir path] [--input key=path ...]` +- `--input key=path`: config의 inputs 오버라이드/추가 +- `--dry-run`: 에이전트 호출 없이 렌더링된 프롬프트만 출력 + +## 수정할 파일 목록 + +| 파일 | 작업 | +|------|------| +| `cross_eval/__init__.py` | 이미 존재, 수정 없음 | +| `cross_eval/models.py` | **신규 생성** | +| `cross_eval/config.py` | **신규 생성** | +| `cross_eval/prompts.py` | **신규 생성** | +| `cross_eval/agent.py` | **신규 생성** | +| `cross_eval/pipeline.py` | **신규 생성** | +| `cross_eval/report.py` | **신규 생성** | +| `cross_eval/cli.py` | **신규 생성** | +| `pyproject.toml` | 이미 존재, 수정 없음 | + +## 검증 방법 + +1. `pip install -e .` 로 로컬 설치 +2. `cross-eval init` 로 스캐폴딩 확인 (3개 파일 생성) +3. `cross-eval run --dry-run` 로 프롬프트 렌더링 확인 (에이전트 호출 없이) +4. plan.md/checklist.md에 간단한 내용 넣고 `cross-eval run --max-iter 2` 로 실제 실행 +5. `output/` 디렉토리에 v1/, final-report.md 생성 확인 diff --git a/README.md b/README.md new file mode 100644 index 0000000..e286554 --- /dev/null +++ b/README.md @@ -0,0 +1,120 @@ +# cross-eval + +AI 에이전트 간 교차 검증을 자동화하는 CLI 도구. + +기획서와 체크리스트를 기반으로 "생성 → 리뷰 → 피드백 → 재생성" 루프를 자동으로 돌려서, +**과최적화 / 오탐 / 누락** 문제를 잡아냅니다. + +## 설치 + +```bash +# 1. 저장소 클론 +git clone +cd cross-eval + +# 2. 설치 (editable 모드 — 코드 수정 시 재설치 불필요) +pip3 install -e . +``` + +설치 후 터미널 어디서든 `cross-eval` 명령어를 사용할 수 있습니다. + +```bash +cross-eval --version +``` + +### 요구사항 + +- Python 3.9+ +- [Claude CLI](https://docs.anthropic.com/en/docs/claude-code) 설치 및 인증 완료 + +## 사용법 + +### 1. 프로젝트 초기화 + +```bash +cd my-project +cross-eval init +``` + +`.cross-eval/` 폴더 안에 `config.yaml`, `plan-sample.md`, `checklist-sample.md`가 생성됩니다. + +### 2. 기획서 작성 + +샘플 파일을 복사하여 기획서와 체크리스트를 작성합니다. + +```bash +cp .cross-eval/plan-sample.md .cross-eval/plan.md +cp .cross-eval/checklist-sample.md .cross-eval/checklist.md +# plan.md, checklist.md 편집 +``` + +### 3. 실행 + +```bash +# 기본 실행 (생성 → 리뷰, 최대 3회 반복) +cross-eval run + +# 프롬프트만 확인 (에이전트 호출 없이, 비용 절약) +cross-eval run --dry-run + +# 최대 반복 횟수 변경 +cross-eval run --max-iter 5 + +# 입력 파일 오버라이드 +cross-eval run --input plan=./docs/spec.md + +# 설정 파일 지정 +cross-eval run --config .cross-eval/config.yaml +``` + +### 4. 결과 확인 + +``` +output/ +├── v1/ +│ ├── generate.md # 에이전트 생성 결과 +│ └── review.md # 에이전트 리뷰 결과 +├── v2/ +│ ├── generate.md +│ └── review.md +└── final-report.md # 전체 요약 리포트 +``` + +## 설정 (`.cross-eval/config.yaml`) + +```yaml +output_dir: output +max_iterations: 3 +language: ko # ko 또는 en (프롬프트 템플릿 언어) + +inputs: + plan: plan.md # config.yaml 기준 상대경로 + checklist: checklist.md + +agents: + generator: + command: claude + args: ["-p", "--model", "sonnet", "--permission-mode", "auto"] + system_prompt: "You are a senior software engineer." + reviewer: + command: claude + args: ["-p", "--model", "opus", "--permission-mode", "plan"] + system_prompt: "You are a meticulous code reviewer." + +pipeline: preset:simple +``` + +실행 중에 `config.yaml`을 수정하면 다음 반복부터 자동으로 반영됩니다. + +### 파이프라인 프리셋 + +| 프리셋 | 설명 | +|--------|------| +| `simple` | Agent A가 생성, Agent B가 리뷰 (기본값) | +| `cross-review` | 둘 다 생성, 서로 교차 리뷰 | + +```bash +# 초기화 옵션 +cross-eval init --preset cross-review # 교차 리뷰 프리셋 +cross-eval init --lang en # 영어 템플릿 +``` diff --git a/cross_eval.egg-info/PKG-INFO b/cross_eval.egg-info/PKG-INFO new file mode 100644 index 0000000..1eeed19 --- /dev/null +++ b/cross_eval.egg-info/PKG-INFO @@ -0,0 +1,6 @@ +Metadata-Version: 2.4 +Name: cross-eval +Version: 0.1.0 +Summary: AI agent cross-evaluation CLI tool +Requires-Python: >=3.9 +Requires-Dist: pyyaml>=6.0 diff --git a/cross_eval.egg-info/SOURCES.txt b/cross_eval.egg-info/SOURCES.txt new file mode 100644 index 0000000..77a3801 --- /dev/null +++ b/cross_eval.egg-info/SOURCES.txt @@ -0,0 +1,17 @@ +README.md +pyproject.toml +cross_eval/__init__.py +cross_eval/agent.py +cross_eval/cli.py +cross_eval/config.py +cross_eval/models.py +cross_eval/pipeline.py +cross_eval/prompts.py +cross_eval/report.py +cross_eval.egg-info/PKG-INFO +cross_eval.egg-info/SOURCES.txt +cross_eval.egg-info/dependency_links.txt +cross_eval.egg-info/entry_points.txt +cross_eval.egg-info/requires.txt +cross_eval.egg-info/top_level.txt +tests/test_config.py \ No newline at end of file diff --git a/cross_eval.egg-info/dependency_links.txt b/cross_eval.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/cross_eval.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/cross_eval.egg-info/entry_points.txt b/cross_eval.egg-info/entry_points.txt new file mode 100644 index 0000000..f668a8c --- /dev/null +++ b/cross_eval.egg-info/entry_points.txt @@ -0,0 +1,2 @@ +[console_scripts] +cross-eval = cross_eval.cli:main diff --git a/cross_eval.egg-info/requires.txt b/cross_eval.egg-info/requires.txt new file mode 100644 index 0000000..3aecde9 --- /dev/null +++ b/cross_eval.egg-info/requires.txt @@ -0,0 +1 @@ +pyyaml>=6.0 diff --git a/cross_eval.egg-info/top_level.txt b/cross_eval.egg-info/top_level.txt new file mode 100644 index 0000000..59bc124 --- /dev/null +++ b/cross_eval.egg-info/top_level.txt @@ -0,0 +1 @@ +cross_eval diff --git a/cross_eval/__init__.py b/cross_eval/__init__.py new file mode 100644 index 0000000..3dc1f76 --- /dev/null +++ b/cross_eval/__init__.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/cross_eval/__pycache__/__init__.cpython-312.pyc b/cross_eval/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..b05eafd Binary files /dev/null and b/cross_eval/__pycache__/__init__.cpython-312.pyc differ diff --git a/cross_eval/__pycache__/__init__.cpython-313.pyc b/cross_eval/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..db7e8b0 Binary files /dev/null and b/cross_eval/__pycache__/__init__.cpython-313.pyc differ diff --git a/cross_eval/__pycache__/agent.cpython-312.pyc b/cross_eval/__pycache__/agent.cpython-312.pyc new file mode 100644 index 0000000..1bda52d Binary files /dev/null and b/cross_eval/__pycache__/agent.cpython-312.pyc differ diff --git a/cross_eval/__pycache__/agent.cpython-313.pyc b/cross_eval/__pycache__/agent.cpython-313.pyc new file mode 100644 index 0000000..e40f194 Binary files /dev/null and b/cross_eval/__pycache__/agent.cpython-313.pyc differ diff --git a/cross_eval/__pycache__/cli.cpython-312.pyc b/cross_eval/__pycache__/cli.cpython-312.pyc new file mode 100644 index 0000000..b3a8b4f Binary files /dev/null and b/cross_eval/__pycache__/cli.cpython-312.pyc differ diff --git a/cross_eval/__pycache__/cli.cpython-313.pyc b/cross_eval/__pycache__/cli.cpython-313.pyc new file mode 100644 index 0000000..b136656 Binary files /dev/null and b/cross_eval/__pycache__/cli.cpython-313.pyc differ diff --git a/cross_eval/__pycache__/config.cpython-312.pyc b/cross_eval/__pycache__/config.cpython-312.pyc new file mode 100644 index 0000000..08fad4d Binary files /dev/null and b/cross_eval/__pycache__/config.cpython-312.pyc differ diff --git a/cross_eval/__pycache__/config.cpython-313.pyc b/cross_eval/__pycache__/config.cpython-313.pyc new file mode 100644 index 0000000..ea5d029 Binary files /dev/null and b/cross_eval/__pycache__/config.cpython-313.pyc differ diff --git a/cross_eval/__pycache__/models.cpython-312.pyc b/cross_eval/__pycache__/models.cpython-312.pyc new file mode 100644 index 0000000..e872663 Binary files /dev/null and b/cross_eval/__pycache__/models.cpython-312.pyc differ diff --git a/cross_eval/__pycache__/models.cpython-313.pyc b/cross_eval/__pycache__/models.cpython-313.pyc new file mode 100644 index 0000000..80cdbff Binary files /dev/null and b/cross_eval/__pycache__/models.cpython-313.pyc differ diff --git a/cross_eval/__pycache__/pipeline.cpython-312.pyc b/cross_eval/__pycache__/pipeline.cpython-312.pyc new file mode 100644 index 0000000..4f80d03 Binary files /dev/null and b/cross_eval/__pycache__/pipeline.cpython-312.pyc differ diff --git a/cross_eval/__pycache__/pipeline.cpython-313.pyc b/cross_eval/__pycache__/pipeline.cpython-313.pyc new file mode 100644 index 0000000..1d67873 Binary files /dev/null and b/cross_eval/__pycache__/pipeline.cpython-313.pyc differ diff --git a/cross_eval/__pycache__/prompts.cpython-312.pyc b/cross_eval/__pycache__/prompts.cpython-312.pyc new file mode 100644 index 0000000..5a54dcc Binary files /dev/null and b/cross_eval/__pycache__/prompts.cpython-312.pyc differ diff --git a/cross_eval/__pycache__/prompts.cpython-313.pyc b/cross_eval/__pycache__/prompts.cpython-313.pyc new file mode 100644 index 0000000..e6edd0a Binary files /dev/null and b/cross_eval/__pycache__/prompts.cpython-313.pyc differ diff --git a/cross_eval/__pycache__/report.cpython-312.pyc b/cross_eval/__pycache__/report.cpython-312.pyc new file mode 100644 index 0000000..a89c726 Binary files /dev/null and b/cross_eval/__pycache__/report.cpython-312.pyc differ diff --git a/cross_eval/__pycache__/report.cpython-313.pyc b/cross_eval/__pycache__/report.cpython-313.pyc new file mode 100644 index 0000000..9b5cb39 Binary files /dev/null and b/cross_eval/__pycache__/report.cpython-313.pyc differ diff --git a/cross_eval/agent.py b/cross_eval/agent.py new file mode 100644 index 0000000..9ace21f --- /dev/null +++ b/cross_eval/agent.py @@ -0,0 +1,162 @@ +"""Agent invocation via subprocess with live spinner.""" +from __future__ import annotations + +import itertools +import logging +import subprocess +import sys +import threading +import time +from pathlib import Path +from typing import Optional + +from cross_eval.models import AgentConfig, AgentResult + +logger = logging.getLogger(__name__) + +# CLI tools that support --system-prompt flag natively +_SYSTEM_PROMPT_AGENTS = ("claude",) +_REASONING_EFFORT_AGENTS = ("codex",) + + +def _supports_system_prompt_flag(command: str) -> bool: + """Check if the agent CLI supports --system-prompt flag.""" + return any(name in command for name in _SYSTEM_PROMPT_AGENTS) + + +def _supports_reasoning_effort(command: str) -> bool: + """Check if the agent CLI supports reasoning effort overrides.""" + return any(name in command for name in _REASONING_EFFORT_AGENTS) + + +class _Spinner: + """Animated spinner for long-running agent calls.""" + + FRAMES = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" + _CLEAR_LINE = "\r" + (" " * 160) + "\r" + + def __init__(self, message: str) -> None: + self.message = message + self._running = False + self._thread: Optional[threading.Thread] = None + self._start_time = 0.0 + + def start(self) -> None: + self._running = True + self._start_time = time.monotonic() + self._thread = threading.Thread(target=self._spin, daemon=True) + self._thread.start() + + def _spin(self) -> None: + for frame in itertools.cycle(self.FRAMES): + if not self._running: + break + elapsed = int(time.monotonic() - self._start_time) + line = f"\r {frame} {self.message} ({elapsed}s)" + sys.stderr.write(line) + sys.stderr.flush() + time.sleep(0.1) + + def stop(self, final: str) -> None: + self._running = False + if self._thread: + self._thread.join(timeout=1) + elapsed = round(time.monotonic() - self._start_time, 1) + sys.stderr.write(self._CLEAR_LINE) + sys.stderr.write(f" \u2713 {final} ({elapsed}s)\n") + sys.stderr.flush() + + +def invoke_agent( + agent: AgentConfig, + prompt: str, + step_name: str, + cwd: Optional[Path] = None, + timeout: int | None = None, + quiet: bool = False, +) -> AgentResult: + """Invoke an agent CLI with the given prompt. + + Args: + quiet: If True, suppress spinner (for parallel execution). + """ + cmd = [agent.command] + if agent.reasoning_effort and _supports_reasoning_effort(agent.command): + cmd.extend(["-c", f'model_reasoning_effort="{agent.reasoning_effort}"']) + cmd.extend(agent.args) + + # Build the full prompt (system prompt + user prompt) + if agent.system_prompt and _supports_system_prompt_flag(agent.command): + # claude: --system-prompt flag supported natively + cmd.extend(["--system-prompt", agent.system_prompt]) + input_data = prompt + elif agent.system_prompt: + # codex, others: no --system-prompt flag, prepend to prompt + input_data = ( + f"\n{agent.system_prompt}\n\n\n" + f"{prompt}" + ) + else: + input_data = prompt + + logger.debug("Invoking agent '%s': %s", agent.name, " ".join(cmd[:5]) + " ...") + + spinner: Optional[_Spinner] = None + if not quiet: + logger.info(" cmd: %s", " ".join(cmd[:6])) + spinner = _Spinner(f"[{step_name}] {agent.name} running...") + spinner.start() + + try: + start = time.monotonic() + result = subprocess.run( + cmd, + input=input_data, + capture_output=True, + text=True, + timeout=timeout, + cwd=cwd, + ) + duration = time.monotonic() - start + except subprocess.TimeoutExpired: + if spinner: + spinner.stop(f"[{step_name}] TIMEOUT after {timeout}s") + raise + except Exception: + if spinner: + spinner.stop(f"[{step_name}] ERROR") + raise + + output = result.stdout.strip() + chars = len(output) + + if result.returncode != 0: + if spinner: + spinner.stop(f"[{step_name}] FAILED (exit {result.returncode})") + err_detail = result.stderr.strip() or result.stdout.strip() + if err_detail and len(err_detail) > 500: + err_detail = err_detail[:500] + "..." + cmd_preview = " ".join(cmd[:6]) + raise RuntimeError( + f"Agent '{agent.name}' failed (exit code {result.returncode}) " + f"at step '{step_name}':\n" + f" cmd: {cmd_preview}\n" + f" error: {err_detail or '(no output)'}" + ) + + if spinner: + spinner.stop(f"[{step_name}] done — {chars} chars") + + if not output: + logger.warning( + "Agent '%s' produced empty output at step '%s'", + agent.name, step_name, + ) + + return AgentResult( + output=output, + exit_code=result.returncode, + agent_name=agent.name, + step_name=step_name, + duration_seconds=round(duration, 1), + ) diff --git a/cross_eval/cli.py b/cross_eval/cli.py new file mode 100644 index 0000000..68dc75e --- /dev/null +++ b/cross_eval/cli.py @@ -0,0 +1,701 @@ +"""CLI entry point with argparse subcommands.""" +from __future__ import annotations + +import argparse +import logging +import sys +from pathlib import Path + +from cross_eval import __version__ +from cross_eval.config import REASONING_EFFORT_CHOICES + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Scaffolding templates for `cross-eval init` +# --------------------------------------------------------------------------- + +DEFAULT_CONFIG_YAML = """\ +# ─── cross-eval 설정 ─────────────────────────────────────────── +# +# 기본 제공 에이전트 (별도 정의 없이 바로 사용 가능): +# claude-coder, claude-reviewer (Claude, opus 모델) +# claude-senior (Claude, opus 모델) +# codex-coder, codex-reviewer (Codex, gpt-5.4 모델) +# codex-senior (Codex, gpt-5.4 모델) +# +# CLI에서 --coder claude --reviewer codex --senior codex 같이 축약해서 지정 가능 +# ──────────────────────────────────────────────────────────────── + +# 입력 파일 (이 파일 기준 상대경로) +inputs: + plan: plan.md + checklist: checklist.md + +# 에이전트 역할 지정 +coders: [claude-coder] +reviewers: [claude-reviewer] +# seniors: [codex-senior] + +# 파이프라인 종류: simple | cross-review | review-only | review-fix +pipeline: preset:{preset} + +# 반복 설정 +max_iterations: 3 +# min_iterations: 1 # PASS여도 최소 이만큼 반복 + +# 프롬프트 언어 +language: {language} + +# 결과 저장 경로 +output_dir: output + +# ─── 커스텀 에이전트 (선택) ──────────────────────────────────── +# 기본 제공 에이전트를 덮어쓰거나 새 에이전트를 정의할 수 있습니다. +# +# agents: +# my-reviewer: +# command: my-tool +# args: ["--flag"] +# system_prompt: "..." +# ──────────────────────────────────────────────────────────────── +""" + +PLAN_SAMPLE_EN = """\ +# Project Plan + +## Objective +[Describe what you want to build] + +## Requirements +1. [Requirement 1] +2. [Requirement 2] + +## Constraints +- [Constraint 1] +- [Constraint 2] + +## Out of Scope +- [Explicitly list what should NOT be implemented] +""" + +PLAN_SAMPLE_KO = """\ +# 프로젝트 기획서 + +## 목표 +[구현할 내용을 설명하세요] + +## 요구사항 +1. [요구사항 1] +2. [요구사항 2] + +## 제약조건 +- [제약조건 1] +- [제약조건 2] + +## 범위 밖 (구현하지 않을 것) +- [명시적으로 구현하지 않을 항목 나열] +""" + +CHECKLIST_SAMPLE_EN = """\ +# Implementation Checklist + +## Functional Requirements +- [ ] [Item 1] +- [ ] [Item 2] + +## Code Quality +- [ ] No unused imports or dead code +- [ ] Error handling for edge cases +- [ ] Follows project coding conventions + +## Constraints +- [ ] Does NOT add features beyond the plan +- [ ] Does NOT introduce unnecessary abstractions +""" + +CHECKLIST_SAMPLE_KO = """\ +# 구현 체크리스트 + +## 기능 요구사항 +- [ ] [항목 1] +- [ ] [항목 2] + +## 코드 품질 +- [ ] 사용하지 않는 import나 죽은 코드 없음 +- [ ] 엣지 케이스에 대한 에러 처리 +- [ ] 프로젝트 코딩 컨벤션 준수 + +## 제약 +- [ ] 기획서 범위를 넘는 기능을 추가하지 않음 +- [ ] 불필요한 추상화를 도입하지 않음 +""" + + +# --------------------------------------------------------------------------- +# CLI entry point +# --------------------------------------------------------------------------- + +def main(argv: list[str] | None = None) -> int: + """Main CLI entry point.""" + parser = argparse.ArgumentParser( + prog="cross-eval", + description=( + "AI 코딩 에이전트의 결과물을 자동으로 검증하는 CLI 도구.\n" + "\n" + "동작 방식:\n" + " 1. 기획서(plan)를 바탕으로 Coder 에이전트가 코드를 생성\n" + " 2. Reviewer 에이전트가 기획서 대비 코드를 검토하고 PASS/FAIL 판정\n" + " 3. FAIL이면 피드백을 반영해서 1~2를 반복 (최대 N회)\n" + "\n" + "빠른 시작:\n" + " cross-eval init 설정 파일 생성\n" + " cross-eval run --plan plan.md 기획서로 바로 실행\n" + " cross-eval run .cross-eval/config.yaml 기반 실행\n" + "\n" + "자세한 사용법: cross-eval --help" + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "-v", "--version", + action="version", + version=f"%(prog)s {__version__}", + ) + parser.add_argument( + "--verbose", + action="store_true", + help="상세 로그 출력", + ) + + subparsers = parser.add_subparsers(dest="command") + + # --- init --- + init_parser = subparsers.add_parser( + "init", + help="설정 파일 생성 (config.yaml, plan.md, checklist.md)", + description=( + "현재 디렉토리에 .cross-eval/ 폴더를 만들고 템플릿을 생성합니다.\n" + "이미 있는 파일은 건드리지 않습니다.\n" + "\n" + "생성되는 파일:\n" + " .cross-eval/config.yaml 에이전트, 파이프라인 설정\n" + " .cross-eval/plan.md 기획서 템플릿\n" + " .cross-eval/checklist.md 체크리스트 템플릿" + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + init_parser.add_argument( + "--dir", + type=Path, + default=Path("."), + help="초기화할 디렉토리 (기본: 현재 디렉토리)", + ) + init_parser.add_argument( + "--preset", + default="simple", + choices=["simple", "cross-review", "review-only", "review-fix"], + help=( + "파이프라인 종류 (기본: simple). " + "simple=코딩+리뷰, cross-review=교차리뷰, " + "review-only=리뷰만, review-fix=리뷰수렴+자동수정" + ), + ) + init_parser.add_argument( + "--lang", + default="ko", + choices=["en", "ko"], + help="프롬프트 언어 (기본: ko)", + ) + + # --- run --- + run_parser = subparsers.add_parser( + "run", + help="검증 파이프라인 실행", + description=( + "기획서(plan)를 기반으로 AI 에이전트가 코드 생성과 리뷰를 반복합니다.\n" + "\n" + "설정 파일 없이 바로 실행할 수 있고, config.yaml로도 실행할 수 있습니다.\n" + "CLI 옵션이 config.yaml보다 우선합니다." + ), + epilog=( + "파이프라인 종류 (--preset):\n" + " ┌──────────────┬─────────────────────────────────────────────────────┐\n" + " │ simple │ Coder가 코드 생성 → Reviewer가 리뷰 │\n" + " │ (기본값) │ FAIL이면 피드백 반영해서 재생성, PASS까지 반복 │\n" + " ├──────────────┼─────────────────────────────────────────────────────┤\n" + " │ review-fix │ 2단계 파이프라인: │\n" + " │ │ Reviewer N명 병렬 리뷰 → 취합 → 수정 → 재검증 │\n" + " ├──────────────┼─────────────────────────────────────────────────────┤\n" + " │ review-only │ 코드 생성 없이 Reviewer N명이 기존 코드만 검토 │\n" + " │ │ (이미 작성된 코드의 품질 감사용) │\n" + " ├──────────────┼─────────────────────────────────────────────────────┤\n" + " │ cross-review │ Coder 2명이 각각 구현 → 상대방 코드를 교차 리뷰 │\n" + " │ │ (서로 다른 에이전트의 구현 비교용) │\n" + " └──────────────┴─────────────────────────────────────────────────────┘\n" + "\n" + "기본 제공 에이전트:\n" + " ┌──────────────────┬─────────┬───────────┬──────────────────────────┐\n" + " │ 이름 │ CLI │ 기본 모델 │ 역할 │\n" + " ├──────────────────┼─────────┼───────────┼──────────────────────────┤\n" + " │ claude-coder │ claude │ opus │ 코드 생성 │\n" + " │ claude-reviewer │ claude │ opus │ 코드 리뷰 │\n" + " │ claude-senior │ claude │ opus │ 리뷰 취합/판정 │\n" + " │ codex-coder │ codex │ gpt-5.4 │ 코드 생성 │\n" + " │ codex-reviewer │ codex │ gpt-5.4 │ 코드 리뷰 │\n" + " │ codex-senior │ codex │ gpt-5.4 │ 리뷰 취합/판정 │\n" + " └──────────────────┴─────────┴───────────┴──────────────────────────┘\n" + " --coder, --reviewer, --senior에서 축약 가능: claude → claude-\n" + "\n" + "사용 예시:\n" + "\n" + " 기본 실행 (Claude가 코딩하고 Claude가 리뷰):\n" + " cross-eval run --plan plan.md\n" + "\n" + " Codex가 코딩, Claude가 리뷰:\n" + " cross-eval run --plan plan.md --coder codex --reviewer claude\n" + "\n" + " 리뷰어 2명 (Claude + Codex):\n" + " cross-eval run --plan plan.md --reviewer claude --reviewer codex\n" + "\n" + " 리뷰 취합용 Senior 추가:\n" + " cross-eval run --plan plan.md --preset review-fix \\\n" + " --reviewer claude --reviewer codex --senior codex\n" + "\n" + " 리뷰 수렴 후 자동 수정 (review-fix):\n" + " cross-eval run --plan plan.md --preset review-fix \\\n" + " --reviewer claude --reviewer codex\n" + "\n" + " 기존 코드 리뷰만 (review-only):\n" + " cross-eval run --plan plan.md --preset review-only \\\n" + " --reviewer claude --reviewer codex\n" + "\n" + " 모델 변경:\n" + " cross-eval run --plan plan.md --model sonnet\n" + "\n" + " config.yaml 기반 실행:\n" + " cross-eval run\n" + " cross-eval run -c my-config.yaml" + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + # -- 입력 파일 -- + input_group = run_parser.add_argument_group("입력 파일") + input_group.add_argument( + "--plan", type=Path, default=None, + help="기획서 파일 경로 (필수)", + ) + input_group.add_argument( + "--checklist", type=Path, default=None, + help="체크리스트 파일 경로 (선택)", + ) + input_group.add_argument( + "--docs", type=Path, default=None, + help="참고 문서 폴더. 폴더 안 모든 파일을 에이전트에게 전달", + ) + input_group.add_argument( + "--input", action="append", dest="inputs", metavar="KEY=PATH", + help="추가 입력 파일 (예: --input spec=./api-spec.md)", + ) + + # -- 에이전트 설정 -- + agent_group = run_parser.add_argument_group( + "에이전트 설정", + "축약 가능: claude → claude-, codex → codex-", + ) + agent_group.add_argument( + "--coder", action="append", dest="coders", metavar="NAME", + help="코드를 생성할 에이전트 (여러 개 가능, 기본: claude)", + ) + agent_group.add_argument( + "--reviewer", action="append", dest="reviewers", metavar="NAME", + help="코드를 리뷰할 에이전트 (여러 개 가능, 기본: claude)", + ) + agent_group.add_argument( + "--senior", action="append", dest="seniors", metavar="NAME", + help="리뷰를 취합하고 최종 판정할 시니어 에이전트 (선택)", + ) + agent_group.add_argument( + "--reasoning-effort", default=None, metavar="LEVEL", + choices=REASONING_EFFORT_CHOICES + ("extra-high", "extra_high", "x-high"), + help="모든 역할의 reasoning effort (minimal|low|medium|high|xhigh)", + ) + agent_group.add_argument( + "--coder-effort", default=None, metavar="LEVEL", + choices=REASONING_EFFORT_CHOICES + ("extra-high", "extra_high", "x-high"), + help="Coder용 reasoning effort", + ) + agent_group.add_argument( + "--reviewer-effort", default=None, metavar="LEVEL", + choices=REASONING_EFFORT_CHOICES + ("extra-high", "extra_high", "x-high"), + help="Reviewer용 reasoning effort", + ) + agent_group.add_argument( + "--senior-effort", default=None, metavar="LEVEL", + choices=REASONING_EFFORT_CHOICES + ("extra-high", "extra_high", "x-high"), + help="Senior용 reasoning effort", + ) + agent_group.add_argument( + "--model", default=None, metavar="MODEL", + help="모든 에이전트의 모델을 한번에 변경 (예: sonnet, opus)", + ) + agent_group.add_argument( + "--generator-model", default=None, metavar="MODEL", + help="Coder 에이전트 모델만 변경", + ) + agent_group.add_argument( + "--reviewer-model", default=None, metavar="MODEL", + help="Reviewer 에이전트 모델만 변경", + ) + + # -- 파이프라인 -- + pipe_group = run_parser.add_argument_group("파이프라인") + pipe_group.add_argument( + "--preset", default=None, + choices=["simple", "cross-review", "review-only", "review-fix"], + help="파이프라인 종류 (기본: simple). 각 종류 설명은 아래 참조", + ) + pipe_group.add_argument( + "--max-iter", type=int, default=None, + help="최대 반복 횟수 (기본: 3)", + ) + pipe_group.add_argument( + "--min-iter", type=int, default=None, + help="최소 반복 횟수. PASS여도 이 횟수까지 반복 (기본: 1)", + ) + pipe_group.add_argument( + "--timeout", type=int, default=None, metavar="SEC", + help="에이전트 1회 호출 제한 시간(초). 0=무제한 (기본: 무제한)", + ) + pipe_group.add_argument( + "--lang", default=None, choices=["en", "ko"], + help="프롬프트 언어 (기본: ko)", + ) + + # -- 기타 -- + etc_group = run_parser.add_argument_group("기타") + etc_group.add_argument( + "-c", "--config", type=Path, default=None, + help="설정 파일 경로 (기본: .cross-eval/config.yaml)", + ) + etc_group.add_argument( + "--output-dir", type=Path, default=None, + help="결과 저장 디렉토리 (기본: output/)", + ) + etc_group.add_argument( + "--dry-run", action="store_true", + help="실제 실행 없이 에이전트에게 보낼 프롬프트만 미리보기", + ) + + args = parser.parse_args(argv) + + # Setup logging + level = logging.DEBUG if args.verbose else logging.INFO + logging.basicConfig( + level=level, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", + ) + + if args.command == "init": + return cmd_init(args) + elif args.command == "run": + return cmd_run(args) + else: + parser.print_help() + return 0 + + +def cmd_init(args: argparse.Namespace) -> int: + """Scaffold a new cross-eval project.""" + target = args.dir.resolve() + ce_dir = target / ".cross-eval" + ce_dir.mkdir(parents=True, exist_ok=True) + + lang = args.lang + plan_sample = PLAN_SAMPLE_KO if lang == "ko" else PLAN_SAMPLE_EN + checklist_sample = CHECKLIST_SAMPLE_KO if lang == "ko" else CHECKLIST_SAMPLE_EN + + files = { + ".cross-eval/config.yaml": DEFAULT_CONFIG_YAML.format( + preset=args.preset, language=lang, + ), + ".cross-eval/plan.md": plan_sample, + ".cross-eval/checklist.md": checklist_sample, + } + + created = [] + skipped = [] + for name, content in files.items(): + path = target / name + if path.exists(): + skipped.append(name) + else: + path.write_text(content, encoding="utf-8") + created.append(name) + + if created: + print(f" 생성: {', '.join(created)}") + if skipped: + print(f" 이미 존재 (건너뜀): {', '.join(skipped)}") + + print(f"\n 파이프라인: {args.preset}") + print(f" 언어: {lang}") + print("") + print("다음 단계:") + print(" 1. .cross-eval/plan.md 에 기획서 작성") + print(" 2. .cross-eval/checklist.md 에 체크리스트 작성 (선택)") + print(" 3. cross-eval run 으로 실행") + print("") + print("주의: 에이전트는 기본적으로 파일 읽기/쓰기/실행 권한을 가집니다.") + print(" 실행 전에 .cross-eval/config.yaml 을 확인하세요.") + return 0 + + +def _read_docs_dir(docs_dir: Path) -> str: + """Read all files in a directory and concatenate with filename headers.""" + parts: list[str] = [] + for f in sorted(docs_dir.iterdir()): + if f.is_file() and not f.name.startswith("."): + try: + content = f.read_text(encoding="utf-8") + parts.append(f"### {f.name}\n{content}") + except (UnicodeDecodeError, OSError): + continue # skip binary or unreadable files + return "\n\n".join(parts) + + +def _apply_model_override(config, agent_name: str, model: str) -> None: + """Replace --model in agent args.""" + agent = config.agents.get(agent_name) + if agent is None: + return + new_args = list(agent.args) + for i, arg in enumerate(new_args): + if arg == "--model" and i + 1 < len(new_args): + new_args[i + 1] = model + agent.args = new_args + return + # --model not found, append it + new_args.extend(["--model", model]) + agent.args = new_args + + +def cmd_run(args: argparse.Namespace) -> int: + """Load config, validate, and execute the pipeline.""" + from cross_eval.config import ( + apply_input_overrides, + default_config, + load_config, + validate_config, + ) + from cross_eval.prompts import PIPELINE_PRESETS + from cross_eval.pipeline import run_pipeline + + # 1. Load config: YAML if exists, otherwise defaults + config_path = args.config + if config_path is not None: + config_path = config_path.resolve() + if not config_path.exists(): + print(f"Config file not found: {config_path}", file=sys.stderr) + return 1 + try: + config = load_config(config_path) + except (ValueError, FileNotFoundError) as e: + print(f"Config error: {e}", file=sys.stderr) + return 1 + config_source = config_path.name + else: + # Try default location, fall back to built-in defaults + default_path = Path(".cross-eval/config.yaml").resolve() + if default_path.exists(): + try: + config = load_config(default_path) + config_source = default_path.name + except (ValueError, FileNotFoundError) as e: + print(f"Config error: {e}", file=sys.stderr) + return 1 + else: + config = default_config() + config_source = "defaults" + + # 2. Apply CLI overrides + if args.max_iter is not None: + config.max_iterations = args.max_iter + if args.min_iter is not None: + config.min_iterations = args.min_iter + if args.output_dir is not None: + config.output_dir = args.output_dir + if args.lang is not None: + config.language = args.lang + + # --coder / --reviewer: resolve shorthands and override roles + from cross_eval.config import ( + _default_seniors_for_preset, + _infer_roles, + _resolve_agents, + apply_reasoning_effort_settings, + resolve_agent_shorthand, + ) + if args.coders or args.reviewers or args.seniors: + coders = [resolve_agent_shorthand(c, "coder") for c in (args.coders or [])] + reviewers = [resolve_agent_shorthand(r, "reviewer") for r in (args.reviewers or [])] + seniors = [resolve_agent_shorthand(s, "senior") for s in (args.seniors or [])] + # Fill defaults if only one side specified + if not coders: + coders = config.coders or ["claude-coder"] + if not reviewers: + reviewers = config.reviewers or ["claude-reviewer"] + if not seniors: + seniors = config.seniors + config.coders = coders + config.reviewers = reviewers + config.seniors = seniors + # Auto-merge built-in agents + config.agents = _resolve_agents(config.agents, coders, reviewers, seniors) + + # --preset: rebuild pipeline from preset + need_rebuild = args.preset is not None or args.coders or args.reviewers or args.seniors + if need_rebuild: + from cross_eval.prompts import PHASED_PRESETS + preset = args.preset or "simple" + # Determine which preset was configured (from YAML or defaults) + if args.preset is None and config.phases: + preset = "review-fix" # only phased preset currently + elif args.preset is None and not args.coders and not args.reviewers and not args.seniors: + pass # no changes needed + inferred_coders, inferred_reviewers, inferred_seniors = _infer_roles( + list(config.agents.keys()) + ) + coders = config.coders or inferred_coders + reviewers = config.reviewers or inferred_reviewers + seniors = config.seniors or [] + if not seniors: + seniors = _default_seniors_for_preset( + f"preset:{preset}", + reviewers, + config.agents, + ) + config.agents = _resolve_agents(config.agents, coders, reviewers, seniors) + config.coders = coders + config.reviewers = reviewers + config.seniors = seniors + config.preset_name = preset + if preset in PHASED_PRESETS: + config.phases = PHASED_PRESETS[preset](coders, reviewers, seniors) + config.pipeline = [] + elif preset in PIPELINE_PRESETS: + config.pipeline = PIPELINE_PRESETS[preset](coders, reviewers, seniors) + config.phases = [] + if preset == "review-only" and args.max_iter is None and args.min_iter is None: + config.max_iterations = 1 + + apply_reasoning_effort_settings( + config, + reasoning_effort=args.reasoning_effort, + coder_effort=args.coder_effort, + reviewer_effort=args.reviewer_effort, + senior_effort=args.senior_effort, + ) + + # --model: apply to ALL agents + if args.model is not None: + for agent_name in config.agents: + _apply_model_override(config, agent_name, args.model) + # --generator-model / --reviewer-model: apply by role + if args.generator_model is not None: + for coder_name in config.coders: + _apply_model_override(config, coder_name, args.generator_model) + if args.reviewer_model is not None: + for reviewer_name in config.reviewers: + _apply_model_override(config, reviewer_name, args.reviewer_model) + + # --plan / --checklist shortcuts + for key, val in [("plan", args.plan), ("checklist", args.checklist)]: + if val is not None: + p = val.resolve() + if not p.exists(): + print(f"File not found: {p}", file=sys.stderr) + return 1 + config.inputs[key] = p + + # --docs: read all files in directory, inject as {docs} + if args.docs is not None: + docs_dir = args.docs.resolve() + if not docs_dir.is_dir(): + print(f"Not a directory: {docs_dir}", file=sys.stderr) + return 1 + docs_content = _read_docs_dir(docs_dir) + if not docs_content: + print(f"No files found in: {docs_dir}", file=sys.stderr) + return 1 + config.inputs["docs"] = docs_content + + if args.inputs: + overrides = {} + for item in args.inputs: + if "=" not in item: + print( + f"Invalid --input format: '{item}'. Use KEY=PATH.", + file=sys.stderr, + ) + return 1 + key, path = item.split("=", 1) + overrides[key] = path + apply_input_overrides(config, overrides) + + # 3. Validate after all overrides + from cross_eval.config import validate_config + errors = validate_config(config) + if errors: + print("Config error:\n " + "\n ".join(errors), file=sys.stderr) + return 1 + + # 4. Run pipeline + logger.info("Config: %s", config_source) + logger.info( + "Agents: %s", + ", ".join(f"{n} ({a.command})" for n, a in config.agents.items()), + ) + if config.coders or config.reviewers or config.seniors: + logger.info("Coders: %s", config.coders) + logger.info("Reviewers: %s", config.reviewers) + logger.info("Seniors: %s", config.seniors) + if config.phases: + phase_desc = " → ".join( + f"{p.name}(max {p.max_iterations}, {p.consecutive_pass}xPASS)" + for p in config.phases + ) + logger.info("Pipeline: phased [%s], lang=%s", phase_desc, config.language) + else: + iter_info = f"max {config.max_iterations}" + if config.min_iterations > 1: + iter_info = f"min {config.min_iterations}, max {config.max_iterations}" + logger.info( + "Pipeline: %d steps, %s iterations, lang=%s", + len(config.pipeline), iter_info, config.language, + ) + + try: + raw_timeout = args.timeout if args.timeout is not None else 0 + agent_timeout = None if raw_timeout == 0 else raw_timeout + result = run_pipeline(config, dry_run=args.dry_run, timeout=agent_timeout) + except (RuntimeError, KeyboardInterrupt) as e: + if isinstance(e, KeyboardInterrupt): + print("\nInterrupted by user.", file=sys.stderr) + return 130 + print(f"Pipeline error: {e}", file=sys.stderr) + return 1 + + # 4. Print summary + print(f"\nResult: {result.final_verdict}") + print(f"Iterations: {len(result.iterations)}") + if not args.dry_run and result.run_dir: + print(f"Output: {result.run_dir}/") + + return 0 if result.final_verdict == "PASS" else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/cross_eval/config.py b/cross_eval/config.py new file mode 100644 index 0000000..ad5c620 --- /dev/null +++ b/cross_eval/config.py @@ -0,0 +1,607 @@ +"""Configuration loading, validation, and preset resolution.""" +from __future__ import annotations + +import logging +import re +from pathlib import Path +from typing import Any + +import yaml + +from cross_eval.models import AgentConfig, PhaseConfig, PipelineConfig, StepConfig +from cross_eval.prompts import PHASED_PRESETS, PIPELINE_PRESETS + +logger = logging.getLogger(__name__) + +REASONING_EFFORT_ALIASES = { + "extra-high": "xhigh", + "extra_high": "xhigh", + "x-high": "xhigh", +} +REASONING_EFFORT_CHOICES = ("minimal", "low", "medium", "high", "xhigh") +DEFAULT_ROLE_REASONING_EFFORTS = { + "coder": "medium", + "reviewer": "medium", + "senior": "high", +} + + +# --------------------------------------------------------------------------- +# Built-in agent registry +# --------------------------------------------------------------------------- + +_CODEX_ARGS = [ + "exec", + "--full-auto", + "--skip-git-repo-check", + "--model", + "gpt-5.4", + "-", +] + +_CODER_SYSTEM_PROMPT = ( + "You are a senior software engineer implementing code changes.\n" + "Rules:\n" + "1. FIRST explore the project directory to understand the existing codebase, " + "patterns, and conventions before writing any code.\n" + "2. Implement ONLY what the plan specifies. Do NOT add extra features, " + "unnecessary abstractions, premature optimizations, or \"nice-to-have\" improvements.\n" + "3. Follow the project's existing coding style, naming conventions, and directory structure.\n" + "4. If previous review feedback is provided, fix ONLY the specific issues mentioned. " + "Do NOT refactor unrelated code.\n" + "5. Ignore any items from previous feedback that were marked as DISMISSED or false positive.\n" + "6. When in doubt about scope, do LESS, not more." +) + +_REVIEWER_SYSTEM_PROMPT = ( + "You are a code reviewer. You MUST NOT create, modify, or delete any files.\n" + "Rules:\n" + "1. Explore the project directory to understand the full codebase context.\n" + "2. Compare the implementation against the plan and checklist ONLY.\n" + "3. Classify every issue with BOTH severity AND category:\n" + " - Severity: Critical (breaks functionality/security) > Major (requirement mismatch) > Minor (convention/style)\n" + " - Category: Over-engineering / Omission\n" + "4. When reviewing with previous feedback, mark items as CONFIRMED (still an issue) " + "or DISMISSED (false positive) with rationale.\n" + "5. Report out-of-scope issues separately — problems found outside plan/checklist scope.\n" + "6. Order issues by severity (Critical first).\n" + "7. Do NOT suggest improvements beyond the plan scope.\n" + "8. End with VERDICT: PASS (all requirements met, no over-engineering) " + "or VERDICT: FAIL (issues found)." +) + +_SENIOR_SYSTEM_PROMPT = ( + "You are a senior technical reviewer coordinating a review-fix-verification loop.\n" + "Rules:\n" + "1. Explore the project directory to understand the full codebase context.\n" + "2. In aggregation mode, deduplicate overlaps, resolve disagreements, and keep only " + "evidence-backed issues. Categorize dismissed findings as [False positive] or [Already fixed].\n" + "3. In verification mode, judge the current implementation directly against ONLY the " + "plan and checklist.\n" + "4. Be skeptical of false positives, but do not lower the bar on real requirement " + "gaps.\n" + "5. When issues remain, produce a concise prioritized action list the coder can act on.\n" + "6. Do NOT invent new requirements beyond the plan and checklist.\n" + "7. End with VERDICT: PASS or VERDICT: FAIL." +) + +BUILTIN_AGENTS: dict[str, AgentConfig] = { + "claude-coder": AgentConfig( + name="claude-coder", + command="claude", + args=["-p", "--model", "opus", "--permission-mode", "auto"], + system_prompt=_CODER_SYSTEM_PROMPT, + reasoning_effort=DEFAULT_ROLE_REASONING_EFFORTS["coder"], + ), + "claude-reviewer": AgentConfig( + name="claude-reviewer", + command="claude", + args=["-p", "--model", "opus", "--permission-mode", "auto"], + system_prompt=_REVIEWER_SYSTEM_PROMPT, + reasoning_effort=DEFAULT_ROLE_REASONING_EFFORTS["reviewer"], + ), + "claude-senior": AgentConfig( + name="claude-senior", + command="claude", + args=["-p", "--model", "opus", "--permission-mode", "auto"], + system_prompt=_SENIOR_SYSTEM_PROMPT, + reasoning_effort=DEFAULT_ROLE_REASONING_EFFORTS["senior"], + ), + "codex-coder": AgentConfig( + name="codex-coder", + command="codex", + args=list(_CODEX_ARGS), + system_prompt=_CODER_SYSTEM_PROMPT, + reasoning_effort=DEFAULT_ROLE_REASONING_EFFORTS["coder"], + ), + "codex-reviewer": AgentConfig( + name="codex-reviewer", + command="codex", + args=list(_CODEX_ARGS), + system_prompt=_REVIEWER_SYSTEM_PROMPT, + reasoning_effort=DEFAULT_ROLE_REASONING_EFFORTS["reviewer"], + ), + "codex-senior": AgentConfig( + name="codex-senior", + command="codex", + args=list(_CODEX_ARGS), + system_prompt=_SENIOR_SYSTEM_PROMPT, + reasoning_effort=DEFAULT_ROLE_REASONING_EFFORTS["senior"], + ), +} + +# Shorthand aliases: "claude" → "claude-coder"/"claude-reviewer", "codex" → same +_AGENT_ALIASES: dict[str, str] = { + "claude": "claude", + "codex": "codex", +} + + +def resolve_agent_shorthand(name: str, role: str) -> str: + """Resolve shorthand agent name to full builtin name. + + Examples: + resolve_agent_shorthand("claude", "coder") → "claude-coder" + resolve_agent_shorthand("codex", "reviewer") → "codex-reviewer" + resolve_agent_shorthand("claude-coder", "coder") → "claude-coder" (unchanged) + """ + if name in _AGENT_ALIASES: + return f"{_AGENT_ALIASES[name]}-{role}" + return name + + +# --------------------------------------------------------------------------- +# Role inference (backward compatibility) +# --------------------------------------------------------------------------- + +_CODER_PATTERNS = ("gen", "coder", "implement", "develop", "write") +_SENIOR_PATTERNS = ("senior", "lead", "principal", "aggregate", "adjudicat", "synth") +_REVIEWER_PATTERNS = ("review", "audit", "check", "verify", "inspect") + + +def _infer_roles(agent_names: list[str]) -> tuple[list[str], list[str], list[str]]: + """Infer coder/reviewer/senior roles from agent names. + + Heuristic: + - Names containing 'gen', 'coder', etc. → coder + - Names containing 'senior', 'lead', etc. → senior + - Names containing 'review', 'audit', etc. → reviewer + - If no matches: first agent → coder, rest → reviewers + """ + coders: list[str] = [] + reviewers: list[str] = [] + seniors: list[str] = [] + unclassified: list[str] = [] + + for name in agent_names: + lower = name.lower() + if any(p in lower for p in _CODER_PATTERNS): + coders.append(name) + elif any(p in lower for p in _SENIOR_PATTERNS): + seniors.append(name) + elif any(p in lower for p in _REVIEWER_PATTERNS): + reviewers.append(name) + else: + unclassified.append(name) + + # Fallback: if no classification worked, use positional convention + if not coders and not reviewers: + if len(agent_names) >= 2: + coders = [agent_names[0]] + reviewers = list(agent_names[1:]) + elif agent_names: + # Single agent: treat as reviewer (for review-only) + reviewers = list(agent_names) + elif not coders and unclassified: + coders = [unclassified.pop(0)] + elif not reviewers and unclassified: + reviewers = list(unclassified) + unclassified = [] + + # Any remaining unclassified go to reviewers + reviewers.extend(unclassified) + + return coders, reviewers, seniors + + +def _resolve_agents( + user_agents: dict[str, AgentConfig], + coders: list[str], + reviewers: list[str], + seniors: list[str], +) -> dict[str, AgentConfig]: + """Ensure all referenced agents exist by merging built-in definitions. + + If a coder or reviewer name references an agent not in user_agents + but present in BUILTIN_AGENTS, the built-in definition is added. + """ + all_referenced = set(coders) | set(reviewers) | set(seniors) + result = dict(user_agents) + + for name in all_referenced: + if name not in result and name in BUILTIN_AGENTS: + result[name] = BUILTIN_AGENTS[name] + + return result + + +def _default_seniors_for_preset( + pipeline_raw: Any, + reviewers: list[str], + agents: dict[str, AgentConfig], +) -> list[str]: + """Infer a default senior agent for presets that benefit from adjudication.""" + if not ( + isinstance(pipeline_raw, str) + and pipeline_raw == "preset:review-fix" + and reviewers + ): + return [] + + first_reviewer = reviewers[0] + if first_reviewer.startswith("codex-"): + return ["codex-senior"] + if first_reviewer.startswith("claude-"): + return ["claude-senior"] + + reviewer_agent = agents.get(first_reviewer) + if reviewer_agent is None: + return [] + + command = reviewer_agent.command.lower() + if "codex" in command: + return ["codex-senior"] + if "claude" in command: + return ["claude-senior"] + return [] + + +def normalize_reasoning_effort(effort: str) -> str: + """Normalize user-facing reasoning effort aliases.""" + normalized = REASONING_EFFORT_ALIASES.get(effort, effort) + if normalized not in REASONING_EFFORT_CHOICES: + raise ValueError( + f"Unsupported reasoning effort '{effort}'. " + f"Use one of: {REASONING_EFFORT_CHOICES}" + ) + return normalized + + +def apply_reasoning_effort_settings( + config: PipelineConfig, + *, + reasoning_effort: str | None = None, + coder_effort: str | None = None, + reviewer_effort: str | None = None, + senior_effort: str | None = None, +) -> None: + """Apply default and override reasoning effort settings by role.""" + shared_effort = normalize_reasoning_effort(reasoning_effort) if reasoning_effort else None + role_efforts = { + "coder": normalize_reasoning_effort(coder_effort) if coder_effort else shared_effort, + "reviewer": normalize_reasoning_effort(reviewer_effort) if reviewer_effort else shared_effort, + "senior": normalize_reasoning_effort(senior_effort) if senior_effort else shared_effort, + } + + _apply_role_effort(config.agents, config.coders, role_efforts["coder"], "coder") + _apply_role_effort(config.agents, config.reviewers, role_efforts["reviewer"], "reviewer") + _apply_role_effort(config.agents, config.seniors, role_efforts["senior"], "senior") + + +def _apply_role_effort( + agents: dict[str, AgentConfig], + agent_names: list[str], + override_effort: str | None, + role: str, +) -> None: + """Set reasoning effort on agents for a specific role.""" + for agent_name in agent_names: + agent = agents.get(agent_name) + if agent is None: + continue + if override_effort is not None: + agent.reasoning_effort = override_effort + elif agent.reasoning_effort is None: + agent.reasoning_effort = DEFAULT_ROLE_REASONING_EFFORTS[role] + + +# --------------------------------------------------------------------------- +# Default config (no YAML) +# --------------------------------------------------------------------------- + +def default_config() -> PipelineConfig: + """Return a PipelineConfig with sensible defaults (no YAML needed).""" + agents = dict(BUILTIN_AGENTS) + coders = ["claude-coder"] + reviewers = ["claude-reviewer"] + seniors: list[str] = [] + pipeline = PIPELINE_PRESETS["simple"](coders, reviewers, seniors) + return PipelineConfig( + output_dir=Path("output"), + max_iterations=3, + language="ko", + inputs={}, + agents=agents, + coders=coders, + reviewers=reviewers, + seniors=seniors, + pipeline=pipeline, + ) + + +# --------------------------------------------------------------------------- +# YAML loading +# --------------------------------------------------------------------------- + +def load_config(path: Path) -> PipelineConfig: + """Load and validate a YAML config file, returning PipelineConfig.""" + path = path.resolve() + with open(path, encoding="utf-8") as f: + raw = yaml.safe_load(f) + + if not isinstance(raw, dict): + raise ValueError(f"Config file must be a YAML mapping, got {type(raw).__name__}") + + config = _parse_raw(raw, path) + + errors = validate_config(config) + if errors: + raise ValueError("Config validation failed:\n " + "\n ".join(errors)) + + return config + + +def _parse_raw(raw: dict[str, Any], config_path: Path) -> PipelineConfig: + """Parse raw YAML dict into PipelineConfig.""" + # --- agents --- + agents: dict[str, AgentConfig] = {} + for name, agent_data in raw.get("agents", {}).items(): + agents[name] = AgentConfig( + name=name, + command=agent_data.get("command", "claude"), + args=agent_data.get("args", ["-p"]), + system_prompt=agent_data.get("system_prompt"), + reasoning_effort=agent_data.get("reasoning_effort"), + stdin_mode=agent_data.get("stdin_mode", False), + ) + + # --- roles: explicit or inferred --- + pipeline_raw = raw.get("pipeline", "preset:simple") + coders_raw = raw.get("coders") + reviewers_raw = raw.get("reviewers") + seniors_raw = raw.get("seniors") + + if coders_raw is not None or reviewers_raw is not None or seniors_raw is not None: + # Explicit role assignment from YAML + coders: list[str] = coders_raw if coders_raw is not None else [] + reviewers: list[str] = reviewers_raw if reviewers_raw is not None else [] + seniors: list[str] = seniors_raw if seniors_raw is not None else [] + else: + # Backward compat: infer from agent names + coders, reviewers, seniors = _infer_roles(list(agents.keys())) + + if not seniors: + seniors = _default_seniors_for_preset(pipeline_raw, reviewers, agents) + + # Auto-merge built-in agents for any referenced names not yet defined + agents = _resolve_agents(agents, coders, reviewers, seniors) + config_stub = PipelineConfig( + agents=agents, + coders=coders, + reviewers=reviewers, + seniors=seniors, + ) + apply_reasoning_effort_settings(config_stub) + + # --- inputs (resolve relative to config file location) --- + config_dir = config_path.parent + inputs: dict[str, Path | str] = {} + for key, val in raw.get("inputs", {}).items(): + p = Path(val) + if not p.is_absolute(): + p = config_dir / p + inputs[key] = p + + # --- pipeline (preset or custom) --- + steps, phases = _resolve_pipeline(pipeline_raw, coders, reviewers, seniors) + + # Detect preset name for output directory naming + preset_name = "custom" + if isinstance(pipeline_raw, str) and pipeline_raw.startswith("preset:"): + preset_name = pipeline_raw.split(":", 1)[1] + + return PipelineConfig( + output_dir=Path(raw.get("output_dir", "output")), + max_iterations=int(raw.get("max_iterations", 3)), + min_iterations=int(raw.get("min_iterations", 1)), + verbose=bool(raw.get("verbose", False)), + language=raw.get("language", "en"), + inputs=inputs, + agents=agents, + coders=coders, + reviewers=reviewers, + seniors=seniors, + pipeline=steps, + phases=phases, + preset_name=preset_name, + _config_path=config_path, + _config_mtime=config_path.stat().st_mtime, + ) + + +def try_reload_config(config: PipelineConfig) -> PipelineConfig: + """Reload config if the file has been modified on disk. + + Returns the new config if reloaded, or the same config if unchanged. + Validation errors during reload are logged but do not crash the pipeline. + """ + if config._config_path is None or config._config_mtime is None: + return config + + try: + current_mtime = config._config_path.stat().st_mtime + except OSError: + return config + + if current_mtime <= config._config_mtime: + return config + + logger.info("Config file changed, reloading: %s", config._config_path.name) + try: + new_config = load_config(config._config_path) + logger.info("Config reloaded successfully") + return new_config + except (ValueError, FileNotFoundError, yaml.YAMLError) as e: + logger.warning("Config reload failed, keeping previous config: %s", e) + return config + + +def _resolve_pipeline( + pipeline_raw: Any, + coders: list[str], + reviewers: list[str], + seniors: list[str], +) -> tuple[list[StepConfig], list[PhaseConfig]]: + """Resolve pipeline from preset string or explicit step list. + + Returns (steps, phases) tuple. Only one will be non-empty. + - Simple/cross-review/review-only → steps populated, phases empty. + - Phased presets (review-fix) → steps empty, phases populated. + """ + # Preset: "preset:simple" or "preset:review-fix" + if isinstance(pipeline_raw, str) and pipeline_raw.startswith("preset:"): + preset_name = pipeline_raw.split(":", 1)[1] + if preset_name in PIPELINE_PRESETS: + return PIPELINE_PRESETS[preset_name](coders, reviewers, seniors), [] + if preset_name in PHASED_PRESETS: + return [], PHASED_PRESETS[preset_name](coders, reviewers, seniors) + all_presets = list(PIPELINE_PRESETS.keys()) + list(PHASED_PRESETS.keys()) + raise ValueError( + f"Unknown pipeline preset '{preset_name}'. " + f"Available: {all_presets}" + ) + + # Explicit step list + if isinstance(pipeline_raw, list): + steps = [] + for step_data in pipeline_raw: + steps.append(StepConfig( + name=step_data["name"], + agent=step_data["agent"], + role=step_data.get("role", "generate"), + prompt_template=step_data.get("prompt_template", f"default:{step_data.get('role', 'generate')}"), + output_key=step_data["output_key"], + verdict=step_data.get("verdict", False), + verdict_pattern=step_data.get("verdict_pattern", r"VERDICT:\s*PASS"), + context_override=step_data.get("context_override", {}), + )) + return steps, [] + + raise ValueError( + f"'pipeline' must be a preset string (e.g. 'preset:simple') " + f"or a list of step definitions, got {type(pipeline_raw).__name__}" + ) + + +def validate_config(config: PipelineConfig) -> list[str]: + """Return list of validation error strings (empty = valid).""" + errors: list[str] = [] + + if config.phases: + # --- Phased pipeline validation --- + for phase in config.phases: + if not phase.steps: + errors.append(f"Phase '{phase.name}' has no steps") + for step in phase.steps: + if step.agent not in config.agents: + errors.append( + f"Phase '{phase.name}' step '{step.name}' references " + f"undefined agent '{step.agent}'. " + f"Defined agents: {list(config.agents.keys())}" + ) + _validate_unique_step_fields( + phase.steps, + errors, + scope=f"Phase '{phase.name}'", + ) + if not any(s.verdict for s in phase.steps): + errors.append( + f"Phase '{phase.name}' must have at least one step with verdict: true" + ) + # Validate verdict patterns + for step in phase.steps: + if step.verdict: + try: + re.compile(step.verdict_pattern) + except re.error as e: + errors.append( + f"Phase '{phase.name}' step '{step.name}' " + f"has invalid verdict_pattern: {e}" + ) + else: + # --- Simple pipeline validation --- + if not config.pipeline: + errors.append("Pipeline must have at least one step") + + for step in config.pipeline: + if step.agent not in config.agents: + errors.append( + f"Step '{step.name}' references undefined agent '{step.agent}'. " + f"Defined agents: {list(config.agents.keys())}" + ) + + _validate_unique_step_fields( + config.pipeline, + errors, + scope="Pipeline", + ) + + if not any(s.verdict for s in config.pipeline): + errors.append("Pipeline must have at least one step with verdict: true") + + for step in config.pipeline: + if step.verdict: + try: + re.compile(step.verdict_pattern) + except re.error as e: + errors.append( + f"Step '{step.name}' has invalid verdict_pattern: {e}" + ) + + # --- Common validation --- + for key, val in config.inputs.items(): + if isinstance(val, Path) and not val.exists(): + errors.append(f"Input file '{key}' not found: {val}") + + if config.language not in ("en", "ko"): + errors.append(f"Unsupported language '{config.language}'. Use 'en' or 'ko'.") + + return errors + + +def _validate_unique_step_fields( + steps: list[StepConfig], + errors: list[str], + *, + scope: str, +) -> None: + """Ensure step names and output keys are unique within a step collection.""" + seen_names: set[str] = set() + seen_output_keys: set[str] = set() + + for step in steps: + if step.name in seen_names: + errors.append(f"{scope} has duplicate step name '{step.name}'") + seen_names.add(step.name) + + if step.output_key in seen_output_keys: + errors.append(f"{scope} has duplicate output_key '{step.output_key}'") + seen_output_keys.add(step.output_key) + + +def apply_input_overrides( + config: PipelineConfig, overrides: dict[str, str] +) -> None: + """Apply CLI --input overrides to the config.""" + for key, path_str in overrides.items(): + config.inputs[key] = Path(path_str) diff --git a/cross_eval/models.py b/cross_eval/models.py new file mode 100644 index 0000000..f5d10a9 --- /dev/null +++ b/cross_eval/models.py @@ -0,0 +1,118 @@ +"""Data models for cross-eval pipeline.""" +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + + +@dataclass +class AgentConfig: + """Definition of a single agent.""" + + name: str + command: str + args: list[str] = field(default_factory=list) + system_prompt: Optional[str] = None + reasoning_effort: Optional[str] = None + stdin_mode: bool = False + + +@dataclass +class StepConfig: + """One step in the pipeline.""" + + name: str + agent: str # reference to agents key + role: str # "generate" or "review" + prompt_template: str # "default:" or file path + output_key: str + verdict: bool = False + verdict_pattern: str = r"VERDICT:\s*PASS" + context_override: dict[str, str] = field(default_factory=dict) + parallel: bool = False # Can run concurrently with adjacent parallel steps + + +@dataclass +class PhaseConfig: + """One phase in a multi-phase pipeline (e.g. review-fix).""" + + name: str + steps: list[StepConfig] = field(default_factory=list) + max_iterations: int = 10 + consecutive_pass: int = 1 # stop after N consecutive PASSes + + +@dataclass +class PipelineConfig: + """Full cross-eval configuration.""" + + output_dir: Path = field(default_factory=lambda: Path("output")) + max_iterations: int = 3 + min_iterations: int = 1 + verbose: bool = False + language: str = "en" # "en" or "ko" + inputs: dict[str, Path | str] = field(default_factory=dict) + agents: dict[str, AgentConfig] = field(default_factory=dict) + coders: list[str] = field(default_factory=list) + reviewers: list[str] = field(default_factory=list) + seniors: list[str] = field(default_factory=list) + pipeline: list[StepConfig] = field(default_factory=list) + phases: list[PhaseConfig] = field(default_factory=list) + preset_name: str = "custom" + _config_path: Optional[Path] = field(default=None, repr=False) + _config_mtime: Optional[float] = field(default=None, repr=False) + + +@dataclass +class AgentResult: + """Result from an agent invocation.""" + + output: str + exit_code: int + agent_name: str + step_name: str + duration_seconds: float + + +@dataclass +class ReviewMetrics: + """Parsed metrics from a single review output.""" + + # Severity counts + critical: int = 0 + major: int = 0 + minor: int = 0 + + # Category counts + over_engineering: int = 0 + omission: int = 0 + + # Assessment counts + confirmed: int = 0 + dismissed: int = 0 + + +@dataclass +class IterationResult: + """Results from a single iteration.""" + + iteration: int + step_results: dict[str, AgentResult] = field(default_factory=dict) + step_outputs: dict[str, str] = field(default_factory=dict) + verdict: Optional[str] = None + feedback: Optional[str] = None + phase_name: Optional[str] = None + repeated_aggregate_warning: Optional[str] = None + review_metrics: Optional[ReviewMetrics] = None + + +@dataclass +class PipelineResult: + """Results from the entire pipeline run.""" + + iterations: list[IterationResult] = field(default_factory=list) + final_verdict: str = "MAX_ITERATIONS_REACHED" + total_duration: float = 0.0 + run_dir: Optional[Path] = None + repeated_aggregate_warnings: list[str] = field(default_factory=list) diff --git a/cross_eval/pipeline.py b/cross_eval/pipeline.py new file mode 100644 index 0000000..28086a3 --- /dev/null +++ b/cross_eval/pipeline.py @@ -0,0 +1,700 @@ +"""Main pipeline execution engine.""" +from __future__ import annotations + +import logging +import os +import re +import subprocess +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime +from pathlib import Path + +from cross_eval.agent import invoke_agent +from cross_eval.config import try_reload_config +from cross_eval.models import ( + AgentResult, + IterationResult, + PipelineConfig, + PipelineResult, + StepConfig, +) +from cross_eval.prompts import render_template, resolve_template, set_language +from cross_eval.report import build_report + +logger = logging.getLogger(__name__) + + +def run_pipeline( + config: PipelineConfig, + cwd: Path | None = None, + dry_run: bool = False, + timeout: int | None = None, +) -> PipelineResult: + """Execute the full cross-eval pipeline.""" + # Create run directory: output/{preset}_{datetime}/ + run_dir = _make_run_dir(config) + + if config.phases: + return _run_phased_pipeline(config, run_dir, cwd, dry_run, timeout) + return _run_simple_pipeline(config, run_dir, cwd, dry_run, timeout) + + +def _make_run_dir(config: PipelineConfig) -> Path: + """Create timestamped run directory under output_dir.""" + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + run_dir = config.output_dir / f"{config.preset_name}_{ts}" + run_dir.mkdir(parents=True, exist_ok=True) + return run_dir + + +def _run_simple_pipeline( + config: PipelineConfig, + run_dir: Path, + cwd: Path | None = None, + dry_run: bool = False, + timeout: int | None = None, +) -> PipelineResult: + """Execute a simple (non-phased) pipeline.""" + if cwd is None: + cwd = Path(os.getcwd()) + + set_language(config.language) + input_contents = _load_inputs(config) + + feedback = "(no feedback — first iteration)" + iterations: list[IterationResult] = [] + start_time = time.monotonic() + final_verdict = "MAX_ITERATIONS_REACHED" + aggregate_history: dict[str, int] = {} + aggregate_warnings: list[str] = [] + + for i in range(1, config.max_iterations + 1): + config = try_reload_config(config) + set_language(config.language) + _refresh_inputs(config, input_contents) + + logger.info("=" * 50) + logger.info(" Iteration %d/%d", i, config.max_iterations) + logger.info("=" * 50) + + step_outputs, step_results, verdict = _run_steps( + config.pipeline, config, input_contents, feedback, + i, config.max_iterations, cwd, timeout, dry_run, + run_dir=run_dir, output_iter=i, + ) + + iter_result = IterationResult( + iteration=i, + step_results=step_results, + step_outputs=step_outputs, + verdict=verdict, + ) + warning = _detect_repeated_aggregate( + config.pipeline, step_outputs, aggregate_history, iteration=i, + ) + if warning: + iter_result.repeated_aggregate_warning = warning + aggregate_warnings.append(warning) + logger.warning(" %s", warning) + + iter_result.feedback = _collect_feedback(config.pipeline, step_outputs) + feedback = iter_result.feedback or feedback + iterations.append(iter_result) + + if verdict == "PASS": + final_verdict = "PASS" + if i >= config.min_iterations: + logger.info(" PASS at iteration %d (min=%d reached)!", i, config.min_iterations) + break + else: + logger.info( + " PASS at iteration %d, but min_iterations=%d — continuing", + i, config.min_iterations, + ) + + if dry_run: + logger.info(" (dry-run: stopping after iteration 1)") + break + + total_duration = time.monotonic() - start_time + + pipeline_result = PipelineResult( + iterations=iterations, + final_verdict=final_verdict, + total_duration=round(total_duration, 1), + run_dir=run_dir, + repeated_aggregate_warnings=aggregate_warnings, + ) + + if not dry_run: + _save_report(run_dir, config, pipeline_result) + + return pipeline_result + + +def _run_phased_pipeline( + config: PipelineConfig, + run_dir: Path, + cwd: Path | None = None, + dry_run: bool = False, + timeout: int | None = None, +) -> PipelineResult: + """Execute a multi-phase pipeline (e.g. review-fix).""" + if cwd is None: + cwd = Path(os.getcwd()) + + set_language(config.language) + input_contents = _load_inputs(config) + + iterations: list[IterationResult] = [] + feedback = "(no feedback — first iteration)" + start_time = time.monotonic() + final_verdict = "MAX_ITERATIONS_REACHED" + global_iter = 0 + aggregate_history_by_phase: dict[str, dict[str, int]] = {} + aggregate_warnings: list[str] = [] + + for phase_idx, phase in enumerate(config.phases): + logger.info("=" * 60) + logger.info( + " Phase: %s (max_iter=%d, consecutive_pass=%d)", + phase.name, phase.max_iterations, phase.consecutive_pass, + ) + logger.info("=" * 60) + + consecutive_passes = 0 + phase_converged = False + + for pi in range(1, phase.max_iterations + 1): + global_iter += 1 + + config = try_reload_config(config) + set_language(config.language) + _refresh_inputs(config, input_contents) + + logger.info("-" * 50) + logger.info( + " [%s] Iteration %d/%d (global: v%d)", + phase.name, pi, phase.max_iterations, global_iter, + ) + logger.info("-" * 50) + + step_outputs, step_results, verdict = _run_steps( + phase.steps, config, input_contents, feedback, + pi, phase.max_iterations, cwd, timeout, dry_run, + run_dir=run_dir, output_iter=global_iter, phase_name=phase.name, + ) + + iter_result = IterationResult( + iteration=global_iter, + step_results=step_results, + step_outputs=step_outputs, + verdict=verdict, + phase_name=phase.name, + ) + phase_history = aggregate_history_by_phase.setdefault(phase.name, {}) + warning = _detect_repeated_aggregate( + phase.steps, step_outputs, phase_history, iteration=global_iter, + phase_name=phase.name, + ) + if warning: + iter_result.repeated_aggregate_warning = warning + aggregate_warnings.append(warning) + logger.warning(" %s", warning) + + iter_result.feedback = _collect_feedback(phase.steps, step_outputs) + feedback = iter_result.feedback or feedback + iterations.append(iter_result) + + if verdict == "PASS": + consecutive_passes += 1 + logger.info( + " [%s] PASS (%d/%d consecutive)", + phase.name, consecutive_passes, phase.consecutive_pass, + ) + if consecutive_passes >= phase.consecutive_pass: + logger.info( + " [%s] Converged! %d consecutive PASSes.", + phase.name, phase.consecutive_pass, + ) + phase_converged = True + break + else: + consecutive_passes = 0 + + if dry_run: + break + + if phase_converged: + logger.info(" Phase '%s' completed: CONVERGED", phase.name) + else: + logger.info( + " Phase '%s' completed: max iterations (%d) reached", + phase.name, phase.max_iterations, + ) + + if phase_idx == len(config.phases) - 1: + final_verdict = "PASS" if phase_converged else "MAX_ITERATIONS_REACHED" + + total_duration = time.monotonic() - start_time + + pipeline_result = PipelineResult( + iterations=iterations, + final_verdict=final_verdict, + total_duration=round(total_duration, 1), + run_dir=run_dir, + repeated_aggregate_warnings=aggregate_warnings, + ) + + if not dry_run: + _save_report(run_dir, config, pipeline_result) + + return pipeline_result + + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + +def _load_inputs(config: PipelineConfig) -> dict[str, str]: + """Load input file contents from config.""" + input_contents: dict[str, str] = {} + for key, val in config.inputs.items(): + if isinstance(val, str): + input_contents[key] = val + else: + input_contents[key] = val.read_text(encoding="utf-8") + return input_contents + + +def _refresh_inputs( + config: PipelineConfig, input_contents: dict[str, str], +) -> None: + """Re-read input files (they may have changed on disk).""" + for key, val in config.inputs.items(): + if isinstance(val, str): + input_contents[key] = val + elif isinstance(val, Path) and val.exists(): + input_contents[key] = val.read_text(encoding="utf-8") + + +# --------------------------------------------------------------------------- +# Parallel step grouping +# --------------------------------------------------------------------------- + +def _get_step_dependencies(step: StepConfig) -> set[str]: + """Extract output_key references from context_override values.""" + deps: set[str] = set() + for val in step.context_override.values(): + for match in re.finditer(r"\{(\w+)\}", val): + deps.add(match.group(1)) + return deps + + +def _group_parallel_steps(steps: list[StepConfig]) -> list[list[StepConfig]]: + """Group consecutive parallel steps into batches. + + Consecutive steps with parallel=True are grouped together, + but a new batch starts when a step depends on an output_key + from a step in the current batch (dependency breaking). + """ + batches: list[list[StepConfig]] = [] + current: list[StepConfig] = [] + current_output_keys: set[str] = set() + + for step in steps: + if not step.parallel: + if current: + batches.append(current) + current = [] + current_output_keys = set() + batches.append([step]) + continue + + # Check if this step depends on any output from the current batch + deps = _get_step_dependencies(step) + if deps & current_output_keys: + batches.append(current) + current = [] + current_output_keys = set() + + current.append(step) + current_output_keys.add(step.output_key) + + if current: + batches.append(current) + + return batches + + +# --------------------------------------------------------------------------- +# Step execution +# --------------------------------------------------------------------------- + +def _run_steps( + steps: list[StepConfig], + config: PipelineConfig, + input_contents: dict[str, str], + feedback: str, + iteration: int, + max_iterations: int, + cwd: Path, + timeout: int | None, + dry_run: bool, + *, + run_dir: Path, + output_iter: int, + phase_name: str | None = None, +) -> tuple[dict[str, str], dict[str, AgentResult], str | None]: + """Execute all steps in one iteration, parallelizing where possible.""" + step_outputs: dict[str, str] = {} + step_results: dict[str, AgentResult] = {} + verdict: str | None = None + + batches = _group_parallel_steps(steps) + + for batch in batches: + if len(batch) == 1: + # Single step — run directly + step = batch[0] + _execute_step( + step, config, input_contents, feedback, + iteration, max_iterations, cwd, timeout, dry_run, + step_outputs, step_results, + run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, + ) + else: + # Parallel batch — run with ThreadPoolExecutor + _execute_parallel_batch( + batch, config, input_contents, feedback, + iteration, max_iterations, cwd, timeout, dry_run, + step_outputs, step_results, + run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, + ) + + # Extract verdict from all verdict steps (ALL must PASS) + for step in steps: + if step.verdict: + output = step_outputs.get(step.output_key, "") + step_verdict = _extract_verdict(output, step.verdict_pattern) + logger.info(" [%s] verdict: %s", step.name, step_verdict) + if verdict is None: + verdict = step_verdict + elif step_verdict == "FAIL": + verdict = "FAIL" + + return step_outputs, step_results, verdict + + +def _execute_step( + step: StepConfig, + config: PipelineConfig, + input_contents: dict[str, str], + feedback: str, + iteration: int, + max_iterations: int, + cwd: Path, + timeout: int | None, + dry_run: bool, + step_outputs: dict[str, str], + step_results: dict[str, AgentResult], + *, + run_dir: Path, + output_iter: int, + phase_name: str | None = None, + quiet: bool = False, +) -> None: + """Execute a single step, updating step_outputs and step_results in place.""" + if not quiet: + logger.info(" [%s] agent='%s' role='%s'", step.name, step.agent, step.role) + + # 1. Resolve template + template = resolve_template(step.prompt_template) + + # 2. Build context + context = _build_context( + input_contents, step_outputs, feedback, iteration, max_iterations, + ) + + # 3. Apply context overrides + if step.context_override: + context = _apply_context_override(context, step.context_override) + + # 4. Render prompt + prompt = render_template(template, context) + + # 5. Dry run: print and skip + if dry_run: + phase_label = f" phase={phase_name}" if phase_name else "" + print(f"\n--- Step: {step.name} (agent={step.agent}{phase_label}) ---") + print(prompt) + print(f"--- end {step.name} ---\n") + step_outputs[step.output_key] = f"(dry-run: no output for {step.output_key})" + return + + # 6. Invoke agent + agent_config = config.agents[step.agent] + try: + result = invoke_agent( + agent_config, prompt, step.name, + cwd=cwd, timeout=timeout, quiet=quiet, + ) + except subprocess.TimeoutExpired as e: + stdout = (e.stdout or b"") if isinstance(e.stdout, bytes) else (e.stdout or "") + stderr = (e.stderr or b"") if isinstance(e.stderr, bytes) else (e.stderr or "") + if isinstance(stdout, bytes): + stdout = stdout.decode("utf-8", errors="replace") + if isinstance(stderr, bytes): + stderr = stderr.decode("utf-8", errors="replace") + phase_info = f"- **Phase**: {phase_name}\n" if phase_name else "" + error_msg = ( + f"# Agent Timeout\n\n" + f"{phase_info}" + f"- **Step**: {step.name}\n" + f"- **Agent**: {step.agent}\n" + f"- **Timeout**: {timeout}s\n\n" + f"Partial stdout ({len(stdout)} chars):\n" + f"```\n{stdout[:2000] or '(none)'}\n```\n\n" + f"Stderr:\n```\n{stderr[:2000] or '(none)'}\n```\n" + ) + _save_step_output(run_dir, output_iter, f"{step.name}_error", error_msg) + logger.error(" [%s] TIMEOUT after %ss — saved to output", step.name, timeout) + raise RuntimeError( + f"Agent '{step.agent}' timed out after {timeout}s at step '{step.name}'. " + f"Error saved to {run_dir}/v{output_iter}/{step.name}_error.md. " + f"Try --timeout 0 (unlimited)" + ) + except RuntimeError as e: + phase_info = f"- **Phase**: {phase_name}\n" if phase_name else "" + error_msg = ( + f"# Agent Error\n\n{phase_info}" + f"- **Step**: {step.name}\n- **Agent**: {step.agent}\n\n```\n{e}\n```\n" + ) + _save_step_output(run_dir, output_iter, f"{step.name}_error", error_msg) + logger.error(" [%s] FAILED — saved to output", step.name) + raise + + # 7. Store output + step_outputs[step.output_key] = result.output + step_results[step.output_key] = result + + if not quiet: + logger.info( + " [%s] completed (%.1fs, %d chars)", + step.name, result.duration_seconds, len(result.output), + ) + + # 8. Save to disk + _save_step_output(run_dir, output_iter, step.name, result.output) + + +def _execute_parallel_batch( + batch: list[StepConfig], + config: PipelineConfig, + input_contents: dict[str, str], + feedback: str, + iteration: int, + max_iterations: int, + cwd: Path, + timeout: int | None, + dry_run: bool, + step_outputs: dict[str, str], + step_results: dict[str, AgentResult], + *, + run_dir: Path, + output_iter: int, + phase_name: str | None = None, +) -> None: + """Execute multiple steps in parallel using threads.""" + agent_names = ", ".join(s.agent for s in batch) + logger.info(" [parallel] %d agents: %s", len(batch), agent_names) + + if dry_run: + for step in batch: + _execute_step( + step, config, input_contents, feedback, + iteration, max_iterations, cwd, timeout, dry_run, + step_outputs, step_results, + run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, + ) + return + + # Snapshot context before parallel execution (all steps see same state) + context_snapshot = dict(input_contents) + context_snapshot.update(step_outputs) + + # Collect results from parallel threads + local_outputs: dict[str, str] = {} + local_results: dict[str, AgentResult] = {} + errors: list[Exception] = [] + + # Show a single spinner for the batch + from cross_eval.agent import _Spinner + spinner = _Spinner( + f"[parallel] {len(batch)} agents running ({agent_names})..." + ) + spinner.start() + batch_start = time.monotonic() + + def _run_one(step: StepConfig) -> tuple[str, str, AgentResult]: + """Run one step, return (output_key, output, result).""" + template = resolve_template(step.prompt_template) + context = _build_context( + context_snapshot, {}, feedback, iteration, max_iterations, + ) + if step.context_override: + context = _apply_context_override(context, step.context_override) + prompt = render_template(template, context) + + agent_config = config.agents[step.agent] + result = invoke_agent( + agent_config, prompt, step.name, + cwd=cwd, timeout=timeout, quiet=True, + ) + return step.output_key, result.output, result + + with ThreadPoolExecutor(max_workers=len(batch)) as executor: + futures = {executor.submit(_run_one, step): step for step in batch} + for future in as_completed(futures): + step = futures[future] + try: + output_key, output, result = future.result() + local_results[output_key] = result + local_outputs[output_key] = output + except Exception as e: + errors.append(e) + + batch_elapsed = round(time.monotonic() - batch_start, 1) + + if errors: + spinner.stop(f"[parallel] FAILED ({batch_elapsed}s)") + raise errors[0] + + spinner.stop(f"[parallel] {len(batch)} agents done ({batch_elapsed}s)") + + # Merge results + for step in batch: + key = step.output_key + step_outputs[key] = local_outputs[key] + step_results[key] = local_results[key] + r = local_results[key] + logger.info( + " [%s] completed (%.1fs, %d chars)", + step.name, r.duration_seconds, len(r.output), + ) + _save_step_output(run_dir, output_iter, step.name, r.output) + + +# --------------------------------------------------------------------------- +# Context and template helpers +# --------------------------------------------------------------------------- + +def _build_context( + input_contents: dict[str, str], + step_outputs: dict[str, str], + feedback: str, + iteration: int, + max_iterations: int, +) -> dict[str, str]: + """Build the template context dict.""" + context: dict[str, str] = {} + context.update(input_contents) + context.update(step_outputs) + context["feedback"] = feedback + context["iteration"] = str(iteration) + context["max_iterations"] = str(max_iterations) + return context + + +def _apply_context_override( + context: dict[str, str], + overrides: dict[str, str], +) -> dict[str, str]: + """Apply context_override mappings for cross-review scenarios.""" + result = dict(context) + for key, value_template in overrides.items(): + result[key] = render_template(value_template, context) + return result + + +def _collect_feedback( + steps: list[StepConfig], + step_outputs: dict[str, str], +) -> str: + """Collect feedback from all verdict steps. + + Single verdict step → raw output (backward compatible). + Multiple verdict steps → combined with agent headers for cross-referencing. + """ + verdict_steps = [s for s in steps if s.verdict] + if len(verdict_steps) == 1: + return step_outputs.get(verdict_steps[0].output_key, "") + parts: list[str] = [] + for s in verdict_steps: + output = step_outputs.get(s.output_key, "") + if output: + parts.append(f"## Review by {s.agent} ({s.name})\n{output}") + return "\n\n---\n\n".join(parts) + + +def _detect_repeated_aggregate( + steps: list[StepConfig], + step_outputs: dict[str, str], + history: dict[str, int], + *, + iteration: int, + phase_name: str | None = None, +) -> str | None: + """Detect repeated aggregate-review outputs across iterations.""" + for step in steps: + if step.prompt_template != "default:aggregate-review": + continue + output = step_outputs.get(step.output_key, "") + normalized = _normalize_aggregate_output(output) + if not normalized: + return None + if normalized in history: + prev_iter = history[normalized] + phase_prefix = f"[{phase_name}] " if phase_name else "" + return ( + f"{phase_prefix}Repeated aggregate_review detected at iteration {iteration} " + f"(same as iteration {prev_iter})." + ) + history[normalized] = iteration + return None + return None + + +def _normalize_aggregate_output(output: str) -> str: + """Normalize aggregate output for repeat detection.""" + return " ".join(output.lower().split()) + + +def _extract_verdict(output: str, pattern: str) -> str: + """Extract PASS or FAIL from output using regex pattern.""" + if re.search(pattern, output): + return "PASS" + return "FAIL" + + +def _save_step_output( + run_dir: Path, + iteration: int, + step_name: str, + content: str, +) -> Path: + """Save step output to run_dir/v{iteration}/{step_name}.md""" + path = run_dir / f"v{iteration}" / f"{step_name}.md" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(content, encoding="utf-8") + return path + + +def _save_report(run_dir: Path, config: PipelineConfig, result: PipelineResult) -> None: + """Generate and save the final markdown report.""" + report = build_report(config, result) + report_path = run_dir / "final-report.md" + report_path.parent.mkdir(parents=True, exist_ok=True) + report_path.write_text(report, encoding="utf-8") + logger.info("Report saved: %s", report_path) diff --git a/cross_eval/prompts.py b/cross_eval/prompts.py new file mode 100644 index 0000000..7ebf091 --- /dev/null +++ b/cross_eval/prompts.py @@ -0,0 +1,845 @@ +"""Default prompt templates and pipeline presets.""" +from __future__ import annotations + +import collections +from pathlib import Path +from typing import Callable, Optional + +from cross_eval.models import PhaseConfig, StepConfig + + +# --------------------------------------------------------------------------- +# Default prompt templates +# --------------------------------------------------------------------------- + +GENERATE_TEMPLATE = """\ +You are tasked with implementing code based on a plan and checklist. + +## Plan +{plan} + +## Checklist +{checklist} + +## Reference Documents +{docs} + +## Previous Review Feedback +{feedback} + +## Iteration +This is iteration {iteration} of {max_iterations}. + +## Instructions +1. Explore the project directory to understand the existing codebase structure. +2. Implement ONLY what the plan specifies. Do NOT add extra features, \ +unnecessary abstractions, or premature optimizations. +3. Follow every item in the checklist. +4. If there is previous feedback, address ONLY the specific issues mentioned. +5. If previous feedback contains items marked as DISMISSED or false positive, \ +IGNORE those items — they have been verified as correct. +6. Output the complete implementation. +""" + +REVIEW_TEMPLATE = """\ +You are tasked with reviewing code against a plan and checklist. + +## Plan +{plan} + +## Checklist +{checklist} + +## Reference Documents +{docs} + +## Generated Code / Previous Step Output +{generated_code} + +## Previous Review Feedback +{feedback} + +## Review Instructions +Explore the project directory to understand the full codebase context, \ +then evaluate the code against ONLY the plan and checklist above. + +For each issue found, classify it with BOTH severity AND category: + +Severity levels: +- **Critical**: Breaks functionality, causes data loss, or introduces security vulnerabilities. +- **Major**: Requirement mismatch, significant logic errors, or missing core functionality. +- **Minor**: Coding convention violations, trivial omissions, or style issues. + +Categories: +- **Over-engineering**: Code adds features, abstractions, or complexity \ +NOT required by the plan. +- **Omission**: A requirement from the plan or checklist that is missing or \ +incomplete in the implementation. + +If previous review feedback is provided above, you MUST assess each item: +- **CONFIRMED**: The issue is still present in the current code. +- **DISMISSED (false positive)**: The flagged item is actually correct per \ +the plan requirements. Provide rationale. + +If you find issues outside the plan/checklist scope (e.g. pre-existing bugs, \ +security concerns, performance problems), report them separately under \ +"Out of Scope Issues". + +## Output Format + +### Previous Feedback Assessment +(Only include this section if previous feedback was provided.) +- CONFIRMED: [item description] — still an issue because [reason] +- DISMISSED (false positive): [item description] — actually correct because [reason] +(Write "N/A" if no previous feedback was provided.) + +### Issues Found +List issues ordered by severity (Critical first): +- [Critical][Over-engineering] Description (reference specific plan/checklist item) +- [Major][Omission] Description (reference specific plan/checklist item) +- [Minor][Omission] Description (reference specific plan/checklist item) + +### Out of Scope Issues +Issues found outside plan/checklist scope but worth noting: +- [Critical] Description of issue +- [Minor] Description of issue +(Write "None" if no out-of-scope issues found.) + +### Summary +- Critical: N, Major: N, Minor: N +- Over-engineering count: N +- Omission count: N +- CONFIRMED: N, DISMISSED: N +- Overall quality: [BRIEF ASSESSMENT] + +### Verdict +If all checklist items are satisfied and there is no over-engineering or \ +omission, output: VERDICT: PASS +Otherwise output: VERDICT: FAIL +""" + + +GENERATE_TEMPLATE_KO = """\ +당신은 기획서와 체크리스트를 기반으로 코드를 구현하는 개발자입니다. + +## 기획서 +{plan} + +## 체크리스트 +{checklist} + +## 참고 문서 +{docs} + +## 이전 리뷰 피드백 +{feedback} + +## 반복 정보 +현재 {max_iterations}회 중 {iteration}번째 반복입니다. + +## 지침 +1. 프로젝트 디렉토리를 탐색하여 기존 코드베이스 구조를 파악하세요. +2. 기획서에 명시된 것만 구현하세요. 추가 기능, 불필요한 추상화, 과도한 최적화를 하지 마세요. +3. 체크리스트의 모든 항목을 충족하세요. +4. 이전 리뷰 피드백이 있다면 해당 이슈만 해결하세요. +5. 이전 피드백에서 DISMISSED 또는 오탐으로 표시된 항목은 무시하세요 — 이미 올바른 것으로 검증되었습니다. +6. 완전한 구현을 출력하세요. +""" + +REVIEW_TEMPLATE_KO = """\ +당신은 기획서와 체크리스트 기준으로 코드를 검토하는 리뷰어입니다. + +## 기획서 +{plan} + +## 체크리스트 +{checklist} + +## 참고 문서 +{docs} + +## 검토 대상 코드 +{generated_code} + +## 이전 리뷰 피드백 +{feedback} + +## 검토 지침 +프로젝트 디렉토리를 직접 탐색하여 전체 코드베이스 맥락을 파악한 뒤, \ +위 기획서와 체크리스트 기준으로만 코드를 평가하세요. + +발견된 각 이슈에 심각도와 카테고리를 모두 부여하세요: + +심각도: +- **Critical**: 기능 장애, 데이터 손실, 보안 취약점을 유발하는 문제. +- **Major**: 요구사항 불일치, 중대한 로직 오류, 핵심 기능 누락. +- **Minor**: 코딩 컨벤션 위반, 사소한 누락, 스타일 문제. + +카테고리: +- **과최적화**: 기획서에 없는 기능, 추상화, 복잡성을 추가한 경우. +- **누락**: 기획서/체크리스트에 있지만 구현에서 빠지거나 불완전한 요구사항. + +이전 리뷰 피드백이 제공된 경우, 각 항목을 반드시 평가하세요: +- **CONFIRMED**: 현재 코드에 여전히 존재하는 이슈. +- **DISMISSED (오탐)**: 기획서 요구사항상 실제로 올바른 항목. 근거를 제시하세요. + +기획서/체크리스트 범위 밖에서 발견된 문제(기존 버그, 보안 이슈, 성능 문제 등)는 \ +"범위 밖 이슈" 섹션에 별도로 보고하세요. + +## 출력 형식 + +### 이전 피드백 평가 +(이전 피드백이 제공된 경우에만 포함하세요.) +- CONFIRMED: [항목 설명] — 여전히 이슈인 이유: [근거] +- DISMISSED (오탐): [항목 설명] — 실제로 올바른 이유: [근거] +(이전 피드백이 없으면 "해당 없음"이라고 작성하세요.) + +### 발견된 이슈 +심각도 순서(Critical 먼저)로 나열: +- [Critical][과최적화] 이슈 설명 (관련 기획서/체크리스트 항목 참조) +- [Major][누락] 이슈 설명 (관련 기획서/체크리스트 항목 참조) +- [Minor][누락] 이슈 설명 (관련 기획서/체크리스트 항목 참조) + +### 범위 밖 이슈 +기획서/체크리스트 범위 밖이지만 주목할 만한 이슈: +- [Critical] 이슈 설명 +- [Minor] 이슈 설명 +(범위 밖 이슈가 없으면 "없음"이라고 작성하세요.) + +### 요약 +- Critical: N, Major: N, Minor: N +- 과최적화 수: N +- 누락 수: N +- CONFIRMED: N, DISMISSED: N +- 전체 품질: [간략한 평가] + +### 판정 +모든 체크리스트 항목이 충족되고 과최적화/누락이 없으면: VERDICT: PASS +그렇지 않으면: VERDICT: FAIL +""" + + +REVIEW_ONLY_TEMPLATE = """\ +You are tasked with reviewing existing code against a plan and checklist. + +## Plan +{plan} + +## Checklist +{checklist} + +## Reference Documents +{docs} + +## Previous Review (iteration {iteration} of {max_iterations}) +{feedback} + +## Review Instructions +Explore the project directory thoroughly to understand the full codebase, \ +then evaluate the EXISTING code against ONLY the plan and checklist above. + +You are NOT generating or modifying code. You are auditing what already exists. + +If previous review results are provided above, you MUST: +1. Verify each previously reported issue — is it a real issue or a false positive? +2. Look for issues the previous review MISSED. +3. Do NOT simply repeat the previous review. Provide your own independent assessment. +4. Explicitly mark items as CONFIRMED (still an issue) or DISMISSED (false positive). + +For each issue found, classify it with BOTH severity AND category: + +Severity levels: +- **Critical**: Breaks functionality, causes data loss, or introduces security vulnerabilities. +- **Major**: Requirement mismatch, significant logic errors, or missing core functionality. +- **Minor**: Coding convention violations, trivial omissions, or style issues. + +Categories: +- **Over-engineering**: Code adds features, abstractions, or complexity \ +NOT required by the plan. +- **Omission**: A requirement from the plan or checklist that is missing or \ +incomplete in the implementation. + +If you find issues outside the plan/checklist scope (e.g. pre-existing bugs, \ +security concerns, performance problems), report them separately under \ +"Out of Scope Issues". + +## Output Format + +### Issues Found +List issues ordered by severity (Critical first): +- [Critical][Over-engineering] Description (reference specific plan/checklist item) +- [Major][Omission] Description (reference specific plan/checklist item) +- [Minor][Omission] Description (reference specific plan/checklist item) + +### Out of Scope Issues +Issues found outside plan/checklist scope but worth noting: +- [Critical] Description of issue +- [Minor] Description of issue +(Write "None" if no out-of-scope issues found.) + +### Summary +- Critical: N, Major: N, Minor: N +- Over-engineering count: N +- Omission count: N +- CONFIRMED: N, DISMISSED: N +- Overall quality: [BRIEF ASSESSMENT] + +### Verdict +If all checklist items are satisfied and there is no over-engineering or \ +omission, output: VERDICT: PASS +Otherwise output: VERDICT: FAIL +""" + +REVIEW_ONLY_TEMPLATE_KO = """\ +당신은 기존 코드를 기획서와 체크리스트 기준으로 감사하는 리뷰어입니다. + +## 기획서 +{plan} + +## 체크리스트 +{checklist} + +## 참고 문서 +{docs} + +## 이전 리뷰 결과 ({max_iterations}회 중 {iteration}번째) +{feedback} + +## 검토 지침 +프로젝트 디렉토리를 직접 탐색하여 전체 코드베이스를 파악한 뒤, \ +위 기획서와 체크리스트 기준으로 **기존 코드**를 평가하세요. + +코드를 생성하거나 수정하지 마세요. 이미 존재하는 코드를 감사하는 것이 목적입니다. + +이전 리뷰 결과가 제공된 경우 반드시: +1. 이전에 보고된 각 이슈를 검증하세요 — 진짜 이슈인지 오탐인지? +2. 이전 리뷰가 놓친 새로운 이슈를 찾으세요. +3. 이전 리뷰를 그대로 반복하지 마세요. 독립적인 평가를 제공하세요. +4. 각 항목에 CONFIRMED (여전히 이슈) 또는 DISMISSED (오탐) 태그를 명시하세요. + +발견된 각 이슈에 심각도와 카테고리를 모두 부여하세요: + +심각도: +- **Critical**: 기능 장애, 데이터 손실, 보안 취약점을 유발하는 문제. +- **Major**: 요구사항 불일치, 중대한 로직 오류, 핵심 기능 누락. +- **Minor**: 코딩 컨벤션 위반, 사소한 누락, 스타일 문제. + +카테고리: +- **과최적화**: 기획서에 없는 기능, 추상화, 복잡성을 추가한 경우. +- **누락**: 기획서/체크리스트에 있지만 구현에서 빠지거나 불완전한 요구사항. + +기획서/체크리스트 범위 밖에서 발견된 문제(기존 버그, 보안 이슈, 성능 문제 등)는 \ +"범위 밖 이슈" 섹션에 별도로 보고하세요. + +## 출력 형식 + +### 발견된 이슈 +심각도 순서(Critical 먼저)로 나열: +- [Critical][과최적화] 이슈 설명 (관련 기획서/체크리스트 항목 참조) +- [Major][누락] 이슈 설명 (관련 기획서/체크리스트 항목 참조) +- [Minor][누락] 이슈 설명 (관련 기획서/체크리스트 항목 참조) + +### 범위 밖 이슈 +기획서/체크리스트 범위 밖이지만 주목할 만한 이슈: +- [Critical] 이슈 설명 +- [Minor] 이슈 설명 +(범위 밖 이슈가 없으면 "없음"이라고 작성하세요.) + +### 요약 +- Critical: N, Major: N, Minor: N +- 과최적화 수: N +- 누락 수: N +- CONFIRMED: N, DISMISSED: N +- 전체 품질: [간략한 평가] + +### 판정 +모든 체크리스트 항목이 충족되고 과최적화/누락이 없으면: VERDICT: PASS +그렇지 않으면: VERDICT: FAIL +""" + +AGGREGATE_REVIEW_TEMPLATE = """\ +You are adjudicating multiple review results and turning them into an actionable decision. + +## Plan +{plan} + +## Checklist +{checklist} + +## Reference Documents +{docs} + +## Candidate Outputs +{candidate_outputs} + +## Reviewer Findings +{reviews_bundle} + +## Previous Verification Feedback +{feedback} + +## Instructions +Explore the project directory to confirm the current codebase state. Then: +1. Deduplicate overlapping issues across reviewers. +2. Resolve disagreements explicitly. +3. Keep only issues supported by the plan, checklist, code, or reviewer evidence. +4. When evidence is mixed, explain what was confirmed, what was dismissed, and what still needs follow-up. +5. Produce a prioritized action list for the coder. +6. If no confirmed issue remains, output VERDICT: PASS. Otherwise VERDICT: FAIL. + +## Output Format + +### Confirmed Issues +- [Critical][Omission] Description with rationale and source reviewer(s) + +### Dismissed Findings +- [False positive] Claim — reason why it is actually correct (raised by: Reviewer X) +- [Already fixed] Claim — already resolved in the current code (raised by: Reviewer X) +(Write "None" if nothing was dismissed.) + +### Action Items +1. Concrete fix the coder should make +2. Concrete fix the coder should make + +### Summary +- Confirmed issues: N +- Dismissed findings: N (false positive: N, already fixed: N) +- Overall quality: [BRIEF ASSESSMENT] + +### Verdict +VERDICT: PASS or VERDICT: FAIL +""" + +AGGREGATE_REVIEW_TEMPLATE_KO = """\ +당신은 여러 리뷰 결과를 판정하고 coder가 수정할 액션으로 정리하는 시니어 리뷰어입니다. + +## 기획서 +{plan} + +## 체크리스트 +{checklist} + +## 참고 문서 +{docs} + +## 후보 결과물 +{candidate_outputs} + +## 개별 리뷰 결과 +{reviews_bundle} + +## 이전 검증 피드백 +{feedback} + +## 지침 +프로젝트 디렉토리를 탐색하여 현재 코드베이스 상태를 확인한 뒤 다음을 수행하세요. +1. 리뷰어들 사이에 중복되는 이슈를 합치세요. +2. 의견 충돌은 명시적으로 정리하세요. +3. 기획서, 체크리스트, 코드, 리뷰 근거로 뒷받침되는 이슈만 남기세요. +4. 근거가 엇갈리면 무엇이 확정이고 무엇이 기각 또는 추가확인 대상인지 분명히 적으세요. +5. coder가 바로 수정할 수 있는 우선순위 액션 아이템을 만드세요. +6. 확정된 이슈가 없으면 VERDICT: PASS, 있으면 VERDICT: FAIL 을 출력하세요. + +## 출력 형식 + +### 확정 이슈 +- [Critical][누락] 확정된 이슈 설명, 근거, 출처 리뷰어 + +### 기각된 주장 +- [오탐] 주장 내용 — 실제로 올바른 이유 (제기: 리뷰어 X) +- [수정 완료] 주장 내용 — 현재 코드에서 이미 해결됨 (제기: 리뷰어 X) +(기각된 항목이 없으면 "없음"이라고 작성하세요.) + +### 액션 아이템 +1. coder가 수정해야 할 구체적인 작업 +2. coder가 수정해야 할 구체적인 작업 + +### 요약 +- 확정 이슈 수: N +- 기각된 주장 수: N (오탐: N, 수정 완료: N) +- 전체 품질: [간략한 평가] + +### 판정 +VERDICT: PASS 또는 VERDICT: FAIL +""" + + +DEFAULT_TEMPLATES: dict[str, dict[str, str]] = { + "en": { + "generate": GENERATE_TEMPLATE, + "review": REVIEW_TEMPLATE, + "review-only": REVIEW_ONLY_TEMPLATE, + "aggregate-review": AGGREGATE_REVIEW_TEMPLATE, + }, + "ko": { + "generate": GENERATE_TEMPLATE_KO, + "review": REVIEW_TEMPLATE_KO, + "review-only": REVIEW_ONLY_TEMPLATE_KO, + "aggregate-review": AGGREGATE_REVIEW_TEMPLATE_KO, + }, +} + +# Current language (set by pipeline before run) +_current_language: str = "en" + + +def set_language(lang: str) -> None: + """Set the current template language.""" + global _current_language + if lang not in DEFAULT_TEMPLATES: + raise ValueError(f"Unsupported language '{lang}'. Available: {list(DEFAULT_TEMPLATES.keys())}") + _current_language = lang + + +# --------------------------------------------------------------------------- +# Pipeline presets +# --------------------------------------------------------------------------- + +def _safe_key(name: str) -> str: + """Sanitize agent name for use as template variable / output_key. + + Replaces hyphens with underscores so names like 'claude-coder' + become 'claude_coder', which is valid in format_map(). + """ + return name.replace("-", "_") + + +def _unique_safe_keys(names: list[str]) -> list[str]: + """Return stable, collision-free keys for agent names. + + Duplicate names keep the first key unchanged and receive numeric suffixes + from the second occurrence onward. + """ + totals = collections.Counter(_safe_key(name) for name in names) + seen: collections.defaultdict[str, int] = collections.defaultdict(int) + keys: list[str] = [] + + for name in names: + base = _safe_key(name) + seen[base] += 1 + if totals[base] == 1 or seen[base] == 1: + keys.append(base) + else: + keys.append(f"{base}_{seen[base]}") + + return keys + + +def _build_named_bundle( + labels: list[str], + step_names: list[str], + output_keys: list[str], + title: str, +) -> str: + """Build a templated bundle from prior step outputs.""" + parts: list[str] = [] + for label, step_name, output_key in zip(labels, step_names, output_keys): + parts.append( + f"## {title}: {label} ({step_name})\n" + f"{{{output_key}}}" + ) + return "\n\n---\n\n".join(parts) + + +def _build_simple_preset( + coders: list[str], reviewers: list[str], seniors: list[str], +) -> list[StepConfig]: + """First coder generates, first reviewer reviews.""" + if not coders: + raise ValueError("'simple' preset requires at least 1 coder") + if not reviewers: + raise ValueError("'simple' preset requires at least 1 reviewer") + steps = [ + StepConfig( + name="generate", + agent=coders[0], + role="generate", + prompt_template="default:generate", + output_key="generated_code", + ), + StepConfig( + name="review", + agent=reviewers[0], + role="review", + prompt_template="default:review", + output_key="review_result", + verdict=not seniors, + ), + ] + if seniors: + steps.append( + StepConfig( + name="senior_review", + agent=seniors[0], + role="review", + prompt_template="default:aggregate-review", + output_key="senior_review_result", + verdict=True, + context_override={ + "candidate_outputs": "## Generated code\n{generated_code}", + "reviews_bundle": f"## Review: {reviewers[0]} (review)\n{{review_result}}", + }, + ), + ) + return steps + + +def _build_cross_review_preset( + coders: list[str], reviewers: list[str], seniors: list[str], +) -> list[StepConfig]: + """Both coders generate, then cross-review each other's output.""" + if len(coders) < 2: + raise ValueError("'cross-review' preset requires at least 2 coders") + a, b = coders[0], coders[1] + ak, bk = _unique_safe_keys([a, b]) + steps = [ + StepConfig( + name=f"generate_{ak}", + agent=a, + role="generate", + prompt_template="default:generate", + output_key=f"code_{ak}", + parallel=True, + ), + StepConfig( + name=f"generate_{bk}", + agent=b, + role="generate", + prompt_template="default:generate", + output_key=f"code_{bk}", + parallel=True, + ), + StepConfig( + name=f"review_by_{ak}", + agent=a, + role="review", + prompt_template="default:review", + output_key=f"review_by_{ak}", + context_override={"generated_code": f"{{code_{bk}}}"}, + parallel=True, + verdict=not seniors, + ), + StepConfig( + name=f"review_by_{bk}", + agent=b, + role="review", + prompt_template="default:review", + output_key=f"review_by_{bk}", + verdict=not seniors, + context_override={"generated_code": f"{{code_{ak}}}"}, + parallel=True, + ), + ] + if seniors: + steps.append( + StepConfig( + name="senior_review", + agent=seniors[0], + role="review", + prompt_template="default:aggregate-review", + output_key="senior_review_result", + verdict=True, + context_override={ + "candidate_outputs": _build_named_bundle( + [a, b], + [f"generate_{ak}", f"generate_{bk}"], + [f"code_{ak}", f"code_{bk}"], + "Candidate", + ), + "reviews_bundle": _build_named_bundle( + [a, b], + [f"review_by_{ak}", f"review_by_{bk}"], + [f"review_by_{ak}", f"review_by_{bk}"], + "Review", + ), + }, + ), + ) + return steps + + +def _build_review_only_preset( + coders: list[str], reviewers: list[str], seniors: list[str], +) -> list[StepConfig]: + """Review-only: all reviewers audit existing code independently.""" + if not reviewers: + raise ValueError("'review-only' preset requires at least 1 reviewer") + + if len(reviewers) == 1 and not seniors: + # Single reviewer — backward compatible + return [ + StepConfig( + name="review", + agent=reviewers[0], + role="review", + prompt_template="default:review-only", + output_key="review_result", + verdict=True, + ), + ] + + # Multiple reviewers — each produces a separate review with verdict (parallel) + steps: list[StepConfig] = [] + reviewer_keys = _unique_safe_keys(reviewers) + for reviewer, rk in zip(reviewers, reviewer_keys): + steps.append( + StepConfig( + name=f"review_{rk}", + agent=reviewer, + role="review", + prompt_template="default:review-only", + output_key=f"review_{rk}", + verdict=not seniors, + parallel=True, + ), + ) + if seniors: + step_names = [f"review_{rk}" for rk in reviewer_keys] + output_keys = [f"review_{rk}" for rk in reviewer_keys] + steps.append( + StepConfig( + name="senior_review", + agent=seniors[0], + role="review", + prompt_template="default:aggregate-review", + output_key="senior_review_result", + verdict=True, + context_override={ + "candidate_outputs": "Current repository working tree under review.", + "reviews_bundle": _build_named_bundle( + reviewers, step_names, output_keys, "Review", + ), + }, + ), + ) + return steps + + +def _build_review_fix_preset( + coders: list[str], reviewers: list[str], seniors: list[str], +) -> list[PhaseConfig]: + """Review in parallel, aggregate findings, fix, then verify in a loop.""" + if not coders: + raise ValueError("'review-fix' preset requires at least 1 coder") + if not reviewers: + raise ValueError("'review-fix' preset requires at least 1 reviewer") + + review_steps: list[StepConfig] = [] + reviewer_keys = _unique_safe_keys(reviewers) + for reviewer, rk in zip(reviewers, reviewer_keys): + review_steps.append( + StepConfig( + name=f"review_{rk}", + agent=reviewer, + role="review", + prompt_template="default:review-only", + output_key=f"review_{rk}", + verdict=False, + parallel=True, + ), + ) + + fix_coder = coders[0] + senior_agent = seniors[0] if seniors else reviewers[0] + review_step_names = [f"review_{rk}" for rk in reviewer_keys] + review_output_keys = [f"review_{rk}" for rk in reviewer_keys] + + return [ + PhaseConfig( + name="review_fix", + steps=review_steps + [ + StepConfig( + name="aggregate_review", + agent=senior_agent, + role="review", + prompt_template="default:aggregate-review", + output_key="aggregate_review", + context_override={ + "candidate_outputs": "Current repository working tree under review.", + "reviews_bundle": _build_named_bundle( + reviewers, review_step_names, review_output_keys, "Review", + ), + }, + ), + StepConfig( + name="generate", + agent=fix_coder, + role="generate", + prompt_template="default:generate", + output_key="generated_code", + context_override={"feedback": "{aggregate_review}"}, + ), + StepConfig( + name="verify", + agent=senior_agent, + role="review", + prompt_template="default:review", + output_key="verify_result", + verdict=True, + ), + ], + max_iterations=5, + consecutive_pass=1, + ), + ] + + +PIPELINE_PRESETS: dict[str, Callable] = { + "simple": _build_simple_preset, + "cross-review": _build_cross_review_preset, + "review-only": _build_review_only_preset, +} + +PHASED_PRESETS: dict[str, Callable] = { + "review-fix": _build_review_fix_preset, +} + +ALL_PRESET_NAMES: list[str] = list(PIPELINE_PRESETS.keys()) + list(PHASED_PRESETS.keys()) + + +# --------------------------------------------------------------------------- +# Template resolution and rendering +# --------------------------------------------------------------------------- + +def resolve_template(template_ref: str, templates_dir: Optional[Path] = None) -> str: + """Resolve a template reference to its content string. + + Formats: + - "default:generate" -> built-in GENERATE_TEMPLATE + - "default:review" -> built-in REVIEW_TEMPLATE + - "path/to/file.md" -> read file contents + """ + if template_ref.startswith("default:"): + key = template_ref.split(":", 1)[1] + lang_templates = DEFAULT_TEMPLATES.get(_current_language, DEFAULT_TEMPLATES["en"]) + if key not in lang_templates: + raise ValueError( + f"Unknown default template '{key}'. " + f"Available: {list(lang_templates.keys())}" + ) + return lang_templates[key] + + # Treat as file path + path = Path(template_ref) + if templates_dir and not path.is_absolute(): + path = templates_dir / path + if not path.exists(): + raise FileNotFoundError(f"Template file not found: {path}") + return path.read_text(encoding="utf-8") + + +class _DefaultDict(collections.defaultdict): + """defaultdict that uses the missing key name in the default value.""" + + def __missing__(self, key: str) -> str: + return f"(no {key} provided)" + + +def render_template(template: str, context: dict[str, str]) -> str: + """Render a template string with {variable} placeholders. + + Missing keys produce "(no provided)" instead of raising KeyError. + """ + safe_context = _DefaultDict(str) + safe_context.update(context) + return template.format_map(safe_context) diff --git a/cross_eval/report.py b/cross_eval/report.py new file mode 100644 index 0000000..fac30f2 --- /dev/null +++ b/cross_eval/report.py @@ -0,0 +1,497 @@ +"""Markdown report generation.""" +from __future__ import annotations + +import re +from itertools import groupby + +from cross_eval.models import ( + IterationResult, + PipelineConfig, + PipelineResult, + ReviewMetrics, + StepConfig, +) + + +# --------------------------------------------------------------------------- +# i18n strings +# --------------------------------------------------------------------------- + +_STRINGS: dict[str, dict[str, str]] = { + "en": { + "title": "Cross-Eval Report", + "summary": "Summary", + "prop": "Property", + "val": "Value", + "total_iter": "Total Iterations", + "final_verdict": "Final Verdict", + "duration": "Duration", + "max_iter": "Max Iterations", + "phases_label": "Phases", + "iteration": "Iteration", + "phase": "Phase", + "steps": "Steps", + "max_iterations": "Max iterations", + "consec_pass": "Consecutive PASS required", + "step": "Step", + "verdict": "Verdict", + "output_chars": "Output ({n} chars)", + "feedback_next": "Feedback for next iteration:", + "oos_title": "Out of Scope Issues", + "oos_desc": ( + "The following issues were found outside the plan/checklist scope " + "but are worth noting." + ), + "final_verdict_title": "Final Verdict", + "repeat_title": "Repeated Aggregate Findings", + "repeat_desc": "The following aggregate-review outputs repeated across iterations.", + "pass_msg": "All checklist items satisfied. No over-engineering or omissions detected.", + "fail_phased": "Pipeline phases ({phases}) completed without full convergence.", + "fail_simple": "Maximum iterations ({max_iter}) reached without passing all checks.", + "metrics_title": "Review Metrics", + "metrics_trend_title": "Metrics Trend", + "metrics_iter": "Iter", + "metrics_total_issues": "Total Issues", + "metrics_na": "N/A", + }, + "ko": { + "title": "교차 검증 리포트", + "summary": "요약", + "prop": "항목", + "val": "값", + "total_iter": "총 반복 횟수", + "final_verdict": "최종 판정", + "duration": "소요 시간", + "max_iter": "최대 반복", + "phases_label": "페이즈", + "iteration": "반복", + "phase": "페이즈", + "steps": "단계", + "max_iterations": "최대 반복", + "consec_pass": "연속 PASS 필요", + "step": "단계", + "verdict": "판정", + "output_chars": "출력 ({n}자)", + "feedback_next": "다음 반복을 위한 피드백:", + "oos_title": "범위 밖 이슈", + "oos_desc": ( + "아래는 기획서/체크리스트 범위 밖이지만 " + "리뷰 중 발견된 이슈입니다." + ), + "final_verdict_title": "최종 판정", + "repeat_title": "반복된 Aggregate 이슈", + "repeat_desc": "아래 aggregate-review 결과가 여러 반복에서 동일하게 다시 나타났습니다.", + "pass_msg": "모든 체크리스트 항목 충족. 과최적화/누락 없음.", + "fail_phased": "파이프라인 페이즈 ({phases}) 완료, 완전한 수렴에 도달하지 못함.", + "fail_simple": "최대 반복 횟수 ({max_iter})에 도달, 모든 검증을 통과하지 못함.", + "metrics_title": "리뷰 메트릭", + "metrics_trend_title": "메트릭 추이", + "metrics_iter": "반복", + "metrics_total_issues": "총 이슈", + "metrics_na": "해당 없음", + }, +} + + +def _t(config: PipelineConfig, key: str, **kwargs: str) -> str: + """Get translated string.""" + lang = getattr(config, "language", "en") + strings = _STRINGS.get(lang, _STRINGS["en"]) + s = strings.get(key, _STRINGS["en"].get(key, key)) + if kwargs: + s = s.format(**kwargs) + return s + + +# --------------------------------------------------------------------------- +# Review output parsing +# --------------------------------------------------------------------------- + +def parse_review_metrics(output: str) -> ReviewMetrics: + """Parse review output to extract severity, category, and assessment counts.""" + metrics = ReviewMetrics() + + # Severity: count tagged issue lines (e.g. "[Critical]", "[Major]", "[Minor]") + metrics.critical = len(re.findall(r"\[Critical\]", output, re.IGNORECASE)) + metrics.major = len(re.findall(r"\[Major\]", output, re.IGNORECASE)) + metrics.minor = len(re.findall(r"\[Minor\]", output, re.IGNORECASE)) + + # Categories (EN and KO variants) + metrics.over_engineering = len(re.findall( + r"\[Over-engineering\]|\[과최적화\]", output, re.IGNORECASE, + )) + metrics.omission = len(re.findall( + r"\[Omission\]|\[누락\]", output, re.IGNORECASE, + )) + + # Assessments — match "CONFIRMED: " but not summary "CONFIRMED: N" + metrics.confirmed = len(re.findall(r"\bCONFIRMED:\s+(?!\d)", output)) + metrics.dismissed = len(re.findall(r"\bDISMISSED\b(?:\s*\([^)]*\))?\s*:\s+(?!\d)", output)) + + return metrics + + +def _aggregate_metrics(a: ReviewMetrics, b: ReviewMetrics) -> ReviewMetrics: + """Combine metrics from two review steps.""" + return ReviewMetrics( + critical=a.critical + b.critical, + major=a.major + b.major, + minor=a.minor + b.minor, + over_engineering=a.over_engineering + b.over_engineering, + omission=a.omission + b.omission, + confirmed=a.confirmed + b.confirmed, + dismissed=a.dismissed + b.dismissed, + ) + + +def _extract_out_of_scope(output: str) -> str: + """Extract the 'Out of Scope Issues' section from review output. + + Looks for '### Out of Scope Issues' or '### 범위 밖 이슈' heading, + captures text until the next '###' heading or end of string. + Returns empty string if not found or contains only 'None'/'없음'. + """ + pattern = r"###\s*(?:Out of Scope Issues|범위 밖 이슈)\s*\n(.*?)(?=\n###|\Z)" + match = re.search(pattern, output, re.DOTALL) + if not match: + return "" + content = match.group(1).strip() + if content.lower() in ("none", "없음", ""): + return "" + return content + + +def build_report(config: PipelineConfig, result: PipelineResult) -> str: + """Build the complete markdown report string.""" + has_phases = any(ir.phase_name for ir in result.iterations) + + if has_phases: + return _build_phased_report(config, result) + return _build_simple_report(config, result) + + +def _build_simple_report( + config: PipelineConfig, result: PipelineResult, +) -> str: + """Build report for a non-phased (simple) pipeline run.""" + lines: list[str] = [] + + lines.append(f"# {_t(config, 'title')}\n") + _append_summary_table(lines, config, result) + + out_of_scope_items: list[tuple[int, str]] = [] + + for iter_result in result.iterations: + lines.append("---\n") + lines.append(f"## {_t(config, 'iteration')} {iter_result.iteration}\n") + + _append_iteration_steps(lines, config, iter_result, config.pipeline, out_of_scope_items) + + if iter_result.feedback: + lines.append(f"**{_t(config, 'feedback_next')}** {iter_result.feedback[:200]}...") + lines.append("") + + _append_out_of_scope(lines, config, out_of_scope_items) + _append_review_metrics_table(lines, config, result) + _append_repeated_aggregate(lines, config, result) + _append_final_verdict(lines, config, result) + + return "\n".join(lines) + + +def _build_phased_report( + config: PipelineConfig, result: PipelineResult, +) -> str: + """Build report for a phased pipeline run (e.g. review-fix).""" + lines: list[str] = [] + + lines.append(f"# {_t(config, 'title')}\n") + _append_summary_table(lines, config, result, phased=True) + + phase_map = {p.name: p for p in config.phases} + out_of_scope_items: list[tuple[int, str]] = [] + + for phase_name, phase_iters_iter in groupby( + result.iterations, key=lambda ir: ir.phase_name, + ): + phase_iters = list(phase_iters_iter) + phase_config = phase_map.get(phase_name or "") + + lines.append("---\n") + lines.append(f"## {_t(config, 'phase')}: {phase_name}\n") + + if phase_config: + step_desc = " → ".join(s.name for s in phase_config.steps) + lines.append( + f"{_t(config, 'steps')}: {step_desc} | " + f"{_t(config, 'max_iterations')}: {phase_config.max_iterations} | " + f"{_t(config, 'consec_pass')}: {phase_config.consecutive_pass}\n" + ) + + steps = phase_config.steps if phase_config else config.pipeline + + consecutive = 0 + for iter_result in phase_iters: + verdict_label = "" + if iter_result.verdict: + if iter_result.verdict == "PASS": + consecutive += 1 + if phase_config and phase_config.consecutive_pass > 1: + verdict_label = f" — PASS ({consecutive}/{phase_config.consecutive_pass})" + if consecutive >= phase_config.consecutive_pass: + verdict_label += " ✓" + else: + verdict_label = " — PASS ✓" + else: + consecutive = 0 + verdict_label = " — FAIL" + + lines.append( + f"### {_t(config, 'iteration')} {iter_result.iteration}{verdict_label}\n" + ) + _append_iteration_steps(lines, config, iter_result, steps, out_of_scope_items) + + if iter_result.feedback: + lines.append( + f"**{_t(config, 'feedback_next')}** {iter_result.feedback[:200]}..." + ) + lines.append("") + + _append_out_of_scope(lines, config, out_of_scope_items) + _append_review_metrics_table(lines, config, result) + _append_repeated_aggregate(lines, config, result) + _append_final_verdict(lines, config, result) + + return "\n".join(lines) + + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + +def _append_summary_table( + lines: list[str], + config: PipelineConfig, + result: PipelineResult, + phased: bool = False, +) -> None: + """Append the summary table to lines.""" + total_iter = len(result.iterations) + minutes = int(result.total_duration // 60) + seconds = int(result.total_duration % 60) + duration_str = f"{minutes}m {seconds}s" if minutes else f"{seconds}s" + + lines.append(f"## {_t(config, 'summary')}\n") + lines.append(f"| {_t(config, 'prop')} | {_t(config, 'val')} |") + lines.append("|----------|-------|") + lines.append(f"| {_t(config, 'total_iter')} | {total_iter} |") + lines.append(f"| {_t(config, 'final_verdict')} | **{result.final_verdict}** |") + lines.append(f"| {_t(config, 'duration')} | {duration_str} |") + + if phased and config.phases: + phase_names = " → ".join(p.name for p in config.phases) + lines.append(f"| {_t(config, 'phases_label')} | {phase_names} |") + for p in config.phases: + lines.append( + f"| {_t(config, 'phase')}: {p.name} | " + f"{_t(config, 'max_iterations')} {p.max_iterations}, " + f"{p.consecutive_pass}x {_t(config, 'consec_pass')} |" + ) + else: + lines.append(f"| {_t(config, 'max_iter')} | {config.max_iterations} |") + + lines.append("") + + +def _append_iteration_steps( + lines: list[str], + config: PipelineConfig, + iter_result: IterationResult, + steps: list[StepConfig], + out_of_scope_items: list[tuple[int, str]], +) -> None: + """Append step details for one iteration.""" + for step in steps: + agent_result = iter_result.step_results.get(step.output_key) + output = iter_result.step_outputs.get(step.output_key, "") + + agent_name = agent_result.agent_name if agent_result else step.agent + duration = f" ({agent_result.duration_seconds}s)" if agent_result else "" + + lines.append(f"### {_t(config, 'step')}: {step.name} ({agent_name}){duration}\n") + + if step.verdict and iter_result.verdict: + lines.append(f"**{_t(config, 'verdict')}: {iter_result.verdict}**\n") + + if len(output) > 500: + lines.append("
") + lines.append( + f"{_t(config, 'output_chars', n=str(len(output)))}\n" + ) + lines.append(output) + lines.append("\n
\n") + else: + lines.append(output) + lines.append("") + + if step.role == "review": + oos = _extract_out_of_scope(output) + if oos: + out_of_scope_items.append((iter_result.iteration, oos)) + + # Parse and accumulate review metrics for this iteration + step_metrics = parse_review_metrics(output) + if iter_result.review_metrics is None: + iter_result.review_metrics = step_metrics + else: + iter_result.review_metrics = _aggregate_metrics( + iter_result.review_metrics, step_metrics, + ) + + +def _append_review_metrics_table( + lines: list[str], + config: PipelineConfig, + result: PipelineResult, +) -> None: + """Append per-iteration review metrics table and trend summary.""" + # Only include if at least one iteration has metrics + has_metrics = any(ir.review_metrics for ir in result.iterations) + if not has_metrics: + return + + na = _t(config, "metrics_na") + + lines.append("---\n") + lines.append(f"## {_t(config, 'metrics_title')}\n") + + # Table header + lines.append( + f"| {_t(config, 'metrics_iter')} | {_t(config, 'verdict')} " + f"| Critical | Major | Minor " + f"| Over-eng | Omission " + f"| CONFIRMED | DISMISSED |" + ) + lines.append("|------|---------|----------|-------|-------|----------|----------|-----------|-----------|") + + # Table rows + for ir in result.iterations: + m = ir.review_metrics + v = ir.verdict or "-" + if m: + lines.append( + f"| {ir.iteration} | {v} " + f"| {m.critical} | {m.major} | {m.minor} " + f"| {m.over_engineering} | {m.omission} " + f"| {m.confirmed} | {m.dismissed} |" + ) + else: + lines.append( + f"| {ir.iteration} | {v} " + f"| {na} | {na} | {na} " + f"| {na} | {na} " + f"| {na} | {na} |" + ) + + lines.append("") + + # Trend summary + metrics_list = [ + (ir.iteration, ir.review_metrics) + for ir in result.iterations + if ir.review_metrics + ] + if len(metrics_list) >= 2: + lines.append(f"### {_t(config, 'metrics_trend_title')}\n") + _append_trend_line( + lines, "Issues", + [(it, m.critical + m.major + m.minor) for it, m in metrics_list], + ) + _append_trend_line( + lines, "Over-engineering", + [(it, m.over_engineering) for it, m in metrics_list], + ) + _append_trend_line( + lines, "Omission", + [(it, m.omission) for it, m in metrics_list], + ) + _append_trend_line( + lines, "CONFIRMED", + [(it, m.confirmed) for it, m in metrics_list], + ) + _append_trend_line( + lines, "DISMISSED", + [(it, m.dismissed) for it, m in metrics_list], + ) + lines.append("") + + +def _append_trend_line( + lines: list[str], + label: str, + values: list[tuple[int, int]], +) -> None: + """Append a single trend line like '- Issues: 6 -> 2 -> 0 (decreasing)'.""" + nums = [v for _, v in values] + arrow = " → ".join(str(n) for n in nums) + if nums[-1] < nums[0]: + direction = "decreasing" + elif nums[-1] > nums[0]: + direction = "increasing" + else: + direction = "stable" + lines.append(f"- {label}: {arrow} ({direction})") + + +def _append_out_of_scope( + lines: list[str], + config: PipelineConfig, + out_of_scope_items: list[tuple[int, str]], +) -> None: + """Append the out-of-scope issues section if any exist.""" + if not out_of_scope_items: + return + lines.append("---\n") + lines.append(f"## {_t(config, 'oos_title')}\n") + lines.append(f"{_t(config, 'oos_desc')}\n") + for iteration_num, content in out_of_scope_items: + lines.append(f"### {_t(config, 'iteration')} {iteration_num}\n") + lines.append(content) + lines.append("") + + +def _append_final_verdict( + lines: list[str], + config: PipelineConfig, + result: PipelineResult, +) -> None: + """Append the final verdict section.""" + lines.append("---\n") + lines.append(f"## {_t(config, 'final_verdict_title')}: {result.final_verdict}\n") + + if result.final_verdict == "PASS": + lines.append(_t(config, "pass_msg")) + else: + if config.phases: + phase_names = " → ".join(p.name for p in config.phases) + lines.append(_t(config, "fail_phased", phases=phase_names)) + else: + lines.append( + _t(config, "fail_simple", max_iter=str(config.max_iterations)) + ) + + +def _append_repeated_aggregate( + lines: list[str], + config: PipelineConfig, + result: PipelineResult, +) -> None: + """Append repeated aggregate warnings if any exist.""" + if not result.repeated_aggregate_warnings: + return + lines.append("---\n") + lines.append(f"## {_t(config, 'repeat_title')}\n") + lines.append(f"{_t(config, 'repeat_desc')}\n") + for warning in result.repeated_aggregate_warnings: + lines.append(f"- {warning}") + lines.append("") diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..24d3f55 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,18 @@ +[build-system] +requires = ["setuptools>=68.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "cross-eval" +version = "0.1.0" +description = "AI agent cross-evaluation CLI tool" +requires-python = ">=3.9" +dependencies = [ + "pyyaml>=6.0", +] + +[project.scripts] +cross-eval = "cross_eval.cli:main" + +[tool.setuptools.packages.find] +include = ["cross_eval*"] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..3aecde9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +pyyaml>=6.0 diff --git a/tests/__pycache__/test_config.cpython-312.pyc b/tests/__pycache__/test_config.cpython-312.pyc new file mode 100644 index 0000000..b2ce054 Binary files /dev/null and b/tests/__pycache__/test_config.cpython-312.pyc differ diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..7a4cb1a --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,515 @@ +from __future__ import annotations + +import unittest +from unittest.mock import patch + +from cross_eval.agent import _supports_reasoning_effort +from cross_eval.agent import invoke_agent +from cross_eval.config import ( + BUILTIN_AGENTS, + _default_seniors_for_preset, + apply_reasoning_effort_settings, + normalize_reasoning_effort, + validate_config, +) +from cross_eval.models import ( + AgentConfig, + IterationResult, + PhaseConfig, + PipelineConfig, + PipelineResult, + ReviewMetrics, + StepConfig, +) +from cross_eval.pipeline import _detect_repeated_aggregate +from cross_eval.prompts import ( + GENERATE_TEMPLATE, + GENERATE_TEMPLATE_KO, + REVIEW_TEMPLATE, + REVIEW_TEMPLATE_KO, + REVIEW_ONLY_TEMPLATE, + REVIEW_ONLY_TEMPLATE_KO, + AGGREGATE_REVIEW_TEMPLATE, + AGGREGATE_REVIEW_TEMPLATE_KO, + _build_cross_review_preset, + _build_review_fix_preset, + _build_review_only_preset, + _build_simple_preset, +) +from cross_eval.report import build_report, parse_review_metrics + + +class BuiltinAgentConfigTest(unittest.TestCase): + def test_codex_builtin_agents_skip_git_repo_check(self) -> None: + for agent_name in ("codex-coder", "codex-reviewer", "codex-senior"): + with self.subTest(agent=agent_name): + self.assertIn( + "--skip-git-repo-check", + BUILTIN_AGENTS[agent_name].args, + ) + + def test_senior_builtin_agents_exist(self) -> None: + self.assertIn("claude-senior", BUILTIN_AGENTS) + self.assertIn("codex-senior", BUILTIN_AGENTS) + + def test_builtin_reasoning_effort_defaults_match_recommended_levels(self) -> None: + self.assertEqual(BUILTIN_AGENTS["codex-coder"].reasoning_effort, "medium") + self.assertEqual(BUILTIN_AGENTS["codex-reviewer"].reasoning_effort, "medium") + self.assertEqual(BUILTIN_AGENTS["codex-senior"].reasoning_effort, "high") + + def test_normalize_reasoning_effort_aliases(self) -> None: + self.assertEqual(normalize_reasoning_effort("extra-high"), "xhigh") + self.assertEqual(normalize_reasoning_effort("extra_high"), "xhigh") + self.assertEqual(normalize_reasoning_effort("x-high"), "xhigh") + + def test_apply_reasoning_effort_settings_uses_defaults_and_role_overrides(self) -> None: + config = PipelineConfig( + agents={ + "codex-coder": AgentConfig(name="codex-coder", command="codex"), + "codex-reviewer": AgentConfig(name="codex-reviewer", command="codex"), + "codex-senior": AgentConfig(name="codex-senior", command="codex"), + }, + coders=["codex-coder"], + reviewers=["codex-reviewer"], + seniors=["codex-senior"], + ) + + apply_reasoning_effort_settings( + config, + reviewer_effort="high", + senior_effort="xhigh", + ) + + self.assertEqual(config.agents["codex-coder"].reasoning_effort, "medium") + self.assertEqual(config.agents["codex-reviewer"].reasoning_effort, "high") + self.assertEqual(config.agents["codex-senior"].reasoning_effort, "xhigh") + + def test_codex_supports_reasoning_effort_override(self) -> None: + self.assertTrue(_supports_reasoning_effort("codex")) + self.assertFalse(_supports_reasoning_effort("claude")) + + def test_invoke_agent_passes_reasoning_effort_to_codex(self) -> None: + captured: dict[str, list[str]] = {} + + def _fake_run(cmd, **kwargs): + captured["cmd"] = cmd + + class _Result: + returncode = 0 + stdout = "VERDICT: PASS" + stderr = "" + + return _Result() + + agent = AgentConfig( + name="codex-reviewer", + command="codex", + args=["exec", "--model", "gpt-5.4", "-"], + reasoning_effort="high", + ) + + with patch("subprocess.run", side_effect=_fake_run): + invoke_agent(agent, "prompt", "review", quiet=True) + + self.assertEqual( + captured["cmd"][:3], + ["codex", "-c", 'model_reasoning_effort="high"'], + ) + + def test_detect_repeated_aggregate_warns_on_same_output(self) -> None: + steps = [ + StepConfig( + name="aggregate_review", + agent="codex-senior", + role="review", + prompt_template="default:aggregate-review", + output_key="aggregate_review", + ), + ] + history: dict[str, int] = {} + + first = _detect_repeated_aggregate( + steps, {"aggregate_review": "Same issue list"}, history, iteration=1, + ) + second = _detect_repeated_aggregate( + steps, {"aggregate_review": " same issue list "}, history, iteration=2, + ) + + self.assertIsNone(first) + self.assertEqual( + second, + "Repeated aggregate_review detected at iteration 2 (same as iteration 1).", + ) + + def test_report_includes_repeated_aggregate_section(self) -> None: + config = PipelineConfig(language="en") + result = PipelineResult(repeated_aggregate_warnings=[ + "Repeated aggregate_review detected at iteration 4 (same as iteration 3).", + ]) + + report = build_report(config, result) + + self.assertIn("Repeated Aggregate Findings", report) + self.assertIn("same as iteration 3", report) + + def test_review_fix_defaults_senior_from_reviewer_family(self) -> None: + self.assertEqual( + _default_seniors_for_preset( + "preset:review-fix", + ["codex-reviewer", "claude-reviewer"], + BUILTIN_AGENTS, + ), + ["codex-senior"], + ) + self.assertEqual( + _default_seniors_for_preset( + "preset:review-fix", + ["claude-reviewer"], + BUILTIN_AGENTS, + ), + ["claude-senior"], + ) + self.assertEqual( + _default_seniors_for_preset( + "preset:simple", + ["codex-reviewer"], + BUILTIN_AGENTS, + ), + [], + ) + + def test_review_fix_duplicate_reviewers_get_unique_step_keys(self) -> None: + phases = _build_review_fix_preset( + ["codex-coder"], + ["codex-reviewer", "codex-reviewer", "codex-reviewer"], + [], + ) + + converge = phases[0] + self.assertEqual( + [step.name for step in converge.steps[:3]], + [ + "review_codex_reviewer", + "review_codex_reviewer_2", + "review_codex_reviewer_3", + ], + ) + self.assertEqual( + [step.output_key for step in converge.steps[:3]], + [ + "review_codex_reviewer", + "review_codex_reviewer_2", + "review_codex_reviewer_3", + ], + ) + self.assertEqual( + [step.name for step in converge.steps[3:]], + ["aggregate_review", "generate", "verify"], + ) + + def test_review_only_duplicate_reviewers_get_unique_step_keys(self) -> None: + steps = _build_review_only_preset( + ["codex-coder"], + ["codex-reviewer", "codex-reviewer"], + [], + ) + + self.assertEqual( + [step.output_key for step in steps], + ["review_codex_reviewer", "review_codex_reviewer_2"], + ) + + def test_cross_review_duplicate_coders_get_unique_step_keys(self) -> None: + steps = _build_cross_review_preset( + ["codex-coder", "codex-coder"], + ["codex-reviewer"], + [], + ) + + self.assertEqual( + [step.output_key for step in steps], + [ + "code_codex_coder", + "code_codex_coder_2", + "review_by_codex_coder", + "review_by_codex_coder_2", + ], + ) + + def test_review_fix_uses_senior_for_aggregate_and_verify(self) -> None: + phases = _build_review_fix_preset( + ["codex-coder"], + ["claude-reviewer", "codex-reviewer"], + ["codex-senior"], + ) + + steps = phases[0].steps + self.assertEqual(steps[2].name, "aggregate_review") + self.assertEqual(steps[2].agent, "codex-senior") + self.assertEqual(steps[3].name, "generate") + self.assertEqual(steps[4].name, "verify") + self.assertEqual(steps[4].agent, "codex-senior") + self.assertTrue(steps[4].verdict) + + def test_review_only_with_senior_adds_aggregate_step(self) -> None: + steps = _build_review_only_preset( + ["codex-coder"], + ["claude-reviewer", "codex-reviewer"], + ["claude-senior"], + ) + + self.assertEqual(steps[-1].name, "senior_review") + self.assertEqual(steps[-1].agent, "claude-senior") + self.assertTrue(steps[-1].verdict) + self.assertFalse(steps[0].verdict) + self.assertFalse(steps[1].verdict) + + def test_simple_with_senior_adds_final_aggregate_step(self) -> None: + steps = _build_simple_preset( + ["codex-coder"], + ["codex-reviewer"], + ["codex-senior"], + ) + + self.assertEqual( + [step.name for step in steps], + ["generate", "review", "senior_review"], + ) + self.assertFalse(steps[1].verdict) + self.assertTrue(steps[2].verdict) + + def test_validate_config_rejects_duplicate_phase_step_names_and_output_keys(self) -> None: + config = PipelineConfig( + agents={ + "codex-reviewer": AgentConfig( + name="codex-reviewer", + command="codex", + ), + }, + phases=[ + PhaseConfig( + name="converge", + steps=[ + StepConfig( + name="review_dup", + agent="codex-reviewer", + role="review", + prompt_template="default:review-only", + output_key="same_key", + verdict=True, + ), + StepConfig( + name="review_dup", + agent="codex-reviewer", + role="review", + prompt_template="default:review-only", + output_key="same_key", + verdict=True, + ), + ], + ), + ], + ) + + errors = validate_config(config) + + self.assertIn("Phase 'converge' has duplicate step name 'review_dup'", errors) + self.assertIn("Phase 'converge' has duplicate output_key 'same_key'", errors) + + +class PromptTemplateTest(unittest.TestCase): + """Verify prompt template content after category/assessment refactor.""" + + def test_review_templates_no_false_positive_category(self) -> None: + """False positive should NOT appear as a category in review templates.""" + for tmpl, label in [ + (REVIEW_TEMPLATE, "REVIEW_TEMPLATE"), + (REVIEW_TEMPLATE_KO, "REVIEW_TEMPLATE_KO"), + (REVIEW_ONLY_TEMPLATE, "REVIEW_ONLY_TEMPLATE"), + (REVIEW_ONLY_TEMPLATE_KO, "REVIEW_ONLY_TEMPLATE_KO"), + ]: + with self.subTest(template=label): + # Should not contain "False positive" as a category bullet + self.assertNotIn( + "**False positive**", tmpl, + f"{label} still lists False positive as a category", + ) + # KO variant + if label.endswith("_KO"): + self.assertNotIn( + "**오탐**", tmpl, + f"{label} still lists 오탐 as a category", + ) + + def test_review_templates_have_confirmed_dismissed(self) -> None: + """Review templates should instruct CONFIRMED / DISMISSED assessment.""" + for tmpl, label in [ + (REVIEW_TEMPLATE, "REVIEW_TEMPLATE"), + (REVIEW_TEMPLATE_KO, "REVIEW_TEMPLATE_KO"), + ]: + with self.subTest(template=label): + self.assertIn("CONFIRMED", tmpl) + self.assertIn("DISMISSED", tmpl) + + def test_generate_templates_ignore_dismissed(self) -> None: + """Generate templates should tell coder to ignore DISMISSED items.""" + self.assertIn("DISMISSED", GENERATE_TEMPLATE) + self.assertIn("DISMISSED", GENERATE_TEMPLATE_KO) + + def test_aggregate_templates_dismissed_structure(self) -> None: + """Aggregate templates should use [False positive] / [Already fixed] tags.""" + self.assertIn("[False positive]", AGGREGATE_REVIEW_TEMPLATE) + self.assertIn("[Already fixed]", AGGREGATE_REVIEW_TEMPLATE) + self.assertIn("[오탐]", AGGREGATE_REVIEW_TEMPLATE_KO) + self.assertIn("[수정 완료]", AGGREGATE_REVIEW_TEMPLATE_KO) + + +class ReviewMetricsParsingTest(unittest.TestCase): + """Test review output metrics parsing.""" + + def test_parse_review_metrics_basic(self) -> None: + output = """\ +### Issues Found +- [Critical][Over-engineering] Added unnecessary caching layer +- [Major][Omission] Missing input validation for user_id +- [Major][Omission] Missing error handling for DB calls +- [Minor][Omission] No docstring on public API + +### Summary +- Critical: 1, Major: 2, Minor: 1 +- Over-engineering count: 1 +- Omission count: 3 +- CONFIRMED: 0, DISMISSED: 0 +""" + m = parse_review_metrics(output) + self.assertEqual(m.critical, 1) + self.assertEqual(m.major, 2) + self.assertEqual(m.minor, 1) + self.assertEqual(m.over_engineering, 1) + self.assertEqual(m.omission, 3) + self.assertEqual(m.confirmed, 0) + self.assertEqual(m.dismissed, 0) + + def test_parse_review_metrics_korean(self) -> None: + output = """\ +### 발견된 이슈 +- [Critical][과최적화] 불필요한 캐시 레이어 추가 +- [Major][누락] user_id 입력 검증 누락 + +### 이전 피드백 평가 +- CONFIRMED: DB 에러 핸들링 — 여전히 미구현 +- DISMISSED (오탐): 타입 힌트 누락 — 기획서에 없는 요구사항 +""" + m = parse_review_metrics(output) + self.assertEqual(m.critical, 1) + self.assertEqual(m.major, 1) + self.assertEqual(m.over_engineering, 1) + self.assertEqual(m.omission, 1) + self.assertEqual(m.confirmed, 1) + self.assertEqual(m.dismissed, 1) + + def test_parse_review_metrics_with_assessment(self) -> None: + output = """\ +### Previous Feedback Assessment +- CONFIRMED: Missing auth check — still not implemented +- CONFIRMED: SQL injection risk — still present +- DISMISSED (false positive): Unused import — actually used in tests + +### Issues Found +- [Critical][Omission] Missing auth check +- [Critical][Omission] SQL injection risk +""" + m = parse_review_metrics(output) + self.assertEqual(m.confirmed, 2) + self.assertEqual(m.dismissed, 1) + self.assertEqual(m.critical, 2) + self.assertEqual(m.omission, 2) + + def test_report_includes_metrics_table(self) -> None: + config = PipelineConfig( + language="en", + pipeline=[ + StepConfig( + name="review", + agent="claude-reviewer", + role="review", + prompt_template="default:review", + output_key="review_result", + verdict=True, + ), + ], + ) + result = PipelineResult( + iterations=[ + IterationResult( + iteration=1, + step_outputs={ + "review_result": ( + "### Issues Found\n" + "- [Critical][Omission] Missing auth\n" + "- [Major][Over-engineering] Extra abstraction\n" + "### Verdict\nVERDICT: FAIL" + ), + }, + verdict="FAIL", + ), + IterationResult( + iteration=2, + step_outputs={ + "review_result": ( + "### Previous Feedback Assessment\n" + "- CONFIRMED: Missing auth — still missing\n" + "- DISMISSED (false positive): Extra abstraction — needed per plan\n" + "### Issues Found\n" + "- [Major][Omission] Missing auth\n" + "### Verdict\nVERDICT: FAIL" + ), + }, + verdict="FAIL", + ), + ], + final_verdict="FAIL", + ) + + report = build_report(config, result) + + self.assertIn("Review Metrics", report) + # Check table headers + self.assertIn("Critical", report) + self.assertIn("CONFIRMED", report) + self.assertIn("DISMISSED", report) + # Check trend section + self.assertIn("Metrics Trend", report) + self.assertIn("decreasing", report) + + def test_report_no_metrics_table_without_review_steps(self) -> None: + config = PipelineConfig( + language="en", + pipeline=[ + StepConfig( + name="generate", + agent="claude-coder", + role="generate", + prompt_template="default:generate", + output_key="generated_code", + verdict=True, + ), + ], + ) + result = PipelineResult( + iterations=[ + IterationResult( + iteration=1, + step_outputs={"generated_code": "some code"}, + verdict="PASS", + ), + ], + final_verdict="PASS", + ) + + report = build_report(config, result) + self.assertNotIn("Review Metrics", report) + + +if __name__ == "__main__": + unittest.main()