Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
cleanup and typing
Browse files Browse the repository at this point in the history
  • Loading branch information
etam committed Oct 25, 2019
1 parent ec80f77 commit 1e6907b
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 74 deletions.
12 changes: 7 additions & 5 deletions golem/network/p2p/peersession.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
from golem import constants as gconst
from golem.appconfig import SEND_PEERS_NUM
from golem.core import variables
from golem.core.keysauth import KeysAuth
from golem.network.transport.session import BasicSafeSession
from golem.network.transport.tcpnetwork import SafeProtocol

if typing.TYPE_CHECKING:
# pylint: disable=unused-import
from twisted.internet.protocol import Protocol

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -62,12 +65,11 @@ class PeerSession(BasicSafeSession):

ConnectionStateType = SafeProtocol

def __init__(self, conn):
def __init__(self, conn: 'Protocol') -> None:
"""
Create new session
:param Protocol conn: connection protocol implementation that this
session should enhance
:return None:
:param conn: connection protocol implementation that this
session should enhance
"""
BasicSafeSession.__init__(self, conn)
self.p2p_service = self.conn.server
Expand Down
51 changes: 10 additions & 41 deletions golem/network/transport/session.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import abc
import logging
import time
from typing import Optional
from typing import Optional, TYPE_CHECKING

from golem_messages import message

Expand All @@ -10,36 +9,23 @@
from golem.core.variables import UNVERIFIED_CNT
from .network import Session

logger = logging.getLogger(__name__)


class FileSession(Session, metaclass=abc.ABCMeta):
"""Abstract class that represents session interface with additional
operations for receiving files"""
if TYPE_CHECKING:
# pylint: disable=unused-import
from twisted.internet.protocol import Protocol

@abc.abstractmethod
def data_sent(self, extra_data=None):
raise NotImplementedError

@abc.abstractmethod
def full_data_received(self, extra_data=None):
raise NotImplementedError

@abc.abstractmethod
def production_failed(self, extra_data=None):
raise NotImplementedError
logger = logging.getLogger(__name__)


class BasicSession(FileSession):
class BasicSession(Session):
"""Basic session responsible for managing the connection and reacting
to different types of messages.
"""

def __init__(self, conn):
def __init__(self, conn: 'Protocol') -> None:
"""
Create new Session
:param Protocol conn: connection protocol implementation that
this session should enhance.
:param conn: connection protocol implementation that
this session should enhance.
"""
Session.__init__(self)
self.conn = conn
Expand Down Expand Up @@ -116,23 +102,6 @@ def send(self, msg):
self.dropped()
return

def data_sent(self, extra_data=None):
""" All data that should be send in stream mode has been send.
:param dict|None extra_data: additional information that may be needed
"""
if self.conn.producer:
self.conn.producer.close()
self.conn.producer = None

def production_failed(self, extra_data=None):
""" Producer encounter error and stopped sending data in stream mode
:param dict|None extra_data: additional information that may be needed
"""
self.dropped()

def full_data_received(self, extra_data=None):
pass

def _send_disconnect(self, reason: message.base.Disconnect.REASON):
""" :param string reason: reason to disconnect """
if not self._disconnect_sent:
Expand All @@ -159,7 +128,7 @@ class BasicSafeSession(BasicSession):

key_id: Optional[str] = None

def __init__(self, conn):
def __init__(self, conn: 'Protocol') -> None:
super().__init__(conn)
# how many unverified messages can be stored before dropping connection
self.unverified_cnt = UNVERIFIED_CNT
Expand Down
33 changes: 19 additions & 14 deletions golem/network/transport/tcpserver.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
import logging
import time
from typing import Callable, Dict, List, Optional, Set
from typing import Callable, Dict, List, Optional, Set, TYPE_CHECKING

from golem.clientconfigdescriptor import ClientConfigDescriptor
from golem.core.common import node_info_str
from golem.core.types import Kwargs
from golem.core.hostaddress import ip_address_private, ip_network_contains, \
ipv4_networks
from golem.core.variables import MAX_CONNECT_SOCKET_ADDRESSES

from .session import BasicSession
from .tcpnetwork import TCPNetwork, TCPListeningInfo, TCPListenInfo, \
from .tcpnetwork import TCPListeningInfo, TCPListenInfo, \
SocketAddress, TCPConnectInfo

if TYPE_CHECKING:
# pylint: disable=unused-import
from golem.clientconfigdescriptor import ClientConfigDescriptor
from .tcpnetwork import TCPNetwork
from .session import BasicSession

logger = logging.getLogger(__name__)


class TCPServer:
""" Basic tcp server that can start listening on given port """

def __init__(self,
config_desc: ClientConfigDescriptor,
network: TCPNetwork) -> None:
config_desc: 'ClientConfigDescriptor',
network: 'TCPNetwork') -> None:
"""
Create new server
:param config_desc: config descriptor for listening port
Expand All @@ -34,9 +38,9 @@ def __init__(self,
self.use_ipv6 = config_desc.use_ipv6 if config_desc else False
self.ipv4_networks = ipv4_networks()

def change_config(self, config_desc: ClientConfigDescriptor):
""" Change configuration descriptor. If listening port is changed, than stop listening on old port and start
listening on a new one.
def change_config(self, config_desc: 'ClientConfigDescriptor'):
""" Change configuration descriptor. If listening port is changed, then
stop listening on old port and start listening on a new one.
:param config_desc: new config descriptor
"""
self.config_desc = config_desc
Expand Down Expand Up @@ -102,8 +106,8 @@ class PendingConnectionsServer(TCPServer):
if connection attempt is unsuccessful."""

def __init__(self,
config_desc: ClientConfigDescriptor,
network: TCPNetwork) -> None:
config_desc: 'ClientConfigDescriptor',
network: 'TCPNetwork') -> None:
""" Create new server
:param config_desc: config descriptor for listening port
:param network: network that server will use
Expand All @@ -112,7 +116,7 @@ def __init__(self,
# Connections that should be accomplished
self.pending_connections: Dict[str, PendingConnection] = {}
# Sessions a.k.a Peers before handshake
self.pending_sessions: Set[BasicSession] = set()
self.pending_sessions: Set['BasicSession'] = set()
# Reactions for established connections of certain types
self.conn_established_for_type: Dict[int, Callable] = {}
# Reactions for failed connection attempts of certain types
Expand All @@ -134,8 +138,8 @@ def verified_conn(self, conn_id):
"""
self.remove_pending_conn(conn_id)

def remove_pending_conn(self, conn_id):
return self.pending_connections.pop(conn_id, None)
def remove_pending_conn(self, conn_id) -> None:
self.pending_connections.pop(conn_id, None)

def final_conn_failure(self, conn_id):
""" React to the information that all connection attempts failed. Call specific for this connection type
Expand Down Expand Up @@ -264,6 +268,7 @@ def _prepend_address(cls, addresses, address):
addresses.insert(0, addresses.pop(index))

def sync_network(self, timeout=1.0):
session: 'BasicSession'
for session in frozenset(self.pending_sessions):
if (time.time() - session.last_message_time) < timeout:
continue
Expand Down
2 changes: 1 addition & 1 deletion golem/task/server/queue_.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def remove_session_by_node_id(self, node_id):
return
self.remove_session(session)

def remove_session(self, session):
def remove_session(self, session: 'TaskSession'):
session.disconnect(
message.base.Disconnect.REASON.NoMoreMessages,
)
Expand Down
24 changes: 11 additions & 13 deletions golem/task/tasksession.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@
from golem.task.server import helpers as task_server_helpers

if TYPE_CHECKING:
from .requestedtaskmanager import RequestedTaskManager # noqa pylint:disable=unused-import
from .taskcomputer import TaskComputer # noqa pylint:disable=unused-import
from .taskmanager import TaskManager # noqa pylint:disable=unused-import
from .taskserver import TaskServer # noqa pylint:disable=unused-import
from golem.network.concent.client import ConcentClientService # noqa pylint:disable=unused-import
# pylint: disable=unused-import,ungrouped-imports
from twisted.internet.protocol import Protocol

from .requestedtaskmanager import RequestedTaskManager
from .taskcomputer import TaskComputer
from .taskmanager import TaskManager
from .taskserver import TaskServer
from golem.network.concent.client import ConcentClientService

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -107,12 +110,11 @@ class TaskSession(BasicSafeSession, ResourceHandshakeSessionMixin):

handle_attr_error = common.HandleAttributeError(drop_after_attr_error)

def __init__(self, conn):
def __init__(self, conn: 'Protocol') -> None:
"""
Create new Session
:param Protocol conn: connection protocol implementation that this
session should enhance
:return:
:param conn: connection protocol implementation that this
session should enhance
"""
BasicSafeSession.__init__(self, conn)
ResourceHandshakeSessionMixin.__init__(self)
Expand Down Expand Up @@ -227,10 +229,6 @@ def verify_owners(self, msg, my_role) -> bool:
return False
return True

#######################
# FileSession methods #
#######################

def send_hello(self):
""" Send first hello message, that should begin the communication """
self.send(
Expand Down

0 comments on commit 1e6907b

Please sign in to comment.