Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dataflow updates for 0.7.0 #326

Merged
merged 9 commits into from
Sep 3, 2024
198 changes: 127 additions & 71 deletions azext_edge/edge/providers/check/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,41 @@ def _process_dataflow_destinationsettings(
)


def _process_endpoint_authentication(
endpoint_settings: dict, check_manager: CheckManager, target: str, namespace: str, padding: int
) -> None:
auth = endpoint_settings.get("authentication", {})
auth_method = auth.get("method")
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label="Authentication Method", value=auth_method, padding=padding),
)
# TODO - add displays for various auth types (refs, identity names, etc)


def _process_endpoint_TLS(
tls_settings: dict, check_manager: CheckManager, target: str, namespace: str, padding: int
) -> None:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("TLS:", (0, 0, 0, padding)),
)
for label, key in [
("Mode", "mode"),
("Trusted CA ConfigMap", "trustedCaCertificateConfigMapRef"),
]:
# TODO - validate ref?
val = tls_settings.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=(padding + PADDING_SIZE)),
)


def _process_endpoint_mqttsettings(
check_manager: CheckManager, target: str, namespace: str, spec: dict, detail_level: int, padding: int
) -> None:
Expand All @@ -356,8 +391,19 @@ def _process_endpoint_mqttsettings(
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)

# endpoint authentication details
_process_endpoint_authentication(
endpoint_settings=settings,
check_manager=check_manager,
target=target,
namespace=namespace,
padding=INNER_PADDING,
)

if detail_level > ResourceOutputDetailLevel.detail.value:
for label, key in [
("Cloud Event Attributes", "cloudEventAttributes"),
("Client ID Prefix", "clientIdPrefix"),
("Keep Alive (s)", "keepAliveSeconds"),
("Max Inflight Messages", "maxInflightMessages"),
Expand All @@ -373,24 +419,12 @@ def _process_endpoint_mqttsettings(
display=basic_property_display(label=label, value=val, padding=padding),
)

# TLS
tls = settings.get("tls", {})
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("TLS:", (0, 0, 0, padding)),
)
for label, key in [
("Mode", "mode"),
("Trusted CA ConfigMap", "trustedCaCertificateConfigMapRef"),
]:
# TODO - validate ref?
val = tls.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=(padding + PADDING_SIZE)),
)
if tls:
_process_endpoint_TLS(
tls_settings=tls, check_manager=check_manager, target=target, namespace=namespace, padding=padding
)


def _process_endpoint_kafkasettings(
Expand All @@ -411,9 +445,19 @@ def _process_endpoint_kafkasettings(
display=basic_property_display(label=label, value=val, padding=padding),
)

# endpoint authentication details
_process_endpoint_authentication(
endpoint_settings=settings,
check_manager=check_manager,
target=target,
namespace=namespace,
padding=INNER_PADDING,
)

if detail_level > ResourceOutputDetailLevel.detail.value:
# extra properties
for label, key in [
("Cloud Event Attributes", "cloudEventAttributes"),
("Compression", "compression"),
("Copy MQTT Properties", "copyMqttProperties"),
("Acks", "kafkaAcks"),
Expand All @@ -426,25 +470,12 @@ def _process_endpoint_kafkasettings(
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)
# tls
# TLS
tls = settings.get("tls", {})
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding("TLS:", (0, 0, 0, padding)),
)
for label, key in [
("Mode", "mode"),
("Trusted CA ConfigMap", "trustedCaCertificateConfigMapRef"),
]:
# TODO - validate ref?
val = tls.get(key)
if val:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=inner_padding),
)
if tls:
_process_endpoint_TLS(
tls_settings=tls, check_manager=check_manager, target=target, namespace=namespace, padding=padding
)

# batching
batching = settings.get("batching", {})
Expand Down Expand Up @@ -481,6 +512,16 @@ def _process_endpoint_fabriconelakesettings(
namespace=namespace,
display=basic_property_display(label=label, value=val, padding=padding),
)

# endpoint authentication details
_process_endpoint_authentication(
endpoint_settings=settings,
check_manager=check_manager,
target=target,
namespace=namespace,
padding=INNER_PADDING,
)

if detail_level > ResourceOutputDetailLevel.detail.value:
names = settings.get("names", {})
for label, key in [
Expand Down Expand Up @@ -529,6 +570,15 @@ def _process_endpoint_datalakestoragesettings(
display=basic_property_display(label=label, value=val, padding=padding),
)

# endpoint authentication details
_process_endpoint_authentication(
endpoint_settings=settings,
check_manager=check_manager,
target=target,
namespace=namespace,
padding=INNER_PADDING,
)

if detail_level > ResourceOutputDetailLevel.detail.value:
batching = settings.get("batching", {})
check_manager.add_display(
Expand Down Expand Up @@ -563,6 +613,15 @@ def _process_endpoint_dataexplorersettings(
display=basic_property_display(label=label, value=val, padding=padding),
)

# endpoint authentication details
_process_endpoint_authentication(
endpoint_settings=settings,
check_manager=check_manager,
target=target,
namespace=namespace,
padding=INNER_PADDING,
)

if detail_level > ResourceOutputDetailLevel.detail.value:
batching = settings.get("batching", {})
check_manager.add_display(
Expand Down Expand Up @@ -596,6 +655,14 @@ def _process_endpoint_localstoragesettings(
namespace=namespace,
display=Padding(f"Persistent Volume Claim: {persistent_volume_claim}", (0, 0, 0, padding)),
)
# endpoint authentication details
_process_endpoint_authentication(
endpoint_settings=settings,
check_manager=check_manager,
target=target,
namespace=namespace,
padding=INNER_PADDING,
)


def check_dataflows_deployment(
Expand Down Expand Up @@ -670,7 +737,7 @@ def evaluate_core_service_runtime(
target=CoreServiceResourceKinds.RUNTIME_RESOURCE.value,
pods=pods,
namespace=namespace,
display_padding=INNER_PADDING,
display_padding=PADDING + 2,
detail_level=detail_level,
)

Expand Down Expand Up @@ -717,8 +784,6 @@ def evaluate_dataflows(
target_name=target,
namespace=namespace,
conditions=[
# valid dataflow profile reference
"spec.profileRef",
# at least a source and destination operation
"len(spec.operations)<=3",
# valid source endpoint
Expand Down Expand Up @@ -787,43 +852,43 @@ def evaluate_dataflows(
value=f"{CheckTaskStatus.skipped.emoji} Skipping evaluation of disabled dataflow",
color=CheckTaskStatus.skipped.color,
),
(0, 0, 0, PADDING + 2)
(0, 0, 0, PADDING + 2),
),
)
continue

profile_ref = spec.get("profileRef")
profile_ref_status = CheckTaskStatus.success
if profile_ref and profile_ref not in profile_names:
profile_ref_status = CheckTaskStatus.error
# profileRef is optional, only show an error if it exists but is invalid
if profile_ref:
profile_ref_status = CheckTaskStatus.error if profile_ref not in profile_names else CheckTaskStatus.success

# valid profileRef eval
check_manager.add_target_eval(
target_name=target,
namespace=namespace,
status=profile_ref_status.value,
resource_name=dataflow_name,
resource_kind=DataflowResourceKinds.DATAFLOW.value,
value={"spec.profileRef": profile_ref},
)
# valid profileRef eval
check_manager.add_target_eval(
target_name=target,
namespace=namespace,
status=profile_ref_status.value,
resource_name=dataflow_name,
resource_kind=DataflowResourceKinds.DATAFLOW.value,
value={"spec.profileRef": profile_ref},
)

check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
f"Dataflow Profile: {{{colorize_string(color=profile_ref_status.color, value=profile_ref)}}}",
(0, 0, 0, INNER_PADDING),
),
)
if profile_ref_status == CheckTaskStatus.error:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
colorize_string(color=profile_ref_status.color, value="Invalid Dataflow Profile reference"),
f"Dataflow Profile: {{{colorize_string(color=profile_ref_status.color, value=profile_ref)}}}",
(0, 0, 0, INNER_PADDING),
),
)
if profile_ref_status == CheckTaskStatus.error:
check_manager.add_display(
target_name=target,
namespace=namespace,
display=Padding(
colorize_string(color=profile_ref_status.color, value="Invalid Dataflow Profile reference"),
(0, 0, 0, INNER_PADDING),
),
)

operations = spec.get("operations", [])

Expand Down Expand Up @@ -984,17 +1049,8 @@ def evaluate_dataflow_endpoints(
),
)

# endpoint auth
# endpoint details at higher detail levels
if detail_level > ResourceOutputDetailLevel.summary.value:
auth = spec.get("authentication", {})
auth_method = auth.get("method")
check_manager.add_display(
target_name=target,
namespace=namespace,
display=basic_property_display(
label="Authentication Method", value=auth_method, padding=INNER_PADDING
),
)

endpoint_processor_dict = {
DataflowEndpointType.mqtt.value: _process_endpoint_mqttsettings,
Expand Down
Loading