feat: add minimum API and web GUI

This commit is contained in:
chungyeong
2026-05-14 01:16:41 +09:00
parent e5020a59f0
commit c9fed71cc9
21 changed files with 3757 additions and 11 deletions

View File

@@ -14,6 +14,8 @@
"@devflow/run-engine": "workspace:*",
"@devflow/session": "workspace:*",
"@devflow/workflows": "workspace:*",
"@temporalio/client": "^1.17.1"
"@fastify/sensible": "6",
"@temporalio/client": "^1.17.1",
"fastify": "5"
}
}

952
apps/api/src/http.test.ts Normal file
View File

@@ -0,0 +1,952 @@
import { randomUUID } from "node:crypto";
import { mkdtempSync, realpathSync, rmSync } from "node:fs";
import { get } from "node:http";
import { tmpdir } from "node:os";
import { join } from "node:path";
import type { ApprovalDecisionAction } from "@devflow/core";
import {
type DbClient,
RunEventRepository,
agentPersonas,
approvalDecisions,
approvalRequests,
createDbClient,
runInputs,
runs,
tuiSessions,
workflowTemplates,
} from "@devflow/db";
import type { RunEngine, RunStartInput, RunStatus } from "@devflow/run-engine";
import { and, eq, inArray } from "drizzle-orm";
import { afterEach, describe, expect, it } from "vitest";
import { createHttpApi } from "./http.js";
import { formatSseMessage, runEventMessages } from "./sse.js";
const databaseUrl =
process.env.DATABASE_URL ?? "postgres://devflow:devflow@127.0.0.1:55432/devflow";
class RecordingEngine implements RunEngine {
readonly approvalSignals: Array<{
action: ApprovalDecisionAction;
approvalRequestId: string;
clientToken: string;
comment?: string;
runId: string;
}> = [];
readonly startedRuns: RunStartInput[] = [];
constructor(
protected readonly db: DbClient["db"],
private readonly runId = randomUUID(),
) {}
async startRun(input: RunStartInput): Promise<{ runId: string }> {
this.startedRuns.push(input);
return { runId: this.runId };
}
async signalApproval(
runId: string,
approvalRequestId: string,
action: ApprovalDecisionAction,
clientToken: string,
comment?: string,
): Promise<void> {
this.approvalSignals.push({
action,
approvalRequestId,
clientToken,
...(comment === undefined ? {} : { comment }),
runId,
});
}
async pauseRun(_runId: string): Promise<void> {
return;
}
async resumeRun(_runId: string): Promise<void> {
return;
}
async abortRun(_runId: string, _reason: string): Promise<void> {
return;
}
async getStatus(runId: string): Promise<RunStatus> {
const { readRunStatus } = await import("@devflow/run-engine");
return readRunStatus(this.db, runId);
}
}
class DecisionRecordingEngine extends RecordingEngine {
override async signalApproval(
runId: string,
approvalRequestId: string,
action: ApprovalDecisionAction,
clientToken: string,
comment?: string,
): Promise<void> {
await new Promise((resolve) => setTimeout(resolve, 25));
await super.signalApproval(runId, approvalRequestId, action, clientToken, comment);
const idempotencyKey = `${approvalRequestId}:${action}:${clientToken}`;
const [existing] = await this.db
.select({ id: approvalDecisions.id })
.from(approvalDecisions)
.where(eq(approvalDecisions.idempotencyKey, idempotencyKey))
.limit(1);
if (existing !== undefined) {
return;
}
await this.db.insert(approvalDecisions).values({
approvalRequestId,
action,
comment,
idempotencyKey,
});
}
}
describe("HTTP API", () => {
let client: DbClient | undefined;
const runIds: string[] = [];
const templateIds: string[] = [];
const personaIds: string[] = [];
const tempRoots: string[] = [];
afterEach(async () => {
if (client !== undefined) {
if (runIds.length > 0) {
const requests = await client.db
.select({ id: approvalRequests.id })
.from(approvalRequests)
.where(inArray(approvalRequests.runId, [...runIds]));
if (requests.length > 0) {
await client.db.delete(approvalDecisions).where(
inArray(
approvalDecisions.approvalRequestId,
requests.map((request) => request.id),
),
);
}
await client.db
.delete(approvalRequests)
.where(inArray(approvalRequests.runId, [...runIds]));
await client.db.delete(runs).where(inArray(runs.id, [...runIds]));
}
if (personaIds.length > 0) {
await client.db.delete(agentPersonas).where(inArray(agentPersonas.id, [...personaIds]));
}
if (templateIds.length > 0) {
await client.db
.delete(workflowTemplates)
.where(inArray(workflowTemplates.id, [...templateIds]));
}
await client.close();
client = undefined;
}
for (const root of tempRoots.splice(0)) {
rmSync(root, { recursive: true, force: true });
}
runIds.length = 0;
templateIds.length = 0;
personaIds.length = 0;
});
it("exposes templates, personas, run creation, and approval signaling routes", async () => {
client = createDbClient(databaseUrl);
const templateId = await insertTemplate(client, templateIds);
const personaId = await insertPersona(client, personaIds);
const engine = new RecordingEngine(client.db, "00000000-0000-4000-8000-000000000701");
const approvalRunId = "00000000-0000-4000-8000-000000000703";
const approvalRequestId = "00000000-0000-4000-8000-000000000704";
const app = await createHttpApi({ db: client.db, engine });
try {
const templates = await app.inject({ method: "GET", url: "/api/templates" });
expect(templates.statusCode).toBe(200);
expect((templates.json() as { templates: unknown[] }).templates).toEqual(
expect.arrayContaining([
expect.objectContaining({ id: templateId, name: "http-template", version: 1 }),
]),
);
const personas = await app.inject({ method: "GET", url: "/api/personas" });
expect(personas.statusCode).toBe(200);
expect((personas.json() as { personas: unknown[] }).personas).toEqual(
expect.arrayContaining([
expect.objectContaining({ id: personaId, name: "http-persona", version: 1 }),
]),
);
const created = await app.inject({
method: "POST",
url: "/api/runs",
payload: {
baseBranch: "main",
repoPath: "/tmp/repo",
requirementsMd: "Build the thing.",
templateName: "development",
templateVersion: 1,
},
});
expect(created.statusCode).toBe(201);
expect(created.json()).toEqual({ runId: "00000000-0000-4000-8000-000000000701" });
expect(engine.startedRuns).toMatchObject([
{
baseBranch: "main",
repoPath: "/tmp/repo",
requirementsMd: "Build the thing.",
templateName: "development",
templateVersion: 1,
},
]);
const approval = await app.inject({
method: "POST",
url: `/api/runs/${approvalRunId}/approvals/${approvalRequestId}`,
payload: {
action: "approve",
clientToken: "00000000-0000-4000-8000-000000000702",
comment: "ship",
},
});
expect(approval.statusCode).toBe(201);
expect(engine.approvalSignals).toEqual([
{
action: "approve",
approvalRequestId,
clientToken: "00000000-0000-4000-8000-000000000702",
comment: "ship",
runId: approvalRunId,
},
]);
const missingToken = await app.inject({
method: "POST",
url: `/api/runs/${approvalRunId}/approvals/${approvalRequestId}`,
payload: { action: "approve" },
});
expect(missingToken.statusCode).toBe(400);
} finally {
await app.close();
}
});
it("streams run events over native SSE with run-event replay", async () => {
client = createDbClient(databaseUrl);
const templateId = await insertTemplate(client, templateIds);
const runId = randomUUID();
runIds.push(runId);
const root = realpathSync(mkdtempSync(join(tmpdir(), "devflow-http-sse-")));
tempRoots.push(root);
await client.db.insert(runs).values({
id: runId,
templateId,
templateHash: "a".repeat(64),
state: "created",
repoPath: root,
baseBranch: "main",
worktreeRoot: root,
});
await client.db.insert(runInputs).values({
runId,
requirementsMd: "SSE replay",
inputHash: "b".repeat(64),
});
await new RunEventRepository(client.db).append({
runId,
type: "run.created",
payload: { runId },
idempotencyKey: `run.created:${runId}`,
});
const app = await createHttpApi({
db: client.db,
engine: new RecordingEngine(client.db),
heartbeatMs: 1000,
pollMs: 10,
});
await app.listen({ host: "127.0.0.1", port: 0 });
const address = app.server.address();
if (address === null || typeof address === "string") {
throw new Error("HTTP server did not expose a TCP address");
}
try {
const body = await readSseUntil(
`http://127.0.0.1:${address.port}/sse/runs/${runId}`,
"run.event_appended",
);
expect(body).toContain("event: run.event_appended");
expect(body).toContain('"type":"run.created"');
} finally {
await app.close();
}
});
it("streams only global-scope derived events on fresh global SSE connect", async () => {
client = createDbClient(databaseUrl);
const templateId = await insertTemplate(client, templateIds);
const runId = randomUUID();
runIds.push(runId);
const root = realpathSync(mkdtempSync(join(tmpdir(), "devflow-http-global-sse-")));
tempRoots.push(root);
await client.db.insert(runs).values({
id: runId,
templateId,
templateHash: "1".repeat(64),
state: "created",
repoPath: root,
baseBranch: "main",
worktreeRoot: root,
});
await client.db.insert(runInputs).values({
runId,
requirementsMd: "Global SSE",
inputHash: "2".repeat(64),
});
const events = new RunEventRepository(client.db);
await events.append({
runId,
type: "run.created",
payload: { runId },
idempotencyKey: `run.created:${runId}`,
});
const app = await createHttpApi({
db: client.db,
engine: new RecordingEngine(client.db),
heartbeatMs: 1000,
pollMs: 10,
});
await app.listen({ host: "127.0.0.1", port: 0 });
const address = app.server.address();
if (address === null || typeof address === "string") {
throw new Error("HTTP server did not expose a TCP address");
}
try {
const body = await readSseUntil(
`http://127.0.0.1:${address.port}/sse/global`,
'"next":"bound"',
async () => {
await events.append({
runId,
type: "session.ready",
payload: {
sessionId: "00000000-0000-4000-8000-000000000705",
roleId: "builder",
recoveryAttempts: 0,
},
idempotencyKey: "session.ready:00000000-0000-4000-8000-000000000705:0",
});
await events.append({
runId,
type: "run.started",
payload: { templateHash: "1".repeat(64) },
idempotencyKey: `run.started:${runId}`,
});
},
);
expect(body).toContain("id:");
expect(body).toContain('"next":"bound"');
expect(body).not.toContain('"next":"created"');
expect(body).not.toContain("event: session.state_changed");
} finally {
await app.close();
}
});
it("replays missed global SSE events from Last-Event-ID", async () => {
client = createDbClient(databaseUrl);
const templateId = await insertTemplate(client, templateIds);
const runId = randomUUID();
runIds.push(runId);
const root = realpathSync(mkdtempSync(join(tmpdir(), "devflow-http-global-replay-")));
tempRoots.push(root);
await client.db.insert(runs).values({
id: runId,
templateId,
templateHash: "3".repeat(64),
state: "created",
repoPath: root,
baseBranch: "main",
worktreeRoot: root,
});
await client.db.insert(runInputs).values({
runId,
requirementsMd: "Global SSE replay",
inputHash: "4".repeat(64),
});
const events = new RunEventRepository(client.db);
const created = await events.append({
runId,
type: "run.created",
payload: { runId },
idempotencyKey: `run.created:${runId}`,
});
await events.append({
runId,
type: "run.started",
payload: { templateHash: "3".repeat(64) },
idempotencyKey: `run.started:${runId}`,
});
const app = await createHttpApi({
db: client.db,
engine: new RecordingEngine(client.db),
heartbeatMs: 1000,
pollMs: 10,
});
await app.listen({ host: "127.0.0.1", port: 0 });
const address = app.server.address();
if (address === null || typeof address === "string") {
throw new Error("HTTP server did not expose a TCP address");
}
try {
const body = await readSseUntil(
`http://127.0.0.1:${address.port}/sse/global`,
'"next":"bound"',
undefined,
{ "Last-Event-ID": created.id.toString() },
);
expect(body).toContain('"next":"bound"');
expect(body).not.toContain('"next":"created"');
expect(body).not.toContain("event: run.event_appended");
} finally {
await app.close();
}
});
it("returns 200 for approval decision replay", async () => {
client = createDbClient(databaseUrl);
const templateId = await insertTemplate(client, templateIds);
const runId = randomUUID();
const approvalRequestId = randomUUID();
const clientToken = randomUUID();
runIds.push(runId);
const root = realpathSync(mkdtempSync(join(tmpdir(), "devflow-http-approval-")));
tempRoots.push(root);
await client.db.insert(runs).values({
id: runId,
templateId,
templateHash: "e".repeat(64),
state: "awaiting_approval",
repoPath: root,
baseBranch: "main",
worktreeRoot: root,
});
await client.db.insert(runInputs).values({
runId,
requirementsMd: "Approval replay",
inputHash: "f".repeat(64),
});
await client.db.insert(approvalRequests).values({
id: approvalRequestId,
runId,
gateKey: "spec_approved",
state: "approved",
idempotencyKey: `${runId}:spec_approved::1`,
payload: { replay: true },
});
await client.db.insert(approvalDecisions).values({
approvalRequestId,
action: "approve",
idempotencyKey: `${approvalRequestId}:approve:${clientToken}`,
});
const engine = new RecordingEngine(client.db);
const app = await createHttpApi({ db: client.db, engine });
try {
const response = await app.inject({
method: "POST",
url: `/api/runs/${runId}/approvals/${approvalRequestId}`,
payload: { action: "approve", clientToken },
});
expect(response.statusCode).toBe(200);
expect(engine.approvalSignals).toMatchObject([{ action: "approve", approvalRequestId }]);
} finally {
await app.close();
}
});
it("serializes same-token approval responses so exactly one request reports created", async () => {
client = createDbClient(databaseUrl);
const templateId = await insertTemplate(client, templateIds);
const runId = randomUUID();
const approvalRequestId = randomUUID();
const clientToken = randomUUID();
runIds.push(runId);
const root = realpathSync(mkdtempSync(join(tmpdir(), "devflow-http-approval-race-")));
tempRoots.push(root);
await client.db.insert(runs).values({
id: runId,
templateId,
templateHash: "7".repeat(64),
state: "awaiting_approval",
repoPath: root,
baseBranch: "main",
worktreeRoot: root,
});
await client.db.insert(runInputs).values({
runId,
requirementsMd: "Approval race",
inputHash: "8".repeat(64),
});
await client.db.insert(approvalRequests).values({
id: approvalRequestId,
runId,
gateKey: "spec_approved",
state: "pending",
idempotencyKey: `${runId}:spec_approved::1`,
payload: { replay: false },
});
const engine = new DecisionRecordingEngine(client.db);
const app = await createHttpApi({ db: client.db, engine });
const secondApp = await createHttpApi({ db: client.db, engine });
try {
const responses = await Promise.all([
app.inject({
method: "POST",
url: `/api/runs/${runId}/approvals/${approvalRequestId}`,
payload: { action: "approve", clientToken },
}),
secondApp.inject({
method: "POST",
url: `/api/runs/${runId}/approvals/${approvalRequestId}`,
payload: { action: "approve", clientToken },
}),
]);
expect(responses.map((response) => response.statusCode).sort()).toEqual([200, 201]);
expect(
responses.map((response) => (response.json() as { decision: { id: string } }).decision.id),
).toEqual([expect.any(String), expect.any(String)]);
const decisions = await client.db
.select({ id: approvalDecisions.id })
.from(approvalDecisions)
.where(
and(
eq(approvalDecisions.approvalRequestId, approvalRequestId),
eq(approvalDecisions.action, "approve"),
),
);
expect(decisions).toHaveLength(1);
} finally {
await app.close();
await secondApp.close();
}
});
it("drains long run SSE replay gaps without derived historical events", async () => {
client = createDbClient(databaseUrl);
const templateId = await insertTemplate(client, templateIds);
const runId = randomUUID();
runIds.push(runId);
const root = realpathSync(mkdtempSync(join(tmpdir(), "devflow-http-long-replay-")));
tempRoots.push(root);
await client.db.insert(runs).values({
id: runId,
templateId,
templateHash: "9".repeat(64),
state: "executing",
repoPath: root,
baseBranch: "main",
worktreeRoot: root,
});
await client.db.insert(runInputs).values({
runId,
requirementsMd: "Long SSE replay",
inputHash: "0".repeat(64),
});
const events = new RunEventRepository(client.db);
for (let index = 0; index < 100; index += 1) {
const commandId = randomUUID();
await events.append({
runId,
type: "command.started",
payload: { commandId },
idempotencyKey: `command.started:${commandId}`,
});
}
await events.append({
runId,
type: "run.started",
payload: { templateHash: "9".repeat(64) },
idempotencyKey: `run.started:${runId}`,
});
const app = await createHttpApi({
db: client.db,
engine: new RecordingEngine(client.db),
heartbeatMs: 1000,
pollMs: 10,
});
await app.listen({ host: "127.0.0.1", port: 0 });
const address = app.server.address();
if (address === null || typeof address === "string") {
throw new Error("HTTP server did not expose a TCP address");
}
try {
const body = await readSseUntil(
`http://127.0.0.1:${address.port}/sse/runs/${runId}`,
'"type":"run.started"',
);
expect(body).toContain("event: run.event_appended");
expect(body).not.toContain("event: run.state_changed");
} finally {
await app.close();
}
});
it("assigns the global SSE cursor only after all messages derived from one row", async () => {
client = createDbClient(databaseUrl);
const templateId = await insertTemplate(client, templateIds);
const runId = randomUUID();
const approvalRequestId = randomUUID();
runIds.push(runId);
const root = realpathSync(mkdtempSync(join(tmpdir(), "devflow-http-global-cursor-")));
tempRoots.push(root);
await client.db.insert(runs).values({
id: runId,
templateId,
templateHash: "5".repeat(64),
state: "awaiting_approval",
repoPath: root,
baseBranch: "main",
worktreeRoot: root,
});
await client.db.insert(runInputs).values({
runId,
requirementsMd: "Global SSE cursor",
inputHash: "6".repeat(64),
});
const events = new RunEventRepository(client.db);
const app = await createHttpApi({
db: client.db,
engine: new RecordingEngine(client.db),
heartbeatMs: 1000,
pollMs: 10,
});
await app.listen({ host: "127.0.0.1", port: 0 });
const address = app.server.address();
if (address === null || typeof address === "string") {
throw new Error("HTTP server did not expose a TCP address");
}
try {
const body = await readSseUntil(
`http://127.0.0.1:${address.port}/sse/global`,
'"next":"awaiting_approval"',
async () => {
await events.append({
runId,
type: "approval.requested",
payload: {
approvalRequestId,
approvalIdempotencyKey: `${runId}:spec_approved::1`,
gateKey: "spec_approved",
runState: "awaiting_approval",
},
idempotencyKey: `approval.requested:${runId}:spec_approved::1`,
});
},
);
expect(body).toContain("event: approval.created");
expect(body).toContain("event: run.state_changed");
expect(body).toMatch(/event: approval\.created\ndata:/);
expect(body).toMatch(/id: \d+\nevent: run\.state_changed/);
} finally {
await app.close();
}
});
it("exposes TUI sessions for the run detail view", async () => {
client = createDbClient(databaseUrl);
const templateId = await insertTemplate(client, templateIds);
const runId = randomUUID();
runIds.push(runId);
const root = realpathSync(mkdtempSync(join(tmpdir(), "devflow-http-sessions-")));
tempRoots.push(root);
await client.db.insert(runs).values({
id: runId,
templateId,
templateHash: "c".repeat(64),
state: "executing",
repoPath: root,
baseBranch: "main",
worktreeRoot: root,
});
await client.db.insert(runInputs).values({
runId,
requirementsMd: "Session list",
inputHash: "d".repeat(64),
});
await client.db.insert(tuiSessions).values({
runId,
roleId: "builder",
backend: "fake",
cwd: root,
expectedArtifactPath: join(root, "artifact.json"),
expectedSchema: "dev/spec@1",
state: "READY",
});
const app = await createHttpApi({ db: client.db, engine: new RecordingEngine(client.db) });
try {
const response = await app.inject({ method: "GET", url: `/api/runs/${runId}/sessions` });
expect(response.statusCode).toBe(200);
expect(response.json()).toMatchObject({
sessions: [
{
backend: "fake",
cwd: root,
expectedArtifactPath: join(root, "artifact.json"),
expectedSchema: "dev/spec@1",
recoveryAttempts: 0,
roleId: "builder",
state: "READY",
},
],
});
} finally {
await app.close();
}
});
});
describe("SSE formatting", () => {
it("formats run event messages and derives contract events", () => {
const messages = runEventMessages(
{
id: 1n,
payload: {
approvalRequestId: "00000000-0000-4000-8000-000000000703",
gateKey: "spec_approved",
},
phaseId: null,
runId: "00000000-0000-4000-8000-000000000704",
seq: 7n,
ts: new Date("2026-05-13T00:00:00.000Z"),
type: "approval.requested",
},
{ deriveStateEvents: true },
);
const appended = messages[0];
const derived = messages[1];
if (appended === undefined || derived === undefined) {
throw new Error("Expected appended and derived SSE messages");
}
expect(formatSseMessage(appended)).toContain("id: 7\nevent: run.event_appended");
expect(derived).toEqual({
data: {
approvalId: "00000000-0000-4000-8000-000000000703",
gateKey: "spec_approved",
runId: "00000000-0000-4000-8000-000000000704",
},
event: "approval.created",
id: "7",
});
});
it("derives state change events with locked state values", () => {
expect(
runEventMessages(
{
id: 2n,
payload: { templateHash: "a".repeat(64) },
phaseId: null,
runId: "00000000-0000-4000-8000-000000000704",
seq: 8n,
ts: new Date("2026-05-13T00:00:00.000Z"),
type: "run.started",
},
{ deriveStateEvents: true },
)[1],
).toMatchObject({ data: { next: "bound" }, event: "run.state_changed" });
expect(
runEventMessages(
{
id: 3n,
payload: { attempt: 1, phaseKey: "implement" },
phaseId: "00000000-0000-4000-8000-000000000705",
runId: "00000000-0000-4000-8000-000000000704",
seq: 9n,
ts: new Date("2026-05-13T00:00:00.000Z"),
type: "phase.started",
},
{ deriveStateEvents: true },
)[1],
).toMatchObject({ data: { next: "running" }, event: "phase.state_changed" });
expect(
runEventMessages(
{
id: 4n,
payload: {
recoveryAttempts: 0,
roleId: "builder",
sessionId: "00000000-0000-4000-8000-000000000706",
},
phaseId: null,
runId: "00000000-0000-4000-8000-000000000704",
seq: 10n,
ts: new Date("2026-05-13T00:00:00.000Z"),
type: "session.ready",
},
{ deriveStateEvents: true },
)[1],
).toMatchObject({ data: { next: "READY" }, event: "session.state_changed" });
expect(
runEventMessages(
{
id: 7n,
payload: { attempt: 1, phaseKey: "implement", runState: "executing" },
phaseId: "00000000-0000-4000-8000-000000000705",
runId: "00000000-0000-4000-8000-000000000704",
seq: 13n,
ts: new Date("2026-05-13T00:00:00.000Z"),
type: "phase.started",
},
{ deriveStateEvents: true },
).map((message) => message.event),
).toEqual(["run.event_appended", "run.state_changed", "phase.state_changed"]);
expect(
runEventMessages(
{
id: 5n,
payload: {
attempt: 1,
path: "/tmp/spec.json",
phaseKey: "spec",
schemaId: "dev/spec@1",
},
phaseId: "00000000-0000-4000-8000-000000000705",
runId: "00000000-0000-4000-8000-000000000704",
seq: 11n,
ts: new Date("2026-05-13T00:00:00.000Z"),
type: "artifact.expected",
},
{ deriveStateEvents: true },
)[1],
).toMatchObject({ data: { next: "awaiting_artifact" }, event: "phase.state_changed" });
expect(
runEventMessages(
{
id: 6n,
payload: {
approvalRequestId: "00000000-0000-4000-8000-000000000707",
approvalIdempotencyKey: "approval-key",
gateKey: "spec_approved",
phaseKey: "spec",
phaseState: "awaiting_approval",
roleId: "builder",
runState: "awaiting_approval",
sessionId: "00000000-0000-4000-8000-000000000706",
sessionState: "WAITING_FOR_APPROVAL",
},
phaseId: "00000000-0000-4000-8000-000000000705",
runId: "00000000-0000-4000-8000-000000000704",
seq: 12n,
ts: new Date("2026-05-13T00:00:00.000Z"),
type: "approval.requested",
},
{ deriveStateEvents: true },
).map((message) => message.event),
).toEqual([
"run.event_appended",
"approval.created",
"run.state_changed",
"phase.state_changed",
"session.state_changed",
]);
});
});
async function insertTemplate(client: DbClient, ids: string[]): Promise<string> {
const id = randomUUID();
ids.push(id);
await client.db.insert(workflowTemplates).values({
id,
name: "http-template",
version: 1,
hash: randomHash(),
definition: { name: "http-template", version: 1, roles: [], phases: [] },
});
return id;
}
async function insertPersona(client: DbClient, ids: string[]): Promise<string> {
const id = randomUUID();
ids.push(id);
await client.db.insert(agentPersonas).values({
id,
name: "http-persona",
version: 1,
hash: randomHash(),
definition: {
backend: "fake",
capabilities: [],
maxRiskLevel: "low",
modelConfig: {},
name: "http-persona",
promptConfig: {},
version: 1,
},
});
return id;
}
function randomHash(): string {
return randomUUID().replaceAll("-", "").padEnd(64, "0").slice(0, 64);
}
async function readSseUntil(
url: string,
marker: string,
onConnected?: () => Promise<void>,
headers?: Record<string, string>,
): Promise<string> {
return new Promise((resolve, reject) => {
let settled = false;
let connected = false;
let body = "";
const request = get(url, { headers }, (response) => {
response.setEncoding("utf8");
response.on("data", (chunk: string) => {
body += chunk;
if (body.includes(": connected") && !connected) {
connected = true;
onConnected?.().catch((error: unknown) => {
if (!settled) {
settled = true;
request.destroy();
reject(error);
}
});
}
if (body.includes(marker) && !settled) {
settled = true;
request.destroy();
resolve(body);
}
});
});
request.on("error", (error) => {
if (!settled) {
reject(error);
}
});
setTimeout(() => {
if (!settled) {
settled = true;
request.destroy();
reject(new Error(`Timed out waiting for ${marker}`));
}
}, 2000);
});
}

615
apps/api/src/http.ts Normal file
View File

@@ -0,0 +1,615 @@
import { ApprovalDecisionAction, DevflowError, getConfig } from "@devflow/core";
import type { DbClient } from "@devflow/db";
import {
agentPersonas,
approvalDecisions,
runs,
tuiSessions,
tuiTranscriptChunks,
workflowTemplates,
} from "@devflow/db";
import { type RunEngine, type RunStartInput, readRunStatus } from "@devflow/run-engine";
import sensible from "@fastify/sensible";
import { and, desc, eq, sql } from "drizzle-orm";
import Fastify, { type FastifyInstance, type FastifyReply } from "fastify";
import {
formatSseComment,
formatSseMessage,
latestGlobalRunEventId,
latestRunEventSeq,
latestTranscriptChunkId,
openSseResponse,
readGlobalRunEventRows,
readRunEventRows,
readTranscriptChunkRows,
runEventMessages,
transcriptChunkMessage,
} from "./sse.js";
type Database = DbClient["db"];
export interface HttpApiOptions {
db: Database;
engine: RunEngine;
heartbeatMs?: number;
pollMs?: number;
}
export async function createHttpApi(options: HttpApiOptions): Promise<FastifyInstance> {
const app = Fastify({ logger: false });
await app.register(sensible);
app.setErrorHandler((error, _request, reply) => {
const normalizedError = error instanceof Error ? error : new Error(String(error));
const statusCode = httpStatusForError(normalizedError);
reply.code(statusCode).send({
error: {
class: normalizedError instanceof DevflowError ? normalizedError.class : "fatal",
code: normalizedError instanceof DevflowError ? normalizedError.code : "internal_error",
message: normalizedError.message,
recoveryHint:
normalizedError instanceof DevflowError ? normalizedError.recoveryHint : undefined,
},
});
});
app.addHook("onRequest", async (_request, reply) => {
reply.header("access-control-allow-origin", "*");
reply.header("access-control-allow-methods", "GET,POST,OPTIONS");
reply.header("access-control-allow-headers", "content-type,last-event-id");
});
app.options("/*", async (_request, reply) => reply.code(204).send());
app.get("/health", async () => ({ ok: true }));
app.get("/api/health", async () => ({ ok: true }));
app.get("/api/runs", async () => ({
runs: await listRuns(options.db),
}));
app.post("/api/runs", async (request, reply) => {
const input = parseRunStartBody(request.body);
const result = await options.engine.startRun(input);
reply.code(201).send(result);
});
app.get<{ Params: { runId: string } }>("/api/runs/:runId", async (request) =>
options.engine.getStatus(request.params.runId),
);
app.get<{ Params: { runId: string } }>("/api/runs/:runId/transcript", async (request) => ({
chunks: await listTranscriptChunks(options.db, request.params.runId),
}));
app.get<{ Params: { runId: string } }>("/api/runs/:runId/sessions", async (request) => ({
sessions: await listSessions(options.db, request.params.runId),
}));
app.post<{
Body: unknown;
Params: { approvalRequestId: string; runId: string };
}>("/api/runs/:runId/approvals/:approvalRequestId", async (request, reply) => {
const body = asRecord(request.body);
const parsedAction = ApprovalDecisionAction.safeParse(body.action);
if (!parsedAction.success) {
throw badRequest("Invalid approval action", "invalid_approval_action");
}
if (typeof body.clientToken !== "string" || body.clientToken.length === 0) {
throw badRequest("clientToken is required", "invalid_client_token");
}
const clientToken = body.clientToken;
const comment = typeof body.comment === "string" ? body.comment : undefined;
const result = await withApprovalRouteLock(
options.db,
`${request.params.approvalRequestId}:${clientToken}`,
async () => {
const replay = await approvalDecisionExists(
options.db,
request.params.approvalRequestId,
parsedAction.data,
clientToken,
);
await options.engine.signalApproval(
request.params.runId,
request.params.approvalRequestId,
parsedAction.data,
clientToken,
comment,
);
const decision = await readApprovalDecision(
options.db,
request.params.approvalRequestId,
parsedAction.data,
clientToken,
);
return { decision, replay };
},
);
reply.code(result.replay ? 200 : 201).send({
ok: true,
clientToken,
decision: result.decision ?? null,
});
});
app.post<{ Params: { runId: string } }>("/api/runs/:runId/pause", async (request) => {
await options.engine.pauseRun(request.params.runId);
return { ok: true };
});
app.post<{ Params: { runId: string } }>("/api/runs/:runId/resume", async (request) => {
await options.engine.resumeRun(request.params.runId);
return { ok: true };
});
app.post<{ Body: unknown; Params: { runId: string } }>(
"/api/runs/:runId/abort",
async (request) => {
const body = asRecord(request.body);
await options.engine.abortRun(
request.params.runId,
typeof body.reason === "string" ? body.reason : "api_abort",
);
return { ok: true };
},
);
app.get("/api/templates", async () => ({
templates: await listTemplates(options.db),
}));
app.get("/api/personas", async () => ({
personas: await listPersonas(options.db),
}));
app.get("/api/doctor", async () => ({
checks: doctorChecks(),
}));
app.get("/api/backends", async () => ({
backends: getConfig().backends,
}));
app.get<{ Params: { runId: string } }>("/sse/runs/:runId", async (request, reply) => {
await readRunStatus(options.db, request.params.runId);
reply.hijack();
openSseResponse(reply.raw);
const heartbeatMs = options.heartbeatMs ?? 15_000;
const pollMs = options.pollMs ?? 500;
let cursor = parseLastEventId(request.headers["last-event-id"]);
let transcriptCursor = await latestTranscriptChunkId(options.db, request.params.runId);
let closed = false;
let polling = false;
const drainRows = async (deriveStateEvents: boolean, throughSeq?: bigint) => {
while (!closed) {
const rows = await readRunEventRows(options.db, request.params.runId, cursor, throughSeq);
if (rows.length === 0) {
return;
}
for (const row of rows) {
for (const message of runEventMessages(row, { deriveStateEvents })) {
reply.raw.write(formatSseMessage(message));
}
cursor = row.seq;
}
if (throughSeq !== undefined && cursor >= throughSeq) {
return;
}
}
};
const replayThroughSeq = await latestRunEventSeq(options.db, request.params.runId);
await drainRows(false, replayThroughSeq);
const pollOnce = async () => {
await drainRows(true);
const chunks = await readTranscriptChunkRows(
options.db,
request.params.runId,
transcriptCursor,
);
for (const chunk of chunks) {
reply.raw.write(formatSseMessage(transcriptChunkMessage(chunk)));
transcriptCursor = chunk.id;
}
};
const poll = setInterval(() => {
if (polling) {
return;
}
polling = true;
void pollOnce()
.catch(() => closeSse(reply))
.finally(() => {
polling = false;
});
}, pollMs);
const heartbeat = setInterval(() => {
reply.raw.write(formatSseComment("heartbeat"));
}, heartbeatMs);
request.raw.on("close", () => {
closed = true;
clearInterval(poll);
clearInterval(heartbeat);
});
});
app.get("/sse/global", async (request, reply) => {
reply.hijack();
openSseResponse(reply.raw);
const heartbeatMs = options.heartbeatMs ?? 15_000;
const pollMs = options.pollMs ?? 500;
let cursor =
parseOptionalLastEventId(request.headers["last-event-id"]) ??
(await latestGlobalRunEventId(options.db));
let polling = false;
const writeRows = async () => {
const rows = await readGlobalRunEventRows(options.db, cursor);
for (const row of rows) {
const messages = runEventMessages(row, { deriveStateEvents: true }).filter((message) =>
isGlobalSseEvent(message.event),
);
messages.forEach((message, index) => {
if (index === messages.length - 1) {
reply.raw.write(
formatSseMessage({
event: message.event,
data: message.data,
id: row.id.toString(),
}),
);
} else {
reply.raw.write(formatSseMessage({ event: message.event, data: message.data }));
}
});
cursor = row.id;
}
};
const poll = setInterval(() => {
if (polling) {
return;
}
polling = true;
void writeRows()
.catch(() => closeSse(reply))
.finally(() => {
polling = false;
});
}, pollMs);
const heartbeat = setInterval(() => {
reply.raw.write(formatSseComment("heartbeat"));
}, heartbeatMs);
request.raw.on("close", () => {
clearInterval(poll);
clearInterval(heartbeat);
});
});
return app;
}
async function listRuns(db: Database) {
const rows = await db
.select({
id: runs.id,
state: runs.state,
repoPath: runs.repoPath,
baseBranch: runs.baseBranch,
worktreeRoot: runs.worktreeRoot,
currentPhaseId: runs.currentPhaseId,
finalReportPath: runs.finalReportPath,
startedAt: runs.startedAt,
endedAt: runs.endedAt,
createdAt: runs.createdAt,
updatedAt: runs.updatedAt,
})
.from(runs)
.orderBy(desc(runs.createdAt))
.limit(100);
return rows.map((row) => ({
...row,
createdAt: row.createdAt.toISOString(),
endedAt: row.endedAt?.toISOString() ?? null,
startedAt: row.startedAt?.toISOString() ?? null,
updatedAt: row.updatedAt?.toISOString() ?? null,
}));
}
async function listTemplates(db: Database) {
return db
.select({
id: workflowTemplates.id,
name: workflowTemplates.name,
version: workflowTemplates.version,
hash: workflowTemplates.hash,
definition: workflowTemplates.definition,
createdAt: workflowTemplates.createdAt,
})
.from(workflowTemplates)
.orderBy(desc(workflowTemplates.createdAt));
}
async function listPersonas(db: Database) {
return db
.select({
id: agentPersonas.id,
name: agentPersonas.name,
version: agentPersonas.version,
hash: agentPersonas.hash,
definition: agentPersonas.definition,
createdAt: agentPersonas.createdAt,
})
.from(agentPersonas)
.orderBy(desc(agentPersonas.createdAt));
}
async function listTranscriptChunks(db: Database, runId: string) {
const rows = await db
.select({
sessionId: tuiTranscriptChunks.sessionId,
roleId: tuiSessions.roleId,
seq: tuiTranscriptChunks.seq,
content: tuiTranscriptChunks.content,
capturedAt: tuiTranscriptChunks.capturedAt,
})
.from(tuiTranscriptChunks)
.innerJoin(tuiSessions, eq(tuiTranscriptChunks.sessionId, tuiSessions.id))
.where(eq(tuiSessions.runId, runId))
.orderBy(desc(tuiTranscriptChunks.id))
.limit(200);
return rows.reverse().map((row) => ({
...row,
capturedAt: row.capturedAt.toISOString(),
seq: row.seq.toString(),
}));
}
async function listSessions(db: Database, runId: string) {
const rows = await db
.select({
id: tuiSessions.id,
roleId: tuiSessions.roleId,
backend: tuiSessions.backend,
cwd: tuiSessions.cwd,
expectedArtifactPath: tuiSessions.expectedArtifactPath,
expectedSchema: tuiSessions.expectedSchema,
lastPromptAt: tuiSessions.lastPromptAt,
recoveryAttempts: tuiSessions.recoveryAttempts,
state: tuiSessions.state,
tmuxSession: tuiSessions.tmuxSession,
tmuxWindow: tuiSessions.tmuxWindow,
})
.from(tuiSessions)
.where(eq(tuiSessions.runId, runId))
.orderBy(desc(tuiSessions.createdAt));
return rows.map((row) => ({
...row,
lastPromptAt: row.lastPromptAt?.toISOString() ?? null,
}));
}
async function approvalDecisionExists(
db: Database,
approvalRequestId: string,
action: string,
clientToken: string,
): Promise<boolean> {
const [row] = await db
.select({ id: approvalDecisions.id })
.from(approvalDecisions)
.where(
and(
eq(approvalDecisions.approvalRequestId, approvalRequestId),
eq(approvalDecisions.action, action),
eq(approvalDecisions.idempotencyKey, `${approvalRequestId}:${action}:${clientToken}`),
),
)
.limit(1);
return row !== undefined;
}
async function readApprovalDecision(
db: Database,
approvalRequestId: string,
action: string,
clientToken: string,
) {
const [row] = await db
.select({
id: approvalDecisions.id,
approvalRequestId: approvalDecisions.approvalRequestId,
action: approvalDecisions.action,
comment: approvalDecisions.comment,
decidedAt: approvalDecisions.decidedAt,
idempotencyKey: approvalDecisions.idempotencyKey,
})
.from(approvalDecisions)
.where(
and(
eq(approvalDecisions.approvalRequestId, approvalRequestId),
eq(approvalDecisions.action, action),
eq(approvalDecisions.idempotencyKey, `${approvalRequestId}:${action}:${clientToken}`),
),
)
.limit(1);
if (row === undefined) {
return undefined;
}
return {
...row,
decidedAt: row.decidedAt.toISOString(),
};
}
async function withApprovalRouteLock<T>(
db: Database,
key: string,
operation: () => Promise<T>,
): Promise<T> {
return db.transaction(async (tx) => {
await tx.execute(
sql`SELECT pg_advisory_xact_lock(hashtextextended(${`devflow:approval-route:${key}`}, 0))`,
);
return operation();
});
}
function doctorChecks() {
const config = getConfig();
return [
{
name: "config",
status: "pass",
detail: "Config loaded and validated",
remediation: "",
},
...config.backends.map((backend) => {
if (backend.id === "fake") {
return {
name: "backend.fake",
status: "pass",
detail: "Fake backend is always available",
remediation: "",
};
}
const resolved = backend.binaryPath !== undefined;
return {
name: `backend.${backend.id}`,
status: resolved ? "pass" : "warn",
detail: resolved ? backend.binaryPath : `${backend.id} binary did not resolve at startup`,
remediation: `Install ${backend.id} or disable it in DEVFLOW_BACKENDS_JSON.`,
};
}),
];
}
function parseRunStartBody(body: unknown): RunStartInput {
const record = asRecord(body);
if (typeof record.requirementsMd !== "string" || record.requirementsMd.length === 0) {
throw badRequest("requirementsMd is required", "invalid_run_start_input");
}
if (typeof record.repoPath !== "string" || record.repoPath.length === 0) {
throw badRequest("repoPath is required", "invalid_run_start_input");
}
if (typeof record.baseBranch !== "string" || record.baseBranch.length === 0) {
throw badRequest("baseBranch is required", "invalid_run_start_input");
}
const input: RunStartInput = {
baseBranch: record.baseBranch,
repoPath: record.repoPath,
requirementsMd: record.requirementsMd,
};
if (typeof record.runId === "string" && record.runId.length > 0) {
input.runId = record.runId;
}
if (typeof record.templateName === "string" && record.templateName.length > 0) {
input.templateName = record.templateName;
}
const templateVersion = optionalNumber(record.templateVersion);
if (templateVersion !== undefined) {
input.templateVersion = templateVersion;
}
if (typeof record.worktreeRoot === "string" && record.worktreeRoot.length > 0) {
input.worktreeRoot = record.worktreeRoot;
}
if (record.objective !== undefined) {
input.objective = record.objective;
}
if (isRecord(record.extra)) {
input.extra = record.extra;
}
if (isRecord(record.overrides)) {
input.overrides = record.overrides as NonNullable<RunStartInput["overrides"]>;
}
if (isRecord(record.scenarios)) {
input.scenarios = record.scenarios as NonNullable<RunStartInput["scenarios"]>;
}
return input;
}
function optionalNumber(value: unknown): number | undefined {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
if (typeof value === "string" && value.length > 0) {
const parsed = Number(value);
if (Number.isFinite(parsed)) {
return parsed;
}
}
return undefined;
}
function asRecord(value: unknown): Record<string, unknown> {
if (!isRecord(value)) {
throw badRequest("Request body must be a JSON object", "invalid_request_body");
}
return value;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return value !== null && typeof value === "object" && !Array.isArray(value);
}
function badRequest(message: string, code: string) {
return new DevflowError(message, {
class: "human_required",
code,
recoveryHint: message,
});
}
function httpStatusForError(error: Error): number {
if (!(error instanceof DevflowError)) {
return 500;
}
if (error.code.endsWith("_not_found")) {
return 404;
}
if (
error.code === "active_run_exists" ||
error.code === "approval_conflict" ||
error.code === "invalid_client_token" ||
error.code === "invalid_request_body" ||
error.code === "invalid_run_start_input" ||
error.code === "invalid_approval_action"
) {
return error.code === "approval_conflict" || error.code === "active_run_exists" ? 409 : 400;
}
return error.class === "fatal" ? 500 : 409;
}
function parseLastEventId(value: string | string[] | undefined): bigint {
const raw = Array.isArray(value) ? value.at(-1) : value;
if (raw === undefined || raw.length === 0) {
return 0n;
}
try {
return BigInt(raw);
} catch {
return 0n;
}
}
function parseOptionalLastEventId(value: string | string[] | undefined): bigint | undefined {
const raw = Array.isArray(value) ? value.at(-1) : value;
if (raw === undefined || raw.length === 0) {
return undefined;
}
try {
return BigInt(raw);
} catch {
return undefined;
}
}
function isGlobalSseEvent(event: string): boolean {
return (
event === "run.state_changed" || event === "approval.created" || event === "approval.resolved"
);
}
function closeSse(reply: FastifyReply): void {
if (!reply.raw.destroyed) {
reply.raw.end();
}
}

View File

@@ -15,8 +15,11 @@ import {
import { TemporalRunEngine, temporalNamespace } from "@devflow/workflows";
import { Connection, WorkflowClient } from "@temporalio/client";
import { createHttpApi } from "./http.js";
import { recoverM4ApiStartup, startM4SessionManager } from "./startup.js";
export * from "./http.js";
export * from "./sse.js";
export * from "./startup.js";
export interface StartM4ApiOptions {
@@ -62,10 +65,57 @@ export interface StartTemporalApiResult {
export type StartApiOptions = StartTemporalApiOptions;
export type StartApiResult = StartTemporalApiResult;
export interface StartHttpApiOptions extends StartTemporalApiOptions {
host?: string;
port?: number;
}
export interface StartHttpApiResult extends StartTemporalApiResult {
url: string;
}
export async function startApi(options: StartApiOptions = {}): Promise<StartApiResult> {
return startTemporalApi(options);
}
export async function startHttpApi(options: StartHttpApiOptions = {}): Promise<StartHttpApiResult> {
const config = options.dbClient === undefined ? getConfig() : undefined;
const ownedClient = options.dbClient === undefined;
const dbClient =
options.dbClient ?? createDbClient(config?.DATABASE_URL ?? getConfig().DATABASE_URL);
const api = await startTemporalApi({ ...options, dbClient });
const server = await createHttpApi({ db: dbClient.db, engine: api.engine });
const host = options.host ?? "127.0.0.1";
const port = options.port ?? 3000;
try {
const url = await server.listen({ host, port });
return {
...api,
url,
async stop() {
try {
await server.close();
} finally {
try {
await api.stop();
} finally {
if (ownedClient) {
await dbClient.close();
}
}
}
},
};
} catch (error) {
await server.close().catch(() => undefined);
await api.stop().catch(() => undefined);
if (ownedClient) {
await dbClient.close();
}
throw error;
}
}
export async function startM4Api(options: StartM4ApiOptions = {}): Promise<StartM4ApiResult> {
const ownedClient = options.dbClient === undefined;
const config = ownedClient || options.workspaceRoot === undefined ? getConfig() : undefined;
@@ -254,7 +304,7 @@ function dbOnlySessionRuntime(): SessionRuntime {
}
if (isDirectEntry(import.meta.url, process.argv)) {
startApi()
startHttpApi()
.then(async (api) => {
await waitForShutdownSignal();
await api.stop();

443
apps/api/src/sse.ts Normal file
View File

@@ -0,0 +1,443 @@
import type { ServerResponse } from "node:http";
import { and, asc, desc, eq, gt, lte } from "drizzle-orm";
import type { DbClient } from "@devflow/db";
import { runEvents, tuiSessions, tuiTranscriptChunks } from "@devflow/db";
type Database = DbClient["db"];
export interface SseMessage {
event: string;
data: unknown;
id?: string;
}
export interface RunEventRow {
id: bigint;
runId: string;
phaseId: string | null;
seq: bigint;
type: string;
payload: unknown;
ts: Date;
}
export interface TranscriptChunkRow {
id: bigint;
runId: string;
sessionId: string;
roleId: string;
seq: bigint;
content: string;
capturedAt: Date;
}
export function formatSseMessage(message: SseMessage): string {
const lines: string[] = [];
if (message.id !== undefined) {
lines.push(`id: ${message.id}`);
}
lines.push(`event: ${message.event}`);
lines.push(`data: ${JSON.stringify(message.data)}`);
return `${lines.join("\n")}\n\n`;
}
export function formatSseComment(comment: string): string {
return `: ${comment}\n\n`;
}
export function openSseResponse(response: ServerResponse): void {
response.writeHead(200, {
"cache-control": "no-cache",
connection: "keep-alive",
"content-type": "text/event-stream",
"x-accel-buffering": "no",
});
response.write(formatSseComment("connected"));
}
export function runEventMessages(row: RunEventRow, options: { deriveStateEvents: boolean }) {
const base = runEventAppendedMessage(row);
if (!options.deriveStateEvents) {
return [base];
}
return [base, ...derivedRunEventMessages(row)];
}
export function runEventAppendedMessage(row: RunEventRow): SseMessage {
return {
id: row.seq.toString(),
event: "run.event_appended",
data: {
eventId: row.seq.toString(),
id: row.id.toString(),
payload: row.payload,
phaseId: row.phaseId,
runId: row.runId,
ts: row.ts.toISOString(),
type: row.type,
},
};
}
export function transcriptChunkMessage(row: TranscriptChunkRow): SseMessage {
return {
event: "transcript.chunk_appended",
data: {
content: row.content.slice(0, 4096),
runId: row.runId,
roleId: row.roleId,
seq: row.seq.toString(),
sessionId: row.sessionId,
ts: row.capturedAt.toISOString(),
},
};
}
export async function readRunEventRows(
db: Database,
runId: string,
afterSeq: bigint,
throughSeq?: bigint,
): Promise<RunEventRow[]> {
const conditions = [eq(runEvents.runId, runId), gt(runEvents.seq, afterSeq)];
if (throughSeq !== undefined) {
conditions.push(lte(runEvents.seq, throughSeq));
}
return db
.select({
id: runEvents.id,
runId: runEvents.runId,
phaseId: runEvents.phaseId,
seq: runEvents.seq,
type: runEvents.type,
payload: runEvents.payload,
ts: runEvents.ts,
})
.from(runEvents)
.where(and(...conditions))
.orderBy(asc(runEvents.seq))
.limit(100);
}
export async function latestRunEventSeq(db: Database, runId: string): Promise<bigint> {
const [row] = await db
.select({ seq: runEvents.seq })
.from(runEvents)
.where(eq(runEvents.runId, runId))
.orderBy(desc(runEvents.seq))
.limit(1);
return row?.seq ?? 0n;
}
export async function readGlobalRunEventRows(
db: Database,
afterId: bigint,
): Promise<RunEventRow[]> {
return db
.select({
id: runEvents.id,
runId: runEvents.runId,
phaseId: runEvents.phaseId,
seq: runEvents.seq,
type: runEvents.type,
payload: runEvents.payload,
ts: runEvents.ts,
})
.from(runEvents)
.where(gt(runEvents.id, afterId))
.orderBy(asc(runEvents.id))
.limit(100);
}
export async function latestGlobalRunEventId(db: Database): Promise<bigint> {
const [row] = await db
.select({ id: runEvents.id })
.from(runEvents)
.orderBy(desc(runEvents.id))
.limit(1);
return row?.id ?? 0n;
}
export async function readTranscriptChunkRows(
db: Database,
runId: string,
afterId: bigint,
): Promise<TranscriptChunkRow[]> {
return db
.select({
id: tuiTranscriptChunks.id,
runId: tuiSessions.runId,
sessionId: tuiTranscriptChunks.sessionId,
roleId: tuiSessions.roleId,
seq: tuiTranscriptChunks.seq,
content: tuiTranscriptChunks.content,
capturedAt: tuiTranscriptChunks.capturedAt,
})
.from(tuiTranscriptChunks)
.innerJoin(tuiSessions, eq(tuiTranscriptChunks.sessionId, tuiSessions.id))
.where(and(eq(tuiSessions.runId, runId), gt(tuiTranscriptChunks.id, afterId)))
.orderBy(asc(tuiTranscriptChunks.id))
.limit(100);
}
export async function latestTranscriptChunkId(db: Database, runId: string): Promise<bigint> {
const [row] = await db
.select({ id: tuiTranscriptChunks.id })
.from(tuiTranscriptChunks)
.innerJoin(tuiSessions, eq(tuiTranscriptChunks.sessionId, tuiSessions.id))
.where(eq(tuiSessions.runId, runId))
.orderBy(desc(tuiTranscriptChunks.id))
.limit(1);
return row?.id ?? 0n;
}
function derivedRunEventMessages(row: RunEventRow): SseMessage[] {
if (row.type.startsWith("run.")) {
const next = runStateForEvent(row.type, row.payload);
if (next === null) {
return [];
}
return [
{
id: row.seq.toString(),
event: "run.state_changed",
data: { runId: row.runId, prev: stringPayload(row.payload, "pausedFromState"), next },
},
];
}
if (row.type.startsWith("phase.")) {
const phaseKey = stringPayload(row.payload, "phaseKey");
const next = phaseStateForEvent(row.type);
if (next === null) {
return [];
}
const messages: SseMessage[] = [];
const runState = stringPayload(row.payload, "runState");
if (row.type === "phase.started" && runState !== null) {
messages.push({
id: row.seq.toString(),
event: "run.state_changed",
data: { runId: row.runId, prev: null, next: runState },
});
}
messages.push({
id: row.seq.toString(),
event: "phase.state_changed",
data: {
runId: row.runId,
phaseId: row.phaseId,
phaseKey,
prev: null,
next,
},
});
return messages;
}
if (row.type === "approval.requested") {
const messages: SseMessage[] = [
{
id: row.seq.toString(),
event: "approval.created",
data: {
approvalId: stringPayload(row.payload, "approvalRequestId"),
gateKey: stringPayload(row.payload, "gateKey"),
runId: row.runId,
},
},
];
const runState = stringPayload(row.payload, "runState");
if (runState !== null) {
messages.push({
id: row.seq.toString(),
event: "run.state_changed",
data: { runId: row.runId, prev: null, next: runState },
});
}
const phaseState = stringPayload(row.payload, "phaseState");
if (phaseState !== null) {
messages.push({
id: row.seq.toString(),
event: "phase.state_changed",
data: {
runId: row.runId,
phaseId: row.phaseId,
phaseKey: stringPayload(row.payload, "phaseKey"),
prev: null,
next: phaseState,
},
});
}
const sessionState = stringPayload(row.payload, "sessionState");
const sessionId = stringPayload(row.payload, "sessionId");
const roleId = stringPayload(row.payload, "roleId");
if (sessionState !== null && sessionId !== null) {
messages.push({
id: row.seq.toString(),
event: "session.state_changed",
data: {
next: sessionState,
prev: null,
roleId,
runId: row.runId,
sessionId,
},
});
}
return messages;
}
if (row.type === "approval.resolved") {
return [
{
id: row.seq.toString(),
event: "approval.resolved",
data: {
action: stringPayload(row.payload, "action"),
approvalId: stringPayload(row.payload, "approvalRequestId"),
runId: row.runId,
},
},
];
}
if (row.type.startsWith("session.")) {
const next = sessionStateForEvent(row.type);
if (next === null) {
return [];
}
return [
{
id: row.seq.toString(),
event: "session.state_changed",
data: {
next,
prev: null,
roleId: stringPayload(row.payload, "roleId"),
runId: row.runId,
sessionId: stringPayload(row.payload, "sessionId"),
},
},
];
}
if (row.type === "artifact.expected") {
return [
{
id: row.seq.toString(),
event: "phase.state_changed",
data: {
runId: row.runId,
phaseId: row.phaseId,
phaseKey: stringPayload(row.payload, "phaseKey"),
prev: null,
next: "awaiting_artifact",
},
},
];
}
if (row.type === "artifact.validated" || row.type === "artifact.invalid") {
const messages: SseMessage[] = [
{
id: row.seq.toString(),
event: "phase.state_changed",
data: {
runId: row.runId,
phaseId: row.phaseId,
phaseKey: stringPayload(row.payload, "phaseKey"),
prev: null,
next: "validating",
},
},
];
if (row.type === "artifact.validated") {
messages.push({
id: row.seq.toString(),
event: "artifact.validated",
data: {
artifactId: stringPayload(row.payload, "artifactId"),
path: stringPayload(row.payload, "path"),
runId: row.runId,
schemaId: stringPayload(row.payload, "schemaId"),
valid: true,
},
});
}
return messages;
}
return [];
}
function runStateForEvent(type: string, payload: unknown): string | null {
const explicit = stringPayload(payload, "state") ?? stringPayload(payload, "resumedTo");
if (explicit !== null) {
return explicit;
}
if (type === "run.created") {
return "created";
}
if (type === "run.started") {
return "bound";
}
if (type === "run.paused") {
return "paused";
}
if (type === "run.resumed") {
const cause = stringPayload(payload, "cause");
if (cause?.endsWith(":request_changes")) {
return "planning";
}
return "executing";
}
if (type === "run.completed") {
return "completed";
}
if (type === "run.failed") {
return "failed";
}
if (type === "run.aborted") {
return "aborted";
}
return null;
}
function phaseStateForEvent(type: string): string | null {
if (type === "phase.started") {
return "running";
}
if (type === "phase.completed") {
return "completed";
}
if (type === "phase.failed") {
return "failed";
}
if (type === "phase.skipped") {
return "skipped";
}
return null;
}
function sessionStateForEvent(type: string): string | null {
if (type === "session.created") {
return "CREATED";
}
if (type === "session.ready" || type === "session.idle" || type === "session.recovered") {
return "READY";
}
if (type === "session.busy") {
return "BUSY";
}
if (type === "session.crashed") {
return "CRASHED";
}
if (type === "session.failed") {
return "FAILED_NEEDS_HUMAN";
}
return null;
}
function stringPayload(payload: unknown, key: string): string | null {
if (payload === null || typeof payload !== "object") {
return null;
}
const value = (payload as Record<string, unknown>)[key];
return typeof value === "string" ? value : null;
}