Skip to content

Commit

Permalink
fix(ingestion/airflow-plugin): emit browsePathV2 (#10738)
Browse files Browse the repository at this point in the history
  • Loading branch information
dushayntAW authored Jun 20, 2024
1 parent 1b56035 commit 190f09a
Show file tree
Hide file tree
Showing 9 changed files with 343 additions and 381 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
BrowsePathsV2Class,
DataFlowKeyClass,
DataJobKeyClass,
FineGrainedLineageClass,
Expand Down Expand Up @@ -544,6 +546,16 @@ def on_dag_start(self, dag_run: "DagRun") -> None:

self.emitter.emit(event)

browse_path_v2_event: MetadataChangeProposalWrapper = (
MetadataChangeProposalWrapper(
entityUrn=str(dataflow.urn),
aspect=BrowsePathsV2Class(
path=[BrowsePathEntryClass(str(dag.dag_id))],
),
)
)
self.emitter.emit(browse_path_v2_event)

if dag.dag_id == _DATAHUB_CLEANUP_DAG:
assert self.graph

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "basic_iolets"
}
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
Expand Down Expand Up @@ -242,16 +257,16 @@
"state": "running",
"operator": "BashOperator",
"priority_weight": "1",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1",
"log_url": "http://airflow.example.com/dags/basic_iolets/grid?dag_run_id=manual_run_test&task_id=run_data_task&map_index=-1&tab=logs",
"orchestrator": "airflow",
"dag_id": "basic_iolets",
"task_id": "run_data_task"
},
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1",
"externalUrl": "http://airflow.example.com/dags/basic_iolets/grid?dag_run_id=manual_run_test&task_id=run_data_task&map_index=-1&tab=logs",
"name": "basic_iolets_run_data_task_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1717179624988,
"time": 1718733614956,
"actor": "urn:li:corpuser:datahub"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "basic_iolets"
}
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "simple_dag"
}
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
Expand Down Expand Up @@ -201,16 +216,16 @@
"state": "running",
"operator": "BashOperator",
"priority_weight": "2",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1",
"log_url": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=task_1&map_index=-1&tab=logs",
"orchestrator": "airflow",
"dag_id": "simple_dag",
"task_id": "task_1"
},
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1",
"externalUrl": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=task_1&map_index=-1&tab=logs",
"name": "simple_dag_task_1_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1717179559032,
"time": 1718733547259,
"actor": "urn:li:corpuser:datahub"
}
}
Expand Down Expand Up @@ -572,16 +587,16 @@
"state": "running",
"operator": "BashOperator",
"priority_weight": "1",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1",
"log_url": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=run_another_data_task&map_index=-1&tab=logs",
"orchestrator": "airflow",
"dag_id": "simple_dag",
"task_id": "run_another_data_task"
},
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1",
"externalUrl": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=run_another_data_task&map_index=-1&tab=logs",
"name": "simple_dag_run_another_data_task_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1717179564453,
"time": 1718733551439,
"actor": "urn:li:corpuser:datahub"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "simple_dag"
}
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,snowflake_operator,prod)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "snowflake_operator"
}
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,snowflake_operator,prod),transform_cost_table)",
Expand Down Expand Up @@ -242,16 +257,16 @@
"state": "running",
"operator": "SnowflakeOperator",
"priority_weight": "1",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=snowflake_operator&map_index=-1",
"log_url": "http://airflow.example.com/dags/snowflake_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&map_index=-1&tab=logs",
"orchestrator": "airflow",
"dag_id": "snowflake_operator",
"task_id": "transform_cost_table"
},
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=snowflake_operator&map_index=-1",
"externalUrl": "http://airflow.example.com/dags/snowflake_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&map_index=-1&tab=logs",
"name": "snowflake_operator_transform_cost_table_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1717179684292,
"time": 1718733682840,
"actor": "urn:li:corpuser:datahub"
}
}
Expand Down
Loading

0 comments on commit 190f09a

Please sign in to comment.