Skip to content

Commit

Permalink
cleanup mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
dwhswenson committed May 31, 2023
1 parent 236b263 commit b805aac
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 54 deletions.
131 changes: 94 additions & 37 deletions gufe/storage/stagingdirectory.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,10 @@ def __init__(
*,
holding: PathLike = Path(".holding"),
delete_holding: bool = True,
read_only: bool = False,
):
self.external = external
self.scratch = Path(scratch)
self.prefix = Path(prefix)
self.read_only = read_only
self.delete_holding = delete_holding
self.holding = holding

Expand All @@ -104,34 +102,9 @@ def __init__(
self.staging_dir = self.scratch / holding / prefix
self.staging_dir.mkdir(exist_ok=True, parents=True)

def get_other_staging_dir(self, prefix, delete_holding=None):
"""Get a related unit's staging directory.
"""
if delete_holding is None:
delete_holding = self.delete_holding

return StagingDirectory(
scratch=self.scratch,
external=self.external,
prefix=prefix,
holding=self.holding,
delete_holding=delete_holding,
read_only=True,
)

@contextmanager
def other_shared(self, prefix, delete_holding=None):
other = self.get_other_staging_dir(prefix, delete_holding)
yield other
other.cleanup()


def transfer_single_file_to_external(self, held_file):
"""Transfer a given file from holding into external storage
"""
if self.read_only:
logging.debug("Read-only: Not transfering to external storage")
return # early exit

path = Path(held_file)
if not path.exists():
Expand All @@ -146,10 +119,6 @@ def transfer_single_file_to_external(self, held_file):

def transfer_holding_to_external(self):
"""Transfer all objects in the registry to external storage"""
if self.read_only:
logging.debug("Read-only: Not transfering to external storage")
return # early exit

for obj in self.registry:
self.transfer_single_file_to_external(obj)

Expand Down Expand Up @@ -182,20 +151,18 @@ def register_path(self, staging_path):
"""
label_exists = self.external.exists(staging_path.label)

if self.read_only and not label_exists:
raise IOError(f"Unable to create '{staging_path.label}'. File "
"does not exist in external storage, and This "
"staging path is read-only.")

self.registry.add(staging_path)

# if this is a file that exists, bring it into our subdir
# NB: this happens even if you're intending to overwrite the path,
# which is kind of wasteful
if label_exists:
self._load_file_from_external(self.external, staging_path)

def _load_file_from_external(self, external, staging_path):
scratch_path = self.staging_dir / staging_path.path
# TODO: switch this to using `get_filename` and `store_path`
with self.external.load_stream(staging_path.label) as f:
with external.load_stream(staging_path.label) as f:
external_bytes = f.read()
if scratch_path.exists():
self.preexisting.add(staging_path)
Expand All @@ -217,6 +184,96 @@ def __repr__(self):
)


class SharedStaging(StagingDirectory):
def __init__(
self,
scratch: PathLike,
external: ExternalStorage,
prefix: str,
*,
holding: PathLike = Path(".holding"),
delete_holding: bool = True,
read_only: bool = False,
):
super().__init__(scratch, external, prefix, holding=holding,
delete_holding=delete_holding)
self.read_only = read_only

def get_other_shared(self, prefix, delete_holding=None):
"""Get a related unit's staging directory.
"""
if delete_holding is None:
delete_holding = self.delete_holding

return SharedStaging(
scratch=self.scratch,
external=self.external,
prefix=prefix,
holding=self.holding,
delete_holding=delete_holding,
read_only=True,
)

@contextmanager
def other_shared(self, prefix, delete_holding=None):
"""Context manager approach for getting a related unit's directory.
This is usually the recommended way to get a previous unit's shared
data.
"""
other = self.get_other_shared(prefix, delete_holding)
yield other
other.cleanup()

def transfer_single_file_to_external(self, held_file):
if self.read_only:
logging.debug("Read-only: Not transfering to external storage")
return # early exit

super().transfer_single_file_to_external(held_file)

def transfer_holding_to_external(self):
if self.read_only:
logging.debug("Read-only: Not transfering to external storage")
return # early exit

super().transfer_holding_to_external()

def register_path(self, staging_path):
label_exists = self.external.exists(staging_path.label)

if self.read_only and not label_exists:
raise IOError(f"Unable to create '{staging_path.label}'. File "
"does not exist in external storage, and This "
"staging path is read-only.")

super().register_path(staging_path)


class PermanentStaging(StagingDirectory):
def __init__(
self,
scratch: PathLike,
external: ExternalStorage,
shared: ExternalStorage,
prefix: str,
*,
holding: PathLike = Path(".holding"),
delete_holding: bool = True,
):
super().__init__(scratch, external, prefix, holding=holding,
delete_holding=delete_holding)
self.shared = shared

def transfer_single_file_to_external(self, held_file):
# for this one, if we can't fin
path = Path(held_file)
if not path.exists():
self._load_file_from_external(self.shared, held_file)

super().transfer_single_file_to_external(held_file)


class StagingPath:
"""PathLike object linking local path with label for external storage.
Expand Down
28 changes: 22 additions & 6 deletions gufe/storage/storagemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from contextlib import contextmanager
import shutil

from typing import Type

from .externalresource import ExternalStorage, FileStorage
from .stagingdirectory import StagingDirectory
from .stagingdirectory import SharedStaging, PermanentStaging

def _storage_path_conflict(external, path):
"""Check if deleting ``path`` could delete externally stored data
Expand All @@ -22,8 +24,20 @@ def _storage_path_conflict(external, path):
else:
return True

class _AbstractDAGContextManager:
@classmethod
@contextmanager
def running_dag(cls, storage_manager, dag_label):
raise NotImplementedError()

@contextmanager
def running_unit(cls, unit):
raise NotImplementedError()

DAGContextManager = Type[_DAGStorageManager]

class _DAGStorageManager:

class _DAGStorageManager(_AbstractDAGContextManager):
"""Context manager to handle details of storage lifecycle.
Making this a separate class ensures that ``running_unit`` is always
Expand Down Expand Up @@ -108,9 +122,9 @@ def __init__(
keep_scratch: bool = False,
keep_holding: bool = False,
holding: PathLike = Path(".holding"),
DAGContextClass: type = _DAGStorageManager,
DAGContextClass: DAGContextManager = _DAGStorageManager,
):
self.scratch_root = scratch_root
self.scratch_root = Path(scratch_root)
self.shared_root = shared_root
self.permanent_root = permanent_root
self.keep_scratch = keep_scratch
Expand All @@ -120,21 +134,23 @@ def __init__(

def get_scratch(self, dag_label: str , unit_label: str) -> Path:
"""Get the path for this unit's scratch directory"""

scratch = self.scratch_root / dag_label / "scratch" / unit_label
scratch.mkdir(parents=True, exist_ok=True)
return scratch

def get_permanent(self, dag_label, unit_label):
"""Get the object for this unit's permanent holding directory"""
return StagingDirectory(
return PermanentStaging(
scratch=self.scratch_root / dag_label,
external=self.permanent_root,
shared=self.shared_root,
prefix=unit_label,
)

def get_shared(self, dag_label, unit_label):
"""Get the object for this unit's shared holding directory"""
return StagingDirectory(
return SharedStaging(
scratch=self.scratch_root / dag_label,
external=self.shared_root,
prefix=unit_label
Expand Down
10 changes: 5 additions & 5 deletions gufe/tests/storage/test_stagingdirectory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

from gufe.storage.externalresource import MemoryStorage
from gufe.storage.stagingdirectory import (
StagingDirectory, _delete_empty_dirs
SharedStaging, PermanentStaging, _delete_empty_dirs
)

@pytest.fixture
def root(tmp_path):
external = MemoryStorage()
external.store_bytes("old_unit/data.txt", b"foo")
root = StagingDirectory(
root = SharedStaging(
scratch=tmp_path,
external=external,
prefix="new_unit",
Expand Down Expand Up @@ -74,7 +74,7 @@ def test_delete_empty_dirs_delete_root(tmp_path, delete_root):



class TestStagingDirectory:
class TestSharedStaging:
@pytest.mark.parametrize('pathlist', [
['file.txt'], ['dir', 'file.txt']
])
Expand All @@ -100,7 +100,7 @@ def test_read_old(self, root):

# when we create the specific StagingPath, it registers and
# "downloads" the file
old_staging = root.get_other_staging_dir("old_unit")
old_staging = root.get_other_shared("old_unit")
filepath = old_staging / "data.txt"
assert pathlib.Path(filepath) == on_filesystem
assert on_filesystem.exists()
Expand All @@ -122,7 +122,7 @@ def test_write_new(self, root):
assert not root.external.exists(label)

def test_write_old_fail(self, root):
old_staging = root.get_other_staging_dir("old_unit")
old_staging = root.get_other_shared("old_unit")
with pytest.raises(IOError, match="read-only"):
old_staging / "foo.txt"

Expand Down
13 changes: 7 additions & 6 deletions gufe/tests/storage/test_storagemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ def storage_manager_holding_overlaps_permanent(tmp_path):
@pytest.fixture
def dag_units():
class Unit1:
@property
def key(self):
return "unit1"
key = "unit1"

def run(self, scratch, shared, permanent):
(scratch / "foo.txt").touch()
Expand All @@ -39,9 +37,7 @@ def run(self, scratch, shared, permanent):
return "done 1"

class Unit2:
@property
def key(self):
return "unit2"
key = "unit2"

def run(self, scratch, shared, permanent):
(scratch / "foo2.txt").touch()
Expand Down Expand Up @@ -128,6 +124,11 @@ def test_lifecycle(request, manager, dag_units):
# assert set(shared_root.iter_contents()) == {"unit1/bar.txt",
# "unit1/baz.txt"}
assert list(permanent_root.iter_contents()) == ["unit1/baz.txt"]
# check the results
assert results == [
"done 1",
{"bar": "bar was written", "baz": "baz was written"}
]

def test_lifecycle_keep_scratch_and_holding():
...
Expand Down

0 comments on commit b805aac

Please sign in to comment.