Address test warnings #2833

Merged 2 commits on Jul 19, 2022

8 changes: 4 additions & 4 deletions elyra/pipeline/airflow/processor_airflow.py
@@ -43,8 +43,8 @@
 from elyra.pipeline.pipeline import Pipeline
 from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
 from elyra.pipeline.processor import PipelineProcessor
-from elyra.pipeline.processor import PipelineProcessorResponse
 from elyra.pipeline.processor import RuntimePipelineProcessor
+from elyra.pipeline.processor import RuntimePipelineProcessorResponse
 from elyra.pipeline.runtime_type import RuntimeProcessorType
 from elyra.util.cos import join_paths
 from elyra.util.github import GithubClient
@@ -91,8 +91,8 @@ class AirflowPipelineProcessor(RuntimePipelineProcessor):
     # Contains mappings from class to import statement for each available Airflow operator
     class_import_map = {}
 
-    def __init__(self, root_dir, **kwargs):
-        super().__init__(root_dir, **kwargs)
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
         if not self.class_import_map:  # Only need to load once
             for package in self.available_airflow_operators:
                 parts = package.rsplit(".", 1)
@@ -713,7 +713,7 @@ def render_secrets_for_generic_op(op: Dict) -> str:
     return dedent(str_to_render)
 
 
-class AirflowPipelineProcessorResponse(PipelineProcessorResponse):
+class AirflowPipelineProcessorResponse(RuntimePipelineProcessorResponse):
 
     _type = RuntimeProcessorType.APACHE_AIRFLOW
     _name = "airflow"
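
Note on the constructor change above: root_dir is declared as a Unicode trait on the base PipelineProcessor (see the processor.py changes below), and traitlets' HasTraits.__init__ assigns any keyword argument that matches a declared trait, so the explicit positional parameter is no longer needed. A minimal sketch of that mechanism, independent of Elyra's classes (SketchProcessor and the path are illustrative):

from traitlets import Unicode
from traitlets.config import LoggingConfigurable


class SketchProcessor(LoggingConfigurable):
    # Declaring the trait is enough; no explicit __init__ is required.
    root_dir = Unicode(allow_none=True)


# HasTraits.__init__ routes matching keyword arguments to trait assignments.
processor = SketchProcessor(root_dir="/home/user/project")
print(processor.root_dir)  # -> /home/user/project
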
8 changes: 4 additions & 4 deletions elyra/pipeline/component_catalog.py
@@ -366,10 +366,10 @@ class ComponentCache(SingletonConfigurable):
     }
 
     def __init__(self, **kwargs):
+        emulate_server_app: bool = kwargs.pop("emulate_server_app", False)
         super().__init__(**kwargs)
 
         self._component_cache = {}
-        self.is_server_process = ComponentCache._determine_server_process(**kwargs)
+        self.is_server_process = ComponentCache._determine_server_process(emulate_server_app, **kwargs)
         self.manifest_dir = jupyter_runtime_dir()
         # Ensure queue attribute exists for non-server instances as well.
         self.refresh_queue: Optional[RefreshQueue] = None
@@ -391,13 +391,13 @@ def __init__(self, **kwargs):
         self.manifest_filename = os.path.join(self.manifest_dir, f"elyra-component-manifest-{os.getpid()}.json")
 
     @staticmethod
-    def _determine_server_process(**kwargs) -> bool:
+    def _determine_server_process(emulate_server_app: bool, **kwargs) -> bool:
         """Determines if this process is a server (extension) process."""
         app_names = ["ServerApp", "ElyraApp"]
         is_server_process = False
         if "parent" in kwargs and kwargs["parent"].__class__.__name__ in app_names:
             is_server_process = True
-        elif "emulate_server_app" in kwargs and kwargs["emulate_server_app"]:  # Used in unittests
+        elif emulate_server_app:  # Used in unittests
             is_server_process = True
 
         return is_server_process
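
The kwargs.pop("emulate_server_app", ...) change above follows a common traitlets pattern: options that are not declared traits are removed from kwargs before delegating to super().__init__, presumably so HasTraits does not warn about unrecognized keyword arguments during the tests this PR addresses. A hedged sketch of the pattern (class and option names are illustrative, not Elyra's):

from traitlets.config import SingletonConfigurable


class SketchCache(SingletonConfigurable):
    def __init__(self, **kwargs):
        # Remove non-trait options first so traitlets only sees arguments it knows about.
        emulate_server_app: bool = kwargs.pop("emulate_server_app", False)
        super().__init__(**kwargs)
        self.is_server_process = emulate_server_app


cache = SketchCache.instance(emulate_server_app=True)
print(cache.is_server_process)  # -> True
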
7 changes: 2 additions & 5 deletions elyra/pipeline/kfp/processor_kfp.py
@@ -51,8 +51,8 @@
 from elyra.pipeline.pipeline import Pipeline
 from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
 from elyra.pipeline.processor import PipelineProcessor
-from elyra.pipeline.processor import PipelineProcessorResponse
 from elyra.pipeline.processor import RuntimePipelineProcessor
+from elyra.pipeline.processor import RuntimePipelineProcessorResponse
 from elyra.pipeline.runtime_type import RuntimeProcessorType
 from elyra.util.cos import join_paths
 from elyra.util.path import get_absolute_path
@@ -68,9 +68,6 @@ class KfpPipelineProcessor(RuntimePipelineProcessor):
     # Defaults to `/tmp`
     WCD = os.getenv("ELYRA_WRITABLE_CONTAINER_DIR", "/tmp").strip().rstrip("/")
 
-    def __init__(self, root_dir, **kwargs):
-        super().__init__(root_dir, **kwargs)
-
     def process(self, pipeline):
         """
         Runs a pipeline on Kubeflow Pipelines
@@ -769,7 +766,7 @@ def _sanitize_param_name(name: str) -> str:
     return normalized_name.replace(" ", "_")
 
 
-class KfpPipelineProcessorResponse(PipelineProcessorResponse):
+class KfpPipelineProcessorResponse(RuntimePipelineProcessorResponse):
     _type = RuntimeProcessorType.KUBEFLOW_PIPELINES
     _name = "kfp"
 
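
The deleted KfpPipelineProcessor.__init__ only forwarded its arguments, so removing it changes nothing: Python falls back to the inherited constructor. A tiny sketch of the equivalence (classes are illustrative, not Elyra's):

class Base:
    def __init__(self, **kwargs):
        self.kwargs = kwargs


class WithForwardingInit(Base):
    def __init__(self, **kwargs):  # redundant: adds nothing over Base.__init__
        super().__init__(**kwargs)


class WithoutInit(Base):  # inherits Base.__init__ directly
    pass


print(WithForwardingInit(a=1).kwargs == WithoutInit(a=1).kwargs)  # -> True
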
7 changes: 2 additions & 5 deletions elyra/pipeline/local/processor_local.py
@@ -55,8 +55,8 @@ class LocalPipelineProcessor(PipelineProcessor):
     _type = RuntimeProcessorType.LOCAL
     _name = "local"
 
-    def __init__(self, root_dir, **kwargs):
-        super().__init__(root_dir, **kwargs)
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
         notebook_op_processor = NotebookOperationProcessor(self.root_dir)
         python_op_processor = PythonScriptOperationProcessor(self.root_dir)
         r_op_processor = RScriptOperationProcessor(self.root_dir)
@@ -120,9 +120,6 @@ class LocalPipelineProcessorResponse(PipelineProcessorResponse):
     _type = RuntimeProcessorType.LOCAL
     _name = "local"
 
-    def __init__(self):
-        super().__init__("", "", "")
-
 
 class OperationProcessor(ABC):
 
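
With RuntimePipelineProcessorResponse now owning the run URL and object-storage fields (see processor.py below), the local response no longer has to pass empty strings to its parent; the base PipelineProcessorResponse takes no constructor arguments. A small hedged usage sketch (the exact to_json payload may differ):

from elyra.pipeline.local.processor_local import LocalPipelineProcessorResponse

# The local runtime has no external run URL or object storage, so the
# argument-free base class is sufficient.
response = LocalPipelineProcessorResponse()
print(response.to_json())  # roughly {"platform": "LOCAL"}
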
44 changes: 23 additions & 21 deletions elyra/pipeline/processor.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from abc import ABC
 from abc import abstractmethod
 import ast
 import asyncio
@@ -54,15 +53,16 @@ class PipelineProcessorRegistry(SingletonConfigurable):
     _processors: Dict[str, "PipelineProcessor"] = {}
 
     def __init__(self, **kwargs):
+        root_dir: Optional[str] = kwargs.pop("root_dir", None)
         super().__init__(**kwargs)
-        self.root_dir = get_expanded_path(kwargs.get("root_dir"))
+        self.root_dir = get_expanded_path(root_dir)
         # Register all known processors based on entrypoint configuration
         for processor in entrypoints.get_group_all("elyra.pipeline.processors"):
             try:
                 # instantiate an actual instance of the processor
-                processor_instance = processor.load()(self.root_dir, parent=kwargs.get("parent"))  # Load an instance
+                processor_instance = processor.load()(root_dir=self.root_dir, parent=kwargs.get("parent"))
                 self.log.info(
-                    f"Registering {processor.name} processor " f'"{processor.module_name}.{processor.object_name}"...'
+                    f"Registering {processor.name} processor '{processor.module_name}.{processor.object_name}'..."
                 )
                 self.add_processor(processor_instance)
             except Exception as err:
@@ -110,9 +110,10 @@ class PipelineProcessorManager(SingletonConfigurable):
     _registry: PipelineProcessorRegistry
 
     def __init__(self, **kwargs):
+        root_dir: Optional[str] = kwargs.pop("root_dir", None)
         super().__init__(**kwargs)
-        self.root_dir = get_expanded_path(kwargs.get("root_dir"))
-        self._registry = PipelineProcessorRegistry.instance()
+        self.root_dir = get_expanded_path(root_dir)
+        self._registry = PipelineProcessorRegistry.instance(root_dir=self.root_dir)
 
     def get_processor_for_runtime(self, runtime_name: str):
         processor = self._registry.get_processor(runtime_name)
@@ -157,16 +158,11 @@ async def export(self, pipeline, pipeline_export_format, pipeline_export_path, o
         return res
 
 
-class PipelineProcessorResponse(ABC):
+class PipelineProcessorResponse:
 
     _type: RuntimeProcessorType = None
     _name: str = None
 
-    def __init__(self, run_url, object_storage_url, object_storage_path):
-        self._run_url = run_url
-        self._object_storage_url = object_storage_url
-        self._object_storage_path = object_storage_path
-
     @property
     def type(self) -> str:  # Return the string value of the name so that JSON serialization works
         if self._type is None:
@@ -179,6 +175,19 @@ def name(self) -> str:
             raise NotImplementedError("_name must have a value!")
         return self._name
 
+    def to_json(self):
+        return {
+            "platform": self.type,
+        }
+
+
+class RuntimePipelineProcessorResponse(PipelineProcessorResponse):
+    def __init__(self, run_url, object_storage_url, object_storage_path):
+        super().__init__()
+        self._run_url = run_url
+        self._object_storage_url = object_storage_url
+        self._object_storage_path = object_storage_path
+
     @property
     def run_url(self):
         """
@@ -216,19 +225,15 @@ class PipelineProcessor(LoggingConfigurable):  # ABC
     _type: RuntimeProcessorType = None
     _name: str = None
 
-    root_dir = Unicode(allow_none=True)
+    root_dir: str = Unicode(allow_none=True)
 
-    enable_pipeline_info = Bool(
+    enable_pipeline_info: bool = Bool(
         config=True,
         default_value=(os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true"),
         help="""Produces formatted logging of informational messages with durations
         (default=True). (ELYRA_ENABLE_PIPELINE_INFO env var)""",
     )
 
-    def __init__(self, root_dir, **kwargs):
-        super().__init__(**kwargs)
-        self.root_dir = root_dir
-
     @property
     def type(self):
         if self._type is None:
@@ -348,9 +353,6 @@ def _sort_operation_dependencies(operations_by_id: dict, ordered_operations: lis
 
 
 class RuntimePipelineProcessor(PipelineProcessor):
-    def __init__(self, root_dir: str, **kwargs):
-        super().__init__(root_dir, **kwargs)
-
     def _get_dependency_archive_name(self, operation: Operation) -> str:
         return f"{Path(operation.filename).stem}-{operation.id}.tar.gz"
 
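
The net effect of the processor.py changes: PipelineProcessorResponse keeps only the runtime type/name and a minimal to_json, while the new RuntimePipelineProcessorResponse carries the run URL and object-storage details that only remote runtimes produce. A hedged usage sketch (the subclass and values below are hypothetical; the real subclasses are the Airflow and KFP responses above):

from elyra.pipeline.processor import RuntimePipelineProcessorResponse
from elyra.pipeline.runtime_type import RuntimeProcessorType


class SketchResponse(RuntimePipelineProcessorResponse):
    # Hypothetical subclass; real ones set APACHE_AIRFLOW or KUBEFLOW_PIPELINES.
    _type = RuntimeProcessorType.KUBEFLOW_PIPELINES
    _name = "kfp"


response = SketchResponse(
    run_url="https://kfp.example.com/runs/123",  # illustrative values
    object_storage_url="https://minio.example.com",
    object_storage_path="/bucket/run-123",
)
print(response.to_json())  # includes at least {"platform": "KUBEFLOW_PIPELINES"}
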
96 changes: 51 additions & 45 deletions elyra/pipeline/validation.py
@@ -118,8 +118,9 @@ def to_json(self):
 
 class PipelineValidationManager(SingletonConfigurable):
     def __init__(self, **kwargs):
+        root_dir: Optional[str] = kwargs.pop("root_dir", None)
         super().__init__(**kwargs)
-        self.root_dir = get_expanded_path(kwargs.get("root_dir"))
+        self.root_dir = get_expanded_path(root_dir)
 
     async def validate(self, pipeline: Dict) -> ValidationResponse:
         """
@@ -310,57 +311,62 @@ async def _validate_compatibility(
                 response.add_message(
                     severity=ValidationSeverity.Error,
                     message_type="invalidRuntime",
-                    message="Pipeline runtime platform is not compatible " "with selected runtime configuration.",
+                    message="Pipeline runtime platform is not compatible with selected runtime configuration.",
                     data={
                         "pipelineID": primary_pipeline_id,
                         "pipelineType": pipeline_type,
                         "pipelineRuntime": pipeline_runtime,
                     },
                 )
-            elif PipelineProcessorManager.instance().is_supported_runtime(pipeline_runtime):
-                component_list = await PipelineProcessorManager.instance().get_components(pipeline_runtime)
-                for component in component_list:
-                    supported_ops.append(component.op)
-
-                # Checks pipeline node types are compatible with the runtime selected
-                for sub_pipeline in pipeline_definition.pipelines:
-                    for node in sub_pipeline.nodes:
-                        if node.op not in ComponentCache.get_generic_component_ops() and pipeline_runtime == "local":
-                            response.add_message(
-                                severity=ValidationSeverity.Error,
-                                message_type="invalidNodeType",
-                                message="This pipeline contains at least one runtime-specific "
-                                "component, but pipeline runtime is 'local'. Specify a "
-                                "runtime config or remove runtime-specific components "
-                                "from the pipeline",
-                                data={"nodeID": node.id, "nodeOpName": node.op, "pipelineId": sub_pipeline.id},
-                            )
-                            break
-                        if node.type == "execution_node" and node.op not in supported_ops:
-                            response.add_message(
-                                severity=ValidationSeverity.Error,
-                                message_type="invalidNodeType",
-                                message="This component was not found in the catalog. Please add it "
-                                "to your component catalog or remove this node from the "
-                                "pipeline",
-                                data={
-                                    "nodeID": node.id,
-                                    "nodeOpName": node.op,
-                                    "nodeName": node.label,
-                                    "pipelineId": sub_pipeline.id,
-                                },
-                            )
-            else:
-                response.add_message(
-                    severity=ValidationSeverity.Error,
-                    message_type="invalidRuntime",
-                    message="Unsupported pipeline runtime",
-                    data={
-                        "pipelineRuntime": pipeline_runtime,
-                        "pipelineType": pipeline_type,
-                        "pipelineId": primary_pipeline_id,
-                    },
-                )
+                processor_manager = PipelineProcessorManager.instance(root_dir=self.root_dir)
+                if processor_manager.is_supported_runtime(pipeline_runtime):
+                    component_list = await processor_manager.get_components(pipeline_runtime)
+                    for component in component_list:
+                        supported_ops.append(component.op)
+
+                    # Checks pipeline node types are compatible with the runtime selected
+                    for sub_pipeline in pipeline_definition.pipelines:
+                        for node in sub_pipeline.nodes:
+                            if (
+                                node.op not in ComponentCache.get_generic_component_ops()
+                                and pipeline_runtime == "local"
+                            ):
+                                response.add_message(
+                                    severity=ValidationSeverity.Error,
+                                    message_type="invalidNodeType",
+                                    message="This pipeline contains at least one runtime-specific "
+                                    "component, but pipeline runtime is 'local'. Specify a "
+                                    "runtime config or remove runtime-specific components "
+                                    "from the pipeline",
+                                    data={"nodeID": node.id, "nodeOpName": node.op, "pipelineId": sub_pipeline.id},
+                                )
+                                break
+                            if node.type == "execution_node" and node.op not in supported_ops:
+                                response.add_message(
+                                    severity=ValidationSeverity.Error,
+                                    message_type="invalidNodeType",
+                                    message="This component was not found in the catalog. Please add it "
+                                    "to your component catalog or remove this node from the "
+                                    "pipeline",
+                                    data={
+                                        "nodeID": node.id,
+                                        "nodeOpName": node.op,
+                                        "nodeName": node.label,
+                                        "pipelineId": sub_pipeline.id,
+                                    },
+                                )
+                else:
+                    response.add_message(
+                        severity=ValidationSeverity.Error,
+                        message_type="invalidRuntime",
+                        message="Unsupported pipeline runtime",
+                        data={
+                            "pipelineRuntime": pipeline_runtime,
+                            "pipelineType": pipeline_type,
+                            "pipelineId": primary_pipeline_id,
+                        },
+                    )
 
     async def _validate_node_properties(
         self,
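
The validation change above also stops calling PipelineProcessorManager.instance() repeatedly: SingletonConfigurable.instance() builds the singleton with the supplied arguments on the first call and returns the same object afterwards, so binding it once to processor_manager (now seeded with root_dir) keeps all the calls consistent. A hedged sketch of that singleton behavior (SketchManager is illustrative):

from traitlets import Unicode
from traitlets.config import SingletonConfigurable


class SketchManager(SingletonConfigurable):
    root_dir = Unicode(allow_none=True)


first = SketchManager.instance(root_dir="/workspace")  # constructs the singleton
second = SketchManager.instance()                      # returns the same object
print(first is second, first.root_dir)  # -> True /workspace
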
2 changes: 1 addition & 1 deletion elyra/tests/pipeline/airflow/test_processor_airflow.py
@@ -39,7 +39,7 @@
 
 @pytest.fixture
 def processor(monkeypatch, setup_factory_data):
-    processor = AirflowPipelineProcessor(os.getcwd())
+    processor = AirflowPipelineProcessor(root_dir=os.getcwd())
 
     # Add spoofed TestOperator to class import map
     class_import_map = {"TestOperator": "from airflow.operators.test_operator import TestOperator"}

2 changes: 1 addition & 1 deletion elyra/tests/pipeline/kfp/test_processor_kfp.py
@@ -39,7 +39,7 @@
 
 @pytest.fixture
 def processor(setup_factory_data):
-    processor = KfpPipelineProcessor(os.getcwd())
+    processor = KfpPipelineProcessor(root_dir=os.getcwd())
     return processor
 
 
8 changes: 4 additions & 4 deletions elyra/tests/pipeline/local/test_pipeline_processor_local.py
@@ -90,7 +90,7 @@ def test_pipeline_execution(pipeline_dir):
 
     pipeline = construct_pipeline("p1", nodes=nodes, location=pipeline_dir)
 
-    LocalPipelineProcessor(pipeline_dir).process(pipeline)
+    LocalPipelineProcessor(root_dir=pipeline_dir).process(pipeline)
 
     # Confirm outputs
     for node in nodes:
@@ -116,7 +116,7 @@ def test_pipeline_execution_missing_kernelspec(pipeline_dir):
     nbformat.write(nb, node1nb_file)
 
     with pytest.raises(RuntimeError) as e:
-        LocalPipelineProcessor(pipeline_dir).process(pipeline)
+        LocalPipelineProcessor(root_dir=pipeline_dir).process(pipeline)
     assert (
         "Error processing operation node1 (node1.ipynb): No kernel "
         "name found in notebook and no override provided." in str(e.value)
@@ -136,7 +136,7 @@ def test_pipeline_execution_bad_notebook(pipeline_dir):
     pipeline = construct_pipeline("p1", nodes=nodes, location=pipeline_dir)
 
     with pytest.raises(RuntimeError) as e:
-        LocalPipelineProcessor(pipeline_dir).process(pipeline)
+        LocalPipelineProcessor(root_dir=pipeline_dir).process(pipeline)
     assert "Error processing operation node3" in str(e.value)
 
     # Confirm outputs (and non-outputs)
@@ -162,7 +162,7 @@ def test_pipeline_execution_bad_python(pipeline_dir):
     pipeline = construct_pipeline("p1", nodes=nodes, location=pipeline_dir)
 
     with pytest.raises(RuntimeError) as e:
-        LocalPipelineProcessor(pipeline_dir).process(pipeline)
+        LocalPipelineProcessor(root_dir=pipeline_dir).process(pipeline)
     assert "Error processing operation node2" in str(e.value)
 
     # Confirm outputs (and non-outputs)