diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py index 01519820a..eff6eda1d 100644 --- a/hathor/p2p/sync_v2/agent.py +++ b/hathor/p2p/sync_v2/agent.py @@ -22,7 +22,7 @@ from structlog import get_logger from twisted.internet.defer import Deferred, inlineCallbacks -from twisted.internet.task import LoopingCall +from twisted.internet.task import LoopingCall, deferLater from hathor.conf.get_settings import get_settings from hathor.p2p.messages import ProtocolMessages @@ -569,10 +569,12 @@ def find_best_common_block(self, self.log.debug('find_best_common_block n-ary search finished', lo=lo, hi=hi) return lo - def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> None: + @inlineCallbacks + def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> Generator[Any, Any, None]: """This method is called when a block and its transactions are downloaded.""" for tx in vertex_list: self.manager.on_new_tx(tx, propagate_to_peers=False, fails_silently=False) + yield deferLater(self.reactor, 0, lambda: None) self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False) diff --git a/hathor/p2p/sync_v2/transaction_streaming_client.py b/hathor/p2p/sync_v2/transaction_streaming_client.py index e41560e12..4acbb8916 100644 --- a/hathor/p2p/sync_v2/transaction_streaming_client.py +++ b/hathor/p2p/sync_v2/transaction_streaming_client.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Iterator +from collections import deque +from typing import TYPE_CHECKING, Any, Generator, Iterator, Optional from structlog import get_logger -from twisted.internet.defer import Deferred +from twisted.internet.defer import Deferred, inlineCallbacks from hathor.p2p.sync_v2.exception import ( InvalidVertexError, @@ -45,23 +46,43 @@ def __init__(self, self.protocol = self.sync_agent.protocol self.tx_storage = self.sync_agent.tx_storage self.manager = self.sync_agent.manager + self.reactor = self.manager.reactor self.log = logger.new(peer=self.protocol.get_short_peer_id()) + # List of blocks from which we will receive transactions. self.partial_blocks = partial_blocks + # True if we are processing a transaction. + self._is_processing: bool = False + + # Deferred return to the sync agent. self._deferred: Deferred[StreamEnd] = Deferred() + # Number of transactions received. self._tx_received: int = 0 + # Maximum number of transactions to be received. self._tx_max_quantity = limit + # Queue of transactions waiting to be processed. + self._queue: deque[BaseTransaction] = deque() + + # Keeps the response code if the streaming has ended. + self._response_code: Optional[StreamEnd] = None + + # Index to the current block. self._idx: int = 0 - self._buffer: list[VertexId] = [] + + # Set of hashes we are waiting to receive. self._waiting_for: set[VertexId] = set() + + # In-memory database of transactions already received but still + # waiting for dependencies. self._db: dict[VertexId, BaseTransaction] = {} self._prepare_block(self.partial_blocks[0]) + assert self._waiting_for def wait(self) -> Deferred[StreamEnd]: """Return the deferred.""" @@ -92,9 +113,37 @@ def handle_transaction(self, tx: BaseTransaction) -> None: return assert tx.hash is not None - self.log.debug('tx received', tx_id=tx.hash.hex()) + self._queue.append(tx) + assert len(self._queue) <= self._tx_max_quantity + if not self._is_processing: + self.reactor.callLater(0, self.process_queue) + + @inlineCallbacks + def process_queue(self) -> Generator[Any, Any, None]: + """Process next transaction in the queue.""" + if self._is_processing: + return + + if not self._queue: + self.check_end() + return + + self._is_processing = True + try: + tx = self._queue.popleft() + yield self._process_transaction(tx) + finally: + self._is_processing = False + + self.reactor.callLater(0, self.process_queue) + + @inlineCallbacks + def _process_transaction(self, tx: BaseTransaction) -> Generator[Any, Any, None]: + """Process transaction.""" + assert tx.hash is not None + # Run basic verification. if not tx.is_genesis: try: @@ -120,11 +169,13 @@ def handle_transaction(self, tx: BaseTransaction) -> None: self._waiting_for.add(dep) self._db[tx.hash] = tx - self._buffer.append(tx.hash) if not self._waiting_for: self.log.debug('no pending dependencies, processing buffer') - self._execute_and_prepare_next() + while not self._waiting_for: + result = yield self._execute_and_prepare_next() + if not result: + break else: self.log.debug('pending dependencies', counter=len(self._waiting_for)) @@ -144,30 +195,45 @@ def handle_transactions_end(self, response_code: StreamEnd) -> None: """This method is called by the sync agent when a TRANSACTIONS-END message is received.""" if self._deferred.called: return - self.log.info('transactions streaming ended', reason=response_code, waiting_for=len(self._waiting_for)) - self._deferred.callback(response_code) + assert self._response_code is None + self._response_code = response_code + self.check_end() + + def check_end(self) -> None: + """Check if the streaming has ended.""" + if self._response_code is None: + return + + if self._queue: + return + + self.log.info('transactions streaming ended', reason=self._response_code, waiting_for=len(self._waiting_for)) + self._deferred.callback(self._response_code) - def _execute_and_prepare_next(self) -> None: + @inlineCallbacks + def _execute_and_prepare_next(self) -> Generator[Any, Any, bool]: """Add the block and its vertices to the DAG.""" assert not self._waiting_for blk = self.partial_blocks[self._idx] - vertex_list = [self._db[_id] for _id in self._buffer] + vertex_list = list(self._db.values()) vertex_list.sort(key=lambda v: v.timestamp) try: - self.sync_agent.on_block_complete(blk, vertex_list) + yield self.sync_agent.on_block_complete(blk, vertex_list) except HathorError as e: self.fails(InvalidVertexError(repr(e))) - return + return False self._idx += 1 - if self._idx < len(self.partial_blocks): - self._prepare_block(self.partial_blocks[self._idx]) + if self._idx >= len(self.partial_blocks): + return False + + self._prepare_block(self.partial_blocks[self._idx]) + return True def _prepare_block(self, blk: 'Block') -> None: """Reset everything for the next block. It also adds blocks that have no dependencies.""" - self._buffer.clear() self._waiting_for.clear() self._db.clear() @@ -175,7 +241,3 @@ def _prepare_block(self, blk: 'Block') -> None: for dep in blk.get_all_dependencies(): if not self.tx_storage.transaction_exists(dep): self._waiting_for.add(dep) - - # If block is ready to be added then do it. - if not self._waiting_for: - self._execute_and_prepare_next()