Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Reporting exceeded CPU limit (#4851)
Browse files Browse the repository at this point in the history
  • Loading branch information
kmazurek authored Nov 6, 2019
1 parent da00a6a commit b49093d
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 31 deletions.
12 changes: 10 additions & 2 deletions golem/docker/task_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from golem.docker.job import DockerJob
from golem.environments.environmentsmanager import EnvironmentsManager
from golem.envs.docker import DockerBind
from golem.task.taskthread import TaskThread, JobException, TimeoutException
from golem.task.taskthread import TaskThread, JobException, TimeoutException, \
BudgetExceededException
from golem.vm.memorychecker import MemoryChecker

if TYPE_CHECKING:
Expand All @@ -20,8 +21,10 @@
logger = logging.getLogger(__name__)


EXIT_CODE_BUDGET_EXCEEDED = 111
EXIT_CODE_MESSAGE = "Subtask computation failed with exit code {}"
EXIT_CODE_PROBABLE_CAUSES = {
EXIT_CODE_BUDGET_EXCEEDED: "CPU budget exceeded",
137: "probably killed by out-of-memory killer"
}

Expand Down Expand Up @@ -220,7 +223,12 @@ def _run_docker_job(self) -> Optional[int]:
logger.warning(f'Task error - exit_code={exit_code}\n'
f'stderr:\n{std_err}\n'
f'tail of stdout:\n{std_out}\n')
raise JobException(self._exit_code_message(exit_code))

if exit_code == EXIT_CODE_BUDGET_EXCEEDED:
raise BudgetExceededException(
self._exit_code_message(exit_code))
else:
raise JobException(self._exit_code_message(exit_code))

return estm_mem

Expand Down
3 changes: 2 additions & 1 deletion golem/task/server/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ def send_task_failure(waiting_task_failure) -> None:
waiting_task_failure.owner.key,
message.tasks.TaskFailure(
task_to_compute=task_to_compute,
err=waiting_task_failure.err_msg
err=waiting_task_failure.err_msg,
reason=waiting_task_failure.reason
),
)
19 changes: 12 additions & 7 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from threading import Lock

from dataclasses import dataclass
from golem_messages.message.tasks import ComputeTaskDef, TaskHeader
from golem_messages.message.tasks import ComputeTaskDef, TaskHeader, TaskFailure
from golem_task_api import ProviderAppClient, constants as task_api_constants
from golem_task_api.envs import DOCKER_CPU_ENV_ID, DOCKER_GPU_ENV_ID
from pydispatch import dispatcher
Expand All @@ -33,7 +33,7 @@
from golem.task.timer import ProviderTimer
from golem.vm.vm import PythonProcVM, PythonTestVM

from .taskthread import TaskThread
from .taskthread import TaskThread, BudgetExceededException, TimeoutException

if TYPE_CHECKING:
from .taskserver import TaskServer # noqa pylint:disable=unused-import
Expand Down Expand Up @@ -523,15 +523,20 @@ def task_computed(self, task_thread: TaskThread) -> None:
was_success = False

if task_thread.error or task_thread.error_msg:

if "Task timed out" in task_thread.error_msg:
reason = TaskFailure.DEFAULT_REASON
# pylint: disable=unidiomatic-typecheck
if type(task_thread.error) is TimeoutException:
self.stats.increase_stat('tasks_with_timeout')
reason = TaskFailure.REASON.TimeExceeded
elif type(task_thread.error) is BudgetExceededException:
reason = TaskFailure.REASON.BudgetExceeded
else:
self.stats.increase_stat('tasks_with_errors')
self.task_server.send_task_failed(
subtask_id,
subtask['task_id'],
task_thread.error_msg,
reason
)

elif task_thread.result and 'data' in task_thread.result:
Expand Down Expand Up @@ -574,7 +579,7 @@ def get_progress(self) -> Optional[ComputingSubtaskStateSnapshot]:
if not self._is_computing() or self.assigned_subtask is None:
return None

c: TaskThread = self.counting_thread
c: Optional[TaskThread] = self.counting_thread
try:
outfilebasename = c.extra_data.get( # type: ignore
'crops'
Expand Down Expand Up @@ -682,8 +687,8 @@ def start_computation(self) -> None: # pylint: disable=too-many-locals
docker_images = [DockerImage(**did) for did in docker_images]
dir_mapping = DockerTaskThread.generate_dir_mapping(resource_dir,
temp_dir)
tt = DockerTaskThread(docker_images, extra_data,
dir_mapping, task_timeout)
tt: TaskThread = DockerTaskThread(
docker_images, extra_data, dir_mapping, task_timeout)
elif self.support_direct_computation:
tt = PyTaskThread(extra_data, resource_dir, temp_dir,
task_timeout)
Expand Down
17 changes: 12 additions & 5 deletions golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,12 @@ def _create_and_set_result_package(self, wtr):
) = result

def send_task_failed(
self, subtask_id: str, task_id: str, err_msg: str) -> None:

self,
subtask_id: str,
task_id: str,
err_msg: str,
reason=message.TaskFailure.DEFAULT_REASON
) -> None:
header = self.task_keeper.task_headers[task_id]

if subtask_id not in self.failures_to_send:
Expand All @@ -647,7 +651,8 @@ def send_task_failed(
task_id=task_id,
subtask_id=subtask_id,
err_msg=err_msg,
owner=header.task_owner)
owner=header.task_owner,
reason=reason)

def new_connection(self, session):
if not self.active:
Expand Down Expand Up @@ -1059,11 +1064,12 @@ def should_accept_provider( # pylint: disable=too-many-return-statements
if allowed:
allowed, reason = self.acl_ip.is_allowed(ip_addr)
if not allowed:
logger.info(f'provider is {reason.value}; {ids}')
reason_msg = 'unknown reason' if reason is None else reason.value
logger.info(f'provider is {reason_msg}; {ids}')
self.notify_provider_rejected(
node_id=node_id, task_id=task_id,
reason=self.RejectedReason.acl,
details={'acl_reason': reason.value})
details={'acl_reason': reason_msg})
return False

trust = self.client.get_computing_trust(node_id)
Expand Down Expand Up @@ -1266,3 +1272,4 @@ class WaitingTaskFailure:
owner: 'dt_p2p.Node'
subtask_id: str
task_id: str
reason: message.TaskFailure.REASON
12 changes: 8 additions & 4 deletions golem/task/taskthread.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import threading
import time
from typing import Any, Dict, Tuple, Union
from typing import Any, Dict, Optional, Tuple, Union

from twisted.internet.defer import Deferred

Expand All @@ -19,6 +19,10 @@ class TimeoutException(JobException):
pass


class BudgetExceededException(JobException):
pass


class TaskThread(threading.Thread):
result: Union[None, Dict[str, Any], Tuple[Dict[str, Any], int]] = None

Expand All @@ -37,7 +41,7 @@ def __init__(self,
self.res_path = res_path
self.tmp_path = tmp_path
self.lock = threading.Lock()
self.error = False
self.error: Optional[Exception] = None
self.error_msg = ""
self.start_time = time.time()
self.end_time = None
Expand Down Expand Up @@ -71,7 +75,7 @@ def get_progress(self):
with self.lock:
return self.vm.get_progress()

def get_error(self):
def get_error(self) -> Optional[Exception]:
with self.lock:
return self.error

Expand Down Expand Up @@ -103,7 +107,7 @@ def _fail(self, exception: Exception):

logger.warning("Task computing error %s", exception)

self.error = True
self.error = exception
self.error_msg = str(exception)
self.done = True
self._deferred.errback(exception)
Expand Down
3 changes: 2 additions & 1 deletion tests/factories/taskserver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

import factory
from golem_messages import idgenerator
from golem_messages import idgenerator, message

from golem import clientconfigdescriptor
from golem.task import taskserver
Expand Down Expand Up @@ -74,3 +74,4 @@ class Meta:
owner = factory.SubFactory(
'golem_messages.factories.datastructures.p2p.Node',
)
reason = message.TaskFailure.DEFAULT_REASON
5 changes: 5 additions & 0 deletions tests/golem/docker/test_docker_task_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,8 @@ def test_exit_code_message(self):
message = DockerTaskThread._exit_code_message(exit_code)
assert message != EXIT_CODE_MESSAGE.format(exit_code)
assert "out-of-memory" in message

exit_code = 111
message = DockerTaskThread._exit_code_message(exit_code)
assert message != EXIT_CODE_MESSAGE.format(exit_code)
assert "CPU budget exceeded" in message
6 changes: 3 additions & 3 deletions tests/golem/task/test_localcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from golem_messages.message import ComputeTaskDef

from golem.docker.manager import DockerManager
from golem.docker.task_thread import DockerTaskThread
from golem.docker.task_thread import DockerTaskThread, JobException
from golem.task.localcomputer import LocalComputer
from golem.task.taskbase import Task
from golem.tools.ci import ci_skip
Expand All @@ -26,7 +26,7 @@ class TestLocalComputer(TestDirFixture):
class TestTaskThread(object):
def __init__(self, result, error_msg):
self.result = result
self.error = False
self.error = None
self.error_msg = error_msg

def test_computer(self):
Expand Down Expand Up @@ -82,7 +82,7 @@ def test_computer(self):
assert self.success_counter == 2

tt = self.TestTaskThread({'data': "some data"}, None)
tt.error = True
tt.error = JobException()
lc.task_computed(tt)
assert self.last_error is None
assert self.error_counter == 4
Expand Down
18 changes: 11 additions & 7 deletions tests/golem/task/test_taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from pydispatch import dispatcher

from golem_messages.message import ComputeTaskDef
from golem_messages.message import ComputeTaskDef, TaskFailure
from twisted.internet import defer
from twisted.trial.unittest import TestCase as TwistedTestCase

Expand All @@ -18,6 +18,7 @@
from golem.envs.docker.cpu import DockerCPUEnvironment
from golem.task.taskcomputer import TaskComputer, PyTaskThread
from golem.task.taskserver import TaskServer
from golem.task.taskthread import JobException
from golem.testutils import DatabaseFixture
from golem.tools.ci import ci_skip
from golem.tools.assertlogs import LogTestCase
Expand Down Expand Up @@ -95,7 +96,10 @@ def test_computation(self): # pylint: disable=too-many-statements
assert tc.counting_thread is None
assert tc.assigned_subtask is None
task_server.send_task_failed.assert_called_with(
"xxyyzz", "xyz", "Host direct task not supported")
"xxyyzz",
"xyz",
"Host direct task not supported"
)

tc.support_direct_computation = True
tc.task_given(ctd)
Expand Down Expand Up @@ -132,7 +136,7 @@ def test_computation(self): # pylint: disable=too-many-statements
self.assertIsNone(tc.counting_thread)
self.assertIsNone(tc.assigned_subtask)
task_server.send_task_failed.assert_called_with(
"aabbcc", "xyz", 'some exception')
"aabbcc", "xyz", 'some exception', TaskFailure.DEFAULT_REASON)
mock_finished.assert_called_once_with()
mock_finished.reset_mock()

Expand Down Expand Up @@ -273,12 +277,12 @@ def test_fail(self):
tt = self._new_task_thread(mock.Mock())
tt._fail(first_error)

assert tt.error is True
self.assertIsNotNone(tt.error)
assert tt.done is True
assert tt.error_msg == str(first_error)

tt._fail(second_error)
assert tt.error is True
self.assertIsNotNone(tt.error)
assert tt.done is True
assert tt.error_msg == str(first_error)

Expand Down Expand Up @@ -363,12 +367,12 @@ def check(expected):

# error case
prepare()
task_thread.error = True
task_thread.error = JobException()
check(False)

# success case
prepare()
task_thread.error = False
task_thread.error = None
task_thread.error_msg = None
task_thread.result = {'data': 'oh senora!!!'}
check(True)
Expand Down
3 changes: 2 additions & 1 deletion tests/golem/task/test_taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from golem_messages.datastructures import tasks as dt_tasks
from golem_messages.datastructures.masking import Mask
from golem_messages.factories.datastructures import p2p as dt_p2p_factory
from golem_messages.message import ComputeTaskDef
from golem_messages.message import ComputeTaskDef, TaskFailure
from golem_messages.utils import encode_hex as encode_key_id, pubkey_to_address
from golem_task_api.envs import DOCKER_CPU_ENV_ID
from requests import HTTPError
Expand Down Expand Up @@ -345,6 +345,7 @@ def test_send_waiting_results(self, mock_send_rct, mock_send_tf, *_):
subtask_id=subtask_id,
owner=dt_p2p_factory.Node(),
err_msg="Controlled failure",
reason=TaskFailure.DEFAULT_REASON
)

ts.failures_to_send[subtask_id] = wtf
Expand Down

0 comments on commit b49093d

Please sign in to comment.