Skip to content

Commit

Permalink
Fix issue in Step Functions integration with CLI defined decorators (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
savingoyal authored Jan 25, 2022
1 parent 44f1ae7 commit 885810a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 28 deletions.
47 changes: 21 additions & 26 deletions metaflow/plugins/aws/step_functions/step_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -704,15 +704,33 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):

# Use AWS Batch job identifier as the globally unique task identifier.
task_id = "${AWS_BATCH_JOB_ID}"

top_opts_dict = {
"with": [
decorator.make_decorator_spec()
for decorator in node.decorators
if not decorator.statically_defined
]
}
# FlowDecorators can define their own top-level options. They are
# responsible for adding their own top-level options and values through
# the get_top_level_options() hook. See similar logic in runtime.py.
top_opts_dict = {}
for deco in flow_decorators():
top_opts_dict.update(deco.get_top_level_options())

top_opts = list(dict_to_cli_options(top_opts_dict))

top_level = top_opts + [
"--quiet",
"--metadata=%s" % self.metadata.TYPE,
"--environment=%s" % self.environment.TYPE,
"--datastore=%s" % self.flow_datastore.TYPE,
"--datastore-root=%s" % self.flow_datastore.datastore_root,
"--event-logger=%s" % self.event_logger.logger_type,
"--monitor=%s" % self.monitor.monitor_type,
"--no-pylint",
"--with=step_functions_internal",
]

if node.name == "start":
# We need a separate unique ID for the special _parameters task
task_id_params = "%s-params" % task_id
Expand All @@ -729,18 +747,10 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
". `pwd`/%s" % param_file,
]
)

params = (
entrypoint
+ top_opts
+ top_level
+ [
"--quiet",
"--metadata=%s" % self.metadata.TYPE,
"--environment=%s" % self.environment.TYPE,
"--datastore=s3",
"--event-logger=%s" % self.event_logger.logger_type,
"--monitor=%s" % self.monitor.monitor_type,
"--no-pylint",
"init",
"--run-id sfn-$METAFLOW_RUN_ID",
"--task-id %s" % task_id_params,
Expand Down Expand Up @@ -777,18 +787,6 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
)
cmds.append(export_parent_tasks)

top_level = top_opts + [
"--quiet",
"--metadata=%s" % self.metadata.TYPE,
"--environment=%s" % self.environment.TYPE,
"--datastore=%s" % self.flow_datastore.TYPE,
"--datastore-root=%s" % self.flow_datastore.datastore_root,
"--event-logger=%s" % self.event_logger.logger_type,
"--monitor=%s" % self.monitor.monitor_type,
"--no-pylint",
"--with=step_functions_internal",
]

step = [
"step",
node.name,
Expand All @@ -799,9 +797,6 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
"--retry-count $((AWS_BATCH_JOB_ATTEMPT-1))",
"--max-user-code-retries %d" % user_code_retries,
"--input-paths %s" % paths,
# Set decorator to batch to execute `task_*` hooks for batch
# decorator.
"--with=batch",
]
if any(self.graph[n].type == "foreach" for n in node.in_funcs):
# We set the `METAFLOW_SPLIT_INDEX` through JSONPath-foo
Expand Down
3 changes: 1 addition & 2 deletions metaflow/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,7 @@ def dict_to_cli_options(params):
if k == "decospecs":
k = "with"
k = k.replace("_", "-")
if not isinstance(v, tuple):
v = [v]
v = v if isinstance(v, (list, tuple, set)) else [v]
for value in v:
yield "--%s" % k
if not isinstance(value, bool):
Expand Down

0 comments on commit 885810a

Please sign in to comment.