diff --git a/events/kafka.go b/events/kafka.go new file mode 100644 index 00000000..b61499c4 --- /dev/null +++ b/events/kafka.go @@ -0,0 +1,19 @@ +// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +package events + +type KafkaEvent struct { + EventSource string `json:"eventSource"` + EventSourceARN string `json:"eventSourceArn"` + Records map[string][]KafkaRecord `json:"records"` +} + +type KafkaRecord struct { + Topic string `json:"topic"` + Partition int64 `json:"partition"` + Offset int64 `json:"offset"` + Timestamp MilliSecondsEpochTime `json:"timestamp"` + TimestampType string `json:"timestampType"` + Key string `json:"key,omitempty"` + Value string `json:"value,omitempty"` +} diff --git a/events/kafka_test.go b/events/kafka_test.go new file mode 100644 index 00000000..bea50b32 --- /dev/null +++ b/events/kafka_test.go @@ -0,0 +1,42 @@ +// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +package events + +import ( + "encoding/json" + "testing" + + "github.com/aws/aws-lambda-go/events/test" + "github.com/stretchr/testify/assert" +) + +func TestKafkaEventMarshaling(t *testing.T) { + + // 1. read JSON from file + inputJson := test.ReadJSONFromFile(t, "./testdata/kafka-event.json") + + // 2. de-serialize into Go object + var inputEvent KafkaEvent + if err := json.Unmarshal(inputJson, &inputEvent); err != nil { + t.Errorf("could not unmarshal event. details: %v", err) + } + + for _, records := range inputEvent.Records { + for _, record := range records { + utc := record.Timestamp.UTC() + assert.Equal(t, 2020, utc.Year()) + } + } + + // 3. serialize to JSON + outputJson, err := json.Marshal(inputEvent) + if err != nil { + t.Errorf("could not marshal event. details: %v", err) + } + + // 4. check result + assert.JSONEq(t, string(inputJson), string(outputJson)) +} + +func TestKafkaMarshalingMalformedJson(t *testing.T) { + test.TestMalformedJson(t, KafkaEvent{}) +} diff --git a/events/testdata/kafka-event.json b/events/testdata/kafka-event.json new file mode 100644 index 00000000..d940e57c --- /dev/null +++ b/events/testdata/kafka-event.json @@ -0,0 +1,17 @@ +{ + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4", + "records": { + "AWSKafkaTopic-0": [ + { + "topic": "AWSKafkaTopic", + "partition": 0, + "offset": 0, + "timestamp": 1595035749700, + "timestampType": "CREATE_TIME", + "key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj", + "value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj" + } + ] + } + }