Conversation
This is the result of resolving the merge conflicts. This PR is still in progress, because I have to fix the tests and run integration tests on it. Therefore feel free to start reviewing and asking questions if something is not clear, but also expect to see new commits.
`@@ -62,52 +59,42 @@ func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery {`

```go
// RetrieveRequestMsg is the protocol msg for chunk retrieve requests
type RetrieveRequestMsg struct {
	Addr      storage.Address
	SkipCheck bool
	HopCount  uint8
}
```
Protocol message change: after we merge this, we need to bump protocol versions.
`@@ -300,293 +280,3 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {`

```diff
 }

-func TestDeliveryFromNodes(t *testing.T) {
```
`TestDeliveryFromNodes` is removed, because we no longer test that a request for a chunk is forwarded within a chain of nodes. The `FindPeer` functionality is changed so that requests within the nearest neighbourhood (NN) are handled, but not forwarded.
```diff
 log.Trace("checking offered hash", "ref", fmt.Sprintf("%x", hash))
-if wait := c.NeedData(ctx, hash); wait != nil {
+if _, wait := c.NeedData(ctx, hash); wait != nil {
```
In this PR we introduce no change to the production syncer code. The bug where a chunk might be requested from N peers if N peers offer it to us will be addressed in a follow-up PR.
```go
NewNetFetcherFunc func(ctx context.Context, addr Address, peers *sync.Map) NetFetcher
```

```go
const (
	// capacity for the fetchers LRU cache
	fetchersCapacity = 500000
```
The fetchers LRU cache is responsible for removing stale fetchers (outgoing requests for chunks that were never delivered). 500k fetchers equal about 28MB, and in practice we should never reach that many active fetchers.
```go
var fetcherTimeout = 2 * time.Minute // timeout to cancel the fetcher even if requests are coming in

// NewNetStore creates a new NetStore using the provided chunk.Store and localID of the node.
func NewNetStore(store chunk.Store, localID enode.ID) *NetStore {
	fetchers, _ := lru.New(fetchersCapacity)
```
We ignore the error, because we know that we are creating an LRU cache with a positive size.
@jpeletier could you explain a bit further the behaviour you rely on? Based on the tests it appeared that…

From your comment I see that you care mostly about whether the client cancelled the context or the chunk was not found by the NetStore, is that right? If this is the case, then we could probably refactor this bit. To give you background, the reason for this change is that the Swarm Feed tests were failing when a chunk was not found by the NetStore and it returned an error. You can reproduce this by reverting this particular change and trying to run the tests.
Great work @nonsense.
A few small comments, mainly about tracing and whitespacing (feel free to ignore), and a small comment about the netstore shutdown. Otherwise LGTM.
ship it 🚢
```go
// TODO: define "eligible"
func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error) {
	requestFromPeersCount.Inc(1)
```

```go
// getOriginPo returns the originPo if the incoming Request has an Origin
```
In the codebase `po` generally refers to proximity order, which is the number of matching leading (most significant) bits between two byte slices.
This PR is a subset of the `simplify-fetchers` branch for easier review, mostly addressing the issues described at #1309. It includes:

- `FetchFunc` is replaced with two new functions. You probably want to start reviewing this PR by checking them out and checking where they are used.
- `swarm/network/fetcher.go` has been removed and replaced with `singleflight.Group`. Every time `NetStore.Get` looks for a chunk that is not in our LocalStore, it uses the `singleflight.Group` in order to suppress duplicate retrievals for the same chunk.
- Each in-flight request for a chunk adds a `Fetcher` value to a map, so that `NetStore.Get` receives a signal that the chunk it requested has been delivered. The signal that a chunk is delivered and stored to the LocalStore is the closing of the `Fetcher.Delivered` channel. `NetStore.Put` is responsible for removing fetchers and closing the delivered channel. If a fetcher item has been added to the fetchers map, then we must get that chunk, and there is an interested party in the delivered signal. We need to add a timeout to the fetchers, because even though a chunk has been requested and an actor waits for it, that doesn't mean it will be delivered.
- `handleOfferedHashesMsg` uses `NetStore.Has` to determine if it needs a chunk (while syncing). If a chunk is needed, then a `Fetcher` is also created, so that we keep track of it while the current batch is being delivered. It is possible for a chunk to be both requested through a retrieve request and delivered through syncing independently, but there would always be only one `Fetcher` and only one `Delivered` channel for it, so that interested parties are notified when the chunk is delivered and stored.
- `// * first it tries to request explicitly from peers that are known to have offered the chunk` - this part of the functionality of `RequestFromPeers` has been removed. We no longer control the flow through values in the `context.Context`.
- `RequestTimeout` has been split into `RequestTimeout` and `FailedPeerSkipDelay`, as these have different meanings. All timeouts are now placed in the timeouts package and have documentation.
- Added `LNetStore` (we probably need a better name) - a `NetStore` wrapper needed for the `LazyChunkReader`.
- Found a bug in `OfferedHashes` where we request the same chunk from multiple peers via the `OfferedHashes/WantedHashes` protocol. In a future PR we will address it the following way: if we have requested a chunk, we have a fetcher for it, so subsequent `OfferedHashes` won't have an effect.
- Solved a bug where the interval passed to `SubscriptionPull` is exclusive, meaning we lose one chunk between batches.
- Solved a bug with chunk deliveries described at "Fix tracing for chunk delivery" #1292 - chunks are delivered, but the fetcher continues to make requests for the same chunk.
TODO TESTS:

- `streamer_test.go` tests
- `TestDeliveryFromNodes` is removed altogether, because we no longer blindly forward requests to our peers - `FindPeer` was changed.
- `TestRetrieval`
- `TestFileRetrieval`

TODO:

NICE TO HAVE:

- `timeouts.SearchTimeout` - we can immediately call the next peer.
- `FindPeer` - it has at least one new condition - not going out of depth. Add test for `FindPeer` #1362