diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index 49f3246077453..734f4c5761aa1 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -222,8 +222,8 @@ def on_running(): tags=dag.tags, task=task_metadata, run_facets={ - **get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id), **get_user_provided_run_facets(task_instance, TaskInstanceState.RUNNING), + **get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id), **get_airflow_mapped_task_facet(task_instance), **get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid), **debug_facet, @@ -349,8 +349,8 @@ def on_success(): nominal_start_time=data_interval_start, nominal_end_time=data_interval_end, run_facets={ - **get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id), **get_user_provided_run_facets(task_instance, TaskInstanceState.SUCCESS), + **get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id), **get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid), **get_airflow_debug_facet(), }, @@ -487,8 +487,8 @@ def on_failure(): job_description=doc, job_description_type=doc_type, run_facets={ - **get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id), **get_user_provided_run_facets(task_instance, TaskInstanceState.FAILED), + **get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id), **get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid), **get_airflow_debug_facet(), }, diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py index 7763836628b28..941666b8be42c 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py @@ -314,8 +314,8 @@ def mock_task_id(dag_id, task_id, try_number, logical_date, map_index): ) def test_adapter_start_task_is_called_with_proper_arguments( self, - mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, + mock_get_airflow_mapped_task_facet, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, mock_disabled, @@ -332,7 +332,7 @@ def test_adapter_start_task_is_called_with_proper_arguments( listener, task_instance = self._create_listener_and_task_instance() mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -371,8 +371,8 @@ def test_adapter_start_task_is_called_with_proper_arguments( ) def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default( self, - mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, + mock_get_airflow_mapped_task_facet, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, mock_disabled, @@ -381,7 +381,7 @@ def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default ): listener, task_instance = self._create_listener_and_task_instance() mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -405,8 +405,8 @@ def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default def test_adapter_start_task_is_called_with_dag_description_when_task_doc_is_empty( self, mock_get_job_name, - mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, + mock_get_airflow_mapped_task_facet, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, mock_disabled, @@ -416,7 +416,7 @@ def test_adapter_start_task_is_called_with_dag_description_when_task_doc_is_empt listener, task_instance = self._create_listener_and_task_instance() mock_get_job_name.return_value = "job_name" mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -457,7 +457,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments( listener, task_instance = self._create_listener_and_task_instance() task_instance.logical_date = timezone.datetime(2020, 1, 1, 1, 1, 1) - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -499,8 +499,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments( ) def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default( self, - mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, + mock_get_airflow_mapped_task_facet, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, mock_disabled, @@ -509,7 +509,7 @@ def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default( ): listener, task_instance = self._create_listener_and_task_instance() mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -534,8 +534,8 @@ def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default( ) def test_adapter_fail_task_is_called_with_dag_description_when_task_doc_is_empty( self, - mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, + mock_get_airflow_mapped_task_facet, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, mock_disabled, @@ -544,7 +544,7 @@ def test_adapter_fail_task_is_called_with_dag_description_when_task_doc_is_empty ): listener, task_instance = self._create_listener_and_task_instance() mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -588,7 +588,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments( time_machine.move_to(timezone.datetime(2023, 1, 3, 13, 1, 1)) listener, task_instance = self._create_listener_and_task_instance() - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -630,8 +630,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments( ) def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_default( self, - mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, + mock_get_airflow_mapped_task_facet, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, mock_disabled, @@ -640,7 +640,7 @@ def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_defa ): listener, task_instance = self._create_listener_and_task_instance() mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -662,8 +662,8 @@ def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_defa ) def test_adapter_complete_task_is_called_with_dag_description_when_task_doc_is_empty( self, - mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, + mock_get_airflow_mapped_task_facet, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, mock_disabled, @@ -672,7 +672,7 @@ def test_adapter_complete_task_is_called_with_dag_description_when_task_doc_is_e ): listener, task_instance = self._create_listener_and_task_instance() mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -1198,8 +1198,8 @@ def mock_task_id(dag_id, task_id, try_number, logical_date, map_index): ) def test_adapter_start_task_is_called_with_proper_arguments( self, - mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, + mock_get_airflow_mapped_task_facet, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, mock_disabled, @@ -1217,7 +1217,7 @@ def test_adapter_start_task_is_called_with_proper_arguments( listener, task_instance = self._create_listener_and_task_instance() mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -1256,8 +1256,8 @@ def test_adapter_start_task_is_called_with_proper_arguments( ) def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default( self, - mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, + mock_get_airflow_mapped_task_facet, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, mock_disabled, @@ -1266,7 +1266,7 @@ def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default ): listener, task_instance = self._create_listener_and_task_instance() mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -1291,8 +1291,8 @@ def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default def test_adapter_start_task_is_called_with_dag_description_when_task_doc_is_empty( self, mock_get_job_name, - mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, + mock_get_airflow_mapped_task_facet, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, mock_disabled, @@ -1302,7 +1302,7 @@ def test_adapter_start_task_is_called_with_dag_description_when_task_doc_is_empt listener, task_instance = self._create_listener_and_task_instance() mock_get_job_name.return_value = "job_name" mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -1343,7 +1343,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments( listener, task_instance = self._create_listener_and_task_instance() task_instance.get_template_context()["dag_run"].logical_date = timezone.datetime(2020, 1, 1, 1, 1, 1) - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -1383,8 +1383,8 @@ def test_adapter_fail_task_is_called_with_proper_arguments( ) def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default( self, - mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, + mock_get_airflow_mapped_task_facet, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, mock_disabled, @@ -1393,7 +1393,7 @@ def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default( ): listener, task_instance = self._create_listener_and_task_instance() mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -1418,8 +1418,8 @@ def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default( ) def test_adapter_fail_task_is_called_with_dag_description_when_task_doc_is_empty( self, - mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, + mock_get_airflow_mapped_task_facet, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, mock_disabled, @@ -1428,7 +1428,7 @@ def test_adapter_fail_task_is_called_with_dag_description_when_task_doc_is_empty ): listener, task_instance = self._create_listener_and_task_instance() mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -1526,7 +1526,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments( time_machine.move_to(timezone.datetime(2023, 1, 3, 13, 1, 1), tick=False) listener, task_instance = self._create_listener_and_task_instance() - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -1566,8 +1566,8 @@ def test_adapter_complete_task_is_called_with_proper_arguments( ) def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_default( self, - mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, + mock_get_airflow_mapped_task_facet, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, mock_disabled, @@ -1576,7 +1576,7 @@ def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_defa ): listener, task_instance = self._create_listener_and_task_instance() mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -1600,8 +1600,8 @@ def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_defa ) def test_adapter_complete_task_is_called_with_dag_description_when_task_doc_is_empty( self, - mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, + mock_get_airflow_mapped_task_facet, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, mock_disabled, @@ -1610,7 +1610,7 @@ def test_adapter_complete_task_is_called_with_dag_description_when_task_doc_is_e ): listener, task_instance = self._create_listener_and_task_instance() mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} - mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} + mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2, "parent": 99} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"}