Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitoring platform events with zipkin-go and opentracing #50

Merged
merged 5 commits into from
Jun 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package zipkintracer

import (
opentracing "github.com/opentracing/opentracing-go"
otobserver "github.com/opentracing-contrib/go-observer"
)

// observer is a dispatcher to other observers
type observer struct {
observers []otobserver.Observer
}

// spanObserver is a dispatcher to other span observers
type spanObserver struct {
observers []otobserver.SpanObserver
}

func (o observer) OnStartSpan(sp opentracing.Span, operationName string, options opentracing.StartSpanOptions) (otobserver.SpanObserver, bool) {
var spanObservers []otobserver.SpanObserver
for _, obs := range o.observers {
spanObs, ok := obs.OnStartSpan(sp, operationName, options)
if ok {
if spanObservers == nil {
spanObservers = make([]otobserver.SpanObserver, 0, len(o.observers))
}
spanObservers = append(spanObservers, spanObs)
}
}
if len(spanObservers) == 0 {
return nil, false
}

return spanObserver{observers: spanObservers}, true
}

func (o spanObserver) OnSetOperationName(operationName string) {
for _, obs := range o.observers {
obs.OnSetOperationName(operationName)
}
}

func (o spanObserver) OnSetTag(key string, value interface{}) {
for _, obs := range o.observers {
obs.OnSetTag(key, value)
}
}

func (o spanObserver) OnFinish(options opentracing.FinishOptions) {
for _, obs := range o.observers {
obs.OnFinish(options)
}
}
13 changes: 13 additions & 0 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/opentracing/opentracing-go/log"

"github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore"
otobserver "github.com/opentracing-contrib/go-observer"
)

// Span provides access to the essential details of the span, for use
Expand All @@ -29,6 +30,7 @@ type Span interface {
type spanImpl struct {
tracer *tracerImpl
event func(SpanEvent)
observer otobserver.SpanObserver
sync.Mutex // protects the fields below
raw RawSpan
// The number of logs dropped because of MaxLogsPerSpan.
Expand Down Expand Up @@ -63,6 +65,9 @@ func (s *spanImpl) reset() {
}

func (s *spanImpl) SetOperationName(operationName string) opentracing.Span {
if s.observer != nil {
s.observer.OnSetOperationName(operationName)
}
s.Lock()
defer s.Unlock()
s.raw.Operation = operationName
Expand All @@ -75,6 +80,10 @@ func (s *spanImpl) trim() bool {

func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span {
defer s.onTag(key, value)
if s.observer != nil {
s.observer.OnSetTag(key, value)
}

s.Lock()
defer s.Unlock()
if key == string(ext.SamplingPriority) {
Expand Down Expand Up @@ -190,6 +199,10 @@ func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) {
}
duration := finishTime.Sub(s.raw.Start)

if s.observer != nil {
s.observer.OnFinish(opts)
}

s.Lock()
defer s.Unlock()

Expand Down
18 changes: 18 additions & 0 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/opentracing/opentracing-go/ext"

"github.com/openzipkin/zipkin-go-opentracing/flag"
otobserver "github.com/opentracing-contrib/go-observer"
)

// ErrInvalidEndpoint will be thrown if hostPort parameter is corrupted or host
Expand Down Expand Up @@ -107,6 +108,8 @@ type TracerOptions struct {
// Regardless of this setting, the library will propagate and support both
// 64 and 128 bit incoming traces from upstream sources.
traceID128Bit bool

observer otobserver.Observer
}

// TracerOption allows for functional options.
Expand Down Expand Up @@ -228,6 +231,7 @@ func NewTracer(recorder SpanRecorder, options ...TracerOption) (opentracing.Trac
debugMode: false,
traceID128Bit: false,
maxLogsPerSpan: 10000,
observer: nil,
}
for _, o := range options {
err := o(opts)
Expand Down Expand Up @@ -286,6 +290,11 @@ func (t *tracerImpl) startSpanWithOptions(
// Build the new span. This is the only allocation: We'll return this as
// an opentracing.Span.
sp := t.getSpan()

if t.options.observer != nil {
sp.observer, _ = t.options.observer.OnStartSpan(sp, operationName, opts)
}

// Look for a parent in the list of References.
//
// TODO: would be nice if basictracer did something with all
Expand Down Expand Up @@ -376,6 +385,7 @@ func (t *tracerImpl) startSpanInternal(
sp.raw.Start = startTime
sp.raw.Duration = -1
sp.raw.Tags = tags

if t.options.debugAssertSingleGoroutine {
sp.SetTag(debugGoroutineIDTag, curGoroutineID())
}
Expand Down Expand Up @@ -417,3 +427,11 @@ func (t *tracerImpl) Extract(format interface{}, carrier interface{}) (opentraci
func (t *tracerImpl) Options() TracerOptions {
return t.options
}

// WithObserver assigns an initialized observer to opts.observer
func WithObserver(observer otobserver.Observer) TracerOption {
return func(opts *TracerOptions) error {
opts.observer = observer
return nil
}
}