feat: persist session transcripts

This commit is contained in:
chungyeong
2026-05-10 01:47:54 +09:00
parent 017528b497
commit be0ddb6e4e
9 changed files with 523 additions and 0 deletions

View File

@@ -1,2 +1,3 @@
export { createDbClient, type DbClient } from "./client.js";
export * from "./repositories/transcript.js";
export * from "./schema/index.js";

View File

@@ -0,0 +1,166 @@
import { randomUUID } from "node:crypto";
import { and, eq, inArray } from "drizzle-orm";
import { afterEach, describe, expect, it } from "vitest";
import { type DbClient, createDbClient } from "../client.js";
import { runs, tuiSessions, tuiTranscriptChunks, workflowTemplates } from "../schema/index.js";
import { TuiTranscriptRepository } from "./transcript.js";
const testDatabaseUrl =
process.env.DATABASE_URL ?? "postgres://devflow:devflow@127.0.0.1:55432/devflow";
describe("TuiTranscriptRepository", () => {
let client: DbClient | undefined;
const runIds: string[] = [];
const templateIds: string[] = [];
afterEach(async () => {
if (client === undefined) {
return;
}
if (runIds.length > 0) {
await client.db.delete(runs).where(inArray(runs.id, [...runIds]));
}
if (templateIds.length > 0) {
await client.db
.delete(workflowTemplates)
.where(inArray(workflowTemplates.id, [...templateIds]));
}
runIds.length = 0;
templateIds.length = 0;
await client.close();
client = undefined;
});
async function createSession() {
client = createDbClient(testDatabaseUrl);
const templateId = randomUUID();
const runId = randomUUID();
const sessionId = randomUUID();
templateIds.push(templateId);
runIds.push(runId);
await client.db.insert(workflowTemplates).values({
id: templateId,
name: `template-${templateId}`,
version: 1,
hash: `hash-${templateId}`,
definition: {},
});
await client.db.insert(runs).values({
id: runId,
templateId,
templateHash: `hash-${templateId}`,
state: "executing",
repoPath: `/tmp/devflow-${runId}`,
baseBranch: "main",
worktreeRoot: `/tmp/devflow-${runId}/main`,
});
await client.db.insert(tuiSessions).values({
id: sessionId,
runId,
roleId: "implementer",
backend: "fake",
cwd: `/tmp/devflow-${runId}/main`,
state: "READY",
});
return { db: client.db, sessionId };
}
it("appends transcript chunks idempotently and advances last_capture_seq", async () => {
const { db, sessionId } = await createSession();
const repository = new TuiTranscriptRepository(db);
const firstAt = new Date("2026-05-09T00:00:00.000Z");
const secondAt = new Date("2026-05-09T00:00:01.000Z");
await repository.append(sessionId, [
{ seq: 1n, content: "first", capturedAt: firstAt },
{ seq: 2n, content: "second", capturedAt: secondAt },
]);
await repository.append(sessionId, [
{ seq: 2n, content: "second", capturedAt: secondAt },
{ seq: 3n, content: "third", capturedAt: new Date("2026-05-09T00:00:02.000Z") },
]);
const rows = await db
.select({
seq: tuiTranscriptChunks.seq,
content: tuiTranscriptChunks.content,
})
.from(tuiTranscriptChunks)
.where(eq(tuiTranscriptChunks.sessionId, sessionId))
.orderBy(tuiTranscriptChunks.seq);
expect(rows).toEqual([
{ seq: 1n, content: "first" },
{ seq: 2n, content: "second" },
{ seq: 3n, content: "third" },
]);
const [session] = await db
.select({ lastCaptureSeq: tuiSessions.lastCaptureSeq })
.from(tuiSessions)
.where(eq(tuiSessions.id, sessionId));
expect(session?.lastCaptureSeq).toBe(3n);
});
it("rejects conflicting content for an existing transcript sequence", async () => {
const { db, sessionId } = await createSession();
const repository = new TuiTranscriptRepository(db);
const capturedAt = new Date("2026-05-09T00:00:00.000Z");
await repository.append(sessionId, [{ seq: 1n, content: "first", capturedAt }]);
await expect(
repository.append(sessionId, [{ seq: 1n, content: "different", capturedAt }]),
).rejects.toMatchObject({ code: "transcript_seq_conflict" });
const rows = await db
.select()
.from(tuiTranscriptChunks)
.where(and(eq(tuiTranscriptChunks.sessionId, sessionId), eq(tuiTranscriptChunks.seq, 1n)));
expect(rows).toHaveLength(1);
expect(rows?.[0]?.content).toBe("first");
});
it("rejects sequence gaps before advancing last_capture_seq", async () => {
const { db, sessionId } = await createSession();
const repository = new TuiTranscriptRepository(db);
await expect(
repository.append(sessionId, [
{ seq: 2n, content: "second", capturedAt: new Date("2026-05-09T00:00:01.000Z") },
]),
).rejects.toMatchObject({ code: "transcript_sequence_gap" });
const [session] = await db
.select({ lastCaptureSeq: tuiSessions.lastCaptureSeq })
.from(tuiSessions)
.where(eq(tuiSessions.id, sessionId));
expect(session?.lastCaptureSeq).toBe(0n);
});
it("surfaces concurrent conflicting content for the same transcript sequence", async () => {
const { db, sessionId } = await createSession();
const firstRepository = new TuiTranscriptRepository(db);
const secondRepository = new TuiTranscriptRepository(db);
const capturedAt = new Date("2026-05-09T00:00:00.000Z");
const results = await Promise.allSettled([
firstRepository.append(sessionId, [{ seq: 1n, content: "first", capturedAt }]),
secondRepository.append(sessionId, [{ seq: 1n, content: "different", capturedAt }]),
]);
expect(results.filter((result) => result.status === "fulfilled")).toHaveLength(1);
const rejected = results.find((result) => result.status === "rejected");
expect(rejected).toMatchObject({
reason: expect.objectContaining({ code: "transcript_seq_conflict" }),
});
const rows = await db
.select()
.from(tuiTranscriptChunks)
.where(eq(tuiTranscriptChunks.sessionId, sessionId));
expect(rows).toHaveLength(1);
});
});

View File

@@ -0,0 +1,139 @@
import { DevflowError } from "@devflow/core";
import { and, eq, inArray, sql } from "drizzle-orm";
import type { DbClient } from "../client.js";
import { tuiSessions, tuiTranscriptChunks } from "../schema/index.js";
export interface TranscriptChunkInput {
seq: bigint;
content: string;
capturedAt: Date;
}
export interface AppendTranscriptResult {
received: number;
inserted: number;
lastSeq: bigint | undefined;
}
type Database = DbClient["db"];
export class TuiTranscriptRepository {
constructor(private readonly db: Database) {}
async append(
sessionId: string,
chunks: readonly TranscriptChunkInput[],
): Promise<AppendTranscriptResult> {
if (chunks.length === 0) {
return { received: 0, inserted: 0, lastSeq: undefined };
}
const normalized = normalizeChunks(chunks);
return this.db.transaction(async (tx) => {
await tx.execute(
sql`SELECT pg_advisory_xact_lock(hashtext(${`devflow:tui-transcript:${sessionId}`}))`,
);
const [session] = await tx
.select({ lastCaptureSeq: tuiSessions.lastCaptureSeq })
.from(tuiSessions)
.where(eq(tuiSessions.id, sessionId));
if (session === undefined) {
throw new DevflowError("TUI session does not exist", {
class: "fatal",
code: "session_not_found",
});
}
const insertedRows = await tx
.insert(tuiTranscriptChunks)
.values(
normalized.map((chunk) => ({
sessionId,
seq: chunk.seq,
content: chunk.content,
capturedAt: chunk.capturedAt,
})),
)
.onConflictDoNothing({
target: [tuiTranscriptChunks.sessionId, tuiTranscriptChunks.seq],
})
.returning({ seq: tuiTranscriptChunks.seq });
const seqs = normalized.map((chunk) => chunk.seq);
const persistedRows = await tx
.select({
seq: tuiTranscriptChunks.seq,
content: tuiTranscriptChunks.content,
})
.from(tuiTranscriptChunks)
.where(
and(eq(tuiTranscriptChunks.sessionId, sessionId), inArray(tuiTranscriptChunks.seq, seqs)),
);
const persistedBySeq = new Map(persistedRows.map((row) => [row.seq, row.content]));
for (const chunk of normalized) {
const persisted = persistedBySeq.get(chunk.seq);
if (persisted !== chunk.content) {
throw new DevflowError("Transcript sequence already exists with different content", {
class: "fatal",
code: "transcript_seq_conflict",
});
}
}
const lastSeq = normalized.at(-1)?.seq;
const nextCaptureSeq = advanceContiguousCursor(session.lastCaptureSeq, normalized);
await tx
.update(tuiSessions)
.set({
lastCaptureSeq: nextCaptureSeq,
})
.where(eq(tuiSessions.id, sessionId));
return { received: chunks.length, inserted: insertedRows.length, lastSeq };
});
}
}
function normalizeChunks(chunks: readonly TranscriptChunkInput[]): TranscriptChunkInput[] {
const bySeq = new Map<bigint, TranscriptChunkInput>();
for (const chunk of chunks) {
if (chunk.seq <= 0n) {
throw new DevflowError("Transcript sequence must be positive", {
class: "fatal",
code: "transcript_sequence_invalid",
});
}
const existing = bySeq.get(chunk.seq);
if (existing !== undefined && existing.content !== chunk.content) {
throw new DevflowError("Duplicate transcript sequence has conflicting content", {
class: "fatal",
code: "transcript_seq_conflict",
});
}
bySeq.set(chunk.seq, chunk);
}
return [...bySeq.values()].sort((left, right) => Number(left.seq - right.seq));
}
function advanceContiguousCursor(current: bigint, chunks: readonly TranscriptChunkInput[]): bigint {
let cursor = current;
for (const chunk of chunks) {
if (chunk.seq <= cursor) {
continue;
}
if (chunk.seq !== cursor + 1n) {
throw new DevflowError("Transcript sequence cannot skip the capture cursor", {
class: "fatal",
code: "transcript_sequence_gap",
});
}
cursor = chunk.seq;
}
return cursor;
}