Set up Jobset failure policy on workloads
IrvingMg committed Jan 7, 2025
1 parent 5e779cf commit 7299390
Showing 3 changed files with 184 additions and 158 deletions.
271 changes: 155 additions & 116 deletions src/xpk/commands/workload.py
@@ -36,6 +36,7 @@
get_gpu_scheduler,
get_gpu_tcp_volume,
get_gpu_volume,
get_main_container_docker_image,
get_user_workload_container,
get_volumes,
parse_env_config,
@@ -78,6 +79,7 @@
alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool # 1:1 job replica to node pool assignment
spec:
failurePolicy:
{failure_policy_rules}
maxRestarts: {args.max_restarts}
replicatedJobs:
- name: slice-job
@@ -87,6 +89,7 @@
parallelism: {system.vms_per_slice} # Equal to the number of VMs per slice
completions: {system.vms_per_slice} # Same as the above.
backoffLimit: 0 # When any pod fails, the job is failed
{pod_failure_policy}
template:
metadata:
labels:
@@ -119,6 +122,7 @@
xpk.google.com/workload: {args.workload}
spec:
failurePolicy:
{failure_policy_rules}
maxRestarts: {args.max_restarts}
replicatedJobs:
- name: slice-job
@@ -128,6 +132,7 @@
parallelism: {args.num_nodes}
completions: {args.num_nodes}
backoffLimit: 0 # When any pod fails, the job is failed
{pod_failure_policy}
template:
metadata:
labels:
@@ -176,6 +181,7 @@
xpk.google.com/workload: {args.workload}
spec:
failurePolicy:
{failure_policy_rules}
maxRestarts: {args.max_restarts}
replicatedJobs:
- name: slice-job
@@ -185,6 +191,7 @@
parallelism: {args.num_nodes}
completions: {args.num_nodes}
backoffLimit: 0 # When any pod fails, the job is failed
{pod_failure_policy}
template:
metadata:
labels:
@@ -212,129 +219,131 @@
xpk.google.com/workload: {args.workload}
spec:
failurePolicy:
{failure_policy_rules}
maxRestarts: {args.max_restarts}
successPolicy:
operator: "All"
targetReplicatedJobs:
- {args.targetReplicatedJob}
replicatedJobs:
- name: worker
replicas: {args.num_slices}
template:
metadata:
annotations:
alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: {backoff_limit}
completions: {system.vms_per_slice}
parallelism: {system.vms_per_slice}
template:
spec:
terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
containers:
- args:
{pathways_worker_args}
image: {args.server_image}
imagePullPolicy: Always
name: pathways-worker
ports:
- containerPort: 29001
- containerPort: 8471
- containerPort: 8080
resources:
limits:
{resource_type}: {system.chips_per_vm}
securityContext:
privileged: true
volumeMounts:
- mountPath: /tmp
- name: worker
replicas: {args.num_slices}
template:
metadata:
annotations:
alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: {backoff_limit}
completions: {system.vms_per_slice}
parallelism: {system.vms_per_slice}
{pod_failure_policy}
template:
spec:
terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
containers:
- args:
{pathways_worker_args}
image: {args.server_image}
imagePullPolicy: Always
name: pathways-worker
ports:
- containerPort: 29001
- containerPort: 8471
- containerPort: 8080
resources:
limits:
{resource_type}: {system.chips_per_vm}
securityContext:
privileged: true
volumeMounts:
- mountPath: /tmp
name: shared-tmp
nodeSelector:
{accelerator_label}
{machine_label}
{autoprovisioning_args}
priorityClassName: {args.priority}
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
volumes:
- hostPath:
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
nodeSelector:
{accelerator_label}
{machine_label}
{autoprovisioning_args}
priorityClassName: {args.priority}
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
volumes:
- hostPath:
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
- name: rm
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_rm_args}
env:
- name: REPLICATED_JOB_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/replicatedjob-name']
- name: JOBSET_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/jobset-name']
- name: HOST_ADDRESS
value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME)
- name: TPU_SKIP_MDS_QUERY
value: "true"
image: {args.server_image}
imagePullPolicy: Always
name: pathways-rm
ports:
- containerPort: 29001
securityContext:
privileged: true
volumeMounts:
- mountPath: /tmp
- name: rm
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_rm_args}
env:
- name: REPLICATED_JOB_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/replicatedjob-name']
- name: JOBSET_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/jobset-name']
- name: HOST_ADDRESS
value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME)
- name: TPU_SKIP_MDS_QUERY
value: "true"
image: {args.server_image}
imagePullPolicy: Always
name: pathways-rm
ports:
- containerPort: 29001
securityContext:
privileged: true
volumeMounts:
- mountPath: /tmp
name: shared-tmp
nodeSelector:
cloud.google.com/gke-nodepool: cpu-rm-np
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
volumes:
- hostPath:
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
nodeSelector:
cloud.google.com/gke-nodepool: cpu-rm-np
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
volumes:
- hostPath:
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
- name: proxy
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_proxy_args}
image: {args.proxy_server_image}
imagePullPolicy: Always
name: pathways-proxy
ports:
- containerPort: 29000
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
nodeSelector:
cloud.google.com/gke-nodepool: cpu-proxy-np
{user_workload}
- name: proxy
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_proxy_args}
image: {args.proxy_server_image}
imagePullPolicy: Always
name: pathways-proxy
ports:
- containerPort: 29000
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
nodeSelector:
cloud.google.com/gke-nodepool: cpu-proxy-np
{user_workload}
"""


Expand Down Expand Up @@ -445,6 +454,27 @@ def workload_create(args) -> None:
if return_code != 0:
xpk_exit(return_code)

failure_policy_rules = ''
pod_failure_policy = ''
if args.restart_on_user_code_failure:
if int(args.max_restarts) <= 0:
xpk_print(
f'Warning: --max-restarts is set to {args.max_restarts}. Will not'
' restart on user failure.'
)
failure_policy_rules = """rules:
- action: FailJobSet
onJobFailureReasons:
- PodFailurePolicy"""
pod_failure_policy = f"""# podFailurePolicy which fails job immediately if job was not killed by SIGTERM (i.e., graceful node shutdown for maintenance events)
podFailurePolicy:
rules:
- action: FailJob
onExitCodes:
containerName: {get_main_container_docker_image(args, system)}
operator: NotIn
values: [143] # SIGTERM = exit code 143"""

# Create the workload file based on accelerator type or workload type.
if system.accelerator_type == AcceleratorType['GPU']:
container, debugging_dashboard_id = get_user_workload_container(
@@ -458,7 +488,10 @@

if system.device_type in cluster_gcluster.supported_device_types:
yml_string = a3_gpu_workload_create_yaml.format(
args=args, container=container
args=args,
container=container,
failure_policy_rules=failure_policy_rules,
pod_failure_policy=pod_failure_policy,
)

if args.device_type == cluster_gcluster.a3mega_device_type:
@@ -481,6 +514,8 @@
gpu_rxdm_image=get_gpu_rxdm_image(system),
gpu_rxdm_cmd=get_gpu_rxdm_cmd(system),
gpu_tcp_volume=get_gpu_tcp_volume(system),
failure_policy_rules=failure_policy_rules,
pod_failure_policy=pod_failure_policy,
)
elif args.use_pathways and ensure_pathways_workload_prerequisites(
args, system
@@ -502,6 +537,8 @@
local_queue_name=LOCAL_QUEUE_NAME,
autoprovisioning_args=autoprovisioning_args,
backoff_limit=system.vms_per_slice * 4,
failure_policy_rules=failure_policy_rules,
pod_failure_policy=pod_failure_policy,
)
else:
container, debugging_dashboard_id = get_user_workload_container(
@@ -519,6 +556,8 @@
local_queue_name=LOCAL_QUEUE_NAME,
autoprovisioning_args=autoprovisioning_args,
volumes=get_volumes(args, system),
failure_policy_rules=failure_policy_rules,
pod_failure_policy=pod_failure_policy,
)
tmp = write_tmp_file(yml_string)
command = f'kubectl apply -f {str(tmp.file.name)}'
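
Usage note: when the option is off, both failure_policy_rules and pod_failure_policy stay empty strings, so the corresponding placeholder lines render empty and behavior is unchanged. Assuming the CLI flag backing args.restart_on_user_code_failure is spelled --restart-on-user-code-failure (its definition is not part of this diff), an invocation that exercises the new policies might look like:

xpk workload create \
  --cluster my-cluster \
  --workload my-run \
  --max-restarts 3 \
  --restart-on-user-code-failure \
  ...

If --max-restarts is 0 or negative, the code above still injects the policies but warns that no restarts will occur on user failure.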