Skip to content

Commit 406b8fc

Browse files
ewbankkitbmoffatt
andauthored
Add Amazon MSK event structure (#316)
* Add Amazon MSK event structure. * Address review comments. * Rename 'MSK' to 'Kafka'. Co-authored-by: Bryan Moffatt <bmoffatt@users.noreply.github.com>
1 parent f5f1839 commit 406b8fc

File tree

3 files changed

+78
-0
lines changed

3 files changed

+78
-0
lines changed

events/kafka.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
3+
package events
4+
5+
type KafkaEvent struct {
6+
EventSource string `json:"eventSource"`
7+
EventSourceARN string `json:"eventSourceArn"`
8+
Records map[string][]KafkaRecord `json:"records"`
9+
}
10+
11+
type KafkaRecord struct {
12+
Topic string `json:"topic"`
13+
Partition int64 `json:"partition"`
14+
Offset int64 `json:"offset"`
15+
Timestamp MilliSecondsEpochTime `json:"timestamp"`
16+
TimestampType string `json:"timestampType"`
17+
Key string `json:"key,omitempty"`
18+
Value string `json:"value,omitempty"`
19+
}

events/kafka_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
package events
3+
4+
import (
5+
"encoding/json"
6+
"testing"
7+
8+
"github.com/aws/aws-lambda-go/events/test"
9+
"github.com/stretchr/testify/assert"
10+
)
11+
12+
func TestKafkaEventMarshaling(t *testing.T) {
13+
14+
// 1. read JSON from file
15+
inputJson := test.ReadJSONFromFile(t, "./testdata/kafka-event.json")
16+
17+
// 2. de-serialize into Go object
18+
var inputEvent KafkaEvent
19+
if err := json.Unmarshal(inputJson, &inputEvent); err != nil {
20+
t.Errorf("could not unmarshal event. details: %v", err)
21+
}
22+
23+
for _, records := range inputEvent.Records {
24+
for _, record := range records {
25+
utc := record.Timestamp.UTC()
26+
assert.Equal(t, 2020, utc.Year())
27+
}
28+
}
29+
30+
// 3. serialize to JSON
31+
outputJson, err := json.Marshal(inputEvent)
32+
if err != nil {
33+
t.Errorf("could not marshal event. details: %v", err)
34+
}
35+
36+
// 4. check result
37+
assert.JSONEq(t, string(inputJson), string(outputJson))
38+
}
39+
40+
func TestKafkaMarshalingMalformedJson(t *testing.T) {
41+
test.TestMalformedJson(t, KafkaEvent{})
42+
}

events/testdata/kafka-event.json

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"eventSource": "aws:kafka",
3+
"eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4",
4+
"records": {
5+
"AWSKafkaTopic-0": [
6+
{
7+
"topic": "AWSKafkaTopic",
8+
"partition": 0,
9+
"offset": 0,
10+
"timestamp": 1595035749700,
11+
"timestampType": "CREATE_TIME",
12+
"key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj",
13+
"value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj"
14+
}
15+
]
16+
}
17+
}

0 commit comments

Comments
 (0)