From da62f76fce054250ad820ccf6429dc4c77086d50 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Wed, 19 Aug 2020 15:18:28 -0400 Subject: [PATCH 1/3] Add Amazon MSK event structure. --- events/msk.go | 19 +++++++++++++++ events/msk_test.go | 42 ++++++++++++++++++++++++++++++++++ events/testdata/msk-event.json | 17 ++++++++++++++ 3 files changed, 78 insertions(+) create mode 100644 events/msk.go create mode 100644 events/msk_test.go create mode 100644 events/testdata/msk-event.json diff --git a/events/msk.go b/events/msk.go new file mode 100644 index 00000000..915d9034 --- /dev/null +++ b/events/msk.go @@ -0,0 +1,19 @@ +// Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +package events + +type MskEvent struct { + EventSource string `json:"eventSource"` + EventSourceARN string `json:"eventSourceArn"` + Records map[string][]MskRecord `json:"records"` +} + +type MskRecord struct { + Topic string `json:"topic"` + Partition int64 `json:"partition"` + Offset int64 `json:"offset"` + Timestamp MilliSecondsEpochTime `json:"timestamp"` + TimestampType string `json:"timestampType"` + Key string `json:"key,omitempty"` + Value string `json:"value,omitempty"` +} diff --git a/events/msk_test.go b/events/msk_test.go new file mode 100644 index 00000000..41986a54 --- /dev/null +++ b/events/msk_test.go @@ -0,0 +1,42 @@ +// Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +package events + +import ( + "encoding/json" + "testing" + + "github.com/aws/aws-lambda-go/events/test" + "github.com/stretchr/testify/assert" +) + +func TestMskEventMarshaling(t *testing.T) { + + // 1. read JSON from file + inputJson := test.ReadJSONFromFile(t, "./testdata/msk-event.json") + + // 2. de-serialize into Go object + var inputEvent MskEvent + if err := json.Unmarshal(inputJson, &inputEvent); err != nil { + t.Errorf("could not unmarshal event. details: %v", err) + } + + for _, records := range inputEvent.Records { + for _, record := range records { + utc := record.Timestamp.UTC() + assert.Equal(t, 2020, utc.Year()) + } + } + + // 3. serialize to JSON + outputJson, err := json.Marshal(inputEvent) + if err != nil { + t.Errorf("could not marshal event. details: %v", err) + } + + // 4. check result + assert.JSONEq(t, string(inputJson), string(outputJson)) +} + +func TestMskMarshalingMalformedJson(t *testing.T) { + test.TestMalformedJson(t, MskEvent{}) +} diff --git a/events/testdata/msk-event.json b/events/testdata/msk-event.json new file mode 100644 index 00000000..d940e57c --- /dev/null +++ b/events/testdata/msk-event.json @@ -0,0 +1,17 @@ +{ + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4", + "records": { + "AWSKafkaTopic-0": [ + { + "topic": "AWSKafkaTopic", + "partition": 0, + "offset": 0, + "timestamp": 1595035749700, + "timestampType": "CREATE_TIME", + "key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj", + "value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj" + } + ] + } + } From e052707a0102604c07bf2cd9bf195d0476f691e7 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Tue, 22 Sep 2020 16:23:05 -0400 Subject: [PATCH 2/3] Address review comments. --- events/msk.go | 8 ++++---- events/msk_test.go | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/events/msk.go b/events/msk.go index 915d9034..ca5f07e4 100644 --- a/events/msk.go +++ b/events/msk.go @@ -1,14 +1,14 @@ -// Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. package events -type MskEvent struct { +type MSKEvent struct { EventSource string `json:"eventSource"` EventSourceARN string `json:"eventSourceArn"` - Records map[string][]MskRecord `json:"records"` + Records map[string][]MSKRecord `json:"records"` } -type MskRecord struct { +type MSKRecord struct { Topic string `json:"topic"` Partition int64 `json:"partition"` Offset int64 `json:"offset"` diff --git a/events/msk_test.go b/events/msk_test.go index 41986a54..d8047e5a 100644 --- a/events/msk_test.go +++ b/events/msk_test.go @@ -1,4 +1,4 @@ -// Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. package events import ( @@ -9,13 +9,13 @@ import ( "github.com/stretchr/testify/assert" ) -func TestMskEventMarshaling(t *testing.T) { +func TestMSKEventMarshaling(t *testing.T) { // 1. read JSON from file inputJson := test.ReadJSONFromFile(t, "./testdata/msk-event.json") // 2. de-serialize into Go object - var inputEvent MskEvent + var inputEvent MSKEvent if err := json.Unmarshal(inputJson, &inputEvent); err != nil { t.Errorf("could not unmarshal event. details: %v", err) } @@ -37,6 +37,6 @@ func TestMskEventMarshaling(t *testing.T) { assert.JSONEq(t, string(inputJson), string(outputJson)) } -func TestMskMarshalingMalformedJson(t *testing.T) { - test.TestMalformedJson(t, MskEvent{}) +func TestMSKMarshalingMalformedJson(t *testing.T) { + test.TestMalformedJson(t, MSKEvent{}) } From 7dc67d6ed7279aaeea78d809b1b89280759c3551 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Sun, 27 Sep 2020 16:20:37 -0400 Subject: [PATCH 3/3] Rename 'MSK' to 'Kafka'. --- events/{msk.go => kafka.go} | 10 +++++----- events/{msk_test.go => kafka_test.go} | 10 +++++----- events/testdata/{msk-event.json => kafka-event.json} | 0 3 files changed, 10 insertions(+), 10 deletions(-) rename events/{msk.go => kafka.go} (67%) rename events/{msk_test.go => kafka_test.go} (76%) rename events/testdata/{msk-event.json => kafka-event.json} (100%) diff --git a/events/msk.go b/events/kafka.go similarity index 67% rename from events/msk.go rename to events/kafka.go index ca5f07e4..b61499c4 100644 --- a/events/msk.go +++ b/events/kafka.go @@ -2,13 +2,13 @@ package events -type MSKEvent struct { - EventSource string `json:"eventSource"` - EventSourceARN string `json:"eventSourceArn"` - Records map[string][]MSKRecord `json:"records"` +type KafkaEvent struct { + EventSource string `json:"eventSource"` + EventSourceARN string `json:"eventSourceArn"` + Records map[string][]KafkaRecord `json:"records"` } -type MSKRecord struct { +type KafkaRecord struct { Topic string `json:"topic"` Partition int64 `json:"partition"` Offset int64 `json:"offset"` diff --git a/events/msk_test.go b/events/kafka_test.go similarity index 76% rename from events/msk_test.go rename to events/kafka_test.go index d8047e5a..bea50b32 100644 --- a/events/msk_test.go +++ b/events/kafka_test.go @@ -9,13 +9,13 @@ import ( "github.com/stretchr/testify/assert" ) -func TestMSKEventMarshaling(t *testing.T) { +func TestKafkaEventMarshaling(t *testing.T) { // 1. read JSON from file - inputJson := test.ReadJSONFromFile(t, "./testdata/msk-event.json") + inputJson := test.ReadJSONFromFile(t, "./testdata/kafka-event.json") // 2. de-serialize into Go object - var inputEvent MSKEvent + var inputEvent KafkaEvent if err := json.Unmarshal(inputJson, &inputEvent); err != nil { t.Errorf("could not unmarshal event. details: %v", err) } @@ -37,6 +37,6 @@ func TestMSKEventMarshaling(t *testing.T) { assert.JSONEq(t, string(inputJson), string(outputJson)) } -func TestMSKMarshalingMalformedJson(t *testing.T) { - test.TestMalformedJson(t, MSKEvent{}) +func TestKafkaMarshalingMalformedJson(t *testing.T) { + test.TestMalformedJson(t, KafkaEvent{}) } diff --git a/events/testdata/msk-event.json b/events/testdata/kafka-event.json similarity index 100% rename from events/testdata/msk-event.json rename to events/testdata/kafka-event.json