Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
NewTaskComputer
Browse files Browse the repository at this point in the history
Created NewTaskComputer class for computing tasks created with the new
Task API. Support for old-style tasks is kept as well.
TaskComputerAdapter was introduced to dispatch tasks between new and old
task computer.
  • Loading branch information
Wiezzel committed Jul 23, 2019
1 parent a738d5d commit 63ce003
Show file tree
Hide file tree
Showing 11 changed files with 834 additions and 240 deletions.
465 changes: 408 additions & 57 deletions golem/task/taskcomputer.py

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
from .server import queue_ as srv_queue
from .server import resources
from .server import verification as srv_verification
from .taskcomputer import TaskComputer
from .taskcomputer import TaskComputerAdapter
from .taskkeeper import TaskHeaderKeeper
from .taskmanager import TaskManager
from .tasksession import TaskSession
Expand Down Expand Up @@ -126,7 +126,7 @@ def __init__(self,
self.task_archiver = task_archiver
self.task_keeper = TaskHeaderKeeper(
old_env_manager=client.environments_manager,
new_env_manager=EnvironmentManager(),
new_env_manager=new_env_manager,
node=self.node,
min_price=config_desc.min_price,
task_archiver=task_archiver)
Expand All @@ -146,9 +146,9 @@ def __init__(self,
root_path=self.get_task_computer_root(),
benchmarks=benchmarks
)
self.task_computer = TaskComputer(
self.task_computer = TaskComputerAdapter(
task_server=self,
docker_cpu_env=docker_cpu_env,
env_manager=new_env_manager,
use_docker_manager=use_docker_manager,
finished_cb=task_finished_cb)
deferred = self._change_task_computer_config(
Expand Down Expand Up @@ -419,7 +419,7 @@ def resource_failure(self, task_id: str, reason: str) -> None:

self.task_computer.task_interrupted()
self.send_task_failed(
self.task_computer.assigned_subtask['subtask_id'],
self.task_computer.assigned_subtask_id,
task_id,
f'Error downloading resources: {reason}',
)
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ constantly==15.1.0
crossbar==17.12.1
cryptography==2.3.1
cytoolz==0.9.0.1
dataclasses==0.6
distro==1.3.0
dnspython==1.15.0
docker==3.5.0
Expand Down
1 change: 1 addition & 0 deletions requirements_to-freeze.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ certifi
cffi==1.10.0
click>=4.0
crossbar==17.12.1
dataclasses==0.6
distro
docker==3.5.0
enforce==0.3.4
Expand Down
7 changes: 5 additions & 2 deletions tests/golem/network/p2p/test_peersession.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from golem.core.keysauth import KeysAuth
from golem.core.variables import PROTOCOL_CONST
from golem.core.variables import TASK_HEADERS_LIMIT
from golem.envs.docker.cpu import DockerCPUEnvironment
from golem.network.p2p.p2pservice import P2PService
from golem.network.p2p.peersession import (logger, PeerSession, PeerSessionInfo)
from golem.tools.assertlogs import LogTestCase
Expand All @@ -38,7 +39,7 @@ class TestPeerSession(testutils.DatabaseFixture, LogTestCase,
PEP8_FILES = ['golem/network/p2p/peersession.py', ]

@patch('golem.task.taskserver.NonHypervisedDockerCPUEnvironment')
def setUp(self, _):
def setUp(self, docker_env):
super().setUp()
random.seed()
self.peer_session = PeerSession(MagicMock())
Expand All @@ -53,6 +54,7 @@ def setUp(self, _):
)
client = MagicMock()
client.datadir = self.path
docker_env().metadata.return_value.id = DockerCPUEnvironment.ENV_ID
with patch(
'golem.network.concent.handlers_library.HandlersLibrary'
'.register_handler',):
Expand Down Expand Up @@ -461,14 +463,15 @@ def test_send_remove_task(self, send_mock):
assert isinstance(send_mock.call_args[0][0], message.p2p.RemoveTask)

@patch('golem.task.taskserver.NonHypervisedDockerCPUEnvironment')
def _gen_data_for_test_react_to_remove_task(self, _):
def _gen_data_for_test_react_to_remove_task(self, docker_env):
keys_auth = KeysAuth(self.path, 'priv_key', 'password')
previous_ka = self.peer_session.p2p_service.keys_auth
self.peer_session.p2p_service.keys_auth = keys_auth

# Unknown task owner
client = MagicMock()
client.datadir = self.path
docker_env().metadata.return_value.id = DockerCPUEnvironment.ENV_ID
with patch('golem.network.concent.handlers_library.HandlersLibrary'
'.register_handler',):
task_server = task_server_factory.TaskServer(client=client,)
Expand Down
2 changes: 1 addition & 1 deletion tests/golem/resource/base/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def _create_resources(resource_dir):

return relative, absolute

@mock.patch('golem.task.taskserver.TaskComputer', mock.Mock())
@mock.patch('golem.task.taskserver.TaskComputerAdapter', mock.Mock())
def _create_client(self, task_id, postfix):
directory = os.path.join(self.tempdir, 'node' + postfix)
dir_manager = DirManager(directory)
Expand Down
2 changes: 1 addition & 1 deletion tests/golem/task/test_localcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def test_computer(self):
dm.update_config(
status_callback=mock.Mock(),
done_callback=mock.Mock(),
work_dirs=[self.new_path],
work_dirs=[self.new_path.parent.parent],
in_background=True)
files = self.additional_dir_content([1])
lc = LocalComputer(root_path=self.path,
Expand Down
4 changes: 3 additions & 1 deletion tests/golem/task/test_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from golem import clientconfigdescriptor
from golem.core import common
from golem.core import deferred as golem_deferred
from golem.envs.docker.cpu import DockerCPUEnvironment
from golem.ethereum import exceptions
from golem.network.p2p import p2pservice
from golem.task import rpc
Expand Down Expand Up @@ -59,14 +60,15 @@ class ProviderBase(test_client.TestClientBase):
}

@mock.patch('golem.task.taskserver.NonHypervisedDockerCPUEnvironment')
def setUp(self, _):
def setUp(self, docker_env):
super().setUp()
self.client.sync = mock.Mock()
self.client.p2pservice = mock.Mock(peers={})
self.client.apps_manager._benchmark_enabled = mock.Mock(
return_value=True
)
self.client.apps_manager.load_all_apps()
docker_env().metadata.return_value.id = DockerCPUEnvironment.ENV_ID
with mock.patch(
'golem.network.concent.handlers_library.HandlersLibrary'
'.register_handler',
Expand Down
119 changes: 2 additions & 117 deletions tests/golem/task/test_taskcomputer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os
import random
from pathlib import Path
from threading import Lock
import time
import unittest.mock as mock
Expand All @@ -10,12 +9,11 @@
from twisted.internet import defer
from twisted.trial.unittest import TestCase as TwistedTestCase

from golem.client import ClientTaskComputerEventListener
from golem.clientconfigdescriptor import ClientConfigDescriptor
from golem.core.common import timeout_to_deadline
from golem.core.deferred import sync_wait
from golem.docker.manager import DockerManager
from golem.envs.docker.cpu import DockerCPUConfig, DockerCPUEnvironment
from golem.envs.docker.cpu import DockerCPUEnvironment
from golem.task.taskcomputer import TaskComputer, PyTaskThread
from golem.task.taskserver import TaskServer
from golem.testutils import DatabaseFixture
Expand All @@ -37,21 +35,14 @@ def setUp(self, docker_manager):
task_server.config_desc = ClientConfigDescriptor()
self.task_server = task_server

self.docker_cpu_env = mock.Mock(spec=DockerCPUEnvironment)
self.docker_manager = mock.Mock(spec=DockerManager, hypervisor=None)
docker_manager.install.return_value = self.docker_manager

self.task_computer = TaskComputer(
self.task_server,
self.docker_cpu_env)

self.task_computer = TaskComputer(self.task_server)
self.docker_manager.reset_mock()
self.docker_cpu_env.reset_mock()

def test_init(self):
tc = TaskComputer(
self.task_server,
self.docker_cpu_env,
use_docker_manager=False)
self.assertIsInstance(tc, TaskComputer)

Expand Down Expand Up @@ -89,7 +80,6 @@ def test_computation(self): # pylint: disable=too-many-statements
mock_finished = mock.Mock()
tc = TaskComputer(
task_server,
self.docker_cpu_env,
use_docker_manager=False,
finished_cb=mock_finished)

Expand Down Expand Up @@ -186,37 +176,6 @@ def test_computation(self): # pylint: disable=too-many-statements
if tt.is_alive():
tt.join(timeout=5)

def test_host_state(self):
task_server = self.task_server
tc = TaskComputer(
task_server,
self.docker_cpu_env,
use_docker_manager=False)
self.assertEqual(tc.get_host_state(), "Idle")
tc.counting_thread = mock.Mock()
self.assertEqual(tc.get_host_state(), "Computing")

def test_event_listeners(self):
client = mock.Mock()
task_server = self.task_server

tc = TaskComputer(
task_server,
self.docker_cpu_env,
use_docker_manager=False)

tc.lock_config(True)
tc.lock_config(False)

listener = ClientTaskComputerEventListener(client)
tc.register_listener(listener)

tc.lock_config(True)
client.lock_config.assert_called_with(True)

tc.lock_config(False)
client.lock_config.assert_called_with(False)

@mock.patch('golem.task.taskthread.TaskThread.start')
def test_compute_task(self, start):

Expand Down Expand Up @@ -262,7 +221,6 @@ def __wait_for_tasks(tc):
def test_get_environment_no_assigned_subtask(self):
tc = TaskComputer(
self.task_server,
self.docker_cpu_env,
use_docker_manager=False)
assert tc.get_environment() is None

Expand All @@ -276,7 +234,6 @@ def test_get_environment(self):

tc = TaskComputer(
task_server,
self.docker_cpu_env,
use_docker_manager=False)
tc.assigned_subtask = ComputeTaskDef()
tc.assigned_subtask['task_id'] = "task_id"
Expand All @@ -299,7 +256,6 @@ def test_thread(self):

tc = TaskComputer(
ts,
mock.Mock(spec=DockerCPUEnvironment),
use_docker_manager=False)

tt = self._new_task_thread(tc)
Expand Down Expand Up @@ -369,7 +325,6 @@ def test_task_computed(self):

task = TaskComputer(
task_server,
mock.Mock(spec=DockerCPUEnvironment),
use_docker_manager=False)

task_thread = mock.MagicMock()
Expand Down Expand Up @@ -452,55 +407,6 @@ def test_root_path(self):
yield self.task_computer.change_config(config_desc)
self.assertEqual(self.task_computer.dir_manager.root_path, '/test')

@defer.inlineCallbacks
def _test_compute_tasks(self, accept_tasks, in_shutdown, expected):
self.task_server.get_task_computer_root.return_value = '/test'
config_desc = ClientConfigDescriptor()
config_desc.accept_tasks = accept_tasks
config_desc.in_shutdown = in_shutdown

yield self.task_computer.change_config(config_desc)
self.assertEqual(self.task_computer.compute_tasks, expected)

@defer.inlineCallbacks
def test_compute_tasks(self):
yield self._test_compute_tasks(
accept_tasks=True,
in_shutdown=True,
expected=False
)
yield self._test_compute_tasks(
accept_tasks=True,
in_shutdown=False,
expected=True
)
yield self._test_compute_tasks(
accept_tasks=False,
in_shutdown=True,
expected=False
)
yield self._test_compute_tasks(
accept_tasks=False,
in_shutdown=False,
expected=False
)

@defer.inlineCallbacks
def test_update_docker_cpu_env_config(self):
self.task_server.get_task_computer_root.return_value = '/test'
config_desc = ClientConfigDescriptor()
config_desc.num_cores = 13
config_desc.max_memory_size = 1024 * 1024

yield self.task_computer.change_config(config_desc)
self.docker_cpu_env.update_config.assert_called_once_with(
DockerCPUConfig(
work_dirs=[Path('/test')],
cpu_count=13,
memory_mb=1024,
)
)

@defer.inlineCallbacks
def test_update_docker_manager_config(self):
def _update_config(done_callback, *_, **__):
Expand All @@ -517,27 +423,6 @@ def _update_config(done_callback, *_, **__):
self.docker_manager.update_config.assert_called_once()


class TestLockConfig(TestTaskComputerBase):

def test_on(self):
listener = mock.MagicMock()
self.task_computer.register_listener(listener)
self.task_computer.runnable = True

self.task_computer.lock_config(True)
listener.lock_config.assert_called_once_with(True)
self.assertFalse(self.task_computer.runnable)

def test_off(self):
listener = mock.MagicMock()
self.task_computer.register_listener(listener)
self.task_computer.runnable = False

self.task_computer.lock_config(False)
listener.lock_config.assert_called_once_with(False)
self.assertTrue(self.task_computer.runnable)


@mock.patch('golem.task.taskcomputer.ProviderTimer')
class TestTaskGiven(TestTaskComputerBase):

Expand Down
Loading

0 comments on commit 63ce003

Please sign in to comment.