diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py index 0a4f362e7..409a5a3a1 100644 --- a/hathor/p2p/sync_v2/agent.py +++ b/hathor/p2p/sync_v2/agent.py @@ -27,17 +27,26 @@ from hathor.conf.get_settings import get_settings from hathor.p2p.messages import ProtocolMessages from hathor.p2p.sync_agent import SyncAgent +from hathor.p2p.sync_v2.blockchain_streaming_client import BlockchainStreamingClient, StreamingError from hathor.p2p.sync_v2.mempool import SyncMempoolManager -from hathor.p2p.sync_v2.streamers import DEFAULT_STREAMING_LIMIT, BlockchainStreaming, StreamEnd, TransactionsStreaming +from hathor.p2p.sync_v2.payloads import BestBlockPayload, GetNextBlocksPayload, GetTransactionsBFSPayload +from hathor.p2p.sync_v2.streamers import ( + DEFAULT_STREAMING_LIMIT, + BlockchainStreamingServer, + StreamEnd, + TransactionsStreamingServer, +) +from hathor.p2p.sync_v2.transaction_streaming_client import TransactionStreamingClient from hathor.transaction import BaseTransaction, Block, Transaction from hathor.transaction.base_transaction import tx_or_block_from_bytes from hathor.transaction.exceptions import HathorError from hathor.transaction.storage.exceptions import TransactionDoesNotExist from hathor.types import VertexId -from hathor.util import Reactor, collect_n +from hathor.util import Reactor, collect_n, not_none if TYPE_CHECKING: from hathor.p2p.protocol import HathorProtocol + from hathor.transaction.storage import TransactionStorage logger = get_logger() @@ -82,7 +91,7 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None self._settings = get_settings() self.protocol = protocol self.manager = protocol.node - self.tx_storage = protocol.node.tx_storage + self.tx_storage: 'TransactionStorage' = protocol.node.tx_storage self.state = PeerState.UNKNOWN self.DEFAULT_STREAMING_LIMIT = DEFAULT_STREAMING_LIMIT @@ -97,11 +106,6 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None # Create logger with context self.log = logger.new(peer=self.protocol.get_short_peer_id()) - # Extra - self._blk_size = 0 - self._blk_end_hash = self._settings.GENESIS_BLOCK_HASH - self._blk_max_quantity = 0 - # indicates whether we're receiving a stream from the peer self.receiving_stream = False @@ -117,18 +121,13 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None self._deferred_best_block: Optional[Deferred[_HeightInfo]] = None self._deferred_peer_block_hashes: Optional[Deferred[list[_HeightInfo]]] = None - # Deferreds used when we are receiving a streaming of vertices. - self._deferred_blockchain_streaming: Optional[Deferred[None]] = None - self._deferred_transactions_streaming: Optional[Deferred[None]] = None - - # When syncing blocks we start streaming with all peers - # so the moment I get some repeated blocks, I stop the download - # because it's probably a streaming that I've just received - self.max_repeated_blocks = 10 + # Clients to handle streaming messages. + self._blk_streaming_client: Optional[BlockchainStreamingClient] = None + self._tx_streaming_client: Optional[TransactionStreamingClient] = None - # Streaming objects - self.blockchain_streaming: Optional[BlockchainStreaming] = None - self.transactions_streaming: Optional[TransactionsStreaming] = None + # Streaming server objects + self._blk_streaming_server: Optional[BlockchainStreamingServer] = None + self._tx_streaming_server: Optional[TransactionsStreamingServer] = None # Whether the peers are synced, i.e. we have the same best block. # Notice that this flag ignores the mempool. @@ -137,11 +136,6 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None # Indicate whether the sync manager has been started. self._started: bool = False - # Saves the last received block from the block streaming # this is useful to be used when running the sync of - # transactions in the case when I am downloading a side chain. Starts at the genesis, which is common to all - # peers on the network - self._last_received_block: Optional[Block] = None - # Saves if I am in the middle of a mempool sync # we don't execute any sync while in the middle of it self.mempool_manager = SyncMempoolManager(self) @@ -159,9 +153,6 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None # Whether we propagate transactions or not self._is_relaying = False - # This stores the final height that we expect the last "get blocks" stream to end on - self._blk_end_height: Optional[int] = None - # Whether to sync with this peer self._is_enabled: bool = False @@ -328,7 +319,7 @@ def run_sync_transactions(self) -> Generator[Any, Any, None]: MAX_GET_TRANSACTIONS_BFS_LEN) # Start with the last received block and find the best block full validated in its chain - block = self._last_received_block + block = self._blk_streaming_client._last_received_block if self._blk_streaming_client else None if block is None: block = cast(Block, self.tx_storage.get_genesis(self._settings.GENESIS_BLOCK_HASH)) else: @@ -341,7 +332,10 @@ def run_sync_transactions(self) -> Generator[Any, Any, None]: self.log.info('run sync transactions', start=[i.hex() for i in needed_txs], end_block_hash=block.hash.hex(), end_block_height=block_height) - yield self.start_transactions_streaming(needed_txs, block.hash) + try: + yield self.start_transactions_streaming(needed_txs, block.hash, block.hash) + except StreamingError: + self.receiving_stream = False def get_my_best_block(self) -> _HeightInfo: """Return my best block info.""" @@ -407,10 +401,12 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]: synced_block=self.synced_block) # Sync from common block - yield self.start_blockchain_streaming(self.synced_block.id, - self.synced_block.height, - self.peer_best_block.id, - self.peer_best_block.height) + try: + yield self.start_blockchain_streaming(self.synced_block, + self.peer_best_block) + except StreamingError: + self.send_stop_block_streaming() + self.receiving_stream = False return False @@ -495,36 +491,14 @@ def handle_relay(self, payload: str) -> None: self.protocol.send_error_and_close_connection('RELAY: invalid value') return - def _setup_block_streaming(self, start_hash: bytes, start_height: int, end_hash: bytes, end_height: int, - reverse: bool) -> None: - """ Common setup before starting an outgoing block stream. - """ - self._blk_start_hash = start_hash - self._blk_start_height = start_height - self._blk_end_hash = end_hash - self._blk_end_height = end_height - self._blk_received = 0 - self._blk_repeated = 0 - raw_quantity = end_height - start_height + 1 - self._blk_max_quantity = -raw_quantity if reverse else raw_quantity - self._blk_prev_hash: Optional[bytes] = None - self._blk_stream_reverse = reverse - self._last_received_block = None - def start_blockchain_streaming(self, - start_hash: bytes, - start_height: int, - end_hash: bytes, - end_height: int) -> Deferred[None]: + start_block: _HeightInfo, + end_block: _HeightInfo) -> Deferred[StreamEnd]: """Request peer to start streaming blocks to us.""" - assert self._deferred_blockchain_streaming is None - self._setup_block_streaming(start_hash, start_height, end_hash, end_height, False) - quantity = end_height - start_height - self.log.info('get next blocks', start_height=start_height, end_height=end_height, quantity=quantity, - start_hash=start_hash.hex(), end_hash=end_hash.hex()) - self.send_get_next_blocks(start_hash, end_hash, quantity) - self._deferred_blockchain_streaming = Deferred() - return self._deferred_blockchain_streaming + self._blk_streaming_client = BlockchainStreamingClient(self, start_block, end_block) + quantity = self._blk_streaming_client._blk_max_quantity + self.send_get_next_blocks(start_block.id, end_block.id, quantity) + return self._blk_streaming_client.wait() def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> None: """ Helper to send a message. @@ -644,12 +618,12 @@ def handle_peer_block_hashes(self, payload: str) -> None: def send_get_next_blocks(self, start_hash: bytes, end_hash: bytes, quantity: int) -> None: """ Send a PEER-BLOCK-HASHES message. """ - payload = json.dumps(dict( - start_hash=start_hash.hex(), - end_hash=end_hash.hex(), + payload = GetNextBlocksPayload( + start_hash=start_hash, + end_hash=end_hash, quantity=quantity, - )) - self.send_message(ProtocolMessages.GET_NEXT_BLOCKS, payload) + ) + self.send_message(ProtocolMessages.GET_NEXT_BLOCKS, payload.json()) self.receiving_stream = True def handle_get_next_blocks(self, payload: str) -> None: @@ -659,11 +633,11 @@ def handle_get_next_blocks(self, payload: str) -> None: if self._is_streaming: self.protocol.send_error_and_close_connection('GET-NEXT-BLOCKS received before previous one finished') return - data = json.loads(payload) + data = GetNextBlocksPayload.parse_raw(payload) self.send_next_blocks( - start_hash=bytes.fromhex(data['start_hash']), - end_hash=bytes.fromhex(data['end_hash']), - quantity=data['quantity'], + start_hash=data.start_hash, + end_hash=data.end_hash, + quantity=data.quantity, ) def send_next_blocks(self, start_hash: bytes, end_hash: bytes, quantity: int) -> None: @@ -689,11 +663,11 @@ def send_next_blocks(self, start_hash: bytes, end_hash: bytes, quantity: int) -> # (tracked by issue #711) # self.send_message(ProtocolMessages.NOT_FOUND, start_hash.hex()) # return - if self.blockchain_streaming is not None and self.blockchain_streaming.is_running: - self.blockchain_streaming.stop() + if self._blk_streaming_server is not None and self._blk_streaming_server.is_running: + self._blk_streaming_server.stop() limit = min(quantity, self.DEFAULT_STREAMING_LIMIT) - self.blockchain_streaming = BlockchainStreaming(self, blk, end_hash, limit=limit) - self.blockchain_streaming.start() + self._blk_streaming_server = BlockchainStreamingServer(self, blk, end_hash, limit=limit) + self._blk_streaming_server.start() def send_blocks(self, blk: Block) -> None: """ Send a BLOCKS message. @@ -719,7 +693,7 @@ def handle_blocks_end(self, payload: str) -> None: This is important to know that the other peer will not send any BLOCKS messages anymore as a response to a previous command. """ - self.log.debug('recv BLOCKS-END', payload=payload, size=self._blk_size) + self.log.debug('recv BLOCKS-END', payload=payload) response_code = StreamEnd(int(payload)) self.receiving_stream = False @@ -730,10 +704,8 @@ def handle_blocks_end(self, payload: str) -> None: self.protocol.send_error_and_close_connection('Not expecting to receive BLOCKS-END message') return - assert self._deferred_blockchain_streaming is not None - self._deferred_blockchain_streaming.callback(None) - self._deferred_blockchain_streaming = None - + assert self._blk_streaming_client is not None + self._blk_streaming_client.handle_blocks_end(response_code) self.log.debug('block streaming ended', reason=str(response_code)) def handle_blocks(self, payload: str) -> None: @@ -752,74 +724,10 @@ def handle_blocks(self, payload: str) -> None: # Not a block. Punish peer? return blk.storage = self.tx_storage - assert blk.hash is not None - self._blk_received += 1 - if self._blk_received > self._blk_max_quantity + 1: - self.log.warn('too many blocks received', - blk_received=self._blk_received, - blk_max_quantity=self._blk_max_quantity, - last_block=blk.hash_hex) - # Too many blocks. Punish peer? - self.state = PeerState.ERROR - return - - if self.partial_vertex_exists(blk.hash): - # We reached a block we already have. Skip it. - self._blk_prev_hash = blk.hash - self._blk_repeated += 1 - if self.receiving_stream and self._blk_repeated > self.max_repeated_blocks: - self.log.debug('repeated block received', total_repeated=self._blk_repeated) - self.handle_many_repeated_blocks() - - # basic linearity validation, crucial for correctly predicting the next block's height - if self._blk_stream_reverse: - if self._last_received_block and blk.hash != self._last_received_block.get_block_parent_hash(): - self.handle_invalid_block('received block is not parent of previous block') - return - else: - if self._last_received_block and blk.get_block_parent_hash() != self._last_received_block.hash: - self.handle_invalid_block('received block is not child of previous block') - return - - try: - # this methods takes care of checking if the block already exists, - # it will take care of doing at least a basic validation - # self.log.debug('add new block', block=blk.hash_hex) - if self.partial_vertex_exists(blk.hash): - # XXX: early terminate? - self.log.debug('block early terminate?', blk_id=blk.hash.hex()) - else: - self.log.debug('block received', blk_id=blk.hash.hex()) - self.on_new_tx(blk, propagate_to_peers=False, quiet=True) - except HathorError: - self.handle_invalid_block(exc_info=True) - return - else: - self._last_received_block = blk - self._blk_repeated = 0 - # XXX: debugging log, maybe add timing info - if self._blk_received % 500 == 0: - self.log.debug('block streaming in progress', blocks_received=self._blk_received) - - def handle_invalid_block(self, msg: Optional[str] = None, *, exc_info: bool = False) -> None: - """ Call this method when receiving an invalid block. - """ - kwargs: dict[str, Any] = {} - if msg is not None: - kwargs['error'] = msg - if exc_info: - kwargs['exc_info'] = True - self.log.warn('invalid new block', **kwargs) - # Invalid block?! - self.state = PeerState.ERROR - - def handle_many_repeated_blocks(self) -> None: - """ Call this when a stream sends too many blocks in sequence that we already have. - """ - self.send_stop_block_streaming() - self.receiving_stream = False + assert self._blk_streaming_client is not None + self._blk_streaming_client.handle_blocks(blk) def send_stop_block_streaming(self) -> None: """ Send a STOP-BLOCK-STREAMING message. @@ -833,13 +741,13 @@ def handle_stop_block_streaming(self, payload: str) -> None: This means the remote peer wants to stop the current block stream. """ - if not self.blockchain_streaming or not self._is_streaming: + if not self._blk_streaming_server or not self._is_streaming: self.log.debug('got stop streaming message with no streaming running') return self.log.debug('got stop streaming message') - self.blockchain_streaming.stop() - self.blockchain_streaming = None + self._blk_streaming_server.stop() + self._blk_streaming_server = None def get_peer_best_block(self) -> Deferred[_HeightInfo]: """ Async call to get the remote peer's best block. @@ -856,43 +764,42 @@ def send_get_best_block(self) -> None: """ self.send_message(ProtocolMessages.GET_BEST_BLOCK) - def handle_get_best_block(self, payload: str) -> None: + def handle_get_best_block(self, _payload: str) -> None: """ Handle a GET-BEST-BLOCK message. """ best_block = self.tx_storage.get_best_block() meta = best_block.get_metadata() assert meta.validation.is_fully_connected() - data = {'block': best_block.hash_hex, 'height': meta.height} - self.send_message(ProtocolMessages.BEST_BLOCK, json.dumps(data)) + payload = BestBlockPayload( + block=not_none(best_block.hash), + height=not_none(meta.height), + ) + self.send_message(ProtocolMessages.BEST_BLOCK, payload.json()) def handle_best_block(self, payload: str) -> None: """ Handle a BEST-BLOCK message. """ - data = json.loads(payload) - _id = bytes.fromhex(data['block']) - height = data['height'] - best_block = _HeightInfo(height=height, id=_id) + data = BestBlockPayload.parse_raw(payload) + best_block = _HeightInfo(height=data.height, id=data.block) deferred = self._deferred_best_block self._deferred_best_block = None if deferred: deferred.callback(best_block) - def _setup_tx_streaming(self): - """ Common setup before starting an outgoing transaction stream. - """ - self._tx_received = 0 - self._tx_max_quantity = DEFAULT_STREAMING_LIMIT # XXX: maybe this is redundant - # XXX: what else can we add for checking if everything is going well? - - def start_transactions_streaming(self, start_from: list[bytes], until_first_block: bytes) -> Deferred[None]: + def start_transactions_streaming(self, + start_from: list[bytes], + first_block_hash: bytes, + last_block_hash: bytes) -> Deferred[StreamEnd]: """Request peer to start streaming transactions to us.""" - assert self._deferred_transactions_streaming is None - self.send_get_transactions_bfs(start_from, until_first_block) - self._deferred_transactions_streaming = Deferred() - return self._deferred_transactions_streaming - - def send_get_transactions_bfs(self, start_from: list[bytes], until_first_block: bytes) -> None: + self._tx_streaming_client = TransactionStreamingClient(self, start_from, first_block_hash, last_block_hash) + self.send_get_transactions_bfs(start_from, first_block_hash, last_block_hash) + return self._tx_streaming_client.wait() + + def send_get_transactions_bfs(self, + start_from: list[bytes], + first_block_hash: bytes, + last_block_hash: bytes) -> None: """ Send a GET-TRANSACTIONS-BFS message. This will request a BFS of all transactions starting from start_from list and walking back into parents/inputs. @@ -904,15 +811,19 @@ def send_get_transactions_bfs(self, start_from: list[bytes], until_first_block: height of until_first_block. The other peer will return an empty response if it doesn't have any of the transactions in start_from or if it doesn't have the until_first_block block. """ - self._setup_tx_streaming() start_from_hexlist = [tx.hex() for tx in start_from] - until_first_block_hex = until_first_block.hex() - self.log.debug('send_get_transactions_bfs', start_from=start_from_hexlist, last_block=until_first_block_hex) - payload = json.dumps(dict( - start_from=start_from_hexlist, - until_first_block=until_first_block_hex, - )) - self.send_message(ProtocolMessages.GET_TRANSACTIONS_BFS, payload) + first_block_hash_hex = first_block_hash.hex() + last_block_hash_hex = last_block_hash.hex() + self.log.debug('send_get_transactions_bfs', + start_from=start_from_hexlist, + first_block_hash=first_block_hash_hex, + last_block_hash=last_block_hash_hex) + payload = GetTransactionsBFSPayload( + start_from=start_from, + first_block_hash=first_block_hash, + last_block_hash=last_block_hash, + ) + self.send_message(ProtocolMessages.GET_TRANSACTIONS_BFS, payload.json()) self.receiving_stream = True def handle_get_transactions_bfs(self, payload: str) -> None: @@ -921,19 +832,18 @@ def handle_get_transactions_bfs(self, payload: str) -> None: if self._is_streaming: self.log.warn('ignore GET-TRANSACTIONS-BFS, already streaming') return - data = json.loads(payload) + data = GetTransactionsBFSPayload.parse_raw(payload) # XXX: todo verify this limit while parsing the payload. - start_from = data['start_from'] - if len(start_from) > MAX_GET_TRANSACTIONS_BFS_LEN: + if len(data.start_from) > MAX_GET_TRANSACTIONS_BFS_LEN: self.log.error('too many transactions in GET-TRANSACTIONS-BFS', state=self.state) self.protocol.send_error_and_close_connection('Too many transactions in GET-TRANSACTIONS-BFS') return - self.log.debug('handle_get_transactions_bfs', **data) - start_from = [bytes.fromhex(tx_hash_hex) for tx_hash_hex in start_from] - until_first_block = bytes.fromhex(data['until_first_block']) - self.send_transactions_bfs(start_from, until_first_block) + self.send_transactions_bfs(data.start_from, data.first_block_hash, data.last_block_hash) - def send_transactions_bfs(self, start_from: list[bytes], until_first_block: bytes) -> None: + def send_transactions_bfs(self, + start_from: list[bytes], + first_block_hash: bytes, + last_block_hash: bytes) -> None: """ Start a transactions BFS stream. """ start_from_txs = [] @@ -945,16 +855,24 @@ def send_transactions_bfs(self, start_from: list[bytes], until_first_block: byte self.log.debug('requested start_from_hash not found', start_from_hash=start_from_hash.hex()) self.send_message(ProtocolMessages.NOT_FOUND, start_from_hash.hex()) return - if not self.tx_storage.transaction_exists(until_first_block): + if not self.tx_storage.transaction_exists(first_block_hash): + # In case the tx does not exist we send a NOT-FOUND message + self.log.debug('requested first_block_hash not found', first_block_hash=first_block_hash.hex()) + self.send_message(ProtocolMessages.NOT_FOUND, first_block_hash.hex()) + return + if not self.tx_storage.transaction_exists(last_block_hash): # In case the tx does not exist we send a NOT-FOUND message - self.log.debug('requested until_first_block not found', until_first_block=until_first_block.hex()) - self.send_message(ProtocolMessages.NOT_FOUND, until_first_block.hex()) + self.log.debug('requested last_block_hash not found', last_block_hash=last_block_hash.hex()) + self.send_message(ProtocolMessages.NOT_FOUND, last_block_hash.hex()) return - if self.transactions_streaming is not None and self.transactions_streaming.is_running: - self.transactions_streaming.stop() - self.transactions_streaming = TransactionsStreaming(self, start_from_txs, until_first_block, - limit=self.DEFAULT_STREAMING_LIMIT) - self.transactions_streaming.start() + if self._tx_streaming_server is not None and self._tx_streaming_server.is_running: + self._tx_streaming_server.stop() + self._tx_streaming_server = TransactionsStreamingServer(self, + start_from_txs, + first_block_hash, + last_block_hash, + limit=self.DEFAULT_STREAMING_LIMIT) + self._tx_streaming_server.start() def send_transaction(self, tx: Transaction) -> None: """ Send a TRANSACTION message. @@ -973,7 +891,7 @@ def send_transactions_end(self, response_code: StreamEnd) -> None: def handle_transactions_end(self, payload: str) -> None: """ Handle a TRANSACTIONS-END message. """ - self.log.debug('recv TRANSACTIONS-END', payload=payload, size=self._blk_size) + self.log.debug('recv TRANSACTIONS-END', payload=payload) response_code = StreamEnd(int(payload)) self.receiving_stream = False @@ -984,10 +902,8 @@ def handle_transactions_end(self, payload: str) -> None: self.protocol.send_error_and_close_connection('Not expecting to receive TRANSACTIONS-END message') return - assert self._deferred_transactions_streaming is not None - self._deferred_transactions_streaming.callback(None) - self._deferred_transactions_streaming = None - + assert self._tx_streaming_client is not None + self._tx_streaming_client.handle_transactions_end(response_code) self.log.debug('transaction streaming ended', reason=str(response_code)) def handle_transaction(self, payload: str) -> None: @@ -1004,33 +920,8 @@ def handle_transaction(self, payload: str) -> None: # Not a transaction. Punish peer? return - self._tx_received += 1 - if self._tx_received > self._tx_max_quantity + 1: - self.log.warn('too many txs received') - self.state = PeerState.ERROR - return - - try: - # this methods takes care of checking if the tx already exists, it will take care of doing at least - # a basic validation - # self.log.debug('add new tx', tx=tx.hash_hex) - if self.partial_vertex_exists(tx.hash): - # XXX: early terminate? - self.log.debug('tx early terminate?', tx_id=tx.hash.hex()) - else: - self.log.debug('tx received', tx_id=tx.hash.hex()) - self.on_new_tx(tx, propagate_to_peers=False, quiet=True, reject_locked_reward=True) - except HathorError: - self.log.warn('invalid new tx', exc_info=True) - # Invalid block?! - # Invalid transaction?! - # Maybe stop syncing and punish peer. - self.state = PeerState.ERROR - return - else: - # XXX: debugging log, maybe add timing info - if self._tx_received % 100 == 0: - self.log.debug('tx streaming in progress', txs_received=self._tx_received) + assert self._tx_streaming_client is not None + self._tx_streaming_client.handle_transaction(tx) @inlineCallbacks def get_tx(self, tx_id: bytes) -> Generator[Deferred, Any, BaseTransaction]: diff --git a/hathor/p2p/sync_v2/blockchain_streaming_client.py b/hathor/p2p/sync_v2/blockchain_streaming_client.py new file mode 100644 index 000000000..39aca043d --- /dev/null +++ b/hathor/p2p/sync_v2/blockchain_streaming_client.py @@ -0,0 +1,141 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import TYPE_CHECKING, Optional + +from structlog import get_logger +from twisted.internet.defer import Deferred + +from hathor.p2p.sync_v2.exception import ( + BlockNotConnectedToPreviousBlock, + InvalidVertexError, + StreamingError, + TooManyRepeatedVerticesError, + TooManyVerticesReceivedError, +) +from hathor.p2p.sync_v2.streamers import StreamEnd +from hathor.transaction import Block +from hathor.transaction.exceptions import HathorError +from hathor.types import VertexId + +if TYPE_CHECKING: + from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo + +logger = get_logger() + + +class BlockchainStreamingClient: + def __init__(self, sync_agent: 'NodeBlockSync', start_block: '_HeightInfo', end_block: '_HeightInfo') -> None: + self.sync_agent = sync_agent + self.protocol = self.sync_agent.protocol + self.tx_storage = self.sync_agent.tx_storage + self.manager = self.sync_agent.manager + + self.log = logger.new(peer=self.protocol.get_short_peer_id()) + + self.start_block = start_block + self.end_block = end_block + + # When syncing blocks we start streaming with all peers + # so the moment I get some repeated blocks, I stop the download + # because it's probably a streaming that I've already received + self.max_repeated_blocks = 10 + + self._deferred: Deferred[StreamEnd] = Deferred() + + self._blk_received: int = 0 + self._blk_repeated: int = 0 + + self._blk_max_quantity = self.end_block.height - self.start_block.height + 1 + self._reverse: bool = False + if self._blk_max_quantity < 0: + self._blk_max_quantity = -self._blk_max_quantity + self._reverse = True + + self._last_received_block: Optional[Block] = None + + self._partial_blocks: list[Block] = [] + + def wait(self) -> Deferred[StreamEnd]: + """Return the deferred.""" + return self._deferred + + def fails(self, reason: 'StreamingError') -> None: + """Fail the execution by resolving the deferred with an error.""" + self._deferred.errback(reason) + + def partial_vertex_exists(self, vertex_id: VertexId) -> bool: + """Return true if the vertex exists no matter its validation state.""" + with self.tx_storage.allow_partially_validated_context(): + return self.tx_storage.transaction_exists(vertex_id) + + def handle_blocks(self, blk: Block) -> None: + """This method is called by the sync agent when a BLOCKS message is received.""" + if self._deferred.called: + return + + self._blk_received += 1 + if self._blk_received > self._blk_max_quantity: + self.log.warn('too many blocks received', + blk_received=self._blk_received, + blk_max_quantity=self._blk_max_quantity) + self.fails(TooManyVerticesReceivedError()) + return + + assert blk.hash is not None + is_duplicated = False + if self.partial_vertex_exists(blk.hash): + # We reached a block we already have. Skip it. + self._blk_repeated += 1 + is_duplicated = True + if self._blk_repeated > self.max_repeated_blocks: + self.log.debug('too many repeated block received', total_repeated=self._blk_repeated) + self.fails(TooManyRepeatedVerticesError()) + + # basic linearity validation, crucial for correctly predicting the next block's height + if self._reverse: + if self._last_received_block and blk.hash != self._last_received_block.get_block_parent_hash(): + self.fails(BlockNotConnectedToPreviousBlock()) + return + else: + if self._last_received_block and blk.get_block_parent_hash() != self._last_received_block.hash: + self.fails(BlockNotConnectedToPreviousBlock()) + return + + try: + # this methods takes care of checking if the block already exists, + # it will take care of doing at least a basic validation + if is_duplicated: + self.log.debug('block early terminate?', blk_id=blk.hash.hex()) + else: + self.log.debug('block received', blk_id=blk.hash.hex()) + self.sync_agent.on_new_tx(blk, propagate_to_peers=False, quiet=True) + except HathorError: + self.fails(InvalidVertexError()) + return + else: + self._last_received_block = blk + self._blk_repeated = 0 + # XXX: debugging log, maybe add timing info + if self._blk_received % 500 == 0: + self.log.debug('block streaming in progress', blocks_received=self._blk_received) + + if not blk.can_validate_full(): + self._partial_blocks.append(blk) + + def handle_blocks_end(self, response_code: StreamEnd) -> None: + """This method is called by the sync agent when a BLOCKS-END message is received.""" + if self._deferred.called: + return + self._deferred.callback(response_code) diff --git a/hathor/p2p/sync_v2/exception.py b/hathor/p2p/sync_v2/exception.py new file mode 100644 index 000000000..54a66dd96 --- /dev/null +++ b/hathor/p2p/sync_v2/exception.py @@ -0,0 +1,37 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +class StreamingError(Exception): + """Base error for sync-v2 streaming.""" + pass + + +class TooManyVerticesReceivedError(StreamingError): + """Raised when the other peer sent too many vertices.""" + pass + + +class TooManyRepeatedVerticesError(StreamingError): + """Raised when the other peer sent too many repeated vertices.""" + pass + + +class BlockNotConnectedToPreviousBlock(StreamingError): + """Raised when the received block is not connected to the previous one.""" + pass + + +class InvalidVertexError(StreamingError): + """Raised when the received vertex fails validation.""" + pass diff --git a/hathor/p2p/sync_v2/payloads.py b/hathor/p2p/sync_v2/payloads.py new file mode 100644 index 000000000..002b2d67f --- /dev/null +++ b/hathor/p2p/sync_v2/payloads.py @@ -0,0 +1,73 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pydantic import validator + +from hathor.types import VertexId +from hathor.utils.pydantic import BaseModel + + +class PayloadBaseModel(BaseModel): + + @classmethod + def convert_hex_to_bytes(cls, value: str | VertexId) -> VertexId: + """Convert a string in hex format to bytes. If bytes are given, it does nothing.""" + if isinstance(value, str): + return bytes.fromhex(value) + elif isinstance(value, VertexId): + return value + raise ValueError('invalid type') + + class Config: + json_encoders = { + VertexId: lambda x: x.hex() + } + + +class GetNextBlocksPayload(PayloadBaseModel): + """GET-NEXT-BLOCKS message is used to request a stream of blocks in the best blockchain.""" + + start_hash: VertexId + end_hash: VertexId + quantity: int + + @validator('start_hash', 'end_hash', pre=True) + def validate_bytes_fields(cls, value: str | bytes) -> VertexId: + return cls.convert_hex_to_bytes(value) + + +class BestBlockPayload(PayloadBaseModel): + """BEST-BLOCK message is used to send information about the current best block.""" + + block: VertexId + height: int + + @validator('block', pre=True) + def validate_bytes_fields(cls, value: str | VertexId) -> VertexId: + return cls.convert_hex_to_bytes(value) + + +class GetTransactionsBFSPayload(PayloadBaseModel): + """GET-TRANSACTIONS-BFS message is used to request a stream of transactions confirmed by blocks.""" + start_from: list[VertexId] + first_block_hash: VertexId + last_block_hash: VertexId + + @validator('first_block_hash', 'last_block_hash', pre=True) + def validate_bytes_fields(cls, value: str | VertexId) -> VertexId: + return cls.convert_hex_to_bytes(value) + + @validator('start_from', pre=True, each_item=True) + def validate_start_from(cls, value: str | VertexId) -> VertexId: + return cls.convert_hex_to_bytes(value) diff --git a/hathor/p2p/sync_v2/streamers.py b/hathor/p2p/sync_v2/streamers.py index 1c8fac80e..faefcd65b 100644 --- a/hathor/p2p/sync_v2/streamers.py +++ b/hathor/p2p/sync_v2/streamers.py @@ -55,7 +55,7 @@ def __str__(self): @implementer(IPushProducer) -class _StreamingBase: +class _StreamingServerBase: def __init__(self, node_sync: 'NodeBlockSync', *, limit: int = DEFAULT_STREAMING_LIMIT): self.node_sync = node_sync self.protocol: 'HathorProtocol' = node_sync.protocol @@ -123,7 +123,7 @@ def stopProducing(self) -> None: self.pauseProducing() -class BlockchainStreaming(_StreamingBase): +class BlockchainStreamingServer(_StreamingServerBase): def __init__(self, node_sync: 'NodeBlockSync', start_block: Block, end_hash: bytes, *, limit: int = DEFAULT_STREAMING_LIMIT, reverse: bool = False): super().__init__(node_sync, limit=limit) @@ -186,12 +186,17 @@ def send_next(self) -> None: self.schedule_if_needed() -class TransactionsStreaming(_StreamingBase): +class TransactionsStreamingServer(_StreamingServerBase): """Streams all transactions confirmed by the given block, from right to left (decreasing timestamp). """ - def __init__(self, node_sync: 'NodeBlockSync', start_from: list[BaseTransaction], last_block_hash: bytes, - *, limit: int = DEFAULT_STREAMING_LIMIT): + def __init__(self, + node_sync: 'NodeBlockSync', + start_from: list[BaseTransaction], + first_block_hash: bytes, + last_block_hash: bytes, + *, + limit: int = DEFAULT_STREAMING_LIMIT) -> None: # XXX: is limit needed for tx streaming? Or let's always send all txs for # a block? Very unlikely we'll reach this limit super().__init__(node_sync, limit=limit) @@ -199,6 +204,7 @@ def __init__(self, node_sync: 'NodeBlockSync', start_from: list[BaseTransaction] assert len(start_from) > 0 assert start_from[0].storage is not None self.storage = start_from[0].storage + self.first_block_hash = first_block_hash self.last_block_hash = last_block_hash self.last_block_height = 0 diff --git a/hathor/p2p/sync_v2/transaction_streaming_client.py b/hathor/p2p/sync_v2/transaction_streaming_client.py new file mode 100644 index 000000000..e3921f4a4 --- /dev/null +++ b/hathor/p2p/sync_v2/transaction_streaming_client.py @@ -0,0 +1,120 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import TYPE_CHECKING + +from structlog import get_logger +from twisted.internet.defer import Deferred + +from hathor.p2p.sync_v2.exception import ( + InvalidVertexError, + StreamingError, + TooManyRepeatedVerticesError, + TooManyVerticesReceivedError, +) +from hathor.p2p.sync_v2.streamers import DEFAULT_STREAMING_LIMIT, StreamEnd +from hathor.transaction import BaseTransaction +from hathor.transaction.exceptions import HathorError +from hathor.types import VertexId + +if TYPE_CHECKING: + from hathor.p2p.sync_v2.agent import NodeBlockSync + +logger = get_logger() + + +class TransactionStreamingClient: + def __init__(self, + sync_agent: 'NodeBlockSync', + start_from: list[bytes], + start_block: bytes, + end_block: bytes) -> None: + self.sync_agent = sync_agent + self.protocol = self.sync_agent.protocol + self.tx_storage = self.sync_agent.tx_storage + self.manager = self.sync_agent.manager + + self.log = logger.new(peer=self.protocol.get_short_peer_id()) + + self.start_from = start_from + self.start_block = start_block + self.end_block = end_block + + # Let's keep it at "infinity" until a known issue is fixed. + self.max_repeated_transactions = 1_000_000 + + self._deferred: Deferred[StreamEnd] = Deferred() + + self._tx_received: int = 0 + self._tx_repeated: int = 0 + + self._tx_max_quantity = DEFAULT_STREAMING_LIMIT + + def wait(self) -> Deferred[StreamEnd]: + """Return the deferred.""" + return self._deferred + + def fails(self, reason: 'StreamingError') -> None: + """Fail the execution by resolving the deferred with an error.""" + self._deferred.errback(reason) + + def partial_vertex_exists(self, vertex_id: VertexId) -> bool: + """Return true if the vertex exists no matter its validation state.""" + with self.tx_storage.allow_partially_validated_context(): + return self.tx_storage.transaction_exists(vertex_id) + + def handle_transaction(self, tx: BaseTransaction) -> None: + """This method is called by the sync agent when a TRANSACTION message is received.""" + if self._deferred.called: + return + + self._tx_received += 1 + if self._tx_received > self._tx_max_quantity: + self.log.warn('too many transactions received', + tx_received=self._tx_received, + tx_max_quantity=self._tx_max_quantity) + self.fails(TooManyVerticesReceivedError()) + return + + assert tx.hash is not None + is_duplicated = False + if self.partial_vertex_exists(tx.hash): + # We reached a block we already have. Skip it. + self._tx_repeated += 1 + is_duplicated = True + if self._tx_repeated > self.max_repeated_transactions: + self.log.debug('too many repeated transactions received', total_repeated=self._tx_repeated) + self.fails(TooManyRepeatedVerticesError()) + + try: + # this methods takes care of checking if the block already exists, + # it will take care of doing at least a basic validation + if is_duplicated: + self.log.debug('tx early terminate?', tx_id=tx.hash.hex()) + else: + self.log.debug('tx received', tx_id=tx.hash.hex()) + self.sync_agent.on_new_tx(tx, propagate_to_peers=False, quiet=True, reject_locked_reward=True) + except HathorError: + self.fails(InvalidVertexError()) + return + else: + # XXX: debugging log, maybe add timing info + if self._tx_received % 100 == 0: + self.log.debug('tx streaming in progress', txs_received=self._tx_received) + + def handle_transactions_end(self, response_code: StreamEnd) -> None: + """This method is called by the sync agent when a TRANSACTIONS-END message is received.""" + if self._deferred.called: + return + self._deferred.callback(response_code) diff --git a/tests/p2p/test_protocol.py b/tests/p2p/test_protocol.py index 3aaf098a7..0cf572ec6 100644 --- a/tests/p2p/test_protocol.py +++ b/tests/p2p/test_protocol.py @@ -425,7 +425,11 @@ def test_get_data(self): self.assertAndStepConn(self.conn, b'^RELAY') self.assertIsConnected() missing_tx = '00000000228dfcd5dec1c9c6263f6430a5b4316bb9e3decb9441a6414bfd8697' - payload = {'until_first_block': missing_tx, 'start_from': [settings.GENESIS_BLOCK_HASH.hex()]} + payload = { + 'first_block_hash': missing_tx, + 'last_block_hash': missing_tx, + 'start_from': [settings.GENESIS_BLOCK_HASH.hex()] + } yield self._send_cmd(self.conn.proto1, 'GET-TRANSACTIONS-BFS', json_dumps(payload)) self._check_result_only_cmd(self.conn.peek_tr1_value(), b'NOT-FOUND') self.conn.run_one_step() diff --git a/tests/p2p/test_sync_v2.py b/tests/p2p/test_sync_v2.py index ed4412f25..4272534e2 100644 --- a/tests/p2p/test_sync_v2.py +++ b/tests/p2p/test_sync_v2.py @@ -228,15 +228,15 @@ def test_exceeds_streaming_and_mempool_limits(self) -> None: sync1 = conn12.proto1.state.sync_agent sync1.DEFAULT_STREAMING_LIMIT = 30 sync1.mempool_manager.MAX_STACK_LENGTH = 30 - self.assertIsNone(sync1.blockchain_streaming) - self.assertIsNone(sync1.transactions_streaming) + self.assertIsNone(sync1._blk_streaming_server) + self.assertIsNone(sync1._tx_streaming_server) # Change manager2 default streaming and mempool limits. sync2 = conn12.proto2.state.sync_agent sync2.DEFAULT_STREAMING_LIMIT = 50 sync2.mempool_manager.MAX_STACK_LENGTH = 50 - self.assertIsNone(sync2.blockchain_streaming) - self.assertIsNone(sync2.transactions_streaming) + self.assertIsNone(sync2._blk_streaming_server) + self.assertIsNone(sync2._tx_streaming_server) # Run until fully synced. # trigger = StopWhenTrue(sync2.is_synced)