From ec725749fdae82bcb65bd65b1e104936d691db03 Mon Sep 17 00:00:00 2001
From: semimikoh
Date: Tue, 26 May 2026 15:01:52 +0900
Subject: [PATCH] buffer: fix Blob.stream() leaking source buffer

Blob.prototype.stream() registered a wakeup callback on the underlying
source's start() and never released it. The strong Reader::wakeup_
handle kept the reader -- and through it the blob's DataQueue and
backing store -- reachable as a GC root, so the source buffer leaked on
every stream() call. On Node 26+, streaming a 1 MiB blob 300 times
retained ~300 MiB in process.memoryUsage().arrayBuffers while the V8
heap stayed small.

Register the wakeup lazily in pull() and clear it on every terminal or
idle path (EOS, error, cancel, backpressure), mirroring the cleanup
already done by the async iterator path. The strong handle now only
lives while a pull is in flight, so the reader and its backing store
become collectable once the stream finishes, errors, is cancelled, or
goes idle under backpressure.

Fixes: https://github.com/nodejs/node/issues/63574
Signed-off-by: semimikoh
---
 lib/internal/blob.js                 | 21 +++++++++--
 test/parallel/test-blob-stream-gc.js | 56 ++++++++++++++++++++++++++++
 2 files changed, 73 insertions(+), 4 deletions(-)
 create mode 100644 test/parallel/test-blob-stream-gc.js

diff --git a/lib/internal/blob.js b/lib/internal/blob.js
index f16e520e197cdd..e852f1fc9bcbf5 100644
--- a/lib/internal/blob.js
+++ b/lib/internal/blob.js
@@ -453,30 +453,40 @@ function createBlobReaderStream(reader) {
       // There really should only be one read at a time so using an
       // array here is purely defensive.
       this.pendingPulls = [];
-      // Register a wakeup callback that the C++ side can invoke
+      // Lazily register a wakeup callback that the C++ side can invoke
       // when new data is available after a STATUS_BLOCK.
-      reader.setWakeup(() => {
+      this.wakeup = () => {
         if (this.pendingPulls.length > 0) {
           this.readNext(c);
         }
-      });
+      };
     },
     pull(c) {
       const { promise, resolve, reject } = PromiseWithResolvers();
+      if (this.pendingPulls.length === 0) {
+        reader.setWakeup(this.wakeup);
+      }
       this.pendingPulls.push({ resolve, reject });
       this.readNext(c);
       return promise;
     },
+    clearWakeupIfIdle() {
+      if (this.pendingPulls.length === 0) {
+        reader.setWakeup(undefined);
+      }
+    },
     readNext(c) {
       reader.pull((status, buffer) => {
         // If pendingPulls is empty here, the stream had to have
         // been canceled, and we don't really care about the result.
         // We can simply exit.
         if (this.pendingPulls.length === 0) {
+          reader.setWakeup(undefined);
           return;
         }
         if (status === 0) {
           // EOS
+          reader.setWakeup(undefined);
           c.close();
           // This is to signal the end for byob readers
           // see https://streams.spec.whatwg.org/#example-rbs-pull
@@ -488,6 +498,7 @@ function createBlobReaderStream(reader) {
           // The read could fail for many different reasons when reading
           // from a non-memory resident blob part (e.g. file-backed blob).
           // The error details the system error code.
+          reader.setWakeup(undefined);
           const error = lazyDOMException('The blob could not be read',
                                          'NotReadableError');
 
@@ -497,7 +508,7 @@ function createBlobReaderStream(reader) {
           return;
         } else if (status === 2) {
           // STATUS_BLOCK: No data available yet. The wakeup callback
-          // registered in start() will re-invoke readNext when data
+          // registered in pull() will re-invoke readNext when data
           // arrives.
           return;
         }
@@ -517,6 +528,7 @@ function createBlobReaderStream(reader) {
             if (this.pendingPulls.length !== 0) {
               const pending = this.pendingPulls.shift();
               pending.resolve();
+              this.clearWakeupIfIdle();
             }
             return;
           }
@@ -525,6 +537,7 @@ function createBlobReaderStream(reader) {
       });
     },
     cancel(reason) {
+      reader.setWakeup(undefined);
       // Reject any currently pending pulls here.
       for (const pending of this.pendingPulls) {
         pending.reject(reason);
diff --git a/test/parallel/test-blob-stream-gc.js b/test/parallel/test-blob-stream-gc.js
new file mode 100644
index 00000000000000..24b2f15efc1265
--- /dev/null
+++ b/test/parallel/test-blob-stream-gc.js
@@ -0,0 +1,56 @@
+// Flags: --expose-gc --no-concurrent-array-buffer-sweeping
+'use strict';
+
+const common = require('../common');
+const assert = require('assert');
+const { setImmediate: setImmediatePromise } = require('timers/promises');
+
+const MiB = 1024 * 1024;
+const iterations = 64;
+const maxRetained = 16 * MiB;
+
+async function collectArrayBuffers() {
+  for (let i = 0; i < 3; i++) {
+    global.gc();
+    await setImmediatePromise();
+  }
+}
+
+async function assertNoBlobStreamRetention(name, fn) {
+  const buffer = Buffer.alloc(MiB);
+
+  await collectArrayBuffers();
+  const before = process.memoryUsage().arrayBuffers;
+
+  for (let i = 0; i < iterations; i++) {
+    await fn(buffer);
+  }
+
+  await collectArrayBuffers();
+  const retained = process.memoryUsage().arrayBuffers - before;
+
+  assert(
+    retained < maxRetained,
+    `${name} retained ${retained} bytes in arrayBuffers`,
+  );
+}
+
+(async () => {
+  await assertNoBlobStreamRetention('unused Blob streams',
+                                    common.mustCall(async (buffer) => {
+                                      new Blob([buffer]).stream();
+                                    }, iterations));
+
+  await assertNoBlobStreamRetention('cancelled Blob streams',
+                                    common.mustCall(async (buffer) => {
+                                      await new Blob([buffer]).stream()
+                                        .cancel();
+                                    }, iterations));
+
+  await assertNoBlobStreamRetention('drained Blob streams',
+                                    common.mustCall(async (buffer) => {
+                                      await new Response(
+                                        new Blob([buffer]).stream(),
+                                      ).arrayBuffer();
+                                    }, iterations));
+})().then(common.mustCall());