Skip to content

Commit

Permalink
removes bounded pusher map in favor of lru. Adds some unit tests and …
Browse files Browse the repository at this point in the history
…comments, small fixes.
  • Loading branch information
dchappa committed Sep 19, 2024
1 parent a1cd2fc commit 00e53b0
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 228 deletions.
76 changes: 0 additions & 76 deletions exporter/awsemfexporter/bounded_pusher_map.go

This file was deleted.

131 changes: 0 additions & 131 deletions exporter/awsemfexporter/bounded_pusher_map_test.go

This file was deleted.

19 changes: 14 additions & 5 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
Expand All @@ -33,10 +34,12 @@ const (
// AppSignals EMF config
appSignalsMetricNamespace = "ApplicationSignals"
appSignalsLogGroupNamePrefix = "/aws/application-signals/"

pusherMapLimit = 1000
)

type emfExporter struct {
boundedPusherMap BoundedPusherMap
pusherMap *lru.Cache[string, cwlogs.Pusher]
svcStructuredLog *cwlogs.Client
config *Config

Expand Down Expand Up @@ -80,14 +83,20 @@ func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error)
return nil, err
}

boundedPusherMap, err := lru.New[string, cwlogs.Pusher](pusherMapLimit)

if err != nil {
return nil, err
}

emfExporter := &emfExporter{
svcStructuredLog: svcStructuredLog,
config: config,
metricTranslator: newMetricTranslator(*config),
retryCnt: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
processResourceLabels: func(map[string]string) {},
boundedPusherMap: NewBoundedPusherMap(),
pusherMap: boundedPusherMap,
}

if config.IsAppSignalsEnabled() {
Expand Down Expand Up @@ -181,9 +190,9 @@ func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher {
var ok bool
hash := key.Hash()
var pusher cwlogs.Pusher
if pusher, ok = emf.boundedPusherMap.Get(hash); !ok {
if pusher, ok = emf.pusherMap.Get(hash); !ok {
pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
emf.boundedPusherMap.Add(hash, pusher, emf.config.logger)
emf.pusherMap.Add(hash, pusher)
}

return pusher
Expand All @@ -193,7 +202,7 @@ func (emf *emfExporter) listPushers() []cwlogs.Pusher {
emf.pusherMapLock.Lock()
defer emf.pusherMapLock.Unlock()

return emf.boundedPusherMap.ListAllPushers()
return emf.pusherMap.Values()
}

func (emf *emfExporter) start(_ context.Context, host component.Host) error {
Expand Down
37 changes: 24 additions & 13 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -197,10 +198,13 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) {
streamKey := &cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: expCfg.LogStreamName,
Entity: &cloudwatchlogs.Entity{},
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{},
Attributes: map[string]*string{},
},
}
expectedStreamKeyHash := streamKey.Hash()
pusherMap, ok := exp.boundedPusherMap.Get(expectedStreamKeyHash)
pusherMap, ok := exp.pusherMap.Get(expectedStreamKeyHash)
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand Down Expand Up @@ -236,16 +240,17 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) {
LogGroupName: "/aws/ecs/containerinsights/test-cluster-name/performance",
LogStreamName: "test-task-id",
Entity: &cloudwatchlogs.Entity{
Attributes: map[string]*string{
"Cluster": aws.String("test-cluster-name"),
},
KeyAttributes: map[string]*string{
"Type": aws.String("Service"),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
}},
},
Attributes: map[string]*string{
"Cluster": aws.String("test-cluster-name"),
},
},
}
pusherMap, ok := exp.boundedPusherMap.Get(streamKey.Hash())
pusherMap, ok := exp.pusherMap.Get(streamKey.Hash())
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -268,6 +273,7 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) {
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
},
Attributes: map[string]*string{},
}

md := generateTestMetrics(testMetric{
Expand All @@ -288,7 +294,7 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) {
LogStreamName: "test-task-id",
Entity: entity,
}
pusherMap, ok := exp.boundedPusherMap.Get(streamKey.Hash())
pusherMap, ok := exp.pusherMap.Get(streamKey.Hash())
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand Down Expand Up @@ -324,13 +330,14 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) {
LogStreamName: expCfg.LogStreamName,
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Type": aws.String("Service"),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
"Type": aws.String("Service"),
},
Attributes: map[string]*string{},
},
}
pusherMap, ok := exp.boundedPusherMap.Get(streamKey.Hash())
pusherMap, ok := exp.pusherMap.Get(streamKey.Hash())
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -354,13 +361,17 @@ func TestPushMetricsDataWithErr(t *testing.T) {
logPusher.On("ForceFlush", nil).Return("some error").Once()
logPusher.On("ForceFlush", nil).Return("").Once()
logPusher.On("ForceFlush", nil).Return("some error").Once()
exp.boundedPusherMap = NewBoundedPusherMap()
exp.pusherMap, err = lru.New[string, cwlogs.Pusher](pusherMapLimit)
assert.Nil(t, err)
streamKey := cwlogs.StreamKey{
LogGroupName: "test-logGroupName",
LogStreamName: "test-logStreamName",
Entity: &cloudwatchlogs.Entity{},
Entity: &cloudwatchlogs.Entity{
Attributes: map[string]*string{},
KeyAttributes: map[string]*string{},
},
}
exp.boundedPusherMap.Add(streamKey.Hash(), logPusher, zap.NewExample())
exp.pusherMap.Add(streamKey.Hash(), logPusher)
md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
Expand Down
1 change: 1 addition & 0 deletions exporter/awsemfexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware v0.0.0-20240419190856-2f880467f335
github.com/aws/aws-sdk-go v1.53.11
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jellydator/ttlcache/v3 v3.2.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil v0.103.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs v0.103.0
Expand Down
2 changes: 2 additions & 0 deletions exporter/awsemfexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 00e53b0

Please sign in to comment.