
Commit

Merge branch 'main' into http-driver-bugs
chesterxgchen authored Feb 1, 2024
2 parents 195b3e7 + edaf8d2 commit fc72f3b
Showing 16 changed files with 273 additions and 78 deletions.
@@ -30,7 +30,7 @@

# 4.1 Central vs. FedAvg
experiments = {
"cifar10_central": {"tag": "val_acc_local_model"},
"cifar10_central": {"tag": "val_acc_local_model", "alpha": 0.0},
"cifar10_fedavg": {"tag": "val_acc_global_model", "alpha": 1.0},
}

@@ -95,6 +95,8 @@ def main():
alpha = exp.get("alpha", None)
if alpha:
config_name = config_name + f"*alpha{alpha}"
else:
raise ValueError(f"Expected an alpha value to be provided but got alpha={alpha}")
eventfile = glob.glob(
os.path.join(client_results_root, config_name, "**", "app_site-1", "events.*"), recursive=True
)
@@ -116,7 +118,8 @@ def main():
try:
xsite_data[k].append(xsite_results["site-1"][k]["val_accuracy"])
except Exception as e:
raise ValueError(f"No val_accuracy for {k} in {xsite_file}!")
xsite_data[k].append(None)
print(f"Warning: No val_accuracy for {k} in {xsite_file}!")

print("Training TB data:")
print(pd.DataFrame(data))
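Taken together, the hunks above make `alpha` a required experiment field and turn a missing cross-site `val_accuracy` into a warning instead of a hard failure. Below is a minimal, hypothetical consolidation of that logic (the helper names are ours, not from the script); it deliberately checks `alpha is not None` so that `alpha=0.0`, as used for the central experiment, still counts as provided:

```python
import glob
import os


def find_event_files(client_results_root: str, config_name: str, alpha):
    """Locate TensorBoard event files for one experiment (illustrative sketch)."""
    # Explicit None check: alpha=0.0 is a valid value and must not be skipped.
    if alpha is not None:
        config_name = f"{config_name}*alpha{alpha}"
    else:
        raise ValueError(f"Expected an alpha value to be provided but got alpha={alpha}")
    return glob.glob(
        os.path.join(client_results_root, config_name, "**", "app_site-1", "events.*"), recursive=True
    )


def append_val_accuracy(xsite_data: dict, xsite_results: dict, k: str, xsite_file: str):
    """Collect a site's val_accuracy; a missing value becomes a warning, not an error."""
    try:
        xsite_data[k].append(xsite_results["site-1"][k]["val_accuracy"])
    except KeyError:
        xsite_data[k].append(None)
        print(f"Warning: No val_accuracy for {k} in {xsite_file}!")
```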
7 changes: 1 addition & 6 deletions examples/advanced/cifar10/cifar10-sim/run_simulator.sh
@@ -8,12 +8,7 @@ n_clients=$4

# specify output workdir
RESULT_ROOT=/tmp/nvflare/sim_cifar10
if [ 1 -eq "$(echo "${alpha} > 0" | bc)" ]
then
out_workspace=${RESULT_ROOT}/${job}_alpha${alpha}
else
out_workspace=${RESULT_ROOT}/${job}
fi
out_workspace=${RESULT_ROOT}/${job}_alpha${alpha}

# run FL simulator
./set_alpha.sh "${job}" "${alpha}"
12 changes: 6 additions & 6 deletions examples/advanced/gnn/README.md
@@ -31,7 +31,7 @@ python3 -m pip install -r requirements.txt
```
To support the PyTorch Geometric functions needed for this example, we need extra dependencies. Please refer to the [installation guide](https://pytorch-geometric.readthedocs.io/en/latest/install/installation.html) and install accordingly:
```
pip install pyg_lib torch_scatter torch_sparse torch_cluster torch_spline_conv -f https://data.pyg.org/whl/torch-2.1.0+cpu.html
python3 -m pip install pyg_lib torch_scatter torch_sparse torch_cluster torch_spline_conv -f https://data.pyg.org/whl/torch-2.1.0+cpu.html
```

#### Job Template
@@ -46,8 +46,8 @@ nvflare job list_templates
We can see that the "sag_gnn" template is available.

#### Protein Classification
The PPI dataset is directly available via the torch_geometric library; we randomly split the dataset into 2 subsets, one for each client.
First, we run the local training on each client, as well as on the whole dataset.
The PPI dataset is directly available via the torch_geometric library; we randomly split the dataset into 2 subsets, one for each client (`--client_id 1` and `--client_id 2`).
First, we run the local training on each client, as well as on the whole dataset with `--client_id 0`.
```
python3 code/graphsage_protein_local.py --client_id 0
python3 code/graphsage_protein_local.py --client_id 1
@@ -64,7 +64,7 @@ For client configs, we set client_ids for each client, and the number of local epochs

For server configs, we set the number of rounds for federated training, the key metric for model selection, and the model class path with model hyperparameters.

With the produced job, we run the federated training on both clients via FedAvg using NVFlare Simulator.
With the produced job, we run the federated training on both clients via FedAvg using the NVFlare Simulator.
```
nvflare simulator -w /tmp/nvflare/gnn/protein_fl_workspace -n 2 -t 2 /tmp/nvflare/jobs/gnn_protein
```
@@ -74,7 +74,7 @@ We first download the Elliptic++ dataset to `/tmp/nvflare/datasets/elliptic_pp`
- `txs_classes.csv`: transaction id and its class (licit or illicit)
- `txs_edgelist.csv`: connections for transaction ids
- `txs_features.csv`: transaction id and its features
Then, we run the local training on each client, as well as on the whole dataset.
Then, we run the local training on each client, as well as on the whole dataset. Again, `--client_id 0` uses all data.
```
python3 code/graphsage_finance_local.py --client_id 0
python3 code/graphsage_finance_local.py --client_id 1
@@ -87,7 +87,7 @@ nvflare job create -force -j "/tmp/nvflare/jobs/gnn_finance" -w "sag_gnn" -sd "c
-f app_2/config_fed_client.conf app_script="graphsage_finance_fl.py" app_config="--client_id 2 --epochs 10" \
-f app_server/config_fed_server.conf num_rounds=7 key_metric="validation_auc" model_class_path="pyg_sage.SAGE" components[0].args.model.args.in_channels=165 components[0].args.model.args.hidden_channels=256 components[0].args.model.args.num_layers=3 components[0].args.model.args.num_classes=2
```
And with the produced job, we run the federated training on both clients via FedAvg using NVFlare Simulator.
And with the produced job, we run the federated training on both clients via FedAvg using the NVFlare Simulator.
```
nvflare simulator -w /tmp/nvflare/gnn/finance_fl_workspace -n 2 -t 2 /tmp/nvflare/jobs/gnn_finance
```
@@ -1,5 +1,6 @@
{
"format_version": 2,
"num_rounds": 100,
"executors": [
{
"tasks": [
@@ -10,14 +11,14 @@
"name": "FedXGBHistogramExecutor",
"args": {
"data_loader_id": "dataloader",
"num_rounds": 100,
"num_rounds": "{num_rounds}",
"early_stopping_rounds": 2,
"xgb_params": {
"max_depth": 8,
"eta": 0.1,
"objective": "binary:logistic",
"eval_metric": "auc",
"tree_method": "gpu_hist",
"tree_method": "hist",
"nthread": 16
}
}
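This client config (like the server and cyclic configs that follow) hoists `num_rounds` to a single top-level field and has the nested executor args reference it as `"{num_rounds}"`, so the job-config generator only needs to update one place. A small Python sketch of that relationship, assuming NVFlare resolves the `"{num_rounds}"` reference from the top-level value when the job is loaded (the `set_num_rounds` helper is hypothetical; the real updates live in `prepare_job_config.py`, diffed further down):

```python
# Shape of the client job config after this change, expressed as a Python dict.
client_config = {
    "format_version": 2,
    "num_rounds": 100,  # single source of truth for the whole config
    "executors": [
        {"executor": {"args": {"num_rounds": "{num_rounds}"}}},  # resolved from the top-level field
    ],
}


def set_num_rounds(config: dict, round_num: int) -> None:
    # Hypothetical helper mirroring _update_client_config/_update_server_config:
    # only the top-level field is touched; nested references follow it.
    config["num_rounds"] = round_num
```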
@@ -1,5 +1,6 @@
{
"format_version": 2,
"num_rounds": 101,

"server": {
"heart_beat_timeout": 600,
@@ -34,7 +35,7 @@
"name": "ScatterAndGather",
"args": {
"min_clients": 5,
"num_rounds": 101,
"num_rounds": "{num_rounds}",
"start_round": 0,
"wait_time_after_min_received": 0,
"aggregator_id": "aggregator",
@@ -13,7 +13,6 @@
"data_loader_id": "dataloader",
"training_mode": "cyclic",
"num_client_bagging": 1,
"lr_mode": "scaled",
"local_model_path": "model.json",
"global_model_path": "model_global.json",
"learning_rate": 0.1,
@@ -1,5 +1,6 @@
{
"format_version": 2,
"num_rounds": 20,

"server": {
"heart_beat_timeout": 600,
@@ -29,7 +30,7 @@
"id": "cyclic_ctl",
"name": "CyclicController",
"args": {
"num_rounds": 20,
"num_rounds": "{num_rounds}",
"task_assignment_timeout": 60,
"persistor_id": "persistor",
"shareable_generator_id": "shareable_generator",
11 changes: 7 additions & 4 deletions examples/advanced/xgboost/utils/prepare_job_config.py
@@ -20,6 +20,8 @@

from nvflare.apis.fl_constant import JobConstants

SCRIPT_PATH = pathlib.Path(os.path.realpath(__file__))
XGB_EXAMPLE_ROOT = SCRIPT_PATH.parent.parent.absolute()
JOB_CONFIGS_ROOT = "jobs"
MODE_ALGO_MAP = {"bagging": "tree-based", "cyclic": "tree-based", "histogram": "histogram-based"}

@@ -84,7 +86,7 @@ def _get_src_job_dir(training_mode):
"cyclic": "cyclic_base",
"histogram": "base",
}
return pathlib.Path(MODE_ALGO_MAP[training_mode]) / JOB_CONFIGS_ROOT / base_job_map[training_mode]
return XGB_EXAMPLE_ROOT / MODE_ALGO_MAP[training_mode] / JOB_CONFIGS_ROOT / base_job_map[training_mode]


def _gen_deploy_map(num_sites: int, site_name_prefix: str) -> dict:
@@ -133,17 +135,18 @@ def _update_client_config(config: dict, args, lr_scale, site_name: str):
num_client_bagging = args.site_num
config["executors"][0]["executor"]["args"]["num_client_bagging"] = num_client_bagging
else:
config["num_rounds"] = args.round_num
config["components"][0]["args"]["data_split_filename"] = data_split_name
config["executors"][0]["executor"]["args"]["xgb_params"]["nthread"] = args.nthread
config["executors"][0]["executor"]["args"]["xgb_params"]["tree_method"] = args.tree_method


def _update_server_config(config: dict, args):
if args.training_mode == "bagging":
config["workflows"][0]["args"]["num_rounds"] = args.round_num + 1
config["num_rounds"] = args.round_num + 1
config["workflows"][0]["args"]["min_clients"] = args.site_num
elif args.training_mode == "cyclic":
config["workflows"][0]["args"]["num_rounds"] = int(args.round_num / args.site_num)
config["num_rounds"] = int(args.round_num / args.site_num)


def _copy_custom_files(src_job_path, src_app_name, dst_job_path, dst_app_name):
@@ -198,7 +201,7 @@ def main():
src_job_path = _get_src_job_dir(args.training_mode)

# create a new job
dst_job_path = pathlib.Path(MODE_ALGO_MAP[args.training_mode]) / JOB_CONFIGS_ROOT / job_name
dst_job_path = XGB_EXAMPLE_ROOT / MODE_ALGO_MAP[args.training_mode] / JOB_CONFIGS_ROOT / job_name
if not os.path.exists(dst_job_path):
os.makedirs(dst_job_path)

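The path changes above anchor every job lookup to the script's own location, so `prepare_job_config.py` no longer depends on being run from inside the xgboost example directory. A minimal sketch of that pattern (the `job_config_dir` helper is hypothetical; the constants mirror the diff):

```python
import os
import pathlib

# Derive the example root from this file's location, not the caller's cwd.
SCRIPT_PATH = pathlib.Path(os.path.realpath(__file__))
XGB_EXAMPLE_ROOT = SCRIPT_PATH.parent.parent.absolute()
JOB_CONFIGS_ROOT = "jobs"
MODE_ALGO_MAP = {"bagging": "tree-based", "cyclic": "tree-based", "histogram": "histogram-based"}


def job_config_dir(training_mode: str, job_name: str) -> pathlib.Path:
    # Same resolution the script now uses for both source and destination jobs.
    return XGB_EXAMPLE_ROOT / MODE_ALGO_MAP[training_mode] / JOB_CONFIGS_ROOT / job_name
```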
6 changes: 5 additions & 1 deletion nvflare/app_common/workflows/base_fedavg.py
@@ -17,6 +17,7 @@

from nvflare.apis.fl_constant import FLMetaKey
from nvflare.app_common.abstract.fl_model import FLModel
from nvflare.app_common.abstract.model import make_model_learnable
from nvflare.app_common.aggregators.weighted_aggregation_helper import WeightedAggregationHelper
from nvflare.app_common.app_constant import AppConstants
from nvflare.app_common.app_event_type import AppEventType
@@ -142,5 +143,8 @@ def update_model(self, aggr_result):

self.model = FLModelUtils.update_model(self.model, aggr_result)

self.fl_ctx.set_prop(AppConstants.GLOBAL_MODEL, self.model, private=True, sticky=True)
# persistor uses Learnable format to save model
ml = make_model_learnable(weights=self.model.params, meta_props=self.model.meta)
self.fl_ctx.set_prop(AppConstants.GLOBAL_MODEL, ml, private=True, sticky=True)

self.event(AppEventType.AFTER_SHAREABLE_TO_LEARNABLE)
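This change, and the matching one in `model_controller.py` further down, store the global model in the Learnable format that the persistor expects instead of the raw `FLModel`. A minimal sketch of the shared pattern (the `publish_global_model` wrapper is hypothetical; the two calls inside it are taken from the diffs):

```python
from nvflare.app_common.abstract.model import make_model_learnable
from nvflare.app_common.app_constant import AppConstants


def publish_global_model(fl_ctx, model):
    # Persistors save models in the Learnable format, so convert before publishing.
    ml = make_model_learnable(weights=model.params, meta_props=model.meta)
    fl_ctx.set_prop(AppConstants.GLOBAL_MODEL, ml, private=True, sticky=True)
```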
14 changes: 14 additions & 0 deletions nvflare/app_common/workflows/cyclic_ctl.py
@@ -16,6 +16,7 @@
import random

from nvflare.apis.client import Client
from nvflare.apis.fl_constant import ReturnCode
from nvflare.apis.fl_context import FLContext
from nvflare.apis.impl.controller import ClientTask, Controller, Task
from nvflare.apis.shareable import Shareable
@@ -145,6 +146,19 @@ def _get_relay_orders(self, fl_ctx: FLContext):
return targets

def _process_result(self, client_task: ClientTask, fl_ctx: FLContext):
result = client_task.result
rc = result.get_return_code()
client_name = client_task.client.name

# Raise errors if ReturnCode is not OK.
if rc and rc != ReturnCode.OK:
self.system_panic(
f"Result from {client_name} is bad, error code: {rc}. "
f"{self.__class__.__name__} exiting at round {self._current_round}.",
fl_ctx=fl_ctx,
)
return False

# submitted shareable is stored in client_task.result
# we need to update task.data with that shareable so the next target
# will get the updated shareable
10 changes: 8 additions & 2 deletions nvflare/app_common/workflows/model_controller.py
@@ -138,7 +138,9 @@ def start_controller(self, fl_ctx: FLContext) -> None:
else:
self.model = FLModel(params_type=ParamsType.FULL, params={})

self.fl_ctx.set_prop(AppConstants.GLOBAL_MODEL, self.model, private=True, sticky=True)
# persistor uses Learnable format to save model
ml = make_model_learnable(weights=self.model.params, meta_props=self.model.meta)
self.fl_ctx.set_prop(AppConstants.GLOBAL_MODEL, ml, private=True, sticky=True)
self.event(AppEventType.INITIAL_MODEL_LOADED)

self.engine = self.fl_ctx.get_engine()
@@ -231,7 +233,7 @@ def _process_result(self, client_task: ClientTask, fl_ctx: FLContext) -> None:
result = client_task.result
client_name = client_task.client.name

self.fl_ctx.set_prop(AppConstants.CURRENT_ROUND, self._current_round, private=True, sticky=True)

self.event(AppEventType.BEFORE_CONTRIBUTION_ACCEPT)
self._accept_train_result(client_name=client_name, result=result, fl_ctx=fl_ctx)
self.event(AppEventType.AFTER_CONTRIBUTION_ACCEPT)

# Turn result into FLModel
result_model = FLModelUtils.from_shareable(result)
@@ -270,7 +276,6 @@ def _accept_train_result(self, client_name: str, result: Shareable, fl_ctx: FLContext):
)
return

self.fl_ctx.set_prop(AppConstants.CURRENT_ROUND, self._current_round, private=True, sticky=True)
self.fl_ctx.set_prop(AppConstants.TRAINING_RESULT, result, private=True, sticky=False)

@abstractmethod
@@ -307,6 +312,7 @@ def save_model(self):
) or self._current_round == self._num_rounds - 1:
self.info("Start persist model on server.")
self.event(AppEventType.BEFORE_LEARNABLE_PERSIST)
# persistor uses Learnable format to save model
ml = make_model_learnable(weights=self.model.params, meta_props=self.model.meta)
self.persistor.save(ml, self.fl_ctx)
self.event(AppEventType.AFTER_LEARNABLE_PERSIST)
@@ -0,0 +1,44 @@
n_servers: 1
n_clients: 2
additional_python_paths:
- ../../examples/advanced/xgboost
cleanup: true
jobs_root_dir: ../../examples/advanced/xgboost/histogram-based/jobs


tests:
- test_name: Test a simplified copy of job higgs_2_histogram_uniform_split_uniform_lr
for xgboost histogram-based example.
event_sequence:
- actions:
- submit_job higgs_2_histogram_uniform_split_uniform_lr_copy
result:
type: job_submit_success
trigger:
data: Server started
type: server_log
- actions:
- ensure_current_job_done
result:
data:
run_finished: true
type: run_state
trigger:
data:
run_finished: true
type: run_state
setup:
- cp ../../examples/advanced/xgboost/histogram-based/requirements.txt
../../examples/advanced/xgboost/histogram-based/temp_requirements.txt
- sed -i '/nvflare\|jupyter\|notebook/d' ../../examples/advanced/xgboost/histogram-based/temp_requirements.txt
- pip install -r ../../examples/advanced/xgboost/histogram-based/temp_requirements.txt
- bash ../../examples/advanced/xgboost/histogram-based/prepare_data.sh
- python3 ../../examples/advanced/xgboost/utils/prepare_job_config.py --site_num 2 --training_mode histogram
--split_method uniform --lr_mode uniform --nthread 16 --tree_method hist
- python3 convert_to_test_job.py
--job ../../examples/advanced/xgboost/histogram-based/jobs/higgs_2_histogram_uniform_split_uniform_lr
--post _copy
- rm -f ../../examples/advanced/xgboost/histogram-based/temp_requirements.txt
teardown:
- rm -rf ../../examples/advanced/xgboost/histogram-based/jobs/higgs_2_histogram_uniform_split_uniform_lr
- rm -rf ../../examples/advanced/xgboost/histogram-based/jobs/higgs_2_histogram_uniform_split_uniform_lr_copy
