From 26e46b53195d75d1d79c40c76f4efceed11ad3ea Mon Sep 17 00:00:00 2001 From: Guillermo Padres Date: Wed, 11 Dec 2024 20:54:43 +0100 Subject: [PATCH] GPJ: adding path for loading and saving results for synthetic data --- .../synthetic_data_attacks/inference_utils.py | 97 ++++++++++--------- .../linkability_utils.py | 62 +++++++----- .../res_inference_base_case_adults.json | 3 +- .../res_inference_worst_case_adults.json | 3 +- .../res_singling_out_n_cols_1_adults.json | 4 +- .../res_singling_out_n_cols_all_adults.json | 4 +- .../singling_out_utils.py | 74 +++++++------- leakpro/synthetic_data_attacks/utils.py | 30 +++--- .../test_inference_utils.py | 56 ++++++----- .../test_linkability_utils.py | 50 +++++----- .../test_singling_out_utils.py | 52 +++++----- .../test_utils.py | 47 +++++---- 12 files changed, 268 insertions(+), 214 deletions(-) diff --git a/leakpro/synthetic_data_attacks/inference_utils.py b/leakpro/synthetic_data_attacks/inference_utils.py index df35d694..9495d623 100755 --- a/leakpro/synthetic_data_attacks/inference_utils.py +++ b/leakpro/synthetic_data_attacks/inference_utils.py @@ -35,82 +35,81 @@ class InferenceResults(BaseModel): secrets: List[str] worst_case_flag: bool - def save(self:Self, - path: str = "../leakpro_output/results/", - name: str = "inference", - case_flag:str = "base", # noqa: ARG002 - config: dict = None # noqa: ARG002 - ) -> None: + def save(self: Self, + path: str = "../leakpro_output/results/", + name: str = "inference", + case_flag: str = "base", # noqa: ARG002 + config: dict = None # noqa: ARG002 + ) -> None: """Save method for InferenceResults.""" - # Data to be saved data = { "resulttype": self.__class__.__name__, "resultname": name, "res": self.model_dump(), } - # Check if path exists, otherwise create it. for _ in range(3): if os.path.exists(path): break path = "../" + path - # If no result folder can be found if not os.path.exists(path): os.makedirs("../../leakpro_output/results/") - # Save the results to a file if not os.path.exists(f"{path}/inference_risk/{name}"): os.makedirs(f"{path}/inference_risk/{name}") - with open(f"{path}/inference_risk/{name}/data.json", "w") as f: json.dump(data, f) - - - self.plot(worst_case_flag=self.worst_case_flag, - show=False, - save=True, - save_path=f"{path}/inference_risk/{name}", - save_name=name - ) + self.plot( + worst_case_flag = self.worst_case_flag, + show = False, + save = True, + save_path = f"{path}/inference_risk/{name}", + save_name = name + ) @staticmethod def load(data: dict) -> "InferenceResults": """Load method for InferenceResults.""" - return InferenceResults(res=data["res"]["res"], - res_cols=data["res"]["res_cols"], - aux_cols=data["res"]["aux_cols"], - secrets=data["res"]["secrets"], - worst_case_flag=data["res"]["worst_case_flag"]) + return InferenceResults( + res = data["res"]["res"], + res_cols = data["res"]["res_cols"], + aux_cols = data["res"]["aux_cols"], + secrets = data["res"]["secrets"], + worst_case_flag = data["res"]["worst_case_flag"] + ) - def plot(self:Self, - high_res_flag: bool = False, - worst_case_flag: bool = False, - show: bool = True, - save: bool = False, - save_path: str = "./", - save_name: str = "fig.png" - ) -> None: + def plot( + self:Self, + high_res_flag: bool = False, + worst_case_flag: bool = False, + show: bool = True, + save: bool = False, + save_path: str = "./", + save_name: str = "fig.png" + ) -> None: """Plot method for InferenceResults.""" from leakpro.synthetic_data_attacks.plots import plot_ir_base_case, plot_ir_worst_case - plot_inference = plot_ir_worst_case if worst_case_flag else plot_ir_base_case - plot_inference(inf_res=InferenceResults(res=self.res, - res_cols=self.res_cols, - aux_cols=self.aux_cols, - secrets=self.secrets, - worst_case_flag=self.worst_case_flag), - high_res_flag=high_res_flag, - show=show, - save=save, - save_name=f"{save_path}/{save_name}") + plot_inference( + inf_res = InferenceResults( + res = self.res, + res_cols = self.res_cols, + aux_cols = self.aux_cols, + secrets = self.secrets, + worst_case_flag = self.worst_case_flag + ), + high_res_flag = high_res_flag, + show = show, + save = save, + save_name = f"{save_path}/{save_name}" + ) @staticmethod def create_results(results: list, save_dir: str = "./") -> str: """Result method for InferenceResults.""" latex = "" - def _latex(save_dir: str, save_name: str) -> str: """Latex method for InferenceResults.""" filename = f"{save_dir}/{save_name}.png" @@ -121,10 +120,8 @@ def _latex(save_dir: str, save_name: str) -> str: \\caption{{Original}} \\end{{figure}} """ - for res in results: name = "inference_"+("worst_case" if res.worst_case_flag else "base_case") - res.plot(show=False, save=True, save_path=save_dir, save_name=name) latex += _latex(save_dir=save_dir, save_name=name) return latex @@ -182,6 +179,7 @@ def inference_risk_evaluation( dataset: str = "test", verbose: bool = False, save_results_json: bool = False, + path: str = None, **kwargs: dict ) -> InferenceResults: """Perform a full inference risk evaluation. @@ -208,6 +206,8 @@ def inference_risk_evaluation( If True, prints progress of evaluation. save_results_json: bool, default is False If True, saves results and combinations to json file. + path: str, default is None + Path where to save json results file. kwargs: dict Other keyword arguments for InferenceEvaluator. @@ -271,11 +271,12 @@ def inference_risk_evaluation( save_res_json_file( prefix = prefix, dataset = dataset, - res = inf_res.model_dump() + res = inf_res.model_dump(), + path = path ) return inf_res -def load_inference_results(*, dataset: str, worst_case_flag: bool = True) -> InferenceResults: +def load_inference_results(*, dataset: str, worst_case_flag: bool = True, path: str = None) -> InferenceResults: """Function to load and return inference results from given dataset.""" prefix = get_inference_prefix(worst_case_flag=worst_case_flag) - return InferenceResults(**load_res_json_file(prefix=prefix, dataset=dataset)) + return InferenceResults(**load_res_json_file(prefix=prefix, dataset=dataset, path=path)) diff --git a/leakpro/synthetic_data_attacks/linkability_utils.py b/leakpro/synthetic_data_attacks/linkability_utils.py index da482c35..6e5baaa3 100755 --- a/leakpro/synthetic_data_attacks/linkability_utils.py +++ b/leakpro/synthetic_data_attacks/linkability_utils.py @@ -195,11 +195,14 @@ class LinkabilityResults(BaseModel): res: List[List[Union[int,float]]] aux_cols: List[List[List[str]]] - def save(self:Self, path: str = "../leakpro_output/results/", name: str = "linkability", config: dict = None) -> None: # noqa: ARG002 + def save( + self: Self, + path: str = "../leakpro_output/results/", + name: str = "linkability", + config: dict = None # noqa: ARG002 + ) -> None: """Save method for LinkabilityResults.""" - id = "linkability" - # Data to be saved data = { "resulttype": self.__class__.__name__, @@ -207,24 +210,19 @@ def save(self:Self, path: str = "../leakpro_output/results/", name: str = "linka "res": self.model_dump(), "id": id, } - # Check if path exists, otherwise create it. for _ in range(3): if os.path.exists(path): break path = "../" + path - # If no result folder can be found if not os.path.exists(path): os.makedirs("../../leakpro_output/results/") - # Save the results to a file if not os.path.exists(f"{path}/{name}/{id}"): os.makedirs(f"{path}/{name}/{id}") - with open(f"{path}/{name}/{id}/data.json", "w") as f: json.dump(data, f) - self.plot(show=False, save=True, save_path=f"{path}", @@ -233,27 +231,38 @@ def save(self:Self, path: str = "../leakpro_output/results/", name: str = "linka @staticmethod def load(data: dict) -> "LinkabilityResults": """Load method for LinkabilityResults.""" - return LinkabilityResults(res=data["res"]["res"], - res_cols=data["res"]["res_cols"], - aux_cols=data["res"]["aux_cols"]) + return LinkabilityResults( + res = data["res"]["res"], + res_cols = data["res"]["res_cols"], + aux_cols = data["res"]["aux_cols"] + ) - def plot(self:Self, high_res_flag: bool = False, show: bool = True, save: bool = False, - save_path: str = "./", save_name: str = "fig.png") -> None: + def plot( + self: Self, + high_res_flag: bool = False, + show: bool = True, + save: bool = False, + save_path: str = "./", + save_name: str = "fig.png" + ) -> None: """Plot method for LinkabilityResults.""" from leakpro.synthetic_data_attacks.plots import plot_linkability - plot_linkability(link_res=LinkabilityResults(res=self.res, - res_cols=self.res_cols, - aux_cols=self.aux_cols), - high_res_flag=high_res_flag, - show=show, - save=save, - save_name=f"{save_path}/{save_name}") + plot_linkability( + link_res = LinkabilityResults( + res = self.res, + res_cols = self.res_cols, + aux_cols = self.aux_cols + ), + high_res_flag = high_res_flag, + show = show, + save = save, + save_name = f"{save_path}/{save_name}" + ) @staticmethod def create_results(results: list, save_dir: str = "./") -> str: """Result method for LinkabilityResults.""" latex = "" - def _latex(save_dir: str, save_name: str) -> str: """Latex method for LinkabilityResults.""" filename = f"{save_dir}/{save_name}.png" @@ -264,7 +273,6 @@ def _latex(save_dir: str, save_name: str) -> str: \\caption{{Original}} \\end{{figure}} """ - for res in results: res.plot(show=False, save=True, save_path=save_dir, save_name="linkability") latex += _latex(save_dir=save_dir, save_name="linkability") @@ -277,6 +285,7 @@ def linkability_risk_evaluation( dataset: str = "test", verbose: bool = False, save_results_json: bool = False, + path: str = None, **kwargs: dict ) -> LinkabilityResults: """Perform a full linkability risk evaluation. @@ -299,6 +308,8 @@ def linkability_risk_evaluation( If True, prints progress of evaluation. save_results_json: bool, default is False If True, saves results and combinations to json file. + path: str, default is None + Path where to save json results file. kwargs: dict Other keyword arguments for LinkabilityEvaluator. @@ -344,10 +355,11 @@ def linkability_risk_evaluation( save_res_json_file( prefix = "linkability", dataset = dataset, - res = link_full_res.model_dump() + res = link_full_res.model_dump(), + path = path ) return link_full_res -def load_linkability_results(*, dataset: str) -> LinkabilityResults: +def load_linkability_results(*, dataset: str, path: str = None) -> LinkabilityResults: """Function to load and return linkability results from given dataset.""" - return LinkabilityResults(**load_res_json_file(prefix="linkability", dataset=dataset)) + return LinkabilityResults(**load_res_json_file(prefix="linkability", dataset=dataset, path=path)) diff --git a/leakpro/synthetic_data_attacks/results/res_inference_base_case_adults.json b/leakpro/synthetic_data_attacks/results/res_inference_base_case_adults.json index e218c711..7f1d2b3a 100755 --- a/leakpro/synthetic_data_attacks/results/res_inference_base_case_adults.json +++ b/leakpro/synthetic_data_attacks/results/res_inference_base_case_adults.json @@ -143976,5 +143976,6 @@ "income", "income", "income" - ] + ], + "worst_case_flag": false } \ No newline at end of file diff --git a/leakpro/synthetic_data_attacks/results/res_inference_worst_case_adults.json b/leakpro/synthetic_data_attacks/results/res_inference_worst_case_adults.json index b133aecf..aabc4d2c 100755 --- a/leakpro/synthetic_data_attacks/results/res_inference_worst_case_adults.json +++ b/leakpro/synthetic_data_attacks/results/res_inference_worst_case_adults.json @@ -378,5 +378,6 @@ "hr_per_week", "country", "income" - ] + ], + "worst_case_flag": true } \ No newline at end of file diff --git a/leakpro/synthetic_data_attacks/results/res_singling_out_n_cols_1_adults.json b/leakpro/synthetic_data_attacks/results/res_singling_out_n_cols_1_adults.json index 192dbd60..3b33cea0 100755 --- a/leakpro/synthetic_data_attacks/results/res_singling_out_n_cols_1_adults.json +++ b/leakpro/synthetic_data_attacks/results/res_singling_out_n_cols_1_adults.json @@ -20,5 +20,7 @@ 0.11556078021516172, 1 ] - ] + ], + "prefix": "singling_out_n_cols_1", + "dataset": "adults" } \ No newline at end of file diff --git a/leakpro/synthetic_data_attacks/results/res_singling_out_n_cols_all_adults.json b/leakpro/synthetic_data_attacks/results/res_singling_out_n_cols_all_adults.json index 6c608024..8f3a9a43 100755 --- a/leakpro/synthetic_data_attacks/results/res_singling_out_n_cols_all_adults.json +++ b/leakpro/synthetic_data_attacks/results/res_singling_out_n_cols_all_adults.json @@ -140,5 +140,7 @@ 0.0, 14 ] - ] + ], + "prefix": "singling_out_n_cols_all", + "dataset": "adults" } \ No newline at end of file diff --git a/leakpro/synthetic_data_attacks/singling_out_utils.py b/leakpro/synthetic_data_attacks/singling_out_utils.py index b2c7b13a..234e94d9 100755 --- a/leakpro/synthetic_data_attacks/singling_out_utils.py +++ b/leakpro/synthetic_data_attacks/singling_out_utils.py @@ -31,9 +31,8 @@ class SinglingOutResults(BaseModel): def save(self:Self, path:str = "../leakpro_output/results/", name: str = "singling_out", config:dict = None) -> None: # noqa: ARG002 """Save method for SinglingOutResults.""" - + from leakpro.synthetic_data_attacks.plots import plot_singling_out id = f"{self.prefix}"+f"_{self.dataset}" - # Data to be saved data = { "resulttype": self.__class__.__name__, @@ -41,42 +40,40 @@ def save(self:Self, path:str = "../leakpro_output/results/", name: str = "singli "res": self.model_dump(), "id": id, } - # Check if path exists, otherwise create it. for _ in range(3): if os.path.exists(path): break - path = "../"+path - + path = "../" + path # If no result folder can be found if not os.path.exists(path): os.makedirs("../../leakpro_output/results/") - # Save the results to a file if not os.path.exists(f"{path}/{name}/{id}"): os.makedirs(f"{path}/{name}/{id}") - with open(f"{path}/{name}/{id}/data.json", "w") as f: json.dump(data, f) - - from leakpro.synthetic_data_attacks.plots import plot_singling_out - plot_singling_out(sin_out_res=SinglingOutResults(res=self.res, - res_cols=self.res_cols, - prefix=self.prefix, - dataset=self.dataset), - show=False, - save=True, - save_name=f"{path}/{name}/{id}/{self.prefix}", - ) + plot_singling_out( + sin_out_res = SinglingOutResults( + res = self.res, + res_cols = self.res_cols, + prefix = self.prefix, + dataset = self.dataset + ), + show = False, + save = True, + save_name = f"{path}/{name}/{id}/{self.prefix}", + ) @staticmethod def load(data: dict) -> None: """Load method for SinglingOutResults.""" - return SinglingOutResults(res=data["res"]["res"], - res_cols=data["res"]["res_cols"], - dataset=data["res"]["dataset"], - prefix=data["res"]["prefix"] - ) + return SinglingOutResults( + res = data["res"]["res"], + res_cols = data["res"]["res_cols"], + dataset = data["res"]["dataset"], + prefix = data["res"]["prefix"] + ) def plot(self:Self, high_res_flag:bool = False, @@ -87,15 +84,18 @@ def plot(self:Self, ) -> None: """Plot method for SinglingOutResults.""" from leakpro.synthetic_data_attacks.plots import plot_singling_out - plot_singling_out(sin_out_res=SinglingOutResults(res=self.res, - res_cols=self.res_cols, - prefix=self.prefix, - dataset=self.dataset), - high_res_flag=high_res_flag, - show = show, - save = save, - save_name = f"{save_path}/{save_name}", - ) + plot_singling_out( + sin_out_res=SinglingOutResults( + res=self.res, + res_cols=self.res_cols, + prefix=self.prefix, + dataset=self.dataset + ), + high_res_flag=high_res_flag, + show = show, + save = save, + save_name = f"{save_path}/{save_name}", + ) @staticmethod def create_results( @@ -147,7 +147,7 @@ def aux_singling_out_risk_evaluation(**kwargs: Any) -> Tuple[Optional[Union[int, verbose = kwargs.pop("verbose") #Get n_cols n_cols = kwargs["n_cols"] - #Return non if n'_cols==2 + #Return non if n_cols==2 #Note: this is because n_cols==2 takes A LOT of time. Seems algorithm is not good for predicates with len==2 if n_cols == 2: return None, None @@ -180,6 +180,7 @@ def singling_out_risk_evaluation( dataset: str = "test", verbose: bool = False, save_results_json: bool = False, + path: str = None, **kwargs: dict ) -> SinglingOutResults: """Perform an individual/full singling-out risk evaluation. @@ -202,6 +203,8 @@ def singling_out_risk_evaluation( If True, prints progress of evaluation. save_results_json: bool, default is False If True, saves results and combinations to json file. + path: str, default is None + Path where to save json results file. kwargs: dict Other keyword arguments for SinglingOutEvaluator. @@ -263,11 +266,12 @@ def singling_out_risk_evaluation( save_res_json_file( prefix = get_singling_out_prefix(n_cols=n_cols), dataset = dataset, - res = sin_out_res.model_dump() + res = sin_out_res.model_dump(), + path = path ) return sin_out_res -def load_singling_out_results(*, dataset: str, n_cols: Optional[int] = None) -> SinglingOutResults: +def load_singling_out_results(*, dataset: str, n_cols: Optional[int] = None, path: str = None) -> SinglingOutResults: """Function to load and return singling-out results from given dataset.""" prefix = get_singling_out_prefix(n_cols=n_cols) - return SinglingOutResults(**load_res_json_file(prefix=prefix, dataset=dataset)) + return SinglingOutResults(**load_res_json_file(prefix=prefix, dataset=dataset, path=path)) diff --git a/leakpro/synthetic_data_attacks/utils.py b/leakpro/synthetic_data_attacks/utils.py index f3ea7509..5082bcca 100755 --- a/leakpro/synthetic_data_attacks/utils.py +++ b/leakpro/synthetic_data_attacks/utils.py @@ -4,27 +4,31 @@ from typing import Tuple # Default path to save results -DEFAULT_PATH_RESULTS = os.path.dirname(os.path.dirname(__file__)) + "/synthetic_data_attacks/results/" +DEFAULT_PATH_RESULTS = os.path.dirname(__file__) + "/results/" -def aux_file_path(*, path: str = None, prefix: str, dataset: str) -> Tuple[str, str]: - """Util function that returns file and file_path for given prefix and dataset.""" +def aux_file_path(*, prefix: str, dataset: str, path: str = None) -> Tuple[str,str]: + """Util function that returns the file path for given prefix and dataset and path.""" if prefix: prefix += "_" file = "res_" + prefix + dataset + ".json" - file_path = os.path.join(path or DEFAULT_PATH_RESULTS, file) - return file, file_path + if path is not None: + if path[-1] != "/": + path += "/" + else: + path = DEFAULT_PATH_RESULTS + return path + file -def save_res_json_file(*, path: str = None, prefix: str, dataset: str, res: dict) -> None: - """Util function that saves results dictionary into a json file with given prefix and dataset name.""" - file, file_path = aux_file_path(path=path, prefix=prefix, dataset=dataset) - # Create directory if it does not exist +def save_res_json_file(*, prefix: str, dataset: str, res: dict, path: str = None) -> None: + """Util function that saves results dictionary into a json file under given path with given prefix and dataset name.""" + file_path = aux_file_path(prefix=prefix, dataset=dataset, path=path) + #Create directory if does not exist os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, "w") as f: json.dump(res, f, indent=4) - print("\n### Results saved!", file) # noqa: T201 + print("\n### Results saved!", os.path.basename(file_path)) # noqa: T201 -def load_res_json_file(*, path: str = None, prefix: str, dataset: str) -> dict: - """Util function that loads and returns results from json file with given prefix and dataset name.""" - _, file_path = aux_file_path(path=path, prefix=prefix, dataset=dataset) +def load_res_json_file(*, prefix: str, dataset: str, path: str = None) -> dict: + """Util function that loads and returns results from json file under given path with given prefix and dataset name.""" + file_path = aux_file_path(prefix=prefix, dataset=dataset, path=path) with open(file_path, "r") as f: return json.load(f) diff --git a/leakpro/tests/tests_synthetic_data_attacks/test_inference_utils.py b/leakpro/tests/tests_synthetic_data_attacks/test_inference_utils.py index 23d3a3f1..9b556354 100755 --- a/leakpro/tests/tests_synthetic_data_attacks/test_inference_utils.py +++ b/leakpro/tests/tests_synthetic_data_attacks/test_inference_utils.py @@ -7,7 +7,7 @@ import pytest import leakpro.synthetic_data_attacks.inference_utils as iu -from leakpro.synthetic_data_attacks.utils import aux_file_path +from leakpro.synthetic_data_attacks import utils as u from leakpro.tests.tests_synthetic_data_attacks.anonymeter_tests.fixtures import get_adult @@ -24,8 +24,11 @@ def test_get_inference_prefix() -> None: prefix = iu.get_inference_prefix(worst_case_flag=False) assert prefix == "inference_base_case" -def test_inference_risk_evaluation() -> None: - """Assert results for inference_risk_evaluation function for simple input case.""" +def test_inference_risk_evaluation() -> None: # noqa: PLR0915 + """Assert results for inference_risk_evaluation function for simple input case. + + Test also tests function load_inference_results. + """ #Prepare test variables ori = get_adult(return_ori=True, n_samples=10) syn = get_adult(return_ori=False, n_samples=10) @@ -84,29 +87,30 @@ def test_inference_risk_evaluation() -> None: #Case save_results_json = True dataset = "test_ir_evaluation_adults" prefix = iu.get_inference_prefix(worst_case_flag=worst_case_flag) - _, file_path = aux_file_path(prefix=prefix, dataset=dataset) - assert not os.path.exists(file_path) - inf_res = iu.inference_risk_evaluation( - dataset = dataset, - ori = ori, - syn = syn, - worst_case_flag = worst_case_flag, - n_attacks = n_attacks, - n_samples = n_samples, - save_results_json = True - ) - assert isinstance(inf_res, iu.InferenceResults) - assert os.path.exists(file_path) - #Remove results file - os.remove(file_path) - assert not os.path.exists(file_path) - -def test_load_inference_results() -> None: - """Assert results for load_inference_results function for dataset used in examples.""" - inf_res = iu.load_inference_results(dataset="adults", worst_case_flag=True) - assert isinstance(inf_res, iu.InferenceResults) - inf_res = iu.load_inference_results(dataset="adults", worst_case_flag=False) - assert isinstance(inf_res, iu.InferenceResults) + for path in [None, "/tmp"]: # noqa: S108 + file_path = u.aux_file_path(prefix=prefix, dataset=dataset, path=path) + assert not os.path.exists(file_path) + inf_res = iu.inference_risk_evaluation( + dataset = dataset, + ori = ori, + syn = syn, + worst_case_flag = worst_case_flag, + n_attacks = n_attacks, + n_samples = n_samples, + save_results_json = True, + path = path + ) + if path is None: + path = u.DEFAULT_PATH_RESULTS[:-1] + assert os.path.dirname(file_path) == path + assert isinstance(inf_res, iu.InferenceResults) + assert os.path.exists(file_path) + #Test load_inference_results + res = iu.load_inference_results(dataset=dataset, worst_case_flag=worst_case_flag, path=path) + assert isinstance(res, iu.InferenceResults) + #Remove results file + os.remove(file_path) + assert not os.path.exists(file_path) @pytest.mark.parametrize( ("cols", "k", "n"), diff --git a/leakpro/tests/tests_synthetic_data_attacks/test_linkability_utils.py b/leakpro/tests/tests_synthetic_data_attacks/test_linkability_utils.py index 5ae27dab..f0ba13d7 100755 --- a/leakpro/tests/tests_synthetic_data_attacks/test_linkability_utils.py +++ b/leakpro/tests/tests_synthetic_data_attacks/test_linkability_utils.py @@ -6,7 +6,7 @@ import pytest import leakpro.synthetic_data_attacks.linkability_utils as lu -from leakpro.synthetic_data_attacks.utils import aux_file_path +from leakpro.synthetic_data_attacks import utils as u from leakpro.tests.tests_synthetic_data_attacks.anonymeter_tests.fixtures import get_adult @@ -126,7 +126,10 @@ def test_linkability_combinations_samples() -> None: n_cols+=1 def test_linkability_risk_evaluation() -> None: - """Assert results for linkability_risk_evaluation function for simple input case.""" + """Assert results for linkability_risk_evaluation function for simple input case. + + Test also tests function load_linkability_results. + """ #Prepare test variables ori = get_adult(return_ori=True, n_samples=10) syn = get_adult(return_ori=False, n_samples=10) @@ -156,23 +159,26 @@ def test_linkability_risk_evaluation() -> None: assert full_link_res.res_cols == e_res_cols #Case save_results_json = True dataset = "test_linkability_risk_evaluation_adults" - _, file_path = aux_file_path(prefix="linkability", dataset=dataset) - assert not os.path.exists(file_path) - full_link_res = lu.linkability_risk_evaluation( - dataset = dataset, - ori = ori, - syn = syn, - n_samples = n_samples, - n_attacks = 5, - save_results_json = True - ) - assert isinstance(full_link_res, lu.LinkabilityResults) - assert os.path.exists(file_path) - #Remove results file - os.remove(file_path) - assert not os.path.exists(file_path) - -def test_load_linkability_results() -> None: - """Assert results for load_linkability_results function for dataset used in examples.""" - full_link_res = lu.load_linkability_results(dataset="adults") - assert isinstance(full_link_res, lu.LinkabilityResults) + for path in [None, "/tmp"]: # noqa: S108 + file_path = u.aux_file_path(prefix="linkability", dataset=dataset, path=path) + assert not os.path.exists(file_path) + full_link_res = lu.linkability_risk_evaluation( + dataset = dataset, + ori = ori, + syn = syn, + n_samples = n_samples, + n_attacks = 5, + save_results_json = True, + path = path + ) + if path is None: + path = u.DEFAULT_PATH_RESULTS[:-1] + assert os.path.dirname(file_path) == path + assert isinstance(full_link_res, lu.LinkabilityResults) + assert os.path.exists(file_path) + #Test load_linkability_results + res = lu.load_linkability_results(dataset=dataset, path=path) + assert isinstance(res, lu.LinkabilityResults) + #Remove results file + os.remove(file_path) + assert not os.path.exists(file_path) diff --git a/leakpro/tests/tests_synthetic_data_attacks/test_singling_out_utils.py b/leakpro/tests/tests_synthetic_data_attacks/test_singling_out_utils.py index 3b22c841..90499b98 100755 --- a/leakpro/tests/tests_synthetic_data_attacks/test_singling_out_utils.py +++ b/leakpro/tests/tests_synthetic_data_attacks/test_singling_out_utils.py @@ -5,7 +5,7 @@ import pytest import leakpro.synthetic_data_attacks.singling_out_utils as sou -from leakpro.synthetic_data_attacks.utils import aux_file_path +from leakpro.synthetic_data_attacks import utils as u from leakpro.tests.tests_synthetic_data_attacks.anonymeter_tests.fixtures import get_adult @@ -40,7 +40,10 @@ def test_get_singling_out_prefix() -> None: assert prefix == "singling_out_n_cols_3" def test_singling_out_risk_evaluation() -> None: - """Assert results for singling_out_risk_evaluation function for simple input case.""" + """Assert results for singling_out_risk_evaluation function for simple input case. + + Test also tests function load_singling_out_results. + """ #Prepare test variables ori = get_adult(return_ori=True, n_samples=10) syn = get_adult(return_ori=False, n_samples=10) @@ -106,25 +109,26 @@ def test_singling_out_risk_evaluation() -> None: #Case save_results_json = True dataset = "test_sor_evaluation_adults" prefix = sou.get_singling_out_prefix(n_cols=1) - _, file_path = aux_file_path(prefix=prefix, dataset=dataset) - assert not os.path.exists(file_path) - sin_out_res = sou.singling_out_risk_evaluation( - dataset = dataset, - ori = ori, - syn = syn, - n_cols = 1, - n_attacks = 2, - save_results_json = True - ) - assert isinstance(sin_out_res, sou.SinglingOutResults) - assert os.path.exists(file_path) - #Remove results file - os.remove(file_path) - assert not os.path.exists(file_path) - -def test_load_singling_out_results() -> None: - """Assert results for load_singling_out_results function for dataset used in examples.""" - inf_res = sou.load_singling_out_results(dataset="adults", n_cols=1) - assert isinstance(inf_res, sou.SinglingOutResults) - inf_res = sou.load_singling_out_results(dataset="adults", n_cols=None) - assert isinstance(inf_res, sou.SinglingOutResults) + for path in [None, "/tmp"]: # noqa: S108 + file_path = u.aux_file_path(prefix=prefix, dataset=dataset, path=path) + assert not os.path.exists(file_path) + sin_out_res = sou.singling_out_risk_evaluation( + dataset = dataset, + ori = ori, + syn = syn, + n_cols = 1, + n_attacks = 2, + save_results_json = True, + path = path + ) + if path is None: + path = u.DEFAULT_PATH_RESULTS[:-1] + assert os.path.dirname(file_path) == path + assert isinstance(sin_out_res, sou.SinglingOutResults) + assert os.path.exists(file_path) + #Test load_singling_out_results + res = sou.load_singling_out_results(dataset=dataset, n_cols=1, path=path) + assert isinstance(res, sou.SinglingOutResults) + #Remove results file + os.remove(file_path) + assert not os.path.exists(file_path) diff --git a/leakpro/tests/tests_synthetic_data_attacks/test_utils.py b/leakpro/tests/tests_synthetic_data_attacks/test_utils.py index 8fbab907..9238863c 100755 --- a/leakpro/tests/tests_synthetic_data_attacks/test_utils.py +++ b/leakpro/tests/tests_synthetic_data_attacks/test_utils.py @@ -8,13 +8,18 @@ def test_aux_file_path() -> None: """Assert results of aux_file_path function.""" e_file_path_pre = "/LeakPro/leakpro/synthetic_data_attacks/results/" #Case prefix=="" - file, file_path = u.aux_file_path(prefix="", dataset="test") - assert file == "res_test.json" - assert file_path[-61:] == e_file_path_pre + file + file_path = u.aux_file_path(prefix="", dataset="test") + assert file_path[-61:] == e_file_path_pre + "res_test.json" #Case prefix!="" - file, file_path = u.aux_file_path(prefix="testing", dataset="test") - assert file == "res_testing_test.json" - assert file_path[-69:] == e_file_path_pre + file + file_path = u.aux_file_path(prefix="testing", dataset="test") + assert file_path[-69:] == e_file_path_pre + "res_testing_test.json" + #Case path not None + path = "/test" + file_path = u.aux_file_path(prefix="", dataset="test", path=path) + assert file_path == path + "/" + "res_test.json" + path = "/test/" + file_path = u.aux_file_path(prefix="", dataset="test", path=path) + assert file_path == path + "res_test.json" def test_save_load_res_json_file() -> None: """Assert results of save_res_json_file and load_res_json_file functions.""" @@ -22,14 +27,22 @@ def test_save_load_res_json_file() -> None: res = {"0": 0, "1": 1} prefix = "test" dataset = "test_save_load_res_json_file" - _, file_path = u.aux_file_path(prefix=prefix, dataset=dataset) - #Test save_res_json_file - assert not os.path.exists(file_path) - u.save_res_json_file(prefix=prefix, dataset=dataset, res=res) - assert os.path.exists(file_path) - #Test load_res_json_file - new_res = u.load_res_json_file(prefix=prefix, dataset=dataset) - assert new_res == res - #Remove file - os.remove(file_path) - assert not os.path.exists(file_path) + for path in [None, "/tmp", "/tmp/"]: # noqa: S108 + file_path = u.aux_file_path(prefix=prefix, dataset=dataset, path=path) + if path is not None: + extra = "" + if path[-1] == "/": + extra = "/" + assert os.path.dirname(file_path) + extra == path + else: + assert os.path.dirname(file_path) + "/" == u.DEFAULT_PATH_RESULTS + #Test save_res_json_file + assert not os.path.exists(file_path) + u.save_res_json_file(prefix=prefix, dataset=dataset, res=res, path=path) + assert os.path.exists(file_path) + #Test load_res_json_file + new_res = u.load_res_json_file(prefix=prefix, dataset=dataset, path=path) + assert new_res == res + #Remove file + os.remove(file_path) + assert not os.path.exists(file_path)