Support Airflow 2.x operators (runtime specific components)
lresende committed Dec 29, 2024
1 parent b510626 commit 2db5db2
Showing 9 changed files with 35 additions and 31 deletions.
@@ -131,9 +131,9 @@ The [URL component catalog](pipeline-components.html#pipeline-components.html#ur
The [Apache Airflow package catalog](pipeline-components.html#apache-airflow-package-catalog) provides access to Apache Airflow operators that are stored in built distributions.

1. Take note of the displayed `airflow_package`, which identifies the Apache Airflow built distribution that includes the missing operator.
1. [Add a new Apache Airflow package catalog](pipeline-components.html#adding-a-component-catalog), providing the _download URL_ for the listed distribution as input. For example, if the value of `airflow_package` is `apache_airflow-1.10.15-py2.py3-none-any.whl`, specify as URL
1. [Add a new Apache Airflow package catalog](pipeline-components.html#adding-a-component-catalog), providing the _download URL_ for the listed distribution as input. For example, if the value of `airflow_package` is `apache_airflow-2.10.4-py3-none-any.whl`, specify as URL
```
https://files.pythonhosted.org/packages/f0/3a/f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/apache_airflow-1.10.15-py2.py3-none-any.whl
https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl
```

#### Apache Airflow provider package catalog (type: `airflow-provider-package-catalog`)
4 changes: 2 additions & 2 deletions docs/source/user_guide/pipeline-components.md
@@ -445,11 +445,11 @@ The [Apache Airflow package catalog connector](https://github.com/elyra-ai/elyra
Examples:
- [Apache Airflow](https://pypi.org/project/apache-airflow/) (v1.10.15):
```
https://files.pythonhosted.org/packages/f0/3a/f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/apache_airflow-1.10.15-py2.py3-none-any.whl
https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl
```
- Local copy of a downloaded Apache Airflow package
```
file:///absolute/path/to/apache_airflow-1.10.15-py2.py3-none-any.whl
file:///absolute/path/to/apache_airflow-2.10.4-py3-none-any.whl
```

#### Apache Airflow provider package catalog
4 changes: 2 additions & 2 deletions elyra/pipeline/airflow/package_catalog_connector/README.md
@@ -19,7 +19,7 @@ If the Airflow package is stored on PyPI:
1. Search for the Apache Airflow package on PyPI.
1. Open the package's release history and choose the desired version.
1. Open the `Download files` link.
1. Copy the download link for the package's wheel. ([Example download URL for Apache Airflow 1.10.15](https://files.pythonhosted.org/packages/f0/3a/f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/apache_airflow-1.10.15-py2.py3-none-any.whl))
1. Copy the download link for the package's wheel. ([Example download URL for Apache Airflow 2.10.4](https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl))
1. Save the catalog entry.
1. Open the Visual Pipeline Editor and expand the palette. The loaded Apache Airflow operators are displayed.

@@ -28,4 +28,4 @@ If the Airflow package is stored on PyPI:
If the palette does not include the expected operators check the JupyterLab log file for error messages. Error messages include the component catalog name, as shown in this example:
```
Error. The Airflow package connector '<CATALOG_NAME>' encountered an issue downloading '<URL>'. HTTP response code: <HTTP_CODE>
```
```
@@ -61,9 +61,9 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str

# Read the user-supplied 'airflow_package_download_url', which is a required
# input defined in the 'airflow-package-catalog-catalog.json' schema file.
# Example value: https://archive.apache.org/dist/airflow/1.10.15/apache_airflow-1.10.15-py2.py3-none-any.whl
# Example value: https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl
airflow_package_download_url = catalog_metadata["airflow_package_download_url"]
# extract the package name, e.g. 'apache_airflow-1.10.15-py2.py3-none-any.whl'
# extract the package name, e.g. 'apache_airflow-2.10.4-py3-none-any.whl'
airflow_package_name = Path(urlparse(airflow_package_download_url).path).name

if not airflow_package_name:
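
Reviewer note: the package-name extraction in the hunk above can be tried in isolation. The following is a minimal sketch, not the connector's actual code; `package_name_from_url` is a hypothetical helper name introduced only for illustration. It applies the same `Path(urlparse(...).path).name` expression to a remote URL, a `file://` URL, and a URL without a file name.

```python
from pathlib import Path
from urllib.parse import urlparse


def package_name_from_url(download_url: str) -> str:
    """Return the last path segment of a download URL (hypothetical helper)."""
    # urlparse separates scheme, host, query and fragment from the path;
    # Path(...).name then keeps only the final path component.
    return Path(urlparse(download_url).path).name


remote = "https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl"
local = "file:///absolute/path/to/apache_airflow-2.10.4-py3-none-any.whl"

print(package_name_from_url(remote))
print(package_name_from_url(local))
# A URL whose path has no file name yields an empty string, which is the
# case the 'if not airflow_package_name' check guards against.
print(repr(package_name_from_url("https://archive.apache.org/")))
```

Both the remote and the local form resolve to the same wheel file name, so the value stored in `airflow_package` does not depend on where the distribution was downloaded from.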
@@ -161,7 +161,7 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str

#
# Identify Python scripts that define classes that extend the
# airflow.models.BaseOperator class
# airflow.models.baseoperator.BaseOperator class
#
scripts_with_operator_class: List[str] = [] # Python scripts that contain operator definitions
extends_baseoperator: List[str] = [] # Classes that extend BaseOperator
@@ -188,8 +188,11 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str
elif isinstance(node, ast.ImportFrom):
node_module = node.module
for name in node.names:
if "airflow.models" == node_module and name.name == "BaseOperator":
if "airflow.models.baseoperator" == node_module and name.name == "BaseOperator":
imported_operator_classes.append(name.name)
# TODO: Support sensor operators
# if "airflow.sensors.base" == node_module and name.name == "BaseSensorOperator":
# imported_operator_classes.append(name.name)
elif isinstance(node, ast.ClassDef):
# determine whether this class extends the BaseOperator class
self.log.debug(f"Analyzing class '{node.name}' in {script_id} ...")
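
Reviewer note: the import check in this hunk only recognizes `BaseOperator` imported from `airflow.models.baseoperator`; sensors are left as a TODO. The sketch below shows one way the same `ast`-based detection could cover both base classes. It is an illustration under assumptions, not the connector's implementation: `BASE_CLASSES`, `find_operator_classes`, and the sample `SOURCE` are hypothetical names, and the sensor module path is taken from the commented-out TODO above.

```python
import ast
from typing import List

# Sample script to analyze; it is only parsed, never imported or executed,
# so Airflow does not need to be installed.
SOURCE = '''
from airflow.models.baseoperator import BaseOperator
from airflow.sensors.base import BaseSensorOperator


class MyOperator(BaseOperator):
    pass


class MySensor(BaseSensorOperator):
    pass


class Unrelated:
    pass
'''

# (module, class) pairs treated as operator base classes in this sketch.
BASE_CLASSES = {
    ("airflow.models.baseoperator", "BaseOperator"),
    ("airflow.sensors.base", "BaseSensorOperator"),
}


def find_operator_classes(source: str) -> List[str]:
    """Return names of classes that directly extend a recognized base class."""
    tree = ast.parse(source)

    # Pass 1: collect the local names bound by matching 'from ... import ...'.
    imported = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.ImportFrom):
            for alias in node.names:
                if (node.module, alias.name) in BASE_CLASSES:
                    imported.add(alias.asname or alias.name)

    # Pass 2: keep classes whose bases reference one of those names.
    found = []
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef):
            if any(isinstance(base, ast.Name) and base.id in imported for base in node.bases):
                found.append(node.name)
    return found


print(find_operator_classes(SOURCE))
```

Keeping the recognized bases in one set means sensor support would be a data change rather than a second commented-out branch. The sketch only handles direct subclasses and `from ... import ...` statements.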
18 changes: 8 additions & 10 deletions elyra/tests/pipeline/airflow/test_airflow_package_connector.py
@@ -23,10 +23,8 @@
)
from elyra.pipeline.catalog_connector import AirflowEntryData

AIRFLOW_1_10_15_PKG_URL = (
"https://files.pythonhosted.org/packages/f0/3a/"
"f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/"
"apache_airflow-1.10.15-py2.py3-none-any.whl"
AIRFLOW_2_10_4_PKG_URL = (
"https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl"
)

AIRFLOW_SUPPORTED_FILE_TYPES = [".py"]
@@ -115,19 +113,19 @@ def test_invalid_get_entry_data():
# ----------------------------------


def test_1_10_15_distribution():
def test_2_10_4_distribution():
"""
Test connector using Apache Airflow 1.10.15 built distribution.
Test connector using Apache Airflow 2.10.4 built distribution.
"""
apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
# get catalog entries for the specified distribution
ces = apc.get_catalog_entries({"airflow_package_download_url": AIRFLOW_1_10_15_PKG_URL})
# this distribution should contain 37 Python scripts with operator definitions
assert len(ces) == 37
ces = apc.get_catalog_entries({"airflow_package_download_url": AIRFLOW_2_10_4_PKG_URL})
# this distribution should contain 11 Python scripts with operator definitions
assert len(ces) == 11
# each entry must contain two keys
for entry in ces:
# built distribution package file name
assert entry.get("airflow_package") == "apache_airflow-1.10.15-py2.py3-none-any.whl"
assert entry.get("airflow_package") == "apache_airflow-2.10.4-py3-none-any.whl"
# a Python script
assert entry.get("file", "").endswith(".py")

@@ -28,9 +28,9 @@
from elyra.pipeline.catalog_connector import AirflowEntryData

HTTP_PROVIDER_PKG_URL = (
"https://files.pythonhosted.org/packages/a1/08/"
"91653e9f394cbefe356ac07db809be7e69cc89b094379ad91d6cef3d2bc9/"
"apache_airflow_providers_http-2.0.2-py3-none-any.whl"
"https://files.pythonhosted.org/packages/bb/55/"
"9f38da212c29e1c429ddadef082f2bdaccebb0b0d9c88e72463ad25c786a/"
"apache_airflow_providers_http-5.0.0-py3-none-any.whl"
)

AIRFLOW_SUPPORTED_FILE_TYPES = [".py"]
@@ -134,7 +134,7 @@ def test_valid_url_http_provider_package():
# each entry must contain three keys
for entry in ces:
# provider package file name
assert entry.get("provider_package") == "apache_airflow_providers_http-2.0.2-py3-none-any.whl"
assert entry.get("provider_package") == "apache_airflow_providers_http-5.0.0-py3-none-any.whl"
# provider name
assert entry.get("provider") == "apache_airflow_providers_http"
# a Python script
@@ -18,7 +18,8 @@
from typing import List
from typing import Optional

from airflow.models import BaseOperator
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context # New context type in Airflow 2.x
from airflow.operators.imported_operator import ImportedOperator # noqa TODO


@@ -63,6 +64,7 @@ class TestOperator(BaseOperator):

def __init__(
self,
*,
str_no_default,
bool_no_default,
int_no_default,
@@ -82,12 +84,11 @@ def __init__(
fallback_type=None,
long_description_property=None,
mounted_volumes=None,
*args,
**kwargs,
):
super().__init__(**kwargs)

def execute(self, context: Any):
def execute(self, context: Context):
pass
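
Reviewer note: this hunk switches the constructor to keyword-only parameters (`*,`) and drops `*args`. The sketch below illustrates that pattern without requiring Airflow. `StubBaseOperator` and `KeywordOnlyOperator` are hypothetical stand-ins, and the stub does not reproduce the real `BaseOperator` signature.

```python
class StubBaseOperator:
    """Hypothetical stand-in for BaseOperator so the sketch runs without Airflow."""

    def __init__(self, *, task_id: str, **kwargs):
        self.task_id = task_id


class KeywordOnlyOperator(StubBaseOperator):
    def __init__(self, *, str_no_default, int_default=1, **kwargs):
        # With a bare '*' in the signature there is no '*args' to forward,
        # so only the remaining keyword arguments go to the base class.
        super().__init__(**kwargs)
        self.str_no_default = str_no_default
        self.int_default = int_default


# Keyword arguments work as before.
op = KeywordOnlyOperator(task_id="t1", str_no_default="x")
print(op.task_id, op.str_no_default, op.int_default)

# Positional arguments are rejected by the keyword-only signature.
try:
    KeywordOnlyOperator("x", task_id="t1")
    positional_rejected = False
except TypeError:
    positional_rejected = True
print(positional_rejected)
```

Once `*args` is removed from the parameter list, any remaining `super().__init__(*args, **kwargs)` call would reference an undefined name, so the two changes need to be made together.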


@@ -132,7 +133,7 @@ def __init__(
):
super().__init__(**kwargs)

def execute(self, context: Any):
def execute(self, context: Context):
pass


@@ -173,7 +174,7 @@ def __init__(
):
super().__init__(**kwargs)

def execute(self, context: Any):
def execute(self, context: Context):
pass


@@ -15,7 +15,8 @@
#
from typing import Any

from airflow.models import BaseOperator
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context # New context type in Airflow 2.x


class TestOperatorNoInputs(BaseOperator):
@@ -17,7 +17,8 @@
from typing import List
from typing import Optional

from airflow.models import BaseOperator
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context # New context type in Airflow 2.x
from airflow.utils.decorators import apply_defaults

