Skip to content

Commit

Permalink
Merge pull request #50 from hkshaw1990/perfevent_support
Browse files Browse the repository at this point in the history
Monitoring platform events with zipkin-go and opentracing
  • Loading branch information
basvanbeek authored Jun 22, 2017
2 parents d88c901 + 7552a47 commit 1cafbdf
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 0 deletions.
52 changes: 52 additions & 0 deletions observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package zipkintracer

import (
opentracing "github.com/opentracing/opentracing-go"
otobserver "github.com/opentracing-contrib/go-observer"
)

// observer is a dispatcher to other observers
type observer struct {
observers []otobserver.Observer
}

// spanObserver is a dispatcher to other span observers
type spanObserver struct {
observers []otobserver.SpanObserver
}

func (o observer) OnStartSpan(sp opentracing.Span, operationName string, options opentracing.StartSpanOptions) (otobserver.SpanObserver, bool) {
var spanObservers []otobserver.SpanObserver
for _, obs := range o.observers {
spanObs, ok := obs.OnStartSpan(sp, operationName, options)
if ok {
if spanObservers == nil {
spanObservers = make([]otobserver.SpanObserver, 0, len(o.observers))
}
spanObservers = append(spanObservers, spanObs)
}
}
if len(spanObservers) == 0 {
return nil, false
}

return spanObserver{observers: spanObservers}, true
}

func (o spanObserver) OnSetOperationName(operationName string) {
for _, obs := range o.observers {
obs.OnSetOperationName(operationName)
}
}

func (o spanObserver) OnSetTag(key string, value interface{}) {
for _, obs := range o.observers {
obs.OnSetTag(key, value)
}
}

func (o spanObserver) OnFinish(options opentracing.FinishOptions) {
for _, obs := range o.observers {
obs.OnFinish(options)
}
}
13 changes: 13 additions & 0 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/opentracing/opentracing-go/log"

"github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore"
otobserver "github.com/opentracing-contrib/go-observer"
)

// Span provides access to the essential details of the span, for use
Expand All @@ -29,6 +30,7 @@ type Span interface {
type spanImpl struct {
tracer *tracerImpl
event func(SpanEvent)
observer otobserver.SpanObserver
sync.Mutex // protects the fields below
raw RawSpan
// The number of logs dropped because of MaxLogsPerSpan.
Expand Down Expand Up @@ -63,6 +65,9 @@ func (s *spanImpl) reset() {
}

func (s *spanImpl) SetOperationName(operationName string) opentracing.Span {
if s.observer != nil {
s.observer.OnSetOperationName(operationName)
}
s.Lock()
defer s.Unlock()
s.raw.Operation = operationName
Expand All @@ -75,6 +80,10 @@ func (s *spanImpl) trim() bool {

func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span {
defer s.onTag(key, value)
if s.observer != nil {
s.observer.OnSetTag(key, value)
}

s.Lock()
defer s.Unlock()
if key == string(ext.SamplingPriority) {
Expand Down Expand Up @@ -190,6 +199,10 @@ func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) {
}
duration := finishTime.Sub(s.raw.Start)

if s.observer != nil {
s.observer.OnFinish(opts)
}

s.Lock()
defer s.Unlock()

Expand Down
18 changes: 18 additions & 0 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/opentracing/opentracing-go/ext"

"github.com/openzipkin/zipkin-go-opentracing/flag"
otobserver "github.com/opentracing-contrib/go-observer"
)

// ErrInvalidEndpoint will be thrown if hostPort parameter is corrupted or host
Expand Down Expand Up @@ -107,6 +108,8 @@ type TracerOptions struct {
// Regardless of this setting, the library will propagate and support both
// 64 and 128 bit incoming traces from upstream sources.
traceID128Bit bool

observer otobserver.Observer
}

// TracerOption allows for functional options.
Expand Down Expand Up @@ -231,6 +234,7 @@ func NewTracer(recorder SpanRecorder, options ...TracerOption) (opentracing.Trac
debugMode: false,
traceID128Bit: false,
maxLogsPerSpan: 10000,
observer: nil,
}
for _, o := range options {
err := o(opts)
Expand Down Expand Up @@ -289,6 +293,11 @@ func (t *tracerImpl) startSpanWithOptions(
// Build the new span. This is the only allocation: We'll return this as
// an opentracing.Span.
sp := t.getSpan()

if t.options.observer != nil {
sp.observer, _ = t.options.observer.OnStartSpan(sp, operationName, opts)
}

// Look for a parent in the list of References.
//
// TODO: would be nice if basictracer did something with all
Expand Down Expand Up @@ -379,6 +388,7 @@ func (t *tracerImpl) startSpanInternal(
sp.raw.Start = startTime
sp.raw.Duration = -1
sp.raw.Tags = tags

if t.options.debugAssertSingleGoroutine {
sp.SetTag(debugGoroutineIDTag, curGoroutineID())
}
Expand Down Expand Up @@ -420,3 +430,11 @@ func (t *tracerImpl) Extract(format interface{}, carrier interface{}) (opentraci
func (t *tracerImpl) Options() TracerOptions {
return t.options
}

// WithObserver assigns an initialized observer to opts.observer
func WithObserver(observer otobserver.Observer) TracerOption {
return func(opts *TracerOptions) error {
opts.observer = observer
return nil
}
}

0 comments on commit 1cafbdf

Please sign in to comment.