Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

TaskSession almost stateless #3896

Merged
merged 10 commits into from
Feb 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions apps/core/task/coretask.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@
import logging
import os
import time
from typing import Type, Dict, Any
from typing import (
Any,
Dict,
List,
Type,
)

from ethereum.utils import denoms
from golem_messages import idgenerator
from golem_messages.datastructures import p2p as dt_p2p
from golem_messages.datastructures import tasks as dt_tasks
import golem_messages.message
from golem.verificator.core_verifier import CoreVerifier
from golem.verificator.verifier import SubtaskVerificationState

from apps.core.task.coretaskstate import TaskDefinition, Options
from apps.core.verification_queue import VerificationQueue
Expand All @@ -24,6 +27,8 @@
TaskTypeInfo, AcceptClientVerdict
from golem.task.taskclient import TaskClient
from golem.task.taskstate import SubtaskStatus
from golem.verificator.core_verifier import CoreVerifier
from golem.verificator.verifier import SubtaskVerificationState

logger = logging.getLogger("apps.core")

Expand Down Expand Up @@ -437,6 +442,13 @@ def _mark_subtask_failed(self, subtask_id):
self.counting_nodes[node_id].reject()
self.num_failed_subtasks += 1

def get_finishing_subtasks(self, node_id: str) -> List[dict]:
return [
subtask for subtask in self.subtasks_given
if subtask['status'].is_finishing()
and subtask['node_id'] == node_id
]

def get_resources(self):
return self.task_resources

Expand Down
5 changes: 2 additions & 3 deletions golem/task/server/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,9 @@ def _restore_resources_error(self, task_id, error):
self.task_manager.delete_task(task_id)

def request_resource(self, task_id, subtask_id, resources):
if subtask_id not in self.task_sessions:
logger.error("Cannot map subtask_id %r to session", subtask_id)
if not self.client.resource_server:
logger.error("ResourceManager not ready")
return False

resource_manager = self.client.resource_server.resource_manager
resources = resource_manager.from_wire(resources)

Expand Down
10 changes: 9 additions & 1 deletion golem/task/taskbase.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import abc
import logging
from enum import Enum
from typing import List, Type, Optional
from typing import (
List,
Optional,
Type,
)

import golem_messages
from golem_messages.datastructures import tasks as dt_tasks
Expand Down Expand Up @@ -338,3 +342,7 @@ def should_accept_client(self, node_id):
@abc.abstractmethod
def accept_client(self, node_id):
pass

# pylint: disable=unused-argument, no-self-use
def get_finishing_subtasks(self, node_id: str) -> List[dict]:
return []
46 changes: 20 additions & 26 deletions golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ def __init__(self,
finished_cb=task_finished_cb)
self.task_connections_helper = TaskConnectionsHelper()
self.task_connections_helper.task_server = self
# Remove .task_sessions when Message Queue is implemented
# https://github.com/golemfactory/golem/issues/2223
self.task_sessions = {}
self.task_sessions_incoming = weakref.WeakSet()
self.task_sessions_incoming: weakref.WeakSet = weakref.WeakSet()
self.task_sessions_outgoing: weakref.WeakSet = weakref.WeakSet()

OfferPool.change_interval(self.config_desc.offer_pooling_interval)

Expand Down Expand Up @@ -165,6 +168,12 @@ def __init__(self,
signal='golem.taskmanager'
)

@property
def all_sessions(self):
return frozenset(
self.task_sessions_outgoing | self.task_sessions_incoming,
)

def sync_network(self, timeout=None):
if timeout is None:
timeout = self.last_message_time_threshold
Expand Down Expand Up @@ -361,13 +370,7 @@ def new_connection(self, session):
session.disconnect(message.base.Disconnect.REASON.NoMoreMessages)

def disconnect(self):
task_sessions = dict(self.task_sessions)
sessions_incoming = weakref.WeakSet(self.task_sessions_incoming)

for task_session in list(task_sessions.values()):
task_session.dropped()

for task_session in sessions_incoming:
for task_session in self.all_sessions:
try:
task_session.dropped()
except Exception as exc:
Expand Down Expand Up @@ -791,7 +794,6 @@ def __connection_for_task_request_established(
estimated_performance, price, max_resource_size, max_memory_size):
self.new_session_prepare(
session=session,
subtask_id=task_id,
key_id=key_id,
conn_id=conn_id,
)
Expand Down Expand Up @@ -824,7 +826,6 @@ def __connection_for_task_result_established(self, session, conn_id,
waiting_task_result):
self.new_session_prepare(
session=session,
subtask_id=waiting_task_result.subtask_id,
key_id=waiting_task_result.owner.key,
conn_id=conn_id,
)
Expand Down Expand Up @@ -857,7 +858,6 @@ def __connection_for_task_failure_established(self, session, conn_id,
key_id, subtask_id, err_msg):
self.new_session_prepare(
session=session,
subtask_id=subtask_id,
key_id=key_id,
conn_id=conn_id,
)
Expand Down Expand Up @@ -887,7 +887,6 @@ def __connection_for_start_session_established(
ans_conn_id):
self.new_session_prepare(
session=session,
subtask_id=None,
key_id=key_id,
conn_id=conn_id,
)
Expand Down Expand Up @@ -945,17 +944,16 @@ def __connection_for_start_session_final_failure(

def new_session_prepare(self,
session: TaskSession,
subtask_id: str,
key_id: str,
conn_id: str):
self.remove_forwarded_session_request(key_id)
session.task_id = subtask_id
session.key_id = key_id
session.conn_id = conn_id
self._mark_connected(conn_id, session.address, session.port)
self.task_sessions[subtask_id] = session
self.task_sessions_outgoing.add(session)

def noop(self, *args, **kwargs):
@classmethod
def noop(cls, *args, **kwargs):
args_, kwargs_ = args, kwargs # avoid params name collision in logger
logger.debug('Noop(%r, %r)', args_, kwargs_)

Expand All @@ -970,7 +968,6 @@ def __connection_for_task_verification_result_established(
full_path_files = extracted_package.get_full_path_files()
self.new_session_prepare(
session=session,
subtask_id=subtask_id,
key_id=key_id,
conn_id=conn_id,
)
Expand All @@ -995,17 +992,14 @@ def __remove_old_tasks(self):

def __remove_old_sessions(self):
cur_time = time.time()
sessions_to_remove = []
sessions = dict(self.task_sessions)

for subtask_id, session in sessions.items():
for session in self.all_sessions:
dt = cur_time - session.last_message_time
if dt > self.last_message_time_threshold:
sessions_to_remove.append(subtask_id)
for subtask_id in sessions_to_remove:
if sessions[subtask_id].task_computer is not None:
sessions[subtask_id].task_computer.session_timeout()
sessions[subtask_id].dropped()
if dt < self.last_message_time_threshold:
continue
if session.task_computer is not None:
session.task_computer.session_timeout()
session.dropped()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will dropping the session here change the list of all_sessions? I think it was two loops for this reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually all_sessions is a frozenset() thus it can't change. Also it's a dynamically generated property which is a copy of incoming and outgoing sessions sets combined.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice! ok, approving :D


def __send_waiting_results(self):
for subtask_id in list(self.results_to_send.keys()):
Expand Down
54 changes: 48 additions & 6 deletions golem/task/tasksession.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ def __init__(self, conn):
"""
BasicSafeSession.__init__(self, conn)
ResourceHandshakeSessionMixin.__init__(self)
# FIXME: Remove task_id and use values from messages
self.task_id = None # current task id
self.conn_id = None # connection id
# set in TaskServer.new_session_prepare()
self.key_id: Optional[str] = None
# messages waiting to be send (because connection hasn't been
# verified yet)
self.msgs_to_send = []
Expand Down Expand Up @@ -527,7 +527,21 @@ def _cannot_assign(reason):
if self.task_manager.should_wait_for_node(msg.task_id, self.key_id):
logger.warning("Can not accept offer: Still waiting on results."
"task_id=%r, node=%r", msg.task_id, node_name_id)
self.send(message.tasks.WaitingForResults())
task = self.task_manager.tasks[msg.task_id]
subtasks = task.get_finishing_subtasks(
node_id=self.key_id,
)
previous_ttc = get_task_message(
message_class_name='TaskToCompute',
node_id=self.key_id,
task_id=msg.task_id,
subtask_id=subtasks[0]['subtask_id'],
)
self.send(
message.tasks.WaitingForResults(
task_to_compute=previous_ttc,
),
)
return

if self._handshake_required(self.key_id):
Expand Down Expand Up @@ -731,11 +745,35 @@ def _cannot_compute(reason):

def _react_to_waiting_for_results(
self,
_msg: message.tasks.WaitingForResults,
msg: message.tasks.WaitingForResults,
):
if self.concent_service.available:
concent_key = self.concent_service.variant['pubkey']
else:
concent_key = None
try:
msg.verify_owners(
requestor_public_key=self.key_id,
provider_public_key=self.task_server.keys_auth.ecc.raw_pubkey,
concent_public_key=concent_key,
)
except msg_exceptions.MessageError:
node_id = common.short_node_id(self.key_id)
logger.info(
'Dropping invalid WaitingForResults.'
' sender_node_id: %(node_id)s, task_id: %(task_id)s,'
' subtask_id: %(subtask_id)s',
{
'node_id': node_id,
'task_id': msg.task_id,
'subtask_id': msg.subtask_id,
},
)
logger.debug('Invalid WaitingForResults received', exc_info=True)
return
self.task_server.subtask_waiting(
task_id=self.task_id,
subtask_id=None,
task_id=msg.task_id,
subtask_id=msg.subtask_id,
)
self.task_computer.session_closed()
if not self.msgs_to_send:
Expand Down Expand Up @@ -778,6 +816,10 @@ def _react_to_report_computed_task(self, msg):
self.dropped()
return

self.task_server.add_task_session(
msg.subtask_id, self
)

returned_msg = concent_helpers.process_report_computed_task(
msg=msg,
ecc=self.task_server.keys_auth.ecc,
Expand Down
3 changes: 3 additions & 0 deletions golem/task/taskstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ def is_active(self) -> bool:
def is_finished(self) -> bool:
return self == self.finished

def is_finishing(self) -> bool:
return self in {self.downloading, self.verifying}


class TaskTestStatus(Enum):
started = 'Started'
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#git+https://github.com/golemfactory/golem-messages@<branchname>
--extra-index-url https://builds.golem.network --trusted-host builds.golem.network
appdirs==1.4.3
argh==0.26.2
Expand Down Expand Up @@ -32,7 +33,7 @@ eth-keys==0.2.0b3
eth-tester==0.1.0-beta.24
eth-utils==1.0.3
ethereum==1.6.1
Golem-Messages==3.0.0
Golem-Messages==3.1.0
Golem-Smart-Contracts-Interface==1.6.1
greenlet==0.4.15
h2==3.0.1
Expand Down
3 changes: 2 additions & 1 deletion requirements_to-freeze.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#git+https://github.com/golemfactory/golem-messages@<branchname>
--extra-index-url https://builds.golem.network
appdirs>=1.4
asn1crypto==0.22.0
Expand All @@ -14,7 +15,7 @@ docker==3.5.0
enforce==0.3.4
eth-utils==1.0.3
ethereum==1.6.1
Golem-Messages==3.0.0
Golem-Messages==3.1.0
Golem-Smart-Contracts-Interface==1.6.1
html2text==2018.1.9
humanize==0.5.1
Expand Down
6 changes: 6 additions & 0 deletions tests/golem/task/dummy/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,12 @@ def should_accept_client(self, node_id):
return AcceptClientVerdict.SHOULD_WAIT
return AcceptClientVerdict.ACCEPTED

def get_finishing_subtasks(self, node_id):
try:
return [{'subtask_id': self.assigned_nodes[node_id]}]
except KeyError:
return []

def accept_client(self, node_id):
print('DummyTask.accept_client called node_id=%r '
'- WIP: move more responsibilities from query_extra_data',
Expand Down
5 changes: 0 additions & 5 deletions tests/golem/task/server/test_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ class TestTaskResourcesMixin(TestWithClient):
def setUp(self):
super().setUp()
self.server = TaskResourcesMixin()
self.server.task_sessions = {}
self.server.task_manager = self.client.task_manager
self.server.client = self.client
self.server.task_keeper = TaskHeaderKeeper(
Expand All @@ -18,9 +17,5 @@ def setUp(self):
min_price=0
)

def test_request_resource_no_session(self):
assert not self.server.request_resource("task_id1", "subtask_id2", [])

def test_request_resource(self):
self.server.task_sessions["subtask_id"] = MagicMock()
assert self.server.request_resource("task_id1", "subtask_id", [])
Loading