Make streams default pause mode apply for non data events (such as 'error') #39722
@nodejs/streams
Adding a test case to replicate:

```js
const stream = require('stream');
const chai = require('chai');

// Defer a function to a later turn of the event loop and resolve with its result.
async function fork(f) {
  return new Promise((resolve, reject) => setTimeout(() => {
    try {
      resolve(f());
    } catch (e) {
      reject(e);
    }
  }, 0));
}

async function main() {
  const st1 = new stream.PassThrough();
  const st2 = new stream.PassThrough();
  const st3 = new stream.PassThrough();
  const data = [];
  const errors = [[], []];

  await fork(() => {
    st1.write('a');
    st1.write('b');
    st1.write('c');
  });

  const pp1 = await fork(() => stream.pipeline(st1, st2, (err) => {
    if (err) {
      errors[0].push(err);
    }
  }));

  await fork(() => {
    st1.emit('error', 500);
    st1.emit('close', null); // can either send 'close' or invoke `end()`
    // st1.end();
  });

  const pp2 = await fork(() =>
    stream.pipeline(pp1, st3, (err) => {
      if (err) {
        errors[1].push(err);
      }
    })
  );

  pp2.on('data', (d) => data.push(d.toString()));

  await fork(() => {}); // do nothing and let the event loop process events

  chai.expect(data).to.deep.eq(['a', 'b', 'c']);
  chai.expect(errors[0]).to.deep.eq([500]);
  chai.expect(errors[1]).to.deep.eq([500]); // fails (error output of pipeline #2 is empty)
  console.log('success');
}

main().catch(console.log);
```

Trying to replace the last `await fork(() => {});` with

```js
await new Promise((resolve) => stream.finished(pp2, () => {
  console.log('this is never printed');
  resolve();
}));
```

results in the process exiting without printing anything (not even the …).
Thanks for the report. I'm really not sure I see the case where this is a problem. Is the scenario:

If the stream emits … I am not sure how that's different from the following scenario:

Can you elaborate on when you ran into this? (As a note, promises don't wait for I/O, and pending promises don't block Node.js from exiting.)
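A minimal sketch of that last point:

```js
// A promise that never settles keeps nothing alive: with no timers or I/O
// pending, Node.js exits before 'done' can ever be logged.
const pending = new Promise(() => {});

(async () => {
  await pending;
  console.log('done'); // never reached; the process exits first
})();
```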
@benjamingr Thanks for the reply!
I'm not sure I follow this scenario, as I don't see how this would happen with promises. For instance, here:

```js
async function foo() {
  const responsePromise = callApi().promise();
  await longProcessing();
  try {
    await responsePromise;
  } catch { ... }
  // or
  responsePromise.catch(...);
}
```

I would expect to catch an error even if the … According to this code, it seems that my expectation is met:

```js
async function lateErrorHandling() {
  const p = new Promise((resolve, reject) => setTimeout(() => reject('foo'), 0));
  await new Promise((resolve) => setTimeout(resolve, 1000));
  try {
    await p;
    console.log('success');
  } catch (e) {
    console.log(`error: ${e}`);
  }
}

async function main() {
  console.log('start');
  await lateErrorHandling();
  console.log('end');
}

main().catch(console.log);
```

which prints:

```
start
error: foo
end
```
However, it does come with a deprecation warning "Unhandled promise rejections are deprecated" and so on (I'm using node 14.16.1), which seems very unfortunate to me.
In short, I ran into this when trying to stream a result set from a MySQL database to S3, while formatting and zipping the results in between. I am using node-mysql2 as a client with their promise-wrapping API.
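A rough sketch of the shape of that pipeline, where `getRowStream()` and `createS3UploadStream()` are hypothetical stand-ins (not actual node-mysql2 or S3 APIs):

```js
const { pipeline, Transform } = require('stream');
const zlib = require('zlib');

// getRowStream() and createS3UploadStream() are hypothetical placeholders for
// the mysql2 row stream and an S3 upload writable; only the shape matters here.
async function exportResultSet() {
  const rows = await getRowStream();            // readable of row objects (assumed)
  const upload = await createS3UploadStream();  // writable that uploads to S3 (assumed)

  const format = new Transform({
    writableObjectMode: true,
    transform(row, _enc, cb) {
      cb(null, JSON.stringify(row) + '\n');     // rows in, NDJSON text out
    },
  });

  await new Promise((resolve, reject) => {
    pipeline(rows, format, zlib.createGzip(), upload, (err) => {
      if (err) reject(err);
      else resolve();
    });
  });
}
```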
The "big" example could be simplified to the following:

```js
const assert = require('assert')
const { PassThrough, pipeline } = require('stream')

const s1 = new PassThrough()
const s2 = new PassThrough()

s1.destroy()

s1.on('close', function () {
  let called = false
  pipeline(s1, s2, function (err) {
    // this function is never called
    called = true
    console.log(err)
  })
  setImmediate(function () {
    assert.strictEqual(called, true)
  })
})
```

The ask is for pipeline to destroy the other streams if the first one is destroyed. (Note, the proper way to error a stream is to call …)
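A minimal sketch of that note, assuming standard `stream.pipeline` behavior: destroying the source with an error lets `pipeline` tear down the rest of the chain and report the error.

```js
const { PassThrough, pipeline } = require('stream')

const src = new PassThrough()
const dst = new PassThrough()

pipeline(src, dst, (err) => {
  // pipeline destroys the remaining streams and reports the error here.
  console.log('pipeline finished with:', err && err.message)
})

src.write('hello')
src.destroy(new Error('boom')) // preferred over src.emit('error', ...)
```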
wdyt @ronag?
I think this is already fixed on master?
Indeed it is! Which PR did you fix this in? Are we planning to backport?
#39235 semver-major
@mcollina My bad. Running from IntelliJ doesn't seem to use the right version (I used …).

With the nightly build it seems that the "short" test case passes and prints:

(not sure if this is intended)

However, the "long" test case fails on the assertion of the data:

Removing this assertion and leaving only the error assertions passes the test.
Is your feature request related to a problem? Please describe.
I am trying to build a pipeline of streams in conjunction with promises/async code in the midst of orchestrating the pipeline:
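Roughly, such a function might look like this sketch, where `queryDb()` and `s3Destination()` are hypothetical async helpers standing in for the real source and destination:

```js
const { pipeline } = require('stream');

// queryDb() and s3Destination() are hypothetical helpers that resolve to a
// readable source stream and a writable destination stream, respectively.
async function exportData() {
  const source = await queryDb();
  const destination = await s3Destination();

  await new Promise((resolve, reject) => {
    pipeline(source, destination, (err) => (err ? reject(err) : resolve()));
  });
}
```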
This should work without a problem.
However, one might decide to split this function into multiple ones:
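For instance, the split might look like this sketch (same hypothetical helpers as above):

```js
const { pipeline } = require('stream');

// Same hypothetical queryDb()/s3Destination() helpers as above.
async function queryStream() {
  const source = await queryDb();   // async setup, then return the readable
  return source;
}

async function writeStream(source) {
  const destination = await s3Destination();
  return new Promise((resolve, reject) => {
    pipeline(source, destination, (err) => (err ? reject(err) : resolve()));
  });
}

async function exportData() {
  const source = await queryStream();
  // Between these two awaits the event loop can run other work, so the source
  // may emit 'error' or 'close' before pipeline() has attached any listeners.
  await writeStream(source);
}
```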
Now, though, the code isn't safe.
If I understand correctly, it could be that in between the time that the `queryStream` function produces a stream and the time that the `writeStream` function subscribes to said stream's events (via `pipeline`), there might be events emitted to the stream, since these functions do not run synchronously and so they make room for other functions -- such as I/O -- to run in between.

This is fine for incoming data, as the stream begins in paused mode, buffering data until switching to flowing mode when the stream is piped by `pipeline`. In contrast, other events such as `error` or `close` are not buffered, and are emitted immediately as they are received, which means that the subscribing stream in `writeStream` will not be aware of them, which is a problem.

Describe the solution you'd like
I would love for the default paused mode -- i.e., buffering events until the first "subscriber" -- to apply to non-data events as well (at least the built-in ones). Whether it should be the default behavior or not is a separate question, but the ability to provide an option to the stream at construction could prove extremely useful.
As a first step though, it might be helpful to add this scenario to the documentation on streams.
Describe alternatives you've considered
I tried a workaround where, before returning a queue from an async function, I added listeners to these events to buffer them, and then overrode each of the event-registration methods (`on`, `once`, etc.) to push events to any new subscriber of these events. I encountered two problems:

1. `pipeline`, which in turn uses `pipe`, makes it so that if the stream has event listeners prior to the call, it will not propagate these events further on (I guess it assumes that there's already a handler in place).
2. The order in which the `pipeline`/`pipe` method registers listeners for these events is not necessarily (and is often not) the same as the order in which the events were emitted; for instance, `error` is usually emitted before `close`, while the registration of these events is in the opposite order.
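A rough sketch of what such a buffering wrapper might look like (a hypothetical `bufferLifecycleEvents` helper; it deliberately ignores the two problems above and only overrides `on`):

```js
const { PassThrough } = require('stream');

// Wrap a stream so that 'error' and 'close' events emitted before anyone
// subscribes are buffered and replayed to listeners attached later.
function bufferLifecycleEvents(stream, events = ['error', 'close']) {
  const buffered = [];
  for (const name of events) {
    stream.on(name, (...args) => buffered.push([name, args]));
  }

  const originalOn = stream.on.bind(stream);
  stream.on = function (name, listener) {
    const result = originalOn(name, listener);
    // Replay any buffered occurrences of this event to the new listener.
    for (const [bufferedName, args] of buffered) {
      if (bufferedName === name) listener(...args);
    }
    return result;
  };
  return stream;
}

// Usage sketch: the 'error' fires before any outside listener exists,
// yet the late listener still observes it.
const st = bufferLifecycleEvents(new PassThrough());
st.emit('error', new Error('early'));
setTimeout(() => {
  st.on('error', (err) => console.log('late listener saw:', err.message));
}, 10);
```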