diff --git a/pymc/variational/callbacks.py b/pymc/variational/callbacks.py index 72f0b4a2ddf..4f609efbaf3 100644 --- a/pymc/variational/callbacks.py +++ b/pymc/variational/callbacks.py @@ -18,7 +18,13 @@ import numpy as np -__all__ = ["Callback", "CheckParametersConvergence", "Tracker"] +__all__ = [ + "Callback", + "CheckParametersConvergence", + "ExponentialDecay", + "ReduceLROnPlateau", + "Tracker", +] class Callback: @@ -93,6 +99,125 @@ def flatten_shared(shared_list): return np.concatenate([sh.get_value().flatten() for sh in shared_list]) +class LearningRateScheduler(Callback): + """Baseclass for learning rate schedulers.""" + + def __init__(self, optimizer): + self.optimizer = optimizer + + def __call__(self, approx, loss_hist, i): + raise NotImplementedError("Must be implemented in subclass.") + + def _set_new_lr(self, new_lr): + self.optimizer.keywords["learning_rate"] = new_lr + + +class ExponentialDecay(LearningRateScheduler): + """ + Exponentially decays the learning rate. + + This is inspired by Keras' homonymous callback: + https://github.com/keras-team/keras/blob/v2.14.0/keras/optimizers/schedules/learning_rate_schedule.py + + Parameters + ---------- + decay_steps : int + Number of steps at which the learning rate decay happens. + decay_rate : float + Rate of decay. + min_lr: float + lower bound on the learning rate + staircase : bool + If True, decay the learning rate at discrete intervals. + """ + + def __init__(self, optimizer, decay_steps, decay_rate, min_lr=1e-6, staircase=False): + super().__init__(optimizer) + self.decay_steps = decay_steps + self.decay_rate = decay_rate + self.staircase = staircase + self.min_lr = min_lr + + self.initial_learning_rate = float(self.optimizer.keywords["learning_rate"]) + + def __call__(self, approx, loss_hist, i): + if self.staircase: + new_lr = self.initial_learning_rate * self.decay_rate ** (i // self.decay_steps) + else: + new_lr = self.initial_learning_rate * self.decay_rate ** (i / self.decay_steps) + if new_lr >= self.min_lr: + self._set_new_lr(new_lr) + + +class ReduceLROnPlateau(LearningRateScheduler): + """ + Reduce learning rate when the loss has stopped improving. 
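+
+    The callback watches the most recent entry of the loss history: once no improvement has
+    been seen for `patience` consecutive iterations, the learning rate stored in the
+    optimizer's keywords is multiplied by `factor` (never dropping below `min_lr`), and a
+    `cooldown` period is observed before counting resumes.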
+ + This is inspired by Keras' homonymous callback: + https://github.com/keras-team/keras/blob/v2.14.0/keras/callbacks.py + + Parameters + ---------- + optimizer: callable + PyMC optimizer + factor: float + factor by which the learning rate will be reduced: `new_lr = lr * factor` + patience: int + number of epochs with no improvement after which learning rate will be reduced + min_lr: float + lower bound on the learning rate + cooldown: int + number of iterations to wait before resuming normal operation after lr has been reduced + """ + + def __init__( + self, + optimizer, + factor=0.1, + patience=10, + min_lr=1e-6, + cooldown=0, + ): + super().__init__(optimizer) + self.factor = factor + self.patience = patience + self.min_lr = min_lr + self.cooldown = cooldown + self.cooldown_counter = 0 + self.wait = 0 + self.best = float("inf") + + def _in_cooldown(self): + return self.cooldown_counter > 0 + + def _reduce_lr(self): + old_lr = float(self.optimizer.keywords["learning_rate"]) + new_lr = max(old_lr * self.factor, self.min_lr) + if new_lr >= self.min_lr: + self._set_new_lr(new_lr) + + def __call__(self, approx, loss_hist, i): + current = loss_hist[-1] + + if np.isinf(current): + return + + if self._in_cooldown(): + self.cooldown_counter -= 1 + self.wait = 0 + return + + if current < self.best: + self.best = current + self.wait = 0 + elif not np.isinf(self.best): + self.wait += 1 + if self.wait >= self.patience: + self._reduce_lr() + self.cooldown_counter = self.cooldown + self.wait = 0 + + class Tracker(Callback): """ Helper class to record arbitrary stats during VI diff --git a/pymc/variational/opvi.py b/pymc/variational/opvi.py index 99261f026e0..7a2e1625c21 100644 --- a/pymc/variational/opvi.py +++ b/pymc/variational/opvi.py @@ -251,6 +251,7 @@ def updates( if more_updates is None: more_updates = dict() resulting_updates = ObjectiveUpdates() + if self.test_params: self.add_test_updates( resulting_updates, @@ -313,10 +314,14 @@ def add_obj_updates( obj_target = self( obj_n_mc, more_obj_params=more_obj_params, more_replacements=more_replacements ) + grads = pm.updates.get_or_compute_grads(obj_target, self.obj_params + more_obj_params) if total_grad_norm_constraint is not None: grads = pm.total_norm_constraint(grads, total_grad_norm_constraint) - updates.update(obj_optimizer(grads, self.obj_params + more_obj_params)) + + # Pass the loss plus the gradients to the optimizer, so that schedulers can use the loss if need. 
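+        # Optimizers wrapped with `_handle_loss_and_grad_input_wrapper` simply drop the loss,
+        # while `reduce_lr_on_plateau_scheduler` calls them with `discard_loss=False` to read it.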
+ updates.update(obj_optimizer((obj_target, grads), self.obj_params + more_obj_params)) + if self.op.returns_loss: updates.loss = obj_target diff --git a/pymc/variational/updates.py b/pymc/variational/updates.py index a6d049608e4..31bc5a96d5a 100644 --- a/pymc/variational/updates.py +++ b/pymc/variational/updates.py @@ -109,7 +109,7 @@ """ from collections import OrderedDict -from functools import partial +from functools import partial, wraps import numpy as np import pytensor @@ -173,17 +173,103 @@ def get_or_compute_grads(loss_or_grads, params): ) return loss_or_grads else: - return pytensor.grad(loss_or_grads, params) + grads = pytensor.grad(loss_or_grads, params) + for grad, param in zip(grads, params): + grad.name = f"d_loss/d_{param.name}" + return grads -def _get_call_kwargs(_locals_): - _locals_ = _locals_.copy() - _locals_.pop("loss_or_grads") - _locals_.pop("params") - return _locals_ +def _input_to_shared_variable(x, name): + if isinstance(x, pt.sharedvar.SharedVariable): + return x + return pytensor.shared(x, name=name) -def sgd(loss_or_grads=None, params=None, learning_rate=1e-3): +def _find_variable_among_args_kwargs(args, kwargs, name): + """ + Helper function to find a variable among args and kwargs. + + Notes + ----- + Assumes that the variable being searched for is either a kwarg or the first arg. + """ + + variable = kwargs.pop(name, None) + if not variable: + variable = args.pop(0) if len(args) > 0 else None + return variable + + +def _partial_initialization_wrapper(optimizer): + """ + Functional wrapper to allow optimizer to be called without both loss_or_grads and params + + Parameters + ---------- + optimizer: callable + Optimizer function to wrap + + Returns + ------- + optimizer: callable + Optimizer function that returns itself partially initialized if called without both loss_or_grads and params + """ + + @wraps(optimizer) + def make_partial_if_necessary(*args, **kwargs): + args = list(args) + loss_or_grads = _find_variable_among_args_kwargs(args, kwargs, "loss_or_grads") + params = _find_variable_among_args_kwargs(args, kwargs, "params") + + if loss_or_grads is None and params is None: + return partial(optimizer, **kwargs) + elif loss_or_grads is None or params is None: + raise ValueError("Please provide both `loss_or_grads` and `params` to get updates") + + return optimizer(loss_or_grads=loss_or_grads, params=params, **kwargs) + + return make_partial_if_necessary + + +def _handle_loss_and_grad_input_wrapper(optimizer): + """ + Functional wrapper to allow optimizer to take a tuple of (loss, grads) as input, and either discard the loss or + pass it through. + + Adds a keyword argument to the wrapped optimizer, `discard_loss`, which if True, will discard the loss and only + return the updates. If False, the optimizer will return both the updates and the loss. 
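+
+    For example, `sgd((loss, grads), params)` silently drops the loss, while
+    `sgd((loss, grads), params, discard_loss=False)` returns `(updates, loss)`;
+    calling `sgd(loss, params)` keeps the original behaviour.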
+ + Parameters + ---------- + optimizer: callable + Optimizer function to wrap + + Returns + ------- + optimizer: callable + Wrapped optimizer function + """ + + @wraps(optimizer) + def discard_or_pass_through_loss_optimizer( + loss_or_grads, params, discard_loss=True, *args, **kwargs + ): + if isinstance(loss_or_grads, tuple): + loss, grads = loss_or_grads + updates = optimizer(loss_or_grads=grads, params=params, *args, **kwargs) + else: + discard_loss, loss = True, None + updates = optimizer(loss_or_grads=loss_or_grads, params=params, *args, **kwargs) + + if discard_loss: + return updates + else: + return updates, loss + + return discard_or_pass_through_loss_optimizer + + +def _sgd(loss_or_grads=None, params=None, *, learning_rate=1e-3): """Stochastic Gradient Descent (SGD) updates Generates update expressions of the form: @@ -223,19 +309,20 @@ def sgd(loss_or_grads=None, params=None, learning_rate=1e-3): >>> isinstance(updates, dict) True """ - if loss_or_grads is None and params is None: - return partial(sgd, **_get_call_kwargs(locals())) - elif loss_or_grads is None or params is None: - raise ValueError("Please provide both `loss_or_grads` and `params` to get updates") grads = get_or_compute_grads(loss_or_grads, params) updates = OrderedDict() for param, grad in zip(params, grads): - updates[param] = param - learning_rate * grad + updated_param = param - learning_rate * grad + updated_param.name = f"{param.name}__updated" + updates[param] = updated_param return updates +sgd = _partial_initialization_wrapper(_handle_loss_and_grad_input_wrapper(_sgd)) + + def apply_momentum(updates, params=None, momentum=0.9): """Returns a modified update dictionary including momentum @@ -275,15 +362,23 @@ def apply_momentum(updates, params=None, momentum=0.9): for param in params: value = param.get_value(borrow=True) - velocity = pytensor.shared(np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape) - x = momentum * velocity + updates[param] - updates[velocity] = x - param - updates[param] = x + velocity = pytensor.shared( + np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape, name="velocity" + ) + + updated_param = momentum * velocity + updates[param] + updated_param.name = f"{param}__updated" + + updated_velocity = updated_param - param + updated_velocity.name = "velocity__updated" + + updates[velocity] = updated_velocity + updates[param] = updated_param return updates -def momentum(loss_or_grads=None, params=None, learning_rate=1e-3, momentum=0.9): +def _momentum(loss_or_grads=None, params=None, learning_rate=1e-3, momentum=0.9): """Stochastic Gradient Descent (SGD) updates with momentum Generates update expressions of the form: @@ -335,14 +430,14 @@ def momentum(loss_or_grads=None, params=None, learning_rate=1e-3, momentum=0.9): >>> isinstance(updates, dict) True """ - if loss_or_grads is None and params is None: - return partial(pm.updates.momentum, **_get_call_kwargs(locals())) - elif loss_or_grads is None or params is None: - raise ValueError("Please provide both `loss_or_grads` and `params` to get updates") updates = sgd(loss_or_grads, params, learning_rate) + return apply_momentum(updates, momentum=momentum) +momentum = _partial_initialization_wrapper(_handle_loss_and_grad_input_wrapper(_momentum)) + + def apply_nesterov_momentum(updates, params=None, momentum=0.9): """Returns a modified update dictionary including Nesterov momentum @@ -389,14 +484,20 @@ def apply_nesterov_momentum(updates, params=None, momentum=0.9): for param in params: value = 
param.get_value(borrow=True) velocity = pytensor.shared(np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape) - x = momentum * velocity + updates[param] - param - updates[velocity] = x - updates[param] = momentum * x + updates[param] + + updated_velocity = momentum * velocity + updates[param] - param + updated_velocity.name = "velocity__updated" + + updated_param = momentum * updated_velocity + updates[param] + updated_param.name = f"{param.name}__updated" + + updates[velocity] = updated_velocity + updates[param] = updated_param return updates -def nesterov_momentum(loss_or_grads=None, params=None, learning_rate=1e-3, momentum=0.9): +def _nesterov_momentum(loss_or_grads=None, params=None, learning_rate=1e-3, momentum=0.9): """Stochastic Gradient Descent (SGD) updates with Nesterov momentum Generates update expressions of the form: @@ -453,15 +554,17 @@ def nesterov_momentum(loss_or_grads=None, params=None, learning_rate=1e-3, momen >>> isinstance(updates, dict) True """ - if loss_or_grads is None and params is None: - return partial(nesterov_momentum, **_get_call_kwargs(locals())) - elif loss_or_grads is None or params is None: - raise ValueError("Please provide both `loss_or_grads` and `params` to get updates") + updates = sgd(loss_or_grads, params, learning_rate) return apply_nesterov_momentum(updates, momentum=momentum) -def adagrad(loss_or_grads=None, params=None, learning_rate=1.0, epsilon=1e-6): +nesterov_momentum = _partial_initialization_wrapper( + _handle_loss_and_grad_input_wrapper(_nesterov_momentum) +) + + +def _adagrad(loss_or_grads=None, params=None, learning_rate=1.0, epsilon=1e-6): """Adagrad updates Scale learning rates by dividing with the square root of accumulated @@ -521,24 +624,33 @@ def adagrad(loss_or_grads=None, params=None, learning_rate=1.0, epsilon=1e-6): >>> isinstance(updates, dict) True """ - if loss_or_grads is None and params is None: - return partial(adagrad, **_get_call_kwargs(locals())) - elif loss_or_grads is None or params is None: - raise ValueError("Please provide both `loss_or_grads` and `params` to get updates") grads = get_or_compute_grads(loss_or_grads, params) updates = OrderedDict() for param, grad in zip(params, grads): value = param.get_value(borrow=True) - accu = pytensor.shared(np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape) + accu = pytensor.shared( + np.zeros(value.shape, dtype=value.dtype), + shape=param.type.shape, + name="gradient_squares", + ) accu_new = accu + grad**2 + accu_new.name = "gradient_squares__updated" + updates[accu] = accu_new - updates[param] = param - (learning_rate * grad / pt.sqrt(accu_new + epsilon)) + + updated_param = param - (learning_rate * grad / pt.sqrt(accu_new + epsilon)) + updated_param.name = f"{param.name}__updated" + + updates[param] = updated_param return updates -def adagrad_window(loss_or_grads=None, params=None, learning_rate=0.001, epsilon=0.1, n_win=10): +adagrad = _partial_initialization_wrapper(_handle_loss_and_grad_input_wrapper(_adagrad)) + + +def _adagrad_window(loss_or_grads=None, params=None, learning_rate=0.001, epsilon=0.1, n_win=10): """Returns a function that returns parameter updates. 
Instead of accumulated estimate, uses running window @@ -560,30 +672,42 @@ def adagrad_window(loss_or_grads=None, params=None, learning_rate=0.001, epsilon OrderedDict A dictionary mapping each parameter to its update expression """ - if loss_or_grads is None and params is None: - return partial(adagrad_window, **_get_call_kwargs(locals())) - elif loss_or_grads is None or params is None: - raise ValueError("Please provide both `loss_or_grads` and `params` to get updates") grads = get_or_compute_grads(loss_or_grads, params) updates = OrderedDict() for param, grad in zip(params, grads): - i = pytensor.shared(pm.floatX(0)) + i = pytensor.shared(pm.floatX(0), name="window_idx") i_int = i.astype("int32") value = param.get_value(borrow=True) - accu = pytensor.shared(np.zeros(value.shape + (n_win,), dtype=value.dtype)) + + accu = pytensor.shared( + np.zeros(value.shape + (n_win,), dtype=value.dtype), name="gradient_squares" + ) # Append squared gradient vector to accu_new accu_new = pt.set_subtensor(accu[..., i_int], grad**2) + accu_new.name = "gradient_squares__updated" + i_new = pt.switch((i + 1) < n_win, i + 1, 0) + i_new.name = "window_idx__updated" + updates[accu] = accu_new updates[i] = i_new accu_sum = accu_new.sum(axis=-1) - updates[param] = param - (learning_rate * grad / pt.sqrt(accu_sum + epsilon)) + + param_updated = param - (learning_rate * grad / pt.sqrt(accu_sum + epsilon)) + param_updated.name = f"{param.name}__updated" + updates[param] = param_updated + return updates -def rmsprop(loss_or_grads=None, params=None, learning_rate=1.0, rho=0.9, epsilon=1e-6): +adagrad_window = _partial_initialization_wrapper( + _handle_loss_and_grad_input_wrapper(_adagrad_window) +) + + +def _rmsprop(loss_or_grads=None, params=None, learning_rate=1.0, rho=0.9, epsilon=1e-6): """RMSProp updates Scale learning rates by dividing with the moving average of the root mean @@ -644,10 +768,6 @@ def rmsprop(loss_or_grads=None, params=None, learning_rate=1.0, rho=0.9, epsilon >>> isinstance(updates, dict) True """ - if loss_or_grads is None and params is None: - return partial(rmsprop, **_get_call_kwargs(locals())) - elif loss_or_grads is None or params is None: - raise ValueError("Please provide both `loss_or_grads` and `params` to get updates") grads = get_or_compute_grads(loss_or_grads, params) updates = OrderedDict() @@ -656,15 +776,28 @@ def rmsprop(loss_or_grads=None, params=None, learning_rate=1.0, rho=0.9, epsilon for param, grad in zip(params, grads): value = param.get_value(borrow=True) - accu = pytensor.shared(np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape) + accu = pytensor.shared( + np.zeros(value.shape, dtype=value.dtype), + shape=param.type.shape, + name="gradient_squares", + ) + accu_new = rho * accu + (one - rho) * grad**2 + accu_new.name = "gradient_squares__updated" + updates[accu] = accu_new - updates[param] = param - (learning_rate * grad / pt.sqrt(accu_new + epsilon)) + + param_updated = param - (learning_rate * grad / pt.sqrt(accu_new + epsilon)) + param_updated.name = f"{param.name}__updated" + updates[param] = param_updated return updates -def adadelta(loss_or_grads=None, params=None, learning_rate=1.0, rho=0.95, epsilon=1e-6): +rmsprop = _partial_initialization_wrapper(_handle_loss_and_grad_input_wrapper(_rmsprop)) + + +def _adadelta(loss_or_grads=None, params=None, learning_rate=1.0, rho=0.95, epsilon=1e-6): r"""Adadelta updates Scale learning rates by the ratio of accumulated gradients to accumulated @@ -734,10 +867,6 @@ def adadelta(loss_or_grads=None, 
params=None, learning_rate=1.0, rho=0.95, epsil >>> isinstance(updates, dict) True """ - if loss_or_grads is None and params is None: - return partial(adadelta, **_get_call_kwargs(locals())) - elif loss_or_grads is None or params is None: - raise ValueError("Please provide both `loss_or_grads` and `params` to get updates") grads = get_or_compute_grads(loss_or_grads, params) updates = OrderedDict() @@ -747,10 +876,15 @@ def adadelta(loss_or_grads=None, params=None, learning_rate=1.0, rho=0.95, epsil for param, grad in zip(params, grads): value = param.get_value(borrow=True) # accu: accumulate gradient magnitudes - accu = pytensor.shared(np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape) + + accu = pytensor.shared( + np.zeros(value.shape, dtype=value.dtype), + shape=param.type.shape, + name="gradient_squares", + ) # delta_accu: accumulate update magnitudes (recursively!) delta_accu = pytensor.shared( - np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape + np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape, name="" ) # update accu (as in rmsprop) @@ -768,7 +902,10 @@ def adadelta(loss_or_grads=None, params=None, learning_rate=1.0, rho=0.95, epsil return updates -def adam( +adadelta = _partial_initialization_wrapper(_handle_loss_and_grad_input_wrapper(_adadelta)) + + +def _adam( loss_or_grads=None, params=None, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8 ): """Adam updates @@ -824,38 +961,51 @@ def adam( >>> isinstance(updates, dict) True """ - if loss_or_grads is None and params is None: - return partial(adam, **_get_call_kwargs(locals())) - elif loss_or_grads is None or params is None: - raise ValueError("Please provide both `loss_or_grads` and `params` to get updates") all_grads = get_or_compute_grads(loss_or_grads, params) - t_prev = pytensor.shared(pm.pytensorf.floatX(0.0)) + t_prev = pytensor.shared(pm.pytensorf.floatX(0.0), name="t") updates = OrderedDict() # Using pytensor constant to prevent upcasting of float32 one = pt.constant(1) t = t_prev + 1 + t.name = "t__updated" a_t = learning_rate * pt.sqrt(one - beta2**t) / (one - beta1**t) + a_t.name = "a" for param, g_t in zip(params, all_grads): + name = param.name value = param.get_value(borrow=True) - m_prev = pytensor.shared(np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape) - v_prev = pytensor.shared(np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape) + m_prev = pytensor.shared( + np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape, name=f"{name}_m" + ) + v_prev = pytensor.shared( + np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape, name=f"{name}_v" + ) m_t = beta1 * m_prev + (one - beta1) * g_t + m_t.name = f"{name}_m__updated" v_t = beta2 * v_prev + (one - beta2) * g_t**2 + v_t.name = f"{name}_v__updated" + step = a_t * m_t / (pt.sqrt(v_t) + epsilon) + step.name = f"{name}_step_size" updates[m_prev] = m_t updates[v_prev] = v_t - updates[param] = param - step + + param_updated = param - step + param_updated.name = f"{name}__updated" + updates[param] = param_updated updates[t_prev] = t return updates -def adamax( +adam = _partial_initialization_wrapper(_handle_loss_and_grad_input_wrapper(_adam)) + + +def _adamax( loss_or_grads=None, params=None, learning_rate=0.002, beta1=0.9, beta2=0.999, epsilon=1e-8 ): """Adamax updates @@ -908,37 +1058,52 @@ def adamax( >>> isinstance(updates, dict) True """ - if loss_or_grads is None and params is None: - return partial(adamax, **_get_call_kwargs(locals())) - elif loss_or_grads is 
None or params is None: - raise ValueError("Please provide both `loss_or_grads` and `params` to get updates") all_grads = get_or_compute_grads(loss_or_grads, params) - t_prev = pytensor.shared(pm.pytensorf.floatX(0.0)) + t_prev = pytensor.shared(pm.pytensorf.floatX(0.0), name="t") updates = OrderedDict() # Using pytensor constant to prevent upcasting of float32 one = pt.constant(1) t = t_prev + 1 + t.name = "t__updated" + a_t = learning_rate / (one - beta1**t) + a_t.name = "a" for param, g_t in zip(params, all_grads): + name = param.name value = param.get_value(borrow=True) - m_prev = pytensor.shared(np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape) - u_prev = pytensor.shared(np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape) + m_prev = pytensor.shared( + np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape, name=f"{name}_m" + ) + u_prev = pytensor.shared( + np.zeros(value.shape, dtype=value.dtype), shape=param.type.shape, name=f"{name}_u" + ) m_t = beta1 * m_prev + (one - beta1) * g_t + m_t.name = f"{name}_m__updated" + u_t = pt.maximum(beta2 * u_prev, abs(g_t)) + u_t.name = f"{name}_u__updated" + step = a_t * m_t / (u_t + epsilon) + step.name = f"{name}_step_size" updates[m_prev] = m_t updates[u_prev] = u_t - updates[param] = param - step + + param_updated = param - step + param_updated.name = f"{name}__updated" + updates[param] = param_updated updates[t_prev] = t return updates +adamax = _partial_initialization_wrapper(_handle_loss_and_grad_input_wrapper(_adamax)) + + def norm_constraint(tensor_var, max_norm, norm_axes=None, epsilon=1e-7): """Max weight norm constraints and gradient clipping @@ -1082,3 +1247,183 @@ def total_norm_constraint(tensor_vars, max_norm, epsilon=1e-7, return_norm=False return tensor_vars_scaled, norm else: return tensor_vars_scaled + + +def _handle_time_updates(updates): + """ + Create a shared time variable and its update if one does not already exist in the updates dictionary, otherwise + extract it and delete the entry from the updates dictionary. + + Parameters + ---------- + updates: dict + update dictionary created by an optimizer function + + Returns + ------- + t: pt.shared.SharedVariable + shared variable representing the current time step + new_t: pt.shared.SharedVariable + shared variable representing the next time step + + Notes + ----- + This function potentially modifies the update dictionary in-place by deleting the entry for the time variable, if + it exists. This is done to ensure that the time variable is always the last update applied. All schedulers need + to add this update back in to the update dictionary before returning it. + """ + old_values = list(updates.keys()) + old_names = [shared_var.name for shared_var in old_values] + + t_idx = old_names.index("t") if "t" in old_names else None + if t_idx is None: + t = pytensor.shared(pm.pytensorf.floatX(0.0), name="t") + new_t = t + 1 + new_t.name = "t__updated" + else: + # If t is already present, we will reuse it, but we also need to delete it from the update dict temporarily. + # We always want it to be the last update applied. + t = old_values[t_idx] + new_t = updates[t] + del updates[t] + + return t, new_t + + +def exponential_decay_scheduler( + optimizer: partial, + decay_steps: int, + decay_rate: float, + min_lr: float = 1e-6, + staircase: bool = False, +): + """ + Returns a new optimizer that applies exponential decay to the learning rate. 
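+
+    At step `t` the learning rate is recomputed symbolically as
+    `max(initial_lr * decay_rate ** (t / decay_steps), min_lr)`, using `t // decay_steps`
+    instead when `staircase=True`, so the schedule is part of the update graph rather than
+    a Python-side callback.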
+ + Parameters + ---------- + optimizer: Callable + Optimizer to apply exponential decay to + decay_steps: int + Number of steps between application of a decay. + decay_rate: float + Decay factor used to compute new learning rate, with new_lr = max(lr * decay_rate, min_lr). Must be between 0 + and 1. + min_lr: float + Minimum learning rate, after which no additional decay is applied. Defaults to 1e-6. + staircase: bool + If True, learning rate is decayed in discrete intervals, otherwise decay is applied continuously. + Defaults to False. + + Returns + ------- + scheduled_optimizer: Callable + Optimizer with exponential decay applied to learning rate. + """ + if not 0 < decay_rate <= 1: + raise ValueError("decay_rate must be between 0 and 1") + + kwargs = optimizer.keywords + _initial_lr = pm.floatX(optimizer.keywords["learning_rate"]) + + initial_lr = pt.constant(_initial_lr, name="initial_learning_rate") + shared_lr = _input_to_shared_variable(_initial_lr, "learning_rate") + kwargs["learning_rate"] = shared_lr + + @wraps(optimizer) + def optimizer_with_exponential_decay(loss_or_grads, params, *args, **kwargs): + updates = optimizer(loss_or_grads, params, *args, **kwargs) + t, new_t = _handle_time_updates(updates) + + if staircase: + new_lr = initial_lr * decay_rate ** (t // decay_steps) + else: + new_lr = initial_lr * decay_rate ** (t / decay_steps) + + new_lr = pt.maximum(new_lr, min_lr) + + new_lr.name = "learning_rate__updated" + updates[shared_lr] = new_lr + updates[t] = new_t + + return updates + + return optimizer_with_exponential_decay + + +def reduce_lr_on_plateau_scheduler(optimizer, factor=0.1, patience=10, min_lr=1e-6, cooldown=0): + kwargs = optimizer.keywords + _initial_lr = pm.floatX(optimizer.keywords["learning_rate"]) + shared_lr = _input_to_shared_variable(_initial_lr, "learning_rate") + kwargs["learning_rate"] = shared_lr + + @wraps(optimizer) + def optimizer_with_reduce_lr_on_plateau(loss_or_grads, params, *args, **kwargs): + updates, loss = optimizer(loss_or_grads, params, *args, discard_loss=False, **kwargs) + + cooldown_counter = pytensor.shared(np.zeros((), dtype="int32"), name="cooldown_counter") + wait = pytensor.shared(np.zeros((), dtype="int32"), name="wait") + best_loss = pytensor.shared(np.inf, name="best_loss") + + loss_is_inf = pt.isinf(loss) + + in_cooldown = pt.gt(cooldown_counter, 0) + improving_loss = pt.lt(loss, best_loss) + patience_exceeded = pt.ge(wait, patience) + + updated_best_loss = pt.switch( + loss_is_inf, best_loss, pt.switch(improving_loss, loss, best_loss) + ) + + updated_best_loss.name = "best_loss__updated" + + updated_cooldown_counter = pt.switch( + loss_is_inf, + cooldown_counter, + pt.switch( + in_cooldown, + cooldown_counter - 1, + pt.switch( + improving_loss, + cooldown_counter, + pt.switch(patience_exceeded, cooldown, cooldown_counter), + ), + ), + ) + updated_cooldown_counter.name = "cooldown_counter__updated" + + updated_lr = pt.switch( + loss_is_inf, + shared_lr, + pt.switch( + in_cooldown, + shared_lr, + pt.switch( + improving_loss, + shared_lr, + pt.switch(patience_exceeded, pt.maximum(min_lr, shared_lr * factor), shared_lr), + ), + ), + ) + + updated_lr.name = "learning_rate__updated" + + updated_wait = pt.switch( + loss_is_inf, + wait, + pt.switch( + in_cooldown, + 0, + pt.switch(improving_loss, 0, pt.switch(patience_exceeded, 0, wait + 1)), + ), + ) + updated_wait.name = "wait__updated" + + updates[best_loss] = updated_best_loss + updates[cooldown_counter] = updated_cooldown_counter + updates[wait] = updated_wait + 
updates[shared_lr] = updated_lr + + return updates + + return optimizer_with_reduce_lr_on_plateau diff --git a/tests/variational/test_callbacks.py b/tests/variational/test_callbacks.py index a40acb1e922..9649dc2287d 100644 --- a/tests/variational/test_callbacks.py +++ b/tests/variational/test_callbacks.py @@ -52,3 +52,64 @@ def test_tracker_callback(): tracker = pm.callbacks.Tracker(bad=lambda t: t) # bad signature with pytest.raises(TypeError): tracker(None, None, 1) + + +OPTIMIZERS = [ + pm.sgd(learning_rate=0.1), + pm.momentum(learning_rate=0.1), + pm.nesterov_momentum(learning_rate=0.1), + pm.adagrad(learning_rate=0.1), + pm.rmsprop(learning_rate=0.1), + pm.adadelta(learning_rate=0.1), + pm.adam(learning_rate=0.1), + pm.adamax(learning_rate=0.1), +] + + +@pytest.mark.parametrize("optimizer", OPTIMIZERS) +def test_reduce_lr_on_plateau(optimizer): + cb = pm.variational.callbacks.ReduceLROnPlateau( + optimizer=optimizer, + patience=1, + min_lr=0.001, + ) + cb(None, [float("inf")], 1) + np.testing.assert_almost_equal(optimizer.keywords["learning_rate"], 0.1) + assert cb.best == float("inf") + cb(None, [float("inf"), 2], 2) + np.testing.assert_almost_equal(optimizer.keywords["learning_rate"], 0.1) + assert cb.best == 2 + cb(None, [float("inf"), 2, 1], 3) + np.testing.assert_almost_equal(optimizer.keywords["learning_rate"], 0.1) + assert cb.best == 1 + cb(None, [float("inf"), 2, 1, 99], 4) + np.testing.assert_almost_equal(optimizer.keywords["learning_rate"], 0.01) + assert cb.best == 1 + cb(None, [float("inf"), 2, 1, 99, 0.9], 5) + np.testing.assert_almost_equal(optimizer.keywords["learning_rate"], 0.01) + assert cb.best == 0.9 + cb(None, [float("inf"), 2, 1, 99, 0.9, 99], 6) + np.testing.assert_almost_equal(optimizer.keywords["learning_rate"], 0.001) + assert cb.best == 0.9 + cb(None, [float("inf"), 2, 1, 99, 0.9, 99, 99], 7) + np.testing.assert_almost_equal(optimizer.keywords["learning_rate"], 0.001) + assert cb.best == 0.9 + + +@pytest.mark.parametrize("optimizer", OPTIMIZERS) +def test_exponential_decay(optimizer): + cb = pm.variational.callbacks.ExponentialDecay( + optimizer=optimizer, + decay_steps=1, + decay_rate=0.1, + min_lr=0.001, + ) + np.testing.assert_almost_equal(optimizer.keywords["learning_rate"], 0.1) + cb(None, [float("inf")], 1) + np.testing.assert_almost_equal(optimizer.keywords["learning_rate"], 0.01) + cb(None, [float("inf"), 2, 2], 2) + np.testing.assert_almost_equal(optimizer.keywords["learning_rate"], 0.001) + cb(None, [float("inf"), 2, 2, 2], 3) + np.testing.assert_almost_equal(optimizer.keywords["learning_rate"], 0.001) + cb(None, [float("inf"), 2, 2, 2, 2], 4) + np.testing.assert_almost_equal(optimizer.keywords["learning_rate"], 0.001) diff --git a/tests/variational/test_updates.py b/tests/variational/test_updates.py index 0ea4183a1b3..f670a6e99e3 100644 --- a/tests/variational/test_updates.py +++ b/tests/variational/test_updates.py @@ -16,18 +16,46 @@ import pytensor import pytest +import pymc as pm + from pymc.variational.updates import ( adadelta, adagrad, adagrad_window, adam, adamax, + exponential_decay_scheduler, momentum, nesterov_momentum, + reduce_lr_on_plateau_scheduler, rmsprop, sgd, ) +OPTIMIZERS = [ + sgd, + momentum, + nesterov_momentum, + adagrad, + rmsprop, + adadelta, + adam, + adamax, + adagrad_window, +] + +OPTIMIZER_NAMES = [ + "sgd", + "momentum", + "nesterov_momentum", + "adagrad", + "rmsprop", + "adadelta", + "adam", + "adamax", + "adagrad_window", +] + _a = pytensor.shared(1.0) _b = _a * 2 @@ -37,21 +65,7 @@ _n2 = _b + _n + _m2.sum() 
-@pytest.mark.parametrize( - "opt", - [sgd, momentum, nesterov_momentum, adagrad, rmsprop, adadelta, adam, adamax, adagrad_window], - ids=[ - "sgd", - "momentum", - "nesterov_momentum", - "adagrad", - "rmsprop", - "adadelta", - "adam", - "adamax", - "adagrad_window", - ], -) +@pytest.mark.parametrize("opt", OPTIMIZERS, ids=OPTIMIZER_NAMES) @pytest.mark.parametrize( "getter", [ @@ -93,3 +107,132 @@ def test_updates_fast(opt, loss_and_params, kwargs, getter): # Usual call to optimizer, old behaviour updates = opt(**args) assert isinstance(updates, dict) + + +@pytest.fixture() +def regression_model(): + rng = np.random.default_rng(1) + + X = rng.normal(size=(100,)) + intercept, coef = rng.normal(100, size=(2,)) + noise = rng.normal(size=(100,)) + y = intercept + coef * X + noise + + with pm.Model() as model: + a = pm.Normal( + "a", + ) + b = pm.Normal("b") + + mu = a + b * X + pm.Normal("y", mu=mu, sigma=1, observed=y) + + return model + + +SCHEDULER_PARAMS = [ + (1, 0.5, 1, 1e-8, False), + (1, 0.5, 1, 1e-8, True), + (1, 0.5, 2, 1e-8, False), + (1, 0.5, 2, 1e-8, True), +] +SCHEDULER_IDS = [ + f"initial_lr={x[0]}, decay_rate={x[1]}, decay_steps={x[2]}, min_lr={x[3]}, staircase={x[4]}" + for x in SCHEDULER_PARAMS +] + + +@pytest.mark.parametrize("optimizer", OPTIMIZERS, ids=OPTIMIZER_NAMES) +@pytest.mark.parametrize("scheduler_args", SCHEDULER_PARAMS, ids=SCHEDULER_IDS) +def test_exponential_decay_scheduler(regression_model, optimizer, scheduler_args): + initial_lr, decay_rate, decay_steps, min_lr, staircase = scheduler_args + opt = optimizer(learning_rate=initial_lr) + scheduled_optimizer = exponential_decay_scheduler( + opt, decay_steps, decay_rate, min_lr, staircase + ) + + with regression_model: + advi = pm.ADVI() + + updates = advi.objective.updates(obj_optimizer=scheduled_optimizer) + inputs = list(updates.keys()) + outputs = list(updates.values()) + + old_names = [x.name for x in inputs] + new_names = [x.name for x in outputs] + + assert all([expected_name in old_names for expected_name in ["learning_rate", "t"]]) + assert all( + [expected_name in new_names for expected_name in ["learning_rate__updated", "t__updated"]] + ) + + lr_idx = old_names.index("learning_rate") + t_idx = old_names.index("t") + + step_func = pytensor.function( + [], [outputs[lr_idx], outputs[t_idx]], updates=updates, mode="FAST_COMPILE" + ) + + steps = np.vstack([step_func() for _ in range(10)]) + + def floor_div(x, y): + return x // y + + div_func = floor_div if staircase else np.divide + + expected_decay = np.maximum( + initial_lr * decay_rate ** (div_func(np.arange(10), decay_steps)), min_lr + ) + + np.testing.assert_allclose(steps[:, 0], expected_decay) + np.testing.assert_allclose(steps[:, 1], np.arange(1, 11)) + + +def test_reduce_lr_on_plateau_scheduler(regression_model): + opt = pm.adam(learning_rate=1) + factor = 0.1 + patience = 10 + min_lr = 1e-6 + cooldown = 10 + scheduled_optimizer = reduce_lr_on_plateau_scheduler( + opt, factor=factor, patience=patience, min_lr=min_lr, cooldown=cooldown + ) + with regression_model: + advi = pm.ADVI() + + updates = advi.objective.updates(obj_optimizer=scheduled_optimizer) + inputs = list(updates.keys()) + outputs = list(updates.values()) + + old_names = [x.name for x in inputs] + new_names = [x.name for x in outputs] + + expected_names = ["best_loss", "cooldown_counter", "wait", "learning_rate"] + + assert all([expected_name in old_names for expected_name in expected_names]) + assert all([f"{expected_name}__updated" in new_names for expected_name in expected_names]) + 
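+    # Track the updated scheduler state (best loss, cooldown counter, wait, learning rate)
+    # at every iteration of a short ADVI fit via a Tracker callback.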
+    outputs_of_interest = [
+        outputs[new_names.index(f"{expected_name}__updated")]
+        for expected_name in expected_names
+    ]
+
+    tracker = pm.callbacks.Tracker(
+        best_loss=outputs_of_interest[0].eval,
+        cooldown_counter=outputs_of_interest[1].eval,
+        wait=outputs_of_interest[2].eval,
+        learning_rate=outputs_of_interest[3].eval,
+    )
+    approx = advi.fit(1000, callbacks=[tracker], obj_optimizer=scheduled_optimizer)
+
+    # Best loss only decreases
+    assert np.all(np.diff(np.stack(tracker.hist["best_loss"])) <= 0)
+
+    # Learning rate only decreases
+    assert np.all(np.diff(np.stack(tracker.hist["learning_rate"])) <= 0)
+
+    # Wait is never greater than patience
+    assert np.all(np.stack(tracker.hist["wait"]) <= patience)
+
+    # Cooldown counter is never greater than cooldown
+    assert np.all(np.stack(tracker.hist["cooldown_counter"]) <= cooldown)