1- import type {
2- AsyncPayloadRecord ,
3- StreamRecord ,
4- IncrementalResult ,
5- SubsequentIncrementalExecutionResult ,
6- } from './execute.js' ;
7-
8- type ToIncrementalResult = (
9- asyncPayloadRecord : AsyncPayloadRecord ,
10- ) => IncrementalResult ;
11-
12- type ToPayload = (
13- incremental : ReadonlyArray < IncrementalResult > ,
1+ interface Source {
2+ promise : Promise < void > ;
3+ isCompleted : boolean ;
4+ isCompletedIterator ?: boolean | undefined ;
5+ iterator ?: AsyncIterator < unknown > | undefined ;
6+ }
7+
8+ type ToIncrementalResult < TSource extends Source , TIncremental > = (
9+ source : TSource ,
10+ ) => TIncremental ;
11+
12+ type ToPayload < TIncremental , TPayload > = (
13+ incremental : ReadonlyArray < TIncremental > ,
1414 hasNext : boolean ,
15- ) => SubsequentIncrementalExecutionResult ;
15+ ) => TPayload ;
1616
1717/**
1818 * @internal
1919 */
20- export class Publisher {
21- subsequentPayloads : Set < AsyncPayloadRecord > ;
22- toIncrementalResult : ToIncrementalResult ;
23- toPayload : ToPayload ;
24-
25- constructor ( toIncrementalResult : ToIncrementalResult , toPayload : ToPayload ) {
26- this . subsequentPayloads = new Set ( ) ;
20+ export class Publisher < TSource extends Source , TIncremental , TPayload > {
21+ sources : Set < TSource > ;
22+ toIncrementalResult : ToIncrementalResult < TSource , TIncremental > ;
23+ toPayload : ToPayload < TIncremental , TPayload > ;
24+
25+ constructor (
26+ toIncrementalResult : ToIncrementalResult < TSource , TIncremental > ,
27+ toPayload : ToPayload < TIncremental , TPayload > ,
28+ ) {
29+ this . sources = new Set ( ) ;
2730 this . toIncrementalResult = toIncrementalResult ;
2831 this . toPayload = toPayload ;
2932 }
3033
31- add ( payload : AsyncPayloadRecord ) {
32- this . subsequentPayloads . add ( payload ) ;
34+ add ( source : TSource ) {
35+ this . sources . add ( source ) ;
3336 }
3437
3538 hasNext ( ) : boolean {
36- return this . subsequentPayloads . size > 0 ;
39+ return this . sources . size > 0 ;
3740 }
3841
39- filter ( predicate : ( payload : AsyncPayloadRecord ) => boolean ) : void {
40- this . subsequentPayloads . forEach ( ( asyncRecord ) => {
41- if ( predicate ( asyncRecord ) ) {
42+ filter ( predicate : ( source : TSource ) => boolean ) : void {
43+ this . sources . forEach ( ( source ) => {
44+ if ( predicate ( source ) ) {
4245 return ;
4346 }
44- // asyncRecord path points to nulled error field
45- if ( isStreamPayload ( asyncRecord ) && asyncRecord . iterator ?. return ) {
46- asyncRecord . iterator . return ( ) . catch ( ( ) => {
47+ if ( source . iterator ?. return ) {
48+ source . iterator . return ( ) . catch ( ( ) => {
4749 // ignore error
4850 } ) ;
4951 }
50- this . subsequentPayloads . delete ( asyncRecord ) ;
52+ this . sources . delete ( source ) ;
5153 } ) ;
5254 }
5355
54- getCompletedIncrementalResults ( ) : Array < IncrementalResult > {
55- const incrementalResults : Array < IncrementalResult > = [ ] ;
56- for ( const asyncPayloadRecord of this . subsequentPayloads ) {
57- if ( ! asyncPayloadRecord . isCompleted ) {
56+ getCompletedIncrementalResults ( ) : Array < TIncremental > {
57+ const incrementalResults : Array < TIncremental > = [ ] ;
58+ for ( const source of this . sources ) {
59+ if ( ! source . isCompleted ) {
5860 continue ;
5961 }
60- this . subsequentPayloads . delete ( asyncPayloadRecord ) ;
61- if ( isStreamPayload ( asyncPayloadRecord ) ) {
62- if ( asyncPayloadRecord . isCompletedIterator ) {
63- // async iterable resolver just finished but there may be pending payloads
64- continue ;
65- }
62+ this . sources . delete ( source ) ;
63+ if ( source . isCompletedIterator ) {
64+ continue ;
6665 }
67- incrementalResults . push ( this . toIncrementalResult ( asyncPayloadRecord ) ) ;
66+ incrementalResults . push ( this . toIncrementalResult ( source ) ) ;
6867 }
6968 return incrementalResults ;
7069 }
7170
72- yieldSubsequentPayloads ( ) : AsyncGenerator <
73- SubsequentIncrementalExecutionResult ,
74- void ,
75- void
76- > {
71+ subscribe ( ) : AsyncGenerator < TPayload , void , void > {
7772 let isDone = false ;
7873 const publisher = this ;
7974
80- async function next ( ) : Promise <
81- IteratorResult < SubsequentIncrementalExecutionResult , void >
82- > {
75+ async function next ( ) : Promise < IteratorResult < TPayload , void > > {
8376 if ( isDone ) {
8477 return { value : undefined , done : true } ;
8578 }
8679
87- await Promise . race (
88- Array . from ( publisher . subsequentPayloads ) . map ( ( p ) => p . promise ) ,
89- ) ;
80+ await Promise . race ( Array . from ( publisher . sources ) . map ( ( p ) => p . promise ) ) ;
9081
9182 if ( isDone ) {
92- // a different call to next has exhausted all payloads
9383 return { value : undefined , done : true } ;
9484 }
9585
9686 const incremental = publisher . getCompletedIncrementalResults ( ) ;
97- const hasNext = publisher . subsequentPayloads . size > 0 ;
87+ const hasNext = publisher . sources . size > 0 ;
9888
9989 if ( ! incremental . length && hasNext ) {
10090 return next ( ) ;
@@ -110,14 +100,11 @@ export class Publisher {
110100 } ;
111101 }
112102
113- function returnStreamIterators ( ) {
103+ function returnIterators ( ) {
114104 const promises : Array < Promise < IteratorResult < unknown > > > = [ ] ;
115- publisher . subsequentPayloads . forEach ( ( asyncPayloadRecord ) => {
116- if (
117- isStreamPayload ( asyncPayloadRecord ) &&
118- asyncPayloadRecord . iterator ?. return
119- ) {
120- promises . push ( asyncPayloadRecord . iterator . return ( ) ) ;
105+ publisher . sources . forEach ( ( source ) => {
106+ if ( source . iterator ?. return ) {
107+ promises . push ( source . iterator . return ( ) ) ;
121108 }
122109 } ) ;
123110 return Promise . all ( promises ) ;
@@ -128,26 +115,16 @@ export class Publisher {
128115 return this ;
129116 } ,
130117 next,
131- async return ( ) : Promise <
132- IteratorResult < SubsequentIncrementalExecutionResult , void >
133- > {
134- await returnStreamIterators ( ) ;
118+ async return ( ) : Promise < IteratorResult < TPayload , void > > {
119+ await returnIterators ( ) ;
135120 isDone = true ;
136121 return { value : undefined , done : true } ;
137122 } ,
138- async throw (
139- error ?: unknown ,
140- ) : Promise < IteratorResult < SubsequentIncrementalExecutionResult , void > > {
141- await returnStreamIterators ( ) ;
123+ async throw ( error ?: unknown ) : Promise < IteratorResult < TPayload , void > > {
124+ await returnIterators ( ) ;
142125 isDone = true ;
143126 return Promise . reject ( error ) ;
144127 } ,
145128 } ;
146129 }
147130}
148-
149- function isStreamPayload (
150- asyncPayload : AsyncPayloadRecord ,
151- ) : asyncPayload is StreamRecord {
152- return asyncPayload . type === 'stream' ;
153- }
0 commit comments