Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added TTL and max length to _setup_queue for any private confirm queu… #35

Merged
merged 3 commits into from
Nov 21, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions python/idsse_common/idsse/common/publish_confirm.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class PublishConfirmParams(NamedTuple):
queue: Queue


class PublishConfirm():
class PublishConfirm:
"""This is a publisher that will handle unexpected interactions
with RabbitMQ such as channel and connection closures for any process.
If RabbitMQ closes the connection, it will reopen it. You should
Expand Down Expand Up @@ -142,7 +142,7 @@ def _start(self, callback: Optional[Callable[[], None]] = None):
Args:
callback (Optional[Callable[[], None]]): callback function to be invoked
once instance is ready to publish messages (all RabbitMQ connection and channel
are setup, delivery confirmation is enabled, etc.). Default to None.
are set up, delivery confirmation is enabled, etc.). Default to None.
"""
logger.debug('Starting thread with callback')
if callback is not None:
Expand Down Expand Up @@ -199,7 +199,7 @@ def _wait_for_channel_to_be_ready(self) -> None:
"""If connection or channel are not open, start the PublishConfirm to do needed
RabbitMQ setup. This method will not return until channel is confirmed ready for use"""

# validate that PublishConfirm thread has been setup and connected to RabbitMQ
# validate that PublishConfirm thread has been set up and connected to RabbitMQ
if not (self._connection and self._connection.is_open
and self._channel and self._channel.is_open):
logger.debug('Channel is not ready to publish, calling _start() now')
Expand Down Expand Up @@ -317,8 +317,12 @@ def _setup_queue(self, queue: Queue):
:param str|unicode queue_name: The name of the queue to declare.
"""
logger.debug('Declaring queue %s', queue.name)
args = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment about what it means when a queue name starts with '_' and what the args will do?

if queue.name.startswith('_'):
args = {'x-message-ttl': 10000, 'x-max-length': 1000} # 10 second TTL
self._channel.queue_declare(queue=queue.name,
durable=queue.durable,
arguments=args,
exclusive=queue.exclusive,
auto_delete=queue.auto_delete,
callback=self._on_queue_declareok)
Expand Down Expand Up @@ -390,7 +394,7 @@ def _on_delivery_confirmation(self, method_frame: Method):
ack_multiple = method.multiple
delivery_tag = method.delivery_tag

logger.debug('Received %s for delivery tag: %i (multiple: %s)',
logger.info('Received %s for delivery tag: %i (multiple: %s)',
confirmation_type, delivery_tag, ack_multiple)

if confirmation_type == 'ack':
Expand Down