From 95cf942ed5544f6f7c798605520e51906069203d Mon Sep 17 00:00:00 2001 From: savin Date: Mon, 24 Jan 2022 16:19:17 -0800 Subject: [PATCH 1/2] Fix issue in Step Functions integration with CLI defined decorators --- .../aws/step_functions/step_functions.py | 49 +++++++++---------- metaflow/util.py | 3 +- 2 files changed, 23 insertions(+), 29 deletions(-) diff --git a/metaflow/plugins/aws/step_functions/step_functions.py b/metaflow/plugins/aws/step_functions/step_functions.py index 89f5dff05a5..e4c13f54a42 100644 --- a/metaflow/plugins/aws/step_functions/step_functions.py +++ b/metaflow/plugins/aws/step_functions/step_functions.py @@ -12,7 +12,7 @@ from metaflow.plugins import ResourcesDecorator, BatchDecorator, RetryDecorator from metaflow.parameters import deploy_time_eval from metaflow.decorators import flow_decorators -from metaflow.util import compress_list, dict_to_cli_options, to_pascalcase +from metaflow.util import compress_list, dict_to_cli_options, to_pascalcase, to_unicode from metaflow.metaflow_config import ( SFN_IAM_ROLE, EVENTS_SFN_ACCESS_IAM_ROLE, @@ -704,15 +704,33 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries): # Use AWS Batch job identifier as the globally unique task identifier. task_id = "${AWS_BATCH_JOB_ID}" - + top_opts_dict = { + "with": [ + decorator.make_decorator_spec() + for decorator in node.decorators + if not decorator.statically_defined + ] + } # FlowDecorators can define their own top-level options. They are # responsible for adding their own top-level options and values through # the get_top_level_options() hook. See similar logic in runtime.py. - top_opts_dict = {} for deco in flow_decorators(): top_opts_dict.update(deco.get_top_level_options()) + top_opts = list(dict_to_cli_options(top_opts_dict)) + top_level = top_opts + [ + "--quiet", + "--metadata=%s" % self.metadata.TYPE, + "--environment=%s" % self.environment.TYPE, + "--datastore=%s" % self.flow_datastore.TYPE, + "--datastore-root=%s" % self.flow_datastore.datastore_root, + "--event-logger=%s" % self.event_logger.logger_type, + "--monitor=%s" % self.monitor.monitor_type, + "--no-pylint", + "--with=step_functions_internal", + ] + if node.name == "start": # We need a separate unique ID for the special _parameters task task_id_params = "%s-params" % task_id @@ -729,18 +747,10 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries): ". `pwd`/%s" % param_file, ] ) - params = ( entrypoint - + top_opts + + top_level + [ - "--quiet", - "--metadata=%s" % self.metadata.TYPE, - "--environment=%s" % self.environment.TYPE, - "--datastore=s3", - "--event-logger=%s" % self.event_logger.logger_type, - "--monitor=%s" % self.monitor.monitor_type, - "--no-pylint", "init", "--run-id sfn-$METAFLOW_RUN_ID", "--task-id %s" % task_id_params, @@ -777,18 +787,6 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries): ) cmds.append(export_parent_tasks) - top_level = top_opts + [ - "--quiet", - "--metadata=%s" % self.metadata.TYPE, - "--environment=%s" % self.environment.TYPE, - "--datastore=%s" % self.flow_datastore.TYPE, - "--datastore-root=%s" % self.flow_datastore.datastore_root, - "--event-logger=%s" % self.event_logger.logger_type, - "--monitor=%s" % self.monitor.monitor_type, - "--no-pylint", - "--with=step_functions_internal", - ] - step = [ "step", node.name, @@ -799,9 +797,6 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries): "--retry-count $((AWS_BATCH_JOB_ATTEMPT-1))", "--max-user-code-retries %d" % user_code_retries, "--input-paths %s" % paths, - # Set decorator to batch to execute `task_*` hooks for batch - # decorator. - "--with=batch", ] if any(self.graph[n].type == "foreach" for n in node.in_funcs): # We set the `METAFLOW_SPLIT_INDEX` through JSONPath-foo diff --git a/metaflow/util.py b/metaflow/util.py index 14483206bd9..5dee0d1d533 100644 --- a/metaflow/util.py +++ b/metaflow/util.py @@ -314,8 +314,7 @@ def dict_to_cli_options(params): if k == "decospecs": k = "with" k = k.replace("_", "-") - if not isinstance(v, tuple): - v = [v] + v = v if isinstance(v, (list, tuple, set)) else [v] for value in v: yield "--%s" % k if not isinstance(value, bool): From 69985068b2557619031c8c61b42288d63f0cec75 Mon Sep 17 00:00:00 2001 From: savin Date: Mon, 24 Jan 2022 16:50:22 -0800 Subject: [PATCH 2/2] remove stray import --- metaflow/plugins/aws/step_functions/step_functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/plugins/aws/step_functions/step_functions.py b/metaflow/plugins/aws/step_functions/step_functions.py index e4c13f54a42..d1f7108c2da 100644 --- a/metaflow/plugins/aws/step_functions/step_functions.py +++ b/metaflow/plugins/aws/step_functions/step_functions.py @@ -12,7 +12,7 @@ from metaflow.plugins import ResourcesDecorator, BatchDecorator, RetryDecorator from metaflow.parameters import deploy_time_eval from metaflow.decorators import flow_decorators -from metaflow.util import compress_list, dict_to_cli_options, to_pascalcase, to_unicode +from metaflow.util import compress_list, dict_to_cli_options, to_pascalcase from metaflow.metaflow_config import ( SFN_IAM_ROLE, EVENTS_SFN_ACCESS_IAM_ROLE,