Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Commit

Permalink
Correctly handle disconnections from maxed out peers
Browse files Browse the repository at this point in the history
A maxed out peer will disconnect during the P2P handshake by
sending us a disconnect msg with the TOO_MANY_PEERS reason.  We must
detect that and wait a while before attempting to connect again.

Closes #1167
  • Loading branch information
gsalgado committed Nov 21, 2019
1 parent fae0be8 commit 42c97f2
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 9 deletions.
1 change: 1 addition & 0 deletions p2p/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ class TransportAPI(ABC):
session: SessionAPI
remote: NodeAPI
read_state: TransportState
logger: ExtendedDebugLogger

@property
@abstractmethod
Expand Down
3 changes: 3 additions & 0 deletions p2p/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ class BaseCommand(CommandAPI[TCommandPayload]):
def __init__(self, payload: TCommandPayload) -> None:
self.payload = payload

def __repr__(self) -> str:
return f"{self.__class__}(payload={self.payload})"

def encode(self, cmd_id: int, snappy_support: bool) -> MessageAPI:
raw_payload_data = self.serialization_codec.encode(self.payload)

Expand Down
4 changes: 2 additions & 2 deletions p2p/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ class HandshakeFailure(BaseP2PError):
pass


class TooManyPeersFailure(HandshakeFailure):
class HandshakeFailureTooManyPeers(HandshakeFailure):
"""
The remote disconnected from us because it has too many peers
The remote disconnected from us during a handshake because it has too many peers
"""
pass

Expand Down
9 changes: 9 additions & 0 deletions p2p/handshake.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
)
from p2p.connection import Connection
from p2p.constants import DEVP2P_V5
from p2p.disconnect import DisconnectReason
from p2p.exceptions import (
HandshakeFailure,
HandshakeFailureTooManyPeers,
NoMatchingPeerCapabilities,
)
from p2p.multiplexer import (
Expand All @@ -40,6 +42,7 @@
)
from p2p.p2p_proto import (
DevP2PReceipt,
Disconnect,
Hello,
HelloPayload,
BaseP2PProtocol,
Expand Down Expand Up @@ -127,6 +130,10 @@ async def _do_p2p_handshake(transport: TransportAPI,
# The base `p2p` protocol handshake directly streams the messages as it has
# strict requirements about receiving the `Hello` message first.
async for _, cmd in stream_transport_messages(transport, base_protocol, token=token):
if isinstance(cmd, Disconnect):
if cmd.payload == DisconnectReason.TOO_MANY_PEERS:
raise HandshakeFailureTooManyPeers(f"Peer disconnected because it is already full")

if not isinstance(cmd, Hello):
raise HandshakeFailure(
f"First message across the DevP2P connection must be a Hello "
Expand Down Expand Up @@ -304,6 +311,7 @@ async def dial_out(remote: NodeAPI,
token,
)

transport.logger.debug2("Initiating p2p handshake with %s", remote)
try:
multiplexer, devp2p_receipt, protocol_receipts = await negotiate_protocol_handshakes(
transport=transport,
Expand All @@ -320,6 +328,7 @@ async def dial_out(remote: NodeAPI,
await asyncio.sleep(0)
raise

transport.logger.debug2("Completed p2p handshake with %s", remote)
connection = Connection(
multiplexer=multiplexer,
devp2p_receipt=devp2p_receipt,
Expand Down
4 changes: 2 additions & 2 deletions p2p/tracking/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from p2p.exceptions import (
BaseP2PError,
HandshakeFailure,
TooManyPeersFailure,
HandshakeFailureTooManyPeers,
)


Expand All @@ -23,7 +23,7 @@ def register_error(exception: Type[BaseP2PError], timeout_seconds: int) -> None:


register_error(HandshakeFailure, 10) # 10 seconds
register_error(TooManyPeersFailure, 60) # one minute
register_error(HandshakeFailureTooManyPeers, 60) # one minute


def get_timeout_for_failure(failure: BaseP2PError) -> int:
Expand Down
6 changes: 4 additions & 2 deletions p2p/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async def connect(cls,
remote: NodeAPI,
private_key: datatypes.PrivateKey,
token: CancelToken) -> TransportAPI:
"""Perform the auth and P2P handshakes with the given remote.
"""Perform the auth handshake with the given remote.
Return an instance of the given peer_class (must be a subclass of
BasePeer) connected to that remote in case both handshakes are
Expand All @@ -122,6 +122,7 @@ async def connect(cls,
handshake or if none of the sub-protocols supported by us is also
supported by the remote.
"""
cls.logger.debug2("Initiating auth handshake with %s", remote)
try:
(aes_secret,
mac_secret,
Expand All @@ -133,6 +134,7 @@ async def connect(cls,
except (ConnectionRefusedError, OSError) as e:
raise UnreachablePeer(f"Can't reach {remote!r}") from e

cls.logger.debug2("Completed auth handshake with %s", remote)
return cls(
remote=remote,
private_key=private_key,
Expand Down Expand Up @@ -203,7 +205,7 @@ async def receive_connection(cls,
ip, socket, *_ = peername
remote_address = Address(ip, socket)

cls.logger.debug("Receiving handshake from %s", remote_address)
cls.logger.debug("Receiving auth handshake from %s", remote_address)

initiator_remote = Node(initiator_pubkey, remote_address)

Expand Down
12 changes: 9 additions & 3 deletions scripts/peer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
)

from eth_typing import BlockNumber
from eth_utils import DEBUG2_LEVEL_NUM

from eth.chains.mainnet import MainnetChain, MAINNET_GENESIS_HEADER, MAINNET_VM_CONFIGURATION
from eth.chains.ropsten import RopstenChain, ROPSTEN_GENESIS_HEADER, ROPSTEN_VM_CONFIGURATION
from eth.db.atomic import AtomicDB
from eth.db.backends.memory import MemoryDB
from eth_utils import DEBUG2_LEVEL_NUM

from p2p import ecies
from p2p.constants import DEVP2P_V5
Expand All @@ -35,14 +36,19 @@


def _main() -> None:
logging.basicConfig(level=DEBUG2_LEVEL_NUM, format='%(asctime)s %(levelname)s: %(message)s')

parser = argparse.ArgumentParser()
parser.add_argument('-enode', type=str, help="The enode we should connect to", required=True)
parser.add_argument('-mainnet', action='store_true')
parser.add_argument('-light', action='store_true', help="Connect as a light node")
parser.add_argument('-debug', action="store_true")
args = parser.parse_args()

log_level = logging.INFO
if args.debug:
log_level = DEBUG2_LEVEL_NUM
logging.basicConfig(
level=log_level, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%H:%M:%S')

peer_class: Union[Type[ETHPeer], Type[LESPeer]]
pool_class: Union[Type[ETHPeerPool], Type[LESPeerPool]]

Expand Down

0 comments on commit 42c97f2

Please sign in to comment.