Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Call p2pservice.connect_network() once on start #4094

Merged
merged 1 commit into from
Apr 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions golem/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,6 @@ def connect(ports):

listener = ClientTaskComputerEventListener(self)
self.task_server.task_computer.register_listener(listener)
self.p2pservice.connect_to_network()

if self.monitor:
self.diag_service.register(self.p2pservice,
Expand All @@ -518,8 +517,6 @@ def terminate(*exceptions):
gatherResults([p2p, task], consumeErrors=True).addCallbacks(connect,
terminate)

self.resume()

logger.info("Starting p2p server ...")
self.p2pservice.task_server = self.task_server
self.p2pservice.set_resource_server(self.resource_server)
Expand All @@ -530,6 +527,8 @@ def terminate(*exceptions):
self.task_server.start_accepting(listening_established=task.callback,
listening_failure=task.errback)

self.resume()

def _restore_locks(self) -> None:
assert self.task_server is not None
tm = self.task_server.task_manager
Expand Down Expand Up @@ -582,9 +581,11 @@ def pause(self):
service.stop()

if self.p2pservice:
logger.debug("Pausing p2pservice")
self.p2pservice.pause()
self.p2pservice.disconnect()
if self.task_server:
logger.debug("Pausing task_server")
yield self.task_server.pause()
self.task_server.disconnect()
self.task_server.task_computer.quit()
Expand Down
17 changes: 12 additions & 5 deletions golem/network/p2p/p2pservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,21 +142,25 @@ def _listening_established(self, port):

def connect_to_network(self):
# pylint: disable=singleton-comparison
logger.debug("Connecting to seeds")
self.connect_to_seeds()
if not self.connect_to_known_hosts:
return

logger.debug("Connecting to known hosts")

for host in KnownHosts.select() \
.where(KnownHosts.is_seed == False)\
.limit(self.config_desc.opt_peer_num): # noqa

ip_address = host.ip_address
port = host.port

logger.debug("Connecting to {}:{}".format(ip_address, port))
logger.debug("Connecting to %s:%s ...", ip_address, port)
try:
socket_address = tcpnetwork.SocketAddress(ip_address, port)
self.connect(socket_address)
logger.debug("Connected!")
except Exception as exc:
logger.error("Cannot connect to host {}:{}: {}"
.format(ip_address, port, exc))
Expand All @@ -168,13 +172,15 @@ def connect_to_seeds(self):

for _ in range(len(self.seeds)):
ip_address, port = self._get_next_random_seed()
logger.debug("Connecting to %s:%s ...", ip_address, port)
try:
socket_address = tcpnetwork.SocketAddress(ip_address, port)
self.connect(socket_address)
except Exception as exc:
logger.error("Cannot connect to seed %s:%s: %s",
ip_address, port, exc)
continue
logger.debug("Connected!")
break # connected

def connect(self, socket_address):
Expand Down Expand Up @@ -220,7 +226,7 @@ def add_known_peer(self, node, ip_address, port, metadata=None):

except Exception as err:
logger.error(
"Couldn't add known peer %r:%r : %s",
"Couldn't add known peer %s:%s - %s",
ip_address,
port,
err
Expand Down Expand Up @@ -329,8 +335,9 @@ def add_peer(self, peer: PeerSession):
"""
key_id = peer.key_id
logger.info(
"Adding peer %s, key id difficulty: %r",
"Adding peer. node=%s, address=%s:%s, key_difficulty=%r",
node_info_str(peer.node_name, key_id),
peer.address, peer.port,
self.keys_auth.get_difficulty(key_id)
)
with self._peer_lock:
Expand Down Expand Up @@ -371,10 +378,10 @@ def try_to_add_peer(self, peer_info: dt_p2p.Peer, force=False):
return

logger.info(
"add peer to incoming. address=%r:%r, node=%s",
"Adding peer to incoming. node=%s, address=%s:%s",
node_info_str(node_name, key_id),
peer_info["address"],
peer_info["port"],
node_info_str(node_name, key_id)
)

self.incoming_peers[key_id] = {
Expand Down
10 changes: 4 additions & 6 deletions golem/network/p2p/peersession.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def start(self):
if self.conn_type is None:
raise Exception('Connection type (client/server) unknown')
logger.info(
"Starting peer session %r:%r",
"Starting peer session. address=%s:%r",
self.address,
self.port
)
Expand Down Expand Up @@ -277,7 +277,7 @@ def _react_to_hello(self, msg):
if proto_id != variables.PROTOCOL_CONST.ID:
logger.info(
"P2P protocol version mismatch %r vs %r (local)"
" for node %r:%r",
" for node %s:%r",
proto_id,
variables.PROTOCOL_CONST.ID,
self.address,
Expand Down Expand Up @@ -506,9 +506,6 @@ def __set_verified_conn(self):
self.verified = True

if self.p2p_service.enough_peers():
logger_msg = "TOO MANY PEERS, DROPPING CONNECTION: {} {}: {}" \
.format(self.node_name, self.address, self.port)
logger.info(logger_msg)
self._send_peers(node_key_id=self.p2p_service.get_key_id())
self.disconnect(message.base.Disconnect.REASON.TooManyPeers)

Expand All @@ -526,11 +523,12 @@ def __set_verified_conn(self):
if p:
if p != self and p.conn.opened:
logger.warning(
"PEER DUPLICATED: %r %r : %r AND %r : %r",
"Peer duplicated. new=%r (%s:%r), old=%r (%s:%r)",
p.node_name,
p.address,
p.port,
self.node_name,
self.address,
self.port
)
self.disconnect(message.base.Disconnect.REASON.DuplicatePeers)
Expand Down
12 changes: 4 additions & 8 deletions golem/network/transport/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,8 @@ def disconnect(self, reason: message.base.Disconnect.REASON):
""" Send "disconnect" message to the peer and drop the connection.
:param string reason: Reason for disconnecting.
"""
logger.info(
"Disconnecting %r:%r reason: %r",
self.address,
self.port,
reason,
)
logger.info("Sending disconnect message. reason=%s, address=%s:%r",
reason.name, self.address, self.port,)
if self.conn.opened:
self._send_disconnect(reason)
self.dropped()
Expand Down Expand Up @@ -150,8 +146,8 @@ def _check_msg(self, msg):
return True

def _react_to_disconnect(self, msg):
logger.info("Disconnect reason: %r", msg.reason)
logger.info("Closing %s:%s", self.address, self.port)
logger.info("Received disconnect message. reason=%s, address=%s:%r",
msg.reason.name, self.address, self.port)
self.dropped()


Expand Down