nit fixes:
- fixing comments.
- refactor some variable/function names.
valayDave committed Jul 28, 2022
1 parent 2079293 commit 2719f5d
Showing 3 changed files with 21 additions and 26 deletions.
3 changes: 1 addition & 2 deletions metaflow/decorators.py
@@ -473,8 +473,7 @@ def _attach_decorators_to_step(step, decospecs):
def _init_flow_decorators(
flow, graph, environment, flow_datastore, metadata, logger, echo, deco_options
):
# Since _flow_decorators is a dictonary and some flow-decorators can have many decos of same type allowed.
# Hence we set certain keys as lists to accomodate multiple decos of same type and therefore validate if the value is a list
# Certain decorators can be specified multiple times and exist as lists in the _flow_decorators dictionary
for deco in flow._flow_decorators.values():
if type(deco) == list:
for rd in deco:
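Aside (not part of the commit): a minimal, self-contained sketch of the pattern the new comment describes, where some entries in the decorator mapping hold a single decorator while others hold a list of decorators of the same type. The dictionary values below are placeholder strings, not Metaflow's actual decorator objects.

# Hypothetical illustration: a value may be a single decorator or a list of
# decorators of the same type, so both shapes are handled while iterating.
flow_decorators = {
    "project": "project-deco",                        # only one instance allowed
    "trigger": ["trigger-deco-1", "trigger-deco-2"],  # several instances allowed
}
for deco in flow_decorators.values():
    decos = deco if isinstance(deco, list) else [deco]
    for d in decos:
        print("initializing", d)  # stands in for the per-decorator initialization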
42 changes: 19 additions & 23 deletions metaflow/plugins/airflow/airflow.py
@@ -199,22 +199,20 @@ def _process_parameters(self):
raise MetaflowException(
"Parameter *%s* does not have a "
"default value. "
"A default value is required for parameters when deploying on Airflow."
"A default value is required for parameters when deploying flows on Airflow."
)
value = deploy_time_eval(param.kwargs.get("default"))
# Setting airflow related param args.
param_type = param.kwargs.get("type")
airflow_param = dict(
name=param.name,
)
param_help = param.kwargs.get("help")
if value is not None:
airflow_param["default"] = value
if param_help:
airflow_param["description"] = param_help
if param.kwargs.get("help"):
airflow_param["description"] = param.kwargs.get("help")
if param_type is not None:
# Todo (fast-follow): Check if we can support more `click.Param` types

param_type_name = getattr(param_type, "__name__", None)
if not param_type_name and isinstance(param_type, JSONTypeClass):
# `JSONTypeClass` has no __name__ attribute so we need to explicitly check if
@@ -230,19 +228,19 @@ def _process_parameters(self):

return airflow_params
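Aside (not part of the commit): a hypothetical example of the shape of one entry in `airflow_params` built above. The values are invented, and the optional type field is assigned in the collapsed portion of this hunk.

# Hypothetical airflow_param entry produced by _process_parameters.
airflow_param = dict(name="alpha")              # always set from param.name
airflow_param["default"] = 0.5                  # only when a default value resolves
airflow_param["description"] = "Learning rate"  # only when `help` is provided
print(airflow_param)                            # a type field may also be added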

def _make_input_path_compressed(
def _compress_input_path(
self,
step_names,
steps,
):
"""
This function compresses the input paths, and it deliberately doesn't use
`metaflow.util.compress_list` under the hood. The reason is that `AIRFLOW_MACROS.RUN_ID` is a complicated macro string
that doesn't behave nicely with `metaflow.util.decompress_list`, since `decompress_list`
expects a string that doesn't contain any delimiter characters and the run-id string does.
Hence we have a custom compression string created via `_make_input_path_compressed` function instead of `compress_list`.
Hence we have a custom compression string created via `_compress_input_path` function instead of `compress_list`.
"""
return "%s:" % (AIRFLOW_MACROS.RUN_ID) + ",".join(
self._make_input_path(s, only_task_id=True) for s in step_names
self._make_input_path(step, only_task_id=True) for step in steps
)
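Aside (not part of the commit): a rough sketch of the string `_compress_input_path` returns for a step with two parents. The run-id prefix, step names, and xcom key below are placeholders, not the exact values Metaflow uses.

# Hypothetical compressed input-path string: the run-id macro appears once up
# front, followed by comma-separated "/step_name/<task-id template>" fragments.
run_id_macro = "arf-{{ [run_id, dag_run.dag_id] | run_id_creator }}"
fragments = [
    "/fit_a/{{ task_instance.xcom_pull(task_ids='fit_a', key='metaflow_task_id') }}",
    "/fit_b/{{ task_instance.xcom_pull(task_ids='fit_b', key='metaflow_task_id') }}",
]
print("%s:" % run_id_macro + ",".join(fragments))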

def _make_foreach_input_path(self, step_name):
@@ -257,10 +255,10 @@ def _make_foreach_input_path(self, step_name):
)

def _make_input_path(self, step_name, only_task_id=False):
# This is set using the `airflow_internal` decorator.
# This will pull the `return_value` xcom which holds a dictionary.
# This helps pass state.
# The `TASK_ID_XCOM_KEY` is set via the `MetaflowKubernetesOperator`
"""
This is set using the `airflow_internal` decorator to help pass state.
This will pull the `TASK_ID_XCOM_KEY` xcom which holds task-ids. The key is set via the `MetaflowKubernetesOperator`.
"""
task_id_string = "/%s/{{ task_instance.xcom_pull(task_ids='%s',key='%s') }}" % (
step_name,
step_name,
@@ -275,22 +273,17 @@ def _to_job(self, node):
def _to_job(self, node):
"""
This function will transform the node's specification into Airflow compatible operator arguments.
Since this function is long. It performs two major duties:
Since this function is long, below is the summary of the two major duties it performs:
1. Based on the type of the graph node (start/linear/foreach/join etc.) it will decide how to set the input paths
2. Based on the node's decorator specification, it will convert the information into a job spec for the KubernetesPodOperator.
"""
# supported compute : k8s (v1), local(v2), batch(v3)
# Add env vars from the optional @environment decorator.
env_deco = [deco for deco in node.decorators if deco.name == "environment"]
env = {}
if env_deco:
env = env_deco[0].attributes["vars"]

is_foreach_join = (
node.type == "join" and self.graph[node.split_parents[-1]].type == "foreach"
)

# The Below If/Else Block handle "Input Paths".
# The below if/else block handles "input paths".
# Input Paths help manage dataflow across the graph.
if node.name == "start":
# POSSIBLE_FUTURE_IMPROVEMENT:
@@ -302,7 +295,7 @@ def _to_job(self, node):
# parameters.

if len(self.parameters):
env["METAFLOW_PARAMETERS"] = AIRFLOW_MACROS.PARAMETER
env["METAFLOW_PARAMETERS"] = AIRFLOW_MACROS.PARAMETERS
input_paths = None
else:
# If it is not the start node then we check if there are many paths
Expand All @@ -311,7 +304,10 @@ def _to_job(self, node):
raise AirflowException(
"Parallel steps are not supported yet with Airflow."
)

is_foreach_join = (
node.type == "join"
and self.graph[node.split_parents[-1]].type == "foreach"
)
if is_foreach_join:
input_paths = self._make_foreach_input_path(node.in_funcs[0])

@@ -325,7 +321,7 @@ def _to_job(self, node):
input_paths = self._make_input_path(node.in_funcs[0])
else:
# this is a split scenario where there can be more than one input path.
input_paths = self._make_input_path_compressed(node.in_funcs)
input_paths = self._compress_input_path(node.in_funcs)

# env["METAFLOW_INPUT_PATHS"] = input_paths

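Aside (not part of the commit): a condensed, hypothetical restatement of how `_to_job` picks input paths per node type. Helper names mirror the methods above, but this is only a sketch of the branching, not the actual implementation.

# Hypothetical condensation of the input-path selection in _to_job.
def choose_input_paths(node, graph, make_input, make_foreach_input, compress):
    if node.name == "start":
        return None  # the start step reads parameters from the environment instead
    if node.type == "join" and graph[node.split_parents[-1]].type == "foreach":
        return make_foreach_input(node.in_funcs[0])  # fan-in after a foreach
    if len(node.in_funcs) == 1:
        return make_input(node.in_funcs[0])          # single parent (linear step)
    return compress(node.in_funcs)                   # several parents (e.g. a join after a split)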
2 changes: 1 addition & 1 deletion metaflow/plugins/airflow/airflow_utils.py
@@ -118,7 +118,7 @@ def _check_foreach_compatible_kubernetes_provider():
class AIRFLOW_MACROS:
# run_id_creator is added via the `user_defined_filters`
RUN_ID = "%s-{{ [run_id, dag_run.dag_id] | run_id_creator }}" % RUN_ID_PREFIX
PARAMETER = "{{ params | json_dump }}"
PARAMETERS = "{{ params | json_dump }}"

# AIRFLOW_MACROS.TASK_ID will work for linear/branched workflows.
# ti.task_id is the stepname in metaflow code.
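Aside (not part of the commit): `user_defined_filters` is a standard DAG argument in Airflow for registering custom Jinja filters, and the macros above rely on such filters. Below is a small, standalone Jinja2 illustration (not Airflow itself) of rendering the PARAMETERS macro; the `json_dump` lambda is a stand-in for the filter Metaflow registers, and the params dict is invented.

# Plain-Jinja2 sketch of rendering the PARAMETERS macro with a custom filter.
import json
from jinja2 import Environment

env = Environment()
env.filters["json_dump"] = lambda value: json.dumps(value)  # stand-in filter
template = env.from_string("{{ params | json_dump }}")
print(template.render(params={"alpha": 0.5, "dataset": "s3://bucket/key"}))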
