diff --git a/doc/typhon.retrieval.bmci.rst b/doc/typhon.retrieval.bmci.rst
new file mode 100644
index 00000000..c26d48f6
--- /dev/null
+++ b/doc/typhon.retrieval.bmci.rst
@@ -0,0 +1,11 @@
+Bayesian Monte Carlo Integration (BMCI)
+=======================================
+
+.. automodule:: typhon.retrieval.bmci
+
+.. currentmodule:: typhon.retrieval.bmci
+
+.. autosummary::
+   :toctree: generated
+
+   BMCI
diff --git a/doc/typhon.retrieval.mcmc.rst b/doc/typhon.retrieval.mcmc.rst
new file mode 100644
index 00000000..7e2e700d
--- /dev/null
+++ b/doc/typhon.retrieval.mcmc.rst
@@ -0,0 +1,11 @@
+Markov-Chain Monte Carlo (MCMC)
+===============================
+
+.. automodule:: typhon.retrieval.mcmc
+
+.. currentmodule:: typhon.retrieval.mcmc
+
+.. autosummary::
+   :toctree: generated
+
+   MCMC
diff --git a/doc/typhon.retrieval.oem.rst b/doc/typhon.retrieval.oem.rst
new file mode 100644
index 00000000..17795de0
--- /dev/null
+++ b/doc/typhon.retrieval.oem.rst
@@ -0,0 +1,15 @@
+Optimal estimation method (OEM)
+===============================
+
+.. automodule:: typhon.retrieval.oem
+
+.. currentmodule:: typhon.retrieval.oem
+
+.. autosummary::
+   :toctree: generated
+
+   error_covariance_matrix
+   averaging_kernel_matrix
+   retrieval_gain_matrix
+   smoothing_error
+   retrieval_noise
diff --git a/doc/typhon.retrieval.qrnn.rst b/doc/typhon.retrieval.qrnn.rst
new file mode 100644
index 00000000..917f4305
--- /dev/null
+++ b/doc/typhon.retrieval.qrnn.rst
@@ -0,0 +1,73 @@
+Quantile regression neural networks (QRNNs)
+===========================================
+
+An implementation of quantile regression neural networks (QRNNs), developed
+specifically for remote sensing applications. It provides a flexible
+interface for simple training and evaluation of QRNNs.
+
+Overview
+--------
+
+The QRNN implementation consists of two layers:
+
+  - A high-level interface provided by the :py:class:`~typhon.retrieval.qrnn.QRNN`
+    class
+  - Backend-specific implementations of different neural network architectures
+    to be used as models by the high-level implementation
+
+
+The QRNN class
+--------------
+
+The :py:class:`~typhon.retrieval.qrnn.QRNN` class provides the high-level
+interface for QRNNs. This is all that is required to train a plain,
+fully-connected QRNN. The class itself implements generic functionality
+related to the evaluation of QRNNs and the post-processing of results, such
+as computing the PDF or the posterior mean. For the rest it acts as a wrapper
+around its model attribute, which encapsulates all network- and
+DL-framework-specific code.
+
+Backends
+--------
+
+Currently, both `Keras <https://keras.io>`_ and `PyTorch <https://pytorch.org>`_
+are supported as backends for neural networks. The QRNN implementation will
+automatically use the one that is available on your system. If both are
+available, you can choose a specific backend using the
+:py:func:`~typhon.retrieval.qrnn.set_backend` function.
+
+Neural network models
+---------------------
+
+The :py:class:`typhon.retrieval.qrnn.QRNN` class is designed to work with any
+generic regression neural network model. The aim of this design is to make the
+implementation sufficiently flexible to allow for special network architectures
+and customization of the training process.
+
+This gives the user the flexibility to design custom NN models in PyTorch
+or Keras and use them with the ``QRNN`` class, as the example below
+illustrates. Some predefined architectures are defined in the
+:py:mod:`typhon.retrieval.qrnn.models` submodule.
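+
+The following sketch illustrates the intended workflow. The exact signatures
+of ``QRNN`` and its ``train`` method are assumptions here; they are meant to
+mirror those of the backend models documented below.
+
+.. code-block:: python
+
+    import numpy as np
+    from typhon.retrieval.qrnn import QRNN, set_backend
+
+    set_backend("pytorch")  # or "keras"
+
+    # Synthetic data standing in for measurements and retrieval targets.
+    x_train = np.random.rand(1024, 16).astype(np.float32)
+    y_train = np.random.rand(1024, 1).astype(np.float32)
+
+    quantiles = [0.05, 0.25, 0.5, 0.75, 0.95]
+    qrnn = QRNN(x_train.shape[1], quantiles)
+    qrnn.train((x_train, y_train), batch_size=256, maximum_epochs=10)
+
+    y_pred = qrnn.predict(x_train[:8])  # shape (8, len(quantiles))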
+
+API documentation
+-----------------
+
+.. automodule:: typhon.retrieval.qrnn.qrnn
+.. currentmodule:: typhon.retrieval.qrnn.qrnn
+.. autosummary::
+   :toctree: generated
+
+   QRNN
+
+.. automodule:: typhon.retrieval.qrnn.models.pytorch
+.. currentmodule:: typhon.retrieval.qrnn.models.pytorch
+.. autosummary::
+   :toctree: generated
+
+   FullyConnected
+   UNet
+
+.. automodule:: typhon.retrieval.qrnn.models.keras
+.. currentmodule:: typhon.retrieval.qrnn.models.keras
+.. autosummary::
+   :toctree: generated
+
+   FullyConnected
+
diff --git a/doc/typhon.retrieval.rst b/doc/typhon.retrieval.rst
index 46944282..3c487560 100644
--- a/doc/typhon.retrieval.rst
+++ b/doc/typhon.retrieval.rst
@@ -1,87 +1,29 @@
-retrieval
-=========
+The retrieval submodule contains implementations of different retrieval methods
+as well as functions for the assessment of their performance.
 
-.. automodule:: typhon.retrieval
-.. currentmodule:: typhon.retrieval
+Retrieval methods
+=================
 
-.. autosummary::
-   :toctree: generated
+.. toctree::
+   :maxdepth: 1
 
-retrieval.bmci
-==============
+   typhon.retrieval.bmci
+   typhon.retrieval.mcmc
+   typhon.retrieval.qrnn
+   typhon.retrieval.oem
 
-.. automodule:: typhon.retrieval.bmci
-
-.. currentmodule:: typhon.retrieval.bmci
-
-.. autosummary::
-   :toctree: generated
-
-   BMCI
-
-retrieval.mcmc
-==============
-
-.. automodule:: typhon.retrieval.mcmc
-
-.. currentmodule:: typhon.retrieval.mcmc
-
-.. autosummary::
-   :toctree: generated
-
-   MCMC
-
-retrieval.oem
-=============
-
-.. automodule:: typhon.retrieval.oem
-
-.. currentmodule:: typhon.retrieval.oem
-
-.. autosummary::
-   :toctree: generated
-
-   error_covariance_matrix
-   averaging_kernel_matrix
-   retrieval_gain_matrix
-   smoothing_error
-   retrieval_noise
-
-retrieval.qrnn
-==============
-
-.. automodule:: typhon.retrieval.qrnn
-
-.. currentmodule:: typhon.retrieval.qrnn
-
-.. autosummary::
-   :toctree: generated
-
-   QRNN
-
-retrieval.scores
-================
-
-.. automodule:: typhon.retrieval.scores
+Retrieval products
+==================
 
-.. currentmodule:: typhon.retrieval.scores
+.. toctree::
+   :maxdepth: 1
 
-.. autosummary::
-   :toctree: generated
+   typhon.retrieval.spareice
 
-   mape
-   bias
-   quantile_score
-   mean_quantile_score
 
-retrieval.spareice
+Utility functions
 ==================
 
+.. toctree::
+   :maxdepth: 1
 
-.. automodule:: typhon.retrieval.spareice
-
-.. currentmodule:: typhon.retrieval.spareice
-
-.. autosummary::
-   :toctree: generated
-
-   SPAREICE
+   typhon.retrieval.scores
diff --git a/doc/typhon.retrieval.scores.rst b/doc/typhon.retrieval.scores.rst
new file mode 100644
index 00000000..0cd8290d
--- /dev/null
+++ b/doc/typhon.retrieval.scores.rst
@@ -0,0 +1,14 @@
+Retrieval scores
+================
+
+.. automodule:: typhon.retrieval.scores
+
+.. currentmodule:: typhon.retrieval.scores
+
+.. autosummary::
+   :toctree: generated
+
+   mape
+   bias
+   quantile_score
+   mean_quantile_score
diff --git a/doc/typhon.retrieval.spareice.rst b/doc/typhon.retrieval.spareice.rst
new file mode 100644
index 00000000..79420cb8
--- /dev/null
+++ b/doc/typhon.retrieval.spareice.rst
@@ -0,0 +1,11 @@
+SpareIce
+========
+
+.. automodule:: typhon.retrieval.spareice
+
+.. currentmodule:: typhon.retrieval.spareice
+
+.. autosummary::
+   :toctree: generated
+
+   SPAREICE
diff --git a/typhon/retrieval/qrnn/__init__.py b/typhon/retrieval/qrnn/__init__.py
index bb15d24e..f69e66f9 100644
--- a/typhon/retrieval/qrnn/__init__.py
+++ b/typhon/retrieval/qrnn/__init__.py
@@ -1,7 +1,7 @@
 r"""
-An implementation of quantile regression neural networks based on the
-`Keras ` deep learning package.
-The implementation has been developed specifically for remote sensing
-applications and provides a high level interface allowing for simple
-training and evaluation of the QRNNs."""
-from typhon.retrieval.qrnn.qrnn import QRNN
+Quantile regression neural networks (QRNNs)
+
+This module provides a flexible implementation of QRNNs for remote sensing
+retrievals.
+"""
+from typhon.retrieval.qrnn.qrnn import QRNN, set_backend, get_backend
diff --git a/typhon/retrieval/qrnn/backends/__init__.py b/typhon/retrieval/qrnn/backends/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/typhon/retrieval/qrnn/backends/keras.py b/typhon/retrieval/qrnn/backends/keras.py
new file mode 100644
index 00000000..4a7830fc
--- /dev/null
+++ b/typhon/retrieval/qrnn/backends/keras.py
@@ -0,0 +1,802 @@
+"""
+Keras backend for QRNNs
+=======================
+
+This module implements the Keras backend for QRNNs.
+"""
+import copy
+import logging
+import os
+import pickle
+
+import numpy as np
+from scipy.interpolate import CubicSpline
+
+# Keras Imports
+try:
+    import keras
+    from keras.models import Sequential, clone_model, Model
+    from keras.layers import Dense, Activation, Dropout
+    from keras.optimizers import SGD
+except ImportError:
+    raise ImportError(
+        "Could not import the required Keras modules. The QRNN "
+        "implementation was developed for use with Keras version 2.0.9.")
+
+################################################################################
+# Loss Functions
+################################################################################
+
+import keras.backend as K
+
+logger = logging.getLogger(__name__)
+
+def skewed_absolute_error(y_true, y_pred, tau):
+    """
+    The quantile loss function for a given quantile tau:
+
+    L(y_true, y_pred) = (tau - I(y_pred < y_true)) * (y_pred - y_true)
+
+    where I is the indicator function.
+    """
+    dy = y_pred - y_true
+    return K.mean((1.0 - tau) * K.relu(dy) + tau * K.relu(-dy), axis=-1)
+
+
+def quantile_loss(y_true, y_pred, taus):
+    """
+    The quantile loss for a list of quantiles. Sums up the error contributions
+    from each of the quantile loss functions.
+    """
+    e = skewed_absolute_error(
+        K.flatten(y_true), K.flatten(y_pred[:, 0]), taus[0])
+    for i, tau in enumerate(taus[1:]):
+        e += skewed_absolute_error(K.flatten(y_true),
+                                   K.flatten(y_pred[:, i + 1]),
+                                   tau)
+    return e
+
+
+class QuantileLoss:
+    """
+    Wrapper class for the quantile error loss function. A class is used here
+    to allow the implementation of a custom `__repr__` function, so that the
+    loss function object can be easily loaded using `keras.model.load`.
+
+    Attributes:
+
+        quantiles: List of quantiles that should be estimated with
+                   this loss function.
+    """
+
+    def __init__(self, quantiles):
+        self.__name__ = "QuantileLoss"
+        self.quantiles = quantiles
+
+    def __call__(self, y_true, y_pred):
+        return quantile_loss(y_true, y_pred, self.quantiles)
+
+    def __repr__(self):
+        return "QuantileLoss(" + repr(self.quantiles) + ")"
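+
+# A brief usage sketch: in `KerasQRNN.fit` below, the loss object is passed
+# to `model.compile`, and `KerasQRNN.load` passes it to
+# `keras.models.load_model` via `custom_objects`:
+#
+#     loss = QuantileLoss([0.1, 0.5, 0.9])
+#     model.compile(loss=loss, optimizer="sgd")
+#     ...
+#     model = keras.models.load_model(path, custom_objects={loss.__name__: loss})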
+
+################################################################################
+# Keras Interface Classes
+################################################################################
+
+class TrainingGenerator:
+    """
+    This Keras sample generator takes the noise-free training data
+    and adds independent Gaussian noise to each of the components
+    of the input.
+
+    Attributes:
+
+        x_train: The training input, i.e. the brightness temperatures
+                 measured by the satellite.
+        y_train: The training output, i.e. the value of the retrieval
+                 quantity.
+        x_mean: A vector containing the mean of each input component.
+        x_sigma: A vector containing the standard deviation of each
+                 component.
+        batch_size: The size of a training batch.
+    """
+
+    def __init__(self, x_train, x_mean, x_sigma, y_train, sigma_noise, batch_size):
+        self.bs = batch_size
+
+        self.x_train = x_train
+        self.x_mean = x_mean
+        self.x_sigma = x_sigma
+        self.y_train = y_train
+        self.sigma_noise = sigma_noise
+
+        self.indices = np.random.permutation(x_train.shape[0])
+        self.i = 0
+
+    def __iter__(self):
+        logger.info("iter...")
+        return self
+
+    def __next__(self):
+        inds = self.indices[np.arange(self.i * self.bs,
+                                      (self.i + 1) * self.bs)
+                            % self.indices.size]
+        x_batch = np.copy(self.x_train[inds, :])
+        if self.sigma_noise is not None:
+            x_batch += np.random.randn(*x_batch.shape) * self.sigma_noise
+        x_batch = (x_batch - self.x_mean) / self.x_sigma
+        y_batch = self.y_train[inds]
+
+        self.i = self.i + 1
+
+        # Shuffle training set after each epoch.
+        if self.i % (self.x_train.shape[0] // self.bs) == 0:
+            self.indices = np.random.permutation(self.x_train.shape[0])
+
+        return (x_batch, y_batch)
+
+class AdversarialTrainingGenerator:
+    """
+    This Keras sample generator extends the Gaussian noise augmentation of
+    :class:`TrainingGenerator` with adversarial training: after a short
+    warm-up phase, half of each batch consists of adversarial samples
+    computed using the fast gradient sign method.
+
+    Attributes:
+
+        x_train: The training input, i.e. the brightness temperatures
+                 measured by the satellite.
+        y_train: The training output, i.e. the value of the retrieval
+                 quantity.
+        x_mean: A vector containing the mean of each input component.
+        x_sigma: A vector containing the standard deviation of each
+                 component.
+        batch_size: The size of a training batch.
+    """
+
+    def __init__(self,
+                 x_train,
+                 x_mean,
+                 x_sigma,
+                 y_train,
+                 sigma_noise,
+                 batch_size,
+                 input_gradients,
+                 eps):
+        self.bs = batch_size
+
+        self.x_train = x_train
+        self.x_mean = x_mean
+        self.x_sigma = x_sigma
+        self.y_train = y_train
+        self.sigma_noise = sigma_noise
+
+        self.indices = np.random.permutation(x_train.shape[0])
+        self.i = 0
+
+        self.input_gradients = input_gradients
+        self.eps = eps
+
+    def __iter__(self):
+        logger.info("iter...")
+        return self
+
+    def __next__(self):
+        if self.i == 0:
+            inds = np.random.randint(0, self.x_train.shape[0], self.bs)
+
+            x_batch = np.copy(self.x_train[inds, :])
+            if self.sigma_noise is not None:
+                x_batch += np.random.randn(*x_batch.shape) * self.sigma_noise
+
+            x_batch = (x_batch - self.x_mean) / self.x_sigma
+            y_batch = self.y_train[inds]
+        else:
+            bs2 = self.bs // 2
+            inds = np.random.randint(0, self.x_train.shape[0], bs2)
+
+            x_batch = np.zeros((self.bs, self.x_train.shape[1]))
+            y_batch = np.zeros((self.bs, 1))
+
+            x_batch[:bs2, :] = np.copy(self.x_train[inds, :])
+            if self.sigma_noise is not None:
+                x_batch[:bs2, :] += np.random.randn(bs2, self.x_train.shape[1]) \
+                                    * self.sigma_noise
+            x_batch[:bs2, :] = (x_batch[:bs2, :] - self.x_mean) / self.x_sigma
+            y_batch[:bs2, :] = self.y_train[inds].reshape(-1, 1)
+            x_batch[bs2:, :] = x_batch[:bs2, :]
+            y_batch[bs2:, :] = y_batch[:bs2, :]
+
+            if self.i > 10:
+                grads = self.input_gradients(
+                    [x_batch[:bs2, :], y_batch[:bs2, :], [1.0]])[0]
+                x_batch[bs2:, :] += self.eps * np.sign(grads)
+
+        self.i = self.i + 1
+        return (x_batch, y_batch)
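+
+# The adversarial samples above follow the fast gradient sign method
+# (Goodfellow et al., 2014): x_adv = x + eps * sign(grad_x L(x, y)),
+# i.e. each input is perturbed by eps in the direction that locally
+# increases the loss the most.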
+
+
+# TODO: Make y-noise argument optional
+class ValidationGenerator:
+    """
+    This Keras sample generator is similar to the training generator,
+    except that it returns the whole validation set and does not perform
+    any randomization.
+
+    Attributes:
+
+        x_val: The validation input, i.e. the brightness temperatures
+               measured by the satellite.
+        y_val: The validation output, i.e. the value of the retrieval
+               quantity.
+        x_mean: A vector containing the mean of each input component.
+        x_sigma: A vector containing the standard deviation of each
+                 component.
+    """
+
+    def __init__(self, x_val, x_mean, x_sigma, y_val, sigma_noise):
+        self.x_val = x_val
+        self.x_mean = x_mean
+        self.x_sigma = x_sigma
+
+        self.y_val = y_val
+
+        self.sigma_noise = sigma_noise
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        x_val = np.copy(self.x_val)
+        if self.sigma_noise is not None:
+            x_val += np.random.randn(*self.x_val.shape) * self.sigma_noise
+        x_val = (x_val - self.x_mean) / self.x_sigma
+        return (x_val, self.y_val)
+
+
+class LRDecay(keras.callbacks.Callback):
+    """
+    The LRDecay class implements the Keras callback interface and reduces
+    the learning rate when the validation loss has stopped decreasing.
+
+    Attributes:
+
+        lr_decay: The factor c > 1.0 by which the learning rate is
+                  reduced.
+        lr_minimum: The training is stopped when this learning rate
+                    is reached.
+        convergence_steps: The number of epochs without validation loss
+                           reduction required to reduce the learning rate.
+    """
+
+    def __init__(self, model, lr_decay, lr_minimum, convergence_steps):
+        self.model = model
+        self.lr_decay = lr_decay
+        self.lr_minimum = lr_minimum
+        self.convergence_steps = convergence_steps
+        self.steps = 0
+
+    def on_train_begin(self, logs={}):
+        self.losses = []
+        self.steps = 0
+        self.min_loss = 1e30
+
+    def on_epoch_end(self, epoch, logs={}):
+        self.losses += [logs.get('val_loss')]
+        if not self.losses[-1] < self.min_loss:
+            self.steps = self.steps + 1
+        else:
+            self.steps = 0
+        if self.steps > self.convergence_steps:
+            lr = keras.backend.get_value(self.model.optimizer.lr) / self.lr_decay
+            keras.backend.set_value(self.model.optimizer.lr, lr)
+            self.steps = 0
+            logger.info("\n Reduced learning rate to " + str(lr))
+
+            if lr < self.lr_minimum:
+                self.model.stop_training = True
+
+        self.min_loss = min(self.min_loss, self.losses[-1])
+
+################################################################################
+# QRNN
+################################################################################
+
+class KerasQRNN:
+    r"""
+    Quantile Regression Neural Network (QRNN)
+
+    This class implements quantile regression neural networks and can be used
+    to estimate quantiles of the posterior distribution of remote sensing
+    retrievals.
+
+    Internally, the QRNN uses a feed-forward neural network that is trained
+    to minimize the quantile loss function
+
+    .. math::
+        \mathcal{L}_\tau(y_\tau, y_\text{true}) =
+        \begin{cases} \tau |y_\tau - y_\text{true}| & \text{ if } y_\tau < y_\text{true} \\
+        (1 - \tau) |y_\tau - y_\text{true}| & \text{ otherwise, }\end{cases}
+
+    where :math:`y_\text{true}` is the expected value of the retrieval quantity
+    and :math:`y_\tau` is the predicted quantile. The neural network
+    has one output neuron for each quantile to estimate.
+
+    For the training, this implementation provides custom data generators that
+    can be used to add Gaussian noise to the training data as well as adversarial
+    training using the fast gradient sign method.
+
+    This implementation also provides functionality to use an ensemble of networks
+    instead of just a single network to predict the quantiles.
+
+    .. note:: For the QRNN I am using :math:`x` to denote the input vector and
+              :math:`y` to denote the output.
 While this is opposed to typical
+              inverse problem notation, it is in line with machine learning
+              notation and felt more natural for the implementation. If this
+              annoys you, I am sorry. But the other way round it would have
+              annoyed other people, in particular me.
+
+    Attributes:
+
+        input_dim (int):
+            The input dimension of the neural network, i.e. the dimension of the
+            measurement vector.
+
+        quantiles (numpy.array):
+            The 1D-array containing the quantiles :math:`\tau \in [0, 1]` that the
+            network learns to predict.
+
+        depth (int):
+            The number of layers in the network excluding the input layer.
+
+        width (int):
+            The width of the hidden layers in the network.
+
+        activation (str):
+            The name of the activation functions to use in the hidden layers
+            of the network.
+
+        models (list of keras.models.Sequential):
+            The ensemble of Keras neural networks used for the quantile regression
+            neural network.
+    """
+
+    def __init__(self,
+                 input_dim,
+                 quantiles,
+                 model=(3, 128, "relu"),
+                 ensemble_size=1,
+                 **kwargs):
+        """
+        Create a QRNN model.
+
+        Arguments:
+
+            input_dim(int): The dimension of the measurement space, i.e. the number
+                            of elements in a single measurement vector y
+
+            quantiles(np.array): 1D-array containing the quantiles to estimate of
+                                 the posterior distribution. Given as fractions
+                                 within the range [0, 1].
+
+            model(tuple or keras.models.Model): Tuple ``(depth, width, activation)``
+                            describing a fully-connected architecture, where
+                            ``depth`` is the number of layers excluding the input
+                            layer, ``width`` the number of neurons in each hidden
+                            layer and ``activation`` the name of the activation
+                            function to use ("relu", for rectified linear unit, by
+                            default). Alternatively, a custom Keras model or a
+                            list of Keras models to use as the ensemble.
+
+            ensemble_size(int): The number of networks in the ensemble.
+
+            **kwargs: Additional keyword arguments are passed to the constructor
+                      call `keras.layers.Dense` of the hidden layers, which can,
+                      for example, be used to add regularization. For more
+                      information, consult the Keras documentation.
+        """
+        self.input_dim = input_dim
+        self.quantiles = np.array(quantiles)
+
+        if type(model) is tuple:
+            depth = model[0]
+            width = model[1]
+            activation = model[2]
+
+            model = Sequential()
+            if depth == 0:
+                model.add(Dense(input_dim=input_dim,
+                                units=len(quantiles),
+                                activation=None))
+            else:
+                model.add(Dense(input_dim=input_dim,
+                                units=width,
+                                activation=activation))
+                for i in range(depth - 2):
+                    model.add(Dense(units=width,
+                                    activation=activation,
+                                    **kwargs))
+                model.add(Dense(units=len(quantiles), activation=None))
+            # Replicate the architecture for each member of the ensemble.
+            self.models = [clone_model(model) for i in range(ensemble_size)]
+        elif isinstance(model, Model):
+            self.models = [clone_model(model) for i in range(ensemble_size)]
+        elif isinstance(model, list) and isinstance(model[0], Model):
+            self.models = model
+        else:
+            raise Exception("The provided model is neither a suitable "
+                            "architecture tuple nor a keras model.")
+
+    def __fit_params__(self, kwargs):
+        at = kwargs.pop("adversarial_training", False)
+        dat = kwargs.pop("delta_at", 0.01)
+        batch_size = kwargs.pop("batch_size", 512)
+        convergence_epochs = kwargs.pop("convergence_epochs", 10)
+        initial_learning_rate = kwargs.pop('initial_learning_rate', 0.01)
+        learning_rate_decay = kwargs.pop('learning_rate_decay', 2.0)
+        learning_rate_minimum = kwargs.pop('learning_rate_minimum', 1e-6)
+        maximum_epochs = kwargs.pop("maximum_epochs", 200)
+        training_split = kwargs.pop("training_split", 0.9)
+        return at, dat, batch_size, convergence_epochs, initial_learning_rate, \
+            learning_rate_decay, learning_rate_minimum, maximum_epochs, \
+            training_split, kwargs
+
+    def cross_validation(self,
+                         x_train,
+                         y_train,
+                         sigma_noise=None,
+                         n_folds=5,
+                         s=None,
+                         **kwargs):
+        r"""
+        Perform n-fold cross validation.
+
+        This function trains the network n times on different subsets of the
+        provided training data, always keeping a fraction of 1/n samples apart
+        for testing. Performance for each of the networks is evaluated and mean
+        and standard deviation for all folds are returned. This is to reduce
+        the influence of random fluctuations of the network performance during
+        hyperparameter tuning.
+
+        Arguments:
+
+            x_train(numpy.array): Array of shape :code:`(m, n)` containing the
+                                  m n-dimensional training inputs.
+
+            y_train(numpy.array): Array of shape :code:`(m, 1)` containing the
+                                  m training outputs.
+
+            sigma_noise(None, float, np.array): If not `None`, this value is used
+                                                to scale the Gaussian noise
+                                                that is added to each training
+                                                batch. If `None`, no noise is
+                                                added.
+
+            n_folds(int): Number of folds to perform for the cross validation.
+
+            s(callable, None): Performance metric for the fold. If not None,
+                               this should be a function object taking as
+                               arguments :code:`(y_test, y_pred)`, i.e. the
+                               expected output for the given fold :code:`y_test`
+                               and the predicted output :code:`y_pred`. The
+                               returned value is taken as performance metric.
+
+            **kwargs: Additional keyword arguments are passed on to the :code:`fit`
+                      method that is called for each fold.
+        """
+
+        n = x_train.shape[0]
+        n_test = n // n_folds
+        inds = np.random.permutation(np.arange(0, n))
+
+        results = []
+
+        # Nomenclature is a bit difficult here:
+        # Each cross validation fold has its own training,
+        # validation and test set. The size of the test set
+        # is the number of provided training samples divided by the
+        # number of folds. The rest is used as training and internal
+        # validation set according to the chosen training_split
+        # ratio.
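+        #
+        # Worked example: with n = 1000 samples and n_folds = 5, each fold
+        # keeps n_test = 200 samples for testing (fold i = 1, e.g., tests on
+        # inds[200:400]) and trains on the remaining 800 samples, of which a
+        # fraction `training_split` is used for the actual training and the
+        # rest as internal validation set.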
+
+        for i in range(n_folds):
+            for m in self.models:
+                m.reset_states()
+
+            # Indices to use for training including training data and internal
+            # validation set to monitor convergence.
+            inds_train = np.append(np.arange(0, i * n_test),
+                                   np.arange(min((i + 1) * n_test, n), n))
+            inds_train = inds[inds_train]
+            # Indices used to evaluate performance of the model.
+            inds_test = np.arange(i * n_test, (i + 1) * n_test)
+            inds_test = inds[inds_test]
+
+            x_test_fold = x_train[inds_test, :]
+            y_test_fold = y_train[inds_test]
+
+            # Splitting training and validation set.
+            x_train_fold = x_train[inds_train, :]
+            y_train_fold = y_train[inds_train]
+
+            self.fit(x_train_fold, y_train_fold,
+                     sigma_noise, **kwargs)
+
+            # Evaluation on this fold's test set.
+            if s:
+                y_pred = self.predict(x_test_fold)
+                results += [s(y_pred, y_test_fold)]
+            else:
+                loss = self.models[0].evaluate(
+                    (x_test_fold - self.x_mean) / self.x_sigma,
+                    y_test_fold)
+                logger.info(loss)
+                results += [loss]
+        results = np.array(results)
+        logger.info(results)
+        return (np.mean(results, axis=0), np.std(results, axis=0))
+
+    def fit(self,
+            x_train,
+            y_train,
+            sigma_noise=None,
+            x_val=None,
+            y_val=None,
+            **kwargs):
+        r"""
+        Train the QRNN on given training data.
+
+        The training uses an internal validation set to monitor training
+        progress. This can be either split at random from the training
+        data (see `training_split` argument) or provided explicitly
+        using the `x_val` and `y_val` parameters.
+
+        Training of the QRNN is performed using stochastic gradient descent
+        (SGD). The learning rate is adaptively reduced during training when
+        the loss on the internal validation set has not been reduced for a
+        certain number of epochs.
+
+        Two data augmentation techniques can be used to improve the
+        calibration of the QRNN predictions. The first one adds Gaussian
+        noise to training batches before they are passed to the network.
+        The noise statistics are defined using the `sigma_noise` argument.
+        The second one is adversarial training. If adversarial training is
+        used, half of each training batch consists of adversarial samples
+        generated using the fast gradient sign method. The strength of the
+        perturbation is controlled using the `delta_at` parameter.
+
+        During one epoch, each training sample is passed once to the network.
+        Their order is randomized between epochs.
+
+        Arguments:
+
+            x_train(np.array): Array of shape `(n, m)` containing n training
+                               samples of dimension m.
+
+            y_train(np.array): Array of shape `(n, )` containing the training
+                               output corresponding to the training data in
+                               `x_train`.
+
+            sigma_noise(None, float, np.array): If not `None`, this value is used
+                                                to scale the Gaussian noise
+                                                that is added to each training
+                                                batch. If `None`, no noise is
+                                                added.
+            x_val(np.array): Array of shape :code:`(n', m)` containing n' validation
+                             inputs that will be used to monitor training loss. Must
+                             be provided in unison with :code:`y_val` or otherwise
+                             will be ignored.
+
+            y_val(np.array): Array of shape :code:`(n')` containing n' validation
+                             outputs corresponding to the inputs in :code:`x_val`.
+                             Must be provided in unison with :code:`x_val` or
+                             otherwise will be ignored.
+
+            adversarial_training(Bool): Whether or not to use adversarial training.
+                                        `False` by default.
+
+            delta_at(float): Perturbation factor for the fast gradient sign method
+                             determining the strength of the adversarial training
+                             perturbation. `0.01` by default.
+
+            batch_size(int): The batch size to use during training. Defaults to `512`.
+
+            convergence_epochs(int): The number of epochs without decrease in
+                                     validation loss before the learning rate
+                                     is reduced. Defaults to `10`.
+
+            initial_learning_rate(float): The initial value for the learning
+                                          rate. Defaults to `0.01`.
+
+            learning_rate_decay(float): The factor by which to reduce the
+                                        learning rate after no improvement
+                                        on the internal validation set was
+                                        observed for `convergence_epochs`
+                                        epochs. Defaults to `2.0`.
+
+            learning_rate_minimum(float): The minimum learning rate at which
+                                          the training is terminated. Defaults
+                                          to `1e-6`.
+
+            maximum_epochs(int): The maximum number of epochs to perform if
+                                 training does not terminate before. Defaults
+                                 to `200`.
+
+            training_split(float): The ratio `0 < ts < 1.0` of the samples to
+                                   be used for training; the remainder forms
+                                   the internal validation set. Defaults to
+                                   `0.9`.
+        """
+        if x_train.shape[1] != self.input_dim:
+            raise Exception("Training input must have the same extent along "
+                            "dimension 1 as input_dim (" + str(self.input_dim)
+                            + ")")
+
+        if y_train.shape[1] != 1:
+            raise Exception("Currently only scalar retrieval targets are"
+                            " supported.")
+
+        x_mean = np.mean(x_train, axis=0, keepdims=True)
+        x_sigma = np.std(x_train, axis=0, keepdims=True)
+        self.x_mean = x_mean
+        self.x_sigma = x_sigma
+
+        # Handle parameters
+        # at:  adversarial training
+        # bs:  batch size
+        # ce:  convergence epochs
+        # ilr: initial learning rate
+        # lrd: learning rate decay
+        # lrm: learning rate minimum
+        # me:  maximum number of epochs
+        # ts:  split ratio of training set
+        at, dat, bs, ce, ilr, lrd, lrm, me, ts, kwargs = self.__fit_params__(
+            kwargs)
+
+        # Split training and validation set if x_val or y_val
+        # are not provided.
+        n = x_train.shape[0]
+        n_train = n
+        if x_val is None and y_val is None:
+            n_train = round(ts * n)
+            n_val = n - n_train
+            inds = np.random.permutation(n)
+            x_val = x_train[inds[n_train:], :]
+            y_val = y_train[inds[n_train:]]
+            x_train = x_train[inds[:n_train], :]
+            y_train = y_train[inds[:n_train]]
+        loss = QuantileLoss(self.quantiles)
+
+        self.custom_objects = {loss.__name__: loss}
+        # Each ensemble member is compiled and trained in turn.
+        for model in self.models:
+            optimizer = SGD(lr=ilr)
+            model.compile(loss=loss, optimizer=optimizer)
+
+            if at:
+                inputs = [model.input, model.targets[0],
+                          model.sample_weights[0]]
+                input_gradients = K.function(
+                    inputs, K.gradients(model.total_loss, model.input))
+                training_generator = AdversarialTrainingGenerator(x_train,
+                                                                  self.x_mean,
+                                                                  self.x_sigma,
+                                                                  y_train,
+                                                                  sigma_noise,
+                                                                  bs,
+                                                                  input_gradients,
+                                                                  dat)
+            else:
+                training_generator = TrainingGenerator(x_train, self.x_mean,
+                                                       self.x_sigma, y_train,
+                                                       sigma_noise, bs)
+            validation_generator = ValidationGenerator(x_val, self.x_mean,
+                                                       self.x_sigma, y_val,
+                                                       sigma_noise)
+            lr_callback = LRDecay(model, lrd, lrm, ce)
+            model.fit_generator(training_generator, steps_per_epoch=n_train // bs,
+                                epochs=me, validation_data=validation_generator,
+                                validation_steps=1, callbacks=[lr_callback])
+
+    def predict(self, x):
+        r"""
+        Predict quantiles of the conditional distribution P(y|x).
+
+        Forward propagates the inputs in `x` through the network to
+        obtain the predicted quantiles `y`.
+
+        Arguments:
+
+            x(np.array): Array of shape `(n, m)` containing `n` m-dimensional inputs
+                         for which to predict the conditional quantiles.
+
+        Returns:
+
+            Array of shape `(n, k)` with the columns corresponding to the k
+            quantiles of the network.
+        """
+        predictions = np.stack(
+            [m.predict((x - self.x_mean) / self.x_sigma) for m in self.models])
+        return np.mean(predictions, axis=0)
+
+    def save(self, path):
+        r"""
+        Store the QRNN model in a file.
+
+        This stores the model to a file using pickle for all
+        attributes that support pickling. The Keras models
+        are handled separately, since they cannot be pickled.
+
+        .. note:: In addition to the model file with the given filename,
+                  additional files suffixed with :code:`_model_i` will be
+                  created for each neural network this model consists of.
+
+        Arguments:
+
+            path(str): The path including filename indicating where to
+                       store the model.
+        """
+        filename = os.path.basename(path)
+        name = os.path.splitext(filename)[0]
+        dirname = os.path.dirname(path)
+
+        self.model_files = []
+        for i, m in enumerate(self.models):
+            self.model_files += [name + "_model_" + str(i)]
+            m.save(os.path.join(dirname, self.model_files[i]))
+        with open(path, "wb") as f:
+            pickle.dump(self, f)
+
+    @staticmethod
+    def load(path):
+        r"""
+        Load a model from a file.
+
+        This loads a model that has been stored using the `save` method.
+
+        Arguments:
+
+            path(str): The path from which to read the model.
+
+        Return:
+
+            The loaded QRNN object.
+        """
+        filename = os.path.basename(path)
+        dirname = os.path.dirname(path)
+
+        with open(path, "rb") as f:
+            qrnn = pickle.load(f)
+            qrnn.models = []
+            for mf in qrnn.model_files:
+                mf = os.path.basename(mf)
+                try:
+                    mp = os.path.join(dirname, os.path.basename(mf))
+                    qrnn.models += [keras.models.load_model(mp, qrnn.custom_objects)]
+                except Exception as e:
+                    raise Exception("Error loading the neural network models. "
+                                    "Please make sure all files created during "
+                                    "saving are in this folder.") from e
+        keras_qrnn = KerasQRNN(qrnn.input_dim, qrnn.quantiles, qrnn.models)
+        keras_qrnn.x_mean = qrnn.x_mean
+        keras_qrnn.x_sigma = qrnn.x_sigma
+        return keras_qrnn
+
+    def __getstate__(self):
+        dct = copy.copy(self.__dict__)
+        dct.pop("models")
+        return dct
+
+    def __setstate__(self, state):
+        self.__dict__ = state
+        self.models = None
diff --git a/typhon/retrieval/qrnn/backends/pytorch.py b/typhon/retrieval/qrnn/backends/pytorch.py
new file mode 100644
index 00000000..f3dab8c9
--- /dev/null
+++ b/typhon/retrieval/qrnn/backends/pytorch.py
@@ -0,0 +1,358 @@
+"""
+Pytorch backend for QRNNs
+=========================
+
+This module provides an implementation of quantile regression neural networks
+(QRNNs) using the pytorch deep learning framework.
+"""
+import torch
+import numpy as np
+from torch import nn
+from torch import optim
+from tqdm.auto import tqdm
+
+# Mapping from activation names to pytorch activation classes. "relu" is
+# included since it is the default activation of PytorchQRNN.
+activations = {"elu"        : nn.ELU,
+               "hardshrink" : nn.Hardshrink,
+               "hardtanh"   : nn.Hardtanh,
+               "prelu"      : nn.PReLU,
+               "relu"       : nn.ReLU,
+               "selu"       : nn.SELU,
+               "celu"       : nn.CELU,
+               "sigmoid"    : nn.Sigmoid,
+               "softplus"   : nn.Softplus,
+               "softmin"    : nn.Softmin}
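+
+# Worked example for the quantile loss implemented below (tau = 0.2):
+# for y_true = 0.0 and y_pred = 1.0 the loss is (1 - 0.2) * 1.0 = 0.8,
+# while for y_pred = -1.0 it is 0.2 * 1.0 = 0.2, i.e. over-prediction is
+# penalized more strongly than under-prediction for low quantiles.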
+
+class QuantileLoss:
+    r"""
+    The quantile loss function
+
+    This function object implements the quantile loss defined as
+
+    .. math::
+
+        \mathcal{L}(y_\text{pred}, y_\text{true}) =
+        \begin{cases}
+        \tau \cdot |y_\text{pred} - y_\text{true}| & , y_\text{pred} < y_\text{true} \\
+        (1 - \tau) \cdot |y_\text{pred} - y_\text{true}| & , \text{otherwise}
+        \end{cases}
+
+    as a training criterion for the training of neural networks. The loss criterion
+    expects a vector :math:`\mathbf{y}_\tau` of predicted quantiles and the observed
+    value :math:`y`. The loss for a single training sample is computed by summing
+    the losses corresponding to each quantile. The loss for a batch of training
+    samples is computed by taking the mean over all samples in the batch.
+    """
+    def __init__(self,
+                 quantiles,
+                 mask=-1.0):
+        """
+        Create an instance of the quantile loss function with the given quantiles.
+
+        Arguments:
+            quantiles: Array or iterable containing the quantiles to be estimated.
+            mask: Value in y_true that marks invalid samples, whose loss
+                  contribution is set to zero. Defaults to -1.0.
+        """
+        self.quantiles = torch.tensor(quantiles).float()
+        self.n_quantiles = len(quantiles)
+        self.mask = np.float32(mask)
+
+    def to(self, device):
+        self.quantiles = self.quantiles.to(device)
+
+    def __call__(self, y_pred, y_true):
+        """
+        Compute the mean quantile loss for given inputs.
+
+        Arguments:
+            y_pred: N-tensor containing the predicted quantiles along the last
+                    dimension
+            y_true: (N-1)-tensor containing the true y values corresponding to
+                    the predictions in y_pred
+
+        Returns:
+            The mean quantile loss.
+        """
+        dy = y_pred - y_true
+        n = self.quantiles.size()[0]
+        qs = self.quantiles.reshape((n,) + (1,) * max(len(dy.size()) - 2, 0))
+        l = torch.where(dy >= 0.0,
+                        (1.0 - qs) * dy,
+                        (-qs) * dy)
+        if self.mask is not None:
+            l = torch.where(y_true == self.mask, torch.zeros_like(l), l)
+        return l.mean()
+
+################################################################################
+# QRNN
+################################################################################
+
+class PytorchQRNN:
+    """
+    Quantile regression neural network (QRNN)
+
+    This class implements QRNNs as a fully-connected network with
+    a given number of layers.
+    """
+    def __init__(self,
+                 input_dimension,
+                 quantiles,
+                 model=(3, 128, "relu"),
+                 depth=3,
+                 width=128,
+                 activation=nn.ReLU):
+        """
+        Arguments:
+            input_dimension(int): The number of input features.
+            quantiles(array): Array of the quantiles to predict.
+            model(tuple or nn.Module): Tuple ``(depth, width, activation)``
+                describing a fully-connected architecture, or a custom
+                ``nn.Module`` to use as the model.
+            depth(int): Number of layers of the network.
+            width(int): The number of neurons in the inner layers of the network.
+            activation: The activation function to use for the inner layers
+                of the network.
+ """ + self.input_dimension = input_dimension + self.quantiles = np.array(quantiles) + self.depth = depth + self.width = width + self.activation = nn.ReLU + self.criterion = QuantileLoss(self.quantiles) + + n_quantiles = len(quantiles) + + if type(model) is tuple: + depth = model[0] + width = model[1] + act = model[2] + if type(act) is str: + if act in activations: + act = activations[act] + else: + raise ValueError("{} is not one of the available " + " activations ({}) to use in a pytorch " + "network.".format(act, + list(activations.keys()))) + + self.model = nn.Sequential() + self.model.add_module("fc_0", nn.Linear(input_dimension, width)) + self.model.add_module("act_0", activation()) + for i in range(1, depth - 1): + self.model.add_module("fc_{}".format(i), nn.Linear(width, width)) + self.model.add_module("act_{}".format(i), activation()) + self.model.add_module("fc_{}".format(depth - 1), nn.Linear(width, n_quantiles)) + + self.criterion = QuantileLoss(self.quantiles) + elif isinstance(model, nn.Module): + self.model = model + else: + raise ValueError("Provided model must either be a valid architecture" + " tuple or an instance of pytorch.nn.Module.") + + self.training_errors = [] + self.validation_errors = [] + + def _make_adversarial_samples(self, x, y, eps): + self.model.zero_grad() + x.requires_grad = True + y_pred = self.model(x) + c = self.criterion(y_pred, y) + c.backward() + x_adv = x.detach() + eps * torch.sign(x.grad.detach()) + return x_adv + + def train(self, + training_data, + validation_data, + n_epochs = 1, + adversarial_training = False, + eps_adv = 1e-6, + lr = 1e-2, + momentum = 0.0, + gpu = False): + + """ + Train the network. + + This trains the network for the given number of epochs using the provided + training and validation data. + + If desired, the training can be augmented using adversarial training. In this + case the network is additionally trained with an adversarial batch of examples + in each step of the training. + + Arguments: + training_data: pytorch dataloader providing the training data + validation_data: pytorch dataloader providing the validation data + n_epochs: the number of epochs to train the network for + adversarial_training: whether or not to use adversarial training + eps_adv: The scaling factor to use for adversarial training. 
+ """ + self.optimizer = optim.SGD(self.model.parameters(), + lr = lr, + momentum = momentum) + + self.model.train() + + if torch.cuda.is_available() and gpu: + dev = torch.device("cuda") + else: + dev = torch.device("cpu") + self.model.to(dev) + + self.criterion.to(dev) + + def clear_output(): + print("clreaing output") + try: + import IPython + from IPython.display import clear_output + clear_output(wait=True) + except: + print("failed loading Ipython") + pass + + for i in range(n_epochs): + clear_output() + + err = 0.0 + n = 0 + iterator = tqdm(enumerate(training_data), total = len(training_data)) + iterator.set_description(f"Training epoch {i}/{n_epochs}, lr = {self.optimizer.lr}") + for j, (x, y) in iterator: + x = x.to(dev) + y = y.to(dev) + + shape = x.size() + y = y.reshape((shape[0], 1, shape[2], shape[3])) + + self.optimizer.zero_grad() + y_pred = self.model(x) + c = self.criterion(y_pred, y) + c.backward() + self.optimizer.step() + + err += c.item() * x.size()[0] + n += x.size()[0] + + if adversarial_training: + self.optimizer.zero_grad() + x_adv = self._make_adversarial_samples(x, y, eps_adv) + y_pred = self.model(x) + c = self.criterion(y_pred, y) + c.backward() + self.optimizer.step() + + if j % 100: + iterator.set_postfix({"Training errror" : err / n}) + + # Save training error + self.training_errors.append(err / n) + + val_err = 0.0 + n = 0 + + for x, y in validation_data: + x = x.to(dev) + y = y.to(dev) + + shape = x.size() + y = y.reshape((shape[0], 1, shape[2], shape[3])) + + y_pred = self.model(x) + c = self.criterion(y_pred, y) + + val_err += c.item() * x.size()[0] + n += x.size()[0] + + self.validation_errors.append(val_err / n) + self.model.eval() + + def predict(self, x): + """ + Predict quantiles for given input. + + Args: + x: 2D tensor containing the inputs for which for which to + predict the quantiles. + + Returns: + tensor: 2D tensor containing the predicted quantiles along + the last dimension. + """ + return self.model(x) + + def calibration(self, + data, + gpu=False): + """ + Computes the calibration of the predictions from the neural network. + + Arguments: + data: torch dataloader object providing the data for which to compute + the calibration. + + Returns: + (intervals, frequencies): Tuple containing the confidence intervals and + corresponding observed frequencies. + """ + + if gpu and torch.cuda.is_available(): + dev = torch.device("cuda") + else: + dev = torch.device("cpu") + self.model.to(dev) + + n_intervals = self.quantiles.size // 2 + qs = self.quantiles + intervals = np.array([q_r - q_l for (q_l, q_r) in zip(qs, reversed(qs))])[:n_intervals] + counts = np.zeros(n_intervals) + + total = 0.0 + + iterator = tqdm(data) + for x, y in iterator: + x = x.to(dev) + y = y.to(dev) + shape = x.size() + y = y.reshape((shape[0], 1, shape[2], shape[3])) + + y_pred = self.predict(x) + y_pred = y_pred.cpu() + y = y.cpu() + + for i in range(n_intervals): + l = y_pred[:, i] + r = y_pred[:, -(i + 1)] + counts[i] += np.logical_and(y.cpu() >= l, y.cpu() < r).sum() + + total += np.prod(y.size()) + + return intervals[::-1], (counts / total)[::-1] + + + def save(self, path): + """ + Save QRNN to file. + + Arguments: + The path in which to store the QRNN. 
+ """ + torch.save({"input_dimension" : self.input_dimension, + "quantiles" : self.quantiles, + "width" : self.width, + "depth" : self.depth, + "activation" : self.activation, + "network_state" : self.model.state_dict(), + "optimizer_state" : self.optimizer.state_dict()}, + path) + + @staticmethod + def load(self, path): + """ + Load QRNN from file. + + Arguments: + path: Path of the file where the QRNN was stored. + """ + state = torch.load(path) + keys = ["input_dimension", "quantiles", "depth", "width", "activation"] + qrnn = QRNN(*[state[k] for k in keys]) + qrnn.model.load_state_dict["network_state"] + qrnn.optimizer.load_state_dict["optimizer_state"] diff --git a/typhon/retrieval/qrnn/models/__init__.py b/typhon/retrieval/qrnn/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/typhon/retrieval/qrnn/models/keras.py b/typhon/retrieval/qrnn/models/keras.py new file mode 100644 index 00000000..e049809a --- /dev/null +++ b/typhon/retrieval/qrnn/models/keras.py @@ -0,0 +1,497 @@ +""" +typhon.retrieval.qrnn.models.keras +================================== + +This module provides Keras neural network models that can be used as backend +models with the :py:class:`typhon.retrieval.qrnn.QRNN` class. +""" +import logging +import numpy as np +import keras +from keras.models import Sequential +from keras.layers import Dense, Activation, deserialize +from keras.optimizers import SGD +import keras.backend as K + + +def save_model(f, model): + """ + Save keras model. + + Args: + f(:code:`str` or binary stream): Either a path or a binary stream + to store the data to. + model(:code:`keras.models.Models`): The Keras model to save + """ + keras.models.save_model(model, f) + + +def load_model(f, quantiles): + """ + Load keras model. + + Args: + f(:code:`str` or binary stream): Either a path or a binary stream + to read the model from + quantiles(:code:`np.ndarray`): Array containing the quantiles + that the model predicts. + + Returns: + The loaded keras model. + """ + # + # This is a bit hacky but seems required to handle + # the custom model classes. + # + def make_fully_connected(layers=None, **kwargs): + layers = list(map(deserialize, layers)) + input_dimensions = layers[0].batch_input_shape[1] + return FullyConnected(input_dimensions, quantiles, (), layers) + + custom_objects = { + "FullyConnected": make_fully_connected, + "QuantileLoss": QuantileLoss, + } + model = keras.models.load_model(f, custom_objects=custom_objects) + return model + + +################################################################################ +# Quantile loss +################################################################################ + +LOGGER = logging.getLogger(__name__) + + +def skewed_absolute_error(y_true, y_pred, tau): + """ + The quantile loss function for a given quantile tau: + + L(y_true, y_pred) = (tau - I(y_pred < y_true)) * (y_pred - y_true) + + Where I is the indicator function. + """ + dy = y_pred - y_true + return K.mean((1.0 - tau) * K.relu(dy) + tau * K.relu(-dy), axis=-1) + + +def quantile_loss(y_true, y_pred, taus): + """ + The quantiles loss for a list of quantiles. Sums up the error contribution + from the each of the quantile loss functions. + """ + e = skewed_absolute_error(K.flatten(y_true), K.flatten(y_pred[:, 0]), taus[0]) + for i, tau in enumerate(taus[1:]): + e += skewed_absolute_error(K.flatten(y_true), K.flatten(y_pred[:, i + 1]), tau) + return e + + +class QuantileLoss: + """ + Wrapper class for the quantile error loss function. 
 A class is used here
+    to allow the implementation of a custom `__repr__` function, so that the
+    loss function object can be easily loaded using `keras.model.load`.
+
+    Attributes:
+
+        quantiles: List of quantiles that should be estimated with
+                   this loss function.
+    """
+
+    def __init__(self, quantiles):
+        self.__name__ = "QuantileLoss"
+        self.quantiles = quantiles
+
+    def __call__(self, y_true, y_pred):
+        return quantile_loss(y_true, y_pred, self.quantiles)
+
+    def __repr__(self):
+        return "QuantileLoss(" + repr(self.quantiles) + ")"
+
+
+################################################################################
+# Keras data generators
+################################################################################
+
+
+class BatchedDataset:
+    """
+    Keras data loader that batches a given dataset of numpy arrays.
+    """
+
+    def __init__(self, training_data, batch_size):
+        """
+        Create batched dataset.
+
+        Args:
+            training_data: Tuple :code:`(x, y)` containing the input
+                           and output data as arrays.
+            batch_size(:code:`int`): The batch size
+        """
+        x, y = training_data
+        self.x = x
+        self.y = y
+        self.bs = batch_size
+        self.indices = np.random.permutation(x.shape[0])
+        self.i = 0
+
+    def __iter__(self):
+        LOGGER.info("iter...")
+        return self
+
+    def __len__(self):
+        return self.x.shape[0] // self.bs
+
+    def __next__(self):
+        inds = self.indices[
+            np.arange(self.i * self.bs, (self.i + 1) * self.bs) % self.indices.size
+        ]
+        x_batch = np.copy(self.x[inds, :])
+        y_batch = self.y[inds]
+        self.i = self.i + 1
+        # Shuffle training set after each epoch.
+        if self.i % (self.x.shape[0] // self.bs) == 0:
+            self.indices = np.random.permutation(self.x.shape[0])
+
+        return (x_batch, y_batch)
+
+
+class TrainingGenerator:
+    """
+    This Keras sample generator takes a generator for noise-free training data
+    and adds independent Gaussian noise to each of the components of the input.
+
+    Attributes:
+        training_data: Data generator providing the data
+        sigma_noise: A vector containing the standard deviation of each
+                     component.
+    """
+
+    def __init__(self, training_data, sigma_noise=None):
+        """
+        Args:
+            training_data: Data generator providing the original (noise-free)
+                           training data.
+            sigma_noise: Vector of the length of the input dimensions specifying
+                         the standard deviation of the noise.
+        """
+        self.training_data = training_data
+        self.sigma_noise = sigma_noise
+
+    def __iter__(self):
+        LOGGER.info("iter...")
+        return self
+
+    def __len__(self):
+        return len(self.training_data)
+
+    def __next__(self):
+        x_batch, y_batch = next(self.training_data)
+        if self.sigma_noise is not None:
+            x_batch += np.random.randn(*x_batch.shape) * self.sigma_noise
+        return (x_batch, y_batch)
+
+
+class AdversarialTrainingGenerator:
+    """
+    This Keras sample generator interleaves the batches of a given training
+    generator with adversarial batches computed using the fast gradient sign
+    method.
+
+    Attributes:
+        training_data: Training generator to use to generate the input
+                       data
+        input_gradients: Keras function to compute the gradients of the
+                         network
+        eps: The perturbation factor.
+    """
+
+    def __init__(self, training_data, input_gradients, eps):
+        """
+        Args:
+            training_data: Training generator to use to generate the input
+                           data
+            input_gradients: Keras function to compute the gradients of the
+                             network
+            eps: The perturbation factor.
+ """ + self.training_data = training_data + self.input_gradients = input_gradients + self.eps = eps + + def __iter__(self): + LOGGER.info("iter...") + return self + + def __len__(self): + return len(self.training_data) + + def __next__(self): + if self.i % 2 == 0: + x_batch, y_batch = next(self.training_data) + self.x_batch = x_batch + self.y_batch = y_batch + else: + x_batch = self.x_batch + y_batch = self.y_batch + grads = self.input_gradients([x_batch, y_batch, 1.0]) + x_batch += self.eps * np.sign(grads) + + self.i = self.i + 1 + return x_batch, y_batch + + +class ValidationGenerator: + """ + This Keras sample generator is similar to the training generator + only that it returns the whole validation set and doesn't perform + any randomization. + + Attributes: + + x_val: The validation input, i.e. the brightness temperatures + measured by the satellite. + y_val: The validation output, i.e. the value of the retrieval + quantity. + x_mean: A vector containing the mean of each input component. + x_sigma: A vector containing the standard deviation of each + component. + """ + + def __init__(self, validation_data, sigma_noise): + self.validation_data = validation_data + self.sigma_noise = sigma_noise + + def __iter__(self): + return self + + def __next__(self): + x_val, y_val = next(self.validation_data) + if not self.sigma_noise is None: + x_val += np.random.randn(*self.x_val.shape) * self.sigma_noise + return (x_val, self.y_val) + + +################################################################################ +# LRDecay +################################################################################ + + +class LRDecay(keras.callbacks.Callback): + """ + The LRDecay class implements the Keras callback interface and reduces + the learning rate according to validation loss reduction. + + Attributes: + + lr_decay: The factor c > 1.0 by which the learning rate is + reduced. + lr_minimum: The training is stopped when this learning rate + is reached. + convergence_steps: The number of epochs without validation loss + reduction required to reduce the learning rate. + + """ + + def __init__(self, model, lr_decay, lr_minimum, convergence_steps): + self.model = model + self.lr_decay = lr_decay + self.lr_minimum = lr_minimum + self.convergence_steps = convergence_steps + self.steps = 0 + + def on_train_begin(self, logs={}): + self.losses = [] + self.steps = 0 + self.min_loss = 1e30 + + def on_epoch_end(self, epoch, logs={}): + loss = logs.get("val_loss") + if loss is None: + loss = logs.get("loss") + self.losses += [loss] + if not self.losses[-1] < self.min_loss: + self.steps = self.steps + 1 + else: + self.steps = 0 + if self.steps > self.convergence_steps: + lr = keras.backend.get_value(self.model.optimizer.lr) + keras.backend.set_value(self.model.optimizer.lr, lr / self.lr_decay) + self.steps = 0 + LOGGER.info("\n Reduced learning rate to " + str(lr)) + + if lr < self.lr_minimum: + self.model.stop_training = True + + self.min_loss = min(self.min_loss, self.losses[-1]) + + +################################################################################ +# QRNN +################################################################################ + + +class KerasModel: + r""" + Base class for Keras models. + + This base class provides generic utility function for the training, saving + and evaluation of Keras models. + + Attributes: + input_dimensions (int): The input dimension of the neural network, i.e. + the dimension of the measurement vector. 
+ quantiles (numpy.array): The 1D-array containing the quantiles + :math:`\tau \in [0, 1]` that the network learns to predict. + + depth (int): + The number layers in the network excluding the input layer. + + width (int): + The width of the hidden layers in the network. + + activation (str): + The name of the activation functions to use in the hidden layers + of the network. + + models (list of keras.models.Sequential): + The ensemble of Keras neural networks used for the quantile regression + neural network. + """ + + def __init__(self, input_dimension, quantiles): + """ + Create a QRNN model. + + Arguments: + input_dimension(int): The dimension of the measurement space, i.e. the number + of elements in a single measurement vector y + + quantiles(np.array): 1D-array containing the quantiles to estimate of + the posterior distribution. Given as fractions + within the range [0, 1]. + """ + self.input_dimension = input_dimension + self.quantiles = np.array(quantiles) + + def reset(self): + """ + Reinitialize the state of the model. + """ + self.reset_states() + + def train( + self, + training_data, + validation_data=None, + batch_size=256, + sigma_noise=None, + adversarial_training=False, + delta_at=0.01, + initial_learning_rate=1e-2, + momentum=0.0, + convergence_epochs=5, + learning_rate_decay=2.0, + learning_rate_minimum=1e-6, + maximum_epochs=200, + training_split=0.9, + gpu=False, + ): + + if type(training_data) == tuple: + if not type(training_data[0]) == np.ndarray: + raise ValueError( + "When training data is provided as tuple" + " (x, y) it must contain numpy arrays." + ) + training_data = BatchedDataset(training_data, batch_size) + + if type(validation_data) is tuple: + validation_data = BatchedDataset(validation_data, batch_size) + + loss = QuantileLoss(self.quantiles) + + # Compile model + self.custom_objects = {loss.__name__: loss} + optimizer = SGD(lr=initial_learning_rate) + self.compile(loss=loss, optimizer=optimizer) + + # + # Setup training generator + # + training_generator = TrainingGenerator(training_data, sigma_noise) + if adversarial_training: + inputs = [self.input, self.targets[0], self.sample_weights[0]] + input_gradients = K.function( + inputs, K.gradients(self.total_loss, self.input) + ) + training_generator = AdversarialTrainingGenerator( + training_generator, input_gradients, delta_at + ) + + if validation_data is None: + validation_generator = None + else: + validation_generator = ValidationGenerator(validation_data, sigma_noise) + lr_callback = LRDecay( + self, learning_rate_decay, learning_rate_minimum, convergence_epochs + ) + self.fit_generator( + training_generator, + steps_per_epoch=len(training_generator), + epochs=maximum_epochs, + validation_data=validation_generator, + validation_steps=1, + callbacks=[lr_callback], + ) + + +################################################################################ +# Fully-connected network +################################################################################ + + +class FullyConnected(KerasModel, Sequential): + """ + Keras implementation of fully-connected networks. + """ + + def __init__(self, input_dimension, quantiles, arch, layers=None): + """ + Create a fully-connected neural network. + + Args: + input_dimension(:code:`int`): Number of input features + quantiles(:code:`array`): The quantiles to predict given + as fractions within [0, 1]. 
+ arch(tuple): Tuple :code:`(d, w, a)` containing :code:`d`, the + number of hidden layers in the network, :code:`w`, the width + of the network and :code:`a`, the type of activation functions + to be used as string. + """ + quantiles = np.array(quantiles) + output_dimension = quantiles.size + + if layers is None: + if len(arch) == 0: + layers = [Dense(output_dimension, input_shape=(input_dimension))] + else: + d, w, a = arch + layers = [Dense(w, input_shape=(input_dimension,))] + for _ in range(d - 1): + layers.append(Dense(w, input_shape=(w,))) + if a is not None: + layers.append(Activation(a)) + layers.append(Dense(output_dimension, input_shape=(w,))) + + KerasModel.__init__(self, input_dimension, quantiles) + Sequential.__init__(self, layers) diff --git a/typhon/retrieval/qrnn/models/pytorch/__init__.py b/typhon/retrieval/qrnn/models/pytorch/__init__.py new file mode 100644 index 00000000..eb2830eb --- /dev/null +++ b/typhon/retrieval/qrnn/models/pytorch/__init__.py @@ -0,0 +1,14 @@ +""" +typhon.retrieval.qrnn.models.pytorch +==================================== + +This model provides Pytorch neural network models that can be used a backend +models for the :py:class:`typhon.retrieval.qrnn.QRNN` class. +""" +from typhon.retrieval.qrnn.models.pytorch.common import ( + BatchedDataset, + save_model, + load_model, +) +from typhon.retrieval.qrnn.models.pytorch.fully_connected import FullyConnected +from typhon.retrieval.qrnn.models.pytorch.unet import UNet diff --git a/typhon/retrieval/qrnn/models/pytorch/common.py b/typhon/retrieval/qrnn/models/pytorch/common.py new file mode 100644 index 00000000..17300fa0 --- /dev/null +++ b/typhon/retrieval/qrnn/models/pytorch/common.py @@ -0,0 +1,501 @@ +""" +models.pytorch.common +===================== + +This module provides common functionality required to realize QRNNs in pytorch. +""" +import torch +import numpy as np +from torch import nn +from torch import optim +from torch.optim.lr_scheduler import ReduceLROnPlateau +from torch.utils.data import Dataset +from tqdm import tqdm + +activations = { + "elu": nn.ELU, + "hardshrink": nn.Hardshrink, + "hardtanh": nn.Hardtanh, + "prelu": nn.PReLU, + "relu": nn.ReLU, + "selu": nn.SELU, + "celu": nn.CELU, + "sigmoid": nn.Sigmoid, + "softplus": nn.Softplus, + "softmin": nn.Softmin, +} + + +def save_model(f, model): + """ + Save pytorch model. + + Args: + f(:code:`str` or binary stream): Either a path or a binary stream + to store the data to. + model(:code:`pytorch.nn.Moduel`): The pytorch model to save + """ + torch.save(model, f) + + +def load_model(f, quantiles): + """ + Load pytorch model. + + Args: + f(:code:`str` or binary stream): Either a path or a binary stream + to read the model from + quantiles(:code:`np.ndarray`): Array containing the quantiles + that the model predicts. + + Returns: + The loaded pytorch model. + """ + model = torch.load(f) + return model + + +def handle_input(data, device=None): + """ + Handle input data. + + This function handles data supplied + + - as tuple of :code:`np.ndarray` + - a single :code:`np.ndarray` + - torch :code:`dataloader` + + If a numpy array is provided it is converted to a torch tensor + so that it can be fed into a pytorch model. 
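+
+    A hypothetical call with numpy inputs (names and shapes are illustrative):
+
+        x_t, y_t = handle_input((x_np, y_np), torch.device("cpu"))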
+ """ + if type(data) == tuple: + x, y = data + x = torch.tensor(x, dtype=torch.float) + y = torch.tensor(y, dtype=torch.float) + if not device is None: + x = x.to(device) + y = y.to(device) + return x, y + if type(data) == np.ndarray: + x = torch.tensor(data, dtype=torch.float) + if not device is None: + x = x.to(device) + return x + else: + return data + + +class BatchedDataset(Dataset): + """ + Batches an un-batched dataset. + """ + + def __init__(self, training_data, batch_size): + x, y = training_data + self.x = torch.tensor(x, dtype=torch.float) + self.y = torch.tensor(y, dtype=torch.float) + self.batch_size = batch_size + + def __len__(self): + # This is required because x and y are tensors and don't throw these + # errors themselves. + return self.x.shape[0] // self.batch_size + + def __getitem__(self, i): + if i >= len(self): + raise IndexError() + i_start = i * self.batch_size + i_end = (i + 1) * self.batch_size + x = self.x[i_start:i_end] + y = self.y[i_start:i_end] + return (x, y) + + +################################################################################ +# Quantile loss +################################################################################ + + +class QuantileLoss: + r""" + The quantile loss function + + This function object implements the quantile loss defined as + + + .. math:: + + \mathcal{L}(y_\text{pred}, y_\text{true}) = + \begin{cases} + \tau \cdot |y_\text{pred} - y_\text{true}| & , y_\text{pred} < y_\text{true} \\ + (1 - \tau) \cdot |y_\text{pred} - y_\text{true}| & , \text{otherwise} + \end{cases} + + + as a training criterion for the training of neural networks. The loss criterion + expects a vector :math:`\mathbf{y}_\tau` of predicted quantiles and the observed + value :math:`y`. The loss for a single training sample is computed by summing the losses + corresponding to each quantiles. The loss for a batch of training samples is + computed by taking the mean over all samples in the batch. + """ + + def __init__(self, quantiles, mask=-1.0): + """ + Create an instance of the quantile loss function with the given quantiles. + + Arguments: + quantiles: Array or iterable containing the quantiles to be estimated. + """ + self.quantiles = torch.tensor(quantiles).float() + self.n_quantiles = len(quantiles) + self.mask = np.float32(mask) + + def to(self, device): + self.quantiles = self.quantiles.to(device) + + def __call__(self, y_pred, y_true): + """ + Compute the mean quantile loss for given inputs. + + Arguments: + y_pred: N-tensor containing the predicted quantiles along the last + dimension + y_true: (N-1)-tensor containing the true y values corresponding to + the predictions in y_pred + + Returns: + The mean quantile loss. + """ + dy = y_pred - y_true + n = self.quantiles.size()[0] + qs = self.quantiles.reshape((n,) + (1,) * max(len(dy.size()) - 2, 0)) + l = torch.where(dy >= 0.0, (1.0 - qs) * dy, (-qs) * dy) + if not self.mask is None: + l = torch.where(y_true == self.mask, torch.zeros_like(l), l) + return l.mean() + + +################################################################################ +# QRNN +################################################################################ + + +class PytorchModel: + """ + Quantile regression neural network (QRNN) + + This class implements QRNNs as a fully-connected network with + a given number of layers. + """ + + def __init__(self, input_dimension, quantiles): + """ + Arguments: + input_dimension(int): The number of input features. + quantiles(array): Array of the quantiles to predict. 
+ """
+ self.input_dimension = input_dimension
+ self.quantiles = np.array(quantiles)
+ self.criterion = QuantileLoss(self.quantiles)
+ self.training_errors = []
+ self.validation_errors = []
+ self.backend = "typhon.retrieval.qrnn.models.pytorch"
+
+ def _make_adversarial_samples(self, x, y, eps):
+ self.zero_grad()
+ x.requires_grad = True
+ y_pred = self(x)
+ c = self.criterion(y_pred, y)
+ c.backward()
+ x_adv = x.detach() + eps * torch.sign(x.grad.detach())
+ return x_adv
+
+ def reset(self):
+ """
+ Reinitializes the weights of a model.
+ """
+
+ def reset_function(module):
+ if isinstance(module, nn.Conv2d) or isinstance(module, nn.Linear):
+ module.reset_parameters()
+
+ self.apply(reset_function)
+
+ def train(self, *args, **kwargs):
+ """
+ Train the network.
+
+ This trains the network for the given number of epochs using the
+ provided training and validation data.
+
+ If desired, the training can be augmented using adversarial training.
+ In this case the network is additionally trained with an adversarial
+ batch of examples in each step of the training.
+
+ Arguments:
+ training_data: pytorch dataloader providing the training data
+ validation_data: pytorch dataloader providing the validation data
+ maximum_epochs: the number of epochs to train the network for
+ adversarial_training: whether or not to use adversarial training
+ delta_at: The scaling factor to use for adversarial training.
+ """
+ # Handle overload of the train() method: called without arguments or
+ # with a single boolean, dispatch to nn.Module.train(), which only
+ # sets the training mode.
+ if len(args) < 1 or (len(args) == 1 and type(args[0]) == bool):
+ return nn.Sequential.train(self, *args, **kwargs)
+
+ #
+ # Parse training arguments
+ #
+
+ training_data = args[0]
+ arguments = {
+ "validation_data": None,
+ "batch_size": 256,
+ "sigma_noise": None,
+ "adversarial_training": False,
+ "delta_at": 0.01,
+ "initial_learning_rate": 1e-2,
+ "momentum": 0.0,
+ "convergence_epochs": 5,
+ "learning_rate_decay": 2.0,
+ "learning_rate_minimum": 1e-6,
+ "maximum_epochs": 1,
+ "training_split": 0.9,
+ "gpu": False,
+ }
+ argument_names = arguments.keys()
+ for a, n in zip(args[1:], argument_names):
+ arguments[n] = a
+ for k in kwargs:
+ if k in arguments:
+ arguments[k] = kwargs[k]
+ else:
+ raise ValueError("Unknown argument {}.".format(k))
+
+ validation_data = arguments["validation_data"]
+ batch_size = arguments["batch_size"]
+ sigma_noise = arguments["sigma_noise"]
+ adversarial_training = arguments["adversarial_training"]
+ delta_at = arguments["delta_at"]
+ initial_learning_rate = arguments["initial_learning_rate"]
+ convergence_epochs = arguments["convergence_epochs"]
+ learning_rate_decay = arguments["learning_rate_decay"]
+ learning_rate_minimum = arguments["learning_rate_minimum"]
+ maximum_epochs = arguments["maximum_epochs"]
+ training_split = arguments["training_split"]
+ gpu = arguments["gpu"]
+ momentum = arguments["momentum"]
+
+ #
+ # Determine device to use
+ #
+ if torch.cuda.is_available() and gpu:
+ device = torch.device("cuda")
+ else:
+ device = torch.device("cpu")
+ self.to(device)
+
+ #
+ # Handle input data
+ #
+ try:
+ x, y = handle_input(training_data, device)
+ training_data = BatchedDataset((x, y), batch_size)
+ except (TypeError, ValueError):
+ # training_data is already an iterable dataset and is used as-is.
+ pass
+
+ self.train()
+ self.optimizer = optim.SGD(
+ self.parameters(), lr=initial_learning_rate, momentum=momentum
+ )
+ self.criterion.to(device)
+ scheduler = ReduceLROnPlateau(
+ self.optimizer,
+ factor=1.0 / learning_rate_decay,
+ patience=convergence_epochs,
+ min_lr=learning_rate_minimum,
+ )
+ training_errors = []
+ validation_errors = []
+
+ #
+ # Training loop
+ #
+
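+ # Each epoch makes one pass over the batched training data; batch
+ # losses are accumulated weighted by batch size, so that err / n is
+ # the mean quantile loss per sample reported for the epoch.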
+ for i in range(maximum_epochs):
+ err = 0.0
+ n = 0
+ for j, (x, y) in enumerate(training_data):
+
+ x = x.to(device)
+ y = y.to(device)
+
+ shape = x.size()
+ shape = (shape[0], 1) + shape[2:]
+ y = y.reshape(shape)
+
+ self.optimizer.zero_grad()
+ y_pred = self(x)
+ c = self.criterion(y_pred, y)
+ c.backward()
+ self.optimizer.step()
+
+ err += c.item() * x.size()[0]
+ n += x.size()[0]
+
+ if adversarial_training:
+ self.optimizer.zero_grad()
+ x_adv = self._make_adversarial_samples(x, y, delta_at)
+ y_pred = self(x_adv)
+ c = self.criterion(y_pred, y)
+ c.backward()
+ self.optimizer.step()
+
+ if j % 100 == 0:
+ print(
+ "Epoch {} / {}: Batch {} / {}, Training error: {:.3f}".format(
+ i, maximum_epochs, j, len(training_data), err / n
+ ),
+ end="\r",
+ )
+
+ # Save training error
+ training_errors.append(err / n)
+
+ lr = [group["lr"] for group in self.optimizer.param_groups][0]
+
+ val_err = 0.0
+ if validation_data is not None:
+ n = 0
+ for x, y in validation_data:
+ x = x.to(device).detach()
+ y = y.to(device).detach()
+
+ shape = x.size()
+ shape = (shape[0], 1) + shape[2:]
+ y = y.reshape(shape)
+
+ y_pred = self(x)
+ c = self.criterion(y_pred, y)
+
+ val_err += c.item() * x.size()[0]
+ n += x.size()[0]
+ validation_errors.append(val_err / n)
+
+ print(
+ "Epoch {} / {}: Training error: {:.3f}, Validation error: {:.3f}, Learning rate: {:.5f}".format(
+ i,
+ maximum_epochs,
+ training_errors[-1],
+ validation_errors[-1],
+ lr,
+ )
+ )
+ scheduler.step(val_err / n)
+ else:
+ scheduler.step(err / n)
+ print(
+ "Epoch {} / {}: Training error: {:.3f}, Learning rate: {:.5f}".format(
+ i, maximum_epochs, training_errors[-1], lr
+ )
+ )
+
+ self.training_errors += training_errors
+ self.validation_errors += validation_errors
+ self.eval()
+ return {
+ "training_errors": self.training_errors,
+ "validation_errors": self.validation_errors,
+ }
+
+ def predict(self, x, gpu=False):
+ """
+ Evaluate the model on the given input and return the predicted
+ quantiles as a numpy array.
+ """
+ if torch.cuda.is_available() and gpu:
+ device = torch.device("cuda")
+ else:
+ device = torch.device("cpu")
+ x = handle_input(x, device)
+ self.to(device)
+ return self(x.detach()).detach().cpu().numpy()
+
+ def calibration(self, data, gpu=False):
+ """
+ Computes the calibration of the predictions from the neural network.
+
+ Arguments:
+ data: torch dataloader object providing the data for which to compute
+ the calibration.
+
+ Returns:
+ (intervals, frequencies): Tuple containing the confidence intervals and
+ corresponding observed frequencies.
+ """
+
+ if gpu and torch.cuda.is_available():
+ dev = torch.device("cuda")
+ else:
+ dev = torch.device("cpu")
+ self.to(dev)
+
+ n_intervals = self.quantiles.size // 2
+ qs = self.quantiles
+ intervals = np.array([q_r - q_l for (q_l, q_r) in zip(qs, reversed(qs))])[
+ :n_intervals
+ ]
+ counts = np.zeros(n_intervals)
+
+ total = 0.0
+
+ iterator = tqdm(data)
+ for x, y in iterator:
+ x = x.to(dev).detach()
+ y = y.to(dev).detach()
+ shape = x.size()
+ shape = (shape[0], 1) + shape[2:]
+ y = y.reshape(shape)
+
+ y_pred = self(x)
+ y_pred = y_pred.cpu()
+ y = y.cpu()
+
+ for i in range(n_intervals):
+ l = y_pred[:, [i]]
+ r = y_pred[:, [-(i + 1)]]
+ counts[i] += np.logical_and(y >= l, y < r).sum()
+
+ total += np.prod(y.size())
+ return intervals[::-1], (counts / total)[::-1]
+
+ def save(self, path):
+ """
+ Save QRNN to file.
+
+ Arguments:
+ path: The path of the file in which to store the QRNN.
+ """
+ torch.save(
+ {
+ "input_dimension": self.input_dimension,
+ "quantiles": self.quantiles,
+ "arch": self.arch,
+ "network_state": self.state_dict(),
+ "optimizer_state": self.optimizer.state_dict(),
+ },
+ path,
+ )
+
+ @staticmethod
+ def load(path):
+ """
+ Load QRNN from file.
+
+ Arguments:
+ path: Path of the file where the QRNN was stored.
+ """
+ # Import here to avoid a circular import: the concrete model class
+ # is defined in a module that itself imports this one.
+ from typhon.retrieval.qrnn.models.pytorch.fully_connected import (
+ FullyConnected,
+ )
+ state = torch.load(path, map_location=torch.device("cpu"))
+ keys = ["input_dimension", "quantiles", "arch"]
+ qrnn = FullyConnected(*[state[k] for k in keys])
+ qrnn.load_state_dict(state["network_state"])
+ # The stored optimizer state is only needed when training is resumed,
+ # since the optimizer is recreated in train().
+ return qrnn
diff --git a/typhon/retrieval/qrnn/models/pytorch/fully_connected.py b/typhon/retrieval/qrnn/models/pytorch/fully_connected.py
new file mode 100644
index 00000000..f84983ba
--- /dev/null
+++ b/typhon/retrieval/qrnn/models/pytorch/fully_connected.py
@@ -0,0 +1,51 @@
+"""
+typhon.retrieval.qrnn.models.pytorch.fully_connected
+====================================================
+
+This module provides an implementation of a fully-connected feed forward
+neural network in pytorch.
+"""
+from torch import nn
+from typhon.retrieval.qrnn.models.pytorch.common import PytorchModel, activations
+
+################################################################################
+# Fully-connected network
+################################################################################
+
+
+class FullyConnected(PytorchModel, nn.Sequential):
+ """
+ Pytorch implementation of a fully-connected QRNN model.
+ """
+
+ def __init__(self, input_dimension, quantiles, arch):
+
+ """
+ Create a fully-connected neural network.
+
+ Args:
+ input_dimension(:code:`int`): Number of input features
+ quantiles(:code:`array`): The quantiles to predict given
+ as fractions within [0, 1].
+ arch(tuple): Tuple :code:`(d, w, a)` containing :code:`d`, the
+ number of hidden layers in the network, :code:`w`, the width
+ of the hidden layers and :code:`a`, the name of the activation
+ function to use, given as a string.
+ """
+ PytorchModel.__init__(self, input_dimension, quantiles)
+ output_dimension = self.quantiles.size
+ self.arch = arch
+
+ if len(arch) == 0:
+ layers = [nn.Linear(input_dimension, output_dimension)]
+ else:
+ d, w, act = arch
+ if isinstance(act, str):
+ act = activations[act]
+ layers = [nn.Linear(input_dimension, w)]
+ if act is not None:
+ layers.append(act())
+ for _ in range(d - 1):
+ layers.append(nn.Linear(w, w))
+ if act is not None:
+ layers.append(act())
+ layers.append(nn.Linear(w, output_dimension))
+ nn.Sequential.__init__(self, *layers)
diff --git a/typhon/retrieval/qrnn/models/pytorch/unet.py b/typhon/retrieval/qrnn/models/pytorch/unet.py
new file mode 100644
index 00000000..85b2d8b6
--- /dev/null
+++ b/typhon/retrieval/qrnn/models/pytorch/unet.py
@@ -0,0 +1,250 @@
+"""
+typhon.retrieval.qrnn.models.pytorch.unet
+=========================================
+
+This module provides an implementation of the UNet [unet]_
+architecture.
+
+.. [unet] O. Ronneberger, P. Fischer and T. Brox, "U-net: Convolutional networks
+for biomedical image segmentation", Proc. Int. Conf. Med. Image Comput.
+Comput.-Assist. Intervent. (MICCAI), pp. 234-241, 2015.
+"""
+import torch
+from torch import nn
+
+from typhon.retrieval.qrnn.models.pytorch.common import PytorchModel
+
+
+class Layer(nn.Sequential):
+ """
+ Basic building block of a UNet. Consists of a convolutional
+ layer followed by an activation layer and an optional batch
+ norm layer.
+
+ Args:
+ features_in(:code:`int`): Number of features of input
+ features_out(:code:`int`): Raw number of output features of the
+ layer not including skip connections.
+ kernel_size(:code:`int`): Kernel size to use for the conv. layer.
+ activation(:code:`activation`): Activation to use after conv. layer.
+ batch_norm(:code:`bool`): Whether or not to include a batch norm
+ layer.
+ skip_connection(:code:`bool`): Whether to include skip connections, i.e.
+ to include input in layer output.
+ """
+
+ def __init__(
+ self,
+ features_in,
+ features_out,
+ kernel_size=3,
+ activation=nn.ReLU,
+ batch_norm=True,
+ skip_connection=False,
+ ):
+ self._features_in = features_in
+ self._features_out = features_out
+ self.skip_connection = skip_connection
+
+ modules = [
+ nn.ConstantPad2d(1, 0.0),
+ nn.Conv2d(features_in, features_out, kernel_size),
+ ]
+ if batch_norm:
+ modules.append(nn.BatchNorm2d(features_out))
+ if activation is not None:
+ modules.append(activation())
+ super().__init__(*modules)
+
+ @property
+ def features_out(self):
+ """
+ The number of outgoing channels of the layer.
+ """
+ if self.skip_connection:
+ return self._features_in + self._features_out
+ return self._features_out
+
+ def forward(self, x):
+ """ Forward input through layer. """
+ y = nn.Sequential.forward(self, x)
+ if self.skip_connection:
+ y = torch.cat([x, y], dim=1)
+ return y
+
+
+class Block(nn.Sequential):
+ """
+ A block bundles a set of layers.
+ """
+
+ def __init__(
+ self,
+ features_in,
+ features_out,
+ depth=2,
+ batch_norm=True,
+ activation=nn.ReLU,
+ kernel_size=3,
+ skip_connection=None,
+ ):
+ """
+ Args:
+ features_in(:code:`int`): The number of input features of the block
+ features_out(:code:`int`): The number of output features of the block.
+ depth(:code:`int`): The number of layers of the block
+ batch_norm(:code:`bool`): Whether or not to include batch norm
+ layers in the block.
+ activation(:code:`nn.Module`): Pytorch activation layer to
+ use. :code:`nn.ReLU` by default.
+ kernel_size(:code:`int`): Kernel size to use for the conv. layers.
+ skip_connection(:code:`str`): Whether or not to insert skip
+ connections before all layers (:code:`"all"`) or just at
+ the end (:code:`"end"`).
+ """
+ self._features_in = features_in
+
+ if skip_connection == "all":
+ skip_connection_layer = True
+ self.skip_connection = False
+ elif skip_connection == "end":
+ skip_connection_layer = False
+ self.skip_connection = True
+ else:
+ skip_connection_layer = False
+ self.skip_connection = False
+
+ layers = []
+ nf = features_in
+ for d in range(depth):
+ layers.append(
+ Layer(
+ nf,
+ features_out,
+ activation=activation,
+ batch_norm=batch_norm,
+ kernel_size=kernel_size,
+ skip_connection=skip_connection_layer,
+ )
+ )
+ nf = layers[-1].features_out
+
+ self._features_out = layers[-1].features_out
+ super().__init__(*layers)
+
+ @property
+ def features_out(self):
+ """
+ The number of outgoing channels of the block.
+ """
+ if self.skip_connection:
+ return self._features_in + self._features_out
+ else:
+ return self._features_out
+
+ def forward(self, x):
+ """ Forward input through block. """
+ y = nn.Sequential.forward(self, x)
+ if self.skip_connection:
+ y = torch.cat([x, y], dim=1)
+ return y
+
+
+class DownSampler(nn.Sequential):
+ """
+ A downsampling block reduces the input resolution by applying max-pooling.
+ """
+ def __init__(self):
+ modules = [nn.MaxPool2d(2)]
+ super().__init__(*modules)
+
+
+class UpSampler(nn.Sequential):
+ """
+ An upsampling block increases the input resolution by transposed convolution.
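+
+ With kernel size 3, stride 2, padding 1 and output padding 1, the
+ spatial output size is (H - 1) * 2 - 2 * 1 + 3 + 1 = 2 * H, i.e.
+ the resolution is exactly doubled.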
+ """ + def __init__(self, features_in, features_out): + modules = [ + nn.ConvTranspose2d( + features_in, features_out, 3, padding=1, output_padding=1, stride=2 + ) + ] + super().__init__(*modules) + + +class UNet(PytorchModel, nn.Module): + """ + Pytorch implementation of the UNet architecture for image segmentation. + """ + def __init__( + self, input_features, quantiles, n_features=32, n_levels=4, skip_connection=None + ): + """ + Args: + input_features(``int``): The number of channels of the input image. + quantiles(``np.array``): Array containing the quantiles to predict. + n_features: The number of channels of the first convolution block. + n_level: The number of down-sampling steps. + skip_connection: Whether or not to include skip connections in + each block. + """ + nn.Module.__init__(self) + PytorchModel.__init__(self, input_features, quantiles) + + # Down-sampling blocks + self.down_blocks = nn.ModuleList() + self.down_samplers = nn.ModuleList() + features_in = input_features + features_out = n_features + for i in range(n_levels - 1): + self.down_blocks.append( + Block(features_in, features_out, skip_connection=skip_connection) + ) + self.down_samplers.append(DownSampler()) + features_in = self.down_blocks[-1].features_out + features_out = features_out * 2 + + self.center_block = Block( + features_in, features_out, skip_connection=skip_connection + ) + + self.up_blocks = nn.ModuleList() + self.up_samplers = nn.ModuleList() + features_in = self.center_block.features_out + features_out = features_out // 2 + for i in range(n_levels - 1): + self.up_samplers.append(UpSampler(features_in, features_out)) + features_in = features_out + self.down_blocks[(-i - 1)].features_out + self.up_blocks.append( + Block(features_in, features_out, skip_connection=skip_connection) + ) + features_out = features_out // 2 + features_in = self.up_blocks[-1].features_out + + self.head = nn.Sequential( + nn.Conv2d(features_in, features_in, 1), + nn.ReLU(), + nn.Conv2d(features_in, features_in, 1), + nn.ReLU(), + nn.Conv2d(features_in, quantiles.size, 1), + ) + + def forward(self, x): + """ Propagate input through layer. """ + features = [] + for (b, s) in zip(self.down_blocks, self.down_samplers): + x = b(x) + features.append(x) + x = s(x) + + x = self.center_block(x) + + for (b, u, f) in zip(self.up_blocks, self.up_samplers, features[::-1]): + x = u(x) + x = torch.cat([x, f], 1) + x = b(x) + + self.features = features + + return self.head(x) diff --git a/typhon/retrieval/qrnn/qrnn.py b/typhon/retrieval/qrnn/qrnn.py index 956c375b..72307cae 100644 --- a/typhon/retrieval/qrnn/qrnn.py +++ b/typhon/retrieval/qrnn/qrnn.py @@ -1,701 +1,318 @@ +""" +typhon.retrieval.qrnn.qrnn +========================== + +This module provides the QRNN class, which implements the high-level +functionality of quantile regression neural networks, while the neural +network implementation is left to the model backends implemented in the +``typhon.retrieval.qrnn.models`` submodule. +""" import copy import logging import os import pickle +import importlib +import scipy +from scipy.stats import norm import numpy as np from scipy.interpolate import CubicSpline -# Keras Imports -try: - import keras - from keras.models import Sequential, clone_model - from keras.layers import Dense, Activation, Dropout - from keras.optimizers import SGD -except ImportError: - raise ImportError( - "Could not import the required Keras modules. 
The QRNN " - "implementation was developed for use with Keras version 2.0.9.") - ################################################################################ -# Loss Functions +# Set the backend ################################################################################ -import keras.backend as K - - -logger = logging.getLogger(__name__) - - -def skewed_absolute_error(y_true, y_pred, tau): - """ - The quantile loss function for a given quantile tau: - - L(y_true, y_pred) = (tau - I(y_pred < y_true)) * (y_pred - y_true) - - Where I is the indicator function. - """ - dy = y_pred - y_true - return K.mean((1.0 - tau) * K.relu(dy) + tau * K.relu(-dy), axis=-1) - - -def quantile_loss(y_true, y_pred, taus): - """ - The quantiles loss for a list of quantiles. Sums up the error contribution - from the each of the quantile loss functions. - """ - e = skewed_absolute_error( - K.flatten(y_true), K.flatten(y_pred[:, 0]), taus[0]) - for i, tau in enumerate(taus[1:]): - e += skewed_absolute_error(K.flatten(y_true), - K.flatten(y_pred[:, i + 1]), - tau) - return e - - -class QuantileLoss: - """ - Wrapper class for the quantile error loss function. A class is used here - to allow the implementation of a custom `__repr` function, so that the - loss function object can be easily loaded using `keras.model.load`. - - Attributes: - - quantiles: List of quantiles that should be estimated with - this loss function. - - """ - - def __init__(self, quantiles): - self.__name__ = "QuantileLoss" - self.quantiles = quantiles - - def __call__(self, y_true, y_pred): - return quantile_loss(y_true, y_pred, self.quantiles) - - def __repr__(self): - return "QuantileLoss(" + repr(self.quantiles) + ")" - -################################################################################ -# Keras Interface Classes -################################################################################ - - -class TrainingGenerator: +try: + import typhon.retrieval.qrnn.models.keras as keras + backend = keras +except Exception as e: + try: + import typhon.retrieval.qrnn.models.pytorch as pytorch + backend = pytorch + except: + raise Exception("Couldn't import neither Keras nor Pytorch " + "one of them must be available to use the QRNN" + " module.") + +def set_backend(name): """ - This Keras sample generator takes the noise-free training data - and adds independent Gaussian noise to each of the components - of the input. + Set the neural network package to use as backend. - Attributes: + The currently available backend are "keras" and "pytorch". - x_train: The training input, i.e. the brightness temperatures - measured by the satellite. - y_train: The training output, i.e. the value of the retrieval - quantity. - x_mean: A vector containing the mean of each input component. - x_sigma: A vector containing the standard deviation of each - component. - batch_size: The size of a training batch. + Args: + name(str): The name of the backend. 
""" - - def __init__(self, x_train, x_mean, x_sigma, y_train, sigma_noise, batch_size): - self.bs = batch_size - - self.x_train = x_train - self.x_mean = x_mean - self.x_sigma = x_sigma - self.y_train = y_train - self.sigma_noise = sigma_noise - - self.indices = np.random.permutation(x_train.shape[0]) - self.i = 0 - - def __iter__(self): - logger.info("iter...") - return self - - def __next__(self): - inds = self.indices[np.arange(self.i * self.bs, - (self.i + 1) * self.bs) - % self.indices.size] - x_batch = np.copy(self.x_train[inds, :]) - if not self.sigma_noise is None: - x_batch += np.random.randn(*x_batch.shape) * self.sigma_noise - x_batch = (x_batch - self.x_mean) / self.x_sigma - y_batch = self.y_train[inds] - - self.i = self.i + 1 - - # Shuffle training set after each epoch. - if self.i % (self.x_train.shape[0] // self.bs) == 0: - self.indices = np.random.permutation(self.x_train.shape[0]) - - return (x_batch, y_batch) - -# TODO: Make y-noise argument optional - - -class AdversarialTrainingGenerator: + global backend + if name == "keras": + try: + import typhon.retrieval.qrnn.models.keras as keras + backend = keras + except Exception as e: + raise Exception("The following error occurred while trying " + " to import keras: ", e) + elif name == "pytorch": + try: + import typhon.retrieval.qrnn.models.pytorch as pytorch + backend = pytorch + except Exception as e: + raise Exception("The following error occurred while trying " + " to import pytorch: ", e) + else: + raise Exception("\"{}\" is not a supported backend.".format(name)) + +def get_backend(name): """ - This Keras sample generator takes the noise-free training data - and adds independent Gaussian noise to each of the components - of the input. + Get module object corresponding to the short backend name. - Attributes: + The currently available backend are "keras" and "pytorch". - x_train: The training input, i.e. the brightness temperatures - measured by the satellite. - y_train: The training output, i.e. the value of the retrieval - quantity. - x_mean: A vector containing the mean of each input component. - x_sigma: A vector containing the standard deviation of each - component. - batch_size: The size of a training batch. + Args: + name(str): The name of the backend. 
""" - - def __init__(self, - x_train, - x_mean, - x_sigma, - y_train, - sigma_noise, - batch_size, - input_gradients, - eps): - self.bs = batch_size - - self.x_train = x_train - self.x_mean = x_mean - self.x_sigma = x_sigma - self.y_train = y_train - self.sigma_noise = sigma_noise - - self.indices = np.random.permutation(x_train.shape[0]) - self.i = 0 - - # compile gradient function - bs2 = self.bs // 2 - - self.input_gradients = input_gradients - self.eps = eps - - def __iter__(self): - logger.info("iter...") - return self - - def __next__(self): - - if self.i == 0: - inds = np.random.randint(0, self.x_train.shape[0], self.bs) - - x_batch = np.copy(self.x_train[inds, :]) - if (self.sigma_noise): - x_batch += np.random.randn(*x_batch.shape) * self.sigma_noise - - x_batch = (x_batch - self.x_mean) / self.x_sigma - y_batch = self.y_train[inds] - - else: - - bs2 = self.bs // 2 - inds = np.random.randint(0, self.x_train.shape[0], bs2) - - x_batch = np.zeros((self.bs, self.x_train.shape[1])) - y_batch = np.zeros((self.bs, 1)) - - x_batch[:bs2, :] = np.copy(self.x_train[inds, :]) - if (self.sigma_noise): - x_batch[:bs2, :] += np.random.randn(bs2, self.x_train.shape[1]) \ - * self.sigma_noise - x_batch[:bs2, :] = (x_batch[:bs2, :] - self.x_mean) / self.x_sigma - y_batch[:bs2, :] = self.y_train[inds].reshape(-1, 1) - x_batch[bs2:, :] = x_batch[:bs2, :] - y_batch[bs2:, :] = y_batch[:bs2, :] - - if (self.i > 10): - grads = self.input_gradients( - [x_batch[:bs2, :], y_batch[:bs2, :], [1.0]])[0] - x_batch[bs2:, :] += self.eps * np.sign(grads) - - self.i = self.i + 1 - return (x_batch, y_batch) - - -# TODO: Make y-noise argument optional -class ValidationGenerator: + if name == "keras": + try: + import typhon.retrieval.qrnn.models.keras as keras + backend = keras + except Exception as e: + raise Exception("The following error occurred while trying " + " to import keras: ", e) + elif name == "pytorch": + try: + import typhon.retrieval.qrnn.models.pytorch as pytorch + backend = pytorch + except Exception as e: + raise Exception("The following error occurred while trying " + " to import pytorch: ", e) + else: + raise Exception("\"{}\" is not a supported backend.".format(name)) + return backend + +def fit_gaussian_to_quantiles(y_pred, taus): """ - This Keras sample generator is similar to the training generator - only that it returns the whole validation set and doesn't perform - any randomization. - - Attributes: - - x_val: The validation input, i.e. the brightness temperatures - measured by the satellite. - y_val: The validation output, i.e. the value of the retrieval - quantity. - x_mean: A vector containing the mean of each input component. - x_sigma: A vector containing the standard deviation of each - component. + Fits Gaussian distributions to predicted quantiles. + + Fits mean and standard deviation values to quantiles by minimizing + the mean squared distance of the predicted quantiles and those of + the corresponding Gaussian distribution. + + Args: + y_pred (``np.array``): Array of shape `(n, m)` containing the `m` + predicted quantiles for n different inputs. + taus(``np.array``): Array of shape `(m,)` containing the quantile + fractions corresponding to the predictions in ``y_pred``. + + Returns: + Tuple ``(mu, sigma)`` of vectors of size `n` containing the mean and + standard deviations of the Gaussian distributions corresponding to + the predictions in ``y_pred``. 
""" + x = norm.ppf(taus) - def __init__(self, x_val, x_mean, x_sigma, y_val, sigma_noise): - self.x_val = x_val - self.x_mean = x_mean - self.x_sigma = x_sigma - - self.y_val = y_val + d2e_00 = x.size + d2e_01 = x.sum(axis=-1) + d2e_10 = x.sum(axis=-1) + d2e_11 = np.sum(x ** 2, axis=-1) - self.sigma_noise = sigma_noise + d2e_det_inv = 1.0 / (d2e_00 * d2e_11 - d2e_01 * d2e_11) + d2e_inv_00 = d2e_det_inv * d2e_11 + d2e_inv_01 = -d2e_det_inv * d2e_01 + d2e_inv_10 = -d2e_det_inv * d2e_10 + d2e_inv_11 = d2e_det_inv * d2e_00 - def __iter__(self): - return self + de_0 = -np.sum(y_pred - x, axis=-1) + de_1 = -np.sum(x * (y_pred - x), axis=-1) - def __next__(self): - x_val = np.copy(self.x_val) - if not self.sigma_noise is None: - x_val += np.random.randn(*self.x_val.shape) * self.sigma_noise - x_val = (x_val - self.x_mean) / self.x_sigma - return (x_val, self.y_val) + mu = -(d2e_inv_00 * de_0 + d2e_inv_01 * de_1) + sigma = 1.0 - (d2e_inv_10 * de_0 + d2e_inv_11 * de_1) + return mu, sigma -class LRDecay(keras.callbacks.Callback): +def create_model(input_dim, + output_dim, + arch): """ - The LRDecay class implements the Keras callback interface and reduces - the learning rate according to validation loss reduction. - - Attributes: - - lr_decay: The factor c > 1.0 by which the learning rate is - reduced. - lr_minimum: The training is stopped when this learning rate - is reached. - convergence_steps: The number of epochs without validation loss - reduction required to reduce the learning rate. - + Creates a fully-connected neural network from a tuple + describing its architecture. + + Args: + input_dim(int): Number of input features. + output_dim(int): Number of output features. + arch: Tuple (d, w, a) containing the depth, i.e. number of + hidden layers width of the hidden layers, i. e. + the number of neurons in them, and the name of the + activation function as string. + Return: + Depending on the available backends, a fully-connected + keras or pytorch model, with the requested number of hidden + layers and neurons in them. """ + return backend.FullyConnected(input_dim, output_dim, arch) - def __init__(self, model, lr_decay, lr_minimum, convergence_steps): - self.model = model - self.lr_decay = lr_decay - self.lr_minimum = lr_minimum - self.convergence_steps = convergence_steps - self.steps = 0 - - def on_train_begin(self, logs={}): - self.losses = [] - self.steps = 0 - self.min_loss = 1e30 - - def on_epoch_end(self, epoch, logs={}): - self.losses += [logs.get('val_loss')] - if not self.losses[-1] < self.min_loss: - self.steps = self.steps + 1 - else: - self.steps = 0 - if self.steps > self.convergence_steps: - lr = keras.backend.get_value(self.model.optimizer.lr) - keras.backend.set_value( - self.model.optimizer.lr, lr / self.lr_decay) - self.steps = 0 - logger.info("\n Reduced learning rate to " + str(lr)) - - if lr < self.lr_minimum: - self.model.stop_training = True - - self.min_loss = min(self.min_loss, self.losses[-1]) ################################################################################ -# QRNN +# QRNN class ################################################################################ - class QRNN: r""" Quantile Regression Neural Network (QRNN) - This class implements quantile regression neural networks and can be used - to estimate quantiles of the posterior distribution of remote sensing - retrievals. + This class provides a high-level implementation of quantile regression + neural networks. 
It can be used to estimate quantiles of the posterior + distribution of remote sensing retrievals. - Internally, the QRNN uses a feed-forward neural network that is trained - to minimize the quantile loss function + The :class:`QRNN`` class uses an arbitrary neural network model, that is + trained to minimize the quantile loss function .. math:: \mathcal{L}_\tau(y_\tau, y_{true}) = \begin{cases} (1 - \tau)|y_\tau - y_{true}| & \text{ if } y_\tau < y_\text{true} \\ \tau |y_\tau - y_\text{true}| & \text{ otherwise, }\end{cases} - where :math:`x_\text{true}` is the expected value of the retrieval quantity + where :math:`x_\text{true}` is the true value of the retrieval quantity and and :math:`x_\tau` is the predicted quantile. The neural network has one output neuron for each quantile to estimate. - For the training, this implementation provides custom data generators that - can be used to add Gaussian noise to the training data as well as adversarial - training using the fast gradient sign method. + The QRNN class provides a generic QRNN implementation in the sense that it + does not assume a fixed neural network architecture or implementation. + Instead, this functionality is off-loaded to a model object, which can be + an arbitrary regression network such as a fully-connected or a + convolutional network. A range of different models are provided in the + typhon.retrieval.qrnn.models module. The :class:`QRNN`` class just + implements high-level operation on the QRNN output while training and + prediction are delegated to the model object. For details on the respective + implementation refer to the documentation of the corresponding model class. - This implementation also provides functionality to use an ensemble of networks - instead of just a single network to predict the quantiles. + .. note:: - .. note:: For the QRNN I am using :math:`x` to denote the input vector and - :math:`y` to denote the output. While this is opposed to typical - inverse problem notation, it is inline with machine learning - notation and felt more natural for the implementation. If this - annoys you, I am sorry. But the other way round it would have - annoyed other people and in particular me. + For the QRNN implementation :math:`x` is used to denote the input + vector and :math:`y` to denote the output. While this is opposed + to inverse problem notation typically used for retrievals, it is + in line with machine learning notation and felt more natural for + the implementation. If this annoys you, I am sorry. Attributes: - - input_dim (int): - The input dimension of the neural network, i.e. the dimension of the - measurement vector. - + backend(``str``): + The name of the backend used for the neural network model. quantiles (numpy.array): - The 1D-array containing the quantiles :math:`\tau \in [0, 1]` that the - network learns to predict. - - depth (int): - The number layers in the network excluding the input layer. - - width (int): - The width of the hidden layers in the network. - - activation (str): - The name of the activation functions to use in the hidden layers - of the network. - - models (list of keras.models.Sequential): - The ensemble of Keras neural networks used for the quantile regression - neural network. + The 1D-array containing the quantiles :math:`\tau \in [0, 1]` + that the network learns to predict. + model: + The neural network regression model used to predict the quantiles. 
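+
+ Example (illustrative)::
+
+ qrnn = QRNN(input_dimensions=16,
+ quantiles=[0.05, 0.5, 0.95],
+ model=(3, 128, "relu"))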
""" - def __init__(self, - input_dim, - quantiles, - depth=3, - width=128, - activation="relu", + input_dimensions, + quantiles=None, + model=(3, 128, "relu"), ensemble_size=1, **kwargs): """ Create a QRNN model. Arguments: - - input_dim(int): The dimension of the measurement space, i.e. the number - of elements in a single measurement vector y - - quantiles(np.array): 1D-array containing the quantiles to estimate of - the posterior distribution. Given as fractions - within the range [0, 1]. - - depth(int): The number of hidden layers in the neural network to - use for the regression. Default is 3, i.e. three hidden - plus input and output layer. - - width(int): The number of neurons in each hidden layer. - - activation(str): The name of the activation functions to use. Default - is "relu", for rectified linear unit. See - `this `_ link for - available functions. - - **kwargs: Additional keyword arguments are passed to the constructor - call `keras.layers.Dense` of the hidden layers, which can - for example be used to add regularization. For more info consult - `Keras documentation. `_ + input_dimensions(int): + The dimension of the measurement space, i.e. the + number of elements in a single measurement vector y + quantiles(np.array): + 1D-array containing the quantiles to estimate of + the posterior distribution. Given as fractions within the range + [0, 1]. + model: + A (possibly trained) model instance or a tuple + ``(d, w, act)`` describing the architecture of a fully-connected + neural network with :code:`d` hidden layers with :code:`w` neurons + and :code:`act` activation functions. """ - self.input_dim = input_dim + self.input_dimensions = input_dimensions self.quantiles = np.array(quantiles) - self.depth = depth - self.width = width - self.activation = activation - - model = Sequential() - if depth == 0: - model.add(Dense(input_dim=input_dim, - units=len(quantiles), - activation=None)) + self.backend = backend.__name__ + + if type(model) == tuple: + self.model = backend.FullyConnected(self.input_dimensions, + self.quantiles, + model) + if quantiles is None: + raise ValueError("If model is given as architecture tuple, the" + " 'quantiles' kwarg must be provided.") else: - model.add(Dense(input_dim=input_dim, - units=width, - activation=activation)) - for i in range(depth - 2): - model.add(Dense(units=width, - activation=activation, - **kwargs)) - model.add(Dense(units=len(quantiles), activation=None)) - self.models = [clone_model(model) for i in range(ensemble_size)] - - def __fit_params__(self, kwargs): - at = kwargs.pop("adversarial_training", False) - dat = kwargs.pop("delta_at", 0.01) - batch_size = kwargs.pop("batch_size", 512) - convergence_epochs = kwargs.pop("convergence_epochs", 10) - initial_learning_rate = kwargs.pop('initial_learning_rate', 0.01) - learning_rate_decay = kwargs.pop('learning_rate_decay', 2.0) - learning_rate_minimum = kwargs.pop('learning_rate_minimum', 1e-6) - maximum_epochs = kwargs.pop("maximum_epochs", 200) - training_split = kwargs.pop("training_split", 0.9) - return at, dat, batch_size, convergence_epochs, initial_learning_rate, \ - learning_rate_decay, learning_rate_minimum, maximum_epochs, \ - training_split, kwargs - - def cross_validation(self, - x_train, - y_train, - sigma_noise = None, - n_folds=5, - s=None, - **kwargs): - r""" - Perform n-fold cross validation. - - This function trains the network n times on different subsets of the - provided training data, always keeping a fraction of 1/n samples apart - for testing. 
Performance for each of the networks is evaluated and mean - and standard deviation for all folds are returned. This is to reduce - the influence of random fluctuations of the network performance during - hyperparameter tuning. - - Arguments: - - x_train(numpy.array): Array of shape :code:`(m, n)` containing the - m n-dimensional training inputs. - - y_train(numpy.array): Array of shape :code:`(m, 1)` containing the - m training outputs. - - sigma_noise(None, float, np.array): If not `None` this value is used - to multiply the Gaussian noise - that is added to each training - batch. If None no noise is - added. - - n_folds(int): Number of folds to perform for the cross correlation. - - s(callable, None): Performance metric for the fold. If not None, - this should be a function object taking as - arguments :code:`(y_test, y_pred)`, i.e. the - expected output for the given fold :code:`y_test` - and the predicted output :code:`y_pred`. The - returned value is taken as performance metric. - - **kwargs: Additional keyword arguments are passed on to the :code:`fit` - method that is called for each fold. + if not quantiles is None: + if not quantiles == model.quantiles: + raise ValueError("Provided quantiles do not match those of " + "the provided model.") + + self.model = model + self.quantiles = model.quantiles + self.backend = model.backend + + def train(self, + training_data, + validation_data=None, + batch_size=256, + sigma_noise=None, + adversarial_training=False, + delta_at=0.01, + initial_learning_rate=1e-2, + momentum=0.0, + convergence_epochs=5, + learning_rate_decay=2.0, + learning_rate_minimum=1e-6, + maximum_epochs=200, + training_split=0.9, + gpu = False): """ - - n = x_train.shape[0] - n_test = n // n_folds - inds = np.random.permutation(np.arange(0, n)) - - results = [] - - # Nomenclature is a bit difficult here: - # Each cross validation fold has its own training, - # vaildation and test set. The size of the test set - # is number of provided training samples divided by the - # number of fold. The rest is used a traning and internal - # validation set according to the chose training_split - # ratio. - - - for i in range(n_folds): - for m in self.models: - m.reset_states() - - # Indices to use for training including training data and internal - # validation set to monitor convergence. - inds_train = np.append(np.arange(0, i * n_test), - np.arange(min((i + 1) * n_test, n), n)) - inds_train = inds[inds_train] - # Indices used to evaluate performance of the model. - inds_test = np.arange(i * n_test, (i + 1) * n_test) - inds_test = inds[inds_test] - - x_test_fold = x_train[inds_test, :] - y_test_fold = y_train[inds_test] - - # Splitting training and validation set. - x_train_fold = x_train[inds_train, :] - y_train_fold = y_train[inds_train] - - self.fit(x_train_fold, y_train_fold, - sigma_noise, **kwargs) - - # Evaluation on this folds test set. - if s: - y_pred = self.predict(x_test_fold) - results += [s(y_pred, y_test_fold)] - else: - loss = self.models[0].evaluate( - (x_test_fold - self.x_mean) / self.x_sigma, - y_test_fold) - logger.info(loss) - results += [loss] - logger.info(results) - results = np.array(results) - logger.info(results) - return (np.mean(results, axis=0), np.std(results, axis=0)) - - def fit(self, - x_train, - y_train, - sigma_noise=None, - x_val=None, - y_val=None, - **kwargs): - r""" - Train the QRNN on given training data. - - The training uses an internal validation set to monitor training - progress. 
This can be either split at random from the training - data (see `training_fraction` argument) or provided explicitly - using the `x_val` and `y_val` parameters - - Training of the QRNN is performed using stochastic gradient descent - (SGD). The learning rate is adaptively reduced during training when - the loss on the internal validation set has not been reduced for a - certain number of epochs. - - Two data augmentation techniques can be used to improve the - calibration of the QRNNs predictions. The first one adds Gaussian - noise to training batches before they are passed to the network. - The noise statistics are defined using the `sigma_noise` argument. - The second one is adversarial training. If adversarial training is - used, half of each training batch consists of adversarial samples - generated using the fast gradient sign method. The strength of the - perturbation is controlled using the `delta_at` parameter. - - During one epoch, each training sample is passed once to the network. - Their order is randomzied between epochs. - - Arguments: - - x_train(np.array): Array of shape `(n, m)` containing n training - samples of dimension m. - - y_train(np.array): Array of shape `(n, )` containing the training - output corresponding to the training data in - `x_train`. - - sigma_noise(None, float, np.array): If not `None` this value is used - to multiply the Gaussian noise - that is added to each training - batch. If None no noise is - added. - x_val(np.array): Array of shape :code:`(n', m)` containing n' validation - inputs that will be used to monitor training loss. Must - be provided in unison with :code:`y_val` or otherwise - will be ignored. - - y_val(np.array): Array of shape :code:`(n')` containing n' validation - outputs corresponding to the inputs in :code:`x_val`. - Must be provided in unison with :code:`x_val` or - otherwise will be ignored. - - adversarial_training(Bool): Whether or not to use adversarial training. - `False` by default. - - delta_at(flaot): Perturbation factor for the fast gradient sign method - determining the strength of the adversarial training - perturbation. `0.01` by default. - - batch_size(float): The batch size to use during training. Defaults to `512`. - - convergence_epochs(int): The number of epochs without decrease in - validation loss before the learning rate - is reduced. Defaults to `10`. - - initial_learning_rate(float): The inital value for the learning - rate. - - learning_rate_decay(float): The factor by which to reduce the - learning rate after no improvement - on the internal validation set was - observed for `convergence_epochs` - epochs. Defaults to `2.0`. - - learning_rate_minimum(float): The minimum learning rate at which - the training is terminated. Defaults - to `1e-6`. - - maximum_epochs(int): The maximum number of epochs to perform if - training does not terminate before. - - training_split(float): The ratio `0 < ts < 1.0` of the samples in - to be used as internal validation set. Defaults - to 0.9. - + Train model on given training data. + + The training is performed on the provided training data and an + optionally-provided validation set. Training can use the following + augmentation methods: + - Gaussian noise added to input + - Adversarial training + The learning rate is decreased gradually when the validation or training + loss did not decrease for a given number of epochs. + + Args: + training_data: Tuple of numpy arrays of a dataset object to use to + train the model. 
+ validation_data: Optional validation data in the same format as the + training data. + batch_size: If training data is provided as arrays, this batch size + will be used to for the training. + sigma_noise: If training data is provided as arrays, training data + will be augmented by adding noise with the given standard + deviations to each input vector before it is presented to the + model. + adversarial_training(``bool``): Whether or not to perform + adversarial training using the fast gradient sign method. + delta_at: The scaling factor to apply for adversarial training. + initial_learning_rate(``float``): The learning rate with which the + training is started. + momentum(``float``): The momentum to use for training. + convergence_epochs(``int``): The number of epochs with + non-decreasing loss before the learning rate is decreased + learning_rate_decay(``float``): The factor by which the learning rate + is decreased. + learning_rate_minimum(``float``): The learning rate at which the + training is aborted. + maximum_epochs(``int``): For how many epochs to keep training. + training_split(``float``): If no validation data is provided, this + is the fraction of training data that is used for validation. + gpu(``bool``): Whether or not to try to run the training on the GPU. """ - if not (x_train.shape[1] == self.input_dim): - raise Exception("Training input must have the same extent along" / - "dimension 1 as input_dim (" + str(self.input_dim) / - + ")") - - if not (y_train.shape[1] == 1): - raise Exception("Currently only scalar retrieval targets are" / - " supported.") - - x_mean = np.mean(x_train, axis=0, keepdims=True) - x_sigma = np.std(x_train, axis=0, keepdims=True) - self.x_mean = x_mean - self.x_sigma = x_sigma - - # Handle parameters - # at: adversarial training - # bs: batch size - # ce: convergence epochs - # ilr: initial learning rate - # lrd: learning rate decay - # lrm: learning rate minimum - # me: maximum number of epochs - # ts: split ratio of training set - at, dat, bs, ce, ilr, lrd, lrm, me, ts, kwargs = self.__fit_params__( - kwargs) - - # Split training and validation set if x_val or y_val - # are not provided. 
- n = x_train.shape[0] - n_train = n - if x_val is None and y_val is None: - n_train = round(ts * n) - n_val = n - n_train - inds = np.random.permutation(n) - x_val = x_train[inds[n_train:], :] - y_val = y_train[inds[n_train:]] - x_train = x_train[inds[:n_train], :] - y_train = y_train[inds[:n_train]] - loss = QuantileLoss(self.quantiles) - - self.custom_objects = {loss.__name__: loss} - for model in self.models: - optimizer = SGD(lr=ilr) - model.compile(loss=loss, optimizer=optimizer) - - if at: - inputs = [model.input, model.targets[0], - model.sample_weights[0]] - input_gradients = K.function( - inputs, K.gradients(model.total_loss, model.input)) - training_generator = AdversarialTrainingGenerator(x_train, - self.x_mean, - self.x_sigma, - y_train, - sigma_noise, - bs, - input_gradients, - dat) - else: - training_generator = TrainingGenerator(x_train, self.x_mean, self.x_sigma, - y_train, sigma_noise, bs) - validation_generator = ValidationGenerator(x_val, self.x_mean, self.x_sigma, - y_val, sigma_noise) - lr_callback = LRDecay(model, lrd, lrm, ce) - model.fit_generator(training_generator, steps_per_epoch=n_train // bs, - epochs=me, validation_data=validation_generator, - validation_steps=1, callbacks=[lr_callback]) + return self.model.train(training_data, + validation_data, + batch_size, + sigma_noise, + adversarial_training, + delta_at, + initial_learning_rate, + momentum, + convergence_epochs, + learning_rate_decay, + learning_rate_minimum, + maximum_epochs, + training_split, + gpu) def predict(self, x): r""" @@ -715,9 +332,7 @@ def predict(self, x): quantiles of the network. """ - predictions = np.stack( - [m.predict((x - self.x_mean) / self.x_sigma) for m in self.models]) - return np.mean(predictions, axis=0) + return self.model.predict(x) def cdf(self, x): r""" @@ -726,16 +341,17 @@ def cdf(self, x): Propagates the inputs in `x` forward through the network and approximates the posterior CDF by a piecewise linear function. - The piecewise linear function is given by its values at - approximate quantiles $x_\tau$ for - :math: `\tau = \{0.0, \tau_1, \ldots, \tau_k, 1.0\}` where - :math: `\tau_k` are the quantiles to be estimated by the network. - The values for :math:`x_0.0` and :math:`x_1.0` are computed using + The piecewise linear function is given by its values at approximate + quantiles :math:`x_\tau`` for :math:`\tau = \{0.0, \tau_1, \ldots, + \tau_k, 1.0\}` where :math:`\tau_k` are the quantiles to be estimated + by the network. The values for :math:`x_{0.0}` and :math:`x_{1.0}` are + computed using .. math:: - x_0.0 = 2.0 x_{\tau_1} - x_{\tau_2} - x_1.0 = 2.0 x_{\tau_k} - x_{\tau_{k-1}} + x_{0.0} = 2.0 x_{\tau_1} - x_{\tau_2} + + x_{1.0} = 2.0 x_{\tau_k} - x_{\tau_{k-1}} Arguments: @@ -744,14 +360,19 @@ def cdf(self, x): Returns: - Tuple (xs, fs) containing the :math: `x`-values in `xs` and corresponding - values of the posterior CDF :math: `F(x)` in `fs`. + Tuple (xs, fs) containing the :math:`x`-values in `xs` and corresponding + values of the posterior CDF :math:`F(x)` in `fs`. 
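+
+ For an input ``x`` of shape `(n, m)`, ``xs`` has shape `(n, k + 2)`
+ and ``fs`` has shape `(k + 2,)`, where `k` is the number of
+ quantiles predicted by the network.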
""" - y_pred = np.zeros(self.quantiles.size + 2) - y_pred[1:-1] = self.predict(x) - y_pred[0] = 2.0 * y_pred[1] - y_pred[2] - y_pred[-1] = 2.0 * y_pred[-2] - y_pred[-3] + if len(x.shape) > 1: + s = x.shape[:-1] + (self.quantiles.size + 2,) + else: + s = (1, self.quantiles.size + 2) + + y_pred = np.zeros(s) + y_pred[:, 1:-1] = self.predict(x) + y_pred[:, 0] = 2.0 * y_pred[:, 1] - y_pred[:, 2] + y_pred[:, -1] = 2.0 * y_pred[:, -2] - y_pred[:, -3] qs = np.zeros(self.quantiles.size + 2) qs[1:-1] = self.quantiles @@ -760,88 +381,72 @@ def cdf(self, x): return y_pred, qs - def pdf(self, x, use_splines = False): + def calibration(self, *args, **kwargs): + """ + Compute calibration curve for the given dataset. + """ + return self.model.calibration(*args, *kwargs) + + def pdf(self, x): r""" Approximate the posterior probability density function (PDF) for given - inputs `x`. + inputs ``x``. - By default, the PDF is approximated by computing the derivative of the - piece-wise linear approximation of the CDF as computed by the :code:`cdf` - function. - - If :code:`use_splines` is set to :code:`True`, the PDF is computed from - a spline fit to the approximate CDF. + The PDF is approximated by computing the derivative of the piece-wise + linear approximation of the CDF as computed by the + :py:meth:`typhon.retrieval.qrnn.QRNN.cdf` function. Arguments: x(np.array): Array of shape `(n, m)` containing `n` inputs for which - to predict the conditional quantiles. - - use_splines(bool): Whether or not to use a spline fit to the CDF to - approximate the PDF. + to predict PDFs. Returns: - Tuple (xs, fs) containing the :math: `x`-values in `xs` and corresponding - values of the approximate posterior PDF :math: `F(x)` in `fs`. + Tuple (x_pdf, y_pdf) containing the array with shape `(n, k)` containing + the x and y coordinates describing the PDF for the inputs in ``x``. """ + x_cdf, y_cdf = self.cdf(x) + n, m = x_cdf.shape + x_pdf = np.zeros((n, m + 1)) - y_pred = np.zeros(self.quantiles.size) - y_pred = self.predict(x).ravel() + x_pdf[:, 0] = x_cdf[:, 0] + x_pdf[:, -1] = x_cdf[:, -1] + x_pdf[:, 1:-1] = 0.5 * (x_cdf[:, 1:] + x_cdf[:, :-1]) - y = np.zeros(y_pred.size + 1) - y[1:-1] = 0.5 * (y_pred[1:] + y_pred[:-1]) - y[0] = 2 * y_pred[0] - y_pred[1] - y[-1] = 2 * y_pred[-1] - y_pred[-2] - - if not use_splines: - - p = np.zeros(y.size) - p[1:-1] = np.diff(self.quantiles) / np.diff(y_pred) - else: + y_pdf = np.zeros((n, m + 1)) + y_pdf[:, 1:-1] = np.diff(y_cdf) / np.diff(x_cdf, axis=-1) + return x_pdf, y_pdf - y = np.zeros(y_pred.size + 2) - y[1:-1] = y_pred - y[0] = 3 * y_pred[0] - 2 * y_pred[1] - y[-1] = 3 * y_pred[-1] - 2 * y_pred[-2] - q = np.zeros(self.quantiles.size + 2) - q[1:-1] = np.array(self.quantiles) - q[0] = 0.0 - q[-1] = 1.0 - - sr = CubicSpline(y, q, bc_type = "clamped") - y = np.linspace(y[0], y[-1], 101) - p = sr(y, nu = 1) - - return y, p + def sample_posterior(self, x, n=1): + r""" + Generates :code:`n` samples from the estimated posterior + distribution for the input vector :code:`x`. The sampling + is performed by the inverse CDF method using the estimated + CDF obtained from the :code:`cdf` member function. + Arguments: - y_pred = np.zeros(self.quantiles.size + 2) - y_pred[1:-1] = self.predict(x) - y_pred[0] = 2.0 * y_pred[1] - y_pred[2] - y_pred[-1] = 2.0 * y_pred[-2] - y_pred[-3] + x(np.array): Array of shape `(n, m)` containing `n` inputs for which + to predict the conditional quantiles. 
- if use_splines: - x_t = np.zeros(x.size + 2) - x_t[1:-1] = x - x_t[0] = 2 * x[0] - x[1] - x_t[-1] = 2 * x[-1] - x[-2] - y_t = np.zeros(y.size + 2) - y_t[1:-1] = y - y_t[-1] = 1.0 + n(int): The number of samples to generate. - else: - logger.info(y) - x_new = np.zeros(x.size - 1) - x_new[2:-2] = 0.5 * (x[2:-3] + x[3:-2]) - x_new[0:2] = x[0:2] - x_new[-2:] = x[-2:] - y_new = np.zeros(y.size - 1) - y_new[1:-1] = np.diff(y[1:-1]) / np.diff(x[1:-1]) - return x_new, y_new + Returns: - def sample_posterior(self, x, n=1): + Tuple (xs, fs) containing the :math:`x`-values in `xs` and corresponding + values of the posterior CDF :math: `F(x)` in `fs`. + """ + result = np.zeros((x.shape[0], n)) + x_cdf, y_cdf = self.cdf(x) + for i in range(x_cdf.shape[0]): + p = np.random.rand(n) + y = np.interp(p, y_cdf, x_cdf[i, :]) + result[i, :] = y + return result + + def sample_posterior_gaussian_fit(self, x, n=1): r""" Generates :code:`n` samples from the estimated posterior distribution for the input vector :code:`x`. The sampling @@ -857,13 +462,14 @@ def sample_posterior(self, x, n=1): Returns: - Tuple (xs, fs) containing the :math: `x`-values in `xs` and corresponding + Tuple (xs, fs) containing the :math:`x`-values in `xs` and corresponding values of the posterior CDF :math: `F(x)` in `fs`. """ - y_pred, qs = self.cdf(x) - p = np.random.rand(n) - y = np.interp(p, qs, y_pred) - return y + result = np.zeros((x.shape[0], n)) + y_pred = self.predict(x) + mu, sigma = fit_gaussian_to_quantiles(y_pred, self.quantiles) + x = np.random.normal(size=(y_pred.shape[0], n)) + return mu.reshape(-1, 1) + sigma.reshape(-1, 1) * x def posterior_mean(self, x): r""" @@ -879,7 +485,7 @@ def posterior_mean(self, x): Array containing the posterior means for the provided inputs. """ y_pred, qs = self.cdf(x) - mus = y_pred[-1] - np.trapz(qs, x=y_pred) + mus = y_pred[:, -1] - np.trapz(qs, x=y_pred) return mus @staticmethod @@ -929,22 +535,19 @@ def crps(y_pred, y_test, quantiles): return np.trapz((qs - ind)**2.0, y_cdf) - def evaluate_crps(self, x, y_test): + def evaluate_crps(self, x_test, y_test): r""" Predict quantiles and compute the Continuous Ranked Probability Score (CRPS). - This function evaluates the networks prediction on the - inputs in `x` and evaluates the CRPS of the predictions - against the materializations in `y_test`. + This function evaluates the network predictions on the test data + ``x_test`` and ``y_test`` and and evaluates the CRPS. Arguments: - x(numpy.array): Array of shape `(n, m)` containing the `n` - `m`-dimensional inputs for which to evaluate - the CRPS. + x_test(numpy.array): Array of shape `(n, m)` input test data. - y_test(numpy.array): Array containing the `n` materializations of - the true conditional distribution. + y_test(numpy.array): Array of length n containing the output test + data. Returns: @@ -952,45 +555,40 @@ def evaluate_crps(self, x, y_test): inputs in `x`. """ - return QRNN.crps(self.predict(x), y_test, self.quantiles) - - def save(self, path): - r""" - Store the QRNN model in a file. - - This stores the model to a file using pickle for all - attributes that support pickling. The Keras model - is handled separately, since it can not be pickled. - - .. note:: In addition to the model file with the given filename, - additional files suffixed with :code:`_model_i` will be - created for each neural network this model consists of. - - Arguments: - - path(str): The path including filename indicating where to - store the model. 
+        return QRNN.crps(self.predict(x_test), y_test, self.quantiles)

+    def classify(self, x, threshold):
         """
+        Compute the probability that the retrieval output exceeds the given
+        numeric threshold, based on the piece-wise linear posterior CDF.
-        f = open(path, "wb")
-        filename = os.path.basename(path)
-        name = os.path.splitext(filename)[0]
-        dirname = os.path.dirname(path)
-
-        self.model_files = []
-        for i, m in enumerate(self.models):
-            self.model_files += [name + "_model_" + str(i)]
-            m.save(os.path.join(dirname, self.model_files[i]))
-        pickle.dump(self, f)
-        f.close()

+        Args:
+            x: The input data as :code:`np.ndarray` or backend-specific
+               dataset object.
+            threshold: The numeric threshold to apply for classification.
+
+        Returns:
+            Array containing, for each input, the probability that the
+            output exceeds ``threshold``.
+        """
+        y = self.predict(x)
+        out_shape = y.shape[:1] + (1,) + y.shape[2:]
+        c = self.quantiles[0] * np.ones(out_shape)
+
+        for i in range(self.quantiles.size - 1):
+            q_l = y[:, [i]]
+            q_r = y[:, [i+1]]
+            inds = np.logical_and(q_l < threshold,
+                                  q_r >= threshold)
+            # Linearly interpolate the CDF value at the threshold between
+            # the two bracketing quantile fractions.
+            c[inds] = self.quantiles[i] * (q_r[inds] - threshold)
+            c[inds] += self.quantiles[i + 1] * (threshold - q_l[inds])
+            c[inds] /= (q_r[inds] - q_l[inds])
+
+        # Inputs whose highest predicted quantile lies below the threshold.
+        c[threshold > q_r] = self.quantiles[-1]
+        return 1.0 - c

     @staticmethod
     def load(path):
         r"""
         Load a model from a file.

-        This loads a model that has been stored using the `save` method.
+        This loads a model that has been stored using the
+        :py:meth:`typhon.retrieval.qrnn.QRNN.save` method.

         Arguments:

@@ -1000,27 +598,37 @@ def load(path):

             The loaded QRNN object.
         """
-        filename = os.path.basename(path)
-        dirname = os.path.dirname(path)
-
-        f = open(path, "rb")
-        qrnn = pickle.load(f)
-        qrnn.models = []
-        for mf in qrnn.model_files:
-            mf = os.path.basename(mf)
-            try:
-                mp = os.path.join(dirname, os.path.basename(mf))
-                qrnn.models += [keras.models.load_model(mp, qrnn.custom_objects)]
-            except:
-                raise Exception("Error loading the neural network models. " \
-                                "Please make sure all files created during the"\
-                                " saving are in this folder.")
-        f.close()
+        with open(path, 'rb') as f:
+            # Restore the pickled attributes first, then let the backend
+            # deserialize the network from the rest of the file.
+            qrnn = pickle.load(f)
+            backend = importlib.import_module(qrnn.backend)
+            model = backend.load_model(f, qrnn.quantiles)
+        qrnn.model = model
         return qrnn

+    def save(self, path):
+        r"""
+        Store the QRNN model in a file.
+
+        This stores the model to a file using pickle for all attributes that
+        support pickling. The neural-network model is handled separately by
+        the backend, since it cannot be pickled.
+
+        Arguments:
+
+            path(str): The path including filename indicating where to
+                       store the model.
+
+        """
+        with open(path, "wb") as f:
+            pickle.dump(self, f)
+            backend = importlib.import_module(self.backend)
+            backend.save_model(f, self.model)
+
     def __getstate__(self):
         dct = copy.copy(self.__dict__)
-        dct.pop("model")
+        dct.pop("model")
         return dct

     def __setstate__(self, state):
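For reference, a minimal end-to-end sketch of the interface added above (not
part of the patch). The constructor, ``train``, and posterior methods are used
as exercised by the tests below; the toy data, network defaults, and epoch
count are illustrative assumptions:

.. code-block:: python

    import numpy as np
    from typhon.retrieval.qrnn import QRNN

    # Toy data: scalar input, output noise growing with |x|.
    x = np.random.randn(1024, 1)
    y = x[:, 0] + (1.0 + 0.5 * np.abs(x[:, 0])) * np.random.randn(1024)

    # Train a QRNN to predict ten quantiles of p(y | x).
    qrnn = QRNN(x.shape[1], np.linspace(0.05, 0.95, 10))
    qrnn.train((x, y), maximum_epochs=10)

    # Posterior statistics for new inputs.
    x_new = np.array([[-1.0], [1.0]])
    x_cdf, y_cdf = qrnn.cdf(x_new)                 # piece-wise linear CDF
    x_pdf, y_pdf = qrnn.pdf(x_new)                 # derivative of the CDF
    mean = qrnn.posterior_mean(x_new)
    samples = qrnn.sample_posterior(x_new, n=100)  # inverse CDF sampling
    p_exceed = qrnn.classify(x_new, 0.0)           # P(y > 0 | x)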
diff --git a/typhon/tests/retrieval/qrnn/test_qrnn.py b/typhon/tests/retrieval/qrnn/test_qrnn.py
new file mode 100644
index 00000000..53b7ea03
--- /dev/null
+++ b/typhon/tests/retrieval/qrnn/test_qrnn.py
@@ -0,0 +1,98 @@
+"""
+Tests for typhon.retrieval.qrnn module.
+
+Tests the QRNN implementation for all available backends.
+"""
+from typhon.retrieval.qrnn import QRNN, set_backend, get_backend
+import numpy as np
+import os
+import importlib
+import pytest
+import tempfile
+
+#
+# Import available backends.
+#
+
+backends = []
+try:
+    import typhon.retrieval.qrnn.models.keras
+
+    backends += ["keras"]
+except ImportError:
+    pass
+
+try:
+    import typhon.retrieval.qrnn.models.pytorch
+
+    backends += ["pytorch"]
+except ImportError:
+    pass
+
+
+class TestQrnn:
+    def setup_method(self):
+        test_dir = os.path.dirname(os.path.realpath(__file__))
+        path = os.path.join(test_dir, "test_data")
+        x_train = np.load(os.path.join(path, "x_train.npy"))
+        x_mean = np.mean(x_train, keepdims=True)
+        x_sigma = np.std(x_train, keepdims=True)
+        self.x_train = (x_train - x_mean) / x_sigma
+        self.y_train = np.load(os.path.join(path, "y_train.npy"))
+
+    @pytest.mark.parametrize("backend", backends)
+    def test_qrnn(self, backend):
+        """
+        Test training of QRNNs using numpy arrays as input.
+        """
+        set_backend(backend)
+        qrnn = QRNN(self.x_train.shape[1], np.linspace(0.05, 0.95, 10))
+        qrnn.train((self.x_train, self.y_train), maximum_epochs=1)
+
+        qrnn.predict(self.x_train)
+
+        x, qs = qrnn.cdf(self.x_train[:2, :])
+        assert qs[0] == 0.0
+        assert qs[-1] == 1.0
+
+        x, y = qrnn.pdf(self.x_train[:2, :])
+        assert x.shape == y.shape
+
+        mu = qrnn.posterior_mean(self.x_train[:2, :])
+        assert len(mu.shape) == 1
+
+        r = qrnn.sample_posterior(self.x_train[:4, :], n=2)
+        assert r.shape == (4, 2)
+
+        r = qrnn.sample_posterior_gaussian_fit(self.x_train[:4, :], n=2)
+        assert r.shape == (4, 2)
+
+    @pytest.mark.parametrize("backend", backends)
+    def test_qrnn_datasets(self, backend):
+        """
+        Provide data as dataset object instead of numpy arrays.
+        """
+        set_backend(backend)
+        backend = get_backend(backend)
+        data = backend.BatchedDataset((self.x_train, self.y_train), 256)
+        qrnn = QRNN(self.x_train.shape[1], np.linspace(0.05, 0.95, 10))
+        qrnn.train(data, maximum_epochs=1)
+
+    @pytest.mark.parametrize("backend", backends)
+    def test_save_qrnn(self, backend):
+        """
+        Test saving and loading of QRNNs.
+        """
+        set_backend(backend)
+        qrnn = QRNN(self.x_train.shape[1], np.linspace(0.05, 0.95, 10))
+        f = tempfile.NamedTemporaryFile()
+        qrnn.save(f.name)
+        qrnn_loaded = QRNN.load(f.name)
+
+        x_pred = qrnn.predict(self.x_train)
+        # Compare against the restored model, not the original one.
+        x_pred_loaded = qrnn_loaded.predict(self.x_train)
+
+        if not isinstance(x_pred, np.ndarray):
+            # Pytorch predictions are returned as tensors.
+            x_pred = x_pred.detach()
+            x_pred_loaded = x_pred_loaded.detach()
+
+        assert np.allclose(x_pred, x_pred_loaded)
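Similarly, a short sketch of backend selection and the save/load round trip
exercised by ``test_save_qrnn`` above (the file name is illustrative; as in
the test, a model can be saved directly after construction):

.. code-block:: python

    import numpy as np
    from typhon.retrieval.qrnn import QRNN, set_backend

    set_backend("pytorch")  # or "keras"; optional when only one is installed

    qrnn = QRNN(8, np.linspace(0.05, 0.95, 10))
    # ... train as in the sketch above, then round-trip through a file:
    qrnn.save("qrnn.pckl")
    qrnn_restored = QRNN.load("qrnn.pckl")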