Skip to content

Commit

Permalink
fix: redis requeue concurrency bug #1800 (#1805)
Browse files Browse the repository at this point in the history
* fix: redis requeue concurrency bug  #1800

* fix: add unit test

* fix: update
  • Loading branch information
jiangxianfu authored Oct 30, 2023
1 parent 7187021 commit 3884eb9
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
4 changes: 3 additions & 1 deletion kombu/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,9 @@ def ack(self, delivery_tag):
def reject(self, delivery_tag, requeue=False):
if requeue:
self.restore_by_tag(delivery_tag, leftmost=True)
self.ack(delivery_tag)
else:
self._remove_from_indices(delivery_tag).execute()
super().ack(delivery_tag)

@contextmanager
def pipe_or_acquire(self, pipe=None, client=None):
Expand Down
11 changes: 9 additions & 2 deletions t/unit/transport/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1521,9 +1521,16 @@ def test_get_no_actions(self):
def test_qos_reject(self):
p, channel = self.create_get()
qos = redis.QoS(channel)
qos.ack = Mock(name='Qos.ack')
qos._remove_from_indices = Mock(name='_remove_from_indices')
qos.reject(1234)
qos.ack.assert_called_with(1234)
qos._remove_from_indices.assert_called_with(1234)

def test_qos_requeue(self):
p, channel = self.create_get()
qos = redis.QoS(channel)
qos.restore_by_tag = Mock(name='restore_by_tag')
qos.reject(1234, True)
qos.restore_by_tag.assert_called_with(1234, leftmost=True)

def test_get_brpop_qos_allow(self):
p, channel = self.create_get(queues=['a_queue'])
Expand Down

0 comments on commit 3884eb9

Please sign in to comment.