diff --git a/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts b/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts index 69560c49e88..3a01f8f4397 100644 --- a/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts @@ -3,6 +3,8 @@ import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { ABORT_REASON_SEND_ERROR, createSSELoader, SendFunction } from "~/utils/sse"; import { throttle } from "~/utils/throttle"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; +import { deserialiseMollifierSnapshot } from "~/v3/mollifier/mollifierSnapshot.server"; import { tracePubSub } from "~/v3/services/tracePubSub.server"; const PING_INTERVAL = 5_000; @@ -37,17 +39,48 @@ export class RunStreamPresenter { }, }); - if (!run) { + // Fall back to the mollifier buffer when the run isn't in PG yet. + // The buffered run has no execution events to stream, but we still + // attach a trace-pubsub subscription using the snapshot's traceId + // so that the moment the drainer materialises the row and execution + // begins, those events flow to this open SSE connection. Closing + // with 404 would force the dashboard to keep retrying. + let traceId: string | null = run?.traceId ?? null; + if (!traceId) { + const buffer = getMollifierBuffer(); + if (buffer) { + try { + const entry = await buffer.getEntry(runFriendlyId); + if (entry) { + // Go through the webapp wrapper so this read-side module + // shares a single deserialisation path with readFallback — + // see the contract comment in syntheticRedirectInfo.server.ts. + const snapshot = deserialiseMollifierSnapshot(entry.payload); + if (typeof snapshot.traceId === "string") { + traceId = snapshot.traceId; + } + } + } catch (err) { + logger.warn("RunStreamPresenter buffer fallback failed", { + runFriendlyId, + err: err instanceof Error ? err.message : String(err), + }); + } + } + } + + if (!traceId) { throw new Response("Not found", { status: 404 }); } + const resolvedRun = { traceId }; logger.info("RunStreamPresenter.start", { runFriendlyId, - traceId: run.traceId, + traceId: resolvedRun.traceId, }); // Subscribe to trace updates - const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId); + const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(resolvedRun.traceId); // Only send max every 1 second const throttledSend = throttle( @@ -105,7 +138,7 @@ export class RunStreamPresenter { cleanup: () => { logger.info("RunStreamPresenter.cleanup", { runFriendlyId, - traceId: run.traceId, + traceId: resolvedRun.traceId, }); // Remove message listener @@ -119,13 +152,13 @@ export class RunStreamPresenter { .then(() => { logger.info("RunStreamPresenter.cleanup.unsubscribe succeeded", { runFriendlyId, - traceId: run.traceId, + traceId: resolvedRun.traceId, }); }) .catch((error) => { logger.error("RunStreamPresenter.cleanup.unsubscribe failed", { runFriendlyId, - traceId: run.traceId, + traceId: resolvedRun.traceId, error: { name: error.name, message: error.message, diff --git a/apps/webapp/app/routes/@.runs.$runParam.ts b/apps/webapp/app/routes/@.runs.$runParam.ts index a52600628d8..a709191271e 100644 --- a/apps/webapp/app/routes/@.runs.$runParam.ts +++ b/apps/webapp/app/routes/@.runs.$runParam.ts @@ -3,7 +3,8 @@ import { z } from "zod"; import { prisma } from "~/db.server"; import { redirectWithErrorMessage } from "~/models/message.server"; import { requireUser } from "~/services/session.server"; -import { impersonate, rootPath, v3RunPath } from "~/utils/pathBuilder"; +import { impersonate, rootPath, v3RunPath, v3RunSpanPath } from "~/utils/pathBuilder"; +import { findBufferedRunRedirectInfo } from "~/v3/mollifier/syntheticRedirectInfo.server"; const ParamsSchema = z.object({ runParam: z.string(), @@ -32,6 +33,7 @@ export async function loader({ params, request }: LoaderFunctionArgs) { friendlyId: runParam, }, select: { + spanId: true, runtimeEnvironment: { select: { slug: true, @@ -51,16 +53,45 @@ export async function loader({ params, request }: LoaderFunctionArgs) { }); if (!run) { + // Admin impersonation route — bypass org membership so admins can + // open any buffered run by friendlyId, mirroring the existing PG + // behaviour above (no membership filter on the find). + const buffered = await findBufferedRunRedirectInfo({ + runFriendlyId: runParam, + userId: user.id, + skipOrgMembershipCheck: true, + }); + if (buffered) { + // Preselect the root span so the run-detail trace tree opens with + // the buffered run's span highlighted, matching the sibling + // redirect routes (runs.$runParam.ts, projects.v3.$projectRef…). + const path = buffered.spanId + ? v3RunSpanPath( + { slug: buffered.organizationSlug }, + { slug: buffered.projectSlug }, + { slug: buffered.environmentSlug }, + { friendlyId: runParam }, + { spanId: buffered.spanId } + ) + : v3RunPath( + { slug: buffered.organizationSlug }, + { slug: buffered.projectSlug }, + { slug: buffered.environmentSlug }, + { friendlyId: runParam } + ); + return redirect(impersonate(path)); + } return redirectWithErrorMessage(rootPath(), request, "Run doesn't exist", { ephemeral: false, }); } - const path = v3RunPath( + const path = v3RunSpanPath( { slug: run.project.organization.slug }, { slug: run.project.slug }, { slug: run.runtimeEnvironment.slug }, - { friendlyId: runParam } + { friendlyId: runParam }, + { spanId: run.spanId } ); return redirect(impersonate(path)); diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx index d55511e7ff5..0d8312a7899 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx @@ -88,10 +88,14 @@ import { useReplaceSearchParams } from "~/hooks/useReplaceSearchParams"; import { useSearchParams } from "~/hooks/useSearchParam"; import { type Shortcut, useShortcutKeys } from "~/hooks/useShortcutKeys"; import { useHasAdminAccess } from "~/hooks/useUser"; +import { env } from "~/env.server"; import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { NextRunListPresenter } from "~/presenters/v3/NextRunListPresenter.server"; import { RunEnvironmentMismatchError, RunPresenter } from "~/presenters/v3/RunPresenter.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { buildSyntheticRunHeader } from "~/v3/mollifier/syntheticRunHeader.server"; +import { buildSyntheticTraceForBufferedRun } from "~/v3/mollifier/syntheticTrace.server"; import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server"; import { getImpersonationId } from "~/services/impersonation.server"; import { logger } from "~/services/logger.server"; @@ -277,6 +281,31 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { ); } + // PG miss → try the mollifier buffer. When the gate diverts a trigger + // the run sits in Redis until the drainer materialises it; without + // this fallback the run-detail page 404s for the brief buffered window + // even though the API has accepted the trigger and returned an id. + const buffered = await tryMollifiedRunFallback({ + runFriendlyId: runParam, + organizationSlug, + projectSlug: projectParam, + envSlug: envParam, + userId, + }); + + if (buffered) { + const parent = await getResizableSnapshot(request, resizableSettings.parent.autosaveId); + const tree = await getResizableSnapshot(request, resizableSettings.tree.autosaveId); + + return json({ + run: buffered.run, + trace: buffered.trace, + maximumLiveReloadingSetting: env.MAXIMUM_LIVE_RELOADING_EVENTS, + resizable: { parent, tree }, + runsList: null, + }); + } + throw error; } @@ -305,6 +334,39 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { }); }; +async function tryMollifiedRunFallback(args: { + runFriendlyId: string; + organizationSlug: string; + projectSlug: string; + envSlug: string; + userId: string; +}) { + const project = await findProjectBySlug(args.organizationSlug, args.projectSlug, args.userId); + if (!project) return null; + const environment = await findEnvironmentBySlug(project.id, args.envSlug, args.userId); + if (!environment) return null; + + const buffered = await findRunByIdWithMollifierFallback({ + runId: args.runFriendlyId, + environmentId: environment.id, + organizationId: project.organizationId, + }); + if (!buffered) return null; + + return { + run: buildSyntheticRunHeader({ + run: buffered, + environment: { + id: environment.id, + organizationId: project.organizationId, + type: environment.type, + slug: environment.slug, + }, + }), + trace: buildSyntheticTraceForBufferedRun(buffered), + }; +} + type LoaderData = SerializeFrom; export default function Page() { @@ -407,23 +469,17 @@ export default function Page() { /> {run.isFinished ? null : ( - - - - - - + )} @@ -587,6 +643,35 @@ function TraceView({ ); } +// Controlled wrapper around the cancel dialog. Owns the Radix open state +// so the dialog closes itself once the cancel action transitions through +// submission. We can't ``-wrap the submit button +// because Radix's onClick handler swallows the button's name=value pair +// that the form action depends on for `redirectUrl`. +function ControlledCancelRunDialog({ + runFriendlyId, + redirectPath, +}: { + runFriendlyId: string; + redirectPath: string; +}) { + const [open, setOpen] = useState(false); + return ( + + + + + setOpen(false)} + /> + + ); +} + function NoLogsView({ run, resizable }: Pick) { const plan = useCurrentPlan(); const organization = useOrganization(); @@ -616,9 +701,13 @@ function NoLogsView({ run, resizable }: Pick) { >
{daysSinceCompleted === undefined ? ( - + - We tidy up older logs to keep things running smoothly. + This run is queued. Logs will appear here once it begins executing. ) : isWithinLogRetention ? ( diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts index 240d7d3d8ed..fa6ee29f3db 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts @@ -6,6 +6,7 @@ import { redirectWithErrorMessage, redirectWithSuccessMessage } from "~/models/m import { logger } from "~/services/logger.server"; import { requireUserId } from "~/services/session.server"; import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; export const cancelSchema = z.object({ redirectUrl: z.string(), @@ -42,15 +43,56 @@ export const action: ActionFunction = async ({ request, params }) => { }, }); - if (!taskRun) { + if (taskRun) { + const cancelRunService = new CancelTaskRunService(); + await cancelRunService.call(taskRun); + return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`); + } + + // PG miss — try the mollifier buffer. The customer can hit cancel + // on a buffered run from the dashboard during the burst window. + // Snapshot a `mark_cancelled` patch; the drainer's + // bifurcation routes the run to `engine.createCancelledRun` on + // next pop. + const buffer = getMollifierBuffer(); + const entry = buffer ? await buffer.getEntry(runParam) : null; + if (!entry) { submission.error = { runParam: ["Run not found"] }; return json(submission); } - const cancelRunService = new CancelTaskRunService(); - await cancelRunService.call(taskRun); + // Dashboard auth: verify the requesting user is a member of the + // buffered run's org. The API path scopes by env id from the + // authenticated request; the dashboard route uses org-membership + // because the URL doesn't carry an envId. + const member = await prisma.orgMember.findFirst({ + where: { userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) { + submission.error = { runParam: ["Run not found"] }; + return json(submission); + } - return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`); + const result = await buffer!.mutateSnapshot(runParam, { + type: "mark_cancelled", + cancelledAt: new Date().toISOString(), + cancelReason: "Canceled by user", + }); + if (result === "applied_to_snapshot") { + return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`); + } + // "not_found" or "busy" — both indicate the drainer raced us between + // the getEntry check above and mutateSnapshot. On "not_found" the + // entry was just popped and the PG row is in flight; on "busy" the + // drainer is mid-materialisation. Either way the customer should + // retry — by then the PG row exists and the regular cancel path at + // the top of this action takes over. + return redirectWithErrorMessage( + submission.value.redirectUrl, + request, + "Run is materialising — retry in a moment" + ); } catch (error) { if (error instanceof Error) { logger.error("Failed to cancel run", { diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index 8a22822d06b..8008fb90b7d 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -11,6 +11,12 @@ import { requireUser } from "~/services/session.server"; import { sortEnvironments } from "~/utils/environmentSort"; import { v3RunSpanPath } from "~/utils/pathBuilder"; import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { + buildSyntheticReplayTaskRun, + type SyntheticReplayTaskRun, +} from "~/v3/mollifier/syntheticReplayTaskRun.server"; import parseDuration from "parse-duration"; import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server"; import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server"; @@ -33,7 +39,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { Object.fromEntries(new URL(request.url).searchParams) ); - const run = await $replica.taskRun.findFirst({ + let run = await $replica.taskRun.findFirst({ select: { payload: true, payloadType: true, @@ -88,6 +94,74 @@ export async function loader({ request, params }: LoaderFunctionArgs) { where: { friendlyId: runParam, project: { organization: { members: { some: { userId } } } } }, }); + let synthetic: + | (Awaited> & { __synth: true }) + | undefined; + if (!run) { + // Buffered fallback: read the snapshot and look up the env list via + // the snapshot's organizationId. Without this the Replay dialog + // 404s for runs queued in the mollifier buffer, which dumps the + // user back to the task list. + const buffer = getMollifierBuffer(); + const entry = buffer ? await buffer.getEntry(runParam) : null; + if (!entry) throw new Response("Not Found", { status: 404 }); + const member = await prisma.orgMember.findFirst({ + where: { userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) throw new Response("Not Found", { status: 404 }); + const buffered = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: entry.envId, + organizationId: entry.orgId, + }); + if (!buffered) throw new Response("Not Found", { status: 404 }); + synthetic = Object.assign(buffered, { __synth: true as const }); + const orgProject = await $replica.project.findFirst({ + where: { + environments: { some: { id: entry.envId } }, + }, + select: { + slug: true, + environments: { + select: { + id: true, + type: true, + slug: true, + branchName: true, + orgMember: { select: { user: true } }, + }, + where: { + archivedAt: null, + OR: [ + { type: { in: ["PREVIEW", "STAGING", "PRODUCTION"] } }, + { type: "DEVELOPMENT", orgMember: { userId } }, + ], + }, + }, + }, + }); + if (!orgProject) throw new Response("Not Found", { status: 404 }); + run = { + payload: buffered.payload, + payloadType: buffered.payloadType ?? "application/json", + seedMetadata: buffered.seedMetadata ?? null, + seedMetadataType: buffered.seedMetadataType ?? null, + runtimeEnvironmentId: entry.envId, + concurrencyKey: buffered.concurrencyKey ?? null, + maxAttempts: buffered.maxAttempts ?? null, + maxDurationInSeconds: buffered.maxDurationInSeconds ?? null, + machinePreset: buffered.machinePreset ?? null, + workerQueue: buffered.workerQueue ?? null, + ttl: buffered.ttl ?? null, + idempotencyKey: buffered.idempotencyKey ?? null, + runTags: buffered.runTags, + queue: buffered.queue ?? "task/", + taskIdentifier: buffered.taskIdentifier ?? "", + project: orgProject, + } as unknown as typeof run; + } + if (!run) { throw new Response("Not Found", { status: 404 }); } @@ -174,7 +248,7 @@ export const action: ActionFunction = async ({ request, params }) => { } try { - const taskRun = await prisma.taskRun.findFirst({ + const pgRun = await prisma.taskRun.findFirst({ where: { friendlyId: runParam, }, @@ -192,6 +266,36 @@ export const action: ActionFunction = async ({ request, params }) => { }, }); + // Mollifier read-fallback: if the original isn't in PG yet, + // synthesise a TaskRun from the buffered snapshot. The B4-extended + // SyntheticRun carries every field ReplayTaskRunService reads. We + // also need projectSlug + orgSlug + envSlug for the redirect path, + // so look those up via the snapshot's runtimeEnvironmentId. + let taskRun: SyntheticReplayTaskRun | null = pgRun ?? null; + if (!taskRun) { + const buffer = getMollifierBuffer(); + const entry = buffer ? await buffer.getEntry(runParam) : null; + if (entry) { + const synthetic = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: entry.envId, + organizationId: entry.orgId, + }); + if (synthetic) { + const envRow = await prisma.runtimeEnvironment.findFirst({ + where: { id: entry.envId }, + select: { + slug: true, + project: { select: { slug: true, organization: { select: { slug: true } } } }, + }, + }); + if (envRow) { + taskRun = buildSyntheticReplayTaskRun({ synthetic, envRow }); + } + } + } + } + if (!taskRun) { return redirectWithErrorMessage(submission.value.failedRedirect, request, "Run not found"); } diff --git a/apps/webapp/app/v3/mollifier/syntheticReplayTaskRun.server.ts b/apps/webapp/app/v3/mollifier/syntheticReplayTaskRun.server.ts new file mode 100644 index 00000000000..c6afc7c3069 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticReplayTaskRun.server.ts @@ -0,0 +1,38 @@ +import type { TaskRun } from "@trigger.dev/database"; +import type { SyntheticRun } from "./readFallback.server"; + +export type SyntheticReplayTaskRun = TaskRun & { + project: { slug: string; organization: { slug: string } }; + runtimeEnvironment: { slug: string }; +}; + +// Adapt a buffered-run snapshot into the TaskRun-shaped input that +// `ReplayTaskRunService.call` expects. ReplayTaskRunService builds the +// new run's traceparent as `00-${existingTaskRun.traceId}-${existingTaskRun.spanId}-01` +// without guarding for undefined, so a synthetic with missing traceId +// or spanId (older snapshots — both fields are documented optional on +// `SyntheticRun`) would produce `00-undefined-undefined-01`, an invalid +// W3C traceparent that OTel silently drops, severing the replay's trace +// link to the original run. +// +// Returns null when those fields are missing — the caller surfaces this +// as "Run not found" so the customer retries once the drainer has +// materialised the PG row, where traceId/spanId are guaranteed present. +export function buildSyntheticReplayTaskRun(args: { + synthetic: SyntheticRun; + envRow: { + slug: string; + project: { slug: string; organization: { slug: string } }; + }; +}): SyntheticReplayTaskRun | null { + const { synthetic, envRow } = args; + if (!synthetic.traceId || !synthetic.spanId) return null; + return { + ...(synthetic as unknown as TaskRun), + project: { + slug: envRow.project.slug, + organization: { slug: envRow.project.organization.slug }, + }, + runtimeEnvironment: { slug: envRow.slug }, + }; +} diff --git a/apps/webapp/app/v3/mollifier/syntheticRunHeader.server.ts b/apps/webapp/app/v3/mollifier/syntheticRunHeader.server.ts new file mode 100644 index 00000000000..4ce296d16bf --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticRunHeader.server.ts @@ -0,0 +1,49 @@ +import type { SyntheticRun } from "./readFallback.server"; + +// Synthesise the run-detail page's `run` header shape (the NavBar + +// status badge + Cancel-button gate) from a buffered run snapshot. The +// shape matches `RunPresenter.getRun`'s `runData` — keep this in sync +// when fields are added there. +// +// CANCELED state is reflected back from `SyntheticRun.cancelledAt` / +// `status` so that after a buffered-cancel the NavBar shows the run as +// CANCELED + isFinished:true (which collapses the Cancel button) before +// the drainer materialises the PG row. This mirrors what +// `buildSyntheticSpanRun` does for the right-side details panel — the +// SyntheticRun.cancelledAt contract comment in readFallback.server.ts +// names this exact UI surface. +export function buildSyntheticRunHeader(args: { + run: SyntheticRun; + environment: { + id: string; + organizationId: string; + type: "PRODUCTION" | "DEVELOPMENT" | "STAGING" | "PREVIEW"; + slug: string; + }; +}) { + const { run, environment } = args; + const isCancelled = run.status === "CANCELED"; + + return { + id: run.friendlyId, + number: 1, + friendlyId: run.friendlyId, + traceId: run.traceId ?? "", + spanId: run.spanId ?? "", + status: isCancelled ? ("CANCELED" as const) : ("PENDING" as const), + isFinished: isCancelled, + startedAt: null, + completedAt: run.cancelledAt ?? null, + logsDeletedAt: null, + rootTaskRun: null, + parentTaskRun: null, + environment: { + id: environment.id, + organizationId: environment.organizationId, + type: environment.type, + slug: environment.slug, + userId: undefined, + userName: undefined, + }, + }; +} diff --git a/apps/webapp/test/mollifierSyntheticReplayTaskRun.test.ts b/apps/webapp/test/mollifierSyntheticReplayTaskRun.test.ts new file mode 100644 index 00000000000..6df2d92dde4 --- /dev/null +++ b/apps/webapp/test/mollifierSyntheticReplayTaskRun.test.ts @@ -0,0 +1,106 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { buildSyntheticReplayTaskRun } from "~/v3/mollifier/syntheticReplayTaskRun.server"; +import type { SyntheticRun } from "~/v3/mollifier/readFallback.server"; + +const NOW = new Date("2026-05-21T10:00:00Z"); + +function makeSyntheticRun(overrides: Partial = {}): SyntheticRun { + return { + id: "run_internal_1", + friendlyId: "run_friendly_1", + status: "QUEUED", + cancelledAt: undefined, + cancelReason: undefined, + delayUntil: undefined, + taskIdentifier: "hello-world", + createdAt: NOW, + payload: { message: "hi" }, + payloadType: "application/json", + metadata: undefined, + metadataType: undefined, + seedMetadata: undefined, + seedMetadataType: undefined, + idempotencyKey: undefined, + idempotencyKeyOptions: undefined, + isTest: false, + depth: 0, + ttl: "10m", + tags: [], + runTags: [], + lockedToVersion: undefined, + resumeParentOnCompletion: false, + parentTaskRunId: undefined, + traceId: "trace_1", + spanId: "span_1", + parentSpanId: undefined, + runtimeEnvironmentId: "env_a", + engine: "V2", + workerQueue: "worker-queue-1", + queue: "task/hello-world", + concurrencyKey: undefined, + machinePreset: "small-1x", + realtimeStreamsVersion: "v1", + maxAttempts: 3, + maxDurationInSeconds: 3600, + replayedFromTaskRunFriendlyId: undefined, + annotations: undefined, + traceContext: undefined, + scheduleId: undefined, + batchId: undefined, + parentTaskRunFriendlyId: undefined, + rootTaskRunFriendlyId: undefined, + ...overrides, + }; +} + +const ENV_ROW = { + slug: "dev", + project: { slug: "hello-world", organization: { slug: "references" } }, +}; + +describe("buildSyntheticReplayTaskRun", () => { + it("returns the adapted TaskRun shape when traceId and spanId are present", () => { + const taskRun = buildSyntheticReplayTaskRun({ + synthetic: makeSyntheticRun(), + envRow: ENV_ROW, + }); + expect(taskRun).not.toBeNull(); + expect(taskRun!.traceId).toBe("trace_1"); + expect(taskRun!.spanId).toBe("span_1"); + expect(taskRun!.project.slug).toBe("hello-world"); + expect(taskRun!.project.organization.slug).toBe("references"); + expect(taskRun!.runtimeEnvironment.slug).toBe("dev"); + }); + + it("returns null when the snapshot has no traceId", () => { + // ReplayTaskRunService builds `00-${traceId}-${spanId}-01` without + // guarding for undefined. Falling through with a missing traceId + // would emit `00-undefined-...-01`, an invalid W3C traceparent that + // OTel silently drops, breaking the replayed run's trace linkage to + // the original. The helper must refuse rather than degrade silently. + const taskRun = buildSyntheticReplayTaskRun({ + synthetic: makeSyntheticRun({ traceId: undefined }), + envRow: ENV_ROW, + }); + expect(taskRun).toBeNull(); + }); + + it("returns null when the snapshot has no spanId", () => { + const taskRun = buildSyntheticReplayTaskRun({ + synthetic: makeSyntheticRun({ spanId: undefined }), + envRow: ENV_ROW, + }); + expect(taskRun).toBeNull(); + }); + + it("returns null when both traceId and spanId are missing", () => { + const taskRun = buildSyntheticReplayTaskRun({ + synthetic: makeSyntheticRun({ traceId: undefined, spanId: undefined }), + envRow: ENV_ROW, + }); + expect(taskRun).toBeNull(); + }); +}); diff --git a/apps/webapp/test/mollifierSyntheticRunHeader.test.ts b/apps/webapp/test/mollifierSyntheticRunHeader.test.ts new file mode 100644 index 00000000000..7335a12ec46 --- /dev/null +++ b/apps/webapp/test/mollifierSyntheticRunHeader.test.ts @@ -0,0 +1,111 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { buildSyntheticRunHeader } from "~/v3/mollifier/syntheticRunHeader.server"; +import type { SyntheticRun } from "~/v3/mollifier/readFallback.server"; + +const NOW = new Date("2026-05-21T10:00:00Z"); +const CANCELLED_AT = new Date("2026-05-21T10:00:30Z"); + +function makeSyntheticRun(overrides: Partial = {}): SyntheticRun { + return { + id: "run_internal_1", + friendlyId: "run_friendly_1", + status: "QUEUED", + cancelledAt: undefined, + cancelReason: undefined, + delayUntil: undefined, + taskIdentifier: "hello-world", + createdAt: NOW, + payload: { message: "hi" }, + payloadType: "application/json", + metadata: undefined, + metadataType: undefined, + seedMetadata: undefined, + seedMetadataType: undefined, + idempotencyKey: undefined, + idempotencyKeyOptions: undefined, + isTest: false, + depth: 0, + ttl: "10m", + tags: [], + runTags: [], + lockedToVersion: undefined, + resumeParentOnCompletion: false, + parentTaskRunId: undefined, + traceId: "trace_1", + spanId: "span_1", + parentSpanId: undefined, + runtimeEnvironmentId: "env_a", + engine: "V2", + workerQueue: "worker-queue-1", + queue: "task/hello-world", + concurrencyKey: undefined, + machinePreset: "small-1x", + realtimeStreamsVersion: "v1", + maxAttempts: 3, + maxDurationInSeconds: 3600, + replayedFromTaskRunFriendlyId: undefined, + annotations: undefined, + traceContext: undefined, + scheduleId: undefined, + batchId: undefined, + parentTaskRunFriendlyId: undefined, + rootTaskRunFriendlyId: undefined, + ...overrides, + }; +} + +const ENV = { + id: "env_a", + organizationId: "org_a", + type: "DEVELOPMENT" as const, + slug: "dev", +}; + +describe("buildSyntheticRunHeader", () => { + it("returns PENDING / non-final state for a queued buffered run", () => { + const header = buildSyntheticRunHeader({ run: makeSyntheticRun(), environment: ENV }); + expect(header.status).toBe("PENDING"); + expect(header.isFinished).toBe(false); + expect(header.completedAt).toBeNull(); + }); + + it("reflects CANCELED state from the snapshot so the NavBar and Cancel-button gate update before the drainer materialises", () => { + const header = buildSyntheticRunHeader({ + run: makeSyntheticRun({ status: "CANCELED", cancelledAt: CANCELLED_AT }), + environment: ENV, + }); + // The Cancel button in route.tsx is gated on `!run.isFinished` and the + // status badge reads `run.status`. Both must flip on buffered-cancel + // or the user sees a "Pending" badge with a Cancel button on a run + // that's already cancelled in the snapshot. + expect(header.status).toBe("CANCELED"); + expect(header.isFinished).toBe(true); + expect(header.completedAt).toEqual(CANCELLED_AT); + }); + + it("forwards identity and environment fields from the snapshot", () => { + const header = buildSyntheticRunHeader({ run: makeSyntheticRun(), environment: ENV }); + expect(header.friendlyId).toBe("run_friendly_1"); + expect(header.id).toBe("run_friendly_1"); + expect(header.traceId).toBe("trace_1"); + expect(header.spanId).toBe("span_1"); + expect(header.environment).toMatchObject({ + id: "env_a", + organizationId: "org_a", + type: "DEVELOPMENT", + slug: "dev", + }); + }); + + it("falls back to empty strings when the snapshot has no trace/span ids", () => { + const header = buildSyntheticRunHeader({ + run: makeSyntheticRun({ traceId: undefined, spanId: undefined }), + environment: ENV, + }); + expect(header.traceId).toBe(""); + expect(header.spanId).toBe(""); + }); +});