Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement flatTap #651

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open

Implement flatTap #651

wants to merge 8 commits into from

Conversation

tomwhale
Copy link

@tomwhale tomwhale commented Jun 14, 2018

From Issue #616

This method creates a short hand for the following:

stream.flatMap(item => save(item).map(_ => item))

where save returns a highland stream.

@tomwhale tomwhale mentioned this pull request Jul 4, 2018
lib/index.js Outdated
@@ -2167,6 +2167,28 @@ addMethod('doto', function (f) {
Stream.prototype.tap = Stream.prototype.doto;
_.tap = _.doto;

/**
* Applies a function to each value from the source, and re-emits the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you mention that

  1. The function must return a Highland stream.
  2. The source value will not be re-emitted until the stream that the function returns ends.

lib/index.js Outdated
* @param {Function} f - the function to apply
* @api public
*
* _([1, 2, 3]).flatTap(httpLog)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add an example for how httpLog would need to be implemented? Maybe using promises + setTimeout + console.log to illustrates the fact that the stream will wait until the stream completes before it continues..

lib/index.js Outdated
addMethod('flatTap', function (f) {
return this.flatMap(function (x) {
return f(x)
.flatMap(function () {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know I mentioned that flatTap could be implemented with flatMap like this, but a complete implementation should account for the fact that the stream f(x) may emit zero values or more than one values. You're better off implementing with consume and looking for the _.nil sentinel instead.

test/test.js Outdated
@@ -4909,6 +4909,86 @@ exports['tap - doto alias'] = function (test) {
test.done();
};

exports.flatTap = function (test) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you implement the tests using the object syntax (see the tests for pull as an example). This will group them together into a suite that makes it easier to run separately.

test/test.js Outdated
var seen;
function record(x) {
var y = x * 2;
seen.push(y);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seen.push(y) should be done asynchronously. You want to verify that the stream actually waits until the completion of the async action before continuing.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see you do this in the GeneratorStream and ArrayStream tests. You can just remove this test then, since it doesn't seem to add any value.

test/test.js Outdated
var f = function (x) {
return _(function (push, next) {
setTimeout(function () {
push(null, x * 2);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you verify that this code executes before toArray completes? The seen.push(y) approach that you use above is fine.

Also do this for the GeneratorStream test.

test/test.js Show resolved Hide resolved
@vqvu
Copy link
Collaborator

vqvu commented Jul 4, 2018

Hi @tomwhale,

Sorry for dropping the ball on this. I made some comments when you first sent this PR, but got confused by Github's interface and never ended up posting them.

@tomwhale
Copy link
Author

Sorry, had a chance to pick this up again. I think I have resolved all your comments. If I have misunderstood what you meant, lemme know!

Copy link
Collaborator

@vqvu vqvu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good. I just have a comment about error handling.

lib/index.js Outdated
* @param {Function} f - the function to apply
* @api public
*
* const httpLog = H(new Promnise(res => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use _ instead of H for the Highland object.

Suggested change
* const httpLog = H(new Promnise(res => {
* const httpLog = _(new Promnise(res => {

lib/index.js Outdated
return this.flatMap(function (x) {
return f(x).consume(function (err, y, push, next) {
if (err) {
// next();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should push the err in this case and continue on with the stream. That's what most transforms do. The user can use stopOnError if they want to stop the stream early.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, this is an interesting one though, as it is a tap you could argue the error should be silenced? I think I do agree with you though.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's definitely a lot of uses of tap where you want to ignore errors. I just don't like unilaterally silencing errors in library functions. That's something that developer using the library should decide.

Also, your fix isn't entirely correct. It pushes nil immediately after an error. In general, Highland streams don't end after the first error, so you need to call next instead of push(null, _.nil).

s.pull(valueEquals(test, 3));
test.done();
},
'flatTap - argument function throws': function (test) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a similar test to this, but for the stream (i.e., the return value of the argument function) emitting an error instead.

Something like this,

_([1, 2, 3, 4]).flatTap((x) => _((push) => {
    push(err);
    push(null, _.nil);
}));

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants