diff --git a/.github/workflows/build_tests.yaml b/.github/workflows/build_tests.yaml
index bea777f..f9f4a19 100644
--- a/.github/workflows/build_tests.yaml
+++ b/.github/workflows/build_tests.yaml
@@ -62,7 +62,7 @@ jobs:
     - name: Wait for workload completion and confirm it succeeded
       run: python3 xpk.py workload list --cluster $TPU_CLUSTER_NAME --zone=us-central2-b --wait-for-job-completion $WORKLOAD_NAME --timeout 300
     - name: Run a Pathways workload on Ubuntu base image
-      run: python xpk.py workload create-pathways --cluster $TPU_CLUSTER_NAME --workload $PATHWAYS_WORKLOAD_NAME --docker-image='marketplace.gcr.io/google/ubuntu2004' --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b --command "bash test.sh"
+      run: python xpk.py workload create-pathways --cluster $TPU_CLUSTER_NAME --workload $PATHWAYS_WORKLOAD_NAME --docker-image='marketplace.gcr.io/google/ubuntu2004' --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b --command "echo \"Hello world from a test script! \""
     - name: Wait for Pathways workload completion and confirm it succeeded
       run: python3 xpk.py workload list --cluster $TPU_CLUSTER_NAME --zone=us-central2-b --wait-for-job-completion $PATHWAYS_WORKLOAD_NAME --timeout 300
     - name: List out the workloads on the cluster
diff --git a/.github/workflows/nightly_tests.yaml b/.github/workflows/nightly_tests.yaml
index d4a756e..2b5410f 100644
--- a/.github/workflows/nightly_tests.yaml
+++ b/.github/workflows/nightly_tests.yaml
@@ -102,7 +102,7 @@ jobs:
     - name: Create test script to execute in workloads
       run: echo -e '#!/bin/bash \n echo "Hello world from a test script!"' > test.sh
     - name: Run a Pathways workload on Ubuntu base image
-      run: python xpk.py workload create-pathways --cluster $PATHWAYS_TPU_CLUSTER_NAME --workload $PATHWAYS_WORKLOAD_NAME --docker-image='marketplace.gcr.io/google/ubuntu2004' --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b --command "bash test.sh"
+      run: python xpk.py workload create-pathways --cluster $PATHWAYS_TPU_CLUSTER_NAME --workload $PATHWAYS_WORKLOAD_NAME --docker-image='marketplace.gcr.io/google/ubuntu2004' --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b --command "echo \"Hello world from a test script! \""
     - name: Wait for Pathways workload completion and confirm it succeeded
       run: python3 xpk.py workload list --cluster $PATHWAYS_TPU_CLUSTER_NAME --zone=us-central2-b --wait-for-job-completion $PATHWAYS_WORKLOAD_NAME --timeout 300
     - name: Delete the Pathways workload on the cluster
diff --git a/xpk.py b/xpk.py
index 1a05f27..88042fc 100644
--- a/xpk.py
+++ b/xpk.py
@@ -5030,7 +5030,7 @@ def get_main_container(args, system, docker_image, resource_type) -> str:
   )
 
   xpk_return_user_exit_code = ''
-  if not args.use_pathways and args.restart_on_user_code_failure:
+  if args.restart_on_user_code_failure:
     if int(args.max_restarts) <= 0:
       xpk_print(
           f'Warning: --max-restarts, is set to {args.max_restarts}. Will not'
@@ -5403,17 +5403,11 @@ def get_user_workload_for_pathways(args, system: SystemCharacteristics) -> str:
                   path: /tmp
                   type: DirectoryOrCreate
                 name: shared-tmp"""
-  if args.use_pathways and not args.headless:
-    if not args.command:
-      xpk_print(
-          'Please provide a command, if you wish to use Pathways with a docker'
-          ' container.'
-      )
-      xpk_exit(1)
+  if args.headless:
+    return ''
+  else:
     container, _ = get_user_workload_container(args, system)
     return user_workload_yaml.format(args=args, container=container)
-  else:
-    return ''
 
 
 def get_env_container(args, system: SystemCharacteristics):
@@ -6045,15 +6039,6 @@ def workload_create(args) -> None:
   """
   add_zone_and_project(args)
 
-  if args.command is None and not args.headless:
-    xpk_print(
-        'Please provide a command using "--command" for the docker container to'
-        ' execute. Command is not required if you wish to run Pathways'
-        ' workloads in headless mode (`xpk workload create-pathways'
-        ' --headless`).'
-    )
-    xpk_exit(1)
-
   if args.headless and not is_cluster_using_clouddns(args):
     xpk_print(
         'Please run xpk cluster create-pathways first, to upgrade and enable'
@@ -6157,28 +6142,9 @@ def workload_create(args) -> None:
         gpu_rxdm_cmd=get_gpu_rxdm_cmd(system),
         gpu_tcp_volume=get_gpu_tcp_volume(system),
     )
-  elif args.use_pathways:
-    # Ensure the cluster and CPU nodepools were created with create-pathways
-    all_node_pools = get_all_nodepools_programmatic(args)
-    desired_pw_cpu_node_pools = {'cpu-user-np', 'cpu-rm-np', 'cpu-proxy-np'}
-    if not desired_pw_cpu_node_pools.issubset(set(all_node_pools[0])):
-      xpk_print(
-          'Cluster needs to be created with `xpk create-pathways` to run'
-          ' Pathways workloads.'
-      )
-      xpk_exit(1)
-
-    # Ensure device type is TPUs - currently Pathways supports TPUs only.
-    if system.accelerator_type != AcceleratorType['TPU']:
-      xpk_print('Currently, Pathways workloads can only be run on TPUs.')
-      xpk_exit(1)
-
-    # Set proxy address to be consumed in helper methods and displayed to user.
-    args.pathways_proxy_address = get_proxy_address(args)
-
-    # Set the job which determines the life of other Pathways jobs
-    args.targetReplicatedJob = 'proxy' if args.headless else 'main'
-
+  elif args.use_pathways and ensure_pathways_workload_prerequisites(
+      args, system
+  ):
     yml_string = pw_workload_create_yaml.format(
         args=args,
         system=system,
@@ -6273,6 +6239,53 @@ def workload_create(args) -> None:
   xpk_exit(0)
 
 
+def ensure_pathways_workload_prerequisites(args, system) -> bool:
+  """Check all Pathways workload prerequisites and set necessary args.
+
+  Args:
+    args: user provided arguments for running the command.
+    system: system characteristics.
+
+  Returns:
+    True once conditions satisfy and variables are set. Exits otherwise.
+  """
+  # Ensure command is provided if not using Pathways in headless mode
+  if args.command is None and not args.headless:
+    xpk_print(
+        'Please provide a command using "--command" for the docker container to'
+        ' execute. Command is not required if you wish to run Pathways'
+        ' workloads in headless mode (`xpk workload create-pathways'
+        ' --headless`).'
+    )
+    xpk_exit(1)
+
+  # Ensure the cluster and CPU nodepools were created with create-pathways
+  all_node_pools = get_all_nodepools_programmatic(args)
+  desired_pw_cpu_node_pools = {'cpu-user-np', 'cpu-rm-np', 'cpu-proxy-np'}
+  if not desired_pw_cpu_node_pools.issubset(set(all_node_pools[0])):
+    xpk_print(
+        'Cluster needs to be created with `xpk create-pathways` to run'
+        ' Pathways workloads.'
+    )
+    xpk_exit(1)
+
+  # Ensure device type is TPUs - currently Pathways supports TPUs only.
+  if system.accelerator_type != AcceleratorType['TPU']:
+    xpk_print('Currently, Pathways workloads can only be run on TPUs.')
+    xpk_exit(1)
+
+  # Set proxy address to be consumed in helper methods and displayed to user.
+  args.pathways_proxy_address = get_proxy_address(args)
+
+  # Set the job which determines the life of other Pathways jobs
+  args.targetReplicatedJob = 'proxy' if args.headless else 'main'
+
+  # Always report user code failures back to JobSet.
+  args.restart_on_user_code_failure = True
+
+  return True
+
+
 def workload_delete(args) -> None:
   """Function around workload delete.
 
@@ -7216,6 +7229,17 @@ def add_shared_workload_create_optional_arguments(args_parsers):
           ' the workload.'
       ),
   )
+  custom_parser.add_argument(
+      '--restart-on-user-code-failure',
+      action='store_true',
+      help=(
+          'Adding this argument will return user failures back to the jobset'
+          ' manager allowing restarts on user code when --max-restarts is set'
+          ' greater than 0. By default, this is not enabled, and workloads'
+          ' will not restart from user code failures. This is enabled by'
+          ' default on Pathways workloads.'
+      ),
+  )
   custom_parser.add_argument(
       '--headless',
       action='store_true',
@@ -7825,16 +7849,6 @@ def directory_path_type(value):
         'and forward them to Cloud Logging for TPU workloads.'
     ),
 )
-workload_create_parser_optional_arguments.add_argument(
-    '--restart-on-user-code-failure',
-    action='store_true',
-    help=(
-        'Adding this argument will return user failures back to the jobset'
-        ' manager allowing restarts on user code when --max-restarts is set'
-        ' greater than 0. By default, this is not enabled, and workloads will'
-        ' not restart from user code failures.'
-    ),
-)
 
 # Autoprovisioning workload arguments
 workload_create_autoprovisioning_arguments.add_argument(