Skip to content

Commit

Permalink
streams: fix timing relative to promises
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Dec 6, 2023
1 parent bb2dd0e commit d44bc8d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 10 deletions.
41 changes: 31 additions & 10 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,13 @@ function _destroy(self, err, cb) {
}

if (err) {
process.nextTick(emitErrorCloseNT, self, err);
queueMicrotask(() => {
emitErrorCloseNT(self, err);
});
} else {
process.nextTick(emitCloseNT, self);
queueMicrotask(() => {
emitCloseNT(self);
});
}
}
try {
Expand Down Expand Up @@ -233,7 +237,9 @@ function errorOrDestroy(stream, err, sync) {
r.errored = err;
}
if (sync) {
process.nextTick(emitErrorNT, stream, err);
queueMicrotask(() => {
emitErrorNT(stream, err);
});
} else {
emitErrorNT(stream, err);
}
Expand Down Expand Up @@ -262,7 +268,9 @@ function construct(stream, cb) {
return;
}

process.nextTick(constructNT, stream);
queueMicrotask(() => {
constructNT(stream);
});
}

function constructNT(stream) {
Expand Down Expand Up @@ -291,16 +299,22 @@ function constructNT(stream) {
} else if (err) {
errorOrDestroy(stream, err, true);
} else {
process.nextTick(emitConstructNT, stream);
queueMicrotask(() => {
emitConstructNT(stream);
});
}
}

try {
stream._construct((err) => {
process.nextTick(onConstruct, err);
queueMicrotask(() =>{
onConstruct(err);
});
});
} catch (err) {
process.nextTick(onConstruct, err);
queueMicrotask(() => {
onConstruct(err);
});
}
}

Expand All @@ -318,11 +332,14 @@ function emitCloseLegacy(stream) {

function emitErrorCloseLegacy(stream, err) {
stream.emit('error', err);
process.nextTick(emitCloseLegacy, stream);
queueMicrotask(() => {
emitCloseLegacy(stream);
});
}

// Normalize destroy for legacy.
function destroyer(stream, err) {
process._rawDebug("### 0")
if (!stream || isDestroyed(stream)) {
return;
}
Expand All @@ -345,9 +362,13 @@ function destroyer(stream, err) {
// TODO: Don't lose err?
stream.close();
} else if (err) {
process.nextTick(emitErrorCloseLegacy, stream, err);
queueMicrotask(() => {
emitErrorCloseLegacy(stream, err);
});
} else {
process.nextTick(emitCloseLegacy, stream);
queueMicrotask(() => {
emitCloseLegacy(stream);
});
}

if (!stream.destroyed) {
Expand Down
20 changes: 20 additions & 0 deletions test/parallel/test-stream-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,23 @@ const http = require('http');
req.end('asd');
});
}

{
// Destroy timing relative to Promise

new Promise(resolve => {
const r = new Readable({ read() {} });
destroy(r, new Error('asd'));
resolve(r);
}).then(common.mustCall(r => {
r.on('error', common.mustCall());
}));

new Promise(resolve => {
const r = new Readable({ read() {} });
r.destroy(new Error('asd'));
resolve(r);
}).then(common.mustCall(r => {
r.on('error', common.mustCall());
}));
}

0 comments on commit d44bc8d

Please sign in to comment.