Skip to content

Commit

Permalink
Add SubprocessLauncher + PTFilePipeLauncherExecutor + NVFlare Client …
Browse files Browse the repository at this point in the history
…API example
  • Loading branch information
YuanTingHsieh committed Jul 19, 2023
1 parent e9e527b commit 871f2d4
Show file tree
Hide file tree
Showing 12 changed files with 365 additions and 0 deletions.
24 changes: 24 additions & 0 deletions examples/advanced/ml-to-fl/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# ML to FL trasition with NVFlare

Converting Machine Learning or Deep Learning to FL is not easy, as it involves:

1. Algorithms formulation, how to formulate a ML/DL to FL algorithm and what information needs to be pass between Client and Server

2. Convert existing standalone, centralized ML/DL code to FL code.

3. Configure the workflow to use the newly changed code.

In this example, we assume #1 algorithm formulation is fixed (FedAvg).
We are showing #2, that is how to quickly convert the centralized DL to FL.
We will demonstrate different techniques depending the existing code structure and preferences.

For #3 one can reference to the config we have here and the documentation.

In this directory, we are providing job configurations to showcase how to utilizes
`LauncherExecutor`, `Launcher` and several NVFlare interfaces to simplify the
transition from your ML code to FL with NVFlare.


## Examples

- [client_api](./jobs/client_api/) Use `SubprocessLauncher`, `PTFilePipeLauncherExecutor` and NVFlare Client API to do federated learning with CIFAR10 dataset and PyTorch framework
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"exchange_path": "./",
"exchange_format": "pytorch",
"params_type": "DIFF",
"params_diff_func": "numerical_params_diff"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"format_version": 2,

"executors": [
{
"tasks": ["train"],
"executor": {
"name": "PTFilePipeLauncherExecutor",
"args": {
"launcher_id": "launcher"
}
}
}
],
"task_result_filters": [
],
"task_data_filters": [
],
"components": [
{
"id": "launcher",
"name": "SubprocessLauncher",
"args": {
"script": "python custom/cifar10.py --epochs 1"
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
"format_version": 2,

"server": {
"heart_beat_timeout": 600
},
"task_data_filters": [],
"task_result_filters": [],
"components": [
{
"id": "persistor",
"name": "PTFileModelPersistor",
"args": {
"model": {
"path": "net.Net"
}
}
},
{
"id": "shareable_generator",
"path": "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator",
"args": {}
},
{
"id": "aggregator",
"path": "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator",
"args": {
"expected_data_kind": "WEIGHT_DIFF"
}
}
],
"workflows": [
{
"id": "scatter_and_gather",
"name": "ScatterAndGather",
"args": {
"min_clients" : 2,
"num_rounds" : 2,
"start_round": 0,
"wait_time_after_min_received": 0,
"aggregator_id": "aggregator",
"persistor_id": "persistor",
"shareable_generator_id": "shareable_generator",
"train_task_name": "train",
"train_timeout": 0
}
}
]
}
103 changes: 103 additions & 0 deletions examples/advanced/ml-to-fl/jobs/client_api/app/custom/cifar10.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from net import Net

import nvflare.client as flare

DATASET_PATH = "/tmp/nvflare/data"
device = "cuda:0"

transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
batch_size = 4

trainset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2)


net = Net()

# initializes NVFlare interface
flare.init(config="config/config_exchange.json")
input_model, input_meta = flare.receive_model()

# get model from NVFlare
net.load_state_dict(input_model)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)


net.to(device)
for epoch in range(2): # loop over the dataset multiple times

running_loss = 0.0
for i, data in enumerate(trainloader, 0):
# get the inputs; data is a list of [inputs, labels]
inputs, labels = data[0].to(device), data[1].to(device)

# zero the parameter gradients
optimizer.zero_grad()

# forward + backward + optimize
outputs = net(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()

# print statistics
running_loss += loss.item()
if i % 2000 == 1999: # print every 2000 mini-batches
print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}")
running_loss = 0.0
break

print("Finished Training")


PATH = "./cifar_net.pth"
torch.save(net.state_dict(), PATH)


net = Net()
net.load_state_dict(input_model)
net.to(device)


correct = 0
total = 0
# since we're not training, we don't need to calculate the gradients for our outputs
with torch.no_grad():
for data in testloader:
images, labels = data[0].to(device), data[1].to(device)
# calculate outputs by running images through the network
outputs = net(images)
# the class with the highest energy is what we choose as prediction
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()

print(f"Accuracy of the network on the 10000 test images: {100 * correct // total} %")

net.load_state_dict(torch.load(PATH))
flare.submit_metrics({"accuracy": 100 * correct // total})
flare.submit_model(net.cpu().state_dict())
37 changes: 37 additions & 0 deletions examples/advanced/ml-to-fl/jobs/client_api/app/custom/net.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import torch
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)

def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = torch.flatten(x, 1) # flatten all dimensions except batch
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
10 changes: 10 additions & 0 deletions examples/advanced/ml-to-fl/jobs/client_api/meta.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"name": "subprocess with file pipe",
"resource_spec": {},
"min_clients" : 2,
"deploy_map": {
"app": [
"@ALL"
]
}
}
3 changes: 3 additions & 0 deletions examples/advanced/ml-to-fl/prepare_data.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DATASET_ROOT="/tmp/cifar10"

python -c "import torchvision.datasets as datasets; datasets.CIFAR10(root='${DATASET_ROOT}', train=True, download=True)"
1 change: 1 addition & 0 deletions examples/advanced/ml-to-fl/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
nvflare[PT]>=2.4.0
73 changes: 73 additions & 0 deletions nvflare/app_opt/pt/file_pipe_launcher_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from typing import Optional

from nvflare.app_common.executors.file_pipe_launcher_executor import FilePipeLauncherExecutor
from nvflare.app_opt.pt.decomposers import TensorDecomposer
from nvflare.app_opt.pt.params_converter import NumpyToPTParamsConverter, PTToNumpyParamsConverter
from nvflare.fuel.utils import fobs


class PTFilePipeLauncherExecutor(FilePipeLauncherExecutor):
def __init__(
self,
data_exchange_path: Optional[str] = None,
pipe_id: Optional[str] = None,
pipe_name: str = "pipe",
launcher_id: Optional[str] = None,
launch_timeout: Optional[float] = None,
task_wait_time: Optional[float] = None,
task_read_wait_time: Optional[float] = 30.0,
result_poll_interval: float = 0.1,
read_interval: float = 0.1,
heartbeat_interval: float = 5.0,
heartbeat_timeout: float = 30.0,
workers: int = 1,
) -> None:
"""Initializes the PTFilePipeLauncherExecutor.
Args:
data_exchange_path (Optional[str]): Path used for data exchange. If None, "app_dir" will be used.
If pipe_id is provided, will use the Pipe gets from pipe_id.
pipe_id (Optional[str]): Identifier used to get the Pipe from NVFlare components.
pipe_name (str): Name of the pipe. Defaults to "pipe".
launcher_id (Optional[str]): Identifier used to get the Launcher from NVFlare components.
launch_timeout (Optional[float]): Timeout for the "launch" method to end. None means forever.
task_wait_time (Optional[float]): Time to wait for tasks to complete before exiting the executor.
task_read_wait_time (Optional[float]): Time to wait for task results from the pipe. Defaults to 30.0.
result_poll_interval (float): Interval for polling task results from the pipe. Defaults to 0.1.
read_interval (float): Interval for reading from the pipe. Defaults to 0.1.
heartbeat_interval (float): Interval for sending heartbeat to the peer. Defaults to 5.0.
heartbeat_timeout (float): Timeout for waiting for a heartbeat from the peer. Defaults to 30.0.
workers (int): Number of worker threads needed.
"""
super().__init__(
pipe_id=pipe_id,
pipe_name=pipe_name,
launcher_id=launcher_id,
launch_timeout=launch_timeout,
task_wait_time=task_wait_time,
task_read_wait_time=task_read_wait_time,
result_poll_interval=result_poll_interval,
read_interval=read_interval,
heartbeat_interval=heartbeat_interval,
heartbeat_timeout=heartbeat_timeout,
workers=workers,
)
fobs.register(TensorDecomposer)
self._data_exchange_path = data_exchange_path
self._from_nvflare_converter = NumpyToPTParamsConverter()
self._to_nvflare_converter = PTToNumpyParamsConverter()
29 changes: 29 additions & 0 deletions nvflare/app_opt/pt/params_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict

import torch

from nvflare.app_common.utils.fl_model_utils import ParamsConverter


class NumpyToPTParamsConverter(ParamsConverter):
def convert(self, params: Dict) -> Dict:
return {k: torch.as_tensor(v) for k, v in params.items()}


class PTToNumpyParamsConverter(ParamsConverter):
def convert(self, params: Dict) -> Dict:
return {k: v.cpu().numpy() for k, v in params.items()}
2 changes: 2 additions & 0 deletions tests/integration_test/example_registry.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ examples:
prepare_data_script: prepare_data.sh
- root: advanced/federated-statistics/df_stats
prepare_data_script: prepare_data.sh
- root: advanced/ml-to-fl
prepare_data_script: prepare_data.sh

0 comments on commit 871f2d4

Please sign in to comment.