Skip to content

Return underlying AsyncIterators when execute result is returned #2843

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions src/execution/__tests__/stream-test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { expect } from 'chai';
import { describe, it } from 'mocha';

import invariant from '../../jsutils/invariant';
import isAsyncIterable from '../../jsutils/isAsyncIterable';
import { parse } from '../../language/parser';

Expand Down Expand Up @@ -72,6 +73,36 @@ const query = new GraphQLObjectType({
yield {};
},
},
asyncIterableListDelayed: {
type: new GraphQLList(friendType),
async *resolve() {
for (const friend of friends) {
// pause an additional ms before yielding to allow time
// for tests to return or throw before next value is processed.
// eslint-disable-next-line no-await-in-loop
await new Promise((r) => setTimeout(r, 1));
yield friend;
}
},
},
asyncIterableListNoReturn: {
type: new GraphQLList(friendType),
resolve() {
let i = 0;
return {
[Symbol.asyncIterator]: () => ({
async next() {
const friend = friends[i++];
if (friend) {
await new Promise((r) => setTimeout(r, 1));
return { value: friend, done: false };
}
return { value: undefined, done: true };
},
}),
};
},
},
asyncIterableListDelayedClose: {
type: new GraphQLList(friendType),
async *resolve() {
Expand Down Expand Up @@ -626,4 +657,114 @@ describe('Execute: stream directive', () => {
},
]);
});
it('Returns underlying async iterables when dispatcher is returned', async () => {
const document = parse(`
query {
asyncIterableListDelayed @stream(initialCount: 1) {
name
id
}
}
`);
const schema = new GraphQLSchema({ query });

const executeResult = await execute(schema, document, {});
invariant(isAsyncIterable(executeResult));

const result1 = await executeResult.next();
expect(result1).to.deep.equal({
done: false,
value: {
data: {
asyncIterableListDelayed: [
{
id: '1',
name: 'Luke',
},
],
},
hasNext: true,
},
});

executeResult.return();

// this result had started processing before return was called
const result2 = await executeResult.next();
expect(result2).to.deep.equal({
done: false,
value: {
data: {
id: '2',
name: 'Han',
},
hasNext: true,
path: ['asyncIterableListDelayed', 1],
},
});

// third result is not returned because async iterator has returned
const result3 = await executeResult.next();
expect(result3).to.deep.equal({
done: false,
value: {
hasNext: false,
},
});
});
it('Can return async iterable when underlying iterable does not have a return method', async () => {
const document = parse(`
query {
asyncIterableListNoReturn @stream(initialCount: 1) {
name
id
}
}
`);
const schema = new GraphQLSchema({ query });

const executeResult = await execute(schema, document, {});
invariant(isAsyncIterable(executeResult));

const result1 = await executeResult.next();
expect(result1).to.deep.equal({
done: false,
value: {
data: {
asyncIterableListNoReturn: [
{
id: '1',
name: 'Luke',
},
],
},
hasNext: true,
},
});

executeResult.return();

// this result had started processing before return was called
const result2 = await executeResult.next();
expect(result2).to.deep.equal({
done: false,
value: {
data: {
id: '2',
name: 'Han',
},
hasNext: true,
path: ['asyncIterableListNoReturn', 1],
},
});

// third result is not returned because async iterator has returned
const result3 = await executeResult.next();
expect(result3).to.deep.equal({
done: false,
value: {
hasNext: false,
},
});
});
});
26 changes: 26 additions & 0 deletions src/execution/execute.js
Original file line number Diff line number Diff line change
Expand Up @@ -1645,11 +1645,15 @@ type DispatcherResult = {|
*/
export class Dispatcher {
_subsequentPayloads: Array<Promise<IteratorResult<DispatcherResult, void>>>;
_iterators: Array<AsyncIterator<mixed>>;
_isDone: boolean;
_initialResult: ?ExecutionResult;
_hasReturnedInitialResult: boolean;

constructor() {
this._subsequentPayloads = [];
this._iterators = [];
this._isDone = false;
this._hasReturnedInitialResult = false;
}

Expand Down Expand Up @@ -1718,13 +1722,16 @@ export class Dispatcher {
itemType: GraphQLOutputType,
): void {
const subsequentPayloads = this._subsequentPayloads;
const iterators = this._iterators;
iterators.push(iterator);
function next(index) {
const fieldPath = addPath(path, index);
const patchErrors = [];
subsequentPayloads.push(
iterator.next().then(
({ value: data, done }) => {
if (done) {
iterators.splice(iterators.indexOf(iterator), 1);
return { value: undefined, done: true };
}

Expand Down Expand Up @@ -1795,6 +1802,14 @@ export class Dispatcher {
}

_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
if (this._isDone) {
return Promise.resolve({
value: {
hasNext: false,
},
done: false,
});
}
return new Promise((resolve) => {
this._subsequentPayloads.forEach((promise) => {
promise.then(() => {
Expand Down Expand Up @@ -1851,13 +1866,24 @@ export class Dispatcher {
return this._race();
}

_return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
return Promise.all(
// $FlowFixMe[prop-missing]
this._iterators.map((iterator) => iterator.return?.()),
).then(() => {
this._isDone = true;
return { value: undefined, done: true };
});
}

get(initialResult: ExecutionResult): AsyncIterable<AsyncExecutionResult> {
this._initialResult = initialResult;
return ({
[SYMBOL_ASYNC_ITERATOR]() {
return this;
},
next: () => this._next(),
return: () => this._return(),
}: any);
}
}
Expand Down