Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

All spans of a trace share sampling state #443

Merged
merged 8 commits into from
Oct 11, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

102 changes: 81 additions & 21 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ package jaeger
import (
"errors"
"fmt"
"go.uber.org/atomic"
"strconv"
"strings"
)

const (
flagSampled = byte(1)
flagDebug = byte(2)
flagFirehose = byte(8)
flagUnsampled = 0

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flagUnsampled is unused (from varcheck)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flagUnsampled is unused (from deadcode)

flagSampled = 1
flagDebug = 2
flagFirehose = 8
)

var (
Expand Down Expand Up @@ -56,9 +58,6 @@ type SpanContext struct {
// Should be 0 if the current span is a root span.
parentID SpanID

// flags is a bitmap containing such bits as 'sampled' and 'debug'.
flags byte

// Distributed Context baggage. The is a snapshot in time.
baggage map[string]string

Expand All @@ -67,6 +66,65 @@ type SpanContext struct {
//
// See JaegerDebugHeader in constants.go
debugID string

// samplingState is shared across all spans
samplingState *samplingState
}

type samplingState struct {
_flags atomic.Int32 // Only lower 8 bits are used. We use an int32 instead of a byte to use CAS operations
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we never use underscores for field names, what's the reason to start now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used it to avoid a conflict with flags(), but we can rename to ctxFlags or sth else

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oic. Usually collisions are avoided by making methods public, like Flags.

Let's keep the underscore.

}

func (s *samplingState) setFlag(newFlag int32) {
swapped := false
for !swapped {
old := s._flags.Load()
swapped = s._flags.CAS(old, old|newFlag)
}
}

func (s *samplingState) resetFlag(newFlag int32) {
vprithvi marked this conversation as resolved.
Show resolved Hide resolved
swapped := false
for !swapped {
old := s._flags.Load()
swapped = s._flags.CAS(old, old&^newFlag)
}
}

func (s *samplingState) setSampled() {
s.setFlag(flagSampled)
}

func (s *samplingState) resetSampled() {
s.resetFlag(flagSampled)
}

func (s *samplingState) setDebugAndSampled() {
s.setFlag(flagDebug | flagSampled)
}

func (s *samplingState) setFirehose() {
s.setFlag(flagFirehose)
}

func (s *samplingState) setFlags(flags byte) {
s._flags.Store(int32(flags))
}

func (s *samplingState) flags() byte {
return byte(s._flags.Load())
}

func (s *samplingState) isSampled() bool {
return s._flags.Load()&flagSampled == flagSampled
}

func (s *samplingState) isDebug() bool {
return s._flags.Load()&flagDebug == flagDebug
}

func (s *samplingState) isFirehose() bool {
return s._flags.Load()&flagFirehose == flagFirehose
}

// ForeachBaggageItem implements ForeachBaggageItem() of opentracing.SpanContext
Expand All @@ -81,17 +139,17 @@ func (c SpanContext) ForeachBaggageItem(handler func(k, v string) bool) {
// IsSampled returns whether this trace was chosen for permanent storage
// by the sampling mechanism of the tracer.
func (c SpanContext) IsSampled() bool {
return (c.flags & flagSampled) == flagSampled
return c.samplingState.isSampled()
}

// IsDebug indicates whether sampling was explicitly requested by the service.
func (c SpanContext) IsDebug() bool {
return (c.flags & flagDebug) == flagDebug
return c.samplingState.isDebug()
}

// IsFirehose indicates whether the firehose flag was set
func (c SpanContext) IsFirehose() bool {
return (c.flags & flagFirehose) == flagFirehose
return c.samplingState.isFirehose()
}

// IsValid indicates whether this context actually represents a valid trace.
Expand All @@ -101,9 +159,9 @@ func (c SpanContext) IsValid() bool {

func (c SpanContext) String() string {
if c.traceID.High == 0 {
return fmt.Sprintf("%x:%x:%x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.flags)
return fmt.Sprintf("%x:%x:%x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState._flags.Load())
}
return fmt.Sprintf("%x%016x:%x:%x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.flags)
return fmt.Sprintf("%x%016x:%x:%x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState._flags.Load())
}

// ContextFromString reconstructs the Context encoded in a string
Expand All @@ -130,7 +188,8 @@ func ContextFromString(value string) (SpanContext, error) {
if err != nil {
return emptyContext, err
}
context.flags = byte(flags)
context.samplingState = &samplingState{}
context.samplingState.setFlags(byte(flags))
return context, nil
}

Expand All @@ -151,16 +210,17 @@ func (c SpanContext) ParentID() SpanID {

// NewSpanContext creates a new instance of SpanContext
func NewSpanContext(traceID TraceID, spanID, parentID SpanID, sampled bool, baggage map[string]string) SpanContext {
flags := byte(0)
samplingState := &samplingState{}
if sampled {
flags = flagSampled
samplingState.setSampled()
}

return SpanContext{
traceID: traceID,
spanID: spanID,
parentID: parentID,
flags: flags,
baggage: baggage}
traceID: traceID,
spanID: spanID,
parentID: parentID,
samplingState: samplingState,
baggage: baggage}
}

// CopyFrom copies data from ctx into this context, including span identity and baggage.
Expand All @@ -169,7 +229,7 @@ func (c *SpanContext) CopyFrom(ctx *SpanContext) {
c.traceID = ctx.traceID
c.spanID = ctx.spanID
c.parentID = ctx.parentID
c.flags = ctx.flags
c.samplingState = ctx.samplingState
if l := len(ctx.baggage); l > 0 {
c.baggage = make(map[string]string, l)
for k, v := range ctx.baggage {
Expand All @@ -193,7 +253,7 @@ func (c SpanContext) WithBaggageItem(key, value string) SpanContext {
newBaggage[key] = value
}
// Use positional parameters so the compiler will help catch new fields.
return SpanContext{c.traceID, c.spanID, c.parentID, c.flags, newBaggage, ""}
return SpanContext{c.traceID, c.spanID, c.parentID, newBaggage, "", c.samplingState}
}

// isDebugIDContainerOnly returns true when the instance of the context is only
Expand Down
4 changes: 2 additions & 2 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ func TestContextFromString(t *testing.T) {
assert.EqualValues(t, TraceID{Low: 1}, ctx.traceID)
assert.EqualValues(t, 1, ctx.spanID)
assert.EqualValues(t, 1, ctx.parentID)
assert.EqualValues(t, 1, ctx.flags)
assert.True(t, ctx.samplingState.isSampled())
ctx = NewSpanContext(TraceID{Low: 1}, 1, 1, true, nil)
assert.EqualValues(t, TraceID{Low: 1}, ctx.traceID)
assert.EqualValues(t, 1, ctx.spanID)
assert.EqualValues(t, 1, ctx.parentID)
assert.EqualValues(t, 1, ctx.flags)
assert.True(t, ctx.samplingState.isSampled())
assert.Equal(t, "ff", SpanID(255).String())
assert.Equal(t, "ff", TraceID{Low: 255}.String())
assert.Equal(t, "ff00000000000000ff", TraceID{High: 255, Low: 255}.String())
Expand Down
2 changes: 1 addition & 1 deletion jaeger_thrift_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func BuildJaegerThrift(span *Span) *j.Span {
SpanId: int64(span.context.spanID),
ParentSpanId: int64(span.context.parentID),
OperationName: span.operationName,
Flags: int32(span.context.flags),
Flags: int32(span.context.samplingState.flags()),
StartTime: startTime,
Duration: duration,
Tags: buildTags(span.tags, span.tracer.options.maxTagValueLength),
Expand Down
8 changes: 6 additions & 2 deletions propagation.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (p *BinaryPropagator) Inject(
if err := binary.Write(carrier, binary.BigEndian, sc.parentID); err != nil {
return err
}
if err := binary.Write(carrier, binary.BigEndian, sc.flags); err != nil {
if err := binary.Write(carrier, binary.BigEndian, sc.samplingState.flags()); err != nil {
return err
}

Expand Down Expand Up @@ -222,6 +222,7 @@ func (p *BinaryPropagator) Extract(abstractCarrier interface{}) (SpanContext, er
return emptyContext, opentracing.ErrInvalidCarrier
}
var ctx SpanContext
ctx.samplingState = &samplingState{}

if err := binary.Read(carrier, binary.BigEndian, &ctx.traceID); err != nil {
return emptyContext, opentracing.ErrSpanContextCorrupted
Expand All @@ -232,9 +233,12 @@ func (p *BinaryPropagator) Extract(abstractCarrier interface{}) (SpanContext, er
if err := binary.Read(carrier, binary.BigEndian, &ctx.parentID); err != nil {
return emptyContext, opentracing.ErrSpanContextCorrupted
}
if err := binary.Read(carrier, binary.BigEndian, &ctx.flags); err != nil {

var flags byte
if err := binary.Read(carrier, binary.BigEndian, &flags); err != nil {
return emptyContext, opentracing.ErrSpanContextCorrupted
}
ctx.samplingState.setFlags(flags)

// Handle the baggage items
var numBaggage int32
Expand Down
12 changes: 5 additions & 7 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,17 +306,15 @@ func setSamplingPriority(s *Span, value interface{}) bool {
if !ok {
return false
}
s.Lock()
defer s.Unlock()
if val == 0 {
s.context.flags = s.context.flags & (^flagSampled)
// TODO: Add test to verify that only sampled is reset
s.context.samplingState.resetSampled()
return true
}
if s.tracer.options.noDebugFlagOnForcedSampling {
s.context.flags = s.context.flags | flagSampled
return true
s.context.samplingState.setSampled()
} else if s.tracer.isDebugAllowed(s.operationName) {
s.context.flags = s.context.flags | flagDebug | flagSampled
s.context.samplingState.setDebugAndSampled()
return true
}
return false
Expand All @@ -326,5 +324,5 @@ func setSamplingPriority(s *Span, value interface{}) bool {
func EnableFirehose(s *Span) {
s.Lock()
defer s.Unlock()
s.context.flags |= flagFirehose
s.context.samplingState.setFirehose()
}
8 changes: 4 additions & 4 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,12 @@ func (t *Tracer) startSpanWithOptions(
}
ctx.spanID = SpanID(ctx.traceID.Low)
ctx.parentID = 0
ctx.flags = byte(0)
ctx.samplingState = &samplingState{}
if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) {
ctx.flags |= (flagSampled | flagDebug)
ctx.samplingState.setDebugAndSampled()
samplerTags = []Tag{{key: JaegerDebugHeader, value: parent.debugID}}
} else if sampled, tags := t.sampler.IsSampled(ctx.traceID, operationName); sampled {
ctx.flags |= flagSampled
ctx.samplingState.setSampled()
samplerTags = tags
}
} else {
Expand All @@ -290,7 +290,7 @@ func (t *Tracer) startSpanWithOptions(
ctx.spanID = SpanID(t.randomID())
ctx.parentID = parent.spanID
}
ctx.flags = parent.flags
ctx.samplingState = parent.samplingState
}
if hasParent {
// copy baggage items
Expand Down
6 changes: 2 additions & 4 deletions tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,11 @@ func (s *tracerSuite) TestSetOperationName() {
func (s *tracerSuite) TestSamplerEffects() {
s.tracer.(*Tracer).sampler = NewConstSampler(true)
sp := s.tracer.StartSpan("test")
flags := sp.(*Span).context.flags
s.EqualValues(flagSampled, flags&flagSampled)
s.True(sp.(*Span).context.IsSampled())

s.tracer.(*Tracer).sampler = NewConstSampler(false)
sp = s.tracer.StartSpan("test")
flags = sp.(*Span).context.flags
s.EqualValues(0, flags&flagSampled)
s.False(sp.(*Span).context.IsSampled())
}

func (s *tracerSuite) TestRandomIDNotZero() {
Expand Down
4 changes: 4 additions & 0 deletions transport_udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestEmitBatchOverhead(t *testing.T) {
client := j.NewAgentClientFactory(transport, protocolFactory)

span := &Span{operationName: "test-span", tracer: jaegerTracer}
span.context.samplingState = &samplingState{}
vprithvi marked this conversation as resolved.
Show resolved Hide resolved
spanSize := getThriftSpanByteLength(t, span)

tests := []int{1, 2, 14, 15, 377, 500, 65000, 0xFFFF}
Expand Down Expand Up @@ -88,6 +89,7 @@ func TestUDPSenderFlush(t *testing.T) {
defer agent.Close()

span := &Span{operationName: "test-span", tracer: jaegerTracer}
span.context.samplingState = &samplingState{}
spanSize := getThriftSpanByteLength(t, span)
processSize := getThriftProcessByteLengthFromTracer(t, jaegerTracer)

Expand Down Expand Up @@ -134,6 +136,7 @@ func TestUDPSenderAppend(t *testing.T) {
defer agent.Close()

span := &Span{operationName: "test-span", tracer: jaegerTracer}
span.context.samplingState = &samplingState{}
spanSize := getThriftSpanByteLength(t, span)
processSize := getThriftProcessByteLengthFromTracer(t, jaegerTracer)

Expand Down Expand Up @@ -209,6 +212,7 @@ func TestUDPSenderHugeSpan(t *testing.T) {
defer agent.Close()

span := &Span{operationName: "test-span", tracer: jaegerTracer}
span.context.samplingState = &samplingState{}
spanSize := getThriftSpanByteLength(t, span)

sender, err := NewUDPTransport(agent.SpanServerAddr(), spanSize/2+emitBatchOverhead)
Expand Down
5 changes: 3 additions & 2 deletions zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (p *zipkinPropagator) Inject(
carrier.SetTraceID(ctx.TraceID().Low) // TODO this cannot work with 128bit IDs
carrier.SetSpanID(uint64(ctx.SpanID()))
carrier.SetParentID(uint64(ctx.ParentID()))
carrier.SetFlags(ctx.flags)
carrier.SetFlags(ctx.samplingState.flags())
return nil
}

Expand All @@ -71,6 +71,7 @@ func (p *zipkinPropagator) Extract(abstractCarrier interface{}) (SpanContext, er
ctx.traceID.Low = carrier.TraceID()
ctx.spanID = SpanID(carrier.SpanID())
ctx.parentID = SpanID(carrier.ParentID())
ctx.flags = carrier.Flags()
ctx.samplingState = &samplingState{}
ctx.samplingState.setFlags(carrier.Flags())
return ctx, nil
}
4 changes: 2 additions & 2 deletions zipkin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestZipkinPropagator(t *testing.T) {
assert.Equal(t, sp1.context.traceID, TraceID{Low: carrier.traceID})
assert.Equal(t, sp1.context.spanID, SpanID(carrier.spanID))
assert.Equal(t, sp1.context.parentID, SpanID(carrier.parentID))
assert.Equal(t, sp1.context.flags, carrier.flags)
assert.Equal(t, sp1.context.samplingState.flags(), carrier.flags)

sp2ctx, err := tracer.Extract("zipkin-span-format", carrier)
if err != nil {
Expand All @@ -47,7 +47,7 @@ func TestZipkinPropagator(t *testing.T) {
assert.Equal(t, sp1.context.traceID, sp3.context.traceID)
assert.Equal(t, sp1.context.spanID, sp3.context.spanID)
assert.Equal(t, sp1.context.parentID, sp3.context.parentID)
assert.Equal(t, sp1.context.flags, sp3.context.flags)
assert.Equal(t, sp1.context.samplingState.flags(), sp3.context.samplingState.flags())
}

// TestZipkinSpan is a mock-up of TChannel's internal Span struct
Expand Down