From f5c5f811de121daf4c6604b246c1760a0837e3b6 Mon Sep 17 00:00:00 2001 From: Rixing Xu Date: Tue, 5 Sep 2023 13:55:45 -0700 Subject: [PATCH] add changes to event config lambda to support sending to SQS queues --- .../namespaced/s3-event-config-lambda.yaml | 3 +- src/lambda_function/s3_event_config/README.md | 9 ++- src/lambda_function/s3_event_config/app.py | 15 +++-- .../s3_event_config/template.yaml | 11 +++- tests/test_s3_event_config_lambda.py | 63 +++++++++++++++++-- 5 files changed, 84 insertions(+), 17 deletions(-) diff --git a/config/develop/namespaced/s3-event-config-lambda.yaml b/config/develop/namespaced/s3-event-config-lambda.yaml index 98f29daf..ad6442e0 100644 --- a/config/develop/namespaced/s3-event-config-lambda.yaml +++ b/config/develop/namespaced/s3-event-config-lambda.yaml @@ -10,6 +10,7 @@ stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig' stack_tags: {{ stack_group_config.default_stack_tags }} parameters: Namespace: {{ stack_group_config.namespace }} - S3ToGlueFunctionArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-S3ToGlue::S3ToGlueFunctionArn" + S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-S3ToGlue::S3ToGlueFunctionArn" + S3ToGlueDestinationType: "Queue" S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} diff --git a/src/lambda_function/s3_event_config/README.md b/src/lambda_function/s3_event_config/README.md index ccef795e..ca748f7d 100644 --- a/src/lambda_function/s3_event_config/README.md +++ b/src/lambda_function/s3_event_config/README.md @@ -4,9 +4,14 @@ The s3_event_config lambda is triggered by a github action during deployment or It will then put a S3 event notification configuration into the input data bucket which allows the input data bucket to -trigger the S3 to JSON lambda with S3 new object notifications whenever new objects are added +trigger a specific destination type with S3 new object notifications whenever new objects are added to it and eventually lead to the start of the S3-to-JSON workflow. +Currently **only** the following destination types are supported: + +- Lambda Function +- SQS queue + ## Event format The events that will trigger the s3-event-config-lambda @@ -23,6 +28,8 @@ Where the allowed RequestType values are: - "Update" - "Delete" +You can test the lambda by going to the AWS console for the lambda function, pasting the above sample event in and triggering the function. Any updates should then be visible in the input bucket's event config to confirm it was successful. + ## Launching Lambda stack in AWS There are two main stacks involved in the s3_event_config lambda. They are the diff --git a/src/lambda_function/s3_event_config/app.py b/src/lambda_function/s3_event_config/app.py index 5fb83ea4..307b2ce2 100644 --- a/src/lambda_function/s3_event_config/app.py +++ b/src/lambda_function/s3_event_config/app.py @@ -29,7 +29,8 @@ def lambda_handler(event, context): logger.info(f'Request Type: {event["RequestType"]}') add_notification( s3, - lambda_arn=os.environ["S3_TO_GLUE_FUNCTION_ARN"], + destination_arn=os.environ["S3_TO_GLUE_DESTINATION_ARN"], + destination_type=os.environ["S3_TO_GLUE_DESTINATION_TYPE"], bucket=os.environ["S3_SOURCE_BUCKET_NAME"], bucket_key_prefix=os.environ["BUCKET_KEY_PREFIX"], ) @@ -41,7 +42,8 @@ def lambda_handler(event, context): def add_notification( s3_client: boto3.client, - lambda_arn: str, + destination_type : str, + destination_arn: str, bucket: str, bucket_key_prefix: str, ): @@ -49,21 +51,22 @@ def add_notification( Args: s3_client (boto3.client) : s3 client to use for s3 event config - lambda_arn (str): Arn of the lambda s3 event config function + destination_type (str): String name of the destination type for the configuration + destination_arn (str): Arn of the destination's s3 event config bucket (str): bucket name of the s3 bucket to add the config to bucket_key_prefix (str): bucket key prefix for where to look for s3 object notifications """ s3_client.put_bucket_notification_configuration( Bucket=bucket, NotificationConfiguration={ - "LambdaFunctionConfigurations": [ + f"{destination_type}Configurations": [ { - "LambdaFunctionArn": lambda_arn, + f"{destination_type}Arn": destination_arn, "Events": ["s3:ObjectCreated:*"], "Filter": { "Key": { "FilterRules": [ - {"Name": "prefix", "Value": bucket_key_prefix} + {"Name": "prefix", "Value": f"{bucket_key_prefix}/"} ] } }, diff --git a/src/lambda_function/s3_event_config/template.yaml b/src/lambda_function/s3_event_config/template.yaml index 467d2754..e8487f51 100644 --- a/src/lambda_function/s3_event_config/template.yaml +++ b/src/lambda_function/s3_event_config/template.yaml @@ -11,9 +11,13 @@ Parameters: Description: >- The namespace string used for the bucket key prefix - S3ToGlueFunctionArn: + S3ToGlueDestinationArn: Type: String - Description: Arn for the S3 Event Config Lambda Function + Description: Arn for the S3 Event Config Destination + + S3ToGlueDestinationType: + Type: String + Description: The S3 Event Config Destination Type S3EventConfigRoleArn: Type: String @@ -43,7 +47,8 @@ Resources: Environment: Variables: S3_SOURCE_BUCKET_NAME: !Ref S3SourceBucketName - S3_TO_GLUE_FUNCTION_ARN: !Ref S3ToGlueFunctionArn + S3_TO_GLUE_DESTINATION_ARN: !Ref S3ToGlueDestinationArn + S3_TO_GLUE_DESTINATION_TYPE: !Ref S3ToGlueDestinationType BUCKET_KEY_PREFIX: !Ref Namespace Outputs: diff --git a/tests/test_s3_event_config_lambda.py b/tests/test_s3_event_config_lambda.py index 5ccd5e7f..cbe1032e 100644 --- a/tests/test_s3_event_config_lambda.py +++ b/tests/test_s3_event_config_lambda.py @@ -1,14 +1,14 @@ import zipfile import io import boto3 -from moto import mock_s3, mock_lambda, mock_iam, mock_logs +from moto import mock_s3, mock_lambda, mock_iam, mock_sqs import pytest from src.lambda_function.s3_event_config import app @pytest.fixture(scope="function") -def mock_iam_role(mock_aws_credentials): +def mock_iam_role(): with mock_iam(): iam = boto3.client("iam") yield iam.create_role( @@ -19,7 +19,7 @@ def mock_iam_role(mock_aws_credentials): @pytest.fixture(scope="function") -def mock_lambda_function(mock_aws_credentials, mock_iam_role): +def mock_lambda_function(mock_iam_role): with mock_lambda(): client = boto3.client("lambda") client.create_function( @@ -31,11 +31,23 @@ def mock_lambda_function(mock_aws_credentials, mock_iam_role): yield client.get_function(FunctionName="some_function") +@pytest.fixture(scope="function") +def mock_sqs_queue(mock_aws_credentials): + with mock_sqs(): + client = boto3.client("sqs") + client.create_queue(QueueName="test_sqs") + queue_url = client.get_queue_url(QueueName="test_sqs") + yield client.get_queue_attributes( + QueueUrl=queue_url["QueueUrl"], AttributeNames=["QueueArn"] + ) + + @mock_s3 -def test_that_add_notification_adds_expected_settings(s3, mock_lambda_function): +def test_that_add_notification_adds_expected_settings_for_lambda(s3, mock_lambda_function): s3.create_bucket(Bucket="some_bucket") set_config = app.add_notification( s3, + "LambdaFunction", mock_lambda_function["Configuration"]["FunctionArn"], "some_bucket", "test_folder", @@ -49,15 +61,16 @@ def test_that_add_notification_adds_expected_settings(s3, mock_lambda_function): "s3:ObjectCreated:*" ] assert get_config["LambdaFunctionConfigurations"][0]["Filter"] == { - "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder"}]} + "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} } @mock_s3 -def test_that_delete_notification_is_successful(s3, mock_lambda_function): +def test_that_delete_notification_is_successful_for_lambda(s3, mock_lambda_function): s3.create_bucket(Bucket="some_bucket") app.add_notification( s3, + "LambdaFunction", mock_lambda_function["Configuration"]["FunctionArn"], "some_bucket", "test_folder", @@ -65,3 +78,41 @@ def test_that_delete_notification_is_successful(s3, mock_lambda_function): app.delete_notification(s3, "some_bucket") get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") assert "LambdaFunctionConfigurations" not in get_config + + +@mock_s3 +def test_that_add_notification_adds_expected_settings_for_sqs(s3, mock_sqs_queue): + s3.create_bucket(Bucket="some_bucket") + set_config = app.add_notification( + s3, + "Queue", + mock_sqs_queue['Attributes']['QueueArn'], + "some_bucket", + "test_folder", + ) + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + assert ( + get_config["QueueConfigurations"][0]["QueueArn"] + == mock_sqs_queue['Attributes']['QueueArn'] + ) + assert get_config["QueueConfigurations"][0]["Events"] == [ + "s3:ObjectCreated:*" + ] + assert get_config["QueueConfigurations"][0]["Filter"] == { + "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} + } + + +@mock_s3 +def test_that_delete_notification_is_successful_for_sqs(s3, mock_sqs_queue): + s3.create_bucket(Bucket="some_bucket") + app.add_notification( + s3, + "Queue", + mock_sqs_queue['Attributes']['QueueArn'], + "some_bucket", + "test_folder", + ) + app.delete_notification(s3, "some_bucket") + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + assert "QueueConfigurations" not in get_config