"""Persona binding algorithm: auto-select, override, capability/risk validation, consent gate.""" from __future__ import annotations import fcntl import json import os from collections.abc import Iterator from contextlib import contextmanager from dataclasses import dataclass from datetime import UTC, datetime from pathlib import Path from typing import Any, Literal, cast from .enums import Backend, RiskLevel from .errors import MyDeepAgentError from .hash import sha256 from .persona import Persona from .workflow import WorkflowRole, WorkflowTemplate ConsentDecision = Literal["approve", "block", "once"] _RISK_RANK: dict[RiskLevel, int] = { RiskLevel.LOW: 0, RiskLevel.MEDIUM: 1, RiskLevel.HIGH: 2, } @dataclass(frozen=True) class BackendAvailability: """Which backends are reachable in the current environment. v0.1.0: openrouter availability is determined solely by API-key presence. Other backends follow the same pattern — callers populate available_backends. """ available_backends: frozenset[Backend] def is_available(self, backend: Backend) -> bool: return backend in self.available_backends @dataclass(frozen=True) class BindingOverride: """Per-role persona override: role_id → "persona-name@version" spec string.""" persona_pinned: dict[str, str] @classmethod def parse(cls, raw: dict[str, str] | None) -> BindingOverride: return cls(persona_pinned=dict(raw or {})) @dataclass(frozen=True) class Binding: """Resolved binding of a single workflow role to a concrete persona.""" role_id: str persona: Persona binding_hash: str def is_persona_eligible_for_role( persona: Persona, role: WorkflowRole, template: WorkflowTemplate, ) -> tuple[bool, str | None]: """Return (eligible, reason_if_not). Checks three conditions in order: 1. The persona has all capabilities required by the role. 2. The persona's allowed_roles (if set) includes this role. 3. The persona's max_risk_level covers the highest phase risk for this role. """ required = set(role.required_capabilities) have = set(persona.capabilities) if not required.issubset(have): missing = required - have return False, f"missing capabilities: {sorted(c.value for c in missing)}" if persona.allowed_roles is not None and role.id not in persona.allowed_roles: return False, f"role {role.id!r} not in persona.allowed_roles" max_phase_risk = max( (ph.risk for ph in template.phases if ph.role == role.id), default=RiskLevel.LOW, ) if _RISK_RANK[max_phase_risk] > _RISK_RANK[persona.max_risk_level]: return ( False, ( f"phase risk {max_phase_risk.value} > " f"persona max_risk_level {persona.max_risk_level.value}" ), ) return True, None def _auto_select(candidates: list[Persona], role: WorkflowRole) -> Persona: """Deterministic selection from eligible candidates. Priority (ascending sort key): 1. preferred_backends index (lower = more preferred; non-preferred → last) 2. version descending (higher = newer) 3. name ascending (alphabetical tiebreak) 4. compute_hash ascending (hash tiebreak for identical name+version) """ def _key(p: Persona) -> tuple[int, int, str, str]: try: pref_idx = role.preferred_backends.index(p.backend) except ValueError: pref_idx = len(role.preferred_backends) + 1 return (pref_idx, -p.version, p.name, p.compute_hash()) return sorted(candidates, key=_key)[0] class PersonaConsentStore: """Crash-safe + multi-process-safe JSON file store for per-persona consent decisions. Storage: {path} -> {"": {"decision": "approve|block|once", "decided_at": "..."}} Concurrency guarantees: * Writes are atomic via tmp-file + fsync + os.replace (POSIX rename is atomic). * Cross-process safety via advisory ``fcntl.flock`` on a lock-file at ``{path}.lock``. ``set()`` / ``revoke()`` hold an exclusive lock for the read-modify-write cycle; ``get()`` uses a shared lock for consistent reads. This prevents lost-update races between concurrent ``mydeepagent`` invocations on the same machine. """ def __init__(self, path: Path) -> None: self._path = path self._lock_path = path.with_suffix(path.suffix + ".lock") @contextmanager def _flock(self, exclusive: bool) -> Iterator[None]: """Acquire a POSIX advisory lock for the duration of the block.""" self._lock_path.parent.mkdir(parents=True, exist_ok=True) fd = os.open(self._lock_path, os.O_RDWR | os.O_CREAT, 0o600) try: fcntl.flock(fd, fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH) try: yield finally: fcntl.flock(fd, fcntl.LOCK_UN) finally: os.close(fd) def _load(self) -> dict[str, Any]: if not self._path.is_file(): return {} try: raw = self._path.read_text(encoding="utf-8") data: object = json.loads(raw) if raw.strip() else {} except (OSError, json.JSONDecodeError) as e: raise MyDeepAgentError.fatal( "internal_state_corruption", message=f"failed to read consent store at {self._path}: {e}", recovery_hint=( f"delete {self._path} and re-run; " "previously granted consents will be re-prompted" ), cause=e, ) from e if not isinstance(data, dict): raise MyDeepAgentError.fatal( "internal_state_corruption", message=f"consent store must be a JSON object: {self._path}", ) return data def _write(self, data: dict[str, Any]) -> None: """Atomic crash-safe write. Caller must already hold the exclusive flock.""" self._path.parent.mkdir(parents=True, exist_ok=True) tmp = self._path.with_suffix(self._path.suffix + ".tmp") payload = json.dumps(data, indent=2, sort_keys=True, ensure_ascii=False) fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) try: os.write(fd, payload.encode("utf-8")) os.fsync(fd) finally: os.close(fd) os.replace(tmp, self._path) def get(self, persona_hash: str) -> ConsentDecision | None: """Return stored decision or None if absent / unrecognised.""" with self._flock(exclusive=False): entry = self._load().get(persona_hash) if entry is None: return None decision = entry.get("decision") if isinstance(entry, dict) else None if decision not in ("approve", "block", "once"): return None return cast(ConsentDecision, decision) def set(self, persona_hash: str, decision: ConsentDecision) -> None: """Persist a consent decision. Exclusive lock + atomic write.""" with self._flock(exclusive=True): data = self._load() data[persona_hash] = { "decision": decision, "decided_at": datetime.now(UTC).isoformat(timespec="seconds"), } self._write(data) def revoke(self, persona_hash: str) -> None: """Remove a previously stored consent decision. Exclusive lock. No-op if absent.""" with self._flock(exclusive=True): data = self._load() data.pop(persona_hash, None) self._write(data) def filter_consented_personas( personas: list[Persona], consent_store: PersonaConsentStore, ) -> list[Persona]: """Remove personas whose consent decision is 'block'. 'approve', 'once', and absent (None) decisions all allow the persona through. """ return [p for p in personas if consent_store.get(p.compute_hash()) != "block"] def _parse_override_version(pinned_spec: str, version_str: str) -> int | None: """Parse the version component of an override spec. None if empty, raise otherwise.""" if not version_str: return None try: return int(version_str) except ValueError as e: raise MyDeepAgentError.human_required( "no_eligible_persona", message=(f"override spec '{pinned_spec}' has non-integer version '{version_str}'"), recovery_hint="use the format '@'", cause=e, ) from e def _resolve_override( role: WorkflowRole, template: WorkflowTemplate, pinned_spec: str, eligible: list[Persona], persona_pool: list[Persona], consent_store: PersonaConsentStore, ) -> Persona: """Resolve an override spec to a single eligible persona or raise human_required.""" name, _, version_str = pinned_spec.partition("@") version = _parse_override_version(pinned_spec, version_str) matches = [p for p in eligible if p.name == name and (version is None or p.version == version)] if matches: return matches[0] if len(matches) == 1 else _auto_select(matches, role) # Distinguish: blocked vs. ineligible vs. simply absent. pool_matches = [ p for p in persona_pool if p.name == name and (version is None or p.version == version) ] if any(consent_store.get(p.compute_hash()) == "block" for p in pool_matches): raise MyDeepAgentError.human_required( "persona_blocked_by_user", message=f"override persona '{pinned_spec}' is consent-blocked", recovery_hint="run `mydeepagent consents revoke ` to clear the block", ) if pool_matches: _, reason = is_persona_eligible_for_role(pool_matches[0], role, template) raise MyDeepAgentError.human_required( "no_eligible_persona", message=( f"override persona '{pinned_spec}' is ineligible for role '{role.id}': {reason}" ), ) raise MyDeepAgentError.human_required( "no_eligible_persona", message=f"no eligible persona matches override '{pinned_spec}' for role '{role.id}'", ) def _resolve_auto( role: WorkflowRole, template: WorkflowTemplate, eligible: list[Persona], persona_pool: list[Persona], consent_store: PersonaConsentStore, ) -> Persona: """Auto-select from eligible or raise human_required with diagnostic context.""" if eligible: return _auto_select(eligible, role) any_blocked = any( is_persona_eligible_for_role(p, role, template)[0] and consent_store.get(p.compute_hash()) == "block" for p in persona_pool ) if any_blocked: raise MyDeepAgentError.human_required( "persona_blocked_by_user", message=(f"all eligible personas for role '{role.id}' are blocked by user consent"), ) raise MyDeepAgentError.human_required( "no_eligible_persona", message=f"no eligible persona for role '{role.id}'", recovery_hint=( f"add a persona with capabilities " f"{sorted(c.value for c in role.required_capabilities)} " "to docs/schemas/personas/" ), ) def bind_personas( template: WorkflowTemplate, persona_pool: list[Persona], available_backends: BackendAvailability, consent_store: PersonaConsentStore, override: BindingOverride | None = None, ) -> dict[str, Binding]: """Bind each workflow role to a concrete persona. Resolution order per role: 1. Apply consent filter (remove 'block' personas). 2. Apply eligibility filter (capabilities, allowed_roles, risk level). 3. If override is set for this role, pick the pinned persona from eligible. 4. Otherwise, auto_select from eligible. 5. Validate backend availability. 6. Validate openrouter model non-empty. Raises: MyDeepAgentError (human_required, 'no_eligible_persona') — no match found. MyDeepAgentError (human_required, 'persona_blocked_by_user') — all candidates blocked. MyDeepAgentError (human_required, 'backend_unavailable') — backend not in environment. MyDeepAgentError (human_required, 'model_unavailable') — openrouter model is blank. """ _override = override or BindingOverride.parse(None) consented_pool = filter_consented_personas(persona_pool, consent_store) bindings: dict[str, Binding] = {} for role in template.roles: eligible: list[Persona] = [ p for p in consented_pool if is_persona_eligible_for_role(p, role, template)[0] ] if role.id in _override.persona_pinned: chosen = _resolve_override( role, template, _override.persona_pinned[role.id], eligible, persona_pool, consent_store, ) else: chosen = _resolve_auto(role, template, eligible, persona_pool, consent_store) # Backend availability check if not available_backends.is_available(chosen.backend): raise MyDeepAgentError.human_required( "backend_unavailable", message=( f"backend '{chosen.backend.value}' is not available " f"for persona '{chosen.name}@{chosen.version}'" ), recovery_hint=_backend_recovery_hint(chosen.backend), ) # Openrouter model non-empty check if chosen.backend == Backend.OPENROUTER and not chosen.model.strip(): raise MyDeepAgentError.human_required( "model_unavailable", message=( f"persona '{chosen.name}@{chosen.version}' " "has empty model for openrouter backend" ), recovery_hint=( "set `model:` field in the persona yaml " "(e.g. 'openrouter:deepseek/deepseek-chat')" ), ) binding_hash = sha256( { "role_id": role.id, "template_name": template.name, "template_version": template.version, "persona_hash": chosen.compute_hash(), "backend": chosen.backend.value, } ) bindings[role.id] = Binding(role_id=role.id, persona=chosen, binding_hash=binding_hash) return bindings def _backend_recovery_hint(backend: Backend) -> str: if backend == Backend.OPENROUTER: return "run `mydeepagent login openrouter` to register an API key" if backend in (Backend.ANTHROPIC, Backend.OPENAI, Backend.GOOGLE): return f"run `mydeepagent login {backend.value}` to register an API key" if backend == Backend.FAKE: return ( "the 'fake' backend is for tests only; " "add Backend.FAKE to the BackendAvailability set in your test harness" ) return f"enable backend '{backend.value}' in config and ensure prerequisites"