diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 0572482a..95617323 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -41,4 +41,4 @@ jobs:
     - name: Run pytest
       timeout-minutes: 15
-      run: poetry run pytest -m "all_examples or metahyper"
+      run: poetry run pytest -m "all_examples or metahyper or summary_csv"
diff --git a/neps_examples/basic_usage/hyperparameters.py b/neps_examples/basic_usage/hyperparameters.py
index 338be1ab..ad7a344b 100644
--- a/neps_examples/basic_usage/hyperparameters.py
+++ b/neps_examples/basic_usage/hyperparameters.py
@@ -25,5 +25,6 @@ def run_pipeline(float1, float2, categorical, integer1, integer2):
     run_pipeline=run_pipeline,
     pipeline_space=pipeline_space,
     root_directory="results/hyperparameters_example",
+    post_run_summary=True,
     max_evaluations_total=15,
 )
diff --git a/pyproject.toml b/pyproject.toml
index f78987cc..19c4336f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -96,7 +96,7 @@ line_length = 90
 
 [tool.pytest.ini_options]
 addopts = "--basetemp ./tests_tmpdir -m 'core_examples or yaml_api'"
-markers = ["all_examples", "core_examples", "regression_all", "metahyper", "yaml_api"]
+markers = ["all_examples", "core_examples", "regression_all", "metahyper", "yaml_api", "summary_csv"]
 filterwarnings = "ignore::DeprecationWarning:torch.utils.tensorboard.*:"
 
 [tool.mypy]
diff --git a/src/metahyper/api.py b/src/metahyper/api.py
index 2d8acde8..abcd974e 100644
--- a/src/metahyper/api.py
+++ b/src/metahyper/api.py
@@ -108,6 +108,64 @@ def hp_values(self):
         raise NotImplementedError
 
 
+def _process_sampler_info(
+    serializer: YamlSerializer,
+    sampler_info: dict,
+    sampler_info_file: Path,
+    decision_locker: Locker,
+    logger=None,
+):
+    """
+    This function is called by metahyper before sampling and training happen.
+    It checks the optimizer's YAML data file to ensure data integrity and applies
+    sanity checks for potential user errors when running NePS.
+
+    The function uses a file-locking mechanism (the `Locker` class) to ensure
+    thread safety, preventing conflicts when multiple workers access the file
+    simultaneously.
+
+    Parameters:
+    - serializer: The YAML serializer object used for loading and dumping data.
+    - sampler_info: The dictionary containing information for the optimizer.
+    - sampler_info_file: The path to the YAML file storing optimizer data if available.
+    - decision_locker: The Locker used to coordinate access across workers.
+    - logger: An optional logger object for logging messages (default is 'neps').
+
+    Note:
+    - The file-locking mechanism is employed to avoid concurrent writes from multiple workers.
+    - The `Locker` class and `YamlSerializer` should be appropriately defined or imported.
+    - Ensure that potential side effects or dependencies are considered when using this function.
+    """
+    if logger is None:
+        logger = logging.getLogger("neps")
+
+    should_break = False
+    while not should_break:
+        if decision_locker.acquire_lock():
+            try:
+                if sampler_info_file.exists():
+                    optimizer_data = serializer.load(sampler_info_file)
+                    excluded_key = "searcher_name"
+                    sampler_info_copy = sampler_info.copy()
+                    optimizer_data_copy = optimizer_data.copy()
+                    sampler_info_copy.pop(excluded_key, None)
+                    optimizer_data_copy.pop(excluded_key, None)
+
+                    if sampler_info_copy != optimizer_data_copy:
+                        raise ValueError(
+                            f"The sampler_info in the file {sampler_info_file} is not valid. "
+                            f"Expected: {sampler_info_copy}, Found: {optimizer_data_copy}"
+                        )
+                else:
+                    # The file doesn't exist yet, so write the sampler_info for this run
+                    serializer.dump(sampler_info, sampler_info_file, sort_keys=False)
+            except Exception as e:
+                raise RuntimeError(f"Error during data saving: {e}") from e
+            finally:
+                decision_locker.release_lock()
+                should_break = True
+
+
 def _load_sampled_paths(optimization_dir: Path | str, serializer, logger):
     optimization_dir = Path(optimization_dir)
     base_result_directory = optimization_dir / "results"
@@ -146,7 +204,6 @@ def _load_sampled_paths(optimization_dir: Path | str, serializer, logger):
                 shutil.rmtree(str(config_dir))
             except Exception:  # The worker doesn't need to crash for this
                 logger.exception(f"Can't delete {config_dir}")
-
     return previous_paths, pending_paths
 
 
@@ -214,7 +271,6 @@ def _check_max_evaluations(
     continue_until_max_evaluation_completed,
 ):
     logger.debug("Checking if max evaluations is reached")
-
     previous_results, pending_configs, pending_configs_free = read(
         optimization_dir, serializer, logger
     )
@@ -395,10 +451,9 @@ def run(
     decision_lock_file.touch(exist_ok=True)
     decision_locker = Locker(decision_lock_file, logger.getChild("_locker"))
 
-    # Check if the directory already exists
-    if not Path(sampler_info_file).exists():
-        # Write the sampler_info to a YAML file
-        serializer.dump(sampler_info, sampler_info_file, sort_keys=False)
+    _process_sampler_info(
+        serializer, sampler_info, sampler_info_file, decision_locker, logger
+    )
 
     evaluations_in_this_run = 0
     while True:
diff --git a/src/neps/api.py b/src/neps/api.py
index 499527a0..24efe7e8 100644
--- a/src/neps/api.py
+++ b/src/neps/api.py
@@ -17,6 +17,7 @@
 from .plot.tensorboard_eval import tblogger
 from .search_spaces.parameter import Parameter
 from .search_spaces.search_space import SearchSpace, pipeline_space_from_configspace
+from .status.status import post_run_csv
 from .utils.common import get_searcher_data
 from .utils.result_utils import get_loss
 
@@ -36,7 +37,7 @@ def _post_evaluation_hook(
     working_directory = Path(config_working_directory, "../../")
     loss = get_loss(result, loss_value_on_error, ignore_errors)
 
-    # 1. write all configs and losses
+    # 1. Write all configs and losses
     all_configs_losses = Path(working_directory, "all_losses_and_configs.txt")
 
     def write_loss_and_config(file_handle, loss_, config_id_, config_):
@@ -48,7 +49,7 @@ def write_loss_and_config(file_handle, loss_, config_id_, config_):
     with all_configs_losses.open("a", encoding="utf-8") as f:
         write_loss_and_config(f, loss, config_id, config)
 
-    # No need to handle best loss cases if an error occurred
+    # no need to handle best loss cases if an error occurred
     if result == "error":
         return
 
@@ -58,7 +59,7 @@ def write_loss_and_config(file_handle, loss_, config_id_, config_):
         logger.info(f"Finished evaluating config {config_id}")
         return
 
-    # 2. Write best losses / configs
+    # 2. Write best losses/configs
     best_loss_trajectory_file = Path(working_directory, "best_loss_trajectory.txt")
     best_loss_config_trajectory_file = Path(
         working_directory, "best_loss_with_config_trajectory.txt"
     )
@@ -97,6 +98,7 @@ def run(
     pipeline_space: dict[str, Parameter | CS.ConfigurationSpace] | CS.ConfigurationSpace,
     root_directory: str | Path,
     overwrite_working_directory: bool = False,
+    post_run_summary: bool = False,
     development_stage_id=None,
     task_id=None,
     max_evaluations_total: int | None = None,
@@ -124,9 +126,9 @@ def run(
     """Run a neural pipeline search.
 
     To parallelize:
-    In order to run a neural pipeline search with multiple processes or machines,
+    To run a neural pipeline search with multiple processes or machines,
     simply call run(.) multiple times (optionally on different machines). Make sure
-    that root_directory points to the same folder on the same filesystem, otherwise
+    that root_directory points to the same folder on the same filesystem; otherwise,
     the multiple calls to run(.) will be independent.
 
     Args:
@@ -136,6 +138,8 @@ def run(
             synchronize multiple calls to run(.) for parallelization.
         overwrite_working_directory: If true, delete the working directory at the start
             of the run. This is, e.g., useful when debugging a run_pipeline function.
+        post_run_summary: If True, creates a CSV file after each worker is done,
+            holding summary information about the configs and results.
         development_stage_id: ID for the current development stage. Only needed if
             you work with multiple development stages.
         task_id: ID for the current task. Only needed if you work with multiple
@@ -238,10 +242,10 @@ def run(
         "searcher_args_user_modified": False,
     }
 
-    # Check to verify if the target directory contains history of another optimizer state
+    # Check whether the target directory contains the history of another optimizer state
     # This check is performed only when the `searcher` is built during the run
     if isinstance(searcher, BaseOptimizer):
-        # This check is not strict when a user-defined nep.optimizer is provided
+        # This check is not strict when a user-defined neps.optimizer is provided
         logger.warn(
             "An instantiated optimizer is provided. The safety checks of NePS will be "
             "skipped. Accurate continuation of runs can no longer be guaranteed!"
         )
@@ -265,7 +269,7 @@ def run(
     else:
         # No searcher argument updates when NePS decides the searcher.
         logger.info(35 * "=" + "WARNING" + 35 * "=")
-        logger.info("CHANGINE ARGUMENTS ONLY WORKS WHEN SEARCHER IS DEFINED")
+        logger.info("CHANGING ARGUMENTS ONLY WORKS WHEN SEARCHER IS DEFINED")
         logger.info(
             f"The searcher argument '{key}' will not change to '{value}'"
             f" because NePS chose the searcher"
         )
@@ -292,6 +296,9 @@ def run(
         pre_load_hooks=pre_load_hooks,
     )
 
+    if post_run_summary:
+        post_run_csv(root_directory, logger)
+
 
 def _run_args(
     pipeline_space: dict[str, Parameter | CS.ConfigurationSpace] | CS.ConfigurationSpace,
diff --git a/src/neps/status/status.py b/src/neps/status/status.py
index 28103c4e..90832427 100644
--- a/src/neps/status/status.py
+++ b/src/neps/status/status.py
@@ -4,7 +4,10 @@
 from pathlib import Path
 from typing import Any
 
+import pandas as pd
+
 from metahyper import read
+from metahyper._locker import Locker
 from metahyper.api import ConfigResult
 
 from ..search_spaces.search_space import SearchSpace
@@ -14,7 +17,7 @@ def get_summary_dict(
     root_directory: str | Path, add_details: bool = False
 ) -> dict[str, Any]:
-    """Create dict that summarizes a run.
+    """Create a dict that summarizes a run.
 
     Args:
         root_directory: The root directory given to neps.run.
@@ -121,3 +124,218 @@ def status(
         print(all_loss_config.read_text(encoding="utf-8"))
 
     return summary["previous_results"], summary["pending_configs"]
+
+
+def _initiate_summary_csv(
+    root_directory: str | Path,
+    logger: logging.Logger,
+) -> tuple[Path, Path, Locker]:
+    """
+    Initializes the summary CSV file paths and an associated locker for file access control.
+
+    Args:
+        root_directory (str | Path): The root directory where the summary CSV directory,
+            containing CSV files and a locker for file access control, will be created.
+        logger (logging.Logger): A logger for log messages.
+
+    Returns:
+        Tuple[Path, Path, Locker]: A tuple containing the file paths for the
+            configuration data CSV, the run data CSV, and a locker file.
+
+            The locker is used for file access control to ensure data integrity in a
+            multi-threaded or multi-process environment.
+    """
+    root_directory = Path(root_directory)
+    summary_csv_directory = Path(root_directory / "summary_csv")
+    summary_csv_directory.mkdir(parents=True, exist_ok=True)
+
+    csv_config_data = summary_csv_directory / "config_data.csv"
+    csv_run_data = summary_csv_directory / "run_status.csv"
+
+    csv_lock_file = summary_csv_directory / ".csv_lock"
+    csv_lock_file.touch(exist_ok=True)
+    csv_locker = Locker(csv_lock_file, logger.getChild("_locker"))
+
+    return (
+        csv_config_data,
+        csv_run_data,
+        csv_locker,
+    )
+
+
+def _get_dataframes_from_summary(
+    root_directory: str | Path,
+    include_metadatas: bool = True,
+    include_results: bool = True,
+    include_configs: bool = True,
+) -> tuple[pd.DataFrame, pd.DataFrame]:
+    """
+    Generate pandas DataFrames from summary data retrieved from a run.
+
+    Args:
+        root_directory (str | Path): The root directory of the NePS run.
+        include_metadatas (bool): Include metadata in the DataFrames (Default: True).
+        include_results (bool): Include results in the DataFrames (Default: True).
+        include_configs (bool): Include configurations in the DataFrames (Default: True).
+
+    Returns:
+        Tuple[pd.DataFrame, pd.DataFrame]: The per-config data DataFrame and the
+            run summary DataFrame.
+    """
+    indices_prev = []
+    config_data_prev = []
+    result_data_prev = []
+    metadata_data_prev = []
+
+    indices_pending = []
+    config_data_pending = []
+
+    summary = get_summary_dict(root_directory=root_directory, add_details=True)
+
+    for key_prev, config_result_prev in summary["previous_results"].items():
+        indices_prev.append(str(key_prev))
+        if include_configs:
+            config_data_prev.append(config_result_prev.config)
+        if include_results:
+            result_data_prev.append(config_result_prev.result)
+        if include_metadatas:
+            metadata_data_prev.append(config_result_prev.metadata)
+
+    for key_pending, config_pending in summary["pending_configs"].items():
+        indices_pending.append(str(key_pending))
+        if include_configs:
+            config_data_pending.append(config_pending)
+
+    # Creating the dataframe for previous config results.
+    df_previous = pd.DataFrame({"Config_id": indices_prev})
+    df_previous["Status"] = "Complete"
+    df_previous = pd.concat(
+        [df_previous, pd.json_normalize(config_data_prev).add_prefix("config.")], axis=1
+    )
+    df_previous = pd.concat(
+        [df_previous, pd.json_normalize(metadata_data_prev).add_prefix("metadata.")],
+        axis=1,
+    )
+    df_previous = pd.concat(
+        [df_previous, pd.json_normalize(result_data_prev).add_prefix("result.")], axis=1
+    )
+
+    # Creating the dataframe for pending configs.
+    df_pending = pd.DataFrame({"Config_id": indices_pending})
+    df_pending["Status"] = "Pending"
+    df_pending = pd.concat(
+        [df_pending, pd.json_normalize(config_data_pending).add_prefix("config.")],
+        axis=1,
+    )
+
+    # Concatenate the two DataFrames
+    df_config_data = pd.concat([df_previous, df_pending], join="outer", ignore_index=True)
+
+    # Create a dataframe with the specified additional summary data
+    additional_data = {
+        "num_evaluated_configs": summary["num_evaluated_configs"],
+        "num_pending_configs": summary["num_pending_configs"],
+        "num_pending_configs_with_worker": summary["num_pending_configs_with_worker"],
+        "best_loss": summary["best_loss"],
+        "best_config_id": summary["best_config_id"],
+        "num_error": summary["num_error"],
+    }
+
+    df_run_data = pd.DataFrame.from_dict(
+        additional_data, orient="index", columns=["value"]
+    )
+    df_run_data.index.name = "description"
+
+    return df_config_data, df_run_data
+
+
+def _save_data_to_csv(
+    config_data_file_path: Path,
+    run_data_file_path: Path,
+    locker: Locker,
+    config_data_df: pd.DataFrame,
+    run_data_df: pd.DataFrame,
+) -> None:
+    """
+    Save data to CSV files while acquiring a lock for data integrity.
+
+    Args:
+        config_data_file_path (Path | str): The path to the CSV file for configuration data.
+        run_data_file_path (Path | str): The path to the CSV file for additional run data.
+        locker (Locker): An object for acquiring and releasing a lock to ensure data integrity.
+        config_data_df (pd.DataFrame): The DataFrame containing configuration data.
+        run_data_df (pd.DataFrame): The DataFrame containing additional run data.
+
+    This function saves data to CSV files while holding a lock to prevent concurrent writes.
+    Once the lock is acquired, it writes the data to the CSV files and releases the lock.
+    """
+    should_break = False
+    while not should_break:
+        if locker.acquire_lock():
+            try:
+                pending_configs = run_data_df.loc["num_pending_configs", "value"]
+                pending_configs_with_worker = run_data_df.loc[
+                    "num_pending_configs_with_worker", "value"
+                ]
+                # Represents the last worker: nothing is pending anymore
+                if int(pending_configs) == 0 and int(pending_configs_with_worker) == 0:
+                    config_data_df = config_data_df.sort_values(
+                        by="result.loss", ascending=True
+                    )
+                    config_data_df.to_csv(config_data_file_path, index=False, mode="w")
+                    run_data_df.to_csv(run_data_file_path, index=True, mode="w")
+
+                if run_data_file_path.exists():
+                    prev_run_data_df = pd.read_csv(run_data_file_path)
+                    prev_run_data_df.set_index("description", inplace=True)
+
+                    num_evaluated_configs_csv = prev_run_data_df.loc[
+                        "num_evaluated_configs", "value"
+                    ]
+                    num_evaluated_configs_run = run_data_df.loc[
+                        run_data_df.index == "num_evaluated_configs", "value"
+                    ]
+                    # Check if the current worker has evaluated more configs than the
+                    # previously written summary
+                    if int(num_evaluated_configs_csv) < int(num_evaluated_configs_run):
+                        config_data_df = config_data_df.sort_values(
+                            by="result.loss", ascending=True
+                        )
+                        config_data_df.to_csv(
+                            config_data_file_path, index=False, mode="w"
+                        )
+                        run_data_df.to_csv(run_data_file_path, index=True, mode="w")
+                # Represents the first worker to write the summary files
+                else:
+                    config_data_df = config_data_df.sort_values(
+                        by="result.loss", ascending=True
+                    )
+                    config_data_df.to_csv(config_data_file_path, index=False, mode="w")
+                    run_data_df.to_csv(run_data_file_path, index=True, mode="w")
+            except Exception as e:
+                raise RuntimeError(f"Error during data saving: {e}") from e
+            finally:
+                locker.release_lock()
+                should_break = True
+
+
+def post_run_csv(root_directory: str | Path, logger=None) -> None:
+    if logger is None:
+        logger = logging.getLogger("neps_status")
+
+    csv_config_data, csv_rundata, csv_locker = _initiate_summary_csv(
+        root_directory, logger=logger
+    )
+
+    df_config_data, df_run_data = _get_dataframes_from_summary(
+        root_directory,
+        include_metadatas=True,
+        include_results=True,
+        include_configs=True,
+    )
+
+    _save_data_to_csv(
+        csv_config_data,
+        csv_rundata,
+        csv_locker,
+        df_config_data,
+        df_run_data,
+    )
diff --git a/tests/test_metahyper/test_locking.py b/tests/test_metahyper/test_locking.py
index e397ec47..2cd09499 100644
--- a/tests/test_metahyper/test_locking.py
+++ b/tests/test_metahyper/test_locking.py
@@ -3,10 +3,25 @@
 import subprocess
 from pathlib import Path
 
+import pandas as pd
 import pytest
 from more_itertools import first_true
 
 
+def launch_example_processes(n_workers: int = 3) -> list:
+    processes = []
+    for _ in range(n_workers):
+        processes.append(
+            subprocess.Popen(  # pylint: disable=consider-using-with
+                "python -m neps_examples.basic_usage.hyperparameters && python -m neps_examples.basic_usage.analyse",
+                stdout=subprocess.PIPE,
+                shell=True,
+                text=True,
+            )
+        )
+    return processes
+
+
 @pytest.mark.metahyper
 def test_filelock() -> None:
     """Test that the filelocking method of parallelization works as intended."""
@@ -24,23 +39,9 @@ def test_filelock() -> None:
     results_dir = Path("results") / "hyperparameters_example" / "results"
     try:
         assert not results_dir.exists()
-
-        # Launch both processes
-        p1 = subprocess.Popen(  # pylint: disable=consider-using-with
-            "python -m neps_examples.basic_usage.hyperparameters && python -m neps_examples.basic_usage.analyse",
-            stdout=subprocess.PIPE,
-            shell=True,
-            text=True,
-        )
-        p2 = subprocess.Popen(  # pylint: disable=consider-using-with
-            "python -m neps_examples.basic_usage.hyperparameters && python -m neps_examples.basic_usage.analyse",
-            stdout=subprocess.PIPE,
-            shell=True,
-            text=True,
-        )
-
-        # Wait for them
-        for p in (p1, p2):
+        p_list = launch_example_processes(n_workers=2)
+        for p in p_list:
             p.wait()
             out, _ = p.communicate()
             lines = out.splitlines()
@@ -69,4 +70,29 @@ def test_filelock() -> None:
         raise e
     finally:
         if results_dir.exists():
-            shutil.rmtree(results_dir)
+            shutil.rmtree(results_dir.parent)
+
+
+@pytest.mark.summary_csv
+def test_summary_csv():
+    # Testing the csv files output.
+    summary_dir = Path("results") / "hyperparameters_example" / "summary_csv"
+    try:
+        if not summary_dir.exists():
+            p_list = launch_example_processes(n_workers=2)
+            for p in p_list:
+                p.wait()
+        assert summary_dir.is_dir()
+        run_data_df = pd.read_csv(summary_dir / "run_status.csv")
+        run_data_df.set_index("description", inplace=True)
+        num_evaluated_configs_csv = run_data_df.loc["num_evaluated_configs", "value"]
+        assert num_evaluated_configs_csv == 15
+
+        config_data_df = pd.read_csv(summary_dir / "config_data.csv")
+        assert config_data_df.shape[0] == 15
+        assert (config_data_df["Status"] == "Complete").all()
+    except Exception as e:
+        raise e
+    finally:
+        if summary_dir.exists():
+            shutil.rmtree(summary_dir.parent)
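
Usage note (not part of the patch): the sketch below illustrates how the new `post_run_summary` flag and the resulting `summary_csv` files might be consumed from user code. The `neps.run` arguments, the `summary_csv/config_data.csv` and `summary_csv/run_status.csv` paths, and the `Config_id`, `Status`, `result.loss`, and `description`/`value` columns all come from this diff; the search-space bounds and the loss computed in `run_pipeline` are illustrative placeholders, not the exact example code.

```python
import neps
import pandas as pd


def run_pipeline(float1, float2, categorical, integer1, integer2):
    # Placeholder objective -- the real example computes its own loss.
    return -float(float1 + float2 + integer1 + integer2)


# Illustrative search space; the bounds are assumptions, not taken from the example file.
pipeline_space = dict(
    float1=neps.FloatParameter(lower=0, upper=1),
    float2=neps.FloatParameter(lower=-10, upper=10),
    categorical=neps.CategoricalParameter(choices=[0, 1]),
    integer1=neps.IntegerParameter(lower=0, upper=1),
    integer2=neps.IntegerParameter(lower=1, upper=1000),
)

neps.run(
    run_pipeline=run_pipeline,
    pipeline_space=pipeline_space,
    root_directory="results/hyperparameters_example",
    post_run_summary=True,  # writes root_directory/summary_csv/ once the worker finishes
    max_evaluations_total=15,
)

# The summary CSVs can then be inspected with pandas.
summary = "results/hyperparameters_example/summary_csv"
config_df = pd.read_csv(f"{summary}/config_data.csv")
run_df = pd.read_csv(f"{summary}/run_status.csv").set_index("description")
print(config_df[["Config_id", "Status", "result.loss"]].sort_values("result.loss").head())
print("best loss:", run_df.loc["best_loss", "value"])
```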