From e9924db57a33561cdc2637df4c801c88a7b166e6 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Mon, 3 Apr 2023 11:56:33 -0500 Subject: [PATCH 1/7] Adds push and pull steps for S3 --- prefect_aws/projects/__init__.py | 0 prefect_aws/projects/steps.py | 83 ++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 prefect_aws/projects/__init__.py create mode 100644 prefect_aws/projects/steps.py diff --git a/prefect_aws/projects/__init__.py b/prefect_aws/projects/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/prefect_aws/projects/steps.py b/prefect_aws/projects/steps.py new file mode 100644 index 00000000..5e418acb --- /dev/null +++ b/prefect_aws/projects/steps.py @@ -0,0 +1,83 @@ +from pathlib import Path, PurePosixPath +from typing import Dict, Optional +from typing_extensions import TypedDict +import boto3 +from prefect.utilities.filesystem import filter_files + + +class PushProjectToS3Output(TypedDict): + bucket: str + folder: str + +class PullProjectFromS3Output(TypedDict): + bucket: str + folder: str + directory: str + + +def push_project_to_s3( + bucket: str, + folder: str, + credentials: Optional[Dict] = None, + client_parameters: Optional[Dict] = None, + ignore_file: Optional[str] = ".prefectignore", +) -> PushProjectToS3Output: + if credentials is None: + credentials = {} + if client_parameters is None: + client_parameters = {} + client = boto3.client("s3", **credentials, **client_parameters) + + local_path = Path.cwd() + + included_files = None + if ignore_file: + with open(ignore_file, "r") as f: + ignore_patterns = f.readlines() + + included_files = filter_files(str(local_path), ignore_patterns) + + for local_file_path in local_path.expanduser().rglob("*"): + if ( + included_files is not None + and str(local_file_path.relative_to(local_path)) not in included_files + ): + continue + elif not local_file_path.is_dir(): + remote_file_path = Path(folder) / local_file_path.relative_to(local_path) + 
client.upload_file(str(local_file_path), bucket, str(remote_file_path)) + + return { + "bucket": bucket, + "folder": folder, + } + + +def pull_project_from_s3( + bucket: str, + folder: str, + credentials: Optional[Dict] = None, + client_parameters: Optional[Dict] = None, +) -> PullProjectFromS3Output: + + if credentials is None: + credentials = {} + if client_parameters is None: + client_parameters = {} + bucket_resource = boto3.Session(**credentials).resource("s3").Bucket(bucket) + + local_path = Path.cwd() + for obj in bucket_resource.objects.filter(Prefix=folder): + if obj.key[-1] == "/": + # object is a folder and will be created if it contains any objects + continue + target = local_path / PurePosixPath(obj.key).relative_to(folder) + Path.mkdir(target.parent, parents=True, exist_ok=True) + bucket_resource.download_file(obj.key, str(target)) + + return { + "bucket": bucket, + "folder": folder, + "directory": str(local_path), + } + From f236fcfc6d8980a9874683aa5c59a2060a64dd42 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Mon, 3 Apr 2023 13:27:37 -0500 Subject: [PATCH 2/7] Adds tests --- prefect_aws/projects/steps.py | 2 +- tests/projects/test_steps.py | 225 ++++++++++++++++++++++++++++++++++ 2 files changed, 226 insertions(+), 1 deletion(-) create mode 100644 tests/projects/test_steps.py diff --git a/prefect_aws/projects/steps.py b/prefect_aws/projects/steps.py index 5e418acb..68bb1aa4 100644 --- a/prefect_aws/projects/steps.py +++ b/prefect_aws/projects/steps.py @@ -31,7 +31,7 @@ def push_project_to_s3( local_path = Path.cwd() included_files = None - if ignore_file: + if ignore_file and Path(ignore_file).exists(): with open(ignore_file, "r") as f: ignore_patterns = f.readlines() diff --git a/tests/projects/test_steps.py b/tests/projects/test_steps.py new file mode 100644 index 00000000..8aae5c8e --- /dev/null +++ b/tests/projects/test_steps.py @@ -0,0 +1,225 @@ +import os +from pathlib import Path, PurePosixPath +import boto3 +import pytest +from moto 
import mock_s3 +from prefect_aws.projects.steps import ( + push_project_to_s3, + pull_project_from_s3, +) + + +@pytest.fixture +def s3_setup(): + with mock_s3(): + bucket_name = "my-test-bucket" + s3 = boto3.client("s3") + s3.create_bucket(Bucket=bucket_name) + yield s3, bucket_name + + +@pytest.fixture +def tmp_files(tmp_path: Path): + files = [ + "testfile1.txt", + "testfile2.txt", + "testfile3.txt", + "testdir1/testfile4.txt", + "testdir2/testfile5.txt", + ] + + (tmp_path / ".prefectignore").write_text(""" + testdir1/* + .prefectignore + """) + + for file in files: + filepath = tmp_path / file + filepath.parent.mkdir(parents=True, exist_ok=True) + filepath.write_text("Sample text") + + return tmp_path + +@pytest.fixture +def mock_aws_credentials(monkeypatch): + # Set mock environment variables for AWS credentials + monkeypatch.setenv("AWS_ACCESS_KEY_ID", "test_access_key") + monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "test_secret_key") + monkeypatch.setenv("AWS_SESSION_TOKEN", "test_session_token") + + # Yield control back to the test function + yield + + # Clean up by deleting the mock environment variables + monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False) + monkeypatch.delenv("AWS_SECRET_ACCESS_KEY", raising=False) + monkeypatch.delenv("AWS_SESSION_TOKEN", raising=False) + + +def test_push_project_to_s3(s3_setup, tmp_files, mock_aws_credentials): + s3, bucket_name = s3_setup + folder = "my-project" + + os.chdir(tmp_files) + + push_project_to_s3(bucket_name, folder) + + s3_objects = s3.list_objects_v2(Bucket=bucket_name) + object_keys = [item["Key"] for item in s3_objects["Contents"]] + + expected_keys = [ + f"{folder}/testfile1.txt", + f"{folder}/testfile2.txt", + f"{folder}/testfile3.txt", + f"{folder}/testdir2/testfile5.txt", + ] + + assert set(object_keys) == set(expected_keys) + + +def test_pull_project_from_s3(s3_setup, tmp_path, mock_aws_credentials): + s3, bucket_name = s3_setup + folder = "my-project" + + files = { + f"{folder}/testfile1.txt": 
"Hello, world!", + f"{folder}/testfile2.txt": "Test content", + f"{folder}/testdir1/testfile3.txt": "Nested file", + } + + for key, content in files.items(): + s3.put_object(Bucket=bucket_name, Key=key, Body=content) + + os.chdir(tmp_path) + pull_project_from_s3(bucket_name, folder) + + for key, content in files.items(): + target = Path(tmp_path) / PurePosixPath(key).relative_to(folder) + assert target.exists() + assert target.read_text() == content + +def test_push_pull_empty_folders(s3_setup, tmp_path, mock_aws_credentials): + s3, bucket_name = s3_setup + folder = "my-project" + + # Create empty folders + (tmp_path / "empty1").mkdir() + (tmp_path / "empty2").mkdir() + + # Create test files + (tmp_path / "testfile1.txt").write_text("Sample text") + (tmp_path / "testfile2.txt").write_text("Sample text") + + os.chdir(tmp_path) + + # Push to S3 + push_project_to_s3(bucket_name, folder) + + # Check if the empty folders are not uploaded + s3_objects = s3.list_objects_v2(Bucket=bucket_name) + object_keys = [item["Key"] for item in s3_objects["Contents"]] + + assert f"{folder}/empty1/" not in object_keys + assert f"{folder}/empty2/" not in object_keys + + # Pull from S3 + pull_project_from_s3(bucket_name, folder) + + # Check if the empty folders are not created + assert not (tmp_path / "empty1_copy").exists() + assert not (tmp_path / "empty2_copy").exists() + +def test_custom_credentials_and_client_parameters(s3_setup, tmp_files): + s3, bucket_name = s3_setup + folder = "my-project" + + # Custom credentials and client parameters + custom_credentials = { + "aws_access_key_id": "fake_access_key", + "aws_secret_access_key": "fake_secret_key", + } + + custom_client_parameters = { + "region_name": "us-west-1", + } + + os.chdir(tmp_files) + + # Test push_project_to_s3 with custom credentials and client parameters + push_project_to_s3( + bucket_name, + folder, + credentials=custom_credentials, + client_parameters=custom_client_parameters, + ) + + # Test pull_project_from_s3 
with custom credentials and client parameters + tmp_path = tmp_files / "test_pull" + tmp_path.mkdir(parents=True, exist_ok=True) + os.chdir(tmp_path) + + pull_project_from_s3( + bucket_name, + folder, + credentials=custom_credentials, + client_parameters=custom_client_parameters, + ) + + for file in tmp_files.iterdir(): + if file.is_file() and file.name != ".prefectignore": + assert (tmp_path / file.name).exists() + + +def test_without_prefectignore_file(s3_setup, tmp_files: Path, mock_aws_credentials): + s3, bucket_name = s3_setup + folder = "my-project" + + # Remove the .prefectignore file + (tmp_files / ".prefectignore").unlink() + + os.chdir(tmp_files) + + # Test push_project_to_s3 without .prefectignore file + push_project_to_s3(bucket_name, folder) + + # Test pull_project_from_s3 without .prefectignore file + tmp_path = tmp_files / "test_pull" + tmp_path.mkdir(parents=True, exist_ok=True) + os.chdir(tmp_path) + + pull_project_from_s3(bucket_name, folder) + + for file in tmp_files.iterdir(): + if file.is_file(): + assert (tmp_path / file.name).exists() + + +def test_prefectignore_with_comments_and_empty_lines(s3_setup, tmp_files: Path, mock_aws_credentials): + s3, bucket_name = s3_setup + folder = "my-project" + + # Update the .prefectignore file with comments and empty lines + (tmp_files / ".prefectignore").write_text( + """ + # This is a comment + testdir1/* + + .prefectignore + """ + ) + + os.chdir(tmp_files) + + # Test push_project_to_s3 + push_project_to_s3(bucket_name, folder) + + # Test pull_project_from_s3 + tmp_path = tmp_files / "test_pull" + tmp_path.mkdir(parents=True, exist_ok=True) + os.chdir(tmp_path) + + pull_project_from_s3(bucket_name, folder) + + for file in tmp_files.iterdir(): + if file.is_file() and file.name != ".prefectignore": + assert (tmp_path / file.name).exists() From fd18df8748d34ed3e04f4b30673045c7889b96fa Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Mon, 3 Apr 2023 16:00:41 -0500 Subject: [PATCH 3/7] Updates docs --- 
docs/projects/steps.md | 6 +++ mkdocs.yml | 2 + prefect_aws/projects/steps.py | 89 +++++++++++++++++++++++++++++++++-- 3 files changed, 94 insertions(+), 3 deletions(-) create mode 100644 docs/projects/steps.md diff --git a/docs/projects/steps.md b/docs/projects/steps.md new file mode 100644 index 00000000..84189bdd --- /dev/null +++ b/docs/projects/steps.md @@ -0,0 +1,6 @@ +--- +description: Prefect project steps for managing project code storage via AWS S3. +notes: This documentation page is generated from source file docstrings. +--- + +::: prefect_aws.projects.steps \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index bd96844e..1854f226 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -80,6 +80,8 @@ nav: - Client Waiter: client_waiter.md - Credentials: credentials.md - ECS: ecs.md + - Projects: + - Steps: projects/steps.md - S3: s3.md - Secrets Manager: secrets_manager.md diff --git a/prefect_aws/projects/steps.py b/prefect_aws/projects/steps.py index 68bb1aa4..44871fbc 100644 --- a/prefect_aws/projects/steps.py +++ b/prefect_aws/projects/steps.py @@ -1,15 +1,28 @@ +""" +Prefect project steps for code storage and retrieval in S3 and S3 compatible services. +""" from pathlib import Path, PurePosixPath from typing import Dict, Optional -from typing_extensions import TypedDict + import boto3 from prefect.utilities.filesystem import filter_files +from typing_extensions import TypedDict class PushProjectToS3Output(TypedDict): + """ + The output of the `push_project_to_s3` step. + """ + bucket: str folder: str + class PullProjectFromS3Output(TypedDict): + """ + The output of the `pull_project_from_s3` step. + """ + bucket: str folder: str directory: str @@ -22,6 +35,43 @@ def push_project_to_s3( client_parameters: Optional[Dict] = None, ignore_file: Optional[str] = ".prefectignore", ) -> PushProjectToS3Output: + """ + Pushes the contents of the current working directory to an S3 bucket, + excluding files and folders specified in the ignore_file. 
+ + Args: + bucket: The name of the S3 bucket where the project files will be uploaded. + folder: The folder in the S3 bucket where the project files will be uploaded. + credentials: A dictionary of AWS credentials (aws_access_key_id, + aws_secret_access_key, aws_session_token). + client_parameters: A dictionary of additional parameters to pass to the boto3 + client. + ignore_file: The name of the file containing ignore patterns. + + Returns: + A dictionary containing the bucket and folder where the project was uploaded. + + Examples: + Push a project to an S3 bucket: + ```yaml + build: + - prefect_aws.projects.steps.push_project_to_s3: + requires: prefect-aws + bucket: my-bucket + folder: my-project + ``` + + Push a project to an S3 bucket using credentials stored in a block: + ```yaml + build: + - prefect_aws.projects.steps.push_project_to_s3: + requires: prefect-aws + bucket: my-bucket + folder: my-project + credentials: "{{ prefect.blocks.aws-credentials.dev-credentials }}" + ``` + + """ if credentials is None: credentials = {} if client_parameters is None: @@ -59,13 +109,47 @@ def pull_project_from_s3( credentials: Optional[Dict] = None, client_parameters: Optional[Dict] = None, ) -> PullProjectFromS3Output: + """ + Pulls the contents of a project from an S3 bucket to the current working directory. + + Args: + bucket: The name of the S3 bucket where the project files are stored. + folder: The folder in the S3 bucket where the project files are stored. + credentials: A dictionary of AWS credentials (aws_access_key_id, + aws_secret_access_key, aws_session_token). + client_parameters: A dictionary of additional parameters to pass to the + boto3 client. + Returns: + A dictionary containing the bucket, folder, and local directory where the + project files were downloaded. 
+ + Examples: + Pull a project from S3 using the default credentials and client parameters: + ```yaml + build: + - prefect_aws.projects.steps.pull_project_from_s3: + requires: prefect-aws + bucket: my-bucket + folder: my-project + ``` + + Pull a project from S3 using credentials stored in a block: + ```yaml + build: + - prefect_aws.projects.steps.pull_project_from_s3: + requires: prefect-aws + bucket: my-bucket + folder: my-project + credentials: "{{ prefect.blocks.aws-credentials.dev-credentials }}" + ``` + """ if credentials is None: credentials = {} if client_parameters is None: client_parameters = {} bucket_resource = boto3.Session(**credentials).resource("s3").Bucket(bucket) - + local_path = Path.cwd() for obj in bucket_resource.objects.filter(Prefix=folder): if obj.key[-1] == "/": @@ -80,4 +164,3 @@ def pull_project_from_s3( "folder": folder, "directory": str(local_path), } - From 3ac7e2444eb566bc773b4d99ee44e27bf130ebff Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Mon, 3 Apr 2023 16:05:34 -0500 Subject: [PATCH 4/7] Adds changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4092e92..8fb7cd25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Support for unsigned AWS requests - [#220](https://github.com/PrefectHQ/prefect-aws/pull/220) +- Added push and pull project steps for S3 - [#229](https://github.com/PrefectHQ/prefect-aws/pull/229) ### Changed From 419d9be3ef40a1a0a23e829663f08e50b8816e0a Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Mon, 3 Apr 2023 16:35:06 -0500 Subject: [PATCH 5/7] Handles advanced config for boto3 clients --- prefect_aws/projects/steps.py | 13 +++++++++++-- tests/projects/test_steps.py | 21 ++++++++++++++------- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/prefect_aws/projects/steps.py b/prefect_aws/projects/steps.py index 
44871fbc..9c3fef6f 100644 --- a/prefect_aws/projects/steps.py +++ b/prefect_aws/projects/steps.py @@ -5,6 +5,7 @@ from typing import Dict, Optional import boto3 +from botocore.client import Config from prefect.utilities.filesystem import filter_files from typing_extensions import TypedDict @@ -76,7 +77,10 @@ def push_project_to_s3( credentials = {} if client_parameters is None: client_parameters = {} - client = boto3.client("s3", **credentials, **client_parameters) + advanced_config = client_parameters.pop("config", {}) + client = boto3.client( + "s3", **credentials, **client_parameters, config=Config(**advanced_config) + ) local_path = Path.cwd() @@ -148,7 +152,12 @@ def pull_project_from_s3( credentials = {} if client_parameters is None: client_parameters = {} - bucket_resource = boto3.Session(**credentials).resource("s3").Bucket(bucket) + advanced_config = client_parameters.pop("config", {}) + bucket_resource = ( + boto3.Session(**credentials) + .resource("s3", **client_parameters, config=Config(**advanced_config)) + .Bucket(bucket) + ) local_path = Path.cwd() for obj in bucket_resource.objects.filter(Prefix=folder): diff --git a/tests/projects/test_steps.py b/tests/projects/test_steps.py index 8aae5c8e..c2833663 100644 --- a/tests/projects/test_steps.py +++ b/tests/projects/test_steps.py @@ -1,12 +1,11 @@ import os from pathlib import Path, PurePosixPath + import boto3 import pytest from moto import mock_s3 -from prefect_aws.projects.steps import ( - push_project_to_s3, - pull_project_from_s3, -) + +from prefect_aws.projects.steps import pull_project_from_s3, push_project_to_s3 @pytest.fixture @@ -28,10 +27,12 @@ def tmp_files(tmp_path: Path): "testdir2/testfile5.txt", ] - (tmp_path / ".prefectignore").write_text(""" + (tmp_path / ".prefectignore").write_text( + """ testdir1/* .prefectignore - """) + """ + ) for file in files: filepath = tmp_path / file @@ -40,6 +41,7 @@ def tmp_files(tmp_path: Path): return tmp_path + @pytest.fixture def 
mock_aws_credentials(monkeypatch): # Set mock environment variables for AWS credentials @@ -98,6 +100,7 @@ def test_pull_project_from_s3(s3_setup, tmp_path, mock_aws_credentials): assert target.exists() assert target.read_text() == content + def test_push_pull_empty_folders(s3_setup, tmp_path, mock_aws_credentials): s3, bucket_name = s3_setup folder = "my-project" @@ -129,6 +132,7 @@ def test_push_pull_empty_folders(s3_setup, tmp_path, mock_aws_credentials): assert not (tmp_path / "empty1_copy").exists() assert not (tmp_path / "empty2_copy").exists() + def test_custom_credentials_and_client_parameters(s3_setup, tmp_files): s3, bucket_name = s3_setup folder = "my-project" @@ -141,6 +145,7 @@ def test_custom_credentials_and_client_parameters(s3_setup, tmp_files): custom_client_parameters = { "region_name": "us-west-1", + "config": {"signature_version": "s3v4"}, } os.chdir(tmp_files) @@ -194,7 +199,9 @@ def test_without_prefectignore_file(s3_setup, tmp_files: Path, mock_aws_credenti assert (tmp_path / file.name).exists() -def test_prefectignore_with_comments_and_empty_lines(s3_setup, tmp_files: Path, mock_aws_credentials): +def test_prefectignore_with_comments_and_empty_lines( + s3_setup, tmp_files: Path, mock_aws_credentials +): s3, bucket_name = s3_setup folder = "my-project" From 047be331a495410b89dcc9687a0a5ca05da5700a Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Tue, 4 Apr 2023 09:44:41 -0500 Subject: [PATCH 6/7] Attempts to fix windows tests --- prefect_aws/projects/steps.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/prefect_aws/projects/steps.py b/prefect_aws/projects/steps.py index 9c3fef6f..7ee7ab0f 100644 --- a/prefect_aws/projects/steps.py +++ b/prefect_aws/projects/steps.py @@ -164,8 +164,8 @@ def pull_project_from_s3( if obj.key[-1] == "/": # object is a folder and will be created if it contains any objects continue - target = local_path / PurePosixPath(obj.key).relative_to(folder) - Path.mkdir(target.parent, 
parents=True, exist_ok=True) + target = PurePosixPath(local_path / PurePosixPath(obj.key).relative_to(folder)) + Path.mkdir(Path(target.parent), parents=True, exist_ok=True) bucket_resource.download_file(obj.key, str(target)) return { From 561b59c37c9e2b98a93b8f88e1545bfb14be4989 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Tue, 4 Apr 2023 10:16:59 -0500 Subject: [PATCH 7/7] Attempts to fix windows tests --- prefect_aws/projects/steps.py | 6 ++++-- tests/projects/test_steps.py | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/prefect_aws/projects/steps.py b/prefect_aws/projects/steps.py index 7ee7ab0f..cb756d57 100644 --- a/prefect_aws/projects/steps.py +++ b/prefect_aws/projects/steps.py @@ -6,7 +6,7 @@ import boto3 from botocore.client import Config -from prefect.utilities.filesystem import filter_files +from prefect.utilities.filesystem import filter_files, relative_path_to_current_platform from typing_extensions import TypedDict @@ -164,7 +164,9 @@ def pull_project_from_s3( if obj.key[-1] == "/": # object is a folder and will be created if it contains any objects continue - target = PurePosixPath(local_path / PurePosixPath(obj.key).relative_to(folder)) + target = PurePosixPath( + local_path / relative_path_to_current_platform(obj.key).relative_to(folder) + ) Path.mkdir(Path(target.parent), parents=True, exist_ok=True) bucket_resource.download_file(obj.key, str(target)) diff --git a/tests/projects/test_steps.py b/tests/projects/test_steps.py index c2833663..5d907ff4 100644 --- a/tests/projects/test_steps.py +++ b/tests/projects/test_steps.py @@ -1,5 +1,5 @@ import os -from pathlib import Path, PurePosixPath +from pathlib import Path, PurePath, PurePosixPath import boto3 import pytest @@ -67,7 +67,7 @@ def test_push_project_to_s3(s3_setup, tmp_files, mock_aws_credentials): push_project_to_s3(bucket_name, folder) s3_objects = s3.list_objects_v2(Bucket=bucket_name) - object_keys = [item["Key"] for item in s3_objects["Contents"]] + 
object_keys = [PurePath(item["Key"]).as_posix() for item in s3_objects["Contents"]] expected_keys = [ f"{folder}/testfile1.txt",