Skip to content

Commit

Permalink
Return underlying AsyncIterators when execute result is returned (#2843)
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/execution/execute.ts
  • Loading branch information
robrichard committed Jun 3, 2021
1 parent 96d70d9 commit 551fa58
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 3 deletions.
199 changes: 199 additions & 0 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { expect } from 'chai';
import { describe, it } from 'mocha';

import { invariant } from '../../jsutils/invariant';
import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
import { parse } from '../../language/parser';

Expand Down Expand Up @@ -74,6 +75,36 @@ const query = new GraphQLObjectType({
yield await Promise.resolve({});
},
},
asyncIterableListDelayed: {
type: new GraphQLList(friendType),
async *resolve() {
for (const friend of friends) {
// pause an additional ms before yielding to allow time
// for tests to return or throw before next value is processed.
// eslint-disable-next-line no-await-in-loop
await new Promise((r) => setTimeout(r, 1));
yield friend;
}
},
},
asyncIterableListNoReturn: {
type: new GraphQLList(friendType),
resolve() {
let i = 0;
return {
[Symbol.asyncIterator]: () => ({
async next() {
const friend = friends[i++];
if (friend) {
await new Promise((r) => setTimeout(r, 1));
return { value: friend, done: false };
}
return { value: undefined, done: true };
},
}),
};
},
},
asyncIterableListDelayedClose: {
type: new GraphQLList(friendType),
async *resolve() {
Expand Down Expand Up @@ -697,4 +728,172 @@ describe('Execute: stream directive', () => {
},
]);
});
it('Returns underlying async iterables when dispatcher is returned', async () => {
const document = parse(`
query {
asyncIterableListDelayed @stream(initialCount: 1) {
name
id
}
}
`);
const schema = new GraphQLSchema({ query });

const executeResult = await execute({ schema, document, rootValue: {} });
invariant(isAsyncIterable(executeResult));
const iterator = executeResult[Symbol.asyncIterator]();

const result1 = await iterator.next();
expect(result1).to.deep.equal({
done: false,
value: {
data: {
asyncIterableListDelayed: [
{
id: '1',
name: 'Luke',
},
],
},
hasNext: true,
},
});

iterator.return?.();

// this result had started processing before return was called
const result2 = await iterator.next();
expect(result2).to.deep.equal({
done: false,
value: {
data: {
id: '2',
name: 'Han',
},
hasNext: true,
path: ['asyncIterableListDelayed', 1],
},
});

// third result is not returned because async iterator has returned
const result3 = await iterator.next();
expect(result3).to.deep.equal({
done: false,
value: {
hasNext: false,
},
});
});
it('Can return async iterable when underlying iterable does not have a return method', async () => {
const document = parse(`
query {
asyncIterableListNoReturn @stream(initialCount: 1) {
name
id
}
}
`);
const schema = new GraphQLSchema({ query });

const executeResult = await execute({ schema, document, rootValue: {} });
invariant(isAsyncIterable(executeResult));
const iterator = executeResult[Symbol.asyncIterator]();

const result1 = await iterator.next();
expect(result1).to.deep.equal({
done: false,
value: {
data: {
asyncIterableListNoReturn: [
{
id: '1',
name: 'Luke',
},
],
},
hasNext: true,
},
});

iterator.return?.();

// this result had started processing before return was called
const result2 = await iterator.next();
expect(result2).to.deep.equal({
done: false,
value: {
data: {
id: '2',
name: 'Han',
},
hasNext: true,
path: ['asyncIterableListNoReturn', 1],
},
});

// third result is not returned because async iterator has returned
const result3 = await iterator.next();
expect(result3).to.deep.equal({
done: false,
value: {
hasNext: false,
},
});
});
it('Returns underlying async iterables when dispatcher is thrown', async () => {
const document = parse(`
query {
asyncIterableListDelayed @stream(initialCount: 1) {
name
id
}
}
`);
const schema = new GraphQLSchema({ query });

const executeResult = await execute({ schema, document, rootValue: {} });
invariant(isAsyncIterable(executeResult));
const iterator = executeResult[Symbol.asyncIterator]();

const result1 = await iterator.next();
expect(result1).to.deep.equal({
done: false,
value: {
data: {
asyncIterableListDelayed: [
{
id: '1',
name: 'Luke',
},
],
},
hasNext: true,
},
});

iterator.throw?.(new Error('bad'));

// this result had started processing before return was called
const result2 = await iterator.next();
expect(result2).to.deep.equal({
done: false,
value: {
data: {
id: '2',
name: 'Han',
},
hasNext: true,
path: ['asyncIterableListDelayed', 1],
},
});

// third result is not returned because async iterator has returned
const result3 = await iterator.next();
expect(result3).to.deep.equal({
done: false,
value: {
hasNext: false,
},
});
});
});
35 changes: 32 additions & 3 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1620,10 +1620,14 @@ interface DispatcherResult {
export class Dispatcher {
_subsequentPayloads: Array<Promise<IteratorResult<DispatcherResult, void>>>;
_initialResult?: ExecutionResult;
_iterators: Array<AsyncIterator<unknown>>;
_isDone: boolean;
_hasReturnedInitialResult: boolean;

constructor() {
this._subsequentPayloads = [];
this._iterators = [];
this._isDone = false;
this._hasReturnedInitialResult = false;
}

Expand Down Expand Up @@ -1692,13 +1696,16 @@ export class Dispatcher {
label?: string,
): void {
const subsequentPayloads = this._subsequentPayloads;
const iterators = this._iterators;
iterators.push(iterator);
function next(index: number) {
const fieldPath = addPath(path, index, undefined);
const patchErrors: Array<GraphQLError> = [];
subsequentPayloads.push(
iterator.next().then(
({ value: data, done }) => {
if (done) {
iterators.splice(iterators.indexOf(iterator), 1);
return { value: undefined, done: true };
}

Expand Down Expand Up @@ -1769,6 +1776,14 @@ export class Dispatcher {
}

_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
if (this._isDone) {
return Promise.resolve({
value: {
hasNext: false,
},
done: false,
});
}
return new Promise<{
promise: Promise<IteratorResult<DispatcherResult, void>>;
}>((resolve) => {
Expand Down Expand Up @@ -1828,15 +1843,29 @@ export class Dispatcher {
return this._race();
}

get(
initialResult: ExecutionResult,
): AsyncIterableIterator<AsyncExecutionResult> {
async _return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
await Promise.all(this._iterators.map((iterator) => iterator.return?.()));
this._isDone = true;
return { value: undefined, done: true };
}

async _throw(
error?: unknown,
): Promise<IteratorResult<AsyncExecutionResult, void>> {
await Promise.all(this._iterators.map((iterator) => iterator.return?.()));
this._isDone = true;
return Promise.reject(error);
}

get(initialResult: ExecutionResult): AsyncGenerator<AsyncExecutionResult> {
this._initialResult = initialResult;
return {
[Symbol.asyncIterator]() {
return this;
},
next: () => this._next(),
return: () => this._return(),
throw: (error?: unknown) => this._throw(error),
};
}
}
Expand Down

0 comments on commit 551fa58

Please sign in to comment.