Skip to content

Commit 1bcd4ff

Browse files
Michael Brewerheitorlessa
Michael Brewer
andauthored
feat(data-classes): ActiveMQ and RabbitMQ support (#770)
Co-authored-by: heitorlessa <lessa@amazon.co.uk>
1 parent 8b01fc5 commit 1bcd4ff

File tree

6 files changed

+465
-0
lines changed

6 files changed

+465
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import base64
2+
import json
3+
from typing import Any, Iterator, Optional
4+
5+
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
6+
7+
8+
class ActiveMQMessage(DictWrapper):
9+
@property
10+
def message_id(self) -> str:
11+
"""Unique identifier for the message"""
12+
return self["messageID"]
13+
14+
@property
15+
def message_type(self) -> str:
16+
return self["messageType"]
17+
18+
@property
19+
def data(self) -> str:
20+
return self["data"]
21+
22+
@property
23+
def decoded_data(self) -> str:
24+
"""Decodes the data as a str"""
25+
return base64.b64decode(self.data.encode()).decode()
26+
27+
@property
28+
def json_data(self) -> Any:
29+
"""Parses the data as json"""
30+
return json.loads(self.decoded_data)
31+
32+
@property
33+
def connection_id(self) -> str:
34+
return self["connectionId"]
35+
36+
@property
37+
def redelivered(self) -> bool:
38+
"""true if the message is being resent to the consumer"""
39+
return self["redelivered"]
40+
41+
@property
42+
def timestamp(self) -> int:
43+
"""Time in milliseconds."""
44+
return self["timestamp"]
45+
46+
@property
47+
def broker_in_time(self) -> int:
48+
"""Time stamp (in milliseconds) for when the message arrived at the broker."""
49+
return self["brokerInTime"]
50+
51+
@property
52+
def broker_out_time(self) -> int:
53+
"""Time stamp (in milliseconds) for when the message left the broker."""
54+
return self["brokerOutTime"]
55+
56+
@property
57+
def destination_physicalname(self) -> str:
58+
return self["destination"]["physicalname"]
59+
60+
@property
61+
def delivery_mode(self) -> Optional[int]:
62+
"""persistent or non-persistent delivery"""
63+
return self.get("deliveryMode")
64+
65+
@property
66+
def correlation_id(self) -> Optional[str]:
67+
"""User defined correlation id"""
68+
return self.get("correlationID")
69+
70+
@property
71+
def reply_to(self) -> Optional[str]:
72+
"""User defined reply to"""
73+
return self.get("replyTo")
74+
75+
@property
76+
def get_type(self) -> Optional[str]:
77+
"""User defined message type"""
78+
return self.get("type")
79+
80+
@property
81+
def expiration(self) -> Optional[int]:
82+
"""Expiration attribute whose value is given in milliseconds"""
83+
return self.get("expiration")
84+
85+
@property
86+
def priority(self) -> Optional[int]:
87+
"""
88+
JMS defines a ten-level priority value, with 0 as the lowest priority and 9
89+
as the highest. In addition, clients should consider priorities 0-4 as
90+
gradations of normal priority and priorities 5-9 as gradations of expedited
91+
priority.
92+
93+
JMS does not require that a provider strictly implement priority ordering
94+
of messages; however, it should do its best to deliver expedited messages
95+
ahead of normal messages.
96+
"""
97+
return self.get("priority")
98+
99+
100+
class ActiveMQEvent(DictWrapper):
101+
"""Represents an Active MQ event sent to Lambda
102+
103+
Documentation:
104+
--------------
105+
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
106+
- https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/
107+
"""
108+
109+
@property
110+
def event_source(self) -> str:
111+
return self["eventSource"]
112+
113+
@property
114+
def event_source_arn(self) -> str:
115+
"""The Amazon Resource Name (ARN) of the event source"""
116+
return self["eventSourceArn"]
117+
118+
@property
119+
def messages(self) -> Iterator[ActiveMQMessage]:
120+
for record in self["messages"]:
121+
yield ActiveMQMessage(record)
122+
123+
@property
124+
def message(self) -> ActiveMQMessage:
125+
return next(self.messages)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import base64
2+
import json
3+
from typing import Any, Dict, List
4+
5+
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
6+
7+
8+
class BasicProperties(DictWrapper):
9+
@property
10+
def content_type(self) -> str:
11+
return self["contentType"]
12+
13+
@property
14+
def content_encoding(self) -> str:
15+
return self["contentEncoding"]
16+
17+
@property
18+
def headers(self) -> Dict[str, Any]:
19+
return self["headers"]
20+
21+
@property
22+
def delivery_mode(self) -> int:
23+
return self["deliveryMode"]
24+
25+
@property
26+
def priority(self) -> int:
27+
return self["priority"]
28+
29+
@property
30+
def correlation_id(self) -> str:
31+
return self["correlationId"]
32+
33+
@property
34+
def reply_to(self) -> str:
35+
return self["replyTo"]
36+
37+
@property
38+
def expiration(self) -> str:
39+
return self["expiration"]
40+
41+
@property
42+
def message_id(self) -> str:
43+
return self["messageId"]
44+
45+
@property
46+
def timestamp(self) -> str:
47+
return self["timestamp"]
48+
49+
@property
50+
def get_type(self) -> str:
51+
return self["type"]
52+
53+
@property
54+
def user_id(self) -> str:
55+
return self["userId"]
56+
57+
@property
58+
def app_id(self) -> str:
59+
return self["appId"]
60+
61+
@property
62+
def cluster_id(self) -> str:
63+
return self["clusterId"]
64+
65+
@property
66+
def body_size(self) -> int:
67+
return self["bodySize"]
68+
69+
70+
class RabbitMessage(DictWrapper):
71+
@property
72+
def basic_properties(self) -> BasicProperties:
73+
return BasicProperties(self["basicProperties"])
74+
75+
@property
76+
def redelivered(self) -> bool:
77+
return self["redelivered"]
78+
79+
@property
80+
def data(self) -> str:
81+
return self["data"]
82+
83+
@property
84+
def decoded_data(self) -> str:
85+
"""Decodes the data as a str"""
86+
return base64.b64decode(self.data.encode()).decode()
87+
88+
@property
89+
def json_data(self) -> Any:
90+
"""Parses the data as json"""
91+
return json.loads(self.decoded_data)
92+
93+
94+
class RabbitMQEvent(DictWrapper):
95+
"""Represents a Rabbit MQ event sent to Lambda
96+
97+
Documentation:
98+
--------------
99+
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
100+
- https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/
101+
"""
102+
103+
def __init__(self, data: Dict[str, Any]):
104+
super().__init__(data)
105+
self._rmq_messages_by_queue = {
106+
key: [RabbitMessage(message) for message in messages]
107+
for key, messages in self["rmqMessagesByQueue"].items()
108+
}
109+
110+
@property
111+
def event_source(self) -> str:
112+
return self["eventSource"]
113+
114+
@property
115+
def event_source_arn(self) -> str:
116+
"""The Amazon Resource Name (ARN) of the event source"""
117+
return self["eventSourceArn"]
118+
119+
@property
120+
def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]:
121+
return self._rmq_messages_by_queue

docs/utilities/data_classes.md

+54
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ Same example as above, but using the `event_source` decorator
5858

5959
Event Source | Data_class
6060
------------------------------------------------- | ---------------------------------------------------------------------------------
61+
[Active MQ](#active-mq) | `ActiveMQEvent`
6162
[API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent`
6263
[API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2`
6364
[API Gateway Proxy](#api-gateway-proxy) | `APIGatewayProxyEvent`
@@ -72,6 +73,7 @@ Event Source | Data_class
7273
[DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName`
7374
[EventBridge](#eventbridge) | `EventBridgeEvent`
7475
[Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent`
76+
[Rabbit MQ](#rabbit-mq) | `RabbitMQEvent`
7577
[S3](#s3) | `S3Event`
7678
[S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent`
7779
[SES](#ses) | `SESEvent`
@@ -82,6 +84,31 @@ Event Source | Data_class
8284
The examples provided below are far from exhaustive - the data classes themselves are designed to provide a form of
8385
documentation inherently (via autocompletion, types and docstrings).
8486

87+
### Active MQ
88+
89+
It is used for [Active MQ payloads](https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html){target="_blank"}, also see
90+
the [AWS blog post](https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/){target="_blank"}
91+
for more details.
92+
93+
=== "app.py"
94+
95+
```python hl_lines="4-5 9-10"
96+
from typing import Dict
97+
98+
from aws_lambda_powertools import Logger
99+
from aws_lambda_powertools.utilities.data_classes import event_source
100+
from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent
101+
102+
logger = Logger()
103+
104+
@event_source(data_class=ActiveMQEvent)
105+
def lambda_handler(event: ActiveMQEvent, context):
106+
for message in event.messages:
107+
logger.debug(f"MessageID: {message.message_id}")
108+
data: Dict = message.json_data
109+
logger.debug("Process json in base64 encoded data str", data)
110+
```
111+
85112
### API Gateway Authorizer
86113

87114
> New in 1.20.0
@@ -810,6 +837,33 @@ or plain text, depending on the original payload.
810837
do_something_with(data)
811838
```
812839

840+
### Rabbit MQ
841+
842+
It is used for [Rabbit MQ payloads](https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html){target="_blank"}, also see
843+
the [blog post](https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/){target="_blank"}
844+
for more details.
845+
846+
=== "app.py"
847+
848+
```python hl_lines="4-5 9-10"
849+
from typing import Dict
850+
851+
from aws_lambda_powertools import Logger
852+
from aws_lambda_powertools.utilities.data_classes import event_source
853+
from aws_lambda_powertools.utilities.data_classes.rabbit_mq_event import RabbitMQEvent
854+
855+
logger = Logger()
856+
857+
@event_source(data_class=RabbitMQEvent)
858+
def lambda_handler(event: RabbitMQEvent, context):
859+
for queue_name, messages in event.rmq_messages_by_queue.items():
860+
logger.debug(f"Messages for queue: {queue_name}")
861+
for message in messages:
862+
logger.debug(f"MessageID: {message.basic_properties.message_id}")
863+
data: Dict = message.json_data
864+
logger.debug("Process json in base64 encoded data str", data)
865+
```
866+
813867
### S3
814868

815869
=== "app.py"

tests/events/activeMQEvent.json

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
{
2+
"eventSource": "aws:amq",
3+
"eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",
4+
"messages": [
5+
{
6+
"messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
7+
"messageType": "jms/text-message",
8+
"data": "QUJDOkFBQUE=",
9+
"connectionId": "myJMSCoID",
10+
"redelivered": false,
11+
"destination": {
12+
"physicalname": "testQueue"
13+
},
14+
"timestamp": 1598827811958,
15+
"brokerInTime": 1598827811958,
16+
"brokerOutTime": 1598827811959
17+
},
18+
{
19+
"messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
20+
"messageType": "jms/text-message",
21+
"data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==",
22+
"connectionId": "myJMSCoID2",
23+
"redelivered": false,
24+
"destination": {
25+
"physicalname": "testQueue"
26+
},
27+
"timestamp": 1598827811958,
28+
"brokerInTime": 1598827811958,
29+
"brokerOutTime": 1598827811959
30+
},
31+
{
32+
"messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
33+
"messageType": "jms/bytes-message",
34+
"data": "3DTOOW7crj51prgVLQaGQ82S48k=",
35+
"connectionId": "myJMSCoID1",
36+
"persistent": false,
37+
"destination": {
38+
"physicalname": "testQueue"
39+
},
40+
"timestamp": 1598827811958,
41+
"brokerInTime": 1598827811958,
42+
"brokerOutTime": 1598827811959
43+
}
44+
]
45+
}

0 commit comments

Comments
 (0)