Skip to content

Commit

Permalink
fixes logic to remove entity attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
dchappa committed Sep 18, 2024
1 parent 2eb0318 commit e81b01d
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 23 deletions.
21 changes: 15 additions & 6 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ func newEmfExporter(config *Config, set exporter.Settings) (*emfExporter, error)
}

func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) error {
rms := md.ResourceMetrics()
// the original resourceAttributes map is immutable, so we need to create a mutable copy of the resource metrics
// to remove the entity fields from the attributes
mutableMetrics := pmetric.NewMetrics()
md.CopyTo(mutableMetrics)
rms := mutableMetrics.ResourceMetrics()
labels := map[string]string{}
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
Expand All @@ -127,6 +131,7 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e
if err != nil {
return err
}
rms.At(i).Resource().Attributes().RemoveIf(filterEntityAttributes())
}

for _, groupedMetric := range groupedMetrics {
Expand Down Expand Up @@ -176,6 +181,14 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e
return nil
}

func filterEntityAttributes() func(s string, _ pcommon.Value) bool {
return func(s string, _ pcommon.Value) bool {
_, containsKeyAttribute := keyAttributeEntityToShortNameMap[s]
_, containsAttribute := keyAttributeEntityToShortNameMap[s]
return containsKeyAttribute || containsAttribute
}
}

func (emf *emfExporter) getPusher(key cwlogs.StreamKey) cwlogs.Pusher {
var ok bool
hash := key.Hash()
Expand All @@ -192,11 +205,7 @@ func (emf *emfExporter) listPushers() []cwlogs.Pusher {
emf.pusherMapLock.Lock()
defer emf.pusherMapLock.Unlock()

var pushers []cwlogs.Pusher
for _, pusher := range emf.boundedPusherMap.ListAllPushers() {
pushers = append(pushers, pusher)
}
return pushers
return emf.boundedPusherMap.ListAllPushers()
}

func (emf *emfExporter) start(_ context.Context, host component.Host) error {
Expand Down
22 changes: 22 additions & 0 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,28 @@ func TestConsumeMetrics(t *testing.T) {
require.NoError(t, exp.shutdown(ctx))
}

func TestFilterEntities(t *testing.T) {
md := generateTestMetrics(testMetric{
metricNames: []string{"metric_1", "metric_2"},
metricValues: [][]float64{{100}, {4}},
resourceAttributeMap: map[string]any{
"aws.ecs.cluster.name": "test-cluster-name",
"aws.ecs.task.id": "test-task-id",
keyAttributeEntityType: service,
keyAttributeEntityServiceName: "myService",
keyAttributeEntityDeploymentEnvironment: "myEnvironment",
},
})

rms := md.ResourceMetrics()

for i := 0; i < rms.Len(); i++ {
assert.Equal(t, 5, rms.At(i).Resource().Attributes().Len())
rms.At(i).Resource().Attributes().RemoveIf(filterEntityAttributes())
assert.Equal(t, 2, rms.At(i).Resource().Attributes().Len())
}
}

func TestConsumeMetricsWithNaNValues(t *testing.T) {
tests := []struct {
testName string
Expand Down
7 changes: 1 addition & 6 deletions exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,7 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri
metricReceiver = containerInsightsReceiver
}
}
// the original resourceAttributes map is immutable, so we need to create a mutable copy of the resource metrics
// to remove the entity fields from the attributes
newResourceMetrics := pmetric.NewResourceMetrics()
rm.CopyTo(newResourceMetrics)
entity := fetchEntityFields(newResourceMetrics.Resource().Attributes())
rm = newResourceMetrics
entity := fetchEntityFields(rm.Resource().Attributes())

for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
Expand Down
2 changes: 0 additions & 2 deletions exporter/awsemfexporter/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2599,10 +2599,8 @@ func TestFetchEntityFields(t *testing.T) {
workload: aws.String("my-workload"),
},
}
assert.Equal(t, 7, resourceMetrics.Resource().Attributes().Len())
entity := fetchEntityFields(resourceMetrics.Resource().Attributes())
assert.Equal(t, expectedEntity, entity)
assert.Equal(t, 0, resourceMetrics.Resource().Attributes().Len())

}

Expand Down
2 changes: 1 addition & 1 deletion exporter/awsemfexporter/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func processAttributes(entityMap map[string]string, targetMap map[string]*string
if strVal := val.Str(); strVal != "" {
targetMap[shortName] = aws.String(strVal)
}
mutableResourceAttributes.Remove(entityField)
//mutableResourceAttributes.Remove(entityField)
}
}
}
8 changes: 0 additions & 8 deletions exporter/awsemfexporter/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,6 @@ func TestProcessAttributes(t *testing.T) {
entityMap []map[string]string
resourceAttributes map[string]any
wantedAttributes map[string]*string
leftoverAttributes map[string]any
}{
{
name: "key_attributes",
Expand All @@ -387,7 +386,6 @@ func TestProcessAttributes(t *testing.T) {
serviceName: aws.String("my-service"),
deploymentEnvironment: aws.String("my-environment"),
},
leftoverAttributes: make(map[string]any),
},
{
name: "non-key_attributes",
Expand All @@ -404,7 +402,6 @@ func TestProcessAttributes(t *testing.T) {
node: aws.String("my-node"),
workload: aws.String("my-workload"),
},
leftoverAttributes: make(map[string]any),
},
{
name: "key_and_non_key_attributes",
Expand All @@ -425,7 +422,6 @@ func TestProcessAttributes(t *testing.T) {
node: aws.String("my-node"),
workload: aws.String("my-workload"),
},
leftoverAttributes: make(map[string]any),
},
{
name: "key_and_non_key_attributes_plus_extras",
Expand All @@ -447,9 +443,6 @@ func TestProcessAttributes(t *testing.T) {
node: aws.String("my-node"),
workload: aws.String("my-workload"),
},
leftoverAttributes: map[string]any{
"extra_attribute": "extra_value",
},
},
}

Expand All @@ -462,7 +455,6 @@ func TestProcessAttributes(t *testing.T) {
for _, entityMap := range tc.entityMap {
processAttributes(entityMap, targetMap, attrs)
}
assert.Equal(t, tc.leftoverAttributes, attrs.AsRaw())
assert.Equal(t, tc.wantedAttributes, targetMap)
})
}
Expand Down

0 comments on commit e81b01d

Please sign in to comment.