diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 3a5f59a24a..e051b804ca 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/tracing" opentracing "github.com/opentracing/opentracing-go" ) @@ -214,11 +215,10 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch // retrieve the span for the originating retrieverequest spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr) - span, spanOk := sp.spans.Load(spanId) - sp.spans.Delete(spanId) + span := tracing.ShiftSpanByKey(spanId) go func() { - if spanOk { + if span != nil { defer span.(opentracing.Span).Finish() } diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index ee7d89fec8..bcefdd7ae5 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/swarm/spancontext" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/ethereum/go-ethereum/swarm/tracing" opentracing "github.com/opentracing/opentracing-go" ) @@ -83,16 +84,11 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { clients: make(map[Stream]*client), clientParams: make(map[Stream]*clientParams), quit: make(chan struct{}), - spans: sync.Map{}, + //spans: sync.Map{}, } ctx, cancel := context.WithCancel(context.Background()) go p.pq.Run(ctx, func(i interface{}) { wmsg := i.(WrappedPriorityMsg) - // defer p.spans.Delete(wmsg.Context) - // sp, ok := p.spans.Load(wmsg.Context) - // if ok { - // defer sp.(opentracing.Span).Finish() - // } err := p.Send(wmsg.Context, wmsg.Msg) if err != nil { log.Error("Message send error, dropping peer", "peer", p.ID(), "err", err) @@ -129,6 +125,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { go func() { <-p.quit + cancel() }() return p @@ -165,21 +162,8 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, // SendPriority sends message to the peer using the outgoing priority queue func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error { defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("peer.sendpriority_t.%d", priority), nil).UpdateSince(time.Now()) + tracing.StartSaveSpan(ctx) metrics.GetOrRegisterCounter(fmt.Sprintf("peer.sendpriority.%d", priority), nil).Inc(1) - traceId := ctx.Value("stream_send_tag") - if traceId != nil { - traceStr := traceId.(string) - var sp opentracing.Span - ctx, sp = spancontext.StartSpan( - ctx, - traceStr, - ) - traceMeta := ctx.Value("stream_send_meta") - if traceMeta != nil { - traceStr = traceStr + "." + traceMeta.(string) - } - p.spans.Store(traceId, sp) - } wmsg := WrappedPriorityMsg{ Context: ctx, Msg: msg, @@ -197,7 +181,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { var sp opentracing.Span ctx, sp := spancontext.StartSpan( context.TODO(), - "", + "send.offered.hashes", ) defer sp.Finish() diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 035a404f7e..6d03e91b8b 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -35,6 +35,8 @@ import ( "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" + + opentracing "github.com/opentracing/opentracing-go" ) const ( @@ -95,6 +97,7 @@ type Registry struct { spec *protocols.Spec //this protocol's spec balance protocols.Balance //implements protocols.Balance, for accounting prices protocols.Prices //implements protocols.Prices, provides prices to accounting + spans sync.Map } // RegistryOptions holds optional values for NewRegistry constructor. @@ -884,6 +887,10 @@ func (r *Registry) Start(server *p2p.Server) error { } func (r *Registry) Stop() error { + r.spans.Range(func(k, v interface{}) bool { + v.(opentracing.Span).Finish() + return true + }) return nil } diff --git a/swarm/swarm.go b/swarm/swarm.go index 705fc43975..913f9fa921 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -422,6 +422,7 @@ func (self *Swarm) updateGauges() { // stops all component services. func (self *Swarm) Stop() error { if self.tracerClose != nil { + tracing.FinishSpans() err := self.tracerClose.Close() if err != nil { return err diff --git a/swarm/tracing/tracing.go b/swarm/tracing/tracing.go index f95fa41b8f..9ad7e48ec1 100644 --- a/swarm/tracing/tracing.go +++ b/swarm/tracing/tracing.go @@ -1,18 +1,26 @@ package tracing import ( + "context" "io" "os" "strings" + "sync" "time" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/spancontext" + + opentracing "github.com/opentracing/opentracing-go" jaeger "github.com/uber/jaeger-client-go" jaegercfg "github.com/uber/jaeger-client-go/config" cli "gopkg.in/urfave/cli.v1" ) -var Enabled bool = false +var ( + Enabled bool = false + store = spanStore{} +) // TracingEnabledFlag is the CLI flag name to use to enable trace collections. const TracingEnabledFlag = "tracing" @@ -100,3 +108,47 @@ func initTracer(endpoint, svc string) (closer io.Closer) { return closer } + +type spanStore struct { + spans sync.Map +} + +func StartSaveSpan(ctx context.Context) context.Context { + if !Enabled { + return ctx + } + traceId := ctx.Value("span_save_id") + if traceId != nil { + traceStr := traceId.(string) + var sp opentracing.Span + ctx, sp = spancontext.StartSpan( + ctx, + traceStr, + ) + traceMeta := ctx.Value("span_save_meta") + if traceMeta != nil { + traceStr = traceStr + "." + traceMeta.(string) + } + store.spans.Store(traceId, sp) + } + return ctx +} + +func ShiftSpanByKey(k string) opentracing.Span { + if !Enabled { + return nil + } + span, spanOk := store.spans.Load(k) + if !spanOk { + return nil + } + store.spans.Delete(k) + return span.(opentracing.Span) +} + +func FinishSpans() { + store.spans.Range(func(k, v interface{}) bool { + v.(opentracing.Span).Finish() + return true + }) +}