Skip to content

Commit 034b79a

Browse files
docs(kafka): refactor kafka documentation (#6854)
* Refactoring documentation * Making Sonar Happy
1 parent 80f2958 commit 034b79a

23 files changed

+706
-549
lines changed

docs/utilities/kafka.md

Lines changed: 101 additions & 536 deletions
Large diffs are not rendered by default.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"eventSource":"aws:kafka",
3+
"eventSourceArn":"arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
4+
"bootstrapServers":"boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
5+
"records":{
6+
"python-with-avro-doc-3":[
7+
{
8+
"topic":"python-with-avro-doc",
9+
"partition":3,
10+
"offset":0,
11+
"timestamp":1750547105187,
12+
"timestampType":"CREATE_TIME",
13+
"key":"MTIz",
14+
"value":"AwBXT2qalUhN6oaj2CwEeaEWFFBvd2VydG9vbHMK",
15+
"headers":[
16+
17+
]
18+
}
19+
]
20+
}
21+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"eventSource":"aws:kafka",
3+
"eventSourceArn":"arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
4+
"bootstrapServers":"boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
5+
"records":{
6+
"python-with-avro-doc-5":[
7+
{
8+
"topic":"python-with-avro-doc",
9+
"partition":5,
10+
"offset":0,
11+
"timestamp":1750547462087,
12+
"timestampType":"CREATE_TIME",
13+
"key":"MTIz",
14+
"value":"eyJuYW1lIjogIlBvd2VydG9vbHMiLCAiYWdlIjogNX0=",
15+
"headers":[
16+
17+
]
18+
}
19+
]
20+
}
21+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"eventSource":"aws:kafka",
3+
"eventSourceArn":"arn:aws:kafka:eu-west-3:992382490249:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
4+
"bootstrapServers":"boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
5+
"records":{
6+
"python-with-avro-doc-5":[
7+
{
8+
"topic":"python-with-avro-doc",
9+
"partition":5,
10+
"offset":1,
11+
"timestamp":1750624373324,
12+
"timestampType":"CREATE_TIME",
13+
"key":"MTIz",
14+
"value":"Cgpwb3dlcnRvb2xzEAU=",
15+
"headers":[
16+
17+
]
18+
}
19+
]
20+
}
21+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
AWSTemplateFormatVersion: '2010-09-09'
2+
Transform: AWS::Serverless-2016-10-31
3+
Resources:
4+
KafkaConsumerFunction:
5+
Type: AWS::Serverless::Function
6+
Properties:
7+
Handler: app.lambda_handler
8+
Runtime: python3.13
9+
Timeout: 30
10+
Events:
11+
MSKEvent:
12+
Type: MSK
13+
Properties:
14+
StartingPosition: LATEST
15+
Stream: "arn:aws:lambda:eu-west-3:123456789012:event-source-mapping:11a2c814-dda3-4df8-b46f-4eeafac869ac"
16+
Topics:
17+
- my-topic-1
18+
BatchSize: 100
19+
MaximumBatchingWindowInSeconds: 5
20+
Policies:
21+
- AWSLambdaMSKExecutionRole
22+
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
AWSTemplateFormatVersion: '2010-09-09'
2+
Transform: AWS::Serverless-2016-10-31
3+
Resources:
4+
KafkaConsumerFunction:
5+
Type: AWS::Serverless::Function
6+
Properties:
7+
Handler: app.lambda_handler
8+
Runtime: python3.13
9+
Timeout: 30
10+
Events:
11+
MSKEvent:
12+
Type: MSK
13+
Properties:
14+
StartingPosition: LATEST
15+
Stream: "arn:aws:lambda:eu-west-3:123456789012:event-source-mapping:11a2c814-dda3-4df8-b46f-4eeafac869ac"
16+
Topics:
17+
- my-topic-1
18+
Policies:
19+
- AWSLambdaMSKExecutionRole
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"type": "record",
3+
"name": "User",
4+
"namespace": "com.example",
5+
"fields": [
6+
{"name": "name", "type": "string"},
7+
{"name": "age", "type": "int"}
8+
]
9+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"name": "...",
3+
"age": "..."
4+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
syntax = "proto3";
2+
3+
package com.example;
4+
5+
message User {
6+
string name = 1;
7+
int32 age = 2;
8+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from aws_lambda_powertools import Logger
2+
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
3+
from aws_lambda_powertools.utilities.typing import LambdaContext
4+
5+
logger = Logger()
6+
7+
# Define Avro schema
8+
avro_schema = """
9+
{
10+
"type": "record",
11+
"name": "User",
12+
"namespace": "com.example",
13+
"fields": [
14+
{"name": "name", "type": "string"},
15+
{"name": "age", "type": "int"}
16+
]
17+
}
18+
"""
19+
20+
schema_config = SchemaConfig(
21+
value_schema_type="AVRO",
22+
value_schema=avro_schema,
23+
)
24+
25+
26+
@kafka_consumer(schema_config=schema_config)
27+
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
28+
for record in event.records:
29+
# Log record coordinates for tracing
30+
logger.info(f"Processing message from topic '{record.topic}'")
31+
logger.info(f"Partition: {record.partition}, Offset: {record.offset}")
32+
logger.info(f"Produced at: {record.timestamp}")
33+
34+
# Process message headers
35+
logger.info(f"Headers: {record.headers}")
36+
37+
# Access the Avro deserialized message content
38+
value = record.value
39+
logger.info(f"Deserialized value: {value['name']}")
40+
41+
# For debugging, you can access the original raw data
42+
logger.info(f"Raw message: {record.original_value}")
43+
44+
return {"statusCode": 200}

0 commit comments

Comments
 (0)