Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Merge branch 'develop' into new_taskcomputer
Browse files Browse the repository at this point in the history
  • Loading branch information
Wiezzel committed Jul 30, 2019
2 parents 54cec12 + b2d6a9b commit 583788b
Show file tree
Hide file tree
Showing 11 changed files with 449 additions and 181 deletions.
53 changes: 52 additions & 1 deletion golem/task/envmanager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import logging
from typing import Dict, List, NamedTuple, Type

from twisted.internet.defer import Deferred, inlineCallbacks

from golem.envs import EnvId, Environment
from golem.model import Performance
from golem.task.task_api import TaskApiPayloadBuilder

logger = logging.getLogger(__name__)


class EnvEntry(NamedTuple):
instance: Environment
Expand All @@ -15,6 +21,7 @@ class EnvironmentManager:
def __init__(self):
self._envs: Dict[EnvId, EnvEntry] = {}
self._state: Dict[EnvId, bool] = {}
self._running_benchmark: bool = False

def register_env(
self,
Expand All @@ -40,7 +47,10 @@ def set_state(self, state: Dict[EnvId, bool]) -> None:
self.set_enabled(env_id, enabled)

def enabled(self, env_id: EnvId) -> bool:
""" Get the state (enabled or not) for an Environment. """
""" Get the state (enabled or not) for an Environment.
Also returns false when the environment is not registered"""
if env_id not in self._state:
return False
return self._state[env_id]

def set_enabled(self, env_id: EnvId, enabled: bool) -> None:
Expand All @@ -60,3 +70,44 @@ def environment(self, env_id: EnvId) -> Environment:

def payload_builder(self, env_id: EnvId) -> Type[TaskApiPayloadBuilder]:
return self._envs[env_id].payload_builder

@inlineCallbacks
def get_performance(self, env_id) -> Deferred:
""" Gets the performance for the given environment
Checks the database first, if not found it starts a benchmark
Return value Deferred resulting in a float
or None when the benchmark is already running. """
if self._running_benchmark:
return None

if not self.enabled(env_id):
raise Exception("Requested performance for disabled environment")

try:
perf = Performance.get(Performance.environment_id == env_id)
return perf.value
except Performance.DoesNotExist:
pass

env = self._envs[env_id].instance
self._running_benchmark = True

try:
result = yield env.run_benchmark()
except Exception:
logger.error(
'failed to run benchmark. env=%r',
env_id,
exc_info=True
)
raise
finally:
self._running_benchmark = False

Performance.update_or_create(env_id, result)
logger.info(
'Finshed running benchmark. env=%r, score=%r',
env_id,
result
)
return result
12 changes: 8 additions & 4 deletions golem/task/server/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,19 @@ def _restore_resources_error(self, task_id, error):
logger.error("Cannot restore task '%s' resources: %r", task_id, error)
self.task_manager.delete_task(task_id)

def request_resource(self, task_id, subtask_id, resources):
def request_resource(
self,
task_id,
subtask_id,
resources,
resources_options,
):
if not self.client.resource_server:
logger.error("ResourceManager not ready")
return False
resources = self.resource_manager.from_wire(resources)

task_keeper = self.task_manager.comp_task_keeper
options = task_keeper.get_resources_options(subtask_id)
client_options = self.get_download_options(options)
client_options = self.get_download_options(resources_options)
self.pull_resources(task_id, resources, client_options)
return True

Expand Down
32 changes: 12 additions & 20 deletions golem/task/taskkeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from golem.environments.environment import SupportStatus, UnsupportReason
from golem.environments.environmentsmanager import \
EnvironmentsManager as OldEnvManager
from golem.network.hyperdrive.client import HyperdriveClientOptions
from golem.task.envmanager import EnvironmentManager as NewEnvManager
from golem.task.taskproviderstats import ProviderStatsManager

Expand Down Expand Up @@ -59,8 +58,9 @@ class WrongOwnerException(Exception):


class CompTaskInfo:
def __init__(self, header: dt_tasks.TaskHeader) -> None:
def __init__(self, header: dt_tasks.TaskHeader, performance: float) -> None:
self.header = header
self.performance = performance
self.requests = 1
self.subtasks: typing.Dict[str, message.tasks.ComputeTaskDef] = {}
# TODO Add concent communication timeout. Issue #2406
Expand Down Expand Up @@ -114,10 +114,6 @@ def __init__(self, tasks_path: pathlib.Path):
# information about tasks that this node wants to compute
self.active_tasks: typing.Dict[str, CompTaskInfo] = {}

# information about resource options for subtask
self.resources_options: typing.Dict[str, typing.Optional[
HyperdriveClientOptions]] = {}

# price information per last task request
self.active_task_offers: typing.Dict[str, int] = {}

Expand Down Expand Up @@ -146,7 +142,7 @@ def _dump_tasks(self):
self.subtask_to_task,
self.task_package_paths,
self.active_task_offers,
self.resources_options
None, # resources_options, leaving for backwards compatibility
)
pickle.dump(dump_data, f)

Expand All @@ -162,7 +158,8 @@ def restore(self):
subtask_to_task = data[1]
task_package_paths = data[2] if len(data) > 2 else {}
active_task_offers = data[3] if len(data) > 3 else {}
resources_options = data[4] if len(data) > 4 else {}
# backwards compatibility, don't use slot 4
# resources_options = data[4] if len(data) > 4 else {}
except (pickle.UnpicklingError, EOFError, AttributeError, KeyError):
logger.exception(
'Problem restoring dumpfile: %s; deleting broken file',
Expand All @@ -175,9 +172,13 @@ def restore(self):
self.subtask_to_task.update(subtask_to_task)
self.task_package_paths.update(task_package_paths)
self.active_task_offers.update(active_task_offers)
self.resources_options.update(resources_options)

def add_request(self, theader: dt_tasks.TaskHeader, price: int):
def add_request(
self,
theader: dt_tasks.TaskHeader,
price: int,
performance: float
):
# price is task_header.max_price
logger.debug('CT.add_request(%r, %s)', theader, price)
if price < 0:
Expand All @@ -186,7 +187,7 @@ def add_request(self, theader: dt_tasks.TaskHeader, price: int):
if task_id in self.active_tasks:
self.active_tasks[task_id].requests += 1
else:
self.active_tasks[task_id] = CompTaskInfo(theader)
self.active_tasks[task_id] = CompTaskInfo(theader, performance)
self.active_task_offers[task_id] = compute_subtask_value(
price, self.active_tasks[task_id].header.subtask_timeout
)
Expand Down Expand Up @@ -227,10 +228,6 @@ def receive_subtask(self, task_to_compute: message.tasks.TaskToCompute):
header.subtask_timeout, task_to_compute.size)

self.subtask_to_task[subtask_id] = task_id
if task_to_compute.resources_options:
task_to_compute.resources_options['options']['size'] = \
task_to_compute.size
self.resources_options[subtask_id] = task_to_compute.resources_options
self.dump()
return True

Expand Down Expand Up @@ -270,10 +267,6 @@ def get_task_id_for_subtask(self, subtask_id: str) -> typing.Optional[str]:
def get_node_for_task_id(self, task_id) -> typing.Optional[str]:
return self.active_tasks[task_id].header.task_owner.key

def get_resources_options(self, subtask_id: str) -> \
typing.Optional[HyperdriveClientOptions]:
return self.resources_options.get(subtask_id)

def check_task_owner_by_subtask(self, task_owner_key_id, subtask_id):
task_id = self.subtask_to_task.get(subtask_id)
task = self.active_tasks.get(task_id)
Expand All @@ -295,7 +288,6 @@ def remove_old_tasks(self):
logger.info("Removing comp_task after deadline: %s", task_id)

for subtask_id in self.active_tasks[task_id].subtasks:
self.resources_options.pop(subtask_id, None)
self.subtask_to_task.pop(subtask_id, None)

self.active_tasks.pop(task_id, None)
Expand Down
4 changes: 2 additions & 2 deletions golem/task/taskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1139,9 +1139,9 @@ def get_task_preview(self, task_id, single=False):
task_type = self.task_types[task_type_name]
return task_type.get_preview(task, single=single)

def add_comp_task_request(self, theader, price):
def add_comp_task_request(self, theader, price, performance):
""" Add a header of a task which this node may try to compute """
self.comp_task_keeper.add_request(theader, price)
self.comp_task_keeper.add_request(theader, price, performance)

def __add_subtask_to_tasks_states(self, node_id,
ctd, price: int):
Expand Down
Loading

0 comments on commit 583788b

Please sign in to comment.