Skip to content

Commit d3a0007

Browse files
feat(data-classes): add KafkaEvent and KafkaEventRecord (#1485)
Co-authored-by: Leandro Damascena <leandro.damascena@gmail.com>
1 parent a43f816 commit d3a0007

File tree

6 files changed

+275
-0
lines changed

6 files changed

+275
-0
lines changed

aws_lambda_powertools/utilities/data_classes/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from .dynamo_db_stream_event import DynamoDBStreamEvent
1313
from .event_bridge_event import EventBridgeEvent
1414
from .event_source import event_source
15+
from .kafka_event import KafkaEvent
1516
from .kinesis_stream_event import KinesisStreamEvent
1617
from .lambda_function_url_event import LambdaFunctionUrlEvent
1718
from .s3_event import S3Event
@@ -30,6 +31,7 @@
3031
"ConnectContactFlowEvent",
3132
"DynamoDBStreamEvent",
3233
"EventBridgeEvent",
34+
"KafkaEvent",
3335
"KinesisStreamEvent",
3436
"LambdaFunctionUrlEvent",
3537
"S3Event",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import base64
2+
import json
3+
from typing import Any, Dict, Iterator, List, Optional
4+
5+
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
6+
7+
8+
class KafkaEventRecord(DictWrapper):
    """A single record from a Kafka event payload (Amazon MSK or self-managed Apache Kafka)."""

    @property
    def topic(self) -> str:
        """The Kafka topic."""
        return self["topic"]

    @property
    def partition(self) -> int:
        """The Kafka record partition."""
        # The Lambda payload carries this as a JSON number (see the event fixtures),
        # so it arrives as an int, not a str.
        return self["partition"]

    @property
    def offset(self) -> int:
        """The Kafka record offset."""
        # Also a JSON number in the payload.
        return self["offset"]

    @property
    def timestamp(self) -> int:
        """The Kafka record timestamp (epoch milliseconds)."""
        return self["timestamp"]

    @property
    def timestamp_type(self) -> str:
        """The Kafka record timestamp type, e.g. ``CREATE_TIME``."""
        return self["timestampType"]

    @property
    def key(self) -> str:
        """The raw (base64 encoded) Kafka record key."""
        return self["key"]

    @property
    def decoded_key(self) -> bytes:
        """Decode the base64 encoded key as bytes."""
        return base64.b64decode(self.key)

    @property
    def value(self) -> str:
        """The raw (base64 encoded) Kafka record value."""
        return self["value"]

    @property
    def decoded_value(self) -> bytes:
        """Decodes the base64 encoded value as bytes."""
        return base64.b64decode(self.value)

    @property
    def json_value(self) -> Any:
        """Decodes the text encoded data as JSON, caching the parsed result."""
        if self._json_data is None:
            self._json_data = json.loads(self.decoded_value.decode("utf-8"))
        return self._json_data

    @property
    def headers(self) -> List[Dict[str, List[int]]]:
        """The raw Kafka record headers: a list of single-key dicts mapping header name to byte values."""
        return self["headers"]

    @property
    def decoded_headers(self) -> Dict[str, bytes]:
        """Decodes the headers as a single dictionary.

        NOTE: if the same header name occurs more than once, the last occurrence wins.
        """
        return {k: bytes(v) for chunk in self.headers for k, v in chunk.items()}

    def get_header_value(
        self, name: str, default_value: Optional[Any] = None, case_sensitive: bool = True
    ) -> Optional[bytes]:
        """Get a decoded header value by name.

        Parameters
        ----------
        name : str
            Header name to look up.
        default_value : Any, optional
            Value to return when no header matches.
        case_sensitive : bool
            When False, header names are compared case-insensitively.
        """
        if case_sensitive:
            return self.decoded_headers.get(name, default_value)
        name_lower = name.lower()

        return next(
            # Iterate over the dict and do a case-insensitive key comparison
            (value for key, value in self.decoded_headers.items() if key.lower() == name_lower),
            # Default value is returned if no match was found
            default_value,
        )
85+
86+
87+
class KafkaEvent(DictWrapper):
    """Self-managed Apache Kafka event trigger

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
    """

    @property
    def event_source(self) -> str:
        """The AWS service from which the Kafka event record originated."""
        return self["eventSource"]

    @property
    def event_source_arn(self) -> Optional[str]:
        """The AWS service ARN from which the Kafka event record originated, if present."""
        # Optional: the self-managed Kafka payload does not include this key.
        return self.get("eventSourceArn")

    @property
    def bootstrap_servers(self) -> str:
        """The Kafka bootstrap URL, as a single comma-separated string."""
        return self["bootstrapServers"]

    @property
    def decoded_bootstrap_servers(self) -> List[str]:
        """The Kafka bootstrap servers as a list of ``host:port`` strings."""
        return self.bootstrap_servers.split(",")

    @property
    def records(self) -> Iterator[KafkaEventRecord]:
        """The Kafka records, flattened across all topic-partition keys."""
        for chunk in self["records"].values():
            for record in chunk:
                yield KafkaEventRecord(record)

    @property
    def record(self) -> KafkaEventRecord:
        """The first Kafka record in the payload.

        NOTE: each access creates a fresh iterator via ``records``, so this
        always returns the first record — it does not advance through records.
        """
        return next(self.records)

docs/utilities/data_classes.md

+17
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ Event Source | Data_class
7575
[Connect Contact Flow](#connect-contact-flow) | `ConnectContactFlowEvent`
7676
[DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName`
7777
[EventBridge](#eventbridge) | `EventBridgeEvent`
78+
[Kafka](#kafka) | `KafkaEvent`
7879
[Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent`
7980
[Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent`
8081
[Rabbit MQ](#rabbit-mq) | `RabbitMQEvent`
@@ -852,6 +853,22 @@ attributes values (`AttributeValue`), as well as enums for stream view type (`St
852853

853854
```
854855

856+
### Kafka
857+
858+
This example is based on the AWS docs for [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html){target="_blank"} and [self-managed Apache Kafka](https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html){target="_blank"}.
859+
860+
=== "app.py"
861+
862+
```python
863+
from aws_lambda_powertools.utilities.data_classes import event_source, KafkaEvent
864+
865+
@event_source(data_class=KafkaEvent)
866+
def lambda_handler(event: KafkaEvent, context):
867+
for record in event.records:
868+
do_something_with(record.decoded_key, record.json_value)
869+
870+
```
871+
855872
### Kinesis streams
856873

857874
Kinesis events by default contain base64 encoded data. You can use the helper function to access the data either as json

tests/events/kafkaEventMsk.json

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
{
2+
"eventSource":"aws:kafka",
3+
"eventSourceArn":"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
4+
"bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
5+
"records":{
6+
"mytopic-0":[
7+
{
8+
"topic":"mytopic",
9+
"partition":0,
10+
"offset":15,
11+
"timestamp":1545084650987,
12+
"timestampType":"CREATE_TIME",
13+
"key":"cmVjb3JkS2V5",
14+
"value":"eyJrZXkiOiJ2YWx1ZSJ9",
15+
"headers":[
16+
{
17+
"headerKey":[
18+
104,
19+
101,
20+
97,
21+
100,
22+
101,
23+
114,
24+
86,
25+
97,
26+
108,
27+
117,
28+
101
29+
]
30+
}
31+
]
32+
}
33+
]
34+
}
35+
}
+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
{
2+
"eventSource":"aws:aws:SelfManagedKafka",
3+
"bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
4+
"records":{
5+
"mytopic-0":[
6+
{
7+
"topic":"mytopic",
8+
"partition":0,
9+
"offset":15,
10+
"timestamp":1545084650987,
11+
"timestampType":"CREATE_TIME",
12+
"key":"cmVjb3JkS2V5",
13+
"value":"eyJrZXkiOiJ2YWx1ZSJ9",
14+
"headers":[
15+
{
16+
"headerKey":[
17+
104,
18+
101,
19+
97,
20+
100,
21+
101,
22+
114,
23+
86,
24+
97,
25+
108,
26+
117,
27+
101
28+
]
29+
}
30+
]
31+
}
32+
]
33+
}
34+
}

tests/functional/test_data_classes.py

+63
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
CloudWatchLogsEvent,
1818
CodePipelineJobEvent,
1919
EventBridgeEvent,
20+
KafkaEvent,
2021
KinesisStreamEvent,
2122
S3Event,
2223
SESEvent,
@@ -1138,6 +1139,68 @@ def test_base_proxy_event_json_body_with_base64_encoded_data():
11381139
assert event.json_body == data
11391140

11401141

1142+
def test_kafka_msk_event():
    parsed_event = KafkaEvent(load_event("kafkaEventMsk.json"))

    assert parsed_event.event_source == "aws:kafka"
    expected_arn = (
        "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4"
    )
    assert parsed_event.event_source_arn == expected_arn

    raw_servers = "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092"  # noqa E501
    assert parsed_event.bootstrap_servers == raw_servers
    assert parsed_event.decoded_bootstrap_servers == raw_servers.split(",")

    all_records = list(parsed_event.records)
    assert len(all_records) == 1

    record = all_records[0]
    assert record.topic == "mytopic"
    assert record.partition == 0
    assert record.offset == 15
    assert record.timestamp == 1545084650987
    assert record.timestamp_type == "CREATE_TIME"
    assert record.decoded_key == b"recordKey"
    assert record.value == "eyJrZXkiOiJ2YWx1ZSJ9"
    assert record.json_value == {"key": "value"}
    assert record.decoded_headers == {"headerKey": b"headerValue"}
    assert record.get_header_value("HeaderKey", case_sensitive=False) == b"headerValue"
1173+
1174+
1175+
def test_kafka_self_managed_event():
1176+
event = KafkaEvent(load_event("kafkaEventSelfManaged.json"))
1177+
assert event.event_source == "aws:aws:SelfManagedKafka"
1178+
1179+
bootstrap_servers_raw = "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092" # noqa E501
1180+
1181+
bootstrap_servers_list = [
1182+
"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
1183+
"b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
1184+
]
1185+
1186+
assert event.bootstrap_servers == bootstrap_servers_raw
1187+
assert event.decoded_bootstrap_servers == bootstrap_servers_list
1188+
1189+
records = list(event.records)
1190+
assert len(records) == 1
1191+
record = records[0]
1192+
assert record.topic == "mytopic"
1193+
assert record.partition == 0
1194+
assert record.offset == 15
1195+
assert record.timestamp == 1545084650987
1196+
assert record.timestamp_type == "CREATE_TIME"
1197+
assert record.decoded_key == b"recordKey"
1198+
assert record.value == "eyJrZXkiOiJ2YWx1ZSJ9"
1199+
assert record.json_value == {"key": "value"}
1200+
assert record.decoded_headers == {"headerKey": b"headerValue"}
1201+
assert record.get_header_value("HeaderKey", case_sensitive=False) == b"headerValue"
1202+
1203+
11411204
def test_kinesis_stream_event():
11421205
event = KinesisStreamEvent(load_event("kinesisStreamEvent.json"))
11431206

0 commit comments

Comments
 (0)