Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
swarm: Add global span store in tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
nolash committed Feb 11, 2019
1 parent 3f66ff2 commit a1bee28
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 25 deletions.
6 changes: 3 additions & 3 deletions swarm/network/stream/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/tracing"
opentracing "github.com/opentracing/opentracing-go"
)

Expand Down Expand Up @@ -214,11 +215,10 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch

// retrieve the span for the originating retrieverequest
spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr)
span, spanOk := sp.spans.Load(spanId)
sp.spans.Delete(spanId)
span := tracing.ShiftSpanByKey(spanId)

go func() {
if spanOk {
if span != nil {
defer span.(opentracing.Span).Finish()
}

Expand Down
26 changes: 5 additions & 21 deletions swarm/network/stream/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/tracing"
opentracing "github.com/opentracing/opentracing-go"
)

Expand Down Expand Up @@ -83,16 +84,11 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {
clients: make(map[Stream]*client),
clientParams: make(map[Stream]*clientParams),
quit: make(chan struct{}),
spans: sync.Map{},
//spans: sync.Map{},
}
ctx, cancel := context.WithCancel(context.Background())
go p.pq.Run(ctx, func(i interface{}) {
wmsg := i.(WrappedPriorityMsg)
// defer p.spans.Delete(wmsg.Context)
// sp, ok := p.spans.Load(wmsg.Context)
// if ok {
// defer sp.(opentracing.Span).Finish()
// }
err := p.Send(wmsg.Context, wmsg.Msg)
if err != nil {
log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err)
Expand Down Expand Up @@ -129,6 +125,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer {

go func() {
<-p.quit

cancel()
}()
return p
Expand Down Expand Up @@ -165,21 +162,8 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8,
// SendPriority sends message to the peer using the outgoing priority queue
func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error {
defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now())
tracing.StartSaveSpan(ctx)
metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1)
traceId := ctx.Value("stream_send_tag")
if traceId != nil {
traceStr := traceId.(string)
var sp opentracing.Span
ctx, sp = spancontext.StartSpan(
ctx,
traceStr,
)
traceMeta := ctx.Value("stream_send_meta")
if traceMeta != nil {
traceStr = traceStr + "." + traceMeta.(string)
}
p.spans.Store(traceId, sp)
}
wmsg := WrappedPriorityMsg{
Context: ctx,
Msg: msg,
Expand All @@ -197,7 +181,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {
var sp opentracing.Span
ctx, sp := spancontext.StartSpan(
context.TODO(),
"",
"send.offered.hashes",
)
defer sp.Finish()

Expand Down
7 changes: 7 additions & 0 deletions swarm/network/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/ethereum/go-ethereum/swarm/network/stream/intervals"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"

opentracing "github.com/opentracing/opentracing-go"
)

const (
Expand Down Expand Up @@ -95,6 +97,7 @@ type Registry struct {
spec *protocols.Spec //this protocol's spec
balance protocols.Balance //implements protocols.Balance, for accounting
prices protocols.Prices //implements protocols.Prices, provides prices to accounting
spans sync.Map
}

// RegistryOptions holds optional values for NewRegistry constructor.
Expand Down Expand Up @@ -884,6 +887,10 @@ func (r *Registry) Start(server *p2p.Server) error {
}

func (r *Registry) Stop() error {
r.spans.Range(func(k, v interface{}) bool {
v.(opentracing.Span).Finish()
return true
})
return nil
}

Expand Down
1 change: 1 addition & 0 deletions swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ func (self *Swarm) updateGauges() {
// stops all component services.
func (self *Swarm) Stop() error {
if self.tracerClose != nil {
tracing.FinishSpans()
err := self.tracerClose.Close()
if err != nil {
return err
Expand Down
54 changes: 53 additions & 1 deletion swarm/tracing/tracing.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
package tracing

import (
"context"
"io"
"os"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/spancontext"

opentracing "github.com/opentracing/opentracing-go"
jaeger "github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config"
cli "gopkg.in/urfave/cli.v1"
)

var Enabled bool = false
var (
Enabled bool = false
store = spanStore{}
)

// TracingEnabledFlag is the CLI flag name to use to enable trace collections.
const TracingEnabledFlag = "tracing"
Expand Down Expand Up @@ -100,3 +108,47 @@ func initTracer(endpoint, svc string) (closer io.Closer) {

return closer
}

type spanStore struct {
spans sync.Map
}

func StartSaveSpan(ctx context.Context) context.Context {
if !Enabled {
return ctx
}
traceId := ctx.Value("span_save_id")
if traceId != nil {
traceStr := traceId.(string)
var sp opentracing.Span
ctx, sp = spancontext.StartSpan(
ctx,
traceStr,
)
traceMeta := ctx.Value("span_save_meta")
if traceMeta != nil {
traceStr = traceStr + "." + traceMeta.(string)
}
store.spans.Store(traceId, sp)
}
return ctx
}

func ShiftSpanByKey(k string) opentracing.Span {
if !Enabled {
return nil
}
span, spanOk := store.spans.Load(k)
if !spanOk {
return nil
}
store.spans.Delete(k)
return span.(opentracing.Span)
}

func FinishSpans() {
store.spans.Range(func(k, v interface{}) bool {
v.(opentracing.Span).Finish()
return true
})
}

0 comments on commit a1bee28

Please sign in to comment.