Skip to content

Commit

Permalink
feat: ref annos now soft delete/update
Browse files Browse the repository at this point in the history
  • Loading branch information
dlbrittain committed Jun 13, 2024
1 parent 6fe2083 commit e5fd779
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 10 deletions.
93 changes: 91 additions & 2 deletions dynamicannotationdb/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ def create_table(
with_crud_columns=with_crud_columns,
)

if hasattr(AnnotationModel, "target_id") and reference_table:
reference_table_name = self.db.get_table_sql_metadata(reference_table)
logging.info(
f"{table_name} is targeting reference table: {reference_table_name}"
)

self.create_reference_update_trigger(
table_name, reference_table, AnnotationModel
)

self.db.base.metadata.tables[AnnotationModel.__name__].create(
bind=self.db.engine
)
Expand Down Expand Up @@ -141,6 +151,76 @@ def create_table(
)
return table_name

def create_reference_update_trigger(self, table_name, reference_table, model):
func_name = f"{table_name}_update_reference_id"

column_names = [
col.name
for col in model.__table__.columns
if col.name not in ["id", "target_id"]
]
column_names_str = ", ".join(column_names)

func = DDL(
f"""
CREATE OR REPLACE FUNCTION {func_name}()
RETURNS TRIGGER
AS $func$
DECLARE
new_id INTEGER;
BEGIN
IF EXISTS
(SELECT 1
FROM information_schema.columns
WHERE table_name='{reference_table}'
AND column_name='superceded_id') THEN
IF NEW.superceded_id IS NOT NULL THEN
-- Copy the current row from {table_name} without the id column
INSERT INTO {table_name} ({column_names_str}, target_id)
SELECT {column_names_str}, {table_name}.target_id
FROM {table_name}
WHERE {table_name}.target_id = OLD.id
RETURNING id INTO new_id;
-- Update the new row's target_id to new.superceded_id
UPDATE {table_name}
SET target_id = NEW.superceded_id
WHERE id = new_id;
-- Update the original row's superceded_id and valid column if it exists
UPDATE {table_name}
SET superceded_id = new_id,
valid = (CASE WHEN EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name='{table_name}' AND column_name='valid') THEN FALSE ELSE valid END),
deleted = (CASE WHEN EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name='{table_name}' AND column_name='deleted') THEN timezone('utc', now()) ELSE deleted END)
WHERE target_id = OLD.id;
END IF;
RETURN NEW;
ELSE
RETURN NULL;
END IF;
END;
$func$ LANGUAGE plpgsql;
""")

trigger = DDL(
f"""
CREATE TRIGGER update_{table_name}_target_id AFTER UPDATE OF superceded_id ON {reference_table}
FOR EACH ROW EXECUTE PROCEDURE {func_name}();
""")

event.listen(
model.__table__,
"after_create",
func.execute_if(dialect="postgresql"),
)

event.listen(
model.__table__,
"after_create",
trigger.execute_if(dialect="postgresql"),
)
return True

def update_table_metadata(
self,
table_name: str,
Expand Down Expand Up @@ -273,7 +353,6 @@ def insert_annotations(self, table_name: str, annotations: List[dict]):

formatted_anno_data = []
for annotation in annotations:

annotation_data, __ = self.schema.split_flattened_schema_data(
schema_type, annotation
)
Expand Down Expand Up @@ -353,7 +432,7 @@ def update_annotation(self, table_name: str, annotation: dict) -> str:
table_name : str
name of targeted table to update annotations
annotation : dict
new data for that annotation, allows for partial updates but
new data for that annotation, allows for partial updates but
requires an 'id' field to target the row
Returns
Expand Down Expand Up @@ -381,6 +460,16 @@ def update_annotation(self, table_name: str, annotation: dict) -> str:
raise f"No result found for {anno_id}. Error: {e}" from e

if old_anno.superceded_id:
# find the latest annotation by finding the annotation with no superceded_id
while old_anno.superceded_id:
old_anno = (
self.db.cached_session.query(AnnotationModel)
.filter(AnnotationModel.id == old_anno.superceded_id)
.one()
)
logging.debug(f"Found superceded annotation: {old_anno.id} {old_anno.superceded_id}")
logging.debug(f"Found latest annotation: {old_anno.id}")

raise UpdateAnnotationError(anno_id, old_anno.superceded_id)

# Merge old data with new changes
Expand Down
30 changes: 22 additions & 8 deletions tests/test_annotation.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
import pytest

from emannotationschemas import type_mapping
from emannotationschemas.schemas.base import ReferenceAnnotation

from dynamicannotationdb.errors import UpdateAnnotationError


def test_create_table(dadb_interface, annotation_metadata):
table_name = annotation_metadata["table_name"]
Expand Down Expand Up @@ -257,12 +258,19 @@ def test_update_reference_annotation(dadb_interface, annotation_metadata):
"id": 1,
"bouton_type": "basmati",
}

with pytest.raises(UpdateAnnotationError) as e:
# UpdateAnnotationError('Annotation with ID 1 has already been superseded by annotation ID 2, update annotation ID 2 instead')
update_map = dadb_interface.annotation.update_annotation(table_name, test_data)

# lets try again with the correct id
test_data = {
"id": 2,
"bouton_type": "basmati",
}
update_map = dadb_interface.annotation.update_annotation(table_name, test_data)

assert update_map == {1: 2}
assert update_map == {2: 3}
# return values from newly updated row
test_data = dadb_interface.annotation.get_annotations(table_name, [2])
test_data = dadb_interface.annotation.get_annotations(table_name, [3])
assert test_data[0]["bouton_type"] == "basmati"


Expand All @@ -273,19 +281,25 @@ def test_nested_update_reference_annotation(dadb_interface, annotation_metadata)
"tag": "here is a updated tag",
"id": 1,
}
with pytest.raises(UpdateAnnotationError) as e:
update_map = dadb_interface.annotation.update_annotation(table_name, test_data)

test_data = {
"tag": "here is a updated tag",
"id": 3,
}
update_map = dadb_interface.annotation.update_annotation(table_name, test_data)

assert update_map == {1: 2}
assert update_map == {3: 4}
# return values from newly updated row
test_data = dadb_interface.annotation.get_annotations(table_name, [2])
test_data = dadb_interface.annotation.get_annotations(table_name, [4])
assert test_data[0]["tag"] == "here is a updated tag"


def test_delete_reference_annotation(dadb_interface, annotation_metadata):
table_name = "presynaptic_bouton_types"

ids_to_delete = [2]
ids_to_delete = [1,2]
is_deleted = dadb_interface.annotation.delete_annotation(table_name, ids_to_delete)

assert is_deleted == ids_to_delete
Expand Down

0 comments on commit e5fd779

Please sign in to comment.