"""Sub-agent session linkage + runner (v0.3 PR #6). PR #1 already added `parent_session_id` + `depth` columns to `InteractiveSessionRow`. This module provides: - :func:`spawn_subagent_session` — atomically creates a child row inheriting ``project_key`` from the parent, sets ``parent_session_id`` + ``depth = parent.depth + 1``, rejects when depth would exceed :data:`MAX_SUBAGENT_DEPTH`. - :func:`list_subagents` — direct children for ``/agents`` listings. - :func:`resolve_root_session_id` — walk the parent chain to find the root. - :func:`run_subagent_to_completion` — actually invoke a sub-agent's ``ainvoke`` with isolation + LangGraph thread + summary push to parent. Cost rollup: each sub-agent's CostMiddleware is wired with the ROOT session id so all LLM calls under that session tree charge a single ``session:`` scope — matches the plan ("sub-agent는 root session의 한도에 합산"). """ from __future__ import annotations import logging from collections.abc import Sequence from datetime import UTC, datetime from typing import Any from uuid import UUID, uuid4 from sqlalchemy import desc, select from .audit import make_audit_recorder from .budget import make_budget_tracker_from_config from .compaction import compact_session from .config import Config from .errors import MyDeepAgentError from .memory import ( ensure_memory_initialized, global_memory_dir, list_memory_paths, project_memory_dir, ) from .middleware.audit import AuditToolMiddleware from .middleware.cost import CostMiddleware from .middleware.plan_mode import PlanModeMiddleware from .monitoring.pricing import ModelPrice, PricingCache from .monitoring.token_budget import count_tokens from .persistence.db import Database from .persistence.models import AgentPersonaRow, InteractiveSessionRow, MessageRow from .persona import Persona from .session import build_agent from .skills import ( ensure_skills_initialized, project_skills_dir, resolve_skill_sources, user_skills_dir, ) _LOG = logging.getLogger(__name__) #: Maximum sub-agent nesting depth. Above this we refuse to spawn — Claude #: Code's `task` tool limits agent stacks to roughly 3 levels (Main → A → B) #: to keep budgets and audit trails legible. MAX_SUBAGENT_DEPTH: int = 3 def _now_iso() -> str: return datetime.now(UTC).isoformat(timespec="seconds") async def spawn_subagent_session( db: Database, *, parent_session_id: UUID, persona: Persona, initial_title: str | None = None, ) -> UUID: """Create a child :class:`InteractiveSessionRow` linked to ``parent_session_id``. The child inherits ``project_key`` from the parent — same memory dir, same skill dir. ``depth`` is incremented by 1; if that would exceed :data:`MAX_SUBAGENT_DEPTH` we raise ``MyDeepAgentError(human_required)`` so the caller (REPL slash / API endpoint) can surface a clean message. The persona may be different from the parent's (callers often want a specialised role for the child), so ``persona`` is required. We upsert its ``AgentPersonaRow`` for the FK exactly like :func:`cli.interactive._load_or_create_session_row` does for root rows. Returns the new child ``session_id``. """ async with db.session() as s: parent = await s.get(InteractiveSessionRow, str(parent_session_id)) if parent is None: raise MyDeepAgentError.human_required( "parent_session_missing", message=f"cannot spawn sub-agent: parent session {parent_session_id} not found", recovery_hint="confirm the parent session id; sub-agents require a live parent", ) if parent.state == "ended": raise MyDeepAgentError.human_required( "parent_session_ended", message=f"cannot spawn sub-agent: parent {parent.id} is ended", recovery_hint="resume the parent session first or pick a different parent", ) new_depth = (parent.depth or 0) + 1 if new_depth > MAX_SUBAGENT_DEPTH: raise MyDeepAgentError.human_required( "subagent_depth_exceeded", message=( f"sub-agent depth limit reached: parent depth={parent.depth}, " f"max={MAX_SUBAGENT_DEPTH}" ), recovery_hint=( f"flatten the agent stack (max {MAX_SUBAGENT_DEPTH} levels) " "or close intermediate sub-agents first" ), ) # Upsert AgentPersonaRow for the persona we're spawning with. ph = persona.compute_hash() persona_row = ( await s.execute(select(AgentPersonaRow).where(AgentPersonaRow.hash == ph)) ).scalar_one_or_none() if persona_row is None: persona_row = AgentPersonaRow( id=str(uuid4()), name=persona.name, version=persona.version, hash=ph, definition=persona.model_dump(by_alias=True), created_at=_now_iso(), ) s.add(persona_row) await s.flush() child_id = uuid4() child = InteractiveSessionRow( id=str(child_id), persona_id=persona_row.id, persona_hash=ph, started_at=_now_iso(), last_message_at=None, state="active", total_input_tokens=0, total_output_tokens=0, model=persona.model, project_key=parent.project_key, # inherit so memory is shared title=initial_title, plan_mode=False, parent_session_id=parent.id, depth=new_depth, ) s.add(child) await s.commit() return child_id async def list_subagents(db: Database, parent_session_id: UUID) -> list[InteractiveSessionRow]: """Return all direct children of ``parent_session_id``, oldest first. Used by the ``/agents`` slash and the Web GUI session tree. Does NOT recurse — callers that want the full tree must walk it themselves. """ async with db.session() as s: rows: Sequence[InteractiveSessionRow] = ( ( await s.execute( select(InteractiveSessionRow) .where(InteractiveSessionRow.parent_session_id == str(parent_session_id)) .order_by(InteractiveSessionRow.started_at) ) ) .scalars() .all() ) return list(rows) async def resolve_root_session_id(db: Database, session_id: UUID) -> UUID: """Walk ``parent_session_id`` until we reach a session with ``parent=None``. Guarded against cycles (would only happen if depth column lied — protective cap = 1 + MAX_SUBAGENT_DEPTH iterations). Returns the input id when the session has no parent. """ current = str(session_id) for _ in range(MAX_SUBAGENT_DEPTH + 2): async with db.session() as s: row = await s.get(InteractiveSessionRow, current) if row is None: return session_id if row.parent_session_id is None: return UUID(row.id) current = row.parent_session_id # Cycle detected — return the latest hop as a graceful fallback. return UUID(current) _SUBAGENT_SUMMARY_INSTRUCTION = ( "당신은 sub-agent 입니다. 사용자가 요청한 과제를 마치고 한 번의 응답 안에 " "(1) 도달한 결론, (2) 변경한 파일/생성한 산출물, (3) 부모 세션에 전달할 핵심 " "요약 (≤ 400 단어) 을 정리하세요. 추가 turn 은 일어나지 않습니다." ) def _static_pricing_seed() -> PricingCache: cache = PricingCache() cache.set( [ ModelPrice("anthropic/claude-sonnet-4-6", 0.003, 0.015, 200_000), ModelPrice("anthropic/claude-haiku-4-5", 0.001, 0.005, 200_000), ModelPrice("anthropic/claude-opus-4-1", 0.015, 0.075, 200_000), ModelPrice("deepseek/deepseek-chat", 0.00028, 0.00112, 64_000), ] ) return cache def _flatten_assistant_content(msg: Any) -> str: content = getattr(msg, "content", "") or "" if isinstance(content, list): return "\n".join( (b.get("text", str(b)) if isinstance(b, dict) else str(b)) for b in content ) return str(content) async def _persist_message( db: Database, session_id: UUID, role: str, content: str, *, model: str ) -> None: """Insert one MessageRow + bump last_message_at + token totals. Mirrors the REPL's ``_append_message`` but lives in subagents.py so the runner stays self-contained. """ token_count = count_tokens(content, model) now = datetime.now(UTC).isoformat(timespec="seconds") async with db.session() as s: last_seq = ( await s.execute( select(MessageRow.seq) .where(MessageRow.session_id == str(session_id)) .order_by(desc(MessageRow.seq)) .limit(1) ) ).scalar_one_or_none() or 0 s.add( MessageRow( session_id=str(session_id), seq=last_seq + 1, role=role, content=content, tool_calls=None, token_count=token_count, is_summary=False, archived=False, ts=now, ) ) row = await s.get(InteractiveSessionRow, str(session_id)) if row is not None: row.last_message_at = now if role == "user": row.total_input_tokens += token_count elif role == "assistant": row.total_output_tokens += token_count await s.commit() async def run_subagent_to_completion( db: Database, config: Config, parent_session_id: UUID, sub_session_id: UUID, persona: Persona, prompt: str, *, saver: Any | None = None, ) -> str: """Invoke the sub-agent ONCE with the supplied prompt and return its summary. - Loads the sub-session row to read ``project_key`` (inherited from parent) - Resolves the root session id and wires CostMiddleware to charge that single ``session:`` scope so the whole agent tree shares one budget envelope (per plan: "sub-agent는 root session의 한도에 합산"). - Persists user prompt + assistant summary to the sub-session. - Pushes a "[sub-agent result] …" system message to the parent so the user sees the outcome in the main thread. - Marks the sub-session ``ended`` on completion. Failures are logged + propagated as a synthetic assistant message in the sub-session, and an error system message in the parent. """ async with db.session() as s: sub_row = await s.get(InteractiveSessionRow, str(sub_session_id)) if sub_row is None: raise MyDeepAgentError.fatal( "subagent_session_missing", message=f"sub-agent session {sub_session_id} not found", recovery_hint="call spawn_subagent_session before run_subagent_to_completion", ) project_key = sub_row.project_key or "" root_session_id = await resolve_root_session_id(db, parent_session_id) # Bootstrap shared memory + skills dirs (idempotent). if project_key: ensure_memory_initialized(project_memory_dir(config, project_key)) ensure_skills_initialized(project_skills_dir(config, project_key)) ensure_memory_initialized(global_memory_dir(config)) ensure_skills_initialized(user_skills_dir(config)) pricing = _static_pricing_seed() budget = make_budget_tracker_from_config(db, config) cost_mw = CostMiddleware( pricing=pricing, model_name=persona.model, interactive_session_id=root_session_id, persona_name=persona.name, budget_tracker=budget, ) audit_mw = AuditToolMiddleware( interactive_session_id=sub_session_id, file_recorder=make_audit_recorder(config.state_dir), ) plan_mw = PlanModeMiddleware(is_active=lambda: False) memory_paths = list_memory_paths(global_memory_dir(config)) if project_key: memory_paths += list_memory_paths(project_memory_dir(config, project_key)) skill_sources = resolve_skill_sources(config, project_key or None) agent = build_agent( persona, config, root_dir=config.workspace_root, middleware=[plan_mw, cost_mw, audit_mw], checkpointer=saver, memory_paths_override=memory_paths, skills_sources_override=skill_sources, ) full_prompt = f"{prompt.strip()}\n\n---\n\n{_SUBAGENT_SUMMARY_INSTRUCTION}" await _persist_message(db, sub_session_id, "user", full_prompt, model=persona.model) thread_id = f"{sub_session_id}:0" try: result = await agent.ainvoke( {"messages": [{"role": "user", "content": full_prompt}]}, config={"configurable": {"thread_id": thread_id}}, ) except Exception as e: _LOG.exception("sub-agent ainvoke failed for session %s", sub_session_id) error_msg = f"sub-agent failed: {type(e).__name__}: {e}" await _persist_message(db, sub_session_id, "assistant", error_msg, model=persona.model) await _persist_message( db, parent_session_id, "system", f"[sub-agent {str(sub_session_id)[:8]} error] {error_msg}", model=persona.model, ) await _mark_session_ended(db, sub_session_id) return error_msg messages = result.get("messages", []) if isinstance(result, dict) else [] summary = _flatten_assistant_content(messages[-1]) if messages else "(empty response)" await _persist_message(db, sub_session_id, "assistant", summary, model=persona.model) await _persist_message( db, parent_session_id, "system", f"[sub-agent {str(sub_session_id)[:8]} result]\n{summary}", model=persona.model, ) # Compact the sub-session if it grew too big (rare for single-turn but # the helper is idempotent + cheap to call). await compact_session(db, config, str(sub_session_id)) await _mark_session_ended(db, sub_session_id) return summary async def _mark_session_ended(db: Database, session_id: UUID) -> None: async with db.session() as s: row = await s.get(InteractiveSessionRow, str(session_id)) if row is not None and row.state != "ended": row.state = "ended" row.ended_at = datetime.now(UTC).isoformat(timespec="seconds") await s.commit()