Skip to content

Commit

Permalink
refactor(pushsync): better tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
acud committed Sep 16, 2021
1 parent d473f02 commit 5942a25
Showing 1 changed file with 18 additions and 5 deletions.
23 changes: 18 additions & 5 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)

chunk := swarm.NewChunk(swarm.NewAddress(ch.Address), ch.Data)
chunkAddress := chunk.Address()

span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunkAddress.String()})
defer span.Finish()

stamp := new(postage.Stamp)
// attaching the stamp is required becase pushToClosest expects a chunk with a stamp
err = stamp.UnmarshalBinary(ch.Stamp)
Expand Down Expand Up @@ -168,6 +172,9 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
ctxd, canceld := context.WithTimeout(context.Background(), timeToWaitForPushsyncToNeighbor)
defer canceld()

span, _, ctxd := ps.tracer.StartSpanFromContext(ctxd, "pushsync-replication-storage", ps.logger, opentracing.Tag{Key: "address", Value: chunkAddress.String()})
defer span.Finish()

chunk, err = ps.validStamp(chunk, ch.Stamp)
if err != nil {
ps.metrics.InvalidStampErrors.Inc()
Expand Down Expand Up @@ -229,9 +236,6 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
}
}

span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunkAddress.String()})
defer span.Finish()

receipt, err := ps.pushToClosest(ctx, chunk, false, p.Address)
if err != nil {
if errors.Is(err, topology.ErrWantSelf) {
Expand Down Expand Up @@ -353,7 +357,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllo
return true, false, nil
}
count++
go ps.pushToNeighbour(peer, ch, retryAllowed)
go ps.pushToNeighbour(ctx, peer, ch, retryAllowed)
return false, false, nil
})
return nil, err
Expand Down Expand Up @@ -466,7 +470,7 @@ func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.C
}

// pushToNeighbour handles in-neighborhood replication for a single peer.
func (ps *PushSync) pushToNeighbour(peer swarm.Address, ch swarm.Chunk, origin bool) {
func (ps *PushSync) pushToNeighbour(ctx context.Context, peer swarm.Address, ch swarm.Chunk, origin bool) {
var err error
defer func() {
if err != nil {
Expand All @@ -480,9 +484,18 @@ func (ps *PushSync) pushToNeighbour(peer swarm.Address, ch swarm.Chunk, origin b
// price for neighborhood replication
receiptPrice := ps.pricer.PeerPrice(peer, ch.Address())

// decouple the span data from the original context so it doesn't get
// cancelled, then glue the stuff on the new context
span := tracing.FromContext(ctx)

ctx, cancel := context.WithTimeout(context.Background(), timeToWaitForPushsyncToNeighbor)
defer cancel()

// now bring in the span data to the new context
ctx = tracing.WithContext(ctx, span)
spanInner, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-replication", ps.logger, opentracing.Tag{Key: "address", Value: ch.Address().String()})
defer spanInner.Finish()

err = ps.accounting.Reserve(ctx, peer, receiptPrice)
if err != nil {
err = fmt.Errorf("reserve balance for peer %s: %w", peer.String(), err)
Expand Down

0 comments on commit 5942a25

Please sign in to comment.