Skip to content

Commit

Permalink
Merge pull request #81 from epam/develop
Browse files Browse the repository at this point in the history
Feature/update pika to 132 v t6199 (#76)
  • Loading branch information
bohdan-onsha authored Sep 23, 2024
2 parents 6b3334a + 0900658 commit 9df6a15
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 78 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [6.1.0] - 2024-09-13
- Update `pika` library from version `1.0.0b1` to `1.3.2`
- Update `RabbitMqConnection` class to support `pika` version `1.3.2`
- Replace `uuid1` with `uuid4`
- Up lib version to `pynamodb==5.5.1` and `dynamodb-json~=1.4.2`

## [6.0.1b2] - 2024-09-12
- Dump lib version to `pynamodb==5.3.2` and `dynamodb-json~=1.3`

Expand Down
186 changes: 116 additions & 70 deletions modular_sdk/connections/rabbit_connection.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pika
import pika.exceptions

from modular_sdk.commons import ModularException
from modular_sdk.commons.log_helper import get_logger
Expand All @@ -8,8 +9,7 @@


class RabbitMqConnection:
def __init__(self, connection_url,
timeout):
def __init__(self, connection_url: str, timeout: int):
self.connection_url = connection_url
self.timeout = timeout or RABBIT_DEFAULT_RESPONSE_TIMEOUT
self.responses = {}
Expand All @@ -20,63 +20,108 @@ def _open_channel(self):
self.conn = pika.BlockingConnection(parameters)
return self.conn.channel()
except pika.exceptions.AMQPConnectionError as e:
_LOG.error('Connection to RabbitMQ refused. Bad credentials.')
error_msg = str(e) or "Bad credentials"
_LOG.error(f'Connection to RabbitMQ refused: {error_msg}')
raise ModularException(
code=502,
content='Connection to RabbitMQ refused. Bad credentials.'
code=502, content=f'Connection to RabbitMQ refused: {error_msg}'
)

def _close(self):
if self.conn.is_open:
self.conn.close()

def publish(self, message, routing_key, exchange='', headers=None,
content_type=None):
try:
if self.conn.is_open:
self.conn.close()
except Exception as e:
_LOG.error(f"Failed to close RabbitMQ connection: {e}")

def publish(
self,
message: str,
routing_key: str,
exchange: str = '',
headers: dict = None,
content_type: str = None,
) -> None:
_LOG.debug(f'Request queue: {routing_key}')
channel = self._open_channel()
channel.confirm_delivery()
response = channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
properties=pika.BasicProperties(headers=headers,
content_type=content_type),
body=message,
mandatory=True)
self._close()
if not response:
_LOG.error(f'Message was not sent: routing_key={routing_key}, '
f'exchange={exchange}, content_type={content_type}')
raise ModularException(
code=504,
content='Message was not sent. Check RabbitMQ configuration'
)
_LOG.info('Message pushed')

def publish_sync(self, message, routing_key, correlation_id,
callback_queue, exchange='', headers=None,
content_type=None):
_LOG.debug(f'Request queue: {routing_key}; '
f'Response queue: {callback_queue}')
try:
channel.confirm_delivery()
if not self.__basic_publish(
channel=channel,
exchange=exchange,
routing_key=routing_key,
properties=pika.BasicProperties(
headers=headers, content_type=content_type,
),
body=message,
mandatory=True,
):
_LOG.error(
f'Message was not sent: routing_key={routing_key}, '
f'exchange={exchange}, content_type={content_type}'
)
raise ModularException(
code=504,
content='Message was not sent. Check RabbitMQ configuration'
)
_LOG.info('Message pushed')
finally:
self._close()

@staticmethod
def __basic_publish(
channel: pika.adapters.blocking_connection.BlockingChannel,
**kwargs,
) -> bool:
try:
channel.basic_publish(**kwargs)
return True
except (pika.exceptions.NackError, pika.exceptions.UnroutableError):
_LOG.exception('Pika exception occurred')
return False

def publish_sync(
self,
message: str,
routing_key: str,
correlation_id: str,
callback_queue: str,
exchange: str = '',
headers: dict = None,
content_type: str = None,
) -> None:
_LOG.debug(
f'Request queue: {routing_key}; Response queue: {callback_queue}'
)
channel = self._open_channel()
channel.confirm_delivery()
response = channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
properties=pika.BasicProperties(headers=headers,
reply_to=callback_queue,
correlation_id=correlation_id,
content_type=content_type),
body=message)
if not response:
_LOG.error(f'Message was not sent: routing_key={routing_key}, '
f'correlation_id={correlation_id}, '
f'callback_queue={callback_queue}, '
f'exchange={exchange}, content_type={content_type}')
raise ModularException(
code=504,
content='Message was not sent. Check RabbitMQ configuration'
try:
channel.confirm_delivery()
properties = pika.BasicProperties(
headers=headers,
reply_to=callback_queue,
correlation_id=correlation_id,
content_type=content_type,
)
_LOG.info('Message pushed')
if not self.__basic_publish(
channel=channel,
exchange=exchange,
routing_key=routing_key,
properties=properties,
body=message,
):
error_msg = (
f"Message was not sent: routing_key={routing_key}, "
f"correlation_id={correlation_id}, "
f"callback_queue={callback_queue}, "
f"exchange={exchange}, content_type={content_type}"
)
_LOG.error(error_msg)
raise ModularException(
code=504,
content='Message was not sent. Check RabbitMQ configuration'
)
_LOG.info('Message pushed')
finally:
self._close()

def consume_sync(self, queue: str, correlation_id: str) -> bytes | None:

Expand All @@ -102,36 +147,37 @@ def _consumer_callback(
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

def _close_on_timeout():
_LOG.warn('Timeout exceeded. Close connection')
self.conn.close()
_LOG.warning('Timeout exceeded. Close connection')
self._close()

channel = self._open_channel()
if channel.basic_consume(queue=queue,
on_message_callback=_consumer_callback,
consumer_tag=correlation_id):
_LOG.debug('Waiting for message. Queue: {0}, Correlation id: {1}'
.format(queue, correlation_id))
if channel.basic_consume(
queue=queue,
on_message_callback=_consumer_callback,
consumer_tag=correlation_id,
):
_LOG.debug(
f'Waiting for message. Queue: {queue}, '
f'Correlation id: {correlation_id}'
)
else:
_LOG.error('Failed to consume. Queue: {0}'.format(queue))
_LOG.error(f"Failed to consume message from queue '{queue}'")
return None

self.conn.add_timeout(self.timeout, _close_on_timeout)
self.conn.call_later(self.timeout, _close_on_timeout)

# blocking method
channel.start_consuming()
self._close()

if correlation_id in list(self.responses.keys()):
response = self.responses.pop(correlation_id)
_LOG.debug('Response received')
response = self.responses.pop(correlation_id, None)
if response:
_LOG.debug('Response successfully received and processed')
return response
else:
_LOG.error('Response was not received. '
'Timeout: {0} seconds. '
.format(self.timeout))
return None
_LOG.error(f"Response wasn't received. Timeout: {self.timeout} seconds")
return None

def check_queue_exists(self, queue_name):
def check_queue_exists(self, queue_name: str) -> bool:
channel = self._open_channel()
try:
channel.queue_declare(queue=queue_name, durable=True, passive=True)
Expand All @@ -141,7 +187,7 @@ def check_queue_exists(self, queue_name):
self._close()
return True

def declare_queue(self, queue_name):
def declare_queue(self, queue_name: str) -> None:
channel = self._open_channel()
declare_resp = channel.queue_declare(queue=queue_name, durable=True)
_LOG.info('Queue declaration response: {0}'.format(declare_resp))
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def post_process_request(self, response):

@staticmethod
def _generate_id():
return str(uuid.uuid1())
return str(uuid.uuid4())

@staticmethod
def _decrypt(secret_key, data):
Expand Down
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "modular_sdk"
version = "6.0.1b2"
version = "6.1.0"
authors = [
{name = "EPAM Systems", email = "support@syndicate.team"}
]
Expand All @@ -19,14 +19,14 @@ classifiers = [
"Operating System :: OS Independent"
]
dependencies = [
"pika==1.0.0b1",
"pynamodb~=5.3.2",
"pika==1.3.2",
"pynamodb>=5.5.1,<6",
"boto3>=1.26.80,<1.35",
"botocore>=1.29.80,<1.35",
"pymongo~=4.5.0",
"python-dateutil>=2.8.2,<3.0",
"cachetools~=5.4.0",
"dynamodb-json~=1.3",
"dynamodb-json~=1.4.2",
"aws-xray-sdk~=2.14.0"
]

Expand Down
6 changes: 3 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
pika==1.0.0b1
pynamodb==5.3.2
pika==1.3.2
pynamodb>=5.5.1,<6
boto3>=1.26.80,<1.35
botocore>=1.29.80,<1.35
pymongo~=4.5.0
python-dateutil>=2.8.2,<3.0
cachetools~=5.4.0
dynamodb-json~=1.3
dynamodb-json~=1.4.2
aws-xray-sdk~=2.14.0
cryptography~=41.0.7

0 comments on commit 9df6a15

Please sign in to comment.