Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Merge pull request #4278 from golemfactory/subtasks-restart-estimation
Browse files Browse the repository at this point in the history
Implement subtasks restart cost estimation RPC
  • Loading branch information
kmazurek authored Jun 7, 2019
2 parents 4187730 + f537b6b commit c9fed0a
Show file tree
Hide file tree
Showing 7 changed files with 389 additions and 68 deletions.
133 changes: 101 additions & 32 deletions golem/task/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from golem.core import deferred as golem_deferred
from golem.core import simpleserializer
from golem.ethereum import exceptions as eth_exceptions
from golem.model import Actor
from golem.resource import resource
from golem.rpc import utils as rpc_utils
from golem.task import taskbase, taskkeeper, taskstate, tasktester
Expand Down Expand Up @@ -415,7 +416,7 @@ def _restart_task_error(e, _self, task_id, **_kwargs):
return None, str(e)


def _restart_subtasks_error(e, _self, task_id, subtask_ids, **_kwargs) \
def _restart_subtasks_error(e, _self, task_id, subtask_ids, *args, **_kwargs) \
-> typing.Union[str, typing.Dict]:
logger.error("Failed to restart subtasks. task_id: %r, subtask_ids: %r, %s",
task_id, subtask_ids, e)
Expand Down Expand Up @@ -530,8 +531,12 @@ def _validate_lock_funds_possibility(

@rpc_utils.expose('comp.task.restart')
@safe_run(_restart_task_error)
def restart_task(self, task_id: str, force: bool = False) \
-> typing.Tuple[typing.Optional[str], typing.Optional[str]]:
def restart_task(
self,
task_id: str,
force: bool = False,
disable_concent: bool = False
) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]:
"""
:return: (new_task_id, None) on success; (None, error_message)
on failure
Expand All @@ -552,7 +557,7 @@ def restart_task(self, task_id: str, force: bool = False) \
self._validate_enough_funds_to_pay_for_task(
task.subtask_price,
task.get_total_tasks(),
task.header.concent_enabled,
False if disable_concent else task.header.concent_enabled,
force
)

Expand All @@ -564,7 +569,10 @@ def restart_task(self, task_id: str, force: bool = False) \
except KeyError:
return None, "Task not found: '{}'".format(task_id)

task_dict.pop('id', None)
del task_dict['id']
if disable_concent:
task_dict['concent_enabled'] = False

prepare_and_validate_task_dict(self.client, task_dict)
new_task = self.task_manager.create_task(task_dict)
validate_client(self.client)
Expand Down Expand Up @@ -592,11 +600,14 @@ def restart_subtasks(
disable_concent: bool = False
) -> typing.Optional[typing.Union[str, typing.Dict]]:
"""
Restarts a set of subtasks from the same task. If the specified task is
already finished it will be restarted, clearing the state of the given
subtasks and copying over the remaining results.
Restarts a set of subtasks from the given task. If the specified task is
already finished, all failed subtasks will be restarted along with the
set provided as a parameter. Finished subtasks will have their results
copied over to the newly created task.
:param task_id: the ID of the task which contains the given subtasks.
:param subtask_ids: the set of subtask IDs which should be restarted.
If this is empty and the task is finished, all of the task's subtasks
marked as failed will be restarted.
:param ignore_gas_price: if True, this will ignore long transaction time
errors and proceed with the restart.
:param disable_concent: setting this flag to True will result in forcing
Expand All @@ -605,19 +616,17 @@ def restart_subtasks(
:return: In case of any errors, returns the representation of the error
(either a string or a dict). Otherwise, returns None.
"""
try:
task = self.task_manager.tasks[task_id]
except KeyError:
err_msg = f'Task not found: {task_id!r}'
logger.error(err_msg)
return err_msg
task = self.task_manager.tasks.get(task_id)
if not task:
return f'Task not found: {task_id!r}'

self._validate_enough_funds_to_pay_for_task(
task.subtask_price,
len(subtask_ids),
False if disable_concent else task.header.concent_enabled,
ignore_gas_price
)
subtasks_to_restart = set(subtask_ids)

for sub_id in subtasks_to_restart:
if self.task_manager.subtask_to_task(
sub_id, Actor.Requestor) != task_id:
return f'Subtask does not belong to the given task.' \
f'task_id: {task_id}, subtask_id: {sub_id}'

logger.debug('restart_subtasks. task_id=%r, subtask_ids=%r, '
'ignore_gas_price=%r, disable_concent=%r', task_id,
Expand All @@ -626,6 +635,13 @@ def restart_subtasks(
task_state = self.client.task_manager.tasks_states[task_id]

if task_state.status.is_active():
self._validate_enough_funds_to_pay_for_task(
task.subtask_price,
len(subtask_ids),
False if disable_concent else task.header.concent_enabled,
ignore_gas_price
)

for subtask_id in subtask_ids:
self.client.restart_subtask(subtask_id)
else:
Expand All @@ -649,19 +665,19 @@ def restart_frame_subtasks(
self,
task_id: str,
frame: int
):
) -> typing.Optional[typing.Union[str, typing.Dict]]:
logger.debug('restart_frame_subtasks. task_id=%r, frame=%r',
task_id, frame)

frame_subtasks: typing.Dict[str, dict] =\
frame_subtasks: typing.FrozenSet[str] =\
self.task_manager.get_frame_subtasks(task_id, frame)

if not frame_subtasks:
logger.error('Frame restart failed, frame has no subtasks.'
'task_id=%r, frame=%r', task_id, frame)
return

self.restart_subtasks(task_id, frame_subtasks)
return self.restart_subtasks(task_id, list(frame_subtasks))

@safe_run(_restart_subtasks_error)
def _restart_finished_task_subtasks(
Expand All @@ -676,11 +692,6 @@ def _restart_finished_task_subtasks(
subtask_ids, ignore_gas_price)

try:
self.task_manager.put_task_in_restarted_state(
task_id,
clear_tmp=False,
)

old_task = self.task_manager.tasks[task_id]

finished_subtask_ids = set(
Expand All @@ -689,6 +700,18 @@ def _restart_finished_task_subtasks(
)
subtask_ids_to_copy = finished_subtask_ids - set(subtask_ids)

self._validate_enough_funds_to_pay_for_task(
old_task.subtask_price,
old_task.get_total_tasks() - len(subtask_ids_to_copy),
False if disable_concent else old_task.header.concent_enabled,
ignore_gas_price
)

self.task_manager.put_task_in_restarted_state(
task_id,
clear_tmp=False,
)

logger.debug('_restart_finished_task_subtasks. '
'subtask_ids_to_copy=%r', subtask_ids_to_copy)
except self.task_manager.AlreadyRestartedError:
Expand Down Expand Up @@ -740,6 +763,49 @@ def run_test_task(self, task_dict) -> bool:
# Don't wait for _deferred
return True

@rpc_utils.expose('comp.task.subtasks.estimated.cost')
def get_estimated_subtasks_cost(
self,
task_id: str,
subtask_ids: typing.List[str]
) -> typing.Tuple[typing.Optional[dict], typing.Optional[str]]:
"""
Estimates the cost of restarting an array of subtasks from a given task.
If the specified task is finished, all of the failed subtasks from that
task will be added to the estimation.
:param task_id: ID of the task containing the subtasks to be restarted.
:param subtask_ids: a list of subtask IDs which should be restarted. If
one of the subtasks does not belong to the given task, an error will be
returned.
:return: a result, error tuple. When the result is present the error
should be None (and vice-versa).
"""
task = self.task_manager.tasks.get(task_id)
if not task:
return None, f'Task not found: {task_id}'

subtasks_to_restart = set(subtask_ids)

for sub_id in subtasks_to_restart:
if self.task_manager.subtask_to_task(
sub_id, Actor.Requestor) != task_id:
return None, f'Subtask does not belong to the given task.' \
f'task_id: {task_id}, subtask_id: {sub_id}'

if self.task_manager.task_finished(task_id):
failed_subtask_ids = set(
sub_id for sub_id, subtask in task.subtasks_given.items()
if subtask['status'] == taskstate.SubtaskStatus.failure
)
subtasks_to_restart |= failed_subtask_ids

result = self._get_cost_estimation(
len(subtasks_to_restart),
task.subtask_price
)

return result, None

@rpc_utils.expose('comp.tasks.estimated.cost')
def get_estimated_cost(
self,
Expand Down Expand Up @@ -792,6 +858,12 @@ def get_estimated_cost(
computation_time=subtask_timeout
)

result = self._get_cost_estimation(subtask_count, subtask_price)

logger.info('Estimated task cost. result=%r', result)
return result, None

def _get_cost_estimation(self, subtask_count: int, subtask_price: int):
estimated_gnt: int = subtask_count * subtask_price
estimated_eth: int = self.client \
.transaction_system.eth_for_batch_payment(subtask_count)
Expand All @@ -800,7 +872,7 @@ def get_estimated_cost(
estimated_deposit_eth: int = self.client.transaction_system \
.eth_for_deposit()

result = {
return {
'GNT': str(estimated_gnt),
'ETH': str(estimated_eth),
'deposit': {
Expand All @@ -810,9 +882,6 @@ def get_estimated_cost(
},
}

logger.info('Estimated task cost. result=%r', result)
return result, None

@rpc_utils.expose('comp.task.rendering.task_fragments')
def get_fragments(self, task_id: str) -> \
typing.Tuple[
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import time
import typing

from ...base import NodeTestPlaybook
from ...test_config_base import NodeId


class Playbook(NodeTestPlaybook):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.previous_task_id = None

def step_restart_failed_subtasks(self):
def on_success(result):
print(f'Restarted failed subtasks.'
f'task_id={self.previous_task_id}.')
self.next()

return self.call(NodeId.requestor,
'comp.task.subtasks.restart',
self.previous_task_id,
[],
on_success=on_success)

def step_wait_task_timeout(self):
def on_success(result):
if result['status'] == 'Timeout':
print("Task timed out as expected.")
self.previous_task_id = self.task_id
self.task_id = None
self.next()
elif result['status'] == 'Finished':
print("Task finished unexpectedly, failing test :(")
self.fail()
else:
print("Task status: {} ... ".format(result['status']))
time.sleep(10)

return self.call(NodeId.requestor, 'comp.task', self.task_id,
on_success=on_success)

steps: typing.Tuple = NodeTestPlaybook.initial_steps + (
NodeTestPlaybook.step_create_task,
NodeTestPlaybook.step_get_task_id,
NodeTestPlaybook.step_get_task_status,
step_wait_task_timeout,
NodeTestPlaybook.step_stop_nodes,
NodeTestPlaybook.step_restart_nodes,
) + NodeTestPlaybook.initial_steps + (
NodeTestPlaybook.step_get_known_tasks,
step_restart_failed_subtasks,
NodeTestPlaybook.step_get_task_id,
NodeTestPlaybook.step_get_task_status,
NodeTestPlaybook.step_wait_task_finished,
NodeTestPlaybook.step_verify_output,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from ...test_config_base import (
TestConfigBase, make_node_config_from_env, NodeId)


class TestConfig(TestConfigBase):
def __init__(self):
super().__init__()
provider_config = make_node_config_from_env(NodeId.provider.value, 0)
provider_config.script = 'provider/cannot_compute'
provider_config_2 = make_node_config_from_env(NodeId.provider.value, 0)

requestor_config = make_node_config_from_env(NodeId.requestor.value, 1)
requestor_config_2 = make_node_config_from_env(
NodeId.requestor.value, 1)
requestor_config_2.script = 'requestor/always_accept_provider'

self.nodes[NodeId.requestor] = [
requestor_config,
requestor_config_2,
]

self.nodes[NodeId.provider] = [
provider_config,
provider_config_2
]
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from functools import partial
import typing

from ...base import NodeTestPlaybook
Expand Down
3 changes: 3 additions & 0 deletions scripts/node_integration_tests/tests/test_golem.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,6 @@ def test_large_result(self):
'golem.separate_hyperg',
**{'task-package': 'cubes', 'task-settings': '4k'},
)

def test_restart_failed_subtasks(self):
self._run_test('golem.restart_failed_subtasks')
Loading

0 comments on commit c9fed0a

Please sign in to comment.