Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[spanmetricsprocessor] Validate duplicate dimensions at start #2844

Merged
merged 3 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion processor/spanmetricsprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ func createDefaultConfig() configmodels.Processor {
}

// createTraceProcessor builds the span metrics processor by handing params.Logger, cfg and
// nextConsumer to newProcessor; the context argument is ignored.
// NOTE(review): this span is a rendered diff, so both the removed return (which wrapped the
// result with a nil error) and the added return (which forwards newProcessor's own
// (processor, error) result) appear below, separated by review-comment text.
func createTraceProcessor(_ context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, nextConsumer consumer.Traces) (component.TracesProcessor, error) {
return newProcessor(params.Logger, cfg, nextConsumer), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

return newProcessor(params.Logger, cfg, nextConsumer)
}
68 changes: 66 additions & 2 deletions processor/spanmetricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"sync"
"time"
"unicode"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
Expand Down Expand Up @@ -82,7 +83,7 @@ type processorImp struct {
metricKeyToDimensions map[metricKey]dimKV
}

func newProcessor(logger *zap.Logger, config configmodels.Exporter, nextConsumer consumer.Traces) *processorImp {
func newProcessor(logger *zap.Logger, config configmodels.Exporter, nextConsumer consumer.Traces) (*processorImp, error) {
logger.Info("Building spanmetricsprocessor")
pConfig := config.(*Config)

Expand All @@ -98,6 +99,10 @@ func newProcessor(logger *zap.Logger, config configmodels.Exporter, nextConsumer
}
}

if err := validateDimensions(pConfig.Dimensions); err != nil {
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test to cover this line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

return &processorImp{
logger: logger,
config: *pConfig,
Expand All @@ -110,7 +115,7 @@ func newProcessor(logger *zap.Logger, config configmodels.Exporter, nextConsumer
nextConsumer: nextConsumer,
dimensions: pConfig.Dimensions,
metricKeyToDimensions: make(map[metricKey]dimKV),
}
}, nil
}

func mapDurationsToMillis(vs []time.Duration, f func(duration time.Duration) float64) []float64 {
Expand All @@ -121,6 +126,35 @@ func mapDurationsToMillis(vs []time.Duration, f func(duration time.Duration) flo
return vsm
}

// validateDimensions reports an error when a configured dimension name collides with a
// reserved label (serviceNameKey, spanKindKey, statusCodeKey, operationKey) or with another
// configured dimension. Names are compared both as written and after sanitize has been
// applied, because Prometheus-related exporters are expected to consume these labels.
// It returns nil when no collision is found.
func validateDimensions(dimensions []Dimension) error {
// labelNames is the set of label names seen so far, raw and sanitized.
labelNames := make(map[string]struct{})
// Reserve these built-in keys in both their raw and sanitized forms.
for _, key := range []string{serviceNameKey, spanKindKey, statusCodeKey} {
labelNames[key] = struct{}{}
labelNames[sanitize(key)] = struct{}{}
}
// operationKey is reserved as written; no sanitized variant is added for it.
labelNames[operationKey] = struct{}{}

for _, key := range dimensions {
// The raw name must not match anything recorded so far (reserved or earlier dimension).
if _, ok := labelNames[key.Name]; ok {
return fmt.Errorf("duplicate dimension name %s", key.Name)
}
labelNames[key.Name] = struct{}{}

sanitizedName := sanitize(key.Name)
// sanitize left the name unchanged, so the check and insert above already cover it.
if sanitizedName == key.Name {
continue
}
// The sanitized form must not match anything recorded so far either.
if _, ok := labelNames[sanitizedName]; ok {
return fmt.Errorf("duplicate dimension name %s after sanitization", sanitizedName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test to cover this line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
labelNames[sanitizedName] = struct{}{}
}

return nil
}

// Start implements the component.Component interface.
func (p *processorImp) Start(ctx context.Context, host component.Host) error {
p.logger.Info("Starting spanmetricsprocessor")
Expand Down Expand Up @@ -379,3 +413,33 @@ func (p *processorImp) cache(serviceName string, span pdata.Span, k metricKey) {
p.metricKeyToDimensions[k] = buildDimensionKVs(serviceName, span, p.dimensions)
}
}

// sanitize replaces non-alphanumeric characters with underscores in s.
// An empty string is returned unchanged. After replacement, a result whose first byte is a
// digit is prefixed with "key_", and a result whose first byte is '_' is prefixed with "key".
// Copied from prometheus-go-metrics-exporter.
func sanitize(s string) string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth copying the tests from prometheusexporter too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// Guard the s[0] accesses below against an empty string.
if len(s) == 0 {
return s
}

// Note: No length limit for label keys because Prometheus doesn't
// define a length limit, thus we should NOT be truncating label keys.
// See https://github.com/orijtech/prometheus-go-metrics-exporter/issues/4.
s = strings.Map(sanitizeRune, s)
// NOTE(review): s[0] is the first byte, not the first rune, so a leading non-ASCII digit
// (which sanitizeRune lets through) is not detected here — confirm whether that matters.
if unicode.IsDigit(rune(s[0])) {
s = "key_" + s
}
// Not reached for the digit case above: after that prefix s starts with 'k'.
if s[0] == '_' {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use delimiter here instead? or otherwise, remove the delimiter and keyStr constants as in the original sanitize implementation, and use string literals instead, which are just as readable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here _ is a rune while delimiter is a string type. For readability, I use string literals now.

s = "key" + s
}
return s
}

// sanitizeRune returns r unchanged when unicode.IsLetter or unicode.IsDigit accepts it, and
// '_' for every other rune. Both predicates are Unicode-aware, so non-ASCII letters and
// digits are kept as they are.
// Copied from prometheus-go-metrics-exporter.
func sanitizeRune(r rune) rune {
if unicode.IsLetter(r) || unicode.IsDigit(r) {
return r
}
// Everything else turns into an underscore
return '_'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... and here as well?

}
97 changes: 94 additions & 3 deletions processor/spanmetricsprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ func TestProcessorShutdown(t *testing.T) {

// Test
next := new(consumertest.TracesSink)
p := newProcessor(zap.NewNop(), cfg, next)
err := p.Shutdown(context.Background())
p, err := newProcessor(zap.NewNop(), cfg, next)
assert.NoError(t, err)
err = p.Shutdown(context.Background())

// Verify
assert.NoError(t, err)
Expand All @@ -139,7 +140,8 @@ func TestProcessorCapabilities(t *testing.T) {

// Test
next := new(consumertest.TracesSink)
p := newProcessor(zap.NewNop(), cfg, next)
p, err := newProcessor(zap.NewNop(), cfg, next)
assert.NoError(t, err)
caps := p.GetCapabilities()

// Verify
Expand Down Expand Up @@ -493,3 +495,92 @@ func TestBuildKey(t *testing.T) {

assert.NotEqual(t, k0, k1)
}

// TestProcessorDuplicateDimensions checks that newProcessor refuses a configuration whose
// additional dimension duplicates a reserved label after sanitization: it must return an
// error and a nil processor.
func TestProcessorDuplicateDimensions(t *testing.T) {
	// Arrange: default config plus a single dimension named "status_code".
	cfg := NewFactory().CreateDefaultConfig().(*Config)
	cfg.Dimensions = []Dimension{{Name: "status_code"}}
	sink := new(consumertest.TracesSink)

	// Act
	proc, buildErr := newProcessor(zap.NewNop(), cfg, sink)

	// Assert
	assert.Error(t, buildErr)
	assert.Nil(t, proc)
}

// TestValidateDimensions runs validateDimensions over a table of dimension lists and checks
// that each one either passes or fails with the exact expected error message.
func TestValidateDimensions(t *testing.T) {
	// named builds a Dimension slice from bare names; with no names it yields an empty,
	// non-nil slice.
	named := func(names ...string) []Dimension {
		out := make([]Dimension, 0, len(names))
		for _, n := range names {
			out = append(out, Dimension{Name: n})
		}
		return out
	}

	type scenario struct {
		name        string
		dimensions  []Dimension
		expectedErr string // empty means validation must succeed
	}

	scenarios := []scenario{
		{
			name:       "no additional dimensions",
			dimensions: named(),
		},
		{
			name:       "no duplicate dimensions",
			dimensions: named("http.service_name", "http.status_code"),
		},
		{
			name:        "duplicate dimension with reserved labels",
			dimensions:  named("service.name"),
			expectedErr: "duplicate dimension name service.name",
		},
		{
			name:        "duplicate dimension with reserved labels after sanitization",
			dimensions:  named("service_name"),
			expectedErr: "duplicate dimension name service_name",
		},
		{
			name:        "duplicate additional dimensions",
			dimensions:  named("service_name", "service_name"),
			expectedErr: "duplicate dimension name service_name",
		},
		{
			name:        "duplicate additional dimensions after sanitization",
			dimensions:  named("http.status_code", "http!status_code"),
			expectedErr: "duplicate dimension name http_status_code after sanitization",
		},
		{
			name:       "we skip the case if the dimension name is the same after sanitization",
			dimensions: named("http_status_code"),
		},
	}

	for i := range scenarios {
		sc := scenarios[i]
		t.Run(sc.name, func(t *testing.T) {
			err := validateDimensions(sc.dimensions)
			if sc.expectedErr == "" {
				assert.NoError(t, err)
				return
			}
			assert.EqualError(t, err, sc.expectedErr)
		})
	}
}

// TestSanitize pins the output of sanitize for an empty string, a leading underscore, a
// leading digit, an already-clean name, and a name containing a non-alphanumeric character.
func TestSanitize(t *testing.T) {
	// Checked in order; require aborts the test at the first mismatch.
	expectations := []struct {
		input string
		want  string
	}{
		{input: "", want: ""},
		{input: "_test", want: "key_test"},
		{input: "0test", want: "key_0test"},
		{input: "test", want: "test"},
		{input: "test_/", want: "test__"},
	}

	for _, e := range expectations {
		require.Equal(t, e.want, sanitize(e.input))
	}
}