Skip to content

Commit

Permalink
added mising Run-id prefixes to variables.
Browse files Browse the repository at this point in the history
- merged `hash` and `dash_connect` filters.
  • Loading branch information
valayDave committed Jul 29, 2022
1 parent 5f4d573 commit 36f67a9
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 15 deletions.
3 changes: 2 additions & 1 deletion metaflow/plugins/airflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
from .airflow_decorator import SUPPORTED_SENSORS, AirflowSensorDecorator
from .airflow_utils import (AIRFLOW_TASK_ID_TEMPLATE_VALUE,
RUN_ID_LEN,
RUN_ID_PREFIX,
TASK_ID_XCOM_KEY, AirflowTask, Workflow)

AIRFLOW_DEPLOY_TEMPLATE_FILE = os.path.join(os.path.dirname(__file__), "dag.py")


RUN_ID_PREFIX = "airflow"


class AirflowException(MetaflowException):
headline = "Airflow Exception"
Expand Down
22 changes: 8 additions & 14 deletions metaflow/plugins/airflow/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class AirflowSensorNotFound(Exception):
TASK_ID_XCOM_KEY = "metaflow_task_id"
RUN_ID_LEN = 12
TASK_ID_LEN = 8
RUN_ID_PREFIX = "airflow"

# AIRFLOW_TASK_ID_TEMPLATE_VALUE will work for linear/branched workflows.
# ti.task_id is the stepname in metaflow code.
Expand All @@ -31,9 +32,8 @@ class AirflowSensorNotFound(Exception):
# we can rename steps in a foreach with indexes (eg. `stepname-$index`) to create those steps.
# Hence : Foreachs will require some special form of plumbing.
# https://stackoverflow.com/questions/62962386/can-an-airflow-task-dynamically-generate-a-dag-at-runtime
# TODO: Reference RUN_ID_PREFIX here
AIRFLOW_TASK_ID_TEMPLATE_VALUE = (
"airflow-{{ [run_id, ti.task_id, dag_run.dag_id ] | task_id_creator }}"
"%s-{{ [run_id, ti.task_id, dag_run.dag_id ] | task_id_creator }}" % RUN_ID_PREFIX
)


Expand Down Expand Up @@ -67,9 +67,10 @@ def sanitize_label_value(val):
return sanitized_val[:57] + "-" + hash[:5]


def hasher(my_value):
return hashlib.md5(my_value.encode("utf-8")).hexdigest()[:RUN_ID_LEN]

def run_id_creator(val):
# join `[dag-id,run-id]` of airflow dag.
return hashlib.md5("-".join(val).encode("utf-8")).hexdigest()[:RUN_ID_LEN]


def task_id_creator(lst):
# This is a filter which creates a hash of the run_id/step_name string.
Expand All @@ -82,10 +83,6 @@ def json_dump(val):
return json.dumps(val)


def dash_connect(val):
return "-".join(val)


class AirflowDAGArgs(object):
# _arg_types This object helps map types of
# different keys that need to be parsed. None of the "values" in this
Expand Down Expand Up @@ -124,10 +121,9 @@ class AirflowDAGArgs(object):
metaflow_specific_args = {
# Reference for user_defined_filters : https://stackoverflow.com/a/70175317
"user_defined_filters": dict(
hash=lambda my_value: hasher(my_value),
task_id_creator=lambda v: task_id_creator(v),
json_dump=lambda val: json_dump(val),
dash_connect=lambda val: dash_connect(val),
run_id_creator=lambda val:run_id_creator(val)
),
}

Expand Down Expand Up @@ -220,9 +216,7 @@ def set_k8s_operator_args(flow_name, step_name, operator_args):
from airflow.kubernetes.secret import Secret

task_id = AIRFLOW_TASK_ID_TEMPLATE_VALUE
# TODO: Reference RUN_ID_PREFIX here. Also this seems like the only place where
# dash_connect and hash are being used. Can we combine them together?
run_id = "airflow-{{ [run_id, dag_run.dag_id] | dash_connect | hash }}" # hash is added via the `user_defined_filters`
run_id = "%s-{{ [run_id, dag_run.dag_id] | run_id_creator }}" % RUN_ID_PREFIX # run_id_creator is added via the `user_defined_filters`
attempt = "{{ task_instance.try_number - 1 }}"
# Set dynamic env variables like run-id, task-id etc from here.
env_vars = (
Expand Down

0 comments on commit 36f67a9

Please sign in to comment.