diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts new file mode 100644 index 0000000000..081214c09d --- /dev/null +++ b/src/execution/IncrementalPublisher.ts @@ -0,0 +1,328 @@ +import type { ObjMap } from '../jsutils/ObjMap.js'; +import type { Path } from '../jsutils/Path.js'; +import { pathToArray } from '../jsutils/Path.js'; +import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; +import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; + +import type { + GraphQLError, + GraphQLFormattedError, +} from '../error/GraphQLError.js'; + +export interface SubsequentIncrementalExecutionResult< + TData = ObjMap, + TExtensions = ObjMap, +> { + hasNext: boolean; + incremental?: ReadonlyArray>; + extensions?: TExtensions; +} + +export interface FormattedSubsequentIncrementalExecutionResult< + TData = ObjMap, + TExtensions = ObjMap, +> { + hasNext: boolean; + incremental?: ReadonlyArray>; + extensions?: TExtensions; +} + +export interface IncrementalDeferResult< + TData = ObjMap, + TExtensions = ObjMap, +> { + errors?: ReadonlyArray; + data?: TData | null; + path?: ReadonlyArray; + label?: string; + extensions?: TExtensions; +} + +export interface FormattedIncrementalDeferResult< + TData = ObjMap, + TExtensions = ObjMap, +> { + errors?: ReadonlyArray; + data?: TData | null; + path?: ReadonlyArray; + label?: string; + extensions?: TExtensions; +} + +export interface IncrementalStreamResult< + TData = Array, + TExtensions = ObjMap, +> { + errors?: ReadonlyArray; + items?: TData | null; + path?: ReadonlyArray; + label?: string; + extensions?: TExtensions; +} + +export interface FormattedIncrementalStreamResult< + TData = Array, + TExtensions = ObjMap, +> { + errors?: ReadonlyArray; + items?: TData | null; + path?: ReadonlyArray; + label?: string; + extensions?: TExtensions; +} + +export type IncrementalResult< + TData = ObjMap, + TExtensions = ObjMap, +> = + | IncrementalDeferResult + | IncrementalStreamResult; + +export type FormattedIncrementalResult< + TData = ObjMap, + TExtensions = ObjMap, +> = + | FormattedIncrementalDeferResult + | FormattedIncrementalStreamResult; + +export function yieldSubsequentPayloads( + subsequentPayloads: Set, +): AsyncGenerator { + let isDone = false; + + async function next(): Promise< + IteratorResult + > { + if (isDone) { + return { value: undefined, done: true }; + } + + await Promise.race(Array.from(subsequentPayloads).map((p) => p.promise)); + + if (isDone) { + // a different call to next has exhausted all payloads + return { value: undefined, done: true }; + } + + const incremental = getCompletedIncrementalResults(subsequentPayloads); + const hasNext = subsequentPayloads.size > 0; + + if (!incremental.length && hasNext) { + return next(); + } + + if (!hasNext) { + isDone = true; + } + + return { + value: incremental.length ? { incremental, hasNext } : { hasNext }, + done: false, + }; + } + + function returnStreamIterators() { + const promises: Array>> = []; + subsequentPayloads.forEach((incrementalDataRecord) => { + if ( + isStreamItemsRecord(incrementalDataRecord) && + incrementalDataRecord.asyncIterator?.return + ) { + promises.push(incrementalDataRecord.asyncIterator.return()); + } + }); + return Promise.all(promises); + } + + return { + [Symbol.asyncIterator]() { + return this; + }, + next, + async return(): Promise< + IteratorResult + > { + await returnStreamIterators(); + isDone = true; + return { value: undefined, done: true }; + }, + async throw( + error?: unknown, + ): Promise> { + await returnStreamIterators(); + isDone = true; + return Promise.reject(error); + }, + }; +} + +function getCompletedIncrementalResults( + subsequentPayloads: Set, +): Array { + const incrementalResults: Array = []; + for (const incrementalDataRecord of subsequentPayloads) { + const incrementalResult: IncrementalResult = {}; + if (!incrementalDataRecord.isCompleted) { + continue; + } + subsequentPayloads.delete(incrementalDataRecord); + if (isStreamItemsRecord(incrementalDataRecord)) { + const items = incrementalDataRecord.items; + if (incrementalDataRecord.isCompletedAsyncIterator) { + // async iterable resolver just finished but there may be pending payloads + continue; + } + (incrementalResult as IncrementalStreamResult).items = items; + } else { + const data = incrementalDataRecord.data; + (incrementalResult as IncrementalDeferResult).data = data ?? null; + } + + incrementalResult.path = incrementalDataRecord.path; + if (incrementalDataRecord.label != null) { + incrementalResult.label = incrementalDataRecord.label; + } + if (incrementalDataRecord.errors.length > 0) { + incrementalResult.errors = incrementalDataRecord.errors; + } + incrementalResults.push(incrementalResult); + } + return incrementalResults; +} + +export function filterSubsequentPayloads( + subsequentPayloads: Set, + nullPath: Path, + currentIncrementalDataRecord: IncrementalDataRecord | undefined, +): void { + const nullPathArray = pathToArray(nullPath); + subsequentPayloads.forEach((incrementalDataRecord) => { + if (incrementalDataRecord === currentIncrementalDataRecord) { + // don't remove payload from where error originates + return; + } + for (let i = 0; i < nullPathArray.length; i++) { + if (incrementalDataRecord.path[i] !== nullPathArray[i]) { + // incrementalDataRecord points to a path unaffected by this payload + return; + } + } + // incrementalDataRecord path points to nulled error field + if ( + isStreamItemsRecord(incrementalDataRecord) && + incrementalDataRecord.asyncIterator?.return + ) { + incrementalDataRecord.asyncIterator.return().catch(() => { + // ignore error + }); + } + subsequentPayloads.delete(incrementalDataRecord); + }); +} + +/** @internal */ +export class DeferredFragmentRecord { + type: 'defer'; + errors: Array; + label: string | undefined; + path: Array; + promise: Promise; + data: ObjMap | null; + parentContext: IncrementalDataRecord | undefined; + isCompleted: boolean; + _subsequentPayloads: Set; + _resolve?: (arg: PromiseOrValue | null>) => void; + constructor(opts: { + label: string | undefined; + path: Path | undefined; + parentContext: IncrementalDataRecord | undefined; + subsequentPayloads: Set; + }) { + this.type = 'defer'; + this.label = opts.label; + this.path = pathToArray(opts.path); + this.parentContext = opts.parentContext; + this.errors = []; + this._subsequentPayloads = opts.subsequentPayloads; + this._subsequentPayloads.add(this); + this.isCompleted = false; + this.data = null; + const { promise, resolve } = promiseWithResolvers | null>(); + this._resolve = resolve; + this.promise = promise.then((data) => { + this.data = data; + this.isCompleted = true; + }); + } + + addData(data: PromiseOrValue | null>) { + const parentData = this.parentContext?.promise; + if (parentData) { + this._resolve?.(parentData.then(() => data)); + return; + } + this._resolve?.(data); + } +} + +/** @internal */ +export class StreamItemsRecord { + type: 'stream'; + errors: Array; + label: string | undefined; + path: Array; + items: Array | null; + promise: Promise; + parentContext: IncrementalDataRecord | undefined; + asyncIterator: AsyncIterator | undefined; + isCompletedAsyncIterator?: boolean; + isCompleted: boolean; + _subsequentPayloads: Set; + _resolve?: (arg: PromiseOrValue | null>) => void; + constructor(opts: { + label: string | undefined; + path: Path | undefined; + asyncIterator?: AsyncIterator; + parentContext: IncrementalDataRecord | undefined; + subsequentPayloads: Set; + }) { + this.type = 'stream'; + this.items = null; + this.label = opts.label; + this.path = pathToArray(opts.path); + this.parentContext = opts.parentContext; + this.asyncIterator = opts.asyncIterator; + this.errors = []; + this._subsequentPayloads = opts.subsequentPayloads; + this._subsequentPayloads.add(this); + this.isCompleted = false; + this.items = null; + const { promise, resolve } = promiseWithResolvers | null>(); + this._resolve = resolve; + this.promise = promise.then((items) => { + this.items = items; + this.isCompleted = true; + }); + } + + addItems(items: PromiseOrValue | null>) { + const parentData = this.parentContext?.promise; + if (parentData) { + this._resolve?.(parentData.then(() => items)); + return; + } + this._resolve?.(items); + } + + setIsCompletedAsyncIterator() { + this.isCompletedAsyncIterator = true; + } +} + +export type IncrementalDataRecord = DeferredFragmentRecord | StreamItemsRecord; + +function isStreamItemsRecord( + incrementalDataRecord: IncrementalDataRecord, +): incrementalDataRecord is StreamItemsRecord { + return incrementalDataRecord.type === 'stream'; +} diff --git a/src/execution/__tests__/defer-test.ts b/src/execution/__tests__/defer-test.ts index 31ddf9e6c0..e2f8834ca4 100644 --- a/src/execution/__tests__/defer-test.ts +++ b/src/execution/__tests__/defer-test.ts @@ -16,11 +16,9 @@ import { import { GraphQLID, GraphQLString } from '../../type/scalars.js'; import { GraphQLSchema } from '../../type/schema.js'; -import type { - InitialIncrementalExecutionResult, - SubsequentIncrementalExecutionResult, -} from '../execute.js'; +import type { InitialIncrementalExecutionResult } from '../execute.js'; import { execute, experimentalExecuteIncrementally } from '../execute.js'; +import type { SubsequentIncrementalExecutionResult } from '../IncrementalPublisher.js'; const friendType = new GraphQLObjectType({ fields: { diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index d75f280277..9f61adac1b 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -17,11 +17,9 @@ import { import { GraphQLID, GraphQLString } from '../../type/scalars.js'; import { GraphQLSchema } from '../../type/schema.js'; -import type { - InitialIncrementalExecutionResult, - SubsequentIncrementalExecutionResult, -} from '../execute.js'; +import type { InitialIncrementalExecutionResult } from '../execute.js'; import { experimentalExecuteIncrementally } from '../execute.js'; +import type { SubsequentIncrementalExecutionResult } from '../IncrementalPublisher.js'; const friendType = new GraphQLObjectType({ fields: { diff --git a/src/execution/execute.ts b/src/execution/execute.ts index d5aca82871..8c9d8f9668 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -12,7 +12,6 @@ import { addPath, pathToArray } from '../jsutils/Path.js'; import { promiseForObject } from '../jsutils/promiseForObject.js'; import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; import { promiseReduce } from '../jsutils/promiseReduce.js'; -import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; import type { GraphQLFormattedError } from '../error/GraphQLError.js'; import { GraphQLError } from '../error/GraphQLError.js'; @@ -53,6 +52,18 @@ import { collectFields, collectSubfields as _collectSubfields, } from './collectFields.js'; +import type { + FormattedIncrementalResult, + IncrementalDataRecord, + IncrementalResult, + SubsequentIncrementalExecutionResult, +} from './IncrementalPublisher.js'; +import { + DeferredFragmentRecord, + filterSubsequentPayloads, + StreamItemsRecord, + yieldSubsequentPayloads, +} from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; import { getArgumentValues, @@ -182,76 +193,6 @@ export interface FormattedInitialIncrementalExecutionResult< extensions?: TExtensions; } -export interface SubsequentIncrementalExecutionResult< - TData = ObjMap, - TExtensions = ObjMap, -> { - hasNext: boolean; - incremental?: ReadonlyArray>; - extensions?: TExtensions; -} - -export interface FormattedSubsequentIncrementalExecutionResult< - TData = ObjMap, - TExtensions = ObjMap, -> { - hasNext: boolean; - incremental?: ReadonlyArray>; - extensions?: TExtensions; -} - -export interface IncrementalDeferResult< - TData = ObjMap, - TExtensions = ObjMap, -> extends ExecutionResult { - path?: ReadonlyArray; - label?: string; -} - -export interface FormattedIncrementalDeferResult< - TData = ObjMap, - TExtensions = ObjMap, -> extends FormattedExecutionResult { - path?: ReadonlyArray; - label?: string; -} - -export interface IncrementalStreamResult< - TData = Array, - TExtensions = ObjMap, -> { - errors?: ReadonlyArray; - items?: TData | null; - path?: ReadonlyArray; - label?: string; - extensions?: TExtensions; -} - -export interface FormattedIncrementalStreamResult< - TData = Array, - TExtensions = ObjMap, -> { - errors?: ReadonlyArray; - items?: TData | null; - path?: ReadonlyArray; - label?: string; - extensions?: TExtensions; -} - -export type IncrementalResult< - TData = ObjMap, - TExtensions = ObjMap, -> = - | IncrementalDeferResult - | IncrementalStreamResult; - -export type FormattedIncrementalResult< - TData = ObjMap, - TExtensions = ObjMap, -> = - | FormattedIncrementalDeferResult - | FormattedIncrementalStreamResult; - export interface ExecutionArgs { schema: GraphQLSchema; document: DocumentNode; @@ -364,7 +305,9 @@ function executeImpl( ...initialResult, hasNext: true, }, - subsequentResults: yieldSubsequentPayloads(exeContext), + subsequentResults: yieldSubsequentPayloads( + exeContext.subsequentPayloads, + ), }; } return initialResult; @@ -382,7 +325,9 @@ function executeImpl( ...initialResult, hasNext: true, }, - subsequentResults: yieldSubsequentPayloads(exeContext), + subsequentResults: yieldSubsequentPayloads( + exeContext.subsequentPayloads, + ), }; } return initialResult; @@ -769,7 +714,11 @@ function executeField( path, incrementalDataRecord, ); - filterSubsequentPayloads(exeContext, path, incrementalDataRecord); + filterSubsequentPayloads( + exeContext.subsequentPayloads, + path, + incrementalDataRecord, + ); return null; }); } @@ -783,7 +732,11 @@ function executeField( path, incrementalDataRecord, ); - filterSubsequentPayloads(exeContext, path, incrementalDataRecord); + filterSubsequentPayloads( + exeContext.subsequentPayloads, + path, + incrementalDataRecord, + ); return null; } } @@ -984,7 +937,11 @@ async function completePromisedValue( path, incrementalDataRecord, ); - filterSubsequentPayloads(exeContext, path, incrementalDataRecord); + filterSubsequentPayloads( + exeContext.subsequentPayloads, + path, + incrementalDataRecord, + ); return null; } } @@ -1260,7 +1217,11 @@ function completeListItemValue( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads(exeContext, itemPath, incrementalDataRecord); + filterSubsequentPayloads( + exeContext.subsequentPayloads, + itemPath, + incrementalDataRecord, + ); return null; }), ); @@ -1278,7 +1239,11 @@ function completeListItemValue( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads(exeContext, itemPath, incrementalDataRecord); + filterSubsequentPayloads( + exeContext.subsequentPayloads, + itemPath, + incrementalDataRecord, + ); completedResults.push(null); } @@ -1812,7 +1777,7 @@ function executeDeferredFragment( label, path, parentContext, - exeContext, + subsequentPayloads: exeContext.subsequentPayloads, }); let promiseOrData; try { @@ -1853,7 +1818,7 @@ function executeStreamField( label, path: itemPath, parentContext, - exeContext, + subsequentPayloads: exeContext.subsequentPayloads, }); if (isPromise(item)) { const completedItems = completePromisedValue( @@ -1868,7 +1833,11 @@ function executeStreamField( (value) => [value], (error) => { incrementalDataRecord.errors.push(error); - filterSubsequentPayloads(exeContext, path, incrementalDataRecord); + filterSubsequentPayloads( + exeContext.subsequentPayloads, + path, + incrementalDataRecord, + ); return null; }, ); @@ -1899,11 +1868,19 @@ function executeStreamField( incrementalDataRecord, ); completedItem = null; - filterSubsequentPayloads(exeContext, itemPath, incrementalDataRecord); + filterSubsequentPayloads( + exeContext.subsequentPayloads, + itemPath, + incrementalDataRecord, + ); } } catch (error) { incrementalDataRecord.errors.push(error); - filterSubsequentPayloads(exeContext, path, incrementalDataRecord); + filterSubsequentPayloads( + exeContext.subsequentPayloads, + path, + incrementalDataRecord, + ); incrementalDataRecord.addItems(null); return incrementalDataRecord; } @@ -1919,14 +1896,22 @@ function executeStreamField( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads(exeContext, itemPath, incrementalDataRecord); + filterSubsequentPayloads( + exeContext.subsequentPayloads, + itemPath, + incrementalDataRecord, + ); return null; }) .then( (value) => [value], (error) => { incrementalDataRecord.errors.push(error); - filterSubsequentPayloads(exeContext, path, incrementalDataRecord); + filterSubsequentPayloads( + exeContext.subsequentPayloads, + path, + incrementalDataRecord, + ); return null; }, ); @@ -1982,7 +1967,11 @@ async function executeStreamAsyncIteratorItem( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads(exeContext, itemPath, incrementalDataRecord); + filterSubsequentPayloads( + exeContext.subsequentPayloads, + itemPath, + incrementalDataRecord, + ); return null; }); } @@ -1996,7 +1985,11 @@ async function executeStreamAsyncIteratorItem( itemPath, incrementalDataRecord, ); - filterSubsequentPayloads(exeContext, itemPath, incrementalDataRecord); + filterSubsequentPayloads( + exeContext.subsequentPayloads, + itemPath, + incrementalDataRecord, + ); return { done: false, value: null }; } } @@ -2022,7 +2015,7 @@ async function executeStreamAsyncIterator( path: itemPath, parentContext: previousIncrementalDataRecord, asyncIterator, - exeContext, + subsequentPayloads: exeContext.subsequentPayloads, }); let iteration; @@ -2040,7 +2033,11 @@ async function executeStreamAsyncIterator( ); } catch (error) { incrementalDataRecord.errors.push(error); - filterSubsequentPayloads(exeContext, path, incrementalDataRecord); + filterSubsequentPayloads( + exeContext.subsequentPayloads, + path, + incrementalDataRecord, + ); incrementalDataRecord.addItems(null); // entire stream has errored and bubbled upwards if (asyncIterator?.return) { @@ -2059,7 +2056,11 @@ async function executeStreamAsyncIterator( (value) => [value], (error) => { incrementalDataRecord.errors.push(error); - filterSubsequentPayloads(exeContext, path, incrementalDataRecord); + filterSubsequentPayloads( + exeContext.subsequentPayloads, + path, + incrementalDataRecord, + ); return null; }, ); @@ -2076,245 +2077,3 @@ async function executeStreamAsyncIterator( index++; } } - -function filterSubsequentPayloads( - exeContext: ExecutionContext, - nullPath: Path, - currentIncrementalDataRecord: IncrementalDataRecord | undefined, -): void { - const nullPathArray = pathToArray(nullPath); - exeContext.subsequentPayloads.forEach((incrementalDataRecord) => { - if (incrementalDataRecord === currentIncrementalDataRecord) { - // don't remove payload from where error originates - return; - } - for (let i = 0; i < nullPathArray.length; i++) { - if (incrementalDataRecord.path[i] !== nullPathArray[i]) { - // incrementalDataRecord points to a path unaffected by this payload - return; - } - } - // incrementalDataRecord path points to nulled error field - if ( - isStreamItemsRecord(incrementalDataRecord) && - incrementalDataRecord.asyncIterator?.return - ) { - incrementalDataRecord.asyncIterator.return().catch(() => { - // ignore error - }); - } - exeContext.subsequentPayloads.delete(incrementalDataRecord); - }); -} - -function getCompletedIncrementalResults( - exeContext: ExecutionContext, -): Array { - const incrementalResults: Array = []; - for (const incrementalDataRecord of exeContext.subsequentPayloads) { - const incrementalResult: IncrementalResult = {}; - if (!incrementalDataRecord.isCompleted) { - continue; - } - exeContext.subsequentPayloads.delete(incrementalDataRecord); - if (isStreamItemsRecord(incrementalDataRecord)) { - const items = incrementalDataRecord.items; - if (incrementalDataRecord.isCompletedAsyncIterator) { - // async iterable resolver just finished but there may be pending payloads - continue; - } - (incrementalResult as IncrementalStreamResult).items = items; - } else { - const data = incrementalDataRecord.data; - (incrementalResult as IncrementalDeferResult).data = data ?? null; - } - - incrementalResult.path = incrementalDataRecord.path; - if (incrementalDataRecord.label != null) { - incrementalResult.label = incrementalDataRecord.label; - } - if (incrementalDataRecord.errors.length > 0) { - incrementalResult.errors = incrementalDataRecord.errors; - } - incrementalResults.push(incrementalResult); - } - return incrementalResults; -} - -function yieldSubsequentPayloads( - exeContext: ExecutionContext, -): AsyncGenerator { - let isDone = false; - - async function next(): Promise< - IteratorResult - > { - if (isDone) { - return { value: undefined, done: true }; - } - - await Promise.race( - Array.from(exeContext.subsequentPayloads).map((p) => p.promise), - ); - - if (isDone) { - // a different call to next has exhausted all payloads - return { value: undefined, done: true }; - } - - const incremental = getCompletedIncrementalResults(exeContext); - const hasNext = exeContext.subsequentPayloads.size > 0; - - if (!incremental.length && hasNext) { - return next(); - } - - if (!hasNext) { - isDone = true; - } - - return { - value: incremental.length ? { incremental, hasNext } : { hasNext }, - done: false, - }; - } - - function returnStreamIterators() { - const promises: Array>> = []; - exeContext.subsequentPayloads.forEach((incrementalDataRecord) => { - if ( - isStreamItemsRecord(incrementalDataRecord) && - incrementalDataRecord.asyncIterator?.return - ) { - promises.push(incrementalDataRecord.asyncIterator.return()); - } - }); - return Promise.all(promises); - } - - return { - [Symbol.asyncIterator]() { - return this; - }, - next, - async return(): Promise< - IteratorResult - > { - await returnStreamIterators(); - isDone = true; - return { value: undefined, done: true }; - }, - async throw( - error?: unknown, - ): Promise> { - await returnStreamIterators(); - isDone = true; - return Promise.reject(error); - }, - }; -} - -class DeferredFragmentRecord { - type: 'defer'; - errors: Array; - label: string | undefined; - path: Array; - promise: Promise; - data: ObjMap | null; - parentContext: IncrementalDataRecord | undefined; - isCompleted: boolean; - _exeContext: ExecutionContext; - _resolve?: (arg: PromiseOrValue | null>) => void; - constructor(opts: { - label: string | undefined; - path: Path | undefined; - parentContext: IncrementalDataRecord | undefined; - exeContext: ExecutionContext; - }) { - this.type = 'defer'; - this.label = opts.label; - this.path = pathToArray(opts.path); - this.parentContext = opts.parentContext; - this.errors = []; - this._exeContext = opts.exeContext; - this._exeContext.subsequentPayloads.add(this); - this.isCompleted = false; - this.data = null; - const { promise, resolve } = promiseWithResolvers | null>(); - this._resolve = resolve; - this.promise = promise.then((data) => { - this.data = data; - this.isCompleted = true; - }); - } - - addData(data: PromiseOrValue | null>) { - const parentData = this.parentContext?.promise; - if (parentData) { - this._resolve?.(parentData.then(() => data)); - return; - } - this._resolve?.(data); - } -} - -class StreamItemsRecord { - type: 'stream'; - errors: Array; - label: string | undefined; - path: Array; - items: Array | null; - promise: Promise; - parentContext: IncrementalDataRecord | undefined; - asyncIterator: AsyncIterator | undefined; - isCompletedAsyncIterator?: boolean; - isCompleted: boolean; - _exeContext: ExecutionContext; - _resolve?: (arg: PromiseOrValue | null>) => void; - constructor(opts: { - label: string | undefined; - path: Path | undefined; - asyncIterator?: AsyncIterator; - parentContext: IncrementalDataRecord | undefined; - exeContext: ExecutionContext; - }) { - this.type = 'stream'; - this.items = null; - this.label = opts.label; - this.path = pathToArray(opts.path); - this.parentContext = opts.parentContext; - this.asyncIterator = opts.asyncIterator; - this.errors = []; - this._exeContext = opts.exeContext; - this._exeContext.subsequentPayloads.add(this); - this.isCompleted = false; - this.items = null; - const { promise, resolve } = promiseWithResolvers | null>(); - this._resolve = resolve; - this.promise = promise.then((items) => { - this.items = items; - this.isCompleted = true; - }); - } - - addItems(items: PromiseOrValue | null>) { - const parentData = this.parentContext?.promise; - if (parentData) { - this._resolve?.(parentData.then(() => items)); - return; - } - this._resolve?.(items); - } - - setIsCompletedAsyncIterator() { - this.isCompletedAsyncIterator = true; - } -} - -type IncrementalDataRecord = DeferredFragmentRecord | StreamItemsRecord; - -function isStreamItemsRecord( - incrementalDataRecord: IncrementalDataRecord, -): incrementalDataRecord is StreamItemsRecord { - return incrementalDataRecord.type === 'stream'; -} diff --git a/src/execution/index.ts b/src/execution/index.ts index 61b170e100..3c8581c7b0 100644 --- a/src/execution/index.ts +++ b/src/execution/index.ts @@ -15,17 +15,20 @@ export type { ExecutionResult, ExperimentalIncrementalExecutionResults, InitialIncrementalExecutionResult, + FormattedExecutionResult, + FormattedInitialIncrementalExecutionResult, +} from './execute.js'; + +export type { SubsequentIncrementalExecutionResult, IncrementalDeferResult, IncrementalStreamResult, IncrementalResult, - FormattedExecutionResult, - FormattedInitialIncrementalExecutionResult, FormattedSubsequentIncrementalExecutionResult, FormattedIncrementalDeferResult, FormattedIncrementalStreamResult, FormattedIncrementalResult, -} from './execute.js'; +} from './IncrementalPublisher.js'; export { getArgumentValues,