Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(puller): cursors length panic #1851

Merged
merged 1 commit into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions pkg/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

var (
logMore = true // enable this to get more logging
logMore = false // enable this to get more logging
)

type Options struct {
Expand Down Expand Up @@ -167,7 +167,12 @@ func (p *Puller) manage() {
}

for _, v := range peersToRecalc {
p.recalcPeer(ctx, v.addr, v.po, depth)
dontSync := p.recalcPeer(ctx, v.addr, v.po, depth)
// stopgap solution for peers that dont return the correct
// amount of cursors we expect
if dontSync {
peersDisconnected[v.addr.String()] = v
}
}

for _, v := range peersDisconnected {
Expand All @@ -186,13 +191,14 @@ func (p *Puller) disconnectPeer(ctx context.Context, peer swarm.Address, po uint
if logMore {
p.logger.Debugf("puller disconnect cleanup peer %s po %d", peer, po)
}
syncCtx := p.syncPeers[po][peer.String()] // disconnectPeer is called under lock, this is safe
syncCtx.gone()

if syncCtx, ok := p.syncPeers[po][peer.String()]; ok {
// disconnectPeer is called under lock, this is safe
syncCtx.gone()
}
delete(p.syncPeers[po], peer.String())
}

func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8) {
func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8) (dontSync bool) {
if logMore {
p.logger.Debugf("puller recalculating peer %s po %d depth %d", peer, po, d)
}
Expand All @@ -205,6 +211,10 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
c := p.cursors[peer.String()]
p.cursorsMtx.Unlock()

if len(c) != int(p.bins) {
return true
}

var want, dontWant []uint8
if po >= d {
// within depth
Expand Down Expand Up @@ -233,6 +243,7 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8
}

syncCtx.cancelBins(dontWant...)
return false
}

func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) {
Expand Down Expand Up @@ -261,6 +272,13 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8)
c = cursors
}

// if length of returned cursors does not add up to
// what we expect it to be - dont do anything
if len(c) != int(p.bins) {
delete(p.syncPeers[po], peer.String())
return
}

for bin, cur := range c {
if bin == 0 || uint8(bin) < d {
continue
Expand Down
8 changes: 4 additions & 4 deletions pkg/puller/puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package puller_test

import (
"errors"
"io/ioutil"
"math"
"os"
"testing"
"time"

Expand Down Expand Up @@ -123,7 +123,7 @@ func TestSyncFlow_PeerWithinDepth_Live(t *testing.T) {
), mockk.WithDepth(1),
},
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithLiveSyncReplies(tc.liveReplies...)},
bins: 5,
bins: 2,
})
t.Cleanup(func() {
pullsync.Close()
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestSyncFlow_PeerWithinDepth_Historical(t *testing.T) {
), mockk.WithDepth(1),
},
pullSync: []mockps.Option{mockps.WithCursors(tc.cursors), mockps.WithAutoReply(), mockps.WithLiveSyncBlock()},
bins: 5,
bins: 2,
})
defer puller.Close()
defer pullsync.Close()
Expand Down Expand Up @@ -592,7 +592,7 @@ func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *moc
s := mock.NewStateStore()
ps := mockps.NewPullSync(ops.pullSync...)
kad := mockk.NewMockKademlia(ops.kad...)
logger := logging.New(os.Stdout, 5)
logger := logging.New(ioutil.Discard, 0)

o := puller.Options{
Bins: ops.bins,
Expand Down