diff --git a/devops/dockerfile/fedml-launch/job/Dockerfile b/devops/dockerfile/fedml-launch/job/Dockerfile
new file mode 100644
index 0000000000..7a9717c8aa
--- /dev/null
+++ b/devops/dockerfile/fedml-launch/job/Dockerfile
@@ -0,0 +1,27 @@
+# Image Name: fedml/fedml-launch-job:cu11.6
+ARG BASE_IMAGE=fedml/fedml:latest-torch1.13.1-cuda11.6-cudnn8-devel
+FROM ${BASE_IMAGE}
+
+ARG HOME_DIR=/home/fedml
+# 1. Install FedML (modify only if you want to use a different version of FedML)
+RUN pip3 install -U fedml
+## Uncomment only if you want to use a local version of FedML
+# RUN mkdir -p ${HOME_DIR}/fedml-pip
+# COPY ./python ${HOME_DIR}/fedml-pip
+# WORKDIR ${HOME_DIR}/fedml-pip
+# RUN pip3 install ./
+
+# 2. MOUNT User's Local Folder (If any)
+ENV DATA_FOLDER=""
+VOLUME [ "${DATA_FOLDER}" ]
+
+# 3. MOUNT FedML Home Folder
+VOLUME ["/home/fedml/launch"]
+
+# 4. Enter the bootstrap and job entrypoints
+WORKDIR ${HOME_DIR}/launch
+ENV BOOTSTRAP_SCRIPT="fedml_bootstrap_generated.sh"
+ENV MAIN_ENTRY="fedml_job_entry_pack.sh"
+
+# Run Bootstrap Script and Main Entry File
+CMD /bin/bash ${BOOTSTRAP_SCRIPT}; /bin/bash ${MAIN_ENTRY}
diff --git a/python/fedml/computing/scheduler/comm_utils/constants.py b/python/fedml/computing/scheduler/comm_utils/constants.py
index b1294181bb..d397d8f767 100644
--- a/python/fedml/computing/scheduler/comm_utils/constants.py
+++ b/python/fedml/computing/scheduler/comm_utils/constants.py
@@ -103,7 +103,7 @@ class SchedulerConstants:
     RUN_PROCESS_TYPE_BOOTSTRAP_PROCESS = "bootstrap-process"
 
     FEDML_DEFAULT_LAUNCH_CONTAINER_PREFIX = "fedml_default_launch_container"
-    FEDML_DEFAULT_LAUNCH_IMAGE = "fedml/fedml-default-launch:cu12.1-u22.04"
+    FEDML_DEFAULT_LAUNCH_IMAGE = "fedml/fedml-launch-job:cu11.6"
 
     FEDML_DEFAULT_LOG_DIR = ".fedml/fedml-client/fedml/logs"
     FEDML_DEFAULT_DATA_DIR = ".fedml/fedml-client/fedml/data"
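Since the scheduler constant above now points at the new `fedml/fedml-launch-job:cu11.6` image, a quick local smoke test can confirm the image pulls and exposes the launch workdir before an agent ever schedules a job. This is a minimal sketch using the docker-py client (which this PR already depends on); the override command is illustrative and not part of the PR, and a reachable local Docker daemon is assumed.

```python
# Minimal smoke test for the new default launch image (illustrative only).
# Assumes the tag below matches SchedulerConstants.FEDML_DEFAULT_LAUNCH_IMAGE.
import docker

LAUNCH_IMAGE = "fedml/fedml-launch-job"
LAUNCH_TAG = "cu11.6"


def smoke_test_launch_image():
    client = docker.from_env()
    client.images.pull(LAUNCH_IMAGE, tag=LAUNCH_TAG)
    # Override the image's CMD (bootstrap + job entry) just to confirm the
    # container starts and the launch workdir from the Dockerfile exists.
    output = client.containers.run(
        f"{LAUNCH_IMAGE}:{LAUNCH_TAG}",
        command=["/bin/bash", "-c", "pwd && ls -la /home/fedml/launch"],
        remove=True,
    )
    print(output.decode())


if __name__ == "__main__":
    smoke_test_launch_image()
```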
diff --git a/python/fedml/computing/scheduler/comm_utils/job_utils.py b/python/fedml/computing/scheduler/comm_utils/job_utils.py
index 384cbacd1d..48c5fbc159 100644
--- a/python/fedml/computing/scheduler/comm_utils/job_utils.py
+++ b/python/fedml/computing/scheduler/comm_utils/job_utils.py
@@ -13,18 +13,51 @@
 from fedml.computing.scheduler.slave.client_constants import ClientConstants
 from fedml.computing.scheduler.comm_utils.sys_utils import get_python_program
 from fedml.computing.scheduler.scheduler_core.compute_cache_manager import ComputeCacheManager
+from ..scheduler_entry.constants import Constants
 from dataclasses import dataclass, field, fields
 from fedml.computing.scheduler.slave.client_data_interface import FedMLClientDataInterface
 from fedml.core.common.singleton import Singleton
 from fedml.computing.scheduler.comm_utils.container_utils import ContainerUtils
-from typing import List
+from typing import List, Dict, Any, Optional, Tuple
 import threading
 import json
 
 run_docker_without_gpu = False
 
 
+@dataclass
+class JobArgs:
+    request_json: Dict[str, Any]
+    conf_file_object: Any
+    fedml_config_object: Dict[str, Any] = field(default_factory=dict)
+    client_rank: Optional[int] = None
+
+    def __post_init__(self):
+        self.run_config = self.request_json.get("run_config", {})
+        self.run_params = self.run_config.get("parameters", {})
+        self.client_rank = self.request_json.get("client_rank", 1) if self.client_rank is None else self.client_rank
+        self.job_yaml = self.run_params.get("job_yaml", {})
+        self.job_yaml_default_none = self.run_params.get("job_yaml", None)
+        self.job_api_key = self.job_yaml.get("run_api_key", None)
+        self.job_api_key = self.job_yaml.get("fedml_run_dynamic_params", None) \
+            if self.job_api_key is None else self.job_api_key
+        self.assigned_gpu_ids = self.run_params.get("gpu_ids", None)
+        self.job_type = self.job_yaml.get("job_type", None)
+        # TODO: Can we remove task_type?
+        self.job_type = self.job_yaml.get("task_type", Constants.JOB_TASK_TYPE_TRAIN) \
+            if self.job_type is None else self.job_type
+        self.containerize = self.fedml_config_object.get("containerize", None)
+        self.image_pull_policy = self.fedml_config_object.get("image_pull_policy", Constants.IMAGE_PULL_POLICY_ALWAYS)
+        self.entry_args_dict = self.conf_file_object.get("fedml_entry_args", {})
+        self.entry_args = self.entry_args_dict.get("arg_items", None)
+        self.scheduler_match_info = self.request_json.get("scheduler_match_info", {})
+        self.env_args = self.fedml_config_object.get("environment_args", None)
+        self.docker_args = JobRunnerUtils.create_instance_from_dict(DockerArgs,
+                                                                    self.fedml_config_object.get("docker", {}))
+        self.executable_interpreter = ClientConstants.CLIENT_SHELL_PS \
+            if platform.system() == ClientConstants.PLATFORM_WINDOWS else ClientConstants.CLIENT_SHELL_BASH
+        self.framework_type = self.job_yaml.get("framework_type", None)
+
+
 @dataclass
 class DockerArgs:
     image: str = SchedulerConstants.FEDML_DEFAULT_LAUNCH_IMAGE
@@ -33,6 +66,9 @@ class DockerArgs:
     registry: str = ""
     ports: List[int] = field(default_factory=lambda: [2345])
 
+    def __post_init__(self):
+        self.client = JobRunnerUtils.get_docker_client(self)
+
 
 class JobRunnerUtils(Singleton):
     STATIC_RUN_LOCK_KEY_SUFFIX = "STATIC"
@@ -393,23 +429,44 @@ def create_instance_from_dict(data_class, input_dict: {}):
         return instance
 
     @staticmethod
-    def generate_bootstrap_commands(bootstrap_script_path, bootstrap_script_dir, bootstrap_script_file):
-        if os.path.exists(bootstrap_script_path):
-            bootstrap_stat = os.stat(bootstrap_script_path)
-            if platform.system() == 'Windows':
-                os.chmod(bootstrap_script_path,
-                         bootstrap_stat.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
-                bootstrap_scripts = "{}".format(bootstrap_script_path)
-            else:
-                os.chmod(bootstrap_script_path,
-                         bootstrap_stat.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
-                bootstrap_scripts = "cd {}; ./{}".format(
-                    bootstrap_script_dir, os.path.basename(bootstrap_script_file))
-
-            bootstrap_scripts = str(bootstrap_scripts).replace('\\', os.sep).replace('/', os.sep)
-            shell_cmd_list = list()
-            shell_cmd_list.append(bootstrap_scripts)
-        return shell_cmd_list
+    def generate_bootstrap_commands(env_args, unzip_package_path) -> Tuple[List[str], str]:
+        bootstrap_cmd_list = list()
+        bootstrap_script_path, bootstrap_script_dir, bootstrap_script_file = [None] * 3
+
+        if env_args is not None:
+            bootstrap_script_file = env_args.get("bootstrap", None)
+            if bootstrap_script_file is not None:
+                bootstrap_script_file = str(bootstrap_script_file).replace('\\', os.sep).replace('/', os.sep)
+                if platform.system() == 'Windows':
+                    # Replace the ".sh" suffix (str.rstrip strips a character set, not a suffix).
+                    bootstrap_script_file = bootstrap_script_file[:-len('.sh')] + '.bat'
+                bootstrap_script_dir = os.path.join(unzip_package_path, "fedml",
+                                                    os.path.dirname(bootstrap_script_file))
+                bootstrap_script_path = os.path.join(
+                    bootstrap_script_dir, os.path.basename(bootstrap_script_file)
+                )
+
+        if bootstrap_script_path:
+            logging.info("Bootstrap commands are being generated...")
+            if os.path.exists(bootstrap_script_path):
+                bootstrap_stat = os.stat(bootstrap_script_path)
+                os.chmod(bootstrap_script_path,
+                         bootstrap_stat.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
+                if platform.system() == 'Windows':
+                    bootstrap_scripts = "{}".format(bootstrap_script_path)
+                else:
+                    bootstrap_scripts = "cd {}; ./{}".format(
+                        bootstrap_script_dir, os.path.basename(bootstrap_script_file))
+
+                bootstrap_scripts = str(bootstrap_scripts).replace('\\', os.sep).replace('/', os.sep)
+                bootstrap_cmd_list.append(bootstrap_scripts)
+
+        if bootstrap_cmd_list:
+            logging.info(f"Generated following Bootstrap commands: {bootstrap_cmd_list}")
+        else:
+            logging.info("No Bootstrap commands generated.")
+        return bootstrap_cmd_list, bootstrap_script_file
 
     @staticmethod
     def generate_job_execute_commands(run_id, edge_id, version,
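For reviewers, here is roughly how the new `JobArgs` dataclass is meant to be driven. The payload below is invented; only the keys mirror what `__post_init__` reads. Note that constructing `JobArgs` also builds a `DockerArgs`, whose `__post_init__` creates a Docker client, so a reachable Docker daemon is assumed.

```python
# Illustrative payload; values are made up, keys mirror __post_init__.
from fedml.computing.scheduler.comm_utils.job_utils import JobArgs

request_json = {
    "client_rank": 1,
    "scheduler_match_info": {},
    "run_config": {
        "parameters": {
            "job_yaml": {"job_type": "train", "run_api_key": "dummy-key"},
            "gpu_ids": None,
        }
    },
}
conf_file_object = {"fedml_entry_args": {"arg_items": "--epochs 1"}}
fedml_config_object = {"containerize": True, "environment_args": None, "docker": {}}

job_args = JobArgs(request_json=request_json,
                   conf_file_object=conf_file_object,
                   fedml_config_object=fedml_config_object)
print(job_args.job_type)     # "train"
print(job_args.entry_args)   # "--epochs 1"
print(job_args.docker_args)  # DockerArgs built from the "docker" section
```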
bootstrap_scripts = "{}".format(bootstrap_script_path) + else: + os.chmod(bootstrap_script_path, + bootstrap_stat.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + bootstrap_scripts = "cd {}; ./{}".format( + bootstrap_script_dir, os.path.basename(bootstrap_script_file)) + + bootstrap_scripts = str(bootstrap_scripts).replace('\\', os.sep).replace('/', os.sep) + bootstrap_cmd_list.append(bootstrap_scripts) + if len(bootstrap_cmd_list): + logging.info(f"Generated following Bootstrap commands: {bootstrap_cmd_list}") + else: + logging.info("No Bootstrap commands generated.") + return bootstrap_cmd_list, bootstrap_script_file @staticmethod def generate_job_execute_commands(run_id, edge_id, version, diff --git a/python/fedml/computing/scheduler/slave/client_runner.py b/python/fedml/computing/scheduler/slave/client_runner.py index 79b5697728..f188e707f3 100755 --- a/python/fedml/computing/scheduler/slave/client_runner.py +++ b/python/fedml/computing/scheduler/slave/client_runner.py @@ -2,6 +2,7 @@ import logging import multiprocessing import sys +from datetime import datetime from multiprocessing import Process import os @@ -17,12 +18,15 @@ import zipfile from urllib.parse import urljoin, urlparse +import docker.types import requests +from fedml.computing.scheduler.comm_utils import job_utils import fedml from ..comm_utils.constants import SchedulerConstants +from ..comm_utils.container_utils import ContainerUtils from ..comm_utils.job_cleanup import JobCleanup -from ..comm_utils.job_utils import JobRunnerUtils, DockerArgs +from ..comm_utils.job_utils import JobRunnerUtils, DockerArgs, JobArgs from ..comm_utils.run_process_utils import RunProcessUtils from ..scheduler_entry.constants import Constants from ....core.mlops.mlops_device_perfs import MLOpsDevicePerfStats @@ -139,7 +143,7 @@ def __repr__(self): def copy_runner(self): copy_runner = FedMLClientRunner(self.args) - copy_runner.disable_client_login = self.disable_client_login + copy_runner.disable_client_login = self.disable_client_login copy_runner.model_device_server = self.model_device_server copy_runner.model_device_client_list = self.model_device_client_list copy_runner.run_process_event = self.run_process_event @@ -161,7 +165,7 @@ def copy_runner(self): copy_runner.unique_device_id = self.unique_device_id copy_runner.args = self.args copy_runner.request_json = self.request_json - copy_runner.version =self.version + copy_runner.version = self.version copy_runner.device_id = self.device_id copy_runner.cur_dir = self.cur_dir copy_runner.cur_dir = self.cur_dir @@ -537,7 +541,7 @@ def run_impl(self): ##### if not os.path.exists(unzip_package_path): - logging.info("failed to unzip file.") + logging.error("Failed to unzip file.") self.check_runner_stop_event() # Send failed msg when exceptions. 
             self.cleanup_run_when_starting_failed(status=ClientConstants.MSG_MLOPS_CLIENT_STATUS_EXCEPTION)
@@ -555,11 +559,21 @@ def run_impl(self):
         logging.info(" ")
         logging.info("====Your Run Logs Begin===")
 
-        process, is_launch_task, error_list = self.execute_job_task(unzip_package_path=unzip_package_path,
-                                                                    entry_file_full_path=entry_file_full_path,
-                                                                    conf_file_full_path=conf_file_full_path,
-                                                                    dynamic_args_config=dynamic_args_config,
-                                                                    fedml_config_object=self.fedml_config_object)
+        job_args = JobArgs(request_json=self.request_json,
+                           conf_file_object=load_yaml_config(conf_file_full_path),
+                           fedml_config_object=self.fedml_config_object)
+
+        if job_args.job_type == Constants.JOB_TASK_TYPE_TRAIN:
+            process, is_launch_task, error_list = self.execute_train_job_task(
+                job_args=job_args, unzip_package_path=unzip_package_path,
+                entry_file_full_path=entry_file_full_path)
+        else:
+            process, is_launch_task, error_list = self.execute_job_task(
+                unzip_package_path=unzip_package_path, entry_file_full_path=entry_file_full_path,
+                conf_file_full_path=conf_file_full_path, dynamic_args_config=dynamic_args_config,
+                fedml_config_object=self.fedml_config_object)
 
         logging.info("====Your Run Logs End===")
         logging.info(" ")
@@ -617,75 +631,134 @@ def run_impl(self):
                 self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED,
                 server_id=self.server_id, run_id=run_id)
 
+    def execute_train_job_task(self, job_args: JobArgs, unzip_package_path, entry_file_full_path):
+        self.check_runner_stop_event()
+
+        self.mlops_metrics.report_client_id_status(
+            self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_RUNNING, run_id=self.run_id)
+
+        bootstrap_cmd_list, bootstrap_script_file = JobRunnerUtils.generate_bootstrap_commands(
+            job_args.env_args, unzip_package_path)
+
+        container = self.create_docker_container(docker_args=job_args.docker_args,
+                                                 unzip_package_path=unzip_package_path,
+                                                 image_pull_policy=job_args.image_pull_policy)
+
+        try:
+            job_executing_commands = JobRunnerUtils.generate_launch_docker_command(
+                docker_args=job_args.docker_args, run_id=self.run_id, edge_id=self.edge_id,
+                unzip_package_path=unzip_package_path,
+                executable_interpreter=job_args.executable_interpreter,
+                entry_file_full_path=entry_file_full_path, bootstrap_cmd_list=bootstrap_cmd_list,
+                cuda_visible_gpu_ids_str=self.cuda_visible_gpu_ids_str,
+                image_pull_policy=job_args.image_pull_policy)
+        except Exception as e:
+            logging.error(f"Error occurred while generating containerized launch commands. "
+                          f"Exception: {e}, Traceback: {traceback.format_exc()}")
+            return None, None, None
+
+        if not job_executing_commands:
+            raise Exception("Failed to generate docker execution command")
+
+        # Run the job executing commands
+        logging.info(f"Run the client job with job id {self.run_id}, device id {self.edge_id}.")
+        process, error_list = ClientConstants.execute_commands_with_live_logs(
+            job_executing_commands, callback=self.start_job_perf, error_processor=self.job_error_processor,
+            should_write_log_file=job_args.job_type != Constants.JOB_TASK_TYPE_FEDERATE)
+        is_launch_task = job_args.job_type != Constants.JOB_TASK_TYPE_FEDERATE
+
+        return process, is_launch_task, error_list
+
" + f"with policy {image_pull_policy}") + try: + ContainerUtils.get_instance().pull_image_with_policy(image_name=docker_args.image, + client=docker_args.client, + image_pull_policy=image_pull_policy) + except Exception as e: + raise Exception(f"Failed to pull the launch job image {docker_args.image} with Exception {e}") + + container_name = JobRunnerUtils.get_run_container_name(self.run_id) + JobRunnerUtils.remove_run_container_if_exists(container_name, docker_args.client) + device_requests = [] + volumes = [] + binds = {} + + # Source Code Mounting + source_code_dir = os.path.join(unzip_package_path, "fedml") + destination_launch_dir = "/home/fedml/launch" + volumes.append(source_code_dir) + binds[source_code_dir] = { + "bind": destination_launch_dir, + "mode": "rw" + } + + if self.cuda_visible_gpu_ids_str is not None: + gpu_id_list = self.cuda_visible_gpu_ids_str.split(",") + device_requests.append( + docker.types.DeviceRequest(device_ids=gpu_id_list, capabilities=[["gpu"]])) + logging.info(f"device_requests: {device_requests}") + + try: + host_config = docker_args.client.api.create_host_config( + binds=binds, + device_requests=device_requests, + ) + container = docker_args.client.api.create_container( + image=docker_args.image, + name=container_name, + tty=True, + host_config=host_config, + ports=docker_args.ports, + volumes=volumes, + detach=True # Run container in detached mode + ) + docker_args.client.api.start(container=container.get("Id")) + except Exception as e: + logging.error(f"Failed to create docker container with Exception {e}. Traceback: {traceback.format_exc()}") + raise Exception(f"Failed to create docker container with Exception {e}") + return container + def execute_job_task(self, unzip_package_path, entry_file_full_path, conf_file_full_path, dynamic_args_config, fedml_config_object): run_config = self.request_json["run_config"] run_params = run_config.get("parameters", {}) - client_rank = self.request_json.get("client_rank", 1) job_yaml = run_params.get("job_yaml", {}) job_yaml_default_none = run_params.get("job_yaml", None) - job_api_key = job_yaml.get("run_api_key", None) - job_api_key = job_yaml.get("fedml_run_dynamic_params", None) if job_api_key is None else job_api_key - assigned_gpu_ids = run_params.get("gpu_ids", None) job_type = job_yaml.get("job_type", None) - containerize = fedml_config_object.get("containerize", None) - image_pull_policy = fedml_config_object.get("image_pull_policy", Constants.IMAGE_PULL_POLICY_ALWAYS) # TODO: Can we remove task_type? 
job_type = job_yaml.get("task_type", Constants.JOB_TASK_TYPE_TRAIN) if job_type is None else job_type - conf_file_object = load_yaml_config(conf_file_full_path) - entry_args_dict = conf_file_object.get("fedml_entry_args", {}) - entry_args = entry_args_dict.get("arg_items", None) - scheduler_match_info = self.request_json.get("scheduler_match_info", {}) - if job_type == Constants.JOB_TASK_TYPE_TRAIN: - containerize = True if containerize is None else containerize # Bootstrap Info - bootstrap_script_path, bootstrap_script_dir, bootstrap_script_file = [None] * 3 env_args = fedml_config_object.get("environment_args", None) - - if env_args is not None: - bootstrap_script_file = env_args.get("bootstrap", None) - if bootstrap_script_file is not None: - bootstrap_script_file = str(bootstrap_script_file).replace('\\', os.sep).replace('/', os.sep) - if platform.system() == 'Windows': - bootstrap_script_file = bootstrap_script_file.rstrip('.sh') + '.bat' - if bootstrap_script_file is not None: - bootstrap_script_dir = os.path.join(unzip_package_path, "fedml", - os.path.dirname(bootstrap_script_file)) - bootstrap_script_path = os.path.join( - bootstrap_script_dir, bootstrap_script_dir, os.path.basename(bootstrap_script_file) - ) - - bootstrap_cmd_list = list() - if bootstrap_script_path: - logging.info("Bootstrap commands are being generated...") - bootstrap_cmd_list = JobRunnerUtils.generate_bootstrap_commands(bootstrap_script_path=bootstrap_script_path, - bootstrap_script_dir=bootstrap_script_dir, - bootstrap_script_file=bootstrap_script_file) - logging.info(f"Generated following Bootstrap commands: {bootstrap_cmd_list}") - - if not containerize: - if len(bootstrap_cmd_list) and not (job_type == Constants.JOB_TASK_TYPE_DEPLOY or - job_type == Constants.JOB_TASK_TYPE_SERVE): - bootstrapping_successful = self.run_bootstrap_script(bootstrap_cmd_list=bootstrap_cmd_list, - bootstrap_script_file=bootstrap_script_file) - - if not bootstrapping_successful: - logging.info("failed to update local fedml config.") - self.check_runner_stop_event() - # Send failed msg when exceptions. - self.cleanup_run_when_starting_failed(status=ClientConstants.MSG_MLOPS_CLIENT_STATUS_EXCEPTION) - raise Exception(f"Failed to execute following bootstrap commands: {bootstrap_cmd_list}") - - logging.info("cleanup the previous learning process and bootstrap process...") - ClientConstants.cleanup_learning_process(self.request_json["runId"]) - ClientConstants.cleanup_bootstrap_process(self.request_json["runId"]) - - executable_interpreter = ClientConstants.CLIENT_SHELL_PS \ - if platform.system() == ClientConstants.PLATFORM_WINDOWS else ClientConstants.CLIENT_SHELL_BASH - + bootstrap_cmd_list, bootstrap_script_file = JobRunnerUtils.generate_bootstrap_commands(env_args, + unzip_package_path) + + if len(bootstrap_cmd_list) and not (job_type == Constants.JOB_TASK_TYPE_DEPLOY or + job_type == Constants.JOB_TASK_TYPE_SERVE): + bootstrapping_successful = self.run_bootstrap_script(bootstrap_cmd_list=bootstrap_cmd_list, + bootstrap_script_file=bootstrap_script_file) + + if not bootstrapping_successful: + logging.info("failed to update local fedml config.") + self.check_runner_stop_event() + # Send failed msg when exceptions. 
+                self.cleanup_run_when_starting_failed(status=ClientConstants.MSG_MLOPS_CLIENT_STATUS_EXCEPTION)
+                raise Exception(f"Failed to execute following bootstrap commands: {bootstrap_cmd_list}")
+
+        logging.info("Cleaning up the previous learning and bootstrap processes...")
+        ClientConstants.cleanup_learning_process(self.request_json["runId"])
+        ClientConstants.cleanup_bootstrap_process(self.request_json["runId"])
+
+        # Generate the job executing commands for previous federated learning (Compatibility)
         if job_yaml_default_none is None:
-            # Generate the job executing commands for previous federated learning (Compatibility)
             python_program = get_python_program()
             logging.info("Run the client: {} {} --cf {} --rank {} --role client".format(
                 python_program, entry_file_full_path, conf_file_full_path, str(dynamic_args_config.get("rank", 1))))
@@ -698,49 +771,7 @@ def execute_job_task(self, unzip_package_path, entry_file_full_path, conf_file_f
             process, error_list = ClientConstants.execute_commands_with_live_logs(
                 shell_cmd_list, callback=self.callback_start_fl_job, should_write_log_file=False)
             is_launch_task = False
-        else:
-            self.check_runner_stop_event()
-
-            self.mlops_metrics.report_client_id_status(
-                self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_RUNNING, run_id=self.run_id)
-
-            # Generate the job executing commands
-            job_executing_commands = JobRunnerUtils.generate_job_execute_commands(
-                self.run_id, self.edge_id, self.version,
-                self.package_type, executable_interpreter, entry_file_full_path,
-                conf_file_object, entry_args, assigned_gpu_ids,
-                job_api_key, client_rank, scheduler_match_info=scheduler_match_info,
-                cuda_visible_gpu_ids_str=self.cuda_visible_gpu_ids_str)
-
-            if containerize is not None and containerize is True:
-                docker_args = fedml_config_object.get("docker", {})
-                docker_args = JobRunnerUtils.create_instance_from_dict(DockerArgs, docker_args)
-                try:
-                    job_executing_commands = JobRunnerUtils.generate_launch_docker_command(
-                        docker_args=docker_args, run_id=self.run_id, edge_id=self.edge_id,
-                        unzip_package_path=unzip_package_path, executable_interpreter=executable_interpreter,
-                        entry_file_full_path=entry_file_full_path, bootstrap_cmd_list=bootstrap_cmd_list,
-                        cuda_visible_gpu_ids_str=self.cuda_visible_gpu_ids_str,
-                        image_pull_policy=image_pull_policy)
-                except Exception as e:
-                    logging.error(f"Error occurred while generating containerized launch commands. "
-                                  f"Exception: {e}, Traceback: {traceback.format_exc()}")
" - f"Exception: {e}, Traceback: {traceback.format_exc()}") - return None, None, None - - if not job_executing_commands: - raise Exception("Failed to generate docker execution command") - - # Run the job executing commands - logging.info(f"Run the client job with job id {self.run_id}, device id {self.edge_id}.") - process, error_list = ClientConstants.execute_commands_with_live_logs( - job_executing_commands, callback=self.start_job_perf, error_processor=self.job_error_processor, - should_write_log_file=False if job_type == Constants.JOB_TASK_TYPE_FEDERATE else True) - is_launch_task = False if job_type == Constants.JOB_TASK_TYPE_FEDERATE else True - - return process, is_launch_task, error_list + return process, is_launch_task, error_list def callback_start_fl_job(self, job_pid): ClientConstants.save_learning_process(self.run_id, job_pid) @@ -1001,6 +1032,7 @@ def callback_start_train(self, topic, payload): self.run_process_event_map[run_id_str], self.run_process_completed_event_map[run_id_str], self.message_center.get_message_queue())) self.run_process_map[run_id_str].start() + # FIXME (@alaydshah): Seems this info is not really being persisted. ClientConstants.save_run_process(run_id, self.run_process_map[run_id_str].pid) def callback_stop_train(self, topic, payload): @@ -1238,7 +1270,7 @@ def response_device_info_to_mlops(self, topic, payload): if context is not None: response_payload["context"] = context self.message_center.send_message(response_topic, json.dumps(response_payload), run_id=run_id) - + def callback_report_device_info(self, topic, payload): payload_json = json.loads(payload) server_id = payload_json.get("server_id", 0) @@ -1555,7 +1587,6 @@ def on_agent_mqtt_connected(self, mqtt_client_object): self.add_message_listener(topic_stop_train, self.callback_stop_train) self.mqtt_mgr.add_message_listener(topic_stop_train, self.listener_message_dispatch_center) - # Setup MQTT message listener for client status switching topic_client_status = "fl_client/flclient_agent_" + str(self.edge_id) + "/status" self.add_message_listener(topic_client_status, self.callback_runner_id_status) @@ -1578,20 +1609,25 @@ def on_agent_mqtt_connected(self, mqtt_client_object): topic_request_edge_device_info_from_mlops = f"deploy/mlops/slave_agent/request_device_info/{self.edge_id}" self.add_message_listener(topic_request_edge_device_info_from_mlops, self.response_device_info_to_mlops) - self.mqtt_mgr.add_message_listener(topic_request_edge_device_info_from_mlops, self.listener_message_dispatch_center) + self.mqtt_mgr.add_message_listener(topic_request_edge_device_info_from_mlops, + self.listener_message_dispatch_center) topic_request_deploy_master_device_info_from_mlops = None if self.model_device_server_id is not None: topic_request_deploy_master_device_info_from_mlops = f"deploy/mlops/master_agent/request_device_info/{self.model_device_server_id}" - self.add_message_listener(topic_request_deploy_master_device_info_from_mlops, self.response_device_info_to_mlops) - self.mqtt_mgr.add_message_listener(topic_request_deploy_master_device_info_from_mlops, self.listener_message_dispatch_center) + self.add_message_listener(topic_request_deploy_master_device_info_from_mlops, + self.response_device_info_to_mlops) + self.mqtt_mgr.add_message_listener(topic_request_deploy_master_device_info_from_mlops, + self.listener_message_dispatch_center) topic_request_deploy_slave_device_info_from_mlops = None if self.model_device_client_edge_id_list is not None and len(self.model_device_client_edge_id_list) > 0: 
diff --git a/python/fedml/core/common/singleton.py b/python/fedml/core/common/singleton.py
index 69ee009155..4c206e9ba5 100644
--- a/python/fedml/core/common/singleton.py
+++ b/python/fedml/core/common/singleton.py
@@ -1,6 +1,16 @@
+import threading
+
+
 class Singleton(object):
+    _instance = None
+    # For thread safety
+    _lock = threading.Lock()
+
     def __new__(cls, *args, **kw):
-        if not hasattr(cls, "_instance"):
-            orig = super(Singleton, cls)
-            cls._instance = orig.__new__(cls, *args, **kw)
+        if cls._instance is None:
+            with cls._lock:
+                # Another thread could have created the instance before we acquired the lock. So check that the
+                # instance is still nonexistent.
+                if cls._instance is None:
+                    cls._instance = super().__new__(cls)
         return cls._instance
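The rewritten `Singleton` uses double-checked locking: the outer `is None` test keeps the common path lock-free, and the inner test catches an instance created while the lock was being acquired. One caveat of any `__new__`-based singleton still applies: `__init__` runs on every construction, so subclasses should keep it idempotent. A quick illustrative check (not part of the PR):

```python
# Illustrative check that concurrent construction yields one shared instance.
import threading

from fedml.core.common.singleton import Singleton


class Config(Singleton):
    pass


instances = []


def build():
    instances.append(Config())


threads = [threading.Thread(target=build) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert all(obj is instances[0] for obj in instances)
print("one shared instance:", hex(id(instances[0])))
```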