Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: FastChainSyncer._sync() always returns when sync is complete #908

Merged
merged 1 commit into from
Jun 15, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions p2p/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ async def sync(self, peer: ETHPeer) -> None:
self._syncing = False

async def _sync(self, peer: ETHPeer) -> None:
"""Try to fetch/process blocks until the given peer's head_hash.

Returns when the peer's head_hash is available in our ChainDB, or if any error occurs
during the sync.

If in fast-sync mode, the _sync_completed event will be set upon successful completion of
a sync.
"""
head = await self.wait(self.chaindb.coro_get_canonical_head())
head_td = await self.wait(self.chaindb.coro_get_score(head.hash))
if peer.head_td <= head_td:
Expand All @@ -154,7 +162,7 @@ async def _sync(self, peer: ETHPeer) -> None:
# FIXME: Fetch a batch of headers, in reverse order, starting from our current head, and
# find the common ancestor between our chain and the peer's.
start_at = max(0, head.block_number - eth.MAX_HEADERS_FETCH)
while not self._sync_complete.is_set():
while True:
if not peer.is_running:
self.logger.info("%s disconnected, aborting sync", peer)
break
Expand Down Expand Up @@ -182,6 +190,20 @@ async def _sync(self, peer: ETHPeer) -> None:
break
start_at = head_number + 1

# Quite often the header batch we receive here includes headers past the peer's reported
# head (via the NewBlock msg), so we can't compare our head's hash to the peer's in
# order to see if the sync is completed. Instead we just check that we have the peer's
# head_hash in our chain.
if await self.wait(self.chaindb.coro_header_exists(peer.head_hash)):
self.logger.info("Sync with %s completed", peer)
self._sync_complete_callback()
break

def _sync_complete_callback(self) -> None:
# When doing a fast-sync, as soon as we complete a sync we set this event to cause _run()
# to return, so that we can download the state.
self._sync_complete.set()

async def _calculate_td(self, headers: List[BlockHeader]) -> int:
"""Return the score (total difficulty) of the last header in the given list.

Expand Down Expand Up @@ -237,17 +259,6 @@ async def _process_headers(self, peer: ETHPeer, headers: List[BlockHeader]) -> i
head.block_number,
encode_hex(head.hash)[2:8],
)
# Quite often the header batch we receive here includes headers past the peer's reported
# head (via the NewBlock msg), so we can't compare our head's hash to the peer's in
# order to see if the sync is completed. Instead we just check that we have the peer's
# head_hash in our chain.
try:
await self.wait(self.chaindb.coro_get_block_header_by_hash(peer.head_hash))
except HeaderNotFound:
pass
else:
self.logger.info("Fast sync with %s completed", peer)
self._sync_complete.set()

return head.block_number

Expand Down Expand Up @@ -442,6 +453,11 @@ def __init__(self,
super().__init__(chaindb, peer_pool, token)
self.chain = chain

def _sync_complete_callback(self) -> None:
# Unlike fast-sync, whenever we complete a sync we just sit and wait for peers to announce
# new blocks, so we have nothing to do here.
pass

async def _handle_msg(self, peer: ETHPeer, cmd: protocol.Command,
msg: protocol._DecodedMsgType) -> None:
if isinstance(cmd, eth.BlockHeaders):
Expand Down