From 3d26ce0d0d844f77f06841076355fb9c44b6b799 Mon Sep 17 00:00:00 2001 From: Aron Date: Mon, 15 Jan 2024 14:32:06 +0100 Subject: [PATCH] WIP: implement validation losses as a metric --- n3fit/src/n3fit/backends/__init__.py | 19 +- .../n3fit/backends/keras_backend/MetaModel.py | 44 +-- .../n3fit/backends/keras_backend/callbacks.py | 24 +- .../n3fit/backends/keras_backend/metrics.py | 41 +++ n3fit/src/n3fit/hyper_optimization/rewards.py | 19 +- n3fit/src/n3fit/model_gen.py | 17 +- n3fit/src/n3fit/model_trainer.py | 310 ++++++++++-------- n3fit/src/n3fit/stopping.py | 300 +++++++---------- 8 files changed, 382 insertions(+), 392 deletions(-) create mode 100644 n3fit/src/n3fit/backends/keras_backend/metrics.py diff --git a/n3fit/src/n3fit/backends/__init__.py b/n3fit/src/n3fit/backends/__init__.py index f49bbd0f53..84aaf2a8fe 100644 --- a/n3fit/src/n3fit/backends/__init__.py +++ b/n3fit/src/n3fit/backends/__init__.py @@ -1,20 +1,19 @@ -from n3fit.backends.keras_backend.internal_state import ( - set_initial_state, - clear_backend_state, - set_eager -) +from n3fit.backends.keras_backend import callbacks, constraints, operations from n3fit.backends.keras_backend.MetaLayer import MetaLayer from n3fit.backends.keras_backend.MetaModel import MetaModel from n3fit.backends.keras_backend.base_layers import ( + Concatenate, Input, - concatenate, Lambda, base_layer_selector, + concatenate, regularizer_selector, - Concatenate, ) -from n3fit.backends.keras_backend import operations -from n3fit.backends.keras_backend import constraints -from n3fit.backends.keras_backend import callbacks +from n3fit.backends.keras_backend.internal_state import ( + clear_backend_state, + set_eager, + set_initial_state, +) +from n3fit.backends.keras_backend.metrics import LossMetric print("Using Keras backend") diff --git a/n3fit/src/n3fit/backends/keras_backend/MetaModel.py b/n3fit/src/n3fit/backends/keras_backend/MetaModel.py index 1b0990bb03..7c77606ae5 100644 --- a/n3fit/src/n3fit/backends/keras_backend/MetaModel.py +++ b/n3fit/src/n3fit/backends/keras_backend/MetaModel.py @@ -119,7 +119,6 @@ def __init__(self, input_tensors, output_tensors, scaler=None, input_values=None self.single_replica_generator = None self.target_tensors = None - self.compute_losses_function = None self._scaler = scaler @tf.autograph.experimental.do_not_convert @@ -169,6 +168,7 @@ def perform_fit(self, x=None, y=None, epochs=1, **kwargs): x_params = self._parse_input(x) if y is None: y = self.target_tensors + y = {name: np.zeros((1, 1)) for name in self.loss.keys()} history = super().fit(x=x_params, y=y, epochs=epochs, **kwargs) loss_dict = history.history return loss_dict @@ -179,44 +179,6 @@ def predict(self, x=None, **kwargs): result = super().predict(x=x, **kwargs) return result - def compute_losses(self): - """ - This function is equivalent to the model ``evaluate(x,y)`` method of most TensorFlow models - which return a dictionary of losses per output layer. - The losses reported in the ``evaluate`` method for n3fit are, however, summed over replicas. - Instead the loss we are interested in is usually the output of the model (i.e., predict) - This function then generates a dict of partial losses of the model separated per replica. - i.e., the output for experiment {'LHC_exp'} will be an array of Nrep elements. 
- - Returns - ------- - dict - a dictionary with all partial losses of the model - """ - if self.compute_losses_function is None: - # If it is the first time we are passing through, compile the function and save it - out_names = [f"{i}_loss" for i in self.output_names] - out_names.insert(0, "loss") - - # Compile a evaluation function - @tf.function - def losses_fun(): - predictions = self(self._parse_input(None)) - # If we only have one dataset the output changes - if len(out_names) == 2: - predictions = [predictions] - total_loss = tf.reduce_sum(predictions, axis=0) - ret = [total_loss] + predictions - return dict(zip(out_names, ret)) - - self.compute_losses_function = losses_fun - - ret = self.compute_losses_function() - - # The output of this function is to be used by python (and numpy) - # so we need to convert the tensors - return _to_numpy_or_python_type(ret) - def compile( self, optimizer_name="RMSprop", @@ -236,7 +198,7 @@ def compile( - A ``target_output`` can be defined. If done in this way (for instance because we know the target data will be the same for the whole fit) the data will be compiled together with the model and won't be necessary to - input it again when calling the ``perform_fit`` or ``compute_losses`` methods. + input it again when calling the ``perform_fit`` method. Parameters ---------- @@ -282,7 +244,7 @@ def compile( target_output = [target_output] self.target_tensors = target_output - super().compile(optimizer=opt, loss=loss) + super().compile(optimizer=opt, loss=loss, **kwargs) def set_masks_to(self, names, val=0.0): """Set all mask value to the selected value diff --git a/n3fit/src/n3fit/backends/keras_backend/callbacks.py b/n3fit/src/n3fit/backends/keras_backend/callbacks.py index 7349d6be36..94e1fe07dd 100644 --- a/n3fit/src/n3fit/backends/keras_backend/callbacks.py +++ b/n3fit/src/n3fit/backends/keras_backend/callbacks.py @@ -10,9 +10,10 @@ import logging from time import time + import numpy as np import tensorflow as tf -from tensorflow.keras.callbacks import TensorBoard, Callback +from tensorflow.keras.callbacks import Callback, TensorBoard log = logging.getLogger(__name__) @@ -30,7 +31,7 @@ def __init__(self, count_range=100): self.last_time = 0 def on_epoch_end(self, epoch, logs=None): - """ At the end of every epoch it checks the time """ + """At the end of every epoch it checks the time""" new_time = time() if epoch == 0: # The first epoch is only useful for starting @@ -45,13 +46,13 @@ def on_epoch_end(self, epoch, logs=None): self.last_time = new_time def on_train_end(self, logs=None): - """ Print the results """ + """Print the results""" total_time = time() - self.starting_time n_times = len(self.all_times) # Skip the first 100 epochs to avoid fluctuations due to compilations of part of the code # by epoch 100 all parts of the code have usually been called so it's a good compromise - mean = np.mean(self.all_times[min(110, n_times-1):]) - std = np.std(self.all_times[min(110, n_times-1):]) + mean = np.mean(self.all_times[min(110, n_times - 1) :]) + std = np.std(self.all_times[min(110, n_times - 1) :]) log.info(f"> > Average time per epoch: {mean:.5} +- {std:.5} s") log.info(f"> > > Total time: {total_time/60:.5} min") @@ -77,7 +78,7 @@ def __init__(self, stopping_object, log_freq=100): self.stopping_object = stopping_object def on_epoch_end(self, epoch, logs=None): - """ Function to be called at the end of every epoch """ + """Function to be called at the end of every epoch""" print_stats = ((epoch + 1) % self.log_freq) == 0 # Note that the input 
# logs correspond to the fit before the weights are updated
         self.stopping_object.monitor_chi2(logs, epoch, print_stats=print_stats)

@@ -103,11 +104,13 @@ class LagrangeCallback(Callback):
         List of the names of the datasets to be trained
     multipliers: list(float)
         List of multipliers to be applied
+    losses: dict
+        dict mapping each dataset name to the loss model whose weights hold the multiplier
     update_freq: int
         each how many epochs the positivity lambda is updated
     """

-    def __init__(self, datasets, multipliers, update_freq=100):
+    def __init__(self, datasets, multipliers, losses, update_freq=100):
         super().__init__()
         if len(multipliers) != len(datasets):
             raise ValueError("The number of datasets and multipliers do not match")
@@ -115,11 +118,12 @@ def __init__(self, datasets, multipliers, update_freq=100):
         self.datasets = datasets
         self.multipliers = multipliers
         self.updateable_weights = []
+        self.losses = losses

     def on_train_begin(self, logs=None):
-        """ Save an instance of all relevant layers """
+        """Save an instance of all relevant layers"""
         for layer_name in self.datasets:
-            layer = self.model.get_layer(layer_name)
+            layer = self.losses[layer_name]
             self.updateable_weights.append(layer.weights)

     @tf.function
@@ -133,7 +137,7 @@ def _update_weights(self):
                 w.assign(w * multiplier)

     def on_epoch_end(self, epoch, logs=None):
-        """ Function to be called at the end of every epoch """
+        """Function to be called at the end of every epoch"""
         if (epoch + 1) % self.update_freq == 0:
             self._update_weights()

diff --git a/n3fit/src/n3fit/backends/keras_backend/metrics.py b/n3fit/src/n3fit/backends/keras_backend/metrics.py
new file mode 100644
index 0000000000..08d4a6af8f
--- /dev/null
+++ b/n3fit/src/n3fit/backends/keras_backend/metrics.py
@@ -0,0 +1,43 @@
+import tensorflow as tf
+from tensorflow.keras.metrics import Metric
+
+import n3fit.backends.keras_backend.operations as op
+
+
+class LossMetric(Metric):
+    """
+    Implementation of the (validation) loss as a metric.
+    Keeps track of the per-replica losses internally and aggregates them only for logging.
+
+    Parameters
+    ----------
+    loss_layer : tf.keras.layers.Layer
+        The loss layer to use for the metric.
+    agg : str
+        Aggregation method to use for the replicas. Can be 'sum' or 'mean'.
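+    name : str
+        Name under which the metric is reported in the logs (default: ``val_loss``).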
+ """ + + def __init__(self, loss_layer, agg='sum', name='val_loss', **kwargs): + super().__init__(name=name, **kwargs) + self.loss_layer = loss_layer + if agg == 'sum': + self.agg = op.sum + elif agg == 'mean': + self.agg = op.mean + else: + raise ValueError(f'agg must be sum or mean, got {agg}') + num_replicas = loss_layer.output.shape[0] + self.per_replica_losses = self.add_weight( + name="per_replica_losses", shape=(num_replicas,), initializer="zeros" + ) + + def update_state(self, y_true, y_pred, sample_weight=None): + self.per_replica_losses.assign(self.loss_layer(y_pred)) + + def result(self): + return self.agg(self.per_replica_losses) + + def reset_state(self): + self.per_replica_losses.assign(tf.zeros_like(self.per_replica_losses)) diff --git a/n3fit/src/n3fit/hyper_optimization/rewards.py b/n3fit/src/n3fit/hyper_optimization/rewards.py index 0587a47649..38a214343a 100644 --- a/n3fit/src/n3fit/hyper_optimization/rewards.py +++ b/n3fit/src/n3fit/hyper_optimization/rewards.py @@ -145,15 +145,16 @@ def fit_future_tests(n3pdfs=None, experimental_models=None, **_kwargs): # Update the mask of the last_model so that its synced with this layer last_model.get_layer(layer.name).update_mask(layer.mask) - # Compute the loss with pdf errors - pdf_chi2 = exp_model.compute_losses()["loss"][0] - - # And the loss of the best (most complete) fit - best_chi2 = last_model.compute_losses()["loss"][0] - - # Now make this into a measure of the total loss - # for instance, any deviation from the "best" value is bad - total_loss += np.abs(best_chi2 - pdf_chi2) +# TODO Aron: replace compute_losses here, is this even ever called? +# # Compute the loss with pdf errors +# pdf_chi2 = exp_model.compute_losses()["loss"][0] +# +# # And the loss of the best (most complete) fit +# best_chi2 = last_model.compute_losses()["loss"][0] +# +# # Now make this into a measure of the total loss +# # for instance, any deviation from the "best" value is bad +# total_loss += np.abs(best_chi2 - pdf_chi2) if compatibility_mode: set_eager(False) diff --git a/n3fit/src/n3fit/model_gen.py b/n3fit/src/n3fit/model_gen.py index 2494a158e0..3abe2435cc 100644 --- a/n3fit/src/n3fit/model_gen.py +++ b/n3fit/src/n3fit/model_gen.py @@ -69,7 +69,7 @@ def _generate_experimental_layer(self, pdf): output_layers = [obs(pdf) for obs in self.observables] # Finally concatenate all observables (so that experiments are one single entity) - ret = op.as_layer(op.concatenate, name=f"{self.name}_concat")(output_layers) + ret = op.as_layer(op.concatenate, name=self.name)(output_layers) if self.rotation is not None: ret = self.rotation(ret) @@ -119,10 +119,9 @@ def _generate_loss(self, mask=None): def __call__(self, obs_layer, mask=None): loss_f = self._generate_loss(mask) if self.trvl_mask_layer is not None: - masked_obs_layer = self.trvl_mask_layer(obs_layer) - return loss_f(masked_obs_layer) - else: - return loss_f(obs_layer) + obs_layer = self.trvl_mask_layer(obs_layer) + + return loss_f(obs_layer) def observable_generator( @@ -247,7 +246,7 @@ def observable_generator( if spec_dict["positivity"]: out_positivity_observables = ObservableWrapper(spec_name, model_observables, dataset_xsizes) out_positivity_losses = MaskedLossWrapper( - spec_name, + f"{spec_name}_tr", tr_mask_layer, multiplier=positivity_initial, positivity=not integrability, @@ -266,7 +265,11 @@ def observable_generator( observables = ObservableWrapper(spec_name, model_observables, dataset_xsizes, rotation=obsrot) out_tr = MaskedLossWrapper( - spec_name, tr_mask_layer, rotation=obsrot, 
invcovmat=invcovmat_tr, data=training_data
+        f"{spec_name}_tr",
+        tr_mask_layer,
+        rotation=obsrot,
+        invcovmat=invcovmat_tr,
+        data=training_data,
     )
     out_vl = MaskedLossWrapper(
         f"{spec_name}_val",
diff --git a/n3fit/src/n3fit/model_trainer.py b/n3fit/src/n3fit/model_trainer.py
index 1ebe0dd0c2..8e85fd2fe8 100644
--- a/n3fit/src/n3fit/model_trainer.py
+++ b/n3fit/src/n3fit/model_trainer.py
@@ -15,7 +15,7 @@
 import numpy as np

 from n3fit import model_gen
-from n3fit.backends import MetaModel, callbacks, clear_backend_state
+from n3fit.backends import LossMetric, MetaModel, callbacks, clear_backend_state
 from n3fit.backends import operations as op
 import n3fit.hyper_optimization.penalties
 import n3fit.hyper_optimization.rewards
@@ -39,17 +39,27 @@
 InputInfo = namedtuple("InputInfo", ["input", "split", "idx"])


-def _compute_masked_losses(all_observables, losses, masks):
+def _compute_loss_functions(all_observables, loss_layers, masks, name):
     """
-    Takes as input a list of observables and (where neded) a mask to select the output.
-    Returns a list of loss(masked_obs).
-    Note that the list of masks don't need to be the same size as the list of layers/observables
+    Takes as input a list of observables and (where needed) a mask to select the output.
+    Returns a dictionary {obs_name: loss(masked_obs)}.
+    Note that the list of masks doesn't need to be the same size as the list of layers/observables.
     Also note that the masks here are at the level of datasets, masking out entire folds.
-    Masking at the level of training/validation splits happens inside the losses.
+    Masking at the level of training/validation splits happens inside the loss_layers.
     """
-    loss_names = [loss.name.strip('_val').strip('_exp') for loss in losses]
-    observables = [all_observables[loss_name] for loss_name in loss_names]
-    return [loss(obs, mask=m) for obs, loss, m in zip_longest(observables, losses, masks)]
+    loss_functions = {}
+    for loss_layer, m in zip_longest(loss_layers, masks):
+        # str.strip removes a set of characters rather than a suffix, so use removesuffix
+        obs_name = loss_layer.name.removesuffix('_val').removesuffix('_exp').removesuffix('_tr')
+        loss_model = MetaModel(
+            {obs_name: all_observables[obs_name]},
+            loss_layer(all_observables[obs_name], mask=m),
+            name=f"{obs_name}_loss_{name}",
+        )
+        loss_model.build(input_shape=all_observables[obs_name].shape)
+        loss_functions[obs_name] = loss_model
+
+    return loss_functions


 def _LM_initial_and_multiplier(input_initial, input_multiplier, max_lambda, steps):
@@ -199,7 +208,6 @@ def __init__(
         self.input_list = []
         self.observables = []
         self.training = {
-            "output": [],
             "expdata": [],
             "ndata": 0,
             "model": None,
@@ -211,16 +219,10 @@ def __init__(
             "integinitials": [],
             "folds": [],
         }
-        self.validation = {
-            "output": [],
-            "expdata": [],
-            "ndata": 0,
-            "model": None,
-            "folds": [],
-            "posdatasets": [],
-        }
-        self.experimental = {"output": [], "expdata": [], "ndata": 0, "model": None, "folds": []}
+        self.validation = {"expdata": [], "ndata": 0, "model": None, "folds": [], "posdatasets": []}
+        self.experimental = {"expdata": [], "ndata": 0, "model": None, "folds": []}
         self.tr_masks = []
+        self.output = {"training": [], "validation": [], "experimental": []}

         self._fill_the_dictionaries()

@@ -364,30 +366,9 @@ def _xgrid_generation(self):

         return InputInfo(input_layer, sp_layer, inputs_idx)

-    def _model_generation(self, xinput, pdf_model, partition, partition_idx):
+    def _observables_model_generation(self, xinput, pdf_model):
         """
-        Fills the three dictionaries (``training``, ``validation``, ``experimental``)
-        with the ``model`` entry
-
-        Compiles the validation and experimental models with fakes optimizers and learning rate
-        as they are never trained, but this is needed by some backends
-        in order to run evaluate on them.
- - Before entering this function we have the input of the model - and a list of outputs, but they are not connected. - This function connects inputs with outputs by injecting the PDF. - At this point we have a PDF model that takes an input (1, None, 1) - and outputs in return (1, none, 14). - - The injection of the PDF is done by concatenating all inputs and calling - pdf_model on it. - This in turn generates an output_layer that needs to be splitted for every experiment - as we have a set of observable "functions" that each take (1, exp_xgrid_size, 14) - and output (1, masked_ndata) where masked_ndata can be the training/validation - or the experimental mask (in which cased masked_ndata == ndata). - Several models can be fitted at once by passing a list of models with a shared input - so that every mode receives the same input and the output will be concatenated at the end - the final output of the model is then (1, None, 14, n) (with n=number of parallel models). + Generate the model that computes all observables. Parameters ---------- @@ -397,21 +378,16 @@ def _model_generation(self, xinput, pdf_model, partition, partition_idx): the results of the PDF (PDF(xgrid)) among the different observables pdf_model: n3fit.backend.MetaModel a model that produces PDF values - partition: dict - Only active during k-folding, information about the partition to be fitted - partition_idx: int - Index of the partition Returns ------- - models: dict - dict of MetaModels for training, validation and experimental + observables_model: MetaModel + Model that computes all observables + all_observables: dict + dictionary of all observables, keyed by name """ - log.info("Generating the Model") + log.info("Generating the observables Model") - # For multireplica fits: - # The trainable part of the n3fit framework is a concatenation of all PDF models - # We apply the Model as Layers and save for later the model (full_pdf) full_model_input_dict, full_pdf = pdf_model.apply_as_layer({"pdf_input": xinput.input}) split_pdf_unique = xinput.split(full_pdf) @@ -421,49 +397,11 @@ def _model_generation(self, xinput, pdf_model, partition, partition_idx): # Now compute ALL observables, without any masking, into a dictionary all_observables = {obs.name: obs(pdf) for obs, pdf in zip(self.observables, split_pdf)} - - # If we are in a kfolding partition, select which datasets are out - training_mask = validation_mask = experimental_mask = [None] - if partition and partition["datasets"]: - # If we want to overfit the fold, leave the training and validation masks as [None] - # otherwise, use the mask generated for the fold. 
-            # The experimental model instead is always limited to the fold
-            if not partition.get("overfit", False):
-                training_mask = [i[partition_idx] for i in self.training["folds"]]
-                validation_mask = [i[partition_idx] for i in self.validation["folds"]]
-                experimental_mask = [i[partition_idx] for i in self.experimental["folds"]]
-
-        # Training and validation leave out the kofld dataset
-        # experiment leaves out the negation
-        output_tr = _compute_masked_losses(all_observables, self.training["output"], training_mask)
-        training = MetaModel(full_model_input_dict, output_tr)
-
-        # Validation skips integrability and the "true" chi2 skips also positivity,
-        # so we must only use the corresponding subset of PDF functions
-        val_pdfs = []
-        exp_pdfs = []
-        for partial_pdf, obs in zip(split_pdf, self.training["output"]):
-            if not obs.positivity and not obs.integrability:
-                val_pdfs.append(partial_pdf)
-                exp_pdfs.append(partial_pdf)
-            elif not obs.integrability and obs.positivity:
-                val_pdfs.append(partial_pdf)
-
-        # We don't want to included the integrablity in the validation
-        output_vl = _compute_masked_losses(
-            all_observables, self.validation["output"], validation_mask
-        )
-        validation = MetaModel(full_model_input_dict, output_vl)
-
-        # Or the positivity in the total chi2
-        output_ex = _compute_masked_losses(
-            all_observables, self.experimental["output"], experimental_mask
-        )
-        experimental = MetaModel(full_model_input_dict, output_ex)
+        model = MetaModel(full_model_input_dict, all_observables, name="observables")

         if self.print_summary:
-            training.summary()
-            pdf_model = training.get_layer("PDFs")
+            model.summary()
+            pdf_model = model.get_layer("PDFs")
             pdf_model.summary()
             nn_model = pdf_model.get_layer("NN_0")
             nn_model.summary()
@@ -474,9 +412,52 @@ def _model_generation(self, xinput, pdf_model, partition, partition_idx):
             except ValueError:
                 pass

-        models = {"training": training, "validation": validation, "experimental": experimental}
+        return model, all_observables
+
+    def _losses_generation(self, all_observables, partition, partition_idx):
+        """
+        Generates the training, validation and experimental losses.
+
+        The training and validation losses differ in the training/validation mask applied;
+        the experimental loss is instead restricted to the active partition when doing k-folding.
+
+        Parameters
+        ----------
+        all_observables: dict
+            Dictionary with all observables, outputs of the observables model
+        partition: dict
+            Only active during k-folding, information about the partition to be fitted
+        partition_idx: int
+            Index of the partition
+
+        Returns
+        -------
+        losses: dict
+            Dictionary with the losses for training, validation and experimental
+            Each of these is a dictionary itself, with the observable name as key
+            and a MetaModel computing the loss value
+        """
+
+        # If we are in a kfolding partition, select which datasets are out
+        kfolding_masks = {"training": [None], "validation": [None], "experimental": [None]}
+        if partition and partition["datasets"]:
+            # If we want to overfit the fold, leave the training and validation masks as [None]
+            # otherwise, use the mask generated for the fold.
+            # The experimental loss instead is always limited to the fold
+            if not partition.get("overfit", False):
+                kfolding_masks["training"] = [i[partition_idx] for i in self.training["folds"]]
+                kfolding_masks["validation"] = [i[partition_idx] for i in self.validation["folds"]]
+                kfolding_masks["experimental"] = [i[partition_idx] for i in self.experimental["folds"]]
+
+        name = {"training": "tr", "validation": "val", "experimental": "exp"}
+        losses = {
+            split: _compute_loss_functions(
+                all_observables, self.output[split], kfolding_masks[split], name=name[split]
+            )
+            for split in ["training", "validation", "experimental"]
+        }

-        return models
+        return losses

     def _reset_observables(self):
         """
@@ -487,6 +468,10 @@
         or be obliterated when/if the backend state is reset
         """
         self.input_list = []
+        self.observables_model = None
+        self.losses = None
+        self.observables = []
+        self.output = {"training": [], "validation": [], "experimental": []}
         for key in ["output", "posmultipliers", "integmultipliers"]:
             self.training[key] = []
             self.validation[key] = []
@@ -561,9 +545,9 @@ def _generate_observables(
             self.observables.append(exp_layer["observables"])

             # Now save the masked losses
-            self.training["output"].append(exp_layer["output_tr"])
-            self.validation["output"].append(exp_layer["output_vl"])
-            self.experimental["output"].append(exp_layer["output"])
+            self.output["training"].append(exp_layer["output_tr"])
+            self.output["validation"].append(exp_layer["output_vl"])
+            self.output["experimental"].append(exp_layer["output"])

         # Generate the positivity penalty
         for pos_dict in self.pos_info:
@@ -595,8 +579,8 @@
             # Observables
             self.observables.append(pos_layer["observables"])
             # Masked losses
-            self.training["output"].append(pos_layer["output_tr"])
-            self.validation["output"].append(pos_layer["output_tr"])
+            self.output["training"].append(pos_layer["output_tr"])
+            self.output["validation"].append(pos_layer["output_tr"])

             self.training["posmultipliers"].append(pos_multiplier)
             self.training["posinitials"].append(pos_initial)
@@ -622,7 +606,7 @@
                 # The integrability all falls to the training
                 self.observables.append(integ_layer["observables"])
-                self.training["output"].append(integ_layer["output_tr"])
+                self.output["training"].append(integ_layer["output_tr"])

                 self.training["integmultipliers"].append(integ_multiplier)
                 self.training["integinitials"].append(integ_initial)
@@ -696,12 +680,33 @@ def _generate_pdf(
         return pdf_model

     def _prepare_reporting(self, partition):
-        """Parses the information received by the :py:class:`n3fit.ModelTrainer.ModelTrainer`
+        """
+        Parses the information received by the :py:class:`n3fit.ModelTrainer.ModelTrainer`
         to select the bits necessary for reporting the chi2.

-        Receives the chi2 partition data to see whether any dataset is to be left out
+        Receives the chi2 partition data to see whether any dataset is to be left out.
+ + Parameters + ---------- + partition: dict + dictionary containing the information of the partition + + Returns + ------- + `tr_ndata` + dictionary of {'exp' : ndata} + `vl_ndata` + dictionary of {'exp' : ndata} + `pos_set` + list of the names of the positivity sets + + Note: if there is no validation (total number of val points == 0) + then vl_ndata will point to tr_ndata """ + tr_ndata_dict = {} + vl_ndata_dict = {} + pos_set = [] + reported_keys = ["name", "count_chi2", "positivity", "integrability", "ndata", "ndata_vl"] - reporting_list = [] for exp_dict in self.all_info: reporting_dict = {k: exp_dict.get(k) for k in reported_keys} if partition: @@ -713,10 +718,22 @@ def _prepare_reporting(self, partition): frac = dataset["frac"] reporting_dict["ndata"] -= int(ndata * frac) reporting_dict["ndata_vl"] = int(ndata * (1 - frac)) - reporting_list.append(reporting_dict) - return reporting_list - def _train_and_fit(self, training_model, stopping_object, epochs=100): + exp_name = reporting_dict["name"] + if reporting_dict.get("count_chi2"): + tr_ndata = reporting_dict["ndata"] + vl_ndata = reporting_dict["ndata_vl"] + if tr_ndata: + tr_ndata_dict[exp_name] = tr_ndata + if vl_ndata: + vl_ndata_dict[exp_name] = vl_ndata + if reporting_dict.get("positivity") and not reporting_dict.get("integrability"): + pos_set.append(f"{exp_name}_tr") + if not vl_ndata_dict: + vl_ndata_dict = None + return tr_ndata_dict, vl_ndata_dict, pos_set + + def _train_and_fit(self, observables_model, training_losses, stopping_object, epochs=100): """ Trains the NN for the number of epochs given using stopping_object as the stopping criteria @@ -730,15 +747,17 @@ def _train_and_fit(self, training_model, stopping_object, epochs=100): callback_pos = callbacks.LagrangeCallback( self.training["posdatasets"], self.training["posmultipliers"], + training_losses, update_freq=PUSH_POSITIVITY_EACH, ) callback_integ = callbacks.LagrangeCallback( self.training["integdatasets"], self.training["integmultipliers"], + training_losses, update_freq=PUSH_INTEGRABILITY_EACH, ) - training_model.perform_fit( + observables_model.perform_fit( epochs=epochs, verbose=False, callbacks=self.callbacks + [callback_st, callback_pos, callback_integ], @@ -795,14 +814,22 @@ def evaluate(self, stopping_object): val_chi2 : chi2 of the validation set exp_chi2: chi2 of the experimental data (without replica or tr/vl split) """ - if self.training["model"] is None: + if self.observables_model is None: raise RuntimeError("Modeltrainer.evaluate was called before any training") - # Needs to receive a `stopping_object` in order to select the part of the - # training and the validation which are actually `chi2` and not part of the penalty - train_chi2 = stopping_object.evaluate_training(self.training["model"]) - val_chi2 = stopping_object.vl_chi2 - exp_chi2 = self.experimental["model"].compute_losses()["loss"] / self.experimental["ndata"] - return train_chi2, val_chi2, exp_chi2 + observables = self.observables_model.predict() + chi2s = {} + for split in ["training", "validation", "experimental"]: + chi2_loss_models = [ + loss_m + for loss_m in self.losses[split].values() + # don't count POS and INTEG losses + if not loss_m.name.startswith("POS") and not loss_m.name.startswith("INTEG") + ] + all_losses = [loss_m(observables).numpy() for loss_m in chi2_loss_models] + num_points = [loss_m.layers[-1].input_shape[-1] for loss_m in chi2_loss_models] + chi2s[split] = np.sum(all_losses, axis=0) / np.sum(num_points) + + return chi2s["training"], chi2s["validation"], 
chi2s["experimental"]

     def hyperparametrizable(self, params):
         """
@@ -900,7 +927,9 @@ def hyperparametrizable(self, params):

             # Model generation joins all the different observable layers
             # together with pdf model generated above
-            models = self._model_generation(xinput, pdf_model, partition, k)
+            observables_model, observables = self._observables_model_generation(xinput, pdf_model)
+
+            losses = self._losses_generation(observables, partition, k)

             # Only after model generation, apply possible weight file
             # Starting every replica with the same weights
@@ -912,22 +941,25 @@

             # Reset the positivity and integrability multipliers
             pos_and_int = self.training["posdatasets"] + self.training["integdatasets"]
-            initial_values = self.training["posinitials"] + self.training["posinitials"]
-            models["training"].reset_layer_weights_to(pos_and_int, initial_values)
+            initial_values = self.training["posinitials"] + self.training["integinitials"]
+            # Aron TODO: check if this works
+            pos_and_int_inits = {name: init for name, init in zip(pos_and_int, initial_values)}
+            for name, init in pos_and_int_inits.items():
+                layer_name = losses["training"][name].layers[-1].name
+                losses["training"][name].reset_layer_weights_to(layer_name, init)

             # Generate the list containing reporting info necessary for chi2
             reporting = self._prepare_reporting(partition)

+            # TODO Aron: is this still relevant?
             if self.no_validation:
-                # Substitute the validation model with the training model
-                models["validation"] = models["training"]
-                validation_model = models["training"]
-            else:
-                validation_model = models["validation"]
+                # Substitute the validation losses with the training losses
+                losses["validation"] = losses["training"]

             # Generate the stopping_object this object holds statistical information about the fit
             # it is used to perform stopping
             stopping_object = Stopping(
-                validation_model,
+                observables_model,
+                losses["training"],
                 reporting,
                 pdf_model,
                 total_epochs=epochs,
@@ -936,11 +969,26 @@
                 threshold_chi2=threshold_chi2,
             )

-            # Compile each of the models with the right parameters
-            for model in models.values():
-                model.compile(**params["optimizer"])
+            def loss_function(loss_layer):
+                def loss_fn(y_true, y_pred):
+                    return op.sum(loss_layer(y_pred))
+
+                return loss_fn
+
+            validation_metrics = {
+                name: LossMetric(loss_layer)  # keys are already the observable names
+                for name, loss_layer in losses["validation"].items()
+            }
+            training_losses = {
+                name: loss_function(loss_layer) for name, loss_layer in losses["training"].items()
+            }
+            observables_model.compile(
+                **params["optimizer"], loss=training_losses, metrics=validation_metrics
+            )

-            passed = self._train_and_fit(models["training"], stopping_object, epochs=epochs)
+            passed = self._train_and_fit(
+                observables_model, losses["training"], stopping_object, epochs=epochs
+            )

             if self.mode_hyperopt:
                 # If doing a hyperparameter scan we need to keep track of the loss function
                 validation_loss = np.mean(stopping_object.vl_chi2)

                 # Compute experimental loss
-                exp_loss_raw = np.average(models["experimental"].compute_losses()["loss"])
+                observables = observables_model.predict()
+                exp_loss_raw = [loss(observables) for loss in losses["experimental"].values()]
+                exp_loss_raw = np.sum(exp_loss_raw, axis=0)
+
                 # And divide by the number of active points in this fold
                 # it would be nice to have a ndata_per_fold variable coming in the vp object...
ndata = np.sum([np.count_nonzero(i[k]) for i in self.experimental["folds"]]) @@ -971,7 +1022,7 @@ def hyperparametrizable(self, params): l_valid.append(validation_loss) l_exper.append(experimental_loss) pdfs_per_fold.append(pdf_model) - exp_models.append(models["experimental"]) + exp_models.append(losses["experimental"]) if hyper_loss > self.hyper_threshold: log.info( @@ -1006,9 +1057,8 @@ def hyperparametrizable(self, params): return dict_out # Keep a reference to the models after training for future reporting - self.training["model"] = models["training"] - self.experimental["model"] = models["experimental"] - self.validation["model"] = models["validation"] + self.observables_model = observables_model + self.losses = losses # In a normal run, the only information we need to output is the stopping object # (which contains metadata about the stopping) diff --git a/n3fit/src/n3fit/stopping.py b/n3fit/src/n3fit/stopping.py index 8f8cbc5a67..d7ed227a2a 100644 --- a/n3fit/src/n3fit/stopping.py +++ b/n3fit/src/n3fit/stopping.py @@ -20,7 +20,7 @@ This implies several changes in the behaviour of this class as the training chi2 will now be monitored for stability. In order to parse the set of loss functions coming from the backend::MetaModel, - the function `parse_losses` relies on the fact that they are all suffixed with `_loss` + the function `normalize_chi2s` relies on the fact that they are all suffixed with `_loss` the validation case, instead, is suffixed with `val_loss`. In the particular casse in which both training and validation model correspond to the same backend::MetaModel only the `_loss` suffix can be found. This is taken into account by the class `Stopping` @@ -31,6 +31,8 @@ import numpy as np +from n3fit.backends import LossMetric + log = logging.getLogger(__name__) # Put a very big number here so that we for sure discard this run @@ -44,55 +46,17 @@ THRESHOLD_POS = 1e-6 -def parse_ndata(all_data): - """ - Parses the list of dictionaries received from ModelTrainer - into a dictionary containing only the name of the experiments - together with the number of points. - - Returns - ------- - `tr_ndata` - dictionary of {'exp' : ndata} - `vl_ndata` - dictionary of {'exp' : ndata} - `pos_set`: list of the names of the positivity sets - - Note: if there is no validation (total number of val points == 0) - then vl_ndata will point to tr_ndata +def normalize_chi2s(loss_dict, chi2_point_dict, suffix="loss"): """ - tr_ndata_dict = {} - vl_ndata_dict = {} - pos_set = [] - for dictionary in all_data: - exp_name = dictionary["name"] - if dictionary.get("count_chi2"): - tr_ndata = dictionary["ndata"] - vl_ndata = dictionary["ndata_vl"] - if tr_ndata: - tr_ndata_dict[exp_name] = tr_ndata - if vl_ndata: - vl_ndata_dict[exp_name] = vl_ndata - if dictionary.get("positivity") and not dictionary.get("integrability"): - pos_set.append(exp_name) - if not vl_ndata_dict: - vl_ndata_dict = None - return tr_ndata_dict, vl_ndata_dict, pos_set - - -def parse_losses(history_object, data, suffix="loss"): - """ - Receives an object containing the chi2 - Usually a history object, but it can come in the form of a dictionary. - - It loops over the dictionary and uses the npoints_data dictionary to - normalize the chi2 and return backs a tuple (`total`, `tr_chi2`) + From a dict of all losses, select those that contribute to the chi2 (i.e. exclude positivity + and integrability penalties). + Normalize by the number of points in each dataset. 
Parameters ---------- - history_object: dict - A history object dictionary - data: dict + loss_dict: dict + dictionary of losses per experiment + chi2_point_dict: dict dictionary with the name of the experiments to be taken into account and the number of datapoints of the experiments suffix: str (default: ``loss``) @@ -100,36 +64,23 @@ def parse_losses(history_object, data, suffix="loss"): Returns ------- - total_loss: float - Total value for the loss dict_chi2: dict - dictionary of {'expname' : loss } + dictionary of {'expname': loss }, including the normalized total chi2 as 'total', + and the loss including penalties as 'loss' """ - try: - hobj = history_object.history - except AttributeError: # So it works whether we pass the out or the out.history - hobj = history_object - - # In the general case epochs = 1. - # In case that we are doing more than 1 epoch, take the last result as it is the result - # the model is in at the moment - # This value is only used for printing output purposes so should not have any significance dict_chi2 = {} total_points = 0 total_loss = 0 - for exp_name, npoints in data.items(): - loss = np.array(hobj[exp_name + f"_{suffix}"]) + for exp_name, npoints in chi2_point_dict.items(): + loss = np.array(loss_dict[exp_name + f"_{suffix}"]) dict_chi2[exp_name] = loss / npoints total_points += npoints total_loss += loss - # By taking the loss from the history object we would be saving the total loss - # including positivity sets and (if added/enabled) regularizsers - # instead we want to restrict ourselves to the loss coming from experiments - # total_loss = np.mean(hobj["loss"]) / total_points - total_loss /= total_points - dict_chi2["total"] = total_loss - return total_loss, dict_chi2 + dict_chi2["total"] = total_loss / total_points + dict_chi2["loss"] = loss_dict["loss"] + + return dict_chi2 class FitState: @@ -142,72 +93,25 @@ class FitState: Parameters ---------- - training_info: dict - all losses for the training model - validation_info: dict - all losses for the validation model + training_chi2s: dict + chi2 losses for the training model + validation_chi2s: dict + chi2 losses for the validation model """ - vl_ndata = None - tr_ndata = None - vl_suffix = None - - def __init__(self, training_info, validation_info): - if self.vl_ndata is None or self.tr_ndata is None or self.vl_suffix is None: - raise ValueError( - "FitState cannot be instantiated until vl_ndata, tr_ndata and vl_suffix are filled" - ) - self._training = training_info - self.validation = validation_info - self._parsed = False - self._vl_chi2 = None # These are per replica - self._tr_chi2 = None # This is an overall training chi2 - self._vl_dict = None - self._tr_dict = None - - @property - def vl_loss(self): - """Return the total validation loss as it comes from the info dictionaries""" - return self.validation.get("loss") - - @property - def tr_loss(self): - """Return the total validation loss as it comes from the info dictionaries""" - return self._training.get("loss") - - def _parse_chi2(self): - """ - Parses the chi2 from the losses according to the `tr_ndata` and - `vl_ndata` dictionaries of {dataset: n_points} - """ - if self._parsed: - return - if self._training is not None: - self._tr_chi2, self._tr_dict = parse_losses(self._training, self.tr_ndata) - if self.validation is not None: - self._vl_chi2, self._vl_dict = parse_losses( - self.validation, self.vl_ndata, suffix=self.vl_suffix - ) - - @property - def tr_chi2(self): - self._parse_chi2() - return self._tr_chi2 + def __init__(self, 
training_chi2s, validation_chi2s): + # these are summed over replicas + self.all_tr_chi2 = training_chi2s + # these are per replica + self.all_vl_chi2 = validation_chi2s - @property - def vl_chi2(self): - self._parse_chi2() - return self._vl_chi2 - - @property - def all_tr_chi2(self): - self._parse_chi2() - return self._tr_dict + self.tr_chi2 = self.all_tr_chi2["total"] + self.vl_chi2 = self.all_vl_chi2["total"] - @property - def all_vl_chi2(self): - self._parse_chi2() - return self._vl_dict + self.tr_loss = self.all_tr_chi2["loss"] + self.vl_loss = self.all_vl_chi2["loss"] + del self.all_tr_chi2["loss"] + del self.all_vl_chi2["loss"] def all_tr_chi2_for_replica(self, i_replica): """Return the tr chi2 per dataset for a given replica""" @@ -249,17 +153,7 @@ class FitHistory: dictionary of {dataset: n_points} for the validation data """ - def __init__(self, tr_ndata, vl_ndata): - if vl_ndata is None: - vl_ndata = tr_ndata - vl_suffix = "loss" - else: - vl_suffix = "val_loss" - # All instances of FitState should use these - FitState.tr_ndata = tr_ndata - FitState.vl_ndata = vl_ndata - FitState.vl_suffix = vl_suffix - + def __init__(self): # Save a list of status for the entire fit self._history = [] self.final_epoch = None @@ -273,24 +167,24 @@ def get_state(self, epoch): f"Tried to get obtain the state for epoch {epoch} when only {len(self._history)} epochs have been saved" ) from e - def register(self, epoch, training_info, validation_info): + def register(self, epoch, training_chi2s, validation_chi2s): """Save a new fitstate and updates the current final epoch Parameters ---------- epoch: int the current epoch of the fit - training_info: dict - all losses for the training model - validation_info: dict - all losses for the validation model + training_chi2s: dict + chi2 losses for the training model + validation_chi2s: dict + chi2 losses for the validation model Returns ------- FitState """ # Save all the information in a fitstate object - fitstate = FitState(training_info, validation_info) + fitstate = FitState(training_chi2s, validation_chi2s) self.final_epoch = epoch self._history.append(fitstate) return fitstate @@ -305,12 +199,12 @@ class Stopping: Parameters ---------- - validation_model: n3fit.backends.MetaModel - the model with the validation mask applied - (and compiled with the validation data and covmat) + observables_model: n3fit.backends.MetaModel + The model outputting the observables + training_losses: dict + dictionary of {dataset: loss} for the training data all_data_dicts: dict - list containg all dictionaries containing all information about - the experiments/validation/regularizers/etc to be parsed by Stopping + tuple of dicts containing info about training, validation, stopping datasets pdf_model: n3fit.backends.MetaModel pdf_model being trained threshold_positivity: float @@ -327,7 +221,8 @@ class Stopping: def __init__( self, - validation_model, + observables_model, + training_losses, all_data_dicts, pdf_model, threshold_positivity=THRESHOLD_POS, @@ -336,14 +231,19 @@ def __init__( threshold_chi2=10.0, dont_stop=False, ): + self._observables_model = observables_model self._pdf_model = pdf_model # Save the validation object - self._validation = validation_model + self._training_losses = training_losses # Create the History object - tr_ndata, vl_ndata, pos_sets = parse_ndata(all_data_dicts) - self._history = FitHistory(tr_ndata, vl_ndata) + tr_ndata, vl_ndata, pos_sets = all_data_dicts + self.tr_ndata = tr_ndata + self.vl_ndata = vl_ndata if vl_ndata is not None else 
tr_ndata
+        self.tr_suffix = "loss"
+        self.vl_suffix = "val_loss" if vl_ndata is not None else "loss"
+        self._history = FitHistory()

         # And the positivity checker
         self._positivity = Positivity(threshold_positivity, pos_sets)
@@ -360,17 +260,19 @@ def __init__(
         self.total_epochs = total_epochs

         self._stop_epochs = [total_epochs - 1] * self._n_replicas
-        self._best_epochs = [None] * self._n_replicas
+        self._best_epochs = [-1] * self._n_replicas
         self.positivity_statuses = [POS_BAD] * self._n_replicas
         self._best_weights = [None] * self._n_replicas
         self._best_val_chi2s = [INITIAL_CHI2] * self._n_replicas
+        self._best_weights_prev = [None] * self._n_replicas
+        self._best_val_chi2s_prev = [INITIAL_CHI2] * self._n_replicas

     @property
     def vl_chi2(self):
         """Current validation chi2"""
-        validation_info = self._validation.compute_losses()
-        fitstate = FitState(None, validation_info)
-        return fitstate.vl_chi2
+        validation_losses = self.compute_validation_losses()
+        validation_chi2s = normalize_chi2s(validation_losses, self.vl_ndata, self.vl_suffix)
+        return validation_chi2s["total"]

     @property
     def e_best_chi2(self):
@@ -392,25 +294,32 @@ def positivity_status(self):
        for each replica"""
         return self.positivity_statuses

-    def evaluate_training(self, training_model):
-        """Given the training model, evaluates the
-        model and parses the chi2 of the training datasets
-
-        Parameters
-        ----------
-        training_model: n3fit.backends.MetaModel
-            an object implementing the evaluate function
-
-        Returns
-        -------
-        tr_chi2: float
-            chi2 of the given ``training_model``
+    def compute_validation_losses(self):
         """
-        training_info = training_model.compute_losses()
-        fitstate = FitState(training_info, None)
-        return fitstate.tr_chi2
+        Returns a dict of {loss_name: per-replica losses} from the model's LossMetric objects.
+
+        Used inside the training loop; it does not call the observables model again.
+        """
+        metrics = {
+            m.name: m.per_replica_losses.numpy()
+            for m in self._observables_model.metrics
+            if isinstance(m, LossMetric)
+        }
+        metrics["loss"] = np.sum(list(metrics.values()), axis=0)
+
+        # TODO Aron: temporary fix for some issue with POS suffix.
+        metrics2 = {}
+        for name in metrics.keys():
+            if name[:3] == 'POS':
+                newname = name.replace('val', 'tr')
+            else:
+                newname = name
+            metrics2[newname] = metrics[name]
+
+        metrics = metrics2
+        return metrics

-    def monitor_chi2(self, training_info, epoch, print_stats=False):
+    def monitor_chi2(self, training_losses, epoch, print_stats=False):
         """
         Function to be called at the end of every epoch.
         Stores the total chi2 of the training set as well as the
@@ -424,7 +333,7 @@
         Parameters
         ----------
-        training_info: dict
+        training_losses: dict
             output of a .fit() call, dictionary of the total loss
             (summed over replicas) for each experiment
         epoch: int
@@ -436,16 +345,18 @@
            true/false according to the status of the run
         """
         # Step 1. Check whether the fit has NaN'd and stop it if so
-        if np.isnan(training_info["loss"]):
+        if np.isnan(training_losses["loss"]):
             log.warning(" > NaN found, stopping activated")
             self.make_stop()
             return False

         # Step 2. Compute the validation metrics
-        validation_info = self._validation.compute_losses()
+        validation_losses = self.compute_validation_losses()

         # Step 3. Register the current point in the history
-        fitstate = self._history.register(epoch, training_info, validation_info)
+        training_chi2s = normalize_chi2s(training_losses, self.tr_ndata, self.tr_suffix)
+        validation_chi2s = normalize_chi2s(validation_losses, self.vl_ndata, self.vl_suffix)
+        fitstate = self._history.register(epoch, training_chi2s, validation_chi2s)

         if print_stats:
             self.print_current_stats(epoch, fitstate)
@@ -456,7 +367,7 @@
         passes = self._counts | (fitstate.vl_chi2 < self._threshold_chi2)
         passes &= fitstate.vl_loss < self._best_val_chi2s
         # And the ones that pass positivity
-        passes &= self._positivity(fitstate)
+        passes &= self._positivity(validation_losses)

         self._stopping_degrees += self._counts

@@ -466,8 +377,11 @@
                 # By definition, if we have a ``best_epoch`` then positivity passed
                 self.positivity_statuses[i_replica] = POS_OK

-                self._best_val_chi2s[i_replica] = self._history.get_state(epoch).vl_loss[i_replica]
-                self._best_weights[i_replica] = self._pdf_model.get_replica_weights(i_replica)
+                self.update_bests(
+                    i_replica,
+                    weights=self._pdf_model.get_replica_weights(i_replica),
+                    val_chi2s=self._history.get_state(epoch).vl_loss[i_replica],
+                )

                 self._stopping_degrees[i_replica] = 0
                 self._counts[i_replica] = 1
@@ -482,11 +395,29 @@
             self.make_stop()
         return True

+    def update_bests(self, i_replica, weights, val_chi2s):
+        # Keep the previous best per replica so that revert_one_step can restore it
+        self._best_weights_prev[i_replica] = self._best_weights[i_replica]
+        self._best_val_chi2s_prev[i_replica] = self._best_val_chi2s[i_replica]
+
+        self._best_weights[i_replica] = weights
+        self._best_val_chi2s[i_replica] = val_chi2s
+
+    def revert_one_step(self):
+        """Validation is computed before updating the weights, so in the end we need to go back one step"""
+        # TODO Aron: unless we just stopped because we reached the last epoch?
+        self._stop_epochs = [se - 1 for se in self._stop_epochs]
+        self._best_epochs = [be - 1 for be in self._best_epochs]
+
+        self._best_weights = self._best_weights_prev
+        self._best_val_chi2s = self._best_val_chi2s_prev
+
     def make_stop(self):
         """Convenience method to set the stop_now flag
         and reload the history to the point of the best model if any
         """
         self._stop_now = True
+        self.revert_one_step()
         self._restore_best_weights()

     def _restore_best_weights(self):
@@ -597,9 +527,9 @@ def check_positivity(self, history_object):
             positivity_pass &= history_object[key_loss] < self.threshold
         return np.array(positivity_pass)

-    def __call__(self, fitstate):
+    def __call__(self, validation_losses):
         """
-        Checks whether a given FitState object passes the positivity requirement
+        Checks whether a given dict of validation losses passes the positivity requirement
         """
-        return self.check_positivity(fitstate.validation)
+        return self.check_positivity(validation_losses)
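
The stateful-metric pattern used by LossMetric above can be exercised in isolation. Below is a minimal sketch, not part of the patch: ToyLossLayer, the two-replica shapes and the random inputs are made up, and plain tf.reduce_sum stands in for op.sum; it assumes TensorFlow 2.x.

import tensorflow as tf

# Hypothetical stand-in for an n3fit loss layer: maps predictions of shape
# (n_replicas, n_points, 1) to one loss value per replica, shape (n_replicas,)
class ToyLossLayer(tf.keras.layers.Layer):
    def call(self, y_pred):
        return tf.reduce_mean(y_pred**2, axis=[1, 2])

class ToyLossMetric(tf.keras.metrics.Metric):
    def __init__(self, loss_layer, n_replicas, name="val_loss", **kwargs):
        super().__init__(name=name, **kwargs)
        self.loss_layer = loss_layer
        self.per_replica_losses = self.add_weight(
            name="per_replica_losses", shape=(n_replicas,), initializer="zeros"
        )

    def update_state(self, y_true, y_pred, sample_weight=None):
        # Overwrite rather than accumulate: the metric mirrors the latest losses
        self.per_replica_losses.assign(self.loss_layer(y_pred))

    def result(self):
        # Aggregate over replicas only for logging
        return tf.reduce_sum(self.per_replica_losses)

    def reset_state(self):
        self.per_replica_losses.assign(tf.zeros_like(self.per_replica_losses))

metric = ToyLossMetric(ToyLossLayer(), n_replicas=2)
metric.update_state(None, tf.random.normal((2, 5, 1)))
print(metric.per_replica_losses.numpy())  # per-replica values, kept internally
print(metric.result().numpy())            # aggregated value that would be logged

This mirrors why Stopping.compute_validation_losses can read m.per_replica_losses directly: the per-replica vector survives the aggregation that Keras writes to the logs.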
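
The compile pattern from hyperparametrizable (per-output losses that ignore their targets, fed with dummy zero targets as in MetaModel.perform_fit) can likewise be sketched standalone. The toy model, layer names and weights below are illustrative assumptions; only the y = {name: np.zeros((1, 1)) ...} trick is taken from the patch.

import numpy as np
import tensorflow as tf

inp = tf.keras.Input(shape=(3,))
outputs = {
    "obs_a": tf.keras.layers.Dense(4, name="obs_a")(inp),
    "obs_b": tf.keras.layers.Dense(2, name="obs_b")(inp),
}
model = tf.keras.Model(inp, outputs)

def make_loss(weight):
    # Stands in for loss_function(loss_layer): y_true is a dummy and is ignored
    def loss_fn(y_true, y_pred):
        return weight * tf.reduce_sum(y_pred**2)
    return loss_fn

model.compile(optimizer="rmsprop", loss={"obs_a": make_loss(1.0), "obs_b": make_loss(0.5)})

# Dummy zero targets, one per loss, mirroring MetaModel.perform_fit
x = np.random.rand(1, 3).astype("float32")
y = {name: np.zeros((1, 1)) for name in model.loss.keys()}
print(model.fit(x, y, epochs=1, verbose=0).history)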