Skip to content

Commit

Permalink
fix(puller): cursors length panic (#1851)
Browse files Browse the repository at this point in the history
  • Loading branch information
acud authored May 24, 2021
1 parent 4d918bf commit 799c511
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
30 changes: 24 additions & 6 deletions pkg/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

var (
logMore = true // enable this to get more logging
logMore = false // enable this to get more logging
)

type Options struct {
Expand Down Expand Up @@ -167,7 +167,12 @@ func (p *Puller) manage() {
}

for _, v := range peersToRecalc {
p.recalcPeer(ctx, v.addr, v.po, depth)
dontSync := p.recalcPeer(ctx, v.addr, v.po, depth)
// stopgap solution for peers that dont return the correct
// amount of cursors we expect
if dontSync {
peersDisconnected[v.addr.String()] = v
}
}

for _, v := range peersDisconnected {
Expand All @@ -186,13 +191,14 @@ func (p *Puller) disconnectPeer(ctx context.Context, peer swarm.Address, po uint
if logMore {
p.logger.Debugf("puller disconnect cleanup peer %s po %d", peer, po)
}
syncCtx := p.syncPeers[po][peer.String()] // disconnectPeer is called under lock, this is safe
syncCtx.gone()

if syncCtx, ok := p.syncPeers[po][peer.String()]; ok {
// disconnectPeer is called under lock, this is safe
syncCtx.gone()
}
delete(p.syncPeers[po], peer.String())
}

func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8) {
func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8) (dontSync bool) {
if logMore {
p.logger.Debugf("puller recalculating peer %s po %d depth %d", peer, po, d)
}
Expand All @@ -205,6 +211,10 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
c := p.cursors[peer.String()]
p.cursorsMtx.Unlock()

if len(c) != int(p.bins) {
return true
}

var want, dontWant []uint8
if po >= d {
// within depth
Expand Down Expand Up @@ -233,6 +243,7 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
}

syncCtx.cancelBins(dontWant...)
return false
}

func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) {
Expand Down Expand Up @@ -261,6 +272,13 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8)
c = cursors
}

// if length of returned cursors does not add up to
// what we expect it to be - dont do anything
if len(c) != int(p.bins) {
delete(p.syncPeers[po], peer.String())
return
}

for bin, cur := range c {
if bin == 0 || uint8(bin) < d {
continue
Expand Down
8 changes: 4 additions & 4 deletions pkg/puller/puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package puller_test

import (
"errors"
"io/ioutil"
"math"
"os"
"testing"
"time"

Expand Down Expand Up @@ -123,7 +123,7 @@ func TestSyncFlow_PeerWithinDepth_Live(t *testing.T) {
), mockk.WithDepth(1),
},
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithLiveSyncReplies(tc.liveReplies...)},
bins: 5,
bins: 2,
})
t.Cleanup(func() {
pullsync.Close()
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestSyncFlow_PeerWithinDepth_Historical(t *testing.T) {
), mockk.WithDepth(1),
},
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithAutoReply(), mockps.WithLiveSyncBlock()},
bins: 5,
bins: 2,
})
defer puller.Close()
defer pullsync.Close()
Expand Down Expand Up @@ -592,7 +592,7 @@ func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *moc
s := mock.NewStateStore()
ps := mockps.NewPullSync(ops.pullSync...)
kad := mockk.NewMockKademlia(ops.kad...)
logger := logging.New(os.Stdout, 5)
logger := logging.New(ioutil.Discard, 0)

o := puller.Options{
Bins: ops.bins,
Expand Down

0 comments on commit 799c511

Please sign in to comment.