Skip to content

Commit cb43c83

Browse files
committed
refactor: extract incremental graph to separate file
1 parent 62d347d commit cb43c83

File tree

3 files changed

+239
-181
lines changed

3 files changed

+239
-181
lines changed

src/execution/IncrementalGraph.ts

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
import { isPromise } from '../jsutils/isPromise.js';
2+
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
3+
4+
import type {
5+
DeferredFragmentRecord,
6+
DeferredGroupedFieldSetResult,
7+
IncrementalDataRecord,
8+
IncrementalDataRecordResult,
9+
ReconcilableDeferredGroupedFieldSetResult,
10+
StreamItemsResult,
11+
SubsequentResultRecord,
12+
} from './types.js';
13+
import {
14+
isDeferredFragmentRecord,
15+
isDeferredGroupedFieldSetRecord,
16+
} from './types.js';
17+
18+
/**
19+
* @internal
20+
*/
21+
export class IncrementalGraph {
22+
// these are assigned within the Promise executor called synchronously within the constructor
23+
newCompletedResultAvailable!: Promise<unknown>;
24+
private _resolve!: () => void;
25+
26+
private _pending: Set<SubsequentResultRecord>;
27+
private _newPending: Set<SubsequentResultRecord>;
28+
private _completedResultQueue: Array<IncrementalDataRecordResult>;
29+
30+
constructor() {
31+
this._pending = new Set();
32+
this._newPending = new Set();
33+
this._completedResultQueue = [];
34+
this._reset();
35+
}
36+
37+
addIncrementalDataRecords(
38+
incrementalDataRecords: ReadonlyArray<IncrementalDataRecord>,
39+
): void {
40+
for (const incrementalDataRecord of incrementalDataRecords) {
41+
if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) {
42+
for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) {
43+
deferredFragmentRecord.expectedReconcilableResults++;
44+
45+
this._addDeferredFragmentRecord(deferredFragmentRecord);
46+
}
47+
48+
const result = incrementalDataRecord.result;
49+
if (isPromise(result)) {
50+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
51+
result.then((resolved) => {
52+
this._enqueueCompletedDeferredGroupedFieldSet(resolved);
53+
});
54+
} else {
55+
this._enqueueCompletedDeferredGroupedFieldSet(result);
56+
}
57+
58+
continue;
59+
}
60+
61+
const streamRecord = incrementalDataRecord.streamRecord;
62+
if (streamRecord.id === undefined) {
63+
this._newPending.add(streamRecord);
64+
}
65+
66+
const result = incrementalDataRecord.result;
67+
if (isPromise(result)) {
68+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
69+
result.then((resolved) => {
70+
this._enqueueCompletedStreamItems(resolved);
71+
});
72+
} else {
73+
this._enqueueCompletedStreamItems(result);
74+
}
75+
}
76+
}
77+
78+
getNewPending(): ReadonlyArray<SubsequentResultRecord> {
79+
const maybeEmptyNewPending = this._newPending;
80+
const newPending = [];
81+
for (const node of maybeEmptyNewPending) {
82+
if (isDeferredFragmentRecord(node)) {
83+
if (node.expectedReconcilableResults) {
84+
this._pending.add(node);
85+
newPending.push(node);
86+
continue;
87+
}
88+
for (const child of node.children) {
89+
this._addNonEmptyNewPending(child, newPending);
90+
}
91+
} else {
92+
this._pending.add(node);
93+
newPending.push(node);
94+
}
95+
}
96+
this._newPending.clear();
97+
return newPending;
98+
}
99+
100+
*completedResults(): Generator<IncrementalDataRecordResult> {
101+
let completedResult: IncrementalDataRecordResult | undefined;
102+
while (
103+
(completedResult = this._completedResultQueue.shift()) !== undefined
104+
) {
105+
yield completedResult;
106+
}
107+
}
108+
109+
hasNext(): boolean {
110+
return this._pending.size > 0;
111+
}
112+
113+
completeDeferredFragment(
114+
deferredFragmentRecord: DeferredFragmentRecord,
115+
): Array<ReconcilableDeferredGroupedFieldSetResult> | undefined {
116+
const reconcilableResults = deferredFragmentRecord.reconcilableResults;
117+
if (
118+
deferredFragmentRecord.expectedReconcilableResults !==
119+
reconcilableResults.length
120+
) {
121+
return;
122+
}
123+
this._pending.delete(deferredFragmentRecord);
124+
for (const child of deferredFragmentRecord.children) {
125+
this._newPending.add(child);
126+
this._completedResultQueue.push(...child.results);
127+
}
128+
return reconcilableResults;
129+
}
130+
131+
removeSubsequentResultRecord(
132+
subsequentResultRecord: SubsequentResultRecord,
133+
): void {
134+
this._pending.delete(subsequentResultRecord);
135+
}
136+
137+
private _addDeferredFragmentRecord(
138+
deferredFragmentRecord: DeferredFragmentRecord,
139+
): void {
140+
const parent = deferredFragmentRecord.parent;
141+
if (parent === undefined) {
142+
// Below is equivalent and slightly faster version of:
143+
// if (this._pending.has(deferredFragmentRecord)) { ... }
144+
// as all released deferredFragmentRecords have ids.
145+
if (deferredFragmentRecord.id !== undefined) {
146+
return;
147+
}
148+
149+
this._newPending.add(deferredFragmentRecord);
150+
return;
151+
}
152+
153+
if (parent.children.has(deferredFragmentRecord)) {
154+
return;
155+
}
156+
157+
parent.children.add(deferredFragmentRecord);
158+
159+
this._addDeferredFragmentRecord(parent);
160+
}
161+
162+
private _addNonEmptyNewPending(
163+
deferredFragmentRecord: DeferredFragmentRecord,
164+
newPending: Array<SubsequentResultRecord>,
165+
): void {
166+
if (deferredFragmentRecord.expectedReconcilableResults) {
167+
this._pending.add(deferredFragmentRecord);
168+
newPending.push(deferredFragmentRecord);
169+
return;
170+
}
171+
/* c8 ignore next 5 */
172+
// TODO: add test case for this, if when skipping an empty deferred fragment, the empty fragment has nested children.
173+
for (const child of deferredFragmentRecord.children) {
174+
this._addNonEmptyNewPending(child, newPending);
175+
}
176+
}
177+
178+
private _enqueueCompletedDeferredGroupedFieldSet(
179+
result: DeferredGroupedFieldSetResult,
180+
): void {
181+
let hasPendingParent = false;
182+
for (const deferredFragmentRecord of result.deferredFragmentRecords) {
183+
if (deferredFragmentRecord.id !== undefined) {
184+
hasPendingParent = true;
185+
}
186+
deferredFragmentRecord.results.push(result);
187+
}
188+
if (hasPendingParent) {
189+
this._completedResultQueue.push(result);
190+
this._trigger();
191+
}
192+
}
193+
194+
private _enqueueCompletedStreamItems(result: StreamItemsResult): void {
195+
this._completedResultQueue.push(result);
196+
this._trigger();
197+
}
198+
199+
private _trigger() {
200+
this._resolve();
201+
this._reset();
202+
}
203+
204+
private _reset() {
205+
const { promise: newCompletedResultAvailable, resolve } =
206+
// promiseWithResolvers uses void only as a generic type parameter
207+
// see: https://typescript-eslint.io/rules/no-invalid-void-type/
208+
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
209+
promiseWithResolvers<void>();
210+
this._resolve = resolve;
211+
this.newCompletedResultAvailable = newCompletedResultAvailable;
212+
}
213+
}

0 commit comments

Comments
 (0)