Skip to content

Commit

Permalink
avoid changing the ensure argument, fix retry handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Bogdanp committed Apr 27, 2024
1 parent 91c68d5 commit 4fde2a4
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 26 deletions.
23 changes: 12 additions & 11 deletions dramatiq/brokers/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ class RabbitmqBroker(Broker):
.. _ConnectionParameters: https://pika.readthedocs.io/en/stable/modules/parameters.html
"""

def __init__(self, *, confirm_delivery=False, url=None, middleware=None, max_priority=None, parameters=None,
**kwargs):
def __init__(self, *, confirm_delivery=False, url=None, middleware=None, max_priority=None, parameters=None, **kwargs):
super().__init__(middleware=middleware)

if max_priority is not None and not (0 < max_priority <= 255):
Expand Down Expand Up @@ -218,7 +217,7 @@ def consume(self, queue_name, prefetch=1, timeout=5000):
Returns:
Consumer: A consumer that retrieves messages from RabbitMQ.
"""
self.declare_queue(queue_name, ensure="strict")
self.declare_queue(queue_name, ensure=True)
return self.consumer_class(self.parameters, queue_name, prefetch, timeout)

def declare_queue(self, queue_name, *, ensure=False):
Expand All @@ -227,9 +226,8 @@ def declare_queue(self, queue_name, *, ensure=False):
Parameters:
queue_name(str): The name of the new queue.
ensure(bool|str): When True, the queue is created immediately on
the server. When 'strict', make sure the queue is created on the
server.
ensure(bool): When True, the queue is created on the server,
if necessary.
Raises:
ConnectionClosed: When ensure=True if the underlying channel
Expand All @@ -246,13 +244,13 @@ def declare_queue(self, queue_name, *, ensure=False):
self.emit_after("declare_delay_queue", delayed_name)

if ensure:
self._ensure_queue(queue_name, ensure == "strict")
self._ensure_queue(queue_name)

def _ensure_queue(self, queue_name, strict=False):
def _ensure_queue(self, queue_name):
attempts = 1
while True:
try:
if strict or queue_name in self.queues_pending:
if queue_name in self.queues_pending:
self._declare_queue(queue_name)
self._declare_dq_queue(queue_name)
self._declare_xq_queue(queue_name)
Expand Down Expand Up @@ -347,9 +345,12 @@ def enqueue(self, message, *, delay=None):
# next caller/attempt may initiate new ones of each.
del self.connection

# When a queue has been deleted, attempt to get it
# redeclared by removing it from the known set. The next
# time a message is enqueued -- which could be when we
# retry this block -- it will be redeclared.
if isinstance(e, ChannelClosedByBroker) and e.reply_code == 404:
self.queues_pending.add(queue_name(queue_name))
raise ConnectionClosed(e) from None
self.queues.remove(q_name(queue_name))

attempts += 1
if attempts > MAX_ENQUEUE_ATTEMPTS:
Expand Down
38 changes: 23 additions & 15 deletions tests/test_rabbitmq.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import os
import random
import string
import time
from threading import Event
from unittest.mock import Mock, patch
import string
import random

import pika.exceptions
import pytest

import dramatiq
from dramatiq import Message, QueueJoinTimeout, Worker
from dramatiq import Message, Middleware, QueueJoinTimeout, Worker
from dramatiq.brokers.rabbitmq import RabbitmqBroker, URLRabbitmqBroker, _IgnoreScaryLogs
from dramatiq.common import current_millis

Expand Down Expand Up @@ -52,8 +52,7 @@ def test_rabbitmq_broker_raises_an_error_if_given_invalid_parameter_combinations
# When I try to give it both a connection URL and a list of connection parameters
# Then a RuntimeError should be raised
with pytest.raises(RuntimeError):
RabbitmqBroker(url="amqp://127.0.0.1:5672",
parameters=[dict(host="127.0.0.1", credentials=RABBITMQ_CREDENTIALS)])
RabbitmqBroker(url="amqp://127.0.0.1:5672", parameters=[dict(host="127.0.0.1", credentials=RABBITMQ_CREDENTIALS)])

# When I try to give it both a connection URL and pika connection parameters
# Then a RuntimeError should be raised
Expand Down Expand Up @@ -467,25 +466,34 @@ def do_work():
worker.stop()


def test_rabbitmq_broker_retries_declaring_queues_when_declared_queues_is_gone(rabbitmq_broker):
def test_rabbitmq_broker_retries_declaring_queues_when_declared_queue_disappears(rabbitmq_broker):
executed = False

# I declare an actor
characters = string.ascii_letters + string.digits
suffix = "".join(random.choice(characters) for _ in range(2))
# Given that I have an actor on a flaky queue
flaky_queue_name = "flaky_queue"
rabbitmq_broker.channel.queue_delete(flaky_queue_name)

@dramatiq.actor(queue_name=f"flaky_queue_{suffix}")
@dramatiq.actor(queue_name=flaky_queue_name)
def do_work():
nonlocal executed
executed = True

# Let worker to ensure_queue and consume message
# When I start a server
worker = Worker(rabbitmq_broker, worker_threads=1)
worker.start()

# Check the queue is declared
rabbitmq_broker.channel.queue_declare(do_work.queue_name)
# Let the queue go unexpectedly
declared_ev = Event()

class DeclaredMiddleware(Middleware):
def after_declare_queue(self, broker, queue_name):
if queue_name == flaky_queue_name:
declared_ev.set()

# I expect that queue to be declared
rabbitmq_broker.add_middleware(DeclaredMiddleware())
assert declared_ev.wait(timeout=5)

# If I delete the queue
rabbitmq_broker.channel.queue_delete(do_work.queue_name)
with pytest.raises(pika.exceptions.ChannelClosedByBroker):
rabbitmq_broker.channel.queue_declare(do_work.queue_name, passive=True)
Expand All @@ -498,7 +506,7 @@ def do_work():
finally:
worker.stop()

# Then the queue should eventually be declared and the message executed
# Then the queue should be declared and the message executed
assert executed


Expand Down

0 comments on commit 4fde2a4

Please sign in to comment.