Skip to content

Commit 1b73302

Browse files
authored
Merge pull request #13 from d3rky/feature/LITE-17357-shared-rabbit-connection-on-produce
LITE-17357 Add shared connection to RabbitMQ transport for producer
2 parents 10679d5 + 2fdd3dc commit 1b73302

File tree

8 files changed

+67
-28
lines changed

8 files changed

+67
-28
lines changed

dj_cqrs/transport/base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,8 @@ def produce(payload):
3030
def consume(*args, **kwargs):
3131
"""Receive data from master model."""
3232
raise NotImplementedError
33+
34+
@staticmethod
35+
def clean_connection(*args, **kwargs):
36+
"""Clean transport connection. Here you can close all connections that you have"""
37+
raise NotImplementedError

dj_cqrs/transport/kombu.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ def get_consumers(self, Consumer, channel):
6969
class KombuTransport(LoggingMixin, BaseTransport):
7070
CONSUMER_RETRY_TIMEOUT = 5
7171

72+
@classmethod
73+
def clean_connection(cls):
74+
"""Nothing to do here"""
75+
pass
76+
7277
@classmethod
7378
def consume(cls):
7479
queue_name, prefetch_count = cls._get_consumer_settings()

dj_cqrs/transport/rabbit_mq.py

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from socket import gaierror
77
from urllib.parse import unquote, urlparse
88

9-
109
import ujson
1110
from django.conf import settings
1211
from pika import exceptions, BasicProperties, BlockingConnection, ConnectionParameters, credentials
@@ -24,6 +23,16 @@
2423
class RabbitMQTransport(LoggingMixin, BaseTransport):
2524
CONSUMER_RETRY_TIMEOUT = 5
2625

26+
_producer_connection = None
27+
_producer_channel = None
28+
29+
@classmethod
30+
def clean_connection(cls):
31+
if cls._producer_connection and not cls._producer_connection.is_closed:
32+
cls._producer_connection.close()
33+
cls._producer_connection = None
34+
cls._producer_channel = None
35+
2736
@classmethod
2837
def consume(cls):
2938
consumer_rabbit_settings = cls._get_consumer_settings()
@@ -43,28 +52,29 @@ def consume(cls):
4352
logger.error('AMQP connection error. Reconnecting...')
4453
time.sleep(cls.CONSUMER_RETRY_TIMEOUT)
4554
finally:
46-
if connection:
55+
if connection and not connection.is_closed:
4756
connection.close()
4857

4958
@classmethod
5059
def produce(cls, payload):
60+
# TODO: try to produce and reconnect several times, now leave as before
61+
# if cannot publish message - drop it and try to reconnect on next event
5162
rmq_settings = cls._get_common_settings()
5263
exchange = rmq_settings[-1]
5364

54-
connection = None
5565
try:
5666
# Decided not to create context-manager to stay within the class
57-
connection, channel = cls._get_producer_rmq_objects(*rmq_settings)
67+
_, channel = cls._get_producer_rmq_objects(*rmq_settings)
5868

5969
cls._produce_message(channel, exchange, payload)
6070
cls.log_produced(payload)
6171
except (exceptions.AMQPError, exceptions.ChannelError, exceptions.ReentrancyError):
6272
logger.error("CQRS couldn't be published: pk = {} ({}).".format(
6373
payload.pk, payload.cqrs_id,
6474
))
65-
finally:
66-
if connection:
67-
connection.close()
75+
76+
# in case of any error - close connection and try to reconnect
77+
cls.clean_connection()
6878

6979
@classmethod
7080
def _consume_message(cls, ch, method, properties, body):
@@ -114,7 +124,7 @@ def _produce_message(cls, channel, exchange, payload):
114124
properties=BasicProperties(
115125
content_type='text/plain',
116126
delivery_mode=2, # make message persistent
117-
expiration='60000', # milliseconds
127+
expiration=settings.CQRS.get('MESSAGE_TTL', '60000'), # milliseconds
118128
)
119129
)
120130

@@ -159,18 +169,22 @@ def _get_consumer_rmq_objects(cls, host, port, creds, exchange, queue_name, pref
159169

160170
@classmethod
161171
def _get_producer_rmq_objects(cls, host, port, creds, exchange):
162-
connection = BlockingConnection(
163-
ConnectionParameters(
164-
host=host,
165-
port=port,
166-
credentials=creds,
167-
blocked_connection_timeout=10,
168-
),
169-
)
170-
channel = connection.channel()
171-
cls._declare_exchange(channel, exchange)
172+
if cls._producer_connection is None:
173+
connection = BlockingConnection(
174+
ConnectionParameters(
175+
host=host,
176+
port=port,
177+
credentials=creds,
178+
blocked_connection_timeout=10,
179+
),
180+
)
181+
channel = connection.channel()
182+
cls._declare_exchange(channel, exchange)
172183

173-
return connection, channel
184+
cls._producer_connection = connection
185+
cls._producer_channel = channel
186+
187+
return cls._producer_connection, cls._producer_channel
174188

175189
@staticmethod
176190
def _declare_exchange(channel, exchange):

integration_tests/tests/conftest.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
from integration_tests.tests.utils import REPLICA_TABLES
77

8+
from dj_cqrs.transport import current_transport
9+
810

911
@pytest.fixture
1012
def replica_cursor():
@@ -21,3 +23,10 @@ def replica_cursor():
2123

2224
cursor.close()
2325
connection.close()
26+
27+
28+
@pytest.fixture
29+
def clean_rabbit_transport_connection():
30+
current_transport.clean_connection()
31+
32+
yield

integration_tests/tests/test_asynchronous_consuming.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010

1111

1212
@pytest.mark.django_db(transaction=True)
13-
def test_both_consumers_consume(replica_cursor):
13+
def test_both_consumers_consume(settings, replica_cursor, clean_rabbit_transport_connection):
14+
settings.CQRS['MESSAGE_TTL'] = '4000'
1415
assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 0
1516
assert count_replica_rows(replica_cursor, REPLICA_EVENT_TABLE) == 0
1617

@@ -23,7 +24,7 @@ def test_both_consumers_consume(replica_cursor):
2324
])
2425
BasicFieldsModel.call_post_bulk_create(master_instances)
2526

26-
transport_delay(3)
27+
transport_delay(5)
2728
assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 9
2829
assert count_replica_rows(replica_cursor, REPLICA_EVENT_TABLE) == 9
2930

@@ -32,13 +33,18 @@ def test_both_consumers_consume(replica_cursor):
3233

3334

3435
@pytest.mark.django_db(transaction=True)
35-
def test_de_duplication(replica_cursor):
36+
def test_de_duplication(settings, replica_cursor, clean_rabbit_transport_connection):
37+
settings.CQRS['MESSAGE_TTL'] = '4000'
3638
assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 0
3739
assert count_replica_rows(replica_cursor, REPLICA_EVENT_TABLE) == 0
3840

39-
master_instance = BasicFieldsModel.objects.create(int_field=1, char_field='text')
40-
BasicFieldsModel.call_post_bulk_create([master_instance for _ in range(9)])
41+
master_instance = BasicFieldsModel.objects.create(int_field=21, char_field='text')
42+
BasicFieldsModel.call_post_bulk_create([master_instance])
43+
transport_delay(5)
4144

42-
transport_delay(3)
45+
replica_cursor.execute('TRUNCATE TABLE {};'.format(REPLICA_EVENT_TABLE))
46+
BasicFieldsModel.call_post_bulk_create([master_instance for _ in range(10)])
47+
48+
transport_delay(5)
4349
assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 1
4450
assert count_replica_rows(replica_cursor, REPLICA_EVENT_TABLE) == 10

integration_tests/tests/test_bulk_operations.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010

1111
@pytest.mark.django_db(transaction=True)
12-
def test_flow(replica_cursor):
12+
def test_flow(replica_cursor, clean_rabbit_transport_connection):
1313
assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 0
1414

1515
# Create

integration_tests/tests/test_single_basic_instance.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111

1212
@pytest.mark.django_db(transaction=True)
13-
def test_flow(replica_cursor):
13+
def test_flow(replica_cursor, clean_rabbit_transport_connection):
1414
assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 0
1515

1616
# Create

integration_tests/tests/test_sync_to_a_certain_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010

1111
@pytest.mark.django_db(transaction=True)
12-
def test_flow(replica_cursor, mocker):
12+
def test_flow(replica_cursor, mocker, clean_rabbit_transport_connection):
1313
assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 0
1414

1515
# Create

0 commit comments

Comments
 (0)