498 lines
17 KiB
Python
498 lines
17 KiB
Python
"""Markdown report generation."""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from itertools import groupby
|
|
|
|
from cross_eval.models import (
|
|
IterationResult,
|
|
PipelineConfig,
|
|
PipelineResult,
|
|
ReviewMetrics,
|
|
StepConfig,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# i18n strings
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_STRINGS: dict[str, dict[str, str]] = {
|
|
"en": {
|
|
"title": "Cross-Eval Report",
|
|
"summary": "Summary",
|
|
"prop": "Property",
|
|
"val": "Value",
|
|
"total_iter": "Total Iterations",
|
|
"final_verdict": "Final Verdict",
|
|
"duration": "Duration",
|
|
"max_iter": "Max Iterations",
|
|
"phases_label": "Phases",
|
|
"iteration": "Iteration",
|
|
"phase": "Phase",
|
|
"steps": "Steps",
|
|
"max_iterations": "Max iterations",
|
|
"consec_pass": "Consecutive PASS required",
|
|
"step": "Step",
|
|
"verdict": "Verdict",
|
|
"output_chars": "Output ({n} chars)",
|
|
"feedback_next": "Feedback for next iteration:",
|
|
"oos_title": "Out of Scope Issues",
|
|
"oos_desc": (
|
|
"The following issues were found outside the plan/checklist scope "
|
|
"but are worth noting."
|
|
),
|
|
"final_verdict_title": "Final Verdict",
|
|
"repeat_title": "Repeated Aggregate Findings",
|
|
"repeat_desc": "The following aggregate-review outputs repeated across iterations.",
|
|
"pass_msg": "All checklist items satisfied. No over-engineering or omissions detected.",
|
|
"fail_phased": "Pipeline phases ({phases}) completed without full convergence.",
|
|
"fail_simple": "Maximum iterations ({max_iter}) reached without passing all checks.",
|
|
"metrics_title": "Review Metrics",
|
|
"metrics_trend_title": "Metrics Trend",
|
|
"metrics_iter": "Iter",
|
|
"metrics_total_issues": "Total Issues",
|
|
"metrics_na": "N/A",
|
|
},
|
|
"ko": {
|
|
"title": "교차 검증 리포트",
|
|
"summary": "요약",
|
|
"prop": "항목",
|
|
"val": "값",
|
|
"total_iter": "총 반복 횟수",
|
|
"final_verdict": "최종 판정",
|
|
"duration": "소요 시간",
|
|
"max_iter": "최대 반복",
|
|
"phases_label": "페이즈",
|
|
"iteration": "반복",
|
|
"phase": "페이즈",
|
|
"steps": "단계",
|
|
"max_iterations": "최대 반복",
|
|
"consec_pass": "연속 PASS 필요",
|
|
"step": "단계",
|
|
"verdict": "판정",
|
|
"output_chars": "출력 ({n}자)",
|
|
"feedback_next": "다음 반복을 위한 피드백:",
|
|
"oos_title": "범위 밖 이슈",
|
|
"oos_desc": (
|
|
"아래는 기획서/체크리스트 범위 밖이지만 "
|
|
"리뷰 중 발견된 이슈입니다."
|
|
),
|
|
"final_verdict_title": "최종 판정",
|
|
"repeat_title": "반복된 Aggregate 이슈",
|
|
"repeat_desc": "아래 aggregate-review 결과가 여러 반복에서 동일하게 다시 나타났습니다.",
|
|
"pass_msg": "모든 체크리스트 항목 충족. 과최적화/누락 없음.",
|
|
"fail_phased": "파이프라인 페이즈 ({phases}) 완료, 완전한 수렴에 도달하지 못함.",
|
|
"fail_simple": "최대 반복 횟수 ({max_iter})에 도달, 모든 검증을 통과하지 못함.",
|
|
"metrics_title": "리뷰 메트릭",
|
|
"metrics_trend_title": "메트릭 추이",
|
|
"metrics_iter": "반복",
|
|
"metrics_total_issues": "총 이슈",
|
|
"metrics_na": "해당 없음",
|
|
},
|
|
}
|
|
|
|
|
|
def _t(config: PipelineConfig, key: str, **kwargs: str) -> str:
    """Look up the display string for *key* in the report language.

    The language code is read from ``config.language`` and defaults to
    ``"en"`` when the attribute is absent. An unknown language uses the
    English table; a key missing from the selected table falls back to the
    English entry and finally to *key* itself.

    Args:
        config: Pipeline configuration carrying the optional ``language``.
        key: Identifier of the string in ``_STRINGS``.
        **kwargs: Placeholder values applied with ``str.format`` when given.

    Returns:
        The resolved (and, if kwargs were passed, formatted) string.
    """
    english = _STRINGS["en"]
    table = _STRINGS.get(getattr(config, "language", "en"), english)
    text = table.get(key, english.get(key, key))
    return text.format(**kwargs) if kwargs else text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Review output parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_review_metrics(output: str) -> ReviewMetrics:
    """Count severity, category, and assessment markers in review output.

    Severity tags (``[Critical]``, ``[Major]``, ``[Minor]``) and category
    tags (``[Over-engineering]``/``[과최적화]``, ``[Omission]``/``[누락]``)
    are counted case-insensitively wherever they appear. Assessment markers
    (``CONFIRMED:`` and ``DISMISSED:``, the latter optionally followed by a
    parenthesised note before the colon) are counted case-sensitively.

    Args:
        output: Raw text produced by a review step.

    Returns:
        A ``ReviewMetrics`` populated with the seven counts.
    """
    def count(pattern: str, flags: int = 0) -> int:
        return len(re.findall(pattern, output, flags))

    metrics = ReviewMetrics()

    # Severity tags.
    metrics.critical = count(r"\[Critical\]", re.IGNORECASE)
    metrics.major = count(r"\[Major\]", re.IGNORECASE)
    metrics.minor = count(r"\[Minor\]", re.IGNORECASE)

    # Category tags (English and Korean variants).
    metrics.over_engineering = count(
        r"\[Over-engineering\]|\[과최적화\]", re.IGNORECASE,
    )
    metrics.omission = count(r"\[Omission\]|\[누락\]", re.IGNORECASE)

    # Assessments: count "CONFIRMED: <description>" but skip summary lines
    # such as "CONFIRMED: 3". The lookahead also rejects whitespace so that
    # ``\s+`` cannot backtrack to a shorter run and let "CONFIRMED:  3"
    # (extra spaces before the number) slip through as an item.
    metrics.confirmed = count(r"\bCONFIRMED:\s+(?![\s\d])")
    metrics.dismissed = count(
        r"\bDISMISSED\b(?:\s*\([^)]*\))?\s*:\s+(?![\s\d])",
    )

    return metrics
|
|
|
|
|
|
def _aggregate_metrics(a: ReviewMetrics, b: ReviewMetrics) -> ReviewMetrics:
    """Return a new ``ReviewMetrics`` holding the field-wise sum of *a* and *b*."""
    counters = (
        "critical",
        "major",
        "minor",
        "over_engineering",
        "omission",
        "confirmed",
        "dismissed",
    )
    summed = {name: getattr(a, name) + getattr(b, name) for name in counters}
    return ReviewMetrics(**summed)
|
|
|
|
|
|
def _extract_out_of_scope(output: str) -> str:
|
|
"""Extract the 'Out of Scope Issues' section from review output.
|
|
|
|
Looks for '### Out of Scope Issues' or '### 범위 밖 이슈' heading,
|
|
captures text until the next '###' heading or end of string.
|
|
Returns empty string if not found or contains only 'None'/'없음'.
|
|
"""
|
|
pattern = r"###\s*(?:Out of Scope Issues|범위 밖 이슈)\s*\n(.*?)(?=\n###|\Z)"
|
|
match = re.search(pattern, output, re.DOTALL)
|
|
if not match:
|
|
return ""
|
|
content = match.group(1).strip()
|
|
if content.lower() in ("none", "없음", ""):
|
|
return ""
|
|
return content
|
|
|
|
|
|
def build_report(config: PipelineConfig, result: PipelineResult) -> str:
    """Render the pipeline result as a markdown report.

    The phased layout is used as soon as any iteration carries a truthy
    ``phase_name``; otherwise the simple layout is used.

    Args:
        config: Pipeline configuration used for labels and limits.
        result: Outcome of the pipeline run.

    Returns:
        The complete markdown document as one string.
    """
    for iteration in result.iterations:
        if iteration.phase_name:
            return _build_phased_report(config, result)
    return _build_simple_report(config, result)
|
|
|
|
|
|
def _build_simple_report(
    config: PipelineConfig, result: PipelineResult,
) -> str:
    """Build report for a non-phased (simple) pipeline run.

    The document consists of the title, the summary table, one section per
    iteration (step details plus a feedback preview), then the out-of-scope,
    review-metrics, repeated-aggregate, and final-verdict sections.

    Args:
        config: Pipeline configuration; ``config.pipeline`` supplies the steps.
        result: Outcome of the pipeline run.

    Returns:
        The markdown report, lines joined with newlines.
    """
    feedback_limit = 200  # characters of feedback shown per iteration

    lines: list[str] = [f"# {_t(config, 'title')}\n"]
    _append_summary_table(lines, config, result)

    out_of_scope_items: list[tuple[int, str]] = []

    for iter_result in result.iterations:
        lines.append("---\n")
        lines.append(f"## {_t(config, 'iteration')} {iter_result.iteration}\n")

        _append_iteration_steps(
            lines, config, iter_result, config.pipeline, out_of_scope_items,
        )

        feedback = iter_result.feedback
        if feedback:
            # Only mark the preview with an ellipsis when text was actually
            # cut off; a slice past the end is silently clamped, so the
            # slice alone does not tell us whether truncation happened.
            preview = feedback[:feedback_limit]
            if len(feedback) > feedback_limit:
                preview += "..."
            lines.append(f"**{_t(config, 'feedback_next')}** {preview}")
            lines.append("")

    _append_out_of_scope(lines, config, out_of_scope_items)
    _append_review_metrics_table(lines, config, result)
    _append_repeated_aggregate(lines, config, result)
    _append_final_verdict(lines, config, result)

    return "\n".join(lines)
|
|
|
|
|
|
def _build_phased_report(
    config: PipelineConfig, result: PipelineResult,
) -> str:
    """Build report for a phased pipeline run (e.g. review-fix).

    Consecutive iterations sharing the same ``phase_name`` are rendered under
    one phase section. Each iteration heading carries a verdict label that
    tracks the running count of consecutive PASS verdicts within the phase.

    Args:
        config: Pipeline configuration; ``config.phases`` supplies per-phase
            steps and limits. Iterations whose phase is not found there fall
            back to ``config.pipeline`` for their step list.
        result: Outcome of the pipeline run.

    Returns:
        The markdown report, lines joined with newlines.
    """
    feedback_limit = 200  # characters of feedback shown per iteration

    lines: list[str] = [f"# {_t(config, 'title')}\n"]
    _append_summary_table(lines, config, result, phased=True)

    phase_map = {p.name: p for p in config.phases}
    out_of_scope_items: list[tuple[int, str]] = []

    # groupby only merges *adjacent* iterations, so a phase name that recurs
    # later in the run gets its own section again.
    for phase_name, phase_iters in groupby(
        result.iterations, key=lambda ir: ir.phase_name,
    ):
        phase_config = phase_map.get(phase_name or "")

        lines.append("---\n")
        lines.append(f"## {_t(config, 'phase')}: {phase_name}\n")

        if phase_config:
            step_desc = " → ".join(s.name for s in phase_config.steps)
            lines.append(
                f"{_t(config, 'steps')}: {step_desc} | "
                f"{_t(config, 'max_iterations')}: {phase_config.max_iterations} | "
                f"{_t(config, 'consec_pass')}: {phase_config.consecutive_pass}\n"
            )

        steps = phase_config.steps if phase_config else config.pipeline

        consecutive = 0  # running PASS streak inside this phase
        for iter_result in phase_iters:
            verdict_label = ""
            if iter_result.verdict:
                if iter_result.verdict == "PASS":
                    consecutive += 1
                    if phase_config and phase_config.consecutive_pass > 1:
                        required = phase_config.consecutive_pass
                        verdict_label = f" — PASS ({consecutive}/{required})"
                        if consecutive >= required:
                            verdict_label += " ✓"
                    else:
                        verdict_label = " — PASS ✓"
                else:
                    # Any non-PASS verdict breaks the streak.
                    consecutive = 0
                    verdict_label = " — FAIL"

            lines.append(
                f"### {_t(config, 'iteration')} {iter_result.iteration}{verdict_label}\n"
            )
            _append_iteration_steps(
                lines, config, iter_result, steps, out_of_scope_items,
            )

            feedback = iter_result.feedback
            if feedback:
                # Only mark the preview with an ellipsis when text was
                # actually cut off; a slice past the end is silently clamped.
                preview = feedback[:feedback_limit]
                if len(feedback) > feedback_limit:
                    preview += "..."
                lines.append(f"**{_t(config, 'feedback_next')}** {preview}")
                lines.append("")

    _append_out_of_scope(lines, config, out_of_scope_items)
    _append_review_metrics_table(lines, config, result)
    _append_repeated_aggregate(lines, config, result)
    _append_final_verdict(lines, config, result)

    return "\n".join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _append_summary_table(
    lines: list[str],
    config: PipelineConfig,
    result: PipelineResult,
    phased: bool = False,
) -> None:
    """Append the summary heading and its property/value table to *lines*.

    Args:
        lines: Report lines; extended in place.
        config: Pipeline configuration (labels, phases, iteration limit).
        result: Outcome of the pipeline run.
        phased: When true and ``config.phases`` is non-empty, list the phases
            and their limits instead of the global max-iterations row.
    """
    iteration_count = len(result.iterations)
    whole_minutes = int(result.total_duration // 60)
    leftover_seconds = int(result.total_duration % 60)
    if whole_minutes:
        elapsed = f"{whole_minutes}m {leftover_seconds}s"
    else:
        elapsed = f"{leftover_seconds}s"

    lines.extend([
        f"## {_t(config, 'summary')}\n",
        f"| {_t(config, 'prop')} | {_t(config, 'val')} |",
        "|----------|-------|",
        f"| {_t(config, 'total_iter')} | {iteration_count} |",
        f"| {_t(config, 'final_verdict')} | **{result.final_verdict}** |",
        f"| {_t(config, 'duration')} | {elapsed} |",
    ])

    if not (phased and config.phases):
        lines.append(f"| {_t(config, 'max_iter')} | {config.max_iterations} |")
    else:
        chain = " → ".join(phase.name for phase in config.phases)
        lines.append(f"| {_t(config, 'phases_label')} | {chain} |")
        # One row per phase with its own limits.
        lines.extend(
            f"| {_t(config, 'phase')}: {phase.name} | "
            f"{_t(config, 'max_iterations')} {phase.max_iterations}, "
            f"{phase.consecutive_pass}x {_t(config, 'consec_pass')} |"
            for phase in config.phases
        )

    lines.append("")
|
|
|
|
|
|
def _append_iteration_steps(
    lines: list[str],
    config: PipelineConfig,
    iter_result: IterationResult,
    steps: list[StepConfig],
    out_of_scope_items: list[tuple[int, str]],
) -> None:
    """Append step details for one iteration.

    For every step this emits a heading (step name, agent, optional
    duration), the iteration verdict on verdict-bearing steps, and the step
    output (wrapped in a collapsible ``<details>`` block when longer than 500
    characters). For steps whose role is ``"review"`` it also collects any
    out-of-scope section and parses review metrics.

    Args:
        lines: Report lines; extended in place.
        config: Pipeline configuration used for translated labels.
        iter_result: The iteration being rendered. Its ``review_metrics`` is
            set to the combined metrics of this iteration's review steps.
        steps: Step definitions to render, in order.
        out_of_scope_items: Accumulator of ``(iteration, text)`` pairs;
            extended in place.
    """
    # Metrics are accumulated locally and assigned once at the end, so that
    # rendering the same result twice does not add the counts on top of the
    # values stored by the previous render.
    # NOTE(review): this replaces any metrics already present on iter_result;
    # assumes nothing else populates that field before reporting — confirm.
    review_totals: ReviewMetrics | None = None

    for step in steps:
        agent_result = iter_result.step_results.get(step.output_key)
        output = iter_result.step_outputs.get(step.output_key, "")

        # Fall back to the configured agent when the step produced no result.
        agent_name = agent_result.agent_name if agent_result else step.agent
        duration = f" ({agent_result.duration_seconds}s)" if agent_result else ""

        lines.append(
            f"### {_t(config, 'step')}: {step.name} ({agent_name}){duration}\n"
        )

        if step.verdict and iter_result.verdict:
            lines.append(f"**{_t(config, 'verdict')}: {iter_result.verdict}**\n")

        if len(output) > 500:
            # Long outputs are collapsed to keep the report scannable.
            lines.append("<details>")
            lines.append(
                f"<summary>{_t(config, 'output_chars', n=str(len(output)))}</summary>\n"
            )
            lines.append(output)
            lines.append("\n</details>\n")
        else:
            lines.append(output)
            lines.append("")

        if step.role == "review":
            oos = _extract_out_of_scope(output)
            if oos:
                out_of_scope_items.append((iter_result.iteration, oos))

            step_metrics = parse_review_metrics(output)
            if review_totals is None:
                review_totals = step_metrics
            else:
                review_totals = _aggregate_metrics(review_totals, step_metrics)

    if review_totals is not None:
        iter_result.review_metrics = review_totals
|
|
|
|
|
|
def _append_review_metrics_table(
    lines: list[str],
    config: PipelineConfig,
    result: PipelineResult,
) -> None:
    """Append the per-iteration review metrics table and a trend summary.

    Nothing is written unless at least one iteration has truthy
    ``review_metrics``. Iterations without metrics get a row filled with the
    translated "N/A" marker. The trend summary is added only when two or more
    iterations have metrics.

    Args:
        lines: Report lines; extended in place.
        config: Pipeline configuration used for translated labels.
        result: Outcome of the pipeline run.
    """
    iterations = result.iterations
    if not any(ir.review_metrics for ir in iterations):
        return

    def as_row(cells: list) -> str:
        # Render cells as one markdown table row.
        return "| " + " | ".join(f"{cell}" for cell in cells) + " |"

    na = _t(config, "metrics_na")

    lines.append("---\n")
    lines.append(f"## {_t(config, 'metrics_title')}\n")

    # Table header and separator.
    lines.append(as_row([
        _t(config, "metrics_iter"), _t(config, "verdict"),
        "Critical", "Major", "Minor",
        "Over-eng", "Omission",
        "CONFIRMED", "DISMISSED",
    ]))
    lines.append("|------|---------|----------|-------|-------|----------|----------|-----------|-----------|")

    # One row per iteration.
    for ir in iterations:
        metrics = ir.review_metrics
        verdict = ir.verdict or "-"
        if metrics:
            counts = [
                metrics.critical, metrics.major, metrics.minor,
                metrics.over_engineering, metrics.omission,
                metrics.confirmed, metrics.dismissed,
            ]
        else:
            counts = [na] * 7
        lines.append(as_row([ir.iteration, verdict, *counts]))

    lines.append("")

    # Trend summary across the iterations that have metrics.
    measured = [
        (ir.iteration, ir.review_metrics)
        for ir in iterations
        if ir.review_metrics
    ]
    if len(measured) < 2:
        return

    trends = (
        ("Issues", lambda m: m.critical + m.major + m.minor),
        ("Over-engineering", lambda m: m.over_engineering),
        ("Omission", lambda m: m.omission),
        ("CONFIRMED", lambda m: m.confirmed),
        ("DISMISSED", lambda m: m.dismissed),
    )
    lines.append(f"### {_t(config, 'metrics_trend_title')}\n")
    for label, pick in trends:
        _append_trend_line(
            lines, label, [(number, pick(m)) for number, m in measured],
        )
    lines.append("")
|
|
|
|
|
|
def _append_trend_line(
|
|
lines: list[str],
|
|
label: str,
|
|
values: list[tuple[int, int]],
|
|
) -> None:
|
|
"""Append a single trend line like '- Issues: 6 -> 2 -> 0 (decreasing)'."""
|
|
nums = [v for _, v in values]
|
|
arrow = " → ".join(str(n) for n in nums)
|
|
if nums[-1] < nums[0]:
|
|
direction = "decreasing"
|
|
elif nums[-1] > nums[0]:
|
|
direction = "increasing"
|
|
else:
|
|
direction = "stable"
|
|
lines.append(f"- {label}: {arrow} ({direction})")
|
|
|
|
|
|
def _append_out_of_scope(
    lines: list[str],
    config: PipelineConfig,
    out_of_scope_items: list[tuple[int, str]],
) -> None:
    """Append the out-of-scope issues section; do nothing when there are none.

    Args:
        lines: Report lines; extended in place.
        config: Pipeline configuration used for translated labels.
        out_of_scope_items: ``(iteration, text)`` pairs, rendered in order
            with one sub-heading per pair.
    """
    if out_of_scope_items:
        lines.append("---\n")
        lines.append(f"## {_t(config, 'oos_title')}\n")
        lines.append(f"{_t(config, 'oos_desc')}\n")
        for number, body in out_of_scope_items:
            lines.extend(
                (f"### {_t(config, 'iteration')} {number}\n", body, "")
            )
|
|
|
|
|
|
def _append_final_verdict(
    lines: list[str],
    config: PipelineConfig,
    result: PipelineResult,
) -> None:
    """Append the final verdict heading and its one-line explanation.

    A ``"PASS"`` verdict gets the success message. Any other verdict gets the
    phased failure message when ``config.phases`` is non-empty, otherwise the
    max-iterations failure message.

    Args:
        lines: Report lines; extended in place.
        config: Pipeline configuration (labels, phases, iteration limit).
        result: Outcome of the pipeline run.
    """
    verdict = result.final_verdict
    lines.append("---\n")
    lines.append(f"## {_t(config, 'final_verdict_title')}: {verdict}\n")

    if verdict == "PASS":
        message = _t(config, "pass_msg")
    elif config.phases:
        chain = " → ".join(phase.name for phase in config.phases)
        message = _t(config, "fail_phased", phases=chain)
    else:
        message = _t(config, "fail_simple", max_iter=str(config.max_iterations))
    lines.append(message)
|
|
|
|
|
|
def _append_repeated_aggregate(
    lines: list[str],
    config: PipelineConfig,
    result: PipelineResult,
) -> None:
    """Append the repeated-aggregate section; do nothing without warnings.

    Args:
        lines: Report lines; extended in place.
        config: Pipeline configuration used for translated labels.
        result: Outcome of the pipeline run; each entry of its
            ``repeated_aggregate_warnings`` becomes one bullet.
    """
    warnings = result.repeated_aggregate_warnings
    if not warnings:
        return

    lines.append("---\n")
    lines.append(f"## {_t(config, 'repeat_title')}\n")
    lines.append(f"{_t(config, 'repeat_desc')}\n")
    lines.extend(f"- {entry}" for entry in warnings)
    lines.append("")
|