From 2a5bf53edb738645643059afa5c43cc935d01126 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Wed, 25 Sep 2024 08:52:32 +0200 Subject: [PATCH 01/15] fix(ingestion/airflow-plugin): added the AllowDenyPattern config for datahub --- docs/lineage/airflow.md | 3 ++- .../src/datahub_airflow_plugin/_config.py | 6 ++++++ .../src/datahub_airflow_plugin/datahub_listener.py | 11 ++++++++--- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md index 65da1fd5251dc..a2837c6220543 100644 --- a/docs/lineage/airflow.md +++ b/docs/lineage/airflow.md @@ -132,7 +132,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default ``` | Name | Default value | Description | -| -------------------------- | -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +|----------------------------|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | enabled | true | If the plugin should be enabled. | | conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. | | cluster | prod | name of the airflow cluster | @@ -144,6 +144,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default | datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. | | | | graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. | +| dag_allow_deny_pattern_str | { "allow": [".*"] } | AllowDenyPattern value in form of JSON string to filter the DAGs from running. | #### Validate that the plugin is working diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index c37a1b334ed37..90c3fe9ffcf12 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -56,6 +56,8 @@ class DatahubLineageConfig(ConfigModel): datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE + dag_allow_deny_pattern_str: str = '{"allow": [".*"]}' + def make_emitter_hook(self) -> "DatahubGenericHook": # This is necessary to avoid issues with circular imports. 
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook @@ -87,6 +89,9 @@ def get_lineage_config() -> DatahubLineageConfig: datajob_url_link = conf.get( "datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value ) + dag_allow_deny_pattern_str = conf.get( + "datahub", "dag_allow_deny_pattern_str", fallback='{"allow": [".*"]}' + ) return DatahubLineageConfig( enabled=enabled, @@ -102,4 +107,5 @@ def get_lineage_config() -> DatahubLineageConfig: debug_emitter=debug_emitter, disable_openlineage_plugin=disable_openlineage_plugin, datajob_url_link=datajob_url_link, + dag_allow_deny_pattern_str=dag_allow_deny_pattern_str, ) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index 123b74fee74b5..a2791bd19a561 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -12,6 +12,7 @@ from airflow.models.serialized_dag import SerializedDagModel from datahub.api.entities.datajob import DataJob from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult +from datahub.configuration.common import AllowDenyPattern from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.ingestion.graph.client import DataHubGraph @@ -687,9 +688,13 @@ def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None: f"DataHub listener got notification about dag run start for {dag_run.dag_id}" ) - self.on_dag_start(dag_run) - - self.emitter.flush() + # convert allow_deny_pattern string to AllowDenyPattern object + dag_allow_deny_pattern_model = AllowDenyPattern.model_validate_json( + self.config.dag_allow_deny_pattern_str + ) + if dag_allow_deny_pattern_model.allowed(dag_run.dag_id): + self.on_dag_start(dag_run) + self.emitter.flush() # TODO: Add hooks for on_dag_run_success, on_dag_run_failed -> call AirflowGenerator.complete_dataflow From 82c7c5c4bd23c7b20db6034a14a0861ea2f1cde0 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Wed, 25 Sep 2024 09:02:55 +0200 Subject: [PATCH 02/15] fix(ingestion/airflow-plugin): fixing failed testcase --- .../src/datahub_airflow_plugin/datahub_listener.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index a2791bd19a561..3b5b9a2841441 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -692,6 +692,8 @@ def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None: dag_allow_deny_pattern_model = AllowDenyPattern.model_validate_json( self.config.dag_allow_deny_pattern_str ) + + assert dag_run.dag_id if dag_allow_deny_pattern_model.allowed(dag_run.dag_id): self.on_dag_start(dag_run) self.emitter.flush() From e787f9cf8d762edc6ff17f4e2943ebcd97ae239e Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Wed, 25 Sep 2024 09:06:04 +0200 Subject: [PATCH 03/15] fix(ingestion/airflow-plugin): fixing failed testcase --- .../src/datahub_airflow_plugin/datahub_listener.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index 3b5b9a2841441..9ca14ecb8a86a 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -689,7 +689,7 @@ def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None: ) # convert allow_deny_pattern string to AllowDenyPattern object - dag_allow_deny_pattern_model = AllowDenyPattern.model_validate_json( + dag_allow_deny_pattern_model = AllowDenyPattern.parse_raw( self.config.dag_allow_deny_pattern_str ) From 152e16b97fa259778bf9f5e31d1f4bfe99512d30 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Tue, 8 Oct 2024 11:55:59 +0200 Subject: [PATCH 04/15] fix(ingestion/airflow-plugin): incorporated review comments and added the test --- .../src/datahub_airflow_plugin/_config.py | 15 +- .../datahub_listener.py | 216 +++++------ .../dags/dag_to_filter_from_ingestion.py | 34 ++ .../v2_dag_to_filter_from_ingestion.json | 337 ++++++++++++++++++ .../tests/integration/test_plugin.py | 10 + 5 files changed, 502 insertions(+), 110 deletions(-) create mode 100644 metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_filter_from_ingestion.py create mode 100644 metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_dag_to_filter_from_ingestion.json diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index 90c3fe9ffcf12..97700a6bc11c5 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -1,9 +1,11 @@ from enum import Enum from typing import TYPE_CHECKING, Optional +from pydantic.fields import Field + import datahub.emitter.mce_builder as builder from airflow.configuration import conf -from datahub.configuration.common import ConfigModel +from datahub.configuration.common import ConfigModel, AllowDenyPattern if TYPE_CHECKING: from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook @@ -56,7 +58,10 @@ class DatahubLineageConfig(ConfigModel): datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE - dag_allow_deny_pattern_str: str = '{"allow": [".*"]}' + dag_allow_deny_pattern: AllowDenyPattern = Field( + default=AllowDenyPattern.allow_all(), + description="regex patterns for DAGs to ingest", + ) def make_emitter_hook(self) -> "DatahubGenericHook": # This is necessary to avoid issues with circular imports. 
@@ -89,8 +94,8 @@ def get_lineage_config() -> DatahubLineageConfig: datajob_url_link = conf.get( "datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value ) - dag_allow_deny_pattern_str = conf.get( - "datahub", "dag_allow_deny_pattern_str", fallback='{"allow": [".*"]}' + dag_allow_deny_pattern = AllowDenyPattern.parse_raw( + conf.get("datahub", "dag_allow_deny_pattern", fallback='{"allow": [".*"]}') ) return DatahubLineageConfig( @@ -107,5 +112,5 @@ def get_lineage_config() -> DatahubLineageConfig: debug_emitter=debug_emitter, disable_openlineage_plugin=disable_openlineage_plugin, datajob_url_link=datajob_url_link, - dag_allow_deny_pattern_str=dag_allow_deny_pattern_str, + dag_allow_deny_pattern=dag_allow_deny_pattern, ) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index 9ca14ecb8a86a..8fc02131f4c96 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -12,7 +12,6 @@ from airflow.models.serialized_dag import SerializedDagModel from datahub.api.entities.datajob import DataJob from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult -from datahub.configuration.common import AllowDenyPattern from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.ingestion.graph.client import DataHubGraph @@ -384,95 +383,100 @@ def on_task_instance_running( return logger.debug( - f"DataHub listener got notification about task instance start for {task_instance.task_id}" + f"DataHub listener got notification about task instance start for {task_instance.task_id} of dag {task_instance.dag_run.dag_id}" ) - task_instance = _render_templates(task_instance) + if self.config.dag_allow_deny_pattern.allowed(task_instance.dag_run.dag_id): + task_instance = _render_templates(task_instance) - # The type ignore is to placate mypy on Airflow 2.1.x. - dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined] - task = task_instance.task - assert task is not None - dag: "DAG" = task.dag # type: ignore[assignment] + # The type ignore is to placate mypy on Airflow 2.1.x. + dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined] + task = task_instance.task + assert task is not None + dag: "DAG" = task.dag # type: ignore[assignment] - self._task_holder.set_task(task_instance) + self._task_holder.set_task(task_instance) - # Handle async operators in Airflow 2.3 by skipping deferred state. - # Inspired by https://github.com/OpenLineage/OpenLineage/pull/1601 - if task_instance.next_method is not None: # type: ignore[attr-defined] - return + # Handle async operators in Airflow 2.3 by skipping deferred state. + # Inspired by https://github.com/OpenLineage/OpenLineage/pull/1601 + if task_instance.next_method is not None: # type: ignore[attr-defined] + return - # If we don't have the DAG listener API, we just pretend that - # the start of the task is the start of the DAG. - # This generates duplicate events, but it's better than not - # generating anything. - if not HAS_AIRFLOW_DAG_LISTENER_API: - self.on_dag_start(dagrun) + # If we don't have the DAG listener API, we just pretend that + # the start of the task is the start of the DAG. 
+ # This generates duplicate events, but it's better than not + # generating anything. + if not HAS_AIRFLOW_DAG_LISTENER_API: + self.on_dag_start(dagrun) - datajob = AirflowGenerator.generate_datajob( - cluster=self.config.cluster, - task=task, - dag=dag, - capture_tags=self.config.capture_tags_info, - capture_owner=self.config.capture_ownership_info, - config=self.config, - ) + datajob = AirflowGenerator.generate_datajob( + cluster=self.config.cluster, + task=task, + dag=dag, + capture_tags=self.config.capture_tags_info, + capture_owner=self.config.capture_ownership_info, + config=self.config, + ) - # TODO: Make use of get_task_location to extract github urls. + # TODO: Make use of get_task_location to extract github urls. - # Add lineage info. - self._extract_lineage(datajob, dagrun, task, task_instance) + # Add lineage info. + self._extract_lineage(datajob, dagrun, task, task_instance) - # TODO: Add handling for Airflow mapped tasks using task_instance.map_index + # TODO: Add handling for Airflow mapped tasks using task_instance.map_index - for mcp in datajob.generate_mcp( - materialize_iolets=self.config.materialize_iolets - ): - self.emitter.emit(mcp, self._make_emit_callback()) - logger.debug(f"Emitted DataHub Datajob start: {datajob}") + for mcp in datajob.generate_mcp( + materialize_iolets=self.config.materialize_iolets + ): + self.emitter.emit(mcp, self._make_emit_callback()) + logger.debug(f"Emitted DataHub Datajob start: {datajob}") - if self.config.capture_executions: - dpi = AirflowGenerator.run_datajob( - emitter=self.emitter, - config=self.config, - ti=task_instance, - dag=dag, - dag_run=dagrun, - datajob=datajob, - emit_templates=False, - ) - logger.debug(f"Emitted DataHub DataProcess Instance start: {dpi}") + if self.config.capture_executions: + dpi = AirflowGenerator.run_datajob( + emitter=self.emitter, + config=self.config, + ti=task_instance, + dag=dag, + dag_run=dagrun, + datajob=datajob, + emit_templates=False, + ) + logger.debug(f"Emitted DataHub DataProcess Instance start: {dpi}") - self.emitter.flush() + self.emitter.flush() - logger.debug( - f"DataHub listener finished processing notification about task instance start for {task_instance.task_id}" - ) + logger.debug( + f"DataHub listener finished processing notification about task instance start for {task_instance.task_id}" + ) - if self.config.materialize_iolets: - for outlet in datajob.outlets: - reported_time: int = int(time.time() * 1000) - operation = OperationClass( - timestampMillis=reported_time, - operationType=OperationTypeClass.CREATE, - lastUpdatedTimestamp=reported_time, - actor=builder.make_user_urn("airflow"), - ) + if self.config.materialize_iolets: + for outlet in datajob.outlets: + reported_time: int = int(time.time() * 1000) + operation = OperationClass( + timestampMillis=reported_time, + operationType=OperationTypeClass.CREATE, + lastUpdatedTimestamp=reported_time, + actor=builder.make_user_urn("airflow"), + ) - operation_mcp = MetadataChangeProposalWrapper( - entityUrn=str(outlet), aspect=operation - ) + operation_mcp = MetadataChangeProposalWrapper( + entityUrn=str(outlet), aspect=operation + ) - self.emitter.emit(operation_mcp) - logger.debug(f"Emitted Dataset Operation: {outlet}") + self.emitter.emit(operation_mcp) + logger.debug(f"Emitted Dataset Operation: {outlet}") + else: + if self.graph: + for outlet in datajob.outlets: + if not self.graph.exists(str(outlet)): + logger.warning(f"Dataset {str(outlet)} not materialized") + for inlet in datajob.inlets: + if not 
self.graph.exists(str(inlet)): + logger.warning(f"Dataset {str(inlet)} not materialized") else: - if self.graph: - for outlet in datajob.outlets: - if not self.graph.exists(str(outlet)): - logger.warning(f"Dataset {str(outlet)} not materialized") - for inlet in datajob.inlets: - if not self.graph.exists(str(inlet)): - logger.warning(f"Dataset {str(inlet)} not materialized") + logger.debug( + f"DAG {task_instance.dag_run.dag_id} is not allowed by the pattern" + ) def on_task_instance_finish( self, task_instance: "TaskInstance", status: InstanceRunResult @@ -491,40 +495,45 @@ def on_task_instance_finish( dag: "DAG" = task.dag # type: ignore[assignment] - datajob = AirflowGenerator.generate_datajob( - cluster=self.config.cluster, - task=task, - dag=dag, - capture_tags=self.config.capture_tags_info, - capture_owner=self.config.capture_ownership_info, - config=self.config, - ) - - # Add lineage info. - self._extract_lineage(datajob, dagrun, task, task_instance, complete=True) - - for mcp in datajob.generate_mcp( - materialize_iolets=self.config.materialize_iolets - ): - self.emitter.emit(mcp, self._make_emit_callback()) - logger.debug(f"Emitted DataHub Datajob finish w/ status {status}: {datajob}") - - if self.config.capture_executions: - dpi = AirflowGenerator.complete_datajob( - emitter=self.emitter, + if self.config.dag_allow_deny_pattern.allowed(dag.dag_id): + datajob = AirflowGenerator.generate_datajob( cluster=self.config.cluster, - ti=task_instance, + task=task, dag=dag, - dag_run=dagrun, - datajob=datajob, - result=status, + capture_tags=self.config.capture_tags_info, + capture_owner=self.config.capture_ownership_info, config=self.config, ) + + # Add lineage info. + self._extract_lineage(datajob, dagrun, task, task_instance, complete=True) + + for mcp in datajob.generate_mcp( + materialize_iolets=self.config.materialize_iolets + ): + self.emitter.emit(mcp, self._make_emit_callback()) logger.debug( - f"Emitted DataHub DataProcess Instance with status {status}: {dpi}" + f"Emitted DataHub Datajob finish w/ status {status}: {datajob}" ) - self.emitter.flush() + if self.config.capture_executions: + dpi = AirflowGenerator.complete_datajob( + emitter=self.emitter, + cluster=self.config.cluster, + ti=task_instance, + dag=dag, + dag_run=dagrun, + datajob=datajob, + result=status, + config=self.config, + ) + logger.debug( + f"Emitted DataHub DataProcess Instance with status {status}: {dpi}" + ) + + self.emitter.flush() + else: + logger.debug(f"DAG {dag.dag_id} is not allowed by the pattern") @hookimpl @run_in_thread @@ -688,15 +697,12 @@ def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None: f"DataHub listener got notification about dag run start for {dag_run.dag_id}" ) - # convert allow_deny_pattern string to AllowDenyPattern object - dag_allow_deny_pattern_model = AllowDenyPattern.parse_raw( - self.config.dag_allow_deny_pattern_str - ) - assert dag_run.dag_id - if dag_allow_deny_pattern_model.allowed(dag_run.dag_id): + if self.config.dag_allow_deny_pattern.allowed(dag_run.dag_id): self.on_dag_start(dag_run) self.emitter.flush() + else: + logger.debug(f"DAG {dag_run.dag_id} is not allowed by the pattern") # TODO: Add hooks for on_dag_run_success, on_dag_run_failed -> call AirflowGenerator.complete_dataflow diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_filter_from_ingestion.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_filter_from_ingestion.py new file mode 100644 index 0000000000000..811278366ceba --- /dev/null +++ 
b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_filter_from_ingestion.py @@ -0,0 +1,34 @@ +from datetime import datetime + +from airflow import DAG +from airflow.operators.bash import BashOperator + +from datahub_airflow_plugin.entities import Dataset, Urn + +with DAG( + "dag_to_filter_from_ingestion", + start_date=datetime(2023, 1, 1), + schedule_interval=None, + catchup=False, +) as dag: + task1 = BashOperator( + task_id="task_dag_to_filter_from_ingestion_task_1", + dag=dag, + bash_command="echo 'task_dag_to_filter_from_ingestion_task_1'", + inlets=[ + Dataset(platform="snowflake", name="mydb.schema.tableA"), + Urn( + "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)" + ), + Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"), + ], + outlets=[Dataset("snowflake", "mydb.schema.tableD")], + ) + + task2 = BashOperator( + task_id="task_dag_to_filter_from_ingestion_task_2", + dag=dag, + bash_command="echo 'task_dag_to_filter_from_ingestion_task_2'", + ) + + task1 >> task2 diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_dag_to_filter_from_ingestion.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_dag_to_filter_from_ingestion.json new file mode 100644 index 0000000000000..826e94555127c --- /dev/null +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_dag_to_filter_from_ingestion.json @@ -0,0 +1,337 @@ +[ +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": { + "_access_control": "None", + "catchup": "False", + "description": "None", + "doc_md": "None", + "fileloc": "", + "is_paused_upon_creation": "None", + "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", + "tags": "[]", + "timezone": "Timezone('UTC')" + }, + "externalUrl": "http://airflow.example.com/tree?dag_id=dag_to_filter_from_ingestion", + "name": "dag_to_filter_from_ingestion", + "env": "PROD" + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'task_dag_to_filter_from_ingestion_task_2'", + "execution_timeout": "None", + "sla": "None", + "task_id": "'task_dag_to_filter_from_ingestion_task_2'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": 
"[]", + "inlets": "[]", + "outlets": "[]" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=dag_to_filter_from_ingestion&_flt_3_task_id=task_dag_to_filter_from_ingestion_task_2", + "name": "task_dag_to_filter_from_ingestion_task_2", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_1)" + ], + "fineGrainedLineages": [] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'task_dag_to_filter_from_ingestion_task_2'", + "execution_timeout": "None", + "sla": "None", + "task_id": "'task_dag_to_filter_from_ingestion_task_2'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": "[]", + "inlets": "[]", + "outlets": "[]" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=dag_to_filter_from_ingestion&_flt_3_task_id=task_dag_to_filter_from_ingestion_task_2", + "name": "task_dag_to_filter_from_ingestion_task_2", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_1)" + ], + "fineGrainedLineages": [] + } + } +}, +{ + "entityType": 
"dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d9d4c9c0c7114d033f2def8f9ac00991", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "run_id": "manual_run_test", + "duration": "", + "start_date": "", + "end_date": "", + "execution_date": "2023-09-27 21:34:38+00:00", + "try_number": "1", + "max_tries": "0", + "external_executor_id": "None", + "state": "success", + "operator": "BashOperator", + "priority_weight": "1", + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_dag_to_filter_from_ingestion_task_2&dag_id=dag_to_filter_from_ingestion&map_index=-1", + "orchestrator": "airflow", + "dag_id": "dag_to_filter_from_ingestion", + "task_id": "task_dag_to_filter_from_ingestion_task_2" + }, + "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_dag_to_filter_from_ingestion_task_2&dag_id=dag_to_filter_from_ingestion&map_index=-1", + "name": "dag_to_filter_from_ingestion_task_dag_to_filter_from_ingestion_task_2_manual_run_test", + "type": "BATCH_AD_HOC", + "created": { + "time": 1728377976166, + "actor": "urn:li:corpuser:datahub" + } + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d9d4c9c0c7114d033f2def8f9ac00991", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", + "upstreamInstances": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d9d4c9c0c7114d033f2def8f9ac00991", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1728377976166, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "STARTED", + "attempt": 2 + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d9d4c9c0c7114d033f2def8f9ac00991", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1728377976307, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "airflow" + } + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py index 37cd3b792d535..7d3b1aa2f16ef 100644 --- 
a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py @@ -140,6 +140,9 @@ def _run_airflow( # Configure the datahub plugin and have it write the MCPs to a file. "AIRFLOW__CORE__LAZY_LOAD_PLUGINS": "False" if is_v1 else "True", "AIRFLOW__DATAHUB__CONN_ID": datahub_connection_name, + "AIRFLOW__DATAHUB__DAG_ALLOW_DENY_PATTERN": str( + '{ "deny": ["{dag_to_filter_from_ingestion}"] }' + ), f"AIRFLOW_CONN_{datahub_connection_name.upper()}": Connection( conn_id="datahub_file_default", conn_type="datahub-file", @@ -276,6 +279,7 @@ class DagTestCase: test_cases = [ DagTestCase("simple_dag"), DagTestCase("basic_iolets"), + DagTestCase("dag_to_filter_from_ingestion"), DagTestCase("snowflake_operator", success=False, v2_only=True), DagTestCase("sqlite_operator", v2_only=True), DagTestCase("custom_operator_dag", v2_only=True), @@ -375,6 +379,12 @@ def test_airflow_plugin( _sanitize_output_file(airflow_instance.metadata_file) + """ + Golden file will get generated for the `filtered DAG` + but this golden file will not having anything with respected + to ingested properties, as nothing will be ingested from the filtered DAG + """ + check_golden_file( pytestconfig=pytestconfig, output_path=airflow_instance.metadata_file, From 40074421ba2128e42e3f261d9e3e8de4ea5ecb60 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Tue, 8 Oct 2024 12:04:20 +0200 Subject: [PATCH 05/15] fix(ingestion/airflow-plugin): linter issue --- .../airflow-plugin/src/datahub_airflow_plugin/_config.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index 97700a6bc11c5..5729576aabd0f 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -1,11 +1,10 @@ from enum import Enum from typing import TYPE_CHECKING, Optional -from pydantic.fields import Field - import datahub.emitter.mce_builder as builder from airflow.configuration import conf -from datahub.configuration.common import ConfigModel, AllowDenyPattern +from datahub.configuration.common import AllowDenyPattern, ConfigModel +from pydantic.fields import Field if TYPE_CHECKING: from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook From 46fadcd1711527266e6333834fad3275964fe469 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Tue, 8 Oct 2024 13:31:19 +0200 Subject: [PATCH 06/15] fix(ingestion/airflow-plugin): fixed failing tests --- .../v2_dag_to_filter_from_ingestion.json | 337 ------------------ .../tests/integration/test_plugin.py | 44 ++- 2 files changed, 20 insertions(+), 361 deletions(-) delete mode 100644 metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_dag_to_filter_from_ingestion.json diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_dag_to_filter_from_ingestion.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_dag_to_filter_from_ingestion.json deleted file mode 100644 index 826e94555127c..0000000000000 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_dag_to_filter_from_ingestion.json +++ /dev/null @@ -1,337 +0,0 @@ -[ -{ - "entityType": "dataFlow", - "entityUrn": "urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod)", - 
"changeType": "UPSERT", - "aspectName": "dataFlowInfo", - "aspect": { - "json": { - "customProperties": { - "_access_control": "None", - "catchup": "False", - "description": "None", - "doc_md": "None", - "fileloc": "", - "is_paused_upon_creation": "None", - "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", - "tags": "[]", - "timezone": "Timezone('UTC')" - }, - "externalUrl": "http://airflow.example.com/tree?dag_id=dag_to_filter_from_ingestion", - "name": "dag_to_filter_from_ingestion", - "env": "PROD" - } - } -}, -{ - "entityType": "dataFlow", - "entityUrn": "urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - } -}, -{ - "entityType": "dataFlow", - "entityUrn": "urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod)", - "changeType": "UPSERT", - "aspectName": "ownership", - "aspect": { - "json": { - "owners": [ - { - "owner": "urn:li:corpuser:airflow", - "type": "DEVELOPER", - "source": { - "type": "SERVICE" - } - } - ], - "ownerTypes": {}, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:airflow" - } - } - } -}, -{ - "entityType": "dataFlow", - "entityUrn": "urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod)", - "changeType": "UPSERT", - "aspectName": "globalTags", - "aspect": { - "json": { - "tags": [] - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", - "changeType": "UPSERT", - "aspectName": "dataJobInfo", - "aspect": { - "json": { - "customProperties": { - "depends_on_past": "False", - "email": "None", - "label": "'task_dag_to_filter_from_ingestion_task_2'", - "execution_timeout": "None", - "sla": "None", - "task_id": "'task_dag_to_filter_from_ingestion_task_2'", - "trigger_rule": "", - "wait_for_downstream": "False", - "downstream_task_ids": "[]", - "inlets": "[]", - "outlets": "[]" - }, - "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=dag_to_filter_from_ingestion&_flt_3_task_id=task_dag_to_filter_from_ingestion_task_2", - "name": "task_dag_to_filter_from_ingestion_task_2", - "type": { - "string": "COMMAND" - }, - "env": "PROD" - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", - "changeType": "UPSERT", - "aspectName": "dataJobInputOutput", - "aspect": { - "json": { - "inputDatasets": [], - "outputDatasets": [], - "inputDatajobs": [ - "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_1)" - ], - "fineGrainedLineages": [] - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", - "changeType": "UPSERT", - "aspectName": "ownership", - "aspect": { - "json": { - "owners": [ - { - "owner": "urn:li:corpuser:airflow", - "type": "DEVELOPER", - "source": { - "type": "SERVICE" - } - } - ], - "ownerTypes": {}, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:airflow" - } - } - } -}, -{ - 
"entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", - "changeType": "UPSERT", - "aspectName": "globalTags", - "aspect": { - "json": { - "tags": [] - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", - "changeType": "UPSERT", - "aspectName": "dataJobInfo", - "aspect": { - "json": { - "customProperties": { - "depends_on_past": "False", - "email": "None", - "label": "'task_dag_to_filter_from_ingestion_task_2'", - "execution_timeout": "None", - "sla": "None", - "task_id": "'task_dag_to_filter_from_ingestion_task_2'", - "trigger_rule": "", - "wait_for_downstream": "False", - "downstream_task_ids": "[]", - "inlets": "[]", - "outlets": "[]" - }, - "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=dag_to_filter_from_ingestion&_flt_3_task_id=task_dag_to_filter_from_ingestion_task_2", - "name": "task_dag_to_filter_from_ingestion_task_2", - "type": { - "string": "COMMAND" - }, - "env": "PROD" - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", - "changeType": "UPSERT", - "aspectName": "dataJobInputOutput", - "aspect": { - "json": { - "inputDatasets": [], - "outputDatasets": [], - "inputDatajobs": [ - "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_1)" - ], - "fineGrainedLineages": [] - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", - "changeType": "UPSERT", - "aspectName": "ownership", - "aspect": { - "json": { - "owners": [ - { - "owner": "urn:li:corpuser:airflow", - "type": "DEVELOPER", - "source": { - "type": "SERVICE" - } - } - ], - "ownerTypes": {}, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:airflow" - } - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", - "changeType": "UPSERT", - "aspectName": "globalTags", - "aspect": { - "json": { - "tags": [] - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:d9d4c9c0c7114d033f2def8f9ac00991", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceProperties", - "aspect": { - "json": { - "customProperties": { - "run_id": "manual_run_test", - "duration": "", - "start_date": "", - "end_date": "", - "execution_date": "2023-09-27 21:34:38+00:00", - "try_number": "1", - "max_tries": "0", - "external_executor_id": "None", - "state": "success", - "operator": "BashOperator", - "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_dag_to_filter_from_ingestion_task_2&dag_id=dag_to_filter_from_ingestion&map_index=-1", - "orchestrator": "airflow", - "dag_id": "dag_to_filter_from_ingestion", - "task_id": "task_dag_to_filter_from_ingestion_task_2" - }, - 
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_dag_to_filter_from_ingestion_task_2&dag_id=dag_to_filter_from_ingestion&map_index=-1", - "name": "dag_to_filter_from_ingestion_task_dag_to_filter_from_ingestion_task_2_manual_run_test", - "type": "BATCH_AD_HOC", - "created": { - "time": 1728377976166, - "actor": "urn:li:corpuser:datahub" - } - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:d9d4c9c0c7114d033f2def8f9ac00991", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRelationships", - "aspect": { - "json": { - "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_to_filter_from_ingestion,prod),task_dag_to_filter_from_ingestion_task_2)", - "upstreamInstances": [] - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:d9d4c9c0c7114d033f2def8f9ac00991", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRunEvent", - "aspect": { - "json": { - "timestampMillis": 1728377976166, - "partitionSpec": { - "partition": "FULL_TABLE_SNAPSHOT", - "type": "FULL_TABLE" - }, - "status": "STARTED", - "attempt": 2 - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:d9d4c9c0c7114d033f2def8f9ac00991", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRunEvent", - "aspect": { - "json": { - "timestampMillis": 1728377976307, - "partitionSpec": { - "partition": "FULL_TABLE_SNAPSHOT", - "type": "FULL_TABLE" - }, - "status": "COMPLETE", - "result": { - "type": "SUCCESS", - "nativeResultType": "airflow" - } - } - } -} -] \ No newline at end of file diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py index 7d3b1aa2f16ef..f83d0211819ad 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py @@ -33,6 +33,8 @@ DAGS_FOLDER = pathlib.Path(__file__).parent / "dags" GOLDENS_FOLDER = pathlib.Path(__file__).parent / "goldens" +NAME_OF_DAG_TO_FILTER_FROM_INGESTION = "dag_to_filter_from_ingestion" + @dataclasses.dataclass class AirflowInstance: @@ -140,9 +142,7 @@ def _run_airflow( # Configure the datahub plugin and have it write the MCPs to a file. 
"AIRFLOW__CORE__LAZY_LOAD_PLUGINS": "False" if is_v1 else "True", "AIRFLOW__DATAHUB__CONN_ID": datahub_connection_name, - "AIRFLOW__DATAHUB__DAG_ALLOW_DENY_PATTERN": str( - '{ "deny": ["{dag_to_filter_from_ingestion}"] }' - ), + "AIRFLOW__DATAHUB__DAG_ALLOW_DENY_PATTERN": f'{{ "deny": ["{NAME_OF_DAG_TO_FILTER_FROM_INGESTION}"] }}', f"AIRFLOW_CONN_{datahub_connection_name.upper()}": Connection( conn_id="datahub_file_default", conn_type="datahub-file", @@ -279,7 +279,7 @@ class DagTestCase: test_cases = [ DagTestCase("simple_dag"), DagTestCase("basic_iolets"), - DagTestCase("dag_to_filter_from_ingestion"), + DagTestCase("dag_to_filter_from_ingestion", v2_only=True), DagTestCase("snowflake_operator", success=False, v2_only=True), DagTestCase("sqlite_operator", v2_only=True), DagTestCase("custom_operator_dag", v2_only=True), @@ -377,26 +377,22 @@ def test_airflow_plugin( print("Sleeping for a few seconds to let the plugin finish...") time.sleep(10) - _sanitize_output_file(airflow_instance.metadata_file) - - """ - Golden file will get generated for the `filtered DAG` - but this golden file will not having anything with respected - to ingested properties, as nothing will be ingested from the filtered DAG - """ - - check_golden_file( - pytestconfig=pytestconfig, - output_path=airflow_instance.metadata_file, - golden_path=golden_path, - ignore_paths=[ - # TODO: If we switched to Git urls, maybe we could get this to work consistently. - r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['datahub_sql_parser_error'\]", - r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['openlineage_.*'\]", - r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['log_url'\]", - r"root\[\d+\]\['aspect'\]\['json'\]\['externalUrl'\]", - ], - ) + # Golden file will get generated for the `filtered DAG` + if dag_id != NAME_OF_DAG_TO_FILTER_FROM_INGESTION: + _sanitize_output_file(airflow_instance.metadata_file) + + check_golden_file( + pytestconfig=pytestconfig, + output_path=airflow_instance.metadata_file, + golden_path=golden_path, + ignore_paths=[ + # TODO: If we switched to Git urls, maybe we could get this to work consistently. 
+ r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['datahub_sql_parser_error'\]", + r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['openlineage_.*'\]", + r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['log_url'\]", + r"root\[\d+\]\['aspect'\]\['json'\]\['externalUrl'\]", + ], + ) def _sanitize_output_file(output_path: pathlib.Path) -> None: From 0807f52fbe9949de2073d875c569a6c83cd21c14 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Tue, 8 Oct 2024 13:40:55 +0200 Subject: [PATCH 07/15] fix(ingestion/airflow-plugin): typo --- .../airflow-plugin/tests/integration/test_plugin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py index f83d0211819ad..e887898b72e0c 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py @@ -377,7 +377,7 @@ def test_airflow_plugin( print("Sleeping for a few seconds to let the plugin finish...") time.sleep(10) - # Golden file will get generated for the `filtered DAG` + # Golden file will NOT get generated for the `filtered DAG` if dag_id != NAME_OF_DAG_TO_FILTER_FROM_INGESTION: _sanitize_output_file(airflow_instance.metadata_file) From 7b5007357c03ea614dbb842160398b0fcab4adb8 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Wed, 9 Oct 2024 08:43:24 +0200 Subject: [PATCH 08/15] fix(ingestion/airflow-plugin): review comments --- docs/lineage/airflow.md | 2 +- .../src/datahub_airflow_plugin/_config.py | 8 +- .../datahub_listener.py | 225 +++++++++--------- ...stion.py => dag_to_skip_from_ingestion.py} | 6 +- .../tests/integration/test_plugin.py | 12 +- 5 files changed, 130 insertions(+), 123 deletions(-) rename metadata-ingestion-modules/airflow-plugin/tests/integration/dags/{dag_to_filter_from_ingestion.py => dag_to_skip_from_ingestion.py} (85%) diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md index a2837c6220543..17edd3fc5e175 100644 --- a/docs/lineage/airflow.md +++ b/docs/lineage/airflow.md @@ -144,7 +144,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default | datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. | | | | graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. | -| dag_allow_deny_pattern_str | { "allow": [".*"] } | AllowDenyPattern value in form of JSON string to filter the DAGs from running. | +| dag_filter_str | { "allow": [".*"] } | AllowDenyPattern value in form of JSON string to filter the DAGs from running. 
| #### Validate that the plugin is working diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index 5729576aabd0f..3f3c49a543d8b 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -57,7 +57,7 @@ class DatahubLineageConfig(ConfigModel): datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE - dag_allow_deny_pattern: AllowDenyPattern = Field( + dag_filter_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), description="regex patterns for DAGs to ingest", ) @@ -93,8 +93,8 @@ def get_lineage_config() -> DatahubLineageConfig: datajob_url_link = conf.get( "datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value ) - dag_allow_deny_pattern = AllowDenyPattern.parse_raw( - conf.get("datahub", "dag_allow_deny_pattern", fallback='{"allow": [".*"]}') + dag_filter_pattern = AllowDenyPattern.parse_raw( + conf.get("datahub", "dag_filter_str", fallback='{"allow": [".*"]}') ) return DatahubLineageConfig( @@ -111,5 +111,5 @@ def get_lineage_config() -> DatahubLineageConfig: debug_emitter=debug_emitter, disable_openlineage_plugin=disable_openlineage_plugin, datajob_url_link=datajob_url_link, - dag_allow_deny_pattern=dag_allow_deny_pattern, + dag_filter_pattern=dag_filter_pattern, ) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index 8fc02131f4c96..76d5b8dfec241 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -386,97 +386,98 @@ def on_task_instance_running( f"DataHub listener got notification about task instance start for {task_instance.task_id} of dag {task_instance.dag_run.dag_id}" ) - if self.config.dag_allow_deny_pattern.allowed(task_instance.dag_run.dag_id): - task_instance = _render_templates(task_instance) - - # The type ignore is to placate mypy on Airflow 2.1.x. - dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined] - task = task_instance.task - assert task is not None - dag: "DAG" = task.dag # type: ignore[assignment] + if not self.config.dag_filter_pattern.allowed(task_instance.dag_run.dag_id): + logger.debug( + f"DAG {task_instance.dag_run.dag_id} is not allowed by the pattern" + ) + return - self._task_holder.set_task(task_instance) + task_instance = _render_templates(task_instance) - # Handle async operators in Airflow 2.3 by skipping deferred state. - # Inspired by https://github.com/OpenLineage/OpenLineage/pull/1601 - if task_instance.next_method is not None: # type: ignore[attr-defined] - return + # The type ignore is to placate mypy on Airflow 2.1.x. + dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined] + task = task_instance.task + assert task is not None + dag: "DAG" = task.dag # type: ignore[assignment] - # If we don't have the DAG listener API, we just pretend that - # the start of the task is the start of the DAG. - # This generates duplicate events, but it's better than not - # generating anything. 
- if not HAS_AIRFLOW_DAG_LISTENER_API: - self.on_dag_start(dagrun) + self._task_holder.set_task(task_instance) - datajob = AirflowGenerator.generate_datajob( - cluster=self.config.cluster, - task=task, - dag=dag, - capture_tags=self.config.capture_tags_info, - capture_owner=self.config.capture_ownership_info, - config=self.config, - ) + # Handle async operators in Airflow 2.3 by skipping deferred state. + # Inspired by https://github.com/OpenLineage/OpenLineage/pull/1601 + if task_instance.next_method is not None: # type: ignore[attr-defined] + return - # TODO: Make use of get_task_location to extract github urls. + # If we don't have the DAG listener API, we just pretend that + # the start of the task is the start of the DAG. + # This generates duplicate events, but it's better than not + # generating anything. + if not HAS_AIRFLOW_DAG_LISTENER_API: + self.on_dag_start(dagrun) - # Add lineage info. - self._extract_lineage(datajob, dagrun, task, task_instance) + datajob = AirflowGenerator.generate_datajob( + cluster=self.config.cluster, + task=task, + dag=dag, + capture_tags=self.config.capture_tags_info, + capture_owner=self.config.capture_ownership_info, + config=self.config, + ) - # TODO: Add handling for Airflow mapped tasks using task_instance.map_index + # TODO: Make use of get_task_location to extract github urls. - for mcp in datajob.generate_mcp( - materialize_iolets=self.config.materialize_iolets - ): - self.emitter.emit(mcp, self._make_emit_callback()) - logger.debug(f"Emitted DataHub Datajob start: {datajob}") + # Add lineage info. + self._extract_lineage(datajob, dagrun, task, task_instance) - if self.config.capture_executions: - dpi = AirflowGenerator.run_datajob( - emitter=self.emitter, - config=self.config, - ti=task_instance, - dag=dag, - dag_run=dagrun, - datajob=datajob, - emit_templates=False, - ) - logger.debug(f"Emitted DataHub DataProcess Instance start: {dpi}") + # TODO: Add handling for Airflow mapped tasks using task_instance.map_index - self.emitter.flush() + for mcp in datajob.generate_mcp( + materialize_iolets=self.config.materialize_iolets + ): + self.emitter.emit(mcp, self._make_emit_callback()) + logger.debug(f"Emitted DataHub Datajob start: {datajob}") - logger.debug( - f"DataHub listener finished processing notification about task instance start for {task_instance.task_id}" + if self.config.capture_executions: + dpi = AirflowGenerator.run_datajob( + emitter=self.emitter, + config=self.config, + ti=task_instance, + dag=dag, + dag_run=dagrun, + datajob=datajob, + emit_templates=False, ) + logger.debug(f"Emitted DataHub DataProcess Instance start: {dpi}") - if self.config.materialize_iolets: - for outlet in datajob.outlets: - reported_time: int = int(time.time() * 1000) - operation = OperationClass( - timestampMillis=reported_time, - operationType=OperationTypeClass.CREATE, - lastUpdatedTimestamp=reported_time, - actor=builder.make_user_urn("airflow"), - ) + self.emitter.flush() - operation_mcp = MetadataChangeProposalWrapper( - entityUrn=str(outlet), aspect=operation - ) + logger.debug( + f"DataHub listener finished processing notification about task instance start for {task_instance.task_id}" + ) - self.emitter.emit(operation_mcp) - logger.debug(f"Emitted Dataset Operation: {outlet}") - else: - if self.graph: - for outlet in datajob.outlets: - if not self.graph.exists(str(outlet)): - logger.warning(f"Dataset {str(outlet)} not materialized") - for inlet in datajob.inlets: - if not self.graph.exists(str(inlet)): - logger.warning(f"Dataset {str(inlet)} not 
materialized") + if self.config.materialize_iolets: + for outlet in datajob.outlets: + reported_time: int = int(time.time() * 1000) + operation = OperationClass( + timestampMillis=reported_time, + operationType=OperationTypeClass.CREATE, + lastUpdatedTimestamp=reported_time, + actor=builder.make_user_urn("airflow"), + ) + + operation_mcp = MetadataChangeProposalWrapper( + entityUrn=str(outlet), aspect=operation + ) + + self.emitter.emit(operation_mcp) + logger.debug(f"Emitted Dataset Operation: {outlet}") else: - logger.debug( - f"DAG {task_instance.dag_run.dag_id} is not allowed by the pattern" - ) + if self.graph: + for outlet in datajob.outlets: + if not self.graph.exists(str(outlet)): + logger.warning(f"Dataset {str(outlet)} not materialized") + for inlet in datajob.inlets: + if not self.graph.exists(str(inlet)): + logger.warning(f"Dataset {str(inlet)} not materialized") def on_task_instance_finish( self, task_instance: "TaskInstance", status: InstanceRunResult @@ -495,45 +496,46 @@ def on_task_instance_finish( dag: "DAG" = task.dag # type: ignore[assignment] - if self.config.dag_allow_deny_pattern.allowed(dag.dag_id): - datajob = AirflowGenerator.generate_datajob( + if not self.config.dag_filter_pattern.allowed(dag.dag_id): + logger.debug(f"DAG {dag.dag_id} is not allowed by the pattern") + return + + datajob = AirflowGenerator.generate_datajob( + cluster=self.config.cluster, + task=task, + dag=dag, + capture_tags=self.config.capture_tags_info, + capture_owner=self.config.capture_ownership_info, + config=self.config, + ) + + # Add lineage info. + self._extract_lineage(datajob, dagrun, task, task_instance, complete=True) + + for mcp in datajob.generate_mcp( + materialize_iolets=self.config.materialize_iolets + ): + self.emitter.emit(mcp, self._make_emit_callback()) + logger.debug( + f"Emitted DataHub Datajob finish w/ status {status}: {datajob}" + ) + + if self.config.capture_executions: + dpi = AirflowGenerator.complete_datajob( + emitter=self.emitter, cluster=self.config.cluster, - task=task, + ti=task_instance, dag=dag, - capture_tags=self.config.capture_tags_info, - capture_owner=self.config.capture_ownership_info, + dag_run=dagrun, + datajob=datajob, + result=status, config=self.config, ) - - # Add lineage info. 
- self._extract_lineage(datajob, dagrun, task, task_instance, complete=True) - - for mcp in datajob.generate_mcp( - materialize_iolets=self.config.materialize_iolets - ): - self.emitter.emit(mcp, self._make_emit_callback()) logger.debug( - f"Emitted DataHub Datajob finish w/ status {status}: {datajob}" + f"Emitted DataHub DataProcess Instance with status {status}: {dpi}" ) - if self.config.capture_executions: - dpi = AirflowGenerator.complete_datajob( - emitter=self.emitter, - cluster=self.config.cluster, - ti=task_instance, - dag=dag, - dag_run=dagrun, - datajob=datajob, - result=status, - config=self.config, - ) - logger.debug( - f"Emitted DataHub DataProcess Instance with status {status}: {dpi}" - ) - - self.emitter.flush() - else: - logger.debug(f"DAG {dag.dag_id} is not allowed by the pattern") + self.emitter.flush() @hookimpl @run_in_thread @@ -698,11 +700,12 @@ def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None: ) assert dag_run.dag_id - if self.config.dag_allow_deny_pattern.allowed(dag_run.dag_id): - self.on_dag_start(dag_run) - self.emitter.flush() - else: + if not self.config.dag_filter_pattern.allowed(dag_run.dag_id): logger.debug(f"DAG {dag_run.dag_id} is not allowed by the pattern") + return + + self.on_dag_start(dag_run) + self.emitter.flush() # TODO: Add hooks for on_dag_run_success, on_dag_run_failed -> call AirflowGenerator.complete_dataflow diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_filter_from_ingestion.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_skip_from_ingestion.py similarity index 85% rename from metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_filter_from_ingestion.py rename to metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_skip_from_ingestion.py index 811278366ceba..5401dca33fa6e 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_filter_from_ingestion.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_skip_from_ingestion.py @@ -6,13 +6,13 @@ from datahub_airflow_plugin.entities import Dataset, Urn with DAG( - "dag_to_filter_from_ingestion", + "dag_to_skip", start_date=datetime(2023, 1, 1), schedule_interval=None, catchup=False, ) as dag: task1 = BashOperator( - task_id="task_dag_to_filter_from_ingestion_task_1", + task_id="dag_to_skip_task_1", dag=dag, bash_command="echo 'task_dag_to_filter_from_ingestion_task_1'", inlets=[ @@ -26,7 +26,7 @@ ) task2 = BashOperator( - task_id="task_dag_to_filter_from_ingestion_task_2", + task_id="dag_to_skip_task_2", dag=dag, bash_command="echo 'task_dag_to_filter_from_ingestion_task_2'", ) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py index e887898b72e0c..aa3227f5d56bd 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py @@ -33,7 +33,7 @@ DAGS_FOLDER = pathlib.Path(__file__).parent / "dags" GOLDENS_FOLDER = pathlib.Path(__file__).parent / "goldens" -NAME_OF_DAG_TO_FILTER_FROM_INGESTION = "dag_to_filter_from_ingestion" +DAG_TO_SKIP_INGESTION = "dag_to_skip" @dataclasses.dataclass @@ -142,7 +142,7 @@ def _run_airflow( # Configure the datahub plugin and have it write the MCPs to a file. 
"AIRFLOW__CORE__LAZY_LOAD_PLUGINS": "False" if is_v1 else "True", "AIRFLOW__DATAHUB__CONN_ID": datahub_connection_name, - "AIRFLOW__DATAHUB__DAG_ALLOW_DENY_PATTERN": f'{{ "deny": ["{NAME_OF_DAG_TO_FILTER_FROM_INGESTION}"] }}', + "AIRFLOW__DATAHUB__DAG_FILTER_STR": f'{{ "deny": ["{DAG_TO_SKIP_INGESTION}"] }}', f"AIRFLOW_CONN_{datahub_connection_name.upper()}": Connection( conn_id="datahub_file_default", conn_type="datahub-file", @@ -377,8 +377,12 @@ def test_airflow_plugin( print("Sleeping for a few seconds to let the plugin finish...") time.sleep(10) - # Golden file will NOT get generated for the `filtered DAG` - if dag_id != NAME_OF_DAG_TO_FILTER_FROM_INGESTION: + ''' + we need to check that the golden file is missing / empty + when the dag_id is DAG_TO_SKIP_INGESTION + otherwise, this test doesn't actually do anything + ''' + if dag_id != DAG_TO_SKIP_INGESTION: _sanitize_output_file(airflow_instance.metadata_file) check_golden_file( From 253a3535b9c9e887d55351449c6a44e14f0dff44 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Wed, 9 Oct 2024 08:46:17 +0200 Subject: [PATCH 09/15] fix(ingestion/airflow-plugin): fixed typo --- .../airflow-plugin/tests/integration/test_plugin.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py index aa3227f5d56bd..1a035e422492a 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py @@ -377,11 +377,11 @@ def test_airflow_plugin( print("Sleeping for a few seconds to let the plugin finish...") time.sleep(10) - ''' + """ we need to check that the golden file is missing / empty when the dag_id is DAG_TO_SKIP_INGESTION otherwise, this test doesn't actually do anything - ''' + """ if dag_id != DAG_TO_SKIP_INGESTION: _sanitize_output_file(airflow_instance.metadata_file) From e91bc88e01e64a6953f9d860402c21cfa8cb4e98 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Wed, 9 Oct 2024 08:51:55 +0200 Subject: [PATCH 10/15] fix(ingestion/airflow-plugin): linter issue fixed --- .../src/datahub_airflow_plugin/datahub_listener.py | 4 +--- .../airflow-plugin/tests/integration/test_plugin.py | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index 76d5b8dfec241..de707d1abd69d 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -516,9 +516,7 @@ def on_task_instance_finish( materialize_iolets=self.config.materialize_iolets ): self.emitter.emit(mcp, self._make_emit_callback()) - logger.debug( - f"Emitted DataHub Datajob finish w/ status {status}: {datajob}" - ) + logger.debug(f"Emitted DataHub Datajob finish w/ status {status}: {datajob}") if self.config.capture_executions: dpi = AirflowGenerator.complete_datajob( diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py index 1a035e422492a..9bc3b1006d702 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py +++ 
b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py @@ -378,7 +378,7 @@ def test_airflow_plugin( time.sleep(10) """ - we need to check that the golden file is missing / empty + we need to check that the golden file is missing / empty when the dag_id is DAG_TO_SKIP_INGESTION otherwise, this test doesn't actually do anything """ From ed900802f4c08d9894833d3f3037312e6c199e67 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Wed, 9 Oct 2024 09:08:00 +0200 Subject: [PATCH 11/15] fix(ingestion/airflow-plugin): fixed failing test --- .../dags/{dag_to_skip_from_ingestion.py => dag_to_skip.py} | 4 ++-- .../airflow-plugin/tests/integration/test_plugin.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) rename metadata-ingestion-modules/airflow-plugin/tests/integration/dags/{dag_to_skip_from_ingestion.py => dag_to_skip.py} (85%) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_skip_from_ingestion.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_skip.py similarity index 85% rename from metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_skip_from_ingestion.py rename to metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_skip.py index 5401dca33fa6e..a805a2219d142 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_skip_from_ingestion.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_skip.py @@ -14,7 +14,7 @@ task1 = BashOperator( task_id="dag_to_skip_task_1", dag=dag, - bash_command="echo 'task_dag_to_filter_from_ingestion_task_1'", + bash_command="echo 'dag_to_skip_task_1'", inlets=[ Dataset(platform="snowflake", name="mydb.schema.tableA"), Urn( @@ -28,7 +28,7 @@ task2 = BashOperator( task_id="dag_to_skip_task_2", dag=dag, - bash_command="echo 'task_dag_to_filter_from_ingestion_task_2'", + bash_command="echo 'dag_to_skip_task_2'", ) task1 >> task2 diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py index 9bc3b1006d702..bd60137f75f0a 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py @@ -279,7 +279,7 @@ class DagTestCase: test_cases = [ DagTestCase("simple_dag"), DagTestCase("basic_iolets"), - DagTestCase("dag_to_filter_from_ingestion", v2_only=True), + DagTestCase("dag_to_skip", v2_only=True), DagTestCase("snowflake_operator", success=False, v2_only=True), DagTestCase("sqlite_operator", v2_only=True), DagTestCase("custom_operator_dag", v2_only=True), From c40f3e8bc58a359f6430a846059863b121bda553 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Thu, 10 Oct 2024 07:05:33 +0200 Subject: [PATCH 12/15] fix(ingestion/airflow-plugin): updated test --- .../airflow-plugin/tests/integration/test_plugin.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py index bd60137f75f0a..cf790e91ae438 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py @@ -378,11 +378,12 @@ def test_airflow_plugin( time.sleep(10) """ - we need to check that the golden file is missing / empty + verify that golden file is 
missing / empty when the dag_id is DAG_TO_SKIP_INGESTION - otherwise, this test doesn't actually do anything """ - if dag_id != DAG_TO_SKIP_INGESTION: + if dag_id == DAG_TO_SKIP_INGESTION: + assert not os.path.exists(airflow_instance.metadata_file) + else: _sanitize_output_file(airflow_instance.metadata_file) check_golden_file( From c4cacd30d8a540d789aae0c34d199fdca9c8224f Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 10 Oct 2024 15:05:12 -0700 Subject: [PATCH 13/15] Update test_plugin.py --- .../airflow-plugin/tests/integration/test_plugin.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py index cf790e91ae438..44efd94f834b1 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py @@ -377,11 +377,8 @@ def test_airflow_plugin( print("Sleeping for a few seconds to let the plugin finish...") time.sleep(10) - """ - verify that golden file is missing / empty - when the dag_id is DAG_TO_SKIP_INGESTION - """ if dag_id == DAG_TO_SKIP_INGESTION: + # Verify that no MCPs were generated. assert not os.path.exists(airflow_instance.metadata_file) else: _sanitize_output_file(airflow_instance.metadata_file) From ed9c2ff7e071787540f2561cbb73ce8d1a903355 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 10 Oct 2024 15:08:01 -0700 Subject: [PATCH 14/15] Update _config.py --- .../airflow-plugin/src/datahub_airflow_plugin/_config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index 92a3413a3c4b5..e120dfaa59727 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -117,5 +117,6 @@ def get_lineage_config() -> DatahubLineageConfig: debug_emitter=debug_emitter, disable_openlineage_plugin=disable_openlineage_plugin, datajob_url_link=datajob_url_link, + render_templates=render_templates, dag_filter_pattern=dag_filter_pattern, ) From f53b926c466ca3e9fc33e0785f9a3aae94ffcf7b Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 10 Oct 2024 22:06:06 -0700 Subject: [PATCH 15/15] Update _config.py --- .../airflow-plugin/src/datahub_airflow_plugin/_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index e120dfaa59727..c4964712cf9f7 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -56,7 +56,7 @@ class DatahubLineageConfig(ConfigModel): # If true, ti.render_templates() will be called in the listener. # Makes extraction of jinja-templated fields more accurate. render_templates: bool = True - + dag_filter_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), description="regex patterns for DAGs to ingest",
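
---

Note (not part of the patches above): the net effect of this series is that the listener consults a `dag_filter_pattern` (an `AllowDenyPattern`) before emitting anything for a DAG, and the integration test sets a deny entry for the `dag_to_skip` DAG via `AIRFLOW__DATAHUB__DAG_FILTER_STR`. The following is a minimal standalone sketch of that filtering behavior, not the plugin code itself; it assumes the standard `datahub` package import path for `AllowDenyPattern`, and the sample DAG ids are taken from the test cases in the patches.

```python
# Sketch: how an AllowDenyPattern-based DAG filter decides what gets ingested.
from datahub.configuration.common import AllowDenyPattern

# Mirrors the test configuration: deny "dag_to_skip", allow everything else.
pattern = AllowDenyPattern(deny=["dag_to_skip"])

for dag_id in ["simple_dag", "basic_iolets", "dag_to_skip"]:
    if not pattern.allowed(dag_id):
        # The listener returns early here, so no MCPs are emitted for this DAG.
        print(f"DAG {dag_id} is not allowed by the pattern")
    else:
        print(f"DAG {dag_id} would be ingested")
```

With this configuration, running the sketch reports `dag_to_skip` as filtered and the other two DAGs as ingested, which matches the test expectation that no metadata file is produced for the skipped DAG.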