Skip to content

Commit

Permalink
stream: add drop and take
Browse files Browse the repository at this point in the history
This adds the `drop` and `take` methods to readable streams allowing
users easily drop and take items from the stream.

This continues the iterator-helper proposal alignment task.

Co-Authored-By: Robert Nagy <ronagy@icloud.com>
PR-URL: nodejs#41630
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
2 people authored and Linkgoron committed Jan 31, 2022
1 parent 96ce950 commit 9518a3f
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 0 deletions.
44 changes: 44 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2074,6 +2074,50 @@ for await (const result of concatResult) {
}
```

### `readable.drop(limit[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `limit` {number} the number of chunks to drop from the readable.
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream with `limit` chunks dropped.

This method returns a new stream with the first `limit` chunks dropped.

```mjs
import { Readable } from 'stream';

await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
```

### `readable.take(limit[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `limit` {number} the number of chunks to take from the readable.
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream with `limit` chunks taken.

This method returns a new stream with the first `limit` chunks.

```mjs
import { Readable } from 'stream';

await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
55 changes: 55 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const { AbortController } = require('internal/abort_controller');
const {
codes: {
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
},
AbortError,
} = require('internal/errors');
Expand All @@ -14,6 +15,8 @@ const { kWeakHandler } = require('internal/event_target');
const {
ArrayPrototypePush,
MathFloor,
Number,
NumberIsNaN,
Promise,
PromiseReject,
PromisePrototypeCatch,
Expand Down Expand Up @@ -232,10 +235,62 @@ async function* flatMap(fn, options) {
}
}

function toIntegerOrInfinity(number) {
// We coerce here to align with the spec
// https://github.com/tc39/proposal-iterator-helpers/issues/169
number = Number(number);
if (NumberIsNaN(number)) {
return 0;
}
if (number < 0) {
throw new ERR_OUT_OF_RANGE('number', '>= 0', number);
}
return number;
}

function drop(number, options) {
number = toIntegerOrInfinity(number);
return async function* drop() {
if (options?.signal?.aborted) {
throw new AbortError();
}
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError();
}
if (number-- <= 0) {
yield val;
}
}
}.call(this);
}


function take(number, options) {
number = toIntegerOrInfinity(number);
return async function* take() {
if (options?.signal?.aborted) {
throw new AbortError();
}
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError();
}
if (number-- > 0) {
yield val;
} else {
return;
}
}
}.call(this);
}

module.exports.streamReturningOperators = {
drop,
filter,
flatMap,
map,
take,
};

module.exports.promiseReturningOperators = {
Expand Down
96 changes: 96 additions & 0 deletions test/parallel/test-stream-drop-take.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const { deepStrictEqual, rejects, throws } = require('assert');

const { from } = Readable;

const fromAsync = (...args) => from(...args).map(async (x) => x);

const naturals = () => from(async function*() {
let i = 1;
while (true) {
yield i++;
}
}());

{
// Synchronous streams
(async () => {
deepStrictEqual(await from([1, 2, 3]).drop(2).toArray(), [3]);
deepStrictEqual(await from([1, 2, 3]).take(1).toArray(), [1]);
deepStrictEqual(await from([]).drop(2).toArray(), []);
deepStrictEqual(await from([]).take(1).toArray(), []);
deepStrictEqual(await from([1, 2, 3]).drop(1).take(1).toArray(), [2]);
deepStrictEqual(await from([1, 2]).drop(0).toArray(), [1, 2]);
deepStrictEqual(await from([1, 2]).take(0).toArray(), []);
})().then(common.mustCall());
// Asynchronous streams
(async () => {
deepStrictEqual(await fromAsync([1, 2, 3]).drop(2).toArray(), [3]);
deepStrictEqual(await fromAsync([1, 2, 3]).take(1).toArray(), [1]);
deepStrictEqual(await fromAsync([]).drop(2).toArray(), []);
deepStrictEqual(await fromAsync([]).take(1).toArray(), []);
deepStrictEqual(await fromAsync([1, 2, 3]).drop(1).take(1).toArray(), [2]);
deepStrictEqual(await fromAsync([1, 2]).drop(0).toArray(), [1, 2]);
deepStrictEqual(await fromAsync([1, 2]).take(0).toArray(), []);
})().then(common.mustCall());
// Infinite streams
// Asynchronous streams
(async () => {
deepStrictEqual(await naturals().take(1).toArray(), [1]);
deepStrictEqual(await naturals().drop(1).take(1).toArray(), [2]);
const next10 = [11, 12, 13, 14, 15, 16, 17, 18, 19, 20];
deepStrictEqual(await naturals().drop(10).take(10).toArray(), next10);
deepStrictEqual(await naturals().take(5).take(1).toArray(), [1]);
})().then(common.mustCall());
}

{
// Coercion
(async () => {
// The spec made me do this ^^
deepStrictEqual(await naturals().take('cat').toArray(), []);
deepStrictEqual(await naturals().take('2').toArray(), [1, 2]);
deepStrictEqual(await naturals().take(true).toArray(), [1]);
})().then(common.mustCall());
}

{
// Support for AbortSignal
const ac = new AbortController();
rejects(
Readable.from([1, 2, 3]).take(1, { signal: ac.signal }).toArray(), {
name: 'AbortError',
}).then(common.mustCall());
rejects(
Readable.from([1, 2, 3]).drop(1, { signal: ac.signal }).toArray(), {
name: 'AbortError',
}).then(common.mustCall());
ac.abort();
}

{
// Support for AbortSignal, already aborted
const signal = AbortSignal.abort();
rejects(
Readable.from([1, 2, 3]).take(1, { signal }).toArray(), {
name: 'AbortError',
}).then(common.mustCall());
}

{
// Error cases
const invalidArgs = [
-1,
-Infinity,
-40,
];

for (const example of invalidArgs) {
throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
}
}

0 comments on commit 9518a3f

Please sign in to comment.