Skip to content

Commit

Permalink
swarm/network, swarm/storage: Preserve opentracing contexts (#19022)
Browse files Browse the repository at this point in the history
  • Loading branch information
nolash authored and nonsense committed Feb 8, 2019
1 parent 0436412 commit 0c10d37
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 107 deletions.
36 changes: 19 additions & 17 deletions swarm/network/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Fetcher struct {
requestC chan uint8 // channel for incoming requests (with the hopCount value in it)
searchTimeout time.Duration
skipCheck bool
ctx context.Context
}

type Request struct {
Expand Down Expand Up @@ -109,29 +110,30 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
// contain the peers which are actively requesting this chunk, to make sure we
// don't request back the chunks from them.
// The created Fetcher is started and returned.
func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher {
fetcher := NewFetcher(source, f.request, f.skipCheck)
go fetcher.run(ctx, peersToSkip)
func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peers *sync.Map) storage.NetFetcher {
fetcher := NewFetcher(ctx, source, f.request, f.skipCheck)
go fetcher.run(peers)
return fetcher
}

// NewFetcher creates a new Fetcher for the given chunk address using the given request function.
func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
func NewFetcher(ctx context.Context, addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
return &Fetcher{
addr: addr,
protoRequestFunc: rf,
offerC: make(chan *enode.ID),
requestC: make(chan uint8),
searchTimeout: defaultSearchTimeout,
skipCheck: skipCheck,
ctx: ctx,
}
}

// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
func (f *Fetcher) Offer(source *enode.ID) {
// First we need to have this select to make sure that we return if context is done
select {
case <-ctx.Done():
case <-f.ctx.Done():
return
default:
}
Expand All @@ -140,15 +142,15 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select {
case f.offerC <- source:
case <-ctx.Done():
case <-f.ctx.Done():
}
}

// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
func (f *Fetcher) Request(hopCount uint8) {
// First we need to have this select to make sure that we return if context is done
select {
case <-ctx.Done():
case <-f.ctx.Done():
return
default:
}
Expand All @@ -162,13 +164,13 @@ func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select {
case f.requestC <- hopCount + 1:
case <-ctx.Done():
case <-f.ctx.Done():
}
}

// start prepares the Fetcher
// it keeps the Fetcher alive within the lifecycle of the passed context
func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
func (f *Fetcher) run(peers *sync.Map) {
var (
doRequest bool // determines if retrieval is initiated in the current iteration
wait *time.Timer // timer for search timeout
Expand Down Expand Up @@ -219,7 +221,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
doRequest = requested

// all Fetcher context closed, can quit
case <-ctx.Done():
case <-f.ctx.Done():
log.Trace("terminate fetcher", "request addr", f.addr)
// TODO: send cancellations to all peers left over in peers map (i.e., those we requested from)
return
Expand All @@ -228,7 +230,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// need to issue a new request
if doRequest {
var err error
sources, err = f.doRequest(ctx, gone, peers, sources, hopCount)
sources, err = f.doRequest(gone, peers, sources, hopCount)
if err != nil {
log.Info("unable to request", "request addr", f.addr, "err", err)
}
Expand Down Expand Up @@ -266,7 +268,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// * the peer's address is added to the set of peers to skip
// * the peer's address is removed from prospective sources, and
// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
var i int
var sourceID *enode.ID
var quit chan struct{}
Expand All @@ -283,7 +285,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
for i = 0; i < len(sources); i++ {
req.Source = sources[i]
var err error
sourceID, quit, err = f.protoRequestFunc(ctx, req)
sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err == nil {
// remove the peer from known sources
// Note: we can modify the source although we are looping on it, because we break from the loop immediately
Expand All @@ -297,7 +299,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
if !foundSource {
req.Source = nil
var err error
sourceID, quit, err = f.protoRequestFunc(ctx, req)
sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err != nil {
// if no peers found to request from
return sources, err
Expand All @@ -314,7 +316,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
select {
case <-quit:
gone <- sourceID
case <-ctx.Done():
case <-f.ctx.Done():
}
}()
return sources, nil
Expand Down
113 changes: 54 additions & 59 deletions swarm/network/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,21 @@ func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*enode
func TestFetcherSingleRequest(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fetcher := NewFetcher(ctx, addr, requester.doRequest, true)

peers := []string{"a", "b", "c", "d"}
peersToSkip := &sync.Map{}
for _, p := range peers {
peersToSkip.Store(p, time.Now())
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go fetcher.run(ctx, peersToSkip)
go fetcher.run(peersToSkip)

rctx := context.Background()
fetcher.Request(rctx, 0)
fetcher.Request(0)

select {
case request := <-requester.requestC:
Expand Down Expand Up @@ -115,20 +115,19 @@ func TestFetcherSingleRequest(t *testing.T) {
func TestFetcherCancelStopsFetcher(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

peersToSkip := &sync.Map{}

ctx, cancel := context.WithCancel(context.Background())

fetcher := NewFetcher(ctx, addr, requester.doRequest, true)

peersToSkip := &sync.Map{}

// we start the fetcher, and then we immediately cancel the context
go fetcher.run(ctx, peersToSkip)
go fetcher.run(peersToSkip)
cancel()

rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer rcancel()
// we call Request with an active context
fetcher.Request(rctx, 0)
fetcher.Request(0)

// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
select {
Expand All @@ -140,23 +139,23 @@ func TestFetcherCancelStopsFetcher(t *testing.T) {

// TestFetchCancelStopsRequest tests that calling a Request function with a cancelled context does not initiate a request
func TestFetcherCancelStopsRequest(t *testing.T) {
t.Skip("since context is now per fetcher, this test is likely redundant")

requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

peersToSkip := &sync.Map{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// we start the fetcher with an active context
go fetcher.run(ctx, peersToSkip)
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)

rctx, rcancel := context.WithCancel(context.Background())
rcancel()
peersToSkip := &sync.Map{}

// we start the fetcher with an active context
go fetcher.run(peersToSkip)

// we call Request with a cancelled context
fetcher.Request(rctx, 0)
fetcher.Request(0)

// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
select {
Expand All @@ -166,8 +165,7 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
}

// if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled
rctx = context.Background()
fetcher.Request(rctx, 0)
fetcher.Request(0)

select {
case <-requester.requestC:
Expand All @@ -182,19 +180,19 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
func TestFetcherOfferUsesSource(t *testing.T) {
requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

peersToSkip := &sync.Map{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fetcher := NewFetcher(ctx, addr, requester.doRequest, true)

peersToSkip := &sync.Map{}

// start the fetcher
go fetcher.run(ctx, peersToSkip)
go fetcher.run(peersToSkip)

rctx := context.Background()
// call the Offer function with the source peer
fetcher.Offer(rctx, &sourcePeerID)
fetcher.Offer(&sourcePeerID)

// fetcher should not initiate request
select {
Expand All @@ -204,8 +202,7 @@ func TestFetcherOfferUsesSource(t *testing.T) {
}

// call Request after the Offer
rctx = context.Background()
fetcher.Request(rctx, 0)
fetcher.Request(0)

// there should be exactly 1 request coming from fetcher
var request *Request
Expand Down Expand Up @@ -234,19 +231,19 @@ func TestFetcherOfferUsesSource(t *testing.T) {
func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

peersToSkip := &sync.Map{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fetcher := NewFetcher(ctx, addr, requester.doRequest, true)

peersToSkip := &sync.Map{}

// start the fetcher
go fetcher.run(ctx, peersToSkip)
go fetcher.run(peersToSkip)

// call Request first
rctx := context.Background()
fetcher.Request(rctx, 0)
fetcher.Request(0)

// there should be a request coming from fetcher
var request *Request
Expand All @@ -260,7 +257,7 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
}

// after the Request call Offer
fetcher.Offer(context.Background(), &sourcePeerID)
fetcher.Offer(&sourcePeerID)

// there should be a request coming from fetcher
select {
Expand All @@ -283,21 +280,21 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
func TestFetcherRetryOnTimeout(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
// set searchTimeOut to low value so the test is quicker
fetcher.searchTimeout = 250 * time.Millisecond

peersToSkip := &sync.Map{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// start the fetcher
go fetcher.run(ctx, peersToSkip)
go fetcher.run(peersToSkip)

// call the fetch function with an active context
rctx := context.Background()
fetcher.Request(rctx, 0)
fetcher.Request(0)

// after 100ms the first request should be initiated
time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -339,7 +336,7 @@ func TestFetcherFactory(t *testing.T) {

fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip)

fetcher.Request(context.Background(), 0)
fetcher.Request(0)

// check if the created fetchFunction really starts a fetcher and initiates a request
select {
Expand All @@ -353,21 +350,21 @@ func TestFetcherFactory(t *testing.T) {
func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fetcher := NewFetcher(ctx, addr, requester.doRequest, true)

// make sure the searchTimeout is long so it is sure the request is not
// retried because of timeout
fetcher.searchTimeout = 10 * time.Second

peersToSkip := &sync.Map{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go fetcher.run(ctx, peersToSkip)
go fetcher.run(peersToSkip)

rctx := context.Background()
fetcher.Request(rctx, 0)
fetcher.Request(0)

select {
case <-requester.requestC:
Expand Down Expand Up @@ -460,17 +457,15 @@ func TestRequestSkipPeerPermanent(t *testing.T) {
func TestFetcherMaxHopCount(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

peersToSkip := &sync.Map{}
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)

go fetcher.run(ctx, peersToSkip)
peersToSkip := &sync.Map{}

rctx := context.Background()
fetcher.Request(rctx, maxHopCount)
go fetcher.run(peersToSkip)

// if hopCount is already at max no request should be initiated
select {
Expand Down
Loading

0 comments on commit 0c10d37

Please sign in to comment.