From fa38b34d3d87ae9db4af2c73fd305ba8f5dc4b9e Mon Sep 17 00:00:00 2001 From: jeromew Date: Sat, 3 Jan 2015 21:03:49 +0000 Subject: [PATCH] refactor merge to allow late discovery of sources --- lib/index.js | 132 +++++++++++++++++++++++++++++++++++---------------- test/test.js | 16 ++++++- 2 files changed, 104 insertions(+), 44 deletions(-) diff --git a/lib/index.js b/lib/index.js index 5ac79b3..e6a3372 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2842,59 +2842,107 @@ exposeMethod('concat'); * _([txt, md]).merge(); * // => contents of foo.txt, bar.txt and baz.txt in the order they were read */ - Stream.prototype.merge = function () { var self = this; - var resuming = false; - var go_next = false; - var srcs; + var srcs = []; + + var srcsNeedPull = [], + first = true, + async = false; + return _(function (push, next) { - var safeNext = function () { - if (!resuming) { - next(); + if (first) { + first = false; + getSourcesSync(push, next); + } + + if (srcs.length === 0) { + push(null, nil); + } + else if (srcsNeedPull.length) { + pullFromAllSources(push, next); + next(); + } + else { + async = true; + } + }); + + // Make a handler for the main merge loop. + function srcPullHandler(push, next, src) { + return function (err, x) { + if (err) { + push(err); + srcsNeedPull.push(src); + } + else if (x === nil) { + srcs = srcs.filter(function (s) { + return s !== src; + }); } else { - go_next = true; + if (src === self) { + srcs.push(x); + srcsNeedPull.push(x); + srcsNeedPull.unshift(self); + } else { + push(null, x); + srcsNeedPull.push(src); + } + } + + if (async) { + async = false; + next(); } }; - if (!srcs) { - self.errors(push).toArray(function (xs) { - srcs = xs; - srcs.forEach(function (src) { - src.on('end', function () { - srcs = srcs.filter(function (s) { - return s !== src; - }); - safeNext(); - }); - src.on('data', function (x) { - src.pause(); - push(null, x); - safeNext(); - }); - src.on('error', function (err) { + } + + + function pullFromAllSources(push, next) { + var _srcs = srcsNeedPull; + srcsNeedPull = []; + _srcs.forEach(function (src) { + src.pull(srcPullHandler(push, next, src)); + }); + } + + // Pulls as many sources as possible from self synchronously. + function getSourcesSync(push, next) { + // Shadows the outer async variable. + var async; + var done = false; + while (!done) { + async = true; + self.pull(function (err, x) { + async = false; + if (done) { + // This means the pull was async. Handle like + // regular async. + srcPullHandler(push, next, self)(err, x); + } + else { + if (err) { push(err); - safeNext(); - }); - }); - next(); - }); - } - else if (srcs.length === 0) { - push(null, nil); - } - else { - go_next = false; - resuming = true; - srcs.forEach(function (src) { - src.resume(); + } + else if (x === nil) { + done = true; + } + else { + srcs.push(x); + srcsNeedPull.push(x); + } + } }); - resuming = false; - if (go_next) { - next(); + + // Async behavior, record self as a src and return. + if (async) { + done = true; + srcs.unshift(self); } } - }); + } + }; exposeMethod('merge'); diff --git a/test/test.js b/test/test.js index 6600253..959eeac 100755 --- a/test/test.js +++ b/test/test.js @@ -2232,7 +2232,6 @@ exports['merge'] = { }); this.clock.tick(2000); }, - /* 'read from sources as soon as they are available': function (test) { test.expect(2); var s1 = _([1, 2, 3]); @@ -2255,7 +2254,20 @@ exports['merge'] = { }, 400); this.clock.tick(400); }, - */ + 'generator generating sources synchronously': function(test) { + var srcs = _(function (push, next) { + push(null, _([1, 2, 3])); + push(null, _([3, 4, 5])); + push(null, _([6, 7, 8])); + push(null, _([9, 10, 11])); + push(null, _([12, 13, 14])); + push(null, _.nil); + }) + srcs.merge().toArray(function(xs) { + test.same(xs, [ 1, 3, 6, 9, 12, 2, 4, 7, 10, 13, 3, 5, 8, 11, 14 ]); + test.done(); + }); + }, 'github issue #124: detect late end of stream': function(test) { var s = _([1,2,3]) .map(function(x) { return _([x]) })