Skip to content

Commit

Permalink
refactor merge to allow late discovery of sources
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromew committed Jan 5, 2015
1 parent 5e0b846 commit fa38b34
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 44 deletions.
132 changes: 90 additions & 42 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2842,59 +2842,107 @@ exposeMethod('concat');
* _([txt, md]).merge();
* // => contents of foo.txt, bar.txt and baz.txt in the order they were read
*/

Stream.prototype.merge = function () {
var self = this;
var resuming = false;
var go_next = false;
var srcs;
var srcs = [];

var srcsNeedPull = [],
first = true,
async = false;

return _(function (push, next) {
var safeNext = function () {
if (!resuming) {
next();
if (first) {
first = false;
getSourcesSync(push, next);
}

if (srcs.length === 0) {
push(null, nil);
}
else if (srcsNeedPull.length) {
pullFromAllSources(push, next);
next();
}
else {
async = true;
}
});

// Make a handler for the main merge loop.
function srcPullHandler(push, next, src) {
return function (err, x) {
if (err) {
push(err);
srcsNeedPull.push(src);
}
else if (x === nil) {
srcs = srcs.filter(function (s) {
return s !== src;
});
}
else {
go_next = true;
if (src === self) {
srcs.push(x);
srcsNeedPull.push(x);
srcsNeedPull.unshift(self);
} else {
push(null, x);
srcsNeedPull.push(src);
}
}

if (async) {
async = false;
next();
}
};
if (!srcs) {
self.errors(push).toArray(function (xs) {
srcs = xs;
srcs.forEach(function (src) {
src.on('end', function () {
srcs = srcs.filter(function (s) {
return s !== src;
});
safeNext();
});
src.on('data', function (x) {
src.pause();
push(null, x);
safeNext();
});
src.on('error', function (err) {
}


function pullFromAllSources(push, next) {
var _srcs = srcsNeedPull;
srcsNeedPull = [];
_srcs.forEach(function (src) {
src.pull(srcPullHandler(push, next, src));
});
}

// Pulls as many sources as possible from self synchronously.
function getSourcesSync(push, next) {
// Shadows the outer async variable.
var async;
var done = false;
while (!done) {
async = true;
self.pull(function (err, x) {
async = false;
if (done) {
// This means the pull was async. Handle like
// regular async.
srcPullHandler(push, next, self)(err, x);
}
else {
if (err) {
push(err);
safeNext();
});
});
next();
});
}
else if (srcs.length === 0) {
push(null, nil);
}
else {
go_next = false;
resuming = true;
srcs.forEach(function (src) {
src.resume();
}
else if (x === nil) {
done = true;
}
else {
srcs.push(x);
srcsNeedPull.push(x);
}
}
});
resuming = false;
if (go_next) {
next();

// Async behavior, record self as a src and return.
if (async) {
done = true;
srcs.unshift(self);
}
}
});
}

};
exposeMethod('merge');

Expand Down
16 changes: 14 additions & 2 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2232,7 +2232,6 @@ exports['merge'] = {
});
this.clock.tick(2000);
},
/*
'read from sources as soon as they are available': function (test) {
test.expect(2);
var s1 = _([1, 2, 3]);
Expand All @@ -2255,7 +2254,20 @@ exports['merge'] = {
}, 400);
this.clock.tick(400);
},
*/
'generator generating sources synchronously': function(test) {
var srcs = _(function (push, next) {
push(null, _([1, 2, 3]));
push(null, _([3, 4, 5]));
push(null, _([6, 7, 8]));
push(null, _([9, 10, 11]));
push(null, _([12, 13, 14]));
push(null, _.nil);
})
srcs.merge().toArray(function(xs) {
test.same(xs, [ 1, 3, 6, 9, 12, 2, 4, 7, 10, 13, 3, 5, 8, 11, 14 ]);
test.done();
});
},
'github issue #124: detect late end of stream': function(test) {
var s = _([1,2,3])
.map(function(x) { return _([x]) })
Expand Down

0 comments on commit fa38b34

Please sign in to comment.