diff --git a/metaflow/decorators.py b/metaflow/decorators.py
index 8ca83fa1459..9ab5d5d4aee 100644
--- a/metaflow/decorators.py
+++ b/metaflow/decorators.py
@@ -473,8 +473,7 @@ def _attach_decorators_to_step(step, decospecs):
 def _init_flow_decorators(
     flow, graph, environment, flow_datastore, metadata, logger, echo, deco_options
 ):
-    # Since _flow_decorators is a dictonary and some flow-decorators can have many decos of same type allowed.
-    # Hence we set certain keys as lists to accomodate multiple decos of same type and therefore validate if the value is a list
+    # Certain decorators can be specified multiple times and are stored as lists in the _flow_decorators dictionary.
     for deco in flow._flow_decorators.values():
         if type(deco) == list:
             for rd in deco:
diff --git a/metaflow/plugins/airflow/airflow.py b/metaflow/plugins/airflow/airflow.py
index 25f94818153..99043863dd1 100644
--- a/metaflow/plugins/airflow/airflow.py
+++ b/metaflow/plugins/airflow/airflow.py
@@ -199,7 +199,7 @@ def _process_parameters(self):
                 raise MetaflowException(
                     "Parameter *%s* does not have a "
                     "default value. "
-                    "A default value is required for parameters when deploying on Airflow."
+                    "A default value is required for parameters when deploying flows on Airflow."
                 )
             value = deploy_time_eval(param.kwargs.get("default"))
             # Setting airflow related param args.
@@ -207,14 +207,12 @@ def _process_parameters(self):
             airflow_param = dict(
                 name=param.name,
             )
-            param_help = param.kwargs.get("help")
             if value is not None:
                 airflow_param["default"] = value
-            if param_help:
-                airflow_param["description"] = param_help
+            if param.kwargs.get("help"):
+                airflow_param["description"] = param.kwargs.get("help")
             if param_type is not None:
                 # Todo (fast-follow): Check if we can support more `click.Param` types
-
                 param_type_name = getattr(param_type, "__name__", None)
                 if not param_type_name and isinstance(param_type, JSONTypeClass):
                     # `JSONTypeClass` has no __name__ attribute so we need to explicitly check if
@@ -230,19 +228,19 @@ def _process_parameters(self):
 
         return airflow_params
 
-    def _make_input_path_compressed(
+    def _compress_input_path(
         self,
-        step_names,
+        steps,
     ):
         """
         This function is meant to compress the input paths and it specifically doesn't use `metaflow.util.compress_list`
         under the hood. The reason is because the `AIRFLOW_MACROS.RUN_ID` is a complicated macro string
         that doesn't behave nicely with `metaflow.util.decompress_list` since the `decompress_util` function
         expects a string which doesn't contain any delimiter characters and the run-id string does.
-        Hence we have a custom compression string created via `_make_input_path_compressed` function instead of `compress_list`.
+        Hence we have a custom compression string created via the `_compress_input_path` function instead of `compress_list`.
         """
         return "%s:" % (AIRFLOW_MACROS.RUN_ID) + ",".join(
-            self._make_input_path(s, only_task_id=True) for s in step_names
+            self._make_input_path(step, only_task_id=True) for step in steps
         )
 
     def _make_foreach_input_path(self, step_name):
@@ -257,10 +255,10 @@ def _make_foreach_input_path(self, step_name):
         )
 
     def _make_input_path(self, step_name, only_task_id=False):
-        # This is set using the `airflow_internal` decorator.
-        # This will pull the `return_value` xcom which holds a dictionary.
-        # This helps pass state.
-        # The `TASK_ID_XCOM_KEY` is set via the `MetaflowKubernetesOperator`
+        """
+        This is set using the `airflow_internal` decorator to help pass state.
+        This will pull the `TASK_ID_XCOM_KEY` xcom which holds task-ids. The key is set via the `MetaflowKubernetesOperator`.
+        """
         task_id_string = "/%s/{{ task_instance.xcom_pull(task_ids='%s',key='%s') }}" % (
             step_name,
             step_name,
@@ -275,22 +273,17 @@ def _make_input_path(self, step_name, only_task_id=False):
 
     def _to_job(self, node):
         """
         This function will transform the node's specification into Airflow compatible operator arguments.
-        Since this function is long. It performs two major duties:
+        Since this function is long, here is a summary of the two major duties it performs:
             1. Based on the type of the graph node (start/linear/foreach/join etc.) it will decide how to set the input paths
             2. Based on node's decorator specification convert the information into a job spec for the KubernetesPodOperator.
         """
-        # supported compute : k8s (v1), local(v2), batch(v3)
         # Add env vars from the optional @environment decorator.
         env_deco = [deco for deco in node.decorators if deco.name == "environment"]
         env = {}
         if env_deco:
             env = env_deco[0].attributes["vars"]
-        is_foreach_join = (
-            node.type == "join" and self.graph[node.split_parents[-1]].type == "foreach"
-        )
-
-        # The Below If/Else Block handle "Input Paths".
+        # The below if/else block handles "input paths".
         # Input Paths help manage dataflow across the graph.
         if node.name == "start":
             # POSSIBLE_FUTURE_IMPROVEMENT:
@@ -302,7 +295,7 @@ def _to_job(self, node):
             # parameters.
 
             if len(self.parameters):
-                env["METAFLOW_PARAMETERS"] = AIRFLOW_MACROS.PARAMETER
+                env["METAFLOW_PARAMETERS"] = AIRFLOW_MACROS.PARAMETERS
             input_paths = None
         else:
             # If it is not the start node then we check if there are many paths
@@ -311,7 +304,10 @@ def _to_job(self, node):
                 raise AirflowException(
                     "Parallel steps are not supported yet with Airflow."
                 )
-
+            is_foreach_join = (
+                node.type == "join"
+                and self.graph[node.split_parents[-1]].type == "foreach"
+            )
             if is_foreach_join:
                 input_paths = self._make_foreach_input_path(node.in_funcs[0])
 
@@ -325,7 +321,7 @@ def _to_job(self, node):
                 input_paths = self._make_input_path(node.in_funcs[0])
             else:
                 # this is a split scenario where there can be more than one input paths.
-                input_paths = self._make_input_path_compressed(node.in_funcs)
+                input_paths = self._compress_input_path(node.in_funcs)
 
             env["METAFLOW_INPUT_PATHS"] = input_paths
diff --git a/metaflow/plugins/airflow/airflow_utils.py b/metaflow/plugins/airflow/airflow_utils.py
index 8318718d071..dea2ada16cc 100644
--- a/metaflow/plugins/airflow/airflow_utils.py
+++ b/metaflow/plugins/airflow/airflow_utils.py
@@ -118,7 +118,7 @@ def _check_foreach_compatible_kubernetes_provider():
 class AIRFLOW_MACROS:
     # run_id_creator is added via the `user_defined_filters`
     RUN_ID = "%s-{{ [run_id, dag_run.dag_id] | run_id_creator }}" % RUN_ID_PREFIX
-    PARAMETER = "{{ params | json_dump }}"
+    PARAMETERS = "{{ params | json_dump }}"
 
     # AIRFLOW_MACROS.TASK_ID will work for linear/branched workflows.
     # ti.task_id is the stepname in metaflow code.
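
Reviewer note on `_compress_input_path`: a minimal sketch of the compression scheme, assuming the renamed helper keeps the existing behavior of prefixing the run-id macro once and comma-joining the per-step task-id suffixes. The names `compress_input_path` and `decompress_input_path` and the sample suffixes are hypothetical; they only illustrate why a custom scheme is used instead of `metaflow.util.compress_list` (the run-id macro itself contains delimiter characters).

# Hypothetical stand-in for AIRFLOW_MACROS.RUN_ID and for the per-step
# suffixes that `_make_input_path(step, only_task_id=True)` would produce.
RUN_ID_MACRO = "airflow-{{ [run_id, dag_run.dag_id] | run_id_creator }}"

def compress_input_path(steps, suffixes):
    # Prefix the run-id macro exactly once, then comma-join the suffixes, so
    # the run id never has to be parsed back out of a delimited list.
    return "%s:" % RUN_ID_MACRO + ",".join(suffixes[s] for s in steps)

def decompress_input_path(compressed):
    # Consumer side: split off the run id at the first ":" and rebuild one
    # full "<run-id>/<step>/<task-id>" input path per suffix.
    run_id, _, joined = compressed.partition(":")
    return [run_id + suffix for suffix in joined.split(",")]

suffixes = {"a": "/a/task-1", "b": "/b/task-2"}
compressed = compress_input_path(["a", "b"], suffixes)
# compressed == RUN_ID_MACRO + ":/a/task-1,/b/task-2"
assert decompress_input_path(compressed) == [
    RUN_ID_MACRO + "/a/task-1",
    RUN_ID_MACRO + "/b/task-2",
]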
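
Reviewer note on the `PARAMETER` to `PARAMETERS` rename: the macro is just a Jinja template string, and it only renders if the generated DAG registers `json_dump` and `run_id_creator` under Airflow's `user_defined_filters`. A hedged sketch of that wiring follows; the filter bodies are illustrative guesses, not the ones Metaflow ships.

import hashlib
import json

from airflow import DAG

def json_dump(params):
    # Lets "{{ params | json_dump }}" (AIRFLOW_MACROS.PARAMETERS) render the
    # DAG params as a JSON string for the METAFLOW_PARAMETERS env var.
    return json.dumps(dict(params))

def run_id_creator(values):
    # Hashes [run_id, dag_run.dag_id] into a deterministic run-id suffix.
    return hashlib.md5("/".join(values).encode("utf-8")).hexdigest()[:12]

dag = DAG(
    dag_id="my_flow",
    user_defined_filters={
        "json_dump": json_dump,
        "run_id_creator": run_id_creator,
    },
)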