diff --git a/gufe/storage/stagingdirectory.py b/gufe/storage/stagingdirectory.py index d7411e6e..e19b7882 100644 --- a/gufe/storage/stagingdirectory.py +++ b/gufe/storage/stagingdirectory.py @@ -82,12 +82,10 @@ def __init__( *, holding: PathLike = Path(".holding"), delete_holding: bool = True, - read_only: bool = False, ): self.external = external self.scratch = Path(scratch) self.prefix = Path(prefix) - self.read_only = read_only self.delete_holding = delete_holding self.holding = holding @@ -104,34 +102,9 @@ def __init__( self.staging_dir = self.scratch / holding / prefix self.staging_dir.mkdir(exist_ok=True, parents=True) - def get_other_staging_dir(self, prefix, delete_holding=None): - """Get a related unit's staging directory. - """ - if delete_holding is None: - delete_holding = self.delete_holding - - return StagingDirectory( - scratch=self.scratch, - external=self.external, - prefix=prefix, - holding=self.holding, - delete_holding=delete_holding, - read_only=True, - ) - - @contextmanager - def other_shared(self, prefix, delete_holding=None): - other = self.get_other_staging_dir(prefix, delete_holding) - yield other - other.cleanup() - - def transfer_single_file_to_external(self, held_file): """Transfer a given file from holding into external storage """ - if self.read_only: - logging.debug("Read-only: Not transfering to external storage") - return # early exit path = Path(held_file) if not path.exists(): @@ -146,10 +119,6 @@ def transfer_single_file_to_external(self, held_file): def transfer_holding_to_external(self): """Transfer all objects in the registry to external storage""" - if self.read_only: - logging.debug("Read-only: Not transfering to external storage") - return # early exit - for obj in self.registry: self.transfer_single_file_to_external(obj) @@ -182,20 +151,18 @@ def register_path(self, staging_path): """ label_exists = self.external.exists(staging_path.label) - if self.read_only and not label_exists: - raise IOError(f"Unable to create '{staging_path.label}'. File " - "does not exist in external storage, and This " - "staging path is read-only.") - self.registry.add(staging_path) # if this is a file that exists, bring it into our subdir # NB: this happens even if you're intending to overwrite the path, # which is kind of wasteful if label_exists: + self._load_file_from_external(self.external, staging_path) + + def _load_file_from_external(self, external, staging_path): scratch_path = self.staging_dir / staging_path.path # TODO: switch this to using `get_filename` and `store_path` - with self.external.load_stream(staging_path.label) as f: + with external.load_stream(staging_path.label) as f: external_bytes = f.read() if scratch_path.exists(): self.preexisting.add(staging_path) @@ -217,6 +184,96 @@ def __repr__(self): ) +class SharedStaging(StagingDirectory): + def __init__( + self, + scratch: PathLike, + external: ExternalStorage, + prefix: str, + *, + holding: PathLike = Path(".holding"), + delete_holding: bool = True, + read_only: bool = False, + ): + super().__init__(scratch, external, prefix, holding=holding, + delete_holding=delete_holding) + self.read_only = read_only + + def get_other_shared(self, prefix, delete_holding=None): + """Get a related unit's staging directory. + """ + if delete_holding is None: + delete_holding = self.delete_holding + + return SharedStaging( + scratch=self.scratch, + external=self.external, + prefix=prefix, + holding=self.holding, + delete_holding=delete_holding, + read_only=True, + ) + + @contextmanager + def other_shared(self, prefix, delete_holding=None): + """Context manager approach for getting a related unit's directory. + + This is usually the recommended way to get a previous unit's shared + data. + """ + other = self.get_other_shared(prefix, delete_holding) + yield other + other.cleanup() + + def transfer_single_file_to_external(self, held_file): + if self.read_only: + logging.debug("Read-only: Not transfering to external storage") + return # early exit + + super().transfer_single_file_to_external(held_file) + + def transfer_holding_to_external(self): + if self.read_only: + logging.debug("Read-only: Not transfering to external storage") + return # early exit + + super().transfer_holding_to_external() + + def register_path(self, staging_path): + label_exists = self.external.exists(staging_path.label) + + if self.read_only and not label_exists: + raise IOError(f"Unable to create '{staging_path.label}'. File " + "does not exist in external storage, and This " + "staging path is read-only.") + + super().register_path(staging_path) + + +class PermanentStaging(StagingDirectory): + def __init__( + self, + scratch: PathLike, + external: ExternalStorage, + shared: ExternalStorage, + prefix: str, + *, + holding: PathLike = Path(".holding"), + delete_holding: bool = True, + ): + super().__init__(scratch, external, prefix, holding=holding, + delete_holding=delete_holding) + self.shared = shared + + def transfer_single_file_to_external(self, held_file): + # for this one, if we can't fin + path = Path(held_file) + if not path.exists(): + self._load_file_from_external(self.shared, held_file) + + super().transfer_single_file_to_external(held_file) + + class StagingPath: """PathLike object linking local path with label for external storage. diff --git a/gufe/storage/storagemanager.py b/gufe/storage/storagemanager.py index f914d32d..aa4fc4ef 100644 --- a/gufe/storage/storagemanager.py +++ b/gufe/storage/storagemanager.py @@ -3,8 +3,10 @@ from contextlib import contextmanager import shutil +from typing import Type + from .externalresource import ExternalStorage, FileStorage -from .stagingdirectory import StagingDirectory +from .stagingdirectory import SharedStaging, PermanentStaging def _storage_path_conflict(external, path): """Check if deleting ``path`` could delete externally stored data @@ -22,8 +24,20 @@ def _storage_path_conflict(external, path): else: return True +class _AbstractDAGContextManager: + @classmethod + @contextmanager + def running_dag(cls, storage_manager, dag_label): + raise NotImplementedError() + + @contextmanager + def running_unit(cls, unit): + raise NotImplementedError() + +DAGContextManager = Type[_DAGStorageManager] -class _DAGStorageManager: + +class _DAGStorageManager(_AbstractDAGContextManager): """Context manager to handle details of storage lifecycle. Making this a separate class ensures that ``running_unit`` is always @@ -108,9 +122,9 @@ def __init__( keep_scratch: bool = False, keep_holding: bool = False, holding: PathLike = Path(".holding"), - DAGContextClass: type = _DAGStorageManager, + DAGContextClass: DAGContextManager = _DAGStorageManager, ): - self.scratch_root = scratch_root + self.scratch_root = Path(scratch_root) self.shared_root = shared_root self.permanent_root = permanent_root self.keep_scratch = keep_scratch @@ -120,21 +134,23 @@ def __init__( def get_scratch(self, dag_label: str , unit_label: str) -> Path: """Get the path for this unit's scratch directory""" + scratch = self.scratch_root / dag_label / "scratch" / unit_label scratch.mkdir(parents=True, exist_ok=True) return scratch def get_permanent(self, dag_label, unit_label): """Get the object for this unit's permanent holding directory""" - return StagingDirectory( + return PermanentStaging( scratch=self.scratch_root / dag_label, external=self.permanent_root, + shared=self.shared_root, prefix=unit_label, ) def get_shared(self, dag_label, unit_label): """Get the object for this unit's shared holding directory""" - return StagingDirectory( + return SharedStaging( scratch=self.scratch_root / dag_label, external=self.shared_root, prefix=unit_label diff --git a/gufe/tests/storage/test_stagingdirectory.py b/gufe/tests/storage/test_stagingdirectory.py index 46c48819..09f4fc2d 100644 --- a/gufe/tests/storage/test_stagingdirectory.py +++ b/gufe/tests/storage/test_stagingdirectory.py @@ -5,14 +5,14 @@ from gufe.storage.externalresource import MemoryStorage from gufe.storage.stagingdirectory import ( - StagingDirectory, _delete_empty_dirs + SharedStaging, PermanentStaging, _delete_empty_dirs ) @pytest.fixture def root(tmp_path): external = MemoryStorage() external.store_bytes("old_unit/data.txt", b"foo") - root = StagingDirectory( + root = SharedStaging( scratch=tmp_path, external=external, prefix="new_unit", @@ -74,7 +74,7 @@ def test_delete_empty_dirs_delete_root(tmp_path, delete_root): -class TestStagingDirectory: +class TestSharedStaging: @pytest.mark.parametrize('pathlist', [ ['file.txt'], ['dir', 'file.txt'] ]) @@ -100,7 +100,7 @@ def test_read_old(self, root): # when we create the specific StagingPath, it registers and # "downloads" the file - old_staging = root.get_other_staging_dir("old_unit") + old_staging = root.get_other_shared("old_unit") filepath = old_staging / "data.txt" assert pathlib.Path(filepath) == on_filesystem assert on_filesystem.exists() @@ -122,7 +122,7 @@ def test_write_new(self, root): assert not root.external.exists(label) def test_write_old_fail(self, root): - old_staging = root.get_other_staging_dir("old_unit") + old_staging = root.get_other_shared("old_unit") with pytest.raises(IOError, match="read-only"): old_staging / "foo.txt" diff --git a/gufe/tests/storage/test_storagemanager.py b/gufe/tests/storage/test_storagemanager.py index 95fdf255..c4d60e56 100644 --- a/gufe/tests/storage/test_storagemanager.py +++ b/gufe/tests/storage/test_storagemanager.py @@ -25,9 +25,7 @@ def storage_manager_holding_overlaps_permanent(tmp_path): @pytest.fixture def dag_units(): class Unit1: - @property - def key(self): - return "unit1" + key = "unit1" def run(self, scratch, shared, permanent): (scratch / "foo.txt").touch() @@ -39,9 +37,7 @@ def run(self, scratch, shared, permanent): return "done 1" class Unit2: - @property - def key(self): - return "unit2" + key = "unit2" def run(self, scratch, shared, permanent): (scratch / "foo2.txt").touch() @@ -128,6 +124,11 @@ def test_lifecycle(request, manager, dag_units): # assert set(shared_root.iter_contents()) == {"unit1/bar.txt", # "unit1/baz.txt"} assert list(permanent_root.iter_contents()) == ["unit1/baz.txt"] + # check the results + assert results == [ + "done 1", + {"bar": "bar was written", "baz": "baz was written"} + ] def test_lifecycle_keep_scratch_and_holding(): ...