From b4388e43b47f893691f98ee8b8e9969439bdc8f6 Mon Sep 17 00:00:00 2001 From: Peiyu Wu Date: Thu, 22 Jul 2021 14:05:41 -0700 Subject: [PATCH 1/3] Add bootstarpServers and headers to Kafka event --- events/kafka.go | 22 ++++++++++++---------- events/kafka_test.go | 12 ++++++++++++ events/testdata/kafka-event.json | 20 +++++++++++++++++++- 3 files changed, 43 insertions(+), 11 deletions(-) diff --git a/events/kafka.go b/events/kafka.go index b61499c4..6c85ed97 100644 --- a/events/kafka.go +++ b/events/kafka.go @@ -3,17 +3,19 @@ package events type KafkaEvent struct { - EventSource string `json:"eventSource"` - EventSourceARN string `json:"eventSourceArn"` - Records map[string][]KafkaRecord `json:"records"` + EventSource string `json:"eventSource"` + EventSourceARN string `json:"eventSourceArn"` + Records map[string][]KafkaRecord `json:"records"` + BootstrapSevers string `json:"bootstrapSevers"` } type KafkaRecord struct { - Topic string `json:"topic"` - Partition int64 `json:"partition"` - Offset int64 `json:"offset"` - Timestamp MilliSecondsEpochTime `json:"timestamp"` - TimestampType string `json:"timestampType"` - Key string `json:"key,omitempty"` - Value string `json:"value,omitempty"` + Topic string `json:"topic"` + Partition int64 `json:"partition"` + Offset int64 `json:"offset"` + Timestamp MilliSecondsEpochTime `json:"timestamp"` + TimestampType string `json:"timestampType"` + Key string `json:"key,omitempty"` + Value string `json:"value,omitempty"` + Headers []map[string][]byte `json:"headers"` } diff --git a/events/kafka_test.go b/events/kafka_test.go index bea50b32..30a6d800 100644 --- a/events/kafka_test.go +++ b/events/kafka_test.go @@ -20,10 +20,22 @@ func TestKafkaEventMarshaling(t *testing.T) { t.Errorf("could not unmarshal event. details: %v", err) } + assert.Equal(t, inputEvent.BootstrapSevers, "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092") + assert.Equal(t, inputEvent.EventSource, "aws:kafka") + assert.Equal(t, EventSourceARN, "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4") for _, records := range inputEvent.Records { for _, record := range records { utc := record.Timestamp.UTC() assert.Equal(t, 2020, utc.Year()) + assert.Equal(t, record.Key, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj") + assert.Equal(t, record.Value, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj") + for header := range record.Headers { + for key, value := range header { + assert.Equal(t, key, "headerKey") + var headerValue String := string(value) + assert.Equal(t, headerValue, "headerValue") + } + } } } diff --git a/events/testdata/kafka-event.json b/events/testdata/kafka-event.json index d940e57c..beec3cd6 100644 --- a/events/testdata/kafka-event.json +++ b/events/testdata/kafka-event.json @@ -1,6 +1,7 @@ { "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4", + "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records": { "AWSKafkaTopic-0": [ { @@ -10,7 +11,24 @@ "timestamp": 1595035749700, "timestampType": "CREATE_TIME", "key": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj", - "value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj" + "value": "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj", + "headers": [ + { + "headerKey": [ + 104, + 101, + 97, + 100, + 101, + 114, + 86, + 97, + 108, + 117, + 101 + ] + } + ] } ] } From 20a2917b8ee6dcbed72a0bbd2d7a72664adda168 Mon Sep 17 00:00:00 2001 From: Peiyu Wu Date: Fri, 23 Jul 2021 02:44:53 -0700 Subject: [PATCH 2/3] Update test for Kafka bootstrapServers and record header --- events/kafka.go | 8 ++++---- events/kafka_test.go | 18 +++++------------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/events/kafka.go b/events/kafka.go index 6c85ed97..9188a3ca 100644 --- a/events/kafka.go +++ b/events/kafka.go @@ -3,10 +3,10 @@ package events type KafkaEvent struct { - EventSource string `json:"eventSource"` - EventSourceARN string `json:"eventSourceArn"` - Records map[string][]KafkaRecord `json:"records"` - BootstrapSevers string `json:"bootstrapSevers"` + EventSource string `json:"eventSource"` + EventSourceARN string `json:"eventSourceArn"` + Records map[string][]KafkaRecord `json:"records"` + BootstrapServers string `json:"bootstrapServers"` } type KafkaRecord struct { diff --git a/events/kafka_test.go b/events/kafka_test.go index 30a6d800..21cda9ac 100644 --- a/events/kafka_test.go +++ b/events/kafka_test.go @@ -20,33 +20,25 @@ func TestKafkaEventMarshaling(t *testing.T) { t.Errorf("could not unmarshal event. details: %v", err) } - assert.Equal(t, inputEvent.BootstrapSevers, "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092") + assert.Equal(t, inputEvent.BootstrapServers, "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092") assert.Equal(t, inputEvent.EventSource, "aws:kafka") - assert.Equal(t, EventSourceARN, "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4") + assert.Equal(t, inputEvent.EventSourceARN, "arn:aws:kafka:us-west-2:012345678901:cluster/ExampleMSKCluster/e9f754c6-d29a-4430-a7db-958a19fd2c54-4") for _, records := range inputEvent.Records { for _, record := range records { utc := record.Timestamp.UTC() assert.Equal(t, 2020, utc.Year()) assert.Equal(t, record.Key, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj") assert.Equal(t, record.Value, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj") - for header := range record.Headers { + + for _, header := range record.Headers { for key, value := range header { assert.Equal(t, key, "headerKey") - var headerValue String := string(value) + var headerValue string = string(value) assert.Equal(t, headerValue, "headerValue") } } } } - - // 3. serialize to JSON - outputJson, err := json.Marshal(inputEvent) - if err != nil { - t.Errorf("could not marshal event. details: %v", err) - } - - // 4. check result - assert.JSONEq(t, string(inputJson), string(outputJson)) } func TestKafkaMarshalingMalformedJson(t *testing.T) { From 5d6f36f71b55aaf6e7161e10e767cdbd080b4b5f Mon Sep 17 00:00:00 2001 From: Peiyu Wu Date: Fri, 23 Jul 2021 02:52:06 -0700 Subject: [PATCH 3/3] fix format issue --- events/kafka.go | 24 ++++++++++++------------ events/kafka_test.go | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/events/kafka.go b/events/kafka.go index 9188a3ca..12d17091 100644 --- a/events/kafka.go +++ b/events/kafka.go @@ -3,19 +3,19 @@ package events type KafkaEvent struct { - EventSource string `json:"eventSource"` - EventSourceARN string `json:"eventSourceArn"` - Records map[string][]KafkaRecord `json:"records"` - BootstrapServers string `json:"bootstrapServers"` + EventSource string `json:"eventSource"` + EventSourceARN string `json:"eventSourceArn"` + Records map[string][]KafkaRecord `json:"records"` + BootstrapServers string `json:"bootstrapServers"` } type KafkaRecord struct { - Topic string `json:"topic"` - Partition int64 `json:"partition"` - Offset int64 `json:"offset"` - Timestamp MilliSecondsEpochTime `json:"timestamp"` - TimestampType string `json:"timestampType"` - Key string `json:"key,omitempty"` - Value string `json:"value,omitempty"` - Headers []map[string][]byte `json:"headers"` + Topic string `json:"topic"` + Partition int64 `json:"partition"` + Offset int64 `json:"offset"` + Timestamp MilliSecondsEpochTime `json:"timestamp"` + TimestampType string `json:"timestampType"` + Key string `json:"key,omitempty"` + Value string `json:"value,omitempty"` + Headers []map[string][]byte `json:"headers"` } diff --git a/events/kafka_test.go b/events/kafka_test.go index 21cda9ac..27fdd91c 100644 --- a/events/kafka_test.go +++ b/events/kafka_test.go @@ -29,7 +29,7 @@ func TestKafkaEventMarshaling(t *testing.T) { assert.Equal(t, 2020, utc.Year()) assert.Equal(t, record.Key, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj") assert.Equal(t, record.Value, "OGQ1NTk2YjQtMTgxMy00MjM4LWIyNGItNmRhZDhlM2QxYzBj") - + for _, header := range record.Headers { for key, value := range header { assert.Equal(t, key, "headerKey")