Add ability to sample logs
atoulme committed Apr 8, 2022
1 parent ca63585 commit e2d5dee
Showing 10 changed files with 433 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@
- `prometheusremotewriteexporter`: Translate resource attributes to the target info metric (#8493)
- `podmanreceiver`: Add API timeout configuration option (#9014)
- `cmd/mdatagen`: Add `sem_conv_version` field to metadata.yaml that is used to set metrics SchemaURL (#9010)
- `probabilistic_sampler`: Add ability to sample logs (#9118)

### 🛑 Breaking changes 🛑

36 changes: 34 additions & 2 deletions processor/probabilisticsamplerprocessor/README.md
@@ -1,8 +1,39 @@
# Probabilistic Sampling Processor

Supported pipeline types: traces
Supported pipeline types: traces, logs

The probabilistic sampler supports two types of sampling:
The probabilistic sampler supports sampling logs by associating a sampling rate with a log severity.

A default sampling rate is mandatory. Additional sampling rates can then be associated with specific log severities.
Any log record with a severity equal to or higher than a configured severity adopts that severity's sampling rate.

The probabilistic sampler may optionally use a `hash_seed` to compute the hash of a log record.
This sampler bases its decision on hash values computed from log records. For
log record hashing to work, all collectors in a given tier (e.g. behind the same load balancer)
must have the same `hash_seed`. It is also possible to use a different `hash_seed` at
different collector tiers to support additional sampling requirements. Please refer to
[config.go](./config.go) for the config spec.
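
It can help to see this as two separate collector configurations; the sketch below is illustrative (the seeds and percentages are made up, not defaults):

```yaml
# Tier 1 collectors (e.g. agents behind one load balancer) share a seed,
# so every agent makes the same pass/drop decision for a given record.
processors:
  probabilistic_sampler:
    hash_seed: 22
    sampling_percentage: 50
---
# Tier 2 (a downstream gateway) uses a different seed, so its 10% is drawn
# independently of tier 1's 50% rather than re-selecting the same records.
processors:
  probabilistic_sampler:
    hash_seed: 174
    sampling_percentage: 10
```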

The following configuration options can be modified:
- `hash_seed` (no default): An integer used to seed the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same `hash_seed`.
- `sampling_percentage` (default = 0): Percentage at which logs are sampled; >= 100 samples all logs
- `severity/severity_level`: `SeverityText` associated with a [severity level](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#displaying-severity)
- `severity/sampling_percentage` (default = 0): Percentage at which logs are sampled when their severity is equal to or higher than `severity_level`; >= 100 samples all such logs

Examples:

```yaml
processors:
  probabilistic_sampler:
    hash_seed: 22
    sampling_percentage: 15
    severity:
      - severity_level: error
        sampling_percentage: 100
      - severity_level: warn
        sampling_percentage: 75
```

The probabilistic sampler supports two types of sampling for traces:
1. `sampling.priority` [semantic
convention](https://github.com/opentracing/specification/blob/master/semantic_conventions.md#span-tags-table)
@@ -31,3 +62,4 @@ processors:

Refer to [config.yaml](./testdata/config.yaml) for detailed
examples on using the processor.

56 changes: 53 additions & 3 deletions processor/probabilisticsamplerprocessor/config.go
@@ -15,26 +15,76 @@
package probabilisticsamplerprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor"

import (
	"fmt"

	"go.opentelemetry.io/collector/config"
	"go.opentelemetry.io/collector/model/pdata"
)

// Config has the configuration guiding the trace sampler processor.
var severityTextToNum = map[string]pdata.SeverityNumber{
	"default": pdata.SeverityNumberUNDEFINED,
	"trace":   pdata.SeverityNumberTRACE,
	"trace2":  pdata.SeverityNumberTRACE2,
	"trace3":  pdata.SeverityNumberTRACE3,
	"trace4":  pdata.SeverityNumberTRACE4,
	"debug":   pdata.SeverityNumberDEBUG,
	"debug2":  pdata.SeverityNumberDEBUG2,
	"debug3":  pdata.SeverityNumberDEBUG3,
	"debug4":  pdata.SeverityNumberDEBUG4,
	"info":    pdata.SeverityNumberINFO,
	"info2":   pdata.SeverityNumberINFO2,
	"info3":   pdata.SeverityNumberINFO3,
	"info4":   pdata.SeverityNumberINFO4,
	"warn":    pdata.SeverityNumberWARN,
	"warn2":   pdata.SeverityNumberWARN2,
	"warn3":   pdata.SeverityNumberWARN3,
	"warn4":   pdata.SeverityNumberWARN4,
	"error":   pdata.SeverityNumberERROR,
	"error2":  pdata.SeverityNumberERROR2,
	"error3":  pdata.SeverityNumberERROR3,
	"error4":  pdata.SeverityNumberERROR4,
	"fatal":   pdata.SeverityNumberFATAL,
	"fatal2":  pdata.SeverityNumberFATAL2,
	"fatal3":  pdata.SeverityNumberFATAL3,
	"fatal4":  pdata.SeverityNumberFATAL4,
}

type severityPair struct {
	Level              string  `mapstructure:"severity_level"`
	SamplingPercentage float32 `mapstructure:"sampling_percentage"`
}

// Config has the configuration guiding the sampler processor.
type Config struct {
	config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

	// SamplingPercentage is the percentage rate at which traces are going to be sampled. Defaults to zero, i.e.: no sample.
	// Values greater or equal 100 are treated as "sample all traces".
	// SamplingPercentage is the percentage rate at which traces or logs are going to be sampled. Defaults to zero, i.e.: no sample.
	// Values greater or equal 100 are treated as "sample all traces/logs".
	SamplingPercentage float32 `mapstructure:"sampling_percentage"`

	// HashSeed allows one to configure the hashing seed. This is important in scenarios where multiple layers of collectors
	// have different sampling rates: if they use the same seed all passing one layer may pass the other even if they have
	// different sampling rates, configuring different seeds avoids that.
	HashSeed uint32 `mapstructure:"hash_seed"`

	// Severity is an array of severity and sampling percentage pairs allocating a specific sampling percentage
	// to a given severity level.
	Severity []severityPair `mapstructure:"severity"`
}

var _ config.Processor = (*Config)(nil)

// Validate checks if the processor configuration is valid
func (cfg *Config) Validate() error {
	keys := map[string]bool{}
	for _, pair := range cfg.Severity {
		if _, ok := severityTextToNum[pair.Level]; !ok {
			return fmt.Errorf("unrecognized severity level: %s", pair.Level)
		}
		if keys[pair.Level] {
			return fmt.Errorf("severity already used: %s", pair.Level)
		}
		keys[pair.Level] = true
	}
	return nil
}
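
As a rough in-package sketch (hypothetical, not part of this commit), both of `Validate`'s error paths can be exercised like this:

```go
package probabilisticsamplerprocessor

import "fmt"

// Hypothetical in-package example exercising both Validate error paths.
func ExampleConfig_Validate() {
	dup := &Config{Severity: []severityPair{
		{Level: "error", SamplingPercentage: 100},
		{Level: "error", SamplingPercentage: 50}, // same level twice
	}}
	fmt.Println(dup.Validate())

	unknown := &Config{Severity: []severityPair{
		{Level: "severe", SamplingPercentage: 10}, // not a key in severityTextToNum
	}}
	fmt.Println(unknown.Validate())
	// Output:
	// severity already used: error
	// unrecognized severity level: severe
}
```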
23 changes: 22 additions & 1 deletion processor/probabilisticsamplerprocessor/config_test.go
@@ -42,7 +42,17 @@ func TestLoadConfig(t *testing.T) {
		SamplingPercentage: 15.3,
		HashSeed:           22,
	})

	p1 := cfg.Processors[config.NewComponentIDWithName(typeStr, "logs")]
	assert.Equal(t,
		&Config{
			ProcessorSettings:  config.NewProcessorSettings(config.NewComponentIDWithName(typeStr, "logs")),
			SamplingPercentage: 15.3,
			HashSeed:           22,
			Severity: []severityPair{
				{Level: "error", SamplingPercentage: 100},
				{Level: "warn", SamplingPercentage: 80},
			},
		}, p1)
}

func TestLoadConfigEmpty(t *testing.T) {
@@ -59,3 +69,14 @@ func TestLoadConfigEmpty(t *testing.T) {
	p0 := cfg.Processors[config.NewComponentID(typeStr)]
	assert.Equal(t, p0, createDefaultConfig())
}

func TestLoadInvalidConfig(t *testing.T) {
	factories, err := componenttest.NopFactories()
	require.NoError(t, err)

	factory := NewFactory()
	factories.Processors[typeStr] = factory

	_, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid.yaml"), factories)
	require.ErrorContains(t, err, "severity already used: error")
}
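
The `testdata/invalid.yaml` fixture is among the files not expanded on this page; given the asserted message, it presumably repeats a severity level, along these lines (a guess at the shape, not the committed file):

```yaml
processors:
  probabilistic_sampler:
    sampling_percentage: 15.3
    severity:
      - severity_level: error
        sampling_percentage: 100
      - severity_level: error # duplicate level; Validate rejects this
        sampling_percentage: 50
```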
13 changes: 12 additions & 1 deletion processor/probabilisticsamplerprocessor/factory.go
@@ -32,7 +32,8 @@ func NewFactory() component.ProcessorFactory {
	return component.NewProcessorFactory(
		typeStr,
		createDefaultConfig,
		component.WithTracesProcessor(createTracesProcessor),
		component.WithLogsProcessor(createLogsProcessor))
}

func createDefaultConfig() config.Processor {
@@ -50,3 +51,13 @@ func createTracesProcessor(
) (component.TracesProcessor, error) {
	return newTracesProcessor(nextConsumer, cfg.(*Config))
}

// createLogsProcessor creates a log processor based on this config.
func createLogsProcessor(
	_ context.Context,
	_ component.ProcessorCreateSettings,
	cfg config.Processor,
	nextConsumer consumer.Logs,
) (component.LogsProcessor, error) {
	return newLogsProcessor(nextConsumer, cfg.(*Config))
}
8 changes: 8 additions & 0 deletions processor/probabilisticsamplerprocessor/factory_test.go
@@ -37,3 +37,11 @@ func TestCreateProcessor(t *testing.T) {
	assert.NotNil(t, tp)
	assert.NoError(t, err, "cannot create trace processor")
}

func TestCreateProcessorLogs(t *testing.T) {
	cfg := createDefaultConfig()
	set := componenttest.NewNopProcessorCreateSettings()
	tp, err := createLogsProcessor(context.Background(), set, cfg, consumertest.NewNop())
	assert.NotNil(t, tp)
	assert.NoError(t, err, "cannot create logs processor")
}
95 changes: 95 additions & 0 deletions processor/probabilisticsamplerprocessor/logprobabilisticsampler.go
@@ -0,0 +1,95 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package probabilisticsamplerprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor"

import (
	"context"
	"sort"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/model/pdata"
	"go.opentelemetry.io/collector/processor/processorhelper"
)

type severitySamplingRate struct {
	level              pdata.SeverityNumber
	scaledSamplingRate uint32
}

type logsamplerprocessor struct {
	samplingRates []*severitySamplingRate
	hashSeed      uint32
}

// newLogsProcessor returns a processor.LogsProcessor that will perform head sampling according to the given
// configuration.
func newLogsProcessor(nextConsumer consumer.Logs, cfg *Config) (component.LogsProcessor, error) {

	severitySamplingRates := []*severitySamplingRate{
		{level: pdata.SeverityNumberUNDEFINED, scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor)},
	}
	sort.SliceStable(cfg.Severity, func(i, j int) bool {
		return severityTextToNum[cfg.Severity[i].Level] < severityTextToNum[cfg.Severity[j].Level]
	})
	for _, pair := range cfg.Severity {
		newRate := &severitySamplingRate{
			level:              severityTextToNum[pair.Level],
			scaledSamplingRate: uint32(pair.SamplingPercentage * percentageScaleFactor),
		}
		severitySamplingRates = append(severitySamplingRates, newRate)
	}

	lsp := &logsamplerprocessor{
		samplingRates: severitySamplingRates,
		hashSeed:      cfg.HashSeed,
	}

	return processorhelper.NewLogsProcessor(
		cfg,
		nextConsumer,
		lsp.processLogs,
		processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}))
}

func (lsp *logsamplerprocessor) processLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
	ld.ResourceLogs().RemoveIf(func(rl pdata.ResourceLogs) bool {
		rl.ScopeLogs().RemoveIf(func(ill pdata.ScopeLogs) bool {
			ill.LogRecords().RemoveIf(func(l pdata.LogRecord) bool {

				// find the correct severity sampling level.
				var selectedSamplingRate *severitySamplingRate
				for _, ssr := range lsp.samplingRates {
					if ssr.level > l.SeverityNumber() {
						break
					}
					selectedSamplingRate = ssr
				}

				// Create an id for the log record by combining the timestamp and severity text.
				lidBytes := []byte(l.Timestamp().String() + l.SeverityText())
				sampled := hash(lidBytes, lsp.hashSeed)&bitMaskHashBuckets < selectedSamplingRate.scaledSamplingRate
				return !sampled
			})
			// Filter out empty ScopeLogs
			return ill.LogRecords().Len() == 0
		})
		// Filter out empty ResourceLogs
		return rl.ScopeLogs().Len() == 0
	})
	if ld.ResourceLogs().Len() == 0 {
		return ld, processorhelper.ErrSkipProcessingData
	}
	return ld, nil
}
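
The bucket arithmetic reuses `percentageScaleFactor` and `bitMaskHashBuckets` from the existing trace sampler in this package, which scales a percentage into `0x4000` hash buckets. Below is a standalone sketch of that decision (an approximation: FNV-32a stands in for the package's own seeded `hash` function):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Constants mirroring the trace sampler's bucket scheme in this package.
const (
	numHashBuckets        = 0x4000 // a power of two, so masking replaces modulo
	bitMaskHashBuckets    = numHashBuckets - 1
	percentageScaleFactor = numHashBuckets / 100.0
)

// sampled reports whether a record id falls inside the sampled bucket range.
// FNV-32a is a stand-in here; the processor uses its own seeded hash.
func sampled(id []byte, percentage float32) bool {
	h := fnv.New32a()
	h.Write(id)
	return h.Sum32()&bitMaskHashBuckets < uint32(percentage*percentageScaleFactor)
}

func main() {
	// The log sampler keys on timestamp + severity text, as in processLogs above.
	id := []byte("2022-04-08 00:00:00 +0000 UTC" + "ERROR")
	fmt.Println(sampled(id, 15))  // true for roughly 15% of distinct ids
	fmt.Println(sampled(id, 100)) // always true: 100 scales to the full bucket range
}
```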