Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stdio: implement manual start for ReadStream #36277

Closed
wants to merge 3 commits into from

Conversation

mmomtchev
Copy link
Contributor

All stdio ReadStream's use manual start to avoid
consuming data for example when a process
execs/spawns

Refs: #36251

Checklist
  • make -j4 test (UNIX), or vcbuild test (Windows) passes
  • tests and/or benchmarks are included
  • commit message follows commit guidelines

@gireeshpunathil gireeshpunathil added the stream Issues and PRs related to the stream subsystem. label Nov 26, 2020
@joyeecheung
Copy link
Member

cc @nodejs/streams

Copy link
Member

@ronag ronag left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

manualStart should be documented.

@ronag
Copy link
Member

ronag commented Nov 26, 2020

I'm not sure I understand the root problem here. When would it ever not be safe to start reading immediately? Seems more like a workaround for a different issue.

@mscdex
Copy link
Contributor

mscdex commented Nov 26, 2020

Minor nit: perhaps call this autoStart instead which would fit better with existing naming (e.g. autoClose) ?

@mmomtchev
Copy link
Contributor Author

I'm not sure I understand the root problem here. When would it ever not be safe to start reading immediately? Seems more like a workaround for a different issue.

@ronag stdin - when reading from a file through a shell redirection, reading will start immediately and the data will be waiting to be consumed in the NodeFile buffers
This is a problem when you want to spawn a child process to read this data - look at @isaacs 's issue

@mscdex there is already a manualStart option for net.Socket

Where should this be documented? Readable or ReadStream? The net.Socket manualStart is not documented

Copy link
Member

@mcollina mcollina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be implemented outside of Readable? How could somebody start the stream if manualStart is true?

@@ -196,7 +196,8 @@ function Readable(options) {
Stream.call(this, options);

destroyImpl.construct(this, () => {
maybeReadMore(this, this._readableState);
if (!options || !options.manualStart)
maybeReadMore(this, this._readableState);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs some tests that are specific to streams. How is a user of Stream be able to use this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcollina @joyeecheung @ronag @mscdex

Attaching a data handler will start the stream.
I simply recreated the semantics of the non-documented option in net.Socket - which is what the thread bootstrap code uses.
Its only user at the moment is this thread bootstrap code for the stdin stream. The net.Socket option has a second use case in TLSWrap - but I am not sure if it actually works - because this wouldn't happen : #35475

The reason I did in Readable is simply that the reading is started from here for all Readable except net.Socket which has its own startup code

To move this to ReadStream I would need to reimplement part of this code and it will be a much more invasive change

So two options:

  • Keep both manualStart options as hidden non-documented options, eventually replacing them with Symbol names
  • Document them, unit test them and make them official

It is also worth noting that there are two other net.Socket specific options, both non-documented, pauseOnStart and pauseOnConnect - which have different semantics - the bootstrap code being made for manualStart

lib/internal/streams/readable.js Show resolved Hide resolved
@ronag
Copy link
Member

ronag commented Nov 27, 2020

What about?

const myStream = createStream()
myStream.pause()

@mmomtchev
Copy link
Contributor Author

@ronag For a ReadStream with a file, the pause is too late

maybeReadMore in Readable will eventually trigger a C++ FileHandle::ReadStart which will call uv_fs_read()
uv_fs_read() will happen in another thread and quite often will beat stream.pause() - or if it doesn't alright beat it - at least by the time you pause it, everything will already be set in motion and the first part of the file will be consumed

@mmomtchev
Copy link
Contributor Author

What about?

const myStream = createStream()
myStream.pause()

Wait a second, just tested it and maybe you are right

@mmomtchev
Copy link
Contributor Author

@ronag, yes, in fact the pause will be immediate because maybeReadMore happens in a nextTick
However if you pause, this does not cancel maybeReadMore which still schedules a read
Any ideas?

@ronag
Copy link
Member

ronag commented Nov 27, 2020

However if you pause, this does not cancel maybeReadMore which still schedules a read

Maybe we should fix this? i.e. maybeReadMore does nothing if paused.

@mmomtchev
Copy link
Contributor Author

However if you pause, this does not cancel maybeReadMore which still schedules a read

Maybe we should fix this? i.e. maybeReadMore does nothing if paused.

Look at this:

while (!state.reading && !state.ended &&

If the stream is paused, state.flowing is false - however state.length is 0 and state.highWaterMark is 65536 so the read still happens
What shall be the interaction between state.flowing and state.highWaterMark - the huge comment above that while loop does not make it very clear - but it sounds like if the stream is not flowing, try still to fill the buffer up to state.highWaterMark - which does sound somewhat weird to me? @mcollina ?

So using manualStart can be replaced by

const myStream = createStream()
myStream._readableState.highWaterMark = 0;
myStream.pause()

The above solution also works, I tested it

Is this intended? It does not feel very right

@ronag
Copy link
Member

ronag commented Nov 27, 2020

@mmomtchev maybe in the construct callback we check if paused before starting to read?

  destroyImpl.construct(this, () => {
    if (!this.isPaused()) {
      maybeReadMore(this, this._readableState);    
    }
   })

@mcollina mcollina added the request-ci Add this label to start a Jenkins CI on a PR. label Nov 27, 2020
Copy link
Member

@mcollina mcollina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@github-actions github-actions bot removed the request-ci Add this label to start a Jenkins CI on a PR. label Nov 27, 2020
@nodejs-github-bot
Copy link
Collaborator

@mmomtchev
Copy link
Contributor Author

This test
https://github.com/nodejs/node/blob/master/test/parallel/test-stream-preprocess.js
which is using 'readable' is failing because of this:

state.flowing = false;

Does anyone know why flowing is set to false when a 'readable' is binded?

@mmomtchev
Copy link
Contributor Author

mmomtchev commented Nov 27, 2020

I tried removing it and there is an unit test which does specifically test for this, although I suspect that it might not be intentional
Its author is referring to highWaterMark=0 streams, but in fact, this test passes because flowing is set to false when a readable is binded

https://github.com/nodejs/node/blob/master/test/parallel/test-stream-readable-hwm-0-no-flow-data.js

@ronag
Copy link
Member

ronag commented Nov 27, 2020

Does anyone know why flowing is set to false when a 'readable' is binded?

When binding 'readable' the stream should leave "flowing mode" and stop emitting 'data' events since the user is now expected to call .read().

@mmomtchev
Copy link
Contributor Author

Does anyone know why flowing is set to false when a 'readable' is binded?

When binding 'readable' the stream should leave "flowing mode" and stop emitting 'data' events since the user is now expected to call .read().

'readable' stops the flowing, but does not pause the stream

Look at this test: https://github.com/nodejs/node/blob/master/test/parallel/test-stream-preprocess.js

This works because the stream continues reading - otherwise the readline interface won't work - flowing is false but the stream is not paused
After this PR, the stream will really be paused and the readline interface won't be reading

@ronag
Copy link
Member

ronag commented Nov 27, 2020

Maybe:

diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js
index 93153908fe..4f72c0cdc1 100644
--- a/lib/internal/streams/readable.js
+++ b/lib/internal/streams/readable.js
@@ -196,7 +196,9 @@ function Readable(options) {
   Stream.call(this, options);
 
   destroyImpl.construct(this, () => {
-    maybeReadMore(this, this._readableState);
+    if (!this.isPaused()) {
+      maybeReadMore(this, this._readableState);
+    }
   });
 }
 
@@ -870,8 +872,8 @@ Readable.prototype.on = function(ev, fn) {
   } else if (ev === 'readable') {
     if (!state.endEmitted && !state.readableListening) {
       state.readableListening = state.needReadable = true;
-      state.flowing = false;
+      this.pause();
       state.emittedReadable = false;
       debug('on readable', state.length, state.reading);
       if (state.length) {
         emitReadable(this);

@mmomtchev
Copy link
Contributor Author

Maybe:

diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js
index 93153908fe..4f72c0cdc1 100644
--- a/lib/internal/streams/readable.js
+++ b/lib/internal/streams/readable.js
@@ -196,7 +196,9 @@ function Readable(options) {
   Stream.call(this, options);
 
   destroyImpl.construct(this, () => {
-    maybeReadMore(this, this._readableState);
+    if (!this.isPaused()) {
+      maybeReadMore(this, this._readableState);
+    }
   });
 }
 
@@ -870,8 +872,8 @@ Readable.prototype.on = function(ev, fn) {
   } else if (ev === 'readable') {
     if (!state.endEmitted && !state.readableListening) {
       state.readableListening = state.needReadable = true;
-      state.flowing = false;
+      this.pause();
       state.emittedReadable = false;
       debug('on readable', state.length, state.reading);
       if (state.length) {
         emitReadable(this);

@ronag, this is a bold move and I like it because it is a move in the right direction - having only one paused flag with uniform semantics, but I will have to modify this unit test and there will probably be user code that won't run after this change
https://github.com/nodejs/node/blob/master/test/parallel/test-stream-preprocess.js

@ronag
Copy link
Member

ronag commented Dec 2, 2020

What exactly fails with my proposal?

@mmomtchev
Copy link
Contributor Author

In that unit test, once you pause it because of the readable event, nothing will resume it for the readline interface
Currently it works because the stream is not really paused, it is simply not flowing

@mmomtchev
Copy link
Contributor Author

@ronag, I can't get test/parallel/test-stream-preprocess.js to work without changing it if I start messing with .pause()

fd: fd,
manualStart: false
});
setTimeout(() => assert(rs.bytesRead > 0), common.platformTimeout(10));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be a setImmediate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The read handler is already in a setImmediate - so this will run the assert before Node has a chance to start reading data.
I don't have any ideas on how to achieve this without a timer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the clarification. Maybe add this in a comment?

fd: fd,
manualStart: true
});
setTimeout(() => assert(rs.bytesRead === 0), common.platformTimeout(10));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

his should be a setImmediate

const assert = require('assert');
const fs = require('fs');

fs.promises.open(__filename).then(common.mustCall((fd) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you using fs.promises? Could you please use fs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because a cook who doesn't eat its own food is not a good cook 😄

const fs = require('fs');

fs.promises.open(__filename).then(common.mustCall((fd) => {
const rs = new fs.ReadStream(null, {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test fs.ReadStream, not stream.Readable. While this test is correct, can you please use one that just use stream.Readable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Readable always starts in paused mode, this change affects only a ReadStream

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Readable always starts in paused mode

How is that relevant? It behaves the same as ReadStream. The same test should apply.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Readable always starts in paused mode, this change affects only a ReadStream

If that was true, you'd not need to modify Readable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe there is some biggus problem, how do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe there is some biggus problem, how do you think?

Likely, but it's possible to add an independent test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused what the problem here is? We just need change ReadStream to Readable. What is the "biggus" problem?

Copy link
Member

@ronag ronag left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see #36823

@mmomtchev
Copy link
Contributor Author

Included in #36823

@mmomtchev mmomtchev closed this Jan 13, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
stream Issues and PRs related to the stream subsystem.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants