Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement flatTap #651

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2167,6 +2167,44 @@ addMethod('doto', function (f) {
Stream.prototype.tap = Stream.prototype.doto;
_.tap = _.doto;

/**
* Applies a function, which must return a (possibly empty) highland stream, to each value from the source, and re-emits the
* source value when the function return ends.
*
* @id flatTap
* @section Transforms
* @name Stream.flatTap(f)
* @param {Function} f - the function to apply
* @api public
*
* const httpLog = H(new Promnise(res => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use _ instead of H for the Highland object.

Suggested change
* const httpLog = H(new Promnise(res => {
* const httpLog = _(new Promnise(res => {

* setTimeout(() => {
* console.log('Log');
* res('Success')
* }, 3000)
* }))
*
* _([1, 2, 3]).flatTap(httpLog) // Will emit [1,2,3]
*/

addMethod('flatTap', function (f) {
return this.flatMap(function (x) {
return f(x).consume(function (err, y, push, next) {
if (err) {
// next();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should push the err in this case and continue on with the stream. That's what most transforms do. The user can use stopOnError if they want to stop the stream early.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, this is an interesting one though, as it is a tap you could argue the error should be silenced? I think I do agree with you though.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's definitely a lot of uses of tap where you want to ignore errors. I just don't like unilaterally silencing errors in library functions. That's something that developer using the library should decide.

Also, your fix isn't entirely correct. It pushes nil immediately after an error. In general, Highland streams don't end after the first error, so you need to call next instead of push(null, _.nil).

push(null, _.nil);
}
else if (y === _.nil) {
push(null, x);
push(null, _.nil);
}
else {
next();
}
});
});
});

/**
* Limits number of values through the stream to a maximum of number of values
* per window. Errors are not limited but allowed to pass through as soon as
Expand Down
96 changes: 96 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4909,6 +4909,102 @@ exports['tap - doto alias'] = function (test) {
test.done();
};


exports.flatTap = {
'flatTap - noValueOnError': noValueOnErrorTest(_.flatTap(function (x) { return _(); })),
'flatTap - returnsSameStream': returnsSameStreamTest(function (s) {
return s.flatTap(function (x) { return _([2]); });
}, [1], [1]),
'flatTap - On a empty stream': function (test) {
test.expect(1);
var s = _([]).flatTap(function (x) {
return _([6]);
});

s.pull(valueEquals(test, _.nil));
test.done();
},
'flatTap - Returns an Empty stream': function (test) {
test.expect(1);
var s = _([3]).flatTap(function (x) {
return _([]);
});

s.pull(valueEquals(test, 3));
test.done();
},
'flatTap - Emits an multiple values': function (test) {
test.expect(1);
var s = _([3]).flatTap(function (x) {
return _([5, 6, 7, 8]);
});

s.pull(valueEquals(test, 3));
test.done();
},
'flatTap - argument function throws': function (test) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a similar test to this, but for the stream (i.e., the return value of the argument function) emitting an error instead.

Something like this,

_([1, 2, 3, 4]).flatTap((x) => _((push) => {
    push(err);
    push(null, _.nil);
}));

test.expect(4);
var err = new Error('error');
var s = _([1, 2, 3, 4]).flatTap(function (x) {
if (x === 1) { throw err; }
if (x === 2) { _(['hello']); }
if (x === 3) { throw err; }
return _(['world']);
});

s.pull(errorEquals(test, 'error'));
s.pull(valueEquals(test, 2));
s.pull(anyError(test));
s.pull(valueEquals(test, 4));
test.done();
},
'flatTap - ArrayStream': function (test) {
var seen = [];
var f = function (x) {
return _(function (push, next) {
setTimeout(function () {
var y = x * 2;
seen.push(y);
push(null, y);
push(null, _.nil);
}, 10);
});
};
_([1, 2, 3, 4]).flatTap(f).toArray(function (xs) {
test.same(xs, [1, 2, 3, 4]);
test.same(seen, [2, 4, 6, 8]);
test.done();
});
},
'flatTap - GeneratorStream': function (test) {
var seen = [];
var f = function (x) {
return _(function (push, next) {
setTimeout(function () {
var y = x * 2;
seen.push(y);
push(null, y);
push(null, _.nil);
}, 10);
});
};
var s = _(function (push, next) {
push(null, 1);
setTimeout(function () {
push(null, 2);
push(null, 3);
push(null, 4);
push(null, _.nil);
}, 10);
});
s.flatTap(f).toArray(function (xs) {
test.same(xs, [1, 2, 3, 4]);
test.same(seen, [2, 4, 6, 8]);
test.done();
});
}
};

tomwhale marked this conversation as resolved.
Show resolved Hide resolved
exports.flatMap = function (test) {
var f = function (x) {
return _(function (push, next) {
Expand Down