Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Handle task failure during creation
Browse files Browse the repository at this point in the history
  • Loading branch information
jiivan committed Jan 22, 2020
1 parent c2643f6 commit 65df132
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 5 deletions.
18 changes: 15 additions & 3 deletions golem/task/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,20 @@ def _create_task_error(e, _self, task_dict, *args, **_kwargs) \
return None, str(e)


def _restart_task_error(e, _self, task_id, *args, **_kwargs) \
-> typing.Tuple[None, str]:
def _restart_task_error(
e: Exception,
self: 'ClientProvider',
task_id: str,
*_args, **kwargs
) -> typing.Tuple[None, str]:
logger.error("Cannot restart task %r: %s", task_id, e)
try:
new_task = kwargs['new_task']
self.task_manager.put_task_in_failed_state(new_task.task_id)
except KeyError:
logger.debug('No new task given')
except Exception: # pylint: disable=broad-except
logger.exception("Can't put task in failed state. task_id=%r", task_id)

if hasattr(e, 'to_dict'):
return None, rpc_utils.int_to_string(e.to_dict())
Expand Down Expand Up @@ -776,8 +787,9 @@ def restart_legacy_task(
deferred.addErrback(
lambda failure: _restart_task_error(
e=failure.value,
_self=self,
self=self,
task_id=task_id,
new_task=new_task,
)
)
self.task_manager.put_task_in_restarted_state(task_id)
Expand Down
33 changes: 31 additions & 2 deletions golem/task/taskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
Iterable,
List,
Optional,
Tuple,
Type,
TYPE_CHECKING,
)
Expand All @@ -28,7 +27,6 @@

from apps.appsmanager import AppsManager
from apps.core.task.coretask import CoreTask
from apps.core.task.coretaskstate import TaskDefinition
from apps.wasm.environment import WasmTaskEnvironment

from golem import model
Expand Down Expand Up @@ -59,6 +57,7 @@

if TYPE_CHECKING:
# pylint:disable=unused-import, ungrouped-imports
from typing import Tuple
from apps.appsmanager import App
from apps.core.task.coretaskstate import TaskDefinition
from golem.task.taskbase import TaskTypeInfo, TaskBuilder
Expand Down Expand Up @@ -979,6 +978,36 @@ def put_task_in_restarted_state(self, task_id, clear_tmp=True):
logger.info("Task %s put into restarted state", task_id)
self.notice_task_updated(task_id, op=TaskOp.RESTARTED)

@handle_task_key_error
def put_task_in_failed_state(
self,
task_id: str,
task_status=TaskStatus.errorCreating,
) -> None:
assert not task_status.is_active()
assert not task_status.is_completed()
task_state = self.tasks_states[task_id]
if task_state.status.is_completed():
logger.debug(
"Task is already completed. Won't change status."
" current_status=%(current_status)s,"
" refused_status=%(refused_status)s",
{
'current_status': task_state.status,
'refused_status': task_status,
},
)
return

task_state.status = task_state

logger.info(
"Task %s put into failed state. task_status=%s",
task_id,
task_state,
)
self.notice_task_updated(task_id, op=TaskOp.ABORTED)

@handle_subtask_key_error
def restart_subtask(
self,
Expand Down
17 changes: 17 additions & 0 deletions tests/golem/task/test_rpc.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# pylint: disable=protected-access,too-many-ancestors,too-many-lines
# pylint: disable=arguments-differ
import copy
import itertools
from pathlib import Path
Expand Down Expand Up @@ -28,6 +29,7 @@
from golem.task import taskserver
from golem.task import taskstate
from golem.task import tasktester
from golem.task.rpc import _restart_task_error
from golem.task.rpc import ClientProvider
from tests.golem import test_client
from tests.golem.test_client import TestClientBase
Expand Down Expand Up @@ -364,6 +366,21 @@ def add_resources(*_args, **_kwargs):
output_path = Path(task_def['options']['output_path'])
self.assertEqual(str(output_path.parent), task_output_path)

def test_restart_task_error(self):
result = _restart_task_error(
e=RuntimeError('Test error'),
self=self.provider,
task_id='task_id',
new_task=Mock(task_id='new_task_id'),
)
self.assertEqual(
result,
(
None,
'Test error',
),
)


class TestGetMaskForTask(test_client.TestClientBase):
def test_get_mask_for_task(self, *_):
Expand Down

0 comments on commit 65df132

Please sign in to comment.