Deduplicate custom background tasks #806
Generalize Fedify's enqueue-and-process-later pattern, previously limited to outgoing activity delivery, to arbitrary application-defined background jobs. `Federation` and `FederationBuilder` gain `defineTask()` (via the new `TaskRegistry` interface), and `Context` gains `enqueueTask()`/`enqueueTaskMany()`. Each task carries a Standard Schema that infers the payload type and validates it both at enqueue time and at dequeue time, guarding against schema drift across deployments. Payloads are serialized with devalue so that `Date`, `Map`, `Set`, `URL`, `bigint`, circular references, and Activity Vocabulary objects round-trip faithfully across every message queue backend. Failed handlers retry with exponential backoff by default, configurable per task or federation-wide, and tasks can be isolated onto a dedicated queue or fall back to the outbox queue. The payload codec is implemented twice on purpose: `codec.ts` as a class (`TaskCodec`) and `codec-fn.ts` as standalone utility functions, each with its own tests. Only the class is wired into the runtime; the functional variant is kept temporarily so the team can compare the two styles and decide which reads better before one is removed. fedify-dev#206 fedify-dev#797 Assisted-by: Claude Code:claude-opus-4-8
Split the monolithic `install` task into `install:deno` and `install:pnpm`, with `codegen` as an explicit dependency, so each runtime's setup can be run on its own. `test:deno` now depends on `install:deno` instead of `prepare`, since Deno runs the TypeScript sources directly and does not need the build step. Update AGENTS.md to match: document `mise run prepare`/`prepare-each` for building, `check-each` and `test-each` for scoping work to specific packages, and add a section directing agents to consult `mise tasks`. Assisted-by: Claude Code:claude-opus-4-8
The `install:deno` task runs `scripts/install.ts`, which `deno cache`s each workspace member's export entry points; those include the generated `packages/vocab/src/vocab.ts`. `install:pnpm` likewise expects the generated sources to be present. Both therefore require `codegen` to have run first. Previously `codegen` sat alongside `install:deno` and `install:pnpm` in the `install` task's `depends` list, which `mise` runs in parallel, so the cache step could start before `vocab.ts` was generated. Move `codegen` into each subtask's own `depends` so it is ordered before them; `mise` dedupes the shared dependency to a single run. As a result `install:deno` and `install:pnpm` are now correct when invoked on their own, not only as part of `install`. Also correct the `install:pnpm` description, which said "for Deno". Assisted-by: Claude Code:claude-opus-4-8
`isPlainObject` in the task codec only accepted objects whose prototype is exactly `Object.prototype`, so an object made with `Object.create(null)` was treated as a non-plain leaf. Any vocab object nested inside such an object was therefore left as its parked holder instead of being revived, even though devalue round-trips null-prototype objects without throwing. Accept a `null` prototype as well, and add a regression test that round-trips a vocab object nested in an `Object.create(null)` object. fedify-dev#803 (comment) Assisted-by: Claude Code:claude-opus-4-8
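The null-prototype fix described above can be sketched as follows. This is a minimal illustration, not the codec's actual source: the function body is an assumption based only on the commit message.

```typescript
// Hypothetical stand-in for the codec's plain-object check.
function isPlainObject(value: unknown): value is Record<string, unknown> {
  if (value === null || typeof value !== "object") return false;
  const proto = Object.getPrototypeOf(value);
  // Accept ordinary literals and Object.create(null) dictionaries alike.
  return proto === Object.prototype || proto === null;
}

const dictionary = Object.create(null);
dictionary.note = "nested vocab objects would live here";

console.log(isPlainObject({}), isPlainObject(dictionary), isPlainObject([]));
// prints "true true false"
```

Arrays, `Map`, and `Set` instances still fail the check, so they keep being handled by their own revivers.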
When a custom task handler throws and the queue does not own retries, the error path computes the elapsed time from `message.started` to feed the retry policy. `message.started` is normally a valid ISO instant set at enqueue time, but a corrupted or drifted queue could hand back an invalid string, in which case `Temporal.Instant.from()` threw out of the error-handling block. That masked the original handler error and aborted the retry, silently dropping the task. Wrap the parse in a try-catch, fall back to a zero elapsed time, and log the offending value. A regression test drives a message with a malformed `started` through a throwing handler and asserts the retry is still enqueued. fedify-dev#803 (comment) Assisted-by: Claude Code:claude-opus-4-8
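A rough sketch of the guarded parse, under stated assumptions: `parseInstantMs` is a hypothetical stand-in for `Temporal.Instant.from()` (it uses `Date.parse` so the snippet runs without a Temporal implementation), and `elapsedSince` and its `log` parameter are illustrative names, not Fedify APIs.

```typescript
// Hypothetical stand-in for Temporal.Instant.from(): throws on bad input.
function parseInstantMs(value: string): number {
  const ms = Date.parse(value);
  if (Number.isNaN(ms)) throw new RangeError(`Invalid instant: ${value}`);
  return ms;
}

// Returns the milliseconds elapsed since `started`, or 0 when `started`
// cannot be parsed, so the caller can still consult the retry policy.
function elapsedSince(
  started: string,
  nowMs: number,
  log: (message: string) => void = console.warn,
): number {
  try {
    return nowMs - parseInstantMs(started);
  } catch {
    log(`Ignoring malformed started value: ${JSON.stringify(started)}`);
    return 0;
  }
}
```

Because the parse failure is contained, the original handler error remains the one that drives the retry decision.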
`FederationBuilderImpl.taskDefinitions` was a plain object, so the duplicate check `name in this.taskDefinitions` and the lookups `this.taskDefinitions[taskName]` consulted the prototype chain. Task names are arbitrary user-supplied strings, so a name such as "constructor", "toString", or "__proto__" was wrongly reported as already defined and resolved to an inherited method on lookup. Switch the registry to a `Map`, which is immune to prototype keys by construction and avoids the clone footgun where a later spread or `Object.assign` would silently reintroduce the prototype. Sibling registries stay plain objects since they are keyed by controlled values (type-id URLs). Add a regression test covering names that collide with `Object.prototype`. fedify-dev#803 (comment) Assisted-by: Claude Code:claude-opus-4-8
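The difference between the two registry shapes can be seen in a few lines. The names below are illustrative and not taken from the Fedify source; `"__proto__"` is left out only because some runtimes remove that accessor.

```typescript
const names = ["constructor", "toString", "hasOwnProperty"];

// A plain object consults Object.prototype, so inherited keys look "defined".
const objectRegistry: Record<string, unknown> = {};
const objectHits = names.filter((name) => name in objectRegistry);

// A Map only knows about keys that were explicitly set.
const mapRegistry = new Map<string, unknown>();
const mapHits = names.filter((name) => mapRegistry.has(name));

console.log(objectHits.length, mapHits.length); // prints "3 0"
```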
`#revive` mapped every node through all five class revivers, allocating five promises per node and resolving them with `Array.fromAsync` before picking the first truthy result. The class filters are mutually exclusive, so it now finds the single matching reviver and runs only that one, cutting the per-node work to a single promise. This keeps the existing behaviour (cycles, repeated references, and Map/Set/Array/plain-object/null-prototype containers all still round-trip, as the codec tests assert) and folds the rationale for two declined suggestions into a comment: the walked tree is devalue's throwaway parse output, so there is no external identity to preserve and nothing to clone lazily; and a recursion-depth cap is moot because this pass recurses with `await` (unwinding the stack each level) while devalue's own recursive `stringify`/`parse` is the binding limit on nesting and would overflow first. fedify-dev#803 (comment) fedify-dev#803 (comment) Assisted-by: Claude Code:claude-opus-4-8
A task may route to its own queue via `defineTask(name, { queue })`, and
`resolveTaskQueue()` enqueues its messages there, but
`_startQueueInternal()` only listened on the four federation-wide queues
(inbox, outbox, fanout, task). A task queue that was none of those got
no worker, so its messages were never processed even while
`startQueue()` was running.
Collect the distinct dedicated queue instances from the task registry
and start a worker for each, treating them as part of the "task"
selector. Dedupe against the standard queues and against task queues
already started on an earlier call so no instance is listened on twice,
and let a deployment whose only queues are per-task ones still start:
the early return no longer bails out when a dedicated task queue exists.
fedify-dev#803 (comment)
Assisted-by: Claude Code:claude-opus-4-8
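The deduplication step described above might look roughly like this. It is a sketch under assumptions: `QueueLike` and `collectDedicatedQueues` are hypothetical names, and queues are compared by object identity, as the commit message implies.

```typescript
// Hypothetical minimal queue shape; only identity matters here.
interface QueueLike {
  readonly label: string;
}

// Returns each dedicated task queue once, skipping the federation-wide
// queues and any queue that already has a worker from an earlier call.
function collectDedicatedQueues(
  taskQueues: ReadonlyArray<QueueLike | undefined>,
  standardQueues: ReadonlySet<QueueLike>,
  alreadyStarted: ReadonlySet<QueueLike>,
): QueueLike[] {
  const toStart = new Set<QueueLike>();
  taskQueues.forEach((queue) => {
    if (queue === undefined) return; // task uses the fallback queue
    if (standardQueues.has(queue) || alreadyStarted.has(queue)) return;
    toStart.add(queue); // Set membership removes duplicates
  });
  return Array.from(toStart);
}
```

A caller would then start one worker per returned queue and add each to the already-started set, so a second call listens on nothing twice.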
@dahlia, the maintainer, picked `TaskCodec` in [a comment](fedify-dev#803 (comment)) because it carries the loader state on the instance. Therefore remove *codec-fn.ts*. `TaskCodecLoaders` moves to *codec.ts* because `TaskCodec` uses it.
The task payload schema validates on both sides of the queue: at enqueue time and again at dequeue time. The wire therefore carries the validated *output*, which the same schema must re-accept as input, so transforming schemas (e.g., Zod's .transform()) whose output differs in shape from their input cannot round-trip. This constraint was neither documented nor tested; state it in the manual and the schema option's JSDoc, and pin it with a regression test. fedify-dev#803 Assisted-by: Claude Code:claude-fable-5
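To make the round-trip constraint concrete, here is a minimal sketch that validates twice with the same validator, as the two sides of the queue do. The `Result` type and both validators are hypothetical simplifications, not the Standard Schema interface or Zod.

```typescript
type Result<T> = { ok: true; value: T } | { ok: false; reason: string };

// Transforming: accepts a numeric string, outputs a number.
const numericString = (input: unknown): Result<number> =>
  typeof input === "string" && input.trim() !== "" &&
    Number.isFinite(Number(input))
    ? { ok: true, value: Number(input) }
    : { ok: false, reason: "expected a numeric string" };

// Shape-preserving: accepts a string, outputs a (trimmed) string.
const trimmedString = (input: unknown): Result<string> =>
  typeof input === "string"
    ? { ok: true, value: input.trim() }
    : { ok: false, reason: "expected a string" };

// Validate at enqueue, then validate the output again at dequeue.
function roundTrip<T>(
  validate: (input: unknown) => Result<T>,
  payload: unknown,
): Result<T> {
  const atEnqueue = validate(payload);
  if (!atEnqueue.ok) return atEnqueue;
  return validate(atEnqueue.value);
}

const transforming = roundTrip(numericString, "42");
const shapePreserving = roundTrip(trimmedString, "  hello ");
console.log(transforming.ok, shapePreserving.ok); // prints "false true"
```

The first validator fails at the second pass because its own output (a number) is not an input it accepts; the second succeeds because trimming leaves a string.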
resolveTaskQueue() returns the fallback queue even for a task name with no registered definition, so enqueuing a handle created by a different federation instance silently succeeded and the worker later dropped the message with only a warning. The task API's contract is to fail fast at the enqueue call site (it already validates the payload there), so check the registry before resolving a queue and throw a TypeError instead. fedify-dev#803 Assisted-by: Claude Code:claude-fable-5
Small follow-ups from review:
- Document that tasks must be defined before startQueue() (or the
first request); workers for dedicated per-task queues are only
registered when the queue machinery starts, so a queue defined
later never gets a worker.
- Return early from the enqueue path when no payloads are given,
instead of reaching enqueueMany()/Promise.all with an empty
batch, whose backend behavior is undefined.
- Rename #enqueueSingular to #encodeTaskMessage; it encodes and
builds a TaskMessage but does not enqueue anything.
- Fix a comment typo in the codec.
fedify-dev#803
Assisted-by: Claude Code:claude-fable-5
The revival dispatch pulled init/set out of a heterogeneous tuple list, losing the correlation between each tuple's filter and its init/set node type, which forced two @ts-ignore suppressions at the call site. Such suppressions hide any future error on those lines, so they are unfit for a permanent implementation. Each entry is now built by a generic classReviver() factory whose single type parameter ties the filter to its init/set, letting the compiler check the calls it previously could not. Also bind the recursive reviver to one inner closure per decode pass instead of allocating a fresh closure on every dispatch. fedify-dev#803 (comment) Assisted-by: Claude Code:claude-fable-5
The __contextData phantom field binds a TaskDefinition handle to its federation's context data type, but as a string-keyed property it leaked into user-facing docs and IDE completions despite its @internal tag. Replace it with a module-private unique symbol key: no value exists at runtime, the marker disappears from completions, and cross-federation handle rejection still type-checks, now guarded by a regression test. Also replace the tasks barrel's wildcard re-export of task.ts with explicit named exports of the six types its consumers actually use, so nothing new falls through the barrel unnoticed. fedify-dev#803 (comment) Assisted-by: Claude Code:claude-fable-5
Automated reviewers keep proposing a fixed recursion depth cap (~100) in TaskCodec's #revive to guard against stack overflow from deeply nested payloads. The concern does not apply: the revive traversal suspends at an await on every level, so nesting depth consumes heap (promise chains) rather than native stack, and a structure deep enough to threaten the stack would fail inside devalue.parse() before #revive ever ran. A cap would only reject legitimate payloads. Add a regression test that round-trips a payload nested 1,000 levels deep—an order of magnitude above any proposed cap—through alternating objects and arrays down to a vocab leaf, so introducing such a cap now fails the suite. fedify-dev#803 (comment) Assisted-by: Claude Code:claude-fable-5
The enqueue guard only checked that the handle's task name existed in the local registry, so once two federation instances defined the same task name, a handle from the other instance slipped through: the local context encoded the payload under the schema carried by the foreign handle while the worker decoded it under the local definition's schema. A payload the local schema would have rejected at enqueue thus landed in the queue anyway, only to be dropped at decode time—defeating the fail-fast purpose the guard exists for. defineTask() now stores the exact handle object it returns alongside the internal definition, and enqueueTask()/enqueueTaskMany() compare that handle by identity. Handles still work on every federation built from the same builder, since build() shares the stored definitions. The cross-federation regression test now covers the same-name case in addition to the undefined-name case. fedify-dev#803 (comment) Assisted-by: Claude Code:claude-fable-5
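A name-only lookup and an identity check differ as sketched below. `TaskRegistrySketch`, `TaskHandle`, and the error messages are hypothetical; only the idea of storing the returned handle and comparing it by identity comes from the commit message.

```typescript
interface TaskHandle {
  readonly name: string;
}

class TaskRegistrySketch {
  private readonly handles = new Map<string, TaskHandle>();

  // Stores the exact handle object that is returned to the caller.
  defineTask(name: string): TaskHandle {
    if (this.handles.has(name)) {
      throw new TypeError(`Task already defined: ${name}`);
    }
    const handle: TaskHandle = Object.freeze({ name });
    this.handles.set(name, handle);
    return handle;
  }

  // Rejects handles from another registry even when the names match.
  assertOwns(handle: TaskHandle): void {
    if (this.handles.get(handle.name) !== handle) {
      throw new TypeError(`Task handle not defined here: ${handle.name}`);
    }
  }
}

const first = new TaskRegistrySketch();
const second = new TaskRegistrySketch();
const firstHandle = first.defineTask("sendDigest");
const secondHandle = second.defineTask("sendDigest");
```

Here `first.assertOwns(firstHandle)` passes, while `first.assertOwns(secondHandle)` throws although both registries define `"sendDigest"`.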
MockContext.enqueueTask() invoked the handler with the raw input, while production enqueueTask() validates the payload against the task schema and hands the validated output to the handler. Tests written against @fedify/testing therefore accepted payloads that production rejects at enqueue, and observed the raw input rather than the coerced or normalized value a transforming schema produces—masking integration bugs the mock exists to surface. The mock now runs the registered schema's Standard Schema validator before invoking the handler, throwing the same TypeError production throws on failure and passing the validated output through. enqueueTaskMany() inherits this since it delegates to enqueueTask(). Added tests covering a rejected payload, a coercing schema whose validated output reaches the handler, and per-item validation in the batch path. fedify-dev#803 (comment) Assisted-by: Claude Code:claude-fable-5
The @standard-schema/spec import is shared by the fedify and testing packages, so it belongs at the workspace level rather than being declared per package. The root deno.json already lists it and workspace members inherit the root import map, making the copy in the fedify package's deno.json redundant; drop it. The pnpm side already sources the version from the catalog in pnpm-workspace.yaml, with each package.json referencing it as "catalog:". Assisted-by: Claude Code:claude-fable-5
Every other enqueue path (inbox, outbox, fanout, forwarding) calls _startQueueInternal() right before enqueuing unless manuallyStartQueue is set, but #enqueueTasks did not. An application that only uses the custom task API never sends an activity, so with the default configuration its first enqueueTask() accepted the message while no worker ever listened: tasks piled up in the queue unprocessed until startQueue() was called explicitly or an activity happened to be sent. Add the same guard to #enqueueTasks, plus a regression test asserting that the first enqueue starts the task worker exactly once and that a second enqueue does not start another listener. fedify-dev#803 (comment) Assisted-by: Claude Code:claude-fable-5
Production's enqueueTaskMany() validates and encodes every payload with Promise.all() before enqueuing anything, so a batch with one invalid item rejects with no effect. The mock looped enqueueTask() per item instead, invoking handlers for earlier payloads before a later one failed validation—tests could observe a partial processing state that cannot occur in production. Split the definition lookup and the schema validation out of enqueueTask() into helpers, and make enqueueTaskMany() validate the whole batch up front, running handlers only once every payload has passed. The existing batch-validation test now pins that no handler runs at all when the batch rejects. fedify-dev#803 (comment) fedify-dev#803 (comment) Assisted-by: Claude Code:claude-fable-5
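The all-or-nothing batch behaviour can be sketched synchronously. This is an assumption-laden simplification: the real code is asynchronous and uses `Promise.all()`, and `runBatch` and `requireNumber` are hypothetical helpers.

```typescript
// Validates every payload first; handlers run only if all of them pass.
function runBatch<T>(
  payloads: readonly unknown[],
  validate: (payload: unknown) => T,
  handler: (payload: T) => void,
): void {
  const validated = payloads.map(validate); // throws before any handler runs
  validated.forEach((payload) => handler(payload));
}

// Hypothetical validator standing in for the task schema.
function requireNumber(payload: unknown): number {
  if (typeof payload !== "number") throw new TypeError("expected a number");
  return payload;
}

const handled: number[] = [];
try {
  runBatch([1, "oops", 3], requireNumber, (n) => handled.push(n));
} catch {
  // The batch rejected as a whole; handled is still empty here.
}
```

Looping over a per-item enqueue instead would have pushed `1` into `handled` before the second item failed, which is the partial state the commit removes.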
Production compares the registered handle by identity (14313a1), so passing a handle from another federation instance throws even when both instances define the same task name. The mock looked definitions up by name only, and defineTask() did not keep the handle it returned, so an identity check was impossible: tests could pass with a handle the real federation rejects. Store the returned handle with the definition and require the enqueued handle to be that very object, with the same error message production uses. A regression test defines the same task name on two mock federations and asserts the foreign handle is rejected without running any handler. fedify-dev#803 (comment) Assisted-by: Claude Code:claude-fable-5
The custom background task APIs added on this branch were annotated with @since 2.3.0, but the release that will include them is not yet decided. Replace those tags with the placeholder 2.x.x so the documentation does not promise a specific version prematurely. Affected APIs: Context.enqueueTask and enqueueTaskMany, the taskRetryPolicy and taskQueueResolution federation options, the task queue option, TaskMessage, and the task definition types (TaskHandler, TaskDefinitionOptions, TaskDefinition, TaskRegistry, and TaskEnqueueOptions). Assisted-by: Claude Code:claude-opus-4-8
Local review of the custom background task PR flagged several
documentation-level problems; no runtime behavior is affected:
- The tasks manual claimed the API ships in Fedify 2.3.0, while the
new APIs' JSDoc had already moved to `@since 2.x.x` because the
containing release is undecided. Align the manual with the JSDoc.
- The manual also claimed a per-task queue defined after the queue
machinery starts "never gets a worker." Without
`manuallyStartQueue`, the next request or enqueue starts the
worker, so soften the claim to match the implementation.
- A comment in codec.test.ts described an instance-level `#seen` map
that does not exist—each `deserialize()` call builds its own
per-decode map—and the first test's title claimed a fresh instance
per operation while the tests share one module-level codec.
Correct both.
- Fix grammar errors in the new *AGENTS.md* paragraph about
`mise tasks`.
fedify-dev#803
Assisted-by: Claude Code:claude-fable-5
Assisted-by: Codex:gpt-5-5
The custom task API's producer side—Context.enqueueTask() and
enqueueTaskMany()—had no direct coverage at the middleware layer. Add
tests that drive a real ContextImpl against a recording queue and assert
on what it enqueues:
- enqueueTask() builds a well-formed task message (type, taskName,
baseUrl, attempt, UUID id, parseable started instant, trace context)
and round-trips a vocab payload through the codec as JSON-LD.
- enqueueTaskMany() routes a multi-item batch through enqueueMany(),
preserving order and forwarding delay/orderingKey, while a
single-item batch uses enqueue() instead.
- When the queue lacks enqueueMany(), the batch falls back to
concurrent single enqueues—verified with a rendezvous queue that
blocks until both are in flight—still preserving order and options.
- An invalid payload anywhere in the batch rejects with a schema
TypeError and enqueues nothing.
To avoid duplicating fixtures, the MockQueue and Standard Schema test
helpers that tasks.test.ts defined inline move to testing/tasks.ts
(re-exported by testing/mod.ts); both suites now import the single
implementation, and the fixture-usage allowlist covers the new file.
fedify-dev#803
Assisted-by: Claude Code:claude-opus-4-8
📝 Walkthrough
Adds a complete custom background task API to Fedify:
Changes
Custom Background Task API
Sequence Diagram(s)
sequenceDiagram
participant App
participant Federatable
participant Context
participant enqueueTasks
participant KvStore
participant MessageQueue
participant Worker as Task Worker (`#listenTaskMessage`)
participant TaskHandler
App->>Federatable: defineTask("sendDigest", { schema, handler })
Federatable-->>App: TaskDefinition handle
App->>Context: enqueueTask(handle, payload, { deduplicationKey: "k" })
Context->>enqueueTasks: task, [payload], options
enqueueTasks->>enqueueTasks: planDeduplication → cas plan
enqueueTasks->>enqueueTasks: encodeTaskMessage (codec.encode + envelope)
enqueueTasks->>KvStore: cas(taskDeduplication/k, token, ttl)
KvStore-->>enqueueTasks: claimed (proceed=true)
enqueueTasks->>MessageQueue: enqueue(TaskMessage)
MessageQueue-->>enqueueTasks: ok
MessageQueue-->>Worker: deliver TaskMessage
Worker->>Worker: codec.decode(schema, message.data)
Worker->>TaskHandler: handler(context, validatedPayload)
TaskHandler-->>Worker: ok
Note over App,KvStore: Duplicate enqueue within TTL
App->>Context: enqueueTask(handle, payload2, { deduplicationKey: "k" })
Context->>enqueueTasks: task, [payload2], options
enqueueTasks->>KvStore: cas(taskDeduplication/k, token2, ttl)
KvStore-->>enqueueTasks: lost (proceed=false)
enqueueTasks-->>Context: (no enqueue)
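The claim step in the diagram above can be approximated with an in-memory store. This is only a sketch: `MemoryClaims` and its `claim` signature are hypothetical, the clock is passed in explicitly to keep the example deterministic, and a real key-value backend would need an atomic compare-and-swap.

```typescript
class MemoryClaims {
  private readonly claims = new Map<string, { token: string; expiresAt: number }>();

  // Returns true if the caller won the key, false if a live claim exists.
  claim(key: string, token: string, ttlMs: number, nowMs: number): boolean {
    const existing = this.claims.get(key);
    if (existing !== undefined && existing.expiresAt > nowMs) return false;
    this.claims.set(key, { token, expiresAt: nowMs + ttlMs });
    return true;
  }
}

const claims = new MemoryClaims();
const firstClaim = claims.claim("taskDeduplication/k", "token", 1000, 0);
const duplicate = claims.claim("taskDeduplication/k", "token2", 1000, 500);
const afterTtl = claims.claim("taskDeduplication/k", "token3", 1000, 1000);
console.log(firstClaim, duplicate, afterTtl); // prints "true false true"
```

Only a caller whose claim returns `true` goes on to enqueue; a duplicate within the TTL returns without enqueuing, matching the second half of the diagram.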
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
@codex review
✅ Action performed: Review finished.
To use Codex here, create an environment for this repo.
There was a problem hiding this comment.
Code Review
This pull request introduces a custom background task API for Fedify 2.x.x, allowing applications to define, enqueue, and process arbitrary background jobs. It adds defineTask to Federation and FederationBuilder, and enqueueTask/enqueueTaskMany to Context. Payloads are validated using Standard Schema and serialized with devalue to support complex types and Activity Vocabulary objects. The API supports customizable retry policies, queue routing, and deduplication (either native or via a key-value fallback). Additionally, the PR updates testing utilities, documentation, and mise tasks. Feedback on the changes suggests implementing a depth limit in the recursive revival traversal of TaskCodec to mitigate potential denial-of-service (DoS) attacks or stack overflows from deeply nested payloads.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
#revive = (seen: Seen): Revive => {
  const inner: Revive = async (node) => {
    if (node === null || typeof node !== "object") return node;
    if (seen.has(node)) return seen.get(node);
    for (const reviver of this.#classRevivers) {
      const out = reviver(seen, inner, node);
      if (out !== undefined) return await out;
    }
    // devalue can handle non-container objects.
    return node;
  };
  return inner;
};
To prevent potential CPU exhaustion or stack overflows from maliciously crafted deep JSON payloads, we should implement a depth limit in the recursive revival traversal function. A limit of 100 is typically high enough for legitimate ActivityPub data but low enough to prevent DoS attacks.
#revive = (seen: Seen): Revive => {
  const inner = async (node: unknown, depth: number): Promise<unknown> => {
    if (node === null || typeof node !== "object") return node;
    if (seen.has(node)) return seen.get(node);
    if (depth > 100) {
      throw new TypeError("Maximum depth limit exceeded");
    }
    for (const reviver of this.#classRevivers) {
      const out = reviver(seen, (next) => inner(next, depth + 1), node);
      if (out !== undefined) return await out;
    }
    // devalue can handle non-container objects.
    return node;
  };
  return (node) => inner(node, 0);
};
References
- To prevent stack overflow from maliciously crafted deep JSON, implement a depth limit in recursive traversal functions. The limit should be high enough for legitimate data but low enough to prevent DoS attacks.
Code Review
This pull request introduces a custom background task API for Fedify, enabling the definition, enqueuing, and background processing of arbitrary application-defined jobs with robust serialization, retry policies, queue routing, and deduplication. The code review feedback highlights several critical compatibility, security, and performance improvements: replacing the ES2024 Array.fromAsync calls with standard loops to maintain compatibility with Node.js 18 and 20, implementing a depth limit in the recursive deserialization function to prevent potential Denial of Service (DoS) attacks, preserving null prototypes during object revival to avoid prototype pollution, and optimizing the enqueue pipeline by validating payloads before claiming deduplication keys.
classReviver(
  isInstanceOf(Array),
  (): unknown[] => [],
  async (revive, node, arr) => {
    arr.push(...await Array.fromAsync(node, revive));
  },
),
Array.fromAsync is an ES2024 feature that is not supported in Node.js 18 or 20 without a polyfill or flag. Since this project supports Node.js >= 18, using Array.fromAsync will throw a TypeError in those environments.
We can replace it with a standard for...of loop with await to achieve the same asynchronous mapping while maintaining full compatibility with Node.js 18 and 20.
classReviver(
  isInstanceOf(Array),
  (): unknown[] => [],
  async (revive, node, arr) => {
    for (const item of node) {
      arr.push(await revive(item));
    }
  },
),

  isInstanceOf(Set),
  () => new Set<unknown>(),
  async (revive, node, set) => {
    for (const v of await Array.fromAsync(node, revive)) set.add(v);
  },
),
classReviver(
Similar to the Array reviver, Array.fromAsync is not supported in Node.js 18 or 20. We can replace it with a standard for...of loop to ensure compatibility.
-   isInstanceOf(Set),
-   () => new Set<unknown>(),
-   async (revive, node, set) => {
-     for (const v of await Array.fromAsync(node, revive)) set.add(v);
-   },
- ),
- classReviver(
+ classReviver(
+   isInstanceOf(Set),
+   () => new Set<unknown>(),
+   async (revive, node, set) => {
+     for (const item of node) {
+       set.add(await revive(item));
+     }
+   },
+ ),
#revive = (seen: Seen): Revive => {
  const inner: Revive = async (node) => {
    if (node === null || typeof node !== "object") return node;
    if (seen.has(node)) return seen.get(node);
    for (const reviver of this.#classRevivers) {
      const out = reviver(seen, inner, node);
      if (out !== undefined) return await out;
    }
    // devalue can handle non-container objects.
    return node;
  };
  return inner;
};
To prevent potential DoS attacks or excessive memory/CPU consumption from maliciously crafted deep JSON payloads, we should implement a depth limit in the recursive #revive function. This aligns with the repository's general rules for recursive traversal functions.
#revive = (seen: Seen, maxDepth = 100): Revive => {
  const inner = (depth: number): Revive => {
    return async (node) => {
      if (depth > maxDepth) {
        throw new TypeError("Exceeded maximum depth limit during deserialization.");
      }
      if (node === null || typeof node !== "object") return node;
      if (seen.has(node)) return seen.get(node);
      const nextRevive = inner(depth + 1);
      for (const reviver of this.#classRevivers) {
        const out = reviver(seen, nextRevive, node);
        if (out !== undefined) return await out;
      }
      return node;
    };
  };
  return inner(0);
};
References
- To prevent stack overflow from maliciously crafted deep JSON, implement a depth limit in recursive traversal functions. The limit should be high enough for legitimate data but low enough to prevent DoS attacks.
  isPlainObject,
  (): Record<string, unknown> => ({}),
  async (revive, node, obj) => {
    for (const [k, v] of globalThis.Object.entries(node)) {
      obj[k] = await revive(v);
    }
  },
),
The current implementation always revives plain objects with Object.prototype (via {}). If the original object had a null prototype (e.g., created via Object.create(null)), this prototype fidelity is lost, which could lead to prototype pollution vulnerabilities or key collision bugs in applications relying on null-prototype maps.
We can preserve the prototype of the parsed object by using Object.create(globalThis.Object.getPrototypeOf(node)) in the init function.
classReviver(
  isPlainObject,
  (node): Record<string, unknown> =>
    Object.create(globalThis.Object.getPrototypeOf(node)),
  async (revive, node, obj) => {
    for (const [k, v] of globalThis.Object.entries(node)) {
      obj[k] = await revive(v);
    }
  },
),

const messages: TaskMessage[] = await Promise.all(
  items.map(encodeTaskMessage(ctx.codec, ctx.origin, task, options)),
);
const claim = await claimDeduplication(ctx, plan, task.name);
if (!claim.proceed) return;
if (!ctx.federation.manuallyStartQueue) {
  ctx.federation._startQueueInternal(ctx.data);
}
Currently, enqueueTasks encodes (validates and serializes) all items before claiming the deduplication key. Since serialization (especially toJsonLd expansion for ActivityPub objects) can be quite expensive, we can optimize this by validating all items first (to fail fast and satisfy the requirement of validating before claiming the key), then claiming the key, and only serializing the payloads if the claim succeeds.
// Validate all items first to fail fast and avoid claiming keys for invalid payloads
const validatedItems = await Promise.all(
  items.map((item) => TaskCodec.validate(task.schema, item))
);
const claim = await claimDeduplication(ctx, plan, task.name);
if (!claim.proceed) return;
if (!ctx.federation.manuallyStartQueue) {
  ctx.federation._startQueueInternal(ctx.data);
}
const messages: TaskMessage[] = await Promise.all(
  validatedItems.map(async (data) => {
    const encoded = await ctx.codec.serialize(data);
    const carrier: Record<string, unknown> = {};
    propagation.inject(context.active(), carrier);
    return {
      type: "task",
      id: crypto.randomUUID(),
      baseUrl: ctx.origin,
      taskName: task.name,
      data: encoded,
      started: Temporal.Now.instant().toString(),
      attempt: 0,
      orderingKey: options.orderingKey,
      traceContext: carrier,
    };
  })
);
References
- Avoid using Record<string, string> for types that can vary based on conditions, as it can lead to type errors and require unnecessary type assertions, confusing contributors. Consider using object or a more flexible type if properties are dynamic.
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
packages/testing/src/mock.ts (1)
515-527:⚠️ Potential issue | 🟠 Major | ⚡ Quick win
`queueStarted` should not flip to true for non-outbox queue startup.
Starting only `"task"` currently sets `queueStarted = true`, so later `sendActivity()` is misclassified as queued outbox delivery.
Suggested fix
  async startQueue(
    contextData: TContextData,
    options?: FederationStartQueueOptions,
  ): Promise<void> {
    this.contextData = contextData;
-   this.queueStarted = true;
    // If a specific queue is specified, only activate that one
    if (options?.queue) {
      this.activeQueues.add(options.queue);
    } else {
      // If no specific queue, activate all four
      this.activeQueues.add("inbox");
      this.activeQueues.add("outbox");
      this.activeQueues.add("fanout");
      this.activeQueues.add("task");
    }
+   this.queueStarted = this.activeQueues.has("outbox");
  }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/testing/src/mock.ts` around lines 515 - 527, The queueStarted flag is being unconditionally set to true when any queue starts, but it should only be true when the "outbox" queue is activated. This causes sendActivity() to incorrectly classify non-outbox queue activity as queued outbox delivery. Move the assignment of this.queueStarted = true to only execute when either no specific queue is specified (meaning all queues including outbox are activated) or when the specific queue being activated is "outbox". Update the conditional logic to check if options?.queue is either undefined or explicitly equals "outbox" before setting queueStarted to true.
packages/fedify/src/federation/middleware.ts (1)
902-906: 🧹 Nitpick | 🔵 Trivial | ⚡ Quick win
Wire task queues into the existing queue observability path.
Task queues are omitted from `#queueDepthGaugeEntries`, and `message.type === "task"` bypasses the span/started/outcome/in-flight wrapper used by inbox/outbox/fanout workers. That makes task backlog and handler failures invisible to the existing queue metrics/traces even though tasks are first-class queued work.
Also applies to: 1239-1240
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/fedify/src/federation/middleware.ts` around lines 902 - 906, Task queues are not included in the `#queueDepthGaugeEntries` array and the task message type handler bypasses the observability wrapper (span/started/outcome/in-flight) used by inbox/outbox/fanout workers, making task metrics and traces invisible. Add the task queue entry to `#queueDepthGaugeEntries` following the same pattern as the inbox, outbox, and fanout entries with role "task" and the corresponding task queue. Additionally, update the message handler to apply the same observability wrapper and metrics collection to task queue messages that are currently applied to other queue types, ensuring all queued work is tracked consistently.
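The queueStarted finding for packages/testing/src/mock.ts above can be illustrated with a minimal, self-contained sketch. `MiniMockFederation`, `QueueName`, and `ALL_QUEUES` are hypothetical stand-ins, not the real mock class; the sketch only shows the flag being derived from whether the outbox queue is active.

```typescript
// Hypothetical stand-in for the mock federation; not the real class.
type QueueName = "inbox" | "outbox" | "fanout" | "task";

const ALL_QUEUES: readonly QueueName[] = ["inbox", "outbox", "fanout", "task"];

class MiniMockFederation {
  readonly activeQueues = new Set<QueueName>();
  queueStarted = false;

  startQueue(queue?: QueueName): void {
    if (queue != null) {
      // Only the requested queue becomes active.
      this.activeQueues.add(queue);
    } else {
      // No selector: activate every queue.
      for (const queueName of ALL_QUEUES) this.activeQueues.add(queueName);
    }
    // Derive the flag instead of setting it unconditionally.
    this.queueStarted = this.activeQueues.has("outbox");
  }
}

const taskOnly = new MiniMockFederation();
taskOnly.startQueue("task");
console.log(taskOnly.queueStarted); // false

const everything = new MiniMockFederation();
everything.startQueue();
console.log(everything.queueStarted); // true
```

Deriving the flag from `activeQueues` also keeps it correct when queues are started one at a time, for example "task" first and "outbox" later.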
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@AGENTS.md`:
- Around line 137-139: Replace the placeholder syntax `mise run
test:<runtime:deno,node,bun>` with concrete, runnable task names. List the
actual commands explicitly as `mise run test:deno`, `mise run test:node`, and
`mise run test:bun` so users can copy and paste them directly without having to
interpret pseudo-syntax. This makes the documentation immediately actionable and
prevents confusion.
In `@packages/fedify/src/federation/context.ts`:
- Around line 460-473: Update the JSDoc comment for the `enqueueTaskMany` method
to accurately document its behavior with deduplicated batches. First, make the
statement about falling back to parallel single enqueues conditional, clarifying
that the fallback applies only when the queue's bulk enqueue operation is
unavailable and the batch is not deduplicated. Second, extend the
`@throws {TypeError}` entry to document that a TypeError is thrown when atomic
`enqueueMany` is unavailable for a deduplicated multi-item batch, so the
documentation captures every error condition the method can encounter.
In `@packages/fedify/src/federation/middleware.test.ts`:
- Around line 10568-10573: The module-scoped taskFederationOptions object reuses
a single MemoryKvStore instance across all tests, causing potential state
pollution and test interdependencies. Move the taskFederationOptions definition
from module scope into each test function or a beforeEach hook, ensuring a fresh
MemoryKvStore instance is created for each test. Then update all test functions
that reference the module-scoped taskFederationOptions to use the local version
instead.
In `@packages/fedify/src/federation/middleware.ts`:
- Around line 944-949: The resolveTaskQueue method correctly identifies which
queue to use for task messages, but the _startQueueInternal method does not
properly handle the "task" selector when that queue is shared with standard
queues or routed as a fallback. Fix _startQueueInternal to map the "task"
selector to the queue instances that task messages actually resolve to (a
task's own queue, the shared task queue, or the outboxQueue fallback, following
the same rules as resolveTaskQueue), then ensure each resolved queue is
started/listened to exactly once by tracking started queues appropriately. This
ensures that when queue.task is configured as the same instance as
inbox/outbox/fanout, or when task messages are fallback-routed to outboxQueue,
the actual queue receiving those messages has an active listener.
In `@packages/fedify/src/federation/mq.ts`:
- Around line 454-462: The error check in the enqueueMany method around the
deduplicationKey validation is too strict and prevents single-item batches from
using the deduplicated enqueue fallback. Modify the condition to only throw the
TypeError when both deduplicationKey is present AND the batch contains multiple
items. For single-item batches, allow the code to proceed with the fallback to
the wrapped queue's enqueue() method, since atomicity is maintained for
individual items. Check the length of the messages array being enqueued (this
will be available through the method parameters) and only enforce the
restriction when multiple items are being batched.
In `@packages/fedify/src/federation/tasks/codec.ts`:
- Around line 7-15: The TaskCodec class and its public methods (serialize,
deserialize, and validate) lack JSDoc documentation. Add comprehensive JSDoc
comments for the TaskCodec class constructor describing its purpose and the
options parameter, the serialize method explaining that it converts data to a
JSON string and documenting any potential errors, the deserialize method
explaining that it parses a JSON string back into data and documenting any
parsing errors, and the validate method documenting its validation behavior and
error conditions. Each JSDoc should follow the established pattern in the
codebase by including descriptions of parameters, return types, and any errors
that might be thrown.
In `@packages/fedify/src/federation/tasks/tasks.test.ts`:
- Around line 302-323: The startQueue() task worker test currently only covers
the scenario with a distinct task queue, but lacks coverage for when the
effective task queue falls back to shared instances. Add additional test steps
within the same test function to cover the cases where queue.task is undefined
(falling back to the outbox queue) and where queue.task shares an instance with
inbox, outbox, or fanout. For each fallback scenario, verify that only the
appropriate fallback queue (like outbox when task is undefined) has its
listenCount incremented to 1 while the other queues remain at 0, using the same
assertion pattern as the existing test step but with the appropriate queue
configuration.
In `@packages/fedify/src/testing/tasks.ts`:
- Around line 127-134: The MockQueue.listen method does not handle pre-aborted
signals. If the signal passed via options is already aborted before the event
listener is registered, the abort event will never fire and the Promise will
never resolve, causing tests to hang. Check if options?.signal?.aborted is
already true and resolve the Promise immediately in that case. Only register the
abort event listener when the signal is not already aborted.
In `@packages/testing/src/mock.test.ts`:
- Around line 1728-1830: The tests in mock.test.ts are using the old Deno-style
test harness (`@std/assert`) instead of the required Node.js test harness for the
packages/testing package. Replace the import statements to use `import { test }
from "node:test"` and `import assert from "node:assert/strict"`, then update all
assertion calls throughout the tests: replace each `assertEquals()` call with
`assert.deepStrictEqual()` (or `assert.strictEqual()` when comparing
primitives), replace each `assertRejects()` call with `assert.rejects()`, and
ensure the test function signatures and assertion call syntax conform to the
Node.js assert/strict API.
---
Outside diff comments:
In `@packages/fedify/src/federation/middleware.ts`:
- Around line 902-906: Task queues are not included in the
`#queueDepthGaugeEntries` array and the task message type handler bypasses the
observability wrapper (span/started/outcome/in-flight) used by
inbox/outbox/fanout workers, making task metrics and traces invisible. Add the
task queue entry to `#queueDepthGaugeEntries` following the same pattern as the
inbox, outbox, and fanout entries with role "task" and the corresponding task
queue. Additionally, update the message handler to apply the same observability
wrapper and metrics collection to task queue messages that are currently applied
to other queue types, ensuring all queued work is tracked consistently.
In `@packages/testing/src/mock.ts`:
- Around line 515-527: The queueStarted flag is being unconditionally set to
true when any queue starts, but it should only be true when the "outbox" queue
is activated. This causes sendActivity() to incorrectly classify non-outbox
queue activity as queued outbox delivery. Move the assignment of
this.queueStarted = true to only execute when either no specific queue is
specified (meaning all queues including outbox are activated) or when the
specific queue being activated is "outbox". Update the conditional logic to
check if options?.queue is either undefined or explicitly equals "outbox" before
setting queueStarted to true.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 37da6ef5-700b-44ba-83f7-0f5f4faec66b
⛔ Files ignored due to path filters (2)
deno.lock is excluded by !**/*.lock
pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (31)
AGENTS.md
CHANGES.md
docs/.vitepress/config.mts
docs/manual/tasks.md
mise.toml
packages/fedify/deno.json
packages/fedify/package.json
packages/fedify/src/federation/builder.ts
packages/fedify/src/federation/context.ts
packages/fedify/src/federation/federation.ts
packages/fedify/src/federation/middleware.test.ts
packages/fedify/src/federation/middleware.ts
packages/fedify/src/federation/mod.ts
packages/fedify/src/federation/mq.test.ts
packages/fedify/src/federation/mq.ts
packages/fedify/src/federation/queue.ts
packages/fedify/src/federation/tasks/codec.test.ts
packages/fedify/src/federation/tasks/codec.ts
packages/fedify/src/federation/tasks/enqueue.test.ts
packages/fedify/src/federation/tasks/enqueue.ts
packages/fedify/src/federation/tasks/mod.ts
packages/fedify/src/federation/tasks/task.ts
packages/fedify/src/federation/tasks/tasks.test.ts
packages/fedify/src/testing/context.ts
packages/fedify/src/testing/mod.ts
packages/fedify/src/testing/tasks.ts
packages/testing/package.json
packages/testing/src/context.ts
packages/testing/src/mock.test.ts
packages/testing/src/mock.ts
scripts/check_fixture_usage.ts
- `mise run test:<runtime:deno,node,bun>`: Executes all the tests by the
  runtime. If some specific tests are needed, execute
  `mise run test:<runtime> <test-file-path-from-the-root>`.
Use concrete runnable task names instead of pseudo-syntax.
mise run test:<runtime:deno,node,bun> reads like an executable command but is placeholder syntax. Please list concrete commands (test:deno, test:node, test:bun) to avoid copy-paste failures.
Suggested doc fix
- - `mise run test:<runtime:deno,node,bun>`: Executes all the tests by the
- runtime. If some specific tests are needed, execute
- `mise run test:<runtime> <test-file-path-from-the-root>`.
+ - `mise run test:deno`, `mise run test:node`, `mise run test:bun`:
+ Execute all tests for a specific runtime. For a specific test file, run
+ `mise run test:<runtime> <test-file-path-from-the-root>`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@AGENTS.md` around lines 137 - 139, Replace the placeholder syntax `mise run
test:<runtime:deno,node,bun>` with concrete, runnable task names. List the
actual commands explicitly as `mise run test:deno`, `mise run test:node`, and
`mise run test:bun` so users can copy and paste them directly without having to
interpret pseudo-syntax. This makes the documentation immediately actionable and
prevents confusion.
  /**
   * Enqueues multiple payloads for a custom background task at once.
   * Uses the queue's bulk enqueue operation when available, falling back
   * to parallel single enqueues.
   * @template TData The type of the task payload, inferred from the task's
   * schema.
   * @param task The handle returned by {@link TaskRegistry.defineTask}.
   * @param payloads The task payloads. Each is validated against the
   * task's schema before being enqueued.
   * @param options Options for enqueuing the tasks.
   * @throws {TypeError} If the task is not defined on this federation,
   * if no message queue is configured for tasks, or if
   * a payload fails schema validation.
   * @since 2.x.x
Document the deduplicated-batch TypeError path in enqueueTaskMany JSDoc.
This doc currently implies batch enqueue always falls back to parallel single enqueues, but deduplicated multi-item batches can throw when atomic enqueueMany is unavailable. Please make the fallback statement conditional and add that throw case to @throws.
Suggested doc fix
- * Enqueues multiple payloads for a custom background task at once.
- * Uses the queue's bulk enqueue operation when available, falling back
- * to parallel single enqueues.
+ * Enqueues multiple payloads for a custom background task at once.
+ * Uses the queue's bulk enqueue operation when available. Without
+ * deduplication, it may fall back to parallel single enqueues.
...
- * `@throws` {TypeError} If the task is not defined on this federation,
- * if no message queue is configured for tasks, or if
- * a payload fails schema validation.
+ * `@throws` {TypeError} If the task is not defined on this federation,
+ * if no message queue is configured for tasks, if
+ * a payload fails schema validation, or if a
+ * deduplicated multi-item batch cannot be enqueued
+ * atomically.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/fedify/src/federation/context.ts` around lines 460 - 473, Update the
JSDoc comment for the `enqueueTaskMany` method to accurately document its
behavior with deduplicated batches. First, make the statement about falling back
to parallel single enqueues conditional, clarifying that the fallback applies
only when the queue's bulk enqueue operation is unavailable and the batch is not
deduplicated. Second, extend the `@throws {TypeError}` entry to document that a
TypeError is thrown when atomic `enqueueMany` is unavailable for a deduplicated
multi-item batch, so the documentation captures every error condition the method
can encounter.
const taskFederationOptions = {
  kv: new MemoryKvStore(),
  documentLoaderFactory: () => mockDocumentLoader,
  contextLoaderFactory: () => mockDocumentLoader,
  manuallyStartQueue: true,
};
Avoid sharing a single MemoryKvStore across task enqueue tests.
taskFederationOptions is module-scoped and reuses one KV instance across all tests, which can create order-dependent behavior and flaky failures as more task/dedup/idempotency cases are added.
Suggested fix
-const taskFederationOptions = {
- kv: new MemoryKvStore(),
- documentLoaderFactory: () => mockDocumentLoader,
- contextLoaderFactory: () => mockDocumentLoader,
- manuallyStartQueue: true,
-};
+const makeTaskFederationOptions = () => ({
+ kv: new MemoryKvStore(),
+ documentLoaderFactory: () => mockDocumentLoader,
+ contextLoaderFactory: () => mockDocumentLoader,
+ manuallyStartQueue: true,
+});
And update call sites:
-const federation = createFederation<void>({
- ...taskFederationOptions,
+const federation = createFederation<void>({
+ ...makeTaskFederationOptions(),
queue: { task: queue },
});
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/fedify/src/federation/middleware.test.ts` around lines 10568 -
10573, The module-scoped taskFederationOptions object reuses a single
MemoryKvStore instance across all tests, causing potential state pollution and
test interdependencies. Move the taskFederationOptions definition from module
scope into each test function or a beforeEach hook, ensuring a fresh
MemoryKvStore instance is created for each test. Then update all test functions
that reference the module-scoped taskFederationOptions to use the local version
instead.
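The state-sharing risk described above comes from object spread copying the store reference rather than the store itself. The following is a minimal sketch under stated assumptions: `FakeKvStore` is a hypothetical stand-in for an in-memory KV store (not Fedify's `MemoryKvStore`), and the key name is invented for illustration.

```typescript
// Hypothetical stand-in for an in-memory KV store; not Fedify's MemoryKvStore.
class FakeKvStore {
  readonly entries = new Map<string, unknown>();
}

// Module-scoped options: every spread copies the same store reference.
const sharedOptions = { kv: new FakeKvStore(), manuallyStartQueue: true };

// Factory: every call builds a fresh store.
const makeOptions = () => ({ kv: new FakeKvStore(), manuallyStartQueue: true });

const first = { ...sharedOptions };
const second = { ...sharedOptions };
first.kv.entries.set("dedup:job-1", true);
console.log(second.kv.entries.has("dedup:job-1")); // true: state leaked

const third = makeOptions();
const fourth = makeOptions();
third.kv.entries.set("dedup:job-1", true);
console.log(fourth.kv.entries.has("dedup:job-1")); // false: isolated
```

A factory keeps the call sites almost unchanged (a spread of a call instead of a spread of a constant) while giving each test its own store.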
  resolveTaskQueue(taskName: string): MessageQueue | undefined {
    const def = this.taskDefinitions.get(taskName);
    const resolved = def?.queue ?? this.taskQueue;
    if (resolved != null) return resolved;
    return this.taskQueueResolution === "strict" ? undefined : this.outboxQueue;
  }
Start the effective task queue for queue: "task" even when it is shared or fallback-routed.
resolveTaskQueue() can route task messages to outboxQueue, and callers can also configure queue.task as the same instance as inbox/outbox/fanout. But _startQueueInternal(..., "task") skips all standard-queue branches and then also skips taskQueue when it equals a standard queue, leaving manually started task deployments with no listener for the queue that actually receives task messages. Map the "task" selector to the resolved task queue instances and mark the corresponding started flag so shared/fallback queues are listened exactly once.
Also applies to: 984-1031
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/fedify/src/federation/middleware.ts` around lines 944 - 949, The
resolveTaskQueue method correctly identifies which queue to use for task
messages, but the _startQueueInternal method does not properly handle the "task"
selector when that queue is shared with standard queues or routed as a fallback.
Fix _startQueueInternal to map the "task" selector to the queue instances that
task messages actually resolve to (a task's own queue, the shared task queue, or
the outboxQueue fallback, following the same rules as resolveTaskQueue), then
ensure each resolved queue is started/listened to exactly once by tracking
started queues appropriately. This ensures that when queue.task is configured as
the same instance as inbox/outbox/fanout, or when task messages are
fallback-routed to outboxQueue, the actual queue receiving those messages has an
active listener.
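One way to picture the "listen exactly once" requirement is to resolve the selector to a set of distinct queue instances before starting any listener. This is only a sketch: `ListenableQueue`, `QueueConfig`, `queuesToStart`, and `CountingQueue` are hypothetical names, and it ignores per-task queues and the "strict" resolution mode that the real `resolveTaskQueue()` handles.

```typescript
// Hypothetical, simplified shapes; not Fedify's actual internals.
interface ListenableQueue {
  listen(): void;
}

type QueueSelector = "inbox" | "outbox" | "fanout" | "task";

interface QueueConfig {
  inbox?: ListenableQueue;
  outbox?: ListenableQueue;
  fanout?: ListenableQueue;
  task?: ListenableQueue;
}

// Returns the distinct queue instances that a selector must listen on.
function queuesToStart(
  config: QueueConfig,
  selector?: QueueSelector,
): ListenableQueue[] {
  // Assumption: without a task queue, task messages go to the outbox queue.
  const effectiveTask = config.task ?? config.outbox;
  let candidates: (ListenableQueue | undefined)[];
  if (selector == null) {
    candidates = [config.inbox, config.outbox, config.fanout, effectiveTask];
  } else if (selector === "task") {
    candidates = [effectiveTask];
  } else {
    candidates = [config[selector]];
  }
  const defined = candidates.filter(
    (queue): queue is ListenableQueue => queue != null,
  );
  // A Set compares by identity, so a shared instance appears only once.
  return Array.from(new Set(defined));
}

class CountingQueue implements ListenableQueue {
  listenCount = 0;
  listen(): void {
    this.listenCount++;
  }
}

const outbox = new CountingQueue();
const inbox = new CountingQueue();
// No task queue configured: "task" resolves to the outbox fallback.
for (const queue of queuesToStart({ inbox, outbox }, "task")) queue.listen();
console.log(outbox.listenCount, inbox.listenCount); // 1 0
```

Deduplicating by identity covers both cases raised in the comment: the outbox fallback and a `queue.task` that is the same instance as a standard queue.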
      if (options?.deduplicationKey != null) {
        throw new TypeError(
          "Cannot enqueue a batch with a deduplicationKey: the wrapped queue " +
            "does not implement enqueueMany, so ParallelMessageQueue would " +
            "have to fan out to individual enqueue() calls that cannot share " +
            "one deduplicationKey atomically. Wrap a queue that implements " +
            "enqueueMany instead.",
        );
      }
Allow single-item deduplicated enqueueMany() fallback.
Line 454 currently throws whenever deduplicationKey is present, even for a one-message input. The atomicity limitation applies to multi-item fan-out; single-item can safely fall back to enqueue().
Proposed fix
async enqueueMany(
messages: readonly any[],
options?: MessageQueueEnqueueOptions,
): Promise<void> {
if (this.queue.enqueueMany == null) {
- if (options?.deduplicationKey != null) {
+ if (options?.deduplicationKey != null && messages.length > 1) {
throw new TypeError(
"Cannot enqueue a batch with a deduplicationKey: the wrapped queue " +
"does not implement enqueueMany, so ParallelMessageQueue would " +
"have to fan out to individual enqueue() calls that cannot share " +
"one deduplicationKey atomically. Wrap a queue that implements " +
"enqueueMany instead.",
);
}
+ if (messages.length === 1) {
+ await this.queue.enqueue(messages[0], options);
+ return;
+ }
const results = await Promise.allSettled(
messages.map((message) => this.queue.enqueue(message, options)),
);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/fedify/src/federation/mq.ts` around lines 454 - 462, The error check
in the enqueueMany method around the deduplicationKey validation is too strict
and prevents single-item batches from using the deduplicated enqueue fallback.
Modify the condition to only throw the TypeError when both deduplicationKey is
present AND the batch contains multiple items. For single-item batches, allow
the code to proceed with the fallback to the wrapped queue's enqueue() method,
since atomicity is maintained for individual items. Check the length of the
messages array being enqueued (this will be available through the method
parameters) and only enforce the restriction when multiple items are being
batched.
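The narrowed guard can be exercised in isolation with a small sketch. `BasicQueue`, `EnqueueOptions`, and `enqueueManyWithFallback` are hypothetical simplifications, not Fedify's `MessageQueue` or `ParallelMessageQueue`; the fan-out uses `Promise.all` for brevity where the quoted source uses `Promise.allSettled`.

```typescript
// Hypothetical, simplified shapes; not Fedify's MessageQueue interface.
interface EnqueueOptions {
  deduplicationKey?: string;
}

interface BasicQueue {
  enqueue(message: unknown, options?: EnqueueOptions): Promise<void>;
  enqueueMany?(
    messages: readonly unknown[],
    options?: EnqueueOptions,
  ): Promise<void>;
}

async function enqueueManyWithFallback(
  queue: BasicQueue,
  messages: readonly unknown[],
  options?: EnqueueOptions,
): Promise<void> {
  if (queue.enqueueMany != null) {
    // Native bulk enqueue: the backend handles the batch itself.
    await queue.enqueueMany(messages, options);
    return;
  }
  if (options?.deduplicationKey != null && messages.length > 1) {
    // Fanning out cannot apply one key to several messages atomically.
    throw new TypeError("Cannot deduplicate a multi-item batch atomically.");
  }
  // Simplification: the quoted source uses Promise.allSettled here.
  // With a single message this is just one enqueue() call.
  await Promise.all(
    messages.map((message) => queue.enqueue(message, options)),
  );
}

// A queue without enqueueMany that records what it received.
const received: unknown[] = [];
const plainQueue: BasicQueue = {
  enqueue: (message) => {
    received.push(message);
    return Promise.resolve();
  },
};

// A single deduplicated item is allowed to fall back to enqueue().
enqueueManyWithFallback(plainQueue, ["only"], { deduplicationKey: "k" })
  .then(() => console.log(received.length));
```

The guard runs before any `enqueue()` call, so a rejected multi-item batch leaves the wrapped queue untouched.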
export default class TaskCodec {
  constructor(readonly options: TaskCodecLoaders) {}

  serialize = (data: unknown): Promise<string> =>
    stringifyAsync(data, { Vocab: this.#stringifyVocab });

  deserialize = (raw: string): Promise<unknown> =>
    this.#revive(new Map())(parse(raw, { Vocab: VocabHolder.from }));
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Add JSDoc for the remaining public TaskCodec APIs.
TaskCodec, serialize(), deserialize(), and validate() are public but undocumented. Please document behavior and thrown errors consistently with the rest of the public surface.
♻️ Suggested doc patch
-export default class TaskCodec {
+/** Encodes and decodes task payloads with schema validation. */
+export default class TaskCodec {
constructor(readonly options: TaskCodecLoaders) {}
+ /** Serializes task payload data into wire format. */
serialize = (data: unknown): Promise<string> =>
stringifyAsync(data, { Vocab: this.#stringifyVocab });
+ /** Deserializes wire-format task payload data. */
deserialize = (raw: string): Promise<unknown> =>
this.#revive(new Map())(parse(raw, { Vocab: VocabHolder.from }));
@@
- static validate = async <S extends StandardSchemaV1>(
+ /** Validates task payload data against a Standard Schema. */
+ static validate = async <S extends StandardSchemaV1>(
schema: S,
data: unknown,
): Promise<StandardSchemaV1.InferOutput<S>> =>
getValueIfSchema(await schema["~standard"].validate(data));
As per coding guidelines, "Include JSDoc comments for all public APIs."
Also applies to: 29-33
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/fedify/src/federation/tasks/codec.ts` around lines 7 - 15, The
TaskCodec class and its public methods (serialize, deserialize, and validate)
lack JSDoc documentation. Add comprehensive JSDoc comments for the TaskCodec
class constructor describing its purpose and the options parameter, the
serialize method explaining that it converts data to a JSON string and
documenting any potential errors, the deserialize method explaining that it
parses a JSON string back into data and documenting any parsing errors, and the
validate method documenting its validation behavior and error conditions. Each
JSDoc should follow the established pattern in the codebase by including
descriptions of parameters, return types, and any errors that might be thrown.
Source: Coding guidelines
test("startQueue() task worker", async (t) => {
  await t.step('starts only the task worker for queue: "task"', async () => {
    const inbox = new MockQueue();
    const outbox = new MockQueue();
    const fanout = new MockQueue();
    const taskQueue = new MockQueue();
    const federation = createFederation<void>({
      ...baseOptions,
      queue: { inbox, outbox, fanout, task: taskQueue },
    });
    const controller = new AbortController();
    const listening = federation.startQueue(undefined, {
      signal: controller.signal,
      queue: "task",
    });
    strictEqual(taskQueue.listenCount, 1);
    strictEqual(inbox.listenCount, 0);
    strictEqual(outbox.listenCount, 0);
    strictEqual(fanout.listenCount, 0);
    controller.abort();
    await listening;
  });
Add coverage for queue: "task" when the effective task queue is fallback/shared.
The current startQueue tests cover a distinct task queue and a shared queue only when starting all queues. Please add regression coverage for startQueue({ queue: "task" }) when tasks resolve to the outbox fallback and when queue.task shares an inbox/outbox/fanout instance, so the worker-start selection bug cannot regress.
Also applies to: 340-353
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/fedify/src/federation/tasks/tasks.test.ts` around lines 302 - 323,
The startQueue() task worker test currently only covers the scenario with a
distinct task queue, but lacks coverage for when the effective task queue falls
back to shared instances. Add additional test steps within the same test
function to cover the cases where queue.task is undefined (falling back to the
outbox queue) and where queue.task shares an instance with inbox, outbox, or
fanout. For each fallback scenario, verify that only the appropriate fallback
queue (like outbox when task is undefined) has its listenCount incremented to 1
while the other queues remain at 0, using the same assertion pattern as the
existing test step but with the appropriate queue configuration.
  listen(
    _handler: (message: TaskMessage) => Promise<void> | void,
    options?: MessageQueueListenOptions,
  ): Promise<void> {
    this.listenCount++;
    return new Promise((resolve) => {
      options?.signal?.addEventListener("abort", () => resolve());
    });
Handle pre-aborted signals in MockQueue.listen.
If the signal is already aborted before registration, the Promise never resolves. That can hang queue listener teardown in tests.
Suggested fix
listen(
_handler: (message: TaskMessage) => Promise<void> | void,
options?: MessageQueueListenOptions,
): Promise<void> {
this.listenCount++;
+ if (options?.signal?.aborted) return Promise.resolve();
return new Promise((resolve) => {
- options?.signal?.addEventListener("abort", () => resolve());
+ options?.signal?.addEventListener("abort", () => resolve(), {
+ once: true,
+ });
});
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/fedify/src/testing/tasks.ts` around lines 127 - 134, The
MockQueue.listen method does not handle pre-aborted signals. If the signal
passed via options is already aborted before the event listener is registered,
the abort event will never fire and the Promise will never resolve, causing
tests to hang. Check if options?.signal?.aborted is already true and resolve the
Promise immediately in that case. Only register the abort event listener when
the signal is not already aborted.
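The pre-aborted case is easy to reproduce outside the mock. Below is a minimal sketch using only the standard `AbortController`/`AbortSignal` API; `whenAborted` is a hypothetical helper name, not part of Fedify.

```typescript
// Resolves once the signal aborts; resolves at once if it already has.
// With no signal at all, the promise stays pending (as in the quoted mock).
function whenAborted(signal?: AbortSignal): Promise<void> {
  if (signal?.aborted) return Promise.resolve();
  return new Promise<void>((resolve) => {
    // `once: true` drops the listener after the first abort event.
    signal?.addEventListener("abort", () => resolve(), { once: true });
  });
}

const preAborted = new AbortController();
preAborted.abort(); // aborted before anyone listens
whenAborted(preAborted.signal).then(() => console.log("pre-aborted: resolved"));

const abortedLater = new AbortController();
whenAborted(abortedLater.signal).then(() => console.log("later: resolved"));
abortedLater.abort();
```

Without the `aborted` check, the first promise would never settle, because the "abort" event has already been dispatched by the time the listener is registered.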
test("MockContext.enqueueTask rejects a payload the schema refuses", async () => {
  const federation = createFederation<void>();
  let called = 0;
  const task = federation.defineTask("count", {
    schema: numberSchema,
    handler: () => {
      called++;
    },
  });
  const context = federation.createContext(
    new URL("https://example.com"),
    undefined,
  );
  await assertRejects(
    () => context.enqueueTask(task, "not a number" as unknown as number),
    TypeError,
    "Task data failed schema validation",
  );
  assertEquals(called, 0); // production fails fast at enqueue; so must the mock
});

test("MockContext.enqueueTask passes the schema's validated output to the handler", async () => {
  const federation = createFederation<void>();
  // A coercing schema: uppercases the string. Input and output share the
  // same type, but the validated value differs from the raw input.
  const upper: StandardSchemaV1<string, string> = {
    "~standard": {
      version: 1,
      vendor: "fedify-test",
      validate: (value: unknown) =>
        typeof value === "string"
          ? { value: value.toUpperCase() }
          : { issues: [{ message: "Expected a string." }] },
    },
  };
  let received = "UNSET";
  const task = federation.defineTask("shout", {
    schema: upper,
    handler: (_ctx, data) => {
      received = data;
    },
  });
  const context = federation.createContext(
    new URL("https://example.com"),
    undefined,
  );
  await context.enqueueTask(task, "hi");
  // The handler observes the validated output, not the raw "hi".
  assertEquals(received, "HI");
});

test("MockContext.enqueueTaskMany validates the whole batch before any handler runs", async () => {
  const federation = createFederation<void>();
  const seen: number[] = [];
  const task = federation.defineTask("count-many", {
    schema: numberSchema,
    handler: (_ctx, data) => {
      seen.push(data);
    },
  });
  const context = federation.createContext(
    new URL("https://example.com"),
    undefined,
  );
  // The second item is invalid: production validates every payload before
  // enqueuing anything, so the whole batch rejects with no effect. The
  // mock must not let the first handler run before the batch is vetted.
  await assertRejects(
    () => context.enqueueTaskMany(task, [1, "two" as unknown as number]),
    TypeError,
    "Task data failed schema validation",
  );
  assertEquals(seen, []);
});

test("MockContext.enqueueTask rejects a handle from another federation", async () => {
  const federation = createFederation<void>();
  const other = createFederation<void>();
  let called = 0;
  federation.defineTask("shared-name", {
    schema: numberSchema,
    handler: () => {
      called++;
    },
  });
  // Same task name on another federation: production compares the registered
  // handle by identity and rejects the foreign one, so a name-only lookup in
  // the mock would let tests pass with a handle the real federation refuses.
  const foreign = other.defineTask("shared-name", {
    schema: numberSchema,
    handler: () => {},
  });
  const context = federation.createContext(
    new URL("https://example.com"),
    undefined,
  );
  await assertRejects(
    () => context.enqueueTask(foreign, 1),
    TypeError,
    "is not defined on this federation",
  );
  assertEquals(called, 0);
});
Use the required test harness for new packages/testing tests.
These newly added tests should use node:test and node:assert/strict for this package, instead of extending the @fedify/fixture/@std/assert pattern.
As per coding guidelines, **/*.test.ts must use node:test and node:assert/strict except for @fedify/fedify and @fedify/vocab; based on learnings, this repo expects new-package tests to use node:test.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/testing/src/mock.test.ts` around lines 1728 - 1830, The tests in
mock.test.ts are using the old Deno-style test harness (`@std/assert`) instead of
the required Node.js test harness for the packages/testing package. Replace the
import statements to use `import { test } from "node:test"` and `import assert
from "node:assert/strict"`, then update all assertion calls throughout the
tests: replace each `assertEquals()` call with `assert.equal()` or
`assert.strictEqual()`, replace each `assertRejects()` call with
`assert.rejects()`, and ensure the test function signatures and assertion call
syntax conform to the Node.js assert/strict API.
Sources: Coding guidelines, Learnings
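For orientation, a sketch of what the migrated style could look like, assuming a Node.js-compatible runtime. `enqueueNumber` is a hypothetical stand-in for the mock's enqueue path, not part of `@fedify/testing`. Two details are easy to miss in a mechanical rename. Under `node:assert/strict`, `assert.equal()` compares arrays by reference, so a deep comparison such as `assertEquals(seen, [])` maps to `assert.deepEqual()`. And the third argument of `assert.rejects()` is a failure message, not a substring matcher, so the expected error text goes into the validation object.

```typescript
import { test } from "node:test";
import assert from "node:assert/strict";

// Hypothetical stand-in for schema validation at enqueue time.
async function enqueueNumber(value: unknown): Promise<number> {
  if (typeof value !== "number") {
    throw new TypeError("Task data failed schema validation");
  }
  return value;
}

test("rejects a payload the schema refuses", async () => {
  // The validation object checks both the error name and its message.
  await assert.rejects(
    () => enqueueNumber("not a number"),
    { name: "TypeError", message: /Task data failed schema validation/ },
  );
});

test("compares collected values structurally", () => {
  const seen: number[] = [];
  // In strict mode, deepEqual behaves like deepStrictEqual.
  assert.deepEqual(seen, []);
});
```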
The array reviver in TaskCodec restored elements with `arr.push(...await Array.fromAsync(node, revive))`. Spreading the revived elements into a single call hits the engine's argument-count limit, so a large enough array throws `RangeError: Maximum call stack size exceeded` during decode. Since the worker drops decode failures without retry, an otherwise-valid payload that enqueued fine is silently lost on the dequeue side. Replace the spread with a per-item loop append, matching the existing Map and Set revivers, so revival no longer depends on the array length. Add a regression test that round-trips a 200,000-element array. fedify-dev#803 (comment) Assisted-by: Claude Code:claude-opus-4-8
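The per-item append can be sketched as follows. `reviveArray` and its `revive` callback are hypothetical names for illustration, not the actual `TaskCodec` internals; the point is only that each element is pushed in its own call, so the number of arguments passed to `push()` no longer grows with the array.

```typescript
// Hypothetical reviver: appends revived items one at a time instead of
// spreading them all into a single push() call.
async function reviveArray<T, U>(
  node: readonly T[],
  revive: (item: T) => Promise<U>,
): Promise<U[]> {
  const arr: U[] = [];
  for (const item of node) {
    // One argument per call, regardless of how long `node` is.
    arr.push(await revive(item));
  }
  return arr;
}
```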
Three documentation points raised on the task API review, plus a
regression test backing the Temporal claim:
- The payload codec round-trips devalue's built-in Temporal types
with no extra code, but the supported-payload list omitted them.
List `Temporal` (with `Temporal.Instant` / `Temporal.Duration`
examples) and add a serialize/deserialize round-trip test so the
documented support stays covered.
- The vocab import example used the compatibility path
`@fedify/fedify/vocab`. Switch it to `@fedify/vocab`, matching the
surrounding docs and the current package boundary so copied code
does not bind to a path slated for removal.
- Task payloads now cross durable queue storage and can hold arbitrary
application data. Add a trust-boundary security note to the queue
isolation section: treat the backend and payloads as internal
trusted storage, pass identifiers the worker resolves rather than
long-lived secrets, and use a dedicated task queue with
`taskQueueResolution: "strict"` when isolation is required.
fedify-dev#803 (comment)
fedify-dev#803 (comment)
fedify-dev#803 (comment)
Assisted-by: Claude Code:claude-opus-4-8
Context.enqueueTask() and enqueueTaskMany() now accept a
deduplicationKey requesting at-most-once enqueue for tasks that share
it (new TaskEnqueueOptions.deduplicationKey).
Resolution follows the queue and key-value store capabilities:
- A queue declaring the new MessageQueue.nativeDeduplication owns the
check; the key is forwarded through the new
MessageQueueEnqueueOptions.deduplicationKey.
- Otherwise Fedify applies a best-effort guard through the optional
KvStore.cas primitive under a new taskDeduplication key prefix,
tunable with the new FederationOptions.taskDeduplicationTtl and
taskDeduplicationFallback options.
For enqueueTaskMany(), a single key governs the whole batch. A native
queue that does not implement enqueueMany() cannot express batch-level
at-most-once with a per-message key, so such a multi-item enqueue is
rejected with a TypeError instead of silently leaking duplicates.
Configuration errors that are decidable without a payload (a native
queue lacking enqueueMany, or a closed fallback without cas) are
checked before payloads are validated and encoded, so they reject
before any user schema runs or any key is reserved.
fedify-dev#798
Assisted-by: Claude Code:claude-opus-4-8
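The resolution order described in this commit can be sketched as a small pure function. All type and function names here (`DedupPlan`, `PlanInput`, `planDeduplication`) are illustrative assumptions, not Fedify's internals, and the batch-level `enqueueMany` check is left out to keep the sketch short.

```typescript
// Illustrative names only; not Fedify's internal types.
type DedupPlan =
  | { kind: "none" } // no key given, or the "open" fallback
  | { kind: "native"; key: string } // the queue owns the check
  | { kind: "kv"; key: string }; // cas-based marker in the key-value store

interface PlanInput {
  deduplicationKey?: string;
  queueHasNativeDeduplication: boolean;
  kvHasCas: boolean;
  fallback: "open" | "closed";
}

// Decides how deduplication is applied before any payload is touched.
function planDeduplication(input: PlanInput): DedupPlan {
  const key = input.deduplicationKey;
  if (key == null) return { kind: "none" };
  if (input.queueHasNativeDeduplication) return { kind: "native", key };
  if (input.kvHasCas) return { kind: "kv", key };
  if (input.fallback === "closed") {
    // Decidable without a payload, so it can reject before validation.
    throw new TypeError(
      "Deduplication requested, but neither native deduplication nor cas is available.",
    );
  }
  return { kind: "none" };
}
```

Because the function needs no payload, running it first matches the ordering stated above: configuration errors surface before any user schema runs or any key is reserved.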
The #enqueueTasks and #encodeTaskMessage methods made ContextImpl oversized, so move the handle validation, deduplication planning, payload encoding, and queue dispatch into a new tasks/enqueue.ts module. ContextImpl now delegates to enqueueTasks(), passing only the small slice of itself (federation, codec, origin, data) the pipeline needs. Pull the shared task-test helpers (the schema factory, stock schemas, base federation options, and the recording MockQueue) into a new testing/mq-tasks.ts module, and split the enqueue-specific cases out of tasks.test.ts into enqueue.test.ts. Teach the fixture-usage check to expand glob patterns in its allowlist so the whole testing/ directory is covered by a single entry instead of one path per file. Assisted-by: Claude Code:claude-opus-4-8
Two branches both touched the task testing utilities and diverged: one split MockQueue and the shared schemas/options out into mq-tasks.ts, while the other kept evolving them in tasks.ts. After rebasing the common edits, consolidate everything back into a single tasks.ts and drop the now-redundant mq-tasks.ts. Assisted-by: Claude Code:claude-opus-4-8
The key-value deduplication path reserved a marker before dispatching to the queue but never undid it when the dispatch failed. A transient backend failure therefore left the marker behind, so the retry was silently deduplicated against a task that had never reached the queue. The cas claim now stores a unique token instead of a bare `true`, and a failed dispatch conditionally clears it (cas succeeds only while the stored value is still our token). The conditional clear keeps a stale rollback from deleting a marker that another concurrent enqueue has already re-claimed. A rollback that itself fails is logged and swallowed so the original enqueue error still reaches the caller. The enqueueMany requirement for deduplicated multi-item batches now keys on whether deduplication is actually applied—a native queue or the cas fallback—rather than on nativeDeduplication alone. Under the "open" fallback (no native dedup, no cas) no marker is taken, so the batch fans out without deduplication instead of throwing. ParallelMessageQueue likewise rejects a deduplicated batch when the wrapped queue lacks enqueueMany, since fanning out cannot carry one key atomically. fedify-dev#798 Assisted-by: Claude Code:claude-opus-4-8
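A minimal sketch of the token-based claim and conditional rollback, under stated assumptions: `MemoryCasStore` is a hypothetical in-memory stand-in whose `cas` signature is simplified and is not Fedify's `KvStore.cas`, `enqueueOnce` is an invented name, the token generator is a placeholder for a real unique ID, and TTL handling is omitted.

```typescript
// Hypothetical in-memory store with compare-and-swap; not Fedify's KvStore.
class MemoryCasStore {
  private data = new Map<string, string>();

  // Sets `key` to `next` only if its current value equals `expected`.
  // `undefined` means "absent" on either side.
  async cas(
    key: string,
    expected: string | undefined,
    next: string | undefined,
  ): Promise<boolean> {
    if (this.data.get(key) !== expected) return false;
    if (next === undefined) this.data.delete(key);
    else this.data.set(key, next);
    return true;
  }

  get(key: string): string | undefined {
    return this.data.get(key);
  }
}

// Returns true if the task was dispatched, false if it was deduplicated.
async function enqueueOnce(
  store: MemoryCasStore,
  key: string,
  dispatch: () => Promise<void>,
): Promise<boolean> {
  // Placeholder for a unique token; a real implementation might use a UUID.
  const token = `${Date.now()}-${Math.random().toString(36).slice(2)}`;
  if (!(await store.cas(key, undefined, token))) return false;
  try {
    await dispatch();
    return true;
  } catch (error) {
    try {
      // Clear the marker only while it still holds our token, so a marker
      // re-claimed by a concurrent enqueue is left alone.
      await store.cas(key, token, undefined);
    } catch (rollbackError) {
      // Log and swallow: the original error is the one the caller needs.
      console.warn("Failed to roll back deduplication marker", rollbackError);
    }
    throw error;
  }
}
```

Storing a token rather than a bare `true` is what makes the conditional clear possible: with a constant value, a stale rollback could not tell its own marker apart from one written by another enqueue.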
Resolves #798, the second sub-issue of #206.
Important
This branch is stacked on #803 (#797, the first sub-issue), which is not yet merged. Until #803 lands, this PR's range includes #803's commits too, so the diff shown here is the core `defineTask`/`enqueueTask` API plus this sub-issue's deduplication work. Please merge #803 first; once it lands on `feat/custom-worker`, rebasing this branch drops the overlap and leaves this sub-issue's deduplication changes alone.
Summary
Background tasks frequently need at-most-once-per-key enqueue semantics: a digest mailer must not send twice when a request is retried, and a cleanup job should coalesce duplicate triggers. This sub-issue adds an opt-in `deduplicationKey` to the task enqueue path, mirroring the native-capability flag pattern that `nativeRetrial` (#250) established: a backend that deduplicates natively owns the check; otherwise Fedify provides a best-effort key–value fallback with the race-condition tradeoff documented explicitly.
It is kept separate from the core API (#797 / #803) so the first PR stays small and the deduplication semantics, including the documented best-effort limitation, get their own reviewable boundary.
Public API
MQ-layer primitives
These are message-queue-layer primitives, not task-layer concepts, so they survive a future `Worker` extraction unchanged and are reusable by any enqueue path:
`InProcessMessageQueue` declares `nativeDeduplication = false`; `ParallelMessageQueue` inherits the wrapped queue's flag and rejects a batch carrying a `deduplicationKey` when the wrapped queue has no `enqueueMany` (it would otherwise fan out to single `enqueue()` calls that cannot share one key atomically).
Task-API surface
A new `taskDeduplication` key–value prefix (default `["_fedify", "taskDeduplication"]`) holds fallback markers, kept separate from `activityIdempotence`.
Resolution path
The enqueue pipeline was extracted out of `ContextImpl` into a dedicated `tasks/enqueue.ts` module (handle validation, deduplication planning, payload encoding, and queue dispatch live in one place). Deduplication is decided once, before any payload is encoded:
- If the queue declares `nativeDeduplication: true`, Fedify forwards `deduplicationKey` in `MessageQueueEnqueueOptions` and the backend owns the check. The key–value store is never touched.
- Otherwise, if the `KvStore` exposes the optional compare-and-swap (`KvStore.cas`) primitive, Fedify claims the key under the `taskDeduplication` prefix with `taskDeduplicationTtl`. A present marker skips the enqueue; a won claim proceeds. If the subsequent dispatch fails, the marker is rolled back (via a conditional `cas` clear) so a transient failure does not suppress the retry.
- If neither is available (a queue without `nativeDeduplication`, and a `KvStore` without `cas`), behavior follows `taskDeduplicationFallback`: `"open"` (default) proceeds without deduplication after a debug-level log; `"closed"` throws a `TypeError` before enqueuing.
Among the first-party adapters, the in-memory, Deno KV, SQLite, and MySQL key–value stores implement `cas`; PostgreSQL and Redis do not yet, so those deployments take the fallback branch until per-adapter follow-ups add it.
Batch semantics
For `enqueueTaskMany`, a single `deduplicationKey` applies to the whole batch: it enqueues as a unit or is skipped as a unit, never partially. Per-item deduplication means calling `enqueueTask` in a loop, each with its own key. When deduplication is actually applied (a native queue, or a `KvStore` with `cas`), a multi-item batch with a `deduplicationKey` on a queue without `enqueueMany` is rejected rather than risking duplicates, since fanning the key across separate `enqueue()` calls cannot enqueue the batch as one unit. Under the `"open"` fallback no marker is taken, so the batch simply fans out.
Documented limitation
The key–value fallback is best-effort, not transactional: the marker write and the enqueue are separate operations. Fedify rolls the marker back on enqueue failure, but a crash before that rollback, the `"open"` fallback under concurrency, a non-atomic third-party `cas`, or key reuse within the TTL window can still admit a duplicate or suppress a task. Cleanup is by TTL expiry, not active deletion on handler success (active cleanup introduces a success→crash-before-delete window; deferred). This is stated in the public JSDoc for `deduplicationKey` and in a warning callout in docs/manual/tasks.md. Deployments needing strict guarantees use a queue with `nativeDeduplication: true`.
Out of scope
- Adding `nativeDeduplication: true` / `cas` to the remaining first-party adapters (PostgreSQL, Redis): tracked as per-adapter follow-ups; this sub-issue ships the core flag plus the key–value fallback.
Acceptance criteria
- `deduplicationKey` on a `nativeDeduplication: true` queue is forwarded; Fedify does not write to the key–value store.
- `deduplicationKey` on a default queue: a second enqueue inside the TTL is skipped; re-enqueue after TTL expiry succeeds.
- `taskDeduplicationFallback: "closed"` throws synchronously when no conditional write is available; `"open"` proceeds with a debug log.
- `taskDeduplication` key–value prefix does not collide with `activityIdempotence`.
- `enqueueTaskMany` applies one batch-level `deduplicationKey`.
Tests
- `tasks/enqueue.test.ts` covers the three resolution paths, TTL skip/expiry, the `"open"`/`"closed"` fallback, the prefix isolation, batch-level dedup, the `enqueueMany`-required rejection, and marker rollback on dispatch failure.
- `mq.test.ts` covers the new `nativeDeduplication` flag and the `ParallelMessageQueue` batch rejection.
- The mock (`@fedify/testing`) honors `deduplicationKey` so applications can assert dedup behavior in their own tests.
- Tests use `@fedify/fixture` `test()` and pass under Deno, Node.js, and Bun.
Notes for reviewers
- The base is `feat/custom-worker`, and this is stacked on the unmerged "Add custom background task API (`defineTask`/`enqueueTask`)" #803; see the callout at the top.
- `KvStore.cas` is a pre-existing primitive; this PR consumes it rather than introducing it.
AI disclosure
Assisted-by: Claude Code:claude-opus-4-8
Assisted-by: Codex:gpt-5.5
Codex was used only in review.