Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Task.initialize() executed in background (#4324)
Browse files Browse the repository at this point in the history
- task.rpc: run initialize_task and enqueue_new_task in another thread
- task.taskstate: new 'creating' and 'errorCreating' TaskStatus vals
- initial TaskStatus is 'creating', the task is remembered immediately
- when task creation fails, TaskStatus is set to 'errorCreating'
  • Loading branch information
mfranciszkiewicz authored Jun 17, 2019
1 parent c67732e commit 8397dcc
Show file tree
Hide file tree
Showing 16 changed files with 222 additions and 106 deletions.
11 changes: 6 additions & 5 deletions apps/core/task/coretask.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,10 +533,7 @@ def __init__(self,

def build(self):
# pylint:disable=abstract-class-instantiated
task = self.TASK_CLASS(**self.get_task_kwargs())

task.initialize(self.dir_manager)
return task
return self.TASK_CLASS(**self.get_task_kwargs())

def get_task_kwargs(self, **kwargs):
kwargs['total_tasks'] = int(self.task_definition.subtasks_count)
Expand All @@ -556,8 +553,12 @@ def build_minimal_definition(cls, task_type: CoreTaskTypeInfo, dictionary) \
definition.options = task_type.options()
definition.task_type = task_type.name
definition.compute_on = dictionary.get('compute_on', 'cpu')
definition.resources = set(dictionary['resources'])
definition.subtasks_count = int(dictionary['subtasks_count'])
definition.concent_enabled = dictionary.get('concent_enabled', False)

if 'resources' in dictionary:
definition.resources = set(dictionary['resources'])

return definition

@classmethod
Expand Down
11 changes: 3 additions & 8 deletions apps/glambda/task/glambdatask.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from golem_messages.datastructures import p2p as dt_p2p

from apps.core.task.coretask import CoreTask, CoreTaskBuilder, \
CoreVerifier
CoreVerifier, CoreTaskTypeInfo
from apps.core.task.coretaskstate import TaskDefinition, Options
from apps.glambda.glambdaenvironment import GLambdaTaskEnvironment
from golem.resource.dirmanager import DirManager
Expand Down Expand Up @@ -246,14 +246,9 @@ def get_task_kwargs(self, **kwargs):
return kwargs

@classmethod
def build_minimal_definition(cls, task_type: TaskTypeInfo, dictionary):
definition = task_type.definition()
definition.task_type = task_type.name
definition.compute_on = dictionary.get('compute_on', 'cpu')
if 'resources' in dictionary:
definition.resources = set(dictionary['resources'])
def build_minimal_definition(cls, task_type: CoreTaskTypeInfo, dictionary):
definition = super().build_minimal_definition(task_type, dictionary)
options = dictionary['options']
definition.subtasks_count = int(dictionary['subtasks_count'])
definition.options.method = options['method']
definition.options.args = options['args']
definition.options.verification = options['verification']
Expand Down
4 changes: 0 additions & 4 deletions apps/rendering/task/renderingtask.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,6 @@ def get_task_kwargs(self, **kwargs):
kwargs['total_tasks'] = self._calculate_total(self.DEFAULTS())
return kwargs

def build(self):
task = super(RenderingTaskBuilder, self).build()
return task

@classmethod
def build_dictionary(cls, definition):
parent = super(RenderingTaskBuilder, cls)
Expand Down
34 changes: 28 additions & 6 deletions golem/core/deferred.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,34 @@
from queue import Queue, Empty
from typing import Any, Callable, Dict, Tuple, List

from twisted.internet.defer import Deferred, TimeoutError
from twisted.internet import defer
from twisted.internet.task import deferLater
from twisted.internet.threads import deferToThread
from twisted.python.failure import Failure


class DeferredSeq:
def __init__(self) -> None:
self._seq: List[Tuple[Callable, Tuple, Dict]] = []

def push(self, fn: Callable, *args, **kwargs) -> 'DeferredSeq':
self._seq.append((fn, args, kwargs))
return self

def execute(self) -> defer.Deferred:
return deferToThread(lambda: sync_wait(self._execute()))

@defer.inlineCallbacks
def _execute(self) -> Any:
result = None
for entry in self._seq:
fn, args, kwargs = entry
result = yield defer.maybeDeferred(fn, *args, **kwargs)
return result


def chain_function(deferred, fn, *args, **kwargs):
result = Deferred()
result = defer.Deferred()

def resolve(_):
fn(*args, **kwargs).addCallbacks(result.callback,
Expand All @@ -18,7 +40,7 @@ def resolve(_):


def sync_wait(deferred, timeout=10):
if not isinstance(deferred, Deferred):
if not isinstance(deferred, defer.Deferred):
return deferred

queue = Queue()
Expand All @@ -27,13 +49,13 @@ def sync_wait(deferred, timeout=10):
try:
result = queue.get(True, timeout)
except Empty:
raise TimeoutError("Command timed out")
raise defer.TimeoutError("Command timed out")

if isinstance(result, Failure):
result.raiseException()
return result


def call_later(delay: int, callable, *args, **kwargs) -> None:
def call_later(delay: int, fn, *args, **kwargs) -> None:
from twisted.internet import reactor
deferLater(reactor, delay, callable, *args, **kwargs)
deferLater(reactor, delay, fn, *args, **kwargs)
2 changes: 2 additions & 0 deletions golem/task/benchmarkmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ def error_callback(err: Union[str, Exception]):
task_state.definition,
self.dir_manager)
task = builder.build()
task.initialize(builder.dir_manager)

br = BenchmarkRunner(
task=task,
root_path=self.dir_manager.root_path,
Expand Down
19 changes: 11 additions & 8 deletions golem/task/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from golem.core import common
from golem.core import deferred as golem_deferred
from golem.core import simpleserializer
from golem.core.deferred import DeferredSeq
from golem.ethereum import exceptions as eth_exceptions
from golem.model import Actor
from golem.resource import resource
Expand Down Expand Up @@ -402,7 +403,7 @@ def enqueue_new_task(client, task, force=False) \

def _create_task_error(e, _self, task_dict, **_kwargs) \
-> typing.Tuple[None, typing.Union[str, typing.Dict]]:
logger.error("Cannot create task %r: %s", task_dict, e)
_self.client.task_manager.task_creation_failed(task_dict.get('id'), str(e))

if hasattr(e, 'to_dict'):
temp_dict = rpc_utils.int_to_string(e.to_dict())
Expand All @@ -416,7 +417,7 @@ def _restart_task_error(e, _self, task_id, **_kwargs):
return None, str(e)


def _restart_subtasks_error(e, _self, task_id, subtask_ids, *args, **_kwargs) \
def _restart_subtasks_error(e, _self, task_id, subtask_ids, *_args, **_kwargs) \
-> typing.Union[str, typing.Dict]:
logger.error("Failed to restart subtasks. task_id: %r, subtask_ids: %r, %s",
task_id, subtask_ids, e)
Expand Down Expand Up @@ -470,17 +471,19 @@ def create_task(self, task_dict, force=False) \
)
task_id = task.header.task_id

deferred = enqueue_new_task(self.client, task, force=force)
# We want to return quickly from create_task without waiting for
# deferred completion.
deferred.addErrback( # pylint: disable=no-member
DeferredSeq().push(
self.task_manager.initialize_task, task
).push(
enqueue_new_task, self.client, task, force=force
).execute().addErrback(
lambda failure: _create_task_error(
e=failure.value,
_self=self,
task_dict=task_dict,
force=force
),
)
)

return task_id, None

def _validate_enough_funds_to_pay_for_task(
Expand Down Expand Up @@ -675,7 +678,7 @@ def restart_frame_subtasks(
if not frame_subtasks:
logger.error('Frame restart failed, frame has no subtasks.'
'task_id=%r, frame=%r', task_id, frame)
return
return None

return self.restart_subtasks(task_id, list(frame_subtasks))

Expand Down
Loading

0 comments on commit 8397dcc

Please sign in to comment.