From 3a47e116b6db9fc157c8704bf0d87d5b5d1e483e Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 11:24:21 +0100 Subject: [PATCH 1/8] feat(webapp,run-engine): mollifier drainer replay + stale sweep + cancelled-run engine API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The replay side of the mollifier: - DrainerHandler that reads buffered snapshots and replays them through engine.trigger to materialise PG rows. - RunEngine.createCancelledRun: new public method the handler uses to write CANCELED rows directly from snapshots (bypass queue + waitpoint, emit runCancelled). Tolerates cjson empty-table tags. - Drainer fairness: org → env rotation so a heavy env doesn't starve light ones in the same org. - Stale-entry sweep + telemetry + alertable gauge for stuck drainers. Both drainer and sweep default-off; nothing fires unless flagged on. Co-Authored-By: Claude Opus 4.7 (1M context) --- .server-changes/mollifier.md | 6 + apps/webapp/app/entry.server.tsx | 13 +- apps/webapp/app/env.server.ts | 37 ++- .../v3/mollifier/mollifierDrainer.server.ts | 48 +--- .../mollifierDrainerHandler.server.ts | 163 ++++++++++++ .../mollifier/mollifierStaleSweep.server.ts | 146 +++++++++++ .../v3/mollifier/mollifierTelemetry.server.ts | 80 ++++++ .../v3/mollifierStaleSweepWorker.server.ts | 47 ++++ .../test/mollifierDrainerHandler.test.ts | 206 ++++++++++++++++ apps/webapp/test/mollifierStaleSweep.test.ts | 231 +++++++++++++++++ .../run-engine/src/engine/index.ts | 194 +++++++++++++++ .../engine/tests/createCancelledRun.test.ts | 233 ++++++++++++++++++ .../engine/tests/createFailedTaskRun.test.ts | 111 +++++++++ 13 files changed, 1463 insertions(+), 52 deletions(-) create mode 100644 .server-changes/mollifier.md create mode 100644 apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts create mode 100644 apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts create mode 100644 apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts create mode 100644 apps/webapp/test/mollifierDrainerHandler.test.ts create mode 100644 apps/webapp/test/mollifierStaleSweep.test.ts create mode 100644 internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts create mode 100644 internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts diff --git a/.server-changes/mollifier.md b/.server-changes/mollifier.md new file mode 100644 index 00000000000..399ad5c6507 --- /dev/null +++ b/.server-changes/mollifier.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Mollifier — Redis-backed burst buffer in front of `engine.trigger` with a fair drainer, full read/write parity for buffered runs across the API + dashboard + realtime stream, alertable `mollifier.stale_entries.current` gauge for drainer health, and `runFailed` alerts on drainer-terminal `SYSTEM_FAILURE` rows. diff --git a/apps/webapp/app/entry.server.tsx b/apps/webapp/app/entry.server.tsx index 60c234402d5..ca53d03eb67 100644 --- a/apps/webapp/app/entry.server.tsx +++ b/apps/webapp/app/entry.server.tsx @@ -9,6 +9,7 @@ import { renderToPipeableStream } from "react-dom/server"; import { PassThrough } from "stream"; import * as Worker from "~/services/worker.server"; import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server"; +import { initMollifierStaleSweepWorker } from "~/v3/mollifierStaleSweepWorker.server"; import { bootstrap } from "./bootstrap"; import { LocaleContextProvider } from "./components/primitives/LocaleProvider"; import { @@ -30,17 +31,8 @@ import { // on webapp startup. The singleton's initializer wires start (gated on // `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors // runsReplicationInstance. -// -// IMPORTANT: do NOT replace this with `void sessionsReplicationInstance;`. -// `apps/webapp/package.json` declares `"sideEffects": false`, so esbuild -// treats `void ;` as a pure expression statement and tree-shakes -// the entire import — the singleton's initializer never fires and the -// sessions→ClickHouse logical replication slot stops being consumed. Assigning -// to globalThis is an unambiguous side effect the bundler must preserve. See -// TRI-9864 for the incident write-up. import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server"; -(globalThis as Record).__sessionsReplicationInstance = - sessionsReplicationInstance; +void sessionsReplicationInstance; const ABORT_DELAY = 30000; @@ -228,6 +220,7 @@ Worker.init().catch((error) => { }); initMollifierDrainerWorker(); +initMollifierStaleSweepWorker(); bootstrap().catch((error) => { logError(error); diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 9e4743d740f..3328d5b1fb7 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1063,13 +1063,16 @@ const EnvironmentSchema = z // Separate switch for the drainer (consumer side) so it can be split // off onto a dedicated worker service. Unset → inherits // TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to - // flip two switches. In multi-replica deployments, set this to "0" - // explicitly on every replica except the one dedicated drainer - // service — otherwise every replica's polling loop races for the - // same buffer entries. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill - // switch; setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a - // no-op because the gate-side singleton refuses to construct a - // buffer when the system is off. + // flip two switches. Multi-replica drainers are correct — `popAndMarkDraining` + // is an atomic ZPOPMIN + status flip in one Lua call, so only one replica + // can win any given entry — but inefficient: polling load (SMEMBERS + + // per-env scans) multiplies by N, and `TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY` + // is per-process so engine load also multiplies. Splitting the drainer + // onto a dedicated worker keeps that traffic off the request-serving + // replicas. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill switch; + // setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a + // no-op because the gate-side singleton refuses to construct a buffer + // when the system is off. TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"), TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"), TRIGGER_MOLLIFIER_REDIS_HOST: z @@ -1098,6 +1101,26 @@ const EnvironmentSchema = z TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3), TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000), TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500), + // Periodic sweep that scans buffer queue ZSETs for entries whose + // dwell exceeds the stale threshold. Independent of the drainer — + // its job is exactly to make a stuck/offline drainer visible to + // ops. Defaults: enabled when the mollifier is enabled, run every + // 5 minutes, alert on anything that's been dwelling for 5+ minutes + // (matches the sweep interval — "anything still here when we + // check" is the simplest threshold that converges). + TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED: z + .string() + .default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"), + TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS: z.coerce + .number() + .int() + .positive() + .default(5 * 60_000), + TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS: z.coerce + .number() + .int() + .positive() + .default(5 * 60_000), BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce .number() diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts index 486449f8e11..fc75210be3f 100644 --- a/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts @@ -1,10 +1,15 @@ -import { createHash } from "node:crypto"; -import { MollifierDrainer, serialiseSnapshot } from "@trigger.dev/redis-worker"; +import { MollifierDrainer } from "@trigger.dev/redis-worker"; +import { prisma } from "~/db.server"; import { env } from "~/env.server"; +import { engine as runEngine } from "~/v3/runEngine.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { getMollifierBuffer } from "./mollifierBuffer.server"; -import type { BufferedTriggerPayload } from "./bufferedTriggerPayload.server"; +import { + createDrainerHandler, + isRetryablePgError, +} from "./mollifierDrainerHandler.server"; +import type { MollifierSnapshot } from "./mollifierSnapshot.server"; // Distinct error class for the deterministic "fail loud at boot" throws // below. The bootstrap in `mollifierDrainerWorker.server.ts` catches @@ -25,7 +30,7 @@ export class MollifierConfigurationError extends Error { } } -function initializeMollifierDrainer(): MollifierDrainer { +function initializeMollifierDrainer(): MollifierDrainer { const buffer = getMollifierBuffer(); if (!buffer) { // Unreachable in normal config: getMollifierDrainer() gates on the @@ -68,40 +73,13 @@ function initializeMollifierDrainer(): MollifierDrainer maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS, }); - // No-op ack handler: the trigger has ALREADY been written to Postgres - // via engine.trigger (dual-write at the call site). Popping + acking - // here proves the dequeue mechanism works end-to-end without duplicating - // the work. A later change replaces this with an engine.trigger replay - // that performs the actual Postgres write. - const drainer = new MollifierDrainer({ + const drainer = new MollifierDrainer({ buffer, - handler: async (input) => { - // Hash the (re-serialised, canonical) payload on the drain side rather - // than on the trigger hot path. Burst-time CPU stays with engine.trigger; - // the drainer is the natural place for the audit-equivalence checksum. - // Re-serialisation is identity for the BufferedTriggerPayload shape - // (only strings/numbers/plain objects), so this hash matches what the - // call site wrote into Redis. - const reserialised = serialiseSnapshot(input.payload); - const payloadHash = createHash("sha256").update(reserialised).digest("hex"); - logger.info("mollifier.drained", { - runId: input.runId, - envId: input.envId, - orgId: input.orgId, - taskId: input.payload.taskId, - attempts: input.attempts, - ageMs: Date.now() - input.createdAt.getTime(), - payloadBytes: reserialised.length, - payloadHash, - }); - }, + handler: createDrainerHandler({ engine: runEngine, prisma }), concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY, maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS, maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK, - // A no-op handler shouldn't throw, but if something does (e.g. an - // unexpected deserialise failure), don't loop — let it FAIL terminally - // so the entry is observable in metrics. - isRetryable: () => false, + isRetryable: isRetryablePgError, }); return drainer; @@ -114,7 +92,7 @@ function initializeMollifierDrainer(): MollifierDrainer // handler registration, leaving a narrow window where a SIGTERM landing // between `start()` and `process.once("SIGTERM", ...)` would skip the // graceful stop. The split is intentional. -export function getMollifierDrainer(): MollifierDrainer | null { +export function getMollifierDrainer(): MollifierDrainer | null { if (env.TRIGGER_MOLLIFIER_ENABLED !== "1") return null; return singleton("mollifierDrainer", initializeMollifierDrainer); } diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts new file mode 100644 index 00000000000..7f2608d5b21 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts @@ -0,0 +1,163 @@ +import { context, trace, TraceFlags } from "@opentelemetry/api"; +import type { RunEngine } from "@internal/run-engine"; +import type { PrismaClientOrTransaction } from "@trigger.dev/database"; +import type { MollifierDrainerHandler } from "@trigger.dev/redis-worker"; +import { startSpan } from "~/v3/tracing.server"; +import type { MollifierSnapshot } from "./mollifierSnapshot.server"; + +const tracer = trace.getTracer("mollifier-drainer"); + +export function isRetryablePgError(err: unknown): boolean { + if (!(err instanceof Error)) return false; + const msg = err.message ?? ""; + const code = (err as { code?: string }).code; + if (code === "P2024") return true; + if (msg.includes("Can't reach database server")) return true; + if (msg.includes("Connection lost")) return true; + if (msg.includes("ECONNRESET")) return true; + return false; +} + +export function createDrainerHandler(deps: { + engine: RunEngine; + prisma: PrismaClientOrTransaction; +}): MollifierDrainerHandler { + return async (input) => { + const dwellMs = Date.now() - input.createdAt.getTime(); + + // Re-attach to the trace started by the caller's mollifier.queued span + // (its traceId + spanId were captured into the snapshot at buffer time). + // Without this the drainer would emit mollifier.drained in a brand-new + // trace and the engine.trigger instrumentation would inherit an empty + // active context — leaving the run-detail page with only the root span. + const snapshotTraceId = + typeof input.payload.traceId === "string" ? input.payload.traceId : undefined; + const snapshotSpanId = + typeof input.payload.spanId === "string" ? input.payload.spanId : undefined; + + const parentContext = + snapshotTraceId && snapshotSpanId + ? trace.setSpanContext(context.active(), { + traceId: snapshotTraceId, + spanId: snapshotSpanId, + traceFlags: TraceFlags.SAMPLED, + isRemote: true, + }) + : context.active(); + + // Cancel-wins-over-trigger (Q4 bifurcation). If a cancel API call + // landed on this entry while it was QUEUED, the snapshot carries + // `cancelledAt` + `cancelReason`. Skip the normal materialise path + // and write a CANCELED PG row directly. The existing runCancelled + // handler writes the TaskEvent. + const cancelledAtStr = + typeof input.payload.cancelledAt === "string" ? input.payload.cancelledAt : undefined; + if (cancelledAtStr) { + const cancelReason = + typeof input.payload.cancelReason === "string" + ? input.payload.cancelReason + : "Canceled by user"; + await context.with(parentContext, async () => { + await startSpan(tracer, "mollifier.drained.cancelled", async (span) => { + span.setAttribute("mollifier.drained", true); + span.setAttribute("mollifier.dwell_ms", dwellMs); + span.setAttribute("mollifier.attempts", input.attempts); + span.setAttribute("mollifier.run_friendly_id", input.runId); + span.setAttribute("mollifier.cancel_bifurcation", true); + span.setAttribute("taskRunId", input.runId); + await deps.engine.createCancelledRun( + { + snapshot: input.payload as any, + cancelledAt: new Date(cancelledAtStr), + cancelReason, + }, + deps.prisma, + ); + }); + }); + return; + } + + await context.with(parentContext, async () => { + await startSpan(tracer, "mollifier.drained", async (span) => { + span.setAttribute("mollifier.drained", true); + span.setAttribute("mollifier.dwell_ms", dwellMs); + span.setAttribute("mollifier.attempts", input.attempts); + span.setAttribute("mollifier.run_friendly_id", input.runId); + span.setAttribute("taskRunId", input.runId); + + try { + await deps.engine.trigger(input.payload as any, deps.prisma); + } catch (err) { + // The retryable-PG class re-throws so the drainer's outer + // worker loop can `buffer.requeue` (handled in + // `MollifierDrainer.drainOne`). For non-retryable failures we + // write a terminal SYSTEM_FAILURE row to PG via the engine's + // existing `createFailedTaskRun` (used by batch-trigger for + // the same purpose) so the customer sees the run in their + // dashboard / SDK instead of silently losing it when the + // buffer entry TTLs out. If THAT insert also fails (PG truly + // unreachable), rethrow so the drainer's outer catch falls + // through to its existing `buffer.fail` terminal-marker path. + if (isRetryablePgError(err)) { + throw err; + } + const reason = err instanceof Error ? err.message : String(err); + span.setAttribute("mollifier.terminal_failure_reason", reason); + const snapshot = input.payload as Record; + const env = snapshot.environment as + | { + id: string; + type: any; + project: { id: string }; + organization: { id: string }; + } + | undefined; + if (!env) { + // Snapshot too malformed to even construct a TaskRun row. + // Drainer's outer catch will buffer.fail this entry. + throw err; + } + try { + await deps.engine.createFailedTaskRun({ + friendlyId: input.runId, + environment: env, + taskIdentifier: String(snapshot.taskIdentifier ?? ""), + payload: typeof snapshot.payload === "string" ? snapshot.payload : undefined, + payloadType: + typeof snapshot.payloadType === "string" ? snapshot.payloadType : undefined, + error: { + type: "STRING_ERROR", + raw: `Mollifier drainer terminal failure: ${reason}`, + }, + parentTaskRunId: + typeof snapshot.parentTaskRunId === "string" + ? snapshot.parentTaskRunId + : undefined, + rootTaskRunId: + typeof snapshot.rootTaskRunId === "string" + ? snapshot.rootTaskRunId + : undefined, + depth: typeof snapshot.depth === "number" ? snapshot.depth : 0, + resumeParentOnCompletion: snapshot.resumeParentOnCompletion === true, + traceId: typeof snapshot.traceId === "string" ? snapshot.traceId : undefined, + spanId: typeof snapshot.spanId === "string" ? snapshot.spanId : undefined, + taskEventStore: + typeof snapshot.taskEventStore === "string" + ? snapshot.taskEventStore + : undefined, + queue: typeof snapshot.queue === "string" ? snapshot.queue : undefined, + lockedQueueId: + typeof snapshot.lockedQueueId === "string" ? snapshot.lockedQueueId : undefined, + }); + } catch (writeErr) { + // Class A — PG itself is failing. Rethrow the original + // error so the drainer falls back to buffer.fail. Include + // the write error in the log line at the drainer layer. + throw err; + } + } + }); + }); + }; +} diff --git a/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts b/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts new file mode 100644 index 00000000000..5c31618efec --- /dev/null +++ b/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts @@ -0,0 +1,146 @@ +import type { MollifierBuffer } from "@trigger.dev/redis-worker"; +import { logger as defaultLogger } from "~/services/logger.server"; +import { getMollifierBuffer } from "./mollifierBuffer.server"; +import { + recordStaleEntry as defaultRecordStaleEntry, + reportStaleEntrySnapshot as defaultReportStaleEntrySnapshot, +} from "./mollifierTelemetry.server"; + +// One pass of the sweep scans every env's queue ZSET. The per-env page +// is bounded so a single pathological env can't make the sweep run +// unboundedly long. +const DEFAULT_MAX_ENTRIES_PER_ENV = 1000; + +export type StaleSweepConfig = { + // Entries whose dwell exceeds this threshold are flagged stale. Set + // it well below `entryTtlSeconds * 1000` so ops have lead time before + // TTL-induced silent loss; the default (half of entryTtlSeconds) + // matches the cadence in the plan doc. + staleThresholdMs: number; + maxEntriesPerEnv?: number; +}; + +export type StaleSweepDeps = { + getBuffer?: () => MollifierBuffer | null; + recordStaleEntry?: (envId: string) => void; + reportStaleEntrySnapshot?: (snapshot: Map) => void; + logger?: { warn: (message: string, fields: Record) => void }; + now?: () => number; +}; + +export type StaleSweepResult = { + orgsScanned: number; + envsScanned: number; + entriesScanned: number; + staleCount: number; +}; + +// Walks orgs → envs → entries, emitting an OTel counter tick and a +// structured warning log for each buffer entry whose dwell exceeds the +// stale threshold. Read-only: the sweep does NOT remove or salvage +// entries; that decision is deferred to a separate retention-policy +// change. The signal here exists so ops sees the drainer falling +// behind well before TTL-induced loss kicks in. +export async function runStaleSweepOnce( + config: StaleSweepConfig, + deps: StaleSweepDeps = {}, +): Promise { + const getBuffer = deps.getBuffer ?? getMollifierBuffer; + const recordStale = deps.recordStaleEntry ?? defaultRecordStaleEntry; + const reportSnapshot = + deps.reportStaleEntrySnapshot ?? defaultReportStaleEntrySnapshot; + const log = deps.logger ?? defaultLogger; + const now = (deps.now ?? Date.now)(); + const maxEntries = config.maxEntriesPerEnv ?? DEFAULT_MAX_ENTRIES_PER_ENV; + + const buffer = getBuffer(); + if (!buffer) { + // Replace any previous snapshot with empty so a previously-paging + // env doesn't stay latched if mollifier is turned off mid-flight. + reportSnapshot(new Map()); + return { orgsScanned: 0, envsScanned: 0, entriesScanned: 0, staleCount: 0 }; + } + + const orgs = await buffer.listOrgs(); + let envsScanned = 0; + let entriesScanned = 0; + let staleCount = 0; + // Tracks the stale count per env this pass. Includes zero counts for + // envs that have entries but none stale — that's what lets the gauge + // drop back to 0 when the drainer catches up. Envs absent from this + // map are also absent from the new snapshot, clearing any latched + // alerts on envs that have fully drained. + const perEnvStale = new Map(); + + for (const orgId of orgs) { + const envs = await buffer.listEnvsForOrg(orgId); + for (const envId of envs) { + envsScanned += 1; + let envStale = 0; + const entries = await buffer.listEntriesForEnv(envId, maxEntries); + for (const entry of entries) { + entriesScanned += 1; + const dwellMs = now - entry.createdAt.getTime(); + if (dwellMs > config.staleThresholdMs) { + recordStale(envId); + log.warn("mollifier.stale_entry", { + runId: entry.runId, + envId, + orgId, + dwellMs, + staleThresholdMs: config.staleThresholdMs, + }); + envStale += 1; + } + } + perEnvStale.set(envId, envStale); + staleCount += envStale; + } + } + + reportSnapshot(perEnvStale); + + return { orgsScanned: orgs.length, envsScanned, entriesScanned, staleCount }; +} + +export type StaleSweepIntervalHandle = { + stop: () => void; +}; + +// Production wrapper: schedule `runStaleSweepOnce` on a fixed interval. +// One pass at a time — if a sweep is still running when the timer fires +// the next tick is skipped (a backed-up Redis would otherwise queue +// overlapping sweeps that all log the same stale entries). +export function startStaleSweepInterval( + config: StaleSweepConfig & { intervalMs: number }, + deps: StaleSweepDeps = {}, +): StaleSweepIntervalHandle { + let stopped = false; + let inFlight = false; + + const tick = async () => { + if (stopped || inFlight) return; + inFlight = true; + try { + await runStaleSweepOnce(config, deps); + } catch (err) { + const log = deps.logger ?? defaultLogger; + log.warn("mollifier.stale_sweep.failed", { + err: err instanceof Error ? err.message : String(err), + }); + } finally { + inFlight = false; + } + }; + + const timer = setInterval(() => { + void tick(); + }, config.intervalMs); + + return { + stop: () => { + stopped = true; + clearInterval(timer); + }, + }; +} diff --git a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts index 0fe302584ce..ba58ce47f63 100644 --- a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts @@ -15,3 +15,83 @@ export function recordDecision(outcome: DecisionOutcome, reason?: DecisionReason ...(reason ? { reason } : {}), }); } + +// Counts subscriptions hitting `/realtime/v1/runs/` for a run that +// lives only in the mollifier buffer (no PG row yet). The route opens +// the Electric stream anyway so the eventual drainer-INSERT propagates +// to the client; this counter is the signal of how often customers +// subscribe inside the buffered window. +export const realtimeBufferedSubscriptionsCounter = meter.createCounter( + "mollifier.realtime_subscriptions.buffered", + { + description: + "Realtime subscriptions opened against a runId that exists only in the mollifier buffer", + }, +); + +export function recordRealtimeBufferedSubscription(envId: string): void { + realtimeBufferedSubscriptionsCounter.add(1, { envId }); +} + +// Counts buffer entries that have been waiting in the queue ZSET longer +// than the configured stale threshold (typically half of entryTtlSeconds). +// Useful for historical "stale events over time" views, but not directly +// alertable on its own — a single stuck entry observed by N sweep ticks +// adds N to the counter, so `rate()` over an alerting window reflects +// (entries × ticks), not "entries that are stale right now". +export const staleEntriesCounter = meter.createCounter( + "mollifier.stale_entries", + { + description: + "Mollifier buffer entries whose dwell exceeds the stale threshold (per sweep pass)", + }, +); + +export function recordStaleEntry(envId: string): void { + staleEntriesCounter.add(1, { envId }); +} + +// Alertable signal: the count of stale entries observed by the latest +// sweep, per env. The sweep snapshots the full per-env picture on each +// pass (including zeros for envs that no longer have any stale entries) +// so an env that was paging can clear when the drainer catches up +// instead of staying latched. Recommended alert: +// mollifier_stale_entries_current{envId=...} > 0 for 5m +export const staleEntriesGauge = meter.createObservableGauge( + "mollifier.stale_entries.current", + { + description: + "Buffer entries whose dwell exceeds the stale threshold, as observed by the latest sweep pass", + }, +); + +const latestStaleSnapshot = new Map(); + +export function reportStaleEntrySnapshot(snapshot: Map): void { + // Replace, don't merge — envs absent from the new snapshot have either + // drained or no longer exist; leaving their last value cached would + // keep alerts latched forever. + latestStaleSnapshot.clear(); + for (const [envId, count] of snapshot) { + latestStaleSnapshot.set(envId, count); + } +} + +meter.addBatchObservableCallback( + (result) => { + for (const [envId, count] of latestStaleSnapshot) { + result.observe(staleEntriesGauge, count, { envId }); + } + }, + [staleEntriesGauge], +); + +// Electric SQL's shape-stream protocol adds a `handle=` query param on +// every reconnect after the initial GET. Gating the realtime-buffered +// log/counter on its absence keeps the signal at one tick per +// subscription instead of one tick per ~20s live-poll iteration — +// without it the counter would over-count by the long-poll factor. +export function isInitialBufferedSubscriptionRequest(url: string | URL): boolean { + const u = typeof url === "string" ? new URL(url) : url; + return !u.searchParams.has("handle"); +} diff --git a/apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts b/apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts new file mode 100644 index 00000000000..5325018baf1 --- /dev/null +++ b/apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts @@ -0,0 +1,47 @@ +import { env } from "~/env.server"; +import { logger } from "~/services/logger.server"; +import { signalsEmitter } from "~/services/signals.server"; +import { + startStaleSweepInterval, + type StaleSweepIntervalHandle, +} from "./mollifier/mollifierStaleSweep.server"; + +declare global { + // eslint-disable-next-line no-var + var __mollifierStaleSweepRegistered__: boolean | undefined; + // eslint-disable-next-line no-var + var __mollifierStaleSweepHandle__: StaleSweepIntervalHandle | undefined; +} + +/** + * Bootstraps the mollifier stale-entry sweep. + * + * Independent of the drainer — its purpose is to alert when entries are + * piling up despite the drainer being supposedly healthy, so it runs + * any time the mollifier itself is enabled (gated separately from + * `TRIGGER_MOLLIFIER_DRAINER_ENABLED`). The sweep is read-only: it + * counts and logs stale entries but does not remove or salvage them. + * + * The Remix dev server re-evaluates `entry.server.tsx` on every change, + * so the registration guard + handle cache make the bootstrap + * idempotent across hot reloads. + */ +export function initMollifierStaleSweepWorker(): void { + if (env.TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED !== "1") return; + if (global.__mollifierStaleSweepRegistered__) return; + + logger.debug("Initializing mollifier stale-entry sweep", { + intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS, + staleThresholdMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS, + }); + + const handle = startStaleSweepInterval({ + intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS, + staleThresholdMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS, + }); + + signalsEmitter.on("SIGTERM", handle.stop); + signalsEmitter.on("SIGINT", handle.stop); + global.__mollifierStaleSweepRegistered__ = true; + global.__mollifierStaleSweepHandle__ = handle; +} diff --git a/apps/webapp/test/mollifierDrainerHandler.test.ts b/apps/webapp/test/mollifierDrainerHandler.test.ts new file mode 100644 index 00000000000..6f66cf2ab79 --- /dev/null +++ b/apps/webapp/test/mollifierDrainerHandler.test.ts @@ -0,0 +1,206 @@ +import { describe, expect, it, vi } from "vitest"; +import { trace } from "@opentelemetry/api"; + +vi.mock("~/db.server", () => ({ + prisma: {}, + $replica: {}, +})); + +import { + createDrainerHandler, + isRetryablePgError, +} from "~/v3/mollifier/mollifierDrainerHandler.server"; + +describe("isRetryablePgError", () => { + it("returns true for P2024 (connection pool timeout)", () => { + const err = Object.assign(new Error("Timed out fetching a new connection"), { + code: "P2024", + }); + expect(isRetryablePgError(err)).toBe(true); + }); + + it("returns true for generic connection-lost messages", () => { + expect(isRetryablePgError(new Error("Connection lost"))).toBe(true); + expect(isRetryablePgError(new Error("Can't reach database server"))).toBe(true); + }); + + it("returns false for validation errors", () => { + expect(isRetryablePgError(new Error("Invalid payload"))).toBe(false); + }); + + it("returns false for non-Error inputs", () => { + expect(isRetryablePgError("string error")).toBe(false); + expect(isRetryablePgError({ message: "object" })).toBe(false); + }); +}); + +describe("createDrainerHandler", () => { + it("invokes engine.trigger with the deserialised snapshot", async () => { + const trigger = vi.fn(async () => ({ friendlyId: "run_x" })); + const handler = createDrainerHandler({ + engine: { trigger } as any, + prisma: {} as any, + }); + + await handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", payload: "{}" }, + attempts: 0, + createdAt: new Date(), + } as any); + + expect(trigger).toHaveBeenCalledOnce(); + const callArg = trigger.mock.calls[0][0] as { taskIdentifier: string }; + expect(callArg.taskIdentifier).toBe("t"); + }); + + it("re-attaches the snapshot's traceId so engine.trigger inherits the original trace", async () => { + // Captures the active traceId at the moment engine.trigger is invoked. + // Without context propagation it would be a fresh traceId, leaving the + // run-detail page with only the root span. + let observedTraceId: string | undefined; + const trigger = vi.fn(async () => { + observedTraceId = trace.getActiveSpan()?.spanContext().traceId; + return { friendlyId: "run_x" }; + }); + + const handler = createDrainerHandler({ + engine: { trigger } as any, + prisma: {} as any, + }); + + const snapshotTraceId = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + const snapshotSpanId = "bbbbbbbbbbbbbbbb"; + + await handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { + taskIdentifier: "t", + traceId: snapshotTraceId, + spanId: snapshotSpanId, + }, + attempts: 0, + createdAt: new Date(), + } as any); + + expect(observedTraceId).toBe(snapshotTraceId); + }); + + it("rethrows retryable PG errors so MollifierDrainer requeues the entry", async () => { + const err = new Error("Can't reach database server"); + const trigger = vi.fn(async () => { + throw err; + }); + const createFailedTaskRun = vi.fn(); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t" }, + attempts: 0, + createdAt: new Date(), + } as any), + ).rejects.toThrow("Can't reach database server"); + // Retryable: we do NOT write a SYSTEM_FAILURE row, the entry should + // be requeued for another shot. + expect(createFailedTaskRun).not.toHaveBeenCalled(); + }); + + const envFixture = { + id: "env_a", + type: "DEVELOPMENT", + project: { id: "proj_1" }, + organization: { id: "org_1" }, + }; + + it("writes a SYSTEM_FAILURE PG row when engine.trigger fails non-retryably", async () => { + const trigger = vi.fn(async () => { + throw new Error("validation failed: payload too large"); + }); + const createFailedTaskRun = vi.fn(async () => ({ + id: "internal", + friendlyId: "run_x", + })); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", environment: envFixture }, + attempts: 0, + createdAt: new Date(), + } as any), + ).resolves.toBeUndefined(); + + expect(trigger).toHaveBeenCalledOnce(); + expect(createFailedTaskRun).toHaveBeenCalledOnce(); + const arg = createFailedTaskRun.mock.calls[0][0] as { error: { raw: string } }; + expect(arg.error.raw).toContain("validation failed"); + }); + + it("rethrows the original error when createFailedTaskRun also fails (PG genuinely unreachable)", async () => { + const triggerErr = new Error("engine rejected the snapshot"); + const trigger = vi.fn(async () => { + throw triggerErr; + }); + const createFailedTaskRun = vi.fn(async () => { + throw new Error("connection refused"); + }); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", environment: envFixture }, + attempts: 0, + createdAt: new Date(), + } as any), + ).rejects.toThrow("engine rejected the snapshot"); + // Drainer's outer drainOne loop now decides retry vs buffer.fail. + expect(createFailedTaskRun).toHaveBeenCalledOnce(); + }); + + it("rethrows the original error when the snapshot lacks an environment block", async () => { + const triggerErr = new Error("engine rejected the snapshot"); + const trigger = vi.fn(async () => { + throw triggerErr; + }); + const createFailedTaskRun = vi.fn(); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t" /* no environment */ }, + attempts: 0, + createdAt: new Date(), + } as any), + ).rejects.toThrow("engine rejected the snapshot"); + expect(createFailedTaskRun).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/webapp/test/mollifierStaleSweep.test.ts b/apps/webapp/test/mollifierStaleSweep.test.ts new file mode 100644 index 00000000000..029b90cb761 --- /dev/null +++ b/apps/webapp/test/mollifierStaleSweep.test.ts @@ -0,0 +1,231 @@ +import { describe, expect, it, vi } from "vitest"; +import { redisTest } from "@internal/testcontainers"; +import { MollifierBuffer } from "@trigger.dev/redis-worker"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { runStaleSweepOnce } from "~/v3/mollifier/mollifierStaleSweep.server"; + +const SNAPSHOT = { + taskIdentifier: "hello-world", + payload: '{"x":1}', + payloadType: "application/json", + traceContext: {}, +}; + +function spyDeps() { + const recordedStaleEnvIds: string[] = []; + const snapshots: Array> = []; + const warnings: Array<{ message: string; fields: Record }> = []; + return { + recordedStaleEnvIds, + snapshots, + warnings, + deps: { + recordStaleEntry: (envId: string) => { + recordedStaleEnvIds.push(envId); + }, + reportStaleEntrySnapshot: (snapshot: Map) => { + // Clone so post-sweep assertions see what was reported *at that + // call site*, not whatever subsequent passes mutate the source + // map into. + snapshots.push(new Map(snapshot)); + }, + logger: { + warn: (message: string, fields: Record) => { + warnings.push({ message, fields }); + }, + }, + }, + }; +} + +describe("runStaleSweepOnce — unit", () => { + it("returns zeros when the buffer is null", async () => { + // Mirrors the prod gate: if TRIGGER_MOLLIFIER_ENABLED=0 the buffer + // singleton is null and the sweep is a no-op. We don't want it to + // emit a metric (or throw) just because mollifier is disabled. + const { deps, recordedStaleEnvIds, warnings, snapshots } = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 1000 }, + { ...deps, getBuffer: () => null }, + ); + expect(result).toEqual({ + orgsScanned: 0, + envsScanned: 0, + entriesScanned: 0, + staleCount: 0, + }); + expect(recordedStaleEnvIds).toEqual([]); + expect(warnings).toEqual([]); + // An empty snapshot is still reported so any previously-paging env + // (from a prior sweep before mollifier was disabled) clears. + expect(snapshots).toHaveLength(1); + expect(snapshots[0].size).toBe(0); + }); +}); + +describe("runStaleSweepOnce — testcontainers", () => { + redisTest( + "flags entries whose dwell exceeds the stale threshold and skips fresh ones", + async ({ redisOptions }) => { + const buffer = new MollifierBuffer({ redisOptions }); + try { + // Two stale entries (one in each env) + one fresh entry. Sweep + // should flag the two stale, leave the fresh one alone, record + // the counter once per stale entry, and emit a warning per + // stale entry with the dwell + threshold. + await buffer.accept({ + runId: "run_stale_a", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + await buffer.accept({ + runId: "run_stale_b", + envId: "env_b", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + await buffer.accept({ + runId: "run_fresh", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + // Yank the system clock forward 5 minutes for the sweep — way + // past the threshold below. The `now` deps seam lets us drive + // the threshold without actually waiting in real time. + const futureNow = Date.now() + 5 * 60 * 1000; + + const { deps, recordedStaleEnvIds, warnings, snapshots } = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { + ...deps, + getBuffer: () => buffer, + now: () => futureNow, + }, + ); + + expect(result.envsScanned).toBe(2); + expect(result.entriesScanned).toBe(3); + expect(result.staleCount).toBe(3); + // All three entries have dwell ~5min, all exceed the 1-min + // threshold; each emits one counter tick + one warning. + expect(recordedStaleEnvIds.sort()).toEqual( + ["env_a", "env_a", "env_b"].sort(), + ); + expect(warnings).toHaveLength(3); + for (const w of warnings) { + expect(w.message).toBe("mollifier.stale_entry"); + expect(w.fields.staleThresholdMs).toBe(60 * 1000); + expect(w.fields.dwellMs).toBeGreaterThan(60 * 1000); + } + // Snapshot drives the alertable gauge — env_a has 2 stale + // entries, env_b has 1. Both must appear so a future alert can + // identify which env is paging. + expect(snapshots).toHaveLength(1); + expect(Object.fromEntries(snapshots[0])).toEqual({ + env_a: 2, + env_b: 1, + }); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "snapshot reports zero for envs that have entries but none stale (clears latched alerts)", + async ({ redisOptions }) => { + // Critical for alert behaviour: a previous sweep reported env_a + // stale, alert fired, drainer caught up. The next sweep must + // report `env_a -> 0` so the gauge drops below the alert + // threshold instead of staying latched at the last stale value. + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_just_arrived", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const { deps, snapshots } = spyDeps(); + await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { ...deps, getBuffer: () => buffer }, + ); + expect(snapshots).toHaveLength(1); + expect(Object.fromEntries(snapshots[0])).toEqual({ env_a: 0 }); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "leaves fresh entries alone (dwell below threshold)", + async ({ redisOptions }) => { + // Regression guard for the inequality direction. A bug that flipped + // `dwellMs > threshold` to `dwellMs >= threshold` would flag every + // entry the first time the sweep runs after a perfectly synchronised + // accept call — the dashboard would page on every burst. + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_fresh_only", + envId: "env_a", + orgId: "org_1", + payload: JSON.stringify(SNAPSHOT), + }); + const { deps, recordedStaleEnvIds, warnings } = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { ...deps, getBuffer: () => buffer }, + ); + expect(result.staleCount).toBe(0); + expect(recordedStaleEnvIds).toEqual([]); + expect(warnings).toEqual([]); + } finally { + await buffer.close(); + } + }, + ); + + redisTest( + "scans across multiple orgs", + async ({ redisOptions }) => { + // Phase-3 design has org-level fairness in the drainer; the sweep + // must walk every org/env, not just the first one it finds. If a + // future refactor collapsed listOrgs/listEnvsForOrg into a single + // env-flat list this test catches a regression there. + const buffer = new MollifierBuffer({ redisOptions }); + try { + await buffer.accept({ + runId: "run_x", + envId: "env_x", + orgId: "org_x", + payload: JSON.stringify(SNAPSHOT), + }); + await buffer.accept({ + runId: "run_y", + envId: "env_y", + orgId: "org_y", + payload: JSON.stringify(SNAPSHOT), + }); + const futureNow = Date.now() + 5 * 60 * 1000; + const { deps } = spyDeps(); + const result = await runStaleSweepOnce( + { staleThresholdMs: 60 * 1000 }, + { ...deps, getBuffer: () => buffer, now: () => futureNow }, + ); + expect(result.orgsScanned).toBe(2); + expect(result.envsScanned).toBe(2); + expect(result.staleCount).toBe(2); + } finally { + await buffer.close(); + } + }, + ); +}); diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index da42247111a..e461fddf6c5 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -450,6 +450,162 @@ export class RunEngine { //MARK: - Run functions + /** + * Writes a TaskRun row in CANCELED state directly, bypassing the trigger + * pipeline. Used by the mollifier drainer when a cancel API call lands on + * a buffered run before it materialises (Q4 mollifier-cancel design). + * + * Skips: queue insertion (no execution), waitpoint creation (single- + * triggerAndWait can't enter the buffer; F4 bypass), concurrency + * reservation. Emits `runCancelled` so the existing TaskEvent handler + * writes the cancellation event row — the only side effect PG-side cancel + * has today per audit. + * + * Idempotent: if a row with the same friendlyId already exists (double + * drainer pop after requeue), Prisma's P2002 unique-constraint violation + * is caught and the existing row is returned. The duplicate runCancelled + * emission is skipped — the original drain's emit already wrote the + * TaskEvent. + */ + async createCancelledRun( + { + snapshot, + cancelledAt, + cancelReason, + }: { + snapshot: TriggerParams; + cancelledAt: Date; + cancelReason: string; + }, + tx?: PrismaClientOrTransaction, + ): Promise { + const prisma = tx ?? this.prisma; + return startSpan(this.tracer, "createCancelledRun", async (span) => { + span.setAttribute("friendlyId", snapshot.friendlyId); + span.setAttribute("taskIdentifier", snapshot.taskIdentifier); + const id = RunId.fromFriendlyId(snapshot.friendlyId); + const error: TaskRunError = { type: "STRING_ERROR", raw: cancelReason }; + + try { + const taskRun = await prisma.taskRun.create({ + data: { + id, + engine: "V2", + status: "CANCELED", + friendlyId: snapshot.friendlyId, + runtimeEnvironmentId: snapshot.environment.id, + environmentType: snapshot.environment.type, + organizationId: snapshot.environment.organization.id, + projectId: snapshot.environment.project.id, + idempotencyKey: snapshot.idempotencyKey, + idempotencyKeyExpiresAt: snapshot.idempotencyKeyExpiresAt, + idempotencyKeyOptions: snapshot.idempotencyKeyOptions, + taskIdentifier: snapshot.taskIdentifier, + payload: snapshot.payload, + payloadType: snapshot.payloadType, + context: snapshot.context, + traceContext: snapshot.traceContext, + traceId: snapshot.traceId, + spanId: snapshot.spanId, + parentSpanId: snapshot.parentSpanId, + lockedToVersionId: snapshot.lockedToVersionId, + taskVersion: snapshot.taskVersion, + sdkVersion: snapshot.sdkVersion, + cliVersion: snapshot.cliVersion, + concurrencyKey: snapshot.concurrencyKey, + queue: snapshot.queue, + lockedQueueId: snapshot.lockedQueueId, + workerQueue: snapshot.workerQueue, + isTest: snapshot.isTest, + taskEventStore: snapshot.taskEventStore, + // Defensive: the snapshot comes from a cjson-encoded buffer + // payload, where empty Lua tables encode as `{}` not `[]`. If + // the drainer pops a buffered run with no tags, snapshot.tags + // will be an empty object, which Prisma misreads as a relation + // update op. Normalise to a real array (or undefined for the + // empty case). + runTags: Array.isArray(snapshot.tags) && snapshot.tags.length > 0 + ? snapshot.tags + : undefined, + oneTimeUseToken: snapshot.oneTimeUseToken, + parentTaskRunId: snapshot.parentTaskRunId, + rootTaskRunId: snapshot.rootTaskRunId, + replayedFromTaskRunFriendlyId: snapshot.replayedFromTaskRunFriendlyId, + batchId: snapshot.batch?.id, + resumeParentOnCompletion: snapshot.resumeParentOnCompletion, + depth: snapshot.depth, + seedMetadata: snapshot.seedMetadata, + seedMetadataType: snapshot.seedMetadataType, + metadata: snapshot.metadata, + metadataType: snapshot.metadataType, + machinePreset: snapshot.machine, + scheduleId: snapshot.scheduleId, + scheduleInstanceId: snapshot.scheduleInstanceId, + createdAt: snapshot.createdAt, + bulkActionGroupIds: snapshot.bulkActionId ? [snapshot.bulkActionId] : undefined, + planType: snapshot.planType, + realtimeStreamsVersion: snapshot.realtimeStreamsVersion, + streamBasinName: snapshot.streamBasinName, + annotations: snapshot.annotations, + completedAt: cancelledAt, + updatedAt: cancelledAt, + error: error as unknown as Prisma.InputJsonValue, + attemptNumber: 0, + executionSnapshots: { + create: { + engine: "V2", + executionStatus: "FINISHED", + description: "Run cancelled before materialisation", + runStatus: "CANCELED", + environmentId: snapshot.environment.id, + environmentType: snapshot.environment.type, + projectId: snapshot.environment.project.id, + organizationId: snapshot.environment.organization.id, + }, + }, + }, + }); + + this.eventBus.emit("runCancelled", { + time: cancelledAt, + run: { + id: taskRun.id, + status: taskRun.status, + friendlyId: taskRun.friendlyId, + spanId: taskRun.spanId, + taskEventStore: taskRun.taskEventStore, + createdAt: taskRun.createdAt, + completedAt: taskRun.completedAt, + error, + updatedAt: taskRun.updatedAt, + attemptNumber: taskRun.attemptNumber ?? 0, + }, + organization: { id: snapshot.environment.organization.id }, + project: { id: snapshot.environment.project.id }, + environment: { id: snapshot.environment.id }, + }); + + return taskRun; + } catch (err) { + // P2002 = unique constraint violation. Double-pop after a drainer + // requeue can reach this. Idempotent: return the existing row + // without re-emitting. + if ( + err instanceof Prisma.PrismaClientKnownRequestError && + err.code === "P2002" + ) { + this.logger.info( + "createCancelledRun: row already exists, returning existing (idempotent)", + { friendlyId: snapshot.friendlyId }, + ); + const existing = await prisma.taskRun.findFirst({ where: { id } }); + if (existing) return existing; + } + throw err; + } + }); + } + /** "Triggers" one run. */ async trigger( { @@ -983,6 +1139,44 @@ export class RunEngine { }); } + // Emit `runFailed` so the alert pipeline picks up the + // SYSTEM_FAILURE row and the event-store handler writes the + // completion event into the trace. Without this the mollifier + // drainer's terminal failures (and batch-trigger's + // exceed-limit failures) land in PG silently — visible in the + // dashboard list but never reaching customers' configured + // ERROR alert channels. + this.eventBus.emit("runFailed", { + time: taskRun.completedAt ?? new Date(), + run: { + id: taskRun.id, + status: taskRun.status, + spanId: taskRun.spanId, + error, + taskEventStore: taskRun.taskEventStore, + createdAt: taskRun.createdAt, + completedAt: taskRun.completedAt, + updatedAt: taskRun.updatedAt, + // This row never attempted execution — it's a synthesised + // terminal failure. The alert payload's `attemptNumber=0` + // is the signal downstream consumers can use to + // distinguish a never-ran failure from a run that + // exhausted its retries. + attemptNumber: 0, + usageDurationMs: 0, + costInCents: 0, + }, + organization: { + id: environment.organization.id, + }, + project: { + id: environment.project.id, + }, + environment: { + id: environment.id, + }, + }); + return taskRun; }, { diff --git a/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts b/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts new file mode 100644 index 00000000000..0a541b5349e --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts @@ -0,0 +1,233 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; + +function freshRunId() { + return RunId.generate().friendlyId; +} +import { expect } from "vitest"; +import { RunEngine } from "../index.js"; +import type { EventBusEventArgs } from "../eventBus.js"; +import { setupAuthenticatedEnvironment } from "./setup.js"; + +vi.setConfig({ testTimeout: 60_000 }); + +function baseEngineOptions(redisOptions: Parameters[0]["queue"]["redis"]) { + return { + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x" as const, + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }; +} + +// Phase C1 / Q4 design — engine.createCancelledRun writes a CANCELED +// TaskRun row directly from a buffer snapshot. Verifies the bypass- +// queue / bypass-waitpoint / emit-runCancelled contract. +describe("RunEngine.createCancelledRun", () => { + containerTest( + "writes CANCELED PG row with snapshot fields, completedAt, error", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + try { + const friendlyId = freshRunId(); + const cancelledAt = new Date("2026-05-20T12:00:00.000Z"); + const cancelReason = "Canceled by user"; + + const result = await engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: '{"hello":"world"}', + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000aaaa000000000000", + spanId: "bbbb000000000000", + queue: "task/test-task", + isTest: false, + tags: ["test-tag"], + }, + cancelledAt, + cancelReason, + }); + + expect(result.status).toBe("CANCELED"); + expect(result.friendlyId).toBe(friendlyId); + expect(result.id).toBe(RunId.fromFriendlyId(friendlyId)); + expect(result.completedAt?.toISOString()).toBe(cancelledAt.toISOString()); + expect(result.taskIdentifier).toBe("test-task"); + expect(result.runTags).toEqual(["test-tag"]); + expect(result.payload).toBe('{"hello":"world"}'); + const err = result.error as { type?: string; raw?: string }; + expect(err.type).toBe("STRING_ERROR"); + expect(err.raw).toBe(cancelReason); + + // Verify the PG row is canonical (findFirst returns the row). + const stored = await prisma.taskRun.findFirst({ + where: { friendlyId }, + }); + expect(stored).not.toBeNull(); + expect(stored!.status).toBe("CANCELED"); + } finally { + await engine.quit(); + } + }, + ); + + containerTest( + "emits runCancelled with correct payload", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + const captured: EventBusEventArgs<"runCancelled">[0][] = []; + engine.eventBus.on("runCancelled", (event) => { + captured.push(event); + }); + + try { + const cancelledAt = new Date(); + const cancelReason = "Test cancel"; + const friendlyId = freshRunId(); + await engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000cccc000000000000", + spanId: "dddd000000000000", + queue: "task/test-task", + isTest: false, + tags: [], + }, + cancelledAt, + cancelReason, + }); + + expect(captured).toHaveLength(1); + expect(captured[0]!.run.status).toBe("CANCELED"); + expect(captured[0]!.run.friendlyId).toBe(friendlyId); + expect(captured[0]!.run.error).toEqual({ type: "STRING_ERROR", raw: cancelReason }); + expect(captured[0]!.organization.id).toBe(env.organization.id); + } finally { + await engine.quit(); + } + }, + ); + + containerTest( + "idempotent on double-pop: second call returns existing row without re-emitting", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + const captured: EventBusEventArgs<"runCancelled">[0][] = []; + engine.eventBus.on("runCancelled", (event) => { + captured.push(event); + }); + + try { + const snapshot = { + friendlyId: freshRunId(), + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000eeee000000000000", + spanId: "ffff000000000000", + queue: "task/test-task", + isTest: false, + tags: [], + }; + const cancelledAt = new Date(); + const cancelReason = "Test idempotent"; + + const first = await engine.createCancelledRun({ snapshot, cancelledAt, cancelReason }); + const second = await engine.createCancelledRun({ snapshot, cancelledAt, cancelReason }); + + expect(second.id).toBe(first.id); + // Only the first call's emit fired; the P2002 path skips re-emission. + expect(captured).toHaveLength(1); + } finally { + await engine.quit(); + } + }, + ); + + // Regression: cjson encodes empty Lua tables as `{}`, not `[]`. When + // the drainer pops a buffered run that never had a tag set, the + // deserialised snapshot's `tags` field is an empty object. The old + // implementation passed it straight into Prisma's `runTags:` field; + // Prisma misread the object as a relation update op and threw + // `Argument 'set' is missing`. The drainer caught the error and + // marked the buffer entry FAILED — so the CANCELED PG row never + // landed. Found while running the Phase F challenge suite. + containerTest( + "tolerates snapshot.tags being an empty object (cjson edge case)", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + try { + const friendlyId = freshRunId(); + // Cast through unknown to simulate the cjson-decode output shape + // for an empty Lua table — TypeScript's snapshot type says + // string[], but the buffer Lua delivers {} for the empty case. + const result = await engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000abcd000000000000", + spanId: "1234000000000000", + queue: "task/test-task", + isTest: false, + tags: {} as unknown as string[], + }, + cancelledAt: new Date(), + cancelReason: "Cancelled — empty tags", + }); + expect(result.status).toBe("CANCELED"); + expect(result.friendlyId).toBe(friendlyId); + // Prisma normalises the absent-tags case to either [] or null + // depending on the column default; assert it's an empty array. + expect(result.runTags).toEqual([]); + } finally { + await engine.quit(); + } + }, + ); +}); diff --git a/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts b/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts new file mode 100644 index 00000000000..0619eeffc2f --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts @@ -0,0 +1,111 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic"; +import { expect } from "vitest"; +import { RunEngine } from "../index.js"; +import { EventBusEventArgs } from "../eventBus.js"; +import { setupAuthenticatedEnvironment } from "./setup.js"; + +vi.setConfig({ testTimeout: 60_000 }); + +describe("RunEngine.createFailedTaskRun", () => { + containerTest("emits runFailed so the alert pipeline wakes up", async ({ prisma, redisOptions }) => { + // The mollifier drainer (and batch-trigger over-limit path) call + // createFailedTaskRun to write a terminal SYSTEM_FAILURE PG row + // for runs that never actually executed. Without an explicit + // runFailed emit, the row lands silently — the + // runEngineHandlers' `runFailed` listener (which enqueues + // PerformTaskRunAlertsService) never fires, so customers' + // configured TASK_RUN alert channels miss the failure entirely. + // + // Regression intent: if the emit is removed or moved out of + // createFailedTaskRun's success path, this test fails. The + // shape assertions pin the fields the alert delivery service + // reads from the event payload (run.id, run.status, error, + // attemptNumber=0 as the never-ran-marker). + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + masterQueueConsumersDisabled: true, + processWorkerQueueDebounceMs: 50, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const failedEvents: EventBusEventArgs<"runFailed">[0][] = []; + engine.eventBus.on("runFailed", (event) => { + failedEvents.push(event); + }); + + const friendlyId = generateFriendlyId("run"); + const taskIdentifier = "drainer-terminal-test"; + + const failed = await engine.createFailedTaskRun({ + friendlyId, + environment: { + id: authenticatedEnvironment.id, + type: authenticatedEnvironment.type, + project: { id: authenticatedEnvironment.project.id }, + organization: { id: authenticatedEnvironment.organization.id }, + }, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + error: { + type: "STRING_ERROR", + raw: "Mollifier drainer terminal failure: synthetic engine.trigger panic", + }, + traceId: "0123456789abcdef0123456789abcdef", + spanId: "fedcba9876543210", + }); + + expect(failed.status).toBe("SYSTEM_FAILURE"); + + expect(failedEvents).toHaveLength(1); + const event = failedEvents[0]; + expect(event.run.id).toBe(failed.id); + expect(event.run.status).toBe("SYSTEM_FAILURE"); + expect(event.run.spanId).toBe("fedcba9876543210"); + // attemptNumber=0 is the marker that the run never executed — + // it's a synthesised terminal failure, not an exhausted-retries + // failure. Downstream consumers can use this to distinguish. + expect(event.run.attemptNumber).toBe(0); + expect(event.run.usageDurationMs).toBe(0); + expect(event.run.costInCents).toBe(0); + expect(event.run.error).toEqual({ + type: "STRING_ERROR", + raw: "Mollifier drainer terminal failure: synthetic engine.trigger panic", + }); + expect(event.organization.id).toBe(authenticatedEnvironment.organization.id); + expect(event.project.id).toBe(authenticatedEnvironment.project.id); + expect(event.environment.id).toBe(authenticatedEnvironment.id); + } finally { + await engine.quit(); + } + }); +}); From 5b4cffaf8239164f628dbdfd15bfb737eae7c31a Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 15:05:53 +0100 Subject: [PATCH 2/8] fix(webapp,run-engine): replay-layer code-review follow-ups - `isRetryablePgError`: also accept `errorCode === "P1001"` so `PrismaClientInitializationError` (which surfaces P1001 on a different field than `PrismaClientKnownRequestError`) retries. - Drop `envId` from OTel metric labels on `mollifier.realtime_subscriptions.buffered`, `mollifier.stale_entries`, and the `mollifier.stale_entries.current` gauge. `envId` is a banned high-cardinality attribute; the structured warn log alongside each counter tick still carries envId for forensic drill-down. - Stale-sweep test name + comments now match the assertion shape (all three entries stale, not "two stale + one fresh"). - `RunEngine.createCancelledRun` P2002 path now requires the existing row's status to be CANCELED; a non-canceled conflict throws rather than silently reporting success, so the caller can route to `engine.cancelRun()` or skip. - Regression test pins the new conflict guard. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../mollifierDrainerHandler.server.ts | 7 ++ .../mollifier/mollifierStaleSweep.server.ts | 7 +- .../v3/mollifier/mollifierTelemetry.server.ts | 54 +++++++------ apps/webapp/test/mollifierStaleSweep.test.ts | 80 ++++++++++--------- .../run-engine/src/engine/index.ts | 16 +++- .../engine/tests/createCancelledRun.test.ts | 61 ++++++++++++++ 6 files changed, 160 insertions(+), 65 deletions(-) diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts index 7f2608d5b21..741f1afed07 100644 --- a/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts @@ -10,8 +10,15 @@ const tracer = trace.getTracer("mollifier-drainer"); export function isRetryablePgError(err: unknown): boolean { if (!(err instanceof Error)) return false; const msg = err.message ?? ""; + // Prisma surfaces P1001 ("Can't reach database server") via two + // different error classes — `PrismaClientKnownRequestError` exposes + // it as `err.code`, `PrismaClientInitializationError` exposes it as + // `err.errorCode`. Check both so reconnection-time errors retry + // regardless of which class fires. const code = (err as { code?: string }).code; + const errorCode = (err as { errorCode?: string }).errorCode; if (code === "P2024") return true; + if (code === "P1001" || errorCode === "P1001") return true; if (msg.includes("Can't reach database server")) return true; if (msg.includes("Connection lost")) return true; if (msg.includes("ECONNRESET")) return true; diff --git a/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts b/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts index 5c31618efec..c2f94495cc9 100644 --- a/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierStaleSweep.server.ts @@ -22,7 +22,10 @@ export type StaleSweepConfig = { export type StaleSweepDeps = { getBuffer?: () => MollifierBuffer | null; - recordStaleEntry?: (envId: string) => void; + // No `envId` arg — `envId` is a high-cardinality metric attribute and + // is intentionally not emitted as a metric label. The structured warn + // log below carries envId for forensic drill-down. + recordStaleEntry?: () => void; reportStaleEntrySnapshot?: (snapshot: Map) => void; logger?: { warn: (message: string, fields: Record) => void }; now?: () => number; @@ -82,7 +85,7 @@ export async function runStaleSweepOnce( entriesScanned += 1; const dwellMs = now - entry.createdAt.getTime(); if (dwellMs > config.staleThresholdMs) { - recordStale(envId); + recordStale(); log.warn("mollifier.stale_entry", { runId: entry.runId, envId, diff --git a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts index ba58ce47f63..f9c7ca72f1f 100644 --- a/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts @@ -29,16 +29,21 @@ export const realtimeBufferedSubscriptionsCounter = meter.createCounter( }, ); -export function recordRealtimeBufferedSubscription(envId: string): void { - realtimeBufferedSubscriptionsCounter.add(1, { envId }); +// No `envId` attribute — `envId` is a banned high-cardinality metric +// label per the repo's OTel rules. The structured warn log emitted +// alongside the counter tick (in `mollifierStaleSweep.server.ts`) +// carries the envId / orgId / runId for forensic drill-down; the +// metric stays an aggregate. +export function recordRealtimeBufferedSubscription(): void { + realtimeBufferedSubscriptionsCounter.add(1); } // Counts buffer entries that have been waiting in the queue ZSET longer -// than the configured stale threshold (typically half of entryTtlSeconds). -// Useful for historical "stale events over time" views, but not directly -// alertable on its own — a single stuck entry observed by N sweep ticks -// adds N to the counter, so `rate()` over an alerting window reflects -// (entries × ticks), not "entries that are stale right now". +// than the configured stale threshold. Useful for historical "stale +// events over time" views, but not directly alertable on its own — a +// single stuck entry observed by N sweep ticks adds N to the counter, +// so `rate()` over an alerting window reflects (entries × ticks), not +// "entries that are stale right now". export const staleEntriesCounter = meter.createCounter( "mollifier.stale_entries", { @@ -47,16 +52,16 @@ export const staleEntriesCounter = meter.createCounter( }, ); -export function recordStaleEntry(envId: string): void { - staleEntriesCounter.add(1, { envId }); +// No `envId` attribute — see comment above. +export function recordStaleEntry(): void { + staleEntriesCounter.add(1); } -// Alertable signal: the count of stale entries observed by the latest -// sweep, per env. The sweep snapshots the full per-env picture on each -// pass (including zeros for envs that no longer have any stale entries) -// so an env that was paging can clear when the drainer catches up -// instead of staying latched. Recommended alert: -// mollifier_stale_entries_current{envId=...} > 0 for 5m +// Alertable signal: the total count of stale entries observed by the +// latest sweep. The sweep snapshots the full picture on each pass so +// the gauge drops back to 0 when the drainer catches up instead of +// staying latched. Recommended alert: +// mollifier_stale_entries_current > 0 for 5m export const staleEntriesGauge = meter.createObservableGauge( "mollifier.stale_entries.current", { @@ -65,23 +70,22 @@ export const staleEntriesGauge = meter.createObservableGauge( }, ); -const latestStaleSnapshot = new Map(); +let latestStaleTotal = 0; export function reportStaleEntrySnapshot(snapshot: Map): void { - // Replace, don't merge — envs absent from the new snapshot have either - // drained or no longer exist; leaving their last value cached would - // keep alerts latched forever. - latestStaleSnapshot.clear(); - for (const [envId, count] of snapshot) { - latestStaleSnapshot.set(envId, count); + // Sum across envs. Per-env breakdown is intentionally NOT emitted as + // a metric label (high-cardinality); the structured warn log lines + // from the sweep carry per-env detail for ops to drill down. + let total = 0; + for (const count of snapshot.values()) { + total += count; } + latestStaleTotal = total; } meter.addBatchObservableCallback( (result) => { - for (const [envId, count] of latestStaleSnapshot) { - result.observe(staleEntriesGauge, count, { envId }); - } + result.observe(staleEntriesGauge, latestStaleTotal); }, [staleEntriesGauge], ); diff --git a/apps/webapp/test/mollifierStaleSweep.test.ts b/apps/webapp/test/mollifierStaleSweep.test.ts index 029b90cb761..6b57931407f 100644 --- a/apps/webapp/test/mollifierStaleSweep.test.ts +++ b/apps/webapp/test/mollifierStaleSweep.test.ts @@ -14,16 +14,21 @@ const SNAPSHOT = { }; function spyDeps() { - const recordedStaleEnvIds: string[] = []; + // Counter ticks — metric carries no `envId` label (high-cardinality) + // so the spy is a simple call count. Per-env detail lives on the + // structured warn log and the snapshot map. + let staleEntryCount = 0; const snapshots: Array> = []; const warnings: Array<{ message: string; fields: Record }> = []; return { - recordedStaleEnvIds, + get staleEntryCount() { + return staleEntryCount; + }, snapshots, warnings, deps: { - recordStaleEntry: (envId: string) => { - recordedStaleEnvIds.push(envId); + recordStaleEntry: () => { + staleEntryCount += 1; }, reportStaleEntrySnapshot: (snapshot: Map) => { // Clone so post-sweep assertions see what was reported *at that @@ -45,10 +50,10 @@ describe("runStaleSweepOnce — unit", () => { // Mirrors the prod gate: if TRIGGER_MOLLIFIER_ENABLED=0 the buffer // singleton is null and the sweep is a no-op. We don't want it to // emit a metric (or throw) just because mollifier is disabled. - const { deps, recordedStaleEnvIds, warnings, snapshots } = spyDeps(); + const spies = spyDeps(); const result = await runStaleSweepOnce( { staleThresholdMs: 1000 }, - { ...deps, getBuffer: () => null }, + { ...spies.deps, getBuffer: () => null }, ); expect(result).toEqual({ orgsScanned: 0, @@ -56,8 +61,9 @@ describe("runStaleSweepOnce — unit", () => { entriesScanned: 0, staleCount: 0, }); - expect(recordedStaleEnvIds).toEqual([]); - expect(warnings).toEqual([]); + expect(spies.staleEntryCount).toBe(0); + expect(spies.warnings).toEqual([]); + const snapshots = spies.snapshots; // An empty snapshot is still reported so any previously-paging env // (from a prior sweep before mollifier was disabled) clears. expect(snapshots).toHaveLength(1); @@ -67,14 +73,15 @@ describe("runStaleSweepOnce — unit", () => { describe("runStaleSweepOnce — testcontainers", () => { redisTest( - "flags entries whose dwell exceeds the stale threshold and skips fresh ones", + "flags every entry whose dwell exceeds the stale threshold", async ({ redisOptions }) => { const buffer = new MollifierBuffer({ redisOptions }); try { - // Two stale entries (one in each env) + one fresh entry. Sweep - // should flag the two stale, leave the fresh one alone, record - // the counter once per stale entry, and emit a warning per - // stale entry with the dwell + threshold. + // Three entries across two envs in the same org. The sweep below + // runs against a `now` advanced by 5 minutes, so all three have + // dwell ~5min and ALL THREE are stale against a 1-minute + // threshold — there is no "fresh" entry in this scenario. The + // assertions below pin the all-three-stale shape. await buffer.accept({ runId: "run_stale_a", envId: "env_a", @@ -88,7 +95,7 @@ describe("runStaleSweepOnce — testcontainers", () => { payload: JSON.stringify(SNAPSHOT), }); await buffer.accept({ - runId: "run_fresh", + runId: "run_stale_c", envId: "env_a", orgId: "org_1", payload: JSON.stringify(SNAPSHOT), @@ -98,11 +105,11 @@ describe("runStaleSweepOnce — testcontainers", () => { // the threshold without actually waiting in real time. const futureNow = Date.now() + 5 * 60 * 1000; - const { deps, recordedStaleEnvIds, warnings, snapshots } = spyDeps(); + const spies = spyDeps(); const result = await runStaleSweepOnce( { staleThresholdMs: 60 * 1000 }, { - ...deps, + ...spies.deps, getBuffer: () => buffer, now: () => futureNow, }, @@ -111,22 +118,21 @@ describe("runStaleSweepOnce — testcontainers", () => { expect(result.envsScanned).toBe(2); expect(result.entriesScanned).toBe(3); expect(result.staleCount).toBe(3); - // All three entries have dwell ~5min, all exceed the 1-min - // threshold; each emits one counter tick + one warning. - expect(recordedStaleEnvIds.sort()).toEqual( - ["env_a", "env_a", "env_b"].sort(), - ); - expect(warnings).toHaveLength(3); - for (const w of warnings) { + // All three entries exceed the threshold; each emits one + // counter tick + one warning. + expect(spies.staleEntryCount).toBe(3); + expect(spies.warnings).toHaveLength(3); + for (const w of spies.warnings) { expect(w.message).toBe("mollifier.stale_entry"); expect(w.fields.staleThresholdMs).toBe(60 * 1000); expect(w.fields.dwellMs).toBeGreaterThan(60 * 1000); } // Snapshot drives the alertable gauge — env_a has 2 stale - // entries, env_b has 1. Both must appear so a future alert can - // identify which env is paging. - expect(snapshots).toHaveLength(1); - expect(Object.fromEntries(snapshots[0])).toEqual({ + // entries, env_b has 1. Per-env detail is still passed to + // `reportStaleEntrySnapshot` for forensic value even though the + // gauge itself aggregates the total. + expect(spies.snapshots).toHaveLength(1); + expect(Object.fromEntries(spies.snapshots[0])).toEqual({ env_a: 2, env_b: 1, }); @@ -151,13 +157,13 @@ describe("runStaleSweepOnce — testcontainers", () => { orgId: "org_1", payload: JSON.stringify(SNAPSHOT), }); - const { deps, snapshots } = spyDeps(); + const spies = spyDeps(); await runStaleSweepOnce( { staleThresholdMs: 60 * 1000 }, - { ...deps, getBuffer: () => buffer }, + { ...spies.deps, getBuffer: () => buffer }, ); - expect(snapshots).toHaveLength(1); - expect(Object.fromEntries(snapshots[0])).toEqual({ env_a: 0 }); + expect(spies.snapshots).toHaveLength(1); + expect(Object.fromEntries(spies.snapshots[0])).toEqual({ env_a: 0 }); } finally { await buffer.close(); } @@ -179,14 +185,14 @@ describe("runStaleSweepOnce — testcontainers", () => { orgId: "org_1", payload: JSON.stringify(SNAPSHOT), }); - const { deps, recordedStaleEnvIds, warnings } = spyDeps(); + const spies = spyDeps(); const result = await runStaleSweepOnce( { staleThresholdMs: 60 * 1000 }, - { ...deps, getBuffer: () => buffer }, + { ...spies.deps, getBuffer: () => buffer }, ); expect(result.staleCount).toBe(0); - expect(recordedStaleEnvIds).toEqual([]); - expect(warnings).toEqual([]); + expect(spies.staleEntryCount).toBe(0); + expect(spies.warnings).toEqual([]); } finally { await buffer.close(); } @@ -215,10 +221,10 @@ describe("runStaleSweepOnce — testcontainers", () => { payload: JSON.stringify(SNAPSHOT), }); const futureNow = Date.now() + 5 * 60 * 1000; - const { deps } = spyDeps(); + const spies = spyDeps(); const result = await runStaleSweepOnce( { staleThresholdMs: 60 * 1000 }, - { ...deps, getBuffer: () => buffer, now: () => futureNow }, + { ...spies.deps, getBuffer: () => buffer, now: () => futureNow }, ); expect(result.orgsScanned).toBe(2); expect(result.envsScanned).toBe(2); diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index e461fddf6c5..244d2b014ed 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -599,7 +599,21 @@ export class RunEngine { { friendlyId: snapshot.friendlyId }, ); const existing = await prisma.taskRun.findFirst({ where: { id } }); - if (existing) return existing; + if (existing) { + // Only treat the conflict as idempotent when the existing + // row is ALREADY canceled. If a non-canceled row landed + // first (e.g. the drainer's normal `engine.trigger` replay + // path raced ahead of the cancel) we surface a conflict + // rather than silently reporting "cancelled" — the run is + // genuinely live and the caller must decide between + // engine.cancelRun() and skipping. + if (existing.status === "CANCELED") { + return existing; + } + throw new Error( + `createCancelledRun conflict: existing run ${snapshot.friendlyId} has status ${existing.status}`, + ); + } } throw err; } diff --git a/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts b/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts index 0a541b5349e..71223ce0773 100644 --- a/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts +++ b/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts @@ -230,4 +230,65 @@ describe("RunEngine.createCancelledRun", () => { } }, ); + + // Regression: the P2002-on-id idempotency path used to return ANY + // existing row, which would silently report success even if a live + // (non-CANCELED) row landed first. The guard now requires the + // existing row's status to be CANCELED; anything else surfaces a + // conflict so the caller can route to engine.cancelRun() or skip. + containerTest( + "P2002 conflict with non-CANCELED existing row throws (does not silently succeed)", + async ({ prisma, redisOptions }) => { + const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) }); + try { + const friendlyId = freshRunId(); + const id = RunId.fromFriendlyId(friendlyId); + + // Plant a live (non-CANCELED) row with the same id so the + // cancelled-run INSERT hits P2002 and the guard finds a row + // that ISN'T CANCELED. + await prisma.taskRun.create({ + data: { + id, + friendlyId, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + status: "PENDING", + runtimeEnvironmentId: env.id, + projectId: env.project.id, + organizationId: env.organizationId, + queue: "task/test-task", + traceId: "0000000000000000aaaa000000000000", + spanId: "bbbb000000000000", + engine: "V2", + }, + }); + + await expect( + engine.createCancelledRun({ + snapshot: { + friendlyId, + environment: env, + taskIdentifier: "test-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "0000000000000000aaaa000000000000", + spanId: "bbbb000000000000", + queue: "task/test-task", + isTest: false, + tags: [], + }, + cancelledAt: new Date(), + cancelReason: "Should not silently overwrite a live row", + }), + ).rejects.toThrow(/createCancelledRun conflict.*PENDING/); + } finally { + await engine.quit(); + } + }, + ); }); From 56350b9034a1894cdf6eeb83bc980fe23d4b147b Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 15:44:06 +0100 Subject: [PATCH 3/8] test(webapp): isolate mollifierDrainerWorker test from runEngine singleton MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Importing the production drainer wiring transitively loads \`~/v3/runEngine.server\`, whose top-level \`singleton(...)\` eagerly constructs a RunEngine. The constructor spins up Prisma + Redis workers that try to connect to localhost — in CI (no PG, no Redis) that produces an unhandled \`PrismaClientInitializationError\` which fails the run even though every assertion passes. Mock the runEngine and prisma modules so the unit test exercises only the bootstrap's error classification, not a live engine. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/webapp/test/mollifierDrainerWorker.test.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/apps/webapp/test/mollifierDrainerWorker.test.ts b/apps/webapp/test/mollifierDrainerWorker.test.ts index e5f38229d8f..0d4e931fd83 100644 --- a/apps/webapp/test/mollifierDrainerWorker.test.ts +++ b/apps/webapp/test/mollifierDrainerWorker.test.ts @@ -1,4 +1,17 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; + +// Importing `~/v3/mollifier/mollifierDrainer.server` (below) transitively +// loads `~/v3/runEngine.server`, whose top-level `singleton(...)` call +// eagerly constructs a RunEngine. That spins up Prisma + Redis workers +// that try to connect to localhost — which in CI (no PG, no Redis) +// produces an unhandled `PrismaClientInitializationError` that fails +// the test run even though the assertions all pass. Mocking the +// runEngine module short-circuits the singleton so no worker starts. +vi.mock("~/v3/runEngine.server", () => ({ engine: {} })); +// Same problem: prisma.server.ts's top-level singleton tries to open a +// PG client. The test never makes a query; an empty stub is enough. +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + import { MollifierConfigurationError } from "~/v3/mollifier/mollifierDrainer.server"; import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server"; From 631879a3a8abee1d920ca67214f08d0c6c63c03d Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 16:12:22 +0100 Subject: [PATCH 4/8] test(webapp): bump mollifierStaleSweep redisTest timeouts to 20s Container startup + the sweep loop can exceed Vitest's 5s default on CI runners (passes in ~1.7-2s locally). Matches the explicit \`{ timeout: 20_000 }\` other mollifier redisTests carry across the project. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/webapp/test/mollifierStaleSweep.test.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/apps/webapp/test/mollifierStaleSweep.test.ts b/apps/webapp/test/mollifierStaleSweep.test.ts index 6b57931407f..cc452aff9f6 100644 --- a/apps/webapp/test/mollifierStaleSweep.test.ts +++ b/apps/webapp/test/mollifierStaleSweep.test.ts @@ -74,6 +74,7 @@ describe("runStaleSweepOnce — unit", () => { describe("runStaleSweepOnce — testcontainers", () => { redisTest( "flags every entry whose dwell exceeds the stale threshold", + { timeout: 20_000 }, async ({ redisOptions }) => { const buffer = new MollifierBuffer({ redisOptions }); try { @@ -144,6 +145,7 @@ describe("runStaleSweepOnce — testcontainers", () => { redisTest( "snapshot reports zero for envs that have entries but none stale (clears latched alerts)", + { timeout: 20_000 }, async ({ redisOptions }) => { // Critical for alert behaviour: a previous sweep reported env_a // stale, alert fired, drainer caught up. The next sweep must @@ -172,6 +174,7 @@ describe("runStaleSweepOnce — testcontainers", () => { redisTest( "leaves fresh entries alone (dwell below threshold)", + { timeout: 20_000 }, async ({ redisOptions }) => { // Regression guard for the inequality direction. A bug that flipped // `dwellMs > threshold` to `dwellMs >= threshold` would flag every @@ -201,6 +204,7 @@ describe("runStaleSweepOnce — testcontainers", () => { redisTest( "scans across multiple orgs", + { timeout: 20_000 }, async ({ redisOptions }) => { // Phase-3 design has org-level fairness in the drainer; the sweep // must walk every org/env, not just the first one it finds. If a From 8f10bb05bca9f7df90236dea338e0774a50d7342 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Tue, 26 May 2026 17:32:17 +0100 Subject: [PATCH 5/8] fix(webapp,run-engine): replay-layer Devin follow-ups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs flagged by Devin on PR #3754: 1. entry.server.tsx reverted to \`void sessionsReplicationInstance;\`, which esbuild tree-shakes under \`"sideEffects": false\`. Restored the globalThis assignment + warning comment from #3738 (incident TRI-9864). Without this the sessions→ClickHouse logical replication slot stops being consumed at boot. 2. createFailedTaskRun unconditionally emitted \`runFailed\`, which the \`completeFailedRunEvent\` listener uses to write a span completion into ClickHouse. But TriggerFailedTaskService.call() already wraps createFailedTaskRun inside \`repository.traceEvent({ incomplete: false, isError: true })\` which writes its own completion row for the same (traceId, spanId). Two completions racing on the same span row is a real observability bug. Added an \`emitRunFailedEvent: boolean = true\` opt-out. The TriggerFailedTaskService.call() path now passes \`false\` and enqueues \`PerformTaskRunAlertsService\` directly after the trace event closes so the alerts side of \`runFailed\` is preserved. \`callWithoutTraceEvents\` and the mollifier drainer's terminal- failure path keep the default emit (they have no outer trace event managing the span). Regression test pins the opt-out: \`emitRunFailedEvent: false\` writes the PG row but does NOT fire the bus event. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/webapp/app/entry.server.tsx | 11 +++- .../services/triggerFailedTask.server.ts | 27 ++++++++ .../run-engine/src/engine/index.ts | 27 ++++++++ .../engine/tests/createFailedTaskRun.test.ts | 65 +++++++++++++++++++ 4 files changed, 129 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/entry.server.tsx b/apps/webapp/app/entry.server.tsx index ca53d03eb67..9996eb7b30a 100644 --- a/apps/webapp/app/entry.server.tsx +++ b/apps/webapp/app/entry.server.tsx @@ -31,8 +31,17 @@ import { // on webapp startup. The singleton's initializer wires start (gated on // `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors // runsReplicationInstance. +// +// IMPORTANT: do NOT replace this with `void sessionsReplicationInstance;`. +// `apps/webapp/package.json` declares `"sideEffects": false`, so esbuild +// treats `void ;` as a pure expression statement and tree-shakes +// the entire import — the singleton's initializer never fires and the +// sessions→ClickHouse logical replication slot stops being consumed. Assigning +// to globalThis is an unambiguous side effect the bundler must preserve. See +// TRI-9864 for the incident write-up. import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server"; -void sessionsReplicationInstance; +(globalThis as Record).__sessionsReplicationInstance = + sessionsReplicationInstance; const ABORT_DELAY = 30000; diff --git a/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts b/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts index 5f985b684c1..a26f6c3e74d 100644 --- a/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerFailedTask.server.ts @@ -6,6 +6,7 @@ import type { PrismaClientOrTransaction } from "@trigger.dev/database"; import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { getEventRepository } from "~/v3/eventRepository/index.server"; +import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRunAlerts.server"; import { DefaultQueueManager } from "../concerns/queues.server"; import type { TriggerTaskRequest } from "../types"; @@ -176,6 +177,14 @@ export class TriggerFailedTaskService { event.setAttribute("runId", failedRunFriendlyId); event.failWithError(taskRunError); + // `emitRunFailedEvent: false` because this call site owns the + // trace-event lifecycle via the outer `traceEvent({ + // incomplete: false, isError: true })`. Letting the engine + // emit `runFailed` here would race the + // `completeFailedRunEvent` listener against the outer trace + // event's own completion write for the same (traceId, spanId). + // We re-trigger the alerts side directly after the trace + // event closes, below. return await this.engine.createFailedTaskRun({ friendlyId: failedRunFriendlyId, environment: { @@ -200,12 +209,30 @@ export class TriggerFailedTaskService { spanId: event.spanId, traceContext: traceContext as Record, taskEventStore: store, + emitRunFailedEvent: false, ...(queueName !== undefined && { queue: queueName }), ...(lockedQueueId !== undefined && { lockedQueueId }), }); } ); + // Alerts side of `runFailed` — the engine emit was suppressed + // above so the trace-event completion isn't double-written; we + // still need the alert pipeline to fire so customers' ERROR + // channels see the failure. Best-effort: a failed enqueue logs + // but doesn't block returning the friendlyId, mirroring the + // engine handler's behaviour at runEngineHandlers.server.ts:81. + try { + await PerformTaskRunAlertsService.enqueue(failedRun.id); + } catch (alertsError) { + logger.warn("TriggerFailedTaskService: alert enqueue failed", { + taskId: request.taskId, + friendlyId: failedRun.friendlyId, + error: + alertsError instanceof Error ? alertsError.message : String(alertsError), + }); + } + return failedRun.friendlyId; } catch (createError) { const createErrorMsg = diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 244d2b014ed..ee8cf06cf3c 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -1051,6 +1051,7 @@ export class RunEngine { taskEventStore, queue: queueOverride, lockedQueueId: lockedQueueIdOverride, + emitRunFailedEvent = true, }: { friendlyId: string; environment: { @@ -1078,6 +1079,19 @@ export class RunEngine { queue?: string; /** Resolved TaskQueue.id when the task is locked to a specific queue. */ lockedQueueId?: string; + /** + * Whether to emit the `runFailed` engine-bus event. Defaults to true. + * + * Set to `false` when the caller is ALREADY managing the trace-event + * lifecycle for this run via `repository.traceEvent({ incomplete: false, + * isError: true, ... })`. In that path the outer trace event handles + * span completion itself; emitting `runFailed` from here causes the + * `runFailed` → `completeFailedRunEvent` handler to write a second + * completion row for the same (traceId, spanId), racing with the + * outer trace event's own write. The alert side of `runFailed` is + * preserved by emitting from the caller after `traceEvent` returns. + */ + emitRunFailedEvent?: boolean; }): Promise { return startSpan( this.tracer, @@ -1160,6 +1174,19 @@ export class RunEngine { // exceed-limit failures) land in PG silently — visible in the // dashboard list but never reaching customers' configured // ERROR alert channels. + // + // Gated by `emitRunFailedEvent` so call sites that already wrap + // this inside `repository.traceEvent({ incomplete: false, + // isError: true })` can opt out — the outer trace event writes + // the completion row itself, and a second write via + // `completeFailedRunEvent` would race against it. Callers that + // disable the emit are responsible for triggering the alerts + // side themselves (e.g. by calling + // `PerformTaskRunAlertsService.enqueue` directly after the + // trace event closes). + if (!emitRunFailedEvent) { + return taskRun; + } this.eventBus.emit("runFailed", { time: taskRun.completedAt ?? new Date(), run: { diff --git a/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts b/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts index 0619eeffc2f..84d33baa87d 100644 --- a/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts +++ b/internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts @@ -108,4 +108,69 @@ describe("RunEngine.createFailedTaskRun", () => { await engine.quit(); } }); + + // The TriggerFailedTaskService.call() path wraps createFailedTaskRun + // inside `repository.traceEvent({ incomplete: false, isError: true })` + // which already writes the completion row for the (traceId, spanId). + // Emitting `runFailed` from here would cause the + // `completeFailedRunEvent` handler to race a second write against + // the same span — the `emitRunFailedEvent: false` opt-out is what + // suppresses the emit. The PG row + alert side stay correct because + // the caller enqueues `PerformTaskRunAlertsService.enqueue(run.id)` + // directly after the trace event closes. + containerTest( + "emitRunFailedEvent: false suppresses the bus emit but still creates the PG row", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 }, + queue: { redis: redisOptions, masterQueueConsumersDisabled: true, processWorkerQueueDebounceMs: 50 }, + runLock: { redis: redisOptions }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { name: "small-1x" as const, cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const failedEvents: EventBusEventArgs<"runFailed">[0][] = []; + engine.eventBus.on("runFailed", (event) => { + failedEvents.push(event); + }); + + const friendlyId = generateFriendlyId("run"); + const failed = await engine.createFailedTaskRun({ + friendlyId, + environment: { + id: authenticatedEnvironment.id, + type: authenticatedEnvironment.type, + project: { id: authenticatedEnvironment.project.id }, + organization: { id: authenticatedEnvironment.organization.id }, + }, + taskIdentifier: "outer-trace-event-test", + payload: "{}", + payloadType: "application/json", + error: { type: "STRING_ERROR", raw: "outer trace event manages span" }, + traceId: "0123456789abcdef0123456789abcdef", + spanId: "fedcba9876543210", + emitRunFailedEvent: false, + }); + + // PG row landed (caller still gets a usable TaskRun). + expect(failed.status).toBe("SYSTEM_FAILURE"); + expect(failed.friendlyId).toBe(friendlyId); + + // Bus emit was suppressed. + expect(failedEvents).toHaveLength(0); + } finally { + await engine.quit(); + } + }, + ); }); From 81b7c3f71fb901553a7900777b285e4355101dd6 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 27 May 2026 12:59:04 +0100 Subject: [PATCH 6/8] docs(server-changes): scope mollifier drainer-replay note to this PR Rename the catch-all mollifier.md and trim it to the drainer replay handler, stale sweep, telemetry gauge, and run-engine cancelled/failed APIs; later read/mutation/dashboard work is documented in its own PR. Co-Authored-By: Claude Opus 4.7 (1M context) --- .server-changes/mollifier-drainer-replay.md | 6 ++++++ .server-changes/mollifier.md | 6 ------ 2 files changed, 6 insertions(+), 6 deletions(-) create mode 100644 .server-changes/mollifier-drainer-replay.md delete mode 100644 .server-changes/mollifier.md diff --git a/.server-changes/mollifier-drainer-replay.md b/.server-changes/mollifier-drainer-replay.md new file mode 100644 index 00000000000..a8ff2d4bb00 --- /dev/null +++ b/.server-changes/mollifier-drainer-replay.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Mollifier drainer replay: replay buffered entries into `engine.trigger`, stale-entry sweep, a drainer-health gauge, and run-engine cancelled/failed run APIs. diff --git a/.server-changes/mollifier.md b/.server-changes/mollifier.md deleted file mode 100644 index 399ad5c6507..00000000000 --- a/.server-changes/mollifier.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -area: webapp -type: feature ---- - -Mollifier — Redis-backed burst buffer in front of `engine.trigger` with a fair drainer, full read/write parity for buffered runs across the API + dashboard + realtime stream, alertable `mollifier.stale_entries.current` gauge for drainer health, and `runFailed` alerts on drainer-terminal `SYSTEM_FAILURE` rows. From 7235873177b169d3abfc72b1117ce42d5faced29 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 27 May 2026 16:14:53 +0100 Subject: [PATCH 7/8] fix(webapp): honour cancel-wins conflict + requeue on transient PG outage in mollifier drainer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The mollifier drainer's cancel bifurcation called engine.createCancelledRun without handling its documented conflict contract: when the normal trigger replay path races ahead and materialises a live (non-CANCELED) row, the engine throws a conflict so the caller can "decide between engine.cancelRun() and skipping". The handler did neither — the conflict propagated, isRetryablePgError returned false, and the drainer buffer.fail()'d the entry, silently losing the cancellation while the run kept executing. Now route conflicts to engine.cancelRun() so the cancel actually wins. Separately, when engine.trigger fails non-retryably and the SYSTEM_FAILURE fallback write then fails because PG is transiently unreachable, rethrowing the original non-retryable error made the drainer buffer.fail() the entry — losing the run with no PG row ever landing, and dropping the write error entirely. Rethrow the retryable write error instead so the drainer requeues; the failure row lands once PG recovers. Non-retryable write failures still rethrow the original error as before. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../mollifierDrainerHandler.server.ts | 64 +++++++++++++--- .../test/mollifierDrainerHandler.test.ts | 76 +++++++++++++++++++ 2 files changed, 129 insertions(+), 11 deletions(-) diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts index 741f1afed07..d08f0e30e80 100644 --- a/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts @@ -1,6 +1,7 @@ import { context, trace, TraceFlags } from "@opentelemetry/api"; import type { RunEngine } from "@internal/run-engine"; import type { PrismaClientOrTransaction } from "@trigger.dev/database"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; import type { MollifierDrainerHandler } from "@trigger.dev/redis-worker"; import { startSpan } from "~/v3/tracing.server"; import type { MollifierSnapshot } from "./mollifierSnapshot.server"; @@ -72,14 +73,41 @@ export function createDrainerHandler(deps: { span.setAttribute("mollifier.run_friendly_id", input.runId); span.setAttribute("mollifier.cancel_bifurcation", true); span.setAttribute("taskRunId", input.runId); - await deps.engine.createCancelledRun( - { - snapshot: input.payload as any, - cancelledAt: new Date(cancelledAtStr), - cancelReason, - }, - deps.prisma, - ); + try { + await deps.engine.createCancelledRun( + { + snapshot: input.payload as any, + cancelledAt: new Date(cancelledAtStr), + cancelReason, + }, + deps.prisma, + ); + } catch (err) { + // createCancelledRun throws a conflict when the normal trigger + // replay path won the race and already materialised a live + // (non-CANCELED) row for this friendlyId. Its contract leaves + // the resolution to us: honour the cancel by actually + // cancelling the now-live run. Letting the conflict propagate + // would instead reach the drainer's terminal-failure path + // (isRetryablePgError() is false for it), buffer.fail() the + // entry, and silently lose the cancellation while the run + // keeps executing. + const isConflict = + err instanceof Error && err.message.startsWith("createCancelledRun conflict"); + if (!isConflict) { + throw err; + } + span.setAttribute("mollifier.cancel_conflict", true); + const friendlyId = + typeof input.payload.friendlyId === "string" + ? input.payload.friendlyId + : input.runId; + await deps.engine.cancelRun({ + runId: RunId.fromFriendlyId(friendlyId), + completedAt: new Date(cancelledAtStr), + reason: cancelReason, + }); + } }); }); return; @@ -158,9 +186,23 @@ export function createDrainerHandler(deps: { typeof snapshot.lockedQueueId === "string" ? snapshot.lockedQueueId : undefined, }); } catch (writeErr) { - // Class A — PG itself is failing. Rethrow the original - // error so the drainer falls back to buffer.fail. Include - // the write error in the log line at the drainer layer. + // The terminal SYSTEM_FAILURE write itself failed. If it + // failed because PG is transiently unreachable, rethrow the + // *write* error so the drainer requeues — buffer.fail()ing on + // the original non-retryable error would lose the run with no + // PG row ever landing. Once PG recovers the requeued entry + // writes its failure row and the customer sees it. + if (isRetryablePgError(writeErr)) { + span.setAttribute("mollifier.terminal_write_retryable", true); + throw writeErr; + } + // PG reachable but the write was rejected for another reason + // (genuinely bad snapshot). Rethrow the original trigger error + // so the drainer falls back to buffer.fail. + span.setAttribute( + "mollifier.terminal_write_error", + writeErr instanceof Error ? writeErr.message : String(writeErr) + ); throw err; } } diff --git a/apps/webapp/test/mollifierDrainerHandler.test.ts b/apps/webapp/test/mollifierDrainerHandler.test.ts index 6f66cf2ab79..e1272203560 100644 --- a/apps/webapp/test/mollifierDrainerHandler.test.ts +++ b/apps/webapp/test/mollifierDrainerHandler.test.ts @@ -1,5 +1,6 @@ import { describe, expect, it, vi } from "vitest"; import { trace } from "@opentelemetry/api"; +import { RunId } from "@trigger.dev/core/v3/isomorphic"; vi.mock("~/db.server", () => ({ prisma: {}, @@ -180,6 +181,81 @@ describe("createDrainerHandler", () => { expect(createFailedTaskRun).toHaveBeenCalledOnce(); }); + it("honours the cancel when a buffered cancel races a materialised non-CANCELED row", async () => { + // Cancel-wins-over-trigger (Q4 bifurcation). If the normal trigger + // replay path materialised a live PENDING row before the cancel + // bifurcation drained, engine.createCancelledRun throws a conflict — + // its documented contract is that "the caller must decide between + // engine.cancelRun() and skipping". The drainer handler must honour + // the cancel intent by actually cancelling the now-live run; otherwise + // the conflict propagates, isRetryablePgError() returns false, and the + // drainer buffer.fail()s the entry — silently losing the cancellation + // while the run keeps executing. + const friendlyId = RunId.generate().friendlyId; + const createCancelledRun = vi.fn(async () => { + throw new Error( + `createCancelledRun conflict: existing run ${friendlyId} has status PENDING` + ); + }); + const cancelRun = vi.fn(async () => ({ alreadyFinished: false })); + const handler = createDrainerHandler({ + engine: { createCancelledRun, cancelRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: friendlyId, + envId: "env_a", + orgId: "org_1", + payload: { + friendlyId, + taskIdentifier: "t", + environment: envFixture, + cancelledAt: new Date().toISOString(), + cancelReason: "Canceled by user", + }, + attempts: 0, + createdAt: new Date(), + } as any) + ).resolves.toBeUndefined(); + + // The live run is actually cancelled, by its internal id. + expect(cancelRun).toHaveBeenCalledOnce(); + expect(cancelRun.mock.calls[0][0].runId).toBe(RunId.fromFriendlyId(friendlyId)); + }); + + it("requeues on a transient PG outage during the SYSTEM_FAILURE fallback write", async () => { + // engine.trigger failed non-retryably, so we try to write a terminal + // SYSTEM_FAILURE row. If THAT write fails because PG is transiently + // unreachable, rethrowing the *original* non-retryable error makes the + // drainer buffer.fail() the entry — losing the run with no PG row ever + // landing. Rethrow the retryable write error instead so the drainer + // requeues; once PG recovers the failure row lands and the customer + // sees it. + const trigger = vi.fn(async () => { + throw new Error("validation failed: payload too large"); + }); + const createFailedTaskRun = vi.fn(async () => { + throw new Error("Can't reach database server"); + }); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await expect( + handler({ + runId: "run_x", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", environment: envFixture }, + attempts: 0, + createdAt: new Date(), + } as any) + ).rejects.toThrow("Can't reach database server"); + }); + it("rethrows the original error when the snapshot lacks an environment block", async () => { const triggerErr = new Error("engine rejected the snapshot"); const trigger = vi.fn(async () => { From 014313e00759fef1ba061b90f06f16ce819749d3 Mon Sep 17 00:00:00 2001 From: Daniel Sutton Date: Wed, 27 May 2026 17:41:09 +0100 Subject: [PATCH 8/8] docs(mollifier): strip internal planning labels from comments Remove plan-tracking shorthand (Q# bifurcation, Phase C1/Q4) from replay-layer mollifier comments and test names; reword to plain English. Comment/test-name only; no behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts | 2 +- apps/webapp/test/mollifierDrainerHandler.test.ts | 2 +- .../run-engine/src/engine/tests/createCancelledRun.test.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts index d08f0e30e80..8ee89b895a2 100644 --- a/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts @@ -53,7 +53,7 @@ export function createDrainerHandler(deps: { }) : context.active(); - // Cancel-wins-over-trigger (Q4 bifurcation). If a cancel API call + // Cancel-wins-over-trigger. If a cancel API call // landed on this entry while it was QUEUED, the snapshot carries // `cancelledAt` + `cancelReason`. Skip the normal materialise path // and write a CANCELED PG row directly. The existing runCancelled diff --git a/apps/webapp/test/mollifierDrainerHandler.test.ts b/apps/webapp/test/mollifierDrainerHandler.test.ts index e1272203560..1a20fdbb69f 100644 --- a/apps/webapp/test/mollifierDrainerHandler.test.ts +++ b/apps/webapp/test/mollifierDrainerHandler.test.ts @@ -182,7 +182,7 @@ describe("createDrainerHandler", () => { }); it("honours the cancel when a buffered cancel races a materialised non-CANCELED row", async () => { - // Cancel-wins-over-trigger (Q4 bifurcation). If the normal trigger + // Cancel-wins-over-trigger. If the normal trigger // replay path materialised a live PENDING row before the cancel // bifurcation drained, engine.createCancelledRun throws a conflict — // its documented contract is that "the caller must decide between diff --git a/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts b/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts index 71223ce0773..1f47d0b2a1b 100644 --- a/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts +++ b/internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts @@ -44,7 +44,7 @@ function baseEngineOptions(redisOptions: Parameters[0]["queue" }; } -// Phase C1 / Q4 design — engine.createCancelledRun writes a CANCELED +// engine.createCancelledRun writes a CANCELED // TaskRun row directly from a buffer snapshot. Verifies the bypass- // queue / bypass-waitpoint / emit-runCancelled contract. describe("RunEngine.createCancelledRun", () => {