Skip to content

Commit

Permalink
Unit test for filtering and some filtering fixes
Browse files Browse the repository at this point in the history
Signed-off-by: John <john.dorman@sony.com>
  • Loading branch information
boostchicken authored and jpkrohling committed Jan 18, 2022
1 parent 00bc404 commit 35595c8
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 35 deletions.
5 changes: 1 addition & 4 deletions processor/filterprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type MetricFilters struct {
RegexpConfig *regexp.Config `mapstructure:"regexp"`
}

// SpanFilters filters by Span attributes and various other fields
// SpanFilters filters by Span attributes and various other fields, Regexp config is per matcher
type SpanFilters struct {
// Include match properties describe spans that should be included in the Collector Service pipeline,
// all other spans should be dropped from further processing.
Expand All @@ -61,9 +61,6 @@ type SpanFilters struct {
// all other spans should be included.
// If both Include and Exclude are specified, Include filtering occurs first.
Exclude *filterconfig.MatchProperties `mapstructure:"exclude"`

// RegexpConfig specifies options for the Regexp match type
RegexpConfig *regexp.Config `mapstructure:"regexp"`
}

// LogFilters filters by Log properties.
Expand Down
58 changes: 27 additions & 31 deletions processor/filterprocessor/filter_processor_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package filterprocessor
import (
"context"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterspan"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/processor/processorhelper"
Expand All @@ -36,11 +35,7 @@ func newFilterSpansProcessor(logger *zap.Logger, cfg *Config) (*filterSpanProces
return nil, nil
}

inc, err := createSpanMatcher(cfg.Spans.Include)
if err != nil {
return nil, err
}
exc, err := createSpanMatcher(cfg.Spans.Exclude)
inc, exc, err := createSpanMatcher(cfg)
if err != nil {
return nil, err
}
Expand All @@ -56,20 +51,10 @@ func newFilterSpansProcessor(logger *zap.Logger, cfg *Config) (*filterSpanProces
}

logger.Info(
"Span filtering configured",
"Span filter configured",
zap.String("ID", cfg.ID().String()),
zap.String("[Include] match_type", includeMatchType),
zap.Any("[Include] attributes", cfg.Spans.Include.Attributes),
zap.Any("[Include] libraries", cfg.Spans.Include.Libraries),
zap.Any("[Include] attributes", cfg.Spans.Include.Resources),
zap.Strings("[Include] services", cfg.Spans.Include.Services),
zap.Strings("[Include] span_names", cfg.Spans.Include.SpanNames),
zap.String("[Exclude] match_type", excludeMatchType),
zap.Any("[Exclude] attributes", cfg.Spans.Exclude.Attributes),
zap.Any("[Exclude] libraries", cfg.Spans.Exclude.Libraries),
zap.Any("[Exclude] resources", cfg.Spans.Exclude.Resources),
zap.Strings("[Exclude] services", cfg.Spans.Exclude.Services),
zap.Strings("[Exclude] span_names", cfg.Spans.Exclude.SpanNames),

)

return &filterSpanProcessor{
Expand All @@ -80,11 +65,23 @@ func newFilterSpansProcessor(logger *zap.Logger, cfg *Config) (*filterSpanProces
}, nil
}

func createSpanMatcher(sp *filterconfig.MatchProperties) (filterspan.Matcher, error) {
if sp == nil {
return nil, nil
func createSpanMatcher(cfg *Config) (filterspan.Matcher, filterspan.Matcher, error) {
var includeMatcher filterspan.Matcher
var excludeMatcher filterspan.Matcher
var err error
if cfg.Spans.Include != nil {
includeMatcher, err = filterspan.NewMatcher(cfg.Spans.Include)
if err != nil {
return nil, nil, err
}
}
if cfg.Spans.Exclude != nil {
excludeMatcher, err = filterspan.NewMatcher(cfg.Spans.Exclude)
if err != nil {
return nil, nil, err
}
}
return filterspan.NewMatcher(sp)
return includeMatcher, excludeMatcher, nil
}

// processTraces filters the given spans of a traces based off the filterSpanProcessor's filters.
Expand All @@ -94,37 +91,36 @@ func (fsp *filterSpanProcessor) processTraces(_ context.Context, pdt pdata.Trace
for x := 0; x < resSpan.InstrumentationLibrarySpans().Len(); x++ {
ils := resSpan.InstrumentationLibrarySpans().At(x)
ils.Spans().RemoveIf(func(span pdata.Span) bool {
return !fsp.shouldKeepSpan(span, resSpan.Resource(), ils.InstrumentationLibrary())
return fsp.shouldRemoveSpan(span, resSpan.Resource(), ils.InstrumentationLibrary())
})
}

// Remove empty elements, that way if we delete everything we can tell
// the pipeline to stop processing completely (ErrSkipProcessingData)
resSpan.InstrumentationLibrarySpans().RemoveIf(func(ilsSpans pdata.InstrumentationLibrarySpans) bool {
return ilsSpans.Spans().Len() == 0
})
pdt.ResourceSpans().RemoveIf(func(res pdata.ResourceSpans) bool {
return res.InstrumentationLibrarySpans().Len() == 0
})
}
pdt.ResourceSpans().RemoveIf(func(res pdata.ResourceSpans) bool {
return res.InstrumentationLibrarySpans().Len() == 0
})
if pdt.ResourceSpans().Len() == 0 {
return pdt, processorhelper.ErrSkipProcessingData
}
return pdt, nil
}

func (fsp *filterSpanProcessor) shouldKeepSpan(span pdata.Span, resource pdata.Resource, library pdata.InstrumentationLibrary) bool {
func (fsp *filterSpanProcessor) shouldRemoveSpan(span pdata.Span, resource pdata.Resource, library pdata.InstrumentationLibrary) bool {
if fsp.include != nil {
if i := fsp.include.MatchSpan(span, resource, library); !i {
return false
return true
}
}

if fsp.exclude != nil {
if e := fsp.exclude.MatchSpan(span, resource, library); e {
return false
return true
}
}

return true
return false
}
160 changes: 160 additions & 0 deletions processor/filterprocessor/filter_processor_traces_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package filterprocessor

import (
"context"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/model/pdata"
"testing"
)

type testTrace struct {
span_name string
library_name string
library_version string
resource_attributes map[string]pdata.AttributeValue
tags map[string]pdata.AttributeValue
}

type traceTest struct {
name string
inc *filterconfig.MatchProperties
exc *filterconfig.MatchProperties
inTraces pdata.Traces
allTracesFiltered bool
spanCount int
}

var redisTraces = []testTrace{
{
span_name: "test!",
library_name: "otel",
library_version: "11",
resource_attributes: map[string]pdata.AttributeValue{
"service.name": pdata.NewAttributeValueString("test_service"),
},
tags: map[string]pdata.AttributeValue{
"db.type": pdata.NewAttributeValueString("redis"),
},
},
}

var nameTraces = []testTrace{
{
span_name: "test!",
library_name: "otel",
library_version: "11",
resource_attributes: map[string]pdata.AttributeValue{
"service.name": pdata.NewAttributeValueString("keep"),
},
tags: map[string]pdata.AttributeValue{
"db.type": pdata.NewAttributeValueString("redis"),
},
},
{
span_name: "test!",
library_name: "otel",
library_version: "11",
resource_attributes: map[string]pdata.AttributeValue{
"service.name": pdata.NewAttributeValueString("dont_keep"),
},
tags: map[string]pdata.AttributeValue{
"db.type": pdata.NewAttributeValueString("redis"),
"service.name": pdata.NewAttributeValueString("dont_keep"),
},
},
{
span_name: "test!",
library_name: "otel",
library_version: "11",
resource_attributes: map[string]pdata.AttributeValue{
"service.name": pdata.NewAttributeValueString("keep"),
},
tags: map[string]pdata.AttributeValue{
"db.type": pdata.NewAttributeValueString("redis"),
},
},
}
var serviceNameMatchProperties = &filterconfig.MatchProperties{Config: filterset.Config{MatchType: filterset.Strict}, Services: []string{"keep"}}
var redisMatchProperties = &filterconfig.MatchProperties{Attributes: []filterconfig.Attribute{{Key: "db.type", Value: "redis"}}}
var standardTraceTests = []traceTest{
{
name: "filterRedis",
exc: redisMatchProperties,
inTraces: generateTraces(redisTraces),
allTracesFiltered: true,
},
{
name: "keepRedis",
inc: redisMatchProperties,
inTraces: generateTraces(redisTraces),
spanCount: 1,
},
{
name: "keepServiceName",
inc: serviceNameMatchProperties,
inTraces: generateTraces(nameTraces),
spanCount: 2,
},
}

func TestFilterTraceProcessor(t *testing.T) {
for _, test := range standardTraceTests {
t.Run(test.name, func(t *testing.T) {
// next stores the results of the filter metric processor
next := new(consumertest.TracesSink)
cfg := &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
Spans: SpanFilters{
Include: test.inc,
Exclude: test.exc,
},
}
factory := NewFactory()
fmp, err := factory.CreateTracesProcessor(
context.Background(),
componenttest.NewNopProcessorCreateSettings(),
cfg,
next,
)
assert.NotNil(t, fmp)
assert.Nil(t, err)

caps := fmp.Capabilities()
assert.True(t, caps.MutatesData)
ctx := context.Background()
assert.NoError(t, fmp.Start(ctx, nil))

cErr := fmp.ConsumeTraces(context.Background(), test.inTraces)
assert.Nil(t, cErr)
got := next.AllTraces()

if test.allTracesFiltered {
require.Equal(t, 0, len(got))
} else {
require.Equal(t, test.spanCount, got[0].SpanCount())
}
assert.NoError(t, fmp.Shutdown(ctx))
})
}
}
func generateTraces(tests []testTrace) pdata.Traces {
md := pdata.NewTraces()

for _, test := range tests {
rs := md.ResourceSpans().AppendEmpty()
ils := rs.InstrumentationLibrarySpans().AppendEmpty()
ils.InstrumentationLibrary().SetName(test.library_name)
ils.InstrumentationLibrary().SetVersion(test.library_version)
rs.Resource().Attributes().InitFromMap(test.resource_attributes)
span := ils.Spans().AppendEmpty()
span.Attributes().InitFromMap(test.tags)
span.SetName(test.span_name)
}
return md
}

0 comments on commit 35595c8

Please sign in to comment.