From 2db5db2f280690207247d313bbbd7055779a51b6 Mon Sep 17 00:00:00 2001 From: Luciano Resende Date: Sun, 29 Dec 2024 07:30:40 -0800 Subject: [PATCH] Support Airflow 2.x operators (runtime specific components) --- ...est-practices-custom-pipeline-components.md | 4 ++-- docs/source/user_guide/pipeline-components.md | 4 ++-- .../package_catalog_connector/README.md | 4 ++-- .../airflow_package_catalog_connector.py | 11 +++++++---- .../airflow/test_airflow_package_connector.py | 18 ++++++++---------- .../test_airflow_provider_package_connector.py | 8 ++++---- .../components/airflow_test_operator.py | 11 ++++++----- .../airflow_test_operator_no_inputs.py | 3 ++- .../airflow_test_operator_type_hints.py | 3 ++- 9 files changed, 35 insertions(+), 31 deletions(-) diff --git a/docs/source/user_guide/best-practices-custom-pipeline-components.md b/docs/source/user_guide/best-practices-custom-pipeline-components.md index 3ea6dcada..2b9f008c4 100644 --- a/docs/source/user_guide/best-practices-custom-pipeline-components.md +++ b/docs/source/user_guide/best-practices-custom-pipeline-components.md @@ -131,9 +131,9 @@ The [URL component catalog](pipeline-components.html#pipeline-components.html#ur The [Apache Airflow package catalog](pipeline-components.html#apache-airflow-package-catalog) provides access to Apache Airflow operators that are stored in built distributions. 1. Take note of the displayed `airflow_package`, which identifies the Apache Airflow built distribution that includes the missing operator. -1. [Add a new Apache Airflow package catalog](pipeline-components.html#adding-a-component-catalog), providing the _download URL_ for the listed distribution as input. For example, if the value of `airflow_package` is `apache_airflow-1.10.15-py2.py3-none-any.whl`, specify as URL +1. [Add a new Apache Airflow package catalog](pipeline-components.html#adding-a-component-catalog), providing the _download URL_ for the listed distribution as input. For example, if the value of `airflow_package` is `apache_airflow-2.10.4-py2.py3-none-any.whl`, specify as URL ``` - https://files.pythonhosted.org/packages/f0/3a/f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/apache_airflow-1.10.15-py2.py3-none-any.whl + https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl ``` #### Apache Airflow provider package catalog (type: `airflow-provider-package-catalog`) diff --git a/docs/source/user_guide/pipeline-components.md b/docs/source/user_guide/pipeline-components.md index 9603d0297..06537bedb 100644 --- a/docs/source/user_guide/pipeline-components.md +++ b/docs/source/user_guide/pipeline-components.md @@ -445,11 +445,11 @@ The [Apache Airflow package catalog connector](https://github.com/elyra-ai/elyra Examples: - [Apache Airflow](https://pypi.org/project/apache-airflow/) (v1.10.15): ``` - https://files.pythonhosted.org/packages/f0/3a/f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/apache_airflow-1.10.15-py2.py3-none-any.whl + https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl ``` - Local copy of a downloaded Apache Airflow package ``` - file:///absolute/path/to/apache_airflow-1.10.15-py2.py3-none-any.whl + file:///absolute/path/to/apache_airflow-2.10.4-py2.py3-none-any.whl ``` #### Apache Airflow provider package catalog diff --git a/elyra/pipeline/airflow/package_catalog_connector/README.md b/elyra/pipeline/airflow/package_catalog_connector/README.md index 5209b84f5..a93ae0f0c 100644 --- a/elyra/pipeline/airflow/package_catalog_connector/README.md +++ b/elyra/pipeline/airflow/package_catalog_connector/README.md @@ -19,7 +19,7 @@ If the Airflow package is stored on PyPI: 1. Search for the Apache Airflow package on PyPI. 1. Open the package's release history and choose the desired version. 1. Open the `Download files` link. - 1. Copy the download link for the package's wheel. ([Example download URL for Apache Airflow 1.10.15](https://files.pythonhosted.org/packages/f0/3a/f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/apache_airflow-1.10.15-py2.py3-none-any.whl)) + 1. Copy the download link for the package's wheel. ([Example download URL for Apache Airflow 2.10.4](https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl)) 1. Save the catalog entry. 1. Open the Visual Pipeline Editor and expand the palette. The loaded Apache Airflow operators are displayed. @@ -28,4 +28,4 @@ If the Airflow package is stored on PyPI: If the palette does not include the expected operators check the JupyterLab log file for error messages. Error messages include the component catalog name, as shown in this example: ``` Error. The Airflow package connector '' encountered an issue downloading ''. HTTP response code: -``` \ No newline at end of file +``` diff --git a/elyra/pipeline/airflow/package_catalog_connector/airflow_package_catalog_connector.py b/elyra/pipeline/airflow/package_catalog_connector/airflow_package_catalog_connector.py index 0469150bd..8240b2015 100644 --- a/elyra/pipeline/airflow/package_catalog_connector/airflow_package_catalog_connector.py +++ b/elyra/pipeline/airflow/package_catalog_connector/airflow_package_catalog_connector.py @@ -61,9 +61,9 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str # Read the user-supplied 'airflow_package_download_url', which is a required # input defined in the 'airflow-package-catalog-catalog.json' schema file. - # Example value: https://archive.apache.org/dist/airflow/1.10.15/apache_airflow-1.10.15-py2.py3-none-any.whl + # Example value: https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl airflow_package_download_url = catalog_metadata["airflow_package_download_url"] - # extract the package name, e.g. 'apache_airflow-1.10.15-py2.py3-none-any.whl' + # extract the package name, e.g. 'apache_airflow-2.10.4-py3-none-any.whl' airflow_package_name = Path(urlparse(airflow_package_download_url).path).name if not airflow_package_name: @@ -161,7 +161,7 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str # # Identify Python scripts that define classes that extend the - # airflow.models.BaseOperator class + # airflow.models.baseoperator.BaseOperator class # scripts_with_operator_class: List[str] = [] # Python scripts that contain operator definitions extends_baseoperator: List[str] = [] # Classes that extend BaseOperator @@ -188,8 +188,11 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str elif isinstance(node, ast.ImportFrom): node_module = node.module for name in node.names: - if "airflow.models" == node_module and name.name == "BaseOperator": + if "airflow.models.baseoperator" == node_module and name.name == "BaseOperator": imported_operator_classes.append(name.name) + # TODO: Support sensor operators + # if "airflow.sensors.base" == node_module and name.name == "BaseSensorOperator": + # imported_operator_classes.append(name.name) elif isinstance(node, ast.ClassDef): # determine whether this class extends the BaseOperator class self.log.debug(f"Analyzing class '{node.name}' in {script_id} ...") diff --git a/elyra/tests/pipeline/airflow/test_airflow_package_connector.py b/elyra/tests/pipeline/airflow/test_airflow_package_connector.py index 76380b955..b6557734c 100644 --- a/elyra/tests/pipeline/airflow/test_airflow_package_connector.py +++ b/elyra/tests/pipeline/airflow/test_airflow_package_connector.py @@ -23,10 +23,8 @@ ) from elyra.pipeline.catalog_connector import AirflowEntryData -AIRFLOW_1_10_15_PKG_URL = ( - "https://files.pythonhosted.org/packages/f0/3a/" - "f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/" - "apache_airflow-1.10.15-py2.py3-none-any.whl" +AIRFLOW_2_10_4_PKG_URL = ( + "https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl" ) AIRFLOW_SUPPORTED_FILE_TYPES = [".py"] @@ -115,19 +113,19 @@ def test_invalid_get_entry_data(): # ---------------------------------- -def test_1_10_15_distribution(): +def test_2_10_4_distribution(): """ - Test connector using Apache Airflow 1.10.15 built distribution. + Test connector using Apache Airflow 2.10.4 built distribution. """ apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES) # get catalog entries for the specified distribution - ces = apc.get_catalog_entries({"airflow_package_download_url": AIRFLOW_1_10_15_PKG_URL}) - # this distribution should contain 37 Python scripts with operator definitions - assert len(ces) == 37 + ces = apc.get_catalog_entries({"airflow_package_download_url": AIRFLOW_2_10_4_PKG_URL}) + # this distribution should contain 12 Python scripts with operator definitions + assert len(ces) == 11 # each entry must contain two keys for entry in ces: # built distribution package file name - assert entry.get("airflow_package") == "apache_airflow-1.10.15-py2.py3-none-any.whl" + assert entry.get("airflow_package") == "apache_airflow-2.10.4-py3-none-any.whl" # a Python script assert entry.get("file", "").endswith(".py") diff --git a/elyra/tests/pipeline/airflow/test_airflow_provider_package_connector.py b/elyra/tests/pipeline/airflow/test_airflow_provider_package_connector.py index 3ce6b858b..0f5dd7593 100644 --- a/elyra/tests/pipeline/airflow/test_airflow_provider_package_connector.py +++ b/elyra/tests/pipeline/airflow/test_airflow_provider_package_connector.py @@ -28,9 +28,9 @@ from elyra.pipeline.catalog_connector import AirflowEntryData HTTP_PROVIDER_PKG_URL = ( - "https://files.pythonhosted.org/packages/a1/08/" - "91653e9f394cbefe356ac07db809be7e69cc89b094379ad91d6cef3d2bc9/" - "apache_airflow_providers_http-2.0.2-py3-none-any.whl" + "https://files.pythonhosted.org/packages/bb/55/" + "9f38da212c29e1c429ddadef082f2bdaccebb0b0d9c88e72463ad25c786a/" + "apache_airflow_providers_http-5.0.0-py3-none-any.whl" ) AIRFLOW_SUPPORTED_FILE_TYPES = [".py"] @@ -134,7 +134,7 @@ def test_valid_url_http_provider_package(): # each entry must contain three keys for entry in ces: # provider package file name - assert entry.get("provider_package") == "apache_airflow_providers_http-2.0.2-py3-none-any.whl" + assert entry.get("provider_package") == "apache_airflow_providers_http-5.0.0-py3-none-any.whl" # provider name assert entry.get("provider") == "apache_airflow_providers_http" # a Python script diff --git a/elyra/tests/pipeline/resources/components/airflow_test_operator.py b/elyra/tests/pipeline/resources/components/airflow_test_operator.py index f63667dae..cd0b218ac 100644 --- a/elyra/tests/pipeline/resources/components/airflow_test_operator.py +++ b/elyra/tests/pipeline/resources/components/airflow_test_operator.py @@ -18,7 +18,8 @@ from typing import List from typing import Optional -from airflow.models import BaseOperator +from airflow.models.baseoperator import BaseOperator +from airflow.utils.context import Context # New context type in Airflow 2.x from airflow.operators.imported_operator import ImportedOperator # noqa TODO @@ -63,6 +64,7 @@ class TestOperator(BaseOperator): def __init__( self, + *, str_no_default, bool_no_default, int_no_default, @@ -82,12 +84,11 @@ def __init__( fallback_type=None, long_description_property=None, mounted_volumes=None, - *args, **kwargs, ): super().__init__(*args, **kwargs) - def execute(self, context: Any): + def execute(self, context: Context): pass @@ -132,7 +133,7 @@ def __init__( ): super().__init__(**kwargs) - def execute(self, context: Any): + def execute(self, context: Context): pass @@ -173,7 +174,7 @@ def __init__( ): super().__init__(**kwargs) - def execute(self, context: Any): + def execute(self, context: Context): pass diff --git a/elyra/tests/pipeline/resources/components/airflow_test_operator_no_inputs.py b/elyra/tests/pipeline/resources/components/airflow_test_operator_no_inputs.py index 4a654beff..f929bab60 100644 --- a/elyra/tests/pipeline/resources/components/airflow_test_operator_no_inputs.py +++ b/elyra/tests/pipeline/resources/components/airflow_test_operator_no_inputs.py @@ -15,7 +15,8 @@ # from typing import Any -from airflow.models import BaseOperator +from airflow.models.baseoperator import BaseOperator +from airflow.utils.context import Context # New context type in Airflow 2.x class TestOperatorNoInputs(BaseOperator): diff --git a/elyra/tests/pipeline/resources/components/airflow_test_operator_type_hints.py b/elyra/tests/pipeline/resources/components/airflow_test_operator_type_hints.py index 54ad5062b..de44ed321 100644 --- a/elyra/tests/pipeline/resources/components/airflow_test_operator_type_hints.py +++ b/elyra/tests/pipeline/resources/components/airflow_test_operator_type_hints.py @@ -17,7 +17,8 @@ from typing import List from typing import Optional -from airflow.models import BaseOperator +from airflow.models.baseoperator import BaseOperator +from airflow.utils.context import Context # New context type in Airflow 2.x from airflow.utils.decorators import apply_defaults