Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX] Ensure return is called on AsyncIterators #348

Merged
merged 6 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
"console": "integratedTerminal",
"program": "${workspaceFolder}/node_modules/.bin/jest",
"skipFiles": [
"<node_internals>/**/*.js",
"${workspaceFolder}/node_modules/**/*.js"
"<node_internals>/**/*",
"${workspaceFolder}/node_modules/**/*",
],
"env": {
"NODE_NO_WARNINGS": "1",
Expand Down
32 changes: 16 additions & 16 deletions spec/asynciterable-operators/concatmap-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { hasNext, noNext } from '../asynciterablehelpers';
import { hasNext, hasErr, noNext } from '../asynciterablehelpers';
import { of, range, sequenceEqual, throwError } from 'ix/asynciterable';
import { map, tap, concatMap } from 'ix/asynciterable/operators';

Expand All @@ -7,13 +7,13 @@ test('AsyncIterable#concatMap with range', async () => {
const ys = xs.pipe(concatMap(async (x) => range(0, x)));

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
hasNext(it, 0);
hasNext(it, 1);
hasNext(it, 2);
noNext(it);
await hasNext(it, 0);
await hasNext(it, 0);
await hasNext(it, 1);
await hasNext(it, 0);
await hasNext(it, 1);
await hasNext(it, 2);
await noNext(it);
});

test('AsyncIterable#concatMap order of effects', async () => {
Expand All @@ -35,10 +35,10 @@ test('AsyncIterable#concatMap selector returns throw', async () => {
const ys = xs.pipe(concatMap(async (x) => (x < 3 ? range(0, x) : throwError(err))));

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
await expect(it.next()).rejects.toThrow(err);
await hasNext(it, 0);
await hasNext(it, 0);
await hasNext(it, 1);
await hasErr(it, err);
});

test('AsyncIterable#concatMap with error throws', async () => {
Expand All @@ -63,8 +63,8 @@ test('AsyncIterable#concatMap selector throws error', async () => {
);

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
await expect(it.next()).rejects.toThrow(err);
await hasNext(it, 0);
await hasNext(it, 0);
await hasNext(it, 1);
await hasErr(it, err);
});
57 changes: 54 additions & 3 deletions spec/asynciterable-operators/debounce-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { hasNext, noNext, delayValue } from '../asynciterablehelpers';
import { debounce } from 'ix/asynciterable/operators';
import { hasNext, hasErr, noNext, delayError, delayValue } from '../asynciterablehelpers';
import { debounce, finalize } from 'ix/asynciterable/operators';
import { as } from 'ix/asynciterable';
import { AbortError } from 'ix/Ix';

Expand Down Expand Up @@ -53,8 +53,59 @@ test(
const it = ys[Symbol.asyncIterator](controller.signal);
await hasNext(it, 1);
setImmediate(() => controller.abort());
await expect(hasNext(it, 3)).rejects.toThrow(AbortError);
await hasErr(it, AbortError);
await noNext(it);
},
10 * 1000
);

test(
'AsyncIterable#debounce triggers finalize on error',
async () => {
let done = false;
const e = new Error();
const xs = async function* () {
yield await delayValue(1, 100);
yield await delayError(e, 100);
yield await delayValue(3, 100);
};
const ys = as(xs()).pipe(
finalize(() => {
done = true;
}),
debounce(50)
);

const it = ys[Symbol.asyncIterator]();
await hasNext(it, 1);
await hasErr(it, e);
await noNext(it);
expect(done).toBeTruthy();
},
10 * 1000
);

test(
'AsyncIterable#debounce triggers finalize on complete',
async () => {
let done = false;
const xs = async function* () {
yield await delayValue(1, 200);
yield await delayValue(2, 400);
yield await delayValue(3, 200);
};
const ys = as(xs()).pipe(
finalize(() => {
done = true;
}),
debounce(300)
);

const it = ys[Symbol.asyncIterator]();
await hasNext(it, 1);
await hasNext(it, 3);
await noNext(it);
expect(done).toBeTruthy();
},
10 * 1000
);
74 changes: 69 additions & 5 deletions spec/asynciterable-operators/finalize-spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { hasNext, noNext } from '../asynciterablehelpers';
import { hasNext, hasErr, noNext } from '../asynciterablehelpers';
import { range, throwError } from 'ix/asynciterable';
import { finalize } from 'ix/asynciterable/operators';
import { flatMap, finalize, tap } from 'ix/asynciterable/operators';

test('AsyncIterable#finally defers behavior', async () => {
test('AsyncIterable#finalize defers behavior', async () => {
let done = false;

const xs = range(0, 2).pipe(
Expand All @@ -25,7 +25,7 @@ test('AsyncIterable#finally defers behavior', async () => {
expect(done).toBeTruthy();
});

test('AsyncIterable#finally calls even with error', async () => {
test('AsyncIterable#finalize calls even with error', async () => {
let done = false;

const err = new Error();
Expand All @@ -34,12 +34,76 @@ test('AsyncIterable#finally calls even with error', async () => {
done = true;
})
);

expect(done).toBeFalsy();

const it = xs[Symbol.asyncIterator]();

expect(done).toBeFalsy();

await hasErr(it, err);

expect(done).toBeTruthy();
});

test('AsyncIterable#finalize calls with downstream error', async () => {
let done = false;

const err = new Error();
const xs = range(0, 2).pipe(
finalize(async () => {
done = true;
}),
tap(async () => {
throw err;
})
);

expect(done).toBeFalsy();

const it = xs[Symbol.asyncIterator]();

expect(done).toBeFalsy();

await expect(hasNext(it, 0)).rejects.toThrow(err);
await hasErr(it, err);

expect(done).toBeTruthy();
});

test('AsyncIterable#finalize calls with downstream error from flattening', async () => {
let done = false;
// let srcValues = [] as number[];

const err = new Error();
const xs = range(0, 4).pipe(
finalize(async () => {
done = true;
}),
flatMap(async (x) => {
// srcValues.push(x);
if (x === 1) {
return throwError(err);
}
return [x];
})
);

expect(done).toBeFalsy();

const it = xs[Symbol.asyncIterator]();

expect(done).toBeFalsy();

await hasNext(it, 0);
await hasErr(it, err);
await noNext(it);

expect(done).toBeTruthy();
// The source will yield one more value after the throwError(err),
// because the internal Promise.race([outer, inner]) call resolves
// to the last outer value before the inner error. This is an artifact
// of the JS Promise scheduler's breadth-first scheduling behavior, not
// a bug in IxJS.
// TODO: This is broken in google-closure-compiler?
// expect(srcValues).toEqual([0, 1, 2]);
});
46 changes: 36 additions & 10 deletions spec/asynciterable-operators/flatmap-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { hasNext } from '../asynciterablehelpers';
import { hasNext, hasErr } from '../asynciterablehelpers';
import { of, range, throwError, toArray } from 'ix/asynciterable';
import { flatMap } from 'ix/asynciterable/operators';

Expand All @@ -10,15 +10,25 @@ test('AsyncIterable#flatMap with range', async () => {
});

test('AsyncIterable#flatMap selector returns throw', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(flatMap((x) => (x < 3 ? range(0, x) : throwError(err))));

const it = ys[Symbol.asyncIterator]();
await hasNext(it, 0);
await hasNext(it, 0);
await hasErr(it, err);
});

test('AsyncIterable#flatMap async selector returns throw', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(flatMap(async (x) => (x < 3 ? range(0, x) : throwError(err))));

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
await expect(it.next()).rejects.toThrow(err);
await hasNext(it, 0);
await hasNext(it, 0);
await hasErr(it, err);
});

test('AsyncIterable#flatMap with error throws', async () => {
Expand All @@ -27,10 +37,27 @@ test('AsyncIterable#flatMap with error throws', async () => {
const ys = xs.pipe(flatMap((x) => range(0, x)));

const it = ys[Symbol.asyncIterator]();
await expect(it.next()).rejects.toThrow(err);
await hasErr(it, err);
});

test('AsyncIterable#flatMap selector throws error', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(
flatMap((x) => {
if (x < 3) {
return range(0, x);
}
throw err;
})
);

const it = ys[Symbol.asyncIterator]();
await hasNext(it, 0);
await hasErr(it, err);
});

test('AsyncIterable#flatMap async selector throws error', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(
Expand All @@ -43,8 +70,7 @@ test('AsyncIterable#flatMap selector throws error', async () => {
);

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
await expect(it.next()).rejects.toThrow(err);
await hasNext(it, 0);
await hasNext(it, 0);
await hasErr(it, err);
});
26 changes: 23 additions & 3 deletions spec/asynciterable-operators/timeout-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { hasNext, noNext, delayValue } from '../asynciterablehelpers';
import { timeout } from 'ix/asynciterable/operators';
import { hasNext, hasErr, noNext, delayValue } from '../asynciterablehelpers';
import { timeout, finalize } from 'ix/asynciterable/operators';
import { as } from 'ix/asynciterable';
import { TimeoutError } from 'ix/asynciterable/operators/timeout';

Expand Down Expand Up @@ -27,6 +27,26 @@ test('AsyncIterable#timeout throws when delayed', async () => {

const it = ys[Symbol.asyncIterator]();
await hasNext(it, 1);
await expect(it.next()).rejects.toThrow(TimeoutError);
await hasErr(it, TimeoutError);
await noNext(it);
});

test('AsyncIterable#timeout triggers finalize', async () => {
let done = false;
const xs = async function* () {
yield await delayValue(1, 50);
yield await delayValue(2, 200);
};
const ys = as(xs()).pipe(
finalize(() => {
done = true;
}),
timeout(100)
);

const it = ys[Symbol.asyncIterator]();
await hasNext(it, 1);
await hasErr(it, TimeoutError);
await noNext(it);
expect(done).toBeTruthy();
});
19 changes: 14 additions & 5 deletions spec/asynciterablehelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ import { AsyncIterableX } from 'ix/asynciterable';
import { Observer, PartialObserver } from '../src/observer';

export async function hasNext<T>(source: AsyncIterator<T>, expected: T) {
const { done, value } = await source.next();
expect(done).toBeFalsy();
expect(value).toEqual(expected);
await expect(source.next()).resolves.toEqual({ done: false, value: expected });
}

export async function hasErr(source: AsyncIterator<any>, expected: any) {
await expect(source.next()).rejects.toThrow(expected);
}

export async function noNext<T>(source: AsyncIterator<T>) {
const next = await source.next();
expect(next.done).toBeTruthy();
await expect(source.next()).resolves.toEqual({ done: true, value: undefined });
}

export function delayValue<T>(item: T, delay: number): Promise<T> {
Expand All @@ -21,6 +22,14 @@ export function delayValue<T>(item: T, delay: number): Promise<T> {
});
}

export function delayError<T>(item: T, delay: number): Promise<void> {
return new Promise<void>((_, reject) => {
setTimeout(() => {
reject(item);
}, delay);
});
}

const noop = (_?: any) => {
/**/
};
Expand Down
4 changes: 2 additions & 2 deletions src/asynciterable/asynciterablex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ export abstract class AsyncIterableX<T> implements AsyncIterable<T> {
*/
static as(source: string): AsyncIterableX<string>;
/**
* Converts the async iterable like input into an async-iterable.
* Converts the AsyncIterable-like input or single element into an AsyncIterable.
*
* @template T The type of elements in the async-iterable like sequence.
* @param {AsyncIterableInput<T>} source The async-iterable like input to convert to an async-iterable.
* @returns {AsyncIterableX<T>} An async-iterable stream from elements in the async-iterable like sequence.
*/
static as<T>(source: AsyncIterableInput<T>): AsyncIterableX<T>;
static as<T>(source: AsyncIterableInput<T> | T): AsyncIterableX<T>;
/**
* Converts the single element into an async-iterable sequence.
*
Expand Down
Loading