Skip to content

Commit

Permalink
[processor/tailsampling] Duplicate policy names should yield an error (
Browse files Browse the repository at this point in the history
…#27017)

Fixes #27016

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>

---------

Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
Co-authored-by: Curtis Robert <92119472+crobert-1@users.noreply.github.com>
  • Loading branch information
jpkrohling and crobert-1 authored Sep 25, 2023
1 parent b7ea607 commit c6c14b2
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 6 deletions.
27 changes: 27 additions & 0 deletions .chloggen/27016-duplicate-policy-names.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'bug_fix'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/tailsampling

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Prevent the tail-sampling processor from accepting duplicate policy names

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27016]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
21 changes: 15 additions & 6 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,16 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
return nil, component.ErrNilNextConsumer
}

numDecisionBatches := math.Max(1, cfg.DecisionWait.Seconds())
inBatcher, err := idbatcher.New(uint64(numDecisionBatches), cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU()))
if err != nil {
return nil, err
}

policyNames := map[string]bool{}
policies := make([]*policy, len(cfg.PolicyCfgs))
for i := range cfg.PolicyCfgs {
policyCfg := &cfg.PolicyCfgs[i]

if policyNames[policyCfg.Name] {
return nil, fmt.Errorf("duplicate policy name %q", policyCfg.Name)
}
policyNames[policyCfg.Name] = true

policyCtx, err := tag.New(ctx, tag.Upsert(tagPolicyKey, policyCfg.Name), tag.Upsert(tagSourceFormat, sourceFormat))
if err != nil {
return nil, err
Expand All @@ -97,6 +98,14 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
policies[i] = p
}

// this will start a goroutine in the background, so we run it only if everything went
// well in creating the policies
numDecisionBatches := math.Max(1, cfg.DecisionWait.Seconds())
inBatcher, err := idbatcher.New(uint64(numDecisionBatches), cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU()))
if err != nil {
return nil, err
}

tsp := &tailSamplingSpanProcessor{
ctx: ctx,
nextConsumer: nextConsumer,
Expand Down
23 changes: 23 additions & 0 deletions processor/tailsamplingprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,29 @@ func TestPolicyLoggerAddsPolicyName(t *testing.T) {
assert.Equal(t, AlwaysSample, logs.All()[0].ContextMap()["policy"])
}

func TestDuplicatePolicyName(t *testing.T) {
// prepare
set := componenttest.NewNopTelemetrySettings()
msp := new(consumertest.TracesSink)

alwaysSample := sharedPolicyCfg{
Name: "always_sample",
Type: AlwaysSample,
}

_, err := newTracesProcessor(context.Background(), set, msp, Config{
DecisionWait: 500 * time.Millisecond,
NumTraces: uint64(50000),
PolicyCfgs: []PolicyCfg{
{sharedPolicyCfg: alwaysSample},
{sharedPolicyCfg: alwaysSample},
},
})

// verify
assert.Equal(t, err, errors.New(`duplicate policy name "always_sample"`))
}

func collectSpanIds(trace ptrace.Traces) []pcommon.SpanID {
var spanIDs []pcommon.SpanID

Expand Down

0 comments on commit c6c14b2

Please sign in to comment.