This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

retrieval: fix memory leak #2103

Merged Feb 19, 2020 (4 commits)

Changes from all commits
network/kademlia.go (2 changes: 1 addition & 1 deletion)

@@ -947,7 +947,7 @@ func (k *Kademlia) kademliaInfo() (ki KademliaInfo) {

 		row := []string{}
 		bin.ValIterator(func(val pot.Val) bool {
-			e := val.(*Peer)
+			e := val.(*entry)
 			row = append(row, hex.EncodeToString(e.Address()))
 			return true
 		})
network/retrieval/peer.go (7 changes: 7 additions & 0 deletions)

@@ -53,6 +53,13 @@ func (p *Peer) addRetrieval(ruid uint, addr storage.Address) {
 	p.retrievals[ruid] = addr
 }
 
+func (p *Peer) expireRetrieval(ruid uint) {
+	p.mtx.Lock()
+	defer p.mtx.Unlock()
+
+	delete(p.retrievals, ruid)
+}
+
 // chunkReceived is called upon ChunkDelivery message reception
 // it is meant to identify unsolicited chunk deliveries
 func (p *Peer) checkRequest(ruid uint, addr storage.Address) error {
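The leak this hunk addresses is a grow-only map: addRetrieval stores a ruid -> address entry for every outgoing request, and before this PR nothing ever deleted entries for chunks that were never delivered. A minimal, self-contained sketch of the pattern (the names here are illustrative, not the PR's types):

package main

import (
	"fmt"
	"sync"
)

// tracker mimics the peer's retrievals bookkeeping: a mutex-guarded
// map of request IDs to chunk addresses.
type tracker struct {
	mtx        sync.Mutex
	retrievals map[uint][]byte
}

// add records an in-flight retrieval, like addRetrieval in the diff above.
func (t *tracker) add(ruid uint, addr []byte) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.retrievals[ruid] = addr
}

// expire removes an entry, like the new expireRetrieval; without it,
// entries for undelivered chunks would stay in the map forever.
func (t *tracker) expire(ruid uint) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	delete(t.retrievals, ruid)
}

func main() {
	t := &tracker{retrievals: make(map[uint][]byte)}
	t.add(42, []byte{0xbe, 0xef})
	t.expire(42) // the cleanup path this PR wires into every caller
	fmt.Println(len(t.retrievals)) // 0: nothing leaks
}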
network/retrieval/retrieve.go (17 changes: 11 additions & 6 deletions)

@@ -383,8 +383,9 @@ func (r *Retrieval) handleChunkDelivery(ctx context.Context, p *Peer, msg *Chunk
 	return nil
 }
 
-// RequestFromPeers sends a chunk retrieve request to the next found peer
-func (r *Retrieval) RequestFromPeers(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
+// RequestFromPeers sends a chunk retrieve request to the next found peer.
+// It returns the next peer to try and a cleanup function to expire retrievals that were never delivered.
+func (r *Retrieval) RequestFromPeers(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
Review comment (Member):
It would be good to have a short explanation of the purpose of the returned function; as it is now, one has to go through the code. Alternatively, name it in the function signature.

 	r.logger.Debug("retrieval.requestFromPeers", "req.Addr", req.Addr, "localID", localID)
 	metrics.GetOrRegisterCounter("network.retrieve.request_from_peers", nil).Inc(1)
 
@@ -395,7 +396,7 @@ FINDPEER:
 	sp, err := r.findPeerLB(ctx, req)
 	if err != nil {
 		r.logger.Trace(err.Error())
-		return nil, err
+		return nil, func() {}, err
 	}
 
 	protoPeer := r.getPeer(sp.ID())
@@ -405,7 +406,7 @@
 		retries++
 		if retries == maxFindPeerRetries {
 			r.logger.Error("max find peer retries reached", "max retries", maxFindPeerRetries, "ref", req.Addr)
-			return nil, ErrNoPeerFound
+			return nil, func() {}, ErrNoPeerFound
 		}
 
 		goto FINDPEER
@@ -417,14 +418,18 @@
 	}
 	protoPeer.logger.Trace("sending retrieve request", "ref", ret.Addr, "origin", localID, "ruid", ret.Ruid)
 	protoPeer.addRetrieval(ret.Ruid, ret.Addr)
+	cleanup := func() {
+		protoPeer.expireRetrieval(ret.Ruid)
+	}
 	err = protoPeer.Send(ctx, ret)
 	if err != nil {
 		protoPeer.logger.Error("error sending retrieve request to peer", "ruid", ret.Ruid, "err", err)
-		return nil, err
+		cleanup()
+		return nil, func() {}, err
 	}
 
 	spID := protoPeer.ID()
-	return &spID, nil
+	return &spID, cleanup, nil
 }
 
 func (r *Retrieval) Start(server *p2p.Server) error {
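To the review comment above: the returned func() is a cleanup hook that the caller runs once the request is resolved, expiring the ruid entry recorded by addRetrieval. A caller-side sketch of the new contract follows; fetchOnce is a hypothetical helper, not part of the PR, and the real call site is RemoteFetch in storage/netstore.go further down.

package example

import (
	"context"

	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/ethersphere/swarm/network/retrieval"
	"github.com/ethersphere/swarm/storage"
)

// fetchOnce sketches how a caller is expected to handle the three-value
// signature; only RequestFromPeers and its parameters come from the PR.
func fetchOnce(ctx context.Context, r *retrieval.Retrieval, req *storage.Request, localID enode.ID) error {
	peerID, cleanup, err := r.RequestFromPeers(ctx, req, localID)
	if err != nil {
		// on error paths the returned cleanup is a no-op func() {}
		return err
	}
	// expire the pending retrieval whether or not the chunk is ever
	// delivered, so the peer's ruid -> address map cannot grow unbounded
	defer cleanup()
	_ = peerID // e.g. record the peer in a skip-set, as RemoteFetch does
	// ... wait for delivery or ctx cancellation ...
	return nil
}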
network/retrieval/retrieve_test.go (65 changes: 61 additions & 4 deletions)

@@ -41,6 +41,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethersphere/swarm/chunk"
chunktesting "github.com/ethersphere/swarm/chunk/testing"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/network/simulation"
"github.com/ethersphere/swarm/p2p/protocols"
@@ -137,6 +138,62 @@ func TestChunkDelivery(t *testing.T) {
 	}
 }
 
+// TestNoSuitablePeer brings up two nodes, tries to retrieve a chunk which is never
+// found, expecting a NoSuitablePeer error from netstore
+func TestNoSuitablePeer(t *testing.T) {
+	nodes := 2
+
+	sim := simulation.NewBzzInProc(map[string]simulation.ServiceFunc{
+		"bzz-retrieve": newBzzRetrieveWithLocalstore,
+	}, true)
+	defer sim.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	_, err := sim.AddNodesAndConnectFull(nodes)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+		nodeIDs := sim.UpNodeIDs()
+		if len(nodeIDs) != nodes {
+			t.Fatal("not enough nodes up")
+		}
+		// allow the two nodes time to set up the protocols otherwise kademlias will be empty when retrieve requests happen
+		i := 0
+		for iterate := true; iterate; {
+			kinfo := sim.MustNodeItem(nodeIDs[1], simulation.BucketKeyKademlia).(*network.Kademlia).KademliaInfo()
+			if kinfo.TotalConnections != 1 {
+				i++
+			} else {
+				break
+			}
+			time.Sleep(50 * time.Millisecond)
+			if i == 5 {
+				t.Fatal("timed out waiting for 1 connections")
+			}
+		}
+
+		log.Debug("fetching through node", "enode", nodeIDs[1])
+		ns := sim.MustNodeItem(nodeIDs[1], bucketKeyNetstore).(*storage.NetStore)
+		c := chunktesting.GenerateTestRandomChunk()
+
+		ref := c.Address()
+		_, err := ns.Get(context.Background(), chunk.ModeGetRequest, storage.NewRequest(ref))
+		if err == nil {
+			return errors.New("expected netstore retrieval error but got none")
+		}
+		if err != storage.ErrNoSuitablePeer {
+			return fmt.Errorf("expected ErrNoSuitablePeer but got %v instead", err)
+		}
+		return nil
+	})
+	if result.Error != nil {
+		t.Fatal(result.Error)
+	}
+}
+
 // TestUnsolicitedChunkDelivery tests that a node is dropped in response to an unsolicited chunk delivery
 // this case covers a chunk Ruid that was not previously known to the downstream peer
 func TestUnsolicitedChunkDelivery(t *testing.T) {
@@ -193,8 +250,8 @@ func TestUnsolicitedChunkDeliveryFaultyAddr(t *testing.T) {
 		t.Fatal(err)
 	}
 	defer teardown()
-	ns.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
-		return &enode.ID{}, nil
+	ns.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
+		return &enode.ID{}, func() {}, nil
 	}
 	node := tester.Nodes[0]
 
@@ -267,8 +324,8 @@ func TestUnsolicitedChunkDeliveryDouble(t *testing.T) {
 		t.Fatal(err)
 	}
 	defer teardown()
-	ns.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
-		return &enode.ID{}, nil
+	ns.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
+		return &enode.ID{}, func() {}, nil
 	}
 	node := tester.Nodes[0]
 
storage/feed/testutil.go (8 changes: 4 additions & 4 deletions)

@@ -53,8 +53,8 @@ func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error)
 	localStore := chunk.NewValidatorStore(db, storage.NewContentAddressValidator(storage.MakeHashFunc(feedsHashAlgorithm)), fh)
 
 	netStore := storage.NewNetStore(localStore, network.NewBzzAddr(make([]byte, 32), nil))
-	netStore.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
-		return nil, errors.New("not found")
+	netStore.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
+		return nil, func() {}, errors.New("not found")
 	}
 	fh.SetStore(netStore)
 	return &TestHandler{fh}, nil
@@ -69,8 +69,8 @@ func newTestHandlerWithStore(fh *Handler, datadir string, db chunk.Store, params
 	localStore := chunk.NewValidatorStore(db, storage.NewContentAddressValidator(storage.MakeHashFunc(feedsHashAlgorithm)), fh)
 
 	netStore := storage.NewNetStore(localStore, network.NewBzzAddr(make([]byte, 32), nil))
-	netStore.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
-		return nil, errors.New("not found")
+	netStore.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, func(), error) {
+		return nil, func() {}, errors.New("not found")
 	}
 	fh.SetStore(netStore)
 	return &TestHandler{fh}, nil
storage/netstore.go (5 changes: 3 additions & 2 deletions)

@@ -86,7 +86,7 @@ func (fi *Fetcher) SafeClose(ch chunk.Chunk) {
 	})
 }
 
-type RemoteGetFunc func(ctx context.Context, req *Request, localID enode.ID) (*enode.ID, error)
+type RemoteGetFunc func(ctx context.Context, req *Request, localID enode.ID) (*enode.ID, func(), error)
 
 // NetStore is an extension of LocalStore
 // it implements the ChunkStore interface
@@ -247,13 +247,14 @@ func (n *NetStore) RemoteFetch(ctx context.Context, req *Request, fi *Fetcher) (

 	log.Trace("remote.fetch", "ref", ref)
 
-	currentPeer, err := n.RemoteGet(ctx, req, n.LocalID)
+	currentPeer, cleanup, err := n.RemoteGet(ctx, req, n.LocalID)
 	if err != nil {
 		n.logger.Trace(err.Error(), "ref", ref)
 		osp.LogFields(olog.String("err", err.Error()))
 		osp.Finish()
 		return nil, ErrNoSuitablePeer
 	}
+	defer cleanup()
 
 	// add peer to the set of peers to skip from now
 	n.logger.Trace("remote.fetch, adding peer to skip", "ref", ref, "peer", currentPeer.String())
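With the deferred cleanup() in place, every entry registered by addRetrieval is expired when RemoteFetch returns, whether the chunk arrived, the request timed out, or no suitable peer was found. This paired add/expire lifecycle is what closes the memory leak named in the PR title.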