Skip to content

Commit

Permalink
fix(sync-v2): Fix issues caused by reorg during sync
Browse files Browse the repository at this point in the history
  • Loading branch information
msbrogli committed Nov 10, 2023
1 parent 090bd71 commit c9dfc8d
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 16 deletions.
14 changes: 9 additions & 5 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,11 +494,12 @@ def start_blockchain_streaming(self,
start_block: _HeightInfo,
end_block: _HeightInfo) -> Deferred[StreamEnd]:
"""Request peer to start streaming blocks to us."""
self.log.info('requesting blocks streaming',
start_block=start_block,
end_block=end_block)
self._blk_streaming_client = BlockchainStreamingClient(self, start_block, end_block)
quantity = self._blk_streaming_client._blk_max_quantity
self.log.info('requesting blocks streaming',
start_block=start_block,
end_block=end_block,
quantity=quantity)
self.send_get_next_blocks(start_block.id, end_block.id, quantity)
return self._blk_streaming_client.wait()

Expand Down Expand Up @@ -579,11 +580,14 @@ def find_best_common_block(self,
@inlineCallbacks
def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> Generator[Any, Any, None]:
"""This method is called when a block and its transactions are downloaded."""
# Note: Any vertex and block could have already been added by another concurrent syncing peer.
for tx in vertex_list:
self.manager.on_new_tx(tx, propagate_to_peers=False, fails_silently=False)
if not self.tx_storage.transaction_exists(tx.hash):
self.manager.on_new_tx(tx, propagate_to_peers=False, fails_silently=False)
yield deferLater(self.reactor, 0, lambda: None)

self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False)
if not self.tx_storage.transaction_exists(blk.hash):
self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False)

def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[_HeightInfo]]:
""" Returns the peer's block hashes in the given heights.
Expand Down
16 changes: 6 additions & 10 deletions hathor/p2p/sync_v2/streamers.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,22 +180,14 @@ def send_next(self) -> None:
if cur.hash == self.end_hash:
# only send the last when not reverse
if not self.reverse:
self.log.debug('send next block', blk_id=cur.hash.hex())
self.log.debug('send next block', height=cur.get_height(), blk_id=cur.hash.hex())
self.sync_agent.send_blocks(cur)
self.sync_agent.stop_blk_streaming_server(StreamEnd.END_HASH_REACHED)
return

if self.counter >= self.limit:
# only send the last when not reverse
if not self.reverse:
self.log.debug('send next block', blk_id=cur.hash.hex())
self.sync_agent.send_blocks(cur)
self.sync_agent.stop_blk_streaming_server(StreamEnd.LIMIT_EXCEEDED)
return

self.counter += 1

self.log.debug('send next block', blk_id=cur.hash.hex())
self.log.debug('send next block', height=cur.get_height(), blk_id=cur.hash.hex())
self.sync_agent.send_blocks(cur)

if self.reverse:
Expand All @@ -208,6 +200,10 @@ def send_next(self) -> None:
self.sync_agent.stop_blk_streaming_server(StreamEnd.NO_MORE_BLOCKS)
return

if self.counter >= self.limit:
self.sync_agent.stop_blk_streaming_server(StreamEnd.LIMIT_EXCEEDED)
return


class TransactionsStreamingServer(_StreamingServerBase):
"""Streams all transactions confirmed by the given block, from right to left (decreasing timestamp).
Expand Down
3 changes: 2 additions & 1 deletion hathor/p2p/sync_v2/transaction_streaming_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ def handle_transaction(self, tx: BaseTransaction) -> None:

assert tx.hash is not None
self.log.debug('tx received', tx_id=tx.hash.hex())

self._queue.append(tx)
assert len(self._queue) <= self._tx_max_quantity

if not self._is_processing:
self.reactor.callLater(0, self.process_queue)

Expand All @@ -135,6 +135,7 @@ def process_queue(self) -> Generator[Any, Any, None]:
self._is_processing = True
try:
tx = self._queue.popleft()
self.log.debug('processing tx', tx_id=tx.hash.hex())
yield self._process_transaction(tx)
finally:
self._is_processing = False
Expand Down

0 comments on commit c9dfc8d

Please sign in to comment.