Skip to content

Commit

Permalink
feat: Add support for reservation affinity in custom training jobs.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 662596199
  • Loading branch information
vertex-sdk-bot authored and copybara-github committed Aug 13, 2024
1 parent 0008735 commit 802609b
Show file tree
Hide file tree
Showing 4 changed files with 593 additions and 37 deletions.
192 changes: 176 additions & 16 deletions google/cloud/aiplatform/training_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import datetime
import time
from typing import Dict, List, Optional, Sequence, Tuple, Union
from typing import Dict, List, Literal, Optional, Sequence, Tuple, Union
from google.protobuf import json_format

import abc
Expand Down Expand Up @@ -1404,6 +1404,11 @@ def _prepare_and_validate_run(
reduction_server_replica_count: int = 0,
reduction_server_machine_type: Optional[str] = None,
tpu_topology: Optional[str] = None,
reservation_affinity_type: Optional[
Literal["NO_RESERVATION", "ANY_RESERVATION", "SPECIFIC_RESERVATION"]
] = None,
reservation_affinity_key: Optional[str] = None,
reservation_affinity_values: Optional[List[str]] = None,
) -> Tuple[worker_spec_utils._DistributedTrainingSpec, Optional[gca_model.Model]]:
"""Create worker pool specs and managed model as well validating the
run.
Expand Down Expand Up @@ -1451,6 +1456,23 @@ def _prepare_and_validate_run(
tpu_topology (str):
Optional. Only required if the machine type is a TPU
v5 version.
reservation_affinity_type (str):
Optional. The type of reservation affinity. One of:
* "NO_RESERVATION" : No reservation is used.
* "ANY_RESERVATION" : Any reservation that matches machine spec
can be used.
* "SPECIFIC_RESERVATION" : A specific reservation must be use
used. See reservation_affinity_key and
reservation_affinity_values for how to specify the reservation.
reservation_affinity_key (str):
Optional. Corresponds to the label key of a reservation resource.
To target a SPECIFIC_RESERVATION by name, use
`compute.googleapis.com/reservation-name` as the key
and specify the name of your reservation as its value.
reservation_affinity_values (List[str]):
Optional. Corresponds to the label values of a reservation resource.
This must be the full resource name of the reservation.
Format: 'projects/{project_id_or_number}/zones/{zone}/reservations/{reservation_name}'
Returns:
Worker pools specs and managed model for run.
Expand Down Expand Up @@ -1490,6 +1512,9 @@ def _prepare_and_validate_run(
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
tpu_topology=tpu_topology,
reservation_affinity_type=reservation_affinity_type,
reservation_affinity_key=reservation_affinity_key,
reservation_affinity_values=reservation_affinity_values,
).pool_specs
)

Expand Down Expand Up @@ -3016,6 +3041,11 @@ def run(
persistent_resource_id: Optional[str] = None,
tpu_topology: Optional[str] = None,
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
reservation_affinity_type: Optional[
Literal["NO_RESERVATION", "ANY_RESERVATION", "SPECIFIC_RESERVATION"]
] = None,
reservation_affinity_key: Optional[str] = None,
reservation_affinity_values: Optional[List[str]] = None,
) -> Optional[models.Model]:
"""Runs the custom training job.
Expand Down Expand Up @@ -3373,6 +3403,23 @@ def run(
be a supported value for the TPU machine type.
scheduling_strategy (gca_custom_job_compat.Scheduling.Strategy):
Optional. Indicates the job scheduling strategy.
reservation_affinity_type (str):
Optional. The type of reservation affinity. One of:
* "NO_RESERVATION" : No reservation is used.
* "ANY_RESERVATION" : Any reservation that matches machine spec
can be used.
* "SPECIFIC_RESERVATION" : A specific reservation must be use
used. See reservation_affinity_key and
reservation_affinity_values for how to specify the reservation.
reservation_affinity_key (str):
Optional. Corresponds to the label key of a reservation resource.
To target a SPECIFIC_RESERVATION by name, use
`compute.googleapis.com/reservation-name` as the key
and specify the name of your reservation as its value.
reservation_affinity_values (List[str]):
Optional. Corresponds to the label values of a reservation resource.
This must be the full resource name of the reservation.
Format: 'projects/{project_id_or_number}/zones/{zone}/reservations/{reservation_name}'
Returns:
The trained Vertex AI model resource or None if the training
Expand All @@ -3393,6 +3440,9 @@ def run(
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
tpu_topology=tpu_topology,
reservation_affinity_type=reservation_affinity_type,
reservation_affinity_key=reservation_affinity_key,
reservation_affinity_values=reservation_affinity_values,
)

# make and copy package
Expand Down Expand Up @@ -3430,9 +3480,11 @@ def run(
enable_web_access=enable_web_access,
enable_dashboard_access=enable_dashboard_access,
tensorboard=tensorboard,
reduction_server_container_uri=reduction_server_container_uri
if reduction_server_replica_count > 0
else None,
reduction_server_container_uri=(
reduction_server_container_uri
if reduction_server_replica_count > 0
else None
),
sync=sync,
create_request_timeout=create_request_timeout,
disable_retries=disable_retries,
Expand Down Expand Up @@ -3492,6 +3544,11 @@ def submit(
persistent_resource_id: Optional[str] = None,
tpu_topology: Optional[str] = None,
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
reservation_affinity_type: Optional[
Literal["NO_RESERVATION", "ANY_RESERVATION", "SPECIFIC_RESERVATION"]
] = None,
reservation_affinity_key: Optional[str] = None,
reservation_affinity_values: Optional[List[str]] = None,
) -> Optional[models.Model]:
"""Submits the custom training job without blocking until completion.
Expand Down Expand Up @@ -3794,6 +3851,23 @@ def submit(
be a supported value for the TPU machine type.
scheduling_strategy (gca_custom_job_compat.Scheduling.Strategy):
Optional. Indicates the job scheduling strategy.
reservation_affinity_type (str):
Optional. The type of reservation affinity. One of:
* "NO_RESERVATION" : No reservation is used.
* "ANY_RESERVATION" : Any reservation that matches machine spec
can be used.
* "SPECIFIC_RESERVATION" : A specific reservation must be use
used. See reservation_affinity_key and
reservation_affinity_values for how to specify the reservation.
reservation_affinity_key (str):
Optional. Corresponds to the label key of a reservation resource.
To target a SPECIFIC_RESERVATION by name, use
`compute.googleapis.com/reservation-name` as the key
and specify the name of your reservation as its value.
reservation_affinity_values (List[str]):
Optional. Corresponds to the label values of a reservation resource.
This must be the full resource name of the reservation.
Format: 'projects/{project_id_or_number}/zones/{zone}/reservations/{reservation_name}'
Returns:
model: The trained Vertex AI Model resource or None if training did not
Expand All @@ -3813,6 +3887,9 @@ def submit(
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
tpu_topology=tpu_topology,
reservation_affinity_type=reservation_affinity_type,
reservation_affinity_key=reservation_affinity_key,
reservation_affinity_values=reservation_affinity_values,
)

# make and copy package
Expand Down Expand Up @@ -3850,9 +3927,11 @@ def submit(
enable_web_access=enable_web_access,
enable_dashboard_access=enable_dashboard_access,
tensorboard=tensorboard,
reduction_server_container_uri=reduction_server_container_uri
if reduction_server_replica_count > 0
else None,
reduction_server_container_uri=(
reduction_server_container_uri
if reduction_server_replica_count > 0
else None
),
sync=sync,
create_request_timeout=create_request_timeout,
block=False,
Expand Down Expand Up @@ -4485,6 +4564,11 @@ def run(
persistent_resource_id: Optional[str] = None,
tpu_topology: Optional[str] = None,
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
reservation_affinity_type: Optional[
Literal["NO_RESERVATION", "ANY_RESERVATION", "SPECIFIC_RESERVATION"]
] = None,
reservation_affinity_key: Optional[str] = None,
reservation_affinity_values: Optional[List[str]] = None,
) -> Optional[models.Model]:
"""Runs the custom training job.
Expand Down Expand Up @@ -4780,6 +4864,23 @@ def run(
must be a supported value for the TPU machine type.
scheduling_strategy (gca_custom_job_compat.Scheduling.Strategy):
Optional. Indicates the job scheduling strategy.
reservation_affinity_type (str):
Optional. The type of reservation affinity. One of:
* "NO_RESERVATION" : No reservation is used.
* "ANY_RESERVATION" : Any reservation that matches machine spec
can be used.
* "SPECIFIC_RESERVATION" : A specific reservation must be use
used. See reservation_affinity_key and
reservation_affinity_values for how to specify the reservation.
reservation_affinity_key (str):
Optional. Corresponds to the label key of a reservation resource.
To target a SPECIFIC_RESERVATION by name, use
`compute.googleapis.com/reservation-name` as the key
and specify the name of your reservation as its value.
reservation_affinity_values (List[str]):
Optional. Corresponds to the label values of a reservation resource.
This must be the full resource name of the reservation.
Format: 'projects/{project_id_or_number}/zones/{zone}/reservations/{reservation_name}'
Returns:
model: The trained Vertex AI Model resource or None if training did not
Expand All @@ -4805,6 +4906,9 @@ def run(
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
tpu_topology=tpu_topology,
reservation_affinity_type=reservation_affinity_type,
reservation_affinity_key=reservation_affinity_key,
reservation_affinity_values=reservation_affinity_values,
)

return self._run(
Expand Down Expand Up @@ -4836,9 +4940,11 @@ def run(
enable_web_access=enable_web_access,
enable_dashboard_access=enable_dashboard_access,
tensorboard=tensorboard,
reduction_server_container_uri=reduction_server_container_uri
if reduction_server_replica_count > 0
else None,
reduction_server_container_uri=(
reduction_server_container_uri
if reduction_server_replica_count > 0
else None
),
sync=sync,
create_request_timeout=create_request_timeout,
disable_retries=disable_retries,
Expand Down Expand Up @@ -4898,6 +5004,11 @@ def submit(
persistent_resource_id: Optional[str] = None,
tpu_topology: Optional[str] = None,
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
reservation_affinity_type: Optional[
Literal["NO_RESERVATION", "ANY_RESERVATION", "SPECIFIC_RESERVATION"]
] = None,
reservation_affinity_key: Optional[str] = None,
reservation_affinity_values: Optional[List[str]] = None,
) -> Optional[models.Model]:
"""Submits the custom training job without blocking until completion.
Expand Down Expand Up @@ -5193,6 +5304,23 @@ def submit(
must be a supported value for the TPU machine type.
scheduling_strategy (gca_custom_job_compat.Scheduling.Strategy):
Optional. Indicates the job scheduling strategy.
reservation_affinity_type (str):
Optional. The type of reservation affinity. One of:
* "NO_RESERVATION" : No reservation is used.
* "ANY_RESERVATION" : Any reservation that matches machine spec
can be used.
* "SPECIFIC_RESERVATION" : A specific reservation must be use
used. See reservation_affinity_key and
reservation_affinity_values for how to specify the reservation.
reservation_affinity_key (str):
Optional. Corresponds to the label key of a reservation resource.
To target a SPECIFIC_RESERVATION by name, use
`compute.googleapis.com/reservation-name` as the key
and specify the name of your reservation as its value.
reservation_affinity_values (List[str]):
Optional. Corresponds to the label values of a reservation resource.
This must be the full resource name of the reservation.
Format: 'projects/{project_id_or_number}/zones/{zone}/reservations/{reservation_name}'
Returns:
model: The trained Vertex AI Model resource or None if training did not
Expand All @@ -5217,6 +5345,9 @@ def submit(
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
tpu_topology=tpu_topology,
reservation_affinity_type=reservation_affinity_type,
reservation_affinity_key=reservation_affinity_key,
reservation_affinity_values=reservation_affinity_values,
)

return self._run(
Expand Down Expand Up @@ -5248,9 +5379,11 @@ def submit(
enable_web_access=enable_web_access,
enable_dashboard_access=enable_dashboard_access,
tensorboard=tensorboard,
reduction_server_container_uri=reduction_server_container_uri
if reduction_server_replica_count > 0
else None,
reduction_server_container_uri=(
reduction_server_container_uri
if reduction_server_replica_count > 0
else None
),
sync=sync,
create_request_timeout=create_request_timeout,
block=False,
Expand Down Expand Up @@ -7572,6 +7705,11 @@ def run(
persistent_resource_id: Optional[str] = None,
tpu_topology: Optional[str] = None,
scheduling_strategy: Optional[gca_custom_job_compat.Scheduling.Strategy] = None,
reservation_affinity_type: Optional[
Literal["NO_RESERVATION", "ANY_RESERVATION", "SPECIFIC_RESERVATION"]
] = None,
reservation_affinity_key: Optional[str] = None,
reservation_affinity_values: Optional[List[str]] = None,
) -> Optional[models.Model]:
"""Runs the custom training job.
Expand Down Expand Up @@ -7868,6 +8006,23 @@ def run(
must be a supported value for the TPU machine type.
scheduling_strategy (gca_custom_job_compat.Scheduling.Strategy):
Optional. Indicates the job scheduling strategy.
reservation_affinity_type (str):
Optional. The type of reservation affinity. One of:
* "NO_RESERVATION" : No reservation is used.
* "ANY_RESERVATION" : Any reservation that matches machine spec
can be used.
* "SPECIFIC_RESERVATION" : A specific reservation must be use
used. See reservation_affinity_key and
reservation_affinity_values for how to specify the reservation.
reservation_affinity_key (str):
Optional. Corresponds to the label key of a reservation resource.
To target a SPECIFIC_RESERVATION by name, use
`compute.googleapis.com/reservation-name` as the key
and specify the name of your reservation as its value.
reservation_affinity_values (List[str]):
Optional. Corresponds to the label values of a reservation resource.
This must be the full resource name of the reservation.
Format: 'projects/{project_id_or_number}/zones/{zone}/reservations/{reservation_name}'
Returns:
model: The trained Vertex AI Model resource or None if training did not
Expand All @@ -7888,6 +8043,9 @@ def run(
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
tpu_topology=tpu_topology,
reservation_affinity_type=reservation_affinity_type,
reservation_affinity_key=reservation_affinity_key,
reservation_affinity_values=reservation_affinity_values,
)

return self._run(
Expand Down Expand Up @@ -7919,9 +8077,11 @@ def run(
enable_web_access=enable_web_access,
enable_dashboard_access=enable_dashboard_access,
tensorboard=tensorboard,
reduction_server_container_uri=reduction_server_container_uri
if reduction_server_replica_count > 0
else None,
reduction_server_container_uri=(
reduction_server_container_uri
if reduction_server_replica_count > 0
else None
),
sync=sync,
create_request_timeout=create_request_timeout,
disable_retries=disable_retries,
Expand Down
Loading

0 comments on commit 802609b

Please sign in to comment.