Add 3-verdict system (PASS/FAIL/ESCALATE) with priority handling across simple and phased pipelines. Senior reviewers can now escalate issues requiring human intervention, immediately breaking the review loop.

- ESCALATE verdict extraction with highest priority over PASS/FAIL
- Issue Tracker tables (ISS-NNN) carried across iterations
- Auto-escalate heuristic using (file, keyword) composite fingerprints
- Report restructuring: executive view first (verdict → tracker → metrics)
- Onboarding: `doctor`, `demo`, `init --guided` commands
- Exit codes: PASS=0, FAIL=1, ESCALATE=2
- 87 tests passing (54 config + 25 onboarding + 8 integration)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
201 lines
6.4 KiB
Python
201 lines
6.4 KiB
Python
"""Environment health checks for cross-eval."""
|
|
from __future__ import annotations
|
|
|
|
import shutil
|
|
import subprocess
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
|
|
@dataclass
class DoctorCheck:
    """Outcome of one environment health check.

    Instances are produced by ``run_doctor`` and rendered by
    ``format_doctor_results``.
    """

    # Short label printed in front of the message (e.g. "claude CLI").
    name: str
    # True when the check succeeded.
    passed: bool
    # When the check failed: True counts it as a critical issue,
    # False counts it as a warning.
    critical: bool
    # One-line status text shown next to the name.
    message: str
    # Optional extra text (may span several lines); the formatter prints it
    # only for failed checks.
    detail: Optional[str] = field(default=None)
|
|
|
|
|
|
def check_cli_installed(command: str) -> tuple[bool, str]:
    """Check whether a CLI tool is on PATH and report its version.

    Args:
        command: Executable name to look up (e.g. ``"claude"``).

    Returns:
        A ``(installed, message)`` pair. ``installed`` is False only when the
        executable cannot be located on PATH, in which case ``message`` says
        so. Otherwise ``message`` is the first line of the ``--version``
        output (stdout preferred, stderr as fallback), or a placeholder when
        the version could not be determined.
    """
    path = shutil.which(command)
    if not path:
        return False, f"'{command}' not found on PATH"

    try:
        # Execute the path that shutil.which() resolved instead of the bare
        # name: on Windows, subprocess does not apply PATHEXT to a bare name,
        # so a ``.cmd``/``.bat`` shim that which() found would fail to launch.
        result = subprocess.run(
            [path, "--version"],
            capture_output=True,
            text=True,
            # Never let undecodable bytes in the tool's output crash the check.
            errors="replace",
            timeout=10,
        )
    except (subprocess.TimeoutExpired, OSError):
        # The binary exists, so it still counts as installed.
        return True, "(installed but version check failed)"

    output = result.stdout.strip() or result.stderr.strip()
    version = output.split("\n")[0]
    return True, version or "(version unknown)"
|
|
|
|
|
|
def check_cli_authenticated(command: str) -> tuple[bool, str]:
    """Check whether a CLI tool looks authenticated by running a small probe.

    Only ``"claude"`` and ``"codex"`` are recognised:

    * ``claude`` is invoked with ``-p --model haiku --max-turns 1`` and a
      one-line prompt on stdin. Output mentioning a login/API-key problem is
      reported as unauthenticated; otherwise a zero exit code means success.
    * ``codex`` is only invoked with ``--version`` and its output is scanned
      for the same kind of keywords, so a positive result here means
      "installed, no auth complaint seen" rather than a verified login.

    Args:
        command: Executable name to probe.

    Returns:
        A ``(ok, message)`` pair. ``ok`` is False when the tool is missing,
        the probe failed or timed out, the output suggests missing
        credentials, or ``command`` is not one of the recognised tools.
    """
    path = shutil.which(command)
    if not path:
        return False, "not installed"

    if command == "claude":
        try:
            # Run the resolved path, not the bare name, so Windows
            # ``.cmd``/``.bat`` shims located by shutil.which() can launch.
            result = subprocess.run(
                [path, "-p", "--model", "haiku", "--max-turns", "1"],
                input="respond with just 'ok'",
                capture_output=True,
                text=True,
                # Undecodable output must not crash the health check.
                errors="replace",
                timeout=30,
            )
        except subprocess.TimeoutExpired:
            return False, "timed out (30s) — possible network issue"
        except OSError as e:
            return False, str(e)

        combined = result.stdout + result.stderr
        lowered = combined.lower()
        # Keyword scan runs before the exit-code check, so an auth complaint
        # wins even if the process exited with status 0.
        auth_keywords = (
            "not logged in", "login", "unauthorized", "unauthenticated",
            "api key", "invalid key",
        )
        if any(kw in lowered for kw in auth_keywords):
            return False, "not authenticated — run: claude login"
        if result.returncode == 0:
            return True, "authenticated"
        return False, f"exit code {result.returncode}: {combined[:100]}"

    if command == "codex":
        try:
            result = subprocess.run(
                [path, "--version"],
                capture_output=True,
                text=True,
                errors="replace",
                timeout=10,
            )
        except (subprocess.TimeoutExpired, OSError) as e:
            return False, str(e)

        lowered = (result.stdout + result.stderr).lower()
        auth_keywords = ("not logged in", "login", "unauthorized", "api key")
        if any(kw in lowered for kw in auth_keywords):
            return False, "not authenticated — run: codex login"
        return True, "installed (auth check: codex login if needed)"

    return False, f"unknown command: {command}"
|
|
|
|
|
|
def check_config(directory: Path) -> tuple[bool, Optional[Path], list[str]]:
    """Validate ``<directory>/.cross-eval/config.yaml`` if it is present.

    Args:
        directory: Project directory expected to contain ``.cross-eval``.

    Returns:
        A ``(ok, path, errors)`` triple:

        * ``(False, None, [])`` when no config file exists at that location;
        * ``(True, path, [])`` when ``load_config`` accepts the file;
        * ``(False, path, [message])`` when ``load_config`` raises
          ``ValueError`` or ``FileNotFoundError``.

        Any other exception propagates to the caller.
    """
    candidate = directory.joinpath(".cross-eval", "config.yaml")
    if not candidate.exists():
        return False, None, []

    try:
        # Imported lazily, at the point of use, as in the rest of this check.
        from cross_eval.config import load_config

        load_config(candidate)
    except (ValueError, FileNotFoundError) as exc:
        return False, candidate, [str(exc)]
    else:
        return True, candidate, []
|
|
|
|
|
|
def run_doctor(directory: Path) -> list[DoctorCheck]:
    """Run every health check and collect the results in display order.

    The order is: claude CLI (plus its auth probe when installed), codex CLI
    (plus its auth probe when installed), then the project config.

    Args:
        directory: Project directory whose ``.cross-eval/config.yaml`` is
            inspected.

    Returns:
        One ``DoctorCheck`` per check that was performed.
    """
    results: list[DoctorCheck] = []

    # (command, critical?, message when missing, install hint)
    cli_specs = (
        (
            "claude",
            True,
            "not found",
            "Install: https://docs.anthropic.com/en/docs/claude-code",
        ),
        (
            "codex",
            False,
            "not found (optional)",
            "Install: https://github.com/openai/codex",
        ),
    )

    for tool, is_critical, missing_message, install_hint in cli_specs:
        found, version = check_cli_installed(tool)
        results.append(DoctorCheck(
            name=f"{tool} CLI",
            passed=found,
            critical=is_critical,
            message=version if found else missing_message,
            detail=None if found else install_hint,
        ))
        if not found:
            # No point probing authentication for a tool that is absent.
            continue

        auth_ok, auth_msg = check_cli_authenticated(tool)
        results.append(DoctorCheck(
            name=f"{tool} auth",
            passed=auth_ok,
            critical=is_critical,
            message=auth_msg,
        ))

    config_ok, config_path, config_errors = check_config(directory)
    if config_path is None:
        # A missing config is not a failure.
        config_check = DoctorCheck(
            name="config",
            passed=True,
            critical=False,
            message="no .cross-eval/config.yaml (will use defaults)",
            detail="Run: cross-eval init",
        )
    elif config_ok:
        config_check = DoctorCheck(
            name="config",
            passed=True,
            critical=False,
            message=f"valid ({config_path.name})",
        )
    else:
        config_check = DoctorCheck(
            name="config",
            passed=False,
            critical=True,
            message="invalid config",
            detail="\n".join(config_errors),
        )
    results.append(config_check)

    return results
|
|
|
|
|
|
def format_doctor_results(checks: list[DoctorCheck]) -> str:
    """Render doctor check results as a block of text for the terminal.

    Each check becomes one line with a pass/fail marker; a failed check that
    carries ``detail`` gets one extra line per detail line. A summary line
    follows: all passed, the number of critical failures, or (when there are
    no critical failures) the number of warnings.

    Args:
        checks: Results to render, in display order.

    Returns:
        The formatted report as a single newline-joined string.
    """
    out: list[str] = ["\n cross-eval doctor\n"]
    critical_failures = 0
    warnings = 0

    for item in checks:
        marker = " ✓" if item.passed else " ✗"
        out.append(f"{marker} {item.name}: {item.message}")
        if item.passed:
            continue

        # Only failed checks show their detail and count toward the summary.
        if item.detail:
            out.extend(f" {detail_line}" for detail_line in item.detail.split("\n"))
        if item.critical:
            critical_failures += 1
        else:
            warnings += 1

    if critical_failures:
        summary = f" {critical_failures} critical issue(s) found."
    elif warnings:
        summary = f" {warnings} warning(s), no critical issues."
    else:
        summary = " All checks passed!"

    out.extend(["", summary, ""])
    return "\n".join(out)
|