diff --git a/lib/index.js b/lib/index.js index f58331e..a26d86b 100755 --- a/lib/index.js +++ b/lib/index.js @@ -126,6 +126,11 @@ function bindContext(fn, context) { * i.e., contains a method that returns an object that conforms to the iterator protocol. The stream will use the * iterator defined in the `Symbol.iterator` property of the iterable object to generate emitted values. * + * **Asynchronous Iterator -** Accepts an iterator produced by an ES8 [async generator function](https://github.com/tc39/proposal-async-iteration#async-generator-functions), + * yields all the values from the iterator by resolving its `next()` method and terminates when the + * iterator's done value returns true. If the iterator's `next()` method throws or rejects, the exception will be emitted as an error, + * and the stream will be ended with no further calls to `next()`. + * * @id _(source) * @section Stream Objects * @name _(source) @@ -679,33 +684,42 @@ function promiseStream(StreamCtor, promise) { function iteratorStream(StreamCtor, it) { return new StreamCtor(function (push, next) { - var iterElem, iterErr; - try { - iterElem = it.next(); - } - catch (err) { - iterErr = err; + function pushIt(iterElem) { + if (iterElem.done) { + if (!_.isUndefined(iterElem.value)) { + // generators can return a final + // value on completion using return + // keyword otherwise value will be + // undefined + push(null, iterElem.value); + } + push(null, _.nil); + } + else { + push(null, iterElem.value); + next(); + } } - if (iterErr) { - push(iterErr); - push(null, _.nil); - } - else if (iterElem.done) { - if (!_.isUndefined(iterElem.value)) { - // generators can return a final - // value on completion using return - // keyword otherwise value will be - // undefined - push(null, iterElem.value); + try { + var iterElem = it.next(); + + if (_.isFunction(iterElem.then)) { + iterElem + .then(pushIt) + .catch(function(err) { + push(err); + push(null, _.nil); + }); + } + else { + pushIt(iterElem); } - push(null, _.nil); } - else { - push(null, iterElem.value); - next(); + catch (err) { + push(err); + push(null, _.nil); } - }); } diff --git a/test/test.js b/test/test.js index 8915606..5aa6680 100755 --- a/test/test.js +++ b/test/test.js @@ -760,6 +760,29 @@ exports.constructor = { }, }; }; + this.createTestAsyncIterator = function(array, error, lastVal) { + var count = 0, + length = array.length; + return { + next: function() { + if (count < length) { + if (error && count === 2) { + return Promise.reject(error); + } + var iterElem = { + value: array[count], done: false, + }; + count++; + return Promise.resolve(iterElem); + } + else { + return Promise.resolve({ + value: lastVal, done: true, + }); + } + }, + }; + }; this.tester = function (expected, test) { return function (xs) { test.same(xs, expected); @@ -1234,6 +1257,53 @@ exports.constructor = { _(this.createTestIterator([1, 2, 3, 4, 5], void 0, 0)) .toArray(this.tester([1, 2, 3, 4, 5, 0], test)); }, + 'from iterator - yields promises without affecting them': function (test) { + test.expect(1); + + var unresolved = new Promise(Function.prototype); + + _(this.createTestIterator([unresolved])) + .toArray(function (array) { + Promise.race([array[0], 'pending']) + .then(function (value) { + test.equal(value, 'pending'); + test.done(); + }); + }); + }, + 'from async iterator': function (test) { + test.expect(1); + + _(this.createTestAsyncIterator([1, 2, 3, 4, 5], void 0, 0)) + .toArray(this.tester([1, 2, 3, 4, 5, 0], test)); + }, + 'from async iterator - error': function (test) { + test.expect(2); + _(this.createTestAsyncIterator([1, 2, 3, 4, 5], new Error('Error at index 2'))) + .errors(function (err) { + test.equals(err.message, 'Error at index 2'); + }) + .toArray(this.tester([1, 2], test)); + }, + 'from async iterator - final return falsy': function (test) { + test.expect(1); + _(this.createTestAsyncIterator([1, 2, 3, 4, 5], void 0, 0)) + .toArray(this.tester([1, 2, 3, 4, 5, 0], test)); + }, + 'from async iterator - yields promises without affecting them': function (test) { + test.expect(1); + + var unresolved = new Promise(Function.prototype); + + _(this.createTestAsyncIterator([unresolved])) + .toArray(function (array) { + Promise.race([array[0], 'pending']) + .then(function (value) { + test.equal(value, 'pending'); + test.done(); + }); + }); + }, 'only gutless streams and pipelines are writable': function (test) { test.ok(_().writable, 'gutless stream should be writable'); test.ok(_.pipeline(_.map(function (x) { return x + 1; })).writable, 'pipelines should be writable'); @@ -1396,8 +1466,8 @@ if (global.Map && global.Symbol) { _(map).toArray(function (xs) { test.same(xs, [['a', 1], ['b', 2], ['c', 3]]); + test.done(); }); - test.done(); }; exports['constructor from Map iterator'] = function (test) { @@ -1409,8 +1479,8 @@ if (global.Map && global.Symbol) { _(map.entries()).toArray(function (xs) { test.same(xs, [['a', 1], ['b', 2], ['c', 3]]); + test.done(); }); - test.done(); }; exports['constructor from empty Map iterator'] = function (test) { @@ -1419,8 +1489,8 @@ if (global.Map && global.Symbol) { _(map.entries()).toArray(function (xs) { test.same(xs, []); + test.done(); }); - test.done(); }; } @@ -1433,8 +1503,8 @@ if (global.Set && global.Symbol) { _(sett).toArray(function (xs) { test.same(xs, [1, 2, 3, 4]); + test.done(); }); - test.done(); }; exports['constructor from Set iterator'] = function (test) { @@ -1443,8 +1513,8 @@ if (global.Set && global.Symbol) { _(sett.values()).toArray(function (xs) { test.same(xs, [1, 2, 3, 4]); + test.done(); }); - test.done(); }; exports['constructor from empty Map iterator'] = function (test) { @@ -1453,8 +1523,8 @@ if (global.Set && global.Symbol) { _(sett.values()).toArray(function (xs) { test.same(xs, []); + test.done(); }); - test.done(); }; }