diff --git a/dj_cqrs/transport/base.py b/dj_cqrs/transport/base.py index 138175b..eba7aa1 100644 --- a/dj_cqrs/transport/base.py +++ b/dj_cqrs/transport/base.py @@ -21,8 +21,7 @@ def produce(payload): """ Send data from master model to replicas. - :param payload: Transport payload from master model. - :type payload: dj_cqrs.dataclasses.TransportPayload + :param dj_cqrs.dataclasses.TransportPayload payload: Transport payload from master model. """ raise NotImplementedError diff --git a/dj_cqrs/transport/rabbit_mq.py b/dj_cqrs/transport/rabbit_mq.py index ad5df35..0506c72 100644 --- a/dj_cqrs/transport/rabbit_mq.py +++ b/dj_cqrs/transport/rabbit_mq.py @@ -86,7 +86,7 @@ def _produce(cls, payload): rmq_settings = cls._get_common_settings() exchange = rmq_settings[-1] # Decided not to create context-manager to stay within the class - _, channel = cls._get_producer_rmq_objects(*rmq_settings) + _, channel = cls._get_producer_rmq_objects(*rmq_settings, signal_type=payload.signal_type) cls._produce_message(channel, exchange, payload) cls.log_produced(payload) @@ -183,23 +183,36 @@ def _get_consumer_rmq_objects(cls, host, port, creds, exchange, queue_name, pref return connection, channel @classmethod - def _get_producer_rmq_objects(cls, host, port, creds, exchange): - if cls._producer_connection is None: - connection = BlockingConnection( - ConnectionParameters( - host=host, - port=port, - credentials=creds, - blocked_connection_timeout=10, - ), - ) - channel = connection.channel() - cls._declare_exchange(channel, exchange) + def _get_producer_rmq_objects(cls, host, port, creds, exchange, signal_type=None): + """ + Use shared connection in case of sync mode, otherwise create new connection for each + message + """ + if signal_type == SignalType.SYNC: + if cls._producer_connection is None: + connection, channel = cls._create_connection(host, port, creds, exchange) + + cls._producer_connection = connection + cls._producer_channel = channel + + return cls._producer_connection, cls._producer_channel + else: + return cls._create_connection(host, port, creds, exchange) - cls._producer_connection = connection - cls._producer_channel = channel + @classmethod + def _create_connection(cls, host, port, creds, exchange): + connection = BlockingConnection( + ConnectionParameters( + host=host, + port=port, + credentials=creds, + blocked_connection_timeout=10, + ), + ) + channel = connection.channel() + cls._declare_exchange(channel, exchange) - return cls._producer_connection, cls._producer_channel + return connection, channel @staticmethod def _declare_exchange(channel, exchange): diff --git a/travis_integration_tests.sh b/travis_integration_tests.sh index 179fdd2..430e64e 100755 --- a/travis_integration_tests.sh +++ b/travis_integration_tests.sh @@ -5,7 +5,7 @@ set -e if [ "$INTEGRATION_TESTS" == "yes" ]; then echo "Running integration tests....." echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin - cd integration_tests + cd integration_tests docker-compose build docker-compose run master docker-compose down --remove-orphans