StateDownloader now handles data requests from peers #987
Conversation
The other two PRs this is based on have been 👍. I'll review this once those are merged and this has been rebased (hard to parse as is).
Force-pushed from bc445f8 to 1b01e14
Rebased
Force-pushed from 1b01e14 to 6dc934d
I feel reasonably strongly about my suggestion for a `PeerRequestHandler` API of some sort to reduce the complexity of the various call sites which handle peer requests. Let me know your thoughts.
p2p/chain.py (Outdated)
```python
    type(block_number_or_hash),
)

limit = max(max_headers, eth.MAX_HEADERS_FETCH)
```
What do you think about splitting this part of the body (the pure part) into a standalone utility function for which we can write some simple tests?
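For example, the pure part could become a small standalone helper like the sketch below; the name, the `min` cap and the exact step arithmetic are illustrative, not lifted from the PR:

```python
from typing import List

MAX_HEADERS_FETCH = 192  # illustrative stand-in for eth.MAX_HEADERS_FETCH


def compute_requested_block_numbers(
        start_block: int, max_headers: int, skip: int, reverse: bool) -> List[int]:
    # Pure computation of which block numbers a GetBlockHeaders request covers,
    # so it can be unit-tested without a DB, a peer or a cancel token.
    limit = min(max_headers, MAX_HEADERS_FETCH)
    step = -(skip + 1) if reverse else (skip + 1)
    candidates = range(start_block, start_block + step * limit, step)
    return [number for number in candidates if number >= 0]
```

A test then only needs to assert on plain integers, e.g. `compute_requested_block_numbers(10, 3, 0, True) == [10, 9, 8]`.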
I'd prefer to save that for another PR, as I'm just shuffling this code around in this one.
p2p/chain.py (Outdated)
```python
headers = [header async for header in self._generate_available_headers(block_numbers)]
headers = await lookup_headers(
    self.db, header_request['block_number_or_hash'], header_request['max_headers'],
```
These call signatures are quite large. What do you think about something like the following?
```python
class PeerRequestHandler:
    def __init__(self, db, logger, cancel_token):
        self.db = db
        ...

    def lookup_headers(self, ...):
        return lookup_headers(db=self.db, ..., logger=self.logger, token=self.cancel_token)

    ...
```
Then the `BaseChainSyncer` classes can just set an instance of this class as a local variable, and the call turns into `headers = await self.handler.lookup_headers(bn_or_hash, max_headers, skip, reverse)`, which eliminates the extra three arguments that are always the same.
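A slightly more concrete sketch of that idea; the parameter order of the module-level `lookup_headers()` and the type annotations are assumptions, not the PR's actual signature:

```python
import logging
from typing import Any, List, Union


class PeerRequestHandler:
    """Bundles the db, logger and cancel token that every request helper needs."""

    def __init__(self, db: Any, logger: logging.Logger, cancel_token: Any) -> None:
        self.db = db
        self.logger = logger
        self.cancel_token = cancel_token

    async def lookup_headers(
            self, block_number_or_hash: Union[int, bytes], max_headers: int,
            skip: int, reverse: bool) -> List[Any]:
        # Delegates to the standalone lookup_headers() introduced in this PR,
        # filling in the three arguments that are always the same.
        return await lookup_headers(
            self.db, block_number_or_hash, max_headers, skip, reverse,
            self.logger, self.cancel_token)
```

The syncer would then create `self.handler = PeerRequestHandler(self.db, self.logger, self.cancel_token)` once and call `await self.handler.lookup_headers(...)` everywhere else.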
Yeah, that's a nice improvement
p2p/chain.py (Outdated)
```python
elif isinstance(cmd, eth.GetReceipts):
    await self._handle_get_receipts(peer, cast(List[Hash32], msg))
    await handle_get_receipts(
```
In conjunction with my previous comment, these would now be `await self.handler.get_receipts(peer, ...)`.
p2p/chain.py (Outdated)
```python
        logger: logging.Logger, token: CancelToken) -> None:
    nodes = []
    # Only serve up to eth.MAX_STATE_FETCH items in every request.
    for node_hash in node_hashes[:eth.MAX_STATE_FETCH]:
```
My gut says that the truncation of which hashes we retrieve should happen at a higher level, where this function is actually called, and that this function should blindly return all of the data that was requested.
I think this is a pretty minor thing, but architecturally I think it is more correct:
- It keeps this function simpler.
- It separates different classes of business logic.
- It allows this function to be used for larger retrieval sizes (if that ever becomes a requirement).
That makes sense, but this is where we call the sub-proto method that actually sends the data, so we'd need to extract this part into a separate unit that just returns the trie nodes. It'd also have to become a generator, otherwise malicious peers could easily DoS us, as we'd always be retrieving all the data from the DB regardless of the limit.
How would you feel about just adding a new `limit` argument to the `handle_*()` methods for now, and having them default to the current values we use? That way we don't need to break these into even smaller methods (which isn't necessary yet), but we keep them flexible and easy to refactor should it become necessary.
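For reference, a minimal self-contained sketch of the `limit`-argument variant, treating the DB as a plain mapping; the function name and the `384` cap are illustrative stand-ins:

```python
from typing import Dict, List

MAX_STATE_FETCH = 384  # illustrative stand-in for eth.MAX_STATE_FETCH


def get_node_data(db: Dict[bytes, bytes], node_hashes: List[bytes],
                  limit: int = MAX_STATE_FETCH) -> List[bytes]:
    # Only look up the first `limit` hashes, so a malicious peer cannot make
    # us read an unbounded amount of data from the DB.
    nodes = []
    for node_hash in node_hashes[:limit]:
        try:
            nodes.append(db[node_hash])
        except KeyError:
            continue
    return nodes
```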
My intention was not to truncate the return value, but to truncate `node_hashes` before it was passed into this function.

```python
# at the call-site
trie_data = self._handler.handle_get_node_data(..., node_hashes=requested_hashes[:MAX_STATE_FETCH])
```

I think I prefer this over the `limit` argument, but both would be fine.
Oh, I see! Went with your proposed solution.
Those are the methods related to handling GetBlockHeaders requests from peers, which are also needed in StateDownloader, so they were turned into funcs.
Turned the ChainSyncer methods that do that into standalone funcs so they could be reused in StateDownloader.
Force-pushed from 6dc934d to ee315b7
p2p/chain.py (Outdated)
```python
chaindb = cast('AsyncChainDB', self.db)
bodies = []
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
for block_hash in block_hashes[:eth.MAX_BODIES_FETCH]:
```
Same thought here about moving the limits up a level to the call site of this function.

```python
bodies = self._handler.handle_get_block_bodies(peer, requested_block_hashes[:MAX_BODIES_FETCH])
```
```python
        self.logger = logger
        self.cancel_token = token

    async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None:
```
Since the class has the name `Handler`, what do you think about dropping the `handle_` prefix for these methods?
Well, if I drop the `handle_` prefix the names will kinda suggest the methods are getters (e.g. `get_block_bodies()`), so I'd rather keep the prefix.
p2p/chain.py (Outdated)
```python
# Only serve up to eth.MAX_BODIES_FETCH items in every request.
for block_hash in block_hashes[:eth.MAX_BODIES_FETCH]:
    try:
        header = await wait_with_token(
```
Maybe put the `self.wait(...)` API on this class. Maybe we can introduce that as a mixin class like:

```python
class Cancelable:
    cancel_token: CancelToken = None

    def wait(...):
        ...

    def wait_first(...):
        ...
```

Or just inline a duplication of the `wait` function from our `Service` class, with a comment/issue to remove the duplication at some point.
I like the mixin idea! Just thought I'd add a `Mixin` suffix to its name to make that clear, though.
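A rough, self-contained sketch of such a mixin, written against plain asyncio and assuming only that the token exposes an async `wait()` that completes on cancellation; the real class would reuse the existing `CancelToken`/`wait_with_token` helpers instead:

```python
import asyncio
from typing import Any, Awaitable


class CancelableMixin:
    # In the real code this would be a p2p CancelToken; here it only needs
    # an async wait() that completes when the operation should be aborted.
    cancel_token: Any = None

    async def wait(self, awaitable: Awaitable[Any]) -> Any:
        # Run the awaitable, but bail out as soon as the cancel token fires.
        main = asyncio.ensure_future(awaitable)
        cancelled = asyncio.ensure_future(self.cancel_token.wait())
        done, pending = await asyncio.wait(
            {main, cancelled}, return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
        if main in done:
            return main.result()
        raise asyncio.CancelledError("Cancel token triggered while waiting")
```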
```python
async def _get_block_numbers_for_request(
        self, block_number_or_hash: Union[int, bytes], max_headers: int,
        skip: int, reverse: bool) -> List[BlockNumber]:
```
Mostly unrelated: I think go-ethereum recently patched a bug which exploited overflowing their integer with very large `skip` values. While we don't suffer from the same overflow problems, it makes me think that we should enforce a reasonable upper bound on the skip size.
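Something as small as the guard below would do; the bound value and the error type are assumptions, not anything go-ethereum or this PR actually uses:

```python
MAX_REASONABLE_SKIP = 2**24  # arbitrary illustrative upper bound


def validate_skip(skip: int) -> int:
    # Reject absurd skip values before they feed into block-number arithmetic.
    if not (0 <= skip <= MAX_REASONABLE_SKIP):
        raise ValueError(f"Unreasonable skip in GetBlockHeaders request: {skip}")
    return skip
```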
I noticed a low peer count during a state sync (which makes it quite slow), and it turns out that's because peers attempt to fetch data from us and disconnect when we fail to reply. This should ensure we keep a high peer count, speeding up the state sync significantly.
Depends on #980