Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apps/sim/app/api/schedules/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ const JOB_CHUNK_SIZE = 100
const MAX_TICK_DURATION_MS = 3 * 60 * 1000
const STALE_SCHEDULE_CLAIM_MS = getMaxExecutionTimeout()

/**
* Upper bound (ms) for the random start delay applied to each scheduled
* execution. Cron schedules all fire on the same boundary (e.g. every `:00`),
* which stampedes the database connection pool at the top of each minute/hour.
* Spreading starts across a [0, 30s) window smooths that burst.
*/
const SCHEDULE_JITTER_MAX_MS = 30_000

const dueFilter = (queuedAt: Date) =>
and(
isNull(workflowSchedule.archivedAt),
Expand Down Expand Up @@ -217,6 +225,7 @@ async function processScheduleItem(
const jobId = await jobQueue.enqueue('schedule-execution', payload, {
jobId: scheduleJobId,
concurrencyKey: scheduleJobId,
delayMs: Math.floor(Math.random() * SCHEDULE_JITTER_MAX_MS),
metadata: {
workflowId: schedule.workflowId ?? undefined,
workspaceId: resolvedWorkspaceId ?? undefined,
Expand Down
3 changes: 3 additions & 0 deletions apps/sim/lib/core/async-jobs/backends/trigger-dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ export class TriggerDevJobQueue implements JobQueueBackend {
triggerOptions.idempotencyKey = options.jobId
triggerOptions.idempotencyKeyTTL = '14d'
}
if (options?.delayMs && options.delayMs > 0) {
triggerOptions.delay = new Date(Date.now() + options.delayMs)
}
const handle = await tasks.trigger(taskId, enrichedPayload, triggerOptions)

logger.debug('Enqueued job via trigger.dev', { jobId: handle.id, type, taskId, tags })
Expand Down
Loading