Skip to content
This repository was archived by the owner on Oct 31, 2023. It is now read-only.

Task.initialize() executed in background #4324

Merged
merged 9 commits into from
Jun 17, 2019
11 changes: 6 additions & 5 deletions apps/core/task/coretask.py
Original file line number Diff line number Diff line change
@@ -533,10 +533,7 @@ def __init__(self,

def build(self):
# pylint:disable=abstract-class-instantiated
task = self.TASK_CLASS(**self.get_task_kwargs())

task.initialize(self.dir_manager)
return task
return self.TASK_CLASS(**self.get_task_kwargs())

def get_task_kwargs(self, **kwargs):
kwargs['total_tasks'] = int(self.task_definition.subtasks_count)
@@ -556,8 +553,12 @@ def build_minimal_definition(cls, task_type: CoreTaskTypeInfo, dictionary) \
definition.options = task_type.options()
definition.task_type = task_type.name
definition.compute_on = dictionary.get('compute_on', 'cpu')
definition.resources = set(dictionary['resources'])
definition.subtasks_count = int(dictionary['subtasks_count'])
definition.concent_enabled = dictionary.get('concent_enabled', False)

if 'resources' in dictionary:
definition.resources = set(dictionary['resources'])

return definition

@classmethod
11 changes: 3 additions & 8 deletions apps/glambda/task/glambdatask.py
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@
from golem_messages.datastructures import p2p as dt_p2p

from apps.core.task.coretask import CoreTask, CoreTaskBuilder, \
CoreVerifier
CoreVerifier, CoreTaskTypeInfo
from apps.core.task.coretaskstate import TaskDefinition, Options
from apps.glambda.glambdaenvironment import GLambdaTaskEnvironment
from golem.resource.dirmanager import DirManager
@@ -246,14 +246,9 @@ def get_task_kwargs(self, **kwargs):
return kwargs

@classmethod
def build_minimal_definition(cls, task_type: TaskTypeInfo, dictionary):
definition = task_type.definition()
definition.task_type = task_type.name
definition.compute_on = dictionary.get('compute_on', 'cpu')
if 'resources' in dictionary:
definition.resources = set(dictionary['resources'])
def build_minimal_definition(cls, task_type: CoreTaskTypeInfo, dictionary):
definition = super().build_minimal_definition(task_type, dictionary)
options = dictionary['options']
definition.subtasks_count = int(dictionary['subtasks_count'])
definition.options.method = options['method']
definition.options.args = options['args']
definition.options.verification = options['verification']
4 changes: 0 additions & 4 deletions apps/rendering/task/renderingtask.py
Original file line number Diff line number Diff line change
@@ -296,10 +296,6 @@ def get_task_kwargs(self, **kwargs):
kwargs['total_tasks'] = self._calculate_total(self.DEFAULTS())
return kwargs

def build(self):
task = super(RenderingTaskBuilder, self).build()
return task

@classmethod
def build_dictionary(cls, definition):
parent = super(RenderingTaskBuilder, cls)
34 changes: 28 additions & 6 deletions golem/core/deferred.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,34 @@
from queue import Queue, Empty
from typing import Any, Callable, Dict, Tuple, List

from twisted.internet.defer import Deferred, TimeoutError
from twisted.internet import defer
from twisted.internet.task import deferLater
from twisted.internet.threads import deferToThread
from twisted.python.failure import Failure


class DeferredSeq:
Copy link

@Wiezzel Wiezzel Jun 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand correctly that this class is supposed to compute fn(...f3(f2(f1(None)))...) and do it asynchronously?

Copy link
Contributor Author

@mfranciszkiewicz mfranciszkiewicz Jun 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and by executing those functions in a separate thread.

Since deferred.callback does not allow a Deferred object to be passed as an argument (in a callback chain), this class simply yields on those before passing them further.

def __init__(self) -> None:
self._seq: List[Tuple[Callable, Tuple, Dict]] = []

def push(self, fn: Callable, *args, **kwargs) -> 'DeferredSeq':
self._seq.append((fn, args, kwargs))
return self

def execute(self) -> defer.Deferred:
return deferToThread(lambda: sync_wait(self._execute()))

@defer.inlineCallbacks
def _execute(self) -> Any:
result = None
for entry in self._seq:
fn, args, kwargs = entry
result = yield defer.maybeDeferred(fn, *args, **kwargs)
return result


def chain_function(deferred, fn, *args, **kwargs):
result = Deferred()
result = defer.Deferred()

def resolve(_):
fn(*args, **kwargs).addCallbacks(result.callback,
@@ -18,7 +40,7 @@ def resolve(_):


def sync_wait(deferred, timeout=10):
if not isinstance(deferred, Deferred):
if not isinstance(deferred, defer.Deferred):
return deferred

queue = Queue()
@@ -27,13 +49,13 @@ def sync_wait(deferred, timeout=10):
try:
result = queue.get(True, timeout)
except Empty:
raise TimeoutError("Command timed out")
raise defer.TimeoutError("Command timed out")

if isinstance(result, Failure):
result.raiseException()
return result


def call_later(delay: int, callable, *args, **kwargs) -> None:
def call_later(delay: int, fn, *args, **kwargs) -> None:
from twisted.internet import reactor
deferLater(reactor, delay, callable, *args, **kwargs)
deferLater(reactor, delay, fn, *args, **kwargs)
2 changes: 2 additions & 0 deletions golem/task/benchmarkmanager.py
Original file line number Diff line number Diff line change
@@ -63,6 +63,8 @@ def error_callback(err: Union[str, Exception]):
task_state.definition,
self.dir_manager)
task = builder.build()
task.initialize(builder.dir_manager)

br = BenchmarkRunner(
task=task,
root_path=self.dir_manager.root_path,
19 changes: 11 additions & 8 deletions golem/task/rpc.py
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
from golem.core import common
from golem.core import deferred as golem_deferred
from golem.core import simpleserializer
from golem.core.deferred import DeferredSeq
from golem.ethereum import exceptions as eth_exceptions
from golem.model import Actor
from golem.resource import resource
@@ -402,7 +403,7 @@ def enqueue_new_task(client, task, force=False) \

def _create_task_error(e, _self, task_dict, **_kwargs) \
-> typing.Tuple[None, typing.Union[str, typing.Dict]]:
logger.error("Cannot create task %r: %s", task_dict, e)
_self.client.task_manager.task_creation_failed(task_dict.get('id'), str(e))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

task_dict never contains id. See #4850


if hasattr(e, 'to_dict'):
temp_dict = rpc_utils.int_to_string(e.to_dict())
@@ -416,7 +417,7 @@ def _restart_task_error(e, _self, task_id, **_kwargs):
return None, str(e)


def _restart_subtasks_error(e, _self, task_id, subtask_ids, *args, **_kwargs) \
def _restart_subtasks_error(e, _self, task_id, subtask_ids, *_args, **_kwargs) \
-> typing.Union[str, typing.Dict]:
logger.error("Failed to restart subtasks. task_id: %r, subtask_ids: %r, %s",
task_id, subtask_ids, e)
@@ -470,17 +471,19 @@ def create_task(self, task_dict, force=False) \
)
task_id = task.header.task_id

deferred = enqueue_new_task(self.client, task, force=force)
# We want to return quickly from create_task without waiting for
# deferred completion.
deferred.addErrback( # pylint: disable=no-member
DeferredSeq().push(
self.task_manager.initialize_task, task
).push(
enqueue_new_task, self.client, task, force=force
).execute().addErrback(
lambda failure: _create_task_error(
e=failure.value,
_self=self,
task_dict=task_dict,
force=force
),
)
)

return task_id, None

def _validate_enough_funds_to_pay_for_task(
@@ -675,7 +678,7 @@ def restart_frame_subtasks(
if not frame_subtasks:
logger.error('Frame restart failed, frame has no subtasks.'
'task_id=%r, frame=%r', task_id, frame)
return
return None

return self.restart_subtasks(task_id, list(frame_subtasks))

Loading