From 12eca2847d8eeabaecf249a64b85c61a2b5c0b84 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Tue, 16 May 2023 14:57:32 +0200 Subject: [PATCH 1/2] Fix datatype_change not updating HDCA update_time That update_time is used to find changed items in the history --- lib/galaxy/celery/tasks.py | 9 +++++++++ lib/galaxy/model/__init__.py | 9 +++++++++ lib/galaxy/webapps/galaxy/services/history_contents.py | 7 +++++-- lib/galaxy_test/api/test_history_contents.py | 10 ++++++++-- 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/lib/galaxy/celery/tasks.py b/lib/galaxy/celery/tasks.py index 8c7fdc1d47a3..ffda3e23417e 100644 --- a/lib/galaxy/celery/tasks.py +++ b/lib/galaxy/celery/tasks.py @@ -133,6 +133,15 @@ def change_datatype( set_metadata(hda_manager, ldda_manager, sa_session, dataset_id, model_class) +@galaxy_task(action="touch update_time of object") +def touch(sa_session: galaxy_scoped_session, item_id: int, model_class: str = "HistoryDatasetCollectionAssociation"): + if model_class != "HistoryDatasetCollectionAssociation": + raise NotImplementedError(f"touch method not implemented for '{model_class}'") + item = sa_session.query(model.HistoryDatasetCollectionAssociation).filter_by(id=item_id).one() + item.touch() + sa_session.flush() + + @galaxy_task(action="set dataset association metadata") def set_metadata( hda_manager: HDAManager, diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index 198d15cafe7c..335a8e462f68 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -96,6 +96,7 @@ registry, relationship, ) +from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.orm.collections import attribute_mapped_collection from sqlalchemy.sql import exists from typing_extensions import Protocol @@ -6212,6 +6213,14 @@ def dataset_dbkeys_and_extensions_summary(self): def job_source_id(self): return self.implicit_collection_jobs_id or self.job_id + def touch(self): + # cause an update to be emitted, so that 
e.g. update_time is incremented and triggers are notified + if getattr(self, "name", None): + # attribute to flag doesn't really matter as long as it's not null (and primary key also doesn't work) + flag_modified(self, "name") + if self.collection: + flag_modified(self.collection, "collection_type") + def to_hda_representative(self, multiple=False): rval = [] for dataset in self.collection.dataset_elements: diff --git a/lib/galaxy/webapps/galaxy/services/history_contents.py b/lib/galaxy/webapps/galaxy/services/history_contents.py index ffaeadb545aa..6eccd54d5690 100644 --- a/lib/galaxy/webapps/galaxy/services/history_contents.py +++ b/lib/galaxy/webapps/galaxy/services/history_contents.py @@ -12,7 +12,7 @@ Union, ) -from celery import group +from celery import chain from pydantic import ( Extra, Field, @@ -28,6 +28,7 @@ materialize as materialize_task, prepare_dataset_collection_download, prepare_history_content_download, + touch, write_history_content_to, ) from galaxy.managers import ( @@ -1428,8 +1429,10 @@ def _change_datatype( wrapped_task = self._change_item_datatype(dataset_instance, params, trans) if wrapped_task: wrapped_tasks.append(wrapped_task) + trans.sa_session.execute trans.sa_session.flush() - group(wrapped_tasks).delay() + # chain these for sequential execution. chord would be nice, but requires a non-RPC backend. 
+ chain(*wrapped_tasks, touch.si(item_id=item.id, model_class="HistoryDatasetCollectionAssociation")).delay() def _change_item_datatype( self, item: HistoryDatasetAssociation, params: ChangeDatatypeOperationParams, trans: ProvidesHistoryContext diff --git a/lib/galaxy_test/api/test_history_contents.py b/lib/galaxy_test/api/test_history_contents.py index eba99f84b94e..df76a366c56a 100644 --- a/lib/galaxy_test/api/test_history_contents.py +++ b/lib/galaxy_test/api/test_history_contents.py @@ -1081,13 +1081,14 @@ def test_bulk_datatype_change_collection(self): _, collection_ids, history_contents = self._create_test_history_contents(history_id) history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata") + original_collection_update_times = [] for item in history_contents: if item["history_content_type"] == "dataset": assert item["extension"] == "txt" assert item["data_type"] == "galaxy.datatypes.data.Text" assert "metadata_column_names" not in item - - self.dataset_populator.wait_for_history_jobs(history_id) + if item["history_content_type"] == "dataset_collection": + original_collection_update_times.append(item["update_time"]) expected_datatype = "tabular" # Change datatype of all datasets @@ -1107,11 +1108,16 @@ def test_bulk_datatype_change_collection(self): self.dataset_populator.wait_for_history(history_id) history_contents = self._get_history_contents(history_id, query="?v=dev&keys=extension,data_type,metadata") + new_collection_update_times = [] for item in history_contents: if item["history_content_type"] == "dataset": assert item["extension"] == "tabular" assert item["data_type"] == "galaxy.datatypes.tabular.Tabular" assert "metadata_column_names" in item + if item["history_content_type"] == "dataset_collection": + new_collection_update_times.append(item["update_time"]) + + assert original_collection_update_times != new_collection_update_times def 
test_bulk_datatype_change_should_skip_set_metadata_on_deferred_data(self): with self.dataset_populator.test_history() as history_id: From cfcd9775f64fed35545ac8d7af4bd5bdbc3c7d6a Mon Sep 17 00:00:00 2001 From: Marius van den Beek Date: Wed, 17 May 2023 08:22:26 +0200 Subject: [PATCH 2/2] Drop unused line --- lib/galaxy/webapps/galaxy/services/history_contents.py | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/galaxy/webapps/galaxy/services/history_contents.py b/lib/galaxy/webapps/galaxy/services/history_contents.py index 6eccd54d5690..53b68c440716 100644 --- a/lib/galaxy/webapps/galaxy/services/history_contents.py +++ b/lib/galaxy/webapps/galaxy/services/history_contents.py @@ -1429,7 +1429,6 @@ def _change_datatype( wrapped_task = self._change_item_datatype(dataset_instance, params, trans) if wrapped_task: wrapped_tasks.append(wrapped_task) - trans.sa_session.execute trans.sa_session.flush() # chain these for sequential execution. chord would be nice, but requires a non-RPC backend. chain(*wrapped_tasks, touch.si(item_id=item.id, model_class="HistoryDatasetCollectionAssociation")).delay()