From 167af9c3c9deb3b40d3b01b920329f7ff667293e Mon Sep 17 00:00:00 2001 From: Yi Xu Date: Wed, 8 Feb 2023 16:08:33 -0800 Subject: [PATCH 1/5] wip - add v1 functions for bundle and endpoint crud --- launch/client.py | 949 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 762 insertions(+), 187 deletions(-) diff --git a/launch/client.py b/launch/client.py index d9cb7897..332870b7 100644 --- a/launch/client.py +++ b/launch/client.py @@ -25,12 +25,16 @@ from launch.api_client.model.create_model_bundle_request import ( CreateModelBundleRequest, ) +from launch.api_client.model.create_model_bundle_response import CreateModelBundleResponse from launch.api_client.model.create_model_endpoint_request import ( CreateModelEndpointRequest, ) +from launch.api_client.model.create_model_endpoint_response import CreateModelEndpointResponse +from launch.api_client.model.delete_model_endpoint_response import DeleteModelEndpointResponse from launch.api_client.model.endpoint_predict_request import ( EndpointPredictRequest, ) +from launch.api_client.model.get_model_endpoint_response import GetModelEndpointResponse from launch.api_client.model.gpu_type import GpuType from launch.api_client.model.model_bundle_environment_params import ( ModelBundleEnvironmentParams, @@ -43,6 +47,7 @@ from launch.api_client.model.update_model_endpoint_request import ( UpdateModelEndpointRequest, ) +from launch.api_client.model.update_model_endpoint_response import UpdateModelEndpointResponse from launch.connection import Connection from launch.constants import ( BATCH_TASK_INPUT_SIGNED_URL_PATH, @@ -344,74 +349,18 @@ def create_model_bundle_from_dirs( bundle. This is used to validate the response for the model bundle's endpoint. Note: If request_schema is specified, then response_schema must also be specified. """ - with open(requirements_path, "r", encoding="utf-8") as req_f: - requirements = req_f.read().splitlines() - - tmpdir = tempfile.mkdtemp() - try: - zip_path = os.path.join(tmpdir, "bundle.zip") - _zip_directories(zip_path, base_paths) - with open(zip_path, "rb") as zip_f: - data = zip_f.read() - finally: - shutil.rmtree(tmpdir) - - raw_bundle_url = self._upload_data(data) - - schema_location = None - if bool(request_schema) ^ bool(response_schema): - raise ValueError( - "If request_schema is specified, then response_schema must also be specified." 
- ) - if request_schema is not None and response_schema is not None: - model_definitions = get_model_definitions( - request_schema=request_schema, - response_schema=response_schema, - ) - model_definitions_encoded = json.dumps(model_definitions).encode() - schema_location = self._upload_data(model_definitions_encoded) - - bundle_metadata = { - "load_predict_fn_module_path": load_predict_fn_module_path, - "load_model_fn_module_path": load_model_fn_module_path, - } - - logger.info( - "create_model_bundle_from_dirs: raw_bundle_url=%s", - raw_bundle_url, - ) - payload = dict( - packaging_type="zip", - bundle_name=model_bundle_name, - location=raw_bundle_url, - bundle_metadata=bundle_metadata, - requirements=requirements, + self.create_model_bundle_from_dirs_v1( + model_bundle_name=model_bundle_name, + base_paths=base_paths, + requirements_path=requirements_path, env_params=env_params, - schema_location=schema_location, + load_predict_fn_module_path=load_predict_fn_module_path, + load_model_fn_module_path=load_model_fn_module_path, + app_config=app_config, + request_schema=request_schema, + response_schema=response_schema, ) - _add_app_config_to_bundle_create_payload(payload, app_config) - with ApiClient(self.configuration) as api_client: - api_instance = DefaultApi(api_client) - framework = ModelBundleFramework(env_params["framework_type"]) - env_params_copy = env_params.copy() - env_params_copy["framework_type"] = framework # type: ignore - env_params_obj = ModelBundleEnvironmentParams(**env_params_copy) # type: ignore - payload = dict_not_none( - env_params=env_params_obj, - location=raw_bundle_url, - name=model_bundle_name, - requirements=requirements, - packaging_type=ModelBundlePackagingType("zip"), - metadata=bundle_metadata, - app_config=payload.get("app_config"), - schema_location=schema_location, - ) - create_model_bundle_request = CreateModelBundleRequest(**payload) # type: ignore - api_instance.create_model_bundle_v1_model_bundles_post( - body=create_model_bundle_request, - skip_deserialization=True, - ) return ModelBundle(model_bundle_name) def create_model_bundle( # pylint: disable=too-many-statements @@ -515,129 +464,32 @@ def create_model_bundle( # pylint: disable=too-many-statements bundle. This is used to validate the response for the model bundle's endpoint. Note: If request_schema is specified, then response_schema must also be specified. """ - # TODO(ivan): remove `disable=too-many-branches` when get rid of `load_*` functions # pylint: disable=too-many-branches - check_args = [ - predict_fn_or_cls is not None, - load_predict_fn is not None and model is not None, - load_predict_fn is not None and load_model_fn is not None, - ] - - if sum(check_args) != 1: - raise ValueError( - "A model bundle consists of exactly {predict_fn_or_cls}, {load_predict_fn + model}, or {load_predict_fn + load_model_fn}." - ) - # TODO should we try to catch when people intentionally pass both model and load_model_fn as None? - - if requirements is None: - # TODO explore: does globals() actually work as expected? Should we use globals_copy instead? 
- requirements_inferred = find_packages_from_imports(globals()) - requirements = [ - f"{key}=={value}" - for key, value in requirements_inferred.items() - ] - logger.info( - "Using \n%s\n for model bundle %s", - requirements, - model_bundle_name, - ) - - # Prepare cloudpickle for external imports - if globals_copy: - for module in get_imports(globals_copy): - if module.__name__ == cloudpickle.__name__: - # Avoid recursion - # register_pickle_by_value does not work properly with itself - continue - cloudpickle.register_pickle_by_value(module) - - bundle: Union[ - Callable[[Any], Any], Dict[str, Any], None - ] # validate bundle - bundle_metadata = {} - # Create bundle - if predict_fn_or_cls: - bundle = predict_fn_or_cls - if inspect.isfunction(predict_fn_or_cls): - source_code = inspect.getsource(predict_fn_or_cls) - else: - source_code = inspect.getsource(predict_fn_or_cls.__class__) - bundle_metadata["predict_fn_or_cls"] = source_code - elif model is not None: - bundle = dict(model=model, load_predict_fn=load_predict_fn) - bundle_metadata["load_predict_fn"] = inspect.getsource( - load_predict_fn # type: ignore - ) - else: - bundle = dict( - load_model_fn=load_model_fn, load_predict_fn=load_predict_fn - ) - bundle_metadata["load_predict_fn"] = inspect.getsource( - load_predict_fn # type: ignore - ) - bundle_metadata["load_model_fn"] = inspect.getsource( - load_model_fn # type: ignore - ) - - serialized_bundle = cloudpickle.dumps(bundle) - raw_bundle_url = self._upload_data(data=serialized_bundle) - - schema_location = None - if bool(request_schema) ^ bool(response_schema): - raise ValueError( - "If request_schema is specified, then response_schema must also be specified." - ) - if request_schema is not None and response_schema is not None: - model_definitions = get_model_definitions( - request_schema=request_schema, - response_schema=response_schema, - ) - model_definitions_encoded = json.dumps(model_definitions).encode() - schema_location = self._upload_data(model_definitions_encoded) - - payload = dict( - packaging_type="cloudpickle", - bundle_name=model_bundle_name, - location=raw_bundle_url, - bundle_metadata=bundle_metadata, - requirements=requirements, - env_params=env_params, - schema_location=schema_location, - ) - - _add_app_config_to_bundle_create_payload(payload, app_config) - framework = ModelBundleFramework(env_params["framework_type"]) - env_params_copy = env_params.copy() - env_params_copy["framework_type"] = framework # type: ignore - env_params_obj = ModelBundleEnvironmentParams(**env_params_copy) # type: ignore - with ApiClient(self.configuration) as api_client: - api_instance = DefaultApi(api_client) - payload = dict_not_none( - env_params=env_params_obj, - location=raw_bundle_url, - name=model_bundle_name, - requirements=requirements, - packaging_type=ModelBundlePackagingType("cloudpickle"), - metadata=bundle_metadata, - app_config=app_config, - schema_location=schema_location, - ) - create_model_bundle_request = CreateModelBundleRequest(**payload) # type: ignore - api_instance.create_model_bundle_v1_model_bundles_post( - body=create_model_bundle_request, - skip_deserialization=True, - ) # resp["data"]["name"] should equal model_bundle_name # TODO check that a model bundle was created and no name collisions happened + self.create_model_bundle( + model_bundle_name=model_bundle_name, + env_params=env_params, + load_predict_fn=load_predict_fn, + predict_fn_or_cls=predict_fn_or_cls, + requirements=requirements, + model=model, + load_model_fn=load_model_fn, + 
bundle_url=bundle_url, + app_config=app_config, + globals_copy=globals_copy, + request_schema=request_schema, + response_schema=response_schema, + ) return ModelBundle(model_bundle_name) def create_model_endpoint( self, *, endpoint_name: str, - model_bundle: Union[ModelBundle, str], + model_bundle: Optional[Union[ModelBundle, str]] = None, cpus: int = 3, memory: str = "8Gi", storage: Optional[str] = None, @@ -661,7 +513,10 @@ def create_model_endpoint( endpoint_name: The name of the model endpoint you want to create. The name must be unique across all endpoints that you own. - model_bundle: The ``ModelBundle`` that the endpoint should serve. + model_bundle: (deprecated) The ``ModelBundle`` that the endpoint should serve. Deprecated in favor of + model_bundle_id. + + model_bundle_id: The ID of the ``ModelBundle`` that the endpoint should serve. cpus: Number of cpus each worker should get, e.g. 1, 2, etc. This must be greater than or equal to 1. @@ -725,6 +580,7 @@ def create_model_endpoint( self.edit_model_endpoint( model_endpoint=endpoint_name, model_bundle=model_bundle, + model_bundle_id=model_bundle_id, cpus=cpus, memory=memory, storage=storage, @@ -743,11 +599,23 @@ def create_model_endpoint( logger.info("Creating new endpoint") with ApiClient(self.configuration) as api_client: api_instance = DefaultApi(api_client) - if ( - not isinstance(model_bundle, ModelBundle) - or model_bundle.id is None - ): - model_bundle = self.get_model_bundle(model_bundle) + if bool(model_bundle) ^ bool(model_bundle_id): + raise ValueError("Can only specify one of 'model_bundle' and 'model_bundle_id'") + + if model_bundle_id: + # Need to fetch the ModelBundle object so that we can use it later in this function, + # so as to not break existing things. + else: + if ( + not isinstance(model_bundle, ModelBundle) + or model_bundle.id is None + ): + model_bundle = self.get_model_bundle(model_bundle) + + # By here, we have a ModelBundle regardless, so we can get its .id + model_bundle_id = model_bundle.id + + assert model_bundle_id is not None payload = dict_not_none( cpus=cpus, endpoint_type=ModelEndpointType(endpoint_type), @@ -760,7 +628,7 @@ def create_model_endpoint( memory=memory, metadata={}, min_workers=min_workers, - model_bundle_id=model_bundle.id, + model_bundle_id=model_bundle_id, name=endpoint_name, per_worker=per_worker, post_inference_hooks=post_inference_hooks or [], @@ -992,9 +860,33 @@ def get_model_bundle( resp = json.loads(response.response.data) return ModelBundle.from_dict(resp) # type: ignore + def get_model_bundle_by_id( + self, model_bundle_id: str + ) -> ModelBundle: + """ + Returns a model bundle specified by ``model_bundle_id`` that the user owns. + + Parameters: + model_bundle_id: The bundle ID. 
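+
+        Example (an illustrative sketch; ``client`` is an instantiated Launch client and the bundle ID below is a placeholder):
+
+        .. code-block:: python
+
+            bundle = client.get_model_bundle_by_id("bun_abc123")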
+ + Returns: + A ``ModelBundle`` object + + """ + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + query_params = frozendict({"model_name": bundle_name}) + response = api_instance.get_latest_model_bundle_v1_model_bundles_latest_get( # type: ignore + query_params=query_params, + skip_deserialization=True, + ) + resp = json.loads(response.response.data) + return ModelBundle.from_dict(resp) # type: ignore + + def clone_model_bundle_with_changes( self, - model_bundle: Union[ModelBundle, str], + model_bundle_id: str, app_config: Optional[Dict] = None, ) -> ModelBundle: """ @@ -1008,11 +900,10 @@ def clone_model_bundle_with_changes( A ``ModelBundle`` object """ - bundle_id = _model_bundle_to_id(model_bundle) with ApiClient(self.configuration) as api_client: api_instance = DefaultApi(api_client) payload = dict_not_none( - original_model_bundle_id=bundle_id, + original_model_bundle_id=model_bundle_id, app_config=app_config, ) clone_model_bundle_request = CloneModelBundleRequest(**payload) @@ -1416,6 +1307,690 @@ def get_batch_async_response(self, batch_job_id: str) -> Dict[str, Any]: resp = json.loads(response.response.data) return resp + ### + # V1 functions + ### + def create_model_bundle_from_dirs_v1( + self, + *, + model_bundle_name: str, + base_paths: List[str], + requirements_path: str, + env_params: Dict[str, str], + load_predict_fn_module_path: str, + load_model_fn_module_path: str, + app_config: Optional[Union[Dict[str, Any], str]] = None, + request_schema: Optional[Type[BaseModel]] = None, + response_schema: Optional[Type[BaseModel]] = None, + ) -> Dict[str, Any]: + """ + Packages up code from one or more local filesystem folders and uploads them as a bundle to Scale Launch. + In this mode, a bundle is just local code instead of a serialized object. + + For example, if you have a directory structure like so, and your current working directory is also ``my_root``: + + .. code-block:: text + + my_root/ + my_module1/ + __init__.py + ...files and directories + my_inference_file.py + my_module2/ + __init__.py + ...files and directories + + then calling ``create_model_bundle_from_dirs`` with ``base_paths=["my_module1", "my_module2"]`` essentially + creates a zip file without the root directory, e.g.: + + .. code-block:: text + + my_module1/ + __init__.py + ...files and directories + my_inference_file.py + my_module2/ + __init__.py + ...files and directories + + and these contents will be unzipped relative to the server side application root. Bear these points in mind when + referencing Python module paths for this bundle. For instance, if ``my_inference_file.py`` has ``def f(...)`` + as the desired inference loading function, then the `load_predict_fn_module_path` argument should be + `my_module1.my_inference_file.f`. + + + Parameters: + model_bundle_name: The name of the model bundle you want to create. The name must be unique across all + bundles that you own. + + base_paths: The paths on the local filesystem where the bundle code lives. + + requirements_path: A path on the local filesystem where a ``requirements.txt`` file lives. + + env_params: A dictionary that dictates environment information e.g. + the use of pytorch or tensorflow, which base image tag to use, etc. + Specifically, the dictionary should contain the following keys: + + - ``framework_type``: either ``tensorflow`` or ``pytorch``. + - PyTorch fields: + - ``pytorch_image_tag``: An image tag for the ``pytorch`` docker base image. 
The list of tags + can be found from https://hub.docker.com/r/pytorch/pytorch/tags. + - Example: + + .. code-block:: python + + { + "framework_type": "pytorch", + "pytorch_image_tag": "1.10.0-cuda11.3-cudnn8-runtime" + } + + load_predict_fn_module_path: A python module path for a function that, when called with the output of + load_model_fn_module_path, returns a function that carries out inference. + + load_model_fn_module_path: A python module path for a function that returns a model. The output feeds into + the function located at load_predict_fn_module_path. + + app_config: Either a Dictionary that represents a YAML file contents or a local path to a YAML file. + + request_schema: A pydantic model that represents the request schema for the model + bundle. This is used to validate the request body for the model bundle's endpoint. + + response_schema: A pydantic model that represents the request schema for the model + bundle. This is used to validate the response for the model bundle's endpoint. + Note: If request_schema is specified, then response_schema must also be specified. + """ + with open(requirements_path, "r", encoding="utf-8") as req_f: + requirements = req_f.read().splitlines() + + tmpdir = tempfile.mkdtemp() + try: + zip_path = os.path.join(tmpdir, "bundle.zip") + _zip_directories(zip_path, base_paths) + with open(zip_path, "rb") as zip_f: + data = zip_f.read() + finally: + shutil.rmtree(tmpdir) + + raw_bundle_url = self._upload_data(data) + + schema_location = None + if bool(request_schema) ^ bool(response_schema): + raise ValueError( + "If request_schema is specified, then response_schema must also be specified." + ) + if request_schema is not None and response_schema is not None: + model_definitions = get_model_definitions( + request_schema=request_schema, + response_schema=response_schema, + ) + model_definitions_encoded = json.dumps(model_definitions).encode() + schema_location = self._upload_data(model_definitions_encoded) + + bundle_metadata = { + "load_predict_fn_module_path": load_predict_fn_module_path, + "load_model_fn_module_path": load_model_fn_module_path, + } + + logger.info( + "create_model_bundle_from_dirs: raw_bundle_url=%s", + raw_bundle_url, + ) + payload = dict( + packaging_type="zip", + bundle_name=model_bundle_name, + location=raw_bundle_url, + bundle_metadata=bundle_metadata, + requirements=requirements, + env_params=env_params, + schema_location=schema_location, + ) + _add_app_config_to_bundle_create_payload(payload, app_config) + + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + framework = ModelBundleFramework(env_params["framework_type"]) + env_params_copy = env_params.copy() + env_params_copy["framework_type"] = framework # type: ignore + env_params_obj = ModelBundleEnvironmentParams(**env_params_copy) # type: ignore + payload = dict_not_none( + env_params=env_params_obj, + location=raw_bundle_url, + name=model_bundle_name, + requirements=requirements, + packaging_type=ModelBundlePackagingType("zip"), + metadata=bundle_metadata, + app_config=payload.get("app_config"), + schema_location=schema_location, + ) + create_model_bundle_request = CreateModelBundleRequest(**payload) # type: ignore + return api_instance.create_model_bundle_v1_model_bundles_post( + body=create_model_bundle_request, + skip_deserialization=True, + ) + + def create_model_bundle_v1( + self, + model_bundle_name: str, + env_params: Dict[str, str], + *, + load_predict_fn: Optional[ + Callable[[LaunchModel_T], Callable[[Any], Any]] + ] = 
None, + predict_fn_or_cls: Optional[Callable[[Any], Any]] = None, + requirements: Optional[List[str]] = None, + model: Optional[LaunchModel_T] = None, + load_model_fn: Optional[Callable[[], LaunchModel_T]] = None, + bundle_url: Optional[str] = None, + app_config: Optional[Union[Dict[str, Any], str]] = None, + globals_copy: Optional[Dict[str, Any]] = None, + request_schema: Optional[Type[BaseModel]] = None, + response_schema: Optional[Type[BaseModel]] = None, + ) -> Dict[str, Any]: + """ + Uploads and registers a model bundle to Scale Launch. + + A model bundle consists of exactly one of the following: + + - ``predict_fn_or_cls`` + - ``load_predict_fn + model`` + - ``load_predict_fn + load_model_fn`` + + Pre/post-processing code can be included inside load_predict_fn/model or in predict_fn_or_cls call. + + Parameters: + model_bundle_name: The name of the model bundle you want to create. The name must be unique across all + bundles that you own. + + predict_fn_or_cls: ``Function`` or a ``Callable`` class that runs end-to-end (pre/post processing and model inference) on the call. + i.e. ``predict_fn_or_cls(REQUEST) -> RESPONSE``. + + model: Typically a trained Neural Network, e.g. a Pytorch module. + + Exactly one of ``model`` and ``load_model_fn`` must be provided. + + load_model_fn: A function that, when run, loads a model. This function is essentially a deferred + wrapper around the ``model`` argument. + + Exactly one of ``model`` and ``load_model_fn`` must be provided. + + load_predict_fn: Function that, when called with a model, returns a function that carries out inference. + + If ``model`` is specified, then this is equivalent + to: + ``load_predict_fn(model, app_config=optional_app_config]) -> predict_fn`` + + Otherwise, if ``load_model_fn`` is specified, then this is equivalent + to: + ``load_predict_fn(load_model_fn(), app_config=optional_app_config]) -> predict_fn`` + + In both cases, ``predict_fn`` is then the inference function, i.e.: + ``predict_fn(REQUEST) -> RESPONSE`` + + + requirements: A list of python package requirements, where each list element is of the form + ``==``, e.g. + + ``["tensorflow==2.3.0", "tensorflow-hub==0.11.0"]`` + + If you do not pass in a value for ``requirements``, then you must pass in ``globals()`` for the + ``globals_copy`` argument. + + app_config: Either a Dictionary that represents a YAML file contents or a local path to a YAML file. + + env_params: A dictionary that dictates environment information e.g. + the use of pytorch or tensorflow, which base image tag to use, etc. + Specifically, the dictionary should contain the following keys: + + - ``framework_type``: either ``tensorflow`` or ``pytorch``. + - PyTorch fields: + - ``pytorch_image_tag``: An image tag for the ``pytorch`` docker base image. The list of tags + can be found from https://hub.docker.com/r/pytorch/pytorch/tags. + - Example: + + .. code-block:: python + + { + "framework_type": "pytorch", + "pytorch_image_tag": "1.10.0-cuda11.3-cudnn8-runtime" + } + + - Tensorflow fields: + - ``tensorflow_version``: Version of tensorflow, e.g. ``"2.3.0"``. + + globals_copy: Dictionary of the global symbol table. Normally provided by ``globals()`` built-in function. + + bundle_url: (Only used in self-hosted mode.) The desired location of bundle. + Overrides any value given by ``self.bundle_location_fn`` + + request_schema: A pydantic model that represents the request schema for the model + bundle. This is used to validate the request body for the model bundle's endpoint. 
+ + response_schema: A pydantic model that represents the request schema for the model + bundle. This is used to validate the response for the model bundle's endpoint. + Note: If request_schema is specified, then response_schema must also be specified. + """ + check_args = [ + predict_fn_or_cls is not None, + load_predict_fn is not None and model is not None, + load_predict_fn is not None and load_model_fn is not None, + ] + + if sum(check_args) != 1: + raise ValueError( + "A model bundle consists of exactly {predict_fn_or_cls}, {load_predict_fn + model}, or {load_predict_fn + load_model_fn}." + ) + # TODO should we try to catch when people intentionally pass both model and load_model_fn as None? + + if requirements is None: + # TODO explore: does globals() actually work as expected? Should we use globals_copy instead? + requirements_inferred = find_packages_from_imports(globals()) + requirements = [ + f"{key}=={value}" + for key, value in requirements_inferred.items() + ] + logger.info( + "Using \n%s\n for model bundle %s", + requirements, + model_bundle_name, + ) + + # Prepare cloudpickle for external imports + if globals_copy: + for module in get_imports(globals_copy): + if module.__name__ == cloudpickle.__name__: + # Avoid recursion + # register_pickle_by_value does not work properly with itself + continue + cloudpickle.register_pickle_by_value(module) + + bundle: Union[ + Callable[[Any], Any], Dict[str, Any], None + ] # validate bundle + bundle_metadata = {} + # Create bundle + if predict_fn_or_cls: + bundle = predict_fn_or_cls + if inspect.isfunction(predict_fn_or_cls): + source_code = inspect.getsource(predict_fn_or_cls) + else: + source_code = inspect.getsource(predict_fn_or_cls.__class__) + bundle_metadata["predict_fn_or_cls"] = source_code + elif model is not None: + bundle = dict(model=model, load_predict_fn=load_predict_fn) + bundle_metadata["load_predict_fn"] = inspect.getsource( + load_predict_fn # type: ignore + ) + else: + bundle = dict( + load_model_fn=load_model_fn, load_predict_fn=load_predict_fn + ) + bundle_metadata["load_predict_fn"] = inspect.getsource( + load_predict_fn # type: ignore + ) + bundle_metadata["load_model_fn"] = inspect.getsource( + load_model_fn # type: ignore + ) + + serialized_bundle = cloudpickle.dumps(bundle) + raw_bundle_url = self._upload_data(data=serialized_bundle) + + schema_location = None + if bool(request_schema) ^ bool(response_schema): + raise ValueError( + "If request_schema is specified, then response_schema must also be specified." 
+ ) + if request_schema is not None and response_schema is not None: + model_definitions = get_model_definitions( + request_schema=request_schema, + response_schema=response_schema, + ) + model_definitions_encoded = json.dumps(model_definitions).encode() + schema_location = self._upload_data(model_definitions_encoded) + + payload = dict( + packaging_type="cloudpickle", + bundle_name=model_bundle_name, + location=raw_bundle_url, + bundle_metadata=bundle_metadata, + requirements=requirements, + env_params=env_params, + schema_location=schema_location, + ) + + _add_app_config_to_bundle_create_payload(payload, app_config) + framework = ModelBundleFramework(env_params["framework_type"]) + env_params_copy = env_params.copy() + env_params_copy["framework_type"] = framework # type: ignore + env_params_obj = ModelBundleEnvironmentParams(**env_params_copy) # type: ignore + + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + payload = dict_not_none( + env_params=env_params_obj, + location=raw_bundle_url, + name=model_bundle_name, + requirements=requirements, + packaging_type=ModelBundlePackagingType("cloudpickle"), + metadata=bundle_metadata, + app_config=app_config, + schema_location=schema_location, + ) + create_model_bundle_request = CreateModelBundleRequest(**payload) # type: ignore + response = api_instance.create_model_bundle_v1_model_bundles_post( + body=create_model_bundle_request, + skip_deserialization=True, + ) + return json.loads(response.response.data) + def list_model_bundles_v1(self) -> List[Dict[str, Any]]: + """ + Returns a list of model bundles that the user owns. + + Returns: + A list of JSON objects that correspond to the bundles that the user owns. + """ + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + response = api_instance.list_model_bundles_v1_model_bundles_get( + skip_deserialization=True + ) + return json.loads(response.response.data) + + def get_model_bundle_by_id_v1(self, model_bundle_id: str) -> Dict[str, Any]: + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + path_params = frozendict({"model_bundle_id": model_bundle_id}) + response = api_instance.get_model_bundle_v1_model_bundles_model_bundle_id_get( # type: ignore + path_params=path_params, # type: ignore + skip_deserialization=True, + ) + return json.loads(response.response.data) + pass + + def create_model_endpoint_v1( + self, + *, + endpoint_name: str, + model_bundle_id: Optional[str] = None, + cpus: int = 3, + memory: str = "8Gi", + storage: Optional[str] = None, + gpus: int = 0, + min_workers: int = 1, + max_workers: int = 1, + per_worker: int = 10, + gpu_type: Optional[str] = None, + endpoint_type: str = "sync", + post_inference_hooks: Optional[List[PostInferenceHooks]] = None, + default_callback_url: Optional[str] = None, + update_if_exists: bool = False, + labels: Optional[Dict[str, str]] = None, + ) -> Dict[str, Any]: + """ + Creates and registers a model endpoint in Scale Launch. The returned object is an instance of type ``Endpoint``, + which is a base class of either ``SyncEndpoint`` or ``AsyncEndpoint``. This is the object + to which you sent inference requests. + + Parameters: + endpoint_name: The name of the model endpoint you want to create. The name must be unique across + all endpoints that you own. + + model_bundle_id: The ID of the ``ModelBundle`` that the endpoint should serve. + + cpus: Number of cpus each worker should get, e.g. 1, 2, etc. 
This must be greater than or equal to 1. + + memory: Amount of memory each worker should get, e.g. "4Gi", "512Mi", etc. This must be a positive + amount of memory. + + storage: Amount of local ephemeral storage each worker should get, e.g. "4Gi", "512Mi", etc. This must + be a positive amount of storage. + + gpus: Number of gpus each worker should get, e.g. 0, 1, etc. + + min_workers: The minimum number of workers. Must be greater than or equal to 0. This should be determined + by computing the minimum throughput of your workload and dividing it by the throughput of a single + worker. This field must be at least ``1`` for synchronous endpoints. + + max_workers: The maximum number of workers. Must be greater than or equal to 0, and as well as + greater than or equal to ``min_workers``. This should be determined by computing the maximum throughput + of your workload and dividing it by the throughput of a single worker. + + per_worker: The maximum number of concurrent requests that an individual worker can service. Launch + automatically scales the number of workers for the endpoint so that each worker is processing + ``per_worker`` requests, subject to the limits defined by ``min_workers`` and ``max_workers``. + + - If the average number of concurrent requests per worker is lower than ``per_worker``, then the number + of workers will be reduced. + - Otherwise, if the average number of concurrent requests per worker is higher + than ``per_worker``, then the number of workers will be increased to meet the elevated traffic. + + Here is our recommendation for computing ``per_worker``: + + 1. Compute ``min_workers`` and ``max_workers`` per your minimum and maximum throughput requirements. + 2. Determine a value for the maximum number of concurrent requests in the workload. Divide this number + by ``max_workers``. Doing this ensures that the number of workers will "climb" to ``max_workers``. + + gpu_type: If specifying a non-zero number of gpus, this controls the type of gpu requested. Here are the + supported values: + + - ``nvidia-tesla-t4`` + - ``nvidia-ampere-a10`` + + endpoint_type: Either ``"sync"`` or ``"async"``. + + post_inference_hooks: List of hooks to trigger after inference tasks are served. + + default_callback_url: The default callback url to use for async endpoints. + This can be overridden in the task parameters for each individual task. + post_inference_hooks must contain "callback" for the callback to be triggered. + + update_if_exists: If ``True``, will attempt to update the endpoint if it exists. Otherwise, will + unconditionally try to create a new endpoint. Note that endpoint names for a given user must be unique, + so attempting to call this function with ``update_if_exists=False`` for an existing endpoint will raise + an error. + + labels: An optional dictionary of key/value pairs to associate with this endpoint. + + Returns: + A Endpoint object that can be used to make requests to the endpoint. 
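+
+        Example (an illustrative sketch; ``client`` is an instantiated Launch client, and the bundle ID, resource values, and labels are placeholders):
+
+        .. code-block:: python
+
+            endpoint = client.create_model_endpoint_v1(
+                endpoint_name="my-endpoint",
+                model_bundle_id="bun_abc123",
+                cpus=1,
+                memory="8Gi",
+                gpus=0,
+                min_workers=1,
+                max_workers=2,
+                per_worker=10,
+                endpoint_type="async",
+                labels={"team": "my-team", "product": "my-product"},
+            )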
+ + """ + if update_if_exists: + try: + endpoint_response = self.get_model_endpoint_v1() + except: + raise Exception("TODO: ") + + response = self.update_model_endpoint_v1( + model_endpoint_id=endpoint_response.id, + model_bundle_id=model_bundle_id, + cpus=cpus, + memory=memory, + storage=storage, + gpus=gpus, + min_workers=min_workers, + max_workers=max_workers, + per_worker=per_worker, + gpu_type=gpu_type, + default_callback_url=default_callback_url, + ) + return json.loads(response.resopnse.data) + else: + # Presumably, the user knows that the endpoint doesn't already exist, and so we can defer + # to the server to reject any duplicate creations. + logger.info("Creating new endpoint") + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + + payload = dict_not_none( + cpus=cpus, + endpoint_type=ModelEndpointType(endpoint_type), + gpus=gpus, + gpu_type=GpuType(gpu_type) + if gpu_type is not None + else None, + labels=labels or {}, + max_workers=max_workers, + memory=memory, + metadata={}, + min_workers=min_workers, + model_bundle_id=model_bundle_id, + name=endpoint_name, + per_worker=per_worker, + post_inference_hooks=post_inference_hooks or [], + default_callback_url=default_callback_url, + storage=storage, + ) + create_model_endpoint_request = CreateModelEndpointRequest( + **payload + ) + response = ( + api_instance.create_model_endpoint_v1_model_endpoints_post( + body=create_model_endpoint_request, + skip_deserialization=True, + ) + ) + return json.loads(response.response.data) + + def update_model_endpoint_v1( + self, + *, + model_endpoint_id: str, + model_bundle_id: Optional[str] = None, + cpus: Optional[float] = None, + memory: Optional[str] = None, + storage: Optional[str] = None, + gpus: Optional[int] = None, + min_workers: Optional[int] = None, + max_workers: Optional[int] = None, + per_worker: Optional[int] = None, + gpu_type: Optional[str] = None, + post_inference_hooks: Optional[List[PostInferenceHooks]] = None, + default_callback_url: Optional[str] = None, + ) -> Dict[str, Any]: + """ + Edits an existing model endpoint. Here are the fields that **cannot** be edited on an existing endpoint: + + - The endpoint's name. + - The endpoint's type (i.e. you cannot go from a ``SyncEnpdoint`` to an ``AsyncEndpoint`` or vice versa. + + Parameters: + model_endpoint: The model endpoint (or its name) you want to edit. The name must be unique across + all endpoints that you own. + + model_bundle: The ``ModelBundle`` that the endpoint should serve. + + cpus: Number of cpus each worker should get, e.g. 1, 2, etc. This must be greater than or equal to 1. + + memory: Amount of memory each worker should get, e.g. "4Gi", "512Mi", etc. This must be a positive + amount of memory. + + storage: Amount of local ephemeral storage each worker should get, e.g. "4Gi", "512Mi", etc. This must + be a positive amount of storage. + + gpus: Number of gpus each worker should get, e.g. 0, 1, etc. + + min_workers: The minimum number of workers. Must be greater than or equal to 0. + + max_workers: The maximum number of workers. Must be greater than or equal to 0, and as well as + greater than or equal to ``min_workers``. + + per_worker: The maximum number of concurrent requests that an individual worker can service. 
Launch + automatically scales the number of workers for the endpoint so that each worker is processing + ``per_worker`` requests: + + - If the average number of concurrent requests per worker is lower than ``per_worker``, then the number + of workers will be reduced. + - Otherwise, if the average number of concurrent requests per worker is higher + than ``per_worker``, then the number of workers will be increased to meet the elevated traffic. + + gpu_type: If specifying a non-zero number of gpus, this controls the type of gpu requested. Here are the + supported values: + + - ``nvidia-tesla-t4`` + - ``nvidia-ampere-a10`` + + post_inference_hooks: List of hooks to trigger after inference tasks are served. + + default_callback_url: The default callback url to use for async endpoints. + This can be overridden in the task parameters for each individual task. + post_inference_hooks must contain "callback" for the callback to be triggered. + + """ + logger.info("Editing existing endpoint") + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + + payload = dict_not_none( + cpus=cpus, + gpus=gpus, + gpu_type=GpuType(gpu_type) if gpu_type is not None else None, + max_workers=max_workers, + memory=memory, + min_workers=min_workers, + model_bundle_id=model_bundle_id, + per_worker=per_worker, + post_inference_hooks=post_inference_hooks or [], + default_callback_url=default_callback_url, + storage=storage, + ) + update_model_endpoint_request = UpdateModelEndpointRequest( + **payload + ) + path_params = frozendict({"model_endpoint_id": model_endpoint_id}) + response = api_instance.update_model_endpoint_v1_model_endpoints_model_endpoint_id_put( # type: ignore + body=update_model_endpoint_request, + path_params=path_params, # type: ignore + skip_deserialization=True, + ) + return json.loads(response.response.data) + + def get_model_endpoint_by_id_v1( + self, model_endpoint_id: str + ) -> Dict[str, Any]: + """ + Gets a model endpoint for a given ID. + + Parameters: + endpoint_id: The name of the endpoint to retrieve. + """ + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + path_params = frozendict({"model_endpoint_id": model_endpoint_id}) + response = api_instance.get_model_endpoints_api_v1_model_endpoints_api_get( # type: ignore + # TODO: how to pass ID? + path_params=path_params, + skip_deserialization=True, + ) + return json.loads(response.response.data) + + def get_model_endpoint_by_name_v1( + self, model_endpoint_name: str + ) -> Dict[str, Any]: + """ + Gets a model endpoint for a given ID. + + Parameters: + model_endpoint_name: The name of the endpoint to retrieve. 
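+
+        Example (illustrative; ``client`` is an instantiated Launch client and the endpoint name is a placeholder):
+
+        .. code-block:: python
+
+            endpoint = client.get_model_endpoint_by_name_v1("my-endpoint")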
+ """ + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + query_params = frozendict({"name": model_endpoint_name}) + response = api_instance.get_model_endpoints_api_v1_model_endpoints_api_get( # type: ignore + query_params=query_params, + skip_deserialization=True, + ) + return response + + def delete_model_endpoint_v1(self, model_endpoint_id: str) -> Dict[str, Any]: + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + path_params = frozendict({"model_endpoint_id": model_endpoint_id}) + response = api_instance.delete_model_endpoint_v1_model_endpoints_model_endpoint_id_delete( # type: ignore + path_params=path_params, # type: ignore + skip_deserialization=True, + ) + return json.loads(response.response.data) + def _zip_directory(zipf: ZipFile, path: str) -> None: for root, _, files in os.walk(path): From 1264e47afd954055f1c5550281281dd08bd47902 Mon Sep 17 00:00:00 2001 From: Yi Xu Date: Wed, 8 Feb 2023 16:30:15 -0800 Subject: [PATCH 2/5] Add stubs for inference tasks --- launch/client.py | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/launch/client.py b/launch/client.py index 332870b7..cc580d04 100644 --- a/launch/client.py +++ b/launch/client.py @@ -19,6 +19,7 @@ from launch.api_client.model.clone_model_bundle_request import ( CloneModelBundleRequest, ) +from launch.api_client.model.create_async_task_response import CreateAsyncTaskResponse from launch.api_client.model.create_batch_job_request import ( CreateBatchJobRequest, ) @@ -34,6 +35,7 @@ from launch.api_client.model.endpoint_predict_request import ( EndpointPredictRequest, ) +from launch.api_client.model.get_async_task_response import GetAsyncTaskResponse from launch.api_client.model.get_model_endpoint_response import GetModelEndpointResponse from launch.api_client.model.gpu_type import GpuType from launch.api_client.model.model_bundle_environment_params import ( @@ -44,6 +46,7 @@ ModelBundlePackagingType, ) from launch.api_client.model.model_endpoint_type import ModelEndpointType +from launch.api_client.model.sync_endpoint_predict_response import SyncEndpointPredictResponse from launch.api_client.model.update_model_endpoint_request import ( UpdateModelEndpointRequest, ) @@ -1874,10 +1877,10 @@ def update_model_endpoint_v1( - The endpoint's type (i.e. you cannot go from a ``SyncEnpdoint`` to an ``AsyncEndpoint`` or vice versa. Parameters: - model_endpoint: The model endpoint (or its name) you want to edit. The name must be unique across + model_endpoint_id: The model endpoint (or its name) you want to edit. The name must be unique across all endpoints that you own. - model_bundle: The ``ModelBundle`` that the endpoint should serve. + model_bundle_id: The ``ModelBundle`` that the endpoint should serve. cpus: Number of cpus each worker should get, e.g. 1, 2, etc. This must be greater than or equal to 1. 
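For reference, a minimal call using the renamed ``model_endpoint_id``/``model_bundle_id`` parameters might look like the following sketch (both IDs are placeholders):

.. code-block:: python

    client.update_model_endpoint_v1(
        model_endpoint_id="end_abc123",
        model_bundle_id="bun_def456",
        min_workers=0,
        max_workers=4,
    )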
@@ -1991,6 +1994,32 @@ def delete_model_endpoint_v1(self, model_endpoint_id: str) -> Dict[str, Any]: ) return json.loads(response.response.data) + def create_sync_inference_task_v1( + self, + model_endpoint_id: str, + url: Optional[str] = None, + args: Optional[Dict[str, Any]] = None, + callback_url: Optional[str] = None, + return_pickled: Optional[bool] = False, + ) -> SyncEndpointPredictResponse: + pass + + def create_async_inference_task_v1( + self, + model_endpoint_id: str, + url: Optional[str] = None, + args: Optional[Dict[str, Any]] = None, + callback_url: Optional[str] = None, + return_pickled: Optional[bool] = False, + ) -> CreateAsyncTaskResponse: + pass + + def get_async_inference_task_v1( + self, + task_id: str, + ) -> GetAsyncTaskResponse: + pass + def _zip_directory(zipf: ZipFile, path: str) -> None: for root, _, files in os.walk(path): From c77569c3f1be0ed05f515d06907c328c6223c7fd Mon Sep 17 00:00:00 2001 From: Yi Xu Date: Wed, 8 Feb 2023 16:39:10 -0800 Subject: [PATCH 3/5] fixes --- launch/client.py | 66 +++++++++++++++--------------------------------- 1 file changed, 20 insertions(+), 46 deletions(-) diff --git a/launch/client.py b/launch/client.py index cc580d04..fa5a9826 100644 --- a/launch/client.py +++ b/launch/client.py @@ -519,8 +519,6 @@ def create_model_endpoint( model_bundle: (deprecated) The ``ModelBundle`` that the endpoint should serve. Deprecated in favor of model_bundle_id. - model_bundle_id: The ID of the ``ModelBundle`` that the endpoint should serve. - cpus: Number of cpus each worker should get, e.g. 1, 2, etc. This must be greater than or equal to 1. memory: Amount of memory each worker should get, e.g. "4Gi", "512Mi", etc. This must be a positive @@ -583,7 +581,6 @@ def create_model_endpoint( self.edit_model_endpoint( model_endpoint=endpoint_name, model_bundle=model_bundle, - model_bundle_id=model_bundle_id, cpus=cpus, memory=memory, storage=storage, @@ -602,23 +599,13 @@ def create_model_endpoint( logger.info("Creating new endpoint") with ApiClient(self.configuration) as api_client: api_instance = DefaultApi(api_client) - if bool(model_bundle) ^ bool(model_bundle_id): - raise ValueError("Can only specify one of 'model_bundle' and 'model_bundle_id'") - - if model_bundle_id: - # Need to fetch the ModelBundle object so that we can use it later in this function, - # so as to not break existing things. - else: - if ( - not isinstance(model_bundle, ModelBundle) - or model_bundle.id is None - ): - model_bundle = self.get_model_bundle(model_bundle) - - # By here, we have a ModelBundle regardless, so we can get its .id - model_bundle_id = model_bundle.id - - assert model_bundle_id is not None + + if ( + not isinstance(model_bundle, ModelBundle) + or model_bundle.id is None + ): + model_bundle = self.get_model_bundle(model_bundle) + payload = dict_not_none( cpus=cpus, endpoint_type=ModelEndpointType(endpoint_type), @@ -631,7 +618,6 @@ def create_model_endpoint( memory=memory, metadata={}, min_workers=min_workers, - model_bundle_id=model_bundle_id, name=endpoint_name, per_worker=per_worker, post_inference_hooks=post_inference_hooks or [], @@ -863,28 +849,6 @@ def get_model_bundle( resp = json.loads(response.response.data) return ModelBundle.from_dict(resp) # type: ignore - def get_model_bundle_by_id( - self, model_bundle_id: str - ) -> ModelBundle: - """ - Returns a model bundle specified by ``model_bundle_id`` that the user owns. - - Parameters: - model_bundle_id: The bundle ID. 
- - Returns: - A ``ModelBundle`` object - - """ - with ApiClient(self.configuration) as api_client: - api_instance = DefaultApi(api_client) - query_params = frozendict({"model_name": bundle_name}) - response = api_instance.get_latest_model_bundle_v1_model_bundles_latest_get( # type: ignore - query_params=query_params, - skip_deserialization=True, - ) - resp = json.loads(response.response.data) - return ModelBundle.from_dict(resp) # type: ignore def clone_model_bundle_with_changes( @@ -1684,7 +1648,8 @@ def create_model_bundle_v1( skip_deserialization=True, ) return json.loads(response.response.data) - def list_model_bundles_v1(self) -> List[Dict[str, Any]]: + + def list_model_bundles_v1(self) -> List[Dict[str, Any]]: """ Returns a list of model bundles that the user owns. @@ -1707,7 +1672,16 @@ def get_model_bundle_by_id_v1(self, model_bundle_id: str) -> Dict[str, Any]: skip_deserialization=True, ) return json.loads(response.response.data) - pass + + def get_latest_model_bundle_v1(self, model_bundle_name: str) -> Dict[str, Any]: + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + path_params = frozendict({"model_name": model_bundle_name}) + response = api_instance.get_latest_model_bundle_v1_model_bundles_latest_get( # type: ignore + path_params=path_params, # type: ignore + skip_deserialization=True, + ) + return json.loads(response.response.data) def create_model_endpoint_v1( self, @@ -1940,7 +1914,7 @@ def update_model_endpoint_v1( **payload ) path_params = frozendict({"model_endpoint_id": model_endpoint_id}) - response = api_instance.update_model_endpoint_v1_model_endpoints_model_endpoint_id_put( # type: ignore + response = api_instance.update_model_endpoint_v1_model_endpoints_model_endpoint_id_put( # type: ignore body=update_model_endpoint_request, path_params=path_params, # type: ignore skip_deserialization=True, From f4d2f38ec46aa0d5424ce9b18d50267357f04df4 Mon Sep 17 00:00:00 2001 From: Yi Xu Date: Wed, 8 Feb 2023 17:37:25 -0800 Subject: [PATCH 4/5] Add list_endpoints_v1 --- launch/client.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/launch/client.py b/launch/client.py index fa5a9826..58f74bcd 100644 --- a/launch/client.py +++ b/launch/client.py @@ -808,6 +808,11 @@ def get_model_endpoint( "Endpoint should be one of the types 'sync' or 'async'" ) + """ + XXX: Returns a 500 + + Also need to pass in optional query params + """ def list_model_bundles(self) -> List[ModelBundle]: """ Returns a list of model bundles that the user owns. @@ -817,7 +822,9 @@ def list_model_bundles(self) -> List[ModelBundle]: """ with ApiClient(self.configuration) as api_client: api_instance = DefaultApi(api_client) + query_params = {"name": self.model_endpoint.name} response = api_instance.list_model_bundles_v1_model_bundles_get( + query_params=query_params, skip_deserialization=True ) resp = json.loads(response.response.data) @@ -849,8 +856,6 @@ def get_model_bundle( resp = json.loads(response.response.data) return ModelBundle.from_dict(resp) # type: ignore - - def clone_model_bundle_with_changes( self, model_bundle_id: str, @@ -1921,6 +1926,22 @@ def update_model_endpoint_v1( ) return json.loads(response.response.data) + def list_model_endpoints_v1(self) -> List[Dict[str, Any]]: + """ + Lists all model endpoints that the user owns. + + Returns: + A list of ``ModelEndpoint`` objects. 
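+
+        Example (illustrative; ``client`` is an instantiated Launch client, and the exact shape of each returned record depends on the server response):
+
+        .. code-block:: python
+
+            endpoints = client.list_model_endpoints_v1()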
+ """ + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + response = ( + api_instance.list_model_endpoints_v1_model_endpoints_get( + skip_deserialization=True + ) + ) + return json.loads(response.response.data) + def get_model_endpoint_by_id_v1( self, model_endpoint_id: str ) -> Dict[str, Any]: @@ -1928,7 +1949,7 @@ def get_model_endpoint_by_id_v1( Gets a model endpoint for a given ID. Parameters: - endpoint_id: The name of the endpoint to retrieve. + model_endpoint_id: The name of the endpoint to retrieve. """ with ApiClient(self.configuration) as api_client: api_instance = DefaultApi(api_client) From d6bf1a0c98cb97936ded5ca3487c70bbfcca1ff8 Mon Sep 17 00:00:00 2001 From: Yi Xu Date: Wed, 8 Feb 2023 18:07:37 -0800 Subject: [PATCH 5/5] Implement inference fns --- launch/client.py | 42 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/launch/client.py b/launch/client.py index 58f74bcd..44f5e78a 100644 --- a/launch/client.py +++ b/launch/client.py @@ -1955,7 +1955,6 @@ def get_model_endpoint_by_id_v1( api_instance = DefaultApi(api_client) path_params = frozendict({"model_endpoint_id": model_endpoint_id}) response = api_instance.get_model_endpoints_api_v1_model_endpoints_api_get( # type: ignore - # TODO: how to pass ID? path_params=path_params, skip_deserialization=True, ) @@ -1994,10 +1993,14 @@ def create_sync_inference_task_v1( model_endpoint_id: str, url: Optional[str] = None, args: Optional[Dict[str, Any]] = None, - callback_url: Optional[str] = None, return_pickled: Optional[bool] = False, - ) -> SyncEndpointPredictResponse: - pass + ) -> Dict[str, Any]: + return self._sync_request( + endpoint_id=model_endpoint_id, + url=url, + args=args, + return_pickled=return_pickled, + ) def create_async_inference_task_v1( self, @@ -2006,14 +2009,37 @@ def create_async_inference_task_v1( args: Optional[Dict[str, Any]] = None, callback_url: Optional[str] = None, return_pickled: Optional[bool] = False, - ) -> CreateAsyncTaskResponse: - pass + ) -> Dict[str, Any]: + validate_task_request(url=url, args=args) + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + payload = dict_not_none( + return_pickled=return_pickled, + url=url, + args=args, + callback_url=callback_url, + ) + request = EndpointPredictRequest(**payload) + query_params = frozendict({"model_endpoint_id": model_endpoint_id}) + response = api_instance.create_async_inference_task_v1_async_tasks_post( # type: ignore + body=request, + query_params=query_params, # type: ignore + skip_deserialization=True, + ) + return json.loads(response.response.data) def get_async_inference_task_v1( self, task_id: str, - ) -> GetAsyncTaskResponse: - pass + ) -> Dict[str, Any]: + with ApiClient(self.configuration) as api_client: + api_instance = DefaultApi(api_client) + path_params = frozendict({"task_id": task_id}) + response = api_instance.get_async_inference_task_v1_async_tasks_task_id_get( + path_params=path_params, + skip_deserialization=True, + ) + return json.loads(response.response.data) def _zip_directory(zipf: ZipFile, path: str) -> None:
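Taken together, the new v1 methods allow a bundle-to-inference flow roughly like the sketch below. The import path, API key, response keys (``"id"``, ``"task_id"``), and all names and IDs are assumptions for illustration only; the exact response shapes depend on the server's schemas.

.. code-block:: python

    from launch import LaunchClient  # assumed public import path

    client = LaunchClient(api_key="your-api-key")

    # 1. Package local code into a zip-based bundle and register it.
    client.create_model_bundle_from_dirs_v1(
        model_bundle_name="my-bundle",
        base_paths=["my_module1", "my_module2"],
        requirements_path="requirements.txt",
        env_params={
            "framework_type": "pytorch",
            "pytorch_image_tag": "1.10.0-cuda11.3-cudnn8-runtime",
        },
        load_predict_fn_module_path="my_module1.my_inference_file.f",
        load_model_fn_module_path="my_module1.my_inference_file.load_model",
    )

    # 2. Resolve the newest bundle and stand up an async endpoint for it.
    bundle = client.get_latest_model_bundle_v1("my-bundle")
    endpoint = client.create_model_endpoint_v1(
        endpoint_name="my-endpoint",
        model_bundle_id=bundle["id"],  # "id" key assumed
        endpoint_type="async",
        min_workers=1,
        max_workers=2,
        per_worker=10,
        labels={"team": "my-team", "product": "my-product"},
    )

    # 3. Submit an async inference task and fetch its status/result.
    task = client.create_async_inference_task_v1(
        endpoint["id"],  # "id" key assumed
        args={"x": 1},
    )
    result = client.get_async_inference_task_v1(task["task_id"])  # "task_id" key assumed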