From 2861e2e56e296149945749e8baff0595267fce9b Mon Sep 17 00:00:00 2001 From: David Redmin Date: Thu, 23 May 2024 11:23:36 -0400 Subject: [PATCH] Prefer CamelCase for class names Co-authored-by: Nick <50747025+mcdonnnj@users.noreply.github.com> --- src/lambda_handler.py | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/src/lambda_handler.py b/src/lambda_handler.py index 1623b14..809473a 100644 --- a/src/lambda_handler.py +++ b/src/lambda_handler.py @@ -27,7 +27,7 @@ # Define some named tuples to make the code more readable -class aws_credentials(NamedTuple): +class AwsCredentials(NamedTuple): """Named tuple to hold AWS credentials.""" access_key_id: str @@ -35,14 +35,14 @@ class aws_credentials(NamedTuple): session_token: str -class ec2_info(NamedTuple): +class Ec2Info(NamedTuple): """Named tuple to hold EC2 information.""" application_tag_value: str public_ip: str -class event_validation(NamedTuple): +class EventValidation(NamedTuple): """Named tuple to hold event validation information.""" errors: List[str] @@ -69,7 +69,7 @@ class FileConfig(TypedDict): static_ips: List[str] -def assume_role(role_arn: str, session_name: str) -> aws_credentials: +def assume_role(role_arn: str, session_name: str) -> AwsCredentials: """Assume the given role and return a named tuple containing the assumed role's credentials.""" # Create an STS session with current credentials sts: boto3.client = boto3.client("sts") @@ -79,7 +79,7 @@ def assume_role(role_arn: str, session_name: str) -> aws_credentials: RoleArn=role_arn, RoleSessionName=session_name ) - return aws_credentials( + return AwsCredentials( response["Credentials"]["AccessKeyId"], response["Credentials"]["SecretAccessKey"], response["Credentials"]["SessionToken"], @@ -90,7 +90,7 @@ def create_assumed_aws_client( aws_service: str, role_arn: str, session_name: str ) -> boto3.client: """Assume the given role and return an AWS client for the given service using that role.""" - role_credentials: aws_credentials = assume_role(role_arn, session_name) + role_credentials: AwsCredentials = assume_role(role_arn, session_name) return boto3.client( aws_service, @@ -104,7 +104,7 @@ def create_assumed_aws_resource( aws_service: str, region: str, role_arn: str, session_name: str ) -> boto3.resource: """Assume the given role and return an AWS resource object for the given service using that role.""" - role_credentials: aws_credentials = assume_role(role_arn, session_name) + role_credentials: AwsCredentials = assume_role(role_arn, session_name) return boto3.resource( aws_service, @@ -127,7 +127,7 @@ def convert_tags(aws_resource: boto3.resource) -> Dict[str, str]: def get_ec2_ips( ec2: boto3.resource, application_tag_name: str, publish_egress_tag_name: str -) -> Iterator[ec2_info]: +) -> Iterator[Ec2Info]: """Create a set of public EC2 IPs. Yields (application tag value, public_ip) tuples. @@ -153,7 +153,7 @@ def get_ec2_ips( # Send back a tuple associating the public IP to an application. # If application is unset, return "", so that the IP can be included # in a list of all IPs if desired (e.g. using app_regex=".*"). - yield ec2_info(tags.get(application_tag_name, ""), instance.public_ip_address) + yield Ec2Info(tags.get(application_tag_name, ""), instance.public_ip_address) for vpc_address in vpc_addresses: # Convert elastic IP tags from an AWS dictionary into a Python dictionary @@ -165,7 +165,7 @@ def get_ec2_ips( # Send back a tuple associating the public IP to an application. # If application is unset, return "", so that the IP can be included # in a list of all IPs if desired (e.g. using app_regex=".*"). - yield ec2_info(eip_tags.get(application_tag_name, ""), vpc_address.public_ip) + yield Ec2Info(eip_tags.get(application_tag_name, ""), vpc_address.public_ip) def get_ec2_regions( @@ -222,7 +222,7 @@ def task_default(event): return result -def validate_event_data(event: Dict[str, Any]) -> event_validation: +def validate_event_data(event: Dict[str, Any]) -> EventValidation: """Validate the event data and return a tuple containing the validated event, a boolean result (True if valid, False if invalid), and a list of error message strings.""" result = True errors = [] @@ -306,7 +306,7 @@ def validate_event_data(event: Dict[str, Any]) -> event_validation: if errors: result = False - return event_validation(errors, event, result) + return EventValidation(errors, event, result) def task_publish(event: Dict[str, Any]) -> Dict[str, Union[Optional[str], bool]]: @@ -314,13 +314,13 @@ def task_publish(event: Dict[str, Any]) -> Dict[str, Union[Optional[str], bool]] result: Dict[str, Union[Optional[str], bool]] = {"message": None, "success": True} # Validate all event data before going any further - event_validation_info: event_validation = validate_event_data(event) - if not event_validation_info.valid: - for e in event_validation_info.errors: + EventValidation_info: EventValidation = validate_event_data(event) + if not EventValidation_info.valid: + for e in EventValidation_info.errors: logging.error(e) - failed_task(result, " ".join(event_validation_info.errors)) + failed_task(result, " ".join(EventValidation_info.errors)) return result - validated_event = event_validation_info.event + validated_event = EventValidation_info.event # The account IDs to examine for IP addresses account_ids: List[str] = validated_event["account_ids"] @@ -407,13 +407,13 @@ def task_publish(event: Dict[str, Any]) -> Dict[str, Union[Optional[str], bool]] ) # Get the public IPs of instances that are tagged to be published - for ec2_info in get_ec2_ips( + for Ec2Info in get_ec2_ips( ec2, application_tag_name, publish_egress_tag_name ): # Loop through all regexes and add IP to set if matched for config in file_configs: - if config["app_regex"].match(ec2_info.application_tag_value): - config["ip_set"].add(ip_network(ec2_info.public_ip)) + if config["app_regex"].match(Ec2Info.application_tag_value): + config["ip_set"].add(ip_network(Ec2Info.public_ip)) # Use a single timestamp for all files now = "{:%a %b %d %H:%M:%S UTC %Y}".format(datetime.utcnow())