From 2a5bc74bca802dc8e4f588f0f7e1b80cf63978f5 Mon Sep 17 00:00:00 2001
From: CBroz1
Date: Tue, 27 Aug 2024 16:33:54 -0500
Subject: [PATCH] Remove delete_downstream. Update tests

---
 src/spyglass/utils/dj_merge_tables.py |   2 +-
 src/spyglass/utils/dj_mixin.py        | 232 +++-----------------------
 tests/common/test_position.py         |   6 -
 tests/conftest.py                     |  13 --
 tests/container.py                    |   6 +-
 tests/position/test_trodes.py         |   2 -
 tests/utils/test_merge.py             |   6 +-
 tests/utils/test_mixin.py             | 125 ++++++++------
 8 files changed, 102 insertions(+), 290 deletions(-)

diff --git a/src/spyglass/utils/dj_merge_tables.py b/src/spyglass/utils/dj_merge_tables.py
index 4ffbf38f4..6d7e51d5c 100644
--- a/src/spyglass/utils/dj_merge_tables.py
+++ b/src/spyglass/utils/dj_merge_tables.py
@@ -29,7 +29,7 @@ def is_merge_table(table):
     def trim_def(definition):
         return re_sub(
             r"\n\s*\n", "\n", re_sub(r"#.*\n", "\n", definition.strip())
-        )
+        ).replace(" ", "")
 
     if isinstance(table, str):
         table = dj.FreeTable(dj.conn(), table)
diff --git a/src/spyglass/utils/dj_mixin.py b/src/spyglass/utils/dj_mixin.py
index 31fef07af..b44f16b1b 100644
--- a/src/spyglass/utils/dj_mixin.py
+++ b/src/spyglass/utils/dj_mixin.py
@@ -6,16 +6,14 @@
 from os import environ
 from re import match as re_match
 from time import time
-from typing import Dict, List, Union
+from typing import List
 
 import datajoint as dj
 from datajoint.condition import make_condition
 from datajoint.errors import DataJointError
 from datajoint.expression import QueryExpression
-from datajoint.logging import logger as dj_logger
 from datajoint.table import Table
-from datajoint.utils import get_master, to_camel_case, user_choice
-from networkx import NetworkXError
+from datajoint.utils import to_camel_case
 from packaging.version import parse as version_parse
 from pandas import DataFrame
 from pymysql.err import DataError
@@ -52,14 +50,6 @@ class SpyglassMixin:
         Fetch NWBFile object from relevant table. Uses either a foreign key
         to a NWBFile table (including AnalysisNwbfile) or a _nwb_table
         attribute to determine which table to use.
-    delte_downstream_merge(restriction=None, dry_run=True, reload_cache=False)
-        Delete downstream merge table entries associated with restriction.
-        Requires caching of merge tables and links, which is slow on first call.
-        `restriction` can be set to a string to restrict the delete. `dry_run`
-        can be set to False to commit the delete. `reload_cache` can be set to
-        True to reload the merge cache.
-    ddp(*args, **kwargs)
-        Alias for delete_downstream_parts
     cautious_delete(force_permission=False, *args, **kwargs)
         Check user permissions before deleting table rows. Permission is
         granted to users listed as admin in LabMember table or to users on a
         team with any of the experimenters listed in the Session table. If the
         table cannot be linked to a Session, a warning is logged and the delete
         continues. If the Session has no experimenter, or if the user is not on
         a team with the Session experimenter(s), a PermissionError is raised.
         `force_permission` can be set to True to bypass permission check.
-    cdel(*args, **kwargs)
-        Alias for cautious_delete.
""" # _nwb_table = None # NWBFile table class, defined at the table level @@ -134,15 +122,17 @@ def file_like(self, name=None, **kwargs): def find_insert_fail(self, key): """Find which parent table is causing an IntergrityError on insert.""" + rets = [] for parent in self.parents(as_objects=True): parent_key = { k: v for k, v in key.items() if k in parent.heading.names } parent_name = to_camel_case(parent.table_name) if query := parent & parent_key: - logger.info(f"{parent_name}:\n{query}") + rets.append(f"{parent_name}:\n{query}") else: - logger.info(f"{parent_name}: MISSING") + rets.append(f"{parent_name}: MISSING") + logger.info("\n".join(rets)) @classmethod def _safe_context(cls): @@ -298,163 +288,6 @@ def load_shared_schemas(self, additional_prefixes: list = None) -> None: for schema in schemas: dj.schema(schema[0]).connection.dependencies.load() - @cached_property - def _part_masters(self) -> set: - """Set of master tables downstream of self. - - Cache of masters in self.descendants(as_objects=True) with another - foreign key reference in the part. Used for delete_downstream_parts. - """ - self.connection.dependencies.load() - part_masters = set() - - def search_descendants(parent): - for desc_name in parent.descendants(): - if ( # Check if has master, is part - not (master := get_master(desc_name)) - or master in part_masters # already in cache - or desc_name.replace("`", "").split("_")[0] - not in SHARED_MODULES - ): - continue - desc = dj.FreeTable(self.connection, desc_name) - if not set(desc.parents()) - set([master]): # no other parent - continue - part_masters.add(master) - search_descendants(dj.FreeTable(self.connection, master)) - - try: - _ = search_descendants(self) - except NetworkXError: - try: # Attempt to import failing schema - self.load_shared_schemas() - _ = search_descendants(self) - except NetworkXError as e: - table_name = "".join(e.args[0].split("`")[1:4]) - raise ValueError(f"Please import {table_name} and try again.") - - logger.info( - f"Building part-parent cache for {self.camel_name}.\n\t" - + f"Found {len(part_masters)} downstream part tables" - ) - - return part_masters - - def _commit_downstream_delete(self, down_fts, start=None, **kwargs): - """ - Commit delete of downstream parts via down_fts. Logs with _log_delete. - - Used by both delete_downstream_parts and cautious_delete. - """ - start = start or time() - - safemode = ( - dj.config.get("safemode", True) - if kwargs.get("safemode") is None - else kwargs["safemode"] - ) - _ = kwargs.pop("safemode", None) - - ran_deletes = True - if down_fts: - for down_ft in down_fts: - dj_logger.info( - f"Spyglass: Deleting {len(down_ft)} rows from " - + f"{down_ft.full_table_name}" - ) - if ( - self._test_mode - or not safemode - or user_choice("Commit deletes?", default="no") == "yes" - ): - for down_ft in down_fts: # safemode off b/c already checked - down_ft.delete(safemode=False, **kwargs) - else: - logger.info("Delete aborted.") - ran_deletes = False - - self._log_delete(start, del_blob=down_fts if ran_deletes else None) - - return ran_deletes - - def delete_downstream_parts( - self, - restriction: str = None, - dry_run: bool = True, - reload_cache: bool = False, - disable_warning: bool = False, - return_graph: bool = False, - verbose: bool = False, - **kwargs, - ) -> List[dj.FreeTable]: - """Delete downstream merge table entries associated with restriction. - - Requires caching of merge tables and links, which is slow on first call. 
-
-        Parameters
-        ----------
-        restriction : str, optional
-            Restriction to apply to merge tables. Default None. Will attempt to
-            use table restriction if None.
-        dry_run : bool, optional
-            If True, return list of merge part entries to be deleted. Default
-            True.
-        reload_cache : bool, optional
-            If True, reload merge cache. Default False.
-        disable_warning : bool, optional
-            If True, do not warn if no merge tables found. Default False.
-        return_graph: bool, optional
-            If True, return RestrGraph object used to identify downstream
-            tables. Default False, return list of part FreeTables.
-            True. If False, return dictionary of merge tables and their joins.
-        verbose : bool, optional
-            If True, call RestrGraph with verbose=True. Default False.
-        **kwargs : Any
-            Passed to datajoint.table.Table.delete.
-        """
-        RestrGraph = self._graph_deps[1]
-
-        start = time()
-
-        if reload_cache:
-            _ = self.__dict__.pop("_part_masters", None)
-
-        _ = self._part_masters  # load cache before loading graph
-        restriction = restriction or self.restriction or True
-
-        restr_graph = RestrGraph(
-            seed_table=self,
-            leaves={self.full_table_name: restriction},
-            direction="down",
-            cascade=True,
-            verbose=verbose,
-        )
-
-        if return_graph:
-            return restr_graph
-
-        down_fts = restr_graph.ft_from_list(
-            self._part_masters, sort_reverse=False
-        )
-
-        if not down_fts and not disable_warning:
-            logger.warning(
-                f"No part deletes found w/ {self.camel_name} & "
-                + f"{restriction}.\n\tIf this is unexpected, try importing "
-                + " Merge table(s) and running with `reload_cache`."
-            )
-
-        if dry_run:
-            return down_fts
-
-        self._commit_downstream_delete(down_fts, start, **kwargs)
-
-    def ddp(
-        self, *args, **kwargs
-    ) -> Union[List[QueryExpression], Dict[str, List[QueryExpression]]]:
-        """Alias for delete_downstream_parts."""
-        return self.delete_downstream_parts(*args, **kwargs)
-
     # ---------------------------- cautious_delete ----------------------------
 
     @cached_property
@@ -597,15 +430,10 @@ def _check_delete_permission(self) -> None:
             )
         logger.info(f"Queueing delete for session(s):\n{sess_summary}")
 
-    @cached_property
-    def _cautious_del_tbl(self):
-        """Temporary inclusion for usage tracking."""
+    def _log_delete(self, start, del_blob=None, super_delete=False):
+        """Log use of cautious_delete."""
         from spyglass.common.common_usage import CautiousDelete
 
-        return CautiousDelete()
-
-    def _log_delete(self, start, del_blob=None, super_delete=False):
-        """Log use of cautious_delete."""
         safe_insert = dict(
             duration=time() - start,
             dj_user=dj.config["database.user"],
@@ -614,7 +442,7 @@ def _log_delete(self, start, del_blob=None, super_delete=False):
         restr_str = "Super delete: " if super_delete else ""
         restr_str += "".join(self.restriction) if self.restriction else "None"
         try:
-            self._cautious_del_tbl.insert1(
+            CautiousDelete().insert1(
                 dict(
                     **safe_insert,
                     restriction=restr_str[:255],
@@ -622,11 +450,17 @@ def _log_delete(self, start, del_blob=None, super_delete=False):
                 )
             )
         except (DataJointError, DataError):
-            self._cautious_del_tbl.insert1(
-                dict(**safe_insert, restriction="Unknown")
-            )
+            CautiousDelete().insert1(dict(**safe_insert, restriction="Unknown"))
+
+    @cached_property
+    def _has_updated_dj_version(self):
+        """Return True if DataJoint version is up to date."""
+        target_dj = version_parse("0.14.2")
+        ret = version_parse(dj.__version__) >= target_dj
+        if not ret:
+            logger.warning(f"Please update DataJoint to {target_dj} or later.")
+        return ret
 
-    # TODO: Intercept datajoint delete confirmation prompt for merge deletes
     def cautious_delete(
         self, force_permission: bool = False, dry_run=False, *args, **kwargs
     ):
@@ -638,10 +472,6 @@ def cautious_delete(
         continues. If the Session has no experimenter, or if the user is not
         on a team with the Session experimenter(s), a PermissionError is
         raised.
 
-        Potential downstream orphans are deleted first. These are master tables
-        whose parts have foreign keys to descendants of self. Then, rows from
-        self are deleted. Last, Nwbfile and IntervalList externals are deleted.
-
         Parameters
         ----------
         force_permission : bool, optional
             Bypass permission check. Default False.
@@ -653,33 +483,25 @@ def cautious_delete(
         *args, **kwargs : Any
             Passed to datajoint.table.Table.delete.
         """
-        start = time()
-
         if len(self) == 0:
             logger.warning(f"Table is empty. No need to delete.\n{self}")
             return
 
+        if self._has_updated_dj_version:
+            kwargs["force_masters"] = True
+
         external, IntervalList = self._delete_deps[3], self._delete_deps[4]
 
         if not force_permission or dry_run:
             self._check_delete_permission()
 
-        down_fts = self.delete_downstream_parts(
-            dry_run=True,
-            disable_warning=True,
-        )
-
         if dry_run:
             return (
-                down_fts,
                 IntervalList(),  # cleanup func relies on downstream deletes
                 external["raw"].unused(),
                 external["analysis"].unused(),
             )
 
-        if not self._commit_downstream_delete(down_fts, start=start, **kwargs):
-            return  # Abort delete based on user input
-
         super().delete(*args, **kwargs)  # Confirmation here
 
         for ext_type in ["raw", "analysis"]:
@@ -687,13 +509,8 @@ def cautious_delete(
                 delete_external_files=True, display_progress=False
             )
 
-        _ = IntervalList().nightly_cleanup(dry_run=False)
-
-        self._log_delete(start=start, del_blob=down_fts)
-
-    def cdel(self, *args, **kwargs):
-        """Alias for cautious_delete."""
-        return self.cautious_delete(*args, **kwargs)
+        if not self._test_mode:
+            _ = IntervalList().nightly_cleanup(dry_run=False)
 
     def delete(self, *args, **kwargs):
         """Alias for cautious_delete, overwrites datajoint.table.Table.delete"""
@@ -728,6 +545,7 @@ def _hash_upstream(self, keys):
         RestrGraph = self._graph_deps[1]
 
         if not (parents := self.parents(as_objects=True, primary=True)):
+            # Should not happen, as this is only called from populated tables
             raise RuntimeError("No upstream tables found for upstream hash.")
 
         leaves = {  # Restriction on each primary parent
diff --git a/tests/common/test_position.py b/tests/common/test_position.py
index e5f39c20c..889dafa60 100644
--- a/tests/common/test_position.py
+++ b/tests/common/test_position.py
@@ -30,8 +30,6 @@ def param_table(common_position, default_param_key, teardown):
     param_table = common_position.PositionInfoParameters()
     param_table.insert1(default_param_key, skip_duplicates=True)
     yield param_table
-    if teardown:
-        param_table.delete(safemode=False)
 
 
 @pytest.fixture(scope="session")
@@ -61,8 +59,6 @@ def upsample_position(
     )
     common_position.IntervalPositionInfo.populate(interval_pos_key)
     yield interval_pos_key
-    if teardown:
-        (param_table & upsample_param_key).delete(safemode=False)
 
 
 @pytest.fixture(scope="session")
@@ -101,8 +97,6 @@ def upsample_position_error(
         interval_pos_key, skip_duplicates=not teardown
     )
     yield interval_pos_key
-    if teardown:
-        (param_table & upsample_param_key).delete(safemode=False)
 
 
 def test_interval_position_info_insert_error(
diff --git a/tests/conftest.py b/tests/conftest.py
index 04420ffe2..09b6486d0 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -465,8 +465,6 @@ def trodes_params(trodes_params_table, teardown):
         [v for k, v in paramsets.items()], skip_duplicates=True
     )
     yield paramsets
-    if teardown:
-        trodes_params_table.delete(safemode=False)
 
 
 @pytest.fixture(scope="session")
@@ -488,8 +486,6 @@ def trodes_sel_keys(
     ]
     trodes_sel_table.insert(keys, skip_duplicates=True)
     yield keys
-    if teardown:
-        trodes_sel_table.delete(safemode=False)
 
 
 @pytest.fixture(scope="session")
@@ -497,8 +493,6 @@ def trodes_pos_v1(teardown, sgp, trodes_sel_keys):
     v1 = sgp.v1.TrodesPosV1()
     v1.populate(trodes_sel_keys)
     yield v1
-    if teardown:
-        v1.delete(safemode=False)
 
 
 @pytest.fixture(scope="session")
@@ -609,8 +603,6 @@ def track_graph(teardown, sgpl, track_graph_key):
     )
 
     yield sgpl.TrackGraph & {"track_graph_name": "6 arm"}
-    if teardown:
-        sgpl.TrackGraph().delete(safemode=False)
 
 
 @pytest.fixture(scope="session")
@@ -645,8 +637,6 @@ def lin_sel(teardown, sgpl, lin_sel_key):
     sel_table = sgpl.LinearizationSelection()
     sel_table.insert1(lin_sel_key, skip_duplicates=True)
     yield sel_table
-    if teardown:
-        sel_table.delete(safemode=False)
 
 
 @pytest.fixture(scope="session")
@@ -654,8 +644,6 @@ def lin_v1(teardown, sgpl, lin_sel):
     v1 = sgpl.LinearizedPositionV1()
    v1.populate()
     yield v1
-    if teardown:
-        v1.delete(safemode=False)
 
 
 @pytest.fixture(scope="session")
@@ -882,7 +870,6 @@ def insert_project(
 
     yield project_key, cfg, config_path
     if teardown:
-        (dlc_project_tbl & project_key).delete(safemode=False)
         shutil_rmtree(str(Path(config_path).parent))
diff --git a/tests/container.py b/tests/container.py
index 1747d76b8..fb960dc07 100644
--- a/tests/container.py
+++ b/tests/container.py
@@ -215,9 +215,9 @@ def stop(self, remove=True) -> None:
             return
 
         container_name = self.container_name
-        self.container.stop()
-        self.logger.info(f"Container {container_name} stopped.")
+        self.container.stop()  # Logger I/O operations close during teardown
+        print(f"Container {container_name} stopped.")
 
         if remove:
             self.container.remove()
-            self.logger.info(f"Container {container_name} removed.")
+            print(f"Container {container_name} removed.")
diff --git a/tests/position/test_trodes.py b/tests/position/test_trodes.py
index 6d65f375c..2608d70c6 100644
--- a/tests/position/test_trodes.py
+++ b/tests/position/test_trodes.py
@@ -30,8 +30,6 @@ def sel_table(teardown, params_table, trodes_sel_table, pos_interval_key):
         edit_name=new_name,
     )
     yield trodes_sel_table & restr_dict
-    if teardown:
-        (trodes_sel_table & restr_dict).delete(safemode=False)
 
 
 def test_sel_default(sel_table):
diff --git a/tests/utils/test_merge.py b/tests/utils/test_merge.py
index 2876555a1..fc225cf21 100644
--- a/tests/utils/test_merge.py
+++ b/tests/utils/test_merge.py
@@ -49,9 +49,9 @@ class NonMerge(SpyglassMixin, dj.Manual):
     yield NonMerge
 
 
-def test_non_merge(NonMerge):
-    with pytest.raises(AttributeError):
-        NonMerge()
+def test_non_merge(schema_test, NonMerge):
+    with pytest.raises(TypeError):
+        schema_test(NonMerge)
 
 
 def test_part_camel(merge_table):
diff --git a/tests/utils/test_mixin.py b/tests/utils/test_mixin.py
index a35041013..a10b39118 100644
--- a/tests/utils/test_mixin.py
+++ b/tests/utils/test_mixin.py
@@ -39,6 +39,11 @@ def test_auto_increment(schema_test, Mixin):
     assert ret["id"] == 2, "Auto increment not working."
 
 
+def test_good_file_like(common):
+    common.Session().file_like("min")
+    assert len(common.Session()) > 0, "file_like not working."
+
+
 def test_null_file_like(schema_test, Mixin):
     schema_test(Mixin)
     ret = Mixin().file_like(None)
@@ -52,73 +57,83 @@ def test_bad_file_like(caplog, schema_test, Mixin):
     assert "No file_like field" in caplog.text, "No warning issued."
 
 
-def test_partmaster_detect(Nwbfile, pos_merge_tables):
-    """Test that the mixin can detect merge children of merge."""
-    assert len(Nwbfile._part_masters) >= 14, "Part masters not detected."
-
-
-def test_downstream_restrict(
-    Nwbfile, frequent_imports, pos_merge_tables, lin_v1, lfp_merge_key
-):
-    """Test that the mixin can join merge chains."""
-
-    _ = frequent_imports  # graph for cascade
-    _ = lin_v1, lfp_merge_key  # merge tables populated
+@pytest.mark.skipif(not VERBOSE, reason="No logging to test when quiet-spy.")
+def test_insert_fail(caplog, common, mini_dict):
+    this_key = dict(mini_dict, interval_list_name="BadName")
+    common.PositionSource().find_insert_fail(this_key)
+    assert "IntervalList: MISSING" in caplog.text, "No warning issued."
 
-    restr_ddp = Nwbfile.ddp(dry_run=True, reload_cache=True)
-    end_len = [len(ft) for ft in restr_ddp]
 
-    assert sum(end_len) >= 8, "Downstream parts not restricted correctly."
+def test_exp_summary(Nwbfile):
+    fields = Nwbfile._get_exp_summary().heading.names
+    expected = ["nwb_file_name", "lab_member_name"]
+    assert fields == expected, "Exp summary fields not as expected."
 
 
-def test_get_downstream_merge(Nwbfile, pos_merge_tables):
-    """Test that the mixin can get the chain of a merge."""
-    lin_output = pos_merge_tables[1].full_table_name
-    assert lin_output in Nwbfile._part_masters, "Merge not found."
+def test_exp_summary_no_link(schema_test, Mixin):
+    schema_test(Mixin)
+    assert Mixin()._get_exp_summary() is None, "Exp summary not None."
 
 
-@pytest.mark.skipif(not VERBOSE, reason="No logging to test when quiet-spy.")
-def test_ddp_warning(Nwbfile, caplog):
-    """Test that the mixin warns on empty delete_downstream_merge."""
-    (Nwbfile.file_like("BadName")).delete_downstream_parts(
-        reload_cache=True, disable_warnings=False
-    )
-    assert "No part deletes found" in caplog.text, "No warning issued."
+def test_exp_summary_auto_link(common):
+    lab_member = common.LabMember()
+    lab_join = lab_member * common.Session.Experimenter
+    assert lab_member._get_exp_summary() == lab_join, "Auto link not working."
 
 
-def test_ddp_dry_run(
-    Nwbfile, frequent_imports, common, sgp, pos_merge_tables, lin_v1
-):
-    """Test that the mixin can dry run delete_downstream_merge."""
-    _ = lin_v1  # merge tables populated
-    _ = frequent_imports  # graph for cascade
+def test_cautious_del_dry_run(Nwbfile, frequent_imports):
+    _ = frequent_imports  # part of cascade, need import
+    ret = Nwbfile.cautious_delete(dry_run=True)[1].full_table_name
+    assert (
+        ret == "`common_nwbfile`.`~external_raw`"
+    ), "Dry run delete not working."
 
-    pos_output_name = pos_merge_tables[0].full_table_name
-    param_field = "trodes_pos_params_name"
-    trodes_params = sgp.v1.TrodesPosParams()
+@pytest.mark.skipif(not VERBOSE, reason="No logging to test when quiet-spy.")
+def test_empty_cautious_del(caplog, schema_test, Mixin):
+    schema_test(Mixin)
+    Mixin().cautious_delete()
+    assert "empty" in caplog.text, "No warning issued."
 
-    rft = [
-        table
-        for table in (trodes_params & f'{param_field} LIKE "%ups%"').ddp(
-            reload_cache=True, dry_run=True
-        )
-        if table.full_table_name == pos_output_name
-    ]
-    assert len(rft) == 1, "ddp did not return restricted table."
+def test_super_delete(schema_test, Mixin, common):
+    schema_test(Mixin)
+    Mixin().insert1((0,))
+    Mixin().super_delete(safemode=False)
+    assert len(Mixin()) == 0, "Super delete not working."
+
+    logged_dels = common.common_usage.CautiousDelete & 'restriction LIKE "Sup%"'
+    assert len(logged_dels) > 0, "Super delete not logged."
+
+
+def test_compare_versions(common):
+    compare_func = common.Nwbfile().compare_versions
+    compare_func("0.1.0", "0.1.0")
+    with pytest.raises(RuntimeError):
+        compare_func("0.1.0", "0.1.1")
+
+
+@pytest.fixture
+def custom_table():
+    """Custom table on user prefix for testing load_shared_schemas."""
+    db, table = dj.config["database.user"] + "_test", "custom"
+    dj.conn().query(f"CREATE DATABASE IF NOT EXISTS {db};")
+    dj.conn().query(f"USE {db};")
+    dj.conn().query(
+        f"CREATE TABLE IF NOT EXISTS {table} ( "
+        + "`merge_id` binary(16) NOT NULL COMMENT ':uuid:', "
+        + "`unit_id` int NOT NULL, "
+        + "PRIMARY KEY (`merge_id`), "
+        + "CONSTRAINT `unit_annotation_ibfk_1` FOREIGN KEY (`merge_id`) "
+        + "REFERENCES `spikesorting_merge`.`spike_sorting_output` (`merge_id`) "
+        + "ON DELETE RESTRICT ON UPDATE CASCADE);"
+    )
+    yield f"`{db}`.`{table}`"
 
 
-def test_exp_summary(Nwbfile):
-    fields = Nwbfile._get_exp_summary().heading.names
-    expected = ["nwb_file_name", "lab_member_name"]
-    assert fields == expected, "Exp summary fields not as expected."
+def test_load_shared_schemas(common, custom_table):
 
-
-def test_cautious_del_dry_run(Nwbfile, frequent_imports):
-    _ = frequent_imports  # part of cascade, need import
-    ret = Nwbfile.cdel(dry_run=True)
-    part_master_names = [t.full_table_name for t in ret[0]]
-    part_masters = Nwbfile._part_masters
-    assert all(
-        [pm in part_masters for pm in part_master_names]
-    ), "Non part masters found in cautious delete dry run."
+    common.Nwbfile().load_shared_schemas(additional_prefixes=["test"])
+    nodes = common.Nwbfile().connection.dependencies.nodes
+    assert custom_table in nodes, "Custom table not loaded."
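
For pipelines migrating off the removed `delete_downstream_parts` / `ddp` /
`cdel` helpers, the flow after this patch reduces to `cautious_delete` and its
`delete` alias. A minimal usage sketch, illustrative only, assuming a
configured Spyglass install and DataJoint >= 0.14.2; `Session` stands in for
any SpyglassMixin table, and the file name is a hypothetical placeholder:

    # Hypothetical migration sketch; not part of the patch above.
    from spyglass.common import Session

    restr_table = Session() & {"nwb_file_name": "my_session_.nwb"}

    # Dry run: returns cleanup targets (IntervalList plus unused raw and
    # analysis externals) instead of deleting, per cautious_delete above.
    cleanup_targets = restr_table.cautious_delete(dry_run=True)

    # Commit: permission check, then delete. On DataJoint >= 0.14.2 the mixin
    # sets force_masters=True, so downstream part-table masters cascade via
    # DataJoint itself rather than the removed _part_masters cache.
    restr_table.delete()  # delete() remains an alias for cautious_delete()

The dry-run tuple mirrors the `return (IntervalList(), external["raw"].unused(),
external["analysis"].unused())` branch in the new `cautious_delete`.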