feat: add fake phase harness

This commit is contained in:
chungyeong
2026-05-10 16:48:52 +09:00
parent be0ddb6e4e
commit 64efeabd33
22 changed files with 5766 additions and 76 deletions

View File

@@ -8,8 +8,8 @@
"types": "./dist/index.d.ts",
"scripts": {
"build": "tsup src/index.ts --format esm,cjs --clean && tsc -p tsconfig.build.json",
"typecheck": "tsc -b --noEmit",
"test": "vitest run"
"typecheck": "tsc -p ../../tsconfig.typecheck.json --noEmit",
"test": "cd ../.. && vitest run --project packages/db"
},
"dependencies": {
"@devflow/core": "workspace:*",

View File

@@ -1,3 +1,4 @@
export { createDbClient, type DbClient } from "./client.js";
export * from "./repositories/run-event.js";
export * from "./repositories/transcript.js";
export * from "./schema/index.js";

View File

@@ -0,0 +1,278 @@
import {
DevflowError,
RunEventPayloadSchemas,
RunEventType,
type RunEventType as RunEventTypeName,
canonicalize,
} from "@devflow/core";
import { and, desc, eq, sql } from "drizzle-orm";
import type { DbClient } from "../client.js";
import { runEvents, runPhases } from "../schema/index.js";
type Database = DbClient["db"];
type TransactionDatabase = Parameters<Parameters<Database["transaction"]>[0]>[0];
export interface AppendRunEventInput {
runId: string;
phaseId?: string;
type: RunEventTypeName;
payload: Record<string, unknown>;
idempotencyKey: string;
}
export interface RunEventRow {
id: bigint;
runId: string;
phaseId: string | null;
seq: bigint;
type: string;
payload: unknown;
idempotencyKey: string;
ts: Date;
}
export class RunEventRepository {
constructor(private readonly db: Database) {}
async append(input: AppendRunEventInput): Promise<RunEventRow> {
return this.db.transaction(async (tx) => this.appendInTransaction(tx, input));
}
async appendInTransaction(
tx: TransactionDatabase,
input: AppendRunEventInput,
): Promise<RunEventRow> {
const type = RunEventType.parse(input.type);
const payload = RunEventPayloadSchemas[type].parse(input.payload) as Record<string, unknown>;
if (isPhaseScopedEvent(type) && input.phaseId === undefined) {
throw new DevflowError("Run event phase id is required for phase-scoped event", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
});
}
if (input.idempotencyKey.length === 0) {
throw new DevflowError("Run event idempotency key is required", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
...(input.phaseId === undefined ? {} : { phaseId: input.phaseId }),
});
}
const expectedIdempotencyKey = expectedRunEventIdempotencyKey(input, type, payload);
if (expectedIdempotencyKey !== undefined && input.idempotencyKey !== expectedIdempotencyKey) {
throw new DevflowError("Run event idempotency key does not match event contract", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
...(input.phaseId === undefined ? {} : { phaseId: input.phaseId }),
});
}
await tx.execute(
sql`SELECT pg_advisory_xact_lock(hashtext('devflow:run-events'), hashtext(${input.runId}))`,
);
if (input.phaseId !== undefined) {
const [phase] = await tx
.select({ id: runPhases.id })
.from(runPhases)
.where(and(eq(runPhases.id, input.phaseId), eq(runPhases.runId, input.runId)))
.limit(1);
if (phase === undefined) {
throw new DevflowError("Run event phase does not belong to run", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
}
const existing = await tx
.select()
.from(runEvents)
.where(
and(eq(runEvents.runId, input.runId), eq(runEvents.idempotencyKey, input.idempotencyKey)),
)
.limit(1);
if (existing[0] !== undefined) {
assertIdempotentReplayMatches(input, type, payload, existing[0]);
return existing[0];
}
const latest = await tx
.select({ seq: runEvents.seq })
.from(runEvents)
.where(eq(runEvents.runId, input.runId))
.orderBy(desc(runEvents.seq))
.limit(1);
const seq = (latest[0]?.seq ?? 0n) + 1n;
const inserted = await tx
.insert(runEvents)
.values({
runId: input.runId,
phaseId: input.phaseId,
seq,
type,
payload,
idempotencyKey: input.idempotencyKey,
})
.returning();
const event = inserted[0];
if (event === undefined) {
throw new DevflowError("Run event insert returned no row", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
...(input.phaseId === undefined ? {} : { phaseId: input.phaseId }),
});
}
return event;
}
}
function isPhaseScopedEvent(type: RunEventTypeName): boolean {
return (
type.startsWith("phase.") || type.startsWith("artifact.") || type === "review.batch_recorded"
);
}
function assertIdempotentReplayMatches(
input: AppendRunEventInput,
type: RunEventTypeName,
payload: Record<string, unknown>,
existing: RunEventRow,
) {
const sameType = existing.type === type;
const samePhase = !isPhaseScopedEvent(type) || existing.phaseId === (input.phaseId ?? null);
const samePayload = canonicalize(normalizeJson(existing.payload)) === canonicalize(payload);
if (sameType && samePhase && samePayload) {
return;
}
throw new DevflowError("Run event idempotency key replay does not match existing event", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
...(input.phaseId === undefined ? {} : { phaseId: input.phaseId }),
});
}
function expectedRunEventIdempotencyKey(
input: AppendRunEventInput,
type: RunEventTypeName,
payload: Record<string, unknown>,
): string | undefined {
switch (type) {
case "run.created":
case "run.started":
case "run.completed":
case "run.failed":
case "run.aborted":
return `${type}:${input.runId}`;
case "run.paused":
case "run.resumed":
return `${type}:${input.runId}:${stringPayload(payload, "cause")}`;
case "phase.started":
case "phase.completed":
case "phase.failed":
case "phase.skipped":
return `${type}:${requiredPhaseId(input)}:${numberPayload(payload, "attempt")}`;
case "prompt.sent":
case "prompt.repaired":
return `${type}:${stringPayload(payload, "dedupKey")}`;
case "artifact.expected":
case "artifact.timeout":
return `${type}:${requiredPhaseId(input)}:${numberPayload(payload, "attempt")}:${stringPayload(payload, "path")}`;
case "artifact.validated":
case "artifact.invalid":
return `${type}:${requiredPhaseId(input)}:${stringPayload(payload, "path")}:${stringPayload(payload, "hash")}`;
case "approval.requested":
return `approval.requested:${stringPayload(payload, "approvalIdempotencyKey")}`;
case "approval.resolved":
return `approval.resolved:${stringPayload(payload, "approvalRequestId")}:${stringPayload(payload, "action")}`;
case "session.created":
case "session.failed":
return `${type}:${stringPayload(payload, "sessionId")}`;
case "session.busy":
case "session.idle":
return `${type}:${stringPayload(payload, "sessionId")}:${stringPayload(payload, "dedupKey")}`;
case "session.ready":
case "session.crashed":
case "session.recovered":
return `${type}:${stringPayload(payload, "sessionId")}:${numberPayload(payload, "recoveryAttempts")}`;
case "command.started":
case "command.completed":
case "command.failed":
return `${type}:${stringPayload(payload, "commandId")}`;
case "review.batch_recorded":
return `review.batch_recorded:${requiredPhaseId(input)}:${stringPayload(payload, "reviewerRole")}:${numberPayload(payload, "attempt")}`;
case "finding.verifier_resolved":
return `finding.verifier_resolved:${stringPayload(payload, "findingId")}`;
case "backtest.iteration_started":
case "backtest.iteration_completed":
case "backtest.objective_evaluated":
return `${type}:${stringPayload(payload, "iterationId")}`;
default:
return undefined;
}
}
function requiredPhaseId(input: AppendRunEventInput): string {
if (input.phaseId === undefined) {
throw new DevflowError("Run event phase id is required for idempotency key", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
});
}
return input.phaseId;
}
function stringPayload(payload: Record<string, unknown>, key: string): string {
const value = payload[key];
if (typeof value !== "string" || value.length === 0) {
throw new DevflowError(`Run event payload is missing string field ${key}`, {
class: "fatal",
code: "internal_state_corruption",
});
}
return value;
}
function numberPayload(payload: Record<string, unknown>, key: string): number {
const value = payload[key];
if (typeof value !== "number" || !Number.isInteger(value)) {
throw new DevflowError(`Run event payload is missing integer field ${key}`, {
class: "fatal",
code: "internal_state_corruption",
});
}
return value;
}
function normalizeJson(value: unknown): unknown {
if (Array.isArray(value)) {
return value.map((item) => normalizeJson(item));
}
if (value !== null && typeof value === "object") {
return Object.fromEntries(
Object.entries(value as Record<string, unknown>).map(([key, child]) => [
key,
normalizeJson(child),
]),
);
}
return value;
}