Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Reporting exceeded CPU limit #4851

Merged
merged 4 commits into from
Nov 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions golem/docker/task_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from golem.docker.job import DockerJob
from golem.environments.environmentsmanager import EnvironmentsManager
from golem.envs.docker import DockerBind
from golem.task.taskthread import TaskThread, JobException, TimeoutException
from golem.task.taskthread import TaskThread, JobException, TimeoutException, \
BudgetExceededException
from golem.vm.memorychecker import MemoryChecker

if TYPE_CHECKING:
Expand All @@ -20,8 +21,10 @@
logger = logging.getLogger(__name__)


# Exit code that _run_docker_job reports as BudgetExceededException instead of
# a plain JobException.
EXIT_CODE_BUDGET_EXCEEDED = 111
# Generic failure text; the {} placeholder is filled with the exit code.
EXIT_CODE_MESSAGE = "Subtask computation failed with exit code {}"
# Probable cause for the exit codes that have a known explanation.
EXIT_CODE_PROBABLE_CAUSES = {
EXIT_CODE_BUDGET_EXCEEDED: "CPU budget exceeded",
137: "probably killed by out-of-memory killer"
}

Expand Down Expand Up @@ -220,7 +223,12 @@ def _run_docker_job(self) -> Optional[int]:
logger.warning(f'Task error - exit_code={exit_code}\n'
f'stderr:\n{std_err}\n'
f'tail of stdout:\n{std_out}\n')
raise JobException(self._exit_code_message(exit_code))

if exit_code == EXIT_CODE_BUDGET_EXCEEDED:
raise BudgetExceededException(
self._exit_code_message(exit_code))
else:
raise JobException(self._exit_code_message(exit_code))

return estm_mem

Expand Down
3 changes: 2 additions & 1 deletion golem/task/server/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ def send_task_failure(waiting_task_failure) -> None:
waiting_task_failure.owner.key,
message.tasks.TaskFailure(
task_to_compute=task_to_compute,
err=waiting_task_failure.err_msg
err=waiting_task_failure.err_msg,
reason=waiting_task_failure.reason
),
)
19 changes: 12 additions & 7 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from threading import Lock

from dataclasses import dataclass
from golem_messages.message.tasks import ComputeTaskDef, TaskHeader
from golem_messages.message.tasks import ComputeTaskDef, TaskHeader, TaskFailure
from golem_task_api import ProviderAppClient, constants as task_api_constants
from golem_task_api.envs import DOCKER_CPU_ENV_ID, DOCKER_GPU_ENV_ID
from pydispatch import dispatcher
Expand All @@ -33,7 +33,7 @@
from golem.task.timer import ProviderTimer
from golem.vm.vm import PythonProcVM, PythonTestVM

from .taskthread import TaskThread
from .taskthread import TaskThread, BudgetExceededException, TimeoutException

if TYPE_CHECKING:
from .taskserver import TaskServer # noqa pylint:disable=unused-import
Expand Down Expand Up @@ -519,15 +519,20 @@ def task_computed(self, task_thread: TaskThread) -> None:
was_success = False

if task_thread.error or task_thread.error_msg:

if "Task timed out" in task_thread.error_msg:
reason = TaskFailure.DEFAULT_REASON
# pylint: disable=unidiomatic-typecheck
if type(task_thread.error) is TimeoutException:
self.stats.increase_stat('tasks_with_timeout')
reason = TaskFailure.REASON.TimeExceeded
elif type(task_thread.error) is BudgetExceededException:
reason = TaskFailure.REASON.BudgetExceeded
else:
self.stats.increase_stat('tasks_with_errors')
self.task_server.send_task_failed(
subtask_id,
subtask['task_id'],
task_thread.error_msg,
reason
)

elif task_thread.result and 'data' in task_thread.result:
Expand Down Expand Up @@ -570,7 +575,7 @@ def get_progress(self) -> Optional[ComputingSubtaskStateSnapshot]:
if not self._is_computing() or self.assigned_subtask is None:
return None

c: TaskThread = self.counting_thread
c: Optional[TaskThread] = self.counting_thread
try:
outfilebasename = c.extra_data.get( # type: ignore
'crops'
Expand Down Expand Up @@ -678,8 +683,8 @@ def start_computation(self) -> None: # pylint: disable=too-many-locals
docker_images = [DockerImage(**did) for did in docker_images]
dir_mapping = DockerTaskThread.generate_dir_mapping(resource_dir,
temp_dir)
tt = DockerTaskThread(docker_images, extra_data,
dir_mapping, task_timeout)
tt: TaskThread = DockerTaskThread(
docker_images, extra_data, dir_mapping, task_timeout)
elif self.support_direct_computation:
tt = PyTaskThread(extra_data, resource_dir, temp_dir,
task_timeout)
Expand Down
17 changes: 12 additions & 5 deletions golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,8 +616,12 @@ def _create_and_set_result_package(self, wtr):
) = result

def send_task_failed(
self, subtask_id: str, task_id: str, err_msg: str) -> None:

self,
subtask_id: str,
task_id: str,
err_msg: str,
reason=message.TaskFailure.DEFAULT_REASON
) -> None:
header = self.task_keeper.task_headers[task_id]

if subtask_id not in self.failures_to_send:
Expand All @@ -627,7 +631,8 @@ def send_task_failed(
task_id=task_id,
subtask_id=subtask_id,
err_msg=err_msg,
owner=header.task_owner)
owner=header.task_owner,
reason=reason)

def new_connection(self, session):
if not self.active:
Expand Down Expand Up @@ -1024,11 +1029,12 @@ def should_accept_provider( # pylint: disable=too-many-return-statements
if allowed:
allowed, reason = self.acl_ip.is_allowed(ip_addr)
if not allowed:
logger.info(f'provider is {reason.value}; {ids}')
reason_msg = 'unknown reason' if reason is None else reason.value
logger.info(f'provider is {reason_msg}; {ids}')
self.notify_provider_rejected(
node_id=node_id, task_id=task_id,
reason=self.RejectedReason.acl,
details={'acl_reason': reason.value})
details={'acl_reason': reason_msg})
return False

trust = self.client.get_computing_trust(node_id)
Expand Down Expand Up @@ -1233,3 +1239,4 @@ class WaitingTaskFailure:
owner: 'dt_p2p.Node'
subtask_id: str
task_id: str
reason: message.TaskFailure.REASON
12 changes: 8 additions & 4 deletions golem/task/taskthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import threading
import time
from typing import Any, Dict, Tuple, Union
from typing import Any, Dict, Optional, Tuple, Union

from twisted.internet.defer import Deferred

Expand All @@ -19,6 +19,10 @@ class TimeoutException(JobException):
pass


# Raised by the Docker task thread when a job exits with
# EXIT_CODE_BUDGET_EXCEEDED; TaskComputer.task_computed maps it to
# TaskFailure.REASON.BudgetExceeded when reporting the failure.
class BudgetExceededException(JobException):
pass


class TaskThread(threading.Thread):
result: Union[None, Dict[str, Any], Tuple[Dict[str, Any], int]] = None

Expand All @@ -37,7 +41,7 @@ def __init__(self,
self.res_path = res_path
self.tmp_path = tmp_path
self.lock = threading.Lock()
self.error = False
self.error: Optional[Exception] = None
self.error_msg = ""
self.start_time = time.time()
self.end_time = None
Expand Down Expand Up @@ -71,7 +75,7 @@ def get_progress(self):
with self.lock:
return self.vm.get_progress()

def get_error(self):
def get_error(self) -> Optional[Exception]:
with self.lock:
return self.error

Expand Down Expand Up @@ -103,7 +107,7 @@ def _fail(self, exception: Exception):

logger.warning("Task computing error %s", exception)

self.error = True
self.error = exception
self.error_msg = str(exception)
self.done = True
self._deferred.errback(exception)
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ eth-utils==1.0.3
ethereum==1.6.1
eventlet==0.24.1
fs==2.4.4
Golem-Messages==3.13.0
Golem-Messages==3.14.0
Golem-Smart-Contracts-Interface==1.10.3
golem_task_api==0.21.0
greenlet==0.4.15
Expand Down
2 changes: 1 addition & 1 deletion requirements_to-freeze.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ docker==3.5.0
enforce==0.3.4
eth-utils==1.0.3
ethereum==1.6.1
Golem-Messages==3.13.0
Golem-Messages==3.14.0
Golem-Smart-Contracts-Interface==1.10.3
golem_task_api==0.21.0
html2text==2018.1.9
Expand Down
3 changes: 2 additions & 1 deletion tests/factories/taskserver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

import factory
from golem_messages import idgenerator
from golem_messages import idgenerator, message

from golem import clientconfigdescriptor
from golem.task import taskserver
Expand Down Expand Up @@ -74,3 +74,4 @@ class Meta:
owner = factory.SubFactory(
'golem_messages.factories.datastructures.p2p.Node',
)
reason = message.TaskFailure.DEFAULT_REASON
5 changes: 5 additions & 0 deletions tests/golem/docker/test_docker_task_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,8 @@ def test_exit_code_message(self):
message = DockerTaskThread._exit_code_message(exit_code)
assert message != EXIT_CODE_MESSAGE.format(exit_code)
assert "out-of-memory" in message

exit_code = 111
message = DockerTaskThread._exit_code_message(exit_code)
assert message != EXIT_CODE_MESSAGE.format(exit_code)
assert "CPU budget exceeded" in message
6 changes: 3 additions & 3 deletions tests/golem/task/test_localcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from golem_messages.message import ComputeTaskDef

from golem.docker.manager import DockerManager
from golem.docker.task_thread import DockerTaskThread
from golem.docker.task_thread import DockerTaskThread, JobException
from golem.task.localcomputer import LocalComputer
from golem.task.taskbase import Task
from golem.tools.ci import ci_skip
Expand All @@ -26,7 +26,7 @@ class TestLocalComputer(TestDirFixture):
class TestTaskThread(object):
def __init__(self, result, error_msg):
self.result = result
self.error = False
self.error = None
self.error_msg = error_msg

def test_computer(self):
Expand Down Expand Up @@ -82,7 +82,7 @@ def test_computer(self):
assert self.success_counter == 2

tt = self.TestTaskThread({'data': "some data"}, None)
tt.error = True
tt.error = JobException()
lc.task_computed(tt)
assert self.last_error is None
assert self.error_counter == 4
Expand Down
18 changes: 11 additions & 7 deletions tests/golem/task/test_taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from pydispatch import dispatcher

from golem_messages.message import ComputeTaskDef
from golem_messages.message import ComputeTaskDef, TaskFailure
from twisted.internet import defer
from twisted.trial.unittest import TestCase as TwistedTestCase

Expand All @@ -18,6 +18,7 @@
from golem.envs.docker.cpu import DockerCPUEnvironment
from golem.task.taskcomputer import TaskComputer, PyTaskThread
from golem.task.taskserver import TaskServer
from golem.task.taskthread import JobException
from golem.testutils import DatabaseFixture
from golem.tools.ci import ci_skip
from golem.tools.assertlogs import LogTestCase
Expand Down Expand Up @@ -95,7 +96,10 @@ def test_computation(self): # pylint: disable=too-many-statements
assert tc.counting_thread is None
assert tc.assigned_subtask is None
task_server.send_task_failed.assert_called_with(
"xxyyzz", "xyz", "Host direct task not supported")
"xxyyzz",
"xyz",
"Host direct task not supported"
)

tc.support_direct_computation = True
tc.task_given(ctd)
Expand Down Expand Up @@ -132,7 +136,7 @@ def test_computation(self): # pylint: disable=too-many-statements
self.assertIsNone(tc.counting_thread)
self.assertIsNone(tc.assigned_subtask)
task_server.send_task_failed.assert_called_with(
"aabbcc", "xyz", 'some exception')
"aabbcc", "xyz", 'some exception', TaskFailure.DEFAULT_REASON)
mock_finished.assert_called_once_with()
mock_finished.reset_mock()

Expand Down Expand Up @@ -273,12 +277,12 @@ def test_fail(self):
tt = self._new_task_thread(mock.Mock())
tt._fail(first_error)

assert tt.error is True
self.assertIsNotNone(tt.error)
assert tt.done is True
assert tt.error_msg == str(first_error)

tt._fail(second_error)
assert tt.error is True
self.assertIsNotNone(tt.error)
assert tt.done is True
assert tt.error_msg == str(first_error)

Expand Down Expand Up @@ -363,12 +367,12 @@ def check(expected):

# error case
prepare()
task_thread.error = True
task_thread.error = JobException()
check(False)

# success case
prepare()
task_thread.error = False
task_thread.error = None
task_thread.error_msg = None
task_thread.result = {'data': 'oh senora!!!'}
check(True)
Expand Down
3 changes: 2 additions & 1 deletion tests/golem/task/test_taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from golem_messages.datastructures import tasks as dt_tasks
from golem_messages.datastructures.masking import Mask
from golem_messages.factories.datastructures import p2p as dt_p2p_factory
from golem_messages.message import ComputeTaskDef
from golem_messages.message import ComputeTaskDef, TaskFailure
from golem_messages.utils import encode_hex as encode_key_id, pubkey_to_address
from golem_task_api.envs import DOCKER_CPU_ENV_ID
from requests import HTTPError
Expand Down Expand Up @@ -340,6 +340,7 @@ def test_send_waiting_results(self, mock_send_rct, mock_send_tf, *_):
subtask_id=subtask_id,
owner=dt_p2p_factory.Node(),
err_msg="Controlled failure",
reason=TaskFailure.DEFAULT_REASON
)

ts.failures_to_send[subtask_id] = wtf
Expand Down