Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Some tests and fixes for NewTaskComputer
Browse files Browse the repository at this point in the history
* Refactored _get_task_api_service() and _get_task_dir() to use
  _assigned_task instead of method parameters.
* Added support_direct_computation property to TaskComputerAdapter
  (needed by dummy task computation).
* Fixed DummyTask.computation_failed() which had passing None value as
  a not-none parameter to DummyTask.computation_finished().
* Added unit test for NewTaskComputer._get_task_api_service()
* Moved _runtime assignment after calling _app_client.compute() in
  NewTaskComputer.compute() (otherwise it would be always None).
  • Loading branch information
Wiezzel committed Jul 30, 2019
1 parent 0ac8c24 commit 11e053d
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 47 deletions.
42 changes: 23 additions & 19 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ def assigned_subtask_id(self) -> Optional[str]:
return self._new_computer.assigned_subtask_id \
or self._old_computer.assigned_subtask_id

@property
def support_direct_computation(self) -> bool:
return self._old_computer.support_direct_computation

@support_direct_computation.setter
def support_direct_computation(self, value: bool) -> None:
self._old_computer.support_direct_computation = value

def start_computation(self) -> None:
if self._new_computer.has_assigned_task():
task_id = self.assigned_task_id
Expand Down Expand Up @@ -282,14 +290,7 @@ def compute(self) -> defer.Deferred:
assigned_task = self._assigned_task
assert assigned_task is not None

task_api_service = self._get_task_api_service(
task_id=assigned_task.task_id,
env_id=assigned_task.env_id,
prereq_dict=assigned_task.prereq_dict
)
# FIXME: Remove when ProviderAppClient implements shutdown
self._runtime = task_api_service._runtime # noqa pylint: disable=protected-access

task_api_service = self._get_task_api_service()
compute_future = asyncio.ensure_future(self._app_client.compute(
service=task_api_service,
task_id=assigned_task.task_id,
Expand All @@ -298,6 +299,9 @@ def compute(self) -> defer.Deferred:
))

self._computation = deferred_from_future(compute_future)
# FIXME: Remove when ProviderAppClient implements shutdown
self._runtime = task_api_service._runtime # noqa pylint: disable=protected-access

from twisted.internet import reactor
timeout = int(deadline_to_timeout(assigned_task.deadline))
self._computation.addTimeout(timeout, reactor)
Expand All @@ -307,9 +311,7 @@ def compute(self) -> defer.Deferred:
def _wait_until_computation_ends(self) -> defer.Deferred:
assigned_task = self._assigned_task
assert assigned_task is not None
task_dir = self._get_task_dir(
assigned_task.task_id,
assigned_task.env_id)
task_dir = self._get_task_dir()

success = False
try:
Expand Down Expand Up @@ -362,19 +364,21 @@ def _wait_until_computation_ends(self) -> defer.Deferred:
self._runtime = None
self._task_finished_callback()

def _get_task_dir(self, task_id: str, env_id: EnvId) -> Path:
def _get_task_dir(self) -> Path:
assert self._assigned_task is not None
env_id = self._assigned_task.env_id
task_id = self._assigned_task.task_id
return self._work_dir / env_id / task_id

def _get_task_api_service(
self,
task_id: str,
env_id: EnvId,
prereq_dict: dict
) -> EnvironmentTaskApiService:
def _get_task_api_service(self) -> EnvironmentTaskApiService:
assert self._assigned_task is not None
env_id = self._assigned_task.env_id
prereq_dict = self._assigned_task.prereq_dict

env = self._env_manager.environment(env_id)
payload_builder = self._env_manager.payload_builder(env_id)
prereq = env.parse_prerequisites(prereq_dict)
shared_dir = self._get_task_dir(task_id, env_id)
shared_dir = self._get_task_dir()

return EnvironmentTaskApiService(
env=env,
Expand Down
7 changes: 5 additions & 2 deletions tests/golem/task/dummy/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,11 @@ def add_resources(self, resource_parts):
self.resource_parts = resource_parts

def computation_failed(self, subtask_id: str, ban_node: bool = True):
print('DummyTask.computation_failed called')
self.computation_finished(subtask_id, None)
print(f'DummyTask.computation_failed called. subtask_id: {subtask_id}')
with self._lock:
if subtask_id in self.assigned_subtasks:
node_id = self.assigned_subtasks.pop(subtask_id, None)
self.assigned_nodes.pop(node_id, None)

def restart(self):
print('DummyTask.restart called')
Expand Down
13 changes: 6 additions & 7 deletions tests/golem/task/dummy/test_runner_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

from golem.network.transport.tcpnetwork import SocketAddress
from golem.testutils import DatabaseFixture
from tests.golem.task.dummy import runner, task
from tests.golem.task.dummy import runner


@mock.patch('golem.task.taskserver.NonHypervisedDockerCPUEnvironment')
class TestDummyTaskRunnerScript(DatabaseFixture):
"""Tests for the runner script"""

Expand All @@ -14,7 +13,7 @@ class TestDummyTaskRunnerScript(DatabaseFixture):
@mock.patch("tests.golem.task.dummy.runner.run_simulation")
def test_runner_dispatch_requesting(
self, mock_run_simulation, mock_run_computing_node,
mock_run_requesting_node, _):
mock_run_requesting_node):
args = ["runner.py", runner.REQUESTING_NODE_KIND, self.path, "7"]
runner.dispatch(args)
self.assertTrue(mock_run_requesting_node.called)
Expand All @@ -27,7 +26,7 @@ def test_runner_dispatch_requesting(
@mock.patch("tests.golem.task.dummy.runner.run_simulation")
def test_runner_dispatch_computing(
self, mock_run_simulation, mock_run_computing_node,
mock_run_requesting_node, _):
mock_run_requesting_node):
args = ["runner.py", runner.COMPUTING_NODE_KIND,
self.path, "1.2.3.4:5678", "pid", ]
runner.dispatch(args)
Expand All @@ -45,7 +44,7 @@ def test_runner_dispatch_computing(
@mock.patch("tests.golem.task.dummy.runner.run_simulation")
def test_runner_dispatch_computing_with_failure(
self, mock_run_simulation, mock_run_computing_node,
mock_run_requesting_node, _):
mock_run_requesting_node):
args = ["runner.py", runner.COMPUTING_NODE_KIND,
self.path, "10.0.255.127:16000", "pid", "25"]
runner.dispatch(args)
Expand All @@ -63,7 +62,7 @@ def test_runner_dispatch_computing_with_failure(
@mock.patch("tests.golem.task.dummy.runner.run_simulation")
def test_runner_run_simulation(
self, mock_run_simulation, mock_run_computing_node,
mock_run_requesting_node, _):
mock_run_requesting_node):
args = ["runner.py"]
mock_run_simulation.return_value = None
runner.dispatch(args)
Expand Down Expand Up @@ -105,7 +104,7 @@ def test_run_computing_node(self, mock_config_logging, mock_reactor, *_):
client.quit()

@mock.patch("subprocess.Popen")
def test_run_simulation(self, mock_popen, _):
def test_run_simulation(self, mock_popen):
mock_process = mock.MagicMock()
mock_process.pid = 12345
mock_popen.return_value = mock_process
Expand Down
68 changes: 49 additions & 19 deletions tests/golem/task/test_newtaskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ def setUp(self, provider_client): # pylint: disable=arguments-differ
self.task_finished_callback = mock.Mock()
self.stats_keeper = mock.Mock(spec=IntStatsKeeper)
self.provider_client = provider_client()
self.work_dir = Path('test')
self.task_computer = NewTaskComputer(
env_manager=self.env_manager,
work_dir=mock.Mock(),
work_dir=self.work_dir,
task_finished_callback=self.task_finished_callback,
stats_keeper=self.stats_keeper
)
Expand Down Expand Up @@ -69,30 +70,35 @@ def task_deadline(self):
def subtask_deadline(self):
return int(time.time()) + 3600

def get_task_header(self, **kwargs):
def _get_task_header(self, **kwargs):
return mock.Mock(
task_id=kwargs.get('task_id') or self.task_id,
environment=kwargs.get('env_id') or self.env_id,
environment_prerequistes=(
environment_prerequisites=(
kwargs.get('prereq_dict') or self.prereq_dict),
subtask_timeout=(
kwargs.get('subtask_timeout') or self.subtask_timeout),
deadline=kwargs.get('task_deadline') or self.task_deadline
)

def get_compute_task_def(self, **kwargs):
def _get_compute_task_def(self, **kwargs):
return ComputeTaskDef(
subtask_id=kwargs.get('subtask_id') or self.subtask_id,
extra_data=kwargs.get('subtask_params') or self.subtask_params,
performance=kwargs.get('performance') or self.performance,
deadline=kwargs.get('subtask_deadline') or self.subtask_deadline
)

def patch_async(self, name, *args, **kwargs):
def _patch_async(self, name, *args, **kwargs):
patcher = mock.patch(f'golem.task.taskcomputer.{name}', *args, **kwargs)
self.addCleanup(patcher.stop)
return patcher.start()

def _assign_task(self, **kwargs):
task_header = self._get_task_header(**kwargs)
compute_task_def = self._get_compute_task_def(**kwargs)
self.task_computer.task_given(task_header, compute_task_def)


class TestPrepare(NewTaskComputerTestBase):

Expand All @@ -118,19 +124,28 @@ def test_clean_up(self):
class TestTaskGiven(NewTaskComputerTestBase):

def test_ok(self, provider_timer):
task_header = self.get_task_header()
compute_task_def = self.get_compute_task_def()
self.assertFalse(self.task_computer.has_assigned_task())
self.assertIsNone(self.task_computer.assigned_task_id)
self.assertIsNone(self.task_computer.assigned_subtask_id)
self.assertIsNone(self.task_computer.get_current_computing_env())

task_header = self._get_task_header()
compute_task_def = self._get_compute_task_def()
self.task_computer.task_given(task_header, compute_task_def)

self.assertTrue(self.task_computer.has_assigned_task())
self.assertEqual(self.task_computer.assigned_task_id, self.task_id)
self.assertEqual(
self.task_computer.assigned_subtask_id,
self.subtask_id)
self.assertEqual(
self.task_computer.get_current_computing_env(),
self.env_id)
provider_timer.start.assert_called_once_with()

def test_has_assigned_task(self, provider_timer):
task_header = self.get_task_header()
compute_task_def = self.get_compute_task_def()
task_header = self._get_task_header()
compute_task_def = self._get_compute_task_def()
self.task_computer.task_given(task_header, compute_task_def)
provider_timer.reset_mock()
with self.assertRaises(AssertionError):
Expand Down Expand Up @@ -159,29 +174,24 @@ def setUp(self): # pylint: disable=arguments-differ
spec=TaskApiService,
_runtime=self.runtime
)
self.patch_async(
self._patch_async(
'NewTaskComputer._get_task_api_service',
return_value=self.task_api_service
)
self.task_dir = Path('task_dir')
self.patch_async(
self._patch_async(
'NewTaskComputer._get_task_dir',
return_value=self.task_dir
)
self.provider_timer = self.patch_async('ProviderTimer')
self.dispatcher = self.patch_async('dispatcher')
self.logger = self.patch_async('logger')
self.provider_timer = self._patch_async('ProviderTimer')
self.dispatcher = self._patch_async('dispatcher')
self.logger = self._patch_async('logger')

@defer.inlineCallbacks
def test_no_assigned_task(self):
with self.assertRaises(AssertionError):
yield self.task_computer.compute()

def _assign_task(self, **kwargs):
task_header = self.get_task_header(**kwargs)
compute_task_def = self.get_compute_task_def(**kwargs)
self.task_computer.task_given(task_header, compute_task_def)

@defer.inlineCallbacks
def test_ok(self):
self._assign_task()
Expand Down Expand Up @@ -300,6 +310,26 @@ def test_task_error(self):
self.assertFalse(self.task_computer.has_assigned_task())


class TestGetTaskApiService(NewTaskComputerTestBase):

@mock.patch('golem.task.taskcomputer.EnvironmentTaskApiService')
def test_get_task_api_service(self, env_task_api_service):
self._assign_task()
service = self.task_computer._get_task_api_service()
self.env_manager.environment.assert_called_once_with(self.env_id)
self.env_manager.payload_builder.assert_called_once_with(self.env_id)
self.env_manager.environment().parse_prerequisites\
.assert_called_once_with(self.prereq_dict)

self.assertEqual(service, env_task_api_service.return_value)
env_task_api_service.assert_called_once_with(
env=self.env_manager.environment(),
prereq=self.env_manager.environment().parse_prerequisites(),
shared_dir=self.work_dir / self.env_id / self.task_id,
payload_builder=self.env_manager.payload_builder()
)


class TestChangeConfig(NewTaskComputerTestBase):

@defer.inlineCallbacks
Expand Down

0 comments on commit 11e053d

Please sign in to comment.