734 lines
26 KiB
Python
734 lines
26 KiB
Python
"""Configuration loading, validation, and preset resolution."""
|
|
from __future__ import annotations
|
|
|
|
import copy
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
from cross_eval.models import (
|
|
AgentConfig,
|
|
ExecutionConfig,
|
|
PhaseConfig,
|
|
PipelineConfig,
|
|
StepConfig,
|
|
)
|
|
from cross_eval.prompts import PHASED_PRESETS, PIPELINE_PRESETS
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Alternative user-facing spellings; every one resolves to the "xhigh" level.
REASONING_EFFORT_ALIASES = dict.fromkeys(
    ("extra-high", "extra_high", "x-high"),
    "xhigh",
)
# Effort names accepted once aliases have been resolved.
REASONING_EFFORT_CHOICES = ("minimal", "low", "medium", "high", "xhigh")
# Effort assigned per role to agents that do not set one themselves.
DEFAULT_ROLE_REASONING_EFFORTS = dict(
    coder="medium",
    reviewer="medium",
    senior="high",
)
# Presets whose coders are switched to agentic mode after parsing.
FIX_STYLE_PRESETS = {"review-fix", "coding-review-fix"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Built-in agent registry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Arguments for every built-in codex agent (flag/value pairs share a line).
_CODEX_ARGS = [
    "exec",
    "--full-auto",
    "--skip-git-repo-check",
    "--model", "gpt-5.4",
    "-",
]

# Arguments common to claude invocations that run with "-p".
_CLAUDE_BASE_ARGS = [
    "-p",
    "--setting-sources", "user",
    "--disable-slash-commands",
    "--model", "opus",
]

# Coder agents: the base arguments plus permission bypass flags.
_CLAUDE_CODER_ARGS = [
    *_CLAUDE_BASE_ARGS,
    "--dangerously-skip-permissions",
    "--permission-mode", "bypassPermissions",
]

# Reviewer/senior agents: spelled out independently of the base list; note
# that this list carries no "-p" and uses the "plan" permission mode.
_CLAUDE_REVIEW_ARGS = [
    "--setting-sources", "user",
    "--disable-slash-commands",
    "--model", "opus",
    "--permission-mode", "plan",
]
|
|
|
|
# Each prompt is written as one tuple entry per output line and joined with
# newlines; adjacent literals inside an entry are a single wrapped line.

# System prompt for agents that implement code changes.
_CODER_SYSTEM_PROMPT = "\n".join((
    "You are a senior software engineer implementing code changes.",
    "Rules:",
    "1. FIRST explore the project directory to understand the existing codebase, "
    "patterns, and conventions before writing any code.",
    "2. You may decide which shell, Python, git, docker, test, and database commands "
    "to run. The user does not need to pre-specify exact commands.",
    "3. Environment variables from configured .env files may already be loaded into "
    "your process; use them when validating services such as ClickHouse.",
    "4. Implement ONLY what the plan specifies. Do NOT add extra features, "
    "unnecessary abstractions, premature optimizations, or \"nice-to-have\" improvements.",
    "5. Follow the project's existing coding style, naming conventions, and directory structure.",
    "6. If previous review feedback is provided, fix ONLY the specific issues mentioned. "
    "Do NOT refactor unrelated code.",
    "7. Ignore any items from previous feedback that were marked as DISMISSED or false positive.",
    "8. When in doubt about scope, do LESS, not more.",
))

# System prompt for read-only reviewers that finish with a PASS/FAIL verdict.
_REVIEWER_SYSTEM_PROMPT = "\n".join((
    "You are a code reviewer. You MUST NOT create, modify, or delete any files.",
    "Rules:",
    "1. Explore the project directory to understand the full codebase context.",
    "2. You may decide which shell, Python, test, git, docker, and database read commands "
    "to run in order to verify behavior. The user does not need to pre-specify exact commands.",
    "3. Environment variables from configured .env files may already be loaded into "
    "your process; use them for verification when relevant.",
    "4. Compare the implementation against the plan and checklist ONLY.",
    "5. Classify every issue with BOTH severity AND category:",
    " - Severity: Critical (breaks functionality/security) > Major (requirement mismatch) > Minor (convention/style)",
    " - Category: Over-engineering / Omission",
    "6. When reviewing with previous feedback, mark items as CONFIRMED (still an issue) "
    "or DISMISSED (false positive) with rationale.",
    "7. Report out-of-scope issues separately — problems found outside plan/checklist scope.",
    "8. Order issues by severity (Critical first).",
    "9. Do NOT suggest improvements beyond the plan scope.",
    "10. End with VERDICT: PASS (all requirements met, no over-engineering) "
    "or VERDICT: FAIL (issues found).",
))

# System prompt for senior agents that aggregate reviews and may ESCALATE.
_SENIOR_SYSTEM_PROMPT = "\n".join((
    "You are a senior technical reviewer coordinating a review-fix-verification loop.",
    "Rules:",
    "1. Explore the project directory to understand the full codebase context.",
    "2. You may decide which shell, Python, test, git, docker, and database read commands "
    "to run to verify disputed issues. The user does not need to pre-specify exact commands.",
    "3. Environment variables from configured .env files may already be loaded into "
    "your process; use them when validating service integrations.",
    "4. In aggregation mode, deduplicate overlaps, resolve disagreements, and keep only "
    "evidence-backed issues. Categorize dismissed findings as [False positive] or [Already fixed].",
    "5. In verification mode, judge the current implementation directly against ONLY the "
    "plan and checklist.",
    "6. Be skeptical of false positives, but do not lower the bar on real requirement "
    "gaps.",
    "7. When issues remain, produce a concise prioritized action list the coder can act on.",
    "8. Maintain an Issue Tracker table across iterations to track issue status.",
    "9. Do NOT invent new requirements beyond the plan and checklist.",
    "10. End with one of three verdicts:",
    " - VERDICT: PASS — all requirements met, no issues remain.",
    " - VERDICT: FAIL — issues found that the coder can fix.",
    " - VERDICT: ESCALATE — issues that require human intervention. Use ESCALATE when:",
    " * Requirements are ambiguous and need clarification from stakeholders",
    " * Architecture decisions are needed that go beyond the plan scope",
    " * External dependency issues block progress",
    " * The coder has failed to resolve the same issue 2+ times",
))
|
|
|
|
# Registry of ready-made agents, keyed "<command>-<role>". Entries are generated
# in the order claude-coder, claude-reviewer, claude-senior, codex-coder,
# codex-reviewer, codex-senior. Every agent gets its own copy of the argument
# list, the system prompt for its role, and that role's default effort.
BUILTIN_AGENTS: dict[str, AgentConfig] = {
    f"{command}-{role}": AgentConfig(
        name=f"{command}-{role}",
        command=command,
        args=list(args_for_role[role]),
        system_prompt=role_prompt,
        reasoning_effort=DEFAULT_ROLE_REASONING_EFFORTS[role],
    )
    for command, args_for_role in (
        (
            "claude",
            {
                "coder": _CLAUDE_CODER_ARGS,
                "reviewer": _CLAUDE_REVIEW_ARGS,
                "senior": _CLAUDE_REVIEW_ARGS,
            },
        ),
        (
            "codex",
            {
                "coder": _CODEX_ARGS,
                "reviewer": _CODEX_ARGS,
                "senior": _CODEX_ARGS,
            },
        ),
    )
    for role, role_prompt in (
        ("coder", _CODER_SYSTEM_PROMPT),
        ("reviewer", _REVIEWER_SYSTEM_PROMPT),
        ("senior", _SENIOR_SYSTEM_PROMPT),
    )
}
|
|
|
|
# Shorthand aliases: "claude" → "claude-coder"/"claude-reviewer", "codex" → same
|
|
# Bare tool names accepted as shorthand, mapped to the prefix used when
# composing "<prefix>-<role>" agent names.
_AGENT_ALIASES: dict[str, str] = dict(
    claude="claude",
    codex="codex",
)

# Step role aliases mapped to canonical role names (currently identity entries).
_ROLE_ALIASES: dict[str, str] = dict(
    coding="coding",
    review="review",
)
|
|
|
|
|
|
def resolve_agent_shorthand(name: str, role: str) -> str:
    """Expand a shorthand tool name into a ``<tool>-<role>`` agent name.

    Names that are not registered in ``_AGENT_ALIASES`` are returned untouched.

    Examples:
        resolve_agent_shorthand("claude", "coder") → "claude-coder"
        resolve_agent_shorthand("codex", "reviewer") → "codex-reviewer"
        resolve_agent_shorthand("claude-coder", "coder") → "claude-coder" (unchanged)
    """
    try:
        prefix = _AGENT_ALIASES[name]
    except KeyError:
        # Not a shorthand: assume it is already a full agent name.
        return name
    return f"{prefix}-{role}"
|
|
|
|
|
|
def normalize_step_role(role: str) -> str:
    """Return the canonical name for a step role; unknown roles pass through."""
    if role in _ROLE_ALIASES:
        return _ROLE_ALIASES[role]
    return role
|
|
|
|
|
|
def normalize_prompt_template(template_ref: str) -> str:
    """Return the canonical form of a prompt template reference.

    No template aliases are defined here, so the reference comes back unchanged.
    """
    canonical_ref = template_ref
    return canonical_ref
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Role inference (backward compatibility)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Lower-case substrings that mark an agent name as belonging to a role.
# NOTE(review): "gen" also matches any name containing "agent"; because the
# coder patterns are tested first, such names are always classified as coders.
_CODER_PATTERNS = ("gen", "coder", "implement", "develop", "write")
_SENIOR_PATTERNS = ("senior", "lead", "principal", "aggregate", "adjudicat", "synth")
_REVIEWER_PATTERNS = ("review", "audit", "check", "verify", "inspect")


def _infer_roles(agent_names: list[str]) -> tuple[list[str], list[str], list[str]]:
    """Infer coder/reviewer/senior roles from agent names.

    Heuristic (case-insensitive substring match, first matching rule wins):
    - Names containing 'gen', 'coder', etc. → coder
    - Names containing 'senior', 'lead', etc. → senior
    - Names containing 'review', 'audit', etc. → reviewer
    - If neither a coder nor a reviewer matched: first agent → coder,
      rest → reviewers (a single agent becomes the only reviewer)
    - Otherwise unmatched names fill the missing coder slot first and the
      remainder become reviewers

    Args:
        agent_names: Agent names in configuration order.

    Returns:
        ``(coders, reviewers, seniors)``. Each name is listed at most once per
        role list.
    """
    coders: list[str] = []
    reviewers: list[str] = []
    seniors: list[str] = []
    unclassified: list[str] = []

    for name in agent_names:
        lower = name.lower()
        if any(p in lower for p in _CODER_PATTERNS):
            coders.append(name)
        elif any(p in lower for p in _SENIOR_PATTERNS):
            seniors.append(name)
        elif any(p in lower for p in _REVIEWER_PATTERNS):
            reviewers.append(name)
        else:
            unclassified.append(name)

    if not coders and not reviewers:
        # Fallback: nothing matched coder or reviewer, use positional convention.
        if len(agent_names) >= 2:
            coders = [agent_names[0]]
            reviewers = list(agent_names[1:])
        elif agent_names:
            # Single agent: treat as reviewer (for review-only)
            reviewers = list(agent_names)
        # Every name has just been placed by position. Without clearing this,
        # the extend() below would append the unmatched names a second time
        # (and list the positional coder as a reviewer as well).
        unclassified = []
    elif not coders and unclassified:
        coders = [unclassified.pop(0)]
    elif not reviewers and unclassified:
        reviewers = list(unclassified)
        unclassified = []

    # Any remaining unclassified go to reviewers
    reviewers.extend(unclassified)

    return coders, reviewers, seniors
|
|
|
|
|
|
def _resolve_agents(
    user_agents: dict[str, AgentConfig],
    coders: list[str],
    reviewers: list[str],
    seniors: list[str],
) -> dict[str, AgentConfig]:
    """Ensure all referenced agents exist by merging built-in definitions.

    If a coder, reviewer, or senior name references an agent not in
    ``user_agents`` but present in ``BUILTIN_AGENTS``, a deep copy of the
    built-in definition is added. Names found in neither are left undefined.

    Args:
        user_agents: Agents defined explicitly by the user; never mutated.
        coders: Agent names used in the coder role.
        reviewers: Agent names used in the reviewer role.
        seniors: Agent names used in the senior role.

    Returns:
        A new dict holding the user agents followed by any merged built-ins.
    """
    result = dict(user_agents)

    # Walk the names in role order rather than through a set: set iteration
    # order for strings changes between interpreter runs, which made the key
    # order of the result (shown in "Defined agents" messages) unstable.
    for name in (*coders, *reviewers, *seniors):
        if name not in result and name in BUILTIN_AGENTS:
            # Deep copy so later in-place edits never touch the shared registry.
            result[name] = copy.deepcopy(BUILTIN_AGENTS[name])

    return result
|
|
|
|
|
|
def _default_seniors_for_preset(
    pipeline_raw: Any,
    reviewers: list[str],
    agents: dict[str, AgentConfig],
) -> list[str]:
    """Pick a default senior agent for the review-fix style presets.

    The choice follows the first reviewer: its name prefix decides if it starts
    with ``codex-`` or ``claude-``; otherwise the ``command`` of its entry in
    ``agents`` is searched for those tool names (codex is tested first).

    Returns:
        A one-element list with the senior agent name, or an empty list when
        the pipeline is not one of the two presets, there are no reviewers, or
        no tool can be determined.
    """
    senior_presets = {"preset:review-fix", "preset:coding-review-fix"}
    if (
        not isinstance(pipeline_raw, str)
        or pipeline_raw not in senior_presets
        or not reviewers
    ):
        return []

    families = ("codex", "claude")
    lead_reviewer = reviewers[0]

    # First choice: the reviewer's own name reveals the tool family.
    for family in families:
        if lead_reviewer.startswith(f"{family}-"):
            return [f"{family}-senior"]

    # Second choice: look at the command the reviewer is configured to run.
    configured = agents.get(lead_reviewer)
    if configured is None:
        return []

    executable = configured.command.lower()
    for family in families:
        if family in executable:
            return [f"{family}-senior"]
    return []
|
|
|
|
|
|
def normalize_reasoning_effort(effort: str) -> str:
    """Resolve a user-supplied reasoning effort to its canonical name.

    Aliases listed in ``REASONING_EFFORT_ALIASES`` are translated first; the
    result must then be one of ``REASONING_EFFORT_CHOICES``.

    Raises:
        ValueError: If the value is neither a known alias nor a valid choice.
    """
    canonical = REASONING_EFFORT_ALIASES.get(effort, effort)
    if canonical in REASONING_EFFORT_CHOICES:
        return canonical
    raise ValueError(
        f"Unsupported reasoning effort '{effort}'. "
        f"Use one of: {REASONING_EFFORT_CHOICES}"
    )
|
|
|
|
|
|
def apply_reasoning_effort_settings(
    config: PipelineConfig,
    *,
    reasoning_effort: str | None = None,
    coder_effort: str | None = None,
    reviewer_effort: str | None = None,
    senior_effort: str | None = None,
) -> None:
    """Set reasoning effort on the config's agents, role by role.

    A role-specific value wins over ``reasoning_effort``, which is the shared
    fallback for all roles. When neither is given, agents keep their own
    setting, or receive the role default if they have none.

    Raises:
        ValueError: If any supplied effort is not a supported value.
    """
    shared = normalize_reasoning_effort(reasoning_effort) if reasoning_effort else None

    def pick(role_specific: str | None) -> str | None:
        """Normalize a role-level value, falling back to the shared one."""
        if role_specific:
            return normalize_reasoning_effort(role_specific)
        return shared

    # Resolve all three first so an invalid value raises before any agent changes.
    coder_choice = pick(coder_effort)
    reviewer_choice = pick(reviewer_effort)
    senior_choice = pick(senior_effort)

    _apply_role_effort(config.agents, config.coders, coder_choice, "coder")
    _apply_role_effort(config.agents, config.reviewers, reviewer_choice, "reviewer")
    _apply_role_effort(config.agents, config.seniors, senior_choice, "senior")
|
|
|
|
|
|
def _apply_role_effort(
    agents: dict[str, AgentConfig],
    agent_names: list[str],
    override_effort: str | None,
    role: str,
) -> None:
    """Assign reasoning effort to the named agents of one role, in place.

    Names missing from ``agents`` are skipped. An override replaces whatever
    the agent had; without one, only agents lacking an effort get the default
    registered for ``role``.
    """
    for agent in map(agents.get, agent_names):
        if agent is None:
            continue
        if override_effort is None:
            # No override: fill the gap only, never overwrite an explicit value.
            if agent.reasoning_effort is None:
                agent.reasoning_effort = DEFAULT_ROLE_REASONING_EFFORTS[role]
        else:
            agent.reasoning_effort = override_effort
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Default config (no YAML)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def default_config() -> PipelineConfig:
    """Build the configuration used when no YAML file is supplied.

    All built-in agents are available (as private copies), with
    ``claude-coder`` coding, ``claude-reviewer`` reviewing, no seniors, and
    the ``simple`` pipeline preset.
    """
    coders = ["claude-coder"]
    reviewers = ["claude-reviewer"]
    seniors: list[str] = []

    settings = {
        "output_dir": Path(".cross-eval/output"),
        "max_iterations": 3,
        # NOTE(review): the YAML loader in this module falls back to "en" when
        # no language is set, while this default is "ko" — confirm intended.
        "language": "ko",
        "execution": ExecutionConfig(),
        "inputs": {},
        "agents": copy.deepcopy(BUILTIN_AGENTS),
        "coders": coders,
        "reviewers": reviewers,
        "seniors": seniors,
        "pipeline": PIPELINE_PRESETS["simple"](coders, reviewers, seniors),
    }
    return PipelineConfig(**settings)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# YAML loading
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_config(path: Path) -> PipelineConfig:
    """Read a YAML config file and return the validated PipelineConfig.

    Args:
        path: Config file location; resolved to an absolute path before use.

    Raises:
        ValueError: If the document is not a mapping, or if validation reports
            any problem (all problems are listed in the message).
    """
    path = path.resolve()
    with path.open(encoding="utf-8") as handle:
        raw = yaml.safe_load(handle)

    if not isinstance(raw, dict):
        raise ValueError(f"Config file must be a YAML mapping, got {type(raw).__name__}")

    config = _parse_raw(raw, path)

    problems = validate_config(config)
    if not problems:
        return config
    raise ValueError("Config validation failed:\n " + "\n ".join(problems))
|
|
|
|
|
|
def _parse_raw(raw: dict[str, Any], config_path: Path) -> PipelineConfig:
    """Parse raw YAML dict into PipelineConfig.

    Args:
        raw: Top-level mapping loaded from the YAML file.
        config_path: Path of the config file. Relative ``inputs`` entries are
            resolved against its parent directory, and its modification time
            is stored on the returned config.

    Returns:
        The assembled config. Validation is not performed here.

    Raises:
        ValueError: If the ``pipeline`` entry cannot be resolved.
        OSError: If ``config_path`` cannot be stat'ed.
    """
    # --- agents ---
    # A YAML key with an empty value parses to None, so fall back to an empty
    # mapping for the section and for each entry (as "execution" does below)
    # instead of failing with AttributeError.
    agents: dict[str, AgentConfig] = {}
    for name, agent_data in (raw.get("agents") or {}).items():
        agent_data = agent_data or {}
        agents[name] = AgentConfig(
            name=name,
            command=agent_data.get("command", "claude"),
            args=agent_data.get("args", ["-p"]),
            system_prompt=agent_data.get("system_prompt"),
            reasoning_effort=agent_data.get("reasoning_effort"),
            stdin_mode=agent_data.get("stdin_mode", False),
            agentic=agent_data.get("agentic", False),
        )

    # --- roles: explicit or inferred ---
    pipeline_raw = raw.get("pipeline", "preset:simple")
    coders_raw = raw.get("coders")
    reviewers_raw = raw.get("reviewers")
    seniors_raw = raw.get("seniors")

    if coders_raw is not None or reviewers_raw is not None or seniors_raw is not None:
        # Explicit role assignment from YAML; roles left out become empty.
        coders: list[str] = coders_raw if coders_raw is not None else []
        reviewers: list[str] = reviewers_raw if reviewers_raw is not None else []
        seniors: list[str] = seniors_raw if seniors_raw is not None else []
    else:
        # Backward compat: infer from agent names
        coders, reviewers, seniors = _infer_roles(list(agents.keys()))

    if not seniors:
        seniors = _default_seniors_for_preset(pipeline_raw, reviewers, agents)

    # Auto-merge built-in agents for any referenced names not yet defined
    agents = _resolve_agents(agents, coders, reviewers, seniors)

    # Apply per-role default efforts through a throwaway config. It shares the
    # ``agents`` dict, so the in-place changes carry over to the final config.
    config_stub = PipelineConfig(
        agents=agents,
        coders=coders,
        reviewers=reviewers,
        seniors=seniors,
    )
    apply_reasoning_effort_settings(config_stub)

    # --- inputs (resolve relative to config file location) ---
    config_dir = config_path.parent
    inputs: dict[str, Path | str] = {}
    for key, val in (raw.get("inputs") or {}).items():
        p = Path(val)
        if not p.is_absolute():
            p = config_dir / p
        inputs[key] = p

    execution_raw = raw.get("execution", {}) or {}
    execution = ExecutionConfig(
        mode=execution_raw.get("mode", "agent-decides"),
        command_policy=execution_raw.get("command_policy", "broad"),
        inherit_env=bool(execution_raw.get("inherit_env", True)),
        auto_env_files=list(execution_raw.get("auto_env_files", [".env", ".env.local"])),
        env_files=list(execution_raw.get("env_files", [])),
        expose_env_names=bool(execution_raw.get("expose_env_names", True)),
        auto_context_targets=list(execution_raw.get("auto_context_targets", [])),
    )

    # --- pipeline (preset or custom) ---
    steps, phases = _resolve_pipeline(pipeline_raw, coders, reviewers, seniors)

    # Detect preset name for output directory naming
    preset_name = "custom"
    if isinstance(pipeline_raw, str) and pipeline_raw.startswith("preset:"):
        preset_name = pipeline_raw.split(":", 1)[1]

    config = PipelineConfig(
        output_dir=Path(raw.get("output_dir", ".cross-eval/output")),
        max_iterations=int(raw.get("max_iterations", 3)),
        min_iterations=int(raw.get("min_iterations", 1)),
        verbose=bool(raw.get("verbose", False)),
        language=raw.get("language", "en"),
        execution=execution,
        inputs=inputs,
        agents=agents,
        coders=coders,
        reviewers=reviewers,
        seniors=seniors,
        pipeline=steps,
        phases=phases,
        preset_name=preset_name,
        _config_path=config_path,
        _config_mtime=config_path.stat().st_mtime,
    )
    sync_phased_iterations(config)
    ensure_fix_preset_agentic(config)
    return config
|
|
|
|
|
|
def try_reload_config(config: PipelineConfig) -> PipelineConfig:
    """Reload config if the file has been modified on disk.

    The file counts as modified when its current mtime is newer than the
    mtime recorded on ``config``. Configs without a recorded path or mtime
    are returned as-is.

    Returns:
        The new config if reloaded, or the same config if the file is
        unchanged, cannot be stat'ed, or fails to load.

    Reload failures are logged as warnings and never raised, so a bad edit
    to the file does not crash the pipeline.
    """
    if config._config_path is None or config._config_mtime is None:
        return config

    try:
        current_mtime = config._config_path.stat().st_mtime
    except OSError:
        return config

    if current_mtime <= config._config_mtime:
        return config

    logger.info("Config file changed, reloading: %s", config._config_path.name)
    try:
        new_config = load_config(config._config_path)
    except (
        ValueError,
        OSError,  # covers FileNotFoundError, PermissionError and other read failures
        yaml.YAMLError,
        # Structurally malformed files (a step without "name", a scalar where
        # a mapping is expected, ...) surface as these during parsing.
        KeyError,
        TypeError,
        AttributeError,
    ) as e:
        logger.warning("Config reload failed, keeping previous config: %s", e)
        return config

    logger.info("Config reloaded successfully")
    return new_config
|
|
|
|
|
|
def _resolve_pipeline(
    pipeline_raw: Any,
    coders: list[str],
    reviewers: list[str],
    seniors: list[str],
) -> tuple[list[StepConfig], list[PhaseConfig]]:
    """Turn the raw ``pipeline`` entry into step or phase definitions.

    Accepts either a ``"preset:<name>"`` string or an explicit list of step
    mappings.

    Returns:
        ``(steps, phases)``; at most one of the two is non-empty.
        - Presets in ``PIPELINE_PRESETS`` and explicit lists → steps.
        - Presets in ``PHASED_PRESETS`` (e.g. review-fix) → phases.

    Raises:
        ValueError: For an unknown preset name, or when ``pipeline_raw`` is
            neither a preset string nor a list.
        KeyError: If an explicit step lacks ``name``, ``agent`` or
            ``output_key``.
    """

    def build_step(step_data: Any) -> StepConfig:
        """Create one StepConfig from its mapping, applying field defaults."""
        role = normalize_step_role(step_data.get("role", "coding"))
        return StepConfig(
            name=step_data["name"],
            agent=step_data["agent"],
            role=role,
            prompt_template=normalize_prompt_template(
                step_data.get("prompt_template", f"default:{role}")
            ),
            output_key=step_data["output_key"],
            verdict=step_data.get("verdict", False),
            verdict_pattern=step_data.get("verdict_pattern", r"VERDICT:\s*PASS"),
            context_override=step_data.get("context_override", {}),
        )

    # Preset form: "preset:simple", "preset:review-fix", ...
    if isinstance(pipeline_raw, str) and pipeline_raw.startswith("preset:"):
        _, preset_name = pipeline_raw.split(":", 1)
        role_lists = (coders, reviewers, seniors)
        if preset_name in PIPELINE_PRESETS:
            return PIPELINE_PRESETS[preset_name](*role_lists), []
        if preset_name in PHASED_PRESETS:
            return [], PHASED_PRESETS[preset_name](*role_lists)
        all_presets = [*PIPELINE_PRESETS.keys(), *PHASED_PRESETS.keys()]
        raise ValueError(
            f"Unknown pipeline preset '{preset_name}'. "
            f"Available: {all_presets}"
        )

    # Explicit form: a list of step mappings.
    if isinstance(pipeline_raw, list):
        return [build_step(step_data) for step_data in pipeline_raw], []

    raise ValueError(
        f"'pipeline' must be a preset string (e.g. 'preset:simple') "
        f"or a list of step definitions, got {type(pipeline_raw).__name__}"
    )
|
|
|
|
|
|
def validate_config(config: PipelineConfig) -> list[str]:
    """Collect every validation problem found in ``config``.

    A config with phases is checked phase by phase; otherwise the flat
    pipeline is checked and must contain at least one step and at least one
    verdict step. Input paths, language, and execution settings are checked
    in both cases.

    Returns:
        Human-readable error strings; an empty list means the config is valid.
    """
    errors: list[str] = []

    def check_agents(steps: list[StepConfig], prefix: str) -> None:
        """Report each step whose agent is not among the defined agents."""
        for step in steps:
            if step.agent in config.agents:
                continue
            errors.append(
                f"{prefix} '{step.name}' references "
                f"undefined agent '{step.agent}'. "
                f"Defined agents: {list(config.agents.keys())}"
            )

    def check_verdict_patterns(steps: list[StepConfig], prefix: str) -> None:
        """Report each verdict step whose pattern is not a valid regex."""
        for step in steps:
            if not step.verdict:
                continue
            try:
                re.compile(step.verdict_pattern)
            except re.error as e:
                errors.append(f"{prefix} '{step.name}' has invalid verdict_pattern: {e}")

    if config.phases:
        # --- Phased pipeline ---
        for phase in config.phases:
            phase_label = f"Phase '{phase.name}'"
            if not phase.steps:
                errors.append(f"{phase_label} has no steps")
            check_agents(phase.steps, f"{phase_label} step")
            _validate_unique_step_fields(phase.steps, errors, scope=phase_label)
            check_verdict_patterns(phase.steps, f"{phase_label} step")
    else:
        # --- Flat pipeline ---
        if not config.pipeline:
            errors.append("Pipeline must have at least one step")
        check_agents(config.pipeline, "Step")
        _validate_unique_step_fields(config.pipeline, errors, scope="Pipeline")
        if not any(s.verdict for s in config.pipeline):
            errors.append("Pipeline must have at least one step with verdict: true")
        check_verdict_patterns(config.pipeline, "Step")

    # --- Checks shared by both pipeline kinds ---
    for key, val in config.inputs.items():
        # Only Path-valued inputs are checked for existence.
        if isinstance(val, Path) and not val.exists():
            errors.append(f"Input file '{key}' not found: {val}")

    if config.language not in ("en", "ko"):
        errors.append(f"Unsupported language '{config.language}'. Use 'en' or 'ko'.")

    if config.execution.mode not in {"agent-decides"}:
        errors.append(
            f"Unsupported execution.mode '{config.execution.mode}'. Use 'agent-decides'."
        )
    if config.execution.command_policy not in {"broad", "restricted"}:
        errors.append(
            "Unsupported execution.command_policy "
            f"'{config.execution.command_policy}'. Use 'broad' or 'restricted'."
        )

    return errors
|
|
|
|
|
|
def _validate_unique_step_fields(
    steps: list[StepConfig],
    errors: list[str],
    *,
    scope: str,
) -> None:
    """Append an error for every repeated step name or output key in ``steps``.

    Args:
        steps: The steps of one pipeline or phase.
        errors: List that receives the error messages (mutated in place).
        scope: Label placed at the start of each message.
    """
    # (message label, attribute to read, values seen so far)
    tracked = (
        ("step name", "name", set()),
        ("output_key", "output_key", set()),
    )

    for step in steps:
        for label, attribute, seen in tracked:
            value = getattr(step, attribute)
            if value in seen:
                errors.append(f"{scope} has duplicate {label} '{value}'")
            seen.add(value)
|
|
|
|
|
|
def _make_agentic(agent: AgentConfig) -> None:
    """Switch ``agent`` to agentic mode in place.

    Sets ``agentic`` to True and replaces ``args`` with a new list that has
    every ``"-p"`` entry removed.
    """
    agent.agentic = True
    agent.args = list(filter(lambda arg: arg != "-p", agent.args))
|
|
|
|
|
|
def sync_phased_iterations(
    config: PipelineConfig,
    max_iter: int | None = None,
) -> None:
    """Set the iteration limit on every phase that contains a verdict step.

    Phases without a verdict step keep their own ``max_iterations``. The limit
    is ``max_iter`` when given, otherwise ``config.max_iterations``. Configs
    without phases are left untouched.
    """
    if not config.phases:
        return

    limit = max_iter if max_iter is not None else config.max_iterations
    converging = (
        phase
        for phase in config.phases
        if any(step.verdict for step in phase.steps)
    )
    for phase in converging:
        phase.max_iterations = limit
|
|
|
|
|
|
def ensure_fix_preset_agentic(config: PipelineConfig) -> None:
    """Run coders agentically when the config uses a fix-style preset.

    For presets listed in ``FIX_STYLE_PRESETS``, every coder that is defined
    in ``config.agents`` and not already agentic is converted in place.
    Other presets are left untouched.
    """
    if config.preset_name in FIX_STYLE_PRESETS:
        for coder_name in config.coders:
            agent = config.agents.get(coder_name)
            if agent is None or agent.agentic:
                # Undefined coder, or nothing left to convert.
                continue
            _make_agentic(agent)
|
|
|
|
|
|
def apply_input_overrides(
    config: PipelineConfig, overrides: dict[str, str]
) -> None:
    """Replace or add config inputs from CLI ``--input`` overrides.

    Each override value is wrapped in a ``Path`` exactly as given and stored
    under its key in ``config.inputs``.
    """
    config.inputs.update(
        (key, Path(path_str)) for key, path_str in overrides.items()
    )
|