Skip to content

Commit

Permalink
Simulator workspace re-design (#2492)
Browse files Browse the repository at this point in the history
* Redesign simulator workspace structure.

* working, needs clean.

* Changed the simulator workspacce structure to be consistent with POC.

* Moved the logfile init to start_server_app().

* optimzed.

* adjust the stats pool location.

* Addressed the PR views.

---------

Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com>
Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <yuantingh@nvidia.com>
  • Loading branch information
3 people authored Apr 19, 2024
1 parent e1fc2eb commit bf978b7
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 41 deletions.
95 changes: 57 additions & 38 deletions nvflare/private/fed/app/simulator/simulator_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import json
import logging.config
import os
Expand Down Expand Up @@ -58,7 +58,7 @@
from nvflare.private.fed.simulator.simulator_app_runner import SimulatorServerAppRunner
from nvflare.private.fed.simulator.simulator_audit import SimulatorAuditor
from nvflare.private.fed.simulator.simulator_const import SimulatorConstants
from nvflare.private.fed.utils.fed_utils import add_logfile_handler, fobs_initialize, split_gpus
from nvflare.private.fed.utils.fed_utils import add_logfile_handler, fobs_initialize, get_simulator_app_root, split_gpus
from nvflare.security.logging import secure_format_exception, secure_log_traceback
from nvflare.security.security import EmptyAuthorizer

Expand Down Expand Up @@ -134,9 +134,6 @@ def setup(self):
if not os.path.isfile(log_config_file_path):
log_config_file_path = os.path.join(os.path.dirname(__file__), WorkspaceConstants.LOGGING_CONFIG)
logging.config.fileConfig(fname=log_config_file_path, disable_existing_loggers=False)
local_dir = os.path.join(self.args.workspace, "local")
os.makedirs(local_dir, exist_ok=True)
shutil.copyfile(log_config_file_path, os.path.join(local_dir, WorkspaceConstants.LOGGING_CONFIG))

self.args.log_config = None
self.args.config_folder = "config"
Expand All @@ -153,16 +150,10 @@ def setup(self):
AuthorizationService.initialize(EmptyAuthorizer())
AuditService.the_auditor = SimulatorAuditor()

self.simulator_root = self.args.workspace
self._cleanup_workspace()
init_security_content_service(self.args.workspace)

self.simulator_root = os.path.join(self.args.workspace, SimulatorConstants.JOB_NAME)
if os.path.exists(self.simulator_root):
shutil.rmtree(self.simulator_root)

os.makedirs(self.simulator_root)
log_file = os.path.join(self.simulator_root, WorkspaceConstants.LOG_FILE_NAME)
add_logfile_handler(log_file)

try:
data_bytes, job_name, meta = self.validate_job_data()

Expand Down Expand Up @@ -237,7 +228,7 @@ def setup(self):
self.args.sp_scheme = parsed_url.scheme

self.logger.info("Deploy the Apps.")
self._deploy_apps(job_name, data_bytes, meta)
self._deploy_apps(job_name, data_bytes, meta, log_config_file_path)

return True

Expand All @@ -246,6 +237,25 @@ def setup(self):
secure_log_traceback()
return False

def _cleanup_workspace(self):
os.makedirs(self.simulator_root, exist_ok=True)
with tempfile.TemporaryDirectory() as temp_dir:
startup_dir = os.path.join(self.args.workspace, "startup")
temp_start_up = os.path.join(temp_dir, "startup")
if os.path.exists(startup_dir):
shutil.move(startup_dir, temp_start_up)
if os.path.exists(self.simulator_root):
shutil.rmtree(self.simulator_root)
if os.path.exists(temp_start_up):
shutil.move(temp_start_up, startup_dir)

def _setup_local_startup(self, log_config_file_path, workspace):
local_dir = os.path.join(workspace, "local")
startup = os.path.join(workspace, "startup")
os.makedirs(local_dir, exist_ok=True)
shutil.copyfile(log_config_file_path, os.path.join(local_dir, WorkspaceConstants.LOGGING_CONFIG))
shutil.copytree(os.path.join(self.simulator_root, "startup"), startup)

def validate_job_data(self):
# Validate the simulate job
job_name = split_path(self.args.job_folder)[1]
Expand Down Expand Up @@ -281,30 +291,27 @@ def _validate_client_names(self, meta, client_names):
if no_app_clients:
raise RuntimeError(f"The job does not have App to run for clients: {no_app_clients}")

def _deploy_apps(self, job_name, data_bytes, meta):
def _deploy_apps(self, job_name, data_bytes, meta, log_config_file_path):
with tempfile.TemporaryDirectory() as temp_dir:
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
os.mkdir(temp_dir)
unzip_all_from_bytes(data_bytes, temp_dir)
temp_job_folder = os.path.join(temp_dir, job_name)

app_server_root = os.path.join(self.simulator_root, "app_server")
for app_name, participants in meta.get(JobMetaKey.DEPLOY_MAP).items():
if len(participants) == 1 and participants[0].upper() == ALL_SITES:
participants = ["server"]
participants.extend([client for client in self.client_names])

for p in participants:
if p == "server":
if p == "server" or p in self.client_names:
app_root = get_simulator_app_root(self.simulator_root, p)
self._setup_local_startup(log_config_file_path, os.path.join(self.simulator_root, p))
app = os.path.join(temp_job_folder, app_name)
shutil.copytree(app, app_server_root)
elif p in self.client_names:
app_client_root = os.path.join(self.simulator_root, "app_" + p)
app = os.path.join(temp_job_folder, app_name)
shutil.copytree(app, app_client_root)
shutil.copytree(app, app_root)

job_meta_file = os.path.join(self.simulator_root, WorkspaceConstants.JOB_META_FILE)
job_meta_file = os.path.join(self.simulator_root, "server", WorkspaceConstants.JOB_META_FILE)
with open(job_meta_file, "w") as f:
json.dump(meta, f, indent=4)

Expand Down Expand Up @@ -336,7 +343,7 @@ def create_client(self, client_name):

def _set_client_status(self):
for client in self.federated_clients:
app_client_root = os.path.join(self.simulator_root, "app_" + client.client_name)
app_client_root = get_simulator_app_root(self.simulator_root, client.client_name)
client.app_client_root = app_client_root
client.args = self.args
# self.create_client_runner(client)
Expand Down Expand Up @@ -404,7 +411,8 @@ def simulator_run_main(self):
}

self.logger.info("Deploy and start the Server App.")
server_thread = threading.Thread(target=self.start_server_app, args=[])
args = copy.deepcopy(self.args)
server_thread = threading.Thread(target=self.start_server_app, args=[args])
server_thread.start()

# wait for the server app is started
Expand Down Expand Up @@ -448,17 +456,23 @@ def client_run(self, clients, gpu):
client_runner = SimulatorClientRunner(self.args, clients, self.client_config, self.deploy_args, self.build_ctx)
client_runner.run(gpu)

def start_server_app(self):
app_server_root = os.path.join(self.simulator_root, "app_server")
self.args.server_config = os.path.join("config", JobConstants.SERVER_JOB_CONFIG)
def start_server_app(self, args):
app_server_root = os.path.join(self.simulator_root, "server", SimulatorConstants.JOB_NAME, "app_server")
args.workspace = app_server_root
os.chdir(args.workspace)

log_file = os.path.join(self.simulator_root, "server", WorkspaceConstants.LOG_FILE_NAME)
add_logfile_handler(log_file)

args.server_config = os.path.join("config", JobConstants.SERVER_JOB_CONFIG)
app_custom_folder = os.path.join(app_server_root, "custom")
sys.path.append(app_custom_folder)

startup = os.path.join(self.args.workspace, WorkspaceConstants.STARTUP_FOLDER_NAME)
startup = os.path.join(args.workspace, WorkspaceConstants.STARTUP_FOLDER_NAME)
os.makedirs(startup, exist_ok=True)
local = os.path.join(self.args.workspace, WorkspaceConstants.SITE_FOLDER_NAME)
local = os.path.join(args.workspace, WorkspaceConstants.SITE_FOLDER_NAME)
os.makedirs(local, exist_ok=True)
workspace = Workspace(root_dir=self.args.workspace, site_name="server")
workspace = Workspace(root_dir=args.workspace, site_name="server")

self.server.job_cell = self.server.create_job_cell(
SimulatorConstants.JOB_NAME,
Expand All @@ -471,7 +485,7 @@ def start_server_app(self):
snapshot = None
kv_list = [f"secure_train={self.server.secure_train}"]
server_app_runner.start_server_app(
workspace, self.args, app_server_root, self.args.job_id, snapshot, self.logger, kv_list=kv_list
workspace, args, app_server_root, args.job_id, snapshot, self.logger, kv_list=kv_list
)

# start = time.time()
Expand All @@ -489,8 +503,8 @@ def start_server_app(self):
def dump_stats(self, workspace: Workspace):
stats_dict = StatsPoolManager.to_dict()
json_object = json.dumps(stats_dict, indent=4)
os.makedirs(os.path.join(workspace.get_run_dir(SimulatorConstants.JOB_NAME), POOL_STATS_DIR))
file = os.path.join(workspace.get_run_dir(SimulatorConstants.JOB_NAME), POOL_STATS_DIR, SIMULATOR_POOL_STATS)
os.makedirs(os.path.join(workspace.get_root_dir(), POOL_STATS_DIR))
file = os.path.join(workspace.get_root_dir(), POOL_STATS_DIR, SIMULATOR_POOL_STATS)
with open(file, "w") as outfile:
outfile.write(json_object)

Expand All @@ -502,7 +516,7 @@ def __init__(self, args, clients: [], client_config, deploy_args, build_ctx):
self.federated_clients = clients
self.run_client_index = -1

self.simulator_root = os.path.join(self.args.workspace, SimulatorConstants.JOB_NAME)
self.simulator_root = self.args.workspace
self.client_config = client_config
self.deploy_args = deploy_args
self.build_ctx = build_ctx
Expand Down Expand Up @@ -576,13 +590,16 @@ def run_client_thread(self, num_of_threads, gpu, lock, rank, timeout=60):

def do_one_task(self, client, num_of_threads, gpu, lock, timeout=60.0, task_name=RunnerTask.TASK_EXEC):
open_port = get_open_ports(1)[0]
client_workspace = os.path.join(self.args.workspace, SimulatorConstants.JOB_NAME, "app_" + client.client_name)
client_workspace = os.path.join(self.args.workspace, client.client_name)
logging_config = os.path.join(
self.args.workspace, client.client_name, "local", WorkspaceConstants.LOGGING_CONFIG
)
command = (
sys.executable
+ " -m nvflare.private.fed.app.simulator.simulator_worker -o "
+ client_workspace
+ " --logging_config "
+ self.logging_config
+ logging_config
+ " --client "
+ client.client_name
+ " --token "
Expand Down Expand Up @@ -612,10 +629,12 @@ def do_one_task(self, client, num_of_threads, gpu, lock, timeout=60.0, task_name
conn = self._create_connection(open_port, timeout=timeout)

self.build_ctx["client_name"] = client.client_name
deploy_args = copy.deepcopy(self.deploy_args)
deploy_args.workspace = os.path.join(deploy_args.workspace, client.client_name)
data = {
# SimulatorConstants.CLIENT: client,
SimulatorConstants.CLIENT_CONFIG: self.client_config,
SimulatorConstants.DEPLOY_ARGS: self.deploy_args,
SimulatorConstants.DEPLOY_ARGS: deploy_args,
SimulatorConstants.BUILD_CTX: self.build_ctx,
}
conn.send(data)
Expand Down
11 changes: 8 additions & 3 deletions nvflare/private/fed/app/simulator/simulator_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from nvflare.private.fed.simulator.simulator_app_runner import SimulatorClientAppRunner
from nvflare.private.fed.simulator.simulator_audit import SimulatorAuditor
from nvflare.private.fed.simulator.simulator_const import SimulatorConstants
from nvflare.private.fed.utils.fed_utils import add_logfile_handler, fobs_initialize
from nvflare.private.fed.utils.fed_utils import add_logfile_handler, fobs_initialize, get_simulator_app_root
from nvflare.security.logging import secure_format_exception, secure_log_traceback
from nvflare.security.security import EmptyAuthorizer

Expand Down Expand Up @@ -146,7 +146,7 @@ def run(self, args, conn):

client = self._create_client(args, build_ctx, deploy_args)

app_root = os.path.join(args.workspace, SimulatorConstants.JOB_NAME, "app_" + client.client_name)
app_root = get_simulator_app_root(args.workspace, client.client_name)
app_custom_folder = os.path.join(app_root, "custom")
sys.path.append(app_custom_folder)

Expand Down Expand Up @@ -185,7 +185,7 @@ def _create_client(self, args, build_ctx, deploy_args):
return client

def _set_client_status(self, client, deploy_args, simulator_root):
app_client_root = os.path.join(simulator_root, "app_" + client.client_name)
app_client_root = get_simulator_app_root(simulator_root, client.client_name)
client.app_client_root = app_client_root
client.args = deploy_args
# self.create_client_runner(client)
Expand Down Expand Up @@ -238,6 +238,11 @@ def main(args):
app_custom_folder = os.path.join(args.workspace, "custom")
sys.path.append(app_custom_folder)
os.chdir(args.workspace)
startup = os.path.join(args.workspace, WorkspaceConstants.STARTUP_FOLDER_NAME)
os.makedirs(startup, exist_ok=True)
local = os.path.join(args.workspace, WorkspaceConstants.SITE_FOLDER_NAME)
os.makedirs(local, exist_ok=True)

fobs_initialize()
AuthorizationService.initialize(EmptyAuthorizer())
# AuditService.initialize(audit_file_name=WorkspaceConstants.AUDIT_LOG)
Expand Down
5 changes: 5 additions & 0 deletions nvflare/private/fed/utils/fed_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from nvflare.security.logging import secure_format_exception
from nvflare.security.security import EmptyAuthorizer, FLAuthorizer

from ..simulator.simulator_const import SimulatorConstants
from .app_authz import AppAuthzService


Expand Down Expand Up @@ -334,3 +335,7 @@ def get_return_code(process, job_id, workspace, logger):
else:
return_code = process.poll()
return return_code


def get_simulator_app_root(simulator_root, site_name):
return os.path.join(simulator_root, site_name, SimulatorConstants.JOB_NAME, "app_" + site_name)

0 comments on commit bf978b7

Please sign in to comment.