refactor(pushsync): Don't account for failed retries due to accounting #1662

Merged · 7 commits · May 12, 2021
Changes from 4 commits
196 changes: 101 additions & 95 deletions pkg/pushsync/pushsync.go
@@ -36,11 +36,13 @@ const (
 )
 
 const (
-    maxPeers = 5
+    maxPeers = 3
+    maxAttempts = 16
 )
 
 var (
     ErrOutOfDepthReplication = errors.New("replication outside of the neighborhood")
+    ErrNoPush = errors.New("could not push chunk")
 )
 
 type PushSyncer interface {
@@ -69,7 +71,7 @@ type PushSync struct {
     isFullNode bool
 }
 
-var timeToLive = 5 * time.Second // request time to live
+var timeToLive = 20 * time.Second // request time to live
 var timeToWaitForPushsyncToNeighbor = 3 * time.Second // time to wait to get a receipt for a chunk
 var nPeersToPushsync = 3 // number of peers to replicate to as receipt is sent upstream
 
@@ -174,7 +176,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
     span, _, ctx := ps.tracer.StartSpanFromContext(ctx, "pushsync-handler", ps.logger, opentracing.Tag{Key: "address", Value: chunk.Address().String()})
     defer span.Finish()
 
-    receipt, err := ps.pushToClosest(ctx, chunk)
+    receipt, err := ps.pushToClosest(ctx, chunk, false)
     if err != nil {
         if errors.Is(err, topology.ErrWantSelf) {
             _, err = ps.storer.Put(ctx, storage.ModePutSync, chunk)
@@ -300,7 +302,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
 // a receipt from that peer and returns error or nil based on the receiving and
 // the validity of the receipt.
 func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Receipt, error) {
-    r, err := ps.pushToClosest(ctx, ch)
+    r, err := ps.pushToClosest(ctx, ch, true)
     if err != nil {
         return nil, err
     }
@@ -309,41 +311,23 @@ func (ps *PushSync) PushChunkToClosest(ctx context.Context, ch swarm.Chunk) (*Re
         Signature: r.Signature}, nil
 }
 
-func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.Receipt, reterr error) {
+func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, retryAllowed bool) (*pb.Receipt, error) {
     span, logger, ctx := ps.tracer.StartSpanFromContext(ctx, "push-closest", ps.logger, opentracing.Tag{Key: "address", Value: ch.Address().String()})
     defer span.Finish()
 
     var (
-        skipPeers []swarm.Address
-        lastErr   error
+        skipPeers      []swarm.Address
+        allowedRetries = 1
+        resultC        = make(chan *pushResult)
+        includeSelf    = ps.isFullNode
     )
 
-    stamp, err := ch.Stamp().MarshalBinary()
-    if err != nil {
-        return nil, err
+    if retryAllowed {
+        // only originator retries
+        allowedRetries = maxPeers
     }
 
-    deferFuncs := make([]func(), 0)
-    defersFn := func() {
-        if len(deferFuncs) > 0 {
-            for _, deferFn := range deferFuncs {
-                deferFn()
-            }
-            deferFuncs = deferFuncs[:0]
-        }
-    }
-    defer defersFn()
-
-    for i := 0; i < maxPeers; i++ {
-        select {
-        case <-ctx.Done():
-            return nil, ctx.Err()
-        default:
-        }
-
-        defersFn()
-
-        includeSelf := ps.isFullNode
-
+    for i := maxAttempts; allowedRetries > 0 && i > 0; i-- {
         // find the next closest peer
         peer, err := ps.topologyDriver.ClosestPeer(ch.Address(), includeSelf, skipPeers...)
         if err != nil {
@@ -352,86 +336,108 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk) (rr *pb.R
             // if ErrWantSelf is returned, it means we are the closest peer.
             return nil, fmt.Errorf("closest peer: %w", err)
         }
 
         // save found peer (to be skipped if there is some error with him)
         skipPeers = append(skipPeers, peer)
 
-        deferFuncs = append(deferFuncs, func() {
-            if lastErr != nil {
-                ps.metrics.TotalErrors.Inc()
-                logger.Errorf("pushsync: %v", lastErr)
-            }
-        })
-
-        // compute the price we pay for this receipt and reserve it for the rest of this function
-        receiptPrice := ps.pricer.PeerPrice(peer, ch.Address())
-
+        go func(peer swarm.Address, ch swarm.Chunk) {
+            ctxd, canceld := context.WithTimeout(ctx, timeToLive)
+            defer canceld()
+
+            r, attempted, err := ps.pushPeer(ctxd, peer, ch)
+            // attempted is true if we get past accounting and actually attempt
+            // to send the request to the peer. If we dont get past accounting, we
+            // should not count the retry and try with a different peer again
+            if attempted {
+                allowedRetries--
+            }
+            if err != nil {
+                logger.Debugf("could not push to peer %s: %v", peer.String(), err)
+                resultC <- &pushResult{err: err}
[inline review comment by a Member] this error does not seem to be read anywhere. Consider using the errgroup package.
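As a rough sketch of that suggestion (illustrative only, not the PR's code: pushAll and pushOne are hypothetical helpers, and it assumes golang.org/x/sync/errgroup is imported), errgroup would surface the first error instead of dropping it into an unread channel, and cancel the remaining pushes:

    // Hypothetical shape of the reviewer's errgroup suggestion.
    func pushAll(ctx context.Context, peers []swarm.Address, pushOne func(context.Context, swarm.Address) error) error {
        g, gctx := errgroup.WithContext(ctx) // gctx is canceled once any push fails
        for _, peer := range peers {
            peer := peer // capture the loop variable for the closure
            g.Go(func() error {
                return pushOne(gctx, peer)
            })
        }
        return g.Wait() // returns the first non-nil error, if any
    }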

+                return
+            }
+            select {
+            case resultC <- &pushResult{receipt: r}:
+            case <-ctx.Done():
+            }
+        }(peer, ch)
 
-        streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
-        if err != nil {
-            lastErr = fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
-            continue
-        }
-        deferFuncs = append(deferFuncs, func() { go streamer.FullClose() })
+        select {
+        case r := <-resultC:
+            if r.receipt != nil {
+                return r.receipt, nil
+            }
+            // proceed to retrying if applicable
+        case <-ctx.Done():
+            return nil, ctx.Err()
+        }
     }

-        // Reserve to see whether we can make the request
-        err = ps.accounting.Reserve(ctx, peer, receiptPrice)
-        if err != nil {
-            return nil, fmt.Errorf("reserve balance for peer %s: %w", peer.String(), err)
-        }
-        deferFuncs = append(deferFuncs, func() { ps.accounting.Release(peer, receiptPrice) })
-
-        w, r := protobuf.NewWriterAndReader(streamer)
-        ctxd, canceld := context.WithTimeout(ctx, timeToLive)
-        deferFuncs = append(deferFuncs, func() { canceld() })
-        if err := w.WriteMsgWithContext(ctxd, &pb.Delivery{
-            Address: ch.Address().Bytes(),
-            Data:    ch.Data(),
-            Stamp:   stamp,
-        }); err != nil {
-            _ = streamer.Reset()
-            lastErr = fmt.Errorf("chunk %s deliver to peer %s: %w", ch.Address().String(), peer.String(), err)
-            continue
-        }
+
+    return nil, ErrNoPush
 }

-        ps.metrics.TotalSent.Inc()
+func (ps *PushSync) pushPeer(ctx context.Context, peer swarm.Address, ch swarm.Chunk) (*pb.Receipt, bool, error) {
+    // compute the price we pay for this receipt and reserve it for the rest of this function
+    receiptPrice := ps.pricer.PeerPrice(peer, ch.Address())
 
-        // if you manage to get a tag, just increment the respective counter
-        t, err := ps.tagger.Get(ch.TagID())
-        if err == nil && t != nil {
-            err = t.Inc(tags.StateSent)
-            if err != nil {
-                lastErr = fmt.Errorf("tag %d increment: %v", ch.TagID(), err)
-                err = lastErr
-                return nil, err
-            }
-        }
+    // Reserve to see whether we can make the request
+    err := ps.accounting.Reserve(ctx, peer, receiptPrice)
+    if err != nil {
+        return nil, false, fmt.Errorf("reserve balance for peer %s: %w", peer.String(), err)
+    }
+    defer ps.accounting.Release(peer, receiptPrice)
 
-        var receipt pb.Receipt
-        if err := r.ReadMsgWithContext(ctxd, &receipt); err != nil {
-            _ = streamer.Reset()
-            lastErr = fmt.Errorf("chunk %s receive receipt from peer %s: %w", ch.Address().String(), peer.String(), err)
-            continue
-        }
+    stamp, err := ch.Stamp().MarshalBinary()
+    if err != nil {
+        // TODO: Return modified error?
+        return nil, false, err
+    }

-        if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) {
-            // if the receipt is invalid, try to push to the next peer
-            lastErr = fmt.Errorf("invalid receipt. chunk %s, peer %s", ch.Address().String(), peer.String())
-            continue
-        }
+    streamer, err := ps.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
+    if err != nil {
+        return nil, true, fmt.Errorf("new stream for peer %s: %w", peer.String(), err)
+    }
+    defer streamer.Close()

+    w, r := protobuf.NewWriterAndReader(streamer)
+    if err := w.WriteMsgWithContext(ctx, &pb.Delivery{
+        Address: ch.Address().Bytes(),
+        Data:    ch.Data(),
+        Stamp:   stamp,
+    }); err != nil {
+        _ = streamer.Reset()
+        return nil, true, fmt.Errorf("chunk %s deliver to peer %s: %w", ch.Address().String(), peer.String(), err)
[inline review comment by a Member] the String() call is not needed on %s arguments
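A general Go note on that remark (not part of the diff): swarm.Address implements fmt.Stringer, so the %s verb already invokes String() for you:

    // These produce identical messages; the second lets fmt call String() itself.
    fmt.Errorf("chunk %s deliver to peer %s: %w", ch.Address().String(), peer.String(), err)
    fmt.Errorf("chunk %s deliver to peer %s: %w", ch.Address(), peer, err)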

+    }

-        err = ps.accounting.Credit(peer, receiptPrice)
+    ps.metrics.TotalSent.Inc()
 
+    // if you manage to get a tag, just increment the respective counter
+    t, err := ps.tagger.Get(ch.TagID())
+    if err == nil && t != nil {
+        err = t.Inc(tags.StateSent)
+        if err != nil {
-            return nil, err
+            return nil, true, fmt.Errorf("tag %d increment: %v", ch.TagID(), err)
+        }
+    }

-        return &receipt, nil
+    var receipt pb.Receipt
+    if err := r.ReadMsgWithContext(ctx, &receipt); err != nil {
+        _ = streamer.Reset()
+        return nil, true, fmt.Errorf("chunk %s receive receipt from peer %s: %w", ch.Address().String(), peer.String(), err)
+    }

-    logger.Tracef("pushsync: chunk %s: reached %v peers", ch.Address(), maxPeers)
+    if !ch.Address().Equal(swarm.NewAddress(receipt.Address)) {
+        // if the receipt is invalid, try to push to the next peer
+        return nil, true, fmt.Errorf("invalid receipt. chunk %s, peer %s", ch.Address().String(), peer.String())
+    }

-    if lastErr != nil {
-        return nil, lastErr
-    }
-
-    return nil, topology.ErrNotFound
+    err = ps.accounting.Credit(peer, receiptPrice)
+    if err != nil {
+        return nil, true, err
+    }
+
+    return &receipt, true, nil
 }

+type pushResult struct {
+    receipt *pb.Receipt
+    err     error
+}

[conversation resolved] aloknerurkar marked this conversation as resolved.
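Taken together, the change boils down to a retry budget that only real attempts consume. A standalone sketch of that pattern (illustrative only; pushWithBudget and pushFn are hypothetical stand-ins for pushToClosest and pushPeer):

    // A push reports whether it actually got past accounting (attempted)
    // alongside success; only attempted pushes consume the retry budget.
    func pushWithBudget(budget, maxAttempts int, pushFn func() (ok, attempted bool)) error {
        for i := maxAttempts; budget > 0 && i > 0; i-- {
            ok, attempted := pushFn()
            if attempted {
                budget-- // accounting rejections do not count as retries
            }
            if ok {
                return nil
            }
        }
        return ErrNoPush
    }

With retryAllowed, the originator gets a budget of maxPeers while a forwarder gets exactly one attempt, and maxAttempts caps how many peers can be skipped over accounting rejections before giving up with ErrNoPush.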