Add missing client api test jobs (NVIDIA#2535)
YuanTingHsieh authored and nvidianz committed May 6, 2024
1 parent 38312a1 commit 49655e6
Showing 16 changed files with 695 additions and 2 deletions.
@@ -0,0 +1,77 @@
{
  format_version = 2
  app_script = "cifar10_structured_fl.py"
  app_config = ""
  executors = [
    {
      tasks = [
        "train"
      ]
      executor {
        path = "nvflare.app_opt.pt.client_api_launcher_executor.PTClientAPILauncherExecutor"
        args {
          launcher_id = "launcher"
          pipe_id = "pipe"
          heartbeat_timeout = 60
          params_exchange_format = "pytorch"
          params_transfer_type = "DIFF"
          train_with_evaluation = true
        }
      }
    }
  ]
  task_data_filters = []
  task_result_filters = []
  components = [
    {
      id = "launcher"
      path = "nvflare.app_common.launchers.subprocess_launcher.SubprocessLauncher"
      args {
        script = "python3 -u custom/{app_script} {app_config} "
        launch_once = true
      }
    }
    {
      id = "pipe"
      path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe"
      args {
        mode = "PASSIVE"
        site_name = "{SITE_NAME}"
        token = "{JOB_ID}"
        root_url = "{ROOT_URL}"
        secure_mode = "{SECURE_MODE}"
        workspace_dir = "{WORKSPACE}"
      }
    }
    {
      id = "metrics_pipe"
      path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe"
      args {
        mode = "PASSIVE"
        site_name = "{SITE_NAME}"
        token = "{JOB_ID}"
        root_url = "{ROOT_URL}"
        secure_mode = "{SECURE_MODE}"
        workspace_dir = "{WORKSPACE}"
      }
    }
    {
      id = "metric_relay"
      path = "nvflare.app_common.widgets.metric_relay.MetricRelay"
      args {
        pipe_id = "metrics_pipe"
        event_type = "fed.analytix_log_stats"
        read_interval = 0.1
      }
    }
    {
      id = "config_preparer"
      path = "nvflare.app_common.widgets.external_configurator.ExternalConfigurator"
      args {
        component_ids = [
          "metric_relay"
        ]
      }
    }
  ]
}
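The metrics_pipe, metric_relay, and config_preparer components in this client config exist so the launched training subprocess can stream scalar metrics back through the pipe to the server's TensorBoard receiver via the fed.analytix_log_stats event. The test script in this commit does not stream metrics itself; as a hedged sketch only, a launched script could do so roughly as below, assuming the nvflare.client.tracking.SummaryWriter wrapper is available in the installed NVFlare release.

# Hedged sketch (not part of this commit): streaming scalars from the launched
# subprocess through metrics_pipe -> metric_relay -> the server-side receiver.
# Assumes nvflare.client.tracking.SummaryWriter; adjust to the installed release.
import nvflare.client as flare
from nvflare.client.tracking import SummaryWriter

flare.init()
writer = SummaryWriter()  # mirrors the torch.utils.tensorboard.SummaryWriter API

while flare.is_running():
    input_model = flare.receive()
    for step in range(10):  # stand-in for the real training loop
        loss = 1.0 / (step + 1)  # placeholder value
        writer.add_scalar("train_loss", loss, step)
    # send a model back so the round can complete
    flare.send(flare.FLModel(params=input_model.params))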
@@ -0,0 +1,62 @@
{
  format_version = 2
  task_data_filters = []
  task_result_filters = []
  model_class_path = "net.Net"
  workflows = [
    {
      id = "scatter_and_gather"
      path = "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather"
      args {
        min_clients = 2
        num_rounds = 2
        start_round = 0
        wait_time_after_min_received = 0
        aggregator_id = "aggregator"
        persistor_id = "persistor"
        shareable_generator_id = "shareable_generator"
        train_task_name = "train"
        train_timeout = 0
      }
    }
  ]
  components = [
    {
      id = "persistor"
      path = "nvflare.app_opt.pt.file_model_persistor.PTFileModelPersistor"
      args {
        model {
          path = "{model_class_path}"
        }
      }
    }
    {
      id = "shareable_generator"
      path = "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator"
      args {}
    }
    {
      id = "aggregator"
      path = "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator"
      args {
        expected_data_kind = "WEIGHT_DIFF"
      }
    }
    {
      id = "model_selector"
      path = "nvflare.app_common.widgets.intime_model_selector.IntimeModelSelector"
      args {
        key_metric = "accuracy"
      }
    }
    {
      id = "receiver"
      path = "nvflare.app_opt.tracking.tb.tb_receiver.TBAnalyticsReceiver"
      args {
        events = [
          "fed.analytix_log_stats"
        ]
      }
    }
  ]
}
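Note how the two configs line up: the client uses params_transfer_type = "DIFF", so each site returns weight deltas, which is why the aggregator here is configured with expected_data_kind = "WEIGHT_DIFF", and the NUM_STEPS_CURRENT_ROUND meta sent by the client is (as I understand it) what weights each contribution. The sketch below is only an illustrative mental model of that combination step, not NVFlare's actual InTimeAccumulateWeightedAggregator code; the function and parameter names are assumptions.

# Illustrative sketch of weighted aggregation of weight differences.
# Not NVFlare's implementation; names and weighting key are assumed.
from typing import Dict, List, Tuple

import torch


def aggregate_diffs(
    global_weights: Dict[str, torch.Tensor],
    client_updates: List[Tuple[Dict[str, torch.Tensor], int]],  # (weight_diff, n_steps)
) -> Dict[str, torch.Tensor]:
    total_steps = sum(n for _, n in client_updates)
    new_weights = {}
    for name, w in global_weights.items():
        # average the per-client diffs, weighted by the steps each client contributed
        weighted_diff = sum(diff[name] * (n / total_steps) for diff, n in client_updates)
        new_weights[name] = w + weighted_diff
    return new_weights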
@@ -0,0 +1,139 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from net import Net

# (1) import nvflare client API
import nvflare.client as flare

# (optional) set a fixed location so we don't need to download every time
DATASET_PATH = "/tmp/nvflare/data"
# (optional) We use GPU to speed things up.
# if you want to use CPU, change DEVICE="cpu"
DEVICE = "cuda:0"
PATH = "./cifar_net.pth"


def main():
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

    batch_size = 4

    trainset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=True, download=True, transform=transform)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)

    testset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=False, download=True, transform=transform)
    testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2)

    net = Net()

    # (2) initializes NVFlare client API
    flare.init()

    # (3) decorates with flare.train and loads the model from the first argument
    # wraps training logic into a method
    @flare.train
    def train(input_model=None, total_epochs=2, lr=0.001):
        net.load_state_dict(input_model.params)

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.9)

        # (optional) use GPU to speed things up
        net.to(DEVICE)
        # (optional) calculate total steps
        steps = total_epochs * len(trainloader)

        for epoch in range(total_epochs):  # loop over the dataset multiple times

            running_loss = 0.0
            for i, data in enumerate(trainloader, 0):
                # get the inputs; data is a list of [inputs, labels]
                # (optional) use GPU to speed things up
                inputs, labels = data[0].to(DEVICE), data[1].to(DEVICE)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward + backward + optimize
                outputs = net(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                # print statistics
                running_loss += loss.item()
                if i % 2000 == 1999:  # print every 2000 mini-batches
                    print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}")
                    running_loss = 0.0

        print("Finished Training")

        torch.save(net.state_dict(), PATH)

        # (4) construct trained FL model
        output_model = flare.FLModel(params=net.cpu().state_dict(), meta={"NUM_STEPS_CURRENT_ROUND": steps})
        return output_model

    # (5) decorates with flare.evaluate and loads the model from the first argument
    @flare.evaluate
    def fl_evaluate(input_model=None):
        return evaluate(input_weights=input_model.params)

    # wraps evaluate logic into a method
    def evaluate(input_weights):
        net.load_state_dict(input_weights)
        # (optional) use GPU to speed things up
        net.to(DEVICE)

        correct = 0
        total = 0
        # since we're not training, we don't need to calculate the gradients for our outputs
        with torch.no_grad():
            for data in testloader:
                # (optional) use GPU to speed things up
                images, labels = data[0].to(DEVICE), data[1].to(DEVICE)
                # calculate outputs by running images through the network
                outputs = net(images)
                # the class with the highest energy is what we choose as prediction
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        # return evaluation metrics
        return 100 * correct // total

    while flare.is_running():
        # (6) receives FLModel from NVFlare
        input_model = flare.receive()
        print(f"current_round={input_model.current_round}")

        # (7) call fl_evaluate method before training
        # to evaluate on the received/aggregated model
        global_metric = fl_evaluate(input_model)
        print(f"Accuracy of the global model on the 10000 test images: {global_metric} %")
        # call train method
        train(input_model, total_epochs=2, lr=0.001)
        # call evaluate method
        metric = evaluate(input_weights=torch.load(PATH))
        print(f"Accuracy of the trained model on the 10000 test images: {metric} %")


if __name__ == "__main__":
    main()
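The @flare.train and @flare.evaluate decorators above hide the receive/send exchange: the decorated function is handed the global FLModel for the round, and the FLModel (or metric) it returns is sent back to the server. For orientation only, a roughly equivalent non-decorator loop is sketched below with the local training elided; this is the general Client API pattern, not part of the commit, and the placeholder variables are assumptions.

# Hedged sketch: the same exchange with the plain Client API instead of the
# @flare.train / @flare.evaluate decorators; local training is elided.
import nvflare.client as flare
from net import Net

net = Net()
flare.init()
while flare.is_running():
    input_model = flare.receive()  # global model for this round
    net.load_state_dict(input_model.params)

    # ... local training and evaluation on CIFAR-10 would go here ...
    steps = 0        # placeholder: number of local steps actually run
    accuracy = 0.0   # placeholder: metric computed on the local test set

    flare.send(
        flare.FLModel(
            params=net.cpu().state_dict(),
            metrics={"accuracy": accuracy},
            meta={"NUM_STEPS_CURRENT_ROUND": steps},
        )
    )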
37 changes: 37 additions & 0 deletions tests/integration_test/data/jobs/decorator/app/custom/net.py
@@ -0,0 +1,37 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import torch
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1)  # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
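As a quick sanity check (not part of the commit), the network accepts CIFAR-10-sized inputs and produces one logit per class:

# Shape check for Net on a CIFAR-10-sized batch (illustrative only).
import torch

from net import Net

model = Net()
dummy = torch.randn(2, 3, 32, 32)  # batch of two 32x32 RGB images
logits = model(dummy)
print(logits.shape)  # expected: torch.Size([2, 10])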
11 changes: 11 additions & 0 deletions tests/integration_test/data/jobs/decorator/meta.conf
@@ -0,0 +1,11 @@
{
  name = "decorator"
  resource_spec {}
  deploy_map {
    app = [
      "@ALL"
    ]
  }
  min_clients = 2
  mandatory_clients = []
}