From 8397dccf13cbb7f6ca279b988ef470433893ff0e Mon Sep 17 00:00:00 2001 From: Marek Franciszkiewicz Date: Mon, 17 Jun 2019 22:47:43 +0200 Subject: [PATCH] Task.initialize() executed in background (#4324) - task.rpc: run initialize_task and enqueue_new_task in another thread - task.taskstate: new 'creating' and 'errorCreating' TaskStatus vals - initial TaskStatus is 'creating', the task is remembered immediately - when task creation fails, TaskStatus is set to 'errorCreating' --- apps/core/task/coretask.py | 11 +-- apps/glambda/task/glambdatask.py | 11 +-- apps/rendering/task/renderingtask.py | 4 - golem/core/deferred.py | 34 +++++-- golem/task/benchmarkmanager.py | 2 + golem/task/rpc.py | 19 ++-- golem/task/taskmanager.py | 88 ++++++++++++------ golem/task/taskstate.py | 30 +++++-- .../benchmark/test_blenderbenchmark.py | 1 + .../blender/task/test_blenderrendertask.py | 1 + tests/golem/core/test_deferred.py | 89 +++++++++++++------ .../golem/docker/test_docker_blender_task.py | 1 + tests/golem/docker/test_docker_task.py | 3 +- tests/golem/task/test_rpc.py | 7 +- tests/golem/task/test_taskmanager.py | 22 +++-- tests/golem/task/test_taskserver.py | 5 +- 16 files changed, 222 insertions(+), 106 deletions(-) diff --git a/apps/core/task/coretask.py b/apps/core/task/coretask.py index fba7faa7fe..f2540ccb5a 100644 --- a/apps/core/task/coretask.py +++ b/apps/core/task/coretask.py @@ -533,10 +533,7 @@ def __init__(self, def build(self): # pylint:disable=abstract-class-instantiated - task = self.TASK_CLASS(**self.get_task_kwargs()) - - task.initialize(self.dir_manager) - return task + return self.TASK_CLASS(**self.get_task_kwargs()) def get_task_kwargs(self, **kwargs): kwargs['total_tasks'] = int(self.task_definition.subtasks_count) @@ -556,8 +553,12 @@ def build_minimal_definition(cls, task_type: CoreTaskTypeInfo, dictionary) \ definition.options = task_type.options() definition.task_type = task_type.name definition.compute_on = dictionary.get('compute_on', 'cpu') - definition.resources = set(dictionary['resources']) definition.subtasks_count = int(dictionary['subtasks_count']) + definition.concent_enabled = dictionary.get('concent_enabled', False) + + if 'resources' in dictionary: + definition.resources = set(dictionary['resources']) + return definition @classmethod diff --git a/apps/glambda/task/glambdatask.py b/apps/glambda/task/glambdatask.py index b563f78005..e25f008145 100644 --- a/apps/glambda/task/glambdatask.py +++ b/apps/glambda/task/glambdatask.py @@ -9,7 +9,7 @@ from golem_messages.datastructures import p2p as dt_p2p from apps.core.task.coretask import CoreTask, CoreTaskBuilder, \ - CoreVerifier + CoreVerifier, CoreTaskTypeInfo from apps.core.task.coretaskstate import TaskDefinition, Options from apps.glambda.glambdaenvironment import GLambdaTaskEnvironment from golem.resource.dirmanager import DirManager @@ -246,14 +246,9 @@ def get_task_kwargs(self, **kwargs): return kwargs @classmethod - def build_minimal_definition(cls, task_type: TaskTypeInfo, dictionary): - definition = task_type.definition() - definition.task_type = task_type.name - definition.compute_on = dictionary.get('compute_on', 'cpu') - if 'resources' in dictionary: - definition.resources = set(dictionary['resources']) + def build_minimal_definition(cls, task_type: CoreTaskTypeInfo, dictionary): + definition = super().build_minimal_definition(task_type, dictionary) options = dictionary['options'] - definition.subtasks_count = int(dictionary['subtasks_count']) definition.options.method = options['method'] definition.options.args = options['args'] definition.options.verification = options['verification'] diff --git a/apps/rendering/task/renderingtask.py b/apps/rendering/task/renderingtask.py index 66309ea535..db45cb201e 100644 --- a/apps/rendering/task/renderingtask.py +++ b/apps/rendering/task/renderingtask.py @@ -296,10 +296,6 @@ def get_task_kwargs(self, **kwargs): kwargs['total_tasks'] = self._calculate_total(self.DEFAULTS()) return kwargs - def build(self): - task = super(RenderingTaskBuilder, self).build() - return task - @classmethod def build_dictionary(cls, definition): parent = super(RenderingTaskBuilder, cls) diff --git a/golem/core/deferred.py b/golem/core/deferred.py index eb49fcefee..a87f8135e4 100644 --- a/golem/core/deferred.py +++ b/golem/core/deferred.py @@ -1,12 +1,34 @@ from queue import Queue, Empty +from typing import Any, Callable, Dict, Tuple, List -from twisted.internet.defer import Deferred, TimeoutError +from twisted.internet import defer from twisted.internet.task import deferLater +from twisted.internet.threads import deferToThread from twisted.python.failure import Failure +class DeferredSeq: + def __init__(self) -> None: + self._seq: List[Tuple[Callable, Tuple, Dict]] = [] + + def push(self, fn: Callable, *args, **kwargs) -> 'DeferredSeq': + self._seq.append((fn, args, kwargs)) + return self + + def execute(self) -> defer.Deferred: + return deferToThread(lambda: sync_wait(self._execute())) + + @defer.inlineCallbacks + def _execute(self) -> Any: + result = None + for entry in self._seq: + fn, args, kwargs = entry + result = yield defer.maybeDeferred(fn, *args, **kwargs) + return result + + def chain_function(deferred, fn, *args, **kwargs): - result = Deferred() + result = defer.Deferred() def resolve(_): fn(*args, **kwargs).addCallbacks(result.callback, @@ -18,7 +40,7 @@ def resolve(_): def sync_wait(deferred, timeout=10): - if not isinstance(deferred, Deferred): + if not isinstance(deferred, defer.Deferred): return deferred queue = Queue() @@ -27,13 +49,13 @@ def sync_wait(deferred, timeout=10): try: result = queue.get(True, timeout) except Empty: - raise TimeoutError("Command timed out") + raise defer.TimeoutError("Command timed out") if isinstance(result, Failure): result.raiseException() return result -def call_later(delay: int, callable, *args, **kwargs) -> None: +def call_later(delay: int, fn, *args, **kwargs) -> None: from twisted.internet import reactor - deferLater(reactor, delay, callable, *args, **kwargs) + deferLater(reactor, delay, fn, *args, **kwargs) diff --git a/golem/task/benchmarkmanager.py b/golem/task/benchmarkmanager.py index 9e2eba84b3..907bfc0a29 100644 --- a/golem/task/benchmarkmanager.py +++ b/golem/task/benchmarkmanager.py @@ -63,6 +63,8 @@ def error_callback(err: Union[str, Exception]): task_state.definition, self.dir_manager) task = builder.build() + task.initialize(builder.dir_manager) + br = BenchmarkRunner( task=task, root_path=self.dir_manager.root_path, diff --git a/golem/task/rpc.py b/golem/task/rpc.py index 459dcac126..5cada6c98d 100644 --- a/golem/task/rpc.py +++ b/golem/task/rpc.py @@ -19,6 +19,7 @@ from golem.core import common from golem.core import deferred as golem_deferred from golem.core import simpleserializer +from golem.core.deferred import DeferredSeq from golem.ethereum import exceptions as eth_exceptions from golem.model import Actor from golem.resource import resource @@ -402,7 +403,7 @@ def enqueue_new_task(client, task, force=False) \ def _create_task_error(e, _self, task_dict, **_kwargs) \ -> typing.Tuple[None, typing.Union[str, typing.Dict]]: - logger.error("Cannot create task %r: %s", task_dict, e) + _self.client.task_manager.task_creation_failed(task_dict.get('id'), str(e)) if hasattr(e, 'to_dict'): temp_dict = rpc_utils.int_to_string(e.to_dict()) @@ -416,7 +417,7 @@ def _restart_task_error(e, _self, task_id, **_kwargs): return None, str(e) -def _restart_subtasks_error(e, _self, task_id, subtask_ids, *args, **_kwargs) \ +def _restart_subtasks_error(e, _self, task_id, subtask_ids, *_args, **_kwargs) \ -> typing.Union[str, typing.Dict]: logger.error("Failed to restart subtasks. task_id: %r, subtask_ids: %r, %s", task_id, subtask_ids, e) @@ -470,17 +471,19 @@ def create_task(self, task_dict, force=False) \ ) task_id = task.header.task_id - deferred = enqueue_new_task(self.client, task, force=force) - # We want to return quickly from create_task without waiting for - # deferred completion. - deferred.addErrback( # pylint: disable=no-member + DeferredSeq().push( + self.task_manager.initialize_task, task + ).push( + enqueue_new_task, self.client, task, force=force + ).execute().addErrback( lambda failure: _create_task_error( e=failure.value, _self=self, task_dict=task_dict, force=force - ), + ) ) + return task_id, None def _validate_enough_funds_to_pay_for_task( @@ -675,7 +678,7 @@ def restart_frame_subtasks( if not frame_subtasks: logger.error('Frame restart failed, frame has no subtasks.' 'task_id=%r, frame=%r', task_id, frame) - return + return None return self.restart_subtasks(task_id, list(frame_subtasks)) diff --git a/golem/task/taskmanager.py b/golem/task/taskmanager.py index 5993c3cfd9..f54f1b565f 100644 --- a/golem/task/taskmanager.py +++ b/golem/task/taskmanager.py @@ -1,3 +1,5 @@ +# pylint: disable=too-many-lines + import logging import os import pickle @@ -69,6 +71,8 @@ class TaskManager(TaskEventListener): """ Keeps and manages information about requested tasks Requestor uses TaskManager to assign task to providers """ + # pylint: disable=too-many-public-methods + handle_task_key_error = HandleKeyError(log_task_key_error) handle_subtask_key_error = HandleKeyError(log_subtask_key_error) handle_generic_key_error = HandleForwardedError(KeyError, @@ -87,6 +91,7 @@ def __init__( apps_manager=AppsManager(), finished_cb=None, ) -> None: + # pylint: disable=too-many-instance-attributes super().__init__() self.apps_manager = apps_manager @@ -122,8 +127,15 @@ def __init__( resource_manager ) - self.activeStatus = [TaskStatus.computing, TaskStatus.starting, - TaskStatus.waiting] + self.CREATING_STATUS = frozenset([ + TaskStatus.creating, + TaskStatus.errorCreating, + ]) + self.ACTIVE_STATUS = frozenset([ + TaskStatus.computing, + TaskStatus.starting, + TaskStatus.waiting, + ]) self.FINISHED_STATUS = frozenset([ TaskStatus.finished, TaskStatus.aborted, @@ -164,9 +176,17 @@ def create_task(self, dictionary, minimal=False): minimal) definition.task_id = CoreTask.create_task_id(self.keys_auth.public_key) definition.concent_enabled = dictionary.get('concent_enabled', False) - builder = builder_type(self.node, definition, self.dir_manager) - return builder.build() + task = builder_type(self.node, definition, self.dir_manager).build() + task_id = task.header.task_id + + logger.info("Creating task. type=%r, id=%s", type(task), task_id) + self.tasks[task_id] = task + self.tasks_states[task_id] = TaskState(task) + return task + + def initialize_task(self, task: Task): + task.initialize(self.dir_manager) def get_task_definition_dict(self, task: Task): if isinstance(task, dict): @@ -177,25 +197,25 @@ def get_task_definition_dict(self, task: Task): def add_new_task(self, task: Task, estimated_fee: int = 0) -> None: task_id = task.header.task_id - if task_id in self.tasks: - raise RuntimeError("Task {} has been already added" - .format(task.header.task_id)) + task_state = self.tasks_states.get(task_id) + + if not task_state: + task_state = TaskState(task) + self.tasks[task_id] = task + self.tasks_states[task_id] = task_state + + if task_state.status is not TaskStatus.creating: + raise RuntimeError("Task {} has already been added".format(task_id)) task.header.task_owner = self.node self.sign_task_header(task.header) task.register_listener(self) - ts = TaskState() - ts.status = TaskStatus.notStarted - ts.outputs = task.get_output_names() - ts.subtasks_count = task.get_total_tasks() - ts.time_started = time.time() - ts.estimated_cost = task.price - ts.estimated_fee = estimated_fee + task_state.status = TaskStatus.notStarted + task_state.time_started = time.time() + task_state.estimated_fee = estimated_fee - self.tasks[task_id] = task - self.tasks_states[task_id] = ts logger.info("Task %s added", task_id) self._create_task_output_dir(task.task_definition) @@ -204,6 +224,14 @@ def add_new_task(self, task: Task, estimated_fee: int = 0) -> None: op=TaskOp.CREATED, persist=False) + @handle_task_key_error + def task_creation_failed(self, task_id: str, reason: str) -> None: + logger.error("Cannot create task. id=%s : %s", task_id, reason) + + task_state = self.tasks_states[task_id] + task_state.status = TaskStatus.errorCreating + task_state.status_message = reason + @handle_task_key_error def increase_task_mask(self, task_id: str, num_bits: int = 1) -> None: """ Increase mask for given task i.e. make it more restrictive """ @@ -250,7 +278,7 @@ def dump_task(self, task_id: str) -> None: with filepath.open('wb') as f: pickle.dump(data, f, protocol=2) logger.debug('TASK %s DUMPED in %r', task_id, filepath) - except Exception as e: + except Exception: # pylint: disable=broad-except logger.exception( 'DUMP ERROR task_id: %r task: %r state: %r', task_id, self.tasks.get(task_id, ''), @@ -347,7 +375,7 @@ def restore_tasks(self) -> None: def resources_send(self, task_id): self.tasks_states[task_id].status = TaskStatus.waiting self.notice_task_updated(task_id) - logger.info("Resources for task {} sent".format(task_id)) + logger.info("Resources for task sent. id=%s", task_id) def got_wants_to_compute(self, task_id: str): @@ -366,12 +394,16 @@ def got_wants_to_compute(self, op=TaskOp.WORK_OFFER_RECEIVED, persist=False) + def task_being_created(self, task_id: str) -> bool: + task_status = self.tasks_states[task_id].status + return task_status in self.CREATING_STATUS + def task_finished(self, task_id: str) -> bool: task_status = self.tasks_states[task_id].status return task_status in self.FINISHED_STATUS def task_needs_computation(self, task_id: str) -> bool: - if self.task_finished(task_id): + if self.task_being_created(task_id) or self.task_finished(task_id): task_status = self.tasks_states[task_id].status logger.info( 'task is not active: %(task_id)s, status: %(task_status)s', @@ -387,7 +419,7 @@ def task_needs_computation(self, task_id: str) -> bool: return False return True - def get_next_subtask( + def get_next_subtask( # pylint: disable=too-many-arguments self, node_id, task_id, estimated_performance, price, max_resource_size, max_memory_size): """ Assign next subtask from task to node with given @@ -404,6 +436,7 @@ def get_next_subtask( or None. It is recommended to call is_my_task and should_wait_for_node before this to find the reason why the task is not able to be picked up """ + # pylint: disable=too-many-return-statements logger.debug( 'get_next_subtask(%r, %r, %r, %r, %r, %r)', node_id, task_id, estimated_performance, price, @@ -669,7 +702,7 @@ def get_tasks_headers(self): ret = [] for tid, task in self.tasks.items(): status = self.tasks_states[tid].status - if task.needs_computation() and status in self.activeStatus: + if task.needs_computation() and status in self.ACTIVE_STATUS: ret.append(task.header) return ret @@ -678,9 +711,9 @@ def get_trust_mod(self, subtask_id): if subtask_id in self.subtask2task_mapping: task_id = self.subtask2task_mapping[subtask_id] return self.tasks[task_id].get_trust_mod(subtask_id) - else: - logger.error("This is not my subtask {}".format(subtask_id)) - return 0 + + logger.error("This is not my subtask. id=%s", subtask_id) + return 0 def update_task_signatures(self): for task in list(self.tasks.values()): @@ -694,8 +727,7 @@ def verify_subtask(self, subtask_id): if subtask_id in self.subtask2task_mapping: task_id = self.subtask2task_mapping[subtask_id] return self.tasks[task_id].verify_subtask(subtask_id) - else: - return False + return False def get_node_id_for_subtask(self, subtask_id): if subtask_id not in self.subtask2task_mapping: @@ -748,7 +780,7 @@ def verification_finished_(): verification_finished() - if self.tasks_states[task_id].status in self.activeStatus: + if self.tasks_states[task_id].status in self.ACTIVE_STATUS: if not self.tasks[task_id].finished_computation(): self.tasks_states[task_id].status = TaskStatus.computing else: @@ -864,7 +896,7 @@ def check_timeouts(self): nodes_with_timeouts = [] for t in list(self.tasks.values()): th = t.header - if self.tasks_states[th.task_id].status not in self.activeStatus: + if self.tasks_states[th.task_id].status not in self.ACTIVE_STATUS: continue cur_time = int(get_timestamp_utc()) # Check subtask timeout diff --git a/golem/task/taskstate.py b/golem/task/taskstate.py index 0563d72c51..f4ad5e9727 100644 --- a/golem/task/taskstate.py +++ b/golem/task/taskstate.py @@ -1,33 +1,42 @@ from enum import Enum, auto import functools import time -from typing import Dict +from typing import Dict, Optional from golem_messages import datastructures from golem_messages import validators -class TaskState(object): - def __init__(self): - self.status = TaskStatus.notStarted +class TaskState: + # pylint: disable=too-many-instance-attributes + + def __init__(self, task=None) -> None: + self.status = TaskStatus.creating + self.status_message: Optional[str] = None self.progress = 0.0 self.remaining_time = 0 self.elapsed_time = 0 - self.time_started = 0 + self.time_started = 0.0 self.payment_booked = False self.payment_settled = False - self.outputs = [] - self.subtasks_count = 0 self.subtask_states: Dict[str, SubtaskState] = {} self.resource_hash = None self.package_hash = None self.package_path = None self.package_size = None - self.extra_data = {} + self.extra_data: Dict = {} self.last_update_time = time.time() - self.estimated_cost = 0 self.estimated_fee = 0 + if task: + self.outputs = task.get_output_names() + self.subtasks_count = task.get_total_tasks() + self.estimated_cost = task.price + else: + self.outputs = [] + self.subtasks_count = 0 + self.estimated_cost = 0 + def __setattr__(self, key, value): super().__setattr__(key, value) # Set last update time when changing status to other than 'restarted' @@ -44,6 +53,7 @@ def to_dictionary(self): 'time_remaining': self.remaining_time, 'last_updated': getattr(self, 'last_update_time', None), 'status': self.status.value, + 'status_message': self.status_message, 'estimated_cost': getattr(self, 'estimated_cost', None), 'estimated_fee': getattr(self, 'estimated_fee', None) } @@ -142,6 +152,8 @@ def serialize_status(cls, value: SubtaskStatus): class TaskStatus(Enum): + creating = "Creating" + errorCreating = "Error creating" notStarted = "Not started" creatingDeposit = "Creating the deposit" sending = "Sending" diff --git a/tests/apps/blender/benchmark/test_blenderbenchmark.py b/tests/apps/blender/benchmark/test_blenderbenchmark.py index 4568bd6d8e..4be17d2976 100644 --- a/tests/apps/blender/benchmark/test_blenderbenchmark.py +++ b/tests/apps/blender/benchmark/test_blenderbenchmark.py @@ -65,6 +65,7 @@ def test_run(self): task_definition, dir_manager ).build() + task.initialize(dir_manager) success = mock.MagicMock() error = mock.MagicMock() diff --git a/tests/apps/blender/task/test_blenderrendertask.py b/tests/apps/blender/task/test_blenderrendertask.py index afe593e801..379445ba3b 100644 --- a/tests/apps/blender/task/test_blenderrendertask.py +++ b/tests/apps/blender/task/test_blenderrendertask.py @@ -654,6 +654,7 @@ def test_build(self): dir_manager=DirManager( self.tempdir)) blender_task = builder.build() + blender_task.initialize(builder.dir_manager) self.assertIsInstance(blender_task, BlenderRenderTask) def test_build_dictionary_samples(self): diff --git a/tests/golem/core/test_deferred.py b/tests/golem/core/test_deferred.py index e569b39b91..5d565e4213 100644 --- a/tests/golem/core/test_deferred.py +++ b/tests/golem/core/test_deferred.py @@ -1,51 +1,88 @@ import unittest +from unittest import mock -from twisted.internet.defer import Deferred +from twisted.internet.defer import Deferred, succeed, fail from twisted.python.failure import Failure -from golem.core.deferred import chain_function +from golem.core.deferred import chain_function, DeferredSeq + + +@mock.patch('golem.core.deferred.deferToThread', lambda x: succeed(x())) +@mock.patch('twisted.internet.reactor', mock.Mock(), create=True) +class TestDeferredSeq(unittest.TestCase): + + def test_init_empty(self): + assert not DeferredSeq()._seq + + def test_init_with_functions(self): + def fn_1(): + pass + + def fn_2(): + pass + + assert DeferredSeq().push(fn_1).push(fn_2)._seq == [ + (fn_1, (), {}), + (fn_2, (), {}), + ] + + @mock.patch('golem.core.deferred.DeferredSeq._execute') + def test_execute_empty(self, execute): + deferred_seq = DeferredSeq() + with mock.patch('golem.core.deferred.DeferredSeq._execute', + wraps=deferred_seq._execute): + deferred_seq.execute() + assert execute.called + + def test_execute_functions(self): + fn_1, fn_2 = mock.Mock(), mock.Mock() + + DeferredSeq().push(fn_1).push(fn_2).execute() + assert fn_1.called + assert fn_2.called + + def test_execute_interrupted(self): + fn_1, fn_2, fn_4 = mock.Mock(), mock.Mock(), mock.Mock() + + def fn_3(*_): + raise Exception + + def def2t(f, *args, **kwargs) -> Deferred: + try: + return succeed(f(*args, **kwargs)) + except Exception as exc: # pylint: disable=broad-except + return fail(exc) + + with mock.patch('golem.core.deferred.deferToThread', def2t): + DeferredSeq().push(fn_1).push(fn_2).push(fn_3).push(fn_4).execute() + + assert fn_1.called + assert fn_2.called + assert not fn_4.called class TestChainFunction(unittest.TestCase): def test_callback(self): - deferred = Deferred() - deferred.callback(True) - - def fn(): - d = Deferred() - d.callback(True) - return d + deferred = succeed(True) + result = chain_function(deferred, lambda: succeed(True)) - result = chain_function(deferred, fn) assert result.called assert result.result assert not isinstance(result, Failure) def test_main_errback(self): - deferred = Deferred() - deferred.errback(Exception()) + deferred = fail(Exception()) + result = chain_function(deferred, lambda: succeed(True)) - def fn(): - d = Deferred() - d.callback(True) - return d - - result = chain_function(deferred, fn) assert result.called assert result.result assert isinstance(result.result, Failure) def test_fn_errback(self): - deferred = Deferred() - deferred.callback(True) - - def fn(): - d = Deferred() - d.errback(Exception()) - return d + deferred = succeed(True) + result = chain_function(deferred, lambda: fail(Exception())) - result = chain_function(deferred, fn) assert result.called assert result.result assert isinstance(result.result, Failure) diff --git a/tests/golem/docker/test_docker_blender_task.py b/tests/golem/docker/test_docker_blender_task.py index cf56eb7a19..42100d3888 100644 --- a/tests/golem/docker/test_docker_blender_task.py +++ b/tests/golem/docker/test_docker_blender_task.py @@ -91,6 +91,7 @@ def test_build(self): dir_manager, ) task = builder.build() + task.initialize(builder.dir_manager) assert isinstance(task, BlenderRenderTask) assert not task.compositing assert not task.use_frames diff --git a/tests/golem/docker/test_docker_task.py b/tests/golem/docker/test_docker_task.py index 6be9a6de25..8c24cf4a03 100644 --- a/tests/golem/docker/test_docker_task.py +++ b/tests/golem/docker/test_docker_task.py @@ -101,6 +101,7 @@ def _get_test_task(self) -> Task: dir_manager=DirManager(self.tempdir) ) task = task_builder.build() + task.initialize(task_builder.dir_manager) task.__class__._update_task_preview = lambda self_: () task.max_pending_client_results = 5 return task @@ -111,7 +112,7 @@ def _run_task(self, task: Task, timeout: int = 60 * 5, *_) \ -> Optional[DockerTaskThread]: task_id = task.header.task_id node_id = '0xdeadbeef' - extra_data = task.query_extra_data(1.0, 0, node_id) + extra_data = task.query_extra_data(1.0, node_id, 'node_name') ctd = extra_data.ctd ctd['deadline'] = timeout_to_deadline(timeout) diff --git a/tests/golem/task/test_rpc.py b/tests/golem/task/test_rpc.py index c7bf9b9262..f063f23277 100644 --- a/tests/golem/task/test_rpc.py +++ b/tests/golem/task/test_rpc.py @@ -122,7 +122,11 @@ def test_create_task(self, *_): t = dummytaskstate.DummyTaskDefinition() t.name = "test" - result = self.provider.create_task(t.to_dict()) + def execute(f, *args, **kwargs): + return defer.succeed(f(*args, **kwargs)) + + with mock.patch('golem.core.deferred.deferToThread', execute): + result = self.provider.create_task(t.to_dict()) rpc.enqueue_new_task.assert_called() self.assertEqual(result, ('task_id', None)) @@ -748,6 +752,7 @@ def setUp(self): rpc.enqueue_new_task(self.client, self.task), ) + @mock.patch('twisted.internet.reactor', mock.Mock()) @mock.patch("golem.task.rpc.prepare_and_validate_task_dict") def test_create_task(self, mock_method, *_): t = dummytaskstate.DummyTaskDefinition() diff --git a/tests/golem/task/test_taskmanager.py b/tests/golem/task/test_taskmanager.py index 390c76e8b4..6f4d9fc45a 100644 --- a/tests/golem/task/test_taskmanager.py +++ b/tests/golem/task/test_taskmanager.py @@ -193,6 +193,8 @@ def _get_test_dummy_task(self, task_id): dtb = DummyTaskBuilder(dt_p2p_factory.Node(node_name="MyNode"), tdd, dm) dummy_task = dtb.build() + dummy_task.initialize(dtb.dir_manager) + header = self._get_task_header(task_id=task_id, timeout=120, subtask_timeout=120) dummy_task.header = header @@ -328,8 +330,7 @@ def test_get_next_subtask(self, *_): cached_node.node_field.node_name ) - task_state.status = self.tm.activeStatus[0] - + task_state.status = TaskStatus.computing wrong_task = not self.tm.is_my_task("xyz") subtask = self.tm.get_next_subtask( "DEF", "xyz", 1000, 10, 1, 10) @@ -837,7 +838,7 @@ def test_check_timeouts(self, *_): self.tm.start_task(t.header.task_id) self.assertIn( self.tm.tasks_states["xyz"].status, - self.tm.activeStatus, + self.tm.ACTIVE_STATUS, ) with freeze_time(start_time + datetime.timedelta(seconds=2)): self.tm.check_timeouts() @@ -1357,7 +1358,7 @@ def test_check_timeouts_removes_output_directory(self, mock_get_dir, *_): self.tm.start_task(task.header.task_id) self.assertIn( self.tm.tasks_states['xyz'].status, - self.tm.activeStatus, + self.tm.ACTIVE_STATUS, ) with freeze_time(start_time + datetime.timedelta(seconds=2)): @@ -1486,7 +1487,6 @@ def verify(_): self.assertEqual(new_subtask_state.stderr, 'stderr') self.assertEqual(new_subtask_state.results, ['result']) - patch.object(self.tm, 'notice_task_updated').start() deferred = self.tm._copy_subtask_results( old_task=old_task, @@ -1549,7 +1549,6 @@ def setUp(self): definition.max_price = 1 * 10 ** 18 definition.resolution = [1920, 1080] definition.resources = [str(uuid.uuid4()) for _ in range(5)] - #definition.output_file = os.path.join(self.tempdir, 'somefile') definition.main_scene_file = dummy_path definition.options.frames = [1] self.task = BlenderRenderTask( @@ -1570,5 +1569,14 @@ def test_task_doesnt_need_computation(self, *_): self.task.last_task = self.task.total_tasks self.assertFalse(self.tm.task_needs_computation(self.task_id)) - def test_needs_computation(self, *_): + def test_needs_computation_while_creating(self, *_): + self.assertFalse(self.tm.task_needs_computation(self.task_id)) + + def test_needs_computation_when_added(self, *_): + keys_auth = Mock() + keys_auth._private_key = b'a' * 32 + keys_auth.sign.return_value = 'sig' + + self.tm.keys_auth = keys_auth + self.tm.add_new_task(self.task) self.assertTrue(self.tm.task_needs_computation(self.task_id)) diff --git a/tests/golem/task/test_taskserver.py b/tests/golem/task/test_taskserver.py index 9c86f9526a..43e3088426 100644 --- a/tests/golem/task/test_taskserver.py +++ b/tests/golem/task/test_taskserver.py @@ -40,7 +40,7 @@ WaitingTaskFailure, WaitingTaskResult, ) -from golem.task.taskstate import TaskState, TaskOp +from golem.task.taskstate import TaskState, TaskOp, TaskStatus from golem.tools.assertlogs import LogTestCase from golem.tools.testwithreactor import TestDatabaseWithReactor @@ -872,8 +872,7 @@ def test_results(self, trust, *_): ts.task_manager.keys_auth._private_key = b'a' * 32 ts.task_manager.add_new_task(task_mock) - ts.task_manager.tasks_states[task_id].status = \ - ts.task_manager.activeStatus[0] + ts.task_manager.tasks_states[task_id].status = TaskStatus.computing subtask = ts.task_manager.get_next_subtask( "DEF", task_id,