diff --git a/golem/appconfig.py b/golem/appconfig.py index 30026710f5..739e88d897 100644 --- a/golem/appconfig.py +++ b/golem/appconfig.py @@ -89,6 +89,11 @@ DISALLOW_ID_MAX_TIMES = 1 DISALLOW_IP_MAX_TIMES = 1 +DEFAULT_HYPERDRIVE_PORT = 3282 +DEFAULT_HYPERDRIVE_ADDRESS = None +DEFAULT_HYPERDRIVE_RPC_PORT = 3292 +DEFAULT_HYPERDRIVE_RPC_ADDRESS = 'localhost' + class NodeConfig: @@ -190,10 +195,21 @@ def load_config(cls, datadir, cfg_file_name=CONFIG_FILENAME): disallow_ip_timeout_seconds=DISALLOW_IP_TIMEOUT_SECONDS, disallow_id_max_times=DISALLOW_ID_MAX_TIMES, disallow_ip_max_times=DISALLOW_IP_MAX_TIMES, + #hyperg + hyperdrive_port=DEFAULT_HYPERDRIVE_PORT, + hyperdrive_address=DEFAULT_HYPERDRIVE_ADDRESS, + hyperdrive_rpc_port=DEFAULT_HYPERDRIVE_RPC_PORT, + hyperdrive_rpc_address=DEFAULT_HYPERDRIVE_RPC_ADDRESS, ) cfg = SimpleConfig(node_config, cfg_file, keep_old=False) - return AppConfig(cfg, cfg_file) + return cls(cfg, cfg_file) + + def __repr__(self): + return '<{}: {}>'.format(self.__class__, { + prop: self.get_node_property(prop)() + for prop in self._cfg.get_node_config().prop_names + }) def __init__(self, cfg, config_file): self.config_file = config_file diff --git a/golem/client.py b/golem/client.py index 6f6dd2bfda..15abb6c59d 100644 --- a/golem/client.py +++ b/golem/client.py @@ -408,7 +408,22 @@ def start_network(self): logger.info("Starting resource server ...") - self.daemon_manager = HyperdriveDaemonManager(self.datadir) + self.daemon_manager = HyperdriveDaemonManager( + self.datadir, + daemon_config={ + k: v for k, v in { + 'host': self.config_desc.hyperdrive_address, + 'port': self.config_desc.hyperdrive_port, + 'rpc_host': self.config_desc.hyperdrive_rpc_address, + 'rpc_port': self.config_desc.hyperdrive_rpc_port, + }.items() + if v is not None + }, + client_config={ + 'port': self.config_desc.hyperdrive_rpc_port, + 'host': self.config_desc.hyperdrive_rpc_address, + } + ) self.daemon_manager.start() hyperdrive_addrs = self.daemon_manager.public_addresses( @@ -425,7 +440,11 @@ def start_network(self): resource_manager = HyperdriveResourceManager( dir_manager=dir_manager, - daemon_address=hyperdrive_addrs + daemon_address=hyperdrive_addrs, + client_kwargs={ + 'host': self.config_desc.hyperdrive_rpc_address, + 'port': self.config_desc.hyperdrive_rpc_port, + }, ) self.resource_server = BaseResourceServer( resource_manager=resource_manager, diff --git a/golem/clientconfigdescriptor.py b/golem/clientconfigdescriptor.py index fc437d9efb..8ef4270cbb 100644 --- a/golem/clientconfigdescriptor.py +++ b/golem/clientconfigdescriptor.py @@ -75,6 +75,15 @@ def __init__(self): self.disallow_id_max_times = 1 self.disallow_ip_max_times = 1 + self.hyperdrive_port: typing.Optional[int] = None + self.hyperdrive_address: typing.Optional[str] = None + self.hyperdrive_rpc_port: typing.Optional[int] = None + self.hyperdrive_rpc_address: typing.Optional[str] = None + + def __repr__(self): + return '{}: {}'.format(self.__class__, { + v: getattr(self, v) for v in vars(self)}) + def init_from_app_config(self, app_config): """Initializes config parameters based on the specified AppConfig :param app_config: instance of AppConfig diff --git a/golem/docker/task_thread.py b/golem/docker/task_thread.py index 70709a9983..84b4da79d5 100644 --- a/golem/docker/task_thread.py +++ b/golem/docker/task_thread.py @@ -195,7 +195,7 @@ def _run_docker_job(self) -> Optional[int]: devices = None runtime = None - assert self.docker_manager is not None + assert self.docker_manager is not None, "Docker Manager undefined" # PyLint still thinks docker_manager is of type DockerConfigManager # pylint: disable=no-member host_config = self.docker_manager.get_host_config_for_task(binds) diff --git a/golem/network/hyperdrive/client.py b/golem/network/hyperdrive/client.py index db5d918b2e..54c17fa46b 100644 --- a/golem/network/hyperdrive/client.py +++ b/golem/network/hyperdrive/client.py @@ -18,10 +18,6 @@ log = logging.getLogger(__name__) -DEFAULT_HYPERDRIVE_PORT = 3282 -DEFAULT_HYPERDRIVE_RPC_PORT = 3292 - - def to_hyperg_peer(host: str, port: int) -> Dict[str, Tuple[str, int]]: return {'TCP': (host, port)} @@ -35,12 +31,14 @@ def round_timeout(value: Optional[Union[int, float]]) -> Optional[int]: class HyperdriveClient(IClient): + """ + Enables communication between Golem and the Hyperdrive service. + """ CLIENT_ID = 'hyperg' VERSION = 1.1 - def __init__(self, port=DEFAULT_HYPERDRIVE_RPC_PORT, - host='localhost', timeout=None): + def __init__(self, port, host, timeout=None): super(HyperdriveClient, self).__init__() # API destination address @@ -53,6 +51,9 @@ def __init__(self, port=DEFAULT_HYPERDRIVE_RPC_PORT, self._url = 'http://{}:{}/api'.format(self.host, self.port) self._headers = {'content-type': 'application/json'} + def __repr__(self): + return f'<{self.__class__.__name__} {self.CLIENT_ID} at {self._url}>' + @classmethod def build_options(cls, peers=None, **kwargs): return HyperdriveClientOptions(cls.CLIENT_ID, cls.VERSION, @@ -145,8 +146,7 @@ def _request(self, **data): class HyperdriveAsyncClient(HyperdriveClient): - def __init__(self, port=DEFAULT_HYPERDRIVE_RPC_PORT, host='localhost', - timeout=None): + def __init__(self, port, host, timeout=None): from twisted.web.http_headers import Headers # imports reactor super().__init__(port, host, timeout) diff --git a/golem/network/hyperdrive/daemon_manager.py b/golem/network/hyperdrive/daemon_manager.py index a46b6d4e1d..85ca2f246a 100644 --- a/golem/network/hyperdrive/daemon_manager.py +++ b/golem/network/hyperdrive/daemon_manager.py @@ -1,4 +1,5 @@ import copy +import itertools import logging import os import subprocess @@ -18,6 +19,7 @@ GOLEM_HYPERDRIVE_VERSION = '0.2.4' +GOLEM_HYPERDRIVE_LOGFILE = 'hyperg.log' class HyperdriveDaemonManager(object): @@ -25,11 +27,15 @@ class HyperdriveDaemonManager(object): _executable = 'hyperg' _min_version = semantic_version.Version(GOLEM_HYPERDRIVE_VERSION) - def __init__(self, datadir, **hyperdrive_config): - super(HyperdriveDaemonManager, self).__init__() - + def __init__( + self, + datadir, + daemon_config: Optional[dict] = None, + client_config: Optional[dict] = None + ) -> None: self._addresses = None - self._config = hyperdrive_config + self._config = client_config or {} + self._daemon_config = daemon_config or {} # monitor and restart if process dies self._monitor = ProcessMonitor() @@ -43,17 +49,28 @@ def __init__(self, datadir, **hyperdrive_config): logger.warning("Creating HyperG logsdir: %s", logsdir) os.makedirs(logsdir) + self._daemon_config.update( + db=self._dir, + logfile=os.path.join(logsdir, GOLEM_HYPERDRIVE_LOGFILE) + ) + self._command = [ self._executable, - '--db', self._dir, - '--logfile', os.path.join(logsdir, "hyperg.log"), - ] + ] + list(itertools.chain.from_iterable( + [('--' + k, str(v)) for k, v in self._daemon_config.items()] + )) + + def __str__(self): + return self._executable - def addresses(self): + def addresses(self, suppress_warning=False): try: return self._get_addresses() except requests.ConnectionError: - logger.warning('Cannot connect to Hyperdrive daemon') + if not suppress_warning: + import traceback + traceback.print_stack() + logger.warning('Cannot connect to Hyperdrive daemon') return dict() def version(self) -> Optional[semantic_version.Version]: @@ -105,11 +122,13 @@ def stop(self, *_): @report_calls(Component.hyperdrive, 'instance.connect') def _start(self, *_): - self._check_version() + version = self._check_version() # do not supervise already running processes - addresses = self.addresses() + addresses = self.addresses(suppress_warning=True) if addresses: + logger.info("%s %s already started. addresses=%s", + self._executable, version, addresses) return process = self._create_sub() @@ -120,12 +139,16 @@ def _start(self, *_): else: raise RuntimeError("Cannot start {}".format(self._executable)) + logger.info("%s %s started. Listening on %s.", + self._executable, version, self.addresses()) + @report_calls(Component.hyperdrive, 'instance.version') def _check_version(self): version = self.version() if version < self._min_version: raise RuntimeError('HyperG version {} is required' .format(self._min_version)) + return version @report_calls(Component.hyperdrive, 'instance.check') def _create_sub(self): @@ -141,7 +164,7 @@ def _wait(self, timeout: int = 10): deadline = time.time() + timeout while time.time() < deadline: - addresses = self.addresses() + addresses = self.addresses(suppress_warning=True) if addresses: return time.sleep(1.) diff --git a/golem/resource/hyperdrive/resourcesmanager.py b/golem/resource/hyperdrive/resourcesmanager.py index 4c955cba0f..9b1522732c 100644 --- a/golem/resource/hyperdrive/resourcesmanager.py +++ b/golem/resource/hyperdrive/resourcesmanager.py @@ -1,6 +1,7 @@ import logging import os +import typing from collections import Iterable, Sized from functools import partial from twisted.internet.defer import Deferred @@ -74,12 +75,18 @@ def log_error(msg, exc): class HyperdriveResourceManager(ClientHandler): - def __init__(self, dir_manager, daemon_address=None, config=None, - resource_dir_method=None): - + def __init__( # noqa pylint: disable=too-many-arguments + self, dir_manager, daemon_address=None, config=None, # noqa pylint: disable=unused-argument + resource_dir_method=None, + client_kwargs: typing.Optional[dict] = None, + ) -> None: super().__init__(config) - self.client = HyperdriveAsyncClient(**self.config.client) + self.client = HyperdriveAsyncClient( # type: ignore + **self.config.client, **(client_kwargs or {})) + logger.info("Initializing %s, using %s", + self.__class__.__name__, self.client) + self.storage = ResourceStorage(dir_manager, resource_dir_method or dir_manager.get_task_resource_dir) diff --git a/golem/task/taskmanager.py b/golem/task/taskmanager.py index e465010ca8..5cf57a0eb2 100644 --- a/golem/task/taskmanager.py +++ b/golem/task/taskmanager.py @@ -16,6 +16,8 @@ from apps.appsmanager import AppsManager from apps.core.task.coretask import CoreTask + +from golem.clientconfigdescriptor import ClientConfigDescriptor from golem.core.common import get_timestamp_utc, HandleForwardedError, \ HandleKeyError, node_info_str, short_node_id, to_unicode, update_dict from golem.manager.nodestatesnapshot import LocalTaskStateSnapshot @@ -82,8 +84,11 @@ class AlreadyRestartedError(Error): def __init__( self, node, keys_auth, root_path, + config_desc: ClientConfigDescriptor, tasks_dir="tasks", task_persistence=True, - apps_manager=AppsManager(), finished_cb=None): + apps_manager=AppsManager(), + finished_cb=None, + ): super().__init__() self.apps_manager = apps_manager @@ -110,6 +115,10 @@ def __init__( resource_manager = HyperdriveResourceManager( self.dir_manager, resource_dir_method=self.dir_manager.get_task_temporary_dir, + client_kwargs={ + 'host': config_desc.hyperdrive_rpc_address, + 'port': config_desc.hyperdrive_rpc_port, + }, ) self.task_result_manager = EncryptedResultPackageManager( resource_manager diff --git a/golem/task/taskserver.py b/golem/task/taskserver.py index d3fa308fd6..3f8bdd07d2 100644 --- a/golem/task/taskserver.py +++ b/golem/task/taskserver.py @@ -105,6 +105,7 @@ def __init__(self, self.node, self.keys_auth, root_path=TaskServer.__get_task_manager_root(client.datadir), + config_desc=config_desc, tasks_dir=os.path.join(client.datadir, 'tasks'), apps_manager=apps_manager, finished_cb=task_finished_cb, diff --git a/golem/testutils.py b/golem/testutils.py index 0aa89c0ff2..d85d83314d 100644 --- a/golem/testutils.py +++ b/golem/testutils.py @@ -23,6 +23,7 @@ class TempDirFixture(unittest.TestCase): @classmethod def setUpClass(cls): + super().setUpClass() logging.basicConfig(level=logging.DEBUG) if cls.root_dir is None: if is_osx(): diff --git a/golemapp.py b/golemapp.py index 2d5f801433..e3b4ac50e5 100755 --- a/golemapp.py +++ b/golemapp.py @@ -102,6 +102,8 @@ def monkey_patched_getLogger(*args, **kwargs): ]), help="Change level for Golem loggers and handlers") @click.option('--enable-talkback', is_flag=True, default=None) +@click.option('--hyperdrive-port', type=int, help="Hyperdrive public port") +@click.option('--hyperdrive-rpc-port', type=int, help="Hyperdrive RPC port") # Python flags, needed by crossbar (package only) @click.option('-m', nargs=1, default=None) @click.option('--node', expose_value=False) @@ -119,7 +121,9 @@ def monkey_patched_getLogger(*args, **kwargs): def start( # pylint: disable=too-many-arguments, too-many-locals monitor, concent, datadir, node_address, rpc_address, peer, mainnet, net, geth_address, password, accept_terms, accept_concent_terms, - accept_all_terms, version, log_level, enable_talkback, m): + accept_all_terms, version, log_level, enable_talkback, m, + hyperdrive_port, hyperdrive_rpc_port, +): freeze_support() delete_reactor() @@ -161,6 +165,10 @@ def _start(): config_desc.rpc_port = rpc_address.port if node_address: config_desc.node_address = node_address + if hyperdrive_port: + config_desc.hyperdrive_port = hyperdrive_port + if hyperdrive_rpc_port: + config_desc.hyperdrive_rpc_port = hyperdrive_rpc_port # Golem headless install_reactor() diff --git a/tests/apps/blender/verification/test_verification_queue.py b/tests/apps/blender/verification/test_verification_queue.py index 3563d44b97..c90c563442 100644 --- a/tests/apps/blender/verification/test_verification_queue.py +++ b/tests/apps/blender/verification/test_verification_queue.py @@ -1,14 +1,17 @@ -import unittest from unittest import mock import functools +from twisted.internet.defer import Deferred + from golem_verificator.blender_verifier import BlenderVerifier from golem.core.common import timeout_to_deadline +from golem.core.deferred import sync_wait from golem.docker.task_thread import DockerTaskThread +from golem.tools.testwithreactor import TestWithReactor from apps.core.verification_queue import VerificationQueue from apps.blender.blender_reference_generator import BlenderReferenceGenerator -class TestVerificationQueue(unittest.TestCase): +class TestVerificationQueue(TestWithReactor): def setUp(self): self.queue = VerificationQueue() @@ -24,12 +27,14 @@ def test_task_timeout(self, _start_rendering, _simple_verification, _verification_timed_out, ): VerificationQueue.VERIFICATION_TIMEOUT = 2 - from twisted.internet import reactor + d = Deferred() def test_timeout(): + subtask_info = {'subtask_id': 'deadbeef'} - subtask_info = {} - subtask_info['subtask_id'] = 'deadbeef' + def verification_finished(subtask_id, verdict, result): # noqa pylint:disable=unused-argument + d.callback(True) + return subtask_id self.queue.submit( functools.partial(BlenderVerifier, @@ -37,16 +42,15 @@ def test_timeout(): docker_task_cls=DockerTaskThread), "deadbeef", timeout_to_deadline(10), - lambda subtask_id, verdict, result: subtask_id, + cb=verification_finished, subtask_info=subtask_info, results=[], resources=[], - reference_data=[] + reference_data=[], ) + reactor = self._get_reactor() reactor.callLater(0, test_timeout) - reactor.callLater(5, reactor.stop) - - reactor.run() + sync_wait(d, 60) _verification_timed_out.assert_called_once() diff --git a/tests/factories/hyperdrive.py b/tests/factories/hyperdrive.py new file mode 100644 index 0000000000..b87700e8a3 --- /dev/null +++ b/tests/factories/hyperdrive.py @@ -0,0 +1,17 @@ +from golem.appconfig import ( + DEFAULT_HYPERDRIVE_RPC_ADDRESS, DEFAULT_HYPERDRIVE_RPC_PORT +) + + +def hyperdrive_client_kwargs(wrapped=True): + client_kwargs = { + 'host': DEFAULT_HYPERDRIVE_RPC_ADDRESS, + 'port': DEFAULT_HYPERDRIVE_RPC_PORT, + } + + if not wrapped: + return client_kwargs + + return { + 'client_kwargs': client_kwargs + } diff --git a/tests/golem/docker/test_docker_dummy_task.py b/tests/golem/docker/test_docker_dummy_task.py index 0eae615279..5158d580f6 100644 --- a/tests/golem/docker/test_docker_dummy_task.py +++ b/tests/golem/docker/test_docker_dummy_task.py @@ -8,23 +8,31 @@ from twisted.internet.defer import Deferred from apps.dummy.task.dummytask import DummyTaskBuilder, DummyTask +from golem.clientconfigdescriptor import ClientConfigDescriptor from golem.core.common import get_golem_path from golem.core.deferred import sync_wait from golem.core.fileshelper import find_file_with_ext +from golem.docker.manager import DockerManager from golem.resource.dirmanager import symlink_or_copy, \ rmlink_or_rmtree from golem.task.localcomputer import LocalComputer from golem.task.taskcomputer import DockerTaskThread from golem.task.tasktester import TaskTester from golem.tools.ci import ci_skip +from golem.tools.testwithreactor import TestWithReactor + from .test_docker_task import DockerTaskTestCase # Make peewee logging less verbose logging.getLogger("peewee").setLevel("INFO") +WAIT_TIMEOUT = 60 + @ci_skip -class TestDockerDummyTask(DockerTaskTestCase[DummyTask, DummyTaskBuilder]): +class TestDockerDummyTask( + DockerTaskTestCase[DummyTask, DummyTaskBuilder], TestWithReactor +): TASK_FILE = "docker-dummy-test-task.json" TASK_CLASS = DummyTask @@ -53,12 +61,15 @@ def setUpClass(cls): symlink_or_copy(code_dir, cls.code_link) symlink_or_copy(data_dir, cls.data_link) + DockerManager.install(ClientConfigDescriptor()) + cls.TASK_CLASS.VERIFICATION_QUEUE.resume() @classmethod def tearDownClass(cls): rmlink_or_rmtree(cls.code_link) rmlink_or_rmtree(cls.data_link) os.rmdir(cls.test_tmp) + super().tearDownClass() def _extract_results(self, computer: LocalComputer, subtask_id: str) \ -> Path: @@ -71,6 +82,7 @@ def _extract_results(self, computer: LocalComputer, subtask_id: str) \ requestor are separate machines """ assert isinstance(computer.tt, DockerTaskThread) + dirname = path.dirname(computer.tt.result['data'][0]) result = Path(find_file_with_ext(dirname, [".result"])) self.assertTrue(result.is_file()) @@ -90,8 +102,6 @@ def test_dummy_real_task(self, mock_dtt): print(ctd) print(type(ctd)) - from twisted.internet import reactor - d = Deferred() computer = LocalComputer( @@ -121,8 +131,7 @@ def success(*args, **kwargs): task.computation_finished(ctd['subtask_id'], [str(output)], verification_finished=success) - reactor.iterate() - sync_wait(d, 40) + sync_wait(d, WAIT_TIMEOUT) b = Deferred() @@ -137,15 +146,14 @@ def failure(*args, **kwargs): ctd = task.query_extra_data(10000.).ctd task.computation_finished(ctd['subtask_id'], [str(bad_output)], verification_finished=failure) - reactor.iterate() - sync_wait(b, 40) + sync_wait(b, WAIT_TIMEOUT) def test_dummytask_TaskTester_should_pass(self): task = self._get_test_task() computer = TaskTester(task, self.tempdir, Mock(), Mock()) computer.run() - computer.tt.join(60.0) + computer.tt.join(float(WAIT_TIMEOUT)) dirname = os.path.dirname(computer.tt.result[0]['data'][0]) result = find_file_with_ext(dirname, [".result"]) diff --git a/tests/golem/docker/test_docker_image.py b/tests/golem/docker/test_docker_image.py index 2540f93698..6c07739506 100644 --- a/tests/golem/docker/test_docker_image.py +++ b/tests/golem/docker/test_docker_image.py @@ -18,6 +18,7 @@ class DockerTestCase(unittest.TestCase): @classmethod def setUpClass(cls): """Disable all tests if Docker or the test image is not available.""" + super().setUpClass() try: client = local_client() images = client.images() diff --git a/tests/golem/network/hyperdrive/test_hyperdrive_client.py b/tests/golem/network/hyperdrive/test_hyperdrive_client.py index 6be659899c..fab754890e 100644 --- a/tests/golem/network/hyperdrive/test_hyperdrive_client.py +++ b/tests/golem/network/hyperdrive/test_hyperdrive_client.py @@ -9,6 +9,9 @@ from golem.network.hyperdrive.client import HyperdriveAsyncClient, \ HyperdriveClient, HyperdriveClientOptions +from tests.factories.hyperdrive import hyperdrive_client_kwargs + + response = { 'id': str(uuid.uuid4()), 'version': '0.2.4', @@ -26,6 +29,10 @@ content=response_str.encode())) class TestHyperdriveClient(TestCase): + @staticmethod + def get_client(): + return HyperdriveClient(**hyperdrive_client_kwargs(wrapped=False)) + def test_build_options(self, _): options = HyperdriveClient.build_options() assert options.client_id == HyperdriveClient.CLIENT_ID @@ -33,25 +40,25 @@ def test_build_options(self, _): assert options.options['peers'] is None def test_id(self, _): - client = HyperdriveClient() + client = self.get_client() result = client.id() assert result['id'] == response['id'] assert result['version'] == response['version'] def test_addresses(self, _): - client = HyperdriveClient() + client = self.get_client() assert client.addresses() == dict(TCP=('0.0.0.0', 3282)) def test_add(self, _): - client = HyperdriveClient() + client = self.get_client() assert client.add(response['files']) == response['hash'] def test_restore(self, _): - client = HyperdriveClient() + client = self.get_client() assert client.restore(response['files']) == response['hash'] def test_get(self, _): - client = HyperdriveClient() + client = self.get_client() content_hash = str(uuid.uuid4()) filepath = str(uuid.uuid4()) @@ -62,7 +69,7 @@ def test_get(self, _): == [(filepath, content_hash, response['files'])] def test_cancel(self, _): - client = HyperdriveClient() + client = self.get_client() content_hash = str(uuid.uuid4()) response_hash = response['hash'] assert client.cancel(content_hash) == response_hash @@ -70,7 +77,7 @@ def test_cancel(self, _): @mock.patch('json.loads') @mock.patch('requests.post') def test_request(self, post, json_loads, _): - client = HyperdriveClient() + client = self.get_client() resp = mock.Mock() post.return_value = resp @@ -79,7 +86,7 @@ def test_request(self, post, json_loads, _): @mock.patch('json.loads') def test_request_exception(self, json_loads, post): - client = HyperdriveClient() + client = self.get_client() resp = mock.Mock() post.return_value = resp @@ -94,7 +101,7 @@ def test_request_exception(self, json_loads, post): @mock.patch('json.loads') def test_request_http_error(self, json_loads, post): - client = HyperdriveClient() + client = self.get_client() resp = mock.Mock() post.return_value = resp @@ -122,10 +129,14 @@ def failure(*_): d.errback(Exception()) return d + @staticmethod + def get_client(): + return HyperdriveAsyncClient(**hyperdrive_client_kwargs(wrapped=False)) + @staticmethod @mock.patch('golem.core.golem_async.AsyncHTTPRequest.run') def test_get_async_run(request_run): - client = HyperdriveAsyncClient() + client = TestHyperdriveClientAsync.get_client() result = client.get_async('resource_hash', client_options=None, filepath='.') @@ -143,7 +154,7 @@ def test_get_async_run(request_run): ) def test_get_async_error(self): - client = HyperdriveAsyncClient() + client = self.get_client() with mock.patch('golem.core.golem_async.AsyncHTTPRequest.run', side_effect=self.failure): @@ -155,7 +166,7 @@ def test_get_async_error(self): assert isinstance(wrapper.result, failure.Failure) def test_get_async_body_error(self): - client = HyperdriveAsyncClient() + client = self.get_client() with mock.patch('twisted.web.client.readBody', side_effect=self.failure), \ @@ -180,7 +191,7 @@ def body(*_): mock.patch('golem.core.golem_async.AsyncHTTPRequest.run', side_effect=self.success): - client = HyperdriveAsyncClient() + client = self.get_client() wrapper = client.get_async('resource_hash', client_options=None, filepath='.') @@ -201,7 +212,7 @@ def body(*_): mock.patch('golem.core.golem_async.AsyncHTTPRequest.run', side_effect=self.success): - client = HyperdriveAsyncClient() + client = self.get_client() wrapper = client.add_async(files) assert wrapper.called assert isinstance(wrapper.result, str) diff --git a/tests/golem/network/hyperdrive/test_hyperdrive_daemon_manager.py b/tests/golem/network/hyperdrive/test_hyperdrive_daemon_manager.py index 7f0188d7df..bd066b30a9 100644 --- a/tests/golem/network/hyperdrive/test_hyperdrive_daemon_manager.py +++ b/tests/golem/network/hyperdrive/test_hyperdrive_daemon_manager.py @@ -6,6 +6,8 @@ from golem.network.hyperdrive.daemon_manager import HyperdriveDaemonManager from golem.testutils import TempDirFixture +from tests.factories.hyperdrive import hyperdrive_client_kwargs + class TestHyperdriveDaemonManager(TempDirFixture): @@ -13,7 +15,8 @@ class TestHyperdriveDaemonManager(TempDirFixture): def setUp(self, *_): super().setUp() - self.dm = HyperdriveDaemonManager(self.path) + self.dm = HyperdriveDaemonManager( + self.path, client_config=hyperdrive_client_kwargs(wrapped=False)) self.monitor = self.dm._monitor @patch('golem.network.hyperdrive.daemon_manager.' diff --git a/tests/golem/resource/base/common.py b/tests/golem/resource/base/common.py index 3e31422f12..fbdaf5f296 100644 --- a/tests/golem/resource/base/common.py +++ b/tests/golem/resource/base/common.py @@ -18,6 +18,8 @@ from golem.testutils import TempDirFixture from golem.tools.assertlogs import LogTestCase +from tests.factories.hyperdrive import hyperdrive_client_kwargs + class AddGetResources(TempDirFixture, LogTestCase): @@ -79,7 +81,7 @@ def _create_client(self, task_id, postfix): cls = self._resource_manager_class resource_manager = cls.__new__(cls) - resource_manager.__init__(dir_manager) + resource_manager.__init__(dir_manager, **hyperdrive_client_kwargs()) database = Database( db, diff --git a/tests/golem/resource/base/test_base_resourceserver.py b/tests/golem/resource/base/test_base_resourceserver.py index 020df308b1..0bb50f2a83 100644 --- a/tests/golem/resource/base/test_base_resourceserver.py +++ b/tests/golem/resource/base/test_base_resourceserver.py @@ -14,6 +14,8 @@ from golem.resource.hyperdrive.resourcesmanager import DummyResourceManager from golem.tools import testwithreactor +from tests.factories.hyperdrive import hyperdrive_client_kwargs + node_name = 'test_suite' @@ -66,7 +68,8 @@ def setUp(self): shutil.copy(test_dir_file, test_dir_file_copy) - self.resource_manager = DummyResourceManager(self.dir_manager) + self.resource_manager = DummyResourceManager( + self.dir_manager, **hyperdrive_client_kwargs()) self.keys_auth = KeysAuth(self.path, 'priv_key', 'password') self.client = MockClient() self.resource_server = BaseResourceServer( @@ -180,7 +183,8 @@ def testGetResources(self): relative = [[r.hash, r.files] for r in resources] new_server = BaseResourceServer( - DummyResourceManager(self.dir_manager), + DummyResourceManager( + self.dir_manager, **hyperdrive_client_kwargs()), DirManager(self.path, '2'), self.keys_auth, self.client diff --git a/tests/golem/resource/hyperdrive/test_hyperdrive_resourcemanager.py b/tests/golem/resource/hyperdrive/test_hyperdrive_resourcemanager.py index 47a9913f01..2c1fdba84e 100644 --- a/tests/golem/resource/hyperdrive/test_hyperdrive_resourcemanager.py +++ b/tests/golem/resource/hyperdrive/test_hyperdrive_resourcemanager.py @@ -17,11 +17,12 @@ default_argument_value from golem.testutils import TempDirFixture from tests.golem.resource.base.common import AddGetResources +from tests.factories.hyperdrive import hyperdrive_client_kwargs def running(): try: - return HyperdriveClient().id() + return HyperdriveClient(**hyperdrive_client_kwargs(wrapped=False)).id() except ConnectionError: return False @@ -200,7 +201,10 @@ def setUp(self): self.task_id = str(uuid.uuid4()) self.handle_retries = Mock() self.dir_manager = DirManager(self.tempdir) - self.resource_manager = HyperdriveResourceManager(self.dir_manager) + self.resource_manager = HyperdriveResourceManager( # noqa pylint: disable=unexpected-keyword-arg + self.dir_manager, + **hyperdrive_client_kwargs() + ) self.resource_manager._handle_retries = self.handle_retries file_name = 'test_file' diff --git a/tests/golem/resource/test_resourcehandshake.py b/tests/golem/resource/test_resourcehandshake.py index f1c6ec6a6e..4add9ecce3 100644 --- a/tests/golem/resource/test_resourcehandshake.py +++ b/tests/golem/resource/test_resourcehandshake.py @@ -20,6 +20,8 @@ from golem.task.acl import get_acl from golem.testutils import TempDirFixture, DatabaseFixture +from tests.factories.hyperdrive import hyperdrive_client_kwargs + class TestResourceHandshake(TempDirFixture): @@ -609,7 +611,8 @@ def __create_task_server(session): client = Mock(datadir=session.data_dir) dir_manager = DirManager(session.data_dir) - resource_manager = HyperdriveResourceManager(dir_manager=dir_manager) + resource_manager = HyperdriveResourceManager( # noqa pylint: disable=unexpected-keyword-arg + dir_manager=dir_manager, **hyperdrive_client_kwargs()) resource_manager.successful_uploads = True resource_manager.successful_downloads = True diff --git a/tests/golem/task/result/test_resultmanager.py b/tests/golem/task/result/test_resultmanager.py index 85a1a45f81..8f61e188fe 100644 --- a/tests/golem/task/result/test_resultmanager.py +++ b/tests/golem/task/result/test_resultmanager.py @@ -9,6 +9,8 @@ from golem.task.result.resultpackage import ExtractedPackage from golem.tools.testdirfixture import TestDirFixture +from tests.factories.hyperdrive import hyperdrive_client_kwargs + class MockTaskResult: def __init__(self, task_id, result, @@ -66,7 +68,8 @@ def setUp(self): self.dir_manager = DirManager(self.path) self.resource_manager = DummyResourceManager( self.dir_manager, - resource_dir_method=self.dir_manager.get_task_output_dir + resource_dir_method=self.dir_manager.get_task_output_dir, + **hyperdrive_client_kwargs(), ) def testGenSecret(self): @@ -136,7 +139,8 @@ def error(exc, *args, **kwargs): dir_manager = DirManager(self.path) resource_manager = DummyResourceManager( dir_manager, - resource_dir_method=dir_manager.get_task_temporary_dir + resource_dir_method=dir_manager.get_task_temporary_dir, + **hyperdrive_client_kwargs(), ) new_manager = EncryptedResultPackageManager(resource_manager) diff --git a/tests/golem/task/test_taskmanager.py b/tests/golem/task/test_taskmanager.py index 8b7afe9efd..c03bbc48c9 100644 --- a/tests/golem/task/test_taskmanager.py +++ b/tests/golem/task/test_taskmanager.py @@ -18,6 +18,7 @@ from twisted.internet.defer import fail from apps.appsmanager import AppsManager +from golem.clientconfigdescriptor import ClientConfigDescriptor from apps.core.task.coretask import CoreTask from apps.core.task.coretaskstate import TaskDefinition from apps.blender.task.blenderrendertask import BlenderRenderTask @@ -89,6 +90,7 @@ def setUp(self): dt_p2p_factory.Node(), keys_auth, root_path=self.path, + config_desc=ClientConfigDescriptor(), task_persistence=True, finished_cb=Mock() ) @@ -202,6 +204,7 @@ def test_dump_and_restore(self): temp_tm = TaskManager(dt_p2p_factory.Node(), keys_auth=keys_auth, root_path=self.path, + config_desc=ClientConfigDescriptor(), task_persistence=True) temp_tm.key_id = "KEYID" @@ -213,8 +216,12 @@ def test_dump_and_restore(self): "TASK %s DUMPED" % task_id in log for log in log.output) with self.assertLogs(logger, level="DEBUG") as log: - fresh_tm = TaskManager(dt_p2p_factory.Node(), keys_auth=Mock(), - root_path=self.path, task_persistence=True) + fresh_tm = TaskManager( + dt_p2p_factory.Node(), + keys_auth=Mock(), + root_path=self.path, + config_desc=ClientConfigDescriptor(), + task_persistence=True) assert any( "SEARCHING FOR TASKS TO RESTORE" in log for log in log.output) @@ -897,8 +904,12 @@ def test_get_tasks(self, _): count = 3 apps_manager = AppsManager() apps_manager.load_all_apps() - tm = TaskManager(dt_p2p_factory.Node(), Mock(), root_path=self.path, - apps_manager=apps_manager) + tm = TaskManager( + dt_p2p_factory.Node(), + Mock(), + root_path=self.path, + config_desc=ClientConfigDescriptor(), + apps_manager=apps_manager) task_id, subtask_id = self.__build_tasks(tm, count) one_task = tm.get_task_dict(task_id) @@ -928,8 +939,12 @@ def test_get_task_preview(self, get_preview, _): apps_manager = AppsManager() apps_manager.load_all_apps() ln = LocalNode(**dt_p2p_factory.Node().to_dict()) - tm = TaskManager(ln, Mock(), root_path=self.path, - apps_manager=apps_manager) + tm = TaskManager( + ln, + Mock(), + root_path=self.path, + config_desc=ClientConfigDescriptor(), + apps_manager=apps_manager) task_id, _ = self.__build_tasks(tm, 1) tm.get_task_preview(task_id) @@ -940,8 +955,12 @@ def test_get_subtasks_borders(self, _): count = 3 apps_manager = AppsManager() apps_manager.load_all_apps() - tm = TaskManager(dt_p2p_factory.Node(), Mock(), root_path=self.path, - apps_manager=apps_manager) + tm = TaskManager( + dt_p2p_factory.Node(), + Mock(), + root_path=self.path, + config_desc=ClientConfigDescriptor(), + apps_manager=apps_manager) task_id, _ = self.__build_tasks(tm, count) borders = tm.get_subtasks_borders(task_id, 0) @@ -1276,6 +1295,7 @@ def setUp(self): node=dt_p2p_factory.Node(), keys_auth=MagicMock(spec=KeysAuth), root_path='/tmp', + config_desc=ClientConfigDescriptor(), task_persistence=False ) diff --git a/tests/golem/task/test_taskserver.py b/tests/golem/task/test_taskserver.py index 771d3b1219..19ae0564cf 100644 --- a/tests/golem/task/test_taskserver.py +++ b/tests/golem/task/test_taskserver.py @@ -21,6 +21,7 @@ from requests import HTTPError from golem import testutils +from golem.appconfig import AppConfig from golem.clientconfigdescriptor import ClientConfigDescriptor from golem.core.common import node_info_str from golem.core.keysauth import KeysAuth @@ -42,6 +43,7 @@ from golem.tools.testwithreactor import TestDatabaseWithReactor from tests.factories.resultpackage import ExtractedPackageFactory +from tests.factories.hyperdrive import hyperdrive_client_kwargs DEFAULT_RESOURCE_SIZE: int = 2 * 1024 @@ -108,6 +110,8 @@ def setUp(self): super().setUp() random.seed() self.ccd = ClientConfigDescriptor() + self.ccd.init_from_app_config( + AppConfig.load_config(tempfile.mkdtemp(), 'cfg')) self.client.concent_service.enabled = False with patch( 'golem.network.concent.handlers_library.HandlersLibrary' @@ -219,6 +223,7 @@ def test_request(self, tar, *_): return_value=SupportStatus(True), ) def test_request_task_concent_required(self, *_): + self.ts.config_desc.min_price = 0 self.ts.client.concent_service.enabled = True self.ts.task_archiver = Mock() keys_auth = KeysAuth(self.path, 'prv_key', '') @@ -238,8 +243,7 @@ def test_request_task_concent_required(self, *_): @patch("golem.task.taskserver.Trust") def test_send_results(self, trust, *_): - ccd = ClientConfigDescriptor() - ccd.min_price = 11 + self.ts.config_desc.min_price = 11 keys_auth = KeysAuth(self.path, 'priv_key', '') task_header = get_example_task_header(keys_auth.public_key) n = task_header.task_owner @@ -1010,7 +1014,7 @@ def test_new_connection(self, *_): def test_download_options(self, *_): dm = DirManager(self.path) - rm = HyperdriveResourceManager(dm) + rm = HyperdriveResourceManager(dm, **hyperdrive_client_kwargs()) # noqa pylint: disable=unexpected-keyword-arg self.client.resource_server.resource_manager = rm ts = self.ts diff --git a/tests/golem/test_client.py b/tests/golem/test_client.py index 8f1f8d0bf3..bd6ac10458 100644 --- a/tests/golem/test_client.py +++ b/tests/golem/test_client.py @@ -20,6 +20,9 @@ from golem import model from golem import testutils +from golem.appconfig import ( + DEFAULT_HYPERDRIVE_RPC_PORT, DEFAULT_HYPERDRIVE_RPC_ADDRESS +) from golem.client import Client, ClientTaskComputerEventListener, \ DoWorkService, MonitoringPublisherService, \ NetworkConnectionPublisherService, \ @@ -110,6 +113,10 @@ def make_client(*_, **kwargs): 'use_monitor': False, 'concent_variant': CONCENT_CHOICES['disabled'], } + default_kwargs['config_desc'].hyperdrive_rpc_address = \ + DEFAULT_HYPERDRIVE_RPC_ADDRESS + default_kwargs['config_desc'].hyperdrive_rpc_port = \ + DEFAULT_HYPERDRIVE_RPC_PORT default_kwargs.update(kwargs) client = Client(**default_kwargs) return client