Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Moved requesting tasks out of TaskComputer to TaskServer #4389

Merged
merged 3 commits into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 2 additions & 18 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,9 @@ def __init__(
# Is task computer currently able to run computation?
self.runnable = True
self.listeners = []
self.last_task_request = time.time()

self.dir_manager: DirManager = DirManager(
task_server.get_task_computer_root())
self.task_request_frequency = None

self.docker_manager: DockerManager = DockerManager.install()
if use_docker_manager:
Expand Down Expand Up @@ -187,14 +185,9 @@ def task_computed(self, task_thread: TaskThread) -> None:
success=was_success, value=work_time_to_be_paid)
self._task_finished()

def run(self):
""" Main loop of task computer """
def check_timeout(self):
if self.counting_thread is not None:
self.counting_thread.check_timeout()
elif self.compute_tasks and self.runnable:
last_request = time.time() - self.last_task_request
if last_request > self.task_request_frequency:
self.__request_task()

def get_progress(self) -> Optional[ComputingSubtaskStateSnapshot]:
if not self.is_computing() or self.assigned_subtask is None:
Expand Down Expand Up @@ -250,7 +243,6 @@ def change_config(
) -> Deferred:
self.dir_manager = DirManager(
self.task_server.get_task_computer_root())
self.task_request_frequency = config_desc.task_request_interval
self.compute_tasks = config_desc.accept_tasks \
and not config_desc.in_shutdown
return self.change_docker_config(
Expand Down Expand Up @@ -333,15 +325,6 @@ def lock_config(self, on=True):
for l in self.listeners:
l.lock_config(on)

def __request_task(self):
if self.has_assigned_task():
return

self.last_task_request = time.time()
requested_task = self.task_server.request_task()
if requested_task is not None:
self.stats.increase_stat('tasks_requested')

def start_computation(self) -> None: # pylint: disable=too-many-locals
subtask = self.assigned_subtask
assert subtask is not None
Expand Down Expand Up @@ -406,6 +389,7 @@ def start_computation(self) -> None: # pylint: disable=too-many-locals

def _task_finished(self) -> None:
ctd = self.assigned_subtask
assert ctd is not None
self.assigned_subtask = None

ProviderTimer.finish()
Expand Down
33 changes: 23 additions & 10 deletions golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def __init__(self,
self.acl_ip = DenyAcl([], max_times=config_desc.disallow_ip_max_times)
self.resource_handshakes = {}
self.requested_tasks: Set[str] = set()
self._last_task_request_time: float = time.time()

network = TCPNetwork(
ProtocolFactory(SafeProtocol, self, SessionFactory(TaskSession)),
Expand Down Expand Up @@ -203,7 +204,8 @@ def sync_network(self, timeout=None):
),
self._sync_pending,
self._send_waiting_results,
self.task_computer.run,
self._request_random_task,
self.task_computer.check_timeout,
self.task_connections_helper.sync,
self._sync_forwarded_session_requests,
self.__remove_old_tasks,
Expand Down Expand Up @@ -240,23 +242,34 @@ def get_environment_by_id(self, env_id):
env_id)

def request_task_by_id(self, task_id: str) -> None:
"""Requests task possibly after successful resource handshake.
"""
""" Requests task possibly after successful resource handshake. """
try:
task_header: dt_tasks.TaskHeader = self.task_keeper.task_headers[
task_id
]
task_header = self.task_keeper.task_headers[task_id]
except KeyError:
logger.debug("Task missing in TaskKeeper. task_id=%s", task_id)
return
self._request_task(task_header)

def request_task(self) -> Optional[str]:
"""Chooses random task from network to compute on our machine"""
task_header: dt_tasks.TaskHeader = \
self.task_keeper.get_task(self.requested_tasks)
def _request_random_task(self) -> Optional[str]:
""" If there is no task currently computing and time elapsed from last
request exceeds the configured request interval, choose a random
task from the network to compute on our machine. """

if self.task_computer.has_assigned_task() \
or (not self.task_computer.compute_tasks) \
or (not self.task_computer.runnable):
return None

if time.time() - self._last_task_request_time \
Wiezzel marked this conversation as resolved.
Show resolved Hide resolved
< self.config_desc.task_request_interval:
return None

task_header = self.task_keeper.get_task(self.requested_tasks)
if task_header is None:
return None

self._last_task_request_time = time.time()
self.task_computer.stats.increase_stat('tasks_requested')
return self._request_task(task_header)

def _request_task(self, theader: dt_tasks.TaskHeader) -> Optional[str]:
Expand Down
2 changes: 1 addition & 1 deletion tests/golem/docker/test_docker_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def _run_task(self, task: Task, timeout: int = 60 * 5, *_) \

if task_thread:
task_thread.join(timeout)
task_computer.run()
task_computer.check_timeout()

return task_thread

Expand Down
2 changes: 1 addition & 1 deletion tests/golem/docker/test_docker_task_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def test():
ct = task_computer.counting_thread

while ct and ct.is_alive():
task_computer.run()
task_computer.check_timeout()

if time.time() - started > 15:
self.fail("Job timed out")
Expand Down
83 changes: 37 additions & 46 deletions tests/golem/task/test_taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,46 +57,10 @@ def test_init(self):
use_docker_manager=False)
self.assertIsInstance(tc, TaskComputer)

def test_run(self):
task_server = self.task_server
task_server.config_desc.task_request_interval = 0.5
task_server.config_desc.accept_tasks = True
task_server.get_task_computer_root.return_value = self.path
tc = TaskComputer(
task_server,
self.docker_cpu_env,
use_docker_manager=False)
self.assertIsNone(tc.counting_thread)
tc.last_task_request = 0
tc.run()
task_server.request_task.assert_called_with()
task_server.request_task = mock.MagicMock()
task_server.config_desc.accept_tasks = False
tc2 = TaskComputer(
task_server,
self.docker_cpu_env,
use_docker_manager=False)
tc2.counting_thread = None
tc2.last_task_request = 0

tc2.run()
task_server.request_task.assert_not_called()

tc2.runnable = True
tc2.compute_tasks = True

tc2.last_task_request = 0
tc2.counting_thread = None

tc2.run()

assert task_server.request_task.called

task_server.request_task.called = False

tc2.last_checking = 10 ** 10

tc2.run()
def test_check_timeout(self):
self.task_computer.counting_thread = mock.Mock()
self.task_computer.check_timeout()
self.task_computer.counting_thread.check_timeout.assert_called_once()

def test_computation(self): # pylint: disable=too-many-statements
# FIXME Refactor too single tests and remove disable too many
Expand Down Expand Up @@ -480,12 +444,6 @@ def test_root_path(self, change_docker_config):
in_background=True
)

def test_task_request_frequency(self, _):
config_desc = ClientConfigDescriptor()
config_desc.task_request_interval = 100
self.task_computer.change_config(config_desc)
self.assertEqual(self.task_computer.task_request_frequency, 100)

def _test_compute_tasks(self, accept_tasks, in_shutdown, expected):
config_desc = ClientConfigDescriptor()
config_desc.accept_tasks = accept_tasks
Expand Down Expand Up @@ -682,3 +640,36 @@ def test_ok(self, task_finished):
self.task_computer.assigned_subtask = mock.Mock()
self.task_computer.task_interrupted()
task_finished.assert_called_once_with()


class TestTaskFinished(TestTaskComputerBase):

def test_no_assigned_subtask(self):
with self.assertRaises(AssertionError):
self.task_computer._task_finished()

@mock.patch('golem.task.taskcomputer.dispatcher')
@mock.patch('golem.task.taskcomputer.ProviderTimer')
def test_ok(self, provider_timer, dispatcher):
ctd = ComputeTaskDef(
task_id='test_task',
subtask_id='test_subtask',
performance=123
)
self.task_computer.assigned_subtask = ctd
self.task_computer.counting_thread = mock.Mock()
self.task_computer.finished_cb = mock.Mock()

self.task_computer._task_finished()
self.assertIsNone(self.task_computer.assigned_subtask)
self.assertIsNone(self.task_computer.counting_thread)
provider_timer.finish.assert_called_once_with()
dispatcher.send.assert_called_once_with(
signal='golem.taskcomputer',
event='subtask_finished',
subtask_id=ctd['subtask_id'],
min_performance=ctd['performance']
)
self.task_server.task_keeper.task_ended.assert_called_once_with(
ctd['task_id'])
self.task_computer.finished_cb.assert_called_once_with()
85 changes: 76 additions & 9 deletions tests/golem/task/test_taskserver.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# pylint: disable=protected-access, too-many-lines
import os
import time
from datetime import datetime, timedelta
import random
import tempfile
Expand Down Expand Up @@ -159,7 +160,7 @@ def test_request(self, tar, *_):
ts.client.get_suggested_addr.return_value = "10.10.10.10"
ts.client.get_requesting_trust.return_value = 0.3
self.assertIsInstance(ts, TaskServer)
self.assertIsNone(ts.request_task())
self.assertIsNone(ts._request_random_task())

keys_auth = KeysAuth(self.path, 'prv_key', '')
task_header = get_example_task_header(keys_auth.public_key)
Expand All @@ -176,7 +177,7 @@ def test_request(self, tar, *_):
self.ts.get_key_id = Mock(return_value='0'*128)
self.ts.keys_auth.eth_addr = pubkey_to_address('0' * 128)
ts.add_task_header(task_header)
self.assertEqual(ts.request_task(), task_id)
self.assertEqual(ts._request_random_task(), task_id)
self.assertIn(task_id, ts.requested_tasks)
assert ts.remove_task_header(task_id)
self.assertNotIn(task_id, ts.requested_tasks)
Expand All @@ -196,7 +197,7 @@ def test_request(self, tar, *_):
task_header = get_example_task_header(keys_auth.public_key)
task_id3 = task_header.task_id
ts.add_task_header(task_header)
self.assertIsNone(ts.request_task())
self.assertIsNone(ts._request_random_task())
tar.add_support_status.assert_called_with(
task_id3,
SupportStatus(
Expand All @@ -211,7 +212,7 @@ def test_request(self, tar, *_):
task_id4 = task_header.task_id
task_header.max_price = 1
ts.add_task_header(task_header)
self.assertIsNone(ts.request_task())
self.assertIsNone(ts._request_random_task())
tar.add_support_status.assert_called_with(
task_id4,
SupportStatus(
Expand All @@ -225,7 +226,7 @@ def test_request(self, tar, *_):
task_header = get_example_task_header(keys_auth.public_key)
task_id5 = task_header.task_id
ts.add_task_header(task_header)
self.assertIsNone(ts.request_task())
self.assertIsNone(ts._request_random_task())
tar.add_support_status.assert_called_with(
task_id5,
SupportStatus(
Expand All @@ -241,13 +242,14 @@ def test_request_task_concent_required(self, *_):
self.ts.config_desc.min_price = 0
self.ts.client.concent_service.enabled = True
self.ts.task_archiver = Mock()
self.ts._last_task_request_time = 0.0
keys_auth = KeysAuth(self.path, 'prv_key', '')
task_header = get_example_task_header(keys_auth.public_key)
task_header.concent_enabled = False
task_header.sign(private_key=keys_auth._private_key)
self.ts.add_task_header(task_header)

self.assertIsNone(self.ts.request_task())
self.assertIsNone(self.ts._request_random_task())
self.ts.task_archiver.add_support_status.assert_called_once_with(
task_header.task_id,
SupportStatus(
Expand All @@ -263,12 +265,9 @@ def test_change_config(self, *_):
ccd2.task_session_timeout = 124
ccd2.min_price = 0.0057
ccd2.task_request_interval = 31
# ccd2.use_waiting_ttl = False
ts.change_config(ccd2)
self.assertEqual(ts.config_desc, ccd2)
self.assertEqual(ts.task_keeper.min_price, 0.0057)
self.assertEqual(ts.task_computer.task_request_frequency, 31)
# self.assertEqual(ts.task_computer.use_waiting_ttl, False)

@patch("golem.task.taskserver.TaskServer._sync_pending")
def test_sync(self, mock_sync_pending, *_):
Expand Down Expand Up @@ -1216,3 +1215,71 @@ def test_ok(self, interrupted, send_task_failed, logger_mock):
'test_task',
'Error downloading resources: test_reason'
)


class TestRequestRandomTask(TaskServerTestBase):

def setUp(self):
super().setUp()
self.ts.task_computer = MagicMock()
self.ts.task_keeper = MagicMock()

def test_task_already_assigned(self):
self.ts.task_computer.has_assigned_task.return_value = True
self.ts.task_computer.compute_tasks = True
self.ts.task_computer.runnable = True

self.assertIsNone(self.ts._request_random_task())

def test_task_computer_not_accepting_tasks(self):
self.ts.task_computer.has_assigned_task.return_value = False
self.ts.task_computer.compute_tasks = False
self.ts.task_computer.runnable = True

self.assertIsNone(self.ts._request_random_task())

def test_task_computer_not_runnable(self):
self.ts.task_computer.has_assigned_task.return_value = False
self.ts.task_computer.compute_tasks = True
self.ts.task_computer.runnable = False

self.assertIsNone(self.ts._request_random_task())

@freezegun.freeze_time()
def test_request_interval(self):
self.ts.task_computer.has_assigned_task.return_value = False
self.ts.task_computer.compute_tasks = True
self.ts.task_computer.runnable = True
self.ts.config_desc.task_request_interval = 1.0
self.ts._last_task_request_time = time.time()

self.assertIsNone(self.ts._request_random_task())

@freezegun.freeze_time()
def test_no_supported_tasks_in_task_keeper(self):
self.ts.task_computer.has_assigned_task.return_value = False
self.ts.task_computer.compute_tasks = True
self.ts.task_computer.runnable = True
self.ts.config_desc.task_request_interval = 1.0
self.ts._last_task_request_time = time.time() - 1.0
self.ts.task_keeper.get_task.return_value = None

self.assertIsNone(self.ts._request_random_task())

@freezegun.freeze_time()
@patch('golem.task.taskserver.TaskServer._request_task')
def test_ok(self, request_task):
self.ts.task_computer.has_assigned_task.return_value = False
self.ts.task_computer.compute_tasks = True
self.ts.task_computer.runnable = True
self.ts.config_desc.task_request_interval = 1.0
self.ts._last_task_request_time = time.time() - 1.0
task_header = Mock()
self.ts.task_keeper.get_task.return_value = task_header

result = self.ts._request_random_task()
self.assertEqual(result, request_task.return_value)
self.assertEqual(self.ts._last_task_request_time, time.time())
self.ts.task_computer.stats.increase_stat.assert_called_once_with(
'tasks_requested')
request_task.assert_called_once_with(task_header)