Skip to content

Commit

Permalink
refactor(sync-v2): Refactor sync_v2 agent to hold (block_height, bloc…
Browse files Browse the repository at this point in the history
…k_id) information in an internal namedtuple
  • Loading branch information
msbrogli committed Oct 27, 2023
1 parent 20474fd commit 89d347e
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 101 deletions.
168 changes: 95 additions & 73 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import struct
from collections import OrderedDict
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, Generator, Optional, cast
from typing import TYPE_CHECKING, Any, Callable, Generator, NamedTuple, Optional, cast

from structlog import get_logger
from twisted.internet.defer import Deferred, inlineCallbacks
Expand All @@ -44,6 +44,20 @@
MAX_GET_TRANSACTIONS_BFS_LEN: int = 8


class _HeightInfo(NamedTuple):
height: int
id: VertexId

def __repr__(self):
return f'_HeightInfo({self.height}, {self.id.hex()})'

def to_json(self) -> dict[str, Any]:
return {
'height': self.height,
'id': self.id.hex(),
}


class PeerState(Enum):
ERROR = 'error'
UNKNOWN = 'unknown'
Expand Down Expand Up @@ -92,16 +106,16 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
self.receiving_stream = False

# highest block where we are synced
self.synced_height = 0
self.synced_block: Optional[_HeightInfo] = None

# highest block peer has
self.peer_height = 0
self.peer_best_block: Optional[_HeightInfo] = None

# Latest deferred waiting for a reply.
self._deferred_txs: dict[VertexId, Deferred[BaseTransaction]] = {}
self._deferred_tips: Optional[Deferred[list[bytes]]] = None
self._deferred_best_block: Optional[Deferred[dict[str, Any]]] = None
self._deferred_peer_block_hashes: Optional[Deferred[list[tuple[int, bytes]]]] = None
self._deferred_best_block: Optional[Deferred[_HeightInfo]] = None
self._deferred_peer_block_hashes: Optional[Deferred[list[_HeightInfo]]] = None

# When syncing blocks we start streaming with all peers
# so the moment I get some repeated blocks, I stop the download
Expand Down Expand Up @@ -151,8 +165,8 @@ def get_status(self) -> dict[str, Any]:
"""
res = {
'is_enabled': self.is_sync_enabled(),
'peer_height': self.peer_height,
'synced_height': self.synced_height,
'peer_best_block': self.peer_best_block.to_json() if self.peer_best_block else None,
'synced_block': self.synced_block.to_json() if self.synced_block else None,
'synced': self._synced,
'state': self.state.value,
}
Expand Down Expand Up @@ -332,37 +346,43 @@ def run_sync_transactions(self) -> None:
end_block_height=block_height)
self.send_get_transactions_bfs(needed_txs, block.hash)

def get_my_best_block(self) -> _HeightInfo:
"""Return my best block info."""
bestblock = self.tx_storage.get_best_block()
assert bestblock.hash is not None
meta = bestblock.get_metadata()
assert meta.validation.is_fully_connected()
return _HeightInfo(height=bestblock.get_height(), id=bestblock.hash)

@inlineCallbacks
def run_sync_blocks(self) -> Generator[Any, Any, None]:
""" Async step of the block syncing phase.
"""
assert self.tx_storage.indexes is not None
self.state = PeerState.SYNCING_BLOCKS

# Find my height
bestblock = self.tx_storage.get_best_block()
assert bestblock.hash is not None
meta = bestblock.get_metadata()
my_height = meta.height

self.log.debug('run sync blocks', my_height=my_height)
# Get my best block.
my_best_block = self.get_my_best_block()

# Find best block
data = yield self.get_peer_best_block()
peer_best_block = data['block']
peer_best_height = data['height']
self.peer_height = peer_best_height
# Find peer's best block
self.peer_best_block = yield self.get_peer_best_block()
assert self.peer_best_block is not None

# find best common block
yield self.find_best_common_block(peer_best_height, peer_best_block)
self.log.debug('run_sync_blocks', peer_height=self.peer_height, synced_height=self.synced_height)

if self.synced_height < self.peer_height:
self.synced_block = yield self.find_best_common_block(my_best_block, self.peer_best_block)
assert self.synced_block is not None
self.log.debug('run_sync_blocks',
my_best_block=my_best_block,
peer_best_block=self.peer_best_block,
synced_block=self.synced_block)

if self.synced_block.height < self.peer_best_block.height:
# sync from common block
peer_block_at_height = yield self.get_peer_block_hashes([self.synced_height])
if peer_block_at_height:
self.run_block_sync(peer_block_at_height[0][1], self.synced_height, peer_best_block, peer_best_height)
elif my_height == self.synced_height == self.peer_height:
self.run_block_sync(self.synced_block.id,
self.synced_block.height,
self.peer_best_block.id,
self.peer_best_block.height)
elif my_best_block.height == self.synced_block.height == self.peer_best_block.height:
# we're synced and on the same height, get their mempool
self.state = PeerState.SYNCING_MEMPOOL
self.mempool_manager.run()
Expand Down Expand Up @@ -494,68 +514,69 @@ def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
return self.tx_storage.transaction_exists(vertex_id)

@inlineCallbacks
def find_best_common_block(self, peer_best_height: int, peer_best_block: bytes) -> Generator[Any, Any, None]:
def find_best_common_block(self,
my_best_block: _HeightInfo,
peer_best_block: _HeightInfo) -> Generator[Any, Any, _HeightInfo]:
""" Search for the highest block/height where we're synced.
"""
assert self.tx_storage.indexes is not None
my_best_height = self.tx_storage.get_height_best_block()

self.log.debug('find common chain', peer_height=peer_best_height, my_height=my_best_height)
self.log.debug('find_best_common_block', peer_best_block=peer_best_block, my_best_block=my_best_block)

if peer_best_height <= my_best_height:
my_block = self.tx_storage.indexes.height.get(peer_best_height)
if my_block == peer_best_block:
if peer_best_block.height <= my_best_block.height:
assert self.tx_storage.indexes is not None
common_block_hash = self.tx_storage.indexes.height.get(peer_best_block.height)
if peer_best_block.id == common_block_hash:
# we have all the peer's blocks
if peer_best_height == my_best_height:
if peer_best_block.height == my_best_block.height:
# We are in sync, ask for relay so the remote sends transactions in real time
self.update_synced(True)
self.send_relay()
else:
self.update_synced(False)

self.log.debug('synced to the latest peer block', height=peer_best_height)
self.synced_height = peer_best_height
return
self.log.debug('synced to the latest peer block', peer_best_block=peer_best_block)
return _HeightInfo(height=peer_best_block.height, id=common_block_hash)
else:
# TODO peer is on a different best chain
self.log.warn('peer on different chain', peer_height=peer_best_height,
peer_block=peer_best_block.hex(), my_block=(my_block.hex() if my_block is not None else
None))
# peer is on a different best chain
self.log.warn('peer on different chain',
peer_best_block=peer_best_block,
my_best_block=my_best_block)

self.update_synced(False)
not_synced = min(peer_best_height, my_best_height)
synced = self.synced_height

while not_synced - synced > 1:
self.log.debug('find_best_common_block synced not_synced', synced=synced, not_synced=not_synced)
step = math.ceil((not_synced - synced)/10)
heights = []
height = synced
while height < not_synced:
heights.append(height)
height += step
heights.append(not_synced)

# Run an n-ary search in the interval [lo, hi).
# `lo` is always a height where we are synced.
# `hi` is always a height where sync state is unknown.
hi = min(peer_best_block.height, my_best_block.height)
lo = 0

lo_block_hash = self._settings.GENESIS_BLOCK_HASH

while hi - lo > 1:
self.log.info('find_best_common_block n-ary search query', lo=lo, hi=hi)
step = math.ceil((hi - lo) / 10)
heights = list(range(lo, hi, step))
heights.append(hi)

block_height_list = yield self.get_peer_block_hashes(heights)
block_height_list.reverse()
block_height_list.sort(key=lambda x: x.height, reverse=True)

for height, block_hash in block_height_list:
try:
# We must check only fully validated transactions.
blk = self.tx_storage.get_transaction(block_hash)
except TransactionDoesNotExist:
hi = height
else:
assert blk.get_metadata().validation.is_fully_connected()
assert isinstance(blk, Block)
if height != blk.get_height():
# WTF?! It should never happen.
self.state = PeerState.ERROR
return
synced = height
assert height == blk.get_height()
lo = height
lo_block_hash = block_hash
break
except TransactionDoesNotExist:
not_synced = height

self.log.debug('find_best_common_block finished synced not_synced', synced=synced, not_synced=not_synced)
self.synced_height = synced
self.log.debug('find_best_common_block n-ary search finished', lo=lo, hi=hi)
return _HeightInfo(height=lo, id=lo_block_hash)

def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[tuple[int, bytes]]]:
def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[_HeightInfo]]:
""" Returns the peer's block hashes in the given heights.
"""
if self._deferred_peer_block_hashes is not None:
Expand Down Expand Up @@ -597,7 +618,7 @@ def handle_peer_block_hashes(self, payload: str) -> None:
""" Handle a PEER-BLOCK-HASHES message.
"""
data = json.loads(payload)
data = [(h, bytes.fromhex(block_hash)) for (h, block_hash) in data]
data = [_HeightInfo(height=h, id=bytes.fromhex(block_hash)) for (h, block_hash) in data]
deferred = self._deferred_peer_block_hashes
self._deferred_peer_block_hashes = None
if deferred:
Expand Down Expand Up @@ -799,7 +820,7 @@ def handle_stop_block_streaming(self, payload: str) -> None:
self.blockchain_streaming.stop()
self.blockchain_streaming = None

def get_peer_best_block(self) -> Deferred[dict[str, Any]]:
def get_peer_best_block(self) -> Deferred[_HeightInfo]:
""" Async call to get the remote peer's best block.
"""
if self._deferred_best_block is not None:
Expand All @@ -819,21 +840,22 @@ def handle_get_best_block(self, payload: str) -> None:
"""
best_block = self.tx_storage.get_best_block()
meta = best_block.get_metadata()
assert meta.validation.is_fully_connected()
data = {'block': best_block.hash_hex, 'height': meta.height}
self.send_message(ProtocolMessages.BEST_BLOCK, json.dumps(data))

def handle_best_block(self, payload: str) -> None:
""" Handle a BEST-BLOCK message.
"""
data = json.loads(payload)
assert self.protocol.connections is not None
self.log.debug('got best block', **data)
data['block'] = bytes.fromhex(data['block'])
_id = bytes.fromhex(data['block'])
height = data['height']
best_block = _HeightInfo(height=height, id=_id)

deferred = self._deferred_best_block
self._deferred_best_block = None
if deferred:
deferred.callback(data)
deferred.callback(best_block)

def _setup_tx_streaming(self):
""" Common setup before starting an outgoing transaction stream.
Expand Down
2 changes: 2 additions & 0 deletions tests/p2p/test_get_best_blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

class BaseGetBestBlockchainTestCase(SimulatorTestCase):

seed_config = 6

def _send_cmd(self, proto, cmd, payload=None):
if not payload:
line = '{}\r\n'.format(cmd)
Expand Down
42 changes: 21 additions & 21 deletions tests/p2p/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,9 +503,9 @@ def test_sync_metadata(self):
# check they have the same consensus
node_sync1 = conn.proto1.state.sync_agent
node_sync2 = conn.proto2.state.sync_agent
self.assertEqual(node_sync1.peer_height, height)
self.assertEqual(node_sync1.synced_height, height)
self.assertEqual(node_sync2.peer_height, height)
self.assertEqual(node_sync1.peer_best_block.height, height)
self.assertEqual(node_sync1.synced_block.height, height)
self.assertEqual(node_sync2.peer_best_block.height, height)
# 3 genesis + blocks + 8 txs
self.assertEqual(self.manager1.tx_storage.get_vertices_count(), height + 11)
self.assertEqual(manager2.tx_storage.get_vertices_count(), height + 11)
Expand All @@ -527,14 +527,14 @@ def test_tx_propagation_nat_peers(self):

node_sync1 = self.conn1.proto1.state.sync_agent
self.assertEqual(self.manager1.tx_storage.latest_timestamp, self.manager2.tx_storage.latest_timestamp)
self.assertEqual(node_sync1.peer_height, node_sync1.synced_height)
self.assertEqual(node_sync1.peer_height, self.manager1.tx_storage.get_height_best_block())
self.assertEqual(node_sync1.peer_best_block, node_sync1.synced_block)
self.assertEqual(node_sync1.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
self.assertConsensusEqual(self.manager1, self.manager2)

node_sync2 = self.conn2.proto1.state.sync_agent
self.assertEqual(self.manager2.tx_storage.latest_timestamp, self.manager3.tx_storage.latest_timestamp)
self.assertEqual(node_sync2.peer_height, node_sync2.synced_height)
self.assertEqual(node_sync2.peer_height, self.manager2.tx_storage.get_height_best_block())
self.assertEqual(node_sync2.peer_best_block, node_sync2.synced_block)
self.assertEqual(node_sync2.peer_best_block.height, self.manager2.tx_storage.get_height_best_block())
self.assertConsensusEqual(self.manager2, self.manager3)

def test_block_sync_new_blocks_and_txs(self):
Expand All @@ -560,8 +560,8 @@ def test_block_sync_new_blocks_and_txs(self):

node_sync = conn.proto1.state.sync_agent
self.assertEqual(self.manager1.tx_storage.latest_timestamp, manager2.tx_storage.latest_timestamp)
self.assertEqual(node_sync.peer_height, node_sync.synced_height)
self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block())
self.assertEqual(node_sync.peer_best_block, node_sync.synced_block)
self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
self.assertConsensusEqual(self.manager1, manager2)
self.assertConsensusValid(self.manager1)
self.assertConsensusValid(manager2)
Expand All @@ -581,8 +581,8 @@ def test_block_sync_many_new_blocks(self):
self.clock.advance(1)

node_sync = conn.proto1.state.sync_agent
self.assertEqual(node_sync.peer_height, node_sync.synced_height)
self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block())
self.assertEqual(node_sync.peer_best_block, node_sync.synced_block)
self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
self.assertConsensusEqual(self.manager1, manager2)
self.assertConsensusValid(self.manager1)
self.assertConsensusValid(manager2)
Expand All @@ -602,8 +602,8 @@ def test_block_sync_new_blocks(self):
self.clock.advance(1)

node_sync = conn.proto1.state.sync_agent
self.assertEqual(node_sync.peer_height, node_sync.synced_height)
self.assertEqual(node_sync.peer_height, self.manager1.tx_storage.get_height_best_block())
self.assertEqual(node_sync.peer_best_block, node_sync.synced_block)
self.assertEqual(node_sync.peer_best_block.height, self.manager1.tx_storage.get_height_best_block())
self.assertConsensusEqual(self.manager1, manager2)
self.assertConsensusValid(self.manager1)
self.assertConsensusValid(manager2)
Expand Down Expand Up @@ -664,9 +664,9 @@ def test_full_sync(self):

node_sync1 = conn.proto1.state.sync_agent
node_sync2 = conn.proto2.state.sync_agent
self.assertEqual(node_sync1.peer_height, common_height)
self.assertEqual(node_sync1.synced_height, common_height)
self.assertEqual(node_sync2.peer_height, common_height)
self.assertEqual(node_sync1.peer_best_block.height, common_height)
self.assertEqual(node_sync1.synced_block.height, common_height)
self.assertEqual(node_sync2.peer_best_block.height, common_height)
self.assertConsensusValid(self.manager1)
self.assertConsensusValid(manager2)
self.assertConsensusEqual(self.manager1, manager2)
Expand Down Expand Up @@ -715,9 +715,9 @@ def test_block_sync_checkpoints(self):
node_sync1 = conn.proto1.state.sync_agent
node_sync2 = conn.proto2.state.sync_agent

self.assertEqual(node_sync1.peer_height, TOTAL_BLOCKS)
self.assertEqual(node_sync1.synced_height, TOTAL_BLOCKS)
self.assertEqual(node_sync2.peer_height, len(blocks))
self.assertEqual(node_sync1.peer_best_block.height, TOTAL_BLOCKS)
self.assertEqual(node_sync1.synced_block.height, TOTAL_BLOCKS)
self.assertEqual(node_sync2.peer_best_block.height, len(blocks))
self.assertConsensusValid(self.manager1)
self.assertConsensusValid(manager2)

Expand All @@ -738,8 +738,8 @@ def test_block_sync_only_genesis(self):
self.clock.advance(1)

node_sync = conn.proto1.state.sync_agent
self.assertEqual(node_sync.synced_height, 0)
self.assertEqual(node_sync.peer_height, 0)
self.assertEqual(node_sync.synced_block.height, 0)
self.assertEqual(node_sync.peer_best_block.height, 0)

self.assertEqual(self.manager1.tx_storage.get_vertices_count(), 3)
self.assertEqual(manager2.tx_storage.get_vertices_count(), 3)
Expand Down
Loading

0 comments on commit 89d347e

Please sign in to comment.