Skip to content

Commit

Permalink
Exports Entity for awsemfexporter plugins on PutLogEvent calls (#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
dchappa authored Sep 20, 2024
1 parent 69bd759 commit 299ef87
Show file tree
Hide file tree
Showing 12 changed files with 429 additions and 44 deletions.
29 changes: 19 additions & 10 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
Expand All @@ -33,10 +34,12 @@ const (
// AppSignals EMF config
appSignalsMetricNamespace = "ApplicationSignals"
appSignalsLogGroupNamePrefix = "/aws/application-signals/"

pusherMapLimit = 1000
)

type emfExporter struct {
pusherMap map[cwlogs.StreamKey]cwlogs.Pusher
pusherMap *lru.Cache[string, cwlogs.Pusher]
svcStructuredLog *cwlogs.Client
config *Config

Expand Down Expand Up @@ -80,14 +83,20 @@ func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error)
return nil, err
}

boundedPusherMap, err := lru.New[string, cwlogs.Pusher](pusherMapLimit)

if err != nil {
return nil, err
}

emfExporter := &emfExporter{
svcStructuredLog: svcStructuredLog,
config: config,
metricTranslator: newMetricTranslator(*config),
retryCnt: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
pusherMap: map[cwlogs.StreamKey]cwlogs.Pusher{},
processResourceLabels: func(map[string]string) {},
pusherMap: boundedPusherMap,
}

if config.IsAppSignalsEnabled() {
Expand Down Expand Up @@ -179,21 +188,21 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e

func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher {
var ok bool
if _, ok = emf.pusherMap[key]; !ok {
emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
hash := key.Hash()
var pusher cwlogs.Pusher
if pusher, ok = emf.pusherMap.Get(hash); !ok {
pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger)
emf.pusherMap.Add(hash, pusher)
}
return emf.pusherMap[key]

return pusher
}

func (emf *emfExporter) listPushers() []cwlogs.Pusher {
emf.pusherMapLock.Lock()
defer emf.pusherMapLock.Unlock()

var pushers []cwlogs.Pusher
for _, pusher := range emf.pusherMap {
pushers = append(pushers, pusher)
}
return pushers
return emf.pusherMap.Values()
}

func (emf *emfExporter) start(_ context.Context, host component.Host) error {
Expand Down
90 changes: 72 additions & 18 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"testing"

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand All @@ -23,6 +25,7 @@ import (
"go.uber.org/zap/zaptest/observer"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs"
)

const defaultRetryCount = 1
Expand Down Expand Up @@ -192,10 +195,16 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) {
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
streamKey := &cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: expCfg.LogStreamName,
}]
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{},
Attributes: map[string]*string{},
},
}
expectedStreamKeyHash := streamKey.Hash()
pusherMap, ok := exp.pusherMap.Get(expectedStreamKeyHash)
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -217,16 +226,31 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) {
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
keyAttributeEntityServiceName: "myService",
keyAttributeEntityDeploymentEnvironment: "myEnvironment",
attributeEntityCluster: "test-cluster-name",
keyAttributeEntityType: "Service",
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
streamKey := &cwlogs.StreamKey{
LogGroupName: "/aws/ecs/containerinsights/test-cluster-name/performance",
LogStreamName: "test-task-id",
}]
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Type": aws.String("Service"),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
},
Attributes: map[string]*string{
"Cluster": aws.String("test-cluster-name"),
},
},
}
pusherMap, ok := exp.pusherMap.Get(streamKey.Hash())
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -243,21 +267,34 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) {
exp, err := newEmfExporter(expCfg, exportertest.NewNopSettings())
assert.NoError(t, err)
assert.NotNil(t, exp)
var entity = &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Type": aws.String("Service"),
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
},
Attributes: map[string]*string{},
}

md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
keyAttributeEntityServiceName: "myService",
keyAttributeEntityDeploymentEnvironment: "myEnvironment",
keyAttributeEntityType: service,
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
streamKey := cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: "test-task-id",
}]
Entity: entity,
}
pusherMap, ok := exp.pusherMap.Get(streamKey.Hash())
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -279,16 +316,28 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) {
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
keyAttributeEntityType: service,
keyAttributeEntityServiceName: "myService",
keyAttributeEntityDeploymentEnvironment: "myEnvironment",
},
})
require.Error(t, exp.pushMetricsData(ctx, md))
require.NoError(t, exp.shutdown(ctx))
pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{
streamKey := cwlogs.StreamKey{
LogGroupName: expCfg.LogGroupName,
LogStreamName: expCfg.LogStreamName,
}]
Entity: &cloudwatchlogs.Entity{
KeyAttributes: map[string]*string{
"Name": aws.String("myService"),
"Environment": aws.String("myEnvironment"),
"Type": aws.String("Service"),
},
Attributes: map[string]*string{},
},
}
pusherMap, ok := exp.pusherMap.Get(streamKey.Hash())
assert.True(t, ok)
assert.NotNil(t, pusherMap)
}
Expand All @@ -312,12 +361,17 @@ func TestPushMetricsDataWithErr(t *testing.T) {
logPusher.On("ForceFlush", nil).Return("some error").Once()
logPusher.On("ForceFlush", nil).Return("").Once()
logPusher.On("ForceFlush", nil).Return("some error").Once()
exp.pusherMap = map[cwlogs.StreamKey]cwlogs.Pusher{}
exp.pusherMap[cwlogs.StreamKey{
exp.pusherMap, err = lru.New[string, cwlogs.Pusher](pusherMapLimit)
assert.Nil(t, err)
streamKey := cwlogs.StreamKey{
LogGroupName: "test-logGroupName",
LogStreamName: "test-logStreamName",
}] = logPusher

Entity: &cloudwatchlogs.Entity{
Attributes: map[string]*string{},
KeyAttributes: map[string]*string{},
},
}
exp.pusherMap.Add(streamKey.Hash(), logPusher)
md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
Expand Down
1 change: 1 addition & 0 deletions exporter/awsemfexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware v0.0.0-20240419190856-2f880467f335
github.com/aws/aws-sdk-go v1.53.11
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jellydator/ttlcache/v3 v3.2.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil v0.103.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs v0.103.0
Expand Down
2 changes: 2 additions & 0 deletions exporter/awsemfexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 299ef87

Please sign in to comment.