@@ -53,6 +53,7 @@ import {
5353 collectSubfields as _collectSubfields ,
5454} from './collectFields.js' ;
5555import { mapAsyncIterable } from './mapAsyncIterable.js' ;
56+ import { Publisher } from './publisher.js' ;
5657import {
5758 getArgumentValues ,
5859 getDirectiveValues ,
@@ -2197,133 +2198,3 @@ function isStreamPayload(
21972198) : asyncPayload is StreamRecord {
21982199 return asyncPayload . type === 'stream' ;
21992200}
2200-
2201- interface Source {
2202- promise : Promise < void > ;
2203- isCompleted : boolean ;
2204- isCompletedIterator ?: boolean | undefined ;
2205- iterator ?: AsyncIterator < unknown > | undefined ;
2206- }
2207-
2208- type ToIncrementalResult < TSource extends Source , TIncremental > = (
2209- source : TSource ,
2210- ) => TIncremental ;
2211-
2212- type ToPayload < TIncremental , TPayload > = (
2213- incremental : ReadonlyArray < TIncremental > ,
2214- hasNext : boolean ,
2215- ) => TPayload ;
2216-
2217- /**
2218- * @internal
2219- */
2220- export class Publisher < TSource extends Source , TIncremental , TPayload > {
2221- sources : Set < TSource > ;
2222- toIncrementalResult : ToIncrementalResult < TSource , TIncremental > ;
2223- toPayload : ToPayload < TIncremental , TPayload > ;
2224-
2225- constructor (
2226- toIncrementalResult : ToIncrementalResult < TSource , TIncremental > ,
2227- toPayload : ToPayload < TIncremental , TPayload > ,
2228- ) {
2229- this . sources = new Set ( ) ;
2230- this . toIncrementalResult = toIncrementalResult ;
2231- this . toPayload = toPayload ;
2232- }
2233-
2234- add ( source : TSource ) {
2235- this . sources . add ( source ) ;
2236- }
2237-
2238- hasNext ( ) : boolean {
2239- return this . sources . size > 0 ;
2240- }
2241-
2242- filter ( predicate : ( source : TSource ) => boolean ) : void {
2243- this . sources . forEach ( ( source ) => {
2244- if ( predicate ( source ) ) {
2245- return ;
2246- }
2247- if ( source . iterator ?. return ) {
2248- source . iterator . return ( ) . catch ( ( ) => {
2249- // ignore error
2250- } ) ;
2251- }
2252- this . sources . delete ( source ) ;
2253- } ) ;
2254- }
2255-
2256- _getCompletedIncrementalResults ( ) : Array < TIncremental > {
2257- const incrementalResults : Array < TIncremental > = [ ] ;
2258- for ( const source of this . sources ) {
2259- if ( ! source . isCompleted ) {
2260- continue ;
2261- }
2262- this . sources . delete ( source ) ;
2263- if ( source . isCompletedIterator ) {
2264- continue ;
2265- }
2266- incrementalResults . push ( this . toIncrementalResult ( source ) ) ;
2267- }
2268- return incrementalResults ;
2269- }
2270-
2271- subscribe ( ) : AsyncGenerator < TPayload , void , void > {
2272- let isDone = false ;
2273-
2274- const next = async ( ) : Promise < IteratorResult < TPayload , void > > => {
2275- if ( isDone ) {
2276- return { value : undefined , done : true } ;
2277- }
2278-
2279- await Promise . race ( Array . from ( this . sources ) . map ( ( p ) => p . promise ) ) ;
2280-
2281- if ( isDone ) {
2282- return { value : undefined , done : true } ;
2283- }
2284-
2285- const incremental = this . _getCompletedIncrementalResults ( ) ;
2286- const hasNext = this . sources . size > 0 ;
2287-
2288- if ( ! incremental . length && hasNext ) {
2289- return next ( ) ;
2290- }
2291-
2292- if ( ! hasNext ) {
2293- isDone = true ;
2294- }
2295-
2296- return {
2297- value : this . toPayload ( incremental , hasNext ) ,
2298- done : false ,
2299- } ;
2300- } ;
2301-
2302- const returnIterators = ( ) => {
2303- const promises : Array < Promise < IteratorResult < unknown > > > = [ ] ;
2304- this . sources . forEach ( ( source ) => {
2305- if ( source . iterator ?. return ) {
2306- promises . push ( source . iterator . return ( ) ) ;
2307- }
2308- } ) ;
2309- return Promise . all ( promises ) ;
2310- } ;
2311-
2312- return {
2313- [ Symbol . asyncIterator ] ( ) {
2314- return this ;
2315- } ,
2316- next,
2317- async return ( ) : Promise < IteratorResult < TPayload , void > > {
2318- await returnIterators ( ) ;
2319- isDone = true ;
2320- return { value : undefined , done : true } ;
2321- } ,
2322- async throw ( error ?: unknown ) : Promise < IteratorResult < TPayload , void > > {
2323- await returnIterators ( ) ;
2324- isDone = true ;
2325- return Promise . reject ( error ) ;
2326- } ,
2327- } ;
2328- }
2329- }
0 commit comments