Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Airflow parser functionality #2418

Merged
merged 34 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
97d3b6a
Resolve naming conflict in case of multiple operators
kiersten-stokes Jan 14, 2022
9051b9e
Initial changes to use AST for parsing
kiersten-stokes Jan 19, 2022
9ec6812
Refactor to support AST parse
kiersten-stokes Jan 19, 2022
f4bfa0e
Improve docstrings and type hints
kiersten-stokes Jan 21, 2022
d2962ea
Improve refactoring
kiersten-stokes Jan 25, 2022
7f1ce33
Clean up comments and TODO's
kiersten-stokes Jan 25, 2022
516cdee
Fix incorrect type hint
kiersten-stokes Jan 25, 2022
0a8c144
Add log messaging
kiersten-stokes Jan 26, 2022
2ceabf3
Fix typo
kiersten-stokes Jan 26, 2022
2ce38f5
Change the way operators are id'ed from imported packages
kiersten-stokes Jan 26, 2022
f1dc3d4
Fix long-running test issue
kiersten-stokes Jan 26, 2022
c280264
Fix type regex for docstring searches
kiersten-stokes Jan 26, 2022
b04cd1b
Fix determination of 'required' and address minor review comments
kiersten-stokes Jan 27, 2022
e632182
Add temporary log messages re: type
kiersten-stokes Jan 27, 2022
0709245
Fix indentation
kiersten-stokes Jan 27, 2022
2595cb7
Account for ast variable access in different python versions
kiersten-stokes Jan 27, 2022
5a93629
Remove astunparse from setup.py
kiersten-stokes Jan 27, 2022
07995ca
Replace continue with break in loop
kiersten-stokes Jan 27, 2022
db1b6c2
Fix type determination for complex type hints
kiersten-stokes Jan 28, 2022
4ecfb7b
Fix and streamline docstring regexes
kiersten-stokes Jan 28, 2022
6d5014e
Fix type check for _get_class_docstring
kiersten-stokes Jan 28, 2022
95ae932
Fix ast var access in different python versions (round 2)
kiersten-stokes Jan 28, 2022
f4a627b
Fix regex to replace newlines with spaces
kiersten-stokes Jan 31, 2022
4ea33f8
Set default value in _parse_from_docstring
kiersten-stokes Jan 31, 2022
673b85a
Make first pass at test fixes
kiersten-stokes Jan 31, 2022
3bcf4b3
Remove debug log messages
kiersten-stokes Jan 31, 2022
38c2b3b
Cover missing case for pasting in python 3.7 and lower
kiersten-stokes Feb 1, 2022
104228b
Clean up and add comments to test case
kiersten-stokes Feb 1, 2022
a5825e1
Remove comment
kiersten-stokes Feb 2, 2022
51e8075
Merge branch 'master' into aa-parser-improvements
kiersten-stokes Feb 2, 2022
5dc8c1c
Add display_name to metadata available to connector implementations
kiersten-stokes Feb 4, 2022
93bed91
Address review re: pytest fixture
kiersten-stokes Feb 7, 2022
61b5470
Amend module regex to include upper case and digits
kiersten-stokes Feb 7, 2022
a283b8e
Fix lint
kiersten-stokes Feb 7, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

pytest_plugins = ["jupyter_server.pytest_plugin"]

TEST_CATALOG_NAME = 'new_test_catalog'

KFP_COMPONENT_CACHE_INSTANCE = {
"display_name": "KFP Example Components",
"metadata": {
Expand All @@ -40,6 +42,7 @@
"schema_name": "elyra-airflow-examples-catalog"
}


@pytest.fixture
def component_cache_instance(request):
"""Creates an instance of a component cache and removes after test."""
Expand Down Expand Up @@ -68,3 +71,22 @@ def component_cache_instance(request):
# Test was not parametrized, so component instance is not needed
except AttributeError:
yield None


@pytest.fixture
def metadata_manager_with_teardown():
    """
    Yield a MetadataManager bound to the component catalogs schemaspace for tests
    that create or modify a catalog instance.

    After the test finishes (including when it fails), the catalog instance named
    TEST_CATALOG_NAME is removed if it is still present.
    """
    manager = MetadataManager(schemaspace=ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID)

    # Hand the manager over to the test body
    yield manager

    # Teardown: best-effort cleanup of the test catalog. Any error raised while
    # looking up or removing the instance is deliberately ignored.
    try:
        leftover = manager.get(TEST_CATALOG_NAME)
        if leftover:
            manager.remove(TEST_CATALOG_NAME)
    except Exception:
        pass
467 changes: 346 additions & 121 deletions elyra/pipeline/airflow/component_parser_airflow.py

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions elyra/pipeline/catalog_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
#
from abc import abstractmethod
from copy import deepcopy
import hashlib
from http import HTTPStatus
import os
Expand Down Expand Up @@ -240,8 +241,12 @@ def read_component_definitions(self, catalog_instance: Metadata) -> Dict[str, Di
# the catalog entry hash for each entry in the catalog
keys_to_hash = self.get_hash_keys()

# Add display_name attribute to the metadata dictionary
catalog_metadata = deepcopy(catalog_instance.metadata)
catalog_metadata['display_name'] = catalog_instance.display_name

# Add catalog entry data dictionaries to the thread queue
for entry_data in self.get_catalog_entries(catalog_instance.metadata):
for entry_data in self.get_catalog_entries(catalog_metadata):
catalog_entry_q.put_nowait(entry_data)

except NotImplementedError as e:
Expand All @@ -268,7 +273,7 @@ def read_with_thread():
self.log.debug(f"Attempting read of definition for catalog entry with identifying information: "
f"{str(catalog_entry_data)}...")
definition = self.read_catalog_entry(catalog_entry_data=catalog_entry_data,
catalog_metadata=catalog_instance.metadata)
catalog_metadata=catalog_metadata)

# Ignore this entry if no definition content is returned
if not definition:
Expand Down
2 changes: 1 addition & 1 deletion elyra/pipeline/component_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def check_outstanding_threads(self) -> bool:
# Report successful join for threads that have previously logged a
# cache update duration warning
if thread.last_warn_time != thread.task_start_time:
self.log.info(f"Cache update for catalog '{thread.name}' has"
self.log.info(f"Cache update for catalog '{thread.name}' has "
f"completed after {cumulative_run_time} seconds")

return outstanding_threads
Expand Down
300 changes: 149 additions & 151 deletions elyra/tests/pipeline/airflow/test_component_parser_airflow.py

Large diffs are not rendered by default.

38 changes: 9 additions & 29 deletions elyra/tests/pipeline/kfp/test_component_parser_kfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
from types import SimpleNamespace

from conftest import KFP_COMPONENT_CACHE_INSTANCE
from conftest import TEST_CATALOG_NAME
import jupyter_core.paths
import pytest

from elyra.metadata.manager import MetadataManager
from elyra.metadata.metadata import Metadata
from elyra.metadata.schemaspaces import ComponentCatalogs
from elyra.pipeline.catalog_connector import FilesystemComponentCatalogConnector
from elyra.pipeline.catalog_connector import UrlComponentCatalogConnector
from elyra.pipeline.component_catalog import ComponentCache
Expand All @@ -47,7 +46,7 @@ def test_component_catalog_can_load_components_from_registries(component_cache_i
assert len(components) > 0


def test_modify_component_catalogs():
def test_modify_component_catalogs(metadata_manager_with_teardown):
# Initialize a ComponentCache instance and wait for all worker threads to complete
component_catalog = ComponentCache.instance()
component_catalog.wait_for_all_cache_updates()
Expand All @@ -58,8 +57,6 @@ def test_modify_component_catalogs():
# Components must be sorted by id for the equality comparison with later component lists
initial_components = sorted(initial_components, key=lambda component: component.id)

metadata_manager = MetadataManager(schemaspace=ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID)

# Create new registry instance with a single file-based component
paths = [_get_resource_path('kfp_test_operator.yaml')]

Expand All @@ -70,17 +67,11 @@ def test_modify_component_catalogs():
"paths": paths
}
registry_instance = Metadata(schema_name="local-file-catalog",
name="new_test_registry",
name=TEST_CATALOG_NAME,
display_name="New Test Registry",
metadata=instance_metadata)

try:
if metadata_manager.get("new_test_registry"):
metadata_manager.remove("new_test_registry")
except Exception:
pass

metadata_manager.create("new_test_registry", registry_instance)
metadata_manager_with_teardown.create(TEST_CATALOG_NAME, registry_instance)

# Wait for update to complete
component_catalog.wait_for_all_cache_updates()
Expand All @@ -96,7 +87,7 @@ def test_modify_component_catalogs():

# Modify the test registry to add an additional path
paths.append(_get_resource_path('kfp_test_operator_no_inputs.yaml'))
metadata_manager.update("new_test_registry", registry_instance)
metadata_manager_with_teardown.update(TEST_CATALOG_NAME, registry_instance)

# Wait for update to complete
component_catalog.wait_for_all_cache_updates()
Expand All @@ -111,7 +102,7 @@ def test_modify_component_catalogs():
assert 'Test Operator No Inputs' in modified_component_names

# Delete the test registry
metadata_manager.remove("new_test_registry")
metadata_manager_with_teardown.remove(TEST_CATALOG_NAME)

# Wait for update to complete
component_catalog.wait_for_all_cache_updates()
Expand All @@ -131,16 +122,14 @@ def test_modify_component_catalogs():
assert initial_palette == post_delete_palette


def test_directory_based_component_catalog():
def test_directory_based_component_catalog(metadata_manager_with_teardown):
# Initialize a ComponentCache instance and wait for all worker threads to complete
component_catalog = ComponentCache.instance()
component_catalog.wait_for_all_cache_updates()

# Get initial set of components from the current active registries
initial_components = component_catalog.get_all_components(RUNTIME_PROCESSOR)

metadata_manager = MetadataManager(schemaspace=ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID)

# Create new directory-based registry instance with components in ../../test/resources/components
registry_path = _get_resource_path('')
instance_metadata = {
Expand All @@ -150,17 +139,11 @@ def test_directory_based_component_catalog():
"paths": [registry_path]
}
registry_instance = Metadata(schema_name="local-directory-catalog",
name="new_test_registry",
name=TEST_CATALOG_NAME,
display_name="New Test Registry",
metadata=instance_metadata)

try:
if metadata_manager.get("new_test_registry"):
metadata_manager.remove("new_test_registry")
except Exception:
pass

metadata_manager.create("new_test_registry", registry_instance)
metadata_manager_with_teardown.create(TEST_CATALOG_NAME, registry_instance)

# Wait for update to complete
component_catalog.wait_for_all_cache_updates()
Expand All @@ -175,9 +158,6 @@ def test_directory_based_component_catalog():
assert 'Test Operator' in added_component_names
assert 'Test Operator No Inputs' in added_component_names

# Remove the test instance
metadata_manager.remove("new_test_registry")


def test_parse_kfp_component_file():
# Define the appropriate reader for a filesystem-type component definition
Expand Down
Loading