Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Airflow parser functionality #2418

Merged
merged 34 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
97d3b6a
Resolve naming conflict in case of multiple operators
kiersten-stokes Jan 14, 2022
9051b9e
Initial changes to use AST for parsing
kiersten-stokes Jan 19, 2022
9ec6812
Refactor to support AST parse
kiersten-stokes Jan 19, 2022
f4bfa0e
Improve docstrings and type hints
kiersten-stokes Jan 21, 2022
d2962ea
Improve refactoring
kiersten-stokes Jan 25, 2022
7f1ce33
Clean up comments and TODO's
kiersten-stokes Jan 25, 2022
516cdee
Fix incorrect type hint
kiersten-stokes Jan 25, 2022
0a8c144
Add log messaging
kiersten-stokes Jan 26, 2022
2ceabf3
Fix typo
kiersten-stokes Jan 26, 2022
2ce38f5
Change the way operators are id'ed from imported packages
kiersten-stokes Jan 26, 2022
f1dc3d4
Fix long-running test issue
kiersten-stokes Jan 26, 2022
c280264
Fix type regex for docstring searches
kiersten-stokes Jan 26, 2022
b04cd1b
Fix determination of 'required' and address minor review comments
kiersten-stokes Jan 27, 2022
e632182
Add temporary log messages re: type
kiersten-stokes Jan 27, 2022
0709245
Fix indentation
kiersten-stokes Jan 27, 2022
2595cb7
Account for ast variable access in different python versions
kiersten-stokes Jan 27, 2022
5a93629
Remove astunparse from setup.py
kiersten-stokes Jan 27, 2022
07995ca
Replace continue with break in loop
kiersten-stokes Jan 27, 2022
db1b6c2
Fix type determination for complex type hints
kiersten-stokes Jan 28, 2022
4ecfb7b
Fix and streamline docstring regexes
kiersten-stokes Jan 28, 2022
6d5014e
Fix type check for _get_class_docstring
kiersten-stokes Jan 28, 2022
95ae932
Fix ast var access in different python versions (round 2)
kiersten-stokes Jan 28, 2022
f4a627b
Fix regex to replace newlines with spaces
kiersten-stokes Jan 31, 2022
4ea33f8
Set default value in _parse_from_docstring
kiersten-stokes Jan 31, 2022
673b85a
Make first pass at test fixes
kiersten-stokes Jan 31, 2022
3bcf4b3
Remove debug log messages
kiersten-stokes Jan 31, 2022
38c2b3b
Cover missing case for pasting in python 3.7 and lower
kiersten-stokes Feb 1, 2022
104228b
Clean up and add comments to test case
kiersten-stokes Feb 1, 2022
a5825e1
Remove comment
kiersten-stokes Feb 2, 2022
51e8075
Merge branch 'master' into aa-parser-improvements
kiersten-stokes Feb 2, 2022
5dc8c1c
Add display_name to metadata available to connector implementations
kiersten-stokes Feb 4, 2022
93bed91
Address review re: pytest fixture
kiersten-stokes Feb 7, 2022
61b5470
Amend module regex to include upper case and digits
kiersten-stokes Feb 7, 2022
a283b8e
Fix lint
kiersten-stokes Feb 7, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

pytest_plugins = ["jupyter_server.pytest_plugin"]

TEST_CATALOG_NAME = 'new_test_catalog'

KFP_COMPONENT_CACHE_INSTANCE = {
"display_name": "KFP Example Components",
"metadata": {
Expand All @@ -40,6 +42,7 @@
"schema_name": "elyra-airflow-examples-catalog"
}


@pytest.fixture
def component_cache_instance(request):
"""Creates an instance of a component cache and removes after test."""
Expand Down Expand Up @@ -68,3 +71,22 @@ def component_cache_instance(request):
# Test was not parametrized, so component instance is not needed
except AttributeError:
yield None


@pytest.fixture
def teardown_test_catalog():
"""
This fixture is run after certain tests that modify the component catalog.
This way, the catalog instance will be removed even when the test fails
"""
metadata_manager = MetadataManager(schemaspace=ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID)

# Run test
yield

# Remove test catalog
try:
if metadata_manager.get(TEST_CATALOG_NAME):
metadata_manager.remove(TEST_CATALOG_NAME)
except Exception:
pass
468 changes: 347 additions & 121 deletions elyra/pipeline/airflow/component_parser_airflow.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion elyra/pipeline/component_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def check_outstanding_threads(self) -> bool:
# Report successful join for threads that have previously logged a
# cache update duration warning
if thread.last_warn_time != thread.task_start_time:
self.log.info(f"Cache update for catalog '{thread.name}' has"
self.log.info(f"Cache update for catalog '{thread.name}' has "
f"completed after {cumulative_run_time} seconds")

return outstanding_threads
Expand Down
296 changes: 151 additions & 145 deletions elyra/tests/pipeline/airflow/test_component_parser_airflow.py

Large diffs are not rendered by default.

30 changes: 9 additions & 21 deletions elyra/tests/pipeline/kfp/test_component_parser_kfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from types import SimpleNamespace

from conftest import KFP_COMPONENT_CACHE_INSTANCE
from conftest import TEST_CATALOG_NAME
import jupyter_core.paths
import pytest

Expand Down Expand Up @@ -47,6 +48,7 @@ def test_component_catalog_can_load_components_from_registries(component_cache_i
assert len(components) > 0


@pytest.mark.usefixtures('teardown_test_catalog')
def test_modify_component_catalogs():
# Initialize a ComponentCache instance and wait for all worker threads to compete
component_catalog = ComponentCache.instance()
Expand All @@ -70,17 +72,11 @@ def test_modify_component_catalogs():
"paths": paths
}
registry_instance = Metadata(schema_name="local-file-catalog",
name="new_test_registry",
name=TEST_CATALOG_NAME,
display_name="New Test Registry",
metadata=instance_metadata)

try:
if metadata_manager.get("new_test_registry"):
metadata_manager.remove("new_test_registry")
except Exception:
pass

metadata_manager.create("new_test_registry", registry_instance)
metadata_manager.create(TEST_CATALOG_NAME, registry_instance)

# Wait for update to complete
component_catalog.wait_for_all_cache_updates()
Expand All @@ -96,7 +92,7 @@ def test_modify_component_catalogs():

# Modify the test registry to add an additional path to
paths.append(_get_resource_path('kfp_test_operator_no_inputs.yaml'))
metadata_manager.update("new_test_registry", registry_instance)
metadata_manager.update(TEST_CATALOG_NAME, registry_instance)

# Wait for update to complete
component_catalog.wait_for_all_cache_updates()
Expand All @@ -111,7 +107,7 @@ def test_modify_component_catalogs():
assert 'Test Operator No Inputs' in modified_component_names

# Delete the test registry
metadata_manager.remove("new_test_registry")
metadata_manager.remove(TEST_CATALOG_NAME)

# Wait for update to complete
component_catalog.wait_for_all_cache_updates()
Expand All @@ -131,6 +127,7 @@ def test_modify_component_catalogs():
assert initial_palette == post_delete_palette


@pytest.mark.usefixtures('teardown_test_catalog')
def test_directory_based_component_catalog():
# Initialize a ComponentCache instance and wait for all worker threads to compete
component_catalog = ComponentCache.instance()
Expand All @@ -150,17 +147,11 @@ def test_directory_based_component_catalog():
"paths": [registry_path]
}
registry_instance = Metadata(schema_name="local-directory-catalog",
name="new_test_registry",
name=TEST_CATALOG_NAME,
display_name="New Test Registry",
metadata=instance_metadata)

try:
if metadata_manager.get("new_test_registry"):
metadata_manager.remove("new_test_registry")
except Exception:
pass

metadata_manager.create("new_test_registry", registry_instance)
metadata_manager.create(TEST_CATALOG_NAME, registry_instance)

# Wait for update to complete
component_catalog.wait_for_all_cache_updates()
Expand All @@ -175,9 +166,6 @@ def test_directory_based_component_catalog():
assert 'Test Operator' in added_component_names
assert 'Test Operator No Inputs' in added_component_names

# Remove the test instance
metadata_manager.remove("new_test_registry")


def test_parse_kfp_component_file():
# Define the appropriate reader for a filesystem-type component definition
Expand Down
226 changes: 175 additions & 51 deletions elyra/tests/pipeline/resources/components/airflow_test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,62 +13,186 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.operators.imported_operator import ImportedOperator # noqa TODO


class TestOperator(BaseOperator):
r"""
Execute a test script.

:param test_string_no_default: The test command description
:type test_string_no_default: str
:param test_bool_default: The test command bool description
:type test_bool_default: bool
:param test_int_default: The test command int description
:type test_int_default: int
:param test_dict_default: The test command dict description
:type test_dict_default: dict
:param test_list_default: The test command list description
:type test_list_default: list
:param test_string_default_value: The test command description
:type test_string_default_value: str
:param test_string_default_empty: The test command description
:type test_string_default_empty: str
:param test_bool_false: The test command bool description
:type test_bool_false: bool
:param test_bool_true: The test command bool description
:type test_bool_true: bool
:param test_int_zero: The test command int description
:type test_int_zero: int
:param test_int_non_zero: The test command int description
:type test_int_non_zero: int
:param test_unusual_type_dict: The test command description
:type test_unusual_type_dict: a dictionary of arrays
:param test_unusual_type_list: The test command description
:type test_unusual_type_list: a list of strings
:param test_unusual_type_string: The test command description
:type test_unusual_type_string: a string
:param test_unusual_type_notgiven: The test command description
"""
Operator derives from BaseOperator and mimics Airflow v1 Operator structure.
Note that some parameters have been intentionally omitted from the docstring
in order to test that fallback types are assigned appropriately.

:param str_no_default: a string parameter with no default given
:type str_no_default: str
:param bool_no_default: a boolean parameter with no default given
:type bool_no_default: bool
:param int_no_default: an integer parameter with no default given
:type int_no_default: int
:param str_default: a string parameter with a default value given
:type str_default: str
:param bool_default_true: a boolean parameter with a default value of True
:type bool_default_true: bool
:param bool_default_false: a boolean parameter with a default value of False
:type bool_default_false: bool
:param int_default_non_zero: an integer parameter with a non-zero default value
:type int_default_non_zero: int
:param int_default_zero: an integer parameter with a default value of 0
:type int_default_zero: int
:param str_empty: a string parameter with a default value of None
:type str_empty: str
:param list_default_is_none: an list parameter with a default of None
:type list_default_is_none: list
:param dict_default_is_none: a dictionary parameter with a default of None
:type dict_default_is_none: dict
:param unusual_type_dict: a dictionary parameter with the phrase 'list' in type description
:type unusual_type_dict: a dictionary of arrays
:param unusual_type_list: a list parameter with the phrase 'string' in type description
:type unusual_type_list: a list of strings
"""

def __init__(
self,
str_no_default,
bool_no_default,
int_no_default,
str_default="default",
bool_default_true=True,
bool_default_false=False,
int_default_non_zero=2,
int_default_zero=0,
str_empty=None,
list_default_is_none=None,
dict_default_is_none=None,
str_not_in_docstring="",
bool_not_in_docstring=False,
int_not_in_docstring=3,
unusual_type_dict=None,
unusual_type_list=None,
fallback_type=None,
*args,
**kwargs
):
super().__init__(*args, **kwargs)

def execute(self, context: Any):
pass

@apply_defaults
def __init__(self,
test_string_no_default,
test_bool_default,
test_int_default,
test_dict_default,
test_list_default,
test_string_default_value='default',
test_string_default_empty=None,
test_bool_false=False,
test_bool_true=True,
test_int_zero=0,
test_int_non_zero=1,
test_unusual_type_dict=None,
test_unusual_type_list=None,
test_unusual_type_string="",
test_unusual_type_notgiven="",
*args, **kwargs):

class DeriveFromTestOperator(TestOperator):
"""
Operator derives indirectly from BaseOperator and mimics Airflow v2 Operator
structure, including type hints given for all parameters

:param str_no_default: a string parameter with no default given
:type str_no_default: str
:param bool_no_default: a boolean parameter with no default given
:type bool_no_default: bool
:param int_no_default: an integer parameter with no default given
:type int_no_default: int
:param str_default: a string parameter with a default value given
:type str_default: str
:param bool_default: a boolean parameter with a default value given
:type bool_default: bool
:param int_default: an integer parameter with a default value given
:type int_default: int
:param str_optional_default: an Optional string parameter with a default value given
:type str_optional_default: Optional[str]
:param list_optional_default: an Optional list parameter with a default of None
:type list_optional_default: Optional[list]
"""

def __init__(
self,
*,
str_no_default: str,
bool_no_default: bool,
int_no_default: int,
str_not_in_docstring: str,
bool_not_in_docstring: bool,
int_not_in_docstring: int,
str_default: str = "default",
bool_default: bool = True,
int_default: int = 2,
str_optional_default: Optional[str] = "optional default",
list_optional_default: Optional[List] = None,
**kwargs
):
super().__init__(**kwargs)

def execute(self, context: Any):
pass


class DeriveFromImportedOperator(ImportedOperator):
"""
Operator derives from an airflow package Operator (and therefore indirectly
extends the BaseOperator) and whose parameters are list and dictionary types

:param dict_no_default: a dictionary parameter with no default given
:type dict_no_default: dict
:param list_no_default: a list parameter with no default given
:type list_no_default: list
:param dict_optional_no_default: an optional dictionary parameter with no default given
:type dict_optional_no_default: Optional[Dict[str, str]]
:param list_optional_no_default: an optional list parameter with no default given
:type list_optional_no_default: Optional[List[int]]
:param nested_dict_default: a nested dictionary parameter with a default value
:type nested_dict_default: Dict[str, Dict[str, str]]
:param list_default: a list parameter with a default value
:type list_default: List[str]
:param list_optional_default: a list parameter with a default value of None
:type list_optional_default: Optional[List[str]]
"""

def __init__(
self,
*,
dict_no_default: Dict,
list_no_default: List,
dict_optional_no_default: Optional[Dict[str, str]],
list_optional_no_default: Optional[List[int]],
nested_dict_default: Dict[str, Dict[str, str]] = None,
list_default: List[str] = None,
list_optional_default: Optional[List[str]] = None,
list_not_in_docstring: List[str],
dict_not_in_docstring: Dict[str, str],
**kwargs
):
super().__init__(**kwargs)

def execute(self, context: Any):
pass


class HelperClass1:
"""
A class that should not be picked up by the parser as it does not
derive from an Operator class
"""

def __init__(
self,
myvar1,
*args,
**kwargs):
super().__init__(*args, **kwargs)


class HelperClass2(object):
"""
Another class that should not be picked up by the parser as it does not
derive from an Operator class
"""

def __init__(
self,
myvar2,
*args,
**kwargs):
super().__init__(*args, **kwargs)
Loading