From 299ef870ed829017717d16c5772de1e2a1e347ed Mon Sep 17 00:00:00 2001 From: Dinakar Chappa Date: Fri, 20 Sep 2024 10:28:11 -0400 Subject: [PATCH] Exports Entity for awsemfexporter plugins on PutLogEvent calls (#233) --- exporter/awsemfexporter/emf_exporter.go | 29 +++-- exporter/awsemfexporter/emf_exporter_test.go | 90 ++++++++++++---- exporter/awsemfexporter/go.mod | 1 + exporter/awsemfexporter/go.sum | 2 + exporter/awsemfexporter/metric_translator.go | 71 +++++++++++- .../awsemfexporter/metric_translator_test.go | 30 ++++++ exporter/awsemfexporter/util.go | 14 +++ exporter/awsemfexporter/util_test.go | 101 ++++++++++++++++++ internal/aws/cwlogs/cwlog_client_test.go | 26 +++++ internal/aws/cwlogs/pusher.go | 58 +++++++--- internal/aws/cwlogs/pusher_test.go | 32 ++++++ internal/aws/cwlogs/utils.go | 19 ++++ 12 files changed, 429 insertions(+), 44 deletions(-) diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index 32a62dc33c36..94431d608bdb 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -13,6 +13,7 @@ import ( "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/google/uuid" + lru "github.com/hashicorp/golang-lru/v2" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" @@ -33,10 +34,12 @@ const ( // AppSignals EMF config appSignalsMetricNamespace = "ApplicationSignals" appSignalsLogGroupNamePrefix = "/aws/application-signals/" + + pusherMapLimit = 1000 ) type emfExporter struct { - pusherMap map[cwlogs.StreamKey]cwlogs.Pusher + pusherMap *lru.Cache[string, cwlogs.Pusher] svcStructuredLog *cwlogs.Client config *Config @@ -80,14 +83,20 @@ func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error) return nil, err } + boundedPusherMap, err := lru.New[string, cwlogs.Pusher](pusherMapLimit) + + if err != nil { + return nil, err + } + emfExporter := &emfExporter{ svcStructuredLog: svcStructuredLog, config: config, metricTranslator: newMetricTranslator(*config), retryCnt: *awsConfig.MaxRetries, collectorID: collectorIdentifier.String(), - pusherMap: map[cwlogs.StreamKey]cwlogs.Pusher{}, processResourceLabels: func(map[string]string) {}, + pusherMap: boundedPusherMap, } if config.IsAppSignalsEnabled() { @@ -179,21 +188,21 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher { var ok bool - if _, ok = emf.pusherMap[key]; !ok { - emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger) + hash := key.Hash() + var pusher cwlogs.Pusher + if pusher, ok = emf.pusherMap.Get(hash); !ok { + pusher = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.config.logger) + emf.pusherMap.Add(hash, pusher) } - return emf.pusherMap[key] + + return pusher } func (emf *emfExporter) listPushers() []cwlogs.Pusher { emf.pusherMapLock.Lock() defer emf.pusherMapLock.Unlock() - var pushers []cwlogs.Pusher - for _, pusher := range emf.pusherMap { - pushers = append(pushers, pusher) - } - return pushers + return emf.pusherMap.Values() } func (emf *emfExporter) start(_ context.Context, host component.Host) error { diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index ecb2d54036df..5837ea5c4205 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -10,7 +10,9 @@ import ( "testing" "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" + lru "github.com/hashicorp/golang-lru/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -23,6 +25,7 @@ import ( "go.uber.org/zap/zaptest/observer" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs" ) const defaultRetryCount = 1 @@ -192,10 +195,16 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) { }) require.Error(t, exp.pushMetricsData(ctx, md)) require.NoError(t, exp.shutdown(ctx)) - pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{ + streamKey := &cwlogs.StreamKey{ LogGroupName: expCfg.LogGroupName, LogStreamName: expCfg.LogStreamName, - }] + Entity: &cloudwatchlogs.Entity{ + KeyAttributes: map[string]*string{}, + Attributes: map[string]*string{}, + }, + } + expectedStreamKeyHash := streamKey.Hash() + pusherMap, ok := exp.pusherMap.Get(expectedStreamKeyHash) assert.True(t, ok) assert.NotNil(t, pusherMap) } @@ -217,16 +226,31 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) { metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, resourceAttributeMap: map[string]any{ - "aws.ecs.cluster.name": "test-cluster-name", - "aws.ecs.task.id": "test-task-id", + "aws.ecs.cluster.name": "test-cluster-name", + "aws.ecs.task.id": "test-task-id", + keyAttributeEntityServiceName: "myService", + keyAttributeEntityDeploymentEnvironment: "myEnvironment", + attributeEntityCluster: "test-cluster-name", + keyAttributeEntityType: "Service", }, }) require.Error(t, exp.pushMetricsData(ctx, md)) require.NoError(t, exp.shutdown(ctx)) - pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{ + streamKey := &cwlogs.StreamKey{ LogGroupName: "/aws/ecs/containerinsights/test-cluster-name/performance", LogStreamName: "test-task-id", - }] + Entity: &cloudwatchlogs.Entity{ + KeyAttributes: map[string]*string{ + "Type": aws.String("Service"), + "Name": aws.String("myService"), + "Environment": aws.String("myEnvironment"), + }, + Attributes: map[string]*string{ + "Cluster": aws.String("test-cluster-name"), + }, + }, + } + pusherMap, ok := exp.pusherMap.Get(streamKey.Hash()) assert.True(t, ok) assert.NotNil(t, pusherMap) } @@ -243,21 +267,34 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) { exp, err := newEmfExporter(expCfg, exportertest.NewNopSettings()) assert.NoError(t, err) assert.NotNil(t, exp) + var entity = &cloudwatchlogs.Entity{ + KeyAttributes: map[string]*string{ + "Type": aws.String("Service"), + "Name": aws.String("myService"), + "Environment": aws.String("myEnvironment"), + }, + Attributes: map[string]*string{}, + } md := generateTestMetrics(testMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, resourceAttributeMap: map[string]any{ - "aws.ecs.cluster.name": "test-cluster-name", - "aws.ecs.task.id": "test-task-id", + "aws.ecs.cluster.name": "test-cluster-name", + "aws.ecs.task.id": "test-task-id", + keyAttributeEntityServiceName: "myService", + keyAttributeEntityDeploymentEnvironment: "myEnvironment", + keyAttributeEntityType: service, }, }) require.Error(t, exp.pushMetricsData(ctx, md)) require.NoError(t, exp.shutdown(ctx)) - pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{ + streamKey := cwlogs.StreamKey{ LogGroupName: expCfg.LogGroupName, LogStreamName: "test-task-id", - }] + Entity: entity, + } + pusherMap, ok := exp.pusherMap.Get(streamKey.Hash()) assert.True(t, ok) assert.NotNil(t, pusherMap) } @@ -279,16 +316,28 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) { metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, resourceAttributeMap: map[string]any{ - "aws.ecs.cluster.name": "test-cluster-name", - "aws.ecs.task.id": "test-task-id", + "aws.ecs.cluster.name": "test-cluster-name", + "aws.ecs.task.id": "test-task-id", + keyAttributeEntityType: service, + keyAttributeEntityServiceName: "myService", + keyAttributeEntityDeploymentEnvironment: "myEnvironment", }, }) require.Error(t, exp.pushMetricsData(ctx, md)) require.NoError(t, exp.shutdown(ctx)) - pusherMap, ok := exp.pusherMap[cwlogs.StreamKey{ + streamKey := cwlogs.StreamKey{ LogGroupName: expCfg.LogGroupName, LogStreamName: expCfg.LogStreamName, - }] + Entity: &cloudwatchlogs.Entity{ + KeyAttributes: map[string]*string{ + "Name": aws.String("myService"), + "Environment": aws.String("myEnvironment"), + "Type": aws.String("Service"), + }, + Attributes: map[string]*string{}, + }, + } + pusherMap, ok := exp.pusherMap.Get(streamKey.Hash()) assert.True(t, ok) assert.NotNil(t, pusherMap) } @@ -312,12 +361,17 @@ func TestPushMetricsDataWithErr(t *testing.T) { logPusher.On("ForceFlush", nil).Return("some error").Once() logPusher.On("ForceFlush", nil).Return("").Once() logPusher.On("ForceFlush", nil).Return("some error").Once() - exp.pusherMap = map[cwlogs.StreamKey]cwlogs.Pusher{} - exp.pusherMap[cwlogs.StreamKey{ + exp.pusherMap, err = lru.New[string, cwlogs.Pusher](pusherMapLimit) + assert.Nil(t, err) + streamKey := cwlogs.StreamKey{ LogGroupName: "test-logGroupName", LogStreamName: "test-logStreamName", - }] = logPusher - + Entity: &cloudwatchlogs.Entity{ + Attributes: map[string]*string{}, + KeyAttributes: map[string]*string{}, + }, + } + exp.pusherMap.Add(streamKey.Hash(), logPusher) md := generateTestMetrics(testMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, diff --git a/exporter/awsemfexporter/go.mod b/exporter/awsemfexporter/go.mod index 5b13b3931fbf..02d5377dd383 100644 --- a/exporter/awsemfexporter/go.mod +++ b/exporter/awsemfexporter/go.mod @@ -6,6 +6,7 @@ require ( github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware v0.0.0-20240419190856-2f880467f335 github.com/aws/aws-sdk-go v1.53.11 github.com/google/uuid v1.6.0 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jellydator/ttlcache/v3 v3.2.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil v0.103.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs v0.103.0 diff --git a/exporter/awsemfexporter/go.sum b/exporter/awsemfexporter/go.sum index 245fdca1ee52..354bd4f8e403 100644 --- a/exporter/awsemfexporter/go.sum +++ b/exporter/awsemfexporter/go.sum @@ -47,6 +47,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE= github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= diff --git a/exporter/awsemfexporter/metric_translator.go b/exporter/awsemfexporter/metric_translator.go index 19415b46113b..a81553d1df08 100644 --- a/exporter/awsemfexporter/metric_translator.go +++ b/exporter/awsemfexporter/metric_translator.go @@ -11,12 +11,14 @@ import ( "strings" "time" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/multierr" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs" - aws "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs" + awsmetrics "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics" ) const ( @@ -32,8 +34,46 @@ const ( containerInsightsReceiver = "awscontainerinsight" attributeReceiver = "receiver" fieldPrometheusMetricType = "prom_metric_type" + + // Entity fields + keyAttributeEntityServiceName = "aws.entity.service.name" + serviceName = "Name" + keyAttributeEntityDeploymentEnvironment = "aws.entity.deployment.environment" + deploymentEnvironment = "Environment" + keyAttributeEntityType = "aws.entity.type" + entityType = "Type" + service = "Service" + resource = "Resource" + keyAttributeEntityResourceType = "aws.entity.resource.type" + resourceType = "ResourceType" + keyAttributeEntityIdentifier = "aws.entity.resource.identifier" + identifier = "Identifier" + attributeEntityCluster = "aws.entity.k8s.cluster.name" + cluster = "Cluster" + attributeEntityNamespace = "aws.entity.k8s.namespace.name" + namespace = "Namespace" + attributeEntityWorkload = "aws.entity.k8s.workload.name" + workload = "Workload" + attributeEntityNode = "aws.entity.k8s.node.name" + node = "Node" ) +var keyAttributeEntityToShortNameMap = map[string]string{ + keyAttributeEntityType: entityType, + keyAttributeEntityResourceType: resourceType, + keyAttributeEntityIdentifier: identifier, + keyAttributeEntityServiceName: serviceName, + keyAttributeEntityDeploymentEnvironment: deploymentEnvironment, +} + +var attributeEntityToShortNameMap = map[string]string{ + attributeEntityCluster: cluster, + attributeEntityNamespace: namespace, + attributeEntityWorkload: workload, + attributeEntityNode: node, + // TODO: add attributes for EC2 +} + var errMissingMetricsForEnhancedContainerInsights = errors.New("nil event detected with EnhancedContainerInsights enabled") var fieldPrometheusTypes = map[pmetric.MetricType]string{ @@ -79,6 +119,7 @@ type groupedMetricMetadata struct { timestampMs int64 logGroup string logStream string + entity *cloudwatchlogs.Entity metricDataType pmetric.MetricType retainInitialValueForDelta bool } @@ -103,8 +144,8 @@ func newMetricTranslator(config Config) metricTranslator { return metricTranslator{ metricDescriptor: mt, calculators: &emfCalculators{ - delta: aws.NewFloat64DeltaCalculator(), - summary: aws.NewMetricCalculator(calculateSummaryDelta), + delta: awsmetrics.NewFloat64DeltaCalculator(), + summary: awsmetrics.NewMetricCalculator(calculateSummaryDelta), }, } } @@ -124,7 +165,6 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri logGroup, logStream, patternReplaceSucceeded := getLogInfo(rm, cWNamespace, config) deltaInitialValue := config.RetainInitialValueOfDeltaMetric - ilms := rm.ScopeMetrics() var metricReceiver string if receiver, ok := rm.Resource().Attributes().Get(attributeReceiver); ok { metricReceiver = receiver.Str() @@ -138,6 +178,13 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri metricReceiver = containerInsightsReceiver } } + // the original resourceAttributes map is immutable, so we need to create a mutable copy of the resource metrics + // to remove the entity fields from the attributes + mutableMetrics := pmetric.NewResourceMetrics() + rm.CopyTo(mutableMetrics) + entity := fetchEntityFields(mutableMetrics.Resource().Attributes()) + + ilms := mutableMetrics.ScopeMetrics() for j := 0; j < ilms.Len(); j++ { ilm := ilms.At(j) @@ -154,6 +201,7 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri timestampMs: timestamp, logGroup: logGroup, logStream: logStream, + entity: &entity, metricDataType: metric.Type(), retainInitialValueForDelta: deltaInitialValue, }, @@ -169,6 +217,19 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri return nil } +func fetchEntityFields(resourceAttributes pcommon.Map) cloudwatchlogs.Entity { + keyAttributesMap := map[string]*string{} + attributeMap := map[string]*string{} + + processAttributes(keyAttributeEntityToShortNameMap, keyAttributesMap, resourceAttributes) + processAttributes(attributeEntityToShortNameMap, attributeMap, resourceAttributes) + + return cloudwatchlogs.Entity{ + KeyAttributes: keyAttributesMap, + Attributes: attributeMap, + } +} + // translateGroupedMetricToCWMetric converts Grouped Metric format to CloudWatch Metric format. func translateGroupedMetricToCWMetric(groupedMetric *groupedMetric, config *Config) *cWMetrics { labels := groupedMetric.labels @@ -505,6 +566,7 @@ func translateGroupedMetricToEmf(groupedMetric *groupedMetric, config *Config, d logGroup := groupedMetric.metadata.logGroup logStream := groupedMetric.metadata.logStream + entity := groupedMetric.metadata.entity if logStream == "" { logStream = defaultLogStream @@ -512,6 +574,7 @@ func translateGroupedMetricToEmf(groupedMetric *groupedMetric, config *Config, d event.LogGroupName = logGroup event.LogStreamName = logStream + event.Entity = entity return event, nil } diff --git a/exporter/awsemfexporter/metric_translator_test.go b/exporter/awsemfexporter/metric_translator_test.go index 076bbf48ad72..54848f80780f 100644 --- a/exporter/awsemfexporter/metric_translator_test.go +++ b/exporter/awsemfexporter/metric_translator_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/aws/aws-sdk-go/aws" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" @@ -22,6 +23,7 @@ import ( "go.uber.org/zap/zaptest/observer" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/cwlogs/sdk/service/cloudwatchlogs" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/occonventions" ) @@ -39,6 +41,7 @@ func createTestResourceMetricsHelper(numMetrics int) pmetric.ResourceMetrics { rm.Resource().Attributes().PutStr("ClusterName", "myCluster") rm.Resource().Attributes().PutStr("PodName", "myPod") rm.Resource().Attributes().PutStr(attributeReceiver, prometheusReceiver) + rm.Resource().Attributes().PutStr(keyAttributeEntityServiceName, "myServiceName") sm := rm.ScopeMetrics().AppendEmpty() m := sm.Metrics().AppendEmpty() @@ -2574,6 +2577,33 @@ func TestTranslateOtToGroupedMetricForInitialDeltaValue(t *testing.T) { } } +func TestFetchEntityFields(t *testing.T) { + resourceMetrics := pmetric.NewResourceMetrics() + resourceMetrics.Resource().Attributes().PutStr(keyAttributeEntityType, "Service") + resourceMetrics.Resource().Attributes().PutStr(keyAttributeEntityDeploymentEnvironment, "my-environment") + resourceMetrics.Resource().Attributes().PutStr(keyAttributeEntityServiceName, "my-service") + resourceMetrics.Resource().Attributes().PutStr(attributeEntityNode, "my-node") + resourceMetrics.Resource().Attributes().PutStr(attributeEntityCluster, "my-cluster") + resourceMetrics.Resource().Attributes().PutStr(attributeEntityNamespace, "my-namespace") + resourceMetrics.Resource().Attributes().PutStr(attributeEntityWorkload, "my-workload") + + expectedEntity := cloudwatchlogs.Entity{KeyAttributes: map[string]*string{ + entityType: aws.String(service), + serviceName: aws.String("my-service"), + deploymentEnvironment: aws.String("my-environment"), + }, + Attributes: map[string]*string{ + node: aws.String("my-node"), + cluster: aws.String("my-cluster"), + namespace: aws.String("my-namespace"), + workload: aws.String("my-workload"), + }, + } + entity := fetchEntityFields(resourceMetrics.Resource().Attributes()) + assert.Equal(t, expectedEntity, entity) + +} + func generateTestMetrics(tm testMetric) pmetric.Metrics { md := pmetric.NewMetrics() now := time.Now() diff --git a/exporter/awsemfexporter/util.go b/exporter/awsemfexporter/util.go index a820fd4d1ed6..61e3fcfbb57d 100644 --- a/exporter/awsemfexporter/util.go +++ b/exporter/awsemfexporter/util.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/aws/aws-sdk-go/aws" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" @@ -172,3 +173,16 @@ func attrMaptoStringMap(attrMap pcommon.Map) map[string]string { }) return strMap } + +// processAttributes fetches the aws.entity fields and creates an entity to be sent at the PutLogEvent call. It also +// removes the entity attributes so that it is not tagged as a dimension, and reduces the size of the PLE payload. +func processAttributes(entityMap map[string]string, targetMap map[string]*string, mutableResourceAttributes pcommon.Map) { + for entityField, shortName := range entityMap { + if val, ok := mutableResourceAttributes.Get(entityField); ok { + if strVal := val.Str(); strVal != "" { + targetMap[shortName] = aws.String(strVal) + } + mutableResourceAttributes.Remove(entityField) + } + } +} diff --git a/exporter/awsemfexporter/util_test.go b/exporter/awsemfexporter/util_test.go index dc5a3dc3735b..2e5164912a36 100644 --- a/exporter/awsemfexporter/util_test.go +++ b/exporter/awsemfexporter/util_test.go @@ -6,6 +6,7 @@ package awsemfexporter import ( "testing" + "github.com/aws/aws-sdk-go/aws" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -366,3 +367,103 @@ func TestGetLogInfo(t *testing.T) { } } + +func TestProcessAttributes(t *testing.T) { + testCases := []struct { + name string + entityMap []map[string]string + resourceAttributes map[string]any + wantedAttributes map[string]*string + leftoverAttributes map[string]any + }{ + { + name: "key_attributes", + entityMap: []map[string]string{keyAttributeEntityToShortNameMap}, + resourceAttributes: map[string]any{ + keyAttributeEntityServiceName: "my-service", + keyAttributeEntityDeploymentEnvironment: "my-environment", + }, + wantedAttributes: map[string]*string{ + serviceName: aws.String("my-service"), + deploymentEnvironment: aws.String("my-environment"), + }, + leftoverAttributes: make(map[string]any), + }, + { + name: "non-key_attributes", + entityMap: []map[string]string{attributeEntityToShortNameMap}, + resourceAttributes: map[string]any{ + attributeEntityCluster: "my-cluster", + attributeEntityNamespace: "my-namespace", + attributeEntityNode: "my-node", + attributeEntityWorkload: "my-workload", + }, + wantedAttributes: map[string]*string{ + cluster: aws.String("my-cluster"), + namespace: aws.String("my-namespace"), + node: aws.String("my-node"), + workload: aws.String("my-workload"), + }, + leftoverAttributes: make(map[string]any), + }, + { + name: "key_and_non_key_attributes", + entityMap: []map[string]string{keyAttributeEntityToShortNameMap, attributeEntityToShortNameMap}, + resourceAttributes: map[string]any{ + keyAttributeEntityServiceName: "my-service", + keyAttributeEntityDeploymentEnvironment: "my-environment", + attributeEntityCluster: "my-cluster", + attributeEntityNamespace: "my-namespace", + attributeEntityNode: "my-node", + attributeEntityWorkload: "my-workload", + }, + wantedAttributes: map[string]*string{ + serviceName: aws.String("my-service"), + deploymentEnvironment: aws.String("my-environment"), + cluster: aws.String("my-cluster"), + namespace: aws.String("my-namespace"), + node: aws.String("my-node"), + workload: aws.String("my-workload"), + }, + leftoverAttributes: make(map[string]any), + }, + { + name: "key_and_non_key_attributes_plus_extras", + entityMap: []map[string]string{keyAttributeEntityToShortNameMap, attributeEntityToShortNameMap}, + resourceAttributes: map[string]any{ + "extra_attribute": "extra_value", + keyAttributeEntityServiceName: "my-service", + keyAttributeEntityDeploymentEnvironment: "my-environment", + attributeEntityCluster: "my-cluster", + attributeEntityNamespace: "my-namespace", + attributeEntityNode: "my-node", + attributeEntityWorkload: "my-workload", + }, + wantedAttributes: map[string]*string{ + serviceName: aws.String("my-service"), + deploymentEnvironment: aws.String("my-environment"), + cluster: aws.String("my-cluster"), + namespace: aws.String("my-namespace"), + node: aws.String("my-node"), + workload: aws.String("my-workload"), + }, + leftoverAttributes: map[string]any{ + "extra_attribute": "extra_value", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + attrs := pcommon.NewMap() + err := attrs.FromRaw(tc.resourceAttributes) + assert.Nil(t, err) + targetMap := make(map[string]*string) + for _, entityMap := range tc.entityMap { + processAttributes(entityMap, targetMap, attrs) + } + assert.Equal(t, tc.leftoverAttributes, attrs.AsRaw()) + assert.Equal(t, tc.wantedAttributes, targetMap) + }) + } +} diff --git a/internal/aws/cwlogs/cwlog_client_test.go b/internal/aws/cwlogs/cwlog_client_test.go index 5e9fee1a42e1..9c84d98c50fe 100644 --- a/internal/aws/cwlogs/cwlog_client_test.go +++ b/internal/aws/cwlogs/cwlog_client_test.go @@ -82,6 +82,18 @@ var previousSequenceToken = "0000" var expectedNextSequenceToken = "1111" var logGroup = "logGroup" var logStreamName = "logStream" + +var entity = cloudwatchlogs.Entity{ + KeyAttributes: map[string]*string{ + "Type": aws.String("Service"), + "Name": aws.String("myService"), + "Environment": aws.String("myEnvironment"), + }, + Attributes: map[string]*string{ + "Instance": aws.String("i-1234567"), + "Type": aws.String("AWS::EC2"), + }, +} var emptySequenceToken = "" func TestPutLogEvents_HappyCase(t *testing.T) { @@ -91,6 +103,7 @@ func TestPutLogEvents_HappyCase(t *testing.T) { LogGroupName: &logGroup, LogStreamName: &logStreamName, SequenceToken: &previousSequenceToken, + Entity: &entity, } putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: &expectedNextSequenceToken} @@ -111,6 +124,7 @@ func TestPutLogEvents_HappyCase_SomeRejectedInfo(t *testing.T) { LogGroupName: &logGroup, LogStreamName: &logStreamName, SequenceToken: &previousSequenceToken, + Entity: &entity, } rejectedLogEventsInfo := &cloudwatchlogs.RejectedLogEventsInfo{ ExpiredLogEventEndIndex: aws.Int64(1), @@ -137,6 +151,7 @@ func TestPutLogEvents_NonAWSError(t *testing.T) { LogGroupName: &logGroup, LogStreamName: &logStreamName, SequenceToken: &previousSequenceToken, + Entity: &entity, } putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: &expectedNextSequenceToken} @@ -157,6 +172,7 @@ func TestPutLogEvents_InvalidParameterException(t *testing.T) { LogGroupName: &logGroup, LogStreamName: &logStreamName, SequenceToken: &previousSequenceToken, + Entity: &entity, } putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: &expectedNextSequenceToken} @@ -178,6 +194,7 @@ func TestPutLogEvents_OperationAbortedException(t *testing.T) { LogGroupName: &logGroup, LogStreamName: &logStreamName, SequenceToken: &previousSequenceToken, + Entity: &entity, } putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: &expectedNextSequenceToken} @@ -199,6 +216,7 @@ func TestPutLogEvents_ServiceUnavailableException(t *testing.T) { LogGroupName: &logGroup, LogStreamName: &logStreamName, SequenceToken: &previousSequenceToken, + Entity: &entity, } putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: &expectedNextSequenceToken} @@ -220,6 +238,7 @@ func TestPutLogEvents_UnknownException(t *testing.T) { LogGroupName: &logGroup, LogStreamName: &logStreamName, SequenceToken: &previousSequenceToken, + Entity: &entity, } putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: &expectedNextSequenceToken} @@ -241,6 +260,7 @@ func TestPutLogEvents_ThrottlingException(t *testing.T) { LogGroupName: &logGroup, LogStreamName: &logStreamName, SequenceToken: &previousSequenceToken, + Entity: &entity, } putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ NextSequenceToken: &expectedNextSequenceToken} @@ -262,6 +282,7 @@ func TestPutLogEvents_ResourceNotFoundException(t *testing.T) { LogGroupName: &logGroup, LogStreamName: &logStreamName, SequenceToken: &emptySequenceToken, + Entity: &entity, } putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ @@ -289,6 +310,7 @@ func TestLogRetention_NeverExpire(t *testing.T) { LogGroupName: &logGroup, LogStreamName: &logStreamName, SequenceToken: &emptySequenceToken, + Entity: &entity, } putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ @@ -324,6 +346,7 @@ func TestLogRetention_RetentionDaysInputted(t *testing.T) { LogGroupName: &logGroup, LogStreamName: &logStreamName, SequenceToken: &emptySequenceToken, + Entity: &entity, } putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ @@ -360,6 +383,7 @@ func TestSetTags_NotCalled(t *testing.T) { LogGroupName: &logGroup, LogStreamName: &logStreamName, SequenceToken: &emptySequenceToken, + Entity: &entity, } putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ @@ -395,6 +419,7 @@ func TestSetTags_Called(t *testing.T) { LogGroupName: &logGroup, LogStreamName: &logStreamName, SequenceToken: &emptySequenceToken, + Entity: &entity, } putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ @@ -431,6 +456,7 @@ func TestPutLogEvents_AllRetriesFail(t *testing.T) { LogGroupName: &logGroup, LogStreamName: &logStreamName, SequenceToken: &emptySequenceToken, + Entity: &entity, } putLogEventsOutput := &cloudwatchlogs.PutLogEventsOutput{ diff --git a/internal/aws/cwlogs/pusher.go b/internal/aws/cwlogs/pusher.go index 929adcee00b6..5009fb3b673d 100644 --- a/internal/aws/cwlogs/pusher.go +++ b/internal/aws/cwlogs/pusher.go @@ -5,6 +5,7 @@ package cwlogs // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "errors" + "fmt" "sort" "sync" "time" @@ -54,10 +55,33 @@ func NewEvent(timestampMs int64, message string) *Event { return event } -// Uniquely identify a cloudwatch logs stream +// Uniquely identify a cloudwatch logs stream. Any changes to this struct will require updates to Hash type StreamKey struct { LogGroupName string LogStreamName string + Entity *cloudwatchlogs.Entity +} + +// Custom hash function for StreamKey. Necessary to uniquely identify with Entity. +func (sk *StreamKey) Hash() string { + var attributes, keyAttributes string + if sk.Entity != nil { + if sk.Entity.Attributes != nil { + attributes = mapToString(sk.Entity.Attributes) + } + if sk.Entity.KeyAttributes != nil { + keyAttributes = mapToString(sk.Entity.KeyAttributes) + } + } + + data := fmt.Sprintf( + "%s|%s|%s|%s", + sk.LogGroupName, + sk.LogStreamName, + attributes, + keyAttributes, + ) + return data } func (logEvent *Event) Validate(logger *zap.Logger) error { @@ -112,11 +136,14 @@ type eventBatch struct { // Create a new log event batch if needed. func newEventBatch(key StreamKey) *eventBatch { + return &eventBatch{ putLogEventsInput: &cloudwatchlogs.PutLogEventsInput{ LogGroupName: aws.String(key.LogGroupName), LogStreamName: aws.String(key.LogStreamName), - LogEvents: make([]*cloudwatchlogs.InputLogEvent, 0, maxRequestEventCount)}, + LogEvents: make([]*cloudwatchlogs.InputLogEvent, 0, maxRequestEventCount), + Entity: key.Entity, + }, } } @@ -186,6 +213,8 @@ type logPusher struct { logGroupName *string // log stream name of the current logPusher logStreamName *string + // entity name of the current logPusher + entity *cloudwatchlogs.Entity logEventBatch *eventBatch @@ -213,6 +242,7 @@ func newLogPusher(streamKey StreamKey, pusher := &logPusher{ logGroupName: aws.String(streamKey.LogGroupName), logStreamName: aws.String(streamKey.LogStreamName), + entity: streamKey.Entity, svcStructuredLog: svcStructuredLog, logger: logger, } @@ -271,7 +301,8 @@ func (p *logPusher) pushEventBatch(req any) error { p.logger.Debug("logpusher: publish log events successfully.", zap.Int("NumOfLogEvents", len(putLogEventsInput.LogEvents)), zap.Float64("LogEventsSize", float64(logEventBatch.byteTotal)/float64(1024)), - zap.Int64("Time", time.Since(startTime).Nanoseconds()/int64(time.Millisecond))) + zap.Int64("Time", time.Since(startTime).Nanoseconds()/int64(time.Millisecond)), + ) return nil } @@ -288,6 +319,7 @@ func (p *logPusher) addLogEvent(logEvent *Event) *eventBatch { currentBatch = newEventBatch(StreamKey{ LogGroupName: *p.logGroupName, LogStreamName: *p.logStreamName, + Entity: p.entity, }) } currentBatch.append(logEvent) @@ -304,6 +336,7 @@ func (p *logPusher) renewEventBatch() *eventBatch { p.logEventBatch = newEventBatch(StreamKey{ LogGroupName: *p.logGroupName, LogStreamName: *p.logStreamName, + Entity: p.entity, }) } @@ -314,7 +347,7 @@ func (p *logPusher) renewEventBatch() *eventBatch { type multiStreamPusher struct { logStreamManager LogStreamManager client Client - pusherMap map[StreamKey]Pusher + pusherMap map[string]Pusher logger *zap.Logger } @@ -323,7 +356,7 @@ func newMultiStreamPusher(logStreamManager LogStreamManager, client Client, logg logStreamManager: logStreamManager, client: client, logger: logger, - pusherMap: make(map[StreamKey]Pusher), + pusherMap: make(map[string]Pusher), } } @@ -335,9 +368,9 @@ func (m *multiStreamPusher) AddLogEntry(event *Event) error { var pusher Pusher var ok bool - if pusher, ok = m.pusherMap[event.StreamKey]; !ok { + if pusher, ok = m.pusherMap[event.StreamKey.Hash()]; !ok { pusher = NewPusher(event.StreamKey, 1, m.client, m.logger) - m.pusherMap[event.StreamKey] = pusher + m.pusherMap[event.StreamKey.Hash()] = pusher } return pusher.AddLogEntry(event) @@ -395,25 +428,26 @@ type LogStreamManager interface { type logStreamManager struct { logStreamMutex sync.Mutex - streams map[StreamKey]bool + streams map[string]bool client Client } func NewLogStreamManager(svcStructuredLog Client) LogStreamManager { return &logStreamManager{ client: svcStructuredLog, - streams: make(map[StreamKey]bool), + streams: make(map[string]bool), } } func (lsm *logStreamManager) InitStream(streamKey StreamKey) error { - if _, ok := lsm.streams[streamKey]; !ok { + hash := streamKey.Hash() + if _, ok := lsm.streams[hash]; !ok { lsm.logStreamMutex.Lock() defer lsm.logStreamMutex.Unlock() - if _, ok := lsm.streams[streamKey]; !ok { + if _, ok := lsm.streams[hash]; !ok { err := lsm.client.CreateStream(&streamKey.LogGroupName, &streamKey.LogStreamName) - lsm.streams[streamKey] = true + lsm.streams[hash] = true return err } } diff --git a/internal/aws/cwlogs/pusher_test.go b/internal/aws/cwlogs/pusher_test.go index 2f1b7b97924e..d11c68372d44 100644 --- a/internal/aws/cwlogs/pusher_test.go +++ b/internal/aws/cwlogs/pusher_test.go @@ -6,6 +6,7 @@ package cwlogs import ( "fmt" "math/rand" + "reflect" "strings" "testing" "time" @@ -114,6 +115,7 @@ func newMockPusher() *logPusher { return newLogPusher(StreamKey{ LogGroupName: logGroup, LogStreamName: logStreamName, + Entity: &entity, }, *svc, zap.NewNop()) } @@ -130,6 +132,7 @@ func TestPusher_newLogEventBatch(t *testing.T) { logEventBatch := newEventBatch(StreamKey{ LogGroupName: logGroup, LogStreamName: logStreamName, + Entity: &entity, }) assert.Equal(t, int64(0), logEventBatch.maxTimestampMs) assert.Equal(t, int64(0), logEventBatch.minTimestampMs) @@ -278,3 +281,32 @@ func TestMultiStreamPusher(t *testing.T) { assert.Equal(t, "foo", *inputs[1].LogGroupName) assert.Equal(t, "bar2", *inputs[1].LogStreamName) } + +func TestStreamKeyFieldCount(t *testing.T) { + expectedFields := map[string]string{ + "LogGroupName": "string", + "LogStreamName": "string", + "Entity": "*cloudwatchlogs.Entity", + } + testStructFields(t, reflect.TypeOf(StreamKey{}), expectedFields, "StreamKey") +} + +func TestEntityFieldCount(t *testing.T) { + expectedFields := map[string]string{ + "_": "struct {}", + "KeyAttributes": "map[string]*string", + "Attributes": "map[string]*string", + } + testStructFields(t, reflect.TypeOf(cloudwatchlogs.Entity{}), expectedFields, "Entity") +} + +func testStructFields(t *testing.T, structType reflect.Type, expectedFields map[string]string, structName string) { + assert.Equal(t, len(expectedFields), structType.NumField(), "%s should have exactly %d fields", structName, len(expectedFields)) + + for i := 0; i < structType.NumField(); i++ { + field := structType.Field(i) + expectedType, exists := expectedFields[field.Name] + assert.True(t, exists, "Unexpected field in %s: %s", structName, field.Name) + assert.Equal(t, expectedType, field.Type.String(), "Incorrect type for field %s in %s", field.Name, structName) + } +} diff --git a/internal/aws/cwlogs/utils.go b/internal/aws/cwlogs/utils.go index 1f1adc53960b..6aea976a6bd9 100644 --- a/internal/aws/cwlogs/utils.go +++ b/internal/aws/cwlogs/utils.go @@ -7,6 +7,8 @@ import ( "errors" "fmt" "regexp" + "sort" + "strings" ) // Added function to check if value is an accepted number of log retention days @@ -68,3 +70,20 @@ func ValidateTagsInput(input map[string]*string) error { return nil } + +// Helper function to convert a map to a consistent string representation +func mapToString(m map[string]*string) string { + if m == nil { + return "" + } + pairs := make([]string, 0, len(m)) + for k, v := range m { + if v == nil { + pairs = append(pairs, k+":") + } else { + pairs = append(pairs, k+":"+*v) + } + } + sort.Strings(pairs) // Ensure a consistent order + return strings.Join(pairs, ";") +}