diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py index 24ab4516d..89468932a 100644 --- a/hathor/p2p/sync_v2/agent.py +++ b/hathor/p2p/sync_v2/agent.py @@ -494,11 +494,12 @@ def start_blockchain_streaming(self, start_block: _HeightInfo, end_block: _HeightInfo) -> Deferred[StreamEnd]: """Request peer to start streaming blocks to us.""" - self.log.info('requesting blocks streaming', - start_block=start_block, - end_block=end_block) self._blk_streaming_client = BlockchainStreamingClient(self, start_block, end_block) quantity = self._blk_streaming_client._blk_max_quantity + self.log.info('requesting blocks streaming', + start_block=start_block, + end_block=end_block, + quantity=quantity) self.send_get_next_blocks(start_block.id, end_block.id, quantity) return self._blk_streaming_client.wait() @@ -579,11 +580,14 @@ def find_best_common_block(self, @inlineCallbacks def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> Generator[Any, Any, None]: """This method is called when a block and its transactions are downloaded.""" + # Note: Any vertex and block could have already been added by another concurrent syncing peer. for tx in vertex_list: - self.manager.on_new_tx(tx, propagate_to_peers=False, fails_silently=False) + if not self.tx_storage.transaction_exists(not_none(tx.hash)): + self.manager.on_new_tx(tx, propagate_to_peers=False, fails_silently=False) yield deferLater(self.reactor, 0, lambda: None) - self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False) + if not self.tx_storage.transaction_exists(not_none(blk.hash)): + self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False) def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[_HeightInfo]]: """ Returns the peer's block hashes in the given heights. diff --git a/hathor/p2p/sync_v2/streamers.py b/hathor/p2p/sync_v2/streamers.py index b51f7030c..22dbd8360 100644 --- a/hathor/p2p/sync_v2/streamers.py +++ b/hathor/p2p/sync_v2/streamers.py @@ -180,22 +180,14 @@ def send_next(self) -> None: if cur.hash == self.end_hash: # only send the last when not reverse if not self.reverse: - self.log.debug('send next block', blk_id=cur.hash.hex()) + self.log.debug('send next block', height=cur.get_height(), blk_id=cur.hash.hex()) self.sync_agent.send_blocks(cur) self.sync_agent.stop_blk_streaming_server(StreamEnd.END_HASH_REACHED) return - if self.counter >= self.limit: - # only send the last when not reverse - if not self.reverse: - self.log.debug('send next block', blk_id=cur.hash.hex()) - self.sync_agent.send_blocks(cur) - self.sync_agent.stop_blk_streaming_server(StreamEnd.LIMIT_EXCEEDED) - return - self.counter += 1 - self.log.debug('send next block', blk_id=cur.hash.hex()) + self.log.debug('send next block', height=cur.get_height(), blk_id=cur.hash.hex()) self.sync_agent.send_blocks(cur) if self.reverse: @@ -208,6 +200,10 @@ def send_next(self) -> None: self.sync_agent.stop_blk_streaming_server(StreamEnd.NO_MORE_BLOCKS) return + if self.counter >= self.limit: + self.sync_agent.stop_blk_streaming_server(StreamEnd.LIMIT_EXCEEDED) + return + class TransactionsStreamingServer(_StreamingServerBase): """Streams all transactions confirmed by the given block, from right to left (decreasing timestamp). diff --git a/hathor/p2p/sync_v2/transaction_streaming_client.py b/hathor/p2p/sync_v2/transaction_streaming_client.py index 4ae05cdd7..aa6bc5dee 100644 --- a/hathor/p2p/sync_v2/transaction_streaming_client.py +++ b/hathor/p2p/sync_v2/transaction_streaming_client.py @@ -13,7 +13,7 @@ # limitations under the License. from collections import deque -from typing import TYPE_CHECKING, Any, Generator, Iterator, Optional +from typing import TYPE_CHECKING, Any, Generator, Optional from structlog import get_logger from twisted.internet.defer import Deferred, inlineCallbacks @@ -27,6 +27,7 @@ from hathor.p2p.sync_v2.streamers import StreamEnd from hathor.transaction import BaseTransaction from hathor.transaction.exceptions import HathorError, TxValidationError +from hathor.util import not_none from hathor.types import VertexId if TYPE_CHECKING: @@ -116,9 +117,9 @@ def handle_transaction(self, tx: BaseTransaction) -> None: assert tx.hash is not None self.log.debug('tx received', tx_id=tx.hash.hex()) - self._queue.append(tx) assert len(self._queue) <= self._tx_max_quantity + if not self._is_processing: self.reactor.callLater(0, self.process_queue) @@ -135,6 +136,7 @@ def process_queue(self) -> Generator[Any, Any, None]: self._is_processing = True try: tx = self._queue.popleft() + self.log.debug('processing tx', tx_id=not_none(tx.hash).hex()) yield self._process_transaction(tx) finally: self._is_processing = False @@ -160,18 +162,18 @@ def _process_transaction(self, tx: BaseTransaction) -> Generator[Any, Any, None] if tx.hash in self._db: # This case might happen during a resume, so we just log and keep syncing. self.log.debug('duplicated vertex received', tx_id=tx.hash.hex()) + self.update_dependencies(tx) elif tx.hash in self._existing_deps: # This case might happen if we already have the transaction from another sync. self.log.debug('existing vertex received', tx_id=tx.hash.hex()) + self.update_dependencies(tx) else: self.log.info('unexpected vertex received', tx_id=tx.hash.hex()) self.fails(UnexpectedVertex(tx.hash.hex())) return self._waiting_for.remove(tx.hash) - for dep in self.get_missing_deps(tx): - self.log.debug('adding dependency', tx_id=tx.hash.hex(), dep=dep.hex()) - self._waiting_for.add(dep) + self.update_dependencies(tx) self._db[tx.hash] = tx @@ -187,15 +189,13 @@ def _process_transaction(self, tx: BaseTransaction) -> Generator[Any, Any, None] if self._tx_received % 100 == 0: self.log.debug('tx streaming in progress', txs_received=self._tx_received) - def get_missing_deps(self, tx: BaseTransaction) -> Iterator[bytes]: - """Return missing dependencies.""" + def update_dependencies(self, tx: BaseTransaction) -> None: + """Update _existing_deps and _waiting_for with the dependencies.""" for dep in tx.get_all_dependencies(): - if self.tx_storage.transaction_exists(dep): + if self.tx_storage.transaction_exists(dep) or dep in self._db: self._existing_deps.add(dep) - continue - if dep in self._db: - continue - yield dep + else: + self._waiting_for.add(dep) def handle_transactions_end(self, response_code: StreamEnd) -> None: """This method is called by the sync agent when a TRANSACTIONS-END message is received."""