[#691] Propagate destroy from consumers to sources #692

Open · wants to merge 3 commits into base: 2.x
Conversation

@richardscarrott commented Jan 12, 2020

@vqvu I know you've indicated that making this work is difficult but I think it's a really important feature for usage with Node streams, so thought I'd give it a stab to try and understand the issue better.

There are sure to be holes in my implementation as I'm very new to Highland, so I don't have a complete understanding of the library, and I've largely been considering my specific use case of piping to a Node ServerResponse.

Currently this is what I've done:

  1. When a highland Stream is destroyed, also destroy the underlying node Readable stream.

This solves this problem:

const { pipeline } = require('stream');
pipeline(_(nodeReadableStream), nodeWritableStream, (ex) => {
  console.log('Finished', ex); 
})
nodeWritableStream.destroy();
// `nodeReadableStream` is destroyed, allowing it to clean up any connections / stop filling up its buffer etc.
  2. Propagate consumer destroy to source.

This solves this problem:

const { pipeline } = require('stream');
pipeline(_(nodeReadableStream).map((chunk) => JSON.stringify(chunk)), nodeWritableStream, (ex) => {
  console.log('Finished', ex);
})
nodeWritableStream.destroy();
// `nodeReadableStream` is destroyed, allowing it to clean up any connections / stop filling up its buffer etc.

I think the above is where the difficulty you mentioned comes in due to fork & observe. I've currently taken the stance that observe never propagates destroy and fork always propagates destroy, although I think fork should probably actually only propagate destroy if all siblings have ended too, i.e. there are no more active consumers on the source.
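The stance above could be sketched as follows. This is a hypothetical model, not Highland's internals (none of these class names exist in the library): `observe()` never propagates destroy, while `fork()` propagates it to the source only once no active consumers remain.

```javascript
// Minimal model of the proposed semantics (hypothetical, not Highland code).
class FakeSource {
  constructor() { this.destroyed = false; }
  destroy() { this.destroyed = true; }
}

class ForkedStream {
  constructor(parent, propagates) {
    this.parent = parent;
    this.propagates = propagates;
    this.done = false;
  }
  destroy() {
    if (this.done) return;
    this.done = true;
    if (this.propagates) this.parent.consumerFinished();
  }
}

class SourceWrapper {
  constructor(source) {
    this.source = source;
    this.activeForks = 0;
  }
  fork() { this.activeForks += 1; return new ForkedStream(this, true); }
  observe() { return new ForkedStream(this, false); } // observers never propagate
  consumerFinished() {
    this.activeForks -= 1;
    if (this.activeForks === 0) this.source.destroy();
  }
}

const source = new FakeSource();
const wrapper = new SourceWrapper(source);
const f1 = wrapper.fork();
const f2 = wrapper.fork();
const obs = wrapper.observe();

obs.destroy();                  // observers never reach the source
f1.destroy();                   // f2 is still consuming
console.log(source.destroyed);  // false
f2.destroy();                   // last active fork gone -> propagate
console.log(source.destroyed);  // true
```

The reference count is what makes "fork only propagates destroy once all siblings have ended" work; without it, destroying one fork would kill the source out from under its siblings.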

Additionally, if this could be made to work -- I imagine it'd make sense for it to be opt in for cases where you do not want to destroy the readable, e.g.

_.destroyable(nodeReadableStream).map(() => { ... })
// or
_(nodeReadableStream).propagateDestroy().map(() => { ... })

Anyway, I understand if you're not interested as it sounds like you've been down this path before but happy to help out if you think this can be made to work.

@richardscarrott (Author) commented Jan 12, 2020

My changes have surfaced an issue, which also exists in current 2.x, whereby a stream tries to write after it's been ended, e.g.

const s = _(new Promise((r) => setTimeout(() => r('Hello'), 1000)));
s.each((a) => {
    console.log(a);
})

s.destroy();

Stream.prototype.destroy writes nil, but the Promise resolves afterwards causing the following error when trying to write 'Hello'.

Error: Cannot write to stream after nil
    at Stream.write (/Users/rich/projects/highland/node_modules/highland/lib/index.js:1644:15)
    at /Users/rich/projects/highland/node_modules/highland/lib/index.js:686:15
    at Immediate.<anonymous> (/Users/rich/projects/highland/node_modules/highland/lib/index.js:531:17)
    at processImmediate (internal/timers.js:439:21)

Not sure if there is a way to cancel pending requests on end, or if write would have to be more lenient; perhaps noop + warn instead of throw in this scenario?

Thinking about it...it seems like a similar problem whereby the destroy isn't propagated to the Promise in the above example, just as it wasn't propagated to the Readable stream before my change in this PR -- I wonder if it would make sense to always let the source know when it's been destroyed so it can avoid pushing?
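That "let the source know" idea could look something like this. It's a sketch under assumed names (none of this is Highland's actual code): the source records that it was destroyed and silently drops late pushes instead of throwing "Cannot write to stream after nil".

```javascript
// Sketch of the suggestion above (assumed names, not Highland's actual code):
// writes after destroy become a noop + warn instead of a throw.
function guardedSource() {
  const buffer = [];
  let destroyed = false;
  return {
    write(x) {
      if (destroyed) {
        console.warn('write after destroy ignored:', x);
        return;
      }
      buffer.push(x);
    },
    destroy() { destroyed = true; },
    buffer,
  };
}

const s = guardedSource();
// A promise that resolves after the stream is destroyed, as in the
// example above:
Promise.resolve('Hello').then((v) => s.write(v));
s.destroy();
// When the promise settles, the value is dropped rather than raising
// "Cannot write to stream after nil".
```

The alternative, cancelling the pending promise request on end, isn't generally possible since native promises aren't cancellable, which makes the lenient-write approach the more practical of the two.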

@vqvu (Collaborator) commented Jan 12, 2020

The master branch has a partial implementation of destroy propagation if you'd like to look into it more. Search for onDestroy. The changes there should correctly handle the bug you see in #692 (comment).


I've currently taken the stance that observe never propagates destroy and fork always propagates destroy, although I think fork should probably actually only propagate destroy if all siblings have ended too, i.e. there are no more active consumers on the source.

I agree with you about observe.

With fork, your proposal works if we disallow "late" forks. That is, if we say, "You must fork all of your children up-front, and it is illegal to fork a stream after it or its children have been consumed from." The current API semantics don't disallow late forks, and the question to ask is whether or not it is reasonable to make this breaking change. Would someone ever want to change the number of forks depending on the value that the fork emits? Perhaps the solution here is an additional API that allows the user to turn this automatic destroy behavior on and off (with some stickiness, so that once you turn it on, you can't turn it off again).
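The sticky opt-in could be modeled like this. All names here are hypothetical (none exist in Highland): once auto-destroy is opted into, forking after consumption has started becomes illegal.

```javascript
// Hypothetical model of the up-front-forks restriction (not Highland API).
class ForkPoint {
  constructor() {
    this.autoDestroy = false;
    this.consuming = false;
  }
  enableAutoDestroy() {
    this.autoDestroy = true; // sticky: deliberately no way to unset it
    return this;
  }
  fork() {
    if (this.autoDestroy && this.consuming) {
      throw new Error('late fork: all forks must be created up-front');
    }
    return {};
  }
  startConsuming() { this.consuming = true; }
}

const fp = new ForkPoint().enableAutoDestroy();
const a = fp.fork();   // fine: before consumption starts
fp.startConsuming();
let lateForkFailed = false;
try {
  fp.fork();           // illegal: consumption already started
} catch (e) {
  lateForkFailed = true;
}
console.log(lateForkFailed); // true
```

The stickiness matters because a fork created after auto-destroy had already fired would otherwise silently attach to a dead source.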

Off the top of my head, here is the other work that needs to be done:

  1. Audit all stream merge operators (i.e., the Higher-order Streams section) as well as all regular operators that don't use consume or createChild, and implement custom destroy propagation logic for them. This is very time-consuming: my guess is probably 20-40 hours of work for someone already familiar with the code base. Higher-order streams in particular are tricky, since you need to destroy the source stream and any streams that the source stream produced.
  2. Propagate destroy from the Readable proxy produced by toNodeStream. At least this is easy in 3.0.
  3. [Nice to Have] APIs to make it easy to implement new higher-order streams with the correct destroy propagation. This would require an understanding of what the common patterns are in correct implementations and how we could make those patterns easy to use.

All of this can be accomplished with the proper motivation and time. Unfortunately I no longer use Highland in my day-to-day work [1], so I have less motivation and far less time to make these kinds of sweeping changes to the library.

If you want to do this yourself, I'm happy to accept PRs against the master branch.

[1] This is not meant to be a comment on the suitability of the library for any particular purpose, merely a comment on the realities of my current job.

@richardscarrott (Author) commented
@vqvu thanks for the info -- I'd love to work on this but it's unlikely I'll be able to invest enough time right now unfortunately.

It's a shame 3.0 hasn't made it out the door but I totally appreciate your position; it looks like you've put a lot of work in already to get it this far!

I do think Highland is of huge value to the Node ecosystem and really enables Node streams to reach their full potential. I wonder whether the OpenJS Foundation or similar would be able to support a 3.0 release; have you investigated this at all?
