Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Unit tests for new TaskComputer
Browse files Browse the repository at this point in the history
  • Loading branch information
Wiezzel committed Jul 29, 2019
1 parent 5d7e23e commit 166049c
Showing 1 changed file with 141 additions and 0 deletions.
141 changes: 141 additions & 0 deletions tests/golem/task/test_newtaskcomputer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import time
from unittest import mock

from golem_messages.message import ComputeTaskDef
from twisted.internet import defer
from twisted.trial.unittest import TestCase as TwistedTestCase

from golem.core.statskeeper import IntStatsKeeper
from golem.envs.docker.cpu import DockerCPUEnvironment
from golem.task.envmanager import EnvironmentManager
from golem.task.taskcomputer import NewTaskComputer


class NewTaskComputerTestBase(TwistedTestCase):

@mock.patch('golem.task.taskcomputer.ProviderAppClient')
def setUp(self, provider_client) -> None:
self.env_manager = mock.Mock(spec=EnvironmentManager)
self.task_finished_callback = mock.Mock()
self.stats_keeper = mock.Mock(spec=IntStatsKeeper)
self.provider_client = provider_client()
self.task_computer = NewTaskComputer(
env_manager=self.env_manager,
work_dir=mock.Mock(),
task_finished_callback=self.task_finished_callback,
stats_keeper=self.stats_keeper
)

@property
def task_id(self):
return 'test_task'

@property
def subtask_id(self):
return 'test_subtask'

@property
def subtask_params(self):
return {'test_param': 'test_value'}

@property
def env_id(self):
return 'test_env'

@property
def prereq_dict(self):
return {'test_prereq': 'test_value'}

@property
def performance(self):
return 2137

@property
def subtask_timeout(self):
return 3600

@property
def task_deadline(self):
return int(time.time()) + 3600

@property
def subtask_deadline(self):
return int(time.time()) + 3600

def get_task_header(self, **kwargs):
return mock.Mock(
task_id=kwargs.get('task_id') or self.task_id,
environment=kwargs.get('env_id') or self.env_id,
environment_prerequistes=(
kwargs.get('prereq_dict') or self.prereq_dict),
subtask_timeout=(
kwargs.get('subtask_timeout') or self.subtask_timeout),
deadline=kwargs.get('task_deadline') or self.task_deadline
)

def get_compute_task_def(self, **kwargs):
return ComputeTaskDef(
subtask_id=kwargs.get('subtask_id') or self.subtask_id,
extra_data=kwargs.get('subtask_params') or self.subtask_params,
performance=kwargs.get('performance') or self.performance,
deadline=kwargs.get('subtask_deadline') or self.subtask_deadline
)

def patch_async(self, name, *args, **kwargs):
patcher = mock.patch(f'golem.task.taskcomputer.{name}', *args, **kwargs)
self.addCleanup(patcher.stop)
return patcher.start()


class TestPrepare(NewTaskComputerTestBase):

@defer.inlineCallbacks
def test(self):
yield self.task_computer.prepare()
self.env_manager.environment.assert_called_once_with(
DockerCPUEnvironment.ENV_ID)
self.env_manager.environment().prepare.assert_called_once()


class TestCleanUp(NewTaskComputerTestBase):

@defer.inlineCallbacks
def test(self):
yield self.task_computer.clean_up()
self.env_manager.environment.assert_called_once_with(
DockerCPUEnvironment.ENV_ID)
self.env_manager.environment().clean_up.assert_called_once()


@mock.patch('golem.task.taskcomputer.ProviderTimer')
class TestTaskGiven(NewTaskComputerTestBase):

def test_ok(self, provider_timer):
task_header = self.get_task_header()
compute_task_def = self.get_compute_task_def()
self.task_computer.task_given(task_header, compute_task_def)
self.assertTrue(self.task_computer.has_assigned_task())
self.assertEqual(self.task_computer.assigned_task_id, self.task_id)
self.assertEqual(
self.task_computer.assigned_subtask_id,
self.subtask_id)
provider_timer.start.assert_called_once_with()

def test_has_assigned_task(self, provider_timer):
task_header = self.get_task_header()
compute_task_def = self.get_compute_task_def()
self.task_computer.task_given(task_header, compute_task_def)
provider_timer.reset_mock()
with self.assertRaises(AssertionError):
self.task_computer.task_given(task_header, compute_task_def)
provider_timer.start.assert_not_called()


class TestCompute(NewTaskComputerTestBase):

@defer.inlineCallbacks
def test_no_assigned_task(self):
with self.assertRaises(AssertionError):
yield self.task_computer.compute()


0 comments on commit 166049c

Please sign in to comment.