diff --git a/.env.example b/.env.example index 8fe5953..1620589 100644 --- a/.env.example +++ b/.env.example @@ -4,3 +4,4 @@ LOG_LEVEL=info TEMPORAL_ADDRESS=localhost:7233 DEVFLOW_POSTGRES_PORT=55432 DEVFLOW_BACKENDS_JSON=[{"id":"fake","enabled":true}] +SESSION_MAX_HUNG_MS=1200000 diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 2ba8ec3..97f1d92 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -28,6 +28,7 @@ export interface StartM4ApiOptions { sessionManager?: SessionManager; runEngine?: RunEngine; maxConcurrentRuns?: number; + sessionMaxHungMs?: number; } export interface StartM4ApiResult { @@ -50,6 +51,7 @@ export interface StartTemporalApiOptions { availableBackends?: readonly BackendConfig[]; maxConcurrentRuns?: number; workspaceRoot?: string; + sessionMaxHungMs?: number; } export interface StartTemporalApiResult { @@ -69,6 +71,7 @@ export async function startM4Api(options: StartM4ApiOptions = {}): Promise { TEMPORAL_ADDRESS: "localhost:7233", WORKSPACE_ROOT: worktreeRoot, MAX_CONCURRENT_RUNS: 4, + SESSION_MAX_HUNG_MS: 20 * 60 * 1000, backends: [{ id: "fake", enabled: true }], }, dbClient: client, @@ -148,6 +149,7 @@ describe("startWorker", () => { TEMPORAL_ADDRESS: "localhost:7233", WORKSPACE_ROOT: realpathSync(mkdtempSync(join(tmpdir(), "devflow-worker-workspace-"))), MAX_CONCURRENT_RUNS: 4, + SESSION_MAX_HUNG_MS: 20 * 60 * 1000, backends: [{ id: "fake", enabled: true }], }, dbClient: client, @@ -165,6 +167,7 @@ describe("startWorker", () => { TEMPORAL_ADDRESS: "localhost:7233", WORKSPACE_ROOT: realpathSync(mkdtempSync(join(tmpdir(), "devflow-worker-workspace-"))), MAX_CONCURRENT_RUNS: 4, + SESSION_MAX_HUNG_MS: 20 * 60 * 1000, backends: [{ id: "fake", enabled: true }], }, dbClient: client, @@ -191,6 +194,7 @@ describe("startWorker", () => { TEMPORAL_ADDRESS: "localhost:7233", WORKSPACE_ROOT: workspaceRoot, MAX_CONCURRENT_RUNS: 4, + SESSION_MAX_HUNG_MS: 20 * 60 * 1000, backends: [{ id: "fake", enabled: true }], }, dbClient: client, @@ -211,6 +215,7 @@ describe("startWorker", () => { TEMPORAL_ADDRESS: "localhost:7233", WORKSPACE_ROOT: workspaceRoot, MAX_CONCURRENT_RUNS: 4, + SESSION_MAX_HUNG_MS: 20 * 60 * 1000, backends: [{ id: "fake", enabled: true }], }, dbClient: client, @@ -233,6 +238,7 @@ describe("startWorker", () => { TEMPORAL_ADDRESS: "localhost:7233", WORKSPACE_ROOT: workspaceRoot, MAX_CONCURRENT_RUNS: 4, + SESSION_MAX_HUNG_MS: 20 * 60 * 1000, backends: [{ id: "fake", enabled: true }], }, dbClient: client, @@ -251,6 +257,7 @@ describe("startWorker", () => { TEMPORAL_ADDRESS: "localhost:7233", WORKSPACE_ROOT: workspaceRoot, MAX_CONCURRENT_RUNS: 4, + SESSION_MAX_HUNG_MS: 20 * 60 * 1000, backends: [{ id: "fake", enabled: true }], }, dbClient: client, diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index 4f3941c..2820689 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -51,6 +51,7 @@ export async function startWorker(options: StartWorkerOptions = {}) { workspaceRoot: config.WORKSPACE_ROOT, availableBackends: config.backends, maxConcurrentRuns: config.MAX_CONCURRENT_RUNS, + recovery: { maxHungMs: config.SESSION_MAX_HUNG_MS }, }), connection: connection as NativeConnection, namespace: "devflow", diff --git a/packages/core/src/config.test.ts b/packages/core/src/config.test.ts index d783ffa..908f69e 100644 --- a/packages/core/src/config.test.ts +++ b/packages/core/src/config.test.ts @@ -50,6 +50,26 @@ describe("config loader", () => { }); expect(config.backends).toContainEqual({ id: "fake", enabled: true }); + expect(config.SESSION_MAX_HUNG_MS).toBe(20 * 60 * 1000); + }); + + it("loads configurable session hung timeout", () => { + const root = mkdtempSync(join(tmpdir(), "devflow-config-")); + const workspace = join(root, "workspace"); + mkdirSync(workspace); + + const config = loadConfigFromSources({ + cwd: root, + env: { + DATABASE_URL: "postgres://devflow:devflow@localhost:5432/devflow", + WORKSPACE_ROOT: workspace, + LOG_LEVEL: "info", + TEMPORAL_ADDRESS: "localhost:7233", + SESSION_MAX_HUNG_MS: "2500", + }, + }); + + expect(config.SESSION_MAX_HUNG_MS).toBe(2500); }); it("resolves backend binaries from PATH during config load", () => { diff --git a/packages/core/src/config.ts b/packages/core/src/config.ts index 13a7532..12a6e4b 100644 --- a/packages/core/src/config.ts +++ b/packages/core/src/config.ts @@ -22,6 +22,11 @@ const RawConfigSchema = z.object({ LOG_LEVEL: LogLevel, TEMPORAL_ADDRESS: z.string().min(1), MAX_CONCURRENT_RUNS: z.coerce.number().int().positive().default(4), + SESSION_MAX_HUNG_MS: z.coerce + .number() + .int() + .positive() + .default(20 * 60 * 1000), backends: z.array(BackendConfig).default([{ id: "fake", enabled: true }]), }); diff --git a/packages/run-engine/src/engine.test.ts b/packages/run-engine/src/engine.test.ts index 4fa354b..35e5fce 100644 --- a/packages/run-engine/src/engine.test.ts +++ b/packages/run-engine/src/engine.test.ts @@ -32,6 +32,7 @@ import { } from "@devflow/db"; import { FakeSessionAdapter, + type ProbeResult, type SessionAdapter, type SessionHandle, SessionManager, @@ -161,6 +162,16 @@ class CaptureFailsAfterDisposeFakeAdapter extends FakeSessionAdapter { } } +class RecentlyHungProbeFakeAdapter extends FakeSessionAdapter { + override async probe(handle: SessionHandle): Promise { + const result = await super.probe(handle); + if (!result.alive || !result.paneActive) { + return result; + } + return { ...result, lastOutputAt: new Date(Date.now() - 5) }; + } +} + class TerminalHandleRecordingRuntime implements SessionRuntime { readonly adapter = new FakeSessionAdapter({ writeDelayMs: 0 }); readonly captureHandles: SessionHandle[] = []; @@ -480,6 +491,43 @@ describe("DbRunEngine", () => { }); }); + it("passes configured session hung timeout into phase recovery", async () => { + client = createDbClient(databaseUrl); + await seedDevelopmentRegistry(client.db); + const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-"))); + const repoPath = createGitRepo(); + tempRoots.push(workspaceRoot, repoPath); + const engine = new DbRunEngine({ + db: client.db, + sessions: sessionRuntime(client.db, new RecentlyHungProbeFakeAdapter({ writeDelayMs: 0 })), + maxConcurrentRuns: 100, + recovery: { maxHungMs: 1 }, + workspaceRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 10 }, + }); + + const { runId } = await engine.startRun({ + requirementsMd: "Classify timeout recovery using the configured hung threshold.", + repoPath, + baseBranch: "main", + scenarios: { + spec: { scenario: "timeout", repairScenario: "ok" }, + }, + }); + runIds.push(runId); + + const status = await engine.getStatus(runId); + expect(status.run.state).toBe("awaiting_approval"); + expect(pendingApproval(status, "spec_approved")).toBeDefined(); + + const [timeoutEvent] = await client.db + .select({ payload: runEvents.payload }) + .from(runEvents) + .where(and(eq(runEvents.runId, runId), eq(runEvents.type, "artifact.timeout"))) + .limit(1); + expect(timeoutEvent?.payload).toMatchObject({ sessionState: "HUNG" }); + }); + it("validates a prepared run replay without accepting changed start inputs", async () => { client = createDbClient(databaseUrl); await seedDevelopmentRegistry(client.db); diff --git a/packages/run-engine/src/engine.ts b/packages/run-engine/src/engine.ts index 3a74c50..7f42078 100644 --- a/packages/run-engine/src/engine.ts +++ b/packages/run-engine/src/engine.ts @@ -96,6 +96,9 @@ export interface DbRunEngineOptions { workspaceRoot: string; availableBackends?: readonly BackendConfig[]; maxConcurrentRuns?: number; + recovery?: { + maxHungMs?: number; + }; wait?: { timeoutMs?: number; pollIntervalMs?: number; @@ -179,6 +182,7 @@ export class DbRunEngine implements RunEngine { private readonly workspaceRoot: string; private readonly availableBackends: readonly BackendConfig[]; private readonly maxConcurrentRuns: number; + private readonly recovery: DbRunEngineOptions["recovery"]; private readonly wait: DbRunEngineOptions["wait"]; constructor(options: DbRunEngineOptions) { @@ -189,6 +193,7 @@ export class DbRunEngine implements RunEngine { { id: "fake", enabled: true, binaryPath: undefined }, ]; this.maxConcurrentRuns = options.maxConcurrentRuns ?? 4; + this.recovery = options.recovery; this.wait = options.wait; } @@ -911,6 +916,7 @@ export class DbRunEngine implements RunEngine { context.input.requirementsMd, scenarioForPhase(context.input.extra, phaseRow.phaseKey), ), + ...(this.recovery === undefined ? {} : { recovery: this.recovery }), ...(wait === undefined ? {} : { wait }), terminalRun: false, ...(workflowApprovalGateKey === undefined diff --git a/packages/run-engine/src/fake-phase-harness.test.ts b/packages/run-engine/src/fake-phase-harness.test.ts index ed181eb..d8823c4 100644 --- a/packages/run-engine/src/fake-phase-harness.test.ts +++ b/packages/run-engine/src/fake-phase-harness.test.ts @@ -55,6 +55,19 @@ class RebootstrapFailsOnceFakeAdapter extends FakeSessionAdapter { } } +class RebootstrapAlwaysRecoverableFakeAdapter extends FakeSessionAdapter { + rebootstrapAttempts = 0; + + override async rebootstrap(_handle: SessionHandle): Promise { + this.rebootstrapAttempts += 1; + throw new DevflowError("rebootstrap retry budget exhausted", { + class: "recoverable", + code: "pane_briefly_unresponsive", + recoveryHint: "pane did not become responsive after rebootstrap", + }); + } +} + class RebootstrapWritesStaleArtifactFakeAdapter extends FakeSessionAdapter { private expectedArtifactPath: string | undefined; @@ -98,6 +111,15 @@ class RebootstrapHumanRequiredFakeAdapter extends FakeSessionAdapter { } } +class RebootstrapCountingFakeAdapter extends FakeSessionAdapter { + rebootstrapAttempts = 0; + + override async rebootstrap(handle: SessionHandle): Promise { + this.rebootstrapAttempts += 1; + return super.rebootstrap(handle); + } +} + class ProbeRecoverableFakeAdapter extends FakeSessionAdapter { override async probe(_handle: SessionHandle): Promise { throw new DevflowError("recoverable probe failure", { @@ -113,6 +135,18 @@ class ProbeUnknownFailureFakeAdapter extends FakeSessionAdapter { } } +class ProbeFailsOnceThenRecoversFakeAdapter extends FakeSessionAdapter { + private probeAttempts = 0; + + override async probe(handle: SessionHandle): Promise { + this.probeAttempts += 1; + if (this.probeAttempts === 1) { + throw new Error("first probe failed unexpectedly"); + } + return super.probe(handle); + } +} + class TmuxLivenessOnlyProbeFakeAdapter extends FakeSessionAdapter { rebootstrapAttempts = 0; @@ -126,6 +160,19 @@ class TmuxLivenessOnlyProbeFakeAdapter extends FakeSessionAdapter { } } +class DeadPaneProbeFakeAdapter extends FakeSessionAdapter { + rebootstrapAttempts = 0; + + override async probe(_handle: SessionHandle): Promise { + return { alive: false, paneActive: false, hint: "pane exited after prompt" }; + } + + override async rebootstrap(handle: SessionHandle): Promise { + this.rebootstrapAttempts += 1; + return super.rebootstrap(handle); + } +} + class BreakArtifactParentFakeAdapter extends FakeSessionAdapter { override async sendPrompt( handle: SessionHandle, @@ -1685,6 +1732,72 @@ describe("runSingleFakePhase", () => { .from(approvalRequests) .where(eq(approvalRequests.runId, runId)); expect(approval).toEqual({ gateKey: "prompt_send_exhausted", state: "pending" }); + + const [session] = await db + .select({ state: tuiSessions.state }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)); + expect(session).toEqual({ state: "FAILED_NEEDS_HUMAN" }); + + const events = await db + .select({ type: runEvents.type }) + .from(runEvents) + .where(eq(runEvents.runId, runId)) + .orderBy(runEvents.seq); + expect(events.map((event) => event.type)).not.toContain("session.crashed"); + }); + + it("does not emit a duplicate crash event when a crashed session resume exhausts retries", async () => { + const { db, phaseId, runId } = await createRunAndPhase(); + const worktreeRoot = realpathSync( + mkdtempSync(join(tmpdir(), "devflow-fake-phase-crashed-resume-fails-")), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + const sessionId = randomUUID(); + await db.insert(tuiSessions).values({ + id: sessionId, + runId, + roleId: "implementer", + backend: "fake", + cwd: worktreeRoot, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + recoveryAttempts: 1, + state: "CRASHED", + }); + const adapter = new ResumeFailsFakeAdapter(); + + await expect( + runSingleFakePhase({ + adapter, + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions: "Scenario: ok\nCrashed session resume still fails.", + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 }, + uuidFactory: () => "00000000-0000-4000-8000-000000000045", + }), + ).rejects.toMatchObject({ code: "prompt_send_exhausted" }); + + expect(adapter.resumeAttempts).toBe(3); + const [session] = await db + .select({ recoveryAttempts: tuiSessions.recoveryAttempts, state: tuiSessions.state }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)); + expect(session).toEqual({ recoveryAttempts: 1, state: "FAILED_NEEDS_HUMAN" }); + + const events = await db + .select({ type: runEvents.type }) + .from(runEvents) + .where(eq(runEvents.runId, runId)) + .orderBy(runEvents.seq); + expect(events.map((event) => event.type)).not.toContain("session.crashed"); }); it("resumes a running phase when the crash happened before session creation", async () => { @@ -2358,10 +2471,20 @@ describe("runSingleFakePhase", () => { await expectRunPaused(db, runId); const [approval] = await db - .select({ gateKey: approvalRequests.gateKey, state: approvalRequests.state }) + .select({ + gateKey: approvalRequests.gateKey, + payload: approvalRequests.payload, + state: approvalRequests.state, + }) .from(approvalRequests) .where(eq(approvalRequests.runId, runId)); - expect(approval).toEqual({ gateKey: "artifact_invalid_after_repair", state: "pending" }); + expect(approval).toEqual({ + gateKey: "artifact_invalid_after_repair", + payload: expect.objectContaining({ + recoveryHint: expect.stringContaining(expectedArtifactPath), + }), + state: "pending", + }); }); it("revalidates an artifact file when validating replay has no artifact row yet", async () => { @@ -3068,6 +3191,51 @@ describe("runSingleFakePhase", () => { await expectRunCompleted(db, runId); }); + it("preserves rebootstrap exhaustion recovery hints in the human gate", async () => { + const { db, phaseId, runId } = await createRunAndPhase(); + const worktreeRoot = realpathSync( + mkdtempSync(join(tmpdir(), "devflow-fake-phase-rebootstrap-exhausted-")), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + const sessionId = randomUUID(); + const adapter = new RebootstrapAlwaysRecoverableFakeAdapter({ + sessionIdFactory: () => sessionId, + writeDelayMs: 0, + }); + + await expect( + runSingleFakePhase({ + adapter, + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions: "Scenario: timeout\nRepair-Scenario: ok\nRebootstrap never recovers.", + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 10 }, + uuidFactory: () => "00000000-0000-4000-8000-000000000041", + }), + ).rejects.toMatchObject({ code: "artifact_timeout_exhausted" }); + + expect(adapter.rebootstrapAttempts).toBe(2); + await expectRunPaused(db, runId); + + const [approval] = await db + .select({ gateKey: approvalRequests.gateKey, payload: approvalRequests.payload }) + .from(approvalRequests) + .where(eq(approvalRequests.runId, runId)); + expect(approval).toEqual({ + gateKey: "artifact_timeout_exhausted", + payload: expect.objectContaining({ + recoveryHint: "pane did not become responsive after rebootstrap", + }), + }); + }); + it("fails the run instead of gating when timeout recovery rebootstrap is fatal", async () => { const { db, phaseId, runId } = await createRunAndPhase(); const worktreeRoot = realpathSync( @@ -3178,10 +3346,18 @@ describe("runSingleFakePhase", () => { await expectRunPaused(db, runId); const [approval] = await db - .select({ gateKey: approvalRequests.gateKey, state: approvalRequests.state }) + .select({ + gateKey: approvalRequests.gateKey, + payload: approvalRequests.payload, + state: approvalRequests.state, + }) .from(approvalRequests) .where(eq(approvalRequests.runId, runId)); - expect(approval).toEqual({ gateKey: "backend_unavailable", state: "pending" }); + expect(approval).toEqual({ + gateKey: "backend_unavailable", + payload: expect.objectContaining({ recoveryHint: "human rebootstrap failure" }), + state: "pending", + }); }); it("requests a human gate when timeout recovery probe fails recoverably", async () => { @@ -3232,6 +3408,13 @@ describe("runSingleFakePhase", () => { .from(approvalRequests) .where(eq(approvalRequests.runId, runId)); expect(approval).toEqual({ gateKey: "artifact_timeout_exhausted", state: "pending" }); + + const events = await db + .select({ type: runEvents.type }) + .from(runEvents) + .where(eq(runEvents.runId, runId)) + .orderBy(runEvents.seq); + expect(events.map((event) => event.type)).not.toContain("session.crashed"); }); it("does not rebootstrap when tmux probe only proves pane liveness", async () => { @@ -3274,6 +3457,508 @@ describe("runSingleFakePhase", () => { expect(approval).toEqual({ gateKey: "artifact_timeout_exhausted", state: "pending" }); }); + it("recovers a post-prompt crashed session through rebootstrap", async () => { + const { db, phaseId, runId } = await createRunAndPhase(); + const worktreeRoot = realpathSync( + mkdtempSync(join(tmpdir(), "devflow-fake-phase-dead-pane-timeout-")), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + const sessionId = randomUUID(); + const adapter = new DeadPaneProbeFakeAdapter({ + sessionIdFactory: () => sessionId, + writeDelayMs: 0, + }); + + const result = await runSingleFakePhase({ + adapter, + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions: "Scenario: timeout\nRepair-Scenario: ok\nPane exits after accepting prompt.", + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 10 }, + uuidFactory: () => "00000000-0000-4000-8000-000000000047", + }); + + expect(result.artifactValid).toBe(true); + expect(adapter.rebootstrapAttempts).toBe(1); + await expectRunCompleted(db, runId); + + const [session] = await db + .select({ state: tuiSessions.state }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)); + expect(session).toEqual({ state: "READY" }); + + const events = await db + .select({ type: runEvents.type }) + .from(runEvents) + .where(eq(runEvents.runId, runId)) + .orderBy(runEvents.seq); + expect(events.map((event) => event.type)).toEqual( + expect.arrayContaining(["session.crashed", "session.recovered", "artifact.validated"]), + ); + expect(events.map((event) => event.type)).not.toContain("approval.requested"); + }); + + it.each(["ARTIFACT_TIMEOUT", "HUNG"] as const)( + "recovers a dead pane when replay starts from %s", + async (sessionState) => { + const { db, phaseId, runId } = await createRunAndPhase("executing", "awaiting_artifact", 1); + const worktreeRoot = realpathSync( + mkdtempSync( + join(tmpdir(), `devflow-fake-phase-dead-${sessionState.toLowerCase()}-replay-`), + ), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + const sessionId = randomUUID(); + const instructions = `Scenario: ok\nReplay dead pane from ${sessionState}.`; + const dedupKey = hash({ + attempt: 1, + expectedArtifact: expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions, + phaseKey: "implement", + roleId: "implementer", + runId, + }); + await new RunEventRepository(db).append({ + idempotencyKey: `phase.started:${phaseId}:1`, + phaseId, + payload: { attempt: 1, phaseKey: "implement" }, + runId, + type: "phase.started", + }); + await db.insert(tuiSessions).values({ + id: sessionId, + runId, + roleId: "implementer", + backend: "fake", + cwd: worktreeRoot, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + lastPromptHash: dedupKey, + lastPromptAt: new Date("2026-05-13T00:00:00.000Z"), + state: sessionState, + }); + const adapter = new DeadPaneProbeFakeAdapter({ + sessionIdFactory: () => sessionId, + writeDelayMs: 0, + }); + await adapter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "fake", + cwd: worktreeRoot, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + }); + + const result = await runSingleFakePhase({ + adapter, + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions, + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 }, + }); + + expect(result.artifactValid).toBe(true); + expect(adapter.rebootstrapAttempts).toBe(1); + + const events = await db + .select({ type: runEvents.type }) + .from(runEvents) + .where(eq(runEvents.runId, runId)) + .orderBy(runEvents.seq); + expect(events.map((event) => event.type)).toContain("session.recovered"); + expect(events.map((event) => event.type)).toContain("artifact.validated"); + expect(events.map((event) => event.type)).not.toContain("approval.requested"); + }, + ); + + it("gates awaiting-artifact replay when the persisted session is already timed out", async () => { + const { db, phaseId, runId } = await createRunAndPhase("executing", "awaiting_artifact", 1); + const worktreeRoot = realpathSync( + mkdtempSync(join(tmpdir(), "devflow-fake-phase-timeout-replay-")), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + mkdirSync(dirname(expectedArtifactPath), { recursive: true }); + writeFileSync( + expectedArtifactPath, + JSON.stringify({ + summary: "Late artifact", + requirements: [{ id: "REQ-1", description: "Do not accept stale timeout output" }], + acceptanceCriteria: ["Gate instead of skipping recovery"], + risks: [], + }), + ); + const sessionId = randomUUID(); + const instructions = "Scenario: ok\nReplay a timed-out phase."; + const dedupKey = hash({ + attempt: 1, + expectedArtifact: expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions, + phaseKey: "implement", + roleId: "implementer", + runId, + }); + await new RunEventRepository(db).append({ + idempotencyKey: `phase.started:${phaseId}:1`, + phaseId, + payload: { attempt: 1, phaseKey: "implement" }, + runId, + type: "phase.started", + }); + await db.insert(tuiSessions).values({ + id: sessionId, + runId, + roleId: "implementer", + backend: "fake", + cwd: worktreeRoot, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + lastPromptHash: dedupKey, + lastPromptAt: new Date("2026-05-13T00:00:00.000Z"), + state: "ARTIFACT_TIMEOUT", + }); + + await expect( + runSingleFakePhase({ + adapter: new FakeSessionAdapter({ sessionIdFactory: () => sessionId, writeDelayMs: 0 }), + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions, + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 10 }, + uuidFactory: () => "00000000-0000-4000-8000-000000000042", + }), + ).rejects.toMatchObject({ code: "artifact_timeout_exhausted" }); + + await expectRunPaused(db, runId); + const [session] = await db + .select({ state: tuiSessions.state }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)); + expect(session).toEqual({ state: "FAILED_NEEDS_HUMAN" }); + + const events = await db + .select({ type: runEvents.type }) + .from(runEvents) + .where(eq(runEvents.runId, runId)) + .orderBy(runEvents.seq); + expect(events.map((event) => event.type)).not.toContain("session.crashed"); + }); + + it("continues timeout recovery replay when the session was already resuming", async () => { + const { db, phaseId, runId } = await createRunAndPhase("executing", "awaiting_artifact", 1); + const worktreeRoot = realpathSync( + mkdtempSync(join(tmpdir(), "devflow-fake-phase-resuming-timeout-replay-")), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + mkdirSync(dirname(expectedArtifactPath), { recursive: true }); + writeFileSync( + expectedArtifactPath, + JSON.stringify({ + summary: "Late artifact while resuming", + requirements: [{ id: "REQ-1", description: "Do not accept stale resuming output" }], + acceptanceCriteria: ["Continue recovery and request fresh repair output"], + risks: [], + }), + ); + const sessionId = randomUUID(); + const instructions = "Scenario: ok\nReplay an already-resuming timeout recovery."; + const dedupKey = hash({ + attempt: 1, + expectedArtifact: expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions, + phaseKey: "implement", + roleId: "implementer", + runId, + }); + await new RunEventRepository(db).append({ + idempotencyKey: `phase.started:${phaseId}:1`, + phaseId, + payload: { attempt: 1, phaseKey: "implement" }, + runId, + type: "phase.started", + }); + await db.insert(tuiSessions).values({ + id: sessionId, + runId, + roleId: "implementer", + backend: "fake", + cwd: worktreeRoot, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + lastPromptHash: dedupKey, + lastPromptAt: new Date("2026-05-13T00:00:00.000Z"), + state: "RESUMING", + }); + const adapter = new RebootstrapCountingFakeAdapter({ + sessionIdFactory: () => sessionId, + writeDelayMs: 0, + }); + await adapter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "fake", + cwd: worktreeRoot, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + }); + + const result = await runSingleFakePhase({ + adapter, + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions, + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 }, + uuidFactory: () => "00000000-0000-4000-8000-000000000046", + }); + + expect(result.artifactValid).toBe(true); + expect(adapter.rebootstrapAttempts).toBe(1); + const artifactRows = await db + .select({ id: artifacts.id, valid: artifacts.valid }) + .from(artifacts) + .where(eq(artifacts.runId, runId)); + expect(artifactRows).toHaveLength(1); + expect(artifactRows[0]?.valid).toBe(true); + + const [phase] = await db + .select({ attempts: runPhases.attempts, state: runPhases.state }) + .from(runPhases) + .where(eq(runPhases.id, phaseId)); + expect(phase).toEqual({ attempts: 2, state: "completed" }); + + const events = await db + .select({ type: runEvents.type }) + .from(runEvents) + .where(eq(runEvents.runId, runId)) + .orderBy(runEvents.seq); + expect(events.map((event) => event.type)).toContain("session.recovered"); + expect(events.map((event) => event.type)).toContain("artifact.validated"); + }); + + it.each([ + ["HUNG", 1], + ["CRASHED", 1], + ["REBOOTSTRAPPED", 0], + ] as const)( + "continues timeout recovery replay when the session is %s", + async (sessionState, expectedRebootstrapAttempts) => { + const { db, phaseId, runId } = await createRunAndPhase("executing", "awaiting_artifact", 1); + const worktreeRoot = realpathSync( + mkdtempSync( + join(tmpdir(), `devflow-fake-phase-${sessionState.toLowerCase()}-timeout-replay-`), + ), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + mkdirSync(dirname(expectedArtifactPath), { recursive: true }); + writeFileSync( + expectedArtifactPath, + JSON.stringify({ + summary: `Late artifact while ${sessionState}`, + requirements: [{ id: "REQ-1", description: "Do not accept stale replay output" }], + acceptanceCriteria: ["Continue recovery and require fresh repair output"], + risks: [], + }), + ); + const sessionId = randomUUID(); + const instructions = `Scenario: ok\nReplay ${sessionState} timeout recovery.`; + const dedupKey = hash({ + attempt: 1, + expectedArtifact: expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions, + phaseKey: "implement", + roleId: "implementer", + runId, + }); + await new RunEventRepository(db).append({ + idempotencyKey: `phase.started:${phaseId}:1`, + phaseId, + payload: { attempt: 1, phaseKey: "implement" }, + runId, + type: "phase.started", + }); + await db.insert(tuiSessions).values({ + id: sessionId, + runId, + roleId: "implementer", + backend: "fake", + cwd: worktreeRoot, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + lastPromptHash: dedupKey, + lastPromptAt: new Date("2026-05-13T00:00:00.000Z"), + state: sessionState, + }); + const adapter = new RebootstrapCountingFakeAdapter({ + sessionIdFactory: () => sessionId, + writeDelayMs: 0, + }); + await adapter.start({ + sessionId, + runId, + roleId: "implementer", + backend: "fake", + cwd: worktreeRoot, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + }); + + const result = await runSingleFakePhase({ + adapter, + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions, + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 }, + }); + + expect(result.artifactValid).toBe(true); + expect(adapter.rebootstrapAttempts).toBe(expectedRebootstrapAttempts); + + const [phase] = await db + .select({ attempts: runPhases.attempts, state: runPhases.state }) + .from(runPhases) + .where(eq(runPhases.id, phaseId)); + expect(phase).toEqual({ attempts: 2, state: "completed" }); + + const events = await db + .select({ type: runEvents.type }) + .from(runEvents) + .where(eq(runEvents.runId, runId)) + .orderBy(runEvents.seq); + expect(events.map((event) => event.type)).toContain("session.recovered"); + expect(events.map((event) => event.type)).toContain("artifact.validated"); + expect(events.map((event) => event.type)).not.toContain("session.crashed"); + }, + ); + + it("does not accept a stale timeout artifact after session recovery replay", async () => { + const { db, phaseId, runId } = await createRunAndPhase("executing", "awaiting_artifact", 1); + const worktreeRoot = realpathSync( + mkdtempSync(join(tmpdir(), "devflow-fake-phase-recovered-timeout-replay-")), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + mkdirSync(dirname(expectedArtifactPath), { recursive: true }); + writeFileSync( + expectedArtifactPath, + JSON.stringify({ + summary: "Late artifact after recovery", + requirements: [{ id: "REQ-1", description: "Do not complete from stale output" }], + acceptanceCriteria: ["Require fresh repair output"], + risks: [], + }), + ); + const sessionId = randomUUID(); + const instructions = "Scenario: ok\nReplay after recovered timeout."; + const dedupKey = hash({ + attempt: 1, + expectedArtifact: expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions, + phaseKey: "implement", + roleId: "implementer", + runId, + }); + const repository = new RunEventRepository(db); + await repository.append({ + idempotencyKey: `phase.started:${phaseId}:1`, + phaseId, + payload: { attempt: 1, phaseKey: "implement" }, + runId, + type: "phase.started", + }); + await repository.append({ + idempotencyKey: `session.recovered:${sessionId}:1`, + phaseId, + payload: { recoveryAttempts: 1, roleId: "implementer", sessionId }, + runId, + type: "session.recovered", + }); + await db.insert(tuiSessions).values({ + id: sessionId, + runId, + roleId: "implementer", + backend: "fake", + cwd: worktreeRoot, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + lastPromptHash: dedupKey, + lastPromptAt: new Date("2026-05-13T00:00:00.000Z"), + recoveryAttempts: 1, + state: "READY", + }); + + await expect( + runSingleFakePhase({ + adapter: new FakeSessionAdapter({ sessionIdFactory: () => sessionId, writeDelayMs: 0 }), + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions, + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 10 }, + uuidFactory: () => "00000000-0000-4000-8000-000000000043", + }), + ).rejects.toMatchObject({ code: "prompt_send_exhausted" }); + + const artifactRows = await db + .select({ id: artifacts.id }) + .from(artifacts) + .where(eq(artifacts.runId, runId)); + expect(artifactRows).toEqual([]); + + const [phase] = await db + .select({ attempts: runPhases.attempts, state: runPhases.state }) + .from(runPhases) + .where(eq(runPhases.id, phaseId)); + expect(phase).toEqual({ attempts: 2, state: "failed" }); + }); + it("fails the run when timeout recovery probe throws an unclassified error", async () => { const { db, phaseId, runId } = await createRunAndPhase(); const worktreeRoot = realpathSync( @@ -3313,6 +3998,47 @@ describe("runSingleFakePhase", () => { expect(approvals).toEqual([]); }); + it("does not swallow an unclassified timeout-classification probe failure when a later probe would recover", async () => { + const { db, phaseId, runId } = await createRunAndPhase(); + const worktreeRoot = realpathSync( + mkdtempSync(join(tmpdir(), "devflow-fake-phase-probe-first-unknown-")), + ); + tempRoots.push(worktreeRoot); + const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json"); + const sessionId = randomUUID(); + + await expect( + runSingleFakePhase({ + adapter: new ProbeFailsOnceThenRecoversFakeAdapter({ + sessionIdFactory: () => sessionId, + writeDelayMs: 0, + }), + db, + expectedArtifactPath, + expectedSchema: "dev/spec@1", + instructions: + "Scenario: timeout\nRepair-Scenario: ok\nFirst classification probe fails unexpectedly.", + phaseId, + phaseKey: "implement", + roleId: "implementer", + runId, + worktreeRoot, + wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 10 }, + uuidFactory: () => "00000000-0000-4000-8000-000000000038", + }), + ).rejects.toMatchObject({ code: "internal_state_corruption" }); + + const [run] = await db.select({ state: runs.state }).from(runs).where(eq(runs.id, runId)); + expect(run).toEqual({ state: "failed" }); + + const events = await db + .select({ type: runEvents.type }) + .from(runEvents) + .where(eq(runEvents.runId, runId)) + .orderBy(runEvents.seq); + expect(events.map((event) => event.type)).not.toContain("session.recovered"); + }); + it("does not let a stale artifact produced during timeout recovery satisfy repair validation", async () => { const { db, phaseId, runId } = await createRunAndPhase(); const worktreeRoot = realpathSync( diff --git a/packages/run-engine/src/fake-phase-harness.ts b/packages/run-engine/src/fake-phase-harness.ts index 1668c6c..f5d5f66 100644 --- a/packages/run-engine/src/fake-phase-harness.ts +++ b/packages/run-engine/src/fake-phase-harness.ts @@ -28,7 +28,12 @@ import { SessionManager, type SessionRuntime, type TranscriptChunkSink, + assertSessionStateAssignment, + assertSessionTransition, captureAndPersistTranscript, + isAllowedSessionTransition, + isSessionHung, + retryRecoverable, } from "@devflow/session"; import { and, desc, eq, inArray, sql } from "drizzle-orm"; @@ -44,6 +49,10 @@ interface ArtifactWaitOptions extends FakePhaseWaitOptions { ignoreInitialSignature?: string; } +interface FakePhaseRecoveryOptions { + maxHungMs?: number; +} + interface RunSingleFakePhaseBaseInput { db: DbClient["db"]; runId: string; @@ -55,6 +64,7 @@ interface RunSingleFakePhaseBaseInput { expectedSchema: string; instructions: string; wait?: FakePhaseWaitOptions; + recovery?: FakePhaseRecoveryOptions; uuidFactory?: () => string; transcriptSink?: TranscriptChunkSink; terminalRun?: boolean; @@ -81,7 +91,6 @@ export interface RunSingleFakePhaseResult { type TransactionDb = Parameters[0]>[0]; -const sendPromptRetryBudget = 2; const terminalRunStates = ["completed", "failed", "aborted"] as const; const phaseMutationRunStates = ["executing", "planning"] as const; @@ -298,9 +307,9 @@ export async function runSingleFakePhase( throw error; } - let recovered: boolean; + let recovery: ArtifactTimeoutRecoveryResult; try { - recovered = await recoverFromArtifactTimeout(input, eventRepository, handle.sessionId); + recovery = await recoverFromArtifactTimeout(input, eventRepository, handle.sessionId); } catch (recoveryError) { if (isRunStateChanged(recoveryError)) { await captureTranscript(input, handle); @@ -333,7 +342,7 @@ export async function runSingleFakePhase( ); throw recoveryError; } - if (!recovered) { + if (!recovery.recovered) { await failPhaseAndRequestGate( input, eventRepository, @@ -342,9 +351,9 @@ export async function runSingleFakePhase( "artifact_timeout_exhausted", { expectedArtifactPath: input.expectedArtifactPath, + recoveryHint: recovery.recoveryHint ?? input.expectedArtifactPath, }, handle.sessionId, - { markSessionCrashed: true }, ); await captureTranscript(input, handle); throw error; @@ -358,6 +367,7 @@ export async function runSingleFakePhase( "artifact_timeout_exhausted", { expectedArtifactPath: input.expectedArtifactPath, + recoveryHint: input.expectedArtifactPath, }, handle.sessionId, ); @@ -470,6 +480,7 @@ export async function runSingleFakePhase( "artifact_timeout_exhausted", { expectedArtifactPath: input.expectedArtifactPath, + recoveryHint: input.expectedArtifactPath, }, handle.sessionId, ); @@ -511,6 +522,7 @@ export async function runSingleFakePhase( { artifactId: outcome.artifact.id, expectedArtifactPath: input.expectedArtifactPath, + recoveryHint: artifactInvalidRecoveryHint(input, outcome), }, handle.sessionId, ); @@ -520,6 +532,7 @@ export async function runSingleFakePhase( code: "artifact_invalid_after_repair", runId: input.runId, phaseId: input.phaseId, + recoveryHint: artifactInvalidRecoveryHint(input, outcome), }); } @@ -621,6 +634,7 @@ export async function runSingleFakePhase( "artifact_timeout_exhausted", { expectedArtifactPath: input.expectedArtifactPath, + recoveryHint: input.expectedArtifactPath, }, handle.sessionId, ); @@ -667,6 +681,7 @@ export async function runSingleFakePhase( { artifactId: outcome.artifact.id, expectedArtifactPath: input.expectedArtifactPath, + recoveryHint: artifactInvalidRecoveryHint(input, outcome), }, handle.sessionId, ); @@ -676,6 +691,7 @@ export async function runSingleFakePhase( code: "artifact_invalid_after_repair", runId: input.runId, phaseId: input.phaseId, + recoveryHint: artifactInvalidRecoveryHint(input, outcome), }); } @@ -833,6 +849,18 @@ async function enterInitialPhase( }) .from(tuiSessions) .where(and(eq(tuiSessions.runId, input.runId), eq(tuiSessions.roleId, input.roleId))); + if ( + session !== undefined && + (isTimeoutRecoverySessionState(session.state) || + (session.state === "READY" && (await sessionRecoveredEventExists(input, session.id)))) && + session.lastPromptHash === envelope.dedupKey && + session.expectedArtifactPath === input.expectedArtifactPath && + session.expectedSchema === input.expectedSchema + ) { + return recoverAwaitingArtifactReplay(input, eventRepository, phase.attempts, session.id, { + repairAttemptUsed: phaseStart.repairAttemptUsed, + }); + } if ( session !== undefined && session.state !== "FAILED_NEEDS_HUMAN" && @@ -967,6 +995,112 @@ async function enterInitialPhase( throw cannotReplayPhase(input, phase.state); } +async function recoverAwaitingArtifactReplay( + input: CanonicalRunSingleFakePhaseInput, + eventRepository: RunEventRepository, + attempt: number, + sessionId: string, + options: { repairAttemptUsed: boolean }, +): Promise { + let recovery: ArtifactTimeoutRecoveryResult; + try { + recovery = await recoverFromArtifactTimeout(input, eventRepository, sessionId); + } catch (recoveryError) { + if (isRunStateChanged(recoveryError)) { + throw recoveryError; + } + if (shouldCreateHumanGate(recoveryError)) { + const gateError = toArtifactTimeoutRecoveryGateError(recoveryError); + await failPhaseAndRequestGate( + input, + eventRepository, + attempt, + "artifact_timeout_recovery_failed", + gateError.code, + { + errorCode: recoveryError.code, + expectedArtifactPath: input.expectedArtifactPath, + recoveryHint: gateError.recoveryHint, + }, + sessionId, + ); + throw gateError; + } + await failPhaseAndRun(input, eventRepository, attempt, "artifact_timeout_recovery_failed"); + throw recoveryError; + } + + if (!recovery.recovered || options.repairAttemptUsed) { + await failPhaseAndRequestGate( + input, + eventRepository, + attempt, + "artifact_timeout", + "artifact_timeout_exhausted", + { + expectedArtifactPath: input.expectedArtifactPath, + recoveryHint: recovery.recoveryHint ?? input.expectedArtifactPath, + }, + sessionId, + ); + throw new DevflowError("Artifact timeout recovery exhausted retry budget", { + class: "human_required", + code: "artifact_timeout_exhausted", + runId: input.runId, + phaseId: input.phaseId, + recoveryHint: recovery.recoveryHint ?? input.expectedArtifactPath, + }); + } + + const repairAttempt = await startPhaseAndRecord(input, eventRepository, ["awaiting_artifact"], { + reason: "artifact_timeout", + repair: true, + }); + try { + await removeStaleArtifact(input); + } catch (error) { + await failPhaseAndRun(input, eventRepository, repairAttempt, "stale_artifact_remove_failed"); + throw error; + } + return { + attempt: repairAttempt, + continueArtifactWait: false, + continueValidation: false, + handle: { sessionId }, + repairAttemptUsed: true, + resumedPrompt: false, + }; +} + +function isTimeoutRecoverySessionState(state: string): boolean { + return ["ARTIFACT_TIMEOUT", "HUNG", "CRASHED", "RESUMING", "REBOOTSTRAPPED"].includes(state); +} + +async function sessionRecoveredEventExists( + input: CanonicalRunSingleFakePhaseInput, + sessionId: string, +): Promise { + const events = await input.db + .select({ payload: runEvents.payload }) + .from(runEvents) + .where( + and( + eq(runEvents.runId, input.runId), + eq(runEvents.phaseId, input.phaseId), + eq(runEvents.type, "session.recovered"), + ), + ); + return events.some((event) => payloadSessionId(event.payload) === sessionId); +} + +function payloadSessionId(payload: unknown): string | undefined { + if (payload === null || typeof payload !== "object") { + return undefined; + } + const sessionId = (payload as Record).sessionId; + return typeof sessionId === "string" ? sessionId : undefined; +} + function cannotReplayPhase( input: CanonicalRunSingleFakePhaseInput, phaseState: string, @@ -1234,24 +1368,33 @@ async function failPhaseAndRequestGate( if (sessionId !== undefined && options.markSessionCrashed === true) { const [session] = await tx - .select({ recoveryAttempts: tuiSessions.recoveryAttempts }) + .select({ recoveryAttempts: tuiSessions.recoveryAttempts, state: tuiSessions.state }) .from(tuiSessions) .where(eq(tuiSessions.id, sessionId)); const recoveryAttempts = (session?.recoveryAttempts ?? 0) + 1; - await tx - .update(tuiSessions) - .set({ state: "CRASHED", recoveryAttempts }) - .where(eq(tuiSessions.id, sessionId)); - await eventRepository.appendInTransaction(tx, { - runId: input.runId, - phaseId: input.phaseId, - type: "session.crashed", - payload: { sessionId, roleId: input.roleId, recoveryAttempts }, - idempotencyKey: `session.crashed:${sessionId}:${recoveryAttempts}`, - }); + if (session !== undefined && isAllowedSessionTransition(session.state, "CRASHED")) { + await tx + .update(tuiSessions) + .set({ state: "CRASHED", recoveryAttempts }) + .where(eq(tuiSessions.id, sessionId)); + await eventRepository.appendInTransaction(tx, { + runId: input.runId, + phaseId: input.phaseId, + type: "session.crashed", + payload: { sessionId, roleId: input.roleId, recoveryAttempts }, + idempotencyKey: `session.crashed:${sessionId}:${recoveryAttempts}`, + }); + } } if (sessionId !== undefined) { + const [session] = await tx + .select({ state: tuiSessions.state }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)); + if (session !== undefined) { + assertSessionTransition(session.state, "FAILED_NEEDS_HUMAN"); + } await tx .insert(tuiSessions) .values({ @@ -1425,10 +1568,11 @@ async function completePhaseAndRun( }); const [session] = await tx - .select({ recoveryAttempts: tuiSessions.recoveryAttempts }) + .select({ recoveryAttempts: tuiSessions.recoveryAttempts, state: tuiSessions.state }) .from(tuiSessions) .where(eq(tuiSessions.id, sessionId)); const recoveryAttempts = session?.recoveryAttempts ?? 0; + assertSessionStateAssignment(session?.state ?? "BUSY", "READY"); await tx.update(tuiSessions).set({ state: "READY" }).where(eq(tuiSessions.id, sessionId)); await eventRepository.appendInTransaction(tx, { runId: input.runId, @@ -1484,6 +1628,11 @@ async function requestWorkflowApproval( .update(runPhases) .set({ state: "awaiting_approval" }) .where(and(eq(runPhases.id, input.phaseId), eq(runPhases.runId, input.runId))); + const [session] = await tx + .select({ state: tuiSessions.state }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)); + assertSessionStateAssignment(session?.state ?? "BUSY", "WAITING_FOR_APPROVAL"); await tx .update(tuiSessions) .set({ state: "WAITING_FOR_APPROVAL" }) @@ -1614,7 +1763,7 @@ async function startSessionAndRecord( cwd: input.worktreeRoot, expectedArtifactPath: input.expectedArtifactPath, expectedSchema: input.expectedSchema, - state: "BOOTSTRAPPING", + state: "CREATED", }) .onConflictDoNothing({ target: tuiSessions.id }); await eventRepository.appendInTransaction(tx, { @@ -1624,10 +1773,12 @@ async function startSessionAndRecord( payload: { sessionId: startedHandle.sessionId, roleId: input.roleId, backend: "fake" }, idempotencyKey: `session.created:${startedHandle.sessionId}`, }); + assertSessionTransition("CREATED", "BOOTSTRAPPING"); await tx .update(tuiSessions) .set({ state: "BOOTSTRAPPING" }) .where(eq(tuiSessions.id, startedHandle.sessionId)); + assertSessionTransition("BOOTSTRAPPING", "READY"); await tx .update(tuiSessions) .set({ state: "READY" }) @@ -1751,6 +1902,11 @@ async function resumeExistingSessionAndRecord( } await input.db.transaction(async (tx) => { await assertRunCanMutatePhaseInTransaction(input, tx); + const [currentSession] = await tx + .select({ state: tuiSessions.state }) + .from(tuiSessions) + .where(eq(tuiSessions.id, session.id)); + assertSessionStateAssignment(currentSession?.state ?? session.state, "READY"); await tx .update(tuiSessions) .set({ @@ -1873,6 +2029,11 @@ async function sendPromptAndRecord( : await artifactSignature(input.expectedArtifactPath); await input.db.transaction(async (tx) => { await assertRunCanMutatePhaseInTransaction(input, tx); + const [currentSession] = await tx + .select({ state: tuiSessions.state }) + .from(tuiSessions) + .where(eq(tuiSessions.id, handle.sessionId)); + assertSessionTransition(currentSession?.state ?? "READY", "BUSY"); await tx .update(tuiSessions) .set({ @@ -1972,19 +2133,7 @@ async function sendPromptWithRetry( handle: { sessionId: string }, envelope: PromptEnvelope, ): Promise<{ promptId: string }> { - let lastError: unknown; - for (let physicalAttempt = 0; physicalAttempt <= sendPromptRetryBudget; physicalAttempt += 1) { - try { - return await sessions.sendPrompt(handle, envelope); - } catch (error) { - lastError = error; - if (!(error instanceof DevflowError) || error.class !== "recoverable") { - throw error; - } - } - } - - throw lastError; + return retryRecoverable("sendPrompt", () => sessions.sendPrompt(handle, envelope)); } interface ArtifactOutcome { @@ -1994,6 +2143,13 @@ interface ArtifactOutcome { validation: ReturnType; } +function artifactInvalidRecoveryHint( + input: CanonicalRunSingleFakePhaseInput, + outcome: ArtifactOutcome, +): string { + return `artifact=${outcome.artifact.id};path=${input.expectedArtifactPath}`; +} + interface ArtifactRecord { id: string; phaseId: string | null; @@ -2020,7 +2176,8 @@ async function waitForAndValidateArtifact( if (!isDevflowErrorWithCode(error, "artifact_timeout_exhausted")) { throw error; } - await recordArtifactTimeout(input, eventRepository, attempt, sessionId); + const timedOutSessionState = await classifyTimedOutSession(input, sessionId); + await recordArtifactTimeout(input, eventRepository, attempt, sessionId, timedOutSessionState); throw error; } @@ -2087,6 +2244,7 @@ async function recordArtifactTimeout( eventRepository: RunEventRepository, attempt: number, sessionId: string, + sessionState: "ARTIFACT_TIMEOUT" | "HUNG" | "CRASHED", ) { await input.db.transaction(async (tx) => { await assertRunCanMutatePhaseInTransaction(input, tx); @@ -2098,16 +2256,60 @@ async function recordArtifactTimeout( path: input.expectedArtifactPath, schemaId: input.expectedSchema, attempt, + sessionState, }, idempotencyKey: `artifact.timeout:${input.phaseId}:${attempt}:${input.expectedArtifactPath}`, }); - await tx - .update(tuiSessions) - .set({ state: "ARTIFACT_TIMEOUT" }) + const [currentSession] = await tx + .select({ recoveryAttempts: tuiSessions.recoveryAttempts, state: tuiSessions.state }) + .from(tuiSessions) .where(eq(tuiSessions.id, sessionId)); + const currentState = currentSession?.state ?? "BUSY"; + assertSessionStateAssignment(currentState, sessionState); + if (currentState === sessionState) { + return; + } + if (sessionState === "CRASHED") { + const recoveryAttempts = (currentSession?.recoveryAttempts ?? 0) + 1; + await tx + .update(tuiSessions) + .set({ recoveryAttempts, state: "CRASHED" }) + .where(eq(tuiSessions.id, sessionId)); + await eventRepository.appendInTransaction(tx, { + runId: input.runId, + phaseId: input.phaseId, + type: "session.crashed", + payload: { sessionId, roleId: input.roleId, recoveryAttempts }, + idempotencyKey: `session.crashed:${sessionId}:${recoveryAttempts}`, + }); + return; + } + await tx.update(tuiSessions).set({ state: sessionState }).where(eq(tuiSessions.id, sessionId)); }); } +async function classifyTimedOutSession( + input: CanonicalRunSingleFakePhaseInput, + sessionId: string, +): Promise<"ARTIFACT_TIMEOUT" | "HUNG" | "CRASHED"> { + try { + const probe = await probeWithTypedError(input.sessions, { sessionId }); + if (!probe.alive || !probe.paneActive) { + return "CRASHED"; + } + return isSessionHung(probe.lastOutputAt, new Date(), input.recovery?.maxHungMs) + ? "HUNG" + : "ARTIFACT_TIMEOUT"; + } catch (error) { + // A transient probe failure should not be promoted to a crash classification, + // but fatal/unclassified probe failures must still fail the run. + if (error instanceof DevflowError && error.class === "recoverable") { + return "ARTIFACT_TIMEOUT"; + } + throw error; + } +} + async function recordArtifactValidation( input: CanonicalRunSingleFakePhaseInput, eventRepository: RunEventRepository, @@ -2232,6 +2434,11 @@ async function markSessionIdle( ) { await input.db.transaction(async (tx) => { await assertRunCanMutatePhaseInTransaction(input, tx); + const [currentSession] = await tx + .select({ state: tuiSessions.state }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)); + assertSessionStateAssignment(currentSession?.state ?? "BUSY", "READY"); await tx.update(tuiSessions).set({ state: "READY" }).where(eq(tuiSessions.id, sessionId)); await eventRepository.appendInTransaction(tx, { runId: input.runId, @@ -2266,26 +2473,36 @@ async function recoverFromArtifactTimeout( input: CanonicalRunSingleFakePhaseInput, eventRepository: RunEventRepository, sessionId: string, -): Promise { - const probe = await probeWithTypedError(input.sessions, { sessionId }); - if (!probe.alive || !probe.paneActive || isBackendReadinessUnknown(probe)) { - return false; +): Promise { + const currentState = await sessionState(input, sessionId); + if (currentState === "READY") { + return { recovered: true }; } - await setSessionStateIfRunActive(input, sessionId, "RESUMING"); - const rebootstrapOk = await rebootstrapWithRetry(input.sessions, { sessionId }); - if (!rebootstrapOk) { - return false; + if (!["CRASHED", "RESUMING", "REBOOTSTRAPPED"].includes(currentState ?? "")) { + const probe = await probeWithTypedError(input.sessions, { sessionId }); + if (isBackendReadinessUnknown(probe)) { + return { + recovered: false, + recoveryHint: recoveryHintForProbe(probe), + }; + } + } + if (currentState !== "REBOOTSTRAPPED") { + await setSessionStateIfRunActive(input, sessionId, "RESUMING"); + + await rebootstrapWithRetry(input.sessions, { sessionId }); + await setSessionStateIfRunActive(input, sessionId, "REBOOTSTRAPPED"); } - await setSessionStateIfRunActive(input, sessionId, "REBOOTSTRAPPED"); await input.db.transaction(async (tx) => { await assertRunCanMutatePhaseInTransaction(input, tx); const [session] = await tx - .select({ recoveryAttempts: tuiSessions.recoveryAttempts }) + .select({ recoveryAttempts: tuiSessions.recoveryAttempts, state: tuiSessions.state }) .from(tuiSessions) .where(eq(tuiSessions.id, sessionId)); const recoveryAttempts = (session?.recoveryAttempts ?? 0) + 1; + assertSessionTransition(session?.state ?? "REBOOTSTRAPPED", "READY"); await tx .update(tuiSessions) .set({ state: "READY", recoveryAttempts }) @@ -2298,7 +2515,30 @@ async function recoverFromArtifactTimeout( idempotencyKey: `session.recovered:${sessionId}:${recoveryAttempts}`, }); }); - return true; + return { recovered: true }; +} + +interface ArtifactTimeoutRecoveryResult { + recovered: boolean; + recoveryHint?: string; +} + +function recoveryHintForProbe(probe: ProbeResult): string { + if (probe.hint !== undefined && probe.hint.length > 0) { + return probe.hint; + } + return `probe_alive=${probe.alive};pane_active=${probe.paneActive}`; +} + +async function sessionState( + input: CanonicalRunSingleFakePhaseInput, + sessionId: string, +): Promise { + const [session] = await input.db + .select({ state: tuiSessions.state }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)); + return session?.state; } function isBackendReadinessUnknown(probe: ProbeResult): boolean { @@ -2312,6 +2552,13 @@ async function setSessionStateIfRunActive( ) { await input.db.transaction(async (tx) => { await assertRunCanMutatePhaseInTransaction(input, tx); + const [session] = await tx + .select({ state: tuiSessions.state }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)); + if (session !== undefined) { + assertSessionStateAssignment(session.state, state); + } await tx.update(tuiSessions).set({ state }).where(eq(tuiSessions.id, sessionId)); }); } @@ -2337,46 +2584,31 @@ async function probeWithTypedError( async function rebootstrapWithRetry( sessions: SessionRuntime, handle: { sessionId: string }, -): Promise { - for (let attemptsRemaining = 2; attemptsRemaining > 0; attemptsRemaining -= 1) { - try { +): Promise { + try { + await retryRecoverable("rebootstrap", async () => { await sessions.rebootstrap(handle); - return true; - } catch (error) { - if (!(error instanceof DevflowError)) { - throw new DevflowError("Unclassified rebootstrap failure", { - class: "fatal", - code: "internal_state_corruption", - cause: error, - }); - } - if (error.class !== "recoverable") { - throw error; - } - // Retry budget is intentionally one rebootstrap retry after the first failure. + }); + } catch (error) { + if (!(error instanceof DevflowError)) { + throw new DevflowError("Unclassified rebootstrap failure", { + class: "fatal", + code: "internal_state_corruption", + cause: error, + }); } + if (error.class !== "recoverable") { + throw error; + } + throw error; } - - return false; } async function resumeWithRetry( sessions: SessionRuntime, handle: { sessionId: string }, ): Promise { - let lastError: unknown; - for (let physicalAttempt = 0; physicalAttempt <= 2; physicalAttempt += 1) { - try { - return await sessions.resume(handle); - } catch (error) { - lastError = error; - if (!(error instanceof DevflowError) || error.class !== "recoverable") { - throw error; - } - } - } - - throw lastError; + return retryRecoverable("resume", () => sessions.resume(handle)); } async function markSessionFailedNeedsHuman( @@ -2384,6 +2616,13 @@ async function markSessionFailedNeedsHuman( eventRepository: RunEventRepository, sessionId: string, ) { + const [existingSession] = await input.db + .select({ state: tuiSessions.state }) + .from(tuiSessions) + .where(eq(tuiSessions.id, sessionId)); + if (existingSession !== undefined) { + assertSessionStateAssignment(existingSession.state, "FAILED_NEEDS_HUMAN"); + } await input.db .insert(tuiSessions) .values({ @@ -2416,12 +2655,15 @@ async function markAllSessionsFailedInTransaction( runId: string, ): Promise { const sessions = await tx - .select({ id: tuiSessions.id, roleId: tuiSessions.roleId }) + .select({ id: tuiSessions.id, roleId: tuiSessions.roleId, state: tuiSessions.state }) .from(tuiSessions) .where(eq(tuiSessions.runId, runId)); if (sessions.length === 0) { return []; } + for (const session of sessions) { + assertSessionStateAssignment(session.state, "FAILED_NEEDS_HUMAN"); + } await tx .update(tuiSessions) @@ -2684,7 +2926,7 @@ function shouldCreateHumanGate(error: unknown): error is DevflowError { function toHumanRequiredRecoveryError(error: DevflowError): DevflowError { if (error.class === "human_required") { - return error; + return ensureRecoveryHint(error); } const options: ConstructorParameters[1] = { @@ -2708,7 +2950,7 @@ function toHumanRequiredRecoveryError(error: DevflowError): DevflowError { function toArtifactTimeoutRecoveryGateError(error: DevflowError): DevflowError { if (error.class === "human_required") { - return error; + return ensureRecoveryHint(error); } const options: ConstructorParameters[1] = { @@ -2727,6 +2969,26 @@ function toArtifactTimeoutRecoveryGateError(error: DevflowError): DevflowError { return new DevflowError("Artifact timeout recovery exhausted retry budget", options); } +function ensureRecoveryHint(error: DevflowError): DevflowError { + if (error.recoveryHint !== undefined && error.recoveryHint.length > 0) { + return error; + } + + const options: ConstructorParameters[1] = { + class: error.class, + code: error.code, + recoveryHint: error.message, + cause: error.cause, + }; + if (error.runId !== undefined) { + options.runId = error.runId; + } + if (error.phaseId !== undefined) { + options.phaseId = error.phaseId; + } + return new DevflowError(error.message, options); +} + async function removeStaleArtifact(input: CanonicalRunSingleFakePhaseInput): Promise { try { await unlink(input.expectedArtifactPath); diff --git a/packages/session/src/index.ts b/packages/session/src/index.ts index 25ea150..7ffc782 100644 --- a/packages/session/src/index.ts +++ b/packages/session/src/index.ts @@ -1,5 +1,6 @@ export * from "./adapter.js"; export * from "./fake.js"; export * from "./manager.js"; +export * from "./recovery.js"; export * from "./transcript.js"; export * from "./tmux.js"; diff --git a/packages/session/src/manager.ts b/packages/session/src/manager.ts index 3bad6a3..3423e93 100644 --- a/packages/session/src/manager.ts +++ b/packages/session/src/manager.ts @@ -19,6 +19,7 @@ import type { TranscriptBaseline, TranscriptChunk, } from "./adapter.js"; +import { assertSessionTransition, retryRecoverable } from "./recovery.js"; import { captureAndPersistTranscript } from "./transcript.js"; type Database = DbClient["db"]; @@ -298,6 +299,12 @@ export class SessionManager implements SessionRuntime { if (this.db === undefined || !["CREATED", "BOOTSTRAPPING"].includes(session.state)) { return; } + if (session.state === "CREATED") { + assertSessionTransition("CREATED", "BOOTSTRAPPING"); + assertSessionTransition("BOOTSTRAPPING", "READY"); + } else { + assertSessionTransition(session.state, "READY"); + } const eventRepository = new RunEventRepository(this.db); const sessionUpdate: { @@ -345,6 +352,7 @@ export class SessionManager implements SessionRuntime { backend: string; cwd: string; recoveryAttempts: number; + state: string; }, error: unknown, ): Promise { @@ -357,6 +365,7 @@ export class SessionManager implements SessionRuntime { const gateKey = "session_recovery_required"; const approvalIdempotencyKey = `${session.runId}:${gateKey}:${session.id}:${recoveryAttempts}`; const pauseCause = `session_recovery_failed:${session.id}:${recoveryAttempts}`; + assertSessionTransition(session.state, "FAILED_NEEDS_HUMAN"); await this.db.transaction(async (tx) => { await tx.execute(sql`SELECT 1 FROM ${runs} WHERE ${runs.id} = ${session.runId} FOR UPDATE`); const [run] = await tx @@ -431,18 +440,7 @@ export class SessionManager implements SessionRuntime { } private async resumeWithRetry(handle: SessionHandle): Promise { - let lastError: unknown; - for (let attempt = 0; attempt <= 2; attempt += 1) { - try { - return await this.track(this.adapter.resume(handle)); - } catch (error) { - lastError = error; - if (!(error instanceof DevflowError) || error.class !== "recoverable") { - throw error; - } - } - } - throw lastError; + return retryRecoverable("resume", () => this.track(this.adapter.resume(handle))); } private async loadTranscriptBaseline( diff --git a/packages/session/src/recovery.test.ts b/packages/session/src/recovery.test.ts new file mode 100644 index 0000000..50e3f42 --- /dev/null +++ b/packages/session/src/recovery.test.ts @@ -0,0 +1,103 @@ +import { describe, expect, it } from "vitest"; + +import { DevflowError } from "@devflow/core"; + +import { + SessionRecoveryBudget, + assertSessionStateAssignment, + assertSessionTransition, + isSessionHung, + retryRecoverable, +} from "./recovery.js"; + +describe("session recovery policy", () => { + it("allows only locked session state-machine transitions", () => { + expect(() => assertSessionTransition("CREATED", "BOOTSTRAPPING")).not.toThrow(); + expect(() => assertSessionTransition("BOOTSTRAPPING", "READY")).not.toThrow(); + expect(() => assertSessionTransition("READY", "BUSY")).not.toThrow(); + expect(() => assertSessionTransition("BUSY", "READY")).not.toThrow(); + expect(() => assertSessionTransition("BUSY", "ARTIFACT_TIMEOUT")).not.toThrow(); + expect(() => assertSessionTransition("ARTIFACT_TIMEOUT", "RESUMING")).not.toThrow(); + expect(() => assertSessionTransition("RESUMING", "REBOOTSTRAPPED")).not.toThrow(); + expect(() => assertSessionTransition("REBOOTSTRAPPED", "READY")).not.toThrow(); + + expect(() => assertSessionTransition("READY", "REBOOTSTRAPPED")).toThrow( + /Invalid session state transition/, + ); + expect(() => assertSessionTransition("CRASHED", "CRASHED")).toThrow( + /Invalid session state transition/, + ); + expect(() => assertSessionTransition("FAILED_NEEDS_HUMAN", "READY")).toThrow( + /Invalid session state transition/, + ); + }); + + it("allows no-op state assignment without treating it as a transition", () => { + expect(() => assertSessionStateAssignment("READY", "READY")).not.toThrow(); + expect(() => assertSessionStateAssignment("BUSY", "READY")).not.toThrow(); + expect(() => assertSessionStateAssignment("FAILED_NEEDS_HUMAN", "READY")).toThrow( + /Invalid session state assignment/, + ); + }); + + it("retries recoverable errors for one initial prompt send plus two retries", async () => { + let attempts = 0; + const result = await retryRecoverable("sendPrompt", async () => { + attempts += 1; + if (attempts < SessionRecoveryBudget.sendPrompt.physicalAttempts) { + throw new DevflowError("temporary prompt failure", { + class: "recoverable", + code: "prompt_send_transient", + }); + } + return "sent"; + }); + + expect(result).toBe("sent"); + expect(attempts).toBe(3); + }); + + it("throws the final recoverable error after the retry budget is exhausted", async () => { + let attempts = 0; + await expect( + retryRecoverable("rebootstrap", async () => { + attempts += 1; + throw new DevflowError("pane briefly unresponsive", { + class: "recoverable", + code: "pane_briefly_unresponsive", + }); + }), + ).rejects.toMatchObject({ + class: "recoverable", + code: "pane_briefly_unresponsive", + }); + + expect(attempts).toBe(SessionRecoveryBudget.rebootstrap.physicalAttempts); + }); + + it("does not retry human-required or fatal errors", async () => { + let attempts = 0; + await expect( + retryRecoverable("resume", async () => { + attempts += 1; + throw new DevflowError("operator action required", { + class: "human_required", + code: "session_recovery_required", + }); + }), + ).rejects.toMatchObject({ + class: "human_required", + code: "session_recovery_required", + }); + + expect(attempts).toBe(1); + }); + + it("uses the locked hung-session timeout boundary", () => { + const now = new Date("2026-05-13T10:20:00.000Z"); + + expect(isSessionHung(new Date("2026-05-13T10:00:00.000Z"), now)).toBe(true); + expect(isSessionHung(new Date("2026-05-13T10:00:01.000Z"), now)).toBe(false); + expect(isSessionHung(undefined, now)).toBe(false); + }); +}); diff --git a/packages/session/src/recovery.ts b/packages/session/src/recovery.ts new file mode 100644 index 0000000..fa66ff1 --- /dev/null +++ b/packages/session/src/recovery.ts @@ -0,0 +1,127 @@ +import { DevflowError, SessionState, type SessionState as SessionStateName } from "@devflow/core"; + +export const SessionRecoveryBudget = Object.freeze({ + sendPrompt: Object.freeze({ retries: 2, physicalAttempts: 3 }), + resume: Object.freeze({ retries: 2, physicalAttempts: 3 }), + rebootstrap: Object.freeze({ retries: 1, physicalAttempts: 2 }), + artifactRepair: Object.freeze({ retries: 1, physicalAttempts: 2 }), + maxHungMs: 20 * 60 * 1000, +}); + +export type SessionRetryOperation = "sendPrompt" | "resume" | "rebootstrap" | "artifactRepair"; + +const allowedSessionTransitions: ReadonlyMap< + SessionStateName, + ReadonlySet +> = new Map([ + ["CREATED", new Set(["BOOTSTRAPPING", "FAILED_NEEDS_HUMAN"])], + ["BOOTSTRAPPING", new Set(["READY", "FAILED_NEEDS_HUMAN"])], + ["READY", new Set(["BUSY", "FAILED_NEEDS_HUMAN"])], + [ + "BUSY", + new Set([ + "READY", + "WAITING_FOR_APPROVAL", + "ARTIFACT_TIMEOUT", + "HUNG", + "CRASHED", + "FAILED_NEEDS_HUMAN", + ]), + ], + ["WAITING_FOR_APPROVAL", new Set(["READY", "FAILED_NEEDS_HUMAN"])], + ["ARTIFACT_TIMEOUT", new Set(["RESUMING", "FAILED_NEEDS_HUMAN"])], + ["HUNG", new Set(["RESUMING", "FAILED_NEEDS_HUMAN"])], + ["CRASHED", new Set(["RESUMING", "FAILED_NEEDS_HUMAN"])], + ["RESUMING", new Set(["READY", "REBOOTSTRAPPED", "FAILED_NEEDS_HUMAN"])], + ["REBOOTSTRAPPED", new Set(["READY", "FAILED_NEEDS_HUMAN"])], + ["FAILED_NEEDS_HUMAN", new Set()], +]); + +export function isAllowedSessionTransition(from: string, to: string): boolean { + const parsedFrom = SessionState.safeParse(from); + const parsedTo = SessionState.safeParse(to); + if (!parsedFrom.success || !parsedTo.success) { + return false; + } + return allowedSessionTransitions.get(parsedFrom.data)?.has(parsedTo.data) ?? false; +} + +export function isAllowedSessionStateAssignment(from: string, to: string): boolean { + const parsedFrom = SessionState.safeParse(from); + const parsedTo = SessionState.safeParse(to); + if (!parsedFrom.success || !parsedTo.success) { + return false; + } + if (parsedFrom.data === parsedTo.data) { + return true; + } + return isAllowedSessionTransition(parsedFrom.data, parsedTo.data); +} + +export function assertSessionTransition(from: string, to: string): void { + if (isAllowedSessionTransition(from, to)) { + return; + } + + throw new DevflowError("Invalid session state transition", { + class: "fatal", + code: "internal_state_corruption", + recoveryHint: `${from}->${to}`, + }); +} + +export function assertSessionStateAssignment(from: string, to: string): void { + if (isAllowedSessionStateAssignment(from, to)) { + return; + } + + throw new DevflowError("Invalid session state assignment", { + class: "fatal", + code: "internal_state_corruption", + recoveryHint: `${from}->${to}`, + }); +} + +export async function retryRecoverable( + operation: SessionRetryOperation, + run: (physicalAttempt: number) => Promise, +): Promise { + const physicalAttempts = recoveryPhysicalAttempts(operation); + let lastError: unknown; + for (let physicalAttempt = 1; physicalAttempt <= physicalAttempts; physicalAttempt += 1) { + try { + return await run(physicalAttempt); + } catch (error) { + lastError = error; + if (!(error instanceof DevflowError) || error.class !== "recoverable") { + throw error; + } + } + } + + throw lastError; +} + +export function isSessionHung( + lastOutputAt: Date | undefined, + now: Date, + maxHungMs = SessionRecoveryBudget.maxHungMs, +): boolean { + if (lastOutputAt === undefined) { + return false; + } + return now.getTime() - lastOutputAt.getTime() >= maxHungMs; +} + +function recoveryPhysicalAttempts(operation: SessionRetryOperation): number { + switch (operation) { + case "sendPrompt": + return SessionRecoveryBudget.sendPrompt.physicalAttempts; + case "resume": + return SessionRecoveryBudget.resume.physicalAttempts; + case "rebootstrap": + return SessionRecoveryBudget.rebootstrap.physicalAttempts; + case "artifactRepair": + return SessionRecoveryBudget.artifactRepair.physicalAttempts; + } +} diff --git a/packages/workflows/src/activities.ts b/packages/workflows/src/activities.ts index 64fedc3..799894d 100644 --- a/packages/workflows/src/activities.ts +++ b/packages/workflows/src/activities.ts @@ -14,6 +14,9 @@ export interface DevflowActivityDependencies { workspaceRoot: string; availableBackends?: readonly BackendConfig[]; maxConcurrentRuns?: number; + recovery?: { + maxHungMs?: number; + }; wait?: { timeoutMs?: number; pollIntervalMs?: number; @@ -50,6 +53,7 @@ export function createDevflowActivities( ...(dependencies.maxConcurrentRuns === undefined ? {} : { maxConcurrentRuns: dependencies.maxConcurrentRuns }), + ...(dependencies.recovery === undefined ? {} : { recovery: dependencies.recovery }), ...(activityWait === undefined ? {} : { wait: activityWait }), }); };