diff --git a/lib/internal/webstreams/adapters.js b/lib/internal/webstreams/adapters.js index 3e5fd69d4d6a03..ff825717f18f6a 100644 --- a/lib/internal/webstreams/adapters.js +++ b/lib/internal/webstreams/adapters.js @@ -27,6 +27,8 @@ const { CountQueuingStrategy, } = require('internal/webstreams/queuingstrategies'); +const { kFresh } = require('internal/webstreams/util'); + const { Writable, Readable, @@ -457,12 +459,13 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj streamReadable.on('data', onData); return new ReadableStream({ - start(c) { controller = c; }, + start(c) { controller = c; controller[kFresh] = true; console.log("start", typeof c); }, - pull() { streamReadable.resume(); }, + pull() { streamReadable.resume(); }, cancel(reason) { destroy(streamReadable, reason); + console.log("cancel") }, }, strategy); } diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index e7d115e98f12b9..d4d1fa190faf41 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -115,6 +115,7 @@ const { iteratorNext, kType, kState, + kFresh, } = require('internal/webstreams/util'); const { @@ -2286,6 +2287,10 @@ function readableStreamDefaultControllerGetDesiredSize(controller) { } function readableStreamDefaultControllerShouldCallPull(controller) { + if (controller[kFresh]) { + controller[kFresh] = false; + return false; + } const { stream, } = controller[kState]; diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index 1979c55667b167..56d61ba71238bb 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -53,6 +53,7 @@ const { validateFunction, } = require('internal/validators'); +const kFresh = Symbol('kFresh'); const kState = Symbol('kState'); const kType = Symbol('kType'); @@ -298,4 +299,5 @@ module.exports = { iteratorNext, kType, kState, + kFresh, }; diff --git a/t/46347.mjs b/t/46347.mjs new file mode 100644 index 00000000000000..9f60c47b08a904 --- /dev/null +++ b/t/46347.mjs @@ -0,0 +1,44 @@ +// since it's ESM, save it as .mjs + +import fs from 'node:fs' +import process from 'node:process' +import { Readable } from 'node:stream' + +// we initialize a stream, but not start consuming it +const randomNodeStream = fs.createReadStream('/dev/urandom') +// after 10 seconds, it'll get converted to web stream +let randomWebStream + +// we check memory usage every second +// since it's a stream, it shouldn't be higher than the chunk size +const reportMemoryUsage = () => { + const { arrayBuffers } = process.memoryUsage() + console.log( + `Array buffers memory usage is ${Math.round( + arrayBuffers / 1024 / 1024 + )} MiB` + ) + if (arrayBuffers > 256 * 1024 * 1024) { + // streaming should not lead to such a memory increase + // therefore, if it happens => bail + console.log('Over 256 MiB taken, exiting') + process.exit(0) + } +} +setInterval(reportMemoryUsage, 1000) + +// after 10 seconds we use Readable.toWeb +// memory usage should stay pretty much the same since it's still a stream +setTimeout(() => { + console.log('converting node stream to web stream') + randomWebStream = Readable.toWeb(randomNodeStream) +}, 5000) + +// after 15 seconds we start consuming the stream +// memory usage will grow, but the old chunks should be garbage-collected pretty quickly +setTimeout(async () => { + console.log('reading the chunks') + for await (const chunk of randomWebStream) { + // do nothing, just let the stream flow + } +}, 15000) \ No newline at end of file