Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of cos_object_prefix pipeline property #2972

Merged
merged 5 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions elyra/pipeline/airflow/processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def process(self, pipeline: Pipeline) -> "AirflowPipelineProcessorResponse":
if pipeline.contains_generic_operations():
object_storage_url = f"{cos_endpoint}"
os_path = join_paths(
pipeline.pipeline_parameters.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
pipeline.pipeline_properties.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
)
object_storage_path = f"/{cos_bucket}/{os_path}"
else:
Expand Down Expand Up @@ -253,7 +253,7 @@ def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance

pipeline_instance_id = pipeline_instance_id or pipeline_name
artifact_object_prefix = join_paths(
pipeline.pipeline_parameters.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
pipeline.pipeline_properties.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
)

self.log_pipeline_info(
Expand Down
26 changes: 21 additions & 5 deletions elyra/pipeline/component_parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,31 +139,47 @@ def build_property_map(cls) -> None:
"""Build the map of property subclasses."""
cls._subclass_property_map = {sc.property_id: sc for sc in cls.all_subclasses() if hasattr(sc, "property_id")}

@classmethod
def get_class_for_property(cls, prop_id) -> type | None:
    """
    Look up the ElyraProperty subclass that corresponds to the given property id.

    The subclass property map is built first whenever it is still empty.

    :param prop_id: the property id to look up
    :returns: the matching subclass, or None if the map has no entry for the id
    """
    property_map = cls._subclass_property_map
    if not property_map:
        cls.build_property_map()
        # Re-read the attribute: building the map rebinds it on the class
        property_map = cls._subclass_property_map
    return property_map.get(prop_id)

@classmethod
def subclass_exists_for_property(cls, prop_id: str) -> bool:
    """
    Tell whether a corresponding ElyraProperty subclass exists for a property id.

    :param prop_id: the property id to check
    :returns: True if get_class_for_property finds a class for the id, False otherwise
    """
    matching_class = cls.get_class_for_property(prop_id)
    return matching_class is not None

@classmethod
def get_single_instance(cls, value: Optional[Dict[str, Any]] = None) -> ElyraProperty | None:
    """
    Build one instance of this class from a dictionary of attribute values.

    :param value: dictionary keyed by attribute id; an ElyraProperty is returned
        untouched and any other non-dict value is treated as an empty dictionary
    :returns: the new instance, or None when the instance reports that it
        should be discarded
    """
    if isinstance(value, ElyraProperty):
        # Already a single instance, nothing to unpack
        return value

    source = value if isinstance(value, dict) else {}

    kwargs = {}
    for attr in cls.property_attributes:
        kwargs[attr.id] = cls.strip_if_string(source.get(attr.id))

    # Resolve the class by name through its defining module before instantiating
    property_class = getattr(import_module(cls.__module__), cls.__name__)
    new_instance = property_class(**kwargs)
    if new_instance.should_discard():
        return None
    return new_instance

@classmethod
def create_instance(cls, prop_id: str, value: Optional[Any]) -> ElyraProperty | ElyraPropertyList | None:
    """
    Create an instance of the class with the given property id using the user-entered values.

    :param prop_id: id of the property whose class should be instantiated
    :param value: the user-entered value; must be a list for list-item properties
    :returns: a de-duplicated ElyraPropertyList for list-item properties, the
        result of get_single_instance for other ElyraProperty subclasses, or
        None when no class exists for the id or a list-item property is given
        a non-list value
    """
    # get_class_for_property builds the subclass map on demand, so no separate
    # build step or second raw map lookup is needed here
    sc = cls.get_class_for_property(prop_id)
    if sc is None:
        return None

    if issubclass(sc, ElyraPropertyListItem):
        if not isinstance(value, list):
            return None
        instances = [sc.get_single_instance(obj) for obj in value]  # create instance for each object
        return ElyraPropertyList(instances).deduplicate()  # convert to ElyraPropertyList and de-dupe
    if issubclass(sc, ElyraProperty):
        return sc.get_single_instance(value)

    return None

@classmethod
Expand Down
4 changes: 2 additions & 2 deletions elyra/pipeline/kfp/processor_kfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ def process(self, pipeline):
if pipeline.contains_generic_operations():
object_storage_url = f"{cos_public_endpoint}"
os_path = join_paths(
pipeline.pipeline_parameters.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
pipeline.pipeline_properties.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
)
object_storage_path = f"/{cos_bucket}/{os_path}"
else:
Expand Down Expand Up @@ -486,7 +486,7 @@ def _cc_pipeline(
pipeline_instance_id = pipeline_instance_id or pipeline_name

artifact_object_prefix = join_paths(
pipeline.pipeline_parameters.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
pipeline.pipeline_properties.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id
)

self.log_pipeline_info(
Expand Down
2 changes: 1 addition & 1 deletion elyra/pipeline/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def parse(self, pipeline_json: Dict) -> Pipeline:
runtime_config=runtime_config,
source=source,
description=description,
pipeline_parameters=primary_pipeline.pipeline_parameters,
pipeline_properties=primary_pipeline.get_pipeline_default_properties(),
)

nodes = primary_pipeline.nodes
Expand Down
12 changes: 6 additions & 6 deletions elyra/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def __init__(
runtime_config: Optional[str] = None,
source: Optional[str] = None,
description: Optional[str] = None,
pipeline_parameters: Optional[Dict[str, Any]] = None,
pipeline_properties: Optional[Dict[str, Any]] = None,
):
"""
:param id: Generated UUID, 128 bit number used as a unique identifier
Expand All @@ -351,7 +351,7 @@ def __init__(
:param runtime_config: Runtime configuration that should be used to submit the pipeline to execution
:param source: The pipeline source, e.g. a pipeline file or a notebook.
:param description: Pipeline description
:param pipeline_parameters: Key/value pairs representing the parameters of this pipeline
:param pipeline_properties: Key/value pairs representing the properties of this pipeline
"""

if not name:
Expand All @@ -365,7 +365,7 @@ def __init__(
self._source = source
self._runtime = runtime
self._runtime_config = runtime_config
self._pipeline_parameters = pipeline_parameters or {}
self._pipeline_properties = pipeline_properties or {}
self._operations = {}

@property
Expand Down Expand Up @@ -395,11 +395,11 @@ def runtime_config(self) -> str:
return self._runtime_config

@property
def pipeline_parameters(self) -> Dict[str, Any]:
def pipeline_properties(self) -> Dict[str, Any]:
"""
The dictionary of global parameters associated with each node of the pipeline
The dictionary of global properties associated with this pipeline
"""
return self._pipeline_parameters
return self._pipeline_properties

@property
def operations(self) -> Dict[str, Operation]:
Expand Down
4 changes: 1 addition & 3 deletions elyra/pipeline/pipeline_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,4 @@
KUBERNETES_POD_LABELS = "kubernetes_pod_labels"
DISABLE_NODE_CACHING = "disable_node_caching"
KUBERNETES_SHARED_MEM_SIZE = "kubernetes_shared_mem_size"
PIPELINE_META_PROPERTIES = ["name", "description", "runtime"]
# optional static prefix to be used when generating an object name for object storage
COS_OBJECT_PREFIX = "cos_object_prefix"
COS_OBJECT_PREFIX = "cos_object_prefix" # optional static prefix to be used when generating object name for cos storage
76 changes: 33 additions & 43 deletions elyra/pipeline/pipeline_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@
from elyra.pipeline.component_parameter import ElyraProperty
from elyra.pipeline.component_parameter import ElyraPropertyList
from elyra.pipeline.pipeline import Operation
from elyra.pipeline.pipeline_constants import ENV_VARIABLES, RUNTIME_IMAGE
from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
from elyra.pipeline.pipeline_constants import ENV_VARIABLES
from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
from elyra.pipeline.pipeline_constants import PIPELINE_DEFAULTS
from elyra.pipeline.pipeline_constants import PIPELINE_META_PROPERTIES
from elyra.pipeline.pipeline_constants import RUNTIME_IMAGE
from elyra.pipeline.runtime_type import RuntimeProcessorType


Expand Down Expand Up @@ -183,23 +184,19 @@ def comments(self) -> list:
"""
return self._node["app_data"]["ui_data"].get("comments", [])

@property
def pipeline_parameters(self) -> Dict[str, Any]:
    """
    Retrieve pipeline parameters, which are defined as all
    key/value pairs in the 'properties' stanza that are not
    either pipeline meta-properties (e.g. name, description,
    and runtime) or the pipeline defaults dictionary
    """
    all_properties = self._node["app_data"].get("properties", {})
    excluded_properties = PIPELINE_META_PROPERTIES + [PIPELINE_DEFAULTS]

    pipeline_parameters = {}
    for property_name, value in all_properties.items():
        if property_name not in excluded_properties:
            pipeline_parameters[property_name] = value

    return pipeline_parameters

def get_pipeline_default_properties(self) -> Dict[str, Any]:
    """
    Retrieve the dictionary of pipeline default properties.

    A top-level 'cos_object_prefix' entry in the 'properties' stanza is removed
    first; when its value is non-empty it is stored in the pipeline defaults
    dictionary (creating that dictionary if needed) so that it is part of the
    returned value.

    :returns: the pipeline defaults dictionary, or an empty dictionary if none exists
    """
    properties = self._node["app_data"].get("properties", {})

    # TODO remove the block below when a pipeline migration is appropriate (after 3.13)
    cos_prefix = properties.pop(COS_OBJECT_PREFIX, None)
    if cos_prefix:
        # A truthy prefix can only come from the real 'properties' dict, so
        # setdefault attaches a newly created defaults dict to the node itself
        properties.setdefault(PIPELINE_DEFAULTS, {})[COS_OBJECT_PREFIX] = cos_prefix

    # Fetch the defaults only after the migration: fetching them beforehand
    # returns a detached empty default (without the prefix) whenever the
    # defaults dictionary had to be created above.
    # NOTE(review): assumes get_property reads from the same 'properties' stanza - confirm
    return self.get_property(PIPELINE_DEFAULTS, {})

def get_property(self, key: str, default_value=None) -> Any:
"""
Expand Down Expand Up @@ -233,18 +230,16 @@ def convert_elyra_owned_properties(self) -> None:
Convert select pipeline-level properties to their corresponding dataclass
object type. No validation is performed.
"""
pipeline_defaults = self.get_property(PIPELINE_DEFAULTS, {})
for param_id, param_value in list(pipeline_defaults.items()):
if param_id == RUNTIME_IMAGE:
continue # runtime image is the only pipeline default that does not need to be converted
if isinstance(param_value, (ElyraProperty, ElyraPropertyList)) or param_value is None:
continue # property has already been properly converted or cannot be converted

converted_value = ElyraProperty.create_instance(param_id, param_value)
if converted_value is not None:
pipeline_defaults[param_id] = converted_value
pipeline_defaults = self.get_pipeline_default_properties()
for prop_id, value in list(pipeline_defaults.items()):
if not ElyraProperty.subclass_exists_for_property(prop_id):
continue

converted_value = ElyraProperty.create_instance(prop_id, value)
if converted_value is None:
pipeline_defaults.pop(prop_id)
else:
del pipeline_defaults[param_id]
pipeline_defaults[prop_id] = converted_value


class Node(AppDataBase):
Expand Down Expand Up @@ -314,9 +309,7 @@ def component_source(self) -> Optional[str]:

@property
def is_generic(self) -> bool:
    """
    A property that denotes whether this node is a generic component

    :returns: True if Operation.is_generic_operation accepts this node's op, False otherwise
    """
    return bool(Operation.is_generic_operation(self.op))
Expand Down Expand Up @@ -400,9 +393,7 @@ def pop_component_parameter(self, key: str, default: Optional[Any] = None) -> An
return self._node["app_data"]["component_parameters"].pop(key, default)

def get_all_component_parameters(self) -> Dict[str, Any]:
    """
    Retrieve all component parameter key-value pairs.

    :returns: the node's 'component_parameters' dictionary itself (not a copy)
    """
    return self._node["app_data"]["component_parameters"]

def remove_env_vars_with_matching_secrets(self):
Expand All @@ -421,16 +412,15 @@ def convert_elyra_owned_properties(self) -> None:
Convert select node-level list properties to their corresponding dataclass
object type. No validation is performed.
"""
for param_id in self.elyra_owned_properties:
param_value = self.get_component_parameter(param_id)
if isinstance(param_value, (ElyraProperty, ElyraPropertyList)) or param_value is None:
continue # property has already been properly converted or cannot be converted
for prop_id in self.elyra_owned_properties:
if not ElyraProperty.subclass_exists_for_property(prop_id):
continue

converted_value = ElyraProperty.create_instance(param_id, param_value)
if converted_value is not None:
self.set_component_parameter(param_id, converted_value)
converted_value = ElyraProperty.create_instance(prop_id, value=self.get_component_parameter(prop_id))
if converted_value is None:
self.pop_component_parameter(prop_id)
else:
self.pop_component_parameter(param_id)
self.set_component_parameter(prop_id, converted_value)


class PipelineDefinition(object):
Expand Down Expand Up @@ -607,7 +597,7 @@ def propagate_pipeline_default_properties(self):
"""
self.primary_pipeline.convert_elyra_owned_properties()

pipeline_default_properties = self.primary_pipeline.get_property(PIPELINE_DEFAULTS, {})
pipeline_default_properties = self.primary_pipeline.get_pipeline_default_properties()
for node in self.pipeline_nodes:
# Determine which Elyra-owned properties will require dataclass conversion, then convert
node.set_elyra_owned_properties(self.primary_pipeline.type)
Expand Down
23 changes: 17 additions & 6 deletions elyra/tests/pipeline/airflow/test_processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from elyra.pipeline.component_parameter import ElyraProperty
from elyra.pipeline.parser import PipelineParser
from elyra.pipeline.pipeline import GenericOperation
from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
from elyra.pipeline.runtime_type import RuntimeProcessorType
from elyra.tests.pipeline.test_pipeline_parser import _read_pipeline_resource
Expand Down Expand Up @@ -153,8 +154,11 @@ def test_pipeline_process(monkeypatch, processor, parsed_pipeline, sample_metada

assert response.run_url == sample_metadata["metadata"]["api_endpoint"]
assert response.object_storage_url == sample_metadata["metadata"]["cos_endpoint"]
# Verifies that only this substring is in the storage path since a timestamp is injected into the name
assert "/" + sample_metadata["metadata"]["cos_bucket"] + "/" + "untitled" in response.object_storage_path

# Verifies cos_object_prefix is added to storage path and that the correct substring is
# in the storage path since a timestamp is injected into the name
cos_prefix = parsed_pipeline.pipeline_properties.get(COS_OBJECT_PREFIX)
assert f"/{sample_metadata['metadata']['cos_bucket']}/{cos_prefix}/untitled" in response.object_storage_path


@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_COMPLEX], indirect=True)
Expand All @@ -172,6 +176,10 @@ def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dic
monkeypatch.setattr(processor, "_upload_dependencies_to_object_store", lambda w, x, y, prefix: True)
monkeypatch.setattr(processor, "_cc_pipeline", lambda x, y, z: parsed_ordered_dict)

# Ensure the value of COS_OBJECT_PREFIX has been propagated to the Pipeline object appropriately
cos_prefix = pipeline_json["pipelines"][0]["app_data"]["properties"]["pipeline_defaults"].get(COS_OBJECT_PREFIX)
assert cos_prefix == parsed_pipeline.pipeline_properties.get(COS_OBJECT_PREFIX)

with tempfile.TemporaryDirectory() as temp_dir:
export_pipeline_output_path = os.path.join(temp_dir, f"{export_pipeline_name}.py")

Expand Down Expand Up @@ -206,12 +214,15 @@ def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dic
# Gets sub-list slice starting where the Notebook Op starts
init_line = i + 1
for idx, line in enumerate(file_as_lines[init_line:], start=init_line):
if "--cos-endpoint" in line:
assert f"--cos-endpoint {sample_metadata['metadata']['cos_endpoint']}" in line
if "--cos-bucket" in line:
assert f"--cos-bucket {sample_metadata['metadata']['cos_bucket']}" in line
if "--cos-directory" in line:
assert f"--cos-directory '{cos_prefix}/some-instance-id'" in line

if "namespace=" in line:
assert sample_metadata["metadata"]["user_namespace"] == read_key_pair(line)["value"]
elif "cos_endpoint=" in line:
assert sample_metadata["metadata"]["cos_endpoint"] == read_key_pair(line)["value"]
elif "cos_bucket=" in line:
assert sample_metadata["metadata"]["cos_bucket"] == read_key_pair(line)["value"]
elif "name=" in line and "Volume" not in file_as_lines[idx - 1]:
assert node["app_data"]["ui_data"]["label"] == read_key_pair(line)["value"]
elif "notebook=" in line:
Expand Down
Loading