Skip to content

Commit

Permalink
fix(sync-v2): Fix issues caused by concurrent syncing peers
Browse files Browse the repository at this point in the history
  • Loading branch information
msbrogli committed Nov 11, 2023
1 parent 090bd71 commit 22b2093
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 26 deletions.
14 changes: 9 additions & 5 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,11 +494,12 @@ def start_blockchain_streaming(self,
start_block: _HeightInfo,
end_block: _HeightInfo) -> Deferred[StreamEnd]:
"""Request peer to start streaming blocks to us."""
self.log.info('requesting blocks streaming',
start_block=start_block,
end_block=end_block)
self._blk_streaming_client = BlockchainStreamingClient(self, start_block, end_block)
quantity = self._blk_streaming_client._blk_max_quantity
self.log.info('requesting blocks streaming',
start_block=start_block,
end_block=end_block,
quantity=quantity)
self.send_get_next_blocks(start_block.id, end_block.id, quantity)
return self._blk_streaming_client.wait()

Expand Down Expand Up @@ -579,11 +580,14 @@ def find_best_common_block(self,
@inlineCallbacks
def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> Generator[Any, Any, None]:
"""This method is called when a block and its transactions are downloaded."""
# Note: Any vertex and block could have already been added by another concurrent syncing peer.
for tx in vertex_list:
self.manager.on_new_tx(tx, propagate_to_peers=False, fails_silently=False)
if not self.tx_storage.transaction_exists(tx.hash):
self.manager.on_new_tx(tx, propagate_to_peers=False, fails_silently=False)
yield deferLater(self.reactor, 0, lambda: None)

self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False)
if not self.tx_storage.transaction_exists(blk.hash):
self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False)

def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[_HeightInfo]]:
""" Returns the peer's block hashes in the given heights.
Expand Down
16 changes: 6 additions & 10 deletions hathor/p2p/sync_v2/streamers.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,22 +180,14 @@ def send_next(self) -> None:
if cur.hash == self.end_hash:
# only send the last when not reverse
if not self.reverse:
self.log.debug('send next block', blk_id=cur.hash.hex())
self.log.debug('send next block', height=cur.get_height(), blk_id=cur.hash.hex())
self.sync_agent.send_blocks(cur)
self.sync_agent.stop_blk_streaming_server(StreamEnd.END_HASH_REACHED)
return

if self.counter >= self.limit:
# only send the last when not reverse
if not self.reverse:
self.log.debug('send next block', blk_id=cur.hash.hex())
self.sync_agent.send_blocks(cur)
self.sync_agent.stop_blk_streaming_server(StreamEnd.LIMIT_EXCEEDED)
return

self.counter += 1

self.log.debug('send next block', blk_id=cur.hash.hex())
self.log.debug('send next block', height=cur.get_height(), blk_id=cur.hash.hex())
self.sync_agent.send_blocks(cur)

if self.reverse:
Expand All @@ -208,6 +200,10 @@ def send_next(self) -> None:
self.sync_agent.stop_blk_streaming_server(StreamEnd.NO_MORE_BLOCKS)
return

if self.counter >= self.limit:
self.sync_agent.stop_blk_streaming_server(StreamEnd.LIMIT_EXCEEDED)
return


class TransactionsStreamingServer(_StreamingServerBase):
"""Streams all transactions confirmed by the given block, from right to left (decreasing timestamp).
Expand Down
21 changes: 10 additions & 11 deletions hathor/p2p/sync_v2/transaction_streaming_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ def handle_transaction(self, tx: BaseTransaction) -> None:

assert tx.hash is not None
self.log.debug('tx received', tx_id=tx.hash.hex())

self._queue.append(tx)
assert len(self._queue) <= self._tx_max_quantity

if not self._is_processing:
self.reactor.callLater(0, self.process_queue)

Expand All @@ -135,6 +135,7 @@ def process_queue(self) -> Generator[Any, Any, None]:
self._is_processing = True
try:
tx = self._queue.popleft()
self.log.debug('processing tx', tx_id=tx.hash.hex())
yield self._process_transaction(tx)
finally:
self._is_processing = False
Expand All @@ -160,18 +161,18 @@ def _process_transaction(self, tx: BaseTransaction) -> Generator[Any, Any, None]
if tx.hash in self._db:
# This case might happen during a resume, so we just log and keep syncing.
self.log.debug('duplicated vertex received', tx_id=tx.hash.hex())
self.update_dependencies(tx)
elif tx.hash in self._existing_deps:
# This case might happen if we already have the transaction from another sync.
self.log.debug('existing vertex received', tx_id=tx.hash.hex())
self.update_dependencies(tx)
else:
self.log.info('unexpected vertex received', tx_id=tx.hash.hex())
self.fails(UnexpectedVertex(tx.hash.hex()))
return
self._waiting_for.remove(tx.hash)

for dep in self.get_missing_deps(tx):
self.log.debug('adding dependency', tx_id=tx.hash.hex(), dep=dep.hex())
self._waiting_for.add(dep)
self.update_dependencies(tx)

self._db[tx.hash] = tx

Expand All @@ -187,15 +188,13 @@ def _process_transaction(self, tx: BaseTransaction) -> Generator[Any, Any, None]
if self._tx_received % 100 == 0:
self.log.debug('tx streaming in progress', txs_received=self._tx_received)

def get_missing_deps(self, tx: BaseTransaction) -> Iterator[bytes]:
"""Return missing dependencies."""
def update_dependencies(self, tx: BaseTransaction) -> Iterator[bytes]:
"""Update _existing_deps and _waiting_for with the dependencies."""
for dep in tx.get_all_dependencies():
if self.tx_storage.transaction_exists(dep):
if self.tx_storage.transaction_exists(dep) or dep in self._db:
self._existing_deps.add(dep)
continue
if dep in self._db:
continue
yield dep
else:
self._waiting_for.add(dep)

def handle_transactions_end(self, response_code: StreamEnd) -> None:
"""This method is called by the sync agent when a TRANSACTIONS-END message is received."""
Expand Down

0 comments on commit 22b2093

Please sign in to comment.