From 3cddc9eb7da6b23b35b214cd154f72bbf03770ac Mon Sep 17 00:00:00 2001 From: Zebedee Nicholls Date: Mon, 16 Sep 2024 15:56:34 +0200 Subject: [PATCH] Finish cleaning up docs --- .../how-to-guides/how-to-run-a-calibration.py | 174 ++++++++---------- src/openscm_calibration/calibration_demo.py | 23 +++ src/openscm_calibration/emcee_utils.py | 144 ++++++++++++++- src/openscm_calibration/minimize.py | 51 +---- src/openscm_calibration/model_runner.py | 31 ++-- src/openscm_calibration/parameter_handling.py | 167 +++++++++++++++++ src/openscm_calibration/typing.py | 73 +++++++- 7 files changed, 493 insertions(+), 170 deletions(-) create mode 100644 src/openscm_calibration/parameter_handling.py diff --git a/docs/how-to-guides/how-to-run-a-calibration.py b/docs/how-to-guides/how-to-run-a-calibration.py index 690468f..a2e56fe 100644 --- a/docs/how-to-guides/how-to-run-a-calibration.py +++ b/docs/how-to-guides/how-to-run-a-calibration.py @@ -48,6 +48,11 @@ ) from openscm_calibration.minimize import to_minimize_full from openscm_calibration.model_runner import OptModelRunner +from openscm_calibration.parameter_handling import ( + BoundDefinition, + ParameterDefinition, + ParameterOrder, +) from openscm_calibration.scipy_plotting import ( CallbackProxy, OptPlotter, @@ -366,18 +371,44 @@ def do_model_runs_input_generator( # %% [markdown] # Firstly, we define the parameters we're going to optimise. # This will be used to ensure a consistent order and units throughout. +# We also define the bounds we're going to use here +# because we need them later +# (but we don't always have to provide bounds). # %% -parameters = [ - ("k", f"{MASS_UNITS} / {TIME_UNITS} ^ 2"), - ("x_zero", LENGTH_UNITS), - ("beta", f"{MASS_UNITS} / {TIME_UNITS}"), -] -parameters +parameter_order = ParameterOrder( + ( + ParameterDefinition( + "k", + f"{MASS_UNITS} / {TIME_UNITS} ^ 2", + BoundDefinition( + lower=UREG.Quantity(300.0, "kg / s^2"), + upper=UREG.Quantity(10000.0, "kg / s^2"), + ), + ), + ParameterDefinition( + "x_zero", + LENGTH_UNITS, + BoundDefinition( + lower=UREG.Quantity(-2, "m"), + upper=UREG.Quantity(2, "m"), + ), + ), + ParameterDefinition( + "beta", + f"{MASS_UNITS} / {TIME_UNITS}", + BoundDefinition( + lower=UREG.Quantity(1e10, "kg / s"), + upper=UREG.Quantity(1e12, "kg / s"), + ), + ), + ) +) +parameter_order.names # %% -model_runner = OptModelRunner.from_parameters( - params=parameters, +model_runner = OptModelRunner.from_parameter_order( + parameter_order=parameter_order, do_model_runs_input_generator=do_model_runs_input_generator, do_model_runs=run_experiments, ) @@ -407,27 +438,6 @@ def do_model_runs_input_generator( start = np.array([4, 0.6, 2]) start -# %% [markdown] -# For this optimisation, we must also define bounds for each parameter. - -# %% -bounds_dict = { - "k": [ - UREG.Quantity(300, "kg / s^2"), - UREG.Quantity(1e4, "kg / s^2"), - ], - "x_zero": [ - UREG.Quantity(-2, "m"), - UREG.Quantity(2, "m"), - ], - "beta": [ - UREG.Quantity(1e10, "kg / s"), - UREG.Quantity(1e12, "kg / s"), - ], -} -bounds = [[v.to(unit).m for v in bounds_dict[k]] for k, unit in parameters] -bounds_dict - # %% [markdown] # Now we're ready to run our optimisation. @@ -467,7 +477,7 @@ def do_model_runs_input_generator( # We think this is the right way to calculate this. # If in doubt, you can always just increase this by some factor # (at the cost of potentially reserving more memory than you need). -max_n_runs = (maxiter + 1) * popsize * len(parameters) +max_n_runs = (maxiter + 1) * popsize * len(parameter_order.parameters) # Visualisation options @@ -478,11 +488,10 @@ def do_model_runs_input_generator( # Create axes to plot on cost_name = "cost" timeseries_axes = list(convert_results_to_plot_dict(target).keys()) -parameters_names = [v[0] for v in parameters] mosaic = get_optimisation_mosaic( cost_key=cost_name, - params=parameters_names, + params=parameter_order.names, timeseries=timeseries_axes, cost_col_relwidth=2, n_parameters_per_row=3, @@ -499,7 +508,7 @@ def do_model_runs_input_generator( store = OptResStore.from_n_runs_manager( max_n_runs, manager, - params=parameters_names, + params=parameter_order.names, add_iteration_to_res=add_iteration_info, ) @@ -518,7 +527,7 @@ def do_model_runs_input_generator( fig=fig, axes=axd, cost_key=cost_name, - parameters=parameters_names, + parameters=parameter_order.names, timeseries_axes=timeseries_axes, target=target, store=store, @@ -544,7 +553,7 @@ def do_model_runs_input_generator( optimize_res = scipy.optimize.differential_evolution( to_minimize, - bounds, + parameter_order.bounds_m(), maxiter=maxiter, x0=start, tol=tol, @@ -580,7 +589,7 @@ def do_model_runs_input_generator( # %% # Here we imagine that we're polishing from the results of the DE above, # but we make the start slightly worse first -start_local = optimize_res.x * 1.3 +start_local = optimize_res.x * 1.5 start_local # %% [markdown] @@ -595,25 +604,24 @@ def do_model_runs_input_generator( # Optimisation parameters tol = 1e-4 # Maximum number of iterations to use -maxiter = 100 +maxiter = 50 # We think this is how this works. # As above, if you hit memory errors, # just increase this by some factor. -max_n_runs = len(parameters) + 2 * maxiter +max_n_runs = len(parameter_order.names) + 2 * maxiter # Lots of options here method = "Nelder-mead" # Visualisation options -update_every = 20 +update_every = 5 thin_ts_to_plot = 5 -parameters_names = [v[0] for v in parameters] # Create other objects store = OptResStore.from_n_runs( max_n_runs, - params=parameters_names, + params=parameter_order.names, add_iteration_to_res=add_iteration_info, ) to_minimize = partial( @@ -630,7 +638,7 @@ def do_model_runs_input_generator( # thing as the previous example under the hood. opt_plotter = OptPlotter.from_autogenerated_figure( cost_key=cost_name, - params=parameters_names, + params=parameter_order.names, convert_results_to_plot_dict=convert_results_to_plot_dict, target=target, store=store, @@ -682,58 +690,19 @@ def do_model_runs_input_generator( # %% -def neg_log_prior_bounds(x: np.ndarray, bounds: np.ndarray) -> float: - """ - Log prior that just checks proposal is in bounds - - Parameters - ---------- - x - Parameter array - - bounds - Bounds for each parameter (must have same - order as x) - """ - in_bounds = (x > bounds[:, 0]) & (x < bounds[:, 1]) - if np.all(in_bounds): - return 0 - - return -np.inf - - -neg_log_prior = partial(neg_log_prior_bounds, bounds=np.array(bounds)) - +from openscm_calibration.emcee_utils import get_neg_log_prior, neg_log_info # %% -def log_prob(x) -> tuple[float, float, float]: - """ - Get log probability for a given parameter vector - - Returns (negative) log probability of x, - (negative) log likelihood of x based on the prior - and (negative) log likelihood of x. - """ - neg_ll_prior_x = neg_log_prior(x) - - if not np.isfinite(neg_ll_prior_x): - return -np.inf, None, None - - try: - model_results = model_runner.run_model(x) - except ValueError: - return -np.inf, None, None - - sses = cost_calculator.calculate_cost(model_results) - neg_ll_x = -sses / 2 - neg_log_prob = neg_ll_x + neg_ll_prior_x - - return neg_log_prob, neg_ll_prior_x, neg_ll_x - - -# %% -assert False, "put neg_log_prior_bounds in package and check names against emcee docs" -assert False, "put log_prob in package and check names against emcee docs" +neg_log_prior = get_neg_log_prior( + parameter_order, + kind="uniform", +) +neg_log_info = partial( + neg_log_info, + neg_log_prior=neg_log_prior, + model_runner=model_runner, + negative_log_likelihood_calculator=cost_calculator, +) # %% [markdown] # We're using the DIME proposal from [emcwrap](https://github.com/gboehl/emcwrap). @@ -741,7 +710,7 @@ def log_prob(x) -> tuple[float, float, float]: # so requires less fine tuning and is less sensitive to the starting point. # %% -ndim = len(bounds) +ndim = len(parameter_order.names) # emcwrap docs suggest 5 * ndim nwalkers = 5 * ndim @@ -773,16 +742,15 @@ def log_prob(x) -> tuple[float, float, float]: # the posterior appropriately, normally requires looking at the # chains and then just running them for longer if needed. # This number is definitely too small. -max_iterations = 125 +max_iterations = 200 burnin = 25 thin = 5 ## Visualisation options plot_every = 30 convergence_ratio = 50 -parameter_order = [p[0] for p in parameters] neg_log_likelihood_name = "neg_ll" -labels_chain = [neg_log_likelihood_name, *parameter_order] +labels_chain = [neg_log_likelihood_name, *parameter_order.names] # Stores for autocorrelation values # (as a function of the number of steps performed). @@ -798,7 +766,7 @@ def log_prob(x) -> tuple[float, float, float]: holder_chain = display(fig_chain, display_id=True) # noqa: F821 # used in a notebook fig_dist, axd_dist = plt.subplot_mosaic( - mosaic=[[parameter] for parameter in parameter_order], + mosaic=[[parameter] for parameter in parameter_order.names], figsize=(10, 5), ) holder_dist = display(fig_dist, display_id=True) # noqa: F821 # used in a notebook @@ -814,13 +782,16 @@ def log_prob(x) -> tuple[float, float, float]: holder_tau = display(fig_tau, display_id=True) # noqa: F821 # used in a notebook # Plottting helper -truths_corner = [truth[k].to(u).m for k, u in parameters] +truths_corner = [ + truth[parameter.name].to(parameter.unit).m + for parameter in parameter_order.parameters +] with Pool(processes=processes) as pool: sampler = emcee.EnsembleSampler( nwalkers, ndim, - log_prob, + log_prob_fn=neg_log_info, # Handy, but requires changes in other functions. # One for the future. # parameter_names=parameter_order, @@ -839,7 +810,7 @@ def log_prob(x) -> tuple[float, float, float]: burnin=burnin, thin=thin, plot_every=plot_every, - parameter_order=parameter_order, + parameter_order=parameter_order.names, neg_log_likelihood_name=neg_log_likelihood_name, start=start_emcee if sampler.iteration < 1 else None, holder_chain=holder_chain, @@ -855,8 +826,7 @@ def log_prob(x) -> tuple[float, float, float]: ax_tau=ax_tau, corner_kwargs=dict(truths=truths_corner), ): - print(f"{step_info.steps_post_burnin=}") - print(f"{step_info.acceptance_fraction=:.3f}") + print(f"{step_info.steps_post_burnin=}. {step_info.acceptance_fraction=:.3f}") # Close all the figures to avoid them appearing twice for _ in range(4): diff --git a/src/openscm_calibration/calibration_demo.py b/src/openscm_calibration/calibration_demo.py index c559af9..26a631d 100644 --- a/src/openscm_calibration/calibration_demo.py +++ b/src/openscm_calibration/calibration_demo.py @@ -80,6 +80,29 @@ def calculate_cost( return sses + def calculate_negative_log_likelihood( + self, + model_results: ExperimentResultCollection, + ) -> float: + """ + Calculate the negative log likelihood of a given set of results + + Parameters + ---------- + model_results + Model results for which to calculate the negative log likelihood + + Returns + ------- + : + Negative log likelihood (up to an additive constant) + """ + sses = self.calculate_cost(model_results) + # TODO: find the proof of this + negative_log_likelihood = -sses / 2 + + return negative_log_likelihood + @define class Timeseries: diff --git a/src/openscm_calibration/emcee_utils.py b/src/openscm_calibration/emcee_utils.py index 6859633..e0f700c 100644 --- a/src/openscm_calibration/emcee_utils.py +++ b/src/openscm_calibration/emcee_utils.py @@ -5,10 +5,19 @@ from __future__ import annotations import warnings -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Callable, Literal import numpy as np +import numpy.typing as nptype from attrs import define +from typing_extensions import TypeAlias + +from openscm_calibration.parameter_handling import ParameterOrder +from openscm_calibration.typing import ( + DataContainer, + SupportsModelRun, + SupportsNegativeLogLikelihoodCalculation, +) if TYPE_CHECKING: # See here for explanation of this pattern and why we don't need quotes @@ -16,7 +25,138 @@ from typing import Any import emcee.backends - import numpy.typing as nptype + + +SupportsLikelihoodCalculation: TypeAlias = Callable[[nptype.NDArray[np.float64]], float] + + +def get_neg_log_prior( + parameter_order: ParameterOrder, + kind: Literal["uniform"] = "uniform", +) -> SupportsLikelihoodCalculation: + """ + Get a function for calculating the negative log prior + + Parameters + ---------- + parameter_order + The parameters being calibrated + + kind + The kind of log prior to create. + + Options: + + - "uniform": a uniform prior within the bounds for each parameter + + Returns + ------- + : + Function that, given a parameter vector, returns the negative log prior + """ + if kind == "uniform": + bounds = np.array( + [parameter.bounds_m() for parameter in parameter_order.parameters] + ) + + def neg_log_prior(x: nptype.NDArray[np.float64]) -> float: + """ + Get log prior for a given parameter vector, `x` + + This was generated in `get_neg_log_prior_from_bounds` + and assumes a uniform distribution for parameters + within their bounds. + + Future uses must match the bounds order provided to + `get_neg_log_prior_from_bounds`. + + Parameters + ---------- + x + Parameter vector. + + Returns + ------- + : + Negative log prior of `x` + """ + in_bounds = (x > bounds[:, 0]) & (x < bounds[:, 1]) + if np.all(in_bounds): + return 0 + + return -np.inf + + else: + raise NotImplementedError(kind) + + return neg_log_prior + + +def neg_log_info( + x: nptype.NDArray[np.float64], + neg_log_prior: Callable[[nptype.NDArray[np.float64]], float], + model_runner: SupportsModelRun[DataContainer], + negative_log_likelihood_calculator: SupportsNegativeLogLikelihoodCalculation[ + DataContainer + ], +) -> tuple[float, float, float | None]: + """ + Get negative log probability and likelihood information + + Specifically, negative log probability, log prior and log likelihood + for a given parameter vector. + + Parameters + ---------- + x + Parameter vector. + + Be careful with the order and units of parameters. + No checks of parameter order or units are performed in this function. + + neg_log_prior + Function that calculates the negative log prior for a given value of `x` + + model_runner + Runner of the model for a given value of `x` + + negative_log_likelihood_calculator + Calculator of negative log likelihood based on the outputs of the model run + + Returns + ------- + : + Negative log probability of `x`, + negative log prior of `x` + and negative log likelihood of `x`. + + If the negative log probability of `x` is `-np.inf`, + then no likelihood calculation is performed + and `None` is returned for the negative log likelihood of `x`. + + If the log likelihood calculation raises a `ValueError`, + we assume that the likelihood calculation failed + and return `-np.inf` for the log probability + and negative log likelihood of `x`. + """ + neg_log_prior_x = neg_log_prior(x) + + if not np.isfinite(neg_log_prior_x): + return -np.inf, neg_log_prior_x, None + + try: + model_results = model_runner.run_model(x) + except ValueError: + return -np.inf, neg_log_prior_x, -np.inf + + neg_log_likelihood_x = ( + negative_log_likelihood_calculator.calculate_negative_log_likelihood( + model_results + ) + ) + neg_log_prob = neg_log_prior_x + neg_log_likelihood_x + + return neg_log_prob, neg_log_prior_x, neg_log_likelihood_x def get_start( diff --git a/src/openscm_calibration/minimize.py b/src/openscm_calibration/minimize.py index 0b0c9be..eb48ccd 100644 --- a/src/openscm_calibration/minimize.py +++ b/src/openscm_calibration/minimize.py @@ -4,66 +4,21 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Protocol +from typing import TYPE_CHECKING import numpy as np from openscm_calibration.store import OptResStore from openscm_calibration.typing import ( DataContainer, - DataContainer_co, - DataContainer_contra, + SupportsCostCalculation, + SupportsModelRun, ) if TYPE_CHECKING: from typing import Any -class SupportsCostCalculation(Protocol[DataContainer_contra]): - """ - Class that supports cost calculations - """ - - def calculate_cost(self, model_results: DataContainer_contra) -> float: - """ - Calculate cost function - - Parameters - ---------- - model_results - Model results for which to calculate the cost - - Returns - ------- - : - Cost function value - """ - - -class SupportsModelRun(Protocol[DataContainer_co]): - """ - Class that supports model runs - """ - - def run_model( - self, - x: np.typing.NDArray[np.number[Any]], - ) -> DataContainer_co: - """ - Calculate cost function - - Parameters - ---------- - x - Parameter values - - Returns - ------- - : - Results of model run - """ - - def to_minimize_full( x: np.typing.NDArray[np.number[Any]], cost_calculator: SupportsCostCalculation[DataContainer], diff --git a/src/openscm_calibration/model_runner.py b/src/openscm_calibration/model_runner.py index 7772ec3..4dfd832 100644 --- a/src/openscm_calibration/model_runner.py +++ b/src/openscm_calibration/model_runner.py @@ -4,7 +4,6 @@ from __future__ import annotations -from collections.abc import Iterable from functools import partial from typing import Any, Callable, Generic, Protocol @@ -12,6 +11,7 @@ import pint from attrs import define +from openscm_calibration.parameter_handling import ParameterOrder from openscm_calibration.typing import DataContainer_co @@ -85,22 +85,22 @@ class OptModelRunner(Generic[DataContainer_co]): """ @classmethod - def from_parameters( + def from_parameter_order( cls, - params: Iterable[tuple[str, str | pint.Unit | None]], + parameter_order: ParameterOrder, do_model_runs_input_generator: ModelRunsInputGenerator, do_model_runs: ModelRunner[DataContainer_co], get_unit_registry: Callable[[], pint.UnitRegistry] | None = None, ) -> OptModelRunner[DataContainer_co]: """ - Initialise from list of parameters + Initialise from a parameter order definition This is a convenience method Parameters ---------- - params - List of parameters + parameter_order + Parameter order from which to initialise do_model_runs_input_generator Generator of input for `do_model_runs`. @@ -124,7 +124,7 @@ def from_parameters( """ convert_x_to_names_with_units = partial( x_and_parameters_to_named_with_units, - params=params, + parameter_order=parameter_order, get_unit_registry=get_unit_registry, ) @@ -162,7 +162,7 @@ def run_model( def x_and_parameters_to_named_with_units( x: np.typing.NDArray[np.number[Any]], - params: Iterable[tuple[str, str | pint.Unit | None]], + parameter_order: ParameterOrder, get_unit_registry: Callable[[], pint.UnitRegistry] | None = None, ) -> dict[str, pint.registry.UnitRegistry.Quantity]: """ @@ -173,11 +173,8 @@ def x_and_parameters_to_named_with_units( x Vector of calibration parameter values - params - parameters to be calibrated - - This defines both the names and units of the parameters. - If the units are `None`, the values are assumed to be plain numpy quantities. + parameter_order + Definition of the (expected) order of the parameters get_unit_registry Function to get unit registry. @@ -221,12 +218,12 @@ def x_and_parameters_to_named_with_units( ) out: dict[str, pint.registry.UnitRegistry.Quantity] = {} - for val, (key, unit) in zip(x, params): - if unit is not None: - val_out = unit_reg.Quantity(val, unit) + for val, parameter in zip(x, parameter_order.parameters): + if parameter.unit is not None: + val_out = unit_reg.Quantity(val, parameter.unit) else: val_out = val - out[key] = val_out + out[parameter.name] = val_out return out diff --git a/src/openscm_calibration/parameter_handling.py b/src/openscm_calibration/parameter_handling.py new file mode 100644 index 0000000..bca8c15 --- /dev/null +++ b/src/openscm_calibration/parameter_handling.py @@ -0,0 +1,167 @@ +""" +Handling of parameters + +In particular, helping to ensure that we use the correct parameter order and units +in the many places where the order and units are not checked by the receiving function +(e.g. all of scipy's optimisation routines, which work on bare arrays). +""" + +from __future__ import annotations + +from typing import TypeVar + +import attr +import numpy as np +import numpy.typing as nptype +import pint +from attrs import define, field + +BoundsValue = TypeVar( + "BoundsValue", float, np.float64, pint.registry.UnitRegistry.Quantity +) + + +@define +class BoundDefinition: + """Definition of bounds for a parameter""" + + lower: BoundsValue = field() + """Lower bound""" + + upper: BoundsValue = field() + """Upper bound""" + + @upper.validator + def check_greather_than_lower( + instance: BoundDefinition, + attribute: attr.Attribute[pint.registry.UnitRegistry.Quantity], + value: pint.registry.UnitRegistry.Quantity, + ) -> None: + """Check that the value is greater than the lower bound""" + if value <= instance.lower: + msg = ( + "`upper` must be greater than `lower`. " + f"Received {instance.lower=}, instance.upper={value=}`." + ) + raise ValueError(msg) + + +@define +class ParameterDefinition: + """Definition of a parameter""" + + name: str + """The parameter's name""" + + unit: str | None + """ + The parameter's default unit for usage that isn't unit-aware + + If this is `None`, we assume that no units apply to the parameter. + """ + + bounds: BoundDefinition | None = None + """The bounds for the parameter""" + + def bounds_m( + self, unit: str | None = None + ) -> tuple[ + pint.registry.UnitRegistry.Quantity, pint.registry.UnitRegistry.Quantity + ]: + """ + Get the magnitude of `self.bounds` + + Parameters + ---------- + unit + Unit in which to return the magnitude. + + Only applies if `self.unit` is not `None`. + + If `self.unit` is not `None` and `unit` is not supplied, + we use `self.unit` instead + (i.e. `self.unit` functions as a default). + + Returns + ------- + : + Magnitude of the bounds (lower, upper). + """ + if self.bounds is None: + msg = ( + f"No bounds provided for {self.name}. " + "Please re-initialise this parameter definition." + ) + raise ValueError(msg) + + if self.unit is None: + return tuple([v for v in [self.bounds.lower, self.bounds.upper]]) + + unit_h = unit if unit is not None else self.unit + + return tuple([v.to(unit_h).m for v in [self.bounds.lower, self.bounds.upper]]) + + +@define +class ParameterOrder: + """Parameter order definition""" + + parameters: tuple[ParameterDefinition, ...] + """ + Parameters + + These are stored in a tuple, hence order is preserved. + """ + + def bounds_m( + self, units: dict[str, str] | None = None + ) -> nptype.NDArray[np.float64]: + """ + Get the magnitude of the bounds of the parameters + + Parameters + ---------- + units + Units to use for the parameters. + + Keys are parameter names, + values are the units to use for the parameter's bounds. + + If not supplied at all, + we simply use the default unit for each parameter + i.e. the default behaviour of + [`ParameterDefinition.bounds_m`][openscm_calibration.parameter_handling.ParameterDefinition.bounds_m]. + + If not supplied for a given parameter, + we simply use the parameter's default unit + i.e. the default behaviour of + [`ParameterDefinition.bounds_m`][openscm_calibration.parameter_handling.ParameterDefinition.bounds_m]. + + Returns + ------- + : + Magnitude of the bounds of the parameters (in order) + """ + out_l = [] + for parameter in self.parameters: + if units is not None and parameter.name in units: + unit = units[parameter.name] + + else: + unit = None + + out_l.append(parameter.bounds_m(unit)) + + return np.array(out_l) + + @property + def names(self) -> tuple[str, ...]: + """ + Get the names of the parameters + + Returns + ------- + : + Names of the parameters (in order) + """ + return tuple([v.name for v in self.parameters]) diff --git a/src/openscm_calibration/typing.py b/src/openscm_calibration/typing.py index 4351779..7aafce8 100644 --- a/src/openscm_calibration/typing.py +++ b/src/openscm_calibration/typing.py @@ -4,8 +4,79 @@ from __future__ import annotations -from typing import TypeVar +from typing import Any, Protocol, TypeVar + +import numpy as np +import numpy.typing as nptype DataContainer = TypeVar("DataContainer") DataContainer_co = TypeVar("DataContainer_co", covariant=True) DataContainer_contra = TypeVar("DataContainer_contra", contravariant=True) + + +class SupportsModelRun(Protocol[DataContainer_co]): + """ + Class that supports model runs + """ + + def run_model( + self, + x: nptype.NDArray[np.number[Any]], + ) -> DataContainer_co: + """ + Calculate cost function + + Parameters + ---------- + x + Parameter values + + Returns + ------- + : + Results of model run + """ + + +class SupportsCostCalculation(Protocol[DataContainer_contra]): + """ + Class that supports cost calculations + """ + + def calculate_cost(self, model_results: DataContainer_contra) -> float: + """ + Calculate cost function + + Parameters + ---------- + model_results + Model results for which to calculate the cost + + Returns + ------- + : + Cost function value + """ + + +class SupportsNegativeLogLikelihoodCalculation(Protocol[DataContainer_contra]): + """ + Class that supports negative log likelihood calculations + """ + + def calculate_negative_log_likelihood( + self, model_results: DataContainer_contra + ) -> float: + """ + Calculate negative log likelihood + + Parameters + ---------- + model_results + Model results for which to calculate the cost + + Returns + ------- + : + Negative log likelihood + """