Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sync-v2): improve deferreds handling #732

Merged
merged 1 commit into from
Jul 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 29 additions & 38 deletions hathor/p2p/sync_v2/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None
self.peer_height = 0

# Latest deferred waiting for a reply.
self.deferred_by_key: dict[str, Deferred] = {}
self._deferred_txs: dict[VertexId, Deferred[BaseTransaction]] = {}
self._deferred_tips: Optional[Deferred[list[bytes]]] = None
self._deferred_best_block: Optional[Deferred[dict[str, Any]]] = None
self._deferred_peer_block_hashes: Optional[Deferred[list[tuple[int, bytes]]]] = None

# When syncing blocks we start streaming with all peers
# so the moment I get some repeated blocks, I stop the download
Expand Down Expand Up @@ -372,14 +375,12 @@ def run_sync_blocks(self) -> Generator[Any, Any, None]:
def get_tips(self) -> Deferred[list[bytes]]:
""" Async method to request the remote peer's tips.
"""
key = 'tips'
deferred = self.deferred_by_key.get(key, None)
if deferred is None:
deferred = self.deferred_by_key[key] = Deferred()
if self._deferred_tips is None:
self._deferred_tips = Deferred()
self.send_get_tips()
else:
assert self._receiving_tips is not None
return deferred
return self._deferred_tips

def send_get_tips(self) -> None:
""" Send a GET-TIPS message.
Expand All @@ -388,7 +389,7 @@ def send_get_tips(self) -> None:
self.send_message(ProtocolMessages.GET_TIPS)
self._receiving_tips = []

def handle_get_tips(self, payload: str) -> None:
def handle_get_tips(self, _payload: str) -> None:
jansegre marked this conversation as resolved.
Show resolved Hide resolved
""" Handle a GET-TIPS message.
"""
assert self.tx_storage.indexes is not None
Expand Down Expand Up @@ -420,12 +421,12 @@ def handle_tips(self, payload: str) -> None:
# filter-out txs we already have
self._receiving_tips.extend(tx_id for tx_id in data if not self.partial_vertex_exists(tx_id))

def handle_tips_end(self, payload: str) -> None:
def handle_tips_end(self, _payload: str) -> None:
""" Handle a TIPS-END message.
"""
assert self._receiving_tips is not None
key = 'tips'
deferred = self.deferred_by_key.pop(key, None)
deferred = self._deferred_tips
self._deferred_tips = None
if deferred is None:
self.protocol.send_error_and_close_connection('TIPS-END not expected')
return
Expand Down Expand Up @@ -556,13 +557,11 @@ def find_best_common_block(self, peer_best_height: int, peer_best_block: bytes)
def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[tuple[int, bytes]]]:
""" Returns the peer's block hashes in the given heights.
"""
key = 'peer-block-hashes'
if self.deferred_by_key.get(key, None) is not None:
if self._deferred_peer_block_hashes is not None:
raise Exception('latest_deferred is not None')
self.send_get_peer_block_hashes(heights)
deferred: Deferred[list[tuple[int, bytes]]] = Deferred()
self.deferred_by_key[key] = deferred
return deferred
self._deferred_peer_block_hashes = Deferred()
return self._deferred_peer_block_hashes

def send_get_peer_block_hashes(self, heights: list[int]) -> None:
""" Send a GET-PEER-BLOCK-HASHES message.
Expand Down Expand Up @@ -598,8 +597,8 @@ def handle_peer_block_hashes(self, payload: str) -> None:
"""
data = json.loads(payload)
data = [(h, bytes.fromhex(block_hash)) for (h, block_hash) in data]
key = 'peer-block-hashes'
deferred = self.deferred_by_key.pop(key, None)
deferred = self._deferred_peer_block_hashes
self._deferred_peer_block_hashes = None
if deferred:
deferred.callback(data)

Expand Down Expand Up @@ -796,15 +795,12 @@ def handle_stop_block_streaming(self, payload: str) -> None:
def get_peer_best_block(self) -> Deferred[dict[str, Any]]:
""" Async call to get the remote peer's best block.
"""
key = 'best-block'
deferred = self.deferred_by_key.pop(key, None)
if self.deferred_by_key.get(key, None) is not None:
if self._deferred_best_block is not None:
raise Exception('latest_deferred is not None')

self.send_get_best_block()
deferred = Deferred()
self.deferred_by_key[key] = deferred
return deferred
self._deferred_best_block = Deferred()
return self._deferred_best_block

def send_get_best_block(self) -> None:
""" Send a GET-BEST-BLOCK messsage.
Expand All @@ -827,8 +823,8 @@ def handle_best_block(self, payload: str) -> None:
self.log.debug('got best block', **data)
data['block'] = bytes.fromhex(data['block'])

key = 'best-block'
deferred = self.deferred_by_key.pop(key, None)
deferred = self._deferred_best_block
self._deferred_best_block = None
if deferred:
deferred.callback(data)

Expand Down Expand Up @@ -987,44 +983,39 @@ def get_tx(self, tx_id: bytes) -> Generator[Deferred, Any, BaseTransaction]:
tx = self.tx_storage.get_transaction(tx_id)
except TransactionDoesNotExist:
tx = yield self.get_data(tx_id, 'mempool')
if tx is None:
self.log.error('failed to get tx', tx_id=tx_id.hex())
self.protocol.send_error_and_close_connection(f'DATA mempool {tx_id.hex()} not found')
raise
assert tx is not None
if tx.hash != tx_id:
self.protocol.send_error_and_close_connection(f'DATA mempool {tx_id.hex()} hash mismatch')
raise
return tx

def get_data(self, tx_id: bytes, origin: str) -> Deferred:
def get_data(self, tx_id: bytes, origin: str) -> Deferred[BaseTransaction]:
""" Async method to request a tx by id.
"""
# TODO: deal with stale `get_data` calls
if origin != 'mempool':
raise ValueError(f'origin={origin} not supported, only origin=mempool is supported')
key = f'{origin}:{tx_id.hex()}'
deferred = self.deferred_by_key.get(key, None)
deferred = self._deferred_txs.get(tx_id, None)
if deferred is None:
deferred = self.deferred_by_key[key] = Deferred()
deferred = self._deferred_txs[tx_id] = Deferred()
self.send_get_data(tx_id, origin=origin)
self.log.debug('get_data of new tx_id', deferred=deferred, key=key)
self.log.debug('get_data of new tx_id', deferred=deferred, key=tx_id.hex())
else:
# XXX: can we re-use deferred objects like this?
self.log.debug('get_data of same tx_id, reusing deferred', deferred=deferred, key=key)
self.log.debug('get_data of same tx_id, reusing deferred', deferred=deferred, key=tx_id.hex())
return deferred

def _on_get_data(self, tx: BaseTransaction, origin: str) -> None:
""" Called when a requested tx is received.
"""
assert tx.hash is not None
key = f'{origin}:{tx.hash_hex}'
deferred = self.deferred_by_key.pop(key, None)
deferred = self._deferred_txs.pop(tx.hash, None)
if deferred is None:
# Peer sent the wrong transaction?!
# XXX: ban peer?
self.protocol.send_error_and_close_connection(f'DATA {origin}: with tx that was not requested')
return
self.log.debug('get_data fulfilled', deferred=deferred, key=key)
self.log.debug('get_data fulfilled', deferred=deferred, key=tx.hash.hex())
self._get_tx_cache[tx.hash] = tx
if len(self._get_tx_cache) > self._get_tx_cache_maxsize:
self._get_tx_cache.popitem(last=False)
Expand Down