Skip to content

Commit

Permalink
lazily create deferred fragments and align with upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Aug 11, 2024
1 parent c7d7ad8 commit 82be0a6
Show file tree
Hide file tree
Showing 7 changed files with 364 additions and 354 deletions.
106 changes: 106 additions & 0 deletions packages/executor/src/execution/DeferredFragments.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import { Path } from '@graphql-tools/utils';
import { DeferUsage } from './collectFields.js';
import { PendingExecutionGroup, StreamRecord, SuccessfulExecutionGroup } from './types.js';

export type DeliveryGroup = DeferredFragmentRecord | StreamRecord;

/** @internal */
export class DeferredFragmentRecord {
path: Path | undefined;
label: string | undefined;
id?: string | undefined;
parentDeferUsage: DeferUsage | undefined;
pendingExecutionGroups: Set<PendingExecutionGroup>;
successfulExecutionGroups: Set<SuccessfulExecutionGroup>;
children: Set<DeliveryGroup>;
pending: boolean;
fns: Array<() => void>;

constructor(
path: Path | undefined,
label: string | undefined,
parentDeferUsage: DeferUsage | undefined,
) {
this.path = path;
this.label = label;
this.parentDeferUsage = parentDeferUsage;
this.pendingExecutionGroups = new Set();
this.successfulExecutionGroups = new Set();
this.children = new Set();
this.pending = false;
this.fns = [];
}

onPending(fn: () => void): void {
this.fns.push(fn);
}

setAsPending(): void {
this.pending = true;
for (const fn of this.fns) {
fn();
}
}
}

export function isDeferredFragmentRecord(
deliveryGroup: DeliveryGroup,
): deliveryGroup is DeferredFragmentRecord {
return deliveryGroup instanceof DeferredFragmentRecord;
}

/**
* @internal
*/
export class DeferredFragmentFactory {
private _rootDeferredFragments = new Map<DeferUsage, DeferredFragmentRecord>();

get(deferUsage: DeferUsage, path: Path | undefined): DeferredFragmentRecord {
const deferUsagePath = this._pathAtDepth(path, deferUsage.depth);
let deferredFragmentRecords: Map<DeferUsage, DeferredFragmentRecord> | undefined;
if (deferUsagePath === undefined) {
deferredFragmentRecords = this._rootDeferredFragments;
} else {
// A doubly nested Map<Path, Map<DeferUsage, DeferredFragmentRecord>>
// could be used, but could leak memory in long running operations.
// A WeakMap could be used instead. The below implementation is
// WeakMap-Like, saving the Map on the Path object directly.
// Alternatively, memory could be reclaimed manually, taking care to
// also reclaim memory for nested DeferredFragmentRecords if the parent
// is removed secondary to an error.
deferredFragmentRecords = (
deferUsagePath as unknown as {
deferredFragmentRecords: Map<DeferUsage, DeferredFragmentRecord>;
}
).deferredFragmentRecords;
if (deferredFragmentRecords === undefined) {
deferredFragmentRecords = new Map();
(
deferUsagePath as unknown as {
deferredFragmentRecords: Map<DeferUsage, DeferredFragmentRecord>;
}
).deferredFragmentRecords = deferredFragmentRecords;
}
}
let deferredFragmentRecord = deferredFragmentRecords.get(deferUsage);
if (deferredFragmentRecord === undefined) {
const { label, parentDeferUsage } = deferUsage;
deferredFragmentRecord = new DeferredFragmentRecord(deferUsagePath, label, parentDeferUsage);
deferredFragmentRecords.set(deferUsage, deferredFragmentRecord);
}
return deferredFragmentRecord;
}

private _pathAtDepth(path: Path | undefined, depth: number): Path | undefined {
if (depth === 0) {
return;
}
const stack: Array<Path> = [];
let currentPath = path;
while (currentPath !== undefined) {
stack.unshift(currentPath);
currentPath = currentPath.prev;
}
return stack[depth - 1];
}
}
159 changes: 98 additions & 61 deletions packages/executor/src/execution/IncrementalGraph.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,38 @@
import type { GraphQLError } from 'graphql';
import { createDeferred, isPromise } from '@graphql-tools/utils';
import { createDeferred, isPromise, Path } from '@graphql-tools/utils';
import { BoxedPromiseOrValue } from './BoxedPromiseOrValue.js';
import { DeferUsage } from './collectFields.js';
import type { DeferredFragmentRecord, DeliveryGroup } from './DeferredFragments.js';
import { DeferredFragmentFactory, isDeferredFragmentRecord } from './DeferredFragments.js';
import { invariant } from './invariant.js';
import type {
CompletedExecutionGroup,
DeferredFragmentRecord,
DeliveryGroup,
IncrementalDataRecord,
IncrementalDataRecordResult,
PendingExecutionGroup,
StreamItemRecord,
StreamRecord,
SuccessfulExecutionGroup,
} from './types.js';
import { isDeferredFragmentRecord, isPendingExecutionGroup } from './types.js';
import { isPendingExecutionGroup } from './types.js';

/**
* @internal
*/
export class IncrementalGraph {
private _rootNodes: Set<DeliveryGroup>;

private _deferredFragmentFactory: DeferredFragmentFactory;
private _completedQueue: Array<IncrementalDataRecordResult>;
private _nextQueue: Array<(iterable: Iterable<IncrementalDataRecordResult> | undefined) => void>;

constructor() {
constructor(deferredFragmentFactory: DeferredFragmentFactory) {
this._rootNodes = new Set();
this._deferredFragmentFactory = deferredFragmentFactory;
this._completedQueue = [];
this._nextQueue = [];
}

getNewPending(
getNewRootNodes(
incrementalDataRecords: ReadonlyArray<IncrementalDataRecord>,
): ReadonlyArray<DeliveryGroup> {
const initialResultChildren = new Set<DeliveryGroup>();
Expand All @@ -40,12 +42,14 @@ export class IncrementalGraph {

addCompletedSuccessfulExecutionGroup(successfulExecutionGroup: SuccessfulExecutionGroup): void {
const { pendingExecutionGroup, incrementalDataRecords } = successfulExecutionGroup;
const { deferUsages, path } = pendingExecutionGroup;

const deferredFragmentRecords = pendingExecutionGroup.deferredFragmentRecords;

for (const deferredFragmentRecord of deferredFragmentRecords) {
const deferredFragmentRecords: Array<DeferredFragmentRecord> = [];
for (const deferUsage of deferUsages) {
const deferredFragmentRecord = this._deferredFragmentFactory.get(deferUsage, path);
deferredFragmentRecords.push(deferredFragmentRecord);
const { pendingExecutionGroups, successfulExecutionGroups } = deferredFragmentRecord;
pendingExecutionGroups.delete(successfulExecutionGroup.pendingExecutionGroup);
pendingExecutionGroups.delete(pendingExecutionGroup);
successfulExecutionGroups.add(successfulExecutionGroup);
}

Expand All @@ -54,6 +58,26 @@ export class IncrementalGraph {
}
}

getDeepestDeferredFragmentAtRoot(
initialDeferUsage: DeferUsage,
deferUsages: ReadonlySet<DeferUsage>,
path: Path | undefined,
): DeferredFragmentRecord {
let bestDeferUsage = initialDeferUsage;
let maxDepth = initialDeferUsage.depth;
for (const deferUsage of deferUsages) {
if (deferUsage === initialDeferUsage) {
continue;
}
const depth = deferUsage.depth;
if (depth > maxDepth) {
maxDepth = depth;
bestDeferUsage = deferUsage;
}
}
return this._deferredFragmentFactory.get(bestDeferUsage, path);
}

*currentCompletedBatch(): Generator<IncrementalDataRecordResult> {
let completed;
while ((completed = this._completedQueue.shift()) !== undefined) {
Expand Down Expand Up @@ -84,12 +108,17 @@ export class IncrementalGraph {
return this._rootNodes.size > 0;
}

completeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord):
completeDeferredFragment(
deferUsage: DeferUsage,
path: Path | undefined,
):
| {
newPending: ReadonlyArray<DeliveryGroup>;
deferredFragmentRecord: DeferredFragmentRecord;
newRootNodes: ReadonlyArray<DeliveryGroup>;
successfulExecutionGroups: ReadonlyArray<SuccessfulExecutionGroup>;
}
| undefined {
const deferredFragmentRecord = this._deferredFragmentFactory.get(deferUsage, path);
if (
!this._rootNodes.has(deferredFragmentRecord) ||
deferredFragmentRecord.pendingExecutionGroups.size > 0
Expand All @@ -99,21 +128,29 @@ export class IncrementalGraph {
const successfulExecutionGroups = Array.from(deferredFragmentRecord.successfulExecutionGroups);
this._rootNodes.delete(deferredFragmentRecord);
for (const successfulExecutionGroup of successfulExecutionGroups) {
for (const otherDeferredFragmentRecord of successfulExecutionGroup.pendingExecutionGroup
.deferredFragmentRecords) {
const { deferUsages, path: resultPath } = successfulExecutionGroup.pendingExecutionGroup;
for (const otherDeferUsage of deferUsages) {
const otherDeferredFragmentRecord = this._deferredFragmentFactory.get(
otherDeferUsage,
resultPath,
);
otherDeferredFragmentRecord.successfulExecutionGroups.delete(successfulExecutionGroup);
}
}
const newPending = this._promoteNonEmptyToRoot(deferredFragmentRecord.children);
return { newPending, successfulExecutionGroups };
const newRootNodes = this._promoteNonEmptyToRoot(deferredFragmentRecord.children);
return { deferredFragmentRecord, newRootNodes, successfulExecutionGroups };
}

removeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord): boolean {
removeDeferredFragment(
deferUsage: DeferUsage,
path: Path | undefined,
): DeferredFragmentRecord | undefined {
const deferredFragmentRecord = this._deferredFragmentFactory.get(deferUsage, path);
if (!this._rootNodes.has(deferredFragmentRecord)) {
return false;
return;
}
this._rootNodes.delete(deferredFragmentRecord);
return true;
return deferredFragmentRecord;
}

removeStream(streamRecord: StreamRecord): void {
Expand All @@ -127,11 +164,13 @@ export class IncrementalGraph {
): void {
for (const incrementalDataRecord of incrementalDataRecords) {
if (isPendingExecutionGroup(incrementalDataRecord)) {
for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) {
const { deferUsages, path } = incrementalDataRecord;
for (const deferUsage of deferUsages) {
const deferredFragmentRecord = this._deferredFragmentFactory.get(deferUsage, path);
this._addDeferredFragment(deferredFragmentRecord, initialResultChildren);
deferredFragmentRecord.pendingExecutionGroups.add(incrementalDataRecord);
}
if (this._hasPendingFragment(incrementalDataRecord)) {
if (this._completesRootNode(incrementalDataRecord)) {
this._onExecutionGroup(incrementalDataRecord);
}
} else if (parents === undefined) {
Expand All @@ -147,65 +186,71 @@ export class IncrementalGraph {
}

private _promoteNonEmptyToRoot(
maybeEmptyNewPending: Set<DeliveryGroup>,
maybeEmptyNewRootNodes: Set<DeliveryGroup>,
): ReadonlyArray<DeliveryGroup> {
const newPending: Array<DeliveryGroup> = [];
for (const deliveryGroup of maybeEmptyNewPending) {
if (isDeferredFragmentRecord(deliveryGroup)) {
if (deliveryGroup.pendingExecutionGroups.size > 0) {
deliveryGroup.setAsPending();
for (const pendingExecutionGroup of deliveryGroup.pendingExecutionGroups) {
if (!this._hasPendingFragment(pendingExecutionGroup)) {
const newRootNodes: Array<DeliveryGroup> = [];
for (const node of maybeEmptyNewRootNodes) {
if (isDeferredFragmentRecord(node)) {
if (node.pendingExecutionGroups.size > 0) {
node.setAsPending();
for (const pendingExecutionGroup of node.pendingExecutionGroups) {
if (!this._completesRootNode(pendingExecutionGroup)) {
this._onExecutionGroup(pendingExecutionGroup);
}
}
this._rootNodes.add(deliveryGroup);
newPending.push(deliveryGroup);
this._rootNodes.add(node);
newRootNodes.push(node);
continue;
}
for (const child of deliveryGroup.children) {
maybeEmptyNewPending.add(child);
for (const child of node.children) {
maybeEmptyNewRootNodes.add(child);
}
} else {
this._rootNodes.add(deliveryGroup);
newPending.push(deliveryGroup);
this._rootNodes.add(node);
newRootNodes.push(node);

this._onStreamItems(deliveryGroup);
this._onStreamItems(node);
}
}
return newPending;
return newRootNodes;
}

private _hasPendingFragment(pendingExecutionGroup: PendingExecutionGroup): boolean {
return pendingExecutionGroup.deferredFragmentRecords.some(deferredFragmentRecord =>
this._rootNodes.has(deferredFragmentRecord),
);
private _completesRootNode(pendingExecutionGroup: PendingExecutionGroup): boolean {
const { deferUsages, path } = pendingExecutionGroup;
for (const deferUsage of deferUsages) {
const deferredFragmentRecord = this._deferredFragmentFactory.get(deferUsage, path);
if (this._rootNodes.has(deferredFragmentRecord)) {
return true;
}
}
return false;
}

private _addDeferredFragment(
deferredFragmentRecord: DeferredFragmentRecord,
deliveryGroups: Set<DeliveryGroup> | undefined,
initialResultChildren: Set<DeliveryGroup> | undefined,
): void {
if (this._rootNodes.has(deferredFragmentRecord)) {
return;
}
const parent = deferredFragmentRecord.parent;
if (parent === undefined) {
invariant(deliveryGroups !== undefined);
deliveryGroups.add(deferredFragmentRecord);
const parentDeferUsage = deferredFragmentRecord.parentDeferUsage;
if (parentDeferUsage === undefined) {
invariant(initialResultChildren !== undefined);
initialResultChildren.add(deferredFragmentRecord);
return;
}
const parent = this._deferredFragmentFactory.get(parentDeferUsage, deferredFragmentRecord.path);
parent.children.add(deferredFragmentRecord);
this._addDeferredFragment(parent, deliveryGroups);
this._addDeferredFragment(parent, initialResultChildren);
}

private _onExecutionGroup(pendingExecutionGroup: PendingExecutionGroup): void {
const result = (pendingExecutionGroup.result as BoxedPromiseOrValue<CompletedExecutionGroup>)
const value = (pendingExecutionGroup.result as BoxedPromiseOrValue<CompletedExecutionGroup>)
.value;
if (isPromise(result)) {
result.then(resolved => this._enqueue(resolved));
if (isPromise(value)) {
value.then(resolved => this._enqueue(resolved));
} else {
this._enqueue(result);
this._enqueue(value);
}
}

Expand Down Expand Up @@ -272,15 +317,7 @@ export class IncrementalGraph {
first: IncrementalDataRecordResult,
): Generator<IncrementalDataRecordResult> {
yield first;
let completed;
while ((completed = this._completedQueue.shift()) !== undefined) {
yield completed;
}
if (this._rootNodes.size === 0) {
for (const resolve of this._nextQueue) {
resolve(undefined);
}
}
yield* this.currentCompletedBatch();
}

private _enqueue(completed: IncrementalDataRecordResult): void {
Expand Down
Loading

0 comments on commit 82be0a6

Please sign in to comment.