From a1c55c37d97c0eff90113800d15c0cebb0ac98df Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 7 Jun 2019 09:55:20 -0700 Subject: [PATCH 01/28] Added gitignore --- .gitignore | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1508c4d --- /dev/null +++ b/.gitignore @@ -0,0 +1,16 @@ + +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# build output dir +bin + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out From ea98d4a5b09f2261d50fc763c9ee30cc8497c79f Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 7 Jun 2019 09:57:41 -0700 Subject: [PATCH 02/28] Project scaffolding --- Makefile | 42 ++++++++++++++++++++++++++++++++++++++++++ go.mod | 15 +++++++++++++++ go.sum | 37 +++++++++++++++++++++++++++++++++++++ scripts/mockgen.sh | 44 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 138 insertions(+) create mode 100644 Makefile create mode 100644 go.mod create mode 100644 go.sum create mode 100755 scripts/mockgen.sh diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..bb19585 --- /dev/null +++ b/Makefile @@ -0,0 +1,42 @@ +# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +ROOT := $(shell pwd) + +all: build + +SCRIPT_PATH := $(ROOT)/scripts/:${PATH} +SOURCES := $(shell find . -name '*.go') +PLUGIN_BINARY := ./bin/cloudwatch.so + +.PHONY: build +build: $(PLUGIN_BINARY) + +$(PLUGIN_BINARY): $(SOURCES) + PATH=${PATH} golint ./cloudwatch + mkdir -p ./bin + go build -buildmode c-shared -o ./bin/cloudwatch.so ./ + @echo "Built Amazon CloudWatch Logs Fluent Bit Plugin" + +.PHONY: generate +generate: $(SOURCES) + PATH=$(SCRIPT_PATH) go generate ./... + + +.PHONY: test +test: + go test -timeout=120s -v -cover ./... + +.PHONY: clean +clean: + rm -rf ./bin/* diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..b718389 --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit + +go 1.12 + +require ( + github.com/aws/aws-sdk-go v1.19.45 + github.com/cenkalti/backoff v2.1.1+incompatible + github.com/fluent/fluent-bit-go v0.0.0-20190521122216-fc386d263885 + github.com/golang/mock v1.3.1 + github.com/json-iterator/go v1.1.6 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/sirupsen/logrus v1.4.2 + github.com/stretchr/testify v1.2.2 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e545070 --- /dev/null +++ b/go.sum @@ -0,0 +1,37 @@ +github.com/aws/aws-sdk-go v1.19.45 h1:jAxmC8qqa7mW531FDgM8Ahbqlb3zmiHgTpJU6fY3vJ0= +github.com/aws/aws-sdk-go v1.19.45/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/awslabs/amazon-kinesis-firehose-for-fluent-bit v0.0.0-20190430230011-6413ba698e77 h1:QODGNn1Mdwo6EPZc/KJ+0WnHxTbgiTuOX3kTDJC0q1Q= +github.com/cenkalti/backoff v2.1.1+incompatible h1:tKJnvO2kl0zmb/jA5UKAt4VoEVw1qxKWjE/Bpp46npY= +github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fluent/fluent-bit-go v0.0.0-20190521122216-fc386d263885 h1:U+Vvh2frbsjyNGQGFOY8PFqnnmeqrCGL6q2TMysHHH4= +github.com/fluent/fluent-bit-go v0.0.0-20190521122216-fc386d263885/go.mod h1:WQX+afhrekY9rGK+WT4xvKSlzmia9gDoLYu4GGYGASQ= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= +github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/ugorji/go v1.1.4 h1:j4s+tAvLfL3bZyefP2SEWmhBzmuIlH/eqNuPdFPgngw= +github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= diff --git a/scripts/mockgen.sh b/scripts/mockgen.sh new file mode 100755 index 0000000..1ed7e1d --- /dev/null +++ b/scripts/mockgen.sh @@ -0,0 +1,44 @@ +#!/bin/bash +# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the +# "License"). You may not use this file except in compliance +# with the License. A copy of the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and +# limitations under the License. +# +# This script wraps the mockgen tool and inserts licensing information. + +set -e +package=${1?Must provide package} +interfaces=${2?Must provide interface names} +outputfile=${3?Must provide an output file} + +export PATH="${GOPATH//://bin:}/bin:$PATH" + +data=$( +cat << EOF +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +$(mockgen "$package" "$interfaces") +EOF +) + +echo "$data" | goimports > "${outputfile}" From 5cf68e54b80c718c9fd171f8deb0d25140b6511e Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 7 Jun 2019 09:57:55 -0700 Subject: [PATCH 03/28] Code duplicated from https://github.com/awslabs/amazon-kinesis-firehose-for-fluent-bit/pull/1 Remove once that PR is merged! --- plugins/plugins.go | 234 ++++++++++++++++++++++++++++++++++++++++ plugins/plugins_test.go | 68 ++++++++++++ 2 files changed, 302 insertions(+) create mode 100644 plugins/plugins.go create mode 100644 plugins/plugins_test.go diff --git a/plugins/plugins.go b/plugins/plugins.go new file mode 100644 index 0000000..18ba24d --- /dev/null +++ b/plugins/plugins.go @@ -0,0 +1,234 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +// Package plugins contains functions that are useful across fluent bit plugins. +// This package will be imported by the CloudWatch Logs and Kinesis Data Streams plugins. +package plugins + +import ( + "os" + "strings" + "time" + + retry "github.com/cenkalti/backoff" + "github.com/sirupsen/logrus" +) + +const ( + fluentBitLogLevelEnvVar = "FLB_LOG_LEVEL" + sendFailureTimeoutEnvVar = "SEND_FAILURE_TIMEOUT" +) + +const ( + initialInterval = 100 // milliseconds + maxInterval = 10 // seconds +) + +// Backoff wraps github.com/cenkalti/backoff +// Wait() is called for each AWS API call that may need back off +// But backoff only occurs if StartBackoff() has previously been called +// Reset() should be called whenever backoff can end. +type Backoff struct { + doBackoff bool + expBackoff *retry.ExponentialBackOff +} + +// Reset ends the exponential backoff +func (b *Backoff) Reset() { + b.doBackoff = false + b.expBackoff.Reset() +} + +// Wait enacts the exponential backoff, if StartBackoff() has been called +func (b *Backoff) Wait() { + if b.doBackoff { + d := b.expBackoff.NextBackOff() + logrus.Debugf("[go plugin] In exponential backoff, waiting %v", d) + time.Sleep(d) + } +} + +// StartBackoff begins exponential backoff +// its a no-op if backoff has already started +func (b *Backoff) StartBackoff() { + b.doBackoff = true +} + +// NewBackoff creates a new Backoff struct with default values +func NewBackoff() *Backoff { + b := retry.NewExponentialBackOff() + b.InitialInterval = initialInterval * time.Millisecond + b.MaxElapsedTime = 0 // The backoff object never expires + b.MaxInterval = maxInterval * time.Second + return &Backoff{ + doBackoff: false, + expBackoff: b, + } +} + +// Timeout is a simple timeout for single-threaded programming +// (Goroutines are expensive in Cgo) +type Timeout struct { + timeoutFunc func(time.Duration) + duration time.Duration + stopTime time.Time + ticking bool + enabled bool +} + +// Start the timer +// this method has no effect if the timer has already been started +func (t *Timeout) Start() { + if t.enabled && !t.ticking { + t.ticking = true + t.stopTime = time.Now().Add(t.duration) + } +} + +// Reset the timer +func (t *Timeout) Reset() { + t.ticking = false +} + +// Check the timer to see if its timed out +func (t *Timeout) Check() { + if t.enabled && t.ticking { + if t.stopTime.Before(time.Now()) { + // run the timeout function + t.timeoutFunc(t.duration) + } + } +} + +// NewTimeout returns a new timeout object +// with a duration set from the env var +// if the env var is not set, then a timer is returned that is disabled (it doesn't do anything) +func NewTimeout(timeoutFunc func(duration time.Duration)) (*Timeout, error) { + if os.Getenv(sendFailureTimeoutEnvVar) != "" { + duration, err := time.ParseDuration(os.Getenv(sendFailureTimeoutEnvVar)) + if err != nil { + return nil, err + } + return &Timeout{ + timeoutFunc: timeoutFunc, + duration: duration, + ticking: false, + enabled: true, + }, nil + } + + // timeout not enabled + return &Timeout{ + timeoutFunc: timeoutFunc, + ticking: false, + enabled: false, + }, nil +} + +// SetupLogger sets up Logrus with the log level determined by the Fluent Bit Env Var +func SetupLogger() { + logrus.SetOutput(os.Stdout) + switch strings.ToUpper(os.Getenv(fluentBitLogLevelEnvVar)) { + default: + logrus.SetLevel(logrus.InfoLevel) + case "DEBUG": + logrus.SetLevel(logrus.DebugLevel) + case "INFO": + logrus.SetLevel(logrus.InfoLevel) + case "ERROR": + logrus.SetLevel(logrus.ErrorLevel) + } +} + +// DecodeMap prepares a record for JSON marshalling +// Any []byte will be base64 encoded when marshaled to JSON, so we must directly cast all []byte to string +func DecodeMap(record map[interface{}]interface{}) (map[interface{}]interface{}, error) { + for k, v := range record { + switch t := v.(type) { + case []byte: + // convert all byte slices to strings + record[k] = string(t) + case map[interface{}]interface{}: + decoded, err := DecodeMap(t) + if err != nil { + return nil, err + } + record[k] = decoded + case []interface{}: + decoded, err := decodeSlice(t) + if err != nil { + return nil, err + } + record[k] = decoded + } + } + return record, nil +} + +// DataKeys allows users to specify a list of keys in the record which they want to be sent +// all others are discarded +func DataKeys(input string, record map[interface{}]interface{}) map[interface{}]interface{} { + input = strings.TrimSpace(input) + keys := strings.Split(input, ",") + + for k := range record { + var currentKey string + switch t := k.(type) { + case []byte: + currentKey = string(t) + case string: + currentKey = t + default: + logrus.Debugf("[external plugin]: Unable to determine type of key %v\n", t) + continue + } + + if !contains(keys, currentKey) { + delete(record, k) + } + } + + return record +} + +func decodeSlice(record []interface{}) ([]interface{}, error) { + for i, v := range record { + switch t := v.(type) { + case []byte: + // convert all byte slices to strings + record[i] = string(t) + case map[interface{}]interface{}: + decoded, err := DecodeMap(t) + if err != nil { + return nil, err + } + record[i] = decoded + case []interface{}: + decoded, err := decodeSlice(t) + if err != nil { + return nil, err + } + record[i] = decoded + } + } + return record, nil +} + +func contains(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + return false +} diff --git a/plugins/plugins_test.go b/plugins/plugins_test.go new file mode 100644 index 0000000..c6b5f4c --- /dev/null +++ b/plugins/plugins_test.go @@ -0,0 +1,68 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package plugins + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDecodeMap(t *testing.T) { + sliceVal := []interface{}{ + []byte("Seattle is"), + []byte("rainy"), + } + innerMap := map[interface{}]interface{}{ + "clyde": []byte("best dog"), + "slice_value": sliceVal, + } + record := map[interface{}]interface{}{ + "somekey": []byte("some value"), + "key-with-nested-value": innerMap, + } + + var err error + record, err = DecodeMap(record) + + assert.NoError(t, err, "Unexpected error calling DecodeMap") + + assertTypeIsString(t, record["somekey"]) + assertTypeIsString(t, innerMap["clyde"]) + assertTypeIsString(t, sliceVal[0]) + assertTypeIsString(t, sliceVal[1]) + +} + +func assertTypeIsString(t *testing.T, val interface{}) { + _, ok := val.(string) + assert.True(t, ok, "Expected value to be a string after call to DecodeMap") +} + +func TestDataKeys(t *testing.T) { + record := map[interface{}]interface{}{ + "this": "is a test", + "this is only": "a test", + "dumpling": "is a dog", + "pudding": "is a dog", + "sushi": "is a dog", + "why do": "people name their dogs after food...", + } + + record = DataKeys("dumpling,pudding", record) + + assert.Len(t, record, 2, "Expected record to contain 2 keys") + assert.Equal(t, record["pudding"], "is a dog", "Expected data key to have correct value") + assert.Equal(t, record["dumpling"], "is a dog", "Expected data key to have correct value") +} From bc6857aff175332908305b9fa9521659efd3deb0 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 7 Jun 2019 09:59:44 -0700 Subject: [PATCH 04/28] Fluent Bit CloudWatch plugin initial implementation --- cloudwatch/cloudwatch.go | 239 +++++++++++++++++++++++++++++ cloudwatch/cloudwatch_test.go | 110 +++++++++++++ cloudwatch/generate_mock.go | 16 ++ cloudwatch/mock_cloudwatch/mock.go | 87 +++++++++++ fluent-bit-cloudwatch.go | 124 +++++++++++++++ 5 files changed, 576 insertions(+) create mode 100644 cloudwatch/cloudwatch.go create mode 100644 cloudwatch/cloudwatch_test.go create mode 100644 cloudwatch/generate_mock.go create mode 100644 cloudwatch/mock_cloudwatch/mock.go create mode 100644 fluent-bit-cloudwatch.go diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go new file mode 100644 index 0000000..5a898b4 --- /dev/null +++ b/cloudwatch/cloudwatch.go @@ -0,0 +1,239 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package cloudwatch + +import ( + "fmt" + "os" + "time" + "unicode/utf8" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials/stscreds" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + "github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/plugins" + fluentbit "github.com/fluent/fluent-bit-go/output" + jsoniter "github.com/json-iterator/go" + "github.com/sirupsen/logrus" +) + +const ( + // See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html + perEventBytes = 26 + maximumBytesPerPut = 1048576 + maximumLogEventsPerPut = 10000 +) + +type CloudWatchLogsClient interface { + CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) + CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) + PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) +} + +type logStream struct { + logEvents []*cloudwatchlogs.InputLogEvent + currentByteLength int + nextSequenceToken *string + logStreamName string +} + +type OutputPlugin struct { + region string + logGroupName string + logStreamPrefix string + client CloudWatchLogsClient + streams map[string]*logStream + backoff *plugins.Backoff + timer *plugins.Timeout +} + +// NewOutputPlugin creates a OutputPlugin object +func NewOutputPlugin(region string, logGroupName string, logStreamPrefix string, roleARN string, autoCreateGroup bool) (*OutputPlugin, error) { + sess, err := session.NewSession(&aws.Config{ + Region: aws.String(region), + }) + if err != nil { + return nil, err + } + + client := newCloudWatchLogsClient(roleARN, sess) + + timer, err := plugins.NewTimeout(func(d time.Duration) { + logrus.Errorf("[cloudwatch] timeout threshold reached: Failed to send logs for %v\n", d) + logrus.Error("[cloudwatch] Quitting Fluent Bit") + os.Exit(1) + }) + + if err != nil { + return nil, err + } + + if autoCreateGroup { + err = createLogGroup(logGroupName, client) + if err != nil { + return nil, err + } + } + + return &OutputPlugin{ + region: region, + logGroupName: logGroupName, + logStreamPrefix: logStreamPrefix, + client: client, + backoff: plugins.NewBackoff(), + timer: timer, + streams: make(map[string]*logStream), + }, nil +} + +func newCloudWatchLogsClient(roleARN string, sess *session.Session) *cloudwatchlogs.CloudWatchLogs { + if roleARN != "" { + creds := stscreds.NewCredentials(sess, roleARN) + return cloudwatchlogs.New(sess, &aws.Config{Credentials: creds}) + } + + return cloudwatchlogs.New(sess) +} + +func (output *OutputPlugin) AddEvent(tag string, record map[interface{}]interface{}, timestamp time.Time) (int, error) { + data, retCode, err := output.processRecord(record) + if err != nil { + return retCode, err + } + event := string(data) + + stream, err := output.getLogStream(tag) + if err != nil { + return fluentbit.FLB_ERROR, err + } + + if len(stream.logEvents) == maximumLogEventsPerPut || (stream.currentByteLength+cloudwatchLen(event)) >= maximumBytesPerPut { + err = output.putLogEvents(stream) + if err != nil { + return fluentbit.FLB_ERROR, err + } + } + + stream.logEvents = append(stream.logEvents, &cloudwatchlogs.InputLogEvent{ + Message: aws.String(event), + Timestamp: aws.Int64(timestamp.Unix()), + }) + stream.currentByteLength += cloudwatchLen(event) + return fluentbit.FLB_ERROR, nil +} + +func (output *OutputPlugin) getLogStream(tag string) (*logStream, error) { + // find log stream by tag + stream, ok := output.streams[tag] + if !ok { + // stream doesn't exist, create it + return output.createStream(output.logStreamPrefix+tag, tag) + } + + return stream, nil +} + +func (output *OutputPlugin) createStream(name, tag string) (*logStream, error) { + _, err := output.client.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ + LogGroupName: aws.String(output.logGroupName), + LogStreamName: aws.String(name), + }) + + if err != nil { + return nil, err + } + + stream := &logStream{ + logStreamName: name, + logEvents: make([]*cloudwatchlogs.InputLogEvent, 0, maximumLogEventsPerPut), + nextSequenceToken: nil, // sequence token not required for a new log stream + } + + output.streams[tag] = stream + + return stream, nil +} + +func createLogGroup(name string, client CloudWatchLogsClient) error { + _, err := client.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ + LogGroupName: aws.String(name), + }) + + return err +} + +func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([]byte, int, error) { + var err error + record, err = plugins.DecodeMap(record) + if err != nil { + logrus.Debugf("[cloudwatch] Failed to decode record: %v\n", record) + return nil, fluentbit.FLB_ERROR, err + } + + var json = jsoniter.ConfigCompatibleWithStandardLibrary + data, err := json.Marshal(record) + if err != nil { + logrus.Debugf("[cloudwatch] Failed to marshal record: %v\n", record) + return nil, fluentbit.FLB_ERROR, err + } + + return data, fluentbit.FLB_OK, nil +} + +func (output *OutputPlugin) Flush(tag string) error { + stream, err := output.getLogStream(tag) + if err != nil { + return err + } + return output.putLogEvents(stream) +} + +func (output *OutputPlugin) putLogEvents(stream *logStream) error { + fmt.Println("Sending to CloudWatch") + response, err := output.client.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ + LogEvents: stream.logEvents, + LogGroupName: aws.String(output.logGroupName), + LogStreamName: aws.String(stream.logStreamName), + SequenceToken: stream.nextSequenceToken, + }) + if err != nil { + fmt.Println(err) + return err + } + fmt.Printf("Sent %d events to CloudWatch\n", len(stream.logEvents)) + + stream.nextSequenceToken = response.NextSequenceToken + stream.logEvents = stream.logEvents[:0] + stream.currentByteLength = 0 + + return nil +} + +// effectiveLen counts the effective number of bytes in the string, after +// UTF-8 normalization. UTF-8 normalization includes replacing bytes that do +// not constitute valid UTF-8 encoded Unicode codepoints with the Unicode +// replacement codepoint U+FFFD (a 3-byte UTF-8 sequence, represented in Go as +// utf8.RuneError) +func effectiveLen(line string) int { + effectiveBytes := 0 + for _, rune := range line { + effectiveBytes += utf8.RuneLen(rune) + } + return effectiveBytes +} + +func cloudwatchLen(event string) int { + return effectiveLen(event) + perEventBytes +} diff --git a/cloudwatch/cloudwatch_test.go b/cloudwatch/cloudwatch_test.go new file mode 100644 index 0000000..c694ad1 --- /dev/null +++ b/cloudwatch/cloudwatch_test.go @@ -0,0 +1,110 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package cloudwatch + +import ( + "os" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + "github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch/mock_cloudwatch" + "github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/plugins" + "github.com/golang/mock/gomock" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" +) + +const ( + testRegion = "us-west-2" + testLogGroup = "my-logs" + testLogStreamPrefix = "my-prefix" + testTag = "tag" +) + +func TestAddEvent(t *testing.T) { + timer, _ := plugins.NewTimeout(func(d time.Duration) { + logrus.Errorf("[firehose] timeout threshold reached: Failed to send logs for %v\n", d) + logrus.Error("[firehose] Quitting Fluent Bit") + os.Exit(1) + }) + + ctrl := gomock.NewController(t) + mockCloudWatch := mock_cloudwatch.NewMockCloudWatchLogsClient(ctrl) + + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") + }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) + + output := OutputPlugin{ + region: testRegion, + logGroupName: testLogGroup, + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + backoff: plugins.NewBackoff(), + timer: timer, + streams: make(map[string]*logStream), + } + + record := map[interface{}]interface{}{ + "somekey": []byte("some value"), + } + + output.AddEvent(testTag, record, time.Now()) + +} + +func TestAddEventAndFlush(t *testing.T) { + timer, _ := plugins.NewTimeout(func(d time.Duration) { + logrus.Errorf("[firehose] timeout threshold reached: Failed to send logs for %v\n", d) + logrus.Error("[firehose] Quitting Fluent Bit") + os.Exit(1) + }) + + ctrl := gomock.NewController(t) + mockCloudWatch := mock_cloudwatch.NewMockCloudWatchLogsClient(ctrl) + + gomock.InOrder( + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") + }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil), + mockCloudWatch.EXPECT().PutLogEvents(gomock.Any()).Do(func(input *cloudwatchlogs.PutLogEventsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") + }).Return(&cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String("token"), + }, nil), + ) + + output := OutputPlugin{ + region: testRegion, + logGroupName: testLogGroup, + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + backoff: plugins.NewBackoff(), + timer: timer, + streams: make(map[string]*logStream), + } + + record := map[interface{}]interface{}{ + "somekey": []byte("some value"), + } + + output.AddEvent(testTag, record, time.Now()) + output.Flush(testTag) + +} diff --git a/cloudwatch/generate_mock.go b/cloudwatch/generate_mock.go new file mode 100644 index 0000000..a1d2614 --- /dev/null +++ b/cloudwatch/generate_mock.go @@ -0,0 +1,16 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package cloudwatch + +//go:generate mockgen.sh github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch CloudWatchLogsClient mock_cloudwatch/mock.go diff --git a/cloudwatch/mock_cloudwatch/mock.go b/cloudwatch/mock_cloudwatch/mock.go new file mode 100644 index 0000000..4c5cfb2 --- /dev/null +++ b/cloudwatch/mock_cloudwatch/mock.go @@ -0,0 +1,87 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch (interfaces: CloudWatchLogsClient) + +// Package mock_cloudwatch is a generated GoMock package. +package mock_cloudwatch + +import ( + reflect "reflect" + + cloudwatchlogs "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + gomock "github.com/golang/mock/gomock" +) + +// MockCloudWatchLogsClient is a mock of CloudWatchLogsClient interface +type MockCloudWatchLogsClient struct { + ctrl *gomock.Controller + recorder *MockCloudWatchLogsClientMockRecorder +} + +// MockCloudWatchLogsClientMockRecorder is the mock recorder for MockCloudWatchLogsClient +type MockCloudWatchLogsClientMockRecorder struct { + mock *MockCloudWatchLogsClient +} + +// NewMockCloudWatchLogsClient creates a new mock instance +func NewMockCloudWatchLogsClient(ctrl *gomock.Controller) *MockCloudWatchLogsClient { + mock := &MockCloudWatchLogsClient{ctrl: ctrl} + mock.recorder = &MockCloudWatchLogsClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockCloudWatchLogsClient) EXPECT() *MockCloudWatchLogsClientMockRecorder { + return m.recorder +} + +// CreateLogGroup mocks base method +func (m *MockCloudWatchLogsClient) CreateLogGroup(arg0 *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { + ret := m.ctrl.Call(m, "CreateLogGroup", arg0) + ret0, _ := ret[0].(*cloudwatchlogs.CreateLogGroupOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateLogGroup indicates an expected call of CreateLogGroup +func (mr *MockCloudWatchLogsClientMockRecorder) CreateLogGroup(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateLogGroup", reflect.TypeOf((*MockCloudWatchLogsClient)(nil).CreateLogGroup), arg0) +} + +// CreateLogStream mocks base method +func (m *MockCloudWatchLogsClient) CreateLogStream(arg0 *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + ret := m.ctrl.Call(m, "CreateLogStream", arg0) + ret0, _ := ret[0].(*cloudwatchlogs.CreateLogStreamOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateLogStream indicates an expected call of CreateLogStream +func (mr *MockCloudWatchLogsClientMockRecorder) CreateLogStream(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateLogStream", reflect.TypeOf((*MockCloudWatchLogsClient)(nil).CreateLogStream), arg0) +} + +// PutLogEvents mocks base method +func (m *MockCloudWatchLogsClient) PutLogEvents(arg0 *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + ret := m.ctrl.Call(m, "PutLogEvents", arg0) + ret0, _ := ret[0].(*cloudwatchlogs.PutLogEventsOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PutLogEvents indicates an expected call of PutLogEvents +func (mr *MockCloudWatchLogsClientMockRecorder) PutLogEvents(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutLogEvents", reflect.TypeOf((*MockCloudWatchLogsClient)(nil).PutLogEvents), arg0) +} diff --git a/fluent-bit-cloudwatch.go b/fluent-bit-cloudwatch.go new file mode 100644 index 0000000..18762e7 --- /dev/null +++ b/fluent-bit-cloudwatch.go @@ -0,0 +1,124 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package main + +import ( + "C" + "fmt" + "unsafe" + + "time" + + "github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch" + "github.com/fluent/fluent-bit-go/output" + + "github.com/sirupsen/logrus" +) + +var ( + cloudwatchLogs *cloudwatch.OutputPlugin +) + +//export FLBPluginRegister +func FLBPluginRegister(ctx unsafe.Pointer) int { + return output.FLBPluginRegister(ctx, "cloudwatch", "AWS CloudWatch Fluent Bit Plugin!") +} + +//export FLBPluginInit +func FLBPluginInit(ctx unsafe.Pointer) int { + logGroup := output.FLBPluginConfigKey(ctx, "log_group") + fmt.Printf("[cloudwatch] plugin parameter log_group = '%s'\n", logGroup) + logStreamPrefix := output.FLBPluginConfigKey(ctx, "log_stream_prefix") + fmt.Printf("[cloudwatch] plugin parameter log_stream_prefix = '%s'\n", logStreamPrefix) + region := output.FLBPluginConfigKey(ctx, "region") + fmt.Printf("[cloudwatch] plugin parameter = '%s'\n", region) + roleARN := output.FLBPluginConfigKey(ctx, "role_arn") + logrus.Infof("[firehose] plugin parameter role_arn = '%s'\n", roleARN) + + if logGroup == "" || logStreamPrefix == "" || region == "" { + return output.FLB_ERROR + } + + var err error + cloudwatchLogs, err = cloudwatch.NewOutputPlugin(region, logGroup, logStreamPrefix, roleARN, true) + if err != nil { + fmt.Println(err) + return output.FLB_ERROR + } + return output.FLB_OK +} + +//export FLBPluginFlush +func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { + var count int + var ret int + var ts interface{} + var record map[interface{}]interface{} + + // Create Fluent Bit decoder + dec := output.NewDecoder(data, int(length)) + + fluentTag := C.GoString(tag) + logrus.Debugf("[firehose] Found logs with tag: %s\n", fluentTag) + + for { + // Extract Record + ret, ts, record = output.GetRecord(dec) + if ret != 0 { + break + } + + var timestamp time.Time + switch tts := ts.(type) { + case output.FLBTime: + timestamp = tts.Time + case uint64: + // From our observation, when ts is of type uint64 it appears to + // be the amount of seconds since unix epoch. + timestamp = time.Unix(int64(tts), 0) + default: + timestamp = time.Now() + } + + retCode, err := cloudwatchLogs.AddEvent(fluentTag, record, timestamp) + if err != nil { + logrus.Error(err) + return retCode + } + count++ + } + err := cloudwatchLogs.Flush(fluentTag) + if err != nil { + fmt.Println(err) + // TODO: Better error handling + return output.FLB_RETRY + } + + logrus.Debugf("[firehose] Processed %d events with tag %s\n", count, fluentTag) + + // Return options: + // + // output.FLB_OK = data have been processed. + // output.FLB_ERROR = unrecoverable error, do not try this again. + // output.FLB_RETRY = retry to flush later. + return output.FLB_OK +} + +//export FLBPluginExit +func FLBPluginExit() int { + return output.FLB_OK +} + +func main() { +} From 3525abb0c3565e384f1cacac96ca824e76d19df4 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Wed, 12 Jun 2019 21:00:29 -0700 Subject: [PATCH 05/28] WIP: Adding all other options --- cloudwatch/cloudwatch.go | 159 +++++++++++++++++++++++++++++++++------ fluent-bit-cloudwatch.go | 56 ++++++++++---- 2 files changed, 175 insertions(+), 40 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 5a898b4..f99a710 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -20,6 +20,7 @@ import ( "unicode/utf8" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials/stscreds" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" @@ -39,6 +40,7 @@ const ( type CloudWatchLogsClient interface { CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) + DescribeLogStreams(input *cloudwatchlogs.DescribeLogStreamsInput) (*cloudwatchlogs.DescribeLogStreamsOutput, error) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) } @@ -50,25 +52,51 @@ type logStream struct { } type OutputPlugin struct { - region string logGroupName string logStreamPrefix string + logStreamName string + logKey string client CloudWatchLogsClient streams map[string]*logStream backoff *plugins.Backoff timer *plugins.Timeout } +type OutputPluginConfig struct { + Region string + LogGroupName string + LogStreamPrefix string + LogStreamName string + LogKey string + RoleARN string + AutoCreateGroup bool +} + +func (config OutputPluginConfig) Validate() error { + errorStr := "%s is a required parameter" + if config.Region == "" { + return fmt.Errorf(errorStr, "region") + } + if config.LogGroupName == "" { + return fmt.Errorf(errorStr, "log_group_name") + } + if config.LogStreamName == "" && config.LogStreamPrefix == "" { + return fmt.Errorf("log_stream_name or log_stream_prefix is required") + } + + return nil +} + // NewOutputPlugin creates a OutputPlugin object -func NewOutputPlugin(region string, logGroupName string, logStreamPrefix string, roleARN string, autoCreateGroup bool) (*OutputPlugin, error) { +func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) { sess, err := session.NewSession(&aws.Config{ - Region: aws.String(region), + Region: aws.String(config.Region), }) if err != nil { return nil, err } - client := newCloudWatchLogsClient(roleARN, sess) + client := newCloudWatchLogsClient(config.RoleARN, sess) timer, err := plugins.NewTimeout(func(d time.Duration) { logrus.Errorf("[cloudwatch] timeout threshold reached: Failed to send logs for %v\n", d) @@ -80,17 +108,18 @@ func NewOutputPlugin(region string, logGroupName string, logStreamPrefix string, return nil, err } - if autoCreateGroup { - err = createLogGroup(logGroupName, client) + if config.AutoCreateGroup { + err = createLogGroup(config.LogGroupName, client) if err != nil { return nil, err } } return &OutputPlugin{ - region: region, - logGroupName: logGroupName, - logStreamPrefix: logStreamPrefix, + logGroupName: config.LogGroupName, + logStreamPrefix: config.LogStreamPrefix, + logStreamName: config.LogStreamName, + logKey: config.LogKey, client: client, backoff: plugins.NewBackoff(), timer: timer, @@ -107,22 +136,31 @@ func newCloudWatchLogsClient(roleARN string, sess *session.Session) *cloudwatchl return cloudwatchlogs.New(sess) } -func (output *OutputPlugin) AddEvent(tag string, record map[interface{}]interface{}, timestamp time.Time) (int, error) { - data, retCode, err := output.processRecord(record) +// AddEvent accepts a record and adds it to the buffer for its stream, flushing the buffer if it is full +// the return value is one of: FLB_OK, FLB_RETRY +// API Errors lead to an FLB_RETRY, and all other errors are logged, the record is discarded and FLB_OK is returned +func (output *OutputPlugin) AddEvent(tag string, record map[interface{}]interface{}, timestamp time.Time) int { + data, err := output.processRecord(record) if err != nil { - return retCode, err + logrus.Errorf("[cloudwatch] %v\n", err) + // discard this single bad record and let the batch continue + return fluentbit.FLB_OK } event := string(data) stream, err := output.getLogStream(tag) if err != nil { - return fluentbit.FLB_ERROR, err + logrus.Errorf("[cloudwatch] Error: %v\n", err) + // an error means that the log stream was not created; this is retryable + return fluentbit.FLB_RETRY } if len(stream.logEvents) == maximumLogEventsPerPut || (stream.currentByteLength+cloudwatchLen(event)) >= maximumBytesPerPut { err = output.putLogEvents(stream) if err != nil { - return fluentbit.FLB_ERROR, err + logrus.Errorf("[cloudwatch] %v\n", err) + // send failures are retryable + return fluentbit.FLB_RETRY } } @@ -131,7 +169,7 @@ func (output *OutputPlugin) AddEvent(tag string, record map[interface{}]interfac Timestamp: aws.Int64(timestamp.Unix()), }) stream.currentByteLength += cloudwatchLen(event) - return fluentbit.FLB_ERROR, nil + return fluentbit.FLB_OK } func (output *OutputPlugin) getLogStream(tag string) (*logStream, error) { @@ -139,13 +177,65 @@ func (output *OutputPlugin) getLogStream(tag string) (*logStream, error) { stream, ok := output.streams[tag] if !ok { // stream doesn't exist, create it - return output.createStream(output.logStreamPrefix+tag, tag) + stream, err := output.createStream(tag) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == cloudwatchlogs.ErrCodeResourceAlreadyExistsException { + // existing stream + return output.existingLogStream(tag, nil) + } + } + } + + return stream, err } return stream, nil } -func (output *OutputPlugin) createStream(name, tag string) (*logStream, error) { +func (output *OutputPlugin) existingLogStream(tag string, nextToken *string) (*logStream, error) { + name := output.getStreamName(tag) + resp, err := output.client.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{ + LogGroupName: aws.String(output.logGroupName), + LogStreamNamePrefix: aws.String(name), + NextToken: nextToken, + }) + + if err != nil { + return nil, err + } + + for _, result := range resp.LogStreams { + if aws.StringValue(result.LogStreamName) == name { + stream := &logStream{ + logStreamName: name, + logEvents: make([]*cloudwatchlogs.InputLogEvent, 0, maximumLogEventsPerPut), + nextSequenceToken: result.UploadSequenceToken, + } + + output.streams[tag] = stream + + return stream, nil + } + } + + if resp.NextToken != nil { + return output.existingLogStream(tag, resp.NextToken) + } + + return nil, fmt.Errorf("Error: Does not compute: Log Stream %s could not be created, but also could not be found in the log group.", name) +} + +func (output *OutputPlugin) getStreamName(tag string) string { + if output.logStreamPrefix != "" { + return output.logStreamPrefix + tag + } else { + return output.logStreamName + } +} + +func (output *OutputPlugin) createStream(tag string) (*logStream, error) { + name := output.getStreamName(tag) _, err := output.client.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ LogGroupName: aws.String(output.logGroupName), LogStreamName: aws.String(name), @@ -174,22 +264,45 @@ func createLogGroup(name string, client CloudWatchLogsClient) error { return err } -func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([]byte, int, error) { +func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([]byte, error) { var err error record, err = plugins.DecodeMap(record) if err != nil { logrus.Debugf("[cloudwatch] Failed to decode record: %v\n", record) - return nil, fluentbit.FLB_ERROR, err + return nil, err } var json = jsoniter.ConfigCompatibleWithStandardLibrary data, err := json.Marshal(record) if err != nil { logrus.Debugf("[cloudwatch] Failed to marshal record: %v\n", record) - return nil, fluentbit.FLB_ERROR, err + return nil, err + } + + return data, nil +} + +// Implements the log_key option, which allows customers to only send the value of a given key to CW Logs +func logKey(record map[interface{}]interface{}, logKey string) (*interface{}, error) { + for key, val := range record { + var currentKey string + switch t := key.(type) { + case []byte: + currentKey = string(t) + case string: + currentKey = t + default: + logrus.Debugf("[go plugin]: Unable to determine type of key %v\n", t) + continue + } + + if logKey == currentKey { + return &val, nil + } + } - return data, fluentbit.FLB_OK, nil + return nil, fmt.Errorf("Failed to find key %s specified by log_key option in log record: %v", logKey, record) } func (output *OutputPlugin) Flush(tag string) error { @@ -201,7 +314,6 @@ func (output *OutputPlugin) Flush(tag string) error { } func (output *OutputPlugin) putLogEvents(stream *logStream) error { - fmt.Println("Sending to CloudWatch") response, err := output.client.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ LogEvents: stream.logEvents, LogGroupName: aws.String(output.logGroupName), @@ -209,10 +321,9 @@ func (output *OutputPlugin) putLogEvents(stream *logStream) error { SequenceToken: stream.nextSequenceToken, }) if err != nil { - fmt.Println(err) return err } - fmt.Printf("Sent %d events to CloudWatch\n", len(stream.logEvents)) + logrus.Debugf("Sent %d events to CloudWatch\n", len(stream.logEvents)) stream.nextSequenceToken = response.NextSequenceToken stream.logEvents = stream.logEvents[:0] diff --git a/fluent-bit-cloudwatch.go b/fluent-bit-cloudwatch.go index 18762e7..44d693d 100644 --- a/fluent-bit-cloudwatch.go +++ b/fluent-bit-cloudwatch.go @@ -25,6 +25,7 @@ import ( "github.com/sirupsen/logrus" ) +import "strings" var ( cloudwatchLogs *cloudwatch.OutputPlugin @@ -35,25 +36,49 @@ func FLBPluginRegister(ctx unsafe.Pointer) int { return output.FLBPluginRegister(ctx, "cloudwatch", "AWS CloudWatch Fluent Bit Plugin!") } +func getConfiguration(ctx unsafe.Pointer) cloudwatch.OutputPluginConfig { + config := cloudwatch.OutputPluginConfig{} + config.LogGroupName = output.FLBPluginConfigKey(ctx, "log_group_name") + logrus.Infof("[cloudwatch] plugin parameter log_group = '%s'\n", config.LogGroupName) + config.LogStreamPrefix = output.FLBPluginConfigKey(ctx, "log_stream_prefix") + logrus.Infof("[cloudwatch] plugin parameter log_stream_prefix = '%s'\n", config.LogStreamPrefix) + config.LogStreamName = output.FLBPluginConfigKey(ctx, "log_stream_name") + logrus.Infof("[cloudwatch] plugin parameter log_stream = '%s'\n", config.LogStreamName) + config.Region = output.FLBPluginConfigKey(ctx, "region") + logrus.Infof("[cloudwatch] plugin parameter = '%s'\n", config.Region) + config.LogKey = output.FLBPluginConfigKey(ctx, "log_key") + logrus.Infof("[cloudwatch] plugin parameter log_key = '%s'\n", config.LogKey) + config.RoleARN = output.FLBPluginConfigKey(ctx, "role_arn") + logrus.Infof("[cloudwatch] plugin parameter role_arn = '%s'\n", config.RoleARN) + config.AutoCreateGroup = getBoolParam(ctx, "auto_create_group", false) + logrus.Infof("[cloudwatch] plugin parameter auto_create_group = '%s'\n", config.AutoCreateGroup) + + return config +} + +func getBoolParam(ctx unsafe.Pointer, param string, defaultVal bool) bool { + val := strings.ToLower(output.FLBPluginConfigKey(ctx, param)) + if val == "true" { + return true + } else if val == "false" { + return false + } else { + return defaultVal + } +} + //export FLBPluginInit func FLBPluginInit(ctx unsafe.Pointer) int { - logGroup := output.FLBPluginConfigKey(ctx, "log_group") - fmt.Printf("[cloudwatch] plugin parameter log_group = '%s'\n", logGroup) - logStreamPrefix := output.FLBPluginConfigKey(ctx, "log_stream_prefix") - fmt.Printf("[cloudwatch] plugin parameter log_stream_prefix = '%s'\n", logStreamPrefix) - region := output.FLBPluginConfigKey(ctx, "region") - fmt.Printf("[cloudwatch] plugin parameter = '%s'\n", region) - roleARN := output.FLBPluginConfigKey(ctx, "role_arn") - logrus.Infof("[firehose] plugin parameter role_arn = '%s'\n", roleARN) - - if logGroup == "" || logStreamPrefix == "" || region == "" { + config := getConfiguration(ctx) + err := config.Validate() + if err != nil { + logrus.Error(err) return output.FLB_ERROR } - var err error - cloudwatchLogs, err = cloudwatch.NewOutputPlugin(region, logGroup, logStreamPrefix, roleARN, true) + cloudwatchLogs, err = cloudwatch.NewOutputPlugin(config) if err != nil { - fmt.Println(err) + logrus.Error(err) return output.FLB_ERROR } return output.FLB_OK @@ -91,9 +116,8 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { timestamp = time.Now() } - retCode, err := cloudwatchLogs.AddEvent(fluentTag, record, timestamp) - if err != nil { - logrus.Error(err) + retCode := cloudwatchLogs.AddEvent(fluentTag, record, timestamp) + if retCode != output.FLB_OK { return retCode } count++ From cd25350b1076dda56e8940dee0a2a34d5d4ccbe8 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sat, 15 Jun 2019 23:10:09 -0700 Subject: [PATCH 06/28] Added a test for the case where multiple describes are needed to find the log stream --- cloudwatch/cloudwatch.go | 48 ++++++++--- cloudwatch/cloudwatch_test.go | 132 +++++++++++++++++++++++++++-- cloudwatch/generate_mock.go | 2 +- cloudwatch/mock_cloudwatch/mock.go | 55 +++++++----- fluent-bit-cloudwatch.go | 2 +- 5 files changed, 199 insertions(+), 40 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index f99a710..a11aa2f 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -16,6 +16,7 @@ package cloudwatch import ( "fmt" "os" + "strings" "time" "unicode/utf8" @@ -37,7 +38,8 @@ const ( maximumLogEventsPerPut = 10000 ) -type CloudWatchLogsClient interface { +// LogsClient contains the CloudWatch API calls used by this plugin +type LogsClient interface { CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) DescribeLogStreams(input *cloudwatchlogs.DescribeLogStreamsInput) (*cloudwatchlogs.DescribeLogStreamsOutput, error) @@ -51,17 +53,19 @@ type logStream struct { logStreamName string } +// OutputPlugin is the CloudWatch Logs Fluent Bit output plugin type OutputPlugin struct { logGroupName string logStreamPrefix string logStreamName string logKey string - client CloudWatchLogsClient + client LogsClient streams map[string]*logStream backoff *plugins.Backoff timer *plugins.Timeout } +// OutputPluginConfig is the input information used by NewOutputPlugin to create a new OutputPlugin type OutputPluginConfig struct { Region string LogGroupName string @@ -72,6 +76,7 @@ type OutputPluginConfig struct { AutoCreateGroup bool } +// Validate checks the configuration input for an OutputPlugin instances func (config OutputPluginConfig) Validate() error { errorStr := "%s is a required parameter" if config.Region == "" { @@ -146,11 +151,12 @@ func (output *OutputPlugin) AddEvent(tag string, record map[interface{}]interfac // discard this single bad record and let the batch continue return fluentbit.FLB_OK } - event := string(data) + + event := logString(data) stream, err := output.getLogStream(tag) if err != nil { - logrus.Errorf("[cloudwatch] Error: %v\n", err) + logrus.Errorf("[cloudwatch] %v\n", err) // an error means that the log stream was not created; this is retryable return fluentbit.FLB_RETRY } @@ -223,15 +229,16 @@ func (output *OutputPlugin) existingLogStream(tag string, nextToken *string) (*l return output.existingLogStream(tag, resp.NextToken) } - return nil, fmt.Errorf("Error: Does not compute: Log Stream %s could not be created, but also could not be found in the log group.", name) + return nil, fmt.Errorf("error: does not compute: Log Stream %s could not be created, but also could not be found in the log group", name) } func (output *OutputPlugin) getStreamName(tag string) string { + name := output.logStreamName if output.logStreamPrefix != "" { - return output.logStreamPrefix + tag - } else { - return output.logStreamName + name = output.logStreamPrefix + tag } + + return name } func (output *OutputPlugin) createStream(tag string) (*logStream, error) { @@ -256,7 +263,7 @@ func (output *OutputPlugin) createStream(tag string) (*logStream, error) { return stream, nil } -func createLogGroup(name string, client CloudWatchLogsClient) error { +func createLogGroup(name string, client LogsClient) error { _, err := client.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{ LogGroupName: aws.String(name), }) @@ -264,6 +271,12 @@ func createLogGroup(name string, client CloudWatchLogsClient) error { return err } +// Takes the byte slice and returns a string +// Also removes leading and trailing whitespace +func logString(record []byte) string { + return strings.TrimSpace(string(record)) +} + func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([]byte, error) { var err error record, err = plugins.DecodeMap(record) @@ -273,9 +286,21 @@ func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([ } var json = jsoniter.ConfigCompatibleWithStandardLibrary - data, err := json.Marshal(record) + var data []byte + + if output.logKey != "" { + log, err := logKey(record, output.logKey) + if err != nil { + return nil, err + } + + data, err = json.Marshal(log) + } else { + data, err = json.Marshal(record) + } + if err != nil { - logrus.Debugf("[cloudwatch] Failed to marshal record: %v\n", record) + logrus.Debugf("[cloudwatch] Failed to marshal record: %v\nLog Key: %s\n", record, output.logKey) return nil, err } @@ -305,6 +330,7 @@ func logKey(record map[interface{}]interface{}, logKey string) (*interface{}, er return nil, fmt.Errorf("Failed to find key %s specified by log_key option in log record: %v", logKey, record) } +// Flush sends the current buffer of records (for the stream that corresponds with the given tag) func (output *OutputPlugin) Flush(tag string) error { stream, err := output.getLogStream(tag) if err != nil { diff --git a/cloudwatch/cloudwatch_test.go b/cloudwatch/cloudwatch_test.go index c694ad1..9d57f4b 100644 --- a/cloudwatch/cloudwatch_test.go +++ b/cloudwatch/cloudwatch_test.go @@ -14,14 +14,17 @@ package cloudwatch import ( + "fmt" "os" "testing" "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch/mock_cloudwatch" "github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/plugins" + fluentbit "github.com/fluent/fluent-bit-go/output" "github.com/golang/mock/gomock" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -32,6 +35,7 @@ const ( testLogGroup = "my-logs" testLogStreamPrefix = "my-prefix" testTag = "tag" + testNextToken = "next token" ) func TestAddEvent(t *testing.T) { @@ -42,7 +46,7 @@ func TestAddEvent(t *testing.T) { }) ctrl := gomock.NewController(t) - mockCloudWatch := mock_cloudwatch.NewMockCloudWatchLogsClient(ctrl) + mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") @@ -50,7 +54,6 @@ func TestAddEvent(t *testing.T) { }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil) output := OutputPlugin{ - region: testRegion, logGroupName: testLogGroup, logStreamPrefix: testLogStreamPrefix, client: mockCloudWatch, @@ -63,7 +66,124 @@ func TestAddEvent(t *testing.T) { "somekey": []byte("some value"), } - output.AddEvent(testTag, record, time.Now()) + retCode := output.AddEvent(testTag, record, time.Now()) + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to FLB_OK") + +} + +// Existing Log Stream that requires 2 API calls to find +func TestAddEventExistingStream(t *testing.T) { + timer, _ := plugins.NewTimeout(func(d time.Duration) { + logrus.Errorf("[firehose] timeout threshold reached: Failed to send logs for %v\n", d) + logrus.Error("[firehose] Quitting Fluent Bit") + os.Exit(1) + }) + + ctrl := gomock.NewController(t) + mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) + + gomock.InOrder( + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") + }).Return(nil, awserr.New(cloudwatchlogs.ErrCodeResourceAlreadyExistsException, "Log Stream already exists", fmt.Errorf("API Error"))), + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamNamePrefix), testLogStreamPrefix+testTag, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{ + LogStreams: []*cloudwatchlogs.LogStream{ + &cloudwatchlogs.LogStream{ + LogStreamName: aws.String("wrong stream"), + }, + }, + NextToken: aws.String(testNextToken), + }, nil), + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamNamePrefix), testLogStreamPrefix+testTag, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.NextToken), testNextToken, "Expected next token to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{ + LogStreams: []*cloudwatchlogs.LogStream{ + &cloudwatchlogs.LogStream{ + LogStreamName: aws.String(testLogStreamPrefix + testTag), + }, + }, + NextToken: aws.String(testNextToken), + }, nil), + ) + + output := OutputPlugin{ + logGroupName: testLogGroup, + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + backoff: plugins.NewBackoff(), + timer: timer, + streams: make(map[string]*logStream), + } + + record := map[interface{}]interface{}{ + "somekey": []byte("some value"), + } + + retCode := output.AddEvent(testTag, record, time.Now()) + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to FLB_OK") + +} + +func TestAddEventExistingStreamNotFound(t *testing.T) { + timer, _ := plugins.NewTimeout(func(d time.Duration) { + logrus.Errorf("[firehose] timeout threshold reached: Failed to send logs for %v\n", d) + logrus.Error("[firehose] Quitting Fluent Bit") + os.Exit(1) + }) + + ctrl := gomock.NewController(t) + mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) + + gomock.InOrder( + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log group name to match") + }).Return(nil, awserr.New(cloudwatchlogs.ErrCodeResourceAlreadyExistsException, "Log Stream already exists", fmt.Errorf("API Error"))), + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamNamePrefix), testLogStreamPrefix+testTag, "Expected log group name to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{ + LogStreams: []*cloudwatchlogs.LogStream{ + &cloudwatchlogs.LogStream{ + LogStreamName: aws.String("wrong stream"), + }, + }, + NextToken: aws.String(testNextToken), + }, nil), + mockCloudWatch.EXPECT().DescribeLogStreams(gomock.Any()).Do(func(input *cloudwatchlogs.DescribeLogStreamsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamNamePrefix), testLogStreamPrefix+testTag, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.NextToken), testNextToken, "Expected next token to match") + }).Return(&cloudwatchlogs.DescribeLogStreamsOutput{ + LogStreams: []*cloudwatchlogs.LogStream{ + &cloudwatchlogs.LogStream{ + LogStreamName: aws.String("another wrong stream"), + }, + }, + }, nil), + ) + + output := OutputPlugin{ + logGroupName: testLogGroup, + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + backoff: plugins.NewBackoff(), + timer: timer, + streams: make(map[string]*logStream), + } + + record := map[interface{}]interface{}{ + "somekey": []byte("some value"), + } + + retCode := output.AddEvent(testTag, record, time.Now()) + assert.Equal(t, retCode, fluentbit.FLB_RETRY, "Expected return code to FLB_RETRY") } @@ -75,7 +195,7 @@ func TestAddEventAndFlush(t *testing.T) { }) ctrl := gomock.NewController(t) - mockCloudWatch := mock_cloudwatch.NewMockCloudWatchLogsClient(ctrl) + mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) gomock.InOrder( mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { @@ -91,7 +211,6 @@ func TestAddEventAndFlush(t *testing.T) { ) output := OutputPlugin{ - region: testRegion, logGroupName: testLogGroup, logStreamPrefix: testLogStreamPrefix, client: mockCloudWatch, @@ -104,7 +223,8 @@ func TestAddEventAndFlush(t *testing.T) { "somekey": []byte("some value"), } - output.AddEvent(testTag, record, time.Now()) + retCode := output.AddEvent(testTag, record, time.Now()) + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to FLB_OK") output.Flush(testTag) } diff --git a/cloudwatch/generate_mock.go b/cloudwatch/generate_mock.go index a1d2614..c35e6b4 100644 --- a/cloudwatch/generate_mock.go +++ b/cloudwatch/generate_mock.go @@ -13,4 +13,4 @@ package cloudwatch -//go:generate mockgen.sh github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch CloudWatchLogsClient mock_cloudwatch/mock.go +//go:generate mockgen.sh github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch LogsClient mock_cloudwatch/mock.go diff --git a/cloudwatch/mock_cloudwatch/mock.go b/cloudwatch/mock_cloudwatch/mock.go index 4c5cfb2..aaef74a 100644 --- a/cloudwatch/mock_cloudwatch/mock.go +++ b/cloudwatch/mock_cloudwatch/mock.go @@ -12,7 +12,7 @@ // permissions and limitations under the License. // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch (interfaces: CloudWatchLogsClient) +// Source: github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch (interfaces: LogsClient) // Package mock_cloudwatch is a generated GoMock package. package mock_cloudwatch @@ -24,31 +24,31 @@ import ( gomock "github.com/golang/mock/gomock" ) -// MockCloudWatchLogsClient is a mock of CloudWatchLogsClient interface -type MockCloudWatchLogsClient struct { +// MockLogsClient is a mock of LogsClient interface +type MockLogsClient struct { ctrl *gomock.Controller - recorder *MockCloudWatchLogsClientMockRecorder + recorder *MockLogsClientMockRecorder } -// MockCloudWatchLogsClientMockRecorder is the mock recorder for MockCloudWatchLogsClient -type MockCloudWatchLogsClientMockRecorder struct { - mock *MockCloudWatchLogsClient +// MockLogsClientMockRecorder is the mock recorder for MockLogsClient +type MockLogsClientMockRecorder struct { + mock *MockLogsClient } -// NewMockCloudWatchLogsClient creates a new mock instance -func NewMockCloudWatchLogsClient(ctrl *gomock.Controller) *MockCloudWatchLogsClient { - mock := &MockCloudWatchLogsClient{ctrl: ctrl} - mock.recorder = &MockCloudWatchLogsClientMockRecorder{mock} +// NewMockLogsClient creates a new mock instance +func NewMockLogsClient(ctrl *gomock.Controller) *MockLogsClient { + mock := &MockLogsClient{ctrl: ctrl} + mock.recorder = &MockLogsClientMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use -func (m *MockCloudWatchLogsClient) EXPECT() *MockCloudWatchLogsClientMockRecorder { +func (m *MockLogsClient) EXPECT() *MockLogsClientMockRecorder { return m.recorder } // CreateLogGroup mocks base method -func (m *MockCloudWatchLogsClient) CreateLogGroup(arg0 *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { +func (m *MockLogsClient) CreateLogGroup(arg0 *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { ret := m.ctrl.Call(m, "CreateLogGroup", arg0) ret0, _ := ret[0].(*cloudwatchlogs.CreateLogGroupOutput) ret1, _ := ret[1].(error) @@ -56,12 +56,12 @@ func (m *MockCloudWatchLogsClient) CreateLogGroup(arg0 *cloudwatchlogs.CreateLog } // CreateLogGroup indicates an expected call of CreateLogGroup -func (mr *MockCloudWatchLogsClientMockRecorder) CreateLogGroup(arg0 interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateLogGroup", reflect.TypeOf((*MockCloudWatchLogsClient)(nil).CreateLogGroup), arg0) +func (mr *MockLogsClientMockRecorder) CreateLogGroup(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateLogGroup", reflect.TypeOf((*MockLogsClient)(nil).CreateLogGroup), arg0) } // CreateLogStream mocks base method -func (m *MockCloudWatchLogsClient) CreateLogStream(arg0 *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { +func (m *MockLogsClient) CreateLogStream(arg0 *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { ret := m.ctrl.Call(m, "CreateLogStream", arg0) ret0, _ := ret[0].(*cloudwatchlogs.CreateLogStreamOutput) ret1, _ := ret[1].(error) @@ -69,12 +69,25 @@ func (m *MockCloudWatchLogsClient) CreateLogStream(arg0 *cloudwatchlogs.CreateLo } // CreateLogStream indicates an expected call of CreateLogStream -func (mr *MockCloudWatchLogsClientMockRecorder) CreateLogStream(arg0 interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateLogStream", reflect.TypeOf((*MockCloudWatchLogsClient)(nil).CreateLogStream), arg0) +func (mr *MockLogsClientMockRecorder) CreateLogStream(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateLogStream", reflect.TypeOf((*MockLogsClient)(nil).CreateLogStream), arg0) +} + +// DescribeLogStreams mocks base method +func (m *MockLogsClient) DescribeLogStreams(arg0 *cloudwatchlogs.DescribeLogStreamsInput) (*cloudwatchlogs.DescribeLogStreamsOutput, error) { + ret := m.ctrl.Call(m, "DescribeLogStreams", arg0) + ret0, _ := ret[0].(*cloudwatchlogs.DescribeLogStreamsOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DescribeLogStreams indicates an expected call of DescribeLogStreams +func (mr *MockLogsClientMockRecorder) DescribeLogStreams(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeLogStreams", reflect.TypeOf((*MockLogsClient)(nil).DescribeLogStreams), arg0) } // PutLogEvents mocks base method -func (m *MockCloudWatchLogsClient) PutLogEvents(arg0 *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { +func (m *MockLogsClient) PutLogEvents(arg0 *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { ret := m.ctrl.Call(m, "PutLogEvents", arg0) ret0, _ := ret[0].(*cloudwatchlogs.PutLogEventsOutput) ret1, _ := ret[1].(error) @@ -82,6 +95,6 @@ func (m *MockCloudWatchLogsClient) PutLogEvents(arg0 *cloudwatchlogs.PutLogEvent } // PutLogEvents indicates an expected call of PutLogEvents -func (mr *MockCloudWatchLogsClientMockRecorder) PutLogEvents(arg0 interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutLogEvents", reflect.TypeOf((*MockCloudWatchLogsClient)(nil).PutLogEvents), arg0) +func (mr *MockLogsClientMockRecorder) PutLogEvents(arg0 interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutLogEvents", reflect.TypeOf((*MockLogsClient)(nil).PutLogEvents), arg0) } diff --git a/fluent-bit-cloudwatch.go b/fluent-bit-cloudwatch.go index 44d693d..0ed32e7 100644 --- a/fluent-bit-cloudwatch.go +++ b/fluent-bit-cloudwatch.go @@ -51,7 +51,7 @@ func getConfiguration(ctx unsafe.Pointer) cloudwatch.OutputPluginConfig { config.RoleARN = output.FLBPluginConfigKey(ctx, "role_arn") logrus.Infof("[cloudwatch] plugin parameter role_arn = '%s'\n", config.RoleARN) config.AutoCreateGroup = getBoolParam(ctx, "auto_create_group", false) - logrus.Infof("[cloudwatch] plugin parameter auto_create_group = '%s'\n", config.AutoCreateGroup) + logrus.Infof("[cloudwatch] plugin parameter auto_create_group = '%v'\n", config.AutoCreateGroup) return config } From f5ee27af09be563984947f890ac446d56eb96e6c Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sat, 15 Jun 2019 23:18:52 -0700 Subject: [PATCH 07/28] Init logger in FLBPluginInit --- fluent-bit-cloudwatch.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fluent-bit-cloudwatch.go b/fluent-bit-cloudwatch.go index 0ed32e7..d233f58 100644 --- a/fluent-bit-cloudwatch.go +++ b/fluent-bit-cloudwatch.go @@ -21,6 +21,7 @@ import ( "time" "github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch" + "github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/plugins" "github.com/fluent/fluent-bit-go/output" "github.com/sirupsen/logrus" @@ -69,6 +70,8 @@ func getBoolParam(ctx unsafe.Pointer, param string, defaultVal bool) bool { //export FLBPluginInit func FLBPluginInit(ctx unsafe.Pointer) int { + plugins.SetupLogger() + config := getConfiguration(ctx) err := config.Validate() if err != nil { From 2a41383c662cfa74ffc66d3dd1a470f66297fb60 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sat, 15 Jun 2019 23:42:44 -0700 Subject: [PATCH 08/28] bugfix: plugin does not error when trying to create an existing log group --- cloudwatch/cloudwatch.go | 9 ++++++++- fluent-bit-cloudwatch.go | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index a11aa2f..150b14e 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -116,7 +116,14 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) { if config.AutoCreateGroup { err = createLogGroup(config.LogGroupName, client) if err != nil { - return nil, err + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() != cloudwatchlogs.ErrCodeResourceAlreadyExistsException { + return nil, err + } + logrus.Info("Log group %s already exists", config.LogGroupName) + } else { + return nil, err + } } } diff --git a/fluent-bit-cloudwatch.go b/fluent-bit-cloudwatch.go index d233f58..e3ae4bc 100644 --- a/fluent-bit-cloudwatch.go +++ b/fluent-bit-cloudwatch.go @@ -46,7 +46,7 @@ func getConfiguration(ctx unsafe.Pointer) cloudwatch.OutputPluginConfig { config.LogStreamName = output.FLBPluginConfigKey(ctx, "log_stream_name") logrus.Infof("[cloudwatch] plugin parameter log_stream = '%s'\n", config.LogStreamName) config.Region = output.FLBPluginConfigKey(ctx, "region") - logrus.Infof("[cloudwatch] plugin parameter = '%s'\n", config.Region) + logrus.Infof("[cloudwatch] plugin parameter region = '%s'\n", config.Region) config.LogKey = output.FLBPluginConfigKey(ctx, "log_key") logrus.Infof("[cloudwatch] plugin parameter log_key = '%s'\n", config.LogKey) config.RoleARN = output.FLBPluginConfigKey(ctx, "role_arn") From 6fa0682c1b82a23fcb64216c652d06a6362cc5fd Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sat, 15 Jun 2019 23:56:40 -0700 Subject: [PATCH 09/28] fix log statements --- cloudwatch/cloudwatch.go | 3 ++- fluent-bit-cloudwatch.go | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 150b14e..4df4147 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -120,7 +120,7 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) { if awsErr.Code() != cloudwatchlogs.ErrCodeResourceAlreadyExistsException { return nil, err } - logrus.Info("Log group %s already exists", config.LogGroupName) + logrus.Infof("[cloudwatch] Log group %s already exists\n", config.LogGroupName) } else { return nil, err } @@ -266,6 +266,7 @@ func (output *OutputPlugin) createStream(tag string) (*logStream, error) { } output.streams[tag] = stream + logrus.Debugf("[cloudwatch] Created log stream %s", name) return stream, nil } diff --git a/fluent-bit-cloudwatch.go b/fluent-bit-cloudwatch.go index e3ae4bc..7d0d33e 100644 --- a/fluent-bit-cloudwatch.go +++ b/fluent-bit-cloudwatch.go @@ -98,7 +98,7 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { dec := output.NewDecoder(data, int(length)) fluentTag := C.GoString(tag) - logrus.Debugf("[firehose] Found logs with tag: %s\n", fluentTag) + logrus.Debugf("[cloudwatch] Found logs with tag: %s\n", fluentTag) for { // Extract Record @@ -132,7 +132,7 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { return output.FLB_RETRY } - logrus.Debugf("[firehose] Processed %d events with tag %s\n", count, fluentTag) + logrus.Debugf("[cloudwatch] Processed %d events with tag %s\n", count, fluentTag) // Return options: // From c0fa73abf520dca56bac8a669e44c88129208e2f Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 16 Jun 2019 00:06:50 -0700 Subject: [PATCH 10/28] Log events in a single PutLogEvents request must be in chronological order. --- cloudwatch/cloudwatch.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 4df4147..5c64324 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -16,6 +16,7 @@ package cloudwatch import ( "fmt" "os" + "sort" "strings" "time" "unicode/utf8" @@ -348,6 +349,10 @@ func (output *OutputPlugin) Flush(tag string) error { } func (output *OutputPlugin) putLogEvents(stream *logStream) error { + // Log events in a single PutLogEvents request must be in chronological order. + sort.Slice(stream.logEvents, func(i, j int) bool { + return aws.Int64Value(stream.logEvents[i].Timestamp) < aws.Int64Value(stream.logEvents[j].Timestamp) + }) response, err := output.client.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{ LogEvents: stream.logEvents, LogGroupName: aws.String(output.logGroupName), From b5c424e181fb7eeb4165ecf064e97fc899f4b9e6 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 16 Jun 2019 15:37:07 -0700 Subject: [PATCH 11/28] CloudWatch uses milliseconds since epoch for time stamps --- cloudwatch/cloudwatch.go | 3 ++- fluent-bit-cloudwatch.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 5c64324..e5602e6 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -180,7 +180,7 @@ func (output *OutputPlugin) AddEvent(tag string, record map[interface{}]interfac stream.logEvents = append(stream.logEvents, &cloudwatchlogs.InputLogEvent{ Message: aws.String(event), - Timestamp: aws.Int64(timestamp.Unix()), + Timestamp: aws.Int64(timestamp.UnixNano() / 1e6), // CloudWatch uses milliseconds since epoch }) stream.currentByteLength += cloudwatchLen(event) return fluentbit.FLB_OK @@ -359,6 +359,7 @@ func (output *OutputPlugin) putLogEvents(stream *logStream) error { LogStreamName: aws.String(stream.logStreamName), SequenceToken: stream.nextSequenceToken, }) + logrus.Debug(*response) if err != nil { return err } diff --git a/fluent-bit-cloudwatch.go b/fluent-bit-cloudwatch.go index 7d0d33e..c34cef7 100644 --- a/fluent-bit-cloudwatch.go +++ b/fluent-bit-cloudwatch.go @@ -112,7 +112,7 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { case output.FLBTime: timestamp = tts.Time case uint64: - // From our observation, when ts is of type uint64 it appears to + // when ts is of type uint64 it appears to // be the amount of seconds since unix epoch. timestamp = time.Unix(int64(tts), 0) default: From 7daeed4924f23fb167e6c08c535c19e741d68cfb Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 16 Jun 2019 16:40:23 -0700 Subject: [PATCH 12/28] added timeout --- cloudwatch/cloudwatch.go | 25 +++++++++++++++++++++++-- plugins/plugins.go | 1 + 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index e5602e6..53e2d50 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -126,6 +126,7 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) { return nil, err } } + logrus.Infof("Created log group %s\n", config.LogGroupName) } return &OutputPlugin{ @@ -134,7 +135,6 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) { logStreamName: config.LogStreamName, logKey: config.LogKey, client: client, - backoff: plugins.NewBackoff(), timer: timer, streams: make(map[string]*logStream), }, nil @@ -216,8 +216,10 @@ func (output *OutputPlugin) existingLogStream(tag string, nextToken *string) (*l }) if err != nil { + output.timer.Start() return nil, err } + output.timer.Check() for _, result := range resp.LogStreams { if aws.StringValue(result.LogStreamName) == name { @@ -257,8 +259,10 @@ func (output *OutputPlugin) createStream(tag string) (*logStream, error) { }) if err != nil { + output.timer.Start() return nil, err } + output.timer.Reset() stream := &logStream{ logStreamName: name, @@ -349,6 +353,8 @@ func (output *OutputPlugin) Flush(tag string) error { } func (output *OutputPlugin) putLogEvents(stream *logStream) error { + output.timer.Check() + // Log events in a single PutLogEvents request must be in chronological order. sort.Slice(stream.logEvents, func(i, j int) bool { return aws.Int64Value(stream.logEvents[i].Timestamp) < aws.Int64Value(stream.logEvents[j].Timestamp) @@ -359,10 +365,11 @@ func (output *OutputPlugin) putLogEvents(stream *logStream) error { LogStreamName: aws.String(stream.logStreamName), SequenceToken: stream.nextSequenceToken, }) - logrus.Debug(*response) if err != nil { + output.timer.Start() return err } + output.timer.Reset() logrus.Debugf("Sent %d events to CloudWatch\n", len(stream.logEvents)) stream.nextSequenceToken = response.NextSequenceToken @@ -372,6 +379,20 @@ func (output *OutputPlugin) putLogEvents(stream *logStream) error { return nil } +func processRejectedEventsInfo(response *cloudwatchlogs.PutLogEventsOutput) { + if response.RejectedLogEventsInfo != nil { + if response.RejectedLogEventsInfo.ExpiredLogEventEndIndex != nil { + logrus.Warnf("[cloudwatch] %d log events were marked as expired by CloudWatch\n", aws.Int64Value(response.RejectedLogEventsInfo.ExpiredLogEventEndIndex)) + } + if response.RejectedLogEventsInfo.TooNewLogEventStartIndex != nil { + logrus.Warnf("[cloudwatch] %d log events were marked as too new by CloudWatch\n", aws.Int64Value(response.RejectedLogEventsInfo.TooNewLogEventStartIndex)) + } + if response.RejectedLogEventsInfo.TooOldLogEventEndIndex != nil { + logrus.Warnf("[cloudwatch] %d log events were marked as too old by CloudWatch\n", aws.Int64Value(response.RejectedLogEventsInfo.TooOldLogEventEndIndex)) + } + } +} + // effectiveLen counts the effective number of bytes in the string, after // UTF-8 normalization. UTF-8 normalization includes replacing bytes that do // not constitute valid UTF-8 encoded Unicode codepoints with the Unicode diff --git a/plugins/plugins.go b/plugins/plugins.go index 18ba24d..406dfe0 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -119,6 +119,7 @@ func NewTimeout(timeoutFunc func(duration time.Duration)) (*Timeout, error) { if err != nil { return nil, err } + logrus.Debugf("Configuring timer with timeout of %d", duration) return &Timeout{ timeoutFunc: timeoutFunc, duration: duration, From c67097425c9d3b665a899313b2265c0a6bb0fb40 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 16 Jun 2019 16:54:05 -0700 Subject: [PATCH 13/28] Debugging timeout --- cloudwatch/cloudwatch.go | 6 ++++-- plugins/plugins.go | 3 +++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 53e2d50..92399c8 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -105,7 +105,7 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) { client := newCloudWatchLogsClient(config.RoleARN, sess) timer, err := plugins.NewTimeout(func(d time.Duration) { - logrus.Errorf("[cloudwatch] timeout threshold reached: Failed to send logs for %v\n", d) + logrus.Errorf("[cloudwatch] timeout threshold reached: Failed to send logs for %s\n", d.String()) logrus.Error("[cloudwatch] Quitting Fluent Bit") os.Exit(1) }) @@ -208,6 +208,7 @@ func (output *OutputPlugin) getLogStream(tag string) (*logStream, error) { } func (output *OutputPlugin) existingLogStream(tag string, nextToken *string) (*logStream, error) { + output.timer.Check() name := output.getStreamName(tag) resp, err := output.client.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{ LogGroupName: aws.String(output.logGroupName), @@ -219,7 +220,7 @@ func (output *OutputPlugin) existingLogStream(tag string, nextToken *string) (*l output.timer.Start() return nil, err } - output.timer.Check() + output.timer.Reset() for _, result := range resp.LogStreams { if aws.StringValue(result.LogStreamName) == name { @@ -252,6 +253,7 @@ func (output *OutputPlugin) getStreamName(tag string) string { } func (output *OutputPlugin) createStream(tag string) (*logStream, error) { + output.timer.Check() name := output.getStreamName(tag) _, err := output.client.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{ LogGroupName: aws.String(output.logGroupName), diff --git a/plugins/plugins.go b/plugins/plugins.go index 406dfe0..71ad311 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -90,6 +90,7 @@ type Timeout struct { // this method has no effect if the timer has already been started func (t *Timeout) Start() { if t.enabled && !t.ticking { + logrus.Debug("Starting timeout") t.ticking = true t.stopTime = time.Now().Add(t.duration) } @@ -102,6 +103,8 @@ func (t *Timeout) Reset() { // Check the timer to see if its timed out func (t *Timeout) Check() { + logrus.Debug("Checking timeout") + logrus.Debugf("%s left", t.stopTime.Sub(time.Now()).String()) if t.enabled && t.ticking { if t.stopTime.Before(time.Now()) { // run the timeout function From 9024c31a14d52a557207b311543f57a202bf395f Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 16 Jun 2019 19:56:28 -0700 Subject: [PATCH 14/28] temp changes for plugins (will be removed once firehose PR is merged --- plugins/plugins.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/plugins.go b/plugins/plugins.go index 71ad311..1471d7a 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -117,8 +117,8 @@ func (t *Timeout) Check() { // with a duration set from the env var // if the env var is not set, then a timer is returned that is disabled (it doesn't do anything) func NewTimeout(timeoutFunc func(duration time.Duration)) (*Timeout, error) { - if os.Getenv(sendFailureTimeoutEnvVar) != "" { - duration, err := time.ParseDuration(os.Getenv(sendFailureTimeoutEnvVar)) + if timeout := os.Getenv(sendFailureTimeoutEnvVar); timeout != "" { + duration, err := time.ParseDuration(timeout) if err != nil { return nil, err } From 1539acd4eed36c9d5cf8f60d7e3412b038efd6ce Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 18 Jun 2019 23:12:31 -0700 Subject: [PATCH 15/28] Add the ability to configure endpoint --- cloudwatch/cloudwatch.go | 25 ++++++++++++++++++++----- fluent-bit-cloudwatch.go | 2 ++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 92399c8..f6d9ab3 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -24,6 +24,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials/stscreds" + "github.com/aws/aws-sdk-go/aws/endpoints" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/plugins" @@ -75,6 +76,7 @@ type OutputPluginConfig struct { LogKey string RoleARN string AutoCreateGroup bool + CWEndpoint string } // Validate checks the configuration input for an OutputPlugin instances @@ -102,7 +104,7 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) { return nil, err } - client := newCloudWatchLogsClient(config.RoleARN, sess) + client := newCloudWatchLogsClient(config.RoleARN, sess, config.CWEndpoint) timer, err := plugins.NewTimeout(func(d time.Duration) { logrus.Errorf("[cloudwatch] timeout threshold reached: Failed to send logs for %s\n", d.String()) @@ -140,7 +142,20 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) { }, nil } -func newCloudWatchLogsClient(roleARN string, sess *session.Session) *cloudwatchlogs.CloudWatchLogs { +func newCloudWatchLogsClient(roleARN string, sess *session.Session, endpoint string) *cloudwatchlogs.CloudWatchLogs { + svcConfig := aws.Config{} + if endpoint != "" { + defaultResolver := endpoints.DefaultResolver() + cwCustomResolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) { + if service == "logs" { + return endpoints.ResolvedEndpoint{ + URL: endpoint, + }, nil + } + return defaultResolver.EndpointFor(service, region, optFns...) + } + svcConfig.EndpointResolver = endpoints.ResolverFunc(cwCustomResolverFn) + } if roleARN != "" { creds := stscreds.NewCredentials(sess, roleARN) return cloudwatchlogs.New(sess, &aws.Config{Credentials: creds}) @@ -236,11 +251,11 @@ func (output *OutputPlugin) existingLogStream(tag string, nextToken *string) (*l } } - if resp.NextToken != nil { - return output.existingLogStream(tag, resp.NextToken) + if resp.NextToken == nil { + return nil, fmt.Errorf("error: does not compute: Log Stream %s could not be created, but also could not be found in the log group", name) } - return nil, fmt.Errorf("error: does not compute: Log Stream %s could not be created, but also could not be found in the log group", name) + return output.existingLogStream(tag, resp.NextToken) } func (output *OutputPlugin) getStreamName(tag string) string { diff --git a/fluent-bit-cloudwatch.go b/fluent-bit-cloudwatch.go index c34cef7..3e5a8d8 100644 --- a/fluent-bit-cloudwatch.go +++ b/fluent-bit-cloudwatch.go @@ -53,6 +53,8 @@ func getConfiguration(ctx unsafe.Pointer) cloudwatch.OutputPluginConfig { logrus.Infof("[cloudwatch] plugin parameter role_arn = '%s'\n", config.RoleARN) config.AutoCreateGroup = getBoolParam(ctx, "auto_create_group", false) logrus.Infof("[cloudwatch] plugin parameter auto_create_group = '%v'\n", config.AutoCreateGroup) + config.CWEndpoint = output.FLBPluginConfigKey(ctx, "endpoint") + logrus.Infof("[cloudwatch] plugin parameter endpoint = '%s'\n", config.CWEndpoint) return config } From 43db402b5f23839f3dc9aecc5758d209fddef568 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 18 Jun 2019 23:32:37 -0700 Subject: [PATCH 16/28] Convert recursion to iteration --- cloudwatch/cloudwatch.go | 57 ++++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index f6d9ab3..032b261 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -211,7 +211,7 @@ func (output *OutputPlugin) getLogStream(tag string) (*logStream, error) { if awsErr, ok := err.(awserr.Error); ok { if awsErr.Code() == cloudwatchlogs.ErrCodeResourceAlreadyExistsException { // existing stream - return output.existingLogStream(tag, nil) + return output.existingLogStream(tag) } } } @@ -222,9 +222,40 @@ func (output *OutputPlugin) getLogStream(tag string) (*logStream, error) { return stream, nil } -func (output *OutputPlugin) existingLogStream(tag string, nextToken *string) (*logStream, error) { - output.timer.Check() +func (output *OutputPlugin) existingLogStream(tag string) (*logStream, error) { + var nextToken *string + var stream *logStream name := output.getStreamName(tag) + + for stream == nil { + resp, err := output.describeLogStreams(name, nextToken) + if err != nil { + return nil, err + } + + for _, result := range resp.LogStreams { + if aws.StringValue(result.LogStreamName) == name { + stream = &logStream{ + logStreamName: name, + logEvents: make([]*cloudwatchlogs.InputLogEvent, 0, maximumLogEventsPerPut), + nextSequenceToken: result.UploadSequenceToken, + } + + output.streams[tag] = stream + } + } + + if resp.NextToken == nil { + return nil, fmt.Errorf("error: does not compute: Log Stream %s could not be created, but also could not be found in the log group", name) + } + + nextToken = resp.NextToken + } + return stream, nil +} + +func (output *OutputPlugin) describeLogStreams(name string, nextToken *string) (*cloudwatchlogs.DescribeLogStreamsOutput, error) { + output.timer.Check() resp, err := output.client.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{ LogGroupName: aws.String(output.logGroupName), LogStreamNamePrefix: aws.String(name), @@ -237,25 +268,7 @@ func (output *OutputPlugin) existingLogStream(tag string, nextToken *string) (*l } output.timer.Reset() - for _, result := range resp.LogStreams { - if aws.StringValue(result.LogStreamName) == name { - stream := &logStream{ - logStreamName: name, - logEvents: make([]*cloudwatchlogs.InputLogEvent, 0, maximumLogEventsPerPut), - nextSequenceToken: result.UploadSequenceToken, - } - - output.streams[tag] = stream - - return stream, nil - } - } - - if resp.NextToken == nil { - return nil, fmt.Errorf("error: does not compute: Log Stream %s could not be created, but also could not be found in the log group", name) - } - - return output.existingLogStream(tag, resp.NextToken) + return resp, err } func (output *OutputPlugin) getStreamName(tag string) string { From 3ee46f84fdb0f871ded308a9e207b72ecebf71b4 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Wed, 19 Jun 2019 00:01:51 -0700 Subject: [PATCH 17/28] Add cases for DataAlreadyAcceptedException and InvalidSequenceTokenException --- cloudwatch/cloudwatch.go | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 032b261..651859d 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -396,8 +396,28 @@ func (output *OutputPlugin) putLogEvents(stream *logStream) error { SequenceToken: stream.nextSequenceToken, }) if err != nil { - output.timer.Start() - return err + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == cloudwatchlogs.ErrCodeDataAlreadyAcceptedException { + // already submitted, just grab the correct sequence token + parts := strings.Split(awsErr.Message(), " ") + stream.nextSequenceToken = &parts[len(parts)-1] + stream.logEvents = stream.logEvents[:0] + stream.currentByteLength = 0 + logrus.Infof("[cloudwatch] Encountered error %v; data already accepted, ignoring error\n", awsErr) + return nil + } else if awsErr.Code() == cloudwatchlogs.ErrCodeInvalidSequenceTokenException { + // sequence code is bad, grab the correct one and retry + parts := strings.Split(awsErr.Message(), " ") + stream.nextSequenceToken = &parts[len(parts)-1] + + return output.putLogEvents(stream) + } else { + output.timer.Start() + return err + } + } else { + return err + } } output.timer.Reset() logrus.Debugf("Sent %d events to CloudWatch\n", len(stream.logEvents)) From e83906734e28af5fb4490eeb5bba200b725a76c8 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Wed, 19 Jun 2019 00:23:38 -0700 Subject: [PATCH 18/28] Add test cases for DataAlreadyAcceptedException and InvalidSequenceTokenException --- cloudwatch/cloudwatch_test.go | 90 ++++++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 1 deletion(-) diff --git a/cloudwatch/cloudwatch_test.go b/cloudwatch/cloudwatch_test.go index 9d57f4b..cd9e9cd 100644 --- a/cloudwatch/cloudwatch_test.go +++ b/cloudwatch/cloudwatch_test.go @@ -35,7 +35,8 @@ const ( testLogGroup = "my-logs" testLogStreamPrefix = "my-prefix" testTag = "tag" - testNextToken = "next token" + testNextToken = "next-token" + testSequenceToken = "sequence-token" ) func TestAddEvent(t *testing.T) { @@ -228,3 +229,90 @@ func TestAddEventAndFlush(t *testing.T) { output.Flush(testTag) } + +func TestAddEventAndFlushDataAlreadyAcceptedException(t *testing.T) { + timer, _ := plugins.NewTimeout(func(d time.Duration) { + logrus.Errorf("[firehose] timeout threshold reached: Failed to send logs for %v\n", d) + logrus.Error("[firehose] Quitting Fluent Bit") + os.Exit(1) + }) + + ctrl := gomock.NewController(t) + mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) + + gomock.InOrder( + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") + }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil), + mockCloudWatch.EXPECT().PutLogEvents(gomock.Any()).Do(func(input *cloudwatchlogs.PutLogEventsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") + }).Return(nil, awserr.New(cloudwatchlogs.ErrCodeDataAlreadyAcceptedException, "Data already accepted; The next expected sequenceToken is: "+testSequenceToken, fmt.Errorf("API Error"))), + ) + + output := OutputPlugin{ + logGroupName: testLogGroup, + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + backoff: plugins.NewBackoff(), + timer: timer, + streams: make(map[string]*logStream), + } + + record := map[interface{}]interface{}{ + "somekey": []byte("some value"), + } + + retCode := output.AddEvent(testTag, record, time.Now()) + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to FLB_OK") + output.Flush(testTag) + +} + +func TestAddEventAndFlushDataInvalidSequenceTokenException(t *testing.T) { + timer, _ := plugins.NewTimeout(func(d time.Duration) { + logrus.Errorf("[firehose] timeout threshold reached: Failed to send logs for %v\n", d) + logrus.Error("[firehose] Quitting Fluent Bit") + os.Exit(1) + }) + + ctrl := gomock.NewController(t) + mockCloudWatch := mock_cloudwatch.NewMockLogsClient(ctrl) + + gomock.InOrder( + mockCloudWatch.EXPECT().CreateLogStream(gomock.Any()).Do(func(input *cloudwatchlogs.CreateLogStreamInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") + }).Return(&cloudwatchlogs.CreateLogStreamOutput{}, nil), + mockCloudWatch.EXPECT().PutLogEvents(gomock.Any()).Do(func(input *cloudwatchlogs.PutLogEventsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") + }).Return(nil, awserr.New(cloudwatchlogs.ErrCodeInvalidSequenceTokenException, "The given sequenceToken is invalid; The next expected sequenceToken is: "+testSequenceToken, fmt.Errorf("API Error"))), + mockCloudWatch.EXPECT().PutLogEvents(gomock.Any()).Do(func(input *cloudwatchlogs.PutLogEventsInput) { + assert.Equal(t, aws.StringValue(input.LogGroupName), testLogGroup, "Expected log group name to match") + assert.Equal(t, aws.StringValue(input.LogStreamName), testLogStreamPrefix+testTag, "Expected log stream name to match") + assert.Equal(t, aws.StringValue(input.SequenceToken), testSequenceToken, "Expected sequence token to match response from previous error") + }).Return(&cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String("token"), + }, nil), + ) + + output := OutputPlugin{ + logGroupName: testLogGroup, + logStreamPrefix: testLogStreamPrefix, + client: mockCloudWatch, + backoff: plugins.NewBackoff(), + timer: timer, + streams: make(map[string]*logStream), + } + + record := map[interface{}]interface{}{ + "somekey": []byte("some value"), + } + + retCode := output.AddEvent(testTag, record, time.Now()) + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to FLB_OK") + output.Flush(testTag) + +} From 0776d3378f6abde115e1151c786e59be26dab7d7 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Wed, 19 Jun 2019 00:26:28 -0700 Subject: [PATCH 19/28] Address some of @hencrice comments --- Makefile | 2 +- cloudwatch/cloudwatch.go | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index bb19585..23af816 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,7 @@ build: $(PLUGIN_BINARY) $(PLUGIN_BINARY): $(SOURCES) PATH=${PATH} golint ./cloudwatch mkdir -p ./bin - go build -buildmode c-shared -o ./bin/cloudwatch.so ./ + go build -buildmode c-shared -o $(PLUGIN_BINARY) ./ @echo "Built Amazon CloudWatch Logs Fluent Bit Plugin" .PHONY: generate diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 651859d..1684c21 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -15,7 +15,6 @@ package cloudwatch import ( "fmt" - "os" "sort" "strings" "time" @@ -108,8 +107,7 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) { timer, err := plugins.NewTimeout(func(d time.Duration) { logrus.Errorf("[cloudwatch] timeout threshold reached: Failed to send logs for %s\n", d.String()) - logrus.Error("[cloudwatch] Quitting Fluent Bit") - os.Exit(1) + logrus.Fatal("[cloudwatch] Quitting Fluent Bit") // exit the plugin and kill Fluent Bit }) if err != nil { From 249f6fb0056767080f8c373ac35172e63596ed93 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Wed, 19 Jun 2019 16:49:43 -0700 Subject: [PATCH 20/28] Added periodic clean up for log stream buffers, to prevent long term build up of memory used by the plugin --- cloudwatch/cloudwatch.go | 41 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 1684c21..bfd1b2b 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -39,6 +39,13 @@ const ( maximumLogEventsPerPut = 10000 ) +const ( + // Log stream objects that are empty and inactive for longer than the timeout get cleaned up + logStreamInactivityTimeout = time.Hour + // Check for expired log streams every 10 minutes + logStreamInactivityCheckInterval = 10 +) + // LogsClient contains the CloudWatch API calls used by this plugin type LogsClient interface { CreateLogGroup(input *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) @@ -52,6 +59,18 @@ type logStream struct { currentByteLength int nextSequenceToken *string logStreamName string + expiration time.Time +} + +func (stream *logStream) isExpired() bool { + if len(stream.logEvents) == 0 && stream.expiration.Before(time.Now()) { + return true + } + return false +} + +func (stream *logStream) updateExpiration() { + stream.expiration = time.Now().Add(logStreamInactivityTimeout) } // OutputPlugin is the CloudWatch Logs Fluent Bit output plugin @@ -199,6 +218,22 @@ func (output *OutputPlugin) AddEvent(tag string, record map[interface{}]interfac return fluentbit.FLB_OK } +// This plugin tracks CW Log streams +// We need to periodically delete any streams that haven't been written to in a while +// Because each stream incurs some memory for its buffer of log events +// (Which would be empty for an unused stream) +func (output *OutputPlugin) cleanUpExpiredLogStreams() { + if (time.Now().Minute() % logStreamInactivityCheckInterval) == 0 { + logrus.Debug("[cloudwatch] Checking for expired log streams") + for tag, stream := range output.streams { + if stream.isExpired() { + logrus.Debugf("[cloudwatch] Removing internal buffer for log stream %s; the stream has not been written to for %s", stream.logStreamName, logStreamInactivityTimeout.String()) + delete(output.streams, tag) + } + } + } +} + func (output *OutputPlugin) getLogStream(tag string) (*logStream, error) { // find log stream by tag stream, ok := output.streams[tag] @@ -213,7 +248,6 @@ func (output *OutputPlugin) getLogStream(tag string) (*logStream, error) { } } } - return stream, err } @@ -240,6 +274,7 @@ func (output *OutputPlugin) existingLogStream(tag string) (*logStream, error) { } output.streams[tag] = stream + stream.updateExpiration() // initialize } } @@ -299,6 +334,7 @@ func (output *OutputPlugin) createStream(tag string) (*logStream, error) { } output.streams[tag] = stream + stream.updateExpiration() // initialize logrus.Debugf("[cloudwatch] Created log stream %s", name) return stream, nil @@ -373,6 +409,8 @@ func logKey(record map[interface{}]interface{}, logKey string) (*interface{}, er // Flush sends the current buffer of records (for the stream that corresponds with the given tag) func (output *OutputPlugin) Flush(tag string) error { + output.cleanUpExpiredLogStreams() // will periodically clean up, otherwise is no-op + stream, err := output.getLogStream(tag) if err != nil { return err @@ -382,6 +420,7 @@ func (output *OutputPlugin) Flush(tag string) error { func (output *OutputPlugin) putLogEvents(stream *logStream) error { output.timer.Check() + stream.updateExpiration() // Log events in a single PutLogEvents request must be in chronological order. sort.Slice(stream.logEvents, func(i, j int) bool { From cb26d5f0d3e051cb0c8d0b3c881e5436ce402bf4 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Wed, 19 Jun 2019 16:58:33 -0700 Subject: [PATCH 21/28] Add more debug statements --- cloudwatch/cloudwatch.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index bfd1b2b..3b0d566 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -272,7 +272,7 @@ func (output *OutputPlugin) existingLogStream(tag string) (*logStream, error) { logEvents: make([]*cloudwatchlogs.InputLogEvent, 0, maximumLogEventsPerPut), nextSequenceToken: result.UploadSequenceToken, } - + logrus.Debugf("[cloudwatch] Initializing internal buffer for exising log stream %s\n", name) output.streams[tag] = stream stream.updateExpiration() // initialize } @@ -332,7 +332,7 @@ func (output *OutputPlugin) createStream(tag string) (*logStream, error) { logEvents: make([]*cloudwatchlogs.InputLogEvent, 0, maximumLogEventsPerPut), nextSequenceToken: nil, // sequence token not required for a new log stream } - + logrus.Debugf("[cloudwatch] Created new log stream %s\n", name) output.streams[tag] = stream stream.updateExpiration() // initialize logrus.Debugf("[cloudwatch] Created log stream %s", name) From f6f5db4192c60af6091f8690e6f8f19b3bfe2e4a Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sat, 22 Jun 2019 13:52:33 -0700 Subject: [PATCH 22/28] Bugfix: set custom endpoint in all cases --- cloudwatch/cloudwatch.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 3b0d566..8fa7630 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -160,7 +160,7 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) { } func newCloudWatchLogsClient(roleARN string, sess *session.Session, endpoint string) *cloudwatchlogs.CloudWatchLogs { - svcConfig := aws.Config{} + svcConfig := &aws.Config{} if endpoint != "" { defaultResolver := endpoints.DefaultResolver() cwCustomResolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) { @@ -175,10 +175,11 @@ func newCloudWatchLogsClient(roleARN string, sess *session.Session, endpoint str } if roleARN != "" { creds := stscreds.NewCredentials(sess, roleARN) - return cloudwatchlogs.New(sess, &aws.Config{Credentials: creds}) + svcConfig.Credentials = creds + return cloudwatchlogs.New(sess, svcConfig) } - return cloudwatchlogs.New(sess) + return cloudwatchlogs.New(sess, svcConfig) } // AddEvent accepts a record and adds it to the buffer for its stream, flushing the buffer if it is full From f65552f91f0f68f8a60e1638b18dcba8a2372011 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 23 Jun 2019 19:56:23 -0700 Subject: [PATCH 23/28] Project rename github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit => github.com/aws/amazon-cloudwatch-logs-for-fluent-bit --- cloudwatch/cloudwatch.go | 3 +-- cloudwatch/cloudwatch_test.go | 4 ++-- cloudwatch/generate_mock.go | 2 +- fluent-bit-cloudwatch.go | 4 ++-- go.mod | 8 +++----- go.sum | 14 ++++++++------ 6 files changed, 17 insertions(+), 18 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 8fa7630..9c8e554 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -20,13 +20,13 @@ import ( "time" "unicode/utf8" + "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/plugins" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials/stscreds" "github.com/aws/aws-sdk-go/aws/endpoints" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" - "github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/plugins" fluentbit "github.com/fluent/fluent-bit-go/output" jsoniter "github.com/json-iterator/go" "github.com/sirupsen/logrus" @@ -176,7 +176,6 @@ func newCloudWatchLogsClient(roleARN string, sess *session.Session, endpoint str if roleARN != "" { creds := stscreds.NewCredentials(sess, roleARN) svcConfig.Credentials = creds - return cloudwatchlogs.New(sess, svcConfig) } return cloudwatchlogs.New(sess, svcConfig) diff --git a/cloudwatch/cloudwatch_test.go b/cloudwatch/cloudwatch_test.go index cd9e9cd..a230f40 100644 --- a/cloudwatch/cloudwatch_test.go +++ b/cloudwatch/cloudwatch_test.go @@ -19,11 +19,11 @@ import ( "testing" "time" + "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch/mock_cloudwatch" + "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/plugins" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" - "github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch/mock_cloudwatch" - "github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/plugins" fluentbit "github.com/fluent/fluent-bit-go/output" "github.com/golang/mock/gomock" "github.com/sirupsen/logrus" diff --git a/cloudwatch/generate_mock.go b/cloudwatch/generate_mock.go index c35e6b4..f469403 100644 --- a/cloudwatch/generate_mock.go +++ b/cloudwatch/generate_mock.go @@ -13,4 +13,4 @@ package cloudwatch -//go:generate mockgen.sh github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch LogsClient mock_cloudwatch/mock.go +//go:generate mockgen.sh github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch LogsClient mock_cloudwatch/mock.go diff --git a/fluent-bit-cloudwatch.go b/fluent-bit-cloudwatch.go index 3e5a8d8..fbac1de 100644 --- a/fluent-bit-cloudwatch.go +++ b/fluent-bit-cloudwatch.go @@ -20,8 +20,8 @@ import ( "time" - "github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch" - "github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit/plugins" + "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch" + "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/plugins" "github.com/fluent/fluent-bit-go/output" "github.com/sirupsen/logrus" diff --git a/go.mod b/go.mod index b718389..b96cc27 100644 --- a/go.mod +++ b/go.mod @@ -1,15 +1,13 @@ -module github.com/awslabs/amazon-cloudwatch-logs-for-fluent-bit +module github.com/aws/amazon-cloudwatch-logs-for-fluent-bit go 1.12 require ( - github.com/aws/aws-sdk-go v1.19.45 + github.com/aws/aws-sdk-go v1.20.6 github.com/cenkalti/backoff v2.1.1+incompatible - github.com/fluent/fluent-bit-go v0.0.0-20190521122216-fc386d263885 - github.com/golang/mock v1.3.1 + github.com/fluent/fluent-bit-go v0.0.0-20190614024040-c017a8579953 github.com/json-iterator/go v1.1.6 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect github.com/sirupsen/logrus v1.4.2 - github.com/stretchr/testify v1.2.2 ) diff --git a/go.sum b/go.sum index e545070..2c98350 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,12 @@ -github.com/aws/aws-sdk-go v1.19.45 h1:jAxmC8qqa7mW531FDgM8Ahbqlb3zmiHgTpJU6fY3vJ0= -github.com/aws/aws-sdk-go v1.19.45/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= -github.com/awslabs/amazon-kinesis-firehose-for-fluent-bit v0.0.0-20190430230011-6413ba698e77 h1:QODGNn1Mdwo6EPZc/KJ+0WnHxTbgiTuOX3kTDJC0q1Q= +github.com/aws/aws-sdk-go v1.20.6 h1:kmy4Gvdlyez1fV4kw5RYxZzWKVyuHZHgPWeU/YvRsV4= +github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/cenkalti/backoff v2.1.1+incompatible h1:tKJnvO2kl0zmb/jA5UKAt4VoEVw1qxKWjE/Bpp46npY= github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/fluent/fluent-bit-go v0.0.0-20190521122216-fc386d263885 h1:U+Vvh2frbsjyNGQGFOY8PFqnnmeqrCGL6q2TMysHHH4= -github.com/fluent/fluent-bit-go v0.0.0-20190521122216-fc386d263885/go.mod h1:WQX+afhrekY9rGK+WT4xvKSlzmia9gDoLYu4GGYGASQ= +github.com/fluent/fluent-bit-go v0.0.0-20190614024040-c017a8579953 h1:dDGtm4HU/xEd2vkhzkJimQ0tPoQ3AKo7cr6vnx+qg5c= +github.com/fluent/fluent-bit-go v0.0.0-20190614024040-c017a8579953/go.mod h1:WQX+afhrekY9rGK+WT4xvKSlzmia9gDoLYu4GGYGASQ= github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= @@ -22,9 +22,11 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/ugorji/go v1.1.4 h1:j4s+tAvLfL3bZyefP2SEWmhBzmuIlH/eqNuPdFPgngw= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= From fb639eda5922bc1f993cc14969f8dfcf93079631 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 23 Jun 2019 20:06:50 -0700 Subject: [PATCH 24/28] Import shared plugins library from Kinesis Firehose plugin --- cloudwatch/cloudwatch.go | 2 +- cloudwatch/cloudwatch_test.go | 2 +- fluent-bit-cloudwatch.go | 2 +- go.mod | 5 +- go.sum | 3 + plugins/plugins.go | 238 ---------------------------------- plugins/plugins_test.go | 68 ---------- 7 files changed, 9 insertions(+), 311 deletions(-) delete mode 100644 plugins/plugins.go delete mode 100644 plugins/plugins_test.go diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 9c8e554..83c867d 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -20,7 +20,7 @@ import ( "time" "unicode/utf8" - "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/plugins" + "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials/stscreds" diff --git a/cloudwatch/cloudwatch_test.go b/cloudwatch/cloudwatch_test.go index a230f40..af034f5 100644 --- a/cloudwatch/cloudwatch_test.go +++ b/cloudwatch/cloudwatch_test.go @@ -20,7 +20,7 @@ import ( "time" "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch/mock_cloudwatch" - "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/plugins" + "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" diff --git a/fluent-bit-cloudwatch.go b/fluent-bit-cloudwatch.go index fbac1de..f6646c5 100644 --- a/fluent-bit-cloudwatch.go +++ b/fluent-bit-cloudwatch.go @@ -21,7 +21,7 @@ import ( "time" "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch" - "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/plugins" + "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" "github.com/fluent/fluent-bit-go/output" "github.com/sirupsen/logrus" diff --git a/go.mod b/go.mod index b96cc27..db5a317 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,12 @@ module github.com/aws/amazon-cloudwatch-logs-for-fluent-bit go 1.12 require ( + github.com/aws/amazon-kinesis-firehose-for-fluent-bit v0.0.0-20190624030122-c41b42995068 github.com/aws/aws-sdk-go v1.20.6 github.com/cenkalti/backoff v2.1.1+incompatible github.com/fluent/fluent-bit-go v0.0.0-20190614024040-c017a8579953 + github.com/golang/mock v1.3.1 github.com/json-iterator/go v1.1.6 - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.1 // indirect github.com/sirupsen/logrus v1.4.2 + github.com/stretchr/testify v1.2.2 ) diff --git a/go.sum b/go.sum index 2c98350..fd6ee62 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/aws/amazon-kinesis-firehose-for-fluent-bit v0.0.0-20190624030122-c41b42995068 h1:cMoOT6QgDnReGqOcg7+d2YtMpamUhijuubTHGy9dlq8= +github.com/aws/amazon-kinesis-firehose-for-fluent-bit v0.0.0-20190624030122-c41b42995068/go.mod h1:EGtphTrA5BAr9VKTGmK6P+3/ny4l8wTU2tLjqDyP1F8= github.com/aws/aws-sdk-go v1.20.6 h1:kmy4Gvdlyez1fV4kw5RYxZzWKVyuHZHgPWeU/YvRsV4= github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/cenkalti/backoff v2.1.1+incompatible h1:tKJnvO2kl0zmb/jA5UKAt4VoEVw1qxKWjE/Bpp46npY= @@ -24,6 +26,7 @@ github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/plugins/plugins.go b/plugins/plugins.go deleted file mode 100644 index 1471d7a..0000000 --- a/plugins/plugins.go +++ /dev/null @@ -1,238 +0,0 @@ -// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -// Package plugins contains functions that are useful across fluent bit plugins. -// This package will be imported by the CloudWatch Logs and Kinesis Data Streams plugins. -package plugins - -import ( - "os" - "strings" - "time" - - retry "github.com/cenkalti/backoff" - "github.com/sirupsen/logrus" -) - -const ( - fluentBitLogLevelEnvVar = "FLB_LOG_LEVEL" - sendFailureTimeoutEnvVar = "SEND_FAILURE_TIMEOUT" -) - -const ( - initialInterval = 100 // milliseconds - maxInterval = 10 // seconds -) - -// Backoff wraps github.com/cenkalti/backoff -// Wait() is called for each AWS API call that may need back off -// But backoff only occurs if StartBackoff() has previously been called -// Reset() should be called whenever backoff can end. -type Backoff struct { - doBackoff bool - expBackoff *retry.ExponentialBackOff -} - -// Reset ends the exponential backoff -func (b *Backoff) Reset() { - b.doBackoff = false - b.expBackoff.Reset() -} - -// Wait enacts the exponential backoff, if StartBackoff() has been called -func (b *Backoff) Wait() { - if b.doBackoff { - d := b.expBackoff.NextBackOff() - logrus.Debugf("[go plugin] In exponential backoff, waiting %v", d) - time.Sleep(d) - } -} - -// StartBackoff begins exponential backoff -// its a no-op if backoff has already started -func (b *Backoff) StartBackoff() { - b.doBackoff = true -} - -// NewBackoff creates a new Backoff struct with default values -func NewBackoff() *Backoff { - b := retry.NewExponentialBackOff() - b.InitialInterval = initialInterval * time.Millisecond - b.MaxElapsedTime = 0 // The backoff object never expires - b.MaxInterval = maxInterval * time.Second - return &Backoff{ - doBackoff: false, - expBackoff: b, - } -} - -// Timeout is a simple timeout for single-threaded programming -// (Goroutines are expensive in Cgo) -type Timeout struct { - timeoutFunc func(time.Duration) - duration time.Duration - stopTime time.Time - ticking bool - enabled bool -} - -// Start the timer -// this method has no effect if the timer has already been started -func (t *Timeout) Start() { - if t.enabled && !t.ticking { - logrus.Debug("Starting timeout") - t.ticking = true - t.stopTime = time.Now().Add(t.duration) - } -} - -// Reset the timer -func (t *Timeout) Reset() { - t.ticking = false -} - -// Check the timer to see if its timed out -func (t *Timeout) Check() { - logrus.Debug("Checking timeout") - logrus.Debugf("%s left", t.stopTime.Sub(time.Now()).String()) - if t.enabled && t.ticking { - if t.stopTime.Before(time.Now()) { - // run the timeout function - t.timeoutFunc(t.duration) - } - } -} - -// NewTimeout returns a new timeout object -// with a duration set from the env var -// if the env var is not set, then a timer is returned that is disabled (it doesn't do anything) -func NewTimeout(timeoutFunc func(duration time.Duration)) (*Timeout, error) { - if timeout := os.Getenv(sendFailureTimeoutEnvVar); timeout != "" { - duration, err := time.ParseDuration(timeout) - if err != nil { - return nil, err - } - logrus.Debugf("Configuring timer with timeout of %d", duration) - return &Timeout{ - timeoutFunc: timeoutFunc, - duration: duration, - ticking: false, - enabled: true, - }, nil - } - - // timeout not enabled - return &Timeout{ - timeoutFunc: timeoutFunc, - ticking: false, - enabled: false, - }, nil -} - -// SetupLogger sets up Logrus with the log level determined by the Fluent Bit Env Var -func SetupLogger() { - logrus.SetOutput(os.Stdout) - switch strings.ToUpper(os.Getenv(fluentBitLogLevelEnvVar)) { - default: - logrus.SetLevel(logrus.InfoLevel) - case "DEBUG": - logrus.SetLevel(logrus.DebugLevel) - case "INFO": - logrus.SetLevel(logrus.InfoLevel) - case "ERROR": - logrus.SetLevel(logrus.ErrorLevel) - } -} - -// DecodeMap prepares a record for JSON marshalling -// Any []byte will be base64 encoded when marshaled to JSON, so we must directly cast all []byte to string -func DecodeMap(record map[interface{}]interface{}) (map[interface{}]interface{}, error) { - for k, v := range record { - switch t := v.(type) { - case []byte: - // convert all byte slices to strings - record[k] = string(t) - case map[interface{}]interface{}: - decoded, err := DecodeMap(t) - if err != nil { - return nil, err - } - record[k] = decoded - case []interface{}: - decoded, err := decodeSlice(t) - if err != nil { - return nil, err - } - record[k] = decoded - } - } - return record, nil -} - -// DataKeys allows users to specify a list of keys in the record which they want to be sent -// all others are discarded -func DataKeys(input string, record map[interface{}]interface{}) map[interface{}]interface{} { - input = strings.TrimSpace(input) - keys := strings.Split(input, ",") - - for k := range record { - var currentKey string - switch t := k.(type) { - case []byte: - currentKey = string(t) - case string: - currentKey = t - default: - logrus.Debugf("[external plugin]: Unable to determine type of key %v\n", t) - continue - } - - if !contains(keys, currentKey) { - delete(record, k) - } - } - - return record -} - -func decodeSlice(record []interface{}) ([]interface{}, error) { - for i, v := range record { - switch t := v.(type) { - case []byte: - // convert all byte slices to strings - record[i] = string(t) - case map[interface{}]interface{}: - decoded, err := DecodeMap(t) - if err != nil { - return nil, err - } - record[i] = decoded - case []interface{}: - decoded, err := decodeSlice(t) - if err != nil { - return nil, err - } - record[i] = decoded - } - } - return record, nil -} - -func contains(s []string, e string) bool { - for _, a := range s { - if a == e { - return true - } - } - return false -} diff --git a/plugins/plugins_test.go b/plugins/plugins_test.go deleted file mode 100644 index c6b5f4c..0000000 --- a/plugins/plugins_test.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package plugins - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestDecodeMap(t *testing.T) { - sliceVal := []interface{}{ - []byte("Seattle is"), - []byte("rainy"), - } - innerMap := map[interface{}]interface{}{ - "clyde": []byte("best dog"), - "slice_value": sliceVal, - } - record := map[interface{}]interface{}{ - "somekey": []byte("some value"), - "key-with-nested-value": innerMap, - } - - var err error - record, err = DecodeMap(record) - - assert.NoError(t, err, "Unexpected error calling DecodeMap") - - assertTypeIsString(t, record["somekey"]) - assertTypeIsString(t, innerMap["clyde"]) - assertTypeIsString(t, sliceVal[0]) - assertTypeIsString(t, sliceVal[1]) - -} - -func assertTypeIsString(t *testing.T, val interface{}) { - _, ok := val.(string) - assert.True(t, ok, "Expected value to be a string after call to DecodeMap") -} - -func TestDataKeys(t *testing.T) { - record := map[interface{}]interface{}{ - "this": "is a test", - "this is only": "a test", - "dumpling": "is a dog", - "pudding": "is a dog", - "sushi": "is a dog", - "why do": "people name their dogs after food...", - } - - record = DataKeys("dumpling,pudding", record) - - assert.Len(t, record, 2, "Expected record to contain 2 keys") - assert.Equal(t, record["pudding"], "is a dog", "Expected data key to have correct value") - assert.Equal(t, record["dumpling"], "is a dog", "Expected data key to have correct value") -} From c9af980b459962f07d3367a9f8fdd0806803e2f0 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 28 Jun 2019 00:25:35 -0700 Subject: [PATCH 25/28] Address @hencrice comment --- cloudwatch/cloudwatch.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 83c867d..25b010a 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -75,14 +75,15 @@ func (stream *logStream) updateExpiration() { // OutputPlugin is the CloudWatch Logs Fluent Bit output plugin type OutputPlugin struct { - logGroupName string - logStreamPrefix string - logStreamName string - logKey string - client LogsClient - streams map[string]*logStream - backoff *plugins.Backoff - timer *plugins.Timeout + logGroupName string + logStreamPrefix string + logStreamName string + logKey string + client LogsClient + streams map[string]*logStream + backoff *plugins.Backoff + timer *plugins.Timeout + nextLogStreamCleanUpCheckTime time.Time } // OutputPluginConfig is the input information used by NewOutputPlugin to create a new OutputPlugin @@ -156,6 +157,7 @@ func NewOutputPlugin(config OutputPluginConfig) (*OutputPlugin, error) { client: client, timer: timer, streams: make(map[string]*logStream), + nextLogStreamCleanUpCheckTime: time.Now().Add(logStreamInactivityCheckInterval), }, nil } @@ -223,7 +225,7 @@ func (output *OutputPlugin) AddEvent(tag string, record map[interface{}]interfac // Because each stream incurs some memory for its buffer of log events // (Which would be empty for an unused stream) func (output *OutputPlugin) cleanUpExpiredLogStreams() { - if (time.Now().Minute() % logStreamInactivityCheckInterval) == 0 { + if output.nextLogStreamCleanUpCheckTime.Before(time.Now()) { logrus.Debug("[cloudwatch] Checking for expired log streams") for tag, stream := range output.streams { if stream.isExpired() { @@ -231,6 +233,7 @@ func (output *OutputPlugin) cleanUpExpiredLogStreams() { delete(output.streams, tag) } } + output.nextLogStreamCleanUpCheckTime = time.Now().Add(logStreamInactivityCheckInterval) } } From b64082f39a455b3d886af8b2516e69bec841c6c8 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sat, 29 Jun 2019 21:51:51 -0700 Subject: [PATCH 26/28] Bugfix: logStreamInactivityCheckInterval is a duration in minutes --- cloudwatch/cloudwatch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 25b010a..0700881 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -43,7 +43,7 @@ const ( // Log stream objects that are empty and inactive for longer than the timeout get cleaned up logStreamInactivityTimeout = time.Hour // Check for expired log streams every 10 minutes - logStreamInactivityCheckInterval = 10 + logStreamInactivityCheckInterval = 10 * time.Minute ) // LogsClient contains the CloudWatch API calls used by this plugin From 5e5e48623cababe985fc33443a6728e2244dda32 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 30 Jun 2019 18:52:40 -0700 Subject: [PATCH 27/28] Bugfix: fix logic for finding an existing log stream --- cloudwatch/cloudwatch.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 0700881..bfd560a 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -278,10 +278,11 @@ func (output *OutputPlugin) existingLogStream(tag string) (*logStream, error) { logrus.Debugf("[cloudwatch] Initializing internal buffer for exising log stream %s\n", name) output.streams[tag] = stream stream.updateExpiration() // initialize + break } } - if resp.NextToken == nil { + if stream == nil && resp.NextToken == nil { return nil, fmt.Errorf("error: does not compute: Log Stream %s could not be created, but also could not be found in the log group", name) } From 96ac4035ef934ac35bbd81df5c37309bf509b4c6 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 30 Jun 2019 19:02:21 -0700 Subject: [PATCH 28/28] bugfix: store log streams based on their name, not the fluent tag --- cloudwatch/cloudwatch.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index bfd560a..43bb5c1 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -239,7 +239,8 @@ func (output *OutputPlugin) cleanUpExpiredLogStreams() { func (output *OutputPlugin) getLogStream(tag string) (*logStream, error) { // find log stream by tag - stream, ok := output.streams[tag] + name := output.getStreamName(tag) + stream, ok := output.streams[name] if !ok { // stream doesn't exist, create it stream, err := output.createStream(tag) @@ -276,7 +277,7 @@ func (output *OutputPlugin) existingLogStream(tag string) (*logStream, error) { nextSequenceToken: result.UploadSequenceToken, } logrus.Debugf("[cloudwatch] Initializing internal buffer for exising log stream %s\n", name) - output.streams[tag] = stream + output.streams[name] = stream stream.updateExpiration() // initialize break } @@ -337,7 +338,7 @@ func (output *OutputPlugin) createStream(tag string) (*logStream, error) { nextSequenceToken: nil, // sequence token not required for a new log stream } logrus.Debugf("[cloudwatch] Created new log stream %s\n", name) - output.streams[tag] = stream + output.streams[name] = stream stream.updateExpiration() // initialize logrus.Debugf("[cloudwatch] Created log stream %s", name)