Skip to content

Commit

Permalink
feat(#90): adds support for finished span handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
jcchavezs committed Jul 13, 2019
1 parent 55065cd commit a1410cd
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 8 deletions.
75 changes: 69 additions & 6 deletions span_implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
type spanImpl struct {
mtx sync.RWMutex
model.SpanModel
tracer *Tracer
mustCollect int32 // used as atomic bool (1 = true, 0 = false)
flushOnFinish bool
tracer *Tracer
mustCollect int32 // used as atomic bool (1 = true, 0 = false)
flushOnFinish bool
finishedSpanHandler func(*model.SpanModel) bool
}

func (s *spanImpl) Context() model.SpanContext {
Expand Down Expand Up @@ -77,21 +78,83 @@ func (s *spanImpl) Tag(key, value string) {
}

func (s *spanImpl) Finish() {
d := time.Since(s.Timestamp)
if atomic.CompareAndSwapInt32(&s.mustCollect, 1, 0) {
s.Duration = time.Since(s.Timestamp)
if s.flushOnFinish {
s.mtx.Lock()
s.Duration = d
s.mtx.Unlock()

shouldRecord := true
if s.finishedSpanHandler != nil {
shouldRecord = s.finishedSpanHandler(&s.SpanModel)
}

if shouldRecord && s.flushOnFinish {
s.tracer.reporter.Send(s.SpanModel)
}
return
}

var hasDuration bool
s.mtx.Lock()
hasDuration = s.Duration == 0
s.mtx.Unlock()

if hasDuration {
// it was not meant to be recorded because the CompareAndSwap
// did not happen (meaning that s.mustCollect is 0 at this moment)
// and duration is still zero value.
s.mtx.Lock()
s.Duration = d
s.mtx.Unlock()

shouldRecord := false
if s.finishedSpanHandler != nil {
shouldRecord = s.finishedSpanHandler(&s.SpanModel)
}

if shouldRecord && s.flushOnFinish {
s.tracer.reporter.Send(s.SpanModel)
}
} // else the span is being finished concurrently by another goroutine
}

func (s *spanImpl) FinishedWithDuration(d time.Duration) {
if atomic.CompareAndSwapInt32(&s.mustCollect, 1, 0) {
s.mtx.Lock()
s.Duration = d
if s.flushOnFinish {
s.mtx.Unlock()

shouldRecord := true
if s.finishedSpanHandler != nil {
shouldRecord = s.finishedSpanHandler(&s.SpanModel)
}

if shouldRecord && s.flushOnFinish {
s.tracer.reporter.Send(s.SpanModel)
}
return
}

var hasDuration bool
s.mtx.Lock()
hasDuration = s.Duration == 0
s.mtx.Unlock()

if hasDuration {
// it was not meant to be recorded because the CompareAndSwap
// did not happen (meaning that s.mustCollect is 0 at this moment)
// and duration is still zero value.
s.Duration = d
shouldRecord := false
if s.finishedSpanHandler != nil {
shouldRecord = s.finishedSpanHandler(&s.SpanModel)
}

if shouldRecord && s.flushOnFinish {
s.tracer.reporter.Send(s.SpanModel)
}
} // else the span is being finished concurrently by another goroutine
}

func (s *spanImpl) Flush() {
Expand Down
6 changes: 4 additions & 2 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Tracer struct {
noop int32 // used as atomic bool (1 = true, 0 = false)
sharedSpans bool
unsampledNoop bool
finishedSpanHandler func(*model.SpanModel) bool
}

// NewTracer returns a new Zipkin Tracer.
Expand Down Expand Up @@ -93,8 +94,9 @@ func (t *Tracer) StartSpan(name string, options ...SpanOption) Span {
Annotations: make([]model.Annotation, 0),
Tags: make(map[string]string),
},
flushOnFinish: true,
tracer: t,
flushOnFinish: true,
tracer: t,
finishedSpanHandler: t.finishedSpanHandler,
}

// add default tracer tags to span
Expand Down
10 changes: 10 additions & 0 deletions tracer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,13 @@ func WithNoopTracer(tracerNoop bool) TracerOption {
return nil
}
}

// WithFinishedSpanHandler if set, can mutate all span data and decide if a span
// should be recorded or not. E.g. user could decide to sample all requests having
// an error tag set or duration over certain value.
func WithFinishedSpanHandler(handler func(*model.SpanModel) bool) TracerOption {
return func(o *Tracer) error {
o.finishedSpanHandler = handler
return nil
}
}
48 changes: 48 additions & 0 deletions tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/openzipkin/zipkin-go/reporter/recorder"

"github.com/openzipkin/zipkin-go/idgenerator"
"github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/reporter"
Expand Down Expand Up @@ -808,3 +810,49 @@ func TestLocalEndpoint(t *testing.T) {
t.Errorf("IPv6 endpoint want %+v, have %+v", want.IPv6, have.IPv6)
}
}

func TestFinishedSpanHandlerAvoidsReporting(t *testing.T) {
rep := recorder.NewReporter()
defer rep.Close()

tracer, _ := NewTracer(
rep,
WithNoopSpan(false),
WithSampler(AlwaysSample),
WithFinishedSpanHandler(func(s *model.SpanModel) bool {
return false
}),
)
sp := tracer.StartSpan("test")
sp.Finish()

if want, have := 0, len(rep.Flush()); want != have {
t.Errorf("unexpected number of spans, want: %d, have: %d", want, have)
}
}

func TestFinishedSpanAddsTagsToSpan(t *testing.T) {
rep := recorder.NewReporter()
defer rep.Close()

tracer, _ := NewTracer(
rep,
WithNoopSpan(false),
WithSampler(AlwaysSample),
WithFinishedSpanHandler(func(s *model.SpanModel) bool {
s.Tags["my_key"] = "my_value"
return true
}),
)
sp := tracer.StartSpan("test")
sp.Finish()

repSans := rep.Flush()
if want, have := 1, len(repSans); want != have {
t.Errorf("unexpected number of spans, want: %d, have: %d", want, have)
}

if want, have := "my_value", repSans[0].Tags["my_key"]; want != have {
t.Errorf("unexpected number of spans, want: %q, have: %q", want, have)
}
}

0 comments on commit a1410cd

Please sign in to comment.