From f440c41db8cfd9ba7a696b78c386e20dfcc67b9c Mon Sep 17 00:00:00 2001 From: Adam Mizerski Date: Wed, 22 Apr 2020 17:49:06 +0200 Subject: [PATCH] fix config update failure --- golem/envs/__init__.py | 37 +++++++- golem/envs/docker/cpu.py | 4 +- golem/task/taskcomputer.py | 20 ++-- .../nodes/provider/configure_or_die.py | 43 +++++++++ .../__init__.py | 0 .../playbook.py | 75 +++++++++++++++ .../test_config.py | 9 ++ tests/golem/envs/docker/cpu/test_env.py | 5 - tests/golem/envs/test_env.py | 92 ++++++++++++++++++- tests/golem/task/test_newtaskcomputer.py | 15 +-- tests/golem/task/test_taskcomputeradapter.py | 5 +- 11 files changed, 267 insertions(+), 38 deletions(-) create mode 100644 scripts/node_integration_tests/nodes/provider/configure_or_die.py create mode 100644 scripts/node_integration_tests/playbooks/golem/reconfigure_provider_while_working/__init__.py create mode 100644 scripts/node_integration_tests/playbooks/golem/reconfigure_provider_while_working/playbook.py create mode 100644 scripts/node_integration_tests/playbooks/golem/reconfigure_provider_while_working/test_config.py diff --git a/golem/envs/__init__.py b/golem/envs/__init__.py index 03a0d5ca3f..e344044846 100644 --- a/golem/envs/__init__.py +++ b/golem/envs/__init__.py @@ -6,7 +6,7 @@ from threading import RLock from typing import Any, Callable, Dict, List, Optional, NamedTuple, Union, \ - Sequence, Iterable, ContextManager, Set, Tuple, TYPE_CHECKING + Sequence, Iterable, ContextManager, Set, Tuple, Type, TYPE_CHECKING from dataclasses import dataclass, field from twisted.internet.threads import deferToThread @@ -599,3 +599,38 @@ def listen( listener: EnvEventListener ) -> None: self._event_listeners.setdefault(event_type, set()).add(listener) + + +def delayed_config(cls: Type[EnvironmentBase]) -> Type[EnvironmentBase]: + """ + This class decorator allows to save config update and apply it, when env is + disabled. 
+ """ + # FIXME workaround https://github.com/python/mypy/issues/5865 + cls2: Any = cls + + class DelayedConfigWrapper(cls2): + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self._next_config: Optional[EnvConfig] = None + + def apply_next_config(_): + if self._next_config is None: + return + self._logger.debug("Applying saved config") + config = self._next_config + self._next_config = None + cls.update_config(self, config) + + self.listen(EnvEventType.DISABLED, apply_next_config) + + def update_config(self, new_config: EnvConfig) -> None: + if self._status == EnvStatus.DISABLED: + self._logger.debug("Config applied immediately") + super().update_config(new_config) + return + + self._logger.debug("Config saved for later") + self._next_config = new_config + + return DelayedConfigWrapper diff --git a/golem/envs/docker/cpu.py b/golem/envs/docker/cpu.py index f9b0c48dc5..0a9ade1ce8 100644 --- a/golem/envs/docker/cpu.py +++ b/golem/envs/docker/cpu.py @@ -41,7 +41,8 @@ RuntimePayload, RuntimeStatus, UsageCounter, - UsageCounterValues + UsageCounterValues, + delayed_config, ) from golem.envs.docker import DockerRuntimePayload, DockerPrerequisites from golem.envs.docker.whitelist import Whitelist @@ -496,6 +497,7 @@ def usage_counter_values(self) -> UsageCounterValues: return deepcopy(self._counters) +@delayed_config class DockerCPUEnvironment(EnvironmentBase): MIN_MEMORY_MB: ClassVar[int] = 1024 diff --git a/golem/task/taskcomputer.py b/golem/task/taskcomputer.py index cf4d60e9ca..c3c5dfdc4c 100644 --- a/golem/task/taskcomputer.py +++ b/golem/task/taskcomputer.py @@ -77,7 +77,7 @@ def __init__( ) self._new_computer = NewTaskComputer( env_manager=env_manager, - work_dir=task_server.get_task_computer_root(), + work_dir=Path(task_server.get_task_computer_root()), stats_keeper=self.stats ) @@ -263,10 +263,8 @@ def change_config( config_desc: 'ClientConfigDescriptor', in_background: bool = True ) -> defer.Deferred: - work_dir = 
Path(self._task_server.get_task_computer_root()) - yield self._new_computer.change_config( - config_desc=config_desc, - work_dir=work_dir) + self._new_computer.change_config( + config_desc=config_desc) return (yield self._old_computer.change_config( config_desc=config_desc, in_background=in_background)) @@ -480,13 +478,9 @@ def get_current_computing_env(self) -> 'Optional[EnvId]': def change_config( self, config_desc: 'ClientConfigDescriptor', - work_dir: Path - ) -> defer.Deferred: - assert not self._is_computing() - self._work_dir = work_dir - + ) -> None: config_dict = dict( - work_dirs=[work_dir], + work_dirs=[self._work_dir], cpu_count=config_desc.num_cores, memory_mb=scale_memory( config_desc.max_memory_size, @@ -495,7 +489,7 @@ def change_config( ) ) - # FIXME: Decide how to properly configure environments + # FIXME: Decide how to properly configure environments - is it still relevant? if self._env_manager.enabled(DOCKER_CPU_ENV_ID): docker_cpu = self._env_manager.environment(DOCKER_CPU_ENV_ID) docker_cpu.update_config(DockerCPUConfig(**config_dict)) @@ -505,8 +499,6 @@ def change_config( # TODO: GPU options in config_dict docker_gpu.update_config(DockerGPUConfig(**config_dict)) - return defer.succeed(None) - def quit(self): if self.has_assigned_task(): self.task_interrupted() diff --git a/scripts/node_integration_tests/nodes/provider/configure_or_die.py b/scripts/node_integration_tests/nodes/provider/configure_or_die.py new file mode 100644 index 0000000000..468db0a3d1 --- /dev/null +++ b/scripts/node_integration_tests/nodes/provider/configure_or_die.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python + +import logging +from unittest.mock import patch + +from twisted.internet.defer import inlineCallbacks + +from golemapp import main + +from golem.client import Client +from golem.task.taskserver import TaskServer + + +def on_exception(): + logging.critical("#### Integration test failed ####") + + +client_change_config_orig = Client.change_config + + +def 
client_change_config(self: Client, *args, **kwargs): + try: + client_change_config_orig(self, *args, **kwargs) + except: # noqa pylint: disable=broad-except + on_exception() + + +task_server_change_config_orig = TaskServer.change_config + + +@inlineCallbacks +def task_server_change_config(self: TaskServer, *args, **kwargs): + try: + yield task_server_change_config_orig(self, *args, **kwargs) + except: # noqa pylint: disable=broad-except + on_exception() + + +with patch("golem.client.Client.change_config", + client_change_config), \ + patch("golem.task.taskserver.TaskServer.change_config", + task_server_change_config): + main() diff --git a/scripts/node_integration_tests/playbooks/golem/reconfigure_provider_while_working/__init__.py b/scripts/node_integration_tests/playbooks/golem/reconfigure_provider_while_working/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/scripts/node_integration_tests/playbooks/golem/reconfigure_provider_while_working/playbook.py b/scripts/node_integration_tests/playbooks/golem/reconfigure_provider_while_working/playbook.py new file mode 100644 index 0000000000..7374bbdf51 --- /dev/null +++ b/scripts/node_integration_tests/playbooks/golem/reconfigure_provider_while_working/playbook.py @@ -0,0 +1,75 @@ +import time +from functools import partial + +from scripts.node_integration_tests import helpers +from ...test_config_base import NodeId +from ..task_api.playbook import Playbook as BasePlaybook + + +class Playbook(BasePlaybook): + def wait_for_computing_task(self): + def on_success(result): + state = result['provider_state'] + print(f"provider state: {state}") + if state['status'] == 'Computing': + self.next() + else: + time.sleep(10) + + def on_error(_): + print(f"failed getting provider stats") + self.fail() + return self.call(NodeId.provider, 'comp.tasks.stats', + on_success=on_success, on_error=on_error) + + def ui_stop(self, node_id: NodeId): + def on_success(_): + print(f"stopped {node_id.value}") + 
self.next() + + def on_error(_): + print(f"stopping {node_id.value} failed") + self.fail() + return self.call(node_id, 'ui.stop', on_success=on_success, + on_error=on_error) + + def change_config(self, node_id: NodeId): + opts = { + "node_name": "a new name", + } + + def on_success(_): + print(f"reconfigured {node_id.value}") + time.sleep(10) # give time for async operations to process + self.next() + + def on_error(_): + print(f"reconfiguring {node_id.value} failed") + self.fail() + + return self.call(node_id, 'env.opts.update', opts, + on_success=on_success, on_error=on_error) + + def check_if_test_failed(self, node_id: NodeId): + test_failed = bool(helpers.search_output( + self.output_queues[node_id], + ".*#### Integration test failed ####.*")) + + if test_failed: + self.fail("found failure marker in log") + + print("no failure marker found in log") + self.next() + + steps = BasePlaybook.initial_steps + ( + BasePlaybook.step_enable_app, + BasePlaybook.step_create_task, + BasePlaybook.step_get_task_id, + BasePlaybook.step_get_task_status, + wait_for_computing_task, + partial(ui_stop, node_id=NodeId.provider), + partial(change_config, node_id=NodeId.provider), + partial(check_if_test_failed, node_id=NodeId.provider), + # No need to wait for subtask to finish, because it's a waste of time + # and because it gets cancelled, because task session is closed.
+ ) diff --git a/scripts/node_integration_tests/playbooks/golem/reconfigure_provider_while_working/test_config.py b/scripts/node_integration_tests/playbooks/golem/reconfigure_provider_while_working/test_config.py new file mode 100644 index 0000000000..ff69680000 --- /dev/null +++ b/scripts/node_integration_tests/playbooks/golem/reconfigure_provider_while_working/test_config.py @@ -0,0 +1,9 @@ +from ...test_config_base import NodeId + +from ..task_api.test_config import TestConfig as TestConfigBase + + +class TestConfig(TestConfigBase): + def __init__(self): + super().__init__() + self.nodes[NodeId.provider].script = 'provider/configure_or_die' diff --git a/tests/golem/envs/docker/cpu/test_env.py b/tests/golem/envs/docker/cpu/test_env.py index 167babe68e..b5c049d683 100644 --- a/tests/golem/envs/docker/cpu/test_env.py +++ b/tests/golem/envs/docker/cpu/test_env.py @@ -329,11 +329,6 @@ def test_wrong_type(self): with self.assertRaises(AssertionError): self.env.update_config(object()) - def test_enabled_status(self): - self.env._status = EnvStatus.ENABLED - with self.assertRaises(ValueError): - self.env.update_config(Mock(spec=DockerCPUConfig)) - @patch_env('_validate_config', side_effect=ValueError) def test_invalid_config(self, validate): config = Mock(spec=DockerCPUConfig) diff --git a/tests/golem/envs/test_env.py b/tests/golem/envs/test_env.py index e7fb5e8401..a7906b81b3 100644 --- a/tests/golem/envs/test_env.py +++ b/tests/golem/envs/test_env.py @@ -2,13 +2,18 @@ from unittest import TestCase from unittest.mock import Mock, patch +from dataclasses import dataclass + +from twisted.internet import defer + from golem.envs import ( EnvConfig, EnvEvent, EnvEventType, EnvironmentBase, EnvStatus, - Prerequisites + Prerequisites, + delayed_config, ) @@ -121,3 +126,88 @@ def test_re_register(self): self.assertEqual(self.env._event_listeners, { EnvEventType.ERROR_OCCURRED: {listener} }) + + +@dataclass +class MyConfig(EnvConfig): + i: int + + def to_dict(self) -> dict: + 
pass + + @staticmethod + def from_dict(data): + pass + + +@delayed_config +class MyEnv(EnvironmentBase): + def __init__(self, config: MyConfig) -> None: + super().__init__() + self._config = config + + def update_config(self, config: EnvConfig) -> None: + assert isinstance(config, MyConfig) + self._logger.debug("dupa %r", self._event_listeners) + if self._status != EnvStatus.DISABLED: + raise ValueError + self._config = config + self._config_updated(config) + + def config(self) -> MyConfig: + return self._config + + @classmethod + def supported(cls): + raise NotImplementedError + + def prepare(self): + raise NotImplementedError + + def clean_up(self): + raise NotImplementedError + + def run_benchmark(self): + raise NotImplementedError + + def parse_prerequisites(self, prerequisites_dict): + raise NotImplementedError + + def install_prerequisites(self, prerequisites): + raise NotImplementedError + + def parse_config(self, config_dict): + raise NotImplementedError + + def supported_usage_counters(self): + raise NotImplementedError + + def runtime(self, payload, config=None): + raise NotImplementedError + + +def execute(f, *args, **kwargs): + try: + return defer.succeed(f(*args, **kwargs)) + except Exception as exc: # pylint: disable=broad-except + return defer.fail(exc) + + +@patch('golem.envs.deferToThread', execute) +class TestDelayedConfig(TestCase): + + def setUp(self) -> None: + config = MyConfig(i=1) + # pylint: disable=abstract-class-instantiated + self.env = MyEnv(config) + + def test_update_config_when_disabled(self): + self.env.update_config(MyConfig(i=2)) + self.assertEqual(self.env.config().i, 2) + + def test_update_config_when_enabled(self): + self.env._env_enabled() + self.env.update_config(MyConfig(i=2)) + self.assertEqual(self.env.config().i, 1) + self.env._env_disabled() + self.assertEqual(self.env.config().i, 2) diff --git a/tests/golem/task/test_newtaskcomputer.py b/tests/golem/task/test_newtaskcomputer.py index 1b3f7b7b4c..991420fdb0 100644 --- 
a/tests/golem/task/test_newtaskcomputer.py +++ b/tests/golem/task/test_newtaskcomputer.py @@ -320,28 +320,17 @@ def test_client_client_and_compute(self): class TestChangeConfig(NewTaskComputerTestBase): - @defer.inlineCallbacks - def test_computation_running(self): - self.task_computer._computation = mock.Mock() - work_dir = Path('test_dir') - config_desc = ClientConfigDescriptor() - with self.assertRaises(AssertionError): - yield self.task_computer.change_config(config_desc, work_dir) - - @defer.inlineCallbacks def test_ok(self): - work_dir = Path('test_dir') config_desc = ClientConfigDescriptor() config_desc.num_cores = 13 config_desc.max_memory_size = 1024 * 1024 - yield self.task_computer.change_config(config_desc, work_dir) + self.task_computer.change_config(config_desc) - self.assertEqual(self.task_computer._work_dir, work_dir) self.env_manager.environment.assert_called_once_with(DOCKER_CPU_ENV_ID) self.env_manager.environment().update_config.assert_called_once_with( DockerCPUConfig( - work_dirs=[Path('test_dir')], + work_dirs=[self.work_dir], cpu_count=13, memory_mb=1024, ) diff --git a/tests/golem/task/test_taskcomputeradapter.py b/tests/golem/task/test_taskcomputeradapter.py index cbc4d64b0a..c735de6028 100644 --- a/tests/golem/task/test_taskcomputeradapter.py +++ b/tests/golem/task/test_taskcomputeradapter.py @@ -34,7 +34,8 @@ def setUp(self, new_task_computer, old_task_computer, int_stats_keeper): # noqa self.task_server = mock.Mock( spec=TaskServer, config_desc=config_desc, - task_keeper=self.task_keeper + task_keeper=self.task_keeper, + get_task_computer_root=mock.Mock(return_value=Path("/tmp")), ) self.env_manager = mock.Mock(spec_set=EnvironmentManager) self.finished_callback = mock.Mock() @@ -336,11 +337,9 @@ def test_compute_tasks_setting(self): @defer.inlineCallbacks def test_both_computers_reconfigured(self): config_desc = ClientConfigDescriptor() - self.task_server.get_task_computer_root.return_value = '/test' yield 
self.adapter.change_config(config_desc) self.new_computer.change_config.assert_called_once_with( config_desc=config_desc, - work_dir=Path('/test') ) self.old_computer.change_config.assert_called_once_with( config_desc=config_desc,