[Bug]: Collaborator private attributes were accessible from Aggregator steps #31

Merged
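This PR stops aggregator steps from reading collaborator private attributes. The core change is in openfl/experimental/runtime/local_runtime.py: the runtime's collaborator map moves from the single-underscore _collaborators to the name-mangled __collaborators, every caller switches to the public collaborators property (which exposes names only), and execute_task scrubs private attributes off a collaborator's clone as soon as its step finishes. The rest of the diff is line-length reformatting in the tutorials and tests, plus notebook metadata churn. Below is a minimal, self-contained sketch of the encapsulation pattern, using hypothetical stand-in classes rather than the actual OpenFL implementation:

# Hedged sketch of the fix's encapsulation pattern (hypothetical classes,
# not the real OpenFL Collaborator/LocalRuntime).
class Collaborator:
    def __init__(self, name, **private_attributes):
        self.name = name
        self.private_attributes = private_attributes


class LocalRuntime:
    def __init__(self, collaborators=None):
        if collaborators is not None:
            self.collaborators = collaborators  # invokes the setter below

    @property
    def collaborators(self):
        # Expose names only, never the objects carrying private attributes.
        return list(self.__collaborators.keys())

    @collaborators.setter
    def collaborators(self, collaborators):
        # __collaborators is name-mangled to _LocalRuntime__collaborators,
        # so runtime._collaborators no longer resolves from the outside.
        self.__collaborators = {c.name: c for c in collaborators}


runtime = LocalRuntime([Collaborator("Portland", train_loader="private")])
print(runtime.collaborators)   # ['Portland']
# runtime._collaborators       # AttributeError after this PR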
@@ -117,7 +117,10 @@ def FedAvg(models, previous_global_model=None, dp_params=None):
        for key, tensor in state.items():
            per_layer_norms.append(torch.norm(tensor))

-        if torch.norm(torch.Tensor(per_layer_norms)) > dp_params["clip_norm"]:
+        if (
+            torch.norm(torch.Tensor(per_layer_norms))
+            > dp_params["clip_norm"]
+        ):
            raise ValueError(
                f"The model with index {idx} had update whose "
                + "L2-norm was greater than clip norm."
@@ -305,7 +308,9 @@ def start(self):
            self.sample_rate = self.dp_params["sample_rate"]
            global_data_loader = DataLoader(
                self.collaborators,
-                batch_size=int(self.sample_rate * float(len(self.collaborators))),
+                batch_size=int(
+                    self.sample_rate * float(len(self.collaborators))
+                ),
            )
            dp_data_loader = DPDataLoader.from_data_loader(
                global_data_loader, distributed=False
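Reviewer note: the DataLoader being wrapped here drives per-round collaborator selection. DPDataLoader.from_data_loader swaps the loader's sampler for Poisson sampling, so each collaborator is picked independently with probability sample_rate each round. A hedged, minimal illustration (integer ids stand in for collaborator objects; assumes opacus is installed):

from opacus.data_loader import DPDataLoader
from torch.utils.data import DataLoader

collaborator_ids = [0, 1, 2, 3]  # stand-ins for collaborator objects
sample_rate = 0.5

global_data_loader = DataLoader(
    collaborator_ids,
    batch_size=int(sample_rate * float(len(collaborator_ids))),
)
dp_data_loader = DPDataLoader.from_data_loader(
    global_data_loader, distributed=False
)

# Each pass yields one Poisson-sampled batch; its size varies around
# sample_rate * len(collaborator_ids) and can even be empty for a round,
# which is why the flow prints "No collaborator selected" below.
for selected in dp_data_loader:
    print(selected)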
@@ -333,10 +338,14 @@ def start(self):
                    exclude=["private"],
                )
            else:
-                print(f"No collaborator selected for training at Round: {self.round}")
+                print(
+                    f"No collaborator selected for training at Round: {self.round}"
+                )
                self.next(self.check_round_completion)
        else:
-            print(f"No collaborator selected for training at Round: {self.round}")
+            print(
+                f"No collaborator selected for training at Round: {self.round}"
+            )
            self.next(self.check_round_completion)

        # Uncomment this if you don't have GPU in the machine and
@@ -350,9 +359,13 @@ def aggregated_model_validation(self):

        # verifying that model went to the correct GPU device
        assert next(self.model.parameters()).device == self.device
-        assert next(self.previous_global_model.parameters()).device == self.device
+        assert (
+            next(self.previous_global_model.parameters()).device == self.device
+        )

-        self.agg_validation_score = inference(self.model, self.test_loader, self.device)
+        self.agg_validation_score = inference(
+            self.model, self.test_loader, self.device
+        )
        print(f"{self.input} value of {self.agg_validation_score}")
        self.collaborator_name = self.input
        self.next(self.train)
@@ -390,7 +403,8 @@ def train(self):

        if self.clip_test:
            optimizer_before_step_params = [
-                param.data for param in self.optimizer.param_groups()[0]["params"]
+                param.data
+                for param in self.optimizer.param_groups()[0]["params"]
            ]

        self.optimizer.step(
@@ -405,7 +419,9 @@
            if self.clip_test:
                optimizer_after_step_params = [
                    param.data
-                    for param in self.optimizer.param_groups()[0]["params"]
+                    for param in self.optimizer.param_groups()[0][
+                        "params"
+                    ]
                ]
                clip_testing_on_optimizer_parameters(
                    optimizer_before_step_params,
@@ -451,7 +467,9 @@ def join(self, inputs):
            f"Average aggregated model validation values = {self.aggregated_model_accuracy}"
        )
        print(f"Average training loss = {self.average_loss}")
-        print(f"Average local model validation values = {self.local_model_accuracy}")
+        print(
+            f"Average local model validation values = {self.local_model_accuracy}"
+        )
        if self.dp_params is not None:
            self.model = FedAvg(
                [input.model.cpu() for input in inputs],
@@ -478,7 +496,9 @@ def join(self, inputs):
            + f"is epsilon={epsilon} (best alpha was: {best_alpha})."
        )
        print(20 * "#")
-        self.previous_global_model.load_state_dict(deepcopy(self.model.state_dict()))
+        self.previous_global_model.load_state_dict(
+            deepcopy(self.model.state_dict())
+        )
        self.optimizers.update(
            {input.collaborator_name: input.optimizer for input in inputs}
        )
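Reviewer note on the epsilon/best_alpha report above: the aggregator tracks the privacy budget spent across rounds of noised global aggregation. A hedged sketch of the kind of Rényi-DP accounting involved, using Opacus's RDPAccountant with purely illustrative parameters (not necessarily the tutorial's exact accounting path):

from opacus.accountants import RDPAccountant

# Illustrative parameters; the tutorial takes these from dp_params.
noise_multiplier = 1.1
sample_rate = 0.5
rounds = 10
delta = 1e-5

accountant = RDPAccountant()
for _ in range(rounds):  # one accounting step per completed round
    accountant.step(noise_multiplier=noise_multiplier, sample_rate=sample_rate)

epsilon, best_alpha = accountant.get_privacy_spent(delta=delta)
print(f"Spent privacy budget is epsilon={epsilon} (best alpha was: {best_alpha}).")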
@@ -501,7 +521,9 @@ def check_round_completion(self):
        if self.dp_params is not None:
            global_data_loader = DataLoader(
                self.collaborators,
-                batch_size=int(self.sample_rate * float(len(self.collaborators))),
+                batch_size=int(
+                    self.sample_rate * float(len(self.collaborators))
+                ),
            )
            dp_data_loader = DPDataLoader.from_data_loader(
                global_data_loader, distributed=False
@@ -538,7 +560,9 @@ def check_round_completion(self):
                )
                self.next(self.check_round_completion)
            else:
-                print(f"No collaborator selected for training at Round: {self.round}")
+                print(
+                    f"No collaborator selected for training at Round: {self.round}"
+                )
                self.next(self.check_round_completion)

    @aggregator
@@ -608,8 +632,10 @@ def end(self):
        ),
    }

-    local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)
-    print(f"Local runtime collaborators = {local_runtime._collaborators}")
+    local_runtime = LocalRuntime(
+        aggregator=aggregator, collaborators=collaborators
+    )
+    print(f"Local runtime collaborators = {local_runtime.collaborators}")

    top_model_accuracy = 0
    model = Net()
@@ -647,7 +647,7 @@ def end(self):
    }

    local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)
-    print(f"Local runtime collaborators = {local_runtime._collaborators}")
+    print(f"Local runtime collaborators = {local_runtime.collaborators}")
    best_model = None
    initial_model = Net()
    top_model_accuracy = 0
4 changes: 2 additions & 2 deletions openfl-tutorials/experimental/Privacy_Meter/cifar10_PM.py
@@ -711,8 +711,8 @@ def end(self):
    # To activate the ray backend with parallel collaborator tasks run in their own process
    # and exclusive GPUs assigned to tasks, set LocalRuntime with backend='ray':
    local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)
-
-    print(f'Local runtime collaborators = {local_runtime._collaborators}')
+    print(f'Local runtime collaborators = {local_runtime.collaborators}')
+
    # change to the internal flow loop
    model = Net()
@@ -165,7 +165,7 @@
    "\n",
    "local_runtime = LocalRuntime(\n",
    "    aggregator=aggregator, collaborators=collaborators, backend='single_process')\n",
-    "print(f'Local runtime collaborators = {local_runtime._collaborators}')\n",
+    "print(f'Local runtime collaborators = {local_runtime.collaborators}')\n",
    "\n",
    "epochs = 100\n",
    "batch_num = 0\n",
@@ -130,7 +130,7 @@
    "\n",
    "local_runtime = LocalRuntime(\n",
    "    aggregator=aggregator, collaborators=collaborators)\n",
-    "print(f'Local runtime collaborators = {local_runtime._collaborators}')\n",
+    "print(f'Local runtime collaborators = {local_runtime.collaborators}')\n",
    "\n",
    "vflow = VerticalFlow(checkpoint=True)\n",
    "vflow.runtime = local_runtime\n",
@@ -327,7 +327,7 @@
    "    }\n",
    "\n",
    "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='single_process')\n",
-    "print(f'Local runtime collaborators = {local_runtime._collaborators}')"
+    "print(f'Local runtime collaborators = {local_runtime.collaborators}')"
   ]
  },
  {
@@ -319,7 +319,7 @@
    "    }\n",
    "\n",
    "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='single_process')\n",
-    "print(f'Local runtime collaborators = {local_runtime._collaborators}')"
+    "print(f'Local runtime collaborators = {local_runtime.collaborators}')"
   ]
  },
  {
@@ -313,7 +313,7 @@
    "# The following is equivalent to\n",
    "# local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, **backend='ray'**)\n",
    "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)\n",
-    "print(f'Local runtime collaborators = {local_runtime._collaborators}')"
+    "print(f'Local runtime collaborators = {local_runtime.collaborators}')"
   ]
  },
  {
@@ -611,7 +611,7 @@
 ],
 "metadata": {
  "kernelspec": {
-   "display_name": "Python 3 (ipykernel)",
+   "display_name": "openfl_github_collab_private",
   "language": "python",
   "name": "python3"
  },
@@ -625,7 +625,12 @@
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
-   "version": "3.8.13"
-  }
+   "version": "3.8.15 (default, Nov 24 2022, 15:19:38) \n[GCC 11.2.0]"
+  },
+  "vscode": {
+   "interpreter": {
+    "hash": "a9b3ea793f0a9a343a81a73b472831cd604f7c2c0cb7677aa4f9120271015e80"
+   }
+  }
 },
 "nbformat": 4,
@@ -764,7 +764,7 @@
    "    }\n",
    "\n",
    "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators)\n",
-    "print(f\"Local runtime collaborators = {local_runtime._collaborators}\")"
+    "print(f\"Local runtime collaborators = {local_runtime.collaborators}\")"
   ]
  },
  {
@@ -854,9 +854,9 @@
 ],
 "metadata": {
  "kernelspec": {
-   "display_name": "py3.8",
+   "display_name": "Python 3",
   "language": "python",
-   "name": "py3.8"
+   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
@@ -868,11 +868,11 @@
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
-   "version": "3.8.13"
+   "version": "3.8.10 (default, Jun 22 2022, 20:18:18) \n[GCC 9.4.0]"
  },
  "vscode": {
   "interpreter": {
-   "hash": "91e603971e1614a722a6cf110d916339d8ca5d12a13bb23038132a65313451b3"
+   "hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1"
   }
  }
 },
15 changes: 9 additions & 6 deletions openfl/experimental/runtime/local_runtime.py
@@ -37,7 +37,8 @@ def __init__(
            dp = kwargs.get("dashboard_port", 5252)
            ray.init(dashboard_host=dh, dashboard_port=dp)
        self.backend = backend
-        self.aggregator = aggregator
+        if aggregator is not None:
+            self.aggregator = aggregator
        # List of envoys should be iterable, so that a subset can be selected at runtime
        # The envoys is the superset of envoys that can be selected during the experiment
        if collaborators is not None:
@@ -58,12 +59,12 @@ def collaborators(self):
        """
        Return names of collaborators. Don't give direct access to private attributes
        """
-        return list(self._collaborators.keys())
+        return list(self.__collaborators.keys())

    @collaborators.setter
    def collaborators(self, collaborators):
        """Set LocalRuntime collaborators"""
-        self._collaborators = {
+        self.__collaborators = {
            collaborator.name: collaborator for collaborator in collaborators
        }

@@ -129,7 +130,7 @@ def execute_task(
                # object will not contain private attributes of
                # aggregator or other collaborators
                clone.runtime = LocalRuntime(backend="single_process")
-                for name, attr in self._collaborators[
+                for name, attr in self.__collaborators[
                    clone.input
                ].private_attributes.items():
                    setattr(clone, name, attr)
@@ -155,9 +156,11 @@ def execute_task(
            for col in selected_collaborators:
                clone = FLSpec._clones[col]
                func = clone.execute_next
-                for attr in self._collaborators[clone.input].private_attributes:
+                for attr in self.__collaborators[
+                    clone.input
+                ].private_attributes:
                    if hasattr(clone, attr):
-                        self._collaborators[clone.input].private_attributes[
+                        self.__collaborators[clone.input].private_attributes[
                            attr
                        ] = getattr(clone, attr)
                        delattr(clone, attr)
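Reviewer note: together with the earlier execute_task hunk, this implements a set-use-scrub pattern. Private attributes are attached to the collaborator's clone right before its step runs, then harvested back into the name-mangled map and deleted from the clone afterwards, so nothing private survives into aggregator steps. A simplified, hypothetical sketch of that pattern outside OpenFL:

# Hedged sketch of the set-use-scrub pattern in execute_task (illustrative
# names; not the actual FLSpec/clone machinery).
class Collab:
    def __init__(self, **private_attributes):
        self.private_attributes = private_attributes


class Clone:
    input = "Portland"  # which collaborator this clone belongs to


def run_collaborator_step(clone, collaborators, step):
    private = collaborators[clone.input].private_attributes

    for name, attr in private.items():   # set: visible to this step only
        setattr(clone, name, attr)

    step(clone)                          # use: the step may read/update them

    for name in list(private):           # scrub: persist updates, then remove
        if hasattr(clone, name):
            private[name] = getattr(clone, name)
            delattr(clone, name)


collaborators = {"Portland": Collab(train_loader=[1, 2, 3])}
clone = Clone()
run_collaborator_step(clone, collaborators, lambda c: print(len(c.train_loader)))
assert not hasattr(clone, "train_loader")  # aggregator steps can't see it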
2 changes: 1 addition & 1 deletion tests/github/experimental/testflow_exclude.py
@@ -207,7 +207,7 @@ def end(self):
    local_runtime = LocalRuntime(
        aggregator=aggregator, collaborators=collaborators
    )
-    print(f"Local runtime collaborators = {local_runtime._collaborators}")
+    print(f"Local runtime collaborators = {local_runtime.collaborators}")

    flflow = TestFlowExclude(checkpoint=True)
    flflow.runtime = local_runtime
2 changes: 1 addition & 1 deletion tests/github/experimental/testflow_include.py
@@ -207,7 +207,7 @@ def end(self):
    local_runtime = LocalRuntime(
        aggregator=aggregator, collaborators=collaborators
    )
-    print(f"Local runtime collaborators = {local_runtime._collaborators}")
+    print(f"Local runtime collaborators = {local_runtime.collaborators}")

    flflow = TestFlowInclude(checkpoint=True)
    flflow.runtime = local_runtime
2 changes: 1 addition & 1 deletion tests/github/experimental/testflow_include_exclude.py
@@ -210,7 +210,7 @@ def end(self):
    local_runtime = LocalRuntime(
        aggregator=aggregator, collaborators=collaborators
    )
-    print(f"Local runtime collaborators = {local_runtime._collaborators}")
+    print(f"Local runtime collaborators = {local_runtime.collaborators}")
    flflow = TestFlowIncludeExclude(checkpoint=False)
    flflow.runtime = local_runtime
    for i in range(5):
2 changes: 1 addition & 1 deletion tests/github/experimental/testflow_internalloop.py
@@ -232,7 +232,7 @@ def display_validate_errors(validate_flow_error):
    local_runtime = LocalRuntime(
        aggregator=aggregator, collaborators=collaborators
    )
-    print(f"Local runtime collaborators = {local_runtime._collaborators}")
+    print(f"Local runtime collaborators = {local_runtime.collaborators}")

    model = None
    best_model = None