From 3884eb9dd62bf3ee2d47dacc5f7a764936b16b54 Mon Sep 17 00:00:00 2001 From: jiangxianfu Date: Mon, 30 Oct 2023 21:34:31 +0800 Subject: [PATCH] fix: redis requeue concurrency bug #1800 (#1805) * fix: redis requeue concurrency bug #1800 * fix: add unit test * fix: update --- kombu/transport/redis.py | 4 +++- t/unit/transport/test_redis.py | 11 +++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/kombu/transport/redis.py b/kombu/transport/redis.py index 2d932e972..abbe92727 100644 --- a/kombu/transport/redis.py +++ b/kombu/transport/redis.py @@ -382,7 +382,9 @@ def ack(self, delivery_tag): def reject(self, delivery_tag, requeue=False): if requeue: self.restore_by_tag(delivery_tag, leftmost=True) - self.ack(delivery_tag) + else: + self._remove_from_indices(delivery_tag).execute() + super().ack(delivery_tag) @contextmanager def pipe_or_acquire(self, pipe=None, client=None): diff --git a/t/unit/transport/test_redis.py b/t/unit/transport/test_redis.py index a5fa5ea04..ecbed7eb0 100644 --- a/t/unit/transport/test_redis.py +++ b/t/unit/transport/test_redis.py @@ -1521,9 +1521,16 @@ def test_get_no_actions(self): def test_qos_reject(self): p, channel = self.create_get() qos = redis.QoS(channel) - qos.ack = Mock(name='Qos.ack') + qos._remove_from_indices = Mock(name='_remove_from_indices') qos.reject(1234) - qos.ack.assert_called_with(1234) + qos._remove_from_indices.assert_called_with(1234) + + def test_qos_requeue(self): + p, channel = self.create_get() + qos = redis.QoS(channel) + qos.restore_by_tag = Mock(name='restore_by_tag') + qos.reject(1234, True) + qos.restore_by_tag.assert_called_with(1234, leftmost=True) def test_get_brpop_qos_allow(self): p, channel = self.create_get(queues=['a_queue'])