Set up JobSet failure policy on workloads #308

Open · wants to merge 1 commit into base: main
271 changes: 155 additions & 116 deletions src/xpk/commands/workload.py
@@ -36,6 +36,7 @@
get_gpu_scheduler,
get_gpu_tcp_volume,
get_gpu_volume,
get_main_container_docker_image,
get_user_workload_container,
get_volumes,
parse_env_config,
@@ -79,6 +80,7 @@
spec:
ttlSecondsAfterFinished: {args.ttl_seconds_after_finished}
failurePolicy:
{failure_policy_rules}
maxRestarts: {args.max_restarts}
replicatedJobs:
- name: slice-job
@@ -88,6 +90,7 @@
parallelism: {system.vms_per_slice} # Equal to the number of VMs per slice
completions: {system.vms_per_slice} # Same as the above.
backoffLimit: 0 # When any pod fails, the job is failed
{pod_failure_policy}
template:
metadata:
labels:
@@ -121,6 +124,7 @@
spec:
ttlSecondsAfterFinished: {args.ttl_seconds_after_finished}
failurePolicy:
{failure_policy_rules}
maxRestarts: {args.max_restarts}
replicatedJobs:
- name: slice-job
@@ -130,6 +134,7 @@
parallelism: {args.num_nodes}
completions: {args.num_nodes}
backoffLimit: 0 # When any pod fails, the job is failed
{pod_failure_policy}
template:
metadata:
labels:
@@ -179,6 +184,7 @@
spec:
ttlSecondsAfterFinished: {args.ttl_seconds_after_finished}
failurePolicy:
{failure_policy_rules}
maxRestarts: {args.max_restarts}
replicatedJobs:
- name: slice-job
@@ -188,6 +194,7 @@
parallelism: {args.num_nodes}
completions: {args.num_nodes}
backoffLimit: 0 # When any pod fails, the job is failed
{pod_failure_policy}
template:
metadata:
labels:
@@ -215,129 +222,131 @@
xpk.google.com/workload: {args.workload}
spec:
failurePolicy:
{failure_policy_rules}
maxRestarts: {args.max_restarts}
successPolicy:
operator: "All"
targetReplicatedJobs:
- {args.targetReplicatedJob}
replicatedJobs:
- name: worker
replicas: {args.num_slices}
template:
metadata:
annotations:
alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: {backoff_limit}
completions: {system.vms_per_slice}
parallelism: {system.vms_per_slice}
template:
spec:
terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
containers:
- args:
{pathways_worker_args}
image: {args.server_image}
imagePullPolicy: Always
name: pathways-worker
ports:
- containerPort: 29001
- containerPort: 8471
- containerPort: 8080
resources:
limits:
{resource_type}: {system.chips_per_vm}
securityContext:
privileged: true
volumeMounts:
- mountPath: /tmp
- name: worker
replicas: {args.num_slices}
template:
metadata:
annotations:
alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: {backoff_limit}
completions: {system.vms_per_slice}
parallelism: {system.vms_per_slice}
{pod_failure_policy}
template:
spec:
terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
containers:
- args:
{pathways_worker_args}
image: {args.server_image}
imagePullPolicy: Always
name: pathways-worker
ports:
- containerPort: 29001
- containerPort: 8471
- containerPort: 8080
resources:
limits:
{resource_type}: {system.chips_per_vm}
securityContext:
privileged: true
volumeMounts:
- mountPath: /tmp
name: shared-tmp
nodeSelector:
{accelerator_label}
{machine_label}
{autoprovisioning_args}
priorityClassName: {args.priority}
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
volumes:
- hostPath:
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
nodeSelector:
{accelerator_label}
{machine_label}
{autoprovisioning_args}
priorityClassName: {args.priority}
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
volumes:
- hostPath:
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
- name: rm
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_rm_args}
env:
- name: REPLICATED_JOB_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/replicatedjob-name']
- name: JOBSET_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/jobset-name']
- name: HOST_ADDRESS
value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME)
- name: TPU_SKIP_MDS_QUERY
value: "true"
image: {args.server_image}
imagePullPolicy: Always
name: pathways-rm
ports:
- containerPort: 29001
securityContext:
privileged: true
volumeMounts:
- mountPath: /tmp
- name: rm
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_rm_args}
env:
- name: REPLICATED_JOB_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/replicatedjob-name']
- name: JOBSET_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/jobset-name']
- name: HOST_ADDRESS
value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME)
- name: TPU_SKIP_MDS_QUERY
value: "true"
image: {args.server_image}
imagePullPolicy: Always
name: pathways-rm
ports:
- containerPort: 29001
securityContext:
privileged: true
volumeMounts:
- mountPath: /tmp
name: shared-tmp
nodeSelector:
cloud.google.com/gke-nodepool: cpu-rm-np
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
volumes:
- hostPath:
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
nodeSelector:
cloud.google.com/gke-nodepool: cpu-rm-np
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
volumes:
- hostPath:
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
- name: proxy
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_proxy_args}
image: {args.proxy_server_image}
imagePullPolicy: Always
name: pathways-proxy
ports:
- containerPort: 29000
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
nodeSelector:
cloud.google.com/gke-nodepool: cpu-proxy-np
{user_workload}
- name: proxy
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_proxy_args}
image: {args.proxy_server_image}
imagePullPolicy: Always
name: pathways-proxy
ports:
- containerPort: 29000
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
nodeSelector:
cloud.google.com/gke-nodepool: cpu-proxy-np
{user_workload}
"""


@@ -448,6 +457,27 @@ def workload_create(args) -> None:
if return_code != 0:
xpk_exit(return_code)

failure_policy_rules = ''
pod_failure_policy = ''
if args.restart_on_user_code_failure:
Collaborator

It's great to use the failure policy here!! Looking forward to this change.

I named this argument poorly -- its intent is to let the workload additionally restart on user code failures when the flag is set. The exit codes it should cover are anything a user's code might return, like 1, 2, ...

Some system-level exit codes that should always restart the job are 42 and 143. We should probably ask the GKE folks what other system-level exit codes exist. These should always restart the job and should be set, or assumed, in the pod failure policy by default.

For background, some customers depend on xpk restarting the job in system-failure cases by default while keeping user errors (like code typos) from restarting their workload. So this is the default case.

Collaborator

To be honest, I don't think anyone actually wants a user error to restart the job, so the ideal choice in my opinion is to remove the "args.restart_on_user_code_failure" argument and simply "restart the job in system failure cases by default and avoid user errors (like code typos) from restarting their workload". With failure policy support we can now do this.

Collaborator Author

Thanks for the information. To sum up:

  1. The default behavior should be: restart on exit codes 42 and 143.
  2. Remove the restart-on-user-code-failure flag, but allow users to pass additional exit codes that restart the workload.

Is that correct? And how would users pass these additional exit codes? Should we have another flag for that?

Collaborator Author

@Obliviour can you confirm those are the changes needed?
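
For reference, a minimal sketch (not part of this PR) of the behavior discussed in this thread: exit codes 42 and 143 always restart the workload, any other exit code fails the JobSet, and a hypothetical --restart-exit-codes option lets users append their own codes. The flag, the helper name, and the main_container_name argument are illustrative assumptions, not existing xpk interfaces.

# Sketch only: hypothetical helper, assuming 42 and 143 are the system-level
# exit codes that should always restart the workload (143 = SIGTERM).
DEFAULT_RESTART_EXIT_CODES = (42, 143)

def build_failure_policies(args, main_container_name: str) -> tuple[str, str]:
  # Hypothetical --restart-exit-codes flag; defaults to no extra codes.
  extra_codes = getattr(args, 'restart_exit_codes', None) or []
  restart_codes = sorted(set(DEFAULT_RESTART_EXIT_CODES) | set(extra_codes))
  # Fail the whole JobSet (no restart) when a child Job failed because of
  # its pod failure policy, i.e. because of a non-restartable exit code.
  failure_policy_rules = """rules:
  - action: FailJobSet
    onJobFailureReasons:
    - PodFailurePolicy"""
  # Fail the Job immediately for any exit code outside the restart list;
  # codes in the list fall through to backoffLimit/maxRestarts instead.
  pod_failure_policy = f"""podFailurePolicy:
    rules:
    - action: FailJob
      onExitCodes:
        containerName: {main_container_name}
        operator: NotIn
        values: [{', '.join(str(c) for c in restart_codes)}]"""
  return failure_policy_rules, pod_failure_policy

With operator: NotIn, an exit code in the restart list does not match the rule, so the pod failure counts against backoffLimit: 0 and the JobSet-level maxRestarts retries it, while any other exit code fails the Job with reason PodFailurePolicy and the FailJobSet rule above stops further restarts.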

if int(args.max_restarts) <= 0:
xpk_print(
f'Warning: --max-restarts is set to {args.max_restarts}. Will not'
' restart on user failure.'
)
failure_policy_rules = """rules:
- action: FailJobSet
onJobFailureReasons:
- PodFailurePolicy"""
pod_failure_policy = f"""# podFailurePolicy which fails job immediately if job was not killed by SIGTERM (i.e., graceful node shutdown for maintenance events)
podFailurePolicy:
rules:
- action: FailJob
onExitCodes:
containerName: {get_main_container_docker_image(args, system)}
operator: NotIn
values: [143] # SIGTERM = exit code 143"""

# Create the workload file based on accelerator type or workload type.
if system.accelerator_type == AcceleratorType['GPU']:
container, debugging_dashboard_id = get_user_workload_container(
@@ -461,7 +491,10 @@

if system.device_type in cluster_gcluster.supported_device_types:
yml_string = a3_gpu_workload_create_yaml.format(
args=args, container=container
args=args,
container=container,
failure_policy_rules=failure_policy_rules,
pod_failure_policy=pod_failure_policy,
)

if args.device_type == cluster_gcluster.a3mega_device_type:
@@ -484,6 +517,8 @@
gpu_rxdm_image=get_gpu_rxdm_image(system),
gpu_rxdm_cmd=get_gpu_rxdm_cmd(system),
gpu_tcp_volume=get_gpu_tcp_volume(system),
failure_policy_rules=failure_policy_rules,
pod_failure_policy=pod_failure_policy,
)
elif args.use_pathways and ensure_pathways_workload_prerequisites(
args, system
@@ -505,6 +540,8 @@
local_queue_name=LOCAL_QUEUE_NAME,
autoprovisioning_args=autoprovisioning_args,
backoff_limit=system.vms_per_slice * 4,
failure_policy_rules=failure_policy_rules,
pod_failure_policy=pod_failure_policy,
)
else:
container, debugging_dashboard_id = get_user_workload_container(
@@ -522,6 +559,8 @@
local_queue_name=LOCAL_QUEUE_NAME,
autoprovisioning_args=autoprovisioning_args,
volumes=get_volumes(args, system),
failure_policy_rules=failure_policy_rules,
pod_failure_policy=pod_failure_policy,
)
tmp = write_tmp_file(yml_string)
command = f'kubectl apply -f {str(tmp.file.name)}'