diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 524e6b9..6c8ffea 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -41,7 +41,7 @@ inputs: checklist: checklist.md agents: - generator: + coder: command: claude args: ["-p", "--model", "sonnet", "--permission-mode", "auto"] system_prompt: "You are a senior software engineer. Follow the plan precisely." @@ -53,14 +53,16 @@ agents: # 방법 1: 프리셋 사용 (사용자가 pipeline YAML 직접 작성할 필요 없음) pipeline: preset:simple # "A 생성 → B 리뷰" (기본값) # pipeline: preset:cross-review # "둘 다 생성 → 서로 리뷰" +# pipeline: preset:plan-review # "구현 전 문서/기획 검토" +# pipeline: preset:coding-review-fix # "초기 코딩 1회 → 리뷰/수정 반복" # 방법 2: 직접 커스텀 (고급 사용자용) # pipeline: -# - name: generate -# agent: generator -# role: generate -# prompt_template: "default:generate" -# output_key: generated_code +# - name: coding +# agent: coder +# role: coding +# prompt_template: "default:coding" +# output_key: coding_output # - name: review # agent: reviewer # role: review @@ -73,8 +75,10 @@ pipeline: preset:simple # "A 생성 → B 리뷰" (기본값) | 프리셋 | 설명 | 자동 생성되는 steps | |--------|------|-------------------| -| `simple` | A 생성 → B 리뷰 | generate(agent1) → review(agent2) | -| `cross-review` | 둘 다 생성, 서로 리뷰 | gen_a → gen_b → review_of_b(agent_a) → review_of_a(agent_b) | +| `simple` | A 코딩 → B 리뷰 | coding(agent1) → review(agent2) | +| `cross-review` | 둘 다 코딩, 서로 리뷰 | coding_a → coding_b → review_of_b(agent_a) → review_of_a(agent_b) | +| `plan-review` | 구현 전 문서 검토 | parallel plan_review_* → senior_review(optional) | +| `coding-review-fix` | 초기 코딩 후 리뷰/수정 반복 | initial_coding(coding) → review_fix(review* → aggregate → coding → verify) | 프리셋은 내부적으로 적절한 pipeline steps + context_override를 자동 구성한다. agents에 정의된 순서대로 agent1, agent2가 배정된다. 프리셋이 불충분하면 직접 steps를 작성할 수 있다. 
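Reviewer note on the preset table in the hunk above: the `simple` and `cross-review` rows can be read as a small expansion from a preset name to an ordered step list. The following is a minimal sketch under stated assumptions, not the code in `prompts.py`. `Step` and `expand_preset` are hypothetical names introduced only for illustration; the real `PIPELINE_PRESETS` entries build `StepConfig` lists and may differ in fields and signature. Only the two presets whose step order the table spells out in full are covered.

```python
from typing import NamedTuple


class Step(NamedTuple):
    """Hypothetical stand-in for StepConfig: a step name and the agent that runs it."""
    name: str
    agent: str


def expand_preset(preset: str, agents: list[str]) -> list[Step]:
    """Expand a preset name into ordered steps, following the preset table.

    Agents are assigned in definition order (agent1, agent2), as the
    DEVELOPMENT.md text describes.
    """
    if len(agents) < 2:
        raise ValueError("these presets need at least two agents")
    first, second = agents[0], agents[1]

    if preset == "simple":
        # coding(agent1) -> review(agent2)
        return [Step("coding", first), Step("review", second)]

    if preset == "cross-review":
        # Both agents code, then each reviews the other's output.
        return [
            Step("coding_a", first),
            Step("coding_b", second),
            Step("review_of_b", first),
            Step("review_of_a", second),
        ]

    # plan-review and coding-review-fix are left out of this sketch on purpose.
    raise ValueError(f"preset not covered by this sketch: {preset}")


if __name__ == "__main__":
    for step in expand_preset("cross-review", ["coder", "reviewer"]):
        print(f"{step.name} -> {step.agent}")
```

The phased presets (`review-fix`, `coding-review-fix`) are omitted because they resolve to phases rather than a flat step list, which a single list of `Step` values would not represent faithfully.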
@@ -109,11 +113,11 @@ cross_eval/ - verdict_pattern 유효한 정규식인지 **prompts.py** — 기본 프롬프트 2종 + 파이프라인 프리셋 정의: -- `default:generate` — "기획서에 명시된 것만 구현하라, 과최적화 금지" + plan/checklist/feedback + **"프로젝트 디렉토리의 기존 코드를 탐색하여 컨텍스트를 파악하라"** 지시 +- `default:coding` — "기획서에 명시된 것만 구현하라, 과최적화 금지" + plan/checklist/feedback + **"프로젝트 디렉토리의 기존 코드를 탐색하여 컨텍스트를 파악하라"** 지시 - `default:review` — 과최적화/오탐/누락 3기준 검토 + `VERDICT: PASS|FAIL` 출력 + **"프로젝트 디렉토리를 직접 탐색하여 코드를 검증하라"** 지시 - `{variable}` 플레이스홀더, 누락 시 `(no {key} provided)` 출력 - 사용자가 커스텀 .md 파일로 오버라이드 가능 -- `PIPELINE_PRESETS` dict: `simple`, `cross-review` 등 프리셋별 StepConfig 리스트 정의 +- `PIPELINE_PRESETS` dict: `simple`, `cross-review`, `plan-review` 등 프리셋별 StepConfig 리스트 정의 **agent.py** — `invoke_agent(agent_config, prompt, cwd)`: - `cwd` 파라미터로 프로젝트 디렉토리 지정 → 에이전트가 해당 디렉토리에서 파일 탐색 가능 @@ -141,7 +145,7 @@ final-report.md 생성 - 최종 판정 **cli.py** — 서브커맨드: -- `cross-eval init [--dir .] [--preset simple|cross-review]` — 스캐폴딩 (기존 파일 안 덮어씀) +- `cross-eval init [--dir .] [--preset simple|cross-review|plan-review]` — 스캐폴딩 (기존 파일 안 덮어씀) - `cross-eval run [-c config] [--max-iter N] [--dry-run] [--output-dir path] [--input key=path ...]` - `--input key=path`: config의 inputs 오버라이드/추가 - `--dry-run`: 에이전트 호출 없이 렌더링된 프롬프트만 출력 @@ -167,3 +171,17 @@ final-report.md 생성 3. `cross-eval run --dry-run` 로 프롬프트 렌더링 확인 (에이전트 호출 없이) 4. plan.md/checklist.md에 간단한 내용 넣고 `cross-eval run --max-iter 2` 로 실제 실행 5. `output/` 디렉토리에 v1/, final-report.md 생성 확인 + + + cross-eval run \ + --docs /Users/chungyeong/Desktop/Dev/new-alpha-foundry/plans/TO_CLICKHOUSE \ + --preset coding-review-fix \ + --coder claude \ + --reviewer codex \ + --reviewer codex \ + --reviewer codex \ + --senior codex \ + --coder-effort high \ + --reviewer-effort high \ + --senior-effort xhigh \ + --max-iter 10 diff --git a/README.md b/README.md index e286554..726b0ac 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ AI 에이전트 간 교차 검증을 자동화하는 CLI 도구. 
-기획서와 체크리스트를 기반으로 "생성 → 리뷰 → 피드백 → 재생성" 루프를 자동으로 돌려서, +기획서와 체크리스트를 기반으로 "코딩 → 리뷰 → 피드백 → 재코딩" 루프를 자동으로 돌려서, **과최적화 / 오탐 / 누락** 문제를 잡아냅니다. ## 설치 @@ -51,7 +51,7 @@ cp .cross-eval/checklist-sample.md .cross-eval/checklist.md ### 3. 실행 ```bash -# 기본 실행 (생성 → 리뷰, 최대 3회 반복) +# 기본 실행 (코딩 → 리뷰, 최대 3회 반복) cross-eval run # 프롬프트만 확인 (에이전트 호출 없이, 비용 절약) @@ -72,10 +72,10 @@ cross-eval run --config .cross-eval/config.yaml ``` output/ ├── v1/ -│ ├── generate.md # 에이전트 생성 결과 +│ ├── coding.md # 에이전트 코딩 결과 │ └── review.md # 에이전트 리뷰 결과 ├── v2/ -│ ├── generate.md +│ ├── coding.md │ └── review.md └── final-report.md # 전체 요약 리포트 ``` @@ -92,7 +92,7 @@ inputs: checklist: checklist.md agents: - generator: + coder: command: claude args: ["-p", "--model", "sonnet", "--permission-mode", "auto"] system_prompt: "You are a senior software engineer." @@ -110,11 +110,16 @@ pipeline: preset:simple | 프리셋 | 설명 | |--------|------| -| `simple` | Agent A가 생성, Agent B가 리뷰 (기본값) | -| `cross-review` | 둘 다 생성, 서로 교차 리뷰 | +| `simple` | Agent A가 코딩, Agent B가 리뷰 (기본값) | +| `cross-review` | 둘 다 코딩, 서로 교차 리뷰 | +| `plan-review` | 구현 전 기획서/체크리스트/참고문서를 검토하고 필요시 현재 코드베이스와의 정합성도 확인 | +| `review-only` | 기존 코드만 감사 용도로 검토 | +| `review-fix` | 리뷰 결과를 취합한 뒤 자동 수정과 재검증까지 반복 | +| `coding-review-fix` | 초기 코딩 1회 후 리뷰 결과를 취합해 자동 수정과 재검증을 반복 | ```bash # 초기화 옵션 cross-eval init --preset cross-review # 교차 리뷰 프리셋 +cross-eval init --preset plan-review # 구현 전 문서 검토 프리셋 cross-eval init --lang en # 영어 템플릿 ``` diff --git a/cross_eval.egg-info/SOURCES.txt b/cross_eval.egg-info/SOURCES.txt index 77a3801..8272bb0 100644 --- a/cross_eval.egg-info/SOURCES.txt +++ b/cross_eval.egg-info/SOURCES.txt @@ -4,6 +4,8 @@ cross_eval/__init__.py cross_eval/agent.py cross_eval/cli.py cross_eval/config.py +cross_eval/demo.py +cross_eval/doctor.py cross_eval/models.py cross_eval/pipeline.py cross_eval/prompts.py @@ -14,4 +16,6 @@ cross_eval.egg-info/dependency_links.txt cross_eval.egg-info/entry_points.txt cross_eval.egg-info/requires.txt 
cross_eval.egg-info/top_level.txt -tests/test_config.py \ No newline at end of file +tests/test_config.py +tests/test_onboarding.py +tests/test_pipeline_integration.py \ No newline at end of file diff --git a/cross_eval/agent.py b/cross_eval/agent.py index 9ace21f..8fb6ef4 100644 --- a/cross_eval/agent.py +++ b/cross_eval/agent.py @@ -19,6 +19,34 @@ _SYSTEM_PROMPT_AGENTS = ("claude",) _REASONING_EFFORT_AGENTS = ("codex",) +class AgentInvocationError(RuntimeError): + """Structured error for agent CLI failures.""" + + def __init__( + self, + *, + agent_name: str, + step_name: str, + cmd_preview: str, + raw_error: str, + failure_type: str, + suggested_action: str, + ) -> None: + self.agent_name = agent_name + self.step_name = step_name + self.cmd_preview = cmd_preview + self.raw_error = raw_error + self.failure_type = failure_type + self.suggested_action = suggested_action + super().__init__( + f"Agent '{agent_name}' failed (exit code != 0) at step '{step_name}':\n" + f" type: {failure_type}\n" + f" cmd: {cmd_preview}\n" + f" error: {raw_error or '(no output)'}\n" + f" action: {suggested_action}" + ) + + def _supports_system_prompt_flag(command: str) -> bool: """Check if the agent CLI supports --system-prompt flag.""" return any(name in command for name in _SYSTEM_PROMPT_AGENTS) @@ -29,6 +57,53 @@ def _supports_reasoning_effort(command: str) -> bool: return any(name in command for name in _REASONING_EFFORT_AGENTS) +def _classify_agent_failure(detail: str) -> tuple[str, str]: + """Classify a failed agent invocation into a user-actionable bucket.""" + normalized = detail.lower() + + auth_markers = ( + "not logged in", + "please run /login", + "auth", + "authentication", + "invalid api key", + "api key", + "unauthorized", + "forbidden", + ) + usage_limit_markers = ( + "quota", + "rate limit", + "credits", + "credit balance", + "budget", + "insufficient funds", + "usage limit", + "token limit", + "billing", + ) + + if any(marker in normalized for marker in auth_markers): 
+ return ( + "AUTH", + "Agent CLI authentication is missing or expired. Re-authenticate the CLI, then rerun.", + ) + if any(marker in normalized for marker in usage_limit_markers): + return ( + "USAGE_LIMIT", + "Agent CLI hit a quota, billing, or token budget limit. Refill or raise the limit, then rerun.", + ) + if "api error" in normalized: + return ( + "API_ERROR", + "Agent CLI returned an API error. Inspect the saved error file for the raw response.", + ) + return ( + "UNKNOWN", + "Agent CLI failed for an unknown reason. Inspect the saved error file for details.", + ) + + class _Spinner: """Animated spinner for long-running agent calls.""" @@ -137,11 +212,14 @@ def invoke_agent( if err_detail and len(err_detail) > 500: err_detail = err_detail[:500] + "..." cmd_preview = " ".join(cmd[:6]) - raise RuntimeError( - f"Agent '{agent.name}' failed (exit code {result.returncode}) " - f"at step '{step_name}':\n" - f" cmd: {cmd_preview}\n" - f" error: {err_detail or '(no output)'}" + failure_type, suggested_action = _classify_agent_failure(err_detail or "") + raise AgentInvocationError( + agent_name=agent.name, + step_name=step_name, + cmd_preview=cmd_preview, + raw_error=err_detail or "(no output)", + failure_type=failure_type, + suggested_action=suggested_action, ) if spinner: diff --git a/cross_eval/cli.py b/cross_eval/cli.py index 68dc75e..45d424a 100644 --- a/cross_eval/cli.py +++ b/cross_eval/cli.py @@ -7,7 +7,7 @@ import sys from pathlib import Path from cross_eval import __version__ -from cross_eval.config import REASONING_EFFORT_CHOICES +from cross_eval.config import REASONING_EFFORT_CHOICES, resolve_agent_shorthand logger = logging.getLogger(__name__) @@ -38,7 +38,7 @@ coders: [claude-coder] reviewers: [claude-reviewer] # seniors: [codex-senior] -# 파이프라인 종류: simple | cross-review | review-only | review-fix +# 파이프라인 종류: simple | cross-review | plan-review | review-only | review-fix | coding-review-fix pipeline: preset:{preset} # 반복 설정 @@ -145,7 +145,7 @@ def 
main(argv: list[str] | None = None) -> int: "AI 코딩 에이전트의 결과물을 자동으로 검증하는 CLI 도구.\n" "\n" "동작 방식:\n" - " 1. 기획서(plan)를 바탕으로 Coder 에이전트가 코드를 생성\n" + " 1. 기획서(plan)를 바탕으로 Coder 에이전트가 코드를 작성\n" " 2. Reviewer 에이전트가 기획서 대비 코드를 검토하고 PASS/FAIL 판정\n" " 3. FAIL이면 피드백을 반영해서 1~2를 반복 (최대 N회)\n" "\n" @@ -195,11 +195,19 @@ def main(argv: list[str] | None = None) -> int: init_parser.add_argument( "--preset", default="simple", - choices=["simple", "cross-review", "review-only", "review-fix"], + choices=[ + "simple", + "cross-review", + "plan-review", + "review-only", + "review-fix", + "coding-review-fix", + ], help=( "파이프라인 종류 (기본: simple). " - "simple=코딩+리뷰, cross-review=교차리뷰, " - "review-only=리뷰만, review-fix=리뷰수렴+자동수정" + "simple=코딩+리뷰, cross-review=교차리뷰, plan-review=문서기획검토, " + "review-only=리뷰만, review-fix=리뷰수렴+자동수정, " + "coding-review-fix=초기코딩후리뷰수렴" ), ) init_parser.add_argument( @@ -208,13 +216,65 @@ def main(argv: list[str] | None = None) -> int: choices=["en", "ko"], help="프롬프트 언어 (기본: ko)", ) + init_parser.add_argument( + "--guided", + action="store_true", + help="대화형 설정 마법사 실행", + ) + + # --- doctor --- + doctor_parser = subparsers.add_parser( + "doctor", + help="실행 환경 점검 (CLI 설치, 인증, 설정 파일 검증)", + description="cross-eval 실행에 필요한 환경을 점검합니다.", + ) + doctor_parser.add_argument( + "--dir", + type=Path, + default=Path("."), + help="점검할 디렉토리 (기본: 현재 디렉토리)", + ) + + # --- demo --- + demo_parser = subparsers.add_parser( + "demo", + help="내장 데모 실행 (파이프라인 동작 체험)", + description=( + "내장된 간단한 기획서로 cross-eval 파이프라인의 전체 동작을 체험합니다.\n" + "기본값은 mock 모드(시뮬레이션)이며, --live로 실제 에이전트를 호출할 수 있습니다." 
+ ), + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + demo_parser.add_argument( + "--live", + action="store_true", + help="실제 에이전트를 호출하여 데모 실행 (API 비용 발생)", + ) + demo_parser.add_argument( + "--preset", + default="simple", + choices=["simple", "review-fix", "coding-review-fix"], + help="데모할 파이프라인 종류 (기본: simple)", + ) + demo_parser.add_argument( + "--escalate", + action="store_true", + help="ESCALATE 시나리오 데모 (mock 모드 전용)", + ) + demo_parser.add_argument( + "--timeout", + type=int, + default=None, + metavar="SEC", + help="에이전트 호출 제한 시간 (--live 전용)", + ) # --- run --- run_parser = subparsers.add_parser( "run", help="검증 파이프라인 실행", description=( - "기획서(plan)를 기반으로 AI 에이전트가 코드 생성과 리뷰를 반복합니다.\n" + "기획서(plan)를 기반으로 AI 에이전트가 코딩과 리뷰를 반복합니다.\n" "\n" "설정 파일 없이 바로 실행할 수 있고, config.yaml로도 실행할 수 있습니다.\n" "CLI 옵션이 config.yaml보다 우선합니다." @@ -222,13 +282,19 @@ def main(argv: list[str] | None = None) -> int: epilog=( "파이프라인 종류 (--preset):\n" " ┌──────────────┬─────────────────────────────────────────────────────┐\n" - " │ simple │ Coder가 코드 생성 → Reviewer가 리뷰 │\n" - " │ (기본값) │ FAIL이면 피드백 반영해서 재생성, PASS까지 반복 │\n" + " │ simple │ Coder가 코드 작성 → Reviewer가 리뷰 │\n" + " │ (기본값) │ FAIL이면 피드백 반영해서 재코딩, PASS까지 반복 │\n" " ├──────────────┼─────────────────────────────────────────────────────┤\n" " │ review-fix │ 2단계 파이프라인: │\n" " │ │ Reviewer N명 병렬 리뷰 → 취합 → 수정 → 재검증 │\n" " ├──────────────┼─────────────────────────────────────────────────────┤\n" - " │ review-only │ 코드 생성 없이 Reviewer N명이 기존 코드만 검토 │\n" + " │ coding- │ 3단계 파이프라인: │\n" + " │ review-fix │ 초기 코딩 1회 → 리뷰 취합 → 수정 → 재검증 반복 │\n" + " ├──────────────┼─────────────────────────────────────────────────────┤\n" + " │ plan-review │ 구현 전 기획서/체크리스트/문서를 검토 │\n" + " │ │ 필요하면 현재 코드베이스와의 정합성도 점검 │\n" + " ├──────────────┼─────────────────────────────────────────────────────┤\n" + " │ review-only │ 코드 작성 없이 Reviewer N명이 기존 코드만 검토 │\n" " │ │ (이미 작성된 코드의 품질 감사용) │\n" " ├──────────────┼─────────────────────────────────────────────────────┤\n" 
" │ cross-review │ Coder 2명이 각각 구현 → 상대방 코드를 교차 리뷰 │\n" @@ -239,10 +305,10 @@ def main(argv: list[str] | None = None) -> int: " ┌──────────────────┬─────────┬───────────┬──────────────────────────┐\n" " │ 이름 │ CLI │ 기본 모델 │ 역할 │\n" " ├──────────────────┼─────────┼───────────┼──────────────────────────┤\n" - " │ claude-coder │ claude │ opus │ 코드 생성 │\n" + " │ claude-coder │ claude │ opus │ 코드 작성 │\n" " │ claude-reviewer │ claude │ opus │ 코드 리뷰 │\n" " │ claude-senior │ claude │ opus │ 리뷰 취합/판정 │\n" - " │ codex-coder │ codex │ gpt-5.4 │ 코드 생성 │\n" + " │ codex-coder │ codex │ gpt-5.4 │ 코드 작성 │\n" " │ codex-reviewer │ codex │ gpt-5.4 │ 코드 리뷰 │\n" " │ codex-senior │ codex │ gpt-5.4 │ 리뷰 취합/판정 │\n" " └──────────────────┴─────────┴───────────┴──────────────────────────┘\n" @@ -267,10 +333,18 @@ def main(argv: list[str] | None = None) -> int: " cross-eval run --plan plan.md --preset review-fix \\\n" " --reviewer claude --reviewer codex\n" "\n" + " 초기 코딩 후 리뷰 수렴 + 자동 수정 (coding-review-fix):\n" + " cross-eval run --plan plan.md --preset coding-review-fix \\\n" + " --reviewer claude --reviewer codex\n" + "\n" " 기존 코드 리뷰만 (review-only):\n" " cross-eval run --plan plan.md --preset review-only \\\n" " --reviewer claude --reviewer codex\n" "\n" + " 구현 전 문서/기획 검토 (plan-review):\n" + " cross-eval run --plan plan.md --preset plan-review \\\n" + " --reviewer claude --reviewer codex\n" + "\n" " 모델 변경:\n" " cross-eval run --plan plan.md --model sonnet\n" "\n" @@ -341,7 +415,7 @@ def main(argv: list[str] | None = None) -> int: help="모든 에이전트의 모델을 한번에 변경 (예: sonnet, opus)", ) agent_group.add_argument( - "--generator-model", default=None, metavar="MODEL", + "--coder-model", default=None, metavar="MODEL", help="Coder 에이전트 모델만 변경", ) agent_group.add_argument( @@ -353,7 +427,14 @@ def main(argv: list[str] | None = None) -> int: pipe_group = run_parser.add_argument_group("파이프라인") pipe_group.add_argument( "--preset", default=None, - choices=["simple", "cross-review", "review-only", 
"review-fix"], + choices=[ + "simple", + "cross-review", + "plan-review", + "review-only", + "review-fix", + "coding-review-fix", + ], help="파이프라인 종류 (기본: simple). 각 종류 설명은 아래 참조", ) pipe_group.add_argument( @@ -400,6 +481,10 @@ def main(argv: list[str] | None = None) -> int: if args.command == "init": return cmd_init(args) + elif args.command == "doctor": + return cmd_doctor(args) + elif args.command == "demo": + return cmd_demo(args) elif args.command == "run": return cmd_run(args) else: @@ -407,9 +492,186 @@ def main(argv: list[str] | None = None) -> int: return 0 +def cmd_doctor(args: argparse.Namespace) -> int: + """Run environment health checks.""" + from cross_eval.doctor import format_doctor_results, run_doctor + + checks = run_doctor(args.dir.resolve()) + print(format_doctor_results(checks)) + + has_critical = any(not c.passed and c.critical for c in checks) + return 1 if has_critical else 0 + + +def cmd_demo(args: argparse.Namespace) -> int: + """Run a built-in demo to show the pipeline lifecycle.""" + from cross_eval.demo import run_live_demo, run_mock_demo + + if args.live: + print("\n⚠ --live 모드: 실제 AI 에이전트를 호출합니다 (API 비용 발생).") + print(" 내장 피보나치 함수 기획서를 사용합니다.\n") + try: + answer = input("계속하시겠습니까? 
[y/N] ").strip().lower() + except (EOFError, KeyboardInterrupt): + print("\n취소됨.") + return 0 + if answer not in ("y", "yes"): + print("취소됨.") + return 0 + + try: + raw_timeout = args.timeout if args.timeout is not None else 0 + agent_timeout = None if raw_timeout == 0 else raw_timeout + result = run_live_demo(preset=args.preset, timeout=agent_timeout) + print(f"\nResult: {result.final_verdict}") + print(f"Iterations: {len(result.iterations)}") + if result.run_dir: + print(f"Output: {result.run_dir}/") + return 0 + except (RuntimeError, KeyboardInterrupt) as e: + if isinstance(e, KeyboardInterrupt): + print("\nInterrupted.") + return 130 + print(f"Demo error: {e}", file=sys.stderr) + return 1 + else: + run_mock_demo(preset=args.preset, show_escalate=args.escalate) + return 0 + + +# --------------------------------------------------------------------------- +# Guided init wizard +# --------------------------------------------------------------------------- + +_PRESET_DESCRIPTIONS = { + "simple": "코딩 + 리뷰 (가장 기본)", + "review-fix": "리뷰 → 취합 → 수정 → 재검증 반복", + "coding-review-fix": "초기 코딩 + 리뷰 수렴 반복", + "plan-review": "구현 전 기획서/문서 검토", + "review-only": "기존 코드만 리뷰 (코딩 없음)", + "cross-review": "2명이 각각 구현 후 교차 리뷰", +} + +_PRESET_ORDER = [ + "simple", "review-fix", "coding-review-fix", + "plan-review", "review-only", "cross-review", +] + + +def _prompt_choice( + message: str, + choices: list[str], + descriptions: dict[str, str] | None = None, + default: int = 1, +) -> str: + """Prompt user to pick from a numbered list.""" + print(f"\n{message}") + for i, choice in enumerate(choices, 1): + desc = f" — {descriptions[choice]}" if descriptions and choice in descriptions else "" + marker = " (기본)" if i == default else "" + print(f" {i}. 
{choice}{desc}{marker}") + + while True: + try: + raw = input(f"선택 [{default}]: ").strip() + except (EOFError, KeyboardInterrupt): + print() + return choices[default - 1] + if not raw: + return choices[default - 1] + try: + idx = int(raw) + if 1 <= idx <= len(choices): + return choices[idx - 1] + except ValueError: + if raw in choices: + return raw + print(f" 1-{len(choices)} 사이 숫자를 입력하세요.") + + +def _prompt_text(message: str, default: str = "") -> str: + """Prompt for text input with default.""" + suffix = f" [{default}]" if default else "" + try: + raw = input(f"{message}{suffix}: ").strip() + except (EOFError, KeyboardInterrupt): + print() + return default + return raw or default + + +def _run_guided_init(target: Path) -> dict: + """Interactive setup wizard. Returns settings dict.""" + print("\n━━━ cross-eval 설정 마법사 ━━━\n") + + lang = _prompt_choice( + "언어 / Language:", + ["ko", "en"], + {"ko": "한국어", "en": "English"}, + default=1, + ) + + preset = _prompt_choice( + "파이프라인 종류:", + _PRESET_ORDER, + _PRESET_DESCRIPTIONS, + default=1, + ) + + print("\n--- 에이전트 설정 ---") + print(" 사용 가능: claude, codex (또는 claude-coder, codex-reviewer 등)") + + coder = _prompt_text(" Coder 에이전트", default="claude") + reviewer = _prompt_text(" Reviewer 에이전트", default="claude") + + needs_senior = preset in ("review-fix", "coding-review-fix") + senior = "" + if needs_senior: + senior = _prompt_text(" Senior 에이전트", default=reviewer) + else: + senior = _prompt_text(" Senior 에이전트 (선택, Enter로 건너뛰기)", default="") + + max_iter = _prompt_text("최대 반복 횟수", default="3") + try: + max_iter_int = int(max_iter) + except ValueError: + max_iter_int = 3 + + create_templates = _prompt_text( + "\n템플릿 파일(plan.md, checklist.md) 생성?", default="Y", + ).lower() in ("y", "yes", "") + + return { + "lang": lang, + "preset": preset, + "coder": coder, + "reviewer": reviewer, + "senior": senior, + "max_iter": max_iter_int, + "create_templates": create_templates, + } + + def cmd_init(args: argparse.Namespace) -> int: 
"""Scaffold a new cross-eval project.""" target = args.dir.resolve() + + if args.guided: + settings = _run_guided_init(target) + args.lang = settings["lang"] + args.preset = settings["preset"] + # We'll use guided settings for enhanced config generation + return _write_init_files(target, args, guided_settings=settings) + + return _write_init_files(target, args) + + +def _write_init_files( + target: Path, + args: argparse.Namespace, + guided_settings: dict | None = None, +) -> int: + """Write config and template files to target directory.""" ce_dir = target / ".cross-eval" ce_dir.mkdir(parents=True, exist_ok=True) @@ -417,14 +679,23 @@ def cmd_init(args: argparse.Namespace) -> int: plan_sample = PLAN_SAMPLE_KO if lang == "ko" else PLAN_SAMPLE_EN checklist_sample = CHECKLIST_SAMPLE_KO if lang == "ko" else CHECKLIST_SAMPLE_EN - files = { - ".cross-eval/config.yaml": DEFAULT_CONFIG_YAML.format( + # Generate config content + if guided_settings: + config_content = _generate_guided_config(args.preset, lang, guided_settings) + else: + config_content = DEFAULT_CONFIG_YAML.format( preset=args.preset, language=lang, - ), - ".cross-eval/plan.md": plan_sample, - ".cross-eval/checklist.md": checklist_sample, + ) + + files: dict[str, str] = { + ".cross-eval/config.yaml": config_content, } + # Add templates unless guided mode opted out + if not guided_settings or guided_settings.get("create_templates", True): + files[".cross-eval/plan.md"] = plan_sample + files[".cross-eval/checklist.md"] = checklist_sample + created = [] skipped = [] for name, content in files.items(): @@ -436,23 +707,67 @@ def cmd_init(args: argparse.Namespace) -> int: created.append(name) if created: - print(f" 생성: {', '.join(created)}") + print(f"\n 생성: {', '.join(created)}") if skipped: print(f" 이미 존재 (건너뜀): {', '.join(skipped)}") print(f"\n 파이프라인: {args.preset}") print(f" 언어: {lang}") + if guided_settings: + print(f" Coder: {guided_settings['coder']}") + print(f" Reviewer: {guided_settings['reviewer']}") + 
if guided_settings.get("senior"): + print(f" Senior: {guided_settings['senior']}") + print(f" 최대 반복: {guided_settings['max_iter']}") print("") print("다음 단계:") print(" 1. .cross-eval/plan.md 에 기획서 작성") print(" 2. .cross-eval/checklist.md 에 체크리스트 작성 (선택)") print(" 3. cross-eval run 으로 실행") print("") - print("주의: 에이전트는 기본적으로 파일 읽기/쓰기/실행 권한을 가집니다.") - print(" 실행 전에 .cross-eval/config.yaml 을 확인하세요.") + print("팁: cross-eval doctor 로 환경 점검을 먼저 하세요.") + print(" cross-eval demo 로 동작 방식을 미리 볼 수 있습니다.") return 0 +def _generate_guided_config( + preset: str, + lang: str, + settings: dict, +) -> str: + """Generate config.yaml content from guided init settings.""" + coder_name = resolve_agent_shorthand(settings["coder"], "coder") + reviewer_name = resolve_agent_shorthand(settings["reviewer"], "reviewer") + + lines = [ + "# cross-eval 설정 (guided init으로 생성됨)", + "", + "inputs:", + " plan: plan.md", + " checklist: checklist.md", + "", + f"coders: [{coder_name}]", + f"reviewers: [{reviewer_name}]", + ] + + senior = settings.get("senior", "") + if senior: + senior_name = resolve_agent_shorthand(senior, "senior") + lines.append(f"seniors: [{senior_name}]") + + lines.extend([ + "", + f"pipeline: preset:{preset}", + "", + f"max_iterations: {settings['max_iter']}", + f"language: {lang}", + "output_dir: output", + "", + ]) + + return "\n".join(lines) + "\n" + + def _read_docs_dir(docs_dir: Path) -> str: """Read all files in a directory and concatenate with filename headers.""" parts: list[str] = [] @@ -482,6 +797,16 @@ def _apply_model_override(config, agent_name: str, model: str) -> None: agent.args = new_args +def _apply_phased_iteration_override(config, max_iter: int | None) -> None: + """Apply CLI max-iter to converging phases while preserving setup phases.""" + if max_iter is None: + return + + for phase in config.phases: + if any(step.verdict for step in phase.steps): + phase.max_iterations = max_iter + + def cmd_run(args: argparse.Namespace) -> int: """Load config, validate, and 
execute the pipeline.""" from cross_eval.config import ( @@ -562,7 +887,7 @@ def cmd_run(args: argparse.Namespace) -> int: preset = args.preset or "simple" # Determine which preset was configured (from YAML or defaults) if args.preset is None and config.phases: - preset = "review-fix" # only phased preset currently + preset = config.preset_name if config.preset_name != "custom" else "review-fix" elif args.preset is None and not args.coders and not args.reviewers and not args.seniors: pass # no changes needed inferred_coders, inferred_reviewers, inferred_seniors = _infer_roles( @@ -584,11 +909,12 @@ def cmd_run(args: argparse.Namespace) -> int: config.preset_name = preset if preset in PHASED_PRESETS: config.phases = PHASED_PRESETS[preset](coders, reviewers, seniors) + _apply_phased_iteration_override(config, args.max_iter) config.pipeline = [] elif preset in PIPELINE_PRESETS: config.pipeline = PIPELINE_PRESETS[preset](coders, reviewers, seniors) config.phases = [] - if preset == "review-only" and args.max_iter is None and args.min_iter is None: + if preset in {"plan-review", "review-only"} and args.max_iter is None and args.min_iter is None: config.max_iterations = 1 apply_reasoning_effort_settings( @@ -603,10 +929,10 @@ def cmd_run(args: argparse.Namespace) -> int: if args.model is not None: for agent_name in config.agents: _apply_model_override(config, agent_name, args.model) - # --generator-model / --reviewer-model: apply by role - if args.generator_model is not None: + # --coder-model / --reviewer-model: apply by role + if args.coder_model is not None: for coder_name in config.coders: - _apply_model_override(config, coder_name, args.generator_model) + _apply_model_override(config, coder_name, args.coder_model) if args.reviewer_model is not None: for reviewer_name in config.reviewers: _apply_model_override(config, reviewer_name, args.reviewer_model) @@ -694,6 +1020,11 @@ def cmd_run(args: argparse.Namespace) -> int: if not args.dry_run and result.run_dir: 
print(f"Output: {result.run_dir}/") + if result.final_verdict == "ESCALATE": + from cross_eval.report import print_escalation_report + print_escalation_report(config, result) + return 2 + return 0 if result.final_verdict == "PASS" else 1 diff --git a/cross_eval/config.py b/cross_eval/config.py index ad5c620..c9751f8 100644 --- a/cross_eval/config.py +++ b/cross_eval/config.py @@ -39,6 +39,26 @@ _CODEX_ARGS = [ "-", ] +_CLAUDE_BASE_ARGS = [ + "-p", + "--setting-sources", + "user", + "--disable-slash-commands", + "--model", + "opus", +] + +_CLAUDE_CODER_ARGS = list(_CLAUDE_BASE_ARGS) + [ + "--dangerously-skip-permissions", + "--permission-mode", + "bypassPermissions", +] + +_CLAUDE_REVIEW_ARGS = list(_CLAUDE_BASE_ARGS) + [ + "--permission-mode", + "plan", +] + _CODER_SYSTEM_PROMPT = ( "You are a senior software engineer implementing code changes.\n" "Rules:\n" @@ -81,29 +101,37 @@ _SENIOR_SYSTEM_PROMPT = ( "4. Be skeptical of false positives, but do not lower the bar on real requirement " "gaps.\n" "5. When issues remain, produce a concise prioritized action list the coder can act on.\n" - "6. Do NOT invent new requirements beyond the plan and checklist.\n" - "7. End with VERDICT: PASS or VERDICT: FAIL." + "6. Maintain an Issue Tracker table across iterations to track issue status.\n" + "7. Do NOT invent new requirements beyond the plan and checklist.\n" + "8. End with one of three verdicts:\n" + " - VERDICT: PASS — all requirements met, no issues remain.\n" + " - VERDICT: FAIL — issues found that the coder can fix.\n" + " - VERDICT: ESCALATE — issues that require human intervention. 
Use ESCALATE when:\n" + " * Requirements are ambiguous and need clarification from stakeholders\n" + " * Architecture decisions are needed that go beyond the plan scope\n" + " * External dependency issues block progress\n" + " * The coder has failed to resolve the same issue 2+ times" ) BUILTIN_AGENTS: dict[str, AgentConfig] = { "claude-coder": AgentConfig( name="claude-coder", command="claude", - args=["-p", "--model", "opus", "--permission-mode", "auto"], + args=list(_CLAUDE_CODER_ARGS), system_prompt=_CODER_SYSTEM_PROMPT, reasoning_effort=DEFAULT_ROLE_REASONING_EFFORTS["coder"], ), "claude-reviewer": AgentConfig( name="claude-reviewer", command="claude", - args=["-p", "--model", "opus", "--permission-mode", "auto"], + args=list(_CLAUDE_REVIEW_ARGS), system_prompt=_REVIEWER_SYSTEM_PROMPT, reasoning_effort=DEFAULT_ROLE_REASONING_EFFORTS["reviewer"], ), "claude-senior": AgentConfig( name="claude-senior", command="claude", - args=["-p", "--model", "opus", "--permission-mode", "auto"], + args=list(_CLAUDE_REVIEW_ARGS), system_prompt=_SENIOR_SYSTEM_PROMPT, reasoning_effort=DEFAULT_ROLE_REASONING_EFFORTS["senior"], ), @@ -136,6 +164,11 @@ _AGENT_ALIASES: dict[str, str] = { "codex": "codex", } +_ROLE_ALIASES: dict[str, str] = { + "coding": "coding", + "review": "review", +} + def resolve_agent_shorthand(name: str, role: str) -> str: """Resolve shorthand agent name to full builtin name. 
@@ -150,6 +183,16 @@ def resolve_agent_shorthand(name: str, role: str) -> str: return name +def normalize_step_role(role: str) -> str: + """Normalize step role aliases to the canonical role name.""" + return _ROLE_ALIASES.get(role, role) + + +def normalize_prompt_template(template_ref: str) -> str: + """Normalize prompt template aliases to canonical template refs.""" + return template_ref + + # --------------------------------------------------------------------------- # Role inference (backward compatibility) # --------------------------------------------------------------------------- @@ -233,7 +276,7 @@ def _default_seniors_for_preset( """Infer a default senior agent for presets that benefit from adjudication.""" if not ( isinstance(pipeline_raw, str) - and pipeline_raw == "preset:review-fix" + and pipeline_raw in {"preset:review-fix", "preset:coding-review-fix"} and reviewers ): return [] @@ -465,7 +508,7 @@ def _resolve_pipeline( """Resolve pipeline from preset string or explicit step list. Returns (steps, phases) tuple. Only one will be non-empty. - - Simple/cross-review/review-only → steps populated, phases empty. + - Simple/cross-review/plan-review/review-only → steps populated, phases empty. - Phased presets (review-fix) → steps empty, phases populated. 
""" # Preset: "preset:simple" or "preset:review-fix" @@ -485,11 +528,15 @@ def _resolve_pipeline( if isinstance(pipeline_raw, list): steps = [] for step_data in pipeline_raw: + raw_role = step_data.get("role", "coding") + normalized_role = normalize_step_role(raw_role) steps.append(StepConfig( name=step_data["name"], agent=step_data["agent"], - role=step_data.get("role", "generate"), - prompt_template=step_data.get("prompt_template", f"default:{step_data.get('role', 'generate')}"), + role=normalized_role, + prompt_template=normalize_prompt_template( + step_data.get("prompt_template", f"default:{normalized_role}") + ), output_key=step_data["output_key"], verdict=step_data.get("verdict", False), verdict_pattern=step_data.get("verdict_pattern", r"VERDICT:\s*PASS"), @@ -524,10 +571,6 @@ def validate_config(config: PipelineConfig) -> list[str]: errors, scope=f"Phase '{phase.name}'", ) - if not any(s.verdict for s in phase.steps): - errors.append( - f"Phase '{phase.name}' must have at least one step with verdict: true" - ) # Validate verdict patterns for step in phase.steps: if step.verdict: diff --git a/cross_eval/demo.py b/cross_eval/demo.py new file mode 100644 index 0000000..f02ce9f --- /dev/null +++ b/cross_eval/demo.py @@ -0,0 +1,282 @@ +"""Built-in demo for cross-eval — lets new users see the full lifecycle.""" +from __future__ import annotations + +import sys +import time +from pathlib import Path + +from cross_eval.models import PipelineConfig, PipelineResult + + +# --------------------------------------------------------------------------- +# Built-in demo plan & checklist +# --------------------------------------------------------------------------- + +DEMO_PLAN = """\ +# Demo: Fibonacci Function + +## Objective +Implement a `fibonacci(n)` function in Python. + +## Requirements +1. `fibonacci(0)` returns `0`, `fibonacci(1)` returns `1`. +2. For `n >= 2`, return the sum of the two preceding values. +3. Raise `ValueError` for negative `n`. +4. 
Use an iterative approach (not recursive). + +## Constraints +- Single file: `fib.py` +- No external dependencies. +""" + +DEMO_CHECKLIST = """\ +# Demo Checklist +- [ ] fibonacci(0) → 0 +- [ ] fibonacci(1) → 1 +- [ ] fibonacci(10) → 55 +- [ ] fibonacci(-1) raises ValueError +- [ ] Iterative implementation (no recursion) +- [ ] No unnecessary abstractions +""" + +# --------------------------------------------------------------------------- +# Mock outputs (realistic-looking) +# --------------------------------------------------------------------------- + +_MOCK_CODING_V1 = """\ +I'll implement the fibonacci function in `fib.py`. + +```python +# fib.py + +def fibonacci(n: int) -> int: + \"\"\"Return the nth Fibonacci number using iteration.\"\"\" + if n < 0: + return -1 # invalid input + if n <= 1: + return n + a, b = 0, 1 + for _ in range(2, n + 1): + a, b = b, a + b + return b +``` + +Created `fib.py` with the iterative fibonacci function. +""" + +_MOCK_REVIEW_V1 = """\ +### Previous Feedback Assessment +N/A — first iteration. + +### Issues Found +- ISS-001 [Major][Omission] Negative input should raise `ValueError`, \ +but implementation returns `-1` instead. (Requirement 3: "Raise ValueError for negative n") + +### Out of Scope Issues +None + +### Summary +- Critical: 0, Major: 1, Minor: 0 +- Over-engineering count: 0 +- Omission count: 1 +- CONFIRMED: 0, DISMISSED: 0 +- Overall quality: Good structure, one requirement gap. + +### Verdict +VERDICT: FAIL +""" + +_MOCK_CODING_V2 = """\ +Fixing the negative input handling per review feedback (ISS-001). + +```python +# fib.py + +def fibonacci(n: int) -> int: + \"\"\"Return the nth Fibonacci number using iteration.\"\"\" + if n < 0: + raise ValueError(f"n must be non-negative, got {n}") + if n <= 1: + return n + a, b = 0, 1 + for _ in range(2, n + 1): + a, b = b, a + b + return b +``` + +Updated `fib.py`: negative input now raises `ValueError`. 
+""" + +_MOCK_REVIEW_V2 = """\ +### Previous Feedback Assessment +- DISMISSED (false positive): None +- CONFIRMED: None — ISS-001 has been fixed. + +### Issues Found +None — all checklist items satisfied. + +### Out of Scope Issues +None + +### Summary +- Critical: 0, Major: 0, Minor: 0 +- Over-engineering count: 0 +- Omission count: 0 +- CONFIRMED: 0, DISMISSED: 0 +- Overall quality: All requirements met, clean implementation. + +### Verdict +VERDICT: PASS +""" + +_MOCK_STEPS = [ + # (iteration, step_name, agent, duration, output_chars, verdict, output) + (1, "coding", "claude-coder", 2.1, 347, None, _MOCK_CODING_V1), + (1, "review", "claude-reviewer", 1.8, 423, "FAIL", _MOCK_REVIEW_V1), + (2, "coding", "claude-coder", 2.3, 382, None, _MOCK_CODING_V2), + (2, "review", "claude-reviewer", 1.5, 312, "PASS", _MOCK_REVIEW_V2), +] + +_MOCK_ESCALATE_REVIEW = """\ +### Issues Found +- ISS-001 [Critical][Omission] Requirements are ambiguous: "iterative approach" is unclear — \ +does this exclude memoization? The plan needs clarification from stakeholders. 
+ +### Verdict +VERDICT: ESCALATE +""" + +_MOCK_ESCALATE_STEPS = [ + (1, "coding", "claude-coder", 2.1, 347, None, _MOCK_CODING_V1), + (1, "review", "claude-reviewer", 1.8, 520, "ESCALATE", _MOCK_ESCALATE_REVIEW), +] + + +# --------------------------------------------------------------------------- +# Mock demo runner +# --------------------------------------------------------------------------- + +DIM = "\033[2m" +BOLD = "\033[1m" +GREEN = "\033[32m" +RED = "\033[31m" +YELLOW = "\033[33m" +CYAN = "\033[36m" +RESET = "\033[0m" + + +def run_mock_demo(preset: str = "simple", show_escalate: bool = False) -> None: + """Run a simulated demo showing the full pipeline lifecycle.""" + steps = _MOCK_ESCALATE_STEPS if show_escalate else _MOCK_STEPS + + print(f"\n{BOLD}=== cross-eval demo (mock) ==={RESET}") + print(f"{DIM}Preset: {preset} | Coder: claude-coder | Reviewer: claude-reviewer{RESET}") + print(f"{DIM}Plan: fibonacci function | Max iterations: 3{RESET}\n") + + current_iter = 0 + for iteration, step_name, agent, duration, chars, verdict, output in steps: + if iteration != current_iter: + current_iter = iteration + print(f"{BOLD}{'━' * 50}") + print(f" Iteration {iteration}/3") + print(f"{'━' * 50}{RESET}") + + # Simulate running + sys.stdout.write(f" ⠋ [{step_name}] {agent} running...") + sys.stdout.flush() + time.sleep(0.5) + sys.stdout.write(f"\r {GREEN}✓{RESET} [{step_name}] {agent} — {chars} chars ({duration}s)\n") + + if verdict: + if verdict == "PASS": + color = GREEN + elif verdict == "ESCALATE": + color = YELLOW + else: + color = RED + print(f" {color}{BOLD}Verdict: {verdict}{RESET}") + + if verdict == "FAIL": + # Show key feedback + print(f" {DIM}Feedback: ISS-001 [Major] Negative input returns -1 instead of ValueError{RESET}") + elif verdict == "ESCALATE": + print(f" {YELLOW}Reason: Requirements need clarification from stakeholders{RESET}") + + print() + + # Final result + if show_escalate: + final = "ESCALATE" + color = YELLOW + else: + final = "PASS" + 
color = GREEN + + print(f"{BOLD}Result: {color}{final}{RESET}") + print(f"Iterations: {current_iter}") + + if show_escalate: + print(f"\n{RED}{BOLD}{'=' * 50}") + print(f" Escalation Report") + print(f"{'=' * 50}{RESET}") + print(f"{YELLOW}Human review required.{RESET}") + print(f" {RED}•{RESET} Requirements are ambiguous — needs stakeholder clarification") + print(f"{RED}{BOLD}{'=' * 50}{RESET}") + + print(f"\n{DIM}This was a mock demo. To run with real agents:{RESET}") + print(f"{DIM} cross-eval demo --live{RESET}") + print(f"{DIM} cross-eval run --plan plan.md{RESET}\n") + + +def run_live_demo( + preset: str = "simple", + timeout: int | None = None, +) -> PipelineResult: + """Run a live demo with real agents using the built-in plan.""" + import tempfile + + from cross_eval.config import ( + BUILTIN_AGENTS, + _resolve_agents, + apply_reasoning_effort_settings, + ) + from cross_eval.pipeline import run_pipeline + from cross_eval.prompts import PHASED_PRESETS, PIPELINE_PRESETS + + coders = ["claude-coder"] + reviewers = ["claude-reviewer"] + seniors: list[str] = [] + agents = _resolve_agents(dict(BUILTIN_AGENTS), coders, reviewers, seniors) + + if preset in PIPELINE_PRESETS: + pipeline = PIPELINE_PRESETS[preset](coders, reviewers, seniors) + phases = [] + elif preset in PHASED_PRESETS: + pipeline = [] + phases = PHASED_PRESETS[preset](coders, reviewers, seniors) + else: + pipeline = PIPELINE_PRESETS["simple"](coders, reviewers, seniors) + phases = [] + + with tempfile.TemporaryDirectory() as tmpdir: + plan_path = Path(tmpdir) / "plan.md" + checklist_path = Path(tmpdir) / "checklist.md" + plan_path.write_text(DEMO_PLAN, encoding="utf-8") + checklist_path.write_text(DEMO_CHECKLIST, encoding="utf-8") + + config = PipelineConfig( + output_dir=Path("output"), + max_iterations=3, + language="en", + inputs={"plan": plan_path, "checklist": checklist_path}, + agents=agents, + coders=coders, + reviewers=reviewers, + seniors=seniors, + pipeline=pipeline, + phases=phases, + 
preset_name=f"demo-{preset}", + ) + apply_reasoning_effort_settings(config) + + return run_pipeline(config, timeout=timeout) diff --git a/cross_eval/doctor.py b/cross_eval/doctor.py new file mode 100644 index 0000000..fc50c38 --- /dev/null +++ b/cross_eval/doctor.py @@ -0,0 +1,200 @@ +"""Environment health checks for cross-eval.""" +from __future__ import annotations + +import shutil +import subprocess +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + + +@dataclass +class DoctorCheck: + """Result of a single health check.""" + + name: str + passed: bool + critical: bool + message: str + detail: Optional[str] = None + + +def check_cli_installed(command: str) -> tuple[bool, str]: + """Check if a CLI tool is on PATH and get its version.""" + path = shutil.which(command) + if not path: + return False, f"'{command}' not found on PATH" + + try: + result = subprocess.run( + [command, "--version"], + capture_output=True, + text=True, + timeout=10, + ) + version = (result.stdout.strip() or result.stderr.strip()).split("\n")[0] + return True, version or "(version unknown)" + except (subprocess.TimeoutExpired, OSError): + return True, "(installed but version check failed)" + + +def check_cli_authenticated(command: str) -> tuple[bool, str]: + """Check if a CLI tool is authenticated by running a minimal probe.""" + path = shutil.which(command) + if not path: + return False, "not installed" + + if command == "claude": + try: + result = subprocess.run( + [command, "-p", "--model", "haiku", "--max-turns", "1"], + input="respond with just 'ok'", + capture_output=True, + text=True, + timeout=30, + ) + combined = result.stdout + result.stderr + if any(kw in combined.lower() for kw in ( + "not logged in", "login", "unauthorized", "unauthenticated", + "api key", "invalid key", + )): + return False, "not authenticated — run: claude login" + if result.returncode == 0: + return True, "authenticated" + return False, f"exit code 
{result.returncode}: {combined[:100]}" + except subprocess.TimeoutExpired: + return False, "timed out (30s) — possible network issue" + except OSError as e: + return False, str(e) + + elif command == "codex": + try: + result = subprocess.run( + [command, "--version"], + capture_output=True, + text=True, + timeout=10, + ) + combined = result.stdout + result.stderr + if any(kw in combined.lower() for kw in ( + "not logged in", "login", "unauthorized", "api key", + )): + return False, "not authenticated — run: codex login" + return True, "installed (auth check: codex login if needed)" + except (subprocess.TimeoutExpired, OSError) as e: + return False, str(e) + + return False, f"unknown command: {command}" + + +def check_config(directory: Path) -> tuple[bool, Optional[Path], list[str]]: + """Check if config.yaml exists and is valid.""" + config_path = directory / ".cross-eval" / "config.yaml" + if not config_path.exists(): + return False, None, [] + + try: + from cross_eval.config import load_config + load_config(config_path) + return True, config_path, [] + except (ValueError, FileNotFoundError) as e: + return False, config_path, [str(e)] + + +def run_doctor(directory: Path) -> list[DoctorCheck]: + """Run all health checks and return results.""" + checks: list[DoctorCheck] = [] + + # 1. claude CLI + installed, version = check_cli_installed("claude") + checks.append(DoctorCheck( + name="claude CLI", + passed=installed, + critical=True, + message=version if installed else "not found", + detail="Install: https://docs.anthropic.com/en/docs/claude-code" if not installed else None, + )) + + if installed: + auth_ok, auth_msg = check_cli_authenticated("claude") + checks.append(DoctorCheck( + name="claude auth", + passed=auth_ok, + critical=True, + message=auth_msg, + )) + + # 2. 
codex CLI + installed, version = check_cli_installed("codex") + checks.append(DoctorCheck( + name="codex CLI", + passed=installed, + critical=False, + message=version if installed else "not found (optional)", + detail="Install: https://github.com/openai/codex" if not installed else None, + )) + + if installed: + auth_ok, auth_msg = check_cli_authenticated("codex") + checks.append(DoctorCheck( + name="codex auth", + passed=auth_ok, + critical=False, + message=auth_msg, + )) + + # 3. Config + config_ok, config_path, config_errors = check_config(directory) + if config_path is None: + checks.append(DoctorCheck( + name="config", + passed=True, # not having config is fine + critical=False, + message="no .cross-eval/config.yaml (will use defaults)", + detail="Run: cross-eval init", + )) + elif config_ok: + checks.append(DoctorCheck( + name="config", + passed=True, + critical=False, + message=f"valid ({config_path.name})", + )) + else: + checks.append(DoctorCheck( + name="config", + passed=False, + critical=True, + message="invalid config", + detail="\n".join(config_errors), + )) + + return checks + + +def format_doctor_results(checks: list[DoctorCheck]) -> str: + """Format doctor check results for terminal output.""" + lines: list[str] = [] + lines.append("\n cross-eval doctor\n") + + for check in checks: + icon = " ✓" if check.passed else " ✗" + lines.append(f"{icon} {check.name}: {check.message}") + if check.detail and not check.passed: + for detail_line in check.detail.split("\n"): + lines.append(f" {detail_line}") + + # Summary + failed_critical = [c for c in checks if not c.passed and c.critical] + failed_warn = [c for c in checks if not c.passed and not c.critical] + + lines.append("") + if not failed_critical and not failed_warn: + lines.append(" All checks passed!") + elif failed_critical: + lines.append(f" {len(failed_critical)} critical issue(s) found.") + else: + lines.append(f" {len(failed_warn)} warning(s), no critical issues.") + + lines.append("") + return 
"\n".join(lines) diff --git a/cross_eval/models.py b/cross_eval/models.py index f5d10a9..8fa29ad 100644 --- a/cross_eval/models.py +++ b/cross_eval/models.py @@ -24,7 +24,7 @@ class StepConfig: name: str agent: str # reference to agents key - role: str # "generate" or "review" + role: str # "coding" or "review" prompt_template: str # "default:" or file path output_key: str verdict: bool = False @@ -105,6 +105,7 @@ class IterationResult: phase_name: Optional[str] = None repeated_aggregate_warning: Optional[str] = None review_metrics: Optional[ReviewMetrics] = None + escalated_issues: Optional[str] = None @dataclass @@ -116,3 +117,4 @@ class PipelineResult: total_duration: float = 0.0 run_dir: Optional[Path] = None repeated_aggregate_warnings: list[str] = field(default_factory=list) + escalated_issues: list[str] = field(default_factory=list) diff --git a/cross_eval/pipeline.py b/cross_eval/pipeline.py index 28086a3..7981cfe 100644 --- a/cross_eval/pipeline.py +++ b/cross_eval/pipeline.py @@ -10,7 +10,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from pathlib import Path -from cross_eval.agent import invoke_agent +from cross_eval.agent import AgentInvocationError, invoke_agent from cross_eval.config import try_reload_config from cross_eval.models import ( AgentResult, @@ -68,6 +68,8 @@ def _run_simple_pipeline( final_verdict = "MAX_ITERATIONS_REACHED" aggregate_history: dict[str, int] = {} aggregate_warnings: list[str] = [] + escalated_issues: list[str] = [] + all_feedbacks: list[str] = [] for i in range(1, config.max_iterations + 1): config = try_reload_config(config) @@ -100,8 +102,34 @@ def _run_simple_pipeline( iter_result.feedback = _collect_feedback(config.pipeline, step_outputs) feedback = iter_result.feedback or feedback + all_feedbacks.append(feedback) + + # Extract tracker from verdict/review steps for next iteration + for step in config.pipeline: + if step.verdict or step.role == "review": + tracker = 
_extract_senior_tracker( + step_outputs.get(step.output_key, ""), + ) + if tracker: + input_contents["previous_senior_tracker"] = tracker + iterations.append(iter_result) + # ESCALATE check (highest priority) + if verdict == "ESCALATE": + final_verdict = "ESCALATE" + # Extract escalation details from verdict step outputs + for step in config.pipeline: + if step.verdict: + esc = _extract_escalated_issues( + step_outputs.get(step.output_key, ""), + ) + if esc: + escalated_issues.append(esc) + iter_result.escalated_issues = esc + logger.info(" ESCALATE at iteration %d — stopping loop.", i) + break + if verdict == "PASS": final_verdict = "PASS" if i >= config.min_iterations: @@ -113,6 +141,26 @@ def _run_simple_pipeline( i, config.min_iterations, ) + # Auto-escalate: no senior/aggregator + repeated FAIL + has_aggregator = config.seniors or any( + s.prompt_template == "default:aggregate-review" for s in config.pipeline + ) + if ( + verdict == "FAIL" + and not has_aggregator + and i >= 2 + and _detect_auto_escalate(all_feedbacks[:-1], feedback) + ): + final_verdict = "ESCALATE" + auto_msg = ( + f"Auto-escalated: same issues detected across {i} iterations " + f"without resolution (no senior reviewer configured)." 
+ ) + escalated_issues.append(auto_msg) + iter_result.escalated_issues = auto_msg + logger.info(" AUTO-ESCALATE at iteration %d", i) + break + if dry_run: logger.info(" (dry-run: stopping after iteration 1)") break @@ -125,6 +173,7 @@ def _run_simple_pipeline( total_duration=round(total_duration, 1), run_dir=run_dir, repeated_aggregate_warnings=aggregate_warnings, + escalated_issues=escalated_issues, ) if not dry_run: @@ -154,8 +203,14 @@ def _run_phased_pipeline( global_iter = 0 aggregate_history_by_phase: dict[str, dict[str, int]] = {} aggregate_warnings: list[str] = [] + escalated_issues: list[str] = [] + all_feedbacks: list[str] = [] + escalated = False for phase_idx, phase in enumerate(config.phases): + if escalated: + break + logger.info("=" * 60) logger.info( " Phase: %s (max_iter=%d, consecutive_pass=%d)", @@ -205,8 +260,45 @@ def _run_phased_pipeline( iter_result.feedback = _collect_feedback(phase.steps, step_outputs) feedback = iter_result.feedback or feedback + all_feedbacks.append(feedback) + + # Extract tracker from verdict/review steps + for step in phase.steps: + if step.verdict or step.role == "review": + tracker = _extract_senior_tracker( + step_outputs.get(step.output_key, ""), + ) + if tracker: + input_contents["previous_senior_tracker"] = tracker + iterations.append(iter_result) + # ESCALATE check + if verdict == "ESCALATE": + final_verdict = "ESCALATE" + for step in phase.steps: + if step.verdict: + esc = _extract_escalated_issues( + step_outputs.get(step.output_key, ""), + ) + if esc: + escalated_issues.append(esc) + iter_result.escalated_issues = esc + logger.info( + " [%s] ESCALATE at iteration %d — stopping.", + phase.name, pi, + ) + escalated = True + break + + if verdict is None: + logger.info( + " [%s] completed (no verdict step; single-pass phase)", + phase.name, + ) + phase_converged = True + break + if verdict == "PASS": consecutive_passes += 1 logger.info( @@ -223,9 +315,33 @@ def _run_phased_pipeline( else: consecutive_passes = 0 + 
# Auto-escalate in phased pipeline + has_aggregator = config.seniors or any( + s.prompt_template == "default:aggregate-review" for s in phase.steps + ) + if ( + verdict == "FAIL" + and not has_aggregator + and pi >= 2 + and _detect_auto_escalate(all_feedbacks[:-1], feedback) + ): + final_verdict = "ESCALATE" + auto_msg = ( + f"Auto-escalated: same issues detected across {pi} iterations " + f"in phase '{phase.name}' without resolution." + ) + escalated_issues.append(auto_msg) + iter_result.escalated_issues = auto_msg + logger.info(" [%s] AUTO-ESCALATE at iteration %d", phase.name, pi) + escalated = True + break + if dry_run: break + if escalated: + break + if phase_converged: logger.info(" Phase '%s' completed: CONVERGED", phase.name) else: @@ -245,6 +361,7 @@ def _run_phased_pipeline( total_duration=round(total_duration, 1), run_dir=run_dir, repeated_aggregate_warnings=aggregate_warnings, + escalated_issues=escalated_issues, ) if not dry_run: @@ -373,15 +490,17 @@ def _run_steps( run_dir=run_dir, output_iter=output_iter, phase_name=phase_name, ) - # Extract verdict from all verdict steps (ALL must PASS) + # Extract verdict from all verdict steps (ALL must PASS; ESCALATE wins over all) for step in steps: if step.verdict: output = step_outputs.get(step.output_key, "") step_verdict = _extract_verdict(output, step.verdict_pattern) logger.info(" [%s] verdict: %s", step.name, step_verdict) - if verdict is None: + if step_verdict == "ESCALATE": + verdict = "ESCALATE" + elif verdict is None: verdict = step_verdict - elif step_verdict == "FAIL": + elif verdict != "ESCALATE" and step_verdict == "FAIL": verdict = "FAIL" return step_outputs, step_results, verdict @@ -466,10 +585,11 @@ def _execute_step( f"Try --timeout 0 (unlimited)" ) except RuntimeError as e: - phase_info = f"- **Phase**: {phase_name}\n" if phase_name else "" - error_msg = ( - f"# Agent Error\n\n{phase_info}" - f"- **Step**: {step.name}\n- **Agent**: {step.agent}\n\n```\n{e}\n```\n" + error_msg = 
_format_runtime_error_markdown( + e, + step_name=step.name, + agent_name=step.agent, + phase_name=phase_name, ) _save_step_output(run_dir, output_iter, f"{step.name}_error", error_msg) logger.error(" [%s] FAILED — saved to output", step.name) @@ -527,7 +647,7 @@ def _execute_parallel_batch( # Collect results from parallel threads local_outputs: dict[str, str] = {} local_results: dict[str, AgentResult] = {} - errors: list[Exception] = [] + errors: list[tuple[StepConfig, Exception]] = [] # Show a single spinner for the batch from cross_eval.agent import _Spinner @@ -563,19 +683,15 @@ def _execute_parallel_batch( local_results[output_key] = result local_outputs[output_key] = output except Exception as e: - errors.append(e) + errors.append((step, e)) batch_elapsed = round(time.monotonic() - batch_start, 1) - if errors: - spinner.stop(f"[parallel] FAILED ({batch_elapsed}s)") - raise errors[0] - - spinner.stop(f"[parallel] {len(batch)} agents done ({batch_elapsed}s)") - - # Merge results + # Persist successful outputs even if a sibling step failed. 
for step in batch: key = step.output_key + if key not in local_outputs: + continue step_outputs[key] = local_outputs[key] step_results[key] = local_results[key] r = local_results[key] @@ -585,6 +701,48 @@ def _execute_parallel_batch( ) _save_step_output(run_dir, output_iter, step.name, r.output) + if errors: + spinner.stop(f"[parallel] FAILED ({batch_elapsed}s)") + for failed_step, exc in errors: + if isinstance(exc, subprocess.TimeoutExpired): + stdout = (exc.stdout or b"") if isinstance(exc.stdout, bytes) else (exc.stdout or "") + stderr = (exc.stderr or b"") if isinstance(exc.stderr, bytes) else (exc.stderr or "") + if isinstance(stdout, bytes): + stdout = stdout.decode("utf-8", errors="replace") + if isinstance(stderr, bytes): + stderr = stderr.decode("utf-8", errors="replace") + phase_info = f"- **Phase**: {phase_name}\n" if phase_name else "" + error_msg = ( + f"# Agent Timeout\n\n" + f"{phase_info}" + f"- **Step**: {failed_step.name}\n" + f"- **Agent**: {failed_step.agent}\n" + f"- **Timeout**: {timeout}s\n\n" + f"Partial stdout ({len(stdout)} chars):\n" + f"```\n{stdout[:2000] or '(none)'}\n```\n\n" + f"Stderr:\n```\n{stderr[:2000] or '(none)'}\n```\n" + ) + else: + error_msg = _format_runtime_error_markdown( + exc, + step_name=failed_step.name, + agent_name=failed_step.agent, + phase_name=phase_name, + ) + _save_step_output(run_dir, output_iter, f"{failed_step.name}_error", error_msg) + logger.error(" [%s] FAILED — saved to output", failed_step.name) + + failed_steps = ", ".join(step.name for step, _ in errors) + saved_steps = ", ".join(step.name for step in batch if step.output_key in local_outputs) + first_error = errors[0][1] + saved_note = f" Successful outputs were saved for: {saved_steps}." if saved_steps else "" + raise RuntimeError( + f"Parallel batch failed: {len(errors)}/{len(batch)} steps failed ({failed_steps})." 
+ f"{saved_note} First error:\n{first_error}" + ) + + spinner.stop(f"[parallel] {len(batch)} agents done ({batch_elapsed}s)") + # --------------------------------------------------------------------------- # Context and template helpers @@ -671,13 +829,104 @@ def _normalize_aggregate_output(output: str) -> str: return " ".join(output.lower().split()) +_ESCALATE_PATTERN = re.compile(r"VERDICT:\s*ESCALATE", re.IGNORECASE) + +_TRACKER_TABLE_PATTERN = re.compile( + r"(##+ Issue Tracker[^\n]*\n(?:\|[^\n]+\|\n?)+)", re.DOTALL, +) + + def _extract_verdict(output: str, pattern: str) -> str: - """Extract PASS or FAIL from output using regex pattern.""" + """Extract PASS, FAIL, or ESCALATE from output using regex pattern.""" + if re.search(_ESCALATE_PATTERN, output): + return "ESCALATE" # highest priority if re.search(pattern, output): return "PASS" return "FAIL" +def _extract_senior_tracker(output: str) -> str: + """Extract Issue Tracker table from senior review output.""" + match = _TRACKER_TABLE_PATTERN.search(output) + return match.group(0) if match else "" + + +def _extract_escalated_issues(output: str) -> str: + """Extract escalation details from senior review output.""" + # Look for content between VERDICT: ESCALATE and end, or an escalation section + pattern = r"(?:###?\s*Escalat(?:ed|ion).*?\n)(.*?)(?=\n###|\Z)" + match = re.search(pattern, output, re.DOTALL | re.IGNORECASE) + if match: + return match.group(1).strip() + # Fallback: grab the Action Items section + pattern2 = r"(?:###?\s*Action Items.*?\n)(.*?)(?=\n###|\Z)" + match2 = re.search(pattern2, output, re.DOTALL | re.IGNORECASE) + if match2: + return match2.group(1).strip() + return "" + + +_FP_PATTERN = re.compile(r"[\w/\\]+\.\w{1,5}") +_ISSUE_KEYWORDS = re.compile( + r"\b(missing|validation|error[\s_-]?handling|unused|import|" + r"injection|auth(?:entication|orization)?|deprecated|" + r"leak|overflow|null|undefined|timeout|deadlock|race[\s_-]?condition|" + 
r"security|permission|encoding|format|parsing|connection|" + r"boundary|initialization|cleanup|resource|concurrency|" + r"exception|crash|hang|corrupt|truncat|duplicat|inconsisten|" + r"omission|over[\s_-]?engineer|refactor|naming|docstring|" + r"type[\s_-]?hint|test|coverage|logging|config|performance)\w*", + re.IGNORECASE, +) + + +def _issue_fingerprints(text: str) -> set[tuple[str, str]]: + """Extract (file_path, issue_keyword) pairs from feedback text. + + For each file path found, look for issue keywords within a window of + ~120 characters around the file path mention and create composite keys. + """ + lower = text.lower() + paths = list(_FP_PATTERN.finditer(lower)) + if not paths: + return set() + + pairs: set[tuple[str, str]] = set() + for m in paths: + fp = m.group() + # Search a window around the file path for issue keywords + window_start = max(0, m.start() - 60) + window_end = min(len(lower), m.end() + 60) + window = lower[window_start:window_end] + for kw_match in _ISSUE_KEYWORDS.finditer(window): + pairs.add((fp, kw_match.group().lower())) + return pairs + + +def _detect_auto_escalate( + feedbacks: list[str], + current_feedback: str, + threshold: int = 2, +) -> bool: + """Detect repeated identical issues across iterations (for auto-escalation). + + Extracts (file_path, issue_keyword) fingerprints from feedback and checks + if any identical pair appears in >= *threshold* previous iterations. + This avoids false positives when the same file is mentioned for completely + different issues across iterations. 
+ """ + current_fps = _issue_fingerprints(current_feedback) + if not current_fps: + return False + + repeat_count = 0 + for prev in feedbacks: + prev_fps = _issue_fingerprints(prev) + if current_fps & prev_fps: + repeat_count += 1 + return repeat_count >= threshold + + def _save_step_output( run_dir: Path, iteration: int, @@ -691,8 +940,56 @@ def _save_step_output( return path +def _format_runtime_error_markdown( + exc: Exception, + *, + step_name: str, + agent_name: str, + phase_name: str | None = None, +) -> str: + """Render a structured markdown error report for a failed step.""" + phase_info = f"- **Phase**: {phase_name}\n" if phase_name else "" + lines = [ + "# Agent Error", + "", + phase_info.rstrip(), + f"- **Step**: {step_name}", + f"- **Agent**: {agent_name}", + ] + lines = [line for line in lines if line] + + if isinstance(exc, AgentInvocationError): + lines.extend( + [ + f"- **Failure Type**: {exc.failure_type}", + f"- **Suggested Action**: {exc.suggested_action}", + "", + "## Command", + f"```", + exc.cmd_preview, + "```", + "", + "## Raw Error", + "```", + exc.raw_error, + "```", + ], + ) + else: + lines.extend( + [ + "", + "```", + str(exc), + "```", + ], + ) + + return "\n".join(lines) + "\n" + + def _save_report(run_dir: Path, config: PipelineConfig, result: PipelineResult) -> None: - """Generate and save the final markdown report.""" + """Build and save the final markdown report.""" report = build_report(config, result) report_path = run_dir / "final-report.md" report_path.parent.mkdir(parents=True, exist_ok=True) diff --git a/cross_eval/prompts.py b/cross_eval/prompts.py index 7ebf091..48f1183 100644 --- a/cross_eval/prompts.py +++ b/cross_eval/prompts.py @@ -12,7 +12,7 @@ from cross_eval.models import PhaseConfig, StepConfig # Default prompt templates # --------------------------------------------------------------------------- -GENERATE_TEMPLATE = """\ +CODING_TEMPLATE = """\ You are tasked with implementing code based on a plan and checklist. 
## Plan @@ -53,8 +53,8 @@ You are tasked with reviewing code against a plan and checklist. ## Reference Documents {docs} -## Generated Code / Previous Step Output -{generated_code} +## Coding Output / Previous Step Output +{coding_output} ## Previous Review Feedback {feedback} @@ -94,10 +94,10 @@ security concerns, performance problems), report them separately under \ (Write "N/A" if no previous feedback was provided.) ### Issues Found -List issues ordered by severity (Critical first): -- [Critical][Over-engineering] Description (reference specific plan/checklist item) -- [Major][Omission] Description (reference specific plan/checklist item) -- [Minor][Omission] Description (reference specific plan/checklist item) +List issues ordered by severity (Critical first). Assign each issue a unique ID (ISS-NNN): +- ISS-001 [Critical][Over-engineering] Description (reference specific plan/checklist item) +- ISS-002 [Major][Omission] Description (reference specific plan/checklist item) +- ISS-003 [Minor][Omission] Description (reference specific plan/checklist item) ### Out of Scope Issues Issues found outside plan/checklist scope but worth noting: @@ -119,7 +119,7 @@ Otherwise output: VERDICT: FAIL """ -GENERATE_TEMPLATE_KO = """\ +CODING_TEMPLATE_KO = """\ 당신은 기획서와 체크리스트를 기반으로 코드를 구현하는 개발자입니다. ## 기획서 @@ -159,7 +159,7 @@ REVIEW_TEMPLATE_KO = """\ {docs} ## 검토 대상 코드 -{generated_code} +{coding_output} ## 이전 리뷰 피드백 {feedback} @@ -195,10 +195,10 @@ REVIEW_TEMPLATE_KO = """\ (이전 피드백이 없으면 "해당 없음"이라고 작성하세요.) ### 발견된 이슈 -심각도 순서(Critical 먼저)로 나열: -- [Critical][과최적화] 이슈 설명 (관련 기획서/체크리스트 항목 참조) -- [Major][누락] 이슈 설명 (관련 기획서/체크리스트 항목 참조) -- [Minor][누락] 이슈 설명 (관련 기획서/체크리스트 항목 참조) +심각도 순서(Critical 먼저)로 나열. 
각 이슈에 고유 ID(ISS-NNN)를 부여하세요: +- ISS-001 [Critical][과최적화] 이슈 설명 (관련 기획서/체크리스트 항목 참조) +- ISS-002 [Major][누락] 이슈 설명 (관련 기획서/체크리스트 항목 참조) +- ISS-003 [Minor][누락] 이슈 설명 (관련 기획서/체크리스트 항목 참조) ### 범위 밖 이슈 기획서/체크리스트 범위 밖이지만 주목할 만한 이슈: @@ -357,6 +357,150 @@ REVIEW_ONLY_TEMPLATE_KO = """\ 그렇지 않으면: VERDICT: FAIL """ +PLAN_REVIEW_TEMPLATE = """\ +You are tasked with reviewing planning documents before implementation begins. + +## Plan +{plan} + +## Checklist +{checklist} + +## Reference Documents +{docs} + +## Previous Review (iteration {iteration} of {max_iterations}) +{feedback} + +## Review Instructions +Review the planning package itself: the plan, checklist, and reference documents. +You MAY inspect the current repository to validate feasibility, constraints, and integration assumptions. +Do NOT write or modify code. Assume implementation has NOT started yet. + +Your job is to find planning issues that would likely cause bad implementation outcomes: +- Ambiguous or contradictory requirements +- Missing acceptance criteria, constraints, edge cases, or dependencies +- Scope that is broader or more complex than the stated objective +- Checklist items that do not verify the actual requirements +- Plan details that conflict with the current codebase or architecture + +If previous review results are provided above, you MUST: +1. Verify each previously reported issue — is it a real issue or a false positive? +2. Look for issues the previous review MISSED. +3. Do NOT simply repeat the previous review. Provide your own independent assessment. +4. Explicitly mark items as CONFIRMED (still an issue) or DISMISSED (false positive). + +For each issue found, classify it with BOTH severity AND category: + +Severity levels: +- **Critical**: The plan is likely to cause fundamentally wrong implementation or unsafe behavior. +- **Major**: Important requirements, constraints, or acceptance criteria are unclear, conflicting, missing, or incompatible with the existing system. 
+- **Minor**: Wording, structure, or checklist quality problems that reduce implementation clarity. + +Categories: +- **Over-engineering**: The plan introduces scope, abstractions, or complexity not justified by the stated objective. +- **Omission**: A necessary requirement, constraint, acceptance criterion, edge case, dependency, or compatibility consideration is missing or incomplete. + +If you find issues outside the planning scope (e.g. repository health, pre-existing code problems), report them separately under "Out of Scope Issues". + +## Output Format + +### Issues Found +List issues ordered by severity (Critical first): +- [Critical][Over-engineering] Description (reference specific plan/checklist item) +- [Major][Omission] Description (reference specific plan/checklist item) +- [Minor][Omission] Description (reference specific plan/checklist item) + +### Out of Scope Issues +Issues found outside planning scope but worth noting: +- [Critical] Description of issue +- [Minor] Description of issue +(Write "None" if no out-of-scope issues found.) + +### Summary +- Critical: N, Major: N, Minor: N +- Over-engineering count: N +- Omission count: N +- CONFIRMED: N, DISMISSED: N +- Overall quality: [BRIEF ASSESSMENT] + +### Verdict +If the planning documents are clear, complete enough to implement, compatible with the current repository, and free of unjustified scope, output: VERDICT: PASS +Otherwise output: VERDICT: FAIL +""" + +PLAN_REVIEW_TEMPLATE_KO = """\ +당신은 구현 시작 전에 기획 문서를 검토하는 리뷰어입니다. + +## 기획서 +{plan} + +## 체크리스트 +{checklist} + +## 참고 문서 +{docs} + +## 이전 리뷰 결과 ({max_iterations}회 중 {iteration}번째) +{feedback} + +## 검토 지침 +검토 대상은 코드가 아니라 기획 패키지 자체입니다: 기획서, 체크리스트, 참고 문서를 함께 검토하세요. +현재 저장소를 살펴보며 구현 가능성, 제약조건, 통합 가정이 맞는지도 확인할 수 있습니다. +코드를 생성하거나 수정하지 마세요. 아직 구현이 시작되지 않았다고 가정하세요. 
+ +목표는 구현 단계에서 문제를 일으킬 기획 결함을 찾는 것입니다: +- 요구사항이 모호하거나 서로 충돌하는 경우 +- 수용 기준, 제약조건, 엣지 케이스, 의존성이 빠진 경우 +- 목표 대비 범위가 지나치게 넓거나 복잡한 경우 +- 체크리스트가 실제 요구사항 검증에 충분하지 않은 경우 +- 기획 내용이 현재 코드베이스나 아키텍처와 충돌하는 경우 + +이전 리뷰 결과가 제공된 경우 반드시: +1. 이전에 보고된 각 이슈를 검증하세요 — 진짜 이슈인지 오탐인지? +2. 이전 리뷰가 놓친 새로운 이슈를 찾으세요. +3. 이전 리뷰를 그대로 반복하지 마세요. 독립적인 평가를 제공하세요. +4. 각 항목에 CONFIRMED (여전히 이슈) 또는 DISMISSED (오탐) 태그를 명시하세요. + +발견된 각 이슈에 심각도와 카테고리를 모두 부여하세요: + +심각도: +- **Critical**: 잘못된 구현이나 위험한 동작으로 직결될 가능성이 큰 기획 결함. +- **Major**: 중요한 요구사항, 제약조건, 수용 기준이 모호하거나 충돌하거나 누락되었거나 기존 시스템과 맞지 않는 경우. +- **Minor**: 문서 표현, 구조, 체크리스트 품질 문제로 구현 명확성이 떨어지는 경우. + +카테고리: +- **과최적화**: 목표 대비 불필요한 범위, 추상화, 복잡성을 기획에 추가한 경우. +- **누락**: 필요한 요구사항, 제약조건, 수용 기준, 엣지 케이스, 의존성, 호환성 고려가 빠졌거나 불완전한 경우. + +기획 범위 밖에서 발견된 문제(저장소 상태, 기존 코드 문제 등)는 "범위 밖 이슈" 섹션에 별도로 보고하세요. + +## 출력 형식 + +### 발견된 이슈 +심각도 순서(Critical 먼저)로 나열: +- [Critical][과최적화] 이슈 설명 (관련 기획서/체크리스트 항목 참조) +- [Major][누락] 이슈 설명 (관련 기획서/체크리스트 항목 참조) +- [Minor][누락] 이슈 설명 (관련 기획서/체크리스트 항목 참조) + +### 범위 밖 이슈 +기획 범위 밖이지만 주목할 만한 이슈: +- [Critical] 이슈 설명 +- [Minor] 이슈 설명 +(범위 밖 이슈가 없으면 "없음"이라고 작성하세요.) + +### 요약 +- Critical: N, Major: N, Minor: N +- 과최적화 수: N +- 누락 수: N +- CONFIRMED: N, DISMISSED: N +- 전체 품질: [간략한 평가] + +### 판정 +기획 문서가 구현 가능한 수준으로 명확하고 충분하며 현재 저장소와도 정합적이고, 불필요한 범위 확장이 없으면: VERDICT: PASS +그렇지 않으면: VERDICT: FAIL +""" + AGGREGATE_REVIEW_TEMPLATE = """\ You are adjudicating multiple review results and turning them into an actionable decision. @@ -378,6 +522,9 @@ You are adjudicating multiple review results and turning them into an actionable ## Previous Verification Feedback {feedback} +## Previous Issue Tracker +{previous_senior_tracker} + ## Instructions Explore the project directory to confirm the current codebase state. Then: 1. Deduplicate overlapping issues across reviewers. @@ -385,7 +532,12 @@ Explore the project directory to confirm the current codebase state. Then: 3. Keep only issues supported by the plan, checklist, code, or reviewer evidence. 4. 
When evidence is mixed, explain what was confirmed, what was dismissed, and what still needs follow-up. 5. Produce a prioritized action list for the coder. -6. If no confirmed issue remains, output VERDICT: PASS. Otherwise VERDICT: FAIL. +6. Maintain the Issue Tracker table across iterations (carry forward unresolved issues). +7. If no confirmed issue remains, output VERDICT: PASS. +8. If issues exist that the coder can fix, output VERDICT: FAIL. +9. If issues require human intervention (ambiguous requirements, architecture decisions, \ +external dependency problems, or the same issue persists after 2+ fix attempts), \ +output VERDICT: ESCALATE. ## Output Format @@ -401,13 +553,19 @@ Explore the project directory to confirm the current codebase state. Then: 1. Concrete fix the coder should make 2. Concrete fix the coder should make +## Issue Tracker + +| ISS-ID | Severity | Description | Status | Since | +|--------|----------|-------------|--------|-------| +| ISS-001 | Critical | ... | Open/Fixed/Dismissed | v1 | + ### Summary - Confirmed issues: N - Dismissed findings: N (false positive: N, already fixed: N) - Overall quality: [BRIEF ASSESSMENT] ### Verdict -VERDICT: PASS or VERDICT: FAIL +VERDICT: PASS or VERDICT: FAIL or VERDICT: ESCALATE """ AGGREGATE_REVIEW_TEMPLATE_KO = """\ @@ -431,6 +589,9 @@ AGGREGATE_REVIEW_TEMPLATE_KO = """\ ## 이전 검증 피드백 {feedback} +## 이전 이슈 트래커 +{previous_senior_tracker} + ## 지침 프로젝트 디렉토리를 탐색하여 현재 코드베이스 상태를 확인한 뒤 다음을 수행하세요. 1. 리뷰어들 사이에 중복되는 이슈를 합치세요. @@ -438,7 +599,11 @@ AGGREGATE_REVIEW_TEMPLATE_KO = """\ 3. 기획서, 체크리스트, 코드, 리뷰 근거로 뒷받침되는 이슈만 남기세요. 4. 근거가 엇갈리면 무엇이 확정이고 무엇이 기각 또는 추가확인 대상인지 분명히 적으세요. 5. coder가 바로 수정할 수 있는 우선순위 액션 아이템을 만드세요. -6. 확정된 이슈가 없으면 VERDICT: PASS, 있으면 VERDICT: FAIL 을 출력하세요. +6. 이슈 트래커 테이블을 반복 간에 유지하세요 (미해결 이슈를 이월). +7. 확정된 이슈가 없으면 VERDICT: PASS 를 출력하세요. +8. coder가 수정 가능한 이슈가 있으면 VERDICT: FAIL 을 출력하세요. +9. 사람의 개입이 필요한 이슈(모호한 요구사항, 아키텍처 결정, 외부 의존성 문제, \ +동일 이슈가 2회 이상 해결 실패)가 있으면 VERDICT: ESCALATE 를 출력하세요. 
## 출력 형식 @@ -454,26 +619,34 @@ AGGREGATE_REVIEW_TEMPLATE_KO = """\ 1. coder가 수정해야 할 구체적인 작업 2. coder가 수정해야 할 구체적인 작업 +## 이슈 트래커 + +| ISS-ID | 심각도 | 설명 | 상태 | 최초 발견 | +|--------|--------|------|------|-----------| +| ISS-001 | Critical | ... | Open/Fixed/Dismissed | v1 | + ### 요약 - 확정 이슈 수: N - 기각된 주장 수: N (오탐: N, 수정 완료: N) - 전체 품질: [간략한 평가] ### 판정 -VERDICT: PASS 또는 VERDICT: FAIL +VERDICT: PASS 또는 VERDICT: FAIL 또는 VERDICT: ESCALATE """ DEFAULT_TEMPLATES: dict[str, dict[str, str]] = { "en": { - "generate": GENERATE_TEMPLATE, + "coding": CODING_TEMPLATE, "review": REVIEW_TEMPLATE, + "plan-review": PLAN_REVIEW_TEMPLATE, "review-only": REVIEW_ONLY_TEMPLATE, "aggregate-review": AGGREGATE_REVIEW_TEMPLATE, }, "ko": { - "generate": GENERATE_TEMPLATE_KO, + "coding": CODING_TEMPLATE_KO, "review": REVIEW_TEMPLATE_KO, + "plan-review": PLAN_REVIEW_TEMPLATE_KO, "review-only": REVIEW_ONLY_TEMPLATE_KO, "aggregate-review": AGGREGATE_REVIEW_TEMPLATE_KO, }, @@ -544,18 +717,18 @@ def _build_named_bundle( def _build_simple_preset( coders: list[str], reviewers: list[str], seniors: list[str], ) -> list[StepConfig]: - """First coder generates, first reviewer reviews.""" + """First coder writes code, first reviewer reviews.""" if not coders: raise ValueError("'simple' preset requires at least 1 coder") if not reviewers: raise ValueError("'simple' preset requires at least 1 reviewer") steps = [ StepConfig( - name="generate", + name="coding", agent=coders[0], - role="generate", - prompt_template="default:generate", - output_key="generated_code", + role="coding", + prompt_template="default:coding", + output_key="coding_output", ), StepConfig( name="review", @@ -576,7 +749,7 @@ def _build_simple_preset( output_key="senior_review_result", verdict=True, context_override={ - "candidate_outputs": "## Generated code\n{generated_code}", + "candidate_outputs": "## Coding output\n{coding_output}", "reviews_bundle": f"## Review: {reviewers[0]} (review)\n{{review_result}}", }, ), @@ -587,25 +760,25 @@ 
def _build_simple_preset( def _build_cross_review_preset( coders: list[str], reviewers: list[str], seniors: list[str], ) -> list[StepConfig]: - """Both coders generate, then cross-review each other's output.""" + """Both coders write code, then cross-review each other's output.""" if len(coders) < 2: raise ValueError("'cross-review' preset requires at least 2 coders") a, b = coders[0], coders[1] ak, bk = _unique_safe_keys([a, b]) steps = [ StepConfig( - name=f"generate_{ak}", + name=f"coding_{ak}", agent=a, - role="generate", - prompt_template="default:generate", + role="coding", + prompt_template="default:coding", output_key=f"code_{ak}", parallel=True, ), StepConfig( - name=f"generate_{bk}", + name=f"coding_{bk}", agent=b, - role="generate", - prompt_template="default:generate", + role="coding", + prompt_template="default:coding", output_key=f"code_{bk}", parallel=True, ), @@ -615,7 +788,7 @@ def _build_cross_review_preset( role="review", prompt_template="default:review", output_key=f"review_by_{ak}", - context_override={"generated_code": f"{{code_{bk}}}"}, + context_override={"coding_output": f"{{code_{bk}}}"}, parallel=True, verdict=not seniors, ), @@ -626,7 +799,7 @@ def _build_cross_review_preset( prompt_template="default:review", output_key=f"review_by_{bk}", verdict=not seniors, - context_override={"generated_code": f"{{code_{ak}}}"}, + context_override={"coding_output": f"{{code_{ak}}}"}, parallel=True, ), ] @@ -642,9 +815,9 @@ def _build_cross_review_preset( context_override={ "candidate_outputs": _build_named_bundle( [a, b], - [f"generate_{ak}", f"generate_{bk}"], + [f"coding_{ak}", f"coding_{bk}"], [f"code_{ak}", f"code_{bk}"], - "Candidate", + "Coding Output", ), "reviews_bundle": _build_named_bundle( [a, b], @@ -715,6 +888,61 @@ def _build_review_only_preset( return steps +def _build_plan_review_preset( + coders: list[str], reviewers: list[str], seniors: list[str], +) -> list[StepConfig]: + """Plan-review: reviewers audit planning docs before 
implementation.""" + if not reviewers: + raise ValueError("'plan-review' preset requires at least 1 reviewer") + + if len(reviewers) == 1 and not seniors: + return [ + StepConfig( + name="plan_review", + agent=reviewers[0], + role="review", + prompt_template="default:plan-review", + output_key="plan_review_result", + verdict=True, + ), + ] + + steps: list[StepConfig] = [] + reviewer_keys = _unique_safe_keys(reviewers) + for reviewer, rk in zip(reviewers, reviewer_keys): + steps.append( + StepConfig( + name=f"plan_review_{rk}", + agent=reviewer, + role="review", + prompt_template="default:plan-review", + output_key=f"plan_review_{rk}", + verdict=not seniors, + parallel=True, + ), + ) + if seniors: + step_names = [f"plan_review_{rk}" for rk in reviewer_keys] + output_keys = [f"plan_review_{rk}" for rk in reviewer_keys] + steps.append( + StepConfig( + name="senior_review", + agent=seniors[0], + role="review", + prompt_template="default:aggregate-review", + output_key="senior_review_result", + verdict=True, + context_override={ + "candidate_outputs": "Planning documents under review (plan/checklist/reference docs).", + "reviews_bundle": _build_named_bundle( + reviewers, step_names, output_keys, "Review", + ), + }, + ), + ) + return steps + + def _build_review_fix_preset( coders: list[str], reviewers: list[str], seniors: list[str], ) -> list[PhaseConfig]: @@ -762,11 +990,11 @@ def _build_review_fix_preset( }, ), StepConfig( - name="generate", + name="coding", agent=fix_coder, - role="generate", - prompt_template="default:generate", - output_key="generated_code", + role="coding", + prompt_template="default:coding", + output_key="coding_output", context_override={"feedback": "{aggregate_review}"}, ), StepConfig( @@ -784,14 +1012,44 @@ def _build_review_fix_preset( ] +def _build_coding_review_fix_preset( + coders: list[str], reviewers: list[str], seniors: list[str], +) -> list[PhaseConfig]: + """Write code once, then run the review-fix convergence loop.""" + if not coders: 
+ raise ValueError("'coding-review-fix' preset requires at least 1 coder") + if not reviewers: + raise ValueError("'coding-review-fix' preset requires at least 1 reviewer") + + return [ + PhaseConfig( + name="initial_coding", + steps=[ + StepConfig( + name="coding", + agent=coders[0], + role="coding", + prompt_template="default:coding", + output_key="coding_output", + ), + ], + max_iterations=1, + consecutive_pass=1, + ), + *_build_review_fix_preset(coders, reviewers, seniors), + ] + + PIPELINE_PRESETS: dict[str, Callable] = { "simple": _build_simple_preset, "cross-review": _build_cross_review_preset, + "plan-review": _build_plan_review_preset, "review-only": _build_review_only_preset, } PHASED_PRESETS: dict[str, Callable] = { "review-fix": _build_review_fix_preset, + "coding-review-fix": _build_coding_review_fix_preset, } ALL_PRESET_NAMES: list[str] = list(PIPELINE_PRESETS.keys()) + list(PHASED_PRESETS.keys()) @@ -805,7 +1063,7 @@ def resolve_template(template_ref: str, templates_dir: Optional[Path] = None) -> """Resolve a template reference to its content string. Formats: - - "default:generate" -> built-in GENERATE_TEMPLATE + - "default:coding" -> built-in CODING_TEMPLATE - "default:review" -> built-in REVIEW_TEMPLATE - "path/to/file.md" -> read file contents """ diff --git a/cross_eval/report.py b/cross_eval/report.py index fac30f2..9b29e2e 100644 --- a/cross_eval/report.py +++ b/cross_eval/report.py @@ -48,11 +48,16 @@ _STRINGS: dict[str, dict[str, str]] = { "pass_msg": "All checklist items satisfied. No over-engineering or omissions detected.", "fail_phased": "Pipeline phases ({phases}) completed without full convergence.", "fail_simple": "Maximum iterations ({max_iter}) reached without passing all checks.", + "escalate_msg": "Human review required. 
The following issues could not be resolved automatically:", + "escalate_title": "Escalation Report", + "issue_tracker_title": "Issue Tracker Summary", + "issue_tracker_desc": "Issues discovered across iterations and their final resolution status.", "metrics_title": "Review Metrics", "metrics_trend_title": "Metrics Trend", "metrics_iter": "Iter", "metrics_total_issues": "Total Issues", "metrics_na": "N/A", + "iteration_details": "Iteration Details", }, "ko": { "title": "교차 검증 리포트", @@ -84,11 +89,16 @@ _STRINGS: dict[str, dict[str, str]] = { "pass_msg": "모든 체크리스트 항목 충족. 과최적화/누락 없음.", "fail_phased": "파이프라인 페이즈 ({phases}) 완료, 완전한 수렴에 도달하지 못함.", "fail_simple": "최대 반복 횟수 ({max_iter})에 도달, 모든 검증을 통과하지 못함.", + "escalate_msg": "사람의 확인이 필요합니다. 아래 이슈는 자동으로 해결할 수 없었습니다:", + "escalate_title": "에스컬레이션 리포트", + "issue_tracker_title": "이슈 트래커 요약", + "issue_tracker_desc": "반복 과정에서 발견된 이슈와 최종 처리 상태입니다.", "metrics_title": "리뷰 메트릭", "metrics_trend_title": "메트릭 추이", "metrics_iter": "반복", "metrics_total_issues": "총 이슈", "metrics_na": "해당 없음", + "iteration_details": "반복 상세", }, } @@ -181,20 +191,41 @@ def _build_simple_report( out_of_scope_items: list[tuple[int, str]] = [] + # Pre-scan iterations to collect out-of-scope items and review metrics + # (needed before rendering final verdict / metrics sections) for iter_result in result.iterations: - lines.append("---\n") - lines.append(f"## {_t(config, 'iteration')} {iter_result.iteration}\n") + for step in config.pipeline: + output = iter_result.step_outputs.get(step.output_key, "") + if step.role == "review": + oos = _extract_out_of_scope(output) + if oos: + out_of_scope_items.append((iter_result.iteration, oos)) + step_metrics = parse_review_metrics(output) + if iter_result.review_metrics is None: + iter_result.review_metrics = step_metrics + else: + iter_result.review_metrics = _aggregate_metrics( + iter_result.review_metrics, step_metrics, + ) - _append_iteration_steps(lines, config, iter_result, config.pipeline, out_of_scope_items) + 
_append_final_verdict(lines, config, result) + _append_issue_tracker_summary(lines, config, result) + _append_review_metrics_table(lines, config, result) + + lines.append("---\n") + lines.append(f"## {_t(config, 'iteration_details')}\n") + + for iter_result in result.iterations: + lines.append(f"### {_t(config, 'iteration')} {iter_result.iteration}\n") + + _append_iteration_steps(lines, config, iter_result, config.pipeline, out_of_scope_items, skip_extraction=True) if iter_result.feedback: lines.append(f"**{_t(config, 'feedback_next')}** {iter_result.feedback[:200]}...") lines.append("") _append_out_of_scope(lines, config, out_of_scope_items) - _append_review_metrics_table(lines, config, result) _append_repeated_aggregate(lines, config, result) - _append_final_verdict(lines, config, result) return "\n".join(lines) @@ -211,14 +242,42 @@ def _build_phased_report( phase_map = {p.name: p for p in config.phases} out_of_scope_items: list[tuple[int, str]] = [] + # Pre-scan iterations to collect out-of-scope items and review metrics + for phase_name, phase_iters_iter in groupby( + result.iterations, key=lambda ir: ir.phase_name, + ): + phase_iters = list(phase_iters_iter) + phase_config = phase_map.get(phase_name or "") + steps = phase_config.steps if phase_config else config.pipeline + for iter_result in phase_iters: + for step in steps: + output = iter_result.step_outputs.get(step.output_key, "") + if step.role == "review": + oos = _extract_out_of_scope(output) + if oos: + out_of_scope_items.append((iter_result.iteration, oos)) + step_metrics = parse_review_metrics(output) + if iter_result.review_metrics is None: + iter_result.review_metrics = step_metrics + else: + iter_result.review_metrics = _aggregate_metrics( + iter_result.review_metrics, step_metrics, + ) + + _append_final_verdict(lines, config, result) + _append_issue_tracker_summary(lines, config, result) + _append_review_metrics_table(lines, config, result) + + lines.append("---\n") + lines.append(f"## 
{_t(config, 'iteration_details')}\n") + for phase_name, phase_iters_iter in groupby( result.iterations, key=lambda ir: ir.phase_name, ): phase_iters = list(phase_iters_iter) phase_config = phase_map.get(phase_name or "") - lines.append("---\n") - lines.append(f"## {_t(config, 'phase')}: {phase_name}\n") + lines.append(f"### {_t(config, 'phase')}: {phase_name}\n") if phase_config: step_desc = " → ".join(s.name for s in phase_config.steps) @@ -242,14 +301,17 @@ def _build_phased_report( verdict_label += " ✓" else: verdict_label = " — PASS ✓" + elif iter_result.verdict == "ESCALATE": + consecutive = 0 + verdict_label = " — ESCALATE" else: consecutive = 0 verdict_label = " — FAIL" lines.append( - f"### {_t(config, 'iteration')} {iter_result.iteration}{verdict_label}\n" + f"#### {_t(config, 'iteration')} {iter_result.iteration}{verdict_label}\n" ) - _append_iteration_steps(lines, config, iter_result, steps, out_of_scope_items) + _append_iteration_steps(lines, config, iter_result, steps, out_of_scope_items, skip_extraction=True) if iter_result.feedback: lines.append( @@ -258,9 +320,7 @@ def _build_phased_report( lines.append("") _append_out_of_scope(lines, config, out_of_scope_items) - _append_review_metrics_table(lines, config, result) _append_repeated_aggregate(lines, config, result) - _append_final_verdict(lines, config, result) return "\n".join(lines) @@ -309,8 +369,14 @@ def _append_iteration_steps( iter_result: IterationResult, steps: list[StepConfig], out_of_scope_items: list[tuple[int, str]], + *, + skip_extraction: bool = False, ) -> None: - """Append step details for one iteration.""" + """Append step details for one iteration. + + If *skip_extraction* is True, out-of-scope and review-metrics parsing + is skipped (useful when a pre-scan already collected that data). 
+ """ for step in steps: agent_result = iter_result.step_results.get(step.output_key) output = iter_result.step_outputs.get(step.output_key, "") @@ -334,7 +400,7 @@ def _append_iteration_steps( lines.append(output) lines.append("") - if step.role == "review": + if not skip_extraction and step.role == "review": oos = _extract_out_of_scope(output) if oos: out_of_scope_items.append((iter_result.iteration, oos)) @@ -471,6 +537,12 @@ def _append_final_verdict( if result.final_verdict == "PASS": lines.append(_t(config, "pass_msg")) + elif result.final_verdict == "ESCALATE": + lines.append(_t(config, "escalate_msg")) + lines.append("") + for issue in result.escalated_issues: + lines.append(f"- {issue}") + lines.append("") else: if config.phases: phase_names = " → ".join(p.name for p in config.phases) @@ -481,6 +553,121 @@ def _append_final_verdict( ) +# --------------------------------------------------------------------------- +# Issue Tracker extraction from senior/aggregate outputs +# --------------------------------------------------------------------------- + +_ISSUE_TRACKER_PATTERN = re.compile( + r"##+ (?:Issue Tracker|이슈 트래커)[^\n]*\n((?:\|[^\n]+\|\n?)+)", + re.DOTALL, +) + +_TRACKER_ROW_PATTERN = re.compile( + r"^\|\s*(ISS-\d+)\s*\|\s*(\S+)\s*\|\s*(.*?)\s*\|\s*(\S+)\s*\|\s*(\S+)\s*\|", + re.MULTILINE, +) + + +def _extract_issue_tracker_rows( + result: PipelineResult, +) -> list[dict[str, str]]: + """Extract the latest Issue Tracker table from pipeline results. + + Scans iteration outputs in reverse to find the most recent tracker table + from aggregate/senior review steps. Falls back to parsing individual + review outputs for ISS-NNN tagged issues. 
+ """ + # Try to find a tracker table from the last iteration with one + for ir in reversed(result.iterations): + for key, output in ir.step_outputs.items(): + match = _ISSUE_TRACKER_PATTERN.search(output) + if not match: + continue + table_text = match.group(1) + rows = [] + for row_match in _TRACKER_ROW_PATTERN.finditer(table_text): + rows.append({ + "id": row_match.group(1), + "severity": row_match.group(2), + "description": row_match.group(3).strip(), + "status": row_match.group(4), + "since": row_match.group(5), + }) + if rows: + return rows + + # Fallback: parse ISS-NNN from review outputs across iterations + seen: dict[str, dict[str, str]] = {} + for ir in result.iterations: + for key, output in ir.step_outputs.items(): + for m in re.finditer( + r"(ISS-\d+)\s*\[(\w+)\]\[.*?\]\s*(.*?)(?:\n|$)", output, + ): + iss_id = m.group(1) + if iss_id not in seen: + seen[iss_id] = { + "id": iss_id, + "severity": m.group(2), + "description": m.group(3).strip()[:80], + "status": "Open", + "since": f"v{ir.iteration}", + } + return list(seen.values()) + + +def _append_issue_tracker_summary( + lines: list[str], + config: PipelineConfig, + result: PipelineResult, +) -> None: + """Append a consolidated issue tracker table to the report.""" + rows = _extract_issue_tracker_rows(result) + if not rows: + return + + lines.append("---\n") + lines.append(f"## {_t(config, 'issue_tracker_title')}\n") + lines.append(f"{_t(config, 'issue_tracker_desc')}\n") + + lang = getattr(config, "language", "en") + if lang == "ko": + lines.append("| ISS-ID | 심각도 | 설명 | 상태 | 최초 발견 |") + else: + lines.append("| ISS-ID | Severity | Description | Status | Since |") + lines.append("|--------|----------|-------------|--------|-------|") + + for row in rows: + lines.append( + f"| {row['id']} | {row['severity']} " + f"| {row['description']} | {row['status']} | {row['since']} |" + ) + lines.append("") + + +def print_escalation_report( + config: PipelineConfig, + result: PipelineResult, +) -> None: + """Print 
a prominent ANSI-colored escalation report to the terminal.""" + RED = "\033[31m" + YELLOW = "\033[33m" + BOLD = "\033[1m" + RESET = "\033[0m" + + title = _t(config, "escalate_title") + msg = _t(config, "escalate_msg") + + print(f"\n{RED}{BOLD}{'=' * 60}") + print(f" {title}") + print(f"{'=' * 60}{RESET}\n") + print(f"{YELLOW}{msg}{RESET}\n") + + for issue in result.escalated_issues: + print(f" {RED}•{RESET} {issue}") + + print(f"\n{RED}{BOLD}{'=' * 60}{RESET}\n") + + def _append_repeated_aggregate( lines: list[str], config: PipelineConfig, diff --git a/tests/test_config.py b/tests/test_config.py index 7a4cb1a..ba61b92 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,19 +1,25 @@ from __future__ import annotations +import tempfile import unittest +from pathlib import Path from unittest.mock import patch -from cross_eval.agent import _supports_reasoning_effort +from cross_eval.agent import AgentInvocationError, _supports_reasoning_effort +from cross_eval.cli import _apply_phased_iteration_override from cross_eval.agent import invoke_agent from cross_eval.config import ( BUILTIN_AGENTS, _default_seniors_for_preset, apply_reasoning_effort_settings, normalize_reasoning_effort, + normalize_prompt_template, + normalize_step_role, validate_config, ) from cross_eval.models import ( AgentConfig, + AgentResult, IterationResult, PhaseConfig, PipelineConfig, @@ -21,25 +27,53 @@ from cross_eval.models import ( ReviewMetrics, StepConfig, ) -from cross_eval.pipeline import _detect_repeated_aggregate +from cross_eval.pipeline import ( + _detect_auto_escalate, + _detect_repeated_aggregate, + _execute_parallel_batch, + _extract_senior_tracker, + _extract_verdict, +) from cross_eval.prompts import ( - GENERATE_TEMPLATE, - GENERATE_TEMPLATE_KO, + CODING_TEMPLATE, + CODING_TEMPLATE_KO, REVIEW_TEMPLATE, REVIEW_TEMPLATE_KO, + PLAN_REVIEW_TEMPLATE, + PLAN_REVIEW_TEMPLATE_KO, REVIEW_ONLY_TEMPLATE, REVIEW_ONLY_TEMPLATE_KO, AGGREGATE_REVIEW_TEMPLATE, 
AGGREGATE_REVIEW_TEMPLATE_KO, _build_cross_review_preset, + _build_coding_review_fix_preset, + _build_plan_review_preset, _build_review_fix_preset, _build_review_only_preset, _build_simple_preset, ) -from cross_eval.report import build_report, parse_review_metrics - +from cross_eval.config import _SENIOR_SYSTEM_PROMPT +from cross_eval.report import build_report, parse_review_metrics, print_escalation_report class BuiltinAgentConfigTest(unittest.TestCase): + def test_claude_builtin_agents_use_user_settings_and_disable_slash_commands(self) -> None: + for agent_name in ("claude-coder", "claude-reviewer", "claude-senior"): + with self.subTest(agent=agent_name): + args = BUILTIN_AGENTS[agent_name].args + self.assertIn("--setting-sources", args) + self.assertIn("user", args) + self.assertIn("--disable-slash-commands", args) + + def test_claude_builtin_agents_use_role_specific_permission_modes(self) -> None: + coder_args = BUILTIN_AGENTS["claude-coder"].args + reviewer_args = BUILTIN_AGENTS["claude-reviewer"].args + senior_args = BUILTIN_AGENTS["claude-senior"].args + + self.assertIn("--dangerously-skip-permissions", coder_args) + self.assertIn("bypassPermissions", coder_args) + self.assertIn("plan", reviewer_args) + self.assertIn("plan", senior_args) + def test_codex_builtin_agents_skip_git_repo_check(self) -> None: for agent_name in ("codex-coder", "codex-reviewer", "codex-senior"): with self.subTest(agent=agent_name): @@ -62,6 +96,10 @@ class BuiltinAgentConfigTest(unittest.TestCase): self.assertEqual(normalize_reasoning_effort("extra_high"), "xhigh") self.assertEqual(normalize_reasoning_effort("x-high"), "xhigh") + def test_normalize_step_role_and_template_aliases(self) -> None: + self.assertEqual(normalize_step_role("coding"), "coding") + self.assertEqual(normalize_prompt_template("default:coding"), "default:coding") + def test_apply_reasoning_effort_settings_uses_defaults_and_role_overrides(self) -> None: config = PipelineConfig( agents={ @@ -116,6 +154,123 @@ class 
BuiltinAgentConfigTest(unittest.TestCase): ["codex", "-c", 'model_reasoning_effort="high"'], ) + def test_invoke_agent_classifies_auth_failures(self) -> None: + def _fake_run(cmd, **kwargs): + class _Result: + returncode = 1 + stdout = "" + stderr = "Not logged in · Please run /login" + + return _Result() + + agent = AgentConfig( + name="claude-reviewer", + command="claude", + args=["-p", "--model", "opus"], + ) + + with patch("subprocess.run", side_effect=_fake_run): + with self.assertRaises(AgentInvocationError) as ctx: + invoke_agent(agent, "prompt", "review", quiet=True) + + self.assertEqual(ctx.exception.failure_type, "AUTH") + self.assertIn("Re-authenticate", ctx.exception.suggested_action) + + def test_invoke_agent_classifies_usage_limit_failures(self) -> None: + def _fake_run(cmd, **kwargs): + class _Result: + returncode = 1 + stdout = "" + stderr = "API Error: 429 rate limit exceeded for current quota" + + return _Result() + + agent = AgentConfig( + name="codex-reviewer", + command="codex", + args=["exec", "--model", "gpt-5.4", "-"], + ) + + with patch("subprocess.run", side_effect=_fake_run): + with self.assertRaises(AgentInvocationError) as ctx: + invoke_agent(agent, "prompt", "review", quiet=True) + + self.assertEqual(ctx.exception.failure_type, "USAGE_LIMIT") + self.assertIn("quota", ctx.exception.suggested_action) + + def test_parallel_batch_saves_successes_before_failure(self) -> None: + config = PipelineConfig( + agents={ + "ok-reviewer": AgentConfig(name="ok-reviewer", command="codex"), + "bad-reviewer": AgentConfig(name="bad-reviewer", command="claude"), + }, + ) + steps = [ + StepConfig( + name="review_ok", + agent="ok-reviewer", + role="review", + prompt_template="default:review-only", + output_key="review_ok", + parallel=True, + ), + StepConfig( + name="review_bad", + agent="bad-reviewer", + role="review", + prompt_template="default:review-only", + output_key="review_bad", + parallel=True, + ), + ] + step_outputs: dict[str, str] = {} + 
step_results: dict[str, AgentResult] = {} + + def _fake_invoke(agent, prompt, step_name, **kwargs): + if step_name == "review_ok": + return AgentResult( + output="VERDICT: PASS", + exit_code=0, + agent_name=agent.name, + step_name=step_name, + duration_seconds=1.0, + ) + raise AgentInvocationError( + agent_name=agent.name, + step_name=step_name, + cmd_preview="claude -p ...", + raw_error="API Error: 429 rate limit exceeded for current quota", + failure_type="USAGE_LIMIT", + suggested_action="Agent CLI hit a quota, billing, or token budget limit. Refill or raise the limit, then rerun.", + ) + + with tempfile.TemporaryDirectory() as tmpdir: + with patch("cross_eval.pipeline.invoke_agent", side_effect=_fake_invoke): + with self.assertRaises(RuntimeError) as ctx: + _execute_parallel_batch( + steps, + config, + input_contents={}, + feedback="", + iteration=1, + max_iterations=3, + cwd=Path(tmpdir), + timeout=None, + dry_run=False, + step_outputs=step_outputs, + step_results=step_results, + run_dir=Path(tmpdir), + output_iter=1, + ) + + self.assertIn("Successful outputs were saved for: review_ok", str(ctx.exception)) + self.assertEqual(step_outputs["review_ok"], "VERDICT: PASS") + self.assertTrue((Path(tmpdir) / "v1" / "review_ok.md").exists()) + error_path = Path(tmpdir) / "v1" / "review_bad_error.md" + self.assertTrue(error_path.exists()) + self.assertIn("Failure Type", error_path.read_text(encoding="utf-8")) + self.assertIn("USAGE_LIMIT", error_path.read_text(encoding="utf-8")) + def test_detect_repeated_aggregate_warns_on_same_output(self) -> None: steps = [ StepConfig( @@ -169,6 +324,14 @@ class BuiltinAgentConfigTest(unittest.TestCase): ), ["claude-senior"], ) + self.assertEqual( + _default_seniors_for_preset( + "preset:coding-review-fix", + ["codex-reviewer"], + BUILTIN_AGENTS, + ), + ["codex-senior"], + ) self.assertEqual( _default_seniors_for_preset( "preset:simple", @@ -204,9 +367,37 @@ class BuiltinAgentConfigTest(unittest.TestCase): ) self.assertEqual( 
[step.name for step in converge.steps[3:]], - ["aggregate_review", "generate", "verify"], + ["aggregate_review", "coding", "verify"], ) + def test_coding_review_fix_starts_with_single_coding_phase(self) -> None: + phases = _build_coding_review_fix_preset( + ["codex-coder"], + ["claude-reviewer", "codex-reviewer"], + ["codex-senior"], + ) + + self.assertEqual([phase.name for phase in phases], ["initial_coding", "review_fix"]) + self.assertEqual(phases[0].max_iterations, 1) + self.assertEqual([step.name for step in phases[0].steps], ["coding"]) + self.assertEqual([step.name for step in phases[1].steps[2:]], ["aggregate_review", "coding", "verify"]) + + def test_apply_phased_iteration_override_updates_only_verdict_phases(self) -> None: + config = PipelineConfig( + phases=_build_coding_review_fix_preset( + ["codex-coder"], + ["codex-reviewer"], + ["codex-senior"], + ), + ) + + _apply_phased_iteration_override(config, 10) + + self.assertEqual(config.phases[0].name, "initial_coding") + self.assertEqual(config.phases[0].max_iterations, 1) + self.assertEqual(config.phases[1].name, "review_fix") + self.assertEqual(config.phases[1].max_iterations, 10) + def test_review_only_duplicate_reviewers_get_unique_step_keys(self) -> None: steps = _build_review_only_preset( ["codex-coder"], @@ -219,6 +410,31 @@ class BuiltinAgentConfigTest(unittest.TestCase): ["review_codex_reviewer", "review_codex_reviewer_2"], ) + def test_plan_review_duplicate_reviewers_get_unique_step_keys(self) -> None: + steps = _build_plan_review_preset( + ["codex-coder"], + ["codex-reviewer", "codex-reviewer"], + [], + ) + + self.assertEqual( + [step.output_key for step in steps], + ["plan_review_codex_reviewer", "plan_review_codex_reviewer_2"], + ) + + def test_plan_review_with_senior_adds_aggregate_step(self) -> None: + steps = _build_plan_review_preset( + ["codex-coder"], + ["claude-reviewer", "codex-reviewer"], + ["claude-senior"], + ) + + self.assertEqual(steps[-1].name, "senior_review") + 
self.assertEqual(steps[-1].agent, "claude-senior") + self.assertTrue(steps[-1].verdict) + self.assertFalse(steps[0].verdict) + self.assertFalse(steps[1].verdict) + def test_cross_review_duplicate_coders_get_unique_step_keys(self) -> None: steps = _build_cross_review_preset( ["codex-coder", "codex-coder"], @@ -246,7 +462,7 @@ class BuiltinAgentConfigTest(unittest.TestCase): steps = phases[0].steps self.assertEqual(steps[2].name, "aggregate_review") self.assertEqual(steps[2].agent, "codex-senior") - self.assertEqual(steps[3].name, "generate") + self.assertEqual(steps[3].name, "coding") self.assertEqual(steps[4].name, "verify") self.assertEqual(steps[4].agent, "codex-senior") self.assertTrue(steps[4].verdict) @@ -273,7 +489,7 @@ class BuiltinAgentConfigTest(unittest.TestCase): self.assertEqual( [step.name for step in steps], - ["generate", "review", "senior_review"], + ["coding", "review", "senior_review"], ) self.assertFalse(steps[1].verdict) self.assertTrue(steps[2].verdict) @@ -325,6 +541,8 @@ class PromptTemplateTest(unittest.TestCase): for tmpl, label in [ (REVIEW_TEMPLATE, "REVIEW_TEMPLATE"), (REVIEW_TEMPLATE_KO, "REVIEW_TEMPLATE_KO"), + (PLAN_REVIEW_TEMPLATE, "PLAN_REVIEW_TEMPLATE"), + (PLAN_REVIEW_TEMPLATE_KO, "PLAN_REVIEW_TEMPLATE_KO"), (REVIEW_ONLY_TEMPLATE, "REVIEW_ONLY_TEMPLATE"), (REVIEW_ONLY_TEMPLATE_KO, "REVIEW_ONLY_TEMPLATE_KO"), ]: @@ -351,10 +569,10 @@ class PromptTemplateTest(unittest.TestCase): self.assertIn("CONFIRMED", tmpl) self.assertIn("DISMISSED", tmpl) - def test_generate_templates_ignore_dismissed(self) -> None: - """Generate templates should tell coder to ignore DISMISSED items.""" - self.assertIn("DISMISSED", GENERATE_TEMPLATE) - self.assertIn("DISMISSED", GENERATE_TEMPLATE_KO) + def test_coding_templates_ignore_dismissed(self) -> None: + """Coding templates should tell coder to ignore DISMISSED items.""" + self.assertIn("DISMISSED", CODING_TEMPLATE) + self.assertIn("DISMISSED", CODING_TEMPLATE_KO) def 
test_aggregate_templates_dismissed_structure(self) -> None: """Aggregate templates should use [False positive] / [Already fixed] tags.""" @@ -487,11 +705,11 @@ class ReviewMetricsParsingTest(unittest.TestCase): language="en", pipeline=[ StepConfig( - name="generate", + name="coding", agent="claude-coder", - role="generate", - prompt_template="default:generate", - output_key="generated_code", + role="coding", + prompt_template="default:coding", + output_key="coding_output", verdict=True, ), ], @@ -500,7 +718,7 @@ class ReviewMetricsParsingTest(unittest.TestCase): iterations=[ IterationResult( iteration=1, - step_outputs={"generated_code": "some code"}, + step_outputs={"coding_output": "some code"}, verdict="PASS", ), ], @@ -511,5 +729,230 @@ class ReviewMetricsParsingTest(unittest.TestCase): self.assertNotIn("Review Metrics", report) +class EscalateVerdictTest(unittest.TestCase): + """Test ESCALATE verdict functionality.""" + + def test_extract_verdict_escalate(self) -> None: + output = "Some review content\n\nVERDICT: ESCALATE\n" + result = _extract_verdict(output, r"VERDICT:\s*PASS") + self.assertEqual(result, "ESCALATE") + + def test_extract_verdict_escalate_priority(self) -> None: + """ESCALATE should take priority even if PASS pattern also matches.""" + output = "VERDICT: PASS\n\nVERDICT: ESCALATE\n" + result = _extract_verdict(output, r"VERDICT:\s*PASS") + self.assertEqual(result, "ESCALATE") + + def test_extract_verdict_pass_still_works(self) -> None: + output = "All good\n\nVERDICT: PASS\n" + result = _extract_verdict(output, r"VERDICT:\s*PASS") + self.assertEqual(result, "PASS") + + def test_extract_verdict_fail_still_works(self) -> None: + output = "Issues found\n\nVERDICT: FAIL\n" + result = _extract_verdict(output, r"VERDICT:\s*PASS") + self.assertEqual(result, "FAIL") + + def test_extract_senior_tracker(self) -> None: + output = ( + "Some text\n\n" + "## Issue Tracker\n" + "| ISS-ID | Severity | Description | Status | Since |\n" + 
"|--------|----------|-------------|--------|-------|\n" + "| ISS-001 | Critical | Missing auth | Open | v1 |\n" + "| ISS-002 | Major | Bad naming | Fixed | v1 |\n" + "\nMore text" + ) + tracker = _extract_senior_tracker(output) + self.assertIn("Issue Tracker", tracker) + self.assertIn("ISS-001", tracker) + self.assertIn("ISS-002", tracker) + + def test_extract_senior_tracker_empty(self) -> None: + output = "No tracker table here" + tracker = _extract_senior_tracker(output) + self.assertEqual(tracker, "") + + def test_auto_escalate_heuristic(self) -> None: + prev1 = "Issue in src/auth.py: missing validation" + prev2 = "Issue in src/auth.py: validation still missing" + current = "Issue in src/auth.py: validation not implemented" + + # Should detect repeated issue + self.assertTrue(_detect_auto_escalate([prev1, prev2], current, threshold=2)) + + def test_auto_escalate_no_repeat(self) -> None: + prev1 = "Issue in src/auth.py: missing validation" + current = "Issue in src/database.py: connection pool" + + self.assertFalse(_detect_auto_escalate([prev1], current, threshold=2)) + + def test_auto_escalate_different_issues_same_file(self) -> None: + """Same file path but different issues should NOT trigger escalation.""" + prev1 = "Issue in src/utils.py: missing validation on input" + prev2 = "Issue in src/utils.py: unused import at top of file" + current = "Issue in src/utils.py: error handling not implemented" + + # All mention src/utils.py, but the issue keywords differ across + # iterations, so this should NOT escalate. 
+ self.assertFalse(_detect_auto_escalate([prev1, prev2], current, threshold=2)) + + def test_report_escalate_verdict(self) -> None: + config = PipelineConfig(language="en") + result = PipelineResult( + final_verdict="ESCALATE", + escalated_issues=["Requirements are ambiguous — need stakeholder input"], + ) + + report = build_report(config, result) + + self.assertIn("ESCALATE", report) + self.assertIn("Human review required", report) + self.assertIn("ambiguous", report) + + def test_report_escalate_verdict_ko(self) -> None: + config = PipelineConfig(language="ko") + result = PipelineResult( + final_verdict="ESCALATE", + escalated_issues=["요구사항이 모호함"], + ) + + report = build_report(config, result) + + self.assertIn("ESCALATE", report) + self.assertIn("사람의 확인이 필요합니다", report) + + def test_exit_code_escalate(self) -> None: + from cross_eval.cli import main + + mock_result = PipelineResult( + final_verdict="ESCALATE", + escalated_issues=["Needs human review"], + ) + + with patch("cross_eval.config.load_config") as mock_load, \ + patch("cross_eval.config.validate_config", return_value=[]), \ + patch("cross_eval.pipeline.run_pipeline", return_value=mock_result), \ + patch("cross_eval.report.print_escalation_report"): + mock_config = PipelineConfig( + pipeline=[ + StepConfig( + name="review", + agent="claude-reviewer", + role="review", + prompt_template="default:review", + output_key="review_result", + verdict=True, + ), + ], + agents=dict(BUILTIN_AGENTS), + coders=["claude-coder"], + reviewers=["claude-reviewer"], + inputs={"plan": Path("/tmp/plan.md")}, + language="en", + max_iterations=3, + preset_name="simple", + ) + mock_load.return_value = mock_config + + with tempfile.NamedTemporaryFile(suffix=".yaml", mode="w") as f: + f.write("inputs:\n plan: /tmp/plan.md\n") + f.flush() + exit_code = main(["run", "-c", f.name]) + + self.assertEqual(exit_code, 2) + + def test_senior_prompt_includes_escalate(self) -> None: + self.assertIn("ESCALATE", _SENIOR_SYSTEM_PROMPT) + 
self.assertIn("ambiguous", _SENIOR_SYSTEM_PROMPT.lower()) + + def test_aggregate_template_has_tracker(self) -> None: + self.assertIn("{previous_senior_tracker}", AGGREGATE_REVIEW_TEMPLATE) + self.assertIn("Issue Tracker", AGGREGATE_REVIEW_TEMPLATE) + self.assertIn("VERDICT: ESCALATE", AGGREGATE_REVIEW_TEMPLATE) + + def test_report_includes_issue_tracker_summary(self) -> None: + config = PipelineConfig( + language="en", + pipeline=[ + StepConfig( + name="review", + agent="claude-reviewer", + role="review", + prompt_template="default:review", + output_key="review_result", + verdict=True, + ), + ], + ) + result = PipelineResult( + iterations=[ + IterationResult( + iteration=1, + step_outputs={ + "review_result": ( + "### Issues Found\n" + "- ISS-001 [Critical][Omission] Missing auth check\n" + "- ISS-002 [Major][Omission] No input validation\n" + "### Verdict\nVERDICT: FAIL" + ), + }, + verdict="FAIL", + ), + ], + final_verdict="FAIL", + ) + + report = build_report(config, result) + self.assertIn("Issue Tracker Summary", report) + self.assertIn("ISS-001", report) + self.assertIn("ISS-002", report) + + def test_report_includes_senior_tracker_table(self) -> None: + config = PipelineConfig( + language="en", + pipeline=[ + StepConfig( + name="senior_review", + agent="claude-senior", + role="review", + prompt_template="default:aggregate-review", + output_key="senior_review_result", + verdict=True, + ), + ], + ) + result = PipelineResult( + iterations=[ + IterationResult( + iteration=1, + step_outputs={ + "senior_review_result": ( + "### Confirmed Issues\n- Missing auth\n\n" + "## Issue Tracker\n" + "| ISS-ID | Severity | Description | Status | Since |\n" + "|--------|----------|-------------|--------|-------|\n" + "| ISS-001 | Critical | Missing auth check | Open | v1 |\n" + "| ISS-002 | Major | No validation | Fixed | v1 |\n" + "\n### Verdict\nVERDICT: FAIL" + ), + }, + verdict="FAIL", + ), + ], + final_verdict="FAIL", + ) + + report = build_report(config, result) + 
self.assertIn("Issue Tracker Summary", report) + self.assertIn("ISS-001", report) + self.assertIn("Fixed", report) + + def test_aggregate_template_ko_has_tracker(self) -> None: + self.assertIn("{previous_senior_tracker}", AGGREGATE_REVIEW_TEMPLATE_KO) + self.assertIn("이슈 트래커", AGGREGATE_REVIEW_TEMPLATE_KO) + self.assertIn("VERDICT: ESCALATE", AGGREGATE_REVIEW_TEMPLATE_KO) + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_onboarding.py b/tests/test_onboarding.py new file mode 100644 index 0000000..4596cb0 --- /dev/null +++ b/tests/test_onboarding.py @@ -0,0 +1,267 @@ +"""Tests for doctor, demo, and guided init features.""" +from __future__ import annotations + +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch, MagicMock + +from cross_eval.doctor import ( + DoctorCheck, + check_cli_installed, + check_config, + format_doctor_results, + run_doctor, +) +from cross_eval.demo import ( + DEMO_CHECKLIST, + DEMO_PLAN, + run_mock_demo, +) +from cross_eval.cli import ( + _generate_guided_config, + _prompt_choice, + _prompt_text, + main, +) + + +# --------------------------------------------------------------------------- +# Doctor tests +# --------------------------------------------------------------------------- + +class DoctorCheckInstalledTest(unittest.TestCase): + def test_check_cli_installed_found(self) -> None: + with patch("cross_eval.doctor.shutil.which", return_value="/usr/bin/python3"): + with patch("cross_eval.doctor.subprocess.run") as mock_run: + mock_run.return_value = MagicMock( + stdout="Python 3.12.0", stderr="" + ) + found, version = check_cli_installed("python3") + + self.assertTrue(found) + self.assertIn("Python", version) + + def test_check_cli_installed_not_found(self) -> None: + with patch("cross_eval.doctor.shutil.which", return_value=None): + found, msg = check_cli_installed("nonexistent-tool") + + self.assertFalse(found) + self.assertIn("not found", msg) + + def 
test_check_config_exists_valid(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + ce_dir = Path(tmpdir) / ".cross-eval" + ce_dir.mkdir() + config_path = ce_dir / "config.yaml" + config_path.write_text( + "inputs:\n plan: plan.md\ncoders: [claude-coder]\n" + "reviewers: [claude-reviewer]\npipeline: preset:simple\n", + encoding="utf-8", + ) + # Also create plan.md so validation passes + (ce_dir / "plan.md").write_text("# Plan", encoding="utf-8") + + ok, path, errors = check_config(Path(tmpdir)) + + self.assertTrue(ok) + self.assertIsNotNone(path) + self.assertEqual(errors, []) + + def test_check_config_not_exists(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + ok, path, errors = check_config(Path(tmpdir)) + + self.assertFalse(ok) + self.assertIsNone(path) + + def test_check_config_invalid(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + ce_dir = Path(tmpdir) / ".cross-eval" + ce_dir.mkdir() + # Valid YAML but missing required fields → validation fails + (ce_dir / "config.yaml").write_text( + "inputs:\n plan: /nonexistent/plan.md\n", + encoding="utf-8", + ) + + ok, path, errors = check_config(Path(tmpdir)) + + self.assertFalse(ok) + self.assertIsNotNone(path) + + def test_format_doctor_results_all_pass(self) -> None: + checks = [ + DoctorCheck("test", True, True, "ok"), + DoctorCheck("test2", True, False, "ok"), + ] + output = format_doctor_results(checks) + self.assertIn("✓", output) + self.assertIn("All checks passed", output) + + def test_format_doctor_results_critical_fail(self) -> None: + checks = [ + DoctorCheck("claude CLI", False, True, "not found"), + ] + output = format_doctor_results(checks) + self.assertIn("✗", output) + self.assertIn("critical", output.lower()) + + def test_cmd_doctor_returns_0_all_pass(self) -> None: + with patch("cross_eval.doctor.run_doctor") as mock: + mock.return_value = [ + DoctorCheck("test", True, True, "ok"), + ] + exit_code = main(["doctor"]) + self.assertEqual(exit_code, 0) + + def 
test_cmd_doctor_returns_1_critical_fail(self) -> None: + with patch("cross_eval.doctor.run_doctor") as mock: + mock.return_value = [ + DoctorCheck("claude CLI", False, True, "not found"), + ] + exit_code = main(["doctor"]) + self.assertEqual(exit_code, 1) + + +# --------------------------------------------------------------------------- +# Demo tests +# --------------------------------------------------------------------------- + +class DemoTest(unittest.TestCase): + def test_demo_plan_is_nonempty(self) -> None: + self.assertIn("fibonacci", DEMO_PLAN.lower()) + + def test_demo_checklist_is_nonempty(self) -> None: + self.assertIn("fibonacci", DEMO_CHECKLIST.lower()) + + def test_mock_demo_runs_without_error(self) -> None: + # Should not raise + with patch("sys.stdout"): + run_mock_demo(preset="simple") + + def test_mock_demo_escalate_runs_without_error(self) -> None: + with patch("sys.stdout"): + run_mock_demo(preset="simple", show_escalate=True) + + def test_cmd_demo_mock_default(self) -> None: + with patch("cross_eval.demo.run_mock_demo") as mock: + exit_code = main(["demo"]) + mock.assert_called_once_with(preset="simple", show_escalate=False) + self.assertEqual(exit_code, 0) + + def test_cmd_demo_escalate_flag(self) -> None: + with patch("cross_eval.demo.run_mock_demo") as mock: + exit_code = main(["demo", "--escalate"]) + mock.assert_called_once_with(preset="simple", show_escalate=True) + self.assertEqual(exit_code, 0) + + def test_cmd_demo_live_requires_confirmation(self) -> None: + with patch("builtins.input", return_value="n"): + exit_code = main(["demo", "--live"]) + self.assertEqual(exit_code, 0) + + +# --------------------------------------------------------------------------- +# Guided init tests +# --------------------------------------------------------------------------- + +class GuidedInitTest(unittest.TestCase): + def test_prompt_choice_default(self) -> None: + with patch("builtins.input", return_value=""): + result = _prompt_choice("Pick:", ["a", 
"b", "c"], default=2) + self.assertEqual(result, "b") + + def test_prompt_choice_by_number(self) -> None: + with patch("builtins.input", return_value="3"): + result = _prompt_choice("Pick:", ["a", "b", "c"], default=1) + self.assertEqual(result, "c") + + def test_prompt_choice_by_name(self) -> None: + with patch("builtins.input", return_value="simple"): + result = _prompt_choice("Pick:", ["simple", "review-fix"], default=1) + self.assertEqual(result, "simple") + + def test_prompt_text_default(self) -> None: + with patch("builtins.input", return_value=""): + result = _prompt_text("Name", default="claude") + self.assertEqual(result, "claude") + + def test_prompt_text_custom(self) -> None: + with patch("builtins.input", return_value="codex"): + result = _prompt_text("Name", default="claude") + self.assertEqual(result, "codex") + + def test_generate_guided_config(self) -> None: + config = _generate_guided_config( + "review-fix", "ko", + { + "coder": "claude", + "reviewer": "codex", + "senior": "codex", + "max_iter": 5, + }, + ) + self.assertIn("preset:review-fix", config) + self.assertIn("language: ko", config) + self.assertIn("claude-coder", config) + self.assertIn("codex-reviewer", config) + self.assertIn("codex-senior", config) + self.assertIn("max_iterations: 5", config) + + def test_generate_guided_config_full_name(self) -> None: + config = _generate_guided_config( + "simple", "ko", + { + "coder": "claude-coder", + "reviewer": "codex-reviewer", + "senior": "", + "max_iter": 3, + }, + ) + # Full names should not be double-suffixed + self.assertIn("claude-coder", config) + self.assertNotIn("claude-coder-coder", config) + self.assertIn("codex-reviewer", config) + self.assertNotIn("codex-reviewer-reviewer", config) + + def test_generate_guided_config_no_senior(self) -> None: + config = _generate_guided_config( + "simple", "en", + { + "coder": "claude", + "reviewer": "claude", + "senior": "", + "max_iter": 3, + }, + ) + self.assertNotIn("senior", config.lower()) + + 
def test_guided_init_creates_files(self) -> None: + # Simulate guided init with all defaults + inputs = iter(["", "", "", "", "", "", ""]) + with tempfile.TemporaryDirectory() as tmpdir: + with patch("builtins.input", side_effect=lambda _="": next(inputs, "")): + exit_code = main(["init", "--guided", "--dir", tmpdir]) + + config_path = Path(tmpdir) / ".cross-eval" / "config.yaml" + self.assertTrue(config_path.exists()) + self.assertEqual(exit_code, 0) + + def test_guided_init_preserves_existing_files(self) -> None: + inputs = iter(["", "", "", "", "", "", ""]) + with tempfile.TemporaryDirectory() as tmpdir: + ce_dir = Path(tmpdir) / ".cross-eval" + ce_dir.mkdir() + existing = ce_dir / "config.yaml" + existing.write_text("# existing", encoding="utf-8") + + with patch("builtins.input", side_effect=lambda _="": next(inputs, "")): + main(["init", "--guided", "--dir", tmpdir]) + + # Should not overwrite + self.assertEqual(existing.read_text(), "# existing") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_pipeline_integration.py b/tests/test_pipeline_integration.py new file mode 100644 index 0000000..f44eba2 --- /dev/null +++ b/tests/test_pipeline_integration.py @@ -0,0 +1,461 @@ +"""Integration tests for cross-eval pipeline with mocked agents.""" +from __future__ import annotations + +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch + +from cross_eval.config import BUILTIN_AGENTS +from cross_eval.models import ( + AgentConfig, + AgentResult, + PhaseConfig, + PipelineConfig, + StepConfig, +) +from cross_eval.pipeline import run_pipeline +from cross_eval.prompts import _build_review_fix_preset, _build_simple_preset + + +def _make_mock_agent(outputs: list[str]): + """Returns a side_effect function that returns outputs in sequence.""" + call_count = [0] + + def _mock(agent_config, prompt, step_name, **kwargs): + idx = min(call_count[0], len(outputs) - 1) + call_count[0] += 1 + return AgentResult( + 
output=outputs[idx], + exit_code=0, + agent_name=agent_config.name, + step_name=step_name, + duration_seconds=0.1, + ) + + return _mock + + +def _make_step_mock(step_outputs: dict[str, list[str]]): + """Returns a side_effect that dispatches by step_name, cycling through outputs.""" + counters: dict[str, int] = {} + + def _mock(agent_config, prompt, step_name, **kwargs): + if step_name not in counters: + counters[step_name] = 0 + outputs = step_outputs.get(step_name, [""]) + idx = min(counters[step_name], len(outputs) - 1) + counters[step_name] += 1 + return AgentResult( + output=outputs[idx], + exit_code=0, + agent_name=agent_config.name, + step_name=step_name, + duration_seconds=0.1, + ) + + return _mock + + +def _minimal_simple_config( + run_dir: Path, + max_iterations: int = 3, + seniors: list[str] | None = None, +) -> PipelineConfig: + """Build a minimal simple pipeline config for testing.""" + coders = ["claude-coder"] + reviewers = ["claude-reviewer"] + senior_list = seniors if seniors is not None else [] + steps = _build_simple_preset(coders, reviewers, senior_list) + agents = dict(BUILTIN_AGENTS) + return PipelineConfig( + output_dir=run_dir, + max_iterations=max_iterations, + min_iterations=1, + language="en", + inputs={"plan": "Test plan", "checklist": "Test checklist"}, + agents=agents, + coders=coders, + reviewers=reviewers, + seniors=senior_list, + pipeline=steps, + preset_name="simple", + ) + + +class TestSimplePipelinePassStopsLoop(unittest.TestCase): + """Test 1: mock agent returns VERDICT: PASS on first review -> stops at iteration 1.""" + + def test_simple_pipeline_pass_stops_loop(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + config = _minimal_simple_config(Path(tmpdir)) + + mock = _make_mock_agent([ + "Coding output here", # coding step + "All good\n\nVERDICT: PASS", # review step + ]) + + with patch("cross_eval.pipeline.invoke_agent", side_effect=mock): + result = run_pipeline(config) + + self.assertEqual(result.final_verdict, 
"PASS") + self.assertEqual(len(result.iterations), 1) + + +class TestSimplePipelineFailThenPass(unittest.TestCase): + """Test 2: FAIL on first review, PASS on second -> 2 iterations.""" + + def test_simple_pipeline_fail_then_pass(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + config = _minimal_simple_config(Path(tmpdir), max_iterations=5) + + mock = _make_step_mock({ + "coding": ["Coding output v1", "Coding output v2"], + "review": [ + "Issues found\n\nVERDICT: FAIL", + "All good\n\nVERDICT: PASS", + ], + }) + + with patch("cross_eval.pipeline.invoke_agent", side_effect=mock): + result = run_pipeline(config) + + self.assertEqual(result.final_verdict, "PASS") + self.assertEqual(len(result.iterations), 2) + + +class TestSimplePipelineEscalateBreaksLoop(unittest.TestCase): + """Test 3: ESCALATE on review -> stops immediately, final_verdict=ESCALATE.""" + + def test_simple_pipeline_escalate_breaks_loop(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + config = _minimal_simple_config( + Path(tmpdir), max_iterations=5, seniors=["claude-senior"], + ) + + escalate_output = ( + "### Confirmed Issues\n" + "- [Critical] Requirements are ambiguous\n\n" + "### Escalated Issues\n" + "Requirements need stakeholder clarification\n\n" + "### Verdict\n" + "VERDICT: ESCALATE\n" + ) + + mock = _make_step_mock({ + "coding": ["Coding output"], + "review": ["Issues found\n\nVERDICT: FAIL"], + "senior_review": [escalate_output], + }) + + with patch("cross_eval.pipeline.invoke_agent", side_effect=mock): + result = run_pipeline(config) + + self.assertEqual(result.final_verdict, "ESCALATE") + self.assertEqual(len(result.iterations), 1) + self.assertTrue(len(result.escalated_issues) > 0) + + +class TestSimplePipelineEscalatePriorityOverPass(unittest.TestCase): + """Test 4: one verdict step returns PASS, another returns ESCALATE -> ESCALATE wins.""" + + def test_simple_pipeline_escalate_priority_over_pass(self) -> None: + with tempfile.TemporaryDirectory() as 
tmpdir: + # Build a custom pipeline with 2 verdict steps (no senior) + steps = [ + StepConfig( + name="coding", + agent="claude-coder", + role="coding", + prompt_template="default:coding", + output_key="coding_output", + ), + StepConfig( + name="review_a", + agent="claude-reviewer", + role="review", + prompt_template="default:review", + output_key="review_a_result", + verdict=True, + ), + StepConfig( + name="review_b", + agent="claude-reviewer", + role="review", + prompt_template="default:review", + output_key="review_b_result", + verdict=True, + ), + ] + config = PipelineConfig( + output_dir=Path(tmpdir), + max_iterations=3, + min_iterations=1, + language="en", + inputs={"plan": "Test plan", "checklist": "Test checklist"}, + agents=dict(BUILTIN_AGENTS), + coders=["claude-coder"], + reviewers=["claude-reviewer"], + pipeline=steps, + preset_name="custom", + ) + + escalate_output = ( + "### Escalated Issues\n" + "Ambiguous requirements need clarification\n\n" + "VERDICT: ESCALATE\n" + ) + + mock = _make_step_mock({ + "coding": ["Coding output"], + "review_a": ["All good\n\nVERDICT: PASS"], + "review_b": [escalate_output], + }) + + with patch("cross_eval.pipeline.invoke_agent", side_effect=mock): + result = run_pipeline(config) + + self.assertEqual(result.final_verdict, "ESCALATE") + self.assertTrue(len(result.escalated_issues) > 0) + + +class TestPhasedPipelineEscalateBreaksPhase(unittest.TestCase): + """Test 5: phased pipeline (review-fix), verify step returns ESCALATE -> phase stops.""" + + def test_phased_pipeline_escalate_breaks_phase(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + coders = ["claude-coder"] + reviewers = ["claude-reviewer"] + seniors = ["claude-senior"] + phases = _build_review_fix_preset(coders, reviewers, seniors) + + config = PipelineConfig( + output_dir=Path(tmpdir), + max_iterations=5, + min_iterations=1, + language="en", + inputs={"plan": "Test plan", "checklist": "Test checklist"}, + agents=dict(BUILTIN_AGENTS), + 
coders=coders, + reviewers=reviewers, + seniors=seniors, + phases=phases, + preset_name="review-fix", + ) + + escalate_output = ( + "### Escalated Issues\n" + "Architecture decisions needed beyond plan scope\n\n" + "### Verdict\n" + "VERDICT: ESCALATE\n" + ) + + mock = _make_step_mock({ + "review_claude_reviewer": ["Review findings here"], + "aggregate_review": ["Aggregated review\n\nAction items: fix X"], + "coding": ["Fixed code"], + "verify": [escalate_output], + }) + + with patch("cross_eval.pipeline.invoke_agent", side_effect=mock): + result = run_pipeline(config) + + self.assertEqual(result.final_verdict, "ESCALATE") + self.assertTrue(len(result.escalated_issues) > 0) + + +class TestAutoEscalateFiresWithoutSenior(unittest.TestCase): + """Test 6: simple pipeline without senior, same FAIL feedback 3 times -> auto-escalate.""" + + def test_auto_escalate_fires_without_senior(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + # No seniors -> review step has verdict=True + config = _minimal_simple_config( + Path(tmpdir), max_iterations=5, seniors=None, + ) + + # Same feedback mentioning the same file paths across all iterations + repeated_fail = ( + "Issues found in src/auth.py: missing validation check.\n" + "The file src/auth.py still has the same problem.\n\n" + "VERDICT: FAIL" + ) + + mock = _make_step_mock({ + "coding": ["Coding output v1", "Coding output v2", "Coding output v3"], + "review": [repeated_fail, repeated_fail, repeated_fail], + }) + + with patch("cross_eval.pipeline.invoke_agent", side_effect=mock): + result = run_pipeline(config) + + self.assertEqual(result.final_verdict, "ESCALATE") + self.assertTrue( + any("Auto-escalated" in iss for iss in result.escalated_issues), + ) + + +class TestAutoEscalateDoesNotFireWithSenior(unittest.TestCase): + """Test 7: same repeated FAIL but WITH senior/aggregate step -> no auto-escalate.""" + + def test_auto_escalate_does_not_fire_with_senior(self) -> None: + with tempfile.TemporaryDirectory() as 
tmpdir: + # With seniors -> senior_review step has verdict=True, review does not + config = _minimal_simple_config( + Path(tmpdir), max_iterations=5, seniors=["claude-senior"], + ) + + repeated_fail_review = ( + "Issues found in src/auth.py: missing validation check.\n" + "VERDICT: FAIL" + ) + # Senior also returns FAIL but the auto-escalate should NOT fire + # because has_aggregator is True (seniors list is populated) + senior_fail = ( + "### Confirmed Issues\n" + "- Missing validation in src/auth.py\n\n" + "### Action Items\n" + "1. Add validation in src/auth.py\n\n" + "VERDICT: FAIL" + ) + + mock = _make_step_mock({ + "coding": [ + "Coding output v1", + "Coding output v2", + "Coding output v3", + "Coding output v4", + "Coding output v5", + ], + "review": [ + repeated_fail_review, + repeated_fail_review, + repeated_fail_review, + repeated_fail_review, + repeated_fail_review, + ], + "senior_review": [ + senior_fail, + senior_fail, + senior_fail, + senior_fail, + senior_fail, + ], + }) + + with patch("cross_eval.pipeline.invoke_agent", side_effect=mock): + result = run_pipeline(config) + + # Should NOT auto-escalate; should reach max iterations + self.assertNotEqual(result.final_verdict, "ESCALATE") + self.assertEqual(result.final_verdict, "MAX_ITERATIONS_REACHED") + self.assertEqual(len(result.iterations), 5) + + +class TestTrackerExtractionAcrossIterations(unittest.TestCase): + """Test 8: senior review output with Issue Tracker table -> passed to next iteration.""" + + def test_tracker_extraction_across_iterations(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + config = _minimal_simple_config( + Path(tmpdir), max_iterations=3, seniors=["claude-senior"], + ) + + tracker_table = ( + "## Issue Tracker\n" + "| ISS-ID | Severity | Description | Status | Since |\n" + "|--------|----------|-------------|--------|-------|\n" + "| ISS-001 | Critical | Missing auth check | Open | v1 |\n" + "| ISS-002 | Major | No validation | Open | v1 |\n" + ) + 
senior_output_v1 = ( + "### Confirmed Issues\n" + "- Missing auth\n\n" + f"{tracker_table}\n" + "### Verdict\n" + "VERDICT: FAIL" + ) + senior_output_v2 = ( + "### Confirmed Issues\n" + "- None remaining\n\n" + "## Issue Tracker\n" + "| ISS-ID | Severity | Description | Status | Since |\n" + "|--------|----------|-------------|--------|-------|\n" + "| ISS-001 | Critical | Missing auth check | Fixed | v1 |\n" + "| ISS-002 | Major | No validation | Fixed | v1 |\n" + "\n### Verdict\n" + "VERDICT: PASS" + ) + + captured_prompts: list[dict[str, str]] = [] + + def _tracking_mock(agent_config, prompt, step_name, **kwargs): + captured_prompts.append({ + "step_name": step_name, + "prompt": prompt, + "agent_name": agent_config.name, + }) + if step_name == "coding": + return AgentResult( + output="Coding output", + exit_code=0, + agent_name=agent_config.name, + step_name=step_name, + duration_seconds=0.1, + ) + elif step_name == "review": + return AgentResult( + output="Review findings\n\nVERDICT: FAIL", + exit_code=0, + agent_name=agent_config.name, + step_name=step_name, + duration_seconds=0.1, + ) + elif step_name == "senior_review": + # First call: FAIL with tracker, second call: PASS + senior_calls = [ + p for p in captured_prompts if p["step_name"] == "senior_review" + ] + if len(senior_calls) <= 1: + output = senior_output_v1 + else: + output = senior_output_v2 + return AgentResult( + output=output, + exit_code=0, + agent_name=agent_config.name, + step_name=step_name, + duration_seconds=0.1, + ) + return AgentResult( + output="", + exit_code=0, + agent_name=agent_config.name, + step_name=step_name, + duration_seconds=0.1, + ) + + with patch("cross_eval.pipeline.invoke_agent", side_effect=_tracking_mock): + result = run_pipeline(config) + + self.assertEqual(result.final_verdict, "PASS") + self.assertEqual(len(result.iterations), 2) + + # Verify that the second iteration's senior_review prompt contains + # the tracker table from iteration 1 + iter2_senior_prompts = [ + 
p for p in captured_prompts + if p["step_name"] == "senior_review" + and "ISS-001" in p["prompt"] + and "Missing auth check" in p["prompt"] + ] + # The second senior_review call should have the tracker in its prompt + self.assertTrue( + len(iter2_senior_prompts) >= 1, + "Expected previous_senior_tracker content (ISS-001) to appear " + "in at least one senior_review prompt", + ) + + +if __name__ == "__main__": + unittest.main()
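
Reviewer note: the `EscalateVerdictTest` cases above pin down a priority order for verdict parsing. ESCALATE wins even when the PASS pattern also matches, PASS is returned when only the pass pattern matches, and anything else is FAIL. The real body of `_extract_verdict` is not part of this diff, so the following is only a minimal sketch of logic that would satisfy those tests. The function name `extract_verdict_sketch` and the ESCALATE regex are assumptions made for illustration.

```python
import re

# Hypothetical pattern: the diff does not show how cross_eval detects ESCALATE.
_ESCALATE_RE = re.compile(r"VERDICT:\s*ESCALATE")


def extract_verdict_sketch(output: str, pass_pattern: str) -> str:
    """Illustrative stand-in for cross_eval's _extract_verdict (assumed behavior)."""
    # ESCALATE is checked first so it takes priority over a matching PASS.
    if _ESCALATE_RE.search(output):
        return "ESCALATE"
    # The caller supplies the PASS pattern, as the tests above do.
    if re.search(pass_pattern, output):
        return "PASS"
    # No ESCALATE and no PASS match is treated as a failure.
    return "FAIL"


if __name__ == "__main__":
    pattern = r"VERDICT:\s*PASS"
    print(extract_verdict_sketch("VERDICT: PASS\n\nVERDICT: ESCALATE\n", pattern))
    print(extract_verdict_sketch("All good\n\nVERDICT: PASS\n", pattern))
    print(extract_verdict_sketch("Issues found\n\nVERDICT: FAIL\n", pattern))
```

Checking ESCALATE before the configurable pass pattern keeps the priority rule independent of whatever `verdict_pattern` a user configures. Whether the actual implementation is ordered this way would need to be confirmed against `cross_eval/pipeline.py`.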