diff --git a/lib/fetch/body.js b/lib/fetch/body.js index 96bdaafc2a9..c291afa9368 100644 --- a/lib/fetch/body.js +++ b/lib/fetch/body.js @@ -2,7 +2,14 @@ const Busboy = require('busboy') const util = require('../core/util') -const { ReadableStreamFrom, isBlobLike, isReadableStreamLike, readableStreamClose } = require('./util') +const { + ReadableStreamFrom, + isBlobLike, + isReadableStreamLike, + readableStreamClose, + createDeferredPromise, + fullyReadBody +} = require('./util') const { FormData } = require('./formdata') const { kState } = require('./symbols') const { webidl } = require('./webidl') @@ -13,7 +20,6 @@ const assert = require('assert') const { isErrored } = require('../core/util') const { isUint8Array, isArrayBuffer } = require('util/types') const { File: UndiciFile } = require('./file') -const { StringDecoder } = require('string_decoder') const { parseMIMEType, serializeAMimeType } = require('./dataURL') let ReadableStream = globalThis.ReadableStream @@ -313,26 +319,45 @@ function bodyMixinMethods (instance) { const methods = { blob () { // The blob() method steps are to return the result of - // running consume body with this and Blob. - return specConsumeBody(this, 'Blob', instance) + // running consume body with this and the following step + // given a byte sequence bytes: return a Blob whose + // contents are bytes and whose type attribute is this’s + // MIME type. + return specConsumeBody(this, (bytes) => { + let mimeType = bodyMimeType(this) + + if (mimeType === 'failure') { + mimeType = '' + } else if (mimeType) { + mimeType = serializeAMimeType(mimeType) + } + + // Return a Blob whose contents are bytes and type attribute + // is mimeType. + return new Blob([bytes], { type: mimeType }) + }, instance) }, arrayBuffer () { - // The arrayBuffer() method steps are to return the - // result of running consume body with this and ArrayBuffer. - return specConsumeBody(this, 'ArrayBuffer', instance) + // The arrayBuffer() method steps are to return the result + // of running consume body with this and the following step + // given a byte sequence bytes: return a new ArrayBuffer + // whose contents are bytes. + return specConsumeBody(this, (bytes) => { + return new Uint8Array(bytes).buffer + }, instance) }, text () { - // The text() method steps are to return the result of - // running consume body with this and text. - return specConsumeBody(this, 'text', instance) + // The text() method steps are to return the result of running + // consume body with this and UTF-8 decode. + return specConsumeBody(this, utf8DecodeBytes, instance) }, json () { - // The json() method steps are to return the result of - // running consume body with this and JSON. - return specConsumeBody(this, 'JSON', instance) + // The json() method steps are to return the result of running + // consume body with this and parse JSON from bytes. + return specConsumeBody(this, parseJSONFromBytes, instance) }, async formData () { @@ -455,8 +480,13 @@ function mixinBody (prototype) { Object.assign(prototype.prototype, bodyMixinMethods(prototype)) } -// https://fetch.spec.whatwg.org/#concept-body-consume-body -async function specConsumeBody (object, type, instance) { +/** + * @see https://fetch.spec.whatwg.org/#concept-body-consume-body + * @param {Response|Request} object + * @param {(value: unknown) => unknown} convertBytesToJSValue + * @param {Response|Request} instance + */ +async function specConsumeBody (object, convertBytesToJSValue, instance) { webidl.brandCheck(object, instance) throwIfAborted(object[kState]) @@ -467,71 +497,37 @@ async function specConsumeBody (object, type, instance) { throw new TypeError('Body is unusable') } - // 2. Let promise be a promise resolved with an empty byte - // sequence. - let promise - - // 3. If object’s body is non-null, then set promise to the - // result of fully reading body as promise given object’s - // body. - if (object[kState].body != null) { - promise = await fullyReadBodyAsPromise(object[kState].body) - } else { - // step #2 - promise = { size: 0, bytes: [new Uint8Array()] } + // 2. Let promise be a new promise. + const promise = createDeferredPromise() + + // 3. Let errorSteps given error be to reject promise with error. + const errorSteps = (error) => promise.reject(error) + + // 4. Let successSteps given a byte sequence data be to resolve + // promise with the result of running convertBytesToJSValue + // with data. If that threw an exception, then run errorSteps + // with that exception. + const successSteps = (data) => { + try { + promise.resolve(convertBytesToJSValue(data)) + } catch (e) { + errorSteps(e) + } } - // 4. Let steps be to return the result of package data with - // the first argument given, type, and object’s MIME type. - const mimeType = type === 'Blob' || type === 'FormData' - ? bodyMimeType(object) - : undefined - - // 5. Return the result of upon fulfillment of promise given - // steps. - return packageData(promise, type, mimeType) -} - -/** - * @see https://fetch.spec.whatwg.org/#concept-body-package-data - * @param {{ size: number, bytes: Uint8Array[] }} bytes - * @param {string} type - * @param {ReturnType|undefined} mimeType - */ -function packageData ({ bytes, size }, type, mimeType) { - switch (type) { - case 'ArrayBuffer': { - // Return a new ArrayBuffer whose contents are bytes. - const uint8 = new Uint8Array(size) - let offset = 0 - - for (const chunk of bytes) { - uint8.set(chunk, offset) - offset += chunk.byteLength - } + // 5. If object’s body is null, then run successSteps with an + // empty byte sequence. + if (object[kState].body == null) { + successSteps(new Uint8Array()) + return promise.promise + } - return uint8.buffer - } - case 'Blob': { - if (mimeType === 'failure') { - mimeType = '' - } else if (mimeType) { - mimeType = serializeAMimeType(mimeType) - } + // 6. Otherwise, fully read object’s body given successSteps, + // errorSteps, and object’s relevant global object. + fullyReadBody(object[kState].body, successSteps, errorSteps) - // Return a Blob whose contents are bytes and type attribute - // is mimeType. - return new Blob(bytes, { type: mimeType }) - } - case 'JSON': { - // Return the result of running parse JSON from bytes on bytes. - return JSON.parse(utf8DecodeBytes(bytes)) - } - case 'text': { - // 1. Return the result of running UTF-8 decode on bytes. - return utf8DecodeBytes(bytes) - } - } + // 7. Return promise. + return promise.promise } // https://fetch.spec.whatwg.org/#body-unusable @@ -542,73 +538,40 @@ function bodyUnusable (body) { return body != null && (body.stream.locked || util.isDisturbed(body.stream)) } -// https://fetch.spec.whatwg.org/#fully-reading-body-as-promise -async function fullyReadBodyAsPromise (body) { - // 1. Let reader be the result of getting a reader for body’s - // stream. If that threw an exception, then return a promise - // rejected with that exception. - const reader = body.stream.getReader() - - // 2. Return the result of reading all bytes from reader. - /** @type {Uint8Array[]} */ - const bytes = [] - let size = 0 - - while (true) { - const { done, value } = await reader.read() - - if (done) { - break - } - - // https://streams.spec.whatwg.org/#read-loop - // If chunk is not a Uint8Array object, reject promise with - // a TypeError and abort these steps. - if (!isUint8Array(value)) { - throw new TypeError('Value is not a Uint8Array.') - } - - bytes.push(value) - size += value.byteLength - } - - return { size, bytes } -} - /** * @see https://encoding.spec.whatwg.org/#utf-8-decode - * @param {Uint8Array[]} ioQueue + * @param {Buffer} buffer */ -function utf8DecodeBytes (ioQueue) { - if (ioQueue.length === 0) { +function utf8DecodeBytes (buffer) { + if (buffer.length === 0) { return '' } - // 1. Let buffer be the result of peeking three bytes - // from ioQueue, converted to a byte sequence. - const buffer = ioQueue[0] + // 1. Let buffer be the result of peeking three bytes from + // ioQueue, converted to a byte sequence. // 2. If buffer is 0xEF 0xBB 0xBF, then read three // bytes from ioQueue. (Do nothing with those bytes.) if (buffer[0] === 0xEF && buffer[1] === 0xBB && buffer[2] === 0xBF) { - ioQueue[0] = ioQueue[0].subarray(3) + buffer = buffer.subarray(3) } // 3. Process a queue with an instance of UTF-8’s // decoder, ioQueue, output, and "replacement". - const decoder = new StringDecoder('utf-8') - let output = '' - - for (const chunk of ioQueue) { - output += decoder.write(chunk) - } - - output += decoder.end() + const output = new TextDecoder().decode(buffer) // 4. Return output. return output } +/** + * @see https://infra.spec.whatwg.org/#parse-json-bytes-to-a-javascript-value + * @param {Uint8Array} bytes + */ +function parseJSONFromBytes (bytes) { + return JSON.parse(utf8DecodeBytes(bytes)) +} + /** * @see https://fetch.spec.whatwg.org/#concept-body-mime-type * @param {import('./response').Response|import('./request').Request} object diff --git a/lib/fetch/util.js b/lib/fetch/util.js index f2fd1088629..a0faed91354 100644 --- a/lib/fetch/util.js +++ b/lib/fetch/util.js @@ -805,48 +805,32 @@ function iteratorResult (pair, kind) { /** * @see https://fetch.spec.whatwg.org/#body-fully-read */ -async function fullyReadBody (body, processBody, processBodyError) { +function fullyReadBody (body, processBody, processBodyError) { // 1. If taskDestination is null, then set taskDestination to // the result of starting a new parallel queue. - // 2. Let promise be the result of fully reading body as promise - // given body. - try { - /** @type {Uint8Array[]} */ - const chunks = [] - let length = 0 - - const reader = body.stream.getReader() - - while (true) { - const { done, value } = await reader.read() + // 2. Let successSteps given a byte sequence bytes be to queue a + // fetch task to run processBody given bytes, with taskDestination. + const successSteps = (bytes) => queueMicrotask(() => processBody(bytes)) - if (done === true) { - break - } + // 3. Let errorSteps be to queue a fetch task to run processBodyError, + // with taskDestination. + const errorSteps = (error) => queueMicrotask(() => processBodyError(error)) - // read-loop chunk steps - assert(isUint8Array(value)) + // 4. Let reader be the result of getting a reader for body’s stream. + // If that threw an exception, then run errorSteps with that + // exception and return. + let reader - chunks.push(value) - length += value.byteLength - } - - // 3. Let fulfilledSteps given a byte sequence bytes be to queue - // a fetch task to run processBody given bytes, with - // taskDestination. - const fulfilledSteps = (bytes) => queueMicrotask(() => { - processBody(bytes) - }) - - fulfilledSteps(Buffer.concat(chunks, length)) - } catch (err) { - // 4. Let rejectedSteps be to queue a fetch task to run - // processBodyError, with taskDestination. - queueMicrotask(() => processBodyError(err)) + try { + reader = body.stream.getReader() + } catch (e) { + errorSteps(e) + return } - // 5. React to promise with fulfilledSteps and rejectedSteps. + // 5. Read all bytes from reader, given successSteps and errorSteps. + readAllBytes(reader, successSteps, errorSteps) } /** @type {ReadableStream} */ @@ -911,6 +895,50 @@ function isomorphicEncode (input) { return input } +/** + * @see https://streams.spec.whatwg.org/#readablestreamdefaultreader-read-all-bytes + * @see https://streams.spec.whatwg.org/#read-loop + * @param {ReadableStreamDefaultReader} reader + * @param {(bytes: Uint8Array) => void} successSteps + * @param {(error: Error) => void} failureSteps + */ +async function readAllBytes (reader, successSteps, failureSteps) { + const bytes = [] + let byteLength = 0 + + while (true) { + let done + let chunk + + try { + ({ done, value: chunk } = await reader.read()) + } catch (e) { + // 1. Call failureSteps with e. + failureSteps(e) + return + } + + if (done) { + // 1. Call successSteps with bytes. + successSteps(Buffer.concat(bytes, byteLength)) + return + } + + // 1. If chunk is not a Uint8Array object, call failureSteps + // with a TypeError and abort these steps. + if (!isUint8Array(chunk)) { + failureSteps(new TypeError('Received non-Uint8Array chunk')) + return + } + + // 2. Append the bytes represented by chunk to bytes. + bytes.push(chunk) + byteLength += chunk.length + + // 3. Read-loop given reader, bytes, successSteps, and failureSteps. + } +} + /** * Fetch supports node >= 16.8.0, but Object.hasOwn was added in v16.9.0. */