Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: fix end-of-stream for silent .destroy() #26638

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function isStreamEmittingClose(stream) {
if (stream._writableState && !stream._writableState.emitClose)
return false;
if (stream._readableState && !stream._readableState.emitClose)
return false;

return true;
}

function eos(stream, opts, callback) {
if (arguments.length === 2) {
callback = opts;
Expand Down Expand Up @@ -71,6 +80,18 @@ function eos(stream, opts, callback) {
stream.req.on('finish', onfinish);
};

const isEmittingClose = isStreamEmittingClose(stream);

if (!isEmittingClose) {
const _destroy = stream._destroy;
stream._destroy = function(err, cb) {
_destroy.call(stream, err, (_err) => {
if (!_err) process.nextTick(() => callback.call(stream, _err));
cb(_err);
});
};
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think this is the correct fix. We should not monkey patching the incoming stream. If the default implementation of destroy needs a change, let's do it there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcollina do you think some private method like _onDestroy would make sense?

Or what you think is the best way to proceed with this and be notified when this silent destroy is happened?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the main problem here is that currently it's allowed for stream to be silently destroyed, but we can not change this behaviour now, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stream should not be tampered with in any form apart from adding events. This is a hard rule, this function is not expected to alter the internals in the streams. We should also not rely on internals as much as possible, as "previous generation" streams could be passed inside these.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I can also think about introducing new “destroy” event which will be always emitted after “.destroy()” method.

Would it be the better way? My initial idea was to touch the smallest piece of code and functionality as possible. Each stream has “.destroy()” method so it seems natural to monkey patch it. In this way we don’t even need to introduce something new.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

end-of-stream cannot catch those because emitClose: false prevent them to be observable. That's the whole reason for that option - it's a "you should handle this yourself if you want to, these are dangerous waters" option. Now, the documentation might be improved..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does stdout set emitClose to false?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

net set it to false because it handles emit('close') in its own _destroy method. However in

stdout._destroy = dummyDestroy;
we override _destroy, and that means 'close' is never emitted.
I have a PR coming.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generically a stream that sets emitClose: false is responsible for emitting close in user-logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, that sounds reasonable!


if (isRequest(stream)) {
stream.on('complete', onfinish);
stream.on('abort', onclose);
Expand Down
50 changes: 50 additions & 0 deletions test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const { Writable, Readable, Transform, finished } = require('stream');
const assert = require('assert');
const fs = require('fs');
const { promisify } = require('util');
const Countdown = require('../common/countdown');

{
const rs = new Readable({
Expand Down Expand Up @@ -175,3 +176,52 @@ const { promisify } = require('util');
rs.push(null);
rs.resume();
}

{
const rs = new Readable({
emitClose: false,
read() {}
});

finished(rs, common.mustCall((err) => {
assert(!err, 'no error');
}));

setImmediate(() => rs.destroy());
}

{
const ws = new Writable({
emitClose: false,
write(data, enc, cb) {
cb();
}
});

finished(ws, common.mustCall((err) => {
assert(!err, 'no error');
}));

setImmediate(() => ws.destroy());
}

{
const countdown = new Countdown(1, common.mustCall());
const rs = new Readable({
emitClose: false,
read() {},
destroy(err, cb) {
setImmediate(() => {
countdown.dec();
cb(err);
});
}
});

finished(rs, common.mustCall((err) => {
assert(!err, 'no error');
assert(!countdown.remaining, 'destroy should be called first');
}));

setImmediate(() => rs.destroy());
}
47 changes: 47 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const { Stream, Writable, Readable, Transform, pipeline } = require('stream');
const assert = require('assert');
const http = require('http');
const { promisify } = require('util');
const Countdown = require('../common/countdown');

{
let finished = false;
Expand Down Expand Up @@ -477,3 +478,49 @@ const { promisify } = require('util');
{ code: 'ERR_INVALID_CALLBACK' }
);
}

{
const read = new Readable({
read() {}
});

const write = new Writable({
emitClose: false,
write(data, enc, cb) {
cb();
}
});

setImmediate(() => read.destroy());

pipeline(read, write, common.mustCall((err) => {
assert.ok(err, 'should have an error');
}));
}

{
const countdown = new Countdown(1, common.mustCall());
const read = new Readable({
read() {}
});

const write = new Writable({
emitClose: false,
write(data, enc, cb) {
cb();
},
destroy(err, cb) {
setImmediate(() => {
countdown.dec();
cb(err);
});
}
});

setImmediate(() => read.destroy());

pipeline(read, write, common.mustCall((err) => {
assert.ok(err, 'should have an error');
assert(!countdown.remaining, 'destroy should be called first');
}));
}