Skip to content

Commit 55e15eb

Browse files
committed
Return underlying AsyncIterators when execute result is returned (#2843)
# Conflicts: # src/execution/execute.ts
1 parent a3b7a3a commit 55e15eb

File tree

2 files changed

+231
-3
lines changed

2 files changed

+231
-3
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { expect } from 'chai';
22
import { describe, it } from 'mocha';
33

4+
import { invariant } from '../../jsutils/invariant';
45
import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
56
import { parse } from '../../language/parser';
67

@@ -74,6 +75,36 @@ const query = new GraphQLObjectType({
7475
yield await Promise.resolve({});
7576
},
7677
},
78+
asyncIterableListDelayed: {
79+
type: new GraphQLList(friendType),
80+
async *resolve() {
81+
for (const friend of friends) {
82+
// pause an additional ms before yielding to allow time
83+
// for tests to return or throw before next value is processed.
84+
// eslint-disable-next-line no-await-in-loop
85+
await new Promise((r) => setTimeout(r, 1));
86+
yield friend;
87+
}
88+
},
89+
},
90+
asyncIterableListNoReturn: {
91+
type: new GraphQLList(friendType),
92+
resolve() {
93+
let i = 0;
94+
return {
95+
[Symbol.asyncIterator]: () => ({
96+
async next() {
97+
const friend = friends[i++];
98+
if (friend) {
99+
await new Promise((r) => setTimeout(r, 1));
100+
return { value: friend, done: false };
101+
}
102+
return { value: undefined, done: true };
103+
},
104+
}),
105+
};
106+
},
107+
},
77108
asyncIterableListDelayedClose: {
78109
type: new GraphQLList(friendType),
79110
async *resolve() {
@@ -697,4 +728,172 @@ describe('Execute: stream directive', () => {
697728
},
698729
]);
699730
});
731+
it('Returns underlying async iterables when dispatcher is returned', async () => {
732+
const document = parse(`
733+
query {
734+
asyncIterableListDelayed @stream(initialCount: 1) {
735+
name
736+
id
737+
}
738+
}
739+
`);
740+
const schema = new GraphQLSchema({ query });
741+
742+
const executeResult = await execute({ schema, document, rootValue: {} });
743+
invariant(isAsyncIterable(executeResult));
744+
const iterator = executeResult[Symbol.asyncIterator]();
745+
746+
const result1 = await iterator.next();
747+
expect(result1).to.deep.equal({
748+
done: false,
749+
value: {
750+
data: {
751+
asyncIterableListDelayed: [
752+
{
753+
id: '1',
754+
name: 'Luke',
755+
},
756+
],
757+
},
758+
hasNext: true,
759+
},
760+
});
761+
762+
iterator.return?.();
763+
764+
// this result had started processing before return was called
765+
const result2 = await iterator.next();
766+
expect(result2).to.deep.equal({
767+
done: false,
768+
value: {
769+
data: {
770+
id: '2',
771+
name: 'Han',
772+
},
773+
hasNext: true,
774+
path: ['asyncIterableListDelayed', 1],
775+
},
776+
});
777+
778+
// third result is not returned because async iterator has returned
779+
const result3 = await iterator.next();
780+
expect(result3).to.deep.equal({
781+
done: false,
782+
value: {
783+
hasNext: false,
784+
},
785+
});
786+
});
787+
it('Can return async iterable when underlying iterable does not have a return method', async () => {
788+
const document = parse(`
789+
query {
790+
asyncIterableListNoReturn @stream(initialCount: 1) {
791+
name
792+
id
793+
}
794+
}
795+
`);
796+
const schema = new GraphQLSchema({ query });
797+
798+
const executeResult = await execute({ schema, document, rootValue: {} });
799+
invariant(isAsyncIterable(executeResult));
800+
const iterator = executeResult[Symbol.asyncIterator]();
801+
802+
const result1 = await iterator.next();
803+
expect(result1).to.deep.equal({
804+
done: false,
805+
value: {
806+
data: {
807+
asyncIterableListNoReturn: [
808+
{
809+
id: '1',
810+
name: 'Luke',
811+
},
812+
],
813+
},
814+
hasNext: true,
815+
},
816+
});
817+
818+
iterator.return?.();
819+
820+
// this result had started processing before return was called
821+
const result2 = await iterator.next();
822+
expect(result2).to.deep.equal({
823+
done: false,
824+
value: {
825+
data: {
826+
id: '2',
827+
name: 'Han',
828+
},
829+
hasNext: true,
830+
path: ['asyncIterableListNoReturn', 1],
831+
},
832+
});
833+
834+
// third result is not returned because async iterator has returned
835+
const result3 = await iterator.next();
836+
expect(result3).to.deep.equal({
837+
done: false,
838+
value: {
839+
hasNext: false,
840+
},
841+
});
842+
});
843+
it('Returns underlying async iterables when dispatcher is thrown', async () => {
844+
const document = parse(`
845+
query {
846+
asyncIterableListDelayed @stream(initialCount: 1) {
847+
name
848+
id
849+
}
850+
}
851+
`);
852+
const schema = new GraphQLSchema({ query });
853+
854+
const executeResult = await execute({ schema, document, rootValue: {} });
855+
invariant(isAsyncIterable(executeResult));
856+
const iterator = executeResult[Symbol.asyncIterator]();
857+
858+
const result1 = await iterator.next();
859+
expect(result1).to.deep.equal({
860+
done: false,
861+
value: {
862+
data: {
863+
asyncIterableListDelayed: [
864+
{
865+
id: '1',
866+
name: 'Luke',
867+
},
868+
],
869+
},
870+
hasNext: true,
871+
},
872+
});
873+
874+
iterator.throw?.(new Error('bad'));
875+
876+
// this result had started processing before return was called
877+
const result2 = await iterator.next();
878+
expect(result2).to.deep.equal({
879+
done: false,
880+
value: {
881+
data: {
882+
id: '2',
883+
name: 'Han',
884+
},
885+
hasNext: true,
886+
path: ['asyncIterableListDelayed', 1],
887+
},
888+
});
889+
890+
// third result is not returned because async iterator has returned
891+
const result3 = await iterator.next();
892+
expect(result3).to.deep.equal({
893+
done: false,
894+
value: {
895+
hasNext: false,
896+
},
897+
});
898+
});
700899
});

src/execution/execute.ts

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1398,10 +1398,14 @@ interface DispatcherResult {
13981398
export class Dispatcher {
13991399
_subsequentPayloads: Array<Promise<IteratorResult<DispatcherResult, void>>>;
14001400
_initialResult?: ExecutionResult;
1401+
_iterators: Array<AsyncIterator<unknown>>;
1402+
_isDone: boolean;
14011403
_hasReturnedInitialResult: boolean;
14021404

14031405
constructor() {
14041406
this._subsequentPayloads = [];
1407+
this._iterators = [];
1408+
this._isDone = false;
14051409
this._hasReturnedInitialResult = false;
14061410
}
14071411

@@ -1470,13 +1474,16 @@ export class Dispatcher {
14701474
label?: string,
14711475
): void {
14721476
const subsequentPayloads = this._subsequentPayloads;
1477+
const iterators = this._iterators;
1478+
iterators.push(iterator);
14731479
function next(index: number) {
14741480
const fieldPath = addPath(path, index, undefined);
14751481
const patchErrors: Array<GraphQLError> = [];
14761482
subsequentPayloads.push(
14771483
iterator.next().then(
14781484
({ value: data, done }) => {
14791485
if (done) {
1486+
iterators.splice(iterators.indexOf(iterator), 1);
14801487
return { value: undefined, done: true };
14811488
}
14821489

@@ -1547,6 +1554,14 @@ export class Dispatcher {
15471554
}
15481555

15491556
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
1557+
if (this._isDone) {
1558+
return Promise.resolve({
1559+
value: {
1560+
hasNext: false,
1561+
},
1562+
done: false,
1563+
});
1564+
}
15501565
return new Promise<{
15511566
promise: Promise<IteratorResult<DispatcherResult, void>>;
15521567
}>((resolve) => {
@@ -1606,15 +1621,29 @@ export class Dispatcher {
16061621
return this._race();
16071622
}
16081623

1609-
get(
1610-
initialResult: ExecutionResult,
1611-
): AsyncIterableIterator<AsyncExecutionResult> {
1624+
async _return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
1625+
await Promise.all(this._iterators.map((iterator) => iterator.return?.()));
1626+
this._isDone = true;
1627+
return { value: undefined, done: true };
1628+
}
1629+
1630+
async _throw(
1631+
error?: unknown,
1632+
): Promise<IteratorResult<AsyncExecutionResult, void>> {
1633+
await Promise.all(this._iterators.map((iterator) => iterator.return?.()));
1634+
this._isDone = true;
1635+
return Promise.reject(error);
1636+
}
1637+
1638+
get(initialResult: ExecutionResult): AsyncGenerator<AsyncExecutionResult> {
16121639
this._initialResult = initialResult;
16131640
return {
16141641
[Symbol.asyncIterator]() {
16151642
return this;
16161643
},
16171644
next: () => this._next(),
1645+
return: () => this._return(),
1646+
throw: (error?: unknown) => this._throw(error),
16181647
};
16191648
}
16201649

0 commit comments

Comments
 (0)