From 23d88fc7dcfdbe9f9b319a08b72b39f0c58fdbb3 Mon Sep 17 00:00:00 2001 From: Raphael Jin Date: Tue, 11 Jun 2024 11:48:20 -0700 Subject: [PATCH 1/4] [Deploy] Remove unnecessary logic. --- .../device_model_deployment.py | 232 +----------------- .../model_scheduler/master_job_runner.py | 1 - .../model_scheduler/worker_job_runner.py | 16 +- 3 files changed, 10 insertions(+), 239 deletions(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py b/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py index 1876373d25..5d3ba9873d 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py @@ -1,12 +1,13 @@ +import fedml + import logging import os -import pickle -import platform import shutil import time import traceback import yaml import datetime +import docker import requests import torch @@ -15,27 +16,18 @@ import collections.abc -import fedml from fedml.computing.scheduler.comm_utils import sys_utils, security_utils -from fedml.computing.scheduler.comm_utils.container_utils import ContainerUtils from fedml.computing.scheduler.comm_utils.hardware_utils import HardwareUtil from fedml.computing.scheduler.comm_utils.job_utils import JobRunnerUtils - -for type_name in collections.abc.__all__: - setattr(collections, type_name, getattr(collections.abc, type_name)) - from fedml.computing.scheduler.comm_utils.constants import SchedulerConstants from fedml.computing.scheduler.model_scheduler.device_client_constants import ClientConstants -import io - -import docker -from ..scheduler_core.compute_cache_manager import ComputeCacheManager +from fedml.computing.scheduler.model_scheduler.device_model_cache import FedMLModelCache from ..scheduler_core.compute_utils import ComputeUtils from ..comm_utils.container_utils import ContainerUtils - from .device_http_inference_protocol import FedMLHttpInference -from fedml.computing.scheduler.model_scheduler.device_model_cache import FedMLModelCache +for type_name in collections.abc.__all__: + setattr(collections, type_name, getattr(collections.abc, type_name)) no_real_gpu_allocation = None @@ -432,8 +424,6 @@ def should_exit_logs(end_point_id, model_id, cmd_type, model_name, inference_eng if cmd_type == ClientConstants.CMD_TYPE_RUN_DEFAULT_SERVER: # TODO: Exited Quickly if the container is Exited or Removed # If the container has exited, return True, means we should exit the logs - # container_name = "{}".format(ClientConstants.FEDML_DEFAULT_SERVER_CONTAINER_NAME_PREFIX) + "__" + \ - # security_utils.get_content_hash(model_name) try: inference_output_url, model_version, model_metadata, model_config = \ get_model_info(model_name, inference_engine, inference_port, infer_host, @@ -554,8 +544,6 @@ def log_deployment_result(end_point_id, model_id, cmd_container_name, cmd_type, def is_client_inference_container_ready(infer_url_host, inference_http_port, inference_model_name, local_infer_url, inference_type="default", model_version="", request_input_example=None): - # logging.info(f"Inference type: {inference_type}, infer_url_host {infer_url_host}, \ - # inference_http_port: {inference_http_port}, local_infer_url {local_infer_url}") if inference_type == "default": default_client_container_ready_url = "http://{}:{}/ready".format("0.0.0.0", inference_http_port) @@ -631,211 +619,5 @@ def run_http_inference_with_curl_request(inference_url, inference_input_list, in inference_type=inference_type, 
engine_type=engine_type, timeout=timeout) -def convert_model_to_onnx( - torch_model, output_path: str, dummy_input_list, input_size: int, input_is_tensor=True -) -> None: - from collections import OrderedDict - import torch - from torch.onnx import TrainingMode - - torch.onnx.export(torch_model, # model being run - dummy_input_list if input_is_tensor else tuple(dummy_input_list), - # model input (or a tuple for multiple inputs) - f=output_path, # where to save the model (can be a file or file-like object) - export_params=True, # store the trained parameter weights inside the model file - opset_version=11, # the ONNX version to export the model to - do_constant_folding=False, # whether to execute constant folding for optimization - input_names=["input1", "input2"], - # the model's input names - output_names=['output'], # the model's output names - training=TrainingMode.EVAL, - verbose=True, - dynamic_axes={"input1": {0: "batch_size"}, - "input2": {0: "batch_size"}, - "output": {0: "batch_size"}} - ) - - -def test_start_triton_server(model_serving_dir): - sudo_prefix = "sudo " - sys_name = platform.system() - if sys_name == "Darwin": - sudo_prefix = "" - gpu_attach_cmd = "" - - triton_server_container_name = "{}".format(ClientConstants.FEDML_TRITON_SERVER_CONTAINER_NAME_PREFIX) - triton_server_cmd = "{}docker stop {}; {}docker rm {}; {}docker run --name {} {} -p{}:8000 " \ - "-p{}:8001 -p{}:8002 " \ - "--shm-size {} " \ - "-v {}:/models {} " \ - "bash -c \"pip install transformers && tritonserver --strict-model-config=false " \ - "--model-control-mode=poll --repository-poll-secs={} " \ - "--model-repository=/models\" ".format(sudo_prefix, triton_server_container_name, - sudo_prefix, triton_server_container_name, - sudo_prefix, triton_server_container_name, - gpu_attach_cmd, - ClientConstants.INFERENCE_HTTP_PORT, - ClientConstants.INFERENCE_GRPC_PORT, - 8002, - "4096m", - model_serving_dir, - ClientConstants.INFERENCE_SERVER_IMAGE, - ClientConstants.FEDML_MODEL_SERVING_REPO_SCAN_INTERVAL) - logging.info("Run triton inference server: {}".format(triton_server_cmd)) - triton_server_process = ClientConstants.exec_console_with_script(triton_server_cmd, - should_capture_stdout=False, - should_capture_stderr=False, - no_sys_out_err=True) - - -def test_convert_pytorch_model_to_onnx(model_net_file, model_bin_file, model_name, model_in_params): - torch_model = torch.jit.load(model_net_file) - with open(model_bin_file, 'rb') as model_pkl_file: - model_state_dict = pickle.load(model_pkl_file) - torch_model.load_state_dict(model_state_dict) - torch_model.eval() - - input_size = model_in_params["input_size"] - input_types = model_in_params["input_types"] - - dummy_input_list = [] - for index, input_i in enumerate(input_size): - if input_types[index] == "int": - this_input = torch.tensor(torch.randint(0, 1, input_i)) - else: - this_input = torch.tensor(torch.zeros(input_i)) - dummy_input_list.append(this_input) - - onnx_model_dir = os.path.join(ClientConstants.get_model_cache_dir(), - ClientConstants.FEDML_CONVERTED_MODEL_DIR_NAME, - model_name, ClientConstants.INFERENCE_MODEL_VERSION) - if not os.path.exists(onnx_model_dir): - os.makedirs(onnx_model_dir, exist_ok=True) - onnx_model_path = os.path.join(onnx_model_dir, "model.onnx") - - convert_model_to_onnx(torch_model, onnx_model_path, dummy_input_list, input_size, - input_is_tensor=True) - - model_serving_dir = os.path.join(ClientConstants.get_model_cache_dir(), - ClientConstants.FEDML_CONVERTED_MODEL_DIR_NAME) - return model_serving_dir - - -def 
start_gpu_model_load_process(): - from multiprocessing import Process - import time - process = Process(target=load_gpu_model_to_cpu_device) - process.start() - while True: - time.sleep(1) - - -def load_gpu_model_to_cpu_device(): - import pickle - import io - import torch - - class CPU_Unpickler(pickle.Unpickler): - def find_class(self, module, name): - if module == 'torch.storage' and name == '_load_from_bytes': - return lambda b: torch.load(io.BytesIO(b), map_location='cpu') - else: - return super().find_class(module, name) - - model_file = "/home/fedml/.fedml/fedml-client/fedml/models/theta_rec_auc_81_single_label/theta_rec_auc_81_single_label" - with open(model_file, "rb") as model_pkl_file: - if not torch.cuda.is_available(): - model = CPU_Unpickler(model_pkl_file).load() - if model is None: - print("Failed to load gpu model to cpu device") - else: - print("Succeeded to load gpu model to cpu device") - - if __name__ == "__main__": - start_gpu_model_load_process() - - model_serving_dir = test_convert_pytorch_model_to_onnx("./sample-open-training-model-net", - "./sample-open-training-model", - "rec-model", - {"input_size": [[1, 24], [1, 2]], - "input_types": ["int", "float"]}) - - test_start_triton_server(model_serving_dir) - - # input_data = {"model_version": "v0-Sun Feb 05 12:17:16 GMT 2023", - # "model_name": "model_414_45_open-model-test_v0-Sun-Feb-05-12-17-16-GMT-2023", - # # "data": "file:///Users/alexliang/fedml_data/mnist-image.png", - # "data": "https://raw.githubusercontent.com/niyazed/triton-mnist-example/master/images/sample_image.png", - # "end_point_id": 414, "model_id": 45, "token": "a09a18a14c4c4d89a8d5f9515704c073"} - # - # data_list = list() - # data_list.append(input_data["data"]) - # run_http_inference_with_lib_http_api_with_image_data(input_data["model_name"], - # 5001, 1, data_list, "") - # - # - # class LogisticRegression(torch.nn.Module): - # def __init__(self, input_dim, output_dim): - # super(LogisticRegression, self).__init__() - # self.linear = torch.nn.Linear(input_dim, output_dim) - # - # def forward(self, x): - # outputs = torch.sigmoid(self.linear(x)) - # return outputs - # - # - # model = LogisticRegression(28 * 28, 10) - # checkpoint = {'model': model} - # model_net_file = "/Users/alexliang/fedml-client/fedml/models/open-model-test/model-net.pt" - # torch.save(checkpoint, model_net_file) - # - # with open("/Users/alexliang/fedml-client/fedml/models/open-model-test/open-model-test", 'rb') as model_pkl_file: - # model_params = pickle.load(model_pkl_file) - # # torch.save(model_params, "/Users/alexliang/fedml-client/fedml/models/open-model-test/a.pt") - # # model = torch.load("/Users/alexliang/fedml-client/fedml/models/open-model-test/a.pt") - # loaded_checkpoint = torch.load(model_net_file) - # loaded_model = loaded_checkpoint["model"] - # loaded_model.load_state_dict(model_params) - # for parameter in loaded_model.parameters(): - # parameter.requires_grad = False - # loaded_model.eval() - # input_names = {"x": 0} - # convert_model_to_onnx(loaded_model, "/Users/alexliang/fedml-client/fedml/models/open-model-test/a.onnx", - # input_names, 28 * 28) - - # parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) - # parser.add_argument("--cf", "-c", help="config file") - # parser.add_argument("--role", "-r", type=str, default="client", help="role") - # parser.add_argument("--model_storage_local_path", "-url", type=str, default="/home/ubuntu", - # help="model storage local path") - # parser.add_argument("--inference_model_name", 
"-n", type=str, default="fedml-model", - # help="inference model name") - # parser.add_argument("--inference_engine", "-engine", type=str, default="ONNX", help="inference engine") - # parser.add_argument("--inference_http_port", "-http", type=int, default=8000, help="inference http port") - # parser.add_argument("--inference_grpc_port", "-gprc", type=int, default=8001, help="inference grpc port") - # parser.add_argument("--inference_metric_port", "-metric", type=int, default=8002, help="inference metric port") - # parser.add_argument("--inference_use_gpu", "-gpu", type=str, default="gpu", help="inference use gpu") - # parser.add_argument("--inference_memory_size", "-mem", type=str, default="256m", help="inference memory size") - # parser.add_argument("--inference_convertor_image", "-convertor", type=str, - # default=ClientConstants.INFERENCE_CONVERTOR_IMAGE, help="inference convertor image") - # parser.add_argument("--inference_server_image", "-server", type=str, - # default=ClientConstants.INFERENCE_SERVER_IMAGE, help="inference server image") - # args = parser.parse_args() - # args.user = args.user - # - # pip_source_dir = os.path.dirname(__file__) - # __running_model_name, __inference_output_url, __model_version, __model_metadata, __model_config = \ - # start_deployment( - # args.model_storage_local_path, - # args.inference_model_name, - # args.inference_engine, - # args.inference_http_port, - # args.inference_grpc_port, - # args.inference_metric_port, - # args.inference_use_gpu, - # args.inference_memory_size, - # args.inference_convertor_image, - # args.inference_server_image) - # print("Model deployment results, running model name: {}, url: {}, model metadata: {}, model config: {}".format( - # __running_model_name, __inference_output_url, __model_metadata, __model_config)) + pass diff --git a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py index b9b9b4c356..ef2c01c49d 100755 --- a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py +++ b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py @@ -453,7 +453,6 @@ def process_deployment_result_message(self, topic=None, payload=None): time.sleep(3) self.trigger_completed_event() - def cleanup_runner_process(self, run_id): ServerConstants.cleanup_run_process(run_id, not_kill_subprocess=True) diff --git a/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py index ef65e37904..8100707386 100755 --- a/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py +++ b/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py @@ -294,9 +294,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center, json.dumps(result_payload), replica_no=rank + 1) logging.info(f"Deploy replica {rank + 1} / {prev_rank + 1 + op_num} successfully.") - time.sleep(5) - time.sleep(1) self.status_reporter.run_id = self.run_id self.status_reporter.report_client_id_status( self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FINISHED, @@ -348,7 +346,8 @@ def run_impl(self, run_extend_queue_list, sender_message_center, # TODO (Raphael) check if this will allow another job to seize the gpu during high concurrency: try: - JobRunnerUtils.get_instance().release_partial_job_gpu(run_id, self.edge_id, replica_occupied_gpu_ids) + JobRunnerUtils.get_instance().release_partial_job_gpu( + run_id, self.edge_id, replica_occupied_gpu_ids) 
                except Exception as e:
                    if op == "rollback":
                        pass
@@ -395,7 +394,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
                     JobRunnerUtils.get_instance().release_partial_job_gpu(
                         run_id, self.edge_id, replica_occupied_gpu_ids)
 
-                result_payload = self.send_deployment_results(
+                self.send_deployment_results(
                     end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED,
                     model_id, model_name, inference_output_url, inference_model_version, inference_port,
                     inference_engine, model_metadata, model_config)
@@ -496,15 +495,6 @@ def send_deployment_results(self, end_point_name, device_id, model_status,
         self.message_center.send_message_json(deployment_results_topic, json.dumps(deployment_results_payload))
         return deployment_results_payload
 
-    def send_deployment_status(self, end_point_name, device_id,
-                               model_id, model_name, model_version,
-                               model_inference_url, model_status,
-                               inference_port=ClientConstants.MODEL_INFERENCE_DEFAULT_PORT,
-                               replica_no=1,  # start from 1
-                               ):
-        # Deprecated
-        pass
-
     def reset_devices_status(self, edge_id, status):
         self.status_reporter.run_id = self.run_id
         self.status_reporter.edge_id = edge_id

From e0ad9b5bef5bcea1eaefe3458a3d6b49aa399d46 Mon Sep 17 00:00:00 2001
From: Raphael Jin
Date: Tue, 11 Jun 2024 12:15:22 -0700
Subject: [PATCH 2/4] [Deploy] Remove unnecessary logic; Rename readiness check
 function; Forbid user-level control of host port.

---
 .../device_model_deployment.py | 150 +++++-------------
 1 file changed, 40 insertions(+), 110 deletions(-)

diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py b/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py
index 5d3ba9873d..edd2ebea9a 100755
--- a/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py
+++ b/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py
@@ -68,6 +68,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
     num_gpus = gpu_per_replica
     gpu_ids, gpu_attach_cmd = None, ""
 
+    # Concatenate the model name
     running_model_name = ClientConstants.get_running_model_name(
         end_point_name, inference_model_name, model_version, end_point_id, model_id, edge_id=edge_id)
 
@@ -77,6 +78,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
         config = yaml.safe_load(file)
 
     # Resource related
+    inference_type = "default"
     use_gpu = config.get('use_gpu', True)
     num_gpus_frm_yml = config.get('num_gpus', None)
     if not use_gpu:
@@ -85,9 +87,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
     if num_gpus_frm_yml is not None:
         num_gpus = int(num_gpus_frm_yml)
     usr_indicated_wait_time = config.get('deploy_timeout', 900)
-    usr_indicated_worker_port = config.get('worker_port', "")
-    if usr_indicated_worker_port == "":
-        usr_indicated_worker_port = os.environ.get("FEDML_WORKER_PORT", "")
+    usr_indicated_retry_cnt = max(int(usr_indicated_wait_time) // 10, 1)
     shm_size = config.get('shm_size', None)
     storage_opt = config.get('storage_opt', None)
     tmpfs = config.get('tmpfs', None)
@@ -96,17 +96,6 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version,
         cpus = int(cpus)
     memory = config.get('memory', None)
 
-    if usr_indicated_worker_port == "":
-        usr_indicated_worker_port = None
-    else:
-        usr_indicated_worker_port = int(usr_indicated_worker_port)
-
-    worker_port_env = os.environ.get("FEDML_WORKER_PORT", "")
-    worker_port_from_config = config.get('worker_port', "")
-    logging.info(f"usr_indicated_worker_port 
{usr_indicated_worker_port}, worker port env {worker_port_env}, " - f"worker port from config {worker_port_from_config}") - - usr_indicated_retry_cnt = max(int(usr_indicated_wait_time) // 10, 1) inference_image_name = config.get('inference_image_name', ClientConstants.INFERENCE_SERVER_CUSTOME_IMAGE) image_pull_policy = config.get('image_pull_policy', SchedulerConstants.IMAGE_PULL_POLICY_IF_NOT_PRESENT) @@ -144,6 +133,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, # If using customized image, then bootstrap + job will be the entry point enable_custom_image = config.get("enable_custom_image", False) + # inference_type = "custom" customized_image_entry_cmd = \ "/bin/bash /home/fedml/models_serving/fedml-deploy-bootstrap-entry-auto-gen.sh" @@ -151,18 +141,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, docker_registry_user_password = config.get("docker_registry_user_password", "") docker_registry = config.get("docker_registry", "") - port_inside_container = int(config.get("port_inside_container", 2345)) - use_triton = config.get("use_triton", False) - if use_triton: - inference_type = "triton" - else: - inference_type = "default" - - # Config check - if src_code_dir == "": - raise Exception("Please indicate source_code_dir in the fedml_model_config.yaml") - if relative_entry == "": - logging.warning("You missed main_entry in the fedml_model_config.yaml") + port_inside_container = int(config.get("port", 2345)) # Request the GPU ids for the deployment if num_gpus > 0: @@ -175,22 +154,10 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, end_point_id, end_point_name, inference_model_name, edge_id, replica_rank+1, gpu_ids) logging.info("GPU ids allocated: {}".format(gpu_ids)) + # Create the model serving dir if not exists model_serving_dir = ClientConstants.get_model_serving_dir() if not os.path.exists(model_serving_dir): os.makedirs(model_serving_dir, exist_ok=True) - converted_model_path = os.path.join(model_storage_local_path, ClientConstants.FEDML_CONVERTED_MODEL_DIR_NAME) - if os.path.exists(converted_model_path): - model_file_list = os.listdir(converted_model_path) - for model_file in model_file_list: - src_model_file = os.path.join(converted_model_path, model_file) - dst_model_file = os.path.join(model_serving_dir, model_file) - if os.path.isdir(src_model_file): - if not os.path.exists(dst_model_file): - shutil.copytree(src_model_file, dst_model_file, copy_function=shutil.copy, - ignore_dangling_symlinks=True) - else: - if not os.path.exists(dst_model_file): - shutil.copyfile(src_model_file, dst_model_file) if inference_engine != ClientConstants.INFERENCE_ENGINE_TYPE_INT_DEFAULT: raise Exception(f"inference engine {inference_engine} is not supported") @@ -228,13 +195,12 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, logging.info(f"Start pulling the inference image {inference_image_name}... 
with policy {image_pull_policy}") ContainerUtils.get_instance().pull_image_with_policy(image_pull_policy, inference_image_name) - volumns = [] + volumes = [] binds = {} environment = {} # data_cache_dir mounting - assert type(data_cache_dir_input) == dict or type(data_cache_dir_input) == str - if type(data_cache_dir_input) == str: + if isinstance(data_cache_dir_input, str): # In this case, we mount to the same folder, if it has ~, we replace it with /home/fedml src_data_cache_dir, dst_data_cache_dir = "", "" if data_cache_dir_input != "": @@ -253,28 +219,30 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, if type(src_data_cache_dir) == str and src_data_cache_dir != "": logging.info("Start copying the data cache to the container...") if os.path.exists(src_data_cache_dir): - volumns.append(src_data_cache_dir) + volumes.append(src_data_cache_dir) binds[src_data_cache_dir] = { "bind": dst_data_cache_dir, "mode": "rw" } environment["DATA_CACHE_FOLDER"] = dst_data_cache_dir - else: + elif isinstance(data_cache_dir_input, dict): for k, v in data_cache_dir_input.items(): if os.path.exists(k): - volumns.append(v) + volumes.append(v) binds[k] = { "bind": v, "mode": "rw" } else: logging.warning(f"{k} does not exist, skip mounting it to the container") - logging.info(f"Data cache mount: {volumns}, {binds}") + logging.info(f"Data cache mount: {volumes}, {binds}") + else: + logging.warning("data_cache_dir_input is not a string or a dictionary, skip mounting it to the container") # Default mounting if not enable_custom_image or (enable_custom_image and relative_entry != ""): logging.info("Start copying the source code to the container...") - volumns.append(src_code_dir) + volumes.append(src_code_dir) binds[src_code_dir] = { "bind": dst_model_serving_dir, "mode": "rw" @@ -284,7 +252,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, host_config_dict = { "binds": binds, "port_bindings": { - port_inside_container: usr_indicated_worker_port + port_inside_container: None }, "shm_size": shm_size, "storage_opt": storage_opt, @@ -312,7 +280,6 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, if not enable_custom_image: # For some image, the default user is root. Unified to fedml. 
environment["HOME"] = "/home/fedml" - environment["BOOTSTRAP_DIR"] = dst_bootstrap_dir environment["FEDML_CURRENT_RUN_ID"] = end_point_id environment["FEDML_CURRENT_EDGE_ID"] = edge_id @@ -326,12 +293,13 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, for key in extra_envs: environment[key] = extra_envs[key] + # Create the container try: host_config = client.api.create_host_config(**host_config_dict) new_container = client.api.create_container( image=inference_image_name, name=default_server_container_name, - volumes=volumns, + volumes=volumes, ports=[port_inside_container], # port open inside the container environment=environment, host_config=host_config, @@ -349,22 +317,18 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, while True: cnt += 1 try: - if usr_indicated_worker_port is not None: - inference_http_port = usr_indicated_worker_port - break - else: - # Find the random port - port_info = client.api.port(new_container.get("Id"), port_inside_container) - inference_http_port = port_info[0]["HostPort"] - logging.info("inference_http_port: {}".format(inference_http_port)) - break + # Find the random port + port_info = client.api.port(new_container.get("Id"), port_inside_container) + inference_http_port = port_info[0]["HostPort"] + logging.info("host port allocated: {}".format(inference_http_port)) + break except: if cnt >= 5: raise Exception("Failed to get the port allocation") time.sleep(3) # Logging the info from the container when starting - log_deployment_result(end_point_id, model_id, default_server_container_name, + log_deployment_output(end_point_id, model_id, default_server_container_name, ClientConstants.CMD_TYPE_RUN_DEFAULT_SERVER, inference_model_name, inference_engine, inference_http_port, inference_type, retry_interval=10, deploy_attempt_threshold=usr_indicated_retry_cnt, @@ -373,9 +337,8 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, # Return the running model name and the inference output url inference_output_url, running_model_version, ret_model_metadata, ret_model_config = \ - get_model_info(inference_model_name, inference_engine, inference_http_port, - infer_host, False, inference_type, request_input_example=request_input_example, - enable_custom_image=enable_custom_image) + check_container_readiness(inference_http_port=inference_http_port, infer_host=infer_host, + request_input_example=request_input_example) if inference_output_url == "": return running_model_name, "", None, None, None @@ -426,9 +389,8 @@ def should_exit_logs(end_point_id, model_id, cmd_type, model_name, inference_eng # If the container has exited, return True, means we should exit the logs try: inference_output_url, model_version, model_metadata, model_config = \ - get_model_info(model_name, inference_engine, inference_port, infer_host, - inference_type=inference_type, request_input_example=request_input_example, - enable_custom_image=enable_custom_image) + check_container_readiness(inference_http_port=inference_port, infer_host=infer_host, + request_input_example=request_input_example) if inference_output_url != "": logging.info("Log test for deploying model successfully, inference url: {}, " "model metadata: {}, model config: {}". 
@@ -443,7 +405,7 @@ def should_exit_logs(end_point_id, model_id, cmd_type, model_name, inference_eng return False -def log_deployment_result(end_point_id, model_id, cmd_container_name, cmd_type, +def log_deployment_output(end_point_id, model_id, cmd_container_name, cmd_type, inference_model_name, inference_engine, inference_http_port, inference_type="default", retry_interval=10, deploy_attempt_threshold=10, @@ -542,10 +504,10 @@ def log_deployment_result(end_point_id, model_id, cmd_container_name, cmd_type, time.sleep(retry_interval) -def is_client_inference_container_ready(infer_url_host, inference_http_port, inference_model_name, local_infer_url, - inference_type="default", model_version="", request_input_example=None): +def is_client_inference_container_ready(infer_url_host, inference_http_port, readiness_check_type="default", + readiness_check_cmd=None, request_input_example=None): - if inference_type == "default": + if readiness_check_type == "default": default_client_container_ready_url = "http://{}:{}/ready".format("0.0.0.0", inference_http_port) response = None try: @@ -555,7 +517,7 @@ def is_client_inference_container_ready(infer_url_host, inference_http_port, inf if not response or response.status_code != 200: return "", "", {}, {} - # Report the deployed model info + # Construct the model metadata (input and output) model_metadata = {} if request_input_example is not None and len(request_input_example) > 0: model_metadata["inputs"] = request_input_example @@ -563,51 +525,19 @@ def is_client_inference_container_ready(infer_url_host, inference_http_port, inf model_metadata["inputs"] = {"text": "What is a good cure for hiccups?"} model_metadata["outputs"] = [] model_metadata["type"] = "default" + return "http://{}:{}/predict".format(infer_url_host, inference_http_port), None, model_metadata, None else: - triton_server_url = "{}:{}".format(infer_url_host, inference_http_port) - if model_version == "" or model_version is None: - model_version = ClientConstants.INFERENCE_MODEL_VERSION - logging.info( - f"triton_server_url: {triton_server_url} model_version: {model_version} model_name: {inference_model_name}") - triton_client = http_client.InferenceServerClient(url=triton_server_url, verbose=False) - if not triton_client.is_model_ready( - model_name=inference_model_name, model_version=model_version - ): - return "", model_version, {}, {} - logging.info(f"Model {inference_model_name} is ready, start to get model metadata...") - model_metadata = triton_client.get_model_metadata(model_name=inference_model_name, model_version=model_version) - model_config = triton_client.get_model_config(model_name=inference_model_name, model_version=model_version) - version_list = model_metadata.get("versions", None) - if version_list is not None and len(version_list) > 0: - model_version = version_list[0] - else: - model_version = ClientConstants.INFERENCE_MODEL_VERSION - - inference_output_url = "http://{}:{}/{}/models/{}/versions/{}/infer".format(infer_url_host, - inference_http_port, - ClientConstants.INFERENCE_INFERENCE_SERVER_VERSION, - inference_model_name, - model_version) - - return inference_output_url, model_version, model_metadata, model_config - - -def get_model_info(model_name, inference_engine, inference_http_port, infer_host="127.0.0.1", is_hg_model=False, - inference_type="default", request_input_example=None, enable_custom_image=False): - if model_name is None: + # TODO(Raphael): Support arbitrary readiness check command + logging.error(f"Unknown readiness check type: 
{readiness_check_type}") return "", "", {}, {} - local_infer_url = "{}:{}".format(infer_host, inference_http_port) - - if is_hg_model: - inference_model_name = "{}_{}_inference".format(model_name, str(inference_engine)) - else: - inference_model_name = model_name +def check_container_readiness(inference_http_port, infer_host="127.0.0.1", request_input_example=None, + readiness_check_type="default", readiness_check_cmd=None): response_from_client_container = is_client_inference_container_ready( - infer_host, inference_http_port, inference_model_name, local_infer_url, - inference_type, model_version="", request_input_example=request_input_example) + infer_host, inference_http_port, readiness_check_type, readiness_check_cmd, + request_input_example=request_input_example) return response_from_client_container From 64e8c779c61edfecf7ca8e638b6b54ff31d7983b Mon Sep 17 00:00:00 2001 From: Raphael Jin Date: Tue, 11 Jun 2024 16:29:37 -0700 Subject: [PATCH 3/4] [Deploy] Nit --- .../computing/scheduler/model_scheduler/device_model_cards.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py b/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py index 8feb757a63..c2f11a2917 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py @@ -14,7 +14,6 @@ from fedml.core.common.singleton import Singleton from fedml.computing.scheduler.model_scheduler.modelops_configs import ModelOpsConfigs -from fedml.computing.scheduler.model_scheduler.device_model_deployment import get_model_info from fedml.computing.scheduler.model_scheduler.device_server_constants import ServerConstants from fedml.computing.scheduler.model_scheduler.device_model_object import FedMLModelList, FedMLEndpointDetail from fedml.computing.scheduler.model_scheduler.device_client_constants import ClientConstants From 9194f8424f77008b49a48908ee72f19fe59ba23d Mon Sep 17 00:00:00 2001 From: Raphael Jin Date: Tue, 11 Jun 2024 16:42:46 -0700 Subject: [PATCH 4/4] [Deploy] Hide unnecessary log. 
--- .../scheduler/model_scheduler/device_model_cache.py | 8 ++++---- .../scheduler/model_scheduler/device_model_inference.py | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_cache.py b/python/fedml/computing/scheduler/model_scheduler/device_model_cache.py index 6c90944277..c941c42102 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_cache.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_cache.py @@ -369,7 +369,7 @@ def get_idle_device(self, if "model_status" in result_payload and result_payload["model_status"] == "DEPLOYED": idle_device_list.append({"device_id": device_id, "end_point_id": end_point_id}) - logging.info(f"{len(idle_device_list)} devices this model has on it: {idle_device_list}") + logging.debug(f"{len(idle_device_list)} devices this model has on it: {idle_device_list}") if len(idle_device_list) <= 0: return None, None @@ -398,7 +398,7 @@ def get_idle_device(self, logging.info("Inference Device selection Failed:") logging.info(e) - logging.info(f"Using Round Robin, the device index is {selected_device_index}") + logging.debug(f"Using Round Robin, the device index is {selected_device_index}") idle_device_dict = idle_device_list[selected_device_index] # Note that within the same endpoint_id, there could be one device with multiple same models @@ -411,7 +411,7 @@ def get_idle_device(self, # Find deployment result from the target idle device. try: for result_item in result_list: - logging.info("enter the for loop") + logging.debug("enter the for loop") device_id, _, result_payload = self.get_result_item_info(result_item) found_end_point_id = result_payload["end_point_id"] found_end_point_name = result_payload["end_point_name"] @@ -425,7 +425,7 @@ def get_idle_device(self, if same_model_device_rank > 0: same_model_device_rank -= 1 continue - logging.info(f"The chosen device is {device_id}") + logging.debug(f"The chosen device is {device_id}") return result_payload, device_id except Exception as e: logging.info(str(e)) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py b/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py index 3aeec67932..ba13006245 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py @@ -230,7 +230,7 @@ async def _predict( model_metrics.set_start_time(start_time) # Send inference request to idle device - logging.info("inference url {}.".format(inference_output_url)) + logging.debug("inference url {}.".format(inference_output_url)) if inference_output_url != "": input_list = input_json.get("inputs", input_json) stream_flag = input_json.get("stream", False) @@ -329,7 +329,7 @@ def found_idle_inference_device(end_point_id, end_point_name, in_model_name, in_ res = (idle_device, end_point_id, model_id, model_name, model_version, inference_host, inference_output_url, connectivity_type) - logging.info(f"found idle device with metrics: {res}") + logging.debug(f"found idle device with metrics: {res}") return res @@ -352,7 +352,7 @@ async def send_inference_request(idle_device, end_point_id, inference_url, input output_list, inference_type=inference_type, timeout=request_timeout_sec) - logging.info(f"Use http inference. return {response_ok}") + logging.debug(f"Use http inference. 
return {response_ok}") return inference_response elif connectivity_type == ClientConstants.WORKER_CONNECTIVITY_TYPE_HTTP_PROXY: logging.warning("Use http proxy inference.")