Skip to content

Commit eebc06a

Browse files
authored
feat(kafka): New Kafka utility (#1898)
* Add initial code for KafkaJson and KafkaAvro request handlers. * Add deserialization via @Deserialization annotation. * Add TODOs in code. * Fix typos and make AbstractKafkaDeserializer package private. * Remove request handler implementation in favor for @Deserialization annotation. * Parse Timestamp type correctly. * Remove custom RequestHandler implementation example. * Make AspectJ version compatible with min version Java 11. * Clarify exception message when deserialization fails. * Add more advanced JSON escpaing to JSONSerializer in logging module. * Add protobuf deserialization logic and fully working example. * Add Maven profile to compile a JAR with different dependency combinations. * Add minimal kafka example. * Add missing copyright. * Add unit tests for kafka utility. * Add minimal kafka example to examples module in pom.xml. * Add some comments. * Update powertools-examples-kafka with README and make it more minimalistic. Remove powertools-examples-kafka-minimal. * Implement PR feedback from Karthik. * Fix SAM outputs. * Do not fail on unknown properties when deserializating into KafkaEvent. * Allow customers to bring their own kafka-clients dependency. * Add Kafka utility documentation. * Update project version consistently to 2.0.0. * fix: Fix bug where abbreviated _HANDLER env var did not detect the Deserialization annotation. * fix: Bug when trying to deserialize a type into itself for Lambda default behavior. We can just return the type itself. Relevant for simple String and InputStream handlers. * When falling back to Lambda default, handle conversion between InputStream and String. * Raise a runtime exception when the KafkaEvent is invalid. * docs: Announce deprecation of v1 * fix(metrics): Do not flush when no metrics were added to avoid printing root-level _aws dict (#1891) * fix(metrics): Do not flush when no metrics were added to avoid printing root-level _aws dict. * Fix pmd linting failures. * Rename docs to Kafka Consumer and add line highlights for code examples. * Fix Spotbug issues. * Reduce cognitive complexity of DeserializationUtils making it more modular and representing handler information in a simple HandlerInfo class. * Reduce cognitive complexity of AbstractKafkaDeserializer. * Enable removal policy DESTROY on e2e test for kinesis streams and SQS queues to avoid exceeding account limit. * Replace System.out with Powertools Logging. * Add notice about kafka-clients compatibility. * Add sentence stating that Avro / Protobuf classes can be autogenerated.
1 parent 8a040ac commit eebc06a

File tree

76 files changed

+7413
-21
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+7413
-21
lines changed

docs/utilities/kafka.md

Lines changed: 1001 additions & 0 deletions
Large diffs are not rendered by default.

examples/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
<module>powertools-examples-parameters/sam</module>
4040
<module>powertools-examples-parameters/sam-graalvm</module>
4141
<module>powertools-examples-serialization</module>
42+
<module>powertools-examples-kafka</module>
4243
<module>powertools-examples-batch</module>
4344
<module>powertools-examples-validation</module>
4445
<module>powertools-examples-cloudformation</module>
@@ -58,4 +59,4 @@
5859
</plugins>
5960
</build>
6061

61-
</project>
62+
</project>
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# Powertools for AWS Lambda (Java) - Kafka Example
2+
3+
This project demonstrates how to use Powertools for AWS Lambda (Java) to deserialize Kafka Lambda events directly into strongly typed Kafka ConsumerRecords<K, V> using different serialization formats.
4+
5+
## Overview
6+
7+
The example showcases automatic deserialization of Kafka Lambda events into ConsumerRecords using three formats:
8+
- JSON - Using standard JSON serialization
9+
- Avro - Using Apache Avro schema-based serialization
10+
- Protobuf - Using Google Protocol Buffers serialization
11+
12+
Each format has its own Lambda function handler that demonstrates how to use the `@Deserialization` annotation with the appropriate `DeserializationType`, eliminating the need to handle complex deserialization logic manually.
13+
14+
## Build and Deploy
15+
16+
### Prerequisites
17+
- [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html)
18+
- Java 11+
19+
- Maven
20+
21+
### Build
22+
23+
```bash
24+
# Build the application
25+
sam build
26+
```
27+
28+
### Deploy
29+
30+
```bash
31+
# Deploy the application to AWS
32+
sam deploy --guided
33+
```
34+
35+
During the guided deployment, you'll be prompted to provide values for required parameters. After deployment, SAM will output the ARNs of the deployed Lambda functions.
36+
37+
### Build with Different Serialization Formats
38+
39+
The project includes Maven profiles to build with different serialization formats:
40+
41+
```bash
42+
# Build with JSON only (no Avro or Protobuf)
43+
mvn clean package -P base
44+
45+
# Build with Avro only
46+
mvn clean package -P avro-only
47+
48+
# Build with Protobuf only
49+
mvn clean package -P protobuf-only
50+
51+
# Build with all formats (default)
52+
mvn clean package -P full
53+
```
54+
55+
## Testing
56+
57+
The `events` directory contains sample events for each serialization format:
58+
- `kafka-json-event.json` - Sample event with JSON-serialized products
59+
- `kafka-avro-event.json` - Sample event with Avro-serialized products
60+
- `kafka-protobuf-event.json` - Sample event with Protobuf-serialized products
61+
62+
You can use these events to test the Lambda functions:
63+
64+
```bash
65+
# Test the JSON deserialization function
66+
sam local invoke JsonDeserializationFunction --event events/kafka-json-event.json
67+
68+
# Test the Avro deserialization function
69+
sam local invoke AvroDeserializationFunction --event events/kafka-avro-event.json
70+
71+
# Test the Protobuf deserialization function
72+
sam local invoke ProtobufDeserializationFunction --event events/kafka-protobuf-event.json
73+
```
74+
75+
## Sample Generator Tool
76+
77+
The project includes a tool to generate sample JSON, Avro, and Protobuf serialized data. See the [tools/README.md](tools/README.md) for more information.
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
{
2+
"eventSource": "aws:kafka",
3+
"eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
4+
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
5+
"records": {
6+
"mytopic-0": [
7+
{
8+
"topic": "mytopic",
9+
"partition": 0,
10+
"offset": 15,
11+
"timestamp": 1545084650987,
12+
"timestampType": "CREATE_TIME",
13+
"key": "NDI=",
14+
"value": "0g8MTGFwdG9wUrgehes/j0A=",
15+
"headers": [
16+
{
17+
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
18+
}
19+
]
20+
},
21+
{
22+
"topic": "mytopic",
23+
"partition": 0,
24+
"offset": 16,
25+
"timestamp": 1545084650988,
26+
"timestampType": "CREATE_TIME",
27+
"key": "NDI=",
28+
"value": "1A8UU21hcnRwaG9uZVK4HoXrv4JA",
29+
"headers": [
30+
{
31+
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
32+
}
33+
]
34+
},
35+
{
36+
"topic": "mytopic",
37+
"partition": 0,
38+
"offset": 17,
39+
"timestamp": 1545084650989,
40+
"timestampType": "CREATE_TIME",
41+
"key": null,
42+
"value": "1g8USGVhZHBob25lc0jhehSuv2JA",
43+
"headers": [
44+
{
45+
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
46+
}
47+
]
48+
}
49+
]
50+
}
51+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
{
2+
"eventSource": "aws:kafka",
3+
"eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
4+
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
5+
"records": {
6+
"mytopic-0": [
7+
{
8+
"topic": "mytopic",
9+
"partition": 0,
10+
"offset": 15,
11+
"timestamp": 1545084650987,
12+
"timestampType": "CREATE_TIME",
13+
"key": "NDI=",
14+
"value": "eyJwcmljZSI6OTk5Ljk5LCJuYW1lIjoiTGFwdG9wIiwiaWQiOjEwMDF9",
15+
"headers": [
16+
{
17+
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
18+
}
19+
]
20+
},
21+
{
22+
"topic": "mytopic",
23+
"partition": 0,
24+
"offset": 15,
25+
"timestamp": 1545084650987,
26+
"timestampType": "CREATE_TIME",
27+
"key": "NDI=",
28+
"value": "eyJwcmljZSI6NTk5Ljk5LCJuYW1lIjoiU21hcnRwaG9uZSIsImlkIjoxMDAyfQ==",
29+
"headers": [
30+
{
31+
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
32+
}
33+
]
34+
},
35+
{
36+
"topic": "mytopic",
37+
"partition": 0,
38+
"offset": 15,
39+
"timestamp": 1545084650987,
40+
"timestampType": "CREATE_TIME",
41+
"key": null,
42+
"value": "eyJwcmljZSI6MTQ5Ljk5LCJuYW1lIjoiSGVhZHBob25lcyIsImlkIjoxMDAzfQ==",
43+
"headers": [
44+
{
45+
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
46+
}
47+
]
48+
}
49+
]
50+
}
51+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
{
2+
"eventSource": "aws:kafka",
3+
"eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
4+
"bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
5+
"records": {
6+
"mytopic-0": [
7+
{
8+
"topic": "mytopic",
9+
"partition": 0,
10+
"offset": 15,
11+
"timestamp": 1545084650987,
12+
"timestampType": "CREATE_TIME",
13+
"key": "NDI=",
14+
"value": "COkHEgZMYXB0b3AZUrgehes/j0A=",
15+
"headers": [
16+
{
17+
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
18+
}
19+
]
20+
},
21+
{
22+
"topic": "mytopic",
23+
"partition": 0,
24+
"offset": 16,
25+
"timestamp": 1545084650988,
26+
"timestampType": "CREATE_TIME",
27+
"key": "NDI=",
28+
"value": "COoHEgpTbWFydHBob25lGVK4HoXrv4JA",
29+
"headers": [
30+
{
31+
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
32+
}
33+
]
34+
},
35+
{
36+
"topic": "mytopic",
37+
"partition": 0,
38+
"offset": 17,
39+
"timestamp": 1545084650989,
40+
"timestampType": "CREATE_TIME",
41+
"key": null,
42+
"value": "COsHEgpIZWFkcGhvbmVzGUjhehSuv2JA",
43+
"headers": [
44+
{
45+
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
46+
}
47+
]
48+
}
49+
]
50+
}
51+
}

0 commit comments

Comments
 (0)