From 8ac2bd6b0e621dc39ac16cf527d69dcf57c5a5b1 Mon Sep 17 00:00:00 2001
From: Tamas Nemeth
Date: Wed, 25 May 2022 17:29:05 +0200
Subject: [PATCH] fix(airflow): Fix for Airflow 1 support (#4995)

---
 .../client/airflow_generator.py               | 25 +++++++++++++------
 1 file changed, 17 insertions(+), 8 deletions(-)

diff --git a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py
index 5a491f1e8e653a..b5c389d298969e 100644
--- a/metadata-ingestion/src/datahub_provider/client/airflow_generator.py
+++ b/metadata-ingestion/src/datahub_provider/client/airflow_generator.py
@@ -10,7 +10,6 @@
 from datahub.metadata.schema_classes import DataProcessTypeClass
 from datahub.utilities.urns.data_flow_urn import DataFlowUrn
 from datahub.utilities.urns.data_job_urn import DataJobUrn
-from datahub_provider.hooks.datahub import AIRFLOW_1
 
 if TYPE_CHECKING:
     from airflow import DAG
@@ -159,6 +158,20 @@ def generate_dataflow(
 
         return data_flow
 
+    @staticmethod
+    def _get_description(task: "BaseOperator") -> Optional[str]:
+        if hasattr(task, "doc") and task.doc:
+            return task.doc
+        elif hasattr(task, "doc_md") and task.doc_md:
+            return task.doc_md
+        elif hasattr(task, "doc_json") and task.doc_json:
+            return task.doc_json
+        elif hasattr(task, "doc_yaml") and task.doc_yaml:
+            return task.doc_yaml
+        elif hasattr(task, "doc_rst") and task.doc_rst:
+            return task.doc_rst
+        return None
+
     @staticmethod
     def generate_datajob(
         cluster: str,
@@ -184,11 +197,7 @@ def generate_datajob(
             orchestrator="airflow", env=cluster, flow_id=dag.dag_id
         )
         datajob = DataJob(id=task.task_id, flow_urn=dataflow_urn)
-        datajob.description = (
-            (task.doc or task.doc_md or task.doc_json or task.doc_yaml or task.doc_rst)
-            if not AIRFLOW_1
-            else None
-        )
+        datajob.description = AirflowGenerator._get_description(task)
 
         job_property_bag: Dict[str, str] = {
             key: repr(value)
@@ -375,7 +384,7 @@ def run_datajob(
         job_property_bag["hostname"] = str(ti.hostname)
         job_property_bag["max_tries"] = str(ti.max_tries)
         # Not compatible with Airflow 1
-        if not AIRFLOW_1:
+        if hasattr(ti, "external_executor_id"):
             job_property_bag["external_executor_id"] = str(ti.external_executor_id)
         job_property_bag["pid"] = str(ti.pid)
         job_property_bag["state"] = str(ti.state)
@@ -387,7 +396,7 @@ def run_datajob(
         dpi.url = ti.log_url
 
         # This property only exists in Airflow2
-        if hasattr(ti.dag_run, "run_type"):
+        if hasattr(ti, "dag_run") and hasattr(ti.dag_run, "run_type"):
            from airflow.utils.types import DagRunType
 
            if ti.dag_run.run_type == DagRunType.SCHEDULED:
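
---

The guard-and-fall-through pattern above is the substance of the fix: rather
than branching on the removed AIRFLOW_1 flag, every version-specific attribute
is probed with hasattr (or checked for truthiness) before use, so the same
code runs on Airflow 1 and 2. Below is a minimal standalone sketch of that
pattern, written as a compact getattr loop equivalent to the elif chain in
_get_description. The SimpleNamespace stand-ins are hypothetical and only
mimic the doc attributes of an Airflow BaseOperator; the real helper receives
an actual operator instance.

    from types import SimpleNamespace
    from typing import Optional

    def get_description(task: object) -> Optional[str]:
        # Probe each doc attribute in order; Airflow 1 operators may lack
        # some of these attributes entirely, so every access is guarded by
        # a defaulted getattr instead of a direct attribute read.
        for attr in ("doc", "doc_md", "doc_json", "doc_yaml", "doc_rst"):
            value = getattr(task, attr, None)
            if value:
                return value
        return None

    # Airflow 2-style operator with doc_md set:
    print(get_description(SimpleNamespace(doc=None, doc_md="# task docs")))  # "# task docs"
    # Airflow 1-style operator without any doc attributes:
    print(get_description(SimpleNamespace()))  # None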