Skip to content

Commit

Permalink
RabbitListener support threading
Browse files Browse the repository at this point in the history
  • Loading branch information
mic1on committed Aug 14, 2024
1 parent f36f0e2 commit 253fe28
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 48 deletions.
26 changes: 0 additions & 26 deletions example/demo.py

This file was deleted.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "use-rabbitmq"
version = "0.2.1"
version = "0.2.2"
description = ""
authors = ["miclon <jcnd@163.com>"]
readme = "README.md"
Expand Down
62 changes: 44 additions & 18 deletions src/use_rabbitmq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
import logging
import os
import threading
import time
from typing import Callable, Optional, Union
from typing import Callable, Optional, Union, Any, Dict

import amqpstorm
from amqpstorm import Message
from amqpstorm.exception import AMQPConnectionError, AMQPChannelError

logger = logging.getLogger(__name__)


class RabbitMQStore:
"""
RabbitMQ消息队列存储和消费类。
该类提供了与RabbitMQ交互的各种方法,包括连接、声明队列、发送消息、获取消息数量和消费消息等。
它还包含了重试机制和异常处理,以确保连接的可靠性和消息的正确传递。
"""

MAX_SEND_ATTEMPTS: int = 6 # 最大发送重试次数
MAX_CONNECTION_ATTEMPTS: float = float("inf") # 最大连接重试次数
MAX_CONNECTION_DELAY: int = 2 ** 5 # 最大延迟时间
Expand All @@ -33,19 +43,19 @@ def __init__(
:param kwargs: RabbitMQ parameters
"""
self.__shutdown = False
self.parameters = {
"hostname": host or "localhost",
"port": port or 5672,
"username": username or "guest",
"password": password or "guest",
self.parameters: Dict[str, Any] = {
"hostname": host or os.environ.get("RABBITMQ_HOST", "localhost"),
"port": port or int(os.environ.get("RABBITMQ_PORT", 5672)),
"username": username or os.environ.get("RABBITMQ_USER", "guest"),
"password": password or os.environ.get("RABBITMQ_PASSWORD", "guest"),
}
if kwargs:
self.parameters.update(kwargs)
self.confirm_delivery = confirm_delivery
self._connection = None
self._channel = None
self._connection: Optional[amqpstorm.Connection] = None
self._channel: Optional[amqpstorm.Channel] = None

def _create_connection(self):
def _create_connection(self) -> amqpstorm.Connection:
attempts = 1
reconnection_delay = self.RECONNECTION_DELAY
while attempts <= self.MAX_CONNECTION_ATTEMPTS:
Expand Down Expand Up @@ -76,7 +86,7 @@ def connection(self) -> amqpstorm.Connection:
return self._connection

@connection.deleter
def connection(self):
def connection(self) -> None:
del self.channel
if self._connection:
if self._connection.is_open:
Expand Down Expand Up @@ -168,9 +178,10 @@ def start_consuming(
)
self.channel.start_consuming(to_tuple=False)
except AMQPChannelError as exc:
logger.error(f"RabbitmqStore channel error: {exc}")
raise exc
except AMQPConnectionError as exc:
logger.warning(
logger.error(
f"RabbitmqStore consume connection error<{exc}> reconnecting..."
)
del self.connection
Expand All @@ -197,9 +208,15 @@ def __del__(self):
def listener(self, queue_name: str, no_ack: bool = False, **kwargs):
self.declare_queue(queue_name)

def wrapper(callback):
def wrapper(callback: Callable[[Message], Any]):
logger.info(f"RabbitMQStore consume {queue_name}")
self.start_consuming(queue_name, callback, no_ack=no_ack, **kwargs)

def target():
self.start_consuming(queue_name, callback, no_ack=no_ack, **kwargs)

thread = threading.Thread(target=target)
thread.start()
return thread

return wrapper

Expand All @@ -208,10 +225,19 @@ def stop_listener(self, queue_name: str):
self.shutdown()


useRabbitMQ = RabbitMQStore
class RabbitListener:
def __init__(self, instance: RabbitMQStore, *, queue_name: str, no_ack: bool = False, **kwargs):
self.instance = instance
self.queue_name = queue_name
self.no_ack = no_ack
self.kwargs = kwargs

def __call__(self, callback: Callable[[amqpstorm.Message], None]):
listener = self.instance.listener(self.queue_name, self.no_ack, **self.kwargs)
return listener(callback)


def useRabbitListener(
instance: RabbitMQStore, *, queue_name: str, no_ack: bool = False, **kwargs
):
return instance.listener(queue_name, no_ack=no_ack, **kwargs)
# alias

useRabbitMQ = RabbitMQStore
useRabbitListener = RabbitListener
10 changes: 7 additions & 3 deletions tests/test_rabbitmq.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import os

import pytest

from use_rabbitmq import useRabbitMQ
from use_rabbitmq import useRabbitMQ, RabbitListener

os.environ.setdefault("RABBITMQ_PASSWORD", "admin")


@pytest.fixture
def rabbitmq():
return useRabbitMQ(host="localhost", port=5672, username="admin", password="admin")
return useRabbitMQ(host="localhost", port=5672, username="admin")


def test_rabbitmq_connection(rabbitmq):
Expand Down Expand Up @@ -60,7 +64,7 @@ def test_useRabbitListener(rabbitmq):
queue_name = "test_queue"
assert rabbitmq.send(queue_name=queue_name, message="789") == "789"

@rabbitmq.listener(queue_name=queue_name)
@RabbitListener(rabbitmq, queue_name=queue_name)
def callback(message):
assert message.body == "789"
rabbitmq.stop_listener(queue_name)

0 comments on commit 253fe28

Please sign in to comment.