Skip to content

Commit

Permalink
Introduce central DP with server-side adaptive clipping (#2909)
Browse files Browse the repository at this point in the history
Co-authored-by: Heng Pan <pan@flower.ai>
  • Loading branch information
mohammadnaseri and panh99 authored Feb 29, 2024
1 parent e960a21 commit d4ce2af
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 4 deletions.
14 changes: 11 additions & 3 deletions src/py/flwr/server/strategy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@


from .bulyan import Bulyan as Bulyan
from .dp_adaptive_clipping import DifferentialPrivacyClientSideAdaptiveClipping
from .dp_adaptive_clipping import (
DifferentialPrivacyClientSideAdaptiveClipping as DifferentialPrivacyClientSideAdaptiveClipping,
)
from .dp_adaptive_clipping import (
DifferentialPrivacyServerSideAdaptiveClipping as DifferentialPrivacyServerSideAdaptiveClipping,
)
from .dp_fixed_clipping import (
DifferentialPrivacyClientSideFixedClipping as DifferentialPrivacyClientSideFixedClipping,
)
from .dp_fixed_clipping import (
DifferentialPrivacyClientSideFixedClipping,
DifferentialPrivacyServerSideFixedClipping,
DifferentialPrivacyServerSideFixedClipping as DifferentialPrivacyServerSideFixedClipping,
)
from .dpfedavg_adaptive import DPFedAvgAdaptive as DPFedAvgAdaptive
from .dpfedavg_fixed import DPFedAvgFixed as DPFedAvgFixed
Expand All @@ -46,6 +53,7 @@
"DPFedAvgAdaptive",
"DPFedAvgFixed",
"DifferentialPrivacyClientSideAdaptiveClipping",
"DifferentialPrivacyServerSideAdaptiveClipping",
"DifferentialPrivacyClientSideFixedClipping",
"DifferentialPrivacyServerSideFixedClipping",
"FedAdagrad",
Expand Down
206 changes: 205 additions & 1 deletion src/py/flwr/server/strategy/dp_adaptive_clipping.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,19 @@

import numpy as np

from flwr.common import EvaluateIns, EvaluateRes, FitIns, FitRes, Parameters, Scalar
from flwr.common import (
EvaluateIns,
EvaluateRes,
FitIns,
FitRes,
NDArrays,
Parameters,
Scalar,
ndarrays_to_parameters,
parameters_to_ndarrays,
)
from flwr.common.differential_privacy import (
adaptive_clip_inputs_inplace,
add_gaussian_noise_to_params,
compute_adaptive_noise_params,
)
Expand All @@ -40,6 +51,199 @@
from flwr.server.strategy.strategy import Strategy


class DifferentialPrivacyServerSideAdaptiveClipping(Strategy):
"""Strategy wrapper for central DP with server-side adaptive clipping.
Parameters
----------
strategy: Strategy
The strategy to which DP functionalities will be added by this wrapper.
noise_multiplier : float
The noise multiplier for the Gaussian mechanism for model updates.
num_sampled_clients : int
The number of clients that are sampled on each round.
initial_clipping_norm : float
The initial value of clipping norm. Deafults to 0.1.
Andrew et al. recommends to set to 0.1.
target_clipped_quantile : float
The desired quantile of updates which should be clipped. Defaults to 0.5.
clip_norm_lr : float
The learning rate for the clipping norm adaptation. Defaults to 0.2.
Andrew et al. recommends to set to 0.2.
clipped_count_stddev : float
The standard deviation of the noise added to the count of updates below the estimate.
Andrew et al. recommends to set to `expected_num_records/20`
Examples
--------
Create a strategy:
>>> strategy = fl.server.strategy.FedAvg( ... )
Wrap the strategy with the DifferentialPrivacyServerSideAdaptiveClipping wrapper
>>> dp_strategy = DifferentialPrivacyServerSideAdaptiveClipping(
>>> strategy, cfg.noise_multiplier, cfg.num_sampled_clients, ...
>>> )
"""

# pylint: disable=too-many-arguments,too-many-instance-attributes
def __init__(
self,
strategy: Strategy,
noise_multiplier: float,
num_sampled_clients: int,
initial_clipping_norm: float = 0.1,
target_clipped_quantile: float = 0.5,
clip_norm_lr: float = 0.2,
clipped_count_stddev: Optional[float] = None,
) -> None:
super().__init__()

if strategy is None:
raise ValueError("The passed strategy is None.")

if noise_multiplier < 0:
raise ValueError("The noise multiplier should be a non-negative value.")

if num_sampled_clients <= 0:
raise ValueError(
"The number of sampled clients should be a positive value."
)

if initial_clipping_norm <= 0:
raise ValueError("The initial clipping norm should be a positive value.")

if not 0 <= target_clipped_quantile <= 1:
raise ValueError(
"The target clipped quantile must be between 0 and 1 (inclusive)."
)

if clip_norm_lr <= 0:
raise ValueError("The learning rate must be positive.")

if clipped_count_stddev is not None:
if clipped_count_stddev < 0:
raise ValueError("The `clipped_count_stddev` must be non-negative.")

self.strategy = strategy
self.num_sampled_clients = num_sampled_clients
self.clipping_norm = initial_clipping_norm
self.target_clipped_quantile = target_clipped_quantile
self.clip_norm_lr = clip_norm_lr
(
self.clipped_count_stddev,
self.noise_multiplier,
) = compute_adaptive_noise_params(
noise_multiplier,
num_sampled_clients,
clipped_count_stddev,
)

self.current_round_params: NDArrays = []

def __repr__(self) -> str:
"""Compute a string representation of the strategy."""
rep = "Differential Privacy Strategy Wrapper (Server-Side Adaptive Clipping)"
return rep

def initialize_parameters(
self, client_manager: ClientManager
) -> Optional[Parameters]:
"""Initialize global model parameters using given strategy."""
return self.strategy.initialize_parameters(client_manager)

def configure_fit(
self, server_round: int, parameters: Parameters, client_manager: ClientManager
) -> List[Tuple[ClientProxy, FitIns]]:
"""Configure the next round of training."""
self.current_round_params = parameters_to_ndarrays(parameters)
return self.strategy.configure_fit(server_round, parameters, client_manager)

def configure_evaluate(
self, server_round: int, parameters: Parameters, client_manager: ClientManager
) -> List[Tuple[ClientProxy, EvaluateIns]]:
"""Configure the next round of evaluation."""
return self.strategy.configure_evaluate(
server_round, parameters, client_manager
)

def aggregate_fit(
self,
server_round: int,
results: List[Tuple[ClientProxy, FitRes]],
failures: List[Union[Tuple[ClientProxy, FitRes], BaseException]],
) -> Tuple[Optional[Parameters], Dict[str, Scalar]]:
"""Aggregate training results and update clip norms."""
if failures:
return None, {}

if len(results) != self.num_sampled_clients:
log(
WARNING,
CLIENTS_DISCREPANCY_WARNING,
len(results),
self.num_sampled_clients,
)

norm_bit_set_count = 0
for _, res in results:
param = parameters_to_ndarrays(res.parameters)
# Compute and clip update
model_update = [
np.subtract(x, y) for (x, y) in zip(param, self.current_round_params)
]

norm_bit = adaptive_clip_inputs_inplace(model_update, self.clipping_norm)
norm_bit_set_count += norm_bit

for i, _ in enumerate(self.current_round_params):
param[i] = self.current_round_params[i] + model_update[i]
# Convert back to parameters
res.parameters = ndarrays_to_parameters(param)

# Noising the count
noised_norm_bit_set_count = float(
np.random.normal(norm_bit_set_count, self.clipped_count_stddev)
)
noised_norm_bit_set_fraction = noised_norm_bit_set_count / len(results)
# Geometric update
self.clipping_norm *= math.exp(
-self.clip_norm_lr
* (noised_norm_bit_set_fraction - self.target_clipped_quantile)
)

aggregated_params, metrics = self.strategy.aggregate_fit(
server_round, results, failures
)

# Add Gaussian noise to the aggregated parameters
if aggregated_params:
aggregated_params = add_gaussian_noise_to_params(
aggregated_params,
self.noise_multiplier,
self.clipping_norm,
self.num_sampled_clients,
)

return aggregated_params, metrics

def aggregate_evaluate(
self,
server_round: int,
results: List[Tuple[ClientProxy, EvaluateRes]],
failures: List[Union[Tuple[ClientProxy, EvaluateRes], BaseException]],
) -> Tuple[Optional[float], Dict[str, Scalar]]:
"""Aggregate evaluation losses using the given strategy."""
return self.strategy.aggregate_evaluate(server_round, results, failures)

def evaluate(
self, server_round: int, parameters: Parameters
) -> Optional[Tuple[float, Dict[str, Scalar]]]:
"""Evaluate model parameters using an evaluation function from the strategy."""
return self.strategy.evaluate(server_round, parameters)


class DifferentialPrivacyClientSideAdaptiveClipping(Strategy):
"""Strategy wrapper for central DP with client-side adaptive clipping.
Expand Down

0 comments on commit d4ce2af

Please sign in to comment.