Skip to content

Commit

Permalink
hew more closely to repeater js implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Jun 1, 2021
1 parent 99bda2d commit df69614
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 77 deletions.
60 changes: 14 additions & 46 deletions packages/utils/src/mapAsyncIterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,65 +7,33 @@
* so that all payloads will be delivered in the original order
*/

import { Push, Stop, Repeater } from '@repeaterjs/repeater';
import { Repeater } from '@repeaterjs/repeater';

export function mapAsyncIterator<T, U>(
iterator: AsyncIterator<T>,
mapValue: (value: T) => Promise<U> | U,
mapValue: (value: T) => Promise<U> | U
): AsyncIterableIterator<U> {
const returner = iterator.return?.bind(iterator) ?? (() => {});
const returner = iterator.return?.bind(iterator) ?? (() => true);

return new Repeater(async (push, stop) => {
let earlyReturn: any;
stop.then(() => {
earlyReturn = returner();
});

await loop(push, stop, earlyReturn, iterator, mapValue);
/* eslint-disable no-unmodified-loop-condition */
while (!earlyReturn) {
const iteration = await iterator.next();

await earlyReturn;
});
}

async function loop<T, U>(
push: Push<U>,
stop: Stop,
earlyReturn: Promise<any> | any,
iterator: AsyncIterator<T>,
mapValue: (value: T) => Promise<U> | U,
): Promise<void> {
/* eslint-disable no-unmodified-loop-condition */
while (!earlyReturn) {
const iteration = await next(iterator, mapValue);

if (iteration.done) {
if (iteration.value !== undefined) {
await push(iteration.value);
if (iteration.done) {
stop();
return iteration.value;
}
stop();
return;
}

await push(iteration.value);
}
/* eslint-enable no-unmodified-loop-condition */
}

async function next<T, U>(
iterator: AsyncIterator<T>,
mapValue: (value: T) => Promise<U> | U,
): Promise<IteratorResult<U>> {
const iterationCandidate = await iterator.next();

const value = iterationCandidate.value;
if (value === undefined) {
return iterationCandidate as IteratorResult<U>;
}

const newValue = await mapValue(iterationCandidate.value);
await push(mapValue(iteration.value));
}
/* eslint-enable no-unmodified-loop-condition */

return {
...iterationCandidate,
value: newValue,
};
await earlyReturn;
});
}
47 changes: 16 additions & 31 deletions packages/utils/src/splitAsyncIterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
// and: https://gist.github.com/jed/cc1e949419d42e2cb26d7f2e1645864d
// and also: https://github.com/repeaterjs/repeater/issues/48#issuecomment-569134039

import { Push, Stop, Repeater } from '@repeaterjs/repeater';
import { Repeater } from '@repeaterjs/repeater';

type Splitter<T> = (item: T) => [number | undefined, T];

export function splitAsyncIterator<T>(iterator: AsyncIterator<T>, n: number, splitter: Splitter<T>) {
const returner = iterator.return?.bind(iterator) ?? (() => {});
const returner = iterator.return?.bind(iterator) ?? (() => true);

const buffers: Array<Array<IteratorResult<T>>> = Array(n);
for (let i = 0; i < n; i++) {
Expand All @@ -26,41 +26,26 @@ export function splitAsyncIterator<T>(iterator: AsyncIterator<T>, n: number, spl
}
});

await loop(push, stop, earlyReturn, buffer, buffers, iterator, splitter);
/* eslint-disable no-unmodified-loop-condition */
while (!earlyReturn) {
const iteration = await next(buffer, buffers, iterator, splitter);

await earlyReturn;
});
});
}

async function loop<T>(
push: Push<T>,
stop: Stop,
earlyReturn: Promise<any> | any,
buffer: Array<IteratorResult<T>>,
buffers: Array<Array<IteratorResult<T>>>,
iterator: AsyncIterator<T>,
splitter: Splitter<T>
): Promise<void> {
/* eslint-disable no-unmodified-loop-condition */
while (!earlyReturn) {
const iteration = await next(buffer, buffers, iterator, splitter);
if (iteration === undefined) {
continue;
}

if (iteration === undefined) {
continue;
}
if (iteration.done) {
stop();
return iteration.value;
}

if (iteration.done) {
if (iteration.value !== undefined) {
await push(iteration.value);
}
stop();
return;
}
/* eslint-enable no-unmodified-loop-condition */

await push(iteration.value);
}
/* eslint-enable no-unmodified-loop-condition */
await earlyReturn;
});
});
}

async function next<T>(
Expand Down
48 changes: 48 additions & 0 deletions packages/utils/tests/splitAsyncIterator.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { splitAsyncIterator } from '../src/splitAsyncIterator';
import { mapAsyncIterator } from '../src/mapAsyncIterator';

describe('splitAsyncIterator', () => {
test('it works sequentially', async () => {
Expand Down Expand Up @@ -44,3 +45,50 @@ describe('splitAsyncIterator', () => {
expect(twoResults).toEqual([undefined, undefined, undefined]);
});
});

describe('splitAsyncIterator with mapAsyncIterator', () => {
test('it works sequentially', async () => {
const gen3 = async function* () {
for (let i = 0; i < 3; i++) {
yield i;
}
}();

const mappedGen3 = mapAsyncIterator(gen3, value => value);
const [one, two] = splitAsyncIterator(mappedGen3, 2, (x) => [0, x + 5]);

let results = [];
for await (const result of one) {
results.push(result);
}
expect(results).toEqual([5, 6, 7]);

results = [];
for await (const result of two) {
results.push(result);
}
expect(results).toEqual([]);
});

test('it works in parallel', async () => {
const gen3 = async function* () {
for (let i = 0; i < 3; i++) {
yield i;
}
}();

const mappedGen3 = mapAsyncIterator(gen3, value => value);
const [one, two] = splitAsyncIterator(mappedGen3, 2, (x) => [0, x + 5]);

const oneResults = [];
const twoResults = [];
for (let i = 0; i < 3; i++) {
const results = await Promise.all([one.next(), two.next()]);
oneResults.push(results[0].value);
twoResults.push(results[1].value);
}

expect(oneResults).toEqual([5, 6, 7]);
expect(twoResults).toEqual([undefined, undefined, undefined]);
});
});

0 comments on commit df69614

Please sign in to comment.