From 80fb5987b2943a6949e0b570e1146c5906357c87 Mon Sep 17 00:00:00 2001 From: bsuryadevara Date: Tue, 17 Jan 2023 11:25:27 -0600 Subject: [PATCH 1/9] TAO api client initial commit --- morpheus/pipeline/training/tao_client.py | 731 +++++++++++++++++++++++ tests/test_tao_client.py | 78 +++ 2 files changed, 809 insertions(+) create mode 100644 morpheus/pipeline/training/tao_client.py create mode 100644 tests/test_tao_client.py diff --git a/morpheus/pipeline/training/tao_client.py b/morpheus/pipeline/training/tao_client.py new file mode 100644 index 0000000000..fbaeec77ff --- /dev/null +++ b/morpheus/pipeline/training/tao_client.py @@ -0,0 +1,731 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import logging +import os +import typing + +import requests + +logger = logging.getLogger("morpheus.{}".format(__name__)) + +_KIND = typing.Literal["model", "dataset"] +_ACTIONS = typing.Literal["convert", "train", "evaluate", "prune", "retrain", "export", "inference"] + + +def generate_schema_url(url, ssl): + if url.startswith("http://") or url.startswith("https://"): + raise ValueError("URL should not include the scheme") + + scheme = "https://" if ssl else "http://" + url = scheme + (url if url[-1] != "/" else url[:-1]) + + return url + + +def vaildate_apikey(apikey): + if not isinstance(apikey, str): + raise ValueError('API key must be a string') + + if not apikey: + raise ValueError('API key can not be an empty string') + + return apikey + + +class TaoApiClient(): + + def __init__(self, + apikey: str, + url: str, + ssl: bool = False, + cert: str = None, + server_side_cert: bool = True, + proxies: typing.Dict[str, str] = None): + + self._apikey = vaildate_apikey(apikey) + self._parsed_url = generate_schema_url(url, ssl) + base_uri = self._parsed_url.rstrip('/') + self._base_uri = f"{base_uri}/api/v1" + + self._session = requests.Session() + + login_creds = self._login() + + self._user_uri = self._base_uri + "/user/" + login_creds.get("user_id") + + if not ssl: + self._session.headers.update({'Authorization': 'Bearer ' + login_creds.get("token")}) + + else: + if server_side_cert: + self._session.verify = cert + self._session.cert = cert + + if proxies: + self._session.proxies.update(proxies) + + def _login(self): + + endpoint = f"{self._base_uri}/login/{self._apikey}" + + logger.debug("Login endpoint: {}".format(endpoint)) + + resp = self._session.get(endpoint) + if not resp.status_code == 200: + raise Exception("Login failed: {}".format(resp.reason)) + + logger.info("Login has been successful!") + + return json.loads(resp.content) + + @property + def base_uri(self): + return self._base_uri + + @property + def user_uri(self): + return 
self._user_uri + + @property + def session(self): + return self._session + + def create_resource(self, data: typing.Dict, kind: _KIND, **kwargs) -> str: + """ + Create new resource. + + Parameters + ---------- + data : typing.Dict + Initial metadata for new resource. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + **kwargs : + Additional arguments. + Returns + ------- + resource_id: str + Unique identifier for the resource. + """ + + data = json.dumps(data) + + endpoint = f"{self.user_uri}/{kind}" + + logger.debug("reate resource with endpoint: {}".format(endpoint)) + + resp = self.session.post(endpoint, data=data, **kwargs) + if not resp.status_code == 201: + raise Exception("Error creating resource {} with endpoint {}: {}".format(kind, endpoint, resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + resource_id = json_resp.get("id") + + return resource_id + + def partial_update_resource(self, data: typing.Dict, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: + """ + Partially update the resource. + + Parameters + ---------- + data : typing.Dict + Metadata that needs to be updated. + resource_id: str + Unique identifier for the resource. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. + """ + + data = json.dumps(data) + + endpoint = f"{self.user_uri}/{kind}/{resource_id}" + logger.debug("Partially update resource with endpoint: {}".format(endpoint)) + + resp = self.session.patch(endpoint, data=data, **kwargs) + if not resp.status_code == 200: + raise Exception("Unable to partially update resource: {}".format(resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def update_resource(self, data: typing.Dict, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: + """ + Update the resource. 
+ + Parameters + ---------- + data : typing.Dict + Metadata that needs to be updated. + resource_id: str + Unique identifier for the resource. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. + """ + + data = json.dumps(data) + + endpoint = f"{self.user_uri}/{kind}/{resource_id}" + logger.debug("Update resource with endpoint: {}".format(endpoint)) + + resp = self.session.put(endpoint, data=data, **kwargs) + if not resp.status_code == 200: + raise Exception("Unable to update resource: {}".format(resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def upload_resource(self, resource_path: str, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: + """ + Upload the resource. + + Parameters + ---------- + resource_path : str + The location of the resource to be uploaded. + resource_id: str + Unique identifier for the resource. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. + """ + + if os.path.exists(resource_path): + if os.path.isfile(resource_path): + files = [("file", open(resource_path, "rb"))] + else: + raise Exception("Resource path must be a file.") + else: + raise ValueError("Resource path provided does not exists.") + + endpoint = f"{self.user_uri}/{kind}/{resource_id}/upload" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.post(endpoint, files=files, **kwargs) + if not resp.status_code == 201: + raise Exception("Unable to upload resource: {}".format(resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def list_resources(self, kind: _KIND, **kwargs) -> typing.Dict: + """ + List available resources by kind. 
+ + Parameters + ---------- + kind : _KIND + Endpoint type that is specific to a model or a dataset. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. + """ + + endpoint = f"{self.user_uri}/{kind}" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.get(endpoint, **kwargs) + if not resp.status_code == 200: + raise Exception("Unable to list the resources: {}".format(resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def get_specs_schema(self, resource_id: str, kind: _KIND, action: _ACTIONS, **kwargs) -> typing.Dict: + """ + Get specs schema by kind and action. + + Parameters + ---------- + resource_id: str + Unique identifier for the resource. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + action: _ACTIONS + TAO actions. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. + """ + + endpoint = f"{self.user_uri}/{kind}/{resource_id}/specs/{action}/schema" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.get(endpoint, **kwargs) + if not resp.status_code == 200: + raise Exception("Error getting specs schema: {}".format(resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def get_specs(self, resource_id: str, kind: _KIND, action: _ACTIONS, **kwargs) -> typing.Dict: + """ + Get specs by kind and action. + + Parameters + ---------- + resource_id: str + Unique identifier for the resource. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + action: _ACTIONS + TAO actions. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. 
+ """ + + endpoint = f"{self.user_uri}/{kind}/{resource_id}/specs/{action}" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.get(endpoint, **kwargs) + if not resp.status_code == 200: + raise Exception("Error getting specs: {}".format(resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def update_specs(self, specs: typing.Dict, resource_id: str, kind: _KIND, action: _ACTIONS, + **kwargs) -> typing.Dict: + """ + Update specs by kind and action. + + Parameters + ---------- + specs: typing.Dict + Updated specs. + resource_id: str + Unique identifier for the resource. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + action: _ACTIONS + TAO actions. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. + """ + + endpoint = f"{self.user_uri}/{kind}/{resource_id}/specs/{action}" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + data = json.dumps(specs) + + resp = self.session.post(endpoint, data=data, **kwargs) + if not resp.status_code == 201: + raise Exception("Unable to update specs: {}".format(resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def save_specs(self, resource_id: str, kind: _KIND, action: _ACTIONS, **kwargs) -> typing.Dict: + """ + Save specs by kind and action. + + Parameters + ---------- + resource_id: str + Unique identifier for the resource. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + action: _ACTIONS + TAO actions. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. 
+ """ + + endpoint = f"{self.user_uri}/{kind}/{resource_id}/specs/{action}" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.post(endpoint, **kwargs) + if not resp.status_code == 201: + raise Exception("Error saving specs: {}".format(resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def run_job(self, + resource_id: str, + kind: _KIND, + actions: typing.List[str], + parent_job_id: str = None, + **kwargs) -> typing.Dict: + """ + Run job by kind and action. + + Parameters + ---------- + resource_id: str + Unique identifier for the resource. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + action: _ACTIONS + TAO actions. + parent_job_id: str + Parent job id. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. + """ + + data = json.dumps({"job": parent_job_id, "actions": actions}) + + endpoint = f"{self.user_uri}/{kind}/{resource_id}/job" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.post(endpoint, data=data, **kwargs) + if not resp.status_code == 201: + raise Exception("Unable to run the job: {}".format(resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def get_job_status(self, job_id: str, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: + """ + Get job status. + + Parameters + ---------- + job_id: str + Unique identifier for the job. + resource_id: str + Unique identifier for the resource. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. 
+ """ + + endpoint = f"{self.user_uri}/{kind}/{resource_id}/job/{job_id}" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.get(endpoint, **kwargs) + if not resp.status_code == 200: + raise Exception("Unable to retrieve job status: {}".format(resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def list_jobs(self, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: + """ + List jobs for a given resource by kind. + + Parameters + ---------- + resource_id: str + Unique identifier for the resource. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. + """ + + endpoint = f"{self.user_uri}/{kind}/{resource_id}/job" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.get(endpoint, **kwargs) + + if not resp.status_code == 200: + raise Exception("Error retrieving list of jobs belongs to {}: {}".format(kind, resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def delete_job(self, resource_id: str, job_id: str, kind: _KIND, **kwargs) -> typing.Dict: + """ + Delete job for a given kind and resource identifier. + + Parameters + ---------- + resource_id: str + Unique identifier for the resource. + job_id: str + Unique identifier for the job. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. 
+ """ + + endpoint = f"{self.user_uri}/{kind}/{resource_id}/job/{job_id}" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.delete(endpoint, **kwargs) + if not resp.status_code == 200: + raise Exception("Unable to delete job belongs to {} group: {}".format(kind, resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def cancel_job(self, resource_id: str, job_id: str, kind: _KIND, **kwargs) -> typing.Dict: + """ + Cancel job for a given kind and resource identifier. + + Parameters + ---------- + resource_id: str + Unique identifier for the resource. + job_id: str + Unique identifier for the job. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. + """ + + endpoint = f"{self.user_uri}/{kind}/{resource_id}/job/{job_id}/cancel" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.post(endpoint, **kwargs) + + if not resp.status_code == 200: + raise Exception("Unable to cancel {} job: {}".format(kind, resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def resume_model_job(self, model_id: str, job_id: str, **kwargs) -> typing.Dict: + """ + Resume model job. + + Parameters + ---------- + model_id: str + Unique identifier for the model. + job_id: str + Unique identifier for the job. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. 
+ """ + + endpoint = f"{self.user_uri}/model/{model_id}/job/{job_id}/resume" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.post(endpoint, **kwargs) + + if not resp.status_code == 200: + raise Exception("Error resuming model job: {}".format(resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def download_resource(self, resource_id, job_id, kind: _KIND, output_dir: str, **kwargs) -> str: + """ + Download resources. + + Parameters + ---------- + resource_id: str + Unique identifier for the resource. + job_id: str + Unique identifier for the job. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + output_dir : str + Output directory to save the downloaded content. + **kwargs : + Additional arguments. + Returns + ------- + downloaded_path : str + The download location's path. + """ + job_status = self.get_job_status(resource_id=resource_id, job_id=job_id, kind=kind) + + status = job_status.get("status") + + if status == "Done": + + endpoint = f'{self.user_uri}/{kind}/{resource_id}/job/{job_id}/download' + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.get(endpoint, **kwargs) + + if not resp.status_code == 200: + raise Exception("Error downloading the job content: {}".format(resp.content)) + + temptar = f'{job_id}.tar.gz' + + with open(temptar, 'wb') as f: + f.write(resp.content) + logger.debug("Untarring {}...".format(temptar)) + tar_command = f"tar -xvf {temptar} -C {output_dir}/" + os.system(tar_command) + logger.debug("Untarring {}... Done".format(temptar)) + os.remove(temptar) + downloaded_path = f"{output_dir}/{job_id}" + + logger.debug("Results at location {}".format(downloaded_path)) + + return downloaded_path + + logger.info("Resource can be downloaded only when the job is completed. 
Current status is in {}".format(status)) + + def delete_resource(self, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: + """ + Delete resource. + + Parameters + ---------- + resource_id: str + Unique identifier for the resource. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. + """ + endpoint = f"{self.user_uri}/{kind}/{resource_id}" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.delete(endpoint, **kwargs) + + if not resp.status_code == 200: + raise Exception("Error deleting resource from {}: {}".format(kind, resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def retrieve_resource(self, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: + """ + Retrieve resource metadata. + + Parameters + ---------- + resource_id: str + Unique identifier for the resource. + kind : _KIND + Endpoint type that is specific to a model or a dataset. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. + """ + endpoint = f"{self.user_uri}/{kind}/{resource_id}" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.get(endpoint, **kwargs) + + if not resp.status_code == 200: + raise Exception("Error retrieving resource from {}: {}".format(kind, resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + + def close(self): + """ + Closes session. + """ + session = getattr(self, '_session', None) + if session: + logger.debug("Closing session...") + session.close() + self._session = None + logger.debug("Closing session... 
Done") diff --git a/tests/test_tao_client.py b/tests/test_tao_client.py new file mode 100644 index 0000000000..cb45fd83af --- /dev/null +++ b/tests/test_tao_client.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest.mock import MagicMock + +import pytest +from requests.models import Response + +from morpheus.pipeline.training.tao_client import TaoApiClient +from morpheus.pipeline.training.tao_client import generate_schema_url +from morpheus.pipeline.training.tao_client import vaildate_apikey + + +def test_generate_schema_url(): + + actual = generate_schema_url(url="localhost:32080", ssl=False) + expected = "http://localhost:32080" + + assert actual == expected + + with pytest.raises(ValueError): + generate_schema_url(url="http://localhost:32080", ssl=False) + + with pytest.raises(ValueError): + generate_schema_url(url="https://localhost:32080", ssl=True) + + actual = generate_schema_url(url="localhost:32080", ssl=False) + expected = "http://localhost:32080" + assert actual == expected + + actual = generate_schema_url(url="localhost:32080", ssl=True) + expected = "https://localhost:32080" + assert actual == expected + + +def test_vaildate_apikey(): + + vaildate_apikey("test_api_key") + + with pytest.raises(ValueError): + vaildate_apikey("") + + with pytest.raises(ValueError): + 
vaildate_apikey(123459) + + +def test_create_resource(): + tao_client = TaoApiClient("test_api_key", "localhost:32080") + + mock_creds = {"user_id": "X20109876", "token": "TOkJJTkw6WkxKRDpNWk9ZOkRVN0o6"} + + mock_response = Response() + mock_response.status_code = 201 + mock_response._content = b'{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "created_on": "2023-01-17T15:35:08.014463"}' + + tao_client.get_login_creds = MagicMock(return_value=mock_creds) + tao_client.session.post = MagicMock(return_value=Response()) + ds_type = "object_detection" + ds_format = "kitti" + + data = {"type": ds_type, "format": ds_format} + + actual_resource_id = tao_client.create_resource(kind="dataset", data=data) + + assert actual_resource_id == "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" From 49307d005fb2ff2eda556ab1e1bfafc450585d20 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Tue, 17 Jan 2023 22:29:09 +0000 Subject: [PATCH 2/9] added unit tests --- morpheus/pipeline/training/tao_client.py | 231 ++++++++++++++++------- tests/test_tao_client.py | 168 ++++++++++++++++- 2 files changed, 320 insertions(+), 79 deletions(-) diff --git a/morpheus/pipeline/training/tao_client.py b/morpheus/pipeline/training/tao_client.py index fbaeec77ff..c80ecd752f 100644 --- a/morpheus/pipeline/training/tao_client.py +++ b/morpheus/pipeline/training/tao_client.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import functools import json import logging import os @@ -22,7 +23,77 @@ logger = logging.getLogger("morpheus.{}".format(__name__)) _KIND = typing.Literal["model", "dataset"] -_ACTIONS = typing.Literal["convert", "train", "evaluate", "prune", "retrain", "export", "inference"] +_DATASET_ACTIONS = typing.Literal["convert", "convert_index", "convert_efficientdet"] +_MODEL_ACTIONS = typing.Literal["train", "evaluate", "prune", "retrain", "export", "inference"] + + +def validate_kind(func): + """ + Validates given endpoint category. + + Parameters + ---------- + func : Function that requires wrapping. + Returns + ------- + inner_func + Encapsulated function. + """ + + @functools.wraps(func) + def inner_func(*args, **kwargs): + if len(args) < 2: + raise ValueError("Kind not found. Select from available kinds: {}".format(_KIND)) + kind = args[1] + + if kind is None: + raise TypeError("TypeError: a string-like object is required for kind, not 'NoneType'") + if kind not in typing.get_args(_KIND): + raise ValueError("Invalid kind '{}'. Available kinds are {}".format(kind, _KIND)) + return func(*args, **kwargs) + + return inner_func + +def validate_actions(func): + """ + Validates TAO actions. + + Parameters + ---------- + func : Function that requires wrapping. + Returns + ------- + inner_func + Encapsulated function. + """ + + @functools.wraps(func) + def inner_func(*args, **kwargs): + + actions_by_kind = _DATASET_ACTIONS + if args[1] == "model": + actions_by_kind = _MODEL_ACTIONS + + if len(args) < 3: + raise ValueError("Actions not found. Select from available actions: {}".format(actions_by_kind)) + + actions = args[2] + + if actions is None: + raise TypeError("TypeError: a string-like object is required for an action, not 'NoneType'") + + availablestr = typing.get_args(actions_by_kind) + + if isinstance(actions, list): + if not set(actions).issubset(availablestr): + raise ValueError("One or more actions are not valid actions '{}'. 
Available actions are {}".format(actions, actions_by_kind)) + else: + if actions not in availablestr: + raise ValueError("Invalid action '{}'. Available actions are {}".format(actions, actions_by_kind)) + + return func(*args, **kwargs) + + return inner_func def generate_schema_url(url, ssl): @@ -57,39 +128,37 @@ def __init__(self, self._apikey = vaildate_apikey(apikey) self._parsed_url = generate_schema_url(url, ssl) - base_uri = self._parsed_url.rstrip('/') - self._base_uri = f"{base_uri}/api/v1" + self._base_uri = f"{self._parsed_url}/api/v1" + self._ssl = ssl + self._user_uri = None self._session = requests.Session() - - login_creds = self._login() - - self._user_uri = self._base_uri + "/user/" + login_creds.get("user_id") - - if not ssl: - self._session.headers.update({'Authorization': 'Bearer ' + login_creds.get("token")}) - - else: - if server_side_cert: - self._session.verify = cert - self._session.cert = cert + + if server_side_cert: + self._session.verify = cert + self._session.cert = cert if proxies: self._session.proxies.update(proxies) - def _login(self): + def authorize(self): endpoint = f"{self._base_uri}/login/{self._apikey}" logger.debug("Login endpoint: {}".format(endpoint)) - resp = self._session.get(endpoint) + resp = self.session.get(endpoint) if not resp.status_code == 200: raise Exception("Login failed: {}".format(resp.reason)) logger.info("Login has been successful!") - return json.loads(resp.content) + json_resp = resp.json() + + self._user_uri = self._base_uri + "/user/" + json_resp.get("user_id") + + if not self._ssl: + self._session.headers.update({'Authorization': 'Bearer ' + json_resp.get("token")}) @property def base_uri(self): @@ -102,8 +171,9 @@ def user_uri(self): @property def session(self): return self._session - - def create_resource(self, data: typing.Dict, kind: _KIND, **kwargs) -> str: + + @validate_kind + def create_resource(self, kind: _KIND, data: typing.Dict, **kwargs) -> str: """ Create new resource. 
@@ -125,7 +195,7 @@ def create_resource(self, data: typing.Dict, kind: _KIND, **kwargs) -> str: endpoint = f"{self.user_uri}/{kind}" - logger.debug("reate resource with endpoint: {}".format(endpoint)) + logger.debug("create resource with endpoint: {}".format(endpoint)) resp = self.session.post(endpoint, data=data, **kwargs) if not resp.status_code == 201: @@ -137,8 +207,9 @@ def create_resource(self, data: typing.Dict, kind: _KIND, **kwargs) -> str: resource_id = json_resp.get("id") return resource_id - - def partial_update_resource(self, data: typing.Dict, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: + + @validate_kind + def partial_update_resource(self, kind: _KIND, data: typing.Dict, resource_id: str, **kwargs) -> typing.Dict: """ Partially update the resource. @@ -171,8 +242,9 @@ def partial_update_resource(self, data: typing.Dict, resource_id: str, kind: _KI logger.debug("Response: {}".format(json_resp)) return json_resp - - def update_resource(self, data: typing.Dict, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: + + @validate_kind + def update_resource(self, kind: _KIND, data: typing.Dict, resource_id: str, **kwargs) -> typing.Dict: """ Update the resource. @@ -205,8 +277,9 @@ def update_resource(self, data: typing.Dict, resource_id: str, kind: _KIND, **kw logger.debug("Response: {}".format(json_resp)) return json_resp - - def upload_resource(self, resource_path: str, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: + + @validate_kind + def upload_resource(self, kind: _KIND, resource_path: str, resource_id: str, **kwargs) -> typing.Dict: """ Upload the resource. @@ -245,7 +318,8 @@ def upload_resource(self, resource_path: str, resource_id: str, kind: _KIND, **k logger.debug("Response: {}".format(json_resp)) return json_resp - + + @validate_kind def list_resources(self, kind: _KIND, **kwargs) -> typing.Dict: """ List available resources by kind. 
@@ -273,8 +347,10 @@ def list_resources(self, kind: _KIND, **kwargs) -> typing.Dict: logger.debug("Response: {}".format(json_resp)) return json_resp - - def get_specs_schema(self, resource_id: str, kind: _KIND, action: _ACTIONS, **kwargs) -> typing.Dict: + + @validate_kind + @validate_actions + def get_specs_schema(self, kind: _KIND, action: str, resource_id: str, **kwargs) -> typing.Dict: """ Get specs schema by kind and action. @@ -284,7 +360,7 @@ def get_specs_schema(self, resource_id: str, kind: _KIND, action: _ACTIONS, **kw Unique identifier for the resource. kind : _KIND Endpoint type that is specific to a model or a dataset. - action: _ACTIONS + action: str TAO actions. **kwargs : Additional arguments. @@ -305,8 +381,10 @@ def get_specs_schema(self, resource_id: str, kind: _KIND, action: _ACTIONS, **kw logger.debug("Response: {}".format(json_resp)) return json_resp - - def get_specs(self, resource_id: str, kind: _KIND, action: _ACTIONS, **kwargs) -> typing.Dict: + + @validate_kind + @validate_actions + def get_specs(self, kind: _KIND, action: str, resource_id: str, **kwargs) -> typing.Dict: """ Get specs by kind and action. @@ -316,7 +394,7 @@ def get_specs(self, resource_id: str, kind: _KIND, action: _ACTIONS, **kwargs) - Unique identifier for the resource. kind : _KIND Endpoint type that is specific to a model or a dataset. - action: _ACTIONS + action: str TAO actions. **kwargs : Additional arguments. @@ -337,8 +415,10 @@ def get_specs(self, resource_id: str, kind: _KIND, action: _ACTIONS, **kwargs) - logger.debug("Response: {}".format(json_resp)) return json_resp - - def update_specs(self, specs: typing.Dict, resource_id: str, kind: _KIND, action: _ACTIONS, + + @validate_kind + @validate_actions + def update_specs(self, kind: _KIND, action: str, specs: typing.Dict, resource_id: str, **kwargs) -> typing.Dict: """ Update specs by kind and action. 
@@ -351,7 +431,7 @@ def update_specs(self, specs: typing.Dict, resource_id: str, kind: _KIND, action Unique identifier for the resource. kind : _KIND Endpoint type that is specific to a model or a dataset. - action: _ACTIONS + action: str TAO actions. **kwargs : Additional arguments. @@ -374,8 +454,10 @@ def update_specs(self, specs: typing.Dict, resource_id: str, kind: _KIND, action logger.debug("Response: {}".format(json_resp)) return json_resp - - def save_specs(self, resource_id: str, kind: _KIND, action: _ACTIONS, **kwargs) -> typing.Dict: + + @validate_kind + @validate_actions + def save_specs(self, kind: _KIND, action: str, resource_id: str, **kwargs) -> typing.Dict: """ Save specs by kind and action. @@ -385,7 +467,7 @@ def save_specs(self, resource_id: str, kind: _KIND, action: _ACTIONS, **kwargs) Unique identifier for the resource. kind : _KIND Endpoint type that is specific to a model or a dataset. - action: _ACTIONS + action: str TAO actions. **kwargs : Additional arguments. @@ -406,11 +488,13 @@ def save_specs(self, resource_id: str, kind: _KIND, action: _ACTIONS, **kwargs) logger.debug("Response: {}".format(json_resp)) return json_resp - + + @validate_kind + @validate_actions def run_job(self, - resource_id: str, kind: _KIND, actions: typing.List[str], + resource_id: str, parent_job_id: str = None, **kwargs) -> typing.Dict: """ @@ -422,7 +506,7 @@ def run_job(self, Unique identifier for the resource. kind : _KIND Endpoint type that is specific to a model or a dataset. - action: _ACTIONS + action: str TAO actions. parent_job_id: str Parent job id. @@ -430,8 +514,8 @@ def run_job(self, Additional arguments. Returns ------- - json_resp : typing.Dict - JSON response. + job_ids : typing.List[str] + List of job id's by actions. 
""" data = json.dumps({"job": parent_job_id, "actions": actions}) @@ -443,12 +527,13 @@ def run_job(self, if not resp.status_code == 201: raise Exception("Unable to run the job: {}".format(resp.content)) - json_resp = resp.json() - logger.debug("Response: {}".format(json_resp)) + job_ids = resp.json() + logger.debug("Response: {}".format(job_ids)) - return json_resp - - def get_job_status(self, job_id: str, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: + return job_ids + + @validate_kind + def get_job_status(self, kind: _KIND, resource_id: str, job_id: str, **kwargs) -> typing.Dict: """ Get job status. @@ -479,8 +564,9 @@ def get_job_status(self, job_id: str, resource_id: str, kind: _KIND, **kwargs) - logger.debug("Response: {}".format(json_resp)) return json_resp - - def list_jobs(self, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: + + @validate_kind + def list_jobs(self, kind: _KIND, resource_id: str, **kwargs) -> typing.Dict: """ List jobs for a given resource by kind. @@ -510,8 +596,9 @@ def list_jobs(self, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: logger.debug("Response: {}".format(json_resp)) return json_resp - - def delete_job(self, resource_id: str, job_id: str, kind: _KIND, **kwargs) -> typing.Dict: + + @validate_kind + def delete_job(self, kind: _KIND, resource_id: str, job_id: str, **kwargs) -> typing.Dict: """ Delete job for a given kind and resource identifier. @@ -542,8 +629,9 @@ def delete_job(self, resource_id: str, job_id: str, kind: _KIND, **kwargs) -> ty logger.debug("Response: {}".format(json_resp)) return json_resp - - def cancel_job(self, resource_id: str, job_id: str, kind: _KIND, **kwargs) -> typing.Dict: + + @validate_kind + def cancel_job(self, kind: _KIND, resource_id: str, job_id: str, **kwargs) -> typing.Dict: """ Cancel job for a given kind and resource identifier. 
@@ -606,8 +694,9 @@ def resume_model_job(self, model_id: str, job_id: str, **kwargs) -> typing.Dict: logger.debug("Response: {}".format(json_resp)) return json_resp - - def download_resource(self, resource_id, job_id, kind: _KIND, output_dir: str, **kwargs) -> str: + + @validate_kind + def download_resource(self, kind: _KIND, resource_id, job_id, output_dir: str, **kwargs) -> str: """ Download resources. @@ -628,7 +717,7 @@ def download_resource(self, resource_id, job_id, kind: _KIND, output_dir: str, * downloaded_path : str The download location's path. """ - job_status = self.get_job_status(resource_id=resource_id, job_id=job_id, kind=kind) + job_status = self.get_job_status(kind, resource_id=resource_id, job_id=job_id) status = job_status.get("status") @@ -642,15 +731,15 @@ def download_resource(self, resource_id, job_id, kind: _KIND, output_dir: str, * if not resp.status_code == 200: raise Exception("Error downloading the job content: {}".format(resp.content)) - temptar = f'{job_id}.tar.gz' + temp_tar = f'{job_id}.tar.gz' - with open(temptar, 'wb') as f: + with open(temp_tar, 'wb') as f: f.write(resp.content) - logger.debug("Untarring {}...".format(temptar)) - tar_command = f"tar -xvf {temptar} -C {output_dir}/" + logger.debug("Untarring {}...".format(temp_tar)) + tar_command = f"tar -xvf {temp_tar} -C {output_dir}/" os.system(tar_command) - logger.debug("Untarring {}... Done".format(temptar)) - os.remove(temptar) + logger.debug("Untarring {}... Done".format(temp_tar)) + os.remove(temp_tar) downloaded_path = f"{output_dir}/{job_id}" logger.debug("Results at location {}".format(downloaded_path)) @@ -658,8 +747,9 @@ def download_resource(self, resource_id, job_id, kind: _KIND, output_dir: str, * return downloaded_path logger.info("Resource can be downloaded only when the job is completed. 
Current status is in {}".format(status)) - - def delete_resource(self, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: + + @validate_kind + def delete_resource(self, kind: _KIND, resource_id: str, **kwargs) -> typing.Dict: """ Delete resource. @@ -688,8 +778,9 @@ def delete_resource(self, resource_id: str, kind: _KIND, **kwargs) -> typing.Dic logger.debug("Response: {}".format(json_resp)) return json_resp - - def retrieve_resource(self, resource_id: str, kind: _KIND, **kwargs) -> typing.Dict: + + @validate_kind + def retrieve_resource(self, kind: _KIND, resource_id: str, **kwargs) -> typing.Dict: """ Retrieve resource metadata. @@ -728,4 +819,4 @@ def close(self): logger.debug("Closing session...") session.close() self._session = None - logger.debug("Closing session... Done") + logger.debug("Closing session... Done") \ No newline at end of file diff --git a/tests/test_tao_client.py b/tests/test_tao_client.py index cb45fd83af..688c7e2b8d 100644 --- a/tests/test_tao_client.py +++ b/tests/test_tao_client.py @@ -18,12 +18,13 @@ import pytest from requests.models import Response +from requests.sessions import Session from morpheus.pipeline.training.tao_client import TaoApiClient from morpheus.pipeline.training.tao_client import generate_schema_url from morpheus.pipeline.training.tao_client import vaildate_apikey - +@pytest.mark.use_python def test_generate_schema_url(): actual = generate_schema_url(url="localhost:32080", ssl=False) @@ -45,7 +46,7 @@ def test_generate_schema_url(): expected = "https://localhost:32080" assert actual == expected - +@pytest.mark.use_python def test_vaildate_apikey(): vaildate_apikey("test_api_key") @@ -56,23 +57,172 @@ def test_vaildate_apikey(): with pytest.raises(ValueError): vaildate_apikey(123459) - -def test_create_resource(): +@pytest.mark.use_python +def get_tao_client(): + mock_creds = {"user_id": "X20109876", "token": "TOkJJTkw6WkxKRDpNWk9ZOkRVN0o6"} tao_client = TaoApiClient("test_api_key", "localhost:32080") + 
tao_client.authorize = MagicMock(return_value=mock_creds) - mock_creds = {"user_id": "X20109876", "token": "TOkJJTkw6WkxKRDpNWk9ZOkRVN0o6"} + return tao_client + +@pytest.mark.use_python +def test_create_dataset_resource(): + tao_client = get_tao_client() mock_response = Response() mock_response.status_code = 201 mock_response._content = b'{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "created_on": "2023-01-17T15:35:08.014463"}' - tao_client.get_login_creds = MagicMock(return_value=mock_creds) - tao_client.session.post = MagicMock(return_value=Response()) + tao_client.session.post = MagicMock(return_value=mock_response) + ds_type = "object_detection" ds_format = "kitti" data = {"type": ds_type, "format": ds_format} - actual_resource_id = tao_client.create_resource(kind="dataset", data=data) + resource_id = tao_client.create_resource("dataset", data) + + with pytest.raises(ValueError): + tao_client.create_resource("test", data=data) + + assert resource_id == "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" + + mock_response2 = Response() + mock_response2.status_code = 400 + mock_response2._content = b'{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "created_on": "2023-01-17T15:35:08.014463"}' + + tao_client.session.post = MagicMock(return_value=mock_response2) + + with pytest.raises(Exception): + tao_client.create_resource("dataset", data=data) + +@pytest.mark.use_python +def test_create_model_resource(): + tao_client = get_tao_client() + + mock_response = Response() + mock_response.status_code = 201 + mock_response._content = b'{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "created_on": "2023-01-17T15:35:08.014463"}' + + tao_client.session.post = MagicMock(return_value=mock_response) + + network_arch = "detectnet_v2" + encode_key = "tlt_encode" + data = {"network_arch":network_arch,"encryption_key":encode_key, "description": "My model"} + + resource_id = tao_client.create_resource("model", data) + + assert resource_id == "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" + + with 
pytest.raises(ValueError): + tao_client.create_resource("random_kind", data=data) + +@pytest.mark.use_python +def test_partial_update_resource(): + + tao_client = get_tao_client() + + mock_response = Response() + mock_response.status_code = 200 + mock_response._content = b'{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "created_on": "2023-01-17T15:35:08.014463"}' + + tao_client.session.patch = MagicMock(return_value=mock_response) + + data = {"name":"Train dataset", "description":"My train dataset with kitti"} + + resp_json = tao_client.partial_update_resource("dataset", resource_id="eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", data=data) + + assert resp_json.get("id") == "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" + + mock_response.status_code = 401 + tao_client.session.patch = MagicMock(return_value=mock_response) + + with pytest.raises(Exception): + tao_client.create_resource("dataset", data=data) + +def test_update_resource(): + + tao_client = get_tao_client() + + mock_response = Response() + mock_response.status_code = 200 + mock_response._content = b'{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "created_on": "2023-01-17T15:35:08.014463"}' + + tao_client.session.put = MagicMock(return_value=mock_response) + + data = {"name":"Train dataset", "description":"My train dataset with kitti"} + + resp_json = tao_client.update_resource("dataset", resource_id="eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", data=data) + + assert resp_json.get("id") == "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" + assert isinstance(resp_json, dict) + +@pytest.mark.use_python +def test_get_specs_schema(): + tao_client = get_tao_client() + + mock_response = Response() + mock_response.status_code = 200 + + mock_response._content = b'{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "created_on": "2023-01-17T15:35:08.014463"}' + + resource_id="eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" + + tao_client.session.get = MagicMock(return_value=mock_response) + + resp_json = tao_client.get_specs_schema("dataset", "convert", resource_id=resource_id) + + 
with pytest.raises(ValueError): + tao_client.get_specs_schema("dataset", "tmp_convert", resource_id=resource_id) + + assert resp_json.get("id") == "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" + assert isinstance(resp_json, dict) + + +@pytest.mark.use_python +def test_close(): + tao_client = get_tao_client() + + session = tao_client.session + assert isinstance(session, Session) + + tao_client.close() + assert tao_client.session is None + +@pytest.mark.use_python +def test_upload_resource(tmpdir): + input_data = tmpdir.join("input_dataset.txt") + + with open(input_data, 'w') as fh: + fh.write("This is a training data.") + + tao_client = get_tao_client() + + mock_response = Response() + mock_response.status_code = 201 + mock_response._content = b'{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "created_on": "2023-01-17T15:35:08.014463"}' + + resource_id="eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" + + tao_client.session.post = MagicMock(return_value=mock_response) + + resp_json = tao_client.upload_resource("dataset", resource_path=input_data, resource_id=resource_id) + + assert resp_json.get("id") == "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" + assert isinstance(resp_json, dict) + +@pytest.mark.use_python +def test_download_resource(tmpdir): + tao_client = get_tao_client() + + mock_response = Response() + mock_response.status_code = 200 + mock_response._content = b'{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "parent_id" : "None", "status": "Pending", "created_on": "2023-01-17T15:35:08.014463"}' + + resource_id="eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" + + tao_client.session.get = MagicMock(return_value=mock_response) + + resp_json = tao_client.download_resource("dataset", resource_id=resource_id, job_id="test_235678", output_dir=tmpdir) - assert actual_resource_id == "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" + assert resp_json is None From 7e5ff7152c37c6f54c3d85d8106414855848a62c Mon Sep 17 00:00:00 2001 From: bsuryadevara Date: Tue, 17 Jan 2023 16:35:27 -0600 Subject: [PATCH 3/9] removed unused imports --- 
morpheus/pipeline/training/tao_client.py | 63 ++++++++++++------------ tests/test_tao_client.py | 41 +++++++++------ 2 files changed, 59 insertions(+), 45 deletions(-) diff --git a/morpheus/pipeline/training/tao_client.py b/morpheus/pipeline/training/tao_client.py index c80ecd752f..a6df939d53 100644 --- a/morpheus/pipeline/training/tao_client.py +++ b/morpheus/pipeline/training/tao_client.py @@ -24,7 +24,7 @@ _KIND = typing.Literal["model", "dataset"] _DATASET_ACTIONS = typing.Literal["convert", "convert_index", "convert_efficientdet"] -_MODEL_ACTIONS = typing.Literal["train", "evaluate", "prune", "retrain", "export", "inference"] +_MODEL_ACTIONS = typing.Literal["train", "evaluate", "prune", "retrain", "export", "inference"] def validate_kind(func): @@ -47,13 +47,14 @@ def inner_func(*args, **kwargs): kind = args[1] if kind is None: - raise TypeError("TypeError: a string-like object is required for kind, not 'NoneType'") + raise TypeError("TypeError: a string-like object is required for kind, not 'NoneType'") if kind not in typing.get_args(_KIND): raise ValueError("Invalid kind '{}'. Available kinds are {}".format(kind, _KIND)) return func(*args, **kwargs) return inner_func + def validate_actions(func): """ Validates TAO actions. @@ -69,24 +70,25 @@ def validate_actions(func): @functools.wraps(func) def inner_func(*args, **kwargs): - + actions_by_kind = _DATASET_ACTIONS if args[1] == "model": actions_by_kind = _MODEL_ACTIONS - + if len(args) < 3: raise ValueError("Actions not found. Select from available actions: {}".format(actions_by_kind)) - + actions = args[2] if actions is None: raise TypeError("TypeError: a string-like object is required for an action, not 'NoneType'") - + availablestr = typing.get_args(actions_by_kind) - + if isinstance(actions, list): if not set(actions).issubset(availablestr): - raise ValueError("One or more actions are not valid actions '{}'. 
Available actions are {}".format(actions, actions_by_kind)) + raise ValueError("One or more actions are not valid actions '{}'. Available actions are {}".format( + actions, actions_by_kind)) else: if actions not in availablestr: raise ValueError("Invalid action '{}'. Available actions are {}".format(actions, actions_by_kind)) @@ -133,7 +135,7 @@ def __init__(self, self._user_uri = None self._session = requests.Session() - + if server_side_cert: self._session.verify = cert self._session.cert = cert @@ -158,7 +160,7 @@ def authorize(self): self._user_uri = self._base_uri + "/user/" + json_resp.get("user_id") if not self._ssl: - self._session.headers.update({'Authorization': 'Bearer ' + json_resp.get("token")}) + self._session.headers.update({'Authorization': 'Bearer ' + json_resp.get("token")}) @property def base_uri(self): @@ -171,7 +173,7 @@ def user_uri(self): @property def session(self): return self._session - + @validate_kind def create_resource(self, kind: _KIND, data: typing.Dict, **kwargs) -> str: """ @@ -207,7 +209,7 @@ def create_resource(self, kind: _KIND, data: typing.Dict, **kwargs) -> str: resource_id = json_resp.get("id") return resource_id - + @validate_kind def partial_update_resource(self, kind: _KIND, data: typing.Dict, resource_id: str, **kwargs) -> typing.Dict: """ @@ -242,7 +244,7 @@ def partial_update_resource(self, kind: _KIND, data: typing.Dict, resource_id: s logger.debug("Response: {}".format(json_resp)) return json_resp - + @validate_kind def update_resource(self, kind: _KIND, data: typing.Dict, resource_id: str, **kwargs) -> typing.Dict: """ @@ -277,7 +279,7 @@ def update_resource(self, kind: _KIND, data: typing.Dict, resource_id: str, **kw logger.debug("Response: {}".format(json_resp)) return json_resp - + @validate_kind def upload_resource(self, kind: _KIND, resource_path: str, resource_id: str, **kwargs) -> typing.Dict: """ @@ -318,7 +320,7 @@ def upload_resource(self, kind: _KIND, resource_path: str, resource_id: str, **k 
logger.debug("Response: {}".format(json_resp)) return json_resp - + @validate_kind def list_resources(self, kind: _KIND, **kwargs) -> typing.Dict: """ @@ -347,7 +349,7 @@ def list_resources(self, kind: _KIND, **kwargs) -> typing.Dict: logger.debug("Response: {}".format(json_resp)) return json_resp - + @validate_kind @validate_actions def get_specs_schema(self, kind: _KIND, action: str, resource_id: str, **kwargs) -> typing.Dict: @@ -381,7 +383,7 @@ def get_specs_schema(self, kind: _KIND, action: str, resource_id: str, **kwargs) logger.debug("Response: {}".format(json_resp)) return json_resp - + @validate_kind @validate_actions def get_specs(self, kind: _KIND, action: str, resource_id: str, **kwargs) -> typing.Dict: @@ -415,11 +417,10 @@ def get_specs(self, kind: _KIND, action: str, resource_id: str, **kwargs) -> typ logger.debug("Response: {}".format(json_resp)) return json_resp - + @validate_kind @validate_actions - def update_specs(self, kind: _KIND, action: str, specs: typing.Dict, resource_id: str, - **kwargs) -> typing.Dict: + def update_specs(self, kind: _KIND, action: str, specs: typing.Dict, resource_id: str, **kwargs) -> typing.Dict: """ Update specs by kind and action. @@ -454,7 +455,7 @@ def update_specs(self, kind: _KIND, action: str, specs: typing.Dict, resource_id logger.debug("Response: {}".format(json_resp)) return json_resp - + @validate_kind @validate_actions def save_specs(self, kind: _KIND, action: str, resource_id: str, **kwargs) -> typing.Dict: @@ -488,7 +489,7 @@ def save_specs(self, kind: _KIND, action: str, resource_id: str, **kwargs) -> ty logger.debug("Response: {}".format(json_resp)) return json_resp - + @validate_kind @validate_actions def run_job(self, @@ -515,7 +516,7 @@ def run_job(self, Returns ------- job_ids : typing.List[str] - List of job id's by actions. + List of job id's by actions. 
""" data = json.dumps({"job": parent_job_id, "actions": actions}) @@ -531,7 +532,7 @@ def run_job(self, logger.debug("Response: {}".format(job_ids)) return job_ids - + @validate_kind def get_job_status(self, kind: _KIND, resource_id: str, job_id: str, **kwargs) -> typing.Dict: """ @@ -564,7 +565,7 @@ def get_job_status(self, kind: _KIND, resource_id: str, job_id: str, **kwargs) - logger.debug("Response: {}".format(json_resp)) return json_resp - + @validate_kind def list_jobs(self, kind: _KIND, resource_id: str, **kwargs) -> typing.Dict: """ @@ -596,7 +597,7 @@ def list_jobs(self, kind: _KIND, resource_id: str, **kwargs) -> typing.Dict: logger.debug("Response: {}".format(json_resp)) return json_resp - + @validate_kind def delete_job(self, kind: _KIND, resource_id: str, job_id: str, **kwargs) -> typing.Dict: """ @@ -629,7 +630,7 @@ def delete_job(self, kind: _KIND, resource_id: str, job_id: str, **kwargs) -> ty logger.debug("Response: {}".format(json_resp)) return json_resp - + @validate_kind def cancel_job(self, kind: _KIND, resource_id: str, job_id: str, **kwargs) -> typing.Dict: """ @@ -694,7 +695,7 @@ def resume_model_job(self, model_id: str, job_id: str, **kwargs) -> typing.Dict: logger.debug("Response: {}".format(json_resp)) return json_resp - + @validate_kind def download_resource(self, kind: _KIND, resource_id, job_id, output_dir: str, **kwargs) -> str: """ @@ -747,7 +748,7 @@ def download_resource(self, kind: _KIND, resource_id, job_id, output_dir: str, * return downloaded_path logger.info("Resource can be downloaded only when the job is completed. 
Current status is in {}".format(status)) - + @validate_kind def delete_resource(self, kind: _KIND, resource_id: str, **kwargs) -> typing.Dict: """ @@ -778,7 +779,7 @@ def delete_resource(self, kind: _KIND, resource_id: str, **kwargs) -> typing.Dic logger.debug("Response: {}".format(json_resp)) return json_resp - + @validate_kind def retrieve_resource(self, kind: _KIND, resource_id: str, **kwargs) -> typing.Dict: """ @@ -819,4 +820,4 @@ def close(self): logger.debug("Closing session...") session.close() self._session = None - logger.debug("Closing session... Done") \ No newline at end of file + logger.debug("Closing session... Done") diff --git a/tests/test_tao_client.py b/tests/test_tao_client.py index 688c7e2b8d..ec4326469c 100644 --- a/tests/test_tao_client.py +++ b/tests/test_tao_client.py @@ -24,6 +24,7 @@ from morpheus.pipeline.training.tao_client import generate_schema_url from morpheus.pipeline.training.tao_client import vaildate_apikey + @pytest.mark.use_python def test_generate_schema_url(): @@ -46,6 +47,7 @@ def test_generate_schema_url(): expected = "https://localhost:32080" assert actual == expected + @pytest.mark.use_python def test_vaildate_apikey(): @@ -57,6 +59,7 @@ def test_vaildate_apikey(): with pytest.raises(ValueError): vaildate_apikey(123459) + @pytest.mark.use_python def get_tao_client(): mock_creds = {"user_id": "X20109876", "token": "TOkJJTkw6WkxKRDpNWk9ZOkRVN0o6"} @@ -65,6 +68,7 @@ def get_tao_client(): return tao_client + @pytest.mark.use_python def test_create_dataset_resource(): tao_client = get_tao_client() @@ -90,12 +94,13 @@ def test_create_dataset_resource(): mock_response2 = Response() mock_response2.status_code = 400 mock_response2._content = b'{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "created_on": "2023-01-17T15:35:08.014463"}' - + tao_client.session.post = MagicMock(return_value=mock_response2) with pytest.raises(Exception): tao_client.create_resource("dataset", data=data) + @pytest.mark.use_python def 
test_create_model_resource(): tao_client = get_tao_client() @@ -108,7 +113,7 @@ def test_create_model_resource(): network_arch = "detectnet_v2" encode_key = "tlt_encode" - data = {"network_arch":network_arch,"encryption_key":encode_key, "description": "My model"} + data = {"network_arch": network_arch, "encryption_key": encode_key, "description": "My model"} resource_id = tao_client.create_resource("model", data) @@ -117,9 +122,10 @@ def test_create_model_resource(): with pytest.raises(ValueError): tao_client.create_resource("random_kind", data=data) + @pytest.mark.use_python def test_partial_update_resource(): - + tao_client = get_tao_client() mock_response = Response() @@ -128,7 +134,7 @@ def test_partial_update_resource(): tao_client.session.patch = MagicMock(return_value=mock_response) - data = {"name":"Train dataset", "description":"My train dataset with kitti"} + data = {"name": "Train dataset", "description": "My train dataset with kitti"} resp_json = tao_client.partial_update_resource("dataset", resource_id="eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", data=data) @@ -140,8 +146,9 @@ def test_partial_update_resource(): with pytest.raises(Exception): tao_client.create_resource("dataset", data=data) + def test_update_resource(): - + tao_client = get_tao_client() mock_response = Response() @@ -150,13 +157,14 @@ def test_update_resource(): tao_client.session.put = MagicMock(return_value=mock_response) - data = {"name":"Train dataset", "description":"My train dataset with kitti"} + data = {"name": "Train dataset", "description": "My train dataset with kitti"} resp_json = tao_client.update_resource("dataset", resource_id="eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", data=data) assert resp_json.get("id") == "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" assert isinstance(resp_json, dict) + @pytest.mark.use_python def test_get_specs_schema(): tao_client = get_tao_client() @@ -165,15 +173,15 @@ def test_get_specs_schema(): mock_response.status_code = 200 mock_response._content = b'{ "id" : 
"eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "created_on": "2023-01-17T15:35:08.014463"}' - - resource_id="eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" - + + resource_id = "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" + tao_client.session.get = MagicMock(return_value=mock_response) resp_json = tao_client.get_specs_schema("dataset", "convert", resource_id=resource_id) with pytest.raises(ValueError): - tao_client.get_specs_schema("dataset", "tmp_convert", resource_id=resource_id) + tao_client.get_specs_schema("dataset", "tmp_convert", resource_id=resource_id) assert resp_json.get("id") == "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" assert isinstance(resp_json, dict) @@ -183,12 +191,13 @@ def test_get_specs_schema(): def test_close(): tao_client = get_tao_client() - session = tao_client.session + session = tao_client.session assert isinstance(session, Session) tao_client.close() assert tao_client.session is None + @pytest.mark.use_python def test_upload_resource(tmpdir): input_data = tmpdir.join("input_dataset.txt") @@ -202,7 +211,7 @@ def test_upload_resource(tmpdir): mock_response.status_code = 201 mock_response._content = b'{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "created_on": "2023-01-17T15:35:08.014463"}' - resource_id="eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" + resource_id = "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" tao_client.session.post = MagicMock(return_value=mock_response) @@ -211,6 +220,7 @@ def test_upload_resource(tmpdir): assert resp_json.get("id") == "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" assert isinstance(resp_json, dict) + @pytest.mark.use_python def test_download_resource(tmpdir): tao_client = get_tao_client() @@ -219,10 +229,13 @@ def test_download_resource(tmpdir): mock_response.status_code = 200 mock_response._content = b'{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "parent_id" : "None", "status": "Pending", "created_on": "2023-01-17T15:35:08.014463"}' - resource_id="eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" + resource_id = "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" tao_client.session.get = 
MagicMock(return_value=mock_response) - resp_json = tao_client.download_resource("dataset", resource_id=resource_id, job_id="test_235678", output_dir=tmpdir) + resp_json = tao_client.download_resource("dataset", + resource_id=resource_id, + job_id="test_235678", + output_dir=tmpdir) assert resp_json is None From 79d406acc178580c22938bc7289a5a6e9d68cc04 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 18 Jan 2023 23:36:01 +0000 Subject: [PATCH 4/9] format correction --- tests/test_tao_client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_tao_client.py b/tests/test_tao_client.py index ec4326469c..81a57e1e7c 100644 --- a/tests/test_tao_client.py +++ b/tests/test_tao_client.py @@ -93,7 +93,8 @@ def test_create_dataset_resource(): mock_response2 = Response() mock_response2.status_code = 400 - mock_response2._content = b'{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "created_on": "2023-01-17T15:35:08.014463"}' + mock_response2._content = b'''{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", + "created_on": "2023-01-17T15:35:08.014463"}''' tao_client.session.post = MagicMock(return_value=mock_response2) @@ -227,7 +228,8 @@ def test_download_resource(tmpdir): mock_response = Response() mock_response.status_code = 200 - mock_response._content = b'{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "parent_id" : "None", "status": "Pending", "created_on": "2023-01-17T15:35:08.014463"}' + mock_response._content = b'''{ "id" : "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx", "parent_id" : "None", + "status": "Pending", "created_on": "2023-01-17T15:35:08.014463"}''' resource_id = "eyJzdWIiOiJwOTltOTh0NzBzdDFsa3Zx" From 9677fa9dd639f2a9620e8241880da2a902323687 Mon Sep 17 00:00:00 2001 From: Bhargav Suryadevara Date: Wed, 18 Jan 2023 23:58:24 +0000 Subject: [PATCH 5/9] variable name changes --- morpheus/pipeline/training/__init__.py | 13 +++++++++++++ morpheus/pipeline/training/tao_client.py | 10 +++++----- 2 files changed, 18 insertions(+), 5 
deletions(-) create mode 100644 morpheus/pipeline/training/__init__.py diff --git a/morpheus/pipeline/training/__init__.py b/morpheus/pipeline/training/__init__.py new file mode 100644 index 0000000000..2b8ac8f681 --- /dev/null +++ b/morpheus/pipeline/training/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/morpheus/pipeline/training/tao_client.py b/morpheus/pipeline/training/tao_client.py index a6df939d53..1eaca86d51 100644 --- a/morpheus/pipeline/training/tao_client.py +++ b/morpheus/pipeline/training/tao_client.py @@ -20,7 +20,7 @@ import requests -logger = logging.getLogger("morpheus.{}".format(__name__)) +logger = logging.getLogger(__name__) _KIND = typing.Literal["model", "dataset"] _DATASET_ACTIONS = typing.Literal["convert", "convert_index", "convert_efficientdet"] @@ -83,14 +83,14 @@ def inner_func(*args, **kwargs): if actions is None: raise TypeError("TypeError: a string-like object is required for an action, not 'NoneType'") - availablestr = typing.get_args(actions_by_kind) + available_actions = typing.get_args(actions_by_kind) if isinstance(actions, list): - if not set(actions).issubset(availablestr): + if not set(actions).issubset(available_actions): raise ValueError("One or more actions are not valid actions '{}'. 
Available actions are {}".format( actions, actions_by_kind)) else: - if actions not in availablestr: + if actions not in available_actions: raise ValueError("Invalid action '{}'. Available actions are {}".format(actions, actions_by_kind)) return func(*args, **kwargs) @@ -151,7 +151,7 @@ def authorize(self): resp = self.session.get(endpoint) if not resp.status_code == 200: - raise Exception("Login failed: {}".format(resp.reason)) + raise Exception("Login failed: {}".format(resp.content)) logger.info("Login has been successful!") From 5b1eb94e7f6e6e53d23e52701881c0f3cf4203b8 Mon Sep 17 00:00:00 2001 From: bsuryadevara Date: Thu, 19 Jan 2023 09:02:18 -0600 Subject: [PATCH 6/9] fix flake8 style --- morpheus/pipeline/training/__init__.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/morpheus/pipeline/training/__init__.py b/morpheus/pipeline/training/__init__.py index 2b8ac8f681..e69de29bb2 100644 --- a/morpheus/pipeline/training/__init__.py +++ b/morpheus/pipeline/training/__init__.py @@ -1,13 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
\ No newline at end of file From 70f99b56f74dcac347c994d69f3b7824878a4833 Mon Sep 17 00:00:00 2001 From: bsuryadevara Date: Thu, 26 Jan 2023 17:37:10 -0600 Subject: [PATCH 7/9] added functionality to upload in memory data to tao --- morpheus/pipeline/training/tao_client.py | 138 +++++++++++++++++------ 1 file changed, 102 insertions(+), 36 deletions(-) diff --git a/morpheus/pipeline/training/tao_client.py b/morpheus/pipeline/training/tao_client.py index 1eaca86d51..6fe542467e 100644 --- a/morpheus/pipeline/training/tao_client.py +++ b/morpheus/pipeline/training/tao_client.py @@ -13,9 +13,12 @@ # limitations under the License. import functools +import io import json import logging import os +import tarfile +import time import typing import requests @@ -181,10 +184,10 @@ def create_resource(self, kind: _KIND, data: typing.Dict, **kwargs) -> str: Parameters ---------- - data : typing.Dict - Initial metadata for new resource. kind : _KIND Endpoint type that is specific to a model or a dataset. + data : typing.Dict + Initial metadata for new resource. **kwargs : Additional arguments. Returns @@ -217,12 +220,12 @@ def partial_update_resource(self, kind: _KIND, data: typing.Dict, resource_id: s Parameters ---------- + kind : _KIND + Endpoint type that is specific to a model or a dataset. data : typing.Dict Metadata that needs to be updated. resource_id: str Unique identifier for the resource. - kind : _KIND - Endpoint type that is specific to a model or a dataset. **kwargs : Additional arguments. Returns @@ -252,12 +255,12 @@ def update_resource(self, kind: _KIND, data: typing.Dict, resource_id: str, **kw Parameters ---------- + kind : _KIND + Endpoint type that is specific to a model or a dataset. data : typing.Dict Metadata that needs to be updated. resource_id: str Unique identifier for the resource. - kind : _KIND - Endpoint type that is specific to a model or a dataset. **kwargs : Additional arguments. 
Returns @@ -287,12 +290,12 @@ def upload_resource(self, kind: _KIND, resource_path: str, resource_id: str, **k Parameters ---------- + kind : _KIND + Endpoint type that is specific to a model or a dataset. resource_path : str The location of the resource to be uploaded. resource_id: str Unique identifier for the resource. - kind : _KIND - Endpoint type that is specific to a model or a dataset. **kwargs : Additional arguments. Returns @@ -321,6 +324,69 @@ def upload_resource(self, kind: _KIND, resource_path: str, resource_id: str, **k return json_resp + def upload_in_memory_data(self, + files_content: typing.Dict[str, bytearray], + dirs_to_create, + resource_id: str, + **kwargs) -> typing.Dict: + """ + Upload in memory data. + + Parameters + ---------- + files_content : typing.Dict[str, bytearray] + Keys in dictionary are stored as obsolete filepaths and values as bytearray data. + dirs_to_create: typing.Dict[str] + To meet the requirements of the TAO data format, a list of directories must be created. + resource_id: str + Unique identifier for the resource. + **kwargs : + Additional arguments. + Returns + ------- + json_resp : typing.Dict + JSON response. + """ + # Create an in-memory binary stream + tar_bytes = io.BytesIO() + + # Open tarfile + tar = tarfile.open(fileobj=tar_bytes, mode='w') + + # Create required directories. + for dir in dirs_to_create: + dir_info = tarfile.TarInfo(name=dir) + dir_info.type = tarfile.DIRTYPE + dir_info.mode = 0o755 + dir_info.mtime = time.time() + tar.addfile(tarinfo=dir_info) + + # Create file within the directories. + for key in files_content.keys(): + # Here key is a absolute filepath. 
+ file_content = files_content[key] + file_content = io.BytesIO(file_content) + file_info = tarfile.TarInfo(name=key) + file_info.size = len(file_content.getvalue()) + file_info.mtime = time.time() + tar.addfile(file_info, file_content) + + # Seek to the beginning of the stream + tar_bytes.seek(0) + + endpoint = f"{self.user_uri}/dataset/{resource_id}/upload" + logger.debug("Constructed endpoint with provided input: {}".format(endpoint)) + + resp = self.session.post(endpoint, files={'file': tar_bytes}, **kwargs) + + if not resp.status_code == 201: + raise Exception("Unable to upload resource: {}".format(resp.content)) + + json_resp = resp.json() + logger.debug("Response: {}".format(json_resp)) + + return json_resp + @validate_kind def list_resources(self, kind: _KIND, **kwargs) -> typing.Dict: """ @@ -358,12 +424,12 @@ def get_specs_schema(self, kind: _KIND, action: str, resource_id: str, **kwargs) Parameters ---------- - resource_id: str - Unique identifier for the resource. kind : _KIND Endpoint type that is specific to a model or a dataset. action: str TAO actions. + resource_id: str + Unique identifier for the resource. **kwargs : Additional arguments. Returns @@ -392,12 +458,12 @@ def get_specs(self, kind: _KIND, action: str, resource_id: str, **kwargs) -> typ Parameters ---------- - resource_id: str - Unique identifier for the resource. kind : _KIND Endpoint type that is specific to a model or a dataset. action: str TAO actions. + resource_id: str + Unique identifier for the resource. **kwargs : Additional arguments. Returns @@ -426,14 +492,14 @@ def update_specs(self, kind: _KIND, action: str, specs: typing.Dict, resource_id Parameters ---------- - specs: typing.Dict - Updated specs. - resource_id: str - Unique identifier for the resource. kind : _KIND Endpoint type that is specific to a model or a dataset. action: str TAO actions. + specs: typing.Dict + Updated specs. + resource_id: str + Unique identifier for the resource. **kwargs : Additional arguments. 
Returns @@ -464,12 +530,12 @@ def save_specs(self, kind: _KIND, action: str, resource_id: str, **kwargs) -> ty Parameters ---------- - resource_id: str - Unique identifier for the resource. kind : _KIND Endpoint type that is specific to a model or a dataset. action: str TAO actions. + resource_id: str + Unique identifier for the resource. **kwargs : Additional arguments. Returns @@ -503,12 +569,12 @@ def run_job(self, Parameters ---------- - resource_id: str - Unique identifier for the resource. kind : _KIND Endpoint type that is specific to a model or a dataset. action: str TAO actions. + resource_id: str + Unique identifier for the resource. parent_job_id: str Parent job id. **kwargs : @@ -540,12 +606,12 @@ def get_job_status(self, kind: _KIND, resource_id: str, job_id: str, **kwargs) - Parameters ---------- - job_id: str - Unique identifier for the job. - resource_id: str - Unique identifier for the resource. kind : _KIND Endpoint type that is specific to a model or a dataset. + resource_id: str + Unique identifier for the resource. + job_id: str + Unique identifier for the job. **kwargs : Additional arguments. Returns @@ -573,10 +639,10 @@ def list_jobs(self, kind: _KIND, resource_id: str, **kwargs) -> typing.Dict: Parameters ---------- - resource_id: str - Unique identifier for the resource. kind : _KIND Endpoint type that is specific to a model or a dataset. + resource_id: str + Unique identifier for the resource. **kwargs : Additional arguments. Returns @@ -605,12 +671,12 @@ def delete_job(self, kind: _KIND, resource_id: str, job_id: str, **kwargs) -> ty Parameters ---------- + kind : _KIND + Endpoint type that is specific to a model or a dataset. resource_id: str Unique identifier for the resource. job_id: str Unique identifier for the job. - kind : _KIND - Endpoint type that is specific to a model or a dataset. **kwargs : Additional arguments. 
Returns @@ -638,12 +704,12 @@ def cancel_job(self, kind: _KIND, resource_id: str, job_id: str, **kwargs) -> ty Parameters ---------- + kind : _KIND + Endpoint type that is specific to a model or a dataset. resource_id: str Unique identifier for the resource. job_id: str Unique identifier for the job. - kind : _KIND - Endpoint type that is specific to a model or a dataset. **kwargs : Additional arguments. Returns @@ -703,12 +769,12 @@ def download_resource(self, kind: _KIND, resource_id, job_id, output_dir: str, * Parameters ---------- + kind : _KIND + Endpoint type that is specific to a model or a dataset. resource_id: str Unique identifier for the resource. job_id: str Unique identifier for the job. - kind : _KIND - Endpoint type that is specific to a model or a dataset. output_dir : str Output directory to save the downloaded content. **kwargs : @@ -756,10 +822,10 @@ def delete_resource(self, kind: _KIND, resource_id: str, **kwargs) -> typing.Dic Parameters ---------- - resource_id: str - Unique identifier for the resource. kind : _KIND Endpoint type that is specific to a model or a dataset. + resource_id: str + Unique identifier for the resource. **kwargs : Additional arguments. Returns @@ -787,10 +853,10 @@ def retrieve_resource(self, kind: _KIND, resource_id: str, **kwargs) -> typing.D Parameters ---------- - resource_id: str - Unique identifier for the resource. kind : _KIND Endpoint type that is specific to a model or a dataset. + resource_id: str + Unique identifier for the resource. **kwargs : Additional arguments. 
Returns From 16c44eaa69f89da35a4b9f3cecfe90c13dfbfeec Mon Sep 17 00:00:00 2001 From: bsuryadevara Date: Fri, 27 Jan 2023 09:53:36 -0600 Subject: [PATCH 8/9] added functionality to upload in memory data to tao --- morpheus/pipeline/training/tao_client.py | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/morpheus/pipeline/training/tao_client.py b/morpheus/pipeline/training/tao_client.py index 6fe542467e..68d035ebab 100644 --- a/morpheus/pipeline/training/tao_client.py +++ b/morpheus/pipeline/training/tao_client.py @@ -324,10 +324,7 @@ def upload_resource(self, kind: _KIND, resource_path: str, resource_id: str, **k return json_resp - def upload_in_memory_data(self, - files_content: typing.Dict[str, bytearray], - dirs_to_create, - resource_id: str, + def upload_in_memory_data(self, files_content: typing.Dict[str, bytearray], resource_id: str, **kwargs) -> typing.Dict: """ Upload in memory data. @@ -335,9 +332,7 @@ def upload_in_memory_data(self, Parameters ---------- files_content : typing.Dict[str, bytearray] - Keys in dictionary are stored as obsolete filepaths and values as bytearray data. - dirs_to_create: typing.Dict[str] - To meet the requirements of the TAO data format, a list of directories must be created. + Keys in the dictionary are file path and values are file content as bytearray. resource_id: str Unique identifier for the resource. **kwargs : @@ -353,17 +348,9 @@ def upload_in_memory_data(self, # Open tarfile tar = tarfile.open(fileobj=tar_bytes, mode='w') - # Create required directories. - for dir in dirs_to_create: - dir_info = tarfile.TarInfo(name=dir) - dir_info.type = tarfile.DIRTYPE - dir_info.mode = 0o755 - dir_info.mtime = time.time() - tar.addfile(tarinfo=dir_info) - # Create file within the directories. for key in files_content.keys(): - # Here key is a absolute filepath. + # Here key is a filepath. 
file_content = files_content[key] file_content = io.BytesIO(file_content) file_info = tarfile.TarInfo(name=key) @@ -379,6 +366,9 @@ def upload_in_memory_data(self, resp = self.session.post(endpoint, files={'file': tar_bytes}, **kwargs) + # Close tar file. + tar.close() + if not resp.status_code == 201: raise Exception("Unable to upload resource: {}".format(resp.content)) From a68895e0862243e38acd89986a8d5ded6b0ceeab Mon Sep 17 00:00:00 2001 From: bsuryadevara Date: Thu, 16 Feb 2023 10:54:10 -0600 Subject: [PATCH 9/9] added missing docstrings and typing annotations --- .../{pipeline/training => io}/tao_client.py | 75 +++++++++++++++---- morpheus/pipeline/training/__init__.py | 0 tests/test_tao_client.py | 24 ++---- 3 files changed, 68 insertions(+), 31 deletions(-) rename morpheus/{pipeline/training => io}/tao_client.py (94%) delete mode 100644 morpheus/pipeline/training/__init__.py diff --git a/morpheus/pipeline/training/tao_client.py b/morpheus/io/tao_client.py similarity index 94% rename from morpheus/pipeline/training/tao_client.py rename to morpheus/io/tao_client.py index 68d035ebab..e42b3e75a6 100644 --- a/morpheus/pipeline/training/tao_client.py +++ b/morpheus/io/tao_client.py @@ -30,16 +30,17 @@ _MODEL_ACTIONS = typing.Literal["train", "evaluate", "prune", "retrain", "export", "inference"] -def validate_kind(func): +def validate_kind(func: typing.Callable) -> typing.Callable: """ Validates given endpoint category. Parameters ---------- - func : Function that requires wrapping. + func : typing.Callable + Function that requires wrapping. Returns ------- - inner_func + inner_func : typing.Callable Encapsulated function. """ @@ -58,16 +59,17 @@ def inner_func(*args, **kwargs): return inner_func -def validate_actions(func): +def validate_actions(func: typing.Callable) -> typing.Callable: """ Validates TAO actions. Parameters ---------- - func : Function that requires wrapping. + func : typing.Callable + Function that requires wrapping. 
Returns ------- - inner_func + inner_func : typing.Callable Encapsulated function. """ @@ -101,7 +103,22 @@ def inner_func(*args, **kwargs): return inner_func -def generate_schema_url(url, ssl): +def generate_schema_url(url: str, ssl: bool) -> str: + """ + Generates a URL with the scheme prepended. + + Parameters + ---------- + url : str + URL without the scheme. + ssl : bool + Whether to use the HTTPS (True) or HTTP (False) scheme. + Returns + ------- + url : str + URL with the scheme attached. + """ + if url.startswith("http://") or url.startswith("https://"): raise ValueError("URL should not include the scheme") @@ -111,17 +128,43 @@ def generate_schema_url(url, ssl): return url -def vaildate_apikey(apikey): +def apikey_type_check(apikey: str): + """ + Verify API key type. + + Parameters + ---------- + apikey : str + NGC API key + """ + if not isinstance(apikey, str): raise ValueError('API key must be a string') if not apikey: raise ValueError('API key can not be an empty string') - return apikey - class TaoApiClient(): + """ + This serves as a client wrapper for TAO REST endpoints. This class gives you the flexibility to upload and delete + datasets, run training or inference processes, and update configurations on the TAO toolkit API server. + + Parameters + ---------- + apikey : str + NGC API key. + url : str + TAO toolkit API URL. + ssl : bool + Whether to use the HTTPS (True) or HTTP (False) scheme. + cert : str + Client side certificate. + server_side_cert : bool + Whether to use the server side certificate. + proxies : typing.Dict[str, str] + Defines the HTTP and HTTPS connections. 
+ """ def __init__(self, apikey: str, @@ -131,7 +174,8 @@ def __init__(self, server_side_cert: bool = True, proxies: typing.Dict[str, str] = None): - self._apikey = vaildate_apikey(apikey) + apikey_type_check(apikey) + self._apikey = apikey self._parsed_url = generate_schema_url(url, ssl) self._base_uri = f"{self._parsed_url}/api/v1" self._ssl = ssl @@ -147,6 +191,9 @@ def __init__(self, self._session.proxies.update(proxies) def authorize(self): + """ + This function establishes a session and authorizes with the TAO toolkit API server. + """ endpoint = f"{self._base_uri}/login/{self._apikey}" @@ -166,15 +213,15 @@ def authorize(self): self._session.headers.update({'Authorization': 'Bearer ' + json_resp.get("token")}) @property - def base_uri(self): + def base_uri(self) -> str: return self._base_uri @property - def user_uri(self): + def user_uri(self) -> str: return self._user_uri @property - def session(self): + def session(self) -> requests.Session: return self._session @validate_kind diff --git a/morpheus/pipeline/training/__init__.py b/morpheus/pipeline/training/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/test_tao_client.py b/tests/test_tao_client.py index 81a57e1e7c..e2313d5d13 100644 --- a/tests/test_tao_client.py +++ b/tests/test_tao_client.py @@ -20,12 +20,11 @@ from requests.models import Response from requests.sessions import Session -from morpheus.pipeline.training.tao_client import TaoApiClient -from morpheus.pipeline.training.tao_client import generate_schema_url -from morpheus.pipeline.training.tao_client import vaildate_apikey +from morpheus.io.tao_client import TaoApiClient +from morpheus.io.tao_client import apikey_type_check +from morpheus.io.tao_client import generate_schema_url -@pytest.mark.use_python def test_generate_schema_url(): actual = generate_schema_url(url="localhost:32080", ssl=False) @@ -48,19 +47,17 @@ def test_generate_schema_url(): assert actual == expected -@pytest.mark.use_python -def 
test_vaildate_apikey(): +def test_apikey_type_check(): - vaildate_apikey("test_api_key") + apikey_type_check("test_api_key") with pytest.raises(ValueError): - vaildate_apikey("") + apikey_type_check("") with pytest.raises(ValueError): - vaildate_apikey(123459) + apikey_type_check(123459) -@pytest.mark.use_python def get_tao_client(): mock_creds = {"user_id": "X20109876", "token": "TOkJJTkw6WkxKRDpNWk9ZOkRVN0o6"} tao_client = TaoApiClient("test_api_key", "localhost:32080") @@ -69,7 +66,6 @@ def get_tao_client(): return tao_client -@pytest.mark.use_python def test_create_dataset_resource(): tao_client = get_tao_client() @@ -102,7 +98,6 @@ def test_create_dataset_resource(): tao_client.create_resource("dataset", data=data) -@pytest.mark.use_python def test_create_model_resource(): tao_client = get_tao_client() @@ -124,7 +119,6 @@ def test_create_model_resource(): tao_client.create_resource("random_kind", data=data) -@pytest.mark.use_python def test_partial_update_resource(): tao_client = get_tao_client() @@ -166,7 +160,6 @@ def test_update_resource(): assert isinstance(resp_json, dict) -@pytest.mark.use_python def test_get_specs_schema(): tao_client = get_tao_client() @@ -188,7 +181,6 @@ def test_get_specs_schema(): assert isinstance(resp_json, dict) -@pytest.mark.use_python def test_close(): tao_client = get_tao_client() @@ -199,7 +191,6 @@ def test_close(): assert tao_client.session is None -@pytest.mark.use_python def test_upload_resource(tmpdir): input_data = tmpdir.join("input_dataset.txt") @@ -222,7 +213,6 @@ def test_upload_resource(tmpdir): assert isinstance(resp_json, dict) -@pytest.mark.use_python def test_download_resource(tmpdir): tao_client = get_tao_client()