Skip to content

Commit fdacb0a

Browse files
authored
Merge pull request #19 from cloudblue/bugfix/LITE-17486-shared-only-for-sync
LITE-17486 Use shared rabbit mq connection only for sync operation
2 parents 16c7d80 + 9937b61 commit fdacb0a

File tree

3 files changed

+31
-19
lines changed

3 files changed

+31
-19
lines changed

dj_cqrs/transport/base.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ def produce(payload):
2121
"""
2222
Send data from master model to replicas.
2323
24-
:param payload: Transport payload from master model.
25-
:type payload: dj_cqrs.dataclasses.TransportPayload
24+
:param dj_cqrs.dataclasses.TransportPayload payload: Transport payload from master model.
2625
"""
2726
raise NotImplementedError
2827

dj_cqrs/transport/rabbit_mq.py

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def _produce(cls, payload):
8686
rmq_settings = cls._get_common_settings()
8787
exchange = rmq_settings[-1]
8888
# Decided not to create context-manager to stay within the class
89-
_, channel = cls._get_producer_rmq_objects(*rmq_settings)
89+
_, channel = cls._get_producer_rmq_objects(*rmq_settings, signal_type=payload.signal_type)
9090

9191
cls._produce_message(channel, exchange, payload)
9292
cls.log_produced(payload)
@@ -183,23 +183,36 @@ def _get_consumer_rmq_objects(cls, host, port, creds, exchange, queue_name, pref
183183
return connection, channel
184184

185185
@classmethod
186-
def _get_producer_rmq_objects(cls, host, port, creds, exchange):
187-
if cls._producer_connection is None:
188-
connection = BlockingConnection(
189-
ConnectionParameters(
190-
host=host,
191-
port=port,
192-
credentials=creds,
193-
blocked_connection_timeout=10,
194-
),
195-
)
196-
channel = connection.channel()
197-
cls._declare_exchange(channel, exchange)
186+
def _get_producer_rmq_objects(cls, host, port, creds, exchange, signal_type=None):
187+
"""
188+
Use shared connection in case of sync mode, otherwise create new connection for each
189+
message
190+
"""
191+
if signal_type == SignalType.SYNC:
192+
if cls._producer_connection is None:
193+
connection, channel = cls._create_connection(host, port, creds, exchange)
194+
195+
cls._producer_connection = connection
196+
cls._producer_channel = channel
197+
198+
return cls._producer_connection, cls._producer_channel
199+
else:
200+
return cls._create_connection(host, port, creds, exchange)
198201

199-
cls._producer_connection = connection
200-
cls._producer_channel = channel
202+
@classmethod
203+
def _create_connection(cls, host, port, creds, exchange):
204+
connection = BlockingConnection(
205+
ConnectionParameters(
206+
host=host,
207+
port=port,
208+
credentials=creds,
209+
blocked_connection_timeout=10,
210+
),
211+
)
212+
channel = connection.channel()
213+
cls._declare_exchange(channel, exchange)
201214

202-
return cls._producer_connection, cls._producer_channel
215+
return connection, channel
203216

204217
@staticmethod
205218
def _declare_exchange(channel, exchange):

travis_integration_tests.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ set -e
55
if [ "$INTEGRATION_TESTS" == "yes" ]; then
66
echo "Running integration tests....."
77
echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin
8-
cd integration_tests
8+
cd integration_tests
99
docker-compose build
1010
docker-compose run master
1111
docker-compose down --remove-orphans

0 commit comments

Comments
 (0)