Introduce cascading filter processor #359

Merged
merged 3 commits on Dec 4, 2020
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go
@@ -42,6 +42,7 @@ import (
	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarder"
	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/hostobserver"
	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cascadingfilterprocessor"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor"
@@ -153,6 +154,7 @@ func components() (component.Factories, error) {
	}

	processors := []component.ProcessorFactory{
		cascadingfilterprocessor.NewFactory(),
		sourceprocessor.NewFactory(),
		groupbytraceprocessor.NewFactory(),
		k8sprocessor.NewFactory(),
5 changes: 5 additions & 0 deletions go.mod
@@ -30,6 +30,7 @@ require (
	github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarder v0.0.0-00010101000000-000000000000
	github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/hostobserver v0.0.0-00010101000000-000000000000
	github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver v0.0.0-00010101000000-000000000000
	github.com/open-telemetry/opentelemetry-collector-contrib/processor/cascadingfilterprocessor v0.0.0-00010101000000-000000000000
	github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor v0.0.0-00010101000000-000000000000
	github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor v0.0.0-00010101000000-000000000000
	github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor v0.0.0-00010101000000-000000000000
@@ -67,6 +68,8 @@ require (
	honnef.co/go/tools v0.0.1-2020.1.6
)

replace go.opentelemetry.io/collector => github.com/SumoLogic/opentelemetry-collector v0.16.0-sumo

// Replace references to modules that are in this repository with their relative paths
// so that we always build with the current (latest) version of the source code.

@@ -172,6 +175,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stanz

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/memcachedreceiver => ./receiver/memcachedreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/cascadingfilterprocessor => ./processor/cascadingfilterprocessor

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor => ./processor/groupbytraceprocessor

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor => ./processor/k8sprocessor/
1 change: 1 addition & 0 deletions processor/cascadingfilterprocessor/Makefile
@@ -0,0 +1 @@
include ../../Makefile.Common
111 changes: 111 additions & 0 deletions processor/cascadingfilterprocessor/README.md
@@ -0,0 +1,111 @@
# Cascading Filter Processor

Supported pipeline types: traces

The Cascading Filter processor is a [fork of tailsamplingprocessor](../tailsamplingprocessor) which
allows defining smart cascading filtering rules with preset limits.

## Processor configuration

The following configuration options should be set as desired:
- `policies` (no default): Policies used to make a filtering decision
- `spans_per_second` (default = 1500): Maximum total number of emitted spans per second
- `probabilistic_filtering_ratio` (default = 0.2): Ratio of the spans-per-second budget that is always filled by
probabilistic filtering (such spans can hence be used for metrics calculation); with the defaults, this reserves
`0.2 * 1500 = 300` spans per second

The following configuration options can also be modified:
- `decision_wait` (default = 30s): Wait time since the first span of a trace before making a filtering decision
- `num_traces` (default = 50000): Number of traces kept in memory
- `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures)
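
For instance, a minimal configuration can rely on all of the defaults above and only define the mandatory `policies` list; the policy name below is illustrative:

```yaml
processors:
  cascading_filter:
    # decision_wait, num_traces, spans_per_second and
    # probabilistic_filtering_ratio fall back to their defaults
    policies:
      [
        {
          name: everything-within-budget,
          spans_per_second: -1
        },
      ]
```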

## Policy configuration

Each defined policy is evaluated in the order specified in the config. Each policy has several properties:
- `name` (required): identifies the policy
- `spans_per_second` (default = 0): defines the maximum number of spans per second that this policy may select. When set to `-1`,
the policy selects traces only as long as the global limit is not exhausted by other policies (but applies no further limitations of its own)

Additionally, each policy may define any of the following filtering criteria. They are evaluated against
each of the trace's spans. If at least one span matching all defined criteria is found, the trace is selected
(a combined example follows this list):
- `numeric_attribute: {key: <name>, min_value: <min_value>, max_value: <max_value>}`: selects the span by matching a numeric
attribute (at either resource or span level)
- `string_attribute: {key: <name>, values: [<value1>, <value2>]}`: selects the span by matching a string attribute against one
of the provided values (at either resource or span level)
- `properties: {min_number_of_spans: <number>}`: selects the trace if it has at least the provided number of spans
- `properties: {min_duration_micros: <duration>}`: selects the span if its duration is greater than or equal to the given value
- `properties: {name_pattern: <regex>}`: selects the span if its operation name matches the provided regular expression
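
For instance, a single policy combining several of these criteria could look as follows (the policy name, attribute key and values are illustrative):

```yaml
policies:
  [
    {
      name: slow-frontend-traces,
      spans_per_second: 200,
      string_attribute: {key: service.name, values: [frontend]},
      properties: {min_duration_micros: 500000}
    },
  ]
```

With this policy, a trace is selected (up to 200 spans per second) when at least one of its spans both carries `service.name: frontend` and lasts at least half a second.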

## Limiting the number of spans

There are two `spans_per_second` settings: the global one and the per-policy one.

While evaluating traces, the limit is checked first at the policy level and then at the global level. The sum
of all per-policy `spans_per_second` rates may well be higher than the global limit, but the latter is never
exceeded (so some of the traces will not be included).

For example, suppose there are 3 policies: `A, B, C`. Each of them has a limit of `300` spans per second and the global limit
is `500` spans per second. Now, let's say that for each of the policies there were 3 distinct traces, each
having `100` spans and matching the policy criteria (let's call them `A1, A2, ... B1, B2...` and so forth):

`Policy A`: `A1, A2, A3`
`Policy B`: `B1, B2, B3`
`Policy C`: `C1, C2, C3`

Each policy stays within its own limit, but in total this is `900` spans, which is more than the global limit of `500` spans/second. The
processor takes care of that and randomly selects traces only up to the global span limit. So eventually it might,
for example, pass on only the traces `A1, A2, B1, C2, C3` and filter out the others.
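
A minimal Go sketch of this two-level accounting, with hypothetical names and types (the actual implementation lives in this processor's `sampling` package and additionally randomizes which traces are dropped when the global budget runs out):

```go
package main

import "fmt"

// rateLimiter tracks how many spans have been admitted in the
// current one-second window.
type rateLimiter struct {
	limit int64 // maximum spans per second; -1 means "no per-policy cap"
	used  int64 // spans admitted so far in this window
}

// tryAdmit reserves spanCount spans if the limit allows it.
func (r *rateLimiter) tryAdmit(spanCount int64) bool {
	if r.limit >= 0 && r.used+spanCount > r.limit {
		return false
	}
	r.used += spanCount
	return true
}

// admitTrace applies the per-policy limit first, then the global one.
// Both must agree before the trace's spans are passed through.
func admitTrace(policy, global *rateLimiter, spanCount int64) bool {
	if !policy.tryAdmit(spanCount) {
		return false
	}
	if !global.tryAdmit(spanCount) {
		policy.used -= spanCount // roll back the per-policy reservation
		return false
	}
	return true
}

func main() {
	global := &rateLimiter{limit: 500}
	policyA := &rateLimiter{limit: 300}

	fmt.Println(admitTrace(policyA, global, 100)) // true: 100/300 used, 100/500 used
	fmt.Println(admitTrace(policyA, global, 250)) // false: would exceed policy A's cap
	fmt.Println(admitTrace(policyA, global, 200)) // true: 300/300 used, 300/500 used
}
```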

## Example

```yaml
processors:
  cascading_filter:
    decision_wait: 10s
    num_traces: 100
    expected_new_traces_per_sec: 10
    spans_per_second: 1000
    probabilistic_filtering_ratio: 0.1
    policies:
      [
        {
          name: test-policy-1,
        },
        {
          name: test-policy-2,
          numeric_attribute: {key: key1, min_value: 50, max_value: 100}
        },
        {
          name: test-policy-3,
          string_attribute: {key: key2, values: [value1, value2]}
        },
        {
          name: test-policy-4,
          spans_per_second: 35,
        },
        {
          name: test-policy-5,
          spans_per_second: 123,
          numeric_attribute: {key: key1, min_value: 50, max_value: 100}
        },
        {
          name: test-policy-6,
          spans_per_second: 50,
          properties: {min_duration_micros: 9000000}
        },
        {
          name: test-policy-7,
          properties: {
            name_pattern: "foo.*",
            min_number_of_spans: 10,
            min_duration_micros: 9000000
          }
        },
        {
          name: everything_else,
          spans_per_second: -1
        },
      ]
```

Refer to [cascading_filter_config.yaml](./testdata/cascading_filter_config.yaml) for detailed
examples on using the processor.
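
Note that, as with any processor, `cascading_filter` also has to be referenced in a traces pipeline to take effect; a sketch of the relevant service section, with placeholder receiver and exporter names:

```yaml
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [cascading_filter]
      exporters: [logging]
```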
159 changes: 159 additions & 0 deletions processor/cascadingfilterprocessor/cascading_test.go
@@ -0,0 +1,159 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cascadingfilterprocessor

import (
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/config/configmodels"
	"go.opentelemetry.io/collector/consumer/pdata"
	"go.uber.org/zap"

	tsconfig "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cascadingfilterprocessor/config"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cascadingfilterprocessor/sampling"
)

// Shared test configuration: a "duration" policy capped at 10 spans/second
// plus a catch-all policy that consumes whatever global budget remains.
var testValue = int64(10000)
var cfg = tsconfig.Config{
	ProcessorSettings:       configmodels.ProcessorSettings{},
	DecisionWait:            2 * time.Second,
	NumTraces:               100,
	ExpectedNewTracesPerSec: 100,
	SpansPerSecond:          1000,
	PolicyCfgs: []tsconfig.PolicyCfg{
		{
			Name:           "duration",
			SpansPerSecond: 10,
			PropertiesCfg: tsconfig.PropertiesCfg{
				MinDurationMicros: &testValue,
			},
		},
		{
			Name:           "everything else",
			SpansPerSecond: -1,
		},
	},
}

// fillSpan sets the span timestamps so that the span lasts durationMicros
// microseconds, ending now, and adds a numeric attribute.
func fillSpan(span *pdata.Span, durationMicros int64) {
	nowTs := time.Now().UnixNano()
	startTime := nowTs - durationMicros*1000

	span.Attributes().InsertInt("foo", 55)
	span.SetStartTime(pdata.TimestampUnixNano(startTime))
	span.SetEndTime(pdata.TimestampUnixNano(nowTs))
}

// createTrace builds a single-resource trace with numSpans spans of the
// given duration, wrapped in the TraceData structure used by the processor.
func createTrace(fsp *cascadingFilterSpanProcessor, numSpans int, durationMicros int64) *sampling.TraceData {
	var traceBatches []pdata.Traces

	traces := pdata.NewTraces()
	traces.ResourceSpans().Resize(1)
	rs := traces.ResourceSpans().At(0)
	rs.InstrumentationLibrarySpans().Resize(1)
	ils := rs.InstrumentationLibrarySpans().At(0)

	ils.Spans().Resize(numSpans)

	for i := 0; i < numSpans; i++ {
		span := ils.Spans().At(i)
		fillSpan(&span, durationMicros)
	}

	traceBatches = append(traceBatches, traces)

	return &sampling.TraceData{
		Mutex:           sync.Mutex{},
		Decisions:       make([]sampling.Decision, len(fsp.policies)),
		ArrivalTime:     time.Time{},
		DecisionTime:    time.Time{},
		SpanCount:       int64(numSpans),
		ReceivedBatches: traceBatches,
	}
}

func createCascadingEvaluator(t *testing.T) *cascadingFilterSpanProcessor {
	cascading, err := newCascadingFilterSpanProcessor(zap.NewNop(), nil, cfg)
	assert.NoError(t, err)
	return cascading
}

var (
	metrics = &policyMetrics{}
)

func TestSampling(t *testing.T) {
	cascading := createCascadingEvaluator(t)

	// Long spans (1s each) match the "duration" policy and fit its rate limit.
	decision, _ := cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{0}), createTrace(cascading, 8, 1000000), metrics)
	require.Equal(t, sampling.Sampled, decision)

	// Short spans only qualify for the catch-all policy's second chance.
	decision, _ = cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{1}), createTrace(cascading, 1000, 1000), metrics)
	require.Equal(t, sampling.SecondChance, decision)
}

func TestSecondChanceEvaluation(t *testing.T) {
	cascading := createCascadingEvaluator(t)

	decision, _ := cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{0}), createTrace(cascading, 8, 1000), metrics)
	require.Equal(t, sampling.SecondChance, decision)

	decision, _ = cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{1}), createTrace(cascading, 8, 1000), metrics)
	require.Equal(t, sampling.SecondChance, decision)

	// TODO: This could be optimized to make a decision within the cascadingfilter processor, as such a span would never fit anyway
	//decision, _ = cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{1}), createTrace(cascading, 8000, 1000), metrics)
	//require.Equal(t, sampling.NotSampled, decision)
}

func TestProbabilisticFilter(t *testing.T) {
	ratio := float32(0.5)
	cfg.ProbabilisticFilteringRatio = &ratio
	cascading := createCascadingEvaluator(t)

	trace1 := createTrace(cascading, 8, 1000000)
	decision, _ := cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{0}), trace1, metrics)
	require.Equal(t, sampling.Sampled, decision)
	require.True(t, trace1.SelectedByProbabilisticFilter)

	trace2 := createTrace(cascading, 800, 1000000)
	decision, _ = cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{1}), trace2, metrics)
	require.Equal(t, sampling.SecondChance, decision)
	require.False(t, trace2.SelectedByProbabilisticFilter)

	// Reset the shared config so other tests are unaffected.
	ratio = float32(0.0)
	cfg.ProbabilisticFilteringRatio = &ratio
}

//func TestSecondChanceReevaluation(t *testing.T) {
//	cascading := createCascadingEvaluator(t)
//
//	decision, _ := cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{1}), createTrace(cascading, 100, 1000), metrics)
//	require.Equal(t, sampling.Sampled, decision)
//
//	// Too much
//	decision, _ = cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{1}), createTrace(cascading, 1000, 1000), metrics)
//	require.Equal(t, sampling.NotSampled, decision)
//
//	// Just right
//	decision, _ = cascading.makeProvisionalDecision(pdata.NewTraceID([16]byte{1}), createTrace(cascading, 900, 1000), metrics)
//	require.Equal(t, sampling.Sampled, decision)
//}