From 00e53b0ba9243bb69813c0c1b8a1bf852a4de3f6 Mon Sep 17 00:00:00 2001 From: Dinakar Chappa Date: Thu, 19 Sep 2024 02:49:57 -0400 Subject: [PATCH] removes bounded pusher map in favor of lru. Adds some unit tests and comments, small fixes. --- exporter/awsemfexporter/bounded_pusher_map.go | 76 ---------- .../awsemfexporter/bounded_pusher_map_test.go | 131 ------------------ exporter/awsemfexporter/emf_exporter.go | 19 ++- exporter/awsemfexporter/emf_exporter_test.go | 37 +++-- exporter/awsemfexporter/go.mod | 1 + exporter/awsemfexporter/go.sum | 2 + internal/aws/cwlogs/pusher_test.go | 30 ++++ internal/aws/cwlogs/utils.go | 6 +- 8 files changed, 74 insertions(+), 228 deletions(-) delete mode 100644 exporter/awsemfexporter/bounded_pusher_map.go delete mode 100644 exporter/awsemfexporter/bounded_pusher_map_test.go diff --git a/exporter/awsemfexporter/bounded_pusher_map.go b/exporter/awsemfexporter/bounded_pusher_map.go deleted file mode 100644 index dec5ac3a096a..000000000000 --- a/exporter/awsemfexporter/bounded_pusher_map.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package awsemfexporter - -import ( - "errors" - "time" - - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs" -) - -const ( - pusherMapLimit = 1000 -) - -type BoundedPusherMap struct { - pusherMap map[string]cwlogs.Pusher - limit int - stalePusherTracker map[string]time.Time -} - -func NewBoundedPusherMap() BoundedPusherMap { - return BoundedPusherMap{ - pusherMap: make(map[string]cwlogs.Pusher), - limit: pusherMapLimit, - stalePusherTracker: make(map[string]time.Time), - } -} - -func (bpm *BoundedPusherMap) Add(key string, pusher cwlogs.Pusher, logger *zap.Logger) { - err := bpm.EvictStalePushers() - if err != nil { - logger.Error("Error with evicting stale pushers", zap.Error(err)) - return - } - bpm.pusherMap[key] = pusher - bpm.stalePusherTracker[key] = time.Now() -} - -func (bpm *BoundedPusherMap) Get(key string) (cwlogs.Pusher, bool) { - pusher, ok := bpm.pusherMap[key] - if ok { - bpm.stalePusherTracker[key] = time.Now() - } - return pusher, ok -} - -func (bpm *BoundedPusherMap) EvictStalePushers() error { - if len(bpm.pusherMap) < bpm.limit { - return nil - } - now := time.Now() - for key, lastUsed := range bpm.stalePusherTracker { - if now.Sub(lastUsed) > time.Hour { - delete(bpm.pusherMap, key) - delete(bpm.stalePusherTracker, key) - } - } - // Ideally, we should now be below the pusher limit. If we aren't, especially after deleting pushers older than an hour, - // we should log an error. - if len(bpm.pusherMap) >= bpm.limit { - return errors.New("too many emf pushers being created. Dropping the request") - } - return nil -} - -func (bpm *BoundedPusherMap) ListAllPushers() []cwlogs.Pusher { - var pushers []cwlogs.Pusher - for _, pusher := range bpm.pusherMap { - pushers = append(pushers, pusher) - } - return pushers -} diff --git a/exporter/awsemfexporter/bounded_pusher_map_test.go b/exporter/awsemfexporter/bounded_pusher_map_test.go deleted file mode 100644 index 5f6c2dfa9564..000000000000 --- a/exporter/awsemfexporter/bounded_pusher_map_test.go +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package awsemfexporter - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs" -) - -// MockPusher implements the cwlogs.Pusher interface for testing -type MockPusher struct{} - -func (m MockPusher) AddLogEntry(_ *cwlogs.Event) error { - return nil -} - -func (m MockPusher) ForceFlush() error { - return nil -} - -func TestNewBoundedPusherMap(t *testing.T) { - bpm := NewBoundedPusherMap() - assert.Equal(t, pusherMapLimit, bpm.limit) - assert.Empty(t, bpm.pusherMap) - assert.Empty(t, bpm.stalePusherTracker) -} - -func TestBoundedPusherMap_Add(t *testing.T) { - bpm := NewBoundedPusherMap() - logger := zap.NewNop() - pusher := MockPusher{} - - bpm.Add("key1", pusher, logger) - assert.Len(t, bpm.pusherMap, 1) - assert.Len(t, bpm.stalePusherTracker, 1) - assert.Contains(t, bpm.pusherMap, "key1") - assert.Contains(t, bpm.stalePusherTracker, "key1") -} - -func TestBoundedPusherMap_Get(t *testing.T) { - bpm := NewBoundedPusherMap() - pusher := MockPusher{} - bpm.pusherMap["key1"] = pusher - bpm.stalePusherTracker["key1"] = time.Now().Add(-30 * time.Minute) - - // Test getting an existing pusher - gotPusher, ok := bpm.Get("key1") - assert.True(t, ok) - assert.Equal(t, pusher, gotPusher) - assert.True(t, bpm.stalePusherTracker["key1"].After(time.Now().Add(-1*time.Second))) - - // Test getting a non-existent pusher - gotPusher, ok = bpm.Get("key2") - assert.False(t, ok) - assert.Nil(t, gotPusher) -} - -func TestBoundedPusherMap_EvictStalePushers(t *testing.T) { - bpm := NewBoundedPusherMap() - bpm.limit = 2 - pusher := MockPusher{} - - // Add two pushers, one stale and one fresh - bpm.pusherMap["stale"] = pusher - bpm.stalePusherTracker["stale"] = time.Now().Add(-2 * time.Hour) - bpm.pusherMap["fresh"] = pusher - bpm.stalePusherTracker["fresh"] = time.Now() - - err := bpm.EvictStalePushers() - assert.NoError(t, err) - assert.Len(t, bpm.pusherMap, 1) - assert.Len(t, bpm.stalePusherTracker, 1) - assert.NotContains(t, bpm.pusherMap, "stale") - assert.NotContains(t, bpm.stalePusherTracker, "stale") - assert.Contains(t, bpm.pusherMap, "fresh") - assert.Contains(t, bpm.stalePusherTracker, "fresh") -} - -func TestBoundedPusherMap_EvictStalePushers_Error(t *testing.T) { - bpm := NewBoundedPusherMap() - bpm.limit = 2 - pusher := MockPusher{} - - // Add two fresh pushers - bpm.pusherMap["key1"] = pusher - bpm.pusherMap["key2"] = pusher - bpm.stalePusherTracker["key1"] = time.Now() - bpm.stalePusherTracker["key2"] = time.Now() - - err := bpm.EvictStalePushers() - assert.Error(t, err) - assert.Equal(t, "too many emf pushers being created. Dropping the request", err.Error()) -} - -func TestBoundedPusherMap_ListAllPushers(t *testing.T) { - bpm := NewBoundedPusherMap() - pusher1 := MockPusher{} - pusher2 := MockPusher{} - bpm.Add("key1", pusher1, zap.NewExample()) - bpm.Add("key2", pusher2, zap.NewExample()) - - pushers := bpm.ListAllPushers() - assert.Len(t, pushers, 2) - assert.Contains(t, pushers, pusher1) - assert.Contains(t, pushers, pusher2) -} - -func TestBoundedPusherMap_Add_EvictionError(t *testing.T) { - bpm := NewBoundedPusherMap() - bpm.limit = 1 - logger := zap.NewNop() - pusher := MockPusher{} - - // Add one pusher to reach the limit - bpm.Add("key1", pusher, logger) - - // Try to add another pusher, which should trigger eviction - bpm.Add("key2", pusher, logger) - - // Check that the second pusher was not added due to eviction error - assert.Len(t, bpm.pusherMap, 1) - assert.Len(t, bpm.stalePusherTracker, 1) - assert.Contains(t, bpm.pusherMap, "key1") - assert.NotContains(t, bpm.pusherMap, "key2") -} diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index f14ebfb3aeb7..94431d608bdb 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -13,6 +13,7 @@ import ( "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/google/uuid" + lru "github.com/hashicorp/golang-lru/v2" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" @@ -33,10 +34,12 @@ const ( // AppSignals EMF config appSignalsMetricNamespace = "ApplicationSignals" appSignalsLogGroupNamePrefix = "/aws/application-signals/" + + pusherMapLimit = 1000 ) type emfExporter struct { - boundedPusherMap BoundedPusherMap + pusherMap *lru.Cache[string, cwlogs.Pusher] svcStructuredLog *cwlogs.Client config *Config @@ -80,6 +83,12 @@ func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error) return nil, err } + boundedPusherMap, err := lru.New[string, cwlogs.Pusher](pusherMapLimit) + + if err != nil { + return nil, err + } + emfExporter := &emfExporter{ svcStructuredLog: svcStructuredLog, config: config, @@ -87,7 +96,7 @@ func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error) retryCnt: *awsConfig.MaxRetries, collectorID: collectorIdentifier.String(), processResourceLabels: func(map[string]string) {}, - boundedPusherMap: NewBoundedPusherMap(), + pusherMap: boundedPusherMap, } if config.IsAppSignalsEnabled() { @@ -181,9 +190,9 @@ func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher { var ok bool hash := key.Hash() var pusher cwlogs.Pusher - if pusher, ok = emf.boundedPusherMap.Get(hash); !ok { + if pusher, ok = emf.pusherMap.Get(hash); !ok { pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger) - emf.boundedPusherMap.Add(hash, pusher, emf.config.logger) + emf.pusherMap.Add(hash, pusher) } return pusher @@ -193,7 +202,7 @@ func (emf *emfExporter) listPushers() []cwlogs.Pusher { emf.pusherMapLock.Lock() defer emf.pusherMapLock.Unlock() - return emf.boundedPusherMap.ListAllPushers() + return emf.pusherMap.Values() } func (emf *emfExporter) start(_ context.Context, host component.Host) error { diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index f6a66b4d67f5..5837ea5c4205 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -12,6 +12,7 @@ import ( "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" + lru "github.com/hashicorp/golang-lru/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -197,10 +198,13 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) { streamKey := &cwlogs.StreamKey{ LogGroupName: expCfg.LogGroupName, LogStreamName: expCfg.LogStreamName, - Entity: &cloudwatchlogs.Entity{}, + Entity: &cloudwatchlogs.Entity{ + KeyAttributes: map[string]*string{}, + Attributes: map[string]*string{}, + }, } expectedStreamKeyHash := streamKey.Hash() - pusherMap, ok := exp.boundedPusherMap.Get(expectedStreamKeyHash) + pusherMap, ok := exp.pusherMap.Get(expectedStreamKeyHash) assert.True(t, ok) assert.NotNil(t, pusherMap) } @@ -236,16 +240,17 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) { LogGroupName: "/aws/ecs/containerinsights/test-cluster-name/performance", LogStreamName: "test-task-id", Entity: &cloudwatchlogs.Entity{ - Attributes: map[string]*string{ - "Cluster": aws.String("test-cluster-name"), - }, KeyAttributes: map[string]*string{ "Type": aws.String("Service"), "Name": aws.String("myService"), "Environment": aws.String("myEnvironment"), - }}, + }, + Attributes: map[string]*string{ + "Cluster": aws.String("test-cluster-name"), + }, + }, } - pusherMap, ok := exp.boundedPusherMap.Get(streamKey.Hash()) + pusherMap, ok := exp.pusherMap.Get(streamKey.Hash()) assert.True(t, ok) assert.NotNil(t, pusherMap) } @@ -268,6 +273,7 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) { "Name": aws.String("myService"), "Environment": aws.String("myEnvironment"), }, + Attributes: map[string]*string{}, } md := generateTestMetrics(testMetric{ @@ -288,7 +294,7 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) { LogStreamName: "test-task-id", Entity: entity, } - pusherMap, ok := exp.boundedPusherMap.Get(streamKey.Hash()) + pusherMap, ok := exp.pusherMap.Get(streamKey.Hash()) assert.True(t, ok) assert.NotNil(t, pusherMap) } @@ -324,13 +330,14 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) { LogStreamName: expCfg.LogStreamName, Entity: &cloudwatchlogs.Entity{ KeyAttributes: map[string]*string{ - "Type": aws.String("Service"), "Name": aws.String("myService"), "Environment": aws.String("myEnvironment"), + "Type": aws.String("Service"), }, + Attributes: map[string]*string{}, }, } - pusherMap, ok := exp.boundedPusherMap.Get(streamKey.Hash()) + pusherMap, ok := exp.pusherMap.Get(streamKey.Hash()) assert.True(t, ok) assert.NotNil(t, pusherMap) } @@ -354,13 +361,17 @@ func TestPushMetricsDataWithErr(t *testing.T) { logPusher.On("ForceFlush", nil).Return("some error").Once() logPusher.On("ForceFlush", nil).Return("").Once() logPusher.On("ForceFlush", nil).Return("some error").Once() - exp.boundedPusherMap = NewBoundedPusherMap() + exp.pusherMap, err = lru.New[string, cwlogs.Pusher](pusherMapLimit) + assert.Nil(t, err) streamKey := cwlogs.StreamKey{ LogGroupName: "test-logGroupName", LogStreamName: "test-logStreamName", - Entity: &cloudwatchlogs.Entity{}, + Entity: &cloudwatchlogs.Entity{ + Attributes: map[string]*string{}, + KeyAttributes: map[string]*string{}, + }, } - exp.boundedPusherMap.Add(streamKey.Hash(), logPusher, zap.NewExample()) + exp.pusherMap.Add(streamKey.Hash(), logPusher) md := generateTestMetrics(testMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, diff --git a/exporter/awsemfexporter/go.mod b/exporter/awsemfexporter/go.mod index 5b13b3931fbf..02d5377dd383 100644 --- a/exporter/awsemfexporter/go.mod +++ b/exporter/awsemfexporter/go.mod @@ -6,6 +6,7 @@ require ( github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware v0.0.0-20240419190856-2f880467f335 github.com/aws/aws-sdk-go v1.53.11 github.com/google/uuid v1.6.0 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jellydator/ttlcache/v3 v3.2.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil v0.103.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs v0.103.0 diff --git a/exporter/awsemfexporter/go.sum b/exporter/awsemfexporter/go.sum index 245fdca1ee52..354bd4f8e403 100644 --- a/exporter/awsemfexporter/go.sum +++ b/exporter/awsemfexporter/go.sum @@ -47,6 +47,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE= github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= diff --git a/internal/aws/cwlogs/pusher_test.go b/internal/aws/cwlogs/pusher_test.go index 4aaab00dbef4..d11c68372d44 100644 --- a/internal/aws/cwlogs/pusher_test.go +++ b/internal/aws/cwlogs/pusher_test.go @@ -6,6 +6,7 @@ package cwlogs import ( "fmt" "math/rand" + "reflect" "strings" "testing" "time" @@ -280,3 +281,32 @@ func TestMultiStreamPusher(t *testing.T) { assert.Equal(t, "foo", *inputs[1].LogGroupName) assert.Equal(t, "bar2", *inputs[1].LogStreamName) } + +func TestStreamKeyFieldCount(t *testing.T) { + expectedFields := map[string]string{ + "LogGroupName": "string", + "LogStreamName": "string", + "Entity": "*cloudwatchlogs.Entity", + } + testStructFields(t, reflect.TypeOf(StreamKey{}), expectedFields, "StreamKey") +} + +func TestEntityFieldCount(t *testing.T) { + expectedFields := map[string]string{ + "_": "struct {}", + "KeyAttributes": "map[string]*string", + "Attributes": "map[string]*string", + } + testStructFields(t, reflect.TypeOf(cloudwatchlogs.Entity{}), expectedFields, "Entity") +} + +func testStructFields(t *testing.T, structType reflect.Type, expectedFields map[string]string, structName string) { + assert.Equal(t, len(expectedFields), structType.NumField(), "%s should have exactly %d fields", structName, len(expectedFields)) + + for i := 0; i < structType.NumField(); i++ { + field := structType.Field(i) + expectedType, exists := expectedFields[field.Name] + assert.True(t, exists, "Unexpected field in %s: %s", structName, field.Name) + assert.Equal(t, expectedType, field.Type.String(), "Incorrect type for field %s in %s", field.Name, structName) + } +} diff --git a/internal/aws/cwlogs/utils.go b/internal/aws/cwlogs/utils.go index 5b6cfb41a052..6aea976a6bd9 100644 --- a/internal/aws/cwlogs/utils.go +++ b/internal/aws/cwlogs/utils.go @@ -76,12 +76,12 @@ func mapToString(m map[string]*string) string { if m == nil { return "" } - var pairs []string + pairs := make([]string, 0, len(m)) for k, v := range m { if v == nil { - pairs = append(pairs, fmt.Sprintf("%s:", k)) + pairs = append(pairs, k+":") } else { - pairs = append(pairs, fmt.Sprintf("%s:%s", k, *v)) + pairs = append(pairs, k+":"+*v) } } sort.Strings(pairs) // Ensure a consistent order