Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mergeLimit function as mentioned in #374 #375

Merged
merged 6 commits into from
Sep 20, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3610,6 +3610,80 @@ Stream.prototype.merge = function () {
};
exposeMethod('merge');

/**
* Takes a Stream of Streams and merges their values and errors into a
* single new Stream, limitting the number of unpaused streams that can
* running at any one time.
*
* Note that no guarantee is made with respect to the order in which
* values for each stream end up in the merged stream. Values in the
* merged stream will, however, respect the order they were emitted from
* their respective streams.
*
* @id mergeWithLimit
* @section Higher-order Streams
* @name Stream.mergeWithLimit(n)
* @param {Number} n - the maximum number of streams to run in parallel
* @api public
*
* var txt = _(['foo.txt', 'bar.txt']).flatMap(readFile)
* var md = _(['baz.md']).flatMap(readFile)
* var js = _(['bosh.js']).flatMap(readFile)
*
* _([txt, md, js]).mergeWithLimit(2);
* // => contents of foo.txt, bar.txt, baz.txt and bosh.js in the order
* // they were read, but bosh.js is not read until either foo.txt and bar.txt
* // has completely been read or baz.txt has been read
*/


Stream.prototype.mergeWithLimit = function (n){
var self = this;
var processCount = 0;
var waiting = false;
if (typeof n !== 'number' || n < 1) {
throw new Error('mergeWithLimit expects a positive number, but got: ' + n)
}
if (n === Infinity) {
return this.merge();
}
return _(function(push, next){
self.pull(function(err, x){
var done = x === nil;
if (err){
push(err);
next();
}
else if (x === nil) {
push(null, nil);
}
else {
processCount++;
push(err, x);
// console.log('start', x.id);
x.once('end', function(){
processCount--;
// console.log('end', x.id);
if (waiting) {
// console.log('get more');
waiting = false;
next();
}
});
if (!done && processCount < n) {
next();
}
else {
// console.log('wait till something ends');
waiting = true;
}
}

});
}).merge();
};
exposeMethod('mergeWithLimit');

/**
* Calls a named method on each object from the Stream - returning
* a new stream with the result of those calls.
Expand Down
58 changes: 58 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3104,6 +3104,64 @@ exports['merge'] = {
}
};

exports['mergeWithLimit'] = {
setUp: function (callback) {
this.clock = sinon.useFakeTimers();
this.__delay = function (n){
return _(function (push, next) {
setTimeout(function () {
push(null, n);
push(null, _.nil);
}, n*10);
});
};
callback();
},
tearDown: function (callback) {
this.clock.restore();
delete this.__delay
callback();
},
'run three at a time': function (test) {
_.mergeWithLimit(3, [5,3,4,4,2].map(this.__delay)).toArray(function (xs) {
test.same(xs, [3,4,5,2,4]);
test.done();
});
this.clock.tick(100);
},
'run two at a time': function (test) {
_.mergeWithLimit(2, [4,3,2,3,1].map(this.__delay)).toArray(function (xs) {
test.same(xs, [3,4,2,1,3]);
test.done();
});
this.clock.tick(100);
},
'run one at a time': function (test) {
_.mergeWithLimit(1, [4,3,2,3,1].map(this.__delay)).toArray(function (xs) {
test.same(xs, [4,3,2,3,1]);
test.done();
});
this.clock.tick(150);
},
'handle backpressure': function (test) {
var s1 = _([1,2,3,4]);
var s2 = _([5,6,7,8]);
var s = _.mergeWithLimit(10, [s1, s2]);
s.take(5).toArray(function (xs) {
test.same(xs, [1,5,2,6,3]);
_.setImmediate(function () {
test.equal(s._outgoing.length, 0);
test.equal(s._incoming.length, 1);
test.equal(s1._incoming.length, 2);
test.equal(s2._incoming.length, 2);
test.done();
});
});
this.clock.tick(100);
},
'noValueOnError': noValueOnErrorTest(_.mergeWithLimit(1)),
};

exports['invoke'] = function (test) {
test.expect(2);
_.invoke('toString', [], [1,2,3,4]).toArray(function (xs) {
Expand Down