Skip to content

Commit

Permalink
stream: add more filter tests
Browse files Browse the repository at this point in the history
PR-URL: nodejs/node#41936
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
  • Loading branch information
benjamingr authored and guangwong committed Oct 10, 2022
1 parent 7662ae4 commit 1ea8af3
Showing 1 changed file with 69 additions and 1 deletion.
70 changes: 69 additions & 1 deletion test/parallel/test-stream-filter.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const {
Readable,
} = require('stream');
const assert = require('assert');
const { once } = require('events');
const { setTimeout } = require('timers/promises');

{
Expand Down Expand Up @@ -46,13 +47,80 @@ const { setTimeout } = require('timers/promises');
})().then(common.mustCall());
}

{
// Filter works on an infinite stream
const stream = Readable.from(async function* () {
while (true) yield 1;
}()).filter(common.mustCall(async (x) => {
return x < 3;
}, 5));
(async () => {
let i = 1;
for await (const item of stream) {
assert.strictEqual(item, 1);
if (++i === 5) break;
}
})().then(common.mustCall());
}

{
// Filter works on constructor created streams
let i = 0;
const stream = new Readable({
read() {
if (i === 10) {
this.push(null);
return;
}
this.push(Uint8Array.from([i]));
i++;
},
highWaterMark: 0,
}).filter(common.mustCall(async ([x]) => {
return x !== 5;
}, 10));
(async () => {
const result = (await stream.toArray()).map((x) => x[0]);
const expected = [...Array(10).keys()].filter((x) => x !== 5);
assert.deepStrictEqual(result, expected);
})().then(common.mustCall());
}

{
// Throwing an error during `filter` (sync)
const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => {
if (x === 3) {
throw new Error('boom');
}
return true;
});
assert.rejects(
stream.map((x) => x + x).toArray(),
/boom/,
).then(common.mustCall());
}

{
// Throwing an error during `filter` (async)
const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => {
if (x === 3) {
throw new Error('boom');
}
return true;
});
assert.rejects(
stream.filter(() => true).toArray(),
/boom/,
).then(common.mustCall());
}

{
// Concurrency + AbortSignal
const ac = new AbortController();
let calls = 0;
const stream = Readable.from([1, 2, 3, 4]).filter(async (_, { signal }) => {
calls++;
await setTimeout(100, { signal });
await once(signal, 'abort');
}, { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
Expand Down

0 comments on commit 1ea8af3

Please sign in to comment.