Skip to content

Commit

Permalink
refactor(p2p): implement P2PDependencies class
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Oct 11, 2024
1 parent 3c767f7 commit d972adf
Show file tree
Hide file tree
Showing 25 changed files with 392 additions and 262 deletions.
34 changes: 16 additions & 18 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from hathor.manager import HathorManager
from hathor.mining.cpu_mining_service import CpuMiningService
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.p2p_dependencies import P2PDependencies
from hathor.p2p.peer import PrivatePeer
from hathor.pubsub import PubSubManager
from hathor.reactor import ReactorProtocol as Reactor
Expand Down Expand Up @@ -64,12 +65,10 @@ class SyncSupportLevel(IntEnum):
@classmethod
def add_factories(
cls,
settings: HathorSettingsType,
p2p_manager: ConnectionsManager,
dependencies: P2PDependencies,
sync_v1_support: 'SyncSupportLevel',
sync_v2_support: 'SyncSupportLevel',
vertex_parser: VertexParser,
vertex_handler: VertexHandler,
) -> None:
"""Adds the sync factory to the manager according to the support level."""
from hathor.p2p.sync_v1.factory import SyncV11Factory
Expand All @@ -78,18 +77,12 @@ def add_factories(

# sync-v1 support:
if sync_v1_support > cls.UNAVAILABLE:
p2p_manager.add_sync_factory(SyncVersion.V1_1, SyncV11Factory(p2p_manager, vertex_parser=vertex_parser))
p2p_manager.add_sync_factory(SyncVersion.V1_1, SyncV11Factory(dependencies))
if sync_v1_support is cls.ENABLED:
p2p_manager.enable_sync_version(SyncVersion.V1_1)
# sync-v2 support:
if sync_v2_support > cls.UNAVAILABLE:
sync_v2_factory = SyncV2Factory(
settings,
p2p_manager,
vertex_parser=vertex_parser,
vertex_handler=vertex_handler,
)
p2p_manager.add_sync_factory(SyncVersion.V2, sync_v2_factory)
p2p_manager.add_sync_factory(SyncVersion.V2, SyncV2Factory(dependencies))
if sync_v2_support is cls.ENABLED:
p2p_manager.enable_sync_version(SyncVersion.V2)

Expand Down Expand Up @@ -415,25 +408,30 @@ def _get_or_create_p2p_manager(self) -> ConnectionsManager:
return self._p2p_manager

enable_ssl = True
reactor = self._get_reactor()
my_peer = self._get_peer()

self._p2p_manager = ConnectionsManager(
dependencies = P2PDependencies(
reactor=self._get_reactor(),
settings=self._get_or_create_settings(),
reactor=reactor,
my_peer=my_peer,
vertex_parser=self._get_or_create_vertex_parser(),
tx_storage=self._get_or_create_tx_storage(),
vertex_handler=self._get_or_create_vertex_handler(),
verification_service=self._get_or_create_verification_service(),
pubsub=self._get_or_create_pubsub(),
)

self._p2p_manager = ConnectionsManager(
dependencies=dependencies,
my_peer=my_peer,
ssl=enable_ssl,
whitelist_only=False,
rng=self._rng,
)
SyncSupportLevel.add_factories(
self._get_or_create_settings(),
self._p2p_manager,
dependencies,
self._sync_v1_support,
self._sync_v2_support,
self._get_or_create_vertex_parser(),
self._get_or_create_vertex_handler(),
)
return self._p2p_manager

Expand Down
33 changes: 20 additions & 13 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from hathor.mining.cpu_mining_service import CpuMiningService
from hathor.p2p.entrypoint import Entrypoint
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.p2p_dependencies import P2PDependencies
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.utils import discover_hostname, get_genesis_short_hash
from hathor.pubsub import PubSubManager
Expand Down Expand Up @@ -318,16 +319,6 @@ def create_manager(self, reactor: Reactor) -> HathorManager:

cpu_mining_service = CpuMiningService()

p2p_manager = ConnectionsManager(
settings=settings,
reactor=reactor,
my_peer=peer,
pubsub=pubsub,
ssl=True,
whitelist_only=False,
rng=Random(),
)

vertex_handler = VertexHandler(
reactor=reactor,
settings=settings,
Expand All @@ -340,13 +331,29 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
log_vertex_bytes=self._args.log_vertex_bytes,
)

p2p_dependencies = P2PDependencies(
reactor=reactor,
settings=settings,
vertex_parser=vertex_parser,
tx_storage=tx_storage,
vertex_handler=vertex_handler,
verification_service=verification_service,
pubsub=pubsub,
)

p2p_manager = ConnectionsManager(
dependencies=p2p_dependencies,
my_peer=peer,
ssl=True,
whitelist_only=False,
rng=Random(),
)

SyncSupportLevel.add_factories(
settings,
p2p_manager,
p2p_dependencies,
sync_v1_support,
sync_v2_support,
vertex_parser,
vertex_handler,
)

from hathor.consensus.poa import PoaBlockProducer, PoaSignerFile
Expand Down
8 changes: 4 additions & 4 deletions hathor/p2p/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from twisted.internet import protocol
from twisted.internet.interfaces import IAddress

from hathor.conf.settings import HathorSettings
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.p2p_dependencies import P2PDependencies
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.protocol import HathorLineReceiver

Expand All @@ -31,22 +31,22 @@ def __init__(
my_peer: PrivatePeer,
p2p_manager: ConnectionsManager,
*,
settings: HathorSettings,
dependencies: P2PDependencies,
use_ssl: bool,
):
super().__init__()
self._settings = settings
self.my_peer = my_peer
self.p2p_manager = p2p_manager
self.dependencies = dependencies
self.use_ssl = use_ssl

def buildProtocol(self, addr: IAddress) -> HathorLineReceiver:
p = HathorLineReceiver(
my_peer=self.my_peer,
p2p_manager=self.p2p_manager,
dependencies=self.dependencies,
use_ssl=self.use_ssl,
inbound=self.inbound,
settings=self._settings
)
p.factory = self
return p
Expand Down
43 changes: 22 additions & 21 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
from twisted.python.failure import Failure
from twisted.web.client import Agent

from hathor.conf.settings import HathorSettings
from hathor.p2p.entrypoint import Entrypoint
from hathor.p2p.netfilter.factory import NetfilterFactory
from hathor.p2p.p2p_dependencies import P2PDependencies
from hathor.p2p.peer import PrivatePeer, PublicPeer, UnverifiedPeer
from hathor.p2p.peer_discovery import PeerDiscovery
from hathor.p2p.peer_id import PeerId
Expand All @@ -37,8 +37,7 @@
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.sync_version import SyncVersion
from hathor.p2p.utils import parse_whitelist
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.reactor import ReactorProtocol as Reactor
from hathor.pubsub import HathorEvents
from hathor.transaction import BaseTransaction
from hathor.util import Random

Expand Down Expand Up @@ -93,24 +92,23 @@ class GlobalRateLimiter:

def __init__(
self,
settings: HathorSettings,
reactor: Reactor,
dependencies: P2PDependencies,
my_peer: PrivatePeer,
pubsub: PubSubManager,
ssl: bool,
rng: Random,
whitelist_only: bool,
) -> None:
self.log = logger.new()
self._settings = settings
self.dependencies = dependencies
self._settings = dependencies.settings
self.rng = rng
self.manager = None

self.MAX_ENABLED_SYNC = settings.MAX_ENABLED_SYNC
self.SYNC_UPDATE_INTERVAL = settings.SYNC_UPDATE_INTERVAL
self.PEER_DISCOVERY_INTERVAL = settings.PEER_DISCOVERY_INTERVAL
self.MAX_ENABLED_SYNC = self._settings.MAX_ENABLED_SYNC
self.SYNC_UPDATE_INTERVAL = self._settings.SYNC_UPDATE_INTERVAL
self.PEER_DISCOVERY_INTERVAL = self._settings.PEER_DISCOVERY_INTERVAL

self.reactor = reactor
self.reactor = dependencies.reactor
self.my_peer = my_peer

# List of address descriptions to listen for new connections (eg: [tcp:8000])
Expand All @@ -129,10 +127,16 @@ def __init__(
from hathor.p2p.factory import HathorClientFactory, HathorServerFactory
self.use_ssl = ssl
self.server_factory = HathorServerFactory(
self.my_peer, p2p_manager=self, use_ssl=self.use_ssl, settings=self._settings
my_peer=self.my_peer,
p2p_manager=self,
dependencies=dependencies,
use_ssl=self.use_ssl,
)
self.client_factory = HathorClientFactory(
self.my_peer, p2p_manager=self, use_ssl=self.use_ssl, settings=self._settings
my_peer=self.my_peer,
p2p_manager=self,
dependencies=dependencies,
use_ssl=self.use_ssl,
)

# Global maximum number of connections.
Expand Down Expand Up @@ -184,9 +188,6 @@ def __init__(
self.wl_reconnect = LoopingCall(self.update_whitelist)
self.wl_reconnect.clock = self.reactor

# Pubsub object to publish events
self.pubsub = pubsub

# Parameter to explicitly enable whitelist-only mode, when False it will still check the whitelist for sync-v1
self.whitelist_only = whitelist_only

Expand Down Expand Up @@ -373,7 +374,7 @@ def on_connection_failure(self, failure: Failure, peer: Optional[UnverifiedPeer
self.log.warn('connection failure', entrypoint=entrypoint, failure=failure.getErrorMessage())
self.connecting_peers.pop(endpoint)

self.pubsub.publish(
self.dependencies.publish(
HathorEvents.NETWORK_PEER_CONNECTION_FAILED,
peer=peer,
peers_count=self._get_peers_count()
Expand All @@ -388,7 +389,7 @@ def on_peer_connect(self, protocol: HathorProtocol) -> None:
self.connections.add(protocol)
self.handshaking_peers.add(protocol)

self.pubsub.publish(
self.dependencies.publish(
HathorEvents.NETWORK_PEER_CONNECTED,
protocol=protocol,
peers_count=self._get_peers_count()
Expand All @@ -405,7 +406,7 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:

# we emit the event even if it's a duplicate peer as a matching
# NETWORK_PEER_DISCONNECTED will be emitted regardless
self.pubsub.publish(
self.dependencies.publish(
HathorEvents.NETWORK_PEER_READY,
protocol=protocol,
peers_count=self._get_peers_count()
Expand Down Expand Up @@ -459,7 +460,7 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
# chance it can happen if both connections start at the same time and none of them has
# reached READY state while the other is on PEER_ID state
self.connected_peers[protocol.peer.id] = existing_protocol
self.pubsub.publish(
self.dependencies.publish(
HathorEvents.NETWORK_PEER_DISCONNECTED,
protocol=protocol,
peers_count=self._get_peers_count()
Expand Down Expand Up @@ -653,7 +654,7 @@ def connect_to(
deferred.addCallback(self._connect_to_callback, peer, endpoint, entrypoint) # type: ignore
deferred.addErrback(self.on_connection_failure, peer, endpoint) # type: ignore
self.log.info('connect to', entrypoint=str(entrypoint), peer=str(peer))
self.pubsub.publish(
self.dependencies.publish(
HathorEvents.NETWORK_PEER_CONNECTING,
peer=peer,
peers_count=self._get_peers_count()
Expand Down
Loading

0 comments on commit d972adf

Please sign in to comment.