diff --git a/golem/core/common.py b/golem/core/common.py index f2289cb00b..3741452e8c 100644 --- a/golem/core/common.py +++ b/golem/core/common.py @@ -7,7 +7,8 @@ from calendar import timegm from datetime import datetime from functools import wraps -from typing import Any, Callable, cast, List, TypeVar +from pathlib import Path +from typing import Any, Callable, cast, List, TypeVar, Optional import pytz @@ -238,6 +239,12 @@ def wrapper(*args, **kwargs): return decorator +def get_log_dir(data_dir: Optional[str] = None) -> Path: + if data_dir is None: + data_dir = simpleenv.get_local_datadir("default") + return Path(data_dir) / 'logs' + + # pylint: disable=too-many-branches,too-many-locals def config_logging( suffix='', @@ -252,9 +259,7 @@ def config_logging( except ImportError: from loggingconfig import LOGGING - if datadir is None: - datadir = simpleenv.get_local_datadir("default") - logdir_path = os.path.join(datadir, 'logs') + logdir_path = get_log_dir(datadir) for formatter in LOGGING.get('formatters', {}).values(): formatter['format'] = f"{formatter_prefix}{formatter['format']}" @@ -284,14 +289,11 @@ def config_logging( LOGGING['loggers']['twisted']['level'] = 'WARNING' try: - if not os.path.exists(logdir_path): - os.makedirs(logdir_path) + logdir_path.mkdir(parents=True, exist_ok=True) logging.config.dictConfig(LOGGING) except (ValueError, PermissionError) as e: - sys.stderr.write( - "Can't configure logging in: {} Got: {}\n".format(logdir_path, e) - ) + sys.stderr.write(f"Can't configure logging in {logdir_path} Got: {e}\n") return # Avoid consequent errors logging.captureWarnings(True) diff --git a/golem/envs/__init__.py b/golem/envs/__init__.py index 883644ecd7..a743f4f866 100644 --- a/golem/envs/__init__.py +++ b/golem/envs/__init__.py @@ -20,6 +20,7 @@ CounterUsage = Any EnvId = str +RuntimeId = str class RuntimeEventType(Enum): @@ -143,7 +144,10 @@ def __exit__(self, *_, **__) -> None: self.close() -class RuntimeOutput(Iterable[Union[str, bytes]], ABC): +RuntimeOutput = Iterable[Union[str, bytes]] + + +class RuntimeOutputBase(RuntimeOutput, ABC): """ A handle for reading output (either stdout or stderr) from a running Runtime. Yielded items are output lines. Output could be either raw (bytes) or decoded (str). """ @@ -161,6 +165,12 @@ class Runtime(ABC): """ A runnable object representing some particular computation. Tied to a particular Environment that was used to create this object. """ + @abstractmethod + def id(self) -> Optional[RuntimeId]: + """ Get unique identifier of this Runtime. Might not be available if the + Runtime is not yet prepared. """ + raise NotImplementedError + @abstractmethod def prepare(self) -> Deferred: """ Prepare the Runtime to be started. Assumes current status is diff --git a/golem/envs/docker/cpu.py b/golem/envs/docker/cpu.py index ce7d933c67..e2ed396113 100644 --- a/golem/envs/docker/cpu.py +++ b/golem/envs/docker/cpu.py @@ -34,8 +34,10 @@ EnvSupportStatus, Prerequisites, RuntimeBase, + RuntimeId, RuntimeInput, RuntimeOutput, + RuntimeOutputBase, RuntimePayload, RuntimeStatus, BenchmarkResult) @@ -74,7 +76,7 @@ def from_dict(cls, data: Dict[str, Any]) -> 'DockerCPUConfig': return cls(work_dirs=work_dirs, **data) -class DockerOutput(RuntimeOutput): +class DockerOutput(RuntimeOutputBase): def __init__( self, raw_output: Iterable[bytes], encoding: Optional[str] = None @@ -231,6 +233,9 @@ def _update_status_loop(self) -> None: self._logger.info("Runtime is no longer running. " "Stopping status update thread.") + def id(self) -> Optional[RuntimeId]: + return self._container_id + def prepare(self) -> Deferred: self._change_status( from_status=RuntimeStatus.CREATED, diff --git a/golem/envs/wrappers/__init__.py b/golem/envs/wrappers/__init__.py index d0d608d6cc..673827b344 100644 --- a/golem/envs/wrappers/__init__.py +++ b/golem/envs/wrappers/__init__.py @@ -20,6 +20,7 @@ Runtime, RuntimeEventListener, RuntimeEventType, + RuntimeId, RuntimeInput, RuntimeOutput, RuntimePayload, @@ -34,6 +35,9 @@ class RuntimeWrapper(Runtime): def __init__(self, runtime: Runtime) -> None: self._runtime = runtime + def id(self) -> Optional[RuntimeId]: + return self._runtime.id() + def prepare(self) -> Deferred: return self._runtime.prepare() diff --git a/golem/envs/wrappers/auto_setup.py b/golem/envs/wrappers/auto_setup.py index 0557262c90..8140755bf4 100644 --- a/golem/envs/wrappers/auto_setup.py +++ b/golem/envs/wrappers/auto_setup.py @@ -14,7 +14,7 @@ Runtime, RuntimePayload, ) -from . import EnvironmentWrapper, RuntimeWrapper +from golem.envs.wrappers import EnvironmentWrapper, RuntimeWrapper class RuntimeSetupWrapper(RuntimeWrapper): diff --git a/golem/envs/wrappers/dump_logs.py b/golem/envs/wrappers/dump_logs.py new file mode 100644 index 0000000000..4017e7cc0d --- /dev/null +++ b/golem/envs/wrappers/dump_logs.py @@ -0,0 +1,98 @@ +import logging +from pathlib import Path +from threading import Thread +from typing import Optional + +from twisted.internet.defer import Deferred, inlineCallbacks + +from golem.envs import ( + EnvConfig, + Environment, + Runtime, + RuntimeOutput, + RuntimePayload, +) +from golem.envs.wrappers import EnvironmentWrapper, RuntimeWrapper + +logger = logging.getLogger(__name__) + + +class RuntimeLogsWrapper(RuntimeWrapper): + + def __init__( + self, + runtime: Runtime, + logs_dir: Path, + encoding: str = 'utf-8' + ) -> None: + super().__init__(runtime) + self._logs_dir = logs_dir + self._encoding = encoding + self._stdout_thread: Optional[Thread] = None + self._stderr_thread: Optional[Thread] = None + + def _dump_output(self, output: RuntimeOutput, path: Path) -> None: + logger.info('Dumping runtime output to %r', path) + with path.open(mode='w', encoding=self._encoding) as file: + file.writelines(output) + + @inlineCallbacks + def prepare(self) -> Deferred: + yield super().prepare() + stdout_file = self._logs_dir / f'{self._runtime.id()}_stdout.txt' + stderr_file = self._logs_dir / f'{self._runtime.id()}_stderr.txt' + stdout = self._runtime.stdout(self._encoding) + stderr = self._runtime.stderr(self._encoding) + self._stdout_thread = Thread( + target=self._dump_output, + args=(stdout, stdout_file)) + self._stderr_thread = Thread( + target=self._dump_output, + args=(stderr, stderr_file)) + self._stdout_thread.start() + self._stderr_thread.start() + + @inlineCallbacks + def clean_up(self) -> Deferred: + assert self._stdout_thread is not None + assert self._stderr_thread is not None + yield super().clean_up() + self._stdout_thread.join(5) + if self._stdout_thread.is_alive(): + logger.warning('Cannot join stdout thread') + self._stderr_thread.join(5) + if self._stderr_thread.is_alive(): + logger.warning('Cannot join stderr thread') + + +class EnvironmentLogsWrapper(EnvironmentWrapper): + + def __init__( + self, + env: Environment, + logs_dir: Path, + encoding: str = 'utf-8' + ) -> None: + super().__init__(env) + self._logs_dir = logs_dir + self._encoding = encoding + + def runtime( + self, + payload: RuntimePayload, + config: Optional[EnvConfig] = None + ) -> Runtime: + runtime = super().runtime(payload, config) + return RuntimeLogsWrapper(runtime, self._logs_dir, self._encoding) + + +def dump_logs( + env: Environment, + logs_dir: Path, + encoding: str = 'utf-8' +) -> Environment: + return EnvironmentLogsWrapper( + env=env, + logs_dir=logs_dir, + encoding=encoding + ) diff --git a/golem/task/envmanager.py b/golem/task/envmanager.py index ddc3a226dc..dc26d53699 100644 --- a/golem/task/envmanager.py +++ b/golem/task/envmanager.py @@ -1,12 +1,18 @@ import logging +from pathlib import Path from typing import Dict, List, Type, Optional from dataclasses import dataclass from peewee import PeeweeException from twisted.internet.defer import Deferred, inlineCallbacks, DeferredLock -from golem.envs import BenchmarkResult, EnvId, Environment, EnvMetadata -from golem.envs.wrappers.auto_setup import auto_setup +from golem.envs import ( + BenchmarkResult, + EnvId, + Environment, + EnvMetadata, +) +from golem.envs.wrappers import auto_setup, dump_logs from golem.model import Performance, EnvConfiguration from golem.task.task_api import TaskApiPayloadBuilder @@ -23,7 +29,8 @@ class EnvEntry: metadata: EnvMetadata payload_builder: Type[TaskApiPayloadBuilder] - def __init__(self): + def __init__(self, runtime_logs_dir: Path) -> None: + self._runtime_logs_dir = runtime_logs_dir self._envs: Dict[EnvId, EnvironmentManager.EnvEntry] = {} self._state = EnvStates() self._running_benchmark: bool = False @@ -67,7 +74,22 @@ def register_env( """ Register an Environment (i.e. make it visible to manager). """ if metadata.id in self._envs: raise ValueError(f"Environment '{metadata.id}' already registered.") - wrapped_env = auto_setup(env, self._start_usage, self._end_usage) + + # Apply automatic setup wrapper + wrapped_env = auto_setup.auto_setup( + env=env, + start_usage=self._start_usage, + end_usage=self._end_usage + ) + + # Apply runtime logs wrapper + logs_dir = self._runtime_logs_dir / metadata.id + logs_dir.mkdir(parents=True, exist_ok=True) + wrapped_env = dump_logs.dump_logs( + env=wrapped_env, + logs_dir=logs_dir + ) + self._envs[metadata.id] = EnvironmentManager.EnvEntry( instance=wrapped_env, metadata=metadata, diff --git a/golem/task/task_api/__init__.py b/golem/task/task_api/__init__.py index e36cad33f9..c003460680 100644 --- a/golem/task/task_api/__init__.py +++ b/golem/task/task_api/__init__.py @@ -51,7 +51,10 @@ async def start(self, command: str, port: int) -> Tuple[str, int]: ) self._runtime = self._env.runtime(runtime_payload) loop = asyncio.get_event_loop() - await self._runtime.prepare().asFuture(loop) + d = self._runtime.prepare() + f = d.asFuture(loop) + await f + # await self._runtime.prepare().asFuture(loop) await self._runtime.start().asFuture(loop) return self._runtime.get_port_mapping(port) diff --git a/golem/task/taskserver.py b/golem/task/taskserver.py index eae4029bee..1c35e945af 100644 --- a/golem/task/taskserver.py +++ b/golem/task/taskserver.py @@ -37,7 +37,7 @@ from golem.apps import manager as app_manager from golem.apps.default import save_built_in_app_definitions from golem.clientconfigdescriptor import ClientConfigDescriptor -from golem.core.common import short_node_id, deadline_to_timeout +from golem.core.common import short_node_id, deadline_to_timeout, get_log_dir from golem.core.deferred import ( asyncio_main_loop, deferred_from_future, @@ -129,7 +129,8 @@ def __init__(self, Path(self.get_task_computer_root()).mkdir(parents=True, exist_ok=True) - new_env_manager = EnvironmentManager() + runtime_logs_dir = get_log_dir(client.datadir) + new_env_manager = EnvironmentManager(runtime_logs_dir) register_built_in_repositories() register_environments( work_dir=self.get_task_computer_root(), diff --git a/scripts/task_api_tests/basic_integration.py b/scripts/task_api_tests/basic_integration.py index 456c64988b..b459a72be0 100644 --- a/scripts/task_api_tests/basic_integration.py +++ b/scripts/task_api_tests/basic_integration.py @@ -38,7 +38,9 @@ async def test_task( app_manager.register_app(app_definition) app_manager.set_enabled(app_definition.id, True) - env_manager = envmanager.EnvironmentManager() + runtime_logs_dir = work_dir / 'runtime_logs' + runtime_logs_dir.mkdir() + env_manager = envmanager.EnvironmentManager(runtime_logs_dir) register_built_in_repositories() register_environments( work_dir=str(work_dir), diff --git a/tests/golem/envs/localhost.py b/tests/golem/envs/localhost.py index 3a3a8c6717..30f46b177b 100644 --- a/tests/golem/envs/localhost.py +++ b/tests/golem/envs/localhost.py @@ -2,12 +2,14 @@ import logging import multiprocessing import signal +import uuid from pathlib import Path from typing import Optional, Dict, Any, Tuple, List, Awaitable, Callable import dill from dataclasses import dataclass, asdict from golem_task_api import RequestorAppHandler, ProviderAppHandler, entrypoint +from golem_task_api.dirutils import RequestorTaskDir from golem_task_api.enums import VerifyResult from golem_task_api.structs import Subtask, Task from twisted.internet import defer, threads @@ -24,6 +26,7 @@ Prerequisites, Runtime, RuntimeBase, + RuntimeId, RuntimeInput, RuntimeOutput, RuntimePayload @@ -72,6 +75,7 @@ class LocalhostPayload(RuntimePayload): command: str shared_dir: Path prerequisites: LocalhostPrerequisites + runtime_id: Optional[RuntimeId] = None class LocalhostPayloadBuilder(TaskApiPayloadBuilder): @@ -142,6 +146,16 @@ async def compute( return await self._prereq.compute( # type: ignore subtask_id, subtask_params) + async def abort_task(self, task_work_dir: RequestorTaskDir) -> None: + pass + + async def abort_subtask( + self, + task_work_dir: RequestorTaskDir, + subtask_id: str + ) -> None: + pass + class LocalhostRuntime(RuntimeBase): @@ -150,6 +164,8 @@ def __init__( payload: LocalhostPayload, ) -> None: super().__init__(logger) + self._id = payload.runtime_id or str(uuid.uuid4()) + # From docs: Start a fresh python interpreter process. Unnecessary # file descriptors and handles from the parent process will not # be inherited. @@ -161,6 +177,9 @@ def __init__( ) self._shutdown_deferred: Optional[defer.Deferred] = None + def id(self) -> Optional[RuntimeId]: + return self._id + def prepare(self) -> defer.Deferred: self._prepared() return defer.succeed(None) @@ -219,10 +238,10 @@ def stdin(self, encoding: Optional[str] = None) -> RuntimeInput: raise NotImplementedError def stdout(self, encoding: Optional[str] = None) -> RuntimeOutput: - raise NotImplementedError + return [] def stderr(self, encoding: Optional[str] = None) -> RuntimeOutput: - raise NotImplementedError + return [] def get_port_mapping(self, port: int) -> Tuple[str, int]: return '127.0.0.1', port diff --git a/tests/golem/task/server/test_queue.py b/tests/golem/task/server/test_queue.py index bb2dd3214f..9d952f98e4 100644 --- a/tests/golem/task/server/test_queue.py +++ b/tests/golem/task/server/test_queue.py @@ -25,7 +25,7 @@ def setUp(self): self.server.client = self.client self.server.task_keeper = taskkeeper.TaskHeaderKeeper( old_env_manager=self.client.environments_manager, - new_env_manager=NewEnvManager(), + new_env_manager=NewEnvManager(self.new_path), node=self.client.node, min_price=0 ) diff --git a/tests/golem/task/test_appbenchmarkmanager.py b/tests/golem/task/test_appbenchmarkmanager.py index a480e87cb3..d3aca452f2 100644 --- a/tests/golem/task/test_appbenchmarkmanager.py +++ b/tests/golem/task/test_appbenchmarkmanager.py @@ -33,7 +33,7 @@ def setup_env(self, env_id): @pytest.fixture(autouse=True) def setup_method(self, pytest_database_fixture, tmpdir, event_loop): # noqa # pylint: disable=attribute-defined-outside-init - self.env_manager = EnvironmentManager() + self.env_manager = EnvironmentManager(Path(tmpdir)) self.app_benchmark_manager = AppBenchmarkManager( env_manager=self.env_manager, root_path=Path(tmpdir)) diff --git a/tests/golem/task/test_envmanager.py b/tests/golem/task/test_envmanager.py index ce49ea9ad7..ea5e53349a 100644 --- a/tests/golem/task/test_envmanager.py +++ b/tests/golem/task/test_envmanager.py @@ -13,7 +13,7 @@ class EnvManagerBaseTest(DatabaseFixture): def setUp(self): super().setUp() - self.manager = EnvironmentManager() + self.manager = EnvironmentManager(self.new_path) def register_env(self, env_id): env = MagicMock(spec=Environment) @@ -125,6 +125,40 @@ def test_auto_setup(self): env2.prepare.assert_called_once() +class TestRuntimeLogs( # pylint: disable=too-many-ancestors + EnvManagerBaseTest, + TwistedTestCase +): + + @defer.inlineCallbacks + def test_runtime_logs(self): + env_id = 'env' + runtime_id = 'runtime' + stdout = ['ąąą\n', 'bbb\n', 'ććć\n'] + stderr = ['ddd\n', 'ęęę\n', 'fff\n'] + + env, *_ = self.register_env(env_id) + env.runtime().id.return_value = runtime_id + env.runtime().stdout.return_value = stdout + env.runtime().stderr.return_value = stderr + + wrapped_env = self.manager.environment("env") + runtime = wrapped_env.runtime(Mock()) + + yield runtime.prepare() + yield runtime.clean_up() + + stdout_path = self.new_path / env_id / f'{runtime_id}_stdout.txt' + self.assertTrue(stdout_path.exists()) + with stdout_path.open(mode='r', encoding='utf-8') as file: + self.assertEqual(list(file), stdout) + + stderr_path = self.new_path / env_id / f'{runtime_id}_stderr.txt' + self.assertTrue(stderr_path.exists()) + with stderr_path.open(mode='r', encoding='utf-8') as file: + self.assertEqual(list(file), stderr) + + class TestEnvironmentManagerDB( # pylint: disable=too-many-ancestors EnvManagerBaseTest, TwistedTestCase diff --git a/tests/golem/task/test_taskkeeper.py b/tests/golem/task/test_taskkeeper.py index 0f1cece84f..6c65d5ab2e 100644 --- a/tests/golem/task/test_taskkeeper.py +++ b/tests/golem/task/test_taskkeeper.py @@ -45,11 +45,13 @@ def async_run(request, success=None, error=None): success(result) -class TestTaskHeaderKeeperIsSupported(LogTestCase): +class TestTaskHeaderKeeperIsSupported(TempDirFixture, LogTestCase): + def setUp(self) -> None: + super().setUp() self.tk = TaskHeaderKeeper( old_env_manager=OldEnvManager(), - new_env_manager=NewEnvManager(), + new_env_manager=NewEnvManager(self.new_path), node=dt_p2p_factory.Node(), min_price=10.0) self.tk.old_env_manager.environments = {} @@ -123,12 +125,12 @@ def test_mask_mismatch(self): self.assertIn(UnsupportReason.MASK_MISMATCH, supported.desc) -class TaskHeaderKeeperBase(LogTestCase): +class TaskHeaderKeeperBase(TempDirFixture, LogTestCase): def setUp(self): super().setUp() self.thk = taskkeeper.TaskHeaderKeeper( old_env_manager=OldEnvManager(), - new_env_manager=NewEnvManager(), + new_env_manager=NewEnvManager(self.new_path), node=dt_p2p_factory.Node(), min_price=10.0, ) @@ -140,7 +142,7 @@ def setUp(self): self.tar = mock.Mock(spec=taskarchiver.TaskArchiver) self.thk = TaskHeaderKeeper( old_env_manager=OldEnvManager(), - new_env_manager=NewEnvManager(), + new_env_manager=NewEnvManager(self.new_path), node=dt_p2p_factory.Node(), min_price=10.0, task_archiver=self.tar, @@ -314,7 +316,7 @@ def test_check_max_tasks_per_owner(freezer, self): tk = TaskHeaderKeeper( old_env_manager=OldEnvManager(), - new_env_manager=NewEnvManager(), + new_env_manager=NewEnvManager(self.new_path), node=dt_p2p_factory.Node(), min_price=10, max_tasks_per_requestor=10)