Files
cross-eval/cross_eval/config.py
이충영 에이닷서비스개발 0bbe0f6f7b continue
2026-03-15 17:54:30 +09:00

746 lines
27 KiB
Python

"""Configuration loading, validation, and preset resolution."""
from __future__ import annotations
import copy
import logging
import re
from pathlib import Path
from typing import Any
import yaml
from cross_eval.models import (
AgentConfig,
ExecutionConfig,
PhaseConfig,
PipelineConfig,
StepConfig,
)
from cross_eval.prompts import PHASED_PRESETS, PIPELINE_PRESETS
logger = logging.getLogger(__name__)
# Alternate spellings accepted from users, each mapped to the canonical "xhigh".
REASONING_EFFORT_ALIASES = dict.fromkeys(
    ("extra-high", "extra_high", "x-high"),
    "xhigh",
)

# Canonical effort values; anything else is rejected by normalize_reasoning_effort.
REASONING_EFFORT_CHOICES = (
    "minimal",
    "low",
    "medium",
    "high",
    "xhigh",
)

# Effort given to an agent of each role when it has none configured.
DEFAULT_ROLE_REASONING_EFFORTS = {"coder": "medium", "reviewer": "medium", "senior": "high"}

# Presets for which ensure_fix_preset_agentic switches coders to agentic mode.
FIX_STYLE_PRESETS = {"coding-plan-review", "plan-review"}
# ---------------------------------------------------------------------------
# Built-in agent registry
# ---------------------------------------------------------------------------
_CODEX_ARGS = [
"exec",
"--full-auto",
"--skip-git-repo-check",
"--model",
"gpt-5.4",
"-",
]
_CLAUDE_BASE_ARGS = [
"-p",
"--setting-sources",
"user",
"--disable-slash-commands",
"--model",
"opus",
]
_CLAUDE_CODER_ARGS = list(_CLAUDE_BASE_ARGS) + [
"--dangerously-skip-permissions",
"--permission-mode",
"bypassPermissions",
]
_CLAUDE_REVIEW_ARGS = list(_CLAUDE_BASE_ARGS)
# System prompt used by the built-in "claude-coder" and "codex-coder" agents.
# The adjacent string literals are concatenated into a single string; only the
# pieces that end in "\n" finish a line of the prompt, the others continue it.
_CODER_SYSTEM_PROMPT = (
    "You are a senior software engineer implementing code changes.\n"
    "Rules:\n"
    "1. FIRST explore the project directory to understand the existing codebase, "
    "patterns, and conventions before writing any code.\n"
    "2. You MUST use the Edit and Write tools to make ACTUAL file changes. "
    "Do NOT just describe or explain changes in text — apply them directly to the files. "
    "Your text output alone has no effect; only tool-based edits count.\n"
    "3. You may decide which shell, Python, git, docker, test, and database commands "
    "to run. The user does not need to pre-specify exact commands.\n"
    "4. Environment variables from configured .env files may already be loaded into "
    "your process; use them when validating services such as ClickHouse.\n"
    "5. Implement ONLY what the plan specifies. Do NOT add extra features, "
    "unnecessary abstractions, premature optimizations, or \"nice-to-have\" improvements.\n"
    "6. Follow the project's existing coding style, naming conventions, and directory structure.\n"
    "7. If previous review feedback is provided, fix ONLY the specific issues mentioned. "
    "Do NOT refactor unrelated code.\n"
    "8. Ignore any items from previous feedback that were marked as DISMISSED or false positive.\n"
    "9. When in doubt about scope, do LESS, not more."
)
# System prompt used by the built-in "claude-reviewer" and "codex-reviewer" agents.
# It asks for a closing "VERDICT: PASS" / "VERDICT: FAIL" line; the default
# verdict_pattern for custom pipeline steps in this module matches "VERDICT: PASS".
_REVIEWER_SYSTEM_PROMPT = (
    "You are a code reviewer. You MUST NOT create, modify, or delete any files.\n"
    "Rules:\n"
    "1. Explore the project directory to understand the full codebase context.\n"
    "2. You may decide which shell, Python, test, git, docker, and database read commands "
    "to run in order to verify behavior. The user does not need to pre-specify exact commands.\n"
    "3. Environment variables from configured .env files may already be loaded into "
    "your process; use them for verification when relevant.\n"
    "4. Compare the implementation against the plan and checklist ONLY.\n"
    "5. Classify every issue with BOTH severity AND category:\n"
    " - Severity: Critical (breaks functionality/security) > Major (requirement mismatch) > Minor (convention/style)\n"
    " - Category: Over-engineering / Omission\n"
    "6. When reviewing with previous feedback, mark items as CONFIRMED (still an issue) "
    "or DISMISSED (false positive) with rationale.\n"
    "7. Report out-of-scope issues separately — problems found outside plan/checklist scope.\n"
    "8. Order issues by severity (Critical first).\n"
    "9. Do NOT suggest improvements beyond the plan scope.\n"
    "10. End with VERDICT: PASS (all requirements met, no over-engineering) "
    "or VERDICT: FAIL (issues found)."
)
# System prompt used by the built-in "claude-senior" and "codex-senior" agents.
# Unlike the reviewer prompt it defines three verdicts: PASS, FAIL and ESCALATE.
_SENIOR_SYSTEM_PROMPT = (
    "You are a senior technical reviewer coordinating a review-fix-verification loop.\n"
    "Rules:\n"
    "1. Explore the project directory to understand the full codebase context.\n"
    "2. You may decide which shell, Python, test, git, docker, and database read commands "
    "to run to verify disputed issues. The user does not need to pre-specify exact commands.\n"
    "3. Environment variables from configured .env files may already be loaded into "
    "your process; use them when validating service integrations.\n"
    "4. In aggregation mode, deduplicate overlaps, resolve disagreements, and keep only "
    "evidence-backed issues. Categorize dismissed findings as [False positive] or [Already fixed].\n"
    "5. In verification mode, judge the current implementation directly against ONLY the "
    "plan and checklist.\n"
    "6. Be skeptical of false positives, but do not lower the bar on real requirement "
    "gaps.\n"
    "7. When issues remain, produce a concise prioritized action list the coder can act on.\n"
    "8. Maintain an Issue Tracker table across iterations to track issue status.\n"
    "9. Do NOT invent new requirements beyond the plan and checklist.\n"
    "10. End with one of three verdicts:\n"
    " - VERDICT: PASS — all requirements met, no issues remain.\n"
    " - VERDICT: FAIL — issues found that the coder can fix.\n"
    " - VERDICT: ESCALATE — issues that require human intervention. Use ESCALATE when:\n"
    " * Requirements are ambiguous and need clarification from stakeholders\n"
    " * Architecture decisions are needed that go beyond the plan scope\n"
    " * External dependency issues block progress\n"
    " * The coder has failed to resolve the same issue 2+ times"
)
# One row per built-in agent: (name, command, CLI args, system prompt, role).
# The role selects the default reasoning effort for the agent.
_BUILTIN_AGENT_SPECS = (
    ("claude-coder", "claude", _CLAUDE_CODER_ARGS, _CODER_SYSTEM_PROMPT, "coder"),
    ("claude-reviewer", "claude", _CLAUDE_REVIEW_ARGS, _REVIEWER_SYSTEM_PROMPT, "reviewer"),
    ("claude-senior", "claude", _CLAUDE_REVIEW_ARGS, _SENIOR_SYSTEM_PROMPT, "senior"),
    ("codex-coder", "codex", _CODEX_ARGS, _CODER_SYSTEM_PROMPT, "coder"),
    ("codex-reviewer", "codex", _CODEX_ARGS, _REVIEWER_SYSTEM_PROMPT, "reviewer"),
    ("codex-senior", "codex", _CODEX_ARGS, _SENIOR_SYSTEM_PROMPT, "senior"),
)

# Registry of ready-made agents, keyed by agent name. Each entry gets its own
# copy of the argument list so editing one agent's args never affects another.
BUILTIN_AGENTS: dict[str, AgentConfig] = {
    agent_name: AgentConfig(
        name=agent_name,
        command=command,
        args=list(cli_args),
        system_prompt=system_prompt,
        reasoning_effort=DEFAULT_ROLE_REASONING_EFFORTS[role],
    )
    for agent_name, command, cli_args, system_prompt, role in _BUILTIN_AGENT_SPECS
}
# Shorthand aliases: "claude" → "claude-coder"/"claude-reviewer", "codex" → same
_AGENT_ALIASES: dict[str, str] = {
"claude": "claude",
"codex": "codex",
}
_ROLE_ALIASES: dict[str, str] = {
"coding": "coding",
"review": "review",
}
def resolve_agent_shorthand(name: str, role: str) -> str:
    """Expand a shorthand agent name into ``"<agent>-<role>"``.

    Names that are not registered shorthands are returned untouched.

    Examples:
        resolve_agent_shorthand("claude", "coder") → "claude-coder"
        resolve_agent_shorthand("codex", "reviewer") → "codex-reviewer"
        resolve_agent_shorthand("claude-coder", "coder") → "claude-coder" (unchanged)
    """
    base = _AGENT_ALIASES.get(name)
    if base is None:
        # Not a shorthand: assume it is already a full agent name.
        return name
    return f"{base}-{role}"
def normalize_step_role(role: str) -> str:
    """Return the canonical role for *role*, or *role* itself if it has no alias."""
    if role in _ROLE_ALIASES:
        return _ROLE_ALIASES[role]
    return role
def normalize_prompt_template(template_ref: str) -> str:
    """Return the canonical form of a prompt template reference.

    No template aliases are defined, so the reference is returned as given.
    """
    canonical_ref = template_ref
    return canonical_ref
# ---------------------------------------------------------------------------
# Role inference (backward compatibility)
# ---------------------------------------------------------------------------
_CODER_PATTERNS = ("gen", "coder", "implement", "develop", "write")
_SENIOR_PATTERNS = ("senior", "lead", "principal", "aggregate", "adjudicat", "synth")
_REVIEWER_PATTERNS = ("review", "audit", "check", "verify", "inspect")
def _infer_roles(agent_names: list[str]) -> tuple[list[str], list[str], list[str]]:
"""Infer coder/reviewer/senior roles from agent names.
Heuristic:
- Names containing 'gen', 'coder', etc. → coder
- Names containing 'senior', 'lead', etc. → senior
- Names containing 'review', 'audit', etc. → reviewer
- If no matches: first agent → coder, rest → reviewers
"""
coders: list[str] = []
reviewers: list[str] = []
seniors: list[str] = []
unclassified: list[str] = []
for name in agent_names:
lower = name.lower()
if any(p in lower for p in _CODER_PATTERNS):
coders.append(name)
elif any(p in lower for p in _SENIOR_PATTERNS):
seniors.append(name)
elif any(p in lower for p in _REVIEWER_PATTERNS):
reviewers.append(name)
else:
unclassified.append(name)
# Fallback: if no classification worked, use positional convention
if not coders and not reviewers:
if len(agent_names) >= 2:
coders = [agent_names[0]]
reviewers = list(agent_names[1:])
elif agent_names:
# Single agent: treat as reviewer (for review-only)
reviewers = list(agent_names)
elif not coders and unclassified:
coders = [unclassified.pop(0)]
elif not reviewers and unclassified:
reviewers = list(unclassified)
unclassified = []
# Any remaining unclassified go to reviewers
reviewers.extend(unclassified)
return coders, reviewers, seniors
def _resolve_agents(
user_agents: dict[str, AgentConfig],
coders: list[str],
reviewers: list[str],
seniors: list[str],
) -> dict[str, AgentConfig]:
"""Ensure all referenced agents exist by merging built-in definitions.
If a coder or reviewer name references an agent not in user_agents
but present in BUILTIN_AGENTS, the built-in definition is added.
"""
all_referenced = set(coders) | set(reviewers) | set(seniors)
result = dict(user_agents)
for name in all_referenced:
if name not in result and name in BUILTIN_AGENTS:
result[name] = copy.deepcopy(BUILTIN_AGENTS[name])
return result
def _default_seniors_for_preset(
pipeline_raw: Any,
reviewers: list[str],
agents: dict[str, AgentConfig],
) -> list[str]:
"""Infer a default senior agent for presets that benefit from adjudication."""
if not (
isinstance(pipeline_raw, str)
and pipeline_raw in {
"preset:plan-review",
"preset:coding-plan-review",
}
and reviewers
):
return []
first_reviewer = reviewers[0]
if first_reviewer.startswith("codex-"):
return ["codex-senior"]
if first_reviewer.startswith("claude-"):
return ["claude-senior"]
reviewer_agent = agents.get(first_reviewer)
if reviewer_agent is None:
return []
command = reviewer_agent.command.lower()
if "codex" in command:
return ["codex-senior"]
if "claude" in command:
return ["claude-senior"]
return []
def normalize_reasoning_effort(effort: str) -> str:
    """Map a user-supplied effort value onto its canonical spelling.

    Raises:
        ValueError: If the value, after alias resolution, is not one of
            REASONING_EFFORT_CHOICES.
    """
    canonical = REASONING_EFFORT_ALIASES.get(effort, effort)
    if canonical in REASONING_EFFORT_CHOICES:
        return canonical
    raise ValueError(
        f"Unsupported reasoning effort '{effort}'. "
        f"Use one of: {REASONING_EFFORT_CHOICES}"
    )
def apply_reasoning_effort_settings(
    config: PipelineConfig,
    *,
    reasoning_effort: str | None = None,
    coder_effort: str | None = None,
    reviewer_effort: str | None = None,
    senior_effort: str | None = None,
) -> None:
    """Set reasoning effort on the config's agents, role by role.

    A role-specific value wins over the shared ``reasoning_effort``. When
    neither is given, agents without an effort receive their role default.
    All supplied values are normalized (and therefore validated) before any
    agent is modified.
    """
    shared = normalize_reasoning_effort(reasoning_effort) if reasoning_effort else None

    requested = (
        ("coder", coder_effort),
        ("reviewer", reviewer_effort),
        ("senior", senior_effort),
    )
    resolved = {
        role: normalize_reasoning_effort(value) if value else shared
        for role, value in requested
    }

    members = (
        ("coder", config.coders),
        ("reviewer", config.reviewers),
        ("senior", config.seniors),
    )
    for role, names in members:
        _apply_role_effort(config.agents, names, resolved[role], role)
def _apply_role_effort(
agents: dict[str, AgentConfig],
agent_names: list[str],
override_effort: str | None,
role: str,
) -> None:
"""Set reasoning effort on agents for a specific role."""
for agent_name in agent_names:
agent = agents.get(agent_name)
if agent is None:
continue
if override_effort is not None:
agent.reasoning_effort = override_effort
elif agent.reasoning_effort is None:
agent.reasoning_effort = DEFAULT_ROLE_REASONING_EFFORTS[role]
# ---------------------------------------------------------------------------
# Default config (no YAML)
# ---------------------------------------------------------------------------
def default_config() -> PipelineConfig:
    """Build the configuration used when no YAML file is supplied.

    It carries a private copy of every built-in agent, one Claude coder, one
    Claude reviewer, no seniors, and the phases of the "coding-plan-review"
    preset.
    """
    preset = "coding-plan-review"
    builtin_copy = copy.deepcopy(BUILTIN_AGENTS)
    coder_names = ["claude-coder"]
    reviewer_names = ["claude-reviewer"]
    senior_names: list[str] = []
    no_steps: list[StepConfig] = []
    preset_phases = PHASED_PRESETS[preset](coder_names, reviewer_names, senior_names)

    return PipelineConfig(
        output_dir=Path(".cross-eval/output"),
        use_worktree=False,
        max_iterations=3,
        language="ko",
        execution=ExecutionConfig(),
        inputs={},
        agents=builtin_copy,
        coders=coder_names,
        reviewers=reviewer_names,
        seniors=senior_names,
        pipeline=no_steps,
        phases=preset_phases,
        preset_name=preset,
    )
# ---------------------------------------------------------------------------
# YAML loading
# ---------------------------------------------------------------------------
def load_config(path: Path) -> PipelineConfig:
    """Read a YAML config file and return the validated PipelineConfig.

    Raises:
        ValueError: If the document is not a mapping, or if validation
            reports any problem (all problems are listed in the message).
    """
    resolved = path.resolve()
    with open(resolved, encoding="utf-8") as handle:
        document = yaml.safe_load(handle)

    if not isinstance(document, dict):
        raise ValueError(f"Config file must be a YAML mapping, got {type(document).__name__}")

    parsed = _parse_raw(document, resolved)
    problems = validate_config(parsed)
    if not problems:
        return parsed
    raise ValueError("Config validation failed:\n " + "\n ".join(problems))
def _parse_raw(raw: dict[str, Any], config_path: Path) -> PipelineConfig:
    """Parse raw YAML dict into PipelineConfig.

    Sections that are present but empty in the YAML (``agents:``, ``inputs:``,
    ``execution:`` or an agent entry with no body) load as ``None`` and are
    treated as empty mappings.

    Args:
        raw: Top-level mapping loaded from the config file.
        config_path: Location of that file. Relative ``inputs`` are resolved
            against its directory; a relative ``output_dir`` is resolved
            against the project root (the parent of a ``.cross-eval``
            directory, otherwise the config file's own directory).

    Returns:
        The populated PipelineConfig, with phase iteration limits synced and
        fix-style preset coders switched to agentic mode.

    Raises:
        ValueError: From pipeline resolution, or from non-numeric
            ``max_iterations`` / ``min_iterations`` values.
        OSError: If the modification time of ``config_path`` cannot be read.
    """
    project_root = config_path.parent.parent if config_path.parent.name == ".cross-eval" else config_path.parent

    # --- agents ---
    agents: dict[str, AgentConfig] = {}
    # `or {}`: an empty "agents:" key yields None, which has no .items().
    for name, agent_data in (raw.get("agents") or {}).items():
        # An agent declared without a body ("name:") falls back to all defaults.
        agent_data = agent_data or {}
        agents[name] = AgentConfig(
            name=name,
            command=agent_data.get("command", "claude"),
            args=agent_data.get("args", ["-p"]),
            system_prompt=agent_data.get("system_prompt"),
            reasoning_effort=agent_data.get("reasoning_effort"),
            stdin_mode=agent_data.get("stdin_mode", False),
            agentic=agent_data.get("agentic", False),
        )

    # --- roles: explicit or inferred ---
    pipeline_raw = raw.get("pipeline", "preset:coding-plan-review")
    coders_raw = raw.get("coders")
    reviewers_raw = raw.get("reviewers")
    seniors_raw = raw.get("seniors")
    if coders_raw is not None or reviewers_raw is not None or seniors_raw is not None:
        # Explicit role assignment from YAML
        coders: list[str] = coders_raw if coders_raw is not None else []
        reviewers: list[str] = reviewers_raw if reviewers_raw is not None else []
        seniors: list[str] = seniors_raw if seniors_raw is not None else []
    else:
        # Backward compat: infer from agent names
        coders, reviewers, seniors = _infer_roles(list(agents.keys()))
    if not seniors:
        seniors = _default_seniors_for_preset(pipeline_raw, reviewers, agents)

    # Auto-merge built-in agents for any referenced names not yet defined
    agents = _resolve_agents(agents, coders, reviewers, seniors)

    # Temporary config used only to fill in default reasoning efforts; the
    # helper mutates the AgentConfig objects that `agents` holds.
    config_stub = PipelineConfig(
        agents=agents,
        coders=coders,
        reviewers=reviewers,
        seniors=seniors,
    )
    apply_reasoning_effort_settings(config_stub)

    # --- inputs (resolve relative to config file location) ---
    config_dir = config_path.parent
    inputs: dict[str, Path | str] = {}
    # `or {}`: same empty-section guard as for agents.
    for key, val in (raw.get("inputs") or {}).items():
        p = Path(val)
        if not p.is_absolute():
            p = config_dir / p
        inputs[key] = p

    execution_raw = raw.get("execution", {}) or {}
    execution = ExecutionConfig(
        mode=execution_raw.get("mode", "agent-decides"),
        command_policy=execution_raw.get("command_policy", "broad"),
        inherit_env=bool(execution_raw.get("inherit_env", True)),
        auto_env_files=list(execution_raw.get("auto_env_files", [".env", ".env.local"])),
        env_files=list(execution_raw.get("env_files", [])),
        expose_env_names=bool(execution_raw.get("expose_env_names", True)),
        auto_context_targets=list(execution_raw.get("auto_context_targets", [])),
    )

    # --- pipeline (preset or custom) ---
    steps, phases = _resolve_pipeline(pipeline_raw, coders, reviewers, seniors)

    # Detect preset name for output directory naming
    preset_name = "custom"
    if isinstance(pipeline_raw, str) and pipeline_raw.startswith("preset:"):
        preset_name = pipeline_raw.split(":", 1)[1]

    output_dir = Path(raw.get("output_dir", ".cross-eval/output"))
    if not output_dir.is_absolute():
        output_dir = project_root / output_dir

    config = PipelineConfig(
        output_dir=output_dir,
        use_worktree=bool(raw.get("use_worktree", False)),
        max_iterations=int(raw.get("max_iterations", 3)),
        min_iterations=int(raw.get("min_iterations", 1)),
        verbose=bool(raw.get("verbose", False)),
        language=raw.get("language", "en"),
        execution=execution,
        inputs=inputs,
        agents=agents,
        coders=coders,
        reviewers=reviewers,
        seniors=seniors,
        pipeline=steps,
        phases=phases,
        preset_name=preset_name,
        # Remembered so try_reload_config can detect later edits to the file.
        _config_path=config_path,
        _config_mtime=config_path.stat().st_mtime,
    )
    sync_phased_iterations(config)
    ensure_fix_preset_agentic(config)
    return config
def try_reload_config(config: PipelineConfig) -> PipelineConfig:
    """Reload config if the file has been modified on disk.

    Returns the new config if reloaded, or the same config if unchanged.
    Any failure to read, parse or validate the edited file is logged and the
    previous config is kept, so a bad edit does not crash the pipeline.

    Args:
        config: The active configuration. Configs that were not loaded from a
            file (no recorded path or modification time) are returned as-is.
    """
    if config._config_path is None or config._config_mtime is None:
        return config
    try:
        current_mtime = config._config_path.stat().st_mtime
    except OSError:
        # File vanished or is unreadable right now: keep what we have.
        return config
    if current_mtime <= config._config_mtime:
        return config

    logger.info("Config file changed, reloading: %s", config._config_path.name)
    try:
        new_config = load_config(config._config_path)
    except (
        OSError,  # includes FileNotFoundError and PermissionError
        ValueError,
        KeyError,  # e.g. a pipeline step missing a required key
        TypeError,  # e.g. a section holding the wrong YAML type
        AttributeError,
        yaml.YAMLError,
    ) as e:
        logger.warning("Config reload failed, keeping previous config: %s", e)
        return config
    logger.info("Config reloaded successfully")
    return new_config
def _resolve_pipeline(
pipeline_raw: Any,
coders: list[str],
reviewers: list[str],
seniors: list[str],
) -> tuple[list[StepConfig], list[PhaseConfig]]:
"""Resolve pipeline from preset string or explicit step list.
Returns (steps, phases) tuple. Only one will be non-empty.
- plan-review → steps populated, phases empty.
- coding-plan-review → steps empty, phases populated.
"""
# Preset: "preset:plan-review" or "preset:coding-plan-review"
if isinstance(pipeline_raw, str) and pipeline_raw.startswith("preset:"):
preset_name = pipeline_raw.split(":", 1)[1]
if preset_name in PIPELINE_PRESETS:
return PIPELINE_PRESETS[preset_name](coders, reviewers, seniors), []
if preset_name in PHASED_PRESETS:
return [], PHASED_PRESETS[preset_name](coders, reviewers, seniors)
all_presets = list(PIPELINE_PRESETS.keys()) + list(PHASED_PRESETS.keys())
raise ValueError(
f"Unknown pipeline preset '{preset_name}'. "
f"Available: {all_presets}"
)
# Explicit step list
if isinstance(pipeline_raw, list):
steps = []
for step_data in pipeline_raw:
raw_role = step_data.get("role", "coding")
normalized_role = normalize_step_role(raw_role)
steps.append(StepConfig(
name=step_data["name"],
agent=step_data["agent"],
role=normalized_role,
prompt_template=normalize_prompt_template(
step_data.get("prompt_template", f"default:{normalized_role}")
),
output_key=step_data["output_key"],
verdict=step_data.get("verdict", False),
verdict_pattern=step_data.get("verdict_pattern", r"VERDICT:\s*PASS"),
context_override=step_data.get("context_override", {}),
))
return steps, []
raise ValueError(
f"'pipeline' must be a preset string (e.g. 'preset:plan-review') "
f"or a list of step definitions, got {type(pipeline_raw).__name__}"
)
def validate_config(config: PipelineConfig) -> list[str]:
    """Collect every validation problem found in *config*.

    A config with phases is checked phase by phase; otherwise the flat
    pipeline is checked and must contain at least one verdict step. Input
    paths, language and execution settings are checked in both cases.

    Returns:
        Human-readable error strings; an empty list means the config is valid.
    """
    errors: list[str] = []

    def flag_unknown_agents(steps: list[StepConfig], prefix: str) -> None:
        # Every step must reference an agent defined in config.agents.
        for step in steps:
            if step.agent in config.agents:
                continue
            errors.append(
                f"{prefix} '{step.name}' references undefined agent '{step.agent}'. "
                f"Defined agents: {list(config.agents.keys())}"
            )

    def flag_bad_verdict_patterns(steps: list[StepConfig], prefix: str) -> None:
        # Only verdict steps use their pattern, so only those are compiled.
        for step in steps:
            if not step.verdict:
                continue
            try:
                re.compile(step.verdict_pattern)
            except re.error as exc:
                errors.append(f"{prefix} '{step.name}' has invalid verdict_pattern: {exc}")

    if config.phases:
        # --- Phased pipeline validation ---
        for phase in config.phases:
            step_prefix = f"Phase '{phase.name}' step"
            if not phase.steps:
                errors.append(f"Phase '{phase.name}' has no steps")
            flag_unknown_agents(phase.steps, step_prefix)
            _validate_unique_step_fields(
                phase.steps,
                errors,
                scope=f"Phase '{phase.name}'",
            )
            flag_bad_verdict_patterns(phase.steps, step_prefix)
    else:
        # --- Simple pipeline validation ---
        if not config.pipeline:
            errors.append("Pipeline must have at least one step")
        flag_unknown_agents(config.pipeline, "Step")
        _validate_unique_step_fields(
            config.pipeline,
            errors,
            scope="Pipeline",
        )
        has_verdict_step = any(step.verdict for step in config.pipeline)
        if not has_verdict_step:
            errors.append("Pipeline must have at least one step with verdict: true")
        flag_bad_verdict_patterns(config.pipeline, "Step")

    # --- Common validation ---
    for key, val in config.inputs.items():
        # Non-Path values are not checked for existence.
        if isinstance(val, Path) and not val.exists():
            errors.append(f"Input file '{key}' not found: {val}")

    if config.language not in ("en", "ko"):
        errors.append(f"Unsupported language '{config.language}'. Use 'en' or 'ko'.")

    if config.execution.mode not in {"agent-decides"}:
        errors.append(
            f"Unsupported execution.mode '{config.execution.mode}'. Use 'agent-decides'."
        )

    if config.execution.command_policy not in {"broad", "restricted"}:
        errors.append(
            "Unsupported execution.command_policy "
            f"'{config.execution.command_policy}'. Use 'broad' or 'restricted'."
        )

    return errors
def _validate_unique_step_fields(
steps: list[StepConfig],
errors: list[str],
*,
scope: str,
) -> None:
"""Ensure step names and output keys are unique within a step collection."""
seen_names: set[str] = set()
seen_output_keys: set[str] = set()
for step in steps:
if step.name in seen_names:
errors.append(f"{scope} has duplicate step name '{step.name}'")
seen_names.add(step.name)
if step.output_key in seen_output_keys:
errors.append(f"{scope} has duplicate output_key '{step.output_key}'")
seen_output_keys.add(step.output_key)
def _make_agentic(agent: AgentConfig) -> None:
"""Convert an agent to agentic mode in-place."""
agent.agentic = True
agent.args = [a for a in agent.args if a not in {"-p", "--print"}]
def sync_phased_iterations(
    config: PipelineConfig,
    max_iter: int | None = None,
) -> None:
    """Push the effective iteration limit onto every phase that has a verdict step.

    Phases without any verdict step keep their own ``max_iterations``.

    Args:
        config: Configuration whose phases are updated in place.
        max_iter: Limit to apply; ``None`` means use ``config.max_iterations``.
    """
    if not config.phases:
        return

    limit = max_iter if max_iter is not None else config.max_iterations
    converging = (
        phase
        for phase in config.phases
        if any(step.verdict for step in phase.steps)
    )
    for phase in converging:
        phase.max_iterations = limit
def ensure_fix_preset_agentic(config: PipelineConfig) -> None:
    """Make the coders of a fix-style preset run agentically.

    Does nothing unless ``config.preset_name`` is in FIX_STYLE_PRESETS. Coder
    names without an agent definition, and agents that are already agentic,
    are left alone.
    """
    if config.preset_name in FIX_STYLE_PRESETS:
        for coder_name in config.coders:
            coder = config.agents.get(coder_name)
            if coder is None or coder.agentic:
                continue
            _make_agentic(coder)
def apply_input_overrides(
    config: PipelineConfig, overrides: dict[str, str]
) -> None:
    """Replace or add entries of ``config.inputs`` from CLI ``--input`` values.

    Each override string is stored as a ``Path`` exactly as given.
    """
    for key in overrides:
        config.inputs[key] = Path(overrides[key])