11import { redisTest } from "@internal/testcontainers" ;
22import { trace } from "@internal/tracing" ;
3- import { Logger } from "@trigger.dev/core/logger" ;
43import { describe } from "node:test" ;
54import { FairQueueSelectionStrategy } from "../fairQueueSelectionStrategy.js" ;
65import { RunQueue } from "../index.js" ;
@@ -12,7 +11,6 @@ const testOptions = {
1211 tracer : trace . getTracer ( "rq" ) ,
1312 workers : 1 ,
1413 defaultEnvConcurrency : 25 ,
15- logger : new Logger ( "RunQueue" , "warn" ) ,
1614 retryOptions : {
1715 maxAttempts : 5 ,
1816 factor : 1.1 ,
@@ -264,4 +262,95 @@ describe("RunQueue.dequeueMessageFromMasterQueue", () => {
264262 }
265263 }
266264 ) ;
265+
266+ redisTest (
267+ "should exit early when no messages can be dequeued in a full cycle" ,
268+ async ( { redisContainer } ) => {
269+ const mockLogger = {
270+ log : vi . fn ( ) ,
271+ error : vi . fn ( ) ,
272+ warn : vi . fn ( ) ,
273+ debug : vi . fn ( ) ,
274+ name : "test-logger" ,
275+ level : "debug" ,
276+ filteredKeys : [ ] ,
277+ additionalFields : { } ,
278+ setLevel : vi . fn ( ) ,
279+ setFilteredKeys : vi . fn ( ) ,
280+ setAdditionalFields : vi . fn ( ) ,
281+ child : vi . fn ( ) ,
282+ } ;
283+
284+ const queue = new RunQueue ( {
285+ ...testOptions ,
286+ queueSelectionStrategy : new FairQueueSelectionStrategy ( {
287+ redis : {
288+ keyPrefix : "runqueue:test:" ,
289+ host : redisContainer . getHost ( ) ,
290+ port : redisContainer . getPort ( ) ,
291+ } ,
292+ keys : testOptions . keys ,
293+ } ) ,
294+ redis : {
295+ keyPrefix : "runqueue:test:" ,
296+ host : redisContainer . getHost ( ) ,
297+ port : redisContainer . getPort ( ) ,
298+ } ,
299+ // @ts -expect-error
300+ logger : mockLogger ,
301+ } ) ;
302+
303+ try {
304+ const envMasterQueue = `env:${ authenticatedEnvDev . id } ` ;
305+ const queueCount = 10 ; // Reduced for simplicity
306+
307+ // First, create all queues and enqueue initial messages
308+ for ( let i = 0 ; i < queueCount ; i ++ ) {
309+ const queueName = `${ messageDev . queue } _${ i } ` ;
310+ // Set each queue's concurrency limit to 0 (this guarantees dequeue will fail)
311+ await queue . updateQueueConcurrencyLimits ( authenticatedEnvDev , queueName , 0 ) ;
312+
313+ // Enqueue a message to each queue
314+ await queue . enqueueMessage ( {
315+ env : authenticatedEnvDev ,
316+ message : { ...messageDev , runId : `r${ 4321 + i } ` , queue : queueName } ,
317+ masterQueues : [ "main" , envMasterQueue ] ,
318+ } ) ;
319+ }
320+
321+ // Try to dequeue messages - this should exit early due to concurrency limits
322+ const startTime = Date . now ( ) ;
323+ const dequeued = await queue . dequeueMessageFromMasterQueue (
324+ "test_12345" ,
325+ envMasterQueue ,
326+ queueCount
327+ ) ;
328+ const endTime = Date . now ( ) ;
329+
330+ // Verify no messages were dequeued
331+ expect ( dequeued . length ) . toBe ( 0 ) ;
332+
333+ // Verify the operation completed quickly (under 1000ms)
334+ const duration = endTime - startTime ;
335+ expect ( duration ) . toBeLessThan ( 1000 ) ;
336+
337+ // Verify we only logged one early exit message
338+ expect ( mockLogger . log ) . toHaveBeenCalledWith (
339+ expect . stringContaining ( "No successful dequeues in a full cycle, exiting" )
340+ ) ;
341+ expect ( mockLogger . log . mock . calls . length ) . toBeLessThanOrEqual ( 2 ) ;
342+
343+ // Verify all messages are still in queues
344+ let totalRemaining = 0 ;
345+ for ( let i = 0 ; i < queueCount ; i ++ ) {
346+ const queueName = `${ messageDev . queue } _${ i } ` ;
347+ const length = await queue . lengthOfQueue ( authenticatedEnvDev , queueName ) ;
348+ totalRemaining += length ;
349+ }
350+ expect ( totalRemaining ) . toBe ( queueCount ) ;
351+ } finally {
352+ await queue . quit ( ) ;
353+ }
354+ }
355+ ) ;
267356} ) ;
0 commit comments