From 5bb7e1b8faa422c8a80d91d7d8e71f6de4052962 Mon Sep 17 00:00:00 2001
From: Justin DuJardin
Date: Fri, 23 Apr 2021 16:13:54 -0700
Subject: [PATCH] feat(clients): add support for S3 bucket storage (#54)

* feat(clients): support for "s3://bucket/blobs"

- install with `pip install pathy[s3]` extras
- update tests to substitute the scheme where needed instead of hardcoded "gs"

* chore: cleanup from review

* test(s3): exercise list_buckets and pagination codepaths

* chore: fix passing credentials through the env

* chore: fix leftover dev change in conftest

* chore: tighten up missing extras tests

- specifically disable use_fs in case it was still set from another test

* chore: fix passing s3 creds to smart_open

* chore: cleanup from review

- add env vars for s3

* chore: cleanup from review

* chore(readme): add cloud platform support table
---
 .github/workflows/python-package.yml |   2 +
 README.md                            |  16 +-
 pathy/__init__.py                    |   7 +-
 pathy/_tests/__init__.py             |   8 +
 pathy/_tests/conftest.py             |  24 ++-
 pathy/_tests/test_cli.py             |  46 ++---
 pathy/_tests/test_clients.py         |   2 +-
 pathy/_tests/test_gcs.py             |   8 +-
 pathy/_tests/test_pathy.py           | 189 ++++++++++---------
 pathy/_tests/test_s3.py              |  68 +++++++
 pathy/s3.py                          | 263 +++++++++++++++++++++++
 setup.py                             |   1 +
 12 files changed, 517 insertions(+), 117 deletions(-)
 create mode 100644 pathy/_tests/test_s3.py
 create mode 100644 pathy/s3.py

diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 12114e6..b028623 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -29,6 +29,8 @@ jobs:
       - name: Test Wheel
         env:
           GCS_CREDENTIALS: ${{ secrets.GCS_CREDENTIALS }}
+          PATHY_S3_ACCESS_ID: ${{ secrets.PATHY_S3_ACCESS_ID }}
+          PATHY_S3_ACCESS_SECRET: ${{ secrets.PATHY_S3_ACCESS_SECRET }}
         run: rm -rf ./pathy/ && sh tools/test_package.sh
       - name: Report Code Coverage
         env:
diff --git a/README.md b/README.md
index 48fe30f..65d0590 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@
 [![Pypi version](https://badgen.net/pypi/v/pathy)](https://pypi.org/project/pathy/)
 [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black)
 
-Pathy is a python package (_with type annotations_) for working with Bucket storage providers. It provides a CLI app for basic file operations between local files and remote buckets. It enables a smooth developer experience by supporting local file-system backed buckets during development and testing. It makes converting bucket blobs into local files a snap with optional local file caching of blobs.
+Pathy is a python package (_with type annotations_) for working with Cloud Bucket storage providers using a pathlib interface. It provides an easy-to-use API bundled with a CLI app for basic file operations between local files and remote buckets. It enables a smooth developer experience by letting developers work against the local file system during development and only switch over to live APIs for deployment. It also makes converting bucket blobs into local files a snap with optional local file caching.
 
 ## 🚀 Quickstart
 
 You can install `pathy` from pip:
 
 ```sh
 pip install pathy
 ```
 
-The package exports the `Pathy` class and utilities for configuring the bucket storage provider to use. By default Pathy prefers GoogleCloudStorage paths of the form `gs://bucket_name/folder/blob_name.txt`. Internally Pathy can convert GCS paths to local files, allowing for a nice developer experience.
+The package exports the `Pathy` class and utilities for configuring the bucket storage provider to use. ```python from pathy import Pathy, use_fs @@ -37,6 +37,16 @@ greeting.unlink() assert not greeting.exists() ``` +## Supported Clouds + +The table below details the supported cloud provider APIs. + +| Cloud Service | Support | Install Extras | +| :------------------- | :-----: | :----------------------: | +| Google Cloud Storage | ✅ | `pip install pathy[gcs]` | +| Amazon S3 | ✅ | `pip install pathy[s3]` | +| Azure | ❌ | | + ## Semantic Versioning Before Pathy reaches v1.0 the project is not guaranteed to have a consistent API, which means that types and classes may move around or be removed. That said, we try to be predictable when it comes to breaking changes, so the project uses semantic versioning to help users avoid breakage. @@ -109,7 +119,7 @@ assert fluid_path.prefix == "foo.txt/" ## from_bucket classmethod ```python (doc) -Pathy.from_bucket(bucket_name: str) -> 'Pathy' +Pathy.from_bucket(bucket_name: str, scheme: str = 'gs') -> 'Pathy' ``` Initialize a Pathy from a bucket name. This helper adds a trailing slash and diff --git a/pathy/__init__.py b/pathy/__init__.py index 72f7562..9acbc06 100644 --- a/pathy/__init__.py +++ b/pathy/__init__.py @@ -366,7 +366,7 @@ def stat(self, path: "Pathy") -> BlobStat: blob: Optional[Blob] = bucket.get_blob(str(path.key)) if blob is None: raise FileNotFoundError(path) - return BlobStat(name=str(blob), size=blob.size, last_modified=blob.updated) + return BlobStat(name=str(blob.name), size=blob.size, last_modified=blob.updated) def is_dir(self, path: "Pathy") -> bool: if self.get_blob(path) is not None: @@ -516,7 +516,7 @@ def fluid(cls, path_candidate: Union[str, FluidPath]) -> FluidPath: return from_path @classmethod - def from_bucket(cls, bucket_name: str) -> "Pathy": + def from_bucket(cls, bucket_name: str, scheme: str = "gs") -> "Pathy": """Initialize a Pathy from a bucket name. This helper adds a trailing slash and the appropriate prefix. 
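A quick sketch of what the new `scheme` argument changes (the `gs` assertion mirrors the docstring example above; the bucket name is illustrative):

```python
from pathy import Pathy

# The default scheme is still "gs", so existing callers are unaffected.
assert str(Pathy.from_bucket("two")) == "gs://two/"

# The new scheme argument selects another provider, e.g. Amazon S3.
assert str(Pathy.from_bucket("two", scheme="s3")) == "s3://two/"
```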
@@ -527,7 +527,7 @@ def from_bucket(cls, bucket_name: str, scheme: str = "gs") -> "Pathy":
         assert str(Pathy.from_bucket("two")) == "gs://two/"
         ```
         """
-        return Pathy(f"gs://{bucket_name}/")  # type:ignore
+        return Pathy(f"{scheme}://{bucket_name}/")  # type:ignore
 
     @classmethod
     def to_local(cls, blob_path: Union["Pathy", str], recurse: bool = True) -> Path:
@@ -1159,6 +1159,7 @@ def scandir(self) -> Generator[BucketEntry, None, None]:
 # a Pathy object with a matching scheme
 _optional_clients: Dict[str, str] = {
     "gs": "pathy.gcs",
+    "s3": "pathy.s3",
 }
 
 BucketClientType = TypeVar("BucketClientType", bound=BucketClient)
diff --git a/pathy/_tests/__init__.py b/pathy/_tests/__init__.py
index 75f4351..a0050b5 100644
--- a/pathy/_tests/__init__.py
+++ b/pathy/_tests/__init__.py
@@ -5,3 +5,11 @@
     has_gcs = bool(BucketClientGCS)
 except ImportError:
     has_gcs = False
+
+has_s3: bool
+try:
+    from ..s3 import BucketClientS3
+
+    has_s3 = bool(BucketClientS3)
+except ImportError:
+    has_s3 = False
diff --git a/pathy/_tests/conftest.py b/pathy/_tests/conftest.py
index fa57711..36f501f 100644
--- a/pathy/_tests/conftest.py
+++ b/pathy/_tests/conftest.py
@@ -4,7 +4,7 @@
 import sys
 import tempfile
 from pathlib import Path
-from typing import Any, Generator, Optional
+from typing import Any, Generator, Optional, Tuple
 
 import pytest
 
@@ -15,7 +15,7 @@
 has_credentials = "GCS_CREDENTIALS" in os.environ
 
 # Which adapters to use
-TEST_ADAPTERS = ["gcs", "fs"] if has_credentials and has_gcs else ["fs"]
+TEST_ADAPTERS = ["gcs", "s3", "fs"] if has_credentials and has_gcs else ["fs"]
 # A unique identifier used to allow each python version and OS to test
 # with separate bucket paths. This makes it possible to parallelize the
 # tests.
@@ -82,6 +82,18 @@ def gcs_credentials_from_env() -> Optional[Any]:
     return credentials
 
 
+def s3_credentials_from_env() -> Optional[Tuple[str, str]]:
+    """Extract an access key ID and secret from the environment."""
+    if not has_s3:
+        return None
+
+    access_key_id: Optional[str] = os.environ.get("PATHY_S3_ACCESS_ID", None)
+    access_secret: Optional[str] = os.environ.get("PATHY_S3_ACCESS_SECRET", None)
+    if access_key_id is None or access_secret is None:
+        return None
+    return (access_key_id, access_secret)
+
+
 @pytest.fixture()
 def with_adapter(
     adapter: str, bucket: str, other_bucket: str
@@ -94,6 +106,14 @@
         credentials = gcs_credentials_from_env()
         if credentials is not None:
             set_client_params("gs", credentials=credentials)
+    elif adapter == "s3":
+        scheme = "s3"
+        # Use boto3
+        use_fs(False)
+        credentials = s3_credentials_from_env()
+        if credentials is not None:
+            key_id, key_secret = credentials
+            set_client_params("s3", key_id=key_id, key_secret=key_secret)
     elif adapter == "fs":
         # Use local file-system in a temp folder
         tmp_dir = tempfile.mkdtemp()
diff --git a/pathy/_tests/test_cli.py b/pathy/_tests/test_cli.py
index 8abc452..04467f2 100644
--- a/pathy/_tests/test_cli.py
+++ b/pathy/_tests/test_cli.py
@@ -17,16 +17,16 @@
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_cli_cp_invalid_from_path(with_adapter: str, bucket: str) -> None:
-    source = f"gs://{bucket}/{ENV_ID}/cli_cp_file_invalid/file.txt"
-    destination = f"gs://{bucket}/{ENV_ID}/cli_cp_file_invalid/dest.txt"
+    source = f"{with_adapter}://{bucket}/{ENV_ID}/cli_cp_file_invalid/file.txt"
+    destination = f"{with_adapter}://{bucket}/{ENV_ID}/cli_cp_file_invalid/dest.txt"
     assert runner.invoke(app, ["cp", source, destination]).exit_code == 1
     assert not Pathy(destination).is_file()
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
def test_cli_cp_file(with_adapter: str, bucket: str) -> None: - source = f"gs://{bucket}/{ENV_ID}/cli_cp_file/file.txt" - destination = f"gs://{bucket}/{ENV_ID}/cli_cp_file/other.txt" + source = f"{with_adapter}://{bucket}/{ENV_ID}/cli_cp_file/file.txt" + destination = f"{with_adapter}://{bucket}/{ENV_ID}/cli_cp_file/other.txt" Pathy(source).write_text("---") assert runner.invoke(app, ["cp", source, destination]).exit_code == 0 assert Pathy(source).exists() @@ -37,7 +37,7 @@ def test_cli_cp_file(with_adapter: str, bucket: str) -> None: def test_cli_cp_file_name_from_source(with_adapter: str, bucket: str) -> None: source = pathlib.Path("./file.txt") source.touch() - destination = f"gs://{bucket}/{ENV_ID}/cli_cp_file/" + destination = f"{with_adapter}://{bucket}/{ENV_ID}/cli_cp_file/" assert runner.invoke(app, ["cp", str(source), destination]).exit_code == 0 assert Pathy(f"{destination}file.txt").is_file() source.unlink() @@ -45,7 +45,7 @@ def test_cli_cp_file_name_from_source(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_cli_cp_folder(with_adapter: str, bucket: str) -> None: - root = Pathy.from_bucket(bucket) / ENV_ID + root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/") source = root / "cli_cp_folder" destination = root / "cli_cp_folder_other" for i in range(2): @@ -61,7 +61,7 @@ def test_cli_cp_folder(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_cli_mv_folder(with_adapter: str, bucket: str) -> None: - root = Pathy.from_bucket(bucket) / ENV_ID + root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/") source = root / "cli_mv_folder" destination = root / "cli_mv_folder_other" for i in range(2): @@ -84,7 +84,7 @@ def test_cli_mv_folder(with_adapter: str, bucket: str) -> None: def test_cli_mv_file_copy_from_name(with_adapter: str, bucket: str) -> None: source = pathlib.Path("./file.txt") source.touch() - destination = f"gs://{bucket}/{ENV_ID}/cli_cp_file/" + destination = f"{with_adapter}://{bucket}/{ENV_ID}/cli_cp_file/" assert runner.invoke(app, ["mv", str(source), destination]).exit_code == 0 assert Pathy(f"{destination}file.txt").is_file() # unlink should happen from the operation @@ -93,8 +93,8 @@ def test_cli_mv_file_copy_from_name(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_cli_mv_file(with_adapter: str, bucket: str) -> None: - source = f"gs://{bucket}/{ENV_ID}/cli_mv_file/file.txt" - destination = f"gs://{bucket}/{ENV_ID}/cli_mv_file/other.txt" + source = f"{with_adapter}://{bucket}/{ENV_ID}/cli_mv_file/file.txt" + destination = f"{with_adapter}://{bucket}/{ENV_ID}/cli_mv_file/other.txt" Pathy(source).write_text("---") assert Pathy(source).exists() assert runner.invoke(app, ["mv", source, destination]).exit_code == 0 @@ -106,8 +106,10 @@ def test_cli_mv_file(with_adapter: str, bucket: str) -> None: def test_cli_mv_file_across_buckets( with_adapter: str, bucket: str, other_bucket: str ) -> None: - source = f"gs://{bucket}/{ENV_ID}/cli_mv_file_across_buckets/file.txt" - destination = f"gs://{other_bucket}/{ENV_ID}/cli_mv_file_across_buckets/other.txt" + source = f"{with_adapter}://{bucket}/{ENV_ID}/cli_mv_file_across_buckets/file.txt" + destination = ( + f"{with_adapter}://{other_bucket}/{ENV_ID}/cli_mv_file_across_buckets/other.txt" + ) Pathy(source).write_text("---") assert Pathy(source).exists() assert runner.invoke(app, ["mv", source, destination]).exit_code == 0 @@ -119,9 +121,9 @@ def test_cli_mv_file_across_buckets( 
def test_cli_mv_folder_across_buckets( with_adapter: str, bucket: str, other_bucket: str ) -> None: - source = Pathy.from_bucket(bucket) / ENV_ID / "cli_mv_folder_across_buckets" - destination = ( - Pathy.from_bucket(other_bucket) / ENV_ID / "cli_mv_folder_across_buckets" + source = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/cli_mv_folder_across_buckets") + destination = Pathy( + f"{with_adapter}://{other_bucket}/{ENV_ID}/cli_mv_folder_across_buckets" ) for i in range(2): for j in range(2): @@ -141,7 +143,7 @@ def test_cli_mv_folder_across_buckets( @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_cli_rm_invalid_file(with_adapter: str, bucket: str) -> None: - source = f"gs://{bucket}/{ENV_ID}/cli_rm_file_invalid/file.txt" + source = f"{with_adapter}://{bucket}/{ENV_ID}/cli_rm_file_invalid/file.txt" path = Pathy(source) assert not path.exists() assert runner.invoke(app, ["rm", source]).exit_code == 1 @@ -149,7 +151,7 @@ def test_cli_rm_invalid_file(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_cli_rm_file(with_adapter: str, bucket: str) -> None: - source = f"gs://{bucket}/{ENV_ID}/cli_rm_file/file.txt" + source = f"{with_adapter}://{bucket}/{ENV_ID}/cli_rm_file/file.txt" path = Pathy(source) path.write_text("---") assert path.exists() @@ -159,7 +161,7 @@ def test_cli_rm_file(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_cli_rm_verbose(with_adapter: str, bucket: str) -> None: - root = Pathy.from_bucket(bucket) / ENV_ID / "cli_rm_folder" + root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/") / "cli_rm_folder" source = str(root / "file.txt") other = str(root / "folder/other") Pathy(source).write_text("---") @@ -178,7 +180,7 @@ def test_cli_rm_verbose(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_cli_rm_folder(with_adapter: str, bucket: str) -> None: - root = Pathy.from_bucket(bucket) / ENV_ID + root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/") source = root / "cli_rm_folder" for i in range(2): for j in range(2): @@ -196,7 +198,7 @@ def test_cli_rm_folder(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_cli_ls_invalid_source(with_adapter: str, bucket: str) -> None: - root = Pathy.from_bucket(bucket) / ENV_ID / "cli_ls_invalid" + root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/") / "cli_ls_invalid" three = str(root / "folder/file.txt") result = runner.invoke(app, ["ls", str(three)]) @@ -206,7 +208,7 @@ def test_cli_ls_invalid_source(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_cli_ls(with_adapter: str, bucket: str) -> None: - root = Pathy.from_bucket(bucket) / ENV_ID / "cli_ls" + root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/") / "cli_ls" one = str(root / "file.txt") two = str(root / "other.txt") three = str(root / "folder/file.txt") @@ -241,7 +243,7 @@ def test_cli_ls_local_files(with_adapter: str, bucket: str) -> None: assert blob_stat.size == 4 assert blob_stat.last_modified is not None - root = Pathy.from_bucket(bucket) / ENV_ID / "cli_ls" + root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/") / "cli_ls" one = str(root / "file.txt") two = str(root / "other.txt") three = str(root / "folder/file.txt") diff --git a/pathy/_tests/test_clients.py b/pathy/_tests/test_clients.py index 45f96de..8edcf69 100644 --- a/pathy/_tests/test_clients.py +++ b/pathy/_tests/test_clients.py @@ -73,7 +73,7 @@ def 
test_clients_use_fs(with_fs: Path) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_api_use_fs_cache(with_adapter: str, with_fs: str, bucket: str) -> None: - path = Pathy(f"gs://{bucket}/{ENV_ID}/directory/foo.txt") + path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/directory/foo.txt") path.write_text("---") assert isinstance(path, Pathy) with pytest.raises(ValueError): diff --git a/pathy/_tests/test_gcs.py b/pathy/_tests/test_gcs.py index 74d2ee5..9581d3d 100644 --- a/pathy/_tests/test_gcs.py +++ b/pathy/_tests/test_gcs.py @@ -2,8 +2,7 @@ import pytest -from pathy import Pathy, get_client, set_client_params - +from pathy import Pathy, get_client, set_client_params, use_fs from . import has_gcs @@ -39,6 +38,7 @@ def test_gcs_as_uri(with_adapter: str, bucket: str) -> None: @pytest.mark.skipif(has_gcs, reason="requires gcs deps to NOT be installed") def test_gcs_import_error_missing_deps() -> None: + use_fs(False) with pytest.raises(ImportError): get_client("gs") @@ -58,9 +58,7 @@ def test_gcs_scandir_list_buckets( @pytest.mark.parametrize("adapter", GCS_ADAPTER) @pytest.mark.skipif(not has_gcs, reason="requires gcs") -def test_gcs_scandir_invalid_bucket_name( - with_adapter: str, bucket: str, other_bucket: str -) -> None: +def test_gcs_scandir_invalid_bucket_name(with_adapter: str) -> None: from pathy.gcs import ScanDirGCS root = Pathy("gs://invalid_h3gE_ds5daEf_Sdf15487t2n4/bar") diff --git a/pathy/_tests/test_pathy.py b/pathy/_tests/test_pathy.py index fd1bf07..88d033b 100644 --- a/pathy/_tests/test_pathy.py +++ b/pathy/_tests/test_pathy.py @@ -42,7 +42,7 @@ def test_pathy_fluid(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_path_to_local(with_adapter: str, bucket: str) -> None: - root: Pathy = Pathy.from_bucket(bucket) / ENV_ID / "to_local" + root: Pathy = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/to_local") foo_blob: Pathy = root / "foo" foo_blob.write_text("---") assert isinstance(foo_blob, Pathy) @@ -85,16 +85,16 @@ def test_pathy_stat(with_adapter: str, bucket: str) -> None: path = Pathy("fake-bucket-1234-0987/fake-key") with pytest.raises(ValueError): path.stat() - path = Pathy(f"gs://{bucket}/{ENV_ID}/stat/foo.txt") + path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/stat/foo.txt") path.write_text("a-a-a-a-a-a-a") stat = path.stat() assert isinstance(stat, BlobStat) assert stat.size is not None and stat.size > 0 assert stat.last_modified is not None and stat.last_modified > 0 with pytest.raises(ValueError): - assert Pathy(f"gs://{bucket}").stat() + assert Pathy(f"{with_adapter}://{bucket}").stat() with pytest.raises(FileNotFoundError): - assert Pathy(f"gs://{bucket}/{ENV_ID}/stat/nonexistant_file.txt").stat() + assert Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/stat/nope.txt").stat() @pytest.mark.parametrize("adapter", TEST_ADAPTERS) @@ -113,12 +113,12 @@ def test_pathy_exists(with_adapter: str, bucket: str) -> None: # invalid bucket name invalid_bucket = "unknown-bucket-name-123987519875419" - assert Pathy(f"gs://{invalid_bucket}").exists() is False - assert Pathy(f"gs://{invalid_bucket}/foo.blob").exists() is False + assert Pathy(f"{with_adapter}://{invalid_bucket}").exists() is False + assert Pathy(f"{with_adapter}://{invalid_bucket}/foo.blob").exists() is False # valid bucket with invalid object - assert Pathy(f"gs://{bucket}/not_found_lol_nice.txt").exists() is False + assert Pathy(f"{with_adapter}://{bucket}/not_found_lol_nice.txt").exists() is False - path = 
Pathy(f"gs://{bucket}/{ENV_ID}/directory/foo.txt") + path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/directory/foo.txt") path.write_text("---") assert path.exists() for parent in path.parents: @@ -127,7 +127,7 @@ def test_pathy_exists(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_glob(with_adapter: str, bucket: str) -> None: - base = Pathy(f"gs://{bucket}/{ENV_ID}/glob/") + base = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/") root = base / "glob/" for i in range(3): path = root / f"{i}.file" @@ -159,7 +159,7 @@ def test_pathy_glob(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_unlink_path(with_adapter: str, bucket: str) -> None: - root = Pathy(f"gs://{bucket}/{ENV_ID}/unlink/") + root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/unlink/") path = root / "404.txt" with pytest.raises(FileNotFoundError): path.unlink() @@ -172,7 +172,7 @@ def test_pathy_unlink_path(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_is_dir(with_adapter: str, bucket: str) -> None: - path = Pathy(f"gs://{bucket}/{ENV_ID}/is_dir/subfolder/another/my.file") + path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/is_dir/subfolder/another/my.file") path.write_text("---") assert path.is_dir() is False for parent in path.parents: @@ -181,7 +181,9 @@ def test_pathy_is_dir(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_is_file(with_adapter: str, bucket: str) -> None: - path = Pathy(f"gs://{bucket}/{ENV_ID}/is_file/subfolder/another/my.file") + path = Pathy( + f"{with_adapter}://{bucket}/{ENV_ID}/is_file/subfolder/another/my.file" + ) path.write_text("---") # The full file is a file assert path.is_file() is True @@ -192,7 +194,7 @@ def test_pathy_is_file(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_iterdir(with_adapter: str, bucket: str) -> None: - root = Pathy(f"gs://{bucket}/{ENV_ID}/iterdir/") + root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/iterdir/") # (n) files in a folder for i in range(2): path = root / f"{i}.file" @@ -212,16 +214,18 @@ def test_pathy_iterdir(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_iterdir_pipstore(with_adapter: str, bucket: str) -> None: - path = Pathy.from_bucket(bucket) / f"{ENV_ID}/iterdir_pipstore/prodigy/prodigy.whl" + path = Pathy( + f"{with_adapter}://{bucket}/{ENV_ID}/iterdir_pipstore/prodigy/prodigy.whl" + ) path.write_bytes(b"---") - path = Pathy.from_bucket(bucket) / f"{ENV_ID}/iterdir_pipstore" + path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/iterdir_pipstore") res = [e.name for e in sorted(path.iterdir())] assert res == ["prodigy"] @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_open_errors(with_adapter: str, bucket: str) -> None: - path = Pathy(f"gs://{bucket}/{ENV_ID}/open_errors/file.txt") + path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/open_errors/file.txt") # Invalid open mode with pytest.raises(ValueError): path.open(mode="t") @@ -237,7 +241,7 @@ def test_pathy_open_errors(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_open_for_read(with_adapter: str, bucket: str) -> None: - path = Pathy(f"gs://{bucket}/{ENV_ID}/read/file.txt") + path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/read/file.txt") path.write_text("---") with path.open() 
as file_obj:
         assert file_obj.read() == "---"
@@ -246,18 +250,18 @@ def test_pathy_open_for_read(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_open_for_write(with_adapter: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/write/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/write/file.txt")
     with path.open(mode="w") as file_obj:
         file_obj.write("---")
         file_obj.writelines(["---"])
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/write/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/write/file.txt")
     with path.open() as file_obj:
         assert file_obj.read() == "------"
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_open_binary_read(with_adapter: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/read_binary/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/read_binary/file.txt")
     path.write_bytes(b"---")
     with path.open(mode="rb") as file_obj:
         assert file_obj.readlines() == [b"---"]
@@ -268,7 +272,7 @@ def test_pathy_open_binary_read(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_readwrite_text(with_adapter: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/write_text/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/write_text/file.txt")
     path.write_text("---")
     with path.open() as file_obj:
         assert file_obj.read() == "---"
@@ -277,14 +281,14 @@ def test_pathy_readwrite_text(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_readwrite_bytes(with_adapter: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/write_bytes/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/write_bytes/file.txt")
     path.write_bytes(b"---")
     assert path.read_bytes() == b"---"
 
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_readwrite_lines(with_adapter: str, bucket: str) -> None:
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/write_text/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/write_text/file.txt")
     with path.open("w") as file_obj:
         file_obj.writelines(["---"])
     with path.open("r") as file_obj:
@@ -295,7 +299,7 @@ def test_pathy_readwrite_lines(with_adapter: str, bucket: str) -> None:
 
 @pytest.mark.parametrize("adapter", TEST_ADAPTERS)
 def test_pathy_ls_blobs_with_stat(with_adapter: str, bucket: str) -> None:
-    root = Pathy(f"gs://{bucket}/{ENV_ID}/ls")
+    root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/ls")
     for i in range(3):
         (root / f"file_{i}").write_text("NICE")
     files = list(root.ls())
@@ -311,9 +315,9 @@ def test_pathy_ls_blobs_with_stat(with_adapter: str, bucket: str) -> None:
 def test_pathy_owner(with_adapter: str, bucket: str) -> None:
     # Raises for invalid file
     with pytest.raises(FileNotFoundError):
-        Pathy(f"gs://{bucket}/{ENV_ID}/write_text/not_a_valid_blob").owner()
+        Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/write_text/not_a_valid_blob").owner()
 
-    path = Pathy(f"gs://{bucket}/{ENV_ID}/write_text/file.txt")
+    path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/write_text/file.txt")
     path.write_text("---")
     # TODO: How to set file owner to non-None in GCS? Then assert here.
# @@ -328,15 +332,15 @@ def test_pathy_owner(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_rename_files_in_bucket(with_adapter: str, bucket: str) -> None: # Rename a single file - from_blob = f"gs://{bucket}/{ENV_ID}/rename/file.txt" - to_blob = f"gs://{bucket}/{ENV_ID}/rename/other.txt" + from_blob = f"{with_adapter}://{bucket}/{ENV_ID}/rename/file.txt" + to_blob = f"{with_adapter}://{bucket}/{ENV_ID}/rename/other.txt" Pathy(from_blob).write_text("---") Pathy(from_blob).rename(to_blob) assert not Pathy(from_blob).exists() assert Pathy(to_blob).is_file() with pytest.raises(FileNotFoundError): - Pathy("gs://invlid_bkt_nme_18$%^@57582397/foo").rename(to_blob) + Pathy(f"{with_adapter}://invlid_bkt_nme_18$%^@57582397/foo").rename(to_blob) @pytest.mark.parametrize("adapter", TEST_ADAPTERS) @@ -344,26 +348,26 @@ def test_pathy_rename_files_across_buckets( with_adapter: str, bucket: str, other_bucket: str ) -> None: # Rename a single file across buckets - Pathy(f"gs://{bucket}/{ENV_ID}/rename/file.txt").write_text("---") - Pathy(f"gs://{bucket}/{ENV_ID}/rename/file.txt").rename( - f"gs://{other_bucket}/{ENV_ID}/rename/other.txt" + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/file.txt").write_text("---") + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/file.txt").rename( + f"{with_adapter}://{other_bucket}/{ENV_ID}/rename/other.txt" ) - assert not Pathy(f"gs://{bucket}/{ENV_ID}/rename/file.txt").exists() - assert Pathy(f"gs://{other_bucket}/{ENV_ID}/rename/other.txt").is_file() + assert not Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/file.txt").exists() + assert Pathy(f"{with_adapter}://{other_bucket}/{ENV_ID}/rename/other.txt").is_file() @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_rename_folders_in_bucket(with_adapter: str, bucket: str) -> None: # Rename a folder in the same bucket - Pathy(f"gs://{bucket}/{ENV_ID}/rename/folder/one.txt").write_text("---") - Pathy(f"gs://{bucket}/{ENV_ID}/rename/folder/two.txt").write_text("---") - path = Pathy(f"gs://{bucket}/{ENV_ID}/rename/folder/") - new_path = Pathy(f"gs://{bucket}/{ENV_ID}/rename/other/") + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/folder/one.txt").write_text("---") + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/folder/two.txt").write_text("---") + path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/folder/") + new_path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/other/") path.rename(new_path) assert not path.exists() assert new_path.exists() - assert Pathy(f"gs://{bucket}/{ENV_ID}/rename/other/one.txt").is_file() - assert Pathy(f"gs://{bucket}/{ENV_ID}/rename/other/two.txt").is_file() + assert Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/other/one.txt").is_file() + assert Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/other/two.txt").is_file() @pytest.mark.parametrize("adapter", TEST_ADAPTERS) @@ -371,26 +375,30 @@ def test_pathy_rename_folders_across_buckets( with_adapter: str, bucket: str, other_bucket: str ) -> None: # Rename a folder across buckets - Pathy(f"gs://{bucket}/{ENV_ID}/rename/folder/one.txt").write_text("---") - Pathy(f"gs://{bucket}/{ENV_ID}/rename/folder/two.txt").write_text("---") - path = Pathy(f"gs://{bucket}/{ENV_ID}/rename/folder/") - new_path = Pathy(f"gs://{other_bucket}/{ENV_ID}/rename/other/") + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/folder/one.txt").write_text("---") + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/folder/two.txt").write_text("---") + path = 
Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rename/folder/") + new_path = Pathy(f"{with_adapter}://{other_bucket}/{ENV_ID}/rename/other/") path.rename(new_path) assert not path.exists() assert new_path.exists() - assert Pathy(f"gs://{other_bucket}/{ENV_ID}/rename/other/one.txt").is_file() - assert Pathy(f"gs://{other_bucket}/{ENV_ID}/rename/other/two.txt").is_file() + assert Pathy( + f"{with_adapter}://{other_bucket}/{ENV_ID}/rename/other/one.txt" + ).is_file() + assert Pathy( + f"{with_adapter}://{other_bucket}/{ENV_ID}/rename/other/two.txt" + ).is_file() @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_replace_files_in_bucket(with_adapter: str, bucket: str) -> None: # replace a single file - Pathy(f"gs://{bucket}/{ENV_ID}/replace/file.txt").write_text("---") - Pathy(f"gs://{bucket}/{ENV_ID}/replace/file.txt").replace( - f"gs://{bucket}/{ENV_ID}/replace/other.txt" + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/file.txt").write_text("---") + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/file.txt").replace( + f"{with_adapter}://{bucket}/{ENV_ID}/replace/other.txt" ) - assert not Pathy(f"gs://{bucket}/{ENV_ID}/replace/file.txt").exists() - assert Pathy(f"gs://{bucket}/{ENV_ID}/replace/other.txt").is_file() + assert not Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/file.txt").exists() + assert Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/other.txt").is_file() @pytest.mark.parametrize("adapter", TEST_ADAPTERS) @@ -398,26 +406,32 @@ def test_pathy_replace_files_across_buckets( with_adapter: str, bucket: str, other_bucket: str ) -> None: # Rename a single file across buckets - Pathy(f"gs://{bucket}/{ENV_ID}/replace/file.txt").write_text("---") - Pathy(f"gs://{bucket}/{ENV_ID}/replace/file.txt").replace( - f"gs://{other_bucket}/{ENV_ID}/replace/other.txt" + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/file.txt").write_text("---") + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/file.txt").replace( + f"{with_adapter}://{other_bucket}/{ENV_ID}/replace/other.txt" ) - assert not Pathy(f"gs://{bucket}/{ENV_ID}/replace/file.txt").exists() - assert Pathy(f"gs://{other_bucket}/{ENV_ID}/replace/other.txt").is_file() + assert not Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/file.txt").exists() + assert Pathy( + f"{with_adapter}://{other_bucket}/{ENV_ID}/replace/other.txt" + ).is_file() @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_replace_folders_in_bucket(with_adapter: str, bucket: str) -> None: # Rename a folder in the same bucket - Pathy(f"gs://{bucket}/{ENV_ID}/replace/folder/one.txt").write_text("---") - Pathy(f"gs://{bucket}/{ENV_ID}/replace/folder/two.txt").write_text("---") - path = Pathy(f"gs://{bucket}/{ENV_ID}/replace/folder/") - new_path = Pathy(f"gs://{bucket}/{ENV_ID}/replace/other/") + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/folder/one.txt").write_text( + "---" + ) + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/folder/two.txt").write_text( + "---" + ) + path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/folder/") + new_path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/other/") path.replace(new_path) assert not path.exists() assert new_path.exists() - assert Pathy(f"gs://{bucket}/{ENV_ID}/replace/other/one.txt").is_file() - assert Pathy(f"gs://{bucket}/{ENV_ID}/replace/other/two.txt").is_file() + assert Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/other/one.txt").is_file() + assert Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/other/two.txt").is_file() 
@pytest.mark.parametrize("adapter", TEST_ADAPTERS) @@ -425,31 +439,41 @@ def test_pathy_replace_folders_across_buckets( with_adapter: str, bucket: str, other_bucket: str ) -> None: # Rename a folder across buckets - Pathy(f"gs://{bucket}/{ENV_ID}/replace/folder/one.txt").write_text("---") - Pathy(f"gs://{bucket}/{ENV_ID}/replace/folder/two.txt").write_text("---") - path = Pathy(f"gs://{bucket}/{ENV_ID}/replace/folder/") - new_path = Pathy(f"gs://{other_bucket}/{ENV_ID}/replace/other/") + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/folder/one.txt").write_text( + "---" + ) + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/folder/two.txt").write_text( + "---" + ) + path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/replace/folder/") + new_path = Pathy(f"{with_adapter}://{other_bucket}/{ENV_ID}/replace/other/") path.replace(new_path) assert not path.exists() assert new_path.exists() - assert Pathy(f"gs://{other_bucket}/{ENV_ID}/replace/other/one.txt").is_file() - assert Pathy(f"gs://{other_bucket}/{ENV_ID}/replace/other/two.txt").is_file() + assert Pathy( + f"{with_adapter}://{other_bucket}/{ENV_ID}/replace/other/one.txt" + ).is_file() + assert Pathy( + f"{with_adapter}://{other_bucket}/{ENV_ID}/replace/other/two.txt" + ).is_file() @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_rmdir(with_adapter: str, bucket: str) -> None: - blob = Pathy(f"gs://{bucket}/{ENV_ID}/rmdir/one.txt") + blob = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rmdir/one.txt") blob.write_text("---") # Cannot rmdir a blob with pytest.raises(NotADirectoryError): blob.rmdir() - Pathy(f"gs://{bucket}/{ENV_ID}/rmdir/folder/two.txt").write_text("---") - path = Pathy(f"gs://{bucket}/{ENV_ID}/rmdir/") + Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rmdir/folder/two.txt").write_text("---") + path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rmdir/") path.rmdir() - assert not Pathy(f"gs://{bucket}/{ENV_ID}/rmdir/one.txt").is_file() - assert not Pathy(f"gs://{bucket}/{ENV_ID}/rmdir/other/two.txt").is_file() + assert not Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rmdir/one.txt").is_file() + assert not Pathy( + f"{with_adapter}://{bucket}/{ENV_ID}/rmdir/other/two.txt" + ).is_file() assert not path.exists() # Cannot rmdir an invalid folder @@ -459,9 +483,9 @@ def test_pathy_rmdir(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_samefile(with_adapter: str, bucket: str) -> None: - blob_str = f"gs://{bucket}/{ENV_ID}/samefile/one.txt" + blob_str = f"{with_adapter}://{bucket}/{ENV_ID}/samefile/one.txt" blob_one = Pathy(blob_str) - blob_two = Pathy(f"gs://{bucket}/{ENV_ID}/samefile/two.txt") + blob_two = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/samefile/two.txt") blob_one.touch() blob_two.touch() assert blob_one.samefile(blob_two) is False @@ -474,7 +498,7 @@ def test_pathy_samefile(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_touch(with_adapter: str, bucket: str) -> None: - blob = Pathy(f"gs://{bucket}/{ENV_ID}/touch/one.txt") + blob = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/touch/one.txt") if blob.is_file(): blob.unlink() # The blob doesn't exist @@ -494,10 +518,13 @@ def test_pathy_touch(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) def test_pathy_rglob_unlink(with_adapter: str, bucket: str) -> None: - files = [f"gs://{bucket}/{ENV_ID}/rglob_and_unlink/{i}.file.txt" for i in range(3)] + files = [ + 
f"{with_adapter}://{bucket}/{ENV_ID}/rglob_and_unlink/{i}.file.txt" + for i in range(3) + ] for file in files: Pathy(file).write_text("---") - path = Pathy(f"gs://{bucket}/{ENV_ID}/rglob_and_unlink/") + path = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/rglob_and_unlink/") for blob in path.rglob("*"): blob.unlink() # All the files are gone @@ -508,10 +535,10 @@ def test_pathy_rglob_unlink(with_adapter: str, bucket: str) -> None: @pytest.mark.parametrize("adapter", TEST_ADAPTERS) -def test_pathy_mkdir(with_adapter: str, bucket: str) -> None: +def test_pathy_mkdir(with_adapter: str) -> None: bucket_name = f"pathy-e2e-test-{uuid4().hex}" # Create a bucket - path = Pathy(f"gs://{bucket_name}/") + path = Pathy(f"{with_adapter}://{bucket_name}/") path.mkdir() assert path.exists() # Does not assert if it already exists @@ -528,7 +555,7 @@ def test_pathy_ignore_extension(with_adapter: str, bucket: str) -> None: """The smart_open library does automatic decompression based on the filename. We disable that to avoid errors, e.g. if you have a .tar.gz file that isn't gzipped.""" - not_targz = Pathy.from_bucket(bucket) / ENV_ID / "ignore_ext/one.tar.gz" + not_targz = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/ignore_ext/one.tar.gz") fixture_tar = Path(__file__).parent / "fixtures" / "tar_but_not_gzipped.tar.gz" not_targz.write_bytes(fixture_tar.read_bytes()) again = not_targz.read_bytes() diff --git a/pathy/_tests/test_s3.py b/pathy/_tests/test_s3.py new file mode 100644 index 0000000..c57b95a --- /dev/null +++ b/pathy/_tests/test_s3.py @@ -0,0 +1,68 @@ +import pytest + +from pathy import Pathy, get_client, use_fs + +from . import has_s3 +from .conftest import ENV_ID + +S3_ADAPTER = ["s3"] + + +@pytest.mark.parametrize("adapter", S3_ADAPTER) +@pytest.mark.skipif(not has_s3, reason="requires s3") +def test_s3_scandir_list_buckets( + with_adapter: str, bucket: str, other_bucket: str +) -> None: + from pathy.s3 import ScanDirS3 + + root = Pathy("s3://foo/bar") + client = root._accessor.client(root) # type:ignore + scandir = ScanDirS3(client=client, path=Pathy()) + buckets = [s.name for s in scandir] + assert bucket in buckets + assert other_bucket in buckets + + +@pytest.mark.parametrize("adapter", S3_ADAPTER) +@pytest.mark.skipif(not has_s3, reason="requires s3") +def test_s3_scandir_scandir_continuation_token( + with_adapter: str, bucket: str, other_bucket: str +) -> None: + from pathy.s3 import ScanDirS3 + + root = Pathy(f"{with_adapter}://{bucket}/{ENV_ID}/s3_scandir_pagination/") + for i in range(8): + (root / f"file{i}.blob").touch() + client = root._accessor.client(root) # type:ignore + scandir = ScanDirS3(client=client, path=root, prefix=root.prefix, page_size=4) + blobs = [s.name for s in scandir] + assert len(blobs) == 8 + + +@pytest.mark.parametrize("adapter", S3_ADAPTER) +@pytest.mark.skipif(not has_s3, reason="requires s3") +def test_s3_scandir_invalid_bucket_name(with_adapter: str) -> None: + from pathy.s3 import ScanDirS3 + + root = Pathy(f"{with_adapter}://invalid_h3gE_ds5daEf_Sdf15487t2n4/bar") + client = root._accessor.client(root) # type:ignore + scandir = ScanDirS3(client=client, path=root) + assert len(list(scandir)) == 0 + + +@pytest.mark.parametrize("adapter", S3_ADAPTER) +@pytest.mark.skipif(not has_s3, reason="requires s3") +def test_s3_bucket_client_list_blobs(with_adapter: str, bucket: str) -> None: + """Test corner-case in S3 client that isn't easily reachable from Pathy""" + from pathy.s3 import BucketClientS3 + + client: BucketClientS3 = get_client("s3") + root = 
Pathy("s3://invalid_h3gE_ds5daEf_Sdf15487t2n4") + assert len(list(client.list_blobs(root))) == 0 + + +@pytest.mark.skipif(has_s3, reason="requires s3 deps to NOT be installed") +def test_s3_import_error_missing_deps() -> None: + use_fs(False) + with pytest.raises(ImportError): + get_client("s3") diff --git a/pathy/s3.py b/pathy/s3.py new file mode 100644 index 0000000..1ab8e01 --- /dev/null +++ b/pathy/s3.py @@ -0,0 +1,263 @@ +from dataclasses import dataclass +from typing import Any, Dict, Generator, List, Optional + +try: + import boto3 # type:ignore + from botocore.client import ClientError + from botocore.exceptions import ParamValidationError + + S3NativeClient = Any + S3NativeBucket = Any +except (ImportError, ModuleNotFoundError): + raise ImportError( + """You are using the S3 functionality of Pathy without + having the required dependencies installed. + + Please try installing them: + + pip install pathy[s3] + + """ + ) + +from . import ( + Blob, + Bucket, + BucketClient, + BucketEntry, + PathyScanDir, + PurePathy, + register_client, +) + + +class BucketEntryS3(BucketEntry): + bucket: "BucketS3" + raw: Any + + +@dataclass +class BlobS3(Blob): + client: S3NativeClient + bucket: "BucketS3" + + def delete(self) -> None: + self.client.delete_object(Bucket=self.bucket.name, Key=self.name) + + def exists(self) -> bool: + response = self.client.list_objects_v2( + Bucket=self.bucket.name, Prefix=self.name + ) + objects = response.get("Contents", []) + matched = [o["Key"] for o in objects if o["Key"] == self.name] + return len(matched) > 0 + + +@dataclass +class BucketS3(Bucket): + name: str + client: S3NativeClient + bucket: S3NativeBucket + + def get_blob(self, blob_name: str) -> Optional[BlobS3]: + blob_stat: Dict[str, Any] + try: + blob_stat = self.client.head_object(Bucket=self.name, Key=blob_name) + except ClientError: + return None + updated = blob_stat["LastModified"].timestamp() + size = blob_stat["ContentLength"] + return BlobS3( + client=self.client, + bucket=self, + owner=None, # type:ignore + name=blob_name, # type:ignore + raw=None, + size=size, + updated=int(updated), # type:ignore + ) + + def copy_blob( # type:ignore[override] + self, blob: BlobS3, target: "BucketS3", name: str + ) -> Optional[BlobS3]: + source = {"Bucket": blob.bucket.name, "Key": blob.name} + self.client.copy(source, target.name, name) + pathy_blob: Optional[BlobS3] = self.get_blob(name) + assert pathy_blob is not None, "copy failed" + assert pathy_blob.updated is not None, "new blobl has invalid updated time" + return BlobS3( + client=self.client, + bucket=self.bucket, + owner=None, + name=name, + raw=pathy_blob, + size=pathy_blob.size, + updated=pathy_blob.updated, + ) + + def delete_blob(self, blob: BlobS3) -> None: # type:ignore[override] + self.client.delete_object(Bucket=self.name, Key=blob.name) + + def delete_blobs(self, blobs: List[BlobS3]) -> None: # type:ignore[override] + for blob in blobs: + self.delete_blob(blob) + + def exists(self) -> bool: + # TODO: are you sure this always holds? + # + # S3 buckets don't make it this far if they don't exist. The BucketS3 instance + # is not instantiated unless a metadata check on the bucket passes. 
+ return True + + +class BucketClientS3(BucketClient): + client: S3NativeClient + _session: Optional[Any] + + @property + def client_params(self) -> Dict[str, Any]: + return dict() if self._session is None else dict(session=self._session) + + def __init__(self, **kwargs: Any) -> None: + self.recreate(**kwargs) + + def recreate(self, **kwargs: Any) -> None: + key_id = kwargs.get("key_id", None) + key_secret = kwargs.get("key_secret", None) + boto_session: Any = boto3 + if key_id is not None and key_secret is not None: + self._session = boto_session = boto3.Session( # type:ignore + aws_access_key_id=key_id, + aws_secret_access_key=key_secret, + ) + self.client = boto_session.client("s3") # type:ignore + + def make_uri(self, path: PurePathy) -> str: + return str(path) + + def create_bucket( # type:ignore[override] + self, path: PurePathy + ) -> S3NativeBucket: + return self.client.create_bucket(Bucket=path.root) # type:ignore + + def delete_bucket(self, path: PurePathy) -> None: + self.client.delete_bucket(Bucket=path.root) + + def exists(self, path: PurePathy) -> bool: + # Because we want all the parents of a valid blob (e.g. "directory" in + # "directory/foo.file") to return True, we enumerate the blobs with a prefix + # and compare the object names to see if they match a substring of the path + key_name = str(path.key) + for obj in self.list_blobs(path): + if obj.name.startswith(key_name + path._flavour.sep): # type:ignore + return True + return False + + def lookup_bucket(self, path: PurePathy) -> Optional[BucketS3]: + try: + return self.get_bucket(path) + except FileNotFoundError: + return None + + def get_bucket(self, path: PurePathy) -> BucketS3: + try: + native_bucket = self.client.head_bucket(Bucket=path.root) + return BucketS3(str(path.root), client=self.client, bucket=native_bucket) + except (ClientError, ParamValidationError): + raise FileNotFoundError(f"Bucket {path.root} does not exist!") + + def list_buckets( # type:ignore[override] + self, **kwargs: Dict[str, Any] + ) -> Generator[S3NativeBucket, None, None]: + native_buckets = self.client.list_buckets(**kwargs)["Buckets"] + results = (BucketS3(n["Name"], self.client, n) for n in native_buckets) + return results + + def scandir( # type:ignore[override] + self, + path: Optional[PurePathy] = None, + prefix: Optional[str] = None, + delimiter: Optional[str] = None, + ) -> PathyScanDir: + return ScanDirS3(client=self, path=path, prefix=prefix, delimiter=delimiter) + + def list_blobs( + self, + path: PurePathy, + prefix: Optional[str] = None, + delimiter: Optional[str] = None, + ) -> Generator[BlobS3, None, None]: + bucket = self.lookup_bucket(path) + if bucket is None: + return + paginator = self.client.get_paginator("list_objects_v2") + kwargs = {"Bucket": bucket.name} + if prefix is not None: + kwargs["Prefix"] = prefix + for page in paginator.paginate(**kwargs): + for item in page.get("Contents", []): + yield BlobS3( + client=self.client, + bucket=bucket, + owner=None, + name=item["Key"], + raw=item, + size=item["Size"], + updated=int(item["LastModified"].timestamp()), + ) + + +class ScanDirS3(PathyScanDir): + _client: BucketClientS3 + + def __init__( + self, + client: BucketClient, + path: Optional[PurePathy] = None, + prefix: Optional[str] = None, + delimiter: Optional[str] = None, + page_size: Optional[int] = None, + ) -> None: + super().__init__(client=client, path=path, prefix=prefix, delimiter=delimiter) + self._page_size = page_size + + def scandir(self) -> Generator[BucketEntryS3, None, None]: + if self._path is None or 
not self._path.root: + s3_bucket: BucketS3 + for s3_bucket in self._client.list_buckets(): + yield BucketEntryS3(s3_bucket.name, is_dir=True, raw=None) + return + sep = self._path._flavour.sep # type:ignore + bucket = self._client.lookup_bucket(self._path) + if bucket is None: + return + + kwargs: Any = {"Bucket": bucket.name, "Delimiter": sep} + if self._prefix is not None: + kwargs["Prefix"] = self._prefix + if self._page_size is not None: + kwargs["MaxKeys"] = self._page_size + continuation_token: Optional[str] = None + while True: + if continuation_token: + kwargs["ContinuationToken"] = continuation_token + response = self._client.client.list_objects_v2(**kwargs) + for folder in response.get("CommonPrefixes", []): + prefix = folder["Prefix"] + full_name = prefix[:-1] if prefix.endswith(sep) else prefix + name = full_name.split(sep)[-1] + yield BucketEntryS3(name, is_dir=True) + for file in response.get("Contents", ()): + name = file["Key"].split(sep)[-1] + yield BucketEntryS3( + name=name, + is_dir=False, + size=file["Size"], + last_modified=int(file["LastModified"].timestamp()), + ) + if not response.get("IsTruncated"): + break + continuation_token = response.get("NextContinuationToken") + + +register_client("s3", BucketClientS3) diff --git a/setup.py b/setup.py index 1be2900..4fce1bb 100644 --- a/setup.py +++ b/setup.py @@ -20,6 +20,7 @@ def setup_package(): long_description = f.read() extras = { "gcs": ["google-cloud-storage>=1.26.0,<2.0.0"], + "s3": ["boto3"], "test": ["pytest", "pytest-coverage", "mock", "typer-cli"], } extras["all"] = [item for group in extras.values() for item in group]
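Taken together, the changes in this patch enable end-to-end S3 usage along these lines (a sketch: the bucket name and credential values are placeholders, and the `set_client_params` call is optional because `BucketClientS3.recreate` otherwise falls back to boto3's default credential chain):

```python
# Requires the new extras: pip install pathy[s3]
from pathy import Pathy, set_client_params

# Optionally pass explicit credentials through to boto3, the same way the
# test fixture in conftest.py configures the client.
set_client_params("s3", key_id="<access-key-id>", key_secret="<access-secret>")

greeting = Pathy("s3://my-bucket/greeting.txt")
greeting.write_text("Hello World!")
assert greeting.read_text() == "Hello World!"
greeting.unlink()
```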