@@ -109,6 +109,12 @@ export interface FormattedCompletedResult {
109
109
errors ?: ReadonlyArray < GraphQLError > ;
110
110
}
111
111
112
+ interface IncrementalAggregate {
113
+ newPendingSources : Set < DeferredFragmentRecord | StreamRecord > ;
114
+ incrementalResults : Array < IncrementalResult > ;
115
+ completedResults : Array < CompletedResult > ;
116
+ }
117
+
112
118
/**
113
119
* This class is used to publish incremental results to the client, enabling semi-concurrent
114
120
* execution while preserving result order.
@@ -179,20 +185,28 @@ export class IncrementalPublisher {
179
185
return { value : undefined , done : true } ;
180
186
}
181
187
182
- for ( const item of this . _released ) {
183
- this . _pending . delete ( item ) ;
184
- }
185
- const released = this . _released ;
186
- this . _released = new Set ( ) ;
188
+ if ( this . _released . size > 0 ) {
189
+ let aggregate = this . _incrementalInitializer ( ) ;
190
+ do {
191
+ for ( const item of this . _released ) {
192
+ this . _pending . delete ( item ) ;
193
+ }
194
+ const released = this . _released ;
195
+ this . _released = new Set ( ) ;
187
196
188
- const result = this . _getIncrementalResult ( released ) ;
197
+ aggregate = this . _incrementalReducer ( aggregate , released ) ;
198
+ } while ( this . _released . size > 0 ) ;
189
199
190
- if ( ! this . hasNext ( ) ) {
191
- isDone = true ;
192
- }
200
+ const hasNext = this . hasNext ( ) ;
201
+
202
+ if ( ! hasNext ) {
203
+ isDone = true ;
204
+ }
193
205
194
- if ( result !== undefined ) {
195
- return { value : result , done : false } ;
206
+ return {
207
+ value : this . _incrementalFinalizer ( aggregate ) ,
208
+ done : false ,
209
+ } ;
196
210
}
197
211
198
212
// eslint-disable-next-line no-await-in-loop
@@ -542,37 +556,20 @@ export class IncrementalPublisher {
542
556
this . _trigger ( ) ;
543
557
}
544
558
545
- private _getIncrementalResult (
546
- completedRecords : ReadonlySet < SubsequentResultRecord > ,
547
- ) : SubsequentIncrementalExecutionResult | undefined {
548
- const { pending, incremental, completed } =
549
- this . _processPending ( completedRecords ) ;
550
-
551
- const hasNext = this . hasNext ( ) ;
552
- if ( incremental . length === 0 && completed . length === 0 && hasNext ) {
553
- return undefined ;
554
- }
555
-
556
- const result : SubsequentIncrementalExecutionResult = { hasNext } ;
557
- if ( pending . length ) {
558
- result . pending = pending ;
559
- }
560
- if ( incremental . length ) {
561
- result . incremental = incremental ;
562
- }
563
- if ( completed . length ) {
564
- result . completed = completed ;
565
- }
566
-
567
- return result ;
559
+ private _incrementalInitializer ( ) : IncrementalAggregate {
560
+ return {
561
+ newPendingSources : new Set < DeferredFragmentRecord | StreamRecord > ( ) ,
562
+ incrementalResults : [ ] ,
563
+ completedResults : [ ] ,
564
+ } ;
568
565
}
569
566
570
- private _processPending (
567
+ private _incrementalReducer (
568
+ aggregate : IncrementalAggregate ,
571
569
completedRecords : ReadonlySet < SubsequentResultRecord > ,
572
- ) : IncrementalUpdate {
573
- const newPendingSources = new Set < DeferredFragmentRecord | StreamRecord > ( ) ;
574
- const incrementalResults : Array < IncrementalResult > = [ ] ;
575
- const completedResults : Array < CompletedResult > = [ ] ;
570
+ ) : IncrementalAggregate {
571
+ const { newPendingSources, incrementalResults, completedResults } =
572
+ aggregate ;
576
573
for ( const subsequentResultRecord of completedRecords ) {
577
574
for ( const child of subsequentResultRecord . children ) {
578
575
const pendingSource = isStreamItemsRecord ( child )
@@ -635,11 +632,30 @@ export class IncrementalPublisher {
635
632
}
636
633
}
637
634
638
- return {
639
- pending : this . pendingSourcesToResults ( newPendingSources ) ,
640
- incremental : incrementalResults ,
641
- completed : completedResults ,
635
+ return aggregate ;
636
+ }
637
+
638
+ private _incrementalFinalizer (
639
+ aggregate : IncrementalAggregate ,
640
+ ) : SubsequentIncrementalExecutionResult {
641
+ const { newPendingSources, incrementalResults, completedResults } =
642
+ aggregate ;
643
+ const pendingResults = this . pendingSourcesToResults ( newPendingSources ) ;
644
+
645
+ const result : SubsequentIncrementalExecutionResult = {
646
+ hasNext : this . hasNext ( ) ,
642
647
} ;
648
+ if ( pendingResults . length ) {
649
+ result . pending = pendingResults ;
650
+ }
651
+ if ( incrementalResults . length ) {
652
+ result . incremental = incrementalResults ;
653
+ }
654
+ if ( completedResults . length ) {
655
+ result . completed = completedResults ;
656
+ }
657
+
658
+ return result ;
643
659
}
644
660
645
661
private _completedRecordToResult (
0 commit comments