Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sync-v2): Fix n-ary search to handle reorgs during its execution #842

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 40 additions & 23 deletions hathor/p2p/sync_v2/agent.py
glevco marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -364,13 +364,17 @@ def run_sync_blocks(self) -> Generator[Any, Any, None]:
# Get my best block.
my_best_block = self.get_my_best_block()

# Find peer's best block
# Get peer's best block
self.peer_best_block = yield self.get_peer_best_block()
assert self.peer_best_block is not None

# find best common block
# Find best common block
self.synced_block = yield self.find_best_common_block(my_best_block, self.peer_best_block)
assert self.synced_block is not None
if self.synced_block is None:
# Find best common block failed. Try again soon.
# This might happen if a reorg occurs during the search.
return

self.log.debug('run_sync_blocks',
my_best_block=my_best_block,
peer_best_block=self.peer_best_block,
Expand Down Expand Up @@ -516,7 +520,7 @@ def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
@inlineCallbacks
def find_best_common_block(self,
my_best_block: _HeightInfo,
peer_best_block: _HeightInfo) -> Generator[Any, Any, _HeightInfo]:
peer_best_block: _HeightInfo) -> Generator[Any, Any, Optional[_HeightInfo]]:
""" Search for the highest block/height where we're synced.
"""
self.log.debug('find_best_common_block', peer_best_block=peer_best_block, my_best_block=my_best_block)
Expand Down Expand Up @@ -545,36 +549,49 @@ def find_best_common_block(self,
# Run an n-ary search in the interval [lo, hi).
# `lo` is always a height where we are synced.
# `hi` is always a height where sync state is unknown.
hi = min(peer_best_block.height, my_best_block.height)
lo = 0

lo_block_hash = self._settings.GENESIS_BLOCK_HASH
hi = min(peer_best_block, my_best_block, key=lambda x: x.height)
lo = _HeightInfo(height=0, id=self._settings.GENESIS_BLOCK_HASH)

while hi - lo > 1:
while hi.height - lo.height > 1:
self.log.info('find_best_common_block n-ary search query', lo=lo, hi=hi)
step = math.ceil((hi - lo) / 10)
heights = list(range(lo, hi, step))
heights.append(hi)

block_height_list = yield self.get_peer_block_hashes(heights)
block_height_list.sort(key=lambda x: x.height, reverse=True)

for height, block_hash in block_height_list:
step = math.ceil((hi.height - lo.height) / 10)
heights = list(range(lo.height, hi.height, step))
heights.append(hi.height)

block_info_list = yield self.get_peer_block_hashes(heights)
block_info_list.sort(key=lambda x: x.height, reverse=True)

# As we are supposed to be always synced at `lo`, we expect to receive a response
# with at least one item equals to lo. If it does not happen, we stop the search
# and return None. This might be caused when a reorg occurs during the search.
if not block_info_list:
self.log.info('n-ary search failed because it got a response with no lo_block_info',
lo=lo,
hi=hi)
return None
lo_block_info = block_info_list[-1]
if lo_block_info != lo:
self.log.info('n-ary search failed because lo != lo_block_info',
lo=lo,
hi=hi,
lo_block_info=lo_block_info)
return None

for info in block_info_list:
try:
# We must check only fully validated transactions.
blk = self.tx_storage.get_transaction(block_hash)
blk = self.tx_storage.get_transaction(info.id)
except TransactionDoesNotExist:
hi = height
hi = info
else:
assert blk.get_metadata().validation.is_fully_connected()
assert isinstance(blk, Block)
assert height == blk.get_height()
lo = height
lo_block_hash = block_hash
assert info.height == blk.get_height()
lo = info
break

self.log.debug('find_best_common_block n-ary search finished', lo=lo, hi=hi)
return _HeightInfo(height=lo, id=lo_block_hash)
return lo

def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[_HeightInfo]]:
""" Returns the peer's block hashes in the given heights.
Expand Down
81 changes: 81 additions & 0 deletions tests/p2p/test_sync_v2.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import pytest
from twisted.internet.defer import inlineCallbacks, succeed
from twisted.python.failure import Failure

from hathor.conf import HathorSettings
from hathor.p2p.peer_id import PeerId
from hathor.p2p.sync_v2.agent import _HeightInfo
from hathor.simulator import FakeConnection
from hathor.simulator.trigger import StopAfterNMinedBlocks, StopAfterNTransactions, StopWhenTrue, Trigger
from hathor.transaction.storage.traversal import DFSWalk
Expand Down Expand Up @@ -243,3 +245,82 @@ def test_exceeds_streaming_and_mempool_limits(self) -> None:

self.assertEqual(manager1.tx_storage.get_vertices_count(), manager2.tx_storage.get_vertices_count())
self.assertConsensusEqualSyncV2(manager1, manager2)

def _prepare_sync_v2_find_best_common_block_reorg(self):
manager1 = self.create_peer(enable_sync_v1=False, enable_sync_v2=True)
manager1.allow_mining_without_peers()
miner1 = self.simulator.create_miner(manager1, hashpower=10e6)
miner1.start()
self.assertTrue(self.simulator.run(24 * 3600))
miner1.stop()

manager2 = self.create_peer(enable_sync_v1=False, enable_sync_v2=True)
conn12 = FakeConnection(manager1, manager2, latency=0.05)
self.simulator.add_connection(conn12)

self.assertTrue(self.simulator.run(3600))
return conn12

@inlineCallbacks
def test_sync_v2_find_best_common_block_reorg_1(self):
conn12 = self._prepare_sync_v2_find_best_common_block_reorg()
sync_agent = conn12._proto1.state.sync_agent
rng = conn12.manager2.rng

my_best_block = sync_agent.get_my_best_block()
peer_best_block = sync_agent.peer_best_block

fake_peer_best_block = _HeightInfo(my_best_block.height + 3, rng.randbytes(32))
reorg_height = peer_best_block.height - 50

def fake_get_peer_block_hashes(heights):
# return empty as soon as the search lowest height is not the genesis
if heights[0] != 0:
return []

# simulate a reorg
response = []
for h in heights:
if h < reorg_height:
vertex_id = conn12.manager2.tx_storage.indexes.height.get(h)
else:
vertex_id = rng.randbytes(32)
response.append(_HeightInfo(height=h, id=vertex_id))
return succeed(response)

sync_agent.get_peer_block_hashes = fake_get_peer_block_hashes
common_block_info = yield sync_agent.find_best_common_block(my_best_block, fake_peer_best_block)
self.assertIsNone(common_block_info)

@inlineCallbacks
def test_sync_v2_find_best_common_block_reorg_2(self):
conn12 = self._prepare_sync_v2_find_best_common_block_reorg()
sync_agent = conn12._proto1.state.sync_agent
rng = conn12.manager2.rng

my_best_block = sync_agent.get_my_best_block()
peer_best_block = sync_agent.peer_best_block

fake_peer_best_block = _HeightInfo(my_best_block.height + 3, rng.randbytes(32))
reorg_height = peer_best_block.height - 50

def fake_get_peer_block_hashes(heights):
if heights[0] != 0:
return succeed([
_HeightInfo(height=h, id=rng.randbytes(32))
for h in heights
])

# simulate a reorg
response = []
for h in heights:
if h < reorg_height:
vertex_id = conn12.manager2.tx_storage.indexes.height.get(h)
else:
vertex_id = rng.randbytes(32)
response.append(_HeightInfo(height=h, id=vertex_id))
return succeed(response)

sync_agent.get_peer_block_hashes = fake_get_peer_block_hashes
common_block_info = yield sync_agent.find_best_common_block(my_best_block, fake_peer_best_block)
self.assertIsNone(common_block_info)
Loading