diff --git a/.github/workflows/CI_federate.yml b/.github/workflows/CI_federate.yml index 1a91606e3..52cdfd9e1 100644 --- a/.github/workflows/CI_federate.yml +++ b/.github/workflows/CI_federate.yml @@ -39,4 +39,4 @@ jobs: run: | cd python bash tests/test_federate/test_federate.sh - echo "Federate example has been tested successfully!" \ No newline at end of file + echo "Federate example has been tested successfully!" diff --git a/.github/workflows/CI_serving.yml b/.github/workflows/CI_serving.yml new file mode 100644 index 000000000..95423baa7 --- /dev/null +++ b/.github/workflows/CI_serving.yml @@ -0,0 +1,42 @@ +# This is a basic workflow to help you get started with Actions + +name: CI-serving + +# Controls when the workflow will run +on: + # Triggers the workflow on push or pull request events but only for the master branch + schedule: + # Nightly build at 12:12 A.M. + - cron: "0 10 */1 * *" + pull_request: + branches: [ master, dev/v0.7.0 ] + + # Allows you to run this workflow manually from the Actions tab + workflow_dispatch: + +# A workflow run is made up of one or more jobs that can run sequentially or in parallel +jobs: + serving: + runs-on: ${{ matrix.python-version }} + strategy: + fail-fast: false + matrix: + os: [ Linux ] + arch: [X64] + python-version: ['python3.8', 'python3.9', 'python3.10', 'python3.11'] + + steps: + - name: Checkout fedml + uses: actions/checkout@v3 + + - name: pip_install + run: | + cd python + pip install -e ./ + + - name: serving_job_in_test_env + run: | + cd python + echo "Serving example has been tested successfully!" + # python tests/test_launch/test_launch.py + diff --git a/.github/workflows/CI_train.yml b/.github/workflows/CI_train.yml index 7cae049db..529472d55 100644 --- a/.github/workflows/CI_train.yml +++ b/.github/workflows/CI_train.yml @@ -39,5 +39,4 @@ jobs: cd python python tests/test_train/test_train.py echo "Train example has been tested successfully!" - # cd examples/federate/quick_start/beehive diff --git a/python/fedml/computing/scheduler/comm_utils/network_util.py b/python/fedml/computing/scheduler/comm_utils/network_util.py new file mode 100644 index 000000000..48e478f23 --- /dev/null +++ b/python/fedml/computing/scheduler/comm_utils/network_util.py @@ -0,0 +1,18 @@ +import os +from fedml.computing.scheduler.model_scheduler.device_client_constants import ClientConstants + + +def return_this_device_connectivity_type() -> str: + """ + Return -> "http" | "http_proxy" |"mqtt" + """ + # Get the environmental variable's value and convert to lower case. 
+ env_conn_type = os.getenv(ClientConstants.ENV_CONNECTION_TYPE_KEY, "").lower() + if env_conn_type in [ + ClientConstants.WORKER_CONNECTIVITY_TYPE_HTTP, + ClientConstants.WORKER_CONNECTIVITY_TYPE_HTTP_PROXY, + ClientConstants.WORKER_CONNECTIVITY_TYPE_MQTT + ]: + return env_conn_type + else: + return ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT diff --git a/python/fedml/computing/scheduler/model_scheduler/device_client_constants.py b/python/fedml/computing/scheduler/model_scheduler/device_client_constants.py index 7894f2c73..2c06189d2 100644 --- a/python/fedml/computing/scheduler/model_scheduler/device_client_constants.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_client_constants.py @@ -97,6 +97,12 @@ class ClientConstants(object): INFERENCE_INFERENCE_SERVER_VERSION = "v2" INFERENCE_REQUEST_TIMEOUT = 30 + ENV_CONNECTION_TYPE_KEY = "FEDML_CONNECTION_TYPE" + WORKER_CONNECTIVITY_TYPE_HTTP = "http" + WORKER_CONNECTIVITY_TYPE_HTTP_PROXY = "http_proxy" + WORKER_CONNECTIVITY_TYPE_MQTT = "mqtt" + WORKER_CONNECTIVITY_TYPE_DEFAULT = WORKER_CONNECTIVITY_TYPE_HTTP + MSG_MODELOPS_DEPLOYMENT_STATUS_INITIALIZING = "INITIALIZING" MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYING = "DEPLOYING" MSG_MODELOPS_DEPLOYMENT_STATUS_INFERRING = "INFERRING" diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_cache.py b/python/fedml/computing/scheduler/model_scheduler/device_model_cache.py index 30e4f460e..c941c4210 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_cache.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_cache.py @@ -344,9 +344,13 @@ def get_result_item_info(self, result_item): result_payload = result_item_json["result"] return device_id, replica_no, result_payload - def get_idle_device(self, end_point_id, end_point_name, - model_name, model_version, - check_end_point_status=True, limit_specific_model_version=False): + def get_idle_device(self, + end_point_id, + end_point_name, + model_name, + model_version, + check_end_point_status=True, + limit_specific_model_version=False): # Deprecated the model status logic, query directly from the deployment result list idle_device_list = list() @@ -365,7 +369,7 @@ def get_idle_device(self, end_point_id, end_point_name, if "model_status" in result_payload and result_payload["model_status"] == "DEPLOYED": idle_device_list.append({"device_id": device_id, "end_point_id": end_point_id}) - logging.info(f"{len(idle_device_list)} devices this model has on it: {idle_device_list}") + logging.debug(f"{len(idle_device_list)} devices this model has on it: {idle_device_list}") if len(idle_device_list) <= 0: return None, None @@ -394,7 +398,7 @@ def get_idle_device(self, end_point_id, end_point_name, logging.info("Inference Device selection Failed:") logging.info(e) - logging.info(f"Using Round Robin, the device index is {selected_device_index}") + logging.debug(f"Using Round Robin, the device index is {selected_device_index}") idle_device_dict = idle_device_list[selected_device_index] # Note that within the same endpoint_id, there could be one device with multiple same models @@ -407,7 +411,7 @@ def get_idle_device(self, end_point_id, end_point_name, # Find deployment result from the target idle device. 
try: for result_item in result_list: - logging.info("enter the for loop") + logging.debug("enter the for loop") device_id, _, result_payload = self.get_result_item_info(result_item) found_end_point_id = result_payload["end_point_id"] found_end_point_name = result_payload["end_point_name"] @@ -421,7 +425,7 @@ def get_idle_device(self, end_point_id, end_point_name, if same_model_device_rank > 0: same_model_device_rank -= 1 continue - logging.info(f"The chosen device is {device_id}") + logging.debug(f"The chosen device is {device_id}") return result_payload, device_id except Exception as e: logging.info(str(e)) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py b/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py index 8feb757a6..c2f11a291 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py @@ -14,7 +14,6 @@ from fedml.core.common.singleton import Singleton from fedml.computing.scheduler.model_scheduler.modelops_configs import ModelOpsConfigs -from fedml.computing.scheduler.model_scheduler.device_model_deployment import get_model_info from fedml.computing.scheduler.model_scheduler.device_server_constants import ServerConstants from fedml.computing.scheduler.model_scheduler.device_model_object import FedMLModelList, FedMLEndpointDetail from fedml.computing.scheduler.model_scheduler.device_client_constants import ClientConstants diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py b/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py index 1876373d2..edd2ebea9 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py @@ -1,12 +1,13 @@ +import fedml + import logging import os -import pickle -import platform import shutil import time import traceback import yaml import datetime +import docker import requests import torch @@ -15,27 +16,18 @@ import collections.abc -import fedml from fedml.computing.scheduler.comm_utils import sys_utils, security_utils -from fedml.computing.scheduler.comm_utils.container_utils import ContainerUtils from fedml.computing.scheduler.comm_utils.hardware_utils import HardwareUtil from fedml.computing.scheduler.comm_utils.job_utils import JobRunnerUtils - -for type_name in collections.abc.__all__: - setattr(collections, type_name, getattr(collections.abc, type_name)) - from fedml.computing.scheduler.comm_utils.constants import SchedulerConstants from fedml.computing.scheduler.model_scheduler.device_client_constants import ClientConstants -import io - -import docker -from ..scheduler_core.compute_cache_manager import ComputeCacheManager +from fedml.computing.scheduler.model_scheduler.device_model_cache import FedMLModelCache from ..scheduler_core.compute_utils import ComputeUtils from ..comm_utils.container_utils import ContainerUtils - from .device_http_inference_protocol import FedMLHttpInference -from fedml.computing.scheduler.model_scheduler.device_model_cache import FedMLModelCache +for type_name in collections.abc.__all__: + setattr(collections, type_name, getattr(collections.abc, type_name)) no_real_gpu_allocation = None @@ -76,6 +68,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, num_gpus = gpu_per_replica gpu_ids, gpu_attach_cmd = None, "" + # Concatenate the model name running_model_name = 
ClientConstants.get_running_model_name( end_point_name, inference_model_name, model_version, end_point_id, model_id, edge_id=edge_id) @@ -85,6 +78,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, config = yaml.safe_load(file) # Resource related + inference_type = "default" use_gpu = config.get('use_gpu', True) num_gpus_frm_yml = config.get('num_gpus', None) if not use_gpu: @@ -93,9 +87,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, if num_gpus_frm_yml is not None: num_gpus = int(num_gpus_frm_yml) usr_indicated_wait_time = config.get('deploy_timeout', 900) - usr_indicated_worker_port = config.get('worker_port', "") - if usr_indicated_worker_port == "": - usr_indicated_worker_port = os.environ.get("FEDML_WORKER_PORT", "") + usr_indicated_retry_cnt = max(int(usr_indicated_wait_time) // 10, 1) shm_size = config.get('shm_size', None) storage_opt = config.get('storage_opt', None) tmpfs = config.get('tmpfs', None) @@ -104,17 +96,6 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, cpus = int(cpus) memory = config.get('memory', None) - if usr_indicated_worker_port == "": - usr_indicated_worker_port = None - else: - usr_indicated_worker_port = int(usr_indicated_worker_port) - - worker_port_env = os.environ.get("FEDML_WORKER_PORT", "") - worker_port_from_config = config.get('worker_port', "") - logging.info(f"usr_indicated_worker_port {usr_indicated_worker_port}, worker port env {worker_port_env}, " - f"worker port from config {worker_port_from_config}") - - usr_indicated_retry_cnt = max(int(usr_indicated_wait_time) // 10, 1) inference_image_name = config.get('inference_image_name', ClientConstants.INFERENCE_SERVER_CUSTOME_IMAGE) image_pull_policy = config.get('image_pull_policy', SchedulerConstants.IMAGE_PULL_POLICY_IF_NOT_PRESENT) @@ -152,6 +133,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, # If using customized image, then bootstrap + job will be the entry point enable_custom_image = config.get("enable_custom_image", False) + # inference_type = "custom" customized_image_entry_cmd = \ "/bin/bash /home/fedml/models_serving/fedml-deploy-bootstrap-entry-auto-gen.sh" @@ -159,18 +141,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, docker_registry_user_password = config.get("docker_registry_user_password", "") docker_registry = config.get("docker_registry", "") - port_inside_container = int(config.get("port_inside_container", 2345)) - use_triton = config.get("use_triton", False) - if use_triton: - inference_type = "triton" - else: - inference_type = "default" - - # Config check - if src_code_dir == "": - raise Exception("Please indicate source_code_dir in the fedml_model_config.yaml") - if relative_entry == "": - logging.warning("You missed main_entry in the fedml_model_config.yaml") + port_inside_container = int(config.get("port", 2345)) # Request the GPU ids for the deployment if num_gpus > 0: @@ -183,22 +154,10 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, end_point_id, end_point_name, inference_model_name, edge_id, replica_rank+1, gpu_ids) logging.info("GPU ids allocated: {}".format(gpu_ids)) + # Create the model serving dir if not exists model_serving_dir = ClientConstants.get_model_serving_dir() if not os.path.exists(model_serving_dir): os.makedirs(model_serving_dir, exist_ok=True) - converted_model_path = os.path.join(model_storage_local_path, ClientConstants.FEDML_CONVERTED_MODEL_DIR_NAME) - if 
os.path.exists(converted_model_path): - model_file_list = os.listdir(converted_model_path) - for model_file in model_file_list: - src_model_file = os.path.join(converted_model_path, model_file) - dst_model_file = os.path.join(model_serving_dir, model_file) - if os.path.isdir(src_model_file): - if not os.path.exists(dst_model_file): - shutil.copytree(src_model_file, dst_model_file, copy_function=shutil.copy, - ignore_dangling_symlinks=True) - else: - if not os.path.exists(dst_model_file): - shutil.copyfile(src_model_file, dst_model_file) if inference_engine != ClientConstants.INFERENCE_ENGINE_TYPE_INT_DEFAULT: raise Exception(f"inference engine {inference_engine} is not supported") @@ -236,13 +195,12 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, logging.info(f"Start pulling the inference image {inference_image_name}... with policy {image_pull_policy}") ContainerUtils.get_instance().pull_image_with_policy(image_pull_policy, inference_image_name) - volumns = [] + volumes = [] binds = {} environment = {} # data_cache_dir mounting - assert type(data_cache_dir_input) == dict or type(data_cache_dir_input) == str - if type(data_cache_dir_input) == str: + if isinstance(data_cache_dir_input, str): # In this case, we mount to the same folder, if it has ~, we replace it with /home/fedml src_data_cache_dir, dst_data_cache_dir = "", "" if data_cache_dir_input != "": @@ -261,28 +219,30 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, if type(src_data_cache_dir) == str and src_data_cache_dir != "": logging.info("Start copying the data cache to the container...") if os.path.exists(src_data_cache_dir): - volumns.append(src_data_cache_dir) + volumes.append(src_data_cache_dir) binds[src_data_cache_dir] = { "bind": dst_data_cache_dir, "mode": "rw" } environment["DATA_CACHE_FOLDER"] = dst_data_cache_dir - else: + elif isinstance(data_cache_dir_input, dict): for k, v in data_cache_dir_input.items(): if os.path.exists(k): - volumns.append(v) + volumes.append(v) binds[k] = { "bind": v, "mode": "rw" } else: logging.warning(f"{k} does not exist, skip mounting it to the container") - logging.info(f"Data cache mount: {volumns}, {binds}") + logging.info(f"Data cache mount: {volumes}, {binds}") + else: + logging.warning("data_cache_dir_input is not a string or a dictionary, skip mounting it to the container") # Default mounting if not enable_custom_image or (enable_custom_image and relative_entry != ""): logging.info("Start copying the source code to the container...") - volumns.append(src_code_dir) + volumes.append(src_code_dir) binds[src_code_dir] = { "bind": dst_model_serving_dir, "mode": "rw" @@ -292,7 +252,7 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, host_config_dict = { "binds": binds, "port_bindings": { - port_inside_container: usr_indicated_worker_port + port_inside_container: None }, "shm_size": shm_size, "storage_opt": storage_opt, @@ -320,7 +280,6 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, if not enable_custom_image: # For some image, the default user is root. Unified to fedml. 
environment["HOME"] = "/home/fedml" - environment["BOOTSTRAP_DIR"] = dst_bootstrap_dir environment["FEDML_CURRENT_RUN_ID"] = end_point_id environment["FEDML_CURRENT_EDGE_ID"] = edge_id @@ -334,12 +293,13 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, for key in extra_envs: environment[key] = extra_envs[key] + # Create the container try: host_config = client.api.create_host_config(**host_config_dict) new_container = client.api.create_container( image=inference_image_name, name=default_server_container_name, - volumes=volumns, + volumes=volumes, ports=[port_inside_container], # port open inside the container environment=environment, host_config=host_config, @@ -357,22 +317,18 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, while True: cnt += 1 try: - if usr_indicated_worker_port is not None: - inference_http_port = usr_indicated_worker_port - break - else: - # Find the random port - port_info = client.api.port(new_container.get("Id"), port_inside_container) - inference_http_port = port_info[0]["HostPort"] - logging.info("inference_http_port: {}".format(inference_http_port)) - break + # Find the random port + port_info = client.api.port(new_container.get("Id"), port_inside_container) + inference_http_port = port_info[0]["HostPort"] + logging.info("host port allocated: {}".format(inference_http_port)) + break except: if cnt >= 5: raise Exception("Failed to get the port allocation") time.sleep(3) # Logging the info from the container when starting - log_deployment_result(end_point_id, model_id, default_server_container_name, + log_deployment_output(end_point_id, model_id, default_server_container_name, ClientConstants.CMD_TYPE_RUN_DEFAULT_SERVER, inference_model_name, inference_engine, inference_http_port, inference_type, retry_interval=10, deploy_attempt_threshold=usr_indicated_retry_cnt, @@ -381,9 +337,8 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, # Return the running model name and the inference output url inference_output_url, running_model_version, ret_model_metadata, ret_model_config = \ - get_model_info(inference_model_name, inference_engine, inference_http_port, - infer_host, False, inference_type, request_input_example=request_input_example, - enable_custom_image=enable_custom_image) + check_container_readiness(inference_http_port=inference_http_port, infer_host=infer_host, + request_input_example=request_input_example) if inference_output_url == "": return running_model_name, "", None, None, None @@ -432,13 +387,10 @@ def should_exit_logs(end_point_id, model_id, cmd_type, model_name, inference_eng if cmd_type == ClientConstants.CMD_TYPE_RUN_DEFAULT_SERVER: # TODO: Exited Quickly if the container is Exited or Removed # If the container has exited, return True, means we should exit the logs - # container_name = "{}".format(ClientConstants.FEDML_DEFAULT_SERVER_CONTAINER_NAME_PREFIX) + "__" + \ - # security_utils.get_content_hash(model_name) try: inference_output_url, model_version, model_metadata, model_config = \ - get_model_info(model_name, inference_engine, inference_port, infer_host, - inference_type=inference_type, request_input_example=request_input_example, - enable_custom_image=enable_custom_image) + check_container_readiness(inference_http_port=inference_port, infer_host=infer_host, + request_input_example=request_input_example) if inference_output_url != "": logging.info("Log test for deploying model successfully, inference url: {}, " "model metadata: {}, model config: {}". 
@@ -453,7 +405,7 @@ def should_exit_logs(end_point_id, model_id, cmd_type, model_name, inference_eng return False -def log_deployment_result(end_point_id, model_id, cmd_container_name, cmd_type, +def log_deployment_output(end_point_id, model_id, cmd_container_name, cmd_type, inference_model_name, inference_engine, inference_http_port, inference_type="default", retry_interval=10, deploy_attempt_threshold=10, @@ -552,12 +504,10 @@ def log_deployment_result(end_point_id, model_id, cmd_container_name, cmd_type, time.sleep(retry_interval) -def is_client_inference_container_ready(infer_url_host, inference_http_port, inference_model_name, local_infer_url, - inference_type="default", model_version="", request_input_example=None): - # logging.info(f"Inference type: {inference_type}, infer_url_host {infer_url_host}, \ - # inference_http_port: {inference_http_port}, local_infer_url {local_infer_url}") +def is_client_inference_container_ready(infer_url_host, inference_http_port, readiness_check_type="default", + readiness_check_cmd=None, request_input_example=None): - if inference_type == "default": + if readiness_check_type == "default": default_client_container_ready_url = "http://{}:{}/ready".format("0.0.0.0", inference_http_port) response = None try: @@ -567,7 +517,7 @@ def is_client_inference_container_ready(infer_url_host, inference_http_port, inf if not response or response.status_code != 200: return "", "", {}, {} - # Report the deployed model info + # Construct the model metadata (input and output) model_metadata = {} if request_input_example is not None and len(request_input_example) > 0: model_metadata["inputs"] = request_input_example @@ -575,51 +525,19 @@ def is_client_inference_container_ready(infer_url_host, inference_http_port, inf model_metadata["inputs"] = {"text": "What is a good cure for hiccups?"} model_metadata["outputs"] = [] model_metadata["type"] = "default" + return "http://{}:{}/predict".format(infer_url_host, inference_http_port), None, model_metadata, None else: - triton_server_url = "{}:{}".format(infer_url_host, inference_http_port) - if model_version == "" or model_version is None: - model_version = ClientConstants.INFERENCE_MODEL_VERSION - logging.info( - f"triton_server_url: {triton_server_url} model_version: {model_version} model_name: {inference_model_name}") - triton_client = http_client.InferenceServerClient(url=triton_server_url, verbose=False) - if not triton_client.is_model_ready( - model_name=inference_model_name, model_version=model_version - ): - return "", model_version, {}, {} - logging.info(f"Model {inference_model_name} is ready, start to get model metadata...") - model_metadata = triton_client.get_model_metadata(model_name=inference_model_name, model_version=model_version) - model_config = triton_client.get_model_config(model_name=inference_model_name, model_version=model_version) - version_list = model_metadata.get("versions", None) - if version_list is not None and len(version_list) > 0: - model_version = version_list[0] - else: - model_version = ClientConstants.INFERENCE_MODEL_VERSION - - inference_output_url = "http://{}:{}/{}/models/{}/versions/{}/infer".format(infer_url_host, - inference_http_port, - ClientConstants.INFERENCE_INFERENCE_SERVER_VERSION, - inference_model_name, - model_version) - - return inference_output_url, model_version, model_metadata, model_config - - -def get_model_info(model_name, inference_engine, inference_http_port, infer_host="127.0.0.1", is_hg_model=False, - inference_type="default", request_input_example=None, 
enable_custom_image=False): - if model_name is None: + # TODO(Raphael): Support arbitrary readiness check command + logging.error(f"Unknown readiness check type: {readiness_check_type}") return "", "", {}, {} - local_infer_url = "{}:{}".format(infer_host, inference_http_port) - - if is_hg_model: - inference_model_name = "{}_{}_inference".format(model_name, str(inference_engine)) - else: - inference_model_name = model_name +def check_container_readiness(inference_http_port, infer_host="127.0.0.1", request_input_example=None, + readiness_check_type="default", readiness_check_cmd=None): response_from_client_container = is_client_inference_container_ready( - infer_host, inference_http_port, inference_model_name, local_infer_url, - inference_type, model_version="", request_input_example=request_input_example) + infer_host, inference_http_port, readiness_check_type, readiness_check_cmd, + request_input_example=request_input_example) return response_from_client_container @@ -631,211 +549,5 @@ def run_http_inference_with_curl_request(inference_url, inference_input_list, in inference_type=inference_type, engine_type=engine_type, timeout=timeout) -def convert_model_to_onnx( - torch_model, output_path: str, dummy_input_list, input_size: int, input_is_tensor=True -) -> None: - from collections import OrderedDict - import torch - from torch.onnx import TrainingMode - - torch.onnx.export(torch_model, # model being run - dummy_input_list if input_is_tensor else tuple(dummy_input_list), - # model input (or a tuple for multiple inputs) - f=output_path, # where to save the model (can be a file or file-like object) - export_params=True, # store the trained parameter weights inside the model file - opset_version=11, # the ONNX version to export the model to - do_constant_folding=False, # whether to execute constant folding for optimization - input_names=["input1", "input2"], - # the model's input names - output_names=['output'], # the model's output names - training=TrainingMode.EVAL, - verbose=True, - dynamic_axes={"input1": {0: "batch_size"}, - "input2": {0: "batch_size"}, - "output": {0: "batch_size"}} - ) - - -def test_start_triton_server(model_serving_dir): - sudo_prefix = "sudo " - sys_name = platform.system() - if sys_name == "Darwin": - sudo_prefix = "" - gpu_attach_cmd = "" - - triton_server_container_name = "{}".format(ClientConstants.FEDML_TRITON_SERVER_CONTAINER_NAME_PREFIX) - triton_server_cmd = "{}docker stop {}; {}docker rm {}; {}docker run --name {} {} -p{}:8000 " \ - "-p{}:8001 -p{}:8002 " \ - "--shm-size {} " \ - "-v {}:/models {} " \ - "bash -c \"pip install transformers && tritonserver --strict-model-config=false " \ - "--model-control-mode=poll --repository-poll-secs={} " \ - "--model-repository=/models\" ".format(sudo_prefix, triton_server_container_name, - sudo_prefix, triton_server_container_name, - sudo_prefix, triton_server_container_name, - gpu_attach_cmd, - ClientConstants.INFERENCE_HTTP_PORT, - ClientConstants.INFERENCE_GRPC_PORT, - 8002, - "4096m", - model_serving_dir, - ClientConstants.INFERENCE_SERVER_IMAGE, - ClientConstants.FEDML_MODEL_SERVING_REPO_SCAN_INTERVAL) - logging.info("Run triton inference server: {}".format(triton_server_cmd)) - triton_server_process = ClientConstants.exec_console_with_script(triton_server_cmd, - should_capture_stdout=False, - should_capture_stderr=False, - no_sys_out_err=True) - - -def test_convert_pytorch_model_to_onnx(model_net_file, model_bin_file, model_name, model_in_params): - torch_model = torch.jit.load(model_net_file) - with 
open(model_bin_file, 'rb') as model_pkl_file: - model_state_dict = pickle.load(model_pkl_file) - torch_model.load_state_dict(model_state_dict) - torch_model.eval() - - input_size = model_in_params["input_size"] - input_types = model_in_params["input_types"] - - dummy_input_list = [] - for index, input_i in enumerate(input_size): - if input_types[index] == "int": - this_input = torch.tensor(torch.randint(0, 1, input_i)) - else: - this_input = torch.tensor(torch.zeros(input_i)) - dummy_input_list.append(this_input) - - onnx_model_dir = os.path.join(ClientConstants.get_model_cache_dir(), - ClientConstants.FEDML_CONVERTED_MODEL_DIR_NAME, - model_name, ClientConstants.INFERENCE_MODEL_VERSION) - if not os.path.exists(onnx_model_dir): - os.makedirs(onnx_model_dir, exist_ok=True) - onnx_model_path = os.path.join(onnx_model_dir, "model.onnx") - - convert_model_to_onnx(torch_model, onnx_model_path, dummy_input_list, input_size, - input_is_tensor=True) - - model_serving_dir = os.path.join(ClientConstants.get_model_cache_dir(), - ClientConstants.FEDML_CONVERTED_MODEL_DIR_NAME) - return model_serving_dir - - -def start_gpu_model_load_process(): - from multiprocessing import Process - import time - process = Process(target=load_gpu_model_to_cpu_device) - process.start() - while True: - time.sleep(1) - - -def load_gpu_model_to_cpu_device(): - import pickle - import io - import torch - - class CPU_Unpickler(pickle.Unpickler): - def find_class(self, module, name): - if module == 'torch.storage' and name == '_load_from_bytes': - return lambda b: torch.load(io.BytesIO(b), map_location='cpu') - else: - return super().find_class(module, name) - - model_file = "/home/fedml/.fedml/fedml-client/fedml/models/theta_rec_auc_81_single_label/theta_rec_auc_81_single_label" - with open(model_file, "rb") as model_pkl_file: - if not torch.cuda.is_available(): - model = CPU_Unpickler(model_pkl_file).load() - if model is None: - print("Failed to load gpu model to cpu device") - else: - print("Succeeded to load gpu model to cpu device") - - if __name__ == "__main__": - start_gpu_model_load_process() - - model_serving_dir = test_convert_pytorch_model_to_onnx("./sample-open-training-model-net", - "./sample-open-training-model", - "rec-model", - {"input_size": [[1, 24], [1, 2]], - "input_types": ["int", "float"]}) - - test_start_triton_server(model_serving_dir) - - # input_data = {"model_version": "v0-Sun Feb 05 12:17:16 GMT 2023", - # "model_name": "model_414_45_open-model-test_v0-Sun-Feb-05-12-17-16-GMT-2023", - # # "data": "file:///Users/alexliang/fedml_data/mnist-image.png", - # "data": "https://raw.githubusercontent.com/niyazed/triton-mnist-example/master/images/sample_image.png", - # "end_point_id": 414, "model_id": 45, "token": "a09a18a14c4c4d89a8d5f9515704c073"} - # - # data_list = list() - # data_list.append(input_data["data"]) - # run_http_inference_with_lib_http_api_with_image_data(input_data["model_name"], - # 5001, 1, data_list, "") - # - # - # class LogisticRegression(torch.nn.Module): - # def __init__(self, input_dim, output_dim): - # super(LogisticRegression, self).__init__() - # self.linear = torch.nn.Linear(input_dim, output_dim) - # - # def forward(self, x): - # outputs = torch.sigmoid(self.linear(x)) - # return outputs - # - # - # model = LogisticRegression(28 * 28, 10) - # checkpoint = {'model': model} - # model_net_file = "/Users/alexliang/fedml-client/fedml/models/open-model-test/model-net.pt" - # torch.save(checkpoint, model_net_file) - # - # with 
open("/Users/alexliang/fedml-client/fedml/models/open-model-test/open-model-test", 'rb') as model_pkl_file: - # model_params = pickle.load(model_pkl_file) - # # torch.save(model_params, "/Users/alexliang/fedml-client/fedml/models/open-model-test/a.pt") - # # model = torch.load("/Users/alexliang/fedml-client/fedml/models/open-model-test/a.pt") - # loaded_checkpoint = torch.load(model_net_file) - # loaded_model = loaded_checkpoint["model"] - # loaded_model.load_state_dict(model_params) - # for parameter in loaded_model.parameters(): - # parameter.requires_grad = False - # loaded_model.eval() - # input_names = {"x": 0} - # convert_model_to_onnx(loaded_model, "/Users/alexliang/fedml-client/fedml/models/open-model-test/a.onnx", - # input_names, 28 * 28) - - # parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) - # parser.add_argument("--cf", "-c", help="config file") - # parser.add_argument("--role", "-r", type=str, default="client", help="role") - # parser.add_argument("--model_storage_local_path", "-url", type=str, default="/home/ubuntu", - # help="model storage local path") - # parser.add_argument("--inference_model_name", "-n", type=str, default="fedml-model", - # help="inference model name") - # parser.add_argument("--inference_engine", "-engine", type=str, default="ONNX", help="inference engine") - # parser.add_argument("--inference_http_port", "-http", type=int, default=8000, help="inference http port") - # parser.add_argument("--inference_grpc_port", "-gprc", type=int, default=8001, help="inference grpc port") - # parser.add_argument("--inference_metric_port", "-metric", type=int, default=8002, help="inference metric port") - # parser.add_argument("--inference_use_gpu", "-gpu", type=str, default="gpu", help="inference use gpu") - # parser.add_argument("--inference_memory_size", "-mem", type=str, default="256m", help="inference memory size") - # parser.add_argument("--inference_convertor_image", "-convertor", type=str, - # default=ClientConstants.INFERENCE_CONVERTOR_IMAGE, help="inference convertor image") - # parser.add_argument("--inference_server_image", "-server", type=str, - # default=ClientConstants.INFERENCE_SERVER_IMAGE, help="inference server image") - # args = parser.parse_args() - # args.user = args.user - # - # pip_source_dir = os.path.dirname(__file__) - # __running_model_name, __inference_output_url, __model_version, __model_metadata, __model_config = \ - # start_deployment( - # args.model_storage_local_path, - # args.inference_model_name, - # args.inference_engine, - # args.inference_http_port, - # args.inference_grpc_port, - # args.inference_metric_port, - # args.inference_use_gpu, - # args.inference_memory_size, - # args.inference_convertor_image, - # args.inference_server_image) - # print("Model deployment results, running model name: {}, url: {}, model metadata: {}, model config: {}".format( - # __running_model_name, __inference_output_url, __model_metadata, __model_config)) + pass diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py b/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py index d073533b7..ba1300624 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py @@ -210,7 +210,8 @@ async def _predict( return inference_response # Found idle inference device - idle_device, end_point_id, model_id, model_name, model_version, inference_host, inference_output_url = 
\ + idle_device, end_point_id, model_id, model_name, model_version, inference_host, inference_output_url,\ + connectivity_type = \ found_idle_inference_device(in_end_point_id, in_end_point_name, in_model_name, in_model_version) if idle_device is None or idle_device == "": FEDML_MODEL_CACHE.update_pending_requests_counter(end_point_id, decrease=True) @@ -229,19 +230,22 @@ async def _predict( model_metrics.set_start_time(start_time) # Send inference request to idle device - logging.info("inference url {}.".format(inference_output_url)) + logging.debug("inference url {}.".format(inference_output_url)) if inference_output_url != "": input_list = input_json.get("inputs", input_json) stream_flag = input_json.get("stream", False) input_list["stream"] = input_list.get("stream", stream_flag) output_list = input_json.get("outputs", []) + + # main execution of redirecting the inference request to the idle device inference_response = await send_inference_request( idle_device, end_point_id, inference_output_url, input_list, output_list, - inference_type=in_return_type) + inference_type=in_return_type, + connectivity_type=connectivity_type) # Calculate model metrics try: @@ -304,37 +308,40 @@ def found_idle_inference_device(end_point_id, end_point_name, in_model_name, in_ inference_host = "" inference_output_url = "" model_version = "" + connectivity_type = "" + # Found idle device (TODO: optimize the algorithm to search best device for inference) payload, idle_device = FEDML_MODEL_CACHE. \ get_idle_device(end_point_id, end_point_name, in_model_name, in_model_version) - if payload is not None: - logging.info("found idle deployment result {}".format(payload)) - deployment_result = payload - model_name = deployment_result["model_name"] - model_version = deployment_result["model_version"] - model_id = deployment_result["model_id"] - end_point_id = deployment_result["end_point_id"] - inference_output_url = deployment_result["model_url"] + if payload: + model_name = payload["model_name"] + model_version = payload["model_version"] + model_id = payload["model_id"] + end_point_id = payload["end_point_id"] + inference_output_url = payload["model_url"] + connectivity_type = \ + payload.get("connectivity_type", + ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT) url_parsed = urlparse(inference_output_url) inference_host = url_parsed.hostname else: logging.info("not found idle deployment result") - return idle_device, end_point_id, model_id, model_name, model_version, inference_host, inference_output_url + res = (idle_device, end_point_id, model_id, model_name, model_version, inference_host, inference_output_url, + connectivity_type) + logging.debug(f"found idle device with metrics: {res}") + + return res async def send_inference_request(idle_device, end_point_id, inference_url, input_list, output_list, - inference_type="default", has_public_ip=True): + inference_type="default", + connectivity_type=ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT): request_timeout_sec = FEDML_MODEL_CACHE.get_endpoint_settings(end_point_id) \ .get("request_timeout_sec", ClientConstants.INFERENCE_REQUEST_TIMEOUT) try: - http_infer_available = os.getenv("FEDML_INFERENCE_HTTP_AVAILABLE", True) - if not http_infer_available: - if http_infer_available == "False" or http_infer_available == "false": - http_infer_available = False - - if http_infer_available: + if connectivity_type == ClientConstants.WORKER_CONNECTIVITY_TYPE_HTTP: response_ok = await FedMLHttpInference.is_inference_ready( inference_url, timeout=request_timeout_sec) @@ 
-345,24 +352,25 @@ async def send_inference_request(idle_device, end_point_id, inference_url, input output_list, inference_type=inference_type, timeout=request_timeout_sec) - logging.info(f"Use http inference. return {response_ok}") + logging.debug(f"Use http inference. return {response_ok}") return inference_response - - response_ok = await FedMLHttpProxyInference.is_inference_ready( - inference_url, - timeout=request_timeout_sec) - if response_ok: - response_ok, inference_response = await FedMLHttpProxyInference.run_http_proxy_inference_with_request( - end_point_id, + elif connectivity_type == ClientConstants.WORKER_CONNECTIVITY_TYPE_HTTP_PROXY: + logging.warning("Use http proxy inference.") + response_ok = await FedMLHttpProxyInference.is_inference_ready( inference_url, - input_list, - output_list, - inference_type=inference_type, timeout=request_timeout_sec) - logging.info(f"Use http proxy inference. return {response_ok}") - return inference_response - - if not has_public_ip: + if response_ok: + response_ok, inference_response = await FedMLHttpProxyInference.run_http_proxy_inference_with_request( + end_point_id, + inference_url, + input_list, + output_list, + inference_type=inference_type, + timeout=request_timeout_sec) + logging.info(f"Use http proxy inference. return {response_ok}") + return inference_response + elif connectivity_type == ClientConstants.WORKER_CONNECTIVITY_TYPE_MQTT: + logging.warning("Use mqtt inference.") agent_config = {"mqtt_config": Settings.mqtt_config} mqtt_inference = FedMLMqttInference( agent_config=agent_config, @@ -385,7 +393,8 @@ async def send_inference_request(idle_device, end_point_id, inference_url, input logging.info(f"Use mqtt inference. return {response_ok}.") return inference_response - return {"error": True, "message": "Failed to use http, http-proxy for inference, no response from replica."} + else: + return {"error": True, "message": "Failed to use http, http-proxy for inference, no response from replica."} except Exception as e: inference_response = {"error": True, "message": f"Exception when using http, http-proxy and mqtt " diff --git a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py index 9854dad5f..af8f5dce5 100755 --- a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py +++ b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py @@ -250,14 +250,6 @@ def process_deployment_result_message(self, topic=None, payload=None): logging.info(f"Endpoint {end_point_id}; Device {device_id}; replica {replica_no}; " f"run_operation {run_operation} model status {model_status}.") - # OPTIONAL DEBUG PARAMS - # this_run_controller = self.model_runner_mapping[run_id_str].replica_controller - # logging.info(f"The current replica controller state is " - # f"Total version diff num {this_run_controller.total_replica_version_diff_num}") - # logging.info(f"self.request_json now {self.request_json}") # request_json will be deprecated - # this_run_request_json = self.request_json - # logging.info(f"self.request_json now {this_run_request_json}") - # Set redis + sqlite deployment result FedMLModelCache.get_instance().set_redis_params(self.redis_addr, self.redis_port, self.redis_password) @@ -461,7 +453,6 @@ def process_deployment_result_message(self, topic=None, payload=None): time.sleep(3) self.trigger_completed_event() - def cleanup_runner_process(self, run_id): ServerConstants.cleanup_run_process(run_id, not_kill_subprocess=True) diff 
--git a/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py index 3c357e9da..810070738 100755 --- a/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py +++ b/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py @@ -9,6 +9,8 @@ from abc import ABC import yaml from fedml.computing.scheduler.comm_utils.job_utils import JobRunnerUtils +from fedml.computing.scheduler.comm_utils.network_util import return_this_device_connectivity_type + from fedml.core.mlops import MLOpsRuntimeLog from fedml.computing.scheduler.comm_utils import file_utils from .device_client_constants import ClientConstants @@ -234,8 +236,11 @@ def run_impl(self, run_extend_queue_list, sender_message_center, running_model_name, inference_output_url, inference_model_version, model_metadata, model_config = \ "", "", model_version, {}, {} + # ip and connectivity + worker_ip = GeneralConstants.get_ip_address(self.request_json) + connectivity = return_this_device_connectivity_type() + if op == "add": - worker_ip = GeneralConstants.get_ip_address(self.request_json) for rank in range(prev_rank + 1, prev_rank + 1 + op_num): try: running_model_name, inference_output_url, inference_model_version, model_metadata, model_config = \ @@ -269,7 +274,9 @@ def run_impl(self, run_extend_queue_list, sender_message_center, result_payload = self.send_deployment_results( end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, model_id, model_name, inference_output_url, model_version, inference_port_external, - inference_engine, model_metadata, model_config, replica_no=rank + 1) + inference_engine, model_metadata, model_config, replica_no=rank + 1, + connectivity=connectivity + ) if inference_port_external != inference_port: # Save internal port to local db @@ -278,16 +285,16 @@ def run_impl(self, run_extend_queue_list, sender_message_center, result_payload = self.construct_deployment_results( end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, model_id, model_name, inference_output_url, model_version, inference_port, - inference_engine, model_metadata, model_config, replica_no=rank + 1) + inference_engine, model_metadata, model_config, replica_no=rank + 1, + connectivity=connectivity + ) FedMLModelDatabase.get_instance().set_deployment_result( run_id, end_point_name, model_name, model_version, self.edge_id, json.dumps(result_payload), replica_no=rank + 1) logging.info(f"Deploy replica {rank + 1} / {prev_rank + 1 + op_num} successfully.") - time.sleep(5) - time.sleep(1) self.status_reporter.run_id = self.run_id self.status_reporter.report_client_id_status( self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FINISHED, @@ -326,7 +333,6 @@ def run_impl(self, run_extend_queue_list, sender_message_center, return True elif op == "update" or op == "rollback": # Update is combine of delete and add - worker_ip = GeneralConstants.get_ip_address(self.request_json) for rank in replica_rank_to_update: # Delete a replica (container) if exists self.replica_handler.remove_replica(rank) @@ -340,7 +346,8 @@ def run_impl(self, run_extend_queue_list, sender_message_center, # TODO (Raphael) check if this will allow another job to seize the gpu during high concurrency: try: - JobRunnerUtils.get_instance().release_partial_job_gpu(run_id, self.edge_id, replica_occupied_gpu_ids) + JobRunnerUtils.get_instance().release_partial_job_gpu( + run_id, self.edge_id, replica_occupied_gpu_ids) 
except Exception as e: if op == "rollback": pass @@ -387,7 +394,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center, JobRunnerUtils.get_instance().release_partial_job_gpu( run_id, self.edge_id, replica_occupied_gpu_ids) - result_payload = self.send_deployment_results( + self.send_deployment_results( end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED, model_id, model_name, inference_output_url, inference_model_version, inference_port, inference_engine, model_metadata, model_config) @@ -402,7 +409,9 @@ def run_impl(self, run_extend_queue_list, sender_message_center, result_payload = self.send_deployment_results( end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, model_id, model_name, inference_output_url, model_version, inference_port_external, - inference_engine, model_metadata, model_config, replica_no=rank + 1) + inference_engine, model_metadata, model_config, replica_no=rank + 1, + connectivity=connectivity + ) if inference_port_external != inference_port: # Save internal port to local db logging.info("inference_port_external {} != inference_port {}".format( @@ -410,7 +419,9 @@ def run_impl(self, run_extend_queue_list, sender_message_center, result_payload = self.construct_deployment_results( end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, model_id, model_name, inference_output_url, model_version, inference_port, - inference_engine, model_metadata, model_config, replica_no=rank + 1) + inference_engine, model_metadata, model_config, replica_no=rank + 1, + connectivity=connectivity + ) FedMLModelDatabase.get_instance().set_deployment_result( run_id, end_point_name, model_name, model_version, self.edge_id, @@ -433,7 +444,8 @@ def run_impl(self, run_extend_queue_list, sender_message_center, def construct_deployment_results(self, end_point_name, device_id, model_status, model_id, model_name, model_inference_url, model_version, inference_port, inference_engine, - model_metadata, model_config, replica_no=1): + model_metadata, model_config, replica_no=1, + connectivity=ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT): deployment_results_payload = {"end_point_id": self.run_id, "end_point_name": end_point_name, "model_id": model_id, "model_name": model_name, "model_url": model_inference_url, "model_version": model_version, @@ -444,6 +456,7 @@ def construct_deployment_results(self, end_point_name, device_id, model_status, "model_status": model_status, "inference_port": inference_port, "replica_no": replica_no, + "connectivity_type": connectivity, } return deployment_results_payload @@ -466,7 +479,8 @@ def construct_deployment_status(self, end_point_name, device_id, def send_deployment_results(self, end_point_name, device_id, model_status, model_id, model_name, model_inference_url, model_version, inference_port, inference_engine, - model_metadata, model_config, replica_no=1): + model_metadata, model_config, replica_no=1, + connectivity=ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT): deployment_results_topic = "model_device/model_device/return_deployment_result/{}/{}".format( self.run_id, device_id) @@ -474,22 +488,13 @@ def send_deployment_results(self, end_point_name, device_id, model_status, end_point_name, device_id, model_status, model_id, model_name, model_inference_url, model_version, inference_port, inference_engine, - model_metadata, model_config, replica_no=replica_no) + model_metadata, model_config, replica_no=replica_no, connectivity=connectivity) 
logging.info("[client] send_deployment_results: topic {}, payload {}.".format(deployment_results_topic, deployment_results_payload)) self.message_center.send_message_json(deployment_results_topic, json.dumps(deployment_results_payload)) return deployment_results_payload - def send_deployment_status(self, end_point_name, device_id, - model_id, model_name, model_version, - model_inference_url, model_status, - inference_port=ClientConstants.MODEL_INFERENCE_DEFAULT_PORT, - replica_no=1, # start from 1 - ): - # Deprecated - pass - def reset_devices_status(self, edge_id, status): self.status_reporter.run_id = self.run_id self.status_reporter.edge_id = edge_id diff --git a/python/tests/test_server/test_server.py b/python/tests/test_server/test_server.py new file mode 100644 index 000000000..15501b1d7 --- /dev/null +++ b/python/tests/test_server/test_server.py @@ -0,0 +1,30 @@ +import os.path +import time +import fedml +from fedml.api.constants import RunStatus + +# Login +fedml.set_env_version("test") +fedml.set_local_on_premise_platform_port(18080) +error_code, error_msg = fedml.api.fedml_login(api_key="") +if error_code != 0: + raise Exception("API Key is invalid!") + +# Yaml file +cur_dir = os.path.dirname(__file__) +fedml_dir = os.path.dirname(cur_dir) +python_dir = os.path.dirname(fedml_dir) +yaml_file = os.path.join(python_dir, "examples", "launch", "serve_job_mnist.yaml") + +# Launch job +launch_result_dict = {} +launch_result_status = {} + +launch_result = fedml.api.launch_job(yaml_file) + +# launch_result = fedml.api.launch_job_on_cluster(yaml_file, "alex-cluster") +if launch_result.result_code != 0: + raise Exception(f"Failed to launch job. Reason: {launch_result.result_message}") + +launch_result_dict[launch_result.run_id] = launch_result +launch_result_status[launch_result.run_id] = RunStatus.STARTING diff --git a/python/tests/test_train/test_train.py b/python/tests/test_train/test_train.py index 33f5b6f4f..e2017b5a1 100644 --- a/python/tests/test_train/test_train.py +++ b/python/tests/test_train/test_train.py @@ -45,4 +45,4 @@ if log_result.run_status == RunStatus.FINISHED: print(f"Job finished successfully.") break - \ No newline at end of file +
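Illustrative sketch (not part of the patch): the core mechanism this diff introduces is that each worker resolves its connectivity type from the FEDML_CONNECTION_TYPE environment variable via return_this_device_connectivity_type(), stamps it into the deployment result as "connectivity_type", and the inference gateway then routes requests over http, http_proxy, or mqtt accordingly. The standalone snippet below mirrors only the resolution step; the constant values are copied from the ClientConstants additions above, while the function name resolve_connectivity_type is ours, used here purely for illustration so the sketch runs without importing fedml.

# Standalone illustration of the connectivity selection added in
# python/fedml/computing/scheduler/comm_utils/network_util.py above.
# Constant values are copied from the ClientConstants additions in this diff;
# resolve_connectivity_type is a hypothetical stand-in for
# return_this_device_connectivity_type(), kept dependency-free.
import os

ENV_CONNECTION_TYPE_KEY = "FEDML_CONNECTION_TYPE"
WORKER_CONNECTIVITY_TYPE_HTTP = "http"
WORKER_CONNECTIVITY_TYPE_HTTP_PROXY = "http_proxy"
WORKER_CONNECTIVITY_TYPE_MQTT = "mqtt"
WORKER_CONNECTIVITY_TYPE_DEFAULT = WORKER_CONNECTIVITY_TYPE_HTTP


def resolve_connectivity_type() -> str:
    """Return the env-provided connectivity type if valid, else the http default."""
    env_conn_type = os.getenv(ENV_CONNECTION_TYPE_KEY, "").lower()
    if env_conn_type in (WORKER_CONNECTIVITY_TYPE_HTTP,
                         WORKER_CONNECTIVITY_TYPE_HTTP_PROXY,
                         WORKER_CONNECTIVITY_TYPE_MQTT):
        return env_conn_type
    return WORKER_CONNECTIVITY_TYPE_DEFAULT


if __name__ == "__main__":
    os.environ[ENV_CONNECTION_TYPE_KEY] = "MQTT"   # matching is case-insensitive
    assert resolve_connectivity_type() == WORKER_CONNECTIVITY_TYPE_MQTT
    os.environ[ENV_CONNECTION_TYPE_KEY] = "bogus"  # unknown values fall back to http
    assert resolve_connectivity_type() == WORKER_CONNECTIVITY_TYPE_HTTP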