"""Unit tests for WorkflowEngine SIGTERM/SIGINT graceful shutdown handlers.""" from __future__ import annotations import asyncio import signal from pathlib import Path from typing import Any import pytest from my_deepagent.artifact_schema import ArtifactSchemaRegistry from my_deepagent.binding import BackendAvailability, PersonaConsentStore from my_deepagent.config import load_config from my_deepagent.engine import WorkflowEngine from my_deepagent.enums import Backend from my_deepagent.persistence.db import Database from my_deepagent.persona import load_personas_from_dir _DOCS = Path(__file__).resolve().parents[2] / "docs" / "schemas" _ARTIFACTS_ROOT = _DOCS / "artifacts" def _make_engine(tmp_path: Path) -> WorkflowEngine: cfg = load_config( workspace_root=tmp_path, data_dir=tmp_path / "data", database_url=f"sqlite+aiosqlite:///{tmp_path / 'test.sqlite3'}", ) personas = load_personas_from_dir(_DOCS / "personas") registry = ArtifactSchemaRegistry(roots=[_ARTIFACTS_ROOT]) consent_store = PersonaConsentStore(tmp_path / "consents.json") available_backends = BackendAvailability(available_backends=frozenset(Backend)) async def _dummy_approval(payload: dict[str, Any], gates: list[str]) -> Any: raise NotImplementedError("approval not used in signal tests") db = Database(cfg.database_url) return WorkflowEngine( db=db, config=cfg, persona_pool=personas, artifact_registry=registry, consent_store=consent_store, available_backends=available_backends, approval_callback=_dummy_approval, ) @pytest.mark.asyncio async def test_shutdown_requested_false_initially(tmp_path: Path) -> None: """Engine starts with shutdown_requested == False.""" engine = _make_engine(tmp_path) assert engine.shutdown_requested is False @pytest.mark.asyncio async def test_on_signal_sets_shutdown_event(tmp_path: Path) -> None: """Calling _on_signal directly sets shutdown_requested to True.""" engine = _make_engine(tmp_path) assert engine.shutdown_requested is False engine._on_signal(signal.SIGTERM) assert engine.shutdown_requested is True @pytest.mark.asyncio async def test_install_signal_handlers_registers_sigterm(tmp_path: Path) -> None: """install_signal_handlers registers a SIGTERM handler on the running loop.""" engine = _make_engine(tmp_path) async def _check() -> None: engine.install_signal_handlers() loop = asyncio.get_running_loop() # asyncio loop stores handlers in the private _signal_handlers dict (CPython impl). # We accept both: the private dict exists, or signal.getsignal returns our callable. # The private dict is preferred but may not exist on all platforms. handlers = getattr(loop, "_signal_handlers", {}) if handlers: assert signal.SIGTERM in handlers, "SIGTERM not registered in loop._signal_handlers" else: # Fallback: just verify shutdown_requested works when _on_signal is called. engine._on_signal(signal.SIGTERM) assert engine.shutdown_requested is True await _check() @pytest.mark.asyncio async def test_force_cancel_inflight_cancels_pending_tasks(tmp_path: Path) -> None: """_force_cancel_inflight cancels all tasks in _inflight_tasks that are not done.""" engine = _make_engine(tmp_path) async def _long_running() -> None: await asyncio.sleep(1000) task: asyncio.Task[None] = asyncio.create_task(_long_running()) engine._inflight_tasks.add(task) # Give the event loop a tick to start the task. await asyncio.sleep(0) assert not task.done() engine._force_cancel_inflight() # Give the event loop a tick to process the cancellation. await asyncio.sleep(0) assert task.cancelled() @pytest.mark.asyncio async def test_force_cancel_inflight_skips_done_tasks(tmp_path: Path) -> None: """_force_cancel_inflight does not call cancel() on already-done tasks.""" engine = _make_engine(tmp_path) async def _instant() -> str: return "done" task: asyncio.Task[str] = asyncio.create_task(_instant()) await asyncio.sleep(0) # let the task complete assert task.done() engine._inflight_tasks.add(task) # Should not raise; done tasks are skipped. engine._force_cancel_inflight() # Still done, not newly cancelled. assert task.done() assert not task.cancelled()