Removed Sensor metadata extraction.
valayDave committed Jul 29, 2022
1 parent 284608f commit fa21dc2
Showing 2 changed files with 5 additions and 136 deletions.
7 changes: 5 additions & 2 deletions metaflow/plugins/airflow/airflow.py
@@ -24,7 +24,7 @@
from . import airflow_utils
from .airflow_decorator import SUPPORTED_SENSORS, AirflowSensorDecorator
from .airflow_utils import (AIRFLOW_TASK_ID_TEMPLATE_VALUE,
PARENT_TASK_INSTANCE_STATUS_MACRO, RUN_ID_LEN,
RUN_ID_LEN,
TASK_ID_XCOM_KEY, AirflowTask, Workflow)

AIRFLOW_DEPLOY_TEMPLATE_FILE = os.path.join(os.path.dirname(__file__), "dag.py")
@@ -414,7 +414,10 @@ def _to_job(self, node):
# The below if/else block handles "Input Paths".
# Input Paths help manage dataflow across the graph.
if node.name == "start":
# todo : pass metadata for sensors using `airflow_utils.PARENT_TASK_INSTANCE_STATUS_MACRO`
# POSSIBLE_FUTURE_IMPROVEMENT:
# We can extract metadata about the possible upstream sensor triggers.
# A previous commit (7bdf6) in the `airflow` branch has the `SensorMetaExtractor` class and
# an associated macro we built to handle this case, should metadata regarding the sensor be needed.
# Initialize parameters for the flow in the `start` step.
# `start` step has no upstream input dependencies aside from
# parameters.
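For context on the removed todo, the macro-based approach would have looked roughly like this: the Jinja macro string is placed in a templated operator field (such as an environment variable), and Airflow renders it at task runtime. A minimal sketch, assuming a hypothetical variable name `METAFLOW_SENSOR_METADATA` and an `env` dict that `_to_job` would hand to the operator; neither is from the source:

```python
# Hypothetical sketch only; `PARENT_TASK_INSTANCE_STATUS_MACRO` is the Jinja
# template string that this commit stops exporting from airflow_utils.
from metaflow.plugins.airflow.airflow_utils import PARENT_TASK_INSTANCE_STATUS_MACRO

# Because Airflow treats fields like `env_vars` as Jinja templates, the macro
# would expand to a JSON string describing the upstream sensors once the task
# actually runs.
env = {"METAFLOW_SENSOR_METADATA": PARENT_TASK_INSTANCE_STATUS_MACRO}
```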
134 changes: 0 additions & 134 deletions metaflow/plugins/airflow/airflow_utils.py
@@ -86,139 +86,6 @@ def dash_connect(val):
return "-".join(val)


###=============================SENSOR-METADATA-EXTRACTION===================
# This code helps extract metadata about what happened with sensors.
# It is not being used at the moment but can very easily be used in the future.
# Just set `PARENT_TASK_INSTANCE_STATUS_MACRO` as the value of some environment variable
# and it will create a JSON dictionary at task runtime.
# just SensorMetadata? Also, looking at the usage, we should be able to do away with
# this class entirely and embed the logic in get_sensor_metadata?
class SensorMetaExtractor:
"""
Extracts metadata about the upstream sensors that triggered the DAG.
Stores different metadata based on the sensor type.
"""

def __init__(self, task_instance, airflow_operator, dag_run):
self._data = dict(
task_id=airflow_operator.task_id,
state=task_instance.state,
operator_type=task_instance.operator,
# just metadata
task_metadata=self._get_metadata(task_instance, airflow_operator, dag_run),
)

# this method shouldn't be a public method of this class. Only ExternalTaskSensor
# will have reason to use this method - we may as well move this method within
# get_metadata
@staticmethod
def external_task_sensor_run_ids(task_instance, airflow_operator, logical_date):
def resolve_run_ids(airflow_operator, dates):
from airflow.models import DagRun
# why is DR needed - can't we directly use DagRun?
DR = DagRun
vals = []
for d in dates:
run_id = DR.find(
dag_id=airflow_operator.external_dag_id, execution_date=d
)[0].run_id
vals.append(run_id)
return vals

# can you add a url reference to the code instead of the comment below?
# ----THIS CODE BLOCK USES LOGIC GIVEN IN EXTERNAL TASK SENSOR-----
if airflow_operator.execution_delta:
dttm = logical_date - airflow_operator.execution_delta
elif airflow_operator.execution_date_fn:
# `task_instance.get_template_context` : https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/taskinstance/index.html#airflow.models.taskinstance.TaskInstance.get_template_context
dttm = airflow_operator._handle_execution_date_fn(
context=task_instance.get_template_context()
)
else:
dttm = logical_date
dttm_filter = dttm if isinstance(dttm, list) else [dttm]
# ------------------------------------------------------------
resolved_run_ids = resolve_run_ids(airflow_operator, dttm_filter)
return resolved_run_ids

# I am not sure we need a function for this. you can simply generate a dictionary
# in _get_metadata
def _make_metadata(self, name, value):
return dict(name=name, value=value)

# rename this to metadata
def _get_metadata(self, task_instance, airflow_operator, dag_run):
metadata = []
if task_instance.operator == "ExternalTaskSensor":
metadata.extend(
[
self._make_metadata(
"upstream-triggering-run-ids",
", ".join(
self.external_task_sensor_run_ids(
task_instance, airflow_operator, dag_run.logical_date
)
),
),
self._make_metadata(
"upstream-dag-id", airflow_operator.external_dag_id
),
]
)
if airflow_operator.external_task_id is not None:
metadata.append(
self._make_metadata(
"upstream-task-id", str(airflow_operator.external_task_ids)
),
)
elif task_instance.operator == "S3KeySensor":
airflow_operator._resolve_bucket_and_key()
metadata.extend(
[
self._make_metadata("s3-key", airflow_operator.bucket_key),
self._make_metadata("s3-bucket-name", airflow_operator.bucket_name),
]
)
elif task_instance.operator == "SqlSensor":
# todo : Should we store sql statement in the metadata ?
# Storing SQL can raise security concerns, and it would also require care when serializing it to JSON.
metadata.append(
self._make_metadata("sql-conn-id", airflow_operator.conn_id)
)
return metadata


def get_data(self):
return self._data


PARENT_TASK_INSTANCE_STATUS_MACRO = (
"{{ [task.upstream_task_ids, dag_run] | get_sensor_metadata }}"
)
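In this template string, `task` and `dag_run` are standard names from Airflow's template context, so rendering the macro hands the current task's upstream task ids and the active `DagRun` to the `get_sensor_metadata` filter. A rough illustration of the mechanism using plain `jinja2` with stand-in objects (the stand-ins and the simplified filter are illustrative, not Airflow's real context):

```python
from types import SimpleNamespace
from jinja2 import Environment

env = Environment()
# Stand-in filter: the real `get_sensor_metadata` builds SensorMetaExtractor
# records; here we only show how the pipe syntax reaches the filter.
env.filters["get_sensor_metadata"] = lambda args: "metadata for %s" % sorted(args[0])

context = {
    "task": SimpleNamespace(upstream_task_ids={"sensor_a", "sensor_b"}),
    "dag_run": SimpleNamespace(),  # unused by the stand-in filter
}
print(env.from_string(PARENT_TASK_INSTANCE_STATUS_MACRO).render(**context))
# -> metadata for ['sensor_a', 'sensor_b']
```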


def get_sensor_metadata(args):
"""
This function is a user-defined macro that retrieves the task instances for a set of task ids
and figures out their status so that we can pass it down to the Airflow decorators and store it as metadata.
It is used via `PARENT_TASK_INSTANCE_STATUS_MACRO` to store this JSON-dumped information in an environment variable.
"""

task_ids, dag_run = args
data = []
dag = dag_run.get_dag()
for tid in task_ids:
# `task_instance` is of the form https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/taskinstance/index.html#airflow.models.taskinstance.TaskInstance
task_instance = dag_run.get_task_instance(tid)
operator_object = dag.get_task(tid)
data.append(
SensorMetaExtractor(task_instance, operator_object, dag_run).get_data()
)
return json.dumps(data)
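For reference, the JSON this helper produced would have looked roughly like the following — a hand-written illustration based on the fields `SensorMetaExtractor` populates, not output captured from a real run:

```python
import json

# Illustrative shape only (values are made up): one record per upstream
# sensor, mirroring SensorMetaExtractor._data.
example = json.dumps(
    [
        {
            "task_id": "wait_for_upstream_dag",
            "state": "success",
            "operator_type": "ExternalTaskSensor",
            "task_metadata": [
                {"name": "upstream-triggering-run-ids",
                 "value": "scheduled__2022-07-28T00:00:00+00:00"},
                {"name": "upstream-dag-id", "value": "upstream_dag"},
            ],
        }
    ]
)
```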


###========================================================================
class AirflowDAGArgs(object):
# `_arg_types`: This object helps map the types of
# different keys that need to be parsed. None of the "values" in this
@@ -260,7 +127,6 @@ class AirflowDAGArgs(object):
hash=lambda my_value: hasher(my_value),
task_id_creator=lambda v: task_id_creator(v),
json_dump=lambda val: json_dump(val),
get_sensor_metadata=lambda val: get_sensor_metadata(val),
dash_connect=lambda val: dash_connect(val),
),
}
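For readers unfamiliar with this mapping: these lambdas are registered as Jinja user-defined filters, which is what lets template strings such as the sensor macro above pipe values through them. A minimal sketch of the equivalent DAG-level wiring, assuming the filter were still exported (the `dag_id` and date are illustrative):

```python
import pendulum
from airflow import DAG

# Hypothetical: registering the filter directly on a DAG; Metaflow does the
# equivalent through the AirflowDAGArgs mapping shown above.
dag = DAG(
    dag_id="metaflow_generated_dag",
    start_date=pendulum.datetime(2022, 7, 29, tz="UTC"),
    user_defined_filters={"get_sensor_metadata": get_sensor_metadata},
)
```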
