From db0beadbf25552110ae095761e789e9e3c957175 Mon Sep 17 00:00:00 2001 From: Yuhong Wen Date: Wed, 27 Nov 2024 13:13:17 -0500 Subject: [PATCH] Fix custom pythonpath missing (#3069) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fixed the custom pythonpath missing for the simulator client run execution. * added comment for the add_custom_dir_to_path method. * codestyle. * need to remove the server custom_folder when running the client in simulator. --------- Co-authored-by: Yuan-Ting Hsieh (謝沅廷) --- .../fed/app/simulator/simulator_runner.py | 23 +++++++++++++------ nvflare/private/fed/utils/fed_utils.py | 10 ++++---- .../app/simulator/simulator_runner_test.py | 6 ++--- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/nvflare/private/fed/app/simulator/simulator_runner.py b/nvflare/private/fed/app/simulator/simulator_runner.py index f88d9e5772..2d5e3fbfdf 100644 --- a/nvflare/private/fed/app/simulator/simulator_runner.py +++ b/nvflare/private/fed/app/simulator/simulator_runner.py @@ -37,6 +37,7 @@ RunnerTask, RunProcessKey, SystemConfigs, + SystemVarName, WorkspaceConstants, ) from nvflare.apis.job_def import ALL_SITES, JobMetaKey @@ -111,6 +112,7 @@ def __init__( self.client_config = None self.deploy_args = None self.build_ctx = None + self.server_custom_folder = None self.clients_created = 0 @@ -480,7 +482,7 @@ def simulator_run_main(self): executor = ThreadPoolExecutor(max_workers=len(gpus)) for index in range(len(gpus)): clients = split_clients[index] - executor.submit(lambda p: self.client_run(*p), [clients, gpus[index]]) + executor.submit(lambda p: self.client_run(*p), [self.server_custom_folder, clients, gpus[index]]) executor.shutdown() # Abort the server after all clients finished run @@ -497,8 +499,10 @@ def simulator_run_main(self): run_status = 1 return run_status - def client_run(self, clients, gpu): - client_runner = SimulatorClientRunner(self.args, clients, self.client_config, self.deploy_args, self.build_ctx) + def client_run(self, server_custom_folder, clients, gpu): + client_runner = SimulatorClientRunner( + server_custom_folder, self.args, clients, self.client_config, self.deploy_args, self.build_ctx + ) client_runner.run(gpu) def start_server_app(self, args): @@ -507,9 +511,9 @@ def start_server_app(self, args): os.chdir(args.workspace) args.server_config = os.path.join("config", JobConstants.SERVER_JOB_CONFIG) - app_custom_folder = os.path.join(app_server_root, "custom") - if os.path.isdir(app_custom_folder) and app_custom_folder not in sys.path: - sys.path.append(app_custom_folder) + self.server_custom_folder = os.path.join(app_server_root, "custom") + if os.path.isdir(self.server_custom_folder) and self.server_custom_folder not in sys.path: + sys.path.append(self.server_custom_folder) startup = os.path.join(args.workspace, WorkspaceConstants.STARTUP_FOLDER_NAME) os.makedirs(startup, exist_ok=True) @@ -553,8 +557,9 @@ def dump_stats(self, workspace: Workspace): class SimulatorClientRunner(FLComponent): - def __init__(self, args, clients: [], client_config, deploy_args, build_ctx): + def __init__(self, server_custom_folder, args, clients: [], client_config, deploy_args, build_ctx): super().__init__() + self.server_custom_folder = server_custom_folder self.args = args self.federated_clients = clients self.run_client_index = -1 @@ -702,6 +707,10 @@ def do_one_task(self, client, num_of_threads, gpu, lock, timeout=60.0, task_name command += " --gpu " + str(gpu) new_env = os.environ.copy() add_custom_dir_to_path(app_custom_folder, new_env) + if self.server_custom_folder: + python_paths = new_env[SystemVarName.PYTHONPATH].split(os.pathsep) + python_paths.remove(self.server_custom_folder) + new_env[SystemVarName.PYTHONPATH] = os.pathsep.join(python_paths) _ = subprocess.Popen(shlex.split(command, True), preexec_fn=os.setsid, env=new_env) diff --git a/nvflare/private/fed/utils/fed_utils.py b/nvflare/private/fed/utils/fed_utils.py index 4efbe1bf17..970598ed7c 100644 --- a/nvflare/private/fed/utils/fed_utils.py +++ b/nvflare/private/fed/utils/fed_utils.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import copy import importlib import json import logging @@ -475,11 +476,10 @@ def get_simulator_app_root(simulator_root, site_name): def add_custom_dir_to_path(app_custom_folder, new_env): - path = new_env.get(SystemVarName.PYTHONPATH, "") - if path: - new_env[SystemVarName.PYTHONPATH] = path + os.pathsep + app_custom_folder - else: - new_env[SystemVarName.PYTHONPATH] = app_custom_folder + """Util method to add app_custom_folder into the sys.path and carry into the child process.""" + sys_path = copy.copy(sys.path) + sys_path.append(app_custom_folder) + new_env[SystemVarName.PYTHONPATH] = os.pathsep.join(sys_path) def extract_participants(participants_list): diff --git a/tests/unit_test/private/fed/app/simulator/simulator_runner_test.py b/tests/unit_test/private/fed/app/simulator/simulator_runner_test.py index 5b27740e03..fda1f95310 100644 --- a/tests/unit_test/private/fed/app/simulator/simulator_runner_test.py +++ b/tests/unit_test/private/fed/app/simulator/simulator_runner_test.py @@ -161,7 +161,7 @@ def test_start_server_app(self, mock_deploy, mock_admin, mock_register, mock_cel def test_get_new_sys_path_with_empty(self): args = Namespace(workspace="/tmp") args.set = [] - runner = SimulatorClientRunner(args, [], None, None, None) + runner = SimulatorClientRunner(None, args, [], None, None, None) old_sys_path = copy.deepcopy(sys.path) sys.path.insert(0, "") sys.path.append("/temp/test") @@ -172,7 +172,7 @@ def test_get_new_sys_path_with_empty(self): def test_get_new_sys_path_with_multiple_empty(self): args = Namespace(workspace="/tmp") args.set = [] - runner = SimulatorClientRunner(args, [], None, None, None) + runner = SimulatorClientRunner(None, args, [], None, None, None) old_sys_path = copy.deepcopy(sys.path) sys.path.insert(0, "") if len(sys.path) > 2: @@ -185,7 +185,7 @@ def test_get_new_sys_path_with_multiple_empty(self): def test_get_new_sys_path(self): args = Namespace(workspace="/tmp") args.set = [] - runner = SimulatorClientRunner(args, [], None, None, None) + runner = SimulatorClientRunner(None, args, [], None, None, None) old_sys_path = copy.deepcopy(sys.path) sys.path.append("/temp/test") new_sys_path = runner._get_new_sys_path()