diff --git a/providers/openlineage/provider.yaml b/providers/openlineage/provider.yaml index be450c269054c..bf77c0bc82497 100644 --- a/providers/openlineage/provider.yaml +++ b/providers/openlineage/provider.yaml @@ -68,6 +68,11 @@ integrations: logo: /docs/integration-logos/openlineage.svg tags: [protocol] +operators: + - integration-name: OpenLineage + python-modules: + - airflow.providers.openlineage.operators.empty + plugins: - name: openlineage plugin-class: airflow.providers.openlineage.plugins.openlineage.OpenLineageProviderPlugin diff --git a/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py b/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py index dd59cfacca419..3a26a47bdfdc7 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py +++ b/providers/openlineage/src/airflow/providers/openlineage/get_provider_info.py @@ -34,6 +34,12 @@ def get_provider_info(): "tags": ["protocol"], } ], + "operators": [ + { + "integration-name": "OpenLineage", + "python-modules": ["airflow.providers.openlineage.operators.empty"], + } + ], "plugins": [ { "name": "openlineage", diff --git a/providers/openlineage/src/airflow/providers/openlineage/operators/__init__.py b/providers/openlineage/src/airflow/providers/openlineage/operators/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/openlineage/src/airflow/providers/openlineage/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/openlineage/src/airflow/providers/openlineage/operators/empty.py b/providers/openlineage/src/airflow/providers/openlineage/operators/empty.py new file mode 100644 index 0000000000000..6ac8754797f99 --- /dev/null +++ b/providers/openlineage/src/airflow/providers/openlineage/operators/empty.py @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.providers.openlineage.extractors.base import OperatorLineage +from airflow.providers.openlineage.version_compat import BaseOperator + +if TYPE_CHECKING: + from airflow.sdk.definitions.context import Context + + +class EmptyOperator(BaseOperator): + """ + Operator that does literally nothing. + + It can be used to group tasks in a DAG. + The task is evaluated by the scheduler but never processed by the executor. + """ + + ui_color = "#e8f7e4" + + def execute(self, context: Context): + pass + + def get_openlineage_facets_on_start(self) -> OperatorLineage: + return OperatorLineage() + + def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage: + return OperatorLineage() + + def get_openlineage_facets_on_failure(self, task_instance) -> OperatorLineage: + return OperatorLineage() diff --git a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py index 48d122b669696..e7a259afb357c 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py +++ b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py @@ -33,3 +33,13 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseOperator +else: + from airflow.models import BaseOperator + +__all__ = [ + "AIRFLOW_V_3_0_PLUS", + "BaseOperator", +] diff --git a/providers/openlineage/tests/unit/openlineage/operators/__init__.py b/providers/openlineage/tests/unit/openlineage/operators/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/openlineage/tests/unit/openlineage/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/openlineage/tests/unit/openlineage/operators/test_empty.py b/providers/openlineage/tests/unit/openlineage/operators/test_empty.py new file mode 100644 index 0000000000000..ab7427d63ad9a --- /dev/null +++ b/providers/openlineage/tests/unit/openlineage/operators/test_empty.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest.mock import Mock + +import pytest + +from airflow.providers.openlineage.extractors.base import OperatorLineage +from airflow.providers.openlineage.operators.empty import EmptyOperator + + +def test_execute_returns_none_and_does_not_raise(): + op = EmptyOperator(task_id="empty") + # Calling execute should do nothing and return None + result = op.execute(Mock()) + assert result is None + + +@pytest.mark.parametrize( + "method_name,use_task_instance", + [ + ("get_openlineage_facets_on_start", False), + ("get_openlineage_facets_on_complete", True), + ("get_openlineage_facets_on_failure", True), + ], +) +def test_openlineage_facets_methods_return_operator_lineage(method_name, use_task_instance): + op = EmptyOperator(task_id="empty") + method = getattr(op, method_name) + # Invoke with or without a mock for task_instance + if use_task_instance: + facets = method(Mock()) + else: + facets = method() + # Should return an OperatorLineage instance + assert isinstance(facets, OperatorLineage) + # Each call returns a fresh instance + second_call = method(Mock()) if use_task_instance else method() + assert facets is not second_call