Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Post-run summary_csv file generation #22

Merged
merged 8 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ jobs:

- name: Run pytest
timeout-minutes: 15
run: poetry run pytest -m "all_examples or metahyper"
run: poetry run pytest -m "all_examples or metahyper or summary_csv"
1 change: 1 addition & 0 deletions neps_examples/basic_usage/hyperparameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ def run_pipeline(float1, float2, categorical, integer1, integer2):
run_pipeline=run_pipeline,
pipeline_space=pipeline_space,
root_directory="results/hyperparameters_example",
post_run_summary=True,
max_evaluations_total=15,
)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ line_length = 90

[tool.pytest.ini_options]
addopts = "--basetemp ./tests_tmpdir -m 'core_examples or yaml_api'"
markers = ["all_examples", "core_examples", "regression_all", "metahyper", "yaml_api"]
markers = ["all_examples", "core_examples", "regression_all", "metahyper", "yaml_api", "summary_csv"]
filterwarnings = "ignore::DeprecationWarning:torch.utils.tensorboard.*:"

[tool.mypy]
Expand Down
67 changes: 61 additions & 6 deletions src/metahyper/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,64 @@ def hp_values(self):
raise NotImplementedError


def _process_sampler_info(
    serializer: YamlSerializer,
    sampler_info: dict,
    sampler_info_file: Path,
    decision_locker: Locker,
    logger=None,
):
    """Validate, or create, the optimizer's sampler-info file under the decision lock.

    Called from ``run`` before sampling and evaluation start. While holding
    ``decision_locker``:

    * if ``sampler_info_file`` already holds data, that data must equal
      ``sampler_info`` once the ``"searcher_name"`` key is ignored on both sides;
    * if the file is missing, or exists but loads as ``None`` (assumed to be what
      the serializer returns for an empty file -- TODO confirm against
      ``YamlSerializer.load``), ``sampler_info`` is written to it.

    The lock is polled until it is acquired, sleeping briefly between attempts so
    that a worker waiting on the lock does not spin at full CPU.

    Args:
        serializer: Object providing ``load(path)`` and
            ``dump(data, path, sort_keys=...)``.
        sampler_info: Information describing the optimizer used by this run.
        sampler_info_file: Path of the file that stores the optimizer information.
        decision_locker: Lock object providing ``acquire_lock()`` (truthy on
            success) and ``release_lock()``.
        logger: Optional logger; defaults to the ``"neps"`` logger.

    Raises:
        RuntimeError: If loading, comparing or writing fails. A mismatch between
            the stored data and ``sampler_info`` is raised as ``ValueError`` and
            re-raised wrapped in this ``RuntimeError`` (available as
            ``__cause__``).
    """
    # Function-scope import keeps this block self-contained regardless of the
    # module's import section.
    import time

    if logger is None:
        logger = logging.getLogger("neps")

    excluded_key = "searcher_name"
    lock_retry_seconds = 0.1

    # acquire_lock() is non-blocking from this function's point of view: it may
    # return a falsy value, in which case we back off and try again.
    while not decision_locker.acquire_lock():
        time.sleep(lock_retry_seconds)

    try:
        optimizer_data = (
            serializer.load(sampler_info_file) if sampler_info_file.exists() else None
        )

        if optimizer_data is None:
            # The file is empty or doesn't exist: write the sampler_info
            serializer.dump(sampler_info, sampler_info_file, sort_keys=False)
            logger.debug("Wrote sampler info to %s", sampler_info_file)
        else:
            # Compare on copies so neither the caller's dict nor the loaded data
            # is mutated when the excluded key is dropped.
            sampler_info_copy = sampler_info.copy()
            optimizer_data_copy = optimizer_data.copy()
            sampler_info_copy.pop(excluded_key, None)
            optimizer_data_copy.pop(excluded_key, None)

            if sampler_info_copy != optimizer_data_copy:
                raise ValueError(
                    f"The sampler_info in the file {sampler_info_file} is not valid. "
                    f"Expected: {sampler_info_copy}, Found: {optimizer_data_copy}"
                )
    except Exception as e:
        # Kept as RuntimeError so existing callers see the same exception type.
        raise RuntimeError(f"Error during data saving: {e}") from e
    finally:
        # Always hand the lock back, even when validation or I/O failed.
        decision_locker.release_lock()


def _load_sampled_paths(optimization_dir: Path | str, serializer, logger):
optimization_dir = Path(optimization_dir)
base_result_directory = optimization_dir / "results"
Expand Down Expand Up @@ -146,7 +204,6 @@ def _load_sampled_paths(optimization_dir: Path | str, serializer, logger):
shutil.rmtree(str(config_dir))
except Exception: # The worker doesn't need to crash for this
logger.exception(f"Can't delete {config_dir}")

return previous_paths, pending_paths


Expand Down Expand Up @@ -214,7 +271,6 @@ def _check_max_evaluations(
continue_until_max_evaluation_completed,
):
logger.debug("Checking if max evaluations is reached")

previous_results, pending_configs, pending_configs_free = read(
optimization_dir, serializer, logger
)
Expand Down Expand Up @@ -395,10 +451,9 @@ def run(
decision_lock_file.touch(exist_ok=True)
decision_locker = Locker(decision_lock_file, logger.getChild("_locker"))

# Check if the directory already exists
if not Path(sampler_info_file).exists():
# Write the sampler_info to a YAML file
serializer.dump(sampler_info, sampler_info_file, sort_keys=False)
_process_sampler_info(
serializer, sampler_info, sampler_info_file, decision_locker, logger
)

evaluations_in_this_run = 0
while True:
Expand Down
23 changes: 15 additions & 8 deletions src/neps/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .plot.tensorboard_eval import tblogger
from .search_spaces.parameter import Parameter
from .search_spaces.search_space import SearchSpace, pipeline_space_from_configspace
from .status.status import post_run_csv
from .utils.common import get_searcher_data
from .utils.result_utils import get_loss

Expand All @@ -36,7 +37,7 @@ def _post_evaluation_hook(
working_directory = Path(config_working_directory, "../../")
loss = get_loss(result, loss_value_on_error, ignore_errors)

# 1. write all configs and losses
# 1. Write all configs and losses
all_configs_losses = Path(working_directory, "all_losses_and_configs.txt")

def write_loss_and_config(file_handle, loss_, config_id_, config_):
Expand All @@ -48,7 +49,7 @@ def write_loss_and_config(file_handle, loss_, config_id_, config_):
with all_configs_losses.open("a", encoding="utf-8") as f:
write_loss_and_config(f, loss, config_id, config)

# No need to handle best loss cases if an error occurred
# no need to handle best loss cases if an error occurred
if result == "error":
return

Expand All @@ -58,7 +59,7 @@ def write_loss_and_config(file_handle, loss_, config_id_, config_):
logger.info(f"Finished evaluating config {config_id}")
return

# 2. Write best losses / configs
# 2. Write best losses/configs
best_loss_trajectory_file = Path(working_directory, "best_loss_trajectory.txt")
best_loss_config_trajectory_file = Path(
working_directory, "best_loss_with_config_trajectory.txt"
Expand Down Expand Up @@ -97,6 +98,7 @@ def run(
pipeline_space: dict[str, Parameter | CS.ConfigurationSpace] | CS.ConfigurationSpace,
root_directory: str | Path,
overwrite_working_directory: bool = False,
post_run_summary: bool = False,
development_stage_id=None,
task_id=None,
max_evaluations_total: int | None = None,
Expand Down Expand Up @@ -124,9 +126,9 @@ def run(
"""Run a neural pipeline search.

To parallelize:
In order to run a neural pipeline search with multiple processes or machines,
To run a neural pipeline search with multiple processes or machines,
simply call run(.) multiple times (optionally on different machines). Make sure
that root_directory points to the same folder on the same filesystem, otherwise
that root_directory points to the same folder on the same filesystem, otherwise,
the multiple calls to run(.) will be independent.

Args:
Expand All @@ -136,6 +138,8 @@ def run(
synchronize multiple calls to run(.) for parallelization.
overwrite_working_directory: If true, delete the working directory at the start of
the run. This is, e.g., useful when debugging a run_pipeline function.
post_run_summary: If True, creates a csv file after each worker is done,
holding summary information about the configs and results.
development_stage_id: ID for the current development stage. Only needed if
you work with multiple development stages.
task_id: ID for the current task. Only needed if you work with multiple
Expand Down Expand Up @@ -238,10 +242,10 @@ def run(
"searcher_args_user_modified": False,
}

# Check to verify if the target directory contains history of another optimizer state
# Check to verify if the target directory contains the history of another optimizer state
# This check is performed only when the `searcher` is built during the run
if isinstance(searcher, BaseOptimizer):
# This check is not strict when a user-defined nep.optimizer is provided
# This check is not strict when a user-defined neps.optimizer is provided
logger.warn(
"An instantiated optimizer is provided. The safety checks of NePS will be "
"skipped. Accurate continuation of runs can no longer be guaranteed!"
Expand All @@ -265,7 +269,7 @@ def run(
else:
# No searcher argument updates when NePS decides the searcher.
logger.info(35 * "=" + "WARNING" + 35 * "=")
logger.info("CHANGINE ARGUMENTS ONLY WORKS WHEN SEARCHER IS DEFINED")
logger.info("CHANGING ARGUMENTS ONLY WORK WHEN SEARCHER IS DEFINED")
logger.info(
f"The searcher argument '{key}' will not change to '{value}'"
f" because NePS chose the searcher"
Expand All @@ -292,6 +296,9 @@ def run(
pre_load_hooks=pre_load_hooks,
)

if post_run_csv:
post_run_csv(root_directory, logger)


def _run_args(
pipeline_space: dict[str, Parameter | CS.ConfigurationSpace] | CS.ConfigurationSpace,
Expand Down
Loading