Skip to content

Commit

Permalink
Refactored run_id_creator and task_id_creator into a single id_creator function.
Browse files Browse the repository at this point in the history
  • Loading branch information
valayDave committed Jul 28, 2022
1 parent 04d7f20 commit a3147be
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 13 deletions.
15 changes: 4 additions & 11 deletions metaflow/plugins/airflow/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,9 @@ def get_supported_sensors(cls):
return list(cls.__dict__.values())


def run_id_creator(val):
def id_creator(val, hash_len):
# Join the given parts (e.g. `[dag-id, run-id]` of an Airflow DAG) with "-" and return a truncated MD5 hex digest.
return hashlib.md5("-".join(val).encode("utf-8")).hexdigest()[:RUN_HASH_ID_LEN]


def task_id_creator(lst):
# This is a filter which creates a hash of the run_id/step_name string.
# Since run_ids in airflow are constants, they don't create an issue with the
#
return hashlib.md5("/".join(lst).encode("utf-8")).hexdigest()[:TASK_ID_HASH_LEN]
return hashlib.md5("-".join(val).encode("utf-8")).hexdigest()[:hash_len]


def json_dump(val):
Expand Down Expand Up @@ -95,9 +88,9 @@ class AirflowDAGArgs(object):

# Reference for user_defined_filters : https://stackoverflow.com/a/70175317
filters = dict(
task_id_creator=lambda v: task_id_creator(v),
task_id_creator=lambda v: id_creator(v, TASK_ID_HASH_LEN),
json_dump=lambda val: json_dump(val),
run_id_creator=lambda val: run_id_creator(val),
run_id_creator=lambda val: id_creator(val, RUN_HASH_ID_LEN),
)

def __init__(self, **kwargs):
Expand Down
4 changes: 2 additions & 2 deletions metaflow/plugins/airflow/sensors/base_sensor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import uuid
from metaflow.decorators import FlowDecorator
from ..exception import AirflowException
from ..airflow_utils import AirflowTask, task_id_creator
from ..airflow_utils import AirflowTask, id_creator, TASK_ID_HASH_LEN


class AirflowSensorDecorator(FlowDecorator):
Expand Down Expand Up @@ -63,7 +63,7 @@ def validate(self):
].index(self._id)
self._airflow_task_name = "%s-%s" % (
self.operator_type,
task_id_creator([self.operator_type, str(deco_index)]),
id_creator([self.operator_type, str(deco_index)], TASK_ID_HASH_LEN),
)
else:
self._airflow_task_name = self.attributes["name"]
Expand Down

0 comments on commit a3147be

Please sign in to comment.