Skip to content

Commit

Permalink
feat(sync-v2): Improve sync-v2 reliability
Browse files Browse the repository at this point in the history
  • Loading branch information
msbrogli committed Nov 3, 2023
1 parent b0642e2 commit 98121c4
Showing 1 changed file with 48 additions and 50 deletions.
98 changes: 48 additions & 50 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
self.blockchain_streaming: Optional[BlockchainStreaming] = None
self.transactions_streaming: Optional[TransactionsStreaming] = None

# Whether the peers are synced, i.e. our best height and best block are the same
# Whether the peers are synced, i.e. we have the same best block.
# Notice that this flag ignores the mempool.
self._synced = False

# Indicate whether the sync manager has been started.
Expand Down Expand Up @@ -299,11 +300,6 @@ def _run_sync(self) -> Generator[Any, Any, None]:
self.log.debug('running mempool sync, try again later')
return

bestblock = self.tx_storage.get_best_block()
meta = bestblock.get_metadata()

self.log.debug('run sync', height=meta.height)

assert self.protocol.connections is not None
assert self.tx_storage.indexes is not None
assert self.tx_storage.indexes.deps is not None
Expand All @@ -313,9 +309,13 @@ def _run_sync(self) -> Generator[Any, Any, None]:
self.update_synced(False)
# TODO: find out whether we can sync transactions from this peer to speed things up
self.run_sync_transactions()
else:
# I am already in sync with all checkpoints, sync next blocks
yield self.run_sync_blocks()
return

is_block_synced = yield self.run_sync_blocks()
if is_block_synced:
# our blocks are synced, so sync the mempool
self.state = PeerState.SYNCING_MEMPOOL
self.mempool_manager.run()

def run_sync_transactions(self) -> None:
""" Run a step of the transaction syncing phase.
Expand Down Expand Up @@ -355,8 +355,10 @@ def get_my_best_block(self) -> _HeightInfo:
return _HeightInfo(height=bestblock.get_height(), id=bestblock.hash)

@inlineCallbacks
def run_sync_blocks(self) -> Generator[Any, Any, None]:
""" Async step of the block syncing phase.
def run_sync_blocks(self) -> Generator[Any, Any, bool]:
"""Async step of the block syncing phase. Return True if we already have all other peer's blocks.
Notice that we might already have all other peer's blocks while the other peer is still syncing.
"""
assert self.tx_storage.indexes is not None
self.state = PeerState.SYNCING_BLOCKS
Expand All @@ -368,34 +370,51 @@ def run_sync_blocks(self) -> Generator[Any, Any, None]:
self.peer_best_block = yield self.get_peer_best_block()
assert self.peer_best_block is not None

# Are we synced?
if self.peer_best_block == my_best_block:
# Yes, we are synced! \o/
self.log.info('blocks are synced', best_block=my_best_block)
self.update_synced(True)
self.send_relay(enable=True)
self.synced_block = self.peer_best_block
return True

# Not synced but same blockchain?
if self.peer_best_block.height <= my_best_block.height:
# Is peer behind me at the same blockchain?
common_block_hash = self.tx_storage.indexes.height.get(self.peer_best_block.height)
if common_block_hash == self.peer_best_block.id:
# If yes, nothing to sync from this peer.
self.log.info('nothing to sync because peer is behind me at the same best blockchain',
my_best_block=my_best_block, peer_best_block=self.peer_best_block)
self.update_synced(True)
self.send_relay(enable=True)
self.synced_block = self.peer_best_block
return True

# Ok. We have blocks to sync.
self.update_synced(False)
self.send_relay(enable=False)

# Find best common block
self.synced_block = yield self.find_best_common_block(my_best_block, self.peer_best_block)
if self.synced_block is None:
# Find best common block failed. Try again soon.
# This might happen if a reorg occurs during the search.
return
self.log.debug('find_best_common_block failed.')
return False

self.log.debug('run_sync_blocks',
self.log.debug('starting to sync blocks',
my_best_block=my_best_block,
peer_best_block=self.peer_best_block,
synced_block=self.synced_block)

if self.synced_block.height < self.peer_best_block.height:
# sync from common block
self.run_block_sync(self.synced_block.id,
self.synced_block.height,
self.peer_best_block.id,
self.peer_best_block.height)
elif my_best_block.height == self.synced_block.height == self.peer_best_block.height:
# we're synced and on the same height, get their mempool
self.state = PeerState.SYNCING_MEMPOOL
self.mempool_manager.run()
elif self._is_relaying:
# TODO: validate if this is when we should disable relaying
self.send_relay(enable=False)
else:
# we got all the peer's blocks but aren't on the same height, nothing to do
pass
# Sync from common block
self.run_block_sync(self.synced_block.id,
self.synced_block.height,
self.peer_best_block.id,
self.peer_best_block.height)
return False

def get_tips(self) -> Deferred[list[bytes]]:
""" Async method to request the remote peer's tips.
Expand Down Expand Up @@ -525,27 +544,6 @@ def find_best_common_block(self,
"""
self.log.debug('find_best_common_block', peer_best_block=peer_best_block, my_best_block=my_best_block)

if peer_best_block.height <= my_best_block.height:
assert self.tx_storage.indexes is not None
common_block_hash = self.tx_storage.indexes.height.get(peer_best_block.height)
if peer_best_block.id == common_block_hash:
# we have all the peer's blocks
if peer_best_block.height == my_best_block.height:
# We are in sync, ask for relay so the remote sends transactions in real time
self.update_synced(True)
self.send_relay()
else:
self.update_synced(False)
self.log.debug('synced to the latest peer block', peer_best_block=peer_best_block)
return _HeightInfo(height=peer_best_block.height, id=common_block_hash)
else:
# peer is on a different best chain
self.log.warn('peer on different chain',
peer_best_block=peer_best_block,
my_best_block=my_best_block)

self.update_synced(False)

# Run an n-ary search in the interval [lo, hi).
# `lo` is always a height where we are synced.
# `hi` is always a height where sync state is unknown.
Expand Down

0 comments on commit 98121c4

Please sign in to comment.