Skip to content

Commit

Permalink
feat(sync-v2): Wait for sync internal methods to finish before initia…
Browse files Browse the repository at this point in the history
…ting next syncing cycle
  • Loading branch information
msbrogli committed Nov 3, 2023
1 parent 98121c4 commit 38158b6
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 25 deletions.
63 changes: 40 additions & 23 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
self._deferred_best_block: Optional[Deferred[_HeightInfo]] = None
self._deferred_peer_block_hashes: Optional[Deferred[list[_HeightInfo]]] = None

# Deferreds used when we are receiving a streaming of vertices.
self._deferred_blockchain_streaming: Optional[Deferred[None]] = None
self._deferred_transactions_streaming: Optional[Deferred[None]] = None

# When syncing blocks we start streaming with all peers
# so the moment I get some repeated blocks, I stop the download
# because it's probably a streaming that I've just received
Expand Down Expand Up @@ -289,16 +293,8 @@ def run_sync(self) -> Generator[Any, Any, None]:
def _run_sync(self) -> Generator[Any, Any, None]:
""" Actual implementation of the sync step logic in run_sync.
"""
if self.receiving_stream:
# If we're receiving a stream, wait for it to finish before running sync.
# If we're sending a stream, do the sync to update the peer's synced block
self.log.debug('receiving stream, try again later')
return

if self.mempool_manager.is_running():
# It's running a mempool sync, so we wait until it finishes
self.log.debug('running mempool sync, try again later')
return
assert not self.receiving_stream
assert not self.mempool_manager.is_running()

assert self.protocol.connections is not None
assert self.tx_storage.indexes is not None
Expand All @@ -308,16 +304,17 @@ def _run_sync(self) -> Generator[Any, Any, None]:
self.log.debug('needed tx exist, sync transactions')
self.update_synced(False)
# TODO: find out whether we can sync transactions from this peer to speed things up
self.run_sync_transactions()
yield self.run_sync_transactions()
return

is_block_synced = yield self.run_sync_blocks()
if is_block_synced:
# our blocks are synced, so sync the mempool
self.state = PeerState.SYNCING_MEMPOOL
self.mempool_manager.run()
yield self.mempool_manager.run()

def run_sync_transactions(self) -> None:
@inlineCallbacks
def run_sync_transactions(self) -> Generator[Any, Any, None]:
""" Run a step of the transaction syncing phase.
"""
self.state = PeerState.SYNCING_TRANSACTIONS
Expand All @@ -344,7 +341,7 @@ def run_sync_transactions(self) -> None:

self.log.info('run sync transactions', start=[i.hex() for i in needed_txs], end_block_hash=block.hash.hex(),
end_block_height=block_height)
self.send_get_transactions_bfs(needed_txs, block.hash)
yield self.start_transactions_streaming(needed_txs, block.hash)

def get_my_best_block(self) -> _HeightInfo:
"""Return my best block info."""
Expand Down Expand Up @@ -410,10 +407,11 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]:
synced_block=self.synced_block)

# Sync from common block
self.run_block_sync(self.synced_block.id,
self.synced_block.height,
self.peer_best_block.id,
self.peer_best_block.height)
yield self.start_blockchain_streaming(self.synced_block.id,
self.synced_block.height,
self.peer_best_block.id,
self.peer_best_block.height)

return False

def get_tips(self) -> Deferred[list[bytes]]:
Expand Down Expand Up @@ -513,16 +511,20 @@ def _setup_block_streaming(self, start_hash: bytes, start_height: int, end_hash:
self._blk_stream_reverse = reverse
self._last_received_block = None

def run_block_sync(self, start_hash: bytes, start_height: int, end_hash: bytes, end_height: int) -> None:
""" Called when the bestblock is after all checkpoints.
It must syncs to the left until it reaches the remote's best block or the max stream limit.
"""
def start_blockchain_streaming(self,
start_hash: bytes,
start_height: int,
end_hash: bytes,
end_height: int) -> Deferred[None]:
"""Request peer to start streaming blocks to us."""
assert self._deferred_blockchain_streaming is None
self._setup_block_streaming(start_hash, start_height, end_hash, end_height, False)
quantity = end_height - start_height
self.log.info('get next blocks', start_height=start_height, end_height=end_height, quantity=quantity,
start_hash=start_hash.hex(), end_hash=end_hash.hex())
self.send_get_next_blocks(start_hash, end_hash, quantity)
self._deferred_blockchain_streaming = Deferred()
return self._deferred_blockchain_streaming

def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> None:
""" Helper to send a message.
Expand Down Expand Up @@ -728,6 +730,10 @@ def handle_blocks_end(self, payload: str) -> None:
self.protocol.send_error_and_close_connection('Not expecting to receive BLOCKS-END message')
return

assert self._deferred_blockchain_streaming is not None
self._deferred_blockchain_streaming.callback(None)
self._deferred_blockchain_streaming = None

self.log.debug('block streaming ended', reason=str(response_code))

def handle_blocks(self, payload: str) -> None:
Expand Down Expand Up @@ -879,6 +885,13 @@ def _setup_tx_streaming(self):
self._tx_max_quantity = DEFAULT_STREAMING_LIMIT # XXX: maybe this is redundant
# XXX: what else can we add for checking if everything is going well?

def start_transactions_streaming(self, start_from: list[bytes], until_first_block: bytes) -> Deferred[None]:
"""Request peer to start streaming transactions to us."""
assert self._deferred_transactions_streaming is None
self.send_get_transactions_bfs(start_from, until_first_block)
self._deferred_transactions_streaming = Deferred()
return self._deferred_transactions_streaming

def send_get_transactions_bfs(self, start_from: list[bytes], until_first_block: bytes) -> None:
""" Send a GET-TRANSACTIONS-BFS message.
Expand Down Expand Up @@ -971,6 +984,10 @@ def handle_transactions_end(self, payload: str) -> None:
self.protocol.send_error_and_close_connection('Not expecting to receive TRANSACTIONS-END message')
return

assert self._deferred_transactions_streaming is not None
self._deferred_transactions_streaming.callback(None)
self._deferred_transactions_streaming = None

self.log.debug('transaction streaming ended', reason=str(response_code))

def handle_transaction(self, payload: str) -> None:
Expand Down
14 changes: 12 additions & 2 deletions hathor/p2p/sync_v2/mempool.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def __init__(self, sync_agent: 'NodeBlockSync'):
self.tx_storage = self.manager.tx_storage
self.reactor = self.sync_agent.reactor

self.deferred: Optional[Deferred[None]] = None

# Set of tips we know but couldn't add to the DAG yet.
self.missing_tips: set[bytes] = set()

Expand All @@ -52,21 +54,29 @@ def is_running(self) -> bool:
"""Whether the sync-mempool is currently running."""
return self._is_running

def run(self) -> None:
def run(self) -> Deferred[None]:
"""Starts _run in, won't start again if already running."""
if self.is_running():
self.log.warn('already started')
return
assert self.deferred is not None
return self.deferred
self._is_running = True
self.reactor.callLater(0, self._run)

assert self.deferred is None
self.deferred = Deferred()
return self.deferred

@inlineCallbacks
def _run(self) -> Generator[Deferred, Any, None]:
try:
yield self._unsafe_run()
finally:
# sync_agent.run_sync will start it again when needed
self._is_running = False
assert self.deferred is not None
self.deferred.callback(None)
self.deferred = None

@inlineCallbacks
def _unsafe_run(self) -> Generator[Deferred, Any, None]:
Expand Down

0 comments on commit 38158b6

Please sign in to comment.