import { randomUUID } from "node:crypto";
import { mkdtempSync, realpathSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { DevflowError } from "@devflow/core";
import {
  type DbClient,
  createDbClient,
  runEvents,
  runs,
  tuiSessions,
  workflowTemplates,
} from "@devflow/db";
import { FakeSessionAdapter, type SessionAdapter, type SessionHandle } from "@devflow/session";
import { eq, inArray } from "drizzle-orm";
import { afterEach, describe, expect, it } from "vitest";
import { startWorker } from "./index.js";

/** Options object accepted by `startWorker`, derived so it cannot drift from the implementation. */
type StartWorkerOptions = Parameters<typeof startWorker>[0];

/** Database used by these tests; overridable through the `DATABASE_URL` environment variable. */
const databaseUrl = process.env.DATABASE_URL ?? "postgres://devflow:devflow@127.0.0.1:55432/devflow";

/** Total time `startWorkerWhenLockFree` keeps retrying before rethrowing the last error. */
const LOCK_WAIT_TIMEOUT_MS = 6_000;

/** Pause between two `startWorker` attempts in `startWorkerWhenLockFree`. */
const LOCK_RETRY_INTERVAL_MS = 50;

/**
 * `FakeSessionAdapter` that counts how many times `resume` is invoked, so a
 * test can assert how many resume attempts were made.
 */
class ResumeTrackingAdapter extends FakeSessionAdapter {
  resumeAttempts = 0;

  /** Records the attempt, then delegates to the base implementation. */
  override async resume(
    handle: SessionHandle,
  ): Promise<Awaited<ReturnType<FakeSessionAdapter["resume"]>>> {
    this.resumeAttempts += 1;
    return super.resume(handle);
  }
}

describe("startWorker", () => {
  let client: DbClient | undefined;
  const runIds: string[] = [];
  const templateIds: string[] = [];
  const tempRoots: string[] = [];

  /**
   * Creates a fresh temporary directory (symlinks resolved) and registers it
   * for removal in `afterEach`, so no test can forget to clean it up.
   */
  function makeTempRoot(prefix: string): string {
    const root = realpathSync(mkdtempSync(join(tmpdir(), prefix)));
    tempRoots.push(root);
    return root;
  }

  afterEach(async () => {
    const activeClient = client;
    client = undefined;
    try {
      if (activeClient !== undefined) {
        try {
          if (runIds.length > 0) {
            await activeClient.db.delete(runs).where(inArray(runs.id, [...runIds]));
          }
          if (templateIds.length > 0) {
            await activeClient.db
              .delete(workflowTemplates)
              .where(inArray(workflowTemplates.id, [...templateIds]));
          }
        } finally {
          // Close the connection even when a row delete fails.
          await activeClient.close();
        }
      }
    } finally {
      // Always reset per-test state so one failing cleanup cannot leak into the next test.
      for (const root of tempRoots.splice(0)) {
        rmSync(root, { recursive: true, force: true });
      }
      runIds.length = 0;
      templateIds.length = 0;
    }
  });

  it("initializes SessionManager recovery before accepting Temporal work", async () => {
    client = createDbClient(databaseUrl);
    const templateId = randomUUID();
    const runId = randomUUID();
    const sessionId = randomUUID();
    const repoPath = makeTempRoot("devflow-worker-repo-");
    const worktreeRoot = makeTempRoot("devflow-worker-worktree-");
    templateIds.push(templateId);
    runIds.push(runId);

    // Seed a template, an executing run, and a session row left in BOOTSTRAPPING.
    await client.db.insert(workflowTemplates).values({
      id: templateId,
      name: `worker-recovery-${templateId}`,
      version: 1,
      hash: "f".repeat(64),
      definition: { name: "worker-recovery", version: 1, roles: [], phases: [] },
    });
    await client.db.insert(runs).values({
      id: runId,
      templateId,
      templateHash: "f".repeat(64),
      state: "executing",
      repoPath,
      baseBranch: "main",
      worktreeRoot,
    });
    await client.db.insert(tuiSessions).values({
      id: sessionId,
      runId,
      roleId: "spec_writer",
      backend: "fake",
      cwd: worktreeRoot,
      state: "BOOTSTRAPPING",
    });

    // The adapter is started with the same id as the seeded session row.
    const adapter = new ResumeTrackingAdapter({
      sessionIdFactory: () => sessionId,
      writeDelayMs: 0,
    });
    await adapter.start({
      runId,
      roleId: "spec_writer",
      backend: "fake",
      cwd: worktreeRoot,
    });

    const worker = await startWorkerWhenLockFree({
      config: workerConfig(worktreeRoot),
      dbClient: client,
      recoveryRunIds: [runId],
      sessionAdapter: adapter,
      connectionFactory: async () => fakeConnection(),
      workerFactory: async () => fakeWorker(),
    });

    try {
      expect(worker.recovery).toEqual({ failedSessionIds: [], recoveredSessionIds: [sessionId] });
      expect(adapter.resumeAttempts).toBe(1);

      const [session] = await client.db
        .select({ state: tuiSessions.state })
        .from(tuiSessions)
        .where(eq(tuiSessions.id, sessionId));
      expect(session).toEqual({ state: "READY" });

      const events = await client.db
        .select({ type: runEvents.type })
        .from(runEvents)
        .where(eq(runEvents.runId, runId))
        .orderBy(runEvents.seq);
      expect(events.map((event) => event.type)).toEqual(["session.created", "session.ready"]);
    } finally {
      await worker.shutdown();
    }
  });

  it("releases acquired resources when SessionManager startup fails", async () => {
    client = createDbClient(databaseUrl);
    const adapter: SessionAdapter = new FakeSessionAdapter();
    const first = await startWorkerWhenLockFree({
      config: workerConfig(makeTempRoot("devflow-worker-workspace-")),
      dbClient: client,
      recoveryRunIds: [],
      sessionAdapter: adapter,
      connectionFactory: async () => fakeConnection(),
      workerFactory: async () => fakeWorker(),
    });

    try {
      // A second start while `first` is still running must be rejected.
      await expect(
        startWorker({
          config: workerConfig(makeTempRoot("devflow-worker-workspace-")),
          dbClient: client,
          recoveryRunIds: [],
          connectionFactory: async () => fakeConnection(),
          workerFactory: async () => fakeWorker(),
        }),
      ).rejects.toMatchObject({ code: "session_manager_already_running" });
    } finally {
      await first.shutdown();
    }
  });

  it("drains SessionManager resources when the Temporal worker run loop stops", async () => {
    client = createDbClient(databaseUrl);
    const workspaceRoot = makeTempRoot("devflow-worker-run-");
    const connection = countingConnection();
    const runtime = countingWorker();
    const worker = await startWorkerWhenLockFree({
      config: workerConfig(workspaceRoot),
      dbClient: client,
      recoveryRunIds: [],
      connectionFactory: async () => connection,
      workerFactory: async () => runtime,
    });

    await worker.run();

    expect(runtime.runs).toBe(1);
    expect(runtime.shutdowns).toBe(1);
    expect(connection.closes).toBe(1);

    // A new worker can only start once the previous one has released its resources.
    const next = await startWorkerWhenLockFree({
      config: workerConfig(workspaceRoot),
      dbClient: client,
      recoveryRunIds: [],
      connectionFactory: async () => fakeConnection(),
      workerFactory: async () => fakeWorker(),
    });
    await next.shutdown();
  });

  it("drains SessionManager resources when Temporal worker shutdown fails", async () => {
    client = createDbClient(databaseUrl);
    const workspaceRoot = makeTempRoot("devflow-worker-shutdown-");
    const connection = countingConnection();
    const worker = await startWorkerWhenLockFree({
      config: workerConfig(workspaceRoot),
      dbClient: client,
      recoveryRunIds: [],
      connectionFactory: async () => connection,
      workerFactory: async () => failingShutdownWorker(),
    });

    await expect(worker.shutdown()).rejects.toThrow("worker shutdown failed");
    expect(connection.closes).toBe(1);

    // A new worker can only start once the previous one has released its resources.
    const next = await startWorkerWhenLockFree({
      config: workerConfig(workspaceRoot),
      dbClient: client,
      recoveryRunIds: [],
      connectionFactory: async () => fakeConnection(),
      workerFactory: async () => fakeWorker(),
    });
    await next.shutdown();
  });
});

/**
 * Builds the worker configuration shared by every test; only the workspace
 * root differs between callers.
 */
function workerConfig(workspaceRoot: string): StartWorkerOptions["config"] {
  return {
    DATABASE_URL: databaseUrl,
    LOG_LEVEL: "info",
    TEMPORAL_ADDRESS: "localhost:7233",
    WORKSPACE_ROOT: workspaceRoot,
    MAX_CONCURRENT_RUNS: 4,
    SESSION_MAX_HUNG_MS: 20 * 60 * 1000,
    backends: [{ id: "fake", enabled: true }],
  };
}

/** Connection stand-in whose `close` resolves without doing anything. */
function fakeConnection() {
  return {
    close: async () => undefined,
  };
}

/** Worker stand-in whose `run` resolves immediately and whose `shutdown` is a no-op. */
function fakeWorker() {
  return {
    run: async () => undefined,
    shutdown: () => undefined,
  };
}

/** Connection stand-in that counts `close` calls in `closes`. */
function countingConnection() {
  return {
    closes: 0,
    async close() {
      this.closes += 1;
    },
  };
}

/** Worker stand-in that counts `run` and `shutdown` calls in `runs` / `shutdowns`. */
function countingWorker() {
  return {
    runs: 0,
    shutdowns: 0,
    async run() {
      this.runs += 1;
    },
    shutdown() {
      this.shutdowns += 1;
    },
  };
}

/** Worker stand-in whose `shutdown` throws synchronously with "worker shutdown failed". */
function failingShutdownWorker() {
  return {
    run: async () => undefined,
    shutdown() {
      throw new Error("worker shutdown failed");
    },
  };
}

/**
 * Calls `startWorker(options)`, retrying every `LOCK_RETRY_INTERVAL_MS` for up
 * to `LOCK_WAIT_TIMEOUT_MS` while it rejects with a `DevflowError` whose code
 * is `session_manager_already_running`.
 *
 * @param options - Passed unchanged to every `startWorker` attempt.
 * @returns The value resolved by the first successful `startWorker` call.
 * @throws Any other error immediately; the last "already running" error once
 *   the deadline has passed.
 */
async function startWorkerWhenLockFree(
  options: StartWorkerOptions,
): Promise<Awaited<ReturnType<typeof startWorker>>> {
  const deadline = Date.now() + LOCK_WAIT_TIMEOUT_MS;
  let lastError: unknown;
  while (Date.now() < deadline) {
    try {
      return await startWorker(options);
    } catch (error) {
      lastError = error;
      if (!(error instanceof DevflowError) || error.code !== "session_manager_already_running") {
        throw error;
      }
      await new Promise((resolveWait) => setTimeout(resolveWait, LOCK_RETRY_INTERVAL_MS));
    }
  }
  throw lastError;
}