From ef4c56e6b08efe66235d71d6d251e4c118ea14bc Mon Sep 17 00:00:00 2001 From: chungyeong Date: Wed, 13 May 2026 21:44:58 +0900 Subject: [PATCH] feat: add real tmux session manager --- apps/api/src/index.ts | 9 +- apps/worker/src/index.test.ts | 49 + apps/worker/src/index.ts | 22 +- packages/db/src/repositories/run-event.ts | 2 +- packages/run-engine/src/engine.test.ts | 480 ++++++++- packages/run-engine/src/engine.ts | 270 +++++- .../run-engine/src/fake-phase-harness.test.ts | 329 +++++++ packages/run-engine/src/fake-phase-harness.ts | 146 ++- packages/session/src/adapter.ts | 10 + packages/session/src/index.ts | 1 + packages/session/src/manager.test.ts | 334 +++++++ packages/session/src/manager.ts | 149 ++- packages/session/src/tmux.test.ts | 917 ++++++++++++++++++ packages/session/src/tmux.ts | 857 ++++++++++++++++ 14 files changed, 3499 insertions(+), 76 deletions(-) create mode 100644 packages/session/src/manager.test.ts create mode 100644 packages/session/src/tmux.test.ts create mode 100644 packages/session/src/tmux.ts diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 289e83b..2ba8ec3 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -112,9 +112,12 @@ export async function startM4Api(options: StartM4ApiOptions = {}): Promise { }); await next.shutdown(); }); + + it("drains SessionManager resources when Temporal worker shutdown fails", async () => { + client = createDbClient(databaseUrl); + const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-worker-shutdown-"))); + tempRoots.push(workspaceRoot); + const connection = countingConnection(); + const worker = await startWorkerWhenLockFree({ + config: { + DATABASE_URL: databaseUrl, + LOG_LEVEL: "info", + TEMPORAL_ADDRESS: "localhost:7233", + WORKSPACE_ROOT: workspaceRoot, + MAX_CONCURRENT_RUNS: 4, + backends: [{ id: "fake", enabled: true }], + }, + dbClient: client, + recoveryRunIds: [], + connectionFactory: async () => connection, + workerFactory: async () => failingShutdownWorker(), + }); + + await expect(worker.shutdown()).rejects.toThrow("worker shutdown failed"); + expect(connection.closes).toBe(1); + + const next = await startWorkerWhenLockFree({ + config: { + DATABASE_URL: databaseUrl, + LOG_LEVEL: "info", + TEMPORAL_ADDRESS: "localhost:7233", + WORKSPACE_ROOT: workspaceRoot, + MAX_CONCURRENT_RUNS: 4, + backends: [{ id: "fake", enabled: true }], + }, + dbClient: client, + recoveryRunIds: [], + connectionFactory: async () => fakeConnection(), + workerFactory: async () => fakeWorker(), + }); + await next.shutdown(); + }); }); function fakeConnection() { @@ -257,6 +297,15 @@ function countingWorker() { }; } +function failingShutdownWorker() { + return { + run: async () => undefined, + shutdown() { + throw new Error("worker shutdown failed"); + }, + }; +} + async function startWorkerWhenLockFree(options: Parameters[0]) { const deadline = Date.now() + 6_000; let lastError: unknown; diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index da8789e..4f3941c 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -71,11 +71,23 @@ export async function startWorker(options: StartWorkerOptions = {}) { let shutdownPromise: Promise | undefined; const shutdown = () => { shutdownPromise ??= (async () => { - await Promise.resolve(startedWorker.shutdown()); - await sessionManager.shutdown(); - await startedConnection.close(); - if (ownedClient) { - await dbClient.close(); + let workerShutdownError: unknown; + try { + await Promise.resolve(startedWorker.shutdown()); + } catch (error) { + workerShutdownError = error; + } finally { + try { + await sessionManager.shutdown(); + } finally { + await startedConnection.close(); + if (ownedClient) { + await dbClient.close(); + } + } + } + if (workerShutdownError !== undefined) { + throw workerShutdownError; } })(); return shutdownPromise; diff --git a/packages/db/src/repositories/run-event.ts b/packages/db/src/repositories/run-event.ts index 78ef727..b15a5dc 100644 --- a/packages/db/src/repositories/run-event.ts +++ b/packages/db/src/repositories/run-event.ts @@ -71,7 +71,7 @@ export class RunEventRepository { } await tx.execute( - sql`SELECT pg_advisory_xact_lock(hashtext('devflow:run-events'), hashtext(${input.runId}))`, + sql`SELECT pg_advisory_xact_lock(hashtextextended(${`devflow:run-events:${input.runId}`}, 0))`, ); if (input.phaseId !== undefined) { diff --git a/packages/run-engine/src/engine.test.ts b/packages/run-engine/src/engine.test.ts index 67663a1..4fa354b 100644 --- a/packages/run-engine/src/engine.test.ts +++ b/packages/run-engine/src/engine.test.ts @@ -15,7 +15,7 @@ import { join, resolve } from "node:path"; import { and, eq, inArray } from "drizzle-orm"; import { afterEach, describe, expect, it } from "vitest"; -import { loadPersonaFiles, loadTemplateFiles, validateArtifact } from "@devflow/core"; +import { DevflowError, loadPersonaFiles, loadTemplateFiles, validateArtifact } from "@devflow/core"; import { type DbClient, agentPersonas, @@ -30,7 +30,14 @@ import { tuiSessions, workflowTemplates, } from "@devflow/db"; -import { FakeSessionAdapter, type SessionAdapter, SessionManager } from "@devflow/session"; +import { + FakeSessionAdapter, + type SessionAdapter, + type SessionHandle, + SessionManager, + type SessionRuntime, + type TranscriptChunk, +} from "@devflow/session"; import { DbRunEngine, sweepM4ProcessRestart } from "./engine.js"; @@ -94,6 +101,110 @@ class DisposeCountingFakeAdapter extends FakeSessionAdapter { } } +class DisposeFailsFakeAdapter extends FakeSessionAdapter { + override async dispose(handle: Parameters[0]): Promise { + throw new DevflowError("dispose failed", { + class: "recoverable", + code: "pane_briefly_unresponsive", + recoveryHint: `session=${handle.sessionId}`, + }); + } +} + +class CaptureOrderingFakeAdapter extends FakeSessionAdapter { + events: string[] = []; + failCapture = false; + + override async *capture( + handle: Parameters[0], + fromSeq: bigint, + ): AsyncIterable { + this.events.push("capture"); + if (this.failCapture) { + throw new DevflowError("transcript capture failed", { + class: "recoverable", + code: "pane_briefly_unresponsive", + }); + } + yield* super.capture(handle, fromSeq); + } + + override async dispose(handle: Parameters[0]): Promise { + this.events.push("dispose"); + await super.dispose(handle); + } +} + +class CaptureFailsAfterDisposeFakeAdapter extends FakeSessionAdapter { + readonly disposedSessionIds = new Set(); + readonly events: string[] = []; + + override async *capture( + handle: Parameters[0], + fromSeq: bigint, + ): AsyncIterable { + this.events.push("capture"); + if (this.disposedSessionIds.has(handle.sessionId)) { + throw new DevflowError("tmux session already disposed", { + class: "recoverable", + code: "pane_briefly_unresponsive", + recoveryHint: `session=${handle.sessionId}`, + }); + } + yield* super.capture(handle, fromSeq); + } + + override async dispose(handle: Parameters[0]): Promise { + this.events.push("dispose"); + this.disposedSessionIds.add(handle.sessionId); + await super.dispose(handle); + } +} + +class TerminalHandleRecordingRuntime implements SessionRuntime { + readonly adapter = new FakeSessionAdapter({ writeDelayMs: 0 }); + readonly captureHandles: SessionHandle[] = []; + readonly disposeHandles: SessionHandle[] = []; + + trackOperation(operation: Promise): Promise { + return operation; + } + + start(...args: Parameters): ReturnType { + return this.adapter.start(...args); + } + + sendPrompt( + ...args: Parameters + ): ReturnType { + return this.adapter.sendPrompt(...args); + } + + probe(...args: Parameters): ReturnType { + return this.adapter.probe(...args); + } + + resume(...args: Parameters): ReturnType { + return this.adapter.resume(...args); + } + + rebootstrap( + ...args: Parameters + ): ReturnType { + return this.adapter.rebootstrap(...args); + } + + async *capture(handle: SessionHandle, fromSeq: bigint): ReturnType { + this.captureHandles.push(handle); + yield* this.adapter.capture(handle, fromSeq); + } + + async dispose(handle: SessionHandle): Promise { + this.disposeHandles.push(handle); + await this.adapter.dispose(handle); + } +} + describe("DbRunEngine", () => { let client: DbClient | undefined; const runIds: string[] = []; @@ -857,6 +968,43 @@ describe("DbRunEngine", () => { expect(sessions.every((session) => session.state === "FAILED_NEEDS_HUMAN")).toBe(true); }); + it("repairs final reports when direct advance sees a terminalized fatal phase", async () => { + client = createDbClient(databaseUrl); + await seedDevelopmentRegistry(client.db); + const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-"))); + const repoPath = createGitRepo(); + tempRoots.push(workspaceRoot, repoPath); + const engine = new DbRunEngine({ + db: client.db, + sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })), + workspaceRoot, + maxConcurrentRuns: 100, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 }, + }); + + const { runId } = await engine.startRun({ + requirementsMd: "Direct advance should repair terminal reports.", + repoPath, + baseBranch: "main", + scenarios: { + phase_plan: "unknown-schema", + }, + }); + runIds.push(runId); + const specApproval = pendingApproval(await engine.getStatus(runId), "spec_approved"); + await engine.signalApproval(runId, specApproval.id, "approve", randomUUID()); + const phasePlanApproval = pendingApproval(await engine.getStatus(runId), "phase_plan_approved"); + await engine.signalApprovalForWorkflow(runId, phasePlanApproval.id, "approve", randomUUID()); + + await expect(engine.advanceRunUntilBlocked(runId)).rejects.toMatchObject({ + code: "fake_fixture_missing", + }); + + const failed = await engine.getStatus(runId); + expect(failed.run.state).toBe("failed"); + expect(failed.run.finalReportPath).toMatch(/\.report\.md$/); + }); + it("does not start another pending phase when approval replay sees active work", async () => { client = createDbClient(databaseUrl); await seedDevelopmentRegistry(client.db); @@ -1061,6 +1209,278 @@ describe("DbRunEngine", () => { expect((await engine.getStatus(runId)).run.state).toBe("aborted"); }); + it("surfaces session dispose failures during abort", async () => { + client = createDbClient(databaseUrl); + await seedDevelopmentRegistry(client.db); + const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-"))); + const repoPath = createGitRepo(); + tempRoots.push(workspaceRoot, repoPath); + const engine = new DbRunEngine({ + db: client.db, + sessions: sessionRuntime(client.db, new DisposeFailsFakeAdapter({ writeDelayMs: 0 })), + workspaceRoot, + maxConcurrentRuns: 100, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 }, + }); + + const { runId } = await engine.startRun({ + requirementsMd: "Abort while waiting for approval.", + repoPath, + baseBranch: "main", + }); + runIds.push(runId); + + await expect(engine.abortRun(runId, "user requested abort")).rejects.toMatchObject({ + code: "pane_briefly_unresponsive", + }); + + const aborted = await engine.getStatus(runId); + expect(aborted.run.state).toBe("aborted"); + const [run] = await client.db + .select({ finalReportPath: runs.finalReportPath }) + .from(runs) + .where(eq(runs.id, runId)); + expect(run?.finalReportPath).toMatch(/\.report\.md$/); + }); + + it("captures terminal session transcripts before abort disposal", async () => { + client = createDbClient(databaseUrl); + await seedDevelopmentRegistry(client.db); + const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-"))); + const repoPath = createGitRepo(); + tempRoots.push(workspaceRoot, repoPath); + const adapter = new CaptureOrderingFakeAdapter({ writeDelayMs: 0 }); + const engine = new DbRunEngine({ + db: client.db, + sessions: sessionRuntime(client.db, adapter), + workspaceRoot, + maxConcurrentRuns: 100, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 }, + }); + + const { runId } = await engine.startRun({ + requirementsMd: "Abort after capturing terminal transcript.", + repoPath, + baseBranch: "main", + }); + runIds.push(runId); + adapter.events.length = 0; + + await engine.abortRun(runId, "user requested abort"); + + expect(adapter.events).toEqual(["capture", "dispose"]); + const [session] = await client.db + .select({ lastCaptureSeq: tuiSessions.lastCaptureSeq }) + .from(tuiSessions) + .where(eq(tuiSessions.runId, runId)); + expect(session?.lastCaptureSeq).toBeGreaterThan(0n); + }); + + it("retries terminal approval cleanup idempotently when a decision is replayed", async () => { + client = createDbClient(databaseUrl); + await seedDevelopmentRegistry(client.db); + const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-"))); + const repoPath = createGitRepo(); + tempRoots.push(workspaceRoot, repoPath); + const adapter = new CaptureFailsAfterDisposeFakeAdapter({ writeDelayMs: 0 }); + const engine = new DbRunEngine({ + db: client.db, + sessions: sessionRuntime(client.db, adapter), + workspaceRoot, + maxConcurrentRuns: 100, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 }, + }); + + const { runId } = await engine.startRun({ + requirementsMd: "Replay a terminal approval decision.", + repoPath, + baseBranch: "main", + }); + runIds.push(runId); + const approvalId = (await engine.getStatus(runId)).approvals[0]?.id; + expect(approvalId).toBeDefined(); + if (approvalId === undefined) { + throw new Error("approval id missing"); + } + const clientToken = randomUUID(); + adapter.events.length = 0; + + await engine.signalApproval(runId, approvalId, "reject", clientToken); + expect(adapter.events).toEqual(["capture", "dispose"]); + adapter.events.length = 0; + + await expect( + engine.signalApproval(runId, approvalId, "reject", clientToken), + ).resolves.toBeUndefined(); + expect(adapter.events).toEqual(["capture", "dispose"]); + expect((await engine.getStatus(runId)).run.state).toBe("failed"); + }); + + it("uses persisted tmux handles when capturing and disposing terminal sessions", async () => { + client = createDbClient(databaseUrl); + await seedDevelopmentRegistry(client.db); + const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-"))); + const repoPath = createGitRepo(); + tempRoots.push(workspaceRoot, repoPath); + const sessions = new TerminalHandleRecordingRuntime(); + const engine = new DbRunEngine({ + db: client.db, + sessions, + workspaceRoot, + maxConcurrentRuns: 100, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 }, + }); + + const { runId } = await engine.startRun({ + requirementsMd: "Abort after persisting tmux handle fields.", + repoPath, + baseBranch: "main", + }); + runIds.push(runId); + await client.db + .update(tuiSessions) + .set({ + lastCaptureSeq: 1n, + lastKnownPanePid: 777, + tmuxSession: "persisted-session", + tmuxWindow: "persisted-window", + }) + .where(eq(tuiSessions.runId, runId)); + + await engine.abortRun(runId, "user requested abort"); + + expect(sessions.captureHandles).toContainEqual({ + sessionId: expect.any(String), + pid: 777, + transcriptBaseline: { + startSeq: 1n, + lines: expect.arrayContaining([expect.any(String)]), + }, + tmuxSession: "persisted-session", + tmuxWindow: "persisted-window", + }); + expect(sessions.disposeHandles).toContainEqual({ + sessionId: expect.any(String), + pid: 777, + tmuxSession: "persisted-session", + tmuxWindow: "persisted-window", + }); + }); + + it("attempts disposal when transcript capture fails during cleanup", async () => { + client = createDbClient(databaseUrl); + await seedDevelopmentRegistry(client.db); + const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-"))); + const repoPath = createGitRepo(); + tempRoots.push(workspaceRoot, repoPath); + const adapter = new CaptureOrderingFakeAdapter({ writeDelayMs: 0 }); + const engine = new DbRunEngine({ + db: client.db, + sessions: sessionRuntime(client.db, adapter), + workspaceRoot, + maxConcurrentRuns: 100, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 }, + }); + + const { runId } = await engine.startRun({ + requirementsMd: "Abort with failed transcript capture.", + repoPath, + baseBranch: "main", + }); + runIds.push(runId); + adapter.events.length = 0; + adapter.failCapture = true; + + await expect(engine.abortRun(runId, "user requested abort")).rejects.toMatchObject({ + code: "pane_briefly_unresponsive", + }); + + expect(adapter.events).toEqual(["capture", "dispose"]); + const [run] = await client.db + .select({ finalReportPath: runs.finalReportPath, state: runs.state }) + .from(runs) + .where(eq(runs.id, runId)); + expect(run).toMatchObject({ state: "aborted" }); + expect(run?.finalReportPath).toMatch(/\.report\.md$/); + + adapter.events.length = 0; + adapter.failCapture = false; + await engine.abortRun(runId, "retry abort cleanup"); + expect(adapter.events).toEqual(["capture", "dispose"]); + }); + + it("writes a failed final report before surfacing approval reject dispose failures", async () => { + client = createDbClient(databaseUrl); + await seedDevelopmentRegistry(client.db); + const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-"))); + const repoPath = createGitRepo(); + tempRoots.push(workspaceRoot, repoPath); + const engine = new DbRunEngine({ + db: client.db, + sessions: sessionRuntime(client.db, new DisposeFailsFakeAdapter({ writeDelayMs: 0 })), + workspaceRoot, + maxConcurrentRuns: 100, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 }, + }); + + const { runId } = await engine.startRun({ + requirementsMd: "Reject while waiting for approval.", + repoPath, + baseBranch: "main", + }); + runIds.push(runId); + const request = pendingApproval(await engine.getStatus(runId), "spec_approved"); + + await expect( + engine.signalApproval(runId, request.id, "reject", randomUUID()), + ).rejects.toMatchObject({ + code: "pane_briefly_unresponsive", + }); + + const [run] = await client.db + .select({ finalReportPath: runs.finalReportPath, state: runs.state }) + .from(runs) + .where(eq(runs.id, runId)); + expect(run?.state).toBe("failed"); + expect(run?.finalReportPath).toMatch(/\.report\.md$/); + }); + + it("writes a failed final report before surfacing workflow approval reject dispose failures", async () => { + client = createDbClient(databaseUrl); + await seedDevelopmentRegistry(client.db); + const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-"))); + const repoPath = createGitRepo(); + tempRoots.push(workspaceRoot, repoPath); + const engine = new DbRunEngine({ + db: client.db, + sessions: sessionRuntime(client.db, new DisposeFailsFakeAdapter({ writeDelayMs: 0 })), + workspaceRoot, + maxConcurrentRuns: 100, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 }, + }); + + const { runId } = await engine.startRun({ + requirementsMd: "Reject through workflow approval path.", + repoPath, + baseBranch: "main", + }); + runIds.push(runId); + const request = pendingApproval(await engine.getStatus(runId), "spec_approved"); + + await expect( + engine.signalApprovalForWorkflow(runId, request.id, "reject", randomUUID()), + ).rejects.toMatchObject({ + code: "pane_briefly_unresponsive", + }); + + const [run] = await client.db + .select({ finalReportPath: runs.finalReportPath, state: runs.state }) + .from(runs) + .where(eq(runs.id, runId)); + expect(run?.state).toBe("failed"); + expect(run?.finalReportPath).toMatch(/\.report\.md$/); + }); + it("sweeps non-terminal M4 runs on API startup recovery", async () => { client = createDbClient(databaseUrl); await seedDevelopmentRegistry(client.db); @@ -1226,7 +1646,7 @@ describe("DbRunEngine", () => { }); }); - it("replays terminal approval disposal side effects for duplicate decisions", async () => { + it("replays terminal approval cleanup side effects idempotently", async () => { client = createDbClient(databaseUrl); await seedDevelopmentRegistry(client.db); const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-"))); @@ -1309,6 +1729,60 @@ describe("DbRunEngine", () => { ), ).toMatchObject({ runId, status: "aborted" }); }); + + it("repairs terminal final reports before surfacing approval replay dispose failures", async () => { + client = createDbClient(databaseUrl); + await seedDevelopmentRegistry(client.db); + const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-"))); + const repoPath = createGitRepo(); + const worktreeRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-worktree-"))); + tempRoots.push(workspaceRoot, repoPath, worktreeRoot); + const [template] = await client.db + .select({ hash: workflowTemplates.hash, id: workflowTemplates.id }) + .from(workflowTemplates) + .where(eq(workflowTemplates.name, "development")) + .limit(1); + if (template === undefined) { + throw new Error("development template missing"); + } + const runId = randomUUID(); + runIds.push(runId); + await client.db.insert(runs).values({ + id: runId, + templateId: template.id, + templateHash: template.hash, + state: "aborted", + repoPath, + baseBranch: "main", + worktreeRoot, + endedAt: new Date(), + finalReportPath: null, + }); + await client.db.insert(tuiSessions).values({ + id: randomUUID(), + runId, + roleId: "implementer", + backend: "fake", + cwd: worktreeRoot, + state: "FAILED_NEEDS_HUMAN", + }); + const engine = new DbRunEngine({ + db: client.db, + sessions: sessionRuntime(client.db, new DisposeFailsFakeAdapter({ writeDelayMs: 0 })), + workspaceRoot, + maxConcurrentRuns: 100, + }); + + await expect(engine.replayAppliedApprovalSideEffects(runId, "abort")).rejects.toMatchObject({ + code: "pane_briefly_unresponsive", + }); + + const [run] = await client.db + .select({ finalReportPath: runs.finalReportPath }) + .from(runs) + .where(eq(runs.id, runId)); + expect(run?.finalReportPath).toMatch(/\.report\.md$/); + }); }); function pendingApproval(status: Awaited>, gateKey: string) { diff --git a/packages/run-engine/src/engine.ts b/packages/run-engine/src/engine.ts index c4c7b16..3a74c50 100644 --- a/packages/run-engine/src/engine.ts +++ b/packages/run-engine/src/engine.ts @@ -21,6 +21,7 @@ import { import { type DbClient, RunEventRepository, + TuiTranscriptRepository, agentPersonas, approvalDecisions, approvalRequests, @@ -33,10 +34,16 @@ import { runPhases, runs, tuiSessions, + tuiTranscriptChunks, workflowTemplates, } from "@devflow/db"; -import type { SessionRuntime } from "@devflow/session"; -import { and, asc, desc, eq, inArray, sql } from "drizzle-orm"; +import { + type SessionHandle, + type SessionRuntime, + type TranscriptBaseline, + captureAndPersistTranscript, +} from "@devflow/session"; +import { and, asc, desc, eq, gt, inArray, lte, sql } from "drizzle-orm"; import { runSingleFakePhase } from "./fake-phase-harness.js"; @@ -198,6 +205,7 @@ export class DbRunEngine implements RunEngine { return { runId }; } await this.markRunFailedIfActive(runId, "start_run_failed"); + await this.composeFinalReportForTerminalRun(runId); throw error; } @@ -387,6 +395,7 @@ export class DbRunEngine implements RunEngine { return this.getStatus(runId); } await this.markRunFailedIfActive(runId, options.failureReason ?? "advance_run_failed"); + await this.composeFinalReportForTerminalRun(runId); throw error; } @@ -398,9 +407,11 @@ export class DbRunEngine implements RunEngine { repoPath: string, baseBranch: string, ): Promise { - await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext('devflow:start-run-global'))`); await tx.execute( - sql`SELECT pg_advisory_xact_lock(hashtext('devflow:start-run'), hashtext(${`${repoPath}:${baseBranch}`}))`, + sql`SELECT pg_advisory_xact_lock(hashtextextended(${"devflow:start-run-concurrency"}, 0))`, + ); + await tx.execute( + sql`SELECT pg_advisory_xact_lock(hashtextextended(${`devflow:start-run:${repoPath}:${baseBranch}`}, 0))`, ); } @@ -456,17 +467,18 @@ export class DbRunEngine implements RunEngine { return; } await this.markRunFailedIfActive(runId, "approval_advance_failed"); + await this.composeFinalReportForTerminalRun(runId); throw error; } return; } if (parsedAction === "reject") { - await this.composeFinalReportBestEffort(runId, "failed"); + await this.cleanupTerminalRun(runId, "failed", decision.sessionIdsToDispose); return; } - await this.composeFinalReportBestEffort(runId, "aborted"); + await this.cleanupTerminalRun(runId, "aborted", decision.sessionIdsToDispose); } async signalApprovalForWorkflow( @@ -477,7 +489,23 @@ export class DbRunEngine implements RunEngine { comment?: string, ): Promise { const parsedAction = ApprovalDecisionAction.parse(action); - await this.recordApprovalDecision(runId, approvalRequestId, parsedAction, clientToken, comment); + const decision = await this.recordApprovalDecision( + runId, + approvalRequestId, + parsedAction, + clientToken, + comment, + ); + if (parsedAction === "reject" || parsedAction === "abort") { + const status = await this.getStatus(runId); + if (isTerminalRunState(status.run.state)) { + await this.cleanupTerminalRun( + runId, + status.run.state as "completed" | "failed" | "aborted", + decision.sessionIdsToDispose, + ); + } + } } async validateApprovalSignalInput( @@ -514,17 +542,16 @@ export class DbRunEngine implements RunEngine { ): Promise { const parsedAction = ApprovalDecisionAction.parse(action); const shouldDisposeSessions = options.disposeSessions ?? true; - if (shouldDisposeSessions && parsedAction === "reject") { - await this.disposeSessions(await this.sessionIdsForRun(runId)); - } else if (shouldDisposeSessions && parsedAction === "abort") { - await this.disposeSessions(await this.sessionIdsForRun(runId)); - } - const status = await this.getStatus(runId); if (isTerminalRunState(status.run.state)) { - await this.composeFinalReportBestEffort( + const sessionIds = + shouldDisposeSessions && (parsedAction === "reject" || parsedAction === "abort") + ? await this.sessionIdsForRun(runId) + : []; + await this.cleanupTerminalRun( runId, status.run.state as "completed" | "failed" | "aborted", + sessionIds, ); } } @@ -627,11 +654,19 @@ export class DbRunEngine implements RunEngine { async abortRun(runId: string, reason: string): Promise { const eventRepository = new RunEventRepository(this.db); - let aborted = false; + let shouldCleanup = false; let sessionsToDispose: string[] = []; await this.db.transaction(async (tx) => { const [run] = await lockRun(tx, runId); - if (run === undefined || isTerminalRunState(run.state)) { + if (run === undefined) { + return; + } + if (run.state === "aborted") { + sessionsToDispose = await sessionIdsForRun(tx, runId); + shouldCleanup = true; + return; + } + if (isTerminalRunState(run.state)) { return; } await tx @@ -653,12 +688,11 @@ export class DbRunEngine implements RunEngine { await failActivePhasesInTransaction(tx, eventRepository, runId, "abort"); await abortPendingApprovalsInTransaction(tx, runId); sessionsToDispose = await markSessionsFailedInTransaction(tx, eventRepository, runId); - aborted = true; + shouldCleanup = true; }); - if (aborted) { - await this.disposeSessions(sessionsToDispose); - await this.composeFinalReportBestEffort(runId, "aborted"); + if (shouldCleanup) { + await this.cleanupTerminalRun(runId, "aborted", sessionsToDispose); } } @@ -970,7 +1004,7 @@ export class DbRunEngine implements RunEngine { action: ApprovalDecisionActionValue, clientToken: string, comment: string | undefined, - ): Promise<{ replayed: boolean }> { + ): Promise<{ replayed: boolean; sessionIdsToDispose: string[] }> { const decisionIdempotencyKey = `${approvalRequestId}:${action}:${clientToken}`; const eventRepository = new RunEventRepository(this.db); let sessionsToDispose: string[] = []; @@ -1118,11 +1152,7 @@ export class DbRunEngine implements RunEngine { return { replayed: false }; }); - if (sessionsToDispose.length > 0) { - await this.disposeSessions(sessionsToDispose); - } - - return result; + return { ...result, sessionIdsToDispose: sessionsToDispose }; } private async readApprovalSignalState( @@ -1650,11 +1680,8 @@ export class DbRunEngine implements RunEngine { }); sessionsToDispose = await markSessionsFailedInTransaction(tx, eventRepository, runId); }); - if (markedFailed) { - await this.disposeSessions(sessionsToDispose); - } - if (reportStatus !== undefined) { - await this.composeFinalReportBestEffort(runId, reportStatus); + if (markedFailed && reportStatus !== undefined) { + await this.cleanupTerminalRun(runId, reportStatus, sessionsToDispose); } } @@ -1670,6 +1697,127 @@ export class DbRunEngine implements RunEngine { } } + private async composeFinalReportForTerminalRun(runId: string): Promise { + const status = await this.getStatus(runId); + if (!isTerminalRunState(status.run.state)) { + return; + } + await this.composeFinalReportBestEffort( + runId, + status.run.state as "completed" | "failed" | "aborted", + ); + } + + private async cleanupTerminalRun( + runId: string, + status: "completed" | "failed" | "aborted", + sessionIds: readonly string[], + ): Promise { + let captureError: unknown; + try { + await this.captureSessionTranscripts(sessionIds); + } catch (error) { + captureError = error; + } + + await this.composeFinalReportBestEffort(runId, status); + + let disposeError: unknown; + try { + await this.disposeSessions(sessionIds); + } catch (error) { + disposeError = error; + } + + if (captureError !== undefined) { + throw captureError; + } + + if (disposeError !== undefined) { + throw disposeError; + } + } + + private async captureSessionTranscripts(sessionIds: readonly string[]): Promise { + const uniqueSessionIds = [...new Set(sessionIds)]; + if (uniqueSessionIds.length === 0) { + return; + } + + const sessionRows = await this.db + .select({ + id: tuiSessions.id, + lastCaptureSeq: tuiSessions.lastCaptureSeq, + lastKnownPanePid: tuiSessions.lastKnownPanePid, + tmuxSession: tuiSessions.tmuxSession, + tmuxWindow: tuiSessions.tmuxWindow, + }) + .from(tuiSessions) + .where(inArray(tuiSessions.id, uniqueSessionIds)); + const sink = new TuiTranscriptRepository(this.db); + const results = await Promise.allSettled( + sessionRows.map(async (session) => { + const transcriptBaseline = await this.loadTranscriptBaseline( + session.id, + session.lastCaptureSeq, + ); + try { + await captureAndPersistTranscript({ + adapter: this.sessions, + handle: { + ...sessionHandleFromRow(session), + ...(transcriptBaseline === undefined ? {} : { transcriptBaseline }), + }, + fromSeq: session.lastCaptureSeq, + sink, + }); + } catch (error) { + if (isMissingTerminalSessionError(error)) { + return; + } + throw error; + } + }), + ); + const failed = results.find((result) => result.status === "rejected"); + if (failed !== undefined) { + throw failed.reason; + } + } + + private async loadTranscriptBaseline( + sessionId: string, + lastCaptureSeq: bigint, + ): Promise { + if (lastCaptureSeq === 0n) { + return undefined; + } + const rowsDescending = await this.db + .select({ seq: tuiTranscriptChunks.seq, content: tuiTranscriptChunks.content }) + .from(tuiTranscriptChunks) + .where( + and( + eq(tuiTranscriptChunks.sessionId, sessionId), + gt(tuiTranscriptChunks.seq, 0n), + lte(tuiTranscriptChunks.seq, lastCaptureSeq), + ), + ) + .orderBy(desc(tuiTranscriptChunks.seq)) + .limit(transcriptBaselineLineLimit); + if (rowsDescending.length === 0) { + return undefined; + } + const rows = [...rowsDescending].reverse(); + const startSeq = rows[0]?.seq; + if (startSeq === undefined) { + return undefined; + } + return { + startSeq, + lines: rows.map((row) => row.content), + }; + } + private async writeStubFinalReport( runId: string, status: "completed" | "failed" | "aborted", @@ -1905,9 +2053,33 @@ export class DbRunEngine implements RunEngine { } private async disposeSessions(sessionIds: readonly string[]): Promise { - await Promise.all( - sessionIds.map((sessionId) => this.sessions.dispose({ sessionId }).catch(() => undefined)), + const uniqueSessionIds = [...new Set(sessionIds)]; + if (uniqueSessionIds.length === 0) { + return; + } + const sessionRows = await this.db + .select({ + id: tuiSessions.id, + lastKnownPanePid: tuiSessions.lastKnownPanePid, + tmuxSession: tuiSessions.tmuxSession, + tmuxWindow: tuiSessions.tmuxWindow, + }) + .from(tuiSessions) + .where(inArray(tuiSessions.id, uniqueSessionIds)); + const results = await Promise.allSettled( + sessionRows.map((session) => + this.sessions.dispose(sessionHandleFromRow(session)).catch((error: unknown) => { + if (isMissingTerminalSessionError(error)) { + return; + } + throw error; + }), + ), ); + const failed = results.find((result) => result.status === "rejected"); + if (failed !== undefined) { + throw failed.reason; + } } private async sessionIdsForRun(runId: string): Promise { @@ -1915,6 +2087,8 @@ export class DbRunEngine implements RunEngine { } } +const transcriptBaselineLineLimit = 200; + export async function readRunStatus(db: Database, runId: string): Promise { const [run] = await db .select({ @@ -2084,6 +2258,36 @@ function activeRunExists(currentRunId: string, currentState: string): DevflowErr }); } +function sessionHandleFromRow(session: { + id: string; + lastKnownPanePid: number | null; + tmuxSession: string | null; + tmuxWindow: string | null; +}): SessionHandle { + return { + sessionId: session.id, + ...(session.lastKnownPanePid === null ? {} : { pid: session.lastKnownPanePid }), + ...(session.tmuxSession === null ? {} : { tmuxSession: session.tmuxSession }), + ...(session.tmuxWindow === null ? {} : { tmuxWindow: session.tmuxWindow }), + }; +} + +function isMissingTerminalSessionError(error: unknown): boolean { + if (!(error instanceof DevflowError)) { + return false; + } + const hint = `${error.recoveryHint ?? ""}\n${error.message}`.toLowerCase(); + return ( + hint.includes("missing tmux session") || + hint.includes("can't find session") || + hint.includes("can't find pane") || + hint.includes("no server running") || + hint.includes("tmux session is disposed") || + hint.includes("tmux session already disposed") || + hint.includes("fake session is not active") + ); +} + function isPgConstraintViolation(error: unknown, constraint: string): boolean { return ( typeof error === "object" && diff --git a/packages/run-engine/src/fake-phase-harness.test.ts b/packages/run-engine/src/fake-phase-harness.test.ts index b4ded20..ed181eb 100644 --- a/packages/run-engine/src/fake-phase-harness.test.ts +++ b/packages/run-engine/src/fake-phase-harness.test.ts @@ -113,6 +113,19 @@ class ProbeUnknownFailureFakeAdapter extends FakeSessionAdapter { } } +class TmuxLivenessOnlyProbeFakeAdapter extends FakeSessionAdapter { + rebootstrapAttempts = 0; + + override async probe(_handle: SessionHandle): Promise { + return { alive: true, paneActive: true, hint: "tmux_liveness_only" }; + } + + override async rebootstrap(handle: SessionHandle): Promise { + this.rebootstrapAttempts += 1; + return super.rebootstrap(handle); + } +} + class BreakArtifactParentFakeAdapter extends FakeSessionAdapter { override async sendPrompt( handle: SessionHandle, @@ -134,6 +147,105 @@ class WriteDirectoryArtifactFakeAdapter extends FakeSessionAdapter { } } +class SendAndDisposeFailFakeAdapter extends FakeSessionAdapter { + events: string[] = []; + + override async sendPrompt(): Promise<{ promptId: string }> { + throw new Error("unclassified prompt failure"); + } + + override async *capture(handle: SessionHandle, fromSeq: bigint): AsyncIterable { + this.events.push("capture"); + yield* super.capture(handle, fromSeq); + } + + override async dispose(handle: SessionHandle): Promise { + this.events.push("dispose"); + throw new DevflowError("dispose failed", { + class: "recoverable", + code: "pane_briefly_unresponsive", + recoveryHint: `session=${handle.sessionId}`, + }); + } +} + +class SendFailureRecordsTerminalHandlesFakeAdapter extends FakeSessionAdapter { + readonly captureHandles: SessionHandle[] = []; + readonly disposeHandles: SessionHandle[] = []; + + constructor(private readonly db: DbClient["db"]) { + super({ writeDelayMs: 0 }); + } + + override async sendPrompt(handle: SessionHandle): Promise<{ promptId: string }> { + await this.db + .update(tuiSessions) + .set({ + lastKnownPanePid: 777, + tmuxSession: "persisted-session", + tmuxWindow: "persisted-window", + }) + .where(eq(tuiSessions.id, handle.sessionId)); + throw new Error("unclassified prompt failure"); + } + + override async *capture(handle: SessionHandle, fromSeq: bigint): AsyncIterable { + this.captureHandles.push(handle); + yield* super.capture(handle, fromSeq); + } + + override async dispose(handle: SessionHandle): Promise { + this.disposeHandles.push(handle); + await super.dispose(handle); + } +} + +class WrongSessionIdAndDisposeFailFakeAdapter extends FakeSessionAdapter { + override async start(input: StartInput): Promise { + await super.start(input); + return { sessionId: randomUUID() }; + } + + override async dispose(handle: SessionHandle): Promise { + throw new DevflowError("dispose failed", { + class: "recoverable", + code: "pane_briefly_unresponsive", + recoveryHint: `session=${handle.sessionId}`, + }); + } +} + +class CaptureFailsFakeAdapter extends FakeSessionAdapter { + events: string[] = []; + + override capture(): AsyncIterable { + this.events.push("capture"); + return { + [Symbol.asyncIterator]() { + return { + async next() { + throw new DevflowError("transcript history unavailable", { + class: "human_required", + code: "transcript_history_unavailable", + }); + }, + }; + }, + }; + } + + override async dispose(handle: SessionHandle): Promise { + this.events.push("dispose"); + await super.dispose(handle); + } +} + +class SendAndCaptureFailFakeAdapter extends CaptureFailsFakeAdapter { + override async sendPrompt(): Promise<{ promptId: string }> { + throw new Error("unclassified prompt failure"); + } +} + class StartFailsFakeAdapter extends FakeSessionAdapter { constructor(private readonly error: DevflowError) { super(); @@ -442,6 +554,183 @@ describe("runSingleFakePhase", () => { }, ); + it("surfaces dispose failures after fatal prompt send cleanup", async () => { + const { db, phaseId, runId } = await createRunAndPhase(); + const worktreeRoot = realpathSync( + mkdtempSync(join(tmpdir(), "devflow-fake-phase-dispose-failure-")), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + const adapter = new SendAndDisposeFailFakeAdapter({ writeDelayMs: 0 }); + + await expect( + runSingleFakePhase({ + adapter, + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions: "Scenario: ok\nWrite the development specification.", + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 100 }, + }), + ).rejects.toMatchObject({ code: "pane_briefly_unresponsive" }); + + const [run] = await db.select({ state: runs.state }).from(runs).where(eq(runs.id, runId)); + expect(run).toEqual({ state: "failed" }); + expect(adapter.events).toEqual(["capture", "dispose"]); + }); + + it("uses persisted tmux handles when fatal prompt cleanup captures and disposes sessions", async () => { + const { db, phaseId, runId } = await createRunAndPhase(); + const worktreeRoot = realpathSync( + mkdtempSync(join(tmpdir(), "devflow-fake-phase-terminal-cleanup-")), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + const adapter = new SendFailureRecordsTerminalHandlesFakeAdapter(db); + + await expect( + runSingleFakePhase({ + adapter, + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions: "Scenario: ok\nWrite the development specification.", + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 100 }, + }), + ).rejects.toThrow("unclassified prompt failure"); + + expect(adapter.captureHandles).toContainEqual({ + sessionId: expect.any(String), + pid: 777, + tmuxSession: "persisted-session", + tmuxWindow: "persisted-window", + }); + expect(adapter.disposeHandles).toContainEqual({ + sessionId: expect.any(String), + pid: 777, + tmuxSession: "persisted-session", + tmuxWindow: "persisted-window", + }); + }); + + it("attempts disposal when fatal prompt cleanup transcript capture fails", async () => { + const { db, phaseId, runId } = await createRunAndPhase(); + const worktreeRoot = realpathSync( + mkdtempSync(join(tmpdir(), "devflow-fake-phase-capture-cleanup-failure-")), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + const adapter = new SendAndCaptureFailFakeAdapter({ writeDelayMs: 0 }); + + await expect( + runSingleFakePhase({ + adapter, + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions: "Scenario: ok\nWrite the development specification.", + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 100 }, + }), + ).rejects.toMatchObject({ code: "transcript_history_unavailable" }); + + await expect( + db.select({ state: runs.state }).from(runs).where(eq(runs.id, runId)), + ).resolves.toEqual([{ state: "failed" }]); + await expect( + db.select({ state: runPhases.state }).from(runPhases).where(eq(runPhases.id, phaseId)), + ).resolves.toEqual([{ state: "failed" }]); + await expect( + db.select({ state: tuiSessions.state }).from(tuiSessions).where(eq(tuiSessions.runId, runId)), + ).resolves.toEqual([{ state: "FAILED_NEEDS_HUMAN" }]); + expect(adapter.events).toEqual(["capture", "dispose"]); + }); + + it("records durable failure state when session start cleanup fails", async () => { + const { db, phaseId, runId } = await createRunAndPhase(); + const worktreeRoot = realpathSync( + mkdtempSync(join(tmpdir(), "devflow-fake-phase-start-cleanup-failure-")), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + + await expect( + runSingleFakePhase({ + adapter: new WrongSessionIdAndDisposeFailFakeAdapter({ writeDelayMs: 0 }), + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions: "Scenario: ok\nWrite the development specification.", + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 100 }, + }), + ).rejects.toMatchObject({ code: "pane_briefly_unresponsive" }); + + await expect( + db.select({ state: runs.state }).from(runs).where(eq(runs.id, runId)), + ).resolves.toEqual([{ state: "failed" }]); + await expect( + db.select({ state: runPhases.state }).from(runPhases).where(eq(runPhases.id, phaseId)), + ).resolves.toEqual([{ state: "failed" }]); + await expect( + db.select({ state: tuiSessions.state }).from(tuiSessions).where(eq(tuiSessions.runId, runId)), + ).resolves.toEqual([{ state: "FAILED_NEEDS_HUMAN" }]); + }); + + it("captures transcript before creating workflow approval gates", async () => { + const { db, phaseId, runId } = await createRunAndPhase(); + const worktreeRoot = realpathSync( + mkdtempSync(join(tmpdir(), "devflow-fake-phase-workflow-capture-")), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + + await expect( + runSingleFakePhase({ + adapter: new CaptureFailsFakeAdapter({ writeDelayMs: 0 }), + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions: "Scenario: ok\nWrite the development specification.", + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 100 }, + workflowApprovalGateKey: "spec_approved", + }), + ).rejects.toMatchObject({ code: "transcript_history_unavailable" }); + + await expect( + db + .select({ id: approvalRequests.id }) + .from(approvalRequests) + .where(eq(approvalRequests.runId, runId)), + ).resolves.toEqual([]); + await expect( + db.select({ state: runs.state }).from(runs).where(eq(runs.id, runId)), + ).resolves.toEqual([{ state: "executing" }]); + }); + it("rolls back phase start when recording the phase.started event fails", async () => { const { db, phaseId, runId } = await createRunAndPhase(); const worktreeRoot = realpathSync( @@ -2945,6 +3234,46 @@ describe("runSingleFakePhase", () => { expect(approval).toEqual({ gateKey: "artifact_timeout_exhausted", state: "pending" }); }); + it("does not rebootstrap when tmux probe only proves pane liveness", async () => { + const { db, phaseId, runId } = await createRunAndPhase(); + const worktreeRoot = realpathSync( + mkdtempSync(join(tmpdir(), "devflow-fake-phase-probe-liveness-only-")), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + const sessionId = randomUUID(); + const adapter = new TmuxLivenessOnlyProbeFakeAdapter({ + sessionIdFactory: () => sessionId, + writeDelayMs: 0, + }); + + await expect( + runSingleFakePhase({ + adapter, + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions: "Scenario: timeout\nRepair-Scenario: ok\nProbe proves only tmux liveness.", + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 10 }, + uuidFactory: () => "00000000-0000-4000-8000-000000000033", + }), + ).rejects.toMatchObject({ code: "artifact_timeout_exhausted" }); + + expect(adapter.rebootstrapAttempts).toBe(0); + await expectRunPaused(db, runId); + + const [approval] = await db + .select({ gateKey: approvalRequests.gateKey, state: approvalRequests.state }) + .from(approvalRequests) + .where(eq(approvalRequests.runId, runId)); + expect(approval).toEqual({ gateKey: "artifact_timeout_exhausted", state: "pending" }); + }); + it("fails the run when timeout recovery probe throws an unclassified error", async () => { const { db, phaseId, runId } = await createRunAndPhase(); const worktreeRoot = realpathSync( diff --git a/packages/run-engine/src/fake-phase-harness.ts b/packages/run-engine/src/fake-phase-harness.ts index 70203e7..1668c6c 100644 --- a/packages/run-engine/src/fake-phase-harness.ts +++ b/packages/run-engine/src/fake-phase-harness.ts @@ -22,6 +22,7 @@ import { tuiSessions, } from "@devflow/db"; import { + type ProbeResult, type SessionAdapter, type SessionHandle, SessionManager, @@ -253,7 +254,6 @@ export async function runSingleFakePhase( throw gateError; } await failRunAndDisposeSession(input, eventRepository, attempt, "prompt_send_failed", handle); - await captureTranscript(input, handle); throw error; } } @@ -295,7 +295,6 @@ export async function runSingleFakePhase( "artifact_validation_failed", handle, ); - await captureTranscript(input, handle); throw error; } @@ -332,7 +331,6 @@ export async function runSingleFakePhase( "artifact_timeout_recovery_failed", handle, ); - await captureTranscript(input, handle); throw recoveryError; } if (!recovered) { @@ -387,7 +385,6 @@ export async function runSingleFakePhase( "stale_artifact_remove_failed", handle, ); - await captureTranscript(input, handle); throw error; } const timeoutRepairEnvelope = buildEnvelope( @@ -417,7 +414,6 @@ export async function runSingleFakePhase( "prompt_send_failed", handle, ); - await captureTranscript(input, handle); throw repairError; } const gateError = toHumanRequiredRecoveryError(repairError); @@ -464,7 +460,6 @@ export async function runSingleFakePhase( "artifact_repair_failed", handle, ); - await captureTranscript(input, handle); throw repairError; } await failPhaseAndRequestGate( @@ -542,7 +537,6 @@ export async function runSingleFakePhase( "stale_artifact_remove_failed", handle, ); - await captureTranscript(input, handle); throw error; } const repairEnvelope = buildEnvelope( @@ -572,7 +566,6 @@ export async function runSingleFakePhase( "prompt_send_failed", handle, ); - await captureTranscript(input, handle); throw error; } const gateError = toHumanRequiredRecoveryError(error); @@ -618,7 +611,6 @@ export async function runSingleFakePhase( "artifact_repair_failed", handle, ); - await captureTranscript(input, handle); throw error; } await failPhaseAndRequestGate( @@ -640,8 +632,10 @@ export async function runSingleFakePhase( } } + let transcript: Awaited> | undefined; if (outcome.validation.ok) { if (input.workflowApprovalGateKey !== undefined) { + transcript = await captureTranscript(input, handle); await requestWorkflowApproval( input, eventRepository, @@ -685,7 +679,7 @@ export async function runSingleFakePhase( }); } - const transcript = await captureTranscript(input, handle); + transcript ??= await captureTranscript(input, handle); return { sessionId: handle.sessionId, @@ -1307,6 +1301,16 @@ async function failPhaseAndRun( attempt: number, reason: string, ) { + const sessionIdsToDispose = await markPhaseAndRunFailed(input, eventRepository, attempt, reason); + await disposeSessionIds(input, sessionIdsToDispose); +} + +async function markPhaseAndRunFailed( + input: CanonicalRunSingleFakePhaseInput, + eventRepository: RunEventRepository, + attempt: number, + reason: string, +): Promise { let sessionIdsToDispose: string[] = []; await input.db.transaction(async (tx) => { await tx.execute(sql`SELECT 1 FROM ${runs} WHERE ${runs.id} = ${input.runId} FOR UPDATE`); @@ -1348,7 +1352,7 @@ async function failPhaseAndRun( input.runId, ); }); - await disposeSessionIds(input.sessions, sessionIdsToDispose); + return sessionIdsToDispose; } async function failRunAndDisposeSession( @@ -1356,10 +1360,33 @@ async function failRunAndDisposeSession( eventRepository: RunEventRepository, attempt: number, reason: string, - handle: { sessionId: string }, + handle: SessionHandle, ) { - await failPhaseAndRun(input, eventRepository, attempt, reason); - await input.sessions.dispose(handle).catch(() => undefined); + const sessionIdsToDispose = await markPhaseAndRunFailed(input, eventRepository, attempt, reason); + let captureError: unknown; + try { + await captureTranscript(input, handle); + } catch (error) { + captureError = error; + } + + let disposeError: unknown; + try { + await disposeSessionIds(input, sessionIdsToDispose); + if (!sessionIdsToDispose.includes(handle.sessionId)) { + await input.sessions.dispose(await sessionHandleForId(input.db, handle.sessionId, handle)); + } + } catch (error) { + disposeError = error; + } + + if (captureError !== undefined) { + throw captureError; + } + + if (disposeError !== undefined) { + throw disposeError; + } } async function completePhaseAndRun( @@ -1615,11 +1642,19 @@ async function startSessionAndRecord( }); return startedHandle; } catch (error) { + let disposeError: unknown; if (handle !== undefined) { - await input.sessions.dispose(handle); + try { + await input.sessions.dispose(handle); + } catch (cleanupError) { + disposeError = cleanupError; + } } if (isRunStateChanged(error)) { + if (disposeError !== undefined) { + throw disposeError; + } throw error; } if (shouldCreateHumanGate(error)) { @@ -1633,13 +1668,16 @@ async function startSessionAndRecord( { errorCode: error.code, recoveryHint: gateError.recoveryHint }, sessionId, ); + if (disposeError !== undefined) { + throw disposeError; + } throw gateError; } await failPhaseAndRun(input, eventRepository, attempt, "session_start_failed"); await markSessionFailedNeedsHuman(input, eventRepository, sessionId); - if (handle !== undefined) { - await input.sessions.dispose(handle).catch(() => undefined); + if (disposeError !== undefined) { + throw disposeError; } throw error; } @@ -2230,7 +2268,7 @@ async function recoverFromArtifactTimeout( sessionId: string, ): Promise { const probe = await probeWithTypedError(input.sessions, { sessionId }); - if (!probe.alive || !probe.paneActive) { + if (!probe.alive || !probe.paneActive || isBackendReadinessUnknown(probe)) { return false; } await setSessionStateIfRunActive(input, sessionId, "RESUMING"); @@ -2263,6 +2301,10 @@ async function recoverFromArtifactTimeout( return true; } +function isBackendReadinessUnknown(probe: ProbeResult): boolean { + return probe.hint === "tmux_liveness_only"; +} + async function setSessionStateIfRunActive( input: CanonicalRunSingleFakePhaseInput, sessionId: string, @@ -2397,12 +2439,19 @@ async function markAllSessionsFailedInTransaction( return sessions.map((session) => session.id); } -async function disposeSessionIds(sessions: SessionRuntime, sessionIds: readonly string[]) { - await Promise.all( - [...new Set(sessionIds)].map((sessionId) => - sessions.dispose({ sessionId }).catch(() => undefined), - ), - ); +async function disposeSessionIds( + input: CanonicalRunSingleFakePhaseInput, + sessionIds: readonly string[], +) { + if (sessionIds.length === 0) { + return; + } + const handles = await sessionHandlesFromDb(input.db, sessionIds); + const results = await Promise.allSettled(handles.map((handle) => input.sessions.dispose(handle))); + const failed = results.find((result) => result.status === "rejected"); + if (failed !== undefined) { + throw failed.reason; + } } async function waitForArtifact(path: string, options: ArtifactWaitOptions = {}): Promise { @@ -2702,7 +2751,13 @@ async function captureTranscript( ) { const sink = input.transcriptSink ?? new TuiTranscriptRepository(input.db); const [session] = await input.db - .select({ lastCaptureSeq: tuiSessions.lastCaptureSeq }) + .select({ + id: tuiSessions.id, + lastCaptureSeq: tuiSessions.lastCaptureSeq, + lastKnownPanePid: tuiSessions.lastKnownPanePid, + tmuxSession: tuiSessions.tmuxSession, + tmuxWindow: tuiSessions.tmuxWindow, + }) .from(tuiSessions) .where(eq(tuiSessions.id, handle.sessionId)); if (session === undefined) { @@ -2715,12 +2770,51 @@ async function captureTranscript( } return captureAndPersistTranscript({ adapter: input.sessions, - handle, + handle: sessionHandleFromRow(session), fromSeq: session.lastCaptureSeq, sink, }); } +async function sessionHandlesFromDb( + db: DbClient["db"], + sessionIds: readonly string[], +): Promise { + const rows = await db + .select({ + id: tuiSessions.id, + lastKnownPanePid: tuiSessions.lastKnownPanePid, + tmuxSession: tuiSessions.tmuxSession, + tmuxWindow: tuiSessions.tmuxWindow, + }) + .from(tuiSessions) + .where(inArray(tuiSessions.id, [...new Set(sessionIds)])); + return rows.map((row) => sessionHandleFromRow(row)); +} + +async function sessionHandleForId( + db: DbClient["db"], + sessionId: string, + fallback: SessionHandle, +): Promise { + const [handle] = await sessionHandlesFromDb(db, [sessionId]); + return handle ?? fallback; +} + +function sessionHandleFromRow(session: { + id: string; + lastKnownPanePid: number | null; + tmuxSession: string | null; + tmuxWindow: string | null; +}): SessionHandle { + return { + sessionId: session.id, + ...(session.lastKnownPanePid === null ? {} : { pid: session.lastKnownPanePid }), + ...(session.tmuxSession === null ? {} : { tmuxSession: session.tmuxSession }), + ...(session.tmuxWindow === null ? {} : { tmuxWindow: session.tmuxWindow }), + }; +} + function sleep(ms: number, signal?: AbortSignal): Promise { if (signal === undefined) { return new Promise((resolve) => setTimeout(resolve, ms)); diff --git a/packages/session/src/adapter.ts b/packages/session/src/adapter.ts index 0203958..c36262d 100644 --- a/packages/session/src/adapter.ts +++ b/packages/session/src/adapter.ts @@ -23,9 +23,19 @@ export interface StartInput { export interface SessionHandle { sessionId: string; + runId?: string; + roleId?: string; pid?: number; tmuxSession?: string; tmuxWindow?: string; + envelopePrelude?: string; + requirePreludeReplay?: boolean; + transcriptBaseline?: TranscriptBaseline; +} + +export interface TranscriptBaseline { + startSeq: bigint; + lines: readonly string[]; } export interface ProbeResult { diff --git a/packages/session/src/index.ts b/packages/session/src/index.ts index 96aae35..25ea150 100644 --- a/packages/session/src/index.ts +++ b/packages/session/src/index.ts @@ -2,3 +2,4 @@ export * from "./adapter.js"; export * from "./fake.js"; export * from "./manager.js"; export * from "./transcript.js"; +export * from "./tmux.js"; diff --git a/packages/session/src/manager.test.ts b/packages/session/src/manager.test.ts new file mode 100644 index 0000000..8250574 --- /dev/null +++ b/packages/session/src/manager.test.ts @@ -0,0 +1,334 @@ +import { randomUUID } from "node:crypto"; +import { afterEach, describe, expect, it } from "vitest"; + +import type { PromptEnvelope } from "@devflow/core"; +import { + type DbClient, + approvalRequests, + createDbClient, + runEvents, + runs, + tuiSessions, + tuiTranscriptChunks, + workflowTemplates, +} from "@devflow/db"; +import { eq, inArray } from "drizzle-orm"; + +import type { + ProbeResult, + SessionAdapter, + SessionHandle, + StartInput, + TranscriptChunk, +} from "./adapter.js"; +import { SessionManager } from "./manager.js"; + +const testDatabaseUrl = + process.env.DATABASE_URL ?? "postgres://devflow:devflow@127.0.0.1:55432/devflow"; + +class RecordingRecoveryAdapter implements SessionAdapter { + resumeCalls = 0; + sendPromptCalls = 0; + captureCalls: Array<{ sessionId: string; fromSeq: bigint }> = []; + + async start(input: StartInput): Promise { + return { sessionId: input.sessionId ?? randomUUID() }; + } + + async sendPrompt( + _handle: SessionHandle, + envelope: PromptEnvelope, + ): Promise<{ promptId: string }> { + this.sendPromptCalls += 1; + return { promptId: envelope.dedupKey }; + } + + async probe(): Promise { + return { alive: true, paneActive: true }; + } + + async resume(handle: SessionHandle): Promise { + this.resumeCalls += 1; + return handle; + } + + async rebootstrap(handle: SessionHandle): Promise { + return handle; + } + + capture(handle: SessionHandle, fromSeq: bigint): AsyncIterable { + this.captureCalls.push({ sessionId: handle.sessionId, fromSeq }); + return singleTranscript(fromSeq + 1n, "shutdown transcript"); + } + + async dispose(): Promise { + return; + } +} + +describe("SessionManager recovery", () => { + let client: DbClient | undefined; + const runIds: string[] = []; + const templateIds: string[] = []; + + afterEach(async () => { + if (client === undefined) { + return; + } + if (runIds.length > 0) { + await client.db.delete(approvalRequests).where(inArray(approvalRequests.runId, [...runIds])); + await client.db.delete(runs).where(inArray(runs.id, [...runIds])); + } + if (templateIds.length > 0) { + await client.db + .delete(workflowTemplates) + .where(inArray(workflowTemplates.id, [...templateIds])); + } + runIds.length = 0; + templateIds.length = 0; + await client.close(); + client = undefined; + }); + + it("recovers BUSY sessions without prompt proof for baseline replay handling", async () => { + client = createDbClient(testDatabaseUrl); + const templateId = randomUUID(); + const runId = randomUUID(); + const sessionId = randomUUID(); + templateIds.push(templateId); + runIds.push(runId); + + await client.db.insert(workflowTemplates).values({ + id: templateId, + name: `template-${templateId}`, + version: 1, + hash: `hash-${templateId}`, + definition: {}, + }); + await client.db.insert(runs).values({ + id: runId, + templateId, + templateHash: `hash-${templateId}`, + state: "executing", + repoPath: `/tmp/devflow-${runId}`, + baseBranch: "main", + worktreeRoot: `/tmp/devflow-${runId}/main`, + }); + await client.db.insert(tuiSessions).values({ + id: sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd: `/tmp/devflow-${runId}/main`, + state: "BUSY", + lastPromptHash: "a".repeat(64), + lastPromptAt: new Date("2026-05-13T00:00:00.000Z"), + tmuxSession: "devflow-test-session", + tmuxWindow: "implementer", + }); + const adapter = new RecordingRecoveryAdapter(); + const manager = new SessionManager({ + db: client.db, + adapter, + recoveryRunIds: [runId], + }); + + await expect(manager.recoverSessions()).resolves.toEqual({ + recoveredSessionIds: [sessionId], + failedSessionIds: [], + }); + + expect(adapter.resumeCalls).toBe(1); + await expect( + client.db + .select({ state: tuiSessions.state, recoveryAttempts: tuiSessions.recoveryAttempts }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)), + ).resolves.toEqual([{ state: "BUSY", recoveryAttempts: 0 }]); + await expect( + client.db.select({ state: runs.state }).from(runs).where(eq(runs.id, runId)), + ).resolves.toEqual([{ state: "executing" }]); + const events = await client.db + .select({ type: runEvents.type }) + .from(runEvents) + .where(eq(runEvents.runId, runId)) + .orderBy(runEvents.seq); + expect(events.map((event) => event.type)).toEqual([]); + }); + + it("skips prompt delivery when durable prompt proof already exists", async () => { + client = createDbClient(testDatabaseUrl); + const templateId = randomUUID(); + const runId = randomUUID(); + templateIds.push(templateId); + runIds.push(runId); + const dedupKey = "b".repeat(64); + + await client.db.insert(workflowTemplates).values({ + id: templateId, + name: `template-${templateId}`, + version: 1, + hash: `hash-${templateId}`, + definition: {}, + }); + await client.db.insert(runs).values({ + id: runId, + templateId, + templateHash: `hash-${templateId}`, + state: "executing", + repoPath: `/tmp/devflow-${runId}`, + baseBranch: "main", + worktreeRoot: `/tmp/devflow-${runId}/main`, + }); + await client.db.insert(runEvents).values({ + runId, + seq: 1n, + type: "prompt.sent", + payload: { roleId: "implementer", dedupKey }, + idempotencyKey: `prompt.sent:${dedupKey}`, + }); + const adapter = new RecordingRecoveryAdapter(); + const manager = new SessionManager({ db: client.db, adapter }); + + await expect( + manager.sendPrompt( + { sessionId: randomUUID() }, + { + uuid: randomUUID(), + runId, + roleId: "implementer", + phaseKey: "spec", + attempt: 1, + expectedArtifact: `/tmp/devflow-${runId}/main/spec.json`, + expectedSchema: "dev/spec@1", + dedupKey, + instructions: "already delivered", + }, + ), + ).resolves.toEqual({ promptId: dedupKey }); + expect(adapter.sendPromptCalls).toBe(0); + }); + + it("does not deliver a prompt if shutdown starts while checking durable prompt proof", async () => { + const adapter = new RecordingRecoveryAdapter(); + const manager = new SessionManager({ adapter }); + const promptProofStarted = deferred(); + const promptProofAllowed = deferred(); + ( + manager as unknown as { + promptDeliveryAlreadyRecorded(envelope: PromptEnvelope): Promise; + } + ).promptDeliveryAlreadyRecorded = async () => { + promptProofStarted.resolve(); + return promptProofAllowed.promise; + }; + const runId = randomUUID(); + const dedupKey = "c".repeat(64); + + const send = manager.sendPrompt( + { sessionId: randomUUID() }, + { + uuid: randomUUID(), + runId, + roleId: "implementer", + phaseKey: "spec", + attempt: 1, + expectedArtifact: `/tmp/devflow-${runId}/main/spec.json`, + expectedSchema: "dev/spec@1", + dedupKey, + instructions: "shutdown race", + }, + ); + await promptProofStarted.promise; + const shutdown = manager.shutdown(); + promptProofAllowed.resolve(false); + + await expect(send).rejects.toMatchObject({ code: "session_manager_draining" }); + await expect(shutdown).resolves.toBeUndefined(); + expect(adapter.sendPromptCalls).toBe(0); + }); + + it("captures tracked transcripts before releasing the singleton lock on shutdown", async () => { + client = createDbClient(testDatabaseUrl); + const templateId = randomUUID(); + const runId = randomUUID(); + const sessionId = randomUUID(); + templateIds.push(templateId); + runIds.push(runId); + + await client.db.insert(workflowTemplates).values({ + id: templateId, + name: `template-${templateId}`, + version: 1, + hash: `hash-${templateId}`, + definition: {}, + }); + await client.db.insert(runs).values({ + id: runId, + templateId, + templateHash: `hash-${templateId}`, + state: "executing", + repoPath: `/tmp/devflow-${runId}`, + baseBranch: "main", + worktreeRoot: `/tmp/devflow-${runId}/main`, + }); + await client.db.insert(tuiSessions).values({ + id: sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd: `/tmp/devflow-${runId}/main`, + state: "READY", + lastCaptureSeq: 1n, + }); + const adapter = new RecordingRecoveryAdapter(); + const manager = new SessionManager({ db: client.db, adapter }); + await manager.resume({ sessionId }); + + await expect(manager.shutdown()).resolves.toBeUndefined(); + + expect(adapter.captureCalls).toEqual([{ sessionId, fromSeq: 1n }]); + await expect( + client.db + .select({ seq: tuiTranscriptChunks.seq, content: tuiTranscriptChunks.content }) + .from(tuiTranscriptChunks) + .where(eq(tuiTranscriptChunks.sessionId, sessionId)), + ).resolves.toEqual([{ seq: 2n, content: "shutdown transcript" }]); + await expect( + client.db + .select({ lastCaptureSeq: tuiSessions.lastCaptureSeq }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)), + ).resolves.toEqual([{ lastCaptureSeq: 2n }]); + }); +}); + +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((promiseResolve, promiseReject) => { + resolve = promiseResolve; + reject = promiseReject; + }); + return { promise, resolve, reject }; +} + +function singleTranscript(seq: bigint, content: string): AsyncIterable { + return { + [Symbol.asyncIterator]() { + let emitted = false; + return { + async next() { + if (emitted) { + return { done: true, value: undefined }; + } + emitted = true; + return { + done: false, + value: { seq, content, capturedAt: new Date("2026-05-13T00:00:00.000Z") }, + }; + }, + }; + }, + }; +} diff --git a/packages/session/src/manager.ts b/packages/session/src/manager.ts index e415853..3bad6a3 100644 --- a/packages/session/src/manager.ts +++ b/packages/session/src/manager.ts @@ -2,19 +2,24 @@ import { DevflowError, type PromptEnvelope } from "@devflow/core"; import { type DbClient, RunEventRepository, + TuiTranscriptRepository, approvalRequests, + runEvents, runs, tuiSessions, + tuiTranscriptChunks, } from "@devflow/db"; -import { and, eq, inArray, notInArray, sql } from "drizzle-orm"; +import { and, desc, eq, gt, inArray, lte, notInArray, sql } from "drizzle-orm"; import type { ProbeResult, SessionAdapter, SessionHandle, StartInput, + TranscriptBaseline, TranscriptChunk, } from "./adapter.js"; +import { captureAndPersistTranscript } from "./transcript.js"; type Database = DbClient["db"]; @@ -92,7 +97,7 @@ export class SessionManager implements SessionRuntime { const client = (await this.dbClient.pool.connect()) as AdvisoryLockClient; const result = await client.query<{ acquired: boolean }>( - "SELECT pg_try_advisory_lock(hashtext($1)) AS acquired", + "SELECT pg_try_advisory_lock(hashtextextended($1, 0)) AS acquired", ["devflow:session-manager"], ); if (result.rows[0]?.acquired !== true) { @@ -110,16 +115,27 @@ export class SessionManager implements SessionRuntime { async shutdown(): Promise { this.draining = true; await this.waitForInFlight(); + let captureError: unknown; + try { + await this.captureTrackedTranscripts(); + } catch (error) { + captureError = error; + } const client = this.lockClient; this.lockClient = undefined; this.handles.clear(); if (client !== undefined) { try { - await client.query("SELECT pg_advisory_unlock(hashtext($1))", ["devflow:session-manager"]); + await client.query("SELECT pg_advisory_unlock(hashtextextended($1, 0))", [ + "devflow:session-manager", + ]); } finally { client.release(); } } + if (captureError !== undefined) { + throw captureError; + } } trackOperation(operation: Promise): Promise { @@ -135,7 +151,15 @@ export class SessionManager implements SessionRuntime { async sendPrompt(handle: SessionHandle, envelope: PromptEnvelope): Promise<{ promptId: string }> { this.assertAcceptingPrompts(); - return this.track(this.adapter.sendPrompt(this.handleFor(handle), envelope)); + return this.track( + (async () => { + if (await this.promptDeliveryAlreadyRecorded(envelope)) { + return { promptId: envelope.dedupKey }; + } + this.assertAcceptingPrompts(); + return this.adapter.sendPrompt(this.handleFor(handle), envelope); + })(), + ); } async probe(handle: SessionHandle): Promise { @@ -185,7 +209,9 @@ export class SessionManager implements SessionRuntime { roleId: tuiSessions.roleId, backend: tuiSessions.backend, cwd: tuiSessions.cwd, + lastCaptureSeq: tuiSessions.lastCaptureSeq, lastKnownPanePid: tuiSessions.lastKnownPanePid, + lastPromptHash: tuiSessions.lastPromptHash, recoveryAttempts: tuiSessions.recoveryAttempts, state: tuiSessions.state, tmuxSession: tuiSessions.tmuxSession, @@ -209,11 +235,19 @@ export class SessionManager implements SessionRuntime { const recoveredSessionIds: string[] = []; const failedSessionIds: string[] = []; for (const session of sessionRows) { + const transcriptBaseline = await this.loadTranscriptBaseline( + session.id, + session.lastCaptureSeq, + ); const handle = compactHandle( session.id, + session.runId, + session.roleId, + session.backend, session.lastKnownPanePid, session.tmuxSession, session.tmuxWindow, + transcriptBaseline, ); try { const resumed = await this.resumeWithRetry(handle); @@ -229,6 +263,27 @@ export class SessionManager implements SessionRuntime { return { recoveredSessionIds, failedSessionIds }; } + private async promptDeliveryAlreadyRecorded(envelope: PromptEnvelope): Promise { + if (this.db === undefined) { + return false; + } + + const [event] = await this.db + .select({ id: runEvents.id }) + .from(runEvents) + .where( + and( + eq(runEvents.runId, envelope.runId), + inArray(runEvents.idempotencyKey, [ + `prompt.sent:${envelope.dedupKey}`, + `prompt.repaired:${envelope.dedupKey}`, + ]), + ), + ) + .limit(1); + return event !== undefined; + } + private async markStartupRecoverySucceeded( session: { id: string; @@ -390,6 +445,67 @@ export class SessionManager implements SessionRuntime { throw lastError; } + private async loadTranscriptBaseline( + sessionId: string, + lastCaptureSeq: bigint, + ): Promise { + if (this.db === undefined || lastCaptureSeq === 0n) { + return undefined; + } + + const rowsDescending = await this.db + .select({ seq: tuiTranscriptChunks.seq, content: tuiTranscriptChunks.content }) + .from(tuiTranscriptChunks) + .where( + and( + eq(tuiTranscriptChunks.sessionId, sessionId), + gt(tuiTranscriptChunks.seq, 0n), + lte(tuiTranscriptChunks.seq, lastCaptureSeq), + ), + ) + .orderBy(desc(tuiTranscriptChunks.seq)) + .limit(transcriptBaselineLineLimit); + if (rowsDescending.length === 0) { + return undefined; + } + + const rows = [...rowsDescending].reverse(); + const startSeq = rows[0]?.seq; + if (startSeq === undefined) { + return undefined; + } + return { + startSeq, + lines: rows.map((row) => row.content), + }; + } + + private async captureTrackedTranscripts(): Promise { + if (this.db === undefined || this.handles.size === 0) { + return; + } + + const sessionRows = await this.db + .select({ id: tuiSessions.id, lastCaptureSeq: tuiSessions.lastCaptureSeq }) + .from(tuiSessions) + .where(inArray(tuiSessions.id, [...this.handles.keys()])); + const sink = new TuiTranscriptRepository(this.db); + const results = await Promise.allSettled( + sessionRows.map((session) => + captureAndPersistTranscript({ + adapter: this.adapter, + handle: this.handleFor({ sessionId: session.id }), + fromSeq: session.lastCaptureSeq, + sink, + }), + ), + ); + const failed = results.find((result) => result.status === "rejected"); + if (failed !== undefined) { + throw failed.reason; + } + } + private async track(operation: Promise): Promise { const tracked = operation.finally(() => { this.inFlight.delete(tracked); @@ -420,7 +536,13 @@ export class SessionManager implements SessionRuntime { } private handleFor(handle: SessionHandle): SessionHandle { - return this.handles.get(handle.sessionId) ?? handle; + const tracked = this.handles.get(handle.sessionId); + if (tracked === undefined) { + return handle; + } + const merged = mergeSessionHandles(tracked, handle); + this.handles.set(handle.sessionId, merged); + return merged; } private assertAcceptingPrompts(): void { @@ -442,15 +564,32 @@ function isTerminalRunState(state: string): state is (typeof terminalRunStates)[ function compactHandle( sessionId: string, + runId: string, + roleId: string, + backend: string, pid: number | null, tmuxSession: string | null, tmuxWindow: string | null, + transcriptBaseline: TranscriptBaseline | undefined, ): SessionHandle { return { sessionId, + runId, + roleId, ...(pid === null ? {} : { pid }), ...(tmuxSession === null ? {} : { tmuxSession }), ...(tmuxWindow === null ? {} : { tmuxWindow }), + ...(backend === "fake" ? {} : { requirePreludeReplay: true }), + ...(transcriptBaseline === undefined ? {} : { transcriptBaseline }), + }; +} + +const transcriptBaselineLineLimit = 200; + +function mergeSessionHandles(tracked: SessionHandle, incoming: SessionHandle): SessionHandle { + return { + ...tracked, + ...Object.fromEntries(Object.entries(incoming).filter(([, value]) => value !== undefined)), }; } diff --git a/packages/session/src/tmux.test.ts b/packages/session/src/tmux.test.ts new file mode 100644 index 0000000..aa41edd --- /dev/null +++ b/packages/session/src/tmux.test.ts @@ -0,0 +1,917 @@ +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; + +import { DevflowError, type PromptEnvelope, renderPromptEnvelope } from "@devflow/core"; + +import { + TmuxCommandError, + type TmuxDriver, + type TmuxDriverExecOptions, + TmuxSessionAdapter, +} from "./tmux.js"; + +const runId = "00000000-0000-4000-8000-000000000001"; +const sessionId = "00000000-0000-4000-8000-000000000042"; +const dedupKey = "a".repeat(64); + +interface RecordedTmuxCommand { + args: string[]; + cwd?: string; + input?: string; +} + +class RecordingTmuxDriver implements TmuxDriver { + readonly commands: RecordedTmuxCommand[] = []; + captureOutput = "first line\nsecond line\n"; + panePid = 4242; + private readonly buffers = new Set(); + private readonly failures: { tmuxCommand: string; error: unknown }[] = []; + private readonly liveSessions = new Set(); + + async exec(args: readonly string[], options: TmuxDriverExecOptions = {}): Promise { + const command: RecordedTmuxCommand = { args: [...args] }; + if (options.cwd !== undefined) { + command.cwd = options.cwd; + } + if (options.input !== undefined) { + command.input = options.input; + } + this.commands.push(command); + + const [tmuxCommand] = args; + const failureIndex = this.failures.findIndex((failure) => failure.tmuxCommand === tmuxCommand); + if (failureIndex >= 0) { + const [failure] = this.failures.splice(failureIndex, 1); + throw failure?.error ?? new Error(`failed ${tmuxCommand}`); + } + + if (tmuxCommand === "new-session") { + const sessionName = valueAfter(args, "-s"); + this.liveSessions.add(sessionName); + return ""; + } + if (tmuxCommand === "display-message") { + return `${this.panePid}\n`; + } + if (tmuxCommand === "load-buffer") { + this.buffers.add(valueAfter(args, "-b")); + return ""; + } + if (tmuxCommand === "paste-buffer") { + const bufferName = valueAfter(args, "-b"); + if (!this.buffers.has(bufferName)) { + throw new Error(`can't find buffer: ${bufferName}`); + } + return ""; + } + if (tmuxCommand === "has-session") { + const target = valueAfter(args, "-t"); + if (!this.liveSessions.has(sessionNameFromTarget(target))) { + throw new Error(`missing tmux session ${target}`); + } + return ""; + } + if (tmuxCommand === "capture-pane") { + return this.captureOutput; + } + if (tmuxCommand === "respawn-pane") { + this.panePid += 1; + return ""; + } + if (tmuxCommand === "kill-session") { + const target = valueAfter(args, "-t"); + this.liveSessions.delete(sessionNameFromTarget(target)); + return ""; + } + return ""; + } + + commandCount(tmuxCommand: string): number { + return this.commands.filter((command) => command.args[0] === tmuxCommand).length; + } + + failNext(tmuxCommand: string, error: unknown): void { + this.failures.push({ tmuxCommand, error }); + } + + deleteBuffer(bufferName: string): void { + this.buffers.delete(bufferName); + } + + clearCommands(): void { + this.commands.length = 0; + } +} + +describe("TmuxSessionAdapter", () => { + const tempRoots: string[] = []; + + afterEach(() => { + for (const root of tempRoots.splice(0)) { + rmSync(root, { recursive: true, force: true }); + } + }); + + it("starts a detached tmux session with a shell-quoted backend command and bootstrap prelude", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-start-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + const adapter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["codex", "--model", "gpt 5"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "implementer", + }); + + const handle = await adapter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd, + envelopePrelude: "Follow the Devflow protocol", + }); + + expect(handle).toEqual({ + sessionId, + pid: 4242, + tmuxSession: "devflow-test-session", + tmuxWindow: "implementer", + }); + expect(driver.commands[0]).toEqual({ + args: [ + "new-session", + "-d", + "-s", + "devflow-test-session", + "-n", + "implementer", + "-c", + cwd, + "'codex' '--model' 'gpt 5'", + ], + cwd, + }); + expect(driver.commands).toContainEqual({ + args: ["load-buffer", "-b", "devflow-prelude-00000000", "-"], + input: "Follow the Devflow protocol", + }); + expect(driver.commands).toContainEqual({ + args: [ + "paste-buffer", + "-b", + "devflow-prelude-00000000", + "-t", + "devflow-test-session:implementer", + ], + }); + expect(driver.commands).toContainEqual({ + args: ["send-keys", "-t", "devflow-test-session:implementer", "Enter"], + }); + }); + + it("sends rendered prompt envelopes by paste-buffer and treats duplicate dedup keys as idempotent", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-prompt-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + const adapter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["claude"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "planner", + }); + const handle = await adapter.start({ + sessionId, + runId, + roleId: "planner", + backend: "claude", + cwd, + }); + const prompt = envelope({ roleId: "planner" }); + + await expect(adapter.sendPrompt(handle, prompt)).resolves.toEqual({ promptId: dedupKey }); + await expect( + adapter.sendPrompt(handle, { + ...prompt, + uuid: "00000000-0000-4000-8000-000000000099", + }), + ).resolves.toEqual({ promptId: dedupKey }); + + expect(driver.commandCount("load-buffer")).toBe(1); + expect(driver.commands).toContainEqual({ + args: ["load-buffer", "-b", `devflow-prompt-${dedupKey.slice(0, 12)}`, "-"], + input: renderPromptEnvelope(prompt), + }); + expect(driver.commands).toContainEqual({ + args: [ + "paste-buffer", + "-b", + `devflow-prompt-${dedupKey.slice(0, 12)}`, + "-t", + "devflow-test-session:planner", + ], + }); + + const pasteFailurePrompt = envelope({ + roleId: "planner", + dedupKey: "c".repeat(64), + }); + driver.failNext("paste-buffer", new Error("paste failed before delivery")); + await expect(adapter.sendPrompt(handle, pasteFailurePrompt)).rejects.toMatchObject({ + class: "recoverable", + code: "prompt_send_transient", + }); + const pasteCountAfterPasteFailure = driver.commandCount("paste-buffer"); + await expect(adapter.sendPrompt(handle, pasteFailurePrompt)).resolves.toEqual({ + promptId: "c".repeat(64), + }); + expect(driver.commandCount("paste-buffer")).toBe(pasteCountAfterPasteFailure + 1); + + const partialFailurePrompt = envelope({ + roleId: "planner", + dedupKey: "b".repeat(64), + }); + driver.failNext("send-keys", new Error("send-keys lost acknowledgement")); + await expect(adapter.sendPrompt(handle, partialFailurePrompt)).rejects.toMatchObject({ + class: "recoverable", + code: "prompt_send_transient", + }); + const pasteCountAfterPartialFailure = driver.commandCount("paste-buffer"); + const enterCountAfterPartialFailure = driver.commandCount("send-keys"); + await expect(adapter.sendPrompt(handle, partialFailurePrompt)).resolves.toEqual({ + promptId: "b".repeat(64), + }); + expect(driver.commandCount("paste-buffer")).toBe(pasteCountAfterPartialFailure); + expect(driver.commandCount("send-keys")).toBe(enterCountAfterPartialFailure + 1); + }); + + it("does not re-paste after an uncertain paste-buffer timeout", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-uncertain-paste-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + const adapter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["claude"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "planner", + }); + const handle = await adapter.start({ + sessionId, + runId, + roleId: "planner", + backend: "claude", + cwd, + }); + const prompt = envelope({ + roleId: "planner", + dedupKey: "f".repeat(64), + }); + driver.failNext( + "paste-buffer", + new TmuxCommandError("paste timed out", { + args: ["paste-buffer"], + reason: "timeout", + }), + ); + + await expect(adapter.sendPrompt(handle, prompt)).rejects.toMatchObject({ + class: "recoverable", + code: "prompt_send_transient", + }); + const pasteCountAfterTimeout = driver.commandCount("paste-buffer"); + const enterCountAfterTimeout = driver.commandCount("send-keys"); + + await expect(adapter.sendPrompt(handle, prompt)).resolves.toEqual({ + promptId: "f".repeat(64), + }); + expect(driver.commandCount("paste-buffer")).toBe(pasteCountAfterTimeout); + expect(driver.commandCount("send-keys")).toBe(enterCountAfterTimeout + 1); + }); + + it("re-pastes a partially delivered prompt after rebootstrap", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-rebootstrap-prompt-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + const adapter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["claude"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "planner", + }); + const handle = await adapter.start({ + sessionId, + runId, + roleId: "planner", + backend: "claude", + cwd, + }); + const prompt = envelope({ + roleId: "planner", + dedupKey: "d".repeat(64), + }); + driver.failNext("send-keys", new Error("send-keys lost acknowledgement")); + await expect(adapter.sendPrompt(handle, prompt)).rejects.toMatchObject({ + class: "recoverable", + code: "prompt_send_transient", + }); + + await adapter.rebootstrap(handle); + driver.clearCommands(); + + await expect(adapter.sendPrompt(handle, prompt)).resolves.toEqual({ + promptId: "d".repeat(64), + }); + expect(driver.commands).toContainEqual({ + args: ["load-buffer", "-b", "devflow-prompt-dddddddddddd", "-"], + input: renderPromptEnvelope(prompt), + }); + expect(driver.commands).toContainEqual({ + args: [ + "paste-buffer", + "-b", + "devflow-prompt-dddddddddddd", + "-t", + "devflow-test-session:planner", + ], + }); + }); + + it("treats a previously delivered prompt as idempotent after rebootstrap", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-rebootstrap-sent-prompt-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + const adapter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["claude"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "planner", + }); + const handle = await adapter.start({ + sessionId, + runId, + roleId: "planner", + backend: "claude", + cwd, + }); + const prompt = envelope({ + roleId: "planner", + dedupKey: "e".repeat(64), + }); + await adapter.sendPrompt(handle, prompt); + + await adapter.rebootstrap(handle); + driver.clearCommands(); + + await expect(adapter.sendPrompt(handle, prompt)).resolves.toEqual({ + promptId: "e".repeat(64), + }); + expect(driver.commands).toEqual([]); + }); + + it("does not treat a recovered last prompt hash as durable prompt delivery proof", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-resume-not-dedup-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + const starter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["claude"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "planner", + }); + await starter.start({ + sessionId, + runId, + roleId: "planner", + backend: "claude", + cwd, + }); + const prompt = envelope({ roleId: "planner" }); + await starter.sendPrompt( + { + sessionId, + tmuxSession: "devflow-test-session", + tmuxWindow: "planner", + }, + prompt, + ); + + const adapter = new TmuxSessionAdapter({ driver }); + driver.clearCommands(); + const resumed = await adapter.resume({ + sessionId, + tmuxSession: "devflow-test-session", + tmuxWindow: "planner", + }); + await expect(adapter.sendPrompt(resumed, prompt)).resolves.toEqual({ + promptId: dedupKey, + }); + expect(driver.commandCount("load-buffer")).toBe(1); + }); + + it("replays a preserved prelude buffer after resume and rebootstrap", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-resume-prelude-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + const starter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["codex"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "implementer", + }); + await starter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd, + envelopePrelude: "Follow the Devflow protocol", + }); + + const adapter = new TmuxSessionAdapter({ driver }); + const resumed = await adapter.resume({ + sessionId, + tmuxSession: "devflow-test-session", + tmuxWindow: "implementer", + }); + driver.clearCommands(); + + await expect(adapter.rebootstrap(resumed)).resolves.toMatchObject({ + sessionId, + }); + expect(driver.commands).toContainEqual({ + args: [ + "paste-buffer", + "-b", + "devflow-prelude-00000000", + "-t", + "devflow-test-session:implementer", + ], + }); + }); + + it("fails closed when a recovered real-backend session cannot replay its prelude", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-missing-recovered-prelude-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + const starter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["codex"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "implementer", + }); + await starter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd, + envelopePrelude: "Follow the Devflow protocol", + }); + + const adapter = new TmuxSessionAdapter({ driver }); + const resumed = await adapter.resume({ + sessionId, + runId, + roleId: "implementer", + tmuxSession: "devflow-test-session", + tmuxWindow: "implementer", + requirePreludeReplay: true, + }); + driver.deleteBuffer("devflow-prelude-00000000"); + + await expect(adapter.rebootstrap(resumed)).rejects.toMatchObject({ + class: "recoverable", + code: "pane_briefly_unresponsive", + }); + }); + + it("uses recovered transcript baseline to continue capture after resume", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-resume-transcript-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + const starter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["codex"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "implementer", + now: () => new Date("2026-05-13T00:00:00.000Z"), + }); + await starter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd, + }); + + const adapter = new TmuxSessionAdapter({ + driver, + now: () => new Date("2026-05-13T00:00:00.000Z"), + }); + driver.captureOutput = "first line\nsecond line\nthird line\n"; + const resumed = await adapter.resume({ + sessionId, + tmuxSession: "devflow-test-session", + tmuxWindow: "implementer", + transcriptBaseline: { + startSeq: 1n, + lines: ["first line", "second line"], + }, + }); + + await expect(collect(adapter.capture(resumed, 2n))).resolves.toEqual([ + { + seq: 3n, + content: "third line", + capturedAt: new Date("2026-05-13T00:00:00.000Z"), + }, + ]); + + driver.captureOutput = "rolled first line\nchanged second line\nchanged third line\n"; + await expect(collect(adapter.capture(resumed, 2n))).rejects.toMatchObject({ + class: "human_required", + code: "transcript_history_unavailable", + }); + }); + + it("uses a transcript baseline passed directly to capture", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-direct-baseline-transcript-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + const starter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["codex"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "implementer", + now: () => new Date("2026-05-13T00:00:00.000Z"), + }); + await starter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd, + }); + + const adapter = new TmuxSessionAdapter({ + driver, + now: () => new Date("2026-05-13T00:00:00.000Z"), + }); + driver.captureOutput = "first line\nsecond line\nthird line\n"; + + await expect( + collect( + adapter.capture( + { + sessionId, + tmuxSession: "devflow-test-session", + tmuxWindow: "implementer", + transcriptBaseline: { + startSeq: 1n, + lines: ["first line", "second line"], + }, + }, + 2n, + ), + ), + ).resolves.toEqual([ + { + seq: 3n, + content: "third line", + capturedAt: new Date("2026-05-13T00:00:00.000Z"), + }, + ]); + }); + + it("continues capture from a retained suffix when tmux history partially rolled", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-rolled-suffix-transcript-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + const starter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["codex"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "implementer", + now: () => new Date("2026-05-13T00:00:00.000Z"), + }); + await starter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd, + }); + + const baselineLines = Array.from({ length: 200 }, (_, index) => `line ${801 + index}`); + const retainedHistory = Array.from({ length: 105 }, (_, index) => `line ${901 + index}`); + const adapter = new TmuxSessionAdapter({ + driver, + now: () => new Date("2026-05-13T00:00:00.000Z"), + }); + driver.captureOutput = `${retainedHistory.join("\n")}\n`; + const resumed = await adapter.resume({ + sessionId, + tmuxSession: "devflow-test-session", + tmuxWindow: "implementer", + transcriptBaseline: { + startSeq: 801n, + lines: baselineLines, + }, + }); + + await expect(collect(adapter.capture(resumed, 1000n))).resolves.toEqual([ + { + seq: 1001n, + content: "line 1001", + capturedAt: new Date("2026-05-13T00:00:00.000Z"), + }, + { + seq: 1002n, + content: "line 1002", + capturedAt: new Date("2026-05-13T00:00:00.000Z"), + }, + { + seq: 1003n, + content: "line 1003", + capturedAt: new Date("2026-05-13T00:00:00.000Z"), + }, + { + seq: 1004n, + content: "line 1004", + capturedAt: new Date("2026-05-13T00:00:00.000Z"), + }, + { + seq: 1005n, + content: "line 1005", + capturedAt: new Date("2026-05-13T00:00:00.000Z"), + }, + ]); + }); + + it("fails closed when a recovered transcript baseline matches multiple history positions", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-ambiguous-transcript-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + const starter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["codex"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "implementer", + }); + await starter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd, + }); + + const adapter = new TmuxSessionAdapter({ driver }); + driver.captureOutput = "A\nB\nA\nB\nC\n"; + const resumed = await adapter.resume({ + sessionId, + tmuxSession: "devflow-test-session", + tmuxWindow: "implementer", + transcriptBaseline: { + startSeq: 3n, + lines: ["A", "B"], + }, + }); + + await expect(collect(adapter.capture(resumed, 4n))).rejects.toMatchObject({ + class: "human_required", + code: "transcript_history_unavailable", + }); + }); + + it("probes, resumes, rebootstraps, captures transcript lines, and disposes tmux sessions", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-lifecycle-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + const adapter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["codex"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "implementer", + now: () => new Date("2026-05-13T00:00:00.000Z"), + }); + const handle = await adapter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd, + envelopePrelude: "Follow the Devflow protocol", + }); + + await expect(adapter.probe(handle)).resolves.toEqual({ + alive: true, + hint: "tmux_liveness_only", + paneActive: true, + lastOutputAt: new Date("2026-05-13T00:00:00.000Z"), + }); + await expect(adapter.resume(handle)).resolves.toMatchObject({ + sessionId, + pid: 4242, + tmuxSession: "devflow-test-session", + tmuxWindow: "implementer", + }); + driver.clearCommands(); + await expect(adapter.rebootstrap(handle)).resolves.toMatchObject({ + sessionId, + pid: 4243, + }); + expect(driver.commands).toContainEqual({ + args: ["load-buffer", "-b", "devflow-prelude-00000000", "-"], + input: "Follow the Devflow protocol", + }); + + const chunks = await collect(adapter.capture(handle, 0n)); + expect(chunks).toEqual([ + { + seq: 1n, + content: "first line", + capturedAt: new Date("2026-05-13T00:00:00.000Z"), + }, + { + seq: 2n, + content: "second line", + capturedAt: new Date("2026-05-13T00:00:00.000Z"), + }, + ]); + await expect(collect(adapter.capture(handle, 2n))).resolves.toEqual([]); + expect(driver.commands).toContainEqual({ + args: ["capture-pane", "-p", "-t", "devflow-test-session:implementer", "-S", "-"], + }); + driver.captureOutput = "only current line\n"; + await expect(collect(adapter.capture(handle, 2n))).rejects.toMatchObject({ + class: "human_required", + code: "transcript_history_unavailable", + }); + driver.captureOutput = "rolled first line\nchanged second line\n"; + await expect(collect(adapter.capture(handle, 1n))).rejects.toMatchObject({ + class: "human_required", + code: "transcript_history_unavailable", + }); + + await adapter.dispose(handle); + await expect(adapter.probe(handle)).resolves.toMatchObject({ + alive: false, + paneActive: false, + }); + }); + + it("cleans up a tmux session when bootstrap fails after new-session", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-start-cleanup-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + driver.failNext("display-message", new Error("pid unavailable")); + const adapter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["codex"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "implementer", + }); + + await expect( + adapter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd, + }), + ).rejects.toMatchObject({ + class: "recoverable", + }); + expect(driver.commands).toContainEqual({ + args: ["kill-session", "-t", "devflow-test-session"], + }); + + driver.failNext("display-message", new Error("pid unavailable")); + driver.failNext("kill-session", new Error("permission denied")); + await expect( + adapter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd, + }), + ).rejects.toMatchObject({ + class: "recoverable", + recoveryHint: "permission denied", + }); + }); + + it("only ignores dispose failures when the tmux session is already gone", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-dispose-")); + tempRoots.push(cwd); + const driver = new RecordingTmuxDriver(); + const adapter = new TmuxSessionAdapter({ + driver, + commandForBackend: () => ["codex"], + sessionNameFactory: () => "devflow-test-session", + windowNameFactory: () => "implementer", + }); + const handle = await adapter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd, + }); + + driver.failNext("kill-session", new Error("permission denied")); + await expect(adapter.dispose(handle)).rejects.toMatchObject({ + class: "recoverable", + code: "pane_briefly_unresponsive", + }); + + driver.failNext("kill-session", new Error("missing tmux session devflow-test-session")); + await expect(adapter.dispose(handle)).resolves.toBeUndefined(); + }); + + it("classifies missing backend commands as human-required backend_unavailable", async () => { + const cwd = mkdtempSync(join(tmpdir(), "devflow-tmux-unavailable-")); + tempRoots.push(cwd); + const adapter = new TmuxSessionAdapter({ + driver: new RecordingTmuxDriver(), + commandForBackend: () => undefined, + }); + + await expect( + adapter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd, + }), + ).rejects.toMatchObject({ + class: "human_required", + code: "backend_unavailable", + }); + }); + + it("classifies missing session cwd as fatal workspace_permissions", async () => { + const adapter = new TmuxSessionAdapter({ + driver: new RecordingTmuxDriver(), + commandForBackend: () => ["codex"], + }); + + await expect( + adapter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "codex", + cwd: join(tmpdir(), `devflow-missing-cwd-${sessionId}`), + }), + ).rejects.toMatchObject({ + class: "fatal", + code: "workspace_permissions", + }); + }); +}); + +function envelope(overrides: Partial = {}): PromptEnvelope { + return { + uuid: "00000000-0000-4000-8000-000000000010", + runId, + roleId: "implementer", + phaseKey: "implement", + attempt: 0, + expectedArtifact: "/tmp/devflow-artifact.json", + expectedSchema: "dev/spec@1", + dedupKey, + instructions: "Build the artifact", + ...overrides, + }; +} + +async function collect(iterable: AsyncIterable): Promise { + const items: T[] = []; + for await (const item of iterable) { + items.push(item); + } + return items; +} + +function valueAfter(values: readonly string[], flag: string): string { + const index = values.indexOf(flag); + const value = values[index + 1]; + if (index < 0 || value === undefined) { + throw new DevflowError(`Missing tmux flag ${flag}`, { + class: "fatal", + code: "test_driver_missing_flag", + }); + } + return value; +} + +function sessionNameFromTarget(target: string): string { + return target.split(":")[0] ?? target; +} diff --git a/packages/session/src/tmux.ts b/packages/session/src/tmux.ts new file mode 100644 index 0000000..e2fcd0b --- /dev/null +++ b/packages/session/src/tmux.ts @@ -0,0 +1,857 @@ +import { spawn } from "node:child_process"; +import { randomUUID } from "node:crypto"; +import { existsSync } from "node:fs"; + +import { DevflowError, type PromptEnvelope, renderPromptEnvelope } from "@devflow/core"; + +import type { + ProbeResult, + SessionAdapter, + SessionHandle, + StartInput, + TranscriptChunk, +} from "./adapter.js"; + +export interface TmuxDriverExecOptions { + cwd?: string; + input?: string; + timeoutMs?: number; +} + +export interface TmuxDriver { + exec(args: readonly string[], options?: TmuxDriverExecOptions): Promise; +} + +export interface ChildProcessTmuxDriverOptions { + binaryPath?: string; + timeoutMs?: number; +} + +export class TmuxCommandError extends Error { + readonly args: readonly string[]; + readonly stderr: string | undefined; + readonly exitCode: number | undefined; + readonly reason: "exit_nonzero" | "spawn_failed" | "timeout"; + + constructor( + message: string, + options: { + args: readonly string[]; + reason: "exit_nonzero" | "spawn_failed" | "timeout"; + stderr?: string; + exitCode?: number; + cause?: unknown; + }, + ) { + super(message, { cause: options.cause }); + this.name = "TmuxCommandError"; + this.args = options.args; + this.reason = options.reason; + this.stderr = options.stderr; + this.exitCode = options.exitCode; + } +} + +export class ChildProcessTmuxDriver implements TmuxDriver { + private readonly binaryPath: string; + private readonly timeoutMs: number; + + constructor(options: ChildProcessTmuxDriverOptions = {}) { + this.binaryPath = options.binaryPath ?? "tmux"; + this.timeoutMs = options.timeoutMs ?? 10_000; + } + + exec(args: readonly string[], options: TmuxDriverExecOptions = {}): Promise { + return new Promise((resolve, reject) => { + const child = spawn(this.binaryPath, [...args], { + cwd: options.cwd, + stdio: ["pipe", "pipe", "pipe"], + }); + let stdout = ""; + let stderr = ""; + let settled = false; + const timeoutMs = options.timeoutMs ?? this.timeoutMs; + const timer = setTimeout(() => { + if (settled) { + return; + } + settled = true; + child.kill("SIGKILL"); + reject( + new TmuxCommandError(`tmux command timed out: ${args.join(" ")}`, { + args, + reason: "timeout", + stderr, + }), + ); + }, timeoutMs); + + child.stdout.setEncoding("utf8"); + child.stderr.setEncoding("utf8"); + child.stdout.on("data", (chunk: string) => { + stdout += chunk; + }); + child.stderr.on("data", (chunk: string) => { + stderr += chunk; + }); + child.on("error", (cause) => { + if (settled) { + return; + } + settled = true; + clearTimeout(timer); + reject( + new TmuxCommandError(`failed to spawn tmux: ${this.binaryPath}`, { + args, + reason: "spawn_failed", + cause, + }), + ); + }); + child.on("close", (exitCode) => { + if (settled) { + return; + } + settled = true; + clearTimeout(timer); + if (exitCode === 0) { + resolve(stdout); + return; + } + reject( + new TmuxCommandError(`tmux command failed: ${args.join(" ")}`, { + args, + reason: "exit_nonzero", + stderr, + ...(exitCode === null ? {} : { exitCode }), + }), + ); + }); + + if (options.input !== undefined) { + child.stdin.end(options.input); + } else { + child.stdin.end(); + } + }); + } +} + +export interface TmuxSessionAdapterOptions { + driver?: TmuxDriver; + sessionIdFactory?: () => string; + sessionNameFactory?: (input: StartInput & { sessionId: string }) => string; + windowNameFactory?: (input: StartInput & { sessionId: string }) => string; + commandForBackend?: (input: StartInput) => readonly string[] | undefined; + now?: () => Date; +} + +interface TmuxSessionRecord { + handle: SessionHandle; + pastedDedupKeys: Set; + sentDedupKeys: Set; + runId?: string; + roleId?: string; + envelopePrelude?: string; + requirePreludeReplay: boolean; + transcriptAnchor: TranscriptAnchor | undefined; + lastOutputAt?: Date; + disposed: boolean; +} + +interface TranscriptAnchor { + startSeq: bigint; + lines: string[]; +} + +export class TmuxSessionAdapter implements SessionAdapter { + private readonly driver: TmuxDriver; + private readonly sessionIdFactory: () => string; + private readonly sessionNameFactory: (input: StartInput & { sessionId: string }) => string; + private readonly windowNameFactory: (input: StartInput & { sessionId: string }) => string; + private readonly commandForBackend: (input: StartInput) => readonly string[] | undefined; + private readonly now: () => Date; + private readonly records = new Map(); + + constructor(options: TmuxSessionAdapterOptions = {}) { + this.driver = options.driver ?? new ChildProcessTmuxDriver(); + this.sessionIdFactory = options.sessionIdFactory ?? randomUUID; + this.sessionNameFactory = + options.sessionNameFactory ?? + ((input) => `devflow_${compactIdentifier(input.sessionId).slice(0, 32)}`); + this.windowNameFactory = options.windowNameFactory ?? ((input) => input.roleId); + this.commandForBackend = options.commandForBackend ?? defaultCommandForBackend; + this.now = options.now ?? (() => new Date()); + } + + async start(input: StartInput): Promise { + if (!existsSync(input.cwd)) { + throw new DevflowError(`Session cwd does not exist: ${input.cwd}`, { + class: "fatal", + code: "workspace_permissions", + runId: input.runId, + }); + } + const command = this.commandForBackend(input); + if (command === undefined || command.length === 0) { + throw new DevflowError(`No tmux backend command registered for ${input.backend}`, { + class: "human_required", + code: "backend_unavailable", + runId: input.runId, + }); + } + + const sessionId = input.sessionId ?? this.sessionIdFactory(); + const factoryInput = { ...input, sessionId }; + const tmuxSession = sanitizeTmuxName(this.sessionNameFactory(factoryInput), "session"); + const tmuxWindow = sanitizeTmuxName(this.windowNameFactory(factoryInput), "main"); + const handle: SessionHandle = { sessionId, tmuxSession, tmuxWindow }; + let sessionCreated = false; + + try { + await this.runTmux( + [ + "new-session", + "-d", + "-s", + tmuxSession, + "-n", + tmuxWindow, + "-c", + input.cwd, + shellJoin(command), + ], + { cwd: input.cwd }, + input.runId, + ); + sessionCreated = true; + + const record: TmuxSessionRecord = { + handle, + pastedDedupKeys: new Set(), + sentDedupKeys: new Set(), + runId: input.runId, + roleId: input.roleId, + ...(input.envelopePrelude === undefined ? {} : { envelopePrelude: input.envelopePrelude }), + requirePreludeReplay: + input.envelopePrelude !== undefined && input.envelopePrelude.length > 0, + transcriptAnchor: undefined, + lastOutputAt: this.now(), + disposed: false, + }; + this.records.set(sessionId, record); + + if (input.envelopePrelude !== undefined && input.envelopePrelude.length > 0) { + await this.pasteText( + handle, + preludeBufferName(sessionId), + input.envelopePrelude, + input.runId, + ); + } + + const pid = await this.readPanePid(handle, input.runId); + const handleWithPid = { ...handle, pid }; + record.handle = handleWithPid; + return handleWithPid; + } catch (error) { + this.records.delete(sessionId); + if (sessionCreated) { + await this.killSession(handle, { ignoreMissing: true }); + } + throw error; + } + } + + async sendPrompt(handle: SessionHandle, envelope: PromptEnvelope): Promise<{ promptId: string }> { + const record = this.recordFor(handle); + if (record.runId !== undefined && envelope.runId !== record.runId) { + throw new DevflowError("Prompt does not match tmux session run", { + class: "fatal", + code: "prompt_session_mismatch", + runId: envelope.runId, + }); + } + if (record.roleId !== undefined && envelope.roleId !== record.roleId) { + throw new DevflowError("Prompt does not match tmux session role", { + class: "fatal", + code: "prompt_session_mismatch", + runId: envelope.runId, + }); + } + if (record.sentDedupKeys.has(envelope.dedupKey)) { + return { promptId: envelope.dedupKey }; + } + + const bufferName = promptBufferName(envelope.dedupKey); + if (!record.pastedDedupKeys.has(envelope.dedupKey)) { + await this.loadBuffer( + bufferName, + renderPromptEnvelope(envelope), + envelope.runId, + "prompt_send_transient", + ); + try { + await this.pasteBuffer(record.handle, bufferName, envelope.runId, "prompt_send_transient"); + } catch (error) { + if (isUncertainPasteFailure(error)) { + record.pastedDedupKeys.add(envelope.dedupKey); + } + throw error; + } + record.pastedDedupKeys.add(envelope.dedupKey); + } + await this.sendEnter(record.handle, envelope.runId, "prompt_send_transient"); + record.sentDedupKeys.add(envelope.dedupKey); + record.lastOutputAt = this.now(); + return { promptId: envelope.dedupKey }; + } + + async probe(handle: SessionHandle): Promise { + const probeHandle = this.handleFor(handle); + try { + await this.driver.exec(["has-session", "-t", tmuxSessionName(probeHandle)]); + await this.readPanePid(probeHandle); + } catch (error) { + return { + alive: false, + paneActive: false, + hint: recoveryHint(error), + }; + } + + const result: ProbeResult = { + alive: true, + paneActive: true, + hint: "tmux_liveness_only", + }; + const lastOutputAt = this.records.get(handle.sessionId)?.lastOutputAt; + if (lastOutputAt !== undefined) { + return { ...result, lastOutputAt }; + } + return result; + } + + async resume(handle: SessionHandle): Promise { + const resumeHandle = this.handleFor(handle); + await this.runTmux(["has-session", "-t", tmuxSessionName(resumeHandle)]); + const pid = await this.readPanePid(resumeHandle); + const resumed: SessionHandle = { ...resumeHandle, pid }; + const record = this.records.get(handle.sessionId); + if (record === undefined) { + this.records.set(handle.sessionId, { + handle: resumed, + pastedDedupKeys: new Set(), + sentDedupKeys: new Set(), + ...(handle.runId === undefined ? {} : { runId: handle.runId }), + ...(handle.roleId === undefined ? {} : { roleId: handle.roleId }), + ...(handle.envelopePrelude === undefined + ? {} + : { envelopePrelude: handle.envelopePrelude }), + requirePreludeReplay: handle.requirePreludeReplay === true, + ...(handle.transcriptBaseline === undefined + ? { transcriptAnchor: undefined } + : { + transcriptAnchor: { + startSeq: handle.transcriptBaseline.startSeq, + lines: [...handle.transcriptBaseline.lines], + }, + }), + disposed: false, + }); + } else { + record.handle = resumed; + if (handle.runId !== undefined) { + record.runId = handle.runId; + } + if (handle.roleId !== undefined) { + record.roleId = handle.roleId; + } + if (handle.envelopePrelude !== undefined) { + record.envelopePrelude = handle.envelopePrelude; + } + record.requirePreludeReplay = + record.requirePreludeReplay || handle.requirePreludeReplay === true; + record.disposed = false; + } + return resumed; + } + + async rebootstrap(handle: SessionHandle): Promise { + const rebootHandle = this.handleFor(handle); + const record = this.recordFor(handle); + await this.runTmux(["respawn-pane", "-k", "-t", tmuxTarget(rebootHandle)]); + const pid = await this.readPanePid(rebootHandle); + const rebootstrapped: SessionHandle = { ...rebootHandle, pid }; + record.handle = rebootstrapped; + record.pastedDedupKeys.clear(); + if (record.envelopePrelude !== undefined && record.envelopePrelude.length > 0) { + await this.pasteText( + rebootstrapped, + preludeBufferName(rebootstrapped.sessionId), + record.envelopePrelude, + record.runId, + ); + } else if (record.requirePreludeReplay) { + await this.pasteExistingBuffer(rebootstrapped, preludeBufferName(rebootstrapped.sessionId), { + ignoreMissing: false, + }); + } else { + await this.pasteExistingBuffer(rebootstrapped, preludeBufferName(rebootstrapped.sessionId), { + ignoreMissing: true, + }); + } + record.lastOutputAt = this.now(); + record.disposed = false; + return rebootstrapped; + } + + async *capture(handle: SessionHandle, fromSeq: bigint): AsyncIterable { + if (fromSeq > BigInt(Number.MAX_SAFE_INTEGER)) { + throw new DevflowError("Transcript cursor is too large for tmux line capture", { + class: "fatal", + code: "transcript_sequence_invalid", + }); + } + + const record = this.recordFor(handle); + const captureHandle = record.handle; + const stdout = await this.runTmux([ + "capture-pane", + "-p", + "-t", + tmuxTarget(captureHandle), + "-S", + "-", + ]); + const lines = trimTrailingEmptyLines(stdout.replace(/\r\n/g, "\n").split("\n")); + const anchor = record.transcriptAnchor; + const firstAvailableSeq = firstAvailableTranscriptSeq(lines, anchor, fromSeq); + const lastAvailableSeq = firstAvailableSeq + BigInt(lines.length) - 1n; + if (fromSeq > lastAvailableSeq) { + throw transcriptHistoryUnavailable("tmux transcript cursor is ahead of available history"); + } + const chunks: TranscriptChunk[] = []; + let seq = fromSeq; + const startIndex = Number(fromSeq - firstAvailableSeq + 1n); + for (const line of lines.slice(startIndex)) { + seq += 1n; + chunks.push({ + seq, + content: line, + capturedAt: this.now(), + }); + } + const nextAnchor = tailTranscriptAnchor(firstAvailableSeq, lines); + if (nextAnchor === undefined) { + record.transcriptAnchor = undefined; + } else { + record.transcriptAnchor = nextAnchor; + } + for (const chunk of chunks) { + yield chunk; + } + } + + async dispose(handle: SessionHandle): Promise { + const disposeHandle = this.handleFor(handle); + await this.killSession(disposeHandle, { ignoreMissing: true }); + const record = this.records.get(handle.sessionId); + if (record !== undefined) { + record.disposed = true; + } + } + + private recordFor(handle: SessionHandle): TmuxSessionRecord { + const existing = this.records.get(handle.sessionId); + if (existing !== undefined && !existing.disposed) { + mergeRecordFromHandle(existing, handle); + return existing; + } + if (existing?.disposed === true) { + throw new DevflowError("Tmux session is disposed", { + class: "recoverable", + code: "pane_briefly_unresponsive", + }); + } + const resolvedHandle = this.handleFor(handle); + const record: TmuxSessionRecord = { + handle: resolvedHandle, + pastedDedupKeys: new Set(), + sentDedupKeys: new Set(), + ...(handle.runId === undefined ? {} : { runId: handle.runId }), + ...(handle.roleId === undefined ? {} : { roleId: handle.roleId }), + ...(handle.envelopePrelude === undefined ? {} : { envelopePrelude: handle.envelopePrelude }), + requirePreludeReplay: handle.requirePreludeReplay === true, + transcriptAnchor: + handle.transcriptBaseline === undefined + ? undefined + : { + startSeq: handle.transcriptBaseline.startSeq, + lines: [...handle.transcriptBaseline.lines], + }, + disposed: false, + }; + this.records.set(handle.sessionId, record); + return record; + } + + private handleFor(handle: SessionHandle): SessionHandle { + const record = this.records.get(handle.sessionId); + if (record !== undefined) { + mergeRecordFromHandle(record, handle); + } + const existing = record?.handle ?? handle; + const tmuxSession = existing.tmuxSession ?? defaultSessionName(existing.sessionId); + return { + ...existing, + tmuxSession, + }; + } + + private async pasteText( + handle: SessionHandle, + bufferName: string, + text: string, + runId?: string, + ): Promise { + await this.loadBuffer(bufferName, text, runId); + await this.pasteBuffer(handle, bufferName, runId); + await this.sendEnter(handle, runId); + } + + private async loadBuffer( + bufferName: string, + text: string, + runId?: string, + recoverableCode = "pane_briefly_unresponsive", + ): Promise { + await this.runTmux( + ["load-buffer", "-b", bufferName, "-"], + { input: text }, + runId, + recoverableCode, + ); + } + + private async pasteBuffer( + handle: SessionHandle, + bufferName: string, + runId?: string, + recoverableCode = "pane_briefly_unresponsive", + ): Promise { + await this.runTmux( + ["paste-buffer", "-b", bufferName, "-t", tmuxTarget(handle)], + undefined, + runId, + recoverableCode, + ); + } + + private async sendEnter( + handle: SessionHandle, + runId?: string, + recoverableCode = "pane_briefly_unresponsive", + ): Promise { + await this.runTmux( + ["send-keys", "-t", tmuxTarget(handle), "Enter"], + undefined, + runId, + recoverableCode, + ); + } + + private async pasteExistingBuffer( + handle: SessionHandle, + bufferName: string, + options: { ignoreMissing: boolean }, + ): Promise { + try { + await this.pasteBuffer(handle, bufferName); + await this.sendEnter(handle); + } catch (error) { + if (options.ignoreMissing && isMissingBufferFailure(error)) { + return; + } + throw error; + } + } + + private async readPanePid(handle: SessionHandle, runId?: string): Promise { + const stdout = await this.runTmux( + ["display-message", "-p", "-t", tmuxTarget(handle), "#{pane_pid}"], + undefined, + runId, + ); + const pid = Number.parseInt(stdout.trim(), 10); + if (!Number.isInteger(pid) || pid <= 0) { + throw new DevflowError(`Unable to parse tmux pane pid from ${JSON.stringify(stdout)}`, { + class: "recoverable", + code: "pane_briefly_unresponsive", + ...(runId === undefined ? {} : { runId }), + }); + } + return pid; + } + + private async runTmux( + args: readonly string[], + options?: TmuxDriverExecOptions, + runId?: string, + recoverableCode = "pane_briefly_unresponsive", + ): Promise { + try { + return await this.driver.exec(args, options); + } catch (cause) { + throw classifyTmuxFailure(cause, runId, recoverableCode); + } + } + + private async killSession( + handle: SessionHandle, + options: { ignoreMissing: boolean }, + ): Promise { + try { + await this.driver.exec(["kill-session", "-t", tmuxSessionName(handle)]); + } catch (cause) { + if (options.ignoreMissing && isMissingSessionFailure(cause)) { + return; + } + throw classifyTmuxFailure(cause); + } + } +} + +function defaultCommandForBackend(input: StartInput): readonly string[] | undefined { + if (input.backend === "fake") { + return undefined; + } + return [input.backend]; +} + +function mergeRecordFromHandle(record: TmuxSessionRecord, handle: SessionHandle): void { + record.handle = mergeSessionHandles(record.handle, handle); + if (handle.runId !== undefined) { + record.runId = handle.runId; + } + if (handle.roleId !== undefined) { + record.roleId = handle.roleId; + } + if (handle.envelopePrelude !== undefined) { + record.envelopePrelude = handle.envelopePrelude; + } + record.requirePreludeReplay = record.requirePreludeReplay || handle.requirePreludeReplay === true; + if (handle.transcriptBaseline !== undefined) { + record.transcriptAnchor = { + startSeq: handle.transcriptBaseline.startSeq, + lines: [...handle.transcriptBaseline.lines], + }; + } +} + +function mergeSessionHandles(tracked: SessionHandle, incoming: SessionHandle): SessionHandle { + return { + ...tracked, + ...Object.fromEntries(Object.entries(incoming).filter(([, value]) => value !== undefined)), + }; +} + +function tmuxSessionName(handle: SessionHandle): string { + return handle.tmuxSession ?? defaultSessionName(handle.sessionId); +} + +function tmuxTarget(handle: SessionHandle): string { + const sessionName = tmuxSessionName(handle); + return handle.tmuxWindow === undefined ? sessionName : `${sessionName}:${handle.tmuxWindow}`; +} + +function defaultSessionName(sessionId: string): string { + return `devflow_${compactIdentifier(sessionId).slice(0, 32)}`; +} + +function preludeBufferName(sessionId: string): string { + return `devflow-prelude-${compactIdentifier(sessionId).slice(0, 8)}`; +} + +function promptBufferName(dedupKey: string): string { + return `devflow-prompt-${dedupKey.slice(0, 12)}`; +} + +function sanitizeTmuxName(value: string, fallback: string): string { + const sanitized = value.replace(/[^A-Za-z0-9_-]/g, "_").replace(/^_+|_+$/g, ""); + return sanitized.length === 0 ? fallback : sanitized.slice(0, 80); +} + +function compactIdentifier(value: string): string { + return value.replace(/[^A-Za-z0-9]/g, ""); +} + +function shellJoin(argv: readonly string[]): string { + return argv.map(shellQuote).join(" "); +} + +function shellQuote(value: string): string { + return `'${value.replace(/'/g, "'\\''")}'`; +} + +function trimTrailingEmptyLines(lines: string[]): string[] { + const trimmed = [...lines]; + while (trimmed.length > 0 && trimmed[trimmed.length - 1] === "") { + trimmed.pop(); + } + return trimmed; +} + +function firstAvailableTranscriptSeq( + lines: readonly string[], + anchor: TranscriptAnchor | undefined, + fromSeq: bigint, +): bigint { + if (anchor === undefined) { + if (fromSeq > 0n) { + throw transcriptHistoryUnavailable("session was recovered without transcript baseline"); + } + return 1n; + } + + if (anchor.lines.length === 0) { + if (fromSeq > 0n) { + throw transcriptHistoryUnavailable("transcript baseline is empty"); + } + return 1n; + } + + const anchorMatch = findAnchorMatch(lines, anchor, fromSeq); + if (anchorMatch === undefined) { + throw transcriptHistoryUnavailable("tmux history rolled, was cleared, or was truncated"); + } + + const firstSeq = + anchor.startSeq + BigInt(anchorMatch.anchorOffset) - BigInt(anchorMatch.historyIndex); + if (firstSeq < 1n) { + throw transcriptHistoryUnavailable("tmux history does not align with transcript baseline"); + } + return firstSeq; +} + +function findAnchorMatch( + lines: readonly string[], + anchor: TranscriptAnchor, + fromSeq: bigint, +): { anchorOffset: number; historyIndex: number } | undefined { + for (let anchorOffset = 0; anchorOffset < anchor.lines.length; anchorOffset += 1) { + const suffix = anchor.lines.slice(anchorOffset); + let match: { anchorOffset: number; historyIndex: number } | undefined; + for (let historyIndex = 0; historyIndex <= lines.length - suffix.length; historyIndex += 1) { + if (!suffix.every((line, offset) => lines[historyIndex + offset] === line)) { + continue; + } + const firstSeq = anchor.startSeq + BigInt(anchorOffset) - BigInt(historyIndex); + const lastSeq = firstSeq + BigInt(lines.length) - 1n; + if (firstSeq < 1n || fromSeq < firstSeq - 1n || fromSeq > lastSeq) { + continue; + } + if (match !== undefined) { + throw transcriptHistoryUnavailable( + "transcript baseline matches multiple history positions", + ); + } + match = { anchorOffset, historyIndex }; + } + if (match !== undefined) { + return match; + } + } + return undefined; +} + +function tailTranscriptAnchor( + firstAvailableSeq: bigint, + lines: readonly string[], +): TranscriptAnchor | undefined { + if (lines.length === 0) { + return undefined; + } + const tail = lines.slice(-transcriptAnchorLineLimit); + return { + startSeq: firstAvailableSeq + BigInt(lines.length - tail.length), + lines: tail, + }; +} + +function transcriptHistoryUnavailable(reason: string): DevflowError { + return new DevflowError("Tmux transcript history no longer contains requested cursor", { + class: "human_required", + code: "transcript_history_unavailable", + recoveryHint: reason, + }); +} + +const transcriptAnchorLineLimit = 200; + +function classifyTmuxFailure( + cause: unknown, + runId?: string, + recoverableCode = "pane_briefly_unresponsive", +): DevflowError { + if (cause instanceof DevflowError) { + return cause; + } + if (cause instanceof TmuxCommandError && cause.reason === "spawn_failed") { + return new DevflowError("tmux is unavailable", { + class: "human_required", + code: "backend_unavailable", + ...(runId === undefined ? {} : { runId }), + recoveryHint: "install tmux >= 3.3 or configure the tmux binary path", + cause, + }); + } + return new DevflowError("tmux session is briefly unresponsive", { + class: "recoverable", + code: recoverableCode, + ...(runId === undefined ? {} : { runId }), + recoveryHint: recoveryHint(cause), + cause, + }); +} + +function isMissingSessionFailure(error: unknown): boolean { + const hint = recoveryHint(error).toLowerCase(); + return ( + hint.includes("missing tmux session") || + hint.includes("can't find session") || + hint.includes("can't find pane") || + hint.includes("no server running") + ); +} + +function isMissingBufferFailure(error: unknown): boolean { + const hint = recoveryHint(error).toLowerCase(); + return hint.includes("can't find buffer") || hint.includes("no buffer"); +} + +function isUncertainPasteFailure(error: unknown): boolean { + return ( + error instanceof DevflowError && + error.cause instanceof TmuxCommandError && + error.cause.reason === "timeout" + ); +} + +function recoveryHint(error: unknown): string { + if (error instanceof DevflowError) { + if (error.recoveryHint !== undefined && error.recoveryHint.length > 0) { + return error.recoveryHint; + } + if (error.cause !== undefined) { + return recoveryHint(error.cause); + } + } + if (error instanceof TmuxCommandError && error.stderr !== undefined && error.stderr.length > 0) { + return error.stderr; + } + if (error instanceof Error) { + return error.message; + } + return "tmux command failed"; +}