From c47f06ed7e49d4a289e7dfc7af72f3caae2a70d0 Mon Sep 17 00:00:00 2001 From: Marcelo Salhab Brogliato Date: Fri, 27 Oct 2023 14:09:59 -0500 Subject: [PATCH] fix(sync-v2): Fix n-ary search to handle reorgs during its execution --- hathor/p2p/sync_v2/agent.py | 63 ++++++++++++++++++----------- tests/p2p/test_sync_v2.py | 81 +++++++++++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 23 deletions(-) diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py index 732c069c0..937ea89af 100644 --- a/hathor/p2p/sync_v2/agent.py +++ b/hathor/p2p/sync_v2/agent.py @@ -364,13 +364,17 @@ def run_sync_blocks(self) -> Generator[Any, Any, None]: # Get my best block. my_best_block = self.get_my_best_block() - # Find peer's best block + # Get peer's best block self.peer_best_block = yield self.get_peer_best_block() assert self.peer_best_block is not None - # find best common block + # Find best common block self.synced_block = yield self.find_best_common_block(my_best_block, self.peer_best_block) - assert self.synced_block is not None + if self.synced_block is None: + # Find best common block failed. Try again soon. + # This might happen if a reorg occurs during the search. + return + self.log.debug('run_sync_blocks', my_best_block=my_best_block, peer_best_block=self.peer_best_block, @@ -516,7 +520,7 @@ def partial_vertex_exists(self, vertex_id: VertexId) -> bool: @inlineCallbacks def find_best_common_block(self, my_best_block: _HeightInfo, - peer_best_block: _HeightInfo) -> Generator[Any, Any, _HeightInfo]: + peer_best_block: _HeightInfo) -> Generator[Any, Any, Optional[_HeightInfo]]: """ Search for the highest block/height where we're synced. """ self.log.debug('find_best_common_block', peer_best_block=peer_best_block, my_best_block=my_best_block) @@ -545,36 +549,49 @@ def find_best_common_block(self, # Run an n-ary search in the interval [lo, hi). # `lo` is always a height where we are synced. # `hi` is always a height where sync state is unknown. - hi = min(peer_best_block.height, my_best_block.height) - lo = 0 - - lo_block_hash = self._settings.GENESIS_BLOCK_HASH + hi = min(peer_best_block, my_best_block, key=lambda x: x.height) + lo = _HeightInfo(height=0, id=self._settings.GENESIS_BLOCK_HASH) - while hi - lo > 1: + while hi.height - lo.height > 1: self.log.info('find_best_common_block n-ary search query', lo=lo, hi=hi) - step = math.ceil((hi - lo) / 10) - heights = list(range(lo, hi, step)) - heights.append(hi) - - block_height_list = yield self.get_peer_block_hashes(heights) - block_height_list.sort(key=lambda x: x.height, reverse=True) - - for height, block_hash in block_height_list: + step = math.ceil((hi.height - lo.height) / 10) + heights = list(range(lo.height, hi.height, step)) + heights.append(hi.height) + + block_info_list = yield self.get_peer_block_hashes(heights) + block_info_list.sort(key=lambda x: x.height, reverse=True) + + # As we are supposed to be always synced at `lo`, we expect to receive a response + # with at least one item equals to lo. If it does not happen, we stop the search + # and return None. This might be caused when a reorg occurs during the search. + if not block_info_list: + self.log.info('n-ary search failed because it got a response with no lo_block_info', + lo=lo, + hi=hi) + return None + lo_block_info = block_info_list[-1] + if lo_block_info != lo: + self.log.info('n-ary search failed because lo != lo_block_info', + lo=lo, + hi=hi, + lo_block_info=lo_block_info) + return None + + for info in block_info_list: try: # We must check only fully validated transactions. - blk = self.tx_storage.get_transaction(block_hash) + blk = self.tx_storage.get_transaction(info.id) except TransactionDoesNotExist: - hi = height + hi = info else: assert blk.get_metadata().validation.is_fully_connected() assert isinstance(blk, Block) - assert height == blk.get_height() - lo = height - lo_block_hash = block_hash + assert info.height == blk.get_height() + lo = info break self.log.debug('find_best_common_block n-ary search finished', lo=lo, hi=hi) - return _HeightInfo(height=lo, id=lo_block_hash) + return lo def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[_HeightInfo]]: """ Returns the peer's block hashes in the given heights. diff --git a/tests/p2p/test_sync_v2.py b/tests/p2p/test_sync_v2.py index f25086215..ed4412f25 100644 --- a/tests/p2p/test_sync_v2.py +++ b/tests/p2p/test_sync_v2.py @@ -1,8 +1,10 @@ import pytest +from twisted.internet.defer import inlineCallbacks, succeed from twisted.python.failure import Failure from hathor.conf import HathorSettings from hathor.p2p.peer_id import PeerId +from hathor.p2p.sync_v2.agent import _HeightInfo from hathor.simulator import FakeConnection from hathor.simulator.trigger import StopAfterNMinedBlocks, StopAfterNTransactions, StopWhenTrue, Trigger from hathor.transaction.storage.traversal import DFSWalk @@ -243,3 +245,82 @@ def test_exceeds_streaming_and_mempool_limits(self) -> None: self.assertEqual(manager1.tx_storage.get_vertices_count(), manager2.tx_storage.get_vertices_count()) self.assertConsensusEqualSyncV2(manager1, manager2) + + def _prepare_sync_v2_find_best_common_block_reorg(self): + manager1 = self.create_peer(enable_sync_v1=False, enable_sync_v2=True) + manager1.allow_mining_without_peers() + miner1 = self.simulator.create_miner(manager1, hashpower=10e6) + miner1.start() + self.assertTrue(self.simulator.run(24 * 3600)) + miner1.stop() + + manager2 = self.create_peer(enable_sync_v1=False, enable_sync_v2=True) + conn12 = FakeConnection(manager1, manager2, latency=0.05) + self.simulator.add_connection(conn12) + + self.assertTrue(self.simulator.run(3600)) + return conn12 + + @inlineCallbacks + def test_sync_v2_find_best_common_block_reorg_1(self): + conn12 = self._prepare_sync_v2_find_best_common_block_reorg() + sync_agent = conn12._proto1.state.sync_agent + rng = conn12.manager2.rng + + my_best_block = sync_agent.get_my_best_block() + peer_best_block = sync_agent.peer_best_block + + fake_peer_best_block = _HeightInfo(my_best_block.height + 3, rng.randbytes(32)) + reorg_height = peer_best_block.height - 50 + + def fake_get_peer_block_hashes(heights): + # return empty as soon as the search lowest height is not the genesis + if heights[0] != 0: + return [] + + # simulate a reorg + response = [] + for h in heights: + if h < reorg_height: + vertex_id = conn12.manager2.tx_storage.indexes.height.get(h) + else: + vertex_id = rng.randbytes(32) + response.append(_HeightInfo(height=h, id=vertex_id)) + return succeed(response) + + sync_agent.get_peer_block_hashes = fake_get_peer_block_hashes + common_block_info = yield sync_agent.find_best_common_block(my_best_block, fake_peer_best_block) + self.assertIsNone(common_block_info) + + @inlineCallbacks + def test_sync_v2_find_best_common_block_reorg_2(self): + conn12 = self._prepare_sync_v2_find_best_common_block_reorg() + sync_agent = conn12._proto1.state.sync_agent + rng = conn12.manager2.rng + + my_best_block = sync_agent.get_my_best_block() + peer_best_block = sync_agent.peer_best_block + + fake_peer_best_block = _HeightInfo(my_best_block.height + 3, rng.randbytes(32)) + reorg_height = peer_best_block.height - 50 + + def fake_get_peer_block_hashes(heights): + if heights[0] != 0: + return succeed([ + _HeightInfo(height=h, id=rng.randbytes(32)) + for h in heights + ]) + + # simulate a reorg + response = [] + for h in heights: + if h < reorg_height: + vertex_id = conn12.manager2.tx_storage.indexes.height.get(h) + else: + vertex_id = rng.randbytes(32) + response.append(_HeightInfo(height=h, id=vertex_id)) + return succeed(response) + + sync_agent.get_peer_block_hashes = fake_get_peer_block_hashes + common_block_info = yield sync_agent.find_best_common_block(my_best_block, fake_peer_best_block) + self.assertIsNone(common_block_info)