Skip to content

Commit

Permalink
refactor :
Browse files Browse the repository at this point in the history
-`RUN_ID_LEN` to `RUN_HASH_ID_LEN`
- `TASK_ID_LEN` to `TASK_ID_HASH_LEN`
  • Loading branch information
valayDave committed Jul 29, 2022
1 parent 04299e3 commit 7ef186f
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
4 changes: 2 additions & 2 deletions metaflow/plugins/airflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from .sensors import SUPPORTED_SENSORS
from .airflow_utils import (
AIRFLOW_TASK_ID,
RUN_ID_LEN,
RUN_HASH_ID_LEN,
RUN_ID_PREFIX,
TASK_ID_XCOM_KEY,
AirflowTask,
Expand All @@ -53,7 +53,7 @@ class Airflow(object):
# Such run-ids break the `metaflow.util.decompress_list`; this is why we hash the runid
run_id = (
"%s-$(echo -n {{ run_id }}-{{ dag_run.dag_id }} | md5sum | awk '{print $1}' | awk '{print substr ($0, 0, %s)}')"
% (RUN_ID_PREFIX, str(RUN_ID_LEN))
% (RUN_ID_PREFIX, str(RUN_HASH_ID_LEN))
)
# We use `echo -n` because plain `echo` emits a trailing line break; that newline must not be part of the hashed input, since we need the same hash value when it is recomputed in Python.
run_id_arg = "--run-id %s" % run_id
Expand Down
8 changes: 4 additions & 4 deletions metaflow/plugins/airflow/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ class AirflowSensorNotFound(Exception):


TASK_ID_XCOM_KEY = "metaflow_task_id"
RUN_ID_LEN = 12
TASK_ID_LEN = 8
RUN_HASH_ID_LEN = 12
TASK_ID_HASH_LEN = 8
RUN_ID_PREFIX = "airflow"

# AIRFLOW_TASK_ID will work for linear/branched workflows.
Expand Down Expand Up @@ -48,14 +48,14 @@ def get_supported_sensors(cls):

def run_id_creator(val):
# join `[dag-id,run-id]` of airflow dag.
return hashlib.md5("-".join(val).encode("utf-8")).hexdigest()[:RUN_ID_LEN]
return hashlib.md5("-".join(val).encode("utf-8")).hexdigest()[:RUN_HASH_ID_LEN]


def task_id_creator(lst):
# This is a filter which creates a hash of the run_id/step_name string.
# Since run_ids in airflow are constants, they don't create an issue with the
#
return hashlib.md5("/".join(lst).encode("utf-8")).hexdigest()[:TASK_ID_LEN]
return hashlib.md5("/".join(lst).encode("utf-8")).hexdigest()[:TASK_ID_HASH_LEN]


def json_dump(val):
Expand Down

0 comments on commit 7ef186f

Please sign in to comment.