"""v0.3 PR #6 — Sub-agent session linkage tests.

Covers:
1. `spawn_subagent_session` creates a child row with correct parent_session_id,
   depth = parent.depth + 1, inherited project_key.
2. Depth limit `MAX_SUBAGENT_DEPTH` rejects further spawns.
3. Spawn against ended/missing parent raises human_required errors.
4. `list_subagents` returns direct children in start-order, excludes grandchildren.
5. Persona upsert behaves correctly — same persona hash → same persona_id.
"""

from __future__ import annotations

import uuid
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from pathlib import Path

import pytest

from my_deepagent.config import load_config
from my_deepagent.errors import MyDeepAgentError
from my_deepagent.persistence.db import Database
from my_deepagent.persistence.models import (
    AgentPersonaRow,
    InteractiveSessionRow,
)
from my_deepagent.persona import Persona
from my_deepagent.subagents import (
    MAX_SUBAGENT_DEPTH,
    list_subagents,
    spawn_subagent_session,
)


def _now() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    moment = datetime.now(UTC)
    return moment.isoformat(timespec="seconds")


def _make_persona(name: str = "spec-writer") -> Persona:
    """Build a throwaway `Persona` for spawn tests; only `name` is configurable."""
    persona = Persona(
        name=name,
        version=1,
        backend="openrouter",
        model="openrouter:deepseek/deepseek-chat",
        provider_origin="CN/DeepSeek",
        capabilities=("spec_write",),
        max_risk_level="medium",
        system_prompt="System prompt — at least ten chars",
        deepagents_backend="state",
    )
    return persona


@pytest.fixture
async def db_with_root(tmp_path: Path) -> AsyncIterator[tuple[Database, str]]:
    """Yield `(db, root_id)` for a fresh SQLite database seeded with one root session.

    The seed consists of one `AgentPersonaRow` and one active
    `InteractiveSessionRow` with depth=0, no parent, and
    project_key='proj1234abcdef00'. The database is disposed on teardown.
    """
    sqlite_file = tmp_path / "subagent.sqlite3"
    database = Database(f"sqlite+aiosqlite:///{sqlite_file}")
    await database.init_schema()

    seed_persona_id = str(uuid.uuid4())
    seed_root_id = str(uuid.uuid4())

    async with database.session() as session:
        persona_row = AgentPersonaRow(
            id=seed_persona_id,
            name="default-interactive",
            version=1,
            hash="parent-hash",
            definition={"name": "default-interactive", "version": 1},
            created_at=_now(),
        )
        session.add(persona_row)

        root_row = InteractiveSessionRow(
            id=seed_root_id,
            persona_id=seed_persona_id,
            persona_hash="parent-hash",
            started_at=_now(),
            last_message_at=_now(),
            state="active",
            total_input_tokens=0,
            total_output_tokens=0,
            model="openrouter:deepseek/deepseek-chat",
            project_key="proj1234abcdef00",
            title="root",
            plan_mode=False,
            parent_session_id=None,
            depth=0,
        )
        session.add(root_row)
        await session.commit()

    try:
        yield (database, seed_root_id)
    finally:
        await database.dispose()


# ---------------------------------------------------------------------------
# Happy path
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_spawn_creates_child_with_inherited_project_key(
    db_with_root: tuple[Database, str],
) -> None:
    """A spawned child links to the root, sits at depth 1 and inherits project_key."""
    db, root_id = db_with_root
    persona = _make_persona()
    parent_uuid = uuid.UUID(root_id)

    child_id = await spawn_subagent_session(
        db,
        parent_session_id=parent_uuid,
        persona=persona,
        initial_title="planner-1",
    )

    async with db.session() as session:
        child = await session.get(InteractiveSessionRow, str(child_id))
        assert child is not None
        assert child.parent_session_id == root_id
        assert child.depth == 1
        assert child.project_key == "proj1234abcdef00"  # inherited from the root row
        assert child.title == "planner-1"
        assert child.state == "active"
        assert child.plan_mode is False
        assert child.persona_hash == persona.compute_hash()


@pytest.mark.asyncio
async def test_spawn_two_children_depth_one_each(
    db_with_root: tuple[Database, str],
) -> None:
    """Two spawns from the same root are both depth-1 rows pointing at that root."""
    db, root_id = db_with_root
    persona = _make_persona()
    parent_uuid = uuid.UUID(root_id)

    first_id = await spawn_subagent_session(db, parent_session_id=parent_uuid, persona=persona)
    second_id = await spawn_subagent_session(db, parent_session_id=parent_uuid, persona=persona)

    async with db.session() as session:
        first = await session.get(InteractiveSessionRow, str(first_id))
        second = await session.get(InteractiveSessionRow, str(second_id))
        assert first is not None
        assert second is not None
        assert first.depth == second.depth
        assert second.depth == 1
        assert first.parent_session_id == second.parent_session_id
        assert second.parent_session_id == root_id
# ---------------------------------------------------------------------------
# Depth limit
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_spawn_rejects_beyond_max_depth(db_with_root: tuple[Database, str]) -> None:
    """Spawning from a session already at MAX_SUBAGENT_DEPTH is rejected."""
    db, root_id = db_with_root
    persona = _make_persona()
    parent = uuid.UUID(root_id)

    # Walk a single chain downwards: the root is depth 0, so each spawn should
    # land on depth 1, 2, ... up to and including MAX_SUBAGENT_DEPTH.
    for level in range(MAX_SUBAGENT_DEPTH):
        expected_depth = level + 1
        spawned = await spawn_subagent_session(db, parent_session_id=parent, persona=persona)
        async with db.session() as session:
            spawned_row = await session.get(InteractiveSessionRow, str(spawned))
            assert spawned_row is not None
            assert spawned_row.depth == expected_depth
        parent = spawned

    # `parent` now sits at MAX_SUBAGENT_DEPTH, so one more spawn must fail.
    with pytest.raises(MyDeepAgentError) as exc_info:
        await spawn_subagent_session(db, parent_session_id=parent, persona=persona)
    assert exc_info.value.code == "subagent_depth_exceeded"


# ---------------------------------------------------------------------------
# Invalid parent
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_spawn_missing_parent_raises(db_with_root: tuple[Database, str]) -> None:
    """A parent id that matches no stored session yields `parent_session_missing`."""
    db, _root_id = db_with_root
    persona = _make_persona()
    unknown_parent = uuid.uuid4()

    with pytest.raises(MyDeepAgentError) as exc_info:
        await spawn_subagent_session(db, parent_session_id=unknown_parent, persona=persona)
    assert exc_info.value.code == "parent_session_missing"


@pytest.mark.asyncio
async def test_spawn_ended_parent_raises(db_with_root: tuple[Database, str]) -> None:
    """A parent whose state is 'ended' yields `parent_session_ended`."""
    db, root_id = db_with_root

    # Flip the seeded root session to the ended state before spawning.
    async with db.session() as session:
        root_row = await session.get(InteractiveSessionRow, root_id)
        assert root_row is not None
        root_row.state = "ended"
        await session.commit()

    persona = _make_persona()
    ended_parent = uuid.UUID(root_id)
    with pytest.raises(MyDeepAgentError) as exc_info:
        await spawn_subagent_session(db, parent_session_id=ended_parent, persona=persona)
    assert exc_info.value.code == "parent_session_ended"
# ---------------------------------------------------------------------------
# list_subagents
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_list_subagents_returns_direct_children_only(
    db_with_root: tuple[Database, str],
) -> None:
    """`list_subagents(root)` contains both children and omits the grandchild."""
    db, root_id = db_with_root
    persona = _make_persona()
    root_uuid = uuid.UUID(root_id)

    # root → child_a → grandchild, plus a second direct child child_b.
    child_a = await spawn_subagent_session(db, parent_session_id=root_uuid, persona=persona)
    child_b = await spawn_subagent_session(db, parent_session_id=root_uuid, persona=persona)
    grandchild = await spawn_subagent_session(db, parent_session_id=child_a, persona=persona)

    direct = await list_subagents(db, root_uuid)
    ids = [r.id for r in direct]

    assert str(child_a) in ids
    assert str(child_b) in ids
    assert str(grandchild) not in ids  # depth-2 row is not a direct child
    assert len(direct) == 2


@pytest.mark.asyncio
async def test_list_subagents_no_children_returns_empty(
    db_with_root: tuple[Database, str],
) -> None:
    """A session with no spawned children lists as an empty list."""
    db, root_id = db_with_root
    direct = await list_subagents(db, uuid.UUID(root_id))
    assert direct == []


# ---------------------------------------------------------------------------
# Persona upsert
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_spawn_reuses_persona_row_for_same_hash(
    db_with_root: tuple[Database, str],
) -> None:
    """Two spawns with the same persona share one `AgentPersonaRow`.

    Both child sessions must carry the same persona_id and persona_hash, and
    exactly one persona row may exist for that hash afterwards.
    """
    # Local import, as in the original: sqlalchemy is only needed by this test.
    from sqlalchemy import select

    db, root_id = db_with_root
    persona = _make_persona("shared-persona")
    root_uuid = uuid.UUID(root_id)

    child_a = await spawn_subagent_session(db, parent_session_id=root_uuid, persona=persona)
    child_b = await spawn_subagent_session(db, parent_session_id=root_uuid, persona=persona)

    async with db.session() as s:
        a = await s.get(InteractiveSessionRow, str(child_a))
        b = await s.get(InteractiveSessionRow, str(child_b))
        assert a is not None and b is not None
        assert a.persona_id == b.persona_id
        assert a.persona_hash == b.persona_hash

    # No duplicate AgentPersonaRow. The query needs nothing but the database,
    # so the test does not touch application config or the working directory.
    async with db.session() as s:
        result = await s.execute(
            select(AgentPersonaRow).where(AgentPersonaRow.hash == persona.compute_hash())
        )
        rows = result.scalars().all()
        assert len(rows) == 1