Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions dj_cqrs/controller/consumer.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# Copyright © 2020 Ingram Micro Inc. All rights reserved.

import logging
from contextlib import ExitStack

from django.db import close_old_connections, transaction

from dj_cqrs.constants import SignalType
from dj_cqrs.registries import ReplicaRegistry


logger = logging.getLogger('django-cqrs')


Expand All @@ -29,26 +29,26 @@ def route_signal_to_replica_model(signal_type, cqrs_id, instance_data, previous_
:param str cqrs_id: Replica model CQRS unique identifier.
:param dict instance_data: Master model data.
"""
model_cls = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
if signal_type not in (SignalType.DELETE, SignalType.SAVE, SignalType.SYNC):
logger.error('Bad signal type "{}" for CQRS_ID "{}".'.format(signal_type, cqrs_id))
return

model_cls = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
if model_cls:
close_old_connections()
db_is_needed = not model_cls.CQRS_NO_DB_OPERATIONS
if db_is_needed:
close_old_connections()

if signal_type == SignalType.DELETE:
with transaction.atomic(savepoint=False):
with transaction.atomic(savepoint=False) if db_is_needed else ExitStack():
if signal_type == SignalType.DELETE:
return model_cls.cqrs_delete(instance_data)

elif signal_type == SignalType.SAVE:
with transaction.atomic(savepoint=False):
elif signal_type == SignalType.SAVE:
return model_cls.cqrs_save(instance_data, previous_data=previous_data)

elif signal_type == SignalType.SYNC:
with transaction.atomic(savepoint=False):
elif signal_type == SignalType.SYNC:
return model_cls.cqrs_save(
instance_data,
previous_data=previous_data,
sync=True,
)

else:
logger.error('Bad signal type "{}" for CQRS_ID "{}".'.format(signal_type, cqrs_id))
9 changes: 9 additions & 0 deletions dj_cqrs/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,9 @@ class ReplicaMixin(Model, metaclass=ReplicaMeta):
CQRS_SELECT_FOR_UPDATE = False
"""Set it to True to acquire lock on instance creation/update."""

CQRS_NO_DB_OPERATIONS = False
"""Set it to True to disable any default DB operations for this model."""

objects = Manager()

cqrs = ReplicaManager()
Expand All @@ -328,6 +331,9 @@ def cqrs_save(cls, master_data, previous_data=None, sync=False):
:return: Model instance.
:rtype: django.db.models.Model
"""
if cls.CQRS_NO_DB_OPERATIONS:
raise NotImplementedError

return cls.cqrs.save_instance(master_data, previous_data, sync)

@classmethod
Expand Down Expand Up @@ -367,4 +373,7 @@ def cqrs_delete(cls, master_data):
:return: Flag, if delete operation is successful (even if nothing was deleted).
:rtype: bool
"""
if cls.CQRS_NO_DB_OPERATIONS:
raise NotImplementedError

return cls.cqrs.delete_instance(master_data)
10 changes: 10 additions & 0 deletions tests/dj_replica/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ class LockModelRef(ReplicaMixin, models.Model):
id = models.IntegerField(primary_key=True)


class NoDBModelRef(ReplicaMixin):
CQRS_ID = 'no_db'
CQRS_NO_DB_OPERATIONS = True

id = models.IntegerField(primary_key=True)

class Meta:
abstract = True


class Event(models.Model):
pid = models.IntegerField()
cqrs_id = models.CharField(max_length=20)
Expand Down
16 changes: 15 additions & 1 deletion tests/test_controller.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Copyright © 2020 Ingram Micro Inc. All rights reserved.

from dj_cqrs.controller.consumer import consume
import pytest

from dj_cqrs.constants import SignalType
from dj_cqrs.controller.consumer import consume, route_signal_to_replica_model
from dj_cqrs.controller.producer import produce
from dj_cqrs.dataclasses import TransportPayload

Expand All @@ -24,3 +27,14 @@ def test_consumer(mocker):
consume(TransportPayload('a', 'b', {}, 'c', previous_data={'e': 'f'}))

factory_mock.assert_called_once_with('a', 'b', {}, previous_data={'e': 'f'})


@pytest.mark.django_db(transaction=True)
def test_route_signal_to_replica_model_with_db(django_assert_num_queries):
with django_assert_num_queries(1):
route_signal_to_replica_model(SignalType.SAVE, 'lock', {})


def test_route_signal_to_replica_model_without_db():
with pytest.raises(NotImplementedError):
route_signal_to_replica_model(SignalType.SAVE, 'no_db', {})
9 changes: 9 additions & 0 deletions tests/test_replica/test_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,3 +485,12 @@ def test_select_for_update_lock(mocker):

assert instance.id == 1
m.assert_called_once()


@pytest.mark.django_db
def test_nodb(mocker):
with pytest.raises(NotImplementedError):
models.NoDBModelRef.cqrs_save(None)

with pytest.raises(NotImplementedError):
models.NoDBModelRef.cqrs_delete(None)