diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/graph_usage_sample_dag.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/graph_usage_sample_dag.py
new file mode 100644
index 00000000000000..d72ba67c23cd72
--- /dev/null
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/graph_usage_sample_dag.py
@@ -0,0 +1,35 @@
+"""This example DAG demonstrates how to create and use a DataHubGraph client."""
+
+from datetime import timedelta
+
+import pendulum
+from airflow.decorators import dag, task
+from datahub.ingestion.graph.client import DataHubGraph, RemovedStatusFilter
+
+from datahub_airflow_plugin.hooks.datahub import DatahubRestHook
+
+
+@dag(
+    schedule_interval=timedelta(days=1),
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    catchup=False,
+)
+def datahub_graph_usage_sample_dag():
+    @task()
+    def use_the_graph():
+        graph: DataHubGraph = DatahubRestHook("my_datahub_rest_conn_id").make_graph()
+        graph.test_connection()
+
+        # Example usage: Find all soft-deleted BigQuery DEV entities
+        # in DataHub, and hard delete them.
+        for urn in graph.get_urns_by_filter(
+            platform="bigquery",
+            env="DEV",
+            status=RemovedStatusFilter.ONLY_SOFT_DELETED,
+        ):
+            graph.hard_delete_entity(urn)
+
+    use_the_graph()
+
+
+datahub_graph_usage_sample_dag()
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py
index b60f20c5bf8b28..5f4d787fb893d3 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py
@@ -14,6 +14,7 @@
     from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
     from datahub.emitter.rest_emitter import DataHubRestEmitter
     from datahub.emitter.synchronized_file_emitter import SynchronizedFileEmitter
+    from datahub.ingestion.graph.client import DataHubGraph
     from datahub.ingestion.sink.datahub_kafka import KafkaSinkConfig


@@ -94,6 +95,9 @@ def make_emitter(self) -> "DataHubRestEmitter":
             host, token, **extra_args
         )

+    def make_graph(self) -> "DataHubGraph":
+        return self.make_emitter().to_graph()
+
     def emit(
         self,
         items: Sequence[
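Usage note: beyond the sample DAG above, the new `make_graph()` helper can be used anywhere a `DataHubGraph` client is needed. Below is a minimal sketch, assuming an Airflow connection named `my_datahub_rest_conn_id` exists (as in the sample DAG); the dataset URN and the aspect being read are purely illustrative and not part of this change.

```python
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import DatasetPropertiesClass

from datahub_airflow_plugin.hooks.datahub import DatahubRestHook

# Build a graph client from the hook's connection details.
# "my_datahub_rest_conn_id" is assumed to be configured in Airflow.
graph: DataHubGraph = DatahubRestHook("my_datahub_rest_conn_id").make_graph()

# Read a single aspect back from DataHub to confirm the client works.
# The URN below is hypothetical.
props = graph.get_aspect(
    entity_urn=(
        "urn:li:dataset:(urn:li:dataPlatform:bigquery,"
        "my_project.my_dataset.my_table,DEV)"
    ),
    aspect_type=DatasetPropertiesClass,
)
print(props)
```

Routing graph construction through `make_emitter().to_graph()` keeps all connection and credential handling in one place in the hook, so callers get a ready-to-use client without touching host or token configuration.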