Skip to content

Commit

Permalink
Merge pull request #159 from bjester/deferred-fk-checks
Browse files Browse the repository at this point in the history
Defer foreign key checks for bulk processing during deserialization
  • Loading branch information
bjester authored Apr 28, 2022
2 parents c70d26c + ec912bc commit 7c497ec
Show file tree
Hide file tree
Showing 12 changed files with 735 additions and 139 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ List of the most important changes for each release.
## 0.6.11
- Added deferred processing of foreign keys to allow bulk processing and to improve performance.
- Eliminated extraneous SQL queries for the transfer session when querying for buffers.
- Added database index to Store's partition field to improve querying performance.

## 0.6.10
- Fixes Django migration issue introduced in 0.6.7 allowing nullable fields with PostgreSQL backends
Expand Down
104 changes: 79 additions & 25 deletions morango/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import json
import logging
import uuid
from collections import defaultdict
from collections import namedtuple
from functools import reduce

from django.core import exceptions
Expand Down Expand Up @@ -450,15 +452,16 @@ class Meta:
models.Index(fields=['partition'], name='idx_morango_store_partition'),
]

def _deserialize_store_model(self, fk_cache): # noqa: C901
def _deserialize_store_model(self, fk_cache, defer_fks=False): # noqa: C901
"""
When deserializing a store model, we look at the deleted flags to know if we should delete the app model.
Upon loading the app model in memory we validate the app model's fields; if any error occurs we follow
foreign key relationships to see if the related model has been deleted to propagate that deletion to the target app model.
We return:
None => if the model was deleted successfully
model => if the model validates successfully
None => if the model was deleted successfully
model => if the model validates successfully
"""
deferred_fks = {}
klass_model = syncable_models.get_model(self.profile, self.model_name)
# if store model marked as deleted, attempt to delete in app layer
if self.deleted:
Expand All @@ -470,7 +473,7 @@ def _deserialize_store_model(self, fk_cache): # noqa: C901
pass
else:
klass_model.objects.filter(id=self.id).delete()
return None
return None, deferred_fks
else:
# load model into memory
app_model = klass_model.deserialize(json.loads(self.serialized))
Expand All @@ -479,10 +482,12 @@ def _deserialize_store_model(self, fk_cache): # noqa: C901
app_model._morango_dirty_bit = False

try:

# validate and return the model
app_model.cached_clean_fields(fk_cache)
return app_model
if defer_fks:
deferred_fks = app_model.deferred_clean_fields()
else:
app_model.cached_clean_fields(fk_cache)
return app_model, deferred_fks

except (exceptions.ValidationError, exceptions.ObjectDoesNotExist) as e:

Expand All @@ -492,23 +497,24 @@ def _deserialize_store_model(self, fk_cache): # noqa: C901
)
)

# check FKs in store to see if any of those models were deleted or hard_deleted to propagate to this model
fk_ids = [
getattr(app_model, field.attname)
for field in app_model._meta.fields
if isinstance(field, ForeignKey)
]
for fk_id in fk_ids:
try:
st_model = Store.objects.get(id=fk_id)
if st_model.deleted:
# if hard deleted, propagate to store model
if st_model.hard_deleted:
app_model._update_hard_deleted_models()
app_model._update_deleted_models()
return None
except Store.DoesNotExist:
pass
if not defer_fks and isinstance(e, exceptions.ObjectDoesNotExist):
# check FKs in store to see if any of those models were deleted or hard_deleted to propagate to this model
fk_ids = [
getattr(app_model, field.attname)
for field in app_model._meta.fields
if isinstance(field, ForeignKey)
]
for fk_id in fk_ids:
try:
st_model = Store.objects.get(id=fk_id)
if st_model.deleted:
# if hard deleted, propagate to store model
if st_model.hard_deleted:
app_model._update_hard_deleted_models()
app_model._update_deleted_models()
return None, {}
except Store.DoesNotExist:
pass

# if we got here, it means the validation error wasn't handled by propagating deletion, so re-raise it
raise e
Expand Down Expand Up @@ -771,6 +777,9 @@ class RecordMaxCounterBuffer(AbstractCounter):
model_uuid = UUIDField(db_index=True)


# Lightweight record of one foreign-key reference found during deferred validation:
# `from_field` is the FK attribute name, `from_pk` the referencing row's pk, and
# `to_pk` the pk of the row being referenced.
ForeignKeyReference = namedtuple("ForeignKeyReference", ["from_field", "from_pk", "to_pk"])


class SyncableModel(UUIDModelMixin):
"""
``SyncableModel`` is the base model class for syncing. Other models inherit from this class if they want to make
Expand Down Expand Up @@ -839,13 +848,21 @@ def delete(
return collector.delete()

def cached_clean_fields(self, fk_lookup_cache):
"""
Immediately validates all fields, but uses a cache for foreign key (FK) lookups to reduce
repeated queries for many records with the same FK
:param fk_lookup_cache: A dictionary to use as a cache to prevent querying the database if a
FK exists in the cache, having already been validated
"""
excluded_fields = []
fk_fields = [
field for field in self._meta.fields if isinstance(field, models.ForeignKey)
]

for f in fk_fields:
raw_value = getattr(self, f.attname)
key = "morango_{id}_{db_table}_foreignkey".format(
key = "{id}_{db_table}".format(
db_table=f.related_model._meta.db_table, id=raw_value
)
try:
Expand All @@ -859,7 +876,44 @@ def cached_clean_fields(self, fk_lookup_cache):
else:
fk_lookup_cache[key] = 1
excluded_fields.append(f.name)

self.clean_fields(exclude=excluded_fields)

# after cleaning, we can confidently set ourselves in the fk_lookup_cache
self_key = "{id}_{db_table}".format(
db_table=self._meta.db_table,
id=self.id,
)
fk_lookup_cache[self_key] = 1

def deferred_clean_fields(self):
    """
    Validates this model's fields via ``clean_fields()`` while skipping every
    non-null foreign key field, and instead returns those skipped FKs so they
    can be batch-validated later.

    :return: A dict mapping the referenced model's verbose name to a list of
        `ForeignKeyReference` tuples, one per skipped FK field
    """
    deferred_fks = defaultdict(list)
    skipped_fields = []

    fk_fields = (
        field for field in self._meta.fields
        if isinstance(field, models.ForeignKey)
    )
    for fk_field in fk_fields:
        target_pk = getattr(self, fk_field.attname)
        # leave null FKs to the normal validation path, which should raise a
        # ValidationError if the field is not nullable
        if target_pk is None:
            continue
        skipped_fields.append(fk_field.name)
        reference = ForeignKeyReference(
            from_field=fk_field.attname,
            from_pk=self.pk,
            to_pk=target_pk,
        )
        deferred_fks[fk_field.related_model._meta.verbose_name].append(reference)

    self.clean_fields(exclude=skipped_fields)
    return deferred_fks

def serialize(self):
"""All concrete fields of the ``SyncableModel`` subclass, except for those specifically blacklisted, are returned in a dict."""
Expand Down
44 changes: 44 additions & 0 deletions morango/sync/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,38 @@


class BaseSQLWrapper(object):
create_temporary_table_template = "CREATE TEMP TABLE {name} ({fields})"

def __init__(self, connection):
    # hold the DB connection; subclasses use it e.g. to resolve backend column
    # types via `field.rel_db_type(self.connection)`
    self.connection = connection

def _create_placeholder_list(self, fields, db_values):
# number of rows to update
num_of_rows = len(db_values) // len(fields)
# create '%s' placeholders for a single row
placeholder_tuple = tuple(["%s" for _ in range(len(fields))])
# create list of the '%s' tuple placeholders based on number of rows to update
return [str(placeholder_tuple) for _ in range(num_of_rows)]

def _bulk_full_record_upsert(self, cursor, table_name, fields, db_values):
    # backend-specific insert-or-update of complete rows; expected to be
    # implemented by subclasses (e.g. the postgres SQLWrapper)
    raise NotImplementedError("Subclass must implement this method.")

def _bulk_insert(self, cursor, table_name, fields, db_values):
placeholder_str = ", ".join(
self._create_placeholder_list(fields, db_values)
).replace("'", "")
fields_str = str(tuple(str(f.attname) for f in fields)).replace("'", "")
insert = """
INSERT INTO {table_name} {fields}
VALUES {placeholder_str}
""".format(
table_name=table_name, fields=fields_str, placeholder_str=placeholder_str
)
cursor.execute(insert, db_values)

def _bulk_update(self, cursor, table_name, fields, db_values):
    # backend-specific bulk UPDATE of existing rows; expected to be
    # implemented by subclasses (e.g. the postgres SQLWrapper)
    raise NotImplementedError("Subclass must implement this method.")

def _dequeuing_delete_rmcb_records(self, cursor, transfersession_id):
# delete all RMCBs which are a reverse FF (store version newer than buffer version)
delete_rmcb_records = """DELETE FROM {rmcb}
Expand Down Expand Up @@ -132,3 +164,15 @@ def _dequeuing_delete_remaining_buffer(self, cursor, transfersession_id):
buffer=Buffer._meta.db_table, transfer_session_id=transfersession_id
)
cursor.execute(delete_remaining_buffer)

def _create_temporary_table(self, cursor, name, field_sqls, fields_params):
"""
:param cursor: The database connection cursor
:param name: The str name of the temp table
:param field_sqls: A list of SQL strings representing the fields
:param fields_params: A list of SQL parameters if necessary for the fields SQL
"""
sql = self.create_temporary_table_template.format(
name=name, fields=", ".join(field_sqls)
)
cursor.execute(sql, fields_params)
107 changes: 73 additions & 34 deletions morango/sync/backends/postgres.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from django.db import connection

from .base import BaseSQLWrapper
from .utils import get_pk_field
from morango.models.core import Buffer
from morango.models.core import RecordMaxCounter
from morango.models.core import RecordMaxCounterBuffer
Expand All @@ -9,58 +8,98 @@

class SQLWrapper(BaseSQLWrapper):
backend = "postgresql"
create_temporary_table_template = (
"CREATE TEMP TABLE {name} ({fields}) ON COMMIT DROP"
)

def _bulk_insert_into_app_models(
self, cursor, app_model, fields, db_values, placeholder_list
):
def _prepare_with_values(self, name, fields, db_values):
placeholder_list = self._create_placeholder_list(fields, db_values)
# convert this list to a string to be passed into raw sql query
placeholder_str = ", ".join(placeholder_list).replace("'", "")
# cast the values in the SET statement to their appropriate postgres db types
set_casted_values = ", ".join(
return """
WITH {name} {fields} as
(
VALUES {placeholder_str}
)
""".format(
name=name,
fields=str(tuple(str(f.column) for f in fields)).replace("'", ""),
placeholder_str=placeholder_str,
)

def _prepare_casted_fields(self, fields):
return ", ".join(
map(
lambda f: "{f} = nv.{f}::{type}".format(
f=f.attname, type=f.rel_db_type(connection)
lambda f: "{f}::{type}".format(
f=f.column, type=f.rel_db_type(self.connection)
),
fields,
)
)
# cast the values in the SELECT statement to their appropriate postgres db types
select_casted_values = ", ".join(

def _prepare_set_casted_values(self, fields, source_table):
return ", ".join(
map(
lambda f: "{f}::{type}".format(
f=f.attname, type=f.rel_db_type(connection)
lambda f: "{f} = {src}.{f}::{type}".format(
f=f.attname,
type=f.rel_db_type(self.connection),
src=source_table,
),
fields,
)
)
# cast the pk to the correct field type for this model
pk = [f for f in fields if f.primary_key][0]
fields = str(tuple(str(f.attname) for f in fields)).replace("'", "")

insert = """
WITH new_values {fields} as
(
VALUES {placeholder_str}
),
def _bulk_full_record_upsert(self, cursor, table_name, fields, db_values):
pk = get_pk_field(fields)

cte_name = "new_values"
upsert = """
{cte},
updated as
(
UPDATE {app_model} model
UPDATE {table_name} model
SET {set_values}
FROM new_values nv
WHERE model.id = nv.id::{id_type}
returning model.*
FROM {cte_name} cte
WHERE model.id = cte.{pk_field}::{pk_type}
RETURNING model.{pk_field}
)
INSERT INTO {app_model} {fields}
INSERT INTO {table_name} {fields}
SELECT {select_fields}
FROM new_values ut
WHERE ut.id::{id_type} not in (SELECT id FROM updated)
FROM {cte_name} cte
WHERE cte.{pk_field}::{pk_type} NOT IN (SELECT {pk_field} FROM updated)
""".format(
app_model=app_model,
fields=fields,
placeholder_str=placeholder_str,
set_values=set_casted_values,
select_fields=select_casted_values,
id_type=pk.rel_db_type(connection),
cte=self._prepare_with_values(cte_name, fields, db_values),
cte_name=cte_name,
table_name=table_name,
fields=str(tuple(str(f.column) for f in fields)).replace("'", ""),
set_values=self._prepare_set_casted_values(fields, "cte"),
select_fields=self._prepare_casted_fields(fields),
pk_field=pk.column,
pk_type=pk.rel_db_type(self.connection),
)
# use DB-APIs parameter substitution (2nd parameter expects a sequence)
cursor.execute(upsert, db_values)

def _bulk_update(self, cursor, table_name, fields, db_values):
    """
    Updates many rows in one statement by joining the target table against a
    VALUES CTE of the new row data, casting each column to its postgres type.

    :param cursor: the database cursor
    :param table_name: str name of the target table
    :param fields: the list of Django model fields, including the primary key
    :param db_values: the flattened row values, bound via DB-API substitution
    """
    pk = get_pk_field(fields)

    cte_name = "new_values"
    # fix: dropped the unused `fields=` format kwarg — the template below has
    # no {fields} placeholder, so the old value was computed and discarded
    update = """
        {cte}
        UPDATE {table_name} model
        SET {set_values}
        FROM {cte_name} cte
        WHERE model.{pk_field} = cte.{pk_field}::{pk_type}
    """.format(
        cte=self._prepare_with_values(cte_name, fields, db_values),
        cte_name=cte_name,
        table_name=table_name,
        set_values=self._prepare_set_casted_values(fields, "cte"),
        pk_field=pk.column,
        pk_type=pk.rel_db_type(self.connection),
    )
    # use DB-APIs parameter substitution (2nd parameter expects a sequence)
    cursor.execute(update, db_values)
Expand Down
Loading

0 comments on commit 7c497ec

Please sign in to comment.