From 7299390512ad6995379ba796872eb9a642feac66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Irving=20Mondrag=C3=B3n?= Date: Tue, 7 Jan 2025 10:45:04 +0100 Subject: [PATCH] Set up Jobset failure policy on workloads --- src/xpk/commands/workload.py | 271 ++++++++++++++++++++--------------- src/xpk/core/core.py | 15 +- src/xpk/core/pathways.py | 56 ++++---- 3 files changed, 184 insertions(+), 158 deletions(-) diff --git a/src/xpk/commands/workload.py b/src/xpk/commands/workload.py index 4ba7aa76..be694e69 100644 --- a/src/xpk/commands/workload.py +++ b/src/xpk/commands/workload.py @@ -36,6 +36,7 @@ get_gpu_scheduler, get_gpu_tcp_volume, get_gpu_volume, + get_main_container_docker_image, get_user_workload_container, get_volumes, parse_env_config, @@ -78,6 +79,7 @@ alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool # 1:1 job replica to node pool assignment spec: failurePolicy: + {failure_policy_rules} maxRestarts: {args.max_restarts} replicatedJobs: - name: slice-job @@ -87,6 +89,7 @@ parallelism: {system.vms_per_slice} # Equal to the number of VMs per slice completions: {system.vms_per_slice} # Same as the above. backoffLimit: 0 # When any pod fails, the job is failed + {pod_failure_policy} template: metadata: labels: @@ -119,6 +122,7 @@ xpk.google.com/workload: {args.workload} spec: failurePolicy: + {failure_policy_rules} maxRestarts: {args.max_restarts} replicatedJobs: - name: slice-job @@ -128,6 +132,7 @@ parallelism: {args.num_nodes} completions: {args.num_nodes} backoffLimit: 0 # When any pod fails, the job is failed + {pod_failure_policy} template: metadata: labels: @@ -176,6 +181,7 @@ xpk.google.com/workload: {args.workload} spec: failurePolicy: + {failure_policy_rules} maxRestarts: {args.max_restarts} replicatedJobs: - name: slice-job @@ -185,6 +191,7 @@ parallelism: {args.num_nodes} completions: {args.num_nodes} backoffLimit: 0 # When any pod fails, the job is failed + {pod_failure_policy} template: metadata: labels: @@ -212,129 +219,131 @@ xpk.google.com/workload: {args.workload} spec: failurePolicy: + {failure_policy_rules} maxRestarts: {args.max_restarts} successPolicy: operator: "All" targetReplicatedJobs: - {args.targetReplicatedJob} replicatedJobs: - - name: worker - replicas: {args.num_slices} - template: - metadata: - annotations: - alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool - labels: - xpk.google.com/workload: {args.workload} - spec: - backoffLimit: {backoff_limit} - completions: {system.vms_per_slice} - parallelism: {system.vms_per_slice} - template: - spec: - terminationGracePeriodSeconds: {args.termination_grace_period_seconds} - containers: - - args: - {pathways_worker_args} - image: {args.server_image} - imagePullPolicy: Always - name: pathways-worker - ports: - - containerPort: 29001 - - containerPort: 8471 - - containerPort: 8080 - resources: - limits: - {resource_type}: {system.chips_per_vm} - securityContext: - privileged: true - volumeMounts: - - mountPath: /tmp + - name: worker + replicas: {args.num_slices} + template: + metadata: + annotations: + alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool + labels: + xpk.google.com/workload: {args.workload} + spec: + backoffLimit: {backoff_limit} + completions: {system.vms_per_slice} + parallelism: {system.vms_per_slice} + {pod_failure_policy} + template: + spec: + terminationGracePeriodSeconds: {args.termination_grace_period_seconds} + containers: + - args: + {pathways_worker_args} + image: {args.server_image} + imagePullPolicy: Always + name: 
pathways-worker + ports: + - containerPort: 29001 + - containerPort: 8471 + - containerPort: 8080 + resources: + limits: + {resource_type}: {system.chips_per_vm} + securityContext: + privileged: true + volumeMounts: + - mountPath: /tmp + name: shared-tmp + nodeSelector: + {accelerator_label} + {machine_label} + {autoprovisioning_args} + priorityClassName: {args.priority} + hostNetwork: true + dnsPolicy: ClusterFirstWithHostNet + volumes: + - hostPath: + path: /tmp + type: DirectoryOrCreate name: shared-tmp - nodeSelector: - {accelerator_label} - {machine_label} - {autoprovisioning_args} - priorityClassName: {args.priority} - hostNetwork: true - dnsPolicy: ClusterFirstWithHostNet - volumes: - - hostPath: - path: /tmp - type: DirectoryOrCreate - name: shared-tmp - - name: rm - replicas: 1 - template: - metadata: - labels: - xpk.google.com/workload: {args.workload} - spec: - backoffLimit: 0 - completions: 1 - parallelism: 1 - template: - spec: - containers: - - args: - {pathways_rm_args} - env: - - name: REPLICATED_JOB_NAME - valueFrom: - fieldRef: - fieldPath: metadata.annotations['jobset.sigs.k8s.io/replicatedjob-name'] - - name: JOBSET_NAME - valueFrom: - fieldRef: - fieldPath: metadata.annotations['jobset.sigs.k8s.io/jobset-name'] - - name: HOST_ADDRESS - value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME) - - name: TPU_SKIP_MDS_QUERY - value: "true" - image: {args.server_image} - imagePullPolicy: Always - name: pathways-rm - ports: - - containerPort: 29001 - securityContext: - privileged: true - volumeMounts: - - mountPath: /tmp + - name: rm + replicas: 1 + template: + metadata: + labels: + xpk.google.com/workload: {args.workload} + spec: + backoffLimit: 0 + completions: 1 + parallelism: 1 + template: + spec: + containers: + - args: + {pathways_rm_args} + env: + - name: REPLICATED_JOB_NAME + valueFrom: + fieldRef: + fieldPath: metadata.annotations['jobset.sigs.k8s.io/replicatedjob-name'] + - name: JOBSET_NAME + valueFrom: + fieldRef: + fieldPath: metadata.annotations['jobset.sigs.k8s.io/jobset-name'] + - name: HOST_ADDRESS + value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME) + - name: TPU_SKIP_MDS_QUERY + value: "true" + image: {args.server_image} + imagePullPolicy: Always + name: pathways-rm + ports: + - containerPort: 29001 + securityContext: + privileged: true + volumeMounts: + - mountPath: /tmp + name: shared-tmp + nodeSelector: + cloud.google.com/gke-nodepool: cpu-rm-np + hostNetwork: true + dnsPolicy: ClusterFirstWithHostNet + volumes: + - hostPath: + path: /tmp + type: DirectoryOrCreate name: shared-tmp - nodeSelector: - cloud.google.com/gke-nodepool: cpu-rm-np - hostNetwork: true - dnsPolicy: ClusterFirstWithHostNet - volumes: - - hostPath: - path: /tmp - type: DirectoryOrCreate - name: shared-tmp - - name: proxy - replicas: 1 - template: - metadata: - labels: - xpk.google.com/workload: {args.workload} - spec: - backoffLimit: 0 - completions: 1 - parallelism: 1 - template: - spec: - containers: - - args: - {pathways_proxy_args} - image: {args.proxy_server_image} - imagePullPolicy: Always - name: pathways-proxy - ports: - - containerPort: 29000 - hostNetwork: true - dnsPolicy: ClusterFirstWithHostNet - nodeSelector: - cloud.google.com/gke-nodepool: cpu-proxy-np - {user_workload} + - name: proxy + replicas: 1 + template: + metadata: + labels: + xpk.google.com/workload: {args.workload} + spec: + backoffLimit: 0 + completions: 1 + parallelism: 1 + template: + spec: + containers: + - args: + {pathways_proxy_args} + image: {args.proxy_server_image} + 
imagePullPolicy: Always + name: pathways-proxy + ports: + - containerPort: 29000 + hostNetwork: true + dnsPolicy: ClusterFirstWithHostNet + nodeSelector: + cloud.google.com/gke-nodepool: cpu-proxy-np + {user_workload} """ @@ -445,6 +454,27 @@ def workload_create(args) -> None: if return_code != 0: xpk_exit(return_code) + failure_policy_rules = '' + pod_failure_policy = '' + if args.restart_on_user_code_failure: + if int(args.max_restarts) <= 0: + xpk_print( + f'Warning: --max-restarts, is set to {args.max_restarts}. Will not' + ' restart on user failure.' + ) + failure_policy_rules = """rules: + - action: FailJobSet + onJobFailureReasons: + - PodFailurePolicy""" + pod_failure_policy = f"""# podFailurePolicy which fails job immediately if job was not killed by SIGTERM (i.e., graceful node shutdown for maintenance events) + podFailurePolicy: + rules: + - action: FailJob + onExitCodes: + containerName: {get_main_container_docker_image(args, system)} + operator: NotIn + values: [143] # SIGTERM = exit code 143""" + # Create the workload file based on accelerator type or workload type. if system.accelerator_type == AcceleratorType['GPU']: container, debugging_dashboard_id = get_user_workload_container( @@ -458,7 +488,10 @@ def workload_create(args) -> None: if system.device_type in cluster_gcluster.supported_device_types: yml_string = a3_gpu_workload_create_yaml.format( - args=args, container=container + args=args, + container=container, + failure_policy_rules=failure_policy_rules, + pod_failure_policy=pod_failure_policy, ) if args.device_type == cluster_gcluster.a3mega_device_type: @@ -481,6 +514,8 @@ def workload_create(args) -> None: gpu_rxdm_image=get_gpu_rxdm_image(system), gpu_rxdm_cmd=get_gpu_rxdm_cmd(system), gpu_tcp_volume=get_gpu_tcp_volume(system), + failure_policy_rules=failure_policy_rules, + pod_failure_policy=pod_failure_policy, ) elif args.use_pathways and ensure_pathways_workload_prerequisites( args, system @@ -502,6 +537,8 @@ def workload_create(args) -> None: local_queue_name=LOCAL_QUEUE_NAME, autoprovisioning_args=autoprovisioning_args, backoff_limit=system.vms_per_slice * 4, + failure_policy_rules=failure_policy_rules, + pod_failure_policy=pod_failure_policy, ) else: container, debugging_dashboard_id = get_user_workload_container( @@ -519,6 +556,8 @@ def workload_create(args) -> None: local_queue_name=LOCAL_QUEUE_NAME, autoprovisioning_args=autoprovisioning_args, volumes=get_volumes(args, system), + failure_policy_rules=failure_policy_rules, + pod_failure_policy=pod_failure_policy, ) tmp = write_tmp_file(yml_string) command = f'kubectl apply -f {str(tmp.file.name)}' diff --git a/src/xpk/core/core.py b/src/xpk/core/core.py index b0c8095b..84a5312c 100644 --- a/src/xpk/core/core.py +++ b/src/xpk/core/core.py @@ -1989,15 +1989,6 @@ def get_main_container(args, system, docker_image, resource_type) -> str: 'touch /shared-volume/stacktrace_signal; ' ) - xpk_return_user_exit_code = '' - if args.restart_on_user_code_failure: - if int(args.max_restarts) <= 0: - xpk_print( - f'Warning: --max-restarts, is set to {args.max_restarts}. Will not' - ' restart on user failure.' 
- ) - xpk_return_user_exit_code = 'exit $EXIT_CODE' - yaml = """- name: {docker_name} image: {docker_image} {image_pull_policy} @@ -2026,10 +2017,7 @@ def get_main_container(args, system, docker_image, resource_type) -> str: echo EXIT_CODE=$EXIT_CODE; {tpu_stacktrace_terminate_command} {gpu_workload_terminate_command} - if [ "$EXIT_CODE" = 143 ]; then - exit $EXIT_CODE - fi - {xpk_return_user_exit_code} + exit $EXIT_CODE resources: limits: {resources} @@ -2056,7 +2044,6 @@ def get_main_container(args, system, docker_image, resource_type) -> str: xpk_internal_commands=xpk_internal_commands, resources=get_main_container_resources(args, system, resource_type), volume_mounts=volume_mounts, - xpk_return_user_exit_code=xpk_return_user_exit_code, ) diff --git a/src/xpk/core/pathways.py b/src/xpk/core/pathways.py index 71ed27ea..4b8ad58c 100644 --- a/src/xpk/core/pathways.py +++ b/src/xpk/core/pathways.py @@ -41,8 +41,8 @@ def get_pathways_worker_args(args) -> str: str: yaml containing arguments for the Pathways workers. """ yaml = """- --server_port=29001 - - --resource_manager_address={rm_address} - - --gcs_scratch_location={args.pathways_gcs_location}""" + - --resource_manager_address={rm_address} + - --gcs_scratch_location={args.pathways_gcs_location}""" if args.use_pathways: return yaml.format(args=args, rm_address=get_rm_address(args)) else: @@ -58,8 +58,8 @@ def get_pathways_proxy_args(args) -> str: str: yaml containing arguments for the Pathways proxy. """ yaml = """- --server_port=29000 - - --resource_manager_address={rm_address} - - --gcs_scratch_location={args.pathways_gcs_location}""" + - --resource_manager_address={rm_address} + - --gcs_scratch_location={args.pathways_gcs_location}""" if args.use_pathways: return yaml.format(args=args, rm_address=get_rm_address(args)) @@ -198,10 +198,10 @@ def get_pathways_rm_args(args, system: SystemCharacteristics) -> str: str: yaml containing arguments for the Pathways resource manager. """ yaml = """- --server_port=29001 - - --gcs_scratch_location={args.pathways_gcs_location} - - --node_type=resource_manager - - --instance_count={instance_count} - - --instance_type={instance_type}""" + - --gcs_scratch_location={args.pathways_gcs_location} + - --node_type=resource_manager + - --instance_count={instance_count} + - --instance_type={instance_type}""" if args.use_pathways: return yaml.format( args=args, @@ -227,27 +227,27 @@ def get_user_workload_for_pathways(args, system: SystemCharacteristics) -> str: Pathways server port as a YAML string """ user_workload_yaml = """- name: main - replicas: 1 - template: - metadata: - labels: - xpk.google.com/workload: {args.workload} - spec: - backoffLimit: 0 - completions: 1 - parallelism: 1 - template: - spec: - containers: + replicas: 1 + template: + metadata: + labels: + xpk.google.com/workload: {args.workload} + spec: + backoffLimit: 0 + completions: 1 + parallelism: 1 + template: + spec: + containers: {container} - nodeSelector: - cloud.google.com/gke-nodepool: cpu-user-np - restartPolicy: OnFailure - volumes: - - hostPath: - path: /tmp - type: DirectoryOrCreate - name: shared-tmp""" + nodeSelector: + cloud.google.com/gke-nodepool: cpu-user-np + restartPolicy: OnFailure + volumes: + - hostPath: + path: /tmp + type: DirectoryOrCreate + name: shared-tmp""" if args.headless: return '' else:
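Note on the new template fields: failure_policy_rules and pod_failure_policy are plain text fragments substituted into the JobSet templates with str.format, next to the existing placeholders. The JobSet-level rule (action: FailJobSet on onJobFailureReasons: PodFailurePolicy) fails the whole JobSet when a child Job is failed by its pod failure policy, instead of restarting it, while the Job-level podFailurePolicy (action: FailJob, onExitCodes with operator NotIn and values [143]) fails the Job as soon as the named container terminates with any exit code other than 143 (128 + SIGTERM). A 143 exit, typically the result of a graceful node shutdown for maintenance, matches neither rule, so it falls through to backoffLimit: 0 and is still retried through the JobSet maxRestarts budget. A minimal sketch of the substitution, assuming a hypothetical main container named jax-tpu and --max-restarts=3; the stripped-down template below is illustrative, not the full workload template:

# Minimal sketch, not the xpk code path: renders a reduced JobSet fragment the
# same way workload_create does, via str.format. The container name, the
# maxRestarts value, and the template itself are hypothetical.
jobset_fragment = """spec:
  failurePolicy:
    {failure_policy_rules}
    maxRestarts: {max_restarts}
  replicatedJobs:
  - name: slice-job
    template:
      spec:
        backoffLimit: 0
        {pod_failure_policy}"""

failure_policy_rules = """rules:
    - action: FailJobSet        # a Job failed by its podFailurePolicy fails the whole JobSet
      onJobFailureReasons:
      - PodFailurePolicy"""

pod_failure_policy = """podFailurePolicy:
          rules:
          - action: FailJob     # any non-SIGTERM exit fails the Job without retries
            onExitCodes:
              containerName: jax-tpu  # hypothetical main container name
              operator: NotIn
              values: [143]           # 128 + SIGTERM(15)"""

print(jobset_fragment.format(
    failure_policy_rules=failure_policy_rules,
    pod_failure_policy=pod_failure_policy,
    max_restarts=3,  # hypothetical --max-restarts value
))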
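The core.py change is the container-side counterpart: the shell snippet at the end of the main container command used to propagate only exit code 143 unless --restart-on-user-code-failure was set, and it now always terminates with the user command's exit code, leaving the retry-or-fail decision to the podFailurePolicy and the JobSet rule above. A sketch of the tail of the rendered command, with a hypothetical user command; the real template also injects the TPU stacktrace and GPU termination snippets before the final exit:

# Sketch only, assuming a hypothetical 'python3 train.py' user command.
command_tail = """\
{user_command}
EXIT_CODE=$?
echo EXIT_CODE=$EXIT_CODE
# Always propagate the exit code. Anything other than 143 now trips the
# podFailurePolicy (FailJob, then FailJobSet); 143 (SIGTERM) does not match,
# so those failures are still retried through the JobSet maxRestarts budget.
exit $EXIT_CODE"""

print(command_tail.format(user_command='python3 train.py'))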
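The pathways.py hunks change whitespace only: the worker, proxy, and resource-manager argument blocks and the user-workload job are spliced into the Pathways JobSet template as raw text, so their hard-coded continuation indentation has to line up with the column of the substitution point, which moved when the Pathways replicatedJobs section in workload.py was re-indented by this patch. A small illustration of that constraint, with hypothetical values; if the continuation indent drifts from the substitution column, the rendered manifest fails YAML parsing even though the Python templates look fine:

# Illustration only: the fragment's continuation lines must carry the same
# indentation as the '{pathways_worker_args}' substitution point.
pod_spec_fragment = """      containers:
      - args:
        {pathways_worker_args}
        image: gcr.io/example-project/pathways-server  # hypothetical image
        name: pathways-worker"""

worker_args = """- --server_port=29001
        - --resource_manager_address={rm_address}
        - --gcs_scratch_location={gcs_location}"""

print(pod_spec_fragment.format(
    pathways_worker_args=worker_args.format(
        rm_address='example-workload-rm-0-0.example-workload',  # hypothetical
        gcs_location='gs://example-bucket/pathways',             # hypothetical
    )
))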