From 82ca70fe0107dde550abcf5a431a3f40a21417fb Mon Sep 17 00:00:00 2001 From: rnhttr Date: Thu, 24 Jul 2025 16:51:10 -0400 Subject: [PATCH 1/3] add EmptyOperator with lineage --- .../openlineage/operators/__init__.py | 16 ++++++ .../providers/openlineage/operators/empty.py | 48 +++++++++++++++++ .../providers/openlineage/version_compat.py | 5 ++ .../unit/openlineage/operators/__init__.py | 16 ++++++ .../unit/openlineage/operators/test_empty.py | 54 +++++++++++++++++++ 5 files changed, 139 insertions(+) create mode 100644 providers/openlineage/src/airflow/providers/openlineage/operators/__init__.py create mode 100644 providers/openlineage/src/airflow/providers/openlineage/operators/empty.py create mode 100644 providers/openlineage/tests/unit/openlineage/operators/__init__.py create mode 100644 providers/openlineage/tests/unit/openlineage/operators/test_empty.py diff --git a/providers/openlineage/src/airflow/providers/openlineage/operators/__init__.py b/providers/openlineage/src/airflow/providers/openlineage/operators/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/openlineage/src/airflow/providers/openlineage/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/openlineage/src/airflow/providers/openlineage/operators/empty.py b/providers/openlineage/src/airflow/providers/openlineage/operators/empty.py new file mode 100644 index 0000000000000..6ac8754797f99 --- /dev/null +++ b/providers/openlineage/src/airflow/providers/openlineage/operators/empty.py @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.providers.openlineage.extractors.base import OperatorLineage +from airflow.providers.openlineage.version_compat import BaseOperator + +if TYPE_CHECKING: + from airflow.sdk.definitions.context import Context + + +class EmptyOperator(BaseOperator): + """ + Operator that does literally nothing. + + It can be used to group tasks in a DAG. + The task is evaluated by the scheduler but never processed by the executor. + """ + + ui_color = "#e8f7e4" + + def execute(self, context: Context): + pass + + def get_openlineage_facets_on_start(self) -> OperatorLineage: + return OperatorLineage() + + def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage: + return OperatorLineage() + + def get_openlineage_facets_on_failure(self, task_instance) -> OperatorLineage: + return OperatorLineage() diff --git a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py index 48d122b669696..6585864b830b2 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py +++ b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py @@ -33,3 +33,8 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperator +else: + from airflow.models import BaseOperator diff --git a/providers/openlineage/tests/unit/openlineage/operators/__init__.py b/providers/openlineage/tests/unit/openlineage/operators/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/openlineage/tests/unit/openlineage/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/openlineage/tests/unit/openlineage/operators/test_empty.py b/providers/openlineage/tests/unit/openlineage/operators/test_empty.py new file mode 100644 index 0000000000000..ab7427d63ad9a --- /dev/null +++ b/providers/openlineage/tests/unit/openlineage/operators/test_empty.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest.mock import Mock + +import pytest + +from airflow.providers.openlineage.extractors.base import OperatorLineage +from airflow.providers.openlineage.operators.empty import EmptyOperator + + +def test_execute_returns_none_and_does_not_raise(): + op = EmptyOperator(task_id="empty") + # Calling execute should do nothing and return None + result = op.execute(Mock()) + assert result is None + + +@pytest.mark.parametrize( + "method_name,use_task_instance", + [ + ("get_openlineage_facets_on_start", False), + ("get_openlineage_facets_on_complete", True), + ("get_openlineage_facets_on_failure", True), + ], +) +def test_openlineage_facets_methods_return_operator_lineage(method_name, use_task_instance): + op = EmptyOperator(task_id="empty") + method = getattr(op, method_name) + # Invoke with or without a mock for task_instance + if use_task_instance: + facets = method(Mock()) + else: + facets = method() + # Should return an OperatorLineage instance + assert isinstance(facets, OperatorLineage) + # Each call returns a fresh instance + second_call = method(Mock()) if use_task_instance else method() + assert facets is not second_call From 994bb832bbc45ac97f03ea21e858e36fc0cc76c8 Mon Sep 17 00:00:00 2001 From: rnhttr Date: Thu, 24 Jul 2025 16:54:27 -0400 Subject: [PATCH 2/3] add EmptyOperator with lineage --- .../src/airflow/providers/openlineage/version_compat.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py index 6585864b830b2..e7a259afb357c 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py +++ b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py @@ -38,3 +38,8 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: from airflow.sdk import BaseOperator else: from airflow.models import BaseOperator + +__all__ = [ + "AIRFLOW_V_3_0_PLUS", + "BaseOperator", +] From fc87a225280af636247aeed06dba5f30cebcfbaa Mon Sep 17 00:00:00 2001 From: rnhttr Date: Thu, 24 Jul 2025 17:33:33 -0400 Subject: [PATCH 3/3] update provider.yaml --- providers/openlineage/provider.yaml | 5 +++++ .../src/airflow/providers/openlineage/get_provider_info.py | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/providers/openlineage/provider.yaml b/providers/openlineage/provider.yaml index be450c269054c..bf77c0bc82497 100644 --- a/providers/openlineage/provider.yaml +++ b/providers/openlineage/provider.yaml @@ -68,6 +68,11 @@ integrations: logo: /docs/integration-logos/openlineage.svg tags: [protocol] +operators: + - integration-name: OpenLineage + python-modules: + - airflow.providers.openlineage.operators.empty + plugins: - name: openlineage plugin-class: airflow.providers.openlineage.plugins.openlineage.OpenLineageProviderPlugin diff --git a/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py b/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py index dd59cfacca419..3a26a47bdfdc7 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py +++ b/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py @@ -34,6 +34,12 @@ def get_provider_info(): "tags": ["protocol"], } ], + "operators": [ + { + "integration-name": "OpenLineage", + "python-modules": ["airflow.providers.openlineage.operators.empty"], + } + ], "plugins": [ { "name": "openlineage",