Support Airflow 2.x operators (runtime specific components) #3263

Open
wants to merge 1 commit into
base: main
@@ -131,9 +131,9 @@ The [URL component catalog](pipeline-components.html#pipeline-components.html#ur
The [Apache Airflow package catalog](pipeline-components.html#apache-airflow-package-catalog) provides access to Apache Airflow operators that are stored in built distributions.

1. Take note of the displayed `airflow_package`, which identifies the Apache Airflow built distribution that includes the missing operator.
1. [Add a new Apache Airflow package catalog](pipeline-components.html#adding-a-component-catalog), providing the _download URL_ for the listed distribution as input. For example, if the value of `airflow_package` is `apache_airflow-1.10.15-py2.py3-none-any.whl`, specify as URL
1. [Add a new Apache Airflow package catalog](pipeline-components.html#adding-a-component-catalog), providing the _download URL_ for the listed distribution as input. For example, if the value of `airflow_package` is `apache_airflow-2.10.4-py3-none-any.whl`, specify as URL
```
https://files.pythonhosted.org/packages/f0/3a/f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/apache_airflow-1.10.15-py2.py3-none-any.whl
https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl
```

#### Apache Airflow provider package catalog (type: `airflow-provider-package-catalog`)
4 changes: 2 additions & 2 deletions docs/source/user_guide/pipeline-components.md
@@ -445,11 +445,11 @@ The [Apache Airflow package catalog connector](https://github.com/elyra-ai/elyra
Examples:
- [Apache Airflow](https://pypi.org/project/apache-airflow/) (v1.10.15):
```
https://files.pythonhosted.org/packages/f0/3a/f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/apache_airflow-1.10.15-py2.py3-none-any.whl
https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl
```
- Local copy of a downloaded Apache Airflow package
```
file:///absolute/path/to/apache_airflow-1.10.15-py2.py3-none-any.whl
file:///absolute/path/to/apache_airflow-2.10.4-py3-none-any.whl
```

#### Apache Airflow provider package catalog
4 changes: 2 additions & 2 deletions elyra/pipeline/airflow/package_catalog_connector/README.md
@@ -19,7 +19,7 @@ If the Airflow package is stored on PyPI:
1. Search for the Apache Airflow package on PyPI.
1. Open the package's release history and choose the desired version.
1. Open the `Download files` link.
1. Copy the download link for the package's wheel. ([Example download URL for Apache Airflow 1.10.15](https://files.pythonhosted.org/packages/f0/3a/f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/apache_airflow-1.10.15-py2.py3-none-any.whl))
1. Copy the download link for the package's wheel. ([Example download URL for Apache Airflow 2.10.4](https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl))
1. Save the catalog entry.
1. Open the Visual Pipeline Editor and expand the palette. The loaded Apache Airflow operators are displayed.

@@ -28,4 +28,4 @@ If the Airflow package is stored on PyPI:
If the palette does not include the expected operators check the JupyterLab log file for error messages. Error messages include the component catalog name, as shown in this example:
```
Error. The Airflow package connector '<CATALOG_NAME>' encountered an issue downloading '<URL>'. HTTP response code: <HTTP_CODE>
```
```
@@ -61,9 +61,9 @@

# Read the user-supplied 'airflow_package_download_url', which is a required
# input defined in the 'airflow-package-catalog-catalog.json' schema file.
# Example value: https://archive.apache.org/dist/airflow/1.10.15/apache_airflow-1.10.15-py2.py3-none-any.whl
# Example value: https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl
airflow_package_download_url = catalog_metadata["airflow_package_download_url"]
# extract the package name, e.g. 'apache_airflow-1.10.15-py2.py3-none-any.whl'
# extract the package name, e.g. 'apache_airflow-2.10.4-py3-none-any.whl'
airflow_package_name = Path(urlparse(airflow_package_download_url).path).name

if not airflow_package_name:
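
The package-name extraction in this hunk can be tried in isolation. The following is a minimal sketch that uses only the standard library; the helper name `package_name_from_url` is hypothetical and not part of the connector.

```python
from pathlib import Path
from urllib.parse import urlparse


def package_name_from_url(download_url: str) -> str:
    """Return the last path segment of the URL, or '' if the URL has no path."""
    # urlparse() isolates the path component; Path(...).name keeps its last segment.
    return Path(urlparse(download_url).path).name


# Example value taken from the comment in the hunk above.
url = "https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl"
print(package_name_from_url(url))

# A URL without a path yields an empty string, which is the case
# the connector's 'if not airflow_package_name' check guards against.
print(repr(package_name_from_url("https://archive.apache.org")))
```

The same call handles `file://` URLs, because `urlparse` places the absolute file path in the path component.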
@@ -161,7 +161,7 @@

#
# Identify Python scripts that define classes that extend the
# airflow.models.BaseOperator class
# airflow.models.baseoperator.BaseOperator class
#
scripts_with_operator_class: List[str] = [] # Python scripts that contain operator definitions
extends_baseoperator: List[str] = [] # Classes that extend BaseOperator
@@ -188,8 +188,11 @@
elif isinstance(node, ast.ImportFrom):
node_module = node.module
for name in node.names:
if "airflow.models" == node_module and name.name == "BaseOperator":
if "airflow.models.baseoperator" == node_module and name.name == "BaseOperator":
imported_operator_classes.append(name.name)
# TODO: Support sensor operators
# if "airflow.sensors.base" == node_module and name.name == "BaseSensorOperator":
# imported_operator_classes.append(name.name)
elif isinstance(node, ast.ClassDef):
# determine whether this class extends the BaseOperator class
self.log.debug(f"Analyzing class '{node.name}' in {script_id} ...")
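
The detection approach in this hunk (collect `BaseOperator` imports, then look for classes that extend them) can be illustrated outside the connector. This is a simplified sketch, not the connector's implementation: `find_operator_classes` and `BASE_OPERATOR_MODULES` are hypothetical names, and only direct `class X(BaseOperator)` inheritance is handled.

```python
import ast
from typing import List, Set

# Assumption: the only module from which BaseOperator imports are recognized,
# mirroring the module name checked in the hunk above.
BASE_OPERATOR_MODULES: Set[str] = {"airflow.models.baseoperator"}


def find_operator_classes(source: str) -> List[str]:
    """Return names of classes that directly extend an imported BaseOperator."""
    tree = ast.parse(source)  # parses only; the script is never imported or run

    # Pass 1: record the local names under which BaseOperator was imported.
    imported: Set[str] = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.ImportFrom) and node.module in BASE_OPERATOR_MODULES:
            for alias in node.names:
                if alias.name == "BaseOperator":
                    imported.add(alias.asname or alias.name)

    # Pass 2: collect classes whose base list names one of those imports.
    operators: List[str] = []
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef):
            if any(isinstance(b, ast.Name) and b.id in imported for b in node.bases):
                operators.append(node.name)
    return operators


SAMPLE = '''
from airflow.models.baseoperator import BaseOperator

class MyOperator(BaseOperator):
    def execute(self, context):
        pass

class Helper:
    pass
'''

print(find_operator_classes(SAMPLE))
```

Because the check compares the module name exactly, a script that uses `from airflow.models import BaseOperator` is not matched by this sketch; adding that module to the set would cover both import styles.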
18 changes: 7 additions & 11 deletions elyra/tests/pipeline/airflow/test_airflow_package_connector.py
@@ -23,11 +23,7 @@
)
from elyra.pipeline.catalog_connector import AirflowEntryData

AIRFLOW_1_10_15_PKG_URL = (
"https://files.pythonhosted.org/packages/f0/3a/"
"f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/"
"apache_airflow-1.10.15-py2.py3-none-any.whl"
)
AIRFLOW_2_10_4_PKG_URL = "https://archive.apache.org/dist/airflow/2.10.4/apache_airflow-2.10.4-py3-none-any.whl"

AIRFLOW_SUPPORTED_FILE_TYPES = [".py"]

@@ -115,19 +111,19 @@ def test_invalid_get_entry_data():
# ----------------------------------


def test_1_10_15_distribution():
def test_2_10_4_distribution():
"""
Test connector using Apache Airflow 1.10.15 built distribution.
Test connector using Apache Airflow 2.10.4 built distribution.
"""
apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
# get catalog entries for the specified distribution
ces = apc.get_catalog_entries({"airflow_package_download_url": AIRFLOW_1_10_15_PKG_URL})
# this distribution should contain 37 Python scripts with operator definitions
assert len(ces) == 37
ces = apc.get_catalog_entries({"airflow_package_download_url": AIRFLOW_2_10_4_PKG_URL})
# this distribution should contain 11 Python scripts with operator definitions
assert len(ces) == 11
# each entry must contain two keys
for entry in ces:
# built distribution package file name
assert entry.get("airflow_package") == "apache_airflow-1.10.15-py2.py3-none-any.whl"
assert entry.get("airflow_package") == "apache_airflow-2.10.4-py3-none-any.whl"
# a Python script
assert entry.get("file", "").endswith(".py")
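
The `airflow_package` assertion above compares the full wheel file name, so the Python tag matters: the 1.10.15 wheel is tagged `py2.py3`, while the 2.10.4 wheel is tagged `py3`. As a rough sketch, assuming the standard wheel naming convention (`{distribution}-{version}[-{build}]-{python}-{abi}-{platform}.whl`), the tags can be split out as follows. `parse_wheel_filename` is a hypothetical helper and is not used by the connector or its tests.

```python
from typing import Dict, List, Union


def parse_wheel_filename(filename: str) -> Dict[str, Union[str, List[str]]]:
    """Split a wheel file name into its components."""
    suffix = ".whl"
    if not filename.endswith(suffix):
        raise ValueError(f"not a wheel file name: {filename!r}")
    parts = filename[: -len(suffix)].split("-")
    # Five components, or six when an optional build tag is present.
    if len(parts) not in (5, 6):
        raise ValueError(f"unexpected wheel file name: {filename!r}")
    python_tag, abi_tag, platform_tag = parts[-3:]
    return {
        "distribution": parts[0],
        "version": parts[1],
        # A compressed tag such as 'py2.py3' lists several Python tags.
        "python_tags": python_tag.split("."),
        "abi_tag": abi_tag,
        "platform_tag": platform_tag,
    }


old = parse_wheel_filename("apache_airflow-1.10.15-py2.py3-none-any.whl")
new = parse_wheel_filename("apache_airflow-2.10.4-py3-none-any.whl")
print(old["python_tags"], new["python_tags"])
```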

@@ -28,9 +28,9 @@
from elyra.pipeline.catalog_connector import AirflowEntryData

HTTP_PROVIDER_PKG_URL = (
"https://files.pythonhosted.org/packages/a1/08/"
"91653e9f394cbefe356ac07db809be7e69cc89b094379ad91d6cef3d2bc9/"
"apache_airflow_providers_http-2.0.2-py3-none-any.whl"
"https://files.pythonhosted.org/packages/bb/55/"
"9f38da212c29e1c429ddadef082f2bdaccebb0b0d9c88e72463ad25c786a/"
"apache_airflow_providers_http-5.0.0-py3-none-any.whl"
)

AIRFLOW_SUPPORTED_FILE_TYPES = [".py"]
@@ -134,7 +134,7 @@ def test_valid_url_http_provider_package():
# each entry must contain three keys
for entry in ces:
# provider package file name
assert entry.get("provider_package") == "apache_airflow_providers_http-2.0.2-py3-none-any.whl"
assert entry.get("provider_package") == "apache_airflow_providers_http-5.0.0-py3-none-any.whl"
# provider name
assert entry.get("provider") == "apache_airflow_providers_http"
# a Python script
@@ -13,12 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from airflow.models import BaseOperator
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context # New context type in Airflow 2.x
from airflow.operators.imported_operator import ImportedOperator # noqa TODO


@@ -87,7 +87,7 @@ def __init__(
):
super().__init__(*args, **kwargs)

def execute(self, context: Any):
def execute(self, context: Context):
pass


@@ -132,7 +132,7 @@ def __init__(
):
super().__init__(**kwargs)

def execute(self, context: Any):
def execute(self, context: Context):
pass


@@ -173,7 +173,7 @@ def __init__(
):
super().__init__(**kwargs)

def execute(self, context: Any):
def execute(self, context: Context):
pass


@@ -13,9 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any

from airflow.models import BaseOperator
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context # New context type in Airflow 2.x


class TestOperatorNoInputs(BaseOperator):
@@ -27,5 +27,5 @@ class TestOperatorNoInputs(BaseOperator):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def execute(self, context: Any):
def execute(self, context: Context):
pass
@@ -17,7 +17,7 @@
from typing import List
from typing import Optional

from airflow.models import BaseOperator
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

