Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Merge pull request #5171 from golemfactory/fix-config-cannot-be-updated
Browse files Browse the repository at this point in the history
Fix config cannot be updated
  • Loading branch information
etam committed Apr 27, 2020
2 parents b029636 + cb62998 commit cda6a9f
Show file tree
Hide file tree
Showing 11 changed files with 276 additions and 38 deletions.
51 changes: 49 additions & 2 deletions golem/envs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
from copy import deepcopy
from enum import Enum
from logging import Logger, getLogger
from threading import RLock
from threading import Lock, RLock

from typing import Any, Callable, Dict, List, Optional, NamedTuple, Union, \
Sequence, Iterable, ContextManager, Set, Tuple, TYPE_CHECKING
Sequence, Iterable, ContextManager, Set, Tuple, Type, TYPE_CHECKING

from dataclasses import dataclass, field
from twisted.internet.threads import deferToThread
Expand Down Expand Up @@ -599,3 +599,50 @@ def listen(
listener: EnvEventListener
) -> None:
self._event_listeners.setdefault(event_type, set()).add(listener)


def delayed_config(cls: Type[Environment]) -> Type[Environment]:
    """
    Class decorator that defers config updates until the env is disabled.

    ``update_config`` of the returned class applies the new config right away
    when the environment status is ``DISABLED``. Otherwise the config is
    stored, and a listener registered for the ``DISABLED`` event applies it
    later. Only the most recently stored config is kept.

    A mutex guards both paths, so the calls below cannot interleave:

        Thread 1                        Thread 2
        call apply_next_config
                                        call update_config
                                        status is disabled, so calls
                                          super().update_config
        cls.update_config
          with _next_config

    The mutex alone does not stop Thread 2 from running first and Thread 1
    right after it. For that reason an immediate update also drops any stored
    config; otherwise the listener would overwrite the newer config with the
    older, stored one.

    :param cls: environment class to wrap
    :return: subclass of ``cls`` with the delayed ``update_config``
    """
    # FIXME workaround https://github.com/python/mypy/issues/5865
    cls2: Any = cls

    class DelayedConfigWrapper(cls2):
        def __init__(self, *args, **kwargs) -> None:
            super().__init__(*args, **kwargs)
            # Config waiting for the environment to become disabled
            self._next_config: Optional[EnvConfig] = None
            # Non-reentrant lock guarding _next_config and every call into
            # the wrapped update_config made by this class
            self._config_lock = Lock()

            def apply_next_config(_):
                with self._config_lock:
                    if self._next_config is None:
                        return
                    self._logger.debug("Applying saved config")
                    config = self._next_config
                    self._next_config = None
                    cls.update_config(self, config)

            self.listen(EnvEventType.DISABLED, apply_next_config)

        def update_config(self, new_config: EnvConfig) -> None:
            with self._config_lock:
                if self._status == EnvStatus.DISABLED:
                    self._logger.debug("Config applied immediately")
                    super().update_config(new_config)
                    # The config just applied supersedes anything stored
                    # earlier; cleared only after the update succeeded, so a
                    # rejected config does not discard the stored one.
                    self._next_config = None
                    return

                self._logger.debug("Config saved for later")
                self._next_config = new_config

    return DelayedConfigWrapper
4 changes: 3 additions & 1 deletion golem/envs/docker/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
RuntimePayload,
RuntimeStatus,
UsageCounter,
UsageCounterValues
UsageCounterValues,
delayed_config,
)
from golem.envs.docker import DockerRuntimePayload, DockerPrerequisites
from golem.envs.docker.whitelist import Whitelist
Expand Down Expand Up @@ -496,6 +497,7 @@ def usage_counter_values(self) -> UsageCounterValues:
return deepcopy(self._counters)


@delayed_config
class DockerCPUEnvironment(EnvironmentBase):

MIN_MEMORY_MB: ClassVar[int] = 1024
Expand Down
18 changes: 5 additions & 13 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(
)
self._new_computer = NewTaskComputer(
env_manager=env_manager,
work_dir=task_server.get_task_computer_root(),
work_dir=Path(task_server.get_task_computer_root()),
stats_keeper=self.stats
)

Expand Down Expand Up @@ -263,10 +263,8 @@ def change_config(
config_desc: 'ClientConfigDescriptor',
in_background: bool = True
) -> defer.Deferred:
work_dir = Path(self._task_server.get_task_computer_root())
yield self._new_computer.change_config(
config_desc=config_desc,
work_dir=work_dir)
self._new_computer.change_config(
config_desc=config_desc)
return (yield self._old_computer.change_config(
config_desc=config_desc,
in_background=in_background))
Expand Down Expand Up @@ -480,13 +478,9 @@ def get_current_computing_env(self) -> 'Optional[EnvId]':
def change_config(
self,
config_desc: 'ClientConfigDescriptor',
work_dir: Path
) -> defer.Deferred:
assert not self._is_computing()
self._work_dir = work_dir

) -> None:
config_dict = dict(
work_dirs=[work_dir],
work_dirs=[self._work_dir],
cpu_count=config_desc.num_cores,
memory_mb=scale_memory(
config_desc.max_memory_size,
Expand All @@ -505,8 +499,6 @@ def change_config(
# TODO: GPU options in config_dict
docker_gpu.update_config(DockerGPUConfig(**config_dict))

return defer.succeed(None)

def quit(self):
if self.has_assigned_task():
self.task_interrupted()
Expand Down
43 changes: 43 additions & 0 deletions scripts/node_integration_tests/nodes/provider/configure_or_die.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/usr/bin/env python

import logging
from unittest.mock import patch

from twisted.internet.defer import inlineCallbacks

from golemapp import main

from golem.client import Client
from golem.task.taskserver import TaskServer


def on_exception():
    """
    Log the marker line that flags the integration test as failed.

    When called while an exception is being handled, the traceback is
    attached to the log record so the cause of the failure is not lost.
    The marker text itself is unchanged.
    """
    import sys  # local import: needed only to detect an active exception

    handling_exception = sys.exc_info()[0] is not None
    logging.critical(
        "#### Integration test failed ####",
        exc_info=handling_exception,
    )


client_change_config_orig = Client.change_config


def client_change_config(self: Client, *args, **kwargs):
    """
    Replacement for ``Client.change_config`` that flags failures.

    Delegates to the original method and hands its result back to the
    caller. If the call raises, the failure marker is logged and ``None`` is
    returned instead of propagating the exception.

    NOTE(review): only exceptions raised synchronously are seen here; if the
    original returns a Deferred, its errback is not observed - confirm.
    """
    try:
        return client_change_config_orig(self, *args, **kwargs)
    # Exception rather than a bare except, so that SystemExit and
    # KeyboardInterrupt are not swallowed
    except Exception:  # noqa pylint: disable=broad-except
        on_exception()
        return None


task_server_change_config_orig = TaskServer.change_config


@inlineCallbacks
def task_server_change_config(self: TaskServer, *args, **kwargs):
    """
    Replacement for ``TaskServer.change_config`` that flags failures.

    Waits for the original method and passes its result on. If it raises or
    the yielded result fails, the failure marker is logged and ``None``
    becomes the result instead of the error.
    """
    try:
        result = yield task_server_change_config_orig(self, *args, **kwargs)
    # Exception rather than a bare except: a bare except inside a generator
    # also catches GeneratorExit and would log a spurious failure marker
    except Exception:  # noqa pylint: disable=broad-except
        on_exception()
        return None
    return result


# Start the node with both change_config methods replaced by the wrappers
# defined above, so that an exception raised by either one ends up as a
# failure marker in the log.
with patch("golem.client.Client.change_config", client_change_config):
    with patch(
            "golem.task.taskserver.TaskServer.change_config",
            task_server_change_config,
    ):
        main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import time
from functools import partial

from scripts.node_integration_tests import helpers
from ...test_config_base import NodeId
from ..task_api.playbook import Playbook as BasePlaybook


class Playbook(BasePlaybook):
    """
    Scenario: change the provider's config while it is computing a task.

    After the base task steps, the playbook waits until the provider reports
    the 'Computing' status, calls 'ui.stop' on it, updates its options and
    finally scans the provider's output for the failure marker.
    """

    def wait_for_computing_task(self):
        """Advance once the provider reports the 'Computing' status."""
        def on_success(result):
            state = result['provider_state']
            print(f"provider state: {state}")
            if state['status'] == 'Computing':
                self.next()
            else:
                # Not computing yet: pause without advancing. Presumably the
                # base playbook runs this step again - TODO confirm.
                time.sleep(10)

        def on_error(_):
            print("failed getting provider stats")
            self.fail()
        return self.call(NodeId.provider, 'comp.tasks.stats',
                         on_success=on_success, on_error=on_error)

    def ui_stop(self, node_id: NodeId):
        """Call 'ui.stop' on the given node and advance on success."""
        def on_success(_):
            print(f"stopped {node_id.value}")
            self.next()

        def on_error(_):
            print(f"stopping {node_id.value} failed")
            self.fail()
        return self.call(node_id, 'ui.stop', on_success=on_success,
                         on_error=on_error)

    def change_config(self, node_id: NodeId):
        """Send an options update ('env.opts.update') to the given node."""
        opts = {
            "node_name": "a new name",
        }

        def on_success(_):
            print(f"reconfigured {node_id.value}")
            time.sleep(10)  # give time for async operations to process
            self.next()

        def on_error(_):
            print(f"reconfiguring {node_id.value} failed")
            self.fail()

        return self.call(node_id, 'env.opts.update', opts,
                         on_success=on_success, on_error=on_error)

    def check_if_test_failed(self, node_id: NodeId):
        """Fail if the node's output contains the failure marker."""
        test_failed = bool(helpers.search_output(
            self.output_queues[node_id],
            ".*#### Integration test failed ####.*"))

        if test_failed:
            self.fail("found failure marker in log")
            # Stop here: falling through would report that no marker was
            # found and advance to the next step despite the failure.
            return

        print("no failure marker found in log")
        self.next()

    steps = BasePlaybook.initial_steps + (
        BasePlaybook.step_enable_app,
        BasePlaybook.step_create_task,
        BasePlaybook.step_get_task_id,
        BasePlaybook.step_get_task_status,
        wait_for_computing_task,
        partial(ui_stop, node_id=NodeId.provider),
        partial(change_config, node_id=NodeId.provider),
        partial(check_if_test_failed, node_id=NodeId.provider),
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from ...test_config_base import NodeId

from ..task_api.test_config import TestConfig as TestConfigBase


class TestConfig(TestConfigBase):
    """Task-API test config whose provider node runs 'configure_or_die'."""

    def __init__(self):
        super().__init__()
        # Only the provider's start-up script differs from the base config
        provider_node = self.nodes[NodeId.provider]
        provider_node.script = 'provider/configure_or_die'
5 changes: 0 additions & 5 deletions tests/golem/envs/docker/cpu/test_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,6 @@ def test_wrong_type(self):
with self.assertRaises(AssertionError):
self.env.update_config(object())

def test_enabled_status(self):
self.env._status = EnvStatus.ENABLED
with self.assertRaises(ValueError):
self.env.update_config(Mock(spec=DockerCPUConfig))

@patch_env('_validate_config', side_effect=ValueError)
def test_invalid_config(self, validate):
config = Mock(spec=DockerCPUConfig)
Expand Down
91 changes: 90 additions & 1 deletion tests/golem/envs/test_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
from unittest import TestCase
from unittest.mock import Mock, patch

from dataclasses import dataclass

from twisted.internet import defer

from golem.envs import (
EnvConfig,
EnvEvent,
EnvEventType,
EnvironmentBase,
EnvStatus,
Prerequisites
Prerequisites,
delayed_config,
)


Expand Down Expand Up @@ -121,3 +126,87 @@ def test_re_register(self):
self.assertEqual(self.env._event_listeners, {
EnvEventType.ERROR_OCCURRED: {listener}
})


@dataclass
class MyConfig(EnvConfig):
    """Minimal config for the delayed_config tests; carries a single int."""

    i: int

    def to_dict(self) -> dict:
        """Stub: serialisation is not implemented here; returns None."""
        return None

    @staticmethod
    def from_dict(data):
        """Stub: deserialisation is not implemented here; returns None."""
        return None


@delayed_config
class MyEnv(EnvironmentBase):
    """
    Bare-bones environment for exercising the ``delayed_config`` decorator.

    Only the config handling is implemented; every other operation raises
    ``NotImplementedError``.
    """

    def __init__(self, config: MyConfig) -> None:
        super().__init__()
        self._config = config

    # --- config handling --------------------------------------------------

    def config(self) -> MyConfig:
        """Return the config currently in effect."""
        return self._config

    def update_config(self, config: EnvConfig) -> None:
        """
        Replace the current config.

        :raises ValueError: when the environment status is not DISABLED
        """
        assert isinstance(config, MyConfig)
        self._logger.debug("dupa %r", self._event_listeners)
        not_disabled = self._status != EnvStatus.DISABLED
        if not_disabled:
            raise ValueError
        self._config = config
        self._config_updated(config)

    def parse_config(self, config_dict):
        raise NotImplementedError

    # --- operations this test environment does not provide ------------------

    @classmethod
    def supported(cls):
        raise NotImplementedError

    def prepare(self):
        raise NotImplementedError

    def clean_up(self):
        raise NotImplementedError

    def run_benchmark(self):
        raise NotImplementedError

    def parse_prerequisites(self, prerequisites_dict):
        raise NotImplementedError

    def install_prerequisites(self, prerequisites):
        raise NotImplementedError

    def supported_usage_counters(self):
        raise NotImplementedError

    def runtime(self, payload, config=None):
        raise NotImplementedError


def execute(f, *args, **kwargs):
    """
    Call ``f`` synchronously and wrap the outcome in a fired Deferred.

    :return: a Deferred that succeeded with the return value of ``f``, or
        one that failed with the exception ``f`` raised
    """
    try:
        outcome = f(*args, **kwargs)
    except Exception as error:  # pylint: disable=broad-except
        return defer.fail(error)
    return defer.succeed(outcome)


@patch('golem.envs.deferToThread', execute)
class TestDelayedConfig(TestCase):
    # deferToThread in golem.envs is replaced with the synchronous `execute`
    # helper for the test methods, so deferred work runs inline.

    def setUp(self) -> None:
        self.env = MyEnv(MyConfig(i=1))

    def test_update_config_when_disabled(self):
        # Presumably a fresh env starts out disabled, so the update is
        # expected to take effect at once.
        updated = MyConfig(i=2)
        self.env.update_config(updated)
        self.assertEqual(self.env.config().i, 2)

    def test_update_config_when_enabled(self):
        env = self.env
        env._env_enabled()
        env.update_config(MyConfig(i=2))
        # While enabled, the previous config has to stay in effect
        self.assertEqual(env.config().i, 1)
        env._env_disabled()
        # Once disabled, the saved config has to be in effect
        self.assertEqual(env.config().i, 2)
Loading

0 comments on commit cda6a9f

Please sign in to comment.