@@ -574,6 +574,8 @@ impl<H: DurableExecution> Queue<H> {
574574 self : & Arc < Self > ,
575575 batch_size : usize ,
576576 ) -> RedisResult < Vec < BorrowedJob < H :: JobData > > > {
577+ let pop_id = nanoid:: nanoid!( 4 ) ;
578+
577579 // Lua script that does:
578580 // 1. Clean up expired leases (with lease token validation)
579581 // 2. Process pending cancellations
@@ -582,8 +584,9 @@ impl<H: DurableExecution> Queue<H> {
582584 let script = redis:: Script :: new (
583585 r#"
584586 local now = tonumber(ARGV[1])
585- local batch_size = tonumber(ARGV[2])
586- local lease_seconds = tonumber(ARGV[3])
587+ local pop_id = ARGV[2]
588+ local batch_size = tonumber(ARGV[3])
589+ local lease_seconds = tonumber(ARGV[4])
587590
588591 local queue_id = KEYS[1]
589592 local delayed_zset_name = KEYS[2]
@@ -707,7 +710,7 @@ impl<H: DurableExecution> Queue<H> {
707710 local attempts = redis.call('HINCRBY', job_meta_hash_name, 'attempts', 1)
708711
709712 -- Generate unique lease token
710- local lease_token = now .. '_' .. job_id .. '_' .. attempts
713+ local lease_token = now .. '_' .. job_id .. '_' .. attempts .. '_' .. pop_id
711714
712715 -- Create separate lease key with TTL
713716 local lease_key = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':lease:' .. lease_token
@@ -748,6 +751,7 @@ impl<H: DurableExecution> Queue<H> {
748751 . key ( self . failed_list_name ( ) )
749752 . key ( self . success_list_name ( ) )
750753 . arg ( now)
754+ . arg ( pop_id)
751755 . arg ( batch_size)
752756 . arg ( self . options . lease_duration . as_secs ( ) )
753757 . invoke_async ( & mut self . redis . clone ( ) )
0 commit comments