diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 0df7f44fc0a..9ccfdcdffae 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -23,7 +23,7 @@ import ( ) var ( - logMore = true // enable this to get more logging + logMore = false // enable this to get more logging ) type Options struct { @@ -167,7 +167,12 @@ func (p *Puller) manage() { } for _, v := range peersToRecalc { - p.recalcPeer(ctx, v.addr, v.po, depth) + dontSync := p.recalcPeer(ctx, v.addr, v.po, depth) + // stopgap solution for peers that dont return the correct + // amount of cursors we expect + if dontSync { + peersDisconnected[v.addr.String()] = v + } } for _, v := range peersDisconnected { @@ -186,13 +191,14 @@ func (p *Puller) disconnectPeer(ctx context.Context, peer swarm.Address, po uint if logMore { p.logger.Debugf("puller disconnect cleanup peer %s po %d", peer, po) } - syncCtx := p.syncPeers[po][peer.String()] // disconnectPeer is called under lock, this is safe - syncCtx.gone() - + if syncCtx, ok := p.syncPeers[po][peer.String()]; ok { + // disconnectPeer is called under lock, this is safe + syncCtx.gone() + } delete(p.syncPeers[po], peer.String()) } -func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8) { +func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8) (dontSync bool) { if logMore { p.logger.Debugf("puller recalculating peer %s po %d depth %d", peer, po, d) } @@ -205,6 +211,10 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8 c := p.cursors[peer.String()] p.cursorsMtx.Unlock() + if len(c) != int(p.bins) { + return true + } + var want, dontWant []uint8 if po >= d { // within depth @@ -233,6 +243,7 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8 } syncCtx.cancelBins(dontWant...) + return false } func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) { @@ -261,6 +272,13 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) c = cursors } + // if length of returned cursors does not add up to + // what we expect it to be - dont do anything + if len(c) != int(p.bins) { + delete(p.syncPeers[po], peer.String()) + return + } + for bin, cur := range c { if bin == 0 || uint8(bin) < d { continue diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index 65626afd943..e77c4cfd62d 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -6,8 +6,8 @@ package puller_test import ( "errors" + "io/ioutil" "math" - "os" "testing" "time" @@ -123,7 +123,7 @@ func TestSyncFlow_PeerWithinDepth_Live(t *testing.T) { ), mockk.WithDepth(1), }, pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithLiveSyncReplies(tc.liveReplies...)}, - bins: 5, + bins: 2, }) t.Cleanup(func() { pullsync.Close() @@ -199,7 +199,7 @@ func TestSyncFlow_PeerWithinDepth_Historical(t *testing.T) { ), mockk.WithDepth(1), }, pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithAutoReply(), mockps.WithLiveSyncBlock()}, - bins: 5, + bins: 2, }) defer puller.Close() defer pullsync.Close() @@ -592,7 +592,7 @@ func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *moc s := mock.NewStateStore() ps := mockps.NewPullSync(ops.pullSync...) kad := mockk.NewMockKademlia(ops.kad...) - logger := logging.New(os.Stdout, 5) + logger := logging.New(ioutil.Discard, 0) o := puller.Options{ Bins: ops.bins,