diff --git a/dynamicannotationdb/annotation.py b/dynamicannotationdb/annotation.py index 8f451e1..20bcd75 100644 --- a/dynamicannotationdb/annotation.py +++ b/dynamicannotationdb/annotation.py @@ -109,6 +109,16 @@ def create_table( with_crud_columns=with_crud_columns, ) + if hasattr(AnnotationModel, "target_id") and reference_table: + reference_table_name = self.db.get_table_sql_metadata(reference_table) + logging.info( + f"{table_name} is targeting reference table: {reference_table_name}" + ) + + self.create_reference_update_trigger( + table_name, reference_table, AnnotationModel + ) + self.db.base.metadata.tables[AnnotationModel.__name__].create( bind=self.db.engine ) @@ -141,6 +151,76 @@ def create_table( ) return table_name + def create_reference_update_trigger(self, table_name, reference_table, model): + func_name = f"{table_name}_update_reference_id" + + column_names = [ + col.name + for col in model.__table__.columns + if col.name not in ["id", "target_id"] + ] + column_names_str = ", ".join(column_names) + + func = DDL( + f""" + CREATE OR REPLACE FUNCTION {func_name}() + RETURNS TRIGGER + AS $func$ + DECLARE + new_id INTEGER; + BEGIN + IF EXISTS + (SELECT 1 + FROM information_schema.columns + WHERE table_name='{reference_table}' + AND column_name='superceded_id') THEN + IF NEW.superceded_id IS NOT NULL THEN + -- Copy the current row from {table_name} without the id column + INSERT INTO {table_name} ({column_names_str}, target_id) + SELECT {column_names_str}, {table_name}.target_id + FROM {table_name} + WHERE {table_name}.target_id = OLD.id + RETURNING id INTO new_id; + + -- Update the new row's target_id to new.superceded_id + UPDATE {table_name} + SET target_id = NEW.superceded_id + WHERE id = new_id; + + -- Update the original row's superceded_id and valid column if it exists + UPDATE {table_name} + SET superceded_id = new_id, + valid = (CASE WHEN EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name='{table_name}' AND column_name='valid') THEN FALSE ELSE valid END), + deleted = (CASE WHEN EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name='{table_name}' AND column_name='deleted') THEN timezone('utc', now()) ELSE deleted END) + WHERE target_id = OLD.id; + END IF; + RETURN NEW; + ELSE + RETURN NULL; + END IF; + END; + $func$ LANGUAGE plpgsql; + """) + + trigger = DDL( + f""" + CREATE TRIGGER update_{table_name}_target_id AFTER UPDATE OF superceded_id ON {reference_table} + FOR EACH ROW EXECUTE PROCEDURE {func_name}(); + """) + + event.listen( + model.__table__, + "after_create", + func.execute_if(dialect="postgresql"), + ) + + event.listen( + model.__table__, + "after_create", + trigger.execute_if(dialect="postgresql"), + ) + return True + def update_table_metadata( self, table_name: str, @@ -273,7 +353,6 @@ def insert_annotations(self, table_name: str, annotations: List[dict]): formatted_anno_data = [] for annotation in annotations: - annotation_data, __ = self.schema.split_flattened_schema_data( schema_type, annotation ) @@ -353,7 +432,7 @@ def update_annotation(self, table_name: str, annotation: dict) -> str: table_name : str name of targeted table to update annotations annotation : dict - new data for that annotation, allows for partial updates but + new data for that annotation, allows for partial updates but requires an 'id' field to target the row Returns @@ -381,6 +460,16 @@ def update_annotation(self, table_name: str, annotation: dict) -> str: raise f"No result found for {anno_id}. Error: {e}" from e if old_anno.superceded_id: + # find the latest annotation by finding the annotation with no superceded_id + while old_anno.superceded_id: + old_anno = ( + self.db.cached_session.query(AnnotationModel) + .filter(AnnotationModel.id == old_anno.superceded_id) + .one() + ) + logging.debug(f"Found superceded annotation: {old_anno.id} {old_anno.superceded_id}") + logging.debug(f"Found latest annotation: {old_anno.id}") + raise UpdateAnnotationError(anno_id, old_anno.superceded_id) # Merge old data with new changes diff --git a/tests/test_annotation.py b/tests/test_annotation.py index 348aada..9c10abc 100644 --- a/tests/test_annotation.py +++ b/tests/test_annotation.py @@ -1,9 +1,10 @@ import logging import pytest - from emannotationschemas import type_mapping from emannotationschemas.schemas.base import ReferenceAnnotation +from dynamicannotationdb.errors import UpdateAnnotationError + def test_create_table(dadb_interface, annotation_metadata): table_name = annotation_metadata["table_name"] @@ -257,12 +258,19 @@ def test_update_reference_annotation(dadb_interface, annotation_metadata): "id": 1, "bouton_type": "basmati", } - + with pytest.raises(UpdateAnnotationError) as e: + # UpdateAnnotationError('Annotation with ID 1 has already been superseded by annotation ID 2, update annotation ID 2 instead') + update_map = dadb_interface.annotation.update_annotation(table_name, test_data) + + # lets try again with the correct id + test_data = { + "id": 2, + "bouton_type": "basmati", + } update_map = dadb_interface.annotation.update_annotation(table_name, test_data) - - assert update_map == {1: 2} + assert update_map == {2: 3} # return values from newly updated row - test_data = dadb_interface.annotation.get_annotations(table_name, [2]) + test_data = dadb_interface.annotation.get_annotations(table_name, [3]) assert test_data[0]["bouton_type"] == "basmati" @@ -273,19 +281,25 @@ def test_nested_update_reference_annotation(dadb_interface, annotation_metadata) "tag": "here is a updated tag", "id": 1, } + with pytest.raises(UpdateAnnotationError) as e: + update_map = dadb_interface.annotation.update_annotation(table_name, test_data) + test_data = { + "tag": "here is a updated tag", + "id": 3, + } update_map = dadb_interface.annotation.update_annotation(table_name, test_data) - assert update_map == {1: 2} + assert update_map == {3: 4} # return values from newly updated row - test_data = dadb_interface.annotation.get_annotations(table_name, [2]) + test_data = dadb_interface.annotation.get_annotations(table_name, [4]) assert test_data[0]["tag"] == "here is a updated tag" def test_delete_reference_annotation(dadb_interface, annotation_metadata): table_name = "presynaptic_bouton_types" - ids_to_delete = [2] + ids_to_delete = [1,2] is_deleted = dadb_interface.annotation.delete_annotation(table_name, ids_to_delete) assert is_deleted == ids_to_delete