Skip to content

Commit

Permalink
simplify formData body parsing (#2735)
Browse files Browse the repository at this point in the history
* simplify formData body parsing

* perf: don't copy all headers

* fixup
  • Loading branch information
KhafraDev authored Feb 12, 2024
1 parent 1af7a96 commit 4e69afd
Showing 1 changed file with 19 additions and 44 deletions.
63 changes: 19 additions & 44 deletions lib/fetch/body.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ const { FormData } = require('./formdata')
const { kState } = require('./symbols')
const { webidl } = require('./webidl')
const { Blob, File: NativeFile } = require('node:buffer')
const { kBodyUsed } = require('../core/symbols')
const assert = require('node:assert')
const { isErrored } = require('../core/util')
const { isUint8Array, isArrayBuffer } = require('util/types')
const { isArrayBuffer } = require('util/types')
const { File: UndiciFile } = require('./file')
const { serializeAMimeType } = require('./dataURL')
const { Readable } = require('node:stream')

/** @type {globalThis['File']} */
const File = NativeFile ?? UndiciFile
Expand Down Expand Up @@ -291,29 +291,6 @@ function cloneBody (body) {
}
}

async function * consumeBody (body) {
if (body) {
if (isUint8Array(body)) {
yield body
} else {
const stream = body.stream

if (util.isDisturbed(stream)) {
throw new TypeError('The body has already been consumed.')
}

if (stream.locked) {
throw new TypeError('The stream is locked.')
}

// Compat.
stream[kBodyUsed] = true

yield * stream
}
}
}

function throwIfAborted (state) {
if (state.aborted) {
throw new DOMException('The operation was aborted.', 'AbortError')
Expand All @@ -328,7 +305,7 @@ function bodyMixinMethods (instance) {
// given a byte sequence bytes: return a Blob whose
// contents are bytes and whose type attribute is this’s
// MIME type.
return specConsumeBody(this, (bytes) => {
return consumeBody(this, (bytes) => {
let mimeType = bodyMimeType(this)

if (mimeType === null) {
Expand All @@ -348,21 +325,21 @@ function bodyMixinMethods (instance) {
// of running consume body with this and the following step
// given a byte sequence bytes: return a new ArrayBuffer
// whose contents are bytes.
return specConsumeBody(this, (bytes) => {
return consumeBody(this, (bytes) => {
return new Uint8Array(bytes).buffer
}, instance)
},

text () {
// The text() method steps are to return the result of running
// consume body with this and UTF-8 decode.
return specConsumeBody(this, utf8DecodeBytes, instance)
return consumeBody(this, utf8DecodeBytes, instance)
},

json () {
// The json() method steps are to return the result of running
// consume body with this and parse JSON from bytes.
return specConsumeBody(this, parseJSONFromBytes, instance)
return consumeBody(this, parseJSONFromBytes, instance)
},

async formData () {
Expand All @@ -375,16 +352,15 @@ function bodyMixinMethods (instance) {

// If mimeType’s essence is "multipart/form-data", then:
if (mimeType !== null && mimeType.essence === 'multipart/form-data') {
const headers = {}
for (const [key, value] of this.headers) headers[key] = value

const responseFormData = new FormData()

let busboy

try {
busboy = new Busboy({
headers,
headers: {
'content-type': serializeAMimeType(mimeType)
},
preservePath: true
})
} catch (err) {
Expand Down Expand Up @@ -427,8 +403,10 @@ function bodyMixinMethods (instance) {
busboy.on('error', (err) => reject(new TypeError(err)))
})

if (this.body !== null) for await (const chunk of consumeBody(this[kState].body)) busboy.write(chunk)
busboy.end()
if (this.body !== null) {
Readable.from(this[kState].body.stream).pipe(busboy)
}

await busboyResolve

return responseFormData
Expand All @@ -442,20 +420,17 @@ function bodyMixinMethods (instance) {
// application/x-www-form-urlencoded parser will keep the BOM.
// https://url.spec.whatwg.org/#concept-urlencoded-parser
// Note that streaming decoder is stateful and cannot be reused
const streamingDecoder = new TextDecoder('utf-8', { ignoreBOM: true })
const stream = this[kState].body.stream.pipeThrough(new TextDecoderStream('utf-8', { ignoreBOM: true }))

for await (const chunk of consumeBody(this[kState].body)) {
if (!isUint8Array(chunk)) {
throw new TypeError('Expected Uint8Array chunk')
}
text += streamingDecoder.decode(chunk, { stream: true })
for await (const chunk of stream) {
text += chunk
}
text += streamingDecoder.decode()

entries = new URLSearchParams(text)
} catch (err) {
// istanbul ignore next: Unclear when new URLSearchParams can fail on a string.
// 2. If entries is failure, then throw a TypeError.
throw new TypeError(undefined, { cause: err })
throw new TypeError(err)
}

// 3. Return a new FormData object whose entries are entries.
Expand Down Expand Up @@ -493,7 +468,7 @@ function mixinBody (prototype) {
* @param {(value: unknown) => unknown} convertBytesToJSValue
* @param {Response|Request} instance
*/
async function specConsumeBody (object, convertBytesToJSValue, instance) {
async function consumeBody (object, convertBytesToJSValue, instance) {
webidl.brandCheck(object, instance)

throwIfAborted(object[kState])
Expand Down

0 comments on commit 4e69afd

Please sign in to comment.