From 445b47d27141538adfa31371f900fde72a927e67 Mon Sep 17 00:00:00 2001
From: acud <12988138+acud@users.noreply.github.com>
Date: Mon, 7 Jun 2021 15:01:27 +0200
Subject: [PATCH] pusher: retry shallow receipts

---
 pkg/pusher/export_test.go |  6 ++++
 pkg/pusher/pusher.go      | 29 ++++++++++++++++--
 pkg/pusher/pusher_test.go | 63 ++++++++++++++++++++++++++++++++++++---
 pkg/pushsync/pushsync.go  |  2 +-
 pkg/topology/mock/mock.go | 11 +++++--
 pkg/topology/topology.go  |  6 +++-
 6 files changed, 106 insertions(+), 11 deletions(-)
 create mode 100644 pkg/pusher/export_test.go

diff --git a/pkg/pusher/export_test.go b/pkg/pusher/export_test.go
new file mode 100644
index 00000000000..558210aa8df
--- /dev/null
+++ b/pkg/pusher/export_test.go
@@ -0,0 +1,6 @@
+package pusher
+
+var (
+	RetryInterval = &retryInterval
+	RetryCount    = &retryCount
+)
diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go
index d857e7eccbe..2d4138d9f94 100644
--- a/pkg/pusher/pusher.go
+++ b/pkg/pusher/pusher.go
@@ -34,6 +34,7 @@ type Service struct {
 	networkID  uint64
 	storer     storage.Storer
 	pushSyncer pushsync.PushSyncer
+	depther    topology.NeighborhoodDepther
 	logger     logging.Logger
 	tag        *tags.Tags
 	tracer     *tracing.Tracer
@@ -45,15 +46,20 @@ type Service struct {
 var (
 	retryInterval  = 5 * time.Second // time interval between retries
 	concurrentJobs = 10              // how many chunks to push simultaneously
+	retryCount     = 6
 )
 
-var ErrInvalidAddress = errors.New("invalid address")
+var (
+	ErrInvalidAddress = errors.New("invalid address")
+	ErrShallowReceipt = errors.New("shallow receipt")
+)
 
-func New(networkID uint64, storer storage.Storer, peerSuggester topology.ClosestPeerer, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer) *Service {
+func New(networkID uint64, storer storage.Storer, depther topology.NeighborhoodDepther, pushSyncer pushsync.PushSyncer, tagger *tags.Tags, logger logging.Logger, tracer *tracing.Tracer) *Service {
 	service := &Service{
 		networkID:  networkID,
 		storer:     storer,
 		pushSyncer: pushSyncer,
+		depther:    depther,
 		tag:        tagger,
 		logger:     logger,
 		tracer:     tracer,
@@ -80,6 +86,7 @@ func (s *Service) chunksWorker() {
 		mtx          sync.Mutex
 		span         opentracing.Span
 		logger       *logrus.Entry
+		retryCounter = make(map[string]int)
 	)
 	defer timer.Stop()
 	defer close(s.chunksWorkerQuitC)
@@ -144,6 +151,7 @@ LOOP:
 					storerPeer swarm.Address
 				)
 				defer func() {
+					mtx.Lock()
 					if err == nil {
 						s.metrics.TotalSynced.Inc()
 						s.metrics.SyncTime.Observe(time.Since(startTime).Seconds())
@@ -151,12 +159,12 @@ LOOP:
 						logger.Tracef("pusher: pushed chunk %s to node %s", ch.Address().String(), storerPeer.String())
 						po := swarm.Proximity(ch.Address().Bytes(), storerPeer.Bytes())
 						s.metrics.ReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc()
+						delete(retryCounter, ch.Address().ByteString())
 					} else {
 						s.metrics.TotalErrors.Inc()
 						s.metrics.ErrorTime.Observe(time.Since(startTime).Seconds())
 						logger.Tracef("pusher: cannot push chunk %s: %v", ch.Address().String(), err)
 					}
-					mtx.Lock()
 					delete(inflight, ch.Address().String())
 					mtx.Unlock()
 					<-sem
@@ -190,6 +198,21 @@ LOOP:
 				err = fmt.Errorf("pusher: receipt storer address: %w", err)
 				return
 			}
+
+			po := swarm.Proximity(ch.Address().Bytes(), storerPeer.Bytes())
+			d := s.depther.NeighborhoodDepth()
+			if po < d {
+				mtx.Lock()
+				retryCounter[ch.Address().ByteString()]++
+				if retryCounter[ch.Address().ByteString()] < retryCount {
+					mtx.Unlock()
+					err = fmt.Errorf("pusher: shallow receipt depth %d, want at least %d", po, d)
+					po := swarm.Proximity(ch.Address().Bytes(), storerPeer.Bytes())
+					s.metrics.ReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc()
+					return
+				}
+				mtx.Unlock()
+			}
 		}
 
 		if err = s.storer.Set(ctx, storage.ModeSetSync, ch.Address()); err != nil {
diff --git a/pkg/pusher/pusher_test.go b/pkg/pusher/pusher_test.go
index 9891ab3a3d5..1cef8fe5792 100644
--- a/pkg/pusher/pusher_test.go
+++ b/pkg/pusher/pusher_test.go
@@ -9,6 +9,7 @@ import (
 	"errors"
 	"io/ioutil"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -87,7 +88,7 @@ func TestSendChunkToSyncWithTag(t *testing.T) {
 		return receipt, nil
 	})
 
-	mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
+	mtags, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
 	defer storer.Close()
 	defer p.Close()
 
@@ -143,7 +144,7 @@ func TestSendChunkToPushSyncWithoutTag(t *testing.T) {
 		return receipt, nil
 	})
 
-	_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
+	_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
 	defer storer.Close()
 	defer p.Close()
 
@@ -228,7 +229,7 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
 		return receipt, nil
 	})
 
-	_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
+	_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
 	defer storer.Close()
 	defer p.Close()
 
@@ -283,7 +284,7 @@ func TestPusherClose(t *testing.T) {
 		return receipt, nil
 	})
 
-	_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer))
+	_, p, storer := createPusher(t, triggerPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(0))
 
 	chunk := testingc.GenerateTestRandomChunk()
 
@@ -361,6 +362,60 @@ func TestPusherClose(t *testing.T) {
 	}
 }
 
+func TestPusherRetryShallow(t *testing.T) {
+	defer func(d time.Duration, retryCount int) {
+		*pusher.RetryInterval = d
+		*pusher.RetryCount = retryCount
+	}(*pusher.RetryInterval, *pusher.RetryCount)
+	*pusher.RetryInterval = 500 * time.Millisecond
+	*pusher.RetryCount = 3
+
+	var (
+		pivotPeer   = swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
+		closestPeer = swarm.MustParseHexAddress("f000000000000000000000000000000000000000000000000000000000000000")
+		key, _      = crypto.GenerateSecp256k1Key()
+		signer      = crypto.NewDefaultSigner(key)
+		callCount   = int32(0)
+	)
+	pushSyncService := pushsyncmock.New(func(ctx context.Context, chunk swarm.Chunk) (*pushsync.Receipt, error) {
+		atomic.AddInt32(&callCount, 1)
+		signature, _ := signer.Sign(chunk.Address().Bytes())
+		receipt := &pushsync.Receipt{
+			Address:   swarm.NewAddress(chunk.Address().Bytes()),
+			Signature: signature,
+		}
+		return receipt, nil
+	})
+
+	// create the pivot peer pusher with depth 31, this makes
+	// sure that virtually any receipt generated by the random
+	// key will be considered too shallow
+	_, _, storer := createPusher(t, pivotPeer, pushSyncService, mock.WithClosestPeer(closestPeer), mock.WithNeighborhoodDepth(31))
+
+	// generate a chunk at PO 1 with closestPeer, meaning that we get a
+	// receipt which is shallower than the pivot peer's depth, resulting
+	// in retries
+	chunk := testingc.GenerateTestRandomChunkAt(closestPeer, 1)
+
+	_, err := storer.Put(context.Background(), storage.ModePutUpload, chunk)
+	if err != nil {
+		t.Fatal(err)
+	}
+	c := 0
+	for i := 0; i < 5; i++ {
+		c = int(atomic.LoadInt32(&callCount))
+		if c == *pusher.RetryCount {
+			return
+		}
+		if c > *pusher.RetryCount {
+			t.Fatalf("too many retries. got %d want %d", c, *pusher.RetryCount)
+		}
+		time.Sleep(1 * time.Second)
+	}
+
+	t.Fatalf("timed out waiting for retries. got %d want %d", c, *pusher.RetryCount)
+}
+
 func createPusher(t *testing.T, addr swarm.Address, pushSyncService pushsync.PushSyncer, mockOpts ...mock.Option) (*tags.Tags, *pusher.Service, *Store) {
 	t.Helper()
 	logger := logging.New(ioutil.Discard, 0)
diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go
index 18332aeba57..02c22242f6e 100644
--- a/pkg/pushsync/pushsync.go
+++ b/pkg/pushsync/pushsync.go
@@ -479,7 +479,7 @@ type pushResult struct {
 	attempted bool
 }
 
-const failureThreshold = 3
+const failureThreshold = 2
 
 type failedRequestCache struct {
 	mtx sync.RWMutex
diff --git a/pkg/topology/mock/mock.go b/pkg/topology/mock/mock.go
index e8ef9b4aaeb..2b769dbc13b 100644
--- a/pkg/topology/mock/mock.go
+++ b/pkg/topology/mock/mock.go
@@ -14,6 +14,7 @@ import (
 type mock struct {
 	peers           []swarm.Address
+	depth           uint8
 	closestPeer     swarm.Address
 	closestPeerErr  error
 	peersErr        error
@@ -35,6 +36,12 @@ func WithAddPeersErr(err error) Option {
 	})
 }
 
+func WithNeighborhoodDepth(dd uint8) Option {
+	return optionFunc(func(d *mock) {
+		d.depth = dd
+	})
+}
+
 func WithClosestPeer(addr swarm.Address) Option {
 	return optionFunc(func(d *mock) {
 		d.closestPeer = addr
@@ -136,8 +143,8 @@ func (d *mock) SubscribePeersChange() (c <-chan struct{}, unsubscribe func()) {
 	return c, unsubscribe
 }
 
-func (*mock) NeighborhoodDepth() uint8 {
-	return 0
+func (m *mock) NeighborhoodDepth() uint8 {
+	return m.depth
 }
 
 func (m *mock) IsWithinDepth(addr swarm.Address) bool {
diff --git a/pkg/topology/topology.go b/pkg/topology/topology.go
index 79d67fd7472..01781cc9560 100644
--- a/pkg/topology/topology.go
+++ b/pkg/topology/topology.go
@@ -25,7 +25,7 @@ type Driver interface {
 	ClosestPeerer
 	EachPeerer
 	EachNeighbor
-	NeighborhoodDepth() uint8
+	NeighborhoodDepther
 	SubscribePeersChange() (c <-chan struct{}, unsubscribe func())
 	io.Closer
 	Halter
@@ -137,3 +137,7 @@ type Halter interface {
 	// while allowing it to still run.
 	Halt()
 }
+
+type NeighborhoodDepther interface {
+	NeighborhoodDepth() uint8
+}
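
Note (not part of the patch): below is a minimal standalone Go sketch of the retry decision the new pusher code makes. A receipt is treated as shallow when the proximity order (PO) between the chunk address and the storer peer is below the local neighborhood depth, and such a chunk is re-pushed until its per-chunk counter reaches retryCount (6 by default in the patch), after which the chunk is accepted and marked as synced anyway. The helper name shouldRetryShallowReceipt is hypothetical; it only illustrates the branch condition added in pkg/pusher/pusher.go.

package main

import "fmt"

// shouldRetryShallowReceipt mirrors the branch added in pkg/pusher/pusher.go:
// retry while the receipt's proximity order (po) is below the neighborhood
// depth and the per-chunk retry counter has not yet reached retryCount.
// The function name is hypothetical and not an identifier from the diff.
func shouldRetryShallowReceipt(po, depth uint8, attempts, retryCount int) bool {
	return po < depth && attempts < retryCount
}

func main() {
	const retryCount = 6 // default added by the patch
	// A PO 1 receipt against depth 31 (the setup used in TestPusherRetryShallow)
	// keeps being retried until the counter runs out.
	for attempts := 1; attempts <= retryCount; attempts++ {
		fmt.Printf("attempt %d: retry=%v\n", attempts, shouldRetryShallowReceipt(1, 31, attempts, retryCount))
	}
}

With depth 31 and a PO 1 receipt, the sketch reports retry=true for the first five attempts and false on the sixth, matching the point at which the pusher stops retrying and sets the chunk with storage.ModeSetSync.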