Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use FinalizationRegistry for cloned response body #3458

Merged
merged 3 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions lib/web/fetch/body.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,25 @@ const { kState } = require('./symbols')
const { webidl } = require('./webidl')
const { Blob } = require('node:buffer')
const assert = require('node:assert')
const { isErrored } = require('node:stream')
const { isErrored, isDisturbed } = require('node:stream')
const { isArrayBuffer } = require('node:util/types')
const { serializeAMimeType } = require('./data-url')
const { multipartFormDataParser } = require('./formdata-parser')

const textEncoder = new TextEncoder()
function noop () {}

const hasFinalizationRegistry = globalThis.FinalizationRegistry && process.version.indexOf('v18') !== 0
let streamRegistry

if (hasFinalizationRegistry) {
streamRegistry = new FinalizationRegistry((weakRef) => {
const stream = weakRef.deref()
if (stream && !stream.locked && !isDisturbed(stream) && !isErrored(stream)) {
stream.cancel('Response object has been garbage collected').catch(noop)
}
})
}

// https://fetch.spec.whatwg.org/#concept-bodyinit-extract
function extractBody (object, keepalive = false) {
Expand Down Expand Up @@ -264,14 +277,18 @@ function safelyExtractBody (object, keepalive = false) {
return extractBody(object, keepalive)
}

function cloneBody (body) {
function cloneBody (instance, body) {
// To clone a body body, run these steps:

// https://fetch.spec.whatwg.org/#concept-body-clone

// 1. Let « out1, out2 » be the result of teeing body’s stream.
const [out1, out2] = body.stream.tee()

if (hasFinalizationRegistry) {
streamRegistry.register(instance, new WeakRef(out1))
mcollina marked this conversation as resolved.
Show resolved Hide resolved
}

// 2. Set body’s stream to out1.
body.stream = out1

Expand Down Expand Up @@ -499,5 +516,7 @@ module.exports = {
safelyExtractBody,
cloneBody,
mixinBody,
streamRegistry,
hasFinalizationRegistry,
bodyUnusable
}
2 changes: 1 addition & 1 deletion lib/web/fetch/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ function cloneRequest (request) {
// 2. If request’s body is non-null, set newRequest’s body to the
// result of cloning request’s body.
if (request.body != null) {
newRequest.body = cloneBody(request.body)
newRequest.body = cloneBody(newRequest, request.body)
}

// 3. Return newRequest.
Expand Down
21 changes: 3 additions & 18 deletions lib/web/fetch/response.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const { Headers, HeadersList, fill, getHeadersGuard, setHeadersGuard, setHeadersList } = require('./headers')
const { extractBody, cloneBody, mixinBody, bodyUnusable } = require('./body')
const { extractBody, cloneBody, mixinBody, hasFinalizationRegistry, streamRegistry, bodyUnusable } = require('./body')
const util = require('../../core/util')
const nodeUtil = require('node:util')
const { kEnumerableProperty } = util
Expand All @@ -26,24 +26,9 @@ const { URLSerializer } = require('./data-url')
const { kConstruct } = require('../../core/symbols')
const assert = require('node:assert')
const { types } = require('node:util')
const { isDisturbed, isErrored } = require('node:stream')

const textEncoder = new TextEncoder('utf-8')

const hasFinalizationRegistry = globalThis.FinalizationRegistry && process.version.indexOf('v18') !== 0
let registry

if (hasFinalizationRegistry) {
registry = new FinalizationRegistry((weakRef) => {
const stream = weakRef.deref()
if (stream && !stream.locked && !isDisturbed(stream) && !isErrored(stream)) {
stream.cancel('Response object has been garbage collected').catch(noop)
}
})
}

function noop () {}

// https://fetch.spec.whatwg.org/#response-class
class Response {
// Creates network error Response.
Expand Down Expand Up @@ -327,7 +312,7 @@ function cloneResponse (response) {
// 3. If response’s body is non-null, then set newResponse’s body to the
// result of cloning response’s body.
if (response.body != null) {
newResponse.body = cloneBody(response.body)
newResponse.body = cloneBody(newResponse, response.body)
}

// 4. Return newResponse.
Expand Down Expand Up @@ -532,7 +517,7 @@ function fromInnerResponse (innerResponse, guard) {
// a primitive or an object, even undefined. If the held value is an object, the registry keeps
// a strong reference to it (so it can pass it to the cleanup callback later). Reworded from
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/FinalizationRegistry
registry.register(response, new WeakRef(innerResponse.body.stream))
streamRegistry.register(response, new WeakRef(innerResponse.body.stream))
}

return response
Expand Down
3 changes: 2 additions & 1 deletion test/fetch/fire-and-forget.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ test('does not need the body to be consumed to continue', { timeout: 180_000, sk
// eslint-disable-next-line no-undef
gc(true)
const array = new Array(batch)
for (let i = 0; i < batch; i++) {
for (let i = 0; i < batch; i += 2) {
array[i] = fetch(url).catch(() => {})
array[i + 1] = fetch(url).then(r => r.clone()).catch(() => {})
}
await Promise.all(array)
await sleep(delay)
Expand Down
Loading