From 5ed97d372d6ae80e0b1453b7e0c03e748c08e18d Mon Sep 17 00:00:00 2001 From: Adam Wierzbicki Date: Wed, 26 Jun 2019 14:29:52 +0200 Subject: [PATCH 1/3] Moved requesting tasks out of TaskComputer to TaskServer --- golem/task/taskcomputer.py | 19 +---- golem/task/taskserver.py | 33 ++++--- tests/golem/docker/test_docker_task.py | 2 +- tests/golem/docker/test_docker_task_thread.py | 2 +- tests/golem/task/test_taskcomputer.py | 50 +---------- tests/golem/task/test_taskserver.py | 85 +++++++++++++++++-- 6 files changed, 106 insertions(+), 85 deletions(-) diff --git a/golem/task/taskcomputer.py b/golem/task/taskcomputer.py index 3bfd564929..11477b7434 100644 --- a/golem/task/taskcomputer.py +++ b/golem/task/taskcomputer.py @@ -66,11 +66,9 @@ def __init__( # Is task computer currently able to run computation? self.runnable = True self.listeners = [] - self.last_task_request = time.time() self.dir_manager: DirManager = DirManager( task_server.get_task_computer_root()) - self.task_request_frequency = None self.docker_manager: DockerManager = DockerManager.install() if use_docker_manager: @@ -187,14 +185,9 @@ def task_computed(self, task_thread: TaskThread) -> None: success=was_success, value=work_time_to_be_paid) self._task_finished() - def run(self): - """ Main loop of task computer """ + def check_timeout(self): if self.counting_thread is not None: self.counting_thread.check_timeout() - elif self.compute_tasks and self.runnable: - last_request = time.time() - self.last_task_request - if last_request > self.task_request_frequency: - self.__request_task() def get_progress(self) -> Optional[ComputingSubtaskStateSnapshot]: if not self.is_computing() or self.assigned_subtask is None: @@ -250,7 +243,6 @@ def change_config( ) -> Deferred: self.dir_manager = DirManager( self.task_server.get_task_computer_root()) - self.task_request_frequency = config_desc.task_request_interval self.compute_tasks = config_desc.accept_tasks \ and not config_desc.in_shutdown return self.change_docker_config( @@ -333,15 +325,6 @@ def lock_config(self, on=True): for l in self.listeners: l.lock_config(on) - def __request_task(self): - if self.has_assigned_task(): - return - - self.last_task_request = time.time() - requested_task = self.task_server.request_task() - if requested_task is not None: - self.stats.increase_stat('tasks_requested') - def start_computation(self) -> None: # pylint: disable=too-many-locals subtask = self.assigned_subtask assert subtask is not None diff --git a/golem/task/taskserver.py b/golem/task/taskserver.py index 8a0fb1e6e4..72fcad6662 100644 --- a/golem/task/taskserver.py +++ b/golem/task/taskserver.py @@ -167,6 +167,7 @@ def __init__(self, self.acl_ip = DenyAcl([], max_times=config_desc.disallow_ip_max_times) self.resource_handshakes = {} self.requested_tasks: Set[str] = set() + self._last_task_request_time: float = time.time() network = TCPNetwork( ProtocolFactory(SafeProtocol, self, SessionFactory(TaskSession)), @@ -203,7 +204,8 @@ def sync_network(self, timeout=None): ), self._sync_pending, self._send_waiting_results, - self.task_computer.run, + self._request_random_task, + self.task_computer.check_timeout, self.task_connections_helper.sync, self._sync_forwarded_session_requests, self.__remove_old_tasks, @@ -240,23 +242,34 @@ def get_environment_by_id(self, env_id): env_id) def request_task_by_id(self, task_id: str) -> None: - """Requests task possibly after successful resource handshake. - """ + """ Requests task possibly after successful resource handshake. """ try: - task_header: dt_tasks.TaskHeader = self.task_keeper.task_headers[ - task_id - ] + task_header = self.task_keeper.task_headers[task_id] except KeyError: logger.debug("Task missing in TaskKeeper. task_id=%s", task_id) return self._request_task(task_header) - def request_task(self) -> Optional[str]: - """Chooses random task from network to compute on our machine""" - task_header: dt_tasks.TaskHeader = \ - self.task_keeper.get_task(self.requested_tasks) + def _request_random_task(self) -> Optional[str]: + """ If there is no task currently computing and time elapsed from last + request exceeds the configured request interval, choose a random + task from the network to compute on our machine. """ + + if self.task_computer.has_assigned_task() \ + or (not self.task_computer.compute_tasks) \ + or (not self.task_computer.runnable): + return None + + if time.time() - self._last_task_request_time \ + < self.config_desc.task_request_interval: + return None + + task_header = self.task_keeper.get_task(self.requested_tasks) if task_header is None: return None + + self._last_task_request_time = time.time() + self.task_computer.stats.increase_stat('tasks_requested') return self._request_task(task_header) def _request_task(self, theader: dt_tasks.TaskHeader) -> Optional[str]: diff --git a/tests/golem/docker/test_docker_task.py b/tests/golem/docker/test_docker_task.py index 0957747bf2..fa193e0e1b 100644 --- a/tests/golem/docker/test_docker_task.py +++ b/tests/golem/docker/test_docker_task.py @@ -183,7 +183,7 @@ def _run_task(self, task: Task, timeout: int = 60 * 5, *_) \ if task_thread: task_thread.join(timeout) - task_computer.run() + task_computer.check_timeout() return task_thread diff --git a/tests/golem/docker/test_docker_task_thread.py b/tests/golem/docker/test_docker_task_thread.py index cf87f860b2..f50bcc6565 100644 --- a/tests/golem/docker/test_docker_task_thread.py +++ b/tests/golem/docker/test_docker_task_thread.py @@ -65,7 +65,7 @@ def test(): ct = task_computer.counting_thread while ct and ct.is_alive(): - task_computer.run() + task_computer.check_timeout() if time.time() - started > 15: self.fail("Job timed out") diff --git a/tests/golem/task/test_taskcomputer.py b/tests/golem/task/test_taskcomputer.py index f6684ddf5b..527ca7a86d 100644 --- a/tests/golem/task/test_taskcomputer.py +++ b/tests/golem/task/test_taskcomputer.py @@ -57,46 +57,10 @@ def test_init(self): use_docker_manager=False) self.assertIsInstance(tc, TaskComputer) - def test_run(self): - task_server = self.task_server - task_server.config_desc.task_request_interval = 0.5 - task_server.config_desc.accept_tasks = True - task_server.get_task_computer_root.return_value = self.path - tc = TaskComputer( - task_server, - self.docker_cpu_env, - use_docker_manager=False) - self.assertIsNone(tc.counting_thread) - tc.last_task_request = 0 - tc.run() - task_server.request_task.assert_called_with() - task_server.request_task = mock.MagicMock() - task_server.config_desc.accept_tasks = False - tc2 = TaskComputer( - task_server, - self.docker_cpu_env, - use_docker_manager=False) - tc2.counting_thread = None - tc2.last_task_request = 0 - - tc2.run() - task_server.request_task.assert_not_called() - - tc2.runnable = True - tc2.compute_tasks = True - - tc2.last_task_request = 0 - tc2.counting_thread = None - - tc2.run() - - assert task_server.request_task.called - - task_server.request_task.called = False - - tc2.last_checking = 10 ** 10 - - tc2.run() + def test_check_timeout(self): + self.task_computer.counting_thread = mock.Mock() + self.task_computer.check_timeout() + self.task_computer.counting_thread.check_timeout.assert_called_once() def test_computation(self): # pylint: disable=too-many-statements # FIXME Refactor too single tests and remove disable too many @@ -480,12 +444,6 @@ def test_root_path(self, change_docker_config): in_background=True ) - def test_task_request_frequency(self, _): - config_desc = ClientConfigDescriptor() - config_desc.task_request_interval = 100 - self.task_computer.change_config(config_desc) - self.assertEqual(self.task_computer.task_request_frequency, 100) - def _test_compute_tasks(self, accept_tasks, in_shutdown, expected): config_desc = ClientConfigDescriptor() config_desc.accept_tasks = accept_tasks diff --git a/tests/golem/task/test_taskserver.py b/tests/golem/task/test_taskserver.py index f63fd2980a..a915b5200d 100644 --- a/tests/golem/task/test_taskserver.py +++ b/tests/golem/task/test_taskserver.py @@ -1,5 +1,6 @@ # pylint: disable=protected-access, too-many-lines import os +import time from datetime import datetime, timedelta import random import tempfile @@ -159,7 +160,7 @@ def test_request(self, tar, *_): ts.client.get_suggested_addr.return_value = "10.10.10.10" ts.client.get_requesting_trust.return_value = 0.3 self.assertIsInstance(ts, TaskServer) - self.assertIsNone(ts.request_task()) + self.assertIsNone(ts._request_random_task()) keys_auth = KeysAuth(self.path, 'prv_key', '') task_header = get_example_task_header(keys_auth.public_key) @@ -176,7 +177,7 @@ def test_request(self, tar, *_): self.ts.get_key_id = Mock(return_value='0'*128) self.ts.keys_auth.eth_addr = pubkey_to_address('0' * 128) ts.add_task_header(task_header) - self.assertEqual(ts.request_task(), task_id) + self.assertEqual(ts._request_random_task(), task_id) self.assertIn(task_id, ts.requested_tasks) assert ts.remove_task_header(task_id) self.assertNotIn(task_id, ts.requested_tasks) @@ -196,7 +197,7 @@ def test_request(self, tar, *_): task_header = get_example_task_header(keys_auth.public_key) task_id3 = task_header.task_id ts.add_task_header(task_header) - self.assertIsNone(ts.request_task()) + self.assertIsNone(ts._request_random_task()) tar.add_support_status.assert_called_with( task_id3, SupportStatus( @@ -211,7 +212,7 @@ def test_request(self, tar, *_): task_id4 = task_header.task_id task_header.max_price = 1 ts.add_task_header(task_header) - self.assertIsNone(ts.request_task()) + self.assertIsNone(ts._request_random_task()) tar.add_support_status.assert_called_with( task_id4, SupportStatus( @@ -225,7 +226,7 @@ def test_request(self, tar, *_): task_header = get_example_task_header(keys_auth.public_key) task_id5 = task_header.task_id ts.add_task_header(task_header) - self.assertIsNone(ts.request_task()) + self.assertIsNone(ts._request_random_task()) tar.add_support_status.assert_called_with( task_id5, SupportStatus( @@ -241,13 +242,14 @@ def test_request_task_concent_required(self, *_): self.ts.config_desc.min_price = 0 self.ts.client.concent_service.enabled = True self.ts.task_archiver = Mock() + self.ts._last_task_request_time = 0.0 keys_auth = KeysAuth(self.path, 'prv_key', '') task_header = get_example_task_header(keys_auth.public_key) task_header.concent_enabled = False task_header.sign(private_key=keys_auth._private_key) self.ts.add_task_header(task_header) - self.assertIsNone(self.ts.request_task()) + self.assertIsNone(self.ts._request_random_task()) self.ts.task_archiver.add_support_status.assert_called_once_with( task_header.task_id, SupportStatus( @@ -263,12 +265,9 @@ def test_change_config(self, *_): ccd2.task_session_timeout = 124 ccd2.min_price = 0.0057 ccd2.task_request_interval = 31 - # ccd2.use_waiting_ttl = False ts.change_config(ccd2) self.assertEqual(ts.config_desc, ccd2) self.assertEqual(ts.task_keeper.min_price, 0.0057) - self.assertEqual(ts.task_computer.task_request_frequency, 31) - # self.assertEqual(ts.task_computer.use_waiting_ttl, False) @patch("golem.task.taskserver.TaskServer._sync_pending") def test_sync(self, mock_sync_pending, *_): @@ -1216,3 +1215,71 @@ def test_ok(self, interrupted, send_task_failed, logger_mock): 'test_task', 'Error downloading resources: test_reason' ) + + +class TestRequestRandomTask(TaskServerTestBase): + + def setUp(self): + super().setUp() + self.ts.task_computer = MagicMock() + self.ts.task_keeper = MagicMock() + + def test_task_already_assigned(self): + self.ts.task_computer.has_assigned_task.return_value = True + self.ts.task_computer.compute_tasks = True + self.ts.task_computer.runnable = True + + self.assertIsNone(self.ts._request_random_task()) + + def test_task_computer_not_accepting_tasks(self): + self.ts.task_computer.has_assigned_task.return_value = False + self.ts.task_computer.compute_tasks = False + self.ts.task_computer.runnable = True + + self.assertIsNone(self.ts._request_random_task()) + + def test_task_computer_not_runnable(self): + self.ts.task_computer.has_assigned_task.return_value = False + self.ts.task_computer.compute_tasks = True + self.ts.task_computer.runnable = False + + self.assertIsNone(self.ts._request_random_task()) + + @freezegun.freeze_time() + def test_request_interval(self): + self.ts.task_computer.has_assigned_task.return_value = False + self.ts.task_computer.compute_tasks = True + self.ts.task_computer.runnable = True + self.ts.config_desc.task_request_interval = 1.0 + self.ts._last_task_request_time = time.time() + + self.assertIsNone(self.ts._request_random_task()) + + @freezegun.freeze_time() + def test_no_supported_tasks_in_task_keeper(self): + self.ts.task_computer.has_assigned_task.return_value = False + self.ts.task_computer.compute_tasks = True + self.ts.task_computer.runnable = True + self.ts.config_desc.task_request_interval = 1.0 + self.ts._last_task_request_time = time.time() - 1.0 + self.ts.task_keeper.get_task.return_value = None + + self.assertIsNone(self.ts._request_random_task()) + + @freezegun.freeze_time() + @patch('golem.task.taskserver.TaskServer._request_task') + def test_ok(self, request_task): + self.ts.task_computer.has_assigned_task.return_value = False + self.ts.task_computer.compute_tasks = True + self.ts.task_computer.runnable = True + self.ts.config_desc.task_request_interval = 1.0 + self.ts._last_task_request_time = time.time() - 1.0 + task_header = Mock() + self.ts.task_keeper.get_task.return_value = task_header + + result = self.ts._request_random_task() + self.assertEqual(result, request_task.return_value) + self.assertEqual(self.ts._last_task_request_time, time.time()) + self.ts.task_computer.stats.increase_stat.assert_called_once_with( + 'tasks_requested') + request_task.assert_called_once_with(task_header) From 8391c1dbbd6f6bdb2d09291acca0c8dd028d162c Mon Sep 17 00:00:00 2001 From: Adam Wierzbicki Date: Wed, 26 Jun 2019 15:24:03 +0200 Subject: [PATCH 2/3] Assert that assigned subtask is not None in _task_finished() --- golem/task/taskcomputer.py | 1 + tests/golem/task/test_taskcomputer.py | 33 +++++++++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/golem/task/taskcomputer.py b/golem/task/taskcomputer.py index 11477b7434..51a02e3c55 100644 --- a/golem/task/taskcomputer.py +++ b/golem/task/taskcomputer.py @@ -389,6 +389,7 @@ def start_computation(self) -> None: # pylint: disable=too-many-locals def _task_finished(self) -> None: ctd = self.assigned_subtask + assert ctd is not None self.assigned_subtask = None ProviderTimer.finish() diff --git a/tests/golem/task/test_taskcomputer.py b/tests/golem/task/test_taskcomputer.py index 527ca7a86d..be94b7281b 100644 --- a/tests/golem/task/test_taskcomputer.py +++ b/tests/golem/task/test_taskcomputer.py @@ -640,3 +640,36 @@ def test_ok(self, task_finished): self.task_computer.assigned_subtask = mock.Mock() self.task_computer.task_interrupted() task_finished.assert_called_once_with() + + +class TestTaskFinished(TestTaskComputerBase): + + def test_no_assigned_subtask(self): + with self.assertRaises(AssertionError): + self.task_computer._task_finished() + + @mock.patch('golem.task.taskcomputer.dispatcher') + @mock.patch('golem.task.taskcomputer.ProviderTimer') + def test_ok(self, provider_timer, dispatcher): + ctd = ComputeTaskDef( + task_id='test_task', + subtask_id='test_subtask', + performance=123 + ) + self.task_computer.assigned_subtask = ctd + self.task_computer.counting_thread = mock.Mock() + self.task_computer.finished_cb = mock.Mock() + + self.task_computer._task_finished() + self.assertIsNone(self.task_computer.assigned_subtask) + self.assertIsNone(self.task_computer.counting_thread) + provider_timer.finish.assert_called_once_with() + dispatcher.send.assert_called_once_with( + signal='golem.taskcomputer', + event='subtask_finished', + subtask_id=ctd['subtask_id'], + min_performance=ctd['performance'] + ) + self.task_server.task_keeper.task_ended.assert_called_once_with( + ctd['task_id']) + self.task_computer.finished_cb.assert_called_once_with() From 056181b36367e4aac9775ac9f8b28fcb128c6079 Mon Sep 17 00:00:00 2001 From: Adam Wierzbicki Date: Thu, 27 Jun 2019 10:49:55 +0200 Subject: [PATCH 3/3] Moved interval checking to the top of _request_random_task() --- golem/task/taskserver.py | 8 +++---- tests/golem/task/test_taskserver.py | 33 +++++++++++++++-------------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/golem/task/taskserver.py b/golem/task/taskserver.py index 72fcad6662..c21f47c513 100644 --- a/golem/task/taskserver.py +++ b/golem/task/taskserver.py @@ -255,15 +255,15 @@ def _request_random_task(self) -> Optional[str]: request exceeds the configured request interval, choose a random task from the network to compute on our machine. """ + if time.time() - self._last_task_request_time \ + < self.config_desc.task_request_interval: + return None + if self.task_computer.has_assigned_task() \ or (not self.task_computer.compute_tasks) \ or (not self.task_computer.runnable): return None - if time.time() - self._last_task_request_time \ - < self.config_desc.task_request_interval: - return None - task_header = self.task_keeper.get_task(self.requested_tasks) if task_header is None: return None diff --git a/tests/golem/task/test_taskserver.py b/tests/golem/task/test_taskserver.py index a915b5200d..849b589888 100644 --- a/tests/golem/task/test_taskserver.py +++ b/tests/golem/task/test_taskserver.py @@ -1217,6 +1217,7 @@ def test_ok(self, interrupted, send_task_failed, logger_mock): ) +@freezegun.freeze_time() class TestRequestRandomTask(TaskServerTestBase): def setUp(self): @@ -1224,7 +1225,15 @@ def setUp(self): self.ts.task_computer = MagicMock() self.ts.task_keeper = MagicMock() + def test_request_interval(self): + self.ts.config_desc.task_request_interval = 1.0 + self.ts._last_task_request_time = time.time() + + self.assertIsNone(self.ts._request_random_task()) + def test_task_already_assigned(self): + self.ts.config_desc.task_request_interval = 1.0 + self.ts._last_task_request_time = time.time() - 1.0 self.ts.task_computer.has_assigned_task.return_value = True self.ts.task_computer.compute_tasks = True self.ts.task_computer.runnable = True @@ -1232,6 +1241,8 @@ def test_task_already_assigned(self): self.assertIsNone(self.ts._request_random_task()) def test_task_computer_not_accepting_tasks(self): + self.ts.config_desc.task_request_interval = 1.0 + self.ts._last_task_request_time = time.time() - 1.0 self.ts.task_computer.has_assigned_task.return_value = False self.ts.task_computer.compute_tasks = False self.ts.task_computer.runnable = True @@ -1239,41 +1250,31 @@ def test_task_computer_not_accepting_tasks(self): self.assertIsNone(self.ts._request_random_task()) def test_task_computer_not_runnable(self): + self.ts.config_desc.task_request_interval = 1.0 + self.ts._last_task_request_time = time.time() - 1.0 self.ts.task_computer.has_assigned_task.return_value = False self.ts.task_computer.compute_tasks = True self.ts.task_computer.runnable = False self.assertIsNone(self.ts._request_random_task()) - @freezegun.freeze_time() - def test_request_interval(self): - self.ts.task_computer.has_assigned_task.return_value = False - self.ts.task_computer.compute_tasks = True - self.ts.task_computer.runnable = True - self.ts.config_desc.task_request_interval = 1.0 - self.ts._last_task_request_time = time.time() - - self.assertIsNone(self.ts._request_random_task()) - - @freezegun.freeze_time() def test_no_supported_tasks_in_task_keeper(self): + self.ts.config_desc.task_request_interval = 1.0 + self.ts._last_task_request_time = time.time() - 1.0 self.ts.task_computer.has_assigned_task.return_value = False self.ts.task_computer.compute_tasks = True self.ts.task_computer.runnable = True - self.ts.config_desc.task_request_interval = 1.0 - self.ts._last_task_request_time = time.time() - 1.0 self.ts.task_keeper.get_task.return_value = None self.assertIsNone(self.ts._request_random_task()) - @freezegun.freeze_time() @patch('golem.task.taskserver.TaskServer._request_task') def test_ok(self, request_task): + self.ts.config_desc.task_request_interval = 1.0 + self.ts._last_task_request_time = time.time() - 1.0 self.ts.task_computer.has_assigned_task.return_value = False self.ts.task_computer.compute_tasks = True self.ts.task_computer.runnable = True - self.ts.config_desc.task_request_interval = 1.0 - self.ts._last_task_request_time = time.time() - 1.0 task_header = Mock() self.ts.task_keeper.get_task.return_value = task_header