Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Adds push and pull project steps for S3 #229

Merged
merged 7 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Support for unsigned AWS requests - [#220](https://github.com/PrefectHQ/prefect-aws/pull/220)
- Added push and pull project steps for S3 - [#229](https://github.com/PrefectHQ/prefect-aws/pull/229)

### Changed

Expand Down
6 changes: 6 additions & 0 deletions docs/projects/steps.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
description: Prefect project steps for managing project code storage via AWS S3.
notes: This documentation page is generated from source file docstrings.
---

::: prefect_aws.projects.steps
2 changes: 2 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ nav:
- Client Waiter: client_waiter.md
- Credentials: credentials.md
- ECS: ecs.md
- Projects:
- Steps: projects/steps.md
- S3: s3.md
- Secrets Manager: secrets_manager.md

Expand Down
Empty file.
177 changes: 177 additions & 0 deletions prefect_aws/projects/steps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
"""
Prefect project steps for code storage and retrieval in S3 and S3 compatible services.
"""
from pathlib import Path, PurePosixPath
from typing import Dict, Optional

import boto3
from botocore.client import Config
from prefect.utilities.filesystem import filter_files, relative_path_to_current_platform
from typing_extensions import TypedDict


class PushProjectToS3Output(TypedDict):
    """
    Shape of the dictionary returned by the `push_project_to_s3` step.
    """

    # Name of the S3 bucket the project files were uploaded to.
    bucket: str
    # Folder inside the bucket under which the files were uploaded.
    folder: str


class PullProjectFromS3Output(TypedDict):
    """
    Shape of the dictionary returned by the `pull_project_from_s3` step.
    """

    # Name of the S3 bucket the project files were downloaded from.
    bucket: str
    # Folder inside the bucket that was used as the download prefix.
    folder: str
    # Local directory (as a string) the files were downloaded into.
    directory: str


def push_project_to_s3(
    bucket: str,
    folder: str,
    credentials: Optional[Dict] = None,
    client_parameters: Optional[Dict] = None,
    ignore_file: Optional[str] = ".prefectignore",
) -> PushProjectToS3Output:
    """
    Pushes the contents of the current working directory to an S3 bucket,
    excluding files and folders specified in the ignore_file.

    Args:
        bucket: The name of the S3 bucket where the project files will be uploaded.
        folder: The folder in the S3 bucket where the project files will be uploaded.
        credentials: A dictionary of AWS credentials (aws_access_key_id,
            aws_secret_access_key, aws_session_token).
        client_parameters: A dictionary of additional parameters to pass to the boto3
            client. An optional "config" entry is unpacked into a
            `botocore.client.Config`. The dictionary is not modified.
        ignore_file: The name of the file containing ignore patterns. If it is
            falsy or the file does not exist, every file is uploaded.

    Returns:
        A dictionary containing the bucket and folder where the project was uploaded.

    Examples:
        Push a project to an S3 bucket:
        ```yaml
        build:
            - prefect_aws.projects.steps.push_project_to_s3:
                requires: prefect-aws
                bucket: my-bucket
                folder: my-project
        ```

        Push a project to an S3 bucket using credentials stored in a block:
        ```yaml
        build:
            - prefect_aws.projects.steps.push_project_to_s3:
                requires: prefect-aws
                bucket: my-bucket
                folder: my-project
                credentials: "{{ prefect.blocks.aws-credentials.dev-credentials }}"
        ```
    """
    if credentials is None:
        credentials = {}
    # Work on a copy: popping "config" from the caller's dictionary would make a
    # second call with the same dictionary silently lose its advanced config.
    client_parameters = dict(client_parameters or {})
    advanced_config = client_parameters.pop("config", {})
    client = boto3.client(
        "s3", **credentials, **client_parameters, config=Config(**advanced_config)
    )

    local_path = Path.cwd()

    # None means "no ignore file in effect", i.e. upload everything.
    included_files = None
    if ignore_file and Path(ignore_file).exists():
        with open(ignore_file, "r") as f:
            ignore_patterns = f.readlines()

        included_files = filter_files(str(local_path), ignore_patterns)

    for local_file_path in local_path.rglob("*"):
        relative_file_path = local_file_path.relative_to(local_path)
        if (
            included_files is not None
            and str(relative_file_path) not in included_files
        ):
            continue
        if local_file_path.is_dir():
            # Directories are not uploaded; only the files inside them are.
            continue

        # S3 keys always use "/" as the separator, so build the key in POSIX
        # form instead of relying on the separator of the local platform.
        remote_file_path = (Path(folder) / relative_file_path).as_posix()
        client.upload_file(str(local_file_path), bucket, remote_file_path)

    return {
        "bucket": bucket,
        "folder": folder,
    }


def pull_project_from_s3(
    bucket: str,
    folder: str,
    credentials: Optional[Dict] = None,
    client_parameters: Optional[Dict] = None,
) -> PullProjectFromS3Output:
    """
    Pulls the contents of a project from an S3 bucket to the current working directory.

    Args:
        bucket: The name of the S3 bucket where the project files are stored.
        folder: The folder in the S3 bucket where the project files are stored.
        credentials: A dictionary of AWS credentials (aws_access_key_id,
            aws_secret_access_key, aws_session_token).
        client_parameters: A dictionary of additional parameters to pass to the
            boto3 client. An optional "config" entry is unpacked into a
            `botocore.client.Config`. The dictionary is not modified.

    Returns:
        A dictionary containing the bucket, folder, and local directory where the
        project files were downloaded.

    Examples:
        Pull a project from S3 using the default credentials and client parameters:
        ```yaml
        build:
            - prefect_aws.projects.steps.pull_project_from_s3:
                requires: prefect-aws
                bucket: my-bucket
                folder: my-project
        ```

        Pull a project from S3 using credentials stored in a block:
        ```yaml
        build:
            - prefect_aws.projects.steps.pull_project_from_s3:
                requires: prefect-aws
                bucket: my-bucket
                folder: my-project
                credentials: "{{ prefect.blocks.aws-credentials.dev-credentials }}"
        ```
    """
    if credentials is None:
        credentials = {}
    # Work on a copy: popping "config" from the caller's dictionary would make a
    # second call with the same dictionary silently lose its advanced config.
    client_parameters = dict(client_parameters or {})
    advanced_config = client_parameters.pop("config", {})
    bucket_resource = (
        boto3.Session(**credentials)
        .resource("s3", **client_parameters, config=Config(**advanced_config))
        .Bucket(bucket)
    )

    local_path = Path.cwd()
    for obj in bucket_resource.objects.filter(Prefix=folder):
        if obj.key.endswith("/"):
            # object is a folder and will be created if it contains any objects
            continue

        key_path = relative_path_to_current_platform(obj.key)
        try:
            relative_key = key_path.relative_to(folder)
        except ValueError:
            # The S3 prefix filter is a plain string match, so a sibling such as
            # "my-project-old/..." also matches "my-project". Those objects are
            # not part of this project; skip them instead of aborting the pull.
            continue

        # Use a native path for the local target so that parent directories
        # resolve correctly on every platform.
        target = local_path / relative_key
        target.parent.mkdir(parents=True, exist_ok=True)
        bucket_resource.download_file(obj.key, str(target))

    return {
        "bucket": bucket,
        "folder": folder,
        "directory": str(local_path),
    }
Loading