Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

cherry-pick from 4924 rebased against 0.22 #4933

Merged
merged 3 commits into from
Nov 26, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 43 additions & 46 deletions apps/wasm/task.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os
from dataclasses import dataclass
from copy import deepcopy
from pathlib import Path, PurePath
from typing import (
Expand All @@ -15,6 +14,7 @@
Set
)
import logging
from dataclasses import dataclass

from ethereum.utils import denoms

Expand Down Expand Up @@ -64,13 +64,15 @@ class VbrSubtask:
and subtask related data management from the client code.
"""
# __DEBUG_COUNTER: int = 0
def __init__(self, id_gen, name, params, redundancy_factor):
def __init__(
self, id_gen: Callable[[], str], name: str, params: Dict[str, str],
redundancy_factor: int
):
self.id_gen = id_gen
self.name = name
self.params = params
self.result = None
self.result: Optional[TaskResult] = None
self.redundancy_factor = redundancy_factor

self.subtasks: Dict[str, SubtaskInstance] = {}
self.verifier = BucketVerifier(
redundancy_factor, WasmTask.cmp_results, referee_count=1)
Expand Down Expand Up @@ -104,15 +106,15 @@ def get_instance(self, s_id) -> SubtaskInstance:
return self.subtasks[s_id]

def get_instances(self) -> List[str]:
return self.subtasks.keys()
return list(self.subtasks.keys())

def add_result(self, s_id: str, task_result: Optional[TaskResult]):
result_files = task_result.files if task_result else None
self.verifier.add_result(
self.subtasks[s_id].actor, result_files)
self.subtasks[s_id].results = task_result

def get_result(self) -> TaskResult:
def get_result(self) -> Optional[TaskResult]:
return self.result

def is_finished(self) -> bool:
Expand Down Expand Up @@ -140,19 +142,24 @@ def get_subtask_count(self) -> int:

def get_tasks_left(self) -> int:
return self.get_subtask_count() - len(
[s for s in self.subtasks.values()
if s['status'] != SubtaskStatus.finished]
)
[
s for s in self.subtasks.values()
if s.status != SubtaskStatus.finished
])

def restart_subtask(self, subtask_id: str):
subtask = self.subtasks[subtask_id]
if subtask['status'] != SubtaskStatus.starting:
raise ValueError("Cannot restart subtask with status: " +
str(subtask['status']))
if subtask['results'] is not None:
if subtask.status != SubtaskStatus.starting:
raise ValueError(
"Cannot restart subtask with status: " + str(
subtask.status))
if subtask.results is not None and subtask.results.files:
logger.warning(
"results subtask status=%s results=%s", str(subtask.status),
repr(subtask.results))
raise ValueError("Cannot restart computed VbR subtask")
self.verifier.remove_actor(subtask['actor'])
subtask['status'] = SubtaskStatus.restarted
self.verifier.remove_actor(subtask.actor)
subtask.status = SubtaskStatus.restarted


class WasmTaskOptions(Options):
Expand Down Expand Up @@ -218,9 +225,9 @@ def to_dict(self) -> dict:


class WasmTask(CoreTask): # pylint: disable=too-many-public-methods
REQUESTOR_MARKET_STRATEGY: Type[RequestorWasmMarketStrategy] =\
REQUESTOR_MARKET_STRATEGY: Type[RequestorWasmMarketStrategy] = \
RequestorWasmMarketStrategy
PROVIDER_MARKET_STRATEGY: Type[ProviderWasmMarketStrategy] =\
PROVIDER_MARKET_STRATEGY: Type[ProviderWasmMarketStrategy] = \
ProviderWasmMarketStrategy

ENVIRONMENT_CLASS = WasmTaskEnvironment
Expand All @@ -238,7 +245,6 @@ def __init__(self, task_definition: WasmTaskDefinition,
self.task_definition: WasmTaskDefinition = task_definition
self.options: WasmTaskOptions = task_definition.options
self.subtasks: List[VbrSubtask] = []
self.subtasks_given = {}

for s_name, s_params in self.options.get_subtask_iterator():
s_params = {
Expand All @@ -262,10 +268,8 @@ def query_extra_data(
next_subtask = s.new_instance(node_id)
if next_subtask:
s_id, s_params = next_subtask
self.subtasks_given[s_id] = {
'status': SubtaskStatus.starting,
'node_id': node_id
}
self.subtasks_given[s_id] = dict(
status=SubtaskStatus.starting, node_id=node_id)
ctd = self._new_compute_task_def(s_id, s_params, perf_index)

return Task.ExtraData(ctd=ctd)
Expand Down Expand Up @@ -315,7 +319,7 @@ def _resolve_subtasks_statuses(self, subtask: VbrSubtask):

def _handle_vbr_subtask_result(self, subtask: VbrSubtask):
# save the results but only if verification was successful
result: TaskResult = subtask.get_result()
result: Optional[TaskResult] = subtask.get_result()
if result is not None:
self.save_results(subtask.name, result.files)
else:
Expand All @@ -333,7 +337,7 @@ def computation_finished(
WasmTask.CALLBACKS[subtask_id] = verification_finished
self.subtasks_given[subtask_id]['status'] = SubtaskStatus.verifying

self.interpret_task_results(subtask_id, task_result.files)
self.interpret_task_results(subtask_id, task_result)
task_result.files = self.results[subtask_id]

subtask = self._find_vbrsubtask_by_id(subtask_id)
Expand All @@ -348,12 +352,16 @@ def computation_finished(
s_instance = subtask.get_instance(s_id)
if not s_instance.results:
continue
subtask_usages.append(
(s_instance.actor.uuid,
s_id,
s_instance.results.stats.
cpu_stats.cpu_usage['total_usage'] * NANOSECOND)
)
if s_instance.results.stats.cpu_stats is not None:
subtask_usages.append(
(
s_instance.actor.uuid, s_id,
s_instance.results.stats.
cpu_stats.cpu_usage['total_usage'] * NANOSECOND))
else:
logger.warning(
"invalid result stats %s",
repr(s_instance.results.stats))
self.REQUESTOR_MARKET_STRATEGY.report_subtask_usages(
self.task_definition.task_id,
subtask_usages
Expand Down Expand Up @@ -381,7 +389,7 @@ def accept_results(self, subtask_id, result_files):
pass

def query_extra_data_for_test_task(self) -> ComputeTaskDef:
next_subtask_instance = self.subtasks[0]\
next_subtask_instance = self.subtasks[0] \
.new_instance("benchmark_node_id")

if not next_subtask_instance:
Expand Down Expand Up @@ -414,20 +422,8 @@ def filter_task_results(

return filtered_task_results

def interpret_task_results(
self, subtask_id: str, task_results: List[str],
sort: bool = True) -> None:
self.stdout[subtask_id] = ""
self.stderr[subtask_id] = ""

self.results[subtask_id] = self.filter_task_results(
task_results, subtask_id)
if sort:
self.results[subtask_id].sort()

def should_accept_client(self,
node_id: str,
offer_hash: str) -> AcceptClientVerdict:
def should_accept_client(
self, node_id: str, offer_hash: str) -> AcceptClientVerdict:
"""Deciding whether to accept particular node_id for next task
computation.

Expand Down Expand Up @@ -516,7 +512,8 @@ def get_results(self, subtask_id):
return results.files if results else []

@classmethod
def calculate_subtask_budget(cls, task_definition: WasmTaskDefinition): # type:ignore # noqa pylint:disable=line-too-long
def calculate_subtask_budget(cls, task_definition: TaskDefinition) -> int:
assert isinstance(task_definition, WasmTaskDefinition)
num_payable_subtasks = len(task_definition.options.subtasks) * \
(cls.REDUNDANCY_FACTOR + 1)
return task_definition.budget // num_payable_subtasks
Expand Down