I have a question about back-pressure and drain event handler #2426

Closed
jooohyun opened this issue Jan 19, 2020 · 4 comments

Comments

@jooohyun

  • Node.js Version: 12.14.0
  • OS: macOS
  • Scope (install, code, runtime, meta, other?):
  • Module (and version) (if relevant): stream module, 12.14.0

I am currently studying Node.js back-pressure on my own.

I am deliberately not using .pipe() or stream.pipeline(), because I want to understand back-pressure and the drain event.

However, I don't know how to write an appropriate drain handler.

Consider the code below.

"use strict";

const rs = getReadableStreamSomehow();
const ws = getWritableStreamSomehow();

rs.on("data", function handler(data) {
  if (!ws.write(data)) {
    ws.once("drain", handler);
  }
});

The code above seems to have a problem, because I get a memory leak warning in the console:

(node:29788) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 drain listeners added to [WriteStream]. Use emitter.setMaxListeners() to increase limit

Does anyone know how to write a drain event handler correctly?
Thank you.

@renatomariscal

You are trying to write a pipe from a reader to a writer, so it is important to understand how to deal with back-pressure on both sides.

On the writer side: write() returns false as a way of saying "please stop sending more data", and the drain event signals that the writer has flushed its buffer and can take more data.
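As a small illustration of that contract, here is a sketch with a hypothetical slow writable. The `makeSlowWritable` helper and its 4-byte `highWaterMark` are assumptions chosen for the demo, not part of the original question. It shows `write()` returning `false` once the buffered bytes reach `highWaterMark`, and a `drain` listener firing after the buffer empties.

```javascript
const { Writable } = require("stream");

// Hypothetical slow sink: a tiny 4-byte buffer, and each write completes
// on a later turn of the event loop.
function makeSlowWritable() {
  return new Writable({
    highWaterMark: 4,
    write(chunk, encoding, callback) {
      setImmediate(callback);
    },
  });
}

const ws = makeSlowWritable();

const first = ws.write("ab"); // 2 bytes buffered, still below highWaterMark
const second = ws.write("cd"); // 4 bytes buffered, highWaterMark reached

console.log(first, second);

// "drain" fires once the buffered data has been flushed.
ws.once("drain", () => {
  console.log("drained, safe to write again");
});
```

Note that a `false` return value is only advice: the chunk has still been accepted and buffered.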

On the reader side: a readable has "paused" and "flowing" modes. The moment you attach rs.on("data", handler), it switches from paused to flowing, and it keeps calling handler for as long as it is flowing. You can call rs.pause() to pause and rs.resume() to resume.
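The mode switches can be observed through the `readableFlowing` property. The following is a minimal sketch, assuming an in-memory source built with `Readable.from` (a stand-in for `getReadableStreamSomehow()`, which is not shown in the thread).

```javascript
const { Readable } = require("stream");

// Hypothetical in-memory source standing in for the real readable.
const rs = Readable.from(["a", "b", "c"]);

const beforeListener = rs.readableFlowing; // null: no consumer attached yet

rs.on("data", () => {}); // attaching a "data" listener starts the flow
const afterListener = rs.readableFlowing; // true

rs.pause();
const afterPause = rs.readableFlowing; // false

rs.resume();
const afterResume = rs.readableFlowing; // true again

console.log(beforeListener, afterListener, afterPause, afterResume);
```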

Back to your code:

  if (!ws.write(data)) {
    ws.once("drain", handler);
  }

When the writer asks you to stop sending data (write() returns false), you schedule a handler to run once on drain. That does not make the reader stop.
Your reader is faster than your writer, so before drain fires you keep receiving more data and more false results, and each one adds another listener on 'drain'.
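The pile-up can be reproduced without a real reader. The sketch below is an illustration under assumptions: the writable is a hypothetical sink that never completes a write, and the loop stands in for five "data" events arriving before any drain. It counts the listeners that the original handler leaves behind.

```javascript
const { Writable } = require("stream");

// Hypothetical sink that never finishes a write, so "drain" never fires
// during this demo.
const ws = new Writable({
  highWaterMark: 1,
  write(chunk, encoding, callback) {
    // Intentionally never calls callback.
  },
});

// Same shape as the handler from the question.
function handler(data) {
  if (!ws.write(data)) {
    ws.once("drain", handler);
  }
}

// Simulate five "data" events arriving before the writer drains.
for (let i = 0; i < 5; i++) {
  handler("x");
}

const pendingDrainListeners = ws.listenerCount("drain");
console.log(pendingDrainListeners);
```

There is a second problem worth noting: 'drain' is emitted without arguments, so when those listeners do run, handler is called with data set to undefined.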

The correct approach is to stop the reader:

if (!ws.write(data)) {
  rs.pause();
}

And resume on drain:

ws.once("drain", function resume() {
  rs.resume();
});

With both changes, your code would be closer to correct:

const rs = getReadableStreamSomehow();
const ws = getWritableStreamSomehow();

rs.on("data", function handler(data) {
  if (!ws.write(data)) {
    rs.pause();
    ws.once("drain", function resume() {
      rs.resume();
    });
  }
});

That may be sufficient in terms of back-pressure. Getting the error handlers right is a bigger challenge.
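To give a rough idea of what the remaining work looks like, here is a sketch that adds end-of-stream and basic error handling around the same pause/resume logic. The helper name `copyWithBackpressure` and the in-memory streams are hypothetical, and this is not a complete replacement for `stream.pipeline()`, which also destroys the streams on failure.

```javascript
const { Readable, Writable } = require("stream");

// Hypothetical helper: copies rs into ws while honouring back-pressure.
// Resolves when ws has finished, rejects on the first error from either side.
function copyWithBackpressure(rs, ws) {
  return new Promise((resolve, reject) => {
    rs.on("data", (chunk) => {
      if (!ws.write(chunk)) {
        rs.pause(); // stop reading until the writer catches up
        ws.once("drain", () => rs.resume());
      }
    });
    rs.on("end", () => ws.end()); // no more data: close the writer
    ws.on("finish", resolve);
    rs.on("error", reject);
    ws.on("error", reject);
  });
}

// Usage with hypothetical in-memory streams.
const received = [];
const source = Readable.from(["a", "b", "c"]);
const sink = new Writable({
  highWaterMark: 1, // tiny buffer so every write triggers back-pressure
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setImmediate(callback);
  },
});

const done = copyWithBackpressure(source, sink);
done.then(() => console.log(received.join("")));
```

Because the reader is paused before the drain listener is registered, at most one drain listener is pending at any time.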

@jooohyun
Author

Thank you, @renatomariscal. I applied your feedback and confirmed that the memory leak warning no longer appears.

I have one more question. Could you give me your opinion on the following code snippet?

const rs = getReadableStreamSomehow();
const ws = getWritableStreamSomehow();

rs.on("data", function handler(data) {
  if (!ws.write(data)) {
    ws.removeAllListeners("drain");
    ws.once("drain", handler);
  }
});

When I write it like this, there is also no memory leak warning. Is this a correct way to do it?

@renatomariscal

It is not correct. Node.js can no longer detect the leak because you are resetting the listeners, but since you are not pausing the reader, you still accumulate data in memory and ignore the request to pause.

In its current state, your code is, in the best case, equivalent to:

const rs = getReadableStreamSomehow();
const ws = getWritableStreamSomehow();

rs.on("data", function handler(data) {
  ws.write(data);
});
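The effect of ignoring the return value can be made visible with `writableLength`, which reports how many bytes are queued in the writer. This is a minimal sketch under assumptions: the writable is a hypothetical sink that never completes a write, and the loop stands in for a fast reader delivering 100 chunks.

```javascript
const { Writable } = require("stream");

// Hypothetical stalled sink with a 16-byte buffer limit.
const ws = new Writable({
  highWaterMark: 16,
  write(chunk, encoding, callback) {
    // Intentionally never calls callback, so nothing is ever flushed.
  },
});

let refusals = 0;
for (let i = 0; i < 100; i++) {
  // The return value is counted here but otherwise ignored, as in the
  // snippet above.
  if (!ws.write(Buffer.alloc(16))) {
    refusals++;
  }
}

const buffered = ws.writableLength; // bytes queued inside the writer
console.log(refusals, buffered, ws.writableHighWaterMark);
```

The writer keeps accepting every chunk, so the queue grows far past highWaterMark. That limit is a threshold for signalling back-pressure, not a hard cap on memory.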

@jooohyun
Author

@renatomariscal Thank you. I've learned a lot.

I will close this issue.
