Skip to content

Commit

Permalink
avoid declaring the wrong queue, drop unreachable block
Browse files Browse the repository at this point in the history
  • Loading branch information
Bogdanp committed Apr 28, 2024
1 parent 4fde2a4 commit 7391a36
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 12 deletions.
13 changes: 3 additions & 10 deletions dramatiq/brokers/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from threading import Event, local

import pika
from pika.exceptions import ChannelClosedByBroker

from ..broker import Broker, Consumer, MessageProxy
from ..common import current_millis, dq_name, q_name, xq_name
Expand Down Expand Up @@ -309,7 +308,8 @@ def enqueue(self, message, *, delay=None):
ConnectionClosed: If the underlying channel or connection
has been closed.
"""
queue_name = message.queue_name
canonical_queue_name = message.queue_name
queue_name = canonical_queue_name

if delay is not None:
queue_name = dq_name(queue_name)
Expand All @@ -324,7 +324,7 @@ def enqueue(self, message, *, delay=None):
attempts = 1
while True:
try:
self.declare_queue(queue_name, ensure=True)
self.declare_queue(canonical_queue_name, ensure=True)
self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name)
self.emit_before("enqueue", message, delay)
self.channel.basic_publish(
Expand All @@ -345,13 +345,6 @@ def enqueue(self, message, *, delay=None):
# next caller/attempt may initiate new ones of each.
del self.connection

# When a queue has been deleted, attempt to get it
# redeclared by removing it from the known set. The next
# time a message is enqueued -- which could be when we
# retry this block -- it will be redeclared.
if isinstance(e, ChannelClosedByBroker) and e.reply_code == 404:
self.queues.remove(q_name(queue_name))

attempts += 1
if attempts > MAX_ENQUEUE_ATTEMPTS:
raise ConnectionClosed(e) from None
Expand Down
2 changes: 0 additions & 2 deletions tests/test_rabbitmq.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import os
import random
import string
import time
from threading import Event
from unittest.mock import Mock, patch
Expand Down

0 comments on commit 7391a36

Please sign in to comment.