Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Environment wrapper for dumping output logs
Browse files Browse the repository at this point in the history
Signed-off-by: Adam Wierzbicki <awierzbicki@golem.network>
  • Loading branch information
Wiezzel committed Nov 8, 2019
1 parent b419c7f commit f57f12c
Show file tree
Hide file tree
Showing 15 changed files with 233 additions and 31 deletions.
20 changes: 11 additions & 9 deletions golem/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from calendar import timegm
from datetime import datetime
from functools import wraps
from typing import Any, Callable, cast, List, TypeVar
from pathlib import Path
from typing import Any, Callable, cast, List, TypeVar, Optional

import pytz

Expand Down Expand Up @@ -238,6 +239,12 @@ def wrapper(*args, **kwargs):
return decorator


def get_log_dir(data_dir: Optional[str] = None) -> Path:
if data_dir is None:
data_dir = simpleenv.get_local_datadir("default")
return Path(data_dir) / 'logs'


# pylint: disable=too-many-branches,too-many-locals
def config_logging(
suffix='',
Expand All @@ -252,9 +259,7 @@ def config_logging(
except ImportError:
from loggingconfig import LOGGING

if datadir is None:
datadir = simpleenv.get_local_datadir("default")
logdir_path = os.path.join(datadir, 'logs')
logdir_path = get_log_dir(datadir)

for formatter in LOGGING.get('formatters', {}).values():
formatter['format'] = f"{formatter_prefix}{formatter['format']}"
Expand Down Expand Up @@ -284,14 +289,11 @@ def config_logging(
LOGGING['loggers']['twisted']['level'] = 'WARNING'

try:
if not os.path.exists(logdir_path):
os.makedirs(logdir_path)
logdir_path.mkdir(parents=True, exist_ok=True)

logging.config.dictConfig(LOGGING)
except (ValueError, PermissionError) as e:
sys.stderr.write(
"Can't configure logging in: {} Got: {}\n".format(logdir_path, e)
)
sys.stderr.write(f"Can't configure logging in {logdir_path} Got: {e}\n")
return # Avoid consequent errors
logging.captureWarnings(True)

Expand Down
12 changes: 11 additions & 1 deletion golem/envs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
CounterUsage = Any

EnvId = str
RuntimeId = str


class RuntimeEventType(Enum):
Expand Down Expand Up @@ -143,7 +144,10 @@ def __exit__(self, *_, **__) -> None:
self.close()


class RuntimeOutput(Iterable[Union[str, bytes]], ABC):
RuntimeOutput = Iterable[Union[str, bytes]]


class RuntimeOutputBase(RuntimeOutput, ABC):
""" A handle for reading output (either stdout or stderr) from a running
Runtime. Yielded items are output lines. Output could be either raw
(bytes) or decoded (str). """
Expand All @@ -161,6 +165,12 @@ class Runtime(ABC):
""" A runnable object representing some particular computation. Tied to a
particular Environment that was used to create this object. """

@abstractmethod
def id(self) -> Optional[RuntimeId]:
""" Get unique identifier of this Runtime. Might not be available if the
Runtime is not yet prepared. """
raise NotImplementedError

@abstractmethod
def prepare(self) -> Deferred:
""" Prepare the Runtime to be started. Assumes current status is
Expand Down
7 changes: 6 additions & 1 deletion golem/envs/docker/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
EnvSupportStatus,
Prerequisites,
RuntimeBase,
RuntimeId,
RuntimeInput,
RuntimeOutput,
RuntimeOutputBase,
RuntimePayload,
RuntimeStatus,
BenchmarkResult)
Expand Down Expand Up @@ -74,7 +76,7 @@ def from_dict(cls, data: Dict[str, Any]) -> 'DockerCPUConfig':
return cls(work_dirs=work_dirs, **data)


class DockerOutput(RuntimeOutput):
class DockerOutput(RuntimeOutputBase):

def __init__(
self, raw_output: Iterable[bytes], encoding: Optional[str] = None
Expand Down Expand Up @@ -231,6 +233,9 @@ def _update_status_loop(self) -> None:
self._logger.info("Runtime is no longer running. "
"Stopping status update thread.")

def id(self) -> Optional[RuntimeId]:
return self._container_id

def prepare(self) -> Deferred:
self._change_status(
from_status=RuntimeStatus.CREATED,
Expand Down
4 changes: 4 additions & 0 deletions golem/envs/wrappers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Runtime,
RuntimeEventListener,
RuntimeEventType,
RuntimeId,
RuntimeInput,
RuntimeOutput,
RuntimePayload,
Expand All @@ -34,6 +35,9 @@ class RuntimeWrapper(Runtime):
def __init__(self, runtime: Runtime) -> None:
self._runtime = runtime

def id(self) -> Optional[RuntimeId]:
return self._runtime.id()

def prepare(self) -> Deferred:
return self._runtime.prepare()

Expand Down
2 changes: 1 addition & 1 deletion golem/envs/wrappers/auto_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
Runtime,
RuntimePayload,
)
from . import EnvironmentWrapper, RuntimeWrapper
from golem.envs.wrappers import EnvironmentWrapper, RuntimeWrapper


class RuntimeSetupWrapper(RuntimeWrapper):
Expand Down
98 changes: 98 additions & 0 deletions golem/envs/wrappers/dump_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import logging
from pathlib import Path
from threading import Thread
from typing import Optional

from twisted.internet.defer import Deferred, inlineCallbacks

from golem.envs import (
EnvConfig,
Environment,
Runtime,
RuntimeOutput,
RuntimePayload,
)
from golem.envs.wrappers import EnvironmentWrapper, RuntimeWrapper

logger = logging.getLogger(__name__)


class RuntimeLogsWrapper(RuntimeWrapper):

def __init__(
self,
runtime: Runtime,
logs_dir: Path,
encoding: str = 'utf-8'
) -> None:
super().__init__(runtime)
self._logs_dir = logs_dir
self._encoding = encoding
self._stdout_thread: Optional[Thread] = None
self._stderr_thread: Optional[Thread] = None

def _dump_output(self, output: RuntimeOutput, path: Path) -> None:
logger.info('Dumping runtime output to %r', path)
with path.open(mode='w', encoding=self._encoding) as file:
file.writelines(output)

@inlineCallbacks
def prepare(self) -> Deferred:
yield super().prepare()
stdout_file = self._logs_dir / f'{self._runtime.id()}_stdout.txt'
stderr_file = self._logs_dir / f'{self._runtime.id()}_stderr.txt'
stdout = self._runtime.stdout(self._encoding)
stderr = self._runtime.stderr(self._encoding)
self._stdout_thread = Thread(
target=self._dump_output,
args=(stdout, stdout_file))
self._stderr_thread = Thread(
target=self._dump_output,
args=(stderr, stderr_file))
self._stdout_thread.start()
self._stderr_thread.start()

@inlineCallbacks
def clean_up(self) -> Deferred:
assert self._stdout_thread is not None
assert self._stderr_thread is not None
yield super().clean_up()
self._stdout_thread.join(5)
if self._stdout_thread.is_alive():
logger.warning('Cannot join stdout thread')
self._stderr_thread.join(5)
if self._stderr_thread.is_alive():
logger.warning('Cannot join stderr thread')


class EnvironmentLogsWrapper(EnvironmentWrapper):

def __init__(
self,
env: Environment,
logs_dir: Path,
encoding: str = 'utf-8'
) -> None:
super().__init__(env)
self._logs_dir = logs_dir
self._encoding = encoding

def runtime(
self,
payload: RuntimePayload,
config: Optional[EnvConfig] = None
) -> Runtime:
runtime = super().runtime(payload, config)
return RuntimeLogsWrapper(runtime, self._logs_dir, self._encoding)


def dump_logs(
env: Environment,
logs_dir: Path,
encoding: str = 'utf-8'
) -> Environment:
return EnvironmentLogsWrapper(
env=env,
logs_dir=logs_dir,
encoding=encoding
)
30 changes: 26 additions & 4 deletions golem/task/envmanager.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import logging
from pathlib import Path
from typing import Dict, List, Type, Optional

from dataclasses import dataclass
from peewee import PeeweeException
from twisted.internet.defer import Deferred, inlineCallbacks, DeferredLock

from golem.envs import BenchmarkResult, EnvId, Environment, EnvMetadata
from golem.envs.wrappers.auto_setup import auto_setup
from golem.envs import (
BenchmarkResult,
EnvId,
Environment,
EnvMetadata,
)
from golem.envs.wrappers import auto_setup, dump_logs
from golem.model import Performance, EnvConfiguration
from golem.task.task_api import TaskApiPayloadBuilder

Expand All @@ -23,7 +29,8 @@ class EnvEntry:
metadata: EnvMetadata
payload_builder: Type[TaskApiPayloadBuilder]

def __init__(self):
def __init__(self, runtime_logs_dir: Path) -> None:
self._runtime_logs_dir = runtime_logs_dir
self._envs: Dict[EnvId, EnvironmentManager.EnvEntry] = {}
self._state = EnvStates()
self._running_benchmark: bool = False
Expand Down Expand Up @@ -67,7 +74,22 @@ def register_env(
""" Register an Environment (i.e. make it visible to manager). """
if metadata.id in self._envs:
raise ValueError(f"Environment '{metadata.id}' already registered.")
wrapped_env = auto_setup(env, self._start_usage, self._end_usage)

# Apply automatic setup wrapper
wrapped_env = auto_setup.auto_setup(
env=env,
start_usage=self._start_usage,
end_usage=self._end_usage
)

# Apply runtime logs wrapper
logs_dir = self._runtime_logs_dir / metadata.id
logs_dir.mkdir(parents=True, exist_ok=True)
wrapped_env = dump_logs.dump_logs(
env=wrapped_env,
logs_dir=logs_dir
)

self._envs[metadata.id] = EnvironmentManager.EnvEntry(
instance=wrapped_env,
metadata=metadata,
Expand Down
5 changes: 4 additions & 1 deletion golem/task/task_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ async def start(self, command: str, port: int) -> Tuple[str, int]:
)
self._runtime = self._env.runtime(runtime_payload)
loop = asyncio.get_event_loop()
await self._runtime.prepare().asFuture(loop)
d = self._runtime.prepare()
f = d.asFuture(loop)
await f
# await self._runtime.prepare().asFuture(loop)
await self._runtime.start().asFuture(loop)
return self._runtime.get_port_mapping(port)

Expand Down
5 changes: 3 additions & 2 deletions golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from golem.apps import manager as app_manager
from golem.apps.default import save_built_in_app_definitions
from golem.clientconfigdescriptor import ClientConfigDescriptor
from golem.core.common import short_node_id, deadline_to_timeout
from golem.core.common import short_node_id, deadline_to_timeout, get_log_dir
from golem.core.deferred import (
asyncio_main_loop,
deferred_from_future,
Expand Down Expand Up @@ -129,7 +129,8 @@ def __init__(self,

Path(self.get_task_computer_root()).mkdir(parents=True, exist_ok=True)

new_env_manager = EnvironmentManager()
runtime_logs_dir = get_log_dir(client.datadir)
new_env_manager = EnvironmentManager(runtime_logs_dir)
register_built_in_repositories()
register_environments(
work_dir=self.get_task_computer_root(),
Expand Down
4 changes: 3 additions & 1 deletion scripts/task_api_tests/basic_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ async def test_task(
app_manager.register_app(app_definition)
app_manager.set_enabled(app_definition.id, True)

env_manager = envmanager.EnvironmentManager()
runtime_logs_dir = work_dir / 'runtime_logs'
runtime_logs_dir.mkdir()
env_manager = envmanager.EnvironmentManager(runtime_logs_dir)
register_built_in_repositories()
register_environments(
work_dir=str(work_dir),
Expand Down
Loading

0 comments on commit f57f12c

Please sign in to comment.