diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py
index 8d53a0a695f..b0372e7cf24 100644
--- a/metaflow/metaflow_config.py
+++ b/metaflow/metaflow_config.py
@@ -197,10 +197,6 @@ def from_conf(name, default=None):
 )
 
 #
-###
-# Airflow Configuration
-###
-AIRFLOW_STATE_MACHINE_PREFIX = from_conf("METAFLOW_AIRFLOW_STATE_MACHINE_PREFIX")
 
 ###
 # Conda configuration
diff --git a/metaflow/plugins/airflow/airflow_cli.py b/metaflow/plugins/airflow/airflow_cli.py
index 5dd9d66f5c2..2c0dbd51d1d 100644
--- a/metaflow/plugins/airflow/airflow_cli.py
+++ b/metaflow/plugins/airflow/airflow_cli.py
@@ -13,11 +13,7 @@
 VALID_NAME = re.compile("[^a-zA-Z0-9_\-\.]")
 
 
-def resolve_state_machine_name(name):
-    def attach_prefix(name):
-        if AIRFLOW_STATE_MACHINE_PREFIX is not None:
-            return AIRFLOW_STATE_MACHINE_PREFIX + "_" + name
-        return name
+def resolve_dag_name(name):
 
     project = current.get("project_name")
     if project:
@@ -25,16 +21,16 @@ def attach_prefix(name):
             raise MetaflowException(
                 "--name is not supported for @projects. " "Use --branch instead."
             )
-        state_machine_name = attach_prefix(current.project_flow_name)
+        dag_name = current.project_flow_name
         is_project = True
     else:
         if name and VALID_NAME.search(name):
             raise MetaflowException("Name '%s' contains invalid characters." % name)
-        state_machine_name = attach_prefix(name if name else current.flow_name)
+        dag_name = name if name else current.flow_name
         is_project = False
-    return state_machine_name, is_project
+    return dag_name, is_project
 
 
 @click.group()
@@ -72,7 +68,7 @@ def make_flow(
     package_url, package_sha = obj.flow_datastore.save_data(
         [obj.package.blob], len_hint=1
     )[0]
-    flow_name, is_project = resolve_state_machine_name(dag_name)
+    flow_name, is_project = resolve_dag_name(dag_name)
     return Airflow(
         flow_name,
         obj.graph,
@@ -130,7 +126,7 @@ def make_flow(
     "--max-workers",
     default=100,
     show_default=True,
-    help="Maximum number of concurrent airflow tasks to run for the DAG. ",
+    help="Maximum number of concurrent Airflow tasks.",
 )
 @click.option(
     "--worker-pool",
diff --git a/metaflow/plugins/airflow/airflow_compiler.py b/metaflow/plugins/airflow/airflow_compiler.py
index 302aa39aae7..83e8d3056c2 100644
--- a/metaflow/plugins/airflow/airflow_compiler.py
+++ b/metaflow/plugins/airflow/airflow_compiler.py
@@ -363,10 +363,10 @@ def _to_job(self, node: DAGNode):
         #     or join_in_foreach
         # ):
 
-        # # Todo : Find ways to pass state using for the below usecases:
-        # # 1. To set the cardinality of foreaches
-        # # 2. To set the input paths from the parent steps of a foreach join.
-        # # 3. To read the input paths in a foreach join.
+        # Todo : Find ways to pass state for the below use cases:
+        # 1. To set the cardinality of foreaches
+        # 2. To set the input paths from the parent steps of a foreach join.
+        # 3. To read the input paths in a foreach join.
 
         compute_type = "k8s"  # todo : This will become more dynamic in the future.
         if compute_type == "k8s":
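Review note: the next file inlines the XCom push helper that previously lived in `plumbing/airflow_xcom_push.py`. One small observation on the inlined `safe_mkdir`: on Python 3.2+, `os.makedirs` already supports idempotent creation via `exist_ok=True`, so the `try`/`except FileExistsError` guard can be reduced to a one-liner. A minimal equivalent sketch:

```python
import os

def safe_mkdir(dir):
    # Same behavior as the try/except FileExistsError version:
    # create the directory tree, ignoring it if it already exists.
    os.makedirs(dir, exist_ok=True)
```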
diff --git a/metaflow/plugins/airflow/airflow_decorator.py b/metaflow/plugins/airflow/airflow_decorator.py
index aa333d4cd03..aaaefb6bca6 100644
--- a/metaflow/plugins/airflow/airflow_decorator.py
+++ b/metaflow/plugins/airflow/airflow_decorator.py
@@ -4,10 +4,23 @@
 from metaflow.decorators import StepDecorator
 from metaflow.metadata import MetaDatum
 
-from .plumbing.airflow_xcom_push import push_xcom_values
 from .airflow_utils import TASK_ID_XCOM_KEY
+K8S_XCOM_DIR_PATH = "/airflow/xcom"
+
+def safe_mkdir(dir):
+    try:
+        os.makedirs(dir)
+    except FileExistsError:
+        pass
+
+
+def push_xcom_values(xcom_dict):
+    safe_mkdir(K8S_XCOM_DIR_PATH)
+    with open(os.path.join(K8S_XCOM_DIR_PATH, "return.json"), "w") as f:
+        json.dump(xcom_dict, f)
+
 
 
 class AirflowInternalDecorator(StepDecorator):
     name = "airflow_internal"
 
@@ -25,11 +38,10 @@ def task_pre_step(
         ubf_context,
         inputs,
     ):
-        # todo : find out where the execution is taking place.
+        # find out where the execution is taking place.
         # Once figured where the execution is happening then we can do
         # handle xcom push / pull differently
         meta = {}
-        meta["airflow-execution"] = os.environ["METAFLOW_RUN_ID"]
         meta["airflow-dag-run-id"] = os.environ["METAFLOW_AIRFLOW_DAG_RUN_ID"]
         meta["airflow-job-id"] = os.environ["METAFLOW_AIRFLOW_JOB_ID"]
         entries = [
@@ -46,8 +58,3 @@ def task_pre_step(
             }
         )
 
-    def task_finished(
-        self, step_name, flow, graph, is_task_ok, retry_count, max_user_code_retries
-    ):
-        pass
-        # todo : Figure ways to find out foreach cardinality over here,
diff --git a/metaflow/plugins/airflow/plumbing/airflow_xcom_push.py b/metaflow/plugins/airflow/plumbing/airflow_xcom_push.py
deleted file mode 100644
index 5f8740ba352..00000000000
--- a/metaflow/plugins/airflow/plumbing/airflow_xcom_push.py
+++ /dev/null
@@ -1,18 +0,0 @@
-# How does xcom push work in K8s Operator : https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html#how-does-xcom-work
-import os
-import json
-
-K8S_XCOM_DIR_PATH = "/airflow/xcom"
-
-
-def safe_mkdir(dir):
-    try:
-        os.makedirs(dir)
-    except FileExistsError:
-        pass
-
-
-def push_xcom_values(xcom_dict):
-    safe_mkdir(K8S_XCOM_DIR_PATH)
-    with open(os.path.join(K8S_XCOM_DIR_PATH, "return.json"), "w") as f:
-        json.dump(xcom_dict, f)
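For context on the `/airflow/xcom/return.json` convention used by `push_xcom_values`: when a task runs in a pod with XCom pushing enabled (e.g. `KubernetesPodOperator` with `do_xcom_push=True`), an Airflow sidecar container reads that file after the main container exits and publishes its contents as the task's `return_value` XCom (see the Airflow docs link in the deleted file's header). A hedged sketch of the consuming side; the upstream task id `"start"` and the key `"metaflow_task_id"` are illustrative assumptions, not taken from this patch:

```python
# Illustrative only: reading values an upstream pod wrote to
# /airflow/xcom/return.json. Assumes the Airflow 2.x TaskFlow API;
# the task id "start" and key "metaflow_task_id" are hypothetical.
from airflow.decorators import task

@task
def read_metaflow_task_id(ti=None):
    # The dict dumped to return.json surfaces as the upstream task's
    # return_value XCom, so xcom_pull returns it directly.
    pushed = ti.xcom_pull(task_ids="start")
    return pushed["metaflow_task_id"]
```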