feat(my-deepagent): v0.2 PR #3 — FastAPI + SSE + minimal Web GUI (mydeepagent serve)

Closes the "GUI 미존재" gap from the user's first-session requirements
(REPL + workflow + GUI). v0.2 PR #1's Postgres migration made a second
concurrent writer safe; v0.2 PR #2a/#2b wired durable resume; this commit
ships the HTTP + browser surface that uses them.

No auth, no multi-tenant, single uvicorn worker — per DR-3 boundaries.
v0.3+ will add auth, multi-worker fanout, LISTEN/NOTIFY SSE upgrade.

Backend
- `src/my_deepagent/api/`:
  - `app.py` create_app() factory. lifespan stores db/config/personas/
    workflows on app.state. CORS allow_origin_regex http://localhost(:port)?.
    /static mount + /, /{page}.html for the HTML frontend.
  - `models.py` — pydantic v2 DTOs (extra="forbid") for every route. Auto
    OpenAPI/Swagger via FastAPI's response_model.
  - `deps.py` — get_db / get_config / get_personas / get_workflows.
  - `runner.py` — start_new_run / start_resume. Pre-allocates run_id via
    new `WorkflowEngine.run(pre_allocated_run_id=...)` so the route returns
    the id immediately while the engine runs in asyncio.create_task.
  - `sse.py` — 0.5 s poll over run_events.seq. Emits ServerSentEvent rows;
    sends `event: done` and HTTP-200-closes when run hits terminal.
  - `routes/{runs,personas,workflows,budget}.py`:
      GET  /api/runs              (list, ?limit + ?state)
      GET  /api/runs/{id}         (detail + phases + artifacts + events)
      POST /api/runs              (start; mock-able via runner.start_new_run)
      POST /api/runs/{id}/resume
      POST /api/runs/{id}/abort
      GET  /api/runs/{id}/events  (SSE; Last-Event-ID header + ?last_event_id)
      GET  /api/personas
      GET  /api/workflows
      GET  /api/budget

CLI
- `cli/serve.py` mydeepagent serve [--host 127.0.0.1] [--port 8000].
  Loud stderr warning if --host is not loopback (no auth = footgun).
  uvicorn.run(factory=True, workers=1).
- `cli/main.py` serve command registered.

Static frontend (vanilla HTML/JS/CSS, no build system)
- index.html — runs list + budget summary
- new.html — start-run form (workflow select, repo path, requirements,
  per-role persona override)
- run.html — run detail + live SSE event log + Resume/Abort buttons
- app.js — fetch + EventSource. XSS policy HARDCODED at file top:
  textContent only, innerHTML/insertAdjacentHTML/outerHTML forbidden.
- style.css — dark theme, single file.

Engine
- WorkflowEngine.run(... pre_allocated_run_id: UUID|None = None). None →
  uuid4() (existing behavior). Set → use that UUID. Backward compatible.

Tests
- tests/integration/test_api_read.py (5): list empty, get 404, personas
  seed count (12), workflows seed (>=3), budget empty.
- tests/integration/test_api_write.py (5): missing template 400, extra
  field 422, resume 404, abort 404, mock-runner happy path.
- tests/integration/test_api_sse.py (1): seed terminal run + 3 events,
  drain stream, assert types present + stream closes within 3 s.
- tests/integration/test_api_static.py (5): index/new/run HTML 200,
  app.js content-type + XSS-policy substring assertion, style.css
  content-type.
- All fixtures use httpx ASGITransport + app.router.lifespan_context
  (httpx does NOT auto-trigger FastAPI lifespan) + sqlite tmp_path.

Gates
- ruff check + ruff format --check + mypy --strict: PASS (120 source files)
- pytest non-E2E: 603 PASS (12.15 s) — +16 from new API tests
- pytest E2E real OpenRouter on Postgres: PASS 60.44 s (baseline 71–122 s
  range; well within DR-3 acceptance threshold ≤+20%)

Manual browser verification deferred to a follow-up (docker compose up,
mydeepagent serve, open http://localhost:8000).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
chungyeong
2026-05-16 22:25:15 +09:00
parent 501292a5cd
commit 0630142c34
27 changed files with 2369 additions and 21 deletions

View File

@@ -0,0 +1,9 @@
"""FastAPI HTTP surface for my-deepagent (v0.2 PR #3).
Single-user, localhost-only Web GUI backend. No auth, no multi-tenant.
See plan §1.6 + DR-3. Top-level entry: :func:`my_deepagent.api.app.create_app`.
"""
from .app import create_app
__all__ = ["create_app"]

View File

@@ -0,0 +1,113 @@
"""FastAPI application factory for v0.2 PR #3.
Single uvicorn worker. localhost-only by default (see cli/serve.py for the
`--host` warning). No auth. CORS restricted to `http://localhost:<any port>`.
"""
from __future__ import annotations
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from starlette.responses import FileResponse
from ..config import Config, load_config
from ..persistence.db import Database
from ..persona import load_personas_from_dir
from ..workflow import WorkflowTemplate, load_workflow_yaml
from .routes import budget as budget_routes
from .routes import personas as personas_routes
from .routes import runs as runs_routes
from .routes import workflows as workflows_routes
_DOCS_SCHEMAS = Path(__file__).resolve().parents[3] / "docs" / "schemas"
_STATIC_ROOT = Path(__file__).resolve().parents[3] / "static"
_LOG = logging.getLogger(__name__)
def _load_seed_workflows() -> list[tuple[Path, WorkflowTemplate]]:
"""Return (path, WorkflowTemplate) for every YAML in docs/schemas/workflows/.
Malformed YAMLs are logged and skipped — the API should still come up
cleanly even if one seed is broken.
"""
wf_dir = _DOCS_SCHEMAS / "workflows"
if not wf_dir.is_dir():
return []
out: list[tuple[Path, WorkflowTemplate]] = []
for p in sorted(wf_dir.glob("*.yaml")):
try:
tpl = load_workflow_yaml(p)
except Exception as e:
_LOG.warning("skipping malformed workflow yaml %s: %s", p, e)
continue
out.append((p, tpl))
return out
@asynccontextmanager
async def _lifespan(app: FastAPI) -> AsyncIterator[None]:
"""Initialize the shared Database, personas, workflows on startup; dispose on shutdown."""
config: Config = app.state.config or load_config()
db = Database(config.database_url)
# init_schema is a no-op against an already-migrated DB; cheap to call.
await db.init_schema()
app.state.config = config
app.state.db = db
app.state.personas = load_personas_from_dir(_DOCS_SCHEMAS / "personas")
app.state.workflows = _load_seed_workflows()
try:
yield
finally:
await db.dispose()
def create_app(config: Config | None = None) -> FastAPI:
"""Build the FastAPI app. `config` defaults to `load_config()`.
`mydeepagent serve` calls this and hands it to uvicorn. Tests can also
call this with a custom Config + use httpx ASGI transport.
"""
app = FastAPI(
title="my-deepagent",
version="0.2.0",
description="Single-user local Web GUI for the my-deepagent CLI.",
lifespan=_lifespan,
)
app.state.config = config
app.add_middleware(
CORSMiddleware,
allow_origin_regex=r"^http://localhost(:\d+)?$",
allow_credentials=False,
allow_methods=["GET", "POST", "DELETE", "OPTIONS"],
allow_headers=["*"],
)
# API routes
app.include_router(runs_routes.router, prefix="/api/runs", tags=["runs"])
app.include_router(personas_routes.router, prefix="/api/personas", tags=["personas"])
app.include_router(workflows_routes.router, prefix="/api/workflows", tags=["workflows"])
app.include_router(budget_routes.router, prefix="/api/budget", tags=["budget"])
# Static frontend (built later in D3). Optional — if static/ is missing, skip.
if _STATIC_ROOT.is_dir():
app.mount("/static", StaticFiles(directory=str(_STATIC_ROOT)), name="static")
@app.get("/", include_in_schema=False)
async def _root() -> FileResponse:
return FileResponse(str(_STATIC_ROOT / "index.html"))
@app.get("/{page}.html", include_in_schema=False)
async def _static_page(page: str) -> FileResponse:
# Only serve known pages — others 404 via FileResponse missing.
target = _STATIC_ROOT / f"{page}.html"
return FileResponse(str(target))
return app

View File

@@ -0,0 +1,46 @@
"""Shared FastAPI dependencies.
Pulls singletons stashed in `app.state` by the lifespan handler. Database is
created ONCE per uvicorn process; per-request creation would defeat
connection pooling.
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from fastapi import Request
from ..config import Config
from ..persistence.db import Database
if TYPE_CHECKING:
from ..persona import Persona
from ..workflow import WorkflowTemplate
_DOCS_SCHEMAS = Path(__file__).resolve().parents[3] / "docs" / "schemas"
def get_db(request: Request) -> Database:
"""Return the shared Database instance from app state."""
return request.app.state.db # type: ignore[no-any-return]
def get_config(request: Request) -> Config:
return request.app.state.config # type: ignore[no-any-return]
def get_personas(request: Request) -> list[Persona]:
return request.app.state.personas # type: ignore[no-any-return]
def get_workflows(request: Request) -> list[tuple[Path, WorkflowTemplate]]:
"""Return a list of (yaml_path, WorkflowTemplate) for all seed workflows."""
return request.app.state.workflows # type: ignore[no-any-return]
def seed_root() -> Path:
"""Filesystem root for `docs/schemas/` seed assets."""
return _DOCS_SCHEMAS

View File

@@ -0,0 +1,146 @@
"""pydantic v2 response models for the my-deepagent HTTP API.
These shapes are stable contracts the Web GUI depends on. Internal ORM models
in `my_deepagent.persistence.models` are NOT exposed directly; routes convert
ORM rows into these DTOs.
"""
from __future__ import annotations
from pydantic import BaseModel, ConfigDict, Field
class _Strict(BaseModel):
"""Base for API DTOs — extra=forbid to catch typos at deserialization time."""
model_config = ConfigDict(extra="forbid", frozen=True)
# ---------------------------------------------------------------------------
# /api/runs
# ---------------------------------------------------------------------------
class RunSummary(_Strict):
id: str
state: str
repo_path: str
base_branch: str
worktree_root: str
created_at: str
ended_at: str | None = None
final_report_path: str | None = None
class PhaseInfo(_Strict):
id: str
phase_key: str
seq: int
state: str
attempts: int
started_at: str | None = None
ended_at: str | None = None
class ArtifactInfo(_Strict):
id: str
phase_id: str
schema_id: str
path: str
valid: bool
created_at: str
class EventInfo(_Strict):
seq: int
phase_id: str | None = None
type: str
ts: str
payload: dict[str, object] | None = None
class RunDetail(_Strict):
run: RunSummary
phases: list[PhaseInfo]
artifacts: list[ArtifactInfo]
events: list[EventInfo]
# ---------------------------------------------------------------------------
# /api/runs POST body
# ---------------------------------------------------------------------------
class StartRunRequest(BaseModel):
"""User-submitted body for POST /api/runs (NOT frozen — input not response)."""
model_config = ConfigDict(extra="forbid")
template_path: str = Field(min_length=1)
repo_path: str = Field(min_length=1)
base_branch: str = "main"
requirements_md: str = ""
override: dict[str, str] | None = None
class StartRunResponse(_Strict):
run_id: str
state: str
message: str = "started"
# ---------------------------------------------------------------------------
# /api/personas
# ---------------------------------------------------------------------------
class PersonaSummary(_Strict):
name: str
version: int
description: str | None = None
model: str
capabilities: list[str]
max_risk_level: str
# ---------------------------------------------------------------------------
# /api/workflows
# ---------------------------------------------------------------------------
class WorkflowRoleSummary(_Strict):
id: str
required_capabilities: list[str]
class WorkflowPhaseSummary(_Strict):
key: str
title: str
risk: str
role: str
class WorkflowSummary(_Strict):
path: str # relative path under docs/schemas/workflows/
name: str
version: int
description: str | None = None
roles: list[WorkflowRoleSummary]
phases: list[WorkflowPhaseSummary]
# ---------------------------------------------------------------------------
# /api/budget
# ---------------------------------------------------------------------------
class BudgetScopeEntry(_Strict):
scope: str
spent_usd: float
cap_usd: float | None
warn_usd: float | None = None
class BudgetSummary(_Strict):
day: BudgetScopeEntry | None
runs: list[BudgetScopeEntry]
personas: list[BudgetScopeEntry]

View File

@@ -0,0 +1 @@
"""Per-resource FastAPI route modules. Mounted from `app.create_app`."""

View File

@@ -0,0 +1,64 @@
"""GET /api/budget — current budget ledger summary."""
from __future__ import annotations
from typing import Annotated
from fastapi import APIRouter, Depends
from sqlalchemy import select
from ...config import Config
from ...persistence.db import Database
from ...persistence.models import BudgetLedgerRow
from ..deps import get_config, get_db
from ..models import BudgetScopeEntry, BudgetSummary
router = APIRouter()
DbDep = Annotated[Database, Depends(get_db)]
ConfigDep = Annotated[Config, Depends(get_config)]
def _scope_kind(scope: str) -> str:
"""Classify a ledger scope into 'day', 'run', or 'persona' bucket."""
if scope.startswith("day:"):
return "day"
if scope.startswith("persona:"):
return "persona"
if scope.startswith("run:"):
return "run"
return "other"
@router.get("", response_model=BudgetSummary)
async def get_budget_summary(db: DbDep, config: ConfigDep) -> BudgetSummary:
async with db.session() as s:
rows = (await s.execute(select(BudgetLedgerRow))).scalars().all()
day: BudgetScopeEntry | None = None
runs: list[BudgetScopeEntry] = []
personas: list[BudgetScopeEntry] = []
for r in rows:
kind = _scope_kind(r.scope)
warn_usd: float | None
if kind == "day":
warn_usd = config.budget_daily_warn_usd
elif kind == "run":
warn_usd = config.budget_run_warn_usd
else:
warn_usd = None
entry = BudgetScopeEntry(
scope=r.scope,
spent_usd=float(r.spent_usd),
cap_usd=float(r.cap_usd) if r.cap_usd is not None else None,
warn_usd=warn_usd,
)
if kind == "day":
day = entry
elif kind == "run":
runs.append(entry)
elif kind == "persona":
personas.append(entry)
return BudgetSummary(day=day, runs=runs, personas=personas)

View File

@@ -0,0 +1,30 @@
"""GET /api/personas — list seed personas."""
from __future__ import annotations
from typing import Annotated
from fastapi import APIRouter, Depends
from ...persona import Persona
from ..deps import get_personas
from ..models import PersonaSummary
router = APIRouter()
PersonasDep = Annotated[list[Persona], Depends(get_personas)]
@router.get("", response_model=list[PersonaSummary])
async def list_personas(personas: PersonasDep) -> list[PersonaSummary]:
return [
PersonaSummary(
name=p.name,
version=p.version,
description=p.description,
model=p.model,
capabilities=[c.value for c in p.capabilities],
max_risk_level=p.max_risk_level.value,
)
for p in personas
]

View File

@@ -0,0 +1,294 @@
"""GET/POST /api/runs — list, detail, start, resume, abort, SSE event stream."""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy import desc, select
from sse_starlette.sse import EventSourceResponse
from ...config import Config
from ...errors import MyDeepAgentError
from ...persistence.db import Database
from ...persistence.models import ArtifactRow, RunEventRow, RunPhaseRow, RunRow
from ...persona import Persona
from ...run_event import RunEventType
from .. import runner
from ..deps import get_config, get_db, get_personas, seed_root
from ..models import (
ArtifactInfo,
EventInfo,
PhaseInfo,
RunDetail,
RunSummary,
StartRunRequest,
StartRunResponse,
)
from ..sse import run_events_stream
_LOG = logging.getLogger(__name__)
router = APIRouter()
DbDep = Annotated[Database, Depends(get_db)]
ConfigDep = Annotated[Config, Depends(get_config)]
PersonasDep = Annotated[list[Persona], Depends(get_personas)]
def _row_to_summary(row: RunRow) -> RunSummary:
return RunSummary(
id=row.id,
state=row.state,
repo_path=row.repo_path,
base_branch=row.base_branch,
worktree_root=row.worktree_root,
created_at=row.created_at,
ended_at=row.ended_at,
final_report_path=row.final_report_path,
)
@router.get("", response_model=list[RunSummary])
async def list_runs(
db: DbDep,
limit: int = Query(default=50, ge=1, le=200),
state: str | None = Query(default=None),
) -> list[RunSummary]:
"""List the most recent runs (default 50). Optional `?state=` filter."""
async with db.session() as s:
stmt = select(RunRow).order_by(desc(RunRow.created_at)).limit(limit)
if state is not None:
stmt = stmt.where(RunRow.state == state)
rows = (await s.execute(stmt)).scalars().all()
return [_row_to_summary(r) for r in rows]
@router.get("/{run_id}", response_model=RunDetail)
async def get_run(run_id: str, db: DbDep) -> RunDetail:
"""Return a single run + its phases, artifacts, recent events."""
async with db.session() as s:
run = await s.get(RunRow, run_id)
if run is None:
raise HTTPException(status_code=404, detail=f"run {run_id} not found")
phases = (
(
await s.execute(
select(RunPhaseRow)
.where(RunPhaseRow.run_id == run_id)
.order_by(RunPhaseRow.seq)
)
)
.scalars()
.all()
)
artifacts = (
(await s.execute(select(ArtifactRow).where(ArtifactRow.run_id == run_id)))
.scalars()
.all()
)
events = (
(
await s.execute(
select(RunEventRow)
.where(RunEventRow.run_id == run_id)
.order_by(RunEventRow.seq.desc())
.limit(100)
)
)
.scalars()
.all()
)
return RunDetail(
run=_row_to_summary(run),
phases=[
PhaseInfo(
id=p.id,
phase_key=p.phase_key,
seq=p.seq,
state=p.state,
attempts=p.attempts,
started_at=p.started_at,
ended_at=p.ended_at,
)
for p in phases
],
artifacts=[
ArtifactInfo(
id=a.id,
phase_id=a.phase_id,
schema_id=a.schema_id,
path=a.path,
valid=a.valid,
created_at=a.created_at,
)
for a in artifacts
],
events=[
EventInfo(
seq=e.seq,
phase_id=e.phase_id,
type=e.type,
ts=e.ts,
payload=e.payload if isinstance(e.payload, dict) else None,
)
for e in reversed(events) # oldest first for display
],
)
# ---------------------------------------------------------------------------
# POST /api/runs — start a new run in the background
# ---------------------------------------------------------------------------
@router.post("", response_model=StartRunResponse)
async def start_run(
body: StartRunRequest,
db: DbDep,
config: ConfigDep,
personas: PersonasDep,
) -> StartRunResponse:
"""Kick off a new run; returns the run_id once persistence is in motion."""
template_path = Path(body.template_path)
if not template_path.is_absolute():
template_path = seed_root() / "workflows" / template_path.name
if not template_path.is_file():
raise HTTPException(
status_code=400, detail=f"workflow template not found: {body.template_path}"
)
try:
run_id = await runner.start_new_run(
db=db,
config=config,
personas=personas,
seed_root=seed_root(),
template_path=template_path,
repo_path=Path(body.repo_path),
base_branch=body.base_branch,
requirements_md=body.requirements_md,
override=body.override,
)
except MyDeepAgentError as e:
raise HTTPException(status_code=400, detail=f"{e.code}: {e}") from e
return StartRunResponse(run_id=str(run_id), state="executing", message="started")
# ---------------------------------------------------------------------------
# POST /api/runs/{run_id}/resume
# ---------------------------------------------------------------------------
@router.post("/{run_id}/resume", response_model=StartRunResponse)
async def resume_run(
run_id: str,
db: DbDep,
config: ConfigDep,
personas: PersonasDep,
) -> StartRunResponse:
"""Re-enter `engine.resume(run_id)` in the background."""
async with db.session() as s:
run = await s.get(RunRow, run_id)
if run is None:
raise HTTPException(status_code=404, detail=f"run {run_id} not found")
if run.state in ("completed", "failed", "aborted"):
raise HTTPException(
status_code=409,
detail=f"run {run_id} is already terminal ({run.state})",
)
try:
await runner.start_resume(
db=db,
config=config,
personas=personas,
seed_root=seed_root(),
run_id=UUID(run_id),
)
except MyDeepAgentError as e:
raise HTTPException(status_code=400, detail=f"{e.code}: {e}") from e
return StartRunResponse(run_id=run_id, state="executing", message="resuming")
# ---------------------------------------------------------------------------
# POST /api/runs/{run_id}/abort
# ---------------------------------------------------------------------------
@router.post("/{run_id}/abort", response_model=StartRunResponse)
async def abort_run(run_id: str, db: DbDep) -> StartRunResponse:
"""Force-mark a non-terminal run as aborted + emit a RUN_ABORTED event.
Does not actually cancel the background task (asyncio cancellation across
arbitrary engine code is unsafe). The next phase boundary picks up the
state change.
"""
async with db.session() as s:
run = await s.get(RunRow, run_id)
if run is None:
raise HTTPException(status_code=404, detail=f"run {run_id} not found")
if run.state in ("completed", "failed", "aborted"):
raise HTTPException(
status_code=409,
detail=f"run {run_id} is already terminal ({run.state})",
)
run.state = "aborted"
# Append a synthesized run.aborted event for the SSE stream.
next_seq = (
await s.execute(
select(RunEventRow.seq)
.where(RunEventRow.run_id == run_id)
.order_by(RunEventRow.seq.desc())
.limit(1)
)
).scalar_one_or_none() or 0
s.add(
RunEventRow(
run_id=run_id,
phase_id=None,
seq=int(next_seq) + 1,
type=RunEventType.RUN_ABORTED.value,
payload={"reason": "user_abort_via_api"},
idempotency_key=f"run.aborted:{run_id}:user_api",
ts=run.updated_at,
)
)
await s.commit()
return StartRunResponse(run_id=run_id, state="aborted", message="aborted")
# ---------------------------------------------------------------------------
# GET /api/runs/{run_id}/events — Server-Sent Events stream
# ---------------------------------------------------------------------------
@router.get("/{run_id}/events")
async def stream_events(
run_id: str,
request: Request,
db: DbDep,
last_event_id: int = Query(default=0, alias="last_event_id", ge=0),
) -> EventSourceResponse:
"""SSE stream of run_events. Closes when the run reaches a terminal state.
Honors `Last-Event-ID` HTTP header (standard EventSource reconnect) AND
the `?last_event_id=` query param as a fallback for clients that can't
set headers (vanilla `<a href>` opens).
"""
# Standard `Last-Event-ID` header takes priority over the query param.
header_val = request.headers.get("last-event-id")
if header_val:
try:
last_event_id = max(last_event_id, int(header_val))
except ValueError:
pass
async with db.session() as s:
run = await s.get(RunRow, run_id)
if run is None:
raise HTTPException(status_code=404, detail=f"run {run_id} not found")
return EventSourceResponse(run_events_stream(db, run_id, last_event_id=last_event_id))

View File

@@ -0,0 +1,52 @@
"""GET /api/workflows — list seed workflow templates."""
from __future__ import annotations
from pathlib import Path
from typing import Annotated
from fastapi import APIRouter, Depends
from ...workflow import WorkflowTemplate
from ..deps import get_workflows, seed_root
from ..models import WorkflowPhaseSummary, WorkflowRoleSummary, WorkflowSummary
router = APIRouter()
WorkflowsDep = Annotated[list[tuple[Path, WorkflowTemplate]], Depends(get_workflows)]
@router.get("", response_model=list[WorkflowSummary])
async def list_workflows(workflows: WorkflowsDep) -> list[WorkflowSummary]:
base = seed_root() / "workflows"
out: list[WorkflowSummary] = []
for path, tpl in workflows:
try:
rel = path.relative_to(base.parent)
except ValueError:
rel = path
out.append(
WorkflowSummary(
path=str(rel),
name=tpl.name,
version=tpl.version,
description=tpl.description,
roles=[
WorkflowRoleSummary(
id=r.id,
required_capabilities=[c.value for c in r.required_capabilities],
)
for r in tpl.roles
],
phases=[
WorkflowPhaseSummary(
key=ph.key,
title=ph.title,
risk=ph.risk.value,
role=ph.role,
)
for ph in tpl.phases
],
)
)
return out

View File

@@ -0,0 +1,140 @@
"""Background task runner for engine.run / engine.resume invocations.
v0.2 PR #3 scope: single uvicorn worker. The in-memory `_tasks` dict only makes
sense within one process; multi-worker fanout (Redis / NOTIFY) is v0.3 work.
Lifecycle:
- POST /api/runs pre-allocates a UUID, schedules `engine.run(pre_allocated_run_id=...)`
as `asyncio.create_task`, returns the UUID immediately. The task continues
until completion / abort / process shutdown.
- POST /api/runs/{id}/resume schedules `engine.resume(run_id)` the same way.
- If the uvicorn process dies mid-run, the next startup's `sweep_orphan_runs`
marks the non-terminal run as failed.
The `_tasks` dict's only purpose is to prevent garbage collection of in-flight
tasks (asyncio.create_task returns a reference but otherwise nothing pins it).
"""
from __future__ import annotations
import asyncio
import logging
from pathlib import Path
from typing import Any
from uuid import UUID, uuid4
from ..artifact_schema import ArtifactSchemaRegistry
from ..binding import BackendAvailability, BindingOverride, PersonaConsentStore
from ..budget import make_budget_tracker_from_config
from ..config import Config
from ..engine import WorkflowEngine
from ..enums import ApprovalDecisionAction, Backend
from ..persistence.db import Database
from ..persona import Persona
from ..workflow import load_workflow_yaml
_LOG = logging.getLogger(__name__)
# In-process registry of background run tasks, keyed by run_id.
_tasks: dict[UUID, asyncio.Task[Any]] = {}
async def _auto_approve(_payload: dict[str, object], _gates: list[str]) -> object:
"""GUI runs have no interactive prompt — auto-approve every gate.
Future: a /api/approvals route + websocket round-trip can replace this.
"""
return ApprovalDecisionAction.APPROVE
async def _build_engine(
db: Database,
config: Config,
personas: list[Persona],
seed_root: Path,
) -> WorkflowEngine:
registry = ArtifactSchemaRegistry(roots=[seed_root / "artifacts"])
consent = PersonaConsentStore(config.data_dir / "consents.json")
backends = BackendAvailability(available_backends=frozenset(Backend))
budget = make_budget_tracker_from_config(db, config)
await budget.init()
return WorkflowEngine(
db=db,
config=config,
persona_pool=personas,
artifact_registry=registry,
consent_store=consent,
available_backends=backends,
approval_callback=_auto_approve,
budget_tracker=budget,
)
async def start_new_run(
db: Database,
config: Config,
personas: list[Persona],
seed_root: Path,
template_path: Path,
repo_path: Path,
base_branch: str,
requirements_md: str,
override: dict[str, str] | None,
) -> UUID:
"""Schedule a new engine.run as a background task and return the run_id.
The run_id is pre-allocated here (uuid4) and passed to `engine.run` via
`pre_allocated_run_id`, so the route can return it before the phase loop
completes.
"""
template = load_workflow_yaml(template_path)
engine = await _build_engine(db, config, personas, seed_root)
run_id = uuid4()
async def _wrapped() -> None:
try:
await engine.run(
template,
repo_path=repo_path,
base_branch=base_branch,
requirements_md=requirements_md,
override=BindingOverride.parse(override) if override else None,
pre_allocated_run_id=run_id,
)
except Exception:
_LOG.exception("background run %s failed", run_id)
raise
finally:
_tasks.pop(run_id, None)
_tasks[run_id] = asyncio.create_task(_wrapped())
return run_id
async def start_resume(
db: Database,
config: Config,
personas: list[Persona],
seed_root: Path,
run_id: UUID,
) -> UUID:
"""Schedule engine.resume(run_id) as a background task."""
engine = await _build_engine(db, config, personas, seed_root)
async def _wrapped() -> None:
try:
await engine.resume(run_id)
except Exception:
_LOG.exception("background resume failed for run %s", run_id)
raise
finally:
_tasks.pop(run_id, None)
_tasks[run_id] = asyncio.create_task(_wrapped())
return run_id
def is_running(run_id: UUID) -> bool:
"""True if a background task for run_id is still in-flight."""
task = _tasks.get(run_id)
return task is not None and not task.done()

View File

@@ -0,0 +1,94 @@
"""SSE stream builder for `/api/runs/{id}/events`.
v0.2 PR #3 scope: poll-based stream against the `run_events` table.
v0.3 will upgrade to Postgres `LISTEN/NOTIFY` (ADR pending).
Pattern:
1. Client opens EventSource. Optional `?last_event_id=<int>` query param for
resume after a disconnect.
2. Backfill: SELECT events WHERE run_id=? AND seq > last_event_id ORDER BY seq.
3. Live tail: every `_POLL_INTERVAL_S` seconds, SELECT new events since the
last seen seq. Emit each as `data: <json>\\n\\n` with `id: <seq>`.
4. When the run reaches a terminal state (completed/failed/aborted), emit one
final `event: done` and close. HTTP 200 → EventSource will NOT reconnect.
Each event payload:
{
"seq": int,
"type": "run.started" | "phase.completed" | ... ,
"ts": "ISO",
"phase_id": str | null,
"payload": dict | null
}
"""
from __future__ import annotations
import asyncio
import json
from collections.abc import AsyncIterator
from sqlalchemy import select
from sse_starlette.event import ServerSentEvent
from ..persistence.db import Database
from ..persistence.models import RunEventRow, RunRow
_POLL_INTERVAL_S: float = 0.5
_TERMINAL_STATES: frozenset[str] = frozenset({"completed", "failed", "aborted"})
async def run_events_stream(
db: Database, run_id: str, last_event_id: int = 0
) -> AsyncIterator[ServerSentEvent]:
"""Yield ServerSentEvent objects for a single run until it reaches terminal state."""
last_seen = last_event_id
while True:
# Pull new events since last_seen.
async with db.session() as s:
rows = (
(
await s.execute(
select(RunEventRow)
.where(RunEventRow.run_id == run_id)
.where(RunEventRow.seq > last_seen)
.order_by(RunEventRow.seq)
)
)
.scalars()
.all()
)
for row in rows:
evt_data = {
"seq": row.seq,
"type": row.type,
"ts": row.ts,
"phase_id": row.phase_id,
"payload": row.payload if isinstance(row.payload, dict) else None,
}
yield ServerSentEvent(
data=json.dumps(evt_data, ensure_ascii=False),
event="event",
id=str(row.seq),
)
last_seen = row.seq
# Check whether the run is terminal — break only after draining
# the last batch of events.
async with db.session() as s:
run = await s.get(RunRow, run_id)
if run is None:
# The run was deleted out from under us; close gracefully.
yield ServerSentEvent(data="run-not-found", event="error")
break
if run.state in _TERMINAL_STATES:
yield ServerSentEvent(
data=json.dumps({"state": run.state}),
event="done",
id=str(last_seen),
)
break
await asyncio.sleep(_POLL_INTERVAL_S)

View File

@@ -123,6 +123,17 @@ def pricing() -> None:
pricing_command()
@app.command()
def serve(
host: str = typer.Option("127.0.0.1", help="Bind host (use 0.0.0.0 at your own risk)"),
port: int = typer.Option(8000, help="Bind port"),
) -> None:
"""Launch the FastAPI Web GUI backend on http://<host>:<port>."""
from .serve import serve_command
serve_command(host, port)
@app.callback(invoke_without_command=True)
def main(
ctx: typer.Context,

View File

@@ -0,0 +1,41 @@
"""`mydeepagent serve` — launch the FastAPI Web GUI backend via uvicorn.
v0.2 PR #3 scope: localhost only, single uvicorn worker. The `--host`
override is honored but `--host 127.0.0.1` is enforced as the default and a
loud stderr warning is printed for any non-loopback value (the API has no
auth — exposing it on a network interface is a footgun).
"""
from __future__ import annotations
import sys
import typer
import uvicorn
_LOOPBACK_HOSTS: frozenset[str] = frozenset({"127.0.0.1", "localhost", "::1"})
def serve_command(host: str, port: int) -> None:
if host not in _LOOPBACK_HOSTS:
# CLAUDE.md §10 — surface the safety issue loudly rather than silently bind.
msg = (
f"\n⚠️ mydeepagent serve is binding to {host!r} (not loopback).\n"
" The API has no authentication. Anyone with network access to this\n"
" port can read every run, start new runs, and read OpenRouter costs.\n"
" Use a reverse proxy with auth, or stick to 127.0.0.1 / localhost.\n"
)
typer.secho(msg, fg=typer.colors.YELLOW, err=True)
sys.stderr.flush()
# Use the factory form so uvicorn calls `create_app()` itself; this keeps
# config load + lifespan setup inside the uvicorn worker rather than the
# CLI process.
uvicorn.run(
"my_deepagent.api.app:create_app",
host=host,
port=port,
workers=1, # v0.2 single-worker assumption per plan.md §1.7 / DR-3
factory=True,
log_level="info",
)

View File

@@ -167,13 +167,17 @@ class WorkflowEngine:
base_branch: str = "main",
requirements_md: str = "",
override: BindingOverride | None = None,
pre_allocated_run_id: UUID | None = None,
) -> RunResult:
"""Start a brand-new run. Allocates a new `run_id`, binds personas, persists
skeleton metadata, and dispatches to the shared `_execute_run` phase loop.
For resuming an existing non-terminal run, use :meth:`resume` instead.
`pre_allocated_run_id` lets the FastAPI runner pick the UUID up-front
so the route can return it before the phase loop completes.
"""
run_id = uuid4()
run_id = pre_allocated_run_id if pre_allocated_run_id is not None else uuid4()
worktree_root = self._config.workspace_root / str(run_id)
worktree_root.mkdir(parents=True, exist_ok=True)
artifacts_dir = worktree_root / "artifacts"