@@ -44,6 +44,8 @@ export type FairDequeuingStrategyOptions = {
44
44
* If not provided, no biasing will be applied (completely random shuffling)
45
45
*/
46
46
biases ?: FairDequeuingStrategyBiases ;
47
+ reuseSnapshotCount ?: number ;
48
+ maximumOrgCount ?: number ;
47
49
} ;
48
50
49
51
type FairQueueConcurrency = {
@@ -90,6 +92,10 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
90
92
} > ;
91
93
92
94
private _rng : seedrandom . PRNG ;
95
+ private _reusedSnapshotForConsumer : Map <
96
+ string ,
97
+ { snapshot : FairQueueSnapshot ; reuseCount : number }
98
+ > = new Map ( ) ;
93
99
94
100
constructor ( private options : FairDequeuingStrategyOptions ) {
95
101
const ctx = new DefaultStatefulContext ( ) ;
@@ -310,16 +316,53 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
310
316
span . setAttribute ( "consumer_id" , consumerId ) ;
311
317
span . setAttribute ( "parent_queue" , parentQueue ) ;
312
318
319
+ if (
320
+ typeof this . options . reuseSnapshotCount === "number" &&
321
+ this . options . reuseSnapshotCount > 0
322
+ ) {
323
+ const key = `${ parentQueue } :${ consumerId } ` ;
324
+ const reusedSnapshot = this . _reusedSnapshotForConsumer . get ( key ) ;
325
+
326
+ if ( reusedSnapshot ) {
327
+ if ( reusedSnapshot . reuseCount < this . options . reuseSnapshotCount ) {
328
+ span . setAttribute ( "reused_snapshot" , true ) ;
329
+
330
+ this . _reusedSnapshotForConsumer . set ( key , {
331
+ snapshot : reusedSnapshot . snapshot ,
332
+ reuseCount : reusedSnapshot . reuseCount + 1 ,
333
+ } ) ;
334
+
335
+ return reusedSnapshot . snapshot ;
336
+ } else {
337
+ this . _reusedSnapshotForConsumer . delete ( key ) ;
338
+ }
339
+ }
340
+ }
341
+
342
+ span . setAttribute ( "reused_snapshot" , false ) ;
343
+
313
344
const now = Date . now ( ) ;
314
345
315
- const queues = await this . #allChildQueuesByScore( parentQueue , consumerId , now ) ;
346
+ let queues = await this . #allChildQueuesByScore( parentQueue , consumerId , now ) ;
316
347
317
348
span . setAttribute ( "parent_queue_count" , queues . length ) ;
318
349
319
350
if ( queues . length === 0 ) {
320
351
return emptyFairQueueSnapshot ;
321
352
}
322
353
354
+ // Apply org selection if maximumOrgCount is specified
355
+ let selectedOrgIds : Set < string > ;
356
+ if ( this . options . maximumOrgCount && this . options . maximumOrgCount > 0 ) {
357
+ selectedOrgIds = this . #selectTopOrgs( queues , this . options . maximumOrgCount ) ;
358
+ // Filter queues to only include selected orgs
359
+ queues = queues . filter ( ( queue ) => selectedOrgIds . has ( queue . org ) ) ;
360
+
361
+ span . setAttribute ( "selected_org_count" , selectedOrgIds . size ) ;
362
+ }
363
+
364
+ span . setAttribute ( "selected_queue_count" , queues . length ) ;
365
+
323
366
const orgIds = new Set < string > ( ) ;
324
367
const envIds = new Set < string > ( ) ;
325
368
const envIdToOrgId = new Map < string , string > ( ) ;
@@ -341,10 +384,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
341
384
( org ) => org . concurrency . current >= org . concurrency . limit
342
385
) ;
343
386
344
- span . setAttributes ( {
345
- ...flattenAttributes ( orgsAtFullConcurrency , "orgs_at_full_concurrency" ) ,
346
- } ) ;
347
-
348
387
const orgIdsAtFullConcurrency = new Set ( orgsAtFullConcurrency . map ( ( org ) => org . id ) ) ;
349
388
350
389
const orgsSnapshot = orgs . reduce ( ( acc , org ) => {
@@ -355,6 +394,12 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
355
394
return acc ;
356
395
} , { } as Record < string , { concurrency : FairQueueConcurrency } > ) ;
357
396
397
+ span . setAttributes ( {
398
+ org_count : orgs . length ,
399
+ orgs_at_full_concurrency_count : orgsAtFullConcurrency . length ,
400
+ orgs_snapshot_count : Object . keys ( orgsSnapshot ) . length ,
401
+ } ) ;
402
+
358
403
if ( Object . keys ( orgsSnapshot ) . length === 0 ) {
359
404
return emptyFairQueueSnapshot ;
360
405
}
@@ -376,10 +421,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
376
421
( env ) => env . concurrency . current >= env . concurrency . limit
377
422
) ;
378
423
379
- span . setAttributes ( {
380
- ...flattenAttributes ( envsAtFullConcurrency , "envs_at_full_concurrency" ) ,
381
- } ) ;
382
-
383
424
const envIdsAtFullConcurrency = new Set ( envsAtFullConcurrency . map ( ( env ) => env . id ) ) ;
384
425
385
426
const envsSnapshot = envs . reduce ( ( acc , env ) => {
@@ -390,6 +431,11 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
390
431
return acc ;
391
432
} , { } as Record < string , { concurrency : FairQueueConcurrency } > ) ;
392
433
434
+ span . setAttributes ( {
435
+ env_count : envs . length ,
436
+ envs_at_full_concurrency_count : envsAtFullConcurrency . length ,
437
+ } ) ;
438
+
393
439
const queuesSnapshot = queues . filter (
394
440
( queue ) =>
395
441
! orgIdsAtFullConcurrency . has ( queue . org ) && ! envIdsAtFullConcurrency . has ( queue . env )
@@ -402,10 +448,66 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
402
448
queues : queuesSnapshot ,
403
449
} ;
404
450
451
+ if (
452
+ typeof this . options . reuseSnapshotCount === "number" &&
453
+ this . options . reuseSnapshotCount > 0
454
+ ) {
455
+ this . _reusedSnapshotForConsumer . set ( `${ parentQueue } :${ consumerId } ` , {
456
+ snapshot,
457
+ reuseCount : 0 ,
458
+ } ) ;
459
+ }
460
+
405
461
return snapshot ;
406
462
} ) ;
407
463
}
408
464
465
+ #selectTopOrgs( queues : FairQueue [ ] , maximumOrgCount : number ) : Set < string > {
466
+ // Group queues by org
467
+ const queuesByOrg = queues . reduce ( ( acc , queue ) => {
468
+ if ( ! acc [ queue . org ] ) {
469
+ acc [ queue . org ] = [ ] ;
470
+ }
471
+ acc [ queue . org ] . push ( queue ) ;
472
+ return acc ;
473
+ } , { } as Record < string , FairQueue [ ] > ) ;
474
+
475
+ // Calculate average age for each org
476
+ const orgAverageAges = Object . entries ( queuesByOrg ) . map ( ( [ orgId , orgQueues ] ) => {
477
+ const averageAge = orgQueues . reduce ( ( sum , q ) => sum + q . age , 0 ) / orgQueues . length ;
478
+ return { orgId, averageAge } ;
479
+ } ) ;
480
+
481
+ // Perform weighted shuffle based on average ages
482
+ const maxAge = Math . max ( ...orgAverageAges . map ( ( o ) => o . averageAge ) ) ;
483
+ const weightedOrgs = orgAverageAges . map ( ( org ) => ( {
484
+ orgId : org . orgId ,
485
+ weight : org . averageAge / maxAge , // Normalize weights
486
+ } ) ) ;
487
+
488
+ // Select top N orgs using weighted shuffle
489
+ const selectedOrgs = new Set < string > ( ) ;
490
+ let remainingOrgs = [ ...weightedOrgs ] ;
491
+ let totalWeight = remainingOrgs . reduce ( ( sum , org ) => sum + org . weight , 0 ) ;
492
+
493
+ while ( selectedOrgs . size < maximumOrgCount && remainingOrgs . length > 0 ) {
494
+ let random = this . _rng ( ) * totalWeight ;
495
+ let index = 0 ;
496
+
497
+ while ( random > 0 && index < remainingOrgs . length ) {
498
+ random -= remainingOrgs [ index ] . weight ;
499
+ index ++ ;
500
+ }
501
+ index = Math . max ( 0 , index - 1 ) ;
502
+
503
+ selectedOrgs . add ( remainingOrgs [ index ] . orgId ) ;
504
+ totalWeight -= remainingOrgs [ index ] . weight ;
505
+ remainingOrgs . splice ( index , 1 ) ;
506
+ }
507
+
508
+ return selectedOrgs ;
509
+ }
510
+
409
511
async #getOrgConcurrency( orgId : string ) : Promise < FairQueueConcurrency > {
410
512
return await startSpan ( this . options . tracer , "getOrgConcurrency" , async ( span ) => {
411
513
span . setAttribute ( "org_id" , orgId ) ;
0 commit comments