Proposal Stream.buffer(highWaterMark) #655
Comments
This sounds reasonable, though I'd like to call it something like
|
I can go with batchUpTo. This whole thing depends on my correct understanding that for this pipeline
map would only request the next value from batchUpTo once mergeWithLimit finishes with the previous substream. |
Yes. That should be the case.
BTW, I suggest you use flatMap(...) instead of .map(...).mergeWithLimit(1).
They are equivalent, but flatMap is clearer, and potentially more efficient.
…On Thu, Oct 18, 2018 at 9:32 AM Stephen Dahl ***@***.***> wrote:
I can go with batchUpTo.
This whole thing depends on my correct understanding that for this pipeline
_(source)
.batchUpTo(50)
.tap(batch => _(somethingAsync))
.mergeWithLimit(1)
tap would only request the next value from batchUpTo once mergeWithLimit
finishes with the previous substream.
|
Thanks for the tip. This library has so many useful things that it is easy to lose track of what tools are available. |
The trick is to use The difference is that with
var batched = [];
var resumeSource;
var resumeDest;
var pushDest;
var waitingForBatch = false;
var source = this.consume(function (err, x, ignore, next) {
    resumeSource = next;
    if (err) {
        // pushDest the error
        // resumeDest and continue this stream
    } else if (x === nil) {
        // pushDest batched if not empty.
        // pushDest nil
    } else {
        batched.push(x);
        if (waitingForBatch) {
            waitingForBatch = false;
            // pushDest batch and clear it.
            // resumeDest
        }
        // Continue this stream if batched is not full.
        // This stream will implicitly pause if the batch *is* full
    }
});
resumeSource = source.resume.bind(source);
return source.createChild(function (push, next) {
    resumeSource();
    pushDest = push;
    resumeDest = next;
    if (batched.length > 0) {
        // push batch and clear it.
        // continue this stream.
    } else {
        // This part signifies that the source should push the next element as soon as it has it.
        waitingForBatch = true;
    }
});
You can also use |
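To make the handshake in that sketch concrete, here is a minimal, dependency-free simulation. It is only an illustration under stated assumptions: the class name `BatchUpTo` and its methods `push`, `request`, `hasRoom`, and `flush` are hypothetical, and plain callbacks stand in for Highland's `consume` and `createChild`, so this is not the library's implementation.

```javascript
// Hypothetical stand-in for the sketch above. Plain callbacks replace
// Highland's stream plumbing so the example runs without dependencies.
class BatchUpTo {
  constructor(limit) {
    if (!Number.isInteger(limit) || limit < 1) {
      throw new RangeError('limit must be a positive integer');
    }
    this.limit = limit;
    this.batched = [];
    this.waiting = null; // downstream callback parked while nothing is buffered
  }

  // True while the source may keep pushing; false means "pause until drained".
  hasRoom() {
    return this.batched.length < this.limit;
  }

  // Called by the source for every item. Returns hasRoom() so a cooperative
  // source knows when to stop; the size cap relies on the source honoring it.
  push(x) {
    this.batched.push(x);
    if (this.waiting) {
      // Downstream was already waiting, so hand the item over right away.
      const deliver = this.waiting;
      this.waiting = null;
      deliver(this.flush());
    }
    return this.hasRoom();
  }

  // Called by downstream when it is ready for the next batch.
  request(deliver) {
    if (this.batched.length > 0) {
      deliver(this.flush());
    } else {
      this.waiting = deliver;
    }
  }

  // Hand over everything collected so far and start a new batch.
  flush() {
    const out = this.batched;
    this.batched = [];
    return out;
  }
}

const received = [];
const b = new BatchUpTo(3);
b.request(batch => received.push(batch)); // nothing buffered yet: callback is parked
b.push('a');                              // downstream was waiting: delivered at once
b.push('b');
b.push('c');
const roomLeft = b.push('d');             // third buffered item: the batch is full
b.request(batch => received.push(batch)); // delivers everything buffered so far

console.log(received, roomLeft);
```

The parked callback plays the role of `waitingForBatch` in the sketch: when downstream asks while the buffer is empty, the next item from the source is forwarded as soon as it arrives instead of waiting for a full batch.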
Finally got around to working on this. I decided that this fit best as part of I was also having trouble seeing where to extend the unit tests. I noticed that nodeunit is not being maintained anymore. Is there a plan to switch to mocha or jest for the new version? I have worked with both, and jest seems better to me. |
I have it working now in my own test, but I'm still getting an infinite loop and a stack overflow when running
|
I'm not sure why you see an infinite loop, but I left some comments in your commit. The other problem is that you can't add arguments to transforms. They get exported as a top-level transform that is curried, and adding additional arguments breaks that API. That is, you can run code like I think merging it into |
I propose Stream.buffer as a new method that mimics the functionality of the Node.js stream writev function.
This fills a similar role to batch. The difference is that batch waits for the provided count to be pushed to it before it pushes the array downstream.
buffer would collect items until downstream requests another batch, up to a limit.
I am willing to write this, assuming it is something the project is willing to accept.
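For readers unfamiliar with the Node.js behavior the proposal refers to, the following sketch shows how a `Writable` that implements `writev` receives the chunks that queued up while an earlier write was still pending. It uses only the standard `stream` module. The names `sink`, `calls`, and `finishWrite` are illustrative, and holding back the callback is just a way to simulate a slow consumer. Note that `writev` here receives everything that was queued, so the "up to a limit" part of the proposal is not modeled.

```javascript
const { Writable } = require('stream');

const calls = [];       // records what each write()/writev() call received
let finishWrite = null; // callback of the write that is still "in flight"

const sink = new Writable({
  objectMode: true,
  // Used when a chunk arrives and no other write is in flight.
  write(chunk, encoding, callback) {
    calls.push([chunk]);
    finishWrite = callback; // held back to simulate a slow consumer
  },
  // Used when several chunks queued up while the previous write was pending.
  writev(chunks, callback) {
    calls.push(chunks.map(entry => entry.chunk));
    finishWrite = callback;
  },
});

sink.write(1); // goes straight to write()
sink.write(2); // queued: the first write has not finished yet
sink.write(3);
sink.write(4);
finishWrite(); // first write completes; the queued chunks arrive together

console.log(calls);
```

A fixed-size batch would instead hold the first item until the full count had arrived, which is the difference the proposal describes.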