Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/tailsampling] Include componentID as prefix in metrics 'policy' #34192

27 changes: 27 additions & 0 deletions .chloggen/bug_unique-policy-name-tail-sampling-processor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'bug_fix'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: tailsamplingprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Make the `policy` metric dimension value unique across multiple tail sampling components that share the same policy name."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34192]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: "This change ensures that the `policy` value in the metrics exported by the tail sampling processor is unique across multiple tail sampling processors with the same policy name."

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion processor/tailsamplingprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ func createTracesProcessor(
nextConsumer consumer.Traces,
) (processor.Traces, error) {
tCfg := cfg.(*Config)
return newTracesProcessor(ctx, params.TelemetrySettings, nextConsumer, *tCfg)
return newTracesProcessor(ctx, params, nextConsumer, *tCfg)
}
16 changes: 11 additions & 5 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ type Option func(*tailSamplingSpanProcessor)

// newTracesProcessor returns a processor.TracesProcessor that will perform tail sampling according to the given
// configuration.
func newTracesProcessor(ctx context.Context, settings component.TelemetrySettings, nextConsumer consumer.Traces, cfg Config, opts ...Option) (processor.Traces, error) {
telemetry, err := metadata.NewTelemetryBuilder(settings)
func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsumer consumer.Traces, cfg Config, opts ...Option) (processor.Traces, error) {
telemetrySettings := set.TelemetrySettings
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a good reason for renaming component.TelemetrySettings from settings to telemetrySettings?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was only to differentiate the new argument, processor.Settings (settings), from the initial argument, which was processor.Settings.TelemetrySettings (telemetrySettings).

Should I change it to the following instead?

func newTracesProcessor(ctx context.Context, processorSettings processor.Settings, ...) (processor.Traces, error) {
  settings := processorSettings.TelemetrySettings
  telemetry, err := metadata.NewTelemetryBuilder(settings)
  if err != nil {
    return nil, err
  }
  ...
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's okay. This is small enough to not be a problem.

telemetry, err := metadata.NewTelemetryBuilder(telemetrySettings)
if err != nil {
return nil, err
}
Expand All @@ -102,7 +103,7 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
nextConsumer: nextConsumer,
maxNumTraces: cfg.NumTraces,
sampledIDCache: sampledDecisions,
logger: settings.Logger,
logger: telemetrySettings.Logger,
numTracesOnMap: &atomic.Uint64{},
deleteChan: make(chan pcommon.TraceID, cfg.NumTraces),
}
Expand All @@ -119,6 +120,7 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
if tsp.policies == nil {
policyNames := map[string]bool{}
tsp.policies = make([]*policy, len(cfg.PolicyCfgs))
componentID := set.ID.Name()
for i := range cfg.PolicyCfgs {
policyCfg := &cfg.PolicyCfgs[i]

Expand All @@ -127,14 +129,18 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
}
policyNames[policyCfg.Name] = true

eval, err := getPolicyEvaluator(settings, policyCfg)
eval, err := getPolicyEvaluator(telemetrySettings, policyCfg)
if err != nil {
return nil, err
}
uniquePolicyName := policyCfg.Name
if componentID != "" {
uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyCfg.Name)
}
p := &policy{
name: policyCfg.Name,
evaluator: eval,
attribute: metric.WithAttributes(attribute.String("policy", policyCfg.Name)),
attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)),
}
tsp.policies[i] = p
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processortest"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
)
Expand All @@ -24,8 +25,7 @@ func BenchmarkSampling(b *testing.B) {
ExpectedNewTracesPerSec: 64,
PolicyCfgs: testPolicy,
}

sp, _ := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg)
sp, _ := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg)
tsp := sp.(*tailSamplingSpanProcessor)
require.NoError(b, tsp.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
Expand Down
14 changes: 7 additions & 7 deletions processor/tailsamplingprocessor/processor_decisions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestSamplingMultiplePolicies(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
Expand Down Expand Up @@ -167,7 +167,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe := &mockPolicyEvaluator{}
Expand Down
105 changes: 101 additions & 4 deletions processor/tailsamplingprocessor/processor_telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/featuregate"
Expand Down Expand Up @@ -37,7 +38,7 @@ func TestMetricsAfterOneEvaluation(t *testing.T) {
},
}
cs := &consumertest.TracesSink{}
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
require.NoError(t, err)
defer func() {
Expand Down Expand Up @@ -211,6 +212,102 @@ func TestMetricsAfterOneEvaluation(t *testing.T) {
assert.Len(t, cs.AllTraces(), 1)
}

// TestMetricsWithComponentID checks that when the processor settings carry a
// named component ID (here tail_sampling/unique_id), the "policy" attribute on
// the sampled-traces counter and the decision-latency histogram is reported as
// "unique_id.always" rather than the bare policy name.
func TestMetricsWithComponentID(t *testing.T) {
	// prepare
	tel := setupTestTelemetry()
	batcher := newSyncIDBatcher().(*syncIDBatcher)

	cfg := Config{
		DecisionWait: 1,
		NumTraces:    100,
		PolicyCfgs: []PolicyCfg{
			{
				sharedPolicyCfg: sharedPolicyCfg{
					Name: "always",
					Type: AlwaysSample,
				},
			},
		},
	}
	sink := &consumertest.TracesSink{}
	set := tel.NewSettings()
	set.ID = component.MustNewIDWithName("tail_sampling", "unique_id") // e.g. tail_sampling/unique_id
	proc, err := newTracesProcessor(context.Background(), set, sink, cfg, withDecisionBatcher(batcher))
	require.NoError(t, err)
	defer func() {
		require.NoError(t, proc.Shutdown(context.Background()))
	}()

	require.NoError(t, proc.Start(context.Background(), componenttest.NewNopHost()))

	// test
	require.NoError(t, proc.ConsumeTraces(context.Background(), simpleTraces()))

	tsp := proc.(*tailSamplingSpanProcessor)
	tsp.policyTicker.OnTick() // the first tick always gets an empty batch
	tsp.policyTicker.OnTick()

	// verify
	var collected metricdata.ResourceMetrics
	require.NoError(t, tel.reader.Collect(context.Background(), &collected))
	require.Equal(t, 8, tel.len(collected))

	// Each expectation names one metric and the comparison options to apply
	// when matching it against what was collected.
	expectations := []struct {
		options []metricdatatest.Option
		want    metricdata.Metrics
	}{
		{
			options: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()},
			want: metricdata.Metrics{
				Name:        "otelcol_processor_tail_sampling_count_traces_sampled",
				Description: "Count of traces that were sampled or not per sampling policy",
				Unit:        "{traces}",
				Data: metricdata.Sum[int64]{
					IsMonotonic: true,
					Temporality: metricdata.CumulativeTemporality,
					DataPoints: []metricdata.DataPoint[int64]{
						{
							Attributes: attribute.NewSet(
								attribute.String("policy", "unique_id.always"),
								attribute.String("sampled", "true"),
							),
							Value: 1,
						},
					},
				},
			},
		},
		{
			// Latency values are not deterministic, so only the shape and
			// attributes of the histogram are compared.
			options: []metricdatatest.Option{metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()},
			want: metricdata.Metrics{
				Name:        "otelcol_processor_tail_sampling_sampling_decision_latency",
				Description: "Latency (in microseconds) of a given sampling policy",
				Unit:        "µs",
				Data: metricdata.Histogram[int64]{
					Temporality: metricdata.CumulativeTemporality,
					DataPoints: []metricdata.HistogramDataPoint[int64]{
						{
							Attributes: attribute.NewSet(
								attribute.String("policy", "unique_id.always"),
							),
						},
					},
				},
			},
		},
	}
	for _, exp := range expectations {
		actual := tel.getMetric(exp.want.Name, collected)
		metricdatatest.AssertEqual(t, exp.want, actual, exp.options...)
	}

	// sanity check
	assert.Len(t, sink.AllTraces(), 1)
}

func TestProcessorTailSamplingCountSpansSampled(t *testing.T) {
err := featuregate.GlobalRegistry().Set("processor.tailsamplingprocessor.metricstatcountspanssampled", true)
require.NoError(t, err)
Expand Down Expand Up @@ -238,7 +335,7 @@ func TestProcessorTailSamplingCountSpansSampled(t *testing.T) {
},
}
cs := &consumertest.TracesSink{}
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
require.NoError(t, err)
defer func() {
Expand Down Expand Up @@ -303,7 +400,7 @@ func TestProcessorTailSamplingSamplingTraceRemovalAge(t *testing.T) {
},
}
cs := &consumertest.TracesSink{}
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
require.NoError(t, err)
defer func() {
Expand Down Expand Up @@ -364,7 +461,7 @@ func TestProcessorTailSamplingSamplingLateSpanAge(t *testing.T) {
},
}
cs := &consumertest.TracesSink{}
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
require.NoError(t, err)
defer func() {
Expand Down
Loading
Loading