Skip to content

Commit

Permalink
Removing unused code paths + code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
valayDave committed Apr 11, 2022
1 parent 7425f62 commit 9e622ba
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 66 deletions.
110 changes: 44 additions & 66 deletions metaflow/plugins/airflow/airflow_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,20 +273,22 @@ def _to_job(self, node: DAGNode):
],
"step_name": node.name,
}
# currently this code is commented because these conditions are not required
# since foreach's are not supported ATM
# Making the key conditions to check into human readable variables so
# if statements make sense when reading logic
# successors_are_foreach_joins = any(
# self.graph[n].type == "join"
# and self.graph[self.graph[n].split_parents[-1]].type == "foreach"
# for n in node.out_funcs
# )
# any_incoming_node_is_foreach = any(
# self.graph[n].type == "foreach" for n in node.in_funcs
# )
# join_in_foreach = (
# node.type == "join" and self.graph[node.split_parents[-1]].type == "foreach"
# )
is_a_foreach = node.type == "foreach"
successors_are_foreach_joins = any(
self.graph[n].type == "join"
and self.graph[self.graph[n].split_parents[-1]].type == "foreach"
for n in node.out_funcs
)
join_in_foreach = (
node.type == "join" and self.graph[node.split_parents[-1]].type == "foreach"
)
any_incoming_node_is_foreach = any(
self.graph[n].type == "foreach" for n in node.in_funcs
)

# Add env vars from the optional @environment decorator.
env_deco = [deco for deco in node.decorators if deco.name == "environment"]
Expand All @@ -313,38 +315,35 @@ def _to_job(self, node: DAGNode):
env["METAFLOW_DEFAULT_PARAMETERS"] = json.dumps(default_parameters)
input_paths = None
else:
# If it is not the start node then we check if there are many paths
# converging into it or a single path. Based on that we set the INPUT_PATHS
if node.parallel_foreach:
raise AirflowException(
"Parallel steps are not supported yet with Airflow."
)

# Handle foreach join.
if join_in_foreach:
# todo : Handle split values + input_paths
pass
if len(node.in_funcs) == 1:
# set input paths where this is only one parent node
# The parent-task-id is passed via the xcom; There is no other way to get that.
# One key thing about xcoms is that they are immutable and only accepted if the task
# doesn't fail.
# From airflow docs :
# "Note: If the first task run is not succeeded then on every retry task XComs will be cleared to make the task run idempotent."
input_paths = self._make_parent_input_path(node.in_funcs[0])
else:
if len(node.in_funcs) == 1:
# set input paths where this is only one parent node
# The parent-task-id is passed via the xcom; There is no other way to get that.
# One key thing about xcoms is that they are immutable and only accepted if the task
# doesn't fail.
# From airflow docs :
# "Note: If the first task run is not succeeded then on every retry task XComs will be cleared to make the task run idempotent."
input_paths = self._make_parent_input_path(node.in_funcs[0])
else:
# this is a split scenario where there can be more than one input paths.
input_paths = self._make_parent_input_path_compressed(node.in_funcs)
# this is a split scenario where there can be more than one input paths.
input_paths = self._make_parent_input_path_compressed(node.in_funcs)

env["METAFLOW_INPUT_PATHS"] = input_paths

if node.is_inside_foreach:
# Todo : Handle This case
pass
# if node.is_inside_foreach:
# # Todo : Handle This case
# pass

if any_incoming_node_is_foreach:
# todo : Handle split index for for-each.
# step.append("--split-index $METAFLOW_SPLIT_INDEX")
pass
# if any_incoming_node_is_foreach:
# # todo : Handle split index for for-each.
# # step.append("--split-index $METAFLOW_SPLIT_INDEX")
# pass

env["METAFLOW_CODE_URL"] = self.code_package_url
env["METAFLOW_FLOW_NAME"] = attrs["metaflow.flow_name"]
Expand All @@ -358,21 +357,17 @@ def _to_job(self, node: DAGNode):
metaflow_version["flow_name"] = self.graph.name
env["METAFLOW_VERSION"] = json.dumps(metaflow_version)

if (
is_a_foreach
or (node.is_inside_foreach and successors_are_foreach_joins)
or join_in_foreach
):

# Todo : Find ways to pass state using for the below usecases:
# 1. To set the cardinality of foreaches (which are subsequently)
# read prior to the instantiation of the Map state by AWS Step
# Functions.
# 2. To set the input paths from the parent steps of a foreach join.
# 3. To read the input paths in a foreach join.
pass

# Todo : Find and set resource requirements for the decorator.
# if (
# is_a_foreach
# or (node.is_inside_foreach and successors_are_foreach_joins)
# or join_in_foreach
# ):

# # Todo : Find ways to pass state using for the below usecases:
# # 1. To set the cardinality of foreaches
# # 2. To set the input paths from the parent steps of a foreach join.
# # 3. To read the input paths in a foreach join.

compute_type = "k8s" # todo : This will become more dynamic in the future.
if compute_type == "k8s":
return self._k8s_job(node, input_paths, env)
Expand Down Expand Up @@ -403,12 +398,6 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
top_opts_dict.update(deco.get_top_level_options())

top_opts = list(dict_to_cli_options(top_opts_dict))
join_in_foreach = (
node.type == "join" and self.graph[node.split_parents[-1]].type == "foreach"
)
any_previous_node_is_foreach = any(
self.graph[n].type == "foreach" for n in node.in_funcs
)

top_level = top_opts + [
"--quiet",
Expand Down Expand Up @@ -472,10 +461,6 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
# set input paths for parameters
paths = "%s/_parameters/%s" % (self.run_id, task_id_params)

if join_in_foreach:
# todo : handle join case
pass

step = [
"step",
node.name,
Expand All @@ -485,9 +470,6 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
"--max-user-code-retries %d" % user_code_retries,
"--input-paths %s" % paths,
]
if any_previous_node_is_foreach:
# todo step.append("--split-index $METAFLOW_SPLIT_INDEX")
pass
if self.tags:
step.extend("--tag %s" % tag for tag in self.tags)
if self.namespace is not None:
Expand Down Expand Up @@ -538,11 +520,7 @@ def _visit(node: DAGNode, workflow: Workflow, exit_node=None):
workflow,
)

elif node.type == "foreach":
# Todo : handle foreach cardinality in some way
# Continue the traversal from the matching_join.
_visit(self.graph[node.matching_join], workflow)
# We shouldn't ideally ever get here.
# We should only get here for foreach branches.
else:
raise AirflowException(
"Node type *%s* for step *%s* "
Expand Down
3 changes: 3 additions & 0 deletions metaflow/plugins/airflow/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,9 @@ def compile(self):
from airflow import DAG

params_dict = self._construct_params()
# DAG Params can be seen here :
# https://airflow.apache.org/docs/apache-airflow/2.0.0/_api/airflow/models/dag/index.html#airflow.models.dag.DAG
# Airflow 2.0.0 Allows setting Params.
dag = DAG(params=params_dict, **self._dag_instantiation_params.arguements)
dag.fileloc = self._file_path if self._file_path is not None else dag.fileloc

Expand Down

0 comments on commit 9e622ba

Please sign in to comment.