From fae5da500bad94c39a7ecd77a4c4361b58d6d2da Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Thu, 6 Jul 2023 03:49:42 +0300 Subject: [PATCH] incremental: subsequent result records should not store parent references (#3929) as memory then cannot be freed This affects both the existing branching executor on main as well as the non-branching, deduplicated version in #3886 We want to ensure that after an incremental result is sent to the client, no subsequent results reference this result so that garbage collection can free the memory associated with the result. To effect this, two changes are required: 1. Prior to this change, we performed filtering by modifying the children stored on a parent; now we add a flag on the filtered item itself, so that we no longer need a backreference from child to parent. 2. We no longer store a permanent reference on the ExecutionContext to the children of the initial result. Rather, we have the IncrementalPublisher provide a new InitialResultRecord that carries its own errors and children properties. --- src/execution/IncrementalPublisher.ts | 143 +++++++++++--------------- src/execution/execute.ts | 75 ++++++++------ 2 files changed, 104 insertions(+), 114 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index f48e62e6a2..b36c5c7653 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -97,34 +97,17 @@ export type FormattedIncrementalResult< * parents have completed so that they can no longer be filtered. This includes all Incremental * Data records in `released`, as well as Incremental Data records that have not yet completed. * - * `_initialResult`: a record containing the state of the initial result, as follows: - * `isCompleted`: indicates whether the initial result has completed. - * `children`: the set of Incremental Data records that can be be published when the initial - * result is completed. - * - * Each Incremental Data record also contains similar metadata, i.e. these records also contain - * similar `isCompleted` and `children` properties. - * * @internal */ export class IncrementalPublisher { - private _initialResult: { - children: Set; - isCompleted: boolean; - }; - - private _released: Set; - private _pending: Set; + private _released: Set; + private _pending: Set; // these are assigned within the Promise executor called synchronously within the constructor private _signalled!: Promise; private _resolve!: () => void; constructor() { - this._initialResult = { - children: new Set(), - isCompleted: false, - }; this._released = new Set(); this._pending = new Set(); this._reset(); @@ -210,19 +193,22 @@ export class IncrementalPublisher { }; } + prepareInitialResultRecord(): InitialResultRecord { + return { + errors: [], + children: new Set(), + }; + } + prepareNewDeferredFragmentRecord(opts: { label: string | undefined; path: Path | undefined; - parentContext: IncrementalDataRecord | undefined; + parentContext: IncrementalDataRecord; }): DeferredFragmentRecord { const deferredFragmentRecord = new DeferredFragmentRecord(opts); const parentContext = opts.parentContext; - if (parentContext) { - parentContext.children.add(deferredFragmentRecord); - } else { - this._initialResult.children.add(deferredFragmentRecord); - } + parentContext.children.add(deferredFragmentRecord); return deferredFragmentRecord; } @@ -231,16 +217,12 @@ export class IncrementalPublisher { label: string | undefined; path: Path | undefined; asyncIterator?: AsyncIterator; - parentContext: IncrementalDataRecord | undefined; + parentContext: IncrementalDataRecord; }): StreamItemsRecord { const streamItemsRecord = new StreamItemsRecord(opts); const parentContext = opts.parentContext; - if (parentContext) { - parentContext.children.add(streamItemsRecord); - } else { - this._initialResult.children.add(streamItemsRecord); - } + parentContext.children.add(streamItemsRecord); return streamItemsRecord; } @@ -274,36 +256,36 @@ export class IncrementalPublisher { incrementalDataRecord.errors.push(error); } - publishInitial() { - for (const child of this._initialResult.children) { + publishInitial(initialResult: InitialResultRecord) { + for (const child of initialResult.children) { + if (child.filtered) { + continue; + } this._publish(child); } } - filter( - nullPath: Path, - erroringIncrementalDataRecord: IncrementalDataRecord | undefined, - ) { + getInitialErrors( + initialResult: InitialResultRecord, + ): ReadonlyArray { + return initialResult.errors; + } + + filter(nullPath: Path, erroringIncrementalDataRecord: IncrementalDataRecord) { const nullPathArray = pathToArray(nullPath); const asyncIterators = new Set>(); - const children = - erroringIncrementalDataRecord === undefined - ? this._initialResult.children - : erroringIncrementalDataRecord.children; + const descendants = this._getDescendants( + erroringIncrementalDataRecord.children, + ); - for (const child of this._getDescendants(children)) { + for (const child of descendants) { if (!this._matchesPath(child.path, nullPathArray)) { continue; } - this._delete(child); - const parent = - child.parentContext === undefined - ? this._initialResult - : child.parentContext; - parent.children.delete(child); + child.filtered = true; if (isStreamItemsRecord(child)) { if (child.asyncIterator !== undefined) { @@ -333,37 +315,34 @@ export class IncrementalPublisher { this._signalled = signalled; } - private _introduce(item: IncrementalDataRecord) { + private _introduce(item: SubsequentDataRecord) { this._pending.add(item); } - private _release(item: IncrementalDataRecord): void { + private _release(item: SubsequentDataRecord): void { if (this._pending.has(item)) { this._released.add(item); this._trigger(); } } - private _push(item: IncrementalDataRecord): void { + private _push(item: SubsequentDataRecord): void { this._released.add(item); this._pending.add(item); this._trigger(); } - private _delete(item: IncrementalDataRecord) { - this._released.delete(item); - this._pending.delete(item); - this._trigger(); - } - private _getIncrementalResult( - completedRecords: ReadonlySet, + completedRecords: ReadonlySet, ): SubsequentIncrementalExecutionResult | undefined { const incrementalResults: Array = []; let encounteredCompletedAsyncIterator = false; for (const incrementalDataRecord of completedRecords) { const incrementalResult: IncrementalResult = {}; for (const child of incrementalDataRecord.children) { + if (child.filtered) { + continue; + } this._publish(child); } if (isStreamItemsRecord(incrementalDataRecord)) { @@ -396,18 +375,18 @@ export class IncrementalPublisher { : undefined; } - private _publish(incrementalDataRecord: IncrementalDataRecord) { - if (incrementalDataRecord.isCompleted) { - this._push(incrementalDataRecord); + private _publish(subsequentResultRecord: SubsequentDataRecord) { + if (subsequentResultRecord.isCompleted) { + this._push(subsequentResultRecord); } else { - this._introduce(incrementalDataRecord); + this._introduce(subsequentResultRecord); } } private _getDescendants( - children: ReadonlySet, - descendants = new Set(), - ): ReadonlySet { + children: ReadonlySet, + descendants = new Set(), + ): ReadonlySet { for (const child of children) { descendants.add(child); this._getDescendants(child.children, descendants); @@ -429,26 +408,27 @@ export class IncrementalPublisher { } } +export interface InitialResultRecord { + errors: Array; + children: Set; +} + /** @internal */ export class DeferredFragmentRecord { errors: Array; label: string | undefined; path: Array; data: ObjMap | null; - parentContext: IncrementalDataRecord | undefined; - children: Set; + children: Set; isCompleted: boolean; - constructor(opts: { - label: string | undefined; - path: Path | undefined; - parentContext: IncrementalDataRecord | undefined; - }) { + filtered: boolean; + constructor(opts: { label: string | undefined; path: Path | undefined }) { this.label = opts.label; this.path = pathToArray(opts.path); - this.parentContext = opts.parentContext; this.errors = []; this.children = new Set(); this.isCompleted = false; + this.filtered = false; this.data = null; } } @@ -459,33 +439,34 @@ export class StreamItemsRecord { label: string | undefined; path: Array; items: Array | null; - parentContext: IncrementalDataRecord | undefined; - children: Set; + children: Set; asyncIterator: AsyncIterator | undefined; isCompletedAsyncIterator?: boolean; isCompleted: boolean; + filtered: boolean; constructor(opts: { label: string | undefined; path: Path | undefined; asyncIterator?: AsyncIterator; - parentContext: IncrementalDataRecord | undefined; }) { this.items = null; this.label = opts.label; this.path = pathToArray(opts.path); - this.parentContext = opts.parentContext; this.asyncIterator = opts.asyncIterator; this.errors = []; this.children = new Set(); this.isCompleted = false; + this.filtered = false; this.items = null; } } -export type IncrementalDataRecord = DeferredFragmentRecord | StreamItemsRecord; +export type SubsequentDataRecord = DeferredFragmentRecord | StreamItemsRecord; + +export type IncrementalDataRecord = InitialResultRecord | SubsequentDataRecord; function isStreamItemsRecord( - incrementalDataRecord: IncrementalDataRecord, -): incrementalDataRecord is StreamItemsRecord { - return incrementalDataRecord instanceof StreamItemsRecord; + subsequentResultRecord: SubsequentDataRecord, +): subsequentResultRecord is StreamItemsRecord { + return subsequentResultRecord instanceof StreamItemsRecord; } diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 1ec11f72cc..af68c286e1 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -56,7 +56,9 @@ import type { FormattedIncrementalResult, IncrementalDataRecord, IncrementalResult, + InitialResultRecord, StreamItemsRecord, + SubsequentDataRecord, SubsequentIncrementalExecutionResult, } from './IncrementalPublisher.js'; import { IncrementalPublisher } from './IncrementalPublisher.js'; @@ -128,7 +130,6 @@ export interface ExecutionContext { fieldResolver: GraphQLFieldResolver; typeResolver: GraphQLTypeResolver; subscribeFieldResolver: GraphQLFieldResolver; - errors: Array; incrementalPublisher: IncrementalPublisher; } @@ -289,14 +290,17 @@ function executeImpl( // Errors from sub-fields of a NonNull type may propagate to the top level, // at which point we still log the error and null the parent field, which // in this case is the entire response. - const { incrementalPublisher, errors } = exeContext; + const incrementalPublisher = exeContext.incrementalPublisher; + const initialResultRecord = incrementalPublisher.prepareInitialResultRecord(); try { - const result = executeOperation(exeContext); + const result = executeOperation(exeContext, initialResultRecord); if (isPromise(result)) { return result.then( (data) => { + const errors = + incrementalPublisher.getInitialErrors(initialResultRecord); const initialResult = buildResponse(data, errors); - incrementalPublisher.publishInitial(); + incrementalPublisher.publishInitial(initialResultRecord); if (incrementalPublisher.hasNext()) { return { initialResult: { @@ -309,13 +313,15 @@ function executeImpl( return initialResult; }, (error) => { - errors.push(error); + incrementalPublisher.addFieldError(initialResultRecord, error); + const errors = + incrementalPublisher.getInitialErrors(initialResultRecord); return buildResponse(null, errors); }, ); } - const initialResult = buildResponse(result, errors); - incrementalPublisher.publishInitial(); + const initialResult = buildResponse(result, initialResultRecord.errors); + incrementalPublisher.publishInitial(initialResultRecord); if (incrementalPublisher.hasNext()) { return { initialResult: { @@ -327,7 +333,8 @@ function executeImpl( } return initialResult; } catch (error) { - errors.push(error); + incrementalPublisher.addFieldError(initialResultRecord, error); + const errors = incrementalPublisher.getInitialErrors(initialResultRecord); return buildResponse(null, errors); } } @@ -445,7 +452,6 @@ export function buildExecutionContext( typeResolver: typeResolver ?? defaultTypeResolver, subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, incrementalPublisher: new IncrementalPublisher(), - errors: [], }; } @@ -456,8 +462,6 @@ function buildPerEventExecutionContext( return { ...exeContext, rootValue: payload, - // no need to update incrementalPublisher, incremental delivery is not supported for subscriptions - errors: [], }; } @@ -466,6 +470,7 @@ function buildPerEventExecutionContext( */ function executeOperation( exeContext: ExecutionContext, + initialResultRecord: InitialResultRecord, ): PromiseOrValue> { const { operation, schema, fragments, variableValues, rootValue } = exeContext; @@ -495,6 +500,7 @@ function executeOperation( rootValue, path, groupedFieldSet, + initialResultRecord, ); break; case OperationTypeNode.MUTATION: @@ -504,6 +510,7 @@ function executeOperation( rootValue, path, groupedFieldSet, + initialResultRecord, ); break; case OperationTypeNode.SUBSCRIPTION: @@ -515,6 +522,7 @@ function executeOperation( rootValue, path, groupedFieldSet, + initialResultRecord, ); } @@ -525,6 +533,7 @@ function executeOperation( rootType, rootValue, patchGroupedFieldSet, + initialResultRecord, label, path, ); @@ -543,6 +552,7 @@ function executeFieldsSerially( sourceValue: unknown, path: Path | undefined, groupedFieldSet: GroupedFieldSet, + incrementalDataRecord: InitialResultRecord, ): PromiseOrValue> { return promiseReduce( groupedFieldSet, @@ -554,6 +564,7 @@ function executeFieldsSerially( sourceValue, fieldGroup, fieldPath, + incrementalDataRecord, ); if (result === undefined) { return results; @@ -581,7 +592,7 @@ function executeFields( sourceValue: unknown, path: Path | undefined, groupedFieldSet: GroupedFieldSet, - incrementalDataRecord?: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue> { const results = Object.create(null); let containsPromise = false; @@ -638,7 +649,7 @@ function executeField( source: unknown, fieldGroup: FieldGroup, path: Path, - incrementalDataRecord?: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue { const fieldName = fieldGroup[0].name.value; const fieldDef = exeContext.schema.getField(parentType, fieldName); @@ -761,7 +772,7 @@ function handleFieldError( returnType: GraphQLOutputType, fieldGroup: FieldGroup, path: Path, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): void { const error = locatedError(rawError, fieldGroup, pathToArray(path)); @@ -771,11 +782,9 @@ function handleFieldError( throw error; } - const errors = incrementalDataRecord?.errors ?? exeContext.errors; - // Otherwise, error protection is applied, logging the error and resolving // a null value for this field if one is encountered. - errors.push(error); + exeContext.incrementalPublisher.addFieldError(incrementalDataRecord, error); } /** @@ -806,7 +815,7 @@ function completeValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue { // If result is an Error, throw a located error. if (result instanceof Error) { @@ -898,7 +907,7 @@ async function completePromisedValue( info: GraphQLResolveInfo, path: Path, result: Promise, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): Promise { try { const resolved = await result; @@ -997,7 +1006,7 @@ async function completeAsyncIteratorValue( info: GraphQLResolveInfo, path: Path, asyncIterator: AsyncIterator, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): Promise> { const stream = getStreamValues(exeContext, fieldGroup, path); let containsPromise = false; @@ -1019,8 +1028,8 @@ async function completeAsyncIteratorValue( info, itemType, path, - stream.label, incrementalDataRecord, + stream.label, ); break; } @@ -1067,7 +1076,7 @@ function completeListValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue> { const itemType = returnType.ofType; @@ -1117,8 +1126,8 @@ function completeListValue( fieldGroup, info, itemType, - stream.label, previousIncrementalDataRecord, + stream.label, ); index++; continue; @@ -1158,7 +1167,7 @@ function completeListItemValue( fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemPath: Path, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): boolean { if (isPromise(item)) { completedResults.push( @@ -1257,7 +1266,7 @@ function completeAbstractValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue> { const resolveTypeFn = returnType.resolveType ?? exeContext.typeResolver; const contextValue = exeContext.contextValue; @@ -1367,7 +1376,7 @@ function completeObjectValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue> { // If there is an isTypeOf predicate function, call it with the // current result. If isTypeOf returns false, then raise an error rather @@ -1423,7 +1432,7 @@ function collectAndExecuteSubfields( fieldGroup: FieldGroup, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue> { // Collect sub-fields to execute to complete this value. const { groupedFieldSet: subGroupedFieldSet, patches: subPatches } = @@ -1445,9 +1454,9 @@ function collectAndExecuteSubfields( returnType, result, subPatchGroupedFieldSet, + incrementalDataRecord, label, path, - incrementalDataRecord, ); } @@ -1747,9 +1756,9 @@ function executeDeferredFragment( parentType: GraphQLObjectType, sourceValue: unknown, fields: GroupedFieldSet, + parentContext: IncrementalDataRecord, label?: string, path?: Path, - parentContext?: IncrementalDataRecord, ): void { const incrementalPublisher = exeContext.incrementalPublisher; const incrementalDataRecord = @@ -1808,9 +1817,9 @@ function executeStreamField( fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemType: GraphQLOutputType, + parentContext: IncrementalDataRecord, label?: string, - parentContext?: IncrementalDataRecord, -): IncrementalDataRecord { +): SubsequentDataRecord { const incrementalPublisher = exeContext.incrementalPublisher; const incrementalDataRecord = incrementalPublisher.prepareNewStreamItemsRecord({ @@ -1990,12 +1999,12 @@ async function executeStreamAsyncIterator( info: GraphQLResolveInfo, itemType: GraphQLOutputType, path: Path, + parentContext: IncrementalDataRecord, label?: string, - parentContext?: IncrementalDataRecord, ): Promise { const incrementalPublisher = exeContext.incrementalPublisher; let index = initialIndex; - let previousIncrementalDataRecord = parentContext ?? undefined; + let previousIncrementalDataRecord = parentContext; // eslint-disable-next-line no-constant-condition while (true) { const itemPath = addPath(path, index, undefined);