Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use eventual consistency to update EntityList num_entities #2651

Merged
merged 63 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
3284911
index deleted_at field for model EntityList, Entity
kelvin-muchiri Jul 23, 2024
cecaaa3
delete form metadata when EntityList is hard deleted
kelvin-muchiri Jul 23, 2024
34fada4
use eventual consistency to increment/decrement EntityList num_entities
kelvin-muchiri Jul 25, 2024
55831c2
remove unnecessary code
kelvin-muchiri Jul 25, 2024
2f8cf7f
remove whitespace
kelvin-muchiri Jul 25, 2024
a386cb4
add tests
kelvin-muchiri Jul 25, 2024
644873b
add tests
kelvin-muchiri Jul 25, 2024
e3b8035
include cached counter in API response
kelvin-muchiri Jul 25, 2024
b455c5f
decrement db counter if cache missing
kelvin-muchiri Jul 25, 2024
5fbf5b6
refactor code
kelvin-muchiri Jul 25, 2024
bc4d734
handle cache ConnectionError
kelvin-muchiri Jul 25, 2024
453ad3f
handle cache ConnectionError
kelvin-muchiri Jul 25, 2024
274f9b9
suppress warning import-outside-toplevel
kelvin-muchiri Jul 25, 2024
09c0dae
refactor constants
kelvin-muchiri Jul 25, 2024
bf9a1dc
address lint redefined-builtin / Redefining built-in 'id'
kelvin-muchiri Jul 25, 2024
8b1891f
resolve cyclic dependency
kelvin-muchiri Jul 25, 2024
61f6def
resolve cyclic dependency
kelvin-muchiri Jul 25, 2024
fdf0122
update docstring
kelvin-muchiri Jul 25, 2024
3da6d26
resolve cyclic dependency
kelvin-muchiri Jul 25, 2024
38a50d8
resolve cyclic dependency
kelvin-muchiri Jul 25, 2024
2dae0c3
handle inaccessible cache, handle already acquired lock
kelvin-muchiri Jul 26, 2024
8943e23
fix typo
kelvin-muchiri Jul 26, 2024
c4b0337
add item to cached set using locking
kelvin-muchiri Jul 26, 2024
485a5e9
use dict to store unique lock for each cache key
kelvin-muchiri Jul 26, 2024
f85624d
set no expiry for EntityList cached counter
kelvin-muchiri Jul 26, 2024
7300c06
update docstring
kelvin-muchiri Jul 26, 2024
8361b8e
add test case
kelvin-muchiri Jul 26, 2024
50cc89c
update comment
kelvin-muchiri Jul 26, 2024
5a791da
update comment
kelvin-muchiri Jul 26, 2024
34bd1f5
fix failing test
kelvin-muchiri Jul 26, 2024
ed60427
fix failing test
kelvin-muchiri Jul 26, 2024
d09a1ae
update doc string
kelvin-muchiri Jul 26, 2024
8471c50
avoid cyclic dependency
kelvin-muchiri Jul 26, 2024
eca8d39
execute failover if commit timeout threshold exceeded
kelvin-muchiri Jul 26, 2024
36ef8a7
fix failing tests
kelvin-muchiri Jul 26, 2024
166ac03
fix failing tests
kelvin-muchiri Jul 26, 2024
c4acc9e
update doc string
kelvin-muchiri Jul 26, 2024
972e1ce
refactor code
kelvin-muchiri Jul 26, 2024
3880e3a
refactor code
kelvin-muchiri Jul 26, 2024
5597389
refactor code
kelvin-muchiri Jul 26, 2024
57de5d6
fix invalid-name lint warning
kelvin-muchiri Jul 26, 2024
867b3e5
add utility method
kelvin-muchiri Jul 26, 2024
9d5f70f
lock cache between processes
kelvin-muchiri Jul 30, 2024
8442cbe
resolve lint error line too long
kelvin-muchiri Jul 30, 2024
9557da5
send entity list failover report once
kelvin-muchiri Aug 5, 2024
0a0afb5
refactor code
kelvin-muchiri Aug 5, 2024
84fdee4
add test
kelvin-muchiri Aug 5, 2024
5527d40
rename symbol
kelvin-muchiri Aug 5, 2024
fc5fd34
fix failing tests
kelvin-muchiri Sep 4, 2024
c1856d5
suppress lint warning
kelvin-muchiri Sep 4, 2024
3ba026b
increment and decrement num_entities asynchronously
kelvin-muchiri Sep 4, 2024
9c1c1d1
wait for transaction to complete
kelvin-muchiri Sep 4, 2024
c9b1a2c
resolve cyclic dependency
kelvin-muchiri Sep 4, 2024
6db3ad1
fix invalid-name lint error
kelvin-muchiri Sep 4, 2024
b15bb99
fix failing test
kelvin-muchiri Sep 4, 2024
3053f43
fix failing tests
kelvin-muchiri Sep 4, 2024
889f2c5
pin to master
kelvin-muchiri Sep 4, 2024
cb07f18
remove duplicate pin to master
kelvin-muchiri Sep 4, 2024
d0ab33a
fix failing test
kelvin-muchiri Sep 4, 2024
3eb77b6
refactor code
kelvin-muchiri Sep 6, 2024
fc82f3f
rename cache keys
kelvin-muchiri Sep 6, 2024
aa8bda6
rename constant
kelvin-muchiri Sep 6, 2024
9a62b3a
address lint warnings
kelvin-muchiri Sep 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 63 additions & 30 deletions onadata/apps/api/tests/viewsets/test_entity_list_viewset.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
from datetime import datetime, timezone as dtz
from unittest.mock import patch

from django.core.cache import cache
from django.test import override_settings
from django.utils import timezone

from onadata.apps.api.viewsets.entity_list_viewset import EntityListViewSet
from onadata.apps.api.tests.viewsets.test_abstract_viewset import TestAbstractViewSet
from onadata.libs.pagination import StandardPageNumberPagination
from onadata.apps.logger.models import Entity, EntityHistory, EntityList, Project
from onadata.libs.models.share_project import ShareProject
from onadata.libs.pagination import StandardPageNumberPagination
from onadata.libs.permissions import ROLES, OwnerRole
from onadata.libs.utils.user_auth import get_user_default_project

Expand Down Expand Up @@ -243,16 +244,17 @@ def _create_entity_list(self, name, project=None):
@override_settings(TIME_ZONE="UTC")
def test_get_all(self):
"""Getting all EntityLists works"""
Entity.objects.create(
entity_list=self.trees_entity_list,
json={
"species": "purpleheart",
"geometry": "-1.286905 36.772845 0 0",
"circumference_cm": 300,
"label": "300cm purpleheart",
},
uuid="dbee4c32-a922-451c-9df7-42f40bf78f48",
)
with self.captureOnCommitCallbacks(execute=True):
Entity.objects.create(
entity_list=self.trees_entity_list,
json={
"species": "purpleheart",
"geometry": "-1.286905 36.772845 0 0",
"circumference_cm": 300,
"label": "300cm purpleheart",
},
uuid="dbee4c32-a922-451c-9df7-42f40bf78f48",
)
qs = EntityList.objects.all().order_by("pk")
first = qs[0]
second = qs[1]
Expand Down Expand Up @@ -384,6 +386,30 @@ def test_soft_deleted_excluded(self):
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)

def test_num_entities_cached(self):
"""`num_entities` includes cached counter"""
entity_list = EntityList.objects.get(name="trees")
entity_list.num_entities = 5
entity_list.save()
cache.set(f"elist-num-entities-{entity_list.pk}", 7)

request = self.factory.get("/", **self.extra)
response = self.view(request)

self.assertEqual(response.status_code, 200)
self.assertEqual(response.data[0]["num_entities"], 12)

# Defaults to database counter if cache inaccessible
with patch.object(cache, "get") as mock_cache_get:
with patch("onadata.libs.utils.cache_tools.logger.exception") as mock_exc:
mock_cache_get.side_effect = ConnectionError
request = self.factory.get("/", **self.extra)
response = self.view(request)

self.assertEqual(response.status_code, 200)
self.assertEqual(response.data[0]["num_entities"], 5)
mock_exc.assert_called()


@override_settings(TIME_ZONE="UTC")
class GetSingleEntityListTestCase(TestAbstractViewSet):
Expand All @@ -401,16 +427,18 @@ def setUp(self):
# Create Entity for trees EntityList
trees_entity_list = EntityList.objects.get(name="trees")
OwnerRole.add(self.user, trees_entity_list)
Entity.objects.create(
entity_list=trees_entity_list,
json={
"species": "purpleheart",
"geometry": "-1.286905 36.772845 0 0",
"circumference_cm": 300,
"label": "300cm purpleheart",
},
uuid="dbee4c32-a922-451c-9df7-42f40bf78f48",
)

with self.captureOnCommitCallbacks(execute=True):
Entity.objects.create(
entity_list=trees_entity_list,
json={
"species": "purpleheart",
"geometry": "-1.286905 36.772845 0 0",
"circumference_cm": 300,
"label": "300cm purpleheart",
},
uuid="dbee4c32-a922-451c-9df7-42f40bf78f48",
)

def test_get_entity_list(self):
"""Returns a single EntityList"""
Expand Down Expand Up @@ -1452,27 +1480,32 @@ def setUp(self):
super().setUp()

self.view = EntityListViewSet.as_view({"delete": "entities"})
self._create_entity()

with self.captureOnCommitCallbacks(execute=True):
self._create_entity()

OwnerRole.add(self.user, self.entity_list)

@patch("django.utils.timezone.now")
def test_delete(self, mock_now):
"""Delete Entity works"""
self.entity_list.refresh_from_db()
self.assertEqual(self.entity_list.num_entities, 1)
self.assertEqual(cache.get(f"elist-num-entities-{self.entity_list.pk}"), 1)
date = datetime(2024, 6, 11, 14, 9, 0, tzinfo=timezone.utc)
mock_now.return_value = date
request = self.factory.delete(
"/", data={"entity_ids": [self.entity.pk]}, **self.extra
)
response = self.view(request, pk=self.entity_list.pk)
self.entity.refresh_from_db()
self.entity_list.refresh_from_db()

with self.captureOnCommitCallbacks(execute=True):
request = self.factory.delete(
"/", data={"entity_ids": [self.entity.pk]}, **self.extra
)
response = self.view(request, pk=self.entity_list.pk)
self.entity.refresh_from_db()
self.entity_list.refresh_from_db()

self.assertEqual(response.status_code, 204)
self.assertEqual(self.entity.deleted_at, date)
self.assertEqual(self.entity.deleted_by, self.user)
self.assertEqual(self.entity_list.num_entities, 0)
self.assertEqual(cache.get(f"elist-num-entities-{self.entity_list.pk}"), 0)
self.assertEqual(
self.entity_list.last_entity_update_time, self.entity.date_modified
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ def test_member_added_to_org_with_correct_perms(self):
)
response = view(request, user="denoinc")
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data, ["denoinc", "aboy"])
self.assertCountEqual(response.data, ["denoinc", "aboy"])

project_view = ProjectViewSet.as_view({"get": "retrieve"})
request = self.factory.get(
Expand Down
10 changes: 8 additions & 2 deletions onadata/apps/logger/models/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import uuid
import importlib

from django.contrib.auth import get_user_model
from django.db import models, transaction
Expand Down Expand Up @@ -40,8 +41,13 @@ def soft_delete(self, deleted_by=None):
self.deleted_at = deletion_time
self.deleted_by = deleted_by
self.save(update_fields=["deleted_at", "deleted_by"])
self.entity_list.num_entities = models.F("num_entities") - 1
self.entity_list.save()
# Avoid cyclic dependency errors
logger_tasks = importlib.import_module("onadata.apps.logger.tasks")
transaction.on_commit(
lambda: logger_tasks.dec_elist_num_entities_async.delay(
self.entity_list.pk
)
)

class Meta(BaseModel.Meta):
app_label = "logger"
Expand Down
20 changes: 9 additions & 11 deletions onadata/apps/logger/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
"""
from django.contrib.contenttypes.models import ContentType
from django.db import transaction
from django.db.models import F
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.utils import timezone

from onadata.apps.logger.models import Entity, EntityList, Instance, SubmissionReview
from onadata.apps.logger.models.xform import clear_project_cache
from onadata.apps.logger.tasks import set_entity_list_perms_async
from onadata.apps.logger.tasks import (
dec_elist_num_entities_async,
inc_elist_num_entities_async,
set_entity_list_perms_async,
)
from onadata.apps.main.models.meta_data import MetaData
from onadata.libs.utils.logger_tools import create_or_update_entity_from_instance

Expand Down Expand Up @@ -49,21 +52,16 @@ def increment_entity_list_num_entities(sender, instance, created=False, **kwargs
entity_list = instance.entity_list

if created:
# Using Queryset.update ensures we do not call the model's save method and
# signals
EntityList.objects.filter(pk=entity_list.pk).update(
num_entities=F("num_entities") + 1
transaction.on_commit(
lambda: inc_elist_num_entities_async.delay(entity_list.pk)
)


@receiver(post_delete, sender=Entity, dispatch_uid="update_enti_el_dec_num_entities")
def decrement_entity_list_num_entities(sender, instance, **kwargs):
"""Decrement EntityList `num_entities`"""
entity_list = instance.entity_list
# Using Queryset.update ensures we do not call the model's save method and
# signals
EntityList.objects.filter(pk=entity_list.pk).update(
num_entities=F("num_entities") - 1
transaction.on_commit(
lambda: dec_elist_num_entities_async.delay(instance.entity_list.pk)
)


Expand Down
83 changes: 65 additions & 18 deletions onadata/apps/logger/tasks.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
# pylint: disable=import-error,ungrouped-imports
"""Module for logger tasks"""
"""
Asynchronous tasks for the logger app
"""
import logging

from django.core.cache import cache
from django.contrib.auth import get_user_model
from django.db import DatabaseError
from multidb.pinning import use_master

from onadata.apps.logger.models import Entity, EntityList, Project
from onadata.celeryapp import app
from onadata.libs.utils.cache_tools import PROJECT_DATE_MODIFIED_CACHE, safe_delete
from onadata.libs.utils.cache_tools import (
PROJECT_DATE_MODIFIED_CACHE,
safe_delete,
)
from onadata.libs.utils.project_utils import set_project_perms_to_object
from onadata.libs.utils.logger_tools import soft_delete_entities_bulk
from onadata.libs.utils.logger_tools import (
commit_cached_elist_num_entities,
dec_elist_num_entities,
inc_elist_num_entities,
soft_delete_entities_bulk,
)


logger = logging.getLogger(__name__)
Expand All @@ -24,14 +35,15 @@ def set_entity_list_perms_async(entity_list_id):
Args:
entity_list_id (int): Primary key for EntityList
"""
try:
entity_list = EntityList.objects.get(pk=entity_list_id)
with use_master:
try:
entity_list = EntityList.objects.get(pk=entity_list_id)

except EntityList.DoesNotExist as err:
logger.exception(err)
return
except EntityList.DoesNotExist as err:
logger.exception(err)
return

set_project_perms_to_object(entity_list, entity_list.project)
set_project_perms_to_object(entity_list, entity_list.project)


@app.task(retry_backoff=3, autoretry_for=(DatabaseError, ConnectionError))
Expand Down Expand Up @@ -59,15 +71,50 @@ def delete_entities_bulk_async(entity_pks: list[int], username: str | None = Non
entity_pks (list(int)): Primary keys of Entities to be deleted
username (str): Username of the user initiating the delete
"""
entity_qs = Entity.objects.filter(pk__in=entity_pks, deleted_at__isnull=True)
deleted_by = None
with use_master:
entity_qs = Entity.objects.filter(pk__in=entity_pks, deleted_at__isnull=True)
deleted_by = None

try:
if username is not None:
deleted_by = User.objects.get(username=username)

except User.DoesNotExist as exc:
logger.exception(exc)

else:
soft_delete_entities_bulk(entity_qs, deleted_by)


try:
if username is not None:
deleted_by = User.objects.get(username=username)
@app.task(retry_backoff=3, autoretry_for=(DatabaseError, ConnectionError))
def commit_cached_elist_num_entities_async():
"""Commit cached EntityList `num_entities` counter to the database

Call this task periodically, such as in a background task to ensure
cached counters for EntityList `num_entities` are committed to the
database.

except User.DoesNotExist as exc:
logger.exception(exc)
Cached counters have no expiry, so it is essential to ensure that
this task is called periodically.
"""
commit_cached_elist_num_entities()

else:
soft_delete_entities_bulk(entity_qs, deleted_by)

@app.task(retry_backoff=3, autoretry_for=(DatabaseError, ConnectionError))
def inc_elist_num_entities_async(elist_pk: int):
"""Increment EntityList `num_entities` counter asynchronously

Args:
elist_pk (int): Primary key for EntityList
"""
inc_elist_num_entities(elist_pk)


@app.task(retry_backoff=3, autoretry_for=(DatabaseError, ConnectionError))
def dec_elist_num_entities_async(elist_pk: int) -> None:
"""Decrement EntityList `num_entities` counter asynchronously

Args:
elist_pk (int): Primary key for EntityList
"""
dec_elist_num_entities(elist_pk)
14 changes: 7 additions & 7 deletions onadata/apps/logger/tests/models/test_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class EntityTestCase(TestBase):

def setUp(self):
super().setUp()

self.mocked_now = datetime(2023, 11, 8, 13, 17, 0, tzinfo=pytz.utc)
self.project = get_user_default_project(self.user)
self.entity_list = EntityList.objects.create(name="trees", project=self.project)
Expand Down Expand Up @@ -57,25 +58,25 @@ def test_optional_fields(self):
self.assertEqual(entity.json, {})
self.assertIsInstance(entity.uuid, uuid.UUID)

@patch("onadata.apps.logger.tasks.dec_elist_num_entities_async.delay")
@patch("django.utils.timezone.now")
def test_soft_delete(self, mock_now):
def test_soft_delete(self, mock_now, mock_dec):
"""Soft delete works"""
mock_now.return_value = self.mocked_now
entity = Entity.objects.create(entity_list=self.entity_list)
self.entity_list.refresh_from_db()

self.assertEqual(self.entity_list.num_entities, 1)
self.assertIsNone(entity.deleted_at)
self.assertIsNone(entity.deleted_by)

entity.soft_delete(self.user)
self.entity_list.refresh_from_db()
entity.refresh_from_db()

self.assertEqual(self.entity_list.num_entities, 0)
self.assertEqual(self.entity_list.last_entity_update_time, self.mocked_now)
self.assertEqual(entity.deleted_at, self.mocked_now)
self.assertEqual(entity.deleted_at, self.mocked_now)
mock_dec.assert_called_once_with(self.entity_list.pk)

# Soft deleted item cannot be soft deleted again
deleted_at = timezone.now()
Expand All @@ -95,20 +96,19 @@ def test_soft_delete(self, mock_now):
self.assertEqual(entity3.deleted_at, self.mocked_now)
self.assertIsNone(entity3.deleted_by)

def test_hard_delete(self):
@patch("onadata.apps.logger.tasks.dec_elist_num_entities_async.delay")
def test_hard_delete(self, mock_dec):
"""Hard deleting updates dataset info"""
entity = Entity.objects.create(entity_list=self.entity_list)
self.entity_list.refresh_from_db()
old_last_entity_update_time = self.entity_list.last_entity_update_time

self.assertEqual(self.entity_list.num_entities, 1)

entity.delete()
self.entity_list.refresh_from_db()
new_last_entity_update_time = self.entity_list.last_entity_update_time

self.assertEqual(self.entity_list.num_entities, 0)
self.assertTrue(old_last_entity_update_time < new_last_entity_update_time)
mock_dec.assert_called_once_with(self.entity_list.pk)

def test_entity_list_uuid_unique(self):
"""`entity_list` and `uuid` are unique together"""
Expand Down
Loading