Skip to content

Commit

Permalink
Disable configs for resume and load old configs
Browse files Browse the repository at this point in the history
  • Loading branch information
romain-intel committed Dec 18, 2024
1 parent 79a5653 commit 69fa5e9
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 32 deletions.
120 changes: 94 additions & 26 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from . import decorators, lint, metaflow_version, parameters, plugins
from .cli_args import cli_args
from .cli_components.utils import LazyGroup, LazyPluginCommandCollection
from .datastore import FlowDataStore
from .datastore import FlowDataStore, TaskDataStoreSet
from .debug import debug
from .exception import CommandException, MetaflowException
from .flowspec import _FlowState
from .graph import FlowGraph
Expand All @@ -35,7 +36,7 @@
)
from .pylint_wrapper import PyLint
from .R import metaflow_r_version, use_r
from .util import resolve_identity
from .util import get_latest_run_id, resolve_identity
from .user_configs.config_options import LocalFileInput, config_options
from .user_configs.config_parameters import ConfigValue

Expand Down Expand Up @@ -346,6 +347,33 @@ def start(
echo(" executing *%s*" % ctx.obj.flow.name, fg="magenta", nl=False)
echo(" for *%s*" % resolve_identity(), fg="magenta")

# Setup the context
cli_args._set_top_kwargs(ctx.params)
ctx.obj.echo = echo
ctx.obj.echo_always = echo_always
ctx.obj.is_quiet = quiet
ctx.obj.logger = logger
ctx.obj.pylint = pylint
ctx.obj.check = functools.partial(_check, echo)
ctx.obj.top_cli = cli
ctx.obj.package_suffixes = package_suffixes.split(",")

ctx.obj.datastore_impl = [d for d in DATASTORES if d.TYPE == datastore][0]

if datastore_root is None:
datastore_root = ctx.obj.datastore_impl.get_datastore_root_from_config(
ctx.obj.echo
)
if datastore_root is None:
raise CommandException(
"Could not find the location of the datastore -- did you correctly set the "
"METAFLOW_DATASTORE_SYSROOT_%s environment variable?" % datastore.upper()
)

ctx.obj.datastore_impl.datastore_root = datastore_root

FlowDataStore.default_storage_impl = ctx.obj.datastore_impl

# At this point, we are able to resolve the user-configuration options so we can
# process all those decorators that the user added that will modify the flow based
# on those configurations. It is important to do this as early as possible since it
Expand All @@ -355,20 +383,75 @@ def start(
# second one processed will return the actual options. The order of processing
# depends on what (and in what order) the user specifies on the command line.
config_options = config_file or config_value

if (
hasattr(ctx, "saved_args")
and ctx.saved_args
and ctx.saved_args[0] == "resume"
and getattr(ctx.obj, "has_config_options", False)
):
# In the case of resume, we actually need to load the configurations
# from the resumed run to process them. This can be slightly onerous so check
# if we need to in the first place
if getattr(ctx.obj, "has_cl_config_options", False):
raise click.UsageError(
"Cannot specify --config-file or --config-value with 'resume'"
)
# We now load the config artifacts from the original run id
run_id = None
try:
idx = ctx.saved_args.index("--origin-run-id")
except ValueError:
idx = -1
if idx >= 0:
run_id = ctx.saved_args[idx + 1]
else:
run_id = get_latest_run_id(ctx.obj.echo, ctx.obj.flow.name)
if run_id is None:
raise CommandException(
"A previous run id was not found. Specify --origin-run-id."
)
# We get the name of the parameters we need to load from the datastore -- these
# are accessed using the *variable* name and not necessarily the *parameter* name
config_var_names = []
config_param_names = []
for name, param in ctx.obj.flow._get_parameters():
if not param.IS_CONFIG_PARAMETER:
continue
config_var_names.append(name)
config_param_names.append(param.name)

# We just need a task datastore that will be thrown away -- we do this so
# we don't have to create the logger, monitor, etc.
debug.userconf_exec("Loading config parameters from run %s" % run_id)
for d in TaskDataStoreSet(
FlowDataStore(ctx.obj.flow.name),
run_id,
steps=["_parameters"],
prefetch_data_artifacts=config_var_names,
):
param_ds = d

# We can now set the CONFIGS value in the flow properly. This will overwrite
# anything that may have been passed in by default and we will use exactly what
# the original flow had. Note that these are accessed through the parameter name
ctx.obj.flow._flow_state[_FlowState.CONFIGS].clear()
d = ctx.obj.flow._flow_state[_FlowState.CONFIGS]
for param_name, var_name in zip(config_param_names, config_var_names):
val = param_ds[var_name]
debug.userconf_exec("Loaded config %s as: %s" % (param_name, val))
d[param_name] = val

elif getattr(ctx.obj, "delayed_config_exception", None):
# If we are not doing a resume, any exception we had parsing configs needs to
# be raised. For resume, since we ignore those options, we ignore the error.
raise ctx.obj.delayed_config_exception

new_cls = ctx.obj.flow._process_config_decorators(config_options)
if new_cls:
ctx.obj.flow = new_cls(use_cli=False)

cli_args._set_top_kwargs(ctx.params)
ctx.obj.echo = echo
ctx.obj.echo_always = echo_always
ctx.obj.is_quiet = quiet
ctx.obj.graph = ctx.obj.flow._graph
ctx.obj.logger = logger
ctx.obj.pylint = pylint
ctx.obj.check = functools.partial(_check, echo)
ctx.obj.top_cli = cli
ctx.obj.package_suffixes = package_suffixes.split(",")

ctx.obj.environment = [
e for e in ENVIRONMENTS + [MetaflowEnvironment] if e.TYPE == environment
Expand All @@ -391,21 +474,6 @@ def start(
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
)

ctx.obj.datastore_impl = [d for d in DATASTORES if d.TYPE == datastore][0]

if datastore_root is None:
datastore_root = ctx.obj.datastore_impl.get_datastore_root_from_config(
ctx.obj.echo
)
if datastore_root is None:
raise CommandException(
"Could not find the location of the datastore -- did you correctly set the "
"METAFLOW_DATASTORE_SYSROOT_%s environment variable?" % datastore.upper()
)

ctx.obj.datastore_impl.datastore_root = datastore_root

FlowDataStore.default_storage_impl = ctx.obj.datastore_impl
ctx.obj.flow_datastore = FlowDataStore(
ctx.obj.flow.name,
ctx.obj.environment,
Expand Down
4 changes: 2 additions & 2 deletions metaflow/datastore/flow_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class FlowDataStore(object):
def __init__(
self,
flow_name,
environment,
environment=None,
metadata=None,
event_logger=None,
monitor=None,
Expand All @@ -31,7 +31,7 @@ def __init__(
----------
flow_name : str
The name of the flow
environment : MetaflowEnvironment
environment : MetaflowEnvironment, optional
Environment this datastore is operating in
metadata : MetadataProvider, optional
The metadata provider to use and update if needed, by default None
Expand Down
26 changes: 22 additions & 4 deletions metaflow/user_configs/config_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def process_configs(
param_value: Dict[str, Optional[str]],
quiet: bool,
datastore: str,
click_obj: Optional[Any] = None,
):
from ..cli import echo_always, echo_dev_null # Prevent circular import
from ..flowspec import _FlowState # Prevent circular import
Expand Down Expand Up @@ -242,6 +243,9 @@ def process_configs(
# would cause an issue -- we can ignore those as the kv. values should trump
# everything else.
all_keys = set(self._value_values).union(self._path_values)

if all_keys and click_obj:
click_obj.has_cl_config_options = True
# Make sure we have at least some keys (ie: some non default values)
has_all_kv = all_keys and all(
self._value_values.get(k, "").startswith(_CONVERT_PREFIX + "kv.")
Expand All @@ -257,10 +261,14 @@ def process_configs(
[k for k, v in self._path_values.items()] or []
)
if common_keys:
raise click.UsageError(
exc = click.UsageError(
"Cannot provide both a value and a file for the same configuration. "
"Found such values for '%s'" % "', '".join(common_keys)
)
if click_obj:
click_obj.delayed_config_exception = exc
return None
raise exc

all_values = dict(self._path_values)
all_values.update(self._value_values)
Expand Down Expand Up @@ -298,6 +306,7 @@ def process_configs(
else:
debug.userconf_exec("Fast path due to pre-processed values")
merged_configs = self._value_values
click_obj.has_config_options = True
debug.userconf_exec("Configs merged with defaults: %s" % str(merged_configs))

missing_configs = set()
Expand All @@ -319,9 +328,13 @@ def process_configs(
# This means to load it from a file
read_value = self.get_config(val[3:])
if read_value is None:
raise click.UsageError(
exc = click.UsageError(
"Could not find configuration '%s' in INFO file" % val
)
if click_obj:
click_obj.delayed_config_exception = exc
return None
raise exc
flow_cls._flow_state[_FlowState.CONFIGS][name] = read_value
to_return[name] = ConfigValue(read_value)
else:
Expand Down Expand Up @@ -354,9 +367,13 @@ def process_configs(
% (merged_configs[missing][len(_CONVERTED_DEFAULT_NO_FILE) :], missing)
)
if msgs:
raise click.UsageError(
exc = click.UsageError(
"Bad values passed for configuration options: %s" % ", ".join(msgs)
)
if click_obj:
click_obj.delayed_config_exception = exc
return None
raise exc

debug.userconf_exec("Finalized configs: %s" % str(to_return))
return to_return
Expand All @@ -368,6 +385,7 @@ def process_configs_click(self, ctx, param, value):
value,
ctx.params["quiet"],
ctx.params["datastore"],
click_obj=ctx.obj,
)

def __str__(self):
Expand Down Expand Up @@ -453,7 +471,7 @@ def config_options_with_config_input(cmd):

help_str = (
"Configuration options for the flow. "
"Multiple configurations can be specified."
"Multiple configurations can be specified. Cannot be used with resume."
)
help_str = "\n\n".join([help_str] + help_strs)
config_input = ConfigInput(required_names, defaults, parsers)
Expand Down

0 comments on commit 69fa5e9

Please sign in to comment.