Skip to content

Commit

Permalink
feat(pusher): retry shallow receipts
Browse files Browse the repository at this point in the history
  • Loading branch information
acud committed Jun 14, 2021
1 parent 1e58ecf commit c14b840
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 21 deletions.
6 changes: 6 additions & 0 deletions pkg/pusher/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package pusher

// RetryInterval points at the package-level retryInterval so that external
// tests can read and override it.
var RetryInterval = &retryInterval

// RetryCount points at the package-level retryCount so that external tests
// can read and override it.
var RetryCount = &retryCount
12 changes: 11 additions & 1 deletion pkg/pusher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type metrics struct {
SyncTime prometheus.Histogram
ErrorTime prometheus.Histogram

ReceiptDepth *prometheus.CounterVec
ReceiptDepth *prometheus.CounterVec
ShallowReceiptDepth *prometheus.CounterVec
}

func newMetrics() metrics {
Expand Down Expand Up @@ -72,6 +73,15 @@ func newMetrics() metrics {
},
[]string{"depth"},
),
ShallowReceiptDepth: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "shallow_receipt_depth",
Help: "Counter of shallow receipts received at different depths.",
},
[]string{"depth"},
),
}
}

Expand Down
32 changes: 28 additions & 4 deletions pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Service struct {
networkID uint64
storer storage.Storer
pushSyncer pushsync.PushSyncer
depther topology.NeighborhoodDepther
logger logging.Logger
tag *tags.Tags
tracer *tracing.Tracer
Expand All @@ -45,15 +46,20 @@ type Service struct {
// Tunables of the pusher worker. They are variables rather than constants
// because the test-only export file exposes pointers to retryInterval and
// retryCount so tests can shorten them.
var (
retryInterval = 5 * time.Second // time interval between retries
concurrentJobs = 10 // how many chunks to push simultaneously
retryCount = 6 // shallow receipts counted per chunk; once the count reaches this value the push is no longer retried
)

var ErrInvalidAddress = errors.New("invalid address")
// Sentinel errors exported by the pusher package.
var (
	// ErrInvalidAddress reports an invalid address.
	ErrInvalidAddress = errors.New("invalid address")
	// ErrShallowReceipt reports a shallow receipt.
	// NOTE(review): presumably meant for receipts whose proximity order is
	// below the neighborhood depth — confirm against the call sites.
	ErrShallowReceipt = errors.New("shallow receipt")
)

func New(networkID uint64, storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer, warmupTime time.Duration) *Service {
func New(networkID uint64, storer storage.Storer, depther topology.NeighborhoodDepther, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer, warmupTime time.Duration) *Service {
service := &Service{
networkID: networkID,
storer: storer,
pushSyncer: pushSyncer,
depther: depther,
tag: tagger,
logger: logger,
tracer: tracer,
Expand All @@ -80,6 +86,7 @@ func (s *Service) chunksWorker(warmupTime time.Duration) {
mtx sync.Mutex
span opentracing.Span
logger *logrus.Entry
retryCounter = make(map[string]int)
)

defer timer.Stop()
Expand Down Expand Up @@ -154,19 +161,20 @@ LOOP:
storerPeer swarm.Address
)
defer func() {
mtx.Lock()
if err == nil {
s.metrics.TotalSynced.Inc()
s.metrics.SyncTime.Observe(time.Since(startTime).Seconds())
// only print this if there was no error while sending the chunk
logger.Tracef("pusher: pushed chunk %s to node %s", ch.Address().String(), storerPeer.String())
po := swarm.Proximity(ch.Address().Bytes(), storerPeer.Bytes())
logger.Tracef("pusher: pushed chunk %s to node %s, receipt depth %d", ch.Address().String(), storerPeer.String(), po)
s.metrics.ReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc()
delete(retryCounter, ch.Address().ByteString())
} else {
s.metrics.TotalErrors.Inc()
s.metrics.ErrorTime.Observe(time.Since(startTime).Seconds())
logger.Tracef("pusher: cannot push chunk %s: %v", ch.Address().String(), err)
}
mtx.Lock()
delete(inflight, ch.Address().String())
mtx.Unlock()
<-sem
Expand Down Expand Up @@ -200,6 +208,22 @@ LOOP:
err = fmt.Errorf("pusher: receipt storer address: %w", err)
return
}

po := swarm.Proximity(ch.Address().Bytes(), storerPeer.Bytes())
d := s.depther.NeighborhoodDepth()
if po < d {
mtx.Lock()
retryCounter[ch.Address().ByteString()]++
if retryCounter[ch.Address().ByteString()] < retryCount {
mtx.Unlock()
err = fmt.Errorf("pusher: shallow receipt depth %d, want at least %d", po, d)
s.metrics.ShallowReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc()
return
}
mtx.Unlock()
} else {
s.metrics.ReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc()
}
}

if err = s.storer.Set(ctx, storage.ModeSetSync, ch.Address()); err != nil {
Expand Down
63 changes: 59 additions & 4 deletions pkg/pusher/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"io/ioutil"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -87,7 +88,7 @@ func TestSendChunkToSyncWithTag(t *testing.T) {
return receipt, nil
})

mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
defer storer.Close()
defer p.Close()

Expand Down Expand Up @@ -143,7 +144,7 @@ func TestSendChunkToPushSyncWithoutTag(t *testing.T) {
return receipt, nil
})

_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
defer storer.Close()
defer p.Close()

Expand Down Expand Up @@ -228,7 +229,7 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
return receipt, nil
})

_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
defer storer.Close()
defer p.Close()

Expand Down Expand Up @@ -283,7 +284,7 @@ func TestPusherClose(t *testing.T) {
return receipt, nil
})

_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))

chunk := testingc.GenerateTestRandomChunk()

Expand Down Expand Up @@ -361,6 +362,60 @@ func TestPusherClose(t *testing.T) {
}
}

// TestPusherRetryShallow checks that a chunk whose receipt is shallower than
// the configured neighborhood depth is pushed again, and that the number of
// push attempts reaches but never exceeds the configured retry count.
func TestPusherRetryShallow(t *testing.T) {
	// Registered first so that it runs last: the package-level settings are
	// restored only after the deferred Close calls below have run.
	defer func(d time.Duration, retryCount int) {
		*pusher.RetryInterval = d
		*pusher.RetryCount = retryCount
	}(*pusher.RetryInterval, *pusher.RetryCount)
	*pusher.RetryInterval = 500 * time.Millisecond
	*pusher.RetryCount = 3

	key, err := crypto.GenerateSecp256k1Key()
	if err != nil {
		t.Fatal(err)
	}

	var (
		pivotPeer   = swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
		closestPeer = swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
		signer      = crypto.NewDefaultSigner(key)
		callCount   = int32(0)
	)
	pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
		// every invocation is one push attempt made by the pusher
		atomic.AddInt32(&callCount, 1)
		signature, err := signer.Sign(chunk.Address().Bytes())
		if err != nil {
			return nil, err
		}
		receipt := &pushsync.Receipt{
			Address:   swarm.NewAddress(chunk.Address().Bytes()),
			Signature: signature,
		}
		return receipt, nil
	})

	// create the pivot peer pusher with depth 31, this makes
	// sure that virtually any receipt generated by the random
	// key will be considered too shallow
	_, p, storer := createPusher(t, pivotPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(31))
	// release the storer and the pusher like the other tests in this file do,
	// instead of leaving them running after the test returns
	defer storer.Close()
	defer p.Close()

	// generate a chunk at PO 1 with closestPeer, meaning that we get a
	// receipt which is shallower than the pivot peer's depth, resulting
	// in retries
	chunk := testingc.GenerateTestRandomChunkAt(closestPeer, 1)

	if _, err := storer.Put(context.Background(), storage.ModePutUpload, chunk); err != nil {
		t.Fatal(err)
	}

	// poll the attempt counter once per second, for at most five rounds
	c := 0
	for i := 0; i < 5; i++ {
		c = int(atomic.LoadInt32(&callCount))
		if c == *pusher.RetryCount {
			return
		}
		if c > *pusher.RetryCount {
			t.Fatalf("too many retries. got %d want %d", c, *pusher.RetryCount)
		}
		time.Sleep(1 * time.Second)
	}

	t.Fatalf("timed out waiting for retries. got %d want %d", c, *pusher.RetryCount)
}

func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.PushSyncer, mockOpts ...mock.Option) (*tags.Tags, *pusher.Service, *Store) {
t.Helper()
logger := logging.New(ioutil.Discard, 0)
Expand Down
2 changes: 1 addition & 1 deletion pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ type pushResult struct {
attempted bool
}

const failureThreshold = 3
const failureThreshold = 2

type failedRequestCache struct {
mtx sync.RWMutex
Expand Down
9 changes: 1 addition & 8 deletions pkg/pushsync/pushsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,14 +873,8 @@ func TestFailureRequestCache(t *testing.T) {
}

cache.RecordFailure(peer, chunk)
if !cache.Useful(peer, chunk) {
t.Fatal("incorrect cache state after 2nd failure")
}

cache.RecordFailure(peer, chunk)

if cache.Useful(peer, chunk) {
t.Fatal("peer should no longer be useful")
t.Fatal("incorrect cache state after 2nd failure")
}
})

Expand All @@ -901,7 +895,6 @@ func TestFailureRequestCache(t *testing.T) {
// the previous failed request and the peer should still be useful after
// more failures
cache.RecordFailure(peer, chunk)
cache.RecordFailure(peer, chunk)

if !cache.Useful(peer, chunk) {
t.Fatal("peer should still be useful after intermittent success")
Expand Down
11 changes: 9 additions & 2 deletions pkg/topology/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

type mock struct {
peers []swarm.Address
depth uint8
closestPeer swarm.Address
closestPeerErr error
peersErr error
Expand All @@ -35,6 +36,12 @@ func WithAddPeersErr(err error) Option {
})
}

// WithNeighborhoodDepth returns an Option that stores dd as the mock's depth,
// which is the value the mock's NeighborhoodDepth method returns.
func WithNeighborhoodDepth(dd uint8) Option {
	setDepth := func(m *mock) {
		m.depth = dd
	}
	return optionFunc(setDepth)
}

func WithClosestPeer(addr swarm.Address) Option {
return optionFunc(func(d *mock) {
d.closestPeer = addr
Expand Down Expand Up @@ -136,8 +143,8 @@ func (d *mock) SubscribePeersChange() (c <-chan struct{}, unsubscribe func()) {
return c, unsubscribe
}

func (*mock) NeighborhoodDepth() uint8 {
return 0
// NeighborhoodDepth returns the depth stored on the mock, which is the zero
// value unless it was set through the WithNeighborhoodDepth option.
func (m *mock) NeighborhoodDepth() uint8 {
return m.depth
}

func (m *mock) IsWithinDepth(addr swarm.Address) bool {
Expand Down
6 changes: 5 additions & 1 deletion pkg/topology/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Driver interface {
ClosestPeerer
EachPeerer
EachNeighbor
NeighborhoodDepth() uint8
NeighborhoodDepther
SubscribePeersChange() (c <-chan struct{}, unsubscribe func())
io.Closer
Halter
Expand Down Expand Up @@ -137,3 +137,7 @@ type Halter interface {
// while allowing it to still run.
Halt()
}

// NeighborhoodDepther is implemented by types that can report a neighborhood
// depth. It is embedded in Driver and lets consumers depend on this single
// method instead of the whole Driver interface.
type NeighborhoodDepther interface {
// NeighborhoodDepth returns the current neighborhood depth.
NeighborhoodDepth() uint8
}

0 comments on commit c14b840

Please sign in to comment.