feat: add temporal run engine integration

This commit is contained in:
chungyeong
2026-05-13 08:39:19 +09:00
parent 78ebd5ef78
commit aa3033771a
37 changed files with 7338 additions and 224 deletions

View File

@@ -2,6 +2,7 @@ import { execFileSync } from "node:child_process";
import { randomUUID } from "node:crypto";
import {
existsSync,
mkdirSync,
mkdtempSync,
readFileSync,
realpathSync,
@@ -84,6 +85,15 @@ class PausesAfterPromptAcceptedFakeAdapter extends FakeSessionAdapter {
}
}
/**
 * Fake session adapter that records how many times `dispose` has been invoked,
 * so tests can assert on session-disposal side effects.
 */
class DisposeCountingFakeAdapter extends FakeSessionAdapter {
  /** Number of `dispose` invocations observed so far. */
  disposeCalls = 0;

  /** Counts the call, then defers to the base adapter's disposal. */
  override async dispose(handle: Parameters<FakeSessionAdapter["dispose"]>[0]): Promise<void> {
    // Increment before delegating so the call is counted even if the base rejects.
    this.disposeCalls++;
    await super.dispose(handle);
  }
}
describe("DbRunEngine", () => {
let client: DbClient | undefined;
const runIds: string[] = [];
@@ -129,6 +139,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -281,6 +292,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -357,6 +369,118 @@ describe("DbRunEngine", () => {
});
});
// Replay validation: once a run has been prepared, validatePreparedRunInput must
// resolve for the identical start input and reject when any start input differs.
it("validates a prepared run replay without accepting changed start inputs", async () => {
client = createDbClient(databaseUrl);
await seedDevelopmentRegistry(client.db);
// realpathSync canonicalizes the temp directory before it is handed to the engine.
const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
const repoPath = createGitRepo();
// Registered for cleanup; presumably removed by a hook outside this view.
tempRoots.push(workspaceRoot, repoPath);
const engine = new DbRunEngine({
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
maxConcurrentRuns: 100,
workspaceRoot,
});
// A fixed run id lets the same input be replayed against the prepared run.
const runId = randomUUID();
const input = {
runId,
requirementsMd: "Validate replayed Temporal start input.",
repoPath,
baseBranch: "main",
scenarios: { spec: "ok" },
};
await engine.prepareRun(input);
runIds.push(runId);
// Identical input: validation resolves with no value.
await expect(engine.validatePreparedRunInput(input)).resolves.toBeUndefined();
// Only `scenarios` differs here; the replay must be rejected as a mismatch.
await expect(
engine.validatePreparedRunInput({
...input,
scenarios: { spec: "timeout" },
}),
).rejects.toMatchObject({ code: "internal_state_corruption" });
});
// Replaying prepareRun must fail when the persisted worktree path exists on disk
// but has been replaced by an empty directory.
it("rejects prepared run replay when the persisted worktree path is only a partial directory", async () => {
client = createDbClient(databaseUrl);
await seedDevelopmentRegistry(client.db);
const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
const repoPath = createGitRepo();
// Registered for cleanup; presumably removed by a hook outside this view.
tempRoots.push(workspaceRoot, repoPath);
const engine = new DbRunEngine({
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
maxConcurrentRuns: 100,
workspaceRoot,
});
const runId = randomUUID();
const input = {
runId,
requirementsMd: "Reject partial worktree replay.",
repoPath,
baseBranch: "main",
};
// First prepare persists the run row, including its worktree root.
await engine.prepareRun(input);
runIds.push(runId);
const [run] = await client.db
.select({ worktreeRoot: runs.worktreeRoot })
.from(runs)
.where(eq(runs.id, runId));
expect(run).toBeDefined();
// Explicit guard narrows `run` for the type checker after the expectation above.
if (run === undefined) {
throw new Error("prepared run missing");
}
// Replace the prepared worktree with an empty directory at the same path.
rmSync(run.worktreeRoot, { recursive: true, force: true });
mkdirSync(run.worktreeRoot, { recursive: true });
// The replay sees an existing path and must reject it.
await expect(engine.prepareRun(input)).rejects.toMatchObject({
code: "workspace_permissions",
});
});
// Replaying prepareRun must fail when the persisted worktree path holds a freshly
// initialized, unrelated git repository, even though its initial branch name is
// `devflow/<runId>/main`.
it("rejects prepared run replay when the persisted worktree belongs to another repo", async () => {
client = createDbClient(databaseUrl);
await seedDevelopmentRegistry(client.db);
const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
const repoPath = createGitRepo();
// Registered for cleanup; presumably removed by a hook outside this view.
tempRoots.push(workspaceRoot, repoPath);
const engine = new DbRunEngine({
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
maxConcurrentRuns: 100,
workspaceRoot,
});
const runId = randomUUID();
const input = {
runId,
requirementsMd: "Reject a replayed worktree that belongs to a different repo.",
repoPath,
baseBranch: "main",
};
// First prepare persists the run row, including its worktree root.
await engine.prepareRun(input);
runIds.push(runId);
const [run] = await client.db
.select({ worktreeRoot: runs.worktreeRoot })
.from(runs)
.where(eq(runs.id, runId));
expect(run).toBeDefined();
// Explicit guard narrows `run` for the type checker after the expectation above.
if (run === undefined) {
throw new Error("prepared run missing");
}
// Swap the prepared worktree for a standalone repository created with `git init`.
rmSync(run.worktreeRoot, { recursive: true, force: true });
mkdirSync(run.worktreeRoot, { recursive: true });
execFileSync("git", ["init", "-b", `devflow/${runId}/main`], {
cwd: run.worktreeRoot,
stdio: "ignore",
});
// A matching branch name alone must not make the replay succeed.
await expect(engine.prepareRun(input)).rejects.toMatchObject({
code: "workspace_permissions",
});
});
it("enforces the configured maximum concurrent active runs", async () => {
client = createDbClient(databaseUrl);
await seedDevelopmentRegistry(client.db);
@@ -418,6 +542,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -456,7 +581,7 @@ describe("DbRunEngine", () => {
expect((await engine.getStatus(runId)).run.state).toBe("awaiting_approval");
});
it("resumes an active phase that observed a manual pause mid-mutation", async () => {
it("repairs an active phase that paused after prompt acceptance but before prompt proof", async () => {
client = createDbClient(databaseUrl);
await seedDevelopmentRegistry(client.db);
const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
@@ -466,6 +591,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new PausesAfterPromptAcceptedFakeAdapter(client.db)),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -488,7 +614,7 @@ describe("DbRunEngine", () => {
const resumed = await engine.getStatus(runId);
expect(resumed.run.state).toBe("awaiting_approval");
expect(resumed.phases.find((phase) => phase.phaseKey === "spec")).toMatchObject({
attempts: 1,
attempts: 2,
state: "awaiting_approval",
});
expect(pendingApproval(resumed, "spec_approved")).toBeDefined();
@@ -504,6 +630,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -567,6 +694,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -614,6 +742,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -650,6 +779,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -686,6 +816,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -736,6 +867,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -815,6 +947,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -871,6 +1004,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -937,6 +1071,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -983,6 +1118,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -1025,6 +1161,7 @@ describe("DbRunEngine", () => {
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
@@ -1051,6 +1188,127 @@ describe("DbRunEngine", () => {
code: "approval_conflict",
});
});
// A client token that is merely a suffix of an earlier token ("shared-token" vs
// "prefix:shared-token") must not be recognized as a replay of that decision.
it("does not treat a client token suffix as an approval replay", async () => {
client = createDbClient(databaseUrl);
await seedDevelopmentRegistry(client.db);
const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
const repoPath = createGitRepo();
// Registered for cleanup; presumably removed by a hook outside this view.
tempRoots.push(workspaceRoot, repoPath);
const engine = new DbRunEngine({
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
const { runId } = await engine.startRun({
requirementsMd: "Check approval token suffix handling.",
repoPath,
baseBranch: "main",
});
runIds.push(runId);
// Look up the approval request that is still pending for this run.
const [request] = await client.db
.select({ id: approvalRequests.id })
.from(approvalRequests)
.where(and(eq(approvalRequests.runId, runId), eq(approvalRequests.state, "pending")));
expect(request).toBeDefined();
// Explicit guard narrows `request` for the type checker after the expectation above.
if (request === undefined) {
throw new Error("approval request missing");
}
// First decision uses a token containing a colon-separated prefix.
await engine.signalApproval(runId, request.id, "approve", "prefix:shared-token");
// The second token only shares the suffix, so it must conflict rather than replay.
await expect(
engine.signalApproval(runId, request.id, "approve", "shared-token"),
).rejects.toMatchObject({
code: "approval_conflict",
});
});
// Session disposal must be repeated for a duplicate "reject" decision and for an
// explicit replay of applied side effects; the counting adapter records each call.
it("replays terminal approval disposal side effects for duplicate decisions", async () => {
client = createDbClient(databaseUrl);
await seedDevelopmentRegistry(client.db);
const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
const repoPath = createGitRepo();
// Registered for cleanup; presumably removed by a hook outside this view.
tempRoots.push(workspaceRoot, repoPath);
const adapter = new DisposeCountingFakeAdapter({ writeDelayMs: 0 });
const engine = new DbRunEngine({
db: client.db,
sessions: sessionRuntime(client.db, adapter),
workspaceRoot,
maxConcurrentRuns: 100,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
const { runId } = await engine.startRun({
requirementsMd: "Reject and replay disposal.",
repoPath,
baseBranch: "main",
});
runIds.push(runId);
const request = pendingApproval(await engine.getStatus(runId), "spec_approved");
// The same token is reused below so the second signal is a duplicate decision.
const clientToken = randomUUID();
// The test expects exactly one dispose call per step.
await engine.signalApproval(runId, request.id, "reject", clientToken);
expect(adapter.disposeCalls).toBe(1);
// Duplicate decision with the same token and action: disposal runs again.
await engine.signalApproval(runId, request.id, "reject", clientToken);
expect(adapter.disposeCalls).toBe(2);
// Explicit side-effect replay: disposal runs once more.
await engine.replayAppliedApprovalSideEffects(runId, "reject");
expect(adapter.disposeCalls).toBe(3);
});
// A run row inserted directly in state "aborted" with no final report must have its
// report produced when applied approval side effects are replayed.
it("repairs missing aborted final reports during applied approval replay", async () => {
client = createDbClient(databaseUrl);
await seedDevelopmentRegistry(client.db);
const workspaceRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-workspace-")));
const repoPath = createGitRepo();
const worktreeRoot = realpathSync(mkdtempSync(join(tmpdir(), "devflow-engine-worktree-")));
// Registered for cleanup; presumably removed by a hook outside this view.
tempRoots.push(workspaceRoot, repoPath, worktreeRoot);
// The inserted run row needs the id and hash of the "development" template.
const [template] = await client.db
.select({ hash: workflowTemplates.hash, id: workflowTemplates.id })
.from(workflowTemplates)
.where(eq(workflowTemplates.name, "development"))
.limit(1);
if (template === undefined) {
throw new Error("development template missing");
}
const runId = randomUUID();
runIds.push(runId);
// Insert the run directly, bypassing the engine: already aborted, no report path.
await client.db.insert(runs).values({
id: runId,
templateId: template.id,
templateHash: template.hash,
state: "aborted",
repoPath,
baseBranch: "main",
worktreeRoot,
endedAt: new Date(),
finalReportPath: null,
});
const engine = new DbRunEngine({
db: client.db,
sessions: sessionRuntime(client.db, new FakeSessionAdapter({ writeDelayMs: 0 })),
workspaceRoot,
maxConcurrentRuns: 100,
});
// The action is "approve"; the report status asserted below is still "aborted".
await engine.replayAppliedApprovalSideEffects(runId, "approve");
const [run] = await client.db
.select({ finalReportPath: runs.finalReportPath })
.from(runs)
.where(eq(runs.id, runId));
expect(run?.finalReportPath).toMatch(/\.report\.md$/);
// Explicit guard narrows the path to a string for the file read below.
if (run?.finalReportPath === null || run?.finalReportPath === undefined) {
throw new Error("final report was not repaired");
}
// The JSON sibling of the markdown report must identify the run as aborted.
expect(
JSON.parse(
readFileSync(run.finalReportPath.replace(/\.report\.md$/, ".report.json"), "utf8"),
),
).toMatchObject({ runId, status: "aborted" });
});
});
function pendingApproval(status: Awaited<ReturnType<DbRunEngine["getStatus"]>>, gateKey: string) {

View File

@@ -1,6 +1,6 @@
import { execFile } from "node:child_process";
import { createHash, randomUUID } from "node:crypto";
import { realpathSync } from "node:fs";
import { existsSync, realpathSync } from "node:fs";
import { mkdir, readFile, rename, writeFile } from "node:fs/promises";
import { dirname, join, relative, resolve } from "node:path";
import { promisify } from "node:util";
@@ -14,6 +14,7 @@ import {
Persona,
Template,
bindTemplatePersonas,
canonicalize,
hash,
validateArtifact,
} from "@devflow/core";
@@ -92,6 +93,8 @@ export interface DbRunEngineOptions {
timeoutMs?: number;
pollIntervalMs?: number;
stableMs?: number;
signal?: AbortSignal;
onPoll?: () => void;
};
}
@@ -183,16 +186,48 @@ export class DbRunEngine implements RunEngine {
}
/**
 * Prepares a run, locks its bindings, and advances it until it blocks.
 *
 * The run id is taken from `input.runId` when provided, otherwise a fresh UUID is
 * generated. Failures during preparation propagate untouched. Failures while
 * locking bindings or advancing are either swallowed (when
 * `shouldPreserveHumanGateRun` reports the run should be kept) or cause the run
 * to be marked failed with reason "start_run_failed" before the error is rethrown.
 *
 * @param input - Start input; `runId` is optional.
 * @returns The id of the started run.
 */
async startRun(input: RunStartInput): Promise<{ runId: string }> {
  const runId = input.runId ?? randomUUID();
  // Pin the resolved id so both steps below operate on the same run.
  const pinnedInput = { ...input, runId };

  await this.prepareRun(pinnedInput);

  try {
    await this.lockBindingsForRun(pinnedInput);
    await this.advanceRunUntilBlocked(runId, { failureReason: "start_run_failed" });
  } catch (cause) {
    const keepRun = await this.shouldPreserveHumanGateRun(runId, cause);
    if (!keepRun) {
      await this.markRunFailedIfActive(runId, "start_run_failed");
      throw cause;
    }
  }

  return { runId };
}
async prepareRun(input: RunStartInput): Promise<{ runId: string }> {
const runId = input.runId ?? randomUUID();
const templateName = input.templateName ?? "development";
const templateVersion = input.templateVersion ?? 1;
const repoPath = canonicalExistingPath(input.repoPath);
const worktreeRoot = await this.resolveWorktreeRoot(runId, input.worktreeRoot);
const templateRecord = await this.loadTemplate(templateName, templateVersion);
const worktreeRoot = await this.resolveWorktreeRoot(runId, input.worktreeRoot);
const inputExtra = storeEngineMetadata(input.extra, input.scenarios, input.overrides);
const existing = await this.existingRunForPrepare(runId);
if (existing !== undefined) {
this.assertPreparedRunMatches(runId, existing, {
repoPath,
baseBranch: input.baseBranch,
templateHash: templateRecord.hash,
worktreeRoot,
requirementsMd: input.requirementsMd,
objective: input.objective ?? null,
extra: inputExtra,
});
await this.ensureGitWorktree(repoPath, input.baseBranch, runId, existing.worktreeRoot);
return { runId };
}
const template = Template.parse(templateRecord.definition);
const personaRecords = await this.loadPersonas();
const personas = personaRecords.map((row) => Persona.parse(row.definition));
const inputExtra = storeEngineMetadata(input.extra, input.scenarios);
const inputHash = hash({
templateHash: templateRecord.hash,
bindings: [],
@@ -262,25 +297,100 @@ export class DbRunEngine implements RunEngine {
throw error;
}
try {
await this.lockBindings(
return { runId };
}
/**
 * Checks that an already-prepared run matches the given (replayed) start input.
 *
 * @param input - Start input; `runId` is required here.
 * @throws DevflowError with code "internal_state_corruption" when `runId` is
 *   missing or when the stored run differs from the input.
 * @throws The `runNotFound` error when no prepared run exists for the id.
 */
async validatePreparedRunInput(input: RunStartInput): Promise<void> {
  const { runId } = input;
  if (runId === undefined) {
    throw new DevflowError("Run id is required to validate a prepared run", {
      class: "fatal",
      code: "internal_state_corruption",
    });
  }

  const templateName = input.templateName ?? "development";
  const templateVersion = input.templateVersion ?? 1;

  const stored = await this.existingRunForPrepare(runId);
  if (stored === undefined) {
    throw runNotFound(runId);
  }

  const templateRecord = await this.loadTemplate(templateName, templateVersion);

  // Derive the expected values in the same order as they are compared.
  const repoPath = canonicalExistingPath(input.repoPath);
  const worktreeRoot = this.expectedWorktreeRoot(runId, input.worktreeRoot);
  const extra = storeEngineMetadata(input.extra, input.scenarios, input.overrides);

  this.assertPreparedRunMatches(runId, stored, {
    repoPath,
    baseBranch: input.baseBranch,
    templateHash: templateRecord.hash,
    worktreeRoot,
    requirementsMd: input.requirementsMd,
    objective: input.objective ?? null,
    extra,
  });
}
async lockBindingsForRun(input: RunStartInput): Promise<void> {
const runId = input.runId;
if (runId === undefined) {
throw new DevflowError("Run id is required to lock bindings", {
class: "fatal",
code: "internal_state_corruption",
});
}
const [run] = await this.db
.select({ state: runs.state, templateHash: runs.templateHash })
.from(runs)
.where(eq(runs.id, runId))
.limit(1);
if (run === undefined) {
throw runNotFound(runId);
}
if (run.state !== "created") {
return;
}
const templateName = input.templateName ?? "development";
const templateVersion = input.templateVersion ?? 1;
const templateRecord = await this.loadTemplate(templateName, templateVersion);
if (templateRecord.hash !== run.templateHash) {
throw new DevflowError("Run template hash does not match binding input", {
class: "fatal",
code: "internal_state_corruption",
runId,
template,
templateRecord.hash,
personaRecords,
personas,
input,
});
}
const template = Template.parse(templateRecord.definition);
const personaRecords = await this.loadPersonas();
const personas = personaRecords.map((row) => Persona.parse(row.definition));
await this.lockBindings(runId, template, templateRecord.hash, personaRecords, personas, input);
}
/**
 * Public entry point that delegates to `markRunFailedIfActive`.
 *
 * @param runId - Run to mark as failed.
 * @param reason - Failure reason forwarded unchanged.
 */
async failRunIfActive(runId: string, reason: string): Promise<void> {
await this.markRunFailedIfActive(runId, reason);
}
async advanceRunUntilBlocked(
runId: string,
options: { resumeActivePhase?: boolean; failureReason?: string } = {},
): Promise<RunStatus> {
try {
await this.advanceRun(
runId,
options.resumeActivePhase === undefined
? {}
: { resumeActivePhase: options.resumeActivePhase },
);
await this.advanceRun(runId);
} catch (error) {
if (await this.shouldPreserveHumanGateRun(runId, error)) {
return { runId };
if (error instanceof DevflowError && error.code === "activity_cancelled") {
throw error;
}
await this.markRunFailedIfActive(runId, "start_run_failed");
if (await this.shouldPreserveHumanGateRun(runId, error)) {
return this.getStatus(runId);
}
await this.markRunFailedIfActive(runId, options.failureReason ?? "advance_run_failed");
throw error;
}
return { runId };
return this.getStatus(runId);
}
private async lockStartAttempt(
@@ -359,6 +469,66 @@ export class DbRunEngine implements RunEngine {
await this.composeFinalReportBestEffort(runId, "aborted");
}
/**
 * Validates the action and records the approval decision.
 *
 * @param runId - Run the approval belongs to.
 * @param approvalRequestId - Approval request being decided.
 * @param action - Decision action; parsed with `ApprovalDecisionAction` first.
 * @param clientToken - Caller-supplied token forwarded to the decision record.
 * @param comment - Optional free-text comment.
 */
async signalApprovalForWorkflow(
  runId: string,
  approvalRequestId: string,
  action: ApprovalDecisionActionValue,
  clientToken: string,
  comment?: string,
): Promise<void> {
  // Parsing happens first so an invalid action throws before anything is recorded.
  const decisionAction = ApprovalDecisionAction.parse(action);
  await this.recordApprovalDecision(
    runId,
    approvalRequestId,
    decisionAction,
    clientToken,
    comment,
  );
}
/**
 * Reads the current state of an approval signal without recording a decision.
 *
 * Pending requests are allowed, and a decision already stored for the same token
 * and action is reported as "applied" before any run or request state checks.
 *
 * @returns "applied" when a matching decision exists, otherwise "pending".
 */
async validateApprovalSignalInput(
  runId: string,
  approvalRequestId: string,
  action: ApprovalDecisionActionValue,
  clientToken: string,
): Promise<"pending" | "applied"> {
  const decisionAction = ApprovalDecisionAction.parse(action);
  const checks = {
    allowPending: true,
    allowReplayBeforeStateChecks: true,
  };
  return this.readApprovalSignalState(
    runId,
    approvalRequestId,
    decisionAction,
    clientToken,
    checks,
  );
}
/**
 * Reads the outcome of an approval signal.
 *
 * Uses the same checks as input validation and additionally passes
 * `requireOwnDecisionWhenResolved: true` to `readApprovalSignalState`.
 *
 * @returns "applied" when a matching decision exists, otherwise "pending".
 */
async readApprovalSignalResult(
  runId: string,
  approvalRequestId: string,
  action: ApprovalDecisionActionValue,
  clientToken: string,
): Promise<"pending" | "applied"> {
  const decisionAction = ApprovalDecisionAction.parse(action);
  const checks = {
    allowPending: true,
    allowReplayBeforeStateChecks: true,
    requireOwnDecisionWhenResolved: true,
  };
  return this.readApprovalSignalState(
    runId,
    approvalRequestId,
    decisionAction,
    clientToken,
    checks,
  );
}
/**
 * Re-runs the side effects of an approval decision that was already applied.
 *
 * For "reject" and "abort" the run's sessions are disposed (unless
 * `options.disposeSessions` is false). Afterwards, if the run is in a terminal
 * state, the final report is composed on a best-effort basis for that state.
 *
 * @param runId - Run whose side effects are replayed.
 * @param action - The decision action; parsed with `ApprovalDecisionAction`.
 * @param options - `disposeSessions` defaults to true.
 */
async replayAppliedApprovalSideEffects(
  runId: string,
  action: ApprovalDecisionActionValue,
  options: { disposeSessions?: boolean } = {},
): Promise<void> {
  const parsedAction = ApprovalDecisionAction.parse(action);
  const disposeRequested = options.disposeSessions ?? true;
  // Both "reject" and "abort" dispose the run's sessions.
  const actionEndsSessions = parsedAction === "reject" || parsedAction === "abort";

  if (disposeRequested && actionEndsSessions) {
    const sessionIds = await this.sessionIdsForRun(runId);
    await this.disposeSessions(sessionIds);
  }

  const { run } = await this.getStatus(runId);
  if (!isTerminalRunState(run.state)) {
    return;
  }
  await this.composeFinalReportBestEffort(
    runId,
    run.state as "completed" | "failed" | "aborted",
  );
}
async pauseRun(runId: string): Promise<void> {
const eventRepository = new RunEventRepository(this.db);
await this.db.transaction(async (tx) => {
@@ -389,6 +559,45 @@ export class DbRunEngine implements RunEngine {
/**
 * Resumes a run and, when the resumed state calls for it, advances the run with
 * `resumeActivePhase: true`.
 *
 * Errors raised while advancing are swallowed when `shouldPreserveHumanGateRun`
 * says the run should be kept; otherwise the run is marked failed with reason
 * "resume_advance_failed" and the error is rethrown.
 */
async resumeRun(runId: string): Promise<void> {
  const shouldAdvance = await this.resumeRunState(runId, new RunEventRepository(this.db));
  if (!shouldAdvance) {
    return;
  }

  try {
    await this.advanceRun(runId, { resumeActivePhase: true });
  } catch (cause) {
    const keepRun = await this.shouldPreserveHumanGateRun(runId, cause);
    if (keepRun) {
      return;
    }
    await this.markRunFailedIfActive(runId, "resume_advance_failed");
    throw cause;
  }
}
/**
 * Checks whether a resume signal is acceptable for the run.
 *
 * Runs inside a transaction that locks the run row. A run that is not paused
 * passes without further checks; a paused run is rejected while a pending
 * human-required gate exists.
 *
 * @throws The `runNotFound` error when the run does not exist.
 * @throws The `approvalConflict` error when a pending human-required gate blocks the resume.
 */
async validateResumeSignalInput(runId: string): Promise<void> {
  await this.db.transaction(async (tx) => {
    const [lockedRun] = await lockRun(tx, runId);
    if (lockedRun === undefined) {
      throw runNotFound(runId);
    }

    // The gate lookup only runs for paused runs (short-circuit).
    const blockedByGate =
      lockedRun.state === "paused" && (await hasPendingHumanRequiredGate(tx, runId));
    if (blockedByGate) {
      throw approvalConflict(runId, "pending human-required gate must be resolved first");
    }
  });
}
/**
 * Applies the resume state transition only; unlike `resumeRun`, the run is not
 * advanced afterwards and the "should advance" result is discarded.
 */
async resumeRunForWorkflow(runId: string): Promise<void> {
  await this.resumeRunState(runId, new RunEventRepository(this.db));
}
private async resumeRunState(
runId: string,
eventRepository: RunEventRepository,
): Promise<boolean> {
let shouldAdvance = false;
await this.db.transaction(async (tx) => {
const [run] = await lockRun(tx, runId);
@@ -413,17 +622,7 @@ export class DbRunEngine implements RunEngine {
shouldAdvance = nextState === "executing" || nextState === "planning";
});
if (shouldAdvance) {
try {
await this.advanceRun(runId, { resumeActivePhase: true });
} catch (error) {
if (await this.shouldPreserveHumanGateRun(runId, error)) {
return;
}
await this.markRunFailedIfActive(runId, "resume_advance_failed");
throw error;
}
}
return shouldAdvance;
}
async abortRun(runId: string, reason: string): Promise<void> {
@@ -464,73 +663,7 @@ export class DbRunEngine implements RunEngine {
}
async getStatus(runId: string): Promise<RunStatus> {
const [run] = await this.db
.select({
id: runs.id,
state: runs.state,
repoPath: runs.repoPath,
baseBranch: runs.baseBranch,
worktreeRoot: runs.worktreeRoot,
currentPhaseId: runs.currentPhaseId,
finalReportPath: runs.finalReportPath,
startedAt: runs.startedAt,
endedAt: runs.endedAt,
})
.from(runs)
.where(eq(runs.id, runId))
.limit(1);
if (run === undefined) {
throw runNotFound(runId);
}
const [phases, approvals, eventsTail] = await Promise.all([
this.db
.select({
id: runPhases.id,
phaseKey: runPhases.phaseKey,
seq: runPhases.seq,
state: runPhases.state,
attempts: runPhases.attempts,
})
.from(runPhases)
.where(eq(runPhases.runId, runId))
.orderBy(asc(runPhases.seq)),
this.db
.select({
id: approvalRequests.id,
phaseId: approvalRequests.phaseId,
gateKey: approvalRequests.gateKey,
state: approvalRequests.state,
})
.from(approvalRequests)
.where(eq(approvalRequests.runId, runId))
.orderBy(asc(approvalRequests.createdAt)),
this.db
.select({
id: runEvents.id,
seq: runEvents.seq,
type: runEvents.type,
payload: runEvents.payload,
ts: runEvents.ts,
})
.from(runEvents)
.where(eq(runEvents.runId, runId))
.orderBy(desc(runEvents.seq))
.limit(20),
]);
return {
run,
phases,
approvals,
eventsTail: eventsTail.reverse().map((event) => ({
id: event.id.toString(),
seq: event.seq.toString(),
type: event.type,
payload: event.payload,
ts: event.ts,
})),
};
return readRunStatus(this.db, runId);
}
private async lockBindings(
@@ -563,7 +696,7 @@ export class DbRunEngine implements RunEngine {
objective: input.objective ?? null,
repoPath: canonicalExistingPath(input.repoPath),
baseBranch: input.baseBranch,
extra: storeEngineMetadata(input.extra, input.scenarios),
extra: storeEngineMetadata(input.extra, input.scenarios, input.overrides),
});
await this.db.transaction(async (tx) => {
@@ -872,6 +1005,9 @@ export class DbRunEngine implements RunEngine {
if (existingDecision.action !== action) {
throw approvalConflict(runId, "client token already used for a different action");
}
if (action === "abort" || action === "reject") {
sessionsToDispose = await sessionIdsForRun(tx, runId);
}
return { replayed: true };
}
if (isTerminalRunState(run.state)) {
@@ -989,6 +1125,81 @@ export class DbRunEngine implements RunEngine {
return result;
}
/**
 * Determines, inside a transaction, whether an approval signal is still pending
 * or has already been applied, raising a conflict for any invalid combination.
 *
 * Order of checks: run exists, approval request exists for this run, token reuse
 * with a different action, optional early "applied" replay, request still pending,
 * pending allowed, run state accepts decisions, paused-run restrictions.
 *
 * @param options.allowPending - When false, a still-pending request is a conflict.
 * @param options.allowReplayBeforeStateChecks - When true, a stored decision with
 *   the same token and action yields "applied" before any state checks.
 * @param options.requireOwnDecisionWhenResolved - Accepted but currently without
 *   effect (see the note in the body).
 * @returns "applied" or "pending".
 */
private async readApprovalSignalState(
  runId: string,
  approvalRequestId: string,
  action: ApprovalDecisionActionValue,
  clientToken: string,
  options: {
    allowPending: boolean;
    allowReplayBeforeStateChecks: boolean;
    requireOwnDecisionWhenResolved?: boolean;
  },
): Promise<"pending" | "applied"> {
  return this.db.transaction(async (tx) => {
    const [run] = await lockRun(tx, runId);
    if (run === undefined) {
      throw runNotFound(runId);
    }

    // Take a row lock on the approval request before reading it.
    await tx.execute(
      sql`SELECT 1 FROM ${approvalRequests} WHERE ${approvalRequests.id} = ${approvalRequestId} FOR UPDATE`,
    );
    const [request] = await tx
      .select({
        id: approvalRequests.id,
        phaseId: approvalRequests.phaseId,
        state: approvalRequests.state,
      })
      .from(approvalRequests)
      .where(and(eq(approvalRequests.id, approvalRequestId), eq(approvalRequests.runId, runId)))
      .limit(1);
    if (request === undefined) {
      throw new DevflowError("Approval request does not exist", {
        class: "human_required",
        code: "approval_not_found",
        runId,
      });
    }

    const priorDecision = await existingDecisionForToken(tx, approvalRequestId, clientToken);
    if (priorDecision !== undefined && priorDecision.action !== action) {
      throw approvalConflict(runId, "client token already used for a different action");
    }
    if (priorDecision !== undefined && options.allowReplayBeforeStateChecks) {
      return "applied";
    }

    if (request.state !== "pending") {
      // NOTE(review): `options.requireOwnDecisionWhenResolved` selected between two
      // identical throws, so a single throw covers both cases — confirm whether
      // the flag was meant to change the outcome.
      throw approvalConflict(runId, `approval_state=${request.state}`);
    }
    if (!options.allowPending) {
      throw approvalConflict(runId, "approval decision has not been applied");
    }

    // Terminal states and any state other than awaiting_approval/paused share one message.
    const acceptsDecisions =
      !isTerminalRunState(run.state) &&
      (run.state === "awaiting_approval" || run.state === "paused");
    if (!acceptsDecisions) {
      throw approvalConflict(runId, `run_state=${run.state}`);
    }

    if (run.state !== "paused") {
      return "pending";
    }

    // While paused, only reject/abort on a phase-less or human-required gate is allowed.
    const resolvesHumanRequiredGate =
      (action === "reject" || action === "abort") &&
      (request.phaseId === null ||
        (await isHumanRequiredApprovalPhase(tx, runId, request.phaseId)));
    if (!resolvesHumanRequiredGate) {
      throw approvalConflict(runId, "paused runs must be resumed before approval decisions");
    }
    return "pending";
  });
}
private async composeFinalReport(
runId: string,
status: "completed" | "failed" | "aborted",
@@ -1553,6 +1764,28 @@ export class DbRunEngine implements RunEngine {
runId: string,
requestedWorktreeRoot?: string,
): Promise<string> {
const { runRoot, worktreeRoot } = this.expectedWorktreeRootParts(runId, requestedWorktreeRoot);
await mkdir(runRoot, { recursive: true });
const canonicalRunRoot = realpathSync(runRoot);
await mkdir(dirname(worktreeRoot), { recursive: true });
if (!isPathInsideOrEqual(worktreeRoot, canonicalRunRoot)) {
throw new DevflowError("Resolved worktree root escaped the run workspace root", {
class: "fatal",
code: "workspace_permissions",
recoveryHint: worktreeRoot,
});
}
return worktreeRoot;
}
/**
 * Returns only the `worktreeRoot` component computed by `expectedWorktreeRootParts`.
 *
 * @param runId - Run whose worktree root is wanted.
 * @param requestedWorktreeRoot - Optional caller-requested root, forwarded unchanged.
 */
private expectedWorktreeRoot(runId: string, requestedWorktreeRoot?: string): string {
return this.expectedWorktreeRootParts(runId, requestedWorktreeRoot).worktreeRoot;
}
private expectedWorktreeRootParts(
runId: string,
requestedWorktreeRoot?: string,
): { runRoot: string; worktreeRoot: string } {
const runRoot = join(this.workspaceRoot, runId);
const worktreeRoot = requestedWorktreeRoot ?? join(runRoot, "main");
if (!isPathInsideOrEqual(resolve(worktreeRoot), resolve(runRoot))) {
@@ -1562,18 +1795,8 @@ export class DbRunEngine implements RunEngine {
recoveryHint: worktreeRoot,
});
}
await mkdir(runRoot, { recursive: true });
const canonicalRunRoot = realpathSync(runRoot);
const resolvedWorktreeRoot = resolve(worktreeRoot);
await mkdir(dirname(resolvedWorktreeRoot), { recursive: true });
if (!isPathInsideOrEqual(resolvedWorktreeRoot, canonicalRunRoot)) {
throw new DevflowError("Resolved worktree root escaped the run workspace root", {
class: "fatal",
code: "workspace_permissions",
recoveryHint: resolvedWorktreeRoot,
});
}
return resolvedWorktreeRoot;
return { runRoot: resolve(runRoot), worktreeRoot: resolvedWorktreeRoot };
}
private async createGitWorktree(
@@ -1601,11 +1824,165 @@ export class DbRunEngine implements RunEngine {
}
}
/**
 * Makes sure a git worktree is available at `worktreeRoot`.
 *
 * When the path already exists it is validated with `validateExistingGitWorktree`;
 * otherwise a new worktree is created with `createGitWorktree`.
 *
 * @returns The value produced by whichever helper handled the path.
 */
private async ensureGitWorktree(
  repoPath: string,
  baseBranch: string,
  runId: string,
  worktreeRoot: string,
): Promise<string> {
  const alreadyOnDisk = existsSync(worktreeRoot);
  return alreadyOnDisk
    ? validateExistingGitWorktree(repoPath, baseBranch, runId, worktreeRoot)
    : this.createGitWorktree(repoPath, baseBranch, runId, worktreeRoot);
}
/**
 * Loads the stored run joined with its run inputs, as needed to compare against a
 * replayed start input.
 *
 * @returns The joined row, or `undefined` when no run with inputs exists for `runId`.
 */
private async existingRunForPrepare(runId: string): Promise<
  | {
      repoPath: string;
      baseBranch: string;
      templateHash: string;
      worktreeRoot: string;
      requirementsMd: string;
      objective: unknown;
      extra: unknown;
    }
  | undefined
> {
  // Inner join: a run without a run_inputs row yields no result.
  const rows = await this.db
    .select({
      repoPath: runs.repoPath,
      baseBranch: runs.baseBranch,
      templateHash: runs.templateHash,
      worktreeRoot: runs.worktreeRoot,
      requirementsMd: runInputs.requirementsMd,
      objective: runInputs.objective,
      extra: runInputs.extra,
    })
    .from(runs)
    .innerJoin(runInputs, eq(runInputs.runId, runs.id))
    .where(eq(runs.id, runId))
    .limit(1);
  return rows[0];
}
/**
 * Throws when a stored run differs from the values derived from a replayed start
 * input.
 *
 * Scalar fields are compared with strict equality; `objective` and `extra` are
 * compared through `canonicalize`, with missing values treated as `null` and `{}`
 * respectively.
 *
 * @throws DevflowError with code "internal_state_corruption" on any mismatch.
 */
private assertPreparedRunMatches(
  runId: string,
  existing: {
    repoPath: string;
    baseBranch: string;
    templateHash: string;
    worktreeRoot: string;
    requirementsMd: string;
    objective: unknown;
    extra: unknown;
  },
  expected: {
    repoPath: string;
    baseBranch: string;
    templateHash: string;
    worktreeRoot: string;
    requirementsMd: string;
    objective: unknown;
    extra: unknown;
  },
): void {
  // Short-circuits at the first difference, so canonicalize only runs when all
  // cheaper scalar comparisons agree.
  const matches =
    existing.repoPath === expected.repoPath &&
    existing.baseBranch === expected.baseBranch &&
    existing.templateHash === expected.templateHash &&
    existing.worktreeRoot === expected.worktreeRoot &&
    existing.requirementsMd === expected.requirementsMd &&
    canonicalize(existing.objective ?? null) === canonicalize(expected.objective ?? null) &&
    canonicalize(existing.extra ?? {}) === canonicalize(expected.extra ?? {});
  if (matches) {
    return;
  }

  throw new DevflowError("Existing run does not match replayed start input", {
    class: "fatal",
    code: "internal_state_corruption",
    runId,
  });
}
/**
 * Disposes every given session concurrently on a best-effort basis: a rejected
 * disposal is ignored so the remaining sessions are still awaited.
 *
 * @param sessionIds - Ids of the sessions to dispose.
 */
private async disposeSessions(sessionIds: readonly string[]): Promise<void> {
  const disposals: Promise<unknown>[] = [];
  for (const sessionId of sessionIds) {
    // Deliberately swallow individual failures; disposal is best-effort.
    disposals.push(this.sessions.dispose({ sessionId }).catch(() => undefined));
  }
  await Promise.all(disposals);
}
/**
 * Returns the session ids for a run using this engine's database handle.
 *
 * The bare `sessionIdsForRun` call below is not a recursive call to this method;
 * it refers to a standalone function of the same name.
 */
private async sessionIdsForRun(runId: string): Promise<string[]> {
return sessionIdsForRun(this.db, runId);
}
}
/**
 * Reads a snapshot of a run: the run row, its phases ordered by sequence, its
 * approval requests ordered by creation time, and the 20 most recent events in
 * ascending sequence order.
 *
 * @param db - Database handle used for all queries.
 * @param runId - Run to read.
 * @returns The assembled run status; event `id` and `seq` are returned as strings.
 * @throws The `runNotFound` error when no run with this id exists.
 */
export async function readRunStatus(db: Database, runId: string): Promise<RunStatus> {
  const runRows = await db
    .select({
      id: runs.id,
      state: runs.state,
      repoPath: runs.repoPath,
      baseBranch: runs.baseBranch,
      worktreeRoot: runs.worktreeRoot,
      currentPhaseId: runs.currentPhaseId,
      finalReportPath: runs.finalReportPath,
      startedAt: runs.startedAt,
      endedAt: runs.endedAt,
    })
    .from(runs)
    .where(eq(runs.id, runId))
    .limit(1);
  const run = runRows[0];
  if (run === undefined) {
    throw runNotFound(runId);
  }

  const phasesQuery = db
    .select({
      id: runPhases.id,
      phaseKey: runPhases.phaseKey,
      seq: runPhases.seq,
      state: runPhases.state,
      attempts: runPhases.attempts,
    })
    .from(runPhases)
    .where(eq(runPhases.runId, runId))
    .orderBy(asc(runPhases.seq));

  const approvalsQuery = db
    .select({
      id: approvalRequests.id,
      phaseId: approvalRequests.phaseId,
      gateKey: approvalRequests.gateKey,
      state: approvalRequests.state,
    })
    .from(approvalRequests)
    .where(eq(approvalRequests.runId, runId))
    .orderBy(asc(approvalRequests.createdAt));

  // Newest-first with a limit selects the tail; it is reversed below for display order.
  const recentEventsQuery = db
    .select({
      id: runEvents.id,
      seq: runEvents.seq,
      type: runEvents.type,
      payload: runEvents.payload,
      ts: runEvents.ts,
    })
    .from(runEvents)
    .where(eq(runEvents.runId, runId))
    .orderBy(desc(runEvents.seq))
    .limit(20);

  const [phases, approvals, newestFirstEvents] = await Promise.all([
    phasesQuery,
    approvalsQuery,
    recentEventsQuery,
  ]);

  const eventsTail = newestFirstEvents.reverse().map(({ id, seq, type, payload, ts }) => ({
    id: id.toString(),
    seq: seq.toString(),
    type,
    payload,
    ts,
  }));

  return { run, phases, approvals, eventsTail };
}
export interface M4ProcessRestartSweepOptions {
@@ -2023,7 +2400,21 @@ async function existingDecisionForToken(
})
.from(approvalDecisions)
.where(eq(approvalDecisions.approvalRequestId, approvalRequestId));
return decisions.find((decision) => decision.idempotencyKey.endsWith(`:${clientToken}`));
return decisions.find((decision) => {
const prefix = `${approvalRequestId}:${decision.action}:`;
if (!decision.idempotencyKey.startsWith(prefix)) {
return false;
}
return decision.idempotencyKey.slice(prefix.length) === clientToken;
});
}
/**
 * Collects the ids of every `tuiSessions` row belonging to a run.
 *
 * @param db Database handle or open transaction to read through.
 * @param runId Run whose sessions are listed.
 * @returns Session ids in the order the query returns them.
 */
async function sessionIdsForRun(db: TransactionDb | Database, runId: string): Promise<string[]> {
  const rows = await db
    .select({ id: tuiSessions.id })
    .from(tuiSessions)
    .where(eq(tuiSessions.runId, runId));
  const ids: string[] = [];
  for (const row of rows) {
    ids.push(row.id);
  }
  return ids;
}
function approvalStateForAction(action: ApprovalDecisionActionValue) {
@@ -2167,10 +2558,12 @@ function invalidPhasePlan(runId: string, index: number): DevflowError {
function storeEngineMetadata(
extra: Record<string, unknown> | undefined,
scenarios: Record<string, FakePhaseScenario> | undefined,
overrides?: Partial<BindingOverrides>,
): Record<string, unknown> {
return {
...(extra ?? {}),
devflowM4: {
overrides: overrides ?? {},
scenarios: scenarios ?? {},
},
};
@@ -2251,6 +2644,74 @@ function gitChildEnv(): NodeJS.ProcessEnv {
return env;
}
/**
 * Verifies that `worktreeRoot` is the git worktree belonging to `runId` and
 * `repoPath`, and returns its canonical (symlink-resolved) path.
 *
 * The following must all hold:
 *  1. `worktreeRoot` resolves and is itself the top level of a git checkout.
 *  2. The checked-out branch is `devflow/<runId>/main`.
 *  3. The worktree's git common dir lies inside (or equals) the common dir
 *     of `repoPath`.
 *  4. `git worktree list` run in `repoPath` reports the worktree.
 *
 * @param repoPath Repository the worktree is expected to belong to.
 * @param baseBranch Base branch of the run; only used in the error hint.
 * @param runId Run identifier used to derive the expected branch name.
 * @param worktreeRoot Path of the worktree to validate.
 * @returns The canonical worktree root.
 * @throws DevflowError (`human_required` / `workspace_permissions`) wrapping
 *   the underlying failure as `cause` when any check fails.
 */
async function validateExistingGitWorktree(
  repoPath: string,
  baseBranch: string,
  runId: string,
  worktreeRoot: string,
): Promise<string> {
  // Runs `git -C <cwd> ...args` with the scrubbed child environment and
  // returns raw stdout.
  const gitStdout = async (cwd: string, args: readonly string[]): Promise<string> => {
    const { stdout } = await execFileAsync("git", ["-C", cwd, ...args], {
      env: gitChildEnv(),
      maxBuffer: 1024 * 1024,
    });
    return stdout;
  };

  try {
    const canonicalWorktreeRoot = realpathSync(worktreeRoot);

    // The path must be the root of the checkout, not a directory inside one.
    const topLevelStdout = await gitStdout(canonicalWorktreeRoot, ["rev-parse", "--show-toplevel"]);
    const gitTopLevel = realpathSync(topLevelStdout.trim());
    if (gitTopLevel !== canonicalWorktreeRoot) {
      throw new Error(`expected ${canonicalWorktreeRoot}; got ${gitTopLevel}`);
    }

    const expectedBranch = `devflow/${runId}/main`;
    const branchStdout = await gitStdout(canonicalWorktreeRoot, ["branch", "--show-current"]);
    const branch = branchStdout.trim();
    if (branch !== expectedBranch) {
      throw new Error(`expected branch ${expectedBranch}; got ${branch}`);
    }

    // --git-common-dir may be printed relative to the directory git ran in,
    // so each value is resolved against its own working directory.
    const commonDirStdout = await gitStdout(canonicalWorktreeRoot, [
      "rev-parse",
      "--git-common-dir",
    ]);
    const repoCommonDirStdout = await gitStdout(repoPath, ["rev-parse", "--git-common-dir"]);
    const canonicalRepoGitDir = realpathSync(resolve(repoPath, repoCommonDirStdout.trim()));
    const canonicalCommonDir = realpathSync(resolve(canonicalWorktreeRoot, commonDirStdout.trim()));
    if (!isPathInsideOrEqual(canonicalCommonDir, canonicalRepoGitDir)) {
      throw new Error(
        `expected git common dir under ${canonicalRepoGitDir}; got ${canonicalCommonDir}`,
      );
    }

    const worktreeListStdout = await gitStdout(repoPath, ["worktree", "list", "--porcelain"]);
    const registeredWorktrees = worktreeListStdout
      .split("\n")
      .filter((line) => line.startsWith("worktree "))
      .flatMap((line) => {
        try {
          return [realpathSync(line.slice("worktree ".length))];
        } catch {
          // git keeps listing worktrees whose directory was deleted until
          // they are pruned. Such an entry cannot be the worktree validated
          // here (its path resolved above), so it is skipped instead of
          // failing the whole validation.
          return [];
        }
      });
    if (!registeredWorktrees.includes(canonicalWorktreeRoot)) {
      throw new Error(`${canonicalWorktreeRoot} is not registered to ${repoPath}`);
    }

    return canonicalWorktreeRoot;
  } catch (cause) {
    throw new DevflowError("Existing worktree root is not a valid git worktree", {
      class: "human_required",
      code: "workspace_permissions",
      runId,
      recoveryHint: `worktree=${worktreeRoot};repo=${repoPath};base=${baseBranch}`,
      cause,
    });
  }
}
const gitLocalEnvKeys = [
"GIT_ALTERNATE_OBJECT_DIRECTORIES",
"GIT_CONFIG",

View File

@@ -1,8 +1,16 @@
import { randomUUID } from "node:crypto";
import { mkdirSync, mkdtempSync, realpathSync, rmSync, symlinkSync, writeFileSync } from "node:fs";
import {
mkdirSync,
mkdtempSync,
readFileSync,
realpathSync,
rmSync,
symlinkSync,
writeFileSync,
} from "node:fs";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";
import { eq, inArray } from "drizzle-orm";
import { and, eq, inArray } from "drizzle-orm";
import { afterEach, describe, expect, it } from "vitest";
import { DevflowError, hash } from "@devflow/core";
@@ -190,6 +198,35 @@ class AcceptedThenTransientFakeAdapter extends FakeSessionAdapter {
}
}
/**
 * Fake adapter that tallies how many times `sendPrompt` is invoked before
 * delegating to the base fake implementation.
 */
class SendCountingFakeAdapter extends FakeSessionAdapter {
  /** Number of `sendPrompt` calls observed, including ones that later fail. */
  sendAttempts = 0;

  override async sendPrompt(
    handle: SessionHandle,
    envelope: Parameters<FakeSessionAdapter["sendPrompt"]>[1],
  ): Promise<{ promptId: string }> {
    // Count first so a throwing delegate still registers as an attempt.
    this.sendAttempts++;
    const accepted = await super.sendPrompt(handle, envelope);
    return accepted;
  }
}
/**
 * Fake adapter that, at the moment `start` is called, records how many
 * `tuiSessions` rows already exist for the same run and role.
 */
class StartObservesPersistedSessionFakeAdapter extends FakeSessionAdapter {
  /** Row count seen on the most recent `start`; `undefined` until then. */
  observedSessionRowsBeforeStart: number | undefined;

  constructor(private readonly db: DbClient["db"]) {
    super({ writeDelayMs: 0 });
  }

  override async start(input: StartInput): Promise<SessionHandle> {
    const sameRunAndRole = and(
      eq(tuiSessions.runId, input.runId),
      eq(tuiSessions.roleId, input.roleId),
    );
    const { length: rowCount } = await this.db
      .select({ id: tuiSessions.id })
      .from(tuiSessions)
      .where(sameRunAndRole);
    this.observedSessionRowsBeforeStart = rowCount;
    return super.start(input);
  }
}
class CaptureCursorFakeAdapter extends FakeSessionAdapter {
capturedFromSeq: bigint | undefined;
@@ -793,7 +830,7 @@ describe("runSingleFakePhase", () => {
]);
});
it("resumes a running phase when prompt delivery succeeded before prompt.sent was recorded", async () => {
it("does not trust a running phase artifact when prompt.sent was not recorded", async () => {
const { db, phaseId, runId } = await createRunAndPhase("executing", "running", 1);
await recordPhaseStarted(db, runId, phaseId);
const worktreeRoot = realpathSync(
@@ -803,7 +840,7 @@ describe("runSingleFakePhase", () => {
const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json");
const instructions = "Scenario: ok\nWrite the development specification.";
const sessionId = randomUUID();
const adapter = new FakeSessionAdapter({
const adapter = new SendCountingFakeAdapter({
sessionIdFactory: () => sessionId,
writeDelayMs: 0,
});
@@ -864,6 +901,7 @@ describe("runSingleFakePhase", () => {
});
expect(result.artifactValid).toBe(true);
expect(adapter.sendAttempts).toBe(2);
await expectRunCompleted(db, runId);
const events = await db
@@ -871,12 +909,448 @@ describe("runSingleFakePhase", () => {
.from(runEvents)
.where(eq(runEvents.runId, runId))
.orderBy(runEvents.seq);
expect(events.map((event) => event.type)).toContain("prompt.sent");
expect(events.map((event) => event.type)).not.toContain("prompt.sent");
expect(events.map((event) => event.type)).toContain("prompt.repaired");
expect(events.map((event) => event.type).filter((type) => type === "phase.started")).toEqual([
"phase.started",
"phase.started",
]);
});
// Replay scenario: the persisted session row is BUSY and already carries the hash of the
// current prompt, but no prompt event has been written for the run.
it("waits on a BUSY prompt with no prompt event instead of resending it", async () => {
const { db, phaseId, runId } = await createRunAndPhase("executing", "running", 1);
await recordPhaseStarted(db, runId, phaseId);
const worktreeRoot = realpathSync(
mkdtempSync(join(tmpdir(), "devflow-fake-phase-pre-send-replay-")),
);
tempRoots.push(worktreeRoot);
const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json");
const instructions = "Scenario: ok\nWrite the development specification.";
const sessionId = randomUUID();
const adapter = new SendCountingFakeAdapter({
sessionIdFactory: () => sessionId,
writeDelayMs: 0,
});
// Start the adapter-side session up front so the id stored in the row below is live in the fake.
await adapter.start({
runId,
roleId: "implementer",
backend: "fake",
cwd: worktreeRoot,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
});
// NOTE(review): presumably mirrors the dedup key the phase runner derives for attempt 1 —
// confirm against the runner's envelope construction.
const dedupKey = hash({
attempt: 1,
expectedArtifact: expectedArtifactPath,
expectedSchema: "dev/spec@1",
instructions,
phaseKey: "implement",
roleId: "implementer",
runId,
});
await db.insert(tuiSessions).values({
id: sessionId,
runId,
roleId: "implementer",
backend: "fake",
cwd: worktreeRoot,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
lastPromptHash: dedupKey,
lastPromptAt: new Date(),
state: "BUSY",
});
const result = await runSingleFakePhase({
adapter,
db,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
instructions,
phaseId,
phaseKey: "implement",
roleId: "implementer",
runId,
worktreeRoot,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
uuidFactory: () => "00000000-0000-4000-8000-000000000041",
});
expect(result.artifactValid).toBe(true);
// Exactly one send is expected overall; the event assertions below show it was recorded as a
// repair prompt rather than as the original prompt.
expect(adapter.sendAttempts).toBe(1);
await expectRunCompleted(db, runId);
const events = await db
.select({ type: runEvents.type })
.from(runEvents)
.where(eq(runEvents.runId, runId))
.orderBy(runEvents.seq);
expect(events.map((event) => event.type)).not.toContain("prompt.sent");
expect(events.map((event) => event.type)).toContain("prompt.repaired");
});
// Replay scenario: a session row exists in BOOTSTRAPPING state, but the adapter used for the
// replay was never asked to start that session.
it("restarts a bootstrapping phantom session instead of sending to it", async () => {
const { db, phaseId, runId } = await createRunAndPhase("executing", "running", 1);
await recordPhaseStarted(db, runId, phaseId);
const worktreeRoot = realpathSync(
mkdtempSync(join(tmpdir(), "devflow-fake-phase-bootstrapping-replay-")),
);
tempRoots.push(worktreeRoot);
const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json");
const sessionId = randomUUID();
// Only the database row is created here; there is no matching adapter.start() call.
await db.insert(tuiSessions).values({
id: sessionId,
runId,
roleId: "implementer",
backend: "fake",
cwd: worktreeRoot,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
state: "BOOTSTRAPPING",
});
const adapter = new SendCountingFakeAdapter({ writeDelayMs: 0 });
const result = await runSingleFakePhase({
adapter,
db,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
instructions: "Scenario: ok\nWrite the development specification.",
phaseId,
phaseKey: "implement",
roleId: "implementer",
runId,
worktreeRoot,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
uuidFactory: () => "00000000-0000-4000-8000-000000000042",
});
// The resulting session must reuse the id of the pre-existing row.
expect(result).toMatchObject({ artifactValid: true, sessionId });
expect(adapter.sendAttempts).toBe(1);
await expectRunCompleted(db, runId);
});
// Ordering check: when the adapter's start() runs, no session row for this run and role may
// exist yet; after the phase finishes exactly one READY row must be present.
it("persists the session row only after adapter start succeeds", async () => {
const { db, phaseId, runId } = await createRunAndPhase();
const worktreeRoot = realpathSync(
mkdtempSync(join(tmpdir(), "devflow-fake-phase-session-post-start-")),
);
tempRoots.push(worktreeRoot);
const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json");
// This adapter counts matching tuiSessions rows from inside start().
const adapter = new StartObservesPersistedSessionFakeAdapter(db);
const result = await runSingleFakePhase({
adapter,
db,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
instructions: "Scenario: ok\nWrite the development specification.",
phaseId,
phaseKey: "implement",
roleId: "implementer",
runId,
worktreeRoot,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
expect(result.artifactValid).toBe(true);
expect(adapter.observedSessionRowsBeforeStart).toBe(0);
const sessions = await db
.select({ id: tuiSessions.id, state: tuiSessions.state })
.from(tuiSessions)
.where(eq(tuiSessions.runId, runId));
expect(sessions).toEqual([{ id: result.sessionId, state: "READY" }]);
});
// Replay scenario: the session is READY with no recorded prompt, while a schema-shaped artifact
// file already sits at the expected path. The phase must send the prompt and end up with the
// freshly written artifact rather than accepting the pre-existing file.
it("does not validate a stale artifact from a running READY replay without prompt proof", async () => {
const { db, phaseId, runId } = await createRunAndPhase("executing", "running", 1);
await recordPhaseStarted(db, runId, phaseId);
const worktreeRoot = realpathSync(
mkdtempSync(join(tmpdir(), "devflow-fake-phase-ready-stale-")),
);
tempRoots.push(worktreeRoot);
const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json");
const sessionId = randomUUID();
const adapter = new SendCountingFakeAdapter({
sessionIdFactory: () => sessionId,
writeDelayMs: 0,
});
await adapter.start({
sessionId,
runId,
roleId: "implementer",
backend: "fake",
cwd: worktreeRoot,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
});
// The row deliberately has no lastPromptHash / lastPromptAt.
await db.insert(tuiSessions).values({
id: sessionId,
runId,
roleId: "implementer",
backend: "fake",
cwd: worktreeRoot,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
state: "READY",
});
// Plant the stale artifact before the phase runs.
mkdirSync(dirname(expectedArtifactPath), { recursive: true });
writeFileSync(
expectedArtifactPath,
JSON.stringify({
summary: "Stale development specification",
requirements: [{ id: "REQ-STALE", description: "This file predates prompt proof" }],
acceptanceCriteria: ["This artifact must not be accepted"],
risks: [],
}),
);
const result = await runSingleFakePhase({
adapter,
db,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
instructions: "Scenario: ok\nWrite the development specification.",
phaseId,
phaseKey: "implement",
roleId: "implementer",
runId,
worktreeRoot,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
});
expect(result.artifactValid).toBe(true);
expect(adapter.sendAttempts).toBe(1);
// The file on disk must now hold the fake adapter's output, not the planted content.
const artifact = JSON.parse(readFileSync(expectedArtifactPath, "utf8")) as { summary: string };
expect(artifact.summary).toBe("Fake development specification");
});
// Replay scenario: the session is BUSY with the current prompt hash but no prompt event, and a
// stale artifact file already exists. With both scenario lines set to "timeout" the phase is
// expected to time out and pause the run without recording any artifact row.
it("does not validate a stale artifact from a running BUSY replay without prompt proof", async () => {
const { db, phaseId, runId } = await createRunAndPhase("executing", "running", 1);
await recordPhaseStarted(db, runId, phaseId);
const worktreeRoot = realpathSync(
mkdtempSync(join(tmpdir(), "devflow-fake-phase-busy-stale-")),
);
tempRoots.push(worktreeRoot);
const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json");
const instructions =
"Scenario: timeout\nRepair-Scenario: timeout\nDo not accept stale artifact content.";
const sessionId = randomUUID();
// NOTE(review): presumably mirrors the dedup key the phase runner derives for attempt 1 —
// confirm against the runner's envelope construction.
const dedupKey = hash({
attempt: 1,
expectedArtifact: expectedArtifactPath,
expectedSchema: "dev/spec@1",
instructions,
phaseKey: "implement",
roleId: "implementer",
runId,
});
const adapter = new FakeSessionAdapter({
sessionIdFactory: () => sessionId,
writeDelayMs: 0,
});
await adapter.start({
sessionId,
runId,
roleId: "implementer",
backend: "fake",
cwd: worktreeRoot,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
});
await db.insert(tuiSessions).values({
id: sessionId,
runId,
roleId: "implementer",
backend: "fake",
cwd: worktreeRoot,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
lastPromptHash: dedupKey,
lastPromptAt: new Date(),
state: "BUSY",
});
// Plant the stale artifact before the phase runs.
mkdirSync(dirname(expectedArtifactPath), { recursive: true });
writeFileSync(
expectedArtifactPath,
JSON.stringify({
summary: "Stale development specification",
requirements: [{ id: "REQ-STALE", description: "This file predates prompt proof" }],
acceptanceCriteria: ["This artifact must not be accepted"],
risks: [],
}),
);
// The short 10 ms timeout keeps the expected timeout path fast.
await expect(
runSingleFakePhase({
adapter,
db,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
instructions,
phaseId,
phaseKey: "implement",
roleId: "implementer",
runId,
worktreeRoot,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 10 },
}),
).rejects.toMatchObject({ code: "artifact_timeout_exhausted" });
await expectRunPaused(db, runId);
const artifactRows = await db.select().from(artifacts).where(eq(artifacts.runId, runId));
expect(artifactRows).toEqual([]);
});
// Replay scenario: the phase is already awaiting its artifact, an artifact.expected event is on
// record, the session is BUSY with the current prompt hash, and a stale artifact exists — but
// no prompt event was ever stored. The replay must time out without accepting the stale file
// and without writing a prompt.sent event.
it("does not turn a baseline-protected BUSY replay into durable prompt proof", async () => {
const { db, phaseId, runId } = await createRunAndPhase("executing", "awaiting_artifact", 1);
await recordPhaseStarted(db, runId, phaseId);
const worktreeRoot = realpathSync(
mkdtempSync(join(tmpdir(), "devflow-fake-phase-busy-baseline-durable-")),
);
tempRoots.push(worktreeRoot);
const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json");
const instructions =
"Scenario: timeout\nRepair-Scenario: timeout\nDo not persist synthetic prompt proof.";
const sessionId = randomUUID();
// NOTE(review): presumably mirrors the dedup key the phase runner derives for attempt 1 —
// confirm against the runner's envelope construction.
const dedupKey = hash({
attempt: 1,
expectedArtifact: expectedArtifactPath,
expectedSchema: "dev/spec@1",
instructions,
phaseKey: "implement",
roleId: "implementer",
runId,
});
const adapter = new FakeSessionAdapter({
sessionIdFactory: () => sessionId,
writeDelayMs: 0,
});
await adapter.start({
sessionId,
runId,
roleId: "implementer",
backend: "fake",
cwd: worktreeRoot,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
});
await db.insert(tuiSessions).values({
id: sessionId,
runId,
roleId: "implementer",
backend: "fake",
cwd: worktreeRoot,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
lastPromptHash: dedupKey,
lastPromptAt: new Date(),
state: "BUSY",
});
// Plant the stale artifact before the phase runs.
mkdirSync(dirname(expectedArtifactPath), { recursive: true });
writeFileSync(
expectedArtifactPath,
JSON.stringify({
summary: "STALE accepted by replay",
requirements: [{ id: "REQ-STALE", description: "This file predates prompt proof" }],
acceptanceCriteria: ["This artifact must not be accepted"],
risks: [],
}),
);
// Seed the artifact.expected event by hand.
// NOTE(review): seq 2n assumes the earlier setup wrote exactly one event — confirm against
// recordPhaseStarted.
await db.insert(runEvents).values({
runId,
phaseId,
seq: 2n,
type: "artifact.expected",
payload: { path: expectedArtifactPath, schemaId: "dev/spec@1", attempt: 1 },
idempotencyKey: `artifact.expected:${phaseId}:1:${expectedArtifactPath}`,
});
await expect(
runSingleFakePhase({
adapter,
db,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
instructions,
phaseId,
phaseKey: "implement",
roleId: "implementer",
runId,
worktreeRoot,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 10 },
}),
).rejects.toMatchObject({ code: "artifact_timeout_exhausted" });
await expectRunPaused(db, runId);
const artifactRows = await db.select().from(artifacts).where(eq(artifacts.runId, runId));
expect(artifactRows).toEqual([]);
const promptEvents = await db
.select({ type: runEvents.type })
.from(runEvents)
.where(eq(runEvents.runId, runId));
expect(promptEvents.map((event) => event.type)).not.toContain("prompt.sent");
});
// Cancellation scenario: the artifact wait is aborted through an AbortSignal. The phase must
// reject with activity_cancelled while leaving the run executing and the phase awaiting its
// artifact, with the transcript captured and no failure events recorded.
it("does not fail the run when artifact wait is cancelled for workflow signal handling", async () => {
const { db, phaseId, runId } = await createRunAndPhase();
const worktreeRoot = realpathSync(
mkdtempSync(join(tmpdir(), "devflow-fake-phase-cancelled-wait-")),
);
tempRoots.push(worktreeRoot);
const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json");
const controller = new AbortController();
// Guards against scheduling more than one abort across repeated polls.
let abortScheduled = false;
await expect(
runSingleFakePhase({
adapter: new FakeSessionAdapter({ writeDelayMs: 0 }),
db,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
instructions: "Scenario: timeout\nWait until the workflow signal cancels this activity.",
phaseId,
phaseKey: "implement",
roleId: "implementer",
runId,
worktreeRoot,
wait: {
pollIntervalMs: 1,
stableMs: 0,
timeoutMs: 500,
signal: controller.signal,
// The abort is deferred with setTimeout so it fires after the first poll callback returns.
onPoll: () => {
if (!abortScheduled) {
abortScheduled = true;
setTimeout(() => controller.abort(new Error("workflow signal arrived")), 0);
}
},
},
}),
).rejects.toMatchObject({ code: "activity_cancelled" });
const [run] = await db.select({ state: runs.state }).from(runs).where(eq(runs.id, runId));
const [phase] = await db
.select({ state: runPhases.state })
.from(runPhases)
.where(eq(runPhases.id, phaseId));
const [session] = await db
.select({ lastCaptureSeq: tuiSessions.lastCaptureSeq })
.from(tuiSessions)
.where(eq(tuiSessions.runId, runId));
expect(run?.state).toBe("executing");
expect(phase?.state).toBe("awaiting_artifact");
// A capture sequence above zero shows the transcript was captured before the error propagated.
expect(session?.lastCaptureSeq).toBeGreaterThan(0n);
const events = await db
.select({ type: runEvents.type })
.from(runEvents)
.where(eq(runEvents.runId, runId))
.orderBy(runEvents.seq);
expect(events.map((event) => event.type)).not.toContain("phase.failed");
expect(events.map((event) => event.type)).not.toContain("run.failed");
});
it("requests a human gate when existing session resume exhausts retries", async () => {
const { db, phaseId, runId } = await createRunAndPhase();
const worktreeRoot = realpathSync(
@@ -933,8 +1407,9 @@ describe("runSingleFakePhase", () => {
tempRoots.push(worktreeRoot);
const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json");
const adapter = new SendCountingFakeAdapter({ writeDelayMs: 0 });
const result = await runSingleFakePhase({
adapter: new FakeSessionAdapter({ writeDelayMs: 0 }),
adapter,
db,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
@@ -1001,8 +1476,9 @@ describe("runSingleFakePhase", () => {
}),
);
const adapter = new SendCountingFakeAdapter({ writeDelayMs: 0 });
const result = await runSingleFakePhase({
adapter: new FakeSessionAdapter({ writeDelayMs: 0 }),
adapter,
db,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
@@ -1016,6 +1492,7 @@ describe("runSingleFakePhase", () => {
});
expect(result).toMatchObject({ artifactValid: true, promptId, sessionId });
expect(adapter.sendAttempts).toBe(0);
await expectRunCompleted(db, runId);
const events = await db
@@ -1023,7 +1500,7 @@ describe("runSingleFakePhase", () => {
.from(runEvents)
.where(eq(runEvents.runId, runId))
.orderBy(runEvents.seq);
expect(events.map((event) => event.type)).not.toContain("prompt.sent");
expect(events.map((event) => event.type)).toContain("prompt.sent");
expect(events.map((event) => event.type)).toContain("artifact.expected");
expect(events.map((event) => event.type)).toContain("artifact.validated");
});
@@ -1384,6 +1861,77 @@ describe("runSingleFakePhase", () => {
expect(events.filter((event) => event.type === "phase.started")).toHaveLength(1);
});
// Replay scenario for a repair attempt: the phase is on attempt 2, the session is READY and its
// last prompt hash belongs to attempt 1, and the invalid artifact from that earlier attempt is
// still on disk. The repair prompt must be sent before anything is validated, so no invalid
// artifact row may be recorded.
it("does not validate a stale prior artifact before a repair prompt is sent", async () => {
const { db, phaseId, runId } = await createRunAndPhase("executing", "running", 2);
// NOTE(review): the trailing (2, true) arguments presumably mark attempt 2 as a repair
// attempt — confirm against recordPhaseStarted.
await recordPhaseStarted(db, runId, phaseId, 2, true);
const worktreeRoot = realpathSync(
mkdtempSync(join(tmpdir(), "devflow-fake-phase-repair-stale-running-")),
);
tempRoots.push(worktreeRoot);
const expectedArtifactPath = join(worktreeRoot, "artifacts", "spec.json");
// Leftover output that does not have the dev/spec@1 shape used by the other tests.
mkdirSync(dirname(expectedArtifactPath), { recursive: true });
writeFileSync(expectedArtifactPath, JSON.stringify({ fake: "stale-invalid" }));
const instructions =
"Scenario: invalid\nRepair-Scenario: ok\nWrite the development specification.";
// Hash built with attempt: 1, i.e. the prompt that preceded the repair attempt.
const priorPromptId = hash({
attempt: 1,
expectedArtifact: expectedArtifactPath,
expectedSchema: "dev/spec@1",
instructions,
phaseKey: "implement",
roleId: "implementer",
runId,
});
const sessionId = randomUUID();
const adapter = new SendCountingFakeAdapter({
sessionIdFactory: () => sessionId,
writeDelayMs: 0,
});
await adapter.start({
runId,
roleId: "implementer",
backend: "fake",
cwd: worktreeRoot,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
});
await db.insert(tuiSessions).values({
id: sessionId,
runId,
roleId: "implementer",
backend: "fake",
cwd: worktreeRoot,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
lastPromptHash: priorPromptId,
lastPromptAt: new Date(),
state: "READY",
});
const result = await runSingleFakePhase({
adapter,
db,
expectedArtifactPath,
expectedSchema: "dev/spec@1",
instructions,
phaseId,
phaseKey: "implement",
roleId: "implementer",
runId,
worktreeRoot,
wait: { pollIntervalMs: 1, stableMs: 0, timeoutMs: 500 },
uuidFactory: () => "00000000-0000-4000-8000-000000000043",
});
expect(result.artifactValid).toBe(true);
expect(adapter.sendAttempts).toBe(1);
// Had the stale file been validated, an artifacts row with valid = false would exist.
const invalidArtifacts = await db
.select({ valid: artifacts.valid })
.from(artifacts)
.where(and(eq(artifacts.runId, runId), eq(artifacts.valid, false)));
expect(invalidArtifacts).toEqual([]);
});
it("resumes a repair attempt while awaiting its artifact", async () => {
const { db, phaseId, runId } = await createRunAndPhase("executing", "awaiting_artifact", 2);
await recordPhaseStarted(db, runId, phaseId, 2, true);
@@ -1758,8 +2306,11 @@ describe("runSingleFakePhase", () => {
.where(eq(approvalRequests.runId, runId));
expect(approval).toEqual({ gateKey: "backend_unavailable", state: "pending" });
const sessions = await db.select().from(tuiSessions).where(eq(tuiSessions.runId, runId));
expect(sessions).toEqual([]);
const sessions = await db
.select({ state: tuiSessions.state })
.from(tuiSessions)
.where(eq(tuiSessions.runId, runId));
expect(sessions).toEqual([{ state: "FAILED_NEEDS_HUMAN" }]);
const events = await db
.select({ type: runEvents.type })
@@ -1770,6 +2321,7 @@ describe("runSingleFakePhase", () => {
"phase.started",
"phase.failed",
"run.paused",
"session.failed",
"approval.requested",
]);
});
@@ -1829,6 +2381,7 @@ describe("runSingleFakePhase", () => {
"phase.started",
"phase.failed",
"run.failed",
"session.failed",
]);
});

View File

@@ -35,6 +35,8 @@ export interface FakePhaseWaitOptions {
timeoutMs?: number;
pollIntervalMs?: number;
stableMs?: number;
signal?: AbortSignal;
onPoll?: () => void;
}
interface ArtifactWaitOptions extends FakePhaseWaitOptions {
@@ -63,6 +65,7 @@ export type RunSingleFakePhaseInput = RunSingleFakePhaseBaseInput &
({ sessions: SessionRuntime; adapter?: never } | { adapter: SessionAdapter; sessions?: never });
type CanonicalRunSingleFakePhaseInput = RunSingleFakePhaseBaseInput & {
reserveSessionId?: () => string;
sessions: SessionRuntime;
};
@@ -81,11 +84,17 @@ const sendPromptRetryBudget = 2;
const terminalRunStates = ["completed", "failed", "aborted"] as const;
const phaseMutationRunStates = ["executing", "planning"] as const;
/**
 * Optional capability of a session adapter: supplying the id a session
 * should be started with. Adapters are probed for it at runtime via
 * `isSessionIdReservable`.
 */
interface SessionIdReservable {
/** Returns the session id to use for the next session start. */
reserveSessionId(): string;
}
interface PhaseEntry {
attempt: number;
continueArtifactWait: boolean;
continueValidation: boolean;
artifactBaselineSignature?: string | undefined;
promptId?: string;
recordPromptEventOnReplay?: boolean;
repairAttemptUsed: boolean;
replayedOutcome?: ArtifactOutcome;
resumedPrompt: boolean;
@@ -106,8 +115,19 @@ function canonicalizeRunSingleFakePhaseInput(
"sessions" in input && input.sessions !== undefined
? input.sessions
: new SessionManager({ db: input.db, adapter: input.adapter });
const adapter = "adapter" in input ? input.adapter : undefined;
const reserveSessionId =
adapter !== undefined && isSessionIdReservable(adapter)
? () => adapter.reserveSessionId()
: undefined;
return { ...input, expectedArtifactPath, sessions, worktreeRoot };
return {
...input,
expectedArtifactPath,
...(reserveSessionId === undefined ? {} : { reserveSessionId }),
sessions,
worktreeRoot,
};
}
function canonicalizePathAgainstWorktree(
@@ -140,6 +160,15 @@ function canonicalizePossiblyMissingPath(path: string): string {
return resolve(realpathSync(current), ...missingSegments);
}
/**
 * Type guard reporting whether an adapter exposes a callable
 * `reserveSessionId` member.
 *
 * @param adapter Adapter to probe.
 * @returns `true` when `reserveSessionId` is present on the adapter (own or
 *   inherited) and is a function.
 */
function isSessionIdReservable(
  adapter: SessionAdapter,
): adapter is SessionAdapter & SessionIdReservable {
  // Presence is checked before the member is read, as in a plain `&&` chain.
  if (!("reserveSessionId" in adapter)) {
    return false;
  }
  const candidate = (adapter as Partial<SessionIdReservable>).reserveSessionId;
  return typeof candidate === "function";
}
export async function runSingleFakePhase(
rawInput: RunSingleFakePhaseInput,
): Promise<RunSingleFakePhaseResult> {
@@ -184,10 +213,14 @@ export async function runSingleFakePhase(
} else if (phaseEntry.continueArtifactWait) {
promptId = requirePhaseEntryPromptId(input, phaseEntry, "Artifact wait replay");
promptDedupKeyForIdle = promptId;
promptSend = { promptId, artifactBaselineSignature: undefined };
if (phaseEntry.recordPromptEventOnReplay === true) {
await recordPromptEventIfMissing(input, eventRepository, promptEventType, envelope);
}
promptSend = { promptId, artifactBaselineSignature: phaseEntry.artifactBaselineSignature };
} else if (phaseEntry.continueValidation) {
promptId = requirePhaseEntryPromptId(input, phaseEntry, "Artifact validation replay");
promptDedupKeyForIdle = promptId;
await recordPromptEventIfMissing(input, eventRepository, promptEventType, envelope);
} else {
try {
promptSend = await sendPromptAndRecord(
@@ -250,6 +283,10 @@ export async function runSingleFakePhase(
await captureTranscript(input, handle);
throw error;
}
if (isActivityCancelled(error)) {
await captureTranscript(input, handle);
throw error;
}
if (!isDevflowErrorWithCode(error, "artifact_timeout_exhausted")) {
await failRunAndDisposeSession(
input,
@@ -415,6 +452,10 @@ export async function runSingleFakePhase(
await captureTranscript(input, handle);
throw repairError;
}
if (isActivityCancelled(repairError)) {
await captureTranscript(input, handle);
throw repairError;
}
if (!isDevflowErrorWithCode(repairError, "artifact_timeout_exhausted")) {
await failRunAndDisposeSession(
input,
@@ -565,6 +606,10 @@ export async function runSingleFakePhase(
await captureTranscript(input, handle);
throw error;
}
if (isActivityCancelled(error)) {
await captureTranscript(input, handle);
throw error;
}
if (!isDevflowErrorWithCode(error, "artifact_timeout_exhausted")) {
await failRunAndDisposeSession(
input,
@@ -711,13 +756,31 @@ async function enterInitialPhase(
};
}
if (["CREATED", "BOOTSTRAPPING", "READY"].includes(session.state)) {
const promptEventAlreadyRecorded = await promptEventExists(
input,
phaseStart.repairAttemptUsed ? "prompt.repaired" : "prompt.sent",
envelope.dedupKey,
);
if (
promptEventAlreadyRecorded &&
(await artifactSignature(input.expectedArtifactPath)) !== undefined
) {
return {
attempt: phase.attempts,
continueArtifactWait: false,
continueValidation: true,
promptId: envelope.dedupKey,
repairAttemptUsed: phaseStart.repairAttemptUsed,
resumedPrompt: true,
handle: { sessionId: session.id },
};
}
return {
attempt: phase.attempts,
continueArtifactWait: false,
continueValidation: false,
repairAttemptUsed: phaseStart.repairAttemptUsed,
resumedPrompt: false,
handle: { sessionId: session.id },
};
}
if (
@@ -726,10 +789,29 @@ async function enterInitialPhase(
session.expectedArtifactPath === input.expectedArtifactPath &&
session.expectedSchema === input.expectedSchema
) {
if (
!(await promptEventExists(
input,
phaseStart.repairAttemptUsed ? "prompt.repaired" : "prompt.sent",
envelope.dedupKey,
))
) {
return {
attempt: phase.attempts,
continueArtifactWait: true,
continueValidation: false,
artifactBaselineSignature: await artifactSignature(input.expectedArtifactPath),
promptId: envelope.dedupKey,
repairAttemptUsed: phaseStart.repairAttemptUsed,
resumedPrompt: true,
handle: { sessionId: session.id },
};
}
return {
attempt: phase.attempts,
continueArtifactWait: false,
continueArtifactWait: true,
continueValidation: false,
promptId: session.lastPromptHash,
repairAttemptUsed: phaseStart.repairAttemptUsed,
resumedPrompt: true,
handle: { sessionId: session.id },
@@ -764,11 +846,21 @@ async function enterInitialPhase(
session.expectedArtifactPath === input.expectedArtifactPath &&
session.expectedSchema === input.expectedSchema
) {
const currentPromptEventExists = await promptEventExists(
input,
phaseStart.repairAttemptUsed ? "prompt.repaired" : "prompt.sent",
envelope.dedupKey,
);
const artifactWaitEventExists = await artifactExpectedEventExists(input, phase.attempts);
return {
attempt: phase.attempts,
continueArtifactWait: true,
continueValidation: false,
...(currentPromptEventExists || !artifactWaitEventExists
? {}
: { artifactBaselineSignature: await artifactSignature(input.expectedArtifactPath) }),
promptId: session.lastPromptHash,
recordPromptEventOnReplay: !currentPromptEventExists && !artifactWaitEventExists,
repairAttemptUsed: phaseStart.repairAttemptUsed,
resumedPrompt: true,
handle: { sessionId: session.id },
@@ -1166,6 +1258,19 @@ async function failPhaseAndRequestGate(
}
if (sessionId !== undefined) {
await tx
.insert(tuiSessions)
.values({
id: sessionId,
runId: input.runId,
roleId: input.roleId,
backend: "fake",
cwd: input.worktreeRoot,
expectedArtifactPath: input.expectedArtifactPath,
expectedSchema: input.expectedSchema,
state: "FAILED_NEEDS_HUMAN",
})
.onConflictDoNothing({ target: tuiSessions.id });
await tx
.update(tuiSessions)
.set({ state: "FAILED_NEEDS_HUMAN" })
@@ -1437,15 +1542,22 @@ async function startSessionAndRecord(
eventRepository: RunEventRepository,
attempt: number,
): Promise<SessionHandle> {
const existingHandle = await resumeExistingSessionAndRecord(input, eventRepository, attempt);
if (existingHandle !== undefined) {
return existingHandle;
const existingSession = await sessionForRole(input);
if (
existingSession !== undefined &&
!["CREATED", "BOOTSTRAPPING"].includes(existingSession.state)
) {
const existingHandle = await resumeExistingSessionAndRecord(input, eventRepository, attempt);
if (existingHandle !== undefined) {
return existingHandle;
}
}
const sessionId = existingSession?.id ?? input.reserveSessionId?.() ?? randomUUID();
let handle: SessionHandle | undefined;
let sessionRowPersisted = false;
try {
handle = await input.sessions.start({
sessionId,
runId: input.runId,
roleId: input.roleId,
backend: "fake",
@@ -1454,10 +1566,18 @@ async function startSessionAndRecord(
expectedSchema: input.expectedSchema,
});
const startedHandle = handle;
let sessionInsertConflicted = false;
if (startedHandle.sessionId !== sessionId) {
throw new DevflowError("Session adapter did not honor reserved session id", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
recoveryHint: `expected=${sessionId};actual=${startedHandle.sessionId}`,
});
}
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
const insertedSession = await tx
await tx
.insert(tuiSessions)
.values({
id: startedHandle.sessionId,
@@ -1467,14 +1587,9 @@ async function startSessionAndRecord(
cwd: input.worktreeRoot,
expectedArtifactPath: input.expectedArtifactPath,
expectedSchema: input.expectedSchema,
state: "CREATED",
state: "BOOTSTRAPPING",
})
.onConflictDoNothing({ target: [tuiSessions.runId, tuiSessions.roleId] })
.returning({ id: tuiSessions.id });
if (insertedSession[0] === undefined) {
sessionInsertConflicted = true;
return;
}
.onConflictDoNothing({ target: tuiSessions.id });
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
@@ -1498,21 +1613,6 @@ async function startSessionAndRecord(
idempotencyKey: `session.ready:${startedHandle.sessionId}:0`,
});
});
if (sessionInsertConflicted) {
await input.sessions.dispose(startedHandle).catch(() => undefined);
handle = undefined;
const existingHandle = await resumeExistingSessionAndRecord(input, eventRepository, attempt);
if (existingHandle !== undefined) {
return existingHandle;
}
throw new DevflowError("Concurrent fake session insert conflicted without an existing row", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
sessionRowPersisted = true;
return startedHandle;
} catch (error) {
if (handle !== undefined) {
@@ -1531,19 +1631,35 @@ async function startSessionAndRecord(
"session_start_failed",
gateError.code,
{ errorCode: error.code, recoveryHint: gateError.recoveryHint },
sessionRowPersisted ? handle?.sessionId : undefined,
sessionId,
);
throw gateError;
}
await failPhaseAndRun(input, eventRepository, attempt, "session_start_failed");
if (sessionRowPersisted && handle !== undefined) {
await markSessionFailedNeedsHuman(input, eventRepository, handle.sessionId);
await markSessionFailedNeedsHuman(input, eventRepository, sessionId);
if (handle !== undefined) {
await input.sessions.dispose(handle).catch(() => undefined);
}
throw error;
}
}
/**
 * Looks up the `tuiSessions` row that belongs to this run/role pair.
 *
 * @param input - Supplies the database handle plus the `runId` and `roleId` to match.
 * @returns The `id` and `state` of the first matching row, or `undefined`
 *   when the query returns no rows.
 */
async function sessionForRole(
  input: CanonicalRunSingleFakePhaseInput,
): Promise<{ id: string; state: string } | undefined> {
  const sameRunAndRole = and(
    eq(tuiSessions.runId, input.runId),
    eq(tuiSessions.roleId, input.roleId),
  );
  // At most one row is requested; an empty result yields `undefined`.
  const rows = await input.db
    .select({ id: tuiSessions.id, state: tuiSessions.state })
    .from(tuiSessions)
    .where(sameRunAndRole)
    .limit(1);
  return rows[0];
}
async function resumeExistingSessionAndRecord(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
@@ -1709,6 +1825,14 @@ async function sendPromptAndRecord(
type: "prompt.sent" | "prompt.repaired",
options: SendPromptAndRecordOptions = {},
): Promise<PromptSendRecord> {
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
});
const artifactBaselineSignature =
options.captureArtifactBaseline === false
? undefined
: await artifactSignature(input.expectedArtifactPath);
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
await tx
@@ -1730,11 +1854,6 @@ async function sendPromptAndRecord(
idempotencyKey: `session.busy:${handle.sessionId}:${envelope.dedupKey}`,
});
});
const artifactBaselineSignature =
options.captureArtifactBaseline === false
? undefined
: await artifactSignature(input.expectedArtifactPath);
const prompt = await sendPromptWithRetry(input.sessions, handle, envelope);
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
@@ -1750,6 +1869,66 @@ async function sendPromptAndRecord(
return { promptId: prompt.promptId, artifactBaselineSignature };
}
/**
 * Appends a prompt event (`prompt.sent` or `prompt.repaired`) for the given
 * envelope inside a single transaction.
 *
 * The transaction first re-checks that the run may still mutate this phase,
 * then appends the event keyed by `<type>:<dedupKey>`.
 * NOTE(review): the "if missing" behaviour presumably comes from the event
 * repository de-duplicating on `idempotencyKey` — confirm against
 * `appendInTransaction`.
 *
 * @param input - Run/phase/role identifiers and the database handle.
 * @param eventRepository - Repository used to append the event.
 * @param type - Which prompt event to record.
 * @param envelope - Prompt envelope whose `dedupKey` identifies the prompt.
 */
async function recordPromptEventIfMissing(
  input: CanonicalRunSingleFakePhaseInput,
  eventRepository: RunEventRepository,
  type: "prompt.sent" | "prompt.repaired",
  envelope: PromptEnvelope,
): Promise<void> {
  await input.db.transaction(async (tx) => {
    // Guard first so nothing is appended for a run that can no longer mutate the phase.
    await assertRunCanMutatePhaseInTransaction(input, tx);
    const { dedupKey } = envelope;
    const { runId, phaseId, roleId } = input;
    await eventRepository.appendInTransaction(tx, {
      runId,
      phaseId,
      type,
      payload: { roleId, dedupKey },
      idempotencyKey: `${type}:${dedupKey}`,
    });
  });
}
/**
 * Reports whether a prompt event of the given type has already been stored
 * for this run and phase under the idempotency key `<type>:<dedupKey>`.
 *
 * @param input - Run/phase identifiers and the database handle.
 * @param type - Prompt event type to look for.
 * @param dedupKey - Prompt dedup key that forms the tail of the idempotency key.
 * @returns `true` when at least one matching `runEvents` row exists.
 */
async function promptEventExists(
  input: CanonicalRunSingleFakePhaseInput,
  type: "prompt.sent" | "prompt.repaired",
  dedupKey: string,
): Promise<boolean> {
  const idempotencyKey = `${type}:${dedupKey}`;
  const matchesPromptEvent = and(
    eq(runEvents.runId, input.runId),
    eq(runEvents.phaseId, input.phaseId),
    eq(runEvents.type, type),
    eq(runEvents.idempotencyKey, idempotencyKey),
  );
  // Only existence matters, so a single id column and one row are enough.
  const rows = await input.db
    .select({ id: runEvents.id })
    .from(runEvents)
    .where(matchesPromptEvent)
    .limit(1);
  return rows[0] !== undefined;
}
/**
 * Reports whether an `artifact.expected` event has already been stored for
 * this run, phase, attempt and expected artifact path.
 *
 * The lookup key is
 * `artifact.expected:<phaseId>:<attempt>:<expectedArtifactPath>`.
 *
 * @param input - Run/phase identifiers, expected artifact path and the database handle.
 * @param attempt - Phase attempt number embedded in the idempotency key.
 * @returns `true` when at least one matching `runEvents` row exists.
 */
async function artifactExpectedEventExists(
  input: CanonicalRunSingleFakePhaseInput,
  attempt: number,
): Promise<boolean> {
  const idempotencyKey = `artifact.expected:${input.phaseId}:${attempt}:${input.expectedArtifactPath}`;
  const matchesArtifactEvent = and(
    eq(runEvents.runId, input.runId),
    eq(runEvents.phaseId, input.phaseId),
    eq(runEvents.type, "artifact.expected"),
    eq(runEvents.idempotencyKey, idempotencyKey),
  );
  // Only existence matters, so a single id column and one row are enough.
  const rows = await input.db
    .select({ id: runEvents.id })
    .from(runEvents)
    .where(matchesArtifactEvent)
    .limit(1);
  return rows[0] !== undefined;
}
async function sendPromptWithRetry(
sessions: SessionRuntime,
handle: { sessionId: string },
@@ -2163,6 +2342,19 @@ async function markSessionFailedNeedsHuman(
eventRepository: RunEventRepository,
sessionId: string,
) {
await input.db
.insert(tuiSessions)
.values({
id: sessionId,
runId: input.runId,
roleId: input.roleId,
backend: "fake",
cwd: input.worktreeRoot,
expectedArtifactPath: input.expectedArtifactPath,
expectedSchema: input.expectedSchema,
state: "FAILED_NEEDS_HUMAN",
})
.onConflictDoNothing({ target: tuiSessions.id });
await input.db
.update(tuiSessions)
.set({ state: "FAILED_NEEDS_HUMAN" })
@@ -2223,12 +2415,14 @@ async function waitForArtifact(path: string, options: ArtifactWaitOptions = {}):
let stableSince: number | undefined;
while (Date.now() <= deadline) {
throwIfAborted(options.signal);
options.onPoll?.();
try {
const signature = await artifactSignature(path);
if (signature === undefined || signature === ignoreInitialSignature) {
lastSignature = undefined;
stableSince = undefined;
await sleep(pollIntervalMs);
await sleep(pollIntervalMs, options.signal);
continue;
}
if (lastSignature === signature) {
@@ -2259,7 +2453,7 @@ async function waitForArtifact(path: string, options: ArtifactWaitOptions = {}):
});
}
}
await sleep(pollIntervalMs);
await sleep(pollIntervalMs, options.signal);
}
throw new DevflowError("Timed out waiting for fake phase artifact", {
@@ -2427,6 +2621,10 @@ function isDevflowErrorWithCode(error: unknown, code: string): error is DevflowE
return error instanceof DevflowError && error.code === code;
}
/**
 * Type guard: `true` when `error` is a `DevflowError` whose code is
 * `"activity_cancelled"`.
 */
function isActivityCancelled(error: unknown): error is DevflowError {
  const cancelledCode = "activity_cancelled";
  return isDevflowErrorWithCode(error, cancelledCode);
}
/**
 * Type guard: `true` when `error` is a `DevflowError` whose code is
 * `"run_state_changed"`.
 */
function isRunStateChanged(error: unknown): error is DevflowError {
  const stateChangedCode = "run_state_changed";
  return isDevflowErrorWithCode(error, stateChangedCode);
}
@@ -2523,8 +2721,37 @@ async function captureTranscript(
});
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
/**
 * Waits for `ms` milliseconds, optionally cancellable through an `AbortSignal`.
 *
 * Without a signal this is a plain timer. With a signal:
 * - if it is already aborted, the cancellation error is thrown synchronously
 *   (before any promise is returned);
 * - if it aborts while waiting, the timer is cleared and the promise rejects
 *   with the `activity_cancelled` error built from `signal.reason`;
 * - if the timer fires first, the abort listener is removed and the promise
 *   resolves.
 *
 * @param ms - Delay in milliseconds.
 * @param signal - Optional abort signal that cancels the wait.
 */
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
  if (signal !== undefined) {
    throwIfAborted(signal);
    return new Promise<void>((done, fail) => {
      const cancel = (): void => {
        // Stop the pending timer so it cannot resolve after the rejection.
        clearTimeout(timer);
        fail(activityCancelledError(signal.reason));
      };
      const timer = setTimeout(() => {
        // Detach the listener so a later abort does not touch a settled promise.
        signal.removeEventListener("abort", cancel);
        done();
      }, ms);
      signal.addEventListener("abort", cancel, { once: true });
    });
  }
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}
/**
 * Throws the `activity_cancelled` error when `signal` is present and already
 * aborted; otherwise does nothing.
 *
 * @param signal - Optional abort signal to inspect.
 * @throws DevflowError built from `signal.reason` when the signal is aborted.
 */
function throwIfAborted(signal?: AbortSignal): void {
  if (!signal?.aborted) {
    return;
  }
  throw activityCancelledError(signal.reason);
}
/**
 * Builds the error used to signal a cancelled activity: a `DevflowError` with
 * class `"recoverable"` and code `"activity_cancelled"`.
 *
 * @param cause - Value attached as the error's `cause` (callers in this file
 *   pass the abort signal's `reason`).
 * @returns The constructed error; it is not thrown here.
 */
function activityCancelledError(cause: unknown): DevflowError {
  const message = "Activity was cancelled before artifact wait completed";
  return new DevflowError(message, {
    class: "recoverable",
    code: "activity_cancelled",
    cause,
  });
}
function isNodeError(error: unknown): error is NodeJS.ErrnoException {