feat: add real tmux session manager
This commit is contained in:
@@ -15,7 +15,7 @@ import { join, resolve } from "node:path";
|
||||
import { and, eq, inArray } from "drizzle-orm";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
|
||||
import { loadPersonaFiles, loadTemplateFiles, validateArtifact } from "@devflow/core";
|
||||
import { DevflowError, loadPersonaFiles, loadTemplateFiles, validateArtifact } from "@devflow/core";
|
||||
import {
|
||||
type DbClient,
|
||||
agentPersonas,
|
||||
@@ -30,7 +30,14 @@ import {
|
||||
tuiSessions,
|
||||
workflowTemplates,
|
||||
} from "@devflow/db";
|
||||
import { FakeSessionAdapter, type SessionAdapter, SessionManager } from "@devflow/session";
|
||||
import {
|
||||
FakeSessionAdapter,
|
||||
type SessionAdapter,
|
||||
type SessionHandle,
|
||||
SessionManager,
|
||||
type SessionRuntime,
|
||||
type TranscriptChunk,
|
||||
} from "@devflow/session";
|
||||
|
||||
import { DbRunEngine, sweepM4ProcessRestart } from "./engine.js";
|
||||
|
||||
@@ -94,6 +101,110 @@ class DisposeCountingFakeAdapter extends FakeSessionAdapter {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Fake session adapter whose `dispose` never succeeds.
 *
 * Tests use it to check how the engine behaves when session disposal fails.
 */
class DisposeFailsFakeAdapter extends FakeSessionAdapter {
  /**
   * Always rejects with a recoverable `pane_briefly_unresponsive` error whose
   * recovery hint names the session that was being disposed.
   */
  override async dispose(handle: Parameters<FakeSessionAdapter["dispose"]>[0]): Promise<void> {
    const { sessionId } = handle;
    const failure = new DevflowError("dispose failed", {
      class: "recoverable",
      code: "pane_briefly_unresponsive",
      recoveryHint: `session=${sessionId}`,
    });
    throw failure;
  }
}
|
||||
|
||||
/**
 * Fake session adapter that records the order of `capture` and `dispose`
 * calls in `events`, and can be switched to fail every capture.
 */
class CaptureOrderingFakeAdapter extends FakeSessionAdapter {
  /** When true, `capture` throws instead of delegating to the base adapter. */
  failCapture = false;
  /** Names of the overridden methods ("capture" / "dispose") in call order. */
  events: string[] = [];

  /**
   * Records the call, then either delegates to the base capture or throws a
   * recoverable `pane_briefly_unresponsive` error when `failCapture` is set.
   */
  override async *capture(
    handle: Parameters<FakeSessionAdapter["capture"]>[0],
    fromSeq: bigint,
  ): AsyncIterable<TranscriptChunk> {
    // Record first so a failing capture still shows up in the event order.
    this.events.push("capture");
    if (!this.failCapture) {
      yield* super.capture(handle, fromSeq);
      return;
    }
    throw new DevflowError("transcript capture failed", {
      class: "recoverable",
      code: "pane_briefly_unresponsive",
    });
  }

  /** Records the call, then delegates to the base dispose. */
  override async dispose(handle: Parameters<FakeSessionAdapter["dispose"]>[0]): Promise<void> {
    this.events.push("dispose");
    await super.dispose(handle);
  }
}
|
||||
|
||||
/**
 * Fake session adapter that remembers which sessions were disposed and makes
 * any later `capture` of such a session fail.
 */
class CaptureFailsAfterDisposeFakeAdapter extends FakeSessionAdapter {
  /** Names of the overridden methods ("capture" / "dispose") in call order. */
  readonly events: string[] = [];
  /** Session ids for which `dispose` has been called. */
  readonly disposedSessionIds = new Set<string>();

  /**
   * Records the call, then delegates to the base capture unless the session
   * was already disposed, in which case a recoverable
   * `pane_briefly_unresponsive` error naming the session is thrown.
   */
  override async *capture(
    handle: Parameters<FakeSessionAdapter["capture"]>[0],
    fromSeq: bigint,
  ): AsyncIterable<TranscriptChunk> {
    this.events.push("capture");
    const alreadyDisposed = this.disposedSessionIds.has(handle.sessionId);
    if (!alreadyDisposed) {
      yield* super.capture(handle, fromSeq);
      return;
    }
    throw new DevflowError("tmux session already disposed", {
      class: "recoverable",
      code: "pane_briefly_unresponsive",
      recoveryHint: `session=${handle.sessionId}`,
    });
  }

  /** Records the call, marks the session as disposed, then delegates to the base dispose. */
  override async dispose(handle: Parameters<FakeSessionAdapter["dispose"]>[0]): Promise<void> {
    this.events.push("dispose");
    // Mark before delegating so the session counts as disposed even if the base call rejects.
    this.disposedSessionIds.add(handle.sessionId);
    await super.dispose(handle);
  }
}
|
||||
|
||||
/**
 * `SessionRuntime` test double that forwards every call to an internal
 * `FakeSessionAdapter` and records the exact handles passed to `capture`
 * and `dispose`.
 */
class TerminalHandleRecordingRuntime implements SessionRuntime {
  readonly adapter = new FakeSessionAdapter({ writeDelayMs: 0 });
  /** Handles received by `capture`, in call order. */
  readonly captureHandles: SessionHandle[] = [];
  /** Handles received by `dispose`, in call order. */
  readonly disposeHandles: SessionHandle[] = [];

  // --- Recording methods -------------------------------------------------

  /** Remembers the handle, then streams the wrapped adapter's capture. */
  async *capture(handle: SessionHandle, fromSeq: bigint): ReturnType<SessionRuntime["capture"]> {
    this.captureHandles.push(handle);
    yield* this.adapter.capture(handle, fromSeq);
  }

  /** Remembers the handle, then disposes through the wrapped adapter. */
  async dispose(handle: SessionHandle): Promise<void> {
    this.disposeHandles.push(handle);
    await this.adapter.dispose(handle);
  }

  // --- Pure pass-throughs ------------------------------------------------

  /** Returns the given operation unchanged; nothing is tracked. */
  trackOperation<T>(operation: Promise<T>): Promise<T> {
    return operation;
  }

  start(...callArgs: Parameters<SessionRuntime["start"]>): ReturnType<SessionRuntime["start"]> {
    return this.adapter.start(...callArgs);
  }

  sendPrompt(
    ...callArgs: Parameters<SessionRuntime["sendPrompt"]>
  ): ReturnType<SessionRuntime["sendPrompt"]> {
    return this.adapter.sendPrompt(...callArgs);
  }

  probe(...callArgs: Parameters<SessionRuntime["probe"]>): ReturnType<SessionRuntime["probe"]> {
    return this.adapter.probe(...callArgs);
  }

  resume(...callArgs: Parameters<SessionRuntime["resume"]>): ReturnType<SessionRuntime["resume"]> {
    return this.adapter.resume(...callArgs);
  }

  rebootstrap(
    ...callArgs: Parameters<SessionRuntime["rebootstrap"]>
  ): ReturnType<SessionRuntime["rebootstrap"]> {
    return this.adapter.rebootstrap(...callArgs);
  }
}
|
||||
|
||||
describe("DbRunEngine", () => {
|
||||
let client: DbClient | undefined;
|
||||
const runIds: string[] = [];
|
||||
@@ -857,6 +968,43 @@ describe("DbRunEngine", () => {
|
||||
expect(sessions.every((session) => session.state === "FAILED_NEEDS_HUMAN")).toBe(true);
|
||||
});
|
||||
|
||||
it("repairs final reports when direct advance sees a terminalized fatal phase", async () => {
  // Arrange: seeded registry, scratch workspace and repo, engine on a plain fake adapter.
  client = createDbClient(databaseUrl);
  const db = client.db;
  await seedDevelopmentRegistry(db);
  const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
  const repoPath = createGitRepo();
  tempRoots.push(workspaceRoot, repoPath);
  const adapter = new FakeSessionAdapter({ writeDelayMs: 0 });
  const engine = new DbRunEngine({
    db,
    sessions: sessionRuntime(db, adapter),
    workspaceRoot,
    maxConcurrentRuns: 100,
    wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
  });

  // Start a run whose phase_plan scenario is "unknown-schema", then approve both gates.
  const started = await engine.startRun({
    requirementsMd: "Direct advance should repair terminal reports.",
    repoPath,
    baseBranch: "main",
    scenarios: {
      phase_plan: "unknown-schema",
    },
  });
  const runId = started.runId;
  runIds.push(runId);
  const specApproval = pendingApproval(await engine.getStatus(runId), "spec_approved");
  await engine.signalApproval(runId, specApproval.id, "approve", randomUUID());
  const phasePlanApproval = pendingApproval(await engine.getStatus(runId), "phase_plan_approved");
  await engine.signalApprovalForWorkflow(runId, phasePlanApproval.id, "approve", randomUUID());

  // Act: advancing directly must reject with the fixture error.
  await expect(engine.advanceRunUntilBlocked(runId)).rejects.toMatchObject({
    code: "fake_fixture_missing",
  });

  // Assert: the run is failed and still has a final report path.
  const { run: failedRun } = await engine.getStatus(runId);
  expect(failedRun.state).toBe("failed");
  expect(failedRun.finalReportPath).toMatch(/\.report\.md$/);
});
|
||||
|
||||
it("does not start another pending phase when approval replay sees active work", async () => {
|
||||
client = createDbClient(databaseUrl);
|
||||
await seedDevelopmentRegistry(client.db);
|
||||
@@ -1061,6 +1209,278 @@ describe("DbRunEngine", () => {
|
||||
expect((await engine.getStatus(runId)).run.state).toBe("aborted");
|
||||
});
|
||||
|
||||
it("surfaces session dispose failures during abort", async () => {
  // Arrange: engine backed by an adapter whose dispose always rejects.
  client = createDbClient(databaseUrl);
  const db = client.db;
  await seedDevelopmentRegistry(db);
  const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
  const repoPath = createGitRepo();
  tempRoots.push(workspaceRoot, repoPath);
  const failingAdapter = new DisposeFailsFakeAdapter({ writeDelayMs: 0 });
  const engine = new DbRunEngine({
    db,
    sessions: sessionRuntime(db, failingAdapter),
    workspaceRoot,
    maxConcurrentRuns: 100,
    wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
  });

  const started = await engine.startRun({
    requirementsMd: "Abort while waiting for approval.",
    repoPath,
    baseBranch: "main",
  });
  const runId = started.runId;
  runIds.push(runId);

  // Act: the abort call must reject with the dispose failure.
  await expect(engine.abortRun(runId, "user requested abort")).rejects.toMatchObject({
    code: "pane_briefly_unresponsive",
  });

  // Assert: the run is nevertheless aborted and has a final report path.
  const statusAfterAbort = await engine.getStatus(runId);
  expect(statusAfterAbort.run.state).toBe("aborted");
  const runRows = await db
    .select({ finalReportPath: runs.finalReportPath })
    .from(runs)
    .where(eq(runs.id, runId));
  expect(runRows[0]?.finalReportPath).toMatch(/\.report\.md$/);
});
|
||||
|
||||
it("captures terminal session transcripts before abort disposal", async () => {
  // Arrange: engine backed by an adapter that records capture/dispose order.
  client = createDbClient(databaseUrl);
  const db = client.db;
  await seedDevelopmentRegistry(db);
  const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
  const repoPath = createGitRepo();
  tempRoots.push(workspaceRoot, repoPath);
  const adapter = new CaptureOrderingFakeAdapter({ writeDelayMs: 0 });
  const engine = new DbRunEngine({
    db,
    sessions: sessionRuntime(db, adapter),
    workspaceRoot,
    maxConcurrentRuns: 100,
    wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
  });

  const started = await engine.startRun({
    requirementsMd: "Abort after capturing terminal transcript.",
    repoPath,
    baseBranch: "main",
  });
  const runId = started.runId;
  runIds.push(runId);
  // Drop events recorded while starting the run; only the abort is under test.
  adapter.events.length = 0;

  await engine.abortRun(runId, "user requested abort");

  // Assert: capture happened before dispose and the capture cursor advanced.
  expect(adapter.events).toEqual(["capture", "dispose"]);
  const sessionRows = await db
    .select({ lastCaptureSeq: tuiSessions.lastCaptureSeq })
    .from(tuiSessions)
    .where(eq(tuiSessions.runId, runId));
  expect(sessionRows[0]?.lastCaptureSeq).toBeGreaterThan(0n);
});
|
||||
|
||||
it("retries terminal approval cleanup idempotently when a decision is replayed", async () => {
  // Arrange: adapter that fails any capture attempted after the session was disposed.
  client = createDbClient(databaseUrl);
  const db = client.db;
  await seedDevelopmentRegistry(db);
  const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
  const repoPath = createGitRepo();
  tempRoots.push(workspaceRoot, repoPath);
  const adapter = new CaptureFailsAfterDisposeFakeAdapter({ writeDelayMs: 0 });
  const engine = new DbRunEngine({
    db,
    sessions: sessionRuntime(db, adapter),
    workspaceRoot,
    maxConcurrentRuns: 100,
    wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
  });

  const started = await engine.startRun({
    requirementsMd: "Replay a terminal approval decision.",
    repoPath,
    baseBranch: "main",
  });
  const runId = started.runId;
  runIds.push(runId);
  const statusBeforeDecision = await engine.getStatus(runId);
  const approvalId = statusBeforeDecision.approvals[0]?.id;
  expect(approvalId).toBeDefined();
  if (approvalId === undefined) {
    throw new Error("approval id missing");
  }
  // The same token is reused below so the second signal is a replay of the first.
  const clientToken = randomUUID();
  adapter.events.length = 0;

  // First rejection: cleanup captures, then disposes.
  await engine.signalApproval(runId, approvalId, "reject", clientToken);
  expect(adapter.events).toEqual(["capture", "dispose"]);
  adapter.events.length = 0;

  // Replayed rejection: resolves, and cleanup runs again in the same order.
  const replay = engine.signalApproval(runId, approvalId, "reject", clientToken);
  await expect(replay).resolves.toBeUndefined();
  expect(adapter.events).toEqual(["capture", "dispose"]);
  const statusAfterReplay = await engine.getStatus(runId);
  expect(statusAfterReplay.run.state).toBe("failed");
});
|
||||
|
||||
it("uses persisted tmux handles when capturing and disposing terminal sessions", async () => {
  // Arrange: a runtime that records the handles passed to capture and dispose.
  client = createDbClient(databaseUrl);
  const db = client.db;
  await seedDevelopmentRegistry(db);
  const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
  const repoPath = createGitRepo();
  tempRoots.push(workspaceRoot, repoPath);
  const sessions = new TerminalHandleRecordingRuntime();
  const engine = new DbRunEngine({
    db,
    sessions,
    workspaceRoot,
    maxConcurrentRuns: 100,
    wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
  });

  const started = await engine.startRun({
    requirementsMd: "Abort after persisting tmux handle fields.",
    repoPath,
    baseBranch: "main",
  });
  const runId = started.runId;
  runIds.push(runId);
  // Overwrite the stored handle fields so the test can tell they were read back from the DB.
  const persistedFields = {
    lastCaptureSeq: 1n,
    lastKnownPanePid: 777,
    tmuxSession: "persisted-session",
    tmuxWindow: "persisted-window",
  };
  await db.update(tuiSessions).set(persistedFields).where(eq(tuiSessions.runId, runId));

  await engine.abortRun(runId, "user requested abort");

  // Assert: both calls received the persisted values; capture also carries a baseline.
  const expectedDisposeHandle = {
    sessionId: expect.any(String),
    pid: 777,
    tmuxSession: "persisted-session",
    tmuxWindow: "persisted-window",
  };
  const expectedCaptureHandle = {
    sessionId: expect.any(String),
    pid: 777,
    transcriptBaseline: {
      startSeq: 1n,
      lines: expect.arrayContaining([expect.any(String)]),
    },
    tmuxSession: "persisted-session",
    tmuxWindow: "persisted-window",
  };
  expect(sessions.captureHandles).toContainEqual(expectedCaptureHandle);
  expect(sessions.disposeHandles).toContainEqual(expectedDisposeHandle);
});
|
||||
|
||||
it("attempts disposal when transcript capture fails during cleanup", async () => {
  // Arrange: order-recording adapter, switched to fail capture after the run has started.
  client = createDbClient(databaseUrl);
  const db = client.db;
  await seedDevelopmentRegistry(db);
  const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
  const repoPath = createGitRepo();
  tempRoots.push(workspaceRoot, repoPath);
  const adapter = new CaptureOrderingFakeAdapter({ writeDelayMs: 0 });
  const engine = new DbRunEngine({
    db,
    sessions: sessionRuntime(db, adapter),
    workspaceRoot,
    maxConcurrentRuns: 100,
    wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
  });

  const started = await engine.startRun({
    requirementsMd: "Abort with failed transcript capture.",
    repoPath,
    baseBranch: "main",
  });
  const runId = started.runId;
  runIds.push(runId);
  adapter.events.length = 0;
  adapter.failCapture = true;

  // Act 1: the abort rejects with the capture failure...
  await expect(engine.abortRun(runId, "user requested abort")).rejects.toMatchObject({
    code: "pane_briefly_unresponsive",
  });

  // ...but dispose was still attempted, and the run is aborted with a report path.
  expect(adapter.events).toEqual(["capture", "dispose"]);
  const runRows = await db
    .select({ finalReportPath: runs.finalReportPath, state: runs.state })
    .from(runs)
    .where(eq(runs.id, runId));
  const persistedRun = runRows[0];
  expect(persistedRun).toMatchObject({ state: "aborted" });
  expect(persistedRun?.finalReportPath).toMatch(/\.report\.md$/);

  // Act 2: with capture working again, a repeated abort re-runs the cleanup.
  adapter.events.length = 0;
  adapter.failCapture = false;
  await engine.abortRun(runId, "retry abort cleanup");
  expect(adapter.events).toEqual(["capture", "dispose"]);
});
|
||||
|
||||
it("writes a failed final report before surfacing approval reject dispose failures", async () => {
  // Arrange: engine backed by an adapter whose dispose always rejects.
  client = createDbClient(databaseUrl);
  const db = client.db;
  await seedDevelopmentRegistry(db);
  const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
  const repoPath = createGitRepo();
  tempRoots.push(workspaceRoot, repoPath);
  const failingAdapter = new DisposeFailsFakeAdapter({ writeDelayMs: 0 });
  const engine = new DbRunEngine({
    db,
    sessions: sessionRuntime(db, failingAdapter),
    workspaceRoot,
    maxConcurrentRuns: 100,
    wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
  });

  const started = await engine.startRun({
    requirementsMd: "Reject while waiting for approval.",
    repoPath,
    baseBranch: "main",
  });
  const runId = started.runId;
  runIds.push(runId);
  const request = pendingApproval(await engine.getStatus(runId), "spec_approved");

  // Act: rejecting the approval surfaces the dispose failure.
  const rejection = engine.signalApproval(runId, request.id, "reject", randomUUID());
  await expect(rejection).rejects.toMatchObject({
    code: "pane_briefly_unresponsive",
  });

  // Assert: the run row is failed and already carries a final report path.
  const runRows = await db
    .select({ finalReportPath: runs.finalReportPath, state: runs.state })
    .from(runs)
    .where(eq(runs.id, runId));
  const persistedRun = runRows[0];
  expect(persistedRun?.state).toBe("failed");
  expect(persistedRun?.finalReportPath).toMatch(/\.report\.md$/);
});
|
||||
|
||||
it("writes a failed final report before surfacing workflow approval reject dispose failures", async () => {
  // Arrange: engine backed by an adapter whose dispose always rejects.
  client = createDbClient(databaseUrl);
  const db = client.db;
  await seedDevelopmentRegistry(db);
  const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
  const repoPath = createGitRepo();
  tempRoots.push(workspaceRoot, repoPath);
  const failingAdapter = new DisposeFailsFakeAdapter({ writeDelayMs: 0 });
  const engine = new DbRunEngine({
    db,
    sessions: sessionRuntime(db, failingAdapter),
    workspaceRoot,
    maxConcurrentRuns: 100,
    wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
  });

  const started = await engine.startRun({
    requirementsMd: "Reject through workflow approval path.",
    repoPath,
    baseBranch: "main",
  });
  const runId = started.runId;
  runIds.push(runId);
  const request = pendingApproval(await engine.getStatus(runId), "spec_approved");

  // Act: rejecting via the workflow entry point surfaces the dispose failure.
  const rejection = engine.signalApprovalForWorkflow(runId, request.id, "reject", randomUUID());
  await expect(rejection).rejects.toMatchObject({
    code: "pane_briefly_unresponsive",
  });

  // Assert: the run row is failed and already carries a final report path.
  const runRows = await db
    .select({ finalReportPath: runs.finalReportPath, state: runs.state })
    .from(runs)
    .where(eq(runs.id, runId));
  const persistedRun = runRows[0];
  expect(persistedRun?.state).toBe("failed");
  expect(persistedRun?.finalReportPath).toMatch(/\.report\.md$/);
});
|
||||
|
||||
it("sweeps non-terminal M4 runs on API startup recovery", async () => {
|
||||
client = createDbClient(databaseUrl);
|
||||
await seedDevelopmentRegistry(client.db);
|
||||
@@ -1226,7 +1646,7 @@ describe("DbRunEngine", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("replays terminal approval disposal side effects for duplicate decisions", async () => {
|
||||
it("replays terminal approval cleanup side effects idempotently", async () => {
|
||||
client = createDbClient(databaseUrl);
|
||||
await seedDevelopmentRegistry(client.db);
|
||||
const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
|
||||
@@ -1309,6 +1729,60 @@ describe("DbRunEngine", () => {
|
||||
),
|
||||
).toMatchObject({ runId, status: "aborted" });
|
||||
});
|
||||
|
||||
it("repairs terminal final reports before surfacing approval replay dispose failures", async () => {
  client = createDbClient(databaseUrl);
  const db = client.db;
  await seedDevelopmentRegistry(db);
  const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
  const repoPath = createGitRepo();
  const worktreeRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-worktree-")));
  tempRoots.push(workspaceRoot, repoPath, worktreeRoot);

  // Look up the seeded "development" template so the hand-written run row can reference it.
  const templateRows = await db
    .select({ hash: workflowTemplates.hash, id: workflowTemplates.id })
    .from(workflowTemplates)
    .where(eq(workflowTemplates.name, "development"))
    .limit(1);
  const template = templateRows[0];
  if (template === undefined) {
    throw new Error("development template missing");
  }

  // Insert an already-aborted run without a final report, plus one failed session for it.
  const runId = randomUUID();
  runIds.push(runId);
  await db.insert(runs).values({
    id: runId,
    templateId: template.id,
    templateHash: template.hash,
    state: "aborted",
    repoPath,
    baseBranch: "main",
    worktreeRoot,
    endedAt: new Date(),
    finalReportPath: null,
  });
  await db.insert(tuiSessions).values({
    id: randomUUID(),
    runId,
    roleId: "implementer",
    backend: "fake",
    cwd: worktreeRoot,
    state: "FAILED_NEEDS_HUMAN",
  });
  const failingAdapter = new DisposeFailsFakeAdapter({ writeDelayMs: 0 });
  const engine = new DbRunEngine({
    db,
    sessions: sessionRuntime(db, failingAdapter),
    workspaceRoot,
    maxConcurrentRuns: 100,
  });

  // Act: replaying the abort side effects surfaces the dispose failure.
  await expect(engine.replayAppliedApprovalSideEffects(runId, "abort")).rejects.toMatchObject({
    code: "pane_briefly_unresponsive",
  });

  // Assert: the missing final report path was filled in regardless.
  const runRows = await db
    .select({ finalReportPath: runs.finalReportPath })
    .from(runs)
    .where(eq(runs.id, runId));
  expect(runRows[0]?.finalReportPath).toMatch(/\.report\.md$/);
});
|
||||
});
|
||||
|
||||
function pendingApproval(status: Awaited<ReturnType<DbRunEngine["getStatus"]>>, gateKey: string) {
|
||||
|
||||
@@ -21,6 +21,7 @@ import {
|
||||
import {
|
||||
type DbClient,
|
||||
RunEventRepository,
|
||||
TuiTranscriptRepository,
|
||||
agentPersonas,
|
||||
approvalDecisions,
|
||||
approvalRequests,
|
||||
@@ -33,10 +34,16 @@ import {
|
||||
runPhases,
|
||||
runs,
|
||||
tuiSessions,
|
||||
tuiTranscriptChunks,
|
||||
workflowTemplates,
|
||||
} from "@devflow/db";
|
||||
import type { SessionRuntime } from "@devflow/session";
|
||||
import { and, asc, desc, eq, inArray, sql } from "drizzle-orm";
|
||||
import {
|
||||
type SessionHandle,
|
||||
type SessionRuntime,
|
||||
type TranscriptBaseline,
|
||||
captureAndPersistTranscript,
|
||||
} from "@devflow/session";
|
||||
import { and, asc, desc, eq, gt, inArray, lte, sql } from "drizzle-orm";
|
||||
|
||||
import { runSingleFakePhase } from "./fake-phase-harness.js";
|
||||
|
||||
@@ -198,6 +205,7 @@ export class DbRunEngine implements RunEngine {
|
||||
return { runId };
|
||||
}
|
||||
await this.markRunFailedIfActive(runId, "start_run_failed");
|
||||
await this.composeFinalReportForTerminalRun(runId);
|
||||
throw error;
|
||||
}
|
||||
|
||||
@@ -387,6 +395,7 @@ export class DbRunEngine implements RunEngine {
|
||||
return this.getStatus(runId);
|
||||
}
|
||||
await this.markRunFailedIfActive(runId, options.failureReason ?? "advance_run_failed");
|
||||
await this.composeFinalReportForTerminalRun(runId);
|
||||
throw error;
|
||||
}
|
||||
|
||||
@@ -398,9 +407,11 @@ export class DbRunEngine implements RunEngine {
|
||||
repoPath: string,
|
||||
baseBranch: string,
|
||||
): Promise<void> {
|
||||
await tx.execute(sql`SELECT pg_advisory_xact_lock(hashtext('devflow:start-run-global'))`);
|
||||
await tx.execute(
|
||||
sql`SELECT pg_advisory_xact_lock(hashtext('devflow:start-run'), hashtext(${`${repoPath}:${baseBranch}`}))`,
|
||||
sql`SELECT pg_advisory_xact_lock(hashtextextended(${"devflow:start-run-concurrency"}, 0))`,
|
||||
);
|
||||
await tx.execute(
|
||||
sql`SELECT pg_advisory_xact_lock(hashtextextended(${`devflow:start-run:${repoPath}:${baseBranch}`}, 0))`,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -456,17 +467,18 @@ export class DbRunEngine implements RunEngine {
|
||||
return;
|
||||
}
|
||||
await this.markRunFailedIfActive(runId, "approval_advance_failed");
|
||||
await this.composeFinalReportForTerminalRun(runId);
|
||||
throw error;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (parsedAction === "reject") {
|
||||
await this.composeFinalReportBestEffort(runId, "failed");
|
||||
await this.cleanupTerminalRun(runId, "failed", decision.sessionIdsToDispose);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.composeFinalReportBestEffort(runId, "aborted");
|
||||
await this.cleanupTerminalRun(runId, "aborted", decision.sessionIdsToDispose);
|
||||
}
|
||||
|
||||
async signalApprovalForWorkflow(
|
||||
@@ -477,7 +489,23 @@ export class DbRunEngine implements RunEngine {
|
||||
comment?: string,
|
||||
): Promise<void> {
|
||||
const parsedAction = ApprovalDecisionAction.parse(action);
|
||||
await this.recordApprovalDecision(runId, approvalRequestId, parsedAction, clientToken, comment);
|
||||
const decision = await this.recordApprovalDecision(
|
||||
runId,
|
||||
approvalRequestId,
|
||||
parsedAction,
|
||||
clientToken,
|
||||
comment,
|
||||
);
|
||||
if (parsedAction === "reject" || parsedAction === "abort") {
|
||||
const status = await this.getStatus(runId);
|
||||
if (isTerminalRunState(status.run.state)) {
|
||||
await this.cleanupTerminalRun(
|
||||
runId,
|
||||
status.run.state as "completed" | "failed" | "aborted",
|
||||
decision.sessionIdsToDispose,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async validateApprovalSignalInput(
|
||||
@@ -514,17 +542,16 @@ export class DbRunEngine implements RunEngine {
|
||||
): Promise<void> {
|
||||
const parsedAction = ApprovalDecisionAction.parse(action);
|
||||
const shouldDisposeSessions = options.disposeSessions ?? true;
|
||||
if (shouldDisposeSessions && parsedAction === "reject") {
|
||||
await this.disposeSessions(await this.sessionIdsForRun(runId));
|
||||
} else if (shouldDisposeSessions && parsedAction === "abort") {
|
||||
await this.disposeSessions(await this.sessionIdsForRun(runId));
|
||||
}
|
||||
|
||||
const status = await this.getStatus(runId);
|
||||
if (isTerminalRunState(status.run.state)) {
|
||||
await this.composeFinalReportBestEffort(
|
||||
const sessionIds =
|
||||
shouldDisposeSessions && (parsedAction === "reject" || parsedAction === "abort")
|
||||
? await this.sessionIdsForRun(runId)
|
||||
: [];
|
||||
await this.cleanupTerminalRun(
|
||||
runId,
|
||||
status.run.state as "completed" | "failed" | "aborted",
|
||||
sessionIds,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -627,11 +654,19 @@ export class DbRunEngine implements RunEngine {
|
||||
|
||||
async abortRun(runId: string, reason: string): Promise<void> {
|
||||
const eventRepository = new RunEventRepository(this.db);
|
||||
let aborted = false;
|
||||
let shouldCleanup = false;
|
||||
let sessionsToDispose: string[] = [];
|
||||
await this.db.transaction(async (tx) => {
|
||||
const [run] = await lockRun(tx, runId);
|
||||
if (run === undefined || isTerminalRunState(run.state)) {
|
||||
if (run === undefined) {
|
||||
return;
|
||||
}
|
||||
if (run.state === "aborted") {
|
||||
sessionsToDispose = await sessionIdsForRun(tx, runId);
|
||||
shouldCleanup = true;
|
||||
return;
|
||||
}
|
||||
if (isTerminalRunState(run.state)) {
|
||||
return;
|
||||
}
|
||||
await tx
|
||||
@@ -653,12 +688,11 @@ export class DbRunEngine implements RunEngine {
|
||||
await failActivePhasesInTransaction(tx, eventRepository, runId, "abort");
|
||||
await abortPendingApprovalsInTransaction(tx, runId);
|
||||
sessionsToDispose = await markSessionsFailedInTransaction(tx, eventRepository, runId);
|
||||
aborted = true;
|
||||
shouldCleanup = true;
|
||||
});
|
||||
|
||||
if (aborted) {
|
||||
await this.disposeSessions(sessionsToDispose);
|
||||
await this.composeFinalReportBestEffort(runId, "aborted");
|
||||
if (shouldCleanup) {
|
||||
await this.cleanupTerminalRun(runId, "aborted", sessionsToDispose);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -970,7 +1004,7 @@ export class DbRunEngine implements RunEngine {
|
||||
action: ApprovalDecisionActionValue,
|
||||
clientToken: string,
|
||||
comment: string | undefined,
|
||||
): Promise<{ replayed: boolean }> {
|
||||
): Promise<{ replayed: boolean; sessionIdsToDispose: string[] }> {
|
||||
const decisionIdempotencyKey = `${approvalRequestId}:${action}:${clientToken}`;
|
||||
const eventRepository = new RunEventRepository(this.db);
|
||||
let sessionsToDispose: string[] = [];
|
||||
@@ -1118,11 +1152,7 @@ export class DbRunEngine implements RunEngine {
|
||||
return { replayed: false };
|
||||
});
|
||||
|
||||
if (sessionsToDispose.length > 0) {
|
||||
await this.disposeSessions(sessionsToDispose);
|
||||
}
|
||||
|
||||
return result;
|
||||
return { ...result, sessionIdsToDispose: sessionsToDispose };
|
||||
}
|
||||
|
||||
private async readApprovalSignalState(
|
||||
@@ -1650,11 +1680,8 @@ export class DbRunEngine implements RunEngine {
|
||||
});
|
||||
sessionsToDispose = await markSessionsFailedInTransaction(tx, eventRepository, runId);
|
||||
});
|
||||
if (markedFailed) {
|
||||
await this.disposeSessions(sessionsToDispose);
|
||||
}
|
||||
if (reportStatus !== undefined) {
|
||||
await this.composeFinalReportBestEffort(runId, reportStatus);
|
||||
if (markedFailed && reportStatus !== undefined) {
|
||||
await this.cleanupTerminalRun(runId, reportStatus, sessionsToDispose);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1670,6 +1697,127 @@ export class DbRunEngine implements RunEngine {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Composes the final report for a run, but only if the run's current state
 * is terminal; otherwise does nothing.
 *
 * @param runId - Run whose status is read and whose report may be composed.
 */
private async composeFinalReportForTerminalRun(runId: string): Promise<void> {
  const { run } = await this.getStatus(runId);
  if (isTerminalRunState(run.state)) {
    const terminalState = run.state as "completed" | "failed" | "aborted";
    await this.composeFinalReportBestEffort(runId, terminalState);
  }
}
|
||||
|
||||
/**
 * Runs the cleanup sequence for a terminal run: capture session transcripts,
 * compose the final report, then dispose the sessions.
 *
 * A capture or dispose failure does not prevent the later steps from running.
 * It is rethrown only after all three steps were attempted, with a capture
 * failure taking precedence over a dispose failure.
 *
 * @param runId - Run whose final report is composed.
 * @param status - Terminal status handed to the report composer.
 * @param sessionIds - Sessions to capture and then dispose.
 * @throws The capture error if capture failed; otherwise the dispose error if disposal failed.
 */
private async cleanupTerminalRun(
  runId: string,
  status: "completed" | "failed" | "aborted",
  sessionIds: readonly string[],
): Promise<void> {
  // Runs one step and hands back whatever it threw (undefined on success)
  // instead of letting the failure interrupt the sequence.
  const attempt = async (step: () => Promise<void>): Promise<unknown> => {
    try {
      await step();
      return undefined;
    } catch (error) {
      return error;
    }
  };

  const captureError = await attempt(() => this.captureSessionTranscripts(sessionIds));
  await this.composeFinalReportBestEffort(runId, status);
  const disposeError = await attempt(() => this.disposeSessions(sessionIds));

  const firstError = captureError !== undefined ? captureError : disposeError;
  if (firstError !== undefined) {
    throw firstError;
  }
}
|
||||
|
||||
/**
 * Captures and persists the transcript of each given session, resuming from
 * the session's stored `lastCaptureSeq`.
 *
 * Duplicate ids are collapsed and only sessions that have a `tuiSessions` row
 * are processed. Every session is attempted; errors recognised by
 * `isMissingTerminalSessionError` are ignored, and the first remaining
 * failure (in row order) is rethrown once all attempts have settled.
 *
 * @param sessionIds - Ids of the sessions to capture.
 */
private async captureSessionTranscripts(sessionIds: readonly string[]): Promise<void> {
  const distinctIds = Array.from(new Set(sessionIds));
  if (distinctIds.length === 0) {
    return;
  }

  const rows = await this.db
    .select({
      id: tuiSessions.id,
      lastCaptureSeq: tuiSessions.lastCaptureSeq,
      lastKnownPanePid: tuiSessions.lastKnownPanePid,
      tmuxSession: tuiSessions.tmuxSession,
      tmuxWindow: tuiSessions.tmuxWindow,
    })
    .from(tuiSessions)
    .where(inArray(tuiSessions.id, distinctIds));
  const sink = new TuiTranscriptRepository(this.db);

  // Captures a single session; the baseline is loaded before the guarded section.
  const captureOne = async (row: (typeof rows)[number]): Promise<void> => {
    const transcriptBaseline = await this.loadTranscriptBaseline(row.id, row.lastCaptureSeq);
    try {
      await captureAndPersistTranscript({
        adapter: this.sessions,
        handle: {
          ...sessionHandleFromRow(row),
          // Attach the baseline only when one was found.
          ...(transcriptBaseline === undefined ? {} : { transcriptBaseline }),
        },
        fromSeq: row.lastCaptureSeq,
        sink,
      });
    } catch (error) {
      if (!isMissingTerminalSessionError(error)) {
        throw error;
      }
    }
  };

  const outcomes = await Promise.allSettled(rows.map((row) => captureOne(row)));
  for (const outcome of outcomes) {
    if (outcome.status === "rejected") {
      throw outcome.reason;
    }
  }
}
|
||||
|
||||
/**
 * Loads the most recent already-captured transcript chunks of a session as a
 * baseline.
 *
 * Reads at most `transcriptBaselineLineLimit` chunks with
 * `0 < seq <= lastCaptureSeq`, newest first, and returns them in ascending
 * `seq` order.
 *
 * @param sessionId - Session whose stored chunks are read.
 * @param lastCaptureSeq - Highest sequence number to include.
 * @returns The baseline (`startSeq` is the lowest returned seq), or
 *   `undefined` when `lastCaptureSeq` is 0 or no chunk matches.
 */
private async loadTranscriptBaseline(
  sessionId: string,
  lastCaptureSeq: bigint,
): Promise<TranscriptBaseline | undefined> {
  if (lastCaptureSeq === 0n) {
    return undefined;
  }

  const withinCapturedRange = and(
    eq(tuiTranscriptChunks.sessionId, sessionId),
    gt(tuiTranscriptChunks.seq, 0n),
    lte(tuiTranscriptChunks.seq, lastCaptureSeq),
  );
  const newestFirst = await this.db
    .select({ seq: tuiTranscriptChunks.seq, content: tuiTranscriptChunks.content })
    .from(tuiTranscriptChunks)
    .where(withinCapturedRange)
    .orderBy(desc(tuiTranscriptChunks.seq))
    .limit(transcriptBaselineLineLimit);

  // The last row of the descending result is the oldest chunk in the window.
  const oldest = newestFirst[newestFirst.length - 1];
  const startSeq = oldest?.seq;
  if (startSeq === undefined) {
    return undefined;
  }

  const lines = newestFirst.map((chunk) => chunk.content).reverse();
  return { startSeq, lines };
}
|
||||
|
||||
private async writeStubFinalReport(
|
||||
runId: string,
|
||||
status: "completed" | "failed" | "aborted",
|
||||
@@ -1905,9 +2053,33 @@ export class DbRunEngine implements RunEngine {
|
||||
}
|
||||
|
||||
private async disposeSessions(sessionIds: readonly string[]): Promise<void> {
|
||||
await Promise.all(
|
||||
sessionIds.map((sessionId) => this.sessions.dispose({ sessionId }).catch(() => undefined)),
|
||||
const uniqueSessionIds = [...new Set(sessionIds)];
|
||||
if (uniqueSessionIds.length === 0) {
|
||||
return;
|
||||
}
|
||||
const sessionRows = await this.db
|
||||
.select({
|
||||
id: tuiSessions.id,
|
||||
lastKnownPanePid: tuiSessions.lastKnownPanePid,
|
||||
tmuxSession: tuiSessions.tmuxSession,
|
||||
tmuxWindow: tuiSessions.tmuxWindow,
|
||||
})
|
||||
.from(tuiSessions)
|
||||
.where(inArray(tuiSessions.id, uniqueSessionIds));
|
||||
const results = await Promise.allSettled(
|
||||
sessionRows.map((session) =>
|
||||
this.sessions.dispose(sessionHandleFromRow(session)).catch((error: unknown) => {
|
||||
if (isMissingTerminalSessionError(error)) {
|
||||
return;
|
||||
}
|
||||
throw error;
|
||||
}),
|
||||
),
|
||||
);
|
||||
const failed = results.find((result) => result.status === "rejected");
|
||||
if (failed !== undefined) {
|
||||
throw failed.reason;
|
||||
}
|
||||
}
|
||||
|
||||
private async sessionIdsForRun(runId: string): Promise<string[]> {
|
||||
@@ -1915,6 +2087,8 @@ export class DbRunEngine implements RunEngine {
|
||||
}
|
||||
}
|
||||
|
||||
// Maximum number of most-recent transcript chunk rows fetched as a baseline
// (used as the query LIMIT in loadTranscriptBaseline).
const transcriptBaselineLineLimit = 200;
|
||||
|
||||
export async function readRunStatus(db: Database, runId: string): Promise<RunStatus> {
|
||||
const [run] = await db
|
||||
.select({
|
||||
@@ -2084,6 +2258,36 @@ function activeRunExists(currentRunId: string, currentState: string): DevflowErr
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Converts a persisted session row into a `SessionHandle`.
 *
 * Terminal fields that are `null` on the row are left off the handle entirely
 * instead of being copied over as `null`.
 *
 * @param session - Row fields: the session id plus nullable pane pid, tmux session and tmux window.
 * @returns A handle carrying `sessionId` and whichever terminal fields were non-null.
 */
function sessionHandleFromRow(session: {
  id: string;
  lastKnownPanePid: number | null;
  tmuxSession: string | null;
  tmuxWindow: string | null;
}): SessionHandle {
  const { id, lastKnownPanePid: pid, tmuxSession, tmuxWindow } = session;
  return {
    sessionId: id,
    ...(pid !== null ? { pid } : {}),
    ...(tmuxSession !== null ? { tmuxSession } : {}),
    ...(tmuxWindow !== null ? { tmuxWindow } : {}),
  };
}
|
||||
|
||||
/**
 * Reports whether an error is a `DevflowError` whose recovery hint or message
 * contains one of a fixed set of "terminal session is gone" phrases.
 *
 * Matching is a case-insensitive substring search over the recovery hint and
 * the message joined by a newline.
 *
 * @param error - Any thrown value.
 * @returns `true` only for a `DevflowError` containing one of the known phrases.
 */
function isMissingTerminalSessionError(error: unknown): boolean {
  if (error instanceof DevflowError) {
    const searchable = [error.recoveryHint ?? "", error.message].join("\n").toLowerCase();
    const missingSessionPhrases = [
      "missing tmux session",
      "can't find session",
      "can't find pane",
      "no server running",
      "tmux session is disposed",
      "tmux session already disposed",
      "fake session is not active",
    ];
    return missingSessionPhrases.some((phrase) => searchable.includes(phrase));
  }
  return false;
}
|
||||
|
||||
function isPgConstraintViolation(error: unknown, constraint: string): boolean {
|
||||
return (
|
||||
typeof error === "object" &&
|
||||
|
||||
@@ -113,6 +113,19 @@ class ProbeUnknownFailureFakeAdapter extends FakeSessionAdapter {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Fake adapter whose probe always reports an alive, active pane with the
 * `tmux_liveness_only` hint, and which counts every rebootstrap call.
 */
class TmuxLivenessOnlyProbeFakeAdapter extends FakeSessionAdapter {
  /** Number of times `rebootstrap` has been invoked. */
  rebootstrapAttempts = 0;

  /** Always resolves to the same liveness-only probe result. */
  override async probe(_handle: SessionHandle): Promise<ProbeResult> {
    const livenessOnly: ProbeResult = {
      alive: true,
      paneActive: true,
      hint: "tmux_liveness_only",
    };
    return livenessOnly;
  }

  /** Counts the attempt, then defers to the base implementation. */
  override async rebootstrap(handle: SessionHandle): Promise<SessionHandle> {
    this.rebootstrapAttempts = this.rebootstrapAttempts + 1;
    const rebootstrapped = await super.rebootstrap(handle);
    return rebootstrapped;
  }
}
|
||||
|
||||
class BreakArtifactParentFakeAdapter extends FakeSessionAdapter {
|
||||
override async sendPrompt(
|
||||
handle: SessionHandle,
|
||||
@@ -134,6 +147,105 @@ class WriteDirectoryArtifactFakeAdapter extends FakeSessionAdapter {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Fake adapter where sending a prompt fails with an unclassified error and
 * disposing fails with a recoverable `DevflowError`; capture and dispose
 * calls are logged in `events` in call order.
 */
class SendAndDisposeFailFakeAdapter extends FakeSessionAdapter {
  /** Ordered log of "capture" / "dispose" invocations. */
  events: string[] = [];

  /** Always rejects with a plain `Error`. */
  override async sendPrompt(): Promise<{ promptId: string }> {
    const failure = new Error("unclassified prompt failure");
    throw failure;
  }

  /** Logs the capture once iteration starts, then streams the base adapter's chunks. */
  override async *capture(handle: SessionHandle, fromSeq: bigint): AsyncIterable<TranscriptChunk> {
    this.events.push("capture");
    const chunks = super.capture(handle, fromSeq);
    yield* chunks;
  }

  /** Logs the dispose, then rejects with code `pane_briefly_unresponsive`. */
  override async dispose(handle: SessionHandle): Promise<void> {
    this.events.push("dispose");
    const recoveryHint = `session=${handle.sessionId}`;
    throw new DevflowError("dispose failed", {
      class: "recoverable",
      code: "pane_briefly_unresponsive",
      recoveryHint,
    });
  }
}
|
||||
|
||||
/**
 * Fake adapter that, on prompt send, writes fixed terminal coordinates onto
 * the session's row and then fails; it records every handle later passed to
 * `capture` and `dispose` so tests can inspect them.
 */
class SendFailureRecordsTerminalHandlesFakeAdapter extends FakeSessionAdapter {
  /** Handles received by `capture`, in call order. */
  readonly captureHandles: SessionHandle[] = [];
  /** Handles received by `dispose`, in call order. */
  readonly disposeHandles: SessionHandle[] = [];

  constructor(private readonly db: DbClient["db"]) {
    super({ writeDelayMs: 0 });
  }

  /** Persists the terminal fields for the session row, then rejects with a plain `Error`. */
  override async sendPrompt(handle: SessionHandle): Promise<{ promptId: string }> {
    const persistedTerminal = {
      lastKnownPanePid: 777,
      tmuxSession: "persisted-session",
      tmuxWindow: "persisted-window",
    };
    await this.db
      .update(tuiSessions)
      .set(persistedTerminal)
      .where(eq(tuiSessions.id, handle.sessionId));

    const failure = new Error("unclassified prompt failure");
    throw failure;
  }

  /** Records the handle once iteration starts, then streams the base adapter's chunks. */
  override async *capture(handle: SessionHandle, fromSeq: bigint): AsyncIterable<TranscriptChunk> {
    this.captureHandles.push(handle);
    const chunks = super.capture(handle, fromSeq);
    yield* chunks;
  }

  /** Records the handle, then defers to the base dispose. */
  override async dispose(handle: SessionHandle): Promise<void> {
    this.disposeHandles.push(handle);
    await super.dispose(handle);
  }
}
|
||||
|
||||
/**
 * Fake adapter whose `start` runs the base start but returns a handle with a
 * freshly generated session id, and whose `dispose` always fails with a
 * recoverable `DevflowError`.
 */
class WrongSessionIdAndDisposeFailFakeAdapter extends FakeSessionAdapter {
  /** Starts via the base adapter but discards its handle in favour of a random id. */
  override async start(input: StartInput): Promise<SessionHandle> {
    await super.start(input);
    const unrelatedId = randomUUID();
    return { sessionId: unrelatedId };
  }

  /** Always rejects with code `pane_briefly_unresponsive`. */
  override async dispose(handle: SessionHandle): Promise<void> {
    const recoveryHint = `session=${handle.sessionId}`;
    throw new DevflowError("dispose failed", {
      class: "recoverable",
      code: "pane_briefly_unresponsive",
      recoveryHint,
    });
  }
}
|
||||
|
||||
/**
 * Fake adapter whose transcript capture yields nothing and fails on the first
 * pull with a `human_required` `DevflowError`; capture and dispose calls are
 * logged in `events` in call order.
 */
class CaptureFailsFakeAdapter extends FakeSessionAdapter {
  /** Ordered log of "capture" / "dispose" invocations. */
  events: string[] = [];

  /**
   * Logs the call immediately (not lazily on iteration) and returns an
   * iterable whose `next()` always rejects with `transcript_history_unavailable`.
   */
  override capture(): AsyncIterable<TranscriptChunk> {
    this.events.push("capture");
    return {
      [Symbol.asyncIterator]: () => ({
        next: async () => {
          throw new DevflowError("transcript history unavailable", {
            class: "human_required",
            code: "transcript_history_unavailable",
          });
        },
      }),
    };
  }

  /** Logs the dispose, then defers to the base dispose. */
  override async dispose(handle: SessionHandle): Promise<void> {
    this.events.push("dispose");
    await super.dispose(handle);
  }
}
|
||||
|
||||
/**
 * Extends `CaptureFailsFakeAdapter` so that sending a prompt also fails,
 * with an unclassified plain `Error`.
 */
class SendAndCaptureFailFakeAdapter extends CaptureFailsFakeAdapter {
  /** Always rejects with a plain `Error`. */
  override async sendPrompt(): Promise<{ promptId: string }> {
    const failure = new Error("unclassified prompt failure");
    throw failure;
  }
}
|
||||
|
||||
class StartFailsFakeAdapter extends FakeSessionAdapter {
|
||||
constructor(private readonly error: DevflowError) {
|
||||
super();
|
||||
@@ -442,6 +554,183 @@ describe("runSingleFakePhase", () => {
|
||||
},
|
||||
);
|
||||
|
||||
it("surfaces dispose failures after fatal prompt send cleanup", async () => {
  // Arrange: a run/phase, a scratch worktree, and an adapter whose send and dispose both fail.
  const { db, phaseId, runId } = await createRunAndPhase();
  const scratchDir = mkdtempSync(join(tmpdir(), "devflow-fake-phase-dispose-failure-"));
  const worktreeRoot = realpathSync(scratchDir);
  tempRoots.push(worktreeRoot);
  const adapter = new SendAndDisposeFailFakeAdapter({ writeDelayMs: 0 });

  // Act: run the phase; the dispose error is the one expected to surface.
  const phaseRun = runSingleFakePhase({
    adapter,
    db,
    expectedArtifactPath: join(worktreeRoot, "artifacts", "spec.json"),
    expectedSchema: "dev/spec@1",
    instructions: "Scenario: ok\nWrite the development specification.",
    phaseId,
    phaseKey: "implement",
    roleId: "implementer",
    runId,
    worktreeRoot,
    wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 100 },
  });
  await expect(phaseRun).rejects.toMatchObject({ code: "pane_briefly_unresponsive" });

  // Assert: the run is failed and capture happened before dispose.
  const runRows = await db.select({ state: runs.state }).from(runs).where(eq(runs.id, runId));
  expect(runRows[0]).toEqual({ state: "failed" });
  expect(adapter.events).toEqual(["capture", "dispose"]);
});
|
||||
|
||||
it("uses persisted tmux handles when fatal prompt cleanup captures and disposes sessions", async () => {
  // Arrange: the adapter writes terminal fields to the session row during sendPrompt, then fails.
  const { db, phaseId, runId } = await createRunAndPhase();
  const scratchDir = mkdtempSync(join(tmpdir(), "devflow-fake-phase-terminal-cleanup-"));
  const worktreeRoot = realpathSync(scratchDir);
  tempRoots.push(worktreeRoot);
  const adapter = new SendFailureRecordsTerminalHandlesFakeAdapter(db);

  // Act: the original prompt failure is what propagates.
  const phaseRun = runSingleFakePhase({
    adapter,
    db,
    expectedArtifactPath: join(worktreeRoot, "artifacts", "spec.json"),
    expectedSchema: "dev/spec@1",
    instructions: "Scenario: ok\nWrite the development specification.",
    phaseId,
    phaseKey: "implement",
    roleId: "implementer",
    runId,
    worktreeRoot,
    wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 100 },
  });
  await expect(phaseRun).rejects.toThrow("unclassified prompt failure");

  // Assert: both cleanup steps received a handle built from the persisted row values.
  const persistedHandle = {
    sessionId: expect.any(String),
    pid: 777,
    tmuxSession: "persisted-session",
    tmuxWindow: "persisted-window",
  };
  expect(adapter.captureHandles).toContainEqual(persistedHandle);
  expect(adapter.disposeHandles).toContainEqual(persistedHandle);
});
|
||||
|
||||
it("attempts disposal when fatal prompt cleanup transcript capture fails", async () => {
  // Arrange: an adapter whose prompt send and transcript capture both fail.
  const { db, phaseId, runId } = await createRunAndPhase();
  const scratchDir = mkdtempSync(join(tmpdir(), "devflow-fake-phase-capture-cleanup-failure-"));
  const worktreeRoot = realpathSync(scratchDir);
  tempRoots.push(worktreeRoot);
  const adapter = new SendAndCaptureFailFakeAdapter({ writeDelayMs: 0 });

  // Act: the capture failure is the error expected to surface.
  const phaseRun = runSingleFakePhase({
    adapter,
    db,
    expectedArtifactPath: join(worktreeRoot, "artifacts", "spec.json"),
    expectedSchema: "dev/spec@1",
    instructions: "Scenario: ok\nWrite the development specification.",
    phaseId,
    phaseKey: "implement",
    roleId: "implementer",
    runId,
    worktreeRoot,
    wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 100 },
  });
  await expect(phaseRun).rejects.toMatchObject({ code: "transcript_history_unavailable" });

  // Assert: run, phase and session all ended in their failed states...
  const runStates = db.select({ state: runs.state }).from(runs).where(eq(runs.id, runId));
  await expect(runStates).resolves.toEqual([{ state: "failed" }]);

  const phaseStates = db
    .select({ state: runPhases.state })
    .from(runPhases)
    .where(eq(runPhases.id, phaseId));
  await expect(phaseStates).resolves.toEqual([{ state: "failed" }]);

  const sessionStates = db
    .select({ state: tuiSessions.state })
    .from(tuiSessions)
    .where(eq(tuiSessions.runId, runId));
  await expect(sessionStates).resolves.toEqual([{ state: "FAILED_NEEDS_HUMAN" }]);

  // ...and dispose was still attempted after the failed capture.
  expect(adapter.events).toEqual(["capture", "dispose"]);
});
|
||||
|
||||
it("records durable failure state when session start cleanup fails", async () => {
  // Arrange: the adapter returns a mismatched session id from start and fails every dispose.
  const { db, phaseId, runId } = await createRunAndPhase();
  const scratchDir = mkdtempSync(join(tmpdir(), "devflow-fake-phase-start-cleanup-failure-"));
  const worktreeRoot = realpathSync(scratchDir);
  tempRoots.push(worktreeRoot);
  const adapter = new WrongSessionIdAndDisposeFailFakeAdapter({ writeDelayMs: 0 });

  // Act: the dispose failure is the error expected to surface.
  const phaseRun = runSingleFakePhase({
    adapter,
    db,
    expectedArtifactPath: join(worktreeRoot, "artifacts", "spec.json"),
    expectedSchema: "dev/spec@1",
    instructions: "Scenario: ok\nWrite the development specification.",
    phaseId,
    phaseKey: "implement",
    roleId: "implementer",
    runId,
    worktreeRoot,
    wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 100 },
  });
  await expect(phaseRun).rejects.toMatchObject({ code: "pane_briefly_unresponsive" });

  // Assert: failure state was still written for the run, the phase and the session.
  const runStates = db.select({ state: runs.state }).from(runs).where(eq(runs.id, runId));
  await expect(runStates).resolves.toEqual([{ state: "failed" }]);

  const phaseStates = db
    .select({ state: runPhases.state })
    .from(runPhases)
    .where(eq(runPhases.id, phaseId));
  await expect(phaseStates).resolves.toEqual([{ state: "failed" }]);

  const sessionStates = db
    .select({ state: tuiSessions.state })
    .from(tuiSessions)
    .where(eq(tuiSessions.runId, runId));
  await expect(sessionStates).resolves.toEqual([{ state: "FAILED_NEEDS_HUMAN" }]);
});
|
||||
|
||||
it("captures transcript before creating workflow approval gates", async () => {
  // Arrange: prompt send works but transcript capture fails.
  const { db, phaseId, runId } = await createRunAndPhase();
  const scratchDir = mkdtempSync(join(tmpdir(), "devflow-fake-phase-workflow-capture-"));
  const worktreeRoot = realpathSync(scratchDir);
  tempRoots.push(worktreeRoot);
  const adapter = new CaptureFailsFakeAdapter({ writeDelayMs: 0 });

  // Act: request a workflow approval gate; the capture error should surface first.
  const phaseRun = runSingleFakePhase({
    adapter,
    db,
    expectedArtifactPath: join(worktreeRoot, "artifacts", "spec.json"),
    expectedSchema: "dev/spec@1",
    instructions: "Scenario: ok\nWrite the development specification.",
    phaseId,
    phaseKey: "implement",
    roleId: "implementer",
    runId,
    worktreeRoot,
    wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 100 },
    workflowApprovalGateKey: "spec_approved",
  });
  await expect(phaseRun).rejects.toMatchObject({ code: "transcript_history_unavailable" });

  // Assert: no approval request exists and the run is still executing.
  const approvalIds = db
    .select({ id: approvalRequests.id })
    .from(approvalRequests)
    .where(eq(approvalRequests.runId, runId));
  await expect(approvalIds).resolves.toEqual([]);

  const runStates = db.select({ state: runs.state }).from(runs).where(eq(runs.id, runId));
  await expect(runStates).resolves.toEqual([{ state: "executing" }]);
});
|
||||
|
||||
it("rolls back phase start when recording the phase.started event fails", async () => {
|
||||
const { db, phaseId, runId } = await createRunAndPhase();
|
||||
const worktreeRoot = realpathSync(
|
||||
@@ -2945,6 +3234,46 @@ describe("runSingleFakePhase", () => {
|
||||
expect(approval).toEqual({ gateKey: "artifact_timeout_exhausted", state: "pending" });
|
||||
});
|
||||
|
||||
it("does not rebootstrap when tmux probe only proves pane liveness", async () => {
  // Arrange: an adapter with a fixed session id whose probe always reports the liveness-only hint.
  const { db, phaseId, runId } = await createRunAndPhase();
  const scratchDir = mkdtempSync(join(tmpdir(), "devflow-fake-phase-probe-liveness-only-"));
  const worktreeRoot = realpathSync(scratchDir);
  tempRoots.push(worktreeRoot);
  const sessionId = randomUUID();
  const adapter = new TmuxLivenessOnlyProbeFakeAdapter({
    sessionIdFactory: () => sessionId,
    writeDelayMs: 0,
  });

  // Act: the artifact times out and recovery is expected to be exhausted.
  const phaseRun = runSingleFakePhase({
    adapter,
    db,
    expectedArtifactPath: join(worktreeRoot, "artifacts", "spec.json"),
    expectedSchema: "dev/spec@1",
    instructions: "Scenario: timeout\nRepair-Scenario: ok\nProbe proves only tmux liveness.",
    phaseId,
    phaseKey: "implement",
    roleId: "implementer",
    runId,
    worktreeRoot,
    wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 10 },
    uuidFactory: () => "00000000-0000-4000-8000-000000000033",
  });
  await expect(phaseRun).rejects.toMatchObject({ code: "artifact_timeout_exhausted" });

  // Assert: no rebootstrap was attempted, the run is paused, and a gate is pending.
  expect(adapter.rebootstrapAttempts).toBe(0);
  await expectRunPaused(db, runId);

  const approvals = await db
    .select({ gateKey: approvalRequests.gateKey, state: approvalRequests.state })
    .from(approvalRequests)
    .where(eq(approvalRequests.runId, runId));
  expect(approvals[0]).toEqual({ gateKey: "artifact_timeout_exhausted", state: "pending" });
});
|
||||
|
||||
it("fails the run when timeout recovery probe throws an unclassified error", async () => {
|
||||
const { db, phaseId, runId } = await createRunAndPhase();
|
||||
const worktreeRoot = realpathSync(
|
||||
|
||||
@@ -22,6 +22,7 @@ import {
|
||||
tuiSessions,
|
||||
} from "@devflow/db";
|
||||
import {
|
||||
type ProbeResult,
|
||||
type SessionAdapter,
|
||||
type SessionHandle,
|
||||
SessionManager,
|
||||
@@ -253,7 +254,6 @@ export async function runSingleFakePhase(
|
||||
throw gateError;
|
||||
}
|
||||
await failRunAndDisposeSession(input, eventRepository, attempt, "prompt_send_failed", handle);
|
||||
await captureTranscript(input, handle);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
@@ -295,7 +295,6 @@ export async function runSingleFakePhase(
|
||||
"artifact_validation_failed",
|
||||
handle,
|
||||
);
|
||||
await captureTranscript(input, handle);
|
||||
throw error;
|
||||
}
|
||||
|
||||
@@ -332,7 +331,6 @@ export async function runSingleFakePhase(
|
||||
"artifact_timeout_recovery_failed",
|
||||
handle,
|
||||
);
|
||||
await captureTranscript(input, handle);
|
||||
throw recoveryError;
|
||||
}
|
||||
if (!recovered) {
|
||||
@@ -387,7 +385,6 @@ export async function runSingleFakePhase(
|
||||
"stale_artifact_remove_failed",
|
||||
handle,
|
||||
);
|
||||
await captureTranscript(input, handle);
|
||||
throw error;
|
||||
}
|
||||
const timeoutRepairEnvelope = buildEnvelope(
|
||||
@@ -417,7 +414,6 @@ export async function runSingleFakePhase(
|
||||
"prompt_send_failed",
|
||||
handle,
|
||||
);
|
||||
await captureTranscript(input, handle);
|
||||
throw repairError;
|
||||
}
|
||||
const gateError = toHumanRequiredRecoveryError(repairError);
|
||||
@@ -464,7 +460,6 @@ export async function runSingleFakePhase(
|
||||
"artifact_repair_failed",
|
||||
handle,
|
||||
);
|
||||
await captureTranscript(input, handle);
|
||||
throw repairError;
|
||||
}
|
||||
await failPhaseAndRequestGate(
|
||||
@@ -542,7 +537,6 @@ export async function runSingleFakePhase(
|
||||
"stale_artifact_remove_failed",
|
||||
handle,
|
||||
);
|
||||
await captureTranscript(input, handle);
|
||||
throw error;
|
||||
}
|
||||
const repairEnvelope = buildEnvelope(
|
||||
@@ -572,7 +566,6 @@ export async function runSingleFakePhase(
|
||||
"prompt_send_failed",
|
||||
handle,
|
||||
);
|
||||
await captureTranscript(input, handle);
|
||||
throw error;
|
||||
}
|
||||
const gateError = toHumanRequiredRecoveryError(error);
|
||||
@@ -618,7 +611,6 @@ export async function runSingleFakePhase(
|
||||
"artifact_repair_failed",
|
||||
handle,
|
||||
);
|
||||
await captureTranscript(input, handle);
|
||||
throw error;
|
||||
}
|
||||
await failPhaseAndRequestGate(
|
||||
@@ -640,8 +632,10 @@ export async function runSingleFakePhase(
|
||||
}
|
||||
}
|
||||
|
||||
let transcript: Awaited<ReturnType<typeof captureTranscript>> | undefined;
|
||||
if (outcome.validation.ok) {
|
||||
if (input.workflowApprovalGateKey !== undefined) {
|
||||
transcript = await captureTranscript(input, handle);
|
||||
await requestWorkflowApproval(
|
||||
input,
|
||||
eventRepository,
|
||||
@@ -685,7 +679,7 @@ export async function runSingleFakePhase(
|
||||
});
|
||||
}
|
||||
|
||||
const transcript = await captureTranscript(input, handle);
|
||||
transcript ??= await captureTranscript(input, handle);
|
||||
|
||||
return {
|
||||
sessionId: handle.sessionId,
|
||||
@@ -1307,6 +1301,16 @@ async function failPhaseAndRun(
|
||||
attempt: number,
|
||||
reason: string,
|
||||
) {
|
||||
const sessionIdsToDispose = await markPhaseAndRunFailed(input, eventRepository, attempt, reason);
|
||||
await disposeSessionIds(input, sessionIdsToDispose);
|
||||
}
|
||||
|
||||
async function markPhaseAndRunFailed(
|
||||
input: CanonicalRunSingleFakePhaseInput,
|
||||
eventRepository: RunEventRepository,
|
||||
attempt: number,
|
||||
reason: string,
|
||||
): Promise<string[]> {
|
||||
let sessionIdsToDispose: string[] = [];
|
||||
await input.db.transaction(async (tx) => {
|
||||
await tx.execute(sql`SELECT 1 FROM ${runs} WHERE ${runs.id} = ${input.runId} FOR UPDATE`);
|
||||
@@ -1348,7 +1352,7 @@ async function failPhaseAndRun(
|
||||
input.runId,
|
||||
);
|
||||
});
|
||||
await disposeSessionIds(input.sessions, sessionIdsToDispose);
|
||||
return sessionIdsToDispose;
|
||||
}
|
||||
|
||||
async function failRunAndDisposeSession(
|
||||
@@ -1356,10 +1360,33 @@ async function failRunAndDisposeSession(
|
||||
eventRepository: RunEventRepository,
|
||||
attempt: number,
|
||||
reason: string,
|
||||
handle: { sessionId: string },
|
||||
handle: SessionHandle,
|
||||
) {
|
||||
await failPhaseAndRun(input, eventRepository, attempt, reason);
|
||||
await input.sessions.dispose(handle).catch(() => undefined);
|
||||
const sessionIdsToDispose = await markPhaseAndRunFailed(input, eventRepository, attempt, reason);
|
||||
let captureError: unknown;
|
||||
try {
|
||||
await captureTranscript(input, handle);
|
||||
} catch (error) {
|
||||
captureError = error;
|
||||
}
|
||||
|
||||
let disposeError: unknown;
|
||||
try {
|
||||
await disposeSessionIds(input, sessionIdsToDispose);
|
||||
if (!sessionIdsToDispose.includes(handle.sessionId)) {
|
||||
await input.sessions.dispose(await sessionHandleForId(input.db, handle.sessionId, handle));
|
||||
}
|
||||
} catch (error) {
|
||||
disposeError = error;
|
||||
}
|
||||
|
||||
if (captureError !== undefined) {
|
||||
throw captureError;
|
||||
}
|
||||
|
||||
if (disposeError !== undefined) {
|
||||
throw disposeError;
|
||||
}
|
||||
}
|
||||
|
||||
async function completePhaseAndRun(
|
||||
@@ -1615,11 +1642,19 @@ async function startSessionAndRecord(
|
||||
});
|
||||
return startedHandle;
|
||||
} catch (error) {
|
||||
let disposeError: unknown;
|
||||
if (handle !== undefined) {
|
||||
await input.sessions.dispose(handle);
|
||||
try {
|
||||
await input.sessions.dispose(handle);
|
||||
} catch (cleanupError) {
|
||||
disposeError = cleanupError;
|
||||
}
|
||||
}
|
||||
|
||||
if (isRunStateChanged(error)) {
|
||||
if (disposeError !== undefined) {
|
||||
throw disposeError;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
if (shouldCreateHumanGate(error)) {
|
||||
@@ -1633,13 +1668,16 @@ async function startSessionAndRecord(
|
||||
{ errorCode: error.code, recoveryHint: gateError.recoveryHint },
|
||||
sessionId,
|
||||
);
|
||||
if (disposeError !== undefined) {
|
||||
throw disposeError;
|
||||
}
|
||||
throw gateError;
|
||||
}
|
||||
|
||||
await failPhaseAndRun(input, eventRepository, attempt, "session_start_failed");
|
||||
await markSessionFailedNeedsHuman(input, eventRepository, sessionId);
|
||||
if (handle !== undefined) {
|
||||
await input.sessions.dispose(handle).catch(() => undefined);
|
||||
if (disposeError !== undefined) {
|
||||
throw disposeError;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
@@ -2230,7 +2268,7 @@ async function recoverFromArtifactTimeout(
|
||||
sessionId: string,
|
||||
): Promise<boolean> {
|
||||
const probe = await probeWithTypedError(input.sessions, { sessionId });
|
||||
if (!probe.alive || !probe.paneActive) {
|
||||
if (!probe.alive || !probe.paneActive || isBackendReadinessUnknown(probe)) {
|
||||
return false;
|
||||
}
|
||||
await setSessionStateIfRunActive(input, sessionId, "RESUMING");
|
||||
@@ -2263,6 +2301,10 @@ async function recoverFromArtifactTimeout(
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
 * Tells whether a probe result carries the `tmux_liveness_only` hint.
 *
 * @param probe - Result returned by a session probe.
 * @returns `true` exactly when `probe.hint` equals `"tmux_liveness_only"`.
 */
function isBackendReadinessUnknown(probe: ProbeResult): boolean {
  const { hint } = probe;
  return hint === "tmux_liveness_only";
}
|
||||
|
||||
async function setSessionStateIfRunActive(
|
||||
input: CanonicalRunSingleFakePhaseInput,
|
||||
sessionId: string,
|
||||
@@ -2397,12 +2439,19 @@ async function markAllSessionsFailedInTransaction(
|
||||
return sessions.map((session) => session.id);
|
||||
}
|
||||
|
||||
async function disposeSessionIds(sessions: SessionRuntime, sessionIds: readonly string[]) {
|
||||
await Promise.all(
|
||||
[...new Set(sessionIds)].map((sessionId) =>
|
||||
sessions.dispose({ sessionId }).catch(() => undefined),
|
||||
),
|
||||
);
|
||||
async function disposeSessionIds(
|
||||
input: CanonicalRunSingleFakePhaseInput,
|
||||
sessionIds: readonly string[],
|
||||
) {
|
||||
if (sessionIds.length === 0) {
|
||||
return;
|
||||
}
|
||||
const handles = await sessionHandlesFromDb(input.db, sessionIds);
|
||||
const results = await Promise.allSettled(handles.map((handle) => input.sessions.dispose(handle)));
|
||||
const failed = results.find((result) => result.status === "rejected");
|
||||
if (failed !== undefined) {
|
||||
throw failed.reason;
|
||||
}
|
||||
}
|
||||
|
||||
async function waitForArtifact(path: string, options: ArtifactWaitOptions = {}): Promise<void> {
|
||||
@@ -2702,7 +2751,13 @@ async function captureTranscript(
|
||||
) {
|
||||
const sink = input.transcriptSink ?? new TuiTranscriptRepository(input.db);
|
||||
const [session] = await input.db
|
||||
.select({ lastCaptureSeq: tuiSessions.lastCaptureSeq })
|
||||
.select({
|
||||
id: tuiSessions.id,
|
||||
lastCaptureSeq: tuiSessions.lastCaptureSeq,
|
||||
lastKnownPanePid: tuiSessions.lastKnownPanePid,
|
||||
tmuxSession: tuiSessions.tmuxSession,
|
||||
tmuxWindow: tuiSessions.tmuxWindow,
|
||||
})
|
||||
.from(tuiSessions)
|
||||
.where(eq(tuiSessions.id, handle.sessionId));
|
||||
if (session === undefined) {
|
||||
@@ -2715,12 +2770,51 @@ async function captureTranscript(
|
||||
}
|
||||
return captureAndPersistTranscript({
|
||||
adapter: input.sessions,
|
||||
handle,
|
||||
handle: sessionHandleFromRow(session),
|
||||
fromSeq: session.lastCaptureSeq,
|
||||
sink,
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Looks up the given sessions and turns each row found into a `SessionHandle`.
 *
 * Duplicate ids are collapsed before querying; ids with no matching row
 * produce no handle.
 *
 * @param db - Database client used for the lookup.
 * @param sessionIds - Session ids to resolve.
 * @returns One handle per row returned by the query.
 */
async function sessionHandlesFromDb(
  db: DbClient["db"],
  sessionIds: readonly string[],
): Promise<SessionHandle[]> {
  const distinctIds = [...new Set(sessionIds)];
  const terminalColumns = {
    id: tuiSessions.id,
    lastKnownPanePid: tuiSessions.lastKnownPanePid,
    tmuxSession: tuiSessions.tmuxSession,
    tmuxWindow: tuiSessions.tmuxWindow,
  };
  const persisted = await db
    .select(terminalColumns)
    .from(tuiSessions)
    .where(inArray(tuiSessions.id, distinctIds));

  const handles: SessionHandle[] = [];
  for (const row of persisted) {
    handles.push(sessionHandleFromRow(row));
  }
  return handles;
}
|
||||
|
||||
/**
 * Resolves the handle stored for one session, falling back to a
 * caller-supplied handle when the lookup returns nothing.
 *
 * @param db - Database client used for the lookup.
 * @param sessionId - Session id to resolve.
 * @param fallback - Handle returned when no stored handle is found.
 * @returns The first handle from the lookup, or `fallback`.
 */
async function sessionHandleForId(
  db: DbClient["db"],
  sessionId: string,
  fallback: SessionHandle,
): Promise<SessionHandle> {
  const persisted = await sessionHandlesFromDb(db, [sessionId]);
  return persisted[0] ?? fallback;
}
|
||||
|
||||
/**
 * Converts a persisted session row into a `SessionHandle`.
 *
 * Terminal fields that are `null` on the row are left off the handle entirely
 * instead of being copied over as `null`.
 *
 * @param session - Row fields: the session id plus nullable pane pid, tmux session and tmux window.
 * @returns A handle carrying `sessionId` and whichever terminal fields were non-null.
 */
function sessionHandleFromRow(session: {
  id: string;
  lastKnownPanePid: number | null;
  tmuxSession: string | null;
  tmuxWindow: string | null;
}): SessionHandle {
  const { id, lastKnownPanePid: pid, tmuxSession, tmuxWindow } = session;
  return {
    sessionId: id,
    ...(pid !== null ? { pid } : {}),
    ...(tmuxSession !== null ? { tmuxSession } : {}),
    ...(tmuxWindow !== null ? { tmuxWindow } : {}),
  };
}
|
||||
|
||||
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
|
||||
if (signal === undefined) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
|
||||
Reference in New Issue
Block a user