From 1078e95bc4edc714f917b2c07a86cd07aff4e563 Mon Sep 17 00:00:00 2001
From: justindujardin
Date: Fri, 23 Apr 2021 13:24:52 -0700
Subject: [PATCH 01/10] feat(clients): support for "s3://bucket/blobs"

- install with `pip install pathy[s3]` extras
- update tests to substitute the scheme where needed instead of
  hardcoded "gs"
---
 README.md                    |   2 +-
 pathy/__init__.py            |   7 +-
 pathy/_tests/__init__.py     |   8 ++
 pathy/_tests/conftest.py     |  24 +++-
 pathy/_tests/test_cli.py     |  46 +++----
 pathy/_tests/test_clients.py |   2 +-
 pathy/_tests/test_pathy.py   | 189 +++++++++++++++------
 pathy/_tests/test_s3.py      |  14 ++
 pathy/s3.py                  | 246 +++++++++++++++++++++++++++++++++++
 setup.py                     |   1 +
 10 files changed, 429 insertions(+), 110 deletions(-)
 create mode 100644 pathy/_tests/test_s3.py
 create mode 100644 pathy/s3.py

diff --git a/README.md b/README.md
index 48fe30f..2bc0593 100644
--- a/README.md
+++ b/README.md
@@ -109,7 +109,7 @@ assert fluid_path.prefix == "foo.txt/"
 ## from_bucket classmethod
 
 ```python (doc)
-Pathy.from_bucket(bucket_name: str) -> 'Pathy'
+Pathy.from_bucket(bucket_name: str, scheme: str = 'gs') -> 'Pathy'
 ```
 
 Initialize a Pathy from a bucket name. This helper adds a trailing slash and
diff --git a/pathy/__init__.py b/pathy/__init__.py
index 72f7562..fa107d9 100644
--- a/pathy/__init__.py
+++ b/pathy/__init__.py
@@ -366,7 +366,7 @@ def stat(self, path: "Pathy") -> BlobStat:
         blob: Optional[Blob] = bucket.get_blob(str(path.key))
         if blob is None:
             raise FileNotFoundError(path)
-        return BlobStat(name=str(blob), size=blob.size, last_modified=blob.updated)
+        return BlobStat(name=str(blob.name), size=blob.size, last_modified=blob.updated)
 
     def is_dir(self, path: "Pathy") -> bool:
         if self.get_blob(path) is not None:
@@ -516,7 +516,7 @@ def fluid(cls, path_candidate: Union[str, FluidPath]) -> FluidPath:
         return from_path
 
     @classmethod
-    def from_bucket(cls, bucket_name: str) -> "Pathy":
+    def from_bucket(cls, bucket_name: str, scheme:str = "gs") -> "Pathy":
        """Initialize a Pathy from a bucket name. This helper adds a trailing slash and
        the appropriate prefix.
@@ -527,7 +527,7 @@ def from_bucket(cls, bucket_name: str) -> "Pathy":
         assert str(Pathy.from_bucket("two")) == "gs://two/"
         ```
         """
-        return Pathy(f"gs://{bucket_name}/")  # type:ignore
+        return Pathy(f"{scheme}://{bucket_name}/")  # type:ignore
 
     @classmethod
     def to_local(cls, blob_path: Union["Pathy", str], recurse: bool = True) -> Path:
@@ -1159,6 +1159,7 @@ def scandir(self) -> Generator[BucketEntry, None, None]:
 # a Pathy object with a matching scheme
 _optional_clients: Dict[str, str] = {
     "gs": "pathy.gcs",
+    "s3": "pathy.s3",
 }
 
 BucketClientType = TypeVar("BucketClientType", bound=BucketClient)
diff --git a/pathy/_tests/__init__.py b/pathy/_tests/__init__.py
index 75f4351..a0050b5 100644
--- a/pathy/_tests/__init__.py
+++ b/pathy/_tests/__init__.py
@@ -5,3 +5,11 @@
     has_gcs = bool(BucketClientGCS)
 except ImportError:
     has_gcs = False
+
+has_s3: bool
+try:
+    from ..s3 import BucketClientS3
+
+    has_s3 = bool(BucketClientS3)
+except ImportError:
+    has_s3 = False
diff --git a/pathy/_tests/conftest.py b/pathy/_tests/conftest.py
index fa57711..1dc6592 100644
--- a/pathy/_tests/conftest.py
+++ b/pathy/_tests/conftest.py
@@ -4,7 +4,7 @@
 import sys
 import tempfile
 from pathlib import Path
-from typing import Any, Generator, Optional
+from typing import Any, Generator, Optional, Tuple
 
 import pytest
 
@@ -15,7 +15,7 @@
 has_credentials = "GCS_CREDENTIALS" in os.environ
 
 # Which adapters to use
-TEST_ADAPTERS = ["gcs", "fs"] if has_credentials and has_gcs else ["fs"]
+TEST_ADAPTERS = ["gcs", "s3", "fs"] if has_credentials and has_gcs else ["s3"]
 # A unique identifier used to allow each python version and OS to test
 # with separate bucket paths. This makes it possible to parallelize the
 # tests.
@@ -82,6 +82,18 @@ def gcs_credentials_from_env() -> Optional[Any]:
     return credentials
 
 
+def s3_credentials_from_env() -> Optional[Tuple[str, str]]:
+    """Extract an access key ID and Secret from the environment."""
+    if not has_s3:
+        return None
+
+    access_key_id: Optional[str] = os.environ.get("PATHY_S3_ACCESS_ID", None)
+    access_secret: Optional[str] = os.environ.get("PATHY_S3_ACCESS_SECRET", None)
+    if access_key_id is None or access_secret is None:
+        return None
+    return (access_key_id, access_secret)
+
+
 @pytest.fixture()
 def with_adapter(
     adapter: str, bucket: str, other_bucket: str
@@ -94,6 +106,14 @@
         credentials = gcs_credentials_from_env()
         if credentials is not None:
             set_client_params("gs", credentials=credentials)
+    elif adapter == "s3":
+        scheme = "s3"
+        # Use boto3
+        use_fs(False)
+        credentials = s3_credentials_from_env()
+        if credentials is not None:
+            # key_id, key_secret = credentials
+            set_client_params("s3", credentials=credentials)
     elif adapter == "fs":
         # Use local file-system in a temp folder
         tmp_dir = tempfile.mkdtemp()
diff --git a/pathy/_tests/test_cli.py b/pathy/_tests/test_cli.py
index 8abc452..04467f2 100644
--- a/pathy/_tests/test_cli.py
+++ b/pathy/_tests/test_cli.py
@@ -17,16 +17,16 @@
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_cli_cp_invalid_from_path(with_adapter: str, bucket: str) -> None:
-    source = f"gs://{bucket}/{ENV_ID}/cli_cp_file_invalid/file.txt"
-    destination = f"gs://{bucket}/{ENV_ID}/cli_cp_file_invalid/dest.txt"
+    source = f"{with_adapter}://{bucket}/{ENV_ID}/cli_cp_file_invalid/file.txt"
+    destination = f"{with_adapter}://{bucket}/{ENV_ID}/cli_cp_file_invalid/dest.txt"
     assert runner.invoke(app, ["cp", source, destination]).exit_code == 1
     assert not Pathy(destination).is_file()
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_cli_cp_file(with_adapter: str, bucket: str) -> None:
-    source = f"gs://{bucket}/{ENV_ID}/cli_cp_file/file.txt"
-    destination = f"gs://{bucket}/{ENV_ID}/cli_cp_file/other.txt"
+    source = f"{with_adapter}://{bucket}/{ENV_ID}/cli_cp_file/file.txt"
+    destination = f"{with_adapter}://{bucket}/{ENV_ID}/cli_cp_file/other.txt"
     Pathy(source).write_text("---")
     assert runner.invoke(app, ["cp", source, destination]).exit_code == 0
     assert Pathy(source).exists()
@@ -37,7 +37,7 @@ def test_cli_cp_file(with_adapter: str, bucket: str) -> None:
 def test_cli_cp_file_name_from_source(with_adapter: str, bucket: str) -> None:
     source = pathlib.Path("./file.txt")
     source.touch()
-    destination = f"gs://{bucket}/{ENV_ID}/cli_cp_file/"
+    destination = f"{with_adapter}://{bucket}/{ENV_ID}/cli_cp_file/"
     assert runner.invoke(app, ["cp", str(source), destination]).exit_code == 0
     assert Pathy(f"{destination}file.txt").is_file()
     source.unlink()
@@ -45,7 +45,7 @@ def test_cli_cp_file_name_from_source(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_cli_cp_folder(with_adapter: str, bucket: str) -> None:
-    root = Pathy.from_bucket(bucket) / ENV_ID
+    root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/")
     source = root / "cli_cp_folder"
     destination = root / "cli_cp_folder_other"
     for i in range(2):
@@ -61,7 +61,7 @@ def test_cli_cp_folder(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_cli_mv_folder(with_adapter: str, bucket: str) -> None:
-    root = Pathy.from_bucket(bucket) / ENV_ID
+    root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/")
     source = root / "cli_mv_folder"
     destination = root / "cli_mv_folder_other"
     for i in range(2):
@@ -84,7 +84,7 @@ def test_cli_mv_folder(with_adapter: str, bucket: str) -> None:
 def test_cli_mv_file_copy_from_name(with_adapter: str, bucket: str) -> None:
     source = pathlib.Path("./file.txt")
     source.touch()
-    destination = f"gs://{bucket}/{ENV_ID}/cli_cp_file/"
+    destination = f"{with_adapter}://{bucket}/{ENV_ID}/cli_cp_file/"
     assert runner.invoke(app, ["mv", str(source), destination]).exit_code == 0
     assert Pathy(f"{destination}file.txt").is_file()
     # unlink should happen from the operation
@@ -93,8 +93,8 @@
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_cli_mv_file(with_adapter: str, bucket: str) -> None:
-    source = f"gs://{bucket}/{ENV_ID}/cli_mv_file/file.txt"
-    destination = f"gs://{bucket}/{ENV_ID}/cli_mv_file/other.txt"
+    source = f"{with_adapter}://{bucket}/{ENV_ID}/cli_mv_file/file.txt"
+    destination = f"{with_adapter}://{bucket}/{ENV_ID}/cli_mv_file/other.txt"
     Pathy(source).write_text("---")
     assert Pathy(source).exists()
     assert runner.invoke(app, ["mv", source, destination]).exit_code == 0
@@ -106,8 +106,10 @@ def test_cli_mv_file(with_adapter: str, bucket: str) -> None:
 def test_cli_mv_file_across_buckets(
     with_adapter: str, bucket: str, other_bucket: str
 ) -> None:
-    source = f"gs://{bucket}/{ENV_ID}/cli_mv_file_across_buckets/file.txt"
-    destination = f"gs://{other_bucket}/{ENV_ID}/cli_mv_file_across_buckets/other.txt"
+    source = f"{with_adapter}://{bucket}/{ENV_ID}/cli_mv_file_across_buckets/file.txt"
+    destination = (
+        f"{with_adapter}://{other_bucket}/{ENV_ID}/cli_mv_file_across_buckets/other.txt"
+    )
     Pathy(source).write_text("---")
     assert Pathy(source).exists()
     assert runner.invoke(app, ["mv", source, destination]).exit_code == 0
@@ -119,9 +121,9 @@
 def test_cli_mv_folder_across_buckets(
     with_adapter: str, bucket: str, other_bucket: str
 ) -> None:
-    source = Pathy.from_bucket(bucket) / ENV_ID / "cli_mv_folder_across_buckets"
-    destination = (
-        Pathy.from_bucket(other_bucket) / ENV_ID / "cli_mv_folder_across_buckets"
+    source = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/cli_mv_folder_across_buckets")
+    destination = Pathy(
+        f"{with_adapter}://{other_bucket}/{ENV_ID}/cli_mv_folder_across_buckets"
     )
     for i in range(2):
         for j in range(2):
@@ -141,7 +143,7 @@ def test_cli_mv_folder_across_buckets(
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_cli_rm_invalid_file(with_adapter: str, bucket: str) -> None:
-    source = f"gs://{bucket}/{ENV_ID}/cli_rm_file_invalid/file.txt"
+    source = f"{with_adapter}://{bucket}/{ENV_ID}/cli_rm_file_invalid/file.txt"
     path = Pathy(source)
     assert not path.exists()
     assert runner.invoke(app, ["rm", source]).exit_code == 1
@@ -149,7 +151,7 @@ def test_cli_rm_invalid_file(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_cli_rm_file(with_adapter: str, bucket: str) -> None:
-    source = f"gs://{bucket}/{ENV_ID}/cli_rm_file/file.txt"
+    source = f"{with_adapter}://{bucket}/{ENV_ID}/cli_rm_file/file.txt"
     path = Pathy(source)
     path.write_text("---")
     assert path.exists()
@@ -159,7 +161,7 @@ def test_cli_rm_file(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_cli_rm_verbose(with_adapter: str, bucket: str) -> None:
-    root = Pathy.from_bucket(bucket) / ENV_ID / "cli_rm_folder"
+    root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/") / "cli_rm_folder"
     source = str(root / "file.txt")
     other = str(root / "folder/other")
     Pathy(source).write_text("---")
@@ -178,7 +180,7 @@ def test_cli_rm_verbose(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_cli_rm_folder(with_adapter: str, bucket: str) -> None:
-    root = Pathy.from_bucket(bucket) / ENV_ID
+    root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/")
     source = root / "cli_rm_folder"
     for i in range(2):
         for j in range(2):
@@ -196,7 +198,7 @@ def test_cli_rm_folder(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_cli_ls_invalid_source(with_adapter: str, bucket: str) -> None:
-    root = Pathy.from_bucket(bucket) / ENV_ID / "cli_ls_invalid"
+    root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/") / "cli_ls_invalid"
     three = str(root / "folder/file.txt")
 
     result = runner.invoke(app, ["ls", str(three)])
@@ -206,7 +208,7 @@ def test_cli_ls_invalid_source(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_cli_ls(with_adapter: str, bucket: str) -> None:
-    root = Pathy.from_bucket(bucket) / ENV_ID / "cli_ls"
+    root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/") / "cli_ls"
     one = str(root / "file.txt")
     two = str(root / "other.txt")
     three = str(root / "folder/file.txt")
@@ -241,7 +243,7 @@ def test_cli_ls_local_files(with_adapter: str, bucket: str) -> None:
         assert blob_stat.size == 4
         assert blob_stat.last_modified is not None
 
-    root = Pathy.from_bucket(bucket) / ENV_ID / "cli_ls"
+    root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/") / "cli_ls"
     one = str(root / "file.txt")
     two = str(root / "other.txt")
     three = str(root / "folder/file.txt")
diff --git a/pathy/_tests/test_clients.py b/pathy/_tests/test_clients.py
index 45f96de..8edcf69 100644
--- a/pathy/_tests/test_clients.py
+++ b/pathy/_tests/test_clients.py
@@ -73,7 +73,7 @@ def test_clients_use_fs(with_fs: Path) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_api_use_fs_cache(with_adapter: str, with_fs: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/directory/foo.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/directory/foo.txt")
     path.write_text("---")
     assert isinstance(path, Pathy)
     with pytest.raises(ValueError):
diff --git a/pathy/_tests/test_pathy.py b/pathy/_tests/test_pathy.py
index fd1bf07..88d033b 100644
--- a/pathy/_tests/test_pathy.py
+++ b/pathy/_tests/test_pathy.py
@@ -42,7 +42,7 @@ def test_pathy_fluid(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_path_to_local(with_adapter: str, bucket: str) -> None:
-    root: Pathy = Pathy.from_bucket(bucket) / ENV_ID / "to_local"
+    root: Pathy = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/to_local")
     foo_blob: Pathy = root / "foo"
     foo_blob.write_text("---")
     assert isinstance(foo_blob, Pathy)
@@ -85,16 +85,16 @@ def test_pathy_stat(with_adapter: str, bucket: str) -> None:
     path = Pathy("fake-bucket-1234-0987/fake-key")
     with pytest.raises(ValueError):
         path.stat()
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/stat/foo.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/stat/foo.txt")
     path.write_text("a-a-a-a-a-a-a")
     stat = path.stat()
     assert isinstance(stat, BlobStat)
     assert stat.size is not None and stat.size > 0
     assert stat.last_modified is not None and stat.last_modified > 0
     with pytest.raises(ValueError):
-        assert Pathy(f"gs://{bucket}").stat()
+        assert Pathy(f"{with_adapter}://{bucket}").stat()
     with pytest.raises(FileNotFoundError):
-        assert Pathy(f"gs://{bucket}/{ENV_ID}/stat/nonexistant_file.txt").stat()
+        assert Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/stat/nope.txt").stat()
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
@@ -113,12 +113,12 @@ def test_pathy_exists(with_adapter: str, bucket: str) -> None:
     # invalid bucket name
     invalid_bucket = "unknown-bucket-name-123987519875419"
-    assert Pathy(f"gs://{invalid_bucket}").exists() is False
-    assert Pathy(f"gs://{invalid_bucket}/foo.blob").exists() is False
+    assert Pathy(f"{with_adapter}://{invalid_bucket}").exists() is False
+    assert Pathy(f"{with_adapter}://{invalid_bucket}/foo.blob").exists() is False
     # valid bucket with invalid object
-    assert Pathy(f"gs://{bucket}/not_found_lol_nice.txt").exists() is False
+    assert Pathy(f"{with_adapter}://{bucket}/not_found_lol_nice.txt").exists() is False
 
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/directory/foo.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/directory/foo.txt")
     path.write_text("---")
     assert path.exists()
     for parent in path.parents:
@@ -127,7 +127,7 @@
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_glob(with_adapter: str, bucket: str) -> None:
-    base = Pathy(f"gs://{bucket}/{ENV_ID}/glob/")
+    base = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/")
     root = base / "glob/"
     for i in range(3):
         path = root / f"{i}.file"
@@ -159,7 +159,7 @@ def test_pathy_glob(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_unlink_path(with_adapter: str, bucket: str) -> None:
-    root = Pathy(f"gs://{bucket}/{ENV_ID}/unlink/")
+    root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/unlink/")
     path = root / "404.txt"
     with pytest.raises(FileNotFoundError):
         path.unlink()
@@ -172,7 +172,7 @@
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_is_dir(with_adapter: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/is_dir/subfolder/another/my.file")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/is_dir/subfolder/another/my.file")
     path.write_text("---")
     assert path.is_dir() is False
     for parent in path.parents:
@@ -181,7 +181,9 @@ def test_pathy_is_dir(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_is_file(with_adapter: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/is_file/subfolder/another/my.file")
+    path = Pathy(
+        f"{with_adapter}://{bucket}/{ENV_ID}/is_file/subfolder/another/my.file"
+    )
     path.write_text("---")
     # The full file is a file
     assert path.is_file() is True
@@ -192,7 +194,7 @@ def test_pathy_is_file(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_iterdir(with_adapter: str, bucket: str) -> None:
-    root = Pathy(f"gs://{bucket}/{ENV_ID}/iterdir/")
+    root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/iterdir/")
     # (n) files in a folder
     for i in range(2):
         path = root / f"{i}.file"
@@ -212,16 +214,18 @@ def test_pathy_iterdir(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_iterdir_pipstore(with_adapter: str, bucket: str) -> None:
-    path = Pathy.from_bucket(bucket) / f"{ENV_ID}/iterdir_pipstore/prodigy/prodigy.whl"
+    path = Pathy(
+        f"{with_adapter}://{bucket}/{ENV_ID}/iterdir_pipstore/prodigy/prodigy.whl"
+    )
     path.write_bytes(b"---")
-    path = Pathy.from_bucket(bucket) / f"{ENV_ID}/iterdir_pipstore"
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/iterdir_pipstore")
     res = [e.name for e in sorted(path.iterdir())]
     assert res == ["prodigy"]
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_open_errors(with_adapter: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/open_errors/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/open_errors/file.txt")
     # Invalid open mode
     with pytest.raises(ValueError):
         path.open(mode="t")
@@ -237,7 +241,7 @@ def test_pathy_open_errors(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_open_for_read(with_adapter: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/read/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/read/file.txt")
     path.write_text("---")
     with path.open() as file_obj:
         assert file_obj.read() == "---"
@@ -246,18 +250,18 @@ def test_pathy_open_for_read(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_open_for_write(with_adapter: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/write/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/write/file.txt")
     with path.open(mode="w") as file_obj:
         file_obj.write("---")
         file_obj.writelines(["---"])
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/write/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/write/file.txt")
     with path.open() as file_obj:
         assert file_obj.read() == "------"
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_open_binary_read(with_adapter: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/read_binary/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/read_binary/file.txt")
     path.write_bytes(b"---")
     with path.open(mode="rb") as file_obj:
         assert file_obj.readlines() == [b"---"]
@@ -268,7 +272,7 @@ def test_pathy_open_binary_read(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_readwrite_text(with_adapter: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/write_text/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/write_text/file.txt")
     path.write_text("---")
     with path.open() as file_obj:
         assert file_obj.read() == "---"
@@ -277,14 +281,14 @@ def test_pathy_readwrite_text(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_readwrite_bytes(with_adapter: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/write_bytes/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/write_bytes/file.txt")
     path.write_bytes(b"---")
     assert path.read_bytes() == b"---"
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_readwrite_lines(with_adapter: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/write_text/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/write_text/file.txt")
     with path.open("w") as file_obj:
         file_obj.writelines(["---"])
     with path.open("r") as file_obj:
@@ -295,7 +299,7 @@ def test_pathy_readwrite_lines(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_ls_blobs_with_stat(with_adapter: str, bucket: str) -> None:
-    root = Pathy(f"gs://{bucket}/{ENV_ID}/ls")
+    root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/ls")
     for i in range(3):
         (root / f"file_{i}").write_text("NICE")
     files = list(root.ls())
@@ -311,9 +315,9 @@ def test_pathy_ls_blobs_with_stat(with_adapter: str, bucket: str) -> None:
 def test_pathy_owner(with_adapter: str, bucket: str) -> None:
     # Raises for invalid file
     with pytest.raises(FileNotFoundError):
-        Pathy(f"gs://{bucket}/{ENV_ID}/write_text/not_a_valid_blob").owner()
+        Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/write_text/not_a_valid_blob").owner()
 
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/write_text/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/write_text/file.txt")
     path.write_text("---")
     # TODO: How to set file owner to non-None in GCS? Then assert here.
     #
@@ -328,15 +332,15 @@ def test_pathy_owner(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_rename_files_in_bucket(with_adapter: str, bucket: str) -> None:
     # Rename a single file
-    from_blob = f"gs://{bucket}/{ENV_ID}/rename/file.txt"
-    to_blob = f"gs://{bucket}/{ENV_ID}/rename/other.txt"
+    from_blob = f"{with_adapter}://{bucket}/{ENV_ID}/rename/file.txt"
+    to_blob = f"{with_adapter}://{bucket}/{ENV_ID}/rename/other.txt"
     Pathy(from_blob).write_text("---")
     Pathy(from_blob).rename(to_blob)
     assert not Pathy(from_blob).exists()
     assert Pathy(to_blob).is_file()
 
     with pytest.raises(FileNotFoundError):
-        Pathy("gs://invlid_bkt_nme_18$%^@57582397/foo").rename(to_blob)
+        Pathy(f"{with_adapter}://invlid_bkt_nme_18$%^@57582397/foo").rename(to_blob)
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
@@ -344,26 +348,26 @@ def test_pathy_rename_files_across_buckets(
     with_adapter: str, bucket: str, other_bucket: str
 ) -> None:
     # Rename a single file across buckets
-    Pathy(f"gs://{bucket}/{ENV_ID}/rename/file.txt").write_text("---")
-    Pathy(f"gs://{bucket}/{ENV_ID}/rename/file.txt").rename(
-        f"gs://{other_bucket}/{ENV_ID}/rename/other.txt"
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/file.txt").write_text("---")
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/file.txt").rename(
+        f"{with_adapter}://{other_bucket}/{ENV_ID}/rename/other.txt"
     )
-    assert not Pathy(f"gs://{bucket}/{ENV_ID}/rename/file.txt").exists()
-    assert Pathy(f"gs://{other_bucket}/{ENV_ID}/rename/other.txt").is_file()
+    assert not Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/file.txt").exists()
+    assert Pathy(f"{with_adapter}://{other_bucket}/{ENV_ID}/rename/other.txt").is_file()
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_rename_folders_in_bucket(with_adapter: str, bucket: str) -> None:
     # Rename a folder in the same bucket
-    Pathy(f"gs://{bucket}/{ENV_ID}/rename/folder/one.txt").write_text("---")
-    Pathy(f"gs://{bucket}/{ENV_ID}/rename/folder/two.txt").write_text("---")
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/rename/folder/")
-    new_path = Pathy(f"gs://{bucket}/{ENV_ID}/rename/other/")
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/folder/one.txt").write_text("---")
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/folder/two.txt").write_text("---")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/folder/")
+    new_path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/other/")
     path.rename(new_path)
     assert not path.exists()
     assert new_path.exists()
-    assert Pathy(f"gs://{bucket}/{ENV_ID}/rename/other/one.txt").is_file()
-    assert Pathy(f"gs://{bucket}/{ENV_ID}/rename/other/two.txt").is_file()
+    assert Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/other/one.txt").is_file()
+    assert Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/other/two.txt").is_file()
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
@@ -371,26 +375,30 @@ def test_pathy_rename_folders_across_buckets(
     with_adapter: str, bucket: str, other_bucket: str
 ) -> None:
     # Rename a folder across buckets
-    Pathy(f"gs://{bucket}/{ENV_ID}/rename/folder/one.txt").write_text("---")
-    Pathy(f"gs://{bucket}/{ENV_ID}/rename/folder/two.txt").write_text("---")
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/rename/folder/")
-    new_path = Pathy(f"gs://{other_bucket}/{ENV_ID}/rename/other/")
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/folder/one.txt").write_text("---")
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/folder/two.txt").write_text("---")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/folder/")
+    new_path = Pathy(f"{with_adapter}://{other_bucket}/{ENV_ID}/rename/other/")
     path.rename(new_path)
     assert not path.exists()
     assert new_path.exists()
-    assert Pathy(f"gs://{other_bucket}/{ENV_ID}/rename/other/one.txt").is_file()
-    assert Pathy(f"gs://{other_bucket}/{ENV_ID}/rename/other/two.txt").is_file()
+    assert Pathy(
+        f"{with_adapter}://{other_bucket}/{ENV_ID}/rename/other/one.txt"
+    ).is_file()
+    assert Pathy(
+        f"{with_adapter}://{other_bucket}/{ENV_ID}/rename/other/two.txt"
+    ).is_file()
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_replace_files_in_bucket(with_adapter: str, bucket: str) -> None:
     # replace a single file
-    Pathy(f"gs://{bucket}/{ENV_ID}/replace/file.txt").write_text("---")
-    Pathy(f"gs://{bucket}/{ENV_ID}/replace/file.txt").replace(
-        f"gs://{bucket}/{ENV_ID}/replace/other.txt"
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/file.txt").write_text("---")
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/file.txt").replace(
+        f"{with_adapter}://{bucket}/{ENV_ID}/replace/other.txt"
     )
-    assert not Pathy(f"gs://{bucket}/{ENV_ID}/replace/file.txt").exists()
-    assert Pathy(f"gs://{bucket}/{ENV_ID}/replace/other.txt").is_file()
+    assert not Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/file.txt").exists()
+    assert Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/other.txt").is_file()
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
@@ -398,26 +406,32 @@ def test_pathy_replace_files_across_buckets(
     with_adapter: str, bucket: str, other_bucket: str
 ) -> None:
     # Rename a single file across buckets
-    Pathy(f"gs://{bucket}/{ENV_ID}/replace/file.txt").write_text("---")
-    Pathy(f"gs://{bucket}/{ENV_ID}/replace/file.txt").replace(
-        f"gs://{other_bucket}/{ENV_ID}/replace/other.txt"
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/file.txt").write_text("---")
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/file.txt").replace(
+        f"{with_adapter}://{other_bucket}/{ENV_ID}/replace/other.txt"
     )
-    assert not Pathy(f"gs://{bucket}/{ENV_ID}/replace/file.txt").exists()
-    assert Pathy(f"gs://{other_bucket}/{ENV_ID}/replace/other.txt").is_file()
+    assert not Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/file.txt").exists()
+    assert Pathy(
+        f"{with_adapter}://{other_bucket}/{ENV_ID}/replace/other.txt"
+    ).is_file()
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_replace_folders_in_bucket(with_adapter: str, bucket: str) -> None:
     # Rename a folder in the same bucket
-    Pathy(f"gs://{bucket}/{ENV_ID}/replace/folder/one.txt").write_text("---")
-    Pathy(f"gs://{bucket}/{ENV_ID}/replace/folder/two.txt").write_text("---")
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/replace/folder/")
-    new_path = Pathy(f"gs://{bucket}/{ENV_ID}/replace/other/")
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/folder/one.txt").write_text(
+        "---"
+    )
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/folder/two.txt").write_text(
+        "---"
+    )
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/folder/")
+    new_path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/other/")
     path.replace(new_path)
     assert not path.exists()
     assert new_path.exists()
-    assert Pathy(f"gs://{bucket}/{ENV_ID}/replace/other/one.txt").is_file()
-    assert Pathy(f"gs://{bucket}/{ENV_ID}/replace/other/two.txt").is_file()
+    assert Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/other/one.txt").is_file()
+    assert Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/other/two.txt").is_file()
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
@@ -425,31 +439,41 @@ def test_pathy_replace_folders_across_buckets(
     with_adapter: str, bucket: str, other_bucket: str
 ) -> None:
     # Rename a folder across buckets
-    Pathy(f"gs://{bucket}/{ENV_ID}/replace/folder/one.txt").write_text("---")
-    Pathy(f"gs://{bucket}/{ENV_ID}/replace/folder/two.txt").write_text("---")
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/replace/folder/")
-    new_path = Pathy(f"gs://{other_bucket}/{ENV_ID}/replace/other/")
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/folder/one.txt").write_text(
+        "---"
+    )
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/folder/two.txt").write_text(
+        "---"
+    )
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/folder/")
+    new_path = Pathy(f"{with_adapter}://{other_bucket}/{ENV_ID}/replace/other/")
     path.replace(new_path)
     assert not path.exists()
     assert new_path.exists()
-    assert Pathy(f"gs://{other_bucket}/{ENV_ID}/replace/other/one.txt").is_file()
-    assert Pathy(f"gs://{other_bucket}/{ENV_ID}/replace/other/two.txt").is_file()
+    assert Pathy(
+        f"{with_adapter}://{other_bucket}/{ENV_ID}/replace/other/one.txt"
+    ).is_file()
+    assert Pathy(
+        f"{with_adapter}://{other_bucket}/{ENV_ID}/replace/other/two.txt"
+    ).is_file()
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_rmdir(with_adapter: str, bucket: str) -> None:
-    blob = Pathy(f"gs://{bucket}/{ENV_ID}/rmdir/one.txt")
+    blob = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rmdir/one.txt")
     blob.write_text("---")
     # Cannot rmdir a blob
     with pytest.raises(NotADirectoryError):
         blob.rmdir()
-    Pathy(f"gs://{bucket}/{ENV_ID}/rmdir/folder/two.txt").write_text("---")
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/rmdir/")
+    Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rmdir/folder/two.txt").write_text("---")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rmdir/")
     path.rmdir()
-    assert not Pathy(f"gs://{bucket}/{ENV_ID}/rmdir/one.txt").is_file()
-    assert not Pathy(f"gs://{bucket}/{ENV_ID}/rmdir/other/two.txt").is_file()
+    assert not Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rmdir/one.txt").is_file()
+    assert not Pathy(
+        f"{with_adapter}://{bucket}/{ENV_ID}/rmdir/other/two.txt"
+    ).is_file()
     assert not path.exists()
     # Cannot rmdir an invalid folder
@@ -459,9 +483,9 @@ def test_pathy_rmdir(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_samefile(with_adapter: str, bucket: str) -> None:
-    blob_str = f"gs://{bucket}/{ENV_ID}/samefile/one.txt"
+    blob_str = f"{with_adapter}://{bucket}/{ENV_ID}/samefile/one.txt"
     blob_one = Pathy(blob_str)
-    blob_two = Pathy(f"gs://{bucket}/{ENV_ID}/samefile/two.txt")
+    blob_two = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/samefile/two.txt")
     blob_one.touch()
     blob_two.touch()
     assert blob_one.samefile(blob_two) is False
@@ -474,7 +498,7 @@ def test_pathy_samefile(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_touch(with_adapter: str, bucket: str) -> None:
-    blob = Pathy(f"gs://{bucket}/{ENV_ID}/touch/one.txt")
+    blob = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/touch/one.txt")
     if blob.is_file():
         blob.unlink()
     # The blob doesn't exist
@@ -494,10 +518,13 @@ def test_pathy_touch(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_rglob_unlink(with_adapter: str, bucket: str) -> None:
-    files = [f"gs://{bucket}/{ENV_ID}/rglob_and_unlink/{i}.file.txt" for i in range(3)]
+    files = [
f"{with_adapter}://{bucket}/{ENV_ID}/rglob_and_unlink/{i}.file.txt" + for i in range(3) + ] for file in files: Pathy(file).write_text("---") - path = Pathy(f"gs://{bucket}/{ENV_ID}/rglob_and_unlink/") + path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rglob_and_unlink/") for blob in path.rglob("*"): blob.unlink() # All the files are gone @@ -508,10 +535,10 @@ def test_pathy_rglob_unlink(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) -def test_pathy_mkdir(with_adapter: str, bucket: str) -> None: +def test_pathy_mkdir(with_adapter: str) -> None: bucket_name = f"pathy-e2e-test-{uuid4().hex}" # Create a bucket - path = Pathy(f"gs://{bucket_name}/") + path = Pathy(f"{with_adapter}://{bucket_name}/") path.mkdir() assert path.exists() # Does not assert if it already exists @@ -528,7 +555,7 @@ def test_pathy_ignore_extension(with_adapter: str, bucket: str) -> None: """The smart_open library does automatic decompression based on the filename. We disable that to avoid errors, e.g. if you have a .tar.gz file that isn't gzipped.""" - not_targz = Pathy.from_bucket(bucket) / ENV_ID / "ignore_ext/one.tar.gz" + not_targz = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/ignore_ext/one.tar.gz") fixture_tar = Path(__file__).parent / "fixtures" / "tar_but_not_gzipped.tar.gz" not_targz.write_bytes(fixture_tar.read_bytes()) again = not_targz.read_bytes() diff --git a/pathy/_tests/test_s3.py b/pathy/_tests/test_s3.py new file mode 100644 index 0000000..5b56583 --- /dev/null +++ b/pathy/_tests/test_s3.py @@ -0,0 +1,14 @@ +import pytest + +from pathy import get_client + + +from . import has_s3 + +S3_ADAPTER = ["s3"] + + +@pytest.mark.skipif(has_s3, reason="requires s3 deps to NOT be installed") +def test_s3_import_error_missing_deps() -> None: + with pytest.raises(ImportError): + get_client("s3") diff --git a/pathy/s3.py b/pathy/s3.py new file mode 100644 index 0000000..234c5cb --- /dev/null +++ b/pathy/s3.py @@ -0,0 +1,246 @@ +from dataclasses import dataclass +from typing import Any, Dict, Generator, List, Optional + + +try: + import boto3 # type:ignore + from botocore.exceptions import ParamValidationError + from botocore.client import ClientError + + S3NativeClient = Any + S3NativeBucket = Any +except (ImportError, ModuleNotFoundError): + raise ImportError( + """You are using the S3 functionality of Pathy without + having the required dependencies installed. + + Please try installing them: + + pip install pathy[s3] + + """ + ) + +from . 
import ( + Blob, + Bucket, + BucketClient, + BucketEntry, + PathyScanDir, + PurePathy, + register_client, +) + + +class BucketEntryS3(BucketEntry): + bucket: "BucketS3" + raw: Any + + +@dataclass +class BlobS3(Blob): + client: S3NativeClient + bucket: "BucketS3" + + def delete(self) -> None: + self.client.delete_object(Bucket=self.bucket.name, Key=self.name) + + def exists(self) -> bool: + response = self.client.list_objects_v2( + Bucket=self.bucket.name, Prefix=self.name + ) + for obj in response.get("Contents", []): + if obj["Key"] == self.name: + return True + return False + + +@dataclass +class BucketS3(Bucket): + name: str + client: S3NativeClient + bucket: S3NativeBucket + + def get_blob(self, blob_name: str) -> Optional[BlobS3]: + blob_stat: Dict[str, Any] + try: + blob_stat = self.client.head_object(Bucket=self.name, Key=blob_name) + except ClientError: + return None + updated = blob_stat["LastModified"].timestamp() + size = blob_stat["ContentLength"] + return BlobS3( + client=self.client, + bucket=self, + owner=None, # type:ignore + name=blob_name, # type:ignore + raw=None, + size=size, + updated=int(updated), # type:ignore + ) + + def copy_blob( # type:ignore[override] + self, blob: BlobS3, target: "BucketS3", name: str + ) -> Optional[BlobS3]: + source = {"Bucket": blob.bucket.name, "Key": blob.name} + self.client.copy(source, target.name, name) + pathy_blob: Optional[BlobS3] = self.get_blob(name) + assert pathy_blob is not None, "copy failed" + assert pathy_blob.updated is not None, "new blobl has invalid updated time" + return BlobS3( + client=self.client, + bucket=self.bucket, + owner=None, + name=name, + raw=pathy_blob, + size=pathy_blob.size, + updated=pathy_blob.updated, + ) + + def delete_blob(self, blob: BlobS3) -> None: # type:ignore[override] + self.client.delete_object(Bucket=self.name, Key=blob.name) + + def delete_blobs(self, blobs: List[BlobS3]) -> None: # type:ignore[override] + for blob in blobs: + self.delete_blob(blob) + + def exists(self) -> bool: + # TODO: ensure this always holds. + # + # S3 buckets don't make it this far if they don't exist. The BucketS3 instance + # is not instantiated unless a metadata check on the bucket passes. + return True + + +class BucketClientS3(BucketClient): + client: S3NativeClient + + @property + def client_params(self) -> Any: + return dict(client=self.client) + + def __init__(self, **kwargs: Any) -> None: + self.recreate(**kwargs) + + def recreate(self, **kwargs: Any) -> None: + creds = kwargs["credentials"] if "credentials" in kwargs else None + if creds is not None: + kwargs["project"] = creds.project_id + self.client = boto3.client("s3") # type:ignore + + def make_uri(self, path: PurePathy) -> str: + return str(path) + + def create_bucket( # type:ignore[override] + self, path: PurePathy + ) -> S3NativeBucket: + return self.client.create_bucket(Bucket=path.root) # type:ignore + + def delete_bucket(self, path: PurePathy) -> None: + self.client.delete_bucket(Bucket=path.root) + + def exists(self, path: PurePathy) -> bool: + # Because we want all the parents of a valid blob (e.g. 
"directory" in + # "directory/foo.file") to return True, we enumerate the blobs with a prefix + # and compare the object names to see if they match a substring of the path + key_name = str(path.key) + for obj in self.list_blobs(path): + if obj.name.startswith(key_name + path._flavour.sep): # type:ignore + return True + return False + + def lookup_bucket(self, path: PurePathy) -> Optional[BucketS3]: + try: + return self.get_bucket(path) + except FileNotFoundError: + return None + + def get_bucket(self, path: PurePathy) -> BucketS3: + try: + native_bucket = self.client.head_bucket(Bucket=path.root) + return BucketS3(str(path.root), client=self.client, bucket=native_bucket) + except (ClientError, ParamValidationError): + raise FileNotFoundError(f"Bucket {path.root} does not exist!") + + def list_buckets( # type:ignore[override] + self, **kwargs: Dict[str, Any] + ) -> Generator[S3NativeBucket, None, None]: + return self.client.list_buckets(**kwargs) # type:ignore + + def scandir( # type:ignore[override] + self, + path: Optional[PurePathy] = None, + prefix: Optional[str] = None, + delimiter: Optional[str] = None, + ) -> PathyScanDir: + return ScanDirS3(client=self, path=path, prefix=prefix, delimiter=delimiter) + + def list_blobs( + self, + path: PurePathy, + prefix: Optional[str] = None, + delimiter: Optional[str] = None, + ) -> Generator[BlobS3, None, None]: + bucket = self.lookup_bucket(path) + if bucket is None: + return + paginator = self.client.get_paginator("list_objects_v2") + kwargs = {"Bucket": bucket.name} + if prefix is not None: + kwargs["Prefix"] = prefix + for page in paginator.paginate(**kwargs): + for item in page.get("Contents", []): + yield BlobS3( + client=self.client, + bucket=bucket, + owner=None, + name=item["Key"], + raw=item, + size=item["Size"], + updated=int(item["LastModified"].timestamp()), + ) + + +class ScanDirS3(PathyScanDir): + _client: BucketClientS3 + + def scandir(self) -> Generator[BucketEntryS3, None, None]: + if self._path is None or not self._path.root: + gcs_bucket: S3NativeBucket + for gcs_bucket in self._client.list_buckets(): + yield BucketEntryS3( + gcs_bucket.name, is_dir=True, raw=None # type:ignore + ) + return + sep = self._path._flavour.sep # type:ignore + bucket = self._client.lookup_bucket(self._path) + if bucket is None: + return + + kwargs: Any = {"Bucket": bucket.name, "Delimiter": sep} + if self._prefix is not None: + kwargs["Prefix"] = self._prefix + continuation_token: Optional[str] = None + while True: + if continuation_token: + kwargs["ContinuationToken"] = continuation_token + response = self._client.client.list_objects_v2(**kwargs) + for folder in response.get("CommonPrefixes", []): + prefix = folder["Prefix"] + full_name = prefix[:-1] if prefix.endswith(sep) else prefix + name = full_name.split(sep)[-1] + yield BucketEntryS3(name, is_dir=True) + for file in response.get("Contents", ()): + name = file["Key"].split(sep)[-1] + yield BucketEntryS3( + name=name, + is_dir=False, + size=file["Size"], + last_modified=int(file["LastModified"].timestamp()), + ) + if not response.get("IsTruncated"): + break + continuation_token = response.get("NextContinuationToken") + + +register_client("s3", BucketClientS3) diff --git a/setup.py b/setup.py index 1be2900..4fce1bb 100644 --- a/setup.py +++ b/setup.py @@ -20,6 +20,7 @@ def setup_package(): long_description = f.read() extras = { "gcs": ["google-cloud-storage>=1.26.0,<2.0.0"], + "s3": ["boto3"], "test": ["pytest", "pytest-coverage", "mock", "typer-cli"], } extras["all"] = [item for group in 
extras.values() for item in group] From 6641251e905d691c30666723a5d2f318fc6037f9 Mon Sep 17 00:00:00 2001 From: justindujardin Date: Fri, 23 Apr 2021 13:32:38 -0700 Subject: [PATCH 02/10] chore: cleanup from review --- pathy/__init__.py | 2 +- pathy/_tests/test_gcs.py | 1 - pathy/_tests/test_s3.py | 1 - pathy/s3.py | 15 +++++---------- 4 files changed, 6 insertions(+), 13 deletions(-) diff --git a/pathy/__init__.py b/pathy/__init__.py index fa107d9..9acbc06 100644 --- a/pathy/__init__.py +++ b/pathy/__init__.py @@ -516,7 +516,7 @@ def fluid(cls, path_candidate: Union[str, FluidPath]) -> FluidPath: return from_path @classmethod - def from_bucket(cls, bucket_name: str, scheme:str = "gs") -> "Pathy": + def from_bucket(cls, bucket_name: str, scheme: str = "gs") -> "Pathy": """Initialize a Pathy from a bucket name. This helper adds a trailing slash and the appropriate prefix. diff --git a/pathy/_tests/test_gcs.py b/pathy/_tests/test_gcs.py index 74d2ee5..1600b8c 100644 --- a/pathy/_tests/test_gcs.py +++ b/pathy/_tests/test_gcs.py @@ -4,7 +4,6 @@ from pathy import Pathy, get_client, set_client_params - from . import has_gcs GCS_ADAPTER = ["gcs"] diff --git a/pathy/_tests/test_s3.py b/pathy/_tests/test_s3.py index 5b56583..144a910 100644 --- a/pathy/_tests/test_s3.py +++ b/pathy/_tests/test_s3.py @@ -2,7 +2,6 @@ from pathy import get_client - from . import has_s3 S3_ADAPTER = ["s3"] diff --git a/pathy/s3.py b/pathy/s3.py index 234c5cb..d1b5f57 100644 --- a/pathy/s3.py +++ b/pathy/s3.py @@ -1,11 +1,10 @@ from dataclasses import dataclass from typing import Any, Dict, Generator, List, Optional - try: import boto3 # type:ignore - from botocore.exceptions import ParamValidationError from botocore.client import ClientError + from botocore.exceptions import ParamValidationError S3NativeClient = Any S3NativeBucket = Any @@ -49,10 +48,9 @@ def exists(self) -> bool: response = self.client.list_objects_v2( Bucket=self.bucket.name, Prefix=self.name ) - for obj in response.get("Contents", []): - if obj["Key"] == self.name: - return True - return False + objects = response.get("Contents", []) + matched = [o["Key"] for o in objects if o["Key"] == self.name] + return len(matched) > 0 @dataclass @@ -105,7 +103,7 @@ def delete_blobs(self, blobs: List[BlobS3]) -> None: # type:ignore[override] self.delete_blob(blob) def exists(self) -> bool: - # TODO: ensure this always holds. + # TODO: are you sure this always holds? # # S3 buckets don't make it this far if they don't exist. The BucketS3 instance # is not instantiated unless a metadata check on the bucket passes. 
@@ -123,9 +121,6 @@ def __init__(self, **kwargs: Any) -> None:
         self.recreate(**kwargs)
 
     def recreate(self, **kwargs: Any) -> None:
-        creds = kwargs["credentials"] if "credentials" in kwargs else None
-        if creds is not None:
-            kwargs["project"] = creds.project_id
         self.client = boto3.client("s3")  # type:ignore
 
     def make_uri(self, path: PurePathy) -> str:
         return str(path)

From 2d45cd4addd9627b0d03e346430e77ea98085dab Mon Sep 17 00:00:00 2001
From: justindujardin
Date: Fri, 23 Apr 2021 14:11:22 -0700
Subject: [PATCH 03/10] test(s3): exercise list_buckets and pagination
 codepaths

---
 pathy/_tests/test_s3.py | 57 +++++++++++++++++++++++++++++++++++++++--
 pathy/s3.py             | 25 +++++++++++++-----
 2 files changed, 74 insertions(+), 8 deletions(-)

diff --git a/pathy/_tests/test_s3.py b/pathy/_tests/test_s3.py
index 144a910..0273407 100644
--- a/pathy/_tests/test_s3.py
+++ b/pathy/_tests/test_s3.py
@@ -1,8 +1,8 @@
 import pytest
-
-from pathy import get_client
+from pathy import Pathy, get_client
 
 from . import has_s3
+from .conftest import ENV_ID
 
 S3_ADAPTER = ["s3"]
 
@@ -11,3 +11,56 @@ def test_s3_import_error_missing_deps() -> None:
     with pytest.raises(ImportError):
         get_client("s3")
+
+
+@pytest.mark.parametrize("adapter", S3_ADAPTER)
+@pytest.mark.skipif(not has_s3, reason="requires s3")
+def test_s3_scandir_list_buckets(
+    with_adapter: str, bucket: str, other_bucket: str
+) -> None:
+    from pathy.s3 import ScanDirS3
+
+    root = Pathy("s3://foo/bar")
+    client = root._accessor.client(root)  # type:ignore
+    scandir = ScanDirS3(client=client, path=Pathy())
+    buckets = [s.name for s in scandir]
+    assert bucket in buckets
+    assert other_bucket in buckets
+
+
+@pytest.mark.parametrize("adapter", S3_ADAPTER)
+@pytest.mark.skipif(not has_s3, reason="requires s3")
+def test_s3_scandir_scandir_continuation_token(
+    with_adapter: str, bucket: str, other_bucket: str
+) -> None:
+    from pathy.s3 import ScanDirS3
+
+    root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/s3_scandir_pagination/")
+    for i in range(8):
+        (root / f"file{i}.blob").touch()
+    client = root._accessor.client(root)  # type:ignore
+    scandir = ScanDirS3(client=client, path=root, prefix=root.prefix, page_size=4)
+    blobs = [s.name for s in scandir]
+    assert len(blobs) == 8
+
+
+@pytest.mark.parametrize("adapter", S3_ADAPTER)
+@pytest.mark.skipif(not has_s3, reason="requires s3")
+def test_s3_scandir_invalid_bucket_name(with_adapter: str) -> None:
+    from pathy.s3 import ScanDirS3
+
+    root = Pathy(f"{with_adapter}://invalid_h3gE_ds5daEf_Sdf15487t2n4/bar")
+    client = root._accessor.client(root)  # type:ignore
+    scandir = ScanDirS3(client=client, path=root)
+    assert len(list(scandir)) == 0
+
+
+@pytest.mark.parametrize("adapter", S3_ADAPTER)
+@pytest.mark.skipif(not has_s3, reason="requires s3")
+def test_s3_bucket_client_list_blobs(with_adapter: str, bucket: str) -> None:
+    """Test corner-case in S3 client that isn't easily reachable from Pathy"""
+    from pathy.s3 import BucketClientS3
+
+    client: BucketClientS3 = get_client("s3")
+    root = Pathy("s3://invalid_h3gE_ds5daEf_Sdf15487t2n4")
+    assert len(list(client.list_blobs(root))) == 0
diff --git a/pathy/s3.py b/pathy/s3.py
index d1b5f57..4e6dc13 100644
--- a/pathy/s3.py
+++ b/pathy/s3.py
@@ -160,7 +160,9 @@ def get_bucket(self, path: PurePathy) -> BucketS3:
     def list_buckets(  # type:ignore[override]
         self, **kwargs: Dict[str, Any]
     ) -> Generator[S3NativeBucket, None, None]:
-        return self.client.list_buckets(**kwargs)  # type:ignore
+        native_buckets = self.client.list_buckets(**kwargs)["Buckets"]
+        results = (BucketS3(n["Name"], self.client, n) for n in native_buckets)
+        return results
 
     def scandir(  # type:ignore[override]
         self,
@@ -199,13 +201,22 @@ def list_blobs(
 class ScanDirS3(PathyScanDir):
     _client: BucketClientS3
 
+    def __init__(
+        self,
+        client: BucketClient,
+        path: Optional[PurePathy] = None,
+        prefix: Optional[str] = None,
+        delimiter: Optional[str] = None,
+        page_size: Optional[int] = None,
+    ) -> None:
+        super().__init__(client=client, path=path, prefix=prefix, delimiter=delimiter)
+        self._page_size = page_size
+
     def scandir(self) -> Generator[BucketEntryS3, None, None]:
         if self._path is None or not self._path.root:
-            gcs_bucket: S3NativeBucket
-            for gcs_bucket in self._client.list_buckets():
-                yield BucketEntryS3(
-                    gcs_bucket.name, is_dir=True, raw=None  # type:ignore
-                )
+            s3_bucket: BucketS3
+            for s3_bucket in self._client.list_buckets():
+                yield BucketEntryS3(s3_bucket.name, is_dir=True, raw=None)
             return
         sep = self._path._flavour.sep  # type:ignore
         bucket = self._client.lookup_bucket(self._path)
@@ -215,6 +226,8 @@ def scandir(self) -> Generator[BucketEntryS3, None, None]:
         kwargs: Any = {"Bucket": bucket.name, "Delimiter": sep}
         if self._prefix is not None:
             kwargs["Prefix"] = self._prefix
+        if self._page_size is not None:
+            kwargs["MaxKeys"] = self._page_size
         continuation_token: Optional[str] = None
         while True:
             if continuation_token:

From 2d5a66f7fb871f36156a004e10aad8fa935fb5c4 Mon Sep 17 00:00:00 2001
From: justindujardin
Date: Fri, 23 Apr 2021 14:39:59 -0700
Subject: [PATCH 04/10] chore: fix passing credentials through the env

---
 pathy/_tests/conftest.py |  4 ++--
 pathy/s3.py              | 11 ++++++++++-
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/pathy/_tests/conftest.py b/pathy/_tests/conftest.py
index 1dc6592..0d0a611 100644
--- a/pathy/_tests/conftest.py
+++ b/pathy/_tests/conftest.py
@@ -112,8 +112,8 @@ def with_adapter(
         use_fs(False)
         credentials = s3_credentials_from_env()
         if credentials is not None:
-            # key_id, key_secret = credentials
-            set_client_params("s3", credentials=credentials)
+            key_id, key_secret = credentials
+            set_client_params("s3", key_id=key_id, key_secret=key_secret)
     elif adapter == "fs":
         # Use local file-system in a temp folder
         tmp_dir = tempfile.mkdtemp()
diff --git a/pathy/s3.py b/pathy/s3.py
index 4e6dc13..534a83b 100644
--- a/pathy/s3.py
+++ b/pathy/s3.py
@@ -121,7 +121,16 @@ def __init__(self, **kwargs: Any) -> None:
         self.recreate(**kwargs)
 
     def recreate(self, **kwargs: Any) -> None:
-        self.client = boto3.client("s3")  # type:ignore
+        key_id = kwargs.get("key_id", None)
+        key_secret = kwargs.get("key_secret", None)
+        if key_id is not None and key_secret is not None:
+            session = boto3.Session(
+                aws_access_key_id=key_id,
+                aws_secret_access_key=key_secret,
+            )
+        else:
+            session = boto3
+        self.client = session.client("s3")  # type:ignore
 
     def make_uri(self, path: PurePathy) -> str:
         return str(path)

From 3a1401d18feb89a784984f419965592aaa3e8aae Mon Sep 17 00:00:00 2001
From: justindujardin
Date: Fri, 23 Apr 2021 14:53:06 -0700
Subject: [PATCH 05/10] chore: fix leftover dev change in conftest

---
 pathy/_tests/conftest.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pathy/_tests/conftest.py b/pathy/_tests/conftest.py
index 0d0a611..36f501f 100644
--- a/pathy/_tests/conftest.py
+++ b/pathy/_tests/conftest.py
@@ -15,7 +15,7 @@
 has_credentials = "GCS_CREDENTIALS" in os.environ
 
 # Which adapters to use
-TEST_ADAPTERS = ["gcs", "s3", "fs"] if has_credentials and has_gcs else ["s3"]
+TEST_ADAPTERS = ["gcs", "s3", "fs"] if has_credentials and has_gcs else ["fs"]
 # A unique identifier used to allow each python version and OS to test
 # with separate bucket paths. This makes it possible to parallelize the
 # tests.

From 739ed0c5c4b210ab5b7d12d03b1419f0c11e513a Mon Sep 17 00:00:00 2001
From: justindujardin
Date: Fri, 23 Apr 2021 15:03:01 -0700
Subject: [PATCH 06/10] chore: tighten up missing extras tests

- specifically disable use_fs in case it was still set from another test
---
 pathy/_tests/test_gcs.py |  7 +++----
 pathy/_tests/test_s3.py  | 16 +++++++++-------
 pathy/s3.py              |  3 ++-
 3 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/pathy/_tests/test_gcs.py b/pathy/_tests/test_gcs.py
index 1600b8c..9581d3d 100644
--- a/pathy/_tests/test_gcs.py
+++ b/pathy/_tests/test_gcs.py
@@ -2,7 +2,7 @@
 
 import pytest
 
-from pathy import Pathy, get_client, set_client_params
+from pathy import Pathy, get_client, set_client_params, use_fs
 
 from . import has_gcs
 
@@ -38,6 +38,7 @@ def test_gcs_as_uri(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.skipif(has_gcs, reason="requires gcs deps to NOT be installed")
 def test_gcs_import_error_missing_deps() -> None:
+    use_fs(False)
     with pytest.raises(ImportError):
         get_client("gs")
 
@@ -57,9 +58,7 @@ def test_gcs_scandir_list_buckets(
 
 @pytest.mark.parametrize("adapter", GCS_ADAPTER)
 @pytest.mark.skipif(not has_gcs, reason="requires gcs")
-def test_gcs_scandir_invalid_bucket_name(
-    with_adapter: str, bucket: str, other_bucket: str
-) -> None:
+def test_gcs_scandir_invalid_bucket_name(with_adapter: str) -> None:
     from pathy.gcs import ScanDirGCS
 
     root = Pathy("gs://invalid_h3gE_ds5daEf_Sdf15487t2n4/bar")
diff --git a/pathy/_tests/test_s3.py b/pathy/_tests/test_s3.py
index 0273407..c57b95a 100644
--- a/pathy/_tests/test_s3.py
+++ b/pathy/_tests/test_s3.py
@@ -1,5 +1,6 @@
 import pytest
-from pathy import Pathy, get_client
+
+from pathy import Pathy, get_client, use_fs
 
 from . import has_s3
 from .conftest import ENV_ID
@@ -7,12 +8,6 @@
 S3_ADAPTER = ["s3"]
 
 
-@pytest.mark.skipif(has_s3, reason="requires s3 deps to NOT be installed")
-def test_s3_import_error_missing_deps() -> None:
-    with pytest.raises(ImportError):
-        get_client("s3")
-
-
 @pytest.mark.parametrize("adapter", S3_ADAPTER)
 @pytest.mark.skipif(not has_s3, reason="requires s3")
 def test_s3_scandir_list_buckets(
@@ -64,3 +59,10 @@ def test_s3_bucket_client_list_blobs(with_adapter: str, bucket: str) -> None:
     client: BucketClientS3 = get_client("s3")
     root = Pathy("s3://invalid_h3gE_ds5daEf_Sdf15487t2n4")
     assert len(list(client.list_blobs(root))) == 0
+
+
+@pytest.mark.skipif(has_s3, reason="requires s3 deps to NOT be installed")
+def test_s3_import_error_missing_deps() -> None:
+    use_fs(False)
+    with pytest.raises(ImportError):
+        get_client("s3")
diff --git a/pathy/s3.py b/pathy/s3.py
index 534a83b..10496ec 100644
--- a/pathy/s3.py
+++ b/pathy/s3.py
@@ -123,8 +123,9 @@ def __init__(self, **kwargs: Any) -> None:
     def recreate(self, **kwargs: Any) -> None:
         key_id = kwargs.get("key_id", None)
         key_secret = kwargs.get("key_secret", None)
+        session: Any
         if key_id is not None and key_secret is not None:
-            session = boto3.Session(
+            session = boto3.Session(  # type:ignore
                 aws_access_key_id=key_id,
                 aws_secret_access_key=key_secret,
             )

From 58c764ac9caafa7c0368082680f5691b48e6f0d7 Mon Sep 17 00:00:00 2001
From: justindujardin
Date: Fri, 23 Apr 2021 15:23:22 -0700
Subject: [PATCH 07/10] chore: fix passing s3 creds to smart_open

---
 pathy/s3.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/pathy/s3.py b/pathy/s3.py
index 10496ec..aa05c7a 100644
--- a/pathy/s3.py
+++ b/pathy/s3.py
@@ -112,10 +112,11 @@ def exists(self) -> bool:
 
 class BucketClientS3(BucketClient):
     client: S3NativeClient
+    _session: Optional[Any] = None
 
     @property
-    def client_params(self) -> Any:
-        return dict(client=self.client)
+    def client_params(self) -> Dict[str, Any]:
+        return dict() if self._session is None else dict(session=self._session)
 
     def __init__(self, **kwargs: Any) -> None:
         self.recreate(**kwargs)
@@ -123,15 +124,14 @@ def __init__(self, **kwargs: Any) -> None:
     def recreate(self, **kwargs: Any) -> None:
         key_id = kwargs.get("key_id", None)
         key_secret = kwargs.get("key_secret", None)
-        session: Any
         if key_id is not None and key_secret is not None:
-            session = boto3.Session(  # type:ignore
+            self._session = boto3.Session(  # type:ignore
                 aws_access_key_id=key_id,
                 aws_secret_access_key=key_secret,
             )
+            self.client = self._session.client("s3")  # type:ignore
         else:
-            session = boto3
-            self.client = session.client("s3")  # type:ignore
+            self.client = boto3.client("s3")  # type:ignore

From 922ffca933e795b8bcb8dfe4621e3eb630c3038b Mon Sep 17 00:00:00 2001
From: justindujardin
Date: Fri, 23 Apr 2021 15:29:38 -0700
Subject: [PATCH 08/10] chore: cleanup from review

- add env vars for s3
---
 .github/workflows/python-package.yml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 12114e6..b028623 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -29,6 +29,8 @@ jobs:
       - name: Test Wheel
         env:
           GCS_CREDENTIALS: ${{ secrets.GCS_CREDENTIALS }}
+          PATHY_S3_ACCESS_ID: ${{ secrets.PATHY_S3_ACCESS_ID }}
+          PATHY_S3_ACCESS_SECRET: ${{ secrets.PATHY_S3_ACCESS_SECRET }}
         run: rm -rf ./pathy/ && sh tools/test_package.sh
       - name: Report Code Coverage
         env:

From ca2befca1489406e149cbbf595df8a337adc0f8c Mon Sep 17 00:00:00 2001
From: justindujardin
Date: Fri, 23 Apr 2021 15:37:23 -0700
Subject: [PATCH 09/10] chore: cleanup from review

---
 pathy/s3.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/pathy/s3.py b/pathy/s3.py
index aa05c7a..1ab8e01 100644
--- a/pathy/s3.py
+++ b/pathy/s3.py
@@ -124,14 +124,13 @@ def __init__(self, **kwargs: Any) -> None:
     def recreate(self, **kwargs: Any) -> None:
         key_id = kwargs.get("key_id", None)
         key_secret = kwargs.get("key_secret", None)
+        boto_session: Any = boto3
         if key_id is not None and key_secret is not None:
-            self._session = boto3.Session(  # type:ignore
+            self._session = boto_session = boto3.Session(  # type:ignore
                 aws_access_key_id=key_id,
                 aws_secret_access_key=key_secret,
             )
-            self.client = self._session.client("s3")  # type:ignore
-        else:
-            self.client = boto3.client("s3")  # type:ignore
+        self.client = boto_session.client("s3")  # type:ignore

From 6f139ce17b8087260c5c0b0ca02e4194b7cb2230 Mon Sep 17 00:00:00 2001
From: justindujardin
Date: Fri, 23 Apr 2021 16:06:10 -0700
Subject: [PATCH 10/10] chore(readme): add cloud platform support table

---
 README.md | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 2bc0593..65d0590 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@
 [![Pypi version](https://badgen.net/pypi/v/pathy)](https://pypi.org/project/pathy/)
 [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black)
 
-Pathy is a python package (_with type annotations_) for working with Bucket storage providers. It provides a CLI app for basic file operations between local files and remote buckets. It enables a smooth developer experience by supporting local file-system backed buckets during development and testing. It makes converting bucket blobs into local files a snap with optional local file caching of blobs.
+Pathy is a python package (_with type annotations_) for working with Cloud Bucket storage providers using a pathlib interface. It provides an easy-to-use API bundled with a CLI app for basic file operations between local files and remote buckets. It enables a smooth developer experience by letting developers work against the local file system during development and only switch over to live APIs for deployment. It also makes converting bucket blobs into local files a snap with optional local file caching.
 
 ## 🚀 Quickstart
 
@@ -15,7 +15,7 @@ You can install `pathy` from pip:
 ```
 pip install pathy
 ```
 
-The package exports the `Pathy` class and utilities for configuring the bucket storage provider to use. By default Pathy prefers GoogleCloudStorage paths of the form `gs://bucket_name/folder/blob_name.txt`. Internally Pathy can convert GCS paths to local files, allowing for a nice developer experience.
+The package exports the `Pathy` class and utilities for configuring the bucket storage provider to use.
 
 ```python
 from pathy import Pathy, use_fs
@@ -37,6 +37,16 @@ greeting.unlink()
 assert not greeting.exists()
 ```
 
+## Supported Clouds
+
+The table below details the supported cloud provider APIs.
+
+| Cloud Service        | Support |      Install Extras      |
+| :------------------- | :-----: | :----------------------: |
+| Google Cloud Storage |   ✅    | `pip install pathy[gcs]` |
+| Amazon S3            |   ✅    | `pip install pathy[s3]`  |
+| Azure                |   ❌    |                          |
+
 ## Semantic Versioning
 
 Before Pathy reaches v1.0 the project is not guaranteed to have a consistent API, which means that types and classes may move around or be removed. That said, we try to be predictable when it comes to breaking changes, so the project uses semantic versioning to help users avoid breakage.
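
For reference, a minimal sketch of the workflow this patch series enables. It assumes the `pathy[s3]` extra is installed; the bucket name and credential values are placeholders, and the `set_client_params` call mirrors the same `key_id`/`key_secret` parameters the test fixtures pass in `conftest.py`:

```python
from pathy import Pathy, set_client_params

# Configure the S3 client with explicit credentials. If this call is
# omitted, boto3 falls back to its standard credential resolution chain
# (environment variables, shared credentials file, instance metadata).
set_client_params("s3", key_id="<access-key-id>", key_secret="<access-key-secret>")

# from_bucket() accepts the scheme argument added in this series.
root = Pathy.from_bucket("my-bucket", scheme="s3")
assert str(root) == "s3://my-bucket/"

# Blob reads and writes use the same pathlib-style API as "gs://" paths.
greeting = root / "greeting.txt"
greeting.write_text("hello s3")
with greeting.open() as file_obj:
    assert file_obj.read() == "hello s3"
greeting.unlink()
```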