diff --git a/README.md b/README.md
index 4ba33ad..1c921a9 100644
--- a/README.md
+++ b/README.md
@@ -82,7 +82,7 @@ print(fluid_path.prefix)
## from_bucket classmethod
```python
-Pathy.from_bucket(bucket_name: str) -> ~PathType
+Pathy.from_bucket(bucket_name: str) -> 'Pathy'
```
Initialize a Pathy from a bucket name. This helper adds a trailing slash and
@@ -97,9 +97,9 @@ assert str(Pathy.from_bucket("two")) == "gs://two/"
```python
Pathy.glob(
- self: ~PathType,
+ self: 'Pathy',
pattern: str,
-) -> Generator[~PathType, NoneType, NoneType]
+) -> Generator[Pathy, NoneType, NoneType]
```
Perform a glob match relative to this Pathy instance, yielding all matched
@@ -108,7 +108,7 @@ blobs.
## is_dir method
```python
-Pathy.is_dir(self: ~PathType) -> bool
+Pathy.is_dir(self: 'Pathy') -> bool
```
Determine if the path points to a bucket or a prefix of a given blob
@@ -120,7 +120,7 @@ Returns False if it points to a blob or the path doesn't exist.
## is_file method
```python
-Pathy.is_file(self: ~PathType) -> bool
+Pathy.is_file(self: 'Pathy') -> bool
```
Determine if the path points to a blob in the bucket.
@@ -132,7 +132,9 @@ exist.
## iterdir method
```python
-Pathy.iterdir(self: ~PathType) -> Generator[~PathType, NoneType, NoneType]
+Pathy.iterdir(
+ self: 'Pathy',
+) -> Generator[Pathy, NoneType, NoneType]
```
Iterate over the blobs found in the given bucket or blob prefix path.
@@ -164,7 +166,7 @@ Raises FileExistsError if exist_ok is false and the bucket already exists.
```python
Pathy.open(
- self: ~PathType,
+ self: 'Pathy',
mode: str = 'r',
buffering: int = 8192,
encoding: Optional[str] = None,
@@ -204,7 +206,7 @@ to match the target prefix.
## replace method
```python
-Pathy.replace(self: ~PathType, target: Union[str, pathlib.PurePath]) -> None
+Pathy.replace(self: 'Pathy', target: Union[str, pathlib.PurePath]) -> None
```
Renames this path to the given target.
@@ -228,9 +230,9 @@ assert path.resolve() == Pathy("gs://my_bucket/blob")
```python
Pathy.rglob(
- self: ~PathType,
+ self: 'Pathy',
pattern: str,
-) -> Generator[~PathType, NoneType, NoneType]
+) -> Generator[Pathy, NoneType, NoneType]
```
Perform a recursive glob match relative to this Pathy instance, yielding
@@ -258,7 +260,7 @@ Determine if this path points to the same location as other_path.
## stat method
```python
-Pathy.stat(self: ~PathType) -> pathy.base.BlobStat
+Pathy.stat(self: 'Pathy') -> pathy.base.BlobStat
```
Returns information about this bucket path.
@@ -267,7 +269,7 @@ Returns information about this bucket path.
```python
Pathy.to_local(
- blob_path: Union[~PathType, str],
+ blob_path: Union[Pathy, str],
recurse: bool = True,
) -> pathlib.Path
```
@@ -280,7 +282,7 @@ as their updated timestamps change.
## touch method
```python
-Pathy.touch(self: ~PathType, mode: int = 438, exist_ok: bool = True) -> None
+Pathy.touch(self: 'Pathy', mode: int = 438, exist_ok: bool = True) -> None
```
Create a blob at this path.
diff --git a/pathy/base.py b/pathy/base.py
index 65e9ea6..73d5450 100644
--- a/pathy/base.py
+++ b/pathy/base.py
@@ -22,7 +22,7 @@
import smart_open
SUBCLASS_ERROR = "must be implemented in a subclass"
-PathType = TypeVar("PathType", bound="Pathy")
+
StreamableType = IO[Any]
FluidPath = Union["Pathy", Path]
BucketClientType = TypeVar("BucketClientType", bound="BucketClient")
@@ -133,7 +133,7 @@ class BucketClient:
def open(
self,
- path: PathType,
+ path: "Pathy",
*,
mode: str = "r",
buffering: int = DEFAULT_BUFFER_SIZE,
@@ -152,22 +152,22 @@ def open(
ignore_ext=True,
) # type:ignore
- def make_uri(self, path: PathType) -> str:
+ def make_uri(self, path: "Pathy") -> str:
return path.as_uri()
- def is_dir(self, path: PathType) -> bool:
+ def is_dir(self, path: "Pathy") -> bool:
return any(self.list_blobs(path, prefix=path.prefix))
- def rmdir(self, path: PathType) -> None:
+ def rmdir(self, path: "Pathy") -> None:
return None
- def exists(self, path: PathType) -> bool:
+ def exists(self, path: "Pathy") -> bool:
raise NotImplementedError(SUBCLASS_ERROR)
- def lookup_bucket(self, path: PathType) -> Optional[Bucket]:
+ def lookup_bucket(self, path: "Pathy") -> Optional[Bucket]:
raise NotImplementedError(SUBCLASS_ERROR)
- def get_bucket(self, path: PathType) -> Bucket:
+ def get_bucket(self, path: "Pathy") -> Bucket:
raise NotImplementedError(SUBCLASS_ERROR)
def list_buckets(self) -> Generator[Bucket, None, None]:
@@ -175,7 +175,7 @@ def list_buckets(self) -> Generator[Bucket, None, None]:
def list_blobs(
self,
- path: PathType,
+ path: "Pathy",
prefix: Optional[str] = None,
delimiter: Optional[str] = None,
include_dirs: bool = False,
@@ -184,16 +184,16 @@ def list_blobs(
def scandir(
self,
- path: PathType = None,
+ path: "Pathy" = None,
prefix: Optional[str] = None,
delimiter: Optional[str] = None,
) -> Generator[BucketEntry[BucketType, BucketBlobType], None, None]:
raise NotImplementedError(SUBCLASS_ERROR)
- def create_bucket(self, path: PathType) -> Bucket:
+ def create_bucket(self, path: "Pathy") -> Bucket:
raise NotImplementedError(SUBCLASS_ERROR)
- def delete_bucket(self, path: PathType) -> None:
+ def delete_bucket(self, path: "Pathy") -> None:
raise NotImplementedError(SUBCLASS_ERROR)
@@ -215,7 +215,7 @@ def parse_parts(self, parts: List[str]) -> Tuple[str, str, List[str]]:
parsed.remove(part)
return drv, root, parsed
- def make_uri(self, path: PathType) -> str:
+ def make_uri(self, path: "Pathy") -> str:
uri = super().make_uri(path)
return uri.replace("file:///", "gs://")
@@ -295,13 +295,13 @@ class BucketsAccessor(_Accessor):
_client: Optional[BucketClient]
- def client(self, path: PathType) -> BucketClient:
+ def client(self, path: "Pathy") -> BucketClient:
# lazily avoid circular imports
from .clients import get_client
return get_client(path.scheme)
- def get_blob(self, path: PathType) -> Optional[Blob]:
+ def get_blob(self, path: "Pathy") -> Optional[Blob]:
"""Get the blob associated with a path or return None"""
if not path.root:
return None
@@ -311,7 +311,7 @@ def get_blob(self, path: PathType) -> Optional[Blob]:
key_name = str(path.key)
return bucket.get_blob(key_name)
- def unlink(self, path: PathType) -> None:
+ def unlink(self, path: "Pathy") -> None:
"""Delete a link to a blob in a bucket."""
bucket = self.client(path).get_bucket(path)
blob: Optional[Blob] = bucket.get_blob(str(path.key))
@@ -319,21 +319,21 @@ def unlink(self, path: PathType) -> None:
raise FileNotFoundError(path)
blob.delete()
- def stat(self, path: PathType) -> BlobStat:
+ def stat(self, path: "Pathy") -> BlobStat:
bucket = self.client(path).get_bucket(path)
blob: Optional[Blob] = bucket.get_blob(str(path.key))
if blob is None:
raise FileNotFoundError(path)
return BlobStat(size=blob.size, last_modified=blob.updated)
- def is_dir(self, path: PathType) -> bool:
+ def is_dir(self, path: "Pathy") -> bool:
if str(path) == path.root:
return True
if self.get_blob(path) is not None:
return False
return self.client(path).is_dir(path)
- def exists(self, path: PathType) -> bool:
+ def exists(self, path: "Pathy") -> bool:
client = self.client(path)
if not path.root:
return any(client.list_buckets())
@@ -353,16 +353,16 @@ def exists(self, path: PathType) -> bool:
# Determine if the path exists according to the current adapter
return client.exists(path)
- def scandir(self, path: PathType) -> Generator[BucketEntry, None, None]:
+ def scandir(self, path: "Pathy") -> Generator[BucketEntry, None, None]:
return self.client(path).scandir(path, prefix=path.prefix)
- def listdir(self, path: PathType) -> Generator[str, None, None]:
+ def listdir(self, path: "Pathy") -> Generator[str, None, None]:
for entry in self.scandir(path):
yield entry.name
def open(
self,
- path: PathType,
+ path: "Pathy",
*,
mode: str = "r",
buffering: int = -1,
@@ -379,15 +379,15 @@ def open(
newline=newline,
)
- def owner(self, path: PathType) -> Optional[str]:
+ def owner(self, path: "Pathy") -> Optional[str]:
blob: Optional[Blob] = self.get_blob(path)
return blob.owner if blob is not None else None
- def resolve(self, path: PathType, strict: bool = False) -> "Pathy":
+ def resolve(self, path: "Pathy", strict: bool = False) -> "Pathy":
path_parts = str(path).replace(path.drive, "")
return Pathy(f"{path.drive}{os.path.abspath(path_parts)}")
- def rename(self, path: PathType, target: PathType) -> None:
+ def rename(self, path: "Pathy", target: "Pathy") -> None:
client: BucketClient = self.client(path)
bucket: Bucket = client.get_bucket(path)
target_bucket: Bucket = client.get_bucket(target)
@@ -412,10 +412,10 @@ def rename(self, path: PathType, target: PathType) -> None:
for blob in blobs:
bucket.delete_blob(blob)
- def replace(self, path: PathType, target: PathType) -> None:
+ def replace(self, path: "Pathy", target: "Pathy") -> None:
return self.rename(path, target)
- def rmdir(self, path: PathType) -> None:
+ def rmdir(self, path: "Pathy") -> None:
client: BucketClient = self.client(path)
key_name = str(path.key) if path.key is not None else None
bucket: Bucket = client.get_bucket(path)
@@ -427,7 +427,7 @@ def rmdir(self, path: PathType) -> None:
elif client.is_dir(path):
client.rmdir(path)
- def mkdir(self, path: PathType, mode: int) -> None:
+ def mkdir(self, path: "Pathy", mode: int) -> None:
client: BucketClient = self.client(path)
bucket: Optional[Bucket] = client.lookup_bucket(path)
if bucket is None or not bucket.exists():
@@ -447,10 +447,10 @@ class Pathy(Path, PurePathy):
_default_accessor = BucketsAccessor()
_NOT_SUPPORTED_MESSAGE = "{method} is an unsupported bucket operation"
- def __truediv__(self, key: Union[str, Path, "Pathy", PurePathy]) -> PathType: # type: ignore[override]
+ def __truediv__(self, key: Union[str, Path, "Pathy", PurePathy]) -> "Pathy": # type: ignore[override]
return super().__truediv__(key) # type:ignore
- def __rtruediv__(self, key: Union[str, Path, "Pathy", PurePathy]) -> PathType: # type: ignore[override]
+ def __rtruediv__(self, key: Union[str, Path, "Pathy", PurePathy]) -> "Pathy": # type: ignore[override]
return super().__rtruediv__(key) # type:ignore
def _init(self: "Pathy", template: Optional[Any] = None) -> None:
@@ -485,7 +485,7 @@ def fluid(cls, path_candidate: Union[str, FluidPath]) -> FluidPath:
return from_path
@classmethod
- def from_bucket(cls, bucket_name: str) -> PathType:
+ def from_bucket(cls, bucket_name: str) -> "Pathy":
"""Initialize a Pathy from a bucket name. This helper adds a trailing slash and
the appropriate prefix.
@@ -497,7 +497,7 @@ def from_bucket(cls, bucket_name: str) -> PathType:
return Pathy(f"gs://{bucket_name}/") # type:ignore
@classmethod
- def to_local(cls, blob_path: Union[PathType, str], recurse: bool = True) -> Path:
+ def to_local(cls, blob_path: Union["Pathy", str], recurse: bool = True) -> Path:
"""Download and cache either a blob or a set of blobs matching a prefix.
The cache is sensitive to the file updated time, and downloads new blobs
@@ -553,7 +553,7 @@ def to_local(cls, blob_path: Union[PathType, str], recurse: bool = True) -> Path
Pathy.to_local(blob, recurse=False)
return cache_blob
- def stat(self: PathType) -> BlobStat: # type: ignore[override]
+ def stat(self: "Pathy") -> BlobStat: # type: ignore[override]
"""Returns information about this bucket path."""
self._absolute_path_validation()
if not self.key:
@@ -565,7 +565,7 @@ def exists(self) -> bool:
self._absolute_path_validation()
return self._accessor.exists(self)
- def is_dir(self: PathType) -> bool:
+ def is_dir(self: "Pathy") -> bool:
"""Determine if the path points to a bucket or a prefix of a given blob
in the bucket.
@@ -577,7 +577,7 @@ def is_dir(self: PathType) -> bool:
return True
return self._accessor.is_dir(self)
- def is_file(self: PathType) -> bool:
+ def is_file(self: "Pathy") -> bool:
"""Determine if the path points to a blob in the bucket.
Returns True if the path points to a blob.
@@ -592,23 +592,23 @@ def is_file(self: PathType) -> bool:
except (ClientError, FileNotFoundError):
return False
- def iterdir(self: PathType) -> Generator[PathType, None, None]:
+ def iterdir(self: "Pathy") -> Generator["Pathy", None, None]:
"""Iterate over the blobs found in the given bucket or blob prefix path."""
self._absolute_path_validation()
yield from super().iterdir() # type:ignore
- def glob(self: PathType, pattern: str) -> Generator[PathType, None, None]:
+ def glob(self: "Pathy", pattern: str) -> Generator["Pathy", None, None]:
"""Perform a glob match relative to this Pathy instance, yielding all matched
blobs."""
yield from super().glob(pattern) # type:ignore
- def rglob(self: PathType, pattern: str) -> Generator[PathType, None, None]:
+ def rglob(self: "Pathy", pattern: str) -> Generator["Pathy", None, None]:
"""Perform a recursive glob match relative to this Pathy instance, yielding
all matched blobs. Imagine adding "**/" before a call to glob."""
yield from super().rglob(pattern) # type:ignore
def open(
- self: PathType,
+ self: "Pathy",
mode: str = "r",
buffering: int = DEFAULT_BUFFER_SIZE,
encoding: Optional[str] = None,
@@ -677,7 +677,7 @@ def rename(self: "Pathy", target: Union[str, PurePath]) -> None:
target._absolute_path_validation() # type:ignore
super().rename(target)
- def replace(self: PathType, target: Union[str, PurePath]) -> None:
+ def replace(self: "Pathy", target: Union[str, PurePath]) -> None:
"""Renames this path to the given target.
If target points to an existing path, it will be replaced."""
@@ -702,7 +702,7 @@ def samefile(self: "Pathy", other_path: Union[str, bytes, int, Path]) -> bool:
self.bucket == other_path.bucket and self.key == self.key and self.is_file()
)
- def touch(self: PathType, mode: int = 0o666, exist_ok: bool = True) -> None:
+ def touch(self: "Pathy", mode: int = 0o666, exist_ok: bool = True) -> None:
"""Create a blob at this path.
If the blob already exists, the function succeeds if exist_ok is true
@@ -739,61 +739,61 @@ def mkdir(
if not exist_ok:
raise
- def is_mount(self: PathType) -> bool:
+ def is_mount(self: "Pathy") -> bool:
return False
- def is_symlink(self: PathType) -> bool:
+ def is_symlink(self: "Pathy") -> bool:
return False
- def is_socket(self: PathType) -> bool:
+ def is_socket(self: "Pathy") -> bool:
return False
- def is_fifo(self: PathType) -> bool:
+ def is_fifo(self: "Pathy") -> bool:
return False
# Unsupported operations below here
@classmethod
- def cwd(cls) -> PathType:
+ def cwd(cls) -> "Pathy":
message = cls._NOT_SUPPORTED_MESSAGE.format(method=cls.cwd.__qualname__)
raise NotImplementedError(message)
@classmethod
- def home(cls) -> PathType:
+ def home(cls) -> "Pathy":
message = cls._NOT_SUPPORTED_MESSAGE.format(method=cls.home.__qualname__)
raise NotImplementedError(message)
- def chmod(self: PathType, mode: int) -> None:
+ def chmod(self: "Pathy", mode: int) -> None:
message = self._NOT_SUPPORTED_MESSAGE.format(method=self.chmod.__qualname__)
raise NotImplementedError(message)
- def expanduser(self: PathType) -> PathType:
+ def expanduser(self: "Pathy") -> "Pathy":
message = self._NOT_SUPPORTED_MESSAGE.format(
method=self.expanduser.__qualname__
)
raise NotImplementedError(message)
- def lchmod(self: PathType, mode: int) -> None:
+ def lchmod(self: "Pathy", mode: int) -> None:
message = self._NOT_SUPPORTED_MESSAGE.format(method=self.lchmod.__qualname__)
raise NotImplementedError(message)
- def group(self: PathType) -> str:
+ def group(self: "Pathy") -> str:
message = self._NOT_SUPPORTED_MESSAGE.format(method=self.group.__qualname__)
raise NotImplementedError(message)
- def is_block_device(self: PathType) -> bool:
+ def is_block_device(self: "Pathy") -> bool:
message = self._NOT_SUPPORTED_MESSAGE.format(
method=self.is_block_device.__qualname__
)
raise NotImplementedError(message)
- def is_char_device(self: PathType) -> bool:
+ def is_char_device(self: "Pathy") -> bool:
message = self._NOT_SUPPORTED_MESSAGE.format(
method=self.is_char_device.__qualname__
)
raise NotImplementedError(message)
- def lstat(self: PathType) -> os.stat_result:
+ def lstat(self: "Pathy") -> os.stat_result:
message = self._NOT_SUPPORTED_MESSAGE.format(method=self.lstat.__qualname__)
raise NotImplementedError(message)