From b4da933f3ee86af66769efb89b491da72bc08a55 Mon Sep 17 00:00:00 2001 From: Ellie O'Neil <110510035+eboneil@users.noreply.github.com> Date: Fri, 12 Jul 2024 13:02:35 -0700 Subject: [PATCH 1/5] Add comma parsing of owners to datajobs --- .../datahub_airflow_plugin/client/airflow_generator.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index 8aa154dc267b6..681259bb2d43c 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -282,10 +282,12 @@ def generate_datajob( datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.flow_id}&_flt_3_task_id={task.task_id}" if capture_owner and dag.owner: - if config and config.capture_ownership_as_group: - datajob.group_owners.add(dag.owner) - else: - datajob.owners.add(dag.owner) + if config.capture_ownership_info and dag.owner: + owners = [owner.strip() for owner in dag.owner.split(",")] + if config.capture_ownership_as_group: + data_job.group_owners.update(owners) + else: + data_job.owners.update(owners) if capture_tags and dag.tags: datajob.tags.update(dag.tags) From 2081084beddfa50b0a13fee91fa7f147eb8960aa Mon Sep 17 00:00:00 2001 From: Ellie O'Neil <110510035+eboneil@users.noreply.github.com> Date: Fri, 12 Jul 2024 13:10:23 -0700 Subject: [PATCH 2/5] Update --- .../src/datahub_airflow_plugin/client/airflow_generator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index 681259bb2d43c..5db656db0332e 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -282,7 +282,7 @@ def generate_datajob( datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.flow_id}&_flt_3_task_id={task.task_id}" if capture_owner and dag.owner: - if config.capture_ownership_info and dag.owner: + if config and config.capture_ownership_info: owners = [owner.strip() for owner in dag.owner.split(",")] if config.capture_ownership_as_group: data_job.group_owners.update(owners) From eb567cc8a1ee16b3dec57a1c54ba9ab3918d1dc8 Mon Sep 17 00:00:00 2001 From: Ellie O'Neil <110510035+eboneil@users.noreply.github.com> Date: Fri, 12 Jul 2024 13:11:10 -0700 Subject: [PATCH 3/5] One more --- .../src/datahub_airflow_plugin/client/airflow_generator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index 5db656db0332e..44ed62334f204 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -285,9 +285,9 @@ def generate_datajob( if config and config.capture_ownership_info: owners = [owner.strip() for owner in dag.owner.split(",")] if config.capture_ownership_as_group: - data_job.group_owners.update(owners) + datajob.group_owners.update(owners) else: - data_job.owners.update(owners) + datajob.owners.update(owners) if capture_tags and dag.tags: datajob.tags.update(dag.tags) From 6d24978161d98ce5e65fa9e0f11e6401137caa8b Mon Sep 17 00:00:00 2001 From: Ellie O'Neil <110510035+eboneil@users.noreply.github.com> Date: Wed, 17 Jul 2024 14:25:06 -0700 Subject: [PATCH 4/5] Extract to helper function --- .../datahub_airflow_plugin/client/airflow_generator.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index 44ed62334f204..1dea367c423df 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -127,6 +127,10 @@ def _get_dependencies( ) return upstream_tasks + @staticmethod + def _extract_owners(dag: "DAG") -> list[str]: + return [owner.strip() for owner in dag.owner.split(",")] + @staticmethod def generate_dataflow( config: DatahubLineageConfig, @@ -175,7 +179,7 @@ def generate_dataflow( data_flow.url = f"{base_url}/tree?dag_id={dag.dag_id}" if config.capture_ownership_info and dag.owner: - owners = [owner.strip() for owner in dag.owner.split(",")] + owners = AirflowGenerator._extract_owners(dag) if config.capture_ownership_as_group: data_flow.group_owners.update(owners) else: @@ -283,7 +287,7 @@ def generate_datajob( if capture_owner and dag.owner: if config and config.capture_ownership_info: - owners = [owner.strip() for owner in dag.owner.split(",")] + owners = AirflowGenerator._extract_owners(dag) if config.capture_ownership_as_group: datajob.group_owners.update(owners) else: From d55cffbfae986d68add52e52bc95a23ed04a46df Mon Sep 17 00:00:00 2001 From: Ellie O'Neil <110510035+eboneil@users.noreply.github.com> Date: Wed, 17 Jul 2024 14:29:13 -0700 Subject: [PATCH 5/5] Lint --- .../src/datahub_airflow_plugin/client/airflow_generator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index 1dea367c423df..e9f93c0c1eab0 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -128,7 +128,7 @@ def _get_dependencies( return upstream_tasks @staticmethod - def _extract_owners(dag: "DAG") -> list[str]: + def _extract_owners(dag: "DAG") -> List[str]: return [owner.strip() for owner in dag.owner.split(",")] @staticmethod