fix(my-deepagent): v0.3 plan-conformance — 18-item gap fix across PR #2-#9

1차 v0.3 구현 후 plan-v0.3 와 대조해 발견된 18건 누락/명세 위반을 보강.
자기 리뷰 3 라운드 (누락·미완 / 오류·엣지케이스 / 과최적화) 모두 PASS.

PR #5 plan-mode (3건):
- BLOCKED_TOOLS_IN_PLAN_MODE 에 write_todos 추가
- /plan 시 system message inject (_PLAN_MODE_SYSTEM_PROMPT)
- /approve 시 마지막 assistant 메시지를 "approved plan" system 으로 inject
- InteractiveSession._pending_system_messages 인프라 신설

PR #2 compaction (1건):
- CompactionResult.summary_text 추가, 다음 thread 첫 ainvoke 에 inject

PR #3 auto-memory (6건):
- global memory dir + bootstrap
- frontmatter name/description/type 정식 도입 + MemoryEntry/MemoryType
- _infer_memory_type (keyword heuristic, no LLM)
- _scrub_secrets (OpenRouter/Anthropic/OpenAI/AWS/Bearer redaction)
- /memory show <name> 서브명령
- /remember [--global] / /forget [--global] 스코프 토글

PR #4 skills (3건):
- project_skills_dir + 두 스코프 (global / project) merge with last-wins
- /skill <name> 본문 inject (queue_system_message) — 이전엔 REPL 출력만
- /skills show <name> 별도 서브명령

PR #6 sub-agent (4건):
- budget.py `session:<uuid>` scope + CostMiddleware 자동 전달
- resolve_root_session_id walk-up (cycle guard) + sub-agent root 에 charge
- run_subagent_to_completion 실제 ainvoke + 결과 push to parent
- /agents 서브명령 구조 (list / spawn / show) + spawn 시 parent system msg

PR #7 governance (1건):
- bootstrap_user_dirs — instructions + global/memory + skills + projects 한
  호출로 idempotent 부트스트랩

PR #8 Web GUI (1건):
- index.html → 세션 목록, runs.html (신설) → workflow archive
- conversation.html ?session=<id> deep-link

PR #9 workflow integration (2건):
- /workflow 백그라운드 WorkflowEngine.run + 진행 메시지 stream 누적
- /binding show <workflow-name[@version]> 인자 지원

테스트 (+17, 685 → 702 passed):
- test_plan_mode: write_todos 차단 + blocklist sanity
- test_memory: scrub + type 추론 + override
- test_skills: project override + find_skill + resolve_skill_sources(pk)
- test_subagents: resolve_root_session_id chain + missing fallback
- test_budget: session: scope accumulation
- test_instructions: governance bootstrap + idempotency
- test_api_static: runs.html 신설 + index.html 재구성

게이트:
- ruff check / format --check / mypy: PASS (141 source files)
- pytest -q --ignore=tests/integration/test_e2e_workflow.py
  --ignore=tests/integration/test_openrouter_smoke.py: 702 passed

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
chungyeong
2026-05-18 00:03:08 +09:00
parent 361d6d7636
commit 96c8849e2c
24 changed files with 1687 additions and 304 deletions

View File

@@ -95,9 +95,10 @@ class BudgetTracker:
run_id: UUID | None,
persona_name: str | None,
estimated_cost_usd: float,
session_id: UUID | None = None,
) -> BudgetCheck:
"""Check if a call of estimated_cost can proceed. May raise BudgetExhaustedError."""
scopes = self._scopes_for(run_id, persona_name)
scopes = self._scopes_for(run_id, persona_name, session_id)
async with self._db.session() as s:
for scope in scopes:
cap = self._cap_for_scope(scope)
@@ -120,11 +121,12 @@ class BudgetTracker:
run_id: UUID | None,
persona_name: str | None,
actual_cost_usd: float,
session_id: UUID | None = None,
) -> None:
"""Persist the actual cost into all relevant scopes."""
if actual_cost_usd == 0:
return
scopes = self._scopes_for(run_id, persona_name)
scopes = self._scopes_for(run_id, persona_name, session_id)
async with self._db.session() as s:
for scope in scopes:
await self._upsert_spend(s, scope, actual_cost_usd, self._cap_for_scope(scope))
@@ -145,11 +147,22 @@ class BudgetTracker:
# ----- internals ----------------------------------------------------------
def _scopes_for(self, run_id: UUID | None, persona_name: str | None) -> list[str]:
def _scopes_for(
self,
run_id: UUID | None,
persona_name: str | None,
session_id: UUID | None = None,
) -> list[str]:
today = _today_utc()
out = [f"day:{today}"]
if run_id is not None:
out.append(f"run:{run_id}")
if session_id is not None:
# v0.3 PR #6: sub-agent invocations charge their cost against this
# scope so the root interactive session can roll up everything that
# ran under it. Cap is the same as run cap (single user, single
# session ≈ single run for budget purposes).
out.append(f"session:{session_id}")
if persona_name:
out.append(f"persona:{persona_name}:day:{today}")
return out
@@ -159,6 +172,8 @@ class BudgetTracker:
return self._daily_cap
if scope.startswith("run:"):
return self._run_cap
if scope.startswith("session:"):
return self._run_cap # reuse run-cap for interactive sessions
if scope.startswith("persona:") and ":day:" in scope:
return self._daily_cap # per-persona daily uses day cap unless overridden
return None

View File

@@ -18,6 +18,7 @@ the same `MessageRow` + `InteractiveSessionRow` foundation.
from __future__ import annotations
import asyncio
import logging
import re
from datetime import UTC, datetime
from pathlib import Path
@@ -32,17 +33,22 @@ from rich.console import Console
from sqlalchemy import desc, select
from ..audit import make_audit_recorder
from ..binding import is_persona_eligible_for_role
from ..budget import make_budget_tracker_from_config
from ..compaction import compact_session, should_compact
from ..config import Config, load_config
from ..governance import require_consent
from ..instructions import ensure_global_instructions_initialized, resolve_instruction_paths
from ..governance import bootstrap_user_dirs, require_consent
from ..instructions import resolve_instruction_paths
from ..memory import (
INDEX_FILENAME,
MemoryType,
add_memory_entry,
ensure_memory_initialized,
global_memory_dir,
list_memory_paths,
memory_entries_summary,
project_memory_dir,
read_entry,
remove_memory_entry,
)
from ..middleware.audit import AuditToolMiddleware
@@ -57,13 +63,15 @@ from ..persona import Persona
from ..session import build_agent
from ..skills import (
ensure_skills_initialized,
list_installed_skills,
find_skill,
list_all_skills,
project_skills_dir,
read_skill_body,
resolve_skill_sources,
user_skills_dir,
)
from ..slash import SlashParsed, SlashRegistry, parse_slash
from ..subagents import list_subagents, spawn_subagent_session
from ..subagents import list_subagents, run_subagent_to_completion, spawn_subagent_session
from ..user_dirs import (
ensure_user_dirs_initialized,
load_combined_personas,
@@ -72,8 +80,20 @@ from ..user_dirs import (
from ..workflow import WorkflowTemplate
_CONSOLE = Console()
_LOG = logging.getLogger(__name__)
_FILE_REF_PATTERN = re.compile(r"(?<![\w./])@([\w./\-]+)")
#: Injected as a system message on /plan entry — tells the model that writes,
#: shell exec, sub-agent spawn, and `write_todos` are all blocked until the
#: user `/approve`s. Mirrors Claude Code's plan-mode preamble.
_PLAN_MODE_SYSTEM_PROMPT = (
"당신은 plan mode 입니다. write_file / edit_file / execute / task / "
"write_todos 는 모두 차단됩니다. read_file / glob / grep / ls 만 사용해 "
"필요한 정보를 확인하고, 사용자가 검토할 plan markdown 1개를 답변으로만 "
"제시하세요 (## Context / ## Phases / ## 검증 섹션 권장). 사용자가 /approve "
"하기 전까지는 어떤 변경도 시도하지 마세요."
)
def _seed_root() -> Path:
return Path(__file__).resolve().parents[3] / "docs" / "schemas"
@@ -171,21 +191,38 @@ class InteractiveSession:
# thread_suffix bumps on /model and compaction; LangGraph thread_id =
# f"{session_id}:{suffix}" so model switches start fresh deepagents state.
self._thread_suffix: int = 0
# v0.3 PR #3: per-project memory dir bootstrap. Idempotent so resumes
# are cheap. Path is determined entirely by config + project_key —
# the same repo across sessions hits the same memory.
# v0.3 PR #3: per-project + global memory dir bootstrap. Idempotent
# so resumes are cheap. `memory_dir` is project-scoped (same repo →
# same memory across sessions); `global_memory_dir_path` applies to
# every project.
self.memory_dir: Path = project_memory_dir(config, project_key)
ensure_memory_initialized(self.memory_dir)
# v0.3 PR #7: bootstrap global MYDEEPAGENT.md (project file is loaded
# if present but never auto-created — we don't write into the user's repo).
ensure_global_instructions_initialized(config)
# v0.3 PR #4: user-scope skills directory bootstrap. Empty is normal —
# users drop `<name>/SKILL.md` directories under here to register skills.
self.global_memory_dir_path: Path = global_memory_dir(config)
ensure_memory_initialized(self.global_memory_dir_path)
# v0.3 PR #7: bootstrap user-wide skeleton (global MYDEEPAGENT.md +
# global memory + skills dir + projects parent). Idempotent — repeat
# calls are cheap and resume sessions still get the layout right.
bootstrap_user_dirs(config)
# v0.3 PR #4: skills directory bootstrap (both scopes). Empty is
# normal — users drop `<name>/SKILL.md` directories under either to
# register skills. Project wins on name collision per
# `deepagents.SkillsMiddleware` "later overrides earlier" rule.
self.skills_dir: Path = user_skills_dir(config)
self.project_skills_dir: Path = project_skills_dir(config, project_key)
ensure_skills_initialized(self.skills_dir)
ensure_skills_initialized(self.project_skills_dir)
# v0.3 PR #5: plan-mode flag. PlanModeMiddleware reads this via closure
# every tool call — no agent rebuild needed when toggling on/off.
self._plan_mode: bool = False
# v0.3 PR #5/#2: queued system-message bodies to prepend on the next
# ainvoke. Used by /plan (inject "you are in plan-mode..."),
# /approve (inject the approved plan markdown), and compaction
# (inject the summary into the fresh thread). Consumed (and cleared)
# by `_invoke_and_stream`.
self._pending_system_messages: list[str] = []
# v0.3 PR #6: hold refs to in-flight sub-agent tasks so the GC + RUF006
# don't drop them mid-flight.
self._pending_subagent_tasks: set[asyncio.Task[Any]] = set()
@property
def thread_id(self) -> str:
@@ -238,14 +275,30 @@ class InteractiveSession:
"""Whether plan mode is currently active for this session."""
return self._plan_mode
def queue_system_message(self, content: str) -> None:
"""Queue a system-message body to prepend on the next ainvoke.
Used by plan-mode/approve/compaction to inject one-shot instructions
or summaries into the LangGraph thread. Empty/whitespace bodies are
silently dropped.
"""
if content and content.strip():
self._pending_system_messages.append(content)
def consume_pending_system_messages(self) -> list[str]:
"""Return + clear the queued system messages."""
out = list(self._pending_system_messages)
self._pending_system_messages.clear()
return out
async def set_plan_mode(self, enabled: bool) -> None:
"""Toggle plan mode + persist to the session row.
PlanModeMiddleware re-reads via closure each tool call → no agent
rebuild required. We DO bump the thread suffix on each toggle so the
model doesn't carry over "I was about to write a file" state into the
new mode. Persists `plan_mode` on the InteractiveSessionRow so resumes
re-establish the mode.
Bumps `thread_suffix` so the model starts the new mode with a clean
LangGraph thread. Persists ``plan_mode`` so resumes re-establish it.
Callers that need plan-specific system messages should use
:meth:`enter_plan_mode` / :meth:`approve_plan` / :meth:`reject_plan`
which wrap this and queue the right prompts.
"""
self._plan_mode = enabled
self._thread_suffix += 1
@@ -255,6 +308,45 @@ class InteractiveSession:
row.plan_mode = enabled
await s.commit()
async def enter_plan_mode(self) -> None:
"""Enter plan-mode + queue the plan-mode system prompt for next turn."""
await self.set_plan_mode(True)
self.queue_system_message(_PLAN_MODE_SYSTEM_PROMPT)
async def approve_plan(self) -> None:
"""Leave plan-mode + queue the most recent assistant message as an
approved-plan system message so the next ainvoke remembers what the
user agreed to. No-op if no assistant message exists yet.
"""
plan_text = await self._fetch_last_assistant_text()
await self.set_plan_mode(False)
if plan_text:
self.queue_system_message(
f"The user APPROVED the following plan. Execute it faithfully.\n\n{plan_text}"
)
async def reject_plan(self) -> None:
"""Leave plan-mode + drop any queued plan messages + reset thread."""
await self.set_plan_mode(False)
self._pending_system_messages.clear()
# clear_agent_cache bumps thread_suffix again — fresh slate.
self.clear_agent_cache()
async def _fetch_last_assistant_text(self) -> str | None:
"""Return the most recent non-archived assistant message body, or None."""
async with self.db.session() as s:
row = (
await s.execute(
select(MessageRow)
.where(MessageRow.session_id == str(self.session_id))
.where(MessageRow.role == "assistant")
.where(MessageRow.archived.is_(False))
.order_by(desc(MessageRow.seq))
.limit(1)
)
).scalar_one_or_none()
return row.content if row is not None else None
def build_agent_if_needed(self) -> Any:
if self._agent is not None:
return self._agent
@@ -275,12 +367,14 @@ class InteractiveSession:
plan_mw = PlanModeMiddleware(is_active=lambda: self._plan_mode)
# Re-glob memory paths every time the agent is rebuilt — `/remember` and
# `/forget` call `clear_agent_cache()` so this picks up new/removed files.
# Order: instruction files (global → project) FIRST, then MEMORY.md
# index, then individual entries. Later files override earlier ones
# at the same path per `deepagents.MemoryMiddleware`.
# Order: instruction files (global → project) FIRST, then global memory,
# then project memory. Later files override earlier ones at the same
# path per `deepagents.MemoryMiddleware`.
instruction_paths = resolve_instruction_paths(self.config, self.repo_root)
memory_paths = list_memory_paths(self.memory_dir)
skill_sources = resolve_skill_sources(self.config)
global_memory_paths = list_memory_paths(self.global_memory_dir_path)
project_memory_paths = list_memory_paths(self.memory_dir)
memory_paths = global_memory_paths + project_memory_paths
skill_sources = resolve_skill_sources(self.config, self.project_key)
self._agent = build_agent(
self._persona,
self.config,
@@ -549,6 +643,10 @@ def _register_compaction_slash(reg: SlashRegistry, sess: InteractiveSession) ->
result = await compact_session(sess.db, sess.config, str(sess.session_id))
if result.compacted:
sess.clear_agent_cache()
if result.summary_text:
sess.queue_system_message(
f"Earlier conversation history (compacted summary):\n\n{result.summary_text}"
)
_CONSOLE.print(
f"[green]compacted[/] — {result.archived} messages archived, "
f"summary {result.summary_tokens} tokens (new thread started)"
@@ -560,86 +658,204 @@ def _register_compaction_slash(reg: SlashRegistry, sess: InteractiveSession) ->
reg.register("compact", _compact, help="manually compact the conversation history")
async def _handle_remember(sess: InteractiveSession, raw: str) -> None:
text, scope, mtype = _parse_remember_args(raw)
if not text:
_CONSOLE.print(
"[yellow]usage:[/] /remember [--global] [--type=user|feedback|project|reference] <text>"
)
return
target_dir = sess.global_memory_dir_path if scope == "global" else sess.memory_dir
try:
result = add_memory_entry(target_dir, text, memory_type=mtype)
except ValueError as e:
_CONSOLE.print(f"[red]{e}[/]")
return
sess.clear_agent_cache()
scope_label = "[bold]global[/]" if scope == "global" else "project"
_CONSOLE.print(
f"[green]remembered →[/] {result.path.name} "
f"scope={scope_label} type={result.memory_type} (new thread, memory reloaded)"
)
if result.scrubbed:
_CONSOLE.print("[yellow]⚠ secret-like substring detected and redacted before saving.[/]")
async def _handle_forget(sess: InteractiveSession, args: tuple[str, ...]) -> None:
if not args:
_CONSOLE.print("[yellow]usage:[/] /forget [--global] <slug> — remove a memory file.")
return
scope = "global" if "--global" in args else "project"
slug_candidates = [a for a in args if not a.startswith("--")]
if not slug_candidates:
_CONSOLE.print("[yellow]missing slug.[/]")
return
slug = slug_candidates[0]
target_dir = sess.global_memory_dir_path if scope == "global" else sess.memory_dir
removed = remove_memory_entry(target_dir, slug)
if not removed:
_CONSOLE.print(f"[yellow]no memory file found in {scope}:[/] {slug}")
return
sess.clear_agent_cache()
_CONSOLE.print(f"[green]forgotten →[/] {slug} scope={scope} (new thread, memory reloaded)")
def _register_memory_slash(reg: SlashRegistry, sess: InteractiveSession) -> None:
"""Register /remember, /forget, /memory slash handlers (v0.3 PR #3)."""
async def _remember(cmd: SlashParsed) -> bool:
# /remember <text> — strip the leading "remember" word from raw to
# preserve original whitespace inside the entry body.
text = cmd.raw[len("remember") :].strip() if cmd.raw.lower().startswith("remember") else ""
if not text:
_CONSOLE.print(
"[yellow]usage:[/] /remember <text> — saves a memory file for this project."
)
return False
try:
path = add_memory_entry(sess.memory_dir, text)
except ValueError as e:
_CONSOLE.print(f"[red]{e}[/]")
return False
# Force rebuild on next turn so MemoryMiddleware picks up the new file.
sess.clear_agent_cache()
_CONSOLE.print(f"[green]remembered →[/] {path.name} (new thread, memory reloaded)")
await _handle_remember(sess, cmd.raw)
return False
async def _forget(cmd: SlashParsed) -> bool:
if not cmd.args:
_CONSOLE.print("[yellow]usage:[/] /forget <slug> — remove a memory file.")
return False
slug = cmd.args[0]
removed = remove_memory_entry(sess.memory_dir, slug)
if not removed:
_CONSOLE.print(f"[yellow]no memory file found for:[/] {slug}")
return False
sess.clear_agent_cache()
_CONSOLE.print(f"[green]forgotten →[/] {slug} (new thread, memory reloaded)")
await _handle_forget(sess, cmd.args)
return False
async def _memory(_: SlashParsed) -> bool:
entries = memory_entries_summary(sess.memory_dir)
_CONSOLE.print(f"[bold]project memory[/] ({sess.memory_dir})")
if not entries:
_CONSOLE.print(" [dim](no entries — try /remember <text>)[/]")
async def _memory(cmd: SlashParsed) -> bool:
if cmd.args and cmd.args[0] == "show":
if len(cmd.args) < 2:
_CONSOLE.print("[yellow]usage:[/] /memory show <name>")
return False
_show_memory_entry(sess, cmd.args[1])
return False
_list_memories(sess)
return False
reg.register("remember", _remember, help="save a memory: /remember [--global] <text>")
reg.register("forget", _forget, help="remove a memory: /forget [--global] <slug>")
reg.register(
"memory",
_memory,
help="list memory entries / /memory show <name> for body",
)
def _parse_remember_args(raw: str) -> tuple[str, str, MemoryType | None]:
"""Extract the body, scope ("project"|"global"), and explicit type from
a `/remember` slash raw string. Returns ``(body, scope, memory_type)``.
"""
tokens = raw.split()
if not tokens or tokens[0].lower() != "remember":
return ("", "project", None)
rest_tokens = tokens[1:]
scope = "project"
mtype: MemoryType | None = None
body_tokens: list[str] = []
for t in rest_tokens:
if t == "--global":
scope = "global"
elif t.startswith("--type="):
candidate = t.split("=", 1)[1].strip().lower()
if candidate == "user":
mtype = "user"
elif candidate == "feedback":
mtype = "feedback"
elif candidate == "project":
mtype = "project"
elif candidate == "reference":
mtype = "reference"
else:
body_tokens.append(t)
body = " ".join(body_tokens).strip()
return (body, scope, mtype)
def _list_memories(sess: InteractiveSession) -> None:
for label, dir_path in (("project", sess.memory_dir), ("global", sess.global_memory_dir_path)):
entries = memory_entries_summary(dir_path)
_CONSOLE.print(f"[bold]{label} memory[/] ({dir_path})")
if not entries:
_CONSOLE.print(" [dim](no entries)[/]")
continue
for name, size in entries:
_CONSOLE.print(f" - {name} [dim]({size} chars)[/]")
return False
reg.register("remember", _remember, help="save a memory: /remember <text>")
reg.register("forget", _forget, help="remove a memory: /forget <slug>")
reg.register("memory", _memory, help="list memory entries for this project")
def _show_memory_entry(sess: InteractiveSession, name: str) -> None:
target_name = name if name.endswith(".md") else f"{name}.md"
for label, dir_path in (("project", sess.memory_dir), ("global", sess.global_memory_dir_path)):
candidate = dir_path / target_name
if candidate.name == INDEX_FILENAME:
continue
entry = read_entry(candidate)
if entry is not None:
_CONSOLE.print(
f"[bold]{entry.name}[/] (type={entry.memory_type}, scope={label})\n"
f"[dim]{entry.description}[/]"
)
_CONSOLE.print(entry.content)
return
_CONSOLE.print(f"[yellow]no memory entry found:[/] {name}")
def _list_skills(sess: InteractiveSession) -> None:
infos = list_all_skills(sess.config, sess.project_key)
_CONSOLE.print(
f"[bold]installed skills[/] global={sess.skills_dir} project={sess.project_skills_dir}"
)
if not infos:
_CONSOLE.print(
" [dim](none installed — drop a <name>/SKILL.md directory under either path)[/]"
)
return
for info in infos:
_CONSOLE.print(f" - [cyan]{info.name}[/] ([dim]{info.scope}[/]) — {info.description}")
def _show_skill_body(sess: InteractiveSession, name: str) -> None:
skill = find_skill(sess.config, sess.project_key, name)
if skill is not None:
_CONSOLE.print(f"[bold]{skill.name}[/] scope={skill.scope} ({skill.path})")
_CONSOLE.print(skill.path.read_text(encoding="utf-8"))
return
# Fallback to direct lookup under user skills dir (preserves test paths).
body = read_skill_body(sess.skills_dir, name)
if body is None:
_CONSOLE.print(f"[yellow]no skill found:[/] {name}")
return
_CONSOLE.print(f"[bold]{name}[/] ({sess.skills_dir / name / 'SKILL.md'})")
_CONSOLE.print(body)
def _register_skills_slash(reg: SlashRegistry, sess: InteractiveSession) -> None:
"""Register /skills (list) and /skill <name> (show body) slash handlers (PR #4)."""
"""Register /skills (list/show) + /skill (system-inject) handlers (PR #4)."""
async def _skills(_: SlashParsed) -> bool:
infos = list_installed_skills(sess.skills_dir)
_CONSOLE.print(f"[bold]installed skills[/] ({sess.skills_dir})")
if not infos:
_CONSOLE.print(
" [dim](none installed — drop a <name>/SKILL.md directory under the path above)[/]"
)
async def _skills(cmd: SlashParsed) -> bool:
if cmd.args and cmd.args[0] == "show":
if len(cmd.args) < 2:
_CONSOLE.print("[yellow]usage:[/] /skills show <name>")
return False
_show_skill_body(sess, cmd.args[1])
return False
for info in infos:
_CONSOLE.print(f" - [cyan]{info.name}[/] — {info.description}")
_list_skills(sess)
return False
async def _skill(cmd: SlashParsed) -> bool:
if not cmd.args:
_CONSOLE.print("[yellow]usage:[/] /skill <name> — show the full SKILL.md body")
_CONSOLE.print(
"[yellow]usage:[/] /skill <name> — inject the skill body as a "
"system message for the next turn"
)
return False
name = cmd.args[0]
body = read_skill_body(sess.skills_dir, name)
if body is None:
skill = find_skill(sess.config, sess.project_key, name)
if skill is None:
_CONSOLE.print(f"[yellow]no skill found:[/] {name}")
return False
_CONSOLE.print(f"[bold]{name}[/] ({sess.skills_dir / name / 'SKILL.md'})")
_CONSOLE.print(body)
body = skill.path.read_text(encoding="utf-8")
sess.queue_system_message(
f"The user requested skill `{skill.name}` (scope={skill.scope}). "
f"Apply the following SKILL.md as the dominant approach for this turn:"
f"\n\n{body}"
)
_CONSOLE.print(
f"[green]skill queued →[/] {skill.name} scope={skill.scope} "
"(next turn will receive the body as a system message)"
)
return False
reg.register("skills", _skills, help="list installed skills")
reg.register("skill", _skill, help="show a skill's body: /skill <name>")
reg.register("skills", _skills, help="list skills / /skills show <name> for body")
reg.register("skill", _skill, help="inject a skill for next turn: /skill <name>")
def _register_plan_mode_slash(reg: SlashRegistry, sess: InteractiveSession) -> None:
@@ -649,11 +865,11 @@ def _register_plan_mode_slash(reg: SlashRegistry, sess: InteractiveSession) -> N
if sess.plan_mode:
_CONSOLE.print("[yellow]plan-mode is already active.[/]")
return False
await sess.set_plan_mode(True)
await sess.enter_plan_mode()
_CONSOLE.print(
"[bold yellow]plan-mode ON[/] — write_file / edit_file / "
"execute / task tools are blocked. Use /approve to leave, "
"or /reject to discard the plan."
"execute / task / write_todos 가 모두 차단됩니다. /approve 하면 "
"방금 만든 plan 그대로 실행 모드로 전환, /reject 하면 thread 리셋."
)
return False
@@ -661,16 +877,18 @@ def _register_plan_mode_slash(reg: SlashRegistry, sess: InteractiveSession) -> N
if not sess.plan_mode:
_CONSOLE.print("[yellow]plan-mode is not active.[/]")
return False
await sess.set_plan_mode(False)
_CONSOLE.print("[green]plan approved → leaving plan-mode (writes re-enabled).[/]")
await sess.approve_plan()
_CONSOLE.print(
"[green]plan approved → leaving plan-mode (writes re-enabled, "
"approved plan queued for next turn).[/]"
)
return False
async def _reject(_: SlashParsed) -> bool:
if not sess.plan_mode:
_CONSOLE.print("[yellow]plan-mode is not active.[/]")
return False
await sess.set_plan_mode(False)
sess.clear_agent_cache() # drop the plan thread entirely
await sess.reject_plan()
_CONSOLE.print("[red]plan rejected → fresh thread, writes re-enabled.[/]")
return False
@@ -679,60 +897,148 @@ def _register_plan_mode_slash(reg: SlashRegistry, sess: InteractiveSession) -> N
reg.register("reject", _reject, help="leave plan-mode, discard plan thread")
def _register_subagent_slash(reg: SlashRegistry, sess: InteractiveSession) -> None:
"""Register /agents (list children) and /spawn <persona> slash handlers (PR #6)."""
async def _agents(_: SlashParsed) -> bool:
children = await list_subagents(sess.db, sess.session_id)
_CONSOLE.print(f"[bold]sub-agents of {str(sess.session_id)[:8]}…[/]")
if not children:
_CONSOLE.print(" [dim](none — use /spawn <persona> to create one)[/]")
return False
for c in children:
label = c.title or "(no title)"
_CONSOLE.print(
f" - [cyan]{c.id[:8]}…[/] depth={c.depth} state={c.state} [dim]{label}[/]"
)
return False
async def _spawn(cmd: SlashParsed) -> bool:
if not cmd.args:
_CONSOLE.print(
"[yellow]usage:[/] /spawn <persona-name> — fork a child session "
"with the named persona (inherits project memory + skills)"
)
return False
target_name = cmd.args[0]
target = None
for p in sess.personas:
if p.name == target_name or f"{p.name}@{p.version}" == target_name:
target = p
break
if target is None:
_CONSOLE.print(f"[red]persona not found:[/] {target_name}")
return False
try:
child_id = await spawn_subagent_session(
sess.db,
parent_session_id=sess.session_id,
persona=target,
initial_title=f"child of {str(sess.session_id)[:8]}",
)
except Exception as e:
_CONSOLE.print(f"[red]spawn failed:[/] {type(e).__name__}: {e}")
return False
async with sess.db.session() as s:
child = await s.get(InteractiveSessionRow, str(child_id))
depth = child.depth if child is not None else "?"
async def _agents_list(sess: InteractiveSession) -> None:
children = await list_subagents(sess.db, sess.session_id)
_CONSOLE.print(f"[bold]sub-agents of {str(sess.session_id)[:8]}…[/]")
if not children:
_CONSOLE.print(' [dim](none — use /agents spawn <persona> "<purpose>" to create one)[/]')
return
for c in children:
label = c.title or "(no title)"
_CONSOLE.print(
f"[green]spawned[/] {str(child_id)[:8]}"
f"depth={depth} "
f"resume with: `mydeepagent --session {str(child_id)[:8]}`"
f" - [cyan]{c.id[:8]}…[/] depth={c.depth} state={c.state} [dim]{label}[/]"
)
async def _agents_show(sess: InteractiveSession, prefix: str) -> None:
"""Print a child's most recent message history (head + tail)."""
from sqlalchemy import select as _sel
children = await list_subagents(sess.db, sess.session_id)
matches = [c for c in children if c.id.startswith(prefix)]
if not matches:
_CONSOLE.print(f"[yellow]no sub-agent matches prefix:[/] {prefix}")
return
if len(matches) > 1:
_CONSOLE.print(f"[red]ambiguous prefix matches >1 sub-agent:[/] {prefix}")
return
child = matches[0]
async with sess.db.session() as s:
rows = (
(
await s.execute(
_sel(MessageRow)
.where(MessageRow.session_id == child.id)
.where(MessageRow.archived.is_(False))
.order_by(MessageRow.seq)
)
)
.scalars()
.all()
)
_CONSOLE.print(
f"[bold]{child.id[:8]}…[/] state={child.state} depth={child.depth} "
f"({len(rows)} messages)"
)
for r in rows:
preview = r.content.strip()
if len(preview) > 400:
preview = preview[:400] + ""
_CONSOLE.print(f" [dim]{r.role}[/] {preview}")
async def _agents_spawn(sess: InteractiveSession, args: tuple[str, ...]) -> None:
"""Spawn + run a sub-agent to completion in the background.
Syntax: ``/agents spawn <persona[@version]> "<purpose>"`` — the purpose
is a quoted string (multi-word) sent as the user prompt of the sub.
"""
if len(args) < 2:
_CONSOLE.print(
'[yellow]usage:[/] /agents spawn <persona-name> "<purpose>"'
"purpose is the first user prompt the sub-agent will see."
)
return
target_name = args[0]
purpose = " ".join(args[1:]).strip().strip('"').strip("'")
if not purpose:
_CONSOLE.print("[yellow]purpose must be non-empty.[/]")
return
target: Persona | None = None
for p in sess.personas:
if p.name == target_name or f"{p.name}@{p.version}" == target_name:
target = p
break
if target is None:
_CONSOLE.print(f"[red]persona not found:[/] {target_name}")
return
try:
child_id = await spawn_subagent_session(
sess.db,
parent_session_id=sess.session_id,
persona=target,
initial_title=purpose[:50],
)
except Exception as e:
_CONSOLE.print(f"[red]spawn failed:[/] {type(e).__name__}: {e}")
return
# Push a "spawned" system marker into the parent so the user sees it in
# /sessions show + the conversation thread.
await _append_message(
sess.db,
sess.session_id,
"system",
f"[sub-agent {str(child_id)[:8]} spawned] persona={target.name}@{target.version} "
f"purpose={purpose}",
token_count=_approx_token_count(purpose, sess.active_model),
)
_CONSOLE.print(
f"[green]spawned[/] {str(child_id)[:8]}… persona={target.name} running in background…"
)
# Fire-and-forget background invocation. We hold a ref on the session so
# GC doesn't kill it mid-flight (matches the API runner pattern).
task = asyncio.create_task(
run_subagent_to_completion(
sess.db,
sess.config,
sess.session_id,
child_id,
target,
purpose,
saver=sess.saver,
)
)
sess._pending_subagent_tasks.add(task)
task.add_done_callback(sess._pending_subagent_tasks.discard)
def _register_subagent_slash(reg: SlashRegistry, sess: InteractiveSession) -> None:
"""Register /agents subcommand router (PR #6)."""
async def _agents(cmd: SlashParsed) -> bool:
if not cmd.args or cmd.args[0] == "list":
await _agents_list(sess)
return False
sub = cmd.args[0]
rest = cmd.args[1:]
if sub == "spawn":
await _agents_spawn(sess, rest)
return False
if sub == "show":
if not rest:
_CONSOLE.print("[yellow]usage:[/] /agents show <prefix>")
return False
await _agents_show(sess, rest[0])
return False
_CONSOLE.print(f"[yellow]unknown /agents subcommand:[/] {sub} (try list / spawn / show)")
return False
reg.register("agents", _agents, help="list direct sub-agents of this session")
reg.register("spawn", _spawn, help="fork a child session: /spawn <persona-name>")
reg.register(
"agents",
_agents,
help='/agents list | spawn <persona> "<purpose>" | show <prefix>',
)
def _print_personas(sess: InteractiveSession) -> None:
@@ -773,15 +1079,85 @@ def _find_workflow(
def _print_workflow_kickoff(path: Path, tpl: WorkflowTemplate) -> None:
_CONSOLE.print(
f"[yellow]/workflow kick-off is best invoked via:[/]\n"
f" mydeepagent run --workflow {path} --repo .\n"
f"That command launches a full WorkflowEngine.run with audit + budget "
f"+ resume support. Live progress: `mydeepagent runs show <id>` or "
f"the Web GUI."
f"[green]workflow queued →[/] [cyan]{tpl.name}@{tpl.version}[/] "
f"phases={len(tpl.phases)} roles={len(tpl.roles)}\n"
f" source: [dim]{path}[/]\n"
f" progress messages will appear in this session. "
f"Detailed view: `mydeepagent runs show <id>` or the Runs page."
)
_CONSOLE.print(
f" workflow: [cyan]{tpl.name}@{tpl.version}[/] "
f"phases={len(tpl.phases)} roles={len(tpl.roles)}"
async def _run_workflow_background(
sess: InteractiveSession,
template: WorkflowTemplate,
repo_path: Path,
base_branch: str = "main",
) -> None:
"""Build a WorkflowEngine + invoke `run` + mirror events to the session.
Phase 1 logs "started run ...". After completion logs result state + final
report path (if any). All failures are swallowed and logged as system
messages — we never propagate to the REPL loop because that would crash
the user's chat.
"""
from ..artifact_schema import ArtifactSchemaRegistry # local import: heavy module
from ..binding import BackendAvailability, PersonaConsentStore
from ..engine import WorkflowEngine
from ..enums import Backend
from ..tui.approval import cli_approval_callback
seed_root = Path(__file__).resolve().parents[3] / "docs" / "schemas"
registry = ArtifactSchemaRegistry(roots=[seed_root / "artifacts"])
consent_store = PersonaConsentStore(sess.config.data_dir / "persona-consents.json")
budget = make_budget_tracker_from_config(sess.db, sess.config)
await budget.init()
engine = WorkflowEngine(
db=sess.db,
config=sess.config,
persona_pool=sess.personas,
artifact_registry=registry,
consent_store=consent_store,
available_backends=BackendAvailability(available_backends=frozenset(Backend)),
approval_callback=cli_approval_callback,
budget_tracker=budget,
pricing=sess.pricing,
)
await _append_message(
sess.db,
sess.session_id,
"system",
f"[workflow {template.name}@{template.version} started] repo={repo_path} "
f"phases={len(template.phases)}",
token_count=_approx_token_count(template.name, sess.active_model),
)
try:
result = await engine.run(
template,
repo_path=repo_path,
base_branch=base_branch,
)
except Exception as e:
_LOG.exception("workflow run failed for template %s", template.name)
await _append_message(
sess.db,
sess.session_id,
"system",
f"[workflow {template.name}@{template.version} failed] {type(e).__name__}: {e}",
token_count=_approx_token_count(template.name, sess.active_model),
)
return
final_path = result.final_report_path or "(no report)"
await _append_message(
sess.db,
sess.session_id,
"system",
f"[workflow {template.name}@{template.version} {result.state.value}] "
f"run_id={result.run_id} report={final_path}",
token_count=_approx_token_count(template.name, sess.active_model),
)
@@ -790,10 +1166,79 @@ def _print_bindings(sess: InteractiveSession) -> None:
_CONSOLE.print("[dim](no workflows loaded)[/]")
return
for _path, tpl in sess.workflows:
_CONSOLE.print(f"[bold]{tpl.name}@{tpl.version}[/]")
for role in tpl.roles:
caps = ", ".join(c.value for c in role.required_capabilities)
_CONSOLE.print(f" - role [cyan]{role.id}[/] required: [dim]{caps}[/]")
_print_binding_for_template(sess, tpl)
def _print_binding_for_template(sess: InteractiveSession, tpl: WorkflowTemplate) -> None:
"""For one workflow template, print each role + the eligible personas list.
Format mirrors `mydeepagent run --binding` previews — we DO NOT run the
full ``bind_personas`` because that requires consent store + backend
availability checks. This is a fast dry-run that just shows capability
matches so the user can decide which personas to pin via `--override`.
"""
_CONSOLE.print(f"[bold]{tpl.name}@{tpl.version}[/]")
for role in tpl.roles:
caps = ", ".join(c.value for c in role.required_capabilities)
_CONSOLE.print(f" - role [cyan]{role.id}[/] required: [dim]{caps}[/]")
eligible: list[Persona] = []
for p in sess.personas:
ok, _reason = is_persona_eligible_for_role(p, role, tpl)
if ok:
eligible.append(p)
if not eligible:
_CONSOLE.print(" [red](no eligible persona)[/]")
continue
for p in eligible:
_CONSOLE.print(f" · [green]{p.name}@{p.version}[/] [dim]({p.model})[/]")
def _parse_workflow_args(args: tuple[str, ...], default_repo: Path) -> tuple[Path, str]:
repo_arg = default_repo
base_branch = "main"
for a in args:
if a.startswith("--repo="):
repo_arg = Path(a.split("=", 1)[1]).expanduser().resolve()
elif a.startswith("--base="):
base_branch = a.split("=", 1)[1]
return repo_arg, base_branch
async def _handle_workflow_kickoff(sess: InteractiveSession, args: tuple[str, ...]) -> None:
if not args:
_CONSOLE.print(
"[yellow]usage:[/] /workflow <name[@version]> [--repo=<path>] "
"[--base=<branch>] — kick off a background workflow run. "
"Progress messages will appear in this session."
)
return
match = _find_workflow(sess, args[0])
if match is None:
_CONSOLE.print(f"[red]workflow not found:[/] {args[0]}")
return
repo_arg, base_branch = _parse_workflow_args(args[1:], sess.repo_root)
path, tpl = match
_print_workflow_kickoff(path, tpl)
task = asyncio.create_task(_run_workflow_background(sess, tpl, repo_arg, base_branch))
sess._pending_subagent_tasks.add(task) # reuse the existing ref-set
task.add_done_callback(sess._pending_subagent_tasks.discard)
async def _handle_binding_show(sess: InteractiveSession, args: tuple[str, ...]) -> None:
if not args or args[0] != "show":
_CONSOLE.print(
"[yellow]usage:[/] /binding show [<workflow-name[@version]>] — "
"list role→persona eligibility. No name = all loaded workflows."
)
return
if len(args) >= 2:
match = _find_workflow(sess, args[1])
if match is None:
_CONSOLE.print(f"[red]workflow not found:[/] {args[1]}")
return
_print_binding_for_template(sess, match[1])
return
_print_bindings(sess)
def _register_workflow_slash(reg: SlashRegistry, sess: InteractiveSession) -> None:
@@ -808,26 +1253,11 @@ def _register_workflow_slash(reg: SlashRegistry, sess: InteractiveSession) -> No
return False
async def _workflow_cmd(cmd: SlashParsed) -> bool:
if not cmd.args:
_CONSOLE.print(
"[yellow]usage:[/] /workflow <name[@version]> — kick off a "
"background workflow run. See /runs for progress."
)
return False
target_name = cmd.args[0]
match = _find_workflow(sess, target_name)
if match is None:
_CONSOLE.print(f"[red]workflow not found:[/] {target_name}")
return False
path, tpl = match
_print_workflow_kickoff(path, tpl)
await _handle_workflow_kickoff(sess, cmd.args)
return False
async def _binding_cmd(cmd: SlashParsed) -> bool:
if not cmd.args or cmd.args[0] != "show":
_CONSOLE.print("[yellow]usage:[/] /binding show — list role→persona defaults")
return False
_print_bindings(sess)
await _handle_binding_show(sess, cmd.args)
return False
reg.register("personas", _personas_cmd, help="list all loaded personas")
@@ -870,7 +1300,20 @@ async def _invoke_and_stream(
sess: InteractiveSession,
) -> None:
"""Invoke the agent, print the assistant response, and persist both messages."""
# 1. Persist the user message first so it's durable even if ainvoke fails.
# 1. Consume queued system messages (from /plan, /approve, compaction).
# Persist them BEFORE the user msg so /sessions show shows them in the
# correct order ([..., system, user, assistant, ...]).
pending_sys = sess.consume_pending_system_messages()
for s_body in pending_sys:
await _append_message(
sess.db,
sess.session_id,
"system",
s_body,
token_count=_approx_token_count(s_body, sess.active_model),
)
# 2. Persist the user message — durable even if ainvoke fails.
await _append_message(
sess.db,
sess.session_id,
@@ -879,11 +1322,14 @@ async def _invoke_and_stream(
token_count=_approx_token_count(user_text, sess.active_model),
)
# 2. Invoke the agent. LangGraph thread_id includes the suffix so /model
# or /clear-induced switches start a fresh context.
# 3. Build the LangGraph invoke payload. Same system messages prepend
# the user turn so the model actually sees them; LangGraph carries the
# accumulated state for the same thread_id.
invoke_messages: list[dict[str, Any]] = [{"role": "system", "content": s} for s in pending_sys]
invoke_messages.append({"role": "user", "content": user_text})
try:
result = await agent.ainvoke(
{"messages": [{"role": "user", "content": user_text}]},
{"messages": invoke_messages},
config={"configurable": {"thread_id": sess.thread_id}},
)
except Exception:
@@ -917,12 +1363,20 @@ async def _invoke_and_stream(
async with sess.db.session() as s:
session_row = await s.get(InteractiveSessionRow, str(sess.session_id))
if session_row is not None and should_compact(session_row):
result = await compact_session(sess.db, sess.config, str(sess.session_id))
if result.compacted:
compaction_result = await compact_session(sess.db, sess.config, str(sess.session_id))
if compaction_result.compacted:
sess.clear_agent_cache() # bumps thread_suffix → fresh deepagents thread
# PR #2 plan: new thread starts with system+summary+recent_K context.
# We queue the summary as a system message so the next ainvoke
# carries it on the fresh thread.
if compaction_result.summary_text:
sess.queue_system_message(
"Earlier conversation history (compacted summary):\n\n"
f"{compaction_result.summary_text}"
)
_CONSOLE.print(
f"[dim]context compacted — {result.archived} messages archived, "
f"summary {result.summary_tokens} tokens, new thread[/]"
f"[dim]context compacted — {compaction_result.archived} messages archived, "
f"summary {compaction_result.summary_tokens} tokens, new thread[/]"
)

View File

@@ -64,11 +64,13 @@ class CompactionResult:
compacted: bool,
archived: int = 0,
summary_tokens: int = 0,
summary_text: str = "",
reason: str = "",
) -> None:
self.compacted = compacted
self.archived = archived
self.summary_tokens = summary_tokens
self.summary_text = summary_text
self.reason = reason
def __repr__(self) -> str:
@@ -280,6 +282,7 @@ async def compact_session(
compacted=True,
archived=len(archive_ids),
summary_tokens=summary_tokens,
summary_text=summary_text,
reason="ok",
)

View File

@@ -1,13 +1,29 @@
"""Governance consent for sending user code to external LLM providers."""
"""Governance consent + first-run filesystem bootstrap.
v0.3 PR #7 extends this module to provision the user-wide skeleton on first
run: ``<data_dir>/MYDEEPAGENT.md``, ``<data_dir>/global/memory/MEMORY.md``,
``<data_dir>/skills/``, ``<data_dir>/projects/``. All steps are idempotent
so repeated calls do nothing destructive.
The bootstrap is invoked at REPL/API startup so users always see the dirs
even before they touch a `/remember` or `/skill` slash.
"""
from __future__ import annotations
import json
import logging
import os
from datetime import UTC, datetime
from pathlib import Path
from .config import Config
from .errors import MyDeepAgentError
from .instructions import ensure_global_instructions_initialized
from .memory import ensure_memory_initialized, global_memory_dir
from .skills import ensure_skills_initialized, user_skills_dir
_LOG = logging.getLogger(__name__)
def consent_path(data_dir: Path) -> Path:
@@ -39,3 +55,25 @@ def require_consent(data_dir: Path) -> None:
message="governance consent not recorded",
recovery_hint="run `mydeepagent init` and accept the data-governance prompt",
)
def bootstrap_user_dirs(config: Config) -> None:
"""Provision the full user-wide skeleton. Idempotent.
Creates (if missing):
- ``<data_dir>/MYDEEPAGENT.md`` (global instructions w/ template)
- ``<data_dir>/global/memory/MEMORY.md`` (empty index for cross-project memory)
- ``<data_dir>/skills/`` (user-wide skills root)
- ``<data_dir>/projects/`` (parent of per-project subtrees)
Per-project subdirs (``projects/<project_key>/memory|skills``) are still
created lazily by :class:`InteractiveSession` since they depend on the
repo path; the parent ``projects/`` is materialised here so users see the
expected layout even before opening their first session.
"""
data_dir = Path(config.data_dir)
data_dir.mkdir(parents=True, exist_ok=True)
ensure_global_instructions_initialized(config)
ensure_memory_initialized(global_memory_dir(config))
ensure_skills_initialized(user_skills_dir(config))
(data_dir / "projects").mkdir(parents=True, exist_ok=True)

View File

@@ -1,10 +1,13 @@
"""Auto-memory (v0.3 PR #3) — project-scoped persistent context.
"""Auto-memory (v0.3 PR #3) — project-scoped + global persistent context.
Layout::
<config.data_dir>/projects/<project_key>/memory/
<config.data_dir>/projects/<project_key>/memory/ # project-scoped
MEMORY.md # index — one line per entry: "- [Title](file.md) — hook"
<slug>.md # individual memory entries (with optional frontmatter)
<slug>.md # individual memory entries with frontmatter
<config.data_dir>/global/memory/ # global (every project)
MEMORY.md
<slug>.md
The deepagents `MemoryMiddleware` reads every path we pass via the `memory=`
kwarg of `create_deep_agent` and injects them (concatenated) into the system
@@ -13,19 +16,42 @@ every turn, so updates take effect on the next user message — no agent
rebuild required.
`/remember <text>` appends a new entry file and updates the index. `/forget
<slug>` deletes the entry file and prunes the index. Both are project-scoped
(via `project_key`) so different repos keep separate memory.
<slug>` deletes the entry file and prunes the index. Both default to the
project scope; pass ``scope="global"`` to write into the global directory.
Frontmatter follows the Claude Code auto-memory convention:
---
name: <slug>
description: <one-line hook>
type: user | feedback | project | reference
---
<body>
Type inference uses simple keyword heuristics (deterministic — no LLM call)
so `/remember` works offline. Callers can override with ``--type=feedback``
on the slash if the heuristic picks the wrong bucket.
API keys / OpenRouter / Anthropic tokens are scrubbed at write time via
:func:`_scrub_secrets` — the user gets a single warning + a placeholder.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Literal
import yaml
from .config import Config
#: Filename of the index file inside each project memory dir.
MemoryType = Literal["user", "feedback", "project", "reference"]
_MEMORY_TYPES: tuple[MemoryType, ...] = ("user", "feedback", "project", "reference")
#: Filename of the index file inside each memory dir (project or global).
INDEX_FILENAME = "MEMORY.md"
#: Slug character set — kept conservative for filesystem portability.
@@ -34,13 +60,69 @@ _SLUG_RE = re.compile(r"[^a-z0-9_-]+")
#: Initial index body when bootstrapping a fresh memory directory.
_INITIAL_INDEX = """# Auto-memory
This file is an index of stored memories for this project. Each entry below
points to a sibling `*.md` file. Entries are auto-managed by `/remember` and
`/forget` slash commands — edit by hand if you need finer control.
This file is an index of stored memories. Each entry below points to a
sibling `*.md` file. Entries are auto-managed by `/remember` and `/forget`
slash commands — edit by hand if you need finer control.
## Entries
"""
#: Regexes used by `_scrub_secrets`. Each redacts a recognisable secret
#: shape: OpenRouter / Anthropic / OpenAI keys + bearer tokens + AWS keys.
_SECRET_PATTERNS: tuple[tuple[re.Pattern[str], str], ...] = (
(re.compile(r"sk-or-[A-Za-z0-9_-]{16,}"), "<redacted:openrouter-key>"),
(re.compile(r"sk-ant-[A-Za-z0-9_-]{16,}"), "<redacted:anthropic-key>"),
(re.compile(r"sk-[A-Za-z0-9_-]{20,}"), "<redacted:openai-key>"),
(re.compile(r"Bearer\s+[A-Za-z0-9._-]{16,}"), "<redacted:bearer-token>"),
(re.compile(r"AKIA[0-9A-Z]{16}"), "<redacted:aws-access-key>"),
)
@dataclass(frozen=True)
class MemoryEntry:
"""One stored memory. Parsed from a `<slug>.md` frontmatter + body."""
name: str
description: str
memory_type: MemoryType
content: str
file_path: Path
def _scrub_secrets(text: str) -> tuple[str, bool]:
"""Return ``(scrubbed_text, was_modified)``.
Iterates `_SECRET_PATTERNS` and replaces every match with a labelled
placeholder. Conservative: any pattern hit redacts the whole match.
"""
out = text
modified = False
for pat, placeholder in _SECRET_PATTERNS:
new = pat.sub(placeholder, out)
if new != out:
modified = True
out = new
return out, modified
def _infer_memory_type(content: str, explicit: MemoryType | None = None) -> MemoryType:
"""Deterministic keyword-based classifier (no LLM call).
Falls back to ``project`` when nothing matches. Designed to be cheap +
predictable — `/remember "fish shell"` always lands in ``user``,
`/remember "don't mock the database"` in ``feedback``, etc.
"""
if explicit is not None:
return explicit
text = content.lower()
if any(k in text for k in ("don't ", "dont ", "stop ", "never ", "no longer ", "instead of")):
return "feedback"
if any(k in text for k in ("i ", "i'm ", "i am ", "my ", "prefer", "fish shell", "user is")):
return "user"
if any(k in text for k in ("see http", "linear ", "github.com", "channel ", "dashboard")):
return "reference"
return "project"
def project_memory_dir(config: Config, project_key: str) -> Path:
"""Return the absolute directory path for this project's memory."""
@@ -49,6 +131,11 @@ def project_memory_dir(config: Config, project_key: str) -> Path:
return Path(config.data_dir) / "projects" / project_key / "memory"
def global_memory_dir(config: Config) -> Path:
"""Return the absolute directory path for the user's global memory."""
return Path(config.data_dir) / "global" / "memory"
def ensure_memory_initialized(memory_dir: Path) -> Path:
"""Create the memory directory + initial MEMORY.md if missing.
@@ -95,27 +182,43 @@ def _now_iso() -> str:
return datetime.now(UTC).isoformat(timespec="seconds")
@dataclass(frozen=True)
class WriteResult:
"""Outcome of `add_memory_entry`. Carries the file path + whether
secret-scrubbing kicked in (so the slash handler can warn the user)."""
path: Path
memory_type: MemoryType
scrubbed: bool
def add_memory_entry(
memory_dir: Path,
content: str,
*,
name: str | None = None,
) -> Path:
description: str | None = None,
memory_type: MemoryType | None = None,
) -> WriteResult:
"""Write a new memory file + append pointer to the index.
- ``name`` (optional): explicit slug. If omitted, derived from the first
line of ``content`` via :func:`_slugify`.
- File names collide → ``-2``, ``-3``, … suffix is appended until unique.
- ``name`` (optional): explicit slug. Default = slugified first line.
- ``description`` (optional): one-line hook for the index pointer.
Default = first line of content (no leading ``#``).
- ``memory_type`` (optional): override the heuristic classifier.
Returns the absolute path to the newly written file. Raises
``ValueError`` for empty content.
Secret-shaped substrings (OpenRouter/Anthropic/OpenAI keys, AWS access
keys, bearer tokens) are redacted via :func:`_scrub_secrets` before
write — the ``WriteResult.scrubbed`` flag tells the caller to warn the
user. Empty/whitespace content raises ``ValueError``.
"""
if not content or not content.strip():
raise ValueError("memory content must be non-empty")
ensure_memory_initialized(memory_dir)
safe_content, scrubbed = _scrub_secrets(content.strip())
first_line = content.strip().splitlines()[0]
first_line = safe_content.splitlines()[0]
slug_base = _slugify(name or first_line)
candidate = memory_dir / f"{slug_base}.md"
n = 2
@@ -123,20 +226,82 @@ def add_memory_entry(
candidate = memory_dir / f"{slug_base}-{n}.md"
n += 1
# File body: short frontmatter + content. The frontmatter is informational
# for human readers; the deepagents middleware does not parse it.
body = f"---\nslug: {candidate.stem}\ncreated: {_now_iso()}\n---\n\n{content.strip()}\n"
inferred_type = _infer_memory_type(safe_content, memory_type)
hook = (description or first_line.strip().lstrip("# ").strip())[:120] or candidate.stem
body = (
f"---\n"
f"name: {candidate.stem}\n"
f"description: {hook}\n"
f"type: {inferred_type}\n"
f"created: {_now_iso()}\n"
f"---\n\n"
f"{safe_content}\n"
)
candidate.write_text(body, encoding="utf-8")
# Append a one-line pointer to the index — first line of content is the
# title, truncated to keep the index scannable.
title = first_line.strip().lstrip("# ").strip()[:80] or candidate.stem
pointer = f"- [{title}]({candidate.name}) — {_now_iso()}\n"
pointer = f"- [{hook}]({candidate.name}) — type:{inferred_type}\n"
index_path = memory_dir / INDEX_FILENAME
with index_path.open("a", encoding="utf-8") as f:
f.write(pointer)
return candidate
return WriteResult(path=candidate, memory_type=inferred_type, scrubbed=scrubbed)
def read_entry(file_path: Path) -> MemoryEntry | None:
"""Parse a single ``<slug>.md`` file into a :class:`MemoryEntry`.
Returns None for files with malformed/missing frontmatter — the caller
can decide whether to surface the issue. Falls back to ``project`` when
`type` is missing or unrecognised.
"""
if not file_path.is_file():
return None
try:
raw = file_path.read_text(encoding="utf-8")
except OSError:
return None
if not raw.startswith("---"):
return None
parts = raw.split("---", 2)
if len(parts) < 3:
return None
try:
meta = yaml.safe_load(parts[1]) or {}
except yaml.YAMLError:
return None
if not isinstance(meta, dict):
return None
name = str(meta.get("name", file_path.stem)).strip()
description = str(meta.get("description", "")).strip() or "(no description)"
raw_type = str(meta.get("type", "project")).strip().lower()
mt: MemoryType = "project"
for known in _MEMORY_TYPES:
if raw_type == known:
mt = known
break
return MemoryEntry(
name=name,
description=description,
memory_type=mt,
content=parts[2].lstrip("\n"),
file_path=file_path,
)
def read_index_entries(memory_dir: Path) -> list[MemoryEntry]:
"""Return parsed :class:`MemoryEntry` for every `*.md` in the dir except
``MEMORY.md`` itself. Sorted by filename. Malformed files are skipped."""
if not memory_dir.is_dir():
return []
out: list[MemoryEntry] = []
for p in sorted(memory_dir.glob("*.md")):
if p.name == INDEX_FILENAME:
continue
entry = read_entry(p)
if entry is not None:
out.append(entry)
return out
def remove_memory_entry(memory_dir: Path, slug_or_filename: str) -> bool:

View File

@@ -56,6 +56,7 @@ class CostMiddleware(AgentMiddleware):
run_id=self.run_id,
persona_name=self.persona_name,
estimated_cost_usd=estimated,
session_id=self.interactive_session_id,
)
started = time.perf_counter()
try:
@@ -104,6 +105,7 @@ class CostMiddleware(AgentMiddleware):
run_id=self.run_id,
persona_name=self.persona_name,
actual_cost_usd=actual,
session_id=self.interactive_session_id,
)
return response

View File

@@ -13,11 +13,13 @@ Implementation strategy:
("ok, I'll keep planning instead"). We do NOT raise — that would crash the
turn and the user would lose the partial response.
Blocked tools (matches Claude Code):
Blocked tools (matches Claude Code's ExitPlanMode-required tool set):
- ``write_file``, ``edit_file`` — fs mutation
- ``bash`` / ``execute`` / ``run_command`` / ``shell`` — shell exec
- ``task`` — sub-agent spawn (a sub-agent could bypass plan mode)
- ``write_todos`` is allowed — plan mode IS planning, todos are the artifact
- ``write_todos`` — todos are PART of the plan markdown. Plan-mode
forbids commits to the agent's TODO list; the user reviews the plan
first, then /approve unlocks both writes and the TODO list.
"""
from __future__ import annotations
@@ -37,8 +39,14 @@ _SHELL_TOOLS: frozenset[str] = frozenset({"bash", "execute", "run_command", "she
#: Tool names that spawn sub-agents (which would bypass plan mode in the parent).
_SUBAGENT_TOOLS: frozenset[str] = frozenset({"task"})
#: Plan-mode forbids committing to a TODO list — todos are part of the
#: plan markdown that the user reviews before /approve.
_PLANNING_TOOLS: frozenset[str] = frozenset({"write_todos"})
#: Full blocklist applied while plan mode is on.
BLOCKED_TOOLS_IN_PLAN_MODE: frozenset[str] = _FS_WRITE_TOOLS | _SHELL_TOOLS | _SUBAGENT_TOOLS
BLOCKED_TOOLS_IN_PLAN_MODE: frozenset[str] = (
_FS_WRITE_TOOLS | _SHELL_TOOLS | _SUBAGENT_TOOLS | _PLANNING_TOOLS
)
def _block_message(tool_name: str) -> str:

View File

@@ -1,12 +1,11 @@
"""Agent Skills (v0.3 PR #4) — LLM-routed progressive disclosure.
Layout::
Layout (two scopes, mirrors Claude Code's `~/.claude/skills/` + repo overlay):
<config.data_dir>/skills/<skill-name>/SKILL.md
[optional supporting files]
<config.data_dir>/skills/<name>/SKILL.md # global / user
<config.data_dir>/projects/<project_key>/skills/<name>/SKILL.md # project
We mount this single directory as a source for ``deepagents.SkillsMiddleware``
which:
We mount both directories as sources for ``deepagents.SkillsMiddleware`` which:
1. Parses every ``SKILL.md`` YAML frontmatter (``name``, ``description``, …)
2. Injects an index of ``(name, description)`` pairs into the system prompt
@@ -14,15 +13,18 @@ which:
``read_file`` — no embeddings, no per-token vector lookup, no custom
routing logic. Anthropic's Agent Skills specification verbatim.
The skill name in the YAML frontmatter must match the parent directory name
(``deepagents`` enforces this) — e.g. a skill directory ``web-research/``
needs ``name: web-research`` inside its ``SKILL.md``.
Per ``deepagents.SkillsMiddleware`` semantics, later sources override earlier
ones at the same skill name — so project-scope wins over global-scope, which
matches the Claude Code precedence.
PR #4 keeps the surface area small: we mount one user-scope source and expose
``/skills`` (list) and ``/skill <name>`` (show full body for inspection)
slashes. Project-scope skills (``<repo>/.mydeepagent/skills/``) are NOT wired
in this PR — call sites can later layer them by passing additional sources
through ``build_agent(skills_sources_override=...)``.
The skill name in the YAML frontmatter must match the parent directory name.
PR #4 slashes:
- ``/skills``: list installed skills (project + global, with scope label)
- ``/skills show <name>``: REPL output only (inspection)
- ``/skill <name>``: inject the SKILL.md body as a one-shot system message
on the next ainvoke (the LLM treats it as an explicit "use this skill"
directive for this turn).
"""
from __future__ import annotations
@@ -46,22 +48,34 @@ _MAX_SKILL_READ_BYTES = 10 * 1024 * 1024
class SkillInfo:
"""Lightweight summary of one installed skill — used by `/skills` slash.
Fields are derived from the YAML frontmatter inside ``SKILL.md``:
- ``name``: directory name (also enforced inside frontmatter by deepagents)
- ``description``: 1-line summary, truncated if very long
- ``path``: absolute path of the ``SKILL.md`` for `/skill <name>` body display
- ``path``: absolute path of the ``SKILL.md`` for body display
- ``scope``: ``"project"`` (repo-local) or ``"global"`` (user-wide)
"""
name: str
description: str
path: Path
scope: str = "global"
def user_skills_dir(config: Config) -> Path:
"""Return the user-scope skills directory (``<data_dir>/skills``)."""
"""Return the global / user-wide skills directory (``<data_dir>/skills``)."""
return Path(config.data_dir) / "skills"
def project_skills_dir(config: Config, project_key: str) -> Path:
"""Return the project-scope skills directory.
Stored under ``<data_dir>/projects/<project_key>/skills/`` to keep all
project-scoped artefacts (memory, skills) under a single parent path.
"""
if not project_key:
raise ValueError("project_key must be non-empty")
return Path(config.data_dir) / "projects" / project_key / "skills"
def ensure_skills_initialized(skills_dir: Path) -> None:
"""Create the skills directory if missing.
@@ -116,7 +130,7 @@ def _parse_skill_md(path: Path) -> SkillInfo | None:
return SkillInfo(name=name, description=description, path=path)
def list_installed_skills(skills_dir: Path) -> list[SkillInfo]:
def list_installed_skills(skills_dir: Path, *, scope: str = "global") -> list[SkillInfo]:
"""Scan the directory for ``<name>/SKILL.md`` entries and return summaries.
- Sorted by name for deterministic UX
@@ -136,10 +150,33 @@ def list_installed_skills(skills_dir: Path) -> list[SkillInfo]:
continue
info = _parse_skill_md(skill_md)
if info is not None:
found.append(info)
found.append(
SkillInfo(name=info.name, description=info.description, path=info.path, scope=scope)
)
return found
def list_all_skills(config: Config, project_key: str) -> list[SkillInfo]:
"""Merged project + global skill list. Project wins on name collision."""
global_skills = list_installed_skills(user_skills_dir(config), scope="global")
project_skills = list_installed_skills(project_skills_dir(config, project_key), scope="project")
project_names = {s.name for s in project_skills}
merged = [s for s in global_skills if s.name not in project_names]
merged.extend(project_skills)
merged.sort(key=lambda s: s.name)
return merged
def find_skill(config: Config, project_key: str, name: str) -> SkillInfo | None:
"""Resolve a skill by name, preferring project-scope over global."""
if not name:
return None
for skill in list_all_skills(config, project_key):
if skill.name == name:
return skill
return None
def read_skill_body(skills_dir: Path, name: str) -> str | None:
"""Return the full SKILL.md content for the named skill, or None if missing.
@@ -160,11 +197,16 @@ def read_skill_body(skills_dir: Path, name: str) -> str | None:
return None
def resolve_skill_sources(config: Config) -> list[str]:
def resolve_skill_sources(config: Config, project_key: str | None = None) -> list[str]:
"""Build the list of skill-directory sources to pass to deepagents.
Currently a single-entry list (user-scope). Designed to be extended with
project-scope and team-scope sources in later PRs without changing the
caller interface.
Order: global first, then project. ``deepagents.SkillsMiddleware``
later-wins, so project skills override global ones at the same name.
Returns absolute paths. Project source is omitted when no
``project_key`` is supplied (e.g. workflow-engine call sites that don't
have a project context).
"""
return [str(user_skills_dir(config).resolve())]
sources = [str(user_skills_dir(config).resolve())]
if project_key:
sources.append(str(project_skills_dir(config, project_key).resolve()))
return sources

View File

@@ -1,34 +1,60 @@
"""Sub-agent session linkage (v0.3 PR #6) — fork a session into a child.
"""Sub-agent session linkage + runner (v0.3 PR #6).
PR #1 already added `parent_session_id` + `depth` columns to
`InteractiveSessionRow`. This module provides:
- :func:`spawn_subagent_session` — atomically creates a child row inheriting
``project_key`` (so memory is shared) and ``persona_id`` from the parent
unless overridden, sets ``parent_session_id`` + ``depth = parent.depth + 1``,
and rejects when depth would exceed :data:`MAX_SUBAGENT_DEPTH`.
- :func:`list_subagents` — return all direct children of a session for
``/agents`` and the Web GUI's session tree.
``project_key`` from the parent, sets ``parent_session_id`` + ``depth =
parent.depth + 1``, rejects when depth would exceed
:data:`MAX_SUBAGENT_DEPTH`.
- :func:`list_subagents` — direct children for ``/agents`` listings.
- :func:`resolve_root_session_id` — walk the parent chain to find the root.
- :func:`run_subagent_to_completion` — actually invoke a sub-agent's
``ainvoke`` with isolation + LangGraph thread + summary push to parent.
The deepagents ``task`` tool is *separate* from this concept — that tool
spawns langchain-internal sub-agents that run inline and return a string.
Those don't get InteractiveSessionRows. Use ``spawn_subagent_session`` when
you want a persisted, addressable session that the user can navigate to via
``mydeepagent --session <id>`` or the Web GUI.
Cost rollup: each sub-agent's CostMiddleware is wired with the ROOT session
id so all LLM calls under that session tree charge a single ``session:<uuid>``
scope — matches the plan ("sub-agent는 root session의 한도에 합산").
"""
from __future__ import annotations
import logging
from collections.abc import Sequence
from datetime import UTC, datetime
from typing import Any
from uuid import UUID, uuid4
from sqlalchemy import select
from sqlalchemy import desc, select
from .audit import make_audit_recorder
from .budget import make_budget_tracker_from_config
from .compaction import compact_session
from .config import Config
from .errors import MyDeepAgentError
from .memory import (
ensure_memory_initialized,
global_memory_dir,
list_memory_paths,
project_memory_dir,
)
from .middleware.audit import AuditToolMiddleware
from .middleware.cost import CostMiddleware
from .middleware.plan_mode import PlanModeMiddleware
from .monitoring.pricing import ModelPrice, PricingCache
from .monitoring.token_budget import count_tokens
from .persistence.db import Database
from .persistence.models import AgentPersonaRow, InteractiveSessionRow
from .persistence.models import AgentPersonaRow, InteractiveSessionRow, MessageRow
from .persona import Persona
from .session import build_agent
from .skills import (
ensure_skills_initialized,
project_skills_dir,
resolve_skill_sources,
user_skills_dir,
)
_LOG = logging.getLogger(__name__)
#: Maximum sub-agent nesting depth. Above this we refuse to spawn — Claude
#: Code's `task` tool limits agent stacks to roughly 3 levels (Main → A → B)
@@ -147,3 +173,217 @@ async def list_subagents(db: Database, parent_session_id: UUID) -> list[Interact
.all()
)
return list(rows)
async def resolve_root_session_id(db: Database, session_id: UUID) -> UUID:
"""Walk ``parent_session_id`` until we reach a session with ``parent=None``.
Guarded against cycles (would only happen if depth column lied — protective
cap = 1 + MAX_SUBAGENT_DEPTH iterations). Returns the input id when the
session has no parent.
"""
current = str(session_id)
for _ in range(MAX_SUBAGENT_DEPTH + 2):
async with db.session() as s:
row = await s.get(InteractiveSessionRow, current)
if row is None:
return session_id
if row.parent_session_id is None:
return UUID(row.id)
current = row.parent_session_id
# Cycle detected — return the latest hop as a graceful fallback.
return UUID(current)
_SUBAGENT_SUMMARY_INSTRUCTION = (
"당신은 sub-agent 입니다. 사용자가 요청한 과제를 마치고 한 번의 응답 안에 "
"(1) 도달한 결론, (2) 변경한 파일/생성한 산출물, (3) 부모 세션에 전달할 핵심 "
"요약 (≤ 400 단어) 을 정리하세요. 추가 turn 은 일어나지 않습니다."
)
def _static_pricing_seed() -> PricingCache:
cache = PricingCache()
cache.set(
[
ModelPrice("anthropic/claude-sonnet-4-6", 0.003, 0.015, 200_000),
ModelPrice("anthropic/claude-haiku-4-5", 0.001, 0.005, 200_000),
ModelPrice("anthropic/claude-opus-4-1", 0.015, 0.075, 200_000),
ModelPrice("deepseek/deepseek-chat", 0.00028, 0.00112, 64_000),
]
)
return cache
def _flatten_assistant_content(msg: Any) -> str:
content = getattr(msg, "content", "") or ""
if isinstance(content, list):
return "\n".join(
(b.get("text", str(b)) if isinstance(b, dict) else str(b)) for b in content
)
return str(content)
async def _persist_message(
db: Database, session_id: UUID, role: str, content: str, *, model: str
) -> None:
"""Insert one MessageRow + bump last_message_at + token totals.
Mirrors the REPL's ``_append_message`` but lives in subagents.py so the
runner stays self-contained.
"""
token_count = count_tokens(content, model)
now = datetime.now(UTC).isoformat(timespec="seconds")
async with db.session() as s:
last_seq = (
await s.execute(
select(MessageRow.seq)
.where(MessageRow.session_id == str(session_id))
.order_by(desc(MessageRow.seq))
.limit(1)
)
).scalar_one_or_none() or 0
s.add(
MessageRow(
session_id=str(session_id),
seq=last_seq + 1,
role=role,
content=content,
tool_calls=None,
token_count=token_count,
is_summary=False,
archived=False,
ts=now,
)
)
row = await s.get(InteractiveSessionRow, str(session_id))
if row is not None:
row.last_message_at = now
if role == "user":
row.total_input_tokens += token_count
elif role == "assistant":
row.total_output_tokens += token_count
await s.commit()
async def run_subagent_to_completion(
db: Database,
config: Config,
parent_session_id: UUID,
sub_session_id: UUID,
persona: Persona,
prompt: str,
*,
saver: Any | None = None,
) -> str:
"""Invoke the sub-agent ONCE with the supplied prompt and return its summary.
- Loads the sub-session row to read ``project_key`` (inherited from parent)
- Resolves the root session id and wires CostMiddleware to charge that
single ``session:<root_uuid>`` scope so the whole agent tree shares one
budget envelope (per plan: "sub-agent는 root session의 한도에 합산").
- Persists user prompt + assistant summary to the sub-session.
- Pushes a "[sub-agent <id> result] …" system message to the parent so
the user sees the outcome in the main thread.
- Marks the sub-session ``ended`` on completion.
Failures are logged + propagated as a synthetic assistant message in the
sub-session, and an error system message in the parent.
"""
async with db.session() as s:
sub_row = await s.get(InteractiveSessionRow, str(sub_session_id))
if sub_row is None:
raise MyDeepAgentError.fatal(
"subagent_session_missing",
message=f"sub-agent session {sub_session_id} not found",
recovery_hint="call spawn_subagent_session before run_subagent_to_completion",
)
project_key = sub_row.project_key or ""
root_session_id = await resolve_root_session_id(db, parent_session_id)
# Bootstrap shared memory + skills dirs (idempotent).
if project_key:
ensure_memory_initialized(project_memory_dir(config, project_key))
ensure_skills_initialized(project_skills_dir(config, project_key))
ensure_memory_initialized(global_memory_dir(config))
ensure_skills_initialized(user_skills_dir(config))
pricing = _static_pricing_seed()
budget = make_budget_tracker_from_config(db, config)
cost_mw = CostMiddleware(
pricing=pricing,
model_name=persona.model,
interactive_session_id=root_session_id,
persona_name=persona.name,
budget_tracker=budget,
)
audit_mw = AuditToolMiddleware(
interactive_session_id=sub_session_id,
file_recorder=make_audit_recorder(config.state_dir),
)
plan_mw = PlanModeMiddleware(is_active=lambda: False)
memory_paths = list_memory_paths(global_memory_dir(config))
if project_key:
memory_paths += list_memory_paths(project_memory_dir(config, project_key))
skill_sources = resolve_skill_sources(config, project_key or None)
agent = build_agent(
persona,
config,
root_dir=config.workspace_root,
middleware=[plan_mw, cost_mw, audit_mw],
checkpointer=saver,
memory_paths_override=memory_paths,
skills_sources_override=skill_sources,
)
full_prompt = f"{prompt.strip()}\n\n---\n\n{_SUBAGENT_SUMMARY_INSTRUCTION}"
await _persist_message(db, sub_session_id, "user", full_prompt, model=persona.model)
thread_id = f"{sub_session_id}:0"
try:
result = await agent.ainvoke(
{"messages": [{"role": "user", "content": full_prompt}]},
config={"configurable": {"thread_id": thread_id}},
)
except Exception as e:
_LOG.exception("sub-agent ainvoke failed for session %s", sub_session_id)
error_msg = f"sub-agent failed: {type(e).__name__}: {e}"
await _persist_message(db, sub_session_id, "assistant", error_msg, model=persona.model)
await _persist_message(
db,
parent_session_id,
"system",
f"[sub-agent {str(sub_session_id)[:8]} error] {error_msg}",
model=persona.model,
)
await _mark_session_ended(db, sub_session_id)
return error_msg
messages = result.get("messages", []) if isinstance(result, dict) else []
summary = _flatten_assistant_content(messages[-1]) if messages else "(empty response)"
await _persist_message(db, sub_session_id, "assistant", summary, model=persona.model)
await _persist_message(
db,
parent_session_id,
"system",
f"[sub-agent {str(sub_session_id)[:8]} result]\n{summary}",
model=persona.model,
)
# Compact the sub-session if it grew too big (rare for single-turn but
# the helper is idempotent + cheap to call).
await compact_session(db, config, str(sub_session_id))
await _mark_session_ended(db, sub_session_id)
return summary
async def _mark_session_ended(db: Database, session_id: UUID) -> None:
async with db.session() as s:
row = await s.get(InteractiveSessionRow, str(session_id))
if row is not None and row.state != "ended":
row.state = "ended"
row.ended_at = datetime.now(UTC).isoformat(timespec="seconds")
await s.commit()