Skip to content

Commit

Permalink
refactor merge to allow late discovery of sources
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromew committed Oct 1, 2014
1 parent bc59164 commit c83be7b
Showing 1 changed file with 35 additions and 20 deletions.
55 changes: 35 additions & 20 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2601,6 +2601,7 @@ Stream.prototype.merge = function () {
var self = this;
var resuming = false;
var go_next = false;
var more_sources = true;
var srcs;
return _(function (push, next) {
var safeNext = function () {
Expand All @@ -2611,30 +2612,44 @@ Stream.prototype.merge = function () {
go_next = true;
}
};
var onSource = function(src) {
src.on('end', function () {
srcs = srcs.filter(function (s) {
return s !== src;
});
safeNext();
});
src.on('data', function (x) {
src.pause();
push(null, x);
safeNext();
});
src.on('error', function (err) {
push(err);
safeNext();
});
};
if (!srcs) {
self.errors(push).toArray(function (xs) {
srcs = xs;
srcs.forEach(function (src) {
src.on('end', function () {
srcs = srcs.filter(function (s) {
return s !== src;
});
safeNext();
});
src.on('data', function (x) {
srcs = [];
self.consume(function(err, x, _push, _next) {
if (err) {
push(err);
_next();
} else if (x === _.nil) {
more_sources = false;
_push(null, x);
} else {
onSource(x);
srcs.push(x);
srcs.forEach(function(src) {
src.pause();
push(null, x);
safeNext();
});
src.on('error', function (err) {
push(err);
safeNext();
});
});
next();
});
_next();
next();
}
}).resume();
}
else if (srcs.length === 0) {
else if (!more_sources && srcs.length === 0) {
push(null, nil);
}
else {
Expand Down

0 comments on commit c83be7b

Please sign in to comment.