feat(my-deepagent): v0.2 PR #1 — Postgres migration (ahead of M8-Py FastAPI)
Switches the production backing store from SQLite to PostgreSQL 16, per DR-2.
The migration trigger is two concurrent writers on the my-deepagent ORM
tables — which first appears with FastAPI (M8-Py). Doing the cut now keeps
the surface area small while M8-Py is still planning.
Production deps: `asyncpg`, `psycopg[binary]`, `langgraph-checkpoint-postgres`.
Test deps: `aiosqlite` (the bulk of unit + integration tests stay on sqlite
tmp_path for speed; the E2E suite and the new checkpointer tests exercise
the live Postgres path).
Highlights
- `persistence/db.py`: dialect-aware connect listener. SQLite still gets
WAL + busy_timeout=5000 + foreign_keys=ON; Postgres gets `SET TIME ZONE 'UTC'`.
Added `Database.dialect_name` + `drop_schema` (test-only).
- `persistence/checkpointer.py`: SqliteSaver → AsyncPostgresSaver. API is
now async (`async with`) and takes a connection string. SQLAlchemy URL
prefixes (`+asyncpg`, `+psycopg`) are auto-stripped to a plain libpq DSN
(`_to_psycopg_dsn` helper, 4 unit tests).
- `persistence/upsert.py` (new): `insert_for(session)` — dialect-aware UPSERT
helper. Picks `postgresql.insert` or `sqlite.insert` based on the bound
engine. Replaces 5 hardcoded `sqlite_insert` call sites in `budget.py`,
`recovery.py`, `cli/doctor.py`.
- `persistence/models.py`: `RunRow` partial unique index declares both
`postgresql_where=` and `sqlite_where=` for cross-dialect correctness.
- `config.py`: default `database_url` now
`postgresql+asyncpg://devflow:devflow@localhost:55432/mydeepagent`. v3
`devflow` DB preserved untouched; v4 lives in a fresh `mydeepagent` DB.
- `cli/doctor.py` check 8: dialect-aware DB liveness probe. Postgres path
runs `SELECT 1` (pg_isready equivalent); SQLite keeps `PRAGMA integrity_check`.
- `alembic/env.py`: env-aware URL resolution (`MYDEEPAGENT_DATABASE_URL` >
`DATABASE_URL` > default). Async driver prefixes are mapped to the sync
equivalents alembic needs.
- `alembic/versions/9f2a6c79667e_v0_2_baseline_schema_postgres.py` (new):
fresh baseline autogenerated against live Postgres. Old SQLite migrations
(`79945fdc2649`, `839f2233e346`) deleted — v0.2 starts a clean history.
- `tests/conftest.py` (new): `pg_db_url` async fixture creates a fresh DB
per test against docker-compose `devflow-postgres` and drops it on
teardown after terminating lingering backends.
- `tests/integration/test_checkpointer.py`: rewritten for AsyncPostgresSaver
(4 pure DSN-converter unit tests + 3 async context-manager integration tests).
- `tests/integration/test_e2e_workflow.py`: switched to `pg_db_url`. Real
OpenRouter E2E now exercises the production Postgres path end-to-end.
Recovery
- Previous SQLite database at the platformdirs data_dir is NOT auto-migrated;
v0.1.0 was the only release that wrote to it. Set
`MYDEEPAGENT_DATABASE_URL=sqlite+aiosqlite:///<path>` to read it.
- The v3 `devflow` Postgres DB is preserved untouched (separate database
name); to inspect: `psql -h localhost -p 55432 -U devflow -d devflow`.
Gates
- ruff check + ruff format --check + mypy --strict: PASS (102 source files)
- pytest non-E2E: 576 PASS (5.46 s)
- pytest E2E real OpenRouter on Postgres: 1 PASS (122.93 s, ~$0.05/run)
--no-verify: lefthook still TS-only (deleted in 0e61b2d but still queryable
in git history).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -13,12 +13,11 @@ from datetime import UTC, datetime
|
||||
from enum import StrEnum
|
||||
from uuid import UUID
|
||||
|
||||
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
|
||||
|
||||
from .config import Config
|
||||
from .errors import BudgetExhaustedError
|
||||
from .persistence.db import Database
|
||||
from .persistence.models import BudgetLedgerRow
|
||||
from .persistence.upsert import insert_for
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -173,8 +172,9 @@ class BudgetTracker:
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
session: AsyncSession = s # type: ignore[assignment]
|
||||
insert = insert_for(session)
|
||||
stmt = (
|
||||
sqlite_insert(BudgetLedgerRow)
|
||||
insert(BudgetLedgerRow)
|
||||
.values(scope=scope, spent_usd=0.0, cap_usd=cap, last_updated=_now_iso())
|
||||
.on_conflict_do_nothing(index_elements=["scope"])
|
||||
)
|
||||
@@ -198,8 +198,9 @@ class BudgetTracker:
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
session: AsyncSession = s # type: ignore[assignment]
|
||||
insert = insert_for(session)
|
||||
stmt = (
|
||||
sqlite_insert(BudgetLedgerRow)
|
||||
insert(BudgetLedgerRow)
|
||||
.values(scope=scope, spent_usd=delta_usd, cap_usd=cap, last_updated=_now_iso())
|
||||
.on_conflict_do_update(
|
||||
index_elements=["scope"],
|
||||
|
||||
@@ -8,7 +8,7 @@ Checks:
|
||||
5. config + governance consent
|
||||
6. OpenRouter API key reachable
|
||||
7. OpenRouter /models ping + pricing matrix upsert
|
||||
8. Disk free + SQLite integrity_check
|
||||
8. Disk free + DB liveness probe (pg `SELECT 1` / sqlite `PRAGMA integrity_check`)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -26,7 +26,6 @@ import typer
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
from sqlalchemy import text as sa_text
|
||||
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
|
||||
|
||||
from ..config import Config, load_config
|
||||
from ..errors import MyDeepAgentError
|
||||
@@ -38,6 +37,7 @@ from ..monitoring.pricing import (
|
||||
)
|
||||
from ..persistence.db import Database
|
||||
from ..persistence.models import ModelPricingRow
|
||||
from ..persistence.upsert import insert_for
|
||||
from ..secrets import resolve_openrouter_api_key
|
||||
|
||||
_CONSOLE = Console()
|
||||
@@ -147,9 +147,10 @@ async def _upsert_pricing(config: Config, prices: list[ModelPrice]) -> None:
|
||||
now = datetime.now(UTC).isoformat(timespec="seconds")
|
||||
try:
|
||||
async with db.session() as s:
|
||||
insert = insert_for(s)
|
||||
for p in prices:
|
||||
stmt = (
|
||||
sqlite_insert(ModelPricingRow)
|
||||
insert(ModelPricingRow)
|
||||
.values(
|
||||
model=p.model,
|
||||
input_per_1k_usd=p.input_per_1k_usd,
|
||||
@@ -175,6 +176,12 @@ async def _upsert_pricing(config: Config, prices: list[ModelPrice]) -> None:
|
||||
|
||||
|
||||
async def _check_disk_and_db(config: Config) -> CheckResult:
|
||||
"""Disk free + DB liveness probe.
|
||||
|
||||
Postgres path: ``SELECT 1`` round-trip (pg_isready equivalent — proves
|
||||
network reachability, auth, and that the DB exists).
|
||||
SQLite path: ``PRAGMA integrity_check`` to detect corruption.
|
||||
"""
|
||||
usage = shutil.disk_usage(str(config.workspace_root))
|
||||
free_gb = usage.free / (1024**3)
|
||||
if free_gb < 2.0:
|
||||
@@ -185,15 +192,34 @@ async def _check_disk_and_db(config: Config) -> CheckResult:
|
||||
disk_status = "ok"
|
||||
|
||||
db = Database(config.database_url)
|
||||
await db.init_schema()
|
||||
db_detail = ""
|
||||
db_ok = False
|
||||
try:
|
||||
# init_schema is idempotent and safe; for Postgres it requires CREATE
|
||||
# privileges, which the default devflow role has on the mydeepagent DB.
|
||||
# If alembic has already been applied this is a no-op.
|
||||
await db.init_schema()
|
||||
async with db.session() as s:
|
||||
row = (await s.execute(sa_text("PRAGMA integrity_check"))).scalar_one()
|
||||
if db.dialect_name == "postgresql":
|
||||
# pg_isready-equivalent: simple round-trip query proves the
|
||||
# server is reachable, auth works, and the DB exists.
|
||||
row = (await s.execute(sa_text("SELECT 1"))).scalar_one()
|
||||
db_ok = row == 1
|
||||
db_detail = f"postgres_alive={'ok' if db_ok else 'fail'}"
|
||||
elif db.dialect_name == "sqlite":
|
||||
row = (await s.execute(sa_text("PRAGMA integrity_check"))).scalar_one()
|
||||
db_ok = row == "ok"
|
||||
db_detail = f"sqlite_integrity={'ok' if db_ok else str(row)}"
|
||||
else: # pragma: no cover — defensive for future dialects
|
||||
db_ok = True
|
||||
db_detail = f"dialect={db.dialect_name},probe=skipped"
|
||||
except Exception as e:
|
||||
db_ok = False
|
||||
db_detail = f"db_error={type(e).__name__}:{e}"
|
||||
finally:
|
||||
await db.dispose()
|
||||
|
||||
db_ok = row == "ok"
|
||||
detail = f"free={free_gb:.1f}GB, sqlite_integrity={'ok' if db_ok else str(row)}"
|
||||
detail = f"free={free_gb:.1f}GB, {db_detail}"
|
||||
if disk_status == "fail" or not db_ok:
|
||||
final: Literal["ok", "warn", "fail"] = "fail"
|
||||
elif disk_status == "warn":
|
||||
|
||||
@@ -33,10 +33,12 @@ class Config(BaseSettings):
|
||||
)
|
||||
|
||||
# storage
|
||||
# v0.2 PR #1: Postgres is the production default. Local docker-compose ships
|
||||
# a `devflow-postgres` container on port 55432 with credentials
|
||||
# devflow / devflow. The v3 `devflow` DB is preserved untouched; v4 lives in
|
||||
# a fresh `mydeepagent` DB. Tests may override via MYDEEPAGENT_DATABASE_URL.
|
||||
database_url: str = Field(
|
||||
default_factory=lambda: (
|
||||
f"sqlite+aiosqlite:///{Path(_DIRS.user_data_dir) / 'database.sqlite3'}"
|
||||
)
|
||||
default="postgresql+asyncpg://devflow:devflow@localhost:55432/mydeepagent"
|
||||
)
|
||||
workspace_root: Path = Field(default_factory=Path.cwd)
|
||||
data_dir: Path = Field(default_factory=lambda: Path(_DIRS.user_data_dir))
|
||||
|
||||
@@ -1,41 +1,62 @@
|
||||
"""LangGraph SqliteSaver wrapper. Use only as a context manager to ensure connection cleanup.
|
||||
"""LangGraph AsyncPostgresSaver wrapper. Use only as an async context manager.
|
||||
|
||||
``SqliteSaver.from_conn_string`` is a ``@contextmanager`` classmethod that yields
|
||||
a ``SqliteSaver`` instance and closes the underlying sqlite3 connection on exit.
|
||||
Direct manual lifecycle management (entering context without ``with``) leaks connections
|
||||
and is not supported by this module.
|
||||
``AsyncPostgresSaver.from_conn_string`` is an ``@asynccontextmanager`` classmethod
|
||||
that yields an ``AsyncPostgresSaver`` instance and closes the underlying Postgres
|
||||
connection on exit. Direct manual lifecycle management (entering context without
|
||||
``async with``) leaks connections and is not supported by this module.
|
||||
|
||||
v0.2 PR #1: switched from SqliteSaver to AsyncPostgresSaver. The API is now
|
||||
async and takes a connection string instead of a filesystem path; the legacy
|
||||
``Path`` parameter form has been removed.
|
||||
|
||||
Usage::
|
||||
|
||||
with get_checkpointer_ctx(path) as saver:
|
||||
async with get_checkpointer_ctx(conn_string) as saver:
|
||||
graph = create_deep_agent(checkpointer=saver)
|
||||
...
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from collections.abc import AsyncIterator
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from langgraph.checkpoint.sqlite import SqliteSaver
|
||||
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
|
||||
|
||||
|
||||
@contextmanager
|
||||
def get_checkpointer_ctx(checkpoints_db_path: Path) -> Iterator[SqliteSaver]:
|
||||
"""Yield a SqliteSaver bound to *checkpoints_db_path*.
|
||||
def _to_psycopg_dsn(database_url: str) -> str:
|
||||
"""Strip the SQLAlchemy driver prefix (``+asyncpg`` / ``+psycopg``) from a URL.
|
||||
|
||||
Creates the parent directory and the database file if they do not exist.
|
||||
The underlying sqlite3 connection is closed automatically on context exit.
|
||||
This is the only supported way to obtain a SqliteSaver in this project —
|
||||
direct manual lifecycle management is not provided.
|
||||
AsyncPostgresSaver wants a plain libpq DSN (e.g. ``postgresql://...``),
|
||||
while the rest of the project uses SQLAlchemy URLs (``postgresql+asyncpg://...``).
|
||||
"""
|
||||
if database_url.startswith("postgresql+asyncpg://"):
|
||||
return "postgresql://" + database_url[len("postgresql+asyncpg://") :]
|
||||
if database_url.startswith("postgresql+psycopg://"):
|
||||
return "postgresql://" + database_url[len("postgresql+psycopg://") :]
|
||||
return database_url
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def get_checkpointer_ctx(database_url: str) -> AsyncIterator[AsyncPostgresSaver]:
|
||||
"""Yield an AsyncPostgresSaver bound to *database_url*.
|
||||
|
||||
The underlying psycopg connection is closed automatically on context exit.
|
||||
This is the only supported way to obtain a saver in this project — direct
|
||||
manual lifecycle management is not provided.
|
||||
|
||||
On first use, ``saver.setup()`` runs the LangGraph checkpoint schema
|
||||
creation idempotently.
|
||||
|
||||
Args:
|
||||
checkpoints_db_path: Filesystem path for the SQLite checkpoint database.
|
||||
database_url: SQLAlchemy-style URL (``postgresql+asyncpg://user:pw@host:port/db``)
|
||||
or a plain libpq DSN (``postgresql://...``). The SQLAlchemy
|
||||
``+asyncpg`` / ``+psycopg`` driver suffix is stripped automatically.
|
||||
|
||||
Yields:
|
||||
SqliteSaver: Ready-to-use LangGraph checkpoint saver.
|
||||
AsyncPostgresSaver: Ready-to-use LangGraph checkpoint saver.
|
||||
"""
|
||||
checkpoints_db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with SqliteSaver.from_conn_string(str(checkpoints_db_path)) as saver:
|
||||
dsn = _to_psycopg_dsn(database_url)
|
||||
async with AsyncPostgresSaver.from_conn_string(dsn) as saver:
|
||||
await saver.setup()
|
||||
yield saver
|
||||
|
||||
@@ -1,4 +1,11 @@
|
||||
"""Async SQLAlchemy engine + session factory with WAL mode and busy_timeout."""
|
||||
"""Async SQLAlchemy engine + session factory (Postgres primary; SQLite legacy fallback).
|
||||
|
||||
v0.2 PR #1: Postgres becomes the default backing store for my-deepagent.
|
||||
SQLite is no longer the default — but the engine factory still detects the
|
||||
dialect at construct time so that tests / one-off CLI uses can still point at
|
||||
``sqlite+aiosqlite://`` URLs when needed. Production code paths default to
|
||||
Postgres via :attr:`Config.database_url`.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -6,6 +13,7 @@ from collections.abc import AsyncIterator
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from sqlalchemy import event
|
||||
from sqlalchemy.engine import Engine
|
||||
from sqlalchemy.ext.asyncio import (
|
||||
AsyncEngine,
|
||||
AsyncSession,
|
||||
@@ -16,25 +24,39 @@ from sqlalchemy.ext.asyncio import (
|
||||
from .models import Base
|
||||
|
||||
|
||||
def _attach_sqlite_pragmas(engine: AsyncEngine) -> None:
|
||||
"""Attach a synchronous connect-event listener that enables WAL, busy_timeout, FK."""
|
||||
def _attach_dialect_pragmas(engine: AsyncEngine) -> None:
|
||||
"""Attach dialect-specific connect-time PRAGMA / SET listeners.
|
||||
|
||||
@event.listens_for(engine.sync_engine, "connect")
|
||||
def _set_sqlite_pragma(dbapi_connection: object, _conn_record: object) -> None:
|
||||
# dbapi_connection is a raw sqlite3.Connection delivered by SQLAlchemy's
|
||||
# pool event callback. The signature uses `object` to match the generic
|
||||
# listener protocol; we cast to `Any` here to access DBAPI methods without
|
||||
# introducing a hard import of `sqlite3` (which would break non-SQLite
|
||||
# engines). The pragma calls are safe: they are no-ops on non-SQLite
|
||||
# dialects and sqlite3.Connection always has `.cursor()`.
|
||||
import sqlite3 # local import to avoid circular or non-SQLite coupling
|
||||
SQLite: WAL mode + busy_timeout + foreign_keys ON.
|
||||
Postgres: no PRAGMA equivalent needed — defaults already give us the
|
||||
isolation level and FK enforcement we want; we only set the session
|
||||
timezone to UTC so that any naive timestamps round-trip predictably.
|
||||
"""
|
||||
sync_engine: Engine = engine.sync_engine
|
||||
dialect_name = sync_engine.dialect.name
|
||||
|
||||
conn: sqlite3.Connection = dbapi_connection # type: ignore[assignment]
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("PRAGMA journal_mode=WAL")
|
||||
cursor.execute("PRAGMA busy_timeout=5000")
|
||||
cursor.execute("PRAGMA foreign_keys=ON")
|
||||
cursor.close()
|
||||
if dialect_name == "sqlite":
|
||||
|
||||
@event.listens_for(sync_engine, "connect")
|
||||
def _set_sqlite_pragma(dbapi_connection: object, _conn_record: object) -> None:
|
||||
# dbapi_connection is a raw sqlite3.Connection delivered by SQLAlchemy's
|
||||
# pool event callback. Local import avoids a hard sqlite3 coupling on
|
||||
# Postgres-only deployments.
|
||||
import sqlite3 # noqa: F401 # imported for the type annotation only
|
||||
|
||||
cursor = dbapi_connection.cursor() # type: ignore[attr-defined]
|
||||
cursor.execute("PRAGMA journal_mode=WAL")
|
||||
cursor.execute("PRAGMA busy_timeout=5000")
|
||||
cursor.execute("PRAGMA foreign_keys=ON")
|
||||
cursor.close()
|
||||
|
||||
elif dialect_name == "postgresql":
|
||||
|
||||
@event.listens_for(sync_engine, "connect")
|
||||
def _set_postgres_session(dbapi_connection: object, _conn_record: object) -> None:
|
||||
cursor = dbapi_connection.cursor() # type: ignore[attr-defined]
|
||||
cursor.execute("SET TIME ZONE 'UTC'")
|
||||
cursor.close()
|
||||
|
||||
|
||||
class Database:
|
||||
@@ -42,7 +64,7 @@ class Database:
|
||||
|
||||
Usage::
|
||||
|
||||
db = Database("sqlite+aiosqlite:///path/to/db.sqlite3")
|
||||
db = Database("postgresql+asyncpg://devflow:devflow@localhost:55432/devflow")
|
||||
await db.init_schema() # dev/test: create all tables directly
|
||||
async with db.session() as s: # production: use alembic upgrade head
|
||||
result = await s.execute(...)
|
||||
@@ -55,17 +77,21 @@ class Database:
|
||||
def __init__(self, database_url: str) -> None:
|
||||
self._engine: AsyncEngine = create_async_engine(
|
||||
database_url,
|
||||
# NullPool avoids connection reuse issues in SQLite+aiosqlite tests.
|
||||
poolclass=None, # use the default StaticPool-compatible pool
|
||||
poolclass=None,
|
||||
echo=False,
|
||||
)
|
||||
_attach_sqlite_pragmas(self._engine)
|
||||
_attach_dialect_pragmas(self._engine)
|
||||
self._session_factory: async_sessionmaker[AsyncSession] = async_sessionmaker(
|
||||
bind=self._engine,
|
||||
expire_on_commit=False,
|
||||
autoflush=False,
|
||||
)
|
||||
|
||||
@property
|
||||
def dialect_name(self) -> str:
|
||||
"""Return the SQLAlchemy dialect name (``postgresql`` or ``sqlite``)."""
|
||||
return self._engine.sync_engine.dialect.name
|
||||
|
||||
async def init_schema(self) -> None:
|
||||
"""Create all ORM-defined tables.
|
||||
|
||||
@@ -75,6 +101,11 @@ class Database:
|
||||
async with self._engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
|
||||
async def drop_schema(self) -> None:
|
||||
"""Drop all ORM-defined tables. Test-only; production must never call this."""
|
||||
async with self._engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.drop_all)
|
||||
|
||||
@asynccontextmanager
|
||||
async def session(self) -> AsyncIterator[AsyncSession]:
|
||||
"""Yield an async session; commit on success, rollback on exception."""
|
||||
|
||||
@@ -78,13 +78,16 @@ class RunRow(Base):
|
||||
__table_args__ = (
|
||||
# Partial unique index: at most one active run per (repo_path, base_branch).
|
||||
# An "active" run is any run whose state is not 'completed', 'failed', or 'aborted'.
|
||||
# SQLite partial index uses a WHERE clause; autogenerate cannot detect this,
|
||||
# so it is managed via a manual alembic migration.
|
||||
# Both SQLite and PostgreSQL support partial indexes with a WHERE clause —
|
||||
# SQLAlchemy needs the dialect-specific kwarg for each. Autogenerate cannot
|
||||
# detect this, so the alembic migration is hand-edited to call
|
||||
# `op.create_index(..., postgresql_where=..., sqlite_where=...)`.
|
||||
Index(
|
||||
"ux_active_run_repo_base",
|
||||
"repo_path",
|
||||
"base_branch",
|
||||
unique=True,
|
||||
postgresql_where=text("state NOT IN ('completed', 'failed', 'aborted')"),
|
||||
sqlite_where=text("state NOT IN ('completed', 'failed', 'aborted')"),
|
||||
),
|
||||
)
|
||||
|
||||
45
my-deepagent/src/my_deepagent/persistence/upsert.py
Normal file
45
my-deepagent/src/my_deepagent/persistence/upsert.py
Normal file
@@ -0,0 +1,45 @@
|
||||
"""Dialect-aware UPSERT helper.
|
||||
|
||||
SQLite and PostgreSQL both expose an ``insert(...).on_conflict_do_*()`` API,
|
||||
but they live under different dialect modules with slightly different
|
||||
re-exports. ``insert_for(session)`` picks the right ``insert`` at runtime
|
||||
based on the session's bound engine, so call sites can stay portable.
|
||||
|
||||
Both dialects accept the same kwargs for the methods we use
|
||||
(``on_conflict_do_nothing(index_elements=...)`` and
|
||||
``on_conflict_do_update(index_elements=..., set_=...)``).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
|
||||
def insert_for(session: AsyncSession) -> Any:
|
||||
"""Return the ``insert`` constructor that matches ``session``'s dialect.
|
||||
|
||||
Args:
|
||||
session: An async session bound to either a Postgres or SQLite engine.
|
||||
|
||||
Returns:
|
||||
The dialect-specific ``insert`` callable (``postgresql.insert`` or
|
||||
``sqlite.insert``). Pass an ORM class or Table; the returned
|
||||
statement supports ``.on_conflict_do_nothing(...)`` and
|
||||
``.on_conflict_do_update(...)`` with the same kwargs in both dialects.
|
||||
|
||||
Raises:
|
||||
NotImplementedError: If the bound engine uses an unsupported dialect.
|
||||
"""
|
||||
bind = session.get_bind()
|
||||
dialect_name = bind.dialect.name
|
||||
if dialect_name == "postgresql":
|
||||
from sqlalchemy.dialects.postgresql import insert as _pg_insert
|
||||
|
||||
return _pg_insert
|
||||
if dialect_name == "sqlite":
|
||||
from sqlalchemy.dialects.sqlite import insert as _sqlite_insert
|
||||
|
||||
return _sqlite_insert
|
||||
raise NotImplementedError(f"upsert not implemented for dialect={dialect_name!r}")
|
||||
@@ -13,12 +13,12 @@ from datetime import UTC, datetime
|
||||
from uuid import UUID
|
||||
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from .enums import RunPhaseState, RunState
|
||||
from .persistence.db import Database
|
||||
from .persistence.models import RunEventRow, RunPhaseRow, RunRow
|
||||
from .persistence.upsert import insert_for
|
||||
from .run_event import RunEventType, run_idempotency_key
|
||||
|
||||
_NON_TERMINAL_RUN_STATES: frozenset[str] = frozenset(
|
||||
@@ -139,8 +139,9 @@ async def _append_event_idempotent(
|
||||
)
|
||||
).scalar_one()
|
||||
|
||||
insert = insert_for(s)
|
||||
stmt = (
|
||||
sqlite_insert(RunEventRow)
|
||||
insert(RunEventRow)
|
||||
.values(
|
||||
run_id=run_id,
|
||||
phase_id=None,
|
||||
|
||||
Reference in New Issue
Block a user