release: cut 0.2.0 baseline
This commit is contained in:
152
cross_eval/runtime_env.py
Normal file
152
cross_eval/runtime_env.py
Normal file
@@ -0,0 +1,152 @@
|
||||
"""Helpers for building agent runtime environments from .env files."""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from cross_eval.models import ExecutionConfig
|
||||
|
||||
_SUMMARY_PREFIXES = (
|
||||
"CLICKHOUSE",
|
||||
"CH_",
|
||||
"DB_",
|
||||
"DATABASE",
|
||||
"PG",
|
||||
"POSTGRES",
|
||||
"MYSQL",
|
||||
"REDIS",
|
||||
"AWS",
|
||||
"S3",
|
||||
)
|
||||
|
||||
|
||||
def _strip_quotes(value: str) -> str:
|
||||
if len(value) >= 2 and value[0] == value[-1] and value[0] in {"'", '"'}:
|
||||
unwrapped = value[1:-1]
|
||||
if value[0] == '"':
|
||||
return bytes(unwrapped, "utf-8").decode("unicode_escape")
|
||||
return unwrapped
|
||||
return value
|
||||
|
||||
|
||||
def parse_dotenv(path: Path) -> dict[str, str]:
|
||||
"""Parse a simple dotenv file into key/value pairs."""
|
||||
values: dict[str, str] = {}
|
||||
for raw_line in path.read_text(encoding="utf-8").splitlines():
|
||||
line = raw_line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
if line.startswith("export "):
|
||||
line = line[len("export ") :].strip()
|
||||
if "=" not in line:
|
||||
continue
|
||||
key, value = line.split("=", 1)
|
||||
key = key.strip()
|
||||
if not key:
|
||||
continue
|
||||
values[key] = _strip_quotes(value.strip())
|
||||
return values
|
||||
|
||||
|
||||
def resolve_env_files(execution: ExecutionConfig, project_root: Path) -> list[Path]:
|
||||
"""Resolve and deduplicate configured env files under the project root."""
|
||||
candidates: list[Path] = []
|
||||
for raw in execution.env_files:
|
||||
path = Path(raw)
|
||||
if not path.is_absolute():
|
||||
path = project_root / path
|
||||
candidates.append(path)
|
||||
|
||||
for raw in execution.auto_env_files:
|
||||
path = project_root / raw
|
||||
candidates.append(path)
|
||||
|
||||
resolved: list[Path] = []
|
||||
seen: set[Path] = set()
|
||||
for path in candidates:
|
||||
try:
|
||||
normalized = path.resolve()
|
||||
except OSError:
|
||||
normalized = path
|
||||
if normalized in seen or not normalized.exists() or not normalized.is_file():
|
||||
continue
|
||||
seen.add(normalized)
|
||||
resolved.append(normalized)
|
||||
return resolved
|
||||
|
||||
|
||||
def build_runtime_environment(
|
||||
execution: ExecutionConfig,
|
||||
project_root: Path,
|
||||
) -> tuple[dict[str, str], list[Path], dict[str, str]]:
|
||||
"""Build subprocess env plus metadata about loaded files and names."""
|
||||
env = os.environ.copy() if execution.inherit_env else {}
|
||||
loaded_files = resolve_env_files(execution, project_root)
|
||||
loaded_values: dict[str, str] = {}
|
||||
for path in loaded_files:
|
||||
file_values = parse_dotenv(path)
|
||||
loaded_values.update(file_values)
|
||||
env.update(file_values)
|
||||
return env, loaded_files, loaded_values
|
||||
|
||||
|
||||
def summarize_environment(
|
||||
execution: ExecutionConfig,
|
||||
loaded_files: list[Path],
|
||||
env: dict[str, str],
|
||||
loaded_values: dict[str, str],
|
||||
) -> str:
|
||||
"""Generate a safe environment summary for prompts without leaking secrets."""
|
||||
lines: list[str] = []
|
||||
if loaded_files:
|
||||
joined = ", ".join(str(path) for path in loaded_files)
|
||||
lines.append(f"Loaded env files into the agent process: {joined}")
|
||||
else:
|
||||
lines.append("No .env file was auto-loaded into the agent process.")
|
||||
|
||||
if execution.auto_context_targets:
|
||||
lines.append(
|
||||
"Execution targets hinted by the user: "
|
||||
+ ", ".join(execution.auto_context_targets)
|
||||
)
|
||||
|
||||
if execution.expose_env_names:
|
||||
visible_names = sorted(
|
||||
{
|
||||
key
|
||||
for key in set(loaded_values) | set(env)
|
||||
if key.startswith(_SUMMARY_PREFIXES)
|
||||
or any(prefix in key for prefix in ("CLICKHOUSE", "DATABASE", "DB_"))
|
||||
}
|
||||
)
|
||||
if visible_names:
|
||||
lines.append("Relevant env var names available to commands: " + ", ".join(visible_names))
|
||||
else:
|
||||
lines.append("No DB/service env var names matched the default summary filters.")
|
||||
else:
|
||||
lines.append("Environment variable values are loaded but names are hidden from the prompt.")
|
||||
|
||||
wants_clickhouse = "clickhouse" in {target.lower() for target in execution.auto_context_targets}
|
||||
clickhouse_keys = [key for key in env if "CLICKHOUSE" in key or key.startswith("CH_")]
|
||||
if wants_clickhouse or clickhouse_keys:
|
||||
if clickhouse_keys:
|
||||
lines.append("ClickHouse-related environment variables are available to the agent.")
|
||||
else:
|
||||
lines.append("No ClickHouse-specific env vars were detected in the loaded environment.")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def build_execution_policy(execution: ExecutionConfig) -> str:
|
||||
"""Describe the execution latitude granted to agentic coders/reviewers."""
|
||||
lines = [
|
||||
f"Execution mode: {execution.mode}",
|
||||
f"Command policy: {execution.command_policy}",
|
||||
"The agent may choose shell, Python, git, docker, test, and database commands on its own when needed.",
|
||||
"The user does not need to pre-specify exact commands.",
|
||||
]
|
||||
if execution.command_policy == "broad":
|
||||
lines.append("Prefer direct validation by running the minimum set of commands needed to prove a fix.")
|
||||
else:
|
||||
lines.append("Keep command usage minimal and focused on validation.")
|
||||
return "\n".join(lines)
|
||||
Reference in New Issue
Block a user