From fd090215efc392e76d167e9c0203520c251578da Mon Sep 17 00:00:00 2001 From: justindujardin Date: Tue, 20 Apr 2021 07:24:32 -0700 Subject: [PATCH] fix(auto-import): move public exports into __init__.py - it seems that pylance can't manage to auto-import symbols with short paths unless they're defined in __init__.py. - it's supposed to be the case that it just magically picks the shortest import path, but it doesn't work in my testing. BREAKING CHANGE: Previously you could import symbols directly from their files in the module, e.g. `from pathy.base import Pathy`. Now you must import them from the base package, e.g. `from pathy import Pathy`. --- .coveragerc | 4 + README.md | 6 +- pathy/__init__.py | 1349 ++++++++++++++++- pathy/_gcs.py | 255 ++++ pathy/{tests => _tests}/__init__.py | 0 pathy/{tests => _tests}/conftest.py | 2 +- .../fixtures/tar_but_not_gzipped.tar.gz | Bin pathy/{tests => _tests}/test_base.py | 10 +- pathy/{tests => _tests}/test_cli.py | 0 pathy/{tests => _tests}/test_clients.py | 16 +- pathy/{tests => _tests}/test_file.py | 3 +- pathy/{tests => _tests}/test_gcs.py | 3 +- pathy/_types.py | 15 + pathy/base.py | 1299 ---------------- pathy/cli.py | 2 +- pathy/gcs.py | 276 +--- tools/clean.sh | 4 + tools/lint.sh | 2 +- tools/test.sh | 2 +- 19 files changed, 1635 insertions(+), 1613 deletions(-) create mode 100644 .coveragerc create mode 100644 pathy/_gcs.py rename pathy/{tests => _tests}/__init__.py (100%) rename pathy/{tests => _tests}/conftest.py (97%) rename pathy/{tests => _tests}/fixtures/tar_but_not_gzipped.tar.gz (100%) rename pathy/{tests => _tests}/test_base.py (99%) rename pathy/{tests => _tests}/test_cli.py (100%) rename pathy/{tests => _tests}/test_clients.py (92%) rename pathy/{tests => _tests}/test_file.py (88%) rename pathy/{tests => _tests}/test_gcs.py (91%) create mode 100644 pathy/_types.py delete mode 100644 pathy/base.py create mode 100644 tools/clean.sh diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..fd45cbc --- /dev/null +++ b/.coveragerc @@ -0,0 +1,4 @@ +[run] +omit = + # omit tests + */_tests/* diff --git a/README.md b/README.md index d01a4d2..2cb17fb 100644 --- a/README.md +++ b/README.md @@ -304,7 +304,7 @@ Determine if this path points to the same location as other_path. ## stat method ```python (doc) -Pathy.stat(self: 'Pathy') -> pathy.base.BlobStat +Pathy.stat(self: 'Pathy') -> pathy.BlobStat ``` Returns information about this bucket path. @@ -353,7 +353,7 @@ Stat for a bucket item ```python (doc) use_fs( root: Optional[str, pathlib.Path, bool] = None, -) -> Optional[pathy.base.BucketClientFS] +) -> Optional[pathy.BucketClientFS] ``` Use a path in the local file-system to store blobs and buckets. @@ -364,7 +364,7 @@ applications. 
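+
+A minimal sketch of local-storage mode (the root folder below is a
+hypothetical example path):
+
+```python
+from pathy import Pathy, use_fs
+
+use_fs("/tmp/pathy-data")  # keep buckets as folders under this root
+Pathy("gs://my_bucket/").mkdir(exist_ok=True)
+Pathy("gs://my_bucket/hello.txt").write_text("hi")
+```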
# get_fs_client function ```python (doc) -get_fs_client() -> Optional[pathy.base.BucketClientFS] +get_fs_client() -> Optional[pathy.BucketClientFS] ``` Get the file-system client (or None) diff --git a/pathy/__init__.py b/pathy/__init__.py index 3384de1..ba17fe9 100644 --- a/pathy/__init__.py +++ b/pathy/__init__.py @@ -1,32 +1,1319 @@ -from .base import ( - BasePath as BasePath, - Blob as Blob, - BlobStat as BlobStat, - Bucket as Bucket, - BucketClient as BucketClient, - BucketEntry as BucketEntry, - BucketsAccessor as BucketsAccessor, - ClientError as ClientError, - FluidPath as FluidPath, - Pathy as Pathy, - PurePathy as PurePathy, - BlobFS as BlobFS, - BucketClientFS as BucketClientFS, - BucketEntryFS as BucketEntryFS, - BucketFS as BucketFS, - clear_fs_cache as clear_fs_cache, - get_client as get_client, - get_fs_cache as get_fs_cache, - get_fs_client as get_fs_client, - register_client as register_client, - set_client_params as set_client_params, - use_fs as use_fs, - use_fs_cache as use_fs_cache, -) -from .gcs import ( - BlobGCS as BlobGCS, - BucketClientGCS as BucketClientGCS, - BucketEntryGCS as BucketEntryGCS, - BucketGCS as BucketGCS, - has_gcs as has_gcs, +import importlib +import os +import pathlib +import shutil +import tempfile +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from io import DEFAULT_BUFFER_SIZE +from pathlib import Path, PurePath +from typing import ( + IO, + Any, + Callable, + ContextManager, + Dict, + Generator, + Iterator, + List, + Optional, + Tuple, + Type, + TypeVar, + Union, + cast, ) + +import smart_open + +from ._types import _Accessor, _PosixFlavour, _WindowsFlavour + +SUBCLASS_ERROR = "must be implemented in a subclass" + +StreamableType = IO[Any] +FluidPath = Union["Pathy", "BasePath"] +BucketType = TypeVar("BucketType") +BucketBlobType = TypeVar("BucketBlobType") + +_windows_flavour: Any = _WindowsFlavour() # type:ignore +_posix_flavour: Any = _PosixFlavour() # type:ignore + + +@dataclass +class ClientError(BaseException): + message: str + code: Optional[int] + + def __str__(self) -> str: + return self.__repr__() + + def __repr__(self) -> str: + return f"({self.code}) {self.message}" + + +@dataclass +class BlobStat: + """Stat for a bucket item""" + + name: str + size: Optional[int] + last_modified: Optional[int] + + +@dataclass +class Blob: + bucket: Any + name: str + size: Optional[int] + updated: Optional[int] + owner: Optional[str] + raw: Any + + def delete(self) -> None: + raise NotImplementedError(SUBCLASS_ERROR) + + def exists(self) -> bool: + raise NotImplementedError(SUBCLASS_ERROR) + + +class BucketEntry: + """A single item returned from scanning a path""" + + name: str + _is_dir: bool + _stat: BlobStat + raw: Optional[Blob] + + def __init__( + self, + name: str, + is_dir: bool = False, + size: int = -1, + last_modified: int = -1, + raw: Optional[Blob] = None, + ): + self.name = name + self.raw = raw + self._is_dir = is_dir + self._stat = BlobStat(name=name, size=size, last_modified=last_modified) + + def __repr__(self) -> str: + return "{}(name={}, is_dir={}, stat={})".format( + type(self).__name__, self.name, self._is_dir, self._stat + ) + + def inode(self, *args: Any, **kwargs: Dict[str, Any]) -> None: + return None + + def is_dir(self) -> bool: + return self._is_dir + + def is_file(self) -> bool: + return not self._is_dir + + def is_symlink(self) -> bool: + return False + + def stat(self) -> BlobStat: + return self._stat + + +@dataclass +class Bucket: + def get_blob(self, blob_name: str) -> 
Optional[Blob]:
+        raise NotImplementedError(SUBCLASS_ERROR)
+
+    def copy_blob(self, blob: Blob, target: "Bucket", name: str) -> Optional[Blob]:
+        raise NotImplementedError(SUBCLASS_ERROR)
+
+    def delete_blob(self, blob: Blob) -> None:
+        raise NotImplementedError(SUBCLASS_ERROR)
+
+    def delete_blobs(self, blobs: List[Blob]) -> None:
+        raise NotImplementedError(SUBCLASS_ERROR)
+
+    def exists(self) -> bool:
+        raise NotImplementedError(SUBCLASS_ERROR)
+
+
+class BucketClient:
+    """Base class for a client that interacts with a bucket-based storage system."""
+
+    def recreate(self, **kwargs: Any) -> None:
+        """Recreate any underlying bucket client adapter with the given kwargs"""
+
+    def open(
+        self,
+        path: "Pathy",
+        *,
+        mode: str = "r",
+        buffering: int = DEFAULT_BUFFER_SIZE,
+        encoding: Optional[str] = None,
+        errors: Optional[str] = None,
+        newline: Optional[str] = None,
+    ) -> StreamableType:
+        client_params = {}
+        if hasattr(self, "client_params"):
+            client_params = getattr(self, "client_params")
+
+        return smart_open.open(
+            self.make_uri(path),
+            mode=mode,
+            buffering=buffering,
+            encoding=encoding,
+            errors=errors,
+            newline=newline,
+            transport_params=client_params,
+            # Disable de/compression based on the file extension
+            ignore_ext=True,
+        )  # type:ignore
+
+    def make_uri(self, path: "Pathy") -> str:
+        return path.as_uri()
+
+    def is_dir(self, path: "Pathy") -> bool:
+        return any(self.list_blobs(path, prefix=path.prefix))
+
+    def rmdir(self, path: "Pathy") -> None:
+        return None
+
+    def exists(self, path: "Pathy") -> bool:
+        raise NotImplementedError(SUBCLASS_ERROR)
+
+    def lookup_bucket(self, path: "Pathy") -> Optional[Bucket]:
+        raise NotImplementedError(SUBCLASS_ERROR)
+
+    def get_bucket(self, path: "Pathy") -> Bucket:
+        raise NotImplementedError(SUBCLASS_ERROR)
+
+    def list_buckets(self) -> Generator[Bucket, None, None]:
+        raise NotImplementedError(SUBCLASS_ERROR)
+
+    def list_blobs(
+        self,
+        path: "Pathy",
+        prefix: Optional[str] = None,
+        delimiter: Optional[str] = None,
+        include_dirs: bool = False,
+    ) -> Generator[Blob, None, None]:
+        raise NotImplementedError(SUBCLASS_ERROR)
+
+    def scandir(
+        self,
+        path: Optional["Pathy"] = None,
+        prefix: Optional[str] = None,
+        delimiter: Optional[str] = None,
+    ) -> "PathyScanDir":
+        raise NotImplementedError(SUBCLASS_ERROR)
+
+    def create_bucket(self, path: "Pathy") -> Bucket:
+        raise NotImplementedError(SUBCLASS_ERROR)
+
+    def delete_bucket(self, path: "Pathy") -> None:
+        raise NotImplementedError(SUBCLASS_ERROR)
+
+
+class _PathyFlavour(_PosixFlavour):  # type:ignore
+    sep: str
+    is_supported = True
+
+    def parse_parts(self, parts: List[str]) -> Tuple[str, str, List[str]]:
+        parse_tuple: Tuple[str, str, List[str]] = super().parse_parts(  # type:ignore
+            parts
+        )
+        drv, root, parsed = parse_tuple
+        if len(parsed) and parsed[0].endswith(":"):
+            if len(parsed) < 2:
+                raise ValueError("need at least two parts")
+            # Restore the scheme and bucket name split apart by the base parser
+            drv = parsed[0]  # scheme:
+            root = parsed[1]  # bucket_name
+        for part in parsed[1:]:
+            if part == "..":
+                index = parsed.index(part)
+                parsed.pop(index - 1)
+                parsed.remove(part)
+        return drv, root, parsed
+
+    def make_uri(self, path: "Pathy") -> str:
+        uri: str = super().make_uri(path)  # type:ignore
+        return uri.replace("file:///", "gs://")
+
+
+class PurePathy(PurePath):
+    """PurePath subclass for bucket storage."""
+
+    _flavour = _PathyFlavour()
+    __slots__ = ()
+
+    @property
+    def scheme(self) -> str:
+        """Return the scheme portion of this path. A path's scheme is the leading
+        few characters. In a website you would see a scheme of "http" or "https".
+
+        Consider a few examples:
+
+        ```python
+        assert Pathy("gs://foo/bar").scheme == "gs"
+        assert Pathy("file:///tmp/foo/bar").scheme == "file"
+        assert Pathy("/dev/null").scheme == ""
+        ```
+        """
+        # If there is no drive, return nothing
+        if self.drive == "":
+            return ""
+        # This is an assumption of mine. I think it's fine, but let's
+        # cause an error if it's not the case.
+        assert self.drive[-1] == ":", "drive should end with :"
+        return self.drive[:-1]
+
+    @property
+    def bucket(self) -> "Pathy":
+        """Return a new instance of only the bucket path."""
+        self._absolute_path_validation()
+        return cast(Pathy, type(self)(f"{self.drive}//{self.root}"))
+
+    @property
+    def key(self) -> Optional["Pathy"]:
+        """Return a new instance of only the key path."""
+        self._absolute_path_validation()
+        key = self._flavour.sep.join(self.parts[2:])
+        if not key or len(self.parts) < 2:
+            return None
+        return cast(Pathy, type(self)(key))
+
+    @property
+    def prefix(self) -> str:
+        sep = self._flavour.sep
+        str(self)
+        key = self.key
+        if not key:
+            return ""
+        key_name = str(key)
+        if not key_name.endswith(sep):
+            return key_name + sep
+        return key_name
+
+    def _absolute_path_validation(self) -> None:
+        if not self.is_absolute():
+            raise ValueError("relative paths have no bucket/key specification")
+
+    @classmethod
+    def _format_parsed_parts(cls, drv: str, root: str, parts: List[str]) -> str:
+        # Bucket path "gs://foo/bar"
+        join_fn: Callable[[List[str]], str] = cls._flavour.join  # type:ignore
+        res: str
+        if drv and root:
+            res = f"{drv}//{root}/" + join_fn(parts[2:])
+        # Absolute path
+        elif drv or root:
+            res = drv + root + join_fn(parts[1:])
+        else:
+            # Relative path
+            res = join_fn(parts)
+        return res
+
+
+_SUPPORTED_OPEN_MODES = {"r", "rb", "tr", "rt", "w", "wb", "bw", "wt", "tw"}
+
+
+class _PathyExtensions:
+    def ls(self) -> Generator["BlobStat", None, None]:
+        blobs: "PathyScanDir" = self._accessor.scandir(self)  # type:ignore
+        for blob in blobs:
+            any_blob: Any = blob
+            if hasattr(any_blob, "_stat"):
+                yield any_blob._stat
+            elif isinstance(any_blob, os.DirEntry):
+                os_blob: Any = blob
+                stat = os_blob.stat()
+                file_size = stat.st_size
+                updated = int(round(stat.st_mtime))
+                yield BlobStat(name=os_blob.name, size=file_size, last_modified=updated)
+
+
+class BasePath(_PathyExtensions, Path):
+    # NOTE: pathlib normally takes care of this, but its logic in __new__
+    # checks specifically for the "Path" class, so subclasses need to
+    # set the flavour manually based on the OS.
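+    # Path subclasses other than PosixPath/WindowsPath aren't assigned a
+    # flavour automatically, so pick one explicitly here.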
+ _flavour = _windows_flavour if os.name == "nt" else _posix_flavour # type:ignore + + +class BucketsAccessor(_Accessor): # type:ignore + """Access data from blob buckets""" + + _client: Optional[BucketClient] + + def client(self, path: "Pathy") -> BucketClient: + return get_client(path.scheme) + + def get_blob(self, path: "Pathy") -> Optional[Blob]: + """Get the blob associated with a path or return None""" + if not path.root: + return None + bucket = self.client(path).lookup_bucket(path) + if bucket is None: + return None + key_name = str(path.key) + return bucket.get_blob(key_name) + + def unlink(self, path: "Pathy") -> None: + """Delete a link to a blob in a bucket.""" + bucket = self.client(path).get_bucket(path) + blob: Optional[Blob] = bucket.get_blob(str(path.key)) + if blob is None: + raise FileNotFoundError(path) + blob.delete() + + def stat(self, path: "Pathy") -> BlobStat: + bucket = self.client(path).get_bucket(path) + blob: Optional[Blob] = bucket.get_blob(str(path.key)) + if blob is None: + raise FileNotFoundError(path) + return BlobStat(name=str(blob), size=blob.size, last_modified=blob.updated) + + def is_dir(self, path: "Pathy") -> bool: + if str(path) == path.root: + return True + if self.get_blob(path) is not None: + return False + return self.client(path).is_dir(path) + + def exists(self, path: "Pathy") -> bool: + client = self.client(path) + if not path.root: + return any(client.list_buckets()) + try: + bucket = client.lookup_bucket(path) + except ClientError: + return False + if bucket is None or not bucket.exists(): + return False + if not path.key: + return True + + key_name = str(path.key) + blob: Optional[Blob] = bucket.get_blob(key_name) + if blob is not None: + return blob.exists() + # Determine if the path exists according to the current adapter + return client.exists(path) + + def scandir(self, path: "Pathy") -> "PathyScanDir": + return self.client(path).scandir(path, prefix=path.prefix) + + def listdir(self, path: "Pathy") -> Generator[str, None, None]: + with self.scandir(path) as entries: + for entry in entries: + yield entry.name + + def open( + self, + path: "Pathy", + *, + mode: str = "r", + buffering: int = -1, + encoding: Optional[str] = None, + errors: Optional[str] = None, + newline: Optional[str] = None, + ) -> StreamableType: + return self.client(path).open( + path, + mode=mode, + buffering=buffering, + encoding=encoding, + errors=errors, + newline=newline, + ) + + def owner(self, path: "Pathy") -> Optional[str]: + blob: Optional[Blob] = self.get_blob(path) + return blob.owner if blob is not None else None + + def resolve(self, path: "Pathy", strict: bool = False) -> "Pathy": + path_parts = str(path).replace(path.drive, "") + return Pathy(f"{path.drive}{os.path.abspath(path_parts)}") + + def rename(self, path: "Pathy", target: "Pathy") -> None: + client: BucketClient = self.client(path) + bucket: Bucket = client.get_bucket(path) + target_bucket: Bucket = client.get_bucket(target) + + # Single file + if not self.is_dir(path): + from_blob: Optional[Blob] = bucket.get_blob(str(path.key)) + if from_blob is None: + raise FileNotFoundError(f'source file "{path}" does not exist') + target_bucket.copy_blob(from_blob, target_bucket, str(target.key)) + bucket.delete_blob(from_blob) + return + + # Folder with objects + sep: str = path._flavour.sep # type:ignore + blobs = list(client.list_blobs(path, prefix=path.prefix, delimiter=sep)) + # First rename + for blob in blobs: + target_key_name = blob.name.replace(str(path.key), str(target.key)) + 
target_bucket.copy_blob(blob, target_bucket, target_key_name) + # Then delete the sources + for blob in blobs: + bucket.delete_blob(blob) + + def replace(self, path: "Pathy", target: "Pathy") -> None: + return self.rename(path, target) + + def rmdir(self, path: "Pathy") -> None: + client: BucketClient = self.client(path) + key_name = str(path.key) if path.key is not None else None + bucket: Bucket = client.get_bucket(path) + blobs = list(client.list_blobs(path, prefix=key_name)) + bucket.delete_blobs(blobs) + # The path is just the bucket + if key_name is None: + client.delete_bucket(path) + elif client.is_dir(path): + client.rmdir(path) + + def mkdir(self, path: "Pathy", mode: int) -> None: + client: BucketClient = self.client(path) + bucket: Optional[Bucket] = client.lookup_bucket(path) + if bucket is None or not bucket.exists(): + client.create_bucket(path) + elif path.key is not None: + assert isinstance(path, Pathy) + blob: Optional[Blob] = bucket.get_blob(str(path.key)) + if blob is not None and blob.exists(): + raise OSError(f"Blob already exists: {path}") + + +class Pathy(Path, PurePathy, _PathyExtensions): + """Subclass of `pathlib.Path` that works with bucket APIs.""" + + __slots__ = () + _accessor: "BucketsAccessor" + _default_accessor = BucketsAccessor() + _NOT_SUPPORTED_MESSAGE = "{method} is an unsupported bucket operation" + + def __truediv__(self, key: Union[str, Path, "Pathy", PurePathy]) -> "Pathy": # type: ignore[override] + return super().__truediv__(key) # type:ignore + + def __rtruediv__(self, key: Union[str, Path, "Pathy", PurePathy]) -> "Pathy": # type: ignore[override] + return super().__rtruediv__(key) # type:ignore + + def _init(self: "Pathy", template: Optional[Any] = None) -> None: + super()._init(template) # type:ignore + if template is None: + self._accessor = Pathy._default_accessor + else: + self._accessor = template._accessor + + @classmethod + def fluid(cls, path_candidate: Union[str, FluidPath]) -> FluidPath: + """Infer either a Pathy or pathlib.Path from an input path or string. + + The returned type is a union of the potential `FluidPath` types and will + type-check correctly against the minimum overlapping APIs of all the input + types. + + If you need to use specific implementation details of a type, "narrow" the + return of this function to the desired type, e.g. + + ```python + from pathy import FluidPath, Pathy + + fluid_path: FluidPath = Pathy.fluid("gs://my_bucket/foo.txt") + # Narrow the type to a specific class + assert isinstance(fluid_path, Pathy), "must be Pathy" + # Use a member specific to that class + assert fluid_path.prefix == "foo.txt/" + ``` + """ + from_path: FluidPath = Pathy(path_candidate) + if from_path.root in ["/", ""]: + from_path = BasePath(path_candidate) + return from_path + + @classmethod + def from_bucket(cls, bucket_name: str) -> "Pathy": + """Initialize a Pathy from a bucket name. This helper adds a trailing slash and + the appropriate prefix. + + ```python + from pathy import Pathy + + assert str(Pathy.from_bucket("one")) == "gs://one/" + assert str(Pathy.from_bucket("two")) == "gs://two/" + ``` + """ + return Pathy(f"gs://{bucket_name}/") # type:ignore + + @classmethod + def to_local(cls, blob_path: Union["Pathy", str], recurse: bool = True) -> Path: + """Download and cache either a blob or a set of blobs matching a prefix. 
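+        Returns the pathlib.Path of the cached blob (or folder of blobs).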
+ + The cache is sensitive to the file updated time, and downloads new blobs + as their updated timestamps change.""" + cache_folder = get_fs_cache() + if cache_folder is None: + raise ValueError( + 'cannot get and cache a blob without first calling "use_fs_cache"' + ) + + cache_folder.mkdir(exist_ok=True, parents=True) + + in_path: Pathy + if not isinstance(blob_path, Pathy): + in_path = Pathy(blob_path) + else: + in_path = blob_path + + cache_blob: Path = cache_folder.absolute() / in_path.root + if in_path.key is not None: + cache_blob /= in_path.key + cache_time: Path = ( + cache_folder.absolute() / in_path.root / f"{in_path.key}.time" + ) + # Keep a cache of downloaded files. Fetch new ones when: + # - the file isn't in the cache + # - cached_stat.updated != latest_stat.updated + if cache_blob.exists() and cache_time.exists(): + fs_time: str = cache_time.read_text() + gcs_stat: BlobStat = in_path.stat() + # If the times match, return the cached blob + if fs_time == str(gcs_stat.last_modified): + return cache_blob + # remove the cache files because they're out of date + cache_blob.unlink() + cache_time.unlink() + + # If the file isn't in the cache, download it + if not cache_blob.exists(): + # Is a blob + if in_path.is_file(): + dest_folder = cache_blob.parent + dest_folder.mkdir(exist_ok=True, parents=True) + cache_blob.write_bytes(in_path.read_bytes()) + blob_stat: BlobStat = in_path.stat() + cache_time.write_text(str(blob_stat.last_modified)) + elif recurse: + # If not a specific blob, enumerate all the blobs under + # the path and cache them, then return the cache folder + for blob in in_path.rglob("*"): + Pathy.to_local(blob, recurse=False) + return cache_blob + + def ls(self: "Pathy") -> Generator["BlobStat", None, None]: + """List blob names with stat information under the given path. + + This is considerably faster than using iterdir if you also need + the stat information for the enumerated blobs. + + Yields BlobStat objects for each found blob. + """ + yield from super(Pathy, self).ls() + + def stat(self: "Pathy") -> BlobStat: # type: ignore[override] + """Returns information about this bucket path.""" + self._absolute_path_validation() + if not self.key: + raise ValueError("cannot stat a bucket without a key") + return cast(BlobStat, super().stat()) + + def exists(self) -> bool: + """Returns True if the path points to an existing bucket, blob, or prefix.""" + self._absolute_path_validation() + return self._accessor.exists(self) + + def is_dir(self: "Pathy") -> bool: + """Determine if the path points to a bucket or a prefix of a given blob + in the bucket. + + Returns True if the path points to a bucket or a blob prefix. + Returns False if it points to a blob or the path doesn't exist. + """ + self._absolute_path_validation() + if self.bucket and not self.key: + return True + return self._accessor.is_dir(self) + + def is_file(self: "Pathy") -> bool: + """Determine if the path points to a blob in the bucket. + + Returns True if the path points to a blob. + Returns False if it points to a bucket or blob prefix, or if the path doesn’t + exist. 
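+
+        A short sketch, assuming a hypothetical bucket "my_bucket" that
+        contains a blob named "foo.txt":
+
+        ```python
+        from pathy import Pathy
+
+        assert Pathy("gs://my_bucket/foo.txt").is_file()
+        assert not Pathy("gs://my_bucket/").is_file()
+        ```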
+ """ + self._absolute_path_validation() + if not self.bucket or not self.key: + return False + try: + return bool(self.stat()) + except (ClientError, FileNotFoundError): + return False + + def iterdir(self: "Pathy") -> Generator["Pathy", None, None]: + """Iterate over the blobs found in the given bucket or blob prefix path.""" + self._absolute_path_validation() + yield from super().iterdir() # type:ignore + + def glob(self: "Pathy", pattern: str) -> Generator["Pathy", None, None]: + """Perform a glob match relative to this Pathy instance, yielding all matched + blobs.""" + yield from super().glob(pattern) # type:ignore + + def rglob(self: "Pathy", pattern: str) -> Generator["Pathy", None, None]: + """Perform a recursive glob match relative to this Pathy instance, yielding + all matched blobs. Imagine adding "**/" before a call to glob.""" + yield from super().rglob(pattern) # type:ignore + + def open( # type:ignore + self: "Pathy", + mode: str = "r", + buffering: int = DEFAULT_BUFFER_SIZE, + encoding: Optional[str] = None, + errors: Optional[str] = None, + newline: Optional[str] = None, + ) -> StreamableType: + """Open the given blob for streaming. This delegates to the `smart_open` + library that handles large file streaming for a number of bucket API + providers.""" + self._absolute_path_validation() + if mode not in _SUPPORTED_OPEN_MODES: + raise ValueError( + "supported modes are {} got {}".format(_SUPPORTED_OPEN_MODES, mode) + ) + if buffering == 0 or buffering == 1: + raise ValueError( + "supported buffering values are only block sizes, no 0 or 1" + ) + if "b" in mode and encoding: + raise ValueError("binary mode doesn't take an encoding argument") + + # Leftover pathlib internals stuff + if hasattr(self, "_closed") and self._closed: # type:ignore + self._raise_closed() # type:ignore + return self._accessor.open( + self, + mode=mode, + buffering=buffering, + encoding=encoding, + errors=errors, + newline=newline, + ) + + def owner(self: "Pathy") -> Optional[str]: # type:ignore[override] + """Returns the name of the user that owns the bucket or blob + this path points to. Returns None if the owner is unknown or + not supported by the bucket API provider.""" + self._absolute_path_validation() + if not self.is_file(): + raise FileNotFoundError(str(self)) + return self._accessor.owner(self) + + def resolve(self, strict: bool = False) -> "Pathy": + """Resolve the given path to remove any relative path specifiers. + + ```python + from pathy import Pathy + + path = Pathy("gs://my_bucket/folder/../blob") + assert path.resolve() == Pathy("gs://my_bucket/blob") + ``` + """ + self._absolute_path_validation() + return self._accessor.resolve(self, strict=strict) + + def rename(self: "Pathy", target: Union[str, PurePath]) -> "Pathy": # type:ignore + """Rename this path to the given target. + + If the target exists and is a file, it will be replaced silently if the user + has permission. + + If path is a blob prefix, it will replace all the blobs with the same prefix + to match the target prefix.""" + self._absolute_path_validation() + self_type = type(self) + result = target if isinstance(target, self_type) else self_type(target) + result._absolute_path_validation() # type:ignore + super().rename(result) + return result + + def replace(self: "Pathy", target: Union[str, PurePath]) -> "Pathy": # type:ignore + """Renames this path to the given target. 
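+        For bucket paths this simply delegates to rename().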
+
+        If target points to an existing path, it will be replaced."""
+        return self.rename(target)
+
+    def rmdir(self: "Pathy") -> None:
+        """Removes this bucket or blob prefix. It must be empty."""
+        self._absolute_path_validation()
+        if self.is_file():
+            raise NotADirectoryError()
+        if not self.is_dir():
+            raise FileNotFoundError()
+        super().rmdir()
+
+    def samefile(self: "Pathy", other_path: Union[str, bytes, int, Path]) -> bool:
+        """Determine if this path points to the same location as other_path."""
+        self._absolute_path_validation()
+        if not isinstance(other_path, Path):
+            other_path = type(self)(other_path)  # type:ignore
+        assert isinstance(other_path, Pathy)
+        return (
+            self.bucket == other_path.bucket
+            and self.key == other_path.key
+            and self.is_file()
+        )
+
+    def touch(self: "Pathy", mode: int = 0o666, exist_ok: bool = True) -> None:
+        """Create a blob at this path.
+
+        If the blob already exists, the function succeeds if exist_ok is true
+        (and its modification time is updated to the current time), otherwise
+        FileExistsError is raised.
+        """
+        if self.exists() and not exist_ok:
+            raise FileExistsError()
+        self.write_text("")
+
+    def mkdir(
+        self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False
+    ) -> None:
+        """Create a bucket from the given path. Since bucket APIs only have implicit
+        folder structures (determined by the existence of a blob with an overlapping
+        prefix) this does nothing other than create buckets.
+
+        If parents is False, the bucket will only be created if the path points to
+        exactly the bucket and nothing else. If parents is true the bucket will be
+        created even if the path points to a specific blob.
+
+        The mode param is ignored.
+
+        Raises FileExistsError if exist_ok is false and the bucket already exists.
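+
+        A short sketch, using a hypothetical bucket name:
+
+        ```python
+        from pathy import Pathy
+
+        # Succeeds even when the bucket exists because exist_ok is given
+        Pathy("gs://my_bucket/").mkdir(exist_ok=True)
+        ```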
+ """ + try: + if self.bucket is None: + raise FileNotFoundError("No bucket in {} {}".format(type(self), self)) + # If the whole path is just the bucket, respect the result of "bucket.exists()" + if self.key is None and not exist_ok and self.bucket.exists(): + raise FileExistsError("Bucket {} already exists".format(self.bucket)) + return super().mkdir(mode, parents=parents, exist_ok=exist_ok) + except OSError: + if not exist_ok: + raise + + def is_mount(self: "Pathy") -> bool: + return False + + def is_symlink(self: "Pathy") -> bool: + return False + + def is_socket(self: "Pathy") -> bool: + return False + + def is_fifo(self: "Pathy") -> bool: + return False + + # Unsupported operations below here + + @classmethod + def cwd(cls) -> "Pathy": + message = cls._NOT_SUPPORTED_MESSAGE.format(method=cls.cwd.__qualname__) + raise NotImplementedError(message) + + @classmethod + def home(cls) -> "Pathy": + message = cls._NOT_SUPPORTED_MESSAGE.format(method=cls.home.__qualname__) + raise NotImplementedError(message) + + def chmod(self: "Pathy", mode: int) -> None: + message = self._NOT_SUPPORTED_MESSAGE.format(method=self.chmod.__qualname__) + raise NotImplementedError(message) + + def expanduser(self: "Pathy") -> "Pathy": + message = self._NOT_SUPPORTED_MESSAGE.format( + method=self.expanduser.__qualname__ + ) + raise NotImplementedError(message) + + def lchmod(self: "Pathy", mode: int) -> None: + message = self._NOT_SUPPORTED_MESSAGE.format(method=self.lchmod.__qualname__) + raise NotImplementedError(message) + + def group(self: "Pathy") -> str: + message = self._NOT_SUPPORTED_MESSAGE.format(method=self.group.__qualname__) + raise NotImplementedError(message) + + def is_block_device(self: "Pathy") -> bool: + message = self._NOT_SUPPORTED_MESSAGE.format( + method=self.is_block_device.__qualname__ + ) + raise NotImplementedError(message) + + def is_char_device(self: "Pathy") -> bool: + message = self._NOT_SUPPORTED_MESSAGE.format( + method=self.is_char_device.__qualname__ + ) + raise NotImplementedError(message) + + def lstat(self: "Pathy") -> os.stat_result: + message = self._NOT_SUPPORTED_MESSAGE.format(method=self.lstat.__qualname__) + raise NotImplementedError(message) + + def symlink_to( + self, target: Union[str, Path], target_is_directory: bool = False + ) -> None: + message = self._NOT_SUPPORTED_MESSAGE.format( + method=self.symlink_to.__qualname__ + ) + raise NotImplementedError(message) + + +class PathyScanDir(Iterator[Any], ContextManager[Any], ABC): + """A scandir implementation that works for all python 3.x versions. + + Python < 3.7 requires that scandir be iterable so it can be converted + to a list of results. + + Python >= 3.8 requires that scandir work as a context manager. 
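+
+    Concrete subclasses can be consumed either way; `scan_dir` below is a
+    hypothetical instance of such a subclass:
+
+    ```python
+    entries = list(scan_dir)  # iterate directly, or ...
+    with scan_dir as entries:  # ... use as a context manager
+        ...
+    ```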
+ """ + + def __init__( + self, + client: BucketClient, + path: Optional[PurePathy] = None, + prefix: Optional[str] = None, + delimiter: Optional[str] = None, + ) -> None: + super().__init__() + self._client = client + self._path = path + self._prefix = prefix + self._delimiter = delimiter + self._generator = self.scandir() + + def __enter__(self) -> Generator[BucketEntry, None, None]: + return self._generator + + def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + pass + + def __next__(self) -> Generator[BucketEntry, None, None]: + yield from self._generator + + def __iter__(self) -> Generator[BucketEntry, None, None]: + yield from self._generator + + @abstractmethod + def scandir(self) -> Generator[BucketEntry, None, None]: + raise NotImplementedError("must be implemented in a subclass") + + +# +# File system adapter +# + + +class BucketEntryFS(BucketEntry): + ... + + +@dataclass +class BlobFS(Blob): + def delete(self) -> None: + """Delete a file-based blob.""" + file_folder: str = os.path.dirname(self.raw) + self.raw.unlink() + # NOTE: in buckets folders only exist if a file is contained in them. Mimic + # that behavior here by removing empty folders when the last file is removed. + if len(os.listdir(file_folder)) == 0: + shutil.rmtree(file_folder) + + def exists(self) -> bool: + return self.raw.exists() + + +@dataclass +class BucketFS(Bucket): + name: str + bucket: pathlib.Path + + def get_blob(self, blob_name: str) -> Optional[BlobFS]: # type:ignore[override] + native_blob = self.bucket / blob_name + if not native_blob.exists() or native_blob.is_dir(): + return None + stat = native_blob.stat() + # path.owner() raises KeyError if the owner's UID isn't known + # + # https://docs.python.org/3/library/pathlib.html#pathlib.Path.owner + owner: Optional[str] + try: + owner = native_blob.owner() + except KeyError: + owner = None + return BlobFS( + bucket=self, + owner=owner, + name=blob_name, + raw=native_blob, + size=stat.st_size, + updated=int(round(stat.st_mtime)), + ) + + def copy_blob( # type:ignore[override] + self, blob: BlobFS, target: "BucketFS", name: str + ) -> Optional[BlobFS]: + in_file = str(blob.bucket.bucket / blob.name) + out_file = str(target.bucket / name) + out_path = pathlib.Path(os.path.dirname(out_file)) + if not out_path.exists(): + out_path.mkdir(parents=True) + shutil.copy(in_file, out_file) + return None + + def delete_blob(self, blob: BlobFS) -> None: # type:ignore[override] + blob.delete() + + def delete_blobs(self, blobs: List[BlobFS]) -> None: # type:ignore[override] + for blob in blobs: + blob.delete() + + def exists(self) -> bool: + return self.bucket.exists() + + +@dataclass +class BucketClientFS(BucketClient): + # Root to store file-system buckets as children of + root: pathlib.Path = field(default_factory=lambda: pathlib.Path("/tmp/")) + + def full_path(self, path: Pathy) -> pathlib.Path: + if path.root is None: + raise ValueError(f"Invalid bucket name for path: {path}") + full_path = self.root.absolute() / path.root + if path.key is not None: + full_path = full_path / path.key + return full_path + + def exists(self, path: Pathy) -> bool: + """Return True if the path exists as a file or folder on disk""" + return self.full_path(path).exists() + + def is_dir(self, path: Pathy) -> bool: + return self.full_path(path).is_dir() + + def rmdir(self, path: Pathy) -> None: + full_path = self.full_path(path) + return shutil.rmtree(str(full_path)) + + def open( + self, + path: Pathy, + *, + mode: str = "r", + buffering: int = 
DEFAULT_BUFFER_SIZE,
+        encoding: Optional[str] = None,
+        errors: Optional[str] = None,
+        newline: Optional[str] = None,
+    ) -> StreamableType:
+        if self.lookup_bucket(path) is None:
+            raise ClientError(message=f'bucket "{path.root}" does not exist', code=404)
+
+        full_path = self.full_path(path)
+        if not full_path.exists():
+            if full_path.name != "":
+                full_path = full_path.parent
+            full_path.mkdir(parents=True, exist_ok=True)
+        return super().open(
+            path,
+            mode=mode,
+            buffering=buffering,
+            encoding=encoding,
+            errors=errors,
+            newline=newline,
+        )
+
+    def make_uri(self, path: PurePathy) -> str:
+        if not path.root:
+            raise ValueError(f"cannot make a URI to an invalid bucket: {path.root}")
+        full_path = self.root.absolute() / path.root
+        if path.key is not None:
+            full_path /= path.key
+        result = f"file://{full_path}"
+        return result
+
+    def create_bucket(self, path: PurePathy) -> Bucket:
+        if not path.root:
+            raise ValueError(f"Invalid bucket name: {path.root}")
+        bucket_path: pathlib.Path = self.root / path.root
+        if bucket_path.exists():
+            raise FileExistsError(f"Bucket already exists at: {bucket_path}")
+        bucket_path.mkdir(parents=True, exist_ok=True)
+        return BucketFS(str(path.root), bucket=bucket_path)
+
+    def delete_bucket(self, path: PurePathy) -> None:
+        bucket_path: pathlib.Path = self.root / str(path.root)
+        if bucket_path.exists():
+            shutil.rmtree(bucket_path)
+
+    def lookup_bucket(self, path: PurePathy) -> Optional[BucketFS]:
+        if path.root:
+            bucket_path: pathlib.Path = self.root / path.root
+            if bucket_path.exists():
+                return BucketFS(str(path.root), bucket=bucket_path)
+        return None
+
+    def get_bucket(self, path: PurePathy) -> BucketFS:
+        if not path.root:
+            raise ValueError(f"path has an invalid bucket_name: {path.root}")
+        bucket_path: pathlib.Path = self.root / path.root
+        if bucket_path.is_dir():
+            return BucketFS(str(path.root), bucket=bucket_path)
+        raise FileNotFoundError(f"Bucket {path.root} does not exist!")
+
+    def list_buckets(self, **kwargs: Dict[str, Any]) -> Generator[BucketFS, None, None]:
+        for f in self.root.glob("*"):
+            if f.is_dir():
+                yield BucketFS(f.name, f)
+
+    def scandir(
+        self,
+        path: Optional[Pathy] = None,
+        prefix: Optional[str] = None,
+        delimiter: Optional[str] = None,
+    ) -> PathyScanDir:
+        return _FSScanDir(client=self, path=path, prefix=prefix, delimiter=delimiter)
+
+    def list_blobs(
+        self,
+        path: PurePathy,
+        prefix: Optional[str] = None,
+        delimiter: Optional[str] = None,
+        include_dirs: bool = False,
+    ) -> Generator[BlobFS, None, None]:
+        assert path.root is not None
+        bucket = self.get_bucket(path)
+        scan_path = self.root / path.root
+        if prefix is not None:
+            scan_path = scan_path / prefix
+        elif path.key is not None:
+            scan_path = scan_path / path.key
+
+        # Path to a file
+        if scan_path.exists() and not scan_path.is_dir():
+            stat = scan_path.stat()
+            file_size = stat.st_size
+            updated = int(round(stat.st_mtime))
+            yield BlobFS(
+                bucket,
+                name=scan_path.name,
+                size=file_size,
+                updated=updated,
+                owner=None,
+                raw=scan_path,
+            )
+
+        # Yield blobs for each file
+        for file_path in scan_path.rglob("*"):
+            if file_path.is_dir():
+                continue
+            stat = file_path.stat()
+            file_size = stat.st_size
+            updated = int(round(stat.st_mtime))
+            yield BlobFS(
+                bucket,
+                name=f"{prefix if prefix is not None else ''}{file_path.name}",
+                size=file_size,
+                updated=updated,
+                owner=None,
+                raw=file_path,
+            )
+
+
+class _FSScanDir(PathyScanDir):
+    _client: BucketClientFS
+
+    def scandir(self) -> 
Generator[BucketEntry, None, None]: + if self._path is None or not self._path.root: + for bucket in self._client.list_buckets(): + yield BucketEntryFS(bucket.name, is_dir=True, raw=None) + return + assert self._path is not None + assert self._path.root is not None + scan_path = self._client.root / self._path.root + if self._prefix is not None: + scan_path = scan_path / self._prefix + for dir_entry in scan_path.glob("*"): + if dir_entry.is_dir(): + yield BucketEntryFS(dir_entry.name, is_dir=True, raw=None) + else: + file_path = pathlib.Path(dir_entry) + stat = file_path.stat() + file_size = stat.st_size + updated = int(round(stat.st_mtime)) + blob: BlobFS = BlobFS( + self._client.get_bucket(self._path), + name=dir_entry.name, + size=file_size, + updated=updated, + owner=None, + raw=file_path, + ) + yield BucketEntryFS( + name=dir_entry.name, + is_dir=False, + size=file_size, + last_modified=updated, + raw=blob, + ) + + +# +# Client Registration +# + + +# TODO: Maybe this should be dynamic, but I'm trying to see if we can +# hardcode it (atleast the base schemes) to get nice types flowing +# in cases where they would otherwise be lost. +_client_registry: Dict[str, Type[BucketClient]] = { + "": BucketClientFS, + "file": BucketClientFS, +} +# Optional clients that we attempt to dynamically load when encountering +# a Pathy object with a matching scheme +_optional_clients: Dict[str, str] = { + "gs": "pathy.gcs", +} +BucketClientType = TypeVar("BucketClientType", bound=BucketClient) + +# Hold given client args for a scheme +_client_args_registry: Dict[str, Any] = {} +_instance_cache: Dict[str, Any] = {} +_fs_client: Optional["BucketClientFS"] = None +_fs_cache: Optional[pathlib.Path] = None + + +def register_client(scheme: str, type: Type[BucketClient]) -> None: + """Register a bucket client for use with certain scheme Pathy objects""" + global _client_registry + _client_registry[scheme] = type + + +def get_client(scheme: str) -> BucketClientType: # type:ignore + """Retrieve the bucket client for use with a given scheme""" + global _client_registry, _instance_cache, _fs_client + global _optional_clients, _client_args_registry + if _fs_client is not None: + return _fs_client # type: ignore + if scheme in _instance_cache: + return _instance_cache[scheme] + + # Attempt to dynamically load optional clients if we find a matching scheme + if scheme not in _client_registry and scheme in _optional_clients: + # We don't handle ImportError here because we expect clients to register + # on import or add a "null" handler if their requirements are not satisfied. + importlib.import_module(_optional_clients[scheme]) + + # Create the client from the known registry + if scheme in _client_registry: + kwargs: Dict[str, Any] = ( + _client_args_registry[scheme] if scheme in _client_args_registry else {} + ) + _instance_cache[scheme] = _client_registry[scheme](**kwargs) # type:ignore + return _instance_cache[scheme] + + raise ValueError(f'There is no client registered to handle "{scheme}" paths') + + +def set_client_params(scheme: str, **kwargs: Any) -> None: + """Specify args to pass when instantiating a service-specific Client + object. 
This allows for passing credentials in whatever way your underlying + client library prefers.""" + global _client_registry, _instance_cache, _client_args_registry + _client_args_registry[scheme] = kwargs + if scheme in _instance_cache: + _instance_cache[scheme].recreate(**_client_args_registry[scheme]) + return None + + +def use_fs( + root: Optional[Union[str, pathlib.Path, bool]] = None +) -> Optional[BucketClientFS]: + """Use a path in the local file-system to store blobs and buckets. + + This is useful for development and testing situations, and for embedded + applications.""" + global _fs_client + # False - disable adapter + if root is False: + _fs_client = None + return None + + # None or True - enable FS adapter with default root + if root is None or root is True: + # Look up "data" folder of pathy package similar to spaCy + client_root = pathlib.Path(__file__).parent / "data" + else: + assert isinstance( + root, (str, pathlib.Path) + ), f"root is not a known type: {type(root)}" + client_root = pathlib.Path(root) + if not client_root.exists(): + client_root.mkdir(parents=True) + _fs_client = BucketClientFS(root=client_root) + return _fs_client + + +def get_fs_client() -> Optional[BucketClientFS]: + """Get the file-system client (or None)""" + global _fs_client + assert _fs_client is None or isinstance( + _fs_client, BucketClientFS + ), "invalid root type" + return _fs_client + + +def use_fs_cache( + root: Optional[Union[str, pathlib.Path, bool]] = None +) -> Optional[pathlib.Path]: + """Use a path in the local file-system to cache blobs and buckets. + + This is useful for when you want to avoid fetching large blobs multiple + times, or need to pass a local file path to a third-party library.""" + global _fs_cache + # False - disable adapter + if root is False: + _fs_cache = None + return None + + # None or True - enable FS cache with default root + if root is None or root is True: + # Use a temporary folder. Cache will be removed according to OS policy + cache_root = pathlib.Path(tempfile.mkdtemp()) + else: + assert isinstance( + root, (str, pathlib.Path) + ), f"root is not a known type: {type(root)}" + cache_root = pathlib.Path(root) + if not cache_root.exists(): + cache_root.mkdir(parents=True) + _fs_cache = cache_root + return cache_root + + +def get_fs_cache() -> Optional[pathlib.Path]: + """Get the folder that holds file-system cached blobs and timestamps.""" + global _fs_cache + assert _fs_cache is None or isinstance(_fs_cache, pathlib.Path), "invalid root type" + return _fs_cache + + +def clear_fs_cache(force: bool = False) -> None: + """Remove the existing file-system blob cache folder. + + Raises AssertionError if the cache path is unset or points to the + root of the file-system.""" + cache_path = get_fs_cache() + assert cache_path is not None, "no cache to clear" + resolved = cache_path.resolve() + assert str(resolved) != "/", f"refusing to remove a root path: {resolved}" + shutil.rmtree(str(resolved)) diff --git a/pathy/_gcs.py b/pathy/_gcs.py new file mode 100644 index 0000000..f100713 --- /dev/null +++ b/pathy/_gcs.py @@ -0,0 +1,255 @@ +from dataclasses import dataclass +from typing import Any, Dict, Generator, List, Optional + +from google.api_core import exceptions as gcs_errors # type:ignore +from google.cloud.storage import Blob as GCSNativeBlob # type:ignore +from google.cloud.storage import Bucket as GCSNativeBucket # type:ignore +from google.cloud.storage import Client as GCSNativeClient # type:ignore + +from . 
import ( + Blob, + Bucket, + BucketClient, + BucketEntry, + ClientError, + PathyScanDir, + PurePathy, + register_client, +) + + +class BucketEntryGCS(BucketEntry): + bucket: "BucketGCS" + raw: GCSNativeBlob # type:ignore[override] + + +@dataclass +class BlobGCS(Blob): + def delete(self) -> None: + self.raw.delete() # type:ignore + + def exists(self) -> bool: + return self.raw.exists() # type:ignore + + +@dataclass +class BucketGCS(Bucket): + name: str + bucket: GCSNativeBucket + + def get_blob(self, blob_name: str) -> Optional[BlobGCS]: + assert isinstance( + blob_name, str + ), f"expected str blob name, but found: {type(blob_name)}" + native_blob: Optional[Any] = None + try: + native_blob = self.bucket.get_blob(blob_name) # type:ignore + except gcs_errors.ClientError: + pass + if native_blob is None: + return None + return BlobGCS( + bucket=self.bucket, + owner=native_blob.owner, # type:ignore + name=native_blob.name, # type:ignore + raw=native_blob, + size=native_blob.size, + updated=int(native_blob.updated.timestamp()), # type:ignore + ) + + def copy_blob( # type:ignore[override] + self, blob: BlobGCS, target: "BucketGCS", name: str + ) -> Optional[BlobGCS]: + assert blob.raw is not None, "raw storage.Blob instance required" + native_blob: GCSNativeBlob = self.bucket.copy_blob( # type: ignore + blob.raw, target.bucket, name + ) + if native_blob is None: + return None + return BlobGCS( + bucket=self.bucket, + owner=native_blob.owner, # type:ignore + name=native_blob.name, # type:ignore + raw=native_blob, + size=native_blob.size, + updated=int(native_blob.updated.timestamp()), # type:ignore + ) + + def delete_blob(self, blob: BlobGCS) -> None: # type:ignore[override] + return self.bucket.delete_blob(blob.name) # type:ignore + + def delete_blobs(self, blobs: List[BlobGCS]) -> None: # type:ignore[override] + return self.bucket.delete_blobs(blobs) # type:ignore + + def exists(self) -> bool: + try: + return self.bucket.exists() # type:ignore + except gcs_errors.ClientError: + return False + + +class BucketClientGCS(BucketClient): + client: GCSNativeClient + + @property + def client_params(self) -> Any: + return dict(client=self.client) + + def __init__(self, **kwargs: Any) -> None: + self.recreate(**kwargs) + + def recreate(self, **kwargs: Any) -> None: + creds = kwargs["credentials"] if "credentials" in kwargs else None + if creds is not None: + kwargs["project"] = creds.project_id + self.client = GCSNativeClient(**kwargs) + + def make_uri(self, path: PurePathy) -> str: + return str(path) + + def create_bucket( # type:ignore[override] + self, path: PurePathy + ) -> GCSNativeBucket: + return self.client.create_bucket(path.root) # type:ignore + + def delete_bucket(self, path: PurePathy) -> None: + bucket = self.client.get_bucket(path.root) # type:ignore + bucket.delete() # type:ignore + + def exists(self, path: PurePathy) -> bool: + # Because we want all the parents of a valid blob (e.g. 
"directory" in + # "directory/foo.file") to return True, we enumerate the blobs with a prefix + # and compare the object names to see if they match a substring of the path + key_name = str(path.key) + try: + for obj in self.list_blobs(path): + if obj.name == key_name: + return True + if obj.name.startswith(key_name + path._flavour.sep): # type:ignore + return True + except gcs_errors.ClientError: + return False + return False + + def lookup_bucket(self, path: PurePathy) -> Optional[BucketGCS]: + try: + native_bucket = self.client.bucket(path.root) # type:ignore + if native_bucket is not None: + return BucketGCS(str(path.root), bucket=native_bucket) + except gcs_errors.ClientError as err: + print(err) + + return None + + def get_bucket(self, path: PurePathy) -> BucketGCS: + try: + native_bucket = self.client.bucket(path.root) # type:ignore + if native_bucket is not None: + return BucketGCS(str(path.root), bucket=native_bucket) + raise FileNotFoundError(f"Bucket {path.root} does not exist!") + except gcs_errors.ClientError as e: + raise ClientError(message=e.message, code=e.code) # type:ignore + + def list_buckets( # type:ignore[override] + self, **kwargs: Dict[str, Any] + ) -> Generator[GCSNativeBucket, None, None]: + return self.client.list_buckets(**kwargs) # type:ignore + + def scandir( # type:ignore[override] + self, + path: Optional[PurePathy] = None, + prefix: Optional[str] = None, + delimiter: Optional[str] = None, + ) -> PathyScanDir: + return _GCSScanDir(client=self, path=path, prefix=prefix, delimiter=delimiter) + + def list_blobs( + self, + path: PurePathy, + prefix: Optional[str] = None, + delimiter: Optional[str] = None, + include_dirs: bool = False, + ) -> Generator[BlobGCS, None, None]: + continuation_token: Any = None + bucket = self.lookup_bucket(path) + if bucket is None: + return + while True: + response: Any + if continuation_token: + response = self.client.list_blobs( # type:ignore + path.root, + prefix=prefix, + delimiter=delimiter, + page_token=continuation_token, + ) + else: + response = self.client.list_blobs( # type:ignore + path.root, prefix=prefix, delimiter=delimiter + ) + + page: Any + item: Any + for page in response.pages: # type:ignore + for item in page: + yield BlobGCS( + bucket=bucket, + owner=item.owner, + name=item.name, + raw=item, + size=item.size, + updated=item.updated.timestamp(), + ) + if response.next_page_token is None: + break + continuation_token = response.next_page_token + + +class _GCSScanDir(PathyScanDir): + _client: BucketClientGCS + + def scandir(self) -> Generator[BucketEntryGCS, None, None]: + continuation_token = None + if self._path is None or not self._path.root: + gcs_bucket: GCSNativeBucket + for gcs_bucket in self._client.client.list_buckets(): + yield BucketEntryGCS(gcs_bucket.name, is_dir=True, raw=None) + return + sep = self._path._flavour.sep + bucket = self._client.lookup_bucket(self._path) + if bucket is None: + return + while True: + if continuation_token: + response = self._client.client.list_blobs( + bucket.name, + prefix=self._prefix, + delimiter=sep, + page_token=continuation_token, + ) + else: + response = self._client.client.list_blobs( + bucket.name, prefix=self._prefix, delimiter=sep + ) + for page in response.pages: + for folder in list(page.prefixes): + full_name = folder[:-1] if folder.endswith(sep) else folder + name = full_name.split(sep)[-1] + if name: + yield BucketEntryGCS(name, is_dir=True, raw=None) + for item in page: + name = item.name.split(sep)[-1] + if name: + yield BucketEntryGCS( + name=name, + 
is_dir=False, + size=item.size, + last_modified=item.updated.timestamp(), + raw=item, + ) + if response.next_page_token is None: + break + continuation_token = response.next_page_token + + +register_client("gs", BucketClientGCS) diff --git a/pathy/tests/__init__.py b/pathy/_tests/__init__.py similarity index 100% rename from pathy/tests/__init__.py rename to pathy/_tests/__init__.py diff --git a/pathy/tests/conftest.py b/pathy/_tests/conftest.py similarity index 97% rename from pathy/tests/conftest.py rename to pathy/_tests/conftest.py index c4db52c..d66cb4f 100644 --- a/pathy/tests/conftest.py +++ b/pathy/_tests/conftest.py @@ -7,7 +7,7 @@ import pytest -from pathy import Pathy, use_fs, use_fs_cache, set_client_params +from pathy import Pathy, set_client_params, use_fs, use_fs_cache from pathy.gcs import has_gcs has_credentials = "GCS_CREDENTIALS" in os.environ diff --git a/pathy/tests/fixtures/tar_but_not_gzipped.tar.gz b/pathy/_tests/fixtures/tar_but_not_gzipped.tar.gz similarity index 100% rename from pathy/tests/fixtures/tar_but_not_gzipped.tar.gz rename to pathy/_tests/fixtures/tar_but_not_gzipped.tar.gz diff --git a/pathy/tests/test_base.py b/pathy/_tests/test_base.py similarity index 99% rename from pathy/tests/test_base.py rename to pathy/_tests/test_base.py index 46b9df6..f1c1c51 100644 --- a/pathy/tests/test_base.py +++ b/pathy/_tests/test_base.py @@ -6,6 +6,7 @@ import pytest import spacy + from .. import ( BasePath, Blob, @@ -21,7 +22,6 @@ use_fs, use_fs_cache, ) - from ..about import __version__ from .conftest import TEST_ADAPTERS @@ -317,8 +317,8 @@ def test_base_with_suffix() -> None: def test_api_path_support() -> None: - assert PurePathy in Pathy.mro() # type: ignore - assert Path in Pathy.mro() # type: ignore + assert PurePathy in Pathy.mro() + assert Path in Pathy.mro() @pytest.mark.parametrize("adapter", TEST_ADAPTERS) @@ -810,7 +810,7 @@ def test_client_create_bucket(temp_folder: Path) -> None: def test_client_base_bucket_raises_not_implemented() -> None: - bucket = Bucket() + bucket: Bucket = Bucket() blob: Blob = Blob(bucket, "foo", -1, -1, None, None) with pytest.raises(NotImplementedError): bucket.copy_blob(blob, bucket, "baz") @@ -833,7 +833,7 @@ def test_client_base_blob_raises_not_implemented() -> None: def test_client_base_bucket_client_raises_not_implemented() -> None: - client = BucketClient() + client: BucketClient = BucketClient() with pytest.raises(NotImplementedError): client.lookup_bucket(Pathy("gs://foo")) with pytest.raises(NotImplementedError): diff --git a/pathy/tests/test_cli.py b/pathy/_tests/test_cli.py similarity index 100% rename from pathy/tests/test_cli.py rename to pathy/_tests/test_cli.py diff --git a/pathy/tests/test_clients.py b/pathy/_tests/test_clients.py similarity index 92% rename from pathy/tests/test_clients.py rename to pathy/_tests/test_clients.py index fb14167..14c7e48 100644 --- a/pathy/tests/test_clients.py +++ b/pathy/_tests/test_clients.py @@ -4,23 +4,29 @@ import pytest -from pathy import Pathy, get_client -from pathy.base import BucketClient from pathy import ( + BucketClient, + BucketClientFS, + Pathy, + get_client, get_fs_client, register_client, set_client_params, use_fs, use_fs_cache, ) -from pathy import BucketClientFS -from pathy.gcs import BucketClientGCS +from pathy._gcs import BucketClientGCS +from pathy.gcs import has_gcs from .conftest import TEST_ADAPTERS -def test_clients_get_client_works_with_builtin_schems() -> None: +@pytest.mark.skipif(not has_gcs, reason="requires gcs") +def 
test_clients_get_client_works_with_optional_builtin_schems() -> None: assert isinstance(get_client("gs"), BucketClientGCS) + + +def test_clients_get_client_works_with_builtin_schems() -> None: assert isinstance(get_client("file"), BucketClientFS) assert isinstance(get_client(""), BucketClientFS) diff --git a/pathy/tests/test_file.py b/pathy/_tests/test_file.py similarity index 88% rename from pathy/tests/test_file.py rename to pathy/_tests/test_file.py index 7378b82..0b15347 100644 --- a/pathy/tests/test_file.py +++ b/pathy/_tests/test_file.py @@ -4,8 +4,7 @@ import mock import pytest -from pathy import Pathy, get_client -from pathy import BlobFS, BucketClientFS, BucketFS +from pathy import BlobFS, BucketClientFS, BucketFS, Pathy, get_client def raise_owner(self: Any) -> None: diff --git a/pathy/tests/test_gcs.py b/pathy/_tests/test_gcs.py similarity index 91% rename from pathy/tests/test_gcs.py rename to pathy/_tests/test_gcs.py index d984d4f..a8ad232 100644 --- a/pathy/tests/test_gcs.py +++ b/pathy/_tests/test_gcs.py @@ -2,7 +2,8 @@ import pytest -from pathy import has_gcs, set_client_params +from pathy import set_client_params +from pathy.gcs import has_gcs def raise_default_creds_error() -> None: diff --git a/pathy/_types.py b/pathy/_types.py new file mode 100644 index 0000000..bd5e74b --- /dev/null +++ b/pathy/_types.py @@ -0,0 +1,15 @@ +from pathlib import _Accessor as _PathlibAccessor # type:ignore +from pathlib import _PosixFlavour as _PathlibPosixFlavour # type:ignore +from pathlib import _WindowsFlavour as _PathlibWindowsFlavour # type:ignore + + +class _Accessor(_PathlibAccessor): # type:ignore + pass + + +class _PosixFlavour(_PathlibPosixFlavour): # type:ignore + pass + + +class _WindowsFlavour(_PathlibWindowsFlavour): # type:ignore + pass diff --git a/pathy/base.py b/pathy/base.py deleted file mode 100644 index 80ea08e..0000000 --- a/pathy/base.py +++ /dev/null @@ -1,1299 +0,0 @@ -import os -import pathlib -import shutil -import tempfile -from dataclasses import dataclass, field -from io import DEFAULT_BUFFER_SIZE -from pathlib import _Accessor # type:ignore -from pathlib import _PosixFlavour # type:ignore -from pathlib import _WindowsFlavour # type:ignore -from pathlib import Path, PurePath -from typing import ( - IO, - Any, - ContextManager, - Dict, - Generator, - Generic, - Iterator, - List, - Optional, - Tuple, - Type, - TypeVar, - Union, - cast, -) - -import smart_open - -SUBCLASS_ERROR = "must be implemented in a subclass" - -StreamableType = IO[Any] -FluidPath = Union["Pathy", "BasePath"] -BucketClientType = TypeVar("BucketClientType", bound="BucketClient") -BucketType = TypeVar("BucketType") -BucketBlobType = TypeVar("BucketBlobType") - -_windows_flavour = _WindowsFlavour() -_posix_flavour = _PosixFlavour() - - -@dataclass -class ClientError(BaseException): - message: str - code: Optional[int] - - def __str__(self) -> str: - return self.__repr__() - - def __repr__(self) -> str: - return f"({self.code}) {self.message}" - - -@dataclass -class BlobStat: - """Stat for a bucket item""" - - name: str - size: Optional[int] - last_modified: Optional[int] - - -@dataclass -class Blob(Generic[BucketType, BucketBlobType]): - bucket: "Bucket" - name: str - size: Optional[int] - updated: Optional[int] - owner: Optional[str] - raw: BucketBlobType - - def delete(self) -> None: - raise NotImplementedError(SUBCLASS_ERROR) - - def exists(self) -> bool: - raise NotImplementedError(SUBCLASS_ERROR) - - -class BucketEntry(Generic[BucketType, BucketBlobType]): - """A single item returned from 
scanning a path""" - - name: str - _is_dir: bool - _stat: BlobStat - raw: Optional[Blob[BucketType, BucketBlobType]] - - def __init__( - self, - name: str, - is_dir: bool = False, - size: int = -1, - last_modified: int = -1, - raw: Optional[Blob[BucketType, BucketBlobType]] = None, - ): - self.name = name - self.raw = raw - self._is_dir = is_dir - self._stat = BlobStat(name=name, size=size, last_modified=last_modified) - - def __repr__(self) -> str: - return "{}(name={}, is_dir={}, stat={})".format( - type(self).__name__, self.name, self._is_dir, self._stat - ) - - def inode(self, *args: Any, **kwargs: Dict[str, Any]) -> None: - return None - - def is_dir(self) -> bool: - return self._is_dir - - def is_file(self) -> bool: - return not self._is_dir - - def is_symlink(self) -> bool: - return False - - def stat(self) -> BlobStat: - return self._stat - - -@dataclass -class Bucket: - def get_blob(self, blob_name: str) -> Optional[Blob[BucketType, BucketBlobType]]: - raise NotImplementedError(SUBCLASS_ERROR) - - def copy_blob( - self, blob: Blob[BucketType, BucketBlobType], target: "Bucket", name: str - ) -> Optional[Blob[BucketType, BucketBlobType]]: - raise NotImplementedError(SUBCLASS_ERROR) - - def delete_blob(self, blob: Blob[BucketType, BucketBlobType]) -> None: - raise NotImplementedError(SUBCLASS_ERROR) - - def delete_blobs(self, blobs: List[Blob[BucketType, BucketBlobType]]) -> None: - raise NotImplementedError(SUBCLASS_ERROR) - - def exists(self) -> bool: - raise NotImplementedError(SUBCLASS_ERROR) - - -class BucketClient: - """Base class for a client that interacts with a bucket-based storage system.""" - - def recreate(self, **kwargs: Any) -> None: - """Recreate any underlying bucket client adapter with the given kwargs""" - - def open( - self, - path: "Pathy", - *, - mode: str = "r", - buffering: int = DEFAULT_BUFFER_SIZE, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, - ) -> StreamableType: - client_params = {} - if hasattr(self, "client_params"): - client_params = getattr(self, "client_params") - - return smart_open.open( - self.make_uri(path), - mode=mode, - buffering=buffering, - encoding=encoding, - errors=errors, - newline=newline, - transport_params=client_params, - # Disable de/compression based on the file extension - ignore_ext=True, - ) # type:ignore - - def make_uri(self, path: "Pathy") -> str: - return path.as_uri() - - def is_dir(self, path: "Pathy") -> bool: - return any(self.list_blobs(path, prefix=path.prefix)) - - def rmdir(self, path: "Pathy") -> None: - return None - - def exists(self, path: "Pathy") -> bool: - raise NotImplementedError(SUBCLASS_ERROR) - - def lookup_bucket(self, path: "Pathy") -> Optional[Bucket]: - raise NotImplementedError(SUBCLASS_ERROR) - - def get_bucket(self, path: "Pathy") -> Bucket: - raise NotImplementedError(SUBCLASS_ERROR) - - def list_buckets(self) -> Generator[Bucket, None, None]: - raise NotImplementedError(SUBCLASS_ERROR) - - def list_blobs( - self, - path: "Pathy", - prefix: Optional[str] = None, - delimiter: Optional[str] = None, - include_dirs: bool = False, - ) -> Generator[Blob, None, None]: - raise NotImplementedError(SUBCLASS_ERROR) - - def scandir( - self, - path: "Pathy" = None, - prefix: Optional[str] = None, - delimiter: Optional[str] = None, - ) -> "PathyScanDir": - raise NotImplementedError(SUBCLASS_ERROR) - - def create_bucket(self, path: "Pathy") -> Bucket: - raise NotImplementedError(SUBCLASS_ERROR) - - def delete_bucket(self, path: "Pathy") -> None: - raise 
-
-
-class _PathyFlavour(_PosixFlavour):
-    is_supported = True
-
-    def parse_parts(self, parts: List[str]) -> Tuple[str, str, List[str]]:
-        drv, root, parsed = super().parse_parts(parts)
-        if len(parsed) and parsed[0].endswith(":"):
-            if len(parsed) < 2:
-                raise ValueError("need at least two parts")
-            # Restore the scheme (drv) and bucket name (root)
-            drv = parsed[0]  # scheme:
-            root = parsed[1]  # bucket_name
-        for part in parsed[1:]:
-            if part == "..":
-                index = parsed.index(part)
-                parsed.pop(index - 1)
-                parsed.remove(part)
-        return drv, root, parsed
-
-    def make_uri(self, path: "Pathy") -> str:
-        uri = super().make_uri(path)
-        return uri.replace("file:///", "gs://")
-
-
-class PurePathy(PurePath):
-    """PurePath subclass for bucket storage."""
-
-    _flavour = _PathyFlavour()
-    __slots__ = ()
-
-    @property
-    def scheme(self) -> str:
-        """Return the scheme portion of this path. A path's scheme is the leading
-        few characters. In a website you would see a scheme of "http" or "https".
-
-        Consider a few examples:
-
-        ```python
-        assert Pathy("gs://foo/bar").scheme == "gs"
-        assert Pathy("file:///tmp/foo/bar").scheme == "file"
-        assert Pathy("/dev/null").scheme == ""
-        ```
-        """
-        # If there is no drive, return nothing
-        if self.drive == "":
-            return ""
-        # This is an assumption of mine. I think it's fine, but let's
-        # cause an error if it's not the case.
-        assert self.drive[-1] == ":", "drive should end with :"
-        return self.drive[:-1]
-
-    @property
-    def bucket(self) -> "Pathy":
-        """Return a new instance of only the bucket path."""
-        self._absolute_path_validation()
-        return cast(Pathy, type(self)(f"{self.drive}//{self.root}"))
-
-    @property
-    def key(self) -> Optional["Pathy"]:
-        """Return a new instance of only the key path."""
-        self._absolute_path_validation()
-        key = self._flavour.sep.join(self.parts[2:])
-        if not key or len(self.parts) < 2:
-            return None
-        return cast(Pathy, type(self)(key))
-
-    @property
-    def prefix(self) -> str:
-        sep = self._flavour.sep
-        key = self.key
-        if not key:
-            return ""
-        key_name = str(key)
-        if not key_name.endswith(sep):
-            return key_name + sep
-        return key_name
-
-    def _absolute_path_validation(self) -> None:
-        if not self.is_absolute():
-            raise ValueError("relative paths have no bucket/key specification")
-
-    @classmethod
-    def _format_parsed_parts(cls, drv: str, root: str, parts: List[str]) -> str:
-        # Bucket path "gs://foo/bar"
-        if drv and root:
-            return f"{drv}//{root}/" + cls._flavour.join(parts[2:])
-        # Absolute path
-        elif drv or root:
-            return drv + root + cls._flavour.join(parts[1:])
-        else:
-            # Relative path
-            return cls._flavour.join(parts)
-
-
-_SUPPORTED_OPEN_MODES = {"r", "rb", "tr", "rt", "w", "wb", "bw", "wt", "tw"}
-
-
-class _PathyExtensions:
-    def ls(self) -> Generator["BlobStat", None, None]:
-        blobs: "PathyScanDir" = self._accessor.scandir(self)  # type:ignore
-        for blob in blobs:
-            if hasattr(blob, "_stat"):
-                yield blob._stat
-            elif isinstance(blob, os.DirEntry):
-                os_blob = cast(os.DirEntry, blob)
-                stat = os_blob.stat()
-                file_size = stat.st_size
-                updated = int(round(stat.st_mtime))
-                yield BlobStat(name=os_blob.name, size=file_size, last_modified=updated)
-
-
-class BasePath(_PathyExtensions, Path):
-    # NOTE: pathlib normally takes care of this, but the logic checks
-    # for specifically "Path" type class in __new__ so we need to
-    # set the flavour manually based on the OS.
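-    # For example, on Windows a BasePath behaves like pathlib.WindowsPath,
-    # and everywhere else like pathlib.PosixPath, while still inheriting
-    # the ls() helper from _PathyExtensions.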
-    _flavour = _windows_flavour if os.name == "nt" else _posix_flavour  # type:ignore
-
-
-class BucketsAccessor(_Accessor):
-    """Access data from blob buckets"""
-
-    _client: Optional[BucketClient]
-
-    def client(self, path: "Pathy") -> BucketClient:
-        return get_client(path.scheme)
-
-    def get_blob(self, path: "Pathy") -> Optional[Blob]:
-        """Get the blob associated with a path or return None"""
-        if not path.root:
-            return None
-        bucket = self.client(path).lookup_bucket(path)
-        if bucket is None:
-            return None
-        key_name = str(path.key)
-        return bucket.get_blob(key_name)
-
-    def unlink(self, path: "Pathy") -> None:
-        """Delete a link to a blob in a bucket."""
-        bucket = self.client(path).get_bucket(path)
-        blob: Optional[Blob] = bucket.get_blob(str(path.key))
-        if blob is None:
-            raise FileNotFoundError(path)
-        blob.delete()
-
-    def stat(self, path: "Pathy") -> BlobStat:
-        bucket = self.client(path).get_bucket(path)
-        blob: Optional[Blob] = bucket.get_blob(str(path.key))
-        if blob is None:
-            raise FileNotFoundError(path)
-        return BlobStat(name=str(blob), size=blob.size, last_modified=blob.updated)
-
-    def is_dir(self, path: "Pathy") -> bool:
-        if str(path) == path.root:
-            return True
-        if self.get_blob(path) is not None:
-            return False
-        return self.client(path).is_dir(path)
-
-    def exists(self, path: "Pathy") -> bool:
-        client = self.client(path)
-        if not path.root:
-            return any(client.list_buckets())
-        try:
-            bucket = client.lookup_bucket(path)
-        except ClientError:
-            return False
-        if bucket is None or not bucket.exists():
-            return False
-        if not path.key:
-            return True
-
-        key_name = str(path.key)
-        blob: Optional[Blob] = bucket.get_blob(key_name)
-        if blob is not None:
-            return blob.exists()
-        # Determine if the path exists according to the current adapter
-        return client.exists(path)
-
-    def scandir(self, path: "Pathy") -> "PathyScanDir":
-        return self.client(path).scandir(path, prefix=path.prefix)
-
-    def listdir(self, path: "Pathy") -> Generator[str, None, None]:
-        with self.scandir(path) as entries:
-            for entry in entries:
-                yield entry.name
-
-    def open(
-        self,
-        path: "Pathy",
-        *,
-        mode: str = "r",
-        buffering: int = -1,
-        encoding: Optional[str] = None,
-        errors: Optional[str] = None,
-        newline: Optional[str] = None,
-    ) -> StreamableType:
-        return self.client(path).open(
-            path,
-            mode=mode,
-            buffering=buffering,
-            encoding=encoding,
-            errors=errors,
-            newline=newline,
-        )
-
-    def owner(self, path: "Pathy") -> Optional[str]:
-        blob: Optional[Blob] = self.get_blob(path)
-        return blob.owner if blob is not None else None
-
-    def resolve(self, path: "Pathy", strict: bool = False) -> "Pathy":
-        path_parts = str(path).replace(path.drive, "")
-        return Pathy(f"{path.drive}{os.path.abspath(path_parts)}")
-
-    def rename(self, path: "Pathy", target: "Pathy") -> None:
-        client: BucketClient = self.client(path)
-        bucket: Bucket = client.get_bucket(path)
-        target_bucket: Bucket = client.get_bucket(target)
-
-        # Single file
-        if not self.is_dir(path):
-            from_blob: Optional[Blob] = bucket.get_blob(str(path.key))
-            if from_blob is None:
-                raise FileNotFoundError(f'source file "{path}" does not exist')
-            target_bucket.copy_blob(from_blob, target_bucket, str(target.key))
-            bucket.delete_blob(from_blob)
-            return
-
-        # Folder with objects
-        sep = path._flavour.sep
-        blobs = list(client.list_blobs(path, prefix=path.prefix, delimiter=sep))
-        # First rename
-        for blob in blobs:
-            target_key_name = blob.name.replace(str(path.key), str(target.key))
-            target_bucket.copy_blob(blob, target_bucket, target_key_name)
-        # Then delete the sources
-        for blob in blobs:
-            bucket.delete_blob(blob)
-
-    def replace(self, path: "Pathy", target: "Pathy") -> None:
-        return self.rename(path, target)
-
-    def rmdir(self, path: "Pathy") -> None:
-        client: BucketClient = self.client(path)
-        key_name = str(path.key) if path.key is not None else None
-        bucket: Bucket = client.get_bucket(path)
-        blobs = list(client.list_blobs(path, prefix=key_name))
-        bucket.delete_blobs(blobs)
-        # The path is just the bucket
-        if key_name is None:
-            client.delete_bucket(path)
-        elif client.is_dir(path):
-            client.rmdir(path)
-
-    def mkdir(self, path: "Pathy", mode: int) -> None:
-        client: BucketClient = self.client(path)
-        bucket: Optional[Bucket] = client.lookup_bucket(path)
-        if bucket is None or not bucket.exists():
-            client.create_bucket(path)
-        elif isinstance(path, Pathy) and path.key is not None:
-            assert isinstance(path, Pathy)
-            blob: Optional[Blob] = bucket.get_blob(str(path.key))
-            if blob is not None and blob.exists():
-                raise OSError(f"Blob already exists: {path}")
-
-
-class Pathy(Path, PurePathy, _PathyExtensions):
-    """Subclass of `pathlib.Path` that works with bucket APIs."""
-
-    __slots__ = ()
-    _accessor: "BucketsAccessor"
-    _default_accessor = BucketsAccessor()
-    _NOT_SUPPORTED_MESSAGE = "{method} is an unsupported bucket operation"
-
-    def __truediv__(self, key: Union[str, Path, "Pathy", PurePathy]) -> "Pathy":  # type: ignore[override]
-        return super().__truediv__(key)  # type:ignore
-
-    def __rtruediv__(self, key: Union[str, Path, "Pathy", PurePathy]) -> "Pathy":  # type: ignore[override]
-        return super().__rtruediv__(key)  # type:ignore
-
-    def _init(self: "Pathy", template: Optional[Any] = None) -> None:
-        super()._init(template)  # type:ignore
-        if template is None:
-            self._accessor = Pathy._default_accessor
-        else:
-            self._accessor = template._accessor
-
-    @classmethod
-    def fluid(cls, path_candidate: Union[str, FluidPath]) -> FluidPath:
-        """Infer either a Pathy or pathlib.Path from an input path or string.
-
-        The returned type is a union of the potential `FluidPath` types and will
-        type-check correctly against the minimum overlapping APIs of all the input
-        types.
-
-        If you need to use specific implementation details of a type, "narrow" the
-        return of this function to the desired type, e.g.
-
-        ```python
-        from pathy import FluidPath, Pathy
-
-        fluid_path: FluidPath = Pathy.fluid("gs://my_bucket/foo.txt")
-        # Narrow the type to a specific class
-        assert isinstance(fluid_path, Pathy), "must be Pathy"
-        # Use a member specific to that class
-        assert fluid_path.prefix == "foo.txt/"
-        ```
-        """
-        from_path: FluidPath = Pathy(path_candidate)
-        if from_path.root in ["/", ""]:
-            from_path = BasePath(path_candidate)
-        return from_path
-
-    @classmethod
-    def from_bucket(cls, bucket_name: str) -> "Pathy":
-        """Initialize a Pathy from a bucket name. This helper adds a trailing slash and
-        the appropriate prefix.
-
-        ```python
-        from pathy import Pathy
-
-        assert str(Pathy.from_bucket("one")) == "gs://one/"
-        assert str(Pathy.from_bucket("two")) == "gs://two/"
-        ```
-        """
-        return Pathy(f"gs://{bucket_name}/")  # type:ignore
-
-    @classmethod
-    def to_local(cls, blob_path: Union["Pathy", str], recurse: bool = True) -> Path:
-        """Download and cache either a blob or a set of blobs matching a prefix.
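-
-        A usage sketch (the bucket and blob names here are hypothetical):
-
-        ```python
-        from pathy import Pathy, use_fs_cache
-
-        use_fs_cache()  # to_local requires a cache folder to be configured
-        local_path = Pathy.to_local("gs://my_bucket/foo.txt")
-        print(local_path)  # a pathlib.Path inside the cache folder
-        ```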
-
-        The cache is sensitive to the file updated time, and downloads new blobs
-        as their updated timestamps change."""
-        cache_folder = get_fs_cache()
-        if cache_folder is None:
-            raise ValueError(
-                'cannot get and cache a blob without first calling "use_fs_cache"'
-            )
-
-        cache_folder.mkdir(exist_ok=True, parents=True)
-
-        in_path: Pathy
-        if not isinstance(blob_path, Pathy):
-            in_path = Pathy(blob_path)
-        else:
-            in_path = blob_path
-
-        cache_blob: Path = cache_folder.absolute() / in_path.root
-        if in_path.key is not None:
-            cache_blob /= in_path.key
-        cache_time: Path = (
-            cache_folder.absolute() / in_path.root / f"{in_path.key}.time"
-        )
-        # Keep a cache of downloaded files. Fetch new ones when:
-        #  - the file isn't in the cache
-        #  - cached_stat.updated != latest_stat.updated
-        if cache_blob.exists() and cache_time.exists():
-            fs_time: str = cache_time.read_text()
-            gcs_stat: BlobStat = in_path.stat()
-            # If the times match, return the cached blob
-            if fs_time == str(gcs_stat.last_modified):
-                return cache_blob
-            # remove the cache files because they're out of date
-            cache_blob.unlink()
-            cache_time.unlink()
-
-        # If the file isn't in the cache, download it
-        if not cache_blob.exists():
-            # Is a blob
-            if in_path.is_file():
-                dest_folder = cache_blob.parent
-                dest_folder.mkdir(exist_ok=True, parents=True)
-                cache_blob.write_bytes(in_path.read_bytes())
-                blob_stat: BlobStat = in_path.stat()
-                cache_time.write_text(str(blob_stat.last_modified))
-            elif recurse:
-                # If not a specific blob, enumerate all the blobs under
-                # the path and cache them, then return the cache folder
-                for blob in in_path.rglob("*"):
-                    Pathy.to_local(blob, recurse=False)
-        return cache_blob
-
-    def ls(self: "Pathy") -> Generator["BlobStat", None, None]:
-        """List blob names with stat information under the given path.
-
-        This is considerably faster than using iterdir if you also need
-        the stat information for the enumerated blobs.
-
-        Yields BlobStat objects for each found blob.
-        """
-        yield from super(Pathy, self).ls()
-
-    def stat(self: "Pathy") -> BlobStat:  # type: ignore[override]
-        """Returns information about this bucket path."""
-        self._absolute_path_validation()
-        if not self.key:
-            raise ValueError("cannot stat a bucket without a key")
-        return cast(BlobStat, super().stat())
-
-    def exists(self) -> bool:
-        """Returns True if the path points to an existing bucket, blob, or prefix."""
-        self._absolute_path_validation()
-        return self._accessor.exists(self)
-
-    def is_dir(self: "Pathy") -> bool:
-        """Determine if the path points to a bucket or a prefix of a given blob
-        in the bucket.
-
-        Returns True if the path points to a bucket or a blob prefix.
-        Returns False if it points to a blob or the path doesn't exist.
-        """
-        self._absolute_path_validation()
-        if self.bucket and not self.key:
-            return True
-        return self._accessor.is_dir(self)
-
-    def is_file(self: "Pathy") -> bool:
-        """Determine if the path points to a blob in the bucket.
-
-        Returns True if the path points to a blob.
-        Returns False if it points to a bucket or blob prefix, or if the path doesn't
-        exist.
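-
-        A sketch of the distinction (the bucket and blob are hypothetical and
-        assumed to exist):
-
-        ```python
-        from pathy import Pathy
-
-        assert Pathy("gs://my_bucket/file.txt").is_file()  # an existing blob
-        assert not Pathy("gs://my_bucket/").is_file()  # a bucket is not a blob
-        ```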
- """ - self._absolute_path_validation() - if not self.bucket or not self.key: - return False - try: - return bool(self.stat()) - except (ClientError, FileNotFoundError): - return False - - def iterdir(self: "Pathy") -> Generator["Pathy", None, None]: - """Iterate over the blobs found in the given bucket or blob prefix path.""" - self._absolute_path_validation() - yield from super().iterdir() # type:ignore - - def glob(self: "Pathy", pattern: str) -> Generator["Pathy", None, None]: - """Perform a glob match relative to this Pathy instance, yielding all matched - blobs.""" - yield from super().glob(pattern) # type:ignore - - def rglob(self: "Pathy", pattern: str) -> Generator["Pathy", None, None]: - """Perform a recursive glob match relative to this Pathy instance, yielding - all matched blobs. Imagine adding "**/" before a call to glob.""" - yield from super().rglob(pattern) # type:ignore - - def open( # type:ignore - self: "Pathy", - mode: str = "r", - buffering: int = DEFAULT_BUFFER_SIZE, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, - ) -> StreamableType: - """Open the given blob for streaming. This delegates to the `smart_open` - library that handles large file streaming for a number of bucket API - providers.""" - self._absolute_path_validation() - if mode not in _SUPPORTED_OPEN_MODES: - raise ValueError( - "supported modes are {} got {}".format(_SUPPORTED_OPEN_MODES, mode) - ) - if buffering == 0 or buffering == 1: - raise ValueError( - "supported buffering values are only block sizes, no 0 or 1" - ) - if "b" in mode and encoding: - raise ValueError("binary mode doesn't take an encoding argument") - - # Leftover pathlib internals stuff - if hasattr(self, "_closed") and self._closed: # type:ignore - self._raise_closed() # type:ignore - return self._accessor.open( - self, - mode=mode, - buffering=buffering, - encoding=encoding, - errors=errors, - newline=newline, - ) - - def owner(self: "Pathy") -> Optional[str]: # type:ignore[override] - """Returns the name of the user that owns the bucket or blob - this path points to. Returns None if the owner is unknown or - not supported by the bucket API provider.""" - self._absolute_path_validation() - if not self.is_file(): - raise FileNotFoundError(str(self)) - return self._accessor.owner(self) - - def resolve(self, strict: bool = False) -> "Pathy": - """Resolve the given path to remove any relative path specifiers. - - ```python - from pathy import Pathy - - path = Pathy("gs://my_bucket/folder/../blob") - assert path.resolve() == Pathy("gs://my_bucket/blob") - ``` - """ - self._absolute_path_validation() - return self._accessor.resolve(self, strict=strict) - - def rename(self: "Pathy", target: Union[str, PurePath]) -> "Pathy": # type:ignore - """Rename this path to the given target. - - If the target exists and is a file, it will be replaced silently if the user - has permission. - - If path is a blob prefix, it will replace all the blobs with the same prefix - to match the target prefix.""" - self._absolute_path_validation() - self_type = type(self) - result = target if isinstance(target, self_type) else self_type(target) - result._absolute_path_validation() # type:ignore - super().rename(result) - return result - - def replace(self: "Pathy", target: Union[str, PurePath]) -> "Pathy": # type:ignore - """Renames this path to the given target. 
-
-        If target points to an existing path, it will be replaced."""
-        return self.rename(target)
-
-    def rmdir(self: "Pathy") -> None:
-        """Removes this bucket or blob prefix. It must be empty."""
-        self._absolute_path_validation()
-        if self.is_file():
-            raise NotADirectoryError()
-        if not self.is_dir():
-            raise FileNotFoundError()
-        super().rmdir()
-
-    def samefile(self: "Pathy", other_path: Union[str, bytes, int, Path]) -> bool:
-        """Determine if this path points to the same location as other_path."""
-        self._absolute_path_validation()
-        if not isinstance(other_path, Path):
-            other_path = type(self)(other_path)  # type:ignore
-        assert isinstance(other_path, Pathy)
-        return (
-            self.bucket == other_path.bucket
-            and self.key == other_path.key
-            and self.is_file()
-        )
-
-    def touch(self: "Pathy", mode: int = 0o666, exist_ok: bool = True) -> None:
-        """Create a blob at this path.
-
-        If the blob already exists, the function succeeds if exist_ok is True
-        (and its modification time is updated to the current time), otherwise
-        FileExistsError is raised.
-        """
-        if self.exists() and not exist_ok:
-            raise FileExistsError()
-        self.write_text("")
-
-    def mkdir(
-        self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False
-    ) -> None:
-        """Create a bucket from the given path. Since bucket APIs only have implicit
-        folder structures (determined by the existence of a blob with an overlapping
-        prefix) this does nothing other than create buckets.
-
-        If parents is False, the bucket will only be created if the path points to
-        exactly the bucket and nothing else. If parents is True the bucket will be
-        created even if the path points to a specific blob.
-
-        The mode param is ignored.
-
-        Raises FileExistsError if exist_ok is False and the bucket already exists.
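-
-        A sketch with a hypothetical bucket name (assumes credentials that are
-        allowed to create buckets):
-
-        ```python
-        from pathy import Pathy
-
-        Pathy("gs://my_new_bucket/").mkdir(exist_ok=True)
-        ```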
- """ - try: - if self.bucket is None: - raise FileNotFoundError("No bucket in {} {}".format(type(self), self)) - # If the whole path is just the bucket, respect the result of "bucket.exists()" - if self.key is None and not exist_ok and self.bucket.exists(): - raise FileExistsError("Bucket {} already exists".format(self.bucket)) - return super().mkdir(mode, parents=parents, exist_ok=exist_ok) - except OSError: - if not exist_ok: - raise - - def is_mount(self: "Pathy") -> bool: - return False - - def is_symlink(self: "Pathy") -> bool: - return False - - def is_socket(self: "Pathy") -> bool: - return False - - def is_fifo(self: "Pathy") -> bool: - return False - - # Unsupported operations below here - - @classmethod - def cwd(cls) -> "Pathy": - message = cls._NOT_SUPPORTED_MESSAGE.format(method=cls.cwd.__qualname__) - raise NotImplementedError(message) - - @classmethod - def home(cls) -> "Pathy": - message = cls._NOT_SUPPORTED_MESSAGE.format(method=cls.home.__qualname__) - raise NotImplementedError(message) - - def chmod(self: "Pathy", mode: int) -> None: - message = self._NOT_SUPPORTED_MESSAGE.format(method=self.chmod.__qualname__) - raise NotImplementedError(message) - - def expanduser(self: "Pathy") -> "Pathy": - message = self._NOT_SUPPORTED_MESSAGE.format( - method=self.expanduser.__qualname__ - ) - raise NotImplementedError(message) - - def lchmod(self: "Pathy", mode: int) -> None: - message = self._NOT_SUPPORTED_MESSAGE.format(method=self.lchmod.__qualname__) - raise NotImplementedError(message) - - def group(self: "Pathy") -> str: - message = self._NOT_SUPPORTED_MESSAGE.format(method=self.group.__qualname__) - raise NotImplementedError(message) - - def is_block_device(self: "Pathy") -> bool: - message = self._NOT_SUPPORTED_MESSAGE.format( - method=self.is_block_device.__qualname__ - ) - raise NotImplementedError(message) - - def is_char_device(self: "Pathy") -> bool: - message = self._NOT_SUPPORTED_MESSAGE.format( - method=self.is_char_device.__qualname__ - ) - raise NotImplementedError(message) - - def lstat(self: "Pathy") -> os.stat_result: - message = self._NOT_SUPPORTED_MESSAGE.format(method=self.lstat.__qualname__) - raise NotImplementedError(message) - - def symlink_to( - self, target: Union[str, Path], target_is_directory: bool = False - ) -> None: - message = self._NOT_SUPPORTED_MESSAGE.format( - method=self.symlink_to.__qualname__ - ) - raise NotImplementedError(message) - - -class PathyScanDir(Iterator[Any], ContextManager[Any]): - """A scandir implementation that works for all python 3.x versions. - - Python < 3.7 requires that scandir be iterable so it can be converted - to a list of results. - - Python >= 3.8 requires that scandir work as a context manager. 
- """ - - def __init__( - self, - client: BucketClient, - path: Optional[PurePathy] = None, - prefix: Optional[str] = None, - delimiter: Optional[str] = None, - ) -> None: - super().__init__() - self._client = client - self._path = path - self._prefix = prefix - self._delimiter = delimiter - self._generator = self.scandir() - - def __enter__(self) -> Generator[BucketEntry, None, None]: - return self._generator - - def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: - pass - - def __next__(self) -> Generator[BucketEntry, None, None]: - yield from self._generator - - def __iter__(self) -> Generator[BucketEntry, None, None]: - yield from self._generator - - def scandir(self) -> Generator[BucketEntry, None, None]: - raise NotImplementedError("must be implemented in a subclass") - - -# -# File system adapter -# - - -class BucketEntryFS(BucketEntry["BucketFS", pathlib.Path]): - ... - - -@dataclass -class BlobFS(Blob["BucketFS", pathlib.Path]): - raw: pathlib.Path - bucket: "BucketFS" - - def delete(self) -> None: - """Delete a file-based blob.""" - file_folder: str = os.path.dirname(self.raw) - self.raw.unlink() - # NOTE: in buckets folders only exist if a file is contained in them. Mimic - # that behavior here by removing empty folders when the last file is removed. - if len(os.listdir(file_folder)) == 0: - shutil.rmtree(file_folder) - - def exists(self) -> bool: - return self.raw.exists() - - -@dataclass -class BucketFS(Bucket): - name: str - bucket: pathlib.Path - - def get_blob(self, blob_name: str) -> Optional[BlobFS]: # type:ignore[override] - native_blob = self.bucket / blob_name - if not native_blob.exists() or native_blob.is_dir(): - return None - stat = native_blob.stat() - # path.owner() raises KeyError if the owner's UID isn't known - # - # https://docs.python.org/3/library/pathlib.html#pathlib.Path.owner - owner: Optional[str] - try: - owner = native_blob.owner() - except KeyError: - owner = None - return BlobFS( - bucket=self, - owner=owner, - name=blob_name, - raw=native_blob, - size=stat.st_size, - updated=int(round(stat.st_mtime)), - ) - - def copy_blob( # type:ignore[override] - self, blob: BlobFS, target: "BucketFS", name: str - ) -> Optional[BlobFS]: - in_file = str(blob.bucket.bucket / blob.name) - out_file = str(target.bucket / name) - out_path = pathlib.Path(os.path.dirname(out_file)) - if not out_path.exists(): - out_path.mkdir(parents=True) - shutil.copy(in_file, out_file) - return None - - def delete_blob(self, blob: BlobFS) -> None: # type:ignore[override] - blob.delete() - - def delete_blobs(self, blobs: List[BlobFS]) -> None: # type:ignore[override] - for blob in blobs: - blob.delete() - - def exists(self) -> bool: - return self.bucket.exists() - - -@dataclass -class BucketClientFS(BucketClient): - # Root to store file-system buckets as children of - root: pathlib.Path = field(default_factory=lambda: pathlib.Path("/tmp/")) - - def full_path(self, path: Pathy) -> pathlib.Path: - if path.root is None: - raise ValueError(f"Invalid bucket name for path: {path}") - full_path = self.root.absolute() / path.root - if path.key is not None: - full_path = full_path / path.key - return full_path - - def exists(self, path: Pathy) -> bool: - """Return True if the path exists as a file or folder on disk""" - return self.full_path(path).exists() - - def is_dir(self, path: Pathy) -> bool: - return self.full_path(path).is_dir() - - def rmdir(self, path: Pathy) -> None: - full_path = self.full_path(path) - return shutil.rmtree(str(full_path)) - - def open( - 
-        self,
-        path: Pathy,
-        *,
-        mode: str = "r",
-        buffering: int = DEFAULT_BUFFER_SIZE,
-        encoding: Optional[str] = None,
-        errors: Optional[str] = None,
-        newline: Optional[str] = None,
-    ) -> StreamableType:
-        if self.lookup_bucket(path) is None:
-            raise ClientError(message=f'bucket "{path.root}" does not exist', code=404)
-
-        full_path = self.full_path(path)
-        if not full_path.exists():
-            if full_path.name != "":
-                full_path = full_path.parent
-            full_path.mkdir(parents=True, exist_ok=True)
-        return super().open(
-            path,
-            mode=mode,
-            buffering=buffering,
-            encoding=encoding,
-            errors=errors,
-            newline=newline,
-        )
-
-    def make_uri(self, path: PurePathy) -> str:
-        if not path.root:
-            raise ValueError(f"cannot make a URI to an invalid bucket: {path.root}")
-        full_path = self.root.absolute() / path.root
-        if path.key is not None:
-            full_path /= path.key
-        result = f"file://{full_path}"
-        return result
-
-    def create_bucket(self, path: PurePathy) -> Bucket:
-        if not path.root:
-            raise ValueError(f"Invalid bucket name: {path.root}")
-        bucket_path: pathlib.Path = self.root / path.root
-        if bucket_path.exists():
-            raise FileExistsError(f"Bucket already exists at: {bucket_path}")
-        bucket_path.mkdir(parents=True, exist_ok=True)
-        return BucketFS(str(path.root), bucket=bucket_path)
-
-    def delete_bucket(self, path: PurePathy) -> None:
-        bucket_path: pathlib.Path = self.root / str(path.root)
-        if bucket_path.exists():
-            shutil.rmtree(bucket_path)
-
-    def lookup_bucket(self, path: PurePathy) -> Optional[BucketFS]:
-        if path.root:
-            bucket_path: pathlib.Path = self.root / path.root
-            if bucket_path.exists():
-                return BucketFS(str(path.root), bucket=bucket_path)
-        return None
-
-    def get_bucket(self, path: PurePathy) -> BucketFS:
-        if not path.root:
-            raise ValueError(f"path has an invalid bucket_name: {path.root}")
-        bucket_path: pathlib.Path = self.root / path.root
-        if bucket_path.is_dir():
-            return BucketFS(str(path.root), bucket=bucket_path)
-        raise FileNotFoundError(f"Bucket {path.root} does not exist!")
-
-    def list_buckets(self, **kwargs: Dict[str, Any]) -> Generator[BucketFS, None, None]:
-        for f in self.root.glob("*"):
-            if f.is_dir():
-                yield BucketFS(f.name, f)
-
-    def scandir(
-        self,
-        path: Pathy = None,
-        prefix: Optional[str] = None,
-        delimiter: Optional[str] = None,
-    ) -> PathyScanDir:
-        return _FSScanDir(client=self, path=path, prefix=prefix, delimiter=delimiter)
-
-    def list_blobs(
-        self,
-        path: PurePathy,
-        prefix: Optional[str] = None,
-        delimiter: Optional[str] = None,
-        include_dirs: bool = False,
-    ) -> Generator[BlobFS, None, None]:
-        assert path.root is not None
-        bucket = self.get_bucket(path)
-        scan_path = self.root / path.root
-        if prefix is not None:
-            scan_path = scan_path / prefix
-        elif prefix is not None and path.key is not None:
-            scan_path = scan_path / path.key
-
-        # Path to a file
-        if scan_path.exists() and not scan_path.is_dir():
-            stat = scan_path.stat()
-            file_size = stat.st_size
-            updated = int(round(stat.st_mtime_ns * 1000))
-            yield BlobFS(
-                bucket,
-                name=scan_path.name,
-                size=file_size,
-                updated=updated,
-                owner=None,
-                raw=scan_path,
-            )
-
-        # Yield blobs for each file
-        for file_path in scan_path.rglob("*"):
-            if file_path.is_dir():
-                continue
-            stat = file_path.stat()
-            file_size = stat.st_size
-            updated = int(round(stat.st_mtime_ns * 1000))
-            yield BlobFS(
-                bucket,
-                name=f"{prefix if prefix is not None else ''}{file_path.name}",
-                size=file_size,
-                updated=updated,
-                owner=None,
-                raw=file_path,
-            )
-
-
-class _FSScanDir(PathyScanDir):
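-    """Yield buckets (folders under the client root) when no path is given,
-    and the folder/file entries under the given path otherwise."""
-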
-    _client: BucketClientFS
-
-    def scandir(self) -> Generator[BucketEntry[BucketFS, pathlib.Path], None, None]:
-        if self._path is None or not self._path.root:
-            for bucket in self._client.list_buckets():
-                yield BucketEntryFS(bucket.name, is_dir=True, raw=None)
-            return
-        assert self._path is not None
-        assert self._path.root is not None
-        scan_path = self._client.root / self._path.root
-        if self._prefix is not None:
-            scan_path = scan_path / self._prefix
-        for dir_entry in scan_path.glob("*"):
-            if dir_entry.is_dir():
-                yield BucketEntryFS(dir_entry.name, is_dir=True, raw=None)
-            else:
-                file_path = pathlib.Path(dir_entry)
-                stat = file_path.stat()
-                file_size = stat.st_size
-                updated = int(round(stat.st_mtime))
-                blob: Blob = BlobFS(
-                    self._client.get_bucket(self._path),
-                    name=dir_entry.name,
-                    size=file_size,
-                    updated=updated,
-                    owner=None,
-                    raw=file_path,
-                )
-                yield BucketEntryFS(
-                    name=dir_entry.name,
-                    is_dir=False,
-                    size=file_size,
-                    last_modified=updated,
-                    raw=blob,
-                )
-
-
-#
-# Client Registration
-#
-
-
-# TODO: Maybe this should be dynamic, but I'm trying to see if we can
-# hardcode it (at least the base schemes) to get nice types flowing
-# in cases where they would otherwise be lost.
-_client_registry: Dict[str, Type[BucketClient]] = {
-    "": BucketClientFS,
-    "file": BucketClientFS,
-}
-
-# Hold given client args for a scheme
-_client_args_registry: Dict[str, Any] = {}
-_instance_cache: Dict[str, Any] = {}
-_fs_client: Optional["BucketClientFS"] = None
-_fs_cache: Optional[pathlib.Path] = None
-
-
-def register_client(scheme: str, type: Type[BucketClient]) -> None:
-    """Register a bucket client for use with certain scheme Pathy objects"""
-    global _client_registry
-    _client_registry[scheme] = type
-
-
-def get_client(scheme: str) -> BucketClientType:
-    """Retrieve the bucket client for use with a given scheme"""
-    global _client_registry, _instance_cache, _fs_client, _client_args_registry
-    if _fs_client is not None:
-        return _fs_client  # type: ignore
-    if scheme in _instance_cache:
-        return _instance_cache[scheme]
-    elif scheme in _client_registry:
-        kwargs = (
-            _client_args_registry[scheme] if scheme in _client_args_registry else {}
-        )
-        _instance_cache[scheme] = _client_registry[scheme](**kwargs)  # type:ignore
-        return _instance_cache[scheme]
-    raise ValueError(f'There is no client registered to handle "{scheme}" paths')
-
-
-def set_client_params(scheme: str, **kwargs: Any) -> None:
-    """Specify args to pass when instantiating a service-specific Client
-    object. This allows for passing credentials in whatever way your underlying
-    client library prefers."""
-    global _client_registry, _instance_cache, _client_args_registry
-    _client_args_registry[scheme] = kwargs
-    if scheme in _instance_cache:
-        _instance_cache[scheme].recreate(**_client_args_registry[scheme])
-    return None
-
-
-def use_fs(
-    root: Optional[Union[str, pathlib.Path, bool]] = None
-) -> Optional[BucketClientFS]:
-    """Use a path in the local file-system to store blobs and buckets.
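-
-    A sketch (the root folder and bucket name below are hypothetical):
-
-    ```python
-    from pathy import Pathy, use_fs
-
-    use_fs("/tmp/pathy-data")
-    Pathy("gs://my_bucket/").mkdir(exist_ok=True)
-    Pathy("gs://my_bucket/greeting.txt").write_text("hi")  # stored on local disk
-    use_fs(False)  # disable the file-system adapter again
-    ```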
-
-    This is useful for development and testing situations, and for embedded
-    applications."""
-    global _fs_client
-    # False - disable adapter
-    if root is False:
-        _fs_client = None
-        return None
-
-    # None or True - enable FS adapter with default root
-    if root is None or root is True:
-        # Look up "data" folder of pathy package similar to spaCy
-        client_root = pathlib.Path(__file__).parent / "data"
-    else:
-        assert isinstance(
-            root, (str, pathlib.Path)
-        ), f"root is not a known type: {type(root)}"
-        client_root = pathlib.Path(root)
-    if not client_root.exists():
-        client_root.mkdir(parents=True)
-    _fs_client = BucketClientFS(root=client_root)
-    return _fs_client
-
-
-def get_fs_client() -> Optional[BucketClientFS]:
-    """Get the file-system client (or None)"""
-    global _fs_client
-    assert _fs_client is None or isinstance(
-        _fs_client, BucketClientFS
-    ), "invalid root type"
-    return _fs_client
-
-
-def use_fs_cache(
-    root: Optional[Union[str, pathlib.Path, bool]] = None
-) -> Optional[pathlib.Path]:
-    """Use a path in the local file-system to cache blobs and buckets.
-
-    This is useful for when you want to avoid fetching large blobs multiple
-    times, or need to pass a local file path to a third-party library."""
-    global _fs_cache
-    # False - disable adapter
-    if root is False:
-        _fs_cache = None
-        return None
-
-    # None or True - enable FS cache with default root
-    if root is None or root is True:
-        # Use a temporary folder. Cache will be removed according to OS policy
-        cache_root = pathlib.Path(tempfile.mkdtemp())
-    else:
-        assert isinstance(
-            root, (str, pathlib.Path)
-        ), f"root is not a known type: {type(root)}"
-        cache_root = pathlib.Path(root)
-    if not cache_root.exists():
-        cache_root.mkdir(parents=True)
-    _fs_cache = cache_root
-    return cache_root
-
-
-def get_fs_cache() -> Optional[pathlib.Path]:
-    """Get the folder that holds file-system cached blobs and timestamps."""
-    global _fs_cache
-    assert _fs_cache is None or isinstance(_fs_cache, pathlib.Path), "invalid root type"
-    return _fs_cache
-
-
-def clear_fs_cache(force: bool = False) -> None:
-    """Remove the existing file-system blob cache folder.
-
-    Raises AssertionError if the cache path is unset or points to the
-    root of the file-system."""
-    cache_path = get_fs_cache()
-    assert cache_path is not None, "no cache to clear"
-    resolved = cache_path.resolve()
-    assert str(resolved) != "/", f"refusing to remove a root path: {resolved}"
-    shutil.rmtree(str(resolved))
diff --git a/pathy/cli.py b/pathy/cli.py
index 8960abf..fbb985e 100644
--- a/pathy/cli.py
+++ b/pathy/cli.py
@@ -2,7 +2,7 @@
 
 import typer
 
-from .base import FluidPath, Pathy
+from . import FluidPath, Pathy
 
 app = typer.Typer(help="Pathy command line interface.")
 
diff --git a/pathy/gcs.py b/pathy/gcs.py
index 00259e0..ad90f16 100644
--- a/pathy/gcs.py
+++ b/pathy/gcs.py
@@ -1,34 +1,10 @@
-from dataclasses import dataclass
-from typing import Any, Dict, Generator, List, Optional
+from . import BucketClient, register_client
 
-from .base import (
-    Blob,
-    Bucket,
-    BucketClient,
-    BucketEntry,
-    ClientError,
-    PathyScanDir,
-    PurePathy,
-    register_client,
-)
 
-try:
-    from google.api_core import exceptions as gcs_errors  # type:ignore
-    from google.auth.exceptions import DefaultCredentialsError  # type:ignore
-    from google.cloud.storage import Blob as GCSNativeBlob  # type:ignore
-    from google.cloud.storage import Bucket as GCSNativeBucket  # type:ignore
-    from google.cloud.storage import Client as GCSNativeClient  # type:ignore
-
-    has_gcs = True
-except ImportError:
-    GCSNativeBlob = Any
-    DefaultCredentialsError = BaseException
-    gcs_errors = Any
-    GCSNativeBucket = Any
-    GCSNativeClient = Any
-    has_gcs = False
-
-_MISSING_DEPS = """You are using the GCS functionality of Pathy without
+class GCSMissingClient(BucketClient):
+    def __init__(self) -> None:
+        raise AssertionError(
+            """You are using the GCS functionality of Pathy without
 having the required dependencies installed.
 
 Please try installing them:
 
@@ -36,243 +12,17 @@ pip install pathy[gcs]
 
 """
-
-
-class BucketEntryGCS(BucketEntry["BucketGCS", GCSNativeBlob]):
-    ...
-
-
-@dataclass
-class BlobGCS(Blob[GCSNativeBucket, GCSNativeBlob]):
-    def delete(self) -> None:
-        self.raw.delete()
-
-    def exists(self) -> bool:
-        return self.raw.exists()
-
-
-@dataclass
-class BucketGCS(Bucket):
-    name: str
-    bucket: GCSNativeBucket
-
-    def get_blob(self, blob_name: str) -> Optional[BlobGCS]:
-        assert isinstance(
-            blob_name, str
-        ), f"expected str blob name, but found: {type(blob_name)}"
-        native_blob = None
-        try:
-            native_blob = self.bucket.get_blob(blob_name)
-        except gcs_errors.ClientError:
-            pass
-        if native_blob is None:
-            return None
-        return BlobGCS(
-            bucket=self.bucket,
-            owner=native_blob.owner,
-            name=native_blob.name,
-            raw=native_blob,
-            size=native_blob.size,
-            updated=int(native_blob.updated.timestamp()),
-        )
-
-    def copy_blob(  # type:ignore[override]
-        self, blob: BlobGCS, target: "BucketGCS", name: str
-    ) -> Optional[BlobGCS]:
-        assert blob.raw is not None, "raw storage.Blob instance required"
-        native_blob = self.bucket.copy_blob(blob.raw, target.bucket, name)
-        if native_blob is None:
-            return None
-        return BlobGCS(
-            bucket=self.bucket,
-            owner=native_blob.owner,
-            name=native_blob.name,
-            raw=native_blob,
-            size=native_blob.size,
-            updated=int(native_blob.updated.timestamp()),
-        )
-
-    def delete_blob(self, blob: BlobGCS) -> None:  # type:ignore[override]
-        return self.bucket.delete_blob(blob.name)
-
-    def delete_blobs(self, blobs: List[BlobGCS]) -> None:  # type:ignore[override]
-        return self.bucket.delete_blobs(blobs)
-
-    def exists(self) -> bool:
-        try:
-            return self.bucket.exists()
-        except gcs_errors.ClientError:
-            return False
-
-class BucketClientGCS(BucketClient):
-    client: Optional[GCSNativeClient]
-
-    @property
-    def client_params(self) -> Any:
-        return dict(client=self.client)
-
-    def __init__(self, **kwargs: Any) -> None:
-        self.recreate(**kwargs)
-
-    def recreate(self, **kwargs: Any) -> None:
-        creds = kwargs["credentials"] if "credentials" in kwargs else None
-        if creds is not None:
-            kwargs["project"] = creds.project_id
-        try:
-            self.client = GCSNativeClient(**kwargs)
-        except TypeError:
-            # TypeError is raised if the imports for GCSNativeClient fail and are
-            # assigned to Any, which is not callable.
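-            # (That is, when google-cloud-storage is missing, GCSNativeClient
-            # is the typing.Any placeholder above, and calling it raises.)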
-            self.client = None
-
-    def make_uri(self, path: PurePathy) -> str:
-        return str(path)
-
-    def create_bucket(self, path: PurePathy) -> Bucket:
-        assert self.client is not None, _MISSING_DEPS
-        return self.client.create_bucket(path.root)
-
-    def delete_bucket(self, path: PurePathy) -> None:
-        assert self.client is not None, _MISSING_DEPS
-        bucket = self.client.get_bucket(path.root)
-        bucket.delete()
-
-    def exists(self, path: PurePathy) -> bool:
-        # Because we want all the parents of a valid blob (e.g. "directory" in
-        # "directory/foo.file") to return True, we enumerate the blobs with a prefix
-        # and compare the object names to see if they match a substring of the path
-        key_name = str(path.key)
-        try:
-            for obj in self.list_blobs(path):
-                if obj.name == key_name:
-                    return True
-                if obj.name.startswith(key_name + path._flavour.sep):
-                    return True
-        except gcs_errors.ClientError:
-            return False
-        return False
-
-    def lookup_bucket(self, path: PurePathy) -> Optional[BucketGCS]:
-        assert self.client is not None, _MISSING_DEPS
-        try:
-            native_bucket = self.client.bucket(path.root)
-            if native_bucket is not None:
-                return BucketGCS(str(path.root), bucket=native_bucket)
-        except gcs_errors.ClientError as err:
-            print(err)
-
-        return None
-
-    def get_bucket(self, path: PurePathy) -> BucketGCS:
-        assert self.client is not None, _MISSING_DEPS
-        try:
-            native_bucket = self.client.bucket(path.root)
-            if native_bucket is not None:
-                return BucketGCS(str(path.root), bucket=native_bucket)
-            raise FileNotFoundError(f"Bucket {path.root} does not exist!")
-        except gcs_errors.ClientError as e:
-            raise ClientError(message=e.message, code=e.code)
-
-    def list_buckets(
-        self, **kwargs: Dict[str, Any]
-    ) -> Generator[GCSNativeBucket, None, None]:
-        assert self.client is not None, _MISSING_DEPS
-        return self.client.list_buckets(**kwargs)  # type:ignore
-
-    def scandir(  # type:ignore[override]
-        self,
-        path: Optional[PurePathy] = None,
-        prefix: Optional[str] = None,
-        delimiter: Optional[str] = None,
-    ) -> PathyScanDir:
-        return _GCSScanDir(client=self, path=path, prefix=prefix, delimiter=delimiter)
-
-    def list_blobs(
-        self,
-        path: PurePathy,
-        prefix: Optional[str] = None,
-        delimiter: Optional[str] = None,
-        include_dirs: bool = False,
-    ) -> Generator[BlobGCS, None, None]:
-        assert self.client is not None, _MISSING_DEPS
-        continuation_token = None
-        bucket = self.lookup_bucket(path)
-        if bucket is None:
-            return
-        while True:
-            if continuation_token:
-                response = self.client.list_blobs(
-                    path.root,
-                    prefix=prefix,
-                    delimiter=delimiter,
-                    page_token=continuation_token,
-                )
-            else:
-                response = self.client.list_blobs(
-                    path.root, prefix=prefix, delimiter=delimiter
-                )
-            for page in response.pages:
-                for item in page:
-                    yield BlobGCS(
-                        bucket=bucket,
-                        owner=item.owner,
-                        name=item.name,
-                        raw=item,
-                        size=item.size,
-                        updated=item.updated.timestamp(),
-                    )
-            if response.next_page_token is None:
-                break
-            continuation_token = response.next_page_token
-
-
-class _GCSScanDir(PathyScanDir):
-    _client: BucketClientGCS
+        )
+
+
+has_gcs: bool
+try:
+    from ._gcs import BucketClientGCS
 
-    def scandir(self) -> Generator[BucketEntryGCS, None, None]:
-        assert self._client.client is not None, _MISSING_DEPS
-        continuation_token = None
-        if self._path is None or not self._path.root:
-            gcs_bucket: GCSNativeBucket
-            for gcs_bucket in self._client.client.list_buckets():
-                yield BucketEntryGCS(gcs_bucket.name, is_dir=True, raw=None)
-            return
-        sep = self._path._flavour.sep
-        bucket = self._client.lookup_bucket(self._path)
-        if bucket is None:
-            return
-        while True:
-            if continuation_token:
-                response = self._client.client.list_blobs(
-                    bucket.name,
-                    prefix=self._prefix,
-                    delimiter=sep,
-                    page_token=continuation_token,
-                )
-            else:
-                response = self._client.client.list_blobs(
-                    bucket.name, prefix=self._prefix, delimiter=sep
-                )
-            for page in response.pages:
-                for folder in list(page.prefixes):
-                    full_name = folder[:-1] if folder.endswith(sep) else folder
-                    name = full_name.split(sep)[-1]
-                    if name:
-                        yield BucketEntryGCS(name, is_dir=True, raw=None)
-                for item in page:
-                    name = item.name.split(sep)[-1]
-                    if name:
-                        yield BucketEntryGCS(
-                            name=name,
-                            is_dir=False,
-                            size=item.size,
-                            last_modified=item.updated.timestamp(),
-                            raw=item,
-                        )
-            if response.next_page_token is None:
-                break
-            continuation_token = response.next_page_token
+    has_gcs = bool(BucketClientGCS)
+except ImportError:
+    register_client("gs", GCSMissingClient)
+    has_gcs = False
 
-register_client("gs", BucketClientGCS)
\ No newline at end of file
+__all__ = ("has_gcs",)
diff --git a/tools/clean.sh b/tools/clean.sh
new file mode 100644
index 0000000..3c1a219
--- /dev/null
+++ b/tools/clean.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+set -e
+echo "Removing build files..."
+rm -rf dist/ build/ pathy.egg-info htmlcov
diff --git a/tools/lint.sh b/tools/lint.sh
index 4556ab6..c0dc817 100644
--- a/tools/lint.sh
+++ b/tools/lint.sh
@@ -4,7 +4,7 @@ set -e
 . .env/bin/activate
 
 echo "========================= mypy"
-mypy pathy
+mypy pathy --strict-equality --disallow-untyped-calls --disallow-untyped-defs
 echo "========================= flake8"
 flake8 pathy
 echo "========================= black"
diff --git a/tools/test.sh b/tools/test.sh
index 2c81278..479ec9b 100644
--- a/tools/test.sh
+++ b/tools/test.sh
@@ -3,5 +3,5 @@ set -e
 echo "Activating virtualenv... (if this fails you may need to run setup.sh first)"
 . .env/bin/activate
 echo "Running tests..."
-pytest pathy/tests --cov=pathy
+pytest pathy/_tests --cov=pathy