Skip to content

Commit 51f81e1

Browse files
authored
Merge pull request #7 from maxipavlovic/feature/LITE-16458
LITE-16458 Now CQRS won't send duplicate messages for the same instance in transaction
2 parents 31981c9 + 85b63d4 commit 51f81e1

File tree

5 files changed

+165
-23
lines changed

5 files changed

+165
-23
lines changed

README.md

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -109,24 +109,10 @@ CQRS = {
109109
Notes
110110
-----
111111

112-
When there are master models with related entities in CQRS_SERIALIZER, it's important to have operations within atomic transactions.
113-
CQRS sync will happen on transaction commit. Please, avoid saving master model within transaction more then once to reduce syncing and potential racing on replica side.
114-
Updating of related model won't trigger CQRS automatic synchronization for master model. This needs to be done manually.
115-
116-
Example:
117-
```python
118-
with transaction.atomic():
119-
publisher = models.Publisher.objects.create(id=1, name='publisher')
120-
author = models.Author.objects.create(id=1, name='author', publisher=publisher)
121-
122-
with transaction.atomic():
123-
publisher.name = 'new'
124-
publisher.save()
125-
126-
author.save()
127-
```
128-
129-
When only needed instances need to be synchronized, there is a method `is_sync_instance` to set filtering rule.
112+
* When there are master models with related entities in CQRS_SERIALIZER, it's important to have operations within atomic transactions. CQRS sync will happen on transaction commit.
113+
* Please, avoid saving different instances of the same entity within transaction to reduce syncing and potential racing on replica side.
114+
* Updating of related model won't trigger CQRS automatic synchronization for master model. This needs to be done manually.
115+
* When only needed instances need to be synchronized, there is a method `is_sync_instance` to set filtering rule.
130116
It's important to understand, that CQRS counting works even without syncing and rule is applied every time model is updated.
131117

132118
Example:

dj_cqrs/mixins.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Copyright © 2020 Ingram Micro Inc. All rights reserved.
22

3+
from django.db import router, transaction
34
from django.db.models import DateField, DateTimeField, F, IntegerField, Manager, Model
45
from django.db.models.expressions import CombinedExpression
56
from django.utils.module_loading import import_string
@@ -64,10 +65,39 @@ class directly.**"""
6465
class Meta:
6566
abstract = True
6667

68+
@property
69+
def cqrs_saves_count(self):
70+
"""Shows how many times this instance has been saved within the transaction."""
71+
return getattr(self, '_cqrs_saves_count', 0)
72+
73+
@property
74+
def is_initial_cqrs_save(self):
75+
"""This flag is used to check if instance has already been registered for CQRS update."""
76+
return self.cqrs_saves_count < 2
77+
78+
def reset_cqrs_saves_count(self):
79+
"""This method is used to automatically reset instance CQRS counters on transaction commit.
80+
But this can also be used to control custom behaviour within transaction
81+
or in case of rollback,
82+
when several sequential transactions are used to change the same instance.
83+
"""
84+
if hasattr(self, '_cqrs_saves_count'):
85+
self._cqrs_saves_count = 0
86+
6787
def save(self, *args, **kwargs):
68-
if not self._state.adding:
88+
using = kwargs.get('using') or router.db_for_write(self.__class__, instance=self)
89+
connection = transaction.get_connection(using)
90+
if connection.in_atomic_block:
91+
_cqrs_saves_count = self.cqrs_saves_count
92+
self._cqrs_saves_count = _cqrs_saves_count + 1
93+
else:
94+
self.reset_cqrs_saves_count()
95+
96+
if self.is_initial_cqrs_save and (not self._state.adding):
6997
self.cqrs_revision = F('cqrs_revision') + 1
98+
7099
self._save_tracked_fields()
100+
71101
return super(RawMasterMixin, self).save(*args, **kwargs)
72102

73103
def _save_tracked_fields(self):

dj_cqrs/signals.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ def post_save(cls, sender, **kwargs):
5656
sync = kwargs.get('sync', False)
5757
queue = kwargs.get('queue', None)
5858

59-
if not transaction.get_connection(using).in_atomic_block:
59+
connection = transaction.get_connection(using)
60+
if not connection.in_atomic_block:
61+
instance.reset_cqrs_saves_count()
6062
instance_data = instance.to_cqrs_dict(using)
6163
previous_data = instance.get_tracked_fields_data()
6264
signal_type = SignalType.SYNC if sync else SignalType.SAVE
@@ -66,7 +68,7 @@ def post_save(cls, sender, **kwargs):
6668
)
6769
producer.produce(payload)
6870

69-
else:
71+
elif instance.is_initial_cqrs_save:
7072
transaction.on_commit(
7173
lambda: MasterSignals.post_save(
7274
sender, instance=instance, using=using, sync=sync, queue=queue,

tests/dj_master/models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ class TrackedFieldsChildModel(MasterMixin):
135135
CQRS_TRACKED_FIELDS = ('char_field', 'parent')
136136

137137
char_field = models.CharField(max_length=10)
138-
parent = models.ForeignKey(TrackedFieldsParentModel, on_delete=models.CASCADE)
138+
parent = models.ForeignKey(TrackedFieldsParentModel, on_delete=models.CASCADE, null=True)
139139

140140

141141
class TrackedFieldsAllWithChildModel(MasterMixin):

tests/test_master/test_mixin.py

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
from dj_cqrs.constants import SignalType, FIELDS_TRACKER_FIELD_NAME
1212
from dj_cqrs.metas import MasterMeta
13+
1314
from tests.dj_master import models
1415
from tests.dj_master.serializers import AuthorSerializer
1516
from tests.utils import (
@@ -268,7 +269,7 @@ def test_create():
268269
assert m.cqrs_updated is not None
269270

270271

271-
@pytest.mark.django_db
272+
@pytest.mark.django_db(transaction=True)
272273
def test_update():
273274
m = models.AutoFieldsModel.objects.create()
274275
cqrs_updated = m.cqrs_updated
@@ -668,3 +669,126 @@ def test_m2m_not_supported():
668669

669670
assert 'm2m_field' not in cqrs_data
670671
assert 'char_field' in cqrs_data
672+
673+
674+
@pytest.mark.django_db(transaction=True)
675+
def test_transaction_instance_saved_once_simple_case(mocker):
676+
publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
677+
678+
i0 = models.TrackedFieldsChildModel.objects.create(char_field='old')
679+
with transaction.atomic():
680+
i1 = models.TrackedFieldsParentModel.objects.create(char_field='1')
681+
i1.char_field = '2'
682+
i1.save()
683+
684+
i2 = models.TrackedFieldsParentModel(char_field='a')
685+
i2.save()
686+
687+
i3 = models.TrackedFieldsChildModel.objects.create(char_field='.')
688+
689+
i0.char_field = 'new'
690+
i0.save()
691+
692+
assert publisher_mock.call_count == 5
693+
694+
for i in [i0, i1, i2, i3]:
695+
i.refresh_from_db()
696+
assert i0.cqrs_revision == 1
697+
assert i1.cqrs_revision == 0
698+
assert i2.cqrs_revision == 0
699+
assert i3.cqrs_revision == 0
700+
701+
mapper = (
702+
(i0.pk, 0, 'old', None),
703+
(i1.pk, 0, '2', '1'),
704+
(i2.pk, 0, 'a', None),
705+
(i3.pk, 0, '.', None),
706+
(i0.pk, 1, 'new', 'old'),
707+
)
708+
for index, call in enumerate(publisher_mock.call_args_list):
709+
payload = call[0][0]
710+
expected_data = mapper[index]
711+
712+
assert payload.pk == expected_data[0]
713+
assert payload.instance_data['cqrs_revision'] == expected_data[1]
714+
assert payload.instance_data['char_field'] == expected_data[2]
715+
assert payload.previous_data['char_field'] == expected_data[3]
716+
717+
718+
@pytest.mark.django_db(transaction=True)
719+
def test_cqrs_saves_count_lifecycle():
720+
instance = models.TrackedFieldsParentModel(char_field='1')
721+
instance.reset_cqrs_saves_count()
722+
assert instance.cqrs_saves_count == 0
723+
assert instance.is_initial_cqrs_save
724+
725+
instance.save()
726+
assert instance.cqrs_saves_count == 0
727+
assert instance.is_initial_cqrs_save
728+
729+
instance.save()
730+
assert instance.cqrs_saves_count == 0
731+
assert instance.is_initial_cqrs_save
732+
733+
instance.refresh_from_db()
734+
assert instance.cqrs_saves_count == 0
735+
assert instance.is_initial_cqrs_save
736+
737+
with transaction.atomic():
738+
instance.save()
739+
assert instance.cqrs_saves_count == 1
740+
assert instance.is_initial_cqrs_save
741+
742+
instance.save()
743+
assert instance.cqrs_saves_count == 2
744+
assert not instance.is_initial_cqrs_save
745+
746+
instance.refresh_from_db()
747+
assert instance.cqrs_saves_count == 2
748+
assert not instance.is_initial_cqrs_save
749+
750+
same_db_object_other_instance = models.TrackedFieldsParentModel.objects.first()
751+
assert same_db_object_other_instance.pk == instance.pk
752+
assert same_db_object_other_instance.cqrs_saves_count == 0
753+
assert same_db_object_other_instance.is_initial_cqrs_save
754+
755+
same_db_object_other_instance.save()
756+
assert same_db_object_other_instance.cqrs_saves_count == 1
757+
assert same_db_object_other_instance.is_initial_cqrs_save
758+
759+
same_db_object_other_instance.reset_cqrs_saves_count()
760+
assert same_db_object_other_instance.cqrs_saves_count == 0
761+
assert same_db_object_other_instance.is_initial_cqrs_save
762+
763+
same_db_object_other_instance.save()
764+
assert same_db_object_other_instance.cqrs_saves_count == 1
765+
assert same_db_object_other_instance.is_initial_cqrs_save
766+
767+
assert instance.cqrs_saves_count == 0
768+
assert same_db_object_other_instance.cqrs_saves_count == 0
769+
770+
771+
@pytest.mark.django_db(transaction=True)
772+
def test_sequential_transactions(mocker):
773+
publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
774+
775+
with transaction.atomic():
776+
instance = models.TrackedFieldsParentModel.objects.create(char_field='1')
777+
778+
with transaction.atomic():
779+
instance.char_field = '3'
780+
instance.save()
781+
782+
transaction.set_rollback(True)
783+
instance.reset_cqrs_saves_count()
784+
785+
with transaction.atomic():
786+
instance.char_field = '2'
787+
instance.save()
788+
789+
instance.refresh_from_db()
790+
791+
assert publisher_mock.call_count == 2
792+
assert instance.cqrs_revision == 1
793+
assert publisher_mock.call_args_list[0][0][0].instance_data['char_field'] == '1'
794+
assert publisher_mock.call_args_list[1][0][0].instance_data['char_field'] == '2'

0 commit comments

Comments
 (0)