From ded41f382a673464a6f37bd0b2218da279eb2d2e Mon Sep 17 00:00:00 2001 From: Guilherme Salgado Date: Wed, 4 Jul 2018 15:46:12 +0100 Subject: [PATCH 1/2] FastChainSyncer now handles data requests from peers It used to handle only GetBlockHeader requests, and now handles the rest as well --- p2p/chain.py | 64 ++++++++++++++----------- p2p/state.py | 12 +++-- trinity/plugins/builtin/tx_pool/pool.py | 4 +- 3 files changed, 46 insertions(+), 34 deletions(-) diff --git a/p2p/chain.py b/p2p/chain.py index 6b5f4176d6..b0a357c75a 100644 --- a/p2p/chain.py +++ b/p2p/chain.py @@ -289,6 +289,7 @@ async def _handle_get_block_headers(self, peer: LESPeer, msg: Dict[str, Any]) -> query = msg['query'] headers = await self._handler.lookup_headers( query.block_number_or_hash, query.max_headers, query.skip, query.reverse) + self.logger.trace("Replying to %s with %d headers", peer, len(headers)) peer.sub_proto.send_block_headers(headers, buffer_value=0, request_id=msg['request_id']) async def _process_headers( @@ -490,8 +491,27 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command, await self._handle_new_block(peer, cast(Dict[str, Any], msg)) elif isinstance(cmd, eth.GetBlockHeaders): await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg)) + elif isinstance(cmd, eth.GetBlockBodies): + # Only serve up to eth.MAX_BODIES_FETCH items in every request. + block_hashes = cast(List[Hash32], msg)[:eth.MAX_BODIES_FETCH] + await self._handler.handle_get_block_bodies(peer, block_hashes) + elif isinstance(cmd, eth.GetReceipts): + # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request. + block_hashes = cast(List[Hash32], msg)[:eth.MAX_RECEIPTS_FETCH] + await self._handler.handle_get_receipts(peer, block_hashes) + elif isinstance(cmd, eth.GetNodeData): + # Only serve up to eth.MAX_STATE_FETCH items in every request. + node_hashes = cast(List[Hash32], msg)[:eth.MAX_STATE_FETCH] + await self._handler.handle_get_node_data(peer, node_hashes) + elif isinstance(cmd, eth.Transactions): + # Transactions msgs are handled by our TxPool service. + pass + elif isinstance(cmd, eth.NodeData): + # When doing a chain sync we never send GetNodeData requests, so peers should not send + # us NodeData msgs. + self.logger.warn("Unexpected NodeData msg from %s", peer) else: - self.logger.debug("Ignoring %s message from %s", cmd, peer) + self.logger.debug("%s msg not handled yet, need to be implemented", cmd) async def _handle_new_block(self, peer: ETHPeer, msg: Dict[str, Any]) -> None: self._sync_requests.put_nowait(peer) @@ -515,7 +535,7 @@ async def _handle_block_receipts(self, async def _handle_block_bodies(self, peer: ETHPeer, - bodies: List[eth.BlockBody]) -> None: + bodies: List[BlockBody]) -> None: self.logger.debug("Got Bodies for %d blocks from %s", len(bodies), peer) loop = asyncio.get_event_loop() iterator = map(make_trie_root_and_nodes, [body.transactions for body in bodies]) @@ -542,6 +562,7 @@ async def _handle_get_block_headers( headers = await self._handler.lookup_headers( header_request['block_number_or_hash'], header_request['max_headers'], header_request['skip'], header_request['reverse']) + self.logger.trace("Replying to %s with %d headers", peer, len(headers)) peer.sub_proto.send_block_headers(headers) @@ -553,31 +574,10 @@ class RegularChainSyncer(FastChainSyncer): """ _exit_on_sync_complete = False - async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command, - msg: protocol._DecodedMsgType) -> None: - peer = cast(ETHPeer, peer) - if isinstance(cmd, eth.BlockHeaders): - self._handle_block_headers(tuple(cast(Tuple[BlockHeader, ...], msg))) - elif isinstance(cmd, eth.BlockBodies): - await self._handle_block_bodies(peer, list(cast(Tuple[eth.BlockBody], msg))) - elif isinstance(cmd, eth.NewBlock): - await self._handle_new_block(peer, cast(Dict[str, Any], msg)) - elif isinstance(cmd, eth.GetBlockHeaders): - await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg)) - elif isinstance(cmd, eth.GetBlockBodies): - # Only serve up to eth.MAX_BODIES_FETCH items in every request. - block_hashes = cast(List[Hash32], msg)[:eth.MAX_BODIES_FETCH] - await self._handler.handle_get_block_bodies(peer, cast(List[Hash32], msg)) - elif isinstance(cmd, eth.GetReceipts): - # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request. - block_hashes = cast(List[Hash32], msg)[:eth.MAX_RECEIPTS_FETCH] - await self._handler.handle_get_receipts(peer, block_hashes) - elif isinstance(cmd, eth.GetNodeData): - # Only serve up to eth.MAX_STATE_FETCH items in every request. - node_hashes = cast(List[Hash32], msg)[:eth.MAX_STATE_FETCH] - await self._handler.handle_get_node_data(peer, node_hashes) - else: - self.logger.debug("%s msg not handled yet, need to be implemented", cmd) + async def _handle_block_receipts( + self, peer: ETHPeer, receipts_by_block: List[List[eth.Receipt]]) -> None: + # When doing a regular sync we never request receipts. + self.logger.warn("Unexpected BlockReceipts msg from %s", peer) async def _process_headers( self, peer: HeaderRequestingPeer, headers: Tuple[BlockHeader, ...]) -> int: @@ -599,7 +599,7 @@ async def _process_headers( transactions: List[BaseTransaction] = [] uncles: List[BlockHeader] = [] else: - body = cast(eth.BlockBody, downloaded_parts[_body_key(header)]) + body = cast(BlockBody, downloaded_parts[_body_key(header)]) tx_class = block_class.get_transaction_class() transactions = [tx_class.from_base_transaction(tx) for tx in body.transactions] @@ -624,6 +624,7 @@ def __init__(self, db: 'AsyncHeaderDB', logger: TraceLogger, token: CancelToken) self.cancel_token = token async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None: + self.logger.trace("%s requested bodies for %d blocks", peer, len(block_hashes)) chaindb = cast('AsyncChainDB', self.db) bodies = [] for block_hash in block_hashes: @@ -636,9 +637,11 @@ async def handle_get_block_bodies(self, peer: ETHPeer, block_hashes: List[Hash32 chaindb.coro_get_block_transactions(header, BaseTransactionFields)) uncles = await self.wait(chaindb.coro_get_block_uncles(header.uncles_hash)) bodies.append(BlockBody(transactions, uncles)) + self.logger.trace("Replying to %s with %d block bodies", peer, len(bodies)) peer.sub_proto.send_block_bodies(bodies) async def handle_get_receipts(self, peer: ETHPeer, block_hashes: List[Hash32]) -> None: + self.logger.trace("%s requested receipts for %d blocks", peer, len(block_hashes)) chaindb = cast('AsyncChainDB', self.db) receipts = [] for block_hash in block_hashes: @@ -650,9 +653,11 @@ async def handle_get_receipts(self, peer: ETHPeer, block_hashes: List[Hash32]) - continue block_receipts = await self.wait(chaindb.coro_get_receipts(header, Receipt)) receipts.append(block_receipts) + self.logger.trace("Replying to %s with receipts for %d blocks", peer, len(receipts)) peer.sub_proto.send_receipts(receipts) async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) -> None: + self.logger.trace("%s requested %d trie nodes", peer, len(node_hashes)) chaindb = cast('AsyncChainDB', self.db) nodes = [] for node_hash in node_hashes: @@ -662,6 +667,7 @@ async def handle_get_node_data(self, peer: ETHPeer, node_hashes: List[Hash32]) - self.logger.debug("%s asked for a trie node we don't have: %s", peer, node_hash) continue nodes.append(node) + self.logger.trace("Replying to %s with %d trie nodes", peer, len(nodes)) peer.sub_proto.send_node_data(nodes) async def lookup_headers(self, block_number_or_hash: Union[int, bytes], max_headers: int, @@ -731,7 +737,7 @@ async def _generate_available_headers( class DownloadedBlockPart(NamedTuple): - part: Union[eth.BlockBody, List[Receipt]] + part: Union[BlockBody, List[Receipt]] unique_key: Union[bytes, Tuple[bytes, bytes]] diff --git a/p2p/state.py b/p2p/state.py index 7ccf435616..978936d06c 100644 --- a/p2p/state.py +++ b/p2p/state.py @@ -129,11 +129,17 @@ async def _handle_msg( elif isinstance(cmd, eth.GetBlockHeaders): await self._handle_get_block_headers(peer, cast(Dict[str, Any], msg)) elif isinstance(cmd, eth.GetBlockBodies): - await self._handler.handle_get_block_bodies(peer, cast(List[Hash32], msg)) + # Only serve up to eth.MAX_BODIES_FETCH items in every request. + block_hashes = cast(List[Hash32], msg)[:eth.MAX_BODIES_FETCH] + await self._handler.handle_get_block_bodies(peer, block_hashes) elif isinstance(cmd, eth.GetReceipts): - await self._handler.handle_get_receipts(peer, cast(List[Hash32], msg)) + # Only serve up to eth.MAX_RECEIPTS_FETCH items in every request. + block_hashes = cast(List[Hash32], msg)[:eth.MAX_RECEIPTS_FETCH] + await self._handler.handle_get_receipts(peer, block_hashes) elif isinstance(cmd, eth.GetNodeData): - await self._handler.handle_get_node_data(peer, cast(List[Hash32], msg)) + # Only serve up to eth.MAX_STATE_FETCH items in every request. + node_hashes = cast(List[Hash32], msg)[:eth.MAX_STATE_FETCH] + await self._handler.handle_get_node_data(peer, node_hashes) else: self.logger.warn("%s not handled during StateSync, must be implemented", cmd) diff --git a/trinity/plugins/builtin/tx_pool/pool.py b/trinity/plugins/builtin/tx_pool/pool.py index 448a1d6222..bed90371c4 100644 --- a/trinity/plugins/builtin/tx_pool/pool.py +++ b/trinity/plugins/builtin/tx_pool/pool.py @@ -73,7 +73,7 @@ async def _run(self) -> None: async def _handle_tx(self, peer: ETHPeer, txs: List[BaseTransactionFields]) -> None: - self.logger.debug('Received transactions from %r: %r', peer, txs) + self.logger.trace('Received transactions from %r: %r', peer, txs) self._add_txs_to_bloom(peer, txs) @@ -87,7 +87,7 @@ async def _handle_tx(self, peer: ETHPeer, txs: List[BaseTransactionFields]) -> N if len(filtered_tx) == 0: continue - self.logger.debug( + self.logger.trace( 'Sending transactions to %r: %r', receiving_peer, filtered_tx From 9183298b759b0f04cd149149590735b9733cc15f Mon Sep 17 00:00:00 2001 From: Guilherme Salgado Date: Tue, 10 Jul 2018 13:56:15 +0100 Subject: [PATCH 2/2] Peer.disconnect() now awaits for cancel() Also disconnect from remotes if we get unexpected NodeData or Receipts msgs during a sync --- p2p/chain.py | 9 ++++++--- p2p/peer.py | 33 +++++++++++++++++---------------- p2p/sharding_peer.py | 2 +- 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/p2p/chain.py b/p2p/chain.py index b0a357c75a..5e2f3fdab8 100644 --- a/p2p/chain.py +++ b/p2p/chain.py @@ -40,6 +40,7 @@ from p2p.cancel_token import CancellableMixin, CancelToken from p2p.constants import MAX_REORG_DEPTH from p2p.exceptions import NoEligiblePeers, OperationCancelled +from p2p.p2p_proto import DisconnectReason from p2p.peer import BasePeer, ETHPeer, LESPeer, PeerPool, PeerPoolSubscriber from p2p.rlp import BlockBody from p2p.service import BaseService @@ -187,7 +188,7 @@ async def _sync(self, peer: HeaderRequestingPeer) -> None: headers = await self._fetch_missing_headers(peer, start_at) except TimeoutError: self.logger.warn("Timeout waiting for header batch from %s, aborting sync", peer) - await peer.cancel() + await peer.disconnect(DisconnectReason.timeout) break if not headers: @@ -509,7 +510,8 @@ async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command, elif isinstance(cmd, eth.NodeData): # When doing a chain sync we never send GetNodeData requests, so peers should not send # us NodeData msgs. - self.logger.warn("Unexpected NodeData msg from %s", peer) + self.logger.warn("Unexpected NodeData msg from %s, disconnecting", peer) + await peer.disconnect(DisconnectReason.bad_protocol) else: self.logger.debug("%s msg not handled yet, need to be implemented", cmd) @@ -577,7 +579,8 @@ class RegularChainSyncer(FastChainSyncer): async def _handle_block_receipts( self, peer: ETHPeer, receipts_by_block: List[List[eth.Receipt]]) -> None: # When doing a regular sync we never request receipts. - self.logger.warn("Unexpected BlockReceipts msg from %s", peer) + self.logger.warn("Unexpected BlockReceipts msg from %s, disconnecting", peer) + await peer.disconnect(DisconnectReason.bad_protocol) async def _process_headers( self, peer: HeaderRequestingPeer, headers: Tuple[BlockHeader, ...]) -> int: diff --git a/p2p/peer.py b/p2p/peer.py index 13783f1452..c352ff9e7d 100644 --- a/p2p/peer.py +++ b/p2p/peer.py @@ -276,7 +276,7 @@ async def do_p2p_handshake(self) -> None: # Peers sometimes send a disconnect msg before they send the initial P2P handshake. raise HandshakeFailure("{} disconnected before completing handshake: {}".format( self, msg['reason_name'])) - self.process_p2p_handshake(cmd, msg) + await self.process_p2p_handshake(cmd, msg) @property async def genesis(self) -> BlockHeader: @@ -393,16 +393,17 @@ def process_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> N else: self.handle_sub_proto_msg(cmd, msg) - def process_p2p_handshake(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None: + async def process_p2p_handshake( + self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None: msg = cast(Dict[str, Any], msg) if not isinstance(cmd, Hello): - self.disconnect(DisconnectReason.bad_protocol) + await self.disconnect(DisconnectReason.bad_protocol) raise HandshakeFailure("Expected a Hello msg, got {}, disconnecting".format(cmd)) remote_capabilities = msg['capabilities'] try: self.sub_proto = self.select_sub_protocol(remote_capabilities) except NoMatchingPeerCapabilities: - self.disconnect(DisconnectReason.useless_peer) + await self.disconnect(DisconnectReason.useless_peer) raise HandshakeFailure( "No matching capabilities between us ({}) and {} ({}), disconnecting".format( self.capabilities, self.remote, remote_capabilities)) @@ -474,9 +475,11 @@ def send(self, header: bytes, body: bytes) -> None: self.logger.trace("Sending msg with cmd_id: %s", cmd_id) self.writer.write(self.encrypt(header, body)) - def disconnect(self, reason: DisconnectReason) -> None: + async def disconnect(self, reason: DisconnectReason) -> None: """Send a disconnect msg to the remote node and stop this Peer. + Also awaits for self.cancel() to ensure any pending tasks are cleaned up. + :param reason: An item from the DisconnectReason enum. """ if not isinstance(reason, DisconnectReason): @@ -485,6 +488,8 @@ def disconnect(self, reason: DisconnectReason) -> None: self.logger.debug("Disconnecting from remote peer; reason: %s", reason.name) self.base_protocol.send_disconnect(reason.value) self.close() + if self.is_running: + await self.cancel() def select_sub_protocol(self, remote_capabilities: List[Tuple[bytes, int]] ) -> protocol.Protocol: @@ -537,18 +542,18 @@ async def send_sub_proto_handshake(self) -> None: async def process_sub_proto_handshake( self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None: if not isinstance(cmd, (les.Status, les.StatusV2)): - self.disconnect(DisconnectReason.subprotocol_error) + await self.disconnect(DisconnectReason.subprotocol_error) raise HandshakeFailure( "Expected a LES Status msg, got {}, disconnecting".format(cmd)) msg = cast(Dict[str, Any], msg) if msg['networkId'] != self.network_id: - self.disconnect(DisconnectReason.useless_peer) + await self.disconnect(DisconnectReason.useless_peer) raise HandshakeFailure( "{} network ({}) does not match ours ({}), disconnecting".format( self, msg['networkId'], self.network_id)) genesis = await self.genesis if msg['genesisHash'] != genesis.hash: - self.disconnect(DisconnectReason.useless_peer) + await self.disconnect(DisconnectReason.useless_peer) raise HandshakeFailure( "{} genesis ({}) does not match ours ({}), disconnecting".format( self, encode_hex(msg['genesisHash']), genesis.hex_hash)) @@ -628,18 +633,18 @@ async def send_sub_proto_handshake(self) -> None: async def process_sub_proto_handshake( self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None: if not isinstance(cmd, eth.Status): - self.disconnect(DisconnectReason.subprotocol_error) + await self.disconnect(DisconnectReason.subprotocol_error) raise HandshakeFailure( "Expected a ETH Status msg, got {}, disconnecting".format(cmd)) msg = cast(Dict[str, Any], msg) if msg['network_id'] != self.network_id: - self.disconnect(DisconnectReason.useless_peer) + await self.disconnect(DisconnectReason.useless_peer) raise HandshakeFailure( "{} network ({}) does not match ours ({}), disconnecting".format( self, msg['network_id'], self.network_id)) genesis = await self.genesis if msg['genesis_hash'] != genesis.hash: - self.disconnect(DisconnectReason.useless_peer) + await self.disconnect(DisconnectReason.useless_peer) raise HandshakeFailure( "{} genesis ({}) does not match ours ({}), disconnecting".format( self, encode_hex(msg['genesis_hash']), genesis.hex_hash)) @@ -770,12 +775,8 @@ async def _run(self) -> None: async def stop_all_peers(self) -> None: self.logger.info("Stopping all peers ...") - peers = self.connected_nodes.values() - for peer in peers: - peer.disconnect(DisconnectReason.client_quitting) - - await asyncio.gather(*[peer.cancel() for peer in peers]) + await asyncio.gather(*[peer.disconnect(DisconnectReason.client_quitting) for peer in peers]) async def _cleanup(self) -> None: await self.stop_all_peers() diff --git a/p2p/sharding_peer.py b/p2p/sharding_peer.py index cfe8b15782..1f4aa5939f 100644 --- a/p2p/sharding_peer.py +++ b/p2p/sharding_peer.py @@ -67,7 +67,7 @@ async def process_sub_proto_handshake(self, cmd: Command, msg: protocol._DecodedMsgType) -> None: if not isinstance(cmd, Status): - self.disconnect(DisconnectReason.subprotocol_error) + await self.disconnect(DisconnectReason.subprotocol_error) raise HandshakeFailure("Expected status msg, got {}, disconnecting".format(cmd)) async def _get_headers_at_chain_split(