diff --git a/lib/web/fetch/body.js b/lib/web/fetch/body.js index 0e33d2c9057..26cce5f3e0c 100644 --- a/lib/web/fetch/body.js +++ b/lib/web/fetch/body.js @@ -403,14 +403,14 @@ function mixinBody (prototype) { async function consumeBody (object, convertBytesToJSValue, instance) { webidl.brandCheck(object, instance) - throwIfAborted(object[kState]) - // 1. If object is unusable, then return a promise rejected // with a TypeError. if (bodyUnusable(object[kState].body)) { throw new TypeError('Body is unusable') } + throwIfAborted(object[kState]) + // 2. Let promise be a new promise. const promise = createDeferredPromise() diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index 30f457df395..76acc3d865b 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -58,7 +58,7 @@ const { subresourceSet } = require('./constants') const EE = require('node:events') -const { Readable, pipeline } = require('node:stream') +const { Readable, pipeline, finished } = require('node:stream') const { addAbortListener, isErrored, isReadable, nodeMajor, nodeMinor, bufferToLowerCasedHeaderName } = require('../../core/util') const { dataURLProcessor, serializeAMimeType, minimizeSupportedMimeType } = require('./data-url') const { getGlobalDispatcher } = require('../../global') @@ -1082,39 +1082,17 @@ function fetchFinale (fetchParams, response) { } else { // mcollina: all the following steps of the specs are skipped. // The internal transform stream is not needed. - // + // See https://github.com/nodejs/undici/pull/3093#issuecomment-2050198541 + // 1. Let transformStream be a new TransformStream. // 2. Let identityTransformAlgorithm be an algorithm which, given chunk, enqueues chunk in transformStream. // 3. Set up transformStream with transformAlgorithm set to identityTransformAlgorithm and flushAlgorithm // set to processResponseEndOfBody. // 4. Set internalResponse’s body’s stream to the result of internalResponse’s body’s stream piped through transformStream. - const byteStream = new ReadableStream({ - readableStream: internalResponse.body.stream, - async start () { - this._bodyReader = this.readableStream.getReader() - }, - async pull (controller) { - while (controller.desiredSize >= 0) { - const { done, value } = await this._bodyReader.read() - - if (done) { - queueMicrotask(() => { - readableStreamClose(controller) - - // Added to in lieu of steps 1-4 of the spec - processResponseEndOfBody() - }) - break - } - - controller.enqueue(value) - } - }, - type: 'bytes' + finished(internalResponse.body.stream, () => { + processResponseEndOfBody() }) - - internalResponse.body.stream = byteStream } }