diff --git a/README.md b/README.md index 62b1bc8..d01a4d2 100644 --- a/README.md +++ b/README.md @@ -353,7 +353,7 @@ Stat for a bucket item ```python (doc) use_fs( root: Optional[str, pathlib.Path, bool] = None, -) -> Optional[pathy.file.BucketClientFS] +) -> Optional[pathy.base.BucketClientFS] ``` Use a path in the local file-system to store blobs and buckets. @@ -364,7 +364,7 @@ applications. # get_fs_client function ```python (doc) -get_fs_client() -> Optional[pathy.file.BucketClientFS] +get_fs_client() -> Optional[pathy.base.BucketClientFS] ``` Get the file-system client (or None) diff --git a/pathy/__init__.py b/pathy/__init__.py index f9193de..3384de1 100644 --- a/pathy/__init__.py +++ b/pathy/__init__.py @@ -1,25 +1,32 @@ from .base import ( - BasePath, - Blob, - BlobStat, - Bucket, - BucketClient, - BucketEntry, - BucketsAccessor, - ClientError, - FluidPath, - Pathy, - PurePathy, + BasePath as BasePath, + Blob as Blob, + BlobStat as BlobStat, + Bucket as Bucket, + BucketClient as BucketClient, + BucketEntry as BucketEntry, + BucketsAccessor as BucketsAccessor, + ClientError as ClientError, + FluidPath as FluidPath, + Pathy as Pathy, + PurePathy as PurePathy, + BlobFS as BlobFS, + BucketClientFS as BucketClientFS, + BucketEntryFS as BucketEntryFS, + BucketFS as BucketFS, + clear_fs_cache as clear_fs_cache, + get_client as get_client, + get_fs_cache as get_fs_cache, + get_fs_client as get_fs_client, + register_client as register_client, + set_client_params as set_client_params, + use_fs as use_fs, + use_fs_cache as use_fs_cache, ) -from .clients import ( - clear_fs_cache, - get_client, - get_fs_cache, - get_fs_client, - register_client, - set_client_params, - use_fs, - use_fs_cache, +from .gcs import ( + BlobGCS as BlobGCS, + BucketClientGCS as BucketClientGCS, + BucketEntryGCS as BucketEntryGCS, + BucketGCS as BucketGCS, + has_gcs as has_gcs, ) -from .file import BlobFS, BucketClientFS, BucketEntryFS, BucketFS -from .gcs import BlobGCS, BucketClientGCS, BucketEntryGCS, BucketGCS, has_gcs diff --git a/pathy/base.py b/pathy/base.py index 6870273..80ea08e 100644 --- a/pathy/base.py +++ b/pathy/base.py @@ -1,5 +1,8 @@ import os -from dataclasses import dataclass +import pathlib +import shutil +import tempfile +from dataclasses import dataclass, field from io import DEFAULT_BUFFER_SIZE from pathlib import _Accessor # type:ignore from pathlib import _PosixFlavour # type:ignore @@ -16,6 +19,7 @@ List, Optional, Tuple, + Type, TypeVar, Union, cast, @@ -335,9 +339,6 @@ class BucketsAccessor(_Accessor): _client: Optional[BucketClient] def client(self, path: "Pathy") -> BucketClient: - # lazily avoid circular imports - from .clients import get_client - return get_client(path.scheme) def get_blob(self, path: "Pathy") -> Optional[Blob]: @@ -546,8 +547,6 @@ def to_local(cls, blob_path: Union["Pathy", str], recurse: bool = True) -> Path: The cache is sensitive to the file updated time, and downloads new blobs as their updated timestamps change.""" - from .clients import get_fs_cache - cache_folder = get_fs_cache() if cache_folder is None: raise ValueError( @@ -899,3 +898,402 @@ def __iter__(self) -> Generator[BucketEntry, None, None]: def scandir(self) -> Generator[BucketEntry, None, None]: raise NotImplementedError("must be implemented in a subclass") + + +# +# File system adapter +# + + +class BucketEntryFS(BucketEntry["BucketFS", pathlib.Path]): + ... 
+
+
+@dataclass
+class BlobFS(Blob["BucketFS", pathlib.Path]):
+    raw: pathlib.Path
+    bucket: "BucketFS"
+
+    def delete(self) -> None:
+        """Delete a file-based blob."""
+        file_folder: str = os.path.dirname(self.raw)
+        self.raw.unlink()
+        # NOTE: in buckets, folders exist only while files exist in them. Mimic that
+        # behavior here by removing empty folders when the last file is removed.
+        if len(os.listdir(file_folder)) == 0:
+            shutil.rmtree(file_folder)
+
+    def exists(self) -> bool:
+        return self.raw.exists()
+
+
+@dataclass
+class BucketFS(Bucket):
+    name: str
+    bucket: pathlib.Path
+
+    def get_blob(self, blob_name: str) -> Optional[BlobFS]:  # type:ignore[override]
+        native_blob = self.bucket / blob_name
+        if not native_blob.exists() or native_blob.is_dir():
+            return None
+        stat = native_blob.stat()
+        # path.owner() raises KeyError if the owner's UID isn't known
+        #
+        # https://docs.python.org/3/library/pathlib.html#pathlib.Path.owner
+        owner: Optional[str]
+        try:
+            owner = native_blob.owner()
+        except KeyError:
+            owner = None
+        return BlobFS(
+            bucket=self,
+            owner=owner,
+            name=blob_name,
+            raw=native_blob,
+            size=stat.st_size,
+            updated=int(round(stat.st_mtime)),
+        )
+
+    def copy_blob(  # type:ignore[override]
+        self, blob: BlobFS, target: "BucketFS", name: str
+    ) -> Optional[BlobFS]:
+        in_file = str(blob.bucket.bucket / blob.name)
+        out_file = str(target.bucket / name)
+        out_path = pathlib.Path(os.path.dirname(out_file))
+        if not out_path.exists():
+            out_path.mkdir(parents=True)
+        shutil.copy(in_file, out_file)
+        return None
+
+    def delete_blob(self, blob: BlobFS) -> None:  # type:ignore[override]
+        blob.delete()
+
+    def delete_blobs(self, blobs: List[BlobFS]) -> None:  # type:ignore[override]
+        for blob in blobs:
+            blob.delete()
+
+    def exists(self) -> bool:
+        return self.bucket.exists()
+
+
+@dataclass
+class BucketClientFS(BucketClient):
+    # Root to store file-system buckets as children of
+    root: pathlib.Path = field(default_factory=lambda: pathlib.Path("/tmp/"))
+
+    def full_path(self, path: Pathy) -> pathlib.Path:
+        if path.root is None:
+            raise ValueError(f"Invalid bucket name for path: {path}")
+        full_path = self.root.absolute() / path.root
+        if path.key is not None:
+            full_path = full_path / path.key
+        return full_path
+
+    def exists(self, path: Pathy) -> bool:
+        """Return True if the path exists as a file or folder on disk"""
+        return self.full_path(path).exists()
+
+    def is_dir(self, path: Pathy) -> bool:
+        return self.full_path(path).is_dir()
+
+    def rmdir(self, path: Pathy) -> None:
+        full_path = self.full_path(path)
+        return shutil.rmtree(str(full_path))
+
+    def open(
+        self,
+        path: Pathy,
+        *,
+        mode: str = "r",
+        buffering: int = DEFAULT_BUFFER_SIZE,
+        encoding: Optional[str] = None,
+        errors: Optional[str] = None,
+        newline: Optional[str] = None,
+    ) -> StreamableType:
+        if self.lookup_bucket(path) is None:
+            raise ClientError(message=f'bucket "{path.root}" does not exist', code=404)
+
+        full_path = self.full_path(path)
+        if not full_path.exists():
+            if full_path.name != "":
+                full_path = full_path.parent
+            full_path.mkdir(parents=True, exist_ok=True)
+        return super().open(
+            path,
+            mode=mode,
+            buffering=buffering,
+            encoding=encoding,
+            errors=errors,
+            newline=newline,
+        )
+
+    def make_uri(self, path: PurePathy) -> str:
+        if not path.root:
+            raise ValueError(f"cannot make a URI to an invalid bucket: {path.root}")
+        full_path = self.root.absolute() / path.root
+        if path.key is not None:
+            full_path /= path.key
+        result = f"file://{full_path}"
+        return result
+
+    def create_bucket(self, path: PurePathy) -> Bucket:
+        if not path.root:
+            raise ValueError(f"Invalid bucket name: {path.root}")
+        bucket_path: pathlib.Path = self.root / path.root
+        if bucket_path.exists():
+            raise FileExistsError(f"Bucket already exists at: {bucket_path}")
+        bucket_path.mkdir(parents=True, exist_ok=True)
+        return BucketFS(str(path.root), bucket=bucket_path)
+
+    def delete_bucket(self, path: PurePathy) -> None:
+        bucket_path: pathlib.Path = self.root / str(path.root)
+        if bucket_path.exists():
+            shutil.rmtree(bucket_path)
+
+    def lookup_bucket(self, path: PurePathy) -> Optional[BucketFS]:
+        if path.root:
+            bucket_path: pathlib.Path = self.root / path.root
+            if bucket_path.exists():
+                return BucketFS(str(path.root), bucket=bucket_path)
+        return None
+
+    def get_bucket(self, path: PurePathy) -> BucketFS:
+        if not path.root:
+            raise ValueError(f"path has an invalid bucket_name: {path.root}")
+        bucket_path: pathlib.Path = self.root / path.root
+        if bucket_path.is_dir():
+            return BucketFS(str(path.root), bucket=bucket_path)
+        raise FileNotFoundError(f"Bucket {path.root} does not exist!")
+
+    def list_buckets(
+        self, **kwargs: Dict[str, Any]
+    ) -> Generator[BucketFS, None, None]:
+        for f in self.root.glob("*"):
+            if f.is_dir():
+                yield BucketFS(f.name, f)
+
+    def scandir(
+        self,
+        path: Optional[Pathy] = None,
+        prefix: Optional[str] = None,
+        delimiter: Optional[str] = None,
+    ) -> PathyScanDir:
+        return _FSScanDir(client=self, path=path, prefix=prefix, delimiter=delimiter)
+
+    def list_blobs(
+        self,
+        path: PurePathy,
+        prefix: Optional[str] = None,
+        delimiter: Optional[str] = None,
+        include_dirs: bool = False,
+    ) -> Generator[BlobFS, None, None]:
+        assert path.root is not None
+        bucket = self.get_bucket(path)
+        scan_path = self.root / path.root
+        if prefix is not None:
+            scan_path = scan_path / prefix
+        elif path.key is not None:
+            scan_path = scan_path / path.key
+
+        # Path to a file
+        if scan_path.exists() and not scan_path.is_dir():
+            stat = scan_path.stat()
+            file_size = stat.st_size
+            updated = int(round(stat.st_mtime))
+            yield BlobFS(
+                bucket,
+                name=str(scan_path.relative_to(self.root / path.root)),
+                size=file_size,
+                updated=updated,
+                owner=None,
+                raw=scan_path,
+            )
+
+        # Yield a blob for each file, keyed relative to the bucket folder
+        for file_path in scan_path.rglob("*"):
+            if file_path.is_dir():
+                continue
+            stat = file_path.stat()
+            file_size = stat.st_size
+            updated = int(round(stat.st_mtime))
+            yield BlobFS(
+                bucket,
+                name=str(file_path.relative_to(self.root / path.root)),
+                size=file_size,
+                updated=updated,
+                owner=None,
+                raw=file_path,
+            )
+
+
+class _FSScanDir(PathyScanDir):
+    _client: BucketClientFS
+
+    def scandir(self) -> Generator[BucketEntry[BucketFS, pathlib.Path], None, None]:
+        if self._path is None or not self._path.root:
+            for bucket in self._client.list_buckets():
+                yield BucketEntryFS(bucket.name, is_dir=True, raw=None)
+            return
+        assert self._path is not None
+        assert self._path.root is not None
+        scan_path = self._client.root / self._path.root
+        if self._prefix is not None:
+            scan_path = scan_path / self._prefix
+        for dir_entry in scan_path.glob("*"):
+            if dir_entry.is_dir():
+                yield BucketEntryFS(dir_entry.name, is_dir=True, raw=None)
+            else:
+                file_path = pathlib.Path(dir_entry)
+                stat = file_path.stat()
+                file_size = stat.st_size
+                updated = int(round(stat.st_mtime))
+                blob: Blob = BlobFS(
+                    self._client.get_bucket(self._path),
+                    name=dir_entry.name,
+                    size=file_size,
+                    updated=updated,
+                    owner=None,
+                    raw=file_path,
+                )
+                yield BucketEntryFS(
+                    name=dir_entry.name,
+                    is_dir=False,
+                    size=file_size,
+                    last_modified=updated,
+                    raw=blob,
+                )
+
+
+#
+# Client Registration
+#
+
+
+# TODO: Maybe this should be dynamic, but I'm trying to see if we can
+# hardcode it (at least the base schemes) to get nice types flowing
+# in cases where they would otherwise be lost.
+_client_registry: Dict[str, Type[BucketClient]] = {
+    "": BucketClientFS,
+    "file": BucketClientFS,
+}
+
+# Hold given client args for a scheme
+_client_args_registry: Dict[str, Any] = {}
+_instance_cache: Dict[str, Any] = {}
+_fs_client: Optional["BucketClientFS"] = None
+_fs_cache: Optional[pathlib.Path] = None
+
+
+def register_client(scheme: str, type: Type[BucketClient]) -> None:
+    """Register a bucket client for use with certain scheme Pathy objects"""
+    global _client_registry
+    _client_registry[scheme] = type
+
+
+def get_client(scheme: str) -> BucketClientType:
+    """Retrieve the bucket client for use with a given scheme"""
+    global _client_registry, _instance_cache, _fs_client, _client_args_registry
+    if _fs_client is not None:
+        return _fs_client  # type: ignore
+    if scheme in _instance_cache:
+        return _instance_cache[scheme]
+    elif scheme in _client_registry:
+        kwargs = (
+            _client_args_registry[scheme] if scheme in _client_args_registry else {}
+        )
+        _instance_cache[scheme] = _client_registry[scheme](**kwargs)  # type:ignore
+        return _instance_cache[scheme]
+    raise ValueError(f'There is no client registered to handle "{scheme}" paths')
+
+
+def set_client_params(scheme: str, **kwargs: Any) -> None:
+    """Specify args to pass when instantiating a service-specific Client
+    object. This allows for passing credentials in whatever way your underlying
+    client library prefers."""
+    global _client_registry, _instance_cache, _client_args_registry
+    _client_args_registry[scheme] = kwargs
+    if scheme in _instance_cache:
+        _instance_cache[scheme].recreate(**_client_args_registry[scheme])
+    return None
+
+
+def use_fs(
+    root: Optional[Union[str, pathlib.Path, bool]] = None
+) -> Optional[BucketClientFS]:
+    """Use a path in the local file-system to store blobs and buckets.
+
+    This is useful for development and testing situations, and for embedded
+    applications."""
+    global _fs_client
+    # False - disable adapter
+    if root is False:
+        _fs_client = None
+        return None
+
+    # None or True - enable FS adapter with default root
+    if root is None or root is True:
+        # Look up "data" folder of pathy package similar to spaCy
+        client_root = pathlib.Path(__file__).parent / "data"
+    else:
+        assert isinstance(
+            root, (str, pathlib.Path)
+        ), f"root is not a known type: {type(root)}"
+        client_root = pathlib.Path(root)
+    if not client_root.exists():
+        client_root.mkdir(parents=True)
+    _fs_client = BucketClientFS(root=client_root)
+    return _fs_client
+
+
+def get_fs_client() -> Optional[BucketClientFS]:
+    """Get the file-system client (or None)"""
+    global _fs_client
+    assert _fs_client is None or isinstance(
+        _fs_client, BucketClientFS
+    ), "invalid root type"
+    return _fs_client
+
+
+def use_fs_cache(
+    root: Optional[Union[str, pathlib.Path, bool]] = None
+) -> Optional[pathlib.Path]:
+    """Use a path in the local file-system to cache blobs and buckets.
+ + This is useful for when you want to avoid fetching large blobs multiple + times, or need to pass a local file path to a third-party library.""" + global _fs_cache + # False - disable adapter + if root is False: + _fs_cache = None + return None + + # None or True - enable FS cache with default root + if root is None or root is True: + # Use a temporary folder. Cache will be removed according to OS policy + cache_root = pathlib.Path(tempfile.mkdtemp()) + else: + assert isinstance( + root, (str, pathlib.Path) + ), f"root is not a known type: {type(root)}" + cache_root = pathlib.Path(root) + if not cache_root.exists(): + cache_root.mkdir(parents=True) + _fs_cache = cache_root + return cache_root + + +def get_fs_cache() -> Optional[pathlib.Path]: + """Get the folder that holds file-system cached blobs and timestamps.""" + global _fs_cache + assert _fs_cache is None or isinstance(_fs_cache, pathlib.Path), "invalid root type" + return _fs_cache + + +def clear_fs_cache(force: bool = False) -> None: + """Remove the existing file-system blob cache folder. + + Raises AssertionError if the cache path is unset or points to the + root of the file-system.""" + cache_path = get_fs_cache() + assert cache_path is not None, "no cache to clear" + resolved = cache_path.resolve() + assert str(resolved) != "/", f"refusing to remove a root path: {resolved}" + shutil.rmtree(str(resolved)) diff --git a/pathy/clients.py b/pathy/clients.py deleted file mode 100644 index 9e7058f..0000000 --- a/pathy/clients.py +++ /dev/null @@ -1,140 +0,0 @@ -import pathlib -import shutil -import tempfile -from typing import Any, Dict, Optional, Type, Union - -from .base import BucketClient, BucketClientType -from .file import BucketClientFS -from .gcs import BucketClientGCS - -# TODO: Maybe this should be dynamic, but I'm trying to see if we can -# hardcode it (atleast the base schemes) to get nice types flowing -# in cases where they would otherwise be lost. -_client_registry: Dict[str, Type[BucketClient]] = { - "": BucketClientFS, - "file": BucketClientFS, - "gs": BucketClientGCS, -} - -# Hold given client args for a scheme -_client_args_registry: Dict[str, Any] = {} -_instance_cache: Dict[str, Any] = {} -_fs_client: Optional[BucketClientFS] = None -_fs_cache: Optional[pathlib.Path] = None - - -def register_client(scheme: str, type: Type[BucketClient]) -> None: - """Register a bucket client for use with certain scheme Pathy objects""" - global _client_registry - _client_registry[scheme] = type - - -def get_client(scheme: str) -> BucketClientType: - """Retrieve the bucket client for use with a given scheme""" - global _client_registry, _instance_cache, _fs_client, _client_args_registry - if _fs_client is not None: - return _fs_client # type: ignore - if scheme in _instance_cache: - return _instance_cache[scheme] - elif scheme in _client_registry: - kwargs = ( - _client_args_registry[scheme] if scheme in _client_args_registry else {} - ) - _instance_cache[scheme] = _client_registry[scheme](**kwargs) # type:ignore - return _instance_cache[scheme] - raise ValueError(f'There is no client registered to handle "{scheme}" paths') - - -def set_client_params(scheme: str, **kwargs: Any) -> None: - """Specify args to pass when instantiating a service-specific Client - object. 
This allows for passing credentials in whatever way your underlying - client library prefers.""" - global _client_registry, _instance_cache, _client_args_registry - _client_args_registry[scheme] = kwargs - if scheme in _instance_cache: - _instance_cache[scheme].recreate(**_client_args_registry[scheme]) - return None - - -def use_fs( - root: Optional[Union[str, pathlib.Path, bool]] = None -) -> Optional[BucketClientFS]: - """Use a path in the local file-system to store blobs and buckets. - - This is useful for development and testing situations, and for embedded - applications.""" - global _fs_client - # False - disable adapter - if root is False: - _fs_client = None - return None - - # None or True - enable FS adapter with default root - if root is None or root is True: - # Look up "data" folder of pathy package similar to spaCy - client_root = pathlib.Path(__file__).parent / "data" - else: - assert isinstance( - root, (str, pathlib.Path) - ), f"root is not a known type: {type(root)}" - client_root = pathlib.Path(root) - if not client_root.exists(): - client_root.mkdir(parents=True) - _fs_client = BucketClientFS(root=client_root) - return _fs_client - - -def get_fs_client() -> Optional[BucketClientFS]: - """Get the file-system client (or None)""" - global _fs_client - assert _fs_client is None or isinstance( - _fs_client, BucketClientFS - ), "invalid root type" - return _fs_client - - -def use_fs_cache( - root: Optional[Union[str, pathlib.Path, bool]] = None -) -> Optional[pathlib.Path]: - """Use a path in the local file-system to cache blobs and buckets. - - This is useful for when you want to avoid fetching large blobs multiple - times, or need to pass a local file path to a third-party library.""" - global _fs_cache - # False - disable adapter - if root is False: - _fs_cache = None - return None - - # None or True - enable FS cache with default root - if root is None or root is True: - # Use a temporary folder. Cache will be removed according to OS policy - cache_root = pathlib.Path(tempfile.mkdtemp()) - else: - assert isinstance( - root, (str, pathlib.Path) - ), f"root is not a known type: {type(root)}" - cache_root = pathlib.Path(root) - if not cache_root.exists(): - cache_root.mkdir(parents=True) - _fs_cache = cache_root - return cache_root - - -def get_fs_cache() -> Optional[pathlib.Path]: - """Get the folder that holds file-system cached blobs and timestamps.""" - global _fs_cache - assert _fs_cache is None or isinstance(_fs_cache, pathlib.Path), "invalid root type" - return _fs_cache - - -def clear_fs_cache(force: bool = False) -> None: - """Remove the existing file-system blob cache folder. - - Raises AssertionError if the cache path is unset or points to the - root of the file-system.""" - cache_path = get_fs_cache() - assert cache_path is not None, "no cache to clear" - resolved = cache_path.resolve() - assert str(resolved) != "/", f"refusing to remove a root path: {resolved}" - shutil.rmtree(str(resolved)) diff --git a/pathy/file.py b/pathy/file.py deleted file mode 100644 index 055bf27..0000000 --- a/pathy/file.py +++ /dev/null @@ -1,275 +0,0 @@ -import os -import pathlib -import shutil -from dataclasses import dataclass, field -from io import DEFAULT_BUFFER_SIZE -from typing import Any, Dict, Generator, List, Optional - -from .base import ( - Blob, - Bucket, - BucketClient, - BucketEntry, - ClientError, - Pathy, - PathyScanDir, - PurePathy, - StreamableType, -) - - -class BucketEntryFS(BucketEntry["BucketFS", pathlib.Path]): - ... 
- - -@dataclass -class BlobFS(Blob["BucketFS", pathlib.Path]): - raw: pathlib.Path - bucket: "BucketFS" - - def delete(self) -> None: - """Delete a file-based blob.""" - file_folder: str = os.path.dirname(self.raw) - self.raw.unlink() - # NOTE: in buckets folders only exist if a file is contained in them. Mimic - # that behavior here by removing empty folders when the last file is removed. - if len(os.listdir(file_folder)) == 0: - shutil.rmtree(file_folder) - - def exists(self) -> bool: - return self.raw.exists() - - -@dataclass -class BucketFS(Bucket): - name: str - bucket: pathlib.Path - - def get_blob(self, blob_name: str) -> Optional[BlobFS]: # type:ignore[override] - native_blob = self.bucket / blob_name - if not native_blob.exists() or native_blob.is_dir(): - return None - stat = native_blob.stat() - # path.owner() raises KeyError if the owner's UID isn't known - # - # https://docs.python.org/3/library/pathlib.html#pathlib.Path.owner - owner: Optional[str] - try: - owner = native_blob.owner() - except KeyError: - owner = None - return BlobFS( - bucket=self, - owner=owner, - name=blob_name, - raw=native_blob, - size=stat.st_size, - updated=int(round(stat.st_mtime)), - ) - - def copy_blob( # type:ignore[override] - self, blob: BlobFS, target: "BucketFS", name: str - ) -> Optional[BlobFS]: - in_file = str(blob.bucket.bucket / blob.name) - out_file = str(target.bucket / name) - out_path = pathlib.Path(os.path.dirname(out_file)) - if not out_path.exists(): - out_path.mkdir(parents=True) - shutil.copy(in_file, out_file) - return None - - def delete_blob(self, blob: BlobFS) -> None: # type:ignore[override] - blob.delete() - - def delete_blobs(self, blobs: List[BlobFS]) -> None: # type:ignore[override] - for blob in blobs: - blob.delete() - - def exists(self) -> bool: - return self.bucket.exists() - - -@dataclass -class BucketClientFS(BucketClient): - # Root to store file-system buckets as children of - root: pathlib.Path = field(default_factory=lambda: pathlib.Path("/tmp/")) - - def full_path(self, path: Pathy) -> pathlib.Path: - if path.root is None: - raise ValueError(f"Invalid bucket name for path: {path}") - full_path = self.root.absolute() / path.root - if path.key is not None: - full_path = full_path / path.key - return full_path - - def exists(self, path: Pathy) -> bool: - """Return True if the path exists as a file or folder on disk""" - return self.full_path(path).exists() - - def is_dir(self, path: Pathy) -> bool: - return self.full_path(path).is_dir() - - def rmdir(self, path: Pathy) -> None: - full_path = self.full_path(path) - return shutil.rmtree(str(full_path)) - - def open( - self, - path: Pathy, - *, - mode: str = "r", - buffering: int = DEFAULT_BUFFER_SIZE, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, - ) -> StreamableType: - if self.lookup_bucket(path) is None: - raise ClientError(message=f'bucket "{path.root}" does not exist', code=404) - - full_path = self.full_path(path) - if not full_path.exists(): - if full_path.name != "": - full_path = full_path.parent - full_path.mkdir(parents=True, exist_ok=True) - return super().open( - path, - mode=mode, - buffering=buffering, - encoding=encoding, - errors=errors, - newline=newline, - ) - - def make_uri(self, path: PurePathy) -> str: - if not path.root: - raise ValueError(f"cannot make a URI to an invalid bucket: {path.root}") - full_path = self.root.absolute() / path.root - if path.key is not None: - full_path /= path.key - result = f"file://{full_path}" - return result - - 
def create_bucket(self, path: PurePathy) -> Bucket: - if not path.root: - raise ValueError(f"Invalid bucket name: {path.root}") - bucket_path: pathlib.Path = self.root / path.root - if bucket_path.exists(): - raise FileExistsError(f"Bucket already exists at: {bucket_path}") - bucket_path.mkdir(parents=True, exist_ok=True) - return BucketFS(str(path.root), bucket=bucket_path) - - def delete_bucket(self, path: PurePathy) -> None: - bucket_path: pathlib.Path = self.root / str(path.root) - if bucket_path.exists(): - shutil.rmtree(bucket_path) - - def lookup_bucket(self, path: PurePathy) -> Optional[BucketFS]: - if path.root: - bucket_path: pathlib.Path = self.root / path.root - if bucket_path.exists(): - return BucketFS(str(path.root), bucket=bucket_path) - return None - - def get_bucket(self, path: PurePathy) -> BucketFS: - if not path.root: - raise ValueError(f"path has an invalid bucket_name: {path.root}") - bucket_path: pathlib.Path = self.root / path.root - if bucket_path.is_dir(): - return BucketFS(str(path.root), bucket=bucket_path) - raise FileNotFoundError(f"Bucket {path.root} does not exist!") - - def list_buckets(self, **kwargs: Dict[str, Any]) -> Generator[BucketFS, None, None]: - for f in self.root.glob("*"): - if f.is_dir(): - yield BucketFS(f.name, f) - - def scandir( - self, - path: Pathy = None, - prefix: Optional[str] = None, - delimiter: Optional[str] = None, - ) -> PathyScanDir: - return _FSScanDir(client=self, path=path, prefix=prefix, delimiter=delimiter) - - def list_blobs( - self, - path: PurePathy, - prefix: Optional[str] = None, - delimiter: Optional[str] = None, - include_dirs: bool = False, - ) -> Generator[BlobFS, None, None]: - assert path.root is not None - bucket = self.get_bucket(path) - scan_path = self.root / path.root - if prefix is not None: - scan_path = scan_path / prefix - elif prefix is not None and path.key is not None: - scan_path = scan_path / path.key - - # Path to a file - if scan_path.exists() and not scan_path.is_dir(): - stat = scan_path.stat() - file_size = stat.st_size - updated = int(round(stat.st_mtime_ns * 1000)) - yield BlobFS( - bucket, - name=scan_path.name, - size=file_size, - updated=updated, - owner=None, - raw=scan_path, - ) - - # Yield blobs for each file - for file_path in scan_path.rglob("*"): - if file_path.is_dir(): - continue - stat = file_path.stat() - file_size = stat.st_size - updated = int(round(stat.st_mtime_ns * 1000)) - yield BlobFS( - bucket, - name=f"{prefix if prefix is not None else ''}{file_path.name}", - size=file_size, - updated=updated, - owner=None, - raw=file_path, - ) - - -class _FSScanDir(PathyScanDir): - _client: BucketClientFS - - def scandir(self) -> Generator[BucketEntry[BucketFS, pathlib.Path], None, None]: - if self._path is None or not self._path.root: - for bucket in self._client.list_buckets(): - yield BucketEntryFS(bucket.name, is_dir=True, raw=None) - return - assert self._path is not None - assert self._path.root is not None - scan_path = self._client.root / self._path.root - if self._prefix is not None: - scan_path = scan_path / self._prefix - for dir_entry in scan_path.glob("*"): - if dir_entry.is_dir(): - yield BucketEntryFS(dir_entry.name, is_dir=True, raw=None) - else: - file_path = pathlib.Path(dir_entry) - stat = file_path.stat() - file_size = stat.st_size - updated = int(round(stat.st_mtime)) - blob: Blob = BlobFS( - self._client.get_bucket(self._path), - name=dir_entry.name, - size=file_size, - updated=updated, - owner=None, - raw=file_path, - ) - yield BucketEntryFS( - 
name=dir_entry.name, - is_dir=False, - size=file_size, - last_modified=updated, - raw=blob, - ) diff --git a/pathy/gcs.py b/pathy/gcs.py index 62d60dc..00259e0 100644 --- a/pathy/gcs.py +++ b/pathy/gcs.py @@ -9,6 +9,7 @@ ClientError, PathyScanDir, PurePathy, + register_client, ) try: @@ -272,3 +273,6 @@ def scandir(self) -> Generator[BucketEntryGCS, None, None]: if response.next_page_token is None: break continuation_token = response.next_page_token + + +register_client("gs", BucketClientGCS) \ No newline at end of file diff --git a/pathy/tests/conftest.py b/pathy/tests/conftest.py index b2718be..c4db52c 100644 --- a/pathy/tests/conftest.py +++ b/pathy/tests/conftest.py @@ -7,8 +7,7 @@ import pytest -from pathy import Pathy, use_fs, use_fs_cache -from pathy.clients import set_client_params +from pathy import Pathy, use_fs, use_fs_cache, set_client_params from pathy.gcs import has_gcs has_credentials = "GCS_CREDENTIALS" in os.environ diff --git a/pathy/tests/test_base.py b/pathy/tests/test_base.py index 12c20e8..46b9df6 100644 --- a/pathy/tests/test_base.py +++ b/pathy/tests/test_base.py @@ -6,8 +6,7 @@ import pytest import spacy - -from pathy import ( +from .. import ( BasePath, Blob, BlobStat, @@ -22,8 +21,8 @@ use_fs, use_fs_cache, ) -from pathy.about import __version__ +from ..about import __version__ from .conftest import TEST_ADAPTERS # todo: test samefile/touch method @@ -784,7 +783,6 @@ def test_api_raises_with_no_known_bucket_clients_for_a_scheme( assert isinstance(accessor.client(path), BucketClientFS) -@pytest.mark.skip("requires: https://github.com/explosion/thinc/pull/465") def test_api_export_spacy_model(temp_folder: Path) -> None: """spaCy model loading is one of the things we need to support""" use_fs(temp_folder) @@ -795,6 +793,7 @@ def test_api_export_spacy_model(temp_folder: Path) -> None: model.to_disk(output_path) sorted_entries = sorted([str(p) for p in output_path.glob("*")]) expected_entries = [ + "gs://my-bucket/models/my_model/config.cfg", "gs://my-bucket/models/my_model/meta.json", "gs://my-bucket/models/my_model/tokenizer", "gs://my-bucket/models/my_model/vocab", diff --git a/pathy/tests/test_clients.py b/pathy/tests/test_clients.py index 3a112bc..fb14167 100644 --- a/pathy/tests/test_clients.py +++ b/pathy/tests/test_clients.py @@ -6,14 +6,14 @@ from pathy import Pathy, get_client from pathy.base import BucketClient -from pathy.clients import ( +from pathy import ( get_fs_client, register_client, set_client_params, use_fs, use_fs_cache, ) -from pathy.file import BucketClientFS +from pathy import BucketClientFS from pathy.gcs import BucketClientGCS from .conftest import TEST_ADAPTERS diff --git a/pathy/tests/test_file.py b/pathy/tests/test_file.py index e91c580..7378b82 100644 --- a/pathy/tests/test_file.py +++ b/pathy/tests/test_file.py @@ -5,7 +5,7 @@ import pytest from pathy import Pathy, get_client -from pathy.file import BlobFS, BucketClientFS, BucketFS +from pathy import BlobFS, BucketClientFS, BucketFS def raise_owner(self: Any) -> None:
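
With this change the client registry and the file-system adapter live in `pathy.base`, and everything is re-exported from the package root. For context, a minimal usage sketch of the consolidated top-level API — the bucket name, key, and temp root are illustrative, and `gs://` resolves to the local adapter here only because `use_fs()` is active:

```python
import pathlib
import tempfile

from pathy import Pathy, get_fs_client, use_fs

# Route every Pathy operation to a local folder instead of a live service.
use_fs(pathlib.Path(tempfile.mkdtemp()))

# Buckets become folders under the FS root; blobs become files inside them.
bucket = Pathy("gs://my-bucket")
bucket.mkdir(exist_ok=True)

blob = bucket / "folder" / "file.txt"
blob.write_text("hello")
assert blob.read_text() == "hello"

# The active file-system client is reachable through the top-level helper.
client = get_fs_client()
assert client is not None and client.root.exists()

# Passing False tears the adapter back down.
use_fs(False)
```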
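Since `pathy/gcs.py` now registers its own scheme at import time via `register_client("gs", BucketClientGCS)`, third-party clients can hook in the same way. A sketch under the assumption that a custom `BucketClient` subclass exists; the built-in `BucketClientFS` stands in for it here, and the `data` scheme is hypothetical:

```python
import pathlib

from pathy import BucketClientFS, get_client, register_client, set_client_params

# Hypothetical scheme: serve "data://" paths from a local folder.
register_client("data", BucketClientFS)

# Constructor kwargs for the scheme are held until first instantiation.
set_client_params("data", root=pathlib.Path("/tmp/data-buckets"))

# get_client lazily builds and caches one client instance per scheme.
client = get_client("data")
assert isinstance(client, BucketClientFS)
```

Registering at import time, as `pathy/gcs.py` now does for `gs`, keeps the scheme wiring next to the client that implements it rather than in a central `clients.py` registry module.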