diff --git a/my-deepagent/CHANGELOG.md b/my-deepagent/CHANGELOG.md index c76e3f1..c3ebee8 100644 --- a/my-deepagent/CHANGELOG.md +++ b/my-deepagent/CHANGELOG.md @@ -3,6 +3,42 @@ ## [Unreleased] ### Added +- **v0.3 PR #2 — context compaction (auto + manual `/compact`)**. + Claude Code의 auto-compact + `/compact` 슬래시 등가. 세션 누적 토큰이 + 활성 모델 윈도우의 70%를 넘으면 자동으로 가장 오래된 비-system, 비-archived + 메시지를 cheap 모델(`openrouter:deepseek/deepseek-chat` 기본)로 1회 요약 → + `MessageRow(is_summary=True, role=system)` 1줄 삽입 + 원본은 archive. + LangGraph thread는 `thread_suffix` bump로 새 컨텍스트 시작 (재인입 비용 회피). + - `monitoring/token_budget.py` (신규): `tiktoken cl100k_base`로 추정. + `MODEL_CONTEXT_LIMITS` 모델별 윈도우 (DeepSeek 64k, Claude Sonnet/Haiku/Opus + 200k, GPT-4o 128k). 미등록 모델은 32k 기본값 — 보수적으로 compaction + 조기 트리거. `COMPACTION_THRESHOLD = 0.7` 상수. `count_tokens()`는 빈 + 문자열·예외 모두 안전 (실패 시 char/4 fallback). + - `compaction.py` (신규): `should_compact()` / `compact_session()` / + `CompactionResult`. `_SESSION_LOCKS: dict[str, asyncio.Lock]`로 + 세션별 직렬화 — 동시 compaction 호출 시 두 번째는 첫 번째 종료를 기다림. + `KEEP_RECENT_K = 10`, `MIN_COMPACTABLE = 4`. LLM 호출은 DB session + 바깥에서 (asyncpg connection 점유 회피). archived rows는 negative seq + band (`-(original.seq + 1)`)로 옮겨 summary가 `to_compact[0].seq` + 자리에 자연스럽게 들어감 (UNIQUE constraint 충돌 회피). + - `cli/interactive.py`: + - `_approx_token_count`를 tiktoken-based로 교체 (이전: 단순 `len // 4`). + - 매 ainvoke 후 `should_compact(session_row)` 체크 → 임계 초과 시 자동 + `compact_session()` 호출 → 성공 시 `clear_agent_cache()`로 thread bump. + 한 줄 stdout 알림 (`context compacted — N messages archived, summary K tokens`). + - `/compact` 슬래시 등록 (`_register_compaction_slash`). 수동 강제 compaction. + 충분한 메시지가 없으면 (`< MIN_COMPACTABLE`) 사유 출력. + - `tests/integration/test_compaction.py` (신규, 7 케이스): + 1. `should_compact` 70% 임계 아래/위/미등록 모델 분기 (3개) + 2. `MIN_COMPACTABLE` 미만이면 LLM 호출 없이 거부 (stub-call 카운트 검증) + 3. Happy path: 14개 메시지 → 4개 archive(negative seq) + summary at seq=1 + + 10개 live 유지 + 토큰 카운터 산술 (1000 - 4*20 + summary_tokens) 검증 + 4. 동일 `session_id`에 동시 호출 2개 → Lock 직렬화 (LLM 호출 윈도우 겹침 + 없음 또는 두 번째 short-circuit) 검증 + 5. 없는 `session_id` → `session_not_found` + - `pyproject.toml`: `tiktoken>=0.7` 명시 (이전엔 langchain-openai 경유 + transitive였음 — 직접 의존 표시). + - **v0.3 PR #1 — interactive session persistence + LangGraph saver wiring**. v0.3의 토대. REPL/GUI 모두 장기 대화 영속 가능하도록 데이터 모델·CLI·HTTP API를 함께 도입. Claude Code의 `claude --resume` 등가. diff --git a/my-deepagent/pyproject.toml b/my-deepagent/pyproject.toml index 3d9157e..473df74 100644 --- a/my-deepagent/pyproject.toml +++ b/my-deepagent/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "pyyaml>=6.0", "rich>=13.9", "structlog>=24.4", + "tiktoken>=0.7", "typer>=0.14", "zstandard>=0.23", "deepagents>=0.6.1,<0.7.0", diff --git a/my-deepagent/src/my_deepagent/cli/interactive.py b/my-deepagent/src/my_deepagent/cli/interactive.py index db203f2..c6eb482 100644 --- a/my-deepagent/src/my_deepagent/cli/interactive.py +++ b/my-deepagent/src/my_deepagent/cli/interactive.py @@ -33,11 +33,13 @@ from sqlalchemy import desc, select from ..audit import make_audit_recorder from ..budget import make_budget_tracker_from_config +from ..compaction import compact_session, should_compact from ..config import Config, load_config from ..governance import require_consent from ..middleware.audit import AuditToolMiddleware from ..middleware.cost import CostMiddleware from ..monitoring.pricing import ModelPrice, PricingCache +from ..monitoring.token_budget import count_tokens from ..persistence.checkpointer import get_checkpointer_ctx from ..persistence.db import Database from ..persistence.models import InteractiveSessionRow, MessageRow @@ -462,10 +464,29 @@ def _register_telemetry_slash(reg: SlashRegistry) -> None: reg.register("sessions", _sessions, help="list recent interactive sessions") +def _register_compaction_slash(reg: SlashRegistry, sess: InteractiveSession) -> None: + """Register /compact slash handler (v0.3 PR #2).""" + + async def _compact(_: SlashParsed) -> bool: + result = await compact_session(sess.db, sess.config, str(sess.session_id)) + if result.compacted: + sess.clear_agent_cache() + _CONSOLE.print( + f"[green]compacted[/] — {result.archived} messages archived, " + f"summary {result.summary_tokens} tokens (new thread started)" + ) + else: + _CONSOLE.print(f"[yellow]compaction skipped:[/] {result.reason}") + return False + + reg.register("compact", _compact, help="manually compact the conversation history") + + def _register_slash(reg: SlashRegistry, sess: InteractiveSession) -> None: _register_navigation_slash(reg, sess) _register_persona_slash(reg, sess) _register_telemetry_slash(reg) + _register_compaction_slash(reg, sess) def _completer(personas: list[Persona], slash_names: list[str]) -> WordCompleter: @@ -474,14 +495,14 @@ def _completer(personas: list[Persona], slash_names: list[str]) -> WordCompleter return WordCompleter(words, ignore_case=True, sentence=True) -def _approx_token_count(text: str) -> int: - """Conservative char-based token estimate (PR #1 placeholder). +def _approx_token_count(text: str, model: str = "") -> int: + """Token count via tiktoken (PR #2). - PR #2 swaps this for tiktoken with model-aware tokenizer selection. - 1 token ≈ 4 chars is the cl100k_base rule of thumb for English; mixed - Korean text trends higher tokens/char, so we round up. + Falls back to a char-based heuristic inside `count_tokens` on tiktoken + failure. Caller passes the active model so future model-specific + tokenizers slot in without changing the call site. """ - return max(0, (len(text) + 3) // 4) + return count_tokens(text, model) async def _invoke_and_stream( @@ -496,7 +517,7 @@ async def _invoke_and_stream( sess.session_id, "user", user_text, - token_count=_approx_token_count(user_text), + token_count=_approx_token_count(user_text, sess.active_model), ) # 2. Invoke the agent. LangGraph thread_id includes the suffix so /model @@ -528,9 +549,23 @@ async def _invoke_and_stream( sess.session_id, "assistant", content_str, - token_count=_approx_token_count(content_str), + token_count=_approx_token_count(content_str, sess.active_model), ) + # 4. Auto-compaction check. Triggered when total used tokens cross 70% + # of the active model's context window. Holds a per-session lock so + # concurrent turns serialise; failure is non-fatal (next turn retries). + async with sess.db.session() as s: + session_row = await s.get(InteractiveSessionRow, str(sess.session_id)) + if session_row is not None and should_compact(session_row): + result = await compact_session(sess.db, sess.config, str(sess.session_id)) + if result.compacted: + sess.clear_agent_cache() # bumps thread_suffix → fresh deepagents thread + _CONSOLE.print( + f"[dim]context compacted — {result.archived} messages archived, " + f"summary {result.summary_tokens} tokens, new thread[/]" + ) + async def _repl_loop( sess: InteractiveSession, diff --git a/my-deepagent/src/my_deepagent/compaction.py b/my-deepagent/src/my_deepagent/compaction.py new file mode 100644 index 0000000..11a0fee --- /dev/null +++ b/my-deepagent/src/my_deepagent/compaction.py @@ -0,0 +1,293 @@ +"""Context compaction (v0.3 PR #2). + +When `total_input_tokens + total_output_tokens` reaches ~70% of the active +model's context window, we summarise the oldest non-system, non-archived +messages with a cheap model and insert a single `MessageRow(is_summary=True)` +in their place. Original messages are marked `archived=True` (they stay in +the DB and can be inspected via `sessions show --all`). + +The session's LangGraph thread is also rolled forward (caller bumps +`InteractiveSession._thread_suffix`) so the next `ainvoke` starts a clean +state with only the summary + recent K messages. +""" + +from __future__ import annotations + +import asyncio +import logging +from datetime import UTC, datetime + +from langchain_openai import ChatOpenAI +from sqlalchemy import select, update +from sqlalchemy.ext.asyncio import AsyncSession + +from .config import Config +from .monitoring.token_budget import ( + count_tokens, + is_over_threshold, + model_context_limit, +) +from .persistence.db import Database +from .persistence.models import InteractiveSessionRow, MessageRow +from .secrets import resolve_openrouter_api_key + +_LOG = logging.getLogger(__name__) + +#: Number of recent non-archived messages kept verbatim during compaction. +KEEP_RECENT_K = 10 + +#: Minimum number of compactable messages required for `/compact`. +MIN_COMPACTABLE = 4 + +#: One concurrent compaction per session_id. +_SESSION_LOCKS: dict[str, asyncio.Lock] = {} + + +def _now_iso() -> str: + return datetime.now(UTC).isoformat(timespec="seconds") + + +def _session_lock(session_id: str) -> asyncio.Lock: + lock = _SESSION_LOCKS.get(session_id) + if lock is None: + lock = asyncio.Lock() + _SESSION_LOCKS[session_id] = lock + return lock + + +class CompactionResult: + """Outcome of a compaction call. Read-only by convention.""" + + def __init__( + self, + *, + compacted: bool, + archived: int = 0, + summary_tokens: int = 0, + reason: str = "", + ) -> None: + self.compacted = compacted + self.archived = archived + self.summary_tokens = summary_tokens + self.reason = reason + + def __repr__(self) -> str: + return ( + f"" + ) + + +def should_compact(session_row: InteractiveSessionRow) -> bool: + """True when total used tokens >= 70% of the active model's window.""" + used = (session_row.total_input_tokens or 0) + (session_row.total_output_tokens or 0) + model = session_row.model or "" + return is_over_threshold(used, model) + + +async def _collect_messages_for_compaction( + s: AsyncSession, session_id: str +) -> tuple[list[MessageRow], list[MessageRow]]: + """Return (to_compact, to_keep) for the session. + + Strategy: + - Skip all `is_summary` and `archived` rows. + - Skip `role=system` rows (they are MYDEEPAGENT.md / memory / skills + injections — owned by the next ainvoke, not by compaction). + - Keep the last KEEP_RECENT_K non-system, non-archived, non-summary + messages verbatim. + - Everything older than that is to_compact. + """ + rows = ( + ( + await s.execute( + select(MessageRow) + .where(MessageRow.session_id == session_id) + .where(MessageRow.archived.is_(False)) + .where(MessageRow.is_summary.is_(False)) + .where(MessageRow.role != "system") + .order_by(MessageRow.seq) + ) + ) + .scalars() + .all() + ) + if len(rows) <= KEEP_RECENT_K: + return ([], list(rows)) + cutoff = len(rows) - KEEP_RECENT_K + return (list(rows[:cutoff]), list(rows[cutoff:])) + + +def _format_for_summary(messages: list[MessageRow]) -> str: + """Render messages as a compact transcript the summariser can consume.""" + lines: list[str] = [] + for m in messages: + role = m.role.upper() + content = (m.content or "").strip() + lines.append(f"[{m.seq}] {role}: {content}") + return "\n\n".join(lines) + + +async def _run_summary_llm(config: Config, model: str, transcript: str) -> str: + """Single LLM call producing a compact paragraph-level summary.""" + api_key = resolve_openrouter_api_key(config) + llm = ChatOpenAI( + model=model.removeprefix("openrouter:"), + base_url=config.openrouter_base_url, + api_key=api_key, + timeout=60.0, + max_completion_tokens=600, + ) + system_prompt = ( + "You are compressing an interactive conversation history for a developer " + "agent. Produce a single concise summary (Korean if the conversation is " + "Korean, otherwise English) that captures: (1) the user's intent and " + "any key decisions, (2) artefacts mentioned by name/path, (3) any " + "open questions or pending follow-ups. Aim for ≤ 300 tokens. Do NOT " + "invent details that are not in the transcript." + ) + response = await llm.ainvoke( + [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": f"Transcript:\n\n{transcript}"}, + ] + ) + content = response.content + if isinstance(content, list): + content = "\n".join( + (c.get("text", str(c)) if isinstance(c, dict) else str(c)) for c in content + ) + return str(content).strip() + + +async def compact_session( + db: Database, + config: Config, + session_id: str, + *, + summary_model: str | None = None, +) -> CompactionResult: + """Compact the oldest portion of a session's history. Idempotent under lock. + + The caller is responsible for bumping the LangGraph thread_id (e.g. + `InteractiveSession.clear_agent_cache()` style) AFTER this returns so the + next ainvoke starts fresh. + """ + lock = _session_lock(session_id) + async with lock: + async with db.session() as s: + row = await s.get(InteractiveSessionRow, session_id) + if row is None: + return CompactionResult(compacted=False, reason="session_not_found") + + to_compact, _to_keep = await _collect_messages_for_compaction(s, session_id) + if len(to_compact) < MIN_COMPACTABLE: + return CompactionResult( + compacted=False, + reason=f"insufficient_messages ({len(to_compact)} < {MIN_COMPACTABLE})", + ) + transcript = _format_for_summary(to_compact) + active_model = row.model or "" + model_for_summary = summary_model or "openrouter:deepseek/deepseek-chat" + + # The LLM call lives OUTSIDE the DB session — it can take seconds and + # we don't want to hold an asyncpg connection during the wait. + try: + summary_text = await _run_summary_llm(config, model_for_summary, transcript) + except Exception as e: + _LOG.exception("compaction summariser failed for session %s", session_id) + return CompactionResult(compacted=False, reason=f"summariser_failed:{e}") + + if not summary_text: + return CompactionResult(compacted=False, reason="empty_summary") + + summary_tokens = count_tokens(summary_text, active_model) + + async with db.session() as s: + # Re-fetch under fresh session in case state changed during LLM call. + row = await s.get(InteractiveSessionRow, session_id) + if row is None: + return CompactionResult(compacted=False, reason="session_disappeared") + + # Insert the summary as a new MessageRow before the kept tail. + # - seq: 1 + max(seq of to_compact) → places summary just before + # the first kept message in the ordering, which is what readers + # want for "render history in seq order". + # - role: "system" + is_summary=True so the next ainvoke includes + # it and the GUI can render it distinctly. + # + # Then archive the originals in one UPDATE. + archive_ids = [m.id for m in to_compact] + archived_token_total = sum(int(m.token_count or 0) for m in to_compact) + now = _now_iso() + + # Summary seq = smallest archived seq. We can't pick a fractional + # position (INTEGER seq) and shifting every kept row by +1 is too + # expensive. Instead we archive originals into the negative-seq + # band (see below) which frees their positive seqs; then the + # summary at to_compact[0].seq slots naturally in front of the + # kept tail when readers `ORDER BY seq ASC WHERE archived=False`. + summary_seq = to_compact[0].seq + s.add( + MessageRow( + session_id=session_id, + seq=summary_seq, + role="system", + content=summary_text, + tool_calls=None, + token_count=summary_tokens, + is_summary=True, + archived=False, + ts=now, + ) + ) + + # Mark originals archived — bump their seq into a high band so the + # UNIQUE (session_id, seq) constraint doesn't trip with the new + # summary at summary_seq. We shift archived rows to negative seq + # space (id-based) to keep them addressable but out of the way. + # + # Postgres lets us do this in one UPDATE with FROM clause; SQLite + # doesn't, so do it per-row for portability. + for original in to_compact: + # Use a negative offset based on original.seq so they remain + # ordered relative to each other. + new_seq = -(original.seq + 1) + await s.execute( + update(MessageRow) + .where(MessageRow.id == original.id) + .values(archived=True, seq=new_seq) + ) + + # Update aggregate token counters on the session row. Subtract + # archived contribution and add the summary tokens (assigned to + # system → input bucket). + row.total_input_tokens = max( + 0, + int(row.total_input_tokens or 0) - archived_token_total + summary_tokens, + ) + row.last_message_at = now + + await s.commit() + + _LOG.info( + "compacted session=%s archived=%d summary_tokens=%d", + session_id, + len(archive_ids), + summary_tokens, + ) + return CompactionResult( + compacted=True, + archived=len(archive_ids), + summary_tokens=summary_tokens, + reason="ok", + ) + + +def context_usage_fraction(session_row: InteractiveSessionRow) -> float: + """Return current used_tokens / context_limit (0.0 if no model set).""" + used = (session_row.total_input_tokens or 0) + (session_row.total_output_tokens or 0) + limit = model_context_limit(session_row.model or "") + if limit <= 0: + return 0.0 + return used / limit diff --git a/my-deepagent/src/my_deepagent/monitoring/token_budget.py b/my-deepagent/src/my_deepagent/monitoring/token_budget.py new file mode 100644 index 0000000..3c41e14 --- /dev/null +++ b/my-deepagent/src/my_deepagent/monitoring/token_budget.py @@ -0,0 +1,86 @@ +"""Token counting + context-window lookup (v0.3 PR #2). + +We can't rely on `LlmCallRow.input_tokens` / `output_tokens` because OpenRouter +sometimes forwards responses with empty `usage_metadata` (v0.1 known limit). +Instead we use `tiktoken` with `cl100k_base` as a conservative estimator — +it overcounts slightly for Korean text (which is what we want for a +compaction threshold check). +""" + +from __future__ import annotations + +import functools +from typing import Final + +import tiktoken + +# --------------------------------------------------------------------------- +# Model context windows. Keys match the form persisted in +# `InteractiveSessionRow.model` (e.g. "openrouter:deepseek/deepseek-chat"). +# When a model is not in the table, we fall back to `_DEFAULT_LIMIT` and log +# nothing — compaction triggers conservatively at 70%. +# --------------------------------------------------------------------------- + +_DEFAULT_LIMIT: Final[int] = 32_000 + +MODEL_CONTEXT_LIMITS: Final[dict[str, int]] = { + # OpenRouter — DeepSeek + "openrouter:deepseek/deepseek-chat": 64_000, + "deepseek/deepseek-chat": 64_000, + # OpenRouter — Anthropic + "openrouter:anthropic/claude-sonnet-4-6": 200_000, + "openrouter:anthropic/claude-haiku-4-5": 200_000, + "openrouter:anthropic/claude-opus-4-1": 200_000, + "anthropic/claude-sonnet-4-6": 200_000, + "anthropic/claude-haiku-4-5": 200_000, + "anthropic/claude-opus-4-1": 200_000, + # OpenRouter — OpenAI (for parity testing) + "openrouter:openai/gpt-4o": 128_000, + "openrouter:openai/gpt-4o-mini": 128_000, +} + +# Compaction triggers when used tokens cross this fraction of the window. +COMPACTION_THRESHOLD: Final[float] = 0.7 + + +def model_context_limit(model: str) -> int: + """Return the context window size (tokens) for the given model id. + + Unknown models return `_DEFAULT_LIMIT` (32 000) so compaction still kicks + in eventually — better than letting context grow unbounded. + """ + return MODEL_CONTEXT_LIMITS.get(model, _DEFAULT_LIMIT) + + +@functools.lru_cache(maxsize=8) +def _encoding_for_model(model: str) -> tiktoken.Encoding: + """Best-effort tokenizer per model id. + + tiktoken doesn't ship encodings for DeepSeek or recent Anthropic models. + `cl100k_base` (GPT-4 / GPT-3.5 turbo) is a reasonable upper-bound + estimator across modern models: it overcounts Korean by ~1.2x but that + only makes the 70% threshold trigger slightly earlier, which is the + conservative direction we want. + """ + return tiktoken.get_encoding("cl100k_base") + + +def count_tokens(text: str, model: str = "") -> int: + """Return tiktoken-estimated token count for `text` under `model`. + + Always returns >= 0. Empty string → 0. An exception inside tiktoken + falls back to a char-based heuristic (1 token ≈ 4 chars) so callers + don't have to guard. + """ + if not text: + return 0 + try: + enc = _encoding_for_model(model) + return len(enc.encode(text, disallowed_special=())) + except Exception: + return max(1, (len(text) + 3) // 4) + + +def is_over_threshold(used_tokens: int, model: str) -> bool: + """True if `used_tokens` >= context_limit(model) * COMPACTION_THRESHOLD.""" + return used_tokens >= int(model_context_limit(model) * COMPACTION_THRESHOLD) diff --git a/my-deepagent/tests/integration/test_compaction.py b/my-deepagent/tests/integration/test_compaction.py new file mode 100644 index 0000000..dace16c --- /dev/null +++ b/my-deepagent/tests/integration/test_compaction.py @@ -0,0 +1,351 @@ +"""v0.3 PR #2 — Context compaction tests. + +4 scenarios from the plan: +1. Manual `/compact` (via compact_session()) — happy path: inserts summary, + archives originals to negative seq band, bumps token counters. +2. should_compact() threshold logic: under 70% → False, over 70% → True. +3. Insufficient messages (< MIN_COMPACTABLE) → CompactionResult(compacted=False). +4. Per-session asyncio.Lock serializes concurrent compactions — second caller + waits for first to release. + +All scenarios stub the summariser LLM (no OpenRouter calls). The DB layer is +exercised end-to-end via aiosqlite tmp_path. +""" + +from __future__ import annotations + +import asyncio +import uuid +from collections.abc import AsyncIterator +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +import pytest +from sqlalchemy import select + +from my_deepagent import compaction as compaction_mod +from my_deepagent.compaction import ( + KEEP_RECENT_K, + MIN_COMPACTABLE, + CompactionResult, + compact_session, + should_compact, +) +from my_deepagent.config import load_config +from my_deepagent.persistence.db import Database +from my_deepagent.persistence.models import ( + AgentPersonaRow, + InteractiveSessionRow, + MessageRow, +) + + +def _now() -> str: + return datetime.now(UTC).isoformat(timespec="seconds") + + +@pytest.fixture +async def db_with_session(tmp_path: Path) -> AsyncIterator[tuple[Database, str, Any]]: + """Yield (db, session_id, config) with one persona + one interactive session. + + Caller is responsible for seeding MessageRow rows. + """ + db_url = f"sqlite+aiosqlite:///{tmp_path / 'compact.sqlite3'}" + cfg = load_config( + workspace_root=tmp_path, + data_dir=tmp_path / "data", + database_url=db_url, + ) + db = Database(db_url) + await db.init_schema() + + persona_id = str(uuid.uuid4()) + session_id = str(uuid.uuid4()) + async with db.session() as s: + s.add( + AgentPersonaRow( + id=persona_id, + name="test-persona", + version=1, + hash="hash-test", + definition={"name": "test-persona", "version": 1}, + created_at=_now(), + ) + ) + s.add( + InteractiveSessionRow( + id=session_id, + persona_id=persona_id, + persona_hash="hash-test", + started_at=_now(), + last_message_at=_now(), + state="active", + total_input_tokens=0, + total_output_tokens=0, + model="openrouter:deepseek/deepseek-chat", + project_key="testproj0000abcd", + title="test session", + plan_mode=False, + parent_session_id=None, + depth=0, + ) + ) + await s.commit() + + try: + yield (db, session_id, cfg) + finally: + await db.dispose() + + +async def _seed_messages( + db: Database, session_id: str, n: int, *, start_seq: int = 1, role_alternation: bool = True +) -> None: + """Insert n non-system, non-archived, non-summary messages.""" + async with db.session() as s: + for i in range(n): + role = "user" if (role_alternation and i % 2 == 0) else "assistant" + s.add( + MessageRow( + session_id=session_id, + seq=start_seq + i, + role=role, + content=f"message {start_seq + i} body text repeated to add tokens", + tool_calls=None, + token_count=20, + is_summary=False, + archived=False, + ts=_now(), + ) + ) + await s.commit() + + +# --------------------------------------------------------------------------- +# Scenario 2: should_compact threshold logic +# --------------------------------------------------------------------------- + + +def test_should_compact_below_threshold() -> None: + row = InteractiveSessionRow( + id="x", + persona_id="p", + persona_hash="h", + state="active", + total_input_tokens=10_000, + total_output_tokens=10_000, + model="openrouter:deepseek/deepseek-chat", # 64k window → 70% = 44_800 + ) + assert should_compact(row) is False + + +def test_should_compact_at_threshold() -> None: + row = InteractiveSessionRow( + id="x", + persona_id="p", + persona_hash="h", + state="active", + total_input_tokens=40_000, + total_output_tokens=10_000, # 50_000 > 44_800 + model="openrouter:deepseek/deepseek-chat", + ) + assert should_compact(row) is True + + +def test_should_compact_unknown_model_uses_default_limit() -> None: + # Default 32_000 → 70% = 22_400. + row = InteractiveSessionRow( + id="x", + persona_id="p", + persona_hash="h", + state="active", + total_input_tokens=20_000, + total_output_tokens=3_000, # 23_000 > 22_400 + model="some-unknown/model", + ) + assert should_compact(row) is True + + +# --------------------------------------------------------------------------- +# Scenario 3: insufficient messages → no-op +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_compact_session_rejects_insufficient_messages( + db_with_session: tuple[Database, str, Any], + monkeypatch: pytest.MonkeyPatch, +) -> None: + db, sid, cfg = db_with_session + # Seed MIN_COMPACTABLE + KEEP_RECENT_K - 1 messages so to_compact is short. + await _seed_messages(db, sid, n=KEEP_RECENT_K + MIN_COMPACTABLE - 1) + + # Stub the summariser so an accidental call would still pass — but assert it's + # never invoked (length gate triggers before the LLM call). + called = {"n": 0} + + async def fake_summary(*_a: Any, **_k: Any) -> str: + called["n"] += 1 + return "should-not-be-called" + + monkeypatch.setattr(compaction_mod, "_run_summary_llm", fake_summary) + + result = await compact_session(db, cfg, sid) + assert result.compacted is False + assert "insufficient_messages" in result.reason + assert called["n"] == 0 + + +# --------------------------------------------------------------------------- +# Scenario 1: happy path — summary inserted, originals archived to negative seq +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_compact_session_happy_path( + db_with_session: tuple[Database, str, Any], + monkeypatch: pytest.MonkeyPatch, +) -> None: + db, sid, cfg = db_with_session + # 14 messages: oldest 4 should be compacted (14 - KEEP_RECENT_K(10) = 4). + await _seed_messages(db, sid, n=14) + + async def fake_summary(*_a: Any, **_k: Any) -> str: + return "요약: 사용자가 wordcount CLI를 만들고 있고 일부 코드를 작성했습니다." + + monkeypatch.setattr(compaction_mod, "_run_summary_llm", fake_summary) + + # Pre-condition: input tokens currently 0 on the row; bump to a non-zero so we + # can verify the subtract-archived-add-summary arithmetic. + async with db.session() as s: + row = await s.get(InteractiveSessionRow, sid) + assert row is not None + row.total_input_tokens = 1000 # arbitrary baseline + await s.commit() + + result = await compact_session(db, cfg, sid) + assert result.compacted is True, f"got {result!r}" + assert result.archived == 4 + assert result.summary_tokens > 0 + + async with db.session() as s: + # The 4 archived messages should now be at negative seq and archived=True. + archived_rows = ( + ( + await s.execute( + select(MessageRow) + .where(MessageRow.session_id == sid) + .where(MessageRow.archived.is_(True)) + .order_by(MessageRow.seq) + ) + ) + .scalars() + .all() + ) + assert len(archived_rows) == 4 + for r in archived_rows: + assert r.seq < 0 + assert r.archived is True + assert r.is_summary is False + + # Exactly one new summary row, role=system, is_summary=True, archived=False. + summary_rows = ( + ( + await s.execute( + select(MessageRow) + .where(MessageRow.session_id == sid) + .where(MessageRow.is_summary.is_(True)) + ) + ) + .scalars() + .all() + ) + assert len(summary_rows) == 1 + summary_row = summary_rows[0] + assert summary_row.role == "system" + assert summary_row.archived is False + assert summary_row.seq == 1 # smallest of the original to_compact seqs + + # The 10 most recent messages remain non-archived at their original seqs. + live_rows = ( + ( + await s.execute( + select(MessageRow) + .where(MessageRow.session_id == sid) + .where(MessageRow.archived.is_(False)) + .where(MessageRow.is_summary.is_(False)) + .order_by(MessageRow.seq) + ) + ) + .scalars() + .all() + ) + assert len(live_rows) == KEEP_RECENT_K + assert [r.seq for r in live_rows] == list(range(5, 15)) + + # Token counter arithmetic: 1000 - (4*20) + summary_tokens. + sess = await s.get(InteractiveSessionRow, sid) + assert sess is not None + assert sess.total_input_tokens == 1000 - 80 + result.summary_tokens + + +# --------------------------------------------------------------------------- +# Scenario 4: per-session Lock serializes concurrent compactions +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_compact_session_lock_serializes_concurrent_calls( + db_with_session: tuple[Database, str, Any], + monkeypatch: pytest.MonkeyPatch, +) -> None: + db, sid, cfg = db_with_session + await _seed_messages(db, sid, n=14) + + # Slow summariser: lets us observe lock serialization (second caller starts + # only after the first finishes). + call_starts: list[float] = [] + call_ends: list[float] = [] + + async def slow_summary(*_a: Any, **_k: Any) -> str: + call_starts.append(asyncio.get_event_loop().time()) + await asyncio.sleep(0.25) + call_ends.append(asyncio.get_event_loop().time()) + return "요약 ok" + + monkeypatch.setattr(compaction_mod, "_run_summary_llm", slow_summary) + + # Two concurrent compactions on the same session_id. + r1, r2 = await asyncio.gather( + compact_session(db, cfg, sid), + compact_session(db, cfg, sid), + ) + + # First call should compact; second call sees no compactable messages left. + compacted_count = sum(1 for r in (r1, r2) for _ in [r] if r.compacted) + assert compacted_count == 1, f"expected exactly 1 compaction, got r1={r1!r} r2={r2!r}" + + # If the lock works, the slow_summary was either called once (second caller + # short-circuits on length gate) or twice with non-overlapping windows. + if len(call_starts) == 2: + # Second LLM call should start after first finishes. + assert call_starts[1] >= call_ends[0], "lock failed to serialize summariser calls" + + +@pytest.mark.asyncio +async def test_compact_session_missing_session_returns_not_found( + db_with_session: tuple[Database, str, Any], + monkeypatch: pytest.MonkeyPatch, +) -> None: + db, _sid, cfg = db_with_session + bogus = str(uuid.uuid4()) + + async def fake_summary(*_a: Any, **_k: Any) -> str: + return "should-not-be-called" + + monkeypatch.setattr(compaction_mod, "_run_summary_llm", fake_summary) + + result: CompactionResult = await compact_session(db, cfg, bogus) + assert result.compacted is False + assert result.reason == "session_not_found" diff --git a/my-deepagent/uv.lock b/my-deepagent/uv.lock index 7b5bc7d..b5e2e60 100644 --- a/my-deepagent/uv.lock +++ b/my-deepagent/uv.lock @@ -1207,6 +1207,7 @@ dependencies = [ { name = "sqlalchemy", extra = ["asyncio"] }, { name = "sse-starlette" }, { name = "structlog" }, + { name = "tiktoken" }, { name = "typer" }, { name = "uvicorn", extra = ["standard"] }, { name = "zstandard" }, @@ -1253,6 +1254,7 @@ requires-dist = [ { name = "sqlalchemy", extras = ["asyncio"], specifier = ">=2.0" }, { name = "sse-starlette", specifier = ">=2.1" }, { name = "structlog", specifier = ">=24.4" }, + { name = "tiktoken", specifier = ">=0.7" }, { name = "typer", specifier = ">=0.14" }, { name = "uvicorn", extras = ["standard"], specifier = ">=0.30" }, { name = "zstandard", specifier = ">=0.23" },