@@ -18,6 +18,7 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & {
1818 preDequeue ?: PreDequeueFn ;
1919 preSkip ?: PreSkipFn ;
2020 maxRunCount ?: number ;
21+ maxConsumerCount ?: number ;
2122} ;
2223
2324export class SupervisorSession extends EventEmitter < WorkerEvents > {
@@ -27,7 +28,7 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
2728 private runNotificationsSocket ?: Socket < WorkerServerToClientEvents , WorkerClientToServerEvents > ;
2829
2930 private readonly queueConsumerEnabled : boolean ;
30- private readonly queueConsumer : RunQueueConsumer ;
31+ private readonly queueConsumers : RunQueueConsumer [ ] ;
3132
3233 private readonly heartbeat : IntervalService ;
3334 private readonly heartbeatIntervalSeconds : number ;
@@ -39,13 +40,15 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
3940 this . queueConsumerEnabled = opts . queueConsumerEnabled ?? true ;
4041
4142 this . httpClient = new SupervisorHttpClient ( opts ) ;
42- this . queueConsumer = new RunQueueConsumer ( {
43- client : this . httpClient ,
44- preDequeue : opts . preDequeue ,
45- preSkip : opts . preSkip ,
46- onDequeue : this . onDequeue . bind ( this ) ,
47- intervalMs : opts . dequeueIntervalMs ,
48- maxRunCount : opts . maxRunCount ,
43+ this . queueConsumers = Array . from ( { length : opts . maxConsumerCount ?? 1 } , ( ) => {
44+ return new RunQueueConsumer ( {
45+ client : this . httpClient ,
46+ preDequeue : opts . preDequeue ,
47+ preSkip : opts . preSkip ,
48+ onDequeue : this . onDequeue . bind ( this ) ,
49+ intervalMs : opts . dequeueIntervalMs ,
50+ maxRunCount : opts . maxRunCount ,
51+ } ) ;
4952 } ) ;
5053
5154 // TODO: This should be dynamic and set by (or at least overridden by) the platform
@@ -181,7 +184,7 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
181184
182185 if ( this . queueConsumerEnabled ) {
183186 console . log ( "[SupervisorSession] Queue consumer enabled" ) ;
184- this . queueConsumer . start ( ) ;
187+ await Promise . allSettled ( this . queueConsumers . map ( async ( q ) => q . start ( ) ) ) ;
185188 this . heartbeat . start ( ) ;
186189 } else {
187190 console . warn ( "[SupervisorSession] Queue consumer disabled" ) ;
@@ -196,6 +199,7 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
196199 }
197200
198201 async stop ( ) {
202+ await Promise . allSettled ( this . queueConsumers . map ( async ( q ) => q . stop ( ) ) ) ;
199203 this . heartbeat . stop ( ) ;
200204 this . runNotificationsSocket ?. disconnect ( ) ;
201205 }
0 commit comments