@@ -65,6 +65,7 @@ export type MarQSOptions = {
6565 queuePriorityStrategy : MarQSQueuePriorityStrategy ;
6666 envQueuePriorityStrategy : MarQSQueuePriorityStrategy ;
6767 visibilityTimeoutStrategy : VisibilityTimeoutStrategy ;
68+ maximumNackCount : number ;
6869 enableRebalancing ?: boolean ;
6970 verbose ?: boolean ;
7071 subscriber ?: MessageQueueSubscriber ;
@@ -299,7 +300,6 @@ export class MarQS {
299300 const messageData = await this . #callDequeueMessage( {
300301 messageQueue,
301302 parentQueue,
302- visibilityQueue : constants . MESSAGE_VISIBILITY_TIMEOUT_QUEUE ,
303303 concurrencyLimitKey : this . keys . concurrencyLimitKeyFromQueue ( messageQueue ) ,
304304 currentConcurrencyKey : this . keys . currentConcurrencyKeyFromQueue ( messageQueue ) ,
305305 envConcurrencyLimitKey : this . keys . envConcurrencyLimitKeyFromQueue ( messageQueue ) ,
@@ -335,7 +335,6 @@ export class MarQS {
335335 parentQueue,
336336 messageKey : this . keys . messageKey ( messageData . messageId ) ,
337337 messageQueue : messageQueue ,
338- visibilityQueue : constants . MESSAGE_VISIBILITY_TIMEOUT_QUEUE ,
339338 concurrencyKey : this . keys . currentConcurrencyKeyFromQueue ( messageQueue ) ,
340339 envConcurrencyKey : this . keys . envCurrentConcurrencyKeyFromQueue ( messageQueue ) ,
341340 orgConcurrencyKey : this . keys . orgCurrentConcurrencyKeyFromQueue ( messageQueue ) ,
@@ -409,7 +408,6 @@ export class MarQS {
409408 const messageData = await this . #callDequeueMessage( {
410409 messageQueue,
411410 parentQueue,
412- visibilityQueue : constants . MESSAGE_VISIBILITY_TIMEOUT_QUEUE ,
413411 concurrencyLimitKey : this . keys . concurrencyLimitKeyFromQueue ( messageQueue ) ,
414412 currentConcurrencyKey : this . keys . currentConcurrencyKeyFromQueue ( messageQueue ) ,
415413 envConcurrencyLimitKey : this . keys . envConcurrencyLimitKeyFromQueue ( messageQueue ) ,
@@ -496,7 +494,6 @@ export class MarQS {
496494 parentQueue : message . parentQueue ,
497495 messageKey : this . keys . messageKey ( messageId ) ,
498496 messageQueue : message . queue ,
499- visibilityQueue : constants . MESSAGE_VISIBILITY_TIMEOUT_QUEUE ,
500497 concurrencyKey : this . keys . currentConcurrencyKeyFromQueue ( message . queue ) ,
501498 envConcurrencyKey : this . keys . envCurrentConcurrencyKeyFromQueue ( message . queue ) ,
502499 orgConcurrencyKey : this . keys . orgCurrentConcurrencyKeyFromQueue ( message . queue ) ,
@@ -565,7 +562,6 @@ export class MarQS {
565562 parentQueue : oldMessage . parentQueue ,
566563 messageKey : this . keys . messageKey ( messageId ) ,
567564 messageQueue : oldMessage . queue ,
568- visibilityQueue : constants . MESSAGE_VISIBILITY_TIMEOUT_QUEUE ,
569565 concurrencyKey : this . keys . currentConcurrencyKeyFromQueue ( oldMessage . queue ) ,
570566 envConcurrencyKey : this . keys . envCurrentConcurrencyKeyFromQueue ( oldMessage . queue ) ,
571567 orgConcurrencyKey : this . keys . orgCurrentConcurrencyKeyFromQueue ( oldMessage . queue ) ,
@@ -717,7 +713,7 @@ export class MarQS {
717713 const message = await this . readMessage ( messageId ) ;
718714
719715 if ( ! message ) {
720- logger . log ( `[${ this . name } ].nackMessage() message not found` , {
716+ logger . debug ( `[${ this . name } ].nackMessage() message not found` , {
721717 messageId,
722718 retryAt,
723719 updates,
@@ -726,6 +722,25 @@ export class MarQS {
726722 return ;
727723 }
728724
725+ const nackCount = await this . #getNackCount( messageId ) ;
726+
727+ span . setAttribute ( "nack_count" , nackCount ) ;
728+
729+ if ( nackCount >= this . options . maximumNackCount ) {
730+ logger . debug ( `[${ this . name } ].nackMessage() maximum nack count reached` , {
731+ messageId,
732+ retryAt,
733+ updates,
734+ service : this . name ,
735+ } ) ;
736+
737+ span . setAttribute ( "maximum_nack_count_reached" , true ) ;
738+
739+ // If we have reached the maximum nack count, we will ack the message
740+ await this . acknowledgeMessage ( messageId , "maximum nack count reached" ) ;
741+ return ;
742+ }
743+
729744 span . setAttributes ( {
730745 [ SemanticAttributes . QUEUE ] : message . queue ,
731746 [ SemanticAttributes . MESSAGE_ID ] : message . messageId ,
@@ -746,7 +761,7 @@ export class MarQS {
746761 concurrencyKey : this . keys . currentConcurrencyKeyFromQueue ( message . queue ) ,
747762 envConcurrencyKey : this . keys . envCurrentConcurrencyKeyFromQueue ( message . queue ) ,
748763 orgConcurrencyKey : this . keys . orgCurrentConcurrencyKeyFromQueue ( message . queue ) ,
749- visibilityQueue : constants . MESSAGE_VISIBILITY_TIMEOUT_QUEUE ,
764+ nackCounterKey : this . keys . nackCounterKey ( messageId ) ,
750765 messageId,
751766 messageScore : retryAt ,
752767 } ) ;
@@ -764,6 +779,12 @@ export class MarQS {
764779 ) ;
765780 }
766781
782+ async #getNackCount( messageId : string ) : Promise < number > {
783+ const result = await this . redis . get ( this . keys . nackCounterKey ( messageId ) ) ;
784+
785+ return result ? Number ( result ) : 0 ;
786+ }
787+
767788 // This should increment by the number of seconds, but with a max value of Date.now() + visibilityTimeoutInMs
768789 public async heartbeatMessage ( messageId : string ) {
769790 await this . options . visibilityTimeoutStrategy . heartbeat ( messageId , this . visibilityTimeoutInMs ) ;
@@ -1234,7 +1255,6 @@ export class MarQS {
12341255 async #callDequeueMessage( {
12351256 messageQueue,
12361257 parentQueue,
1237- visibilityQueue,
12381258 concurrencyLimitKey,
12391259 envConcurrencyLimitKey,
12401260 orgConcurrencyLimitKey,
@@ -1244,7 +1264,6 @@ export class MarQS {
12441264 } : {
12451265 messageQueue : string ;
12461266 parentQueue : string ;
1247- visibilityQueue : string ;
12481267 concurrencyLimitKey : string ;
12491268 envConcurrencyLimitKey : string ;
12501269 orgConcurrencyLimitKey : string ;
@@ -1303,7 +1322,6 @@ export class MarQS {
13031322 parentQueue,
13041323 messageKey,
13051324 messageQueue,
1306- visibilityQueue,
13071325 concurrencyKey,
13081326 envConcurrencyKey,
13091327 orgConcurrencyKey,
@@ -1312,7 +1330,6 @@ export class MarQS {
13121330 parentQueue : string ;
13131331 messageKey : string ;
13141332 messageQueue : string ;
1315- visibilityQueue : string ;
13161333 concurrencyKey : string ;
13171334 envConcurrencyKey : string ;
13181335 orgConcurrencyKey : string ;
@@ -1321,7 +1338,6 @@ export class MarQS {
13211338 logger . debug ( "Calling acknowledgeMessage" , {
13221339 messageKey,
13231340 messageQueue,
1324- visibilityQueue,
13251341 concurrencyKey,
13261342 envConcurrencyKey,
13271343 orgConcurrencyKey,
@@ -1334,7 +1350,6 @@ export class MarQS {
13341350 parentQueue ,
13351351 messageKey ,
13361352 messageQueue ,
1337- visibilityQueue ,
13381353 concurrencyKey ,
13391354 envConcurrencyKey ,
13401355 orgConcurrencyKey ,
@@ -1351,7 +1366,7 @@ export class MarQS {
13511366 concurrencyKey,
13521367 envConcurrencyKey,
13531368 orgConcurrencyKey,
1354- visibilityQueue ,
1369+ nackCounterKey ,
13551370 messageId,
13561371 messageScore,
13571372 } : {
@@ -1361,7 +1376,7 @@ export class MarQS {
13611376 concurrencyKey : string ;
13621377 envConcurrencyKey : string ;
13631378 orgConcurrencyKey : string ;
1364- visibilityQueue : string ;
1379+ nackCounterKey : string ;
13651380 messageId : string ;
13661381 messageScore : number ;
13671382 } ) {
@@ -1372,7 +1387,7 @@ export class MarQS {
13721387 concurrencyKey,
13731388 envConcurrencyKey,
13741389 orgConcurrencyKey,
1375- visibilityQueue ,
1390+ nackCounterKey ,
13761391 messageId,
13771392 messageScore,
13781393 service : this . name ,
@@ -1385,8 +1400,8 @@ export class MarQS {
13851400 concurrencyKey ,
13861401 envConcurrencyKey ,
13871402 orgConcurrencyKey ,
1388- visibilityQueue ,
13891403 this . keys . envQueueKeyFromQueue ( messageQueue ) ,
1404+ nackCounterKey ,
13901405 messageQueue ,
13911406 messageId ,
13921407 String ( Date . now ( ) ) ,
@@ -1604,17 +1619,16 @@ redis.call('SET', messageKey, messageData, 'GET')
16041619 } ) ;
16051620
16061621 this . redis . defineCommand ( "acknowledgeMessage" , {
1607- numberOfKeys : 8 ,
1622+ numberOfKeys : 7 ,
16081623 lua : `
1609- -- Keys: parentQueue, messageKey, messageQueue, visibilityQueue, concurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey
1624+ -- Keys: parentQueue, messageKey, messageQueue, concurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey
16101625local parentQueue = KEYS[1]
16111626local messageKey = KEYS[2]
16121627local messageQueue = KEYS[3]
1613- local visibilityQueue = KEYS[4]
1614- local concurrencyKey = KEYS[5]
1615- local envCurrentConcurrencyKey = KEYS[6]
1616- local orgCurrentConcurrencyKey = KEYS[7]
1617- local envQueueKey = KEYS[8]
1628+ local concurrencyKey = KEYS[4]
1629+ local envCurrentConcurrencyKey = KEYS[5]
1630+ local orgCurrentConcurrencyKey = KEYS[6]
1631+ local envQueueKey = KEYS[7]
16181632
16191633-- Args: messageId, messageQueueName
16201634local messageId = ARGV[1]
@@ -1637,9 +1651,6 @@ else
16371651 redis.call('ZADD', parentQueue, earliestMessage[2], messageQueueName)
16381652end
16391653
1640- -- Remove the message from the timeout queue (deprecated, will eventually remove this)
1641- redis.call('ZREM', visibilityQueue, messageId)
1642-
16431654-- Update the concurrency keys
16441655redis.call('SREM', concurrencyKey, messageId)
16451656redis.call('SREM', envCurrentConcurrencyKey, messageId)
@@ -1650,15 +1661,14 @@ redis.call('SREM', orgCurrentConcurrencyKey, messageId)
16501661 this . redis . defineCommand ( "nackMessage" , {
16511662 numberOfKeys : 8 ,
16521663 lua : `
1653- -- Keys: childQueueKey, parentQueueKey, visibilityQueue, concurrencyKey, envConcurrencyKey, orgConcurrencyKey, messageId
16541664local messageKey = KEYS[1]
16551665local childQueueKey = KEYS[2]
16561666local parentQueueKey = KEYS[3]
16571667local concurrencyKey = KEYS[4]
16581668local envConcurrencyKey = KEYS[5]
16591669local orgConcurrencyKey = KEYS[6]
1660- local visibilityQueue = KEYS[7]
1661- local envQueueKey = KEYS[8]
1670+ local envQueueKey = KEYS[7]
1671+ local nackCounterKey = KEYS[8]
16621672
16631673-- Args: childQueueName, messageId, currentTime, messageScore
16641674local childQueueName = ARGV[1]
@@ -1671,20 +1681,16 @@ redis.call('SREM', concurrencyKey, messageId)
16711681redis.call('SREM', envConcurrencyKey, messageId)
16721682redis.call('SREM', orgConcurrencyKey, messageId)
16731683
1674- -- Check to see if the message is still in the visibilityQueue
1675- local messageVisibility = tonumber(redis.call('ZSCORE', visibilityQueue, messageId)) or 0
1676-
1677- if messageVisibility > 0 then
1678- -- Remove the message from the timeout queue (deprecated, will eventually remove this)
1679- redis.call('ZREM', visibilityQueue, messageId)
1680- end
1681-
16821684-- Enqueue the message into the queue
16831685redis.call('ZADD', childQueueKey, messageScore, messageId)
16841686
16851687-- Enqueue the message into the env queue
16861688redis.call('ZADD', envQueueKey, messageScore, messageId)
16871689
1690+ -- Increment the nack counter with an expiry of 30 days
1691+ redis.call('INCR', nackCounterKey)
1692+ redis.call('EXPIRE', nackCounterKey, 2592000)
1693+
16881694-- Rebalance the parent queue
16891695local earliestMessage = redis.call('ZRANGE', childQueueKey, 0, 0, 'WITHSCORES')
16901696if #earliestMessage == 0 then
@@ -1713,36 +1719,6 @@ redis.call('SREM', orgCurrentConcurrencyKey, messageId)
17131719` ,
17141720 } ) ;
17151721
1716- this . redis . defineCommand ( "heartbeatMessage" , {
1717- numberOfKeys : 1 ,
1718- lua : `
1719- -- Keys: visibilityQueue
1720- local visibilityQueue = KEYS[1]
1721-
1722- -- Args: messageId, milliseconds, maxVisibilityTimeout
1723- local messageId = ARGV[1]
1724- local milliseconds = tonumber(ARGV[2])
1725- local maxVisibilityTimeout = tonumber(ARGV[3])
1726-
1727- -- Get the current visibility timeout
1728- local zscoreResult = redis.call('ZSCORE', visibilityQueue, messageId)
1729-
1730- -- If there's no currentVisibilityTimeout, return and do not execute ZADD
1731- if zscoreResult == false then
1732- return
1733- end
1734-
1735- local currentVisibilityTimeout = tonumber(zscoreResult)
1736-
1737-
1738- -- Calculate the new visibility timeout
1739- local newVisibilityTimeout = math.min(currentVisibilityTimeout + milliseconds * 1000, maxVisibilityTimeout)
1740-
1741- -- Update the visibility timeout
1742- redis.call('ZADD', visibilityQueue, newVisibilityTimeout, messageId)
1743- ` ,
1744- } ) ;
1745-
17461722 this . redis . defineCommand ( "calculateQueueCurrentConcurrencies" , {
17471723 numberOfKeys : 3 ,
17481724 lua : `
@@ -1854,7 +1830,6 @@ declare module "ioredis" {
18541830 parentQueue : string ,
18551831 messageKey : string ,
18561832 messageQueue : string ,
1857- visibilityQueue : string ,
18581833 concurrencyKey : string ,
18591834 envConcurrencyKey : string ,
18601835 orgConcurrencyKey : string ,
@@ -1871,8 +1846,8 @@ declare module "ioredis" {
18711846 concurrencyKey : string ,
18721847 envConcurrencyKey : string ,
18731848 orgConcurrencyKey : string ,
1874- visibilityQueue : string ,
18751849 envQueueKey : string ,
1850+ nackCounterKey : string ,
18761851 childQueueName : string ,
18771852 messageId : string ,
18781853 currentTime : string ,
@@ -1888,14 +1863,6 @@ declare module "ioredis" {
18881863 callback ?: Callback < void >
18891864 ) : Result < void , Context > ;
18901865
1891- heartbeatMessage (
1892- visibilityQueue : string ,
1893- messageId : string ,
1894- milliseconds : string ,
1895- maxVisibilityTimeout : string ,
1896- callback ?: Callback < void >
1897- ) : Result < void , Context > ;
1898-
18991866 updateGlobalConcurrencyLimits (
19001867 envConcurrencyLimitKey : string ,
19011868 orgConcurrencyLimitKey : string ,
@@ -1953,6 +1920,7 @@ function getMarQSClient() {
19531920 defaultOrgConcurrency : env . DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT ,
19541921 visibilityTimeoutInMs : env . MARQS_VISIBILITY_TIMEOUT_MS ,
19551922 enableRebalancing : ! env . MARQS_DISABLE_REBALANCING ,
1923+ maximumNackCount : env . MARQS_MAXIMUM_NACK_COUNT ,
19561924 subscriber : concurrencyTracker ,
19571925 } ) ;
19581926 } else {
0 commit comments