Add logic to fail Pathways jobs on user code errors. (#152)
* Fail the JobSet for Pathways jobs when user code errors.

* Create a helper function that checks Pathways workload prerequisites.

* Update build and nightly tests for Pathways jobs.

* Minor change, address comments.
RoshaniN authored Jun 7, 2024
1 parent 78115ef commit 2512b40
Showing 3 changed files with 67 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_tests.yaml
@@ -62,7 +62,7 @@ jobs:
- name: Wait for workload completion and confirm it succeeded
run: python3 xpk.py workload list --cluster $TPU_CLUSTER_NAME --zone=us-central2-b --wait-for-job-completion $WORKLOAD_NAME --timeout 300
- name: Run a Pathways workload on Ubuntu base image
run: python xpk.py workload create-pathways --cluster $TPU_CLUSTER_NAME --workload $PATHWAYS_WORKLOAD_NAME --docker-image='marketplace.gcr.io/google/ubuntu2004' --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b --command "bash test.sh"
run: python xpk.py workload create-pathways --cluster $TPU_CLUSTER_NAME --workload $PATHWAYS_WORKLOAD_NAME --docker-image='marketplace.gcr.io/google/ubuntu2004' --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b --command "echo \"Hello world from a test script! \""
- name: Wait for Pathways workload completion and confirm it succeeded
run: python3 xpk.py workload list --cluster $TPU_CLUSTER_NAME --zone=us-central2-b --wait-for-job-completion $PATHWAYS_WORKLOAD_NAME --timeout 300
- name: List out the workloads on the cluster
2 changes: 1 addition & 1 deletion .github/workflows/nightly_tests.yaml
@@ -102,7 +102,7 @@ jobs:
- name: Create test script to execute in workloads
run: echo -e '#!/bin/bash \n echo "Hello world from a test script!"' > test.sh
- name: Run a Pathways workload on Ubuntu base image
run: python xpk.py workload create-pathways --cluster $PATHWAYS_TPU_CLUSTER_NAME --workload $PATHWAYS_WORKLOAD_NAME --docker-image='marketplace.gcr.io/google/ubuntu2004' --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b --command "bash test.sh"
run: python xpk.py workload create-pathways --cluster $PATHWAYS_TPU_CLUSTER_NAME --workload $PATHWAYS_WORKLOAD_NAME --docker-image='marketplace.gcr.io/google/ubuntu2004' --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b --command "echo \"Hello world from a test script! \""
- name: Wait for Pathways workload completion and confirm it succeeded
run: python3 xpk.py workload list --cluster $PATHWAYS_TPU_CLUSTER_NAME --zone=us-central2-b --wait-for-job-completion $PATHWAYS_WORKLOAD_NAME --timeout 300
- name: Delete the Pathways workload on the cluster
116 changes: 65 additions & 51 deletions xpk.py
@@ -5030,7 +5030,7 @@ def get_main_container(args, system, docker_image, resource_type) -> str:
)

xpk_return_user_exit_code = ''
if not args.use_pathways and args.restart_on_user_code_failure:
if args.restart_on_user_code_failure:
if int(args.max_restarts) <= 0:
xpk_print(
f'Warning: --max-restarts is set to {args.max_restarts}. Will not'
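For readers skimming the hunk above: dropping the `not args.use_pathways` guard means `--restart-on-user-code-failure` now applies to Pathways workloads as well, and a warning is printed when `--max-restarts` is not positive. The sketch below is a hypothetical, self-contained illustration of that branch (the helper name and the shell suffix are assumptions; the real suffix xpk appends to the container command is outside this hunk).

```python
# Hypothetical sketch, not xpk's actual implementation: build the shell suffix that
# propagates the user command's exit code so the JobSet can react to user code failures.
def build_user_exit_code_suffix(restart_on_user_code_failure: bool, max_restarts: int) -> str:
  if not restart_on_user_code_failure:
    return ''
  if max_restarts <= 0:
    # Mirrors the warning above: without --max-restarts > 0 there is nothing to restart.
    print(f'Warning: --max-restarts is set to {max_restarts}. '
          'The workload will not restart on user code failure.')
  # Assumed suffix; captures and re-raises the user code's exit status.
  return '; EXIT_CODE=$?; echo "User code exited with ${EXIT_CODE}"; exit ${EXIT_CODE}'

print(build_user_exit_code_suffix(True, 0))
```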
@@ -5403,17 +5403,11 @@ def get_user_workload_for_pathways(args, system: SystemCharacteristics) -> str:
path: /tmp
type: DirectoryOrCreate
name: shared-tmp"""
if args.use_pathways and not args.headless:
if not args.command:
xpk_print(
'Please provide a command, if you wish to use Pathways with a docker'
' container.'
)
xpk_exit(1)
if args.headless:
return ''
else:
container, _ = get_user_workload_container(args, system)
return user_workload_yaml.format(args=args, container=container)
else:
return ''


def get_env_container(args, system: SystemCharacteristics):
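Restated outside the diff, the refactor above reduces `get_user_workload_for_pathways` to a headless early return; the minimal sketch below uses illustrative names (not xpk's real `get_user_workload_container` call or YAML template) to show the resulting control flow.

```python
# Illustrative names only; xpk's real code formats user_workload_yaml with the
# container produced by get_user_workload_container(args, system).
def user_workload_manifest(headless: bool, container_yaml: str, template: str) -> str:
  if headless:
    # Headless Pathways workloads run no user container, so no manifest is emitted.
    return ''
  return template.format(container=container_yaml)

print(repr(user_workload_manifest(True, '- name: main', 'containers:\n  {container}')))
print(user_workload_manifest(False, '- name: main', 'containers:\n  {container}'))
```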
@@ -6045,15 +6039,6 @@ def workload_create(args) -> None:
"""
add_zone_and_project(args)

if args.command is None and not args.headless:
xpk_print(
'Please provide a command using "--command" for the docker container to'
' execute. Command is not required if you wish to run Pathways'
' workloads in headless mode (`xpk workload create-pathways'
' --headless`).'
)
xpk_exit(1)

if args.headless and not is_cluster_using_clouddns(args):
xpk_print(
'Please run xpk cluster create-pathways first, to upgrade and enable'
@@ -6157,28 +6142,9 @@ def workload_create(args) -> None:
gpu_rxdm_cmd=get_gpu_rxdm_cmd(system),
gpu_tcp_volume=get_gpu_tcp_volume(system),
)
elif args.use_pathways:
# Ensure the cluster and CPU nodepools were created with create-pathways
all_node_pools = get_all_nodepools_programmatic(args)
desired_pw_cpu_node_pools = {'cpu-user-np', 'cpu-rm-np', 'cpu-proxy-np'}
if not desired_pw_cpu_node_pools.issubset(set(all_node_pools[0])):
xpk_print(
'Cluster needs to be created with `xpk create-pathways` to run'
' Pathways workloads.'
)
xpk_exit(1)

# Ensure device type is TPUs - currently Pathways supports TPUs only.
if system.accelerator_type != AcceleratorType['TPU']:
xpk_print('Currently, Pathways workloads can only be run on TPUs.')
xpk_exit(1)

# Set proxy address to be consumed in helper methods and displayed to user.
args.pathways_proxy_address = get_proxy_address(args)

# Set the job which determines the life of other Pathways jobs
args.targetReplicatedJob = 'proxy' if args.headless else 'main'

elif args.use_pathways and ensure_pathways_workload_prerequisites(
args, system
):
yml_string = pw_workload_create_yaml.format(
args=args,
system=system,
@@ -6273,6 +6239,53 @@ def workload_create(args) -> None:
xpk_exit(0)


def ensure_pathways_workload_prerequisites(args, system) -> bool:
"""Check all Pathways workload prerequisites and set necessary args.
Args:
args: user provided arguments for running the command.
system: system characteristics.
Returns:
True once all conditions are satisfied and the required variables are set. Exits otherwise.
"""
# Ensure command is provided if not using Pathways in headless mode
if args.command is None and not args.headless:
xpk_print(
'Please provide a command using "--command" for the docker container to'
' execute. Command is not required if you wish to run Pathways'
' workloads in headless mode (`xpk workload create-pathways'
' --headless`).'
)
xpk_exit(1)

# Ensure the cluster and CPU nodepools were created with create-pathways
all_node_pools = get_all_nodepools_programmatic(args)
desired_pw_cpu_node_pools = {'cpu-user-np', 'cpu-rm-np', 'cpu-proxy-np'}
if not desired_pw_cpu_node_pools.issubset(set(all_node_pools[0])):
xpk_print(
'Cluster needs to be created with `xpk cluster create-pathways` to run'
' Pathways workloads.'
)
xpk_exit(1)

# Ensure device type is TPUs - currently Pathways supports TPUs only.
if system.accelerator_type != AcceleratorType['TPU']:
xpk_print('Currently, Pathways workloads can only be run on TPUs.')
xpk_exit(1)

# Set proxy address to be consumed in helper methods and displayed to user.
args.pathways_proxy_address = get_proxy_address(args)

# Set the job which determines the life of other Pathways jobs
args.targetReplicatedJob = 'proxy' if args.headless else 'main'

# Always report user code failures back to JobSet.
args.restart_on_user_code_failure = True

return True
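For context, here is a small self-contained sketch (an assumed argument object and an assumed proxy address value, not xpk's real ones) of the derived settings the helper above establishes before the Pathways YAML is rendered: headless workloads are anchored on the proxy job, containerized ones on the main job, and user code failures are always reported back to the JobSet.

```python
from dataclasses import dataclass

@dataclass
class PathwaysArgs:
  # Illustrative stand-in for xpk's parsed args namespace.
  headless: bool
  pathways_proxy_address: str = ''
  targetReplicatedJob: str = ''
  restart_on_user_code_failure: bool = False

def set_pathways_derived_args(args: PathwaysArgs, proxy_address: str) -> PathwaysArgs:
  args.pathways_proxy_address = proxy_address
  # The target replicated job determines the life of the other Pathways jobs.
  args.targetReplicatedJob = 'proxy' if args.headless else 'main'
  # Always report user code failures back to the JobSet for Pathways workloads.
  args.restart_on_user_code_failure = True
  return args

print(set_pathways_derived_args(PathwaysArgs(headless=True), '10.0.0.1:29000'))  # address is an assumption
```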


def workload_delete(args) -> None:
"""Function around workload delete.
@@ -7216,6 +7229,17 @@ def add_shared_workload_create_optional_arguments(args_parsers):
' the workload.'
),
)
custom_parser.add_argument(
'--restart-on-user-code-failure',
action='store_true',
help=(
'Adding this argument will report user code failures back to the jobset'
' manager, allowing restarts on user code when --max-restarts is set'
' greater than 0. By default, this is not enabled, and workloads'
' will not restart from user code failures. This is enabled by'
' default on Pathways workloads.'
),
)
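As a quick illustration of how the relocated flag behaves (a standalone sketch using only the two options shown in this diff, not xpk's full parser): `--restart-on-user-code-failure` is a boolean `store_true` flag, and restarts only take effect when `--max-restarts` is also greater than 0.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--restart-on-user-code-failure', action='store_true')
parser.add_argument('--max-restarts', default='0')

args = parser.parse_args(['--restart-on-user-code-failure', '--max-restarts', '2'])
# Restarts from user code failures only happen when both conditions hold.
will_restart = args.restart_on_user_code_failure and int(args.max_restarts) > 0
print(will_restart)  # True
```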
custom_parser.add_argument(
'--headless',
action='store_true',
@@ -7825,16 +7849,6 @@ def directory_path_type(value):
'and forward them to Cloud Logging for TPU workloads.'
),
)
workload_create_parser_optional_arguments.add_argument(
'--restart-on-user-code-failure',
action='store_true',
help=(
'Adding this argument will return user failures back to the jobset'
' manager allowing restarts on user code when --max-restarts is set'
' greater than 0. By default, this is not enabled, and workloads will'
' not restart from user code failures.'
),
)

# Autoprovisioning workload arguments
workload_create_autoprovisioning_arguments.add_argument(
