diff --git a/src/amltk/_richutil/__init__.py b/src/amltk/_richutil/__init__.py index 50e6a507..ae4bf870 100644 --- a/src/amltk/_richutil/__init__.py +++ b/src/amltk/_richutil/__init__.py @@ -1,6 +1,6 @@ from amltk._richutil.renderable import RichRenderable from amltk._richutil.renderers import Function, rich_make_column_selector -from amltk._richutil.util import df_to_table, richify +from amltk._richutil.util import df_to_table, is_jupyter, richify __all__ = [ "df_to_table", @@ -8,4 +8,5 @@ "RichRenderable", "Function", "rich_make_column_selector", + "is_jupyter", ] diff --git a/src/amltk/_richutil/util.py b/src/amltk/_richutil/util.py index 77d1cbee..5f8a0668 100644 --- a/src/amltk/_richutil/util.py +++ b/src/amltk/_richutil/util.py @@ -3,6 +3,7 @@ # where rich not being installed. from __future__ import annotations +import os from concurrent.futures import ProcessPoolExecutor from typing import TYPE_CHECKING, Any @@ -70,3 +71,25 @@ def df_to_table( table.add_row(str(index), *[str(cell) for cell in row]) return table + + +def is_jupyter() -> bool: + """Return True if running in a Jupyter environment.""" + # https://github.com/Textualize/rich/blob/fd981823644ccf50d685ac9c0cfe8e1e56c9dd35/rich/console.py#L518-L535 + try: + get_ipython # type: ignore[name-defined] # noqa: B018 + except NameError: + return False + ipython = get_ipython() # type: ignore[name-defined] # noqa: F821 + shell = ipython.__class__.__name__ + if ( + "google.colab" in str(ipython.__class__) + or os.getenv("DATABRICKS_RUNTIME_VERSION") + or shell == "ZMQInteractiveShell" + ): + return True # Jupyter notebook or qtconsole + + if shell == "TerminalInteractiveShell": + return False # Terminal running IPython + + return False # Other type (?) diff --git a/src/amltk/_util.py b/src/amltk/_util.py new file mode 100644 index 00000000..9da6d2ac --- /dev/null +++ b/src/amltk/_util.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from typing import Any + + +def threadpoolctl_heuristic(item_contained_in_node: Any | None) -> bool: + """Heuristic to determine if we should automatically set threadpoolctl. + + This is done by detecting if it's a scikit-learn `BaseEstimator` but this may + be extended in the future. + + !!! tip + + The reason to have this heuristic is that when running scikit-learn, or any + multithreaded model, in parallel, they will over subscribe to threads. This + causes a significant performance hit as most of the time is spent switching + thread contexts instead of work. This can be particularly bad for HPO where + we are evaluating multiple models in parallel on the same system. + + The recommened thread count is 1 per core with no additional information to + act upon. + + !!! todo + + This is potentially not an issue if running on multiple nodes of some cluster, + as they do not share logical cores and hence do not clash. + + Args: + item_contained_in_node: The item with which to base the heuristic on. + + Returns: + Whether we should automatically set threadpoolctl. + """ + if item_contained_in_node is None or not isinstance(item_contained_in_node, type): + return False + + try: + # NOTE: sklearn depends on threadpoolctl so it will be installed. + from sklearn.base import BaseEstimator + + return issubclass(item_contained_in_node, BaseEstimator) + except ImportError: + return False diff --git a/src/amltk/evalutors/__init__.py b/src/amltk/evalutors/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/amltk/evalutors/evaluation_protocol.py b/src/amltk/evalutors/evaluation_protocol.py new file mode 100644 index 00000000..c95c3e14 --- /dev/null +++ b/src/amltk/evalutors/evaluation_protocol.py @@ -0,0 +1,59 @@ +"""Evaluation protocols for how a trial and a pipeline should be evaluated. + +TODO: Sorry +""" +from __future__ import annotations + +from collections.abc import Callable, Iterable +from typing import TYPE_CHECKING + +from amltk.scheduling import Plugin + +if TYPE_CHECKING: + from amltk.optimization import Trial + from amltk.pipeline import Node + from amltk.scheduling import Scheduler, Task + + +class EvaluationProtocol: + """A protocol for how a trial should be evaluated on a pipeline.""" + + fn: Callable[[Trial, Node], Trial.Report] + + def task( + self, + scheduler: Scheduler, + plugins: Plugin | Iterable[Plugin] | None = None, + ) -> Task[[Trial, Node], Trial.Report]: + """Create a task for this protocol. + + Args: + scheduler: The scheduler to use for the task. + plugins: The plugins to use for the task. + + Returns: + The created task. + """ + _plugins: tuple[Plugin, ...] + match plugins: + case None: + _plugins = () + case Plugin(): + _plugins = (plugins,) + case Iterable(): + _plugins = tuple(plugins) + + return scheduler.task(self.fn, plugins=_plugins) + + +class CustomProtocol(EvaluationProtocol): + """A custom evaluation protocol based on a user function.""" + + def __init__(self, fn: Callable[[Trial, Node], Trial.Report]) -> None: + """Initialize the protocol. + + Args: + fn: The function to use for the evaluation. + """ + super().__init__() + self.fn = fn diff --git a/src/amltk/optimization/history.py b/src/amltk/optimization/history.py index a598e407..0a4f6b17 100644 --- a/src/amltk/optimization/history.py +++ b/src/amltk/optimization/history.py @@ -65,7 +65,7 @@ def target_function(trial: Trial) -> Trial.Report: from collections import defaultdict from collections.abc import Callable, Hashable, Iterable, Iterator from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Literal, TypeVar +from typing import TYPE_CHECKING, Literal, TypeVar, overload from typing_extensions import override import pandas as pd @@ -527,7 +527,14 @@ def sortby( return sorted(history.reports, key=sort_key, reverse=reverse) - @override + @overload + def __getitem__(self, key: int | str) -> Trial.Report: + ... + + @overload + def __getitem__(self, key: slice) -> Trial.Report: + ... + def __getitem__( # type: ignore self, key: int | str | slice, diff --git a/src/amltk/optimization/optimizer.py b/src/amltk/optimization/optimizer.py index 795c4238..eb4bca78 100644 --- a/src/amltk/optimization/optimizer.py +++ b/src/amltk/optimization/optimizer.py @@ -15,18 +15,22 @@ """ from __future__ import annotations +import logging from abc import abstractmethod -from collections.abc import Callable, Iterable, Sequence +from collections.abc import Callable, Iterable, Iterator, Sequence from datetime import datetime +from pathlib import Path from typing import ( TYPE_CHECKING, Any, Concatenate, Generic, ParamSpec, + Protocol, TypeVar, overload, ) +from typing_extensions import Self from more_itertools import all_unique @@ -36,11 +40,14 @@ from amltk.optimization.metric import Metric from amltk.optimization.trial import Trial from amltk.pipeline import Node + from amltk.types import Seed I = TypeVar("I") # noqa: E741 P = ParamSpec("P") ParserOutput = TypeVar("ParserOutput") +logger = logging.getLogger(__name__) + class Optimizer(Generic[I]): """An optimizer protocol. @@ -123,3 +130,91 @@ def preferred_parser( """ return None + + @classmethod + @abstractmethod + def create( + cls, + *, + space: Node, + metrics: Metric | Sequence[Metric], + bucket: str | Path | PathBucket | None = None, + seed: Seed | None = None, + ) -> Self: + """Create this optimizer. + + !!! note + + Subclasses should override this with more specific configuration + but these arguments should be all that's necessary to create the optimizer. + + Args: + space: The space to optimize over. + bucket: The bucket for where to store things related to the trial. + metrics: The metrics to optimize. + seed: The seed to use for the optimizer. + + Returns: + The optimizer. + """ + + class CreateSignature(Protocol): + """A Protocol which defines the keywords required to create an + optimizer with deterministic behavior at a desired location. + + This protocol matches the `Optimizer.create` classmethod, however we also + allow any function which accepts the keyword arguments to create an + Optimizer. + """ + + def __call__( + self, + *, + space: Node, + metrics: Metric | Sequence[Metric], + bucket: PathBucket | None = None, + seed: Seed | None = None, + ) -> Optimizer: + """A function which creates an optimizer for node.optimize should + accept the following keyword arguments. + + Args: + space: The node to optimize + metrics: The metrics to optimize + bucket: The bucket to store the results in + seed: The seed to use for the optimization + """ + ... + + @classmethod + def _get_known_importable_optimizer_classes(cls) -> Iterator[type[Optimizer]]: + """Get all developer known optimizer classes. This is used for defaults. + + Do not rely on this functionality and prefer to give concrete optimizers to + functionality requiring one. This is intended for convenience of particular + quickstart methods. + """ + # NOTE: We can't use the `Optimizer.__subclasses__` method as the optimizers + # are not imported by any other module initially and so they do no exist + # until imported. Hence this manual iteration. For now, we be explicit and + # only if the optimizer list grows should we consider dynamic importing. + try: + from amltk.optimization.optimizers.smac import SMACOptimizer + + yield SMACOptimizer + except ImportError as e: + logger.debug("Failed to import SMACOptimizer", exc_info=e) + + try: + from amltk.optimization.optimizers.optuna import OptunaOptimizer + + yield OptunaOptimizer + except ImportError as e: + logger.debug("Failed to import OptunaOptimizer", exc_info=e) + + try: + from amltk.optimization.optimizers.neps import NEPSOptimizer + + yield NEPSOptimizer + except ImportError as e: + logger.debug("Failed to import NEPSOptimizer", exc_info=e) diff --git a/src/amltk/optimization/optimizers/neps.py b/src/amltk/optimization/optimizers/neps.py index 3d468d7d..1e7b6457 100644 --- a/src/amltk/optimization/optimizers/neps.py +++ b/src/amltk/optimization/optimizers/neps.py @@ -249,7 +249,7 @@ def __init__( self, *, space: SearchSpace, - loss_metric: Metric, + loss_metric: Metric | Sequence[Metric], cost_metric: Metric | None = None, optimizer: BaseOptimizer, working_dir: Path, @@ -307,7 +307,7 @@ def create( # noqa: PLR0913 | Mapping[str, ConfigurationSpace | Parameter] | Node ), - metrics: Metric, + metrics: Metric | Sequence[Metric], cost_metric: Metric | None = None, bucket: PathBucket | str | Path | None = None, searcher: str | BaseOptimizer = "default", diff --git a/src/amltk/pipeline/node.py b/src/amltk/pipeline/node.py index 4c5e559e..b247c053 100644 --- a/src/amltk/pipeline/node.py +++ b/src/amltk/pipeline/node.py @@ -36,11 +36,16 @@ means two nodes are considered equal if they look the same and they are connected in to nodes that also look the same. """ # noqa: E501 +# ruff: noqa: PLR0913 from __future__ import annotations import inspect -from collections.abc import Callable, Iterator, Mapping, Sequence +import warnings +from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence from dataclasses import dataclass, field +from datetime import datetime +from functools import partial +from pathlib import Path from typing import ( TYPE_CHECKING, Any, @@ -50,6 +55,7 @@ Literal, NamedTuple, ParamSpec, + Protocol, TypeAlias, TypeVar, overload, @@ -59,10 +65,15 @@ from more_itertools import first_true from sklearn.pipeline import Pipeline as SklearnPipeline -from amltk._functional import classname, mapping_select, prefix_keys +from amltk._functional import classname, funcname, mapping_select, prefix_keys from amltk._richutil import RichRenderable from amltk.exceptions import RequestNotMetError -from amltk.types import Config, Item, Space +from amltk.optimization.history import History +from amltk.optimization.optimizer import Optimizer +from amltk.scheduling import Task +from amltk.scheduling.plugins import Plugin +from amltk.store import PathBucket +from amltk.types import Config, Item, Seed, Space if TYPE_CHECKING: from typing_extensions import Self @@ -71,8 +82,12 @@ from rich.console import RenderableType from rich.panel import Panel + from amltk.evalutors.evaluation_protocol import EvaluationProtocol + from amltk.optimization.metric import Metric + from amltk.optimization.trial import Trial from amltk.pipeline.components import Choice, Join, Sequential from amltk.pipeline.parsers.optuna import OptunaSearchSpace + from amltk.scheduling import Scheduler NodeLike: TypeAlias = ( set["Node" | "NodeLike"] @@ -84,6 +99,81 @@ SklearnPipelineT = TypeVar("SklearnPipelineT", bound=SklearnPipeline) + +class OnBeginCallbackSignature(Protocol): + """A calllback to further define control flow from + [`pipeline.optimize()`][amltk.pipeline.node.Node.optimize]. + + In one of these callbacks, you can register to specific `@events` of the + [`Scheduler`][amltk.scheduling.Scheduler] or [`Task`][amltk.scheduling.Task]. + + ```python + pipeline = ... + + # The callback will get the task, scheduler and the history in which results + # will be stored + def my_callback(task: Task[..., Trial.Report], scheduler: Scheduler, history: History) -> None: + + # You can do early stopping based on a target metric + @task.on_result + def stop_if_target_reached(_: Future, report: Trial.Report) -> None: + score = report.metrics["accuracy"] + if score >= 0.95: + scheduler.stop(stop_msg="Target reached!")) + + # You could also perform early stopping based on iterations + n = 0 + last_score = 0.0 + + @task.on_result + def stop_if_no_improvement_for_n_runs(_: Future, report: Trial.Report) -> None: + score = report.metrics["accuracy"] + if score > last_score: + n = 0 + last_score = score + elif n >= 5: + scheduler.stop() + else: + n += 1 + + # Really whatever you'd like + @task.on_result + def print_if_choice_made(_: Future, report: Trial.Report) -> None: + if report.config["estimator:__choice__"] == "random_forest": + print("yay") + + # Every callback will be called here in the main process so it's + # best not to do anything too heavy here. + # However you can also submit new tasks or jobs to the scheduler too + @task.on_result(every=30) # Do a cleanup sweep every 30 trials + def keep_on_ten_best_models_on_disk(_: Future, report: Trial.Report) -> None: + sorted_reports = history.sortby("accuracy") + reports_to_cleanup = sorted_reports[10:] + scheduler.submit(some_cleanup_function, reporteds_to_cleanup) + + history = pipeline.optimize( + ..., + on_begin=my_callback, + ) + ``` + """ # noqa: E501 + + def __call__( + self, + task: Task[[Trial, Node], Trial.Report], + scheduler: Scheduler, + history: History, + ) -> None: + """Signature for the callback. + + Args: + task: The task that will be run + scheduler: The scheduler that will be running the optimization + history: The history that will be used to collect the results + """ + ... + + T = TypeVar("T") ParserOutput = TypeVar("ParserOutput") BuilderOutput = TypeVar("BuilderOutput") @@ -788,3 +878,617 @@ def _fufill_param_requests( continue return new_config + + def register_optimization_loop( # noqa: C901, PLR0915, PLR0912 + self, + target: ( + EvaluationProtocol + | Callable[[Trial, Node], Trial.Report] + | Task[[Trial, Node], Trial.Report] + ), + metric: Metric | Sequence[Metric], + *, + optimizer: type[Optimizer] | Optimizer.CreateSignature | None = None, + seed: Seed | None = None, + max_trials: int | None = 3, + n_workers: int = 1, + working_dir: str | Path | PathBucket | None = None, + scheduler: Scheduler | None = None, + history: History | None = None, + on_begin: OnBeginCallbackSignature | None = None, + on_trial_exception: Literal["raise", "end", "continue"] = "raise", + # Plugin creating arguments + plugins: Plugin | Iterable[Plugin] | None = None, + process_memory_limit: int | tuple[int, str] | None = None, + process_walltime_limit: int | tuple[float, str] | None = None, + process_cputime_limit: int | tuple[float, str] | None = None, + threadpool_limit_ctl: bool | int | None = None, + ) -> Scheduler: + """Setup a pipeline to be optimized in a loop. + + Args: + target: + The function against which to optimize. + + * If `target` is an + [`EvaluationProtocol`][amltk.evalutors.evaluation_protocol.EvaluationProtocol], + then it will be used to evaluate the pipeline. + + * If `target` is a function, then it must take in a + [`Trial`][amltk.optimization.trial.Trial] as the first argument + and a [`Node`][amltk.pipeline.node.Node] second argument, returning a + [`Trial.Report`][amltk.optimization.trial.Trial.Report]. Please refer to + the [optimization guide](../../../guides/optimization.md) for more. + + * If `target` is a [`Task`][amltk.scheduling.task.Task], then + this is not implemeneted yet. Sorry + metric: + The metric(s) that will be passed to `optimizer=`. These metrics + should align with what is being computed in `target=`. + optimizer: + The optimizer to use. If `None`, then AMLTK will go through a list + of known optimizers and use the first one it can find which was installed. + + Alternatively, this can be a class inheriting from + [`Optimizer`][amltk.optimization.optimizer.Optimizer] or else + a signature match [`Optimizer.CreateSignature`][amltk.optimization.Optimizer.CreateSignature] + + ??? tip "`Optimizer.CreateSignature`" + + ::: amltk.optimization.Optimizer.CreateSignature + + seed: + A [`seed`][amltk.types.Seed] for the optimizer to use. + n_workers: + The numer of workers to use to evaluate this pipeline. + If no `scheduler=` is provided, then one will be created for + you as [`Scheduler.with_processes(n_workers)`][amltk.scheduling.Scheduler.with_processes]. + If you provide your own `scheduler=` then this will limit the maximum + amount of concurrent trials for this pipeline that will be evaluating + at once. + working_dir: + A working directory to use for the optimizer and the trials. + Any items you store in trials will be located in this directory, + where the [`trial.name`][amltk.optimization.Trial.name] will be + used as a subfolder where any contents stored with + [`trial.store()`][amltk.optimization.trial.Trial.store] will be put there. + Please see the [optimization guide](../../../guides/optimization.md) + for more on trial storage. + scheduler: + The specific [`Scheduler`][amltk.scheduling.Scheduler] to use. + If `None`, then one will be created for you with + [`Scheduler.with_processes(n_workers)`][amltk.scheduling.Scheduler.with_processes] + history: + A [`History`][amltk.optimization.history.History] to store the + [`Trial.Report`][amltk.optimization.Trial.Report]s in. You + may pass in your own if you wish for this method to store + it there instead of creating its own. + on_begin: + A callback that will be called before the scheduler is run. This + can be used to hook into the life-cycle of the optimization and + perform custom routines. Please see the + [scheduling guide](../../../guides/scheduling.md) for more. + + ??? tip "on_begin signature" + + ::: amltk.pipeline.node.OnBeginCallbackSignature + + on_trial_exception: + What to do when a trial raises an exception within its + [`with trial.begin():`][amltk.optimization.Trial.begin] block. + Please see the [optimization guide](../../../guides/optimization.md) + for more. In all cases, the exception will be attached to the + [`Trial.Report`][amltk.optimization.Trial.Report] object under + [`report.exception`][amltk.optimization.Trial.Report.exception]. + + * If `#!python "raise"`, then the exception will be raised + immediatly and the optimization process will halt. The default + and good for initial development. + * If `#!python "end"`, then the exception will be caught and + the optimization process will end gracefully. + * If `#!python "continue"`, the exception will be ignored and + the optimization procedure will continue. + max_trials: + The maximum number of trials to run. If `None`, then the + optimization will continue for as long as the scheduler is + running. You'll likely want to configure this. + process_memory_limit: + If specified, the [`Task`][amltk.scheduling.task.Task] will + use the + [`PynisherPlugin`][amltk.scheduling.plugins.pynisher.PynisherPlugin] + to limit the memory the process can use. Please + refer to the + [plugins `pynisher` reference](../../../reference/scheduling/plugins.md#pynisher) + for more as there are platform limitations and additional + dependancies required. + process_walltime_limit: + If specified, the [`Task`][amltk.scheduling.task.Task] will + use the + [`PynisherPlugin`][amltk.scheduling.plugins.pynisher.PynisherPlugin] + to limit the wall time the process can use. Please + refer to the + [plugins `pynisher` reference](../../../reference/scheduling/plugins.md#pynisher) + for more as there are platform limitations and additional + dependancies required. + process_cputime_limit: + If specified, the [`Task`][amltk.scheduling.task.Task] will + use the + [`PynisherPlugin`][amltk.scheduling.plugins.pynisher.PynisherPlugin] + to limit the cputime the process can use. Please + refer to the + [plugins `pynisher` reference](../../../reference/scheduling/plugins.md#pynisher) + for more as there are platform limitations and additional + dependancies required. + threadpool_limit_ctl: + If specified, the [`Task`][amltk.scheduling.task.Task] will + use the + [`ThreadPoolCTLPlugin`][amltk.scheduling.plugins.threadpoolctl.ThreadPoolCTLPlugin] + to limit the number of threads used by compliant libraries. + **Notably**, this includes scikit-learn, for which running multiple + in parallel can be problematic if not adjusted accordingly. + + The default behavior (when `None`) is to auto-detect whether this + is applicable. This is done by checking if `sklearn` is installed + and if the first node in the pipeline has a `BaseEstimator` item. + Please set this to `True`/`False` depending on your preference. + plugins: + Additional plugins to attach to the eventual + [`Task`][amltk.scheduling.task.Task] that will be executed by + the [`Scheduler`][amltk.scheduling.Scheduler]. Please + refer to the + [plugins reference](../../../reference/scheduling/plugins.md) for more. + """ # noqa: E501 + match history: + case None: + history = History() + case History(): + pass + case _: + raise ValueError(f"Invalid history {history}. Must be a History") + + _plugins: tuple[Plugin, ...] + match plugins: + case None: + _plugins = () + case Plugin(): + _plugins = (plugins,) + case Iterable(): + _plugins = tuple(plugins) + case _: + raise ValueError( + f"Invalid plugins {plugins}. Must be a Plugin or an Iterable of" + " Plugins", + ) + + if any( + limit is not None + for limit in ( + process_memory_limit, + process_walltime_limit, + process_cputime_limit, + ) + ): + try: + from amltk.scheduling.plugins.pynisher import PynisherPlugin + except ImportError as e: + raise ImportError( + "You must install `pynisher` to use `trial_*_limit`" + " You can do so with `pip install amltk[pynisher]`" + " or `pip install pynisher` directly", + ) from e + # TODO: I'm hesitant to add even more arguments to the `optimize` + # signature, specifically for `mp_context`. + plugin = PynisherPlugin( + memory_limit=process_memory_limit, + walltime_limit=process_walltime_limit, + cputime_limit=process_cputime_limit, + ) + _plugins = (*_plugins, plugin) + + # If threadpool_limit_ctl None, we should default to inspecting if it's + # an sklearn pipeline. This is because sklearn pipelines + # run in parallel will over-subscribe the CPU and cause + # the system to slow down. + # We use a heuristic to check this by checking if the item at the head + # of this node is a subclass of sklearn.base.BaseEstimator + match threadpool_limit_ctl: + case None: + from amltk._util import threadpoolctl_heuristic + + threadpool_limit_ctl = False + if threadpoolctl_heuristic(self.item): + threadpool_limit_ctl = 1 + warnings.warn( + "Detected an sklearn pipeline. Setting `threadpool_limit_ctl`" + " to True. This will limit the number of threads spawned by" + " sklearn to the number of cores on the machine. This is" + " because sklearn pipelines run in parallel will over-subscribe" + " the CPU and cause the system to slow down." + "\nPlease set `threadpool_limit_ctl=False` if you do not want" + " this behaviour and set it to `True` to silence this warning.", + stacklevel=2, + ) + case True: + threadpool_limit_ctl = 1 + case False: + pass + case int(): + pass + case _: + raise ValueError( + f"Invalid threadpool_limit_ctl {threadpool_limit_ctl}." + " Must be a bool or an int", + ) + + if threadpool_limit_ctl is not False: + from amltk.scheduling.plugins.threadpoolctl import ThreadPoolCTLPlugin + + _plugins = (*_plugins, ThreadPoolCTLPlugin(threadpool_limit_ctl)) + + match max_trials: + case None: + pass + case int() if max_trials > 0: + from amltk.scheduling.plugins import Limiter + + _plugins = (*_plugins, Limiter(max_calls=max_trials)) + case _: + raise ValueError(f"{max_trials=} must be a positive int") + + from amltk.evalutors.evaluation_protocol import EvaluationProtocol + + match target: + case EvaluationProtocol(): + pass + case Task(): # type: ignore # NOTE not sure why pyright complains here + # TODO: When updating this, please update the docstring too + raise NotImplementedError( + "Not sure how to handle an already created task yet", + ) + case _ if callable(target): + from amltk.evalutors.evaluation_protocol import CustomProtocol + + target = CustomProtocol(target) + case _: + raise ValueError( + f"Invalid target {target}. Must be a function or an" + " EvaluationProtocol", + ) + + from amltk.scheduling.scheduler import Scheduler + + match scheduler: + case None: + scheduler = Scheduler.with_processes(n_workers) + case Scheduler(): + pass + case _: + raise ValueError(f"Invalid scheduler {scheduler}. Must be a Scheduler") + + # NOTE: I'm not particularly fond of this hack but I assume most people + # when prototyping don't care for the actual underlying optimizer and + # so we should just *pick one*. + create_optimizer: Optimizer.CreateSignature + match optimizer: + case None: + first_opt_class = next( + Optimizer._get_known_importable_optimizer_classes(), + None, + ) + if first_opt_class is None: + raise ValueError( + "No optimizer was given and no known importable optimizers were" + " found. Please consider giving one explicitly or installing" + " one of the following packages:\n" + "\n - optuna" + "\n - smac" + "\n - neural-pipeline-search", + ) + + create_optimizer = first_opt_class.create + opt_name = classname(first_opt_class) + case type(): + if not issubclass(optimizer, Optimizer): + raise ValueError( + f"Invalid optimizer {optimizer}. Must be a subclass of" + " Optimizer or a function that returns an Optimizer", + ) + create_optimizer = optimizer.create + opt_name = classname(optimizer) + case _: + assert not isinstance(optimizer, type) + create_optimizer = optimizer + opt_name = funcname(optimizer) + + match working_dir: + case None: + now = datetime.utcnow().isoformat() + + working_dir = PathBucket(f"{opt_name}-{self.name}-{now}") + case str() | Path(): + working_dir = PathBucket(working_dir) + case PathBucket(): + pass + case _: + raise ValueError( + f"Invalid working_dir {working_dir}." + " Must be a str, Path or PathBucket", + ) + + _optimizer = create_optimizer( + space=self, + metrics=metric, + bucket=working_dir, + seed=seed, + ) + + task = target.task(scheduler=scheduler, plugins=_plugins) + + if on_begin is not None: + hook = partial(on_begin, task, scheduler, history) + scheduler.on_start(hook) + + @scheduler.on_start + def launch_initial_trials() -> None: + trials = _optimizer.ask(n=n_workers) + for trial in trials: + task.submit(trial, self) + + from amltk.optimization.trial import Trial + + @task.on_result + def add_report_to_history(_: Any, report: Trial.Report) -> None: + history.add(report) + match report.status: + case Trial.Status.SUCCESS: + return + case Trial.Status.FAIL | Trial.Status.CRASHED | Trial.Status.UNKNOWN: + match on_trial_exception: + case "raise": + if report.exception is None: + raise RuntimeError( + f"Trial finished with status {report.status} but" + " no exception was attached!", + ) + raise report.exception + case "end": + scheduler.stop( + stop_msg=f"Trial finished with status {report.status}", + exception=report.exception, + ) + case "continue": + pass + case _: + raise ValueError(f"Invalid status {report.status}") + + @task.on_result + def run_next_trial(*_: Any) -> None: + if scheduler.running(): + trial = _optimizer.ask() + task.submit(trial, self) + + return scheduler + + def optimize( + self, + target: ( + EvaluationProtocol + | Callable[[Trial, Node], Trial.Report] + | Task[[Trial, Node], Trial.Report] + ), + metric: Metric | Sequence[Metric], + *, + optimizer: type[Optimizer] | Optimizer.CreateSignature | None = None, + seed: Seed | None = None, + max_trials: int | None = 3, + n_workers: int = 1, + timeout: float | None = None, + working_dir: str | Path | PathBucket | None = None, + scheduler: Scheduler | None = None, + history: History | None = None, + on_begin: OnBeginCallbackSignature | None = None, + on_trial_exception: Literal["raise", "end", "continue"] = "raise", + # Plugin creating arguments + plugins: Plugin | Iterable[Plugin] | None = None, + process_memory_limit: int | tuple[int, str] | None = None, + process_walltime_limit: int | tuple[float, str] | None = None, + process_cputime_limit: int | tuple[float, str] | None = None, + threadpool_limit_ctl: bool | int | None = None, + # `scheduler.run()` arguments + display: bool | Literal["auto"] = "auto", + wait: bool = True, + on_scheduler_exception: Literal["raise", "end", "continue"] = "raise", + ) -> History: + """Optimize a pipeline on a given target function or evaluation protocol. + + Args: + target: + The function against which to optimize. + + * If `target` is an + [`EvaluationProtocol`][amltk.evalutors.evaluation_protocol.EvaluationProtocol], + then it will be used to evaluate the pipeline. + + * If `target` is a function, then it must take in a + [`Trial`][amltk.optimization.trial.Trial] as the first argument + and a [`Node`][amltk.pipeline.node.Node] second argument, returning a + [`Trial.Report`][amltk.optimization.trial.Trial.Report]. Please refer to + the [optimization guide](../../../guides/optimization.md) for more. + + * If `target` is a [`Task`][amltk.scheduling.task.Task], then + this is not implemeneted yet. Sorry + metric: + The metric(s) that will be passed to `optimizer=`. These metrics + should align with what is being computed in `target=`. + optimizer: + The optimizer to use. If `None`, then AMLTK will go through a list + of known optimizers and use the first one it can find which was installed. + + Alternatively, this can be a class inheriting from + [`Optimizer`][amltk.optimization.optimizer.Optimizer] or else + a signature match [`Optimizer.CreateSignature`][amltk.optimization.Optimizer.CreateSignature] + + ??? tip "`Optimizer.CreateSignature`" + + ::: amltk.optimization.Optimizer.CreateSignature + + seed: + A [`seed`][amltk.types.Seed] for the optimizer to use. + n_workers: + The numer of workers to use to evaluate this pipeline. + If no `scheduler=` is provided, then one will be created for + you as [`Scheduler.with_processes(n_workers)`][amltk.scheduling.Scheduler.with_processes]. + If you provide your own `scheduler=` then this will limit the maximum + amount of concurrent trials for this pipeline that will be evaluating + at once. + timeout: + How long to run the scheduler for. This parameter only takes + effect if `setup_only=False` which is the default. Otherwise, + it will be ignored. + display: + Whether to display the scheduler during running. By default + it is `"auto"` which means to enable the display if running + in a juptyer notebook or colab. Otherwise, it will be + `False`. + + This may work poorly if including print statements or logging. + wait: + Whether to wait for the scheduler to finish all pending jobs + if was stopped for any reason, e.g. a `timeout=` or + [`scheduler.stop()`][amltk.scheduling.Scheduler.stop] was called. + on_scheduler_exception: + What to do if an exception occured, either in the submitted task, + the callback, or any other unknown source during the loop. + + * If `#!python "raise"`, then the exception will be raised + immediatly and the optimization process will halt. The default + behavior and good for initial development. + * If `#!python "end"`, then the exception will be caught and + the optimization process will end gracefully. + * If `#!python "continue"`, the exception will be ignored and + the optimization procedure will continue. + working_dir: + A working directory to use for the optimizer and the trials. + Any items you store in trials will be located in this directory, + where the [`trial.name`][amltk.optimization.Trial.name] will be + used as a subfolder where any contents stored with + [`trial.store()`][amltk.optimization.trial.Trial.store] will be put there. + Please see the [optimization guide](../../../guides/optimization.md) + for more on trial storage. + scheduler: + The specific [`Scheduler`][amltk.scheduling.Scheduler] to use. + If `None`, then one will be created for you with + [`Scheduler.with_processes(n_workers)`][amltk.scheduling.Scheduler.with_processes] + history: + A [`History`][amltk.optimization.history.History] to store the + [`Trial.Report`][amltk.optimization.Trial.Report]s in. You + may pass in your own if you wish for this method to store + it there instead of creating its own. + on_begin: + A callback that will be called before the scheduler is run. This + can be used to hook into the life-cycle of the optimization and + perform custom routines. Please see the + [scheduling guide](../../../guides/scheduling.md) for more. + + ??? tip "on_begin signature" + + ::: amltk.pipeline.node.OnBeginCallbackSignature + + on_trial_exception: + What to do when a trial raises an exception within its + [`with trial.begin():`][amltk.optimization.Trial.begin] block. + Please see the [optimization guide](../../../guides/optimization.md) + for more. In all cases, the exception will be attached to the + [`Trial.Report`][amltk.optimization.Trial.Report] object under + [`report.exception`][amltk.optimization.Trial.Report.exception]. + + * If `#!python "raise"`, then the exception will be raised + immediatly and the optimization process will halt. The default + and good for initial development. + * If `#!python "end"`, then the exception will be caught and + the optimization process will end gracefully. + * If `#!python "continue"`, the exception will be ignored and + the optimization procedure will continue. + max_trials: + The maximum number of trials to run. If `None`, then the + optimization will continue for as long as the scheduler is + running. You'll likely want to configure this. + process_memory_limit: + If specified, the [`Task`][amltk.scheduling.task.Task] will + use the + [`PynisherPlugin`][amltk.scheduling.plugins.pynisher.PynisherPlugin] + to limit the memory the process can use. Please + refer to the + [plugins `pynisher` reference](../../../reference/scheduling/plugins.md#pynisher) + for more as there are platform limitations and additional + dependancies required. + process_walltime_limit: + If specified, the [`Task`][amltk.scheduling.task.Task] will + use the + [`PynisherPlugin`][amltk.scheduling.plugins.pynisher.PynisherPlugin] + to limit the wall time the process can use. Please + refer to the + [plugins `pynisher` reference](../../../reference/scheduling/plugins.md#pynisher) + for more as there are platform limitations and additional + dependancies required. + process_cputime_limit: + If specified, the [`Task`][amltk.scheduling.task.Task] will + use the + [`PynisherPlugin`][amltk.scheduling.plugins.pynisher.PynisherPlugin] + to limit the cputime the process can use. Please + refer to the + [plugins `pynisher` reference](../../../reference/scheduling/plugins.md#pynisher) + for more as there are platform limitations and additional + dependancies required. + threadpool_limit_ctl: + If specified, the [`Task`][amltk.scheduling.task.Task] will + use the + [`ThreadPoolCTLPlugin`][amltk.scheduling.plugins.threadpoolctl.ThreadPoolCTLPlugin] + to limit the number of threads used by compliant libraries. + **Notably**, this includes scikit-learn, for which running multiple + in parallel can be problematic if not adjusted accordingly. + + The default behavior (when `None`) is to auto-detect whether this + is applicable. This is done by checking if `sklearn` is installed + and if the first node in the pipeline has a `BaseEstimator` item. + Please set this to `True`/`False` depending on your preference. + plugins: + Additional plugins to attach to the eventual + [`Task`][amltk.scheduling.task.Task] that will be executed by + the [`Scheduler`][amltk.scheduling.Scheduler]. Please + refer to the + [plugins reference](../../../reference/scheduling/plugins.md) for more. + """ # noqa: E501 + match history: + case None: + history = History() + case History(): + pass + case _: + raise ValueError(f"Invalid history {history}. Must be a History") + + scheduler = self.register_optimization_loop( + target=target, + metric=metric, + optimizer=optimizer, + seed=seed, + max_trials=max_trials, + n_workers=n_workers, + working_dir=working_dir, + scheduler=scheduler, + history=history, + on_begin=on_begin, + on_trial_exception=on_trial_exception, + plugins=plugins, + process_memory_limit=process_memory_limit, + process_walltime_limit=process_walltime_limit, + process_cputime_limit=process_cputime_limit, + threadpool_limit_ctl=threadpool_limit_ctl, + ) + scheduler.run( + wait=wait, + timeout=timeout, + on_exception=on_scheduler_exception, + display=display, + ) + return history diff --git a/src/amltk/scheduling/scheduler.py b/src/amltk/scheduling/scheduler.py index 02bda827..a43aa838 100644 --- a/src/amltk/scheduling/scheduler.py +++ b/src/amltk/scheduling/scheduler.py @@ -1302,7 +1302,7 @@ def run( ) = "raise", on_cancelled: Literal["raise", "end", "continue"] = "raise", asyncio_debug_mode: bool = False, - display: bool | Iterable[RenderableType] = False, + display: bool | Iterable[RenderableType] | Literal["auto"] = "auto", ) -> ExitState: """Run the scheduler. @@ -1378,6 +1378,11 @@ def run( Defaults to `False`. Please see [asyncio.run][] for more. display: Whether to display the scheduler live in the console. + * If `#!python "auto"`, will display the scheduler if in + a notebook or colab environemnt. Otherwise, it will not display + it. If left as "auto" and the display occurs, a warning will + be printed alongside it. + * If `#!python False`, will not display anything. * If `#!python True`, will display the scheduler and all its tasks. * If a `#!python list[RenderableType]` , will display the scheduler itself plus those renderables. @@ -1388,6 +1393,20 @@ def run( Raises: RuntimeError: If the scheduler is already running. """ + if display == "auto": + from amltk._richutil import is_jupyter + + display = is_jupyter() + if display is True: + warnings.warn( + "Detected that current running context is in a notebook!" + " When `display='auto'`, the default, the scheduler will" + " automatically be set to display. If you do not want this or" + " wish to disable this warning, please set `display=False`.", + UserWarning, + stacklevel=2, + ) + return asyncio.run( self.async_run( timeout=timeout, diff --git a/src/amltk/scheduling/task.py b/src/amltk/scheduling/task.py index a6550666..ab38298c 100644 --- a/src/amltk/scheduling/task.py +++ b/src/amltk/scheduling/task.py @@ -444,7 +444,7 @@ def _when_future_from_submission( @override def __repr__(self) -> str: - kwargs = {"unique_ref": self.unique_ref} + kwargs = {"unique_ref": self.unique_ref, "plugins": self.plugins} kwargs_str = ", ".join(f"{k}={v}" for k, v in kwargs.items()) return f"{self.__class__.__name__}({kwargs_str})" diff --git a/src/amltk/store/paths/path_bucket.py b/src/amltk/store/paths/path_bucket.py index 577ae765..1b5f23e2 100644 --- a/src/amltk/store/paths/path_bucket.py +++ b/src/amltk/store/paths/path_bucket.py @@ -108,7 +108,7 @@ class PathBucket(Bucket[str, Path]): def __init__( self, - path: Path | str, + path: PathBucket | Path | str, *, loaders: Sequence[type[PathLoader]] | None = None, create: bool = True, @@ -135,6 +135,8 @@ def __init__( if isinstance(path, str): path = Path(path) + elif isinstance(path, PathBucket): + path = path.path if clean and path.exists(): shutil.rmtree(path, ignore_errors=True) @@ -146,7 +148,7 @@ def __init__( path.mkdir(parents=True, exist_ok=True) self._create = create - self.path = path + self.path: Path = path self.loaders = _loaders def sizes(self) -> dict[str, int]: diff --git a/src/amltk/types.py b/src/amltk/types.py index eb9c821e..879c2e65 100644 --- a/src/amltk/types.py +++ b/src/amltk/types.py @@ -31,7 +31,7 @@ Space = TypeVar("Space") """Generic for objects that are aware of a space but not the specific kind""" -Seed: TypeAlias = int | np.integer | (np.random.RandomState | np.random.Generator) +Seed: TypeAlias = int | np.integer | np.random.RandomState | np.random.Generator """Type alias for kinds of Seeded objects.""" FidT: TypeAlias = tuple[int, int] | tuple[float, float] | list[Any] diff --git a/tests/pipeline/test_optimize.py b/tests/pipeline/test_optimize.py new file mode 100644 index 00000000..baebc2a1 --- /dev/null +++ b/tests/pipeline/test_optimize.py @@ -0,0 +1,141 @@ +from __future__ import annotations + +from collections.abc import Sequence +from pathlib import Path + +import pytest +import threadpoolctl + +from amltk import Component, Metric, Node, Trial +from amltk.optimization.history import History +from amltk.optimization.optimizers.smac import SMACOptimizer +from amltk.scheduling.scheduler import Scheduler +from amltk.scheduling.task import Task +from amltk.store import PathBucket +from amltk.types import Seed + +METRIC = Metric("acc", minimize=False, bounds=(0.0, 1.0)) + + +class _CustomError(Exception): + pass + + +def target_funtion(trial: Trial, pipeline: Node) -> Trial.Report: # noqa: ARG001 + # We don't really care here + with trial.begin(): + pass + + threadpool_info = threadpoolctl.threadpool_info() + trial.summary["num_threads"] = threadpool_info[0]["num_threads"] + return trial.success(acc=0.5) + + +def test_custom_callback_used(tmp_path: Path) -> None: + def my_callback(task: Task, scheduler: Scheduler, history: History) -> None: # noqa: ARG001 + raise _CustomError() + + component = Component(object, space={"a": (0.0, 1.0)}) + + with pytest.raises(_CustomError): + component.optimize( + target_funtion, + metric=METRIC, + on_begin=my_callback, + max_trials=1, + working_dir=tmp_path, + ) + + +def test_populates_given_history(tmp_path: Path) -> None: + history = History() + component = Component(object, space={"a": (0.0, 1.0)}) + trial = Trial(name="test_trial", config={}) + with trial.begin(): + pass + report = trial.success() + history.add(report) + + component.optimize( + target_funtion, + metric=METRIC, + history=history, + max_trials=1, + working_dir=tmp_path, + ) + + +def test_custom_create_optimizer_signature(tmp_path: Path) -> None: + component = Component(object, space={"a": (0.0, 1.0)}) + + def my_custom_optimizer_creator( + *, + space: Node, + metrics: Metric | Sequence[Metric], + bucket: PathBucket | None = None, + seed: Seed | None = None, + ) -> SMACOptimizer: + assert space is component + assert metrics is METRIC + assert bucket is not None + assert bucket.path == tmp_path + assert seed == 1 + + raise _CustomError() + + with pytest.raises(_CustomError): + component.optimize( + target_funtion, + metric=METRIC, + optimizer=my_custom_optimizer_creator, + max_trials=1, + seed=1, + working_dir=tmp_path, + ) + + +def test_history_populated_with_exactly_maximum_trials(tmp_path: Path) -> None: + component = Component(object, space={"a": (0.0, 1.0)}) + history = component.optimize( + target_funtion, + metric=METRIC, + max_trials=10, + working_dir=tmp_path, + ) + assert len(history) == 10 + + +def test_sklearn_head_triggers_triggers_threadpoolctl(tmp_path: Path) -> None: + from sklearn.ensemble import RandomForestClassifier + + info = threadpoolctl.threadpool_info() + num_threads = info[0]["num_threads"] + + component = Component(RandomForestClassifier, space={"a": (0.0, 1.0)}) + history = component.optimize( + target_funtion, + metric=METRIC, + max_trials=1, + working_dir=tmp_path, + ) + + report = history[0] + # Should have a different number of threads in there. By default 1 + assert report.summary["num_threads"] != num_threads + assert report.summary["num_threads"] == 1 + + +def test_no_sklearn_head_does_not_trigger_threadpoolctl(tmp_path: Path) -> None: + info = threadpoolctl.threadpool_info() + num_threads = info[0]["num_threads"] + + component = Component(object, space={"a": (0.0, 1.0)}) + history = component.optimize( + target_funtion, + metric=METRIC, + max_trials=1, + working_dir=tmp_path, + ) + + report = history[0] + assert report.summary["num_threads"] == num_threads