diff --git a/packages/db/package.json b/packages/db/package.json index 2ca5f0c..2a86eaf 100644 --- a/packages/db/package.json +++ b/packages/db/package.json @@ -12,6 +12,7 @@ "test": "vitest run" }, "dependencies": { + "@devflow/core": "workspace:*", "drizzle-orm": "0.45.2", "pg": "8.20.0" } diff --git a/packages/db/src/index.ts b/packages/db/src/index.ts index d5323ad..be2c8b3 100644 --- a/packages/db/src/index.ts +++ b/packages/db/src/index.ts @@ -1,2 +1,3 @@ export { createDbClient, type DbClient } from "./client.js"; +export * from "./repositories/transcript.js"; export * from "./schema/index.js"; diff --git a/packages/db/src/repositories/transcript.test.ts b/packages/db/src/repositories/transcript.test.ts new file mode 100644 index 0000000..1d71348 --- /dev/null +++ b/packages/db/src/repositories/transcript.test.ts @@ -0,0 +1,166 @@ +import { randomUUID } from "node:crypto"; +import { and, eq, inArray } from "drizzle-orm"; +import { afterEach, describe, expect, it } from "vitest"; + +import { type DbClient, createDbClient } from "../client.js"; +import { runs, tuiSessions, tuiTranscriptChunks, workflowTemplates } from "../schema/index.js"; +import { TuiTranscriptRepository } from "./transcript.js"; + +const testDatabaseUrl = + process.env.DATABASE_URL ?? "postgres://devflow:devflow@127.0.0.1:55432/devflow"; + +describe("TuiTranscriptRepository", () => { + let client: DbClient | undefined; + const runIds: string[] = []; + const templateIds: string[] = []; + + afterEach(async () => { + if (client === undefined) { + return; + } + if (runIds.length > 0) { + await client.db.delete(runs).where(inArray(runs.id, [...runIds])); + } + if (templateIds.length > 0) { + await client.db + .delete(workflowTemplates) + .where(inArray(workflowTemplates.id, [...templateIds])); + } + runIds.length = 0; + templateIds.length = 0; + await client.close(); + client = undefined; + }); + + async function createSession() { + client = createDbClient(testDatabaseUrl); + const templateId = randomUUID(); + const runId = randomUUID(); + const sessionId = randomUUID(); + templateIds.push(templateId); + runIds.push(runId); + + await client.db.insert(workflowTemplates).values({ + id: templateId, + name: `template-${templateId}`, + version: 1, + hash: `hash-${templateId}`, + definition: {}, + }); + await client.db.insert(runs).values({ + id: runId, + templateId, + templateHash: `hash-${templateId}`, + state: "executing", + repoPath: `/tmp/devflow-${runId}`, + baseBranch: "main", + worktreeRoot: `/tmp/devflow-${runId}/main`, + }); + await client.db.insert(tuiSessions).values({ + id: sessionId, + runId, + roleId: "implementer", + backend: "fake", + cwd: `/tmp/devflow-${runId}/main`, + state: "READY", + }); + + return { db: client.db, sessionId }; + } + + it("appends transcript chunks idempotently and advances last_capture_seq", async () => { + const { db, sessionId } = await createSession(); + const repository = new TuiTranscriptRepository(db); + const firstAt = new Date("2026-05-09T00:00:00.000Z"); + const secondAt = new Date("2026-05-09T00:00:01.000Z"); + + await repository.append(sessionId, [ + { seq: 1n, content: "first", capturedAt: firstAt }, + { seq: 2n, content: "second", capturedAt: secondAt }, + ]); + await repository.append(sessionId, [ + { seq: 2n, content: "second", capturedAt: secondAt }, + { seq: 3n, content: "third", capturedAt: new Date("2026-05-09T00:00:02.000Z") }, + ]); + + const rows = await db + .select({ + seq: tuiTranscriptChunks.seq, + content: tuiTranscriptChunks.content, + }) + .from(tuiTranscriptChunks) + .where(eq(tuiTranscriptChunks.sessionId, sessionId)) + .orderBy(tuiTranscriptChunks.seq); + expect(rows).toEqual([ + { seq: 1n, content: "first" }, + { seq: 2n, content: "second" }, + { seq: 3n, content: "third" }, + ]); + + const [session] = await db + .select({ lastCaptureSeq: tuiSessions.lastCaptureSeq }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)); + expect(session?.lastCaptureSeq).toBe(3n); + }); + + it("rejects conflicting content for an existing transcript sequence", async () => { + const { db, sessionId } = await createSession(); + const repository = new TuiTranscriptRepository(db); + const capturedAt = new Date("2026-05-09T00:00:00.000Z"); + + await repository.append(sessionId, [{ seq: 1n, content: "first", capturedAt }]); + + await expect( + repository.append(sessionId, [{ seq: 1n, content: "different", capturedAt }]), + ).rejects.toMatchObject({ code: "transcript_seq_conflict" }); + + const rows = await db + .select() + .from(tuiTranscriptChunks) + .where(and(eq(tuiTranscriptChunks.sessionId, sessionId), eq(tuiTranscriptChunks.seq, 1n))); + expect(rows).toHaveLength(1); + expect(rows?.[0]?.content).toBe("first"); + }); + + it("rejects sequence gaps before advancing last_capture_seq", async () => { + const { db, sessionId } = await createSession(); + const repository = new TuiTranscriptRepository(db); + + await expect( + repository.append(sessionId, [ + { seq: 2n, content: "second", capturedAt: new Date("2026-05-09T00:00:01.000Z") }, + ]), + ).rejects.toMatchObject({ code: "transcript_sequence_gap" }); + + const [session] = await db + .select({ lastCaptureSeq: tuiSessions.lastCaptureSeq }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)); + expect(session?.lastCaptureSeq).toBe(0n); + }); + + it("surfaces concurrent conflicting content for the same transcript sequence", async () => { + const { db, sessionId } = await createSession(); + const firstRepository = new TuiTranscriptRepository(db); + const secondRepository = new TuiTranscriptRepository(db); + const capturedAt = new Date("2026-05-09T00:00:00.000Z"); + + const results = await Promise.allSettled([ + firstRepository.append(sessionId, [{ seq: 1n, content: "first", capturedAt }]), + secondRepository.append(sessionId, [{ seq: 1n, content: "different", capturedAt }]), + ]); + + expect(results.filter((result) => result.status === "fulfilled")).toHaveLength(1); + const rejected = results.find((result) => result.status === "rejected"); + expect(rejected).toMatchObject({ + reason: expect.objectContaining({ code: "transcript_seq_conflict" }), + }); + + const rows = await db + .select() + .from(tuiTranscriptChunks) + .where(eq(tuiTranscriptChunks.sessionId, sessionId)); + expect(rows).toHaveLength(1); + }); +}); diff --git a/packages/db/src/repositories/transcript.ts b/packages/db/src/repositories/transcript.ts new file mode 100644 index 0000000..06be016 --- /dev/null +++ b/packages/db/src/repositories/transcript.ts @@ -0,0 +1,139 @@ +import { DevflowError } from "@devflow/core"; +import { and, eq, inArray, sql } from "drizzle-orm"; + +import type { DbClient } from "../client.js"; +import { tuiSessions, tuiTranscriptChunks } from "../schema/index.js"; + +export interface TranscriptChunkInput { + seq: bigint; + content: string; + capturedAt: Date; +} + +export interface AppendTranscriptResult { + received: number; + inserted: number; + lastSeq: bigint | undefined; +} + +type Database = DbClient["db"]; + +export class TuiTranscriptRepository { + constructor(private readonly db: Database) {} + + async append( + sessionId: string, + chunks: readonly TranscriptChunkInput[], + ): Promise { + if (chunks.length === 0) { + return { received: 0, inserted: 0, lastSeq: undefined }; + } + + const normalized = normalizeChunks(chunks); + + return this.db.transaction(async (tx) => { + await tx.execute( + sql`SELECT pg_advisory_xact_lock(hashtext(${`devflow:tui-transcript:${sessionId}`}))`, + ); + + const [session] = await tx + .select({ lastCaptureSeq: tuiSessions.lastCaptureSeq }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)); + if (session === undefined) { + throw new DevflowError("TUI session does not exist", { + class: "fatal", + code: "session_not_found", + }); + } + + const insertedRows = await tx + .insert(tuiTranscriptChunks) + .values( + normalized.map((chunk) => ({ + sessionId, + seq: chunk.seq, + content: chunk.content, + capturedAt: chunk.capturedAt, + })), + ) + .onConflictDoNothing({ + target: [tuiTranscriptChunks.sessionId, tuiTranscriptChunks.seq], + }) + .returning({ seq: tuiTranscriptChunks.seq }); + + const seqs = normalized.map((chunk) => chunk.seq); + const persistedRows = await tx + .select({ + seq: tuiTranscriptChunks.seq, + content: tuiTranscriptChunks.content, + }) + .from(tuiTranscriptChunks) + .where( + and(eq(tuiTranscriptChunks.sessionId, sessionId), inArray(tuiTranscriptChunks.seq, seqs)), + ); + const persistedBySeq = new Map(persistedRows.map((row) => [row.seq, row.content])); + + for (const chunk of normalized) { + const persisted = persistedBySeq.get(chunk.seq); + if (persisted !== chunk.content) { + throw new DevflowError("Transcript sequence already exists with different content", { + class: "fatal", + code: "transcript_seq_conflict", + }); + } + } + + const lastSeq = normalized.at(-1)?.seq; + const nextCaptureSeq = advanceContiguousCursor(session.lastCaptureSeq, normalized); + await tx + .update(tuiSessions) + .set({ + lastCaptureSeq: nextCaptureSeq, + }) + .where(eq(tuiSessions.id, sessionId)); + + return { received: chunks.length, inserted: insertedRows.length, lastSeq }; + }); + } +} + +function normalizeChunks(chunks: readonly TranscriptChunkInput[]): TranscriptChunkInput[] { + const bySeq = new Map(); + for (const chunk of chunks) { + if (chunk.seq <= 0n) { + throw new DevflowError("Transcript sequence must be positive", { + class: "fatal", + code: "transcript_sequence_invalid", + }); + } + + const existing = bySeq.get(chunk.seq); + if (existing !== undefined && existing.content !== chunk.content) { + throw new DevflowError("Duplicate transcript sequence has conflicting content", { + class: "fatal", + code: "transcript_seq_conflict", + }); + } + bySeq.set(chunk.seq, chunk); + } + + return [...bySeq.values()].sort((left, right) => Number(left.seq - right.seq)); +} + +function advanceContiguousCursor(current: bigint, chunks: readonly TranscriptChunkInput[]): bigint { + let cursor = current; + for (const chunk of chunks) { + if (chunk.seq <= cursor) { + continue; + } + if (chunk.seq !== cursor + 1n) { + throw new DevflowError("Transcript sequence cannot skip the capture cursor", { + class: "fatal", + code: "transcript_sequence_gap", + }); + } + cursor = chunk.seq; + } + return cursor; +} diff --git a/packages/session/package.json b/packages/session/package.json index d4dbc16..f29fc96 100644 --- a/packages/session/package.json +++ b/packages/session/package.json @@ -13,5 +13,8 @@ }, "dependencies": { "@devflow/core": "workspace:*" + }, + "devDependencies": { + "@devflow/db": "workspace:*" } } diff --git a/packages/session/src/index.ts b/packages/session/src/index.ts index 644f756..d8479ea 100644 --- a/packages/session/src/index.ts +++ b/packages/session/src/index.ts @@ -1,2 +1,3 @@ export * from "./adapter.js"; export * from "./fake.js"; +export * from "./transcript.js"; diff --git a/packages/session/src/transcript.test.ts b/packages/session/src/transcript.test.ts new file mode 100644 index 0000000..32e2bf6 --- /dev/null +++ b/packages/session/src/transcript.test.ts @@ -0,0 +1,154 @@ +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; + +import type { PromptEnvelope } from "@devflow/core"; +import type { TuiTranscriptRepository } from "@devflow/db"; + +import type { TranscriptChunk } from "./adapter.js"; +import { FakeSessionAdapter } from "./fake.js"; +import { captureAndPersistTranscript } from "./transcript.js"; + +const runId = "00000000-0000-4000-8000-000000000001"; + +function envelope(overrides: Partial = {}): PromptEnvelope { + return { + uuid: "00000000-0000-4000-8000-000000000010", + runId, + roleId: "implementer", + phaseKey: "implement", + attempt: 0, + expectedArtifact: join(mkdtempSync(join(tmpdir(), "devflow-transcript-artifact-")), "out.json"), + expectedSchema: "dev/spec@1", + dedupKey: "a".repeat(64), + instructions: "Scenario: timeout\nNo artifact needed", + ...overrides, + }; +} + +async function collectSink() { + const calls: Array<{ sessionId: string; chunks: TranscriptChunk[] }> = []; + return { + calls, + sink: { + async append(sessionId: string, chunks: readonly TranscriptChunk[]) { + calls.push({ sessionId, chunks: [...chunks] }); + }, + }, + }; +} + +describe("captureAndPersistTranscript", () => { + const tempRoots: string[] = []; + + afterEach(() => { + for (const root of tempRoots.splice(0)) { + rmSync(root, { recursive: true, force: true }); + } + }); + + it("captures adapter chunks after fromSeq and persists them to the sink", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-transcript-cwd-")); + tempRoots.push(cwd); + const adapter = new FakeSessionAdapter({ fixtureRoot: cwd, writeDelayMs: 0 }); + const handle = await adapter.start({ + runId, + roleId: "implementer", + backend: "fake", + cwd, + }); + await adapter.sendPrompt(handle, envelope()); + const { calls, sink } = await collectSink(); + + const result = await captureAndPersistTranscript({ + adapter, + handle, + fromSeq: 1n, + sink, + }); + + expect(result).toEqual({ captured: 1, lastSeq: 2n }); + expect(calls).toHaveLength(1); + expect(calls[0]).toMatchObject({ + sessionId: handle.sessionId, + chunks: [ + { + seq: 2n, + content: expect.stringContaining("timeout"), + }, + ], + }); + }); + + it("accepts the DB transcript repository as a sink contract", () => { + const repository = null as unknown as TuiTranscriptRepository; + const sink: Parameters[0]["sink"] = repository; + + expect(sink).toBe(repository); + }); + + it("does not call the sink when there are no new chunks", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-transcript-cwd-")); + tempRoots.push(cwd); + const adapter = new FakeSessionAdapter({ fixtureRoot: cwd, writeDelayMs: 0 }); + const handle = await adapter.start({ + runId, + roleId: "implementer", + backend: "fake", + cwd, + }); + const { calls, sink } = await collectSink(); + + const result = await captureAndPersistTranscript({ + adapter, + handle, + fromSeq: 1n, + sink, + }); + + expect(result).toEqual({ captured: 0, lastSeq: 1n }); + expect(calls).toEqual([]); + }); + + it("rejects non-monotonic adapter chunks before persistence", async () => { + const handle = { sessionId: "00000000-0000-4000-8000-000000000020" }; + const now = new Date("2026-05-09T00:00:00.000Z"); + const adapter = { + async *capture() { + yield { seq: 2n, content: "second", capturedAt: now }; + yield { seq: 2n, content: "duplicate", capturedAt: now }; + }, + }; + const { sink } = await collectSink(); + + await expect( + captureAndPersistTranscript({ + adapter, + handle, + fromSeq: 1n, + sink, + }), + ).rejects.toMatchObject({ code: "transcript_sequence_invalid" }); + }); + + it("rejects sequence gaps before advancing the capture watermark", async () => { + const handle = { sessionId: "00000000-0000-4000-8000-000000000021" }; + const now = new Date("2026-05-09T00:00:00.000Z"); + const adapter = { + async *capture() { + yield { seq: 3n, content: "third", capturedAt: now }; + }, + }; + const { sink } = await collectSink(); + + await expect( + captureAndPersistTranscript({ + adapter, + handle, + fromSeq: 1n, + sink, + }), + ).rejects.toMatchObject({ code: "transcript_sequence_gap" }); + }); +}); diff --git a/packages/session/src/transcript.ts b/packages/session/src/transcript.ts new file mode 100644 index 0000000..79a937a --- /dev/null +++ b/packages/session/src/transcript.ts @@ -0,0 +1,51 @@ +import { DevflowError } from "@devflow/core"; + +import type { SessionAdapter, SessionHandle, TranscriptChunk } from "./adapter.js"; + +export interface TranscriptChunkSink { + append(sessionId: string, chunks: readonly TranscriptChunk[]): Promise; +} + +export interface CaptureAndPersistTranscriptInput { + adapter: Pick; + handle: SessionHandle; + fromSeq: bigint; + sink: TranscriptChunkSink; +} + +export interface CaptureAndPersistTranscriptResult { + captured: number; + lastSeq: bigint; +} + +export async function captureAndPersistTranscript( + input: CaptureAndPersistTranscriptInput, +): Promise { + const chunks: TranscriptChunk[] = []; + let lastSeq = input.fromSeq; + + for await (const chunk of input.adapter.capture(input.handle, input.fromSeq)) { + if (chunk.seq <= lastSeq) { + throw new DevflowError("Transcript chunks must be strictly increasing", { + class: "fatal", + code: "transcript_sequence_invalid", + }); + } + if (chunk.seq !== lastSeq + 1n) { + throw new DevflowError("Transcript chunks must be contiguous", { + class: "fatal", + code: "transcript_sequence_gap", + }); + } + chunks.push(chunk); + lastSeq = chunk.seq; + } + + if (chunks.length === 0) { + return { captured: 0, lastSeq: input.fromSeq }; + } + + await input.sink.append(input.handle.sessionId, chunks); + + return { captured: chunks.length, lastSeq }; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 06ed763..e89ef8e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -90,6 +90,9 @@ importers: packages/db: dependencies: + '@devflow/core': + specifier: workspace:* + version: link:../core drizzle-orm: specifier: 0.45.2 version: 0.45.2(@types/pg@8.20.0)(pg@8.20.0) @@ -102,6 +105,10 @@ importers: '@devflow/core': specifier: workspace:* version: link:../core + devDependencies: + '@devflow/db': + specifier: workspace:* + version: link:../db packages: