Skip to content

Commit

Permalink
[spanmetricsprocessor] Validate duplicate dimensions at start (#2844)
Browse files Browse the repository at this point in the history
* validate duplicate dimensions at start

Signed-off-by: yeya24 <yb532204897@gmail.com>

* cleanup unit tests

Signed-off-by: yeya24 <yb532204897@gmail.com>

* update

Signed-off-by: yeya24 <yb532204897@gmail.com>
  • Loading branch information
yeya24 authored Mar 25, 2021
1 parent 5655ed6 commit 1eebd43
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 6 deletions.
2 changes: 1 addition & 1 deletion processor/spanmetricsprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ func createDefaultConfig() configmodels.Processor {
}

func createTraceProcessor(_ context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, nextConsumer consumer.Traces) (component.TracesProcessor, error) {
return newProcessor(params.Logger, cfg, nextConsumer), nil
return newProcessor(params.Logger, cfg, nextConsumer)
}
68 changes: 66 additions & 2 deletions processor/spanmetricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"sync"
"time"
"unicode"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
Expand Down Expand Up @@ -82,7 +83,7 @@ type processorImp struct {
metricKeyToDimensions map[metricKey]dimKV
}

func newProcessor(logger *zap.Logger, config configmodels.Exporter, nextConsumer consumer.Traces) *processorImp {
func newProcessor(logger *zap.Logger, config configmodels.Exporter, nextConsumer consumer.Traces) (*processorImp, error) {
logger.Info("Building spanmetricsprocessor")
pConfig := config.(*Config)

Expand All @@ -98,6 +99,10 @@ func newProcessor(logger *zap.Logger, config configmodels.Exporter, nextConsumer
}
}

if err := validateDimensions(pConfig.Dimensions); err != nil {
return nil, err
}

return &processorImp{
logger: logger,
config: *pConfig,
Expand All @@ -110,7 +115,7 @@ func newProcessor(logger *zap.Logger, config configmodels.Exporter, nextConsumer
nextConsumer: nextConsumer,
dimensions: pConfig.Dimensions,
metricKeyToDimensions: make(map[metricKey]dimKV),
}
}, nil
}

func mapDurationsToMillis(vs []time.Duration, f func(duration time.Duration) float64) []float64 {
Expand All @@ -121,6 +126,35 @@ func mapDurationsToMillis(vs []time.Duration, f func(duration time.Duration) flo
return vsm
}

// validateDimensions checks duplicates for reserved dimensions and additional dimensions. Considering
// the usage of Prometheus related exporters, we also validate the dimensions after sanitization.
func validateDimensions(dimensions []Dimension) error {
labelNames := make(map[string]struct{})
for _, key := range []string{serviceNameKey, spanKindKey, statusCodeKey} {
labelNames[key] = struct{}{}
labelNames[sanitize(key)] = struct{}{}
}
labelNames[operationKey] = struct{}{}

for _, key := range dimensions {
if _, ok := labelNames[key.Name]; ok {
return fmt.Errorf("duplicate dimension name %s", key.Name)
}
labelNames[key.Name] = struct{}{}

sanitizedName := sanitize(key.Name)
if sanitizedName == key.Name {
continue
}
if _, ok := labelNames[sanitizedName]; ok {
return fmt.Errorf("duplicate dimension name %s after sanitization", sanitizedName)
}
labelNames[sanitizedName] = struct{}{}
}

return nil
}

// Start implements the component.Component interface.
func (p *processorImp) Start(ctx context.Context, host component.Host) error {
p.logger.Info("Starting spanmetricsprocessor")
Expand Down Expand Up @@ -379,3 +413,33 @@ func (p *processorImp) cache(serviceName string, span pdata.Span, k metricKey) {
p.metricKeyToDimensions[k] = buildDimensionKVs(serviceName, span, p.dimensions)
}
}

// copied from prometheus-go-metric-exporter
// sanitize replaces non-alphanumeric characters with underscores in s.
func sanitize(s string) string {
if len(s) == 0 {
return s
}

// Note: No length limit for label keys because Prometheus doesn't
// define a length limit, thus we should NOT be truncating label keys.
// See https://github.com/orijtech/prometheus-go-metrics-exporter/issues/4.
s = strings.Map(sanitizeRune, s)
if unicode.IsDigit(rune(s[0])) {
s = "key_" + s
}
if s[0] == '_' {
s = "key" + s
}
return s
}

// copied from prometheus-go-metric-exporter
// sanitizeRune converts anything that is not a letter or digit to an underscore
func sanitizeRune(r rune) rune {
if unicode.IsLetter(r) || unicode.IsDigit(r) {
return r
}
// Everything else turns into an underscore
return '_'
}
97 changes: 94 additions & 3 deletions processor/spanmetricsprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ func TestProcessorShutdown(t *testing.T) {

// Test
next := new(consumertest.TracesSink)
p := newProcessor(zap.NewNop(), cfg, next)
err := p.Shutdown(context.Background())
p, err := newProcessor(zap.NewNop(), cfg, next)
assert.NoError(t, err)
err = p.Shutdown(context.Background())

// Verify
assert.NoError(t, err)
Expand All @@ -139,7 +140,8 @@ func TestProcessorCapabilities(t *testing.T) {

// Test
next := new(consumertest.TracesSink)
p := newProcessor(zap.NewNop(), cfg, next)
p, err := newProcessor(zap.NewNop(), cfg, next)
assert.NoError(t, err)
caps := p.GetCapabilities()

// Verify
Expand Down Expand Up @@ -493,3 +495,92 @@ func TestBuildKey(t *testing.T) {

assert.NotEqual(t, k0, k1)
}

func TestProcessorDuplicateDimensions(t *testing.T) {
// Prepare
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
// Duplicate dimension with reserved label after sanitization.
cfg.Dimensions = []Dimension{
{Name: "status_code"},
}

// Test
next := new(consumertest.TracesSink)
p, err := newProcessor(zap.NewNop(), cfg, next)
assert.Error(t, err)
assert.Nil(t, p)
}

func TestValidateDimensions(t *testing.T) {
for _, tc := range []struct {
name string
dimensions []Dimension
expectedErr string
}{
{
name: "no additional dimensions",
dimensions: []Dimension{},
},
{
name: "no duplicate dimensions",
dimensions: []Dimension{
{Name: "http.service_name"},
{Name: "http.status_code"},
},
},
{
name: "duplicate dimension with reserved labels",
dimensions: []Dimension{
{Name: "service.name"},
},
expectedErr: "duplicate dimension name service.name",
},
{
name: "duplicate dimension with reserved labels after sanitization",
dimensions: []Dimension{
{Name: "service_name"},
},
expectedErr: "duplicate dimension name service_name",
},
{
name: "duplicate additional dimensions",
dimensions: []Dimension{
{Name: "service_name"},
{Name: "service_name"},
},
expectedErr: "duplicate dimension name service_name",
},
{
name: "duplicate additional dimensions after sanitization",
dimensions: []Dimension{
{Name: "http.status_code"},
{Name: "http!status_code"},
},
expectedErr: "duplicate dimension name http_status_code after sanitization",
},
{
name: "we skip the case if the dimension name is the same after sanitization",
dimensions: []Dimension{
{Name: "http_status_code"},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
err := validateDimensions(tc.dimensions)
if tc.expectedErr != "" {
assert.EqualError(t, err, tc.expectedErr)
} else {
assert.NoError(t, err)
}
})
}
}

func TestSanitize(t *testing.T) {
require.Equal(t, "", sanitize(""), "")
require.Equal(t, "key_test", sanitize("_test"))
require.Equal(t, "key_0test", sanitize("0test"))
require.Equal(t, "test", sanitize("test"))
require.Equal(t, "test__", sanitize("test_/"))
}

0 comments on commit 1eebd43

Please sign in to comment.