From 47e2f00efedcbddfffbac31395668bf39b16412b Mon Sep 17 00:00:00 2001 From: Emmanuel Ojeah Date: Mon, 22 Jul 2024 03:48:50 -0700 Subject: [PATCH] Include componentID as prefix in metrics 'policy' This change includes the componentID as a dot prefix to the metrics 'policy' dimension when generating metrics for the processor. The change ensures that similarly named policys in the tail sampling processor that belong to different components also has a unique value. Resolves: #34099 --- processor/tailsamplingprocessor/factory.go | 4 +- processor/tailsamplingprocessor/processor.go | 58 ++++++++++--------- .../processor_benchmarks_test.go | 4 +- .../processor_decisions_test.go | 29 +++------- .../processor_telemetry_test.go | 8 +-- .../tailsamplingprocessor/processor_test.go | 25 ++++---- 6 files changed, 58 insertions(+), 70 deletions(-) diff --git a/processor/tailsamplingprocessor/factory.go b/processor/tailsamplingprocessor/factory.go index a64f27fe289c..1a132c1d0331 100644 --- a/processor/tailsamplingprocessor/factory.go +++ b/processor/tailsamplingprocessor/factory.go @@ -33,10 +33,10 @@ func createDefaultConfig() component.Config { func createTracesProcessor( ctx context.Context, - params processor.Settings, + set processor.Settings, cfg component.Config, nextConsumer consumer.Traces, ) (processor.Traces, error) { tCfg := cfg.(*Config) - return newTracesProcessor(ctx, params.TelemetrySettings, nextConsumer, *tCfg) + return newTracesProcessor(ctx, set, nextConsumer, *tCfg) } diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index b0a58504001d..d99ffb9b6e07 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -45,8 +45,8 @@ type policy struct { type tailSamplingSpanProcessor struct { ctx context.Context - telemetry *metadata.TelemetryBuilder - logger *zap.Logger + telemetryBuilder *metadata.TelemetryBuilder + logger *zap.Logger nextConsumer consumer.Traces maxNumTraces uint64 @@ -83,8 +83,9 @@ type Option func(*tailSamplingSpanProcessor) // newTracesProcessor returns a processor.TracesProcessor that will perform tail sampling according to the given // configuration. -func newTracesProcessor(ctx context.Context, settings component.TelemetrySettings, nextConsumer consumer.Traces, cfg Config, opts ...Option) (processor.Traces, error) { - telemetry, err := metadata.NewTelemetryBuilder(settings) +func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsumer consumer.Traces, cfg Config, opts ...Option) (processor.Traces, error) { + telemetrySettings := set.TelemetrySettings + telemetryBuilder, err := metadata.NewTelemetryBuilder(telemetrySettings) if err != nil { return nil, err } @@ -97,14 +98,14 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting } tsp := &tailSamplingSpanProcessor{ - ctx: ctx, - telemetry: telemetry, - nextConsumer: nextConsumer, - maxNumTraces: cfg.NumTraces, - sampledIDCache: sampledDecisions, - logger: settings.Logger, - numTracesOnMap: &atomic.Uint64{}, - deleteChan: make(chan pcommon.TraceID, cfg.NumTraces), + ctx: ctx, + telemetryBuilder: telemetryBuilder, + nextConsumer: nextConsumer, + maxNumTraces: cfg.NumTraces, + sampledIDCache: sampledDecisions, + logger: telemetrySettings.Logger, + numTracesOnMap: &atomic.Uint64{}, + deleteChan: make(chan pcommon.TraceID, cfg.NumTraces), } tsp.policyTicker = &timeutils.PolicyTicker{OnTickFunc: tsp.samplingPolicyOnTick} @@ -119,6 +120,7 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting if tsp.policies == nil { policyNames := map[string]bool{} tsp.policies = make([]*policy, len(cfg.PolicyCfgs)) + componentID := set.ID.Name() for i := range cfg.PolicyCfgs { policyCfg := &cfg.PolicyCfgs[i] @@ -127,14 +129,18 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting } policyNames[policyCfg.Name] = true - eval, err := getPolicyEvaluator(settings, policyCfg) + eval, err := getPolicyEvaluator(telemetrySettings, policyCfg) if err != nil { return nil, err } + uniquePolicyName := policyCfg.Name + if componentID != "" { + uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyCfg.Name) + } p := &policy{ name: policyCfg.Name, evaluator: eval, - attribute: metric.WithAttributes(attribute.String("policy", policyCfg.Name)), + attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)), } tsp.policies[i] = p } @@ -256,11 +262,11 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { trace.DecisionTime = time.Now() decision := tsp.makeDecision(id, trace, &metrics) - tsp.telemetry.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, int64(time.Since(startTime)/time.Microsecond)) - tsp.telemetry.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount) - tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount) - tsp.telemetry.ProcessorTailSamplingSamplingTracesOnMemory.Record(tsp.ctx, int64(tsp.numTracesOnMap.Load())) - tsp.telemetry.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, 1, decisionToAttribute[decision]) + tsp.telemetryBuilder.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, int64(time.Since(startTime)/time.Microsecond)) + tsp.telemetryBuilder.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount) + tsp.telemetryBuilder.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount) + tsp.telemetryBuilder.ProcessorTailSamplingSamplingTracesOnMemory.Record(tsp.ctx, int64(tsp.numTracesOnMap.Load())) + tsp.telemetryBuilder.ProcessorTailSamplingGlobalCountTracesSampled.Add(tsp.ctx, 1, decisionToAttribute[decision]) // Sampled or not, remove the batches trace.Lock() @@ -298,15 +304,15 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa for _, p := range tsp.policies { policyEvaluateStartTime := time.Now() decision, err := p.evaluator.Evaluate(ctx, id, trace) - tsp.telemetry.ProcessorTailSamplingSamplingDecisionLatency.Record(ctx, int64(time.Since(policyEvaluateStartTime)/time.Microsecond), p.attribute) + tsp.telemetryBuilder.ProcessorTailSamplingSamplingDecisionLatency.Record(ctx, int64(time.Since(policyEvaluateStartTime)/time.Microsecond), p.attribute) if err != nil { samplingDecision[sampling.Error] = true metrics.evaluateErrorCount++ tsp.logger.Debug("Sampling policy error", zap.Error(err)) } else { - tsp.telemetry.ProcessorTailSamplingCountTracesSampled.Add(ctx, 1, p.attribute, decisionToAttribute[decision]) + tsp.telemetryBuilder.ProcessorTailSamplingCountTracesSampled.Add(ctx, 1, p.attribute, decisionToAttribute[decision]) if telemetry.IsMetricStatCountSpansSampledEnabled() { - tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(ctx, trace.SpanCount.Load(), p.attribute, decisionToAttribute[decision]) + tsp.telemetryBuilder.ProcessorTailSamplingCountSpansSampled.Add(ctx, trace.SpanCount.Load(), p.attribute, decisionToAttribute[decision]) } samplingDecision[decision] = true @@ -365,7 +371,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc traceTd := ptrace.NewTraces() appendToTraces(traceTd, resourceSpans, spans) tsp.releaseSampledTrace(tsp.ctx, id, traceTd) - tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.Add(tsp.ctx, int64(len(spans))) + tsp.telemetryBuilder.ProcessorTailSamplingEarlyReleasesFromCacheDecision.Add(tsp.ctx, int64(len(spans))) continue } @@ -423,7 +429,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc appendToTraces(traceTd, resourceSpans, spans) tsp.releaseSampledTrace(tsp.ctx, id, traceTd) case sampling.NotSampled: - tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second)) + tsp.telemetryBuilder.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second)) default: tsp.logger.Warn("Encountered unexpected sampling decision", zap.Int("decision", int(finalDecision))) @@ -431,7 +437,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc } } - tsp.telemetry.ProcessorTailSamplingNewTraceIDReceived.Add(tsp.ctx, newTraceIDs) + tsp.telemetryBuilder.ProcessorTailSamplingNewTraceIDReceived.Add(tsp.ctx, newTraceIDs) } func (tsp *tailSamplingSpanProcessor) Capabilities() consumer.Capabilities { @@ -464,7 +470,7 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletio return } - tsp.telemetry.ProcessorTailSamplingSamplingTraceRemovalAge.Record(tsp.ctx, int64(deletionTime.Sub(trace.ArrivalTime)/time.Second)) + tsp.telemetryBuilder.ProcessorTailSamplingSamplingTraceRemovalAge.Record(tsp.ctx, int64(deletionTime.Sub(trace.ArrivalTime)/time.Second)) } // releaseSampledTrace sends the trace data to the next consumer. diff --git a/processor/tailsamplingprocessor/processor_benchmarks_test.go b/processor/tailsamplingprocessor/processor_benchmarks_test.go index c7dc26a48060..5a673228cfa7 100644 --- a/processor/tailsamplingprocessor/processor_benchmarks_test.go +++ b/processor/tailsamplingprocessor/processor_benchmarks_test.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/processortest" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" ) @@ -24,8 +25,7 @@ func BenchmarkSampling(b *testing.B) { ExpectedNewTracesPerSec: 64, PolicyCfgs: testPolicy, } - - sp, _ := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg) + sp, _ := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg) tsp := sp.(*tailSamplingSpanProcessor) require.NoError(b, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index 5c232b8dd3a4..42c28dc9151f 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/processortest" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -25,8 +26,6 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings().TelemetrySettings idb := newSyncIDBatcher() mpe1 := &mockPolicyEvaluator{} @@ -35,7 +34,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { {name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, } - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) + p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -70,8 +69,6 @@ func TestSamplingPolicyInvertSampled(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings().TelemetrySettings idb := newSyncIDBatcher() mpe1 := &mockPolicyEvaluator{} @@ -80,7 +77,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) { {name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, } - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) + p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -115,8 +112,6 @@ func TestSamplingMultiplePolicies(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings().TelemetrySettings idb := newSyncIDBatcher() mpe1 := &mockPolicyEvaluator{} @@ -127,7 +122,7 @@ func TestSamplingMultiplePolicies(t *testing.T) { {name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))}, } - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) + p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -166,8 +161,6 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings().TelemetrySettings idb := newSyncIDBatcher() mpe1 := &mockPolicyEvaluator{} @@ -176,7 +169,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { {name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, } - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) + p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -212,8 +205,6 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings().TelemetrySettings idb := newSyncIDBatcher() mpe1 := &mockPolicyEvaluator{} @@ -224,7 +215,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { {name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))}, } - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) + p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -263,8 +254,6 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings().TelemetrySettings idb := newSyncIDBatcher() mpe1 := &mockPolicyEvaluator{} @@ -275,7 +264,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { {name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))}, } - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) + p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -333,8 +322,6 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings().TelemetrySettings idb := newSyncIDBatcher() mpe := &mockPolicyEvaluator{} @@ -345,7 +332,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { // Use this instead of the default no-op cache c, err := cache.NewLRUDecisionCache[bool](200) require.NoError(t, err) - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withSampledDecisionCache(c)) + p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withSampledDecisionCache(c)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/tailsamplingprocessor/processor_telemetry_test.go b/processor/tailsamplingprocessor/processor_telemetry_test.go index 8cee92b2d028..9db20ca604bc 100644 --- a/processor/tailsamplingprocessor/processor_telemetry_test.go +++ b/processor/tailsamplingprocessor/processor_telemetry_test.go @@ -37,7 +37,7 @@ func TestMetricsAfterOneEvaluation(t *testing.T) { }, } cs := &consumertest.TracesSink{} - ct := s.NewSettings().TelemetrySettings + ct := s.NewSettings() proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher)) require.NoError(t, err) defer func() { @@ -238,7 +238,7 @@ func TestProcessorTailSamplingCountSpansSampled(t *testing.T) { }, } cs := &consumertest.TracesSink{} - ct := s.NewSettings().TelemetrySettings + ct := s.NewSettings() proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher)) require.NoError(t, err) defer func() { @@ -303,7 +303,7 @@ func TestProcessorTailSamplingSamplingTraceRemovalAge(t *testing.T) { }, } cs := &consumertest.TracesSink{} - ct := s.NewSettings().TelemetrySettings + ct := s.NewSettings() proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher)) require.NoError(t, err) defer func() { @@ -364,7 +364,7 @@ func TestProcessorTailSamplingSamplingLateSpanAge(t *testing.T) { }, } cs := &consumertest.TracesSink{} - ct := s.NewSettings().TelemetrySettings + ct := s.NewSettings() proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher)) require.NoError(t, err) defer func() { diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index 0f68b2a0b9b6..fc1b1e8ecc30 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/processortest" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" @@ -120,8 +121,6 @@ func TestTraceIntegrity(t *testing.T) { NumTraces: defaultNumTraces, } nextConsumer := new(consumertest.TracesSink) - s := setupTestTelemetry() - ct := s.NewSettings().TelemetrySettings idb := newSyncIDBatcher() mpe1 := &mockPolicyEvaluator{} @@ -130,7 +129,7 @@ func TestTraceIntegrity(t *testing.T) { {name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, } - p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) + p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -182,7 +181,7 @@ func TestSequentialTraceArrival(t *testing.T) { ExpectedNewTracesPerSec: 64, PolicyCfgs: testPolicy, } - sp, err := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg, withTickerFrequency(time.Millisecond)) + sp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg, withTickerFrequency(time.Millisecond)) require.NoError(t, err) err = sp.Start(context.Background(), componenttest.NewNopHost()) @@ -215,7 +214,7 @@ func TestConcurrentTraceArrival(t *testing.T) { ExpectedNewTracesPerSec: 64, PolicyCfgs: testPolicy, } - sp, err := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg, withTickerFrequency(time.Millisecond)) + sp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg, withTickerFrequency(time.Millisecond)) require.NoError(t, err) err = sp.Start(context.Background(), componenttest.NewNopHost()) @@ -269,7 +268,7 @@ func TestConcurrentArrivalAndEvaluation(t *testing.T) { ExpectedNewTracesPerSec: 64, PolicyCfgs: testLatencyPolicy, } - sp, err := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg, withTickerFrequency(time.Millisecond)) + sp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg, withTickerFrequency(time.Millisecond)) require.NoError(t, err) err = sp.Start(context.Background(), componenttest.NewNopHost()) @@ -313,7 +312,7 @@ func TestSequentialTraceMapSize(t *testing.T) { ExpectedNewTracesPerSec: 64, PolicyCfgs: testPolicy, } - sp, err := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg, withTickerFrequency(100*time.Millisecond)) + sp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg, withTickerFrequency(100*time.Millisecond)) require.NoError(t, err) err = sp.Start(context.Background(), componenttest.NewNopHost()) @@ -347,7 +346,7 @@ func TestConcurrentTraceMapSize(t *testing.T) { ExpectedNewTracesPerSec: 64, PolicyCfgs: testPolicy, } - sp, _ := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg, withTickerFrequency(100*time.Millisecond)) + sp, _ := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg, withTickerFrequency(100*time.Millisecond)) require.NoError(t, sp.Start(context.Background(), componenttest.NewNopHost())) defer func() { require.NoError(t, sp.Shutdown(context.Background())) @@ -387,12 +386,10 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) { }, }, } - s := setupTestTelemetry() - ct := s.NewSettings().TelemetrySettings idb := newSyncIDBatcher() msp := new(consumertest.TracesSink) - p, err := newTracesProcessor(context.Background(), ct, msp, cfg, withDecisionBatcher(idb)) + p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), msp, cfg, withDecisionBatcher(idb)) require.NoError(t, err) require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) @@ -449,8 +446,7 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) { func TestSubSecondDecisionTime(t *testing.T) { // prepare msp := new(consumertest.TracesSink) - - tsp, err := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), msp, Config{ + tsp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), msp, Config{ DecisionWait: 500 * time.Millisecond, NumTraces: defaultNumTraces, PolicyCfgs: testPolicy, @@ -501,7 +497,6 @@ func TestPolicyLoggerAddsPolicyName(t *testing.T) { func TestDuplicatePolicyName(t *testing.T) { // prepare - set := componenttest.NewNopTelemetrySettings() msp := new(consumertest.TracesSink) alwaysSample := sharedPolicyCfg{ @@ -509,7 +504,7 @@ func TestDuplicatePolicyName(t *testing.T) { Type: AlwaysSample, } - _, err := newTracesProcessor(context.Background(), set, msp, Config{ + _, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), msp, Config{ DecisionWait: defaultTestDecisionWait, NumTraces: defaultNumTraces, PolicyCfgs: []PolicyCfg{