diff --git a/CodeEntropy/config/arg_config_manager.py b/CodeEntropy/config/arg_config_manager.py index 0d809ed..5f62832 100644 --- a/CodeEntropy/config/arg_config_manager.py +++ b/CodeEntropy/config/arg_config_manager.py @@ -67,7 +67,7 @@ "grouping": { "type": str, "help": "How to group molecules for averaging", - "default": "each", + "default": "molecules", }, } diff --git a/CodeEntropy/entropy.py b/CodeEntropy/entropy.py index 55724b4..e3bf58e 100644 --- a/CodeEntropy/entropy.py +++ b/CodeEntropy/entropy.py @@ -247,7 +247,7 @@ def _get_trajectory_bounds(self): Tuple of (start, end, step) frame indices. """ start = self._args.start or 0 - end = self._args.end or -1 + end = len(self._universe.trajectory) if self._args.end == -1 else self._args.end step = self._args.step or 1 return start, end, step @@ -343,11 +343,9 @@ def _process_united_atom_entropy( f_matrix = force_matrix[key] f_matrix = self._level_manager.filter_zero_rows_columns(f_matrix) - f_matrix = f_matrix / number_frames t_matrix = torque_matrix[key] t_matrix = self._level_manager.filter_zero_rows_columns(t_matrix) - t_matrix = t_matrix / number_frames S_trans_res = ve.vibrational_entropy_calculation( f_matrix, "force", self._args.temperature, highest @@ -356,8 +354,16 @@ def _process_united_atom_entropy( t_matrix, "torque", self._args.temperature, highest ) - S_conf_res = ce.conformational_entropy_calculation( - states[key], number_frames + values = states[key] + + contains_non_empty_states = ( + np.any(values) if isinstance(values, np.ndarray) else any(values) + ) + + S_conf_res = ( + ce.conformational_entropy_calculation(values, number_frames) + if contains_non_empty_states + else 0 ) S_trans += S_trans_res @@ -395,10 +401,8 @@ def _process_vibrational_entropy( level. """ force_matrix = self._level_manager.filter_zero_rows_columns(force_matrix) - force_matrix = force_matrix / number_frames torque_matrix = self._level_manager.filter_zero_rows_columns(torque_matrix) - torque_matrix = torque_matrix / number_frames S_trans = ve.vibrational_entropy_calculation( force_matrix, "force", self._args.temperature, highest @@ -425,7 +429,22 @@ def _process_conformational_entropy( start, end, step (int): Frame bounds. n_frames (int): Number of frames used. """ - S_conf = ce.conformational_entropy_calculation(states[group_id], number_frames) + group_states = states[group_id] if group_id < len(states) else None + + if group_states is not None: + contains_state_data = ( + group_states.any() + if isinstance(group_states, np.ndarray) + else any(group_states) + ) + else: + contains_state_data = False + + S_conf = ( + ce.conformational_entropy_calculation(group_states, number_frames) + if contains_state_data + else 0 + ) self._data_logger.add_results_data(group_id, level, "Conformational", S_conf) @@ -619,13 +638,19 @@ def frequency_calculation(self, lambdas, temp): lambdas = np.array(lambdas) # Ensure input is a NumPy array logger.debug(f"Eigenvalues (lambdas): {lambdas}") - # Check for negatives and raise an error if any are found - if np.any(lambdas < 0): - logger.error(f"Negative eigenvalues encountered: {lambdas[lambdas < 0]}") - raise ValueError( - f"Negative eigenvalues encountered: {lambdas[lambdas < 0]}" + lambdas = np.real_if_close(lambdas, tol=1000) + valid_mask = ( + np.isreal(lambdas) & (lambdas > 0) & (~np.isclose(lambdas, 0, atol=1e-07)) + ) + + if len(lambdas) > np.count_nonzero(valid_mask): + logger.warning( + f"{len(lambdas) - np.count_nonzero(valid_mask)} " + f"invalid eigenvalues excluded (complex, non-positive, or near-zero)." ) + lambdas = lambdas[valid_mask].real + # Compute frequencies safely frequencies = 1 / (2 * pi) * np.sqrt(lambdas / kT) logger.debug(f"Calculated frequencies: {frequencies}") @@ -748,8 +773,11 @@ def assign_conformation( # get the values of the angle for the dihedral # dihedral angle values have a range from -180 to 180 - for timestep in data_container.trajectory[start:end:step]: - timestep_index = timestep.frame - start + indices = list(range(start, end, step)) + for timestep_index, _ in zip( + indices, data_container.trajectory[start:end:step] + ): + timestep_index = timestep_index - start value = dihedral.value() # we want postive values in range 0 to 360 to make the peak assignment # work using the fact that dihedrals have circular symetry diff --git a/CodeEntropy/levels.py b/CodeEntropy/levels.py index 6c58f2d..aa06c40 100644 --- a/CodeEntropy/levels.py +++ b/CodeEntropy/levels.py @@ -171,7 +171,7 @@ def get_matrices( f"{force_matrix.shape}, new {force_block.shape}" ) else: - force_matrix += force_block + force_matrix = force_block if torque_matrix is None: torque_matrix = np.zeros_like(torque_block) @@ -181,7 +181,7 @@ def get_matrices( f"{torque_matrix.shape}, new {torque_block.shape}" ) else: - torque_matrix += torque_block + torque_matrix = torque_block return force_matrix, torque_matrix @@ -290,18 +290,27 @@ def compute_dihedral_conformations( - dihedrals (list): List of dihedral angle definitions. """ dihedrals = self.get_dihedrals(selector, level) - num_dihedrals = len(dihedrals) - conformation = np.zeros((num_dihedrals, number_frames)) - for i, dihedral in enumerate(dihedrals): - conformation[i] = ce.assign_conformation( - selector, dihedral, number_frames, bin_width, start, end, step - ) + if len(dihedrals) == 0: + logger.debug("No dihedrals found; skipping conformation assignment.") + states = [] + else: + num_dihedrals = len(dihedrals) + conformation = np.zeros((num_dihedrals, number_frames)) - states = [ - "".join(str(int(conformation[d][f])) for d in range(num_dihedrals)) - for f in range(number_frames) - ] + for i, dihedral in enumerate(dihedrals): + conformation[i] = ce.assign_conformation( + selector, dihedral, number_frames, bin_width, start, end, step + ) + + states = [ + state + for state in ( + "".join(str(int(conformation[d][f])) for d in range(num_dihedrals)) + for f in range(number_frames) + ) + if state + ] return states @@ -733,40 +742,58 @@ def build_covariance_matrices( number_frames, ): """ - Construct force and torque covariance matrices for all molecules and levels. + Construct average force and torque covariance matrices for all molecules and + entropy levels. - Parameters: - entropy_manager (EntropyManager): Instance of the EntropyManager - reduced_atom (Universe): The reduced atom selection. - number_molecules (int): Number of molecules in the system. - levels (list): List of entropy levels per molecule. - start (int): Start frame index. - end (int): End frame index. - step (int): Step size for frame iteration. - number_frames (int): Total number of frames to process. + Parameters + ---------- + entropy_manager : EntropyManager + Instance of the EntropyManager. + reduced_atom : Universe + The reduced atom selection. + levels : dict + Dictionary mapping molecule IDs to lists of entropy levels. + groups : dict + Dictionary mapping group IDs to lists of molecule IDs. + start : int + Start frame index. + end : int + End frame index. + step : int + Step size for frame iteration. + number_frames : int + Total number of frames to process. - Returns: - tuple: A tuple containing: - - force_matrices (dict): Force covariance matrices by level. - - torque_matrices (dict): Torque covariance matrices by level. + Returns + ------- + tuple + force_avg : dict + Averaged force covariance matrices by entropy level. + torque_avg : dict + Averaged torque covariance matrices by entropy level. """ number_groups = len(groups) - force_matrices = { + + force_avg = { "ua": {}, "res": [None] * number_groups, "poly": [None] * number_groups, } - torque_matrices = { + torque_avg = { "ua": {}, "res": [None] * number_groups, "poly": [None] * number_groups, } + frame_counts = { + "ua": {}, + "res": np.zeros(number_groups, dtype=int), + "poly": np.zeros(number_groups, dtype=int), + } - for timestep in reduced_atom.trajectory[start:end:step]: - time_index = timestep.frame - start + indices = list(range(start, end, step)) + for time_index, _ in zip(indices, reduced_atom.trajectory[start:end:step]): - for group_id in groups.keys(): - molecules = groups[group_id] + for group_id, molecules in groups.items(): for mol_id in molecules: mol = entropy_manager._get_molecule_container(reduced_atom, mol_id) for level in levels[mol_id]: @@ -776,13 +803,14 @@ def build_covariance_matrices( group_id, level, levels[mol_id], - time_index, + time_index - start, number_frames, - force_matrices, - torque_matrices, + force_avg, + torque_avg, + frame_counts, ) - return force_matrices, torque_matrices + return force_avg, torque_avg def update_force_torque_matrices( self, @@ -793,22 +821,55 @@ def update_force_torque_matrices( level_list, time_index, num_frames, - force_matrices, - torque_matrices, + force_avg, + torque_avg, + frame_counts, ): """ - Update force and torque matrices for a given molecule and entropy level. + Update the running averages of force and torque covariance matrices + for a given molecule and entropy level. - Parameters: - entropy_manager (EntropyManager): Instance of the EntropyManager - mol (AtomGroup): The molecule to process. - group_id (int): Index of the group. - level (str): Current entropy level ("united_atom", "residue", or "polymer"). - level_list (list): List of levels for the molecule. - time_index (int): Index of the current frame. - num_frames (int): Total number of frames. - force_matrices (dict): Dictionary of force matrices to update. - torque_matrices (dict): Dictionary of torque matrices to update. + This function computes the force and torque covariance matrices for the + current frame and updates the existing averages in-place using the incremental + mean formula: + + new_avg = old_avg + (value - old_avg) / n + + where n is the number of frames processed so far for that molecule/level + combination. This ensures that the averages are maintained without storing + all previous frame data. + + Parameters + ---------- + entropy_manager : EntropyManager + Instance of the EntropyManager. + mol : AtomGroup + The molecule to process. + group_id : int + Index of the group to which the molecule belongs. + level : str + Current entropy level ("united_atom", "residue", or "polymer"). + level_list : list + List of entropy levels for the molecule. + time_index : int + Index of the current frame relative to the start of the trajectory slice. + num_frames : int + Total number of frames to process. + force_avg : dict + Dictionary holding the running average force matrices, keyed by entropy + level. + torque_avg : dict + Dictionary holding the running average torque matrices, keyed by entropy + level. + frame_counts : dict + Dictionary holding the count of frames processed for each molecule/level + combination. + + Returns + ------- + None + Updates are performed in-place on `force_avg`, `torque_avg`, and + `frame_counts`. """ highest = level == level_list[-1] @@ -825,11 +886,19 @@ def update_force_torque_matrices( level, num_frames, highest, - force_matrices["ua"].get(key), - torque_matrices["ua"].get(key), + None if key not in force_avg["ua"] else force_avg["ua"][key], + None if key not in torque_avg["ua"] else torque_avg["ua"][key], ) - force_matrices["ua"][key] = f_mat - torque_matrices["ua"][key] = t_mat + + if key not in force_avg["ua"]: + force_avg["ua"][key] = f_mat.copy() + torque_avg["ua"][key] = t_mat.copy() + frame_counts["ua"][key] = 1 + else: + frame_counts["ua"][key] += 1 + n = frame_counts["ua"][key] + force_avg["ua"][key] += (f_mat - force_avg["ua"][key]) / n + torque_avg["ua"][key] += (t_mat - torque_avg["ua"][key]) / n elif level in ["residue", "polymer"]: mol.trajectory[time_index] @@ -839,11 +908,23 @@ def update_force_torque_matrices( level, num_frames, highest, - force_matrices[key][group_id], - torque_matrices[key][group_id], + None if force_avg[key][group_id] is None else force_avg[key][group_id], + ( + None + if torque_avg[key][group_id] is None + else torque_avg[key][group_id] + ), ) - force_matrices[key][group_id] = f_mat - torque_matrices[key][group_id] = t_mat + + if force_avg[key][group_id] is None: + force_avg[key][group_id] = f_mat.copy() + torque_avg[key][group_id] = t_mat.copy() + frame_counts[key][group_id] = 1 + else: + frame_counts[key][group_id] += 1 + n = frame_counts[key][group_id] + force_avg[key][group_id] += (f_mat - force_avg[key][group_id]) / n + torque_avg[key][group_id] += (t_mat - torque_avg[key][group_id]) / n def filter_zero_rows_columns(self, arg_matrix): """ diff --git a/tests/test_CodeEntropy/test_entropy.py b/tests/test_CodeEntropy/test_entropy.py index 3d82a32..a5b6455 100644 --- a/tests/test_CodeEntropy/test_entropy.py +++ b/tests/test_CodeEntropy/test_entropy.py @@ -222,7 +222,7 @@ def test_get_trajectory_bounds(self): self.assertIsInstance(entropy_manager._args.end, int) self.assertIsInstance(entropy_manager._args.step, int) - self.assertEqual(entropy_manager._get_trajectory_bounds(), (0, -1, 1)) + self.assertEqual(entropy_manager._get_trajectory_bounds(), (0, 0, 1)) @patch( "argparse.ArgumentParser.parse_args", @@ -805,7 +805,7 @@ def test_frequency_calculation_0(self): Ensures that the method returns 0 when the input eigenvalue (lambda) is zero. """ - lambdas = 0 + lambdas = [0] temp = 298 run_manager = RunManager("mock_folder") @@ -815,7 +815,7 @@ def test_frequency_calculation_0(self): ) frequencies = ve.frequency_calculation(lambdas, temp) - assert frequencies == 0 + assert np.allclose(frequencies, [0.0]) def test_frequency_calculation_positive(self): """ @@ -842,30 +842,75 @@ def test_frequency_calculation_positive(self): [1899594266400.4016, 2013894687315.6213, 2195940987139.7097] ) - def test_frequency_calculation_negative(self): + def test_frequency_calculation_filters_invalid(self): """ - Test `frequency_calculation` with a negative eigenvalue. + Test `frequency_calculation` filters out invalid eigenvalues. - Ensures that the method raises a `ValueError` when any eigenvalue is negative, - as this is physically invalid for frequency calculations. + Ensures that negative, complex, and near-zero eigenvalues are excluded, + and frequencies are calculated only for valid ones. """ - lambdas = np.array([585495.0917897299, -658074.5130064893, 782425.305888707]) + lambdas = np.array( + [585495.0917897299, -658074.5130064893, 0.0, 782425.305888707] + ) temp = 298 # Create a mock RunManager and set return value for get_KT2J - run_manager = RunManager("temp_folder") - run_manager.get_KT2J + run_manager = MagicMock() + run_manager.get_KT2J.return_value = 2.479e-21 # example value in Joules # Instantiate VibrationalEntropy with mocks ve = VibrationalEntropy( run_manager, MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock() ) - # Assert that ValueError is raised due to negative eigenvalue - with self.assertRaises(ValueError) as context: - ve.frequency_calculation(lambdas, temp) + # Call the method + frequencies = ve.frequency_calculation(lambdas, temp) + + # Expected: only two valid eigenvalues used + expected_lambdas = np.array([585495.0917897299, 782425.305888707]) + expected_frequencies = ( + 1 + / (2 * np.pi) + * np.sqrt(expected_lambdas / run_manager.get_KT2J.return_value) + ) + + # Assert frequencies match expected + np.testing.assert_allclose(frequencies, expected_frequencies, rtol=1e-5) + + def test_frequency_calculation_filters_invalid_with_warning(self): + """ + Test `frequency_calculation` filters out invalid eigenvalues and logs a warning. - self.assertIn("Negative eigenvalues", str(context.exception)) + Ensures that negative, complex, and near-zero eigenvalues are excluded, + and a warning is logged about the exclusions. + """ + lambdas = np.array( + [585495.0917897299, -658074.5130064893, 0.0, 782425.305888707] + ) + temp = 298 + + run_manager = MagicMock() + run_manager.get_KT2J.return_value = 2.479e-21 # example value + + ve = VibrationalEntropy( + run_manager, MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock() + ) + + with self.assertLogs("CodeEntropy.entropy", level="WARNING") as cm: + frequencies = ve.frequency_calculation(lambdas, temp) + + # Check that warning was logged + warning_messages = "\n".join(cm.output) + self.assertIn("invalid eigenvalues excluded", warning_messages) + + # Check that only valid frequencies are returned + expected_lambdas = np.array([585495.0917897299, 782425.305888707]) + expected_frequencies = ( + 1 + / (2 * np.pi) + * np.sqrt(expected_lambdas / run_manager.get_KT2J.return_value) + ) + np.testing.assert_allclose(frequencies, expected_frequencies, rtol=1e-5) def test_vibrational_entropy_calculation_force_not_highest(self): """ diff --git a/tests/test_CodeEntropy/test_levels.py b/tests/test_CodeEntropy/test_levels.py index 734c7ef..2066592 100644 --- a/tests/test_CodeEntropy/test_levels.py +++ b/tests/test_CodeEntropy/test_levels.py @@ -257,6 +257,30 @@ def test_get_dihedrals_no_residue(self): # Should result in no resdies self.assertEqual(result, []) + def test_compute_dihedral_conformations_no_dihedrals(self): + """ + Test `compute_dihedral_conformations` when no dihedrals are found. + Ensures it returns an empty list of states. + """ + level_manager = LevelManager() + + level_manager.get_dihedrals = MagicMock(return_value=[]) + + selector = MagicMock() + + result = level_manager.compute_dihedral_conformations( + selector=selector, + level="united_atom", + number_frames=10, + bin_width=10.0, + start=0, + end=10, + step=1, + ce=MagicMock(), + ) + + self.assertEqual(result, []) + def test_get_beads_polymer_level(self): """ Test `get_beads` for 'polymer' level. diff --git a/tests/test_CodeEntropy/test_main.py b/tests/test_CodeEntropy/test_main.py index bee18f0..9a16f35 100644 --- a/tests/test_CodeEntropy/test_main.py +++ b/tests/test_CodeEntropy/test_main.py @@ -105,7 +105,7 @@ def test_main_entry_point_runs(self): config_path = os.path.join(self.test_dir, "config.yaml") with open(config_path, "w") as f: - f.write("run1:\n" " selection_string: resid 1\n") + f.write("run1:\n" " end: 60\n" " selection_string: resid 1\n") result = subprocess.run( [