[feat] Support statistics print by adding results manager object #334

Merged
98 changes: 72 additions & 26 deletions autoPyTorch/api/base_task.py
@@ -29,6 +29,7 @@
from smac.stats.stats import Stats
from smac.tae import StatusType

from autoPyTorch.api.results_manager import ResultsManager, SearchResults
from autoPyTorch.automl_common.common.utils.backend import Backend, create
from autoPyTorch.constants import (
REGRESSION_TASKS,
@@ -192,12 +193,13 @@ def __init__(
self.search_space: Optional[ConfigurationSpace] = None
self._dataset_requirements: Optional[List[FitRequirement]] = None
self._metric: Optional[autoPyTorchMetric] = None
self._scoring_functions: Optional[List[autoPyTorchMetric]] = None
self._logger: Optional[PicklableClientLogger] = None
self.run_history: RunHistory = RunHistory()
self.trajectory: Optional[List] = None
self.dataset_name: Optional[str] = None
self.cv_models_: Dict = {}

self._results_manager = ResultsManager()

# By default try to use the TCP logging port or get a new port
self._logger_port = logging.handlers.DEFAULT_TCP_LOGGING_PORT

@@ -240,6 +242,18 @@ def build_pipeline(self, dataset_properties: Dict[str, Any]) -> BasePipeline:
"""
raise NotImplementedError

@property
def run_history(self) -> RunHistory:
return self._results_manager.run_history

@property
def ensemble_performance_history(self) -> List[Dict[str, Any]]:
return self._results_manager.ensemble_performance_history

@property
def trajectory(self) -> Optional[List]:
return self._results_manager.trajectory

def set_pipeline_config(self, **pipeline_config_kwargs: Any) -> None:
"""
Check whether arguments are valid and
Expand Down Expand Up @@ -883,6 +897,12 @@ def _search(

self.pipeline_options['optimize_metric'] = optimize_metric

if all_supported_metrics:
self._scoring_functions = get_metrics(dataset_properties=dataset_properties,
all_supported_metrics=True)
else:
self._scoring_functions = [self._metric]

self.search_space = self.get_search_space(dataset)

# Incorporate budget to pipeline config
@@ -1037,12 +1057,14 @@ def _search(
pynisher_context=self._multiprocessing_context,
)
try:
run_history, self.trajectory, budget_type = \
run_history, self._results_manager.trajectory, budget_type = \
_proc_smac.run_smbo(func=tae_func)
self.run_history.update(run_history, DataOrigin.INTERNAL)
trajectory_filename = os.path.join(
self._backend.get_smac_output_directory_for_run(self.seed),
'trajectory.json')

assert self.trajectory is not None # mypy check
saveable_trajectory = \
[list(entry[:2]) + [entry[2].get_dictionary()] + list(entry[3:])
for entry in self.trajectory]
@@ -1059,7 +1081,7 @@
self._logger.info("Starting Shutdown")

if proc_ensemble is not None:
self.ensemble_performance_history = list(proc_ensemble.history)
self._results_manager.ensemble_performance_history = list(proc_ensemble.history)

if len(proc_ensemble.futures) > 0:
# Also add ensemble runs that did not finish within smac time
@@ -1068,7 +1090,7 @@
result = proc_ensemble.futures.pop().result()
if result:
ensemble_history, _, _, _ = result
self.ensemble_performance_history.extend(ensemble_history)
self._results_manager.ensemble_performance_history.extend(ensemble_history)
self._logger.info("Ensemble script finished, continue shutdown.")

# save the ensemble performance history file
@@ -1356,28 +1378,12 @@ def get_incumbent_results(
The incumbent configuration
Dict[str, Union[int, str, float]]:
Additional information about the run of the incumbent configuration.

"""
assert self.run_history is not None, "No Run History found, search has not been called."
if self.run_history.empty():
raise ValueError("Run History is empty. Something went wrong, "
"smac was not able to fit any model?")

run_history_data = self.run_history.data
if not include_traditional:
# traditional classifiers have trainer_configuration in their additional info
run_history_data = dict(
filter(lambda elem: elem[1].status == StatusType.SUCCESS and elem[1].
additional_info is not None and elem[1].
additional_info['configuration_origin'] != 'traditional',
run_history_data.items()))
run_history_data = dict(
filter(lambda elem: 'SUCCESS' in str(elem[1].status), run_history_data.items()))
sorted_runvalue_by_cost = sorted(run_history_data.items(), key=lambda item: item[1].cost)
incumbent_run_key, incumbent_run_value = sorted_runvalue_by_cost[0]
incumbent_config = self.run_history.ids_config[incumbent_run_key.config_id]
incumbent_results = incumbent_run_value.additional_info
return incumbent_config, incumbent_results

if self._metric is None:
raise RuntimeError("`search_results` is only available after a search has finished.")

return self._results_manager.get_incumbent_results(metric=self._metric, include_traditional=include_traditional)

def get_models_with_weights(self) -> List:
if self.models_ is None or len(self.models_) == 0 or \
@@ -1417,3 +1423,43 @@ def _print_debug_info_to_log(self) -> None:
self._logger.debug(' multiprocessing_context: %s', str(self._multiprocessing_context))
for key, value in vars(self).items():
self._logger.debug(f"\t{key}->{value}")

def get_search_results(self) -> SearchResults:
"""
Get the interface to obtain the search results easily.
"""
if self._scoring_functions is None or self._metric is None:
raise RuntimeError("`search_results` is only available after a search has finished.")

return self._results_manager.get_search_results(
metric=self._metric,
scoring_functions=self._scoring_functions
)

def sprint_statistics(self) -> str:
"""
Prints statistics about the SMAC search.

These statistics include:

1. Optimisation Metric
2. Best Optimisation score achieved by individual pipelines
3. Total number of target algorithm runs
4. Total number of successful target algorithm runs
5. Total number of crashed target algorithm runs
6. Total number of target algorithm runs that exceeded the time limit
7. Total number of successful target algorithm runs that exceeded the memory limit

Returns:
(str):
Formatted string with statistics
"""
if self._scoring_functions is None or self._metric is None:
raise RuntimeError("`search_results` is only available after a search has finished.")

assert self.dataset_name is not None  # mypy check
return self._results_manager.sprint_statistics(
dataset_name=self.dataset_name,
scoring_functions=self._scoring_functions,
metric=self._metric
)
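
For orientation, a hedged usage sketch of the surface touched by this diff. Only run_history, trajectory, ensemble_performance_history, get_incumbent_results, get_search_results, and sprint_statistics come from the changes above; the TabularClassificationTask entry point, its search() arguments, and the iris data are assumptions made for illustration, not part of this PR.

# Minimal sketch, assuming the public TabularClassificationTask API and the
# listed search() arguments; only the attribute/method names below are taken
# from the diff in this PR.
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

from autoPyTorch.api.tabular_classification import TabularClassificationTask

X, y = load_iris(return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=1)

api = TabularClassificationTask(seed=1)
api.search(
    X_train=X_train, y_train=y_train,
    X_test=X_test, y_test=y_test,
    optimize_metric='accuracy',
    total_walltime_limit=300,
    all_supported_metrics=True,  # when True, _scoring_functions holds every metric for the task
)

# The former attributes are now read-only properties delegating to ResultsManager.
print(len(api.run_history.data))                       # all SMAC runs
print(api.trajectory[-1] if api.trajectory else None)  # last trajectory entry
print(len(api.ensemble_performance_history))

# Incumbent configuration plus its additional run info (same interface as before,
# now served by the results manager instead of inline run-history filtering).
incumbent_config, incumbent_info = api.get_incumbent_results(include_traditional=False)

# New in this PR: structured search results and a formatted statistics string.
search_results = api.get_search_results()
print(api.sprint_statistics())

Moving this bookkeeping into ResultsManager keeps BaseTask smaller and lets the run history, trajectory, and ensemble history be queried through a single object.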