Skip to content

Commit 10679d5

Browse files
authored
Merge pull request #12 from d3rky/bugfix/LITE-17358-nack-created-messages
LITE-17358 nack with requeueing, add expiration for messages to 60s
2 parents 6c4bbec + 6ad4ec7 commit 10679d5

File tree

2 files changed

+7
-3
lines changed

2 files changed

+7
-3
lines changed

dj_cqrs/transport/mixins.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def log_consumed_denied(payload):
2828
:param dj_cqrs.dataclasses.TransportPayload payload: Transport payload from master model.
2929
"""
3030
if payload.pk:
31-
logger.info('CQRS is denied: pk = {} ({}).'.format(payload.pk, payload.cqrs_id))
31+
logger.warning('CQRS is denied: pk = {} ({}).'.format(payload.pk, payload.cqrs_id))
3232

3333
@staticmethod
3434
def log_produced(payload):

dj_cqrs/transport/rabbit_mq.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def _consume_message(cls, ch, method, properties, body):
9999
ch.basic_ack(delivery_tag=method.delivery_tag)
100100
cls.log_consumed_accepted(payload)
101101
else:
102-
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
102+
ch.basic_nack(delivery_tag=method.delivery_tag)
103103
cls.log_consumed_denied(payload)
104104

105105
@classmethod
@@ -111,7 +111,11 @@ def _produce_message(cls, channel, exchange, payload):
111111
routing_key=routing_key,
112112
body=ujson.dumps(payload.to_dict()),
113113
mandatory=True,
114-
properties=BasicProperties(content_type='text/plain', delivery_mode=2)
114+
properties=BasicProperties(
115+
content_type='text/plain',
116+
delivery_mode=2, # make message persistent
117+
expiration='60000', # milliseconds
118+
)
115119
)
116120

117121
@staticmethod

0 commit comments

Comments
 (0)