diff --git a/packages/executor/src/execution/IncrementalGraph.ts b/packages/executor/src/execution/IncrementalGraph.ts index b8c5ebf0d92..6825aec4f6c 100644 --- a/packages/executor/src/execution/IncrementalGraph.ts +++ b/packages/executor/src/execution/IncrementalGraph.ts @@ -22,9 +22,7 @@ export class IncrementalGraph { private _rootNodes: Set; private _completedQueue: Array; - private _nextQueue: Array< - (iterable: IteratorResult>) => void - >; + private _nextQueue: Array<(iterable: Iterable | undefined) => void>; constructor() { this._rootNodes = new Set(); @@ -60,46 +58,30 @@ export class IncrementalGraph { } } - currentCompletedIncrementalData() { - return { - [Symbol.iterator]() { - return this; - }, - next: (): IteratorResult => { - const value = this._completedQueue.shift(); - if (value !== undefined) { - return { value, done: false }; - } - return { value: undefined, done: true }; - }, - }; + *currentCompletedBatch(): Generator { + let completed; + while ((completed = this._completedQueue.shift()) !== undefined) { + yield completed; + } + if (this._rootNodes.size === 0) { + for (const resolve of this._nextQueue) { + resolve(undefined); + } + } } - completedIncrementalData() { - return { - [Symbol.asyncIterator]() { - return this; - }, - next: (): Promise>> => { - const firstResult = this._completedQueue.shift(); - if (firstResult !== undefined) { - return Promise.resolve({ - value: this._yieldCurrentCompletedIncrementalData(firstResult), - done: false, - }); - } - const { promise, resolve } = - createDeferred>>(); - this._nextQueue.push(resolve); - return promise; - }, - return: (): Promise>> => { - for (const resolve of this._nextQueue) { - resolve({ value: undefined, done: true }); - } - return Promise.resolve({ value: undefined, done: true }); - }, - }; + nextCompletedBatch(): Promise | undefined> { + const { promise, resolve } = createDeferred< + Iterable | undefined + >(); + this._nextQueue.push(resolve); + return promise; + } + + abort(): void { + for (const resolve of this._nextQueue) { + resolve(undefined); + } } hasNext(): boolean { @@ -144,11 +126,6 @@ export class IncrementalGraph { private _removePending(subsequentResultRecord: SubsequentResultRecord): void { this._rootNodes.delete(subsequentResultRecord); - if (this._rootNodes.size === 0) { - for (const resolve of this._nextQueue) { - resolve({ value: undefined, done: true }); - } - } } private _addIncrementalDataRecords( @@ -312,15 +289,17 @@ export class IncrementalGraph { while ((completed = this._completedQueue.shift()) !== undefined) { yield completed; } + if (this._rootNodes.size === 0) { + for (const resolve of this._nextQueue) { + resolve(undefined); + } + } } private _enqueue(completed: IncrementalDataRecordResult): void { const next = this._nextQueue.shift(); if (next !== undefined) { - next({ - value: this._yieldCurrentCompletedIncrementalData(completed), - done: false, - }); + next(this._yieldCurrentCompletedIncrementalData(completed)); return; } this._completedQueue.push(completed); diff --git a/packages/executor/src/execution/IncrementalPublisher.ts b/packages/executor/src/execution/IncrementalPublisher.ts index ba28080a0da..f30ab664eef 100644 --- a/packages/executor/src/execution/IncrementalPublisher.ts +++ b/packages/executor/src/execution/IncrementalPublisher.ts @@ -48,6 +48,19 @@ interface SubsequentIncrementalExecutionResultContext { completed: Array; } +/** + * The IncrementalPublisherState Enum tracks the state of the IncrementalPublisher, which is initialized to + * "Started". When there are no more incremental results to publish, the state is set to "Completed". On the + * next call to next, clean-up is potentially performed and the state is set to "Finished". + * + * If the IncrementalPublisher is ended early, it may be advanced directly from "Started" to "Finished". + */ +enum IncrementalPublisherState { + Started = 1, + Completed = 2, + Finished = 3, +} + /** * This class is used to publish incremental results to the client, enabling semi-concurrent * execution while preserving result order. @@ -113,18 +126,32 @@ class IncrementalPublisher { void, void > { - let isDone = false; + let incrementalPublisherState: IncrementalPublisherState = IncrementalPublisherState.Started; + + const _finish = async (): Promise => { + incrementalPublisherState = IncrementalPublisherState.Finished; + this._incrementalGraph.abort(); + await this._returnAsyncIterators(); + }; this._context.signal?.addEventListener('abort', () => { - this._incrementalGraph.completedIncrementalData().return(); + this._incrementalGraph.abort(); }); const _next = async (): Promise< IteratorResult, void> > => { - if (isDone) { - await this._returnAsyncIteratorsIgnoringErrors(); - return { value: undefined, done: true }; + switch (incrementalPublisherState) { + case IncrementalPublisherState.Finished: { + return { value: undefined, done: true }; + } + case IncrementalPublisherState.Completed: { + await _finish(); + return { value: undefined, done: true }; + } + case IncrementalPublisherState.Started: { + // continue + } } const context: SubsequentIncrementalExecutionResultContext = { @@ -133,12 +160,10 @@ class IncrementalPublisher { completed: [], }; - let currentCompletedIncrementalData = - this._incrementalGraph.currentCompletedIncrementalData(); - const completedIncrementalData = this._incrementalGraph.completedIncrementalData(); - const asyncIterator = completedIncrementalData[Symbol.asyncIterator](); + let batch: Iterable | undefined = + this._incrementalGraph.currentCompletedBatch(); do { - for (const completedResult of currentCompletedIncrementalData) { + for (const completedResult of batch) { this._handleCompletedIncrementalData(completedResult, context); } @@ -147,7 +172,7 @@ class IncrementalPublisher { const hasNext = this._incrementalGraph.hasNext(); if (!hasNext) { - isDone = true; + incrementalPublisherState = IncrementalPublisherState.Completed; } const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult = @@ -169,31 +194,27 @@ class IncrementalPublisher { return { value: subsequentIncrementalExecutionResult, done: false }; } - const iteration = await asyncIterator.next(); - currentCompletedIncrementalData = iteration.value; - } while (currentCompletedIncrementalData !== undefined); + batch = await this._incrementalGraph.nextCompletedBatch(); + } while (batch !== undefined); if (this._context.signal?.aborted) { throw this._context.signal.reason; } - await this._returnAsyncIteratorsIgnoringErrors(); return { value: undefined, done: true }; }; const _return = async (): Promise< IteratorResult, void> > => { - isDone = true; - await this._returnAsyncIterators(); + await _finish(); return { value: undefined, done: true }; }; const _throw = async ( error?: unknown, ): Promise, void>> => { - isDone = true; - await this._returnAsyncIterators(); + await _finish(); return Promise.reject(error); }; @@ -400,7 +421,7 @@ class IncrementalPublisher { } private async _returnAsyncIterators(): Promise { - await this._incrementalGraph.completedIncrementalData().return(); + await this._incrementalGraph.abort(); const cancellableStreams = this._context.cancellableStreams; if (cancellableStreams === undefined) { @@ -414,10 +435,4 @@ class IncrementalPublisher { } await Promise.all(promises); } - - private async _returnAsyncIteratorsIgnoringErrors(): Promise { - await this._returnAsyncIterators().catch(() => { - // Ignore errors - }); - } } diff --git a/packages/executor/src/execution/__tests__/stream-test.ts b/packages/executor/src/execution/__tests__/stream-test.ts index ff3052e4e7f..024d6d1e044 100644 --- a/packages/executor/src/execution/__tests__/stream-test.ts +++ b/packages/executor/src/execution/__tests__/stream-test.ts @@ -10,6 +10,7 @@ import { } from 'graphql'; import { createDeferred, MaybePromise } from '@graphql-tools/utils'; import { expectJSON } from '../../__testUtils__/expectJSON.js'; +import { expectPromise } from '../../__testUtils__/expectPromise.js'; import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; import { execute } from '../execute.js'; import type { @@ -1831,7 +1832,7 @@ describe('Execute: stream directive', () => { ]); }); - it('Returns iterator and ignores errors when stream payloads are filtered', async () => { + it('Returns iterator and passes through errors when stream payloads are filtered', async () => { let returned = false; let requested = false; const iterable = { @@ -1854,7 +1855,7 @@ describe('Execute: stream directive', () => { }, return: () => { returned = true; - // Ignores errors from return. + // This error should be passed through. return Promise.reject(new Error('Oops')); }, }), @@ -1927,8 +1928,8 @@ describe('Execute: stream directive', () => { }, }); - const result3 = await iterator.next(); - expectJSON(result3).toDeepEqual({ done: true, value: undefined }); + const result3Promise = iterator.next(); + await expectPromise(result3Promise).toRejectWith('Oops'); expect(returned).toBeTruthy(); }); @@ -2371,29 +2372,24 @@ describe('Execute: stream directive', () => { }); it('Returns underlying async iterables when returned generator is returned', async () => { let returned = false; - let index = 0; const iterable = { [Symbol.asyncIterator]: () => ({ - next: () => { - const friend = friends[index++]; - if (friend == null) { - return Promise.resolve({ done: true, value: undefined }); - } - return Promise.resolve({ done: false, value: friend }); - }, + next: () => + new Promise(() => { + /* never resolves */ + }), return: () => { returned = true; + // This error should be passed through. + return Promise.reject(new Error('Oops')); }, }), }; const document = parse(/* GraphQL */ ` query { - friendList @stream(initialCount: 1) { + friendList @stream { id - ... @defer { - name - } } } `); @@ -2412,26 +2408,21 @@ describe('Execute: stream directive', () => { const result1 = executeResult.initialResult; expectJSON(result1).toDeepEqual({ data: { - friendList: [ - { - id: '1', - }, - ], + friendList: [], }, - pending: [ - { id: '0', path: ['friendList', 0] }, - { id: '1', path: ['friendList'] }, - ], + pending: [{ id: '0', path: ['friendList'] }], hasNext: true, }); + + const result2Promise = iterator.next(); const returnPromise = iterator.return(); - const result2 = await iterator.next(); + const result2 = await result2Promise; expectJSON(result2).toDeepEqual({ done: true, value: undefined, }); - await returnPromise; + await expectPromise(returnPromise).toRejectWith('Oops'); expect(returned).toBeTruthy(); }); it('Can return async iterable when underlying iterable does not have a return method', async () => { @@ -2553,13 +2544,7 @@ describe('Execute: stream directive', () => { done: true, value: undefined, }); - try { - await throwPromise; /* c8 ignore start */ - // Not reachable, always throws - /* c8 ignore stop */ - } catch (e) { - // ignore error - } + await expectPromise(throwPromise).toRejectWith('bad'); expect(returned).toBeTruthy(); }); });