diff --git a/integ/test_codegen.py b/integ/test_codegen.py index 34e449b..8cfb426 100644 --- a/integ/test_codegen.py +++ b/integ/test_codegen.py @@ -42,6 +42,7 @@ iris_df = iris_df[["target"] + [col for col in iris_df.columns if col != "target"]] train_data, test_data = train_test_split(iris_df, test_size=0.2, random_state=42) train_data.to_csv("./data/train.csv", index=False, header=False) +test_data_no_target = test_data.drop("target", axis=1) # Upload Data prefix = "DEMO-scikit-iris" @@ -149,6 +150,27 @@ def test_training_and_inference(self): ) endpoint.wait_for_status("InService") + invoke_result = endpoint.invoke( + body=test_data_no_target.to_csv(header=False, index=False), + content_type="text/csv", + accept="text/csv", + ) + + assert invoke_result.body + + invoke_result = endpoint.invoke_with_response_stream( + test_data_no_target.to_csv(header=False, index=False), + content_type="text/csv", + accept="application/csv", + ) + + def deserialise(response): + return [res_part for res_part in response["Body"]] + + deserialised_response = deserialise(invoke_result) + assert len(deserialised_response) > 0 + assert deserialised_response[0]["PayloadPart"] + def test_intelligent_defaults(self): os.environ["SAGEMAKER_CORE_ADMIN_CONFIG_OVERRIDE"] = ( self._setup_intelligent_default_configs_and_fetch_path() diff --git a/integ/test_experiment_and_trial.py b/integ/test_experiment_and_trial.py new file mode 100644 index 0000000..45fb887 --- /dev/null +++ b/integ/test_experiment_and_trial.py @@ -0,0 +1,78 @@ +import datetime +import time +import unittest + +from sagemaker_core.helper.session_helper import Session, get_execution_role +from sagemaker_core.main.resources import Experiment, Trial, TrialComponent +from sagemaker_core.main.shapes import RawMetricData, TrialComponentParameterValue +from sagemaker_core.main.utils import get_textual_rich_logger + +logger = get_textual_rich_logger(__name__) + +sagemaker_session = Session() +region = sagemaker_session.boto_region_name +role = get_execution_role() +bucket = sagemaker_session.default_bucket() + + +class TestExperimentAndTrial(unittest.TestCase): + def test_experiment_and_trial(self): + experiment_name = "local-pyspark-experiment-example-" + time.strftime( + "%Y-%m-%d-%H-%M-%S", time.gmtime() + ) + run_group_name = "Default-Run-Group-" + experiment_name + run_name = "local-experiment-run-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime()) + + experiment = Experiment.create(experiment_name=experiment_name) + trial = Trial.create(trial_name=run_group_name, experiment_name=experiment_name) + + created_after = datetime.datetime.now() - datetime.timedelta(days=5) + experiments_iterator = Experiment.get_all(created_after=created_after) + experiments = [exp.experiment_name for exp in experiments_iterator] + + assert len(experiments) > 0 + assert experiment.experiment_name in experiments + + trial_component_parameters = { + "num_train_samples": TrialComponentParameterValue(number_value=5), + "num_test_samples": TrialComponentParameterValue(number_value=5), + } + + trial_component = TrialComponent.create( + trial_component_name=run_name, + parameters=trial_component_parameters, + ) + trial_component.associate_trail(trial_name=trial.trial_name) + + training_parameters = { + "device": TrialComponentParameterValue(string_value="cpu"), + "data_dir": TrialComponentParameterValue(string_value="test"), + "optimizer": TrialComponentParameterValue(string_value="sgd"), + "epochs": TrialComponentParameterValue(number_value=5), + "hidden_channels": TrialComponentParameterValue(number_value=10), + } + trial_component.update(parameters=training_parameters) + + metrics = [] + for i in range(5): + accuracy_metric = RawMetricData( + metric_name="test:accuracy", + value=i / 10, + step=i, + timestamp=time.time(), + ) + metrics.append(accuracy_metric) + + trial_component.batch_put_metrics(metric_data=metrics) + + time.sleep(10) + trial_component.refresh() + + assert len(trial_component.parameters) == 7 + assert len(trial_component.metrics) == 1 + assert trial_component.metrics[0].count == 5 + + trial_component.disassociate_trail(trial_name=trial.trial_name) + trial_component.delete() + trial.delete() + experiment.delete() diff --git a/src/sagemaker_core/main/code_injection/constants.py b/src/sagemaker_core/main/code_injection/constants.py index bc529bc..a187480 100644 --- a/src/sagemaker_core/main/code_injection/constants.py +++ b/src/sagemaker_core/main/code_injection/constants.py @@ -13,7 +13,7 @@ """Constants used in the code_injection modules.""" from enum import Enum -BASIC_TYPES = ["string", "boolean", "integer", "long", "double", "timestamp", "float"] +BASIC_TYPES = ["string", "boolean", "integer", "long", "double", "timestamp", "float", "blob"] STRUCTURE_TYPE = "structure" MAP_TYPE = "map" LIST_TYPE = "list" diff --git a/src/sagemaker_core/main/resources.py b/src/sagemaker_core/main/resources.py index 29461b9..088e5e3 100644 --- a/src/sagemaker_core/main/resources.py +++ b/src/sagemaker_core/main/resources.py @@ -8389,7 +8389,7 @@ def invoke_with_response_stream( inference_component_name: Optional[str] = Unassigned(), session: Optional[Session] = None, region: Optional[str] = None, - ) -> Optional[InvokeEndpointWithResponseStreamOutput]: + ) -> Optional[object]: """ Invokes a model at the specified endpoint to return the inference response as a stream. @@ -8406,7 +8406,7 @@ def invoke_with_response_stream( region: Region name. Returns: - InvokeEndpointWithResponseStreamOutput + object Raises: botocore.exceptions.ClientError: This exception is raised for AWS service related errors. @@ -8449,8 +8449,7 @@ def invoke_with_response_stream( response = client.invoke_endpoint_with_response_stream(**operation_input_args) logger.debug(f"Response: {response}") - transformed_response = transform(response, "InvokeEndpointWithResponseStreamOutput") - return InvokeEndpointWithResponseStreamOutput(**transformed_response) + return response class EndpointConfig(Base): diff --git a/src/sagemaker_core/main/shapes.py b/src/sagemaker_core/main/shapes.py index c48534c..1e6766d 100644 --- a/src/sagemaker_core/main/shapes.py +++ b/src/sagemaker_core/main/shapes.py @@ -139,24 +139,6 @@ class ResponseStream(Base): internal_stream_failure: Optional[InternalStreamFailure] = Unassigned() -class InvokeEndpointWithResponseStreamOutput(Base): - """ - InvokeEndpointWithResponseStreamOutput - - Attributes - ---------------------- - body - content_type: The MIME type of the inference returned from the model container. - invoked_production_variant: Identifies the production variant that was invoked. - custom_attributes: Provides additional information in the response about the inference returned by a model hosted at an Amazon SageMaker endpoint. The information is an opaque value that is forwarded verbatim. You could use this value, for example, to return an ID received in the CustomAttributes header of a request or other metadata that a service endpoint was programmed to produce. The value must consist of no more than 1024 visible US-ASCII characters as specified in Section 3.3.6. Field Value Components of the Hypertext Transfer Protocol (HTTP/1.1). If the customer wants the custom attribute returned, the model must set the custom attribute to be included on the way back. The code in your model is responsible for setting or updating any custom attributes in the response. If your code does not set this value in the response, an empty value is returned. For example, if a custom attribute represents the trace ID, your model can prepend the custom attribute with Trace ID: in your post-processing function. This feature is currently supported in the Amazon Web Services SDKs but not in the Amazon SageMaker Python SDK. - """ - - body: ResponseStream - content_type: Optional[str] = Unassigned() - invoked_production_variant: Optional[str] = Unassigned() - custom_attributes: Optional[str] = Unassigned() - - class ModelError(Base): """ ModelError diff --git a/src/sagemaker_core/main/utils.py b/src/sagemaker_core/main/utils.py index 6db1d49..e26ade1 100644 --- a/src/sagemaker_core/main/utils.py +++ b/src/sagemaker_core/main/utils.py @@ -500,7 +500,7 @@ def _serialize_dict(value: Dict) -> dict: """ serialized_dict = {} for k, v in value.items(): - if serialize_result := serialize(v): + if (serialize_result := serialize(v)) is not None: serialized_dict.update({k: serialize_result}) return serialized_dict @@ -517,7 +517,7 @@ def _serialize_list(value: List) -> list: """ serialized_list = [] for v in value: - if serialize_result := serialize(v): + if (serialize_result := serialize(v)) is not None: serialized_list.append(serialize_result) return serialized_list @@ -534,7 +534,7 @@ def _serialize_shape(value: Any) -> dict: """ serialized_dict = {} for k, v in vars(value).items(): - if serialize_result := serialize(v): + if (serialize_result := serialize(v)) is not None: key = snake_to_pascal(k) if is_snake_case(k) else k serialized_dict.update({key[0].upper() + key[1:]: serialize_result}) return serialized_dict diff --git a/src/sagemaker_core/tools/additional_operations.json b/src/sagemaker_core/tools/additional_operations.json index b910921..e7e795b 100644 --- a/src/sagemaker_core/tools/additional_operations.json +++ b/src/sagemaker_core/tools/additional_operations.json @@ -130,7 +130,7 @@ "operation_name": "InvokeEndpointWithResponseStream", "resource_name": "Endpoint", "method_name": "invoke_with_response_stream", - "return_type": "InvokeEndpointWithResponseStreamOutput", + "return_type": "object", "method_type": "object", "service_name": "sagemaker-runtime" } diff --git a/src/sagemaker_core/tools/resources_codegen.py b/src/sagemaker_core/tools/resources_codegen.py index 96cfc91..386eeb3 100644 --- a/src/sagemaker_core/tools/resources_codegen.py +++ b/src/sagemaker_core/tools/resources_codegen.py @@ -55,6 +55,7 @@ REFRESH_METHOD_TEMPLATE, RESOURCE_BASE_CLASS_TEMPLATE, RETURN_ITERATOR_TEMPLATE, + RETURN_WITHOUT_DESERIALIZATION_TEMPLATE, SERIALIZE_INPUT_TEMPLATE, STOP_METHOD_TEMPLATE, DELETE_METHOD_TEMPLATE, @@ -1373,6 +1374,11 @@ def generate_method(self, method: Method, resource_attributes: list): return_type = f"Optional[{method.return_type}]" deserialize_response = DESERIALIZE_RESPONSE_TO_BASIC_TYPE_TEMPLATE return_string = f"Returns:\n" f" {method.return_type}\n" + elif method.return_type == "object": + # if the return type is object, return the response without deserialization + return_type = f"Optional[{method.return_type}]" + deserialize_response = RETURN_WITHOUT_DESERIALIZATION_TEMPLATE + return_string = f"Returns:\n" f" {method.return_type}\n" else: if method.return_type == "cls": return_type = f'Optional["{method.resource_name}"]' diff --git a/src/sagemaker_core/tools/templates.py b/src/sagemaker_core/tools/templates.py index 8909d79..fbcbde4 100644 --- a/src/sagemaker_core/tools/templates.py +++ b/src/sagemaker_core/tools/templates.py @@ -553,6 +553,9 @@ def {method_name}( DESERIALIZE_RESPONSE_TO_BASIC_TYPE_TEMPLATE = """ return list(response.values())[0]""" +RETURN_WITHOUT_DESERIALIZATION_TEMPLATE = """ + return response""" + RETURN_ITERATOR_TEMPLATE = """ return ResourceIterator( {resource_iterator_args} diff --git a/tst/generated/test_resources.py b/tst/generated/test_resources.py index 90b14d4..8b92a1b 100644 --- a/tst/generated/test_resources.py +++ b/tst/generated/test_resources.py @@ -295,6 +295,8 @@ def test_resources(self, session, mock_transform): operation_info["return_type"] ] } + elif operation_info["return_type"] == "object": + return_value = {"return_value": None} else: return_cls = self.SHAPE_CLASSES_BY_SHAPE_NAME[ operation_info["return_type"] @@ -378,8 +380,8 @@ def _convert_dict_keys_into_pascal_case(self, input_args: dict): return converted def _convert_to_pascal(self, string: str): - if string == "auto_ml_job_name": - return "AutoMLJobName" + if string.startswith("auto_ml_"): + return "AutoML" + snake_to_pascal(string[7:]) return snake_to_pascal(string) def _get_required_parameters_for_function(self, func) -> dict: diff --git a/tst/tools/test_resources_codegen.py b/tst/tools/test_resources_codegen.py index bb56033..1ac3b56 100644 --- a/tst/tools/test_resources_codegen.py +++ b/tst/tools/test_resources_codegen.py @@ -873,7 +873,7 @@ def invoke_with_response_stream( inference_component_name: Optional[str] = Unassigned(), session: Optional[Session] = None, region: Optional[str] = None, -) -> Optional[InvokeEndpointWithResponseStreamOutput]: +) -> Optional[object]: """ Invokes a model at the specified endpoint to return the inference response as a stream. @@ -890,7 +890,7 @@ def invoke_with_response_stream( region: Region name. Returns: - InvokeEndpointWithResponseStreamOutput + object Raises: botocore.exceptions.ClientError: This exception is raised for AWS service related errors. @@ -932,15 +932,14 @@ def invoke_with_response_stream( response = client.invoke_endpoint_with_response_stream(**operation_input_args) logger.debug(f"Response: {response}") - transformed_response = transform(response, 'InvokeEndpointWithResponseStreamOutput') - return InvokeEndpointWithResponseStreamOutput(**transformed_response) + return response ''' method = Method( **{ "operation_name": "InvokeEndpointWithResponseStream", "resource_name": "Endpoint", "method_name": "invoke_with_response_stream", - "return_type": "InvokeEndpointWithResponseStreamOutput", + "return_type": "object", "method_type": "object", "service_name": "sagemaker-runtime", }