diff --git a/lib/core/util.js b/lib/core/util.js index 47ad0485202..6c998580d96 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -635,5 +635,6 @@ module.exports = { nodeMajor, nodeMinor, nodeHasAutoSelectFamily: nodeMajor > 18 || (nodeMajor === 18 && nodeMinor >= 13), + nodeStreamFinishedSupportsWebStreams: nodeMajor > 18 || (nodeMajor === 18 && nodeMinor > 13), safeHTTPMethods: ['GET', 'HEAD', 'OPTIONS', 'TRACE'] } diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index 76acc3d865b..53814771780 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -59,7 +59,15 @@ const { } = require('./constants') const EE = require('node:events') const { Readable, pipeline, finished } = require('node:stream') -const { addAbortListener, isErrored, isReadable, nodeMajor, nodeMinor, bufferToLowerCasedHeaderName } = require('../../core/util') +const { + addAbortListener, + isErrored, + isReadable, + nodeMajor, + nodeMinor, + bufferToLowerCasedHeaderName, + nodeStreamFinishedSupportsWebStreams +} = require('../../core/util') const { dataURLProcessor, serializeAMimeType, minimizeSupportedMimeType } = require('./data-url') const { getGlobalDispatcher } = require('../../global') const { webidl } = require('./webidl') @@ -1090,9 +1098,39 @@ function fetchFinale (fetchParams, response) { // set to processResponseEndOfBody. // 4. Set internalResponse’s body’s stream to the result of internalResponse’s body’s stream piped through transformStream. - finished(internalResponse.body.stream, () => { - processResponseEndOfBody() - }) + // TODO(@KhafraDev): remove this once node v18 support is dropped + if (!nodeStreamFinishedSupportsWebStreams) { + const byteStream = new ReadableStream({ + readableStream: internalResponse.body.stream, + async start () { + this._bodyReader = this.readableStream.getReader() + }, + async pull (controller) { + while (controller.desiredSize >= 0) { + const { done, value } = await this._bodyReader.read() + + if (done) { + queueMicrotask(() => { + readableStreamClose(controller) + + // Added to in lieu of steps 1-4 of the spec + processResponseEndOfBody() + }) + break + } + + controller.enqueue(value) + } + }, + type: 'bytes' + }) + + internalResponse.body.stream = byteStream + } else { + finished(internalResponse.body.stream, () => { + processResponseEndOfBody() + }) + } } }