diff --git a/examples/advanced/kaplan-meier-he/README.md b/examples/advanced/kaplan-meier-he/README.md new file mode 100644 index 0000000000..bd138c7583 --- /dev/null +++ b/examples/advanced/kaplan-meier-he/README.md @@ -0,0 +1,79 @@ +# Secure Federated Kaplan-Meier Analysis via Homomorphic Encryption + +This example illustrates two features: +* How to perform Kaplan-Meier survival analysis in federated setting securely via Homomorphic Encryption (HE). +* How to use the Flare Workflow Controller API to contract a workflow to facilitate HE under simulator mode. + +## Secure Multi-party Kaplan-Meier Analysis +Kaplan-Meier survival analysis is a one-shot (non-iterative) analysis performed on a list of events and their corresponding time. In this example, we use [lifelines](https://zenodo.org/records/10456828) to perform this analysis. + +Essentially, the estimator needs to get access to the event list, and under the setting of federated analysis, the aggregated event list from all participants. + +However, this poses a data security concern - by sharing the event list, the raw data can be exposed to external parties, which break the core value of federated analysis. + +Therefore, we would like to design a secure mechanism to enable collaborative Kaplan-Meier analysis without the risk of exposing any raw information from a certain participant (at server end). This is achieved by two techniques: + +- Condense the raw event list to two histograms (one for observed events and the other for censored event) binned at certain interval (e.g. a week), such that events happened within the same bin from different participants can be aggregated and will not be distinguishable for the final aggregated histograms. +- The local histograms will be encrypted as one single vector before sending to server, and the global aggregation operation at server side will be performed entirely within encryption space with HE. + +With these two settings, the server will have no access to any knowledge regarding local submissions, and participants will only receive global aggregated histograms that will not contain distinguishable information regarding any individual participants (client number >= 3 - if only two participants, one can infer the other party's info by subtracting its own histograms). + +The final Kaplan-Meier survival analysis will be performed locally on the global aggregated event list, recovered from global histograms. + + +## Simulated HE Analysis via FLARE Workflow Controller API + +The Flare Workflow Controller API (`WFController`) provides the functionality of flexible FLModel payloads for each round of federated analysis. This gives us the flexibility of transmitting various information needed by our scheme at different stages of federated learning. + +Our [existing HE examples](https://github.com/NVIDIA/NVFlare/tree/main/examples/advanced/cifar10/cifar10-real-world) uses data filter mechanism for HE, provisioning the HE context information (specs and keys) for both client and server of the federated job under [CKKS](https://github.com/NVIDIA/NVFlare/blob/main/nvflare/app_opt/he/model_encryptor.py) scheme. In this example, we would like to illustrate WFController's capability in supporting customized needs beyond the existing HE functionalities (designed mainly for encrypting deep learning models). +- different HE schemes (BFV) rather than CKKS +- different content at different rounds of federated learning, and only specific payload needs to be encrypted + +With the WFController API, such "proof of concept" experiment becomes easy. In this example, the federated analysis pipeline includes 3 rounds: +1. Server send the simple start message without any payload. +2. Clients collect the information of the local maximum bin number (for event time) and send to server, where server aggregates the information by selecting the maximum among all clients. The global maximum number is then distributed back to clients. This step is necessary because we would like to standardize the histograms generated by all clients, such that they will have the exact same length and can be encrypted as vectors of same size, which will be addable. +3. Clients condense their local raw event lists into two histograms with the global length received, encrypt the histrogram value vectors, and send to server. Server aggregated the received histograms by adding the encrypted vectors together, and sends the aggregated histograms back to clients. + +After Round 3, the federated work is completed. Then at each client, the aggregated histograms will be decrypted and converted back to an event list, and Kaplan-Meier analysis can be performed on the global information. + +## Run the job +We first run a baseline analysis with full event information: +```commandline +python baseline_kaplan_meier.py +``` +By default, this will generate a KM curve image `km_curve_baseline.png` under the current working directory. + +Then we run a 5-client federated job with simulator, begin with splitting and generating the data files for each client: +```commandline +python utils/prepare_data.py --out_path "/tmp/flare/dataset/km_data" +``` +Then we prepare HE context for clients and server, note that this step is done by secure provisioning for real-life applications, but in this study experimenting with BFV scheme, we use this step to distribute the HE context. +```commandline +python utils/prepare_he_context.py --out_path "/tmp/flare/he_context" +``` + +Next, we set the location of the job templates directory. +```commandline +nvflare config -jt ./job_templates +``` + +Then we can generate the job configuration from the `kaplan_meier_he` template: + +```commandline +N_CLIENTS=5 +nvflare job create -force -j "./jobs/kaplan-meier-he" -w "kaplan_meier_he" -sd "./src" \ +-f config_fed_client.conf app_script="kaplan_meier_train.py" app_config="--data_root /tmp/flare/dataset/km_data --he_context_path /tmp/flare/he_context/he_context_client.txt" \ +-f config_fed_server.conf min_clients=${N_CLIENTS} he_context_path="/tmp/flare/he_context/he_context_server.txt" +``` + +And we can run the federated job: +```commandline +nvflare simulator -w workspace_km_he -n 5 -t 5 jobs/kaplan-meier-he +``` +By default, this will generate a KM curve image `km_curve_fl.png` under each client's directory. + +## Display Result + +By comparing the two curves, we can observe that the two are identical: +![KM survival baseline](figs/km_curve_baseline.png) +![KM survival fl](figs/km_curve_fl.png) diff --git a/examples/advanced/kaplan-meier-he/baseline_kaplan_meier.py b/examples/advanced/kaplan-meier-he/baseline_kaplan_meier.py new file mode 100644 index 0000000000..79e7b8052f --- /dev/null +++ b/examples/advanced/kaplan-meier-he/baseline_kaplan_meier.py @@ -0,0 +1,72 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse + +import matplotlib.pyplot as plt +import numpy as np +from lifelines import KaplanMeierFitter +from sksurv.datasets import load_veterans_lung_cancer + + +def args_parser(): + parser = argparse.ArgumentParser(description="Kaplan Meier Survival Analysis Baseline") + parser.add_argument( + "--output_curve_path", + type=str, + default="./km_curve_baseline.png", + help="save path for the output curve", + ) + return parser + + +def prepare_data(bin_days: int = 7): + data_x, data_y = load_veterans_lung_cancer() + total_data_num = data_x.shape[0] + print(f"Total data count: {total_data_num}") + event = data_y["Status"] + time = data_y["Survival_in_days"] + # Categorize data to a bin, default is a week (7 days) + time = np.ceil(time / bin_days).astype(int) + return event, time + + +def main(): + parser = args_parser() + args = parser.parse_args() + + # Set parameters + output_curve_path = args.output_curve_path + + # Generate data + event, time = prepare_data() + + # Fit and plot Kaplan Meier curve with lifelines + kmf = KaplanMeierFitter() + # Fit the survival data + kmf.fit(time, event) + # Plot and save the Kaplan-Meier survival curve + plt.figure() + plt.title("Baseline") + kmf.plot_survival_function() + plt.ylim(0, 1) + plt.ylabel("prob") + plt.xlabel("time") + plt.legend("", frameon=False) + plt.tight_layout() + plt.savefig(output_curve_path) + + +if __name__ == "__main__": + main() diff --git a/examples/advanced/kaplan-meier-he/figs/km_curve_baseline.png b/examples/advanced/kaplan-meier-he/figs/km_curve_baseline.png new file mode 100644 index 0000000000..34cdb9cabb Binary files /dev/null and b/examples/advanced/kaplan-meier-he/figs/km_curve_baseline.png differ diff --git a/examples/advanced/kaplan-meier-he/figs/km_curve_fl.png b/examples/advanced/kaplan-meier-he/figs/km_curve_fl.png new file mode 100644 index 0000000000..a4765d5654 Binary files /dev/null and b/examples/advanced/kaplan-meier-he/figs/km_curve_fl.png differ diff --git a/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/config_fed_client.conf b/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/config_fed_client.conf new file mode 100644 index 0000000000..0704590617 --- /dev/null +++ b/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/config_fed_client.conf @@ -0,0 +1,116 @@ +{ + # version of the configuration + format_version = 2 + + # This is the application script which will be invoked. Client can replace this script with user's own training script. + app_script = "kaplan_meier_train.py" + + # Additional arguments needed by the training code. For example, in lightning, these can be --trainer.batch_size=xxx. + app_config = "--data_root /tmp/flare/dataset/km_data --he_context_path /tmp/flare/he_context/he_context_client.txt" + + # Client Computing Executors. + executors = [ + { + # tasks the executors are defined to handle + tasks = ["train"] + + # This particular executor + executor { + + # This is an executor for Client API. The underline data exchange is using Pipe. + path = "nvflare.app_opt.pt.client_api_launcher_executor.ClientAPILauncherExecutor" + + args { + # launcher_id is used to locate the Launcher object in "components" + launcher_id = "launcher" + + # pipe_id is used to locate the Pipe object in "components" + pipe_id = "pipe" + + # Timeout in seconds for waiting for a heartbeat from the training script. Defaults to 30 seconds. + # Please refer to the class docstring for all available arguments + heartbeat_timeout = 60 + + # format of the exchange parameters + params_exchange_format = "raw" + + # if the transfer_type is FULL, then it will be sent directly + # if the transfer_type is DIFF, then we will calculate the + # difference VS received parameters and send the difference + params_transfer_type = "FULL" + + # if train_with_evaluation is true, the executor will expect + # the custom code need to send back both the trained parameters and the evaluation metric + # otherwise only trained parameters are expected + train_with_evaluation = false + } + } + } + ], + + # this defined an array of task data filters. If provided, it will control the data from server controller to client executor + task_data_filters = [] + + # this defined an array of task result filters. If provided, it will control the result from client executor to server controller + task_result_filters = [] + + components = [ + { + # component id is "launcher" + id = "launcher" + + # the class path of this component + path = "nvflare.app_common.launchers.subprocess_launcher.SubprocessLauncher" + + args { + # the launcher will invoke the script + script = "python3 custom/{app_script} {app_config} " + # if launch_once is true, the SubprocessLauncher will launch once for the whole job + # if launch_once is false, the SubprocessLauncher will launch a process for each task it receives from server + launch_once = true + } + } + { + id = "pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + root_url = "{ROOT_URL}" + secure_mode = "{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + } + { + id = "metrics_pipe" + path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" + args { + mode = "PASSIVE" + site_name = "{SITE_NAME}" + token = "{JOB_ID}" + root_url = "{ROOT_URL}" + secure_mode = "{SECURE_MODE}" + workspace_dir = "{WORKSPACE}" + } + }, + { + id = "metric_relay" + path = "nvflare.app_common.widgets.metric_relay.MetricRelay" + args { + pipe_id = "metrics_pipe" + event_type = "fed.analytix_log_stats" + # how fast should it read from the peer + read_interval = 0.1 + } + }, + { + # we use this component so the client api `flare.init()` can get required information + id = "config_preparer" + path = "nvflare.app_common.widgets.external_configurator.ExternalConfigurator" + args { + component_ids = ["metric_relay"] + } + } + ] +} diff --git a/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/config_fed_server.conf b/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/config_fed_server.conf new file mode 100644 index 0000000000..2589c856bd --- /dev/null +++ b/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/config_fed_server.conf @@ -0,0 +1,20 @@ +{ + # version of the configuration + format_version = 2 + task_data_filters =[] + task_result_filters = [] + + workflows = [ + { + id = "km" + path = "kaplan_meier_wf.KM" + args { + min_clients = 5 + he_context_path = "/tmp/flare/he_context/he_context_server.txt" + } + } + ] + + components = [] + +} diff --git a/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/info.conf b/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/info.conf new file mode 100644 index 0000000000..a3579091d0 --- /dev/null +++ b/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/info.conf @@ -0,0 +1,5 @@ +{ + description = "Kaplan-Meier survival analysis with homomorphic encryption" + execution_api_type = "client_api" + controller_type = "server" +} \ No newline at end of file diff --git a/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/info.md b/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/info.md new file mode 100644 index 0000000000..4d74281bf3 --- /dev/null +++ b/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/info.md @@ -0,0 +1,11 @@ +# Job Template Information Card + +## kaplan_meier_he + name = "kaplan_meier_he" + description = "Kaplan-Meier survival analysis with homomorphic encryption" + class_name = "KM" + controller_type = "server" + executor_type = "launcher_executor" + contributor = "NVIDIA" + init_publish_date = "2024-04-09" + last_updated_date = "2024-04-09" diff --git a/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/meta.conf b/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/meta.conf new file mode 100644 index 0000000000..624acb062d --- /dev/null +++ b/examples/advanced/kaplan-meier-he/job_templates/kaplan_meier_he/meta.conf @@ -0,0 +1,8 @@ +name = "kaplan_meier_he" +resource_spec {} +min_clients = 2 +deploy_map { + app = [ + "@ALL" + ] +} diff --git a/examples/advanced/kaplan-meier-he/requirements.txt b/examples/advanced/kaplan-meier-he/requirements.txt new file mode 100644 index 0000000000..e6d18ba9a3 --- /dev/null +++ b/examples/advanced/kaplan-meier-he/requirements.txt @@ -0,0 +1,3 @@ +lifelines +tenseal +scikit-survival diff --git a/examples/advanced/kaplan-meier-he/src/kaplan_meier_train.py b/examples/advanced/kaplan-meier-he/src/kaplan_meier_train.py new file mode 100644 index 0000000000..401c26aaf1 --- /dev/null +++ b/examples/advanced/kaplan-meier-he/src/kaplan_meier_train.py @@ -0,0 +1,195 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import base64 +import json +import os + +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import tenseal as ts +from lifelines import KaplanMeierFitter +from lifelines.utils import survival_table_from_events + +# (1) import nvflare client API +import nvflare.client as flare +from nvflare.app_common.abstract.fl_model import FLModel, ParamsType + + +# Client code +def read_data(file_name: str): + with open(file_name, "rb") as f: + data = f.read() + return base64.b64decode(data) + + +def details_save(kmf): + # Get the survival function at all observed time points + survival_function_at_all_times = kmf.survival_function_ + # Get the timeline (time points) + timeline = survival_function_at_all_times.index.values + # Get the KM estimate + km_estimate = survival_function_at_all_times["KM_estimate"].values + # Get the event count at each time point + event_count = kmf.event_table.iloc[:, 0].values # Assuming the first column is the observed events + # Get the survival rate at each time point (using the 1st column of the survival function) + survival_rate = 1 - survival_function_at_all_times.iloc[:, 0].values + # Return the results + results = { + "timeline": timeline.tolist(), + "km_estimate": km_estimate.tolist(), + "event_count": event_count.tolist(), + "survival_rate": survival_rate.tolist(), + } + file_path = os.path.join(os.getcwd(), "km_global.json") + print(f"save the details of KM analysis result to {file_path} \n") + with open(file_path, "w") as json_file: + json.dump(results, json_file, indent=4) + + +def plot_and_save(kmf): + # Plot and save the Kaplan-Meier survival curve + plt.figure() + plt.title("Federated HE") + kmf.plot_survival_function() + plt.ylim(0, 1) + plt.ylabel("prob") + plt.xlabel("time") + plt.legend("", frameon=False) + plt.tight_layout() + file_path = os.path.join(os.getcwd(), "km_curve_fl.png") + print(f"save the curve plot to {file_path} \n") + plt.savefig(file_path) + + +def main(): + parser = argparse.ArgumentParser(description="KM analysis") + parser.add_argument("--data_root", type=str, help="Root path for data files") + parser.add_argument("--he_context_path", type=str, help="Path for the HE context file") + args = parser.parse_args() + + flare.init() + + site_name = flare.get_site_name() + print(f"Kaplan-meier analysis for {site_name}") + + # get local data + data_path = os.path.join(args.data_root, site_name + ".csv") + data = pd.read_csv(data_path) + event_local = data["event"] + time_local = data["time"] + + # HE context + # In real-life application, HE context is prepared by secure provisioning + he_context_serial = read_data(args.he_context_path) + he_context = ts.context_from(he_context_serial) + + while flare.is_running(): + # receives global message from NVFlare + global_msg = flare.receive() + curr_round = global_msg.current_round + print(f"current_round={curr_round}") + + if curr_round == 1: + # First round: + # Empty payload from server, send max index back + # Condense local data to histogram + event_table = survival_table_from_events(time_local, event_local) + hist_idx = event_table.index.values.astype(int) + # Get the max index to be synced globally + max_hist_idx = max(hist_idx) + + # Send max to server + print(f"send max hist index for site = {flare.get_site_name()}") + model = FLModel(params={"max_idx": max_hist_idx}, params_type=ParamsType.FULL) + flare.send(model) + + elif curr_round == 2: + # Second round, get global max index + # Organize local histogram and encrypt + max_idx_global = global_msg.params["max_idx_global"] + print("Global Max Idx") + print(max_idx_global) + # Convert local table to uniform histogram + hist_obs = {} + hist_cen = {} + for idx in range(max_idx_global): + hist_obs[idx] = 0 + hist_cen[idx] = 0 + # assign values + idx = event_table.index.values.astype(int) + observed = event_table["observed"].to_numpy() + censored = event_table["censored"].to_numpy() + for i in range(len(idx)): + hist_obs[idx[i]] = observed[i] + hist_cen[idx[i]] = censored[i] + # Encrypt with tenseal using BFV scheme since observations are integers + hist_obs_he = ts.bfv_vector(he_context, list(hist_obs.values())) + hist_cen_he = ts.bfv_vector(he_context, list(hist_cen.values())) + # Serialize for transmission + hist_obs_he_serial = hist_obs_he.serialize() + hist_cen_he_serial = hist_cen_he.serialize() + # Send encrypted histograms to server + response = FLModel( + params={"hist_obs": hist_obs_he_serial, "hist_cen": hist_cen_he_serial}, params_type=ParamsType.FULL + ) + flare.send(response) + + elif curr_round == 3: + # Get global histograms + hist_obs_global_serial = global_msg.params["hist_obs_global"] + hist_cen_global_serial = global_msg.params["hist_cen_global"] + # Deserialize + hist_obs_global = ts.bfv_vector_from(he_context, hist_obs_global_serial) + hist_cen_global = ts.bfv_vector_from(he_context, hist_cen_global_serial) + # Decrypt + hist_obs_global = hist_obs_global.decrypt() + hist_cen_global = hist_cen_global.decrypt() + # Unfold histogram to event list + time_unfold = [] + event_unfold = [] + for i in range(max_idx_global): + for j in range(hist_obs_global[i]): + time_unfold.append(i) + event_unfold.append(True) + for k in range(hist_cen_global[i]): + time_unfold.append(i) + event_unfold.append(False) + time_unfold = np.array(time_unfold) + event_unfold = np.array(event_unfold) + + # Perform Kaplan-Meier analysis on global aggregated information + # Create a Kaplan-Meier estimator + kmf = KaplanMeierFitter() + + # Fit the model + kmf.fit(durations=time_unfold, event_observed=event_unfold) + + # Plot and save the KM curve + plot_and_save(kmf) + + # Save details of the KM result to a json file + details_save(kmf) + + # Send a simple response to server + response = FLModel(params={}, params_type=ParamsType.FULL) + flare.send(response) + + print(f"finish send for {site_name}, complete") + + +if __name__ == "__main__": + main() diff --git a/examples/advanced/kaplan-meier-he/src/kaplan_meier_wf.py b/examples/advanced/kaplan-meier-he/src/kaplan_meier_wf.py new file mode 100644 index 0000000000..1c9fdecaee --- /dev/null +++ b/examples/advanced/kaplan-meier-he/src/kaplan_meier_wf.py @@ -0,0 +1,131 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import logging +from typing import Dict + +import tenseal as ts + +from nvflare.app_common.abstract.fl_model import FLModel, ParamsType +from nvflare.app_common.workflows.wf_controller import WFController + +# Controller Workflow + + +class KM(WFController): + def __init__(self, min_clients: int, he_context_path: str): + super(KM, self).__init__() + self.logger = logging.getLogger(self.__class__.__name__) + self.min_clients = min_clients + self.he_context_path = he_context_path + self.num_rounds = 3 + + def run(self): + max_idx_results = self.start_fl_collect_max_idx() + global_res = self.aggr_max_idx(max_idx_results) + enc_hist_results = self.distribute_max_idx_collect_enc_stats(global_res) + hist_obs_global, hist_cen_global = self.aggr_he_hist(enc_hist_results) + _ = self.distribute_global_hist(hist_obs_global, hist_cen_global) + + def read_data(self, file_name: str): + with open(file_name, "rb") as f: + data = f.read() + return base64.b64decode(data) + + def start_fl_collect_max_idx(self): + self.logger.info("send initial message to all sites to start FL \n") + model = FLModel(params={}, start_round=1, current_round=1, total_rounds=self.num_rounds) + + results = self.send_model_and_wait(data=model) + return results + + def aggr_max_idx(self, sag_result: Dict[str, Dict[str, FLModel]]): + self.logger.info("aggregate max histogram index \n") + + if not sag_result: + raise RuntimeError("input is None or empty") + + max_idx_global = [] + for fl_model in sag_result: + max_idx = fl_model.params["max_idx"] + max_idx_global.append(max_idx) + # actual time point as index, so plus 1 for storage + return max(max_idx_global) + 1 + + def distribute_max_idx_collect_enc_stats(self, result: int): + self.logger.info("send global max_index to all sites \n") + + model = FLModel( + params={"max_idx_global": result}, + params_type=ParamsType.FULL, + start_round=1, + current_round=2, + total_rounds=self.num_rounds, + ) + + results = self.send_model_and_wait(data=model) + return results + + def aggr_he_hist(self, sag_result: Dict[str, Dict[str, FLModel]]): + self.logger.info("aggregate histogram within HE \n") + + # Load HE context + he_context_serial = self.read_data(self.he_context_path) + he_context = ts.context_from(he_context_serial) + + if not sag_result: + raise RuntimeError("input is None or empty") + + hist_obs_global = None + hist_cen_global = None + for fl_model in sag_result: + site = fl_model.meta.get("client_name", None) + hist_obs_he_serial = fl_model.params["hist_obs"] + hist_obs_he = ts.bfv_vector_from(he_context, hist_obs_he_serial) + hist_cen_he_serial = fl_model.params["hist_cen"] + hist_cen_he = ts.bfv_vector_from(he_context, hist_cen_he_serial) + + if not hist_obs_global: + print(f"assign global hist with result from {site}") + hist_obs_global = hist_obs_he + else: + print(f"add to global hist with result from {site}") + hist_obs_global += hist_obs_he + + if not hist_cen_global: + print(f"assign global hist with result from {site}") + hist_cen_global = hist_cen_he + else: + print(f"add to global hist with result from {site}") + hist_cen_global += hist_cen_he + + # return the two accumulated vectors, serialized for transmission + hist_obs_global_serial = hist_obs_global.serialize() + hist_cen_global_serial = hist_cen_global.serialize() + return hist_obs_global_serial, hist_cen_global_serial + + def distribute_global_hist(self, hist_obs_global_serial, hist_cen_global_serial): + self.logger.info("send global accumulated histograms within HE to all sites \n") + + model = FLModel( + params={"hist_obs_global": hist_obs_global_serial, "hist_cen_global": hist_cen_global_serial}, + params_type=ParamsType.FULL, + start_round=1, + current_round=3, + total_rounds=self.num_rounds, + ) + + results = self.send_model_and_wait(data=model) + return results diff --git a/examples/advanced/kaplan-meier-he/utils/prepare_data.py b/examples/advanced/kaplan-meier-he/utils/prepare_data.py new file mode 100644 index 0000000000..66684a1b4b --- /dev/null +++ b/examples/advanced/kaplan-meier-he/utils/prepare_data.py @@ -0,0 +1,88 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import os + +import numpy as np +import pandas as pd +from sksurv.datasets import load_veterans_lung_cancer + +np.random.seed(77) + + +def data_split_args_parser(): + parser = argparse.ArgumentParser(description="Generate data split for dataset") + parser.add_argument("--site_num", type=int, default=5, help="Total number of sites, default is 5") + parser.add_argument( + "--site_name_prefix", + type=str, + default="site-", + help="Site name prefix, default is site-", + ) + parser.add_argument("--out_path", type=str, help="Output root path for split data files") + return parser + + +def prepare_data(data, site_num, bin_days: int = 7): + # Get total data count + total_data_num = data.shape[0] + print(f"Total data count: {total_data_num}") + # Get event and time + event = data["Status"] + time = data["Survival_in_days"] + # Categorize data to a bin, default is a week (7 days) + time = np.ceil(time / bin_days).astype(int) + # Shuffle data + idx = np.random.permutation(total_data_num) + # Split data to clients + event_clients = {} + time_clients = {} + for i in range(site_num): + start = int(i * total_data_num / site_num) + end = int((i + 1) * total_data_num / site_num) + event_i = event[idx[start:end]] + time_i = time[idx[start:end]] + event_clients["site-" + str(i + 1)] = event_i + time_clients["site-" + str(i + 1)] = time_i + return event_clients, time_clients + + +def main(): + parser = data_split_args_parser() + args = parser.parse_args() + + # Load data + # For this KM analysis, we use full timeline and event label only + _, data = load_veterans_lung_cancer() + + # Prepare data + event_clients, time_clients = prepare_data(data=data, site_num=args.site_num) + + # Save data to csv files + if not os.path.exists(args.out_path): + os.makedirs(args.out_path, exist_ok=True) + for site in range(args.site_num): + output_file = os.path.join(args.out_path, f"{args.site_name_prefix}{site + 1}.csv") + df = pd.DataFrame( + { + "event": event_clients["site-" + str(site + 1)], + "time": time_clients["site-" + str(site + 1)], + } + ) + df.to_csv(output_file, index=False) + + +if __name__ == "__main__": + main() diff --git a/examples/advanced/kaplan-meier-he/utils/prepare_he_context.py b/examples/advanced/kaplan-meier-he/utils/prepare_he_context.py new file mode 100644 index 0000000000..ceedf4c9a4 --- /dev/null +++ b/examples/advanced/kaplan-meier-he/utils/prepare_he_context.py @@ -0,0 +1,62 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import base64 +import os + +import tenseal as ts + + +def data_split_args_parser(): + parser = argparse.ArgumentParser(description="Generate HE context") + parser.add_argument("--scheme", type=str, default="BFV", help="HE scheme, default is BFV") + parser.add_argument("--poly_modulus_degree", type=int, default=4096, help="Poly modulus degree, default is 4096") + parser.add_argument("--out_path", type=str, help="Output root path for HE context files for client and server") + return parser + + +def write_data(file_name: str, data: bytes): + data = base64.b64encode(data) + with open(file_name, "wb") as f: + f.write(data) + + +def main(): + parser = data_split_args_parser() + args = parser.parse_args() + if args.scheme == "BFV": + scheme = ts.SCHEME_TYPE.BFV + # Generate HE context + context = ts.context(scheme, poly_modulus_degree=args.poly_modulus_degree, plain_modulus=1032193) + elif args.scheme == "CKKS": + scheme = ts.SCHEME_TYPE.CKKS + # Generate HE context, CKKS does not need plain_modulus + context = ts.context(scheme, poly_modulus_degree=args.poly_modulus_degree) + else: + raise ValueError("HE scheme not supported") + + # Save HE context to file for client + if not os.path.exists(args.out_path): + os.makedirs(args.out_path, exist_ok=True) + context_serial = context.serialize(save_secret_key=True) + write_data(os.path.join(args.out_path, "he_context_client.txt"), context_serial) + + # Save HE context to file for server + context_serial = context.serialize(save_secret_key=False) + write_data(os.path.join(args.out_path, "he_context_server.txt"), context_serial) + + +if __name__ == "__main__": + main() diff --git a/nvflare/app_common/workflows/wf_controller.py b/nvflare/app_common/workflows/wf_controller.py index 4847e1a45c..668bd6e348 100644 --- a/nvflare/app_common/workflows/wf_controller.py +++ b/nvflare/app_common/workflows/wf_controller.py @@ -23,13 +23,13 @@ class WFController(ModelController, ABC): def __init__( self, *args, - persistor_id: str = "persistor", + persistor_id: str = "", **kwargs, ): """Workflow Controller API for FLModel-based ModelController. Args: - persistor_id (str, optional): ID of the persistor component. Defaults to "persistor". + persistor_id (str, optional): ID of the persistor component. Defaults to "". """ super().__init__(*args, persistor_id, **kwargs)