diff --git a/cloudpathlib/gs/gsclient.py b/cloudpathlib/gs/gsclient.py
index 25c47055..ee0d517e 100644
--- a/cloudpathlib/gs/gsclient.py
+++ b/cloudpathlib/gs/gsclient.py
@@ -1,8 +1,20 @@
+import dataclasses
 from datetime import datetime
 import mimetypes
 import os
 from pathlib import Path, PurePosixPath
-from typing import Any, Callable, Dict, Iterable, Optional, TYPE_CHECKING, Tuple, Union
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Optional,
+    TYPE_CHECKING,
+    Tuple,
+    Union,
+    MutableMapping,
+)
+from weakref import WeakKeyDictionary

 from ..client import Client, register_client_class
 from ..cloudpath import implementation_registry
@@ -13,13 +25,18 @@
     from google.auth.credentials import Credentials
     from google.auth.exceptions import DefaultCredentialsError
-    from google.cloud.storage import Client as StorageClient
+    from google.cloud.storage import Client as StorageClient, Bucket
 except ModuleNotFoundError:
     implementation_registry["gs"].dependencies_loaded = False


+@dataclasses.dataclass
+class PathMetadata:
+    is_file_or_dir: Optional[str]
+
+
 @register_client_class("gs")
 class GSClient(Client):
     """Client class for Google Cloud Storage which handles authentication with GCP for
@@ -85,15 +102,17 @@ def __init__(
         except DefaultCredentialsError:
             self.client = StorageClient.create_anonymous_client()

+        self._metadata_cache: MutableMapping[GSPath, PathMetadata] = WeakKeyDictionary()
+
         super().__init__(local_cache_dir=local_cache_dir, content_type_method=content_type_method)

     def _get_metadata(self, cloud_path: GSPath) -> Optional[Dict[str, Any]]:
-        bucket = self.client.bucket(cloud_path.bucket)
+        bucket: Bucket = self.client.bucket(cloud_path.bucket)
         blob = bucket.get_blob(cloud_path.blob)

         if blob is None:
             return None
         else:
+            self._set_metadata_cache(cloud_path, "file")
             return {
                 "etag": blob.etag,
                 "size": blob.size,
@@ -102,23 +121,25 @@ def _get_metadata(self, cloud_path: GSPath) -> Optional[Dict[str, Any]]:
         }

     def _download_file(self, cloud_path: GSPath, local_path: Union[str, os.PathLike]) -> Path:
-        bucket = self.client.bucket(cloud_path.bucket)
+        bucket: Bucket = self.client.bucket(cloud_path.bucket)
         blob = bucket.get_blob(cloud_path.blob)

-        local_path = Path(local_path)
+        self._set_metadata_cache(cloud_path, "file")

         blob.download_to_filename(local_path)
-        return local_path
+        return Path(local_path)

     def _is_file_or_dir(self, cloud_path: GSPath) -> Optional[str]:
         # short-circuit the root-level bucket
         if not cloud_path.blob:
             return "dir"

+        if cloud_path in self._metadata_cache:
+            return self._metadata_cache[cloud_path].is_file_or_dir
+
-        bucket = self.client.bucket(cloud_path.bucket)
+        bucket: Bucket = self.client.bucket(cloud_path.bucket)
         blob = bucket.get_blob(cloud_path.blob)

         if blob is not None:
+            self._set_metadata_cache(cloud_path, "file")
             return "file"
         else:
             prefix = cloud_path.blob
@@ -130,15 +151,17 @@ def _is_file_or_dir(self, cloud_path: GSPath) -> Optional[str]:

             # at least one key with the prefix of the directory
             if bool(list(f)):
+                self._set_metadata_cache(cloud_path, "dir")
                 return "dir"
             else:
+                self._set_metadata_cache(cloud_path, None)
                 return None

     def _exists(self, cloud_path: GSPath) -> bool:
-        return self._is_file_or_dir(cloud_path) in ["file", "dir"]
+        return self._is_file_or_dir(cloud_path) is not None

     def _list_dir(self, cloud_path: GSPath, recursive=False) -> Iterable[Tuple[GSPath, bool]]:
-        bucket = self.client.bucket(cloud_path.bucket)
+        bucket: Bucket = self.client.bucket(cloud_path.bucket)
         prefix = cloud_path.blob
         if prefix and not prefix.endswith("/"):
@@ -154,13 +177,15 @@ def _list_dir(self, cloud_path: GSPath, recursive=False) -> Iterable[Tuple[GSPath, bool]]:

                 # if we haven't surfaced this directory already
                 if parent not in yielded_dirs and str(parent) != ".":
-
                     # skip if not recursive and this is beyond our depth
                     if not recursive and "/" in str(parent):
                         continue

+                    path = self.CloudPath(f"gs://{cloud_path.bucket}/{prefix}{parent}")
+                    self._set_metadata_cache(path, "dir")
+
                     yield (
-                        self.CloudPath(f"gs://{cloud_path.bucket}/{prefix}{parent}"),
+                        path,
                         True,  # is a directory
                     )
                     yielded_dirs.add(parent)
@@ -169,12 +194,15 @@ def _list_dir(self, cloud_path: GSPath, recursive=False) -> Iterable[Tuple[GSPath, bool]]:
             if not recursive and "/" in o.name[len(prefix) :]:
                 continue

-            yield (self.CloudPath(f"gs://{cloud_path.bucket}/{o.name}"), False)  # is a file
+            path = self.CloudPath(f"gs://{cloud_path.bucket}/{o.name}")
+            self._set_metadata_cache(path, "file")
+
+            yield path, False  # is a file

     def _move_file(self, src: GSPath, dst: GSPath, remove_src: bool = True) -> GSPath:
         # just a touch, so "REPLACE" metadata
         if src == dst:
-            bucket = self.client.bucket(src.bucket)
+            bucket: Bucket = self.client.bucket(src.bucket)
             blob = bucket.get_blob(src.blob)

             # See https://github.com/googleapis/google-cloud-python/issues/1185#issuecomment-431537214
@@ -185,13 +213,15 @@ def _move_file(self, src: GSPath, dst: GSPath, remove_src: bool = True) -> GSPath:
             blob.patch()

         else:
-            src_bucket = self.client.bucket(src.bucket)
-            dst_bucket = self.client.bucket(dst.bucket)
+            src_bucket: Bucket = self.client.bucket(src.bucket)
+            dst_bucket: Bucket = self.client.bucket(dst.bucket)

             src_blob = src_bucket.get_blob(src.blob)
             src_bucket.copy_blob(src_blob, dst_bucket, dst.blob)
+            self._set_metadata_cache(dst, "file")

             if remove_src:
+                self._set_metadata_cache(src, None)
                 src_blob.delete()

         return dst
@@ -199,22 +229,25 @@ def _move_file(self, src: GSPath, dst: GSPath, remove_src: bool = True) -> GSPath:
     def _remove(self, cloud_path: GSPath, missing_ok: bool = True) -> None:
         file_or_dir = self._is_file_or_dir(cloud_path)
         if file_or_dir == "dir":
-            blobs = [
-                b.blob for b, is_dir in self._list_dir(cloud_path, recursive=True) if not is_dir
+            files = [
+                path for path, is_dir in self._list_dir(cloud_path, recursive=True) if not is_dir
             ]
-            bucket = self.client.bucket(cloud_path.bucket)
-            for blob in blobs:
-                bucket.get_blob(blob).delete()
+            bucket: Bucket = self.client.bucket(cloud_path.bucket)
+            for path in files:
+                bucket.get_blob(path.blob).delete()
+                self._set_metadata_cache(path, None)
+
+            self._set_metadata_cache(cloud_path, None)
         elif file_or_dir == "file":
             bucket = self.client.bucket(cloud_path.bucket)
             bucket.get_blob(cloud_path.blob).delete()
+            self._set_metadata_cache(cloud_path, None)
         else:
             # Does not exist
             if not missing_ok:
                 raise FileNotFoundError(f"File does not exist: {cloud_path}")

     def _upload_file(self, local_path: Union[str, os.PathLike], cloud_path: GSPath) -> GSPath:
-        bucket = self.client.bucket(cloud_path.bucket)
+        bucket: Bucket = self.client.bucket(cloud_path.bucket)
         blob = bucket.blob(cloud_path.blob)

         extra_args = {}
@@ -223,7 +256,23 @@ def _upload_file(self, local_path: Union[str, os.PathLike], cloud_path: GSPath) -> GSPath:
             extra_args["content_type"] = content_type

         blob.upload_from_filename(str(local_path), **extra_args)
+        self._set_metadata_cache(cloud_path, "file")
         return cloud_path

+    def _set_metadata_cache(self, cloud_path: GSPath, is_file_or_dir: Optional[str]) -> None:
+        if is_file_or_dir is None:
+            self._metadata_cache[cloud_path] = PathMetadata(is_file_or_dir=is_file_or_dir)
+            # If a file/dir is now known to not exist, its parent directories may no longer exist
+            # either, since cloud directories only exist if they have a file in them. Since their
+            # state is no longer known we remove them from the cache.
+            for parent in cloud_path.parents:
+                if parent in self._metadata_cache:
+                    del self._metadata_cache[parent]
+        else:
+            self._metadata_cache[cloud_path] = PathMetadata(is_file_or_dir=is_file_or_dir)
+
+    def clear_metadata_cache(self) -> None:
+        self._metadata_cache.clear()
+

 GSClient.GSPath = GSClient.CloudPath  # type: ignore
diff --git a/cloudpathlib/local/implementations/gs.py b/cloudpathlib/local/implementations/gs.py
index d121bff2..c7ab0f73 100644
--- a/cloudpathlib/local/implementations/gs.py
+++ b/cloudpathlib/local/implementations/gs.py
@@ -14,6 +14,9 @@ class LocalGSClient(LocalClient):

     _cloud_meta = local_gs_implementation

+    def clear_metadata_cache(self):
+        pass
+

 LocalGSClient.GSPath = LocalGSClient.CloudPath  # type: ignore
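
For reviewers, a minimal usage sketch of the caching behavior from the caller's side. The bucket and object names are hypothetical; the calls themselves are the public cloudpathlib API.

    from cloudpathlib import GSClient

    client = GSClient()
    path = client.GSPath("gs://my-bucket/data/file.txt")  # hypothetical bucket/key

    path.exists()   # first check hits GCS (get_blob / list_blobs) and populates the cache
    path.is_file()  # answered from the WeakKeyDictionary cache; no second round trip

    path.unlink()   # removal caches None for this path and evicts cached parent dirs,
                    # since a cloud "directory" may vanish with its last file

    client.clear_metadata_cache()  # e.g. after another process modifies the bucket

Because the cache is a WeakKeyDictionary keyed by GSPath, entries are dropped automatically once the keying path object is garbage collected, so the cache cannot pin memory for paths the program no longer holds.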