Store sampling.probability in sampled span attributes #2215

Closed

@@ -20,6 +20,10 @@ The following configuration options can be modified:
- `hash_seed` (no default): An integer used to seed the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same hash_seed.
- `sampling_percentage` (default = 0): Percentage at which traces are sampled; >= 100 samples all traces

Sampled spans get a [`sampling.probability`](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/sdk.md#sampling)
attribute whose value lies in the range `(0, 1.0]` and represents the probability with which the record was sampled.
If the span already carries this attribute from an earlier sampling stage, the existing value is multiplied by this
processor's sampling probability. For example, a span arriving with `sampling.probability` of `0.5` that passes through
a processor configured with `sampling_percentage: 20` ends up with `sampling.probability` of `0.1`.

Examples:

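A minimal configuration sketch for reference (the `probabilistic_sampler` processor key and the values shown are illustrative assumptions, not taken from this PR):

```yaml
processors:
  probabilistic_sampler:
    # All collectors behind the same load balancer should share this seed (example value).
    hash_seed: 22
    # Sample roughly 15.3% of traces.
    sampling_percentage: 15.3
```

With a configuration like this, the change in this PR would set `sampling.probability` to `0.153` on every sampled span, or multiply an already-present `sampling.probability` value by `0.153`.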
@@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
)

// samplingPriority has the semantic result of parsing the "sampling.priority"
@@ -49,9 +50,10 @@
)

type tracesamplerprocessor struct {
nextConsumer consumer.TracesConsumer
scaledSamplingRate uint32
+ samplingProbability float64
hashSeed uint32
}

// newTraceProcessor returns a processor.TracesProcessor that will perform head sampling according to the given
@@ -64,8 +66,9 @@ func newTraceProcessor(nextConsumer consumer.TracesConsumer, cfg Config) (compon
return &tracesamplerprocessor{
nextConsumer: nextConsumer,
// Adjust sampling percentage on private so recalculations are avoided.
scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor),
+ samplingProbability: float64(cfg.SamplingPercentage) * 0.01,
hashSeed: cfg.HashSeed,
}, nil
}

@@ -82,6 +85,16 @@ func (tsp *tracesamplerprocessor) ConsumeTraces(ctx context.Context, td pdata.Tr
return tsp.nextConsumer.ConsumeTraces(ctx, sampledTraceData)
}

func (tsp *tracesamplerprocessor) updateSamplingProbability(sampledSpanAttributes pdata.AttributeMap) {
samplingProbability := tsp.samplingProbability
attr, found := sampledSpanAttributes.Get(conventions.AttributeSamplingProbability)
if found && attr.Type() == pdata.AttributeValueDOUBLE {
samplingProbability *= attr.DoubleVal()

Member (inline review comment on the line above): Is this really a multiplication here? I think it depends on the algorithm used; it may be a min or a multiplication.

}

sampledSpanAttributes.UpsertDouble(conventions.AttributeSamplingProbability, samplingProbability)
}

func (tsp *tracesamplerprocessor) processTraces(resourceSpans pdata.ResourceSpans, sampledTraceData pdata.Traces) {
scaledSamplingRate := tsp.scaledSamplingRate

@@ -115,6 +128,7 @@ func (tsp *tracesamplerprocessor) processTraces(resourceSpans pdata.ResourceSpan
hash(tidBytes[:], tsp.hashSeed)&bitMaskHashBuckets < scaledSamplingRate

if sampled {
tsp.updateSamplingProbability(span.Attributes())
spns.Append(span)
}
}
@@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
)

@@ -71,6 +72,7 @@ func TestNewTraceProcessor(t *testing.T) {
if !tt.wantErr {
// The truncation below with uint32 cannot be defined at initialization (compiler error), performing it at runtime.
tt.want.(*tracesamplerprocessor).scaledSamplingRate = uint32(tt.cfg.SamplingPercentage * percentageScaleFactor)
tt.want.(*tracesamplerprocessor).samplingProbability = float64(tt.cfg.SamplingPercentage) * 0.01
}
got, err := newTraceProcessor(tt.nextConsumer, tt.cfg)
if (err != nil) != tt.wantErr {
@@ -227,15 +229,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange_MultipleResourceSpans(t

// Test_tracesamplerprocessor_SpanSamplingPriority checks if handling of "sampling.priority" is correct.
func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
- singleSpanWithAttrib := func(key string, attribValue pdata.AttributeValue) pdata.Traces {
- traces := pdata.NewTraces()
- traces.ResourceSpans().Resize(1)
- rs := traces.ResourceSpans().At(0)
- rs.InstrumentationLibrarySpans().Resize(1)
- instrLibrarySpans := rs.InstrumentationLibrarySpans().At(0)
- instrLibrarySpans.Spans().Append(getSpanWithAttributes(key, attribValue))
- return traces
- }

tests := []struct {
name string
cfg Config
@@ -247,7 +241,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 0.0,
},
- td: singleSpanWithAttrib(
+ td: getTracesWithSpanWithAttribute(
"sampling.priority",
pdata.NewAttributeValueInt(2)),
sampled: true,
@@ -257,7 +251,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 0.0,
},
- td: singleSpanWithAttrib(
+ td: getTracesWithSpanWithAttribute(
"sampling.priority",
pdata.NewAttributeValueDouble(1)),
sampled: true,
@@ -267,7 +261,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 0.0,
},
- td: singleSpanWithAttrib(
+ td: getTracesWithSpanWithAttribute(
"sampling.priority",
pdata.NewAttributeValueString("1")),
sampled: true,
@@ -277,7 +271,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 100.0,
},
- td: singleSpanWithAttrib(
+ td: getTracesWithSpanWithAttribute(
"sampling.priority",
pdata.NewAttributeValueInt(0)),
},
@@ -286,7 +280,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 100.0,
},
- td: singleSpanWithAttrib(
+ td: getTracesWithSpanWithAttribute(
"sampling.priority",
pdata.NewAttributeValueDouble(0)),
},
@@ -295,7 +289,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 100.0,
},
- td: singleSpanWithAttrib(
+ td: getTracesWithSpanWithAttribute(
"sampling.priority",
pdata.NewAttributeValueString("0")),
},
@@ -304,7 +298,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 0.0,
},
- td: singleSpanWithAttrib(
+ td: getTracesWithSpanWithAttribute(
"no.sampling.priority",
pdata.NewAttributeValueInt(2)),
},
@@ -313,7 +307,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 100.0,
},
- td: singleSpanWithAttrib(
+ td: getTracesWithSpanWithAttribute(
"no.sampling.priority",
pdata.NewAttributeValueInt(2)),
sampled: true,
@@ -416,6 +410,59 @@ func Test_parseSpanSamplingPriority(t *testing.T) {
}
}

// Test_tracesamplerprocessor_SamplingProbabilityAttribute verifies that the attribute describing the current sampling probability is included in sampled spans
func Test_tracesamplerprocessor_SamplingProbabilityAttribute(t *testing.T) {
cfg := Config{
SamplingPercentage: 100.0,
}

tests := []struct {
name string
traces pdata.Traces
wantSamplingProbabilityAttribute pdata.AttributeValue
}{
{
name: "simple_span",
traces: getTracesWithSpanWithAttribute("foo", pdata.NewAttributeValueString("bar")),
wantSamplingProbabilityAttribute: pdata.NewAttributeValueDouble(1.0),
},
{
name: "span_came_through_sampler_already",
traces: getTracesWithSpanWithAttribute(conventions.AttributeSamplingProbability, pdata.NewAttributeValueDouble(0.01)),
wantSamplingProbabilityAttribute: pdata.NewAttributeValueDouble(0.01),
},
{
name: "simple_with_invalid_attribute_value",
traces: getTracesWithSpanWithAttribute(conventions.AttributeSamplingProbability, pdata.NewAttributeValueString("bar")),
wantSamplingProbabilityAttribute: pdata.NewAttributeValueDouble(1.0),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sink := new(consumertest.TracesSink)
tsp, err := newTraceProcessor(sink, cfg)
if err != nil {
t.Errorf("error when creating tracesamplerprocessor: %v", err)
return
}

if err := tsp.ConsumeTraces(context.Background(), tt.traces); err != nil {
t.Errorf("tracesamplerprocessor.ConsumeTraceData() error = %v", err)
return
}
assert.Equal(t, 1, sink.SpansCount())
for _, td := range sink.AllTraces() {
span := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0)
attrValue, found := span.Attributes().Get(conventions.AttributeSamplingProbability)
assert.True(t, found, "Sampling probability attribute not found")
assert.Equal(t, tt.wantSamplingProbabilityAttribute, attrValue)
}
sink.Reset()
})
}
}

func getSpanWithAttributes(key string, value pdata.AttributeValue) pdata.Span {
span := pdata.NewSpan()
span.InitEmpty()
@@ -424,6 +471,16 @@ func getSpanWithAttributes(key string, value pdata.AttributeValue) pdata.Span {
return span
}

func getTracesWithSpanWithAttribute(key string, attribValue pdata.AttributeValue) pdata.Traces {
traces := pdata.NewTraces()
traces.ResourceSpans().Resize(1)
rs := traces.ResourceSpans().At(0)
rs.InstrumentationLibrarySpans().Resize(1)
instrLibrarySpans := rs.InstrumentationLibrarySpans().At(0)
instrLibrarySpans.Spans().Append(getSpanWithAttributes(key, attribValue))
return traces
}

// Test_hash ensures that the hash function supports different key lengths even if in
// practice it is only expected to receive keys with length 16 (trace id length in OC proto).
func Test_hash(t *testing.T) {

translator/conventions/opentelemetry.go (1 change: 1 addition, 0 deletions)
@@ -61,6 +61,7 @@ const (
AttributeProcessExecutablePath = "process.executable.path"
AttributeProcessID = "process.pid"
AttributeProcessOwner = "process.owner"
AttributeSamplingProbability = "sampling.probability"
AttributeServiceInstance = "service.instance.id"
AttributeServiceName = "service.name"
AttributeServiceNamespace = "service.namespace"