From c0efc0122619320483cf14e151347b87b8c42c82 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Thu, 16 Mar 2023 20:12:38 +0800 Subject: [PATCH 01/22] [DLMED] update MonaiAlgoStats Signed-off-by: Nic Ma --- monai/bundle/workflows.py | 5 +- monai/fl/client/monai_algo.py | 133 +++++++--------------- tests/test_fl_monai_algo_stats.py | 19 ++-- tests/testing_data/config_fl_stats_1.json | 2 +- tests/testing_data/config_fl_stats_2.json | 2 +- 5 files changed, 57 insertions(+), 104 deletions(-) diff --git a/monai/bundle/workflows.py b/monai/bundle/workflows.py index ace08b3ec8..bfb42d3f9d 100644 --- a/monai/bundle/workflows.py +++ b/monai/bundle/workflows.py @@ -215,6 +215,7 @@ def __init__( else: settings_ = ConfigParser.load_config_files(tracking) self.patch_bundle_tracking(parser=self.parser, settings=settings_) + self._is_initialized: bool = False def initialize(self) -> Any: """ @@ -223,6 +224,7 @@ def initialize(self) -> Any: """ # reset the "reference_resolver" buffer at initialization stage self.parser.parse(reset=True) + self._is_initialized = True return self._run_expr(id=self.init_id) def run(self) -> Any: @@ -284,7 +286,7 @@ def _get_property(self, name: str, property: dict) -> Any: property: other information for the target property, defined in `TrainProperties` or `InferProperties`. 
""" - if not self.parser.ref_resolver.is_resolved(): + if not self._is_initialized: raise RuntimeError("Please execute 'initialize' before getting any parsed content.") prop_id = self._get_prop_id(name, property) return self.parser.get_parsed_content(id=prop_id) if prop_id is not None else None @@ -303,6 +305,7 @@ def _set_property(self, name: str, property: dict, value: Any) -> None: if prop_id is not None: self.parser[prop_id] = value # must parse the config again after changing the content + self._is_initialized = False self.parser.ref_resolver.reset() def _check_optional_id(self, name: str, property: dict) -> bool: diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index 031143c69b..fe0ed9daae 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -23,7 +23,7 @@ from monai.apps.auto3dseg.data_analyzer import DataAnalyzer from monai.apps.utils import get_logger from monai.auto3dseg import SegSummarizer -from monai.bundle import DEFAULT_EXP_MGMT_SETTINGS, ConfigComponent, ConfigItem, ConfigParser, ConfigWorkflow +from monai.bundle import BundleWorkflow, ConfigItem, ConfigParser from monai.engines import SupervisedTrainer, Trainer from monai.fl.client import ClientAlgo, ClientAlgoStats from monai.fl.utils.constants import ( @@ -80,55 +80,38 @@ def compute_weight_diff(global_weights, local_var_dict): return weight_diff -def check_bundle_config(parser): - for k in RequiredBundleKeys: - if parser.get(k, None) is None: - raise KeyError(f"Bundle config misses required key `{k}`") - - -def disable_ckpt_loaders(parser): - if BundleKeys.VALIDATE_HANDLERS in parser: - for h in parser[BundleKeys.VALIDATE_HANDLERS]: - if ConfigComponent.is_instantiable(h): - if "CheckpointLoader" in h["_target_"]: - h["_disabled_"] = True - - class MonaiAlgoStats(ClientAlgoStats): """ Implementation of ``ClientAlgoStats`` to allow federated learning with MONAI bundle configurations. Args: - bundle_root: path of bundle. 
- config_train_filename: bundle training config path relative to bundle_root. Can be a list of files; - defaults to "configs/train.json". + workflow: the bundle workflow to execute, usually it's training, evaluation or inference. config_filters_filename: filter configuration file. Can be a list of files; defaults to `None`. + data_stats_transform_list: transforms to apply for the data stats result. histogram_only: whether to only compute histograms. Defaults to False. """ def __init__( self, - bundle_root: str, - config_train_filename: str | list | None = "configs/train.json", + workflow: BundleWorkflow, config_filters_filename: str | list | None = None, - train_data_key: str | None = BundleKeys.TRAIN_DATA, - eval_data_key: str | None = BundleKeys.VALID_DATA, data_stats_transform_list: list | None = None, histogram_only: bool = False, ): self.logger = logger - self.bundle_root = bundle_root - self.config_train_filename = config_train_filename + if not isinstance(workflow, BundleWorkflow): + raise ValueError("workflow must be a subclass of MONAI BundleWorkflow.") + if workflow.get_workflow_type() is None: + raise ValueError("workflow doesn't specify the type.") + self.workflow = workflow self.config_filters_filename = config_filters_filename - self.train_data_key = train_data_key - self.eval_data_key = eval_data_key + self.train_data_key = "train" + self.eval_data_key = "eval" self.data_stats_transform_list = data_stats_transform_list self.histogram_only = histogram_only self.client_name: str | None = None self.app_root: str = "" - self.train_parser: ConfigParser | None = None - self.filter_parser: ConfigParser | None = None self.post_statistics_filters: Any = None self.phase = FlPhase.IDLE self.dataset_root: Any = None @@ -147,37 +130,21 @@ def initialize(self, extra=None): self.client_name = extra.get(ExtraItems.CLIENT_NAME, "noname") self.logger.info(f"Initializing {self.client_name} ...") + self.workflow.initialize() # FL platform needs to provide filepath to 
configuration files self.app_root = extra.get(ExtraItems.APP_ROOT, "") + self.workflow.bundle_root = os.path.join(self.app_root, self.workflow.bundle_root) + # initialize the workflow as the content changed + self.workflow.initialize() - # Read bundle config files - self.bundle_root = os.path.join(self.app_root, self.bundle_root) - - config_train_files = self._add_config_files(self.config_train_filename) config_filter_files = self._add_config_files(self.config_filters_filename) - - # Parse - self.train_parser = ConfigParser() - self.filter_parser = ConfigParser() - if len(config_train_files) > 0: - self.train_parser.read_config(config_train_files) - check_bundle_config(self.train_parser) + filter_parser = ConfigParser() if len(config_filter_files) > 0: - self.filter_parser.read_config(config_filter_files) - - # override some config items - self.train_parser[RequiredBundleKeys.BUNDLE_ROOT] = self.bundle_root - - # Get data location - self.dataset_root = self.train_parser.get_parsed_content( - BundleKeys.DATASET_DIR, default=ConfigItem(None, BundleKeys.DATASET_DIR) - ) - - # Get filters - self.post_statistics_filters = self.filter_parser.get_parsed_content( - FiltersType.POST_STATISTICS_FILTERS, default=ConfigItem(None, FiltersType.POST_STATISTICS_FILTERS) - ) - + filter_parser.read_config(config_filter_files) + # Get filters + self.post_statistics_filters = filter_parser.get_parsed_content( + FiltersType.POST_STATISTICS_FILTERS, default=ConfigItem(None, FiltersType.POST_STATISTICS_FILTERS) + ) self.logger.info(f"Initialized {self.client_name}.") def get_data_stats(self, extra: dict | None = None) -> ExchangeObject: @@ -195,9 +162,9 @@ def get_data_stats(self, extra: dict | None = None) -> ExchangeObject: if extra is None: raise ValueError("`extra` has to be set") - if self.dataset_root: + if self.workflow.dataset_dir: self.phase = FlPhase.GET_DATA_STATS - self.logger.info(f"Computing statistics on {self.dataset_root}") + self.logger.info(f"Computing statistics on 
{self.workflow.dataset_dir}") if FlStatistics.HIST_BINS not in extra: raise ValueError("FlStatistics.NUM_OF_BINS not specified in `extra`") @@ -212,7 +179,7 @@ def get_data_stats(self, extra: dict | None = None) -> ExchangeObject: # train data stats train_summary_stats, train_case_stats = self._get_data_key_stats( - parser=self.train_parser, + data=self.workflow.train_dataset_data, data_key=self.train_data_key, hist_bins=hist_bins, hist_range=hist_range, @@ -223,13 +190,18 @@ def get_data_stats(self, extra: dict | None = None) -> ExchangeObject: stats_dict.update({self.train_data_key: train_summary_stats}) # eval data stats - eval_summary_stats, eval_case_stats = self._get_data_key_stats( - parser=self.train_parser, - data_key=self.eval_data_key, - hist_bins=hist_bins, - hist_range=hist_range, - output_path=os.path.join(self.app_root, "eval_data_stats.yaml"), - ) + eval_summary_stats = None + eval_case_stats = None + if self.workflow.val_dataset_data is not None: + eval_summary_stats, eval_case_stats = self._get_data_key_stats( + data=self.workflow.val_dataset_data, + data_key=self.eval_data_key, + hist_bins=hist_bins, + hist_range=hist_range, + output_path=os.path.join(self.app_root, "eval_data_stats.yaml"), + ) + else: + self.logger.warning("the datalist doesn't contain validation section.") if eval_summary_stats: # Only return summary statistics to FL server stats_dict.update({self.eval_data_key: eval_summary_stats}) @@ -252,17 +224,10 @@ def get_data_stats(self, extra: dict | None = None) -> ExchangeObject: else: raise ValueError("data_root not set!") - def _get_data_key_stats(self, parser, data_key, hist_bins, hist_range, output_path=None): - if data_key not in parser: - self.logger.warning(f"Data key {data_key} not available in bundle configs.") - return None, None - data = parser.get_parsed_content(data_key) - - datalist = {data_key: data} - + def _get_data_key_stats(self, data, data_key, hist_bins, hist_range, output_path=None): analyzer = DataAnalyzer( - 
datalist=datalist, - dataroot=self.dataset_root, + datalist={data_key: data}, + dataroot=self.workflow.dataset_dir, hist_bins=hist_bins, hist_range=hist_range, output_path=output_path, @@ -307,11 +272,11 @@ def _add_config_files(self, config_files): files = [] if config_files: if isinstance(config_files, str): - files.append(os.path.join(self.bundle_root, config_files)) + files.append(os.path.join(self.workflow.bundle_root, config_files)) elif isinstance(config_files, list): for file in config_files: if isinstance(file, str): - files.append(os.path.join(self.bundle_root, file)) + files.append(os.path.join(self.workflow.bundle_root, file)) else: raise ValueError(f"Expected config file to be of type str but got {type(file)}: {file}") else: @@ -465,10 +430,8 @@ def initialize(self, extra=None): self.filter_parser = ConfigParser() if len(config_train_files) > 0: self.train_parser.read_config(config_train_files) - check_bundle_config(self.train_parser) if len(config_eval_files) > 0: self.eval_parser.read_config(config_eval_files) - check_bundle_config(self.eval_parser) if len(config_filter_files) > 0: self.filter_parser.read_config(config_filter_files) @@ -479,20 +442,6 @@ def initialize(self, extra=None): if BundleKeys.TRAIN_TRAINER_MAX_EPOCHS in self.train_parser: self.train_parser[BundleKeys.TRAIN_TRAINER_MAX_EPOCHS] = self.local_epochs - # remove checkpoint loaders - if self.disable_ckpt_loading: - disable_ckpt_loaders(self.train_parser) - disable_ckpt_loaders(self.eval_parser) - - # set tracking configs for experiment management - if self.tracking is not None: - if isinstance(self.tracking, str) and self.tracking in DEFAULT_EXP_MGMT_SETTINGS: - settings_ = DEFAULT_EXP_MGMT_SETTINGS[self.tracking] - else: - settings_ = ConfigParser.load_config_files(self.tracking) - ConfigWorkflow.patch_bundle_tracking(parser=self.train_parser, settings=settings_) - ConfigWorkflow.patch_bundle_tracking(parser=self.eval_parser, settings=settings_) - # Get trainer, evaluator 
self.trainer = self.train_parser.get_parsed_content( BundleKeys.TRAINER, default=ConfigItem(None, BundleKeys.TRAINER) diff --git a/tests/test_fl_monai_algo_stats.py b/tests/test_fl_monai_algo_stats.py index 1955c35b36..9281e14cc0 100644 --- a/tests/test_fl_monai_algo_stats.py +++ b/tests/test_fl_monai_algo_stats.py @@ -16,6 +16,7 @@ from parameterized import parameterized +from monai.bundle import ConfigWorkflow from monai.fl.client import MonaiAlgoStats from monai.fl.utils.constants import ExtraItems, FlStatistics from monai.fl.utils.exchange_object import ExchangeObject @@ -26,25 +27,25 @@ TEST_GET_DATA_STATS_1 = [ { - "bundle_root": _data_dir, - "config_train_filename": os.path.join(_data_dir, "config_fl_stats_1.json"), + "workflow": ConfigWorkflow(workflow="train", config_file=os.path.join(_data_dir, "config_fl_stats_1.json")), "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] TEST_GET_DATA_STATS_2 = [ { - "bundle_root": _data_dir, - "config_train_filename": os.path.join(_data_dir, "config_fl_stats_2.json"), + "workflow": ConfigWorkflow(workflow="train", config_file=os.path.join(_data_dir, "config_fl_stats_2.json")), "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] TEST_GET_DATA_STATS_3 = [ { - "bundle_root": _data_dir, - "config_train_filename": [ - os.path.join(_data_dir, "config_fl_stats_1.json"), - os.path.join(_data_dir, "config_fl_stats_2.json"), - ], + "workflow": ConfigWorkflow( + workflow="train", + config_file=[ + os.path.join(_data_dir, "config_fl_stats_1.json"), + os.path.join(_data_dir, "config_fl_stats_2.json"), + ] + ), "config_filters_filename": [ os.path.join(_data_dir, "config_fl_filters.json"), os.path.join(_data_dir, "config_fl_filters.json"), diff --git a/tests/testing_data/config_fl_stats_1.json b/tests/testing_data/config_fl_stats_1.json index 41b42eb3bb..80773139c2 100644 --- a/tests/testing_data/config_fl_stats_1.json +++ b/tests/testing_data/config_fl_stats_1.json @@ -2,7 
+2,7 @@ "imports": [ "$import os" ], - "bundle_root": "tests/testing_data", + "bundle_root": "", "dataset_dir": "@bundle_root", "train": { "dataset": { diff --git a/tests/testing_data/config_fl_stats_2.json b/tests/testing_data/config_fl_stats_2.json index bf55673f67..8d24bc6a8b 100644 --- a/tests/testing_data/config_fl_stats_2.json +++ b/tests/testing_data/config_fl_stats_2.json @@ -2,7 +2,7 @@ "imports": [ "$import os" ], - "bundle_root": "tests/testing_data", + "bundle_root": "", "dataset_dir": "@bundle_root", "train": { "dataset": { From 86b72be77b750991c9d4c84039f4cd5cd4756036 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 17 Mar 2023 17:29:43 +0800 Subject: [PATCH 02/22] [DLMED] update MonaiAlgo Signed-off-by: Nic Ma --- monai/bundle/properties.py | 5 + monai/bundle/workflows.py | 7 +- monai/fl/client/monai_algo.py | 240 +++++++++--------------------- monai/fl/utils/constants.py | 14 -- tests/test_bundle_workflow.py | 4 + tests/test_fl_monai_algo.py | 151 ++++++++----------- tests/test_fl_monai_algo_stats.py | 2 +- 7 files changed, 151 insertions(+), 272 deletions(-) diff --git a/monai/bundle/properties.py b/monai/bundle/properties.py index 456d84a3b3..db79101290 100644 --- a/monai/bundle/properties.py +++ b/monai/bundle/properties.py @@ -159,6 +159,11 @@ BundleProperty.REQUIRED: True, BundlePropertyConfig.ID: "device", }, + "evaluator": { + BundleProperty.DESC: "inference / evaluation workflow engine.", + BundleProperty.REQUIRED: True, + BundlePropertyConfig.ID: "evaluator", + }, "network_def": { BundleProperty.DESC: "network module for the inference.", BundleProperty.REQUIRED: True, diff --git a/monai/bundle/workflows.py b/monai/bundle/workflows.py index bfb42d3f9d..8641999f5f 100644 --- a/monai/bundle/workflows.py +++ b/monai/bundle/workflows.py @@ -44,15 +44,18 @@ class BundleWorkflow(ABC): """ + supported_train_type: tuple = ("train", "training") + supported_infer_type: tuple = ("infer", "inference", "eval", "evaluation") + def __init__(self, workflow: 
str | None = None): if workflow is None: self.properties = None self.workflow = None return - if workflow.lower() in ("train", "training"): + if workflow.lower() in self.supported_train_type: self.properties = TrainProperties self.workflow = "train" - elif workflow.lower() in ("infer", "inference", "eval", "evaluation"): + elif workflow.lower() in self.supported_infer_type: self.properties = InferProperties self.workflow = "infer" else: diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index fe0ed9daae..512283dc60 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -11,31 +11,19 @@ from __future__ import annotations -import logging import os from collections.abc import Mapping, MutableMapping from typing import Any, cast import torch -import torch.distributed as dist -import monai from monai.apps.auto3dseg.data_analyzer import DataAnalyzer from monai.apps.utils import get_logger from monai.auto3dseg import SegSummarizer from monai.bundle import BundleWorkflow, ConfigItem, ConfigParser -from monai.engines import SupervisedTrainer, Trainer +from monai.engines import SupervisedEvaluator, SupervisedTrainer, Trainer from monai.fl.client import ClientAlgo, ClientAlgoStats -from monai.fl.utils.constants import ( - BundleKeys, - ExtraItems, - FiltersType, - FlPhase, - FlStatistics, - ModelType, - RequiredBundleKeys, - WeightType, -) +from monai.fl.utils.constants import ExtraItems, FiltersType, FlPhase, FlStatistics, ModelType, WeightType from monai.fl.utils.exchange_object import ExchangeObject from monai.networks.utils import copy_model_state, get_state_dict from monai.utils import min_version, require_pkg @@ -100,7 +88,7 @@ def __init__( ): self.logger = logger if not isinstance(workflow, BundleWorkflow): - raise ValueError("workflow must be a subclass of MONAI BundleWorkflow.") + raise ValueError("workflow must be a subclass of BundleWorkflow.") if workflow.get_workflow_type() is None: raise ValueError("workflow 
doesn't specify the type.") self.workflow = workflow @@ -293,95 +281,61 @@ class MonaiAlgo(ClientAlgo, MonaiAlgoStats): FIXME: reimplement this class based on the bundle "ConfigWorkflow". Args: - bundle_root: path of bundle. + train_workflow: the bundle workflow to execute training. + eval_workflow: the bundle workflow to execute evaluation. local_epochs: number of local epochs to execute during each round of local training; defaults to 1. send_weight_diff: whether to send weight differences rather than full weights; defaults to `True`. - config_train_filename: bundle training config path relative to bundle_root. Can be a list of files; - defaults to "configs/train.json". - config_evaluate_filename: bundle evaluation config path relative to bundle_root. Can be a list of files. - If "default", config_evaluate_filename = ["configs/train.json", "configs/evaluate.json"] will be used; config_filters_filename: filter configuration file. Can be a list of files; defaults to `None`. - disable_ckpt_loading: do not use any CheckpointLoader if defined in train/evaluate configs; defaults to `True`. best_model_filepath: location of best model checkpoint; defaults "models/model.pt" relative to `bundle_root`. final_model_filepath: location of final model checkpoint; defaults "models/model_final.pt" relative to `bundle_root`. save_dict_key: If a model checkpoint contains several state dicts, the one defined by `save_dict_key` will be returned by `get_weights`; defaults to "model". If all state dicts should be returned, set `save_dict_key` to None. - seed: set random seed for modules to enable or disable deterministic training; defaults to `None`, - i.e., non-deterministic training. - benchmark: set benchmark to `False` for full deterministic behavior in cuDNN components. - Note, full determinism in federated learning depends also on deterministic behavior of other FL components, - e.g., the aggregator, which is not controlled by this class. 
- multi_gpu: whether to run MonaiAlgo in a multi-GPU setting; defaults to `False`. - backend: backend to use for torch.distributed; defaults to "nccl". - init_method: init_method for torch.distributed; defaults to "env://". - tracking: if not None, enable the experiment tracking at runtime with optionally configurable and extensible. - if "mlflow", will add `MLFlowHandler` to the parsed bundle with default tracking settings, - if other string, treat it as file path to load the tracking settings. - if `dict`, treat it as tracking settings. - will patch the target config content with `tracking handlers` and the top-level items of `configs`. - for detailed usage examples, plesae check the tutorial: - https://github.com/Project-MONAI/tutorials/blob/main/experiment_management/bundle_integrate_mlflow.ipynb. + data_stats_transform_list: transforms to apply for the data stats result. """ def __init__( self, - bundle_root: str, + train_workflow: BundleWorkflow | None = None, + eval_workflow: BundleWorkflow | None = None, local_epochs: int = 1, send_weight_diff: bool = True, - config_train_filename: str | list | None = "configs/train.json", - config_evaluate_filename: str | list | None = "default", config_filters_filename: str | list | None = None, - disable_ckpt_loading: bool = True, best_model_filepath: str | None = "models/model.pt", final_model_filepath: str | None = "models/model_final.pt", save_dict_key: str | None = "model", - seed: int | None = None, - benchmark: bool = True, - multi_gpu: bool = False, - backend: str = "nccl", - init_method: str = "env://", - train_data_key: str | None = BundleKeys.TRAIN_DATA, - eval_data_key: str | None = BundleKeys.VALID_DATA, data_stats_transform_list: list | None = None, - tracking: str | dict | None = None, ): self.logger = logger - if config_evaluate_filename == "default": - # by default, evaluator needs both training and evaluate to be instantiated. 
- config_evaluate_filename = ["configs/train.json", "configs/evaluate.json"] - self.bundle_root = bundle_root + if train_workflow is not None: + if not isinstance(train_workflow, BundleWorkflow) or train_workflow.get_workflow_type() != "train": + raise ValueError( + f"train workflow must be BundleWorkflow and set type in {BundleWorkflow.supported_train_type}." + ) + # only necessary to initialize AlgoStats when train workflow exists + MonaiAlgoStats.__init__(self, workflow=train_workflow, data_stats_transform_list=data_stats_transform_list) + if eval_workflow is not None: + # evaluation workflow can be "train" type or "infer" type + if not isinstance(eval_workflow, BundleWorkflow) or eval_workflow.get_workflow_type() is None: + raise ValueError("eval workflow must be BundleWorkflow and set type.") + self.train_workflow = train_workflow + self.eval_workflow = eval_workflow self.local_epochs = local_epochs self.send_weight_diff = send_weight_diff - self.config_train_filename = config_train_filename - self.config_evaluate_filename = config_evaluate_filename self.config_filters_filename = config_filters_filename - self.disable_ckpt_loading = disable_ckpt_loading self.model_filepaths = {ModelType.BEST_MODEL: best_model_filepath, ModelType.FINAL_MODEL: final_model_filepath} self.save_dict_key = save_dict_key - self.seed = seed - self.benchmark = benchmark - self.multi_gpu = multi_gpu - self.backend = backend - self.init_method = init_method - self.train_data_key = train_data_key - self.eval_data_key = eval_data_key - self.data_stats_transform_list = data_stats_transform_list - self.tracking = tracking self.app_root = "" - self.train_parser: ConfigParser | None = None - self.eval_parser: ConfigParser | None = None self.filter_parser: ConfigParser | None = None self.trainer: SupervisedTrainer | None = None - self.evaluator: Any | None = None + self.evaluator: SupervisedEvaluator | None = None self.pre_filters = None self.post_weight_filters = None 
self.post_evaluate_filters = None self.iter_of_start_time = 0 self.global_weights: Mapping | None = None - self.rank = 0 self.phase = FlPhase.IDLE self.client_name = None @@ -400,80 +354,46 @@ def initialize(self, extra=None): extra = {} self.client_name = extra.get(ExtraItems.CLIENT_NAME, "noname") self.logger.info(f"Initializing {self.client_name} ...") - - if self.multi_gpu: - dist.init_process_group(backend=self.backend, init_method=self.init_method) - self._set_cuda_device() - self.logger.info( - f"Using multi-gpu training on rank {self.rank} (available devices: {torch.cuda.device_count()})" - ) - if self.rank > 0: - self.logger.setLevel(logging.WARNING) - - if self.seed: - monai.utils.set_determinism(seed=self.seed) - torch.backends.cudnn.benchmark = self.benchmark - # FL platform needs to provide filepath to configuration files self.app_root = extra.get(ExtraItems.APP_ROOT, "") - # Read bundle config files - self.bundle_root = os.path.join(self.app_root, self.bundle_root) - - config_train_files = self._add_config_files(self.config_train_filename) - config_eval_files = self._add_config_files(self.config_evaluate_filename) - config_filter_files = self._add_config_files(self.config_filters_filename) - - # Parse - self.train_parser = ConfigParser() - self.eval_parser = ConfigParser() - self.filter_parser = ConfigParser() - if len(config_train_files) > 0: - self.train_parser.read_config(config_train_files) - if len(config_eval_files) > 0: - self.eval_parser.read_config(config_eval_files) - if len(config_filter_files) > 0: - self.filter_parser.read_config(config_filter_files) - - # override some config items - self.train_parser[RequiredBundleKeys.BUNDLE_ROOT] = self.bundle_root - self.eval_parser[RequiredBundleKeys.BUNDLE_ROOT] = self.bundle_root - # number of training epochs for each round - if BundleKeys.TRAIN_TRAINER_MAX_EPOCHS in self.train_parser: - self.train_parser[BundleKeys.TRAIN_TRAINER_MAX_EPOCHS] = self.local_epochs - - # Get trainer, evaluator - 
self.trainer = self.train_parser.get_parsed_content( - BundleKeys.TRAINER, default=ConfigItem(None, BundleKeys.TRAINER) - ) - self.evaluator = self.eval_parser.get_parsed_content( - BundleKeys.EVALUATOR, default=ConfigItem(None, BundleKeys.EVALUATOR) - ) + if self.train_workflow is not None: + self.train_workflow.initialize() + self.train_workflow.bundle_root = os.path.join(self.app_root, self.train_workflow.bundle_root) + self.train_workflow.max_epochs = self.local_epochs + # initialize the workflow as the content changed + self.train_workflow.initialize() + self.trainer = self.train_workflow.trainer + if not isinstance(self.trainer, SupervisedTrainer): + raise ValueError(f"trainer must be SupervisedTrainer, but got: {type(self.trainer)}.") + + config_filter_files = self._add_config_files(self.config_filters_filename) + self.filter_parser = ConfigParser() + if len(config_filter_files) > 0: + self.filter_parser.read_config(config_filter_files) - # Get filters - self.pre_filters = self.filter_parser.get_parsed_content( - FiltersType.PRE_FILTERS, default=ConfigItem(None, FiltersType.PRE_FILTERS) - ) - self.post_weight_filters = self.filter_parser.get_parsed_content( - FiltersType.POST_WEIGHT_FILTERS, default=ConfigItem(None, FiltersType.POST_WEIGHT_FILTERS) - ) - self.post_evaluate_filters = self.filter_parser.get_parsed_content( - FiltersType.POST_EVALUATE_FILTERS, default=ConfigItem(None, FiltersType.POST_EVALUATE_FILTERS) - ) - self.post_statistics_filters = self.filter_parser.get_parsed_content( - FiltersType.POST_STATISTICS_FILTERS, default=ConfigItem(None, FiltersType.POST_STATISTICS_FILTERS) - ) + # Get filters + self.pre_filters = self.filter_parser.get_parsed_content( + FiltersType.PRE_FILTERS, default=ConfigItem(None, FiltersType.PRE_FILTERS) + ) + self.post_weight_filters = self.filter_parser.get_parsed_content( + FiltersType.POST_WEIGHT_FILTERS, default=ConfigItem(None, FiltersType.POST_WEIGHT_FILTERS) + ) + self.post_evaluate_filters = 
self.filter_parser.get_parsed_content( + FiltersType.POST_EVALUATE_FILTERS, default=ConfigItem(None, FiltersType.POST_EVALUATE_FILTERS) + ) + self.post_statistics_filters = self.filter_parser.get_parsed_content( + FiltersType.POST_STATISTICS_FILTERS, default=ConfigItem(None, FiltersType.POST_STATISTICS_FILTERS) + ) - # Get data location - self.dataset_root = self.train_parser.get_parsed_content( - BundleKeys.DATASET_DIR, default=ConfigItem(None, BundleKeys.DATASET_DIR) - ) + if self.eval_workflow is not None: + self.eval_workflow.initialize() + self.eval_workflow.bundle_root = os.path.join(self.app_root, self.eval_workflow.bundle_root) + self.eval_workflow.initialize() + self.evaluator = self.eval_workflow.evaluator + if not isinstance(self.evaluator, SupervisedEvaluator): + raise ValueError(f"evaluator must be SupervisedEvaluator, but got: {type(self.evaluator)}.") - if self.multi_gpu: - if self.rank > 0 and self.trainer: - self.trainer.logger.setLevel(logging.WARNING) - if self.rank > 0 and self.evaluator: - self.evaluator.logger.setLevel(logging.WARNING) self.logger.info(f"Initialized {self.client_name}.") def train(self, data: ExchangeObject, extra: dict | None = None) -> None: @@ -485,7 +405,6 @@ def train(self, data: ExchangeObject, extra: dict | None = None) -> None: extra: Dict with additional information that can be provided by the FL system. """ - self._set_cuda_device() if extra is None: extra = {} @@ -528,10 +447,11 @@ def get_weights(self, extra=None): or load requested model type from disk (`ModelType.BEST_MODEL` or `ModelType.FINAL_MODEL`). """ - self._set_cuda_device() if extra is None: extra = {} + if self.train_workflow is None or self.trainer is None: + raise ValueError("self.trainer should not be None.") # by default return current weights, return best if requested via model type. 
self.phase = FlPhase.GET_WEIGHTS @@ -543,7 +463,7 @@ def get_weights(self, extra=None): f"Expected requested model type to be of type `ModelType` but received {type(model_type)}" ) if model_type in self.model_filepaths: - model_path = os.path.join(self.bundle_root, cast(str, self.model_filepaths[model_type])) + model_path = os.path.join(self.train_workflow.bundle_root, cast(str, self.model_filepaths[model_type])) if not os.path.isfile(model_path): raise ValueError(f"No best model checkpoint exists at {model_path}") weights = torch.load(model_path, map_location="cpu") @@ -558,26 +478,21 @@ def get_weights(self, extra=None): f"Requested model type {model_type} not specified in `model_filepaths`: {self.model_filepaths}" ) else: - if self.trainer: - weights = get_state_dict(self.trainer.network) - # returned weights will be on the cpu - for k in weights.keys(): - weights[k] = weights[k].cpu() - weigh_type = WeightType.WEIGHTS - stats = self.trainer.get_stats() - # calculate current iteration and epoch data after training. - stats[FlStatistics.NUM_EXECUTED_ITERATIONS] = self.trainer.state.iteration - self.iter_of_start_time - # compute weight differences - if self.send_weight_diff: - weights = compute_weight_diff(global_weights=self.global_weights, local_var_dict=weights) - weigh_type = WeightType.WEIGHT_DIFF - self.logger.info("Returning current weight differences.") - else: - self.logger.info("Returning current weights.") + weights = get_state_dict(self.trainer.network) + # returned weights will be on the cpu + for k in weights.keys(): + weights[k] = weights[k].cpu() + weigh_type = WeightType.WEIGHTS + stats = self.trainer.get_stats() + # calculate current iteration and epoch data after training. 
+ stats[FlStatistics.NUM_EXECUTED_ITERATIONS] = self.trainer.state.iteration - self.iter_of_start_time + # compute weight differences + if self.send_weight_diff: + weights = compute_weight_diff(global_weights=self.global_weights, local_var_dict=weights) + weigh_type = WeightType.WEIGHT_DIFF + self.logger.info("Returning current weight differences.") else: - weights = None - weigh_type = None - stats = dict() + self.logger.info("Returning current weights.") if not isinstance(stats, dict): raise ValueError(f"stats is not a dict, {stats}") @@ -607,7 +522,6 @@ def evaluate(self, data: ExchangeObject, extra: dict | None = None) -> ExchangeO return_metrics: `ExchangeObject` containing evaluation metrics. """ - self._set_cuda_device() if extra is None: extra = {} @@ -671,9 +585,6 @@ def finalize(self, extra: dict | None = None) -> None: self.logger.info(f"Terminating {self.client_name} evaluator...") self.evaluator.terminate() - if self.multi_gpu: - dist.destroy_process_group() - def _check_converted(self, global_weights, local_var_dict, n_converted): if n_converted == 0: self.logger.warning( @@ -683,8 +594,3 @@ def _check_converted(self, global_weights, local_var_dict, n_converted): self.logger.info( f"Converted {n_converted} global variables to match {len(local_var_dict)} local variables." 
) - - def _set_cuda_device(self): - if self.multi_gpu: - self.rank = int(os.environ["LOCAL_RANK"]) - torch.cuda.set_device(self.rank) diff --git a/monai/fl/utils/constants.py b/monai/fl/utils/constants.py index fbd18b364c..3f229d6ecc 100644 --- a/monai/fl/utils/constants.py +++ b/monai/fl/utils/constants.py @@ -51,20 +51,6 @@ class FlStatistics(StrEnum): FEATURE_NAMES = "feature_names" -class RequiredBundleKeys(StrEnum): - BUNDLE_ROOT = "bundle_root" - - -class BundleKeys(StrEnum): - TRAINER = "train#trainer" - EVALUATOR = "validate#evaluator" - TRAIN_TRAINER_MAX_EPOCHS = "train#trainer#max_epochs" - VALIDATE_HANDLERS = "validate#handlers" - DATASET_DIR = "dataset_dir" - TRAIN_DATA = "train#dataset#data" - VALID_DATA = "validate#dataset#data" - - class FiltersType(StrEnum): PRE_FILTERS = "pre_filters" POST_WEIGHT_FILTERS = "post_weight_filters" diff --git a/tests/test_bundle_workflow.py b/tests/test_bundle_workflow.py index 948c351a1c..023f04230c 100644 --- a/tests/test_bundle_workflow.py +++ b/tests/test_bundle_workflow.py @@ -115,6 +115,8 @@ def _get_property(self, name, property): return self._bundle_root if name == "device": return self._device + if name == "evaluator": + return self._evaluator if name == "network_def": return self._network_def if name == "inferer": @@ -131,6 +133,8 @@ def _set_property(self, name, property, value): self._bundle_root = value elif name == "device": self._device = value + elif name == "evaluator": + self._evaluator = value elif name == "network_def": self._network_def = value elif name == "inferer": diff --git a/tests/test_fl_monai_algo.py b/tests/test_fl_monai_algo.py index c4c5da00bb..13be295e57 100644 --- a/tests/test_fl_monai_algo.py +++ b/tests/test_fl_monai_algo.py @@ -13,12 +13,12 @@ import os import shutil -import tempfile import unittest +from copy import deepcopy from parameterized import parameterized -from monai.bundle import ConfigParser +from monai.bundle import ConfigParser, ConfigWorkflow from monai.bundle.utils 
import DEFAULT_HANDLERS_ID from monai.fl.client.monai_algo import MonaiAlgo from monai.fl.utils.constants import ExtraItems @@ -31,28 +31,25 @@ TEST_TRAIN_1 = [ { - "bundle_root": _data_dir, - "config_train_filename": os.path.join(_data_dir, "config_fl_train.json"), - "config_evaluate_filename": None, + "train_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_train.json"), workflow="train"), "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] TEST_TRAIN_2 = [ { - "bundle_root": _data_dir, - "config_train_filename": os.path.join(_data_dir, "config_fl_train.json"), - "config_evaluate_filename": None, + "train_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_train.json"), workflow="train"), "config_filters_filename": None, } ] TEST_TRAIN_3 = [ { - "bundle_root": _data_dir, - "config_train_filename": [ - os.path.join(_data_dir, "config_fl_train.json"), - os.path.join(_data_dir, "config_fl_train.json"), - ], - "config_evaluate_filename": None, + "train_workflow": ConfigWorkflow( + config_file=[ + os.path.join(_data_dir, "config_fl_train.json"), + os.path.join(_data_dir, "config_fl_train.json"), + ], + workflow="train", + ), "config_filters_filename": [ os.path.join(_data_dir, "config_fl_filters.json"), os.path.join(_data_dir, "config_fl_filters.json"), @@ -60,30 +57,49 @@ } ] +TEST_TRAIN_4 = [ + { + "train_workflow": ConfigWorkflow( + config_file=os.path.join(_data_dir, "config_fl_train.json"), + workflow="train", + tracking={ + "handlers_id": DEFAULT_HANDLERS_ID, + "configs": { + "execute_config": f"{_data_dir}/config_executed.json", + "trainer": { + "_target_": "MLFlowHandler", + "tracking_uri": path_to_uri(_data_dir) + "/mlflow_override", + "output_transform": "$monai.handlers.from_engine(['loss'], first=True)", + "close_on_complete": True, + }, + }, + }, + ), + "config_filters_filename": None, + } +] + TEST_EVALUATE_1 = [ { - "bundle_root": _data_dir, - "config_train_filename": None, - "config_evaluate_filename": 
os.path.join(_data_dir, "config_fl_evaluate.json"), + "eval_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_evaluate.json"), workflow="train"), "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] TEST_EVALUATE_2 = [ { - "bundle_root": _data_dir, - "config_train_filename": None, - "config_evaluate_filename": os.path.join(_data_dir, "config_fl_evaluate.json"), + "eval_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_evaluate.json"), workflow="train"), "config_filters_filename": None, } ] TEST_EVALUATE_3 = [ { - "bundle_root": _data_dir, - "config_train_filename": None, - "config_evaluate_filename": [ - os.path.join(_data_dir, "config_fl_evaluate.json"), - os.path.join(_data_dir, "config_fl_evaluate.json"), - ], + "eval_workflow": ConfigWorkflow( + config_file=[ + os.path.join(_data_dir, "config_fl_evaluate.json"), + os.path.join(_data_dir, "config_fl_evaluate.json"), + ], + workflow="train", + ), "config_filters_filename": [ os.path.join(_data_dir, "config_fl_filters.json"), os.path.join(_data_dir, "config_fl_filters.json"), @@ -93,39 +109,27 @@ TEST_GET_WEIGHTS_1 = [ { - "bundle_root": _data_dir, - "config_train_filename": os.path.join(_data_dir, "config_fl_train.json"), - "config_evaluate_filename": None, + "train_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_train.json"), workflow="train"), "send_weight_diff": False, "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] TEST_GET_WEIGHTS_2 = [ { - "bundle_root": _data_dir, - "config_train_filename": None, - "config_evaluate_filename": None, - "send_weight_diff": False, - "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), - } -] -TEST_GET_WEIGHTS_3 = [ - { - "bundle_root": _data_dir, - "config_train_filename": os.path.join(_data_dir, "config_fl_train.json"), - "config_evaluate_filename": None, + "train_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_train.json"), 
workflow="train"), "send_weight_diff": True, "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] -TEST_GET_WEIGHTS_4 = [ +TEST_GET_WEIGHTS_3 = [ { - "bundle_root": _data_dir, - "config_train_filename": [ - os.path.join(_data_dir, "config_fl_train.json"), - os.path.join(_data_dir, "config_fl_train.json"), - ], - "config_evaluate_filename": None, + "train_workflow": ConfigWorkflow( + config_file=[ + os.path.join(_data_dir, "config_fl_train.json"), + os.path.join(_data_dir, "config_fl_train.json"), + ], + workflow="train", + ), "send_weight_diff": True, "config_filters_filename": [ os.path.join(_data_dir, "config_fl_filters.json"), @@ -138,39 +142,15 @@ @SkipIfNoModule("ignite") @SkipIfNoModule("mlflow") class TestFLMonaiAlgo(unittest.TestCase): - @parameterized.expand([TEST_TRAIN_1, TEST_TRAIN_2, TEST_TRAIN_3]) + @parameterized.expand([TEST_TRAIN_1, TEST_TRAIN_2, TEST_TRAIN_3, TEST_TRAIN_4]) def test_train(self, input_params): - # get testing data dir and update train config; using the first to define data dir - if isinstance(input_params["config_train_filename"], list): - config_train_filename = [ - os.path.join(input_params["bundle_root"], x) for x in input_params["config_train_filename"] - ] - else: - config_train_filename = os.path.join(input_params["bundle_root"], input_params["config_train_filename"]) - - data_dir = tempfile.mkdtemp() - # test experiment management - input_params["tracking"] = { - "handlers_id": DEFAULT_HANDLERS_ID, - "configs": { - "execute_config": f"{data_dir}/config_executed.json", - "trainer": { - "_target_": "MLFlowHandler", - "tracking_uri": path_to_uri(data_dir) + "/mlflow_override", - "output_transform": "$monai.handlers.from_engine(['loss'], first=True)", - "close_on_complete": True, - }, - }, - } - # initialize algo algo = MonaiAlgo(**input_params) algo.initialize(extra={ExtraItems.CLIENT_NAME: "test_fl"}) algo.abort() # initialize model - parser = ConfigParser() - 
parser.read_config(config_train_filename) + parser = ConfigParser(config=deepcopy(algo.train_workflow.parser.get())) parser.parse() network = parser.get_parsed_content("network") @@ -179,27 +159,22 @@ def test_train(self, input_params): # test train algo.train(data=data, extra={}) algo.finalize() - self.assertTrue(os.path.exists(f"{data_dir}/mlflow_override")) - self.assertTrue(os.path.exists(f"{data_dir}/config_executed.json")) - shutil.rmtree(data_dir) + + # test experiment management + if "execute_config" in algo.train_workflow.parser: + self.assertTrue(os.path.exists(f"{_data_dir}/mlflow_override")) + shutil.rmtree(f"{_data_dir}/mlflow_override") + self.assertTrue(os.path.exists(f"{_data_dir}/config_executed.json")) + os.remove(f"{_data_dir}/config_executed.json") @parameterized.expand([TEST_EVALUATE_1, TEST_EVALUATE_2, TEST_EVALUATE_3]) def test_evaluate(self, input_params): - # get testing data dir and update train config; using the first to define data dir - if isinstance(input_params["config_evaluate_filename"], list): - config_eval_filename = [ - os.path.join(input_params["bundle_root"], x) for x in input_params["config_evaluate_filename"] - ] - else: - config_eval_filename = os.path.join(input_params["bundle_root"], input_params["config_evaluate_filename"]) - # initialize algo algo = MonaiAlgo(**input_params) algo.initialize(extra={ExtraItems.CLIENT_NAME: "test_fl"}) # initialize model - parser = ConfigParser() - parser.read_config(config_eval_filename) + parser = ConfigParser(config=deepcopy(algo.eval_workflow.parser.get())) parser.parse() network = parser.get_parsed_content("network") @@ -208,7 +183,7 @@ def test_evaluate(self, input_params): # test evaluate algo.evaluate(data=data, extra={}) - @parameterized.expand([TEST_GET_WEIGHTS_1, TEST_GET_WEIGHTS_2, TEST_GET_WEIGHTS_3, TEST_GET_WEIGHTS_4]) + @parameterized.expand([TEST_GET_WEIGHTS_1, TEST_GET_WEIGHTS_2, TEST_GET_WEIGHTS_3]) def test_get_weights(self, input_params): # initialize algo algo = 
MonaiAlgo(**input_params) diff --git a/tests/test_fl_monai_algo_stats.py b/tests/test_fl_monai_algo_stats.py index 9281e14cc0..2bd05f1ea6 100644 --- a/tests/test_fl_monai_algo_stats.py +++ b/tests/test_fl_monai_algo_stats.py @@ -44,7 +44,7 @@ config_file=[ os.path.join(_data_dir, "config_fl_stats_1.json"), os.path.join(_data_dir, "config_fl_stats_2.json"), - ] + ], ), "config_filters_filename": [ os.path.join(_data_dir, "config_fl_filters.json"), From cba1490d8407a1723a6937f36af2dccf63129f5a Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 17 Mar 2023 18:37:35 +0800 Subject: [PATCH 03/22] [DLMED] update distributed tests Signed-off-by: Nic Ma --- monai/fl/client/monai_algo.py | 4 ++ tests/test_fl_monai_algo_dist.py | 52 +++++++--------------- tests/testing_data/multi_gpu_evaluate.json | 12 +++-- tests/testing_data/multi_gpu_train.json | 12 +++-- 4 files changed, 35 insertions(+), 45 deletions(-) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index 512283dc60..65d9a394d5 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -584,6 +584,10 @@ def finalize(self, extra: dict | None = None) -> None: if isinstance(self.evaluator, Trainer): self.logger.info(f"Terminating {self.client_name} evaluator...") self.evaluator.terminate() + if self.train_workflow is not None: + self.train_workflow.finalize() + if self.eval_workflow is not None: + self.eval_workflow.finalize() def _check_converted(self, global_weights, local_var_dict, n_converted): if n_converted == 0: diff --git a/tests/test_fl_monai_algo_dist.py b/tests/test_fl_monai_algo_dist.py index 36c2f419b3..75b64daf48 100644 --- a/tests/test_fl_monai_algo_dist.py +++ b/tests/test_fl_monai_algo_dist.py @@ -16,9 +16,8 @@ from os.path import join as pathjoin import torch.distributed as dist -from parameterized import parameterized -from monai.bundle import ConfigParser +from monai.bundle import ConfigParser, ConfigWorkflow from monai.fl.client.monai_algo import 
MonaiAlgo from monai.fl.utils.constants import ExtraItems from monai.fl.utils.exchange_object import ExchangeObject @@ -26,68 +25,47 @@ _root_dir = os.path.abspath(pathjoin(os.path.dirname(__file__))) _data_dir = pathjoin(_root_dir, "testing_data") -TEST_TRAIN_1 = [ - { - "bundle_root": _data_dir, - "config_train_filename": [ - pathjoin(_data_dir, "config_fl_train.json"), - pathjoin(_data_dir, "multi_gpu_train.json"), - ], - "config_evaluate_filename": None, - "config_filters_filename": pathjoin(_root_dir, "testing_data", "config_fl_filters.json"), - "multi_gpu": True, - } -] - -TEST_EVALUATE_1 = [ - { - "bundle_root": _data_dir, - "config_train_filename": None, - "config_evaluate_filename": [ - pathjoin(_data_dir, "config_fl_evaluate.json"), - pathjoin(_data_dir, "multi_gpu_evaluate.json"), - ], - "config_filters_filename": pathjoin(_data_dir, "config_fl_filters.json"), - "multi_gpu": True, - } -] @SkipIfNoModule("ignite") @SkipIfBeforePyTorchVersion((1, 11, 1)) class TestFLMonaiAlgo(DistTestCase): - @parameterized.expand([TEST_TRAIN_1]) @DistCall(nnodes=1, nproc_per_node=2, init_method="no_init") @skip_if_no_cuda - def test_train(self, input_params): + def test_train(self): + config_file = [pathjoin(_data_dir, "config_fl_train.json"), pathjoin(_data_dir, "multi_gpu_train.json")] # initialize algo - algo = MonaiAlgo(**input_params) + algo = MonaiAlgo( + train_workflow=ConfigWorkflow(config_file=config_file, workflow="train"), + config_filters_filename=pathjoin(_root_dir, "testing_data", "config_fl_filters.json"), + ) algo.initialize(extra={ExtraItems.CLIENT_NAME: "test_fl"}) self.assertTrue(dist.get_rank() in (0, 1)) # initialize model parser = ConfigParser() - parser.read_config([pathjoin(input_params["bundle_root"], x) for x in input_params["config_train_filename"]]) + parser.read_config(config_file) parser.parse() network = parser.get_parsed_content("network") data = ExchangeObject(weights=network.state_dict()) # test train algo.train(data=data, extra={}) - 
@parameterized.expand([TEST_EVALUATE_1]) @DistCall(nnodes=1, nproc_per_node=2, init_method="no_init") @skip_if_no_cuda - def test_evaluate(self, input_params): + def test_evaluate(self): + config_file = [pathjoin(_data_dir, "config_fl_evaluate.json"), pathjoin(_data_dir, "multi_gpu_evaluate.json")] # initialize algo - algo = MonaiAlgo(**input_params) + algo = MonaiAlgo( + eval_workflow=ConfigWorkflow(config_file=config_file, workflow="train"), + config_filters_filename=pathjoin(_data_dir, "config_fl_filters.json"), + ) algo.initialize(extra={ExtraItems.CLIENT_NAME: "test_fl"}) self.assertTrue(dist.get_rank() in (0, 1)) # initialize model parser = ConfigParser() - parser.read_config( - [os.path.join(input_params["bundle_root"], x) for x in input_params["config_evaluate_filename"]] - ) + parser.read_config(pathjoin(_data_dir, "config_fl_evaluate.json")) parser.parse() network = parser.get_parsed_content("network") data = ExchangeObject(weights=network.state_dict()) diff --git a/tests/testing_data/multi_gpu_evaluate.json b/tests/testing_data/multi_gpu_evaluate.json index 7af24a6b2e..37286cfb7a 100644 --- a/tests/testing_data/multi_gpu_evaluate.json +++ b/tests/testing_data/multi_gpu_evaluate.json @@ -14,14 +14,18 @@ "shuffle": false }, "validate#dataloader#sampler": "@validate#sampler", - "evaluating": [ + "initialize": [ "$import torch.distributed as dist", - "$dist.init_process_group(backend='nccl')", + "$dist.is_initialized() or dist.init_process_group(backend='nccl')", "$torch.cuda.set_device(@device)", "$setattr(torch.backends.cudnn, 'benchmark', True)", "$import logging", - "$@validate#evaluator.logger.setLevel(logging.WARNING if dist.get_rank() > 0 else logging.INFO)", - "$@validate#evaluator.run()", + "$@validate#evaluator.logger.setLevel(logging.WARNING if dist.get_rank() > 0 else logging.INFO)" + ], + "run": [ + "$@validate#evaluator.run()" + ], + "finalize": [ "$dist.destroy_process_group()" ] } diff --git a/tests/testing_data/multi_gpu_train.json 
b/tests/testing_data/multi_gpu_train.json index 41fd7698db..e9a3f3b504 100644 --- a/tests/testing_data/multi_gpu_train.json +++ b/tests/testing_data/multi_gpu_train.json @@ -16,15 +16,19 @@ "train#dataloader#sampler": "@train#sampler", "train#dataloader#shuffle": false, "train#trainer#train_handlers": "$@train#handlers[: -2 if dist.get_rank() > 0 else None]", - "training": [ + "initialize": [ "$import torch.distributed as dist", - "$dist.init_process_group(backend='nccl')", + "$dist.is_initialized() or dist.init_process_group(backend='nccl')", "$torch.cuda.set_device(@device)", "$monai.utils.set_determinism(seed=123)", "$setattr(torch.backends.cudnn, 'benchmark', True)", "$import logging", - "$@train#trainer.logger.setLevel(logging.WARNING if dist.get_rank() > 0 else logging.INFO)", - "$@train#trainer.run()", + "$@train#trainer.logger.setLevel(logging.WARNING if dist.get_rank() > 0 else logging.INFO)" + ], + "run": [ + "$@train#trainer.run()" + ], + "finalize": [ "$dist.destroy_process_group()" ] } From 475ded467f2c05e17449e0be9912ab564cd82e34 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 31 Mar 2023 22:20:15 +0800 Subject: [PATCH 04/22] [DLMED] update MonaiAlgoStats Signed-off-by: Nic Ma --- monai/fl/client/monai_algo.py | 41 +++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index 65d9a394d5..cb24ce2e42 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -20,7 +20,7 @@ from monai.apps.auto3dseg.data_analyzer import DataAnalyzer from monai.apps.utils import get_logger from monai.auto3dseg import SegSummarizer -from monai.bundle import BundleWorkflow, ConfigItem, ConfigParser +from monai.bundle import BundleWorkflow, ConfigWorkflow, ConfigItem, ConfigParser from monai.engines import SupervisedEvaluator, SupervisedTrainer, Trainer from monai.fl.client import ClientAlgo, ClientAlgoStats from monai.fl.utils.constants import 
ExtraItems, FiltersType, FlPhase, FlStatistics, ModelType, WeightType @@ -73,7 +73,11 @@ class MonaiAlgoStats(ClientAlgoStats): Implementation of ``ClientAlgoStats`` to allow federated learning with MONAI bundle configurations. Args: + bundle_root: directory path of the bundle. workflow: the bundle workflow to execute, usually it's training, evaluation or inference. + if None, will create an `ConfigWorkflow` based on `config_train_filename`. + config_train_filename: bundle training config path relative to bundle_root. Can be a list of files; + defaults to "configs/train.json". only necessary when `workflow` is None. config_filters_filename: filter configuration file. Can be a list of files; defaults to `None`. data_stats_transform_list: transforms to apply for the data stats result. histogram_only: whether to only compute histograms. Defaults to False. @@ -81,17 +85,23 @@ class MonaiAlgoStats(ClientAlgoStats): def __init__( self, - workflow: BundleWorkflow, + bundle_root: str, + workflow: BundleWorkflow | None = None, + config_train_filename: str | list | None = "configs/train.json", config_filters_filename: str | list | None = None, data_stats_transform_list: list | None = None, histogram_only: bool = False, ): self.logger = logger - if not isinstance(workflow, BundleWorkflow): - raise ValueError("workflow must be a subclass of BundleWorkflow.") - if workflow.get_workflow_type() is None: - raise ValueError("workflow doesn't specify the type.") - self.workflow = workflow + self.bundle_root = bundle_root + self.workflow = None + if workflow is not None: + if not isinstance(workflow, BundleWorkflow): + raise ValueError("workflow must be a subclass of BundleWorkflow.") + if workflow.get_workflow_type() is None: + raise ValueError("workflow doesn't specify the type.") + self.workflow = workflow + self.config_train_filename = config_train_filename self.config_filters_filename = config_filters_filename self.train_data_key = "train" self.eval_data_key = "eval" @@ -118,10 
+128,17 @@ def initialize(self, extra=None): self.client_name = extra.get(ExtraItems.CLIENT_NAME, "noname") self.logger.info(f"Initializing {self.client_name} ...") - self.workflow.initialize() # FL platform needs to provide filepath to configuration files self.app_root = extra.get(ExtraItems.APP_ROOT, "") - self.workflow.bundle_root = os.path.join(self.app_root, self.workflow.bundle_root) + self.bundle_root = os.path.join(self.app_root, self.bundle_root) + + if self.workflow is None: + config_train_files = self._add_config_files(self.config_train_filename) + self.workflow = ConfigWorkflow( + config_file=config_train_files, meta_file=None, logging_file=None, workflow="train" + ) + self.workflow.initialize() + self.workflow.bundle_root = self.bundle_root # initialize the workflow as the content changed self.workflow.initialize() @@ -260,11 +277,11 @@ def _add_config_files(self, config_files): files = [] if config_files: if isinstance(config_files, str): - files.append(os.path.join(self.workflow.bundle_root, config_files)) + files.append(os.path.join(self.bundle_root, config_files)) elif isinstance(config_files, list): for file in config_files: if isinstance(file, str): - files.append(os.path.join(self.workflow.bundle_root, file)) + files.append(os.path.join(self.bundle_root, file)) else: raise ValueError(f"Expected config file to be of type str but got {type(file)}: {file}") else: @@ -281,6 +298,7 @@ class MonaiAlgo(ClientAlgo, MonaiAlgoStats): FIXME: reimplement this class based on the bundle "ConfigWorkflow". Args: + bundle_root: directory path of the bundle. train_workflow: the bundle workflow to execute training. eval_workflow: the bundle workflow to execute evaluation. local_epochs: number of local epochs to execute during each round of local training; defaults to 1. 
@@ -297,6 +315,7 @@ class MonaiAlgo(ClientAlgo, MonaiAlgoStats): def __init__( self, + bundle_root: str, train_workflow: BundleWorkflow | None = None, eval_workflow: BundleWorkflow | None = None, local_epochs: int = 1, From cab8367eeb9db21f7232c8c768dbf7bcfc439b46 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 31 Mar 2023 22:44:32 +0800 Subject: [PATCH 05/22] [DLMED] change bundle_root and configs args back Signed-off-by: Nic Ma --- monai/fl/client/monai_algo.py | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index cb24ce2e42..2ee2969609 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -303,6 +303,10 @@ class MonaiAlgo(ClientAlgo, MonaiAlgoStats): eval_workflow: the bundle workflow to execute evaluation. local_epochs: number of local epochs to execute during each round of local training; defaults to 1. send_weight_diff: whether to send weight differences rather than full weights; defaults to `True`. + config_train_filename: bundle training config path relative to bundle_root. can be a list of files. + defaults to "configs/train.json". only useful when `train_workflow` is None. + config_evaluate_filename: bundle evaluation config path relative to bundle_root. can be a list of files. + defaults to ["configs/train.json", "configs/evaluate.json"]. only useful when `eval_workflow` is None. config_filters_filename: filter configuration file. Can be a list of files; defaults to `None`. best_model_filepath: location of best model checkpoint; defaults "models/model.pt" relative to `bundle_root`. final_model_filepath: location of final model checkpoint; defaults "models/model_final.pt" relative to `bundle_root`. 
@@ -320,6 +324,8 @@ def __init__( eval_workflow: BundleWorkflow | None = None, local_epochs: int = 1, send_weight_diff: bool = True, + config_train_filename: str | list | None = "configs/train.json", + config_evaluate_filename: str | list | None = ["configs/train.json", "configs/evaluate.json"], config_filters_filename: str | list | None = None, best_model_filepath: str | None = "models/model.pt", final_model_filepath: str | None = "models/model_final.pt", @@ -327,24 +333,28 @@ def __init__( data_stats_transform_list: list | None = None, ): self.logger = logger + self.bundle_root = bundle_root + self.train_workflow = None + self.eval_workflow = None if train_workflow is not None: if not isinstance(train_workflow, BundleWorkflow) or train_workflow.get_workflow_type() != "train": raise ValueError( f"train workflow must be BundleWorkflow and set type in {BundleWorkflow.supported_train_type}." ) - # only necessary to initialize AlgoStats when train workflow exists - MonaiAlgoStats.__init__(self, workflow=train_workflow, data_stats_transform_list=data_stats_transform_list) + self.train_workflow = train_workflow if eval_workflow is not None: # evaluation workflow can be "train" type or "infer" type if not isinstance(eval_workflow, BundleWorkflow) or eval_workflow.get_workflow_type() is None: raise ValueError("train workflow must be BundleWorkflow and set type.") - self.train_workflow = train_workflow - self.eval_workflow = eval_workflow + self.eval_workflow = eval_workflow self.local_epochs = local_epochs self.send_weight_diff = send_weight_diff + self.config_train_filename = config_train_filename + self.config_evaluate_filename = config_evaluate_filename self.config_filters_filename = config_filters_filename self.model_filepaths = {ModelType.BEST_MODEL: best_model_filepath, ModelType.FINAL_MODEL: final_model_filepath} self.save_dict_key = save_dict_key + self.data_stats_transform_list = data_stats_transform_list self.app_root = "" self.filter_parser: ConfigParser | None 
= None @@ -375,10 +385,16 @@ def initialize(self, extra=None): self.logger.info(f"Initializing {self.client_name} ...") # FL platform needs to provide filepath to configuration files self.app_root = extra.get(ExtraItems.APP_ROOT, "") + self.bundle_root = os.path.join(self.app_root, self.bundle_root) + if self.train_workflow is None and self.config_train_filename is not None: + config_train_files = self._add_config_files(self.config_train_filename) + self.train_workflow = ConfigWorkflow( + config_file=config_train_files, meta_file=None, logging_file=None, workflow="train" + ) if self.train_workflow is not None: self.train_workflow.initialize() - self.train_workflow.bundle_root = os.path.join(self.app_root, self.train_workflow.bundle_root) + self.train_workflow.bundle_root = self.bundle_root self.train_workflow.max_epochs = self.local_epochs # initialize the workflow as the content changed self.train_workflow.initialize() @@ -405,9 +421,14 @@ def initialize(self, extra=None): FiltersType.POST_STATISTICS_FILTERS, default=ConfigItem(None, FiltersType.POST_STATISTICS_FILTERS) ) + if self.eval_workflow is None and self.config_evaluate_filename is not None: + config_eval_files = self._add_config_files(self.config_evaluate_filename) + self.eval_workflow = ConfigWorkflow( + config_file=config_eval_files, meta_file=None, logging_file=None, workflow="train" + ) if self.eval_workflow is not None: self.eval_workflow.initialize() - self.eval_workflow.bundle_root = os.path.join(self.app_root, self.eval_workflow.bundle_root) + self.eval_workflow.bundle_root = self.bundle_root self.eval_workflow.initialize() self.evaluator = self.eval_workflow.evaluator if not isinstance(self.evaluator, SupervisedEvaluator): @@ -482,7 +503,7 @@ def get_weights(self, extra=None): f"Expected requested model type to be of type `ModelType` but received {type(model_type)}" ) if model_type in self.model_filepaths: - model_path = os.path.join(self.train_workflow.bundle_root, cast(str, 
self.model_filepaths[model_type])) + model_path = os.path.join(self.bundle_root, cast(str, self.model_filepaths[model_type])) if not os.path.isfile(model_path): raise ValueError(f"No best model checkpoint exists at {model_path}") weights = torch.load(model_path, map_location="cpu") From cd28fe7ef6fecdb79b8390afc142931b6ac76b25 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 31 Mar 2023 23:23:52 +0800 Subject: [PATCH 06/22] [DLMED] update test cases Signed-off-by: Nic Ma --- monai/fl/client/monai_algo.py | 30 ++++++++++++++++-- tests/test_fl_monai_algo.py | 51 ++++++++++++++++++++----------- tests/test_fl_monai_algo_dist.py | 4 +++ tests/test_fl_monai_algo_stats.py | 5 ++- 4 files changed, 69 insertions(+), 21 deletions(-) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index 2ee2969609..3e747a5a50 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -20,7 +20,7 @@ from monai.apps.auto3dseg.data_analyzer import DataAnalyzer from monai.apps.utils import get_logger from monai.auto3dseg import SegSummarizer -from monai.bundle import BundleWorkflow, ConfigWorkflow, ConfigItem, ConfigParser +from monai.bundle import DEFAULT_EXP_MGMT_SETTINGS, BundleWorkflow, ConfigItem, ConfigParser, ConfigWorkflow from monai.engines import SupervisedEvaluator, SupervisedTrainer, Trainer from monai.fl.client import ClientAlgo, ClientAlgoStats from monai.fl.utils.constants import ExtraItems, FiltersType, FlPhase, FlStatistics, ModelType, WeightType @@ -306,7 +306,8 @@ class MonaiAlgo(ClientAlgo, MonaiAlgoStats): config_train_filename: bundle training config path relative to bundle_root. can be a list of files. defaults to "configs/train.json". only useful when `train_workflow` is None. config_evaluate_filename: bundle evaluation config path relative to bundle_root. can be a list of files. - defaults to ["configs/train.json", "configs/evaluate.json"]. only useful when `eval_workflow` is None. 
+ if "default", ["configs/train.json", "configs/evaluate.json"] will be used. + this arg is only useful when `eval_workflow` is None. config_filters_filename: filter configuration file. Can be a list of files; defaults to `None`. best_model_filepath: location of best model checkpoint; defaults "models/model.pt" relative to `bundle_root`. final_model_filepath: location of final model checkpoint; defaults "models/model_final.pt" relative to `bundle_root`. @@ -314,6 +315,13 @@ class MonaiAlgo(ClientAlgo, MonaiAlgoStats): the one defined by `save_dict_key` will be returned by `get_weights`; defaults to "model". If all state dicts should be returned, set `save_dict_key` to None. data_stats_transform_list: transforms to apply for the data stats result. + tracking: if not None, enable the experiment tracking at runtime with optionally configurable and extensible. + if "mlflow", will add `MLFlowHandler` to the parsed bundle with default tracking settings, + if other string, treat it as file path to load the tracking settings. + if `dict`, treat it as tracking settings. + will patch the target config content with `tracking handlers` and the top-level items of `configs`. + for detailed usage examples, plesae check the tutorial: + https://github.com/Project-MONAI/tutorials/blob/main/experiment_management/bundle_integrate_mlflow.ipynb. 
""" @@ -325,12 +333,13 @@ def __init__( local_epochs: int = 1, send_weight_diff: bool = True, config_train_filename: str | list | None = "configs/train.json", - config_evaluate_filename: str | list | None = ["configs/train.json", "configs/evaluate.json"], + config_evaluate_filename: str | list | None = "default", config_filters_filename: str | list | None = None, best_model_filepath: str | None = "models/model.pt", final_model_filepath: str | None = "models/model_final.pt", save_dict_key: str | None = "model", data_stats_transform_list: list | None = None, + tracking: str | dict | None = None, ): self.logger = logger self.bundle_root = bundle_root @@ -350,11 +359,15 @@ def __init__( self.local_epochs = local_epochs self.send_weight_diff = send_weight_diff self.config_train_filename = config_train_filename + if config_evaluate_filename == "default": + # by default, evaluator needs both training and evaluate to be instantiated + config_evaluate_filename = ["configs/train.json", "configs/evaluate.json"] self.config_evaluate_filename = config_evaluate_filename self.config_filters_filename = config_filters_filename self.model_filepaths = {ModelType.BEST_MODEL: best_model_filepath, ModelType.FINAL_MODEL: final_model_filepath} self.save_dict_key = save_dict_key self.data_stats_transform_list = data_stats_transform_list + self.tracking = tracking self.app_root = "" self.filter_parser: ConfigParser | None = None @@ -387,6 +400,13 @@ def initialize(self, extra=None): self.app_root = extra.get(ExtraItems.APP_ROOT, "") self.bundle_root = os.path.join(self.app_root, self.bundle_root) + # set tracking configs for experiment management + if self.tracking is not None: + if isinstance(self.tracking, str) and self.tracking in DEFAULT_EXP_MGMT_SETTINGS: + settings_ = DEFAULT_EXP_MGMT_SETTINGS[self.tracking] + else: + settings_ = ConfigParser.load_config_files(self.tracking) + if self.train_workflow is None and self.config_train_filename is not None: config_train_files = 
self._add_config_files(self.config_train_filename) self.train_workflow = ConfigWorkflow( @@ -396,6 +416,8 @@ def initialize(self, extra=None): self.train_workflow.initialize() self.train_workflow.bundle_root = self.bundle_root self.train_workflow.max_epochs = self.local_epochs + if self.tracking is not None: + ConfigWorkflow.patch_bundle_tracking(parser=self.train_workflow.parser, settings=settings_) # initialize the workflow as the content changed self.train_workflow.initialize() self.trainer = self.train_workflow.trainer @@ -429,6 +451,8 @@ def initialize(self, extra=None): if self.eval_workflow is not None: self.eval_workflow.initialize() self.eval_workflow.bundle_root = self.bundle_root + if self.tracking is not None: + ConfigWorkflow.patch_bundle_tracking(parser=self.eval_workflow.parser, settings=settings_) self.eval_workflow.initialize() self.evaluator = self.eval_workflow.evaluator if not isinstance(self.evaluator, SupervisedEvaluator): diff --git a/tests/test_fl_monai_algo.py b/tests/test_fl_monai_algo.py index 13be295e57..092d001d39 100644 --- a/tests/test_fl_monai_algo.py +++ b/tests/test_fl_monai_algo.py @@ -31,18 +31,23 @@ TEST_TRAIN_1 = [ { + "bundle_root": _data_dir, "train_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_train.json"), workflow="train"), + "config_evaluate_filename": None, "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] TEST_TRAIN_2 = [ { - "train_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_train.json"), workflow="train"), + "bundle_root": _data_dir, + "config_train_filename": os.path.join(_data_dir, "config_fl_train.json"), + "config_evaluate_filename": None, "config_filters_filename": None, } ] TEST_TRAIN_3 = [ { + "bundle_root": _data_dir, "train_workflow": ConfigWorkflow( config_file=[ os.path.join(_data_dir, "config_fl_train.json"), @@ -50,6 +55,7 @@ ], workflow="train", ), + "config_evaluate_filename": None, "config_filters_filename": [ os.path.join(_data_dir, 
"config_fl_filters.json"), os.path.join(_data_dir, "config_fl_filters.json"), @@ -59,40 +65,45 @@ TEST_TRAIN_4 = [ { - "train_workflow": ConfigWorkflow( - config_file=os.path.join(_data_dir, "config_fl_train.json"), - workflow="train", - tracking={ - "handlers_id": DEFAULT_HANDLERS_ID, - "configs": { - "execute_config": f"{_data_dir}/config_executed.json", - "trainer": { - "_target_": "MLFlowHandler", - "tracking_uri": path_to_uri(_data_dir) + "/mlflow_override", - "output_transform": "$monai.handlers.from_engine(['loss'], first=True)", - "close_on_complete": True, - }, + "bundle_root": _data_dir, + "train_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_train.json"), workflow="train"), + "config_evaluate_filename": None, + "tracking": { + "handlers_id": DEFAULT_HANDLERS_ID, + "configs": { + "execute_config": f"{_data_dir}/config_executed.json", + "trainer": { + "_target_": "MLFlowHandler", + "tracking_uri": path_to_uri(_data_dir) + "/mlflow_override", + "output_transform": "$monai.handlers.from_engine(['loss'], first=True)", + "close_on_complete": True, }, }, - ), + }, "config_filters_filename": None, } ] TEST_EVALUATE_1 = [ { + "bundle_root": _data_dir, + "config_train_filename": None, "eval_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_evaluate.json"), workflow="train"), "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] TEST_EVALUATE_2 = [ { - "eval_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_evaluate.json"), workflow="train"), + "bundle_root": _data_dir, + "config_train_filename": None, + "config_evaluate_filename": os.path.join(_data_dir, "config_fl_evaluate.json"), "config_filters_filename": None, } ] TEST_EVALUATE_3 = [ { + "bundle_root": _data_dir, + "config_train_filename": None, "eval_workflow": ConfigWorkflow( config_file=[ os.path.join(_data_dir, "config_fl_evaluate.json"), @@ -109,20 +120,25 @@ TEST_GET_WEIGHTS_1 = [ { + "bundle_root": _data_dir, "train_workflow": 
ConfigWorkflow(os.path.join(_data_dir, "config_fl_train.json"), workflow="train"), + "config_evaluate_filename": None, "send_weight_diff": False, "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] TEST_GET_WEIGHTS_2 = [ { - "train_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_train.json"), workflow="train"), + "bundle_root": _data_dir, + "config_train_filename": os.path.join(_data_dir, "config_fl_train.json"), + "config_evaluate_filename": None, "send_weight_diff": True, "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] TEST_GET_WEIGHTS_3 = [ { + "bundle_root": _data_dir, "train_workflow": ConfigWorkflow( config_file=[ os.path.join(_data_dir, "config_fl_train.json"), @@ -130,6 +146,7 @@ ], workflow="train", ), + "config_evaluate_filename": None, "send_weight_diff": True, "config_filters_filename": [ os.path.join(_data_dir, "config_fl_filters.json"), diff --git a/tests/test_fl_monai_algo_dist.py b/tests/test_fl_monai_algo_dist.py index 75b64daf48..325791349f 100644 --- a/tests/test_fl_monai_algo_dist.py +++ b/tests/test_fl_monai_algo_dist.py @@ -36,7 +36,9 @@ def test_train(self): config_file = [pathjoin(_data_dir, "config_fl_train.json"), pathjoin(_data_dir, "multi_gpu_train.json")] # initialize algo algo = MonaiAlgo( + bundle_root=_data_dir, train_workflow=ConfigWorkflow(config_file=config_file, workflow="train"), + config_evaluate_filename=None, config_filters_filename=pathjoin(_root_dir, "testing_data", "config_fl_filters.json"), ) algo.initialize(extra={ExtraItems.CLIENT_NAME: "test_fl"}) @@ -57,6 +59,8 @@ def test_evaluate(self): config_file = [pathjoin(_data_dir, "config_fl_evaluate.json"), pathjoin(_data_dir, "multi_gpu_evaluate.json")] # initialize algo algo = MonaiAlgo( + bundle_root=_data_dir, + config_train_filename=None, eval_workflow=ConfigWorkflow(config_file=config_file, workflow="train"), config_filters_filename=pathjoin(_data_dir, "config_fl_filters.json"), ) diff --git 
a/tests/test_fl_monai_algo_stats.py b/tests/test_fl_monai_algo_stats.py index 2bd05f1ea6..0e00866ba4 100644 --- a/tests/test_fl_monai_algo_stats.py +++ b/tests/test_fl_monai_algo_stats.py @@ -27,18 +27,21 @@ TEST_GET_DATA_STATS_1 = [ { + "bundle_root": _data_dir, "workflow": ConfigWorkflow(workflow="train", config_file=os.path.join(_data_dir, "config_fl_stats_1.json")), "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] TEST_GET_DATA_STATS_2 = [ { - "workflow": ConfigWorkflow(workflow="train", config_file=os.path.join(_data_dir, "config_fl_stats_2.json")), + "bundle_root": _data_dir, + "config_train_filename": os.path.join(_data_dir, "config_fl_stats_2.json"), "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] TEST_GET_DATA_STATS_3 = [ { + "bundle_root": _data_dir, "workflow": ConfigWorkflow( workflow="train", config_file=[ From 4641daacba480ee4640c04b532942e43cc9e00ae Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 31 Mar 2023 23:35:16 +0800 Subject: [PATCH 07/22] [DLMED] fix mypy Signed-off-by: Nic Ma --- monai/fl/client/monai_algo.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index 3e747a5a50..2927eb16e4 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -167,9 +167,9 @@ def get_data_stats(self, extra: dict | None = None) -> ExchangeObject: if extra is None: raise ValueError("`extra` has to be set") - if self.workflow.dataset_dir: + if self.workflow.dataset_dir: # type: ignore self.phase = FlPhase.GET_DATA_STATS - self.logger.info(f"Computing statistics on {self.workflow.dataset_dir}") + self.logger.info(f"Computing statistics on {self.workflow.dataset_dir}") # type: ignore if FlStatistics.HIST_BINS not in extra: raise ValueError("FlStatistics.NUM_OF_BINS not specified in `extra`") @@ -184,7 +184,7 @@ def get_data_stats(self, extra: dict | None = None) -> ExchangeObject: # 
train data stats train_summary_stats, train_case_stats = self._get_data_key_stats( - data=self.workflow.train_dataset_data, + data=self.workflow.train_dataset_data, # type: ignore data_key=self.train_data_key, hist_bins=hist_bins, hist_range=hist_range, @@ -197,9 +197,9 @@ def get_data_stats(self, extra: dict | None = None) -> ExchangeObject: # eval data stats eval_summary_stats = None eval_case_stats = None - if self.workflow.val_dataset_data is not None: + if self.workflow.val_dataset_data is not None: # type: ignore eval_summary_stats, eval_case_stats = self._get_data_key_stats( - data=self.workflow.val_dataset_data, + data=self.workflow.val_dataset_data, # type: ignore data_key=self.eval_data_key, hist_bins=hist_bins, hist_range=hist_range, @@ -232,7 +232,7 @@ def get_data_stats(self, extra: dict | None = None) -> ExchangeObject: def _get_data_key_stats(self, data, data_key, hist_bins, hist_range, output_path=None): analyzer = DataAnalyzer( datalist={data_key: data}, - dataroot=self.workflow.dataset_dir, + dataroot=self.workflow.dataset_dir, # type: ignore hist_bins=hist_bins, hist_range=hist_range, output_path=output_path, From 4888a1c46f2ddc30c0ee213446034e82a4827d48 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Tue, 4 Apr 2023 22:16:38 +0800 Subject: [PATCH 08/22] [DLMED] remove FIXME Signed-off-by: Nic Ma --- monai/fl/client/monai_algo.py | 1 - 1 file changed, 1 deletion(-) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index 2927eb16e4..cb76dfa410 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -295,7 +295,6 @@ def _add_config_files(self, config_files): class MonaiAlgo(ClientAlgo, MonaiAlgoStats): """ Implementation of ``ClientAlgo`` to allow federated learning with MONAI bundle configurations. - FIXME: reimplement this class based on the bundle "ConfigWorkflow". Args: bundle_root: directory path of the bundle. 
From f96bcd4182339468e22e3b97634b0cdc0afaada5 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Wed, 5 Apr 2023 00:14:42 +0800 Subject: [PATCH 09/22] [DLMED] fix failed CI tests Signed-off-by: Nic Ma --- tests/nonconfig_workflow.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/nonconfig_workflow.py b/tests/nonconfig_workflow.py index 63562af868..150531abc7 100644 --- a/tests/nonconfig_workflow.py +++ b/tests/nonconfig_workflow.py @@ -99,6 +99,8 @@ def _get_property(self, name, property): return self._bundle_root if name == "device": return self._device + if name == "evaluator": + return self._evaluator if name == "network_def": return self._network_def if name == "inferer": @@ -115,6 +117,8 @@ def _set_property(self, name, property, value): self._bundle_root = value elif name == "device": self._device = value + elif name == "evaluator": + self._evaluator = value elif name == "network_def": self._network_def = value elif name == "inferer": From 0030419ffa67607d1ea0279fb3c831b419ac6a27 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Wed, 5 Apr 2023 00:27:13 +0800 Subject: [PATCH 10/22] [DLMED] revert the change in "get_weights" Signed-off-by: Nic Ma --- monai/fl/client/monai_algo.py | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index cb76dfa410..f7793f7e1b 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -513,8 +513,6 @@ def get_weights(self, extra=None): if extra is None: extra = {} - if self.train_workflow is None or self.trainer is None: - raise ValueError("self.trainer should not be None.") # by default return current weights, return best if requested via model type. 
self.phase = FlPhase.GET_WEIGHTS @@ -541,21 +539,26 @@ def get_weights(self, extra=None): f"Requested model type {model_type} not specified in `model_filepaths`: {self.model_filepaths}" ) else: - weights = get_state_dict(self.trainer.network) - # returned weights will be on the cpu - for k in weights.keys(): - weights[k] = weights[k].cpu() - weigh_type = WeightType.WEIGHTS - stats = self.trainer.get_stats() - # calculate current iteration and epoch data after training. - stats[FlStatistics.NUM_EXECUTED_ITERATIONS] = self.trainer.state.iteration - self.iter_of_start_time - # compute weight differences - if self.send_weight_diff: - weights = compute_weight_diff(global_weights=self.global_weights, local_var_dict=weights) - weigh_type = WeightType.WEIGHT_DIFF - self.logger.info("Returning current weight differences.") + if self.trainer: + weights = get_state_dict(self.trainer.network) + # returned weights will be on the cpu + for k in weights.keys(): + weights[k] = weights[k].cpu() + weigh_type = WeightType.WEIGHTS + stats = self.trainer.get_stats() + # calculate current iteration and epoch data after training. 
+ stats[FlStatistics.NUM_EXECUTED_ITERATIONS] = self.trainer.state.iteration - self.iter_of_start_time + # compute weight differences + if self.send_weight_diff: + weights = compute_weight_diff(global_weights=self.global_weights, local_var_dict=weights) + weigh_type = WeightType.WEIGHT_DIFF + self.logger.info("Returning current weight differences.") + else: + self.logger.info("Returning current weights.") else: - self.logger.info("Returning current weights.") + weights = None + weigh_type = None + stats = dict() if not isinstance(stats, dict): raise ValueError(f"stats is not a dict, {stats}") From ee284ff7a56433605193429bada1930a2c3ee1e8 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Thu, 6 Apr 2023 21:40:43 +0800 Subject: [PATCH 11/22] [DLMED] fix 6303 Signed-off-by: Nic Ma --- monai/fl/client/monai_algo.py | 2 +- tests/test_fl_monai_algo_dist.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index f7793f7e1b..11e03215c5 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -657,7 +657,7 @@ def finalize(self, extra: dict | None = None) -> None: def _check_converted(self, global_weights, local_var_dict, n_converted): if n_converted == 0: - self.logger.warning( + raise RuntimeError( f"No global weights converted! 
Received weight dict keys are {list(global_weights.keys())}" ) else: diff --git a/tests/test_fl_monai_algo_dist.py b/tests/test_fl_monai_algo_dist.py index 325791349f..e45dfb67aa 100644 --- a/tests/test_fl_monai_algo_dist.py +++ b/tests/test_fl_monai_algo_dist.py @@ -21,6 +21,7 @@ from monai.fl.client.monai_algo import MonaiAlgo from monai.fl.utils.constants import ExtraItems from monai.fl.utils.exchange_object import ExchangeObject +from monai.networks import get_state_dict from tests.utils import DistCall, DistTestCase, SkipIfBeforePyTorchVersion, SkipIfNoModule, skip_if_no_cuda _root_dir = os.path.abspath(pathjoin(os.path.dirname(__file__))) @@ -49,7 +50,7 @@ def test_train(self): parser.read_config(config_file) parser.parse() network = parser.get_parsed_content("network") - data = ExchangeObject(weights=network.state_dict()) + data = ExchangeObject(weights=get_state_dict(network)) # test train algo.train(data=data, extra={}) @@ -72,7 +73,7 @@ def test_evaluate(self): parser.read_config(pathjoin(_data_dir, "config_fl_evaluate.json")) parser.parse() network = parser.get_parsed_content("network") - data = ExchangeObject(weights=network.state_dict()) + data = ExchangeObject(weights=get_state_dict(network)) # test evaluate algo.evaluate(data=data, extra={}) From 7de4ed73b9f041124b6a6d6316124bc4ba290a73 Mon Sep 17 00:00:00 2001 From: Holger Roth Date: Thu, 6 Apr 2023 19:55:07 -0400 Subject: [PATCH 12/22] add weight diff check --- monai/fl/client/monai_algo.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index 11e03215c5..86b1bb62c2 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -58,13 +58,17 @@ def compute_weight_diff(global_weights, local_var_dict): raise ValueError("Cannot compute weight differences if `local_var_dict` is None!") # compute delta model, global model has the primary key set weight_diff = {} + n_diff = 0 for name in global_weights: if name not in 
local_var_dict: continue # returned weight diff will be on the cpu weight_diff[name] = local_var_dict[name].cpu() - global_weights[name].cpu() + n_diff += 1 if torch.any(torch.isnan(weight_diff[name])): raise ValueError(f"Weights for {name} became NaN...") + if n_diff == 0: + raise RuntimeError("No weight differences computed!") return weight_diff From fab33139ce1f9123a0f636eaaeb31cfa523ae257 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 7 Apr 2023 15:21:28 +0800 Subject: [PATCH 13/22] [DLMED] add checkpint disable Signed-off-by: Nic Ma --- monai/fl/client/monai_algo.py | 48 +++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index 86b1bb62c2..b982cf6895 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -72,6 +72,14 @@ def compute_weight_diff(global_weights, local_var_dict): return weight_diff +def disable_ckpt_loaders(workflow: BundleWorkflow): + if BundleKeys.VALIDATE_HANDLERS in parser: + for h in parser[BundleKeys.VALIDATE_HANDLERS]: + if ConfigComponent.is_instantiable(h): + if "CheckpointLoader" in h["_target_"]: + h["_disabled_"] = True + + class MonaiAlgoStats(ClientAlgoStats): """ Implementation of ``ClientAlgoStats`` to allow federated learning with MONAI bundle configurations. @@ -312,6 +320,7 @@ class MonaiAlgo(ClientAlgo, MonaiAlgoStats): if "default", ["configs/train.json", "configs/evaluate.json"] will be used. this arg is only useful when `eval_workflow` is None. config_filters_filename: filter configuration file. Can be a list of files; defaults to `None`. + disable_ckpt_loading: do not use any CheckpointLoader if defined in train/evaluate configs; defaults to `True`. best_model_filepath: location of best model checkpoint; defaults "models/model.pt" relative to `bundle_root`. final_model_filepath: location of final model checkpoint; defaults "models/model_final.pt" relative to `bundle_root`. 
save_dict_key: If a model checkpoint contains several state dicts, @@ -338,6 +347,7 @@ def __init__( config_train_filename: str | list | None = "configs/train.json", config_evaluate_filename: str | list | None = "default", config_filters_filename: str | list | None = None, + disable_ckpt_loading: bool = True, best_model_filepath: str | None = "models/model.pt", final_model_filepath: str | None = "models/model_final.pt", save_dict_key: str | None = "model", @@ -367,6 +377,7 @@ def __init__( config_evaluate_filename = ["configs/train.json", "configs/evaluate.json"] self.config_evaluate_filename = config_evaluate_filename self.config_filters_filename = config_filters_filename + self.disable_ckpt_loading = disable_ckpt_loading self.model_filepaths = {ModelType.BEST_MODEL: best_model_filepath, ModelType.FINAL_MODEL: final_model_filepath} self.save_dict_key = save_dict_key self.data_stats_transform_list = data_stats_transform_list @@ -427,25 +438,6 @@ def initialize(self, extra=None): if not isinstance(self.trainer, SupervisedTrainer): raise ValueError(f"trainer must be SupervisedTrainer, but got: {type(self.trainer)}.") - config_filter_files = self._add_config_files(self.config_filters_filename) - self.filter_parser = ConfigParser() - if len(config_filter_files) > 0: - self.filter_parser.read_config(config_filter_files) - - # Get filters - self.pre_filters = self.filter_parser.get_parsed_content( - FiltersType.PRE_FILTERS, default=ConfigItem(None, FiltersType.PRE_FILTERS) - ) - self.post_weight_filters = self.filter_parser.get_parsed_content( - FiltersType.POST_WEIGHT_FILTERS, default=ConfigItem(None, FiltersType.POST_WEIGHT_FILTERS) - ) - self.post_evaluate_filters = self.filter_parser.get_parsed_content( - FiltersType.POST_EVALUATE_FILTERS, default=ConfigItem(None, FiltersType.POST_EVALUATE_FILTERS) - ) - self.post_statistics_filters = self.filter_parser.get_parsed_content( - FiltersType.POST_STATISTICS_FILTERS, default=ConfigItem(None, 
FiltersType.POST_STATISTICS_FILTERS) - ) - if self.eval_workflow is None and self.config_evaluate_filename is not None: config_eval_files = self._add_config_files(self.config_evaluate_filename) self.eval_workflow = ConfigWorkflow( @@ -461,6 +453,24 @@ def initialize(self, extra=None): if not isinstance(self.evaluator, SupervisedEvaluator): raise ValueError(f"evaluator must be SupervisedEvaluator, but got: {type(self.evaluator)}.") + config_filter_files = self._add_config_files(self.config_filters_filename) + self.filter_parser = ConfigParser() + if len(config_filter_files) > 0: + self.filter_parser.read_config(config_filter_files) + + # Get filters + self.pre_filters = self.filter_parser.get_parsed_content( + FiltersType.PRE_FILTERS, default=ConfigItem(None, FiltersType.PRE_FILTERS) + ) + self.post_weight_filters = self.filter_parser.get_parsed_content( + FiltersType.POST_WEIGHT_FILTERS, default=ConfigItem(None, FiltersType.POST_WEIGHT_FILTERS) + ) + self.post_evaluate_filters = self.filter_parser.get_parsed_content( + FiltersType.POST_EVALUATE_FILTERS, default=ConfigItem(None, FiltersType.POST_EVALUATE_FILTERS) + ) + self.post_statistics_filters = self.filter_parser.get_parsed_content( + FiltersType.POST_STATISTICS_FILTERS, default=ConfigItem(None, FiltersType.POST_STATISTICS_FILTERS) + ) self.logger.info(f"Initialized {self.client_name}.") def train(self, data: ExchangeObject, extra: dict | None = None) -> None: From f131f041c5335b262138ea7c5aee0bda0522fc17 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 7 Apr 2023 15:54:44 +0800 Subject: [PATCH 14/22] [DLMED] add disable checkpoint loader Signed-off-by: Nic Ma --- monai/fl/client/monai_algo.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index b982cf6895..9bbca51711 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -20,7 +20,9 @@ from monai.apps.auto3dseg.data_analyzer import 
DataAnalyzer from monai.apps.utils import get_logger from monai.auto3dseg import SegSummarizer -from monai.bundle import DEFAULT_EXP_MGMT_SETTINGS, BundleWorkflow, ConfigItem, ConfigParser, ConfigWorkflow +from monai.bundle import ( + DEFAULT_EXP_MGMT_SETTINGS, BundleWorkflow, ConfigComponent, ConfigItem, ConfigParser, ConfigWorkflow +) from monai.engines import SupervisedEvaluator, SupervisedTrainer, Trainer from monai.fl.client import ClientAlgo, ClientAlgoStats from monai.fl.utils.constants import ExtraItems, FiltersType, FlPhase, FlStatistics, ModelType, WeightType @@ -72,9 +74,9 @@ def compute_weight_diff(global_weights, local_var_dict): return weight_diff -def disable_ckpt_loaders(workflow: BundleWorkflow): - if BundleKeys.VALIDATE_HANDLERS in parser: - for h in parser[BundleKeys.VALIDATE_HANDLERS]: +def disable_ckpt_loaders(parser: ConfigParser): + if "validate#handlers" in parser: + for h in parser["validate#handlers"]: if ConfigComponent.is_instantiable(h): if "CheckpointLoader" in h["_target_"]: h["_disabled_"] = True @@ -328,6 +330,7 @@ class MonaiAlgo(ClientAlgo, MonaiAlgoStats): If all state dicts should be returned, set `save_dict_key` to None. data_stats_transform_list: transforms to apply for the data stats result. tracking: if not None, enable the experiment tracking at runtime with optionally configurable and extensible. + it expects the `train_workflow` or `eval_workflow` to be `ConfigWorkflow`, not customized `BundleWorkflow`. if "mlflow", will add `MLFlowHandler` to the parsed bundle with default tracking settings, if other string, treat it as file path to load the tracking settings. if `dict`, treat it as tracking settings. 
@@ -430,8 +433,10 @@ def initialize(self, extra=None): self.train_workflow.initialize() self.train_workflow.bundle_root = self.bundle_root self.train_workflow.max_epochs = self.local_epochs - if self.tracking is not None: + if self.tracking is not None and isinstance(self.train_workflow, ConfigWorkflow): ConfigWorkflow.patch_bundle_tracking(parser=self.train_workflow.parser, settings=settings_) + if self.disable_ckpt_loading and isinstance(self.train_workflow, ConfigWorkflow): + disable_ckpt_loaders(parser=self.train_workflow.parser) # initialize the workflow as the content changed self.train_workflow.initialize() self.trainer = self.train_workflow.trainer @@ -446,8 +451,11 @@ def initialize(self, extra=None): if self.eval_workflow is not None: self.eval_workflow.initialize() self.eval_workflow.bundle_root = self.bundle_root - if self.tracking is not None: + if self.tracking is not None and isinstance(self.eval_workflow, ConfigWorkflow): ConfigWorkflow.patch_bundle_tracking(parser=self.eval_workflow.parser, settings=settings_) + if self.disable_ckpt_loading and isinstance(self.eval_workflow, ConfigWorkflow): + disable_ckpt_loaders(parser=self.eval_workflow.parser) + # initialize the workflow as the content changed self.eval_workflow.initialize() self.evaluator = self.eval_workflow.evaluator if not isinstance(self.evaluator, SupervisedEvaluator): From 28f19c580aad486dcf7591dd75f53e171f4cbd6d Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 7 Apr 2023 16:47:09 +0800 Subject: [PATCH 15/22] [DLMED] fix test logging Signed-off-by: Nic Ma --- monai/fl/client/monai_algo.py | 7 ++++++- tests/test_fl_monai_algo.py | 21 +++++++++++++++++---- tests/test_fl_monai_algo_dist.py | 5 +++-- tests/testing_data/config_fl_evaluate.json | 13 +++++++++++++ tests/testing_data/config_fl_train.json | 7 +++++++ tests/testing_data/multi_gpu_train.json | 3 ++- 6 files changed, 48 insertions(+), 8 deletions(-) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index 
9bbca51711..e3b00c65f6 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -21,7 +21,12 @@ from monai.apps.utils import get_logger from monai.auto3dseg import SegSummarizer from monai.bundle import ( - DEFAULT_EXP_MGMT_SETTINGS, BundleWorkflow, ConfigComponent, ConfigItem, ConfigParser, ConfigWorkflow + DEFAULT_EXP_MGMT_SETTINGS, + BundleWorkflow, + ConfigComponent, + ConfigItem, + ConfigParser, + ConfigWorkflow, ) from monai.engines import SupervisedEvaluator, SupervisedTrainer, Trainer from monai.fl.client import ClientAlgo, ClientAlgoStats diff --git a/tests/test_fl_monai_algo.py b/tests/test_fl_monai_algo.py index 092d001d39..6270e872b9 100644 --- a/tests/test_fl_monai_algo.py +++ b/tests/test_fl_monai_algo.py @@ -15,6 +15,7 @@ import shutil import unittest from copy import deepcopy +from os.path import join as pathjoin from parameterized import parameterized @@ -28,11 +29,14 @@ _root_dir = os.path.abspath(os.path.join(os.path.dirname(__file__))) _data_dir = os.path.join(_root_dir, "testing_data") +_logging_file = pathjoin(_data_dir, "logging.conf") TEST_TRAIN_1 = [ { "bundle_root": _data_dir, - "train_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_train.json"), workflow="train"), + "train_workflow": ConfigWorkflow( + config_file=os.path.join(_data_dir, "config_fl_train.json"), workflow="train", logging_file=_logging_file + ), "config_evaluate_filename": None, "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } @@ -54,6 +58,7 @@ os.path.join(_data_dir, "config_fl_train.json"), ], workflow="train", + logging_file=_logging_file, ), "config_evaluate_filename": None, "config_filters_filename": [ @@ -66,7 +71,9 @@ TEST_TRAIN_4 = [ { "bundle_root": _data_dir, - "train_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_train.json"), workflow="train"), + "train_workflow": ConfigWorkflow( + config_file=os.path.join(_data_dir, "config_fl_train.json"), workflow="train", 
logging_file=_logging_file + ), "config_evaluate_filename": None, "tracking": { "handlers_id": DEFAULT_HANDLERS_ID, @@ -88,7 +95,9 @@ { "bundle_root": _data_dir, "config_train_filename": None, - "eval_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_evaluate.json"), workflow="train"), + "eval_workflow": ConfigWorkflow( + os.path.join(_data_dir, "config_fl_evaluate.json"), workflow="train", logging_file=_logging_file + ), "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] @@ -110,6 +119,7 @@ os.path.join(_data_dir, "config_fl_evaluate.json"), ], workflow="train", + logging_file=_logging_file, ), "config_filters_filename": [ os.path.join(_data_dir, "config_fl_filters.json"), @@ -121,7 +131,9 @@ TEST_GET_WEIGHTS_1 = [ { "bundle_root": _data_dir, - "train_workflow": ConfigWorkflow(os.path.join(_data_dir, "config_fl_train.json"), workflow="train"), + "train_workflow": ConfigWorkflow( + config_file=os.path.join(_data_dir, "config_fl_train.json"), workflow="train", logging_file=_logging_file + ), "config_evaluate_filename": None, "send_weight_diff": False, "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), @@ -145,6 +157,7 @@ os.path.join(_data_dir, "config_fl_train.json"), ], workflow="train", + logging_file=_logging_file, ), "config_evaluate_filename": None, "send_weight_diff": True, diff --git a/tests/test_fl_monai_algo_dist.py b/tests/test_fl_monai_algo_dist.py index e45dfb67aa..7ba474cb41 100644 --- a/tests/test_fl_monai_algo_dist.py +++ b/tests/test_fl_monai_algo_dist.py @@ -26,6 +26,7 @@ _root_dir = os.path.abspath(pathjoin(os.path.dirname(__file__))) _data_dir = pathjoin(_root_dir, "testing_data") +_logging_file = pathjoin(_data_dir, "logging.conf") @SkipIfNoModule("ignite") @@ -38,7 +39,7 @@ def test_train(self): # initialize algo algo = MonaiAlgo( bundle_root=_data_dir, - train_workflow=ConfigWorkflow(config_file=config_file, workflow="train"), + 
train_workflow=ConfigWorkflow(config_file=config_file, workflow="train", logging_file=_logging_file), config_evaluate_filename=None, config_filters_filename=pathjoin(_root_dir, "testing_data", "config_fl_filters.json"), ) @@ -62,7 +63,7 @@ def test_evaluate(self): algo = MonaiAlgo( bundle_root=_data_dir, config_train_filename=None, - eval_workflow=ConfigWorkflow(config_file=config_file, workflow="train"), + eval_workflow=ConfigWorkflow(config_file=config_file, workflow="train", logging_file=_logging_file), config_filters_filename=pathjoin(_data_dir, "config_fl_filters.json"), ) algo.initialize(extra={ExtraItems.CLIENT_NAME: "test_fl"}) diff --git a/tests/testing_data/config_fl_evaluate.json b/tests/testing_data/config_fl_evaluate.json index 113596070a..549366e54c 100644 --- a/tests/testing_data/config_fl_evaluate.json +++ b/tests/testing_data/config_fl_evaluate.json @@ -75,6 +75,19 @@ "output_transform": "$monai.handlers.from_engine(['pred', 'label'])" } }, + "handlers": [ + { + "_target_": "CheckpointLoader", + "load_path": "$@bundle_root + '/models/model.pt'", + "load_dict": { + "model": "@network" + } + }, + { + "_target_": "StatsHandler", + "iteration_log": false + } + ], "evaluator": { "_target_": "SupervisedEvaluator", "device": "@device", diff --git a/tests/testing_data/config_fl_train.json b/tests/testing_data/config_fl_train.json index bdb9792fce..cbb409e4bc 100644 --- a/tests/testing_data/config_fl_train.json +++ b/tests/testing_data/config_fl_train.json @@ -181,12 +181,19 @@ } ] }, + "handlers": [ + { + "_target_": "StatsHandler", + "iteration_log": false + } + ], "evaluator": { "_target_": "SupervisedEvaluator", "device": "@device", "val_data_loader": "@validate#dataloader", "network": "@network", "inferer": "@validate#inferer", + "val_handlers": "@validate#handlers", "postprocessing": "@validate#postprocessing" } }, diff --git a/tests/testing_data/multi_gpu_train.json b/tests/testing_data/multi_gpu_train.json index e9a3f3b504..761de78bc0 100644 --- 
a/tests/testing_data/multi_gpu_train.json +++ b/tests/testing_data/multi_gpu_train.json @@ -23,7 +23,8 @@ "$monai.utils.set_determinism(seed=123)", "$setattr(torch.backends.cudnn, 'benchmark', True)", "$import logging", - "$@train#trainer.logger.setLevel(logging.WARNING if dist.get_rank() > 0 else logging.INFO)" + "$@train#trainer.logger.setLevel(logging.WARNING if dist.get_rank() > 0 else logging.INFO)", + "$@validate#evaluator.logger.setLevel(logging.WARNING if dist.get_rank() > 0 else logging.INFO)" ], "run": [ "$@train#trainer.run()" From 66f090e8fece937e4b4d13ce19d3ac0c772bc593 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 7 Apr 2023 16:57:59 +0800 Subject: [PATCH 16/22] [DLMED] fix mypy Signed-off-by: Nic Ma --- monai/fl/client/monai_algo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index e3b00c65f6..c844d476e8 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -79,7 +79,7 @@ def compute_weight_diff(global_weights, local_var_dict): return weight_diff -def disable_ckpt_loaders(parser: ConfigParser): +def disable_ckpt_loaders(parser: ConfigParser) -> None: if "validate#handlers" in parser: for h in parser["validate#handlers"]: if ConfigComponent.is_instantiable(h): From dd1100cd6e262b37419c8dbab251530ae01e54fa Mon Sep 17 00:00:00 2001 From: Holger Roth Date: Fri, 7 Apr 2023 10:44:44 -0400 Subject: [PATCH 17/22] enhance dist test --- monai/fl/utils/filters.py | 2 +- tests/test_fl_monai_algo_dist.py | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/monai/fl/utils/filters.py b/monai/fl/utils/filters.py index 15acabd9a2..56e94246ef 100644 --- a/monai/fl/utils/filters.py +++ b/monai/fl/utils/filters.py @@ -38,7 +38,7 @@ def __call__(self, data: ExchangeObject, extra: dict | None = None) -> ExchangeO class SummaryFilter(Filter): """ - Summary filter to content of ExchangeObject. 
+ Summary filter to show content of ExchangeObject. """ def __call__(self, data: ExchangeObject, extra: dict | None = None) -> ExchangeObject: diff --git a/tests/test_fl_monai_algo_dist.py b/tests/test_fl_monai_algo_dist.py index 7ba474cb41..eb95ab796e 100644 --- a/tests/test_fl_monai_algo_dist.py +++ b/tests/test_fl_monai_algo_dist.py @@ -54,6 +54,11 @@ def test_train(self): data = ExchangeObject(weights=get_state_dict(network)) # test train algo.train(data=data, extra={}) + weights_eo = algo.get_weights() + self.assertIsInstance(weights_eo, ExchangeObject) + self.assertTrue(weights_eo.is_valid_weights()) + self.assertIsInstance(weights_eo.weights, dict) + self.assertTrue(len(weights_eo.weights) > 0) @DistCall(nnodes=1, nproc_per_node=2, init_method="no_init") @skip_if_no_cuda @@ -76,8 +81,10 @@ def test_evaluate(self): network = parser.get_parsed_content("network") data = ExchangeObject(weights=get_state_dict(network)) # test evaluate - algo.evaluate(data=data, extra={}) - + metric_eo = algo.evaluate(data=data, extra={}) + self.assertIsInstance(metric_eo, ExchangeObject) + metric = metric_eo.metrics + self.assertIsInstance(metric["accuracy"], float) if __name__ == "__main__": unittest.main() From d068e030937fea2d6d884ab932160cc8faa69db9 Mon Sep 17 00:00:00 2001 From: monai-bot Date: Sun, 9 Apr 2023 14:22:04 +0000 Subject: [PATCH 18/22] [MONAI] code formatting Signed-off-by: monai-bot --- tests/test_fl_monai_algo_dist.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_fl_monai_algo_dist.py b/tests/test_fl_monai_algo_dist.py index eb95ab796e..7bba08bf3d 100644 --- a/tests/test_fl_monai_algo_dist.py +++ b/tests/test_fl_monai_algo_dist.py @@ -86,5 +86,6 @@ def test_evaluate(self): metric = metric_eo.metrics self.assertIsInstance(metric["accuracy"], float) + if __name__ == "__main__": unittest.main() From f626f836ab738467dae9a056cbd925333effe9cb Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Tue, 11 Apr 2023 22:50:34 +0800 Subject: [PATCH 19/22] [DLMED] add 
eval to dist test Signed-off-by: Nic Ma --- tests/test_fl_monai_algo_dist.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/test_fl_monai_algo_dist.py b/tests/test_fl_monai_algo_dist.py index 7bba08bf3d..2beb44d145 100644 --- a/tests/test_fl_monai_algo_dist.py +++ b/tests/test_fl_monai_algo_dist.py @@ -35,12 +35,13 @@ class TestFLMonaiAlgo(DistTestCase): @DistCall(nnodes=1, nproc_per_node=2, init_method="no_init") @skip_if_no_cuda def test_train(self): - config_file = [pathjoin(_data_dir, "config_fl_train.json"), pathjoin(_data_dir, "multi_gpu_train.json")] + train_configs = [pathjoin(_data_dir, "config_fl_train.json"), pathjoin(_data_dir, "multi_gpu_train.json")] + eval_configs = [pathjoin(_data_dir, "config_fl_evaluate.json"), pathjoin(_data_dir, "multi_gpu_evaluate.json")] # initialize algo algo = MonaiAlgo( bundle_root=_data_dir, - train_workflow=ConfigWorkflow(config_file=config_file, workflow="train", logging_file=_logging_file), - config_evaluate_filename=None, + train_workflow=ConfigWorkflow(config_file=train_configs, workflow="train", logging_file=_logging_file), + eval_workflow=ConfigWorkflow(config_file=eval_configs, workflow="train", logging_file=_logging_file), config_filters_filename=pathjoin(_root_dir, "testing_data", "config_fl_filters.json"), ) algo.initialize(extra={ExtraItems.CLIENT_NAME: "test_fl"}) @@ -48,7 +49,7 @@ def test_train(self): # initialize model parser = ConfigParser() - parser.read_config(config_file) + parser.read_config(train_configs) parser.parse() network = parser.get_parsed_content("network") data = ExchangeObject(weights=get_state_dict(network)) From 7a649d2966498d9a23ac43725634b4f20950fde2 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Fri, 14 Apr 2023 23:54:59 +0800 Subject: [PATCH 20/22] [DLMED] add multiple rounds test Signed-off-by: Nic Ma --- tests/test_fl_monai_algo_dist.py | 21 +++++++++++++++------ tests/testing_data/multi_gpu_train.json | 1 - 2 files changed, 15 insertions(+), 7 
deletions(-) diff --git a/tests/test_fl_monai_algo_dist.py b/tests/test_fl_monai_algo_dist.py index 2beb44d145..f50393870e 100644 --- a/tests/test_fl_monai_algo_dist.py +++ b/tests/test_fl_monai_algo_dist.py @@ -54,12 +54,21 @@ def test_train(self): network = parser.get_parsed_content("network") data = ExchangeObject(weights=get_state_dict(network)) # test train - algo.train(data=data, extra={}) - weights_eo = algo.get_weights() - self.assertIsInstance(weights_eo, ExchangeObject) - self.assertTrue(weights_eo.is_valid_weights()) - self.assertIsInstance(weights_eo.weights, dict) - self.assertTrue(len(weights_eo.weights) > 0) + for i in range(2): + print(f"Testing round {i+1} of {2}...") + # test evaluate + metric_eo = algo.evaluate(data=data, extra={}) + self.assertIsInstance(metric_eo, ExchangeObject) + metric = metric_eo.metrics + self.assertIsInstance(metric["accuracy"], float) + + # test train + algo.train(data=data, extra={}) + weights_eo = algo.get_weights() + self.assertIsInstance(weights_eo, ExchangeObject) + self.assertTrue(weights_eo.is_valid_weights()) + self.assertIsInstance(weights_eo.weights, dict) + self.assertTrue(len(weights_eo.weights) > 0) @DistCall(nnodes=1, nproc_per_node=2, init_method="no_init") @skip_if_no_cuda diff --git a/tests/testing_data/multi_gpu_train.json b/tests/testing_data/multi_gpu_train.json index 761de78bc0..a617e53dfd 100644 --- a/tests/testing_data/multi_gpu_train.json +++ b/tests/testing_data/multi_gpu_train.json @@ -15,7 +15,6 @@ }, "train#dataloader#sampler": "@train#sampler", "train#dataloader#shuffle": false, - "train#trainer#train_handlers": "$@train#handlers[: -2 if dist.get_rank() > 0 else None]", "initialize": [ "$import torch.distributed as dist", "$dist.is_initialized() or dist.init_process_group(backend='nccl')", From 8313a74bb43840e9cb35b0c541ef3e0aa5ec97ee Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Tue, 18 Apr 2023 15:03:06 +0800 Subject: [PATCH 21/22] [DLMED] added "set_device" back Signed-off-by: Nic Ma --- 
monai/fl/client/monai_algo.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index c844d476e8..0489a3210f 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -16,6 +16,7 @@ from typing import Any, cast import torch +import torch.distributed as dist from monai.apps.auto3dseg.data_analyzer import DataAnalyzer from monai.apps.utils import get_logger @@ -414,6 +415,7 @@ def initialize(self, extra=None): i.e., `ExtraItems.CLIENT_NAME` and `ExtraItems.APP_ROOT`. """ + self._set_cuda_device() if extra is None: extra = {} self.client_name = extra.get(ExtraItems.CLIENT_NAME, "noname") @@ -496,6 +498,7 @@ def train(self, data: ExchangeObject, extra: dict | None = None) -> None: """ + self._set_cuda_device() if extra is None: extra = {} if not isinstance(data, ExchangeObject): @@ -538,6 +541,7 @@ def get_weights(self, extra=None): """ + self._set_cuda_device() if extra is None: extra = {} @@ -616,6 +620,7 @@ def evaluate(self, data: ExchangeObject, extra: dict | None = None) -> ExchangeO """ + self._set_cuda_device() if extra is None: extra = {} if not isinstance(data, ExchangeObject): @@ -691,3 +696,8 @@ def _check_converted(self, global_weights, local_var_dict, n_converted): self.logger.info( f"Converted {n_converted} global variables to match {len(local_var_dict)} local variables." 
) + + def _set_cuda_device(self): + if dist.is_initialized(): + self.rank = int(os.environ["LOCAL_RANK"]) + torch.cuda.set_device(self.rank) From 19a10d8bcf8ca2cf2fe2ea0db2948abcdeb7aa59 Mon Sep 17 00:00:00 2001 From: Nic Ma Date: Tue, 18 Apr 2023 15:52:42 +0800 Subject: [PATCH 22/22] [DLMED] optimize tests Signed-off-by: Nic Ma --- monai/fl/client/monai_algo.py | 7 +- tests/test_fl_monai_algo.py | 44 ++++----- tests/test_fl_monai_algo_dist.py | 16 ++- tests/test_fl_monai_algo_stats.py | 15 ++- tests/testing_data/config_fl_evaluate.json | 108 +++------------------ tests/testing_data/config_fl_train.json | 11 ++- 6 files changed, 68 insertions(+), 133 deletions(-) diff --git a/monai/fl/client/monai_algo.py b/monai/fl/client/monai_algo.py index 0489a3210f..a846c99d01 100644 --- a/monai/fl/client/monai_algo.py +++ b/monai/fl/client/monai_algo.py @@ -327,6 +327,9 @@ class MonaiAlgo(ClientAlgo, MonaiAlgoStats): config_evaluate_filename: bundle evaluation config path relative to bundle_root. can be a list of files. if "default", ["configs/train.json", "configs/evaluate.json"] will be used. this arg is only useful when `eval_workflow` is None. + eval_workflow_name: the workflow name corresponding to the "config_evaluate_filename", default to "train" + as the default "config_evaluate_filename" overrides the train workflow config. + this arg is only useful when `eval_workflow` is None. config_filters_filename: filter configuration file. Can be a list of files; defaults to `None`. disable_ckpt_loading: do not use any CheckpointLoader if defined in train/evaluate configs; defaults to `True`. best_model_filepath: location of best model checkpoint; defaults "models/model.pt" relative to `bundle_root`. 
@@ -355,6 +358,7 @@ def __init__( send_weight_diff: bool = True, config_train_filename: str | list | None = "configs/train.json", config_evaluate_filename: str | list | None = "default", + eval_workflow_name: str = "train", config_filters_filename: str | list | None = None, disable_ckpt_loading: bool = True, best_model_filepath: str | None = "models/model.pt", @@ -385,6 +389,7 @@ def __init__( # by default, evaluator needs both training and evaluate to be instantiated config_evaluate_filename = ["configs/train.json", "configs/evaluate.json"] self.config_evaluate_filename = config_evaluate_filename + self.eval_workflow_name = eval_workflow_name self.config_filters_filename = config_filters_filename self.disable_ckpt_loading = disable_ckpt_loading self.model_filepaths = {ModelType.BEST_MODEL: best_model_filepath, ModelType.FINAL_MODEL: final_model_filepath} @@ -453,7 +458,7 @@ def initialize(self, extra=None): if self.eval_workflow is None and self.config_evaluate_filename is not None: config_eval_files = self._add_config_files(self.config_evaluate_filename) self.eval_workflow = ConfigWorkflow( - config_file=config_eval_files, meta_file=None, logging_file=None, workflow="train" + config_file=config_eval_files, meta_file=None, logging_file=None, workflow=self.eval_workflow_name ) if self.eval_workflow is not None: self.eval_workflow.initialize() diff --git a/tests/test_fl_monai_algo.py b/tests/test_fl_monai_algo.py index 6270e872b9..cf8d3254ed 100644 --- a/tests/test_fl_monai_algo.py +++ b/tests/test_fl_monai_algo.py @@ -53,18 +53,10 @@ { "bundle_root": _data_dir, "train_workflow": ConfigWorkflow( - config_file=[ - os.path.join(_data_dir, "config_fl_train.json"), - os.path.join(_data_dir, "config_fl_train.json"), - ], - workflow="train", - logging_file=_logging_file, + config_file=os.path.join(_data_dir, "config_fl_train.json"), workflow="train", logging_file=_logging_file ), "config_evaluate_filename": None, - "config_filters_filename": [ - os.path.join(_data_dir, 
"config_fl_filters.json"), - os.path.join(_data_dir, "config_fl_filters.json"), - ], + "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] @@ -96,7 +88,12 @@ "bundle_root": _data_dir, "config_train_filename": None, "eval_workflow": ConfigWorkflow( - os.path.join(_data_dir, "config_fl_evaluate.json"), workflow="train", logging_file=_logging_file + config_file=[ + os.path.join(_data_dir, "config_fl_train.json"), + os.path.join(_data_dir, "config_fl_evaluate.json"), + ], + workflow="train", + logging_file=_logging_file, ), "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } @@ -105,7 +102,11 @@ { "bundle_root": _data_dir, "config_train_filename": None, - "config_evaluate_filename": os.path.join(_data_dir, "config_fl_evaluate.json"), + "config_evaluate_filename": [ + os.path.join(_data_dir, "config_fl_train.json"), + os.path.join(_data_dir, "config_fl_evaluate.json"), + ], + "eval_workflow_name": "training", "config_filters_filename": None, } ] @@ -115,16 +116,13 @@ "config_train_filename": None, "eval_workflow": ConfigWorkflow( config_file=[ - os.path.join(_data_dir, "config_fl_evaluate.json"), + os.path.join(_data_dir, "config_fl_train.json"), os.path.join(_data_dir, "config_fl_evaluate.json"), ], workflow="train", logging_file=_logging_file, ), - "config_filters_filename": [ - os.path.join(_data_dir, "config_fl_filters.json"), - os.path.join(_data_dir, "config_fl_filters.json"), - ], + "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] @@ -152,19 +150,11 @@ { "bundle_root": _data_dir, "train_workflow": ConfigWorkflow( - config_file=[ - os.path.join(_data_dir, "config_fl_train.json"), - os.path.join(_data_dir, "config_fl_train.json"), - ], - workflow="train", - logging_file=_logging_file, + config_file=os.path.join(_data_dir, "config_fl_train.json"), workflow="train", logging_file=_logging_file ), "config_evaluate_filename": None, "send_weight_diff": True, - 
"config_filters_filename": [ - os.path.join(_data_dir, "config_fl_filters.json"), - os.path.join(_data_dir, "config_fl_filters.json"), - ], + "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] diff --git a/tests/test_fl_monai_algo_dist.py b/tests/test_fl_monai_algo_dist.py index f50393870e..1bf599a0fa 100644 --- a/tests/test_fl_monai_algo_dist.py +++ b/tests/test_fl_monai_algo_dist.py @@ -36,7 +36,11 @@ class TestFLMonaiAlgo(DistTestCase): @skip_if_no_cuda def test_train(self): train_configs = [pathjoin(_data_dir, "config_fl_train.json"), pathjoin(_data_dir, "multi_gpu_train.json")] - eval_configs = [pathjoin(_data_dir, "config_fl_evaluate.json"), pathjoin(_data_dir, "multi_gpu_evaluate.json")] + eval_configs = [ + pathjoin(_data_dir, "config_fl_train.json"), + pathjoin(_data_dir, "config_fl_evaluate.json"), + pathjoin(_data_dir, "multi_gpu_evaluate.json"), + ] # initialize algo algo = MonaiAlgo( bundle_root=_data_dir, @@ -73,7 +77,11 @@ def test_train(self): @DistCall(nnodes=1, nproc_per_node=2, init_method="no_init") @skip_if_no_cuda def test_evaluate(self): - config_file = [pathjoin(_data_dir, "config_fl_evaluate.json"), pathjoin(_data_dir, "multi_gpu_evaluate.json")] + config_file = [ + pathjoin(_data_dir, "config_fl_train.json"), + pathjoin(_data_dir, "config_fl_evaluate.json"), + pathjoin(_data_dir, "multi_gpu_evaluate.json"), + ] # initialize algo algo = MonaiAlgo( bundle_root=_data_dir, @@ -86,7 +94,9 @@ def test_evaluate(self): # initialize model parser = ConfigParser() - parser.read_config(pathjoin(_data_dir, "config_fl_evaluate.json")) + parser.read_config( + [pathjoin(_data_dir, "config_fl_train.json"), pathjoin(_data_dir, "config_fl_evaluate.json")] + ) parser.parse() network = parser.get_parsed_content("network") data = ExchangeObject(weights=get_state_dict(network)) diff --git a/tests/test_fl_monai_algo_stats.py b/tests/test_fl_monai_algo_stats.py index 0e00866ba4..e46b6b899a 100644 --- 
a/tests/test_fl_monai_algo_stats.py +++ b/tests/test_fl_monai_algo_stats.py @@ -24,11 +24,17 @@ _root_dir = os.path.abspath(os.path.join(os.path.dirname(__file__))) _data_dir = os.path.join(_root_dir, "testing_data") +_logging_file = os.path.join(_data_dir, "logging.conf") TEST_GET_DATA_STATS_1 = [ { "bundle_root": _data_dir, - "workflow": ConfigWorkflow(workflow="train", config_file=os.path.join(_data_dir, "config_fl_stats_1.json")), + "workflow": ConfigWorkflow( + workflow="train", + config_file=os.path.join(_data_dir, "config_fl_stats_1.json"), + logging_file=_logging_file, + meta_file=None, + ), "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] @@ -48,11 +54,10 @@ os.path.join(_data_dir, "config_fl_stats_1.json"), os.path.join(_data_dir, "config_fl_stats_2.json"), ], + logging_file=_logging_file, + meta_file=None, ), - "config_filters_filename": [ - os.path.join(_data_dir, "config_fl_filters.json"), - os.path.join(_data_dir, "config_fl_filters.json"), - ], + "config_filters_filename": os.path.join(_data_dir, "config_fl_filters.json"), } ] diff --git a/tests/testing_data/config_fl_evaluate.json b/tests/testing_data/config_fl_evaluate.json index 549366e54c..917e762736 100644 --- a/tests/testing_data/config_fl_evaluate.json +++ b/tests/testing_data/config_fl_evaluate.json @@ -1,100 +1,18 @@ { - "bundle_root": "tests/testing_data", - "dataset_dir": "@bundle_root", - "imports": [ - "$import os" - ], - "device": "$torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')", - "network_def": { - "_target_": "DenseNet121", - "spatial_dims": 2, - "in_channels": 1, - "out_channels": 6 - }, - "network": "$@network_def.to(@device)", - "validate": { - "val_transforms": [ - { - "_target_": "LoadImaged", - "keys": [ - "image" - ], - "image_only": true - }, - { - "_target_": "EnsureChannelFirstD", - "keys": [ - "image" - ] - }, - { - "_target_": "ScaleIntensityd", - "keys": [ - "image" - ] - }, - { - "_target_": "ToTensord", - "keys": [ - 
"image", - "label" - ] - } - ], - "preprocessing": { - "_target_": "Compose", - "transforms": "$@validate#val_transforms" - }, - "dataset": { - "_target_": "Dataset", - "data": [ - { - "image": "$os.path.join(@dataset_dir, 'image0.jpeg')", - "label": 0 - }, - { - "image": "$os.path.join(@dataset_dir, 'image1.jpeg')", - "label": 1 - } - ], - "transform": "@validate#preprocessing" - }, - "dataloader": { - "_target_": "DataLoader", - "dataset": "@validate#dataset", - "batch_size": 3, - "shuffle": false, - "num_workers": 4 - }, - "inferer": { - "_target_": "SimpleInferer" - }, - "key_metric": { - "accuracy": { - "_target_": "ignite.metrics.Accuracy", - "output_transform": "$monai.handlers.from_engine(['pred', 'label'])" + "validate#handlers": [ + { + "_target_": "CheckpointLoader", + "load_path": "$@bundle_root + '/models/model.pt'", + "load_dict": { + "model": "@network" } }, - "handlers": [ - { - "_target_": "CheckpointLoader", - "load_path": "$@bundle_root + '/models/model.pt'", - "load_dict": { - "model": "@network" - } - }, - { - "_target_": "StatsHandler", - "iteration_log": false - } - ], - "evaluator": { - "_target_": "SupervisedEvaluator", - "device": "@device", - "val_data_loader": "@validate#dataloader", - "network": "@network", - "inferer": "@validate#inferer", - "key_val_metric": "@validate#key_metric" + { + "_target_": "StatsHandler", + "iteration_log": false } - } + ], + "run": [ + "$@validate#evaluator.run()" + ] } diff --git a/tests/testing_data/config_fl_train.json b/tests/testing_data/config_fl_train.json index cbb409e4bc..5b7fb6608e 100644 --- a/tests/testing_data/config_fl_train.json +++ b/tests/testing_data/config_fl_train.json @@ -165,7 +165,7 @@ "_target_": "DataLoader", "dataset": "@validate#dataset", "batch_size": 1, - "shuffle": true, + "shuffle": false, "num_workers": 2 }, "inferer": { @@ -181,6 +181,12 @@ } ] }, + "key_metric": { + "accuracy": { + "_target_": "ignite.metrics.Accuracy", + "output_transform": 
"$monai.handlers.from_engine(['pred', 'label'])" + } + }, "handlers": [ { "_target_": "StatsHandler", @@ -194,7 +200,8 @@ "network": "@network", "inferer": "@validate#inferer", "val_handlers": "@validate#handlers", - "postprocessing": "@validate#postprocessing" + "postprocessing": "@validate#postprocessing", + "key_val_metric": "@validate#key_metric" } }, "initialize": [