Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: stream error timings #2497

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 46 additions & 23 deletions lib/api/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ module.exports = class BodyReadable extends Readable {
return super.destroy(err)
}

_destroy (err, callback) {
// Workaround for Node "bug". If the stream is destroyed in same
// tick as it is created, then a user who is waiting for a
// promise (i.e micro tick) for installing a 'error' listener will
// never get a chance and will always encounter an unhandled exception.
// - tick => process.nextTick(fn)
// - micro tick => queueMicrotask(fn)
queueMicrotask(() => {
callback(err)
})
}

emit (ev, ...args) {
if (ev === 'data') {
// Node < 16.7
Expand Down Expand Up @@ -166,7 +178,7 @@ module.exports = class BodyReadable extends Readable {
}
}

if (this.closed) {
if (this._readableState.closeEmitted) {
return Promise.resolve(null)
}

Expand Down Expand Up @@ -210,33 +222,44 @@ function isUnusable (self) {
}

async function consume (stream, type) {
if (isUnusable(stream)) {
throw new TypeError('unusable')
}

assert(!stream[kConsume])

return new Promise((resolve, reject) => {
stream[kConsume] = {
type,
stream,
resolve,
reject,
length: 0,
body: []
}
if (isUnusable(stream)) {
const rState = stream._readableState
if (rState.destroyed && rState.closeEmitted === false) {
stream
.on('error', err => {
reject(err)
})
.on('close', () => {
reject(new TypeError('unusable'))
})
} else {
reject(rState.errored ?? new TypeError('unusable'))
}
} else {
stream[kConsume] = {
type,
stream,
resolve,
reject,
length: 0,
body: []
}

stream
.on('error', function (err) {
consumeFinish(this[kConsume], err)
})
.on('close', function () {
if (this[kConsume].body !== null) {
consumeFinish(this[kConsume], new RequestAbortedError())
}
})
stream
.on('error', function (err) {
consumeFinish(this[kConsume], err)
})
.on('close', function () {
if (this[kConsume].body !== null) {
consumeFinish(this[kConsume], new RequestAbortedError())
}
})

process.nextTick(consumeStart, stream[kConsume])
queueMicrotask(() => consumeStart(stream[kConsume]))
}
})
}

Expand Down
22 changes: 14 additions & 8 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1963,12 +1963,19 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength,
body.resume()
}
}
const onAbort = function () {
if (finished) {
return
const onClose = function () {
// 'close' might be emitted *before* 'error' for
// broken streams. Wait a tick to avoid this case.
queueMicrotask(() => {
// It's only safe to remove 'error' listener after
// 'close'.
body.removeListener('error', onFinished)
})

if (!finished) {
const err = new RequestAbortedError()
queueMicrotask(() => onFinished(err))
}
const err = new RequestAbortedError()
queueMicrotask(() => onFinished(err))
}
const onFinished = function (err) {
if (finished) {
Expand All @@ -1986,8 +1993,7 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength,
body
.removeListener('data', onData)
.removeListener('end', onFinished)
.removeListener('error', onFinished)
.removeListener('close', onAbort)
.removeListener('close', onClose)

if (!err) {
try {
Expand All @@ -2010,7 +2016,7 @@ function writeStream ({ h2stream, body, client, request, socket, contentLength,
.on('data', onData)
.on('end', onFinished)
.on('error', onFinished)
.on('close', onAbort)
.on('close', onClose)

if (body.resume) {
body.resume()
Expand Down
37 changes: 37 additions & 0 deletions test/readable.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,40 @@ test('avoid body reordering', async function (t) {

t.equal(text, 'helloworld')
})

test('destroy timing text', async function (t) {
t.plan(1)

function resume () {
}
function abort () {
}
const _err = new Error('kaboom')
const r = new Readable({ resume, abort })
r.destroy(_err)
try {
await r.text()
} catch (err) {
t.same(err, _err)
}
})

test('destroy timing promise', async function (t) {
t.plan(1)

function resume () {
}
function abort () {
}
const r = await new Promise(resolve => {
const r = new Readable({ resume, abort })
r.destroy(new Error('kaboom'))
resolve(r)
})
await new Promise(resolve => {
r.on('error', err => {
t.ok(err)
resolve(null)
})
})
})
Loading