@@ -48,6 +48,11 @@ export enum ConnectivityState {
4848 SHUTDOWN ,
4949}
5050
51+ /**
52+ * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
53+ */
54+ const MAX_TIMEOUT_TIME = 2147483647 ;
55+
5156let nextCallNumber = 0 ;
5257
5358function getNewCallNumber ( ) : number {
@@ -137,6 +142,14 @@ export class ChannelImplementation implements Channel {
137142 private defaultAuthority : string ;
138143 private filterStackFactory : FilterStackFactory ;
139144 private target : GrpcUri ;
145+ /**
146+ * This timer does not do anything on its own. Its purpose is to hold the
147+ * event loop open while there are any pending calls for the channel that
148+ * have not yet been assigned to specific subchannels. In other words,
149+ * the invariant is that callRefTimer is reffed if and only if pickQueue
150+ * is non-empty.
151+ */
152+ private callRefTimer : NodeJS . Timer ;
140153 constructor (
141154 target : string ,
142155 private readonly credentials : ChannelCredentials ,
@@ -177,6 +190,10 @@ export class ChannelImplementation implements Channel {
177190 `Could not find a default scheme for target name "${ target } "`
178191 ) ;
179192 }
193+
194+ this . callRefTimer = setInterval ( ( ) => { } , MAX_TIMEOUT_TIME ) ;
195+ this . callRefTimer . unref ?.( ) ;
196+
180197 if ( this . options [ 'grpc.default_authority' ] ) {
181198 this . defaultAuthority = this . options [ 'grpc.default_authority' ] as string ;
182199 } else {
@@ -206,6 +223,7 @@ export class ChannelImplementation implements Channel {
206223 updateState : ( connectivityState : ConnectivityState , picker : Picker ) => {
207224 this . currentPicker = picker ;
208225 const queueCopy = this . pickQueue . slice ( ) ;
226+ this . callRefTimer . unref ?.( ) ;
209227 this . pickQueue = [ ] ;
210228 for ( const { callStream, callMetadata } of queueCopy ) {
211229 this . tryPick ( callStream , callMetadata ) ;
@@ -232,6 +250,11 @@ export class ChannelImplementation implements Channel {
232250 ] ) ;
233251 }
234252
253+ private pushPick ( callStream : Http2CallStream , callMetadata : Metadata ) {
254+ this . callRefTimer . ref ?.( ) ;
255+ this . pickQueue . push ( { callStream, callMetadata } ) ;
256+ }
257+
235258 /**
236259 * Check the picker output for the given call and corresponding metadata,
237260 * and take any relevant actions. Should not be called while iterating
@@ -276,7 +299,7 @@ export class ChannelImplementation implements Channel {
276299 ' has state ' +
277300 ConnectivityState [ pickResult . subchannel ! . getConnectivityState ( ) ]
278301 ) ;
279- this . pickQueue . push ( { callStream, callMetadata } ) ;
302+ this . pushPick ( callStream , callMetadata ) ;
280303 break ;
281304 }
282305 /* We need to clone the callMetadata here because the transparent
@@ -367,11 +390,11 @@ export class ChannelImplementation implements Channel {
367390 }
368391 break ;
369392 case PickResultType . QUEUE :
370- this . pickQueue . push ( { callStream, callMetadata } ) ;
393+ this . pushPick ( callStream , callMetadata ) ;
371394 break ;
372395 case PickResultType . TRANSIENT_FAILURE :
373396 if ( callMetadata . getOptions ( ) . waitForReady ) {
374- this . pickQueue . push ( { callStream, callMetadata } ) ;
397+ this . pushPick ( callStream , callMetadata ) ;
375398 } else {
376399 callStream . cancelWithStatus (
377400 pickResult . status ! . code ,
@@ -433,6 +456,7 @@ export class ChannelImplementation implements Channel {
433456 close ( ) {
434457 this . resolvingLoadBalancer . destroy ( ) ;
435458 this . updateState ( ConnectivityState . SHUTDOWN ) ;
459+ clearInterval ( this . callRefTimer ) ;
436460
437461 this . subchannelPool . unrefUnusedSubchannels ( ) ;
438462 }
0 commit comments