@@ -121,7 +121,11 @@ export interface ExecutionContext {
121121 typeResolver : GraphQLTypeResolver < any , any > ;
122122 subscribeFieldResolver : GraphQLFieldResolver < any , any > ;
123123 errors : Array < GraphQLError > ;
124- subsequentPayloads : Set < AsyncPayloadRecord > ;
124+ publisher : Publisher <
125+ AsyncPayloadRecord ,
126+ IncrementalResult ,
127+ SubsequentIncrementalExecutionResult
128+ > ;
125129}
126130
127131/**
@@ -357,13 +361,14 @@ function executeImpl(
357361 return result . then (
358362 ( data ) => {
359363 const initialResult = buildResponse ( data , exeContext . errors ) ;
360- if ( exeContext . subsequentPayloads . size > 0 ) {
364+ const publisher = exeContext . publisher ;
365+ if ( publisher . hasNext ( ) ) {
361366 return {
362367 initialResult : {
363368 ...initialResult ,
364369 hasNext : true ,
365370 } ,
366- subsequentResults : yieldSubsequentPayloads ( exeContext ) ,
371+ subsequentResults : publisher . subscribe ( ) ,
367372 } ;
368373 }
369374 return initialResult ;
@@ -375,13 +380,14 @@ function executeImpl(
375380 ) ;
376381 }
377382 const initialResult = buildResponse ( result , exeContext . errors ) ;
378- if ( exeContext . subsequentPayloads . size > 0 ) {
383+ const publisher = exeContext . publisher ;
384+ if ( publisher . hasNext ( ) ) {
379385 return {
380386 initialResult : {
381387 ...initialResult ,
382388 hasNext : true ,
383389 } ,
384- subsequentResults : yieldSubsequentPayloads ( exeContext ) ,
390+ subsequentResults : publisher . subscribe ( ) ,
385391 } ;
386392 }
387393 return initialResult ;
@@ -503,7 +509,7 @@ export function buildExecutionContext(
503509 fieldResolver : fieldResolver ?? defaultFieldResolver ,
504510 typeResolver : typeResolver ?? defaultTypeResolver ,
505511 subscribeFieldResolver : subscribeFieldResolver ?? defaultFieldResolver ,
506- subsequentPayloads : new Set ( ) ,
512+ publisher : new Publisher ( resultFromAsyncPayloadRecord , payloadFromResults ) ,
507513 errors : [ ] ,
508514 } ;
509515}
@@ -515,7 +521,7 @@ function buildPerEventExecutionContext(
515521 return {
516522 ...exeContext ,
517523 rootValue : payload ,
518- subsequentPayloads : new Set ( ) ,
524+ publisher : new Publisher ( resultFromAsyncPayloadRecord , payloadFromResults ) ,
519525 errors : [ ] ,
520526 } ;
521527}
@@ -2038,132 +2044,49 @@ function filterSubsequentPayloads(
20382044 currentAsyncRecord : AsyncPayloadRecord | undefined ,
20392045) : void {
20402046 const nullPathArray = pathToArray ( nullPath ) ;
2041- exeContext . subsequentPayloads . forEach ( ( asyncRecord ) => {
2047+ exeContext . publisher . filter ( ( asyncRecord ) => {
20422048 if ( asyncRecord === currentAsyncRecord ) {
20432049 // don't remove payload from where error originates
2044- return ;
2050+ return true ;
20452051 }
20462052 for ( let i = 0 ; i < nullPathArray . length ; i ++ ) {
20472053 if ( asyncRecord . path [ i ] !== nullPathArray [ i ] ) {
20482054 // asyncRecord points to a path unaffected by this payload
2049- return ;
2055+ return true ;
20502056 }
20512057 }
2052- // asyncRecord path points to nulled error field
2053- if ( isStreamPayload ( asyncRecord ) && asyncRecord . iterator ?. return ) {
2054- asyncRecord . iterator . return ( ) . catch ( ( ) => {
2055- // ignore error
2056- } ) ;
2057- }
2058- exeContext . subsequentPayloads . delete ( asyncRecord ) ;
2058+
2059+ return false ;
20592060 } ) ;
20602061}
20612062
2062- function getCompletedIncrementalResults (
2063- exeContext : ExecutionContext ,
2064- ) : Array < IncrementalResult > {
2065- const incrementalResults : Array < IncrementalResult > = [ ] ;
2066- for ( const asyncPayloadRecord of exeContext . subsequentPayloads ) {
2067- const incrementalResult : IncrementalResult = { } ;
2068- if ( ! asyncPayloadRecord . isCompleted ) {
2069- continue ;
2070- }
2071- exeContext . subsequentPayloads . delete ( asyncPayloadRecord ) ;
2072- if ( isStreamPayload ( asyncPayloadRecord ) ) {
2073- const items = asyncPayloadRecord . items ;
2074- if ( asyncPayloadRecord . isCompletedIterator ) {
2075- // async iterable resolver just finished but there may be pending payloads
2076- continue ;
2077- }
2078- ( incrementalResult as IncrementalStreamResult ) . items = items ;
2079- } else {
2080- const data = asyncPayloadRecord . data ;
2081- ( incrementalResult as IncrementalDeferResult ) . data = data ?? null ;
2082- }
2083-
2084- incrementalResult . path = asyncPayloadRecord . path ;
2085- if ( asyncPayloadRecord . label ) {
2086- incrementalResult . label = asyncPayloadRecord . label ;
2087- }
2088- if ( asyncPayloadRecord . errors . length > 0 ) {
2089- incrementalResult . errors = asyncPayloadRecord . errors ;
2090- }
2091- incrementalResults . push ( incrementalResult ) ;
2063+ function resultFromAsyncPayloadRecord (
2064+ asyncPayloadRecord : AsyncPayloadRecord ,
2065+ ) : IncrementalResult {
2066+ const incrementalResult : IncrementalResult = { } ;
2067+ if ( isStreamPayload ( asyncPayloadRecord ) ) {
2068+ const items = asyncPayloadRecord . items ;
2069+ ( incrementalResult as IncrementalStreamResult ) . items = items ;
2070+ } else {
2071+ const data = asyncPayloadRecord . data ;
2072+ ( incrementalResult as IncrementalDeferResult ) . data = data ?? null ;
20922073 }
2093- return incrementalResults ;
2094- }
2095-
2096- function yieldSubsequentPayloads (
2097- exeContext : ExecutionContext ,
2098- ) : AsyncGenerator < SubsequentIncrementalExecutionResult , void , void > {
2099- let isDone = false ;
2100-
2101- async function next ( ) : Promise <
2102- IteratorResult < SubsequentIncrementalExecutionResult , void >
2103- > {
2104- if ( isDone ) {
2105- return { value : undefined , done : true } ;
2106- }
2107-
2108- await Promise . race (
2109- Array . from ( exeContext . subsequentPayloads ) . map ( ( p ) => p . promise ) ,
2110- ) ;
2111-
2112- if ( isDone ) {
2113- // a different call to next has exhausted all payloads
2114- return { value : undefined , done : true } ;
2115- }
21162074
2117- const incremental = getCompletedIncrementalResults ( exeContext ) ;
2118- const hasNext = exeContext . subsequentPayloads . size > 0 ;
2119-
2120- if ( ! incremental . length && hasNext ) {
2121- return next ( ) ;
2122- }
2123-
2124- if ( ! hasNext ) {
2125- isDone = true ;
2126- }
2127-
2128- return {
2129- value : incremental . length ? { incremental, hasNext } : { hasNext } ,
2130- done : false ,
2131- } ;
2075+ incrementalResult . path = asyncPayloadRecord . path ;
2076+ if ( asyncPayloadRecord . label ) {
2077+ incrementalResult . label = asyncPayloadRecord . label ;
21322078 }
2133-
2134- function returnStreamIterators ( ) {
2135- const promises : Array < Promise < IteratorResult < unknown > > > = [ ] ;
2136- exeContext . subsequentPayloads . forEach ( ( asyncPayloadRecord ) => {
2137- if (
2138- isStreamPayload ( asyncPayloadRecord ) &&
2139- asyncPayloadRecord . iterator ?. return
2140- ) {
2141- promises . push ( asyncPayloadRecord . iterator . return ( ) ) ;
2142- }
2143- } ) ;
2144- return Promise . all ( promises ) ;
2079+ if ( asyncPayloadRecord . errors . length > 0 ) {
2080+ incrementalResult . errors = asyncPayloadRecord . errors ;
21452081 }
2082+ return incrementalResult ;
2083+ }
21462084
2147- return {
2148- [ Symbol . asyncIterator ] ( ) {
2149- return this ;
2150- } ,
2151- next,
2152- async return ( ) : Promise <
2153- IteratorResult < SubsequentIncrementalExecutionResult , void >
2154- > {
2155- await returnStreamIterators ( ) ;
2156- isDone = true ;
2157- return { value : undefined , done : true } ;
2158- } ,
2159- async throw (
2160- error ?: unknown ,
2161- ) : Promise < IteratorResult < SubsequentIncrementalExecutionResult , void > > {
2162- await returnStreamIterators ( ) ;
2163- isDone = true ;
2164- return Promise . reject ( error ) ;
2165- } ,
2166- } ;
2085+ function payloadFromResults (
2086+ incremental : ReadonlyArray < IncrementalResult > ,
2087+ hasNext : boolean ,
2088+ ) : SubsequentIncrementalExecutionResult {
2089+ return incremental . length ? { incremental, hasNext } : { hasNext } ;
21672090}
21682091
21692092class DeferredFragmentRecord {
@@ -2189,7 +2112,7 @@ class DeferredFragmentRecord {
21892112 this . parentContext = opts . parentContext ;
21902113 this . errors = [ ] ;
21912114 this . _exeContext = opts . exeContext ;
2192- this . _exeContext . subsequentPayloads . add ( this ) ;
2115+ this . _exeContext . publisher . add ( this ) ;
21932116 this . isCompleted = false ;
21942117 this . data = null ;
21952118 this . promise = new Promise < ObjMap < unknown > | null > ( ( resolve ) => {
@@ -2240,7 +2163,7 @@ class StreamRecord {
22402163 this . iterator = opts . iterator ;
22412164 this . errors = [ ] ;
22422165 this . _exeContext = opts . exeContext ;
2243- this . _exeContext . subsequentPayloads . add ( this ) ;
2166+ this . _exeContext . publisher . add ( this ) ;
22442167 this . isCompleted = false ;
22452168 this . items = null ;
22462169 this . promise = new Promise < Array < unknown > | null > ( ( resolve ) => {
@@ -2274,3 +2197,133 @@ function isStreamPayload(
22742197) : asyncPayload is StreamRecord {
22752198 return asyncPayload . type === 'stream' ;
22762199}
2200+
2201+ interface Source {
2202+ promise : Promise < void > ;
2203+ isCompleted : boolean ;
2204+ isCompletedIterator ?: boolean | undefined ;
2205+ iterator ?: AsyncIterator < unknown > | undefined ;
2206+ }
2207+
2208+ type ToIncrementalResult < TSource extends Source , TIncremental > = (
2209+ source : TSource ,
2210+ ) => TIncremental ;
2211+
2212+ type ToPayload < TIncremental , TPayload > = (
2213+ incremental : ReadonlyArray < TIncremental > ,
2214+ hasNext : boolean ,
2215+ ) => TPayload ;
2216+
2217+ /**
2218+ * @internal
2219+ */
2220+ export class Publisher < TSource extends Source , TIncremental , TPayload > {
2221+ sources : Set < TSource > ;
2222+ toIncrementalResult : ToIncrementalResult < TSource , TIncremental > ;
2223+ toPayload : ToPayload < TIncremental , TPayload > ;
2224+
2225+ constructor (
2226+ toIncrementalResult : ToIncrementalResult < TSource , TIncremental > ,
2227+ toPayload : ToPayload < TIncremental , TPayload > ,
2228+ ) {
2229+ this . sources = new Set ( ) ;
2230+ this . toIncrementalResult = toIncrementalResult ;
2231+ this . toPayload = toPayload ;
2232+ }
2233+
2234+ add ( source : TSource ) {
2235+ this . sources . add ( source ) ;
2236+ }
2237+
2238+ hasNext ( ) : boolean {
2239+ return this . sources . size > 0 ;
2240+ }
2241+
2242+ filter ( predicate : ( source : TSource ) => boolean ) : void {
2243+ this . sources . forEach ( ( source ) => {
2244+ if ( predicate ( source ) ) {
2245+ return ;
2246+ }
2247+ if ( source . iterator ?. return ) {
2248+ source . iterator . return ( ) . catch ( ( ) => {
2249+ // ignore error
2250+ } ) ;
2251+ }
2252+ this . sources . delete ( source ) ;
2253+ } ) ;
2254+ }
2255+
2256+ _getCompletedIncrementalResults ( ) : Array < TIncremental > {
2257+ const incrementalResults : Array < TIncremental > = [ ] ;
2258+ for ( const source of this . sources ) {
2259+ if ( ! source . isCompleted ) {
2260+ continue ;
2261+ }
2262+ this . sources . delete ( source ) ;
2263+ if ( source . isCompletedIterator ) {
2264+ continue ;
2265+ }
2266+ incrementalResults . push ( this . toIncrementalResult ( source ) ) ;
2267+ }
2268+ return incrementalResults ;
2269+ }
2270+
2271+ subscribe ( ) : AsyncGenerator < TPayload , void , void > {
2272+ let isDone = false ;
2273+
2274+ const next = async ( ) : Promise < IteratorResult < TPayload , void > > => {
2275+ if ( isDone ) {
2276+ return { value : undefined , done : true } ;
2277+ }
2278+
2279+ await Promise . race ( Array . from ( this . sources ) . map ( ( p ) => p . promise ) ) ;
2280+
2281+ if ( isDone ) {
2282+ return { value : undefined , done : true } ;
2283+ }
2284+
2285+ const incremental = this . _getCompletedIncrementalResults ( ) ;
2286+ const hasNext = this . sources . size > 0 ;
2287+
2288+ if ( ! incremental . length && hasNext ) {
2289+ return next ( ) ;
2290+ }
2291+
2292+ if ( ! hasNext ) {
2293+ isDone = true ;
2294+ }
2295+
2296+ return {
2297+ value : this . toPayload ( incremental , hasNext ) ,
2298+ done : false ,
2299+ } ;
2300+ } ;
2301+
2302+ const returnIterators = ( ) => {
2303+ const promises : Array < Promise < IteratorResult < unknown > > > = [ ] ;
2304+ this . sources . forEach ( ( source ) => {
2305+ if ( source . iterator ?. return ) {
2306+ promises . push ( source . iterator . return ( ) ) ;
2307+ }
2308+ } ) ;
2309+ return Promise . all ( promises ) ;
2310+ } ;
2311+
2312+ return {
2313+ [ Symbol . asyncIterator ] ( ) {
2314+ return this ;
2315+ } ,
2316+ next,
2317+ async return ( ) : Promise < IteratorResult < TPayload , void > > {
2318+ await returnIterators ( ) ;
2319+ isDone = true ;
2320+ return { value : undefined , done : true } ;
2321+ } ,
2322+ async throw ( error ?: unknown ) : Promise < IteratorResult < TPayload , void > > {
2323+ await returnIterators ( ) ;
2324+ isDone = true ;
2325+ return Promise . reject ( error ) ;
2326+ } ,
2327+ } ;
2328+ }
2329+ }
0 commit comments