Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Follow up. Adds param validation pattern using template languageVersion 2 model. #332

Merged
merged 3 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions azext_edge/edge/commands_edge.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def init(
enable_fault_tolerance: Optional[bool] = None,
mi_user_assigned_identities: Optional[List[str]] = None,
# Broker
broker_config_file: Optional[str] = None,
custom_broker_config_file: Optional[str] = None,
broker_memory_profile: str = MqMemoryProfile.medium.value,
broker_service_type: str = MqServiceType.cluster_ip.value,
broker_backend_partitions: int = 2,
Expand Down Expand Up @@ -152,9 +152,9 @@ def init(
no_pre_flight = is_env_flag_enabled(INIT_NO_PREFLIGHT_ENV_KEY)

# TODO - @digimaun
broker_config = None
if broker_config_file:
broker_config = json.loads(read_file_content(file_path=broker_config_file))
custom_broker_config = None
if custom_broker_config_file:
custom_broker_config = json.loads(read_file_content(file_path=custom_broker_config_file))

if broker_service_type == MqServiceType.load_balancer.value and add_insecure_listener:
raise ArgumentUsageError(
Expand Down Expand Up @@ -182,7 +182,7 @@ def init(
schema_registry_resource_id=schema_registry_resource_id,
mi_user_assigned_identities=mi_user_assigned_identities,
# Broker
broker_config=broker_config,
custom_broker_config=custom_broker_config,
broker_memory_profile=broker_memory_profile,
broker_service_type=broker_service_type,
broker_backend_partitions=broker_backend_partitions,
Expand Down
2 changes: 1 addition & 1 deletion azext_edge/edge/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ def load_iotops_arguments(self, _):
)
# Broker
context.argument(
"broker_config_file",
"custom_broker_config_file",
options_list=["--broker-config-file"],
help="Path to a json file with custom broker config properties. Useful for advanced scenarios. "
"The expected format is described at https://aka.ms/aziotops-broker-config.",
Expand Down
78 changes: 57 additions & 21 deletions azext_edge/edge/providers/orchestration/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

from typing import List, Optional, Tuple

from azure.cli.core.azclierror import InvalidArgumentValueError

from ...common import (
DEFAULT_BROKER,
DEFAULT_BROKER_AUTHN,
Expand Down Expand Up @@ -41,7 +43,7 @@ def __init__(
enable_fault_tolerance: Optional[bool] = None,
dataflow_profile_instances: int = 1,
# Broker
broker_config: Optional[dict] = None,
custom_broker_config: Optional[dict] = None,
broker_memory_profile: Optional[str] = None,
broker_service_type: Optional[str] = None,
broker_backend_partitions: Optional[int] = None,
Expand Down Expand Up @@ -72,15 +74,15 @@ def __init__(

# Broker
self.add_insecure_listener = add_insecure_listener
self.broker_config = broker_config
self.broker_memory_profile = broker_memory_profile
self.broker_memory_profile = broker_memory_profile
self.broker_service_type = broker_service_type
self.broker_backend_partitions = broker_backend_partitions
self.broker_backend_workers = broker_backend_workers
self.broker_backend_redundancy_factor = broker_backend_redundancy_factor
self.broker_frontend_workers = broker_frontend_workers
self.broker_frontend_replicas = broker_frontend_replicas
self.broker_backend_partitions = self._sanitize_int(broker_backend_partitions)
self.broker_backend_workers = self._sanitize_int(broker_backend_workers)
self.broker_backend_redundancy_factor = self._sanitize_int(broker_backend_redundancy_factor)
self.broker_frontend_workers = self._sanitize_int(broker_frontend_workers)
self.broker_frontend_replicas = self._sanitize_int(broker_frontend_replicas)
self.broker_config = self.get_broker_config_target_map()
self.custom_broker_config = custom_broker_config

# Akri
self.kubernetes_distro = kubernetes_distro
Expand All @@ -89,14 +91,19 @@ def __init__(
self.trust_source = trust_source
self.mi_user_assigned_identities = mi_user_assigned_identities

def _sanitize_k8s_name(self, name: str) -> str:
def _sanitize_k8s_name(self, name: Optional[str]) -> Optional[str]:
if not name:
return name
sanitized = str(name)
sanitized = sanitized.lower()
sanitized = sanitized.replace("_", "-")
return sanitized

def _sanitize_int(self, value: Optional[int]) -> Optional[int]:
if value is None:
return value
return int(value)

def _handle_apply_targets(
self, param_to_target: dict, template_blueprint: TemplateBlueprint
) -> Tuple[TemplateBlueprint, dict]:
Expand Down Expand Up @@ -157,7 +164,7 @@ def get_ops_instance_template(self, cl_extension_ids: List[str]) -> Tuple[dict,
"deployResourceSyncRules": self.deploy_resource_sync_rules,
"schemaRegistryId": self.schema_registry_resource_id,
"defaultDataflowinstanceCount": self.dataflow_profile_instances,
"brokerConfig": self.get_broker_config_target_map(),
"brokerConfig": self.broker_config,
"trustConfig": "",
},
template_blueprint=M2_INSTANCE_TEMPLATE,
Expand Down Expand Up @@ -185,30 +192,59 @@ def get_ops_instance_template(self, cl_extension_ids: List[str]) -> Tuple[dict,
instance["identity"]["type"] = "UserAssigned"
instance["identity"]["userAssignedIdentities"] = mi_user_payload

if self.broker_config:
if "properties" in self.broker_config:
self.broker_config = self.broker_config["properties"]
broker["properties"] = self.broker_config
if self.custom_broker_config:
if "properties" in self.custom_broker_config:
self.custom_broker_config = self.custom_broker_config["properties"]
broker["properties"] = self.custom_broker_config

if self.add_insecure_listener:
template.add_resource(
resource_key="broker_listener_nontls",
resource_key="broker_listener_insecure",
resource_def=get_insecure_listener(instance_name=self.instance_name, broker_name=BROKER_NAME),
)

return template.content, parameters

def get_broker_config_target_map(self):
broker_config_map = {
to_process_config_map = {
"frontendReplicas": self.broker_frontend_replicas,
"frontendWorkers": self.broker_backend_workers,
"frontendWorkers": self.broker_frontend_workers,
"backendRedundancyFactor": self.broker_backend_redundancy_factor,
"backendWorkers": self.broker_backend_workers,
"backendPartitions": self.broker_backend_partitions,
"memoryProfile": self.broker_memory_profile,
"serviceType": self.broker_service_type,
}
for config in broker_config_map:
if not broker_config_map[config]:
del broker_config_map[config]
return broker_config_map
processed_config_map = {}

validation_errors = []
broker_config_def = M2_INSTANCE_TEMPLATE.get_type_definition("_1.BrokerConfig")["properties"]
for config in to_process_config_map:
if to_process_config_map[config] is None:
continue
processed_config_map[config] = to_process_config_map[config]

if not broker_config_def:
continue

if isinstance(to_process_config_map[config], int):
if config in broker_config_def and broker_config_def[config].get("type") == "int":
min_value = broker_config_def[config].get("minValue")
max_value = broker_config_def[config].get("maxValue")

if all([min_value is None, max_value is None]):
continue

if any([to_process_config_map[config] < min_value, to_process_config_map[config] > max_value]):
error_msg = f"{config} value range"

if min_value:
error_msg += f" min:{min_value}"
if max_value:
error_msg += f" max:{max_value}"
validation_errors.append(error_msg)

if validation_errors:
raise InvalidArgumentValueError("\n".join(validation_errors))

return processed_config_map
3 changes: 3 additions & 0 deletions azext_edge/edge/providers/orchestration/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ def get_component_vers(self) -> dict:
# Don't need a deep copy here.
return self.content["variables"]["VERSIONS"].copy()

def get_type_definition(self, key: str) -> Optional[dict]:
return self.content["definitions"].get(key, {"properties": {}})

@property
def parameters(self) -> dict:
return self.content["parameters"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,28 @@ def test_schema_registry_lifecycle(settings_with_rg, tracked_resources):
storage_account_name = f"teststore{generate_random_string(force_lower=True, size=6)}"
registry_name = f"test-registry-{generate_random_string(force_lower=True, size=6)}"
registry_rg = settings_with_rg.env.azext_edge_rg
registry_namespace = f"test-namespace-{generate_random_string(force_lower=True, size=6)}"
registry_namespace1 = f"test-namespace-{generate_random_string(force_lower=True, size=6)}"
registry_namespace2 = f"test-namespace-{generate_random_string(force_lower=True, size=6)}"
# create the storage account and get the id
storage_account = run(
f"az storage account create -n {storage_account_name} -g {registry_rg} "
"--enable-hierarchical-namespace"
"--enable-hierarchical-namespace --public-network-access Disabled "
"--allow-shared-key-access false --allow-blob-public-access false --default-action Deny"
)
tracked_resources.append(storage_account['id'])

# CREATE 1
registry = run(
f"az iot ops schema registry create -n {registry_name} -g {registry_rg} "
f"--rn {registry_namespace} --sa-resource-id {storage_account['id']} "
f"--rn {registry_namespace1} --sa-resource-id {storage_account['id']} "
"--location eastus2euap" # TODO: remove once avaliable in all regions
)
tracked_resources.append(registry["id"])
assert_schema_registry(
registry=registry,
name=registry_name,
resource_group=registry_rg,
namespace=registry_namespace,
namespace=registry_namespace1,
sa_blob_uri=storage_account["primaryEndpoints"]["blob"]
)
# check the roles
Expand All @@ -53,7 +55,7 @@ def test_schema_registry_lifecycle(settings_with_rg, tracked_resources):
registry=show_registry,
name=registry_name,
resource_group=registry_rg,
namespace=registry_namespace,
namespace=registry_namespace1,
sa_blob_uri=storage_account["primaryEndpoints"]["blob"]
)

Expand All @@ -66,21 +68,28 @@ def test_schema_registry_lifecycle(settings_with_rg, tracked_resources):
sa_container = generate_random_string(force_lower=True, size=8)
description = generate_random_string()
display_name = generate_random_string()
tags = f"{generate_random_string()}={generate_random_string()}"
tags = {generate_random_string(): generate_random_string()}
tags_str = ""
for t in tags:
tags_str += f"{t}={tags[t]} "
alt_registry = run(
f"az iot ops schema registry create -n {alt_registry_name} -g {registry_rg} "
f"--rn {registry_namespace} --sa-resource-id {storage_account['id']} "
f"--rn {registry_namespace2} --sa-resource-id {storage_account['id']} "
f"--sa-container {sa_container} --desc {description} --display-name {display_name} "
f"--tags {tags} --custom-role-id {role_id} "
f"--tags {tags_str} --custom-role-id {role_id} "
"--location eastus2euap" # TODO: remove once avaliable in all regions
)
tracked_resources.append(alt_registry["id"])
assert_schema_registry(
registry=registry,
name=registry_name,
registry=alt_registry,
name=alt_registry_name,
resource_group=registry_rg,
namespace=registry_namespace,
sa_blob_uri=storage_account["primaryEndpoints"]["blob"]
namespace=registry_namespace2,
sa_blob_uri=storage_account["primaryEndpoints"]["blob"],
sa_container=sa_container,
description=description,
display_name=display_name,
tags=tags
)
# check the roles
roles = run(
Expand Down Expand Up @@ -123,6 +132,7 @@ def assert_schema_registry(registry: dict, **expected):
assert registry["resourceGroup"] == expected["resource_group"]

assert registry["identity"]
assert registry.get("tags") == expected.get("tags")

registry_props = registry["properties"]
assert registry_props["namespace"] == expected["namespace"]
Expand Down