From fb639eda5922bc1f993cc14969f8dfcf93079631 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 23 Jun 2019 20:06:50 -0700 Subject: [PATCH] Import shared plugins library from Kinesis Firehose plugin --- cloudwatch/cloudwatch.go | 2 +- cloudwatch/cloudwatch_test.go | 2 +- fluent-bit-cloudwatch.go | 2 +- go.mod | 5 +- go.sum | 3 + plugins/plugins.go | 238 ---------------------------------- plugins/plugins_test.go | 68 ---------- 7 files changed, 9 insertions(+), 311 deletions(-) delete mode 100644 plugins/plugins.go delete mode 100644 plugins/plugins_test.go diff --git a/cloudwatch/cloudwatch.go b/cloudwatch/cloudwatch.go index 9c8e554..83c867d 100644 --- a/cloudwatch/cloudwatch.go +++ b/cloudwatch/cloudwatch.go @@ -20,7 +20,7 @@ import ( "time" "unicode/utf8" - "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/plugins" + "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials/stscreds" diff --git a/cloudwatch/cloudwatch_test.go b/cloudwatch/cloudwatch_test.go index a230f40..af034f5 100644 --- a/cloudwatch/cloudwatch_test.go +++ b/cloudwatch/cloudwatch_test.go @@ -20,7 +20,7 @@ import ( "time" "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch/mock_cloudwatch" - "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/plugins" + "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" diff --git a/fluent-bit-cloudwatch.go b/fluent-bit-cloudwatch.go index fbac1de..f6646c5 100644 --- a/fluent-bit-cloudwatch.go +++ b/fluent-bit-cloudwatch.go @@ -21,7 +21,7 @@ import ( "time" "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/cloudwatch" - "github.com/aws/amazon-cloudwatch-logs-for-fluent-bit/plugins" + "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" "github.com/fluent/fluent-bit-go/output" "github.com/sirupsen/logrus" diff --git a/go.mod b/go.mod index b96cc27..db5a317 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,12 @@ module github.com/aws/amazon-cloudwatch-logs-for-fluent-bit go 1.12 require ( + github.com/aws/amazon-kinesis-firehose-for-fluent-bit v0.0.0-20190624030122-c41b42995068 github.com/aws/aws-sdk-go v1.20.6 github.com/cenkalti/backoff v2.1.1+incompatible github.com/fluent/fluent-bit-go v0.0.0-20190614024040-c017a8579953 + github.com/golang/mock v1.3.1 github.com/json-iterator/go v1.1.6 - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.1 // indirect github.com/sirupsen/logrus v1.4.2 + github.com/stretchr/testify v1.2.2 ) diff --git a/go.sum b/go.sum index 2c98350..fd6ee62 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/aws/amazon-kinesis-firehose-for-fluent-bit v0.0.0-20190624030122-c41b42995068 h1:cMoOT6QgDnReGqOcg7+d2YtMpamUhijuubTHGy9dlq8= +github.com/aws/amazon-kinesis-firehose-for-fluent-bit v0.0.0-20190624030122-c41b42995068/go.mod h1:EGtphTrA5BAr9VKTGmK6P+3/ny4l8wTU2tLjqDyP1F8= github.com/aws/aws-sdk-go v1.20.6 h1:kmy4Gvdlyez1fV4kw5RYxZzWKVyuHZHgPWeU/YvRsV4= github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/cenkalti/backoff v2.1.1+incompatible h1:tKJnvO2kl0zmb/jA5UKAt4VoEVw1qxKWjE/Bpp46npY= @@ -24,6 +26,7 @@ github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/plugins/plugins.go b/plugins/plugins.go deleted file mode 100644 index 1471d7a..0000000 --- a/plugins/plugins.go +++ /dev/null @@ -1,238 +0,0 @@ -// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -// Package plugins contains functions that are useful across fluent bit plugins. -// This package will be imported by the CloudWatch Logs and Kinesis Data Streams plugins. -package plugins - -import ( - "os" - "strings" - "time" - - retry "github.com/cenkalti/backoff" - "github.com/sirupsen/logrus" -) - -const ( - fluentBitLogLevelEnvVar = "FLB_LOG_LEVEL" - sendFailureTimeoutEnvVar = "SEND_FAILURE_TIMEOUT" -) - -const ( - initialInterval = 100 // milliseconds - maxInterval = 10 // seconds -) - -// Backoff wraps github.com/cenkalti/backoff -// Wait() is called for each AWS API call that may need back off -// But backoff only occurs if StartBackoff() has previously been called -// Reset() should be called whenever backoff can end. -type Backoff struct { - doBackoff bool - expBackoff *retry.ExponentialBackOff -} - -// Reset ends the exponential backoff -func (b *Backoff) Reset() { - b.doBackoff = false - b.expBackoff.Reset() -} - -// Wait enacts the exponential backoff, if StartBackoff() has been called -func (b *Backoff) Wait() { - if b.doBackoff { - d := b.expBackoff.NextBackOff() - logrus.Debugf("[go plugin] In exponential backoff, waiting %v", d) - time.Sleep(d) - } -} - -// StartBackoff begins exponential backoff -// its a no-op if backoff has already started -func (b *Backoff) StartBackoff() { - b.doBackoff = true -} - -// NewBackoff creates a new Backoff struct with default values -func NewBackoff() *Backoff { - b := retry.NewExponentialBackOff() - b.InitialInterval = initialInterval * time.Millisecond - b.MaxElapsedTime = 0 // The backoff object never expires - b.MaxInterval = maxInterval * time.Second - return &Backoff{ - doBackoff: false, - expBackoff: b, - } -} - -// Timeout is a simple timeout for single-threaded programming -// (Goroutines are expensive in Cgo) -type Timeout struct { - timeoutFunc func(time.Duration) - duration time.Duration - stopTime time.Time - ticking bool - enabled bool -} - -// Start the timer -// this method has no effect if the timer has already been started -func (t *Timeout) Start() { - if t.enabled && !t.ticking { - logrus.Debug("Starting timeout") - t.ticking = true - t.stopTime = time.Now().Add(t.duration) - } -} - -// Reset the timer -func (t *Timeout) Reset() { - t.ticking = false -} - -// Check the timer to see if its timed out -func (t *Timeout) Check() { - logrus.Debug("Checking timeout") - logrus.Debugf("%s left", t.stopTime.Sub(time.Now()).String()) - if t.enabled && t.ticking { - if t.stopTime.Before(time.Now()) { - // run the timeout function - t.timeoutFunc(t.duration) - } - } -} - -// NewTimeout returns a new timeout object -// with a duration set from the env var -// if the env var is not set, then a timer is returned that is disabled (it doesn't do anything) -func NewTimeout(timeoutFunc func(duration time.Duration)) (*Timeout, error) { - if timeout := os.Getenv(sendFailureTimeoutEnvVar); timeout != "" { - duration, err := time.ParseDuration(timeout) - if err != nil { - return nil, err - } - logrus.Debugf("Configuring timer with timeout of %d", duration) - return &Timeout{ - timeoutFunc: timeoutFunc, - duration: duration, - ticking: false, - enabled: true, - }, nil - } - - // timeout not enabled - return &Timeout{ - timeoutFunc: timeoutFunc, - ticking: false, - enabled: false, - }, nil -} - -// SetupLogger sets up Logrus with the log level determined by the Fluent Bit Env Var -func SetupLogger() { - logrus.SetOutput(os.Stdout) - switch strings.ToUpper(os.Getenv(fluentBitLogLevelEnvVar)) { - default: - logrus.SetLevel(logrus.InfoLevel) - case "DEBUG": - logrus.SetLevel(logrus.DebugLevel) - case "INFO": - logrus.SetLevel(logrus.InfoLevel) - case "ERROR": - logrus.SetLevel(logrus.ErrorLevel) - } -} - -// DecodeMap prepares a record for JSON marshalling -// Any []byte will be base64 encoded when marshaled to JSON, so we must directly cast all []byte to string -func DecodeMap(record map[interface{}]interface{}) (map[interface{}]interface{}, error) { - for k, v := range record { - switch t := v.(type) { - case []byte: - // convert all byte slices to strings - record[k] = string(t) - case map[interface{}]interface{}: - decoded, err := DecodeMap(t) - if err != nil { - return nil, err - } - record[k] = decoded - case []interface{}: - decoded, err := decodeSlice(t) - if err != nil { - return nil, err - } - record[k] = decoded - } - } - return record, nil -} - -// DataKeys allows users to specify a list of keys in the record which they want to be sent -// all others are discarded -func DataKeys(input string, record map[interface{}]interface{}) map[interface{}]interface{} { - input = strings.TrimSpace(input) - keys := strings.Split(input, ",") - - for k := range record { - var currentKey string - switch t := k.(type) { - case []byte: - currentKey = string(t) - case string: - currentKey = t - default: - logrus.Debugf("[external plugin]: Unable to determine type of key %v\n", t) - continue - } - - if !contains(keys, currentKey) { - delete(record, k) - } - } - - return record -} - -func decodeSlice(record []interface{}) ([]interface{}, error) { - for i, v := range record { - switch t := v.(type) { - case []byte: - // convert all byte slices to strings - record[i] = string(t) - case map[interface{}]interface{}: - decoded, err := DecodeMap(t) - if err != nil { - return nil, err - } - record[i] = decoded - case []interface{}: - decoded, err := decodeSlice(t) - if err != nil { - return nil, err - } - record[i] = decoded - } - } - return record, nil -} - -func contains(s []string, e string) bool { - for _, a := range s { - if a == e { - return true - } - } - return false -} diff --git a/plugins/plugins_test.go b/plugins/plugins_test.go deleted file mode 100644 index c6b5f4c..0000000 --- a/plugins/plugins_test.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package plugins - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestDecodeMap(t *testing.T) { - sliceVal := []interface{}{ - []byte("Seattle is"), - []byte("rainy"), - } - innerMap := map[interface{}]interface{}{ - "clyde": []byte("best dog"), - "slice_value": sliceVal, - } - record := map[interface{}]interface{}{ - "somekey": []byte("some value"), - "key-with-nested-value": innerMap, - } - - var err error - record, err = DecodeMap(record) - - assert.NoError(t, err, "Unexpected error calling DecodeMap") - - assertTypeIsString(t, record["somekey"]) - assertTypeIsString(t, innerMap["clyde"]) - assertTypeIsString(t, sliceVal[0]) - assertTypeIsString(t, sliceVal[1]) - -} - -func assertTypeIsString(t *testing.T, val interface{}) { - _, ok := val.(string) - assert.True(t, ok, "Expected value to be a string after call to DecodeMap") -} - -func TestDataKeys(t *testing.T) { - record := map[interface{}]interface{}{ - "this": "is a test", - "this is only": "a test", - "dumpling": "is a dog", - "pudding": "is a dog", - "sushi": "is a dog", - "why do": "people name their dogs after food...", - } - - record = DataKeys("dumpling,pudding", record) - - assert.Len(t, record, 2, "Expected record to contain 2 keys") - assert.Equal(t, record["pudding"], "is a dog", "Expected data key to have correct value") - assert.Equal(t, record["dumpling"], "is a dog", "Expected data key to have correct value") -}