diff --git a/README.md b/README.md index 748615a..373b225 100644 --- a/README.md +++ b/README.md @@ -228,8 +228,8 @@ Unit testing Run tests with various RDBMS: - `cd integration_tests` -- `DB=postgres docker-compose -f docker-compose.yml -f rdbms.yml run app_test` -- `DB=mysql docker-compose -f docker-compose.yml -f rdbms.yml run app_test` +- `DB=postgres docker compose -f docker-compose.yml -f rdbms.yml run app_test` +- `DB=mysql docker compose -f docker-compose.yml -f rdbms.yml run app_test` Check code style: `flake8` Run tests: `pytest` @@ -244,6 +244,6 @@ To generate HTML coverage reports use: Integrational testing ------ -1. docker-compose +1. docker compose 2. `cd integration_tests` -3. `docker-compose run master` +3. `docker compose run master` diff --git a/dj_cqrs/mixins.py b/dj_cqrs/mixins.py index 0425b25..4140ff6 100644 --- a/dj_cqrs/mixins.py +++ b/dj_cqrs/mixins.py @@ -1,4 +1,4 @@ -# Copyright © 2023 Ingram Micro Inc. All rights reserved. +# Copyright © 2024 Ingram Micro Inc. All rights reserved. import logging @@ -20,6 +20,7 @@ from dj_cqrs.managers import MasterManager, ReplicaManager from dj_cqrs.metas import MasterMeta, ReplicaMeta from dj_cqrs.signals import MasterSignals, post_bulk_create, post_update +from dj_cqrs.state import cqrs_state logger = logging.getLogger('django-cqrs') @@ -292,9 +293,16 @@ def _class_serialization(self, using, sync=False): if sync: instance = self else: + instance = None db = using if using is not None else self._state.db - qs = self.__class__._default_manager.using(db) - instance = self.relate_cqrs_serialization(qs).get(pk=self.pk) + + bulk_relate_cm = cqrs_state.bulk_relate_cm + if bulk_relate_cm: + instance = bulk_relate_cm.get_cached_instance(self, db) + + if not instance: + qs = self.__class__._default_manager.using(db) + instance = self.relate_cqrs_serialization(qs).get(pk=self.pk) data = self._cqrs_serializer_cls(instance).data data['cqrs_revision'] = instance.cqrs_revision diff --git a/dj_cqrs/signals.py b/dj_cqrs/signals.py index af436f4..7b02344 100644 --- a/dj_cqrs/signals.py +++ b/dj_cqrs/signals.py @@ -1,4 +1,4 @@ -# Copyright © 2023 Ingram Micro Inc. All rights reserved. +# Copyright © 2024 Ingram Micro Inc. All rights reserved. import logging @@ -9,6 +9,7 @@ from dj_cqrs.constants import SignalType from dj_cqrs.controller import producer from dj_cqrs.dataclasses import TransportPayload +from dj_cqrs.state import cqrs_state from dj_cqrs.utils import get_message_expiration_dt @@ -64,6 +65,10 @@ def post_save(cls, sender, **kwargs): using = kwargs['using'] + bulk_relate_cm = cqrs_state.bulk_relate_cm + if bulk_relate_cm: + bulk_relate_cm.register(instance, using) + sync = kwargs.get('sync', False) queue = kwargs.get('queue', None) diff --git a/dj_cqrs/state.py b/dj_cqrs/state.py new file mode 100644 index 0000000..0a575af --- /dev/null +++ b/dj_cqrs/state.py @@ -0,0 +1,7 @@ +# Copyright © 2024 Ingram Micro Inc. All rights reserved. + +import threading + + +cqrs_state = threading.local() +cqrs_state.bulk_relate_cm = None diff --git a/dj_cqrs/utils.py b/dj_cqrs/utils.py index ae75690..ff80eb0 100644 --- a/dj_cqrs/utils.py +++ b/dj_cqrs/utils.py @@ -1,6 +1,8 @@ -# Copyright © 2023 Ingram Micro Inc. All rights reserved. +# Copyright © 2024 Ingram Micro Inc. All rights reserved. import logging +from collections import defaultdict +from contextlib import ContextDecorator from datetime import date, datetime, timedelta from uuid import UUID @@ -10,6 +12,7 @@ from dj_cqrs.constants import DB_VENDOR_PG, SUPPORTED_TIMEOUT_DB_VENDORS from dj_cqrs.logger import install_last_query_capturer +from dj_cqrs.state import cqrs_state logger = logging.getLogger('django-cqrs') @@ -80,3 +83,60 @@ def apply_query_timeouts(model_cls): # pragma: no cover cursor.execute(statement, params=(query_timeout,)) install_last_query_capturer(model_cls) + + +class _BulkRelateCM(ContextDecorator): + def __init__(self, cqrs_id=None): + self._cqrs_id = cqrs_id + self._mapping = defaultdict(lambda: defaultdict(set)) + self._cache = {} + + def register(self, instance, using=None): + instance_cqrs_id = getattr(instance, 'CQRS_ID', None) + if (not instance_cqrs_id) or (self._cqrs_id and instance_cqrs_id != self._cqrs_id): + return + + self._mapping[instance_cqrs_id][using].add(instance.pk) + + def get_cached_instance(self, instance, using=None): + instance_cqrs_id = getattr(instance, 'CQRS_ID', None) + if (not instance_cqrs_id) or (self._cqrs_id and instance_cqrs_id != self._cqrs_id): + return + + instance_pk = instance.pk + cached_instances = self._cache.get(instance_cqrs_id, {}).get(using, {}) + if cached_instances: + return cached_instances.get(instance_pk) + + cached_pks = self._mapping[instance_cqrs_id][using] + if not cached_pks: + return + + qs = instance.__class__._default_manager.using(using) + instances_cache = { + instance.pk: instance + for instance in instance.__class__.relate_cqrs_serialization(qs) + .filter( + pk__in=cached_pks, + ) + .order_by() + .all() + } + self._cache.update( + { + instance_cqrs_id: { + using: instances_cache, + }, + } + ) + return instances_cache.get(instance_pk) + + def __enter__(self): + cqrs_state.bulk_relate_cm = self + + def __exit__(self, exc_type, exc_val, exc_tb): + cqrs_state.bulk_relate_cm = None + + +def bulk_relate_cqrs_serialization(cqrs_id=None): + return _BulkRelateCM(cqrs_id=cqrs_id) diff --git a/examples/demo_project/README.md b/examples/demo_project/README.md index f39af4e..a15c5d1 100644 --- a/examples/demo_project/README.md +++ b/examples/demo_project/README.md @@ -8,12 +8,12 @@ It's a simple demo project contains 2 services: ## Start project: ``` -docker-compose up -d db_pgsql db_mysql -docker-compose run master ./manage.py migrate -docker-compose run replica ./manage.py migrate -docker-compose up -d -docker-compose run master ./manage.py cqrs_sync --cqrs-id=user -f={} -docker-compose run master ./manage.py cqrs_sync --cqrs-id=product -f={} +docker compose up -d db_pgsql db_mysql +docker compose run master ./manage.py migrate +docker compose run replica ./manage.py migrate +docker compose up -d +docker compose run master ./manage.py cqrs_sync --cqrs-id=user -f={} +docker compose run master ./manage.py cqrs_sync --cqrs-id=product -f={} ``` It starts master WEB app on [http://127.0.0.1:8000](http://127.0.0.1:8000) and replica on [http://127.0.0.1:8001](http://127.0.0.1:8001) diff --git a/integration_tests/Makefile b/integration_tests/Makefile index edf76f2..0c32303 100644 --- a/integration_tests/Makefile +++ b/integration_tests/Makefile @@ -3,40 +3,40 @@ .DEFAULT_GOAL := pika build: - docker-compose build + docker compose build build_master_v1: - docker-compose -f docker-compose.yml -f masterV1.yml build + docker compose -f docker-compose.yml -f masterV1.yml build build_replica_v1: - docker-compose -f docker-compose.yml -f replicaV1.yml build + docker compose -f docker-compose.yml -f replicaV1.yml build pika: build @echo "Run PIKA integration tests..." - docker-compose run master + docker compose run master @echo "Stopping running containers..." - docker-compose down --remove-orphans + docker compose down --remove-orphans @echo "Done!" kombu: build @echo "Run KOMBU integration tests..." - docker-compose -f docker-compose.yml -f kombu.yml run master + docker compose -f docker-compose.yml -f kombu.yml run master @echo "Stopping running containers..." - docker-compose -f docker-compose.yml -f kombu.yml down --remove-orphans + docker compose -f docker-compose.yml -f kombu.yml down --remove-orphans @echo "Done!" master_v1: build_master_v1 @echo "Run regression tests Master v1.3.1..." - docker-compose -f docker-compose.yml -f masterV1.yml run master + docker compose -f docker-compose.yml -f masterV1.yml run master @echo "Stopping running containers..." - docker-compose -f docker-compose.yml -f masterV1.yml down --remove-orphans + docker compose -f docker-compose.yml -f masterV1.yml down --remove-orphans @echo "Done!" replica_v1: build_replica_v1 @echo "Run regression tests Replica v1.3.1..." - docker-compose -f docker-compose.yml -f replicaV1.yml run master + docker compose -f docker-compose.yml -f replicaV1.yml run master @echo "Stopping running containers..." - docker-compose -f docker-compose.yml -f replicaV1.yml down --remove-orphans + docker compose -f docker-compose.yml -f replicaV1.yml down --remove-orphans @echo "Done!" all: pika kombu master_v1 replica_v1 diff --git a/tests/test_master/test_signals.py b/tests/test_master/test_signals.py index 05d1c90..d27ca9f 100644 --- a/tests/test_master/test_signals.py +++ b/tests/test_master/test_signals.py @@ -1,4 +1,4 @@ -# Copyright © 2023 Ingram Micro Inc. All rights reserved. +# Copyright © 2024 Ingram Micro Inc. All rights reserved. from datetime import datetime, timezone @@ -8,6 +8,7 @@ from dj_cqrs.constants import SignalType from dj_cqrs.signals import post_bulk_create, post_update +from dj_cqrs.utils import bulk_relate_cqrs_serialization from tests.dj_master import models from tests.utils import assert_is_sub_dict, assert_publisher_once_called_with_args @@ -127,6 +128,34 @@ def test_manual_post_bulk_create(mocker): assert publisher_mock.call_count == 3 +@pytest.mark.django_db(transaction=True) +@pytest.mark.parametrize('count', (1, 3, 5)) +def test_bulk_relate_cqrs_serialization( + django_assert_num_queries, + django_v_trans_q_count_sup, + mocker, + count, + settings, +): + mocker.patch('dj_cqrs.controller.producer.produce') + + if settings.DB_ENGINE == 'sqlite' and django_v_trans_q_count_sup == 0: + suppl = 1 + else: + suppl = django_v_trans_q_count_sup + + opt_query_count = count + 2 + suppl + with django_assert_num_queries(opt_query_count): + with bulk_relate_cqrs_serialization(): + with transaction.atomic(savepoint=False): + [models.Author.objects.create(id=i) for i in range(count)] + + not_opt_query_count = count + count * 2 + suppl + with django_assert_num_queries(not_opt_query_count): + with transaction.atomic(savepoint=False): + [models.Author.objects.create(id=10 + i) for i in range(count)] + + @pytest.mark.django_db(transaction=True) def test_automatic_post_bulk_create(mocker): publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce') diff --git a/tests/test_utils.py b/tests/test_utils.py index 34920ab..0c7c838 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -10,14 +10,18 @@ from uuid import UUID import pytest +from django.db import transaction +from dj_cqrs.state import cqrs_state from dj_cqrs.utils import ( apply_query_timeouts, + bulk_relate_cqrs_serialization, get_delay_queue_max_size, get_json_valid_value, get_message_expiration_dt, get_messages_prefetch_count_per_worker, ) +from tests.dj_master import models as master_models from tests.dj_replica import models @@ -109,3 +113,127 @@ def test_apply_query_timeouts(settings, engine, p_count): assert apply_query_timeouts(models.BasicFieldsModelRef) is None assert p.call_count == p_count + + +@pytest.mark.django_db(transaction=True) +def test_bulk_relate_cqrs_serialization_simple_model(mocker): + produce_mock = mocker.patch('dj_cqrs.controller.producer.produce') + + @bulk_relate_cqrs_serialization() + def func(): + assert cqrs_state.bulk_relate_cm + + instance = master_models.SimplestModel(id=1) + instance.save() + + assert cqrs_state.bulk_relate_cm is None + func() + + assert master_models.SimplestModel.objects.count() == 1 + assert produce_mock.call_count == 1 + assert cqrs_state.bulk_relate_cm is None + + +@pytest.mark.django_db(transaction=True) +def test_bulk_relate_cqrs_serialization_serialized_model(mocker): + produce_mock = mocker.patch('dj_cqrs.controller.producer.produce') + + assert cqrs_state.bulk_relate_cm is None + with bulk_relate_cqrs_serialization(cqrs_id=master_models.Author.CQRS_ID): + bulk_relate_cm = cqrs_state.bulk_relate_cm + + with transaction.atomic(savepoint=False): + master_models.Author.objects.create(id=1) + + assert bulk_relate_cm + assert bulk_relate_cm._mapping + assert not bulk_relate_cm._cache + + assert bulk_relate_cm._cache + + assert master_models.Author.objects.count() == 1 + assert produce_mock.call_count == 1 + assert cqrs_state.bulk_relate_cm is None + + +def test_bulk_relate_cqrs_serialization_error(): + assert cqrs_state.bulk_relate_cm is None + + try: + with bulk_relate_cqrs_serialization(cqrs_id=master_models.Author.CQRS_ID): + assert cqrs_state.bulk_relate_cm + raise ValueError + except ValueError: + pass + + assert cqrs_state.bulk_relate_cm is None + + +@pytest.mark.django_db(transaction=True) +def test_bulk_relate_cqrs_serialization_register(): + author1 = master_models.Author(id=1) + author2 = master_models.Author(id=2) + + with bulk_relate_cqrs_serialization(cqrs_id=master_models.Author.CQRS_ID): + bulk_relate_cm = cqrs_state.bulk_relate_cm + bulk_relate_cm.register(ValueError) + bulk_relate_cm.register(master_models.FilteredSimplestModel()) + bulk_relate_cm.register(author1, 'default') + bulk_relate_cm.register(author1, 'default') + bulk_relate_cm.register(author1, 'other') + bulk_relate_cm.register(author2, 'other') + bulk_relate_cm.register(author2) + + assert bulk_relate_cm._mapping == { + master_models.Author.CQRS_ID: { + 'default': {1}, + 'other': {1, 2}, + None: {2}, + }, + } + + assert cqrs_state.bulk_relate_cm is None + + +@pytest.mark.django_db(transaction=True) +def test_bulk_relate_cqrs_serialization_get_cached_instance(mocker, django_assert_num_queries): + produce_mock = mocker.patch('dj_cqrs.controller.producer.produce') + + simple = master_models.SimplestModel.objects.create(id=1) + + with bulk_relate_cqrs_serialization(): + bulk_relate_cm = cqrs_state.bulk_relate_cm + + with transaction.atomic(): + author1 = master_models.Author.objects.create(id=1) + author1.name = 'new' + author1.save() + author2 = master_models.Author.objects.create(id=2) + + af = master_models.AutoFieldsModel.objects.using('default').create() + publisher = master_models.Publisher.objects.create(id=3) + + assert produce_mock.call_count == 4 + assert bulk_relate_cm._cache == { + master_models.Author.CQRS_ID: { + 'default': { + 1: author1, + 2: author2, + }, + }, + } + + assert bulk_relate_cm.get_cached_instance(publisher) is None + assert bulk_relate_cm.get_cached_instance(ValueError, 'test') is None + + with django_assert_num_queries(0): + assert bulk_relate_cm.get_cached_instance(simple) is None + assert bulk_relate_cm.get_cached_instance(author1, 'default') == author1 + assert bulk_relate_cm.get_cached_instance(author1, 'default') == author1 + assert bulk_relate_cm.get_cached_instance(author1, 'other') is None + assert bulk_relate_cm.get_cached_instance(author2, 'default') == author2 + assert bulk_relate_cm.get_cached_instance(author2) is None + assert bulk_relate_cm.get_cached_instance(master_models.Author(id=3)) is None + assert bulk_relate_cm.get_cached_instance(af) is None + + assert cqrs_state.bulk_relate_cm is None diff --git a/travis_compat_tests.sh b/travis_compat_tests.sh index fa6f7ed..6e2e70a 100755 --- a/travis_compat_tests.sh +++ b/travis_compat_tests.sh @@ -6,13 +6,13 @@ if [ "$COMPAT_TESTS" == "yes" ]; then echo "Running backward compatibility tests....." cd integration_tests echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin - docker-compose -f docker-compose.yml -f masterV1.yml build - docker-compose -f docker-compose.yml -f masterV1.yml run master - docker-compose -f docker-compose.yml -f masterV1.yml down --remove-orphans + docker compose -f docker-compose.yml -f masterV1.yml build + docker compose -f docker-compose.yml -f masterV1.yml run master + docker compose -f docker-compose.yml -f masterV1.yml down --remove-orphans - docker-compose -f docker-compose.yml -f replicaV1.yml build - docker-compose -f docker-compose.yml -f replicaV1.yml run master - docker-compose -f docker-compose.yml -f replicaV1.yml down --remove-orphans + docker compose -f docker-compose.yml -f replicaV1.yml build + docker compose -f docker-compose.yml -f replicaV1.yml run master + docker compose -f docker-compose.yml -f replicaV1.yml down --remove-orphans cd .. echo "Done!" else diff --git a/travis_integration_tests.sh b/travis_integration_tests.sh index e315720..63ea8a9 100755 --- a/travis_integration_tests.sh +++ b/travis_integration_tests.sh @@ -6,14 +6,14 @@ if [ "$INTEGRATION_TESTS" == "yes" ]; then echo "Running integration tests....." echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin cd integration_tests - docker-compose build - docker-compose run master - docker-compose down --remove-orphans - docker-compose -f docker-compose.yml -f kombu.yml run master - docker-compose -f docker-compose.yml -f kombu.yml down --remove-orphans - DB=postgres docker-compose -f docker-compose.yml -f rdbms.yml run app_test - DB=mysql docker-compose -f docker-compose.yml -f rdbms.yml run app_test - docker-compose -f docker-compose.yml -f rdbms.yml down --remove-orphans + docker compose build + docker compose run master + docker compose down --remove-orphans + docker compose -f docker-compose.yml -f kombu.yml run master + docker compose -f docker-compose.yml -f kombu.yml down --remove-orphans + DB=postgres docker compose -f docker-compose.yml -f rdbms.yml run app_test + DB=mysql docker compose -f docker-compose.yml -f rdbms.yml run app_test + docker compose -f docker-compose.yml -f rdbms.yml down --remove-orphans cd .. echo "Done!"