Add support for 'file' URI scheme to URL-based connectors (#2873)
Add support for the 'file' URI scheme to the URL component catalog connector, the Apache Airflow package connector, and the Apache Airflow provider package connector. This enables those connectors to load resources from the local file system, which can lead to significant performance improvements in the Visual Pipeline Editor.
ptitzler authored Aug 11, 2022
1 parent 6ee5aea commit a83c3aa
Showing 9 changed files with 403 additions and 57 deletions.
40 changes: 33 additions & 7 deletions docs/source/user_guide/pipeline-components.md
@@ -59,7 +59,7 @@ Elyra includes connectors for the following component catalog types:

 Example: A directory component catalog that is configured using the `/users/jdoe/kubeflow_components/test` path makes all component files in that directory available to Elyra.

-- [_URL component catalogs_](#url-component-catalog) provide access to components that are stored on the web and can be retrieved using anonymous HTTP `GET` requests.
+- [_URL component catalogs_](#url-component-catalog) provide access to components that are stored on the web and can be retrieved using HTTP `GET` requests.

 Example: A URL component catalog that is configured using the `http://myserver:myport/mypath/my_component.yaml` URL makes the `my_component.yaml` component file available to Elyra.

@@ -395,36 +395,62 @@ Examples (CLI):

 The URL component catalog connector provides access to components that are stored on the web:
 - You can specify one or more URL resources.
-- The specified URLs must be retrievable using an HTTP `GET` request.
+- The specified URLs must be retrievable using an HTTP `GET` request. `http`, `https`, and `file` [URI schemes](https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml) are supported.
 - If the resources are secured, provide credentials, such as a user id and password or API key.

 Examples (GUI):
-- `https://raw.githubusercontent.com/elyra-ai/examples/main/component-catalog-connectors/kfp-example-components-connector/kfp_examples_connector/resources/filter_text_using_shell_and_grep.yaml`
+- HTTPS URL
+```
+https://raw.githubusercontent.com/elyra-ai/examples/main/component-catalog-connectors/kfp-example-components-connector/kfp_examples_connector/resources/filter_text_using_shell_and_grep.yaml
+```
+- Local file URL
+```
+file:///absolute/path/to/component.yaml
+```

 Examples (CLI):
-- `['https://raw.githubusercontent.com/elyra-ai/examples/main/component-catalog-connectors/kfp-example-components-connector/kfp_examples_connector/resources/filter_text_using_shell_and_grep.yaml']`
-- `['<URL_1>','<URL_2>']`
+- HTTPS URL
+```
+['https://raw.githubusercontent.com/elyra-ai/examples/main/component-catalog-connectors/kfp-example-components-connector/kfp_examples_connector/resources/filter_text_using_shell_and_grep.yaml']
+```
+- Local file URL
+```
+['file:///absolute/path/to/component.yaml']
+```
+- Multiple URLs
+```
+['<URL_1>','<URL_2>']
+```


 #### Apache Airflow package catalog

 The [Apache Airflow package catalog connector](https://github.com/elyra-ai/elyra/tree/main/elyra/pipeline/airflow/package_catalog_connector) provides access to operators that are stored in Apache Airflow [built distributions](https://packaging.python.org/en/latest/glossary/#term-built-distribution):
 - Only the [wheel distribution format](https://packaging.python.org/en/latest/glossary/#term-Wheel) is supported.
-- The specified URL must be retrievable using an HTTP `GET` request.
+- The specified URL must be retrievable using an HTTP `GET` request. `http`, `https`, and `file` [URI schemes](https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml) are supported.

 Examples:
 - [Apache Airflow](https://pypi.org/project/apache-airflow/) (v1.10.15):
 ```
 https://files.pythonhosted.org/packages/f0/3a/f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/apache_airflow-1.10.15-py2.py3-none-any.whl
 ```
+- Local copy of a downloaded Apache Airflow package
+```
+file:///absolute/path/to/apache_airflow-1.10.15-py2.py3-none-any.whl
+```

 #### Apache Airflow provider package catalog
 The [Apache Airflow provider package catalog connector](https://github.com/elyra-ai/elyra/tree/main/elyra/pipeline/airflow/provider_package_catalog_connector) provides access to operators that are stored in [Apache Airflow provider packages](https://airflow.apache.org/docs/apache-airflow-providers/):
 - Only the [wheel distribution format](https://packaging.python.org/en/latest/glossary/#term-Wheel) is supported.
-- The specified URL must be retrievable using an HTTP `GET` request.
+- The specified URL must be retrievable using an HTTP `GET` request. `http`, `https`, and `file` [URI schemes](https://www.iana.org/assignments/uri-schemes/uri-schemes.xhtml) are supported.

 Examples:
 - [apache-airflow-providers-http](https://airflow.apache.org/docs/apache-airflow-providers-http/stable/index.html) (v2.0.2):
 ```
 https://files.pythonhosted.org/packages/a1/08/91653e9f394cbefe356ac07db809be7e69cc89b094379ad91d6cef3d2bc9/apache_airflow_providers_http-2.0.2-py3-none-any.whl
 ```
+
+- Local copy of a downloaded provider package
+```
+file:///absolute/path/to/apache_airflow_providers_http-2.0.2-py3-none-any.whl
+```
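
The documentation changes above use `file:///absolute/path/...` URLs. A minimal sketch of how such a URL could be derived from a local path, and mapped back, using only the Python standard library. The helper names `to_file_url` and `from_file_url` are hypothetical and are not part of Elyra.

```python
from pathlib import Path
from urllib.parse import urlparse
from urllib.request import url2pathname


def to_file_url(path: str) -> str:
    """Return a file:// URL for a local path (hypothetical helper)."""
    # resolve() makes the path absolute; as_uri() raises ValueError for relative paths
    return Path(path).resolve().as_uri()


def from_file_url(url: str) -> Path:
    """Map a file:// URL back to a local path (hypothetical helper)."""
    parsed = urlparse(url)
    if parsed.scheme != "file":
        raise ValueError(f"not a file URL: {url}")
    # url2pathname undoes percent-encoding and handles platform-specific path syntax
    return Path(url2pathname(parsed.path))


url = to_file_url("component.yaml")
print(url)
```

The printed value depends on the current working directory, so no fixed output is shown. The resulting string is the kind of value that would be entered in the catalog's URL field.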
Original file line number Diff line number Diff line change
@@ -26,12 +26,13 @@
 from urllib.parse import urlparse
 import zipfile

-from requests import get
+from requests import session
 from requests.auth import HTTPBasicAuth

 from elyra.pipeline.catalog_connector import AirflowEntryData
 from elyra.pipeline.catalog_connector import ComponentCatalogConnector
 from elyra.pipeline.catalog_connector import EntryData
+from elyra.util.url import FileTransportAdapter


 class AirflowPackageCatalogConnector(ComponentCatalogConnector):
@@ -74,20 +75,22 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str
             )
             return operator_key_list

-        # determine whether authentication needs to be performed
-        auth_id = catalog_metadata.get("auth_id")
-        auth_password = catalog_metadata.get("auth_password")
-        if auth_id and auth_password:
-            auth = HTTPBasicAuth(auth_id, auth_password)
-        elif auth_id or auth_password:
-            self.log.error(
-                f"Error. Airflow connector '{catalog_metadata.get('display_name')}' "
-                "is not configured properly. "
-                "Authentication requires a user id and password or API key."
-            )
-            return operator_key_list
-        else:
-            auth = None
+        pr = urlparse(airflow_package_download_url)
+        auth = None
+
+        if pr.scheme != "file":
+            # determine whether authentication needs to be performed
+            auth_id = catalog_metadata.get("auth_id")
+            auth_password = catalog_metadata.get("auth_password")
+            if auth_id and auth_password:
+                auth = HTTPBasicAuth(auth_id, auth_password)
+            elif auth_id or auth_password:
+                self.log.error(
+                    f"Error. Airflow connector '{catalog_metadata.get('display_name')}' "
+                    "is not configured properly. "
+                    "Authentication requires a user id and password or API key."
+                )
+                return operator_key_list

         # tmp_archive_dir is used to store the downloaded archive and as working directory
         if hasattr(self, "tmp_archive_dir"):
@@ -100,7 +103,10 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str

         # download archive; abort after 30 seconds
         try:
-            response = get(
+            requests_session = session()
+            if pr.scheme == "file":
+                requests_session.mount("file://", FileTransportAdapter())
+            response = requests_session.get(
                 airflow_package_download_url,
                 timeout=AirflowPackageCatalogConnector.REQUEST_TIMEOUT,
                 allow_redirects=True,
Original file line number Diff line number Diff line change
@@ -28,12 +28,13 @@
 from urllib.parse import urlparse
 import zipfile

-from requests import get
+from requests import session
 from requests.auth import HTTPBasicAuth

 from elyra.pipeline.catalog_connector import AirflowEntryData
 from elyra.pipeline.catalog_connector import ComponentCatalogConnector
 from elyra.pipeline.catalog_connector import EntryData
+from elyra.util.url import FileTransportAdapter


 class AirflowProviderPackageCatalogConnector(ComponentCatalogConnector):
@@ -79,20 +80,22 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str
             )
             return operator_key_list

-        # determine whether authentication needs to be performed
-        auth_id = catalog_metadata.get("auth_id")
-        auth_password = catalog_metadata.get("auth_password")
-        if auth_id and auth_password:
-            auth = HTTPBasicAuth(auth_id, auth_password)
-        elif auth_id or auth_password:
-            self.log.error(
-                f"Error. Airflow provider package connector '{catalog_metadata.get('display_name')}' "
-                "is not configured properly. "
-                "Authentication requires a user id and password or API key."
-            )
-            return operator_key_list
-        else:
-            auth = None
+        pr = urlparse(airflow_provider_package_download_url)
+        auth = None
+
+        if pr.scheme != "file":
+            # determine whether authentication needs to be performed
+            auth_id = catalog_metadata.get("auth_id")
+            auth_password = catalog_metadata.get("auth_password")
+            if auth_id and auth_password:
+                auth = HTTPBasicAuth(auth_id, auth_password)
+            elif auth_id or auth_password:
+                self.log.error(
+                    f"Error. Airflow provider package connector '{catalog_metadata.get('display_name')}' "
+                    "is not configured properly. "
+                    "Authentication requires a user id and password or API key."
+                )
+                return operator_key_list

         # tmp_archive_dir is used to store the downloaded archive and as working directory
         if hasattr(self, "tmp_archive_dir"):
@@ -105,7 +108,10 @@ def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str

         # download archive
         try:
-            response = get(
+            requests_session = session()
+            if pr.scheme == "file":
+                requests_session.mount("file://", FileTransportAdapter())
+            response = requests_session.get(
                 airflow_provider_package_download_url,
                 timeout=AirflowProviderPackageCatalogConnector.REQUEST_TIMEOUT,
                 allow_redirects=True,
40 changes: 23 additions & 17 deletions elyra/pipeline/catalog_connector.py
@@ -26,10 +26,11 @@
 from typing import Dict
 from typing import List
 from typing import Optional
+from urllib.parse import urlparse

 from deprecation import deprecated
 from jupyter_core.paths import ENV_JUPYTER_PATH
-from requests import get
+from requests import session
 from requests.auth import HTTPBasicAuth
 from traitlets.config import LoggingConfigurable
 from traitlets.traitlets import default
@@ -40,6 +41,7 @@
 from elyra.pipeline.component import Component
 from elyra.pipeline.component import ComponentParameter
 from elyra.pipeline.runtime_type import RuntimeProcessorType
+from elyra.util.url import FileTransportAdapter


 class EntryData(object):
@@ -636,24 +638,28 @@ def get_entry_data(
         individual catalog entries
         """
         url = catalog_entry_data.get("url")
-
-        # determine whether authentication needs to be performed
-        auth_id = catalog_metadata.get("auth_id")
-        auth_password = catalog_metadata.get("auth_password")
-        if auth_id and auth_password:
-            auth = HTTPBasicAuth(auth_id, auth_password)
-        elif auth_id or auth_password:
-            self.log.error(
-                f"Error. URL catalog connector '{catalog_metadata.get('display_name')}' "
-                "is not configured properly. "
-                "Authentication requires a user id and password or API key."
-            )
-            return None
-        else:
-            auth = None
+        pr = urlparse(url)
+        auth = None
+
+        if pr.scheme != "file":
+            # determine whether authentication needs to be performed
+            auth_id = catalog_metadata.get("auth_id")
+            auth_password = catalog_metadata.get("auth_password")
+            if auth_id and auth_password:
+                auth = HTTPBasicAuth(auth_id, auth_password)
+            elif auth_id or auth_password:
+                self.log.error(
+                    f"Error. URL catalog connector '{catalog_metadata.get('display_name')}' "
+                    "is not configured properly. "
+                    "Authentication requires a user id and password or API key."
+                )
+                return None

         try:
-            res = get(
+            requests_session = session()
+            if pr.scheme == "file":
+                requests_session.mount("file://", FileTransportAdapter())
+            res = requests_session.get(
                 url,
                 timeout=UrlComponentCatalogConnector.REQUEST_TIMEOUT,
                 allow_redirects=True,
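
All three connectors in this commit apply the same rule: credentials are only evaluated when the URL scheme is not `file`. A minimal standalone sketch of that decision, assuming a hypothetical helper named `resolve_credentials` that is not part of Elyra. Unlike the connectors, which log an error and return early, this sketch raises `ValueError` for a partial credential pair, and it returns a plain `(id, password)` tuple instead of an `HTTPBasicAuth` object so that it depends only on the standard library.

```python
from typing import Optional, Tuple
from urllib.parse import urlparse


def resolve_credentials(
    url: str, auth_id: Optional[str], auth_password: Optional[str]
) -> Optional[Tuple[str, str]]:
    """Decide which credentials, if any, apply to a request (hypothetical helper)."""
    if urlparse(url).scheme == "file":
        # local files are read directly; configured credentials are ignored
        return None
    if auth_id and auth_password:
        return (auth_id, auth_password)
    if auth_id or auth_password:
        # only one half of the pair was provided
        raise ValueError("Authentication requires a user id and password or API key.")
    return None


print(resolve_credentials("file:///absolute/path/to/component.yaml", "jdoe", None))
print(resolve_credentials("https://example.com/component.yaml", "jdoe", "secret"))
```

The first call ignores the incomplete credentials because the URL points at a local file; the same credentials with an `https` URL would be rejected.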
26 changes: 26 additions & 0 deletions elyra/tests/pipeline/airflow/test_airflow_package_connector.py
@@ -15,6 +15,7 @@
 #

 import io
+from pathlib import Path
 import zipfile

 from elyra.pipeline.airflow.package_catalog_connector.airflow_package_catalog_connector import (
@@ -84,6 +85,31 @@ def test_invalid_download_input(requests_mock):
     assert len(ce) == 0


+def test_invalid_get_entry_data():
+    """
+    Validate that AirflowPackageCatalogConnector.get_entry_data(...) returns
+    the expected results for invalid inputs
+    """
+    apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
+
+    # Test invalid "file://" inputs ...
+    # ... input refers to a directory
+    resource_location = Path(__file__).parent / ".." / "resources" / "components"
+    resource_url = resource_location.as_uri()
+    ce = apc.get_catalog_entries({"airflow_package_download_url": resource_url, "display_name": "file://is-a-dir-test"})
+    assert isinstance(ce, list), resource_url
+    assert len(ce) == 0
+
+    # ... input refers to a non-existing whl file
+    resource_location = Path(__file__).parent / ".." / "resources" / "components" / "no-such.whl"
+    resource_url = resource_location.as_uri()
+    ce = apc.get_catalog_entries(
+        {"airflow_package_download_url": resource_url, "display_name": "file://no-such-file-test"}
+    )
+    assert isinstance(ce, list), resource_url
+    assert len(ce) == 0
+
+
 # -----------------------------------
 # Long running test(s)
 # ----------------------------------
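
The new test covers two invalid `file://` inputs: a directory and a non-existent file. The source of `FileTransportAdapter` is not part of this excerpt, so the following is only a sketch of the checks such a loader might perform, written against the standard library. The function name `read_file_url` and the HTTP-style status codes it returns are assumptions for illustration, not Elyra's actual behavior.

```python
import tempfile
from pathlib import Path
from typing import Tuple
from urllib.parse import urlparse
from urllib.request import url2pathname


def read_file_url(url: str) -> Tuple[int, bytes]:
    """Return an HTTP-style (status, body) pair for a file:// URL (hypothetical)."""
    path = Path(url2pathname(urlparse(url).path))
    if not path.exists():
        return 404, b""  # assumed mapping: missing file -> "not found"
    if not path.is_file():
        return 400, b""  # assumed mapping: directory or other non-file -> "bad request"
    try:
        return 200, path.read_bytes()
    except OSError:
        return 500, b""  # assumed mapping: unreadable file -> "server error"


with tempfile.TemporaryDirectory() as tmp:
    component = Path(tmp) / "component.yaml"
    component.write_bytes(b"name: demo\n")
    ok = read_file_url(component.as_uri())
    is_dir = read_file_url(Path(tmp).as_uri())
    missing = read_file_url((Path(tmp) / "no-such.whl").as_uri())

print(ok[0], is_dir[0], missing[0])
```

Mapping file system conditions to status codes lets the caller reuse the same response handling for local and remote resources, which is presumably why the connectors route `file://` URLs through a `requests` session rather than opening the file directly.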