Otherwise not picking up empty stream #570
Comments
I can't think of any reason why this wouldn't work. However, the implementation of Can you provide a test case that I can run that causes this behavior? I can't tell you much more without one. Alternatively, the workaround is to implement
function getBuildAction (stream) {
  return stream
    .flatMap(state => {
      if (state.lastAssetVersion !== state.currentAssetVersion) {
        return runGitDiff(state.sourceDir);
      } else {
        return _.of(state);
      }
    });
}
|
Hah the solution I've put in place of the I've been trying to isolate it in a test case but unfortunately it's a little challenging to track down. Below is the closest I've been able to reproduce the error where none of the
let _ = require('highland');
let assert = require('assert');

function getBuildAction (stream) {
  let stateStream = stream.observe();
  return stream
    // Only process when we know there is a newer version of the source
    .filter((state) => state.lastAssetVersion !== state.currentAssetVersion)
    // Diff the repo against the target branch & emit diff files
    .flatMap((state) => _.of({ action: 'build' }))
    // Flip back to original state stream if diff was not required
    .otherwise(stateStream);
}

it('Should return a non-buildable state if not new source', (done) => {
  let state = {
    action: null,
    lastAssetVersion: 'abcdefgh',
    currentAssetVersion: 'abcdefgh'
  };
  let stream = _.of(state).observe();
  return stream
    .through(getBuildAction)
    .toCallback((err, buildState) => {
      assert.equal(buildState.action, null, 'Build action should be unmodified and left as null.');
      done(err, buildState);
    });
});
|
The reason why your test case doesn't execute properly is because you only If you change your code to this, it should work.
let stream = _.of(state);
stream.observe()
  .through(getBuildAction)
  .toCallback((err, buildState) => {
    assert.equal(buildState.action, null, 'Build action should be unmodified and left as null.');
    done(err, buildState);
  });
stream.resume();
This is probably not the root cause of your problem, right?
|
Oh, my mistake. Yeah, unfortunately the reported issue is not so simple: it fires that first filter operation and then stops. My hunch is that, since this function works fine in the unit test, it has more to do with the state of the stream coming into it. |
Hmm no luck so far but I do have a couple of questions: 1.) When are 2.) What's the best way to debug the state of the stream? Just add console statements to |
You'll want to reduce the test case as much as possible first. The stream state being printed out can get verbose very quickly as your pipeline becomes more complicated. Try starting with a failing pipeline and removing transforms until it stops breaking. For example, is |
Hey again, it's been a while. Today I was updating one of the util functions this project uses and while writing unit tests for it I stumbled upon the cause of this issue and confirmed it with a separate, isolated test. In short before the aforementioned through function I was calling
describe('consume', () => {
  it('Should fail if next is not called', (done) => {
    let preFilterSpy = expect.createSpy();
    let endSpy = expect.createSpy();
    _([ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ])
      .consume((err, x, push) => {
        push(err, x);
      })
      .tap(preFilterSpy)
      .filter(x => x === 'h')
      .otherwise(_.of('h'))
      .each(x => {
        expect(x).toBe('h');
      })
      .done(endSpy);
    expect(preFilterSpy).toHaveBeenCalled();
    expect(preFilterSpy.calls.length).toBe(2);
    expect(endSpy).toNotHaveBeenCalled();
    done();
  });
  it('Should succeed if next is called', (done) => {
    let preFilterSpy = expect.createSpy();
    let endSpy = expect.createSpy();
    _([ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ])
      .consume((err, x, push, next) => {
        push(err, x);
        if (x !== _.nil) next();
      })
      .tap(preFilterSpy)
      .filter(x => x === 'h')
      .otherwise(_.of('h'))
      .each(x => {
        expect(x).toBe('h');
      })
      .done(endSpy);
    expect(preFilterSpy).toHaveBeenCalled();
    expect(preFilterSpy.calls.length).toBe(7);
    expect(endSpy).toHaveBeenCalled();
    done();
  });
});
While I'm ecstatic this issue is resolved, I still have a couple of questions for you @vqvu if it's no trouble.
|
I'm glad you found the problem! And thanks for posting your repro case. It actually shows an unrelated bug in Highland. In the failure case, To answer your questions:
They're not all that different. For value generators, generally, you'll have code that looks like
For consume handlers, you have a very similar thing,
I can think of two reasons.
|
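For readers following along, the contract exercised by the two `consume` tests above can be sketched without Highland at all. This is only a toy model, not Highland's implementation: `pullEach` and `NIL` are hypothetical names made up for illustration, and the model assumes a handler is handed its next value only after it calls `next()`.

```javascript
// Toy model of a pull-based transform. This is NOT Highland's implementation;
// `pullEach` and `NIL` are hypothetical and only mimic the handler contract
// (err, x, push, next) discussed in this thread.
const NIL = Symbol('nil'); // stand-in for an end-of-stream marker

function pullEach(items, handler) {
  const out = [];
  let index = 0;
  let ended = false;

  // Error-first push: records values and notes when the end marker is pushed.
  const push = (err, x) => {
    if (x === NIL) ended = true;
    else if (!err) out.push(x);
  };

  // The handler receives another item only when it asks for one.
  const next = () => {
    const x = index < items.length ? items[index++] : NIL;
    handler(null, x, push, next);
  };

  next(); // the downstream consumer requests the first value
  return { out, ended };
}

// Handler that forgets to call next(): it stalls after the first value.
const stalled = pullEach(['a', 'b', 'c'], (err, x, push) => {
  push(err, x);
});

// Handler that calls next() for every value except the end marker.
const drained = pullEach(['a', 'b', 'c'], (err, x, push, next) => {
  push(err, x);
  if (x !== NIL) next();
});

console.log(stalled); // one value arrives and the end marker is never seen
console.log(drained); // every value arrives and the end marker is seen
```

In this model the stalled handler never forwards an end marker, which is one way to picture why a downstream step that waits for the end of the stream would never fire.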
FYI, I've released expect(preFilterSpy.calls.length).toBe(1); Let me know if the new release causes any issues. Edit: Fix typos. |
Cool, glad it uncovered something that may help others. Will give it a shot and report any issues that may come up. Thanks for the explanation of the So for reading from an array you would probably want to use next so that data is only sent down when requested by the consumers:
let data = ['one', 'two', 'three', 'four'];
_((push, next) => {
  push(data.shift());
  if (data.length) next();
  else push(null, _.nil);
});
But for running a one-and-done like
_((push) => {
  child_process.exec('ls', (err, stdout, stderr) => {
    if (err === null || err.code === 0) {
      push(null, stdout.toString('utf8'));
    }
    else if (err.code > 0) {
      push(stdout.toString('utf8') + '\n' + stderr.toString('utf8'));
    }
    else {
      push(err);
    }
    push(null, _.nil);
  });
});
|
Yes to both cases, though there's a typo in your array example. It should be using push(null, data.shift()) |
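The typo matters because `push` is error-first, like a Node callback. A minimal sketch of the difference, again as a toy model rather than Highland itself: `runGenerator`, `fromArray` and `NIL` are hypothetical helpers invented here, and the runner simply assumes that a truthy first argument to `push` is treated as an error.

```javascript
// Toy runner for a (push, next) generator. NOT Highland's implementation;
// `runGenerator`, `fromArray` and `NIL` are hypothetical illustration names.
const NIL = Symbol('nil'); // stand-in for an end-of-stream marker

function runGenerator(generator) {
  const values = [];
  const errors = [];
  let ended = false;

  // Error-first signature: push(err, value).
  const push = (err, value) => {
    if (err) errors.push(err);
    else if (value === NIL) ended = true;
    else values.push(value);
  };
  // Re-invoke the generator until it has pushed the end marker.
  const next = () => { if (!ended) generator(push, next); };

  next();
  return { values, errors, ended };
}

// Builds an array-backed generator; `emit` decides how each item is pushed.
function fromArray(source, emit) {
  const data = source.slice(); // copy so the caller's array is not mutated
  return (push, next) => {
    emit(push, data.shift());
    if (data.length) next();
    else push(null, NIL);
  };
}

// push(x): the item lands in the error slot.
const wrong = runGenerator(fromArray(['one', 'two'], (push, x) => push(x)));
// push(null, x): the item is emitted as a value.
const right = runGenerator(fromArray(['one', 'two'], (push, x) => push(null, x)));

console.log(wrong);
console.log(right);
```

With the single-argument form every item ends up in `errors`; with `push(null, x)` they end up in `values`, which is the behaviour the array example was after.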
In my application repo I have the following function. I stream an immutable object down the pipeline. It filters the object to see whether we need to run a git diff to decide between copying the last version's assets and running a full build. However, it appears that if filter runs and returns false, otherwise never fires. The stream appears to end, yet it works completely as expected in my unit test. Does any cause come to mind as to why the stream stops if that filter function returns false? I've put a console log statement in the otherwise stream-returning function but it does not fire. The code before it also runs from an observe like this: