diff --git a/processor/spanmetricsprocessor/README.md b/processor/spanmetricsprocessor/README.md index c1a0e6a3d593..9df7e2aac4a2 100644 --- a/processor/spanmetricsprocessor/README.md +++ b/processor/spanmetricsprocessor/README.md @@ -31,11 +31,13 @@ promexample_latency_bucket{http_method="GET",http_status_code="200",label1="valu ``` Each metric will have _at least_ the following dimensions because they are common across all spans: -- Service name - Operation - Span kind - Status code +Each metric will have _at least_ the following resource attributes because they are common across all spans: +- Service name + This processor lets traces to continue through the pipeline unmodified. The following settings are required: @@ -48,6 +50,8 @@ The following settings can be optionally configured: - Default: `[2ms, 4ms, 6ms, 8ms, 10ms, 50ms, 100ms, 200ms, 400ms, 800ms, 1s, 1400ms, 2s, 5s, 10s, 15s]` - `dimensions`: the list of dimensions to add together with the default dimensions defined above. Each additional dimension is defined with a `name` which is looked up in the span's collection of attributes. If the `name`d attribute is missing in the span, the optional provided `default` is used. If no `default` is provided, this dimension will be **omitted** from the metric. +- `resource_attributes`: the list of resource attributes to add together with the default resource attributes defined above. Each additional resource attribute is defined with a `name` which is looked up in the span's collection of resource attributes. If the `name`d resource attribute is missing in the span, the optionally provided `default` is used. If no `default` is provided, this resource attribute will be **omitted** from the metric. + ## Examples The following is a simple example usage of the spanmetrics processor. 
diff --git a/processor/spanmetricsprocessor/config.go b/processor/spanmetricsprocessor/config.go index 501532cc5bdc..5358fc13e2ee 100644 --- a/processor/spanmetricsprocessor/config.go +++ b/processor/spanmetricsprocessor/config.go @@ -38,11 +38,17 @@ type Config struct { LatencyHistogramBuckets []time.Duration `mapstructure:"latency_histogram_buckets"` // Dimensions defines the list of additional dimensions on top of the provided: - // - service.name // - operation // - span.kind // - status.code // The dimensions will be fetched from the span's attributes. Examples of some conventionally used attributes: // https://github.com/open-telemetry/opentelemetry-collector/blob/main/translator/conventions/opentelemetry.go. Dimensions []Dimension `mapstructure:"dimensions"` + + // ResourceAttributes defines the list of additional resource attributes to attach to metrics on top of the provided: + // - service.name + // These will be fetched from the span's resource attributes. For more details, see: + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/sdk.md + // and https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/README.md. + ResourceAttributes []Dimension `mapstructure:"resource_attributes"` } diff --git a/processor/spanmetricsprocessor/processor.go b/processor/spanmetricsprocessor/processor.go index 067645bd44f1..89d46c6260fa 100644 --- a/processor/spanmetricsprocessor/processor.go +++ b/processor/spanmetricsprocessor/processor.go @@ -34,11 +34,12 @@ import ( ) const ( - serviceNameKey = conventions.AttributeServiceName - operationKey = "operation" // is there a constant we can refer to? 
- spanKindKey = tracetranslator.TagSpanKind - statusCodeKey = tracetranslator.TagStatusCode - metricKeySeparator = string(byte(0)) + serviceNameKey = conventions.AttributeServiceName + instrumentationLibraryName = "spanmetricsprocessor" + operationKey = "operation" // is there a constant we can refer to? + spanKindKey = tracetranslator.TagSpanKind + statusCodeKey = tracetranslator.TagStatusCode + metricKeySeparator = string(byte(0)) ) var ( @@ -50,10 +51,11 @@ var ( } ) -// dimKV represents the dimension key-value pairs for a metric. -type dimKV map[string]string +// kvPairs represents the dimension/resource attribute key-value pairs for a metric. +type kvPairs map[string]string type metricKey string +type resourceKey string type processorImp struct { lock sync.RWMutex @@ -66,21 +68,29 @@ type processorImp struct { // Additional dimensions to add to metrics. dimensions []Dimension + // Additional resourceAttributes to add to metrics. + resourceAttributes []Dimension + // The starting time of the data points. startTime time.Time // Call & Error counts. - callSum map[metricKey]int64 + callSum map[resourceKey]map[metricKey]int64 // Latency histogram. - latencyCount map[metricKey]uint64 - latencySum map[metricKey]float64 - latencyBucketCounts map[metricKey][]uint64 + latencyCount map[resourceKey]map[metricKey]uint64 + latencySum map[resourceKey]map[metricKey]float64 + latencyBucketCounts map[resourceKey]map[metricKey][]uint64 latencyBounds []float64 - // A cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values: + // List of seen resource attributes. + // Map structure for faster lookup. + resourceAttrList map[resourceKey]bool + + // A cache of dimension and resource attribute key-value maps keyed by a unique identifier formed by a concatenation of its values: // e.g. 
{ "foo/barOK": { "serviceName": "foo", "operation": "/bar", "status_code": "OK" }} - metricKeyToDimensions map[metricKey]dimKV + metricKeyToDimensions map[metricKey]kvPairs + resourceKeyToDimensions map[resourceKey]kvPairs } func newProcessor(logger *zap.Logger, config config.Processor, nextConsumer consumer.Traces) (*processorImp, error) { @@ -99,22 +109,28 @@ func newProcessor(logger *zap.Logger, config config.Processor, nextConsumer cons } } - if err := validateDimensions(pConfig.Dimensions); err != nil { + if err := validateDimensions(pConfig.Dimensions, []string{spanKindKey, statusCodeKey}); err != nil { + return nil, err + } + if err := validateDimensions(pConfig.ResourceAttributes, []string{serviceNameKey}); err != nil { return nil, err } return &processorImp{ - logger: logger, - config: *pConfig, - startTime: time.Now(), - callSum: make(map[metricKey]int64), - latencyBounds: bounds, - latencySum: make(map[metricKey]float64), - latencyCount: make(map[metricKey]uint64), - latencyBucketCounts: make(map[metricKey][]uint64), - nextConsumer: nextConsumer, - dimensions: pConfig.Dimensions, - metricKeyToDimensions: make(map[metricKey]dimKV), + logger: logger, + config: *pConfig, + startTime: time.Now(), + callSum: make(map[resourceKey]map[metricKey]int64), + latencyBounds: bounds, + latencySum: make(map[resourceKey]map[metricKey]float64), + latencyCount: make(map[resourceKey]map[metricKey]uint64), + latencyBucketCounts: make(map[resourceKey]map[metricKey][]uint64), + nextConsumer: nextConsumer, + dimensions: pConfig.Dimensions, + resourceAttributes: pConfig.ResourceAttributes, + resourceAttrList: make(map[resourceKey]bool), + metricKeyToDimensions: make(map[metricKey]kvPairs), + resourceKeyToDimensions: make(map[resourceKey]kvPairs), }, nil } @@ -128,9 +144,9 @@ func mapDurationsToMillis(vs []time.Duration, f func(duration time.Duration) flo // validateDimensions checks duplicates for reserved dimensions and additional dimensions. 
Considering // the usage of Prometheus related exporters, we also validate the dimensions after sanitization. -func validateDimensions(dimensions []Dimension) error { +func validateDimensions(dimensions []Dimension, defaults []string) error { labelNames := make(map[string]struct{}) - for _, key := range []string{serviceNameKey, spanKindKey, statusCodeKey} { + for _, key := range defaults { labelNames[key] = struct{}{} labelNames[sanitize(key)] = struct{}{} } @@ -221,21 +237,43 @@ func (p *processorImp) ConsumeTraces(ctx context.Context, traces pdata.Traces) e // writes the raw metrics data into the metrics object. func (p *processorImp) buildMetrics() *pdata.Metrics { m := pdata.NewMetrics() - ilm := m.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty() - ilm.InstrumentationLibrary().SetName("spanmetricsprocessor") - p.lock.RLock() - p.collectCallMetrics(ilm) - p.collectLatencyMetrics(ilm) - p.lock.RUnlock() + rms := m.ResourceMetrics() + // Hold the read lock across the whole traversal: resourceAttrList is written under p.lock in + // aggregateMetricsForSpan. NOTE: map iteration order is random, so ResourceMetrics order is too. + p.lock.RLock() + defer p.lock.RUnlock() + for resourceAttrKey := range p.resourceAttrList { + resourceAttributesMap := p.resourceKeyToDimensions[resourceAttrKey] + + // if the service name doesn't exist, we treat it as invalid and do not generate a metric + if _, ok := resourceAttributesMap[serviceNameKey]; !ok { + continue + } + + rm := rms.AppendEmpty() + + // append resource attributes + for attrName, attrValue := range resourceAttributesMap { + value := pdata.NewAttributeValueString(attrValue) + rm.Resource().Attributes().Insert(attrName, value) + } + + ilm := rm.InstrumentationLibraryMetrics().AppendEmpty() + ilm.InstrumentationLibrary().SetName(instrumentationLibraryName) + + // build metrics per resource + p.collectCallMetrics(ilm, resourceAttrKey) + p.collectLatencyMetrics(ilm, resourceAttrKey) + } return &m } // collectLatencyMetrics collects the raw latency metrics, writing the data // into the given instrumentation library metrics. 
-func (p *processorImp) collectLatencyMetrics(ilm pdata.InstrumentationLibraryMetrics) { - for key := range p.latencyCount { +func (p *processorImp) collectLatencyMetrics(ilm pdata.InstrumentationLibraryMetrics, resAttrKey resourceKey) { + for mKey := range p.latencyCount[resAttrKey] { mLatency := ilm.Metrics().AppendEmpty() mLatency.SetDataType(pdata.MetricDataTypeIntHistogram) mLatency.SetName("latency") @@ -245,18 +283,18 @@ func (p *processorImp) collectLatencyMetrics(ilm pdata.InstrumentationLibraryMet dpLatency.SetStartTimestamp(pdata.TimestampFromTime(p.startTime)) dpLatency.SetTimestamp(pdata.TimestampFromTime(time.Now())) dpLatency.SetExplicitBounds(p.latencyBounds) - dpLatency.SetBucketCounts(p.latencyBucketCounts[key]) - dpLatency.SetCount(p.latencyCount[key]) - dpLatency.SetSum(int64(p.latencySum[key])) + dpLatency.SetBucketCounts(p.latencyBucketCounts[resAttrKey][mKey]) + dpLatency.SetCount(p.latencyCount[resAttrKey][mKey]) + dpLatency.SetSum(int64(p.latencySum[resAttrKey][mKey])) - dpLatency.LabelsMap().InitFromMap(p.metricKeyToDimensions[key]) + dpLatency.LabelsMap().InitFromMap(p.metricKeyToDimensions[mKey]) } } // collectCallMetrics collects the raw call count metrics, writing the data // into the given instrumentation library metrics. 
-func (p *processorImp) collectCallMetrics(ilm pdata.InstrumentationLibraryMetrics) { - for key := range p.callSum { +func (p *processorImp) collectCallMetrics(ilm pdata.InstrumentationLibraryMetrics, resAttrKey resourceKey) { + for metricKey := range p.callSum[resAttrKey] { mCalls := ilm.Metrics().AppendEmpty() mCalls.SetDataType(pdata.MetricDataTypeIntSum) mCalls.SetName("calls_total") @@ -266,16 +304,16 @@ func (p *processorImp) collectCallMetrics(ilm pdata.InstrumentationLibraryMetric dpCalls := mCalls.IntSum().DataPoints().AppendEmpty() dpCalls.SetStartTimestamp(pdata.TimestampFromTime(p.startTime)) dpCalls.SetTimestamp(pdata.TimestampFromTime(time.Now())) - dpCalls.SetValue(p.callSum[key]) + dpCalls.SetValue(p.callSum[resAttrKey][metricKey]) - dpCalls.LabelsMap().InitFromMap(p.metricKeyToDimensions[key]) + dpCalls.LabelsMap().InitFromMap(p.metricKeyToDimensions[metricKey]) } } // aggregateMetrics aggregates the raw metrics from the input trace data. // Each metric is identified by a key that is built from the service name // and span metadata such as operation, kind, status_code and any additional -// dimensions the user has configured. +// dimensions and resource attributes the user has configured. 
func (p *processorImp) aggregateMetrics(traces pdata.Traces) { for i := 0; i < traces.ResourceSpans().Len(); i++ { rspans := traces.ResourceSpans().At(i) @@ -297,44 +335,68 @@ func (p *processorImp) aggregateMetricsForServiceSpans(rspans pdata.ResourceSpan spans := ils.Spans() for k := 0; k < spans.Len(); k++ { span := spans.At(k) - p.aggregateMetricsForSpan(serviceName, span) + p.aggregateMetricsForSpan(serviceName, span, rspans.Resource().Attributes()) } } } -func (p *processorImp) aggregateMetricsForSpan(serviceName string, span pdata.Span) { +func (p *processorImp) aggregateMetricsForSpan(serviceName string, span pdata.Span, resourceAttr pdata.AttributeMap) { latencyInMilliseconds := float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds()) // Binary search to find the latencyInMilliseconds bucket index. index := sort.SearchFloat64s(p.latencyBounds, latencyInMilliseconds) - key := buildKey(serviceName, span, p.dimensions) + mKey := p.buildMetricKey(span) + resourceAttrKey := p.buildResourceAttrKey(serviceName, resourceAttr) p.lock.Lock() - p.cache(serviceName, span, key) - p.updateCallMetrics(key) - p.updateLatencyMetrics(key, latencyInMilliseconds, index) + p.resourceAttrList[resourceAttrKey] = true + p.cacheMetricKey(span, mKey) + p.cacheResourceAttrKey(serviceName, resourceAttr, resourceAttrKey) + p.updateCallMetrics(resourceAttrKey, mKey) + p.updateLatencyMetrics(resourceAttrKey, mKey, latencyInMilliseconds, index) p.lock.Unlock() } // updateCallMetrics increments the call count for the given metric key. -func (p *processorImp) updateCallMetrics(key metricKey) { - p.callSum[key]++ +func (p *processorImp) updateCallMetrics(resourceAttrKey resourceKey, mkey metricKey) { + if _, ok := p.callSum[resourceAttrKey]; ok { + p.callSum[resourceAttrKey][mkey]++ + } else { + p.callSum[resourceAttrKey] = map[metricKey]int64{mkey: 1} + } } // updateLatencyMetrics increments the histogram counts for the given metric key and bucket index. 
-func (p *processorImp) updateLatencyMetrics(key metricKey, latency float64, index int) { - if _, ok := p.latencyBucketCounts[key]; !ok { - p.latencyBucketCounts[key] = make([]uint64, len(p.latencyBounds)) +func (p *processorImp) updateLatencyMetrics(resourceAttrKey resourceKey, mKey metricKey, latency float64, index int) { + _, ok := p.latencyBucketCounts[resourceAttrKey] + + if !ok { + p.latencyBucketCounts[resourceAttrKey] = make(map[metricKey][]uint64) + p.latencyBucketCounts[resourceAttrKey][mKey] = make([]uint64, len(p.latencyBounds)) + } + + if _, ok = p.latencyBucketCounts[resourceAttrKey][mKey]; !ok { + p.latencyBucketCounts[resourceAttrKey][mKey] = make([]uint64, len(p.latencyBounds)) + } + + p.latencyBucketCounts[resourceAttrKey][mKey][index]++ + + if _, ok := p.latencySum[resourceAttrKey]; ok { + p.latencySum[resourceAttrKey][mKey] += latency + } else { + p.latencySum[resourceAttrKey] = map[metricKey]float64{mKey: latency} + } + + if _, ok := p.latencyCount[resourceAttrKey]; ok { + p.latencyCount[resourceAttrKey][mKey]++ + } else { + p.latencyCount[resourceAttrKey] = map[metricKey]uint64{mKey: 1} } - p.latencySum[key] += latency - p.latencyCount[key]++ - p.latencyBucketCounts[key][index]++ } -func buildDimensionKVs(serviceName string, span pdata.Span, optionalDims []Dimension) dimKV { - dims := make(dimKV) - dims[serviceNameKey] = serviceName +func buildDimensionKVs(span pdata.Span, optionalDims []Dimension) kvPairs { + dims := make(kvPairs) dims[operationKey] = span.Name() dims[spanKindKey] = span.Kind().String() dims[statusCodeKey] = span.Status().Code().String() @@ -350,28 +412,41 @@ func buildDimensionKVs(serviceName string, span pdata.Span, optionalDims []Dimen return dims } -func concatDimensionValue(metricKeyBuilder *strings.Builder, value string, prefixSep bool) { +func extractResourceAttrsByKeys(serviceName string, keys []Dimension, resourceAttrs pdata.AttributeMap) kvPairs { + dims := make(kvPairs) + dims[serviceNameKey] = serviceName + for _, 
ra := range keys { + if attr, ok := resourceAttrs.Get(ra.Name); ok { + dims[ra.Name] = tracetranslator.AttributeValueToString(attr) + } else if ra.Default != nil { + // Set the default if configured, otherwise this metric should have no value set for the resource attribute. + dims[ra.Name] = *ra.Default + } + } + return dims +} + +func concatDimensionValue(metricKeyBuilder *strings.Builder, value string) { // It's worth noting that from pprof benchmarks, WriteString is the most expensive operation of this processor. // Specifically, the need to grow the underlying []byte slice to make room for the appended string. - if prefixSep { + if metricKeyBuilder.Len() != 0 { metricKeyBuilder.WriteString(metricKeySeparator) } metricKeyBuilder.WriteString(value) } -// buildKey builds the metric key from the service name and span metadata such as operation, kind, status_code and +// buildMetricKey builds the metric key from the service name and span metadata such as operation, kind, status_code and // any additional dimensions the user has configured. // The metric key is a simple concatenation of dimension values. -func buildKey(serviceName string, span pdata.Span, optionalDims []Dimension) metricKey { +func (p *processorImp) buildMetricKey(span pdata.Span) metricKey { var metricKeyBuilder strings.Builder - concatDimensionValue(&metricKeyBuilder, serviceName, false) - concatDimensionValue(&metricKeyBuilder, span.Name(), true) - concatDimensionValue(&metricKeyBuilder, span.Kind().String(), true) - concatDimensionValue(&metricKeyBuilder, span.Status().Code().String(), true) + concatDimensionValue(&metricKeyBuilder, span.Name()) + concatDimensionValue(&metricKeyBuilder, span.Kind().String()) + concatDimensionValue(&metricKeyBuilder, span.Status().Code().String()) spanAttr := span.Attributes() var value string - for _, d := range optionalDims { + for _, d := range p.dimensions { // Set the default if configured, otherwise this metric will have no value set for the dimension. 
+ if d.Default != nil { value = *d.Default @@ -379,19 +454,50 @@ func buildKey(serviceName string, span pdata.Span, optionalDims []Dimension) met if attr, ok := spanAttr.Get(d.Name); ok { value = tracetranslator.AttributeValueToString(attr) } - concatDimensionValue(&metricKeyBuilder, value, true) + concatDimensionValue(&metricKeyBuilder, value) } k := metricKey(metricKeyBuilder.String()) return k } +// buildResourceAttrKey builds the resource key from the service name followed by the value of each +// configured resource attribute, in configuration order. Each attribute is looked up by its configured +// name; the resource's own value wins over the configured default, and an empty value is used if neither exists. +func (p *processorImp) buildResourceAttrKey(serviceName string, resourceAttr pdata.AttributeMap) resourceKey { + var resourceKeyBuilder strings.Builder + concatDimensionValue(&resourceKeyBuilder, serviceName) + + for _, ra := range p.resourceAttributes { + // Declared per iteration so a value found for one attribute never leaks into the next one. + var value string + if ra.Default != nil { + value = *ra.Default + } + if attr, ok := resourceAttr.Get(ra.Name); ok { + value = tracetranslator.AttributeValueToString(attr) + } + concatDimensionValue(&resourceKeyBuilder, value) + } + + k := resourceKey(resourceKeyBuilder.String()) + return k +} + // cache the dimension key-value map for the metricKey if there is a cache miss. // This enables a lookup of the dimension key-value map when constructing the metric like so: // LabelsMap().InitFromMap(p.metricKeyToDimensions[key]) -func (p *processorImp) cache(serviceName string, span pdata.Span, k metricKey) { +func (p *processorImp) cacheMetricKey(span pdata.Span, k metricKey) { if _, ok := p.metricKeyToDimensions[k]; !ok { - p.metricKeyToDimensions[k] = buildDimensionKVs(serviceName, span, p.dimensions) + p.metricKeyToDimensions[k] = buildDimensionKVs(span, p.dimensions) + } +} + +// cache the dimension key-value map for the resourceAttrKey if there is a cache miss. +// This enables a lookup of the dimension key-value map when constructing the resource. 
+func (p *processorImp) cacheResourceAttrKey(serviceName string, resourceAttrs pdata.AttributeMap, k resourceKey) { + if _, ok := p.resourceKeyToDimensions[k]; !ok { + p.resourceKeyToDimensions[k] = extractResourceAttrsByKeys(serviceName, p.resourceAttributes, resourceAttrs) } } diff --git a/processor/spanmetricsprocessor/processor_test.go b/processor/spanmetricsprocessor/processor_test.go index 22e8973a27e7..45dd358eff76 100644 --- a/processor/spanmetricsprocessor/processor_test.go +++ b/processor/spanmetricsprocessor/processor_test.go @@ -17,6 +17,7 @@ package spanmetricsprocessor import ( "context" "fmt" + "reflect" "testing" "time" @@ -47,6 +48,13 @@ const ( notInSpanAttrName0 = "shouldBeInMetric" notInSpanAttrName1 = "shouldNotBeInMetric" + resourceAttr1 = "resourceAttr1" + resourceAttr2 = "resourceAttr2" + notInSpanResourceAttr0 = "resourceAttrShouldBeInMetric" + notInSpanResourceAttr1 = "resourceAttrShouldNotBeInMetric" + + defaultNotInSpanAttrVal = "defaultNotInSpanAttrVal" + sampleLatency = 11 sampleLatencyDuration = sampleLatency * time.Millisecond ) @@ -215,6 +223,79 @@ func TestProcessorConsumeTraces(t *testing.T) { assert.NoError(t, err) } +func TestResourceCopying(t *testing.T) { + // Prepare + mexp := &mocks.MetricsExporter{} + tcon := &mocks.TracesConsumer{} + + mexp.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pdata.Metrics) bool { + rm := input.ResourceMetrics() + require.Equal(t, 2, rm.Len()) + + serviceAResourceMetrics := rm.At(0) + serviceBResourceMetrics := rm.At(1) + + require.Equal(t, 4, serviceAResourceMetrics.Resource().Attributes().Len()) + require.Equal(t, 4, serviceAResourceMetrics.InstrumentationLibraryMetrics().At(0).Metrics().Len()) + require.Equal(t, 2, serviceBResourceMetrics.Resource().Attributes().Len()) + require.Equal(t, 2, serviceBResourceMetrics.InstrumentationLibraryMetrics().At(0).Metrics().Len()) + + wantResourceAttrServiceA := map[string]string{ + resourceAttr1: "1", + resourceAttr2: "2", + 
notInSpanResourceAttr0: defaultNotInSpanAttrVal, + serviceNameKey: "service-a", + } + serviceAResourceMetrics.Resource().Attributes().Range(func(k string, v pdata.AttributeValue) bool { + value := v.StringVal() + switch k { + case notInSpanResourceAttr1: + assert.Fail(t, notInSpanResourceAttr1+" should not be in this metric") + default: + assert.Equal(t, wantResourceAttrServiceA[k], value) + delete(wantResourceAttrServiceA, k) + } + return true + }) + assert.Empty(t, wantResourceAttrServiceA, "Did not see all expected dimensions in metric. Missing: ", wantResourceAttrServiceA) + + wantResourceAttrServiceB := map[string]string{ + notInSpanResourceAttr0: defaultNotInSpanAttrVal, + serviceNameKey: "service-b", + } + serviceBResourceMetrics.Resource().Attributes().Range(func(k string, v pdata.AttributeValue) bool { + value := v.StringVal() + switch k { + case notInSpanResourceAttr1: + assert.Fail(t, notInSpanResourceAttr1+" should not be in this metric") + default: + assert.Equal(t, wantResourceAttrServiceB[k], value) + delete(wantResourceAttrServiceB, k) + } + return true + }) + assert.Empty(t, wantResourceAttrServiceB, "Did not see all expected dimensions in metric. 
Missing: ", wantResourceAttrServiceB) + + return true + })).Return(nil) + + tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) + + defaultNullValue := "defaultNullValue" + p := newProcessorImp(mexp, tcon, &defaultNullValue) + + traces := buildSampleTrace() + traces.ResourceSpans().At(0).Resource().Attributes().Insert(resourceAttr1, pdata.NewAttributeValueString("1")) + traces.ResourceSpans().At(0).Resource().Attributes().Insert(resourceAttr2, pdata.NewAttributeValueString("2")) + + // Test + ctx := metadata.NewIncomingContext(context.Background(), nil) + err := p.ConsumeTraces(ctx, traces) + + // Verify + assert.NoError(t, err) +} + func TestMetricKeyCache(t *testing.T) { // Prepare mexp := &mocks.MetricsExporter{} @@ -235,7 +316,7 @@ func TestMetricKeyCache(t *testing.T) { // Validate require.NoError(t, err) - origKeyCache := make(map[metricKey]dimKV) + origKeyCache := make(map[metricKey]kvPairs) for k, v := range p.metricKeyToDimensions { origKeyCache[k] = v } @@ -265,17 +346,17 @@ func BenchmarkProcessorConsumeTraces(b *testing.B) { } func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, defaultNullValue *string) *processorImp { - defaultNotInSpanAttrVal := "defaultNotInSpanAttrVal" + localDefaultNotInSpanAttrVal := defaultNotInSpanAttrVal return &processorImp{ logger: zap.NewNop(), metricsExporter: mexp, nextConsumer: tcon, startTime: time.Now(), - callSum: make(map[metricKey]int64), - latencySum: make(map[metricKey]float64), - latencyCount: make(map[metricKey]uint64), - latencyBucketCounts: make(map[metricKey][]uint64), + callSum: make(map[resourceKey]map[metricKey]int64), + latencySum: make(map[resourceKey]map[metricKey]float64), + latencyCount: make(map[resourceKey]map[metricKey]uint64), + latencyBucketCounts: make(map[resourceKey]map[metricKey][]uint64), latencyBounds: defaultLatencyHistogramBucketsMs, dimensions: []Dimension{ // Set nil defaults to force a lookup for the attribute in the span. 
@@ -287,11 +368,19 @@ func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, de {arrayAttrName, nil}, {nullAttrName, defaultNullValue}, // Add a default value for an attribute that doesn't exist in a span - {notInSpanAttrName0, &defaultNotInSpanAttrVal}, + {notInSpanAttrName0, &localDefaultNotInSpanAttrVal}, // Leave the default value unset to test that this dimension should not be added to the metric. {notInSpanAttrName1, nil}, }, - metricKeyToDimensions: make(map[metricKey]dimKV), + resourceAttributes: []Dimension{ + {resourceAttr1, nil}, + {resourceAttr2, nil}, + {notInSpanResourceAttr0, &localDefaultNotInSpanAttrVal}, + {notInSpanResourceAttr1, nil}, + }, + resourceAttrList: make(map[resourceKey]bool), + metricKeyToDimensions: make(map[metricKey]kvPairs), + resourceKeyToDimensions: make(map[resourceKey]kvPairs), } } @@ -304,19 +393,33 @@ func verifyConsumeMetricsInput(input pdata.Metrics, t *testing.T) bool { ) rm := input.ResourceMetrics() - require.Equal(t, 1, rm.Len()) + require.Equal(t, 2, rm.Len()) ilm := rm.At(0).InstrumentationLibraryMetrics() require.Equal(t, 1, ilm.Len()) - assert.Equal(t, "spanmetricsprocessor", ilm.At(0).InstrumentationLibrary().Name()) + assert.Equal(t, instrumentationLibraryName, ilm.At(0).InstrumentationLibrary().Name()) m := ilm.At(0).Metrics() - require.Equal(t, 6, m.Len()) + require.Equal(t, 4, m.Len()) + + verifyMetrics(m, 2, t) + + ilm1 := rm.At(1).InstrumentationLibraryMetrics() + require.Equal(t, 1, ilm1.Len()) + assert.Equal(t, instrumentationLibraryName, ilm1.At(0).InstrumentationLibrary().Name()) + m1 := ilm1.At(0).Metrics() + require.Equal(t, 2, m1.Len()) + verifyMetrics(m1, 1, t) + + return true +} + +func verifyMetrics(m pdata.MetricSlice, numOfCallCounts int, t *testing.T) { seenMetricIDs := make(map[metricID]bool) mi := 0 - // The first 3 metrics are for call counts. - for ; mi < 3; mi++ { + // The first metrics are for call counts. 
+ for ; mi < numOfCallCounts; mi++ { assert.Equal(t, "calls_total", m.At(mi).Name()) data := m.At(mi).IntSum() @@ -368,7 +471,6 @@ func verifyConsumeMetricsInput(input pdata.Metrics, t *testing.T) bool { } verifyMetricLabels(dp, t, seenMetricIDs) } - return true } func verifyMetricLabels(dp metricDataPoint, t *testing.T, seenMetricIDs map[metricID]bool) { @@ -381,12 +483,10 @@ func verifyMetricLabels(dp metricDataPoint, t *testing.T, seenMetricIDs map[metr nullAttrName: "", arrayAttrName: "[]", mapAttrName: "{}", - notInSpanAttrName0: "defaultNotInSpanAttrVal", + notInSpanAttrName0: defaultNotInSpanAttrVal, } dp.LabelsMap().Range(func(k string, v string) bool { switch k { - case serviceNameKey: - mID.service = v case operationKey: mID.operation = v case spanKindKey: @@ -491,13 +591,24 @@ func newOTLPExporters(t *testing.T) (*otlpexporter.Config, component.MetricsExpo } func TestBuildKey(t *testing.T) { + // Prepare + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + + // Test + next := new(consumertest.TracesSink) + p, err := newProcessor(zap.NewNop(), cfg, next) + + // Verify + assert.NoError(t, err) + span0 := pdata.NewSpan() span0.SetName("c") - k0 := buildKey("ab", span0, nil) + k0 := p.buildMetricKey(span0) span1 := pdata.NewSpan() span1.SetName("bc") - k1 := buildKey("a", span1, nil) + k1 := p.buildMetricKey(span1) assert.NotEqual(t, k0, k1) } @@ -518,6 +629,22 @@ func TestProcessorDuplicateDimensions(t *testing.T) { assert.Nil(t, p) } +func TestProcessorDuplicateResourceAttributes(t *testing.T) { + // Prepare + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + // Duplicate dimension with reserved label after sanitization. 
+ cfg.ResourceAttributes = []Dimension{ + {Name: "service.name"}, + } + + // Test + next := new(consumertest.TracesSink) + p, err := newProcessor(zap.NewNop(), cfg, next) + assert.Error(t, err) + assert.Nil(t, p) +} + func TestValidateDimensions(t *testing.T) { for _, tc := range []struct { name string @@ -573,7 +700,10 @@ func TestValidateDimensions(t *testing.T) { }, } { + // Capture the range variable: a parallel subtest only resumes after the loop has moved on. + tc := tc t.Run(tc.name, func(t *testing.T) { - err := validateDimensions(tc.dimensions) + t.Parallel() + err := validateDimensions(tc.dimensions, []string{serviceNameKey, spanKindKey, statusCodeKey}) if tc.expectedErr != "" { assert.EqualError(t, err, tc.expectedErr) } else { @@ -590,3 +720,68 @@ func TestSanitize(t *testing.T) { require.Equal(t, "test", sanitize("test")) require.Equal(t, "test__", sanitize("test_/")) } + +func TestTraceWithoutServiceNameDoesNotGenerateMetrics(t *testing.T) { + // Prepare + mexp := &mocks.MetricsExporter{} + tcon := &mocks.TracesConsumer{} + + mexp.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pdata.Metrics) bool { + require.Equal(t, 0, input.MetricCount(), + "Should be 0 as the trace does not have a service name and hence is skipped when building metrics", + ) + return true + })).Return(nil) + tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) + + defaultNullValue := "defaultNullValue" + p := newProcessorImp(mexp, tcon, &defaultNullValue) + + trace := pdata.NewTraces() + + initServiceSpans( + serviceSpans{ + serviceName: "", + spans: []span{ + { + operation: "/ping", + kind: pdata.SpanKindServer, + statusCode: pdata.StatusCodeOk, + }, + { + operation: "/ping", + kind: pdata.SpanKindClient, + statusCode: pdata.StatusCodeOk, + }, + }, + }, trace.ResourceSpans().AppendEmpty()) + + // Test + ctx := metadata.NewIncomingContext(context.Background(), nil) + err := p.ConsumeTraces(ctx, trace) + + // Verify + assert.NoError(t, err) +} + +func TestDimensionsAndResourceAttributesOrdered(t *testing.T) { + // Prepare + factory := NewFactory() + cfg := 
factory.CreateDefaultConfig().(*Config) + + // Test + next := new(consumertest.TracesSink) + p, err := newProcessor(zap.NewNop(), cfg, next) + + // Verify + assert.NoError(t, err) + + dimType := reflect.TypeOf(p.dimensions).Kind() + resourceAttrType := reflect.TypeOf(p.resourceAttributes).Kind() + + // dimensions and resource attributes must be of an ordered type e.g Slice. + // This is because the aggregation generates a string of concatenated key value pairs + // and hence is dependent on the order of the keys. + assert.Equal(t, reflect.Slice, dimType) + assert.Equal(t, reflect.Slice, resourceAttrType) +}