diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py
index 8aa154dc267b6..e9f93c0c1eab0 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py
@@ -127,6 +127,15 @@ def _get_dependencies(
         )
         return upstream_tasks
 
+    @staticmethod
+    def _extract_owners(dag: "DAG") -> List[str]:
+        """Split the DAG's comma-separated ``owner`` string into names.
+
+        Surrounding whitespace is stripped and blank entries are dropped.
+        """
+        names = (name.strip() for name in dag.owner.split(","))
+        return [name for name in names if name]
+
     @staticmethod
     def generate_dataflow(
         config: DatahubLineageConfig,
@@ -175,7 +184,7 @@ def generate_dataflow(
         data_flow.url = f"{base_url}/tree?dag_id={dag.dag_id}"
 
         if config.capture_ownership_info and dag.owner:
-            owners = [owner.strip() for owner in dag.owner.split(",")]
+            owners = AirflowGenerator._extract_owners(dag)
             if config.capture_ownership_as_group:
                 data_flow.group_owners.update(owners)
             else:
@@ -282,10 +291,14 @@ def generate_datajob(
         datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.flow_id}&_flt_3_task_id={task.task_id}"
 
         if capture_owner and dag.owner:
-            if config and config.capture_ownership_as_group:
-                datajob.group_owners.add(dag.owner)
-            else:
-                datajob.owners.add(dag.owner)
+            # No config keeps the pre-existing default (record owners);
+            # an explicit config can opt out via capture_ownership_info.
+            if not config or config.capture_ownership_info:
+                owners = AirflowGenerator._extract_owners(dag)
+                if config and config.capture_ownership_as_group:
+                    datajob.group_owners.update(owners)
+                else:
+                    datajob.owners.update(owners)
 
         if capture_tags and dag.tags:
             datajob.tags.update(dag.tags)