Skip to content

Commit

Permalink
Merge pull request #13 from opentracing/bg/protobuf
Browse files Browse the repository at this point in the history
Add protocol buffers for binary injection/joining
  • Loading branch information
bg451 committed Mar 25, 2016
2 parents 8ac8cc7 + 9e6073e commit fe1253c
Show file tree
Hide file tree
Showing 6 changed files with 632 additions and 82 deletions.
114 changes: 32 additions & 82 deletions propagation_ot.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package basictracer

import (
"bytes"
"encoding/binary"
"io"
"strconv"
"strings"
"time"

"github.com/gogo/protobuf/proto"
"github.com/opentracing/basictracer-go/wire"
opentracing "github.com/opentracing/opentracing-go"
)

Expand Down Expand Up @@ -134,50 +135,26 @@ func (p *binaryPropagator) Inject(
if !ok {
return opentracing.ErrInvalidCarrier
}
var err error
var sampledByte byte
if sc.raw.Sampled {
sampledByte = 1
}

// Handle the trace and span ids, and sampled status.
err = binary.Write(carrier, binary.BigEndian, sc.raw.TraceID)
if err != nil {
return err
}
state := wire.TracerState{}
state.TraceId = sc.raw.TraceID
state.SpanId = sc.raw.SpanID
state.Sampled = sc.raw.Sampled
state.BaggageItems = sc.raw.Baggage

err = binary.Write(carrier, binary.BigEndian, sc.raw.SpanID)
b, err := proto.Marshal(&state)
if err != nil {
return err
}

err = binary.Write(carrier, binary.BigEndian, sampledByte)
if err != nil {
// Write the length of the marshalled binary to the writer.
length := uint32(len(b))
if err := binary.Write(carrier, binary.BigEndian, &length); err != nil {
return err
}

// Handle the baggage.
err = binary.Write(carrier, binary.BigEndian, int32(len(sc.raw.Baggage)))
if err != nil {
return err
}
for key, val := range sc.raw.Baggage {
if err = binary.Write(carrier, binary.BigEndian, int32(len(key))); err != nil {
return err
}
if _, err = io.WriteString(carrier, key); err != nil {
return err
}

if err = binary.Write(carrier, binary.BigEndian, int32(len(val))); err != nil {
return err
}
if _, err = io.WriteString(carrier, val); err != nil {
return err
}
}

return nil
_, err = carrier.Write(b)
return err
}

func (p *binaryPropagator) Join(
Expand All @@ -188,66 +165,39 @@ func (p *binaryPropagator) Join(
if !ok {
return nil, opentracing.ErrInvalidCarrier
}
// Handle the trace, span ids, and sampled status.
var traceID, propagatedSpanID int64
var sampledByte byte

if err := binary.Read(carrier, binary.BigEndian, &traceID); err != nil {
if err == io.EOF {
return nil, opentracing.ErrTraceNotFound
}
return nil, opentracing.ErrTraceCorrupted
}
if err := binary.Read(carrier, binary.BigEndian, &propagatedSpanID); err != nil {
// Read the length of marshalled binary. io.ReadAll isn't that performant
// since it keeps resizing the underlying buffer as it encounters more bytes
// to read. By reading the length, we can allocate a fixed sized buf and read
// the exact amount of bytes into it.
var length uint32
if err := binary.Read(carrier, binary.BigEndian, &length); err != nil {
return nil, opentracing.ErrTraceCorrupted
}
if err := binary.Read(carrier, binary.BigEndian, &sampledByte); err != nil {
return nil, opentracing.ErrTraceCorrupted
buf := make([]byte, length)
if n, err := carrier.Read(buf); err != nil {
if n > 0 {
return nil, opentracing.ErrTraceCorrupted
}
return nil, opentracing.ErrTraceNotFound
}

// Handle the baggage.
var numBaggage int32
if err := binary.Read(carrier, binary.BigEndian, &numBaggage); err != nil {
ctx := wire.TracerState{}
if err := proto.Unmarshal(buf, &ctx); err != nil {
return nil, opentracing.ErrTraceCorrupted
}
iNumBaggage := int(numBaggage)
var baggageMap map[string]string
if iNumBaggage > 0 {
var buf bytes.Buffer // TODO(tschottdorf): candidate for sync.Pool
baggageMap = make(map[string]string, iNumBaggage)
var keyLen, valLen int32
for i := 0; i < iNumBaggage; i++ {
if err := binary.Read(carrier, binary.BigEndian, &keyLen); err != nil {
return nil, opentracing.ErrTraceCorrupted
}
buf.Grow(int(keyLen))
if n, err := io.CopyN(&buf, carrier, int64(keyLen)); err != nil || int32(n) != keyLen {
return nil, opentracing.ErrTraceCorrupted
}
key := buf.String()
buf.Reset()

if err := binary.Read(carrier, binary.BigEndian, &valLen); err != nil {
return nil, opentracing.ErrTraceCorrupted
}
if n, err := io.CopyN(&buf, carrier, int64(valLen)); err != nil || int32(n) != valLen {
return nil, opentracing.ErrTraceCorrupted
}
baggageMap[key] = buf.String()
buf.Reset()
}
}

sp := p.tracer.getSpan()
sp.raw = RawSpan{
Context: Context{
TraceID: traceID,
TraceID: ctx.TraceId,
SpanID: randomID(),
ParentSpanID: propagatedSpanID,
Sampled: sampledByte != 0,
ParentSpanID: ctx.SpanId,
Sampled: ctx.Sampled,
},
}
sp.raw.Baggage = baggageMap

sp.raw.Baggage = ctx.BaggageItems

return p.tracer.startSpanInternal(
sp,
Expand Down
40 changes: 40 additions & 0 deletions wire/carrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package wire

// ProtobufCarrier is a DelegatingCarrier that uses protocol buffers as the
// the underlying datastructure. The reason for implementing DelagatingCarrier
// is to allow for end users to serialize the underlying protocol buffers using
// jsonpb or any other serialization forms they want.
type ProtobufCarrier TracerState

// SetState set's the tracer state.
func (p *ProtobufCarrier) SetState(traceID, spanID int64, sampled bool) {
p.TraceId = traceID
p.SpanId = spanID
p.Sampled = sampled
}

// State returns the tracer state.
func (p *ProtobufCarrier) State() (traceID, spanID int64, sampled bool) {
traceID = p.TraceId
spanID = p.SpanId
sampled = p.Sampled
return traceID, spanID, sampled
}

// SetBaggageItem sets a baggage item.
func (p *ProtobufCarrier) SetBaggageItem(key, value string) {
if p.BaggageItems == nil {
p.BaggageItems = map[string]string{key: value}
return
}

p.BaggageItems[key] = value
}

// GetBaggage iterates over each baggage item and executes the callback with
// the key:value pair.
func (p *ProtobufCarrier) GetBaggage(f func(k, v string)) {
for k, v := range p.BaggageItems {
f(k, v)
}
}
38 changes: 38 additions & 0 deletions wire/carrier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package wire_test

import (
"testing"

"github.com/opentracing/basictracer-go"
"github.com/opentracing/basictracer-go/wire"
)

func TestProtobufCarrier(t *testing.T) {
var carrier basictracer.DelegatingCarrier = &wire.ProtobufCarrier{}

var traceID, spanID int64 = 1, 2
sampled := true
baggageKey, expVal := "key1", "val1"

carrier.SetState(traceID, spanID, sampled)
carrier.SetBaggageItem(baggageKey, expVal)
gotTraceID, gotSpanID, gotSampled := carrier.State()
if traceID != gotTraceID || spanID != gotSpanID || sampled != gotSampled {
t.Errorf("Wanted state %d %d %t, got %d %d %t", spanID, traceID, sampled,
gotTraceID, gotSpanID, gotSampled)
}

gotBaggage := map[string]string{}
f := func(k, v string) {
gotBaggage[k] = v
}

carrier.GetBaggage(f)
value, ok := gotBaggage[baggageKey]
if !ok {
t.Errorf("Expected baggage item %s to exist", baggageKey)
}
if value != expVal {
t.Errorf("Expected key %s to be %s, got %s", baggageKey, expVal, value)
}
}
6 changes: 6 additions & 0 deletions wire/gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package wire

//go:generate protoc --gogofaster_out=$GOPATH/src/github.com/opentracing/basictracer-go/wire wire.proto

// Run `go get github.com/gogo/protobuf/protoc-gen-gogofaster` to install the
// gogofaster generator binary.
Loading

0 comments on commit fe1253c

Please sign in to comment.