Files
cross-eval/cross_eval/config.py
이충영 에이닷서비스개발 ee4f1a07ef initial commit
2026-03-11 21:53:14 +09:00

608 lines
22 KiB
Python

"""Configuration loading, validation, and preset resolution."""
from __future__ import annotations
import logging
import re
from pathlib import Path
from typing import Any
import yaml
from cross_eval.models import AgentConfig, PhaseConfig, PipelineConfig, StepConfig
from cross_eval.prompts import PHASED_PRESETS, PIPELINE_PRESETS
logger = logging.getLogger(__name__)
# Alternate user-facing spellings, each mapped to the canonical "xhigh" level.
# Consulted by normalize_reasoning_effort() before the value is validated.
REASONING_EFFORT_ALIASES = {
"extra-high": "xhigh",
"extra_high": "xhigh",
"x-high": "xhigh",
}
# Canonical reasoning-effort values; normalize_reasoning_effort() rejects
# anything that is not in this tuple after alias resolution.
REASONING_EFFORT_CHOICES = ("minimal", "low", "medium", "high", "xhigh")
# Per-role fallback effort. Used for the built-in agents and for any agent
# whose reasoning_effort is still None when role efforts are applied.
DEFAULT_ROLE_REASONING_EFFORTS = {
"coder": "medium",
"reviewer": "medium",
"senior": "high",
}
# ---------------------------------------------------------------------------
# Built-in agent registry
# ---------------------------------------------------------------------------
# Base command-line arguments shared by the built-in codex agents. Each agent
# takes its own copy via list(_CODEX_ARGS), so this list itself is never handed
# out directly.
# NOTE(review): the trailing "-" presumably tells the codex CLI to read the
# prompt from stdin — confirm against the CLI documentation.
_CODEX_ARGS = [
"exec",
"--full-auto",
"--skip-git-repo-check",
"--model",
"gpt-5.4",
"-",
]
# System prompt for the built-in "coder" agents: implement exactly what the
# plan specifies, follow existing conventions, and fix only reported issues.
_CODER_SYSTEM_PROMPT = (
"You are a senior software engineer implementing code changes.\n"
"Rules:\n"
"1. FIRST explore the project directory to understand the existing codebase, "
"patterns, and conventions before writing any code.\n"
"2. Implement ONLY what the plan specifies. Do NOT add extra features, "
"unnecessary abstractions, premature optimizations, or \"nice-to-have\" improvements.\n"
"3. Follow the project's existing coding style, naming conventions, and directory structure.\n"
"4. If previous review feedback is provided, fix ONLY the specific issues mentioned. "
"Do NOT refactor unrelated code.\n"
"5. Ignore any items from previous feedback that were marked as DISMISSED or false positive.\n"
"6. When in doubt about scope, do LESS, not more."
)
# System prompt for the built-in "reviewer" agents: read-only review against
# the plan/checklist. The prompt asks the agent to finish with a
# "VERDICT: PASS" / "VERDICT: FAIL" line, which is the text the default step
# verdict_pattern (r"VERDICT:\s*PASS") looks for.
_REVIEWER_SYSTEM_PROMPT = (
"You are a code reviewer. You MUST NOT create, modify, or delete any files.\n"
"Rules:\n"
"1. Explore the project directory to understand the full codebase context.\n"
"2. Compare the implementation against the plan and checklist ONLY.\n"
"3. Classify every issue with BOTH severity AND category:\n"
" - Severity: Critical (breaks functionality/security) > Major (requirement mismatch) > Minor (convention/style)\n"
" - Category: Over-engineering / Omission\n"
"4. When reviewing with previous feedback, mark items as CONFIRMED (still an issue) "
"or DISMISSED (false positive) with rationale.\n"
"5. Report out-of-scope issues separately — problems found outside plan/checklist scope.\n"
"6. Order issues by severity (Critical first).\n"
"7. Do NOT suggest improvements beyond the plan scope.\n"
"8. End with VERDICT: PASS (all requirements met, no over-engineering) "
"or VERDICT: FAIL (issues found)."
)
# System prompt for the built-in "senior" agents: aggregate/adjudicate reviewer
# findings or verify the implementation, then emit a final VERDICT line.
_SENIOR_SYSTEM_PROMPT = (
"You are a senior technical reviewer coordinating a review-fix-verification loop.\n"
"Rules:\n"
"1. Explore the project directory to understand the full codebase context.\n"
"2. In aggregation mode, deduplicate overlaps, resolve disagreements, and keep only "
"evidence-backed issues. Categorize dismissed findings as [False positive] or [Already fixed].\n"
"3. In verification mode, judge the current implementation directly against ONLY the "
"plan and checklist.\n"
"4. Be skeptical of false positives, but do not lower the bar on real requirement "
"gaps.\n"
"5. When issues remain, produce a concise prioritized action list the coder can act on.\n"
"6. Do NOT invent new requirements beyond the plan and checklist.\n"
"7. End with VERDICT: PASS or VERDICT: FAIL."
)
# Base arguments per CLI family, in the order the families appear in the
# registry. Every agent receives its own copy of the list.
_BUILTIN_CLI_ARGS: tuple[tuple[str, list[str]], ...] = (
    ("claude", ["-p", "--model", "opus", "--permission-mode", "auto"]),
    ("codex", _CODEX_ARGS),
)

# System prompt per role, in the order the roles appear within each family.
_BUILTIN_ROLE_PROMPTS: tuple[tuple[str, str], ...] = (
    ("coder", _CODER_SYSTEM_PROMPT),
    ("reviewer", _REVIEWER_SYSTEM_PROMPT),
    ("senior", _SENIOR_SYSTEM_PROMPT),
)

# Registry of ready-made agents keyed "<cli>-<role>": claude-coder,
# claude-reviewer, claude-senior, codex-coder, codex-reviewer, codex-senior.
# The command is the CLI family name and the reasoning effort is the role's
# default from DEFAULT_ROLE_REASONING_EFFORTS.
BUILTIN_AGENTS: dict[str, AgentConfig] = {
    f"{cli}-{role}": AgentConfig(
        name=f"{cli}-{role}",
        command=cli,
        args=list(cli_args),
        system_prompt=prompt,
        reasoning_effort=DEFAULT_ROLE_REASONING_EFFORTS[role],
    )
    for cli, cli_args in _BUILTIN_CLI_ARGS
    for role, prompt in _BUILTIN_ROLE_PROMPTS
}
# Shorthand aliases: "claude" → "claude-coder"/"claude-reviewer", "codex" → same
_AGENT_ALIASES: dict[str, str] = {
    "claude": "claude",
    "codex": "codex",
}


def resolve_agent_shorthand(name: str, role: str) -> str:
    """Expand a shorthand agent name into the full built-in agent name.

    A name listed in ``_AGENT_ALIASES`` becomes ``"<alias target>-<role>"``;
    every other name is returned untouched.

    Examples:
        resolve_agent_shorthand("claude", "coder") → "claude-coder"
        resolve_agent_shorthand("codex", "reviewer") → "codex-reviewer"
        resolve_agent_shorthand("claude-coder", "coder") → "claude-coder" (unchanged)

    Args:
        name: Agent name as written by the user.
        role: Role suffix to append when ``name`` is a shorthand.

    Returns:
        The expanded built-in name, or ``name`` itself when it is not a shorthand.
    """
    family = _AGENT_ALIASES.get(name)
    if family is None:
        return name
    return f"{family}-{role}"
# ---------------------------------------------------------------------------
# Role inference (backward compatibility)
# ---------------------------------------------------------------------------
_CODER_PATTERNS = ("gen", "coder", "implement", "develop", "write")
_SENIOR_PATTERNS = ("senior", "lead", "principal", "aggregate", "adjudicat", "synth")
_REVIEWER_PATTERNS = ("review", "audit", "check", "verify", "inspect")
def _infer_roles(agent_names: list[str]) -> tuple[list[str], list[str], list[str]]:
"""Infer coder/reviewer/senior roles from agent names.
Heuristic:
- Names containing 'gen', 'coder', etc. → coder
- Names containing 'senior', 'lead', etc. → senior
- Names containing 'review', 'audit', etc. → reviewer
- If no matches: first agent → coder, rest → reviewers
"""
coders: list[str] = []
reviewers: list[str] = []
seniors: list[str] = []
unclassified: list[str] = []
for name in agent_names:
lower = name.lower()
if any(p in lower for p in _CODER_PATTERNS):
coders.append(name)
elif any(p in lower for p in _SENIOR_PATTERNS):
seniors.append(name)
elif any(p in lower for p in _REVIEWER_PATTERNS):
reviewers.append(name)
else:
unclassified.append(name)
# Fallback: if no classification worked, use positional convention
if not coders and not reviewers:
if len(agent_names) >= 2:
coders = [agent_names[0]]
reviewers = list(agent_names[1:])
elif agent_names:
# Single agent: treat as reviewer (for review-only)
reviewers = list(agent_names)
elif not coders and unclassified:
coders = [unclassified.pop(0)]
elif not reviewers and unclassified:
reviewers = list(unclassified)
unclassified = []
# Any remaining unclassified go to reviewers
reviewers.extend(unclassified)
return coders, reviewers, seniors
def _resolve_agents(
    user_agents: dict[str, AgentConfig],
    coders: list[str],
    reviewers: list[str],
    seniors: list[str],
) -> dict[str, AgentConfig]:
    """Ensure all referenced agents exist by merging built-in definitions.

    If a coder, reviewer or senior name references an agent that is not in
    ``user_agents`` but is present in ``BUILTIN_AGENTS``, a private copy of the
    built-in definition is added. Names that are defined nowhere are left out
    so that validation can report them.

    Args:
        user_agents: Agents defined explicitly by the user; never modified.
        coders: Agent names assigned the coder role.
        reviewers: Agent names assigned the reviewer role.
        seniors: Agent names assigned the senior role.

    Returns:
        A new dict containing the user agents followed by any built-ins that
        were pulled in, in coder → reviewer → senior reference order.
    """
    import copy  # local import: only needed to detach built-ins from the registry

    result = dict(user_agents)
    # Walk the role lists in order (rather than a set) so the resulting dict
    # order — which shows up in validation messages — is deterministic.
    for name in (*coders, *reviewers, *seniors):
        if name not in result and name in BUILTIN_AGENTS:
            # Copy: role-effort application later mutates agents in place, and
            # that must not leak into the module-level registry or into other
            # configs built from it.
            result[name] = copy.deepcopy(BUILTIN_AGENTS[name])
    return result
def _default_seniors_for_preset(
pipeline_raw: Any,
reviewers: list[str],
agents: dict[str, AgentConfig],
) -> list[str]:
"""Infer a default senior agent for presets that benefit from adjudication."""
if not (
isinstance(pipeline_raw, str)
and pipeline_raw == "preset:review-fix"
and reviewers
):
return []
first_reviewer = reviewers[0]
if first_reviewer.startswith("codex-"):
return ["codex-senior"]
if first_reviewer.startswith("claude-"):
return ["claude-senior"]
reviewer_agent = agents.get(first_reviewer)
if reviewer_agent is None:
return []
command = reviewer_agent.command.lower()
if "codex" in command:
return ["codex-senior"]
if "claude" in command:
return ["claude-senior"]
return []
def normalize_reasoning_effort(effort: str) -> str:
    """Normalize a user-facing reasoning effort value to its canonical form.

    Surrounding whitespace and letter case are ignored, then aliases from
    ``REASONING_EFFORT_ALIASES`` are resolved (e.g. ``"Extra-High"`` →
    ``"xhigh"``).

    Args:
        effort: Effort value as typed by the user or read from config.

    Returns:
        One of ``REASONING_EFFORT_CHOICES``.

    Raises:
        ValueError: If ``effort`` is not a string or does not name a supported
            effort level.
    """
    if isinstance(effort, str):
        key = effort.strip().lower()
        normalized = REASONING_EFFORT_ALIASES.get(key, key)
    else:
        # Non-string input (e.g. a YAML number or list) can never be valid;
        # report it with the same error type as any other bad value.
        normalized = None
    if normalized not in REASONING_EFFORT_CHOICES:
        raise ValueError(
            f"Unsupported reasoning effort '{effort}'. "
            f"Use one of: {REASONING_EFFORT_CHOICES}"
        )
    return normalized
def apply_reasoning_effort_settings(
    config: PipelineConfig,
    *,
    reasoning_effort: str | None = None,
    coder_effort: str | None = None,
    reviewer_effort: str | None = None,
    senior_effort: str | None = None,
) -> None:
    """Apply default and override reasoning effort settings by role.

    A role-specific effort wins over the shared ``reasoning_effort``. When
    neither is given, agents keep their own effort and only agents without one
    receive the role default. All supplied values are normalized (and thus
    validated) before any agent is touched.

    Args:
        config: Pipeline config whose agents are updated in place.
        reasoning_effort: Effort applied to every role that has no specific override.
        coder_effort: Override for agents listed in ``config.coders``.
        reviewer_effort: Override for agents listed in ``config.reviewers``.
        senior_effort: Override for agents listed in ``config.seniors``.

    Raises:
        ValueError: If any supplied effort value is not supported.
    """
    fallback = normalize_reasoning_effort(reasoning_effort) if reasoning_effort else None

    def effective(explicit: str | None) -> str | None:
        # Empty/None means "no role-specific override": use the shared value.
        if explicit:
            return normalize_reasoning_effort(explicit)
        return fallback

    coder_level = effective(coder_effort)
    reviewer_level = effective(reviewer_effort)
    senior_level = effective(senior_effort)

    assignments = (
        (config.coders, coder_level, "coder"),
        (config.reviewers, reviewer_level, "reviewer"),
        (config.seniors, senior_level, "senior"),
    )
    for names, level, role in assignments:
        _apply_role_effort(config.agents, names, level, role)
def _apply_role_effort(
agents: dict[str, AgentConfig],
agent_names: list[str],
override_effort: str | None,
role: str,
) -> None:
"""Set reasoning effort on agents for a specific role."""
for agent_name in agent_names:
agent = agents.get(agent_name)
if agent is None:
continue
if override_effort is not None:
agent.reasoning_effort = override_effort
elif agent.reasoning_effort is None:
agent.reasoning_effort = DEFAULT_ROLE_REASONING_EFFORTS[role]
# ---------------------------------------------------------------------------
# Default config (no YAML)
# ---------------------------------------------------------------------------
def default_config() -> PipelineConfig:
    """Return a PipelineConfig with sensible defaults (no YAML needed).

    The config uses the "simple" pipeline preset with ``claude-coder`` as the
    coder, ``claude-reviewer`` as the reviewer and no senior. All built-in
    agents are available, as private copies.

    Returns:
        A fresh config that can be mutated without affecting ``BUILTIN_AGENTS``
        or any other config.
    """
    import copy  # local import: only needed to detach built-ins from the registry

    # Deep copy: reasoning-effort settings are applied by mutating the agents
    # of a config in place, which must not rewrite the module-level registry.
    agents = copy.deepcopy(BUILTIN_AGENTS)
    coders = ["claude-coder"]
    reviewers = ["claude-reviewer"]
    seniors: list[str] = []
    pipeline = PIPELINE_PRESETS["simple"](coders, reviewers, seniors)
    return PipelineConfig(
        output_dir=Path("output"),
        max_iterations=3,
        # NOTE(review): YAML configs default to "en" while this default uses
        # "ko" — confirm the difference is intentional.
        language="ko",
        inputs={},
        agents=agents,
        coders=coders,
        reviewers=reviewers,
        seniors=seniors,
        pipeline=pipeline,
    )
# ---------------------------------------------------------------------------
# YAML loading
# ---------------------------------------------------------------------------
def load_config(path: Path) -> PipelineConfig:
    """Read a YAML config file and return the validated PipelineConfig.

    Args:
        path: Location of the YAML file; resolved to an absolute path first.

    Returns:
        The parsed configuration.

    Raises:
        ValueError: If the document is not a YAML mapping, or if validation
            reports one or more problems (all are listed in the message).
    """
    resolved = path.resolve()
    with open(resolved, encoding="utf-8") as stream:
        document = yaml.safe_load(stream)

    if not isinstance(document, dict):
        raise ValueError(f"Config file must be a YAML mapping, got {type(document).__name__}")

    parsed = _parse_raw(document, resolved)
    problems = validate_config(parsed)
    if not problems:
        return parsed
    raise ValueError("Config validation failed:\n " + "\n ".join(problems))
def _parse_raw(raw: dict[str, Any], config_path: Path) -> PipelineConfig:
"""Parse raw YAML dict into PipelineConfig."""
# --- agents ---
agents: dict[str, AgentConfig] = {}
for name, agent_data in raw.get("agents", {}).items():
agents[name] = AgentConfig(
name=name,
command=agent_data.get("command", "claude"),
args=agent_data.get("args", ["-p"]),
system_prompt=agent_data.get("system_prompt"),
reasoning_effort=agent_data.get("reasoning_effort"),
stdin_mode=agent_data.get("stdin_mode", False),
)
# --- roles: explicit or inferred ---
pipeline_raw = raw.get("pipeline", "preset:simple")
coders_raw = raw.get("coders")
reviewers_raw = raw.get("reviewers")
seniors_raw = raw.get("seniors")
if coders_raw is not None or reviewers_raw is not None or seniors_raw is not None:
# Explicit role assignment from YAML
coders: list[str] = coders_raw if coders_raw is not None else []
reviewers: list[str] = reviewers_raw if reviewers_raw is not None else []
seniors: list[str] = seniors_raw if seniors_raw is not None else []
else:
# Backward compat: infer from agent names
coders, reviewers, seniors = _infer_roles(list(agents.keys()))
if not seniors:
seniors = _default_seniors_for_preset(pipeline_raw, reviewers, agents)
# Auto-merge built-in agents for any referenced names not yet defined
agents = _resolve_agents(agents, coders, reviewers, seniors)
config_stub = PipelineConfig(
agents=agents,
coders=coders,
reviewers=reviewers,
seniors=seniors,
)
apply_reasoning_effort_settings(config_stub)
# --- inputs (resolve relative to config file location) ---
config_dir = config_path.parent
inputs: dict[str, Path | str] = {}
for key, val in raw.get("inputs", {}).items():
p = Path(val)
if not p.is_absolute():
p = config_dir / p
inputs[key] = p
# --- pipeline (preset or custom) ---
steps, phases = _resolve_pipeline(pipeline_raw, coders, reviewers, seniors)
# Detect preset name for output directory naming
preset_name = "custom"
if isinstance(pipeline_raw, str) and pipeline_raw.startswith("preset:"):
preset_name = pipeline_raw.split(":", 1)[1]
return PipelineConfig(
output_dir=Path(raw.get("output_dir", "output")),
max_iterations=int(raw.get("max_iterations", 3)),
min_iterations=int(raw.get("min_iterations", 1)),
verbose=bool(raw.get("verbose", False)),
language=raw.get("language", "en"),
inputs=inputs,
agents=agents,
coders=coders,
reviewers=reviewers,
seniors=seniors,
pipeline=steps,
phases=phases,
preset_name=preset_name,
_config_path=config_path,
_config_mtime=config_path.stat().st_mtime,
)
def try_reload_config(config: PipelineConfig) -> PipelineConfig:
    """Reload config if the file has been modified on disk.

    Returns the new config if reloaded, or the same config if unchanged.
    Errors during reload are logged but do not crash the pipeline; the
    previous config stays in effect.

    Args:
        config: The currently active configuration.

    Returns:
        A freshly loaded config when the file's mtime is newer than the one
        recorded on ``config`` and loading succeeds; otherwise ``config``.
    """
    if config._config_path is None or config._config_mtime is None:
        # Config did not come from a file (e.g. built-in defaults).
        return config
    try:
        current_mtime = config._config_path.stat().st_mtime
    except OSError:
        return config
    if current_mtime <= config._config_mtime:
        return config

    logger.info("Config file changed, reloading: %s", config._config_path.name)
    try:
        new_config = load_config(config._config_path)
    except (
        ValueError,
        OSError,  # unreadable file: missing, permission denied, is a directory, ...
        yaml.YAMLError,
        # A half-edited file can have the wrong structure rather than the
        # wrong values, which surfaces as these while parsing.
        KeyError,
        TypeError,
        AttributeError,
    ) as e:
        logger.warning("Config reload failed, keeping previous config: %s", e)
        return config
    logger.info("Config reloaded successfully")
    return new_config
def _resolve_pipeline(
pipeline_raw: Any,
coders: list[str],
reviewers: list[str],
seniors: list[str],
) -> tuple[list[StepConfig], list[PhaseConfig]]:
"""Resolve pipeline from preset string or explicit step list.
Returns (steps, phases) tuple. Only one will be non-empty.
- Simple/cross-review/review-only → steps populated, phases empty.
- Phased presets (review-fix) → steps empty, phases populated.
"""
# Preset: "preset:simple" or "preset:review-fix"
if isinstance(pipeline_raw, str) and pipeline_raw.startswith("preset:"):
preset_name = pipeline_raw.split(":", 1)[1]
if preset_name in PIPELINE_PRESETS:
return PIPELINE_PRESETS[preset_name](coders, reviewers, seniors), []
if preset_name in PHASED_PRESETS:
return [], PHASED_PRESETS[preset_name](coders, reviewers, seniors)
all_presets = list(PIPELINE_PRESETS.keys()) + list(PHASED_PRESETS.keys())
raise ValueError(
f"Unknown pipeline preset '{preset_name}'. "
f"Available: {all_presets}"
)
# Explicit step list
if isinstance(pipeline_raw, list):
steps = []
for step_data in pipeline_raw:
steps.append(StepConfig(
name=step_data["name"],
agent=step_data["agent"],
role=step_data.get("role", "generate"),
prompt_template=step_data.get("prompt_template", f"default:{step_data.get('role', 'generate')}"),
output_key=step_data["output_key"],
verdict=step_data.get("verdict", False),
verdict_pattern=step_data.get("verdict_pattern", r"VERDICT:\s*PASS"),
context_override=step_data.get("context_override", {}),
))
return steps, []
raise ValueError(
f"'pipeline' must be a preset string (e.g. 'preset:simple') "
f"or a list of step definitions, got {type(pipeline_raw).__name__}"
)
def validate_config(config: PipelineConfig) -> list[str]:
    """Collect every validation problem found in ``config``.

    A phased config is checked phase by phase; otherwise the flat pipeline is
    checked. Each group of steps must reference defined agents, use unique
    step names and output keys, contain at least one verdict step, and give
    every verdict step a compilable ``verdict_pattern``. Input paths must
    exist and the language must be 'en' or 'ko'.

    Args:
        config: The configuration to check.

    Returns:
        Human-readable error strings; an empty list means the config is valid.
    """
    problems: list[str] = []

    def check_step_group(steps, scope, describe):
        # `scope` names the group ("Pipeline" / "Phase 'x'"); `describe(step)`
        # yields the prefix used in per-step messages.
        for step in steps:
            if step.agent in config.agents:
                continue
            problems.append(
                f"{describe(step)} references undefined agent '{step.agent}'. "
                f"Defined agents: {list(config.agents.keys())}"
            )

        _validate_unique_step_fields(steps, problems, scope=scope)

        has_verdict_step = any(step.verdict for step in steps)
        if not has_verdict_step:
            problems.append(f"{scope} must have at least one step with verdict: true")

        for step in steps:
            if not step.verdict:
                continue
            try:
                re.compile(step.verdict_pattern)
            except re.error as exc:
                problems.append(f"{describe(step)} has invalid verdict_pattern: {exc}")

    if config.phases:
        # --- Phased pipeline validation ---
        for phase in config.phases:
            phase_scope = f"Phase '{phase.name}'"
            if not phase.steps:
                problems.append(f"{phase_scope} has no steps")
            check_step_group(
                phase.steps,
                phase_scope,
                lambda step, prefix=phase_scope: f"{prefix} step '{step.name}'",
            )
    else:
        # --- Simple pipeline validation ---
        if not config.pipeline:
            problems.append("Pipeline must have at least one step")
        check_step_group(
            config.pipeline,
            "Pipeline",
            lambda step: f"Step '{step.name}'",
        )

    # --- Common validation ---
    for key, val in config.inputs.items():
        # Only Path inputs are checked on disk; other values are left alone.
        if isinstance(val, Path) and not val.exists():
            problems.append(f"Input file '{key}' not found: {val}")
    if config.language not in ("en", "ko"):
        problems.append(f"Unsupported language '{config.language}'. Use 'en' or 'ko'.")
    return problems
def _validate_unique_step_fields(
steps: list[StepConfig],
errors: list[str],
*,
scope: str,
) -> None:
"""Ensure step names and output keys are unique within a step collection."""
seen_names: set[str] = set()
seen_output_keys: set[str] = set()
for step in steps:
if step.name in seen_names:
errors.append(f"{scope} has duplicate step name '{step.name}'")
seen_names.add(step.name)
if step.output_key in seen_output_keys:
errors.append(f"{scope} has duplicate output_key '{step.output_key}'")
seen_output_keys.add(step.output_key)
def apply_input_overrides(
    config: PipelineConfig, overrides: dict[str, str]
) -> None:
    """Replace or add config inputs from CLI ``--input`` overrides.

    Args:
        config: Config whose ``inputs`` mapping is updated in place.
        overrides: Input key → path string; each path is stored as a ``Path``
            exactly as given (no resolution against the config directory).
    """
    target = config.inputs
    for input_name in overrides:
        target[input_name] = Path(overrides[input_name])