diff --git a/providers/amazon/pyproject.toml b/providers/amazon/pyproject.toml index 14c191750c93f..a88898f2748a7 100644 --- a/providers/amazon/pyproject.toml +++ b/providers/amazon/pyproject.toml @@ -138,7 +138,7 @@ dependencies = [ "apache-airflow-providers-mongo" ] "openlineage" = [ - "apache-airflow-providers-openlineage" + "apache-airflow-providers-openlineage>=2.3.0" ] "salesforce" = [ "apache-airflow-providers-salesforce" diff --git a/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py index be03438fe94bd..d4e0034690e18 100644 --- a/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/operators/test_livy.py @@ -449,6 +449,7 @@ def test_inject_composite_openlineage_config_to_spark( mock_ti.try_number = 1 mock_ti.dag_run.logical_date = DEFAULT_DATE mock_ti.dag_run.run_after = DEFAULT_DATE + mock_ti.dag_run.clear_number = 0 mock_ti.logical_date = DEFAULT_DATE mock_ti.map_index = -1 mock_get_batch_state.return_value = BatchState.SUCCESS @@ -472,6 +473,9 @@ def test_inject_composite_openlineage_config_to_spark( "spark.openlineage.parentJobName": "test_dag_id.spark_submit_job", "spark.openlineage.parentJobNamespace": "default", "spark.openlineage.parentRunId": "01595753-6400-710b-8a12-9e978335a56d", + "spark.openlineage.rootParentJobName": "test_dag_id", + "spark.openlineage.rootParentJobNamespace": "default", + "spark.openlineage.rootParentRunId": "01595753-6400-71fe-a08c-aaed126ab6fb", "spark.openlineage.transport.type": "composite", "spark.openlineage.transport.continueOnFailure": "True", "spark.openlineage.transport.transports.test1.type": "http", diff --git a/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py b/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py index b339093bc540f..264606748b167 100644 --- a/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py +++ b/providers/apache/spark/tests/unit/apache/spark/operators/test_spark_submit.py @@ -368,6 +368,7 @@ def test_inject_composite_openlineage_config_to_spark(self, mock_get_openlineage mock_ti.try_number = 1 mock_ti.dag_run.logical_date = DEFAULT_DATE mock_ti.dag_run.run_after = DEFAULT_DATE + mock_ti.dag_run.clear_number = 0 mock_ti.logical_date = DEFAULT_DATE mock_ti.map_index = -1 @@ -386,6 +387,9 @@ def test_inject_composite_openlineage_config_to_spark(self, mock_get_openlineage "spark.openlineage.parentJobName": "test_dag_id.spark_submit_job", "spark.openlineage.parentJobNamespace": "default", "spark.openlineage.parentRunId": "01595753-6400-710b-8a12-9e978335a56d", + "spark.openlineage.rootParentJobName": "test_dag_id", + "spark.openlineage.rootParentJobNamespace": "default", + "spark.openlineage.rootParentRunId": "01595753-6400-71fe-a08c-aaed126ab6fb", "spark.openlineage.transport.type": "composite", "spark.openlineage.transport.continueOnFailure": "True", "spark.openlineage.transport.transports.test1.type": "http", diff --git a/providers/dbt/cloud/pyproject.toml b/providers/dbt/cloud/pyproject.toml index 4246c8301c8a5..f3bfa334c408f 100644 --- a/providers/dbt/cloud/pyproject.toml +++ b/providers/dbt/cloud/pyproject.toml @@ -69,7 +69,7 @@ dependencies = [ [project.optional-dependencies] # pip install apache-airflow-providers-dbt-cloud[openlineage] "openlineage" = [ - "apache-airflow-providers-openlineage>=2.0.0", + "apache-airflow-providers-openlineage>=2.3.0", ] [dependency-groups] diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py index b9e56f30f3faf..23061e6e20f4b 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py @@ -48,7 +48,7 @@ def _get_logical_date(task_instance): return date -@require_openlineage_version(provider_min_version="2.0.0") +@require_openlineage_version(provider_min_version="2.3.0") def generate_openlineage_events_from_dbt_cloud_run( operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor, task_instance: TaskInstance ) -> OperatorLineage: @@ -141,10 +141,19 @@ async def get_artifacts_for_steps(steps, artifacts): map_index=task_instance.map_index, ) + root_parent_run_id = OpenLineageAdapter.build_dag_run_id( + dag_id=task_instance.dag_id, + logical_date=_get_logical_date(task_instance), + clear_number=task_instance.dag_run.clear_number, + ) + parent_job = ParentRunMetadata( run_id=parent_run_id, job_name=f"{task_instance.dag_id}.{task_instance.task_id}", job_namespace=namespace(), + root_parent_run_id=root_parent_run_id, + root_parent_job_name=task_instance.dag_id, + root_parent_job_namespace=namespace(), ) client = get_openlineage_listener().adapter.get_or_create_openlineage_client() diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/utils/test_openlineage.py b/providers/dbt/cloud/tests/unit/dbt/cloud/utils/test_openlineage.py index b96cb549eb7b9..b76855f44b118 100644 --- a/providers/dbt/cloud/tests/unit/dbt/cloud/utils/test_openlineage.py +++ b/providers/dbt/cloud/tests/unit/dbt/cloud/utils/test_openlineage.py @@ -33,6 +33,7 @@ TASK_ID = "dbt_test" DAG_ID = "dbt_dag" TASK_UUID = "01481cfa-0ff7-3692-9bba-79417cf498c2" +DAG_UUID = "01481cfa-1a1a-2b2b-3c3c-79417cf498c2" class MockResponse: @@ -88,30 +89,48 @@ def get_dbt_artifact(*args, **kwargs): return None -def test_previous_version_openlineage_provider(): - """When using OpenLineage, the dbt-cloud provider now depends on openlineage provider >= 2.0""" +@pytest.mark.parametrize( + "value, is_error", + [ + ("1.99.0", True), + ("2.0.0", True), + ("2.3.0", False), + ("2.99.0", False), + ], +) +def test_previous_version_openlineage_provider(value, is_error): + """When using OpenLineage, the dbt-cloud provider now depends on openlineage provider >= 2.3""" def _mock_version(package): if package == "apache-airflow-providers-openlineage": - return "1.99.0" + return value raise Exception("Unexpected package") mock_operator = MagicMock() mock_task_instance = MagicMock() expected_err = ( - "OpenLineage provider version `1.99.0` is lower than required `2.0.0`, " + f"OpenLineage provider version `{value}` is lower than required `2.3.0`, " "skipping function `generate_openlineage_events_from_dbt_cloud_run` execution" ) - with patch("importlib.metadata.version", side_effect=_mock_version): - with pytest.raises(AirflowOptionalProviderFeatureException, match=expected_err): - generate_openlineage_events_from_dbt_cloud_run(mock_operator, mock_task_instance) + if is_error: + with patch("importlib.metadata.version", side_effect=_mock_version): + with pytest.raises(AirflowOptionalProviderFeatureException, match=expected_err): + generate_openlineage_events_from_dbt_cloud_run(mock_operator, mock_task_instance) + else: + with patch("importlib.metadata.version", side_effect=_mock_version): + # Error that would certainly not happen on version checking + mock_operator.hook.get_job_run.side_effect = ZeroDivisionError("error for test") + with pytest.raises(ZeroDivisionError, match="error for test"): + generate_openlineage_events_from_dbt_cloud_run(mock_operator, mock_task_instance) class TestGenerateOpenLineageEventsFromDbtCloudRun: + @patch("importlib.metadata.version", return_value="2.3.0") @patch("airflow.providers.openlineage.plugins.listener.get_openlineage_listener") @patch("airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter.build_task_instance_run_id") + @patch("airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter.build_dag_run_id") @patch.object(DbtCloudHook, "get_job_run") @patch.object(DbtCloudHook, "get_project") @patch.object(DbtCloudHook, "get_job_run_artifact") @@ -120,8 +139,10 @@ def test_generate_events( mock_get_job_run_artifact, mock_get_project, mock_get_job_run, + mock_build_dag_run_id, mock_build_task_instance_run_id, mock_get_openlineage_listener, + mock_version, ): mock_operator = MagicMock(spec=DbtCloudRunJobOperator) mock_operator.account_id = None @@ -154,6 +175,7 @@ def test_generate_events( mock_task_instance = MagicMock() mock_task_instance.task_id = TASK_ID mock_task_instance.dag_id = DAG_ID + mock_task_instance.dag_run.clear_number = 0 mock_client = MagicMock() @@ -163,6 +185,7 @@ def test_generate_events( ) mock_build_task_instance_run_id.return_value = TASK_UUID + mock_build_dag_run_id.return_value = DAG_UUID generate_openlineage_events_from_dbt_cloud_run(mock_operator, task_instance=mock_task_instance) assert mock_client.emit.call_count == 4 diff --git a/providers/google/tests/unit/google/cloud/openlineage/test_utils.py b/providers/google/tests/unit/google/cloud/openlineage/test_utils.py index 21317983d62e8..b5ee59a268d01 100644 --- a/providers/google/tests/unit/google/cloud/openlineage/test_utils.py +++ b/providers/google/tests/unit/google/cloud/openlineage/test_utils.py @@ -106,11 +106,13 @@ EXAMPLE_CONTEXT = { "ti": MagicMock( dag_id="dag_id", - dag_run=MagicMock(run_after=dt.datetime(2024, 11, 11), logical_date=dt.datetime(2024, 11, 11)), task_id="task_id", try_number=1, map_index=1, logical_date=dt.datetime(2024, 11, 11), + dag_run=MagicMock( + run_after=dt.datetime(2024, 11, 11), logical_date=dt.datetime(2024, 11, 11), clear_number=0 + ), ) } OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG = { @@ -142,6 +144,9 @@ "spark.openlineage.parentJobName": "dag_id.task_id", "spark.openlineage.parentJobNamespace": "default", "spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267", + "spark.openlineage.rootParentJobName": "dag_id", + "spark.openlineage.rootParentJobNamespace": "default", + "spark.openlineage.rootParentRunId": "01931885-2800-799d-8041-88a263ffa0d8", } @@ -1034,6 +1039,9 @@ def test_inject_openlineage_properties_into_dataproc_workflow_template_parent_in "spark.openlineage.parentJobName": "dag_id.task_id", "spark.openlineage.parentJobNamespace": "default", "spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267", + "spark.openlineage.rootParentJobName": "dag_id", + "spark.openlineage.rootParentJobNamespace": "default", + "spark.openlineage.rootParentRunId": "01931885-2800-799d-8041-88a263ffa0d8", }, }, }, diff --git a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py index 35072befd72e7..e10eba067b6a8 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_dataproc.py +++ b/providers/google/tests/unit/google/cloud/operators/test_dataproc.py @@ -405,6 +405,7 @@ try_number=1, map_index=1, logical_date=dt.datetime(2024, 11, 11), + dag_run=MagicMock(logical_date=dt.datetime(2024, 11, 11), clear_number=0), ) } OPENLINEAGE_HTTP_TRANSPORT_EXAMPLE_CONFIG = { @@ -436,6 +437,9 @@ "spark.openlineage.parentJobName": "dag_id.task_id", "spark.openlineage.parentJobNamespace": "default", "spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267", + "spark.openlineage.rootParentJobName": "dag_id", + "spark.openlineage.rootParentJobNamespace": "default", + "spark.openlineage.rootParentRunId": "01931885-2800-7be7-aa8d-aaa15c337267", } @@ -1464,6 +1468,9 @@ def test_execute_openlineage_parent_job_info_injection( "spark.openlineage.parentJobName": "dag_id.task_id", "spark.openlineage.parentJobNamespace": "default", "spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267", + "spark.openlineage.rootParentJobName": "dag_id", + "spark.openlineage.rootParentJobNamespace": "default", + "spark.openlineage.rootParentRunId": "01931885-2800-7be7-aa8d-aaa15c337267", }, }, } @@ -2646,6 +2653,9 @@ def test_execute_openlineage_parent_job_info_injection( "spark.openlineage.parentJobName": "dag_id.task_id", "spark.openlineage.parentJobNamespace": "default", "spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267", + "spark.openlineage.rootParentJobName": "dag_id", + "spark.openlineage.rootParentJobNamespace": "default", + "spark.openlineage.rootParentRunId": "01931885-2800-7be7-aa8d-aaa15c337267", }, }, }, diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py index 55fa5fa976961..3c41639b227bd 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py @@ -32,7 +32,6 @@ job_type_job, nominal_time_run, ownership_job, - parent_run, source_code_location_job, ) from openlineage.client.uuid import generate_static_uuid @@ -188,8 +187,6 @@ def start_task( job_name: str, job_description: str, event_time: str, - parent_job_name: str | None, - parent_run_id: str | None, code_location: str | None, nominal_start_time: str | None, nominal_end_time: str | None, @@ -204,9 +201,6 @@ def start_task( :param job_name: globally unique identifier of task in dag :param job_description: user provided description of job :param event_time: - :param parent_job_name: the name of the parent job (typically the DAG, - but possibly a task group) - :param parent_run_id: identifier of job spawning this task :param code_location: file path or URL of DAG file :param nominal_start_time: scheduled time of dag run :param nominal_end_time: following schedule of dag run @@ -223,9 +217,6 @@ def start_task( eventTime=event_time, run=self._build_run( run_id=run_id, - job_name=job_name, - parent_job_name=parent_job_name, - parent_run_id=parent_run_id, nominal_start_time=nominal_start_time, nominal_end_time=nominal_end_time, run_facets=run_facets, @@ -248,8 +239,6 @@ def complete_task( self, run_id: str, job_name: str, - parent_job_name: str | None, - parent_run_id: str | None, end_time: str, task: OperatorLineage, run_facets: dict[str, RunFacet] | None = None, @@ -259,9 +248,6 @@ def complete_task( :param run_id: globally unique identifier of task in dag run :param job_name: globally unique identifier of task between dags - :param parent_job_name: the name of the parent job (typically the DAG, - but possibly a task group) - :param parent_run_id: identifier of job spawning this task :param end_time: time of task completion :param task: metadata container with information extracted from operator :param run_facets: additional run facets @@ -275,9 +261,6 @@ def complete_task( eventTime=end_time, run=self._build_run( run_id=run_id, - job_name=job_name, - parent_job_name=parent_job_name, - parent_run_id=parent_run_id, run_facets=run_facets, ), job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets), @@ -291,8 +274,6 @@ def fail_task( self, run_id: str, job_name: str, - parent_job_name: str | None, - parent_run_id: str | None, end_time: str, task: OperatorLineage, error: str | BaseException | None = None, @@ -303,9 +284,6 @@ def fail_task( :param run_id: globally unique identifier of task in dag run :param job_name: globally unique identifier of task between dags - :param parent_job_name: the name of the parent job (typically the DAG, - but possibly a task group) - :param parent_run_id: identifier of job spawning this task :param end_time: time of task completion :param task: metadata container with information extracted from operator :param run_facets: custom run facets @@ -332,9 +310,6 @@ def fail_task( eventTime=end_time, run=self._build_run( run_id=run_id, - job_name=job_name, - parent_job_name=parent_job_name, - parent_run_id=parent_run_id, run_facets=run_facets, ), job=self._build_job(job_name, job_type=_JOB_TYPE_TASK, job_facets=task.job_facets), @@ -372,7 +347,6 @@ def dag_started( run_id=self.build_dag_run_id( dag_id=dag_id, logical_date=logical_date, clear_number=clear_number ), - job_name=dag_id, nominal_start_time=nominal_start_time, nominal_end_time=nominal_end_time, run_facets={**run_facets, **get_airflow_debug_facet(), **get_processing_engine_facet()}, @@ -473,9 +447,6 @@ def dag_failed( @staticmethod def _build_run( run_id: str, - job_name: str, - parent_job_name: str | None = None, - parent_run_id: str | None = None, nominal_start_time: str | None = None, nominal_end_time: str | None = None, run_facets: dict[str, RunFacet] | None = None, @@ -485,13 +456,6 @@ def _build_run( facets.update( {"nominalTime": nominal_time_run.NominalTimeRunFacet(nominal_start_time, nominal_end_time)} ) - if parent_run_id: - parent_run_facet = parent_run.ParentRunFacet( - run=parent_run.Run(runId=parent_run_id), - job=parent_run.Job(namespace=conf.namespace(), name=parent_job_name or job_name), - ) - facets.update({"parent": parent_run_facet}) - if run_facets: facets.update(run_facets) diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index 11228c0d10fcf..2e76d4086f8c9 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -40,6 +40,7 @@ get_airflow_mapped_task_facet, get_airflow_run_facet, get_job_name, + get_task_parent_run_facet, get_user_provided_run_facets, is_operator_disabled, is_selective_lineage_enabled, @@ -201,14 +202,13 @@ def on_running(): job_name=get_job_name(task), job_description=dag.description, event_time=start_date.isoformat(), - parent_job_name=dag.dag_id, - parent_run_id=parent_run_id, code_location=None, nominal_start_time=data_interval_start, nominal_end_time=data_interval_end, owners=dag.owner.split(", "), task=task_metadata, run_facets={ + **get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id), **get_user_provided_run_facets(task_instance, TaskInstanceState.RUNNING), **get_airflow_mapped_task_facet(task_instance), **get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid), @@ -314,11 +314,10 @@ def on_success(): redacted_event = self.adapter.complete_task( run_id=task_uuid, job_name=get_job_name(task), - parent_job_name=dag.dag_id, - parent_run_id=parent_run_id, end_time=end_date.isoformat(), task=task_metadata, run_facets={ + **get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id), **get_user_provided_run_facets(task_instance, TaskInstanceState.SUCCESS), **get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid), **get_airflow_debug_facet(), @@ -434,12 +433,11 @@ def on_failure(): redacted_event = self.adapter.fail_task( run_id=task_uuid, job_name=get_job_name(task), - parent_job_name=dag.dag_id, - parent_run_id=parent_run_id, end_time=end_date.isoformat(), task=task_metadata, error=error, run_facets={ + **get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=dag.dag_id), **get_user_provided_run_facets(task_instance, TaskInstanceState.FAILED), **get_airflow_run_facet(dagrun, dag, task_instance, task, task_uuid), **get_airflow_debug_facet(), @@ -482,11 +480,12 @@ def on_state_change(): adapter_kwargs = { "run_id": task_uuid, "job_name": get_job_name(ti), - "parent_job_name": dagrun.dag_id, - "parent_run_id": parent_run_id, "end_time": end_date.isoformat(), "task": OperatorLineage(), - "run_facets": get_airflow_debug_facet(), + "run_facets": { + **get_task_parent_run_facet(parent_run_id=parent_run_id, parent_job_name=ti.dag_id), + **get_airflow_debug_facet(), + }, } if ti_state == TaskInstanceState.FAILED: diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py index ef5b9c0ad64e9..13316304131f5 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/macros.py @@ -59,25 +59,11 @@ def lineage_run_id(task_instance: TaskInstance): For more information take a look at the guide: :ref:`howto/macros:openlineage` """ - if AIRFLOW_V_3_0_PLUS: - context = task_instance.get_template_context() - if hasattr(task_instance, "dag_run"): - dag_run = task_instance.dag_run - elif hasattr(context, "dag_run"): - dag_run = context["dag_run"] - if hasattr(dag_run, "logical_date") and dag_run.logical_date: - date = dag_run.logical_date - else: - date = dag_run.run_after - elif hasattr(task_instance, "logical_date"): - date = task_instance.logical_date - else: - date = task_instance.execution_date return OpenLineageAdapter.build_task_instance_run_id( dag_id=task_instance.dag_id, task_id=task_instance.task_id, try_number=task_instance.try_number, - logical_date=date, + logical_date=_get_logical_date(task_instance), map_index=task_instance.map_index, ) @@ -101,3 +87,44 @@ def lineage_parent_id(task_instance: TaskInstance): lineage_run_id(task_instance), ) ) + + +def lineage_root_parent_id(task_instance: TaskInstance): + return "/".join( + ( + lineage_job_namespace(), + lineage_root_job_name(task_instance), + lineage_root_run_id(task_instance), + ) + ) + + +def lineage_root_job_name(task_instance: TaskInstance): + return task_instance.dag_id + + +def lineage_root_run_id(task_instance: TaskInstance): + return OpenLineageAdapter.build_dag_run_id( + dag_id=task_instance.dag_id, + logical_date=_get_logical_date(task_instance), + clear_number=task_instance.dag_run.clear_number, + ) + + +def _get_logical_date(task_instance): + # todo: remove when min airflow version >= 3.0 + if AIRFLOW_V_3_0_PLUS: + context = task_instance.get_template_context() + if hasattr(task_instance, "dag_run"): + dag_run = task_instance.dag_run + elif hasattr(context, "dag_run"): + dag_run = context["dag_run"] + if hasattr(dag_run, "logical_date") and dag_run.logical_date: + date = dag_run.logical_date + else: + date = dag_run.run_after + elif hasattr(task_instance, "logical_date"): + date = task_instance.logical_date + else: + date = task_instance.execution_date + return date diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py b/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py index 1da6a2a97e6dc..4925cd00f3923 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py +++ b/providers/openlineage/src/airflow/providers/openlineage/utils/spark.py @@ -24,6 +24,8 @@ from airflow.providers.openlineage.plugins.macros import ( lineage_job_name, lineage_job_namespace, + lineage_root_job_name, + lineage_root_run_id, lineage_run_id, ) @@ -48,6 +50,9 @@ def _get_parent_job_information_as_spark_properties(context: Context) -> dict: "spark.openlineage.parentJobNamespace": lineage_job_namespace(), "spark.openlineage.parentJobName": lineage_job_name(ti), # type: ignore[arg-type] "spark.openlineage.parentRunId": lineage_run_id(ti), # type: ignore[arg-type] + "spark.openlineage.rootParentRunId": lineage_root_run_id(ti), # type: ignore[arg-type] + "spark.openlineage.rootParentJobName": lineage_root_job_name(ti), # type: ignore[arg-type] + "spark.openlineage.rootParentJobNamespace": lineage_job_namespace(), } diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py index 2567cb418027b..9f96954fda146 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py +++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py @@ -26,6 +26,7 @@ from typing import TYPE_CHECKING, Any, Callable import attrs +from openlineage.client.facet_v2 import parent_run from openlineage.client.utils import RedactMixin from airflow import __version__ as AIRFLOW_VERSION @@ -126,6 +127,27 @@ def get_job_name(task: TaskInstance) -> str: return f"{task.dag_id}.{task.task_id}" +def get_task_parent_run_facet( + parent_run_id: str, parent_job_name: str, parent_job_namespace: str = conf.namespace() +) -> dict[str, Any]: + """ + Retrieve the parent run facet for task-level events. + + This facet currently always points to the DAG-level run ID and name, + as external events for DAG runs are not yet handled. + """ + return { + "parent": parent_run.ParentRunFacet( + run=parent_run.Run(runId=parent_run_id), + job=parent_run.Job(namespace=parent_job_namespace, name=parent_job_name), + root=parent_run.Root( + run=parent_run.RootRun(runId=parent_run_id), + job=parent_run.RootJob(namespace=parent_job_namespace, name=parent_job_name), + ), + ) + } + + def get_airflow_mapped_task_facet(task_instance: TaskInstance) -> dict[str, Any]: # check for -1 comes from SmartSensor compatibility with dynamic task mapping # this comes from Airflow code diff --git a/providers/openlineage/tests/unit/openlineage/dags/__init__.py b/providers/openlineage/tests/unit/openlineage/dags/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/openlineage/tests/unit/openlineage/dags/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow-core/tests/unit/dags/test_openlineage_execution.py b/providers/openlineage/tests/unit/openlineage/dags/test_openlineage_execution.py similarity index 100% rename from airflow-core/tests/unit/dags/test_openlineage_execution.py rename to providers/openlineage/tests/unit/openlineage/dags/test_openlineage_execution.py diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py index 64c66aab2bbfc..68946cdc42a5a 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py @@ -154,8 +154,6 @@ def test_emit_start_event(mock_stats_incr, mock_stats_timer): job_name="job", job_description="description", event_time=event_time, - parent_job_name=None, - parent_run_id=None, code_location=None, nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(), nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(), @@ -217,8 +215,6 @@ def test_emit_start_event_with_additional_information(mock_stats_incr, mock_stat job_name="job", job_description="description", event_time=event_time, - parent_job_name="parent_job_name", - parent_run_id=parent_run_id, code_location=None, nominal_start_time=datetime.datetime(2022, 1, 1).isoformat(), nominal_end_time=datetime.datetime(2022, 1, 1).isoformat(), @@ -228,9 +224,17 @@ def test_emit_start_event_with_additional_information(mock_stats_incr, mock_stat outputs=[Dataset(namespace="gs://bucket", name="exported_folder")], job_facets={"sql": sql_job.SQLJobFacet(query="SELECT 1;")}, run_facets={ + "parent": parent_run.ParentRunFacet( + run=parent_run.Run(runId=parent_run_id), + job=parent_run.Job(namespace=namespace(), name="parent_job_name"), + root=parent_run.Root( + run=parent_run.RootRun(runId=parent_run_id), + job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), + ), + ), "externalQuery1": external_query_run.ExternalQueryRunFacet( externalQueryId="123", source="source" - ) + ), }, ), run_facets={ @@ -256,6 +260,10 @@ def test_emit_start_event_with_additional_information(mock_stats_incr, mock_stat "parent": parent_run.ParentRunFacet( run=parent_run.Run(runId=parent_run_id), job=parent_run.Job(namespace=namespace(), name="parent_job_name"), + root=parent_run.Root( + run=parent_run.RootRun(runId=parent_run_id), + job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), + ), ), "externalQuery1": external_query_run.ExternalQueryRunFacet( externalQueryId="123", source="source" @@ -308,8 +316,6 @@ def test_emit_complete_event(mock_stats_incr, mock_stats_timer): adapter.complete_task( run_id=run_id, end_time=event_time, - parent_job_name=None, - parent_run_id=None, job_name="job", task=OperatorLineage(), ) @@ -360,8 +366,6 @@ def test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s adapter.complete_task( run_id=run_id, end_time=event_time, - parent_job_name="parent_job_name", - parent_run_id=parent_run_id, job_name="job", task=OperatorLineage( inputs=[Dataset(namespace="bigquery", name="a.b.c"), Dataset(namespace="bigquery", name="x.y.z")], @@ -374,7 +378,17 @@ def test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s }, ), run_facets={ - "externalQuery2": external_query_run.ExternalQueryRunFacet(externalQueryId="999", source="source") + "parent": parent_run.ParentRunFacet( + run=parent_run.Run(runId=parent_run_id), + job=parent_run.Job(namespace=namespace(), name="parent_job_name"), + root=parent_run.Root( + run=parent_run.RootRun(runId=parent_run_id), + job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), + ), + ), + "externalQuery2": external_query_run.ExternalQueryRunFacet( + externalQueryId="999", source="source" + ), }, ) @@ -389,6 +403,10 @@ def test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s "parent": parent_run.ParentRunFacet( run=parent_run.Run(runId=parent_run_id), job=parent_run.Job(namespace=namespace(), name="parent_job_name"), + root=parent_run.Root( + run=parent_run.RootRun(runId=parent_run_id), + job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), + ), ), "processing_engine": processing_engine_run.ProcessingEngineRunFacet( version=ANY, name="Airflow", openlineageAdapterVersion=ANY @@ -437,8 +455,6 @@ def test_emit_failed_event(mock_stats_incr, mock_stats_timer): adapter.fail_task( run_id=run_id, end_time=event_time, - parent_job_name=None, - parent_run_id=None, job_name="job", task=OperatorLineage(), ) @@ -489,8 +505,6 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta adapter.fail_task( run_id=run_id, end_time=event_time, - parent_job_name="parent_job_name", - parent_run_id=parent_run_id, job_name="job", task=OperatorLineage( inputs=[Dataset(namespace="bigquery", name="a.b.c"), Dataset(namespace="bigquery", name="x.y.z")], @@ -503,7 +517,17 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta job_facets={"sql": sql_job.SQLJobFacet(query="SELECT 1;")}, ), run_facets={ - "externalQuery2": external_query_run.ExternalQueryRunFacet(externalQueryId="999", source="source") + "parent": parent_run.ParentRunFacet( + run=parent_run.Run(runId=parent_run_id), + job=parent_run.Job(namespace=namespace(), name="parent_job_name"), + root=parent_run.Root( + run=parent_run.RootRun(runId=parent_run_id), + job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), + ), + ), + "externalQuery2": external_query_run.ExternalQueryRunFacet( + externalQueryId="999", source="source" + ), }, error=ValueError("Error message"), ) @@ -518,6 +542,10 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta "parent": parent_run.ParentRunFacet( run=parent_run.Run(runId=parent_run_id), job=parent_run.Job(namespace=namespace(), name="parent_job_name"), + root=parent_run.Root( + run=parent_run.RootRun(runId=parent_run_id), + job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), + ), ), "processing_engine": processing_engine_run.ProcessingEngineRunFacet( version=ANY, name="Airflow", openlineageAdapterVersion=ANY @@ -637,7 +665,17 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat nominal_end_time=event_time.isoformat(), owners=["airflow"], description=dag.description, - run_facets={"airflowDagRun": dag_run_facet}, + run_facets={ + "parent": parent_run.ParentRunFacet( + run=parent_run.Run(runId=random_uuid), + job=parent_run.Job(namespace=namespace(), name="parent_job_name"), + root=parent_run.Root( + run=parent_run.RootRun(runId=random_uuid), + job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), + ), + ), + "airflowDagRun": dag_run_facet, + }, job_facets=job_facets, ) @@ -649,6 +687,14 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat run=Run( runId=random_uuid, facets={ + "parent": parent_run.ParentRunFacet( + run=parent_run.Run(runId=random_uuid), + job=parent_run.Job(namespace=namespace(), name="parent_job_name"), + root=parent_run.Root( + run=parent_run.RootRun(runId=random_uuid), + job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), + ), + ), "nominalTime": nominal_time_run.NominalTimeRunFacet( nominalStartTime=event_time.isoformat(), nominalEndTime=event_time.isoformat(), @@ -750,7 +796,17 @@ def test_emit_dag_complete_event( clear_number=0, dag_run_state=DagRunState.SUCCESS, task_ids=["task_0", "task_1", "task_2.test"], - run_facets={"airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run)}, + run_facets={ + "parent": parent_run.ParentRunFacet( + run=parent_run.Run(runId=random_uuid), + job=parent_run.Job(namespace=namespace(), name="parent_job_name"), + root=parent_run.Root( + run=parent_run.RootRun(runId=random_uuid), + job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), + ), + ), + "airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run), + }, ) client.emit.assert_called_once_with( @@ -760,6 +816,14 @@ def test_emit_dag_complete_event( run=Run( runId=random_uuid, facets={ + "parent": parent_run.ParentRunFacet( + run=parent_run.Run(runId=random_uuid), + job=parent_run.Job(namespace=namespace(), name="parent_job_name"), + root=parent_run.Root( + run=parent_run.RootRun(runId=random_uuid), + job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), + ), + ), "airflowState": AirflowStateRunFacet( dagRunState=DagRunState.SUCCESS, tasksState={ @@ -846,7 +910,17 @@ def test_emit_dag_failed_event( dag_run_state=DagRunState.FAILED, task_ids=["task_0", "task_1", "task_2.test"], msg="error msg", - run_facets={"airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run)}, + run_facets={ + "parent": parent_run.ParentRunFacet( + run=parent_run.Run(runId=random_uuid), + job=parent_run.Job(namespace=namespace(), name="parent_job_name"), + root=parent_run.Root( + run=parent_run.RootRun(runId=random_uuid), + job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), + ), + ), + "airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run), + }, ) client.emit.assert_called_once_with( @@ -856,6 +930,14 @@ def test_emit_dag_failed_event( run=Run( runId=random_uuid, facets={ + "parent": parent_run.ParentRunFacet( + run=parent_run.Run(runId=random_uuid), + job=parent_run.Job(namespace=namespace(), name="parent_job_name"), + root=parent_run.Root( + run=parent_run.RootRun(runId=random_uuid), + job=parent_run.RootJob(namespace=namespace(), name="parent_job_name"), + ), + ), "errorMessage": error_message_run.ErrorMessageRunFacet( message="error msg", programmingLanguage="python" ), diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py index 88de93e6e2877..ecc5f5cc7c1d4 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py @@ -39,13 +39,9 @@ from tests_common.test_utils.db import clear_db_runs from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS -if AIRFLOW_V_3_0_PLUS: - from airflow.utils.types import DagRunTriggeredByType - -TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"] +TEST_DAG_FOLDER = os.path.join(os.path.dirname(os.path.dirname(__file__)), "dags") DEFAULT_DATE = timezone.datetime(2016, 1, 1) - log = logging.getLogger(__name__) @@ -88,7 +84,8 @@ def setup_job(self, task_name, run_id): shutil.rmtree(dirpath) dirpath.mkdir(exist_ok=True, parents=True) lm = get_listener_manager() - lm.add_listener(OpenLineageListener()) + listener = OpenLineageListener() + lm.add_listener(listener) dagbag = DagBag( dag_folder=TEST_DAG_FOLDER, @@ -97,22 +94,15 @@ def setup_job(self, task_name, run_id): dag = dagbag.dags.get("test_openlineage_execution") task = dag.get_task(task_name) - if AIRFLOW_V_3_0_PLUS: - dagrun_kwargs = { - "logical_date": DEFAULT_DATE, - "run_after": DEFAULT_DATE, - "triggered_by": DagRunTriggeredByType.TEST, - } - else: - dagrun_kwargs = {"execution_date": DEFAULT_DATE} dag.create_dagrun( run_id=run_id, run_type=DagRunType.MANUAL, data_interval=(DEFAULT_DATE, DEFAULT_DATE), state=State.RUNNING, start_date=DEFAULT_DATE, - **dagrun_kwargs, + execution_date=DEFAULT_DATE, ) + ti = TaskInstance(task=task, run_id=run_id) job = Job(id=random.randint(0, 23478197), dag_id=ti.dag_id) job_runner = LocalTaskJobRunner(job=job, task_instance=ti, ignore_ti_state=True) @@ -207,22 +197,15 @@ def test_success_overtime_kills_tasks(self): dag = dagbag.dags.get("test_openlineage_execution") task = dag.get_task("execute_long_stall") - if AIRFLOW_V_3_0_PLUS: - dagrun_kwargs = { - "logical_date": DEFAULT_DATE, - "run_after": DEFAULT_DATE, - "triggered_by": DagRunTriggeredByType.TEST, - } - else: - dagrun_kwargs = {"execution_date": DEFAULT_DATE} dag.create_dagrun( run_id="test_long_stalled_task_is_killed_by_listener_overtime_if_ol_timeout_long_enough", run_type=DagRunType.MANUAL, data_interval=(DEFAULT_DATE, DEFAULT_DATE), state=State.RUNNING, start_date=DEFAULT_DATE, - **dagrun_kwargs, + execution_date=DEFAULT_DATE, ) + ti = TaskInstance( task=task, run_id="test_long_stalled_task_is_killed_by_listener_overtime_if_ol_timeout_long_enough", diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py index 3dc2c938835af..3ef2c13caa687 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py @@ -269,6 +269,7 @@ def mock_task_id(dag_id, task_id, try_number, logical_date, map_index): @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") + @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_mapped_task_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") @@ -282,6 +283,7 @@ def test_adapter_start_task_is_called_with_proper_arguments( mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, + mock_get_task_parent_run_facet, mock_disabled, mock_debug_mode, ): @@ -298,6 +300,7 @@ def test_adapter_start_task_is_called_with_proper_arguments( mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} + mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_disabled.return_value = False listener.on_task_instance_running(None, task_instance, None) @@ -306,14 +309,13 @@ def test_adapter_start_task_is_called_with_proper_arguments( job_name="job_name", job_description="Test DAG Description", event_time="2023-01-01T13:01:01+00:00", - parent_job_name="dag_id", - parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0", code_location=None, nominal_start_time=None, nominal_end_time=None, owners=["Test Owner"], task=listener.extractor_manager.extract_metadata(), run_facets={ + "parent": 4, "mapped_facet": 1, "custom_user_facet": 2, "airflow_run_facet": 3, @@ -323,6 +325,7 @@ def test_adapter_start_task_is_called_with_proper_arguments( @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") + @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @@ -336,6 +339,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments( mock_get_job_name, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, + mock_get_task_parent_run_facet, mock_disabled, mock_debug_mode, ): @@ -352,6 +356,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments( mock_get_job_name.return_value = "job_name" mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} + mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_disabled.return_value = False err = ValueError("test") @@ -364,11 +369,10 @@ def test_adapter_fail_task_is_called_with_proper_arguments( listener.adapter.fail_task.assert_called_once_with( end_time="2023-01-03T13:01:01+00:00", job_name="job_name", - parent_job_name="dag_id", - parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0", run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", task=listener.extractor_manager.extract_metadata(), run_facets={ + "parent": 4, "custom_user_facet": 2, "airflow": {"task": "..."}, "debug": AirflowDebugRunFacet(packages=ANY), @@ -378,6 +382,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments( @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") + @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @@ -391,6 +396,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments( mock_get_job_name, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, + mock_get_task_parent_run_facet, mock_disabled, mock_debug_mode, ): @@ -407,6 +413,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments( mock_get_job_name.return_value = "job_name" mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} + mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_disabled.return_value = False listener.on_task_instance_success(None, task_instance, None) @@ -417,11 +424,10 @@ def test_adapter_complete_task_is_called_with_proper_arguments( assert calls[0][1] == dict( end_time="2023-01-03T13:01:01+00:00", job_name="job_name", - parent_job_name="dag_id", - parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0", run_id=f"2020-01-01T01:01:01+00:00.dag_id.task_id.{EXPECTED_TRY_NUMBER_1}.-1", task=listener.extractor_manager.extract_metadata(), run_facets={ + "parent": 4, "custom_user_facet": 2, "airflow": {"task": "..."}, "debug": AirflowDebugRunFacet(packages=ANY), @@ -924,6 +930,7 @@ def mock_task_id(dag_id, task_id, try_number, logical_date, map_index): @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") + @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_mapped_task_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") @@ -937,6 +944,7 @@ def test_adapter_start_task_is_called_with_proper_arguments( mock_get_airflow_mapped_task_facet, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, + mock_get_task_parent_run_facet, mock_disabled, mock_debug_mode, ): @@ -953,6 +961,7 @@ def test_adapter_start_task_is_called_with_proper_arguments( mock_get_airflow_mapped_task_facet.return_value = {"mapped_facet": 1} mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow_run_facet": 3} + mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_disabled.return_value = False listener.on_task_instance_running(None, task_instance) @@ -961,8 +970,6 @@ def test_adapter_start_task_is_called_with_proper_arguments( job_name="job_name", job_description="Test DAG Description", event_time="2023-01-01T13:01:01+00:00", - parent_job_name="dag_id", - parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0", code_location=None, nominal_start_time=None, nominal_end_time=None, @@ -972,12 +979,14 @@ def test_adapter_start_task_is_called_with_proper_arguments( "mapped_facet": 1, "custom_user_facet": 2, "airflow_run_facet": 3, + "parent": 4, "debug": AirflowDebugRunFacet(packages=ANY), }, ) @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") + @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @@ -991,6 +1000,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments( mock_get_job_name, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, + mock_get_task_parent_run_facet, mock_disabled, mock_debug_mode, ): @@ -1007,6 +1017,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments( mock_get_job_name.return_value = "job_name" mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} + mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_disabled.return_value = False err = ValueError("test") @@ -1019,11 +1030,10 @@ def test_adapter_fail_task_is_called_with_proper_arguments( listener.adapter.fail_task.assert_called_once_with( end_time="2023-01-03T13:01:01+00:00", job_name="job_name", - parent_job_name="dag_id", - parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0", run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", task=listener.extractor_manager.extract_metadata(), run_facets={ + "parent": 4, "custom_user_facet": 2, "airflow": {"task": "..."}, "debug": AirflowDebugRunFacet(packages=ANY), @@ -1032,6 +1042,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments( ) @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) + @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call @@ -1041,6 +1052,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_ self, mock_utcnow, mock_get_job_name, + mock_get_task_parent_run_facet, mock_debug_mode, ): """Tests that the 'fail_task' method of the OpenLineageAdapter is invoked with the correct arguments. @@ -1051,6 +1063,7 @@ def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_ listener, task_instance = self._create_listener_and_task_instance(runtime_ti=False) mock_get_job_name.return_value = "job_name" + mock_get_task_parent_run_facet.return_value = {"parent": 4} err = ValueError("test") on_task_failed_listener_kwargs = {"error": err} @@ -1059,19 +1072,24 @@ def test_adapter_fail_task_is_called_with_proper_arguments_for_db_task_instance_ listener.on_task_instance_failed( previous_state=None, task_instance=task_instance, **on_task_failed_listener_kwargs ) + mock_get_task_parent_run_facet.assert_called_once_with( + parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0", parent_job_name=task_instance.dag_id + ) listener.adapter.fail_task.assert_called_once_with( end_time="2023-01-03T13:01:01+00:00", job_name="job_name", - parent_job_name="dag_id", - parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0", run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", task=OperatorLineage(), - run_facets={"debug": AirflowDebugRunFacet(packages=ANY)}, + run_facets={ + "parent": 4, + "debug": AirflowDebugRunFacet(packages=ANY), + }, **expected_err_kwargs, ) @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) @mock.patch("airflow.providers.openlineage.plugins.listener.is_operator_disabled") + @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_airflow_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_user_provided_run_facets") @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @@ -1085,6 +1103,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments( mock_get_job_name, mock_get_user_provided_run_facets, mock_get_airflow_run_facet, + mock_get_task_parent_run_facet, mock_disabled, mock_debug_mode, ): @@ -1101,6 +1120,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments( mock_get_job_name.return_value = "job_name" mock_get_user_provided_run_facets.return_value = {"custom_user_facet": 2} mock_get_airflow_run_facet.return_value = {"airflow": {"task": "..."}} + mock_get_task_parent_run_facet.return_value = {"parent": 4} mock_disabled.return_value = False listener.on_task_instance_success(None, task_instance) @@ -1109,11 +1129,10 @@ def test_adapter_complete_task_is_called_with_proper_arguments( assert calls[0][1] == dict( end_time="2023-01-03T13:01:01+00:00", job_name="job_name", - parent_job_name="dag_id", - parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0", run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", task=listener.extractor_manager.extract_metadata(), run_facets={ + "parent": 4, "custom_user_facet": 2, "airflow": {"task": "..."}, "debug": AirflowDebugRunFacet(packages=ANY), @@ -1121,6 +1140,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments( ) @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) + @mock.patch("airflow.providers.openlineage.plugins.listener.get_task_parent_run_facet") @mock.patch("airflow.providers.openlineage.plugins.listener.get_job_name") @mock.patch( "airflow.providers.openlineage.plugins.listener.OpenLineageListener._execute", new=regular_call @@ -1130,6 +1150,7 @@ def test_adapter_complete_task_is_called_with_proper_arguments_for_db_task_insta self, mock_utcnow, mock_get_job_name, + mock_get_task_parent_run_facet, mock_debug_mode, ): """Tests that the 'complete_task' method of the OpenLineageAdapter is called with the correct arguments. @@ -1139,18 +1160,23 @@ def test_adapter_complete_task_is_called_with_proper_arguments_for_db_task_insta """ listener, task_instance = self._create_listener_and_task_instance(runtime_ti=False) mock_get_job_name.return_value = "job_name" + mock_get_task_parent_run_facet.return_value = {"parent": 4} listener.on_task_instance_success(None, task_instance) calls = listener.adapter.complete_task.call_args_list assert len(calls) == 1 + mock_get_task_parent_run_facet.assert_called_once_with( + parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0", parent_job_name=task_instance.dag_id + ) assert calls[0][1] == dict( end_time="2023-01-03T13:01:01+00:00", job_name="job_name", - parent_job_name="dag_id", - parent_run_id="2020-01-01T01:01:01+00:00.dag_id.0", run_id="2020-01-01T01:01:01+00:00.dag_id.task_id.1.-1", task=OperatorLineage(), - run_facets={"debug": AirflowDebugRunFacet(packages=ANY)}, + run_facets={ + "parent": 4, + "debug": AirflowDebugRunFacet(packages=ANY), + }, ) @mock.patch( diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_spark.py b/providers/openlineage/tests/unit/openlineage/utils/test_spark.py index dba2f33712876..5403dab5497a8 100644 --- a/providers/openlineage/tests/unit/openlineage/utils/test_spark.py +++ b/providers/openlineage/tests/unit/openlineage/utils/test_spark.py @@ -40,8 +40,8 @@ task_id="task_id", try_number=1, map_index=1, - dag_run=MagicMock(logical_date=dt.datetime(2024, 11, 11)), logical_date=dt.datetime(2024, 11, 11), + dag_run=MagicMock(logical_date=dt.datetime(2024, 11, 11), clear_number=0), ) } EXAMPLE_HTTP_TRANSPORT_CONFIG = { @@ -74,6 +74,9 @@ "spark.openlineage.parentJobName": "dag_id.task_id", "spark.openlineage.parentJobNamespace": "default", "spark.openlineage.parentRunId": "01931885-2800-7be7-aa8d-aaa15c337267", + "spark.openlineage.rootParentRunId": "01931885-2800-799d-8041-88a263ffa0d8", + "spark.openlineage.rootParentJobName": "dag_id", + "spark.openlineage.rootParentJobNamespace": "default", } EXAMPLE_TRANSPORT_SPARK_PROPERTIES = { "spark.openlineage.transport.type": "http", @@ -174,6 +177,9 @@ def test_get_transport_information_as_spark_properties_composite_transport_type( "spark.openlineage.parentJobNamespace": "another_namespace", "spark.openlineage.parentJobName": "another_job_name", "spark.openlineage.parentRunId": "another_run_id", + "spark.openlineage.rootParentJobNamespace": "another_namespace", + "spark.openlineage.rootParentJobName": "another_job_name", + "spark.openlineage.rootParentRunId": "another_run_id", }, True, ), diff --git a/providers/snowflake/pyproject.toml b/providers/snowflake/pyproject.toml index 7a597d879bfad..894cd54bf8369 100644 --- a/providers/snowflake/pyproject.toml +++ b/providers/snowflake/pyproject.toml @@ -75,7 +75,7 @@ dependencies = [ # Any change in the dependencies is preserved when the file is regenerated [project.optional-dependencies] "openlineage" = [ - "apache-airflow-providers-openlineage" + "apache-airflow-providers-openlineage>=2.3.0" ] [dependency-groups] diff --git a/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py b/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py index 14d682c50cb9b..9016e9d56dfbe 100644 --- a/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py +++ b/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py @@ -97,7 +97,22 @@ def fix_snowflake_sqlalchemy_uri(uri: str) -> str: return urlunparse((parts.scheme, hostname, parts.path, parts.params, parts.query, parts.fragment)) -# todo: move this run_id logic into OpenLineage's listener to avoid differences +def _get_logical_date(task_instance): + # todo: remove when min airflow version >= 3.0 + if AIRFLOW_V_3_0_PLUS: + dagrun = task_instance.get_template_context()["dag_run"] + return dagrun.logical_date or dagrun.run_after + + if hasattr(task_instance, "logical_date"): + date = task_instance.logical_date + else: + date = task_instance.execution_date + + return date + + # todo: move this run_id logic into OpenLineage's listener to avoid differences + + def _get_ol_run_id(task_instance) -> str: """ Get OpenLineage run_id from TaskInstance. @@ -108,29 +123,27 @@ def _get_ol_run_id(task_instance) -> str: """ from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter - def _get_logical_date(): - # todo: remove when min airflow version >= 3.0 - if AIRFLOW_V_3_0_PLUS: - dagrun = task_instance.get_template_context()["dag_run"] - return dagrun.logical_date or dagrun.run_after - - if hasattr(task_instance, "logical_date"): - date = task_instance.logical_date - else: - date = task_instance.execution_date - - return date - # Generate same OL run id as is generated for current task instance return OpenLineageAdapter.build_task_instance_run_id( dag_id=task_instance.dag_id, task_id=task_instance.task_id, - logical_date=_get_logical_date(), + logical_date=_get_logical_date(task_instance), try_number=task_instance.try_number, map_index=task_instance.map_index, ) +# todo: move this run_id logic into OpenLineage's listener to avoid differences +def _get_ol_dag_run_id(task_instance) -> str: + from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter + + return OpenLineageAdapter.build_dag_run_id( + dag_id=task_instance.dag_id, + logical_date=_get_logical_date(task_instance), + clear_number=task_instance.dag_run.clear_number, + ) + + def _get_parent_run_facet(task_instance): """ Retrieve the ParentRunFacet associated with a specific Airflow task instance. @@ -144,6 +157,7 @@ def _get_parent_run_facet(task_instance): from airflow.providers.openlineage.conf import namespace parent_run_id = _get_ol_run_id(task_instance) + root_parent_run_id = _get_ol_dag_run_id(task_instance) return parent_run.ParentRunFacet( run=parent_run.Run(runId=parent_run_id), @@ -151,6 +165,13 @@ def _get_parent_run_facet(task_instance): namespace=namespace(), name=f"{task_instance.dag_id}.{task_instance.task_id}", ), + root=parent_run.Root( + run=parent_run.RootRun(runId=root_parent_run_id), + job=parent_run.RootJob( + name=task_instance.dag_id, + namespace=namespace(), + ), + ), ) @@ -218,7 +239,7 @@ def _create_snowflake_event_pair( return start, end -@require_openlineage_version(provider_min_version="2.0.0") +@require_openlineage_version(provider_min_version="2.3.0") def emit_openlineage_events_for_snowflake_queries( query_ids: list[str], query_source_namespace: str, diff --git a/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake.py b/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake.py index f3278cd990c9b..f22fde497b96c 100644 --- a/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake.py +++ b/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake.py @@ -860,7 +860,7 @@ def test_get_openlineage_database_specific_lineage_with_old_openlineage_provider hook.get_openlineage_database_info = lambda x: mock.MagicMock(authority="auth", scheme="scheme") expected_err = ( - "OpenLineage provider version `1.99.0` is lower than required `2.0.0`, " + "OpenLineage provider version `1.99.0` is lower than required `2.3.0`, " "skipping function `emit_openlineage_events_for_snowflake_queries` execution" ) with pytest.raises(AirflowOptionalProviderFeatureException, match=expected_err): diff --git a/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py b/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py index a6c94a7a383ca..f6aad261e2d2a 100644 --- a/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py +++ b/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py @@ -304,9 +304,10 @@ def test_create_snowflake_event_pair_success(mock_generate_uuid, is_successful): assert start_event.job == end_event.job +@mock.patch("importlib.metadata.version", return_value="2.3.0") @mock.patch("openlineage.client.uuid.generate_new_uuid") @mock.patch("airflow.utils.timezone.utcnow") -def test_emit_openlineage_events_for_snowflake_queries_with_hook(mock_now, mock_generate_uuid): +def test_emit_openlineage_events_for_snowflake_queries_with_hook(mock_now, mock_generate_uuid, mock_version): fake_uuid = "01958e68-03a2-79e3-9ae9-26865cc40e2f" mock_generate_uuid.return_value = fake_uuid @@ -323,6 +324,7 @@ def test_emit_openlineage_events_for_snowflake_queries_with_hook(mock_now, mock_ try_number=1, logical_date=logical_date, state=TaskInstanceState.FAILED, # This will be query default state if no metadata found + dag_run=mock.MagicMock(logical_date=logical_date, clear_number=0), ) mock_ti.get_template_context.return_value = {"dag_run": mock.MagicMock(logical_date=logical_date)} @@ -387,6 +389,10 @@ def test_emit_openlineage_events_for_snowflake_queries_with_hook(mock_now, mock_ "parent": parent_run.ParentRunFacet( run=parent_run.Run(runId="01941f29-7c00-7087-8906-40e512c257bd"), job=parent_run.Job(namespace=namespace(), name="dag_id.task_id"), + root=parent_run.Root( + run=parent_run.RootRun(runId="01941f29-7c00-743e-b109-28b18d0a19c5"), + job=parent_run.RootJob(namespace=namespace(), name="dag_id"), + ), ), "custom_run": "value_run", } @@ -535,9 +541,12 @@ def test_emit_openlineage_events_for_snowflake_queries_with_hook(mock_now, mock_ assert fake_client.emit.call_args_list == expected_calls +@mock.patch("importlib.metadata.version", return_value="2.3.0") @mock.patch("openlineage.client.uuid.generate_new_uuid") @mock.patch("airflow.utils.timezone.utcnow") -def test_emit_openlineage_events_for_snowflake_queries_without_hook(mock_now, mock_generate_uuid): +def test_emit_openlineage_events_for_snowflake_queries_without_hook( + mock_now, mock_generate_uuid, mock_version +): fake_uuid = "01958e68-03a2-79e3-9ae9-26865cc40e2f" mock_generate_uuid.return_value = fake_uuid @@ -554,8 +563,11 @@ def test_emit_openlineage_events_for_snowflake_queries_without_hook(mock_now, mo try_number=1, logical_date=logical_date, state=TaskInstanceState.SUCCESS, # This will be query default state if no metadata found + dag_run=mock.MagicMock(logical_date=logical_date, clear_number=0), ) - mock_ti.get_template_context.return_value = {"dag_run": mock.MagicMock(logical_date=logical_date)} + mock_ti.get_template_context.return_value = { + "dag_run": mock.MagicMock(logical_date=logical_date, clear_number=0) + } additional_run_facets = {"custom_run": "value_run"} additional_job_facets = {"custom_job": "value_job"} @@ -593,6 +605,10 @@ def test_emit_openlineage_events_for_snowflake_queries_without_hook(mock_now, mo "parent": parent_run.ParentRunFacet( run=parent_run.Run(runId="01941f29-7c00-7087-8906-40e512c257bd"), job=parent_run.Job(namespace=namespace(), name="dag_id.task_id"), + root=parent_run.Root( + run=parent_run.RootRun(runId="01941f29-7c00-743e-b109-28b18d0a19c5"), + job=parent_run.RootJob(namespace=namespace(), name="dag_id"), + ), ), "custom_run": "value_run", } @@ -643,7 +659,8 @@ def test_emit_openlineage_events_for_snowflake_queries_without_hook(mock_now, mo assert fake_client.emit.call_args_list == expected_calls -def test_emit_openlineage_events_for_snowflake_queries_without_query_ids(): +@mock.patch("importlib.metadata.version", return_value="2.3.0") +def test_emit_openlineage_events_for_snowflake_queries_without_query_ids(mock_version): query_ids = [] original_query_ids = copy.deepcopy(query_ids) @@ -666,7 +683,7 @@ def test_emit_openlineage_events_for_snowflake_queries_without_query_ids(): fake_client.emit.assert_not_called() # No events should be emitted -# emit_openlineage_events_for_snowflake_queries requires OL provider 2.0.0 +# emit_openlineage_events_for_snowflake_queries requires OL provider 2.3.0 @mock.patch("importlib.metadata.version", return_value="1.99.0") def test_emit_openlineage_events_with_old_openlineage_provider(mock_version): query_ids = ["q1", "q2"] @@ -682,7 +699,7 @@ def test_emit_openlineage_events_with_old_openlineage_provider(mock_version): return_value=fake_listener, ): expected_err = ( - "OpenLineage provider version `1.99.0` is lower than required `2.0.0`, " + "OpenLineage provider version `1.99.0` is lower than required `2.3.0`, " "skipping function `emit_openlineage_events_for_snowflake_queries` execution" ) diff --git a/pyproject.toml b/pyproject.toml index 48b603047a684..8b110fe868851 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -281,7 +281,7 @@ packages = [] "apache-airflow-providers-openfaas>=3.7.0" ] "openlineage" = [ - "apache-airflow-providers-openlineage>=2.1.3" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py + "apache-airflow-providers-openlineage>=2.3.0" # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py ] "opensearch" = [ "apache-airflow-providers-opensearch>=1.5.0" @@ -445,7 +445,7 @@ packages = [] "apache-airflow-providers-odbc>=4.8.0", "apache-airflow-providers-openai>=1.5.0", "apache-airflow-providers-openfaas>=3.7.0", - "apache-airflow-providers-openlineage>=2.1.3", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py + "apache-airflow-providers-openlineage>=2.3.0", # Set from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py "apache-airflow-providers-opensearch>=1.5.0", "apache-airflow-providers-opsgenie>=5.8.0", "apache-airflow-providers-oracle>=3.12.0", diff --git a/scripts/ci/pre_commit/update_airflow_pyproject_toml.py b/scripts/ci/pre_commit/update_airflow_pyproject_toml.py index 8bf1fb2ca2fdf..7c6e6dbe7c208 100755 --- a/scripts/ci/pre_commit/update_airflow_pyproject_toml.py +++ b/scripts/ci/pre_commit/update_airflow_pyproject_toml.py @@ -57,7 +57,7 @@ MIN_VERSION_OVERRIDE: dict[str, Version] = { "amazon": parse_version("2.1.3"), "fab": parse_version("2.0.2"), - "openlineage": parse_version("2.1.3"), + "openlineage": parse_version("2.3.0"), "git": parse_version("0.0.2"), "common.messaging": parse_version("1.0.1"), }