Skip to content

Commit

Permalink
Support Airflow 2.x operators (runtime specific components)
Browse files Browse the repository at this point in the history
Signed-off-by: Luciano Resende <lresende@apple.com>
  • Loading branch information
lresende committed Dec 29, 2024
1 parent b510626 commit 4a528d2
Show file tree
Hide file tree
Showing 10 changed files with 3,037 additions and 3,038 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ The [URL component catalog](pipeline-components.html#pipeline-components.html#ur
The [Apache Airflow package catalog](pipeline-components.html#apache-airflow-package-catalog) provides access to Apache Airflow operators that are stored in built distributions.

1. Take note of the displayed `airflow_package`, which identifies the Apache Airflow built distribution that includes the missing operator.
1. [Add a new Apache Airflow package catalog](pipeline-components.html#adding-a-component-catalog), providing the _download URL_ for the listed distribution as input. For example, if the value of `airflow_package` is `apache_airflow-1.10.15-py2.py3-none-any.whl`, specify as URL
1. [Add a new Apache Airflow package catalog](pipeline-components.html#adding-a-component-catalog), providing the _download URL_ for the listed distribution as input. For example, if the value of `airflow_package` is `apache_airflow-2.10.4-py2.py3-none-any.whl`, specify as URL
```
https://files.pythonhosted.org/packages/f0/3a/f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/apache_airflow-1.10.15-py2.py3-none-any.whl
https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl
```

#### Apache Airflow provider package catalog (type: `airflow-provider-package-catalog`)
Expand Down
4 changes: 2 additions & 2 deletions docs/source/user_guide/pipeline-components.md
Original file line number Diff line number Diff line change
Expand Up @@ -445,11 +445,11 @@ The [Apache Airflow package catalog connector](https://github.com/elyra-ai/elyra
Examples:
- [Apache Airflow](https://pypi.org/project/apache-airflow/) (v1.10.15):
```
https://files.pythonhosted.org/packages/f0/3a/f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/apache_airflow-1.10.15-py2.py3-none-any.whl
https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl
```
- Local copy of a downloaded Apache Airflow package
```
file:///absolute/path/to/apache_airflow-1.10.15-py2.py3-none-any.whl
file:///absolute/path/to/apache_airflow-2.10.4-py2.py3-none-any.whl
```

#### Apache Airflow provider package catalog
Expand Down
4 changes: 2 additions & 2 deletions elyra/pipeline/airflow/package_catalog_connector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ If the Airflow package is stored on PyPI:
1. Search for the Apache Airflow package on PyPI.
1. Open the package's release history and choose the desired version.
1. Open the `Download files` link.
1. Copy the download link for the package's wheel. ([Example download URL for Apache Airflow 1.10.15](https://files.pythonhosted.org/packages/f0/3a/f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/apache_airflow-1.10.15-py2.py3-none-any.whl))
1. Copy the download link for the package's wheel. ([Example download URL for Apache Airflow 2.10.4](https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl))
1. Save the catalog entry.
1. Open the Visual Pipeline Editor and expand the palette. The loaded Apache Airflow operators are displayed.

Expand All @@ -28,4 +28,4 @@ If the Airflow package is stored on PyPI:
If the palette does not include the expected operators check the JupyterLab log file for error messages. Error messages include the component catalog name, as shown in this example:
```
Error. The Airflow package connector '<CATALOG_NAME>' encountered an issue downloading '<URL>'. HTTP response code: <HTTP_CODE>
```
```
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str

# Read the user-supplied 'airflow_package_download_url', which is a required
# input defined in the 'airflow-package-catalog-catalog.json' schema file.
# Example value: https://archive.apache.org/dist/airflow/1.10.15/apache_airflow-1.10.15-py2.py3-none-any.whl
# Example value: https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl
airflow_package_download_url = catalog_metadata["airflow_package_download_url"]
# extract the package name, e.g. 'apache_airflow-1.10.15-py2.py3-none-any.whl'
# extract the package name, e.g. 'apache_airflow-2.10.4-py3-none-any.whl'
airflow_package_name = Path(urlparse(airflow_package_download_url).path).name

if not airflow_package_name:
Expand Down Expand Up @@ -161,7 +161,7 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str

#
# Identify Python scripts that define classes that extend the
# airflow.models.BaseOperator class
# airflow.models.baseoperator.BaseOperator class
#
scripts_with_operator_class: List[str] = [] # Python scripts that contain operator definitions
extends_baseoperator: List[str] = [] # Classes that extend BaseOperator
Expand All @@ -188,8 +188,11 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str
elif isinstance(node, ast.ImportFrom):
node_module = node.module
for name in node.names:
if "airflow.models" == node_module and name.name == "BaseOperator":
if "airflow.models.baseoperator" == node_module and name.name == "BaseOperator":
imported_operator_classes.append(name.name)
# TODO: Support sensor operators
# if "airflow.sensors.base" == node_module and name.name == "BaseSensorOperator":
# imported_operator_classes.append(name.name)

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
elif isinstance(node, ast.ClassDef):
# determine whether this class extends the BaseOperator class
self.log.debug(f"Analyzing class '{node.name}' in {script_id} ...")
Expand Down
18 changes: 7 additions & 11 deletions elyra/tests/pipeline/airflow/test_airflow_package_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@
)
from elyra.pipeline.catalog_connector import AirflowEntryData

AIRFLOW_1_10_15_PKG_URL = (
"https://files.pythonhosted.org/packages/f0/3a/"
"f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/"
"apache_airflow-1.10.15-py2.py3-none-any.whl"
)
AIRFLOW_2_10_4_PKG_URL = "https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl"

AIRFLOW_SUPPORTED_FILE_TYPES = [".py"]

Expand Down Expand Up @@ -115,19 +111,19 @@ def test_invalid_get_entry_data():
# ----------------------------------


def test_1_10_15_distribution():
def test_2_10_4_distribution():
"""
Test connector using Apache Airflow 1.10.15 built distribution.
Test connector using Apache Airflow 2.10.4 built distribution.
"""
apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
# get catalog entries for the specified distribution
ces = apc.get_catalog_entries({"airflow_package_download_url": AIRFLOW_1_10_15_PKG_URL})
# this distribution should contain 37 Python scripts with operator definitions
assert len(ces) == 37
ces = apc.get_catalog_entries({"airflow_package_download_url": AIRFLOW_2_10_4_PKG_URL})
# this distribution should contain 12 Python scripts with operator definitions
assert len(ces) == 11
# each entry must contain two keys
for entry in ces:
# built distribution package file name
assert entry.get("airflow_package") == "apache_airflow-1.10.15-py2.py3-none-any.whl"
assert entry.get("airflow_package") == "apache_airflow-2.10.4-py3-none-any.whl"
# a Python script
assert entry.get("file", "").endswith(".py")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
from elyra.pipeline.catalog_connector import AirflowEntryData

HTTP_PROVIDER_PKG_URL = (
"https://files.pythonhosted.org/packages/a1/08/"
"91653e9f394cbefe356ac07db809be7e69cc89b094379ad91d6cef3d2bc9/"
"apache_airflow_providers_http-2.0.2-py3-none-any.whl"
"https://files.pythonhosted.org/packages/bb/55/"
"9f38da212c29e1c429ddadef082f2bdaccebb0b0d9c88e72463ad25c786a/"
"apache_airflow_providers_http-5.0.0-py3-none-any.whl"
)

AIRFLOW_SUPPORTED_FILE_TYPES = [".py"]
Expand Down Expand Up @@ -134,7 +134,7 @@ def test_valid_url_http_provider_package():
# each entry must contain three keys
for entry in ces:
# provider package file name
assert entry.get("provider_package") == "apache_airflow_providers_http-2.0.2-py3-none-any.whl"
assert entry.get("provider_package") == "apache_airflow_providers_http-5.0.0-py3-none-any.whl"
# provider name
assert entry.get("provider") == "apache_airflow_providers_http"
# a Python script
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from airflow.models import BaseOperator
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context # New context type in Airflow 2.x
from airflow.operators.imported_operator import ImportedOperator # noqa TODO


Expand Down Expand Up @@ -87,7 +87,7 @@ def __init__(
):
super().__init__(*args, **kwargs)

def execute(self, context: Any):
def execute(self, context: Context):
pass


Expand Down Expand Up @@ -132,7 +132,7 @@ def __init__(
):
super().__init__(**kwargs)

def execute(self, context: Any):
def execute(self, context: Context):
pass


Expand Down Expand Up @@ -173,7 +173,7 @@ def __init__(
):
super().__init__(**kwargs)

def execute(self, context: Any):
def execute(self, context: Context):
pass


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any

from airflow.models import BaseOperator
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context # New context type in Airflow 2.x


class TestOperatorNoInputs(BaseOperator):
Expand All @@ -27,5 +27,5 @@ class TestOperatorNoInputs(BaseOperator):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def execute(self, context: Any):
def execute(self, context: Context):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from typing import List
from typing import Optional

from airflow.models import BaseOperator
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults


Expand Down
Loading

0 comments on commit 4a528d2

Please sign in to comment.