From 9937b616f4e83bcd28efd0c2cdbc98b5456d50fd Mon Sep 17 00:00:00 2001 From: Pavel Lonkin Date: Thu, 4 Mar 2021 14:19:33 +0300 Subject: [PATCH] LITE-17486 Use shared rabbit mq connection only for sync operation --- dj_cqrs/transport/base.py | 3 +-- dj_cqrs/transport/rabbit_mq.py | 45 ++++++++++++++++++++++------------ travis_integration_tests.sh | 2 +- 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/dj_cqrs/transport/base.py b/dj_cqrs/transport/base.py index 138175b..eba7aa1 100644 --- a/dj_cqrs/transport/base.py +++ b/dj_cqrs/transport/base.py @@ -21,8 +21,7 @@ def produce(payload): """ Send data from master model to replicas. - :param payload: Transport payload from master model. - :type payload: dj_cqrs.dataclasses.TransportPayload + :param dj_cqrs.dataclasses.TransportPayload payload: Transport payload from master model. """ raise NotImplementedError diff --git a/dj_cqrs/transport/rabbit_mq.py b/dj_cqrs/transport/rabbit_mq.py index ad5df35..0506c72 100644 --- a/dj_cqrs/transport/rabbit_mq.py +++ b/dj_cqrs/transport/rabbit_mq.py @@ -86,7 +86,7 @@ def _produce(cls, payload): rmq_settings = cls._get_common_settings() exchange = rmq_settings[-1] # Decided not to create context-manager to stay within the class - _, channel = cls._get_producer_rmq_objects(*rmq_settings) + _, channel = cls._get_producer_rmq_objects(*rmq_settings, signal_type=payload.signal_type) cls._produce_message(channel, exchange, payload) cls.log_produced(payload) @@ -183,23 +183,36 @@ def _get_consumer_rmq_objects(cls, host, port, creds, exchange, queue_name, pref return connection, channel @classmethod - def _get_producer_rmq_objects(cls, host, port, creds, exchange): - if cls._producer_connection is None: - connection = BlockingConnection( - ConnectionParameters( - host=host, - port=port, - credentials=creds, - blocked_connection_timeout=10, - ), - ) - channel = connection.channel() - cls._declare_exchange(channel, exchange) + def _get_producer_rmq_objects(cls, host, port, creds, exchange, signal_type=None): + """ + Use shared connection in case of sync mode, otherwise create new connection for each + message + """ + if signal_type == SignalType.SYNC: + if cls._producer_connection is None: + connection, channel = cls._create_connection(host, port, creds, exchange) + + cls._producer_connection = connection + cls._producer_channel = channel + + return cls._producer_connection, cls._producer_channel + else: + return cls._create_connection(host, port, creds, exchange) - cls._producer_connection = connection - cls._producer_channel = channel + @classmethod + def _create_connection(cls, host, port, creds, exchange): + connection = BlockingConnection( + ConnectionParameters( + host=host, + port=port, + credentials=creds, + blocked_connection_timeout=10, + ), + ) + channel = connection.channel() + cls._declare_exchange(channel, exchange) - return cls._producer_connection, cls._producer_channel + return connection, channel @staticmethod def _declare_exchange(channel, exchange): diff --git a/travis_integration_tests.sh b/travis_integration_tests.sh index 179fdd2..430e64e 100755 --- a/travis_integration_tests.sh +++ b/travis_integration_tests.sh @@ -5,7 +5,7 @@ set -e if [ "$INTEGRATION_TESTS" == "yes" ]; then echo "Running integration tests....." echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin - cd integration_tests + cd integration_tests docker-compose build docker-compose run master docker-compose down --remove-orphans