feat(event_source): Add support for S3 batch operations #3572

Merged
Commits (20)
9d8e9a4: Add support for S3 Batch Operations event and response along with uni… (sbailliez, Dec 29, 2023)
319a048: Add documentation with example based on the AWS S3 documentation (sbailliez, Dec 29, 2023)
d834c96: Use unquote_plus and add unit test for key encoded with space (sbailliez, Dec 30, 2023)
0045c06: Merge branch 'aws-powertools:develop' into feat/s3-batch-operations (sbailliez, Dec 30, 2023)
7af9a73: Merge branch 'develop' into feat/s3-batch-operations (leandrodamascena, Jan 2, 2024)
f3955eb: Merge branch 'develop' into feat/s3-batch-operations (leandrodamascena, Jan 2, 2024)
da5105a: Merge branch 'develop' into feat/s3-batch-operations (leandrodamascena, Jan 4, 2024)
eee9536: Merge branch 'develop' into feat/s3-batch-operations (leandrodamascena, Jan 10, 2024)
688e746: Merge branch 'develop' into feat/s3-batch-operations (leandrodamascena, Jan 15, 2024)
a401507: Initial refactor (leandrodamascena, Jan 16, 2024)
bd29d0a: Changing the DX to improve usability (leandrodamascena, Jan 16, 2024)
dfb4618: Documentation (leandrodamascena, Jan 16, 2024)
b778609: Adding parser (leandrodamascena, Jan 16, 2024)
fe5424f: Small refactor (leandrodamascena, Jan 16, 2024)
b4996a3: Merge branch 'develop' into feat/s3-batch-operations (leandrodamascena, Jan 16, 2024)
c49adb7: Merge branch 'develop' into feat/s3-batch-operations (leandrodamascena, Jan 17, 2024)
deb270a: Addressing Ruben's feedback - Docs and examples (leandrodamascena, Jan 17, 2024)
12e81d4: Addressing Ruben's feedback - Docs and examples (leandrodamascena, Jan 17, 2024)
57681c4: Addressing Ruben's feedback - Code (leandrodamascena, Jan 17, 2024)
a99594f: Addressing Ruben's feedback - Code (leandrodamascena, Jan 17, 2024)
8 changes: 8 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/__init__.py
@@ -23,6 +23,11 @@
)
from .kinesis_stream_event import KinesisStreamEvent
from .lambda_function_url_event import LambdaFunctionUrlEvent
from .s3_batch_operation_event import (
S3BatchOperationEvent,
S3BatchOperationResponse,
S3BatchOperationResponseRecord,
)
from .s3_event import S3Event, S3EventBridgeNotificationEvent
from .secrets_manager_event import SecretsManagerEvent
from .ses_event import SESEvent
@@ -52,6 +57,9 @@
"LambdaFunctionUrlEvent",
"S3Event",
"S3EventBridgeNotificationEvent",
"S3BatchOperationEvent",
"S3BatchOperationResponse",
"S3BatchOperationResponseRecord",
"SESEvent",
"SNSEvent",
"SQSEvent",
237 changes: 237 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
@@ -0,0 +1,237 @@
import warnings
from dataclasses import dataclass, field
from typing import Any, Dict, Iterator, List, Optional, Tuple
from urllib.parse import unquote_plus

from aws_lambda_powertools.shared.types import Literal
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper

# List of valid result codes, used both in S3BatchOperationResponse and S3BatchOperationResponseRecord
VALID_RESULT_CODE_TYPES: Tuple[str, str, str] = ("Succeeded", "TemporaryFailure", "PermanentFailure")


@dataclass(repr=False, order=False)
class S3BatchOperationResponseRecord:
task_id: str
result_code: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
result_string: Optional[str] = None

def asdict(self) -> Dict[str, Any]:
if self.result_code not in VALID_RESULT_CODE_TYPES:
warnings.warn(
stacklevel=2,
message=f"The resultCode {self.result_code} is not valid. "
"Choose from 'Succeeded', 'TemporaryFailure' or 'PermanentFailure'.",
)

return {
"taskId": self.task_id,
"resultCode": self.result_code,
"resultString": self.result_string,
}


@dataclass(repr=False, order=False)
class S3BatchOperationResponse:
"""S3 Batch Operations response object

Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/services-s3-batch.html
- https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html#batch-ops-invoke-lambda-custom-functions
- https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion

Parameters
----------
invocation_schema_version : str
Specifies the schema version for the payload that Batch Operations sends when invoking
an AWS Lambda function, either '1.0' or '2.0'. This must be copied from the event.

invocation_id : str
The identifier of the invocation request. This must be copied from the event.

treat_missing_keys_as : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
Undocumented parameter; defaults to "Succeeded".

results : List[S3BatchOperationResponseRecord]
Results of each S3 Batch Operations task; optional at creation and can be added later using the `add_result` method.

Examples
--------

**S3 Batch Operations**

```python
import boto3

from botocore.exceptions import ClientError

from aws_lambda_powertools.utilities.data_classes import (
    S3BatchOperationEvent,
    S3BatchOperationResponse,
    event_source
)
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=S3BatchOperationEvent)
def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
    response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id, "PermanentFailure")

    result = None
    task = event.task
    src_key: str = task.s3_key
    src_bucket: str = task.s3_bucket

    s3 = boto3.client("s3", region_name='us-east-1')

    try:
        dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key)
        result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
    except ClientError as e:
        error_code = e.response['Error']['Code']
        error_message = e.response['Error']['Message']
        if error_code == 'RequestTimeout':
            result = task.build_task_batch_response("TemporaryFailure", "Retry request to Amazon S3 due to timeout.")
        else:
            result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
    except Exception as e:
        result = task.build_task_batch_response("PermanentFailure", str(e))
    finally:
        response.add_result(result)

    return response.asdict()

```
""" # noqa: E501

invocation_schema_version: str
invocation_id: str
treat_missing_keys_as: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded"
results: List[S3BatchOperationResponseRecord] = field(default_factory=list)

def __post_init__(self):
if self.treat_missing_keys_as not in VALID_RESULT_CODE_TYPES:
warnings.warn(
stacklevel=2,
message=f"The value {self.treat_missing_keys_as} is not valid for treat_missing_keys_as. "
"Choose from 'Succeeded', 'TemporaryFailure' or 'PermanentFailure'.",
)

def add_result(self, result: S3BatchOperationResponseRecord):
self.results.append(result)

def asdict(self) -> Dict:
result_count = len(self.results)

if result_count != 1:
raise ValueError(f"Response must have exactly one result, but got {result_count}")

return {
"invocationSchemaVersion": self.invocation_schema_version,
"treatMissingKeysAs": self.treat_missing_keys_as,
"invocationId": self.invocation_id,
"results": [result.asdict() for result in self.results],
}


class S3BatchOperationJob(DictWrapper):
@property
def get_id(self) -> str:
# Note: this name conflicts with existing python builtins
return self["id"]

@property
def user_arguments(self) -> Optional[Dict[str, str]]:
"""Get user arguments provided for this job (only for invocation schema 2.0)"""
return self.get("userArguments")


class S3BatchOperationTask(DictWrapper):
@property
def task_id(self) -> str:
"""Get the task id"""
return self["taskId"]

@property
def s3_key(self) -> str:
"""Get the object key using unquote_plus"""
return unquote_plus(self["s3Key"])

@property
def s3_version_id(self) -> Optional[str]:
"""Object version if bucket is versioning-enabled, otherwise null"""
return self.get("s3VersionId")

@property
def s3_bucket_arn(self) -> Optional[str]:
"""Get the s3 bucket arn (present only for invocationSchemaVersion '1.0')"""
return self.get("s3BucketArn")

@property
def s3_bucket(self) -> str:
"""
Get the s3 bucket, either from 's3Bucket' property (invocationSchemaVersion '2.0')
or from 's3BucketArn' (invocationSchemaVersion '1.0')
"""
if self.s3_bucket_arn:
return self.s3_bucket_arn.split(":::")[-1]
return self["s3Bucket"]

def build_task_batch_response(
self,
result_code: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded",
result_string: str = "",
) -> S3BatchOperationResponseRecord:
"""Create an S3BatchOperationResponseRecord for this task using its task_id and the given values

Parameters
----------
result_code : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded"
Task result; supported values: "Succeeded", "TemporaryFailure", "PermanentFailure"
result_string : str
String to include for this task in the job's completion report
"""
return S3BatchOperationResponseRecord(
task_id=self.task_id,
result_code=result_code,
result_string=result_string,
)


class S3BatchOperationEvent(DictWrapper):
"""Amazon S3 Batch Operations event

Documentation:
--------------
- https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html
"""

@property
def invocation_id(self) -> str:
"""Get the identifier of the invocation request"""
return self["invocationId"]

@property
def invocation_schema_version(self) -> Literal["1.0", "2.0"]:
"""
Get the schema version for the payload that Batch Operations sends when invoking an
AWS Lambda function. Either '1.0' or '2.0'.
"""
return self["invocationSchemaVersion"]

@property
def tasks(self) -> Iterator[S3BatchOperationTask]:
"""Get the S3 Batch Operations tasks"""
for task in self["tasks"]:
yield S3BatchOperationTask(task)

@property
def task(self) -> S3BatchOperationTask:
"""Get the first s3 batch operation task"""
return next(self.tasks)

@property
def job(self) -> S3BatchOperationJob:
"""Get the s3 batch operation job"""
return S3BatchOperationJob(self["job"])
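
For context, here is a minimal sketch of how the event class above resolves a schema 1.0 payload shaped like the AWS documentation example; the bucket, key, and identifier values are illustrative.

```python
from aws_lambda_powertools.utilities.data_classes import S3BatchOperationEvent

# Sample schema 1.0 invocation payload; identifiers and names are illustrative
sample_event = {
    "invocationSchemaVersion": "1.0",
    "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
    "job": {"id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce"},
    "tasks": [
        {
            "taskId": "dGFza2lkZ29lc2hlcmUK",
            "s3Key": "customer%20Image1.jpg",
            "s3VersionId": "1",
            "s3BucketArn": "arn:aws:s3:::powertools-dataset",
        },
    ],
}

event = S3BatchOperationEvent(sample_event)
task = event.task

# s3_key is URL-decoded with unquote_plus; s3_bucket falls back to the ARN for schema 1.0
assert task.s3_key == "customer Image1.jpg"
assert task.s3_bucket == "powertools-dataset"

# build_task_batch_response ties the result to this task's taskId
record = task.build_task_batch_response("Succeeded", "copied")
assert record.asdict() == {
    "taskId": "dGFza2lkZ29lc2hlcmUK",
    "resultCode": "Succeeded",
    "resultString": "copied",
}
```

Schema 2.0 payloads carry `s3Bucket` directly, in which case `s3_bucket` returns it without touching the ARN.
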
4 changes: 4 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/__init__.py
@@ -68,6 +68,7 @@
S3Model,
S3RecordModel,
)
from .s3_batch_operation import S3BatchOperationJobModel, S3BatchOperationModel, S3BatchOperationTaskModel
from .s3_event_notification import (
S3SqsEventNotificationModel,
S3SqsEventNotificationRecordModel,
@@ -177,4 +178,7 @@
"BedrockAgentEventModel",
"BedrockAgentRequestBodyModel",
"BedrockAgentRequestMediaModel",
"S3BatchOperationJobModel",
"S3BatchOperationModel",
"S3BatchOperationTaskModel",
]
34 changes: 34 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/s3_batch_operation.py
@@ -0,0 +1,34 @@
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, validator

from aws_lambda_powertools.utilities.parser.types import Literal


class S3BatchOperationTaskModel(BaseModel):
taskId: str
s3Key: str
s3VersionId: Optional[str] = None
s3BucketArn: Optional[str] = None
s3Bucket: Optional[str] = None

@validator("s3Bucket", pre=True, always=True)
def validate_bucket(cls, current_value, values):
# Get the s3 bucket, either from 's3Bucket' property (invocationSchemaVersion '2.0')
# or from 's3BucketArn' (invocationSchemaVersion '1.0')
if values.get("s3BucketArn") and not current_value:
# Replace s3Bucket value with the value from s3BucketArn
return values["s3BucketArn"].split(":::")[-1]
return current_value


class S3BatchOperationJobModel(BaseModel):
id: str
userArguments: Optional[Dict[str, Any]] = None


class S3BatchOperationModel(BaseModel):
invocationId: str
invocationSchemaVersion: Literal["1.0", "2.0"]
job: S3BatchOperationJobModel
tasks: List[S3BatchOperationTaskModel]
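
As a quick illustration of the parser model above, a sketch of validating a schema 1.0 payload with `parse()`; the payload values are illustrative, and the final assertion exercises the `validate_bucket` validator that back-fills `s3Bucket` from `s3BucketArn`.

```python
from aws_lambda_powertools.utilities.parser import parse
from aws_lambda_powertools.utilities.parser.models import S3BatchOperationModel

# Illustrative schema 1.0 payload: s3Bucket is omitted, so validate_bucket derives it from s3BucketArn
payload = {
    "invocationSchemaVersion": "1.0",
    "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
    "job": {"id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce"},
    "tasks": [
        {
            "taskId": "dGFza2lkZ29lc2hlcmUK",
            "s3Key": "customerImage1.jpg",
            "s3VersionId": "1",
            "s3BucketArn": "arn:aws:s3:::powertools-dataset",
        },
    ],
}

parsed: S3BatchOperationModel = parse(event=payload, model=S3BatchOperationModel)
assert parsed.tasks[0].s3Bucket == "powertools-dataset"
```
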
51 changes: 50 additions & 1 deletion docs/utilities/data_classes.md
@@ -75,7 +75,7 @@ Log Data Event for Troubleshooting
## Supported event sources

| Event Source | Data_class |
| ------------------------------------------------------------------------- | -------------------------------------------------- |
|---------------------------------------------------------------------------|----------------------------------------------------|
| [Active MQ](#active-mq) | `ActiveMQEvent` |
| [API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent` |
| [API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2` |
@@ -99,6 +99,7 @@ Log Data Event for Troubleshooting
| [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` |
| [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` |
| [S3](#s3) | `S3Event` |
| [S3 Batch Operations](#s3-batch-operations) | `S3BatchOperationEvent` |
| [S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent` |
| [S3 EventBridge Notification](#s3-eventbridge-notification) | `S3EventBridgeNotificationEvent` |
| [SES](#ses) | `SESEvent` |
@@ -1076,6 +1077,54 @@ for more details.
do_something_with(f"{bucket_name}/{object_key}")
```

### S3 Batch Operations

This example is based on the AWS S3 Batch Operations documentation [Example Lambda function for S3 Batch Operations](https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html){target="_blank"}.

=== "app.py"

```python hl_lines="5-8 13 15 26 31 33 35"
import boto3

from botocore.exceptions import ClientError

from aws_lambda_powertools.utilities.data_classes import (
    S3BatchOperationEvent,
    S3BatchOperationResponse,
    event_source
)
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=S3BatchOperationEvent)
def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
    response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id, "PermanentFailure")

    result = None
    task = event.task
    src_key: str = task.s3_key
    src_bucket: str = task.s3_bucket

    s3 = boto3.client("s3", region_name='us-east-1')

    try:
        dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key)
        result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
    except ClientError as e:
        error_code = e.response['Error']['Code']
        error_message = e.response['Error']['Message']
        if error_code == 'RequestTimeout':
            result = task.build_task_batch_response("TemporaryFailure", "Retry request to Amazon S3 due to timeout.")
        else:
            result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
    except Exception as e:
        result = task.build_task_batch_response("PermanentFailure", str(e))
    finally:
        response.add_result(result)

    return response.asdict()
```
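
For reference, a sketch of the dictionary shape `response.asdict()` returns for the handler above when the single task succeeds; identifier and bucket values are illustrative.

```python
# Illustrative payload returned by response.asdict() for one succeeded task
expected_response = {
    "invocationSchemaVersion": "1.0",
    "treatMissingKeysAs": "PermanentFailure",
    "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
    "results": [
        {
            "taskId": "dGFza2lkZ29lc2hlcmUK",
            "resultCode": "Succeeded",
            "resultString": "s3://destination-bucket/customerImage1.jpg",
        },
    ],
}
```

Note that `asdict()` raises a `ValueError` unless exactly one result has been added, matching the one-task-per-invocation contract.
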

### S3 Object Lambda

This example is based on the AWS Blog post [Introducing Amazon S3 Object Lambda – Use Your Code to Process Data as It Is Being Retrieved from S3](https://aws.amazon.com/blogs/aws/introducing-amazon-s3-object-lambda-use-your-code-to-process-data-as-it-is-being-retrieved-from-s3/){target="_blank"}.
1 change: 1 addition & 0 deletions docs/utilities/parser.md
@@ -191,6 +191,7 @@ Parser comes with the following built-in models:
| **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose |
| **KinesisFirehoseSqsModel** | Lambda Event Source payload for SQS messages wrapped in Kinesis Firehose records |
| **LambdaFunctionUrlModel** | Lambda Event Source payload for Lambda Function URL payload |
| **S3BatchOperationModel** | Lambda Event Source payload for Amazon S3 Batch Operation |
| **S3EventNotificationEventBridgeModel** | Lambda Event Source payload for Amazon S3 Event Notification to EventBridge. |
| **S3Model** | Lambda Event Source payload for Amazon S3 |
| **S3ObjectLambdaEvent** | Lambda Event Source payload for Amazon S3 Object Lambda |
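
A minimal sketch of wiring the new `S3BatchOperationModel` through the `event_parser` decorator; `do_something_with` is a placeholder helper defined here only for illustration.

```python
from typing import Optional

from aws_lambda_powertools.utilities.parser import event_parser
from aws_lambda_powertools.utilities.parser.models import S3BatchOperationModel
from aws_lambda_powertools.utilities.typing import LambdaContext


def do_something_with(bucket: Optional[str], key: str) -> None:
    # Placeholder for real processing logic
    print(f"processing s3://{bucket}/{key}")


@event_parser(model=S3BatchOperationModel)
def lambda_handler(event: S3BatchOperationModel, context: LambdaContext):
    task = event.tasks[0]
    # s3Bucket is back-filled from s3BucketArn for schema 1.0 payloads
    do_something_with(task.s3Bucket, task.s3Key)

    # S3 Batch Operations still expects the usual response contract back
    return {
        "invocationSchemaVersion": event.invocationSchemaVersion,
        "treatMissingKeysAs": "PermanentFailure",
        "invocationId": event.invocationId,
        "results": [
            {"taskId": task.taskId, "resultCode": "Succeeded", "resultString": ""},
        ],
    }
```
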