Skip to content

Commit

Permalink
Return underlying AsyncIterators when execute result is returned (#2843)
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/execution/execute.ts
  • Loading branch information
robrichard committed Aug 3, 2022
1 parent f5ebcbe commit 1e58b67
Show file tree
Hide file tree
Showing 2 changed files with 277 additions and 9 deletions.
248 changes: 248 additions & 0 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { assert } from 'chai';
import { describe, it } from 'mocha';

import { expectJSON } from '../../__testUtils__/expectJSON';
Expand Down Expand Up @@ -162,6 +163,37 @@ const query = new GraphQLObjectType({
yield await Promise.resolve({ string: friends[1].name });
},
},
asyncIterableListDelayed: {
type: new GraphQLList(friendType),
async *resolve() {
for (const friend of friends) {
// pause an additional ms before yielding to allow time
// for tests to return or throw before next value is processed.
// eslint-disable-next-line no-await-in-loop
await resolveOnNextTick();
yield friend; /* c8 ignore start */
// Not reachable, early return
}
} /* c8 ignore stop */,
},
asyncIterableListNoReturn: {
type: new GraphQLList(friendType),
resolve() {
let i = 0;
return {
[Symbol.asyncIterator]: () => ({
async next() {
const friend = friends[i++];
if (friend) {
await resolveOnNextTick();
return { value: friend, done: false };
}
return { value: undefined, done: true };
},
}),
};
},
},
asyncIterableListDelayedClose: {
type: new GraphQLList(friendType),
async *resolve() {
Expand Down Expand Up @@ -1344,4 +1376,220 @@ describe('Execute: stream directive', () => {
},
]);
});
it('Returns underlying async iterables when returned generator is returned', async () => {
const document = parse(`
query {
asyncIterableListDelayed @stream(initialCount: 1) {
id
... @defer {
name
}
}
}
`);
const schema = new GraphQLSchema({ query });

const executeResult = await execute({ schema, document, rootValue: {} });
assert(isAsyncIterable(executeResult));
const iterator = executeResult[Symbol.asyncIterator]();

const result1 = await iterator.next();
expectJSON(result1).toDeepEqual({
done: false,
value: {
data: {
asyncIterableListDelayed: [
{
id: '1',
},
],
},
hasNext: true,
},
});
const returnPromise = iterator.return();

// these results had started processing before return was called
const result2 = await iterator.next();
expectJSON(result2).toDeepEqual({
done: false,
value: {
incremental: [
{
data: {
name: 'Luke',
},
path: ['asyncIterableListDelayed', 0],
},
],
hasNext: true,
},
});
const result3 = await iterator.next();
expectJSON(result3).toDeepEqual({
done: false,
value: {
incremental: [
{
items: [
{
id: '2',
},
],
path: ['asyncIterableListDelayed', 1],
},
],
hasNext: true,
},
});
const result4 = await iterator.next();
expectJSON(result4).toDeepEqual({
done: true,
value: undefined,
});
await returnPromise;
});
it('Can return async iterable when underlying iterable does not have a return method', async () => {
const document = parse(`
query {
asyncIterableListNoReturn @stream(initialCount: 1) {
name
id
}
}
`);
const schema = new GraphQLSchema({ query });

const executeResult = await execute({ schema, document, rootValue: {} });
assert(isAsyncIterable(executeResult));
const iterator = executeResult[Symbol.asyncIterator]();

const result1 = await iterator.next();
expectJSON(result1).toDeepEqual({
done: false,
value: {
data: {
asyncIterableListNoReturn: [
{
id: '1',
name: 'Luke',
},
],
},
hasNext: true,
},
});

const returnPromise = iterator.return();

// this result had started processing before return was called
const result2 = await iterator.next();
expectJSON(result2).toDeepEqual({
done: false,
value: {
incremental: [
{
items: [
{
id: '2',
name: 'Han',
},
],
path: ['asyncIterableListNoReturn', 1],
},
],
hasNext: true,
},
});

// third result is not returned because async iterator has returned
const result3 = await iterator.next();
expectJSON(result3).toDeepEqual({
done: true,
value: undefined,
});
await returnPromise;
});
it('Returns underlying async iterables when returned generator is thrown', async () => {
const document = parse(`
query {
asyncIterableListDelayed @stream(initialCount: 1) {
... @defer {
name
}
id
}
}
`);
const schema = new GraphQLSchema({ query });

const executeResult = await execute({ schema, document, rootValue: {} });
assert(isAsyncIterable(executeResult));
const iterator = executeResult[Symbol.asyncIterator]();

const result1 = await iterator.next();
expectJSON(result1).toDeepEqual({
done: false,
value: {
data: {
asyncIterableListDelayed: [
{
id: '1',
},
],
},
hasNext: true,
},
});

const throwPromise = iterator.throw(new Error('bad'));

// these results had started processing before return was called
const result2 = await iterator.next();
expectJSON(result2).toDeepEqual({
done: false,
value: {
incremental: [
{
data: {
name: 'Luke',
},
path: ['asyncIterableListDelayed', 0],
},
],
hasNext: true,
},
});
const result3 = await iterator.next();
expectJSON(result3).toDeepEqual({
done: false,
value: {
incremental: [
{
items: [
{
id: '2',
},
],
path: ['asyncIterableListDelayed', 1],
},
],
hasNext: true,
},
});

// this result is not returned because async iterator has returned
const result4 = await iterator.next();
expectJSON(result4).toDeepEqual({
done: true,
value: undefined,
});
try {
await throwPromise; /* c8 ignore start */
// Not reachable, always throws
/* c8 ignore stop */
} catch (e) {
// ignore error
}
});
});
38 changes: 29 additions & 9 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1747,6 +1747,7 @@ async function executeStreamIterator(
label,
path: fieldPath,
parentContext,
iterator,
});

const dataPromise = executeStreamIteratorItem(
Expand Down Expand Up @@ -1789,6 +1790,7 @@ function yieldSubsequentPayloads(
initialResult: ExecutionResult,
): AsyncGenerator<AsyncExecutionResult, void, void> {
let _hasReturnedInitialResult = false;
let isDone = false;

async function race(): Promise<IteratorResult<AsyncExecutionResult>> {
if (exeContext.subsequentPayloads.length === 0) {
Expand Down Expand Up @@ -1866,19 +1868,37 @@ function yieldSubsequentPayloads(
},
done: false,
});
} else if (exeContext.subsequentPayloads.length === 0) {
} else if (exeContext.subsequentPayloads.length === 0 || isDone) {
return Promise.resolve({ value: undefined, done: true });
}
return race();
},
// TODO: implement return & throw
// c8 ignore next 2
// will be covered in follow up
return: () => Promise.resolve({ value: undefined, done: true }),

// c8 ignore next 2
// will be covered in follow up
throw: (error?: unknown) => Promise.reject(error),
async return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
await Promise.all(
exeContext.subsequentPayloads.map((asyncPayloadRecord) => {
if (isStreamPayload(asyncPayloadRecord)) {
return asyncPayloadRecord.iterator?.return?.();
}
return undefined;
}),
);
isDone = true;
return { value: undefined, done: true };
},
async throw(
error?: unknown,
): Promise<IteratorResult<AsyncExecutionResult, void>> {
await Promise.all(
exeContext.subsequentPayloads.map((asyncPayloadRecord) => {
if (isStreamPayload(asyncPayloadRecord)) {
return asyncPayloadRecord.iterator?.return?.();
}
return undefined;
}),
);
isDone = true;
return Promise.reject(error);
},
};
}

Expand Down

0 comments on commit 1e58b67

Please sign in to comment.