diff --git a/.travis.yml b/.travis.yml index 3046663a3..71767df62 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,7 +13,7 @@ before_install: - ci/scripts/install_scripts.sh - pip install coverage install: -- pip install cli/ +- pip install cli/[dev] script: - coverage run -m unittest discover --start-directory cli/test - ci/run_tests.sh diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index bd0633fb0..4c013e714 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -20,7 +20,7 @@ Popper adheres to our code of conduct, [posted in this repository](CODE_OF_CONDU Take a look at the issues in our [list of projects](https://github.com/systemslab/popper/projects) to get started! -### Keywords in Issue Titles +## Keywords in Issue Titles The title of each open issue is prefixed with a keyword that denotes the following: @@ -40,7 +40,7 @@ following: Workflow that showcases available features of the CLI tool, as well as the catalog of pre-defined actions. -### Branches +## Branches There are two main branches of the codebase: @@ -51,7 +51,7 @@ There are two main branches of the codebase: * [`master`](./). This tracks the latest 2.x series, which adopted Github actions workflows as the main supported format. -### Install Popper from Source +## Install Popper from Source To install Popper in "development mode", we suggest the following approach: @@ -68,7 +68,7 @@ git clone git@github.com:systemslab/popper cd popper # install popper from source -pip install -e cli +pip install -e cli/[dev] ``` the `-e` flag passed to `pip` tells it to install the package from the @@ -78,6 +78,21 @@ the above approach you have both (1) popper installed in your machine and (2) an environment where you can modify popper and test the results of such modifications. +## Running the unittests locally + +To run the unittests on your local machine, we suggest the following +approach: + +```bash +cd popper/ + +# activate the virtualenv +source $HOME/venvs/popper/bin/activate + +# run the tests +ENGINE=docker python -X tracemalloc -m unittest discover -f cli/test/ +``` + ## How to contribute changes Once you've identified one of the issues above that you want to contribute to, you're ready to make a change to the project repository! diff --git a/README.md b/README.md index 25babc7bd..ff051bf33 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,8 @@ popper run -f wf.yml ``` Keep reading down to find [installation instructions](#installation). -For more information on the YAML syntax, see [here][cnwf]. +The full example above can be found [here][minimalpython]. For more +information on the YAML syntax, see [here][cnwf]. The high-level goals of this project are to provide: @@ -114,3 +115,4 @@ us](mailto:ivo@cs.ucsc.edu). [cn]: https://cloudblogs.microsoft.com/opensource/2018/04/23/5-reasons-you-should-be-doing-container-native-development/ [compose]: https://docs.docker.com/compose/ [podman]: https://podman.io +[minimalpython]: https://github.com/popperized/popper-examples/tree/master/workflows/minimal-python diff --git a/cli/popper/commands/cmd_run.py b/cli/popper/commands/cmd_run.py index 03c26cadd..57aacee41 100644 --- a/cli/popper/commands/cmd_run.py +++ b/cli/popper/commands/cmd_run.py @@ -1,8 +1,10 @@ import click import os +import traceback from popper import log as logging from popper.cli import log, pass_context +from popper.config import PopperConfig from popper.parser import Workflow from popper.runner import WorkflowRunner @@ -118,7 +120,23 @@ def cli(ctx, step, wfile, debug, dry_run, log_file, quiet, reuse, engine, resource_manager, skip, skip_pull, skip_clone, substitution, allow_loose, with_dependencies, workspace, conf): - """Runs a Popper workflow. Only execute STEP if given.""" + """Runs a Popper workflow. Only executes STEP if given. + + This command allows specifying the engine and the resource manager + in two different ways. + + * Using the `--engine/-e` option and `--resource-manager/-r` option. + + * Through a configuration file specified with the `--conf/-c` option. + + NOTE: + + 1. If none of the above are given, popper uses docker as the + default engine and host as the default resource manager. + + 2. If the engine or resource manager is specified through CLI and + config file both, CLI is given preference over config file. + """ # set the logging levels. level = 'STEP_INFO' if quiet: @@ -146,14 +164,15 @@ def cli(ctx, step, wfile, debug, dry_run, log_file, quiet, reuse, substitutions=substitution, allow_loose=allow_loose, include_step_dependencies=with_dependencies) - # instantiate the runner - runner = WorkflowRunner( - engine, - resource_manager, - config_file=conf, - dry_run=dry_run, - reuse=reuse, - skip_pull=skip_pull, - skip_clone=skip_clone, - workspace_dir=workspace) - runner.run(wf) + config = PopperConfig(engine_name=engine, resman_name=resource_manager, + config_file=conf, reuse=reuse, dry_run=dry_run, + skip_pull=skip_pull, skip_clone=skip_clone, + workspace_dir=workspace) + + runner = WorkflowRunner(config) + + try: + runner.run(wf) + except Exception as e: + log.debug(traceback.format_exc()) + log.fail(e) diff --git a/cli/popper/config.py b/cli/popper/config.py index 98338ff3a..09c507f37 100644 --- a/cli/popper/config.py +++ b/cli/popper/config.py @@ -1,59 +1,100 @@ import os +import yaml -from hashlib import shake_256 +import popper.scm as scm +from hashlib import shake_256 from popper.cli import log as log -import popper.scm as scm -import popper.utils as pu - class PopperConfig(object): - def __init__(self, **kwargs): + def __init__(self, engine_name=None, resman_name=None, config_file=None, + workspace_dir=os.getcwd(), reuse=False, + dry_run=False, quiet=False, skip_pull=False, + skip_clone=False): + + self.workspace_dir = os.path.realpath(workspace_dir) + self.reuse = reuse + self.dry_run = dry_run + self.quiet = quiet + self.skip_pull = skip_pull + self.skip_clone = skip_clone self.repo = scm.new_repo() - self.workspace_dir = os.path.realpath(kwargs['workspace_dir']) - self.wid = shake_256(self.workspace_dir.encode('utf-8')).hexdigest(4) self.workspace_sha = scm.get_sha(self.repo) - self.config_file = kwargs['config_file'] - self.dry_run = kwargs['dry_run'] - self.skip_clone = kwargs['skip_clone'] - self.skip_pull = kwargs['skip_pull'] - self.quiet = kwargs['quiet'] - self.reuse = kwargs['reuse'] - self.engine_name = kwargs.get('engine', None) - self.resman_name = kwargs.get('resource_manager', None) - self.engine_options = kwargs['engine_options'] - self.resman_options = kwargs['resman_options'] - self.config_from_file = pu.load_config_file(self.config_file) - - def parse(self): - self.validate() - self.normalize() - - def validate(self): - if self.config_from_file.get('engine', None): - if not self.config_from_file['engine'].get('name', None): - log.fail( - 'engine config must have the name property.') - - if self.config_from_file.get('resource_manager', None): - if not self.config_from_file['resource_manager'].get('name', None): - log.fail( - 'resource_manager config must have the name property.') - - def normalize(self): - if not self.engine_name: - if self.config_from_file.get('engine', None): - self.engine_name = self.config_from_file['engine']['name'] - self.engine_options = self.config_from_file['engine'].get( - 'options', dict()) - else: - self.engine_name = 'docker' - - if not self.resman_name: - if self.config_from_file.get('resource_manager', None): - self.resman_name = self.config_from_file['resource_manager']['name'] - self.resman_options = self.config_from_file['resource_manager'].get( - 'options', dict()) - else: - self.resman_name = 'host' + + wid = shake_256(self.workspace_dir.encode('utf-8')).hexdigest(4) + self.wid = wid + + from_file = self._load_config_from_file(config_file, engine_name, + resman_name) + + self.engine_name = from_file['engine_name'] + self.resman_name = from_file['resman_name'] + self.engine_opts = from_file['engine_opts'] + self.resman_opts = from_file['resman_opts'] + + def _load_config_from_file(self, config_file, engine_name, resman_name): + from_file = PopperConfig.__load_config_file(config_file) + loaded_conf = {} + + eng_section = from_file.get('engine', None) + eng_from_file = from_file.get('engine', {}).get('name') + if from_file and eng_section and not eng_from_file: + log.fail('No engine name given.') + + resman_section = from_file.get('resource_manager', None) + resman_from_file = from_file.get('resource_manager', {}).get('name') + if from_file and resman_section and not resman_from_file: + log.fail('No resource manager name given.') + + # set name in precedence order (or assigne default values) + if engine_name: + loaded_conf['engine_name'] = engine_name + elif eng_from_file: + loaded_conf['engine_name'] = eng_from_file + else: + loaded_conf['engine_name'] = 'docker' + + if resman_name: + loaded_conf['resman_name'] = resman_name + elif resman_from_file: + loaded_conf['resman_name'] = resman_from_file + else: + loaded_conf['resman_name'] = 'host' + + engine_opts = from_file.get('engine', {}).get('options', {}) + resman_opts = from_file.get('resource_manager', {}).get('options', {}) + loaded_conf['engine_opts'] = engine_opts + loaded_conf['resman_opts'] = resman_opts + + return loaded_conf + + @staticmethod + def __load_config_file(config_file): + """Validate and parse the engine configuration file. + + Args: + config_file(str): Path to the file to be parsed. + + Returns: + dict: Engine configuration. + """ + if isinstance(config_file, dict): + return config_file + + if not config_file: + return dict() + + if not os.path.exists(config_file): + log.fail(f'File {config_file} was not found.') + + if not config_file.endswith('.yml'): + log.fail('Configuration file must be a YAML file.') + + with open(config_file, 'r') as cf: + data = yaml.load(cf, Loader=yaml.Loader) + + if not data: + log.fail('Configuration file is empty.') + + return data diff --git a/cli/popper/parser.py b/cli/popper/parser.py index ff8356574..bbe9ef937 100644 --- a/cli/popper/parser.py +++ b/cli/popper/parser.py @@ -3,13 +3,13 @@ import re import hcl import os +import threading import yaml from copy import deepcopy from builtins import str, dict from popper.cli import log as log -import popper.scm as scm import popper.utils as pu @@ -24,6 +24,45 @@ "next"] +class threadsafe_iter_3: + """Takes an iterator/generator and makes it thread-safe by serializing call + to the `next` method of given iterator/generator.""" + + def __init__(self, it): + self.it = it + self.lock = threading.Lock() + + def __iter__(self): + return self + + def __next__(self): + with self.lock: + return self.it.__next__() + + +def threadsafe_generator(f): + """A decorator that takes a generator function and makes it thread-safe. + + Args: + f(function): Generator function + + Returns: + None + """ + def g(*args, **kwargs): + """ + + Args: + *args(list): List of non-key worded,variable length arguments. + **kwargs(dict): List of key-worded,variable length arguments. + + Returns: + function: The thread-safe function. + """ + return threadsafe_iter_3(f(*args, **kwargs)) + return g + + class Workflow(object): """Represents an immutable workflow.""" @@ -98,7 +137,7 @@ def format_command(params): return params.split(" ") return params - @pu.threadsafe_generator + @threadsafe_generator def get_stages(self): """Generator of stages. A stages is a list of steps that can be executed in parallel. diff --git a/cli/popper/runner.py b/cli/popper/runner.py index 79ebd8ae6..742b1d803 100644 --- a/cli/popper/runner.py +++ b/cli/popper/runner.py @@ -14,38 +14,31 @@ class WorkflowRunner(object): """The workflow runner.""" # class variable that holds references to runner singletons - runners = {} + __runners = {} - def __init__(self, engine, resource_manager, config_file=None, - workspace_dir=os.getcwd(), reuse=False, dry_run=False, - quiet=False, skip_pull=False, skip_clone=False, - engine_options=dict(), resman_options=dict()): - - # create a config object from the given arguments - kwargs = locals() - kwargs.pop('self') - self.config = PopperConfig(**kwargs) - self.config.parse() + def __init__(self, config): + self._config = config + self._is_resman_module_loaded = False - # dynamically load resource manager - resman_mod_name = f'popper.runner_{self.config.resman_name}' + def _load_resman_module(self): + """dynamically load resource manager module""" + resman_mod_name = f'popper.runner_{self._config.resman_name}' resman_spec = importlib.util.find_spec(resman_mod_name) if not resman_spec: raise ValueError( - f'Invalid resource manager: {self.config.resman_name}') - self.resman_mod = importlib.import_module(resman_mod_name) - - log.debug(f'WorkflowRunner config:\n{pu.prettystr(self.config)}') + f'Invalid resource manager: {self._config.resman_name}') + self._resman_mod = importlib.import_module(resman_mod_name) + self._is_resman_module_loaded = True def __enter__(self): return self def __exit__(self, exc_type, exc, traceback): """calls __exit__ on all instantiated step runners""" - self.config.repo.close() - for _, r in WorkflowRunner.runners.items(): + self._config.repo.close() + for _, r in WorkflowRunner.__runners.items(): r.__exit__(exc_type, exc, traceback) - WorkflowRunner.runners = {} + WorkflowRunner.__runners = {} @staticmethod def signal_handler(sig, frame): @@ -60,12 +53,11 @@ def signal_handler(sig, frame): None """ log.info(f'Got {sig} signal. Stopping running steps.') - for _, runner in WorkflowRunner.runners.items(): + for _, runner in WorkflowRunner.__runners.items(): runner.stop_running_tasks() sys.exit(0) - @staticmethod - def process_secrets(wf, config): + def _process_secrets(self, wf): """Checks whether the secrets defined for a step are available in the execution environment. When the environment variable `CI` is set to `true` and no environment variable is defined for a secret, the @@ -80,7 +72,7 @@ def process_secrets(wf, config): Returns: None """ - if config.dry_run or config.skip_clone: + if self._config.dry_run or self._config.skip_clone: return for _, a in wf.steps.items(): @@ -94,7 +86,24 @@ def process_secrets(wf, config): os.environ[s] = val @staticmethod - def clone_repos(wf, config): + def _setup_base_cache(): + """Set up the base cache directory. + + Returns: + str: The path to the base cache directory. + """ + if os.environ.get('POPPER_CACHE_DIR', None): + base_cache = os.environ['POPPER_CACHE_DIR'] + else: + cache_dir_default = os.path.join(os.environ['HOME'], '.cache') + cache_dir = os.environ.get('XDG_CACHE_HOME', cache_dir_default) + base_cache = os.path.join(cache_dir, 'popper') + + os.makedirs(base_cache, exist_ok=True) + + return base_cache + + def _clone_repos(self, wf): """Clone steps that reference a repository. Args: @@ -106,7 +115,8 @@ def clone_repos(wf, config): Returns: None """ - repo_cache = os.path.join(pu.setup_base_cache(), config.wid) + repo_cache = os.path.join(WorkflowRunner._setup_base_cache(), + self._config.wid) cloned = set() infoed = False @@ -124,10 +134,10 @@ def clone_repos(wf, config): a['repo_dir'] = repo_dir a['step_dir'] = step_dir - if config.dry_run: + if self._config.dry_run: continue - if config.skip_clone: + if self._config.skip_clone: if not os.path.exists(repo_dir): log.fail(f"Expecting folder '{repo_dir}' not found.") continue @@ -152,18 +162,18 @@ def run(self, wf): Returns: None """ - WorkflowRunner.process_secrets(wf, self.config) - WorkflowRunner.clone_repos(wf, self.config) + self._process_secrets(wf) + self._clone_repos(wf) for _, step in wf.steps.items(): log.debug(f'Executing step:\n{pu.prettystr(step)}') if step['uses'] == 'sh': - e = self.step_runner('host', step).run(step) + e = self._step_runner('host', step).run(step) else: - e = self.step_runner(self.config.engine_name, step).run(step) + e = self._step_runner(self._config.engine_name, step).run(step) if e != 0 and e != 78: - log.fail(f"Step '{step['name']}' failed !") + log.fail(f"Step '{step['name']}' failed ('{e}') !") log.info(f"Step '{step['name']}' ran successfully !") @@ -172,22 +182,33 @@ def run(self, wf): log.info(f"Workflow finished successfully.") - def step_runner(self, engine_name, step): + def _step_runner(self, engine_name, step): """Factory of singleton runners""" - if engine_name not in WorkflowRunner.runners: + if not self._is_resman_module_loaded: + self._load_resman_module() + + runner = WorkflowRunner.__runners.get(engine_name, None) + + if not runner: engine_cls_name = f'{engine_name.capitalize()}Runner' - engine_cls = getattr(self.resman_mod, engine_cls_name, None) + engine_cls = getattr(self._resman_mod, engine_cls_name, None) if not engine_cls: - raise ValueError(f'Unknown engine {engine_name}') - WorkflowRunner.runners[engine_name] = engine_cls(self.config) - return WorkflowRunner.runners[engine_name] + raise ValueError(f'Cannot find class for {engine_name}') + runner = engine_cls(config=self._config) + WorkflowRunner.__runners[engine_name] = runner + + return runner +# class design guidelines: +# - if not exposed to users, then it is protected, e.g `_foo()` +# - if a method does not use internal state then it is a @staticmethod +# - if both of the above, then it's both protected and static class StepRunner(object): """Base class for step runners, assumed to be singletons.""" - def __init__(self, config): - self.config = config + def __init__(self, config=PopperConfig()): + self._config = config def __enter__(self): return self diff --git a/cli/popper/runner_host.py b/cli/popper/runner_host.py index 878545442..f10596268 100644 --- a/cli/popper/runner_host.py +++ b/cli/popper/runner_host.py @@ -1,9 +1,10 @@ import os - -from subprocess import PIPE, Popen, STDOUT, SubprocessError +import signal import docker +from subprocess import Popen, STDOUT, PIPE, SubprocessError + from popper import utils as pu from popper import scm from popper.cli import log as log @@ -11,20 +12,21 @@ class HostRunner(StepRunner): - """Run an step on the Host Machine.""" + """Run a step directly on the host machine.""" + + def __init__(self, **kw): + super(HostRunner, self).__init__(**kw) - spawned_processes = set() + self._spawned_pids = set() - def __init__(self, config): - super(HostRunner, self).__init__(config) - if self.config.reuse: + if self._config.reuse: log.warning('Reuse not supported for HostRunner.') def __enter__(self): return self def __exit__(self, exc_type, exc, traceback): - HostRunner.spawned_processes = set() + pass def run(self, step): step_env = StepRunner.prepare_environment(step, os.environ) @@ -36,138 +38,123 @@ def run(self, step): log.info(f'[{step["name"]}] {cmd}') - if self.config.dry_run: + if self._config.dry_run: return 0 log.debug(f'Environment:\n{pu.prettystr(os.environ)}') - ecode, _ = pu.exec_cmd( - cmd, - step_env, - self.config.workspace_dir, - HostRunner.spawned_processes) + pid, ecode, _ = HostRunner._exec_cmd(cmd, step_env, + self._config.workspace_dir, + self._spawned_pids) + if pid != 0: + self._spawned_pids.remove(pid) + return ecode def stop_running_tasks(self): - for p in HostRunner.spawned_processes: - log.info(f'Stopping proces {p.pid}') - p.kill() + for pid in self._spawned_pids: + log.info(f'Stopping proces {pid}') + os.kill(pid, signal.SIGKILL) + + @staticmethod + def _exec_cmd(cmd, env=None, cwd=os.getcwd(), pids=set(), logging=True): + pid = 0 + try: + with Popen(cmd, stdout=PIPE, stderr=STDOUT, + universal_newlines=True, preexec_fn=os.setsid, + env=env, cwd=cwd) as p: + pid = p.pid + pids.add(p.pid) + log.debug('Reading process output') + + output = [] + for line in iter(p.stdout.readline, ''): + if logging: + log.step_info(line) + else: + output.append(line) + + p.wait() + ecode = p.poll() + + log.debug(f'Code returned by process: {ecode}') + + except SubprocessError as ex: + output = "" + ecode = ex.returncode + log.step_info(f"Command '{cmd[0]}' failed with: {ex}") + except Exception as ex: + output = "" + ecode = 1 + log.step_info(f"Command raised non-SubprocessError error: {ex}") + + return pid, ecode, '\n'.join(output) class DockerRunner(StepRunner): - """Runs steps in docker.""" - d = None + """Runs steps in docker on the local machine.""" + + def __init__(self, init_docker_client=True, **kw): + super(DockerRunner, self).__init__(**kw) - # hold references to spawned containers - spawned_containers = [] + self._spawned_containers = set() + self._d = None - def __init__(self, config): - super(DockerRunner, self).__init__(config) + if not init_docker_client: + return try: - DockerRunner.d = docker.from_env() - DockerRunner.d.version() + self._d = docker.from_env() + self._d.version() except Exception as e: log.debug(f'Docker error: {e}') log.fail(f'Unable to connect to the docker daemon.') - log.debug(f'Docker info: {pu.prettystr(DockerRunner.d.info())}') + log.debug(f'Docker info: {pu.prettystr(self._d.info())}') def __exit__(self, exc_type, exc_value, exc_traceback): - if DockerRunner.d: - DockerRunner.d.close() - DockerRunner.spawned_containers = [] - return True + if self._d: + self._d.close() + self._spawned_containers = set() def run(self, step): """Execute the given step in docker.""" - cid = pu.sanitized_name(step['name'], self.config.wid) + cid = pu.sanitized_name(step['name'], self._config.wid) - container = DockerRunner.find_container(cid) - - if container and not self.config.reuse and not self.config.dry_run: + container = self._find_container(cid) + if container and not self._config.reuse and not self._config.dry_run: container.remove(force=True) - container = DockerRunner.create_container(cid, step, self.config) + container = self._create_container(cid, step) log.info(f'[{step["name"]}] docker start') - if self.config.dry_run: + if self._config.dry_run: return 0 - DockerRunner.spawned_containers.append(container) + self._spawned_containers.add(container) container.start() cout = container.logs(stream=True) for line in cout: - log.step_info(pu.decode(line).strip('\n')) + log.step_info(line.decode().rstrip()) e = container.wait()['StatusCode'] return e def stop_running_tasks(self): - for c in DockerRunner.spawned_containers: + for c in self._spawned_containers: log.info(f'Stopping container {c.name}') c.stop() - @staticmethod - def prepare_image(step, config): - build, img, dockerfile = DockerRunner.get_build_info( - step, config.workspace_dir, config.workspace_sha) - - if build: - DockerRunner.docker_build(step, img, dockerfile, - config.dry_run) - elif not config.skip_pull and not step.get('skip_pull', False): - DockerRunner.docker_pull(step, img, config.dry_run) - - return build, img, dockerfile - - @staticmethod - def get_engine_config(step, img, cid, config): - engine_config = { - "image": img, - "command": step.get('args', None), - "name": cid, - "volumes": [ - f'{config.workspace_dir}:/workspace', - '/var/run/docker.sock:/var/run/docker.sock' - ], - "working_dir": '/workspace', - "environment": StepRunner.prepare_environment(step), - "entrypoint": step.get('runs', None), - "detach": True - } - return engine_config - - @staticmethod - def create_container(cid, step, config): - build, img, dockerfile = DockerRunner.prepare_image(step, config) - msg = f'{img} {step.get("runs", "")} {step.get("args", "")}' - log.info(f'[{step["name"]}] docker create {msg}') - - if config.dry_run: - return - - engine_config = DockerRunner.get_engine_config(step, img, cid, config) - - if hasattr(config, 'engine_options'): - DockerRunner.update_engine_config( - engine_config, config.engine_options) - log.debug(f'Engine configuration: {pu.prettystr(engine_config)}\n') - - container = DockerRunner.d.containers.create(**engine_config) - return container - - @staticmethod - def get_build_info(step, workspace_dir, workspace_sha): + def _get_build_info(self, step): """Parses the `uses` attribute and returns build information needed. Args: step(dict): dict with step data Returns: - (str, str, str): 'pull' or 'build', image ref, path to Dockerfile + (str, str, str, str): bool (build), image, tag, Dockerfile """ build = True img = None @@ -175,86 +162,92 @@ def get_build_info(step, workspace_dir, workspace_sha): if 'docker://' in step['uses']: img = step['uses'].replace('docker://', '') - if ':' not in img: - img += ":latest" + if ':' in img: + (img, tag) = img.split(':') + else: + tag = 'latest' build = False elif './' in step['uses']: - img = f'{pu.sanitized_name(step["name"], "step")}:{workspace_sha}' - build_source = os.path.join(workspace_dir, step['uses']) + img = f'{pu.sanitized_name(step["name"], "step")}' + tag = f'{self._config.workspace_sha}' + build_source = os.path.join(self._config.workspace_dir, + step['uses']) else: _, _, user, repo, _, version = scm.parse(step['uses']) - img = f'{user}/{repo}:{version}' + img = f'{user}/{repo}'.lower() + tag = version build_source = os.path.join(step['repo_dir'], step['step_dir']) - return (build, img.lower(), build_source) + return (build, img, tag, build_source) - @staticmethod - def find_container(cid): - """Check whether the container exists.""" - containers = DockerRunner.d.containers.list( - all=True, filters={'name': cid}) + def _create_container(self, cid, step): + build, img, tag, dockerfile = self._get_build_info(step) - filtered_containers = [c for c in containers if c.name == cid] + if build: + log.info( + f'[{step["name"]}] docker build {img}:{tag} {os.path.dirname(dockerfile)}') + if not self._config.dry_run: + self._d.images.build(path=dockerfile, tag=f'{img}:{tag}', + rm=True, pull=True) + elif not self._config.skip_pull and not step.get('skip_pull', False): + log.info(f'[{step["name"]}] docker pull {img}:{tag}') + if not self._config.dry_run: + self._d.images.pull(repository=f'{img}:{tag}') + + if self._config.dry_run: + return - if len(filtered_containers): - return filtered_containers[0] + container_args = self._get_container_kwargs(step, f'{img}:{tag}', cid) - return None + log.info(f'[{step["name"]}] docker create {container_args} {img}:{tag}') + container = self._d.containers.create(**container_args) + return container - @staticmethod - def docker_image_exists(img): - """Check whether a docker image exists for a step not. + def _get_container_kwargs(self, step, img, name): + args = { + "image": img, + "command": step.get('args', None), + "name": name, + "volumes": [ + f'{self._config.workspace_dir}:/workspace', + '/var/run/docker.sock:/var/run/docker.sock' + ], + "working_dir": '/workspace', + "environment": StepRunner.prepare_environment(step), + "entrypoint": step.get('runs', None), + "detach": True + } - Args: - img(str): The image to check for. + self._update_with_engine_config(args) - Returns: - bool: Whether the image exists or not. - """ - images = DockerRunner.d.images.list(all=True) - filtered_images = [i for i in images if img in i.tags] - if filtered_images: - return True - return False + log.debug(f'container args: {pu.prettystr(args)}\n') - @staticmethod - def update_engine_config(engine_conf, update_with): - engine_conf["volumes"] = [*engine_conf["volumes"], - *update_with.get('volumes', list())] - for k, v in update_with.get('environment', dict()).items(): - engine_conf["environment"].update({k: v}) + return args - for k, v in update_with.items(): - if k not in engine_conf.keys(): - engine_conf[k] = update_with[k] + def _find_container(self, cid): + """Check whether the container exists.""" + containers = self._d.containers.list(all=True, filters={'name': cid}) - @staticmethod - def docker_pull(step, img, dry_run): - """Pull an image from a container registry. + filtered_containers = [c for c in containers if c.name == cid] - Args: - img(str): The image reference to pull. + if len(filtered_containers): + return filtered_containers[0] - Returns: - None + return None + + def _update_with_engine_config(self, container_args): + """Given container arguments, it extends it so it includes options + obtained from the PopperConfig.engine_opts property. """ - log.info(f'[{step["name"]}] docker pull {img}') - if dry_run: + update_with = self._config.engine_opts + if not update_with: return - DockerRunner.d.images.pull(repository=img) - @staticmethod - def docker_build(step, tag, path, dry_run): - """Build a docker image from a Dockerfile. - - Args: - tag(str): The name of the image to build. - path(str): The path to the Dockerfile. + container_args["volumes"] = [*container_args["volumes"], + *update_with.get('volumes', list())] + for k, v in update_with.get('environment', dict()).items(): + container_args["environment"].update({k: v}) - Returns: - None - """ - log.info(f'[{step["name"]}] docker build -t {tag} {path}') - if dry_run: - return - DockerRunner.d.images.build(path=path, tag=tag, rm=True, pull=True) + for k, v in update_with.items(): + if k not in container_args.keys(): + container_args[k] = update_with[k] diff --git a/cli/popper/runner_slurm.py b/cli/popper/runner_slurm.py index 2f9a22d78..8ed28d70b 100644 --- a/cli/popper/runner_slurm.py +++ b/cli/popper/runner_slurm.py @@ -1,249 +1,153 @@ import os import time -import subprocess +import signal import threading -import sh - from popper import utils as pu -from popper import scm from popper.cli import log as log -from popper.runner import StepRunner as StepRunner -from popper.runner_host import DockerRunner as HostDockerRunner from popper.runner_host import HostRunner +from popper.runner_host import DockerRunner as HostDockerRunner -class SlurmRunner(StepRunner): - spawned_jobs = set() - - def __init__(self, config): - super(SlurmRunner, self).__init__(config) +class SlurmRunner(HostRunner): + def __init__(self, **kw): + super(SlurmRunner, self).__init__(**kw) + self._spawned_jobs = set() def __exit__(self, exc_type, exc, traceback): - SlurmRunner.spawned_jobs = set() - - def _stream_output(self, out_file): - self.output_stream_pid = set() - pu.exec_cmd(["tail", "-f", out_file], - spawned_processes=self.output_stream_pid) - - def _stream_error(self, err_file): - self.error_stream_pid = set() - pu.exec_cmd(["tail", "-f", err_file], - spawned_processes=self.error_stream_pid) - - def start_output_error_stream(self, out_file, err_file): - self.output_stream_thread = threading.Thread( - target=self._stream_output, args=(out_file,)) + self._spawned_jobs = set() - self.error_stream_thread = threading.Thread( - target=self._stream_error, args=(err_file,)) - - self.output_stream_thread.start() - self.error_stream_thread.start() - - def stop_output_error_stream(self): - output_stream_proc = list(self.output_stream_pid)[0] - error_stream_proc = list(self.error_stream_pid)[0] - - output_stream_proc.kill() - error_stream_proc.kill() - - self.output_stream_thread.join() - self.error_stream_thread.join() - - @staticmethod - def generate_script(cmd, job_name, job_script): - with open(job_script, "w") as f: - f.write("#!/bin/bash\n") - f.write(cmd) + def _tail_output(self, out_file): + self._out_stream_pid = set() + _, ecode, _ = HostRunner._exec_cmd(["tail", "-f", out_file], + pids=self._out_stream_pid) + return ecode - @staticmethod - def touch_log_files(out_file, err_file): - if os.path.exists(out_file): - os.remove(out_file) + def _start_out_stream(self, out_file): + self._out_stream_thread = threading.Thread( + target=self._tail_output, args=(out_file,)) + self._out_stream_thread.start() + time.sleep(2) + + def _stop_out_stream(self): + _out_stream_pid = list(self._out_stream_pid)[0] + try: + os.kill(_out_stream_pid, 0) + os.kill(_out_stream_pid, signal.SIGKILL) + except ProcessLookupError: + log.warning('Tail process was killed by some other process.') + self._out_stream_thread.join() + + def _submit_batch_job(self, cmd, step): + job_name = pu.sanitized_name(step['name'], self._config.wid) + temp_dir = "/tmp/popper/slurm/" + os.makedirs(temp_dir, exist_ok=True) - if os.path.exists(err_file): - os.remove(err_file) + job_script = os.path.join(temp_dir, f'{job_name}.sh') + out_file = os.path.join(temp_dir, f'{job_name}.out') - sh.touch(out_file) - sh.touch(err_file) + # create/truncate log + with open(out_file, 'w'): + pass - def submit_batch_job(self, cmd, step): - job_name = pu.sanitized_name(step['name'], self.config.wid) - temp_dir = "/tmp/popper/slurm/" - os.makedirs(temp_dir, exist_ok=True) + with open(job_script, 'w') as f: + f.write('#!/bin/bash\n') + f.write('\n'.join(cmd)) - job_script = os.path.join(temp_dir, f"{job_name}.sh") - out_file = os.path.join(temp_dir, f"{job_name}.out") - err_file = os.path.join(temp_dir, f"{job_name}.err") + sbatch_cmd = f'sbatch --wait --job-name {job_name} --output {out_file}' + sbatch_cmd = sbatch_cmd.split() - SlurmRunner.touch_log_files(out_file, err_file) - SlurmRunner.generate_script(cmd, job_name, job_script) + for k, v in self._config.resman_opts.get(step['name'], {}).items(): + sbatch_cmd.append(pu.key_value_to_flag(k, v)) - sbatch_cmd = "sbatch --wait " - sbatch_cmd += f"--job-name {job_name} " - sbatch_cmd += f"--output {out_file} " - sbatch_cmd += f"--error {err_file} " + sbatch_cmd.append(job_script) - for k, v in self.config.resman_options.get(step['name'], {}).items(): - sbatch_cmd += "-" if len(k) == 1 else "--" - if isinstance(v, bool): - sbatch_cmd += f"{k} " - else: - sbatch_cmd += f"{k} {v} " + log.info(f'[{step["name"]}] {" ".join(sbatch_cmd)}') - sbatch_cmd += job_script - log.debug(sbatch_cmd) + if self._config.dry_run: + return 0 - SlurmRunner.spawned_jobs.add(job_name) + self._spawned_jobs.add(job_name) - # start a tail process on the output and error file - self.start_output_error_stream(out_file, err_file) + # start a tail (background) process on the output file + self._start_out_stream(out_file) - # submit the job and wait, then parse the job_id - ecode, output = pu.exec_cmd(sbatch_cmd.split(" "), logging=False) - job_id = int(output.split(" ")[-1].strip("\n")) + # submit the job and wait + _, ecode, output = HostRunner._exec_cmd(sbatch_cmd, logging=False) # kill the tail process - self.stop_output_error_stream() + self._stop_out_stream() + + self._spawned_jobs.remove(job_name) - SlurmRunner.spawned_jobs.remove(job_name) return ecode - @staticmethod - def cancel_job(): - for job_name in SlurmRunner.spawned_jobs: + def stop_running_tasks(self): + for job_name in self._spawned_jobs: log.info(f'Cancelling job {job_name}') - ecode, _ = pu.exec_cmd(["scancel", "--name", job_name]) - if ecode: - log.fail(f"Failed to cancel the job {job_name}.") + _, ecode, _ = HostRunner._exec_cmd(['scancel', '--name', job_name]) + if ecode != 0: + log.warning(f'Failed to cancel the job {job_name}.') -class DockerRunner(SlurmRunner): - spawned_containers = set() +class DockerRunner(SlurmRunner, HostDockerRunner): - def __init__(self, config): - super(DockerRunner, self).__init__(config) + def __init__(self, **kw): + super(DockerRunner, self).__init__(init_docker_client=False, **kw) def __exit__(self, exc_type, exc, traceback): - spawned_containers = set() + pass def run(self, step): - """Execute the given step in docker.""" - # generate cid - cid = pu.sanitized_name(step['name'], self.config.wid) - step['cmd_list'] = [] + """Execute the given step via slurm in the docker engine.""" + cid = pu.sanitized_name(step['name'], self._config.wid) + cmd = [] - # prepare image build artifacts - build, img, dockerfile = HostDockerRunner.get_build_info( - step, self.config.workspace_dir, self.config.workspace_sha) + build, img, tag, dockerfile = self._get_build_info(step) - if build: - DockerRunner.docker_build( - step, img, dockerfile, self.config.dry_run) - elif not self.config.skip_pull and not step.get('skip_pull', False): - DockerRunner.docker_pull(step, img, self.config.dry_run) + cmd.append(f'docker rm -f {cid} || true') - # remove container if it exists - DockerRunner.docker_rm(step, cid, self.config.dry_run) + if build: + cmd.append(f'docker build -t {img}:{tag} {dockerfile}') + elif not self._config.skip_pull and not step.get('skip_pull', False): + cmd.append(f'docker pull {img}:{tag}') - # create container - DockerRunner.docker_create(step, img, cid, self.config) + cmd.append(self._create_cmd(step, f'{img}:{tag}', cid)) + cmd.append(f'docker start --attach {cid}') - if self.config.dry_run: + if self._config.dry_run: return 0 - DockerRunner.spawned_containers.add(cid) - DockerRunner.docker_start(step, cid, self.config.dry_run) - ecode = self.run_script(step) - DockerRunner.spawned_containers.remove(cid) + self._spawned_containers.add(cid) + ecode = self._submit_batch_job(cmd, step) + self._spawned_containers.remove(cid) return ecode - def run_script(self, step): - step['cmd_list'] = list(map(lambda x: 'srun ' + x, step['cmd_list'])) - final_cmd = "\n".join(step['cmd_list']) - return self.submit_batch_job(final_cmd, step) - - @staticmethod - def docker_create(step, img, cid, config): - msg = f'{img} {step.get("runs", "")} {step.get("args", "")}' - log.info(f'[{step["name"]}] docker create {msg}') - - engine_config = HostDockerRunner.get_engine_config( - step, img, cid, config) - engine_config.pop('detach') - docker_cmd = "docker create " - docker_cmd += f"--name {engine_config.pop('name')} " - docker_cmd += f"--workdir {engine_config.pop('working_dir')} " - - if engine_config.get('entrypoint', None): - docker_cmd += f"--entrypoint '{' '.join(engine_config.pop('entrypoint'))}' " - - # append the vol and envs - for vol in engine_config.pop('volumes'): - docker_cmd += f"-v {vol} " - for env_key, env_val in engine_config.pop('environment').items(): - docker_cmd += f"-e {env_key}={env_val} " - - image = engine_config.pop('image') - - if engine_config.get('command', None): - command = ' '.join(engine_config.pop('command')) - else: - command = ' ' - - for k, v in engine_config.items(): - if not v: - continue - if isinstance(v, bool): - docker_cmd += "-" if len(k) == 1 else "--" - docker_cmd += f"{k} " - elif isinstance(v, list): - for item in v: - docker_cmd += "-" if len(k) == 1 else "--" - docker_cmd += f"{k} {item} " - else: - docker_cmd += "-" if len(k) == 1 else "--" - docker_cmd += f"{k} {v} " + def _create_cmd(self, step, img, cid): + container_args = self._get_container_kwargs(step, img, cid) + container_args.pop('detach') + cmd = ['docker create'] + cmd.append(f"--name {container_args.pop('name')}") + cmd.append(f"--workdir {container_args.pop('working_dir')}") + + entrypoint = container_args.pop('entrypoint', None) + if entrypoint: + cmd.append(f"--entrypoint {' '.join(entrypoint)}") + + # append volume and environment flags + for vol in container_args.pop('volumes'): + cmd.append(f'-v {vol}') + for env_key, env_val in container_args.pop('environment').items(): + cmd.append(f'-e {env_key}={env_val}') + + command = ' '.join(container_args.pop('command', [])) + image = container_args.pop('image') + + # anything else is treated as a flag + for k, v in container_args.items(): + cmd.append(pu.key_value_to_flag(k, v)) # append the image and the commands - docker_cmd += f"{image} {command} > /dev/null" - step['cmd_list'].append(docker_cmd) - - @staticmethod - def docker_start(step, cid, dry_run): - log.info(f'[{step["name"]}] docker start') - if dry_run: - return - docker_cmd = f"docker start --attach {cid}" - step['cmd_list'].append(docker_cmd) - - @staticmethod - def docker_pull(step, img, dry_run): - log.info(f'[{step["name"]}] docker pull {img}') - if dry_run: - return - docker_cmd = f"docker pull {img} > /dev/null" - step['cmd_list'].append(docker_cmd) - - @staticmethod - def docker_build(step, tag, path, dry_run): - log.info(f'[{step["name"]}] docker build -t {tag} {path}') - if dry_run: - return - docker_cmd = f"docker build {tag} {path} > /dev/null" - step['cmd_list'].append(docker_cmd) - - @staticmethod - def docker_rm(step, cid, dry_run): - if dry_run: - return - docker_cmd = f"docker rm -f {cid} || true > /dev/null" - step['cmd_list'].append(docker_cmd) + cmd.append(f'{image} {command}') - def stop_running_tasks(self): - for cid in DockerRunner.spawned_containers: - log.info(f'Stopping container {cid}') - pu.exec_cmd(["docker", "stop", cid]) - SlurmRunner.cancel_job() + return ' '.join(cmd) diff --git a/cli/popper/utils.py b/cli/popper/utils.py index c7b0528d5..9015920aa 100644 --- a/cli/popper/utils.py +++ b/cli/popper/utils.py @@ -1,91 +1,12 @@ -import importlib.util import os import re -import threading import yaml -from builtins import str from distutils.spawn import find_executable -from subprocess import Popen, STDOUT, PIPE, SubprocessError -from dotmap import DotMap from popper.cli import log -def setup_base_cache(): - """Set up the base cache directory. - - Args: - None - - Returns: - str: The path to the base cache directory. - """ - if os.environ.get('POPPER_CACHE_DIR', None): - base_cache = os.environ['POPPER_CACHE_DIR'] - else: - cache_dir = os.environ.get('XDG_CACHE_HOME', - os.path.join(os.environ['HOME'], '.cache')) - base_cache = os.path.join(cache_dir, 'popper') - - os.makedirs(base_cache, exist_ok=True) - - return base_cache - - -def decode(line): - """Make treatment of stdout Python 2/3 compatible. - - Args: - line(str): The string that is required to be converted. - - Returns: - str : The string in converted form. - """ - if isinstance(line, bytes): - return line.decode('utf-8') - return line - - -class threadsafe_iter_3: - """Takes an iterator/generator and makes it thread-safe by serializing call - to the `next` method of given iterator/generator.""" - - def __init__(self, it): - self.it = it - self.lock = threading.Lock() - - def __iter__(self): - return self - - def __next__(self): - with self.lock: - return self.it.__next__() - - -def threadsafe_generator(f): - """A decorator that takes a generator function and makes it thread-safe. - - Args: - f(function): Generator function - - Returns: - None - """ - def g(*args, **kwargs): - """ - - Args: - *args(list): List of non-key worded,variable length arguments. - **kwargs(dict): List of key-worded,variable length arguments. - - Returns: - function: The thread-safe function. - """ - return threadsafe_iter_3(f(*args, **kwargs)) - return g - - def sanitized_name(name, wid=''): """Clean a step name and change it to proper format. It replaces all the unwanted characters with `_`. @@ -131,50 +52,6 @@ def of_type(param, valid_types): return False -def write_file(path, content=''): - """Create and write contents to a file. If no content is provided a blank - file is created. - - Args: - path(str): The path where the file would be created. - content(str, optional): The content to write in the file. - (Default value = '') - - Returns: - None - """ - f = open(path, 'w') - f.write(content) - f.close() - - -def load_config_file(config_file): - """Validate and parse the engine configuration file. - - Args: - config_file(str): Path to the file to be parsed. - - Returns: - dict: Engine configuration. - """ - if not config_file: - return dict() - - if not os.path.exists(config_file): - log.fail(f'File {config_file} was not found.') - - if not config_file.endswith('.yml'): - log.fail('Configuration file must be a YAML file.') - - with open(config_file, 'r') as cf: - data = yaml.load(cf, Loader=yaml.Loader) - - if not data: - log.fail('Configuration file is empty.') - - return data - - def assert_executable_exists(command): """Check if the given command can be invoked; fails if not.""" if not find_executable(command): @@ -182,51 +59,26 @@ def assert_executable_exists(command): def prettystr(a): - if isinstance(a, DotMap): - a = a.toDict() + """improve how dictionaries get printed""" if isinstance(a, os._Environ): a = dict(a) if isinstance(a, dict): return f'{yaml.dump(a, default_flow_style=False)}' -def exec_cmd(cmd, env=None, cwd=os.getcwd(), spawned_processes=set(), - logging=True): - try: - with Popen(cmd, stdout=PIPE, stderr=STDOUT, - universal_newlines=True, preexec_fn=os.setsid, - env=env, cwd=cwd) as p: - - spawned_processes.add(p) - log.debug('Reading process output') - - output = "" - for line in iter(p.stdout.readline, ''): - line_decoded = decode(line) - if logging: - log.step_info(line_decoded[:-1]) - else: - output += line_decoded - - p.wait() - ecode = p.poll() - spawned_processes.remove(p) +def key_value_to_flag(k, v, equals_symbol=False): + is_bool = isinstance(v, bool) - log.debug(f'Code returned by process: {ecode}') + if is_bool and not v and not equals_symbol: + return '' - except SubprocessError as ex: - output = "" - ecode = ex.returncode - log.step_info(f"Command '{cmd[0]}' failed with: {ex}") - except Exception as ex: - output = "" - ecode = 1 - log.step_info(f"Command raised non-SubprocessError error: {ex}") + flag = '-' if len(k) == 1 else '--' - return ecode, output - - -def select_not_none(array): - for item in array: - if item: - return item + if equals_symbol: + flag += f'{k}={str(v).lower() if is_bool else v}' + else: + if isinstance(v, bool): + flag += f'{k}' + else: + flag += f'{k} {v}' + return flag diff --git a/cli/setup.py b/cli/setup.py index c18e7a73c..71fac0440 100644 --- a/cli/setup.py +++ b/cli/setup.py @@ -14,9 +14,6 @@ packages=['popper', 'popper.commands'], include_package_data=True, install_requires=[ - 'sh', - 'dotmap', - 'testfixtures', 'python-vagrant', 'GitPython', 'spython', @@ -25,6 +22,9 @@ 'pyyaml', 'docker' ], + extras_require={ + 'dev': ['testfixtures'] + }, entry_points=''' [console_scripts] popper=popper.cli:cli diff --git a/cli/test/fixtures/settings_1.yml b/cli/test/fixtures/settings_1.yml deleted file mode 100644 index ad8bb64e8..000000000 --- a/cli/test/fixtures/settings_1.yml +++ /dev/null @@ -1,5 +0,0 @@ -engine: - options: - privileged: true -resource_manager: - name: slurm \ No newline at end of file diff --git a/cli/test/fixtures/settings_2.yml b/cli/test/fixtures/settings_2.yml deleted file mode 100644 index af9c7d08c..000000000 --- a/cli/test/fixtures/settings_2.yml +++ /dev/null @@ -1,7 +0,0 @@ -engine: - name: docker - options: - privileged: true -resource_manager: - options: - foo: bar diff --git a/cli/test/fixtures/settings_3.yml b/cli/test/fixtures/settings_3.yml deleted file mode 100644 index c7f56f982..000000000 --- a/cli/test/fixtures/settings_3.yml +++ /dev/null @@ -1,10 +0,0 @@ -engine: - name: docker - options: - privileged: true -resource_manager: - name: slurm - options: - action_one: - nodes: 1 - cpus-per-task: 1 diff --git a/cli/test/test_config.py b/cli/test/test_config.py index dc50cb9a8..571ef0fce 100644 --- a/cli/test/test_config.py +++ b/cli/test/test_config.py @@ -1,8 +1,6 @@ import unittest import os -import utils as testutils - from popper.config import PopperConfig from popper.cli import log @@ -10,91 +8,84 @@ FIXDIR = f'{os.path.dirname(os.path.realpath(__file__))}/fixtures' -def _wfile(name, format): - return f'{FIXDIR}/{name}.{format}' - - class TestPopperConfig(unittest.TestCase): + default_args = { + 'skip_clone': False, + 'engine_name': 'docker', + 'engine_opts': {}, + 'resman_name': 'host', + 'resman_opts': {}, + 'skip_pull': False, + 'dry_run': False, + 'workspace_dir': os.getcwd(), + 'quiet': False, + 'reuse': False + } + def setUp(self): log.setLevel('CRITICAL') - self.test_dir = testutils.mk_repo().working_dir - common_kwargs = { - 'skip_clone': False, - 'skip_pull': False, - 'dry_run': False, - 'workspace_dir': self.test_dir, - 'quiet': False, - 'reuse': False, - 'engine_options': dict(), - 'resman_options': dict()} - - self.from_config_file = PopperConfig( - config_file=_wfile("settings_3", "yml"), - engine=None, - resource_manager=None, - **common_kwargs) - - self.from_cli = PopperConfig( - config_file=_wfile("settings_3", "yml"), - engine='foo', - resource_manager='bar', - **common_kwargs) - - self.from_defaults = PopperConfig( - config_file=None, - engine=None, - resource_manager=None, - **common_kwargs) - - self.invalid_popper_cfg_one = PopperConfig( - config_file=_wfile("settings_1", "yml"), - engine=None, - resource_manager=None, - **common_kwargs) - - self.invalid_popper_cfg_two = PopperConfig( - config_file=_wfile("settings_2", "yml"), - engine=None, - resource_manager=None, - **common_kwargs) + self.maxDiff = None def tearDown(self): log.setLevel('NOTSET') - def test_parse(self): - self.assertEqual( - self.from_config_file.config_from_file, { - 'engine': { - 'name': 'docker', 'options': { - 'privileged': True}}, 'resource_manager': { - 'name': 'slurm', 'options': { - 'action_one': { - 'cpus-per-task': 1, 'nodes': 1}}}}) - - def test_validate(self): - self.assertRaises(SystemExit, self.invalid_popper_cfg_one.validate) - self.assertRaises(SystemExit, self.invalid_popper_cfg_two.validate) - - def test_normalize(self): - # --engine and --resource manager not provided through cli - # so, test those values get read from the config file. - self.from_config_file.normalize() - self.assertEqual(self.from_config_file.engine_name, 'docker') - self.assertEqual(self.from_config_file.resman_name, 'slurm') - self.assertEqual( - self.from_config_file.engine_options, { - 'privileged': True}) - self.assertEqual( - self.from_config_file.resman_options, { - 'action_one': { - 'nodes': 1, 'cpus-per-task': 1}}) - - # --engine and --resource manager provided, config file is ignored. - self.from_cli.normalize() - self.assertEqual(self.from_cli.engine_name, 'foo') - self.assertEqual(self.from_cli.resman_name, 'bar') - - # neither flags nor config flag describes what runtime/resman to use. - self.from_defaults.normalize() - self.assertEqual(self.from_defaults.engine_name, 'docker') - self.assertEqual(self.from_defaults.resman_name, 'host') + def test_config_defaults(self): + conf = PopperConfig() + actual = conf.__dict__ + + expected = TestPopperConfig.default_args + + self.assertEqual(expected, + TestPopperConfig.extract_dict(expected, actual)) + + def test_config_non_defaults(self): + expected = { + 'skip_clone': True, + 'skip_pull': True, + 'dry_run': True, + 'workspace_dir': os.path.realpath('/tmp/foo'), + 'quiet': True, + 'reuse': True + } + conf = PopperConfig(**expected) + actual = conf.__dict__ + + self.assertEqual(expected, + TestPopperConfig.extract_dict(expected, actual)) + + def test_config_from_file(self): + config = { + 'engine': {'options': {'privileged': True}}, + 'resource_manager': {'options': {'foo': 'bar'}} + } + kwargs = {'config_file': config} + + # engine name missing + with self.assertLogs('popper', level='INFO') as cm: + self.assertRaises(SystemExit, PopperConfig, **kwargs) + self.assertEqual(len(cm.output), 1) + self.assertTrue('No engine name given' in cm.output[0]) + + # resman name missing + config.update({'engine': {'name': 'foo'}}) + with self.assertLogs('popper', level='INFO') as cm: + self.assertRaises(SystemExit, PopperConfig, **kwargs) + self.assertEqual(len(cm.output), 1) + self.assertTrue('No resource manager name given' in cm.output[0]) + + # now all OK + config.update({'resource_manager': {'name': 'bar'}}) + conf = PopperConfig(**kwargs) + self.assertEqual(conf.engine_name, 'foo') + self.assertEqual(conf.resman_name, 'bar') + self.assertEqual(conf.engine_opts, {}) + self.assertEqual(conf.resman_opts, {}) + + config.update({'engine': {'name': 'bar', 'options': {'foo': 'baz'}}}) + conf = PopperConfig(**kwargs) + self.assertEqual(conf.engine_opts, {'foo': 'baz'}) + + @staticmethod + def extract_dict(A, B): + # taken from https://stackoverflow.com/a/21213251 + return dict([(k, B[k]) for k in A.keys() if k in B.keys()]) diff --git a/cli/test/test_runner.py b/cli/test/test_runner.py index 21563773c..eed263f89 100644 --- a/cli/test/test_runner.py +++ b/cli/test/test_runner.py @@ -2,8 +2,8 @@ import unittest import shutil -from dotmap import DotMap from unittest.mock import patch +from popper.config import PopperConfig from popper.parser import YMLWorkflow from popper.runner import WorkflowRunner, StepRunner @@ -25,31 +25,27 @@ def test_check_secrets(self): wf.parse() # in dry-run, secrets are ignored - WorkflowRunner.process_secrets(wf, DotMap({'dry_run': True})) + runner = WorkflowRunner(PopperConfig(dry_run=True)) + runner._process_secrets(wf) + + # now go back to not dry-running + runner = WorkflowRunner(PopperConfig()) # when CI=true it should fail os.environ['CI'] = 'true' - self.assertRaises( - SystemExit, - WorkflowRunner.process_secrets, - wf, - DotMap({})) + self.assertRaises(SystemExit, runner._process_secrets, wf) # add one secret os.environ['SECRET_ONE'] = '1234' # it should fail again, as we're missing one - self.assertRaises( - SystemExit, - WorkflowRunner.process_secrets, - wf, - DotMap({})) + self.assertRaises(SystemExit, runner._process_secrets, wf) os.environ.pop('CI') # now is fine with patch('getpass.getpass', return_value='5678'): - WorkflowRunner.process_secrets(wf, DotMap({})) + runner._process_secrets(wf) # pop the other os.environ.pop('SECRET_ONE') @@ -62,36 +58,54 @@ def test_clone_repos(self): """) wf.parse() - wid = '12345' + conf = PopperConfig() cache_dir = os.path.join(os.environ['HOME'], '.cache/popper/') # clone repos in the default cache directory. - WorkflowRunner.clone_repos(wf, DotMap({'wid': wid})) - self.assertTrue( - os.path.exists( - os.path.join(cache_dir, wid, 'github.com/popperized/bin'))) + runner = WorkflowRunner(conf) + runner._clone_repos(wf) + step_dir = os.path.join(cache_dir, conf.wid, + 'github.com/popperized/bin') + self.assertTrue(os.path.exists(step_dir)) # clone repos in custom cache directory os.environ['POPPER_CACHE_DIR'] = '/tmp/smdir' - WorkflowRunner.clone_repos(wf, DotMap({'wid': wid})) - self.assertTrue( - os.path.exists( - os.path.join('/tmp/smdir', wid, 'github.com/popperized/bin'))) + runner._clone_repos(wf) + step_dir = os.path.join('/tmp/smdir', conf.wid, + 'github.com/popperized/bin') + self.assertTrue(os.path.exists(step_dir)) os.environ.pop('POPPER_CACHE_DIR') # check failure when container is not available and we skip cloning shutil.rmtree('/tmp/smdir') shutil.rmtree(cache_dir) - self.assertRaises(SystemExit, WorkflowRunner.clone_repos, wf, - DotMap({'wid': wid, 'skip_clone': True})) + conf = PopperConfig(skip_clone=True) + runner = WorkflowRunner(conf) + self.assertRaises(SystemExit, runner._clone_repos, wf) def test_steprunner_factory(self): - with WorkflowRunner('docker', 'host') as r: - self.assertEqual(r.step_runner('host', None).__class__.__name__, + with WorkflowRunner(PopperConfig()) as r: + self.assertEqual(r._step_runner('host', None).__class__.__name__, 'HostRunner') - self.assertEqual(r.step_runner('docker', None).__class__.__name__, + self.assertEqual(r._step_runner('docker', None).__class__.__name__, 'DockerRunner') + def test_setup_base_cache(self): + cache_dir = WorkflowRunner._setup_base_cache() + try: + self.assertEqual(cache_dir, os.environ['XDG_CACHE_HOME']) + except KeyError: + self.assertEqual( + cache_dir, + os.path.join( + os.environ['HOME'], + '.cache/popper')) + + os.environ['POPPER_CACHE_DIR'] = '/tmp/popper' + cache_dir = WorkflowRunner._setup_base_cache() + self.assertEqual(cache_dir, '/tmp/popper') + os.environ.pop('POPPER_CACHE_DIR') + class TestStepRunner(unittest.TestCase): def setUp(self): diff --git a/cli/test/test_runner_host.py b/cli/test/test_runner_host.py index e5f5f913b..112d5a570 100644 --- a/cli/test/test_runner_host.py +++ b/cli/test/test_runner_host.py @@ -1,11 +1,20 @@ import os +import time import unittest +from subprocess import Popen + +import docker + import utils as testutils +import popper.utils as pu +from testfixtures import LogCapture + +from popper.config import PopperConfig from popper.parser import YMLWorkflow from popper.runner import WorkflowRunner - +from popper.runner_host import HostRunner, DockerRunner from popper.cli import log as log @@ -15,9 +24,9 @@ def setUp(self): def test_run(self): repo = testutils.mk_repo() + conf = PopperConfig(workspace_dir=repo.working_dir) - with WorkflowRunner( - 'docker', 'host', workspace_dir=repo.working_dir) as r: + with WorkflowRunner(conf) as r: wf = YMLWorkflow(""" version: '1' steps: @@ -53,17 +62,160 @@ def test_run(self): repo.close() + def test_exec_cmd(self): + cmd = ["echo", "hello-world"] + pid, ecode, output = HostRunner._exec_cmd(cmd, logging=False) + self.assertGreater(pid, 0) + self.assertEqual(ecode, 0) + self.assertEqual(output, "hello-world\n") + + with LogCapture('popper') as log: + pid, ecode, output = HostRunner._exec_cmd(cmd) + self.assertGreater(pid, 0) + self.assertEqual(ecode, 0) + self.assertEqual(output, "") + log.check_present(('popper', 'STEP_INFO', 'hello-world\n')) + + cmd = ["env"] + pid, ecode, output = HostRunner._exec_cmd( + cmd, env={'TESTACION': 'test'}, cwd="/tmp", logging=False) + self.assertGreater(pid, 0) + self.assertEqual(ecode, 0) + self.assertTrue('TESTACION' in output) + + _pids = set() + _, _, _ = HostRunner._exec_cmd(["sleep", "2"], pids=_pids) + self.assertEqual(len(_pids), 1) + + def test_stop_running_tasks(self): + with HostRunner() as hr: + with Popen(["sleep", "2000"]) as p: + pid = p.pid + hr._spawned_pids.add(pid) + hr.stop_running_tasks() + time.sleep(2) + self.assertRaises(ProcessLookupError, os.kill, pid, 0) + class TestHostDockerRunner(unittest.TestCase): def setUp(self): log.setLevel('CRITICAL') + @unittest.skipIf(os.environ['ENGINE'] != 'docker', 'ENGINE != docker') + def test_create_container(self): + config = PopperConfig() + step = { + 'uses': 'docker://alpine:3.9', + 'runs': ['echo hello'], + 'name': 'kontainer_one' + } + cid = pu.sanitized_name(step['name'], config.wid) + with DockerRunner(init_docker_client=True, config=config) as dr: + c = dr._create_container(cid, step) + self.assertEqual(c.status, 'created') + c.remove() + + @unittest.skipIf(os.environ['ENGINE'] != 'docker', 'ENGINE != docker') + def test_stop_running_tasks(self): + with DockerRunner() as dr: + dclient = docker.from_env() + c1 = dclient.containers.run( + 'debian:buster-slim', 'sleep 20000', detach=True) + c2 = dclient.containers.run( + 'alpine:3.9', 'sleep 10000', detach=True) + dr._spawned_containers.add(c1) + dr._spawned_containers.add(c2) + dr.stop_running_tasks() + self.assertEqual(c1.status, 'created') + self.assertEqual(c2.status, 'created') + dclient.close() + + @unittest.skipIf(os.environ['ENGINE'] != 'docker', 'ENGINE != docker') + def test_get_container_kwargs(self): + step = { + 'uses': 'popperized/bin/sh@master', + 'args': ['ls'], + 'name': 'one', + 'repo_dir': '/path/to/repo/dir', + 'step_dir': 'sh'} + + config_dict = { + 'engine': { + 'name': 'docker', + 'options': { + 'privileged': True, + 'hostname': 'popper.local', + 'domainname': 'www.example.org', + 'volumes': ['/path/in/host:/path/in/container'], + 'environment': {'FOO': 'bar'} + } + }, + 'resource_manager': { + 'name': 'slurm' + } + } + + config = PopperConfig( + config_file=config_dict, + workspace_dir='/path/to/workdir') + + with DockerRunner(init_docker_client=False, config=config) as dr: + args = dr._get_container_kwargs( + step, 'alpine:3.9', 'container_a') + + self.assertEqual(args, { + 'image': 'alpine:3.9', + 'command': ['ls'], + 'name': 'container_a', + 'volumes': [ + '/path/to/workdir:/workspace', + '/var/run/docker.sock:/var/run/docker.sock', + '/path/in/host:/path/in/container'], + 'working_dir': '/workspace', + 'environment': {'FOO': 'bar'}, + 'entrypoint': None, + 'detach': True, + 'privileged': True, + 'hostname': 'popper.local', + 'domainname': 'www.example.org'}) + + @unittest.skipIf(os.environ['ENGINE'] != 'docker', 'ENGINE != docker') + def test_get_build_info(self): + step = { + 'uses': 'popperized/bin/sh@master', + 'args': ['ls'], + 'name': 'one', + 'repo_dir': '/path/to/repo/dir', + 'step_dir': 'sh'} + with DockerRunner(init_docker_client=False) as dr: + build, img, tag, build_sources = dr._get_build_info(step) + self.assertEqual(build, True) + self.assertEqual(img, 'popperized/bin') + self.assertEqual(tag, 'master') + self.assertEqual( + build_sources, + '/path/to/repo/dir/sh') + + step = { + 'uses': 'docker://alpine:3.9', + 'runs': ['sh', '-c', 'echo $FOO > hello.txt ; pwd'], + 'env': {'FOO': 'bar'}, + 'name': '1' + } + + with DockerRunner(init_docker_client=False) as dr: + build, img, tag, build_sources = dr._get_build_info(step) + self.assertEqual(build, False) + self.assertEqual(img, 'alpine') + self.assertEqual(tag, '3.9') + self.assertEqual(build_sources, None) + @unittest.skipIf(os.environ['ENGINE'] != 'docker', 'ENGINE != docker') def test_docker_basic_run(self): repo = testutils.mk_repo() + conf = PopperConfig(workspace_dir=repo.working_dir) - with WorkflowRunner( - 'docker', 'host', workspace_dir=repo.working_dir) as r: + with WorkflowRunner(conf) as r: wf = YMLWorkflow(""" version: '1' steps: diff --git a/cli/test/test_runner_slurm.py b/cli/test/test_runner_slurm.py index 59fdfafea..b5381ca50 100644 --- a/cli/test/test_runner_slurm.py +++ b/cli/test/test_runner_slurm.py @@ -1,23 +1,21 @@ import os import unittest - +import tempfile import utils as testutils from popper.config import PopperConfig +from popper.runner import WorkflowRunner +from popper.parser import YMLWorkflow from popper.runner_slurm import SlurmRunner, DockerRunner from popper.cli import log as log -from dotmap import DotMap - -from testfixtures import Replacer +from testfixtures import Replacer, replace, compare from testfixtures.popen import MockPopen +from testfixtures.mock import call -FIXDIR = f'{os.path.dirname(os.path.realpath(__file__))}/fixtures' - - -def _wfile(name, format): - return f'{FIXDIR}/{name}.{format}' +def mock_kill(pid, sig): + return 0 class TestSlurmSlurmRunner(unittest.TestCase): @@ -25,109 +23,263 @@ def setUp(self): log.setLevel('CRITICAL') self.Popen = MockPopen() replacer = Replacer() - replacer.replace('popper.utils.Popen', self.Popen) + replacer.replace('popper.runner_host.Popen', self.Popen) self.addCleanup(replacer.restore) - self.repo = testutils.mk_repo().working_dir - self.slurm_runner = SlurmRunner(DotMap({})) + self.repo = tempfile.mkdtemp() def tearDown(self): log.setLevel('NOTSET') - def test__stream_output(self): + def test_tail_output(self): self.Popen.set_command('tail -f slurm-x.out', returncode=0) - self.slurm_runner._stream_output('slurm-x.out') + with SlurmRunner(config=PopperConfig()) as sr: + self.assertEqual( + sr._tail_output('slurm-x.out'), 0) + self.assertEqual( + len(sr._out_stream_pid), 1) - def test__stream_error(self): - self.Popen.set_command('tail -f slurm-x.err', returncode=0) - self.slurm_runner._stream_error('slurm-x.err') + def test_stop_running_tasks(self): + self.Popen.set_command('scancel --name job_a', returncode=0) + with SlurmRunner(config=PopperConfig()) as sr: + sr._spawned_jobs.add('job_a') + sr.stop_running_tasks() + self.assertEqual( + call.Popen( + ['scancel', '--name', 'job_a'], + cwd=os.getcwd(), + env=None, preexec_fn=os.setsid, stderr=-2, stdout=-1, + universal_newlines=True) in self.Popen.all_calls, True) - def test_generate_script(self): - cmd = " ".join(["docker", "version"]) - job_script = os.path.join(self.repo, 'script.sh') - SlurmRunner.generate_script(cmd, 'sample_job', job_script) - with open(job_script, 'r') as f: - content = f.read() - self.assertEqual(content, "#!/bin/bash\ndocker version") + @replace('popper.runner_slurm.os.kill', mock_kill) + def test_submit_batch_job(self, mock_kill): + self.Popen.set_command( + 'sbatch --wait ' + '--job-name popper_sample_123abc ' + '--output /tmp/popper/slurm/popper_sample_123abc.out ' + '/tmp/popper/slurm/popper_sample_123abc.sh', + returncode=0) + self.Popen.set_command( + 'tail -f /tmp/popper/slurm/popper_sample_123abc.out', returncode=0) + config = PopperConfig(workspace_dir='/w') + config.wid = "123abc" + step = {"name": "sample"} + with SlurmRunner(config=config) as sr: + sr._submit_batch_job(["ls -la"], step) + with open("/tmp/popper/slurm/popper_sample_123abc.sh", 'r') as f: + content = f.read() - def test_touch_log_files(self): - out_file = os.path.join(self.repo, 'slurm-x.out') - err_file = os.path.join(self.repo, 'slurm-x.err') - self.slurm_runner.touch_log_files(out_file, err_file) - self.assertEqual(os.path.exists(out_file), True) - self.assertEqual(os.path.exists(err_file), True) + self.assertEqual(content, "#!/bin/bash\nls -la") + self.assertEqual(len(sr._spawned_jobs), 0) + self.assertEqual(sr._out_stream_thread.is_alive(), False) - def test_cancel_job(self): - self.Popen.set_command('scancel --name job_a', returncode=0) - SlurmRunner.spawned_jobs.add('job_a') - SlurmRunner.cancel_job() + call_tail = call.Popen( + ['tail', '-f', '/tmp/popper/slurm/popper_sample_123abc.out'], + cwd=os.getcwd(), + env=None, preexec_fn=os.setsid, + stderr=-2, stdout=-1, universal_newlines=True) + + call_sbatch = call.Popen( + ['sbatch', '--wait', '--job-name', + 'popper_sample_123abc', '--output', + '/tmp/popper/slurm/popper_sample_123abc.out', + '/tmp/popper/slurm/popper_sample_123abc.sh'], + cwd=os.getcwd(), env=None, + preexec_fn=os.setsid, + stderr=-2, stdout=-1, universal_newlines=True) + + self.assertEqual(call_tail in self.Popen.all_calls, True) + self.assertEqual(call_sbatch in self.Popen.all_calls, True) + + @replace('popper.runner_slurm.os.kill', mock_kill) + def test_submit_job_failure(self, mock_kill): + self.Popen.set_command( + 'sbatch --wait --job-name popper_1_123abc ' + '--output /tmp/popper/slurm/popper_1_123abc.out ' + '/tmp/popper/slurm/popper_1_123abc.sh', returncode=12) + + self.Popen.set_command( + 'tail -f /tmp/popper/slurm/popper_1_123abc.out', + returncode=0) + + config_dict = { + 'engine': { + 'name': 'docker', + 'options': {} + }, + 'resource_manager': { + 'name': 'slurm', + 'options': {} + } + } + + config = PopperConfig( + workspace_dir='/w', + config_file=config_dict) + config.wid = "123abc" + + with WorkflowRunner(config) as r: + wf = YMLWorkflow(""" + version: '1' + steps: + - uses: 'popperized/bin/sh@master' + runs: [cat] + args: README.md + """) + wf.parse() + self.assertRaises(SystemExit, r.run, wf) + + call_tail = call.Popen( + ['tail', '-f', '/tmp/popper/slurm/popper_1_123abc.out'], + cwd=os.getcwd(), + env=None, preexec_fn=os.setsid, + stderr=-2, stdout=-1, universal_newlines=True) + + call_sbatch = call.Popen(['sbatch', + '--wait', + '--job-name', + 'popper_1_123abc', + '--output', + '/tmp/popper/slurm/popper_1_123abc.out', + '/tmp/popper/slurm/popper_1_123abc.sh'], + cwd=os.getcwd(), + env=None, + preexec_fn=os.setsid, + stderr=-2, + stdout=-1, + universal_newlines=True) + + self.assertEqual(call_tail in self.Popen.all_calls, True) + self.assertEqual(call_sbatch in self.Popen.all_calls, True) + + def test_dry_run(self): + repo = testutils.mk_repo() + config = PopperConfig( + engine_name='docker', + resman_name='slurm', + dry_run=True, + workspace_dir=repo.working_dir) + + with WorkflowRunner(config) as r: + wf = YMLWorkflow(""" + version: '1' + steps: + - uses: 'popperized/bin/sh@master' + runs: [cat] + args: README.md + """) + wf.parse() + r.run(wf) + + self.assertEqual(self.Popen.all_calls, []) class TestSlurmDockerRunner(unittest.TestCase): def setUp(self): log.setLevel('CRITICAL') - self.test_dir = "/path/to/workspace" - common_kwargs = { - 'skip_clone': False, - 'skip_pull': False, - 'dry_run': False, - 'workspace_dir': self.test_dir, - 'quiet': False, - 'reuse': False, - 'engine_options': dict(), - 'resman_options': dict()} - - self.config = PopperConfig( - config_file=_wfile("settings_3", "yml"), - engine=None, - resource_manager=None, - **common_kwargs) - - self.docker_runner = DockerRunner(self.config) - self.cls = TestSlurmDockerRunner - - @classmethod - def setUpClass(cls): - cls.step = {'cmd_list': [], 'name': 'one'} + self.Popen = MockPopen() + replacer = Replacer() + replacer.replace('popper.runner_host.Popen', self.Popen) + self.addCleanup(replacer.restore) def tearDown(self): log.setLevel('NOTSET') - def test_docker_build(self): - DockerRunner.docker_build( - self.cls.step, - 'alpine', - '/path/to/build_dir', - self.config.dry_run) - self.assertEqual( - self.cls.step['cmd_list'], - ['docker build alpine /path/to/build_dir > /dev/null']) - - def test_docker_create(self): - DockerRunner.docker_create(self.cls.step, 'alpine', 'c1', self.config) - self.assertEqual(self.cls.step['cmd_list'], [ - 'docker build alpine /path/to/build_dir > /dev/null', - 'docker create --name c1 --workdir /workspace -v /path/to/workspace:/workspace -v /var/run/docker.sock:/var/run/docker.sock alpine > /dev/null']) - - def test_docker_pull(self): - DockerRunner.docker_pull(self.cls.step, 'alpine', self.config.dry_run) - self.assertEqual(self.cls.step['cmd_list'], [ - 'docker build alpine /path/to/build_dir > /dev/null', - 'docker create --name c1 --workdir /workspace -v /path/to/workspace:/workspace -v /var/run/docker.sock:/var/run/docker.sock alpine > /dev/null', - 'docker pull alpine > /dev/null']) - - def test_docker_rm(self): - DockerRunner.docker_rm(self.step, 'c1', self.config.dry_run) - self.assertEqual(self.cls.step['cmd_list'], [ - 'docker build alpine /path/to/build_dir > /dev/null', - 'docker create --name c1 --workdir /workspace -v /path/to/workspace:/workspace -v /var/run/docker.sock:/var/run/docker.sock alpine > /dev/null', - 'docker pull alpine > /dev/null', - 'docker rm -f c1 || true > /dev/null']) - - def test_docker_start(self): - DockerRunner.docker_start(self.step, 'c1', self.config.dry_run) - self.assertEqual(self.cls.step['cmd_list'], [ - 'docker build alpine /path/to/build_dir > /dev/null', - 'docker create --name c1 --workdir /workspace -v /path/to/workspace:/workspace -v /var/run/docker.sock:/var/run/docker.sock alpine > /dev/null', - 'docker pull alpine > /dev/null', - 'docker rm -f c1 || true > /dev/null', - 'docker start --attach c1']) + def test_create_cmd(self): + config = {'workspace_dir': '/w'} + with DockerRunner(config=PopperConfig(**config)) as drunner: + step = {'args': ['-two', '-flags']} + cmd = drunner._create_cmd(step, 'foo:1.9', 'container_name') + + expected = ( + 'docker create' + ' --name container_name' + ' --workdir /workspace' + ' -v /w:/workspace' + ' -v /var/run/docker.sock:/var/run/docker.sock' + ' foo:1.9 -two -flags') + + self.assertEqual(expected, cmd) + + config_dict = { + 'engine': { + 'name': 'docker', + 'options': { + 'privileged': True, + 'hostname': 'popper.local', + 'domainname': 'www.example.org', + 'volumes': ['/path/in/host:/path/in/container'], + 'environment': {'FOO': 'bar'} + } + }, + 'resource_manager': { + 'name': 'slurm' + } + } + + config = {'workspace_dir': '/w', 'config_file': config_dict} + with DockerRunner(config=PopperConfig(**config)) as drunner: + step = {'args': ['-two', '-flags']} + cmd = drunner._create_cmd(step, 'foo:1.9', 'container_name') + + expected = ('docker create --name container_name ' + '--workdir /workspace ' + '-v /w:/workspace ' + '-v /var/run/docker.sock:/var/run/docker.sock ' + '-v /path/in/host:/path/in/container ' + '-e FOO=bar --privileged --hostname popper.local ' + '--domainname www.example.org ' + 'foo:1.9 -two -flags') + + self.assertEqual(expected, cmd) + + @replace('popper.runner_slurm.os.kill', mock_kill) + def test_run(self, mock_kill): + self.Popen.set_command( + 'sbatch --wait --job-name popper_1_123abc ' + '--output /tmp/popper/slurm/popper_1_123abc.out ' + '/tmp/popper/slurm/popper_1_123abc.sh', returncode=0) + + self.Popen.set_command( + 'tail -f /tmp/popper/slurm/popper_1_123abc.out', + returncode=0) + + config_dict = { + 'engine': { + 'name': 'docker', + 'options': { + 'privileged': True, + 'hostname': 'popper.local', + 'domainname': 'www.example.org', + 'volumes': ['/path/in/host:/path/in/container'], + 'environment': {'FOO': 'bar'} + } + }, + 'resource_manager': { + 'name': 'slurm' + } + } + + config = PopperConfig( + workspace_dir='/w', + config_file=config_dict) + config.wid = "123abc" + + with WorkflowRunner(config) as r: + wf = YMLWorkflow(""" + version: '1' + steps: + - uses: 'popperized/bin/sh@master' + runs: [cat] + args: README.md + """) + wf.parse() + r.run(wf) + + with open('/tmp/popper/slurm/popper_1_123abc.sh', 'r') as f: + content = f.read() + self.assertEqual(content, + f"""#!/bin/bash +docker rm -f popper_1_123abc || true +docker build -t popperized/bin:master {os.environ['HOME']}/.cache/popper/123abc/github.com/popperized/bin/sh +docker create --name popper_1_123abc --workdir /workspace --entrypoint cat -v /w:/workspace -v /var/run/docker.sock:/var/run/docker.sock -v /path/in/host:/path/in/container -e FOO=bar --privileged --hostname popper.local --domainname www.example.org popperized/bin:master README.md +docker start --attach popper_1_123abc""") diff --git a/cli/test/test_utils.py b/cli/test/test_utils.py index d14b81f63..a4d9fbd30 100644 --- a/cli/test/test_utils.py +++ b/cli/test/test_utils.py @@ -1,7 +1,4 @@ import unittest -import os - -from dotmap import DotMap from popper import utils as pu from popper.cli import log @@ -15,15 +12,6 @@ def setUp(self): def tearDown(self): log.setLevel('NOTSET') - def test_decode(self): - string = b'Hello from popper' - result = pu.decode(string) - self.assertIsInstance(result, str) - - string = 'Hello from popper' - result = pu.decode(string) - self.assertIsInstance(result, str) - def test_sanitized_name(self): name = "test action" santizied_name = pu.sanitized_name(name, '1234') @@ -37,24 +25,9 @@ def test_sanitized_name(self): santizied_name = pu.sanitized_name(name, '1234') self.assertEqual(santizied_name, "popper_test_action__1234") - def touch_file(self, path): - open(path, 'w').close() - - def test_setup_base_cache(self): - cache_dir = pu.setup_base_cache() - try: - self.assertEqual(cache_dir, os.environ['XDG_CACHE_HOME']) - except KeyError: - self.assertEqual( - cache_dir, - os.path.join( - os.environ['HOME'], - '.cache/popper')) - - os.environ['POPPER_CACHE_DIR'] = '/tmp/popper' - cache_dir = pu.setup_base_cache() - self.assertEqual(cache_dir, '/tmp/popper') - os.environ.pop('POPPER_CACHE_DIR') + def test_assert_executable_exists(self): + pu.assert_executable_exists('ls') + self.assertRaises(SystemExit, pu.assert_executable_exists, 'abcd') def test_of_type(self): param = [u"hello", u"world"] @@ -69,55 +42,15 @@ def test_of_type(self): } self.assertEqual(pu.of_type(param, ['str', 'dict']), True) - def test_write_file(self): - content = "Hello world" - pu.write_file('testfile1.txt', content) - pu.write_file('testfile2.txt') - with open('testfile1.txt', 'r') as f: - self.assertEqual(f.read(), "Hello world") - with open('testfile2.txt', 'r') as f: - self.assertEqual(f.read(), '') - os.remove('testfile1.txt') - os.remove('testfile2.txt') - - def test_load_config_file(self): - conf_content = """ -engine: - name: docker - options: - runtime: nvidia - """ - pu.write_file('settings.yml', conf_content) - config = pu.load_config_file('settings.yml') - self.assertTrue(config.get('engine'), True) - self.assertDictEqual( - config['engine']['options'], { - 'runtime': 'nvidia'}) - os.remove('settings.yml') - - def test_assert_executable_exists(self): - self.assertRaises(SystemExit, pu.assert_executable_exists, 'abcd') - - def test_select_not_none(self): - a = ["Hello", {}, None] - self.assertEqual(pu.select_not_none(a), "Hello") - - b = [DotMap(), "Hello", []] - self.assertEqual(pu.select_not_none(b), "Hello") - - def test_exec_cmd(self): - cmd = ["echo", "command_1"] - ecode, output = pu.exec_cmd(cmd, logging=False) - self.assertEqual(ecode, 0) - self.assertEqual(output, "command_1\n") - - ecode, output = pu.exec_cmd(cmd, logging=True) - self.assertEqual(ecode, 0) - self.assertEqual(output, "") - - pu.write_file("/tmp/test.py", "import os\nprint(os.environ['TEST'])") - cmd = ["python", "test.py"] - ecode, output = pu.exec_cmd( - cmd, env={'TEST': 'test'}, cwd="/tmp", logging=False) - self.assertEqual(ecode, 0) - self.assertEqual(output, "test\n") + def test_kv_to_flag(self): + self.assertEqual(pu.key_value_to_flag('x', 'a'), '-x a') + self.assertEqual(pu.key_value_to_flag('y', True), '-y') + self.assertEqual(pu.key_value_to_flag('y', False), '') + self.assertEqual(pu.key_value_to_flag('yy', True), '--yy') + self.assertEqual(pu.key_value_to_flag('zz', 'c'), '--zz c') + eq = True + self.assertEqual(pu.key_value_to_flag('x', 'a', eq), '-x=a') + self.assertEqual(pu.key_value_to_flag('y', True, eq), '-y=true') + self.assertEqual(pu.key_value_to_flag('y', False, eq), '-y=false') + self.assertEqual(pu.key_value_to_flag('yy', True, eq), '--yy=true') + self.assertEqual(pu.key_value_to_flag('zz', 'c', eq), '--zz=c') diff --git a/docs/sections/cn_workflows.md b/docs/sections/cn_workflows.md index a23aca513..9c3bde9c6 100644 --- a/docs/sections/cn_workflows.md +++ b/docs/sections/cn_workflows.md @@ -199,8 +199,9 @@ the exit code to set the workflow execution status, which can be ## Container Engines -By default, steps in Popper workflows run in Docker. In addition, -Popper can execute workflows in other runtimes by interacting with +By default, steps in Popper workflows run in Docker using the host machine +as the resource manager (see [next section](#resource-managers) on running on other resource managers). +In addition, Popper can execute workflows in other runtimes by interacting with their corresponding container engines. An `--engine ` flag for the `popper run` is used to invoke alternative engines, where `` is one of the supported engines. When no value for this @@ -290,3 +291,76 @@ command to specify custom options for the underlying engine in question (see [here][engconf] for more). [engconf]: /cli_features#customizing-container-engine-behavior + +## Resource Managers + +Popper can execute steps in a workflow through other resource managers +like `slurm` besides the host machine. The resource manager can be specified +either through the `--resource-manager/-r` option or through the config file. +If neither of them are provided, the steps are run in the host machine +by default. + +### Supported resource managers + +#### Slurm + +Popper workflows can be run on [HPC](https://en.wikipedia.org/wiki/HPC) (Multi-Node environments) +using [Slurm](https://slurm.schedmd.com/overview.html) as the underlying resource manager to distribute the execution of a step to +several nodes. You can get started with running Popper workflows through Slurm by following the example below. + +Let's consider a workflow `sample.yml` like the one shown below. +```yaml +version: '1' +steps: +- id: one + uses: docker://alpine:3.9 + args: echo hello-world + +- id: two + uses: popperized/bin/sh@master + args: ls -l +``` + +To run all the steps of the workflow through slurm resource manager, +use the `--resource-manager` or `-r` option of the `popper run` subcommand to specify the resource manager. + +```bash +popper run -f sample.yml -r slurm +``` + +To have more finer control on which steps to run through slurm resource manager, +the specifications can be provided through the config file as shown below. + +We create a config file called `config.yml` with the following contents. + +```yaml +engine: + name: docker + options: + privileged: True + hostname: example.local + +resource_manager: + name: slurm + options: + two: + nodes: 2 +``` + +Now, we execute `popper run` with this config file as follows: +```bash +popper run -f sample.yml -c config.yml +``` + +This runs the step `one` locally in the host and step `two` through slurm on 2 nodes. + +#### Host + +Popper executes the workflows by default using the `host` machine as the resource manager. So, when no resource manager is provided like the example below, the workflow runs on the local machine. + +```bash +popper run -f sample.yml +``` + +The above assumes `docker` as the container engine and `host` as the resource manager to be +used. diff --git a/docs/sections/contributing.md b/docs/sections/contributing.md index f6e9feda5..98b0dc2fa 100644 --- a/docs/sections/contributing.md +++ b/docs/sections/contributing.md @@ -33,7 +33,7 @@ git clone git@github.com:systemslab/popper cd popper # install popper from source -pip install -e cli +pip install -e cli/[dev] ``` The `-e` flag passed to `pip` tells it to install the package from the @@ -47,6 +47,21 @@ results of such modifications. > every time you open a new terminal window (`source` commmand), > otherwise the `popper` command will not be found by your shell. +## Running unittests locally + +To run the unittests on your local machine, we suggest the following +approach: + +```bash +cd popper/ + +# activate the virtualenv +source $HOME/venvs/popper/bin/activate + +# run the tests +ENGINE=docker python -X tracemalloc -m unittest discover -f cli/test/ +``` + ## Contributing CLI features To contribute new CLI features: