Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for wrong task CLI arguments. #201

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions mlcube/mlcube/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ def parser_process(value: t.Text, state: click.parser.ParsingState):


def _parse_cli_args(ctx: t.Optional[click.core.Context], mlcube: t.Text, platform: t.Optional[t.Text],
workspace: t.Optional[t.Text],
workspace: t.Optional[t.Text], tasks: t.Optional[t.List[t.Text]],
resolve: bool) -> t.Tuple[t.Optional[t.Type[Runner]], DictConfig]:
"""
Args:
ctx: Click context. We need this to get access to extra CLI arguments.
mlcube: Path to MLCube root directory or mlcube.yaml file.
platform: Platform to use to run this MLCube (docker, singularity, gcp, k8s etc).
workspace: Workspace path to use. If not specified, default workspace inside MLCube directory is used.
tasks: List of tasks to be executed.
resolve: if True, compute values in MLCube configuration.
"""
mlcube_inst: MLCubeDirectory = CliParser.parse_mlcube_arg(mlcube)
Expand All @@ -75,7 +76,7 @@ def _parse_cli_args(ctx: t.Optional[click.core.Context], mlcube: t.Text, platfor
runner_cls, runner_config = None, None
mlcube_config = MLCubeConfig.create_mlcube_config(
os.path.join(mlcube_inst.path, mlcube_inst.file), mlcube_cli_args, task_cli_args, runner_config, workspace,
resolve=resolve, runner_cls=runner_cls
tasks=tasks, resolve=resolve, runner_cls=runner_cls
)
return runner_cls, mlcube_config

Expand Down Expand Up @@ -132,7 +133,7 @@ def show_config(ctx: click.core.Context, mlcube: t.Text, platform: t.Text, works
workspace: Workspace path to use. If not specified, default workspace inside MLCube directory is used.
resolve: if True, compute values in MLCube configuration.
"""
_, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace, resolve)
_, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace, tasks=None, resolve=resolve)
print(OmegaConf.to_yaml(mlcube_config))


Expand All @@ -152,8 +153,8 @@ def run(ctx: click.core.Context, mlcube: t.Text, platform: t.Text, task: t.Text,
task: Comma separated list of tasks to run.
workspace: Workspace path to use. If not specified, default workspace inside MLCube directory is used.
"""
runner_cls, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace, resolve=True)
tasks: t.List[str] = CliParser.parse_list_arg(task, default='main')
tasks: t.List[t.Text] = CliParser.parse_list_arg(task, default='main')
runner_cls, mlcube_config = _parse_cli_args(ctx, mlcube, platform, workspace, tasks, resolve=True)
for task in tasks:
docker_runner = runner_cls(mlcube_config, task=task)
docker_runner.run()
Expand All @@ -162,7 +163,7 @@ def run(ctx: click.core.Context, mlcube: t.Text, platform: t.Text, task: t.Text,
@cli.command(name='describe', help='Describe MLCube.')
@mlcube_option
def describe(mlcube: t.Text) -> None:
_, mlcube_config = _parse_cli_args(None, mlcube, None, None, resolve=True)
_, mlcube_config = _parse_cli_args(ctx=None, mlcube=mlcube, platform=None, workspace=None, tasks=None, resolve=True)
print(f"MLCube")
print(f" path = {mlcube_config.runtime.root}")
print(f" name = {mlcube_config.name}:{mlcube_config.get('version', 'latest')}")
Expand Down
66 changes: 57 additions & 9 deletions mlcube/mlcube/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import typing as t
from omegaconf import (OmegaConf, DictConfig)
from mlcube.errors import ConfigurationError
from mlcube.runner import Runner

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -48,30 +49,30 @@ def get_uri(value: t.Text) -> t.Text:
@staticmethod
def create_mlcube_config(mlcube_config_file: t.Text, mlcube_cli_args: t.Optional[DictConfig] = None,
task_cli_args: t.Optional[t.Dict] = None, runner_config: t.Optional[DictConfig] = None,
workspace: t.Optional[t.Text] = None, resolve: bool = True,
runner_cls: t.Optional[t.Type[Runner]] = None) -> DictConfig:
workspace: t.Optional[t.Text] = None, tasks: t.Optional[t.List[t.Text]] = None,
resolve: bool = True, runner_cls: t.Optional[t.Type[Runner]] = None) -> DictConfig:
""" Create MLCube mlcube merging different configs - base, global, local and cli.
Args:
mlcube_config_file: Path to mlcube.yaml file.
mlcube_cli_args: MLCube mlcube from command line.
task_cli_args: Task parameters from command line.
runner_config: MLCube runner configuration, usually comes from system settings file.
workspace: Workspace path to use in this MLCube run.
tasks: List of tasks to be executed. If empty or None, consider all tasks in MLCube config.
resolve: If true, compute all values (some of them may reference other parameters or environmental
variables).
runner_cls: A python class for the runner type specified in `runner_config`.
"""
if mlcube_cli_args is None:
mlcube_cli_args = OmegaConf.create({})
if task_cli_args is None:
task_cli_args = {}
if runner_config is None:
runner_config = OmegaConf.create({})
logger.debug("mlcube_config_file = %s", mlcube_config_file)
logger.debug("mlcube_cli_args = %s", mlcube_cli_args)
logger.debug("task_cli_args = %s", task_cli_args)
logger.debug("runner_config = %s", str(runner_config))
logger.debug("workspace = %s", workspace)
logger.debug("tasks = %s", tasks)

# Load MLCube configuration and maybe override parameters from command line (like -Pdocker.build_strategy=...).
actual_workspace = '${runtime.root}/workspace' if workspace is None else MLCubeConfig.get_uri(workspace)
Expand Down Expand Up @@ -99,26 +100,49 @@ def create_mlcube_config(mlcube_config_file: t.Text, mlcube_cli_args: t.Optional
if runner_cls:
runner_cls.CONFIG.validate(mlcube_config)

# TODO: This needs some discussion. Originally, one task was supposed to run at a time. Now, we seem to converge
# to support lists of tasks. Current implementation continues to use old format of task parameters, i.e.
# `param_name=param_value`. This may result in an unexpected behavior when many tasks run, so we should think
# about a different rule: `task_name.param_name=param_value`. This is similar to parameter specification in
# DVC.

# We will iterate over all tasks and make them representation canonical, but will use only tasks to run to
# check if users provided unrecognized parameters.
task_cli_args = task_cli_args or dict() # Dictionary of task arguments from a CL.
tasks_to_run = tasks or list(mlcube_config.tasks.keys()) # These tasks need to run later.
overridden_parameters = set() # Set of parameters from task_cli_args that were used.
for task_name in mlcube_config.tasks.keys():
[task] = MLCubeConfig.ensure_values_exist(mlcube_config.tasks, task_name, dict)
[parameters] = MLCubeConfig.ensure_values_exist(task, 'parameters', dict)
[inputs, outputs] = MLCubeConfig.ensure_values_exist(parameters, ['inputs', 'outputs'], dict)

MLCubeConfig.check_parameters(inputs, task_cli_args)
MLCubeConfig.check_parameters(outputs, task_cli_args)
overridden_inputs = MLCubeConfig.check_parameters(inputs, task_cli_args)
overridden_outputs = MLCubeConfig.check_parameters(outputs, task_cli_args)

if task_name in tasks_to_run:
overridden_parameters.update(overridden_inputs)
overridden_parameters.update(overridden_outputs)

unknown_task_cli_args = set([name for name in task_cli_args if name not in overridden_parameters])
if unknown_task_cli_args:
MLCubeConfig.report_unknown_task_cli_args(task_cli_args, unknown_task_cli_args)
raise ConfigurationError(f"Unknown task CLI arguments: {unknown_task_cli_args}")

if resolve:
OmegaConf.resolve(mlcube_config)
return mlcube_config

@staticmethod
def check_parameters(parameters: DictConfig, task_cli_args: t.Dict) -> None:
def check_parameters(parameters: DictConfig, task_cli_args: t.Dict) -> t.Set:
""" Check that task parameters are defined according to MLCube schema.
Args:
parameters: Task parameters (`inputs` or `outputs`).
task_cli_args: Task parameters from command line.
task_cli_args: Task parameters that a user provided on a command line.
Returns:
Set of parameters that were overridden using task parameters on a command line.
This function does not set `type` of parameters (if not present) in all cases.
"""
overridden_parameters = set()
for name in parameters.keys():
# The `_param_name` is anyway there, so check it's not None.
[param_def] = MLCubeConfig.ensure_values_exist(parameters, name, dict)
Expand All @@ -137,7 +161,9 @@ def check_parameters(parameters: DictConfig, task_cli_args: t.Dict) -> None:
if param_def.type == ParameterType.UNKNOWN and param_def.default.endswith(os.sep):
param_def.type = ParameterType.DIRECTORY
# See if there is value on a command line
param_def.default = task_cli_args.get(name, param_def.default)
if name in task_cli_args:
param_def.default = task_cli_args.get(name)
overridden_parameters.add(name)
# Check again parameter type. Users in certain number of cases will not be providing final slash on a
# command line for directories, so we tried to infer types above using default values. Just in case, see
# if we can do the same with user-provided values.
Expand All @@ -150,3 +176,25 @@ def check_parameters(parameters: DictConfig, task_cli_args: t.Dict) -> None:

# It probably does not make too much sense to see, let's say, if an input parameter exists and set its
# type at this moment, because MLCube can run on remote hosts.
return overridden_parameters

@staticmethod
def check_task_cli_args(tasks: DictConfig, task_cli_args: t.Dict) -> None:
"""Find any unknown task CLI arguments and report error.
Args:
tasks: Dictionary of task definitions.
task_cli_args: Dictionary of task parameters that a user provided on a command line.
Raises:
ConfigurationError if at least one task CLI argument is not recognized.
"""

@staticmethod
def report_unknown_task_cli_args(task_cli_args: t.Dict, unknown_task_cli_args: t.Set) -> None:
"""Task CLI argument (s) has not been recognized, report this error.
Args:
task_cli_args: Dictionary of all task CLI arguments.
unknown_task_cli_args: Arguments that have not been used (recognized).
"""
print("The following task CLI arguments have not been used:")
for arg in unknown_task_cli_args:
print(f"\t{arg} = {task_cli_args[arg]}")