diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index 76b35974c538b..e5bdcd510b44c 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -147,7 +147,7 @@ def _on_task_instance_running( "Skipping OpenLineage event emission for task `%s` " "due to lack of explicit lineage enablement for task or DAG while " "[openlineage] selective_enable is on.", - task.task_id, + task_instance.task_id, ) return @@ -170,14 +170,14 @@ def on_running(): clear_number = dagrun.clear_number parent_run_id = self.adapter.build_dag_run_id( - dag_id=dag.dag_id, + dag_id=task_instance.dag_id, logical_date=date, clear_number=clear_number, ) task_uuid = self.adapter.build_task_instance_run_id( - dag_id=dag.dag_id, - task_id=task.task_id, + dag_id=task_instance.dag_id, + task_id=task_instance.task_id, try_number=task_instance.try_number, logical_date=date, map_index=task_instance.map_index, @@ -199,7 +199,7 @@ def on_running(): redacted_event = self.adapter.start_task( run_id=task_uuid, - job_name=get_job_name(task), + job_name=get_job_name(task_instance), job_description=dag.description, event_time=start_date.isoformat(), nominal_start_time=data_interval_start, @@ -278,7 +278,7 @@ def _on_task_instance_success(self, task_instance: RuntimeTaskInstance, dag, dag "Skipping OpenLineage event emission for task `%s` " "due to lack of explicit lineage enablement for task or DAG while " "[openlineage] selective_enable is on.", - task.task_id, + task_instance.task_id, ) return @@ -289,14 +289,14 @@ def on_success(): date = dagrun.run_after parent_run_id = self.adapter.build_dag_run_id( - dag_id=dag.dag_id, + dag_id=task_instance.dag_id, logical_date=date, clear_number=dagrun.clear_number, ) task_uuid = self.adapter.build_task_instance_run_id( - dag_id=dag.dag_id, - task_id=task.task_id, + dag_id=task_instance.dag_id, + task_id=task_instance.task_id, try_number=task_instance.try_number, logical_date=date, map_index=task_instance.map_index, @@ -321,7 +321,7 @@ def on_success(): redacted_event = self.adapter.complete_task( run_id=task_uuid, - job_name=get_job_name(task), + job_name=get_job_name(task_instance), end_time=end_date.isoformat(), task=task_metadata, # If task owner is default ("airflow"), use DAG owner instead that may have more details @@ -409,7 +409,7 @@ def _on_task_instance_failed( "Skipping OpenLineage event emission for task `%s` " "due to lack of explicit lineage enablement for task or DAG while " "[openlineage] selective_enable is on.", - task.task_id, + task_instance.task_id, ) return @@ -420,14 +420,14 @@ def on_failure(): date = dagrun.run_after parent_run_id = self.adapter.build_dag_run_id( - dag_id=dag.dag_id, + dag_id=task_instance.dag_id, logical_date=date, clear_number=dagrun.clear_number, ) task_uuid = self.adapter.build_task_instance_run_id( - dag_id=dag.dag_id, - task_id=task.task_id, + dag_id=task_instance.dag_id, + task_id=task_instance.task_id, try_number=task_instance.try_number, logical_date=date, map_index=task_instance.map_index, @@ -452,7 +452,7 @@ def on_failure(): redacted_event = self.adapter.fail_task( run_id=task_uuid, - job_name=get_job_name(task), + job_name=get_job_name(task_instance), end_time=end_date.isoformat(), task=task_metadata, error=error, @@ -489,13 +489,13 @@ def _on_task_instance_manual_state_change( def on_state_change(): date = dagrun.logical_date or dagrun.run_after parent_run_id = self.adapter.build_dag_run_id( - dag_id=dagrun.dag_id, + dag_id=ti.dag_id, logical_date=date, clear_number=dagrun.clear_number, ) task_uuid = self.adapter.build_task_instance_run_id( - dag_id=dagrun.dag_id, + dag_id=ti.dag_id, task_id=ti.task_id, try_number=ti.try_number, logical_date=date, @@ -507,6 +507,10 @@ def on_state_change(): "job_name": get_job_name(ti), "end_time": end_date.isoformat(), "task": OperatorLineage(), + "nominal_start_time": None, + "nominal_end_time": None, + "tags": None, + "owners": None, "run_facets": { **get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=ti.dag_id), **get_airflow_debug_facet(), diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py index 9134cdb191a61..afe41bcc31839 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py +++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py @@ -79,6 +79,7 @@ SecretsMasker, should_hide_value_for_key, ) + from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance from airflow.utils.state import DagRunState, TaskInstanceState else: try: @@ -127,7 +128,7 @@ def get_operator_class(task: BaseOperator) -> type: return task.__class__ -def get_job_name(task: TaskInstance) -> str: +def get_job_name(task: TaskInstance | RuntimeTaskInstance) -> str: return f"{task.dag_id}.{task.task_id}" diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py index f268cdbe3cfb7..cd763bf6c48c8 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py @@ -249,7 +249,7 @@ def mock_task_id(dag_id, task_id, try_number, logical_date, map_index): task_instance.dag_run.clear_number = 0 task_instance.dag_run.execution_date = timezone.datetime(2020, 1, 1, 1, 1, 1) task_instance.task = mock.Mock() - task_instance.task.task_id = "task_id" + task_instance.task.task_id = "task_id_from_task_and_not_ti" task_instance.task.dag = mock.Mock() task_instance.task.dag.dag_id = "dag_id" task_instance.task.dag.description = "Test DAG Description" @@ -260,6 +260,7 @@ def mock_task_id(dag_id, task_id, try_number, logical_date, map_index): task_instance.task.outlets = [] task_instance.dag_id = "dag_id" task_instance.run_id = "dag_run_run_id" + task_instance.task_id = "task_id" task_instance.try_number = 1 task_instance.state = State.RUNNING task_instance.start_date = timezone.datetime(2023, 1, 1, 13, 1, 1) @@ -280,13 +281,11 @@ def mock_task_id(dag_id, task_id, try_number, logical_date, map_index): @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_mapped_task_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call ) def test_adapter_start_task_is_called_with_proper_arguments( self, - mock_get_job_name, mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, @@ -304,7 +303,6 @@ def test_adapter_start_task_is_called_with_proper_arguments( reflecting the comprehensive tracking of task execution contexts.""" listener, task_instance = self._create_listener_and_task_instance() - mock_get_job_name.return_value = "job_name" mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} @@ -315,7 +313,7 @@ def test_adapter_start_task_is_called_with_proper_arguments( listener.on_task_instance_running(None, task_instance, None) listener.adapter.start_task.assert_called_once_with( run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", - job_name="job_name", + job_name="dag_id.task_id", job_description="Test DAG Description", event_time="2023-01-01T13:01:01+00:00", nominal_start_time=None, @@ -339,13 +337,11 @@ def test_adapter_start_task_is_called_with_proper_arguments( @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_mapped_task_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call ) def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default( self, - mock_get_job_name, mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, @@ -355,7 +351,6 @@ def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default mock_debug_mode, ): listener, task_instance = self._create_listener_and_task_instance() - mock_get_job_name.return_value = "job_name" mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} @@ -373,7 +368,6 @@ def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call ) @@ -381,7 +375,6 @@ def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default def test_adapter_fail_task_is_called_with_proper_arguments( self, mock_utcnow, - mock_get_job_name, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, @@ -399,7 +392,6 @@ def test_adapter_fail_task_is_called_with_proper_arguments( listener, task_instance = self._create_listener_and_task_instance() task_instance.logical_date = timezone.datetime(2020, 1, 1, 1, 1, 1) - mock_get_job_name.return_value = "job_name" mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} mock_get_task_parent_run_facet.return_value = {"parent": 4} @@ -412,7 +404,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments( ) listener.adapter.fail_task.assert_called_once_with( end_time="2023-01-03T13:01:01+00:00", - job_name="job_name", + job_name="dag_id.task_id", run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", owners=["task_owner"], tags=["tag1", "tag2"], @@ -435,13 +427,11 @@ def test_adapter_fail_task_is_called_with_proper_arguments( @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_mapped_task_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call ) def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default( self, - mock_get_job_name, mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, @@ -451,7 +441,6 @@ def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default( mock_debug_mode, ): listener, task_instance = self._create_listener_and_task_instance() - mock_get_job_name.return_value = "job_name" mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} @@ -472,7 +461,6 @@ def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default( @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call ) @@ -480,7 +468,6 @@ def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default( def test_adapter_complete_task_is_called_with_proper_arguments( self, mock_utcnow, - mock_get_job_name, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, @@ -498,7 +485,6 @@ def test_adapter_complete_task_is_called_with_proper_arguments( """ listener, task_instance = self._create_listener_and_task_instance() - mock_get_job_name.return_value = "job_name" mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} mock_get_task_parent_run_facet.return_value = {"parent": 4} @@ -512,7 +498,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments( assert len(calls) == 1 assert calls[0][1] == dict( end_time="2023-01-03T13:01:01+00:00", - job_name="job_name", + job_name="dag_id.task_id", run_id=f"2020-01-01T01:01:01+00:00.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}.-1", task=listener.extractor_manager.extract_metadata(), owners=["task_owner"], @@ -534,13 +520,11 @@ def test_adapter_complete_task_is_called_with_proper_arguments( @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_mapped_task_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call ) def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_default( self, - mock_get_job_name, mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, @@ -550,7 +534,6 @@ def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_defa mock_debug_mode, ): listener, task_instance = self._create_listener_and_task_instance() - mock_get_job_name.return_value = "job_name" mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} @@ -688,12 +671,10 @@ def success_callable(**kwargs): @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") def test_listener_on_task_instance_running_do_not_call_adapter_when_disabled_operator( - self, mock_get_job_name, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, mock_disabled + self, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, mock_disabled ): listener, task_instance = self._create_listener_and_task_instance() - mock_get_job_name.return_value = "job_name" mock_get_user_provided_run_facets.return_value = {"custom_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_disabled.return_value = True @@ -707,9 +688,8 @@ def test_listener_on_task_instance_running_do_not_call_adapter_when_disabled_ope @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") def test_listener_on_task_instance_failed_do_not_call_adapter_when_disabled_operator( - self, mock_get_job_name, mock_get_user_provided_run_facets, mock_disabled + self, mock_get_user_provided_run_facets, mock_disabled ): listener, task_instance = self._create_listener_and_task_instance() mock_get_user_provided_run_facets.return_value = {"custom_facet": 2} @@ -728,9 +708,8 @@ def test_listener_on_task_instance_failed_do_not_call_adapter_when_disabled_oper @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") def test_listener_on_task_instance_success_do_not_call_adapter_when_disabled_operator( - self, mock_get_job_name, mock_get_user_provided_run_facets, mock_disabled + self, mock_get_user_provided_run_facets, mock_disabled ): listener, task_instance = self._create_listener_and_task_instance() mock_get_user_provided_run_facets.return_value = {"custom_facet": 2} @@ -974,7 +953,7 @@ def _create_listener_and_task_instance( # TaskInstance is used when on API server (when listener gets called about manual state change) task_instance = TaskInstance(task=MagicMock()) # type: ignore task_instance.dag_run = DagRun() - task_instance.dag_run.dag_id = "dag_id" + task_instance.dag_run.dag_id = "dag_id_from_dagrun_and_not_ti" task_instance.dag_run.run_id = "dag_run_run_id" task_instance.dag_run.clear_number = 0 task_instance.dag_run.logical_date = timezone.datetime(2020, 1, 1, 1, 1, 1) @@ -982,6 +961,7 @@ def _create_listener_and_task_instance( task_instance.task = None task_instance.dag = None task_instance.task_id = "task_id" + task_instance.dag_id = "dag_id" task_instance.try_number = 1 task_instance.map_index = -1 else: @@ -996,12 +976,12 @@ def _create_listener_and_task_instance( from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance dag = DAG( - dag_id="dag_id", + dag_id="dag_id_from_dag_not_ti", description="Test DAG Description", tags=["tag1", "tag2"], ) - task = EmptyOperator(task_id="task_id", dag=dag, owner="task_owner") - task2 = EmptyOperator(task_id="task_id2", dag=dag, owner="another_owner") # noqa: F841 + task = EmptyOperator(task_id="task_id_from_task_not_ti", dag=dag, owner="task_owner") + task2 = EmptyOperator(task_id="task_id2_from_task_not_ti", dag=dag, owner="another_owner") # noqa: F841 sdk_task_instance = SdkTaskInstance( id=uuid7(), @@ -1016,8 +996,8 @@ def _create_listener_and_task_instance( task=task, _ti_context_from_server=TIRunContext( dag_run=SdkDagRun( - dag_id="dag_id", - run_id="dag_run_run_id", + dag_id="dag_id_from_dagrun_not_ti", + run_id="dag_run_run_id_from_dagrun_not_ti", logical_date=timezone.datetime(2020, 1, 1, 1, 1, 1), data_interval_start=None, data_interval_end=None, @@ -1066,13 +1046,11 @@ def mock_task_id(dag_id, task_id, try_number, logical_date, map_index): @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_mapped_task_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call ) def test_adapter_start_task_is_called_with_proper_arguments( self, - mock_get_job_name, mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, @@ -1090,7 +1068,7 @@ def test_adapter_start_task_is_called_with_proper_arguments( reflecting the comprehensive tracking of task execution contexts.""" listener, task_instance = self._create_listener_and_task_instance() - mock_get_job_name.return_value = "job_name" + mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} @@ -1101,7 +1079,7 @@ def test_adapter_start_task_is_called_with_proper_arguments( listener.on_task_instance_running(None, task_instance) listener.adapter.start_task.assert_called_once_with( run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", - job_name="job_name", + job_name="dag_id.task_id", job_description="Test DAG Description", event_time="2023-01-01T13:01:01+00:00", nominal_start_time=None, @@ -1125,13 +1103,11 @@ def test_adapter_start_task_is_called_with_proper_arguments( @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_mapped_task_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call ) def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default( self, - mock_get_job_name, mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, @@ -1141,7 +1117,6 @@ def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default mock_debug_mode, ): listener, task_instance = self._create_listener_and_task_instance() - mock_get_job_name.return_value = "job_name" mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} @@ -1160,7 +1135,6 @@ def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call ) @@ -1168,7 +1142,6 @@ def test_adapter_start_task_is_called_with_dag_owners_when_task_owner_is_default def test_adapter_fail_task_is_called_with_proper_arguments( self, mock_utcnow, - mock_get_job_name, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, @@ -1186,7 +1159,6 @@ def test_adapter_fail_task_is_called_with_proper_arguments( listener, task_instance = self._create_listener_and_task_instance() task_instance.get_template_context()["dag_run"].logical_date = timezone.datetime(2020, 1, 1, 1, 1, 1) - mock_get_job_name.return_value = "job_name" mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} mock_get_task_parent_run_facet.return_value = {"parent": 4} @@ -1197,7 +1169,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments( listener.on_task_instance_failed(previous_state=None, task_instance=task_instance, error=err) listener.adapter.fail_task.assert_called_once_with( end_time="2023-01-03T13:01:01+00:00", - job_name="job_name", + job_name="dag_id.task_id", run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", task=listener.extractor_manager.extract_metadata(), owners=["task_owner"], @@ -1220,13 +1192,11 @@ def test_adapter_fail_task_is_called_with_proper_arguments( @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_mapped_task_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call ) def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default( self, - mock_get_job_name, mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, @@ -1236,7 +1206,6 @@ def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default( mock_debug_mode, ): listener, task_instance = self._create_listener_and_task_instance() - mock_get_job_name.return_value = "job_name" mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} @@ -1251,10 +1220,10 @@ def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default( call_owners = listener.adapter.fail_task.call_args.kwargs["owners"] assert sorted(call_owners) == ["airflow", "another_owner"] + @mock.patch("airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter.emit") @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_debug_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call ) @@ -1262,10 +1231,10 @@ def test_adapter_fail_task_is_called_with_dag_owners_when_task_owner_is_default( def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_model( self, mock_utcnow, - mock_get_job_name, mock_get_task_parent_run_facet, mock_debug_facet, mock_debug_mode, + mock_emit, ): """Tests that the 'fail_task' method of the OpenLineageAdapter is invoked with the correct arguments. @@ -1274,31 +1243,36 @@ def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_ """ listener, task_instance = self._create_listener_and_task_instance(runtime_ti=False) - mock_get_job_name.return_value = "job_name" mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} err = ValueError("test") - on_task_failed_listener_kwargs = {"error": err} - expected_err_kwargs = {"error": err} - listener.on_task_instance_failed( - previous_state=None, task_instance=task_instance, **on_task_failed_listener_kwargs - ) + listener.on_task_instance_failed(previous_state=None, task_instance=task_instance, error=err) mock_get_task_parent_run_facet.assert_called_once_with( parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0", parent_job_name=task_instance.dag_id ) - listener.adapter.fail_task.assert_called_once_with( + expected_args = dict( end_time="2023-01-03T13:01:01+00:00", - job_name="job_name", + job_name="dag_id.task_id", run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", task=OperatorLineage(), + nominal_start_time=None, + nominal_end_time=None, + tags=None, + owners=None, run_facets={ "parent": 4, "debug": "packages", }, - **expected_err_kwargs, + error=err, ) + listener.adapter.fail_task.assert_called_once_with(**expected_args) + + expected_args["run_id"] = "9d3b14f7-de91-40b6-aeef-e887e2c7673e" + adapter = OpenLineageAdapter() + adapter.fail_task(**expected_args) + assert mock_emit.assert_called_once @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_debug_facet") @@ -1306,7 +1280,6 @@ def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_ @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call ) @@ -1314,7 +1287,6 @@ def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_ def test_adapter_complete_task_is_called_with_proper_arguments( self, mock_utcnow, - mock_get_job_name, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, mock_get_task_parent_run_facet, @@ -1332,7 +1304,6 @@ def test_adapter_complete_task_is_called_with_proper_arguments( """ listener, task_instance = self._create_listener_and_task_instance() - mock_get_job_name.return_value = "job_name" mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} mock_get_task_parent_run_facet.return_value = {"parent": 4} @@ -1344,7 +1315,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments( assert len(calls) == 1 assert calls[0][1] == dict( end_time="2023-01-03T13:01:01+00:00", - job_name="job_name", + job_name="dag_id.task_id", run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", task=listener.extractor_manager.extract_metadata(), owners=["task_owner"], @@ -1366,13 +1337,11 @@ def test_adapter_complete_task_is_called_with_proper_arguments( @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_mapped_task_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call ) def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_default( self, - mock_get_job_name, mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, @@ -1382,7 +1351,6 @@ def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_defa mock_debug_mode, ): listener, task_instance = self._create_listener_and_task_instance() - mock_get_job_name.return_value = "job_name" mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} @@ -1396,21 +1364,16 @@ def test_adapter_complete_task_is_called_with_dag_owners_when_task_owner_is_defa call_owners = listener.adapter.complete_task.call_args.kwargs["owners"] assert sorted(call_owners) == ["airflow", "another_owner"] + @mock.patch("airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter.emit") @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_debug_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call ) @mock.patch("airflow.utils.timezone.utcnow", return_value=timezone.datetime(2023, 1, 3, 13, 1, 1)) def test_adapter_complete_task_is_called_with_proper_arguments_for_db_task_instance_model( - self, - mock_utcnow, - mock_get_job_name, - mock_get_task_parent_run_facet, - mock_debug_facet, - mock_debug_mode, + self, mock_utcnow, mock_get_task_parent_run_facet, mock_debug_facet, mock_debug_mode, mock_emit ): """Tests that the 'complete_task' method of the OpenLineageAdapter is called with the correct arguments. @@ -1418,7 +1381,6 @@ def test_adapter_complete_task_is_called_with_proper_arguments_for_db_task_insta to simulate the listener being called after task's state has been manually set via API. """ listener, task_instance = self._create_listener_and_task_instance(runtime_ti=False) - mock_get_job_name.return_value = "job_name" mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_debug_facet.return_value = {"debug": "packages"} @@ -1428,16 +1390,26 @@ def test_adapter_complete_task_is_called_with_proper_arguments_for_db_task_insta mock_get_task_parent_run_facet.assert_called_once_with( parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0", parent_job_name=task_instance.dag_id ) - assert calls[0][1] == dict( + expected_args = dict( end_time="2023-01-03T13:01:01+00:00", - job_name="job_name", + job_name="dag_id.task_id", run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", task=OperatorLineage(), + nominal_start_time=None, + nominal_end_time=None, + tags=None, + owners=None, run_facets={ "parent": 4, "debug": "packages", }, ) + assert calls[0][1] == expected_args + + expected_args["run_id"] = "9d3b14f7-de91-40b6-aeef-e887e2c7673e" + adapter = OpenLineageAdapter() + adapter.complete_task(**expected_args) + assert mock_emit.assert_called_once @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call @@ -1506,12 +1478,10 @@ def test_on_task_instance_success_correctly_calls_openlineage_adapter_run_id_met @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") def test_listener_on_task_instance_running_do_not_call_adapter_when_disabled_operator( - self, mock_get_job_name, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, mock_disabled + self, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, mock_disabled ): listener, task_instance = self._create_listener_and_task_instance() - mock_get_job_name.return_value = "job_name" mock_get_user_provided_run_facets.return_value = {"custom_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} mock_disabled.return_value = True @@ -1525,9 +1495,8 @@ def test_listener_on_task_instance_running_do_not_call_adapter_when_disabled_ope @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") def test_listener_on_task_instance_failed_do_not_call_adapter_when_disabled_operator( - self, mock_get_job_name, mock_get_user_provided_run_facets, mock_disabled + self, mock_get_user_provided_run_facets, mock_disabled ): listener, task_instance = self._create_listener_and_task_instance() mock_get_user_provided_run_facets.return_value = {"custom_facet": 2} @@ -1546,9 +1515,8 @@ def test_listener_on_task_instance_failed_do_not_call_adapter_when_disabled_oper @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") - @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") def test_listener_on_task_instance_success_do_not_call_adapter_when_disabled_operator( - self, mock_get_job_name, mock_get_user_provided_run_facets, mock_disabled + self, mock_get_user_provided_run_facets, mock_disabled ): listener, task_instance = self._create_listener_and_task_instance() mock_get_user_provided_run_facets.return_value = {"custom_facet": 2}