Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Unit tests and some fixes for NewTaskComputer
Browse files Browse the repository at this point in the history
  • Loading branch information
Wiezzel committed Jul 29, 2019
1 parent 5d7e23e commit 54cec12
Show file tree
Hide file tree
Showing 6 changed files with 353 additions and 37 deletions.
35 changes: 13 additions & 22 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from golem.clientconfigdescriptor import ClientConfigDescriptor
from golem.core.common import deadline_to_timeout
from golem.core.deferred import sync_wait
from golem.core.deferred import sync_wait, deferred_from_future
from golem.core.statskeeper import IntStatsKeeper
from golem.docker.image import DockerImage
from golem.docker.manager import DockerManager
Expand Down Expand Up @@ -55,8 +55,8 @@ def __init__(
self,
task_server: 'TaskServer',
env_manager: EnvironmentManager,
use_docker_manager=True,
finished_cb=None
use_docker_manager: bool = True,
finished_cb: Callable[[], Any] = lambda: None
) -> None:
self.stats = IntStatsKeeper(CompStats)
self._task_server = task_server
Expand Down Expand Up @@ -112,10 +112,10 @@ def assigned_subtask_id(self) -> Optional[str]:

def start_computation(self) -> None:
if self._new_computer.has_assigned_task():
# Fire and forget because compute resolves when computation ends
task_id = self.assigned_task_id
subtask_id = self.assigned_subtask_id
computation = self._new_computer.compute()
# Fire and forget because it resolves when computation ends
self._handle_computation_results(task_id, subtask_id, computation)
elif self._old_computer.has_assigned_task():
self._old_computer.start_computation()
Expand All @@ -132,19 +132,17 @@ def _handle_computation_results(
) -> defer.Deferred:
try:
output_file = yield computation
self._task_server.send_results(
subtask_id=subtask_id,
task_id=task_id,
result={'data': [output_file]},
)
except Exception as e: # pylint: disable=broad-except
self._task_server.send_task_failed(
subtask_id=subtask_id,
task_id=task_id,
err_msg=str(e)
)
return

self._task_server.send_results(
subtask_id=subtask_id,
task_id=task_id,
result={'data': [output_file]},
)

def task_interrupted(self) -> None:
if self._new_computer.has_assigned_task():
Expand All @@ -160,8 +158,6 @@ def check_timeout(self) -> None:
self._old_computer.check_timeout()

def get_progress(self) -> Optional[ComputingSubtaskStateSnapshot]:
if self._new_computer.has_assigned_task():
return self._new_computer.get_progress()
if self._old_computer.has_assigned_task():
return self._old_computer.get_progress()
return None
Expand Down Expand Up @@ -262,7 +258,7 @@ def assigned_subtask_id(self) -> Optional[str]:
return self._assigned_task.subtask_id

def _is_computing(self) -> bool:
return self._runtime is not None
return self._computation is not None

def task_given(
self,
Expand Down Expand Up @@ -292,8 +288,7 @@ def compute(self) -> defer.Deferred:
prereq_dict=assigned_task.prereq_dict
)
# FIXME: Remove when ProviderAppClient implements shutdown
# pylint: disable=protected-access
self._runtime = task_api_service._runtime
self._runtime = task_api_service._runtime # noqa pylint: disable=protected-access

compute_future = asyncio.ensure_future(self._app_client.compute(
service=task_api_service,
Expand All @@ -302,9 +297,9 @@ def compute(self) -> defer.Deferred:
subtask_params=assigned_task.subtask_params
))

self._computation = defer.Deferred.fromFuture(compute_future)
self._computation = deferred_from_future(compute_future)
from twisted.internet import reactor
timeout = deadline_to_timeout(assigned_task.deadline)
timeout = int(deadline_to_timeout(assigned_task.deadline))
self._computation.addTimeout(timeout, reactor)
return self._wait_until_computation_ends()

Expand Down Expand Up @@ -396,10 +391,6 @@ def task_interrupted(self) -> None:
assert self._runtime is not None
self._runtime.stop()

@staticmethod
def get_progress() -> Optional[ComputingSubtaskStateSnapshot]:
return None # Not supported because it's app-specific

def get_current_computing_env(self) -> Optional[EnvId]:
if self._assigned_task is None:
return None
Expand Down
3 changes: 2 additions & 1 deletion golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,10 @@ def resource_failure(self, task_id: str, reason: str) -> None:
logger.error("Resource failure for a wrong task, %s", task_id)
return

subtask_id = self.task_computer.assigned_subtask_id
self.task_computer.task_interrupted()
self.send_task_failed(
self.task_computer.assigned_subtask_id,
subtask_id,
task_id,
f'Error downloading resources: {reason}',
)
Expand Down
5 changes: 4 additions & 1 deletion tests/golem/core/test_deferred.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ class TestDeferredFromFuture(TwistedTestCase):

@classmethod
def setUpClass(cls) -> None:
uninstall_reactor() # Because other tests don't clean up
try:
uninstall_reactor() # Because other tests don't clean up
except AttributeError:
pass
install_reactor()

@classmethod
Expand Down
Loading

0 comments on commit 54cec12

Please sign in to comment.