Skip to content

Commit

Permalink
Merge pull request #78 from epam/develop
Browse files Browse the repository at this point in the history
6.0.1b1
  • Loading branch information
bohdan-onsha authored Sep 11, 2024
2 parents 9337b80 + 7a75915 commit ea80292
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 7 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [6.0.1b1] - 2024-09-11
- fix rabbitmq bug when a wrong message was consumed aster pushing

## [6.0.0] - 2024-08-20
- Split `SIEM_DEFECT_DOJO` Parent type into:
- CUSTODIAN_SIEM_DEFECT_DOJO
Expand Down
28 changes: 22 additions & 6 deletions modular_sdk/connections/rabbit_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,28 @@ def publish_sync(self, message, routing_key, correlation_id,
)
_LOG.info('Message pushed')

def consume_sync(self, queue, correlation_id):

def _consumer_callback(ch, method, props, body):
self.responses[props.correlation_id] = body
ch.basic_ack(delivery_tag=method.delivery_tag)
ch.stop_consuming(props.correlation_id)
def consume_sync(self, queue: str, correlation_id: str) -> bytes | None:

def _consumer_callback(
ch: pika.adapters.blocking_connection.BlockingChannel,
method: pika.spec.Basic.Deliver,
props: pika.spec.BasicProperties,
body: bytes,
) -> None:
if props.correlation_id == correlation_id:
_LOG.debug(
f'Message retrieved successfully with ID: '
f'{props.correlation_id}'
)
self.responses[props.correlation_id] = body
ch.basic_ack(delivery_tag=method.delivery_tag)
ch.stop_consuming()
else:
_LOG.warning(
f'Received message with mismatched Correlation ID:'
f'{props.correlation_id} (expected: {correlation_id})'
)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

def _close_on_timeout():
_LOG.warn('Timeout exceeded. Close connection')
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "modular_sdk"
version = "6.0.0"
version = "6.0.1b1"
authors = [
{name = "EPAM Systems", email = "support@syndicate.team"}
]
Expand Down

0 comments on commit ea80292

Please sign in to comment.