From 7ffe1c768acbc9c94cb7c3e170fdb8f0b6b8fc8e Mon Sep 17 00:00:00 2001 From: Alputer Date: Thu, 28 Nov 2024 12:01:41 +0100 Subject: [PATCH] fix(k8s): handle exceptions better for Dask K8s resources (#618) Closes #617 --- reana_workflow_controller/consumer.py | 39 +---- reana_workflow_controller/dask.py | 212 ++++++++++++++++++++++++-- reana_workflow_controller/k8s.py | 89 ----------- 3 files changed, 203 insertions(+), 137 deletions(-) diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index 081d2ad8..ceeb5b5d 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -22,8 +22,6 @@ from reana_commons.k8s.api_client import ( current_k8s_batchv1_api_client, current_k8s_corev1_api_client, - current_k8s_custom_objects_api_client, - current_k8s_networking_api_client, ) from reana_commons.k8s.secrets import UserSecretsStore from reana_commons.utils import ( @@ -46,10 +44,8 @@ REANA_JOB_STATUS_CONSUMER_PREFETCH_COUNT, ) from reana_workflow_controller.errors import REANAWorkflowControllerError -from reana_workflow_controller.k8s import delete_dask_dashboard_ingress -from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED -from reana_workflow_controller.dask import requires_dask +from reana_workflow_controller.dask import requires_dask, delete_dask_cluster try: from urllib import parse as urlparse @@ -169,7 +165,7 @@ def _update_workflow_status(workflow, status, logs): workflow.logs += "Workflow engine logs could not be retrieved.\n" if requires_dask(workflow): - _delete_dask_cluster(workflow) + delete_dask_cluster(workflow.id_, workflow.owner_id) if RunStatus.should_cleanup_job(status): try: @@ -315,34 +311,3 @@ def _get_workflow_engine_pod_logs(workflow: Workflow) -> str: # There might not be any pod returned by `list_namespaced_pod`, for example # when a workflow fails to be scheduled return "" - - -def _delete_dask_cluster(workflow: Workflow) -> None: - """Delete the Dask cluster resources.""" - current_k8s_custom_objects_api_client.delete_namespaced_custom_object( - group="kubernetes.dask.org", - version="v1", - plural="daskclusters", - name=get_dask_component_name(workflow.id_, "cluster"), - namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, - ) - - if DASK_AUTOSCALER_ENABLED: - current_k8s_custom_objects_api_client.delete_namespaced_custom_object( - group="kubernetes.dask.org", - version="v1", - plural="daskautoscalers", - name=get_dask_component_name(workflow.id_, "autoscaler"), - namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, - ) - - delete_dask_dashboard_ingress(workflow.id_) - - dask_service = ( - Session.query(Service) - .filter_by(name=get_dask_component_name(workflow.id_, "database_model_service")) - .one_or_none() - ) - workflow.services.remove(dask_service) - Session.delete(dask_service) - Session.object_session(workflow).commit() diff --git a/reana_workflow_controller/dask.py b/reana_workflow_controller/dask.py index 525afcc1..05e14ed2 100644 --- a/reana_workflow_controller/dask.py +++ b/reana_workflow_controller/dask.py @@ -11,6 +11,12 @@ from flask import current_app +from kubernetes import client +from kubernetes.client.exceptions import ApiException + +from reana_db.database import Session +from reana_db.models import Service +from reana_db.utils import _get_workflow_with_uuid_or_name from reana_commons.config import ( K8S_CERN_EOS_AVAILABLE, K8S_CERN_EOS_MOUNT_CONFIGURATION, @@ -20,6 +26,7 @@ REANA_RUNTIME_KUBERNETES_NAMESPACE, ) from reana_commons.k8s.api_client import ( + current_k8s_networking_api_client, current_k8s_custom_objects_api_client, ) from reana_commons.k8s.kerberos import get_kerberos_k8s_config @@ -28,11 +35,14 @@ get_workspace_volume, get_reana_shared_volume, ) -from reana_commons.job_utils import kubernetes_memory_to_bytes from reana_commons.utils import get_dask_component_name -from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED -from reana_workflow_controller.k8s import create_dask_dashboard_ingress +from reana_workflow_controller.config import ( + DASK_AUTOSCALER_ENABLED, + REANA_INGRESS_HOST, + REANA_INGRESS_CLASS_NAME, + REANA_INGRESS_ANNOTATIONS, +) class DaskResourceManager: @@ -110,14 +120,21 @@ def _load_dask_autoscaler_template(self): def create_dask_resources(self): """Create necessary Dask resources for the workflow.""" - self._prepare_cluster() - self._create_dask_cluster() + try: + self._prepare_cluster() + self._create_dask_cluster() - if DASK_AUTOSCALER_ENABLED: - self._prepare_autoscaler() - self._create_dask_autoscaler() + if DASK_AUTOSCALER_ENABLED: + self._prepare_autoscaler() + self._create_dask_autoscaler() + + create_dask_dashboard_ingress(self.workflow_id) - create_dask_dashboard_ingress(self.workflow_id) + except Exception as e: + logging.error( + f"An error occured while trying to create dask cluster, now deleting the cluster... Error message:\n{e}" + ) + delete_dask_cluster(self.workflow_id, self.user_id) def _prepare_cluster(self): """Prepare Dask cluster body by adding necessary image-pull secrets, volumes, volume mounts, init containers and sidecar containers.""" @@ -492,11 +509,11 @@ def _create_dask_cluster(self): namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, body=self.cluster_body, ) - except Exception: + except Exception as e: logging.exception( "An error occurred while trying to create a Dask cluster." ) - raise + raise e def _create_dask_autoscaler(self): """Create Dask autoscaler resource.""" @@ -520,3 +537,176 @@ def requires_dask(workflow): return bool( workflow.reana_specification["workflow"].get("resources", {}).get("dask", False) ) + + +def delete_dask_cluster(workflow_id, user_id) -> None: + """Delete the Dask cluster resources.""" + errors = [] # Collect errors during deletion attempts + + try: + current_k8s_custom_objects_api_client.delete_namespaced_custom_object( + group="kubernetes.dask.org", + version="v1", + plural="daskclusters", + namespace="default", + name=get_dask_component_name(workflow_id, "cluster"), + ) + logging.info(f"Dask cluster for workflow {workflow_id} deleted successfully.") + except Exception as e: + errors.append(f"Error deleting Dask cluster for workflow {workflow_id}: {e}") + + if DASK_AUTOSCALER_ENABLED: + try: + current_k8s_custom_objects_api_client.delete_namespaced_custom_object( + group="kubernetes.dask.org", + version="v1", + plural="daskautoscalers", + namespace="default", + name=get_dask_component_name(workflow_id, "autoscaler"), + ) + logging.info( + f"Dask autoscaler for workflow {workflow_id} deleted successfully." + ) + except Exception as e: + errors.append( + f"Error deleting Dask autoscaler for workflow {workflow_id}: {e}" + ) + + try: + delete_dask_dashboard_ingress(workflow_id) + logging.info( + f"Dask dashboard ingress for workflow {workflow_id} deleted successfully." + ) + except Exception as e: + errors.append( + f"Error deleting Dask dashboard ingress for workflow {workflow_id}: {e}" + ) + + try: + dask_service = ( + Session.query(Service) + .filter_by( + name=get_dask_component_name(workflow_id, "database_model_service") + ) + .one_or_none() + ) + if dask_service: + workflow = _get_workflow_with_uuid_or_name(str(workflow_id), user_id) + workflow.services.remove(dask_service) + Session.delete(dask_service) + Session.object_session(workflow).commit() + + except Exception as e: + errors.append( + f"Error deleting Dask Service model from database of the workflow: {workflow_id}: {e}" + ) + + # Raise collected errors if any + if errors: + logging.error("Errors occurred during resource deletion:\n" + "\n".join(errors)) + raise RuntimeError( + "Errors occurred during resource deletion:\n" + "\n".join(errors) + ) + + +def create_dask_dashboard_ingress(workflow_id): + """Create K8S Ingress object for Dask dashboard.""" + # Define the middleware spec + middleware_spec = { + "apiVersion": "traefik.io/v1alpha1", + "kind": "Middleware", + "metadata": { + "name": get_dask_component_name( + workflow_id, "dashboard_ingress_middleware" + ), + "namespace": REANA_RUNTIME_KUBERNETES_NAMESPACE, + }, + "spec": { + "replacePathRegex": { + "regex": f"/{workflow_id}/dashboard/*", + "replacement": "/$1", + } + }, + } + + ingress = client.V1Ingress( + api_version="networking.k8s.io/v1", + kind="Ingress", + metadata=client.V1ObjectMeta( + name=get_dask_component_name(workflow_id, "dashboard_ingress"), + annotations={ + **REANA_INGRESS_ANNOTATIONS, + "traefik.ingress.kubernetes.io/router.middlewares": f"{REANA_RUNTIME_KUBERNETES_NAMESPACE}-{get_dask_component_name(workflow_id, 'dashboard_ingress_middleware')}@kubernetescrd", + }, + ), + spec=client.V1IngressSpec( + rules=[ + client.V1IngressRule( + host=REANA_INGRESS_HOST, + http=client.V1HTTPIngressRuleValue( + paths=[ + client.V1HTTPIngressPath( + path=f"/{workflow_id}/dashboard", + path_type="Prefix", + backend=client.V1IngressBackend( + service=client.V1IngressServiceBackend( + name=get_dask_component_name( + workflow_id, "dashboard_service" + ), + port=client.V1ServiceBackendPort(number=8787), + ) + ), + ) + ] + ), + ) + ] + ), + ) + if REANA_INGRESS_CLASS_NAME: + ingress.spec.ingress_class_name = REANA_INGRESS_CLASS_NAME + + # Create middleware for ingress + current_k8s_custom_objects_api_client.create_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace="default", + plural="middlewares", + body=middleware_spec, + ) + # Create the ingress resource + current_k8s_networking_api_client.create_namespaced_ingress( + namespace="default", body=ingress + ) + + +def delete_dask_dashboard_ingress(workflow_id): + """Delete K8S Ingress Object for Dask dashboard.""" + errors = [] # Collect errors during deletion attempts + try: + current_k8s_networking_api_client.delete_namespaced_ingress( + get_dask_component_name(workflow_id, "dashboard_ingress"), + namespace="default", + body=client.V1DeleteOptions(), + ) + except Exception as e: + errors.append( + f"Error deleting Dask dashboard ingress for workflow {workflow_id}: {e}" + ) + + try: + current_k8s_custom_objects_api_client.delete_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace="default", + plural="middlewares", + name=get_dask_component_name(workflow_id, "dashboard_ingress_middleware"), + ) + except Exception as e: + errors.append( + f"Error deleting Dask dashboard ingress middleware for workflow {workflow_id}: {e}" + ) + + # Raise collected errors if any + if errors: + raise RuntimeError("\n".join(errors)) diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py index 30b79a47..c45f70af 100644 --- a/reana_workflow_controller/k8s.py +++ b/reana_workflow_controller/k8s.py @@ -17,14 +17,12 @@ current_k8s_appsv1_api_client, current_k8s_corev1_api_client, current_k8s_networking_api_client, - current_k8s_custom_objects_api_client, ) from reana_commons.k8s.secrets import UserSecretsStore from reana_commons.k8s.volumes import ( get_k8s_cvmfs_volumes, get_workspace_volume, ) -from reana_commons.utils import get_dask_component_name from reana_workflow_controller.config import ( # isort:skip JUPYTER_INTERACTIVE_SESSION_DEFAULT_PORT, @@ -405,93 +403,6 @@ def delete_k8s_ingress_object(ingress_name, namespace): ) -def create_dask_dashboard_ingress(workflow_id): - """Create K8S Ingress object for Dask dashboard.""" - # Define the middleware spec - middleware_spec = { - "apiVersion": "traefik.io/v1alpha1", - "kind": "Middleware", - "metadata": { - "name": get_dask_component_name( - workflow_id, "dashboard_ingress_middleware" - ), - "namespace": REANA_RUNTIME_KUBERNETES_NAMESPACE, - }, - "spec": { - "replacePathRegex": { - "regex": f"/{workflow_id}/dashboard/*", - "replacement": "/$1", - } - }, - } - - ingress = client.V1Ingress( - api_version="networking.k8s.io/v1", - kind="Ingress", - metadata=client.V1ObjectMeta( - name=get_dask_component_name(workflow_id, "dashboard_ingress"), - annotations={ - **REANA_INGRESS_ANNOTATIONS, - "traefik.ingress.kubernetes.io/router.middlewares": f"{REANA_RUNTIME_KUBERNETES_NAMESPACE}-{get_dask_component_name(workflow_id, 'dashboard_ingress_middleware')}@kubernetescrd", - }, - ), - spec=client.V1IngressSpec( - rules=[ - client.V1IngressRule( - host=REANA_INGRESS_HOST, - http=client.V1HTTPIngressRuleValue( - paths=[ - client.V1HTTPIngressPath( - path=f"/{workflow_id}/dashboard", - path_type="Prefix", - backend=client.V1IngressBackend( - service=client.V1IngressServiceBackend( - name=get_dask_component_name( - workflow_id, "dashboard_service" - ), - port=client.V1ServiceBackendPort(number=8787), - ) - ), - ) - ] - ), - ) - ] - ), - ) - if REANA_INGRESS_CLASS_NAME: - ingress.spec.ingress_class_name = REANA_INGRESS_CLASS_NAME - - # Create middleware for ingress - current_k8s_custom_objects_api_client.create_namespaced_custom_object( - group="traefik.io", - version="v1alpha1", - namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, - plural="middlewares", - body=middleware_spec, - ) - # Create the ingress resource - current_k8s_networking_api_client.create_namespaced_ingress( - namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, body=ingress - ) - - -def delete_dask_dashboard_ingress(workflow_id): - """Delete K8S Ingress Object for Dask dashboard.""" - current_k8s_networking_api_client.delete_namespaced_ingress( - name=get_dask_component_name(workflow_id, "dashboard_ingress"), - namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, - body=client.V1DeleteOptions(), - ) - current_k8s_custom_objects_api_client.delete_namespaced_custom_object( - group="traefik.io", - version="v1alpha1", - namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, - plural="middlewares", - name=get_dask_component_name(workflow_id, "dashboard_ingress_middleware"), - ) - - def check_pod_readiness_by_prefix( pod_name_prefix, namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE ):