swarm/network: fix data race in fetcher_test.go #18469

Merged
34 changes: 20 additions & 14 deletions swarm/network/fetcher.go
@@ -26,27 +26,31 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage"
)

var searchTimeout = 1 * time.Second
const (
defaultSearchTimeout = 1 * time.Second
@nonsense (Member) commented on Jan 17, 2019:
Probably best to keep this a var so that we can change it from tests. Applies also to maxHopCount.

Follow-up comment (Member):
On a second reading, I see that this is exactly what you are trying to fix here. I don't think we want to store the timeout in the fetcher - we might have thousands of these. I also see that we have a race by modifying the timeout in tests.

We should probably refactor the tests, rather than store the timeout in every single fetcher.

Follow-up comment (Member):

Since we only have two tests, I'd rather we fix the tests than copy a searchTimeout in every fetcher.
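For readers following the thread: the race in question is the classic pattern of a test mutating a package-level variable while a background goroutine reads it. A minimal sketch, with purely illustrative names (not the actual swarm code), that `go test -race` would flag:

```go
// Sketch only: illustrates the race pattern discussed above.
package main

import (
	"sync"
	"time"
)

// Package-level timeout, analogous to the old `var searchTimeout`.
var searchTimeout = 1 * time.Second

type fetcher struct {
	quit chan struct{}
}

func (f *fetcher) run(wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-time.After(searchTimeout): // unsynchronized read of the shared var
			// pretend to re-request the chunk from another peer
		case <-f.quit:
			return
		}
	}
}

func main() {
	f := &fetcher{quit: make(chan struct{})}
	var wg sync.WaitGroup
	wg.Add(1)
	go f.run(&wg)

	// A "test" shortening the timeout after the goroutine has started:
	// an unsynchronized write to the same variable, i.e. a data race under -race.
	searchTimeout = 250 * time.Millisecond

	time.Sleep(500 * time.Millisecond)
	close(f.quit)
	wg.Wait()
}
```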

// maximum number of forwarded requests (hops), to make sure requests are not
// forwarded forever in peer loops
maxHopCount uint8 = 20
)

// Time to consider peer to be skipped.
// Also used in stream delivery.
var RequestTimeout = 10 * time.Second

var maxHopCount uint8 = 20 // maximum number of forwarded requests (hops), to make sure requests are not forwarded forever in peer loops

type RequestFunc func(context.Context, *Request) (*enode.ID, chan struct{}, error)

// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
// keeps it alive until all active requests are completed. This can happen:
// 1. either because the chunk is delivered
// 2. or becuse the requestor cancelled/timed out
// 2. or because the requester cancelled/timed out
// Fetcher self destroys itself after it is completed.
// TODO: cancel all forward requests after termination
type Fetcher struct {
protoRequestFunc RequestFunc // request function fetcher calls to issue retrieve request for a chunk
addr storage.Address // the address of the chunk to be fetched
offerC chan *enode.ID // channel of sources (peer node id strings)
requestC chan uint8 // channel for incoming requests (with the hopCount value in it)
searchTimeout time.Duration
skipCheck bool
}

@@ -79,7 +83,7 @@ func (r *Request) SkipPeer(nodeID string) bool {
}
t, ok := val.(time.Time)
if ok && time.Now().After(t.Add(RequestTimeout)) {
// deadine expired
// deadline expired
r.peersToSkip.Delete(nodeID)
return false
}
@@ -100,9 +104,10 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
}
}

// New contructs a new Fetcher, for the given chunk. All peers in peersToSkip are not requested to
// deliver the given chunk. peersToSkip should always contain the peers which are actively requesting
// this chunk, to make sure we don't request back the chunks from them.
// New constructs a new Fetcher, for the given chunk. All peers in peersToSkip
// are not requested to deliver the given chunk. peersToSkip should always
// contain the peers which are actively requesting this chunk, to make sure we
// don't request back the chunks from them.
// The created Fetcher is started and returned.
func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher {
fetcher := NewFetcher(source, f.request, f.skipCheck)
@@ -117,6 +122,7 @@ func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
protoRequestFunc: rf,
offerC: make(chan *enode.ID),
requestC: make(chan uint8),
searchTimeout: defaultSearchTimeout,
skipCheck: skipCheck,
}
}
@@ -176,7 +182,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// loop that keeps the fetching process alive
// after every request a timer is set. If this goes off we request again from another peer
// note that the previous request is still alive and has the chance to deliver, so
// rerequesting extends the search. ie.,
// requesting again extends the search. ie.,
// if a peer we requested from is gone we issue a new request, so the number of active
// requests never decreases
for {
@@ -209,13 +215,13 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// search timeout: too much time passed since the last request,
// extend the search to a new peer if we can find one
case <-waitC:
log.Trace("search timed out: rerequesting", "request addr", f.addr)
log.Trace("search timed out: requesting", "request addr", f.addr)
doRequest = requested

// all Fetcher context closed, can quit
case <-ctx.Done():
log.Trace("terminate fetcher", "request addr", f.addr)
// TODO: send cancelations to all peers left over in peers map (i.e., those we requested from)
// TODO: send cancellations to all peers left over in peers map (i.e., those we requested from)
return
}

@@ -231,7 +237,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// if wait channel is not set, set it to a timer
if requested {
if wait == nil {
wait = time.NewTimer(searchTimeout)
wait = time.NewTimer(f.searchTimeout)
defer wait.Stop()
waitC = wait.C
} else {
@@ -242,8 +248,8 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
default:
}
}
// reset the timer to go off after searchTimeout
wait.Reset(searchTimeout)
// reset the timer to go off after defaultSearchTimeout
wait.Reset(f.searchTimeout)
}
}
doRequest = false
16 changes: 5 additions & 11 deletions swarm/network/fetcher_test.go
@@ -284,15 +284,11 @@ func TestFetcherRetryOnTimeout(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)
// set searchTimeOut to low value so the test is quicker
fetcher.searchTimeout = 250 * time.Millisecond

peersToSkip := &sync.Map{}

// set searchTimeOut to low value so the test is quicker
defer func(t time.Duration) {
searchTimeout = t
}(searchTimeout)
searchTimeout = 250 * time.Millisecond

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

@@ -359,11 +355,9 @@ func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

// make sure searchTimeout is long so it is sure the request is not retried because of timeout
defer func(t time.Duration) {
searchTimeout = t
}(searchTimeout)
searchTimeout = 10 * time.Second
// make sure the searchTimeout is long so it is sure the request is not
// retried because of timeout
fetcher.searchTimeout = 10 * time.Second

peersToSkip := &sync.Map{}

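A change like this can be verified locally with Go's race detector; the test names and package path below are taken from the diff above:

```
go test -race -run 'TestFetcherRetryOnTimeout|TestFetcherRequestQuitRetriesRequest' ./swarm/network/
```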