diff --git a/config/audit.yaml b/config/audit.yaml index edfb558e..d5b5d491 100644 --- a/config/audit.yaml +++ b/config/audit.yaml @@ -2,31 +2,26 @@ audit: # Configurations for auditing random_seed: 1234 # Integer specifying the random seed attack_list: rmia: - training_data_fraction: 0.4 # Fraction of the auxilary dataset to use for this attack (in each shadow model training) - attack_data_fraction: 0.1 # Fraction of auxiliary dataset to sample from during attack - num_shadow_models: 8 # Number of shadow models to train + training_data_fraction: 0.1 # Fraction of the auxilary dataset to use for this attack (in each shadow model training) + attack_data_fraction: 0.025 # Fraction of auxiliary dataset to sample from during attack + num_shadow_models: 2 # Number of shadow models to train online: False # perform online or offline attack - temperature: 2 - gamma: 2.0 - offline_a: 0.33 # parameter from which we compute p(x) from p_OUT(x) such that p_IN(x) = a p_OUT(x) + b. - offline_b: 0.66 qmia: training_data_fraction: 0.5 # Fraction of the auxilary dataset (data without train and test indices) to use for training the quantile regressor epochs: 5 # Number of training epochs for quantile regression population: attack_data_fraction: 0.1 # Fraction of the auxilary dataset to use for this attack - loss_traj: - training_distill_data_fraction : 0.2 # Fraction of the auxilary dataset to use for training the distillation models D_s = (1-D_KD)/2 - number_of_traj: 20 # Number of epochs (number of points in the loss trajectory) - attack_mode: "soft_label" # label_only, soft_label - attack_data_dir: "./leakpro_output/attack_objects/loss_traj" - mia_classifier_epochs: 100 lira: - training_data_fraction: 0.4 # Fraction of the auxilary dataset to use for this attack (in each shadow model training) + training_data_fraction: 0.1 # Fraction of the auxilary dataset to use for this attack (in each shadow model training) num_shadow_models: 8 # Number of shadow models to train - online: false # perform online or offline attack + online: False # perform online or offline attack fixed_variance: True # Use a fixed variance for the whole audit - + loss_traj: + training_distill_data_fraction : 0.2 # Fraction of the auxilary dataset to use for training the distillation models D_s = (1-D_KD)/2 + number_of_traj: 1 # Number of epochs (number of points in the loss trajectory) + label_only: "False" # True or False + attack_data_dir: "./leakpro_output/attack_objects/loss_traj" + mia_classifier_epochs: 10 report_log: "./leakpro_output/results" # Folder to save the auditing report config_log: "./leakpro_output/config" # Folder to save the configuration files @@ -36,21 +31,21 @@ audit: # Configurations for auditing split_method: "no_overlapping" # Method of creating the attack dataset target: + # Target model path module_path: "./leakpro/shadow_model_blueprints.py" model_class: "ResNet18" + # Data paths trained_model_path: "./target/target_model.pkl" trained_model_metadata_path: "./target/model_metadata.pkl" data_path: "./target/data/cinic10.pkl" - shadow_model: storage_path: "./leakpro_output/attack_objects/shadow_models" - # Path to a Python file with the shadow model architecture - module_path: "./leakpro/shadow_model_blueprints.py" - # # [Optional] Define a shadow model (if none, shadow model will follow the target model) + # Path to a Python file with the shadow model architecture + #module_path: "./leakpro/shadow_model_blueprints.py" # Name of the class to instantiate from the specified file - model_class_path: "ResNet18" #"ConvNet" + model_class: "ResNet18" #"ConvNet" optimizer: name: sgd #adam, sgd, rmsprop lr: 0.01 @@ -61,10 +56,10 @@ shadow_model: # Initialization parameters init_params: {} -distillation_target_model: - storage_path: "./leakpro_output/attack_objects/distillation_target_models" - module_path: "./leakpro/shadow_model_blueprints.py" - # model_class: "ConvNet" +distillation_model: + storage_path: "./leakpro_output/attack_objects/distillation_models" + #module_path: "./leakpro/shadow_model_blueprints.py" + #model_class: "ConvNet" optimizer: name: sgd #adam, sgd, rmsprop lr: 0.01 @@ -74,6 +69,9 @@ distillation_target_model: name: crossentropyloss # crossentropyloss, nllloss, mseloss # Initialization parameters init_params: {} +<<<<<<< HEAD + +======= trained_model_path: "./leakpro_output/attack_objects/distillation_target_models/distillation_model.pkl" trained_model_metadata_path: "./leakpro_output/attack_objects/distillation_target_models/model_metadata.pkl" data_path: "./leakpro_output/attack_objects/distillation_target_models/cinic10.pkl" @@ -94,5 +92,6 @@ distillation_shadow_model: trained_model_path: "./leakpro_output/attack_objects/distillation_shadow_models/distillation_model.pkl" trained_model_metadata_path: "./leakpro_output/attack_objects/distillation_shadow_models/model_metadata.pkl" data_path: "./leakpro_output/attack_objects/distillation_shadow_models/cinic10.pkl" +>>>>>>> main diff --git a/leakpro.py b/leakpro.py index e7568260..eb93b5e9 100644 --- a/leakpro.py +++ b/leakpro.py @@ -6,10 +6,9 @@ import time from pathlib import Path -import joblib import numpy as np import yaml -from torch import load, manual_seed +from torch import manual_seed from torch.utils.data import Subset import leakpro.dev_utils.train as utils @@ -23,7 +22,7 @@ prepare_train_test_datasets, ) from leakpro.reporting.utils import prepare_priavcy_risk_report -from leakpro.utils.input_handler import get_class_from_module, import_module_from_file +from leakpro.user_inputs.cifar10_input_handler import Cifar10InputHandler def setup_log(name: str, save_file: bool=True) -> logging.Logger: @@ -96,16 +95,13 @@ def generate_user_input(configs: dict, logger: logging.Logger)->None: if __name__ == "__main__": - - #args = "./config/adult.yaml" # noqa: ERA001 - # user_args = "./config/dev_config/cifar10.yaml" # noqa: ERA001 user_args = "./config/dev_config/cinic10.yaml" # noqa: ERA001 with open(user_args, "rb") as f: user_configs = yaml.safe_load(f) # Setup logger - logger = setup_log("analysis") + logger = setup_log("LeakPro", save_file=True) # Generate user input generate_user_input(user_configs, logger) # This is for developing purposes only @@ -126,41 +122,10 @@ def generate_user_input(configs: dict, logger: logging.Logger)->None: report_dir = f"{configs['audit']['report_log']}" Path(report_dir).mkdir(parents=True, exist_ok=True) - # Get the target metadata - target_model_metadata_path = f'{configs["target"]["trained_model_metadata_path"]}' - try: - with open(target_model_metadata_path, "rb") as f: - target_model_metadata = joblib.load(f) - except FileNotFoundError: - logger.error(f"Could not find the target model metadata at {target_model_metadata_path}") - - # Create a class instance of target model - target_module = import_module_from_file(configs["target"]["module_path"]) - target_model_blueprint = get_class_from_module(target_module, configs["target"]["model_class"]) - logger.info(f"Target model blueprint created from {configs['target']['model_class']} in {configs['target']['module_path']}") - - # Load the target model parameters into the blueprint - with open(configs["target"]["trained_model_path"], "rb") as f: - target_model = target_model_blueprint(**target_model_metadata["model_metadata"]["init_params"]) - target_model.load_state_dict(load(f)) - logger.info(f"Loaded target model from {configs['target']['trained_model_path']}") - - # Get the population dataset - try: - with open(configs["target"]["data_path"], "rb") as file: - population = joblib.load(file) - logger.info(f"Loaded population dataset from {configs['target']['data_path']}") - except FileNotFoundError: - logger.error(f"Could not find the population dataset at {configs['target']['data_path']}") - # ------------------------------------------------ - # Now we have the target model, its metadata, and the train/test dataset indices. - attack_scheduler = AttackScheduler( - population, - target_model, - target_model_metadata["model_metadata"], - configs, - logger, - ) + # Create user input handler + handler = Cifar10InputHandler(configs=configs, logger=logger) + + attack_scheduler = AttackScheduler(handler) audit_results = attack_scheduler.run_attacks() for attack_name in audit_results: diff --git a/leakpro/attacks/attack_scheduler.py b/leakpro/attacks/attack_scheduler.py index c0040b75..2ca56a4f 100644 --- a/leakpro/attacks/attack_scheduler.py +++ b/leakpro/attacks/attack_scheduler.py @@ -1,12 +1,9 @@ """Module that contains the AttackScheduler class, which is responsible for creating and executing attacks.""" -import logging - -from torch import nn from leakpro.attacks.mia_attacks.abstract_mia import AbstractMIA from leakpro.attacks.mia_attacks.attack_factory_mia import AttackFactoryMIA -from leakpro.dataset import GeneralDataset from leakpro.import_helper import Any, Dict, Self +from leakpro.user_inputs.abstract_input_handler import AbstractInputHandler class AttackScheduler: @@ -16,47 +13,38 @@ class AttackScheduler: def __init__( self:Self, - population:GeneralDataset, - target_model:nn.Module, - target_model_metadata:Dict[str, Any], - configs:Dict[str, Any], - logger:logging.Logger + handler: AbstractInputHandler, ) -> None: """Initialize the AttackScheduler class. Args: ---- - population (GeneralDataset): The population dataset. - target_model (torch.nn.Module): The target model. - target_model_metadata (Dict[str, Any]): The metadata of the target model. - configs (Dict[str, Any]): The configurations. - logger (logging.Logger): The logger object. + handler (AbstractInputHandler): The handler object that contains the user inputs. """ + configs = handler.configs if configs["audit"]["attack_type"] not in list(self.attack_type_to_factory.keys()): raise ValueError( f"Unknown attack type: {configs['audit']['attack_type']}. " f"Supported attack types: {self.attack_type_to_factory.keys()}" ) - # Prepare factory with shared items + # Prepare factory factory = self.attack_type_to_factory[configs["audit"]["attack_type"]] - factory.set_population_and_audit_data(population,target_model_metadata) - factory.set_target_model_and_loss(target_model, nn.CrossEntropyLoss()) #TODO: Enable arbitrary loss functions - factory.set_logger(logger) - self.logger = logger + self.logger = handler.logger # Create the attacks self.attack_list = list(configs["audit"]["attack_list"].keys()) self.attacks = [] for attack_name in self.attack_list: try: - attack = factory.create_attack(attack_name, configs) + attack = factory.create_attack(attack_name, handler) self.add_attack(attack) self.logger.info(f"Added attack: {attack_name}") except ValueError as e: - logger.info(e) + self.logger.info(e) + self.logger.info(f"Failed to create attack: {attack_name}, supported attacks: {factory.attack_classes.keys()}") def add_attack(self:Self, attack: AbstractMIA) -> None: """Add an attack to the list of attacks.""" @@ -77,7 +65,7 @@ def run_attacks(self:Self) -> Dict[str, Any]: self.logger.info(f"Finished attack: {attack_type}") return results - def identify_attacks(self:Self) -> None: + def map_setting_to_attacks(self:Self) -> None: """Identify relevant attacks based on adversary setting.""" # TODO: Implement this mapping and remove attack list from configs pass diff --git a/leakpro/attacks/mia_attacks/abstract_mia.py b/leakpro/attacks/mia_attacks/abstract_mia.py index c1fd658d..f7d04ef0 100644 --- a/leakpro/attacks/mia_attacks/abstract_mia.py +++ b/leakpro/attacks/mia_attacks/abstract_mia.py @@ -1,13 +1,14 @@ """Module that contains the abstract class for constructing and performing a membership inference attack on a target.""" from abc import ABC, abstractmethod -from logging import Logger import numpy as np -from torch import nn +from torch.utils.data import DataLoader from leakpro.import_helper import List, Self, Union from leakpro.metrics.attack_result import AttackResult +from leakpro.model import PytorchModel +from leakpro.user_inputs.abstract_input_handler import AbstractInputHandler ######################################################################################################################## # METRIC CLASS @@ -20,30 +21,128 @@ class AbstractMIA(ABC): This serves as a guideline for implementing a metric to be used for measuring the privacy leakage of a target model. """ + # Class attributes for sharing between the different attacks + population = None + population_size = None + target_model = None + audit_dataset = None + handler=None + _initialized = False + def __init__( self:Self, - population: np.ndarray, - audit_dataset: dict, - target_model: nn.Module, - logger:Logger + handler: AbstractInputHandler, )->None: """Initialize the AttackAbstract class. Args: ---- - population (np.ndarray): The population used for the attack. - audit_dataset (dict): The audit dataset used for the attack. - target_model (nn.Module): The target model used for the attack. - logger (Logger): The logger used for logging. + handler (AbstractInputHandler): The input handler object. """ - self._population = population - self._population_size = len(population) - self._target_model = target_model - self._audit_dataset = audit_dataset - self.logger = logger + # These objects are shared and should be initialized only once + if not AbstractMIA._initialized: + AbstractMIA.population = handler.population + AbstractMIA.population_size = handler.population_size + AbstractMIA.target_model = PytorchModel(handler.target_model, handler.get_criterion()) + AbstractMIA.audit_dataset = { + # Assuming train_indices and test_indices are arrays of indices, not the actual data + "data": np.concatenate((handler.train_indices, handler.test_indices)), + # in_members will be an array from 0 to the number of training indices - 1 + "in_members": np.arange(len(handler.train_indices)), + # out_members will start after the last training index and go up to the number of test indices - 1 + "out_members": np.arange(len(handler.train_indices),len(handler.train_indices)+len(handler.test_indices)), + } + AbstractMIA.handler = handler + self._validate_shared_quantities() + AbstractMIA._initialized = True + + # These objects are instance specific + self.logger = handler.logger self.signal_data = [] + def _validate_shared_quantities(self:Self)->None: + """Validate the shared quantities used by the attack.""" + if AbstractMIA.population is None: + raise ValueError("Population dataset not found.") + if AbstractMIA.population_size is None: + raise ValueError("Population size not found.") + if AbstractMIA.population_size != len(AbstractMIA.population): + raise ValueError("Population size does not match the population dataset.") + if len(AbstractMIA.audit_dataset["in_members"]) == 0: + raise ValueError("Train indices must be provided.") + if len(AbstractMIA.audit_dataset["out_members"]) == 0: + raise ValueError("Test indices must be provided.") + if AbstractMIA.target_model is None: + raise ValueError("Target model not found.") + if AbstractMIA.audit_dataset is None: + raise ValueError("Audit dataset not found.") + + def sample_indices_from_population( + self:Self, + *, + include_train_indices: bool = False, + include_test_indices: bool = False + ) -> np.ndarray: + """Function to get attack data indices from the population. + + Args: + ---- + include_train_indices (bool): Flag indicating whether to include train data in data. + include_test_indices (bool): Flag indicating whether to include test data in data. + + Returns: + ------- + np.ndarray: The selected attack data indices. + + """ + all_index = np.arange(AbstractMIA.population_size) + + not_allowed_indices = np.array([]) + if not include_train_indices: + not_allowed_indices = np.hstack([not_allowed_indices, self.handler.train_indices]) + + if not include_test_indices: + not_allowed_indices = np.hstack([not_allowed_indices, self.handler.test_indices]) + + available_index = np.setdiff1d(all_index, not_allowed_indices) + data_size = len(available_index) + return np.random.choice(available_index, data_size, replace=False) + + + def get_dataloader(self:Self, data:np.ndarray, batch_size:int=None)->DataLoader: + """Function to get a dataloader from the dataset. + + Args: + ---- + data (np.ndarray): The dataset indices to sample from. + batch_size (int): batch size. + + Returns: + ------- + Dataloader: The sampled data. + + """ + return self.handler.get_dataloader(data) if batch_size is None else self.handler.get_dataloader(data, batch_size) + + def sample_data_from_dataset(self:Self, data:np.ndarray, size:int)->DataLoader: + """Function to sample from the dataset. + + Args: + ---- + data (np.ndarray): The dataset indices to sample from. + size (int): The size of the sample. + + Returns: + ------- + Dataloader: The sampled data. + + """ + if size > len(data): + raise ValueError("Size of the sample is greater than the size of the data.") + return self.get_dataloader(np.random.choice(data, size, replace=False)) + + @property def population(self:Self)-> List: """Get the population used for the attack. @@ -53,7 +152,7 @@ def population(self:Self)-> List: List: The population used for the attack. """ - return self._population + return AbstractMIA.population @property def population_size(self:Self)-> int: @@ -64,7 +163,7 @@ def population_size(self:Self)-> int: int: The size of the population used for the attack. """ - return self._population_size + return AbstractMIA.population_size @property def target_model(self:Self)-> Union[Self, List[Self] ]: @@ -75,7 +174,7 @@ def target_model(self:Self)-> Union[Self, List[Self] ]: Union[Self, List[Self]]: The target model used for the attack. """ - return self._target_model + return AbstractMIA.target_model @property def audit_dataset(self:Self)-> Self: @@ -86,7 +185,7 @@ def audit_dataset(self:Self)-> Self: Self: The audit dataset used for the attack. """ - return self._audit_dataset + return AbstractMIA.audit_dataset @property def train_indices(self:Self)-> np.ndarray: @@ -97,8 +196,7 @@ def train_indices(self:Self)-> np.ndarray: np.ndarray: The training indices of the audit dataset. """ - train_indices = self._audit_dataset["in_members"] - return self._audit_dataset["data"][train_indices] + return AbstractMIA.audit_dataset["in_members"] @property @@ -110,8 +208,7 @@ def test_indices(self:Self)-> np.ndarray: np.ndarray: The test indices of the audit dataset. """ - test_indices = self._audit_dataset["out_members"] - return self._audit_dataset["data"][test_indices] + return AbstractMIA.audit_dataset["out_members"] @abstractmethod def _configure_attack(self:Self, configs:dict)->None: diff --git a/leakpro/attacks/mia_attacks/attack_factory_mia.py b/leakpro/attacks/mia_attacks/attack_factory_mia.py index 996bc64a..430eebca 100644 --- a/leakpro/attacks/mia_attacks/attack_factory_mia.py +++ b/leakpro/attacks/mia_attacks/attack_factory_mia.py @@ -1,8 +1,4 @@ """Module that contains the AttackFactory class which is responsible for creating the attack objects.""" -from logging import Logger - -import numpy as np -from torch import nn from leakpro.attacks.mia_attacks.abstract_mia import AbstractMIA from leakpro.attacks.mia_attacks.attack_p import AttackP @@ -10,9 +6,9 @@ from leakpro.attacks.mia_attacks.loss_trajectory import AttackLossTrajectory from leakpro.attacks.mia_attacks.qmia import AttackQMIA from leakpro.attacks.mia_attacks.rmia import AttackRMIA -from leakpro.attacks.utils.distillation_model_handler import DistillationShadowModelHandler, DistillationTargetModelHandler +from leakpro.attacks.utils.distillation_model_handler import DistillationModelHandler from leakpro.attacks.utils.shadow_model_handler import ShadowModelHandler -from leakpro.model import PytorchModel +from leakpro.user_inputs.abstract_input_handler import AbstractInputHandler class AttackFactoryMIA: @@ -27,64 +23,17 @@ class AttackFactoryMIA: } # Shared variables for all attacks - population = None - audit_dataset = None - target_model = None - target_metadata = None - logger = None shadow_model_handler = None - distillation_target_model_handler = None - distillation_shadow_model_handler = None - - @staticmethod - def set_population_and_audit_data(population:np.ndarray, target_metadata:dict) -> None: - """Initialize the population dataset.""" - if AttackFactoryMIA.population is None: - AttackFactoryMIA.population = population - - if AttackFactoryMIA.target_metadata is None: - AttackFactoryMIA.target_metadata = target_metadata - - if AttackFactoryMIA.audit_dataset is None: - AttackFactoryMIA.audit_dataset = { - # Assuming train_indices and test_indices are arrays of indices, not the actual data - "data": np.concatenate( - ( - target_metadata["train_indices"], - target_metadata["test_indices"], - ) - ), - # in_members will be an array from 0 to the number of training indices - 1 - "in_members": np.arange(len(target_metadata["train_indices"])), - # out_members will start after the last training index and go up to the number of test indices - 1 - "out_members": np.arange( - len(target_metadata["train_indices"]), - len(target_metadata["train_indices"]) - + len(target_metadata["test_indices"]), - ), - } - - @staticmethod - def set_target_model_and_loss(target_model:nn.Module, criterion:nn.Module) -> None: - """Set the target model.""" - if AttackFactoryMIA.target_model is None: - AttackFactoryMIA.target_model = PytorchModel(target_model, criterion) - - @staticmethod - def set_logger(logger:Logger) -> None: - """Set the logger for the AttackFactoryMIA class.""" - if AttackFactoryMIA.logger is None: - AttackFactoryMIA.logger = logger + distillation_model_handler = None @classmethod - def create_attack(cls, name: str, configs: dict) -> AbstractMIA: # noqa: ANN102 - """Create an attack object based on the given name, attack_utils, and configs. + def create_attack(cls, name: str, handler: AbstractInputHandler) -> AbstractMIA: # noqa: ANN102 + """Create the attack object. Args: ---- name (str): The name of the attack. - attack_utils (AttackUtils): An instance of AttackUtils. - configs (dict): The attack configurations. + handler (AbstractInputHandler): The input handler object. Returns: ------- @@ -95,47 +44,15 @@ def create_attack(cls, name: str, configs: dict) -> AbstractMIA: # noqa: ANN102 ValueError: If the attack type is unknown. """ - if AttackFactoryMIA.population is None: - raise ValueError("Population data has not been set") - if AttackFactoryMIA.audit_dataset is None: - raise ValueError("Audit data has not been set") - if AttackFactoryMIA.target_model is None: - raise ValueError("Target model has not been set") - if AttackFactoryMIA.logger is None: - raise ValueError("Logger has not been set") if AttackFactoryMIA.shadow_model_handler is None: - AttackFactoryMIA.logger.info("Creating shadow model handler singleton") - shadow_configs = configs.get("shadow_model", {}) - AttackFactoryMIA.shadow_model_handler = ShadowModelHandler( - AttackFactoryMIA.target_model, - AttackFactoryMIA.target_metadata, - shadow_configs, - AttackFactoryMIA.logger - ) - if AttackFactoryMIA.distillation_target_model_handler is None: - AttackFactoryMIA.logger.info("Creating distillation model handler singleton for the target model") - distillation_configs = configs.get("distillation_target_model", {}) - AttackFactoryMIA.distillation_target_model_handler = DistillationTargetModelHandler( - AttackFactoryMIA.target_model, - AttackFactoryMIA.target_metadata, - distillation_configs, - AttackFactoryMIA.logger - ) - if AttackFactoryMIA.distillation_shadow_model_handler is None: - AttackFactoryMIA.logger.info("Creating distillation model handler singleton for the shadow model") - distillation_configs = configs.get("distillation_shadow_model", {}) - AttackFactoryMIA.distillation_shadow_model_handler = DistillationShadowModelHandler( - distillation_configs, - AttackFactoryMIA.logger - ) + handler.logger.info("Creating shadow model handler singleton") + AttackFactoryMIA.shadow_model_handler = ShadowModelHandler(handler) + + if AttackFactoryMIA.distillation_model_handler is None: + handler.logger.info("Creating distillation model handler singleton") + AttackFactoryMIA.distillation_model_handler = DistillationModelHandler(handler) if name in cls.attack_classes: - return cls.attack_classes[name]( - AttackFactoryMIA.population, - AttackFactoryMIA.audit_dataset, - AttackFactoryMIA.target_model, - AttackFactoryMIA.logger, - configs["audit"]["attack_list"][name] - ) + return cls.attack_classes[name](handler, handler.configs["audit"]["attack_list"][name]) raise ValueError(f"Unknown attack type: {name}") diff --git a/leakpro/attacks/mia_attacks/attack_p.py b/leakpro/attacks/mia_attacks/attack_p.py index 54340635..02b8b881 100644 --- a/leakpro/attacks/mia_attacks/attack_p.py +++ b/leakpro/attacks/mia_attacks/attack_p.py @@ -1,15 +1,13 @@ """Module that contains the implementation of the attack P.""" -from logging import Logger import numpy as np -from torch import nn from leakpro.attacks.mia_attacks.abstract_mia import AbstractMIA -from leakpro.attacks.utils.attack_data import get_attack_data from leakpro.attacks.utils.threshold_computation import linear_itp_threshold_func from leakpro.import_helper import Self from leakpro.metrics.attack_result import CombinedMetricResult from leakpro.signals.signal import ModelLoss +from leakpro.user_inputs.abstract_input_handler import AbstractInputHandler class AttackP(AbstractMIA): @@ -17,25 +15,19 @@ class AttackP(AbstractMIA): def __init__( self:Self, - population: np.ndarray, - audit_dataset: dict, - target_model: nn.Module, - logger:Logger, + handler: AbstractInputHandler, configs: dict ) -> None: """Initialize the AttackP class. Args: ---- - population (np.ndarray): The population data used for the attack. - audit_dataset (dict): The audit dataset used for the attack. - target_model (nn.Module): The target model to be attacked. - logger (Logger): The logger object for logging. + handler (AbstractInputHandler): The input handler object. configs (dict): A dictionary containing the attack configurations. """ # Initializes the parent metric - super().__init__(population, audit_dataset, target_model, logger) + super().__init__(handler) self.signal = ModelLoss() self.hypothesis_test_func = linear_itp_threshold_func @@ -47,9 +39,7 @@ def _configure_attack(self:Self, configs:dict) -> None: self.attack_data_fraction = configs.get("attack_data_fraction", 0.5) # Define the validation dictionary as: {parameter_name: (parameter, min_value, max_value)} - validation_dict = { - "attack_data_fraction": (self.attack_data_fraction, 0.01, 1) - } + validation_dict = {"attack_data_fraction": (self.attack_data_fraction, 0.01, 1)} # Validate parameters for param_name, (param_value, min_val, max_val) in validation_dict.items(): @@ -82,26 +72,15 @@ def prepare_attack(self:Self) -> None: """Prepare data needed for running the metric on the target model and dataset.""" # sample dataset to compute histogram self.logger.info("Preparing attack data for training the Population attack") - self.attack_data_index = get_attack_data( - self.population_size, - self.train_indices, - self.test_indices, - train_data_included_in_auxiliary_data = False, - test_data_included_in_auxiliary_data = False, - logger = self.logger - ) + self.attack_data_indices = self.sample_indices_from_population(include_train_indices = False, + include_test_indices = False) # subsample the attack data based on the fraction - self.logger.info(f"Subsampling attack data from {len(self.attack_data_index)} points") - self.attack_data_index = np.random.choice( - self.attack_data_index, - int(self.attack_data_fraction * len(self.attack_data_index)), - replace=False - ) - self.logger.info(f"Number of attack data points after subsampling: {len(self.attack_data_index)}") + self.logger.info(f"Subsampling attack data from {len(self.attack_data_indices)} points") + n_points = int(self.attack_data_fraction * len(self.attack_data_indices)) + attack_data = self.sample_data_from_dataset(self.attack_data_indices, n_points).dataset + self.logger.info(f"Number of attack data points after subsampling: {len(attack_data)}") - attack_data = self.population.subset(self.attack_data_index) - # Load signals if they have been computed already; otherwise, compute and save them # signals based on training dataset self.attack_signal = np.array(self.signal([self.target_model], attack_data)) @@ -127,7 +106,7 @@ def run_attack(self:Self) -> CombinedMetricResult: self.logger.info("Running the Population attack on the target model") # get the loss for the audit dataset - audit_data = self.population.subset(self.audit_dataset["data"]) + audit_data = self.get_dataloader(self.audit_dataset["data"]).dataset audit_signal = np.array(self.signal([self.target_model], audit_data)).squeeze() # pick out the in-members and out-members diff --git a/leakpro/attacks/mia_attacks/lira.py b/leakpro/attacks/mia_attacks/lira.py index 9deed47f..adf8f52a 100644 --- a/leakpro/attacks/mia_attacks/lira.py +++ b/leakpro/attacks/mia_attacks/lira.py @@ -1,43 +1,34 @@ """Implementation of the LiRA attack.""" -from logging import Logger - import numpy as np from scipy.stats import norm -from torch import nn from tqdm import tqdm from leakpro.attacks.mia_attacks.abstract_mia import AbstractMIA -from leakpro.attacks.utils.attack_data import get_attack_data from leakpro.attacks.utils.shadow_model_handler import ShadowModelHandler from leakpro.import_helper import Self from leakpro.metrics.attack_result import CombinedMetricResult from leakpro.signals.signal import ModelRescaledLogits +from leakpro.user_inputs.abstract_input_handler import AbstractInputHandler class AttackLiRA(AbstractMIA): """Implementation of the LiRA attack.""" def __init__(self:Self, - population: np.ndarray, - audit_dataset: dict, - target_model: nn.Module, - logger:Logger, + handler: AbstractInputHandler, configs: dict ) -> None: """Initialize the LiRA attack. Args: ---- - population (np.ndarray): The population data used for the attack. - audit_dataset (dict): The audit dataset used for the attack. - target_model (nn.Module): The target model to be attacked. - logger (Logger): The logger object for logging. + handler (AbstractInputHandler): The input handler object. configs (dict): Configuration parameters for the attack. """ # Initializes the parent metric - super().__init__(population, audit_dataset, target_model, logger) + super().__init__(handler) self.signal = ModelRescaledLogits() self._configure_attack(configs) @@ -100,28 +91,21 @@ def prepare_attack(self:Self)->None: of the audit dataset, prepares the data for evaluation, and computes the logits for both shadow models and the target model. """ - self.attack_data_index = get_attack_data( - self.population_size, - self.train_indices, - self.test_indices, - train_data_included_in_auxiliary_data=self.include_train_data, - test_data_included_in_auxiliary_data=self.include_test_data, - logger = self.logger - ) + self.attack_data_indices = self.sample_indices_from_population(include_train_indices = self.online, + include_test_indices = self.online) - ShadowModelHandler().create_shadow_models( - self.num_shadow_models, - self.population, - self.attack_data_index, - self.training_data_fraction, - ) - self.shadow_models, _ = ShadowModelHandler().get_shadow_models(self.num_shadow_models) + self.shadow_model_indices = ShadowModelHandler().create_shadow_models(num_models = self.num_shadow_models, + shadow_population = self.attack_data_indices, + training_fraction = self.training_data_fraction, + online = self.online) + + self.shadow_models, _ = ShadowModelHandler().get_shadow_models(self.shadow_model_indices) self.logger.info("Create masks for all IN samples") - self.in_indices_mask = ShadowModelHandler().get_in_indices_mask(self.num_shadow_models, self.audit_dataset["data"]) + self.in_indices_mask = ShadowModelHandler().get_in_indices_mask(self.shadow_model_indices, self.audit_dataset["data"]) - self.audit_data = self.population.subset(self.audit_dataset["data"]) + self.audit_data = self.get_dataloader(self.audit_dataset["data"]).dataset # Check offline attack for possible IN- sample(s) if not self.online: @@ -129,8 +113,8 @@ def prepare_attack(self:Self)->None: if count_in_samples > 0: self.logger.info(f"Some shadow model(s) contains {count_in_samples} IN samples in total for the model(s)") self.logger.info("This is not an offline attack!") - self.skip_indices = np.zeros(len(self.in_indices_mask), dtype=bool) + self.skip_indices = np.zeros(len(self.in_indices_mask), dtype=bool) if self.online: no_in = 0 no_out = 0 @@ -148,7 +132,7 @@ def prepare_attack(self:Self)->None: self.logger.info("some audit sample(s) mighthave a few or even 0 IN or OUT logits") self.logger.info(f"In total {np.count_nonzero(self.skip_indices)} indices will be skipped!") - if len(self.audit_data) == len(self.skip_indices): + if len(self.audit_data) == np.sum(self.skip_indices): raise ValueError("All audit samples are skipped. Please adjust the number of shadow models or the audit dataset.") # Calculate logits for all shadow models @@ -181,7 +165,9 @@ def run_attack(self:Self) -> CombinedMetricResult: in_std = np.nanstd(self.shadow_models_logits[self.in_indices_mask].flatten()) # Iterate and extract logits from shadow models for each sample in the audit dataset - for i, (shadow_models_logits, mask) in tqdm(enumerate(zip(self.shadow_models_logits, self.in_indices_mask))): + for i, (shadow_models_logits, mask) in tqdm(enumerate(zip(self.shadow_models_logits, self.in_indices_mask)), + total=len(self.shadow_models_logits), + desc="Processing samples"): # Calculate the mean for OUT shadow model logits out_mean = np.mean(shadow_models_logits[~mask]) diff --git a/leakpro/attacks/mia_attacks/loss_trajectory.py b/leakpro/attacks/mia_attacks/loss_trajectory.py index 4c04bddf..0f4eff28 100644 --- a/leakpro/attacks/mia_attacks/loss_trajectory.py +++ b/leakpro/attacks/mia_attacks/loss_trajectory.py @@ -2,46 +2,39 @@ import os import pickle -from logging import Logger import matplotlib.pyplot as plt import numpy as np import torch.nn.functional as F # noqa: N812 from torch import argmax, cuda, device, load, nn, no_grad, optim, save, tensor -from torch.utils.data import DataLoader, Subset, TensorDataset +from torch.utils.data import DataLoader, TensorDataset from tqdm import tqdm from leakpro.attacks.mia_attacks.abstract_mia import AbstractMIA -from leakpro.attacks.utils.attack_data import get_attack_data -from leakpro.attacks.utils.distillation_model_handler import DistillationShadowModelHandler, DistillationTargetModelHandler +from leakpro.attacks.utils.distillation_model_handler import DistillationModelHandler from leakpro.attacks.utils.shadow_model_handler import ShadowModelHandler from leakpro.import_helper import Self from leakpro.metrics.attack_result import CombinedMetricResult from leakpro.signals.signal import ModelLogits +from leakpro.user_inputs.abstract_input_handler import AbstractInputHandler class AttackLossTrajectory(AbstractMIA): """Implementation of the loss trajectory attack.""" def __init__(self: Self, - population: np.ndarray, - audit_dataset: dict, - target_model: nn.Module, - logger: Logger, + handler: AbstractInputHandler, configs: dict ) -> None: """Initialize the LossTrajectoryAttack class. Args: ---- - population (np.ndarray): The population data. - audit_dataset (dict): The audit dataset. - target_model (nn.Module): The target model. - logger (Logger): The logger instance. + handler (AbstractInputHandler): The input handler object. configs (dict): A dictionary containing the attack loss_traj configurations. """ - super().__init__(population, audit_dataset, target_model, logger) + super().__init__(handler) self.logger.info("Configuring Loss trajecatory attack") self._configure_attack(configs) @@ -54,11 +47,10 @@ def _configure_attack(self: Self, configs: dict) -> None: self.configs = configs self.train_mia_batch_size = configs.get("mia_batch_size", 64) - self.num_students = 1 self.number_of_traj = configs.get("number_of_traj", 10) self.attack_data_dir = configs.get("attack_data_dir") self.mia_classifier_epoch = configs.get("mia_classifier_epochs", 100) - self.attack_mode = configs.get("attack_mode", "soft_label") + self.label_only = configs.get("label_only", "False") self.read_from_file = False @@ -105,19 +97,9 @@ def prepare_attack(self:Self) -> None: """ self.logger.info("Preparing the data for loss trajectory attack") - include_target_training_data = False - #TODO: This should be changed! - include_target_testing_data = False - # Get all available indices for auxiliary dataset - aux_data_index = get_attack_data( - self.population_size, - self.train_indices, - self.test_indices, - include_target_training_data, - include_target_testing_data, - self.logger - ) + aux_data_index = self.sample_indices_from_population(include_train_indices = False, include_test_indices = False) + # create auxiliary dataset aux_data_size = len(aux_data_index) shadow_data_size = int(aux_data_size * self.shadow_data_fraction) @@ -129,65 +111,64 @@ def prepare_attack(self:Self) -> None: # Distillation on target and shadow model happen on the same dataset distill_data_indices = np.setdiff1d(aux_data_index, shadow_data_indices) - shadow_dataset = self.population.subset(shadow_training_indices) - distill_dataset = self.population.subset(distill_data_indices) - - # train shadow models - self.logger.info(f"Training shadow models on {len(shadow_dataset)} points") - ShadowModelHandler().create_shadow_models( - self.num_shadow_models, - shadow_dataset, - shadow_training_indices, - training_fraction = 1.0, - retrain= True, - ) + #-------------------------------------------------------- + # Train and load shadow model + #-------------------------------------------------------- + self.logger.info(f"Training shadow models on {len(shadow_training_indices)} points") + self.shadow_model_indices = ShadowModelHandler().create_shadow_models(self.num_shadow_models, + shadow_training_indices, + training_fraction = 1.0) # load shadow models - self.shadow_models, self.shadow_model_indices = \ - ShadowModelHandler().get_shadow_models(self.num_shadow_models) - self.shadow_metadata = ShadowModelHandler().get_shadow_model_metadata(1) + self.shadow_model, _ = ShadowModelHandler().get_shadow_models(self.shadow_model_indices) + #-------------------------------------------------------- + # Knowledge distillation of target and shadow models + #-------------------------------------------------------- + # Note: shadow and target models are PytorchModel objects, hence, we need to take model_obj + DistillationModelHandler().add_student_teacher_pair("shadow_distillation", self.shadow_model[0].model_obj) + DistillationModelHandler().add_student_teacher_pair("target_distillation", self.target_model.model_obj) + + self.logger.info(f"Training distillation of the shadow model on {len(distill_data_indices)} points") # train the distillation model using the one and only trained shadow model - self.logger.info(f"Training distillation of the shadow model on {len(distill_dataset)} points") - DistillationShadowModelHandler().initializng_shadow_teacher(self.shadow_models[0], self.shadow_metadata[0]) - DistillationShadowModelHandler().create_distillation_models( - self.num_students, - self.number_of_traj, - distill_dataset, - distill_data_indices, - self.attack_mode, - ) - self.distill_shadow_models = DistillationShadowModelHandler().get_distillation_epochs(self.number_of_traj) + self.distill_shadow_models = DistillationModelHandler().distill_model("shadow_distillation", + self.number_of_traj, + distill_data_indices, + self.label_only) # train distillation model of the target model - self.logger.info(f"Training distillation of the target model on {len(distill_dataset)} points") - DistillationTargetModelHandler().create_distillation_models( - self.num_students, - self.number_of_traj, - distill_dataset, - distill_data_indices, - self.attack_mode, - ) - self.distill_target_models = DistillationTargetModelHandler().get_distillation_epochs(self.number_of_traj) - + self.distill_target_models = DistillationModelHandler().distill_model("target_distillation", + self.number_of_traj, + distill_data_indices, + self.label_only) + + #-------------------------------------------------------- + # Prepare data to train and test the MIA classifier + #-------------------------------------------------------- # shadow data (train and test) is used as training data for MIA_classifier in the paper - train_mask = np.isin(shadow_data_indices,shadow_not_used_indices ) - self.prepare_mia_data(shadow_data_indices, train_mask, - self.distill_shadow_models, self.shadow_models[0].model_obj, "train") + train_mask = np.isin(shadow_data_indices,shadow_not_used_indices) + self.prepare_mia_data(shadow_data_indices, + train_mask, + student_model = self.distill_shadow_models, + teacher_model = self.shadow_model[0].model_obj, + train_mode = True) # Data used in the target (train and test) is used as test data for MIA_classifier mia_test_data_indices = np.concatenate( (self.train_indices , self.test_indices)) test_mask = np.isin(mia_test_data_indices, self.train_indices) - self.prepare_mia_data(mia_test_data_indices, test_mask, - self.distill_target_models, self.target_model.model_obj, "test") + self.prepare_mia_data(mia_test_data_indices, + test_mask, + student_model = self.distill_target_models, + teacher_model = self.target_model.model_obj, + train_mode = False) def prepare_mia_data(self:Self, data_indices: np.ndarray, membership_status_shadow_train: np.ndarray, - distill_model: nn.Module, + student_model: nn.Module, teacher_model: nn.Module, - mode: str, + train_mode: bool, ) -> None: """Prepare the data for MIA attack. @@ -195,51 +176,34 @@ def prepare_mia_data(self:Self, ---- data_indices (np.ndarray): Indices of the data. membership_status_shadow_train (np.ndarray): Membership status of the shadow training data. - distill_model (nn.Module): Distillation model. + student_model (nn.Module): Distillation model. teacher_model (nn.Module): Teacher model. - mode (str): Mode of the attack (train or test). + train_mode (bool): Mode of the attack. Returns: ------- None """ - if mode == "train": - dataset_name = "trajectory_train_data.pkl" - if os.path.exists(f"{self.attack_data_dir}/{dataset_name}"): - self.logger.info(f"Loading MIA {dataset_name}: {len(data_indices)} points") - with open(f"{self.attack_data_dir}/{dataset_name}", "rb") as file: - data = pickle.load(file) # noqa: S301 - else: - data = self._prepare_mia_data(data_indices, - membership_status_shadow_train, - distill_model, - teacher_model, - dataset_name) - - # Create the training dataset for the MIA classifier. - mia_train_input = np.concatenate((data["model_trajectory"], - data["teacher_model_loss"][:, None]), axis=1) - mia_train_dataset = TensorDataset(tensor(mia_train_input), tensor(data["member_status"])) - self.mia_train_data_loader = DataLoader(mia_train_dataset, batch_size=self.train_mia_batch_size, shuffle=True) - - elif mode == "test": - dataset_name = "trajectory_test_data.pkl" - if os.path.exists(f"{self.attack_data_dir}/{dataset_name}"): - self.logger.info(f"Loading MIA {dataset_name}: {len(data_indices)} points") - with open(f"{self.attack_data_dir}/{dataset_name}", "rb") as file: - data = pickle.load(file) # noqa: S301 - else: - data = self._prepare_mia_data(data_indices, - membership_status_shadow_train, - distill_model, - teacher_model, - dataset_name) - # Create the training dataset for the MIA classifier. - mia_test_input = np.concatenate((data["model_trajectory"] , - data["teacher_model_loss"][:,None]), axis=1) - mia_test_dataset = TensorDataset(tensor(mia_test_input), tensor(data["member_status"])) - self.mia_test_data_loader = DataLoader(mia_test_dataset, batch_size=self.train_mia_batch_size, shuffle=True) + + dataset_name = "trajectory_train_data.pkl" if train_mode else "trajectory_test_data.pkl" + if os.path.exists(f"{self.attack_data_dir}/{dataset_name}"): + self.logger.info(f"Loading MIA {dataset_name}: {len(data_indices)} points") + with open(f"{self.attack_data_dir}/{dataset_name}", "rb") as file: + data = pickle.load(file) # noqa: S301 + else: + data = self._prepare_mia_data(data_indices, + membership_status_shadow_train, + student_model, + teacher_model, + dataset_name) + # Create the training dataset for the MIA classifier. + mia_input = np.concatenate((data["model_trajectory"], data["teacher_model_loss"][:, None]), axis=1) + mia_dataset = TensorDataset(tensor(mia_input), tensor(data["member_status"])) + if train_mode: + self.mia_train_data_loader = DataLoader(mia_dataset, batch_size=self.train_mia_batch_size, shuffle=True) + else: + self.mia_test_data_loader = DataLoader(mia_dataset, batch_size=self.train_mia_batch_size, shuffle=True) def _prepare_mia_data(self:Self, data_indices: np.ndarray, @@ -250,8 +214,7 @@ def _prepare_mia_data(self:Self, ) -> dict: self.logger.info(f"Preparing MIA {dataset_name}: {len(data_indices)} points") gpu_or_cpu = device("cuda" if cuda.is_available() else "cpu") - data_attack = Subset(self.population, data_indices) - data_loader = DataLoader(data_attack, batch_size=self.train_mia_batch_size, shuffle=False) + data_loader = self.get_dataloader(data_indices, batch_size=self.train_mia_batch_size) teacher_model_loss = np.array([]) model_trajectory = np.array([]) @@ -259,12 +222,15 @@ def _prepare_mia_data(self:Self, predicted_labels = np.array([]) predicted_status = np.array([]) - for loader_idx, (data, target) in enumerate(data_loader): - data = data.to(gpu_or_cpu) # noqa: PLW2901 - target = target.to(gpu_or_cpu) # noqa: PLW2901 + for loader_idx, (data, target) in tqdm(data_loader): + data = data.to(gpu_or_cpu) + target = target.to(gpu_or_cpu) + #--------------------------------------------------------------------- + # Calculate the losses for the distilled student models + #--------------------------------------------------------------------- trajectory_current = np.array([]) - for d in range(self.number_of_traj): + for d in range(self.number_of_traj) : distill_model[d].to(gpu_or_cpu) distill_model[d].eval() @@ -279,13 +245,16 @@ def _prepare_mia_data(self:Self, loss = np.array([loss_i.detach().cpu().numpy() for loss_i in loss]).reshape(-1, 1) trajectory_current = loss if d == 0 else np.concatenate((trajectory_current, loss), 1) + #--------------------------------------------------------------------- + # Calculate the loss for the teacher model + #--------------------------------------------------------------------- teacher_model.to(gpu_or_cpu) - batch_logit_target = teacher_model(data) - + batch_logit_target = teacher_model(data) # TODO: replace with hopskipjump for label only _, batch_predict_label = batch_logit_target.max(1) batch_predicted_label = batch_predict_label.long().cpu().detach().numpy() batch_original_label = target.long().cpu().detach().numpy() batch_loss_teacher = [] + for (batch_logit_target_i, target_i) in zip(batch_logit_target, target): batch_loss_teacher.append(F.cross_entropy(batch_logit_target_i.unsqueeze(0), target_i.unsqueeze(0))) @@ -321,7 +290,6 @@ def _prepare_mia_data(self:Self, pickle.dump(data, file) return data - def mia_classifier(self:Self)-> nn.Module: """Trains and returns the MIA (Membership Inference Attack) classifier. @@ -338,7 +306,7 @@ def mia_classifier(self:Self)-> nn.Module: attack_model = attack_model.to(gpu_or_cpu) loss_fn = nn.CrossEntropyLoss() - train_loss, train_prec1 = self.train_mia_classifier(attack_model, attack_optimizer, loss_fn) + train_loss, train_prec1 = self._train_mia_classifier(attack_model, attack_optimizer, loss_fn) train_info = [train_loss, train_prec1] save(attack_model.state_dict(), self.attack_data_dir + "/trajectory_mia_model.pkl") @@ -352,7 +320,7 @@ def mia_classifier(self:Self)-> nn.Module: return attack_model - def train_mia_classifier(self:Self, model:nn.Module, + def _train_mia_classifier(self:Self, model:nn.Module, attack_optimizer:optim.Optimizer, loss_fn:nn.functional) -> tuple: """Trains the model using the MIA (Membership Inference Attack) method for one step. @@ -378,11 +346,10 @@ def train_mia_classifier(self:Self, model:nn.Module, num_correct = 0 mia_train_loader = self.mia_train_data_loader gpu_or_cpu = device("cuda" if cuda.is_available() else "cpu") - for _ in tqdm(range(self.mia_classifier_epoch)): + for _ in tqdm(range(self.mia_classifier_epoch), total=self.mia_classifier_epoch): for _batch_idx, (data, label) in enumerate(mia_train_loader): data = data.to(gpu_or_cpu) # noqa: PLW2901 - label = label.to(gpu_or_cpu) # noqa: PLW2901 - label = label.long() # noqa: PLW2901 + label = label.to(gpu_or_cpu).long() # noqa: PLW2901 pred = model(data) loss = loss_fn(pred, label) @@ -403,31 +370,6 @@ def train_mia_classifier(self:Self, model:nn.Module, return train_loss_list, train_prec_list - def run_attack(self:Self) -> CombinedMetricResult: - """Run the attack and return the combined metric result. - - Returns - ------- - CombinedMetricResult: The combined metric result containing predicted labels, true labels, - predictions probabilities, and signal values. - - """ - self.mia_classifier() - true_labels, predictions = self.mia_attack(self.mia_classifer) - - #NOTE: We don't have signals in this attack, unlike RMIA. I set it to random to pass the PR before refactoring. - signals = np.random.rand(*true_labels.shape) - - self.plot_trajectories("trajectory_train_data.pkl") - self.plot_trajectories("trajectory_test_data.pkl") - # compute ROC, TP, TN etc - return CombinedMetricResult( - predicted_labels= predictions, - true_labels=true_labels, - predictions_proba=None, - signal_values=signals, - ) - def mia_attack(self:Self, attack_model:nn.Module) -> tuple: """Perform a membership inference attack using the given attack model. @@ -452,7 +394,7 @@ def mia_attack(self:Self, attack_model:nn.Module) -> tuple: auc_pred = None with no_grad(): - for batch_idx, (data, target) in tqdm(enumerate(self.mia_test_data_loader)): + for batch_idx, (data, target) in tqdm(enumerate(self.mia_test_data_loader), total=len(self.mia_test_data_loader)): data = data.to(gpu_or_cpu) # noqa: PLW2901 target = target.to(gpu_or_cpu) # noqa: PLW2901 target = target.long() # noqa: PLW2901 @@ -472,11 +414,33 @@ def mia_attack(self:Self, attack_model:nn.Module) -> tuple: test_loss /= len(self.mia_test_data_loader.dataset) accuracy = 100. * correct / len(self.mia_test_data_loader.dataset) # noqa: F841 - thresholds_1 = np.linspace(0, 1, 1000) - member_preds = np.array([(auc_pred < threshold).astype(int) for threshold in thresholds_1]) + thresholds = np.linspace(0, 1, 1000) + member_preds = np.array([(auc_pred < threshold).astype(int) for threshold in thresholds]) return auc_ground_truth, member_preds + def run_attack(self:Self) -> CombinedMetricResult: + """Run the attack and return the combined metric result. + + Returns + ------- + CombinedMetricResult: The combined metric result containing predicted labels, true labels, + predictions probabilities, and signal values. + + """ + self.mia_classifier() + true_labels, predictions = self.mia_attack(self.mia_classifer) + + #NOTE: We don't have signals in this attack, unlike RMIA. I set it to random to pass the PR before refactoring. + signals = np.random.rand(*true_labels.shape) + + # compute ROC, TP, TN etc + return CombinedMetricResult( + predicted_labels= predictions, + true_labels=true_labels, + predictions_proba=None, + signal_values=signals, + ) def plot_trajectories(self: Self, dataset_name: str) -> None: diff --git a/leakpro/attacks/mia_attacks/qmia.py b/leakpro/attacks/mia_attacks/qmia.py index fb813bee..c52f80b0 100644 --- a/leakpro/attacks/mia_attacks/qmia.py +++ b/leakpro/attacks/mia_attacks/qmia.py @@ -1,5 +1,4 @@ """Implementation of the RMIA attack.""" -from logging import Logger import numpy as np import torch @@ -8,10 +7,10 @@ from tqdm import tqdm from leakpro.attacks.mia_attacks.abstract_mia import AbstractMIA -from leakpro.attacks.utils.attack_data import get_attack_data from leakpro.import_helper import Self from leakpro.metrics.attack_result import CombinedMetricResult from leakpro.signals.signal import ModelRescaledLogits +from leakpro.user_inputs.abstract_input_handler import AbstractInputHandler class QuantileRegressor(nn.Module): @@ -93,25 +92,19 @@ class AttackQMIA(AbstractMIA): def __init__( self:Self, - population: np.ndarray, - audit_dataset: dict, - target_model: nn.Module, - logger:Logger, + handler: AbstractInputHandler, configs: dict ) -> None: """Initialize the QMIA attack. Args: ---- - population (np.ndarray): The population data. - audit_dataset (dict): The audit dataset. - target_model (nn.Module): The target model. - logger (Logger): The logger object. + handler (AbstractInputHandler): The input handler object. configs (dict): Configuration parameters for the attack. """ # Initializes the parent metric - super().__init__(population, audit_dataset, target_model, logger) + super().__init__(handler) self.logger.info("Configuring the QMIA attack") self._configure_attack(configs) @@ -164,26 +157,17 @@ def prepare_attack(self:Self) -> None: """ # sample dataset to train quantile regressor self.logger.info("Preparing attack data for training the quantile regressor") - self.attack_data_index = get_attack_data( - self.population_size, - self.train_indices, - self.test_indices, - train_data_included_in_auxiliary_data = False, - test_data_included_in_auxiliary_data = False, - logger = self.logger - ) + self.attack_data_indices = self.sample_indices_from_population(include_train_indices = False, + include_test_indices = False) # subsample the attack data based on the fraction - self.logger.info(f"Subsampling attack data from {len(self.attack_data_index)} points") - self.attack_data_index = np.random.choice( - self.attack_data_index, - int(self.training_data_fraction * len(self.attack_data_index)), - replace=False - ) - self.logger.info(f"Number of attack data points after subsampling: {len(self.attack_data_index)}") + self.logger.info(f"Subsampling attack data from {len(self.attack_data_indices)} points") + n_points = int(self.training_data_fraction * len(self.attack_data_indices)) + attack_data = self.sample_data_from_dataset(self.attack_data_indices, n_points).dataset + self.logger.info(f"Number of attack data points after subsampling: {len(self.attack_data_indices)}") # create attack dataset - attack_data = self.population.subset(self.attack_data_index) + attack_data = self.population.subset(self.attack_data_indices) # create labels and change dataset to be used for regression regression_labels = np.array(self.signal([self.target_model], attack_data)).squeeze() @@ -269,7 +253,7 @@ def run_attack(self:Self) -> CombinedMetricResult: Result(s) of the metric. """ - audit_dataset = self.population.subset(self.audit_dataset["data"]) + audit_dataset = self.get_dataloader(self.audit_dataset["data"]).dataset self.target_logits = np.array(self.signal([self.target_model], audit_dataset)).squeeze() audit_dataloader = DataLoader(audit_dataset, batch_size=64, shuffle=False) diff --git a/leakpro/attacks/mia_attacks/rmia.py b/leakpro/attacks/mia_attacks/rmia.py index 92aa74fe..8d00200e 100644 --- a/leakpro/attacks/mia_attacks/rmia.py +++ b/leakpro/attacks/mia_attacks/rmia.py @@ -1,40 +1,32 @@ """Implementation of the RMIA attack.""" -from logging import Logger import numpy as np -from torch import nn from leakpro.attacks.mia_attacks.abstract_mia import AbstractMIA -from leakpro.attacks.utils.attack_data import get_attack_data from leakpro.attacks.utils.shadow_model_handler import ShadowModelHandler from leakpro.import_helper import Self from leakpro.metrics.attack_result import CombinedMetricResult from leakpro.signals.signal import ModelLogits +from leakpro.user_inputs.abstract_input_handler import AbstractInputHandler class AttackRMIA(AbstractMIA): """Implementation of the RMIA attack.""" def __init__(self:Self, - population: np.ndarray, - audit_dataset: dict, - target_model: nn.Module, - logger:Logger, + handler: AbstractInputHandler, configs: dict ) -> None: """Initialize the RMIA attack. Args: ---- - population (np.ndarray): The population data. - audit_dataset (dict): The audit dataset. - target_model (nn.Module): The target model. - logger (Logger): The logger object. + handler (AbstractInputHandler): The input handler object. configs (dict): Configuration parameters for the attack. """ # Initializes the parent metric - super().__init__(population, audit_dataset, target_model, logger) + super().__init__(handler) self.shadow_models = [] self.signal = ModelLogits() self.epsilon = 1e-6 @@ -95,7 +87,6 @@ def description(self:Self) -> dict: "detailed": detailed_str, } - def softmax(self:Self, all_logits:np.ndarray, true_label_indices:np.ndarray, return_full_distribution:bool=False) -> np.ndarray: @@ -135,32 +126,20 @@ def prepare_attack(self:Self) -> None: # sample dataset to compute histogram self.logger.info("Preparing attack data for training the RMIA attack") - # Get all available indices to sample from for shadow models - - include_target_training_data = self.online is True - include_target_testing_data = self.online is True - - # Get all available indices for attack dataset - self.attack_data_index = get_attack_data( - self.population_size, - self.train_indices, - self.test_indices, - include_target_training_data, - include_target_testing_data, - self.logger - ) - # train shadow models - self.logger.info(f"Check for {self.num_shadow_models} shadow models (dataset: {len(self.attack_data_index)} points)") - ShadowModelHandler().create_shadow_models( - self.num_shadow_models, - self.population, - self.attack_data_index, - self.training_data_fraction, - ) + # Get all available indices for attack dataset, if self.online = True, include training and test data + self.attack_data_indices = self.sample_indices_from_population(include_train_indices = self.online, + include_test_indices = self.online) + # train shadow models + self.logger.info(f"Check for {self.num_shadow_models} shadow models (dataset: {len(self.attack_data_indices)} points)") + self.shadow_model_indices = ShadowModelHandler().create_shadow_models( + num_models = self.num_shadow_models, + shadow_population = self.attack_data_indices, + training_fraction = self.training_data_fraction, + online = self.online) # load shadow models - self.shadow_models, self.shadow_model_indices = ShadowModelHandler().get_shadow_models(self.num_shadow_models) + self.shadow_models, _ = ShadowModelHandler().get_shadow_models(self.shadow_model_indices) # compute quantities that are not touching the audit dataset if self.online is False: @@ -168,14 +147,9 @@ def prepare_attack(self:Self) -> None: # for all points in the attack dataset output from signal: # models x # data points x # classes # subsample the attack data based on the fraction - self.logger.info(f"Subsampling attack data from {len(self.attack_data_index)} points") - self.attack_data_index = np.random.choice( - self.attack_data_index, - int(self.attack_data_fraction * len(self.attack_data_index)), - replace=False - ) - # create attack dataset - attack_data = self.population.subset(self.attack_data_index) + self.logger.info(f"Subsampling attack data from {len(self.attack_data_indices)} points") + n_points = int(self.attack_data_fraction * len(self.attack_data_indices)) + attack_data = self.sample_data_from_dataset(self.attack_data_indices, n_points).dataset self.logger.info(f"Number of attack data points after subsampling: {len(attack_data)}") # get the true label indices @@ -197,34 +171,33 @@ def prepare_attack(self:Self) -> None: p_z = np.mean(p_z_given_shadow_models, axis=0) if len(self.shadow_models) > 1 else p_z_given_shadow_models.squeeze() p_z = 0.5*((self.offline_a + 1) * p_z + (1-self.offline_a)) - #TODO: pick the maximum value of the softmax output in p(z) self.ratio_z = p_z_given_theta / (p_z + self.epsilon) def _online_attack(self:Self) -> None: self.logger.info("Running RMIA online attack") # get the logits for the audit dataset - audit_data = self.population.subset(self.audit_dataset["data"]) + audit_data = self.get_dataloader(self.audit_dataset["data"]).dataset ground_truth_indices = np.array(audit_data._labels) # find the shadow models that are trained on what points in the audit dataset - in_model_indices = ShadowModelHandler().identify_models_trained_on_samples( - self.shadow_model_indices, - self.audit_dataset["data"] - ) + in_indices_mask = ShadowModelHandler().get_in_indices_mask(self.shadow_model_indices, self.audit_dataset["data"]).T # filter out the points that no shadow model has seen and points that all shadow models have seen - num_shadow_models_seen_points = np.sum(in_model_indices, axis=0) + num_shadow_models_seen_points = np.sum(in_indices_mask, axis=0) # make sure that the audit points are included in the shadow model training (but not all) mask = (num_shadow_models_seen_points > 0) & (num_shadow_models_seen_points < self.num_shadow_models) audit_data = audit_data.subset(mask) + if len(audit_data) == 0: + raise ValueError("No points in the audit dataset are used for the shadow models") + # find out how many in-members survived the filtering in_members = np.arange(np.sum(mask[self.audit_dataset["in_members"]])) # find out how many out-members survived the filtering num_out_members = np.sum(mask[self.audit_dataset["out_members"]]) out_members = np.arange(len(in_members), len(in_members) + num_out_members) ground_truth_indices = np.array(audit_data._labels) - out_model_indices = ~in_model_indices[:,mask] + out_model_indices = ~in_indices_mask[:,mask] self.logger.info(f"Number of points in the audit dataset that are used for online attack: {len(audit_data)}") @@ -243,14 +216,7 @@ def _online_attack(self:Self) -> None: ratio_x = p_x_given_target_model / (p_x + self.epsilon) # Make a "random sample" to compute p(z) for points in attack dataset on the OUT shadow models for each audit point - self.attack_data_index = get_attack_data( - self.population_size, - self.train_indices, - self.test_indices, - False, - False, - self.logger - ) + self.attack_data_index = self.sample_indices_from_population(include_train_indices = False, include_test_indices = False) # subsample the attack data based on the fraction self.logger.info(f"Subsampling attack data from {len(self.attack_data_index)} points") @@ -294,7 +260,7 @@ def _online_attack(self:Self) -> None: def _offline_attack(self:Self) -> None: self.logger.info("Running RMIA offline attack") # get the logits for the audit dataset - audit_data = self.population.subset(self.audit_dataset["data"]) + audit_data = self.get_dataloader(self.audit_dataset["data"]).dataset ground_truth_indices = np.array(audit_data._labels) # run target points through real model to get logits diff --git a/leakpro/attacks/utils/attack_data.py b/leakpro/attacks/utils/attack_data.py deleted file mode 100644 index 33219023..00000000 --- a/leakpro/attacks/utils/attack_data.py +++ /dev/null @@ -1,56 +0,0 @@ -"""Module providing a function to get attack data for the attack models.""" -from logging import Logger - -import numpy as np - - -def get_attack_data( - population_size: int, - train_indices: list, - test_indices: list, - train_data_included_in_auxiliary_data: bool, - test_data_included_in_auxiliary_data: bool, - logger:Logger -) -> np.ndarray: - """Function to get attack data for the attack models. - - Args: - ---- - population_size (int): The size of the population. - train_indices (list): The indices of the training data. - test_indices (list): The indices of the test data. - train_data_included_in_auxiliary_data (bool): Flag indicating whether to include train data in auxiliary data. - test_data_included_in_auxiliary_data (bool): Flag indicating whether to include test data in auxiliary data. - logger (Logger): The logger object for logging. - - Returns: - ------- - np.ndarray: The selected attack data indices. - - """ - if population_size <= 0: - raise ValueError("Population size must be greater than 0.") - if train_indices is None: - raise ValueError("Train indices must be provided.") - if test_indices is None: - raise ValueError("Test indices must be provided.") - - all_index = np.arange(population_size) - - not_allowed_indices = np.array([]) - if not train_data_included_in_auxiliary_data: - not_allowed_indices = np.hstack([not_allowed_indices, train_indices]) - - if not test_data_included_in_auxiliary_data: - not_allowed_indices = np.hstack([not_allowed_indices, test_indices]) - - available_index = np.setdiff1d(all_index, not_allowed_indices) - attack_data_size = len(available_index) - - logger.info(f"Selecting {attack_data_size} attack data points out of {len(available_index)} available data points.") - - if attack_data_size <= len(available_index): - selected_index = np.random.choice(available_index, attack_data_size, replace=False) - else: - raise ValueError("Not enough remaining data points.") - return selected_index diff --git a/leakpro/attacks/utils/distillation_model_handler.py b/leakpro/attacks/utils/distillation_model_handler.py index 70883011..469e9a10 100644 --- a/leakpro/attacks/utils/distillation_model_handler.py +++ b/leakpro/attacks/utils/distillation_model_handler.py @@ -1,17 +1,17 @@ """Module for handling shadow models.""" -import logging import os import pickle import numpy as np import torch.nn.functional as F # noqa: N812 -from torch import cuda, device, load, nn, optim, save +from torch import cuda, device, save from torch.nn import CrossEntropyLoss, KLDivLoss, Module -from torch.utils.data import DataLoader, Dataset +from tqdm import tqdm +from leakpro.attacks.utils.model_handler import ModelHandler from leakpro.import_helper import Self -from leakpro.utils.input_handler import get_class_from_module, import_module_from_file +from leakpro.user_inputs.abstract_input_handler import AbstractInputHandler def singleton(cls): # noqa: ANN001, ANN201 @@ -32,477 +32,161 @@ def get_instance(*args, **kwargs): # noqa: ANN003, ANN002, ANN202 return get_instance @singleton -class DistillationTargetModelHandler(): +class DistillationModelHandler(ModelHandler): """A class handling the creation, training, and loading of distillation models.""" - # Create a dictionary mapping lowercase names to optimizer classes (Optimizer is the base class) - optimizer_mapping = { - attr.lower(): getattr(optim, attr) - for attr in dir(optim) - if isinstance(getattr(optim, attr), type) and issubclass(getattr(optim, attr), optim.Optimizer) - } - - # Create a dictionary mapping lowercase names to loss classes (_Loss is the base class) - loss_mapping = {} - - for attr in dir(nn): - # Get the attribute - attribute = getattr(nn, attr, None) - # Ensure it's a class and a subclass of _Loss - if isinstance(attribute, type) and issubclass(attribute, nn.modules.loss._Loss): - loss_mapping[attr.lower()] = attribute - - def __init__(self:Self, target_model:Module, target_config:dict, config:dict, logger:logging.Logger)->None: + def __init__(self:Self, handler: AbstractInputHandler)->None: """Initialize the DistillationModelHandler. Args: ---- - target_model (Module): The target model. - target_config (dict): The configuration of the target model. - config (dict): The configuration of the DistillationModelHandler. - logger (logging.Logger): The logger object for logging. + handler (AbstractInputHandler): The input handler. """ - module_path = config.get("module_path") - model_class_path = config.get("model_class_path") - self.target_model = target_model - self.logger = logger + super().__init__(handler) + self.configs = handler.configs["distillation_model"] + + module_path = self.configs.get("module_path", None) + model_class_path = self.configs.get("model_class", None) + self.storage_path = self.configs.get("storage_path", None) + self.batch_size = self.configs.get("batch_size", 32) + self.epochs = self.configs.get("epochs", 10) + self.optimizer_config = self.configs.get("optimizer", None) + self.loss_config = self.configs.get("loss", None) # If no path to distillation model is provided, use the target model blueprint if module_path is None or model_class_path is None: - self.init_params = target_config["init_params"] - self.distillation_model_blueprint = self.target_model.model_obj.__class__ - - self.logger.info("Distillation model blueprint: target model") + self.model_blueprint = None else: - self.module_path = module_path - self.model_class_path = model_class_path - self.init_params = config.get("init_params", {}) - module = import_module_from_file(self.module_path) - self.distillation_model_blueprint = get_class_from_module(module, self.model_class_path) + self.init_params = self.configs.get("init_params", {}) + self._import_model_from_path(module_path, model_class_path) - self.logger.info(f"Distillation model blueprint loaded from {self.model_class_path} from {self.module_path}") + if self.optimizer_config is None: + raise ValueError("Optimizer configuration not provided") + self._get_optimizer_class(self.optimizer_config.pop("name")) + + if self.loss_config is None: + raise ValueError("Loss configuration not provided") + self._get_criterion_class(self.loss_config.pop("name")) - self.storage_path = config["storage_path"] # Check if the folder does not exist if not os.path.exists(self.storage_path): # Create the folder os.makedirs(self.storage_path) self.logger.info(f"Created folder {self.storage_path}") - self.batch_size = config.get("batch_size", target_config["batch_size"]) if self.batch_size < 0: raise ValueError("Batch size cannot be negative") - #TODO: epoch here? - self.epochs = config.get("epochs", target_config["epochs"]) + if self.epochs < 0: raise ValueError("Number of epochs cannot be negative") - self.optimizer_config = config.get("optimizer", target_config["optimizer"]) - if self.optimizer_config is None: - raise ValueError("Optimizer configuration not provided") - - self.loss_config = config.get("loss", target_config["loss"]) - if self.loss_config is None: - raise ValueError("Loss configuration not provided") - - self.optimizer_class = self.optimizer_mapping[self.optimizer_config.pop("name")] - self.criterion_class = self.loss_mapping[self.loss_config.pop("name")] - - self.model_storage_name = "distillation_epochs" self.metadata_storage_name = "metadata" - def create_distillation_models( - self:Self, - num_students:int, - num_trajectory_epochs:int, - dataset:Dataset, - distillation_data_indices: np.ndarray, - attack_mode:str - ) -> None: - """Create and train shadow models based on the blueprint. - - Args: - ---- - num_students (int): The number of student models to create. - num_trajectory_epochs (int): The number of trajectory epochs for training. - dataset (torch.utils.data.Dataset): The full dataset available for training the shadow models. - distillation_data_indices (np.ndarray): The indices of the distillation data. - attack_mode (str): The mode of attack. - - Returns: - ------- - None - - """ - if num_students < 0: - raise ValueError("Number of student models cannot be negative") - if num_trajectory_epochs < 0: - raise ValueError("Number of trajectory epochs cannot be negative") - - #Intiate the distillation model - distillation_model = self.distillation_model_blueprint(**self.init_params) - - - self.logger.info(f"Distillation training for {num_trajectory_epochs } epochs") - - self._train_distillation_model(distillation_model, - dataset, - distillation_data_indices, - num_trajectory_epochs, - attack_mode, - ) - - - def _train_distillation_model( - self:Self, - distillation_model:Module, - distillation_dataset:Dataset, - distillation_data_indices:np.ndarray, - num_trajectory_epochs:int, - attack_mode:str - ) -> None: - - # Get the device for training - gpu_or_cpu = device("cuda" if cuda.is_available() else "cpu") - distillation_model.to(gpu_or_cpu) - teacher_model = self.target_model.model_obj - teacher_model.to(gpu_or_cpu) - - # Data prepration - distillation_train_loader = DataLoader(distillation_dataset, batch_size=self.batch_size, shuffle=True) - self.logger.info(f"Created distillation dataset with size {len(distillation_dataset)}") - - optimizer = self.optimizer_class(distillation_model.parameters(), **self.optimizer_config) - - for d in range( num_trajectory_epochs): - - distillation_model.train() - teacher_model.eval() - - epoch_loss = 0 - - # Loop over each epoch - self.logger.info(f" *** Training distillation model epoch: {d}") - - # Loop over the training set - for data, target_labels in distillation_train_loader: - - # Move data to the device - data, target_labels = data.to(gpu_or_cpu, non_blocking=True), target_labels.to(gpu_or_cpu, non_blocking=True) # noqa: PLW2901 - target_labels = target_labels.long() # noqa: PLW2901 - - # Output of the distillation model - output = distillation_model(data) - output_teacher = teacher_model(data) - - - if attack_mode == "label_only": - loss = CrossEntropyLoss()(output, target_labels) - elif attack_mode == "soft_label": - loss = KLDivLoss(reduction="batchmean")(F.log_softmax(output, dim=1), - F.softmax(output_teacher.float(), - dim=1)) - - optimizer.zero_grad(set_to_none=True) - loss.backward() - optimizer.step() - epoch_loss += loss.item() - - - self.logger.info(f"Training distillation model epoch {d} completed") - self.logger.info(f"loss: {epoch_loss}") - with open(f"{self.storage_path}/{self.model_storage_name}_{d}.pkl", "wb") as f: - save(distillation_model.state_dict().copy(), f) - self.logger.info(f"Saved distillation model epoch {d} to {self.storage_path}") - - self.logger.info("Storing metadata for distillation model") - meta_data = {} - meta_data["init_params"] = self.init_params - meta_data["train_indices"] = distillation_data_indices - meta_data["num_train"] = len(distillation_data_indices) - meta_data["optimizer"] = self.optimizer_class.__name__ - meta_data["criterion"] = self.criterion_class.__name__ - meta_data["batch_size"] = self.batch_size - #TODO: epoch here? - meta_data["epochs"] = self.epochs - meta_data["learning_rate"] = self.optimizer_config["lr"] - meta_data["weight_decay"] = self.optimizer_config.get("weight_decay", 0.0) - - with open(f"{self.storage_path}/{self.metadata_storage_name}_{d}.pkl", "wb") as f: - pickle.dump(meta_data, f) - - self.logger.info(f"Metadata for distillation model stored in {self.storage_path}") + self.model_pairs = {} - - def _load_distillation_epoch(self:Self, index:int) -> Module: - if index < 0: - raise ValueError("Index cannot be negative") - if index >= len(os.listdir(self.storage_path)): - raise ValueError("Index out of range") - distillation_epoch = self.distillation_model_blueprint(**self.init_params) - with open(f"{self.storage_path}/{self.model_storage_name}_{index}.pkl", "rb") as f: - distillation_epoch.load_state_dict(load(f)) - self.logger.info(f"Loaded distillaion epoch {index}") - return distillation_epoch - - - def get_distillation_epochs(self:Self, num_epochs:int) -> list: - """Retrieves the distillation epochs. - - Args: - ---- - num_epochs (int): The number of epochs to retrieve. - - Returns: - ------- - list: A list of distillation epochs. - - """ - distillation_epochs = [] - for i in range(num_epochs): - self.logger.info(f"Loading distillation epoch {i}") - epoch = self._load_distillation_epoch(i) - distillation_epochs.append(epoch) - - return distillation_epochs - - -@singleton -class DistillationShadowModelHandler(): - """A class handling the creation, training, and loading of distillation models.""" - - # Create a dictionary mapping lowercase names to optimizer classes (Optimizer is the base class) - optimizer_mapping = { - attr.lower(): getattr(optim, attr) - for attr in dir(optim) - if isinstance(getattr(optim, attr), type) and issubclass(getattr(optim, attr), optim.Optimizer) - } - - # Create a dictionary mapping lowercase names to loss classes (_Loss is the base class) - loss_mapping = {} - - for attr in dir(nn): - # Get the attribute - attribute = getattr(nn, attr, None) - # Ensure it's a class and a subclass of _Loss - if isinstance(attribute, type) and issubclass(attribute, nn.modules.loss._Loss): - loss_mapping[attr.lower()] = attribute - - def __init__(self:Self, config:dict, logger:logging.Logger)->None: - self.logger = logger - self.module_path = config.get("module_path") - self.config = config - - - def initializng_shadow_teacher(self:Self, - target_model:Module, - target_metadata:dict)->None: - """Initialize the shadow teacher model. + def add_student_teacher_pair(self:Self, name:str, teacher:Module)->None: + """Add a student-teacher pair to the model handler. Args: ---- - target_model (Module): The target model. - target_metadata (dict): The metadata of the target model. + name (str): The name of the model pair. + teacher (Module): The teacher model. Returns: ------- None """ - model_class_path = self.config.get("model_class_path") - self.target_model = target_model - - # If no path to distillation model is provided, use the target model blueprint - if self.module_path is None or model_class_path is None: - self.init_params = target_metadata["init_params"] - self.distillation_model_blueprint = target_model.model_obj.__class__ - - self.logger.info("Distillation model blueprint: target model") - else: - self.init_params = self.config.get("init_params", {}) - module = import_module_from_file(self.module_path) - self.distillation_model_blueprint = get_class_from_module(module, model_class_path) - - self.logger.info(f"Distillation model blueprint loaded from {model_class_path} from {self.module_path}") - - self.storage_path = self.config["storage_path"] - # Check if the folder does not exist - if not os.path.exists(self.storage_path): - # Create the folder - os.makedirs(self.storage_path) - self.logger.info(f"Created folder {self.storage_path}") - - self.batch_size = self.config.get("batch_size", target_metadata["batch_size"]) - if self.batch_size < 0: - raise ValueError("Batch size cannot be negative") - #TODO: epoch here? - self.epochs = self.config.get("epochs", target_metadata["epochs"]) - if self.epochs < 0: - raise ValueError("Number of epochs cannot be negative") - - self.optimizer_config = self.config.get("optimizer", target_metadata["optimizer"].lower()) - if self.optimizer_config is None: - raise ValueError("Optimizer configuration not provided") - - self.loss_config = {"name": self.config.get("criterion", target_metadata["criterion"].lower())} - if self.loss_config is None: - raise ValueError("Loss configuration not provided") - - self.optimizer_class = self.optimizer_mapping[self.optimizer_config.pop("name")] - self.criterion_class = self.loss_mapping[self.loss_config.pop("name")] - - - self.model_storage_name = "distillation_epochs" - self.metadata_storage_name = "metadata" + student, _, optimizer = self._get_model_criterion_optimizer() + self.model_pairs[name] = {"student": student, "teacher": teacher, "optimizer": optimizer} - def create_distillation_models( + def distill_model( self:Self, - num_students:int, + model_pair_name:str, num_trajectory_epochs:int, - dataset:Dataset, - distillation_data_indices:np.ndarray, - attack_mode:str - ) -> None: + distillation_data_indices: np.ndarray, + label_only:bool=False + ) -> list[Module]: """Create and train shadow models based on the blueprint. Args: ---- - num_students (int): The number of student models to create. + model_pair_name (str): The name of the model pair. num_trajectory_epochs (int): The number of trajectory epochs for training. - dataset (torch.utils.data.Dataset): The full dataset available for training the shadow models. - distillation_data_indices (np.ndarray): The indices of the data points from the dataset to be used for distillation. - attack_mode (str): The mode of attack. + distillation_data_indices (np.ndarray): The indices of the distillation data. + label_only (bool): The mode of attack. Returns: ------- - None + list[Module]: A list of distillation model checkpoints. """ - if num_students < 0: - raise ValueError("Number of student models cannot be negative") if num_trajectory_epochs < 0: raise ValueError("Number of trajectory epochs cannot be negative") - #Intiate the distillation model - distillation_model = self.distillation_model_blueprint(**self.init_params) - - self._train_distillation_model(distillation_model, - dataset, - distillation_data_indices, - num_trajectory_epochs, - attack_mode, - ) - - - def _train_distillation_model( - self:Self, - distillation_model:Module, - distillation_dataset:Dataset, - distillation_data_indices: np.ndarray, - num_trajectory_epochs:int, - attack_mode:str - ) -> None: + model_pair = self.model_pairs[model_pair_name] + student_model = model_pair["student"] + teacher_model = model_pair["teacher"] + optimizer = model_pair["optimizer"] # optimizer for student model # Get the device for training gpu_or_cpu = device("cuda" if cuda.is_available() else "cpu") - distillation_model.to(gpu_or_cpu) - teacher_model = self.target_model.model_obj teacher_model.to(gpu_or_cpu) + student_model.to(gpu_or_cpu) + student_model.train() + teacher_model.eval() - distillation_train_loader = DataLoader(distillation_dataset, batch_size=self.batch_size, shuffle=True) - self.logger.info(f"Created distillation dataset with size {len(distillation_dataset)}") + data_loader = self.handler.get_dataloader(distillation_data_indices, self.batch_size) + self.logger.info(f"Created distillation dataset with size {len(distillation_data_indices)}") - optimizer = self.optimizer_class(distillation_model.parameters(), **self.optimizer_config) + distillation_checkpoints = [] for d in range(num_trajectory_epochs): - - distillation_model.train() - teacher_model.eval() - epoch_loss = 0 - # Loop over each epoch - self.logger.info(f" *** Training distillation of shadow model epoch: {d}") - # Loop over the training set - for data, target_labels in distillation_train_loader: + for data, target_labels in tqdm(data_loader, desc=f"Epoch {d+1}/{num_trajectory_epochs}"): # Move data to the device - data, target_labels = data.to(gpu_or_cpu, non_blocking=True), target_labels.to(gpu_or_cpu, non_blocking=True) # noqa: PLW2901 - target_labels = target_labels.long() # noqa: PLW2901 + data = data.to(gpu_or_cpu, non_blocking=True) + target_labels = target_labels.to(gpu_or_cpu, non_blocking=True).long() # Output of the distillation model - output = distillation_model(data) + output_student = student_model(data) output_teacher = teacher_model(data) - - if attack_mode == "label_only": - loss = CrossEntropyLoss()(output, target_labels) - elif attack_mode == "soft_label": - loss = KLDivLoss(reduction="batchmean")(F.log_softmax(output, dim=1), - F.softmax(output_teacher.float(), - dim=1)) - + # TODO: add hopskipjump distance here + if label_only: + loss = CrossEntropyLoss()(output_student, target_labels) # TODO: I think this is wrong + else: + loss = KLDivLoss(reduction="batchmean")(F.log_softmax(output_student, dim=1), + F.softmax(output_teacher.float(), dim=1)) optimizer.zero_grad(set_to_none=True) loss.backward() optimizer.step() epoch_loss += loss.item() + self.logger.info(f"Epoch {d+1}/{num_trajectory_epochs} | Loss: {epoch_loss}") + with open(f"{self.storage_path}/{model_pair_name}_{d}.pkl", "wb") as f: + save(student_model.state_dict(), f) + self.logger.info(f"Saved distillation model for epoch {d} to {self.storage_path}") + distillation_checkpoints.append(student_model) - self.logger.info(f"Training distillation of shadow model epoch {d} completed") - self.logger.info(f"loss: {epoch_loss}") - with open(f"{self.storage_path}/{self.model_storage_name}_{d}.pkl", "wb") as f: - save(distillation_model.state_dict().copy(), f) - self.logger.info(f"Saved distillation of shadow model epoch {d} to {self.storage_path}") - - - self.logger.info("Storing metadata for distillation of shadow model") + self.logger.info("Storing metadata for distillation model") meta_data = {} meta_data["init_params"] = self.init_params - meta_data["train_indices"] = distillation_data_indices meta_data["num_train"] = len(distillation_data_indices) - meta_data["optimizer"] = self.optimizer_class.__name__ - meta_data["criterion"] = self.criterion_class.__name__ + meta_data["optimizer"] = optimizer.__class__.__name__ meta_data["batch_size"] = self.batch_size - #TODO: epoch here? meta_data["epochs"] = self.epochs - meta_data["learning_rate"] = self.optimizer_config["lr"] - meta_data["weight_decay"] = self.optimizer_config.get("weight_decay", 0.0) - with open(f"{self.storage_path}/{self.metadata_storage_name}_{d}.pkl", "wb") as f: - pickle.dump(meta_data, f) - - self.logger.info(f"Metadata for distillation of shadow model stored in {self.storage_path}") - + meta_data["label_only"] = label_only - def _load_distillation_epoch(self:Self, index:int) -> Module: - if index < 0: - raise ValueError("Index cannot be negative") - if index >= len(os.listdir(self.storage_path)): - raise ValueError("Index out of range") - distillation_epoch = self.distillation_model_blueprint(**self.init_params) - with open(f"{self.storage_path}/{self.model_storage_name}_{index}.pkl", "rb") as f: - distillation_epoch.load_state_dict(load(f)) - self.logger.info(f"Loaded distillaion of shadow model, epoch {index}") - return distillation_epoch - - def get_distillation_epochs(self:Self, num_epochs:int) -> list: - """Retrieves the distillation epochs. - - Args: - ---- - num_epochs (int): The number of epochs to retrieve. + with open(f"{self.storage_path}/{model_pair_name}_metadata_{d}.pkl", "wb") as f: + pickle.dump(meta_data, f) - Returns: - ------- - list: A list of distillation epochs. + self.logger.info(f"Metadata for distillation model stored in {self.storage_path}") - """ - distillation_epochs = [] - for i in range(num_epochs): - self.logger.info(f"Loading distillation epoch {i}") - epoch = self._load_distillation_epoch(i) - distillation_epochs.append(epoch) - return distillation_epochs + return distillation_checkpoints diff --git a/leakpro/attacks/utils/model_handler.py b/leakpro/attacks/utils/model_handler.py new file mode 100644 index 00000000..a6156f84 --- /dev/null +++ b/leakpro/attacks/utils/model_handler.py @@ -0,0 +1,140 @@ +"""Abstract class for the model handler.""" + +import joblib +import torch +from torch import load +from torch.nn import Module + +from leakpro.import_helper import Self, Tuple +from leakpro.user_inputs.abstract_input_handler import AbstractInputHandler +from leakpro.utils.input_handler import ( + get_class_from_module, + get_criterion_mapping, + get_optimizer_mapping, + import_module_from_file, +) + + +class ModelHandler(): + """Class to handle models used in attacks.""" + + def __init__( + self:Self, + handler: AbstractInputHandler, + )->None: + """Initialize the ModelHandler class.""" + self.logger = handler.logger + self.handler = handler + self.init_params = {} + + def _import_model_from_path(self:Self, module_path:str, model_class:str)->Module: + """Import the model from the given path. + + Args: + ---- + module_path (str): The path to the module. + model_class (str): The name of the model class. + + Returns: + ------- + Module: The imported blueprint of a model. + + """ + try: + module = import_module_from_file(module_path) + self.model_blueprint = get_class_from_module(module, model_class) + except Exception as e: + raise ValueError(f"Failed to create model blueprint from {model_class} in {module_path}") from e + + def _get_optimizer_class(self:Self, optimizer_name:str)->torch.optim.Optimizer: + """Get the optimizer class based on the optimizer name. + + Args: + ---- + optimizer_name (str): The name of the optimizer. + + Returns: + ------- + torch.optim.Optimizer: The optimizer class. + + """ + try: + self.optimizer_class = get_optimizer_mapping()[optimizer_name] + except Exception as e: + raise ValueError(f"Failed to create optimizer from {self.optimizer_config['name']}") from e + + def _get_criterion_class(self:Self, criterion_name:str)->torch.nn.Module: + """Get the criterion class based on the criterion name. + + Args: + ---- + criterion_name (str): The name of the criterion. + + Returns: + ------- + torch.nn.Module: The criterion class. + + """ + try: + self.criterion_class = get_criterion_mapping()[criterion_name] + except Exception as e: + raise ValueError(f"Failed to create criterion from {self.criterion_config['name']}") from e + + def _get_model_criterion_optimizer(self:Self) -> Tuple[Module, Module, Module]: + """Get the model, criterion, and optimizer from the handler or config.""" + + # Set up shadow model from config file + if self.model_blueprint is not None: + model = self.model_blueprint(**self.init_params) + optimizer = self.optimizer_class(model.parameters(), **self.optimizer_config) + criterion = self.criterion_class(**self.loss_config) + else: + # Set up shadow model from handler + model, criterion, optimizer = self.handler.get_target_replica() + + return model, criterion, optimizer + + def _load_model(self:Self, model_path:str) -> Module: + """Load a shadow model from a saved state. + + Args: + ---- + model_path (str): The path to the saved model. + + Returns: + ------- + Module: The loaded shadow model. + + """ + try: + blueprint = self.handler.target_model_blueprint if self.model_blueprint is None else self.model_blueprint + model = blueprint(**self.init_params) # noqa: E501 + criterion = self.handler.get_criterion() if self.criterion_class is None else self.criterion_class(**self.loss_config) + except Exception as e: + raise ValueError("Failed to create model from blueprint") from e + + try: + with open(model_path, "rb") as f: + model.load_state_dict(load(f)) + self.logger.info(f"Loaded model from {model_path}") + return model, criterion + except FileNotFoundError as e: + raise ValueError(f"Model file not found at {model_path}") from e + + def _load_metadata(self:Self, metadata_path:str) -> dict: + """Load metadata from a saved state. + + Args: + ---- + metadata_path (str): The path to the saved metadata. + + Returns: + ------- + dict: The loaded metadata. + + """ + try: + with open(metadata_path, "rb") as f: + return joblib.load(f) + except FileNotFoundError as e: + raise FileNotFoundError(f"Metadata at {metadata_path} not found") from e diff --git a/leakpro/attacks/utils/shadow_model_handler.py b/leakpro/attacks/utils/shadow_model_handler.py index feb0c34c..75fa12b0 100644 --- a/leakpro/attacks/utils/shadow_model_handler.py +++ b/leakpro/attacks/utils/shadow_model_handler.py @@ -1,21 +1,18 @@ """Module for handling shadow models.""" -import logging import os import pickle import re -import joblib import numpy as np import torch -from torch import Tensor, cuda, device, jit, load, nn, optim, save +from torch import Tensor, jit, save from torch.nn import Module -from torch.utils.data import DataLoader, Dataset -from tqdm import tqdm +from leakpro.attacks.utils.model_handler import ModelHandler from leakpro.import_helper import Self, Tuple from leakpro.model import PytorchModel -from leakpro.utils.input_handler import get_class_from_module, import_module_from_file +from leakpro.user_inputs.abstract_input_handler import AbstractInputHandler def singleton(cls): # noqa: ANN001, ANN201 @@ -36,104 +33,92 @@ def get_instance(*args, **kwargs): # noqa: ANN003, ANN002, ANN202 return get_instance @singleton -class ShadowModelHandler(): +class ShadowModelHandler(ModelHandler): """A class handling the creation, training, and loading of shadow models.""" - # Create a dictionary mapping lowercase names to optimizer classes (Optimizer is the base class) - optimizer_mapping = { - attr.lower(): getattr(optim, attr) - for attr in dir(optim) - if isinstance(getattr(optim, attr), type) and issubclass(getattr(optim, attr), optim.Optimizer) - } - - # Create a dictionary mapping lowercase names to loss classes (_Loss is the base class) - loss_mapping = {} - - for attr in dir(nn): - # Get the attribute - attribute = getattr(nn, attr, None) - # Ensure it's a class and a subclass of _Loss - if isinstance(attribute, type) and issubclass(attribute, nn.modules.loss._Loss): - loss_mapping[attr.lower()] = attribute - - def __init__(self:Self, target_model:Module, target_config:dict, config:dict, logger:logging.Logger)->None: + def __init__(self:Self, handler: AbstractInputHandler) -> None: # noqa: PLR0912 """Initialize the ShadowModelHandler. Args: ---- - target_model (Module): The target model. - target_config (dict): The configuration of the target model. - config (dict): The configuration of the ShadowModelHandler. - logger (logging.Logger): The logger object for logging. + handler (AbstractInputHandler): The input handler object. """ - config = config or {} - module_path = config.get("module_path") - model_class_path = config.get("model_class_path") - - self.logger = logger - - # If no path to shadow model is provided, use the target model blueprint - if module_path is None or model_class_path is None: - self.init_params = target_config["init_params"] - self.shadow_model_blueprint = target_model.model_obj.__class__ - - self.logger.info("Shadow model blueprint: target model") + super().__init__(handler) + self.configs = handler.configs["shadow_model"] + + # Read from the config file + module_path = self.configs.get("module_path", None) + model_class = self.configs.get("model_class", None) + self.optimizer_config = self.configs.get("optimizer", None) + self.loss_config = self.configs.get("loss", None) + self.batch_size = self.configs.get("batch_size", 32) + self.epochs = self.configs.get("epochs", 10) + self.storage_path = self.configs.get("storage_path") + + if module_path is None or model_class is None: + self.model_blueprint = None + self.criterion_class = None + self.optimizer_class = None else: - self.module_path = module_path - self.model_class_path = model_class_path - self.init_params = config.get("init_params", {}) - module = import_module_from_file(self.module_path) - self.shadow_model_blueprint = get_class_from_module(module, self.model_class_path) - - self.logger.info(f"Shadow model blueprint loaded from {self.model_class_path} from {self.module_path}") - - self.storage_path = config["storage_path"] + self.init_params = self.configs.get("init_params", {}) + self._import_model_from_path(module_path, model_class) + + # Read the optimizer for shadow models if it has been provided + if self.optimizer_config is None: + raise ValueError("Optimizer configuration not found in configs.") + optimizer_name = self.optimizer_config.pop("name") # pop to only have input parameters left + self._get_optimizer_class(optimizer_name) + + # Read the loss function for shadow models if it has been provided + if self.loss_config is None: + raise ValueError("Loss configuration not found in configs.") + criterion_class = self.loss_config.pop("name") # pop to only have input parameters left + self._get_criterion_class(criterion_class) + + # Create the shadow model storage folder + if self.storage_path is None: + raise ValueError("Storage path for shadow models not provided") # Check if the folder does not exist if not os.path.exists(self.storage_path): # Create the folder os.makedirs(self.storage_path) self.logger.info(f"Created folder {self.storage_path}") - self.batch_size = config.get("batch_size", target_config["batch_size"]) - if self.batch_size < 0: - raise ValueError("Batch size cannot be negative") - - self.epochs = config.get("epochs", target_config["epochs"]) - if self.epochs < 0: - raise ValueError("Number of epochs cannot be negative") - - self.optimizer_config = config.get("optimizer", target_config["optimizer"]) - if self.optimizer_config is None: - raise ValueError("Optimizer configuration not provided") - - self.loss_config = config.get("loss", target_config["loss"]) - if self.loss_config is None: - raise ValueError("Loss configuration not provided") - - self.optimizer_class = self.optimizer_mapping[self.optimizer_config.pop("name")] - self.criterion_class = self.loss_mapping[self.loss_config.pop("name")] - + # Set up the names of the shadow model self.model_storage_name = "shadow_model" self.metadata_storage_name = "metadata" + def _filter(self:Self, data_size:int, online:bool)->list[int]: + # Get the metadata for the shadow models + entries = os.listdir(self.storage_path) + pattern = re.compile(rf"^{self.metadata_storage_name}_\d+\.pkl$") + files = [f for f in entries if pattern.match(f)] + # Extract the index of the metadata + all_indices = [int(re.search(r"\d+", f).group()) for f in files] + # Filter out indices to only keep the ones with the same data size + filtered_indices = [] + for i in all_indices: + metadata = self._load_shadow_metadata(i) + if metadata["num_train"] == data_size and metadata["online"] == online: + filtered_indices.append(i) + return all_indices, filtered_indices + def create_shadow_models( self:Self, num_models:int, - dataset:Dataset, - indicies: np.ndarray, - training_fraction:float=1.0, - retrain:bool = False - ) -> None: + shadow_population: np.ndarray, + training_fraction:float=0.1, + online:bool=False + ) -> list[int]: """Create and train shadow models based on the blueprint. Args: ---- num_models (int): The number of shadow models to create. - dataset (torch.utils.data.Dataset): The full dataset available for training the shadow models. - indicies (list): The indices to use from the dataset for training the shadow models. - training_fraction (float): The fraction of the dataset to use for training. - retrain (bool): Whether to retrain the shadow models or not. + shadow_population (list): The indices in population eligible for training the shadow models. + training_fraction (float): The fraction of the shadow population to use for training of a shadow model. + online (bool): Whether the shadow models are created using an online or offline dataset. Returns: ------- @@ -143,35 +128,38 @@ def create_shadow_models( if num_models < 0: raise ValueError("Number of models cannot be negative") - if retrain: - self.logger.info("Retraining shadow models") - num_to_reuse = 0 - else: - entries = os.listdir(self.storage_path) - # Define a regex pattern to match files like model_{i}.pkl - pattern = re.compile(rf"^{self.model_storage_name}_\d+\.pkl$") - model_files = [f for f in entries if pattern.match(f)] - num_to_reuse = len(model_files) - # Get the size of the dataset - shadow_data_size = int(len(indicies)*training_fraction) - - for i in range(num_to_reuse, num_models): - - if training_fraction < 1.0: - shadow_data_indices = np.random.choice(indicies, shadow_data_size, replace=False) - shadow_dataset = dataset.subset(shadow_data_indices) - else: - shadow_dataset = dataset - shadow_data_indices = indicies - shadow_train_loader = DataLoader(shadow_dataset, batch_size=self.batch_size, shuffle=True) - self.logger.info(f"Created shadow dataset {i} with size {len(shadow_dataset)}") - - self.logger.info(f"Training shadow model {i}") - shadow_model = self.shadow_model_blueprint(**self.init_params) - shadow_model, train_acc, train_loss = self._train_shadow_model( - shadow_model, shadow_train_loader, self.optimizer_config, self.loss_config, self.epochs - ) + data_size = int(len(shadow_population)*training_fraction) + all_indices, filtered_indices = self._filter(data_size, online) + + # Create a list of indices to use for the new shadow models + n_existing_models = len(filtered_indices) + + if n_existing_models >= num_models: + self.logger.info("Number of existing models exceeds or equals the number of models to create") + return filtered_indices[:num_models] + + indices_to_use = [] + next_index = max(all_indices) + 1 if all_indices else 0 + while len(indices_to_use) < (num_models-n_existing_models): + indices_to_use.append(next_index) + next_index += 1 + + for i in indices_to_use: + # Get dataloader + data_indices = np.random.choice(shadow_population, data_size, replace=False) + data_loader = self.handler.get_dataloader(data_indices, self.batch_size) + + # Get shadow model blueprint + model, criterion, optimizer = self._get_model_criterion_optimizer() + + # Train shadow model + self.logger.info(f"Training shadow model {i} on {len(data_loader)} points") + training_results = self.handler.train(data_loader, model, criterion, optimizer, self.epochs) + # Read out results + shadow_model = training_results["model"] + train_acc = training_results["metrics"]["accuracy"] + train_loss = training_results["metrics"]["loss"] self.logger.info(f"Training shadow model {i} complete") with open(f"{self.storage_path}/{self.model_storage_name}_{i}.pkl", "wb") as f: @@ -181,77 +169,21 @@ def create_shadow_models( self.logger.info(f"Storing metadata for shadow model {i}") meta_data = {} meta_data["init_params"] = self.init_params - meta_data["train_indices"] = shadow_data_indices - meta_data["num_train"] = len(shadow_data_indices) - meta_data["optimizer"] = self.optimizer_class.__name__ - meta_data["criterion"] = self.criterion_class.__name__ + meta_data["train_indices"] = data_indices + meta_data["num_train"] = len(data_indices) + meta_data["optimizer"] = optimizer.__class__.__name__ + meta_data["criterion"] = criterion.__class__.__name__ meta_data["batch_size"] = self.batch_size meta_data["epochs"] = self.epochs - meta_data["learning_rate"] = self.optimizer_config["lr"] - meta_data["weight_decay"] = self.optimizer_config.get("weight_decay", 0.0) meta_data["train_acc"] = train_acc meta_data["train_loss"] = train_loss + meta_data["online"] = online with open(f"{self.storage_path}/{self.metadata_storage_name}_{i}.pkl", "wb") as f: pickle.dump(meta_data, f) self.logger.info(f"Metadata for shadow model {i} stored in {self.storage_path}") - - def _train_shadow_model( - self:Self, - shadow_model:Module, - train_loader:DataLoader, - optimizer_config:dict, - loss_config:dict, - epochs:int - ) -> Tuple[Module, np.ndarray, np.ndarray]: - """Train a shadow model. - - Args: - ---- - shadow_model (Module): The shadow model to train. - train_loader (torch.utils.data.DataLoader): The training data loader. - optimizer_config (dict): The optimizer configuration to use. - loss_config (dict): The loss function configuration to use. - epochs (int): The number of epochs to train the model. - - Returns: - ------- - Tuple[Module, np.ndarray, np.ndarray]: The trained shadow model, the training accuracy, and the training loss. - - """ - gpu_or_cpu = device("cuda" if cuda.is_available() else "cpu") - - shadow_model.to(gpu_or_cpu) - shadow_model.train() - - optimizer = self.optimizer_class(shadow_model.parameters(), **optimizer_config) - criterion = self.criterion_class(**loss_config) - - for epoch in range(epochs): - train_loss, train_acc = 0, 0 - shadow_model.train() - for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"): - labels = labels.long() # noqa: PLW2901 - inputs, labels = inputs.to(gpu_or_cpu, non_blocking=True), labels.to(gpu_or_cpu, non_blocking=True) # noqa: PLW2901 - optimizer.zero_grad() - outputs = shadow_model(inputs) - loss = criterion(outputs, labels) - pred = outputs.data.max(1, keepdim=True)[1] - loss.backward() - - optimizer.step() - - # Accumulate performance of shadow model - train_acc += pred.eq(labels.data.view_as(pred)).sum() - train_loss += loss.item() - - log_train_str = ( - f"Epoch: {epoch+1}/{epochs} | Train Loss: {train_loss/len(train_loader):.8f} | " - f"Train Acc: {float(train_acc)/len(train_loader.dataset):.8f}") - self.logger.info(log_train_str) - shadow_model.to("cpu") - return shadow_model, train_acc, train_loss + return filtered_indices + indices_to_use def _load_shadow_model(self:Self, index:int) -> Module: """Load a shadow model from a saved state. @@ -269,62 +201,23 @@ def _load_shadow_model(self:Self, index:int) -> Module: raise ValueError("Index cannot be negative") if index >= len(os.listdir(self.storage_path)): raise ValueError("Index out of range") - shadow_model = self.shadow_model_blueprint(**self.init_params) - - try: - with open(f"{self.storage_path}/{self.model_storage_name}_{index}.pkl", "rb") as f: - shadow_model.load_state_dict(load(f)) - self.logger.info(f"Loaded shadow model {index}") - return PytorchModel(shadow_model, self.criterion_class(**self.loss_config)) - except FileNotFoundError: - self.logger.error(f"Could not find the shadow model {index}") - return None - - def get_shadow_models(self:Self, num_models:int) -> Tuple[list, list]: + + model_path = f"{self.storage_path}/{self.model_storage_name}_{index}.pkl" + shadow_model, criterion = self._load_model(model_path) + return PytorchModel(shadow_model, criterion) + + def get_shadow_models(self:Self, num_models:list[int]) -> Tuple[list, list]: """Load the the shadow models.""" shadow_models = [] shadow_model_indices = [] - for i in range(num_models): + for i in num_models: self.logger.info(f"Loading shadow model {i}") model = self._load_shadow_model(i) shadow_models.append(model) shadow_model_indices.append(i) return shadow_models, shadow_model_indices - def identify_models_trained_on_samples(self:Self, shadow_model_indices: list[int], sample_indices:set[int]) -> list: - """Identify the shadow models trained on the provided samples. - - Args: - ---- - shadow_model_indices (list[int]): The indices of the shadow models. - sample_indices (set[int]): The indices of the samples. - - Returns: - ------- - list: The list of shadow models trained on the provided samples. - - """ - if shadow_model_indices is None: - raise ValueError("Shadow model indices must be provided") - if sample_indices is None: - raise ValueError("Sample indices must be provided") - - if isinstance(sample_indices, list): - sample_indices = set(sample_indices) - - self.logger.info("Identifying shadow models trained on provided samples") - shadow_model_trained_on_data_index = np.zeros((len(shadow_model_indices), len(sample_indices)), dtype=bool) - for i in shadow_model_indices: - with open(f"{self.storage_path}/{self.metadata_storage_name}_{i}.pkl", "rb") as f: - meta_data = joblib.load(f) - train_indices = set(meta_data["train_indices"]) - - for j in range(len(sample_indices)): - shadow_model_trained_on_data_index[i, j] = sample_indices[j] in train_indices - - return shadow_model_trained_on_data_index - - def _load_metadata(self:Self, index:int) -> dict: + def _load_shadow_metadata(self:Self, index:int) -> dict: """Load a shadow model from a saved state. Args: @@ -340,28 +233,25 @@ def _load_metadata(self:Self, index:int) -> dict: raise ValueError("Index cannot be negative") if index >= len(os.listdir(self.storage_path)): raise ValueError("Index out of range") + metadata_path = f"{self.storage_path}/{self.metadata_storage_name}_{index}.pkl" + return self._load_metadata(metadata_path) - try: - with open(f"{self.storage_path}/{self.metadata_storage_name}_{index}.pkl", "rb") as f: - return joblib.load(f) - except FileNotFoundError: - self.logger.error(f"Could not find the metadata for shadow model {index}") - return None - - def get_shadow_model_metadata(self:Self, num_models:int) -> list: + def get_shadow_model_metadata(self:Self, model_indices:list[int]) -> list: """Load the the shadow model metadata.""" metadata = [] - for i in range(num_models): + if model_indices is int: + model_indices = range(model_indices) + for i in model_indices: self.logger.info(f"Loading metadata {i}") - metadata.append(self._load_metadata(i)) + metadata.append(self._load_shadow_metadata(i)) return metadata - def get_in_indices_mask(self:Self, num_models:int, dataset:np.ndarray) -> np.ndarray: + def get_in_indices_mask(self:Self, shadow_model_indices:list[int], dataset:np.ndarray) -> np.ndarray: """Get the mask indicating which indices in the dataset are present in the shadow model training set. Args: ---- - num_models (int): The number of shadow models. + shadow_model_indices (list[int]): The number of shadow models. dataset (np.ndarray): The dataset. Returns: @@ -370,7 +260,7 @@ def get_in_indices_mask(self:Self, num_models:int, dataset:np.ndarray) -> np.nda """ # Retrieve metadata for shadow models - metadata = self.get_shadow_model_metadata(num_models) + metadata = self.get_shadow_model_metadata(shadow_model_indices) # Extract training indices for each shadow model models_in_indices = [data["train_indices"] for data in metadata] diff --git a/leakpro/dev_utils/train.py b/leakpro/dev_utils/train.py index cbb14528..0967aaed 100644 --- a/leakpro/dev_utils/train.py +++ b/leakpro/dev_utils/train.py @@ -221,8 +221,6 @@ def save_model_and_metadata( # noqa: PLR0913 """ # Save model and metadata - model_metadata_dict = {"model_metadata": {}} - log_dir = configs["run"]["log_dir"] Path(log_dir).mkdir(parents=True, exist_ok=True) @@ -258,6 +256,5 @@ def save_model_and_metadata( # noqa: PLR0913 meta_data["test_loss"] = test_loss meta_data["dataset"] = configs["data"]["dataset"] - model_metadata_dict["model_metadata"] = meta_data with open(f"{log_dir}/model_metadata.pkl", "wb") as f: - pickle.dump(model_metadata_dict, f) + pickle.dump(meta_data, f) diff --git a/leakpro/user_inputs/abstract_input_handler.py b/leakpro/user_inputs/abstract_input_handler.py new file mode 100644 index 00000000..307690f8 --- /dev/null +++ b/leakpro/user_inputs/abstract_input_handler.py @@ -0,0 +1,222 @@ +"""Parent class for user inputs.""" + +import logging +from abc import ABC, abstractmethod + +import joblib +import numpy as np +import torch +from torch import nn +from torch.utils.data import DataLoader + +from leakpro.import_helper import Self, Tuple +from leakpro.utils.input_handler import get_class_from_module, import_module_from_file + + +class AbstractInputHandler(ABC): + """Parent class for user inputs.""" + + def __init__(self:Self, configs: dict, logger:logging.Logger) -> None: + self.configs = configs + self.logger = logger + + # These objects will be generated by the setup function and then saved in the handler object + self.target_model_blueprint = None + self.target_model = None + self.target_model_metadata = None + self.population = None + + self.setup() + + # User-defined attributes + self.criterion = None + self.optimizer = None + + # must be called after initialization + def setup(self:Self) -> None: + """Set up the code handler by retrieving the model class, target metadata, trained target model, and population.""" + self._load_model_class() + self._load_target_metadata() + self._load_trained_target_model() + self._load_population() + + def _load_population(self:Self) -> None: + """Default implementation of the population loading.""" + try: + with open(self.configs["target"]["data_path"], "rb") as file: + self.population = joblib.load(file) + self.logger.info(f"Loaded population dataset from {self.configs['target']['data_path']}") + self.logger.info(f"Loaded population dataset from {self.configs['target']['data_path']}") + except FileNotFoundError as e: + raise FileNotFoundError(f"Could not find the population dataset at {self.configs['target']['data_path']}") from e + + def _load_model_class(self:Self) -> None: + """Get the model class blueprint from the target module.""" + model_class=self.configs["target"].get("model_class", None) + if model_class is None: + raise ValueError("model_class not found in configs.") + + module_path=self.configs["target"].get("module_path", None) + if module_path is None: + raise ValueError("module_path not found in configs.") + + try: + target_module = import_module_from_file(module_path) + self._target_model_blueprint = get_class_from_module(target_module, model_class) + self.logger.info(f"Target model blueprint created from {model_class} in {module_path}.") + except Exception as e: + raise ValueError(f"Failed to create the target model blueprint from {model_class} in {module_path}") from e + + def _validate_target_metadata(self:Self) -> None: + """Validate the target model metadata.""" + if "train_indices" not in self.target_model_metadata: + raise ValueError("train_indices not found in target model metadata.") + + if "test_indices" not in self.target_model_metadata: + raise ValueError("test_indices not found in target model metadata.") + + def _load_target_metadata(self:Self) -> None: + """Get the target model metadata from the trained model metadata file.""" + target_model_metadata_path = self.configs["target"].get("trained_model_metadata_path", None) + if target_model_metadata_path is None: + raise ValueError("Trained model metadata path not found in configs.") + try: + with open(target_model_metadata_path, "rb") as f: + self.target_model_metadata = joblib.load(f) + self._validate_target_metadata() + self.logger.info(f"Loaded target model metadata from {target_model_metadata_path}") + except FileNotFoundError as e: + raise FileNotFoundError(f"Could not find the target model metadata at {target_model_metadata_path}") from e + + def _load_trained_target_model(self:Self) -> None: + """Get the trained target model.""" + model_path = self.configs["target"].get("trained_model_path", None) + if model_path is None: + raise ValueError("Trained model path not found in configs.") + init_params = self.target_model_metadata.get("init_params", {}) + try: + with open(self.configs["target"]["trained_model_path"], "rb") as f: + self.target_model = self.target_model_blueprint(**init_params) + self.target_model.load_state_dict(torch.load(f)) + self.logger.info(f"Loaded target model from {model_path}") + except FileNotFoundError as e: + raise FileNotFoundError(f"Could not find the trained target model at {model_path}") from e + + #------------------------------------------------ + # Methods related to population dataset + #------------------------------------------------ + def _validate_indices(self:Self, dataset_indices: np.ndarray) -> None: + if self.population is None: + raise ValueError("Population dataset is not loaded.") + + if len(dataset_indices) == 0: + raise ValueError("Dataset indices are empty.") + + if len(dataset_indices) > len(self.population): + raise ValueError("Dataset indices are greater than the population size.") + + if len(dataset_indices) != len(np.unique(dataset_indices)): + raise ValueError("Dataset indices contain duplicates.") + + if not np.all(dataset_indices < len(self.population)): + raise ValueError("Dataset indices contain values greater than the population size.") + + if not np.all(dataset_indices >= 0): + raise ValueError("Dataset indices contain negative values.") + + if not np.all(np.isfinite(dataset_indices)): + raise ValueError("Dataset indices contain non-finite values.") + + if not np.issubdtype(dataset_indices.dtype, np.integer): + raise ValueError("Dataset indices are not integers.") + + def get_dataset(self:Self, dataset_indices: np.ndarray) -> np.ndarray: + """Get the dataset from the population.""" + self._validate_indices(dataset_indices) + return self.population.subset(dataset_indices) + + def get_dataloader(self: Self, dataset_indices: np.ndarray, batch_size: int = 32) -> DataLoader: + """Default implementation of the dataloader.""" + dataset = self.get_dataset(dataset_indices) + return DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True) + + #------------------------------------------------ + # Methods related to target model + #------------------------------------------------ + def get_target_replica(self:Self) -> Tuple[torch.nn.Module, nn.modules.loss._Loss, torch.optim.Optimizer]: + """Get an instance of a model created from the target model.""" + init_params = self.target_model_metadata.get("init_params", {}) + try: + model_replica = self.target_model_blueprint(**init_params) + return model_replica, self.get_criterion(), self.get_optimizer(model_replica) + except Exception as e: + raise ValueError("Failed to create an instance of the shadow model.") from e + + @abstractmethod + def get_criterion(self:Self, criterion: torch.nn.modules.loss._Loss) -> None: + """Get the loss function for the target model to be used in shadow model training.""" + pass + + @abstractmethod + def get_optimizer(self:Self, model:torch.nn.Module) -> torch.optim.Optimizer: + """Get the optimizer used for the target model to be used in shadow model training.""" + pass + + @abstractmethod + def train( + self: Self, + dataloader: DataLoader, + model: torch.nn.Module, + criterion: torch.nn.modules.loss._Loss, + optimizer: torch.optim.Optimizer + ) -> nn.Module: + """Procedure to train the shadow models on data from the population.""" + pass + + #------------------------------------------------ + # get-set methods + #------------------------------------------------ + @property + def target_model_blueprint(self:Self) -> torch.nn.Module: + """Get the target model blueprint.""" + return self._target_model_blueprint + + @target_model_blueprint.setter + def target_model_blueprint(self:Self, value:torch.nn.Module) -> None: + """Set the target model blueprint.""" + self._target_model_blueprint = value + + @property + def target_model(self:Self) -> torch.nn.Module: + """Get the trained target model wrapped as PyTorchModel.""" + return self._target_model + + @target_model.setter + def target_model(self:Self, model:torch.nn.Module) -> None: + """Set the trained target model.""" + self._target_model = model + + @property + def target_model_metadata(self:Self) -> dict: + """Get the metadata of the target model.""" + return self._target_model_metadata + + @target_model_metadata.setter + def target_model_metadata(self:Self, metadata:dict) -> None: + """Set the metadata of the target model.""" + self._target_model_metadata = metadata + + @property + def population_size(self:Self) -> int: + """Get the size of the population.""" + return len(self.population) + + @property + def train_indices(self:Self) -> np.ndarray: + """Get the training indices of the target model.""" + return self.target_model_metadata["train_indices"] + + @property + def test_indices(self:Self) -> np.ndarray: + """Get the testing indices of the target model.""" + return self.target_model_metadata["test_indices"] diff --git a/leakpro/user_inputs/cifar10_input_handler.py b/leakpro/user_inputs/cifar10_input_handler.py new file mode 100644 index 00000000..d98f3bdf --- /dev/null +++ b/leakpro/user_inputs/cifar10_input_handler.py @@ -0,0 +1,73 @@ +"""Module containing the class to handle the user input for the CIFAR10 dataset.""" + +import logging + +import torch +from torch import cuda, device, optim +from torch.utils.data import DataLoader +from tqdm import tqdm + +from leakpro.import_helper import Self +from leakpro.user_inputs.abstract_input_handler import AbstractInputHandler + + +class Cifar10InputHandler(AbstractInputHandler): + """Class to handle the user input for the CIFAR10 dataset.""" + + def __init__(self:Self, configs: dict, logger:logging.Logger) -> None: + super().__init__(configs = configs, logger = logger) + + + def get_criterion(self:Self)->None: + """Set the CrossEntropyLoss for the model.""" + return torch.nn.CrossEntropyLoss() + + def get_optimizer(self: Self, model:torch.nn.Module) -> None: + """Set the optimizer for the model.""" + learning_rate = 0.1 + momentum = 0.8 + return optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum) + + def train( + self: Self, + dataloader: DataLoader, + model: torch.nn.Module = None, + criterion: torch.nn.Module = None, + optimizer: optim.Optimizer = None, + epochs: int = None, + ) -> dict: + """Model training procedure.""" + + # read hyperparams for training (the parameters for the dataloader are defined in get_dataloader): + if epochs is None: + raise ValueError("epochs not found in configs") + + # prepare training + gpu_or_cpu = device("cuda" if cuda.is_available() else "cpu") + model.to(gpu_or_cpu) + + # training loop + for epoch in range(epochs): + train_loss, train_acc = 0, 0 + model.train() + for inputs, labels in tqdm(dataloader, desc=f"Epoch {epoch+1}/{epochs}"): + labels = labels.long() + inputs, labels = inputs.to(gpu_or_cpu, non_blocking=True), labels.to(gpu_or_cpu, non_blocking=True) + optimizer.zero_grad() + outputs = model(inputs) + loss = criterion(outputs, labels) + pred = outputs.data.max(1, keepdim=True)[1] + loss.backward() + optimizer.step() + + # Accumulate performance of shadow model + train_acc += pred.eq(labels.data.view_as(pred)).sum() + train_loss += loss.item() + + log_train_str = ( + f"Epoch: {epoch+1}/{epochs} | Train Loss: {train_loss/len(dataloader):.8f} | " + f"Train Acc: {float(train_acc)/len(dataloader.dataset):.8f}") + self.logger.info(log_train_str) + model.to("cpu") + + return {"model": model, "metrics": {"accuracy": train_acc, "loss": train_loss}} diff --git a/leakpro/utils/input_handler.py b/leakpro/utils/input_handler.py index c0551a6b..cabfcc65 100644 --- a/leakpro/utils/input_handler.py +++ b/leakpro/utils/input_handler.py @@ -4,6 +4,8 @@ import inspect import os +from torch import nn, optim + from leakpro.import_helper import Callable, ModuleType @@ -23,3 +25,24 @@ def get_class_from_module(module:ModuleType, class_name:str) -> Callable: if name == class_name: return obj raise ValueError(f"Class {class_name} not found in module {module.__name__}") + +def get_optimizer_mapping() -> dict: + """Return a dictionary mapping optimizer names to optimizer classes.""" + optimizer_mapping = { + attr.lower(): getattr(optim, attr) + for attr in dir(optim) + if isinstance(getattr(optim, attr), type) and issubclass(getattr(optim, attr), optim.Optimizer) + } + return optimizer_mapping # noqa: RET504 + +def get_criterion_mapping() -> dict: + """Return a dictionary mapping loss names to loss classes.""" + loss_mapping = {} + + for attr in dir(nn): + # Get the attribute + attribute = getattr(nn, attr, None) + # Ensure it's a class and a subclass of _Loss + if isinstance(attribute, type) and issubclass(attribute, nn.modules.loss._Loss): + loss_mapping[attr.lower()] = attribute + return loss_mapping diff --git a/pyproject.toml b/pyproject.toml index 6d347fda..42461dfa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,4 +45,6 @@ lint.ignore = [ "S101", # Using assert "PLR2004", # magic-value-comparison "D107", # Missing docstring in `__init__` + "PLW2901", # redefined-loop-name + "D202", # No blank lines allowed after function docstring ]