Files
dev-puppeteer/packages/run-engine/src/fake-phase-harness.ts
2026-05-11 00:46:45 +09:00

2533 lines
76 KiB
TypeScript

import { createHash, randomUUID } from "node:crypto";
import { existsSync, realpathSync } from "node:fs";
import { readFile, stat, unlink } from "node:fs/promises";
import { basename, dirname, isAbsolute, relative, resolve } from "node:path";
import {
DevflowError,
type PromptEnvelope,
canonicalize,
hash,
validateArtifact,
} from "@devflow/core";
import {
type DbClient,
RunEventRepository,
TuiTranscriptRepository,
approvalRequests,
artifacts,
runEvents,
runPhases,
runs,
tuiSessions,
} from "@devflow/db";
import {
type SessionAdapter,
type SessionHandle,
SessionManager,
type SessionRuntime,
type TranscriptChunkSink,
captureAndPersistTranscript,
} from "@devflow/session";
import { and, desc, eq, inArray, sql } from "drizzle-orm";
export interface FakePhaseWaitOptions {
timeoutMs?: number;
pollIntervalMs?: number;
stableMs?: number;
}
interface ArtifactWaitOptions extends FakePhaseWaitOptions {
ignoreInitialSignature?: string;
}
interface RunSingleFakePhaseBaseInput {
db: DbClient["db"];
runId: string;
phaseId: string;
phaseKey: string;
roleId: string;
worktreeRoot: string;
expectedArtifactPath: string;
expectedSchema: string;
instructions: string;
wait?: FakePhaseWaitOptions;
uuidFactory?: () => string;
transcriptSink?: TranscriptChunkSink;
terminalRun?: boolean;
workflowApprovalGateKey?: string;
workflowApprovalPayload?: Record<string, unknown>;
}
export type RunSingleFakePhaseInput = RunSingleFakePhaseBaseInput &
({ sessions: SessionRuntime; adapter?: never } | { adapter: SessionAdapter; sessions?: never });
type CanonicalRunSingleFakePhaseInput = RunSingleFakePhaseBaseInput & {
sessions: SessionRuntime;
};
export interface RunSingleFakePhaseResult {
sessionId: string;
promptId: string;
artifactId: string;
artifactHash: string;
artifactValid: boolean;
transcriptCaptured: number;
}
type TransactionDb = Parameters<Parameters<RunSingleFakePhaseInput["db"]["transaction"]>[0]>[0];
const sendPromptRetryBudget = 2;
const terminalRunStates = ["completed", "failed", "aborted"] as const;
const phaseMutationRunStates = ["executing", "planning"] as const;
interface PhaseEntry {
attempt: number;
continueArtifactWait: boolean;
continueValidation: boolean;
promptId?: string;
repairAttemptUsed: boolean;
replayedOutcome?: ArtifactOutcome;
resumedPrompt: boolean;
handle?: SessionHandle;
}
function canonicalizeRunSingleFakePhaseInput(
input: RunSingleFakePhaseInput,
): CanonicalRunSingleFakePhaseInput {
const rawWorktreeRoot = resolve(input.worktreeRoot);
const worktreeRoot = realpathSync(rawWorktreeRoot);
const expectedArtifactPath = canonicalizePathAgainstWorktree(
input.expectedArtifactPath,
rawWorktreeRoot,
worktreeRoot,
);
const sessions =
"sessions" in input && input.sessions !== undefined
? input.sessions
: new SessionManager({ db: input.db, adapter: input.adapter });
return { ...input, expectedArtifactPath, sessions, worktreeRoot };
}
function canonicalizePathAgainstWorktree(
path: string,
rawWorktreeRoot: string,
canonicalWorktreeRoot: string,
): string {
const absolutePath = resolve(path);
const relativeToWorktree = relative(rawWorktreeRoot, absolutePath);
if (!relativeToWorktree.startsWith("..") && !isAbsolute(relativeToWorktree)) {
return canonicalizePossiblyMissingPath(resolve(canonicalWorktreeRoot, relativeToWorktree));
}
return canonicalizePossiblyMissingPath(absolutePath);
}
function canonicalizePossiblyMissingPath(path: string): string {
const missingSegments: string[] = [];
let current = resolve(path);
while (!existsSync(current)) {
const parent = dirname(current);
if (parent === current) {
return resolve(path);
}
missingSegments.unshift(basename(current));
current = parent;
}
return resolve(realpathSync(current), ...missingSegments);
}
export async function runSingleFakePhase(
rawInput: RunSingleFakePhaseInput,
): Promise<RunSingleFakePhaseResult> {
const input = canonicalizeRunSingleFakePhaseInput(rawInput);
const eventRepository = new RunEventRepository(input.db);
const phaseEntry = await enterInitialPhase(input, eventRepository);
const attempt = phaseEntry.attempt;
let handle: SessionHandle;
if (phaseEntry.handle !== undefined) {
handle = phaseEntry.handle;
} else {
try {
await removeStaleArtifact(input);
} catch (error) {
await failPhaseAndRun(input, eventRepository, attempt, "stale_artifact_remove_failed");
throw error;
}
handle = await startSessionAndRecord(input, eventRepository, attempt);
}
const activeInstructions = phaseEntry.repairAttemptUsed
? repairInstructionsFor(input.instructions)
: input.instructions;
const envelope = buildEnvelope(input, attempt, activeInstructions);
const promptEventType = phaseEntry.repairAttemptUsed ? "prompt.repaired" : "prompt.sent";
let repairAttemptUsed = phaseEntry.repairAttemptUsed;
let promptSend: PromptSendRecord | undefined;
let promptId: string;
let outcome: ArtifactOutcome | undefined = phaseEntry.replayedOutcome;
let promptDedupKeyForIdle = envelope.dedupKey;
let initialPromptIdleRecorded = false;
if (phaseEntry.replayedOutcome !== undefined) {
promptId = requirePhaseEntryPromptId(input, phaseEntry, "Replayed artifact entry");
promptDedupKeyForIdle = promptId;
const replayedWorkflowGate =
phaseEntry.replayedOutcome.validation.ok && input.workflowApprovalGateKey !== undefined;
if (!replayedWorkflowGate) {
await markSessionIdle(input, eventRepository, handle.sessionId, promptId);
initialPromptIdleRecorded = true;
}
} else if (phaseEntry.continueArtifactWait) {
promptId = requirePhaseEntryPromptId(input, phaseEntry, "Artifact wait replay");
promptDedupKeyForIdle = promptId;
promptSend = { promptId, artifactBaselineSignature: undefined };
} else if (phaseEntry.continueValidation) {
promptId = requirePhaseEntryPromptId(input, phaseEntry, "Artifact validation replay");
promptDedupKeyForIdle = promptId;
} else {
try {
promptSend = await sendPromptAndRecord(
input,
eventRepository,
handle,
envelope,
promptEventType,
{ captureArtifactBaseline: !phaseEntry.resumedPrompt },
);
promptId = promptSend.promptId;
} catch (error) {
if (isRunStateChanged(error)) {
await captureTranscript(input, handle);
throw error;
}
if (shouldCreateHumanGate(error)) {
const gateError = toHumanRequiredRecoveryError(error);
await failPhaseAndRequestGate(
input,
eventRepository,
attempt,
"prompt_send_failed",
gateError.code,
{ errorCode: error.code, recoveryHint: gateError.recoveryHint },
handle.sessionId,
{ markSessionCrashed: true },
);
await captureTranscript(input, handle);
throw gateError;
}
await failRunAndDisposeSession(input, eventRepository, attempt, "prompt_send_failed", handle);
await captureTranscript(input, handle);
throw error;
}
}
if (outcome === undefined && promptSend === undefined && !phaseEntry.continueValidation) {
throw new DevflowError("Prompt send state missing before artifact wait", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
if (outcome === undefined) {
try {
outcome = phaseEntry.continueValidation
? await validateCurrentArtifact(input, eventRepository, attempt)
: await waitForAndValidateArtifact(
input,
eventRepository,
attempt,
handle.sessionId,
promptSend?.artifactBaselineSignature,
);
} catch (error) {
if (isRunStateChanged(error)) {
await captureTranscript(input, handle);
throw error;
}
if (!isDevflowErrorWithCode(error, "artifact_timeout_exhausted")) {
await failRunAndDisposeSession(
input,
eventRepository,
attempt,
"artifact_validation_failed",
handle,
);
await captureTranscript(input, handle);
throw error;
}
let recovered: boolean;
try {
recovered = await recoverFromArtifactTimeout(input, eventRepository, handle.sessionId);
} catch (recoveryError) {
if (isRunStateChanged(recoveryError)) {
await captureTranscript(input, handle);
throw recoveryError;
}
if (shouldCreateHumanGate(recoveryError)) {
const gateError = toArtifactTimeoutRecoveryGateError(recoveryError);
await failPhaseAndRequestGate(
input,
eventRepository,
attempt,
"artifact_timeout_recovery_failed",
gateError.code,
{
errorCode: recoveryError.code,
expectedArtifactPath: input.expectedArtifactPath,
recoveryHint: gateError.recoveryHint,
},
handle.sessionId,
);
await captureTranscript(input, handle);
throw gateError;
}
await failRunAndDisposeSession(
input,
eventRepository,
attempt,
"artifact_timeout_recovery_failed",
handle,
);
await captureTranscript(input, handle);
throw recoveryError;
}
if (!recovered) {
await failPhaseAndRequestGate(
input,
eventRepository,
attempt,
"artifact_timeout",
"artifact_timeout_exhausted",
{
expectedArtifactPath: input.expectedArtifactPath,
},
handle.sessionId,
{ markSessionCrashed: true },
);
await captureTranscript(input, handle);
throw error;
}
if (repairAttemptUsed) {
await failPhaseAndRequestGate(
input,
eventRepository,
attempt,
"artifact_timeout",
"artifact_timeout_exhausted",
{
expectedArtifactPath: input.expectedArtifactPath,
},
handle.sessionId,
);
await captureTranscript(input, handle);
throw error;
}
const timeoutRepairAttempt = await startPhaseAndRecord(
input,
eventRepository,
["awaiting_artifact"],
{
reason: "artifact_timeout",
repair: true,
},
);
repairAttemptUsed = true;
try {
await removeStaleArtifact(input);
} catch (error) {
await failRunAndDisposeSession(
input,
eventRepository,
timeoutRepairAttempt,
"stale_artifact_remove_failed",
handle,
);
await captureTranscript(input, handle);
throw error;
}
const timeoutRepairEnvelope = buildEnvelope(
input,
timeoutRepairAttempt,
repairInstructionsFor(input.instructions),
);
try {
promptSend = await sendPromptAndRecord(
input,
eventRepository,
handle,
timeoutRepairEnvelope,
"prompt.repaired",
);
promptId = promptSend.promptId;
} catch (repairError) {
if (isRunStateChanged(repairError)) {
await captureTranscript(input, handle);
throw repairError;
}
if (!shouldCreateHumanGate(repairError)) {
await failRunAndDisposeSession(
input,
eventRepository,
timeoutRepairAttempt,
"prompt_send_failed",
handle,
);
await captureTranscript(input, handle);
throw repairError;
}
const gateError = toHumanRequiredRecoveryError(repairError);
await failPhaseAndRequestGate(
input,
eventRepository,
timeoutRepairAttempt,
"prompt_send_failed",
gateError.code,
{
errorCode: repairError.code,
expectedArtifactPath: input.expectedArtifactPath,
recoveryHint: gateError.recoveryHint,
},
handle.sessionId,
{ markSessionCrashed: true },
);
await captureTranscript(input, handle);
throw gateError;
}
try {
outcome = await waitForAndValidateArtifact(
input,
eventRepository,
timeoutRepairAttempt,
handle.sessionId,
promptSend.artifactBaselineSignature,
);
} catch (repairError) {
if (isRunStateChanged(repairError)) {
await captureTranscript(input, handle);
throw repairError;
}
if (!isDevflowErrorWithCode(repairError, "artifact_timeout_exhausted")) {
await failRunAndDisposeSession(
input,
eventRepository,
timeoutRepairAttempt,
"artifact_repair_failed",
handle,
);
await captureTranscript(input, handle);
throw repairError;
}
await failPhaseAndRequestGate(
input,
eventRepository,
timeoutRepairAttempt,
"artifact_timeout",
"artifact_timeout_exhausted",
{
expectedArtifactPath: input.expectedArtifactPath,
},
handle.sessionId,
);
await captureTranscript(input, handle);
throw repairError;
}
if (!(outcome.validation.ok && input.workflowApprovalGateKey !== undefined)) {
await markSessionIdle(
input,
eventRepository,
handle.sessionId,
timeoutRepairEnvelope.dedupKey,
);
}
}
}
if (outcome === undefined) {
throw new DevflowError("Artifact outcome missing after fake phase wait", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
const successfulWorkflowGate =
outcome.validation.ok && input.workflowApprovalGateKey !== undefined;
if (outcome.attempt === attempt && !initialPromptIdleRecorded && !successfulWorkflowGate) {
await markSessionIdle(input, eventRepository, handle.sessionId, promptDedupKeyForIdle);
}
if (!outcome.validation.ok) {
if (repairAttemptUsed) {
await failPhaseAndRequestGate(
input,
eventRepository,
outcome.attempt,
"artifact_invalid",
"artifact_invalid_after_repair",
{
artifactId: outcome.artifact.id,
expectedArtifactPath: input.expectedArtifactPath,
},
handle.sessionId,
);
await captureTranscript(input, handle);
throw new DevflowError("Artifact remained invalid after repair", {
class: "human_required",
code: "artifact_invalid_after_repair",
runId: input.runId,
phaseId: input.phaseId,
});
}
const repairAttempt = await startPhaseAndRecord(input, eventRepository, ["validating"], {
repair: true,
});
repairAttemptUsed = true;
try {
await removeStaleArtifact(input);
} catch (error) {
await failRunAndDisposeSession(
input,
eventRepository,
repairAttempt,
"stale_artifact_remove_failed",
handle,
);
await captureTranscript(input, handle);
throw error;
}
const repairEnvelope = buildEnvelope(
input,
repairAttempt,
repairInstructionsFor(input.instructions),
);
try {
promptSend = await sendPromptAndRecord(
input,
eventRepository,
handle,
repairEnvelope,
"prompt.repaired",
);
promptId = promptSend.promptId;
} catch (error) {
if (isRunStateChanged(error)) {
await captureTranscript(input, handle);
throw error;
}
if (!shouldCreateHumanGate(error)) {
await failRunAndDisposeSession(
input,
eventRepository,
repairAttempt,
"prompt_send_failed",
handle,
);
await captureTranscript(input, handle);
throw error;
}
const gateError = toHumanRequiredRecoveryError(error);
await failPhaseAndRequestGate(
input,
eventRepository,
repairAttempt,
"prompt_send_failed",
gateError.code,
{
errorCode: error.code,
expectedArtifactPath: input.expectedArtifactPath,
recoveryHint: gateError.recoveryHint,
},
handle.sessionId,
{ markSessionCrashed: true },
);
await captureTranscript(input, handle);
throw gateError;
}
try {
outcome = await waitForAndValidateArtifact(
input,
eventRepository,
repairAttempt,
handle.sessionId,
promptSend.artifactBaselineSignature,
);
} catch (error) {
if (isRunStateChanged(error)) {
await captureTranscript(input, handle);
throw error;
}
if (!isDevflowErrorWithCode(error, "artifact_timeout_exhausted")) {
await failRunAndDisposeSession(
input,
eventRepository,
repairAttempt,
"artifact_repair_failed",
handle,
);
await captureTranscript(input, handle);
throw error;
}
await failPhaseAndRequestGate(
input,
eventRepository,
repairAttempt,
"artifact_timeout",
"artifact_timeout_exhausted",
{
expectedArtifactPath: input.expectedArtifactPath,
},
handle.sessionId,
);
await captureTranscript(input, handle);
throw error;
}
if (!(outcome.validation.ok && input.workflowApprovalGateKey !== undefined)) {
await markSessionIdle(input, eventRepository, handle.sessionId, repairEnvelope.dedupKey);
}
}
if (outcome.validation.ok) {
if (input.workflowApprovalGateKey !== undefined) {
await requestWorkflowApproval(
input,
eventRepository,
outcome.attempt,
handle.sessionId,
input.workflowApprovalGateKey,
input.workflowApprovalPayload ?? {
artifactId: outcome.artifact.id,
expectedArtifactPath: input.expectedArtifactPath,
schemaId: input.expectedSchema,
},
);
} else {
await completePhaseAndRun(
input,
eventRepository,
outcome.attempt,
handle.sessionId,
input.terminalRun ?? true,
);
}
} else {
await failPhaseAndRequestGate(
input,
eventRepository,
outcome.attempt,
"artifact_invalid",
"artifact_invalid_after_repair",
{
artifactId: outcome.artifact.id,
expectedArtifactPath: input.expectedArtifactPath,
},
handle.sessionId,
);
await captureTranscript(input, handle);
throw new DevflowError("Artifact remained invalid after repair", {
class: "human_required",
code: "artifact_invalid_after_repair",
runId: input.runId,
phaseId: input.phaseId,
});
}
const transcript = await captureTranscript(input, handle);
return {
sessionId: handle.sessionId,
promptId,
artifactId: outcome.artifact.id,
artifactHash: outcome.artifactHash,
artifactValid: outcome.validation.ok,
transcriptCaptured: transcript.captured,
};
}
async function enterInitialPhase(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
): Promise<PhaseEntry> {
const attempt = await tryStartPhaseAndRecord(input, eventRepository, ["pending"]);
if (attempt !== undefined) {
return {
attempt,
continueArtifactWait: false,
continueValidation: false,
repairAttemptUsed: false,
resumedPrompt: false,
};
}
const [phase] = await input.db
.select({ attempts: runPhases.attempts, state: runPhases.state })
.from(runPhases)
.where(and(eq(runPhases.id, input.phaseId), eq(runPhases.runId, input.runId)));
if (phase === undefined) {
throw new DevflowError("Run phase does not exist", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
if (phase.state === "running" && phase.attempts > 0) {
const phaseStart = await phaseStartReplayMetadata(input, phase.attempts);
if (phaseStart === undefined) {
throw cannotReplayPhase(input, phase.state);
}
const instructions = phaseStart.repairAttemptUsed
? repairInstructionsFor(input.instructions)
: input.instructions;
const envelope = buildEnvelope(input, phase.attempts, instructions);
const [session] = await input.db
.select({
expectedArtifactPath: tuiSessions.expectedArtifactPath,
expectedSchema: tuiSessions.expectedSchema,
id: tuiSessions.id,
lastPromptHash: tuiSessions.lastPromptHash,
lastPromptAt: tuiSessions.lastPromptAt,
roleId: tuiSessions.roleId,
state: tuiSessions.state,
})
.from(tuiSessions)
.where(and(eq(tuiSessions.runId, input.runId), eq(tuiSessions.roleId, input.roleId)));
if (session === undefined) {
return {
attempt: phase.attempts,
continueArtifactWait: false,
continueValidation: false,
repairAttemptUsed: phaseStart.repairAttemptUsed,
resumedPrompt: false,
};
}
if (["CREATED", "BOOTSTRAPPING", "READY"].includes(session.state)) {
return {
attempt: phase.attempts,
continueArtifactWait: false,
continueValidation: false,
repairAttemptUsed: phaseStart.repairAttemptUsed,
resumedPrompt: false,
handle: { sessionId: session.id },
};
}
if (
session.state === "BUSY" &&
session.lastPromptHash === envelope.dedupKey &&
session.expectedArtifactPath === input.expectedArtifactPath &&
session.expectedSchema === input.expectedSchema
) {
return {
attempt: phase.attempts,
continueArtifactWait: false,
continueValidation: false,
repairAttemptUsed: phaseStart.repairAttemptUsed,
resumedPrompt: true,
handle: { sessionId: session.id },
};
}
}
if (phase.state === "awaiting_artifact" && phase.attempts > 0) {
const phaseStart = await phaseStartReplayMetadata(input, phase.attempts);
if (phaseStart === undefined) {
throw cannotReplayPhase(input, phase.state);
}
const instructions = phaseStart.repairAttemptUsed
? repairInstructionsFor(input.instructions)
: input.instructions;
const envelope = buildEnvelope(input, phase.attempts, instructions);
const [session] = await input.db
.select({
expectedArtifactPath: tuiSessions.expectedArtifactPath,
expectedSchema: tuiSessions.expectedSchema,
id: tuiSessions.id,
lastPromptHash: tuiSessions.lastPromptHash,
lastPromptAt: tuiSessions.lastPromptAt,
state: tuiSessions.state,
})
.from(tuiSessions)
.where(and(eq(tuiSessions.runId, input.runId), eq(tuiSessions.roleId, input.roleId)));
if (
session !== undefined &&
session.state !== "FAILED_NEEDS_HUMAN" &&
session.lastPromptHash === envelope.dedupKey &&
session.expectedArtifactPath === input.expectedArtifactPath &&
session.expectedSchema === input.expectedSchema
) {
return {
attempt: phase.attempts,
continueArtifactWait: true,
continueValidation: false,
promptId: session.lastPromptHash,
repairAttemptUsed: phaseStart.repairAttemptUsed,
resumedPrompt: true,
handle: { sessionId: session.id },
};
}
}
if (phase.state === "validating" && phase.attempts > 0) {
const phaseStart = await phaseStartReplayMetadata(input, phase.attempts);
if (phaseStart === undefined) {
throw cannotReplayPhase(input, phase.state);
}
const [artifact] = await input.db
.select({
createdAt: artifacts.createdAt,
hash: artifacts.hash,
id: artifacts.id,
path: artifacts.path,
valid: artifacts.valid,
validationError: artifacts.validationError,
})
.from(artifacts)
.where(
and(
eq(artifacts.runId, input.runId),
eq(artifacts.phaseId, input.phaseId),
eq(artifacts.path, input.expectedArtifactPath),
eq(artifacts.schemaId, input.expectedSchema),
),
)
.orderBy(desc(artifacts.createdAt))
.limit(1);
const instructions = phaseStart.repairAttemptUsed
? repairInstructionsFor(input.instructions)
: input.instructions;
const envelope = buildEnvelope(input, phase.attempts, instructions);
const [session] = await input.db
.select({
expectedArtifactPath: tuiSessions.expectedArtifactPath,
expectedSchema: tuiSessions.expectedSchema,
id: tuiSessions.id,
lastPromptHash: tuiSessions.lastPromptHash,
lastPromptAt: tuiSessions.lastPromptAt,
roleId: tuiSessions.roleId,
})
.from(tuiSessions)
.where(and(eq(tuiSessions.runId, input.runId), eq(tuiSessions.roleId, input.roleId)));
if (
artifact !== undefined &&
session !== undefined &&
session.lastPromptHash !== null &&
session.lastPromptHash === envelope.dedupKey &&
artifact.createdAt >= (session.lastPromptAt ?? new Date(0))
) {
const validation = persistedArtifactValidation(input, artifact);
await eventRepository.append({
runId: input.runId,
phaseId: input.phaseId,
type: validation.ok ? "artifact.validated" : "artifact.invalid",
payload: validation.ok
? {
artifactId: artifact.id,
hash: artifact.hash,
path: artifact.path,
schemaId: input.expectedSchema,
}
: {
artifactId: artifact.id,
errors: validation.errors,
hash: artifact.hash,
path: artifact.path,
schemaId: input.expectedSchema,
},
idempotencyKey: `${validation.ok ? "artifact.validated" : "artifact.invalid"}:${input.phaseId}:${artifact.path}:${artifact.hash}`,
});
return {
attempt: phase.attempts,
continueArtifactWait: false,
continueValidation: false,
promptId: session.lastPromptHash,
repairAttemptUsed: phaseStart.repairAttemptUsed,
replayedOutcome: {
attempt: phase.attempts,
artifact: { id: artifact.id },
artifactHash: artifact.hash,
validation,
},
resumedPrompt: false,
handle: { sessionId: session.id },
};
}
if (
session !== undefined &&
session.lastPromptHash === envelope.dedupKey &&
session.expectedArtifactPath === input.expectedArtifactPath &&
session.expectedSchema === input.expectedSchema
) {
return {
attempt: phase.attempts,
continueArtifactWait: false,
continueValidation: true,
promptId: session.lastPromptHash,
repairAttemptUsed: phaseStart.repairAttemptUsed,
resumedPrompt: true,
handle: { sessionId: session.id },
};
}
}
throw cannotReplayPhase(input, phase.state);
}
function cannotReplayPhase(
input: CanonicalRunSingleFakePhaseInput,
phaseState: string,
): DevflowError {
return new DevflowError("Cannot start a fake phase from the current phase state", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
recoveryHint: `phase_state=${phaseState}`,
});
}
function requirePhaseEntryPromptId(
input: CanonicalRunSingleFakePhaseInput,
phaseEntry: PhaseEntry,
context: string,
): string {
if (phaseEntry.promptId !== undefined) {
return phaseEntry.promptId;
}
throw new DevflowError(`${context} did not include a prompt id`, {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
interface PhaseStartReplayMetadata {
repairAttemptUsed: boolean;
}
async function phaseStartReplayMetadata(
input: CanonicalRunSingleFakePhaseInput,
attempt: number,
): Promise<PhaseStartReplayMetadata | undefined> {
const [event] = await input.db
.select({ payload: runEvents.payload })
.from(runEvents)
.where(
and(
eq(runEvents.runId, input.runId),
eq(runEvents.phaseId, input.phaseId),
eq(runEvents.type, "phase.started"),
eq(runEvents.idempotencyKey, `phase.started:${input.phaseId}:${attempt}`),
),
)
.limit(1);
if (event === undefined) {
return undefined;
}
const payload = event.payload;
const repairAttemptUsed =
typeof payload === "object" &&
payload !== null &&
"repair" in payload &&
payload.repair === true;
return { repairAttemptUsed };
}
interface PersistedArtifactReplay {
valid: boolean;
validationError: unknown;
}
function persistedArtifactValidation(
input: CanonicalRunSingleFakePhaseInput,
artifact: PersistedArtifactReplay,
): ReturnType<typeof validateArtifact> {
if (artifact.valid) {
return { ok: true };
}
if (
typeof artifact.validationError === "object" &&
artifact.validationError !== null &&
"errors" in artifact.validationError &&
Array.isArray(artifact.validationError.errors)
) {
return { ok: false, errors: artifact.validationError.errors };
}
throw new DevflowError("Invalid artifact replay is missing validation errors", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
async function startPhaseAndRecord(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
allowedCurrentStates: readonly string[],
payload: Record<string, unknown> = {},
): Promise<number> {
const attempt = await tryStartPhaseAndRecord(
input,
eventRepository,
allowedCurrentStates,
payload,
);
if (attempt !== undefined) {
return attempt;
}
const [phase] = await input.db
.select({ state: runPhases.state })
.from(runPhases)
.where(and(eq(runPhases.id, input.phaseId), eq(runPhases.runId, input.runId)));
if (phase === undefined) {
throw new DevflowError("Run phase does not exist", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
throw new DevflowError("Cannot start a fake phase from the current phase state", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
recoveryHint: `phase_state=${phase.state}`,
});
}
async function tryStartPhaseAndRecord(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
allowedCurrentStates: readonly string[],
payload: Record<string, unknown> = {},
): Promise<number | undefined> {
return input.db.transaction(async (tx) => {
await tx.execute(sql`SELECT 1 FROM ${runs} WHERE ${runs.id} = ${input.runId} FOR UPDATE`);
const [run] = await tx.select({ state: runs.state }).from(runs).where(eq(runs.id, input.runId));
if (run === undefined) {
throw new DevflowError("Run does not exist", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
if (!phaseMutationRunStates.includes(run.state as (typeof phaseMutationRunStates)[number])) {
throw new DevflowError("Run left active state before fake phase start", {
class: "human_required",
code: "run_state_changed",
runId: input.runId,
phaseId: input.phaseId,
recoveryHint: `run_state=${run.state}`,
});
}
const [updatedPhase] = await tx
.update(runPhases)
.set({
attempts: sql`${runPhases.attempts} + 1`,
state: "running",
startedAt: new Date(),
})
.where(
and(
eq(runPhases.id, input.phaseId),
eq(runPhases.runId, input.runId),
inArray(runPhases.state, [...allowedCurrentStates]),
),
)
.returning({ attempts: runPhases.attempts });
if (updatedPhase !== undefined) {
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "phase.started",
payload: { phaseKey: input.phaseKey, attempt: updatedPhase.attempts, ...payload },
idempotencyKey: `phase.started:${input.phaseId}:${updatedPhase.attempts}`,
});
return updatedPhase.attempts;
}
const [phaseExists] = await tx
.select({ id: runPhases.id })
.from(runPhases)
.where(and(eq(runPhases.id, input.phaseId), eq(runPhases.runId, input.runId)));
if (phaseExists === undefined) {
throw new DevflowError("Run phase does not exist", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
return undefined;
});
}
async function failPhaseAndRequestGate(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
attempt: number,
reason: string,
gateKey: string,
payload: Record<string, unknown>,
sessionId?: string,
options: { markSessionCrashed?: boolean } = {},
) {
try {
await input.db.transaction(async (tx) => {
await tx.execute(sql`SELECT 1 FROM ${runs} WHERE ${runs.id} = ${input.runId} FOR UPDATE`);
const [run] = await tx
.select({ state: runs.state })
.from(runs)
.where(eq(runs.id, input.runId));
if (run === undefined) {
throw new DevflowError("Run does not exist", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
if (isTerminalRunState(run.state)) {
return;
}
const request = await ensureHumanGateRequestInTransaction(
input,
tx,
gateKey,
attempt,
payload,
);
await tx
.update(runPhases)
.set({ state: "failed", endedAt: new Date() })
.where(and(eq(runPhases.id, input.phaseId), eq(runPhases.runId, input.runId)));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "phase.failed",
payload: { phaseKey: input.phaseKey, attempt, reason },
idempotencyKey: `phase.failed:${input.phaseId}:${attempt}`,
});
if (run.state !== "paused") {
const cause = `human_required:${gateKey}:${input.phaseId}:${attempt}`;
await tx
.update(runs)
.set({ state: "paused", pausedFromState: run.state, updatedAt: new Date() })
.where(eq(runs.id, input.runId));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
type: "run.paused",
payload: { cause, pausedFromState: run.state },
idempotencyKey: `run.paused:${input.runId}:${cause}`,
});
}
if (sessionId !== undefined && options.markSessionCrashed === true) {
const [session] = await tx
.select({ recoveryAttempts: tuiSessions.recoveryAttempts })
.from(tuiSessions)
.where(eq(tuiSessions.id, sessionId));
const recoveryAttempts = (session?.recoveryAttempts ?? 0) + 1;
await tx
.update(tuiSessions)
.set({ state: "CRASHED", recoveryAttempts })
.where(eq(tuiSessions.id, sessionId));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "session.crashed",
payload: { sessionId, roleId: input.roleId, recoveryAttempts },
idempotencyKey: `session.crashed:${sessionId}:${recoveryAttempts}`,
});
}
if (sessionId !== undefined) {
await tx
.update(tuiSessions)
.set({ state: "FAILED_NEEDS_HUMAN" })
.where(eq(tuiSessions.id, sessionId));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "session.failed",
payload: { sessionId, roleId: input.roleId },
idempotencyKey: `session.failed:${sessionId}`,
});
}
await appendHumanGateRequestedEventInTransaction(
input,
eventRepository,
tx,
request,
gateKey,
);
});
} catch (error) {
await failPhaseAndRun(input, eventRepository, attempt, "approval_request_failed");
if (sessionId !== undefined) {
await markSessionFailedNeedsHuman(input, eventRepository, sessionId);
}
throw error;
}
}
async function failPhaseAndRun(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
attempt: number,
reason: string,
) {
let sessionIdsToDispose: string[] = [];
await input.db.transaction(async (tx) => {
await tx.execute(sql`SELECT 1 FROM ${runs} WHERE ${runs.id} = ${input.runId} FOR UPDATE`);
const [run] = await tx.select({ state: runs.state }).from(runs).where(eq(runs.id, input.runId));
if (run === undefined || isTerminalRunState(run.state)) {
return;
}
await tx
.update(runPhases)
.set({ state: "failed", endedAt: new Date() })
.where(and(eq(runPhases.id, input.phaseId), eq(runPhases.runId, input.runId)));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "phase.failed",
payload: { phaseKey: input.phaseKey, attempt, reason },
idempotencyKey: `phase.failed:${input.phaseId}:${attempt}`,
});
await tx
.update(runs)
.set({
state: "failed",
currentPhaseId: null,
pausedFromState: null,
endedAt: new Date(),
updatedAt: new Date(),
})
.where(eq(runs.id, input.runId));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
type: "run.failed",
payload: { reason },
idempotencyKey: `run.failed:${input.runId}`,
});
sessionIdsToDispose = await markAllSessionsFailedInTransaction(
tx,
eventRepository,
input.runId,
);
});
await disposeSessionIds(input.sessions, sessionIdsToDispose);
}
async function failRunAndDisposeSession(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
attempt: number,
reason: string,
handle: { sessionId: string },
) {
await failPhaseAndRun(input, eventRepository, attempt, reason);
await input.sessions.dispose(handle).catch(() => undefined);
}
async function completePhaseAndRun(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
attempt: number,
sessionId: string,
terminalRun = true,
) {
await input.db.transaction(async (tx) => {
await tx.execute(sql`SELECT 1 FROM ${runs} WHERE ${runs.id} = ${input.runId} FOR UPDATE`);
const [run] = await tx.select({ state: runs.state }).from(runs).where(eq(runs.id, input.runId));
if (run === undefined || isTerminalRunState(run.state)) {
return;
}
if (!phaseMutationRunStates.includes(run.state as (typeof phaseMutationRunStates)[number])) {
throw new DevflowError("Run left active state before fake phase completion", {
class: "human_required",
code: "run_state_changed",
runId: input.runId,
phaseId: input.phaseId,
recoveryHint: `run_state=${run.state}`,
});
}
await tx
.update(runPhases)
.set({ state: "completed", endedAt: new Date() })
.where(and(eq(runPhases.id, input.phaseId), eq(runPhases.runId, input.runId)));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "phase.completed",
payload: { phaseKey: input.phaseKey, attempt },
idempotencyKey: `phase.completed:${input.phaseId}:${attempt}`,
});
const [session] = await tx
.select({ recoveryAttempts: tuiSessions.recoveryAttempts })
.from(tuiSessions)
.where(eq(tuiSessions.id, sessionId));
const recoveryAttempts = session?.recoveryAttempts ?? 0;
await tx.update(tuiSessions).set({ state: "READY" }).where(eq(tuiSessions.id, sessionId));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "session.ready",
payload: { sessionId, roleId: input.roleId, recoveryAttempts },
idempotencyKey: `session.ready:${sessionId}:${recoveryAttempts}`,
});
if (!terminalRun) {
return;
}
await tx
.update(runs)
.set({
state: "completed",
currentPhaseId: null,
pausedFromState: null,
endedAt: new Date(),
updatedAt: new Date(),
})
.where(eq(runs.id, input.runId));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
type: "run.completed",
payload: { phaseKey: input.phaseKey },
idempotencyKey: `run.completed:${input.runId}`,
});
});
}
async function requestWorkflowApproval(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
attempt: number,
sessionId: string,
gateKey: string,
payload: Record<string, unknown>,
) {
const approvalIdempotencyKey = `${input.runId}:${gateKey}:${input.phaseId}:${attempt}`;
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
const request = await ensureApprovalRequestInTransaction(
input,
tx,
gateKey,
approvalIdempotencyKey,
payload,
);
await tx
.update(runPhases)
.set({ state: "awaiting_approval" })
.where(and(eq(runPhases.id, input.phaseId), eq(runPhases.runId, input.runId)));
await tx
.update(tuiSessions)
.set({ state: "WAITING_FOR_APPROVAL" })
.where(eq(tuiSessions.id, sessionId));
await tx
.update(runs)
.set({ state: "awaiting_approval", currentPhaseId: input.phaseId })
.where(and(eq(runs.id, input.runId), inArray(runs.state, ["executing", "planning"])));
const [run] = await tx
.select({ state: runs.state })
.from(runs)
.where(eq(runs.id, input.runId))
.limit(1);
if (run?.state !== "awaiting_approval") {
throw new DevflowError("Cannot request workflow approval after run left executing state", {
class: "human_required",
code: "run_state_changed",
runId: input.runId,
phaseId: input.phaseId,
recoveryHint: `run_state=${run?.state ?? "missing"}`,
});
}
await appendHumanGateRequestedEventInTransaction(input, eventRepository, tx, request, gateKey);
});
}
async function ensureApprovalRequestInTransaction(
input: CanonicalRunSingleFakePhaseInput,
tx: TransactionDb,
gateKey: string,
idempotencyKey: string,
payload: Record<string, unknown>,
): Promise<HumanGateRequest> {
const inserted = await tx
.insert(approvalRequests)
.values({
runId: input.runId,
phaseId: input.phaseId,
gateKey,
state: "pending",
idempotencyKey,
payload,
})
.onConflictDoNothing({ target: approvalRequests.idempotencyKey })
.returning({ id: approvalRequests.id, idempotencyKey: approvalRequests.idempotencyKey });
if (inserted[0] !== undefined) {
return inserted[0];
}
const [existing] = await tx
.select({
id: approvalRequests.id,
idempotencyKey: approvalRequests.idempotencyKey,
payload: approvalRequests.payload,
state: approvalRequests.state,
})
.from(approvalRequests)
.where(eq(approvalRequests.idempotencyKey, idempotencyKey));
if (existing === undefined || existing.state !== "pending") {
throw new DevflowError("Approval request replay did not match pending request", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
if (canonicalize(existing.payload) !== canonicalize(payload)) {
throw new DevflowError("Approval request replay payload mismatch", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
return { id: existing.id, idempotencyKey: existing.idempotencyKey };
}
async function startSessionAndRecord(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
attempt: number,
): Promise<SessionHandle> {
const existingHandle = await resumeExistingSessionAndRecord(input, eventRepository, attempt);
if (existingHandle !== undefined) {
return existingHandle;
}
let handle: SessionHandle | undefined;
let sessionRowPersisted = false;
try {
handle = await input.sessions.start({
runId: input.runId,
roleId: input.roleId,
backend: "fake",
cwd: input.worktreeRoot,
expectedArtifactPath: input.expectedArtifactPath,
expectedSchema: input.expectedSchema,
});
const startedHandle = handle;
let sessionInsertConflicted = false;
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
const insertedSession = await tx
.insert(tuiSessions)
.values({
id: startedHandle.sessionId,
runId: input.runId,
roleId: input.roleId,
backend: "fake",
cwd: input.worktreeRoot,
expectedArtifactPath: input.expectedArtifactPath,
expectedSchema: input.expectedSchema,
state: "CREATED",
})
.onConflictDoNothing({ target: [tuiSessions.runId, tuiSessions.roleId] })
.returning({ id: tuiSessions.id });
if (insertedSession[0] === undefined) {
sessionInsertConflicted = true;
return;
}
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "session.created",
payload: { sessionId: startedHandle.sessionId, roleId: input.roleId, backend: "fake" },
idempotencyKey: `session.created:${startedHandle.sessionId}`,
});
await tx
.update(tuiSessions)
.set({ state: "BOOTSTRAPPING" })
.where(eq(tuiSessions.id, startedHandle.sessionId));
await tx
.update(tuiSessions)
.set({ state: "READY" })
.where(eq(tuiSessions.id, startedHandle.sessionId));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "session.ready",
payload: { sessionId: startedHandle.sessionId, roleId: input.roleId, recoveryAttempts: 0 },
idempotencyKey: `session.ready:${startedHandle.sessionId}:0`,
});
});
if (sessionInsertConflicted) {
await input.sessions.dispose(startedHandle).catch(() => undefined);
handle = undefined;
const existingHandle = await resumeExistingSessionAndRecord(input, eventRepository, attempt);
if (existingHandle !== undefined) {
return existingHandle;
}
throw new DevflowError("Concurrent fake session insert conflicted without an existing row", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
sessionRowPersisted = true;
return startedHandle;
} catch (error) {
if (handle !== undefined) {
await input.sessions.dispose(handle);
}
if (isRunStateChanged(error)) {
throw error;
}
if (shouldCreateHumanGate(error)) {
const gateError = toHumanRequiredRecoveryError(error);
await failPhaseAndRequestGate(
input,
eventRepository,
attempt,
"session_start_failed",
gateError.code,
{ errorCode: error.code, recoveryHint: gateError.recoveryHint },
sessionRowPersisted ? handle?.sessionId : undefined,
);
throw gateError;
}
await failPhaseAndRun(input, eventRepository, attempt, "session_start_failed");
if (sessionRowPersisted && handle !== undefined) {
await markSessionFailedNeedsHuman(input, eventRepository, handle.sessionId);
}
throw error;
}
}
async function resumeExistingSessionAndRecord(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
attempt: number,
): Promise<SessionHandle | undefined> {
const [session] = await input.db
.select({
id: tuiSessions.id,
recoveryAttempts: tuiSessions.recoveryAttempts,
state: tuiSessions.state,
})
.from(tuiSessions)
.where(and(eq(tuiSessions.runId, input.runId), eq(tuiSessions.roleId, input.roleId)))
.limit(1);
if (session === undefined) {
return undefined;
}
if (session.state === "FAILED_NEEDS_HUMAN") {
throw new DevflowError("Cannot reuse a failed fake phase session", {
class: "human_required",
code: "session_failed_needs_human",
runId: input.runId,
phaseId: input.phaseId,
});
}
let handle: SessionHandle;
try {
handle = await resumeWithRetry(input.sessions, { sessionId: session.id });
} catch (error) {
if (isRunStateChanged(error)) {
throw error;
}
if (shouldCreateHumanGate(error)) {
const gateError = toHumanRequiredRecoveryError(error);
await failPhaseAndRequestGate(
input,
eventRepository,
attempt,
"session_resume_failed",
gateError.code,
{ errorCode: error.code, recoveryHint: gateError.recoveryHint },
session.id,
{ markSessionCrashed: true },
);
throw gateError;
}
await failPhaseAndRun(input, eventRepository, attempt, "session_resume_failed");
throw error;
}
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
await tx
.update(tuiSessions)
.set({
cwd: input.worktreeRoot,
expectedArtifactPath: input.expectedArtifactPath,
expectedSchema: input.expectedSchema,
state: "READY",
})
.where(eq(tuiSessions.id, session.id));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "session.ready",
payload: {
sessionId: session.id,
roleId: input.roleId,
recoveryAttempts: session.recoveryAttempts,
},
idempotencyKey: `session.ready:${session.id}:${session.recoveryAttempts}`,
});
});
return handle;
}
async function setPhaseState(
input: CanonicalRunSingleFakePhaseInput,
state: "running" | "awaiting_artifact" | "validating" | "completed" | "failed",
) {
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
await tx
.update(runPhases)
.set({ state })
.where(and(eq(runPhases.id, input.phaseId), eq(runPhases.runId, input.runId)));
});
}
function isTerminalRunState(state: string): boolean {
return terminalRunStates.includes(state as (typeof terminalRunStates)[number]);
}
async function assertRunCanMutatePhaseInTransaction(
input: CanonicalRunSingleFakePhaseInput,
tx: TransactionDb,
) {
await tx.execute(sql`SELECT 1 FROM ${runs} WHERE ${runs.id} = ${input.runId} FOR UPDATE`);
const [run] = await tx.select({ state: runs.state }).from(runs).where(eq(runs.id, input.runId));
if (run === undefined) {
throw new DevflowError("Run does not exist", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
if (!phaseMutationRunStates.includes(run.state as (typeof phaseMutationRunStates)[number])) {
throw new DevflowError("Run left active state before fake phase mutation", {
class: "human_required",
code: "run_state_changed",
runId: input.runId,
phaseId: input.phaseId,
recoveryHint: `run_state=${run.state}`,
});
}
}
function buildEnvelope(
input: CanonicalRunSingleFakePhaseInput,
attempt: number,
instructions: string,
): PromptEnvelope {
const envelopeWithoutUuid = {
runId: input.runId,
roleId: input.roleId,
phaseKey: input.phaseKey,
expectedArtifact: input.expectedArtifactPath,
expectedSchema: input.expectedSchema,
instructions,
attempt,
};
return {
uuid: input.uuidFactory?.() ?? randomUUID(),
runId: input.runId,
roleId: input.roleId,
phaseKey: input.phaseKey,
attempt,
expectedArtifact: input.expectedArtifactPath,
expectedSchema: input.expectedSchema,
dedupKey: hash(envelopeWithoutUuid),
instructions,
};
}
interface PromptSendRecord {
promptId: string;
artifactBaselineSignature: string | undefined;
}
interface SendPromptAndRecordOptions {
captureArtifactBaseline?: boolean;
}
async function sendPromptAndRecord(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
handle: { sessionId: string },
envelope: PromptEnvelope,
type: "prompt.sent" | "prompt.repaired",
options: SendPromptAndRecordOptions = {},
): Promise<PromptSendRecord> {
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
await tx
.update(tuiSessions)
.set({
cwd: input.worktreeRoot,
expectedArtifactPath: input.expectedArtifactPath,
expectedSchema: input.expectedSchema,
state: "BUSY",
lastPromptHash: envelope.dedupKey,
lastPromptAt: new Date(),
})
.where(eq(tuiSessions.id, handle.sessionId));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "session.busy",
payload: { sessionId: handle.sessionId, roleId: input.roleId, dedupKey: envelope.dedupKey },
idempotencyKey: `session.busy:${handle.sessionId}:${envelope.dedupKey}`,
});
});
const artifactBaselineSignature =
options.captureArtifactBaseline === false
? undefined
: await artifactSignature(input.expectedArtifactPath);
const prompt = await sendPromptWithRetry(input.sessions, handle, envelope);
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type,
payload: { roleId: input.roleId, dedupKey: envelope.dedupKey },
idempotencyKey: `${type}:${envelope.dedupKey}`,
});
});
return { promptId: prompt.promptId, artifactBaselineSignature };
}
async function sendPromptWithRetry(
sessions: SessionRuntime,
handle: { sessionId: string },
envelope: PromptEnvelope,
): Promise<{ promptId: string }> {
let lastError: unknown;
for (let physicalAttempt = 0; physicalAttempt <= sendPromptRetryBudget; physicalAttempt += 1) {
try {
return await sessions.sendPrompt(handle, envelope);
} catch (error) {
lastError = error;
if (!(error instanceof DevflowError) || error.class !== "recoverable") {
throw error;
}
}
}
throw lastError;
}
interface ArtifactOutcome {
attempt: number;
artifact: { id: string };
artifactHash: string;
validation: ReturnType<typeof validateArtifact>;
}
interface ArtifactRecord {
id: string;
phaseId: string | null;
schemaId: string;
valid: boolean;
validationError: unknown;
}
async function waitForAndValidateArtifact(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
attempt: number,
sessionId: string,
artifactBaselineSignature: string | undefined,
): Promise<ArtifactOutcome> {
await startArtifactWait(input, eventRepository, attempt);
try {
const waitOptions: ArtifactWaitOptions = { ...input.wait };
if (artifactBaselineSignature !== undefined) {
waitOptions.ignoreInitialSignature = artifactBaselineSignature;
}
await waitForArtifact(input.expectedArtifactPath, waitOptions);
} catch (error) {
if (!isDevflowErrorWithCode(error, "artifact_timeout_exhausted")) {
throw error;
}
await recordArtifactTimeout(input, eventRepository, attempt, sessionId);
throw error;
}
await setPhaseState(input, "validating");
return validateCurrentArtifact(input, eventRepository, attempt);
}
async function validateCurrentArtifact(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
attempt: number,
): Promise<ArtifactOutcome> {
const artifactBytes = await readArtifactBytes(input);
const artifactHash = createHash("sha256").update(artifactBytes).digest("hex");
const parsedArtifact = parseArtifactJson(artifactBytes);
const validation = validateArtifact(input.expectedSchema, parsedArtifact);
const artifact = await recordArtifactValidation(
input,
eventRepository,
attempt,
artifactHash,
validation,
);
if (artifact === undefined) {
throw new DevflowError("Artifact insert returned no row", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
return { attempt, artifact, artifactHash, validation };
}
async function startArtifactWait(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
attempt: number,
) {
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
await tx
.update(runPhases)
.set({ state: "awaiting_artifact" })
.where(and(eq(runPhases.id, input.phaseId), eq(runPhases.runId, input.runId)));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "artifact.expected",
payload: {
path: input.expectedArtifactPath,
schemaId: input.expectedSchema,
attempt,
},
idempotencyKey: `artifact.expected:${input.phaseId}:${attempt}:${input.expectedArtifactPath}`,
});
});
}
async function recordArtifactTimeout(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
attempt: number,
sessionId: string,
) {
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "artifact.timeout",
payload: {
path: input.expectedArtifactPath,
schemaId: input.expectedSchema,
attempt,
},
idempotencyKey: `artifact.timeout:${input.phaseId}:${attempt}:${input.expectedArtifactPath}`,
});
await tx
.update(tuiSessions)
.set({ state: "ARTIFACT_TIMEOUT" })
.where(eq(tuiSessions.id, sessionId));
});
}
async function recordArtifactValidation(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
attempt: number,
artifactHash: string,
validation: ReturnType<typeof validateArtifact>,
): Promise<{ id: string } | undefined> {
return input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
const artifact = await insertArtifactRecordInTransaction(tx, input, artifactHash, validation);
if (artifact === undefined) {
return undefined;
}
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: validation.ok ? "artifact.validated" : "artifact.invalid",
payload: validation.ok
? {
artifactId: artifact.id,
hash: artifactHash,
path: input.expectedArtifactPath,
schemaId: input.expectedSchema,
}
: {
artifactId: artifact.id,
hash: artifactHash,
path: input.expectedArtifactPath,
schemaId: input.expectedSchema,
errors: validation.errors,
},
idempotencyKey: `${validation.ok ? "artifact.validated" : "artifact.invalid"}:${input.phaseId}:${input.expectedArtifactPath}:${artifactHash}`,
});
return artifact;
});
}
async function insertArtifactRecordInTransaction(
tx: TransactionDb,
input: CanonicalRunSingleFakePhaseInput,
artifactHash: string,
validation: ReturnType<typeof validateArtifact>,
): Promise<{ id: string } | undefined> {
const validationError = validation.ok ? null : { errors: validation.errors };
const inserted = await tx
.insert(artifacts)
.values({
runId: input.runId,
phaseId: input.phaseId,
path: input.expectedArtifactPath,
schemaId: input.expectedSchema,
hash: artifactHash,
valid: validation.ok,
validationError,
})
.onConflictDoNothing({ target: [artifacts.runId, artifacts.path, artifacts.hash] })
.returning({
id: artifacts.id,
phaseId: artifacts.phaseId,
schemaId: artifacts.schemaId,
valid: artifacts.valid,
validationError: artifacts.validationError,
});
const artifact =
inserted[0] ??
(
await tx
.select({
id: artifacts.id,
phaseId: artifacts.phaseId,
schemaId: artifacts.schemaId,
valid: artifacts.valid,
validationError: artifacts.validationError,
})
.from(artifacts)
.where(
and(
eq(artifacts.runId, input.runId),
eq(artifacts.path, input.expectedArtifactPath),
eq(artifacts.hash, artifactHash),
),
)
.limit(1)
)[0];
if (artifact !== undefined) {
assertArtifactReplayMatches(input, validation.ok, validationError, artifact);
}
return artifact === undefined ? undefined : { id: artifact.id };
}
function assertArtifactReplayMatches(
input: CanonicalRunSingleFakePhaseInput,
valid: boolean,
validationError: unknown,
artifact: ArtifactRecord,
) {
const samePhase = artifact.phaseId === input.phaseId;
const sameSchema = artifact.schemaId === input.expectedSchema;
const sameValidity = artifact.valid === valid;
const sameValidationError =
canonicalize(artifact.validationError) === canonicalize(validationError);
if (samePhase && sameSchema && sameValidity && sameValidationError) {
return;
}
throw new DevflowError("Artifact replay does not match current validation context", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
async function markSessionIdle(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
sessionId: string,
promptDedupKey: string,
) {
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
await tx.update(tuiSessions).set({ state: "READY" }).where(eq(tuiSessions.id, sessionId));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "session.idle",
payload: { sessionId, roleId: input.roleId, dedupKey: promptDedupKey },
idempotencyKey: `session.idle:${sessionId}:${promptDedupKey}`,
});
});
}
async function markSessionReady(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
sessionId: string,
) {
const [session] = await input.db
.select({ recoveryAttempts: tuiSessions.recoveryAttempts })
.from(tuiSessions)
.where(eq(tuiSessions.id, sessionId));
const recoveryAttempts = session?.recoveryAttempts ?? 0;
await eventRepository.append({
runId: input.runId,
phaseId: input.phaseId,
type: "session.ready",
payload: { sessionId, roleId: input.roleId, recoveryAttempts },
idempotencyKey: `session.ready:${sessionId}:${recoveryAttempts}`,
});
}
async function recoverFromArtifactTimeout(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
sessionId: string,
): Promise<boolean> {
const probe = await probeWithTypedError(input.sessions, { sessionId });
if (!probe.alive || !probe.paneActive) {
return false;
}
await setSessionStateIfRunActive(input, sessionId, "RESUMING");
const rebootstrapOk = await rebootstrapWithRetry(input.sessions, { sessionId });
if (!rebootstrapOk) {
return false;
}
await setSessionStateIfRunActive(input, sessionId, "REBOOTSTRAPPED");
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
const [session] = await tx
.select({ recoveryAttempts: tuiSessions.recoveryAttempts })
.from(tuiSessions)
.where(eq(tuiSessions.id, sessionId));
const recoveryAttempts = (session?.recoveryAttempts ?? 0) + 1;
await tx
.update(tuiSessions)
.set({ state: "READY", recoveryAttempts })
.where(eq(tuiSessions.id, sessionId));
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "session.recovered",
payload: { sessionId, roleId: input.roleId, recoveryAttempts },
idempotencyKey: `session.recovered:${sessionId}:${recoveryAttempts}`,
});
});
return true;
}
async function setSessionStateIfRunActive(
input: CanonicalRunSingleFakePhaseInput,
sessionId: string,
state: "RESUMING" | "REBOOTSTRAPPED",
) {
await input.db.transaction(async (tx) => {
await assertRunCanMutatePhaseInTransaction(input, tx);
await tx.update(tuiSessions).set({ state }).where(eq(tuiSessions.id, sessionId));
});
}
async function probeWithTypedError(
sessions: SessionRuntime,
handle: { sessionId: string },
): ReturnType<SessionRuntime["probe"]> {
try {
return await sessions.probe(handle);
} catch (error) {
if (error instanceof DevflowError) {
throw error;
}
throw new DevflowError("Unclassified probe failure", {
class: "fatal",
code: "internal_state_corruption",
cause: error,
});
}
}
async function rebootstrapWithRetry(
sessions: SessionRuntime,
handle: { sessionId: string },
): Promise<boolean> {
for (let attemptsRemaining = 2; attemptsRemaining > 0; attemptsRemaining -= 1) {
try {
await sessions.rebootstrap(handle);
return true;
} catch (error) {
if (!(error instanceof DevflowError)) {
throw new DevflowError("Unclassified rebootstrap failure", {
class: "fatal",
code: "internal_state_corruption",
cause: error,
});
}
if (error.class !== "recoverable") {
throw error;
}
// Retry budget is intentionally one rebootstrap retry after the first failure.
}
}
return false;
}
async function resumeWithRetry(
sessions: SessionRuntime,
handle: { sessionId: string },
): Promise<SessionHandle> {
let lastError: unknown;
for (let physicalAttempt = 0; physicalAttempt <= 2; physicalAttempt += 1) {
try {
return await sessions.resume(handle);
} catch (error) {
lastError = error;
if (!(error instanceof DevflowError) || error.class !== "recoverable") {
throw error;
}
}
}
throw lastError;
}
async function markSessionFailedNeedsHuman(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
sessionId: string,
) {
await input.db
.update(tuiSessions)
.set({ state: "FAILED_NEEDS_HUMAN" })
.where(eq(tuiSessions.id, sessionId));
await eventRepository.append({
runId: input.runId,
phaseId: input.phaseId,
type: "session.failed",
payload: { sessionId, roleId: input.roleId },
idempotencyKey: `session.failed:${sessionId}`,
});
}
async function markAllSessionsFailedInTransaction(
tx: TransactionDb,
eventRepository: RunEventRepository,
runId: string,
): Promise<string[]> {
const sessions = await tx
.select({ id: tuiSessions.id, roleId: tuiSessions.roleId })
.from(tuiSessions)
.where(eq(tuiSessions.runId, runId));
if (sessions.length === 0) {
return [];
}
await tx
.update(tuiSessions)
.set({ state: "FAILED_NEEDS_HUMAN" })
.where(eq(tuiSessions.runId, runId));
for (const session of sessions) {
await eventRepository.appendInTransaction(tx, {
runId,
type: "session.failed",
payload: { sessionId: session.id, roleId: session.roleId },
idempotencyKey: `session.failed:${session.id}`,
});
}
return sessions.map((session) => session.id);
}
async function disposeSessionIds(sessions: SessionRuntime, sessionIds: readonly string[]) {
await Promise.all(
[...new Set(sessionIds)].map((sessionId) =>
sessions.dispose({ sessionId }).catch(() => undefined),
),
);
}
async function waitForArtifact(path: string, options: ArtifactWaitOptions = {}): Promise<void> {
const timeoutMs = options.timeoutMs ?? 5_000;
const pollIntervalMs = options.pollIntervalMs ?? 25;
const stableMs = options.stableMs ?? 500;
const ignoreInitialSignature = options.ignoreInitialSignature;
const deadline = Date.now() + timeoutMs;
let lastSignature: string | undefined;
let stableSince: number | undefined;
while (Date.now() <= deadline) {
try {
const signature = await artifactSignature(path);
if (signature === undefined || signature === ignoreInitialSignature) {
lastSignature = undefined;
stableSince = undefined;
await sleep(pollIntervalMs);
continue;
}
if (lastSignature === signature) {
stableSince ??= Date.now();
if (Date.now() - stableSince >= stableMs) {
return;
}
} else {
lastSignature = signature;
stableSince = Date.now();
if (stableMs === 0) {
return;
}
}
} catch (cause) {
if (cause instanceof DevflowError) {
throw cause;
}
if (isNodeError(cause) && cause.code === "ENOENT") {
lastSignature = undefined;
stableSince = undefined;
} else {
throw new DevflowError("Failed to stat expected artifact path", {
class: "fatal",
code: "workspace_permissions",
recoveryHint: path,
cause,
});
}
}
await sleep(pollIntervalMs);
}
throw new DevflowError("Timed out waiting for fake phase artifact", {
class: "human_required",
code: "artifact_timeout_exhausted",
recoveryHint: path,
});
}
async function artifactSignature(path: string): Promise<string | undefined> {
try {
const artifactStat = await stat(path);
return `${artifactStat.size}:${artifactStat.mtimeMs}`;
} catch (cause) {
if (isNodeError(cause) && cause.code === "ENOENT") {
return undefined;
}
throw new DevflowError("Failed to stat expected artifact path", {
class: "fatal",
code: "workspace_permissions",
recoveryHint: path,
cause,
});
}
}
function parseArtifactJson(bytes: Buffer): unknown {
try {
return JSON.parse(bytes.toString("utf8")) as unknown;
} catch (cause) {
return {
__devflowParseError: "invalid_json",
message: cause instanceof Error ? cause.message : String(cause),
};
}
}
async function readArtifactBytes(input: CanonicalRunSingleFakePhaseInput): Promise<Buffer> {
try {
return await readFile(input.expectedArtifactPath);
} catch (cause) {
throw new DevflowError("Failed to read expected artifact", {
class: "fatal",
code: "workspace_permissions",
runId: input.runId,
phaseId: input.phaseId,
recoveryHint: input.expectedArtifactPath,
cause,
});
}
}
function repairInstructionsFor(instructions: string): string {
const repairLine = "Repair the artifact so it conforms to the expected schema.";
const repairScenario =
/^Repair-Scenario:\s*([A-Za-z0-9_-]+)\s*$/m.exec(instructions)?.[1] ?? "ok";
if (/^Scenario:\s*[A-Za-z0-9_-]+\s*$/m.test(instructions)) {
return instructions.replace(
/^Scenario:\s*[A-Za-z0-9_-]+\s*$/m,
`Scenario: ${repairScenario}\n${repairLine}`,
);
}
return `Scenario: ${repairScenario}\n${repairLine}\n${instructions}`;
}
interface HumanGateRequest {
id: string;
idempotencyKey: string;
}
async function ensureHumanGateRequestInTransaction(
input: CanonicalRunSingleFakePhaseInput,
tx: TransactionDb,
gateKey: string,
attempt: number,
payload: Record<string, unknown>,
): Promise<HumanGateRequest> {
const idempotencyKey = `${input.runId}:${gateKey}:${input.phaseId}:${attempt}`;
const storedPayload = stripUndefinedProperties(payload) as Record<string, unknown>;
await tx
.insert(approvalRequests)
.values({
runId: input.runId,
phaseId: input.phaseId,
gateKey,
state: "pending",
idempotencyKey,
payload: storedPayload,
})
.onConflictDoNothing({ target: approvalRequests.idempotencyKey });
const [request] = await tx
.select({
gateKey: approvalRequests.gateKey,
id: approvalRequests.id,
payload: approvalRequests.payload,
phaseId: approvalRequests.phaseId,
runId: approvalRequests.runId,
})
.from(approvalRequests)
.where(eq(approvalRequests.idempotencyKey, idempotencyKey))
.limit(1);
if (request === undefined) {
throw new DevflowError("Approval request insert returned no row", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
if (
request.runId !== input.runId ||
request.phaseId !== input.phaseId ||
request.gateKey !== gateKey ||
canonicalize(request.payload) !== canonicalize(storedPayload)
) {
throw new DevflowError("Approval request idempotency replay does not match existing request", {
class: "fatal",
code: "internal_state_corruption",
runId: input.runId,
phaseId: input.phaseId,
});
}
return { id: request.id, idempotencyKey };
}
async function appendHumanGateRequestedEventInTransaction(
input: CanonicalRunSingleFakePhaseInput,
eventRepository: RunEventRepository,
tx: TransactionDb,
request: HumanGateRequest,
gateKey: string,
) {
await eventRepository.appendInTransaction(tx, {
runId: input.runId,
phaseId: input.phaseId,
type: "approval.requested",
payload: {
approvalRequestId: request.id,
approvalIdempotencyKey: request.idempotencyKey,
gateKey,
},
idempotencyKey: `approval.requested:${request.idempotencyKey}`,
});
}
function stripUndefinedProperties(value: unknown): unknown {
if (Array.isArray(value)) {
return value.map((item) => stripUndefinedProperties(item));
}
if (value !== null && typeof value === "object") {
return Object.fromEntries(
Object.entries(value as Record<string, unknown>)
.filter(([, child]) => child !== undefined)
.map(([key, child]) => [key, stripUndefinedProperties(child)]),
);
}
return value;
}
function isDevflowErrorWithCode(error: unknown, code: string): error is DevflowError {
return error instanceof DevflowError && error.code === code;
}
function isRunStateChanged(error: unknown): error is DevflowError {
return isDevflowErrorWithCode(error, "run_state_changed");
}
function shouldCreateHumanGate(error: unknown): error is DevflowError {
return error instanceof DevflowError && error.class !== "fatal";
}
function toHumanRequiredRecoveryError(error: DevflowError): DevflowError {
if (error.class === "human_required") {
return error;
}
const options: ConstructorParameters<typeof DevflowError>[1] = {
class: "human_required",
code: "prompt_send_exhausted",
recoveryHint: error.recoveryHint ?? error.message,
cause: error,
};
if (error.runId !== undefined) {
options.runId = error.runId;
}
if (error.phaseId !== undefined) {
options.phaseId = error.phaseId;
}
return new DevflowError("Recoverable session error exhausted retry budget", {
...options,
});
}
function toArtifactTimeoutRecoveryGateError(error: DevflowError): DevflowError {
if (error.class === "human_required") {
return error;
}
const options: ConstructorParameters<typeof DevflowError>[1] = {
class: "human_required",
code: "artifact_timeout_exhausted",
recoveryHint: error.recoveryHint ?? error.message,
cause: error,
};
if (error.runId !== undefined) {
options.runId = error.runId;
}
if (error.phaseId !== undefined) {
options.phaseId = error.phaseId;
}
return new DevflowError("Artifact timeout recovery exhausted retry budget", options);
}
async function removeStaleArtifact(input: CanonicalRunSingleFakePhaseInput): Promise<void> {
try {
await unlink(input.expectedArtifactPath);
} catch (cause) {
if (isNodeError(cause) && cause.code === "ENOENT") {
return;
}
throw new DevflowError("Failed to remove stale artifact before waiting", {
class: "fatal",
code: "workspace_permissions",
runId: input.runId,
phaseId: input.phaseId,
recoveryHint: input.expectedArtifactPath,
cause,
});
}
}
async function captureTranscript(
input: CanonicalRunSingleFakePhaseInput,
handle: { sessionId: string },
) {
const sink = input.transcriptSink ?? new TuiTranscriptRepository(input.db);
const [session] = await input.db
.select({ lastCaptureSeq: tuiSessions.lastCaptureSeq })
.from(tuiSessions)
.where(eq(tuiSessions.id, handle.sessionId));
if (session === undefined) {
throw new DevflowError("TUI session does not exist for transcript capture", {
class: "fatal",
code: "session_not_found",
runId: input.runId,
phaseId: input.phaseId,
});
}
return captureAndPersistTranscript({
adapter: input.sessions,
handle,
fromSeq: session.lastCaptureSeq,
sink,
});
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function isNodeError(error: unknown): error is NodeJS.ErrnoException {
return error instanceof Error && "code" in error;
}