Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modifies pipe to pass along highland errors #166

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,9 @@ Stream.prototype.end = function () {
* automatically managing flow so that the destination is not overwhelmed
* by a fast source.
*
* If the destination is a Highland Stream, any errors from the source will be
* passed along to it. Otherwise, errors are emitted and must be handled.
*
* This function returns the destination so you can chain together pipe calls.
*
* @id pipe
Expand All @@ -780,15 +783,18 @@ Stream.prototype.pipe = function (dest) {

var s = self.consume(function (err, x, push, next) {
if (err) {
self.emit('error', err);
return;
}
if (x === nil) {
if (_.isStream(dest)) {
dest.write(new StreamError(err));
next();
} else {
self.emit('error', err);
return;
}
} else if (x === nil) {
if (canClose) {
dest.end();
}
}
else if (dest.write(x) !== false) {
} else if (dest.write(x) !== false) {
next();
}
});
Expand Down
49 changes: 49 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,55 @@ exports['pipe highland stream to node stream'] = function (test) {
src.pipe(dest);
};

exports['pipe highland stream to highland stream'] = function (test) {
var src = _([1,2,3]);
var dest = _();
src.pipe(dest).toArray(function (xs) {
test.same(xs, [1,2,3]);
test.done();
});
};

exports['pipe error to highland stream'] = function (test) {
var src = _([1,2,3]).map(function (x) {
if (x === 2) {
throw new Error('pipe error');
} else {
return x;
}
});
var dest = _();
src.pipe(dest).errors(function (err) {
test.equals(err.message, 'pipe error');
}).toArray(function (xs) {
test.same(xs, [1,3]);
test.done();
});
};

exports['pipe error to node stream'] = function (test) {
test.expect(2);
var src = _([1,2,3]).map(function (x) {
if (x === 2) {
throw new Error('pipe error');
} else {
return x;
}
});

var dest = new EventEmitter();
dest.writable = true;
dest.write = function (x) {
test.equals(x, 1);
return true;
};

test.throws(function () {
src.pipe(dest);
}, /pipe error/);
test.done();
};

exports['pipe to node stream with backpressure'] = function (test) {
test.expect(3);
var src = _([1,2,3,4]);
Expand Down