407 lines
16 KiB
Python
407 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
import tempfile
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
from cross_eval.agent import invoke_agent
|
|
from cross_eval.config import BUILTIN_AGENTS
|
|
from cross_eval.discovery import discover_repo, format_repo_discovery
|
|
from cross_eval.models import AgentConfig, AgentResult, PipelineConfig
|
|
from cross_eval.pipeline import run_pipeline
|
|
from cross_eval.prompts import _build_simple_preset
|
|
from cross_eval.runtime_env import build_runtime_environment, summarize_environment
|
|
|
|
|
|
class RuntimeEnvTest(unittest.TestCase):
    """Checks for dotenv loading and the rendered environment summary."""

    def test_build_runtime_environment_loads_dotenv_values(self) -> None:
        # Values written to a .env file in the working directory are expected
        # in both the loaded-values mapping and the resulting environment.
        dotenv_body = "CLICKHOUSE_URL=http://localhost:8123\nDATABASE_URL=postgres://db\n"
        with tempfile.TemporaryDirectory() as scratch:
            workdir = Path(scratch)
            dotenv_path = workdir / ".env"
            dotenv_path.write_text(dotenv_body, encoding="utf-8")

            execution = PipelineConfig().execution
            runtime_env, dotenv_files, dotenv_values = build_runtime_environment(
                execution,
                workdir,
            )

            self.assertEqual(dotenv_files[0].name, ".env")
            self.assertEqual(dotenv_values["CLICKHOUSE_URL"], "http://localhost:8123")
            self.assertEqual(runtime_env["DATABASE_URL"], "postgres://db")

    def test_summarize_environment_mentions_clickhouse_from_env(self) -> None:
        clickhouse_vars = {"CLICKHOUSE_URL": "http://localhost:8123"}
        # Two separate dict objects are passed, one per mapping argument.
        report = summarize_environment(
            PipelineConfig().execution,
            [Path("/tmp/.env")],
            dict(clickhouse_vars),
            dict(clickhouse_vars),
        )
        for expected in ("CLICKHOUSE_URL", "ClickHouse-related"):
            self.assertIn(expected, report)
|
|
|
|
|
|
class RepoDiscoveryTest(unittest.TestCase):
    """Checks discovery from a manifest, a compose file and env var names."""

    def test_discover_repo_detects_python_postgres_and_clickhouse(self) -> None:
        fixture_files = {
            "pyproject.toml": '[project]\nname = "svc"\ndependencies = ["psycopg", "clickhouse-driver"]\n',
            "docker-compose.yml": "services:\n db:\n image: postgres:16\n ch:\n image: clickhouse/clickhouse-server:latest\n",
        }
        with tempfile.TemporaryDirectory() as scratch:
            workdir = Path(scratch)
            for filename, body in fixture_files.items():
                (workdir / filename).write_text(body, encoding="utf-8")

            found = discover_repo(workdir, {"DATABASE_URL", "CLICKHOUSE_URL"})
            report = format_repo_discovery(found)

            self.assertIn("python", found.languages)
            for database in ("postgresql", "clickhouse"):
                self.assertIn(database, found.databases)
            self.assertIn("Detected local service containers", report)
|
|
|
|
|
|
class PromptContextTest(unittest.TestCase):
    """Checks that prompts passed to agents contain the context sections."""

    def test_run_pipeline_injects_env_and_discovery_context_into_prompt(self) -> None:
        captured_prompts: list[str] = []

        def _fake_invoke(agent_config, prompt, step_name, **kwargs):
            # Record every prompt, then answer with a canned result so no
            # real agent is launched.
            captured_prompts.append(prompt)
            if step_name == "review":
                canned_output = "VERDICT: PASS"
            else:
                canned_output = "coding output"
            return AgentResult(
                output=canned_output,
                exit_code=0,
                agent_name=agent_config.name,
                step_name=step_name,
                duration_seconds=0.1,
                transcript="# Agent Execution Transcript",
            )

        with tempfile.TemporaryDirectory() as scratch:
            workdir = Path(scratch)
            dotenv_path = workdir / ".env"
            dotenv_path.write_text("CLICKHOUSE_URL=http://localhost:8123\n", encoding="utf-8")

            preset_steps = _build_simple_preset(["claude-coder"], ["claude-reviewer"], [])
            config = PipelineConfig(
                output_dir=workdir / "out",
                max_iterations=1,
                language="en",
                inputs={"plan": "Plan", "checklist": "Checklist"},
                agents=dict(BUILTIN_AGENTS.items()),
                coders=["claude-coder"],
                reviewers=["claude-reviewer"],
                pipeline=preset_steps,
                preset_name="simple",
            )

            with patch("cross_eval.pipeline.invoke_agent", side_effect=_fake_invoke):
                run_pipeline(config, cwd=workdir)

            all_prompts = "\n".join(captured_prompts)
            expected_markers = (
                "Execution Policy",
                "Environment Context",
                "Repository Discovery",
                "ClickHouse-related environment variables are available",
            )
            for marker in expected_markers:
                self.assertIn(marker, all_prompts)
            # Checked while the temporary directory still exists.
            self.assertTrue((workdir / "out").exists())
|
|
|
|
|
|
class AgentTranscriptTest(unittest.TestCase):
    """Checks the transcript text attached to the result of ``invoke_agent``."""

    def _invoke_with_fake_subprocess(self, stdout, stderr):
        # Shared setup: call invoke_agent while ``subprocess.run`` is patched
        # with a stub that reports exit code 0 and the given output streams.
        class _Completed:
            def __init__(self) -> None:
                self.returncode = 0
                self.stdout = stdout
                self.stderr = stderr

        def _fake_run(cmd, **kwargs):
            return _Completed()

        reviewer = AgentConfig(
            name="codex-reviewer",
            command="codex",
            args=["exec", "--model", "gpt-5.4", "-"],
        )

        with patch("subprocess.run", side_effect=_fake_run):
            return invoke_agent(reviewer, "prompt", "review", quiet=True)

    def test_invoke_agent_records_transcript(self) -> None:
        outcome = self._invoke_with_fake_subprocess("hello", "warn")

        for fragment in ("## Command", "hello", "warn"):
            self.assertIn(fragment, outcome.transcript)

    def test_invoke_agent_transcript_includes_exit_code_and_duration(self) -> None:
        outcome = self._invoke_with_fake_subprocess("output", "")

        # NOTE(review): the test name mentions duration, but only the exit
        # code line is asserted here -- confirm whether a duration check
        # should be added.
        self.assertIn("## Exit Code: 0", outcome.transcript)
|
|
|
|
|
|
class RepoDiscoveryExtendedTest(unittest.TestCase):
    """Regression tests for broadened repo/service discovery signals."""

    def _discover(self, files=None, env_names=None):
        # Build a throwaway repository holding ``files`` (name -> text) and
        # run discover_repo on it. The directory is removed by a registered
        # cleanup, so it still exists while the calling test asserts.
        scratch = tempfile.TemporaryDirectory()
        self.addCleanup(scratch.cleanup)
        workdir = Path(scratch.name)
        for filename, body in (files or {}).items():
            (workdir / filename).write_text(body, encoding="utf-8")
        if env_names is None:
            return discover_repo(workdir)
        return discover_repo(workdir, env_names)

    def test_discover_go_project(self) -> None:
        found = self._discover({"go.mod": "module example.com/myapp\n\ngo 1.21\n"})

        self.assertIn("go", found.languages)
        self.assertIn("go", found.package_managers)

    def test_discover_rust_project(self) -> None:
        found = self._discover(
            {"Cargo.toml": '[package]\nname = "myapp"\nversion = "0.1.0"\n'}
        )

        self.assertIn("rust", found.languages)
        self.assertIn("cargo", found.package_managers)

    def test_discover_ruby_project(self) -> None:
        found = self._discover(
            {"Gemfile": 'source "https://rubygems.org"\ngem "rails"\n'}
        )

        self.assertIn("ruby", found.languages)
        self.assertIn("bundler", found.package_managers)

    def test_discover_java_gradle_project(self) -> None:
        found = self._discover({"build.gradle": "plugins { id 'java' }\n"})

        self.assertIn("java", found.languages)
        self.assertIn("gradle", found.package_managers)

    def test_discover_elasticsearch_from_compose(self) -> None:
        found = self._discover(
            {"docker-compose.yml": "services:\n es:\n image: elasticsearch:8.10.0\n"}
        )

        self.assertIn("elasticsearch", found.services)

    def test_discover_kafka_from_compose(self) -> None:
        found = self._discover(
            {"docker-compose.yml": "services:\n broker:\n image: confluentinc/cp-kafka:latest\n"}
        )

        self.assertIn("kafka", found.services)

    def test_discover_rabbitmq_from_env(self) -> None:
        found = self._discover(env_names={"RABBITMQ_URL"})

        self.assertIn("rabbitmq", found.databases)

    def test_discover_sqlite_from_requirements(self) -> None:
        found = self._discover({"requirements.txt": "aiosqlite==0.19.0\nfastapi\n"})

        self.assertIn("python", found.languages)
        self.assertIn("sqlite", found.databases)

    def test_discover_dynamodb_from_env(self) -> None:
        found = self._discover(env_names={"DYNAMODB_TABLE"})

        self.assertIn("dynamodb", found.databases)

    def test_discover_frameworks_from_pyproject(self) -> None:
        found = self._discover(
            {"pyproject.toml": '[project]\nname = "svc"\ndependencies = ["fastapi", "uvicorn"]\n'}
        )

        self.assertIn("fastapi", found.frameworks)

    def test_discover_knex_hint(self) -> None:
        found = self._discover({"knexfile.js": "module.exports = {};\n"})

        self.assertIn("Knex migration config detected.", found.hints)

    def test_discover_makefile_hint(self) -> None:
        found = self._discover({"Makefile": "all:\n\techo hello\n"})

        self.assertIn("Makefile available for build/task automation.", found.hints)

    def test_format_repo_discovery_includes_frameworks(self) -> None:
        found = self._discover(
            {"package.json": '{"dependencies": {"express": "^4.18.0"}}'}
        )
        report = format_repo_discovery(found)

        self.assertIn("Detected frameworks", report)
        self.assertIn("express", report)

    def test_discover_pnpm_lockfile(self) -> None:
        """Detect pnpm from lockfile when no packageManager field."""
        found = self._discover(
            {
                "package.json": '{"name": "app"}',
                "pnpm-lock.yaml": "lockfileVersion: 6\n",
            }
        )

        self.assertIn("pnpm", found.package_managers)

    def test_discover_yarn_lockfile(self) -> None:
        """Detect yarn from lockfile when no packageManager field."""
        found = self._discover(
            {
                "package.json": '{"name": "app"}',
                "yarn.lock": "# yarn lockfile v1\n",
            }
        )

        self.assertIn("yarn", found.package_managers)
|
|
|
|
|
|
class SummarizeEnvExtendedTest(unittest.TestCase):
    """Regression tests for expanded environment summary prefixes."""

    def _summary_with(self, name, value):
        # Summarize an environment in which ``name`` appears in both mapping
        # arguments; each argument receives its own dict object.
        return summarize_environment(
            PipelineConfig().execution,
            [Path("/tmp/.env")],
            {name: value},
            {name: value},
        )

    def test_summarize_shows_mongo_env_var(self) -> None:
        report = self._summary_with("MONGO_URI", "mongodb://localhost")
        self.assertIn("MONGO_URI", report)

    def test_summarize_shows_kafka_env_var(self) -> None:
        report = self._summary_with("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
        self.assertIn("KAFKA_BOOTSTRAP_SERVERS", report)

    def test_summarize_shows_elasticsearch_env_var(self) -> None:
        report = self._summary_with("ELASTICSEARCH_URL", "http://localhost:9200")
        self.assertIn("ELASTICSEARCH_URL", report)
|
|
|
|
|
|
class TranscriptSavingRegressionTest(unittest.TestCase):
    """Verify that transcripts are saved as step artifacts during pipeline runs."""

    def test_transcript_files_saved_during_pipeline(self) -> None:
        def _fake_invoke(agent_config, prompt, step_name, **kwargs):
            # Canned agent reply that carries a non-empty transcript.
            if step_name == "review":
                canned_output = "VERDICT: PASS"
            else:
                canned_output = "coding output"
            return AgentResult(
                output=canned_output,
                exit_code=0,
                agent_name=agent_config.name,
                step_name=step_name,
                duration_seconds=0.1,
                transcript="# Agent Execution Transcript\n\n## Command\n```\nclaude -p\n```",
            )

        with tempfile.TemporaryDirectory() as scratch:
            workdir = Path(scratch)
            preset_steps = _build_simple_preset(["claude-coder"], ["claude-reviewer"], [])
            config = PipelineConfig(
                output_dir=workdir / "out",
                max_iterations=1,
                language="en",
                inputs={"plan": "Plan", "checklist": "Checklist"},
                agents=dict(BUILTIN_AGENTS.items()),
                coders=["claude-coder"],
                reviewers=["claude-reviewer"],
                pipeline=preset_steps,
                preset_name="simple",
            )

            with patch("cross_eval.pipeline.invoke_agent", side_effect=_fake_invoke):
                outcome = run_pipeline(config, cwd=workdir)

            # Verify transcript files were saved (checked before the
            # temporary directory is removed).
            run_dir = outcome.run_dir
            self.assertIsNotNone(run_dir)
            first_iteration = run_dir / "v1"
            artifacts = [
                first_iteration / "coding_transcript.md",
                first_iteration / "review_transcript.md",
            ]
            for artifact in artifacts:
                self.assertTrue(
                    artifact.exists(),
                    f"Expected transcript at {artifact}",
                )
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the test suite when this file is executed directly.
    unittest.main()
|