Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,7 @@ const EnvironmentSchema = z
.transform((v) => v ?? process.env.REDIS_PASSWORD),
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().nonnegative().default(100),
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
Expand Down
168 changes: 167 additions & 1 deletion apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,42 @@ import { RunId } from "@trigger.dev/core/v3/isomorphic";
import type { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
import { logger } from "~/services/logger.server";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { ServiceValidationError } from "~/v3/services/common.server";
import type { RunEngine } from "~/v3/runEngine.server";
import { shouldIdempotencyKeyBeCleared } from "~/v3/taskStatus";
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
import { claimOrAwait } from "~/v3/mollifier/idempotencyClaim.server";
import type { TraceEventConcern, TriggerTaskRequest } from "../types";

// Claim ownership context returned to the caller when the
// IdempotencyKeyConcern won a pre-gate claim. Caller MUST publish the
// winning runId on pipeline success (`publishClaim`) or release the
// claim on failure (`releaseClaim`).
//
// The first three fields identify the claimed slot; they mirror the
// arguments handleTriggerRequest passes to `claimOrAwait`.
export type ClaimedIdempotency = {
// Id of the runtime environment the trigger request targets
// (`request.environment.id`).
envId: string;
// Identifier of the task being triggered (`request.taskId`).
taskIdentifier: string;
// The idempotency key the claim was taken for.
idempotencyKey: string;
// Ownership token from `claimOrAwait`. The caller's trigger pipeline
// MUST thread this into publishClaim/releaseClaim so the buffer's
// compare-and-act protects the slot against a stale predecessor.
token: string;
};

// Result of resolving a trigger request's idempotency key.
//
// - `isCached: true`  — a run already owns the key; `run` is handed back
//   to the caller as the cached hit.
// - `isCached: false` — no cached run was returned. The resolved key
//   and its expiry are passed along when present, plus the pre-gate
//   claim when this request won one.
export type IdempotencyKeyConcernResult =
  | { isCached: true; run: TaskRun }
  | {
      isCached: false;
      idempotencyKey?: string;
      idempotencyKeyExpiresAt?: Date;
      // Set when this trigger holds a pre-gate claim. The caller's
      // trigger pipeline MUST resolve the claim by either publishing
      // the runId on success or releasing on failure. Undefined when
      // the request has no idempotency key, when the buffer is
      // unavailable, or when the request is a triggerAndWait (claim
      // path skipped per plan doc).
      claim?: ClaimedIdempotency;
    };

export class IdempotencyKeyConcern {
constructor(
Expand All @@ -17,6 +46,47 @@ export class IdempotencyKeyConcern {
private readonly traceEventConcern: TraceEventConcern
) {}

// Q5 buffer-side dedup. Resolves an idempotency key against the
// mollifier buffer when PG missed. Returns a SyntheticRun cast to
// TaskRun so the route handler (which only reads run.id / run.friendlyId)
// can echo the buffered run's friendlyId as a cached hit. Returns null
// for any failure or miss — buffer outages must not 500 the trigger
// hot path; we fail open to "no cache hit" and let the request through.
//
// Both remote steps (the key -> runId lookup and the runId -> run
// resolution) are guarded, so neither can throw into the caller.
private async findBufferedRunWithIdempotency(
  environmentId: string,
  organizationId: string,
  taskIdentifier: string,
  idempotencyKey: string,
): Promise<TaskRun | null> {
  const buffer = getMollifierBuffer();
  // No buffer configured/available: nothing to dedupe against.
  if (!buffer) return null;

  // Step 1: map the idempotency key to a buffered run id.
  let bufferedRunId: string | null;
  try {
    bufferedRunId = await buffer.lookupIdempotency({
      envId: environmentId,
      taskIdentifier,
      idempotencyKey,
    });
  } catch (err) {
    logger.error("IdempotencyKeyConcern: buffer lookupIdempotency failed", {
      environmentId,
      taskIdentifier,
      err: err instanceof Error ? err.message : String(err),
    });
    return null;
  }
  if (!bufferedRunId) return null;

  // Step 2: resolve the run id to a run. This call used to sit outside
  // any try/catch, so a failure here escaped to the caller despite the
  // fail-open contract above. Treat it like a miss instead.
  try {
    const synthetic = await findRunByIdWithMollifierFallback({
      runId: bufferedRunId,
      environmentId,
      organizationId,
    });
    if (!synthetic) return null;
    // The fallback's return type is not TaskRun; the double cast is
    // deliberate, relying on callers only reading run.id / run.friendlyId.
    return synthetic as unknown as TaskRun;
  } catch (err) {
    logger.error("IdempotencyKeyConcern: buffered run fallback lookup failed", {
      environmentId,
      taskIdentifier,
      bufferedRunId,
      err: err instanceof Error ? err.message : String(err),
    });
    return null;
  }
}

async handleTriggerRequest(
request: TriggerTaskRequest,
parentStore: string | undefined
Expand Down Expand Up @@ -44,6 +114,25 @@ export class IdempotencyKeyConcern {
})
: undefined;

// Buffer fallback per Q5 mollifier-idempotency design. PG missed —
// the same key may belong to a buffered run that hasn't materialised
// yet. Skipped when `resumeParentOnCompletion` is set: blocking a
// parent on a buffered child via waitpoint requires a PG row that
// doesn't exist yet. The follow-up accept's SETNX in mollifyTrigger
// still dedupes the trigger itself; the waitpoint just doesn't fire
// for this rare race window.
if (!existingRun && idempotencyKey && !request.body.options?.resumeParentOnCompletion) {
const buffered = await this.findBufferedRunWithIdempotency(
request.environment.id,
request.environment.organizationId,
request.taskId,
idempotencyKey,
);
if (buffered) {
return { isCached: true, run: buffered };
}
}

if (existingRun) {
// The idempotency key has expired
if (existingRun.idempotencyKeyExpiresAt && existingRun.idempotencyKeyExpiresAt < new Date()) {
Expand Down Expand Up @@ -133,6 +222,83 @@ export class IdempotencyKeyConcern {
return { isCached: true, run: existingRun };
}

// Pre-gate claim — closes the PG+buffer race during gate transition
// (see _plans/2026-05-21-mollifier-idempotency-claim.md). All
// same-key triggers serialise here before evaluateGate decides
// PG-pass-through vs mollify. Skipped for triggerAndWait
// (resumeParentOnCompletion) — that path bypasses the gate via F4
// and its existing PG-side dedup is sufficient.
if (!request.body.options?.resumeParentOnCompletion) {
const ttlSeconds = Math.max(
1,
Math.min(
30,
Math.ceil((idempotencyKeyExpiresAt.getTime() - Date.now()) / 1000),
),
);
const outcome = await claimOrAwait({
envId: request.environment.id,
taskIdentifier: request.taskId,
idempotencyKey,
ttlSeconds,
});
if (outcome.kind === "resolved") {
// Another concurrent trigger committed first. Re-resolve via the
// existing checks: writer-side PG findFirst first (defeats
// replica lag), then buffer fallback for the buffered case.
const writerRun = await this.prisma.taskRun.findFirst({
where: {
runtimeEnvironmentId: request.environment.id,
idempotencyKey,
taskIdentifier: request.taskId,
},
include: { associatedWaitpoint: true },
});
if (writerRun) {
return { isCached: true, run: writerRun };
}
const buffered = await this.findBufferedRunWithIdempotency(
request.environment.id,
request.environment.organizationId,
request.taskId,
idempotencyKey,
);
if (buffered) {
return { isCached: true, run: buffered };
}
// Claim resolved to a runId nothing can find — likely the
// claimant errored after publish, or the row TTL'd out. Log
// and fall through to a fresh trigger.
logger.warn("idempotency claim resolved but runId not findable", {
envId: request.environment.id,
taskIdentifier: request.taskId,
claimedRunId: outcome.runId,
});
}
if (outcome.kind === "timed_out") {
throw new ServiceValidationError(
"Idempotency claim resolution timed out",
503,
);
}
if (outcome.kind === "claimed") {
// Caller MUST publish/release. Signalled via the result's
// `claim` field, including the ownership token so the buffer
// can compare-and-act on the slot we now own.
return {
isCached: false,
idempotencyKey,
idempotencyKeyExpiresAt,
claim: {
envId: request.environment.id,
taskIdentifier: request.taskId,
idempotencyKey,
token: outcome.token,
},
};
}
}

return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
}
}
Loading