"""Markdown report generation.""" from __future__ import annotations import re from itertools import groupby from cross_eval.models import ( IterationResult, PipelineConfig, PipelineResult, ReviewMetrics, StepConfig, ) # --------------------------------------------------------------------------- # i18n strings # --------------------------------------------------------------------------- _STRINGS: dict[str, dict[str, str]] = { "en": { "title": "Cross-Eval Report", "summary": "Summary", "prop": "Property", "val": "Value", "total_iter": "Total Iterations", "final_verdict": "Final Verdict", "duration": "Duration", "max_iter": "Max Iterations", "phases_label": "Phases", "iteration": "Iteration", "phase": "Phase", "steps": "Steps", "max_iterations": "Max iterations", "consec_pass": "Consecutive PASS required", "step": "Step", "verdict": "Verdict", "output_chars": "Output ({n} chars)", "feedback_next": "Feedback for next iteration:", "oos_title": "Out of Scope Issues", "oos_desc": ( "The following issues were found outside the plan/checklist scope " "but are worth noting." ), "final_verdict_title": "Final Verdict", "repeat_title": "Repeated Aggregate Findings", "repeat_desc": "The following aggregate-review outputs repeated across iterations.", "pass_msg": "All checklist items satisfied. No over-engineering or omissions detected.", "fail_phased": "Pipeline phases ({phases}) completed without full convergence.", "fail_simple": "Maximum iterations ({max_iter}) reached without passing all checks.", "escalate_msg": "Human review required. The following issues could not be resolved automatically:", "escalate_title": "Escalation Report", "issue_tracker_title": "Issue Tracker Summary", "issue_tracker_desc": "Issues discovered across iterations and their final resolution status.", "metrics_title": "Review Metrics", "metrics_trend_title": "Metrics Trend", "metrics_iter": "Iter", "metrics_total_issues": "Total Issues", "metrics_na": "N/A", "iteration_details": "Iteration Details", "evidence_summary": "Evidence Summary", "evidence_agent": "Agent", "evidence_exit_code": "Exit Code", "evidence_duration": "Duration", "evidence_output_size": "Output Size", "evidence_transcript": "Execution transcript", }, "ko": { "title": "교차 검증 리포트", "summary": "요약", "prop": "항목", "val": "값", "total_iter": "총 반복 횟수", "final_verdict": "최종 판정", "duration": "소요 시간", "max_iter": "최대 반복", "phases_label": "페이즈", "iteration": "반복", "phase": "페이즈", "steps": "단계", "max_iterations": "최대 반복", "consec_pass": "연속 PASS 필요", "step": "단계", "verdict": "판정", "output_chars": "출력 ({n}자)", "feedback_next": "다음 반복을 위한 피드백:", "oos_title": "범위 밖 이슈", "oos_desc": ( "아래는 기획서/체크리스트 범위 밖이지만 " "리뷰 중 발견된 이슈입니다." ), "final_verdict_title": "최종 판정", "repeat_title": "반복된 Aggregate 이슈", "repeat_desc": "아래 aggregate-review 결과가 여러 반복에서 동일하게 다시 나타났습니다.", "pass_msg": "모든 체크리스트 항목 충족. 과최적화/누락 없음.", "fail_phased": "파이프라인 페이즈 ({phases}) 완료, 완전한 수렴에 도달하지 못함.", "fail_simple": "최대 반복 횟수 ({max_iter})에 도달, 모든 검증을 통과하지 못함.", "escalate_msg": "사람의 확인이 필요합니다. 아래 이슈는 자동으로 해결할 수 없었습니다:", "escalate_title": "에스컬레이션 리포트", "issue_tracker_title": "이슈 트래커 요약", "issue_tracker_desc": "반복 과정에서 발견된 이슈와 최종 처리 상태입니다.", "metrics_title": "리뷰 메트릭", "metrics_trend_title": "메트릭 추이", "metrics_iter": "반복", "metrics_total_issues": "총 이슈", "metrics_na": "해당 없음", "iteration_details": "반복 상세", "evidence_summary": "실행 증거 요약", "evidence_agent": "에이전트", "evidence_exit_code": "종료 코드", "evidence_duration": "소요 시간", "evidence_output_size": "출력 크기", "evidence_transcript": "실행 트랜스크립트", }, } def _t(config: PipelineConfig, key: str, **kwargs: str) -> str: """Get translated string.""" lang = getattr(config, "language", "en") strings = _STRINGS.get(lang, _STRINGS["en"]) s = strings.get(key, _STRINGS["en"].get(key, key)) if kwargs: s = s.format(**kwargs) return s # --------------------------------------------------------------------------- # Review output parsing # --------------------------------------------------------------------------- def parse_review_metrics(output: str) -> ReviewMetrics: """Parse review output to extract severity, category, and assessment counts.""" metrics = ReviewMetrics() # Severity: count tagged issue lines (e.g. "[Critical]", "[Major]", "[Minor]") metrics.critical = len(re.findall(r"\[Critical\]", output, re.IGNORECASE)) metrics.major = len(re.findall(r"\[Major\]", output, re.IGNORECASE)) metrics.minor = len(re.findall(r"\[Minor\]", output, re.IGNORECASE)) # Categories (EN and KO variants) metrics.over_engineering = len(re.findall( r"\[Over-engineering\]|\[과최적화\]", output, re.IGNORECASE, )) metrics.omission = len(re.findall( r"\[Omission\]|\[누락\]", output, re.IGNORECASE, )) # Assessments — match "CONFIRMED: " but not summary "CONFIRMED: N" metrics.confirmed = len(re.findall(r"\bCONFIRMED:\s+(?!\d)", output)) metrics.dismissed = len(re.findall(r"\bDISMISSED\b(?:\s*\([^)]*\))?\s*:\s+(?!\d)", output)) return metrics def _aggregate_metrics(a: ReviewMetrics, b: ReviewMetrics) -> ReviewMetrics: """Combine metrics from two review steps.""" return ReviewMetrics( critical=a.critical + b.critical, major=a.major + b.major, minor=a.minor + b.minor, over_engineering=a.over_engineering + b.over_engineering, omission=a.omission + b.omission, confirmed=a.confirmed + b.confirmed, dismissed=a.dismissed + b.dismissed, ) def _extract_out_of_scope(output: str) -> str: """Extract the 'Out of Scope Issues' section from review output. Looks for '### Out of Scope Issues' or '### 범위 밖 이슈' heading, captures text until the next '###' heading or end of string. Returns empty string if not found or contains only 'None'/'없음'. """ pattern = r"###\s*(?:Out of Scope Issues|범위 밖 이슈)\s*\n(.*?)(?=\n###|\Z)" match = re.search(pattern, output, re.DOTALL) if not match: return "" content = match.group(1).strip() if content.lower() in ("none", "없음", ""): return "" return content def build_report(config: PipelineConfig, result: PipelineResult) -> str: """Build the complete markdown report string.""" has_phases = any(ir.phase_name for ir in result.iterations) if has_phases: return _build_phased_report(config, result) return _build_simple_report(config, result) def _build_simple_report( config: PipelineConfig, result: PipelineResult, ) -> str: """Build report for a non-phased (simple) pipeline run.""" lines: list[str] = [] lines.append(f"# {_t(config, 'title')}\n") _append_summary_table(lines, config, result) out_of_scope_items: list[tuple[int, str]] = [] # Pre-scan iterations to collect out-of-scope items and review metrics # (needed before rendering final verdict / metrics sections) for iter_result in result.iterations: for step in config.pipeline: output = iter_result.step_outputs.get(step.output_key, "") if step.role == "review": oos = _extract_out_of_scope(output) if oos: out_of_scope_items.append((iter_result.iteration, oos)) step_metrics = parse_review_metrics(output) if iter_result.review_metrics is None: iter_result.review_metrics = step_metrics else: iter_result.review_metrics = _aggregate_metrics( iter_result.review_metrics, step_metrics, ) _append_final_verdict(lines, config, result) _append_issue_tracker_summary(lines, config, result) _append_review_metrics_table(lines, config, result) lines.append("---\n") lines.append(f"## {_t(config, 'iteration_details')}\n") for iter_result in result.iterations: lines.append(f"### {_t(config, 'iteration')} {iter_result.iteration}\n") _append_iteration_steps(lines, config, iter_result, config.pipeline, out_of_scope_items, skip_extraction=True) if iter_result.feedback: lines.append(f"**{_t(config, 'feedback_next')}** {iter_result.feedback[:200]}...") lines.append("") _append_out_of_scope(lines, config, out_of_scope_items) _append_repeated_aggregate(lines, config, result) return "\n".join(lines) def _build_phased_report( config: PipelineConfig, result: PipelineResult, ) -> str: """Build report for a phased pipeline run (e.g. review-fix).""" lines: list[str] = [] lines.append(f"# {_t(config, 'title')}\n") _append_summary_table(lines, config, result, phased=True) phase_map = {p.name: p for p in config.phases} out_of_scope_items: list[tuple[int, str]] = [] # Pre-scan iterations to collect out-of-scope items and review metrics for phase_name, phase_iters_iter in groupby( result.iterations, key=lambda ir: ir.phase_name, ): phase_iters = list(phase_iters_iter) phase_config = phase_map.get(phase_name or "") steps = phase_config.steps if phase_config else config.pipeline for iter_result in phase_iters: for step in steps: output = iter_result.step_outputs.get(step.output_key, "") if step.role == "review": oos = _extract_out_of_scope(output) if oos: out_of_scope_items.append((iter_result.iteration, oos)) step_metrics = parse_review_metrics(output) if iter_result.review_metrics is None: iter_result.review_metrics = step_metrics else: iter_result.review_metrics = _aggregate_metrics( iter_result.review_metrics, step_metrics, ) _append_final_verdict(lines, config, result) _append_issue_tracker_summary(lines, config, result) _append_review_metrics_table(lines, config, result) lines.append("---\n") lines.append(f"## {_t(config, 'iteration_details')}\n") for phase_name, phase_iters_iter in groupby( result.iterations, key=lambda ir: ir.phase_name, ): phase_iters = list(phase_iters_iter) phase_config = phase_map.get(phase_name or "") lines.append(f"### {_t(config, 'phase')}: {phase_name}\n") if phase_config: step_desc = " → ".join(s.name for s in phase_config.steps) lines.append( f"{_t(config, 'steps')}: {step_desc} | " f"{_t(config, 'max_iterations')}: {phase_config.max_iterations} | " f"{_t(config, 'consec_pass')}: {phase_config.consecutive_pass}\n" ) steps = phase_config.steps if phase_config else config.pipeline consecutive = 0 for iter_result in phase_iters: verdict_label = "" if iter_result.verdict: if iter_result.verdict == "PASS": consecutive += 1 if phase_config and phase_config.consecutive_pass > 1: verdict_label = f" — PASS ({consecutive}/{phase_config.consecutive_pass})" if consecutive >= phase_config.consecutive_pass: verdict_label += " ✓" else: verdict_label = " — PASS ✓" elif iter_result.verdict == "ESCALATE": consecutive = 0 verdict_label = " — ESCALATE" else: consecutive = 0 verdict_label = " — FAIL" lines.append( f"#### {_t(config, 'iteration')} {iter_result.iteration}{verdict_label}\n" ) _append_iteration_steps(lines, config, iter_result, steps, out_of_scope_items, skip_extraction=True) if iter_result.feedback: lines.append( f"**{_t(config, 'feedback_next')}** {iter_result.feedback[:200]}..." ) lines.append("") _append_out_of_scope(lines, config, out_of_scope_items) _append_repeated_aggregate(lines, config, result) return "\n".join(lines) # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- def _append_summary_table( lines: list[str], config: PipelineConfig, result: PipelineResult, phased: bool = False, ) -> None: """Append the summary table to lines.""" total_iter = len(result.iterations) minutes = int(result.total_duration // 60) seconds = int(result.total_duration % 60) duration_str = f"{minutes}m {seconds}s" if minutes else f"{seconds}s" lines.append(f"## {_t(config, 'summary')}\n") lines.append(f"| {_t(config, 'prop')} | {_t(config, 'val')} |") lines.append("|----------|-------|") lines.append(f"| {_t(config, 'total_iter')} | {total_iter} |") lines.append(f"| {_t(config, 'final_verdict')} | **{result.final_verdict}** |") lines.append(f"| {_t(config, 'duration')} | {duration_str} |") if phased and config.phases: phase_names = " → ".join(p.name for p in config.phases) lines.append(f"| {_t(config, 'phases_label')} | {phase_names} |") for p in config.phases: lines.append( f"| {_t(config, 'phase')}: {p.name} | " f"{_t(config, 'max_iterations')} {p.max_iterations}, " f"{p.consecutive_pass}x {_t(config, 'consec_pass')} |" ) else: lines.append(f"| {_t(config, 'max_iter')} | {config.max_iterations} |") lines.append("") def _append_iteration_steps( lines: list[str], config: PipelineConfig, iter_result: IterationResult, steps: list[StepConfig], out_of_scope_items: list[tuple[int, str]], *, skip_extraction: bool = False, ) -> None: """Append step details for one iteration. If *skip_extraction* is True, out-of-scope and review-metrics parsing is skipped (useful when a pre-scan already collected that data). """ # Evidence summary table — quick overview of all steps' execution data has_evidence = any( iter_result.step_results.get(s.output_key) for s in steps ) if has_evidence: s_step = _t(config, "step") s_agent = _t(config, "evidence_agent") s_exit = _t(config, "evidence_exit_code") s_dur = _t(config, "evidence_duration") s_size = _t(config, "evidence_output_size") lines.append(f"**{_t(config, 'evidence_summary')}**\n") lines.append(f"| {s_step} | {s_agent} | {s_exit} | {s_dur} | {s_size} |") lines.append("|------|-------|-----------|----------|-------------|") for step in steps: ar = iter_result.step_results.get(step.output_key) out = iter_result.step_outputs.get(step.output_key, "") if ar: lines.append( f"| {step.name} | {ar.agent_name} " f"| {ar.exit_code} | {ar.duration_seconds}s " f"| {len(out)} chars |" ) lines.append("") for step in steps: agent_result = iter_result.step_results.get(step.output_key) output = iter_result.step_outputs.get(step.output_key, "") agent_name = agent_result.agent_name if agent_result else step.agent duration = f" ({agent_result.duration_seconds}s)" if agent_result else "" lines.append(f"### {_t(config, 'step')}: {step.name} ({agent_name}){duration}\n") # Show command preview and exit code for execution evidence if agent_result and agent_result.command_preview: lines.append(f"**Command**: `{agent_result.command_preview}`") lines.append(f"**Exit code**: {agent_result.exit_code}\n") if step.verdict and iter_result.verdict: lines.append(f"**{_t(config, 'verdict')}: {iter_result.verdict}**\n") if len(output) > 500: lines.append("
") lines.append( f"{_t(config, 'output_chars', n=str(len(output)))}\n" ) lines.append(output) lines.append("\n
\n") else: lines.append(output) lines.append("") # Include transcript excerpt for execution evidence visibility if agent_result and agent_result.transcript: transcript_preview = agent_result.transcript[:1500] if len(agent_result.transcript) > 1500: transcript_preview += "\n... (truncated)" transcript_label = _t(config, "evidence_transcript") lines.append("
") lines.append(f"{transcript_label}\n") lines.append(transcript_preview) lines.append("\n
\n") if not skip_extraction and step.role == "review": oos = _extract_out_of_scope(output) if oos: out_of_scope_items.append((iter_result.iteration, oos)) # Parse and accumulate review metrics for this iteration step_metrics = parse_review_metrics(output) if iter_result.review_metrics is None: iter_result.review_metrics = step_metrics else: iter_result.review_metrics = _aggregate_metrics( iter_result.review_metrics, step_metrics, ) def _append_review_metrics_table( lines: list[str], config: PipelineConfig, result: PipelineResult, ) -> None: """Append per-iteration review metrics table and trend summary.""" # Only include if at least one iteration has metrics has_metrics = any(ir.review_metrics for ir in result.iterations) if not has_metrics: return na = _t(config, "metrics_na") lines.append("---\n") lines.append(f"## {_t(config, 'metrics_title')}\n") # Table header lines.append( f"| {_t(config, 'metrics_iter')} | {_t(config, 'verdict')} " f"| Critical | Major | Minor " f"| Over-eng | Omission " f"| CONFIRMED | DISMISSED |" ) lines.append("|------|---------|----------|-------|-------|----------|----------|-----------|-----------|") # Table rows for ir in result.iterations: m = ir.review_metrics v = ir.verdict or "-" if m: lines.append( f"| {ir.iteration} | {v} " f"| {m.critical} | {m.major} | {m.minor} " f"| {m.over_engineering} | {m.omission} " f"| {m.confirmed} | {m.dismissed} |" ) else: lines.append( f"| {ir.iteration} | {v} " f"| {na} | {na} | {na} " f"| {na} | {na} " f"| {na} | {na} |" ) lines.append("") # Trend summary metrics_list = [ (ir.iteration, ir.review_metrics) for ir in result.iterations if ir.review_metrics ] if len(metrics_list) >= 2: lines.append(f"### {_t(config, 'metrics_trend_title')}\n") _append_trend_line( lines, "Issues", [(it, m.critical + m.major + m.minor) for it, m in metrics_list], ) _append_trend_line( lines, "Over-engineering", [(it, m.over_engineering) for it, m in metrics_list], ) _append_trend_line( lines, "Omission", [(it, m.omission) for it, m in metrics_list], ) _append_trend_line( lines, "CONFIRMED", [(it, m.confirmed) for it, m in metrics_list], ) _append_trend_line( lines, "DISMISSED", [(it, m.dismissed) for it, m in metrics_list], ) lines.append("") def _append_trend_line( lines: list[str], label: str, values: list[tuple[int, int]], ) -> None: """Append a single trend line like '- Issues: 6 -> 2 -> 0 (decreasing)'.""" nums = [v for _, v in values] arrow = " → ".join(str(n) for n in nums) if nums[-1] < nums[0]: direction = "decreasing" elif nums[-1] > nums[0]: direction = "increasing" else: direction = "stable" lines.append(f"- {label}: {arrow} ({direction})") def _append_out_of_scope( lines: list[str], config: PipelineConfig, out_of_scope_items: list[tuple[int, str]], ) -> None: """Append the out-of-scope issues section if any exist.""" if not out_of_scope_items: return lines.append("---\n") lines.append(f"## {_t(config, 'oos_title')}\n") lines.append(f"{_t(config, 'oos_desc')}\n") for iteration_num, content in out_of_scope_items: lines.append(f"### {_t(config, 'iteration')} {iteration_num}\n") lines.append(content) lines.append("") def _append_final_verdict( lines: list[str], config: PipelineConfig, result: PipelineResult, ) -> None: """Append the final verdict section.""" lines.append("---\n") lines.append(f"## {_t(config, 'final_verdict_title')}: {result.final_verdict}\n") if result.agentic_branch: lines.append(f"**Agentic branch**: `{result.agentic_branch}`") lines.append(f"```bash\ngit checkout {result.agentic_branch}\n```\n") if result.final_verdict == "PASS": lines.append(_t(config, "pass_msg")) elif result.final_verdict == "ESCALATE": lines.append(_t(config, "escalate_msg")) lines.append("") for issue in result.escalated_issues: lines.append(f"- {issue}") lines.append("") else: if config.phases: phase_names = " → ".join(p.name for p in config.phases) lines.append(_t(config, "fail_phased", phases=phase_names)) else: lines.append( _t(config, "fail_simple", max_iter=str(config.max_iterations)) ) # --------------------------------------------------------------------------- # Issue Tracker extraction from senior/aggregate outputs # --------------------------------------------------------------------------- _ISSUE_TRACKER_PATTERN = re.compile( r"##+ (?:Issue Tracker|이슈 트래커)[^\n]*\n((?:\|[^\n]+\|\n?)+)", re.DOTALL, ) _TRACKER_ROW_PATTERN = re.compile( r"^\|\s*(ISS-\d+)\s*\|\s*(\S+)\s*\|\s*(.*?)\s*\|\s*(\S+)\s*\|\s*(\S+)\s*\|", re.MULTILINE, ) def _extract_issue_tracker_rows( result: PipelineResult, ) -> list[dict[str, str]]: """Extract the latest Issue Tracker table from pipeline results. Scans iteration outputs in reverse to find the most recent tracker table from aggregate/senior review steps. Falls back to parsing individual review outputs for ISS-NNN tagged issues. """ # Try to find a tracker table from the last iteration with one for ir in reversed(result.iterations): for key, output in ir.step_outputs.items(): match = _ISSUE_TRACKER_PATTERN.search(output) if not match: continue table_text = match.group(1) rows = [] for row_match in _TRACKER_ROW_PATTERN.finditer(table_text): rows.append({ "id": row_match.group(1), "severity": row_match.group(2), "description": row_match.group(3).strip(), "status": row_match.group(4), "since": row_match.group(5), }) if rows: return rows # Fallback: parse ISS-NNN from review outputs across iterations seen: dict[str, dict[str, str]] = {} for ir in result.iterations: for key, output in ir.step_outputs.items(): for m in re.finditer( r"(ISS-\d+)\s*\[(\w+)\]\[.*?\]\s*(.*?)(?:\n|$)", output, ): iss_id = m.group(1) if iss_id not in seen: seen[iss_id] = { "id": iss_id, "severity": m.group(2), "description": m.group(3).strip()[:80], "status": "Open", "since": f"v{ir.iteration}", } return list(seen.values()) def _append_issue_tracker_summary( lines: list[str], config: PipelineConfig, result: PipelineResult, ) -> None: """Append a consolidated issue tracker table to the report.""" rows = _extract_issue_tracker_rows(result) if not rows: return lines.append("---\n") lines.append(f"## {_t(config, 'issue_tracker_title')}\n") lines.append(f"{_t(config, 'issue_tracker_desc')}\n") lang = getattr(config, "language", "en") if lang == "ko": lines.append("| ISS-ID | 심각도 | 설명 | 상태 | 최초 발견 |") else: lines.append("| ISS-ID | Severity | Description | Status | Since |") lines.append("|--------|----------|-------------|--------|-------|") for row in rows: lines.append( f"| {row['id']} | {row['severity']} " f"| {row['description']} | {row['status']} | {row['since']} |" ) lines.append("") def print_escalation_report( config: PipelineConfig, result: PipelineResult, ) -> None: """Print a prominent ANSI-colored escalation report to the terminal.""" RED = "\033[31m" YELLOW = "\033[33m" BOLD = "\033[1m" RESET = "\033[0m" title = _t(config, "escalate_title") msg = _t(config, "escalate_msg") print(f"\n{RED}{BOLD}{'=' * 60}") print(f" {title}") print(f"{'=' * 60}{RESET}\n") print(f"{YELLOW}{msg}{RESET}\n") for issue in result.escalated_issues: print(f" {RED}•{RESET} {issue}") print(f"\n{RED}{BOLD}{'=' * 60}{RESET}\n") def _append_repeated_aggregate( lines: list[str], config: PipelineConfig, result: PipelineResult, ) -> None: """Append repeated aggregate warnings if any exist.""" if not result.repeated_aggregate_warnings: return lines.append("---\n") lines.append(f"## {_t(config, 'repeat_title')}\n") lines.append(f"{_t(config, 'repeat_desc')}\n") for warning in result.repeated_aggregate_warnings: lines.append(f"- {warning}") lines.append("")