Skip to content

Commit

Permalink
Update to account for destination ARN
Browse files Browse the repository at this point in the history
  • Loading branch information
BryanFauble committed Oct 20, 2023
1 parent 8d41535 commit c3afdef
Show file tree
Hide file tree
Showing 2 changed files with 311 additions and 122 deletions.
238 changes: 178 additions & 60 deletions src/lambda_function/s3_event_config/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
input data S3 bucket set by the environment variable `S3_SOURCE_BUCKET_NAME`.
This Lambda app also has the option of deleting the notification configuration
from an S3 bucket
from an S3 bucket.
Only certain notification configurations work. Only `QueueConfigurations` are expected.
"""
import os
import json
import logging
import typing

import boto3

Expand Down Expand Up @@ -44,25 +47,111 @@ def lambda_handler(event, context):
raise KeyError(err_msg)


def get_existing_bucket_notification_configuration_and_type(
s3_client: boto3.client, bucket: str, destination_type: str
) -> typing.Tuple[dict, list]:
"""
Gets the existing bucket notification configuration and the existing notification
configurations for a specific destination type.
Arguments:
s3_client (boto3.client) : s3 client to use for s3 event config
bucket (str): bucket name of the s3 bucket to delete the config in
destination_type (str): String name of the destination type for the configuration
Returns:
Tuple: A bucket notifiction configuration,
and the notification configurations for a specific destination type
"""
existing_bucket_notification_configuration = (
s3_client.get_bucket_notification_configuration(Bucket=bucket)
)

# Remove ResponseMetadata because we don't want to persist it
existing_bucket_notification_configuration.pop("ResponseMetadata", None)

existing_notification_configurations_for_type = (
existing_bucket_notification_configuration.get(
f"{destination_type}Configurations", []
)
)

# Initialize this with an empty list to have consistent handling if it's present
# or missing.
if not existing_notification_configurations_for_type:
existing_bucket_notification_configuration[
f"{destination_type}Configurations"
] = []

return (
existing_bucket_notification_configuration,
existing_notification_configurations_for_type,
)


def get_matching_notification_configuration(
destination_type_arn: str,
existing_notification_configurations_for_type: list,
destination_arn: str,
) -> typing.Union[tuple[int, dict], tuple[None, None]]:
"""
Search through the list of existing notifications and find the one that has a key of
`destination_type_arn` and a value of `destination_arn`.
Arguments:
destination_type_arn (str): Key value for the destination type arn
existing_notification_configurations_for_type (list): The existing notification configs
destination_arn (str): Arn of the destination's s3 event config
Returns:
tuple: The index of the matching notification configuration and the matching
notification configuration or None, None if no match is found
"""
for index, existing_notification_configuration_for_type in enumerate(
existing_notification_configurations_for_type
):
if (
destination_type_arn in existing_notification_configuration_for_type
and existing_notification_configuration_for_type[destination_type_arn]
== destination_arn
):
return index, existing_notification_configuration_for_type
return None, None


def create_formatted_message(
bucket: str, destination_type: str, destination_arn: str
) -> str:
"""Creates a formatted message for logging purposes.
Arguments:
bucket (str): bucket name of the s3 bucket
destination_type (str): String name of the destination type for the configuration
destination_arn (str): Arn of the destination's s3 event config
Returns:
str: A formatted message
"""
return f"Bucket: {bucket}, DestinationType: {destination_type}, DestinationArn: {destination_arn}"


def add_notification(
s3_client: boto3.client,
destination_type: str,
destination_arn: str,
bucket: str,
bucket_key_prefix: str,
):
) -> None:
"""Adds the S3 notification configuration to an existing bucket.
Use cases:
1) If a bucket has no `NotificationConfiguration` then create the config
2) If a bucket has a `NotificationConfiguration` but no matching
"{destination_type}Configurations" then merge and add the config
"{destination_arn}" for the "{destination_type}" then add the config
3) If a bucket has a `NotificationConfiguration` and a matching
"{destination_type}Configurations":
3a) If the config is the same then do nothing - ordering of the dict
does not matter
3b) If the config is different then overwrite the matching
"{destination_type}Configurations"
"{destination_arn}" for the "{destination_type}":
3a) If the config is the same then do nothing
3b) If the config is different then overwrite the matching config
Args:
s3_client (boto3.client) : s3 client to use for s3 event config
Expand All @@ -71,88 +160,117 @@ def add_notification(
bucket (str): bucket name of the s3 bucket to add the config to
bucket_key_prefix (str): bucket key prefix for where to look for s3 object notifications
"""
existing_bucket_notification_configuration = (
s3_client.get_bucket_notification_configuration(Bucket=bucket)
)
existing_bucket_notification_configuration.pop("ResponseMetadata", None)
existing_notification_config_for_type = (
existing_bucket_notification_configuration.get(
f"{destination_type}Configurations", {}
)
)

new_notification_config = {
f"{destination_type}Configurations": [
{
f"{destination_type}Arn": destination_arn,
"Events": ["s3:ObjectCreated:*"],
"Filter": {
"Key": {
"FilterRules": [
{"Name": "prefix", "Value": f"{bucket_key_prefix}/"}
]
}
},
update_required = False
destination_type_arn = f"{destination_type}Arn"
new_notification_configuration = {
destination_type_arn: destination_arn,
"Events": ["s3:ObjectCreated:*"],
"Filter": {
"Key": {
"FilterRules": [{"Name": "prefix", "Value": f"{bucket_key_prefix}/"}]
}
]
},
}

# If the configuration we want to add isn't there
# or is different from what we want to update it with
if not existing_notification_config_for_type or json.dumps(
existing_notification_config_for_type, sort_keys=True
) != json.dumps(
new_notification_config[f"{destination_type}Configurations"], sort_keys=True
):
merged_config = {
**existing_bucket_notification_configuration,
**new_notification_config,
}
(
existing_bucket_notification_configuration,
existing_notification_configurations_for_type,
) = get_existing_bucket_notification_configuration_and_type(
s3_client, bucket, destination_type
)

(
index_of_matching_arn,
matching_notification_configuration,
) = get_matching_notification_configuration(
destination_type_arn,
existing_notification_configurations_for_type,
destination_arn,
)

if index_of_matching_arn is not None:
if json.dumps(
matching_notification_configuration, sort_keys=True
) != json.dumps(new_notification_configuration, sort_keys=True):
existing_notification_configurations_for_type[
index_of_matching_arn
] = new_notification_configuration
update_required = True
else:
existing_notification_configurations_for_type.append(
new_notification_configuration
)
update_required = True

if update_required:
existing_bucket_notification_configuration[
f"{destination_type}Configurations"
] = existing_notification_configurations_for_type
s3_client.put_bucket_notification_configuration(
Bucket=bucket,
NotificationConfiguration=merged_config,
NotificationConfiguration=existing_bucket_notification_configuration,
)
logger.info(
f"Put request completed to add a \
NotificationConfiguration for `{destination_type}Configurations`."
f"Put request completed to add a NotificationConfiguration for"
+ create_formatted_message(bucket, destination_type, destination_arn)
)
else:
logger.info(
f"Put not required as an existing NotificationConfiguration \
for `{destination_type}Configurations` already exists."
f"Put not required as an existing NotificationConfiguration already exists for"
+ create_formatted_message(bucket, destination_type, destination_arn)
)

def delete_notification(s3_client: boto3.client, bucket: str, destination_type: str):

def delete_notification(
s3_client: boto3.client, bucket: str, destination_type: str, destination_arn: str
) -> None:
"""Deletes the S3 notification configuration from an existing bucket for a specific destination type.
Args:
s3_client (boto3.client) : s3 client to use for s3 event config
bucket (str): bucket name of the s3 bucket to delete the config in
destination_type (str): String name of the destination type for the configuration
"""
existing_bucket_notification_configuration = (
s3_client.get_bucket_notification_configuration(Bucket=bucket)
destination_type_arn = f"{destination_type}Arn"

(
existing_bucket_notification_configuration,
existing_notification_configurations_for_type,
) = get_existing_bucket_notification_configuration_and_type(
s3_client, bucket, destination_type
)
existing_bucket_notification_configuration.pop("ResponseMetadata", None)
configuration_name = f"{destination_type}Configurations"

existing_notification_config_for_type = (
existing_bucket_notification_configuration.get(configuration_name, {})
(
index_of_matching_arn,
matching_notification_confiugration,
) = get_matching_notification_configuration(
destination_type_arn,
existing_notification_configurations_for_type,
destination_arn,
)

if existing_notification_config_for_type:
del existing_bucket_notification_configuration[configuration_name]
if index_of_matching_arn is not None:
del existing_notification_configurations_for_type[index_of_matching_arn]

if existing_notification_configurations_for_type:
existing_bucket_notification_configuration[
f"{destination_type}Configurations"
] = existing_notification_configurations_for_type
else:
del existing_bucket_notification_configuration[
f"{destination_type}Configurations"
]

s3_client.put_bucket_notification_configuration(
Bucket=bucket,
NotificationConfiguration=existing_bucket_notification_configuration,
)
logger.info(
f"Delete request completed to remove a \
NotificationConfiguration for `{destination_type}Configurations`."
f"Delete request completed to remove a NotificationConfiguration for"
+ create_formatted_message(bucket, destination_type, destination_arn)
)
else:
logger.info(
f"Delete not required as no \
NotificationConfiguration exists for `{destination_type}Configurations`."
f"Delete not required as no NotificationConfiguration exists for"
+ create_formatted_message(bucket, destination_type, destination_arn)
)
Loading

0 comments on commit c3afdef

Please sign in to comment.