# Add ability to sample logs #9118

Closed · wants to merge 8 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@
- `podmanreceiver`: Add API timeout configuration option (#9014)
- `cmd/mdatagen`: Add `sem_conv_version` field to metadata.yaml that is used to set metrics SchemaURL (#9010)
- `splunkheceporter`: Add an option to disable log or profiling data (#9065)
- `probabilistic_sampler`: Add ability to sample logs (#9118)

### 🛑 Breaking changes 🛑

50 changes: 48 additions & 2 deletions processor/probabilisticsamplerprocessor/README.md
@@ -1,8 +1,8 @@
# Probabilistic Sampling Processor

-Supported pipeline types: traces
+Supported pipeline types: traces, logs

-The probabilistic sampler supports two types of sampling:
+The probabilistic sampler supports two types of sampling for traces:

1. `sampling.priority` [semantic
convention](https://github.com/opentracing/specification/blob/master/semantic_conventions.md#span-tags-table)
@@ -29,5 +29,51 @@ processors:
sampling_percentage: 15.3
```

The probabilistic sampler supports sampling logs according to their trace ID, or by a specific log record attribute.
> **Review (Member):** So, not by resource attributes?
>
> **Reply (Contributor, Author):** No?


The probabilistic sampler may optionally use a `hash_seed` to compute the hash of a log record.
This sampler makes its decision based on hash values computed from log records. For
log record hashing to work, all collectors in a given tier (e.g. behind the same load balancer)
must share the same `hash_seed`. It is also possible to use a different `hash_seed` at
different collector tiers to support additional sampling requirements. Refer to
[config.go](./config.go) for the config spec.
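Conceptually, the seeded hash-bucket decision can be sketched in Go. This is an illustration only: FNV-1a stands in for the processor's internal hash, and the bucket constants (0x4000 buckets) are assumed from the trace sampler, so buckets computed here will not match the collector's.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

const (
	numHashBuckets        = 0x4000 // 16384 buckets, assumed from the trace sampler
	bitMaskHashBuckets    = numHashBuckets - 1
	percentageScaleFactor = numHashBuckets / 100.0
)

// bucketOf maps an ID (trace ID bytes or a sampling_source attribute value)
// plus the seed to a bucket in [0, numHashBuckets). FNV-1a is a stand-in for
// the processor's internal hash function.
func bucketOf(id []byte, seed uint32) uint32 {
	h := fnv.New32a()
	var s [4]byte
	binary.LittleEndian.PutUint32(s[:], seed)
	h.Write(s[:])
	h.Write(id)
	return h.Sum32() & bitMaskHashBuckets
}

// sampled reports whether a record with this ID passes at the given percentage.
func sampled(id []byte, seed uint32, pct float32) bool {
	return bucketOf(id, seed) < uint32(pct*percentageScaleFactor)
}

func main() {
	// The same ID and seed always land in the same bucket, which is what
	// makes the decision consistent across collectors sharing a hash_seed.
	for _, id := range []string{"log-a", "log-b", "log-c"} {
		fmt.Printf("%s -> bucket %d, sampled at 50%%: %v\n",
			id, bucketOf([]byte(id), 22), sampled([]byte(id), 22, 50))
	}
}
```

Because the bucket depends only on the ID and the seed, a record dropped at one collector in the tier would also be dropped at every other collector with the same seed.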

The following configuration options can be modified:
- `hash_seed` (no default, optional): An integer used to seed the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same `hash_seed`.
- `sampling_percentage` (default = 0, required): Percentage at which logs are sampled; values >= 100 sample all logs.
> **Review (Member):** What does 0 mean here? Nothing is going to be sampled? If it's required, what does it mean to have a default?

- `trace_id_sampling` (default = true, optional): Whether to use the log record trace ID to sample the log record.
> **Review (Member):** What happens for entries without a trace ID?

- `sampling_source` (default = null, optional): The name of a log record attribute used for sampling purposes, such as a unique log record ID. The attribute's value is used only if the trace ID is absent or if `trace_id_sampling` is set to `false`.
- `sampling_priority` (default = null, optional): The name of a log record attribute used to set a sampling priority that overrides the `sampling_percentage` setting. A value of 0 means never sample the log record, and a value >= 100 means always sample it.

Examples:

Sample 15% of the logs:
```yaml
processors:
probabilistic_sampler:
sampling_percentage: 15
```

Sample logs according to their `logID` attribute:

```yaml
processors:
probabilistic_sampler:
sampling_percentage: 15
trace_id_sampling: false
sampling_source: logID
```

Sample logs according to the attribute `priority`:

```yaml
processors:
probabilistic_sampler:
sampling_percentage: 15
sampling_priority: priority
```
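Because `sampling_source` only takes effect when a trace ID is absent or trace ID sampling is disabled, the two can be combined so that records without a trace ID fall back to an attribute. A sketch, using a hypothetical `logID` attribute:

```yaml
processors:
  probabilistic_sampler:
    sampling_percentage: 15
    sampling_source: logID # used only for records without a trace ID
```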


Refer to [config.yaml](./testdata/config.yaml) for detailed
examples on using the processor.

21 changes: 18 additions & 3 deletions processor/probabilisticsamplerprocessor/config.go
@@ -15,26 +15,41 @@
package probabilisticsamplerprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor"

import (
"fmt"

"go.opentelemetry.io/collector/config"
)

-// Config has the configuration guiding the trace sampler processor.
+// Config has the configuration guiding the sampler processor.
type Config struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

-// SamplingPercentage is the percentage rate at which traces are going to be sampled. Defaults to zero, i.e.: no sample.
-// Values greater or equal 100 are treated as "sample all traces".
+// SamplingPercentage is the percentage rate at which traces or logs are going to be sampled. Defaults to zero, i.e.: no sample.
+// Values greater or equal 100 are treated as "sample all traces/logs".
SamplingPercentage float32 `mapstructure:"sampling_percentage"`

// HashSeed allows one to configure the hashing seed. This is important in scenarios where multiple layers of collectors
// have different sampling rates: if they use the same seed all passing one layer may pass the other even if they have
// different sampling rates, configuring different seeds avoids that.
HashSeed uint32 `mapstructure:"hash_seed"`

// TraceIDEnabled (logs only) controls whether to sample by trace ID or by a specific log record attribute.
// By default, this option is true.
TraceIDEnabled *bool `mapstructure:"trace_id_sampling"`
// SamplingSource (logs only) names a log record attribute, designated by the `sampling_source` key,
// whose value is used to compute the sampling hash of the log record instead of the trace ID,
// when the trace ID is absent or trace ID sampling is disabled.
SamplingSource string `mapstructure:"sampling_source"`
// SamplingPriority (logs only) names a log record attribute, designated by the `sampling_priority` key,
// whose value is used as the sampling priority of the log record.
SamplingPriority string `mapstructure:"sampling_priority"`
}

var _ config.Processor = (*Config)(nil)

// Validate checks if the processor configuration is valid
func (cfg *Config) Validate() error {
if cfg.SamplingPercentage < 0 {
> **Review (Member):** Are there conflicting options? For instance, what does it mean to have a sampling source when using this processor in a traces pipeline?

return fmt.Errorf("negative sampling rate: %.2f", cfg.SamplingPercentage)
}
return nil
}
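The guard above can be exercised standalone. A minimal sketch with a stand-in struct (the real `Config` embeds collector `ProcessorSettings`), showing the `-15.30` message format that config_test.go asserts:

```go
package main

import "fmt"

// config stands in for the processor's Config struct; only the
// field relevant to validation is reproduced here.
type config struct {
	SamplingPercentage float32
}

// validate mirrors the Validate guard above: only negative
// percentages are rejected.
func (c *config) validate() error {
	if c.SamplingPercentage < 0 {
		return fmt.Errorf("negative sampling rate: %.2f", c.SamplingPercentage)
	}
	return nil
}

func main() {
	fmt.Println((&config{SamplingPercentage: -15.3}).validate()) // negative sampling rate: -15.30
	fmt.Println((&config{SamplingPercentage: 15.3}).validate())  // <nil>
}
```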
19 changes: 18 additions & 1 deletion processor/probabilisticsamplerprocessor/config_test.go
@@ -42,7 +42,13 @@ func TestLoadConfig(t *testing.T) {
SamplingPercentage: 15.3,
HashSeed: 22,
})

p1 := cfg.Processors[config.NewComponentIDWithName(typeStr, "logs")]
assert.Equal(t,
&Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentIDWithName(typeStr, "logs")),
SamplingPercentage: 15.3,
HashSeed: 22,
}, p1)
}

@@ -59,3 +65,14 @@ func TestLoadConfigEmpty(t *testing.T) {
p0 := cfg.Processors[config.NewComponentID(typeStr)]
assert.Equal(t, p0, createDefaultConfig())
}

func TestLoadInvalidConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
require.NoError(t, err)

factory := NewFactory()
factories.Processors[typeStr] = factory

_, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid.yaml"), factories)
require.ErrorContains(t, err, "negative sampling rate: -15.30")
}
13 changes: 12 additions & 1 deletion processor/probabilisticsamplerprocessor/factory.go
@@ -32,7 +32,8 @@
return component.NewProcessorFactory(
typeStr,
createDefaultConfig,
-component.WithTracesProcessor(createTracesProcessor))
+component.WithTracesProcessor(createTracesProcessor),
+component.WithLogsProcessor(createLogsProcessor))
}

func createDefaultConfig() config.Processor {
@@ -50,3 +51,13 @@ func createTracesProcessor(
) (component.TracesProcessor, error) {
return newTracesProcessor(nextConsumer, cfg.(*Config))
}

// createLogsProcessor creates a log processor based on this config.
func createLogsProcessor(
_ context.Context,
_ component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Logs,
) (component.LogsProcessor, error) {
return newLogsProcessor(nextConsumer, cfg.(*Config))
}
8 changes: 8 additions & 0 deletions processor/probabilisticsamplerprocessor/factory_test.go
@@ -37,3 +37,11 @@ func TestCreateProcessor(t *testing.T) {
assert.NotNil(t, tp)
assert.NoError(t, err, "cannot create trace processor")
}

func TestCreateProcessorLogs(t *testing.T) {
cfg := createDefaultConfig()
set := componenttest.NewNopProcessorCreateSettings()
tp, err := createLogsProcessor(context.Background(), set, cfg, consumertest.NewNop())
assert.NotNil(t, tp)
assert.NoError(t, err, "cannot create logs processor")
}
@@ -0,0 +1,89 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package probabilisticsamplerprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/processor/processorhelper"
)

type logsamplerprocessor struct {
scaledSamplingRate uint32
hashSeed uint32
traceIdEnabled bool
samplingSource string
samplingPriority string
}

// newLogsProcessor returns a processor.LogsProcessor that will perform head sampling according to the given
// configuration.
func newLogsProcessor(nextConsumer consumer.Logs, cfg *Config) (component.LogsProcessor, error) {

lsp := &logsamplerprocessor{
scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor),
hashSeed: cfg.HashSeed,
traceIdEnabled: cfg.TraceIDEnabled == nil || *cfg.TraceIDEnabled,
samplingPriority: cfg.SamplingPriority,
samplingSource: cfg.SamplingSource,
}

return processorhelper.NewLogsProcessor(
cfg,
nextConsumer,
lsp.processLogs,
processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}))
}

func (lsp *logsamplerprocessor) processLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
ld.ResourceLogs().RemoveIf(func(rl pdata.ResourceLogs) bool {
rl.ScopeLogs().RemoveIf(func(ill pdata.ScopeLogs) bool {
ill.LogRecords().RemoveIf(func(l pdata.LogRecord) bool {

// pick the sampling source.
var lidBytes []byte
if lsp.traceIdEnabled && !l.TraceID().IsEmpty() {
value := l.TraceID().Bytes()
lidBytes = value[:]
}
if lidBytes == nil && lsp.samplingSource != "" {
if value, ok := l.Attributes().Get(lsp.samplingSource); ok {
lidBytes = value.BytesVal()
}
}
priority := lsp.scaledSamplingRate
if lsp.samplingPriority != "" {
if localPriority, ok := l.Attributes().Get(lsp.samplingPriority); ok {
priority = uint32(localPriority.DoubleVal() * percentageScaleFactor)
}
}

sampled := hash(lidBytes, lsp.hashSeed)&bitMaskHashBuckets < priority
return !sampled
})
// Filter out empty ScopeLogs
return ill.LogRecords().Len() == 0
})
// Filter out empty ResourceLogs
return rl.ScopeLogs().Len() == 0
})
if ld.ResourceLogs().Len() == 0 {
return ld, processorhelper.ErrSkipProcessingData
}
return ld, nil
}
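The priority override above reduces to a threshold computation over hash buckets. A sketch of that arithmetic, with explicit clamping added to make the documented `0` / `>= 100` semantics visible (the processor itself relies on plain float truncation, and the 0x4000 bucket count is assumed from the trace sampler):

```go
package main

import "fmt"

const (
	numHashBuckets        = 0x4000 // 16384 buckets, assumed from the trace sampler
	percentageScaleFactor = numHashBuckets / 100.0
)

// threshold converts a percentage (the global sampling_percentage, or a
// per-record sampling_priority attribute when present) into a hash-bucket
// cutoff. A record is kept when its bucket, always < numHashBuckets, falls
// below the cutoff: 0 keeps nothing, >= 100 keeps everything.
func threshold(pct float64) uint32 {
	if pct <= 0 {
		return 0
	}
	if pct >= 100 {
		return numHashBuckets
	}
	return uint32(pct * percentageScaleFactor)
}

func main() {
	fmt.Println(threshold(0))   // 0: no bucket qualifies
	fmt.Println(threshold(15))  // 2457: ~15% of buckets qualify
	fmt.Println(threshold(100)) // 16384: every bucket qualifies
}
```

A `sampling_priority` attribute of, say, 100 on a single record therefore forces it through regardless of the configured `sampling_percentage`, while 0 suppresses it entirely.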