Skip to content

Commit

Permalink
Events: now supports the InputTemplate-parameter (#7992)
Browse files Browse the repository at this point in the history
  • Loading branch information
bblommers authored Aug 16, 2024
1 parent 88405fc commit 479cc5d
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests_real_aws.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ jobs:
env:
MOTO_TEST_ALLOW_AWS_REQUEST: ${{ true }}
run: |
pytest -sv -n auto tests/test_applicationautoscaling/ tests/test_athena/ tests/test_cloudformation/ tests/test_dynamodb/ tests/test_ec2/ tests/test_iam/ tests/test_iot/ tests/test_lakeformation/ tests/test_logs/ tests/test_sqs/ tests/test_ses/ tests/test_s3* tests/test_stepfunctions/ tests/test_sns/ tests/test_timestreamwrite/ -m aws_verified
pytest -sv -n auto tests/test_applicationautoscaling/ tests/test_athena/ tests/test_cloudformation/ tests/test_dynamodb/ tests/test_ec2/ tests/test_events/ tests/test_iam/ tests/test_iot/ tests/test_lakeformation/ tests/test_logs/ tests/test_sqs/ tests/test_ses/ tests/test_s3* tests/test_stepfunctions/ tests/test_sns/ tests/test_timestreamwrite/ -m aws_verified
95 changes: 64 additions & 31 deletions moto/events/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@
get_partition,
)

from .utils import _BASE_EVENT_MESSAGE, PAGINATION_MODEL, EventMessageType
from .utils import (
_BASE_EVENT_MESSAGE,
PAGINATION_MODEL,
EventMessageType,
EventTemplateParser,
)

if TYPE_CHECKING:
from moto.secretsmanager.models import SecretsManagerBackend
Expand Down Expand Up @@ -110,7 +115,8 @@ def disable(self) -> None:
self.state = "DISABLED"

def delete(self, account_id: str, region_name: str) -> None:
event_backend = events_backends[account_id][region_name]
event_backend: EventsBackend = events_backends[account_id][region_name]
self.remove_targets([t["Id"] for t in self.targets])
event_backend.delete_rule(name=self.name, event_bus_arn=self.event_bus_name)

def put_targets(self, targets: List[Dict[str, Any]]) -> None:
Expand All @@ -128,8 +134,10 @@ def remove_targets(self, ids: List[str]) -> None:
if index is not None:
self.targets.pop(index)

def send_to_targets(self, event: EventMessageType) -> None:
if not self.event_pattern.matches_event(event):
def send_to_targets(
self, original_event: EventMessageType, transform_input: bool = True
) -> None:
if not self.event_pattern.matches_event(original_event):
return

# supported targets
Expand All @@ -140,13 +148,22 @@ def send_to_targets(self, event: EventMessageType) -> None:
for target in self.targets:
arn = parse_arn(target["Arn"])

if transform_input:
input_transformer = target.get("InputTransformer", {})
event = EventTemplateParser.parse(
input_template=input_transformer.get("InputTemplate"),
input_paths_map=input_transformer.get("InputPathsMap", {}),
event=original_event,
)
else:
event = original_event.copy() # type: ignore[assignment]

if arn.service == "logs" and arn.resource_type == "log-group":
self._send_to_cw_log_group(arn.resource_id, event)
elif arn.service == "events" and not arn.resource_type:
input_template = json.loads(target["InputTransformer"]["InputTemplate"])
archive_arn = parse_arn(input_template["archive-arn"])
archive_arn = parse_arn(event["archive-arn"])

self._send_to_events_archive(archive_arn.resource_id, event)
self._send_to_events_archive(archive_arn.resource_id, original_event)
elif arn.service == "sqs":
group_id = target.get("SqsParameters", {}).get("MessageGroupId")
self._send_to_sqs_queue(arn.resource_id, event, group_id)
Expand Down Expand Up @@ -181,18 +198,15 @@ def send_to_targets(self, event: EventMessageType) -> None:
else:
raise NotImplementedError(f"Expr not defined for {type(self)}")

def _send_to_cw_log_group(self, name: str, event: EventMessageType) -> None:
def _send_to_cw_log_group(self, name: str, event: Dict[str, Any]) -> None:
from moto.logs import logs_backends

event_copy = copy.deepcopy(event)
event_copy["time"] = iso_8601_datetime_without_milliseconds(
utcfromtimestamp(event_copy["time"]) # type: ignore[arg-type]
event["time"] = iso_8601_datetime_without_milliseconds(
utcfromtimestamp(event["time"]) # type: ignore[arg-type]
)

log_stream_name = str(random.uuid4())
log_events = [
{"timestamp": unix_time_millis(), "message": json.dumps(event_copy)}
]
log_events = [{"timestamp": unix_time_millis(), "message": json.dumps(event)}]

log_backend = logs_backends[self.account_id][self.region_name]
log_backend.create_log_stream(name, log_stream_name)
Expand All @@ -214,13 +228,12 @@ def _find_api_destination(self, resource_id: str) -> "Destination":
return backend.destinations[destination_name]

def _send_to_sqs_queue(
self, resource_id: str, event: EventMessageType, group_id: Optional[str] = None
self, resource_id: str, event: Dict[str, Any], group_id: Optional[str] = None
) -> None:
from moto.sqs import sqs_backends

event_copy = copy.deepcopy(event)
event_copy["time"] = iso_8601_datetime_without_milliseconds(
utcfromtimestamp(event_copy["time"]) # type: ignore[arg-type]
event["time"] = iso_8601_datetime_without_milliseconds(
utcfromtimestamp(float(event["time"])) # type: ignore[arg-type]
)

if group_id:
Expand All @@ -238,7 +251,7 @@ def _send_to_sqs_queue(

sqs_backends[self.account_id][self.region_name].send_message(
queue_name=resource_id,
message_body=json.dumps(event_copy),
message_body=json.dumps(event),
group_id=group_id,
)

Expand Down Expand Up @@ -288,8 +301,8 @@ def create_from_cloudformation_json( # type: ignore[misc]
event_bus_arn = properties.get("EventBusName")
tags = properties.get("Tags")

backend = events_backends[account_id][region_name]
return backend.put_rule(
backend: "EventsBackend" = events_backends[account_id][region_name]
rule = backend.put_rule(
event_name,
scheduled_expression=scheduled_expression,
event_pattern=event_pattern,
Expand All @@ -300,6 +313,16 @@ def create_from_cloudformation_json( # type: ignore[misc]
tags=tags,
)

targets = properties.get("Targets", [])
if targets:
backend.put_targets(
name=rule.name,
event_bus_arn=event_bus_arn,
targets=targets,
)

return rule

@classmethod
def update_from_cloudformation_json( # type: ignore[misc]
cls,
Expand All @@ -322,9 +345,10 @@ def delete_from_cloudformation_json( # type: ignore[misc]
account_id: str,
region_name: str,
) -> None:
event_backend = events_backends[account_id][region_name]
event_backend: EventsBackend = events_backends[account_id][region_name]
properties = cloudformation_json["Properties"]
event_bus_arn = properties.get("EventBusName")

event_backend.delete_rule(resource_name, event_bus_arn)

def describe(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -378,6 +402,8 @@ def has_permissions(self) -> bool:

def delete(self, account_id: str, region_name: str) -> None:
event_backend = events_backends[account_id][region_name]
for rule in self.rules.values():
rule.delete(account_id, region_name)
event_backend.delete_event_bus(name=self.name)

@classmethod
Expand Down Expand Up @@ -576,6 +602,7 @@ def __init__(

self.events: List[EventMessageType] = []
self.event_bus_name = source_arn.split("/")[-1]
self.rule: Optional[Rule] = None

def describe_short(self) -> Dict[str, Any]:
return {
Expand Down Expand Up @@ -612,7 +639,11 @@ def update(
self.retention = retention

def delete(self, account_id: str, region_name: str) -> None:
event_backend = events_backends[account_id][region_name]
event_backend: EventsBackend = events_backends[account_id][region_name]

if self.rule:
self.rule.delete(account_id, region_name)

event_backend.archives.pop(self.name)

@classmethod
Expand Down Expand Up @@ -756,6 +787,7 @@ def replay_events(self, archive: Archive) -> None:
event,
**{"id": str(random.uuid4()), "replay-name": self.name}, # type: ignore
),
transform_input=False,
)

self.state = ReplayState.COMPLETED
Expand Down Expand Up @@ -1148,7 +1180,11 @@ def _normalize_event_bus_arn(self, event_bus_arn: Optional[str]) -> str:

def delete_rule(self, name: str, event_bus_arn: Optional[str]) -> None:
event_bus_name = self._normalize_event_bus_arn(event_bus_arn)
event_bus = self._get_event_bus(event_bus_name)
try:
event_bus = self._get_event_bus(event_bus_name)
except ResourceNotFoundException:
# If the EventBus is deleted, the Rule is also gone
return
rule = event_bus.rules.get(name)
if not rule:
return
Expand Down Expand Up @@ -1350,6 +1386,7 @@ def put_events(self, events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
event_msg["region"] = self.region_name
event_msg["resources"] = event.get("Resources", [])
event_msg["detail"] = json.loads(event["Detail"])
event_msg["ingestion-time"] = unix_time()
rule.send_to_targets(event_msg)

return entries
Expand Down Expand Up @@ -1610,6 +1647,7 @@ def create_archive(
event_bus_arn=event_bus.name,
managed_by="prod.vhs.events.aws.internal",
)
archive.rule = rule
self.put_targets(
rule.name,
rule.event_bus_name,
Expand All @@ -1619,13 +1657,8 @@ def create_archive(
"Arn": f"arn:{get_partition(self.region_name)}:events:{self.region_name}:::",
"InputTransformer": {
"InputPathsMap": {},
"InputTemplate": json.dumps(
{
"archive-arn": f"{archive.arn}:{archive.uuid}",
"event": "<aws.events.event.json>",
"ingestion-time": "<aws.events.event.ingestion-time>",
}
),
# Template is not valid JSON, so we can't json.dump it
"InputTemplate": f'{{"archive-arn": "{archive.arn}:{archive.uuid}", "event": <aws.events.event.json>, "ingestion-time": <aws.events.event.ingestion-time>}}',
},
}
],
Expand Down
59 changes: 58 additions & 1 deletion moto/events/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import TYPE_CHECKING, List, TypedDict
import json
from typing import TYPE_CHECKING, Any, Dict, List, TypedDict

if TYPE_CHECKING:
from typing_extensions import Any, Dict, Required, Union
Expand All @@ -19,6 +20,7 @@
"region": str,
"resources": List[str],
"detail": "Required[Dict[str, Any]]",
"ingestion-time": float,
},
total=False,
)
Expand Down Expand Up @@ -58,3 +60,58 @@
"resources": [],
"detail": {},
}


class EventTemplateParser:
DEFAULT_EVENT_INPUT_TEMPLATE = '{"id": "<id>", "time": <time>, "version": "0", "detail-type": "<detail-type>", "source": "<source>", "region": "<region>", "resources": <resources>, "detail": <detail>}'
DEFAULT_EVENT_INPUT_PATHS_MAP = {
"account": "$.account",
"detail": "$.detail",
"detail-type": "$.detail-type",
"source": "$.source",
"id": "$.id",
"region": "$.region",
"resources": "$.resources",
"time": "$.time",
}

@staticmethod
def _stringify(result: Any) -> str: # type: ignore[misc]
if isinstance(result, dict):
result = json.dumps(result)
elif isinstance(result, list):
result = json.dumps([EventTemplateParser._stringify(x) for x in result])
elif isinstance(result, (int, float)):
result = str(result)
return result

@staticmethod
def parse( # type: ignore[misc]
input_template: str, input_paths_map: Dict[str, Any], event: EventMessageType
) -> Dict[str, Any]:
from jsonpath_ng.ext import parse

template_to_use = (
input_template or EventTemplateParser.DEFAULT_EVENT_INPUT_TEMPLATE
)
input_paths_map = (
input_paths_map or EventTemplateParser.DEFAULT_EVENT_INPUT_PATHS_MAP
)
for input_path in input_paths_map:
input_expr = parse(input_paths_map[input_path])
matches = input_expr.find(event)
result = (
EventTemplateParser._stringify(matches[0].value) if matches else None
)
if result:
template_to_use = template_to_use.replace(f"<{input_path}>", result)

default_inputs_map = {
"aws.events.event.json": event,
"aws.events.event.ingestion-time": event["ingestion-time"],
}
for input_path in default_inputs_map:
result = EventTemplateParser._stringify(default_inputs_map[input_path])
template_to_use = template_to_use.replace(f"<{input_path}>", result)

return json.loads(template_to_use)
2 changes: 1 addition & 1 deletion moto/s3/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def _send_event_bridge_message(
events_backend = events_backends[account_id][bucket.region_name]
for event_bus in events_backend.event_buses.values():
for rule in event_bus.rules.values():
rule.send_to_targets(event)
rule.send_to_targets(event, transform_input=False)

except: # noqa
# This is an async action in AWS.
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ emr =
emrcontainers =
emrserverless =
es =
events =
events = jsonpath_ng
firehose =
forecast =
glacier =
Expand Down
Loading

0 comments on commit 479cc5d

Please sign in to comment.