Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure _.through Propogates Node Stream Errors #240

Merged
merged 1 commit into from
Mar 5, 2015

Conversation

svozza
Copy link
Collaborator

@svozza svozza commented Feb 27, 2015

I've recently been using Dominic Tarr's JSONStream and found it very puzzling why errors in _.through were not being caught by my errors function:

_(req).through(JSONStream.parse()).errors(handleInvalidJsonError)

The reason I used through was because I wanted to 'Highlandify' the only non-Highland stream in my chain and I suspect that's what many others would think too. After a bit of digging around in the code and the issues I found that there's a fix for this planned but it's part of the wider refactorings planned for 3.0 (#191). I'm just adding a note to the documentation to warn people about this behaviour in case they get tripped up by it like I did. There's a fairly simple work around, which is to wrap the whole pipe expression in another Highland stream:

_(_(req).pipe(JSONStream.parse())).errors(handleInvalidJsonError)

But it strikes me, would it not be an acceptable interim fix to just do this in the through method rather than wait until 3.0 drops?

Stream.prototype.through = function (target) {
    if (_.isFunction(target)) {
        return target(this);
    }
    else {
        var output = _();
        target.pause();
        _(this.pipe(target)).pipe(output);
        return output;
    }
};

Or is there something I'm not considering?

@vqvu
Copy link
Collaborator

vqvu commented Feb 27, 2015

Does your proposed fix work? It's still using pipe, so errors will still not get propagated to the output stream.

@LewisJEllis
Copy link
Collaborator

I don't have time to dig back into this at the moment, but it's definitely something we want to do; relevant PR: #166

@vqvu
Copy link
Collaborator

vqvu commented Feb 27, 2015

See my comment in #136 about this.

@svozza
Copy link
Collaborator Author

svozza commented Feb 27, 2015

Actually, you're right my fix won't work, is there even a need for the second pipe if we just wrap the whole thing in a Highland stream?

Stream.prototype.through = function (target) {
    if (_.isFunction(target)) {
        return target(this);
    }
    else {
        target.pause();
        return _(this.pipe(target));
    }
};

@vqvu
Copy link
Collaborator

vqvu commented Feb 27, 2015

Hmm, good point. This would wrap errors that target emits. There's still the issue of errors emitted by this.

For a proper fix, I think we need to be more careful about laziness. As it is, the source stream will be completely drained (and buffered downstream) as soon as the through is executed.

@vqvu
Copy link
Collaborator

vqvu commented Feb 27, 2015

Actually, I take that back about the laziness. The readable stream constructor uses pipe, which should respect back pressure, and not the data event like I thought it did.

@svozza
Copy link
Collaborator Author

svozza commented Feb 27, 2015

And an error in this brings us back to where we started because it goes straight into pipe, which then throws the error. Short of calling stopOnError or doing something even more egregious involving consume and otherwise I can't see a way around that with the current behaviour of pipe. So I think your comment in #136 still stands, we'll have to implement through without pipe.

@svozza
Copy link
Collaborator Author

svozza commented Feb 27, 2015

So I tried a naive implementation just based on _.pipeline:

Stream.prototype.through = function (target) {
   var self = this,
        dest = _(target),
        wrapper;

    if (_.isFunction(target)) {
        return target(this);
    }
    else {
        wrapper = _(function (push, next) {
            self.pull(function (err, x) {
                if (err) {
                    wrapper._send(err);
                    next();
                }
                else if (x === nil) {
                    wrapper._send(null, nil);
                }
                else {
                    wrapper._send(null, x);
                    next();
                }
            });
        });
        wrapper.write = function (x) {
            dest.write(x);
        };
        return wrapper;
    }
};

This catches any errors from this but passes the data through to dest untouched:

_(fs.createReadStream('test.json')).through(JSONStream.parse())
    .stopOnError(_.log)
    .toArray(_.log);
//=> [ <Buffer 7b 7a 22 61 22 3a 31 2c 22 62 22 3a 32 7d 0a> ]
// should give error because the JSON is invalid

I also tried using consume like in _.pipe but got the exact same result:

if (_.isFunction(target)) {
        return target(this);
    }
    else {
       //resuming/handling drain etc left out for clarity
        var s = self.consume(function (err, x, push, next) {
            if(err) {
                push(err);
            }
            else if (x === nil) {
                    dest.end();
                }
            }
            else if (dest.write(x) !== false) {
                next();
            }
        });
}
return s;

_(fs.createReadStream('test.json')).through2(JSONStream.parse())
    .stopOnError(_.log)
    .toArray(_.log);
//=> [ <Buffer 7b 7a 22 61 22 3a 31 2c 22 62 22 3a 32 7d 0a> ]
// should give error because the JSON is invalid

Tbh, I'm not really sure what I'm doing here so I could be missing something really obvious.

@vqvu
Copy link
Collaborator

vqvu commented Feb 27, 2015

The first one doesn't work because the stream constructor treats node
streams as purely readable, so your writes won't actually pass through
target if you write to dest.

The second one is probably closer, but without the rest of that code, I
can't tell what's wrong with it. Writing directly to dest is still
probably not the right thing to do.

What about using pipe but also bind an error handler to this that
forwards the error? That should cause pipe not to throw. You'll have to set
up the hander before you call pipe, so you'll likely have to inline the
node stream constructor code.

@svozza
Copy link
Collaborator Author

svozza commented Feb 28, 2015

Yeah, that's much simpler and works now for all cases.

else {
        target.pause();
        var s = _(this.on('error', function(err){
            s._send(err);
        }).pipe(target));

        return s;
    }

Just for my own information, why do we pause the target stream there? I know it's important because the tests fail if we don't but I'd like understand why.

@svozza
Copy link
Collaborator Author

svozza commented Feb 28, 2015

Actually, this won't work for all cases because if the first item in the stream is an Error then s will be undefined.

@vqvu
Copy link
Collaborator

vqvu commented Feb 28, 2015

Yeah, that's what I meant by "you have to inline the constructor". If you
look at the constructor code for the node stream branch, you'll see it
creates s first, then bind the error handlers, then does the pipe.

You'll need to do the same. Create the stream, bind the two error handlers,
then do the two pipes.
On Feb 28, 2015 5:28 AM, "Stefano Vozza" notifications@github.com wrote:

Actually, this won't work for all cases because if the first item in the
stream is an Error then s will be undefined.


Reply to this email directly or view it on GitHub
#240 (comment).

@svozza
Copy link
Collaborator Author

svozza commented Mar 1, 2015

Ah right, I'd no idea what you meant by node stream branch until I realised you were talking about the 3.0 code base. Binding the two error handler works well:

else {
        target.pause();
        var s = new Stream();
        this.on('error', function (xs, err) {
            s.write(new StreamError(err));
        });
        target.on('error', function (xs, err) {
            s.write(new StreamError(err));
        });
        return this.pipe(target).pipe(s);
    }

It passes for my JSONStream code and with a generator function that creates errors but I'm having trouble with the through module used in the unit tests. Something simple like this still causes an unhandled error:

var parser = through(
    function (data) {
        this.queue(JSON.parse(data));
    },
    function () {
        this.queue(null);
    }
);

var s = _.through(parser, ['zz{"a": 1}']);
s.errors(function (err) {
    console.log(err);
}).toArray(function (xs) {
    console.log(xs);
});

I can't understand why JSONStream works but not through as they both throw errors.

@svozza svozza changed the title add note for _.through docs on node stream errors Ensure _.through Propogates Node Stream Errors Mar 1, 2015
@vqvu
Copy link
Collaborator

vqvu commented Mar 1, 2015

I think there's some equivalent code on 2.x too, some where in the Steam
constructor.

Does through automatically catch exceptions in the data generator
function and emit it as an error? (I've never used it). If not, then that's
the reason why: JSON.parse threw and there was nothing to catch it. If it
does, then I'm not sure; I'll have to look at it when I have more than my
phone with me.
On Feb 28, 2015 4:02 PM, "Stefano Vozza" notifications@github.com wrote:

Ah right, I'd no idea what you meant by node stream branch until I
realised you were talking about the 3.0 code base. Binding the two error
handler works well:

else {
target.pause();
var s = new Stream();
this.on('error', function (xs, err) {
s.write(new StreamError(err));
});
target.on('error', function (xs, err) {
s.write(new StreamError(err));
});
return this.pipe(target).pipe(s);
}

It passes for my JSONStream code and with a generator function that
creates errors but I'm having trouble with the through module used in the
unit tests. Something simple like this still causes an unhandled error:

var parser = through(
function (data) {
this.queue(JSON.parse(data));
},
function () {
this.queue(null);
}
);
var s = _.through(parser, ['zz{"a": 1}']);
s.errors(function (err) {
console.log(err);
}).toArray(function (xs) {
console.log(xs);
});

I can't understand why JSONStream works but not through.


Reply to this email directly or view it on GitHub
#240 (comment).

@svozza
Copy link
Collaborator Author

svozza commented Mar 1, 2015

Of course, I should be doing something like this in the creation of the through module stream:

var parser = through(
    function (data) {
        try {
            this.queue(JSON.parse(data));
        }
        catch (err) {
            this.emit('error', err);
        }
    },
    function () {
        this.queue(null);
    }
);

Like yourself, I'm not familiar with through but I wanted to use what was already there rather than add another devDependency.

*/

Stream.prototype.through = function (target) {
var s, writeErr = _.curry(function (xs, err) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to do this convoluted curry thing. You already have a closure with the variable s. Just define a

function writeErr(err) {
    s.write(new StreamError(err));
}

@svozza svozza force-pushed the through-error-documentation branch 3 times, most recently from 95f4220 to 24d2756 Compare March 3, 2015 10:00
@svozza
Copy link
Collaborator Author

svozza commented Mar 3, 2015

All done.

@vqvu
Copy link
Collaborator

vqvu commented Mar 4, 2015

Also, can you change the commit message to something that describes the the commit does now? It no longer just adds a note.

@vqvu
Copy link
Collaborator

vqvu commented Mar 4, 2015

Do you think we should check if target is already a Highland stream and skip the second pipe if it is? This saves an unnecessary step in the pipeline, but is something of a micro optimization.

@svozza
Copy link
Collaborator Author

svozza commented Mar 4, 2015

I actually tried to change to commit message a couple of times but for some reason it wouldn't work. I think it's possible to change the message when merging the pull request so maybe we can do that? Not sure if there's a huge benefit in doing the check for the second pipe. Maybe if it becomes a bottleneck in the future...

@vqvu
Copy link
Collaborator

vqvu commented Mar 4, 2015

Gotcha about the check.

You can't do git commit --amend?

It's possible to change the message for the merge commit that's created, but the message for the original commit is still there.

add note for _.through docs on node stream errors

add error handlers to both pipe functions in through and tidy up tests

fix typo

fix docs and amend writeErr function

use more descriptive name for output stream

add note for _.through docs on node stream errors

add error handlers to both pipe functions in through and tidy up tests

fix typo

fix docs and amend writeErr function

use more descriptive name for output stream
@svozza svozza force-pushed the through-error-documentation branch from ee78ad1 to 548b808 Compare March 5, 2015 13:25
@svozza
Copy link
Collaborator Author

svozza commented Mar 5, 2015

Yeah, I tried git commit --amend and all it does is let me edit the body of the commit which lists all the squashed commits. So I managed to do it with git rebase instead.

@vqvu
Copy link
Collaborator

vqvu commented Mar 5, 2015

Hmm dunno what you did, but it seems to have fixed it.

vqvu added a commit that referenced this pull request Mar 5, 2015
Ensure _.through propogates node stream errors. Resolves #136.
@vqvu vqvu merged commit 48bccf9 into caolan:master Mar 5, 2015
@svozza svozza mentioned this pull request Apr 9, 2015
@vqvu vqvu added this to the v2.5.0 milestone Apr 17, 2015
@LewisJEllis LewisJEllis mentioned this pull request Jul 28, 2015
34 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants