Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
fix config update failure
Browse files Browse the repository at this point in the history
  • Loading branch information
etam committed Apr 24, 2020
1 parent b029636 commit f440c41
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 38 deletions.
37 changes: 36 additions & 1 deletion golem/envs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from threading import RLock

from typing import Any, Callable, Dict, List, Optional, NamedTuple, Union, \
Sequence, Iterable, ContextManager, Set, Tuple, TYPE_CHECKING
Sequence, Iterable, ContextManager, Set, Tuple, Type, TYPE_CHECKING

from dataclasses import dataclass, field
from twisted.internet.threads import deferToThread
Expand Down Expand Up @@ -599,3 +599,38 @@ def listen(
listener: EnvEventListener
) -> None:
self._event_listeners.setdefault(event_type, set()).add(listener)


def delayed_config(cls: Type[EnvironmentBase]) -> Type[EnvironmentBase]:
    """
    Class decorator which defers config updates until the env is disabled.

    The returned class is a subclass of `cls` with `update_config()`
    overridden: if the environment's status is DISABLED the new config is
    passed straight to `cls.update_config()`, otherwise it is stored and
    applied by a listener registered for the DISABLED event. Only the most
    recently stored config is kept.

    :param cls: environment class to wrap
    :return: subclass of `cls` which carries the module, name, qualified name
        and docstring of `cls`
    """
    # FIXME workaround https://github.com/python/mypy/issues/5865
    cls2: Any = cls

    class DelayedConfigWrapper(cls2):
        def __init__(self, *args, **kwargs) -> None:
            super().__init__(*args, **kwargs)
            # Config waiting for the DISABLED event; None if nothing is pending
            self._next_config: Optional[EnvConfig] = None

            def apply_next_config(_):
                if self._next_config is None:
                    return
                self._logger.debug("Applying saved config")
                # The pending slot is cleared before delegating, so a config
                # rejected by cls.update_config() is not kept around.
                config = self._next_config
                self._next_config = None
                cls.update_config(self, config)

            self.listen(EnvEventType.DISABLED, apply_next_config)

        def update_config(self, new_config: EnvConfig) -> None:
            if self._status == EnvStatus.DISABLED:
                self._logger.debug("Config applied immediately")
                super().update_config(new_config)
                # The config just applied is newer than anything saved
                # earlier. Drop the saved one so that a DISABLED listener
                # running later cannot overwrite it with stale values.
                # NOTE(review): whether listeners can run after the status
                # has already changed is not visible here - confirm.
                self._next_config = None
                return

            self._logger.debug("Config saved for later")
            self._next_config = new_config

    # Make the wrapper indistinguishable from the decorated class in logs,
    # reprs and tracebacks, instead of every decorated environment showing up
    # as 'delayed_config.<locals>.DelayedConfigWrapper'.
    for attr in ('__module__', '__name__', '__qualname__', '__doc__'):
        setattr(DelayedConfigWrapper, attr, getattr(cls, attr))

    return DelayedConfigWrapper
4 changes: 3 additions & 1 deletion golem/envs/docker/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
RuntimePayload,
RuntimeStatus,
UsageCounter,
UsageCounterValues
UsageCounterValues,
delayed_config,
)
from golem.envs.docker import DockerRuntimePayload, DockerPrerequisites
from golem.envs.docker.whitelist import Whitelist
Expand Down Expand Up @@ -496,6 +497,7 @@ def usage_counter_values(self) -> UsageCounterValues:
return deepcopy(self._counters)


@delayed_config
class DockerCPUEnvironment(EnvironmentBase):

MIN_MEMORY_MB: ClassVar[int] = 1024
Expand Down
20 changes: 6 additions & 14 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(
)
self._new_computer = NewTaskComputer(
env_manager=env_manager,
work_dir=task_server.get_task_computer_root(),
work_dir=Path(task_server.get_task_computer_root()),
stats_keeper=self.stats
)

Expand Down Expand Up @@ -263,10 +263,8 @@ def change_config(
config_desc: 'ClientConfigDescriptor',
in_background: bool = True
) -> defer.Deferred:
work_dir = Path(self._task_server.get_task_computer_root())
yield self._new_computer.change_config(
config_desc=config_desc,
work_dir=work_dir)
self._new_computer.change_config(
config_desc=config_desc)
return (yield self._old_computer.change_config(
config_desc=config_desc,
in_background=in_background))
Expand Down Expand Up @@ -480,13 +478,9 @@ def get_current_computing_env(self) -> 'Optional[EnvId]':
def change_config(
self,
config_desc: 'ClientConfigDescriptor',
work_dir: Path
) -> defer.Deferred:
assert not self._is_computing()
self._work_dir = work_dir

) -> None:
config_dict = dict(
work_dirs=[work_dir],
work_dirs=[self._work_dir],
cpu_count=config_desc.num_cores,
memory_mb=scale_memory(
config_desc.max_memory_size,
Expand All @@ -495,7 +489,7 @@ def change_config(
)
)

# FIXME: Decide how to properly configure environments
# FIXME: Decide how to properly configure environments - is it still relevant?
if self._env_manager.enabled(DOCKER_CPU_ENV_ID):
docker_cpu = self._env_manager.environment(DOCKER_CPU_ENV_ID)
docker_cpu.update_config(DockerCPUConfig(**config_dict))
Expand All @@ -505,8 +499,6 @@ def change_config(
# TODO: GPU options in config_dict
docker_gpu.update_config(DockerGPUConfig(**config_dict))

return defer.succeed(None)

def quit(self):
if self.has_assigned_task():
self.task_interrupted()
Expand Down
43 changes: 43 additions & 0 deletions scripts/node_integration_tests/nodes/provider/configure_or_die.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/usr/bin/env python

import logging
from unittest.mock import patch

from twisted.internet.defer import inlineCallbacks

from golemapp import main

from golem.client import Client
from golem.task.taskserver import TaskServer


def on_exception():
    """Log the integration test failure marker at CRITICAL level."""
    logging.log(logging.CRITICAL, "#### Integration test failed ####")


# Reference to the unpatched method, so that the wrapper below can delegate
client_change_config_orig = Client.change_config


def client_change_config(self: Client, *args, **kwargs):
    """
    Replacement for `Client.change_config` which reports failures in the log.

    Delegates to the original method and hands its result back to the caller.
    If the call raises, the failure marker is logged via `on_exception()` and
    the error is not propagated; the wrapper returns None in that case.
    """
    try:
        return client_change_config_orig(self, *args, **kwargs)
    except Exception:  # pylint: disable=broad-except
        # `Exception` rather than a bare except: KeyboardInterrupt and
        # SystemExit must still be able to terminate the node.
        on_exception()
        return None


# Reference to the unpatched method, so that the wrapper below can delegate
task_server_change_config_orig = TaskServer.change_config


@inlineCallbacks
def task_server_change_config(self: TaskServer, *args, **kwargs):
    """
    Replacement for `TaskServer.change_config` which reports failures.

    Waits for the original call and fires the returned Deferred with its
    result. If the call fails, the failure marker is logged via
    `on_exception()` and the error is not propagated; the Deferred fires with
    None in that case.
    """
    try:
        return (yield task_server_change_config_orig(self, *args, **kwargs))
    except Exception:  # pylint: disable=broad-except
        # `Exception` rather than a bare except: a bare except inside a
        # generator also catches GeneratorExit, which would log a false
        # failure marker when the generator is merely closed.
        on_exception()
        return None


# Run the node with both change_config methods replaced by the reporting
# wrappers defined above.
client_patch = patch(
    "golem.client.Client.change_config",
    client_change_config)
task_server_patch = patch(
    "golem.task.taskserver.TaskServer.change_config",
    task_server_change_config)

with client_patch:
    with task_server_patch:
        main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import time
from functools import partial

from scripts.node_integration_tests import helpers
from ...test_config_base import NodeId
from ..task_api.playbook import Playbook as BasePlaybook


class Playbook(BasePlaybook):
    """
    Playbook which reconfigures the provider after a task has been started.

    On top of the base playbook's steps it waits until the provider reports
    the 'Computing' status, calls 'ui.stop' and 'env.opts.update' on the
    provider and finally fails if the provider's output contains the
    "#### Integration test failed ####" marker.
    """

    def wait_for_computing_task(self):
        """
        Query the provider's stats; advance once its status is 'Computing'.

        When the status differs, the callback sleeps for 10 seconds without
        advancing.
        NOTE(review): presumably the playbook runner repeats the step then -
        confirm.
        """
        def on_success(result):
            state = result['provider_state']
            print(f"provider state: {state}")
            if state['status'] == 'Computing':
                self.next()
            else:
                time.sleep(10)

        def on_error(_):
            print("failed getting provider stats")
            self.fail()
        return self.call(NodeId.provider, 'comp.tasks.stats',
                         on_success=on_success, on_error=on_error)

    def ui_stop(self, node_id: NodeId):
        """Call 'ui.stop' on the given node; advance on success."""
        def on_success(_):
            print(f"stopped {node_id.value}")
            self.next()

        def on_error(_):
            print(f"stopping {node_id.value} failed")
            self.fail()
        return self.call(node_id, 'ui.stop', on_success=on_success,
                         on_error=on_error)

    def change_config(self, node_id: NodeId):
        """Update the node's name via 'env.opts.update'; advance on success."""
        opts = {
            "node_name": "a new name",
        }

        def on_success(_):
            print(f"reconfigured {node_id.value}")
            time.sleep(10)  # give time for async operations to process
            self.next()

        def on_error(_):
            print(f"reconfiguring {node_id.value} failed")
            self.fail()

        return self.call(node_id, 'env.opts.update', opts,
                         on_success=on_success, on_error=on_error)

    def check_if_test_failed(self, node_id: NodeId):
        """Fail if the node's output contains the failure marker."""
        test_failed = bool(helpers.search_output(
            self.output_queues[node_id],
            ".*#### Integration test failed ####.*"))

        if test_failed:
            self.fail("found failure marker in log")
            # Stop here: without the return a failed check would go on to
            # report that no marker was found and advance the playbook.
            # NOTE(review): whether fail() raises is not visible here; the
            # return is correct either way.
            return

        print("no failure marker found in log")
        self.next()

    steps = BasePlaybook.initial_steps + (
        BasePlaybook.step_enable_app,
        BasePlaybook.step_create_task,
        BasePlaybook.step_get_task_id,
        BasePlaybook.step_get_task_status,
        wait_for_computing_task,
        partial(ui_stop, node_id=NodeId.provider),
        partial(change_config, node_id=NodeId.provider),
        partial(check_if_test_failed, node_id=NodeId.provider),
        # No need to wait for the subtask to finish: it would be a waste of
        # time, and the subtask gets cancelled anyway because the task
        # session is closed.
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from ...test_config_base import NodeId

from ..task_api.test_config import TestConfig as TestConfigBase


class TestConfig(TestConfigBase):
    """Base test config with the provider running 'configure_or_die'."""

    def __init__(self):
        super().__init__()
        provider = self.nodes[NodeId.provider]
        provider.script = 'provider/configure_or_die'
5 changes: 0 additions & 5 deletions tests/golem/envs/docker/cpu/test_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,6 @@ def test_wrong_type(self):
with self.assertRaises(AssertionError):
self.env.update_config(object())

def test_enabled_status(self):
self.env._status = EnvStatus.ENABLED
with self.assertRaises(ValueError):
self.env.update_config(Mock(spec=DockerCPUConfig))

@patch_env('_validate_config', side_effect=ValueError)
def test_invalid_config(self, validate):
config = Mock(spec=DockerCPUConfig)
Expand Down
92 changes: 91 additions & 1 deletion tests/golem/envs/test_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
from unittest import TestCase
from unittest.mock import Mock, patch

from dataclasses import dataclass

from twisted.internet import defer

from golem.envs import (
EnvConfig,
EnvEvent,
EnvEventType,
EnvironmentBase,
EnvStatus,
Prerequisites
Prerequisites,
delayed_config,
)


Expand Down Expand Up @@ -121,3 +126,88 @@ def test_re_register(self):
self.assertEqual(self.env._event_listeners, {
EnvEventType.ERROR_OCCURRED: {listener}
})


@dataclass
class MyConfig(EnvConfig):
    """Config with a single integer field, used by the tests below."""

    i: int

    def to_dict(self) -> dict:
        """Stub: does nothing and yields None."""

    @staticmethod
    def from_dict(data):
        """Stub: ignores `data` and yields None."""


@delayed_config
class MyEnv(EnvironmentBase):
    """
    Environment stub for exercising `delayed_config`.

    Only `update_config()` and `config()` are implemented; the remaining
    methods raise NotImplementedError.
    """

    def __init__(self, config: MyConfig) -> None:
        super().__init__()
        self._config = config

    def update_config(self, config: EnvConfig) -> None:
        """
        Replace the stored config.

        :raises ValueError: if the environment's status is not DISABLED
        """
        assert isinstance(config, MyConfig)
        self._logger.debug(
            "Updating config, listeners: %r", self._event_listeners)
        if self._status != EnvStatus.DISABLED:
            raise ValueError
        self._config = config
        self._config_updated(config)

    def config(self) -> MyConfig:
        """Return the currently stored config."""
        return self._config

    @classmethod
    def supported(cls):
        raise NotImplementedError

    def prepare(self):
        raise NotImplementedError

    def clean_up(self):
        raise NotImplementedError

    def run_benchmark(self):
        raise NotImplementedError

    def parse_prerequisites(self, prerequisites_dict):
        raise NotImplementedError

    def install_prerequisites(self, prerequisites):
        raise NotImplementedError

    def parse_config(self, config_dict):
        raise NotImplementedError

    def supported_usage_counters(self):
        raise NotImplementedError

    def runtime(self, payload, config=None):
        raise NotImplementedError


def execute(f, *args, **kwargs):
    """
    Call `f` synchronously and wrap the outcome in an already fired Deferred.

    The Deferred carries the return value of `f`, or the exception `f`
    raised.
    """
    try:
        result = f(*args, **kwargs)
    except Exception as exc:  # pylint: disable=broad-except
        return defer.fail(exc)
    return defer.succeed(result)


@patch('golem.envs.deferToThread', execute)
class TestDelayedConfig(TestCase):
    """Tests of the `delayed_config` decorator, run against `MyEnv`."""

    def setUp(self) -> None:
        # pylint: disable=abstract-class-instantiated
        self.env = MyEnv(MyConfig(i=1))

    def test_update_config_when_disabled(self):
        # The env has not been enabled, so the update takes effect at once
        new_config = MyConfig(i=2)
        self.env.update_config(new_config)
        self.assertEqual(self.env.config().i, 2)

    def test_update_config_when_enabled(self):
        self.env._env_enabled()
        new_config = MyConfig(i=2)
        self.env.update_config(new_config)
        # The old config stays in place for as long as the env is enabled
        self.assertEqual(self.env.config().i, 1)

        self.env._env_disabled()
        # Disabling the env applies the saved config
        self.assertEqual(self.env.config().i, 2)
Loading

0 comments on commit f440c41

Please sign in to comment.