Clearer way of consuming a stream and stopping on errors #484

Closed
0chroma opened this issue Apr 19, 2016 · 6 comments

@0chroma
Contributor

0chroma commented Apr 19, 2016

This is an issue I've been running into while using Highland. A lot of the time I want to call a callback with the result of a Highland stream once it's done, and that's a little awkward in some cases. I'd love to contribute to the library to make this easier, though!

Highland usually works great if you're reducing a stream down to a single value and then calling .pull(callback) to return either an error or the value. The case I mostly run into is when I'm using Highland purely for control flow, like iterating over a database and upserting each document as I go, i.e. .map(_.wrapCallback(doUpsert)).series().

A lot of the time I just want to call a callback either with an error or when the stream ends. You can do that, but it isn't the nicest thing to look at. My current solution is something like .filter(item => item == _.nil).pull(callback), which isn't super intuitive. I also made the mistake of doing something like .stopOnError(callback).done(callback), which doesn't work because the callback gets called twice when there is an error. I think this is something that could use some work or documentation.

I think this sort of pattern is also hard to figure out when it comes to just returning an array of results. Intuitively, people might reach for toArray(callback) when what they really want is something like .collect().pull(callback), since then the first error is passed through to the callback.

Is there a better way of doing this that I'm missing? If not I'd love to contribute documentation for this pattern, or write a function that handles this case a little better, depending on feedback.
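
One way to keep the .stopOnError(callback).done(callback) shape without the double invocation is to guard the callback so only its first call goes through. This is a minimal sketch of that guard in plain JavaScript. The name once is an assumption for illustration and is not part of Highland, and this is a different technique from the closure-based approach suggested later in the thread.

```javascript
// Hypothetical helper, not part of Highland: wraps a Node-style callback
// so that only the first invocation reaches it.
function once(callback) {
    var called = false;
    return function () {
        if (called) {
            // A later call (e.g. from done() after an error) is ignored.
            return;
        }
        called = true;
        return callback.apply(this, arguments);
    };
}

// Intended use with the chain described above (assumes `s` is a Highland
// stream and `callback` is a Node-style callback):
//
//     var cb = once(callback);
//     s.stopOnError(cb).done(cb);
```

On the error path the stopOnError handler is called with (err, push), so the guarded callback would also receive Highland's push function as a second argument. Callers that only look at the first argument are unaffected.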

@jonsadka

+1 to this

@vqvu
Collaborator

vqvu commented Apr 19, 2016

Yeah, I get what you're saying. The reason this is a bit weird (and why you're running into these almost-works-but-not-quite situations) is that Highland doesn't treat errors as a terminal value.

I believe the best way is to do something like

var error;
s.stopOnError(e => error = e)
    .done(() => callback(error));

But this requires creating a closure to hold error, so it really belongs in a separate function. I'd be happy to accept a PR that implements this.
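
As a rough sketch of what such a separate function might look like, the snippet above can be wrapped so the closure variable stays private. The name endOrError is hypothetical and is not the function that was eventually added to Highland. It only assumes the stream exposes stopOnError and done, as used above.

```javascript
// Hypothetical helper name; not part of Highland's API.
// Assumes `stream` exposes Highland's stopOnError(handler) and done(handler).
function endOrError(stream, callback) {
    var error; // stays undefined when the stream ends without an error

    stream
        .stopOnError(function (e) {
            // Remember the error; stopOnError then ends the stream.
            error = e;
        })
        .done(function () {
            // Called once, after the stream has been consumed.
            callback(error);
        });
}

// Intended use (assumes `s` is a Highland stream):
//
//     endOrError(s, function (err) {
//         if (err) { /* handle the first error */ }
//     });
```

The callback fires exactly once: with the first error if one occurred, otherwise with undefined.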

@ronag

ronag commented Apr 22, 2016

I would say the best way would be to:

s.collect().pull(callback)

@vqvu
Collaborator

vqvu commented Apr 22, 2016

The reason s.collect().pull(callback) isn't perfect is that it leaves the stream in a "live" state, where it can continue to emit values (as it hasn't emitted the _.nil sigil yet). That means it can't clean up any resources or state that it's holding on to. It also means the operator is not a consumption operator, since it doesn't fully consume the stream**. This is a general problem with using pull.

It's not really a problem in the 2.x world, since the engine doesn't do much resource cleanup, but it will be an issue in 3.x. I don't want to add new operators that will just have to be changed in the 3.x branch when it gets merged.

You're better off using stopOnError in conjunction with collect and pull for forward compatibility with the 3.x branch.

var result = s.collect()
    .stopOnError((e, push) => push(e));

result.pull((err, x) => {
    // Fully consume the stream.
    result.pull(() => {
        // Barring a bug, this should have immediately emitted a _.nil.
        callback(err, x);
    });
});

** I know toArray also uses pull in the same way. It will be fixed.

@vqvu
Collaborator

vqvu commented Apr 25, 2016

@Hflw, just to verify, does #488 implement what you need?

@0chroma
Contributor Author

0chroma commented Apr 25, 2016

Oh woah, that PR actually works really well for my use case. In my case in particular I'd have to add a compact() call to the end for cases where I'm not emitting any values and am using the library for pure flow control, i.e.:

var _ = require("highland");
var doWork = function(item, cb) {
  // Some kind of async work that only emits an error.
  var err = null; // placeholder: set to an Error if the work fails
  cb(err);
};
_([1,2,3,4]).map(_.wrapCallback(doWork)).series().compact().complete(_.log);

I think needing to add the compact() is fine though, since filtering your result set down to a bounded value has a lot of possible solutions depending on your situation, and I feel the end developer already has all the tools they need to do that.

4 participants