diff --git a/metaflow/cli.py b/metaflow/cli.py index 00b0f95f08e..4059df21ae1 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -10,7 +10,8 @@ from . import decorators, lint, metaflow_version, parameters, plugins from .cli_args import cli_args from .cli_components.utils import LazyGroup, LazyPluginCommandCollection -from .datastore import FlowDataStore +from .datastore import FlowDataStore, TaskDataStoreSet +from .debug import debug from .exception import CommandException, MetaflowException from .flowspec import _FlowState from .graph import FlowGraph @@ -35,7 +36,7 @@ ) from .pylint_wrapper import PyLint from .R import metaflow_r_version, use_r -from .util import resolve_identity +from .util import get_latest_run_id, resolve_identity from .user_configs.config_options import LocalFileInput, config_options from .user_configs.config_parameters import ConfigValue @@ -346,6 +347,33 @@ def start( echo(" executing *%s*" % ctx.obj.flow.name, fg="magenta", nl=False) echo(" for *%s*" % resolve_identity(), fg="magenta") + # Setup the context + cli_args._set_top_kwargs(ctx.params) + ctx.obj.echo = echo + ctx.obj.echo_always = echo_always + ctx.obj.is_quiet = quiet + ctx.obj.logger = logger + ctx.obj.pylint = pylint + ctx.obj.check = functools.partial(_check, echo) + ctx.obj.top_cli = cli + ctx.obj.package_suffixes = package_suffixes.split(",") + + ctx.obj.datastore_impl = [d for d in DATASTORES if d.TYPE == datastore][0] + + if datastore_root is None: + datastore_root = ctx.obj.datastore_impl.get_datastore_root_from_config( + ctx.obj.echo + ) + if datastore_root is None: + raise CommandException( + "Could not find the location of the datastore -- did you correctly set the " + "METAFLOW_DATASTORE_SYSROOT_%s environment variable?" % datastore.upper() + ) + + ctx.obj.datastore_impl.datastore_root = datastore_root + + FlowDataStore.default_storage_impl = ctx.obj.datastore_impl + # At this point, we are able to resolve the user-configuration options so we can # process all those decorators that the user added that will modify the flow based # on those configurations. It is important to do this as early as possible since it @@ -355,20 +383,75 @@ def start( # second one processed will return the actual options. The order of processing # depends on what (and in what order) the user specifies on the command line. config_options = config_file or config_value + + if ( + hasattr(ctx, "saved_args") + and ctx.saved_args + and ctx.saved_args[0] == "resume" + and getattr(ctx.obj, "has_config_options", False) + ): + # In the case of resume, we actually need to load the configurations + # from the resumed run to process them. This can be slightly onerous so check + # if we need to in the first place + if getattr(ctx.obj, "has_cl_config_options", False): + raise click.UsageError( + "Cannot specify --config-file or --config-value with 'resume'" + ) + # We now load the config artifacts from the original run id + run_id = None + try: + idx = ctx.saved_args.index("--origin-run-id") + except ValueError: + idx = -1 + if idx >= 0: + run_id = ctx.saved_args[idx + 1] + else: + run_id = get_latest_run_id(ctx.obj.echo, ctx.obj.flow.name) + if run_id is None: + raise CommandException( + "A previous run id was not found. Specify --origin-run-id." + ) + # We get the name of the parameters we need to load from the datastore -- these + # are accessed using the *variable* name and not necessarily the *parameter* name + config_var_names = [] + config_param_names = [] + for name, param in ctx.obj.flow._get_parameters(): + if not param.IS_CONFIG_PARAMETER: + continue + config_var_names.append(name) + config_param_names.append(param.name) + + # We just need a task datastore that will be thrown away -- we do this so + # we don't have to create the logger, monitor, etc. + debug.userconf_exec("Loading config parameters from run %s" % run_id) + for d in TaskDataStoreSet( + FlowDataStore(ctx.obj.flow.name), + run_id, + steps=["_parameters"], + prefetch_data_artifacts=config_var_names, + ): + param_ds = d + + # We can now set the the CONFIGS value in the flow properly. This will overwrite + # anything that may have been passed in by default and we will use exactly what + # the original flow had. Note that these are accessed through the parameter name + ctx.obj.flow._flow_state[_FlowState.CONFIGS].clear() + d = ctx.obj.flow._flow_state[_FlowState.CONFIGS] + for param_name, var_name in zip(config_param_names, config_var_names): + val = param_ds[var_name] + debug.userconf_exec("Loaded config %s as: %s" % (param_name, val)) + d[param_name] = val + + elif getattr(ctx.obj, "delayed_config_exception", None): + # If we are not doing a resume, any exception we had parsing configs needs to + # be raised. For resume, since we ignore those options, we ignore the error. + raise ctx.obj.delayed_config_exception + new_cls = ctx.obj.flow._process_config_decorators(config_options) if new_cls: ctx.obj.flow = new_cls(use_cli=False) - cli_args._set_top_kwargs(ctx.params) - ctx.obj.echo = echo - ctx.obj.echo_always = echo_always - ctx.obj.is_quiet = quiet ctx.obj.graph = ctx.obj.flow._graph - ctx.obj.logger = logger - ctx.obj.pylint = pylint - ctx.obj.check = functools.partial(_check, echo) - ctx.obj.top_cli = cli - ctx.obj.package_suffixes = package_suffixes.split(",") ctx.obj.environment = [ e for e in ENVIRONMENTS + [MetaflowEnvironment] if e.TYPE == environment @@ -391,21 +474,6 @@ def start( ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor ) - ctx.obj.datastore_impl = [d for d in DATASTORES if d.TYPE == datastore][0] - - if datastore_root is None: - datastore_root = ctx.obj.datastore_impl.get_datastore_root_from_config( - ctx.obj.echo - ) - if datastore_root is None: - raise CommandException( - "Could not find the location of the datastore -- did you correctly set the " - "METAFLOW_DATASTORE_SYSROOT_%s environment variable?" % datastore.upper() - ) - - ctx.obj.datastore_impl.datastore_root = datastore_root - - FlowDataStore.default_storage_impl = ctx.obj.datastore_impl ctx.obj.flow_datastore = FlowDataStore( ctx.obj.flow.name, ctx.obj.environment, diff --git a/metaflow/datastore/flow_datastore.py b/metaflow/datastore/flow_datastore.py index d6e7ba6b748..16318ed7693 100644 --- a/metaflow/datastore/flow_datastore.py +++ b/metaflow/datastore/flow_datastore.py @@ -13,7 +13,7 @@ class FlowDataStore(object): def __init__( self, flow_name, - environment, + environment=None, metadata=None, event_logger=None, monitor=None, @@ -31,7 +31,7 @@ def __init__( ---------- flow_name : str The name of the flow - environment : MetaflowEnvironment + environment : MetaflowEnvironment, optional Environment this datastore is operating in metadata : MetadataProvider, optional The metadata provider to use and update if needed, by default None diff --git a/metaflow/user_configs/config_options.py b/metaflow/user_configs/config_options.py index db961c24046..e5b5e794dbe 100644 --- a/metaflow/user_configs/config_options.py +++ b/metaflow/user_configs/config_options.py @@ -177,6 +177,7 @@ def process_configs( param_value: Dict[str, Optional[str]], quiet: bool, datastore: str, + click_obj: Optional[Any] = None, ): from ..cli import echo_always, echo_dev_null # Prevent circular import from ..flowspec import _FlowState # Prevent circular import @@ -242,6 +243,9 @@ def process_configs( # would cause an issue -- we can ignore those as the kv. values should trump # everything else. all_keys = set(self._value_values).union(self._path_values) + + if all_keys and click_obj: + click_obj.has_cl_config_options = True # Make sure we have at least some keys (ie: some non default values) has_all_kv = all_keys and all( self._value_values.get(k, "").startswith(_CONVERT_PREFIX + "kv.") @@ -257,10 +261,14 @@ def process_configs( [k for k, v in self._path_values.items()] or [] ) if common_keys: - raise click.UsageError( + exc = click.UsageError( "Cannot provide both a value and a file for the same configuration. " "Found such values for '%s'" % "', '".join(common_keys) ) + if click_obj: + click_obj.delayed_config_exception = exc + return None + raise exc all_values = dict(self._path_values) all_values.update(self._value_values) @@ -298,6 +306,7 @@ def process_configs( else: debug.userconf_exec("Fast path due to pre-processed values") merged_configs = self._value_values + click_obj.has_config_options = True debug.userconf_exec("Configs merged with defaults: %s" % str(merged_configs)) missing_configs = set() @@ -319,9 +328,13 @@ def process_configs( # This means to load it from a file read_value = self.get_config(val[3:]) if read_value is None: - raise click.UsageError( + exc = click.UsageError( "Could not find configuration '%s' in INFO file" % val ) + if click_obj: + click_obj.delayed_config_exception = exc + return None + raise exc flow_cls._flow_state[_FlowState.CONFIGS][name] = read_value to_return[name] = ConfigValue(read_value) else: @@ -354,9 +367,13 @@ def process_configs( % (merged_configs[missing][len(_CONVERTED_DEFAULT_NO_FILE) :], missing) ) if msgs: - raise click.UsageError( + exc = click.UsageError( "Bad values passed for configuration options: %s" % ", ".join(msgs) ) + if click_obj: + click_obj.delayed_config_exception = exc + return None + raise exc debug.userconf_exec("Finalized configs: %s" % str(to_return)) return to_return @@ -368,6 +385,7 @@ def process_configs_click(self, ctx, param, value): value, ctx.params["quiet"], ctx.params["datastore"], + click_obj=ctx.obj, ) def __str__(self): @@ -453,7 +471,7 @@ def config_options_with_config_input(cmd): help_str = ( "Configuration options for the flow. " - "Multiple configurations can be specified." + "Multiple configurations can be specified. Cannot be used with resume." ) help_str = "\n\n".join([help_str] + help_strs) config_input = ConfigInput(required_names, defaults, parsers)