Skip to content

Commit

Permalink
Merge pull request #1137 from pipermerriam/piper/filters-for-peer-poo…
Browse files Browse the repository at this point in the history
…l-subscribers

PeerSubscribers now specify what messages they are interested in
  • Loading branch information
pipermerriam authored Aug 3, 2018
2 parents 888f105 + 0fb8a71 commit bf1d3c2
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 47 deletions.
54 changes: 49 additions & 5 deletions p2p/peer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import collections
import contextlib
import datetime
import functools
import logging
import operator
import random
Expand All @@ -20,6 +21,7 @@
Dict,
Iterator,
List,
Set,
TYPE_CHECKING,
Tuple,
Type,
Expand Down Expand Up @@ -381,14 +383,23 @@ def handle_p2p_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -
raise UnexpectedMessage("Unexpected msg: {} ({})".format(cmd, msg))

def handle_sub_proto_msg(self, cmd: protocol.Command, msg: protocol._DecodedMsgType) -> None:
cmd_type = type(cmd)

if self._subscribers:
for subscriber in self._subscribers:
was_added = tuple(
subscriber.add_msg((self, cmd, msg))
for subscriber
in self._subscribers
)
if not any(was_added):
self.logger.warn(
"Peer %s has no subscribers for msg type %s",
self,
cmd_type.__name__,
)
else:
self.logger.warn("Peer %s has no subscribers, discarding %s msg", self, cmd)

cmd_type = type(cmd)

if cmd_type in self.pending_requests:
request, future = self.pending_requests[cmd_type]
try:
Expand Down Expand Up @@ -547,10 +558,32 @@ def __hash__(self) -> int:
class PeerSubscriber(ABC):
_msg_queue: 'asyncio.Queue[PEER_MSG_TYPE]' = None

@property
@abstractmethod
def subscription_msg_types(self) -> Set[Type[protocol.Command]]:
"""
The `p2p.protocol.Command` types that this class subscribes to. Any
command which is not in this set will not be passed to this subscriber.
The base command class `p2p.protocol.Command` can be used to enable
**all** command types.
.. note: This API only applies to sub-protocol commands. Base protocol
commands are handled exclusively at the peer level and cannot be
consumed with this API.
"""
pass

@functools.lru_cache(maxsize=64)
def is_subscription_command(self, cmd_type: Type[protocol.Command]) -> bool:
return bool(self.subscription_msg_types.intersection(
{cmd_type, protocol.Command}
))

@property
@abstractmethod
def msg_queue_maxsize(self) -> int:
raise NotImplementedError("Must be implemented by subclasses")
pass

def register_peer(self, peer: BasePeer) -> None:
"""
Expand All @@ -577,16 +610,27 @@ def msg_queue(self) -> 'asyncio.Queue[PEER_MSG_TYPE]':
def queue_size(self) -> int:
return self.msg_queue.qsize()

def add_msg(self, msg: 'PEER_MSG_TYPE') -> None:
def add_msg(self, msg: 'PEER_MSG_TYPE') -> bool:
peer, cmd, _ = msg

if not self.is_subscription_command(type(cmd)):
self.logger.trace( # type: ignore
"Discarding %s msg from %s; not subscribed to msg type; "
"subscriptions: %s",
cmd, peer, self.subscription_msg_types,
)
return False

try:
self.logger.trace( # type: ignore
"Adding %s msg from %s to queue; queue_size=%d", cmd, peer, self.queue_size)
self.msg_queue.put_nowait(msg)
return True
except asyncio.queues.QueueFull:
self.logger.warn( # type: ignore
"%s msg queue is full; discarding %s msg from %s",
self.__class__.__name__, cmd, peer)
return False

@contextlib.contextmanager
def subscribe(self, peer_pool: 'PeerPool') -> Iterator[None]:
Expand Down
57 changes: 57 additions & 0 deletions tests/p2p/test_peer_subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import asyncio
import logging

import pytest

from p2p.peer import PeerSubscriber
from p2p.protocol import Command

from trinity.protocol.eth.peer import ETHPeer
from trinity.protocol.eth.commands import GetBlockHeaders

from tests.trinity.core.peer_helpers import (
get_directly_linked_peers,
)


logger = logging.getLogger('testing.p2p.PeerSubscriber')


class HeadersSubscriber(PeerSubscriber):
logger = logger
msg_queue_maxsize = 10
subscription_msg_types = {GetBlockHeaders}


class AllSubscriber(PeerSubscriber):
logger = logger
msg_queue_maxsize = 10
subscription_msg_types = {Command}


@pytest.mark.asyncio
async def test_peer_subscriber_filters_messages(request, event_loop):
peer, remote = await get_directly_linked_peers(
request,
event_loop,
peer1_class=ETHPeer,
peer2_class=ETHPeer,
)

header_subscriber = HeadersSubscriber()
all_subscriber = AllSubscriber()

peer.add_subscriber(header_subscriber)
peer.add_subscriber(all_subscriber)

remote.sub_proto.send_get_node_data([b'\x00' * 32])
remote.sub_proto.send_get_block_headers(0, 1, 0, False)
remote.sub_proto.send_get_node_data([b'\x00' * 32])
remote.sub_proto.send_get_block_headers(1, 1, 0, False)
remote.sub_proto.send_get_node_data([b'\x00' * 32])

# yeild to let remote and peer transmit.
await asyncio.sleep(0.01)

assert header_subscriber.queue_size == 2
assert all_subscriber.queue_size == 5
7 changes: 6 additions & 1 deletion tests/trinity/core/peer_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import os
from typing import List
from typing import (
List,
)

from eth_hash.auto import keccak

Expand All @@ -16,6 +18,7 @@
from p2p import kademlia
from p2p.auth import decode_authentication
from p2p.peer import BasePeer, PeerPool, PeerSubscriber
from p2p.protocol import Command


from trinity.protocol.les.peer import LESPeer
Expand Down Expand Up @@ -174,6 +177,8 @@ async def _run(self) -> None:
class SamplePeerSubscriber(PeerSubscriber):
logger = TraceLogger("")

subscription_msg_types = {Command}

@property
def msg_queue_maxsize(self) -> int:
return 100
19 changes: 11 additions & 8 deletions trinity/plugins/builtin/tx_pool/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
cast,
Callable,
Iterable,
List
List,
Set,
Type,
)
import uuid

Expand All @@ -21,6 +23,7 @@
PeerPool,
PeerSubscriber,
)
from p2p.protocol import Command
from p2p.service import (
BaseService
)
Expand Down Expand Up @@ -59,12 +62,12 @@ def __init__(self,
self._bloom = BloomFilter(max_elements=1000000)
self._bloom_salt = str(uuid.uuid4())

@property
def msg_queue_maxsize(self) -> int:
# This is a rather arbitrary value, but when the sync is operating normally we never see
# the msg queue grow past a few hundred items, so this should be a reasonable limit for
# now.
return 2000
subscription_msg_types: Set[Type[Command]] = {Transactions}

# This is a rather arbitrary value, but when the sync is operating normally we never see
# the msg queue grow past a few hundred items, so this should be a reasonable limit for
# now.
msg_queue_maxsize: int = 2000

async def _run(self) -> None:
self.logger.info("Running Tx Pool")
Expand All @@ -74,8 +77,8 @@ async def _run(self) -> None:
peer, cmd, msg = await self.wait(
self.msg_queue.get(), token=self.cancel_token)
peer = cast(ETHPeer, peer)
msg = cast(List[BaseTransactionFields], msg)
if isinstance(cmd, Transactions):
msg = cast(List[BaseTransactionFields], msg)
await self._handle_tx(peer, msg)

async def _handle_tx(self, peer: ETHPeer, txs: List[BaseTransactionFields]) -> None:
Expand Down
42 changes: 32 additions & 10 deletions trinity/sync/full/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
Dict,
List,
NamedTuple,
Set,
Tuple,
Type,
Union,
cast,
)
Expand All @@ -32,9 +34,15 @@
from p2p.exceptions import NoEligiblePeers
from p2p.p2p_proto import DisconnectReason
from p2p.peer import PeerPool
from p2p.protocol import Command

from trinity.db.chain import AsyncChainDB
from trinity.protocol.eth import commands
from trinity.protocol.eth import (
constants as eth_constants,
)
from trinity.protocol.eth.peer import ETHPeer
from trinity.protocol.eth.requests import HeaderRequest
from trinity.protocol.les.peer import LESPeer
from trinity.rlp.block_body import BlockBody
from trinity.sync.base_chain_syncer import BaseHeaderChainSyncer
Expand Down Expand Up @@ -66,6 +74,24 @@ def __init__(self,
self._downloaded_receipts: asyncio.Queue[Tuple[ETHPeer, List[DownloadedBlockPart]]] = asyncio.Queue() # noqa: E501
self._downloaded_bodies: asyncio.Queue[Tuple[ETHPeer, List[DownloadedBlockPart]]] = asyncio.Queue() # noqa: E501

subscription_msg_types: Set[Type[Command]] = {
commands.BlockBodies,
commands.Receipts,
commands.NewBlock,
commands.GetBlockHeaders,
commands.BlockHeaders,
commands.GetBlockBodies,
commands.GetReceipts,
commands.GetNodeData,
commands.Transactions,
commands.NodeData,
# TODO: all of the following are here to quiet warning logging output
# until the messages are properly handled.
commands.Transactions,
commands.NewBlock,
commands.NewBlockHashes,
}

async def _calculate_td(self, headers: Tuple[BlockHeader, ...]) -> int:
"""Return the score (total difficulty) of the last header in the given list.
Expand Down Expand Up @@ -191,7 +217,6 @@ def _request_block_parts(
target_td: int,
headers: List[BlockHeader],
request_func: Callable[[ETHPeer, List[BlockHeader]], None]) -> int:
from trinity.protocol.eth.peer import ETHPeer # noqa: F811
peers = self.peer_pool.get_peers(target_td)
if not peers:
raise NoEligiblePeers()
Expand Down Expand Up @@ -235,15 +260,14 @@ def request_receipts(self, target_td: int, headers: List[BlockHeader]) -> int:

async def _handle_msg(self, peer: HeaderRequestingPeer, cmd: protocol.Command,
msg: protocol._DecodedMsgType) -> None:
from trinity.protocol.eth.peer import ETHPeer # noqa: F811
from trinity.protocol.eth import commands
from trinity.protocol.eth import (
constants as eth_constants,
)

peer = cast(ETHPeer, peer)

if isinstance(cmd, commands.BlockBodies):
# TODO: stop ignoring these once we have proper handling for these messages.
ignored_commands = (commands.Transactions, commands.NewBlock, commands.NewBlockHashes)

if isinstance(cmd, ignored_commands):
pass
elif isinstance(cmd, commands.BlockBodies):
await self._handle_block_bodies(peer, list(cast(Tuple[BlockBody], msg)))
elif isinstance(cmd, commands.Receipts):
await self._handle_block_receipts(peer, cast(List[List[Receipt]], msg))
Expand Down Expand Up @@ -318,8 +342,6 @@ async def _handle_get_block_headers(
self,
peer: ETHPeer,
query: Dict[str, Any]) -> None:
from trinity.protocol.eth.requests import HeaderRequest # noqa: F811

self.logger.debug("Peer %s made header request: %s", peer, query)
request = HeaderRequest(
query['block_number_or_hash'],
Expand Down
32 changes: 23 additions & 9 deletions trinity/sync/full/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
List,
Set,
Tuple,
Type,
Union,
)

Expand Down Expand Up @@ -88,12 +89,26 @@ def __init__(self,
self._peer_missing_nodes: Dict[ETHPeer, Set[Hash32]] = collections.defaultdict(set)
self._executor = get_asyncio_executor()

@property
def msg_queue_maxsize(self) -> int:
# This is a rather arbitrary value, but when the sync is operating normally we never see
# the msg queue grow past a few hundred items, so this should be a reasonable limit for
# now.
return 2000
# Throughout the whole state sync our chain head is fixed, so it makes sense to ignore
# messages related to new blocks/transactions, but we must handle requests for data from
# other peers or else they will disconnect from us.
subscription_msg_types: Set[Type[Command]] = {
commands.NodeData,
commands.GetBlockHeaders,
commands.GetBlockBodies,
commands.GetReceipts,
commands.GetNodeData,
# TODO: all of the following are here to quiet warning logging output
# until the messages are properly handled.
commands.Transactions,
commands.NewBlock,
commands.NewBlockHashes,
}

# This is a rather arbitrary value, but when the sync is operating normally we never see
# the msg queue grow past a few hundred items, so this should be a reasonable limit for
# now.
msg_queue_maxsize: int = 2000

def deregister_peer(self, peer: BasePeer) -> None:
# Use .pop() with a default value as it's possible we never requested anything to this
Expand Down Expand Up @@ -154,10 +169,9 @@ async def _process_nodes(self, nodes: Iterable[Tuple[Hash32, bytes]]) -> None:

async def _handle_msg(
self, peer: ETHPeer, cmd: Command, msg: _DecodedMsgType) -> None:
# Throughout the whole state sync our chain head is fixed, so it makes sense to ignore
# messages related to new blocks/transactions, but we must handle requests for data from
# other peers or else they will disconnect from us.
# TODO: stop ignoring these once we have proper handling for these messages.
ignored_commands = (commands.Transactions, commands.NewBlock, commands.NewBlockHashes)

if isinstance(cmd, ignored_commands):
pass
elif isinstance(cmd, commands.NodeData):
Expand Down
Loading

0 comments on commit bf1d3c2

Please sign in to comment.