From 479cc5d37e6abcab01f31e2c19825fe673ab0623 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Fri, 16 Aug 2024 21:09:03 +0000 Subject: [PATCH] Events: now supports the InputTemplate-parameter (#7992) --- .github/workflows/tests_real_aws.yml | 2 +- moto/events/models.py | 95 ++++++++----- moto/events/utils.py | 59 +++++++- moto/s3/notifications.py | 2 +- setup.cfg | 2 +- tests/test_events/test_events.py | 79 +++++++++-- .../test_events/test_events_cloudformation.py | 128 ++++++++++++++++++ 7 files changed, 318 insertions(+), 49 deletions(-) diff --git a/.github/workflows/tests_real_aws.yml b/.github/workflows/tests_real_aws.yml index 736b47a552d4..f3d204bbbe64 100644 --- a/.github/workflows/tests_real_aws.yml +++ b/.github/workflows/tests_real_aws.yml @@ -44,4 +44,4 @@ jobs: env: MOTO_TEST_ALLOW_AWS_REQUEST: ${{ true }} run: | - pytest -sv -n auto tests/test_applicationautoscaling/ tests/test_athena/ tests/test_cloudformation/ tests/test_dynamodb/ tests/test_ec2/ tests/test_iam/ tests/test_iot/ tests/test_lakeformation/ tests/test_logs/ tests/test_sqs/ tests/test_ses/ tests/test_s3* tests/test_stepfunctions/ tests/test_sns/ tests/test_timestreamwrite/ -m aws_verified + pytest -sv -n auto tests/test_applicationautoscaling/ tests/test_athena/ tests/test_cloudformation/ tests/test_dynamodb/ tests/test_ec2/ tests/test_events/ tests/test_iam/ tests/test_iot/ tests/test_lakeformation/ tests/test_logs/ tests/test_sqs/ tests/test_ses/ tests/test_s3* tests/test_stepfunctions/ tests/test_sns/ tests/test_timestreamwrite/ -m aws_verified diff --git a/moto/events/models.py b/moto/events/models.py index 9b43ba579ab7..dc54f52e813c 100644 --- a/moto/events/models.py +++ b/moto/events/models.py @@ -38,7 +38,12 @@ get_partition, ) -from .utils import _BASE_EVENT_MESSAGE, PAGINATION_MODEL, EventMessageType +from .utils import ( + _BASE_EVENT_MESSAGE, + PAGINATION_MODEL, + EventMessageType, + EventTemplateParser, +) if TYPE_CHECKING: from moto.secretsmanager.models import SecretsManagerBackend @@ -110,7 +115,8 @@ def disable(self) -> None: self.state = "DISABLED" def delete(self, account_id: str, region_name: str) -> None: - event_backend = events_backends[account_id][region_name] + event_backend: EventsBackend = events_backends[account_id][region_name] + self.remove_targets([t["Id"] for t in self.targets]) event_backend.delete_rule(name=self.name, event_bus_arn=self.event_bus_name) def put_targets(self, targets: List[Dict[str, Any]]) -> None: @@ -128,8 +134,10 @@ def remove_targets(self, ids: List[str]) -> None: if index is not None: self.targets.pop(index) - def send_to_targets(self, event: EventMessageType) -> None: - if not self.event_pattern.matches_event(event): + def send_to_targets( + self, original_event: EventMessageType, transform_input: bool = True + ) -> None: + if not self.event_pattern.matches_event(original_event): return # supported targets @@ -140,13 +148,22 @@ def send_to_targets(self, event: EventMessageType) -> None: for target in self.targets: arn = parse_arn(target["Arn"]) + if transform_input: + input_transformer = target.get("InputTransformer", {}) + event = EventTemplateParser.parse( + input_template=input_transformer.get("InputTemplate"), + input_paths_map=input_transformer.get("InputPathsMap", {}), + event=original_event, + ) + else: + event = original_event.copy() # type: ignore[assignment] + if arn.service == "logs" and arn.resource_type == "log-group": self._send_to_cw_log_group(arn.resource_id, event) elif arn.service == "events" and not arn.resource_type: - input_template = json.loads(target["InputTransformer"]["InputTemplate"]) - archive_arn = parse_arn(input_template["archive-arn"]) + archive_arn = parse_arn(event["archive-arn"]) - self._send_to_events_archive(archive_arn.resource_id, event) + self._send_to_events_archive(archive_arn.resource_id, original_event) elif arn.service == "sqs": group_id = target.get("SqsParameters", {}).get("MessageGroupId") self._send_to_sqs_queue(arn.resource_id, event, group_id) @@ -181,18 +198,15 @@ def send_to_targets(self, event: EventMessageType) -> None: else: raise NotImplementedError(f"Expr not defined for {type(self)}") - def _send_to_cw_log_group(self, name: str, event: EventMessageType) -> None: + def _send_to_cw_log_group(self, name: str, event: Dict[str, Any]) -> None: from moto.logs import logs_backends - event_copy = copy.deepcopy(event) - event_copy["time"] = iso_8601_datetime_without_milliseconds( - utcfromtimestamp(event_copy["time"]) # type: ignore[arg-type] + event["time"] = iso_8601_datetime_without_milliseconds( + utcfromtimestamp(event["time"]) # type: ignore[arg-type] ) log_stream_name = str(random.uuid4()) - log_events = [ - {"timestamp": unix_time_millis(), "message": json.dumps(event_copy)} - ] + log_events = [{"timestamp": unix_time_millis(), "message": json.dumps(event)}] log_backend = logs_backends[self.account_id][self.region_name] log_backend.create_log_stream(name, log_stream_name) @@ -214,13 +228,12 @@ def _find_api_destination(self, resource_id: str) -> "Destination": return backend.destinations[destination_name] def _send_to_sqs_queue( - self, resource_id: str, event: EventMessageType, group_id: Optional[str] = None + self, resource_id: str, event: Dict[str, Any], group_id: Optional[str] = None ) -> None: from moto.sqs import sqs_backends - event_copy = copy.deepcopy(event) - event_copy["time"] = iso_8601_datetime_without_milliseconds( - utcfromtimestamp(event_copy["time"]) # type: ignore[arg-type] + event["time"] = iso_8601_datetime_without_milliseconds( + utcfromtimestamp(float(event["time"])) # type: ignore[arg-type] ) if group_id: @@ -238,7 +251,7 @@ def _send_to_sqs_queue( sqs_backends[self.account_id][self.region_name].send_message( queue_name=resource_id, - message_body=json.dumps(event_copy), + message_body=json.dumps(event), group_id=group_id, ) @@ -288,8 +301,8 @@ def create_from_cloudformation_json( # type: ignore[misc] event_bus_arn = properties.get("EventBusName") tags = properties.get("Tags") - backend = events_backends[account_id][region_name] - return backend.put_rule( + backend: "EventsBackend" = events_backends[account_id][region_name] + rule = backend.put_rule( event_name, scheduled_expression=scheduled_expression, event_pattern=event_pattern, @@ -300,6 +313,16 @@ def create_from_cloudformation_json( # type: ignore[misc] tags=tags, ) + targets = properties.get("Targets", []) + if targets: + backend.put_targets( + name=rule.name, + event_bus_arn=event_bus_arn, + targets=targets, + ) + + return rule + @classmethod def update_from_cloudformation_json( # type: ignore[misc] cls, @@ -322,9 +345,10 @@ def delete_from_cloudformation_json( # type: ignore[misc] account_id: str, region_name: str, ) -> None: - event_backend = events_backends[account_id][region_name] + event_backend: EventsBackend = events_backends[account_id][region_name] properties = cloudformation_json["Properties"] event_bus_arn = properties.get("EventBusName") + event_backend.delete_rule(resource_name, event_bus_arn) def describe(self) -> Dict[str, Any]: @@ -378,6 +402,8 @@ def has_permissions(self) -> bool: def delete(self, account_id: str, region_name: str) -> None: event_backend = events_backends[account_id][region_name] + for rule in self.rules.values(): + rule.delete(account_id, region_name) event_backend.delete_event_bus(name=self.name) @classmethod @@ -576,6 +602,7 @@ def __init__( self.events: List[EventMessageType] = [] self.event_bus_name = source_arn.split("/")[-1] + self.rule: Optional[Rule] = None def describe_short(self) -> Dict[str, Any]: return { @@ -612,7 +639,11 @@ def update( self.retention = retention def delete(self, account_id: str, region_name: str) -> None: - event_backend = events_backends[account_id][region_name] + event_backend: EventsBackend = events_backends[account_id][region_name] + + if self.rule: + self.rule.delete(account_id, region_name) + event_backend.archives.pop(self.name) @classmethod @@ -756,6 +787,7 @@ def replay_events(self, archive: Archive) -> None: event, **{"id": str(random.uuid4()), "replay-name": self.name}, # type: ignore ), + transform_input=False, ) self.state = ReplayState.COMPLETED @@ -1148,7 +1180,11 @@ def _normalize_event_bus_arn(self, event_bus_arn: Optional[str]) -> str: def delete_rule(self, name: str, event_bus_arn: Optional[str]) -> None: event_bus_name = self._normalize_event_bus_arn(event_bus_arn) - event_bus = self._get_event_bus(event_bus_name) + try: + event_bus = self._get_event_bus(event_bus_name) + except ResourceNotFoundException: + # If the EventBus is deleted, the Rule is also gone + return rule = event_bus.rules.get(name) if not rule: return @@ -1350,6 +1386,7 @@ def put_events(self, events: List[Dict[str, Any]]) -> List[Dict[str, Any]]: event_msg["region"] = self.region_name event_msg["resources"] = event.get("Resources", []) event_msg["detail"] = json.loads(event["Detail"]) + event_msg["ingestion-time"] = unix_time() rule.send_to_targets(event_msg) return entries @@ -1610,6 +1647,7 @@ def create_archive( event_bus_arn=event_bus.name, managed_by="prod.vhs.events.aws.internal", ) + archive.rule = rule self.put_targets( rule.name, rule.event_bus_name, @@ -1619,13 +1657,8 @@ def create_archive( "Arn": f"arn:{get_partition(self.region_name)}:events:{self.region_name}:::", "InputTransformer": { "InputPathsMap": {}, - "InputTemplate": json.dumps( - { - "archive-arn": f"{archive.arn}:{archive.uuid}", - "event": "", - "ingestion-time": "", - } - ), + # Template is not valid JSON, so we can't json.dump it + "InputTemplate": f'{{"archive-arn": "{archive.arn}:{archive.uuid}", "event": , "ingestion-time": }}', }, } ], diff --git a/moto/events/utils.py b/moto/events/utils.py index e84c68548fb3..6c4640f2997f 100644 --- a/moto/events/utils.py +++ b/moto/events/utils.py @@ -1,4 +1,5 @@ -from typing import TYPE_CHECKING, List, TypedDict +import json +from typing import TYPE_CHECKING, Any, Dict, List, TypedDict if TYPE_CHECKING: from typing_extensions import Any, Dict, Required, Union @@ -19,6 +20,7 @@ "region": str, "resources": List[str], "detail": "Required[Dict[str, Any]]", + "ingestion-time": float, }, total=False, ) @@ -58,3 +60,58 @@ "resources": [], "detail": {}, } + + +class EventTemplateParser: + DEFAULT_EVENT_INPUT_TEMPLATE = '{"id": "", "time":