From 35d210dea1128903246f8c7ecab93c62a0e82b34 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 11:28:17 +0100 Subject: [PATCH 1/6] feat(webapp): dashboard parity for mollifier-buffered runs Dashboard run detail, span detail, streams view, realtime subscription, redirect routes, replay/cancel/idempotency-reset action routes, the logs download route, and the cancel dialog all handle buffered runs by falling back to the mollifier snapshot. Stacked on the mutations PR. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../components/runs/v3/CancelRunDialog.tsx | 24 ++- .../v3/RunStreamPresenter.server.ts | 42 ++++- apps/webapp/app/routes/@.runs.$runParam.ts | 21 +++ .../route.tsx | 139 +++++++++++++--- .../projects.v3.$projectRef.runs.$runParam.ts | 25 ++- .../app/routes/realtime.v1.runs.$runId.ts | 43 ++++- ...am.runs.$runParam.idempotencyKey.reset.tsx | 54 ++++++- ...ram.realtime.v1.sessions.$sessionId.$io.ts | 15 ++ ...am.realtime.v1.streams.$runId.$streamId.ts | 17 ++ ...ltime.v1.streams.$runId.input.$streamId.ts | 15 ++ .../route.tsx | 44 +++++ .../route.tsx | 23 +++ .../resources.runs.$runParam.logs.download.ts | 34 ++++ .../resources.taskruns.$runParam.cancel.ts | 50 +++++- .../resources.taskruns.$runParam.debug.ts | 41 +++++ .../resources.taskruns.$runParam.replay.ts | 114 ++++++++++++- apps/webapp/app/routes/runs.$runParam.ts | 21 +++ .../mollifier/realtimeRunResource.server.ts | 57 +++++++ .../test/mollifierRealtimeRunResource.test.ts | 90 +++++++++++ ...mollifierRealtimeRunResourceBuffer.test.ts | 152 ++++++++++++++++++ .../mollifierRealtimeSubscription.test.ts | 46 ++++++ 21 files changed, 1024 insertions(+), 43 deletions(-) create mode 100644 apps/webapp/app/v3/mollifier/realtimeRunResource.server.ts create mode 100644 apps/webapp/test/mollifierRealtimeRunResource.test.ts create mode 100644 apps/webapp/test/mollifierRealtimeRunResourceBuffer.test.ts create mode 100644 apps/webapp/test/mollifierRealtimeSubscription.test.ts diff --git a/apps/webapp/app/components/runs/v3/CancelRunDialog.tsx b/apps/webapp/app/components/runs/v3/CancelRunDialog.tsx index facff746c5e..72947c4c8f7 100644 --- a/apps/webapp/app/components/runs/v3/CancelRunDialog.tsx +++ b/apps/webapp/app/components/runs/v3/CancelRunDialog.tsx @@ -1,6 +1,7 @@ import { NoSymbolIcon } from "@heroicons/react/24/solid"; import { DialogClose } from "@radix-ui/react-dialog"; import { Form, useNavigation } from "@remix-run/react"; +import { useEffect, useRef } from "react"; import { Button } from "~/components/primitives/Buttons"; import { DialogContent, DialogHeader } from "~/components/primitives/Dialog"; import { FormButtons } from "~/components/primitives/FormButtons"; @@ -10,14 +11,35 @@ import { SpinnerWhite } from "~/components/primitives/Spinner"; type CancelRunDialogProps = { runFriendlyId: string; redirectPath: string; + // Optional: when provided, close the dialog as soon as the cancel + // action transitions to "loading" (the redirect is in flight). Lets + // the caller control the open state without interfering with the + // form's submit name=value pair the way `` + // around the submit button does. + onCancelSubmitted?: () => void; }; -export function CancelRunDialog({ runFriendlyId, redirectPath }: CancelRunDialogProps) { +export function CancelRunDialog({ + runFriendlyId, + redirectPath, + onCancelSubmitted, +}: CancelRunDialogProps) { const navigation = useNavigation(); const formAction = `/resources/taskruns/${runFriendlyId}/cancel`; const isLoading = navigation.formAction === formAction; + const wasSubmitting = useRef(false); + useEffect(() => { + if (!onCancelSubmitted) return; + if (navigation.state === "submitting" && navigation.formAction === formAction) { + wasSubmitting.current = true; + } else if (wasSubmitting.current && navigation.state !== "submitting") { + wasSubmitting.current = false; + onCancelSubmitted(); + } + }, [navigation.state, navigation.formAction, formAction, onCancelSubmitted]); + return ( Cancel this run? diff --git a/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts b/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts index 69560c49e88..c95f68e3f2c 100644 --- a/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts @@ -3,6 +3,8 @@ import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { ABORT_REASON_SEND_ERROR, createSSELoader, SendFunction } from "~/utils/sse"; import { throttle } from "~/utils/throttle"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; +import { deserialiseSnapshot } from "@trigger.dev/redis-worker"; import { tracePubSub } from "~/v3/services/tracePubSub.server"; const PING_INTERVAL = 5_000; @@ -37,17 +39,45 @@ export class RunStreamPresenter { }, }); - if (!run) { + // Fall back to the mollifier buffer when the run isn't in PG yet. + // The buffered run has no execution events to stream, but we still + // attach a trace-pubsub subscription using the snapshot's traceId + // so that the moment the drainer materialises the row and execution + // begins, those events flow to this open SSE connection. Closing + // with 404 would force the dashboard to keep retrying. + let traceId: string | null = run?.traceId ?? null; + if (!traceId) { + const buffer = getMollifierBuffer(); + if (buffer) { + try { + const entry = await buffer.getEntry(runFriendlyId); + if (entry) { + const snapshot = deserialiseSnapshot<{ traceId?: string }>(entry.payload); + if (typeof snapshot.traceId === "string") { + traceId = snapshot.traceId; + } + } + } catch (err) { + logger.warn("RunStreamPresenter buffer fallback failed", { + runFriendlyId, + err: err instanceof Error ? err.message : String(err), + }); + } + } + } + + if (!traceId) { throw new Response("Not found", { status: 404 }); } + const resolvedRun = { traceId }; logger.info("RunStreamPresenter.start", { runFriendlyId, - traceId: run.traceId, + traceId: resolvedRun.traceId, }); // Subscribe to trace updates - const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId); + const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(resolvedRun.traceId); // Only send max every 1 second const throttledSend = throttle( @@ -105,7 +135,7 @@ export class RunStreamPresenter { cleanup: () => { logger.info("RunStreamPresenter.cleanup", { runFriendlyId, - traceId: run.traceId, + traceId: resolvedRun.traceId, }); // Remove message listener @@ -119,13 +149,13 @@ export class RunStreamPresenter { .then(() => { logger.info("RunStreamPresenter.cleanup.unsubscribe succeeded", { runFriendlyId, - traceId: run.traceId, + traceId: resolvedRun.traceId, }); }) .catch((error) => { logger.error("RunStreamPresenter.cleanup.unsubscribe failed", { runFriendlyId, - traceId: run.traceId, + traceId: resolvedRun.traceId, error: { name: error.name, message: error.message, diff --git a/apps/webapp/app/routes/@.runs.$runParam.ts b/apps/webapp/app/routes/@.runs.$runParam.ts index a52600628d8..c2717418ff2 100644 --- a/apps/webapp/app/routes/@.runs.$runParam.ts +++ b/apps/webapp/app/routes/@.runs.$runParam.ts @@ -4,6 +4,7 @@ import { prisma } from "~/db.server"; import { redirectWithErrorMessage } from "~/models/message.server"; import { requireUser } from "~/services/session.server"; import { impersonate, rootPath, v3RunPath } from "~/utils/pathBuilder"; +import { findBufferedRunRedirectInfo } from "~/v3/mollifier/syntheticRedirectInfo.server"; const ParamsSchema = z.object({ runParam: z.string(), @@ -51,6 +52,26 @@ export async function loader({ params, request }: LoaderFunctionArgs) { }); if (!run) { + // Admin impersonation route — bypass org membership so admins can + // open any buffered run by friendlyId, mirroring the existing PG + // behaviour above (no membership filter on the find). + const buffered = await findBufferedRunRedirectInfo({ + runFriendlyId: runParam, + userId: user.id, + skipOrgMembershipCheck: true, + }); + if (buffered) { + return redirect( + impersonate( + v3RunPath( + { slug: buffered.organizationSlug }, + { slug: buffered.projectSlug }, + { slug: buffered.environmentSlug }, + { friendlyId: runParam } + ) + ) + ); + } return redirectWithErrorMessage(rootPath(), request, "Run doesn't exist", { ephemeral: false, }); diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx index d55511e7ff5..28bae86406f 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx @@ -88,10 +88,13 @@ import { useReplaceSearchParams } from "~/hooks/useReplaceSearchParams"; import { useSearchParams } from "~/hooks/useSearchParam"; import { type Shortcut, useShortcutKeys } from "~/hooks/useShortcutKeys"; import { useHasAdminAccess } from "~/hooks/useUser"; +import { env } from "~/env.server"; import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { NextRunListPresenter } from "~/presenters/v3/NextRunListPresenter.server"; import { RunEnvironmentMismatchError, RunPresenter } from "~/presenters/v3/RunPresenter.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { buildSyntheticTraceForBufferedRun } from "~/v3/mollifier/syntheticTrace.server"; import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server"; import { getImpersonationId } from "~/services/impersonation.server"; import { logger } from "~/services/logger.server"; @@ -277,6 +280,31 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { ); } + // PG miss → try the mollifier buffer. When the gate diverts a trigger + // the run sits in Redis until the drainer materialises it; without + // this fallback the run-detail page 404s for the brief buffered window + // even though the API has accepted the trigger and returned an id. + const buffered = await tryMollifiedRunFallback({ + runFriendlyId: runParam, + organizationSlug, + projectSlug: projectParam, + envSlug: envParam, + userId, + }); + + if (buffered) { + const parent = await getResizableSnapshot(request, resizableSettings.parent.autosaveId); + const tree = await getResizableSnapshot(request, resizableSettings.tree.autosaveId); + + return json({ + run: buffered.run, + trace: buffered.trace, + maximumLiveReloadingSetting: env.MAXIMUM_LIVE_RELOADING_EVENTS, + resizable: { parent, tree }, + runsList: null, + }); + } + throw error; } @@ -305,6 +333,52 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { }); }; +async function tryMollifiedRunFallback(args: { + runFriendlyId: string; + organizationSlug: string; + projectSlug: string; + envSlug: string; + userId: string; +}) { + const project = await findProjectBySlug(args.organizationSlug, args.projectSlug, args.userId); + if (!project) return null; + const environment = await findEnvironmentBySlug(project.id, args.envSlug, args.userId); + if (!environment) return null; + + const buffered = await findRunByIdWithMollifierFallback({ + runId: args.runFriendlyId, + environmentId: environment.id, + organizationId: project.organizationId, + }); + if (!buffered) return null; + + return { + run: { + id: buffered.friendlyId, + number: 1, + friendlyId: buffered.friendlyId, + traceId: buffered.traceId ?? "", + spanId: buffered.spanId ?? "", + status: "PENDING" as const, + isFinished: false, + startedAt: null, + completedAt: null, + logsDeletedAt: null, + rootTaskRun: null, + parentTaskRun: null, + environment: { + id: environment.id, + organizationId: project.organizationId, + type: environment.type, + slug: environment.slug, + userId: undefined, + userName: undefined, + }, + }, + trace: buildSyntheticTraceForBufferedRun(buffered), + }; +} + type LoaderData = SerializeFrom; export default function Page() { @@ -407,23 +481,17 @@ export default function Page() { /> {run.isFinished ? null : ( - - - - - - + )} @@ -587,6 +655,35 @@ function TraceView({ ); } +// Controlled wrapper around the cancel dialog. Owns the Radix open state +// so the dialog closes itself once the cancel action transitions through +// submission. We can't ``-wrap the submit button +// because Radix's onClick handler swallows the button's name=value pair +// that the form action depends on for `redirectUrl`. +function ControlledCancelRunDialog({ + runFriendlyId, + redirectPath, +}: { + runFriendlyId: string; + redirectPath: string; +}) { + const [open, setOpen] = useState(false); + return ( + + + + + setOpen(false)} + /> + + ); +} + function NoLogsView({ run, resizable }: Pick) { const plan = useCurrentPlan(); const organization = useOrganization(); @@ -616,9 +713,13 @@ function NoLogsView({ run, resizable }: Pick) { >
{daysSinceCompleted === undefined ? ( - + - We tidy up older logs to keep things running smoothly. + This run is queued. Logs will appear here once it begins executing. ) : isWithinLogRetention ? ( diff --git a/apps/webapp/app/routes/projects.v3.$projectRef.runs.$runParam.ts b/apps/webapp/app/routes/projects.v3.$projectRef.runs.$runParam.ts index fe267d1f9fa..816b2071ec4 100644 --- a/apps/webapp/app/routes/projects.v3.$projectRef.runs.$runParam.ts +++ b/apps/webapp/app/routes/projects.v3.$projectRef.runs.$runParam.ts @@ -2,7 +2,8 @@ import { type LoaderFunctionArgs, redirect } from "@remix-run/server-runtime"; import { z } from "zod"; import { prisma } from "~/db.server"; import { requireUserId } from "~/services/session.server"; -import { v3RunSpanPath } from "~/utils/pathBuilder"; +import { v3RunPath, v3RunSpanPath } from "~/utils/pathBuilder"; +import { findBufferedRunRedirectInfo } from "~/v3/mollifier/syntheticRedirectInfo.server"; const ParamsSchema = z.object({ projectRef: z.string(), @@ -44,6 +45,28 @@ export async function loader({ params, request }: LoaderFunctionArgs) { }); if (!run) { + // Fall back to the mollifier buffer so a /projects/v3/{ref}/runs/{id} + // share link works during the buffered window. + const buffered = await findBufferedRunRedirectInfo({ + runFriendlyId: validatedParams.runParam, + userId, + }); + if (buffered) { + const url = new URL(request.url); + const searchParams = url.searchParams; + if (!searchParams.has("span") && buffered.spanId) { + searchParams.set("span", buffered.spanId); + } + return redirect( + v3RunPath( + { slug: buffered.organizationSlug }, + { slug: buffered.projectSlug }, + { slug: buffered.environmentSlug }, + { friendlyId: validatedParams.runParam }, + searchParams + ) + ); + } throw new Response("Not found", { status: 404 }); } diff --git a/apps/webapp/app/routes/realtime.v1.runs.$runId.ts b/apps/webapp/app/routes/realtime.v1.runs.$runId.ts index e03787c6200..e3775097048 100644 --- a/apps/webapp/app/routes/realtime.v1.runs.$runId.ts +++ b/apps/webapp/app/routes/realtime.v1.runs.$runId.ts @@ -1,4 +1,3 @@ -import { json } from "@remix-run/server-runtime"; import { z } from "zod"; import { $replica } from "~/db.server"; import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; @@ -7,6 +6,13 @@ import { anyResource, createLoaderApiRoute, } from "~/services/routeBuilders/apiBuilder.server"; +import { logger } from "~/services/logger.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { + isInitialBufferedSubscriptionRequest, + recordRealtimeBufferedSubscription, +} from "~/v3/mollifier/mollifierTelemetry.server"; +import { resolveRealtimeRunResource } from "~/v3/mollifier/realtimeRunResource.server"; const ParamsSchema = z.object({ runId: z.string(), @@ -18,7 +24,7 @@ export const loader = createLoaderApiRoute( allowJWT: true, corsStrategy: "all", findResource: async (params, authentication) => { - return $replica.taskRun.findFirst({ + const pgRun = await $replica.taskRun.findFirst({ where: { friendlyId: params.runId, runtimeEnvironmentId: authentication.environment.id, @@ -31,6 +37,23 @@ export const loader = createLoaderApiRoute( }, }, }); + + // Buffered fallback. If the run is sitting in the mollifier buffer + // (no PG row yet), open the Electric subscription anyway: the + // shape stream returns an empty initial snapshot, and when the + // drainer INSERTs the PG row Electric streams it to the client. + // Without this branch the route 404s, ShapeStream stops on the + // first response, and the hook silently hangs even after the run + // materialises (no auto-recovery). + const bufferedSynthetic = pgRun + ? null + : await findRunByIdWithMollifierFallback({ + runId: params.runId, + environmentId: authentication.environment.id, + organizationId: authentication.environment.organizationId, + }); + + return resolveRealtimeRunResource({ pgRun, bufferedSynthetic }); }, authorization: { action: "read", @@ -48,6 +71,22 @@ export const loader = createLoaderApiRoute( }, }, async ({ authentication, request, resource: run, apiVersion }) => { + // Observability for buffered-window subscriptions. The gate keeps + // the counter at one tick per subscription instead of one tick per + // ~20s live-poll iteration (see `isInitialBufferedSubscriptionRequest`). + const bufferedDwellMs = (run as { __bufferedDwellMs?: number }).__bufferedDwellMs; + if ( + typeof bufferedDwellMs === "number" && + isInitialBufferedSubscriptionRequest(request.url) + ) { + recordRealtimeBufferedSubscription(authentication.environment.id); + logger.info("mollifier.realtime.buffered_subscription", { + runId: run.friendlyId, + envId: authentication.environment.id, + bufferDwellMs: bufferedDwellMs, + }); + } + return realtimeClient.streamRun( request.url, authentication.environment, diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx index 614b668f910..8a3f4dd3a6e 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx @@ -5,6 +5,8 @@ import { logger } from "~/services/logger.server"; import { requireUserId } from "~/services/session.server"; import { ResetIdempotencyKeyService } from "~/v3/services/resetIdempotencyKey.server"; import { v3RunParamsSchema } from "~/utils/pathBuilder"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; export const action: ActionFunction = async ({ request, params }) => { const userId = await requireUserId(request); @@ -37,17 +39,53 @@ export const action: ActionFunction = async ({ request, params }) => { }, }); - if (!taskRun) { - return jsonWithErrorMessage({}, request, "Run not found"); - } - - if (!taskRun.idempotencyKey) { - return jsonWithErrorMessage({}, request, "This run does not have an idempotency key"); + // Resolve run from PG or the mollifier buffer (Q5). For a buffered + // run the snapshot carries the idempotencyKey + taskIdentifier; we + // also need the runtimeEnvironmentId to feed ResetIdempotencyKeyService + // (which clears both PG and the buffer lookup — B6b). + let resolved: + | { idempotencyKey: string; taskIdentifier: string; runtimeEnvironmentId: string } + | null = null; + if (taskRun) { + if (!taskRun.idempotencyKey) { + return jsonWithErrorMessage({}, request, "This run does not have an idempotency key"); + } + resolved = { + idempotencyKey: taskRun.idempotencyKey, + taskIdentifier: taskRun.taskIdentifier, + runtimeEnvironmentId: taskRun.runtimeEnvironmentId, + }; + } else { + const buffer = getMollifierBuffer(); + const entry = buffer ? await buffer.getEntry(runParam) : null; + if (!entry) { + return jsonWithErrorMessage({}, request, "Run not found"); + } + const member = await prisma.orgMember.findFirst({ + where: { userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) { + return jsonWithErrorMessage({}, request, "Run not found"); + } + const synthetic = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: entry.envId, + organizationId: entry.orgId, + }); + if (!synthetic?.idempotencyKey || !synthetic.taskIdentifier) { + return jsonWithErrorMessage({}, request, "This run does not have an idempotency key"); + } + resolved = { + idempotencyKey: synthetic.idempotencyKey, + taskIdentifier: synthetic.taskIdentifier, + runtimeEnvironmentId: entry.envId, + }; } const environment = await prisma.runtimeEnvironment.findUnique({ where: { - id: taskRun.runtimeEnvironmentId, + id: resolved.runtimeEnvironmentId, }, include: { project: { @@ -64,7 +102,7 @@ export const action: ActionFunction = async ({ request, params }) => { const service = new ResetIdempotencyKeyService(); - await service.call(taskRun.idempotencyKey, taskRun.taskIdentifier, { + await service.call(resolved.idempotencyKey, resolved.taskIdentifier, { ...environment, organizationId: environment.project.organizationId, organization: environment.project.organization, diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.sessions.$sessionId.$io.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.sessions.$sessionId.$io.ts index 66135347253..fd1ec765126 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.sessions.$sessionId.$io.ts +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.sessions.$sessionId.$io.ts @@ -12,6 +12,7 @@ import { import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { requireUserId } from "~/services/session.server"; import { EnvironmentParamSchema } from "~/utils/pathBuilder"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; const ParamsSchema = z.object({ runParam: z.string(), @@ -59,6 +60,20 @@ export async function loader({ request, params }: LoaderFunctionArgs) { }); if (!run) { + // Buffered run has no Session linkage yet. Return 204 so the SDK's + // SSE client treats this as "channel not yet active" and retries + // naturally once the drainer materialises the row. + const buffered = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: environment.id, + organizationId: project.organizationId, + }); + if (buffered) { + return new Response(null, { + status: 204, + headers: { "content-type": "text/event-stream; charset=utf-8" }, + }); + } return new Response("Run not found", { status: 404 }); } diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.$streamId.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.$streamId.ts index 8d0af728df8..58491dd4298 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.$streamId.ts +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.$streamId.ts @@ -7,6 +7,7 @@ import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { requireUserId } from "~/services/session.server"; import { EnvironmentParamSchema } from "~/utils/pathBuilder"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; const ParamsSchema = z.object({ runParam: z.string(), @@ -58,6 +59,22 @@ export async function loader({ request, params }: LoaderFunctionArgs) { }); if (!run) { + // Fall through to a buffered-run lookup. A buffered run has no output + // streams yet (execution hasn't started); return 204 with the + // event-stream content-type so the SDK's SSE client treats this as + // "stream not yet active" and retries naturally once the drainer + // materialises the run. + const buffered = await findRunByIdWithMollifierFallback({ + runId, + environmentId: environment.id, + organizationId: project.organizationId, + }); + if (buffered) { + return new Response(null, { + status: 204, + headers: { "content-type": "text/event-stream; charset=utf-8" }, + }); + } return new Response("Run not found", { status: 404 }); } diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.input.$streamId.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.input.$streamId.ts index c9480299cc0..430ed5c52f6 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.input.$streamId.ts +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.realtime.v1.streams.$runId.input.$streamId.ts @@ -7,6 +7,7 @@ import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { requireUserId } from "~/services/session.server"; import { EnvironmentParamSchema } from "~/utils/pathBuilder"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; const ParamsSchema = z.object({ runParam: z.string(), @@ -60,6 +61,20 @@ export async function loader({ request, params }: LoaderFunctionArgs) { }); if (!run) { + // Fall through to a buffered-run lookup. A buffered run has no input + // streams yet; return 204 so the SDK's SSE client treats this as + // "stream not yet active" and retries naturally. + const buffered = await findRunByIdWithMollifierFallback({ + runId, + environmentId: environment.id, + organizationId: project.organizationId, + }); + if (buffered) { + return new Response(null, { + status: 204, + headers: { "content-type": "text/event-stream; charset=utf-8" }, + }); + } return new Response("Run not found", { status: 404 }); } diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index 09f3f33fcb3..ce80b32e1df 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -82,6 +82,10 @@ import { useHasAdminAccess } from "~/hooks/useUser"; import { useCanViewLogsPage } from "~/hooks/useCanViewLogsPage"; import { redirectWithErrorMessage } from "~/models/message.server"; import { type Span, SpanPresenter, type SpanRun } from "~/presenters/v3/SpanPresenter.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { buildSyntheticSpanRun } from "~/v3/mollifier/syntheticSpanRun.server"; +import { findProjectBySlug } from "~/models/project.server"; +import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; import { logger } from "~/services/logger.server"; import { requireUserId } from "~/services/session.server"; import { cn } from "~/utils/cn"; @@ -117,6 +121,41 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { const presenter = new SpanPresenter(); + const tryBufferFallback = async () => { + // Fall back to the mollifier buffer when the run isn't in PG yet. We + // only synthesise a SpanRun for the root span; child spans don't + // exist for a buffered run, so non-root spanParam values resolve to + // "Event not found" (correct behaviour). + const project = await findProjectBySlug(organizationSlug, projectParam, userId); + if (!project) return null; + const environment = await findEnvironmentBySlug(project.id, envParam, userId); + if (!environment) return null; + + const buffered = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: environment.id, + organizationId: project.organizationId, + }); + if (!buffered) return null; + if (buffered.spanId !== spanParam) { + // The runId is buffered but this spanId doesn't match the root span. + // Don't toast "Event not found" — that's noisy for the initial-render + // request the dashboard fires before the root span auto-selects. + // 204 No Content matches what the PG path returns for the same case. + return new Response(null, { status: 204 }); + } + + const run = await buildSyntheticSpanRun({ + run: buffered, + environment: { + id: environment.id, + slug: environment.slug, + type: environment.type, + }, + }); + return typedjson({ type: "run" as const, run }); + }; + try { const result = await presenter.call({ projectSlug: projectParam, @@ -127,6 +166,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { }); if (!result) { + const buffered = await tryBufferFallback(); + if (buffered) return buffered; return redirectWithErrorMessage( v3RunPath( { slug: organizationSlug }, @@ -147,6 +188,9 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { } return typedjson({ type: "span" as const, span: result.span }); } catch (error) { + const buffered = await tryBufferFallback(); + if (buffered) return buffered; + logger.error("Error loading span", { projectParam, organizationSlug, diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx index 4a9581831c9..5000f68dba1 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx @@ -24,6 +24,7 @@ import { useProject } from "~/hooks/useProject"; import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { requireUserId } from "~/services/session.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; import { cn } from "~/utils/cn"; import { v3RunStreamParamsSchema } from "~/utils/pathBuilder"; @@ -75,6 +76,28 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { }); if (!run) { + // Buffered run has no realtime streams yet. Resolve the env by slug + // (so the buffer auth check below carries the same scope a PG hit + // would) and return 204 so the SDK's SSE client treats this as + // "stream not yet active" and retries on reconnect once the drainer + // materialises the row. + const env = await $replica.runtimeEnvironment.findFirst({ + where: { slug: envParam, projectId: project.id }, + select: { id: true }, + }); + if (env) { + const buffered = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: env.id, + organizationId: project.organizationId, + }); + if (buffered) { + return new Response(null, { + status: 204, + headers: { "content-type": "text/event-stream; charset=utf-8" }, + }); + } + } throw new Response("Not Found", { status: 404 }); } diff --git a/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts b/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts index 5c7725c510b..2ff0e083389 100644 --- a/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts +++ b/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts @@ -9,6 +9,7 @@ import { formatDurationMilliseconds } from "@trigger.dev/core/v3/utils/durations import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; import { TaskEventKind } from "@trigger.dev/database"; import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; export async function loader({ params, request }: LoaderFunctionArgs) { const user = await requireUser(request); @@ -30,6 +31,39 @@ export async function loader({ params, request }: LoaderFunctionArgs) { }); if (!run || !run.organizationId) { + // Buffered run has no events to package yet. Return a small gzipped + // placeholder file so the dashboard's "Download logs" button doesn't + // 404 mid-burst. We don't enforce org membership here because the + // buffer entry's envId/orgId fields aren't bound to the requesting + // user — that's checked by the calling page's loader already (this + // route is only reachable from a page the user has visited). + const buffer = getMollifierBuffer(); + if (buffer) { + try { + const entry = await buffer.getEntry(parsedParams.runParam); + if (entry) { + const placeholder = new Readable({ + read() { + this.push( + "# This run has not started yet. Logs will be available once it begins executing.\n" + ); + this.push(null); + }, + }); + const compressed = placeholder.pipe(createGzip()); + return new Response(compressed as any, { + status: 200, + headers: { + "Content-Type": "application/octet-stream", + "Content-Disposition": `attachment; filename="${parsedParams.runParam}.log"`, + "Content-Encoding": "gzip", + }, + }); + } + } catch { + // fall through to 404 on buffer error + } + } return new Response("Not found", { status: 404 }); } diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts index 240d7d3d8ed..c3dff252a73 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts @@ -6,6 +6,7 @@ import { redirectWithErrorMessage, redirectWithSuccessMessage } from "~/models/m import { logger } from "~/services/logger.server"; import { requireUserId } from "~/services/session.server"; import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; export const cancelSchema = z.object({ redirectUrl: z.string(), @@ -42,15 +43,56 @@ export const action: ActionFunction = async ({ request, params }) => { }, }); - if (!taskRun) { + if (taskRun) { + const cancelRunService = new CancelTaskRunService(); + await cancelRunService.call(taskRun); + return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`); + } + + // PG miss — try the mollifier buffer. The customer can hit cancel + // on a buffered run from the dashboard during the burst window. + // Q4 design: snapshot a `mark_cancelled` patch; the drainer's + // bifurcation routes the run to `engine.createCancelledRun` on + // next pop. + const buffer = getMollifierBuffer(); + const entry = buffer ? await buffer.getEntry(runParam) : null; + if (!entry) { submission.error = { runParam: ["Run not found"] }; return json(submission); } - const cancelRunService = new CancelTaskRunService(); - await cancelRunService.call(taskRun); + // Dashboard auth: verify the requesting user is a member of the + // buffered run's org. The API path scopes by env id from the + // authenticated request; the dashboard route uses org-membership + // because the URL doesn't carry an envId. + const member = await prisma.orgMember.findFirst({ + where: { userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) { + submission.error = { runParam: ["Run not found"] }; + return json(submission); + } - return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`); + const result = await buffer!.mutateSnapshot(runParam, { + type: "mark_cancelled", + cancelledAt: new Date().toISOString(), + cancelReason: "Canceled by user", + }); + if (result === "applied_to_snapshot") { + return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`); + } + if (result === "not_found") { + submission.error = { runParam: ["Run not found"] }; + return json(submission); + } + // "busy" — drainer is materialising. Customer can retry; by then the + // PG row exists and the regular cancel path takes over. + return redirectWithErrorMessage( + submission.value.redirectUrl, + request, + "Run is materialising — retry in a moment" + ); } catch (error) { if (error instanceof Error) { logger.error("Failed to cancel run", { diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts index d7acf18e517..e9d7ccd0b31 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts @@ -5,6 +5,8 @@ import { $replica } from "~/db.server"; import { requireUserId } from "~/services/session.server"; import { marqs } from "~/v3/marqs/index.server"; import { engine } from "~/v3/runEngine.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; +import { deserialiseSnapshot } from "@trigger.dev/redis-worker"; const ParamSchema = z.object({ runParam: z.string(), @@ -43,6 +45,45 @@ export async function loader({ request, params }: LoaderFunctionArgs) { }); if (!run) { + // Buffered run isn't on a queue yet (it sits in the mollifier buffer + // until the drainer materialises it), so the queue-concurrency fields + // don't apply. Return a minimal "buffered" debug payload from the + // snapshot so the Debug panel can show *something* instead of 404'ing. + const buffer = getMollifierBuffer(); + if (buffer) { + try { + const entry = await buffer.getEntry(runParam); + if (entry) { + const snapshot = deserialiseSnapshot<{ + taskIdentifier?: string; + queue?: string; + concurrencyKey?: string; + }>(entry.payload); + return typedjson({ + engine: "V2" as const, + buffered: true, + run: { + id: entry.runId, + engine: "V2" as const, + friendlyId: entry.runId, + queue: snapshot.queue ?? null, + concurrencyKey: snapshot.concurrencyKey ?? null, + queueTimestamp: entry.createdAt, + runtimeEnvironment: null, + }, + queueConcurrencyLimit: undefined, + envConcurrencyLimit: undefined, + queueCurrentConcurrency: undefined, + envCurrentConcurrency: undefined, + queueReserveConcurrency: undefined, + envReserveConcurrency: undefined, + keys: [], + }); + } + } catch { + // fall through to 404 on buffer error + } + } throw new Response("Not Found", { status: 404 }); } diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index 8a22822d06b..62da62e0478 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -11,6 +11,9 @@ import { requireUser } from "~/services/session.server"; import { sortEnvironments } from "~/utils/environmentSort"; import { v3RunSpanPath } from "~/utils/pathBuilder"; import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import type { TaskRun } from "@trigger.dev/database"; import parseDuration from "parse-duration"; import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server"; import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server"; @@ -33,7 +36,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { Object.fromEntries(new URL(request.url).searchParams) ); - const run = await $replica.taskRun.findFirst({ + let run = await $replica.taskRun.findFirst({ select: { payload: true, payloadType: true, @@ -88,6 +91,74 @@ export async function loader({ request, params }: LoaderFunctionArgs) { where: { friendlyId: runParam, project: { organization: { members: { some: { userId } } } } }, }); + let synthetic: + | (Awaited> & { __synth: true }) + | undefined; + if (!run) { + // Buffered fallback: read the snapshot and look up the env list via + // the snapshot's organizationId. Without this the Replay dialog + // 404s for runs queued in the mollifier buffer, which dumps the + // user back to the task list. + const buffer = getMollifierBuffer(); + const entry = buffer ? await buffer.getEntry(runParam) : null; + if (!entry) throw new Response("Not Found", { status: 404 }); + const member = await prisma.orgMember.findFirst({ + where: { userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) throw new Response("Not Found", { status: 404 }); + const buffered = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: entry.envId, + organizationId: entry.orgId, + }); + if (!buffered) throw new Response("Not Found", { status: 404 }); + synthetic = Object.assign(buffered, { __synth: true as const }); + const orgProject = await $replica.project.findFirst({ + where: { + environments: { some: { id: entry.envId } }, + }, + select: { + slug: true, + environments: { + select: { + id: true, + type: true, + slug: true, + branchName: true, + orgMember: { select: { user: true } }, + }, + where: { + archivedAt: null, + OR: [ + { type: { in: ["PREVIEW", "STAGING", "PRODUCTION"] } }, + { type: "DEVELOPMENT", orgMember: { userId } }, + ], + }, + }, + }, + }); + if (!orgProject) throw new Response("Not Found", { status: 404 }); + run = { + payload: buffered.payload, + payloadType: buffered.payloadType ?? "application/json", + seedMetadata: buffered.seedMetadata ?? null, + seedMetadataType: buffered.seedMetadataType ?? null, + runtimeEnvironmentId: entry.envId, + concurrencyKey: buffered.concurrencyKey ?? null, + maxAttempts: buffered.maxAttempts ?? null, + maxDurationInSeconds: buffered.maxDurationInSeconds ?? null, + machinePreset: buffered.machinePreset ?? null, + workerQueue: buffered.workerQueue ?? null, + ttl: buffered.ttl ?? null, + idempotencyKey: buffered.idempotencyKey ?? null, + runTags: buffered.runTags, + queue: buffered.queue ?? "task/", + taskIdentifier: buffered.taskIdentifier ?? "", + project: orgProject, + } as unknown as typeof run; + } + if (!run) { throw new Response("Not Found", { status: 404 }); } @@ -174,7 +245,7 @@ export const action: ActionFunction = async ({ request, params }) => { } try { - const taskRun = await prisma.taskRun.findFirst({ + const pgRun = await prisma.taskRun.findFirst({ where: { friendlyId: runParam, }, @@ -192,6 +263,45 @@ export const action: ActionFunction = async ({ request, params }) => { }, }); + // Mollifier read-fallback (Q2): if the original isn't in PG yet, + // synthesise a TaskRun from the buffered snapshot. The B4-extended + // SyntheticRun carries every field ReplayTaskRunService reads. We + // also need projectSlug + orgSlug + envSlug for the redirect path, + // so look those up via the snapshot's runtimeEnvironmentId. + let taskRun: + | (TaskRun & { + project: { slug: string; organization: { slug: string } }; + runtimeEnvironment: { slug: string }; + }) + | null = pgRun ?? null; + if (!taskRun) { + const buffer = getMollifierBuffer(); + const entry = buffer ? await buffer.getEntry(runParam) : null; + if (entry) { + const synthetic = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: entry.envId, + organizationId: entry.orgId, + }); + if (synthetic) { + const envRow = await prisma.runtimeEnvironment.findFirst({ + where: { id: entry.envId }, + select: { + slug: true, + project: { select: { slug: true, organization: { select: { slug: true } } } }, + }, + }); + if (envRow) { + taskRun = { + ...(synthetic as unknown as TaskRun), + project: { slug: envRow.project.slug, organization: { slug: envRow.project.organization.slug } }, + runtimeEnvironment: { slug: envRow.slug }, + }; + } + } + } + } + if (!taskRun) { return redirectWithErrorMessage(submission.value.failedRedirect, request, "Run not found"); } diff --git a/apps/webapp/app/routes/runs.$runParam.ts b/apps/webapp/app/routes/runs.$runParam.ts index b472d7ae8f4..7be799746fd 100644 --- a/apps/webapp/app/routes/runs.$runParam.ts +++ b/apps/webapp/app/routes/runs.$runParam.ts @@ -4,6 +4,7 @@ import { prisma } from "~/db.server"; import { redirectWithErrorMessage } from "~/models/message.server"; import { requireUser } from "~/services/session.server"; import { rootPath, v3RunPath } from "~/utils/pathBuilder"; +import { findBufferedRunRedirectInfo } from "~/v3/mollifier/syntheticRedirectInfo.server"; const ParamsSchema = z.object({ runParam: z.string(), @@ -48,6 +49,26 @@ export async function loader({ params, request }: LoaderFunctionArgs) { }); if (!run) { + // Fall back to the mollifier buffer. Without this a customer clicking + // the run link returned by the trigger API gets bounced to the home + // page until the drainer materialises the PG row. + const buffered = await findBufferedRunRedirectInfo({ runFriendlyId: runParam, userId: user.id }); + if (buffered) { + const url = new URL(request.url); + const searchParams = url.searchParams; + if (!searchParams.has("span") && buffered.spanId) { + searchParams.set("span", buffered.spanId); + } + return redirect( + v3RunPath( + { slug: buffered.organizationSlug }, + { slug: buffered.projectSlug }, + { slug: buffered.environmentSlug }, + { friendlyId: runParam }, + searchParams + ) + ); + } return redirectWithErrorMessage( rootPath(), request, diff --git a/apps/webapp/app/v3/mollifier/realtimeRunResource.server.ts b/apps/webapp/app/v3/mollifier/realtimeRunResource.server.ts new file mode 100644 index 00000000000..0a84f984530 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/realtimeRunResource.server.ts @@ -0,0 +1,57 @@ +import type { SyntheticRun } from "./readFallback.server"; + +// Shape `realtime.v1.runs.$runId.ts`'s findResource hands to the route's +// authorization callback + loader body. The PG-resident case is the +// canonical shape (a TaskRun row with the batch join); the buffered +// case below mirrors it from the synthetic run. +export type RealtimeRunResource = { + id: string; + friendlyId: string; + taskIdentifier: string; + runTags: string[]; + batch: { friendlyId: string } | null; + // Present only when this resource was resolved from the mollifier + // buffer (no PG row yet). Stamped at resolve time so the loader body + // can emit observability for buffered-window subscriptions. The flag + // doubles as the discriminant — PG-sourced resources never carry it. + __bufferedDwellMs?: number; +}; + +export type RealtimeRunResourcePgRun = { + id: string; + friendlyId: string; + taskIdentifier: string; + runTags: string[]; + batch: { friendlyId: string } | null; +}; + +// Given the results of the PG and buffer lookups, produce the resource +// shape the realtime route returns from findResource. PG-first: if the +// run is PG-resident, return it unchanged (the buffered fallback only +// fires when no PG row exists yet). When only the buffer has the run, +// synthesise a matching shape whose `id` is the deterministic value +// engine.trigger will write when the drainer materialises this run — +// this is what lets the Electric subscription's `WHERE id=` match +// the eventual INSERT. +export function resolveRealtimeRunResource(input: { + pgRun: RealtimeRunResourcePgRun | null; + bufferedSynthetic: Pick< + SyntheticRun, + "id" | "friendlyId" | "taskIdentifier" | "runTags" | "createdAt" + > | null; + now?: () => number; +}): RealtimeRunResource | null { + if (input.pgRun) return input.pgRun; + if (input.bufferedSynthetic) { + const now = (input.now ?? Date.now)(); + return { + id: input.bufferedSynthetic.id, + friendlyId: input.bufferedSynthetic.friendlyId, + taskIdentifier: input.bufferedSynthetic.taskIdentifier ?? "", + runTags: input.bufferedSynthetic.runTags, + batch: null, + __bufferedDwellMs: now - input.bufferedSynthetic.createdAt.getTime(), + }; + } + return null; +} diff --git a/apps/webapp/test/mollifierRealtimeRunResource.test.ts b/apps/webapp/test/mollifierRealtimeRunResource.test.ts new file mode 100644 index 00000000000..2f53ecb892f --- /dev/null +++ b/apps/webapp/test/mollifierRealtimeRunResource.test.ts @@ -0,0 +1,90 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { resolveRealtimeRunResource } from "~/v3/mollifier/realtimeRunResource.server"; + +const pgRun = { + id: "pg_internal_id", + friendlyId: "run_pg_friendly", + taskIdentifier: "hello-world", + runTags: ["a", "b"], + batch: { friendlyId: "batch_1" }, +}; + +const bufferedSynthetic = { + id: "buffered_id", + friendlyId: "run_buffered_id", + taskIdentifier: "hello-world", + runTags: ["c"], + // Six seconds ago against the fixed `now` below. + createdAt: new Date("2026-05-22T12:00:00.000Z"), +}; + +const fixedNow = () => new Date("2026-05-22T12:00:06.000Z").getTime(); + +describe("resolveRealtimeRunResource", () => { + it("returns the PG run unchanged when one exists", () => { + // PG wins even if the buffer also has the entry — the drainer may + // be racing the route call and the PG row is the canonical source. + expect( + resolveRealtimeRunResource({ pgRun, bufferedSynthetic: null }), + ).toEqual(pgRun); + expect( + resolveRealtimeRunResource({ pgRun, bufferedSynthetic }), + ).toEqual(pgRun); + }); + + it("never stamps __bufferedDwellMs on a PG-sourced resource", () => { + // The loader body uses __bufferedDwellMs as a discriminant for + // emitting buffered-subscription observability. A PG-resident run + // must never carry it or every PG subscription would over-count. + const result = resolveRealtimeRunResource({ pgRun, bufferedSynthetic }); + expect(result).not.toHaveProperty("__bufferedDwellMs"); + }); + + it("synthesises a resource from the buffered entry when PG misses", () => { + // Load-bearing assertion: `id` must equal `bufferedSynthetic.id`. + // The realtime route hands this `id` to streamRun, which builds + // Electric's `WHERE id=''` clause. When the drainer materialises + // the run, engine.trigger writes the row with that same id (derived + // deterministically from friendlyId), and Electric streams the + // INSERT to the client. If the synthesised `id` ever drifts from + // what the drainer writes, the customer subscribes to a shape that + // never matches and the hook silently hangs even after materialise. + const result = resolveRealtimeRunResource({ + pgRun: null, + bufferedSynthetic, + now: fixedNow, + }); + expect(result).toEqual({ + id: "buffered_id", + friendlyId: "run_buffered_id", + taskIdentifier: "hello-world", + runTags: ["c"], + batch: null, + __bufferedDwellMs: 6000, + }); + }); + + it("defaults a missing taskIdentifier to empty string", () => { + const result = resolveRealtimeRunResource({ + pgRun: null, + bufferedSynthetic: { ...bufferedSynthetic, taskIdentifier: undefined }, + now: fixedNow, + }); + expect(result?.taskIdentifier).toBe(""); + }); + + it("returns null when neither PG nor buffer have the run", () => { + // This is the genuine not-found case — typo'd runId, deleted run, + // etc. The api-builder maps null to 404. Critically, the buffered- + // fallback must NOT promote a missing run to a synthetic resource — + // that would cause Electric to open a shape for a runId that may + // never exist, which is also a silent-hang situation but for a + // different reason. + expect( + resolveRealtimeRunResource({ pgRun: null, bufferedSynthetic: null }), + ).toBeNull(); + }); +}); diff --git a/apps/webapp/test/mollifierRealtimeRunResourceBuffer.test.ts b/apps/webapp/test/mollifierRealtimeRunResourceBuffer.test.ts new file mode 100644 index 00000000000..5cf0610b73b --- /dev/null +++ b/apps/webapp/test/mollifierRealtimeRunResourceBuffer.test.ts @@ -0,0 +1,152 @@ +import { describe, expect, vi } from "vitest"; +import { redisTest } from "@internal/testcontainers"; +import { MollifierBuffer } from "@trigger.dev/redis-worker"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { resolveRealtimeRunResource } from "~/v3/mollifier/realtimeRunResource.server"; + +const SNAPSHOT_BASE = { + friendlyId: "run_phase52e2e", + taskIdentifier: "hello-world", + payload: '{"x":1}', + payloadType: "application/json", + traceContext: { traceparent: "00-0123456789abcdef0123456789abcdef-fedcba9876543210-01" }, + traceId: "0123456789abcdef0123456789abcdef", + spanId: "fedcba9876543210", + queue: "task/hello-world", + tags: ["realtime-e2e"], + depth: 0, + isTest: false, + taskEventStore: "taskEvent", +}; + +// End-to-end: a real MollifierBuffer has an entry, the real +// readFallback helper deserialises it, and the resolveRealtimeRunResource +// helper produces the resource shape the realtime route returns from +// findResource. Regression intent: if any link in the chain breaks — +// buffer interface rename, snapshot field rename, id-derivation drift, +// synthetic-shape change — this test fails. The route file itself is +// then a thin glue layer over tested pieces. +describe("realtime buffered-subscription resource resolution (testcontainers)", () => { + redisTest( + "synthesises a resource whose `id` matches RunId.fromFriendlyId", + async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: SNAPSHOT_BASE.friendlyId, + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT_BASE), + }); + + const bufferedSynthetic = await findRunByIdWithMollifierFallback( + { + runId: SNAPSHOT_BASE.friendlyId, + environmentId: "env_a", + organizationId: "org_1", + }, + { getBuffer: () => buffer }, + ); + expect(bufferedSynthetic).not.toBeNull(); + + const resource = resolveRealtimeRunResource({ + pgRun: null, + bufferedSynthetic, + }); + + // The load-bearing contract: the resolved `id` MUST equal what + // engine.trigger will write to PG.TaskRun.id when the drainer + // materialises this run. Electric's `WHERE id=''` clause + // depends on this match — drift means a silent-hang regression. + expect(resource?.id).toBe(RunId.fromFriendlyId(SNAPSHOT_BASE.friendlyId)); + expect(resource?.friendlyId).toBe(SNAPSHOT_BASE.friendlyId); + expect(resource?.taskIdentifier).toBe("hello-world"); + expect(resource?.runTags).toEqual(["realtime-e2e"]); + expect(resource?.batch).toBeNull(); + expect(resource?.__bufferedDwellMs).toBeTypeOf("number"); + expect(resource?.__bufferedDwellMs).toBeGreaterThanOrEqual(0); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "returns null when neither PG nor the buffer have the entry", + async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + const bufferedSynthetic = await findRunByIdWithMollifierFallback( + { + runId: "run_does_not_exist", + environmentId: "env_a", + organizationId: "org_1", + }, + { getBuffer: () => buffer }, + ); + expect(bufferedSynthetic).toBeNull(); + + const resource = resolveRealtimeRunResource({ + pgRun: null, + bufferedSynthetic, + }); + // The api builder relies on this null to emit a real 404 for + // genuinely missing runs. If we ever promote unknown runIds to + // synthetic resources here, the route opens an Electric shape + // for a run that may never exist — a different silent-hang + // failure mode for typos, deleted runs, etc. + expect(resource).toBeNull(); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "does not fall back to buffer when PG has the row", + async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: SNAPSHOT_BASE.friendlyId, + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT_BASE), + }); + + // Simulate the drainer having materialised the run: PG has the + // canonical row, the buffer still has its entry (would be + // ack'd & removed in real ops). The resolver must return the + // PG row and NOT carry the __bufferedDwellMs flag — otherwise + // the loader body would emit a buffered-subscription log for a + // run that's actually PG-resident, over-counting the signal. + const pgRun = { + id: RunId.fromFriendlyId(SNAPSHOT_BASE.friendlyId), + friendlyId: SNAPSHOT_BASE.friendlyId, + taskIdentifier: "hello-world", + runTags: ["realtime-e2e"], + batch: null, + }; + + const bufferedSynthetic = await findRunByIdWithMollifierFallback( + { + runId: SNAPSHOT_BASE.friendlyId, + environmentId: "env_a", + organizationId: "org_1", + }, + { getBuffer: () => buffer }, + ); + + const resource = resolveRealtimeRunResource({ pgRun, bufferedSynthetic }); + expect(resource).toEqual(pgRun); + expect(resource).not.toHaveProperty("__bufferedDwellMs"); + } finally { + await buffer.close(); + } + }, + ); +}); diff --git a/apps/webapp/test/mollifierRealtimeSubscription.test.ts b/apps/webapp/test/mollifierRealtimeSubscription.test.ts new file mode 100644 index 00000000000..0ea0471a5f1 --- /dev/null +++ b/apps/webapp/test/mollifierRealtimeSubscription.test.ts @@ -0,0 +1,46 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ + prisma: {}, + $replica: {}, +})); + +import { isInitialBufferedSubscriptionRequest } from "~/v3/mollifier/mollifierTelemetry.server"; + +describe("isInitialBufferedSubscriptionRequest", () => { + // Electric's shape-stream protocol returns a `handle=` in + // the first response. The SDK echoes that handle on every reconnect / + // live-poll iteration thereafter. The realtime route logs + + // increments the mollifier.realtime_subscriptions.buffered counter + // only on the initial connect (handle absent) so each subscription + // produces a single observability event instead of one per + // long-poll round-trip (~20s). + it("returns true for the SDK's initial GET (no handle param)", () => { + expect( + isInitialBufferedSubscriptionRequest( + "http://localhost:3030/realtime/v1/runs/run_x?log=full&offset=-1", + ), + ).toBe(true); + }); + + it("returns false for Electric's reconnects (handle present)", () => { + expect( + isInitialBufferedSubscriptionRequest( + "http://localhost:3030/realtime/v1/runs/run_x?handle=100344308-1779&log=full&offset=0_0", + ), + ).toBe(false); + }); + + it("returns false for Electric live-poll reconnects (handle + cursor)", () => { + expect( + isInitialBufferedSubscriptionRequest( + "http://localhost:3030/realtime/v1/runs/run_x?cursor=51020980&handle=100344308&live=true&log=full&offset=0_inf", + ), + ).toBe(false); + }); + + it("accepts a URL instance as well as a string", () => { + const url = new URL("http://localhost:3030/realtime/v1/runs/run_x?log=full"); + expect(isInitialBufferedSubscriptionRequest(url)).toBe(true); + }); +}); From 6ba72fca6acdccebb472ad146700c38c57eb9b76 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Tue, 26 May 2026 14:43:27 +0100 Subject: [PATCH 2/6] fix(webapp): mollifier buffered-path auth checks + drop high-cardinality envId from realtime counter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three CodeRabbit findings from #3709, re-raised on #3757: - resources.taskruns.$runParam.debug.ts: buffered fallback returned the run's queue / concurrencyKey / queueTimestamp from the snapshot without verifying org membership. Any authenticated user who knew a friendlyId could read those fields across orgs. Now joins through orgMember the same way the PG path does and 404s on miss. - resources.runs.$runParam.logs.download.ts: same shape — the buffered placeholder leaked runId existence to non-members on direct URL access. Same orgMember check now gates the buffered branch. - mollifierTelemetry.server.ts: recordRealtimeBufferedSubscription was attaching envId (a UUID) as an OTEL counter dimension, violating the project's "no high-cardinality IDs in metric attributes" guideline. Dropped the parameter; the call site's logger.info still emits envId. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/webapp/app/routes/realtime.v1.runs.$runId.ts | 2 +- .../resources.runs.$runParam.logs.download.ts | 14 ++++++++++---- .../routes/resources.taskruns.$runParam.debug.ts | 10 ++++++++++ .../app/v3/mollifier/mollifierTelemetry.server.ts | 6 +++--- 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/apps/webapp/app/routes/realtime.v1.runs.$runId.ts b/apps/webapp/app/routes/realtime.v1.runs.$runId.ts index e3775097048..a3cd5ac4f99 100644 --- a/apps/webapp/app/routes/realtime.v1.runs.$runId.ts +++ b/apps/webapp/app/routes/realtime.v1.runs.$runId.ts @@ -79,7 +79,7 @@ export const loader = createLoaderApiRoute( typeof bufferedDwellMs === "number" && isInitialBufferedSubscriptionRequest(request.url) ) { - recordRealtimeBufferedSubscription(authentication.environment.id); + recordRealtimeBufferedSubscription(); logger.info("mollifier.realtime.buffered_subscription", { runId: run.friendlyId, envId: authentication.environment.id, diff --git a/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts b/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts index 2ff0e083389..4c5e89c42a7 100644 --- a/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts +++ b/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts @@ -33,15 +33,21 @@ export async function loader({ params, request }: LoaderFunctionArgs) { if (!run || !run.organizationId) { // Buffered run has no events to package yet. Return a small gzipped // placeholder file so the dashboard's "Download logs" button doesn't - // 404 mid-burst. We don't enforce org membership here because the - // buffer entry's envId/orgId fields aren't bound to the requesting - // user — that's checked by the calling page's loader already (this - // route is only reachable from a page the user has visited). + // 404 mid-burst. Re-check org membership against the buffer entry's + // orgId so direct URL access can't confirm runId existence across + // orgs (loader-based check on the calling page doesn't apply here). const buffer = getMollifierBuffer(); if (buffer) { try { const entry = await buffer.getEntry(parsedParams.runParam); if (entry) { + const member = await prisma.orgMember.findFirst({ + where: { userId: user.id, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) { + return new Response("Not found", { status: 404 }); + } const placeholder = new Readable({ read() { this.push( diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts index e9d7ccd0b31..f9fa8521354 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts @@ -54,6 +54,16 @@ export async function loader({ request, params }: LoaderFunctionArgs) { try { const entry = await buffer.getEntry(runParam); if (entry) { + // Same org-membership gate as the PG path above. Without it, + // any authenticated user who knows a runId could read the + // buffered run's queue/concurrencyKey snapshot across orgs. + const member = await $replica.orgMember.findFirst({ + where: { userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) { + throw new Response("Not Found", { status: 404 }); + } const snapshot = deserialiseSnapshot<{ taskIdentifier?: string; queue?: string; diff --git a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts index f9c7ca72f1f..094999c1f00 100644 --- a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts @@ -31,9 +31,9 @@ export const realtimeBufferedSubscriptionsCounter = meter.createCounter( // No `envId` attribute — `envId` is a banned high-cardinality metric // label per the repo's OTel rules. The structured warn log emitted -// alongside the counter tick (in `mollifierStaleSweep.server.ts`) -// carries the envId / orgId / runId for forensic drill-down; the -// metric stays an aggregate. +// alongside the counter tick (in `mollifierStaleSweep.server.ts` and +// the realtime route's `logger.info`) carries the envId / orgId / +// runId for forensic drill-down; the metric stays an aggregate. export function recordRealtimeBufferedSubscription(): void { realtimeBufferedSubscriptionsCounter.add(1); } From 6cf8be0a77582b19a8da1adb51853adcb5a95630 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 27 May 2026 09:13:04 +0100 Subject: [PATCH 3/6] fix(webapp): dashboard typecheck + Devin auth-throw swallowing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three issues: - spans.\$spanParam/route.tsx loader returned a raw \`Response\` (204 on spanId-mismatch) alongside typedjson, collapsing the discriminated \`{type: "run"|"span"}\` union in the useTypedFetcher consumer. Switched to \`throw new Response(null, { status: 204 })\` so Remix exits cleanly without polluting the return type. - resources.taskruns.\$runParam.debug.ts loader's buffered branch returned \`engine: "V2"\` with \`runtimeEnvironment: null\`, widening the V2 variant in debugRun.tsx and breaking every MarQS / RunQueue key call site (run.runtimeEnvironment now nullable). Switched to \`engine: "BUFFERED" as const\` so debugRun.tsx narrows the buffered case to a dedicated panel — the V1/V2 panels recover their non-null runtimeEnvironment assumption. - Devin r3305402673: bare \`catch {}\` was swallowing the intentional \`throw new Response("Not Found", { status: 404 })\` auth-check fallthrough. Narrowed the try to only wrap \`buffer.getEntry\` (the one call that can transient-fail). Authorization and snapshot parsing now live outside the try, so an intentional 404 throw isn't masked by the transient-error catch. Keeps \`throw\` rather than Devin's suggested \`return\` because the consumer is typedjson-typed and a raw-Response return collapses the discriminated union. Also adds a dedicated DebugRunDataBuffered panel for buffered runs that renders the snapshot's queue / concurrencyKey / task identifier (buffered runs aren't on a PG queue, so the existing MarQS/RunQueue panels would 404 anyway). Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/webapp/app/components/admin/debugRun.tsx | 62 ++++++++++++- .../route.tsx | 6 +- .../resources.taskruns.$runParam.debug.ts | 89 +++++++++++-------- 3 files changed, 113 insertions(+), 44 deletions(-) diff --git a/apps/webapp/app/components/admin/debugRun.tsx b/apps/webapp/app/components/admin/debugRun.tsx index 5d5386a58aa..448b96b57bb 100644 --- a/apps/webapp/app/components/admin/debugRun.tsx +++ b/apps/webapp/app/components/admin/debugRun.tsx @@ -66,12 +66,66 @@ function DebugRunContent({ friendlyId }: { friendlyId: string }) { ); } +// PG-resident variants (V1 + V2). Carries `run.runtimeEnvironment` as +// non-null and the queue/env concurrency fields; the BUFFERED variant +// has neither and renders via `DebugRunDataBuffered`. +type PgVariant = Exclude, { engine: "BUFFERED" }>; + function DebugRunData(props: UseDataFunctionReturn) { + // BUFFERED is the mollifier-buffer branch — no PG queue presence, + // so the V1/V2 panels (which dereference run.runtimeEnvironment for + // MarQS / RunQueue key construction) would crash. Render the + // snapshot fields directly. Narrowing on `engine === "BUFFERED"` + // here also strips the buffered variant from the union TS sees in + // the V1/V2 branches below, restoring their non-null + // runtimeEnvironment assumption. The casts are inside the + // discriminated arms — `props.engine === "BUFFERED"` proves the + // shape but TS doesn't follow the narrowing through a `{...props}` + // spread, so we annotate the variant explicitly. + if (props.engine === "BUFFERED") { + return ; + } + if (props.engine === "V1") { - return ; + return ; } - return ; + return ; +} + +type BufferedVariant = Extract, { engine: "BUFFERED" }>; + +function DebugRunDataBuffered({ run }: BufferedVariant) { + return ( + + + ID + + + + + + State + Buffered (mollifier) — not yet materialised to Postgres + + + Task identifier + {run.taskIdentifier ?? "—"} + + + Queue (from snapshot) + {run.queue ?? "—"} + + + Concurrency key (from snapshot) + {run.concurrencyKey ?? "—"} + + + Buffered at + {new Date(run.queueTimestamp).toISOString()} + + + ); } function DebugRunDataEngineV1({ @@ -82,7 +136,7 @@ function DebugRunDataEngineV1({ envCurrentConcurrency, queueReserveConcurrency, envReserveConcurrency, -}: UseDataFunctionReturn) { +}: PgVariant) { const keys = new MarQSShortKeyProducer("marqs:"); const withPrefix = (key: string) => `marqs:${key}`; @@ -354,7 +408,7 @@ function DebugRunDataEngineV2({ envConcurrencyLimit, envCurrentConcurrency, keys, -}: UseDataFunctionReturn) { +}: PgVariant) { return ( diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index ce80b32e1df..9b4af374f0a 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -142,7 +142,11 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { // Don't toast "Event not found" — that's noisy for the initial-render // request the dashboard fires before the root span auto-selects. // 204 No Content matches what the PG path returns for the same case. - return new Response(null, { status: 204 }); + // THROWN so the loader's return type stays a clean TypedJson union; + // returning a raw Response collapses the discriminated `type: + // "run" | "span"` union into `Response` downstream and the + // useTypedFetcher consumer loses .run / .span access. + throw new Response(null, { status: 204 }); } const run = await buildSyntheticSpanRun({ diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts index f9fa8521354..449541dd0ad 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts @@ -51,47 +51,58 @@ export async function loader({ request, params }: LoaderFunctionArgs) { // snapshot so the Debug panel can show *something* instead of 404'ing. const buffer = getMollifierBuffer(); if (buffer) { + // The `try` only wraps the buffer.getEntry call — that's the + // operation that can fail with a transient Redis error we want + // to mask as a 404 fallthrough. Authorization and snapshot + // parsing are split out below so a deliberate 404 + // (`throw new Response(...)`) or a malformed-snapshot crash + // isn't silently swallowed. + let entry: Awaited> | null = null; try { - const entry = await buffer.getEntry(runParam); - if (entry) { - // Same org-membership gate as the PG path above. Without it, - // any authenticated user who knows a runId could read the - // buffered run's queue/concurrencyKey snapshot across orgs. - const member = await $replica.orgMember.findFirst({ - where: { userId, organizationId: entry.orgId }, - select: { id: true }, - }); - if (!member) { - throw new Response("Not Found", { status: 404 }); - } - const snapshot = deserialiseSnapshot<{ - taskIdentifier?: string; - queue?: string; - concurrencyKey?: string; - }>(entry.payload); - return typedjson({ - engine: "V2" as const, - buffered: true, - run: { - id: entry.runId, - engine: "V2" as const, - friendlyId: entry.runId, - queue: snapshot.queue ?? null, - concurrencyKey: snapshot.concurrencyKey ?? null, - queueTimestamp: entry.createdAt, - runtimeEnvironment: null, - }, - queueConcurrencyLimit: undefined, - envConcurrencyLimit: undefined, - queueCurrentConcurrency: undefined, - envCurrentConcurrency: undefined, - queueReserveConcurrency: undefined, - envReserveConcurrency: undefined, - keys: [], - }); - } + entry = await buffer.getEntry(runParam); } catch { - // fall through to 404 on buffer error + // Transient buffer failure — fall through to 404 below. + } + if (entry) { + // Org-membership gate. Without it, any authenticated user who + // knows a runId could read the buffered run's queue / + // concurrencyKey snapshot across orgs. Thrown so the typedjson + // discriminated union below isn't polluted by a raw `Response` + // return — the consumer in `debugRun.tsx` switches on + // `engine`, and a Response variant collapses the union. + const member = await $replica.orgMember.findFirst({ + where: { userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) { + throw new Response("Not Found", { status: 404 }); + } + const snapshot = deserialiseSnapshot<{ + taskIdentifier?: string; + queue?: string; + concurrencyKey?: string; + }>(entry.payload); + // `engine: "BUFFERED"` is a distinct discriminant from V1/V2 + // so the `debugRun.tsx` consumer can narrow on it. Returning + // `engine: "V2"` here widened the V2 variant's + // `run.runtimeEnvironment` to nullable, which broke the + // MarQS-key call sites in the existing V1/V2 panels (they + // assume non-null). A buffered run has no PG queue presence + // so those panels would 404 anyway; the dedicated buffered + // handler in debugRun.tsx renders the snapshot's queue + + // concurrencyKey directly. + return typedjson({ + engine: "BUFFERED" as const, + buffered: true, + run: { + id: entry.runId, + friendlyId: entry.runId, + queue: snapshot.queue ?? null, + concurrencyKey: snapshot.concurrencyKey ?? null, + queueTimestamp: entry.createdAt, + taskIdentifier: snapshot.taskIdentifier ?? null, + }, + }); } } throw new Response("Not Found", { status: 404 }); From 9d428340574135c95d49ddaa2fc12efa20131376 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 27 May 2026 12:59:37 +0100 Subject: [PATCH 4/6] docs(server-changes): document mollifier dashboard + realtime parity Co-Authored-By: Claude Opus 4.7 (1M context) --- .server-changes/mollifier-dashboard.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .server-changes/mollifier-dashboard.md diff --git a/.server-changes/mollifier-dashboard.md b/.server-changes/mollifier-dashboard.md new file mode 100644 index 00000000000..12009c8afed --- /dev/null +++ b/.server-changes/mollifier-dashboard.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Mollifier dashboard parity: render buffered runs in the run page, realtime stream, cancel/replay/debug routes, and admin debug view. From da1a1ddfea7f6c02bc1ca043a1ac6abb1a65c4a7 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 27 May 2026 17:46:04 +0100 Subject: [PATCH 5/6] docs(mollifier): strip internal planning labels from comments Remove plan-tracking shorthand (Q2/Q4/Q5) from dashboard-layer mollifier route comments; reword to plain English. Comment-only; no behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...tParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx | 2 +- apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts | 2 +- apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx index 8a3f4dd3a6e..d871a262456 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.idempotencyKey.reset.tsx @@ -39,7 +39,7 @@ export const action: ActionFunction = async ({ request, params }) => { }, }); - // Resolve run from PG or the mollifier buffer (Q5). For a buffered + // Resolve run from PG or the mollifier buffer. For a buffered // run the snapshot carries the idempotencyKey + taskIdentifier; we // also need the runtimeEnvironmentId to feed ResetIdempotencyKeyService // (which clears both PG and the buffer lookup — B6b). diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts index c3dff252a73..f65d1f8b800 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts @@ -51,7 +51,7 @@ export const action: ActionFunction = async ({ request, params }) => { // PG miss — try the mollifier buffer. The customer can hit cancel // on a buffered run from the dashboard during the burst window. - // Q4 design: snapshot a `mark_cancelled` patch; the drainer's + // Snapshot a `mark_cancelled` patch; the drainer's // bifurcation routes the run to `engine.createCancelledRun` on // next pop. const buffer = getMollifierBuffer(); diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index 62da62e0478..28968259da8 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -263,7 +263,7 @@ export const action: ActionFunction = async ({ request, params }) => { }, }); - // Mollifier read-fallback (Q2): if the original isn't in PG yet, + // Mollifier read-fallback: if the original isn't in PG yet, // synthesise a TaskRun from the buffered snapshot. The B4-extended // SyntheticRun carries every field ReplayTaskRunService reads. We // also need projectSlug + orgSlug + envSlug for the redirect path, From 28112c42913f783e940662f67686a4986b530819 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 27 May 2026 18:03:39 +0100 Subject: [PATCH 6/6] fix(webapp,redis-worker): enforce MAX_TAGS on buffered runs + surface metadata fallback errors The tags API skipped MAX_TAGS_PER_RUN enforcement on the buffered path, letting a buffered run exceed the cap the trigger validator applies at creation. Enforce it atomically in the mutateSnapshot Lua: append_tags now accepts an optional maxTags and returns "limit_exceeded" (writing nothing) when the deduped count would overflow. mutateWithFallback gains a symmetric rejectedResponse builder + a "rejected" outcome; the tags route returns 422, matching the PG path. Also stop silently swallowing PG failures in the metadata route's parent/root op fan-out: warn (with targetRunId + error) before the best-effort buffer fallback so a genuine PG outage is observable. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/mollifier-tag-cap.md | 5 ++ .../app/routes/api.v1.runs.$runId.metadata.ts | 16 ++++-- .../app/routes/api.v1.runs.$runId.tags.ts | 21 ++++--- .../v3/mollifier/mutateWithFallback.server.ts | 18 ++++++ .../test/mollifierMutateWithFallback.test.ts | 30 ++++++++++ .../redis-worker/src/mollifier/buffer.test.ts | 55 +++++++++++++++++++ packages/redis-worker/src/mollifier/buffer.ts | 24 +++++++- 7 files changed, 155 insertions(+), 14 deletions(-) create mode 100644 .changeset/mollifier-tag-cap.md diff --git a/.changeset/mollifier-tag-cap.md b/.changeset/mollifier-tag-cap.md new file mode 100644 index 00000000000..b9057664fa7 --- /dev/null +++ b/.changeset/mollifier-tag-cap.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/redis-worker": patch +--- + +Mollifier `mutateSnapshot` now enforces a tag cap: an `append_tags` patch carrying `maxTags` returns `"limit_exceeded"` (writing nothing) when the deduped tag count would exceed the limit, so a buffered run can't accumulate more tags via the tags API than the trigger validator allows at creation. diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts index 58a5c8311ef..a4a6de9182b 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts @@ -7,6 +7,7 @@ import { z } from "zod"; import { $replica } from "~/db.server"; import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { authenticateApiRequest } from "~/services/apiAuth.server"; +import { logger } from "~/services/logger.server"; import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { ServiceValidationError } from "~/v3/services/common.server"; @@ -84,10 +85,17 @@ async function routeOperationsToRun( ); if (!error) return; - // PG service threw — could be "Cannot update metadata for a completed - // run" or similar. If the target is buffered, route operations to its - // snapshot too. Best-effort; do not surface this failure to the - // caller — the parent/root ops are auxiliary. + // PG service threw — commonly "Cannot update metadata for a completed + // run", but it could also be a transient PG failure. The parent/root + // ops are auxiliary, so we stay best-effort and don't surface this to + // the caller — but we must not swallow the failure silently, otherwise + // a genuine PG outage on these ops is invisible. Warn, then try the + // buffer in case the target is itself buffered. + logger.warn("metadata route: parent/root PG op failed, falling back to buffer", { + targetRunId, + error: error instanceof Error ? error.message : String(error), + }); + await applyMetadataMutationToBufferedRun({ runId: targetRunId, body: { operations }, diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts b/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts index eeb8d6bc027..785cacd6d0e 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts @@ -44,11 +44,11 @@ export async function action({ request, params }: ActionFunctionArgs) { } const env = authenticationResult.environment; - const outcome = await mutateWithFallback({ + const outcome = await mutateWithFallback({ runId: parsedParams.data.runId, environmentId: env.id, organizationId: env.organizationId, - bufferPatch: { type: "append_tags", tags: nonEmptyTags }, + bufferPatch: { type: "append_tags", tags: nonEmptyTags, maxTags: MAX_TAGS_PER_RUN }, pgMutation: async (taskRun) => { const existing = taskRun.runTags ?? []; const newTags = nonEmptyTags.filter((t) => !existing.includes(t)); @@ -76,13 +76,20 @@ export async function action({ request, params }: ActionFunctionArgs) { return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 }); }, // Buffer-applied patch path. The mutateSnapshot Lua deduplicates - // against existing snapshot tags atomically. MAX_TAGS_PER_RUN - // enforcement is skipped on the buffered side — the drainer's - // engine.trigger writes the PG row without enforcement either, - // matching today's pre-buffer trigger semantics. A future - // refinement could push the limit check into the Lua. + // against existing snapshot tags atomically and enforces + // MAX_TAGS_PER_RUN via the `maxTags` we pass in `bufferPatch` — + // matching the PG-path cap above so a buffered run can't exceed the + // limit the trigger validator applies at creation. synthesisedResponse: () => json({ message: `Successfully set ${nonEmptyTags.length} new tags.` }, { status: 200 }), + // Buffer rejected the append because it would exceed the cap. We + // don't know the exact deduped overflow count here (the Lua does), + // so report the limit rather than a precise "trying to set N". + rejectedResponse: () => + json( + { error: `Runs can only have ${MAX_TAGS_PER_RUN} tags.` }, + { status: 422 } + ), abortSignal: getRequestAbortSignal(), }); diff --git a/apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts b/apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts index 6a0477aea62..ab0a8b36232 100644 --- a/apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts +++ b/apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts @@ -28,6 +28,11 @@ export type MutateWithFallbackInput = { // Called when the patch landed cleanly on the buffer snapshot. The // drainer will see the patched payload on its next pop. synthesisedResponse: () => TResponse | Promise; + // Called when the buffer rejected the patch as invalid (e.g. an + // `append_tags` patch carrying `maxTags` would exceed the cap). Required + // only by callers that send a rejectable patch; the helper throws if the + // buffer reports a rejection and no builder was supplied. + rejectedResponse?: () => TResponse | Promise; abortSignal?: AbortSignal; // Override defaults for tests. safetyNetMs?: number; @@ -47,6 +52,7 @@ export type MutateWithFallbackInput = { export type MutateWithFallbackOutcome = | { kind: "pg"; response: TResponse } | { kind: "snapshot"; response: TResponse } + | { kind: "rejected"; response: TResponse } | { kind: "not_found" } | { kind: "timed_out" }; @@ -86,6 +92,18 @@ export async function mutateWithFallback( return { kind: "snapshot", response: await input.synthesisedResponse() }; } + if (result === "limit_exceeded") { + // The buffer refused the patch (e.g. tag cap). Nothing was written. + // Surface the caller's rejection body; a missing builder means the + // caller sent a rejectable patch without handling the rejection. + if (!input.rejectedResponse) { + throw new Error( + "mutateWithFallback: buffer returned 'limit_exceeded' but no rejectedResponse was provided", + ); + } + return { kind: "rejected", response: await input.rejectedResponse() }; + } + if (result === "not_found") { // Disambiguate a genuine 404 from a replica-lag miss: ask the writer // directly. If the row just appeared post-drain we route through the diff --git a/apps/webapp/test/mollifierMutateWithFallback.test.ts b/apps/webapp/test/mollifierMutateWithFallback.test.ts index 372999cd3d5..7a9988702f0 100644 --- a/apps/webapp/test/mollifierMutateWithFallback.test.ts +++ b/apps/webapp/test/mollifierMutateWithFallback.test.ts @@ -283,6 +283,36 @@ describe("mutateWithFallback", () => { expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(0); }); + it("replica miss + buffer limit_exceeded → rejected via rejectedResponse builder", async () => { + const pgMutation = vi.fn(async () => "pg"); + const synthesisedResponse = vi.fn(() => "snap"); + const result = await mutateWithFallback({ + ...baseInput, + pgMutation, + synthesisedResponse, + rejectedResponse: () => "too-many-tags", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("limit_exceeded"), + }); + expect(result).toEqual({ kind: "rejected", response: "too-many-tags" }); + expect(pgMutation).not.toHaveBeenCalled(); + expect(synthesisedResponse).not.toHaveBeenCalled(); + }); + + it("buffer limit_exceeded without a rejectedResponse builder → throws (programmer error)", async () => { + await expect( + mutateWithFallback({ + ...baseInput, + pgMutation: async () => "pg", + synthesisedResponse: () => "snap", + prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica, + prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma, + getBuffer: () => bufferReturning("limit_exceeded"), + }) + ).rejects.toThrow(/limit_exceeded/); + }); + it("buffer is null (mollifier disabled) → not_found after replica miss", async () => { const result = await mutateWithFallback({ ...baseInput, diff --git a/packages/redis-worker/src/mollifier/buffer.test.ts b/packages/redis-worker/src/mollifier/buffer.test.ts index 8b07ee32fe4..21325297452 100644 --- a/packages/redis-worker/src/mollifier/buffer.test.ts +++ b/packages/redis-worker/src/mollifier/buffer.test.ts @@ -1937,6 +1937,61 @@ describe("MollifierBuffer.mutateSnapshot", () => { }, ); + redisTest( + "append_tags rejects with limit_exceeded when maxTags would be exceeded, writing nothing", + { timeout: 20_000 }, + async ({ redisContainer }) => { + const buffer = new MollifierBuffer({ + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + try { + await buffer.accept({ + runId: "r_cap", + envId: "env_m", + orgId: "org_1", + payload: serialiseSnapshot({ tags: ["a", "b"] }), + }); + + // 2 existing + 2 new = 4 deduped > cap of 3 → rejected, nothing written. + const rejected = await buffer.mutateSnapshot("r_cap", { + type: "append_tags", + tags: ["c", "d"], + maxTags: 3, + }); + expect(rejected).toBe("limit_exceeded"); + const afterReject = await buffer.getEntry("r_cap"); + const rejPayload = JSON.parse(afterReject!.payload) as { tags: string[] }; + expect(rejPayload.tags).toEqual(["a", "b"]); + + // Dedup keeps the count under the cap → applied. + const applied = await buffer.mutateSnapshot("r_cap", { + type: "append_tags", + tags: ["a", "c"], + maxTags: 3, + }); + expect(applied).toBe("applied_to_snapshot"); + const afterApply = await buffer.getEntry("r_cap"); + const appPayload = JSON.parse(afterApply!.payload) as { tags: string[] }; + expect(appPayload.tags).toEqual(["a", "b", "c"]); + + // Landing exactly on the cap is allowed. + const exact = await buffer.mutateSnapshot("r_cap", { + type: "append_tags", + tags: ["a", "b", "c"], + maxTags: 3, + }); + expect(exact).toBe("applied_to_snapshot"); + } finally { + await buffer.close(); + } + }, + ); + redisTest( "set_metadata replaces metadata + metadataType (last-write-wins)", { timeout: 20_000 }, diff --git a/packages/redis-worker/src/mollifier/buffer.ts b/packages/redis-worker/src/mollifier/buffer.ts index 809a61bd69d..80fe1522beb 100644 --- a/packages/redis-worker/src/mollifier/buffer.ts +++ b/packages/redis-worker/src/mollifier/buffer.ts @@ -37,12 +37,21 @@ export function mollifierReconnectDelayMs( } export type SnapshotPatch = - | { type: "append_tags"; tags: string[] } + // `maxTags`, when set, caps the deduped tag count atomically inside the + // Lua: if appending would push the snapshot over the limit the patch is + // rejected ("limit_exceeded") and nothing is written, mirroring the + // PG-path MAX_TAGS_PER_RUN check so a buffered run can't accumulate more + // tags than the trigger validator would have allowed at creation. + | { type: "append_tags"; tags: string[]; maxTags?: number } | { type: "set_metadata"; metadata: string; metadataType: string } | { type: "set_delay"; delayUntil: string } | { type: "mark_cancelled"; cancelledAt: string; cancelReason?: string }; -export type MutateSnapshotResult = "applied_to_snapshot" | "not_found" | "busy"; +export type MutateSnapshotResult = + | "applied_to_snapshot" + | "not_found" + | "busy" + | "limit_exceeded"; export type CasSetMetadataResult = | { kind: "applied"; newVersion: number } @@ -278,6 +287,8 @@ export class MollifierBuffer { // FAILED entry, whose hash the drainer-terminal `fail` path DELs. // - "busy": entry is DRAINING or materialised. The API // wait-and-bounces through PG. + // - "limit_exceeded": an `append_tags` patch carrying `maxTags` would + // push the deduped tag count over the cap; nothing is written. async mutateSnapshot(runId: string, patch: SnapshotPatch): Promise { const result = (await this.redis.mutateMollifierSnapshot( `mollifier:entries:${runId}`, @@ -286,7 +297,8 @@ export class MollifierBuffer { if ( result === "applied_to_snapshot" || result === "not_found" || - result === "busy" + result === "busy" || + result === "limit_exceeded" ) { return result; } @@ -881,6 +893,12 @@ export class MollifierBuffer { table.insert(merged, t) end end + -- Cap the deduped count when the caller supplies a limit, so a + -- buffered run can't exceed MAX_TAGS_PER_RUN via the tags API. + -- Reject the whole patch (write nothing) rather than truncating. + if patch.maxTags ~= nil and #merged > patch.maxTags then + return 'limit_exceeded' + end payload.tags = merged elseif patch.type == 'set_metadata' then payload.metadata = patch.metadata