Include componentID as prefix in metrics 'policy'
This change includes the componentID as a dot-separated prefix in the metrics
'policy' dimension when generating metrics for the processor. It ensures that
identically named policies in tail sampling processors belonging to different
components still receive unique values.

Resolves: open-telemetry#34099
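
For illustration, a minimal sketch of the new naming scheme, assuming a processor instance configured as tail_sampling/frontend and a policy named latency-policy (both names are invented for the example). An unnamed tail_sampling instance has an empty Name(), so its policy values stay unchanged.

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

func main() {
	componentID := "frontend"      // set.ID.Name() for a processor configured as tail_sampling/frontend
	policyName := "latency-policy" // policyCfg.Name

	uniquePolicyName := policyName
	if componentID != "" {
		uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyName)
	}

	// The per-policy metrics are recorded with this attribute set, so identically
	// named policies in differently named processor instances stay distinguishable.
	attrs := metric.WithAttributes(attribute.String("policy", uniquePolicyName))
	_ = attrs

	fmt.Println(uniquePolicyName) // frontend.latency-policy
}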
EOjeah committed Jul 22, 2024
1 parent fbdcea4 commit 47e2f00
Showing 6 changed files with 58 additions and 70 deletions.
4 changes: 2 additions & 2 deletions processor/tailsamplingprocessor/factory.go
@@ -33,10 +33,10 @@ func createDefaultConfig() component.Config {

func createTracesProcessor(
ctx context.Context,
params processor.Settings,
set processor.Settings,
cfg component.Config,
nextConsumer consumer.Traces,
) (processor.Traces, error) {
tCfg := cfg.(*Config)
return newTracesProcessor(ctx, params.TelemetrySettings, nextConsumer, *tCfg)
return newTracesProcessor(ctx, set, nextConsumer, *tCfg)
}
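
For context, a minimal sketch of what the forwarded processor.Settings carries, using the nop test settings from processortest; the point is that the component ID and the telemetry settings travel in the same struct, which is why the factory now passes set rather than only params.TelemetrySettings.

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/processor/processortest"
)

func main() {
	// processor.Settings bundles the component ID and the telemetry settings,
	// so the processor can derive both the metric prefix and its logger from it.
	set := processortest.NewNopSettings()

	fmt.Printf("instance name: %q\n", set.ID.Name()) // "" for an unnamed instance
	fmt.Println(set.TelemetrySettings.Logger != nil) // logger still reachable through the same struct
}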
58 changes: 32 additions & 26 deletions processor/tailsamplingprocessor/processor.go
@@ -45,8 +45,8 @@ type policy struct {
type tailSamplingSpanProcessor struct {
ctx context.Context

telemetry *metadata.TelemetryBuilder
logger *zap.Logger
telemetryBuilder *metadata.TelemetryBuilder
logger *zap.Logger

nextConsumer consumer.Traces
maxNumTraces uint64
@@ -83,8 +83,9 @@ type Option func(*tailSamplingSpanProcessor)

// newTracesProcessor returns a processor.TracesProcessor that will perform tail sampling according to the given
// configuration.
func newTracesProcessor(ctx context.Context, settings component.TelemetrySettings, nextConsumer consumer.Traces, cfg Config, opts ...Option) (processor.Traces, error) {
telemetry, err := metadata.NewTelemetryBuilder(settings)
func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsumer consumer.Traces, cfg Config, opts ...Option) (processor.Traces, error) {
telemetrySettings := set.TelemetrySettings
telemetryBuilder, err := metadata.NewTelemetryBuilder(telemetrySettings)
if err != nil {
return nil, err
}
@@ -97,14 +98,14 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
}

tsp := &tailSamplingSpanProcessor{
ctx: ctx,
telemetry: telemetry,
nextConsumer: nextConsumer,
maxNumTraces: cfg.NumTraces,
sampledIDCache: sampledDecisions,
logger: settings.Logger,
numTracesOnMap: &atomic.Uint64{},
deleteChan: make(chan pcommon.TraceID, cfg.NumTraces),
ctx: ctx,
telemetryBuilder: telemetryBuilder,
nextConsumer: nextConsumer,
maxNumTraces: cfg.NumTraces,
sampledIDCache: sampledDecisions,
logger: telemetrySettings.Logger,
numTracesOnMap: &atomic.Uint64{},
deleteChan: make(chan pcommon.TraceID, cfg.NumTraces),
}
tsp.policyTicker = &timeutils.PolicyTicker{OnTickFunc: tsp.samplingPolicyOnTick}

@@ -119,6 +120,7 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
if tsp.policies == nil {
policyNames := map[string]bool{}
tsp.policies = make([]*policy, len(cfg.PolicyCfgs))
componentID := set.ID.Name()
for i := range cfg.PolicyCfgs {
policyCfg := &cfg.PolicyCfgs[i]

@@ -127,14 +129,18 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
}
policyNames[policyCfg.Name] = true

eval, err := getPolicyEvaluator(settings, policyCfg)
eval, err := getPolicyEvaluator(telemetrySettings, policyCfg)
if err != nil {
return nil, err
}
uniquePolicyName := policyCfg.Name
if componentID != "" {
uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyCfg.Name)
}
p := &policy{
name: policyCfg.Name,
evaluator: eval,
attribute: metric.WithAttributes(attribute.String("policy", policyCfg.Name)),
attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)),
}
tsp.policies[i] = p
}
@@ -256,11 +262,11 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
trace.DecisionTime = time.Now()

decision := tsp.makeDecision(id, trace, &metrics)
tsp.telemetry.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, int64(time.Since(startTime)/time.Microsecond))
tsp.telemetry.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount)
tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount)
tsp.telemetry.ProcessorTailSamplingSamplingTracesOnMemory.Record(tsp.ctx, int64(tsp.numTracesOnMap.Load()))
tsp.telemetry.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, 1, decisionToAttribute[decision])
tsp.telemetryBuilder.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, int64(time.Since(startTime)/time.Microsecond))
tsp.telemetryBuilder.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount)
tsp.telemetryBuilder.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount)
tsp.telemetryBuilder.ProcessorTailSamplingSamplingTracesOnMemory.Record(tsp.ctx, int64(tsp.numTracesOnMap.Load()))
tsp.telemetryBuilder.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, 1, decisionToAttribute[decision])

// Sampled or not, remove the batches
trace.Lock()
@@ -298,15 +304,15 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa
for _, p := range tsp.policies {
policyEvaluateStartTime := time.Now()
decision, err := p.evaluator.Evaluate(ctx, id, trace)
tsp.telemetry.ProcessorTailSamplingSamplingDecisionLatency.Record(ctx, int64(time.Since(policyEvaluateStartTime)/time.Microsecond), p.attribute)
tsp.telemetryBuilder.ProcessorTailSamplingSamplingDecisionLatency.Record(ctx, int64(time.Since(policyEvaluateStartTime)/time.Microsecond), p.attribute)
if err != nil {
samplingDecision[sampling.Error] = true
metrics.evaluateErrorCount++
tsp.logger.Debug("Sampling policy error", zap.Error(err))
} else {
tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(ctx, 1, p.attribute, decisionToAttribute[decision])
tsp.telemetryBuilder.ProcessorTailSamplingCountTracesSampled.Add(ctx, 1, p.attribute, decisionToAttribute[decision])
if telemetry.IsMetricStatCountSpansSampledEnabled() {
tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(ctx, trace.SpanCount.Load(), p.attribute, decisionToAttribute[decision])
tsp.telemetryBuilder.ProcessorTailSamplingCountSpansSampled.Add(ctx, trace.SpanCount.Load(), p.attribute, decisionToAttribute[decision])
}

samplingDecision[decision] = true
@@ -365,7 +371,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc
traceTd := ptrace.NewTraces()
appendToTraces(traceTd, resourceSpans, spans)
tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.Add(tsp.ctx, int64(len(spans)))
tsp.telemetryBuilder.ProcessorTailSamplingEarlyReleasesFromCacheDecision.Add(tsp.ctx, int64(len(spans)))
continue
}

@@ -423,15 +429,15 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc
appendToTraces(traceTd, resourceSpans, spans)
tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
case sampling.NotSampled:
tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second))
tsp.telemetryBuilder.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second))
default:
tsp.logger.Warn("Encountered unexpected sampling decision",
zap.Int("decision", int(finalDecision)))
}
}
}

tsp.telemetry.ProcessorTailSamplingNewTraceIDReceived.Add(tsp.ctx, newTraceIDs)
tsp.telemetryBuilder.ProcessorTailSamplingNewTraceIDReceived.Add(tsp.ctx, newTraceIDs)
}

func (tsp *tailSamplingSpanProcessor) Capabilities() consumer.Capabilities {
@@ -464,7 +470,7 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletio
return
}

tsp.telemetry.ProcessorTailSamplingSamplingTraceRemovalAge.Record(tsp.ctx, int64(deletionTime.Sub(trace.ArrivalTime)/time.Second))
tsp.telemetryBuilder.ProcessorTailSamplingSamplingTraceRemovalAge.Record(tsp.ctx, int64(deletionTime.Sub(trace.ArrivalTime)/time.Second))
}

// releaseSampledTrace sends the trace data to the next consumer.
4 changes: 2 additions & 2 deletions processor/tailsamplingprocessor/processor_benchmarks_test.go
@@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processortest"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
)
@@ -24,8 +25,7 @@ func BenchmarkSampling(b *testing.B) {
ExpectedNewTracesPerSec: 64,
PolicyCfgs: testPolicy,
}

sp, _ := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg)
sp, _ := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg)
tsp := sp.(*tailSamplingSpanProcessor)
require.NoError(b, tsp.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
29 changes: 8 additions & 21 deletions processor/tailsamplingprocessor/processor_decisions_test.go
@@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

@@ -25,8 +26,6 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
@@ -35,7 +34,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
}

p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
@@ -70,8 +69,6 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
@@ -80,7 +77,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
}

p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
@@ -115,8 +112,6 @@ func TestSamplingMultiplePolicies(t *testing.T) {
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
@@ -127,7 +122,7 @@ func TestSamplingMultiplePolicies(t *testing.T) {
{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
}

p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
@@ -166,8 +161,6 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
@@ -176,7 +169,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
}

p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
@@ -212,8 +205,6 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
@@ -224,7 +215,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
}

p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
@@ -263,8 +254,6 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
@@ -275,7 +264,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
}

p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
@@ -333,8 +322,6 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
idb := newSyncIDBatcher()

mpe := &mockPolicyEvaluator{}
@@ -345,7 +332,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
// Use this instead of the default no-op cache
c, err := cache.NewLRUDecisionCache[bool](200)
require.NoError(t, err)
p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withSampledDecisionCache(c))
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withSampledDecisionCache(c))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
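
A hypothetical follow-up test, not part of this commit, sketched under the assumption that it sits in the same package as the tests above and can reuse their helpers (testPolicy, newSyncIDBatcher), plus an extra import of go.opentelemetry.io/collector/component; it gives the nop settings a named instance so the prefix logic in newTracesProcessor is exercised.

func TestPolicyAttributeGetsComponentIDPrefix(t *testing.T) {
	cfg := Config{
		NumTraces:  defaultNumTraces,
		PolicyCfgs: testPolicy, // reuse an existing test policy config
	}
	idb := newSyncIDBatcher()

	// As if the processor were configured as tail_sampling/frontend.
	set := processortest.NewNopSettings()
	set.ID = component.MustNewIDWithName("tail_sampling", "frontend")

	p, err := newTracesProcessor(context.Background(), set, consumertest.NewNop(), cfg, withDecisionBatcher(idb))
	require.NoError(t, err)

	tsp := p.(*tailSamplingSpanProcessor)
	require.Len(t, tsp.policies, len(cfg.PolicyCfgs))
	// Each policy attribute should now be built from "frontend." + the policy name;
	// asserting the exact value would mean inspecting tsp.policies[i].attribute or
	// the metrics emitted through the telemetry builder.
}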
8 changes: 4 additions & 4 deletions processor/tailsamplingprocessor/processor_telemetry_test.go
@@ -37,7 +37,7 @@ func TestMetricsAfterOneEvaluation(t *testing.T) {
},
}
cs := &consumertest.TracesSink{}
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
require.NoError(t, err)
defer func() {
@@ -238,7 +238,7 @@ func TestProcessorTailSamplingCountSpansSampled(t *testing.T) {
},
}
cs := &consumertest.TracesSink{}
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
require.NoError(t, err)
defer func() {
@@ -303,7 +303,7 @@ func TestProcessorTailSamplingSamplingTraceRemovalAge(t *testing.T) {
},
}
cs := &consumertest.TracesSink{}
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
require.NoError(t, err)
defer func() {
@@ -364,7 +364,7 @@ func TestProcessorTailSamplingSamplingLateSpanAge(t *testing.T) {
},
}
cs := &consumertest.TracesSink{}
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
require.NoError(t, err)
defer func() {