Skip to content

Commit

Permalink
Merge pull request #682 from dougmoscrop/master
Browse files Browse the repository at this point in the history
Support async generators
  • Loading branch information
vqvu authored Jul 7, 2019
2 parents e504b42 + f265c95 commit 97397c1
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 28 deletions.
58 changes: 36 additions & 22 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ function bindContext(fn, context) {
* i.e., contains a method that returns an object that conforms to the iterator protocol. The stream will use the
* iterator defined in the `Symbol.iterator` property of the iterable object to generate emitted values.
*
* **Asynchronous Iterator -** Accepts an iterator produced by an ES8 [async generator function](https://github.com/tc39/proposal-async-iteration#async-generator-functions),
* yields all the values from the iterator by resolving its `next()` method and terminates when the
* iterator's done value returns true. If the iterator's `next()` method throws or rejects, the exception will be emitted as an error,
* and the stream will be ended with no further calls to `next()`.
*
* @id _(source)
* @section Stream Objects
* @name _(source)
Expand Down Expand Up @@ -679,33 +684,42 @@ function promiseStream(StreamCtor, promise) {

function iteratorStream(StreamCtor, it) {
return new StreamCtor(function (push, next) {
var iterElem, iterErr;
try {
iterElem = it.next();
}
catch (err) {
iterErr = err;
function pushIt(iterElem) {
if (iterElem.done) {
if (!_.isUndefined(iterElem.value)) {
// generators can return a final
// value on completion using return
// keyword otherwise value will be
// undefined
push(null, iterElem.value);
}
push(null, _.nil);
}
else {
push(null, iterElem.value);
next();
}
}

if (iterErr) {
push(iterErr);
push(null, _.nil);
}
else if (iterElem.done) {
if (!_.isUndefined(iterElem.value)) {
// generators can return a final
// value on completion using return
// keyword otherwise value will be
// undefined
push(null, iterElem.value);
try {
var iterElem = it.next();

if (_.isFunction(iterElem.then)) {
iterElem
.then(pushIt)
.catch(function(err) {
push(err);
push(null, _.nil);
});
}
else {
pushIt(iterElem);
}
push(null, _.nil);
}
else {
push(null, iterElem.value);
next();
catch (err) {
push(err);
push(null, _.nil);
}

});
}

Expand Down
82 changes: 76 additions & 6 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,29 @@ exports.constructor = {
},
};
};
this.createTestAsyncIterator = function(array, error, lastVal) {
var count = 0,
length = array.length;
return {
next: function() {
if (count < length) {
if (error && count === 2) {
return Promise.reject(error);
}
var iterElem = {
value: array[count], done: false,
};
count++;
return Promise.resolve(iterElem);
}
else {
return Promise.resolve({
value: lastVal, done: true,
});
}
},
};
};
this.tester = function (expected, test) {
return function (xs) {
test.same(xs, expected);
Expand Down Expand Up @@ -1234,6 +1257,53 @@ exports.constructor = {
_(this.createTestIterator([1, 2, 3, 4, 5], void 0, 0))
.toArray(this.tester([1, 2, 3, 4, 5, 0], test));
},
'from iterator - yields promises without affecting them': function (test) {
test.expect(1);

var unresolved = new Promise(Function.prototype);

_(this.createTestIterator([unresolved]))
.toArray(function (array) {
Promise.race([array[0], 'pending'])
.then(function (value) {
test.equal(value, 'pending');
test.done();
});
});
},
'from async iterator': function (test) {
test.expect(1);

_(this.createTestAsyncIterator([1, 2, 3, 4, 5], void 0, 0))
.toArray(this.tester([1, 2, 3, 4, 5, 0], test));
},
'from async iterator - error': function (test) {
test.expect(2);
_(this.createTestAsyncIterator([1, 2, 3, 4, 5], new Error('Error at index 2')))
.errors(function (err) {
test.equals(err.message, 'Error at index 2');
})
.toArray(this.tester([1, 2], test));
},
'from async iterator - final return falsy': function (test) {
test.expect(1);
_(this.createTestAsyncIterator([1, 2, 3, 4, 5], void 0, 0))
.toArray(this.tester([1, 2, 3, 4, 5, 0], test));
},
'from async iterator - yields promises without affecting them': function (test) {
test.expect(1);

var unresolved = new Promise(Function.prototype);

_(this.createTestAsyncIterator([unresolved]))
.toArray(function (array) {
Promise.race([array[0], 'pending'])
.then(function (value) {
test.equal(value, 'pending');
test.done();
});
});
},
'only gutless streams and pipelines are writable': function (test) {
test.ok(_().writable, 'gutless stream should be writable');
test.ok(_.pipeline(_.map(function (x) { return x + 1; })).writable, 'pipelines should be writable');
Expand Down Expand Up @@ -1396,8 +1466,8 @@ if (global.Map && global.Symbol) {

_(map).toArray(function (xs) {
test.same(xs, [['a', 1], ['b', 2], ['c', 3]]);
test.done();
});
test.done();
};

exports['constructor from Map iterator'] = function (test) {
Expand All @@ -1409,8 +1479,8 @@ if (global.Map && global.Symbol) {

_(map.entries()).toArray(function (xs) {
test.same(xs, [['a', 1], ['b', 2], ['c', 3]]);
test.done();
});
test.done();
};

exports['constructor from empty Map iterator'] = function (test) {
Expand All @@ -1419,8 +1489,8 @@ if (global.Map && global.Symbol) {

_(map.entries()).toArray(function (xs) {
test.same(xs, []);
test.done();
});
test.done();
};

}
Expand All @@ -1433,8 +1503,8 @@ if (global.Set && global.Symbol) {

_(sett).toArray(function (xs) {
test.same(xs, [1, 2, 3, 4]);
test.done();
});
test.done();
};

exports['constructor from Set iterator'] = function (test) {
Expand All @@ -1443,8 +1513,8 @@ if (global.Set && global.Symbol) {

_(sett.values()).toArray(function (xs) {
test.same(xs, [1, 2, 3, 4]);
test.done();
});
test.done();
};

exports['constructor from empty Map iterator'] = function (test) {
Expand All @@ -1453,8 +1523,8 @@ if (global.Set && global.Symbol) {

_(sett.values()).toArray(function (xs) {
test.same(xs, []);
test.done();
});
test.done();
};

}
Expand Down

0 comments on commit 97397c1

Please sign in to comment.