Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 36 additions & 27 deletions dj_cqrs/controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from contextlib import ExitStack

from django.conf import settings
from django.db import Error, close_old_connections, transaction

from dj_cqrs.constants import SignalType
Expand All @@ -25,11 +26,12 @@ def consume(payload):
payload.instance_data,
previous_data=payload.previous_data,
meta=payload.meta,
queue=payload.queue,
)


def route_signal_to_replica_model(
signal_type, cqrs_id, instance_data, previous_data=None, meta=None,
signal_type, cqrs_id, instance_data, previous_data=None, meta=None, queue=None,
):
""" Routes signal to model method to create/update/delete replica instance.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be interesting to have the new way of documentation syntax like google style even though this function is not in the documentation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a part of high level refactoring, I agree. But, I guess, that's a story for another PR.


Expand All @@ -38,43 +40,50 @@ def route_signal_to_replica_model(
:param dict instance_data: Master model data.
:param dict or None previous_data: Previous model data for changed tracked fields, if exists.
:param dict or None meta: Payload metadata, if exists.
:param str or None queue: Synced queue.
"""
if signal_type not in (SignalType.DELETE, SignalType.SAVE, SignalType.SYNC):
logger.error('Bad signal type "{0}" for CQRS_ID "{1}".'.format(signal_type, cqrs_id))
return

model_cls = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
if model_cls:
db_is_needed = not model_cls.CQRS_NO_DB_OPERATIONS
if db_is_needed:
close_old_connections()
if not model_cls:
return

is_meta_supported = model_cls.CQRS_META
try:
with transaction.atomic(savepoint=False) if db_is_needed else ExitStack():
if signal_type == SignalType.DELETE:
if is_meta_supported:
return model_cls.cqrs_delete(instance_data, meta=meta)
this_queue = settings.CQRS['queue']
if signal_type == SignalType.SYNC and model_cls.CQRS_ONLY_DIRECT_SYNCS and queue != this_queue:
return True

return model_cls.cqrs_delete(instance_data)
db_is_needed = not model_cls.CQRS_NO_DB_OPERATIONS
if db_is_needed:
close_old_connections()

f_kw = {'previous_data': previous_data}
is_meta_supported = model_cls.CQRS_META
try:
with transaction.atomic(savepoint=False) if db_is_needed else ExitStack():
if signal_type == SignalType.DELETE:
if is_meta_supported:
f_kw['meta'] = meta
return model_cls.cqrs_delete(instance_data, meta=meta)

return model_cls.cqrs_delete(instance_data)

f_kw = {'previous_data': previous_data}
if is_meta_supported:
f_kw['meta'] = meta

if signal_type == SignalType.SAVE:
return model_cls.cqrs_save(instance_data, **f_kw)
if signal_type == SignalType.SAVE:
return model_cls.cqrs_save(instance_data, **f_kw)

if signal_type == SignalType.SYNC:
f_kw['sync'] = True
return model_cls.cqrs_save(instance_data, **f_kw)
if signal_type == SignalType.SYNC:
f_kw['sync'] = True
return model_cls.cqrs_save(instance_data, **f_kw)

except Error as e:
pk_value = instance_data.get(model_cls._meta.pk.name)
cqrs_revision = instance_data.get('cqrs_revision')
except Error as e:
pk_value = instance_data.get(model_cls._meta.pk.name)
cqrs_revision = instance_data.get('cqrs_revision')

logger.error(
'{0}\nCQRS {1} error: pk = {2}, cqrs_revision = {3} ({4}).'.format(
str(e), signal_type, pk_value, cqrs_revision, model_cls.CQRS_ID,
),
)
logger.error(
'{0}\nCQRS {1} error: pk = {2}, cqrs_revision = {3} ({4}).'.format(
str(e), signal_type, pk_value, cqrs_revision, model_cls.CQRS_ID,
),
)
3 changes: 3 additions & 0 deletions dj_cqrs/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ class ReplicaMixin(RawReplicaMixin, Model, metaclass=ReplicaMeta):
CQRS_META = False
"""Set it to True to receive meta data for this model."""

CQRS_ONLY_DIRECT_SYNCS = False
"""Set it to True to ignore broadcast sync packages and to receive only direct queue syncs."""

objects = Manager()
cqrs = ReplicaManager()
"""Manager that adds needed CQRS queryset methods."""
Expand Down
5 changes: 5 additions & 0 deletions tests/dj_replica/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ def cqrs_save(cls, *args, **kwargs):
raise Exception('Test fail Exception')


class OnlyDirectSyncModel(ReplicaMixin):
CQRS_ID = 'only_direct_sync'
CQRS_ONLY_DIRECT_SYNCS = True


class CQRSMetaModel(ReplicaMixin):
CQRS_ID = 'meta'
CQRS_META = True
Expand Down
17 changes: 15 additions & 2 deletions tests/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dj_cqrs.controller.consumer import consume, route_signal_to_replica_model
from dj_cqrs.controller.producer import produce
from dj_cqrs.dataclasses import TransportPayload
from tests.dj_replica.models import OnlyDirectSyncModel


def test_producer(mocker):
Expand All @@ -30,9 +31,11 @@ def test_producer(mocker):

def test_consumer(mocker):
factory_mock = mocker.patch('dj_cqrs.controller.consumer.route_signal_to_replica_model')
consume(TransportPayload('a', 'b', {}, 'c', previous_data={'e': 'f'}))
consume(TransportPayload('a', 'b', {}, 'c', previous_data={'e': 'f'}, queue='xyz'))

factory_mock.assert_called_once_with('a', 'b', {}, previous_data={'e': 'f'}, meta=None)
factory_mock.assert_called_once_with(
'a', 'b', {}, previous_data={'e': 'f'}, meta=None, queue='xyz',
)


def test_changed_payload_data_during_consume(mocker):
Expand Down Expand Up @@ -94,3 +97,13 @@ def test_route_signal_to_replica_model_integrity_error(caplog):
def test_route_signal_to_replica_model_without_db():
with pytest.raises(NotImplementedError):
route_signal_to_replica_model(SignalType.SAVE, 'no_db', {})


@pytest.mark.parametrize('queue', ('abc', None))
def test_route_signal_to_replica_with_only_direct_syncs(queue):
assert route_signal_to_replica_model(
signal_type=SignalType.SYNC,
cqrs_id=OnlyDirectSyncModel.CQRS_ID,
instance_data={},
queue=queue,
) is True