Skip to content

Commit

Permalink
stream: add drop and take
Browse files Browse the repository at this point in the history
  • Loading branch information
benjamingr committed Jan 21, 2022
1 parent c340256 commit f5c152b
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
23 changes: 23 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,33 @@ async function* flatMap(fn, options) {
}
}

async function* drop(number) {
validateInteger(number, 'number', 0);
for await (const val of this) {
if (number-- <= 0) {
yield await val;
}
}
}


async function* take(number) {
validateInteger(number, 'number', 0);
for await (const val of this) {
if (number-- > 0) {
yield val;
} else {
return;
}
}
}

module.exports.streamReturningOperators = {
drop,
filter,
flatMap,
map,
take,
};

module.exports.promiseReturningOperators = {
Expand Down
67 changes: 67 additions & 0 deletions test/parallel/test-stream-drop-take.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const { deepStrictEqual, rejects } = require('assert');

const { from } = Readable;
{
// Synchronous streams
(async () => {
deepStrictEqual(await from([1, 2, 3]).drop(2).toArray(), [3]);
deepStrictEqual(await from([1, 2, 3]).take(1).toArray(), [1]);
deepStrictEqual(await from([]).drop(2).toArray(), []);
deepStrictEqual(await from([]).take(1).toArray(), []);
deepStrictEqual(await from([1, 2, 3]).drop(1).take(1).toArray(), [2]);
deepStrictEqual(await from([1, 2]).drop(0).toArray(), [1, 2]);
deepStrictEqual(await from([1, 2]).take(0).toArray(), []);
})().then(common.mustCall());
// Asynchronous streams
(async () => {
const fromAsync = (...args) => from(...args).map(async (x) => x);
deepStrictEqual(await fromAsync([1, 2, 3]).drop(2).toArray(), [3]);
deepStrictEqual(await fromAsync([1, 2, 3]).take(1).toArray(), [1]);
deepStrictEqual(await fromAsync([]).drop(2).toArray(), []);
deepStrictEqual(await fromAsync([]).take(1).toArray(), []);
deepStrictEqual(await fromAsync([1, 2, 3]).drop(1).take(1).toArray(), [2]);
deepStrictEqual(await fromAsync([1, 2]).drop(0).toArray(), [1, 2]);
deepStrictEqual(await fromAsync([1, 2]).take(0).toArray(), []);
})().then(common.mustCall());
// Infinite streams
// Asynchronous streams
(async () => {
const naturals = () => from(async function*() {
let i = 1;
while (true) {
yield i++;
}
}());
deepStrictEqual(await naturals().take(1).toArray(), [1]);
deepStrictEqual(await naturals().drop(1).take(1).toArray(), [2]);
const next10 = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20];
deepStrictEqual(await naturals().drop(10).take(10).toArray(), next10);
deepStrictEqual(await naturals().take(5).take(1).toArray(), [1]);
})().then(common.mustCall());
}
{
// Error cases
// TODO(benjamingr) check if the spec coerces
const invalidArgs = [
'5',
undefined,
null,
{},
[],
from([1, 2, 3]),
Promise.resolve(5),
];

for (const example of invalidArgs) {
rejects(
from([]).take(example).toArray(),
/ERR_INVALID_ARG_TYPE/
).then(common.mustCall());
}
}

0 comments on commit f5c152b

Please sign in to comment.