Skip to content

Commit

Permalink
add changes to event config lambda to support sending to SQS queues
Browse files Browse the repository at this point in the history
  • Loading branch information
Rixing Xu committed Sep 5, 2023
1 parent 6c29f29 commit f5c5f81
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 17 deletions.
3 changes: 2 additions & 1 deletion config/develop/namespaced/s3-event-config-lambda.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig'
stack_tags: {{ stack_group_config.default_stack_tags }}
parameters:
Namespace: {{ stack_group_config.namespace }}
S3ToGlueFunctionArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-S3ToGlue::S3ToGlueFunctionArn"
S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-S3ToGlue::S3ToGlueFunctionArn"
S3ToGlueDestinationType: "Queue"
S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn"
S3SourceBucketName: {{ stack_group_config.input_bucket_name }}
9 changes: 8 additions & 1 deletion src/lambda_function/s3_event_config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ The s3_event_config lambda is triggered by a github action during deployment or

It will then put a S3 event notification configuration into
the input data bucket which allows the input data bucket to
trigger the S3 to JSON lambda with S3 new object notifications whenever new objects are added
send S3 new-object notifications to a configured destination whenever new objects are added
to it and eventually lead to the start of the S3-to-JSON workflow.

Currently **only** the following destination types are supported:

- Lambda Function
- SQS queue

## Event format

The events that will trigger the s3-event-config-lambda
Expand All @@ -23,6 +28,8 @@ Where the allowed RequestType values are:
- "Update"
- "Delete"

You can test the lambda by opening its page in the AWS console, pasting in the sample event above, and triggering the function. Any resulting changes should then be visible in the input bucket's event notification configuration, confirming the run was successful.

## Launching Lambda stack in AWS

There are two main stacks involved in the s3_event_config lambda. They are the
Expand Down
15 changes: 9 additions & 6 deletions src/lambda_function/s3_event_config/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def lambda_handler(event, context):
logger.info(f'Request Type: {event["RequestType"]}')
add_notification(
s3,
lambda_arn=os.environ["S3_TO_GLUE_FUNCTION_ARN"],
destination_arn=os.environ["S3_TO_GLUE_DESTINATION_ARN"],
destination_type=os.environ["S3_TO_GLUE_DESTINATION_TYPE"],
bucket=os.environ["S3_SOURCE_BUCKET_NAME"],
bucket_key_prefix=os.environ["BUCKET_KEY_PREFIX"],
)
Expand All @@ -41,29 +42,31 @@ def lambda_handler(event, context):

def add_notification(
s3_client: boto3.client,
lambda_arn: str,
destination_type : str,
destination_arn: str,
bucket: str,
bucket_key_prefix: str,
):
"""Adds the S3 notification configuration to an existing bucket
Args:
s3_client (boto3.client) : s3 client to use for s3 event config
lambda_arn (str): Arn of the lambda s3 event config function
destination_type (str): String name of the destination type for the configuration
destination_arn (str): Arn of the destination's s3 event config
bucket (str): bucket name of the s3 bucket to add the config to
bucket_key_prefix (str): bucket key prefix for where to look for s3 object notifications
"""
s3_client.put_bucket_notification_configuration(
Bucket=bucket,
NotificationConfiguration={
"LambdaFunctionConfigurations": [
f"{destination_type}Configurations": [
{
"LambdaFunctionArn": lambda_arn,
f"{destination_type}Arn": destination_arn,
"Events": ["s3:ObjectCreated:*"],
"Filter": {
"Key": {
"FilterRules": [
{"Name": "prefix", "Value": bucket_key_prefix}
{"Name": "prefix", "Value": f"{bucket_key_prefix}/"}
]
}
},
Expand Down
11 changes: 8 additions & 3 deletions src/lambda_function/s3_event_config/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ Parameters:
Description: >-
The namespace string used for the bucket key prefix
S3ToGlueFunctionArn:
S3ToGlueDestinationArn:
Type: String
Description: Arn for the S3 Event Config Lambda Function
Description: Arn for the S3 Event Config Destination

S3ToGlueDestinationType:
Type: String
Description: The S3 Event Config Destination Type

S3EventConfigRoleArn:
Type: String
Expand Down Expand Up @@ -43,7 +47,8 @@ Resources:
Environment:
Variables:
S3_SOURCE_BUCKET_NAME: !Ref S3SourceBucketName
S3_TO_GLUE_FUNCTION_ARN: !Ref S3ToGlueFunctionArn
S3_TO_GLUE_DESTINATION_ARN: !Ref S3ToGlueDestinationArn
S3_TO_GLUE_DESTINATION_TYPE: !Ref S3ToGlueDestinationType
BUCKET_KEY_PREFIX: !Ref Namespace

Outputs:
Expand Down
63 changes: 57 additions & 6 deletions tests/test_s3_event_config_lambda.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import zipfile
import io
import boto3
from moto import mock_s3, mock_lambda, mock_iam, mock_logs
from moto import mock_s3, mock_lambda, mock_iam, mock_sqs
import pytest

from src.lambda_function.s3_event_config import app


@pytest.fixture(scope="function")
def mock_iam_role(mock_aws_credentials):
def mock_iam_role():
with mock_iam():
iam = boto3.client("iam")
yield iam.create_role(
Expand All @@ -19,7 +19,7 @@ def mock_iam_role(mock_aws_credentials):


@pytest.fixture(scope="function")
def mock_lambda_function(mock_aws_credentials, mock_iam_role):
def mock_lambda_function(mock_iam_role):
with mock_lambda():
client = boto3.client("lambda")
client.create_function(
Expand All @@ -31,11 +31,23 @@ def mock_lambda_function(mock_aws_credentials, mock_iam_role):
yield client.get_function(FunctionName="some_function")


@pytest.fixture(scope="function")
def mock_sqs_queue(mock_aws_credentials):
    """Yield the attributes (including the QueueArn) of a mocked SQS queue.

    Yields:
        dict: the ``get_queue_attributes`` response for a queue named
        ``test_sqs``, requested with ``AttributeNames=["QueueArn"]``.
    """
    with mock_sqs():
        client = boto3.client("sqs")
        # create_queue already returns the QueueUrl, so the separate
        # get_queue_url round trip in the original is unnecessary.
        queue = client.create_queue(QueueName="test_sqs")
        yield client.get_queue_attributes(
            QueueUrl=queue["QueueUrl"], AttributeNames=["QueueArn"]
        )


@mock_s3
def test_that_add_notification_adds_expected_settings(s3, mock_lambda_function):
def test_that_add_notification_adds_expected_settings_for_lambda(s3, mock_lambda_function):
s3.create_bucket(Bucket="some_bucket")
set_config = app.add_notification(
s3,
"LambdaFunction",
mock_lambda_function["Configuration"]["FunctionArn"],
"some_bucket",
"test_folder",
Expand All @@ -49,19 +61,58 @@ def test_that_add_notification_adds_expected_settings(s3, mock_lambda_function):
"s3:ObjectCreated:*"
]
assert get_config["LambdaFunctionConfigurations"][0]["Filter"] == {
"Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder"}]}
"Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]}
}


@mock_s3
def test_that_delete_notification_is_successful(s3, mock_lambda_function):
def test_that_delete_notification_is_successful_for_lambda(s3, mock_lambda_function):
s3.create_bucket(Bucket="some_bucket")
app.add_notification(
s3,
"LambdaFunction",
mock_lambda_function["Configuration"]["FunctionArn"],
"some_bucket",
"test_folder",
)
app.delete_notification(s3, "some_bucket")
get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket")
assert "LambdaFunctionConfigurations" not in get_config


@mock_s3
def test_that_add_notification_adds_expected_settings_for_sqs(s3, mock_sqs_queue):
    """add_notification writes the expected SQS QueueConfigurations entry.

    Verifies the queue ARN, the triggering event, and the prefix filter
    (which add_notification suffixes with "/") on the bucket's
    notification configuration.
    """
    s3.create_bucket(Bucket="some_bucket")
    queue_arn = mock_sqs_queue["Attributes"]["QueueArn"]
    # add_notification returns nothing useful; the original's unused
    # `set_config =` assignment is dropped.
    app.add_notification(
        s3,
        "Queue",
        queue_arn,
        "some_bucket",
        "test_folder",
    )
    queue_config = s3.get_bucket_notification_configuration(Bucket="some_bucket")[
        "QueueConfigurations"
    ][0]
    assert queue_config["QueueArn"] == queue_arn
    assert queue_config["Events"] == ["s3:ObjectCreated:*"]
    assert queue_config["Filter"] == {
        "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]}
    }


@mock_s3
def test_that_delete_notification_is_successful_for_sqs(s3, mock_sqs_queue):
    """delete_notification removes a previously added SQS configuration.

    After adding a Queue notification and then deleting it, the bucket's
    notification configuration must no longer contain QueueConfigurations.
    """
    s3.create_bucket(Bucket="some_bucket")
    # Double quotes for string literals, matching the rest of this test module.
    app.add_notification(
        s3,
        "Queue",
        mock_sqs_queue["Attributes"]["QueueArn"],
        "some_bucket",
        "test_folder",
    )
    app.delete_notification(s3, "some_bucket")
    get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket")
    assert "QueueConfigurations" not in get_config

0 comments on commit f5c5f81

Please sign in to comment.