Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

0.19 hyperg port #4093

Merged
merged 8 commits into from
Apr 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion golem/appconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@
DISALLOW_ID_MAX_TIMES = 1
DISALLOW_IP_MAX_TIMES = 1

DEFAULT_HYPERDRIVE_PORT = 3282
DEFAULT_HYPERDRIVE_ADDRESS = None
DEFAULT_HYPERDRIVE_RPC_PORT = 3292
DEFAULT_HYPERDRIVE_RPC_ADDRESS = 'localhost'


class NodeConfig:

Expand Down Expand Up @@ -190,10 +195,21 @@ def load_config(cls, datadir, cfg_file_name=CONFIG_FILENAME):
disallow_ip_timeout_seconds=DISALLOW_IP_TIMEOUT_SECONDS,
disallow_id_max_times=DISALLOW_ID_MAX_TIMES,
disallow_ip_max_times=DISALLOW_IP_MAX_TIMES,
#hyperg
hyperdrive_port=DEFAULT_HYPERDRIVE_PORT,
hyperdrive_address=DEFAULT_HYPERDRIVE_ADDRESS,
hyperdrive_rpc_port=DEFAULT_HYPERDRIVE_RPC_PORT,
hyperdrive_rpc_address=DEFAULT_HYPERDRIVE_RPC_ADDRESS,
)

cfg = SimpleConfig(node_config, cfg_file, keep_old=False)
return AppConfig(cfg, cfg_file)
return cls(cfg, cfg_file)

def __repr__(self):
return '<{}: {}>'.format(self.__class__, {
prop: self.get_node_property(prop)()
for prop in self._cfg.get_node_config().prop_names
})

def __init__(self, cfg, config_file):
self.config_file = config_file
Expand Down
23 changes: 21 additions & 2 deletions golem/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,22 @@ def start_network(self):

logger.info("Starting resource server ...")

self.daemon_manager = HyperdriveDaemonManager(self.datadir)
self.daemon_manager = HyperdriveDaemonManager(
self.datadir,
daemon_config={
k: v for k, v in {
'host': self.config_desc.hyperdrive_address,
'port': self.config_desc.hyperdrive_port,
'rpc_host': self.config_desc.hyperdrive_rpc_address,
'rpc_port': self.config_desc.hyperdrive_rpc_port,
}.items()
if v is not None
},
client_config={
'port': self.config_desc.hyperdrive_rpc_port,
'host': self.config_desc.hyperdrive_rpc_address,
}
)
self.daemon_manager.start()

hyperdrive_addrs = self.daemon_manager.public_addresses(
Expand All @@ -425,7 +440,11 @@ def start_network(self):

resource_manager = HyperdriveResourceManager(
dir_manager=dir_manager,
daemon_address=hyperdrive_addrs
daemon_address=hyperdrive_addrs,
client_kwargs={
'host': self.config_desc.hyperdrive_rpc_address,
'port': self.config_desc.hyperdrive_rpc_port,
},
)
self.resource_server = BaseResourceServer(
resource_manager=resource_manager,
Expand Down
9 changes: 9 additions & 0 deletions golem/clientconfigdescriptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ def __init__(self):
self.disallow_id_max_times = 1
self.disallow_ip_max_times = 1

self.hyperdrive_port: typing.Optional[int] = None
self.hyperdrive_address: typing.Optional[str] = None
self.hyperdrive_rpc_port: typing.Optional[int] = None
self.hyperdrive_rpc_address: typing.Optional[str] = None

def __repr__(self):
return '{}: {}'.format(self.__class__, {
v: getattr(self, v) for v in vars(self)})

def init_from_app_config(self, app_config):
"""Initializes config parameters based on the specified AppConfig
:param app_config: instance of AppConfig
Expand Down
2 changes: 1 addition & 1 deletion golem/docker/task_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def _run_docker_job(self) -> Optional[int]:
devices = None
runtime = None

assert self.docker_manager is not None
assert self.docker_manager is not None, "Docker Manager undefined"
# PyLint still thinks docker_manager is of type DockerConfigManager
# pylint: disable=no-member
host_config = self.docker_manager.get_host_config_for_task(binds)
Expand Down
16 changes: 8 additions & 8 deletions golem/network/hyperdrive/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@
log = logging.getLogger(__name__)


DEFAULT_HYPERDRIVE_PORT = 3282
DEFAULT_HYPERDRIVE_RPC_PORT = 3292


def to_hyperg_peer(host: str, port: int) -> Dict[str, Tuple[str, int]]:
return {'TCP': (host, port)}

Expand All @@ -35,12 +31,14 @@ def round_timeout(value: Optional[Union[int, float]]) -> Optional[int]:


class HyperdriveClient(IClient):
"""
Enables communication between Golem and the Hyperdrive service.
"""

CLIENT_ID = 'hyperg'
VERSION = 1.1

def __init__(self, port=DEFAULT_HYPERDRIVE_RPC_PORT,
host='localhost', timeout=None):
def __init__(self, port, host, timeout=None):
super(HyperdriveClient, self).__init__()

# API destination address
Expand All @@ -53,6 +51,9 @@ def __init__(self, port=DEFAULT_HYPERDRIVE_RPC_PORT,
self._url = 'http://{}:{}/api'.format(self.host, self.port)
self._headers = {'content-type': 'application/json'}

def __repr__(self):
return f'<{self.__class__.__name__} {self.CLIENT_ID} at {self._url}>'

@classmethod
def build_options(cls, peers=None, **kwargs):
return HyperdriveClientOptions(cls.CLIENT_ID, cls.VERSION,
Expand Down Expand Up @@ -145,8 +146,7 @@ def _request(self, **data):

class HyperdriveAsyncClient(HyperdriveClient):

def __init__(self, port=DEFAULT_HYPERDRIVE_RPC_PORT, host='localhost',
timeout=None):
def __init__(self, port, host, timeout=None):
from twisted.web.http_headers import Headers # imports reactor

super().__init__(port, host, timeout)
Expand Down
47 changes: 35 additions & 12 deletions golem/network/hyperdrive/daemon_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import copy
import itertools
import logging
import os
import subprocess
Expand All @@ -18,18 +19,23 @@


GOLEM_HYPERDRIVE_VERSION = '0.2.4'
GOLEM_HYPERDRIVE_LOGFILE = 'hyperg.log'


class HyperdriveDaemonManager(object):

_executable = 'hyperg'
_min_version = semantic_version.Version(GOLEM_HYPERDRIVE_VERSION)

def __init__(self, datadir, **hyperdrive_config):
super(HyperdriveDaemonManager, self).__init__()

def __init__(
self,
datadir,
daemon_config: Optional[dict] = None,
client_config: Optional[dict] = None
) -> None:
self._addresses = None
self._config = hyperdrive_config
self._config = client_config or {}
self._daemon_config = daemon_config or {}

# monitor and restart if process dies
self._monitor = ProcessMonitor()
Expand All @@ -43,17 +49,28 @@ def __init__(self, datadir, **hyperdrive_config):
logger.warning("Creating HyperG logsdir: %s", logsdir)
os.makedirs(logsdir)

self._daemon_config.update(
db=self._dir,
logfile=os.path.join(logsdir, GOLEM_HYPERDRIVE_LOGFILE)
)

self._command = [
self._executable,
'--db', self._dir,
'--logfile', os.path.join(logsdir, "hyperg.log"),
]
] + list(itertools.chain.from_iterable(
[('--' + k, str(v)) for k, v in self._daemon_config.items()]
))

def __str__(self):
return self._executable

def addresses(self):
def addresses(self, suppress_warning=False):
try:
return self._get_addresses()
except requests.ConnectionError:
logger.warning('Cannot connect to Hyperdrive daemon')
if not suppress_warning:
import traceback
traceback.print_stack()
logger.warning('Cannot connect to Hyperdrive daemon')
return dict()

def version(self) -> Optional[semantic_version.Version]:
Expand Down Expand Up @@ -105,11 +122,13 @@ def stop(self, *_):

@report_calls(Component.hyperdrive, 'instance.connect')
def _start(self, *_):
self._check_version()
version = self._check_version()

# do not supervise already running processes
addresses = self.addresses()
addresses = self.addresses(suppress_warning=True)
if addresses:
logger.info("%s %s already started. addresses=%s",
self._executable, version, addresses)
return

process = self._create_sub()
Expand All @@ -120,12 +139,16 @@ def _start(self, *_):
else:
raise RuntimeError("Cannot start {}".format(self._executable))

logger.info("%s %s started. Listening on %s.",
self._executable, version, self.addresses())

@report_calls(Component.hyperdrive, 'instance.version')
def _check_version(self):
version = self.version()
if version < self._min_version:
raise RuntimeError('HyperG version {} is required'
.format(self._min_version))
return version

@report_calls(Component.hyperdrive, 'instance.check')
def _create_sub(self):
Expand All @@ -141,7 +164,7 @@ def _wait(self, timeout: int = 10):
deadline = time.time() + timeout

while time.time() < deadline:
addresses = self.addresses()
addresses = self.addresses(suppress_warning=True)
if addresses:
return
time.sleep(1.)
Expand Down
15 changes: 11 additions & 4 deletions golem/resource/hyperdrive/resourcesmanager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

import os
import typing
from collections import Iterable, Sized
from functools import partial
from twisted.internet.defer import Deferred
Expand Down Expand Up @@ -74,12 +75,18 @@ def log_error(msg, exc):

class HyperdriveResourceManager(ClientHandler):

def __init__(self, dir_manager, daemon_address=None, config=None,
resource_dir_method=None):

def __init__( # noqa pylint: disable=too-many-arguments
self, dir_manager, daemon_address=None, config=None, # noqa pylint: disable=unused-argument
resource_dir_method=None,
client_kwargs: typing.Optional[dict] = None,
) -> None:
super().__init__(config)

self.client = HyperdriveAsyncClient(**self.config.client)
self.client = HyperdriveAsyncClient( # type: ignore
**self.config.client, **(client_kwargs or {}))
logger.info("Initializing %s, using %s",
self.__class__.__name__, self.client)

self.storage = ResourceStorage(dir_manager, resource_dir_method or
dir_manager.get_task_resource_dir)

Expand Down
11 changes: 10 additions & 1 deletion golem/task/taskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

from apps.appsmanager import AppsManager
from apps.core.task.coretask import CoreTask

from golem.clientconfigdescriptor import ClientConfigDescriptor
from golem.core.common import get_timestamp_utc, HandleForwardedError, \
HandleKeyError, node_info_str, short_node_id, to_unicode, update_dict
from golem.manager.nodestatesnapshot import LocalTaskStateSnapshot
Expand Down Expand Up @@ -82,8 +84,11 @@ class AlreadyRestartedError(Error):

def __init__(
self, node, keys_auth, root_path,
config_desc: ClientConfigDescriptor,
tasks_dir="tasks", task_persistence=True,
apps_manager=AppsManager(), finished_cb=None):
apps_manager=AppsManager(),
finished_cb=None,
):
super().__init__()

self.apps_manager = apps_manager
Expand All @@ -110,6 +115,10 @@ def __init__(
resource_manager = HyperdriveResourceManager(
self.dir_manager,
resource_dir_method=self.dir_manager.get_task_temporary_dir,
client_kwargs={
'host': config_desc.hyperdrive_rpc_address,
'port': config_desc.hyperdrive_rpc_port,
},
)
self.task_result_manager = EncryptedResultPackageManager(
resource_manager
Expand Down
1 change: 1 addition & 0 deletions golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def __init__(self,
self.node,
self.keys_auth,
root_path=TaskServer.__get_task_manager_root(client.datadir),
config_desc=config_desc,
tasks_dir=os.path.join(client.datadir, 'tasks'),
apps_manager=apps_manager,
finished_cb=task_finished_cb,
Expand Down
1 change: 1 addition & 0 deletions golem/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class TempDirFixture(unittest.TestCase):

@classmethod
def setUpClass(cls):
super().setUpClass()
logging.basicConfig(level=logging.DEBUG)
if cls.root_dir is None:
if is_osx():
Expand Down
10 changes: 9 additions & 1 deletion golemapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ def monkey_patched_getLogger(*args, **kwargs):
]),
help="Change level for Golem loggers and handlers")
@click.option('--enable-talkback', is_flag=True, default=None)
@click.option('--hyperdrive-port', type=int, help="Hyperdrive public port")
@click.option('--hyperdrive-rpc-port', type=int, help="Hyperdrive RPC port")
# Python flags, needed by crossbar (package only)
@click.option('-m', nargs=1, default=None)
@click.option('--node', expose_value=False)
Expand All @@ -119,7 +121,9 @@ def monkey_patched_getLogger(*args, **kwargs):
def start( # pylint: disable=too-many-arguments, too-many-locals
monitor, concent, datadir, node_address, rpc_address, peer, mainnet,
net, geth_address, password, accept_terms, accept_concent_terms,
accept_all_terms, version, log_level, enable_talkback, m):
accept_all_terms, version, log_level, enable_talkback, m,
hyperdrive_port, hyperdrive_rpc_port,
):

freeze_support()
delete_reactor()
Expand Down Expand Up @@ -161,6 +165,10 @@ def _start():
config_desc.rpc_port = rpc_address.port
if node_address:
config_desc.node_address = node_address
if hyperdrive_port:
config_desc.hyperdrive_port = hyperdrive_port
if hyperdrive_rpc_port:
config_desc.hyperdrive_rpc_port = hyperdrive_rpc_port

# Golem headless
install_reactor()
Expand Down
Loading