From 72cdd9ef60f10192f4c80669f5d2aaa448e9da76 Mon Sep 17 00:00:00 2001 From: You Quan Chong Date: Thu, 9 Jun 2022 11:00:04 -0700 Subject: [PATCH 1/3] implement failure_policy --- google/cloud/aiplatform/compat/__init__.py | 2 + .../cloud/aiplatform/compat/types/__init__.py | 4 ++ google/cloud/aiplatform/pipeline_jobs.py | 11 ++++++ .../cloud/aiplatform/utils/pipeline_utils.py | 38 ++++++++++++++++++- 4 files changed, 53 insertions(+), 2 deletions(-) diff --git a/google/cloud/aiplatform/compat/__init__.py b/google/cloud/aiplatform/compat/__init__.py index 6aea51d133..c0f69f8c1b 100644 --- a/google/cloud/aiplatform/compat/__init__.py +++ b/google/cloud/aiplatform/compat/__init__.py @@ -94,6 +94,7 @@ types.model_evaluation_slice = types.model_evaluation_slice_v1beta1 types.model_service = types.model_service_v1beta1 types.operation = types.operation_v1beta1 + types.pipeline_failure_policy = types.pipeline_failure_policy_v1beta1 types.pipeline_job = types.pipeline_job_v1beta1 types.pipeline_service = types.pipeline_service_v1beta1 types.pipeline_state = types.pipeline_state_v1beta1 @@ -176,6 +177,7 @@ types.model_evaluation_slice = types.model_evaluation_slice_v1 types.model_service = types.model_service_v1 types.operation = types.operation_v1 + types.pipeline_failure_policy = types.pipeline_failure_policy_v1 types.pipeline_job = types.pipeline_job_v1 types.pipeline_service = types.pipeline_service_v1 types.pipeline_state = types.pipeline_state_v1 diff --git a/google/cloud/aiplatform/compat/types/__init__.py b/google/cloud/aiplatform/compat/types/__init__.py index 14ff93f011..79cdfcc34e 100644 --- a/google/cloud/aiplatform/compat/types/__init__.py +++ b/google/cloud/aiplatform/compat/types/__init__.py @@ -61,6 +61,7 @@ model_evaluation_slice as model_evaluation_slice_v1beta1, model_service as model_service_v1beta1, operation as operation_v1beta1, + pipeline_failure_policy as pipeline_failure_policy_v1beta1, pipeline_job as pipeline_job_v1beta1, pipeline_service as pipeline_service_v1beta1, pipeline_state as pipeline_state_v1beta1, @@ -122,6 +123,7 @@ model_evaluation_slice as model_evaluation_slice_v1, model_service as model_service_v1, operation as operation_v1, + pipeline_failure_policy as pipeline_failure_policy_v1, pipeline_job as pipeline_job_v1, pipeline_service as pipeline_service_v1, pipeline_state as pipeline_state_v1, @@ -185,6 +187,7 @@ model_evaluation_slice_v1, model_service_v1, operation_v1, + pipeline_failure_policy_v1beta1, pipeline_job_v1, pipeline_service_v1, pipeline_state_v1, @@ -245,6 +248,7 @@ model_evaluation_slice_v1beta1, model_service_v1beta1, operation_v1beta1, + pipeline_failure_policy_v1beta1, pipeline_job_v1beta1, pipeline_service_v1beta1, pipeline_state_v1beta1, diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py index bc50a47aa2..671128d543 100644 --- a/google/cloud/aiplatform/pipeline_jobs.py +++ b/google/cloud/aiplatform/pipeline_jobs.py @@ -102,6 +102,7 @@ def __init__( credentials: Optional[auth_credentials.Credentials] = None, project: Optional[str] = None, location: Optional[str] = None, + failure_policy: Optional[str] = None, ): """Retrieves a PipelineJob resource and instantiates its representation. @@ -155,6 +156,15 @@ def __init__( location (str): Optional. Location to create PipelineJob. If not set, location set in aiplatform.init will be used. + failure_policy (str): + Optional. The failure policy - "slow" or "fast". 
+ Currently, the default of a pipeline is that the pipeline will continue to + run until no more tasks can be executed, also known as + PIPELINE_FAILURE_POLICY_FAIL_SLOW (corresponds to "slow"). + However, if a pipeline is set to + PIPELINE_FAILURE_POLICY_FAIL_FAST (corresponds to "fast"), + it will stop scheduling any new tasks when a task has failed. Any + scheduled tasks will continue to completion. Raises: ValueError: If job_id or labels have incorrect format. @@ -201,6 +211,7 @@ def __init__( ) builder.update_pipeline_root(pipeline_root) builder.update_runtime_parameters(parameter_values) + builder.update_failure_policy(failure_policy) runtime_config_dict = builder.build() runtime_config = gca_pipeline_job.PipelineJob.RuntimeConfig()._pb diff --git a/google/cloud/aiplatform/utils/pipeline_utils.py b/google/cloud/aiplatform/utils/pipeline_utils.py index 7ea56264d7..9d2a9b9a5c 100644 --- a/google/cloud/aiplatform/utils/pipeline_utils.py +++ b/google/cloud/aiplatform/utils/pipeline_utils.py @@ -17,6 +17,7 @@ import copy import json from typing import Any, Dict, Mapping, Optional, Union +from google.cloud.aiplatform.compat.types import pipeline_failure_policy import packaging.version @@ -32,6 +33,7 @@ def __init__( schema_version: str, parameter_types: Mapping[str, str], parameter_values: Optional[Dict[str, Any]] = None, + failure_policy: Optional[str] = None, ): """Creates a PipelineRuntimeConfigBuilder object. @@ -44,11 +46,20 @@ def __init__( Required. The mapping from pipeline parameter name to its type. parameter_values (Dict[str, Any]): Optional. The mapping from runtime parameter name to its value. + failure_policy (pipeline_failure_policy.PipelineFailurePolicy): + Optional. Represents the failure policy of a pipeline. Currently, the + default of a pipeline is that the pipeline will continue to + run until no more tasks can be executed, also known as + PIPELINE_FAILURE_POLICY_FAIL_SLOW. However, if a pipeline is + set to PIPELINE_FAILURE_POLICY_FAIL_FAST, it will stop + scheduling any new tasks when a task has failed. Any + scheduled tasks will continue to completion. """ self._pipeline_root = pipeline_root self._schema_version = schema_version self._parameter_types = parameter_types self._parameter_values = copy.deepcopy(parameter_values or {}) + self._failure_policy = failure_policy @classmethod def from_job_spec_json( @@ -80,7 +91,8 @@ def from_job_spec_json( pipeline_root = runtime_config_spec.get("gcsOutputDirectory") parameter_values = _parse_runtime_parameters(runtime_config_spec) - return cls(pipeline_root, schema_version, parameter_types, parameter_values) + failure_policy = runtime_config_spec.get("failurePolicy") + return cls(pipeline_root, schema_version, parameter_types, parameter_values, failure_policy) def update_pipeline_root(self, pipeline_root: Optional[str]) -> None: """Updates pipeline_root value. @@ -111,6 +123,16 @@ def update_runtime_parameters( parameters[k] = json.dumps(v) self._parameter_values.update(parameters) + def update_failure_policy(self, failure_policy: Optional[str] = None) -> None: + """Merges runtime failure policy. + + Args: + failure_policy (str): + Optional. The failure policy - "slow" or "fast". + """ + if failure_policy: + self._failure_policy = _FAILURE_POLICY_TO_ENUM_VALUE[failure_policy] + def build(self) -> Dict[str, Any]: """Build a RuntimeConfig proto. 
@@ -128,7 +150,8 @@ def build(self) -> Dict[str, Any]: parameter_values_key = "parameterValues" else: parameter_values_key = "parameters" - return { + + runtime_config = { "gcsOutputDirectory": self._pipeline_root, parameter_values_key: { k: self._get_vertex_value(k, v) @@ -137,6 +160,11 @@ def build(self) -> Dict[str, Any]: }, } + if self._failure_policy: + runtime_config["failurePolicy"]: self._failure_policy + + return runtime_config + def _get_vertex_value( self, name: str, value: Union[int, float, str, bool, list, dict] ) -> Union[int, float, str, bool, list, dict]: @@ -205,3 +233,9 @@ def _parse_runtime_parameters( else: raise TypeError("Got unknown type of value: {}".format(value)) return result + +_FAILURE_POLICY_TO_ENUM_VALUE = { + "slow": pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_SLOW, + "fast": pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_FAST, + None: pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_UNSPECIFIED, +} From 438eeee95be5a229b536bd0f52939f8c69056615 Mon Sep 17 00:00:00 2001 From: You Quan Chong Date: Wed, 22 Jun 2022 09:24:04 -0700 Subject: [PATCH 2/3] add tests --- .../cloud/aiplatform/utils/pipeline_utils.py | 13 ++- tests/unit/aiplatform/test_pipeline_jobs.py | 94 +++++++++++++++++++ tests/unit/aiplatform/test_utils.py | 20 +++- 3 files changed, 123 insertions(+), 4 deletions(-) diff --git a/google/cloud/aiplatform/utils/pipeline_utils.py b/google/cloud/aiplatform/utils/pipeline_utils.py index 9d2a9b9a5c..54b4e508b2 100644 --- a/google/cloud/aiplatform/utils/pipeline_utils.py +++ b/google/cloud/aiplatform/utils/pipeline_utils.py @@ -33,7 +33,7 @@ def __init__( schema_version: str, parameter_types: Mapping[str, str], parameter_values: Optional[Dict[str, Any]] = None, - failure_policy: Optional[str] = None, + failure_policy: Optional[pipeline_failure_policy.PipelineFailurePolicy] = None, ): """Creates a PipelineRuntimeConfigBuilder object. @@ -92,7 +92,13 @@ def from_job_spec_json( pipeline_root = runtime_config_spec.get("gcsOutputDirectory") parameter_values = _parse_runtime_parameters(runtime_config_spec) failure_policy = runtime_config_spec.get("failurePolicy") - return cls(pipeline_root, schema_version, parameter_types, parameter_values, failure_policy) + return cls( + pipeline_root, + schema_version, + parameter_types, + parameter_values, + failure_policy, + ) def update_pipeline_root(self, pipeline_root: Optional[str]) -> None: """Updates pipeline_root value. 
@@ -161,7 +167,7 @@ def build(self) -> Dict[str, Any]: } if self._failure_policy: - runtime_config["failurePolicy"]: self._failure_policy + runtime_config["failurePolicy"] = self._failure_policy return runtime_config @@ -234,6 +240,7 @@ def _parse_runtime_parameters( raise TypeError("Got unknown type of value: {}".format(value)) return result + _FAILURE_POLICY_TO_ENUM_VALUE = { "slow": pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_SLOW, "fast": pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_FAST, diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index 78fa4d926a..5f6b24fd09 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -30,6 +30,7 @@ from google.cloud.aiplatform import base from google.cloud.aiplatform import initializer from google.cloud.aiplatform import pipeline_jobs +from google.cloud.aiplatform.compat.types import pipeline_failure_policy from google.cloud import storage from google.protobuf import json_format @@ -621,6 +622,99 @@ def test_run_call_pipeline_service_create_with_timeout_not_explicitly_set( timeout=None, ) + @pytest.mark.parametrize( + "job_spec", + [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB], + ) + @pytest.mark.parametrize( + "failure_policy", + [ + ( + "slow", + pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_SLOW, + ), + ( + "fast", + pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_FAST, + ), + ], + ) + @pytest.mark.parametrize("sync", [True, False]) + def test_run_call_pipeline_service_create_with_failure_policy( + self, + mock_pipeline_service_create, + mock_pipeline_service_get, + job_spec, + mock_load_yaml_and_json, + failure_policy, + sync, + ): + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_GCS_BUCKET_NAME, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + ) + + job = pipeline_jobs.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + template_path=_TEST_TEMPLATE_PATH, + job_id=_TEST_PIPELINE_JOB_ID, + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES, + enable_caching=True, + failure_policy=failure_policy[0], + ) + + job.run( + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + sync=sync, + create_request_timeout=None, + ) + + if not sync: + job.wait() + + expected_runtime_config_dict = { + "gcsOutputDirectory": _TEST_GCS_BUCKET_NAME, + "parameterValues": _TEST_PIPELINE_PARAMETER_VALUES, + "failurePolicy": failure_policy[1], + } + runtime_config = gca_pipeline_job.PipelineJob.RuntimeConfig()._pb + json_format.ParseDict(expected_runtime_config_dict, runtime_config) + + job_spec = yaml.safe_load(job_spec) + pipeline_spec = job_spec.get("pipelineSpec") or job_spec + + # Construct expected request + expected_gapic_pipeline_job = gca_pipeline_job.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + pipeline_spec={ + "components": {}, + "pipelineInfo": pipeline_spec["pipelineInfo"], + "root": pipeline_spec["root"], + "schemaVersion": "2.1.0", + }, + runtime_config=runtime_config, + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + ) + + mock_pipeline_service_create.assert_called_once_with( + parent=_TEST_PARENT, + pipeline_job=expected_gapic_pipeline_job, + pipeline_job_id=_TEST_PIPELINE_JOB_ID, + timeout=None, + ) + + mock_pipeline_service_get.assert_called_with( + name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY + ) + + assert 
job._gca_resource == make_pipeline_job( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + @pytest.mark.parametrize( "job_spec", [ diff --git a/tests/unit/aiplatform/test_utils.py b/tests/unit/aiplatform/test_utils.py index 6d59c16dc3..3020a29abc 100644 --- a/tests/unit/aiplatform/test_utils.py +++ b/tests/unit/aiplatform/test_utils.py @@ -28,6 +28,7 @@ from google.api_core import client_options, gapic_v1 from google.cloud import aiplatform from google.cloud.aiplatform import compat, utils +from google.cloud.aiplatform.compat.types import pipeline_failure_policy from google.cloud.aiplatform.utils import pipeline_utils, tensorboard_utils, yaml_utils from google.cloud.aiplatform_v1.services.model_service import ( client as model_service_client_v1, @@ -454,7 +455,22 @@ def test_pipeline_utils_runtime_config_builder_with_no_op_updates(self): expected_runtime_config = self.SAMPLE_JOB_SPEC["runtimeConfig"] assert expected_runtime_config == actual_runtime_config - def test_pipeline_utils_runtime_config_builder_with_merge_updates(self): + @pytest.mark.parametrize( + "failure_policy", + [ + ( + "slow", + pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_SLOW, + ), + ( + "fast", + pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_FAST, + ), + ], + ) + def test_pipeline_utils_runtime_config_builder_with_merge_updates( + self, failure_policy + ): my_builder = pipeline_utils.PipelineRuntimeConfigBuilder.from_job_spec_json( self.SAMPLE_JOB_SPEC ) @@ -468,6 +484,7 @@ def test_pipeline_utils_runtime_config_builder_with_merge_updates(self): "bool_param": True, } ) + my_builder.update_failure_policy(failure_policy[0]) actual_runtime_config = my_builder.build() expected_runtime_config = { @@ -481,6 +498,7 @@ def test_pipeline_utils_runtime_config_builder_with_merge_updates(self): "list_param": {"stringValue": "[1, 2, 3]"}, "bool_param": {"stringValue": "true"}, }, + "failurePolicy": failure_policy[1], } assert expected_runtime_config == actual_runtime_config From 1a0dec0d125365948a60f481c0b919f95f53f8d2 Mon Sep 17 00:00:00 2001 From: You Quan Chong Date: Thu, 23 Jun 2022 09:31:46 -0700 Subject: [PATCH 3/3] raise valueerror if failure_policy is invalid --- google/cloud/aiplatform/utils/pipeline_utils.py | 10 +++++++++- tests/unit/aiplatform/test_utils.py | 11 +++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/utils/pipeline_utils.py b/google/cloud/aiplatform/utils/pipeline_utils.py index 54b4e508b2..f988cc307e 100644 --- a/google/cloud/aiplatform/utils/pipeline_utils.py +++ b/google/cloud/aiplatform/utils/pipeline_utils.py @@ -135,9 +135,17 @@ def update_failure_policy(self, failure_policy: Optional[str] = None) -> None: Args: failure_policy (str): Optional. The failure policy - "slow" or "fast". + + Raises: + ValueError: if failure_policy is not valid. """ if failure_policy: - self._failure_policy = _FAILURE_POLICY_TO_ENUM_VALUE[failure_policy] + if failure_policy in _FAILURE_POLICY_TO_ENUM_VALUE: + self._failure_policy = _FAILURE_POLICY_TO_ENUM_VALUE[failure_policy] + else: + raise ValueError( + f'failure_policy should be either "slow" or "fast", but got: "{failure_policy}".' + ) def build(self) -> Dict[str, Any]: """Build a RuntimeConfig proto. 
diff --git a/tests/unit/aiplatform/test_utils.py b/tests/unit/aiplatform/test_utils.py index 3020a29abc..6651990c5a 100644 --- a/tests/unit/aiplatform/test_utils.py +++ b/tests/unit/aiplatform/test_utils.py @@ -502,6 +502,17 @@ def test_pipeline_utils_runtime_config_builder_with_merge_updates( } assert expected_runtime_config == actual_runtime_config + def test_pipeline_utils_runtime_config_builder_invalid_failure_policy(self): + my_builder = pipeline_utils.PipelineRuntimeConfigBuilder.from_job_spec_json( + self.SAMPLE_JOB_SPEC + ) + with pytest.raises(ValueError) as e: + my_builder.update_failure_policy("slo") + + assert e.match( + regexp=r'failure_policy should be either "slow" or "fast", but got: "slo".' + ) + def test_pipeline_utils_runtime_config_builder_parameter_not_found(self): my_builder = pipeline_utils.PipelineRuntimeConfigBuilder.from_job_spec_json( self.SAMPLE_JOB_SPEC
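
Reviewer note (not part of the patch): a minimal usage sketch of the new failure_policy parameter, mirroring how the unit tests construct the job. The project, bucket, template path, and parameter names below are placeholders for illustration only and are not taken from this change.

    from google.cloud import aiplatform

    # Hypothetical project/location/bucket values, for illustration only.
    aiplatform.init(
        project="my-project",
        location="us-central1",
        staging_bucket="gs://my-bucket",
    )

    job = aiplatform.PipelineJob(
        display_name="my-pipeline",                    # placeholder display name
        template_path="gs://my-bucket/pipeline.json",  # placeholder compiled pipeline spec
        parameter_values={"string_param": "hello"},    # placeholder runtime parameter
        enable_caching=True,
        # "fast" maps to PIPELINE_FAILURE_POLICY_FAIL_FAST; "slow" (the default
        # behavior) maps to PIPELINE_FAILURE_POLICY_FAIL_SLOW.
        failure_policy="fast",
    )
    job.run()

Internally, the string is mapped through _FAILURE_POLICY_TO_ENUM_VALUE to the PipelineFailurePolicy enum and emitted as "failurePolicy" in the runtime config dict returned by PipelineRuntimeConfigBuilder.build(); any value other than "slow" or "fast" raises ValueError (see PATCH 3/3).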