diff --git a/CodeEntropy/config/arg_config_manager.py b/CodeEntropy/config/arg_config_manager.py index 7596c41..e733952 100644 --- a/CodeEntropy/config/arg_config_manager.py +++ b/CodeEntropy/config/arg_config_manager.py @@ -27,7 +27,7 @@ "kcal_force_units": { "type": bool, "default": False, - "help": "Set this to True if you have a separate force file with nonSI units.", + "help": "Set this to True if you have a separate force file with kcal units.", }, "selection_string": { "type": str, diff --git a/CodeEntropy/dihedral_tools.py b/CodeEntropy/dihedral_tools.py new file mode 100644 index 0000000..1c5d24d --- /dev/null +++ b/CodeEntropy/dihedral_tools.py @@ -0,0 +1,392 @@ +import logging + +import numpy as np +from MDAnalysis.analysis.dihedrals import Dihedral +from rich.progress import ( + BarColumn, + Progress, + SpinnerColumn, + TextColumn, + TimeElapsedColumn, +) + +logger = logging.getLogger(__name__) + + +class DihedralAnalysis: + """ + Functions for finding dihedral angles and analysing them to get the + states needed for the conformational entropy functions. + """ + + def __init__(self, universe_operations=None): + """ + Initialise with placeholders. + """ + self._universe_operations = universe_operations + self.data_container = None + self.states_ua = None + self.states_res = None + + def build_conformational_states( + self, + data_container, + levels, + groups, + start, + end, + step, + bin_width, + ): + """ + Build the conformational states descriptors based on dihedral angles + needed for the calculation of the conformational entropy. + """ + number_groups = len(groups) + states_ua = {} + states_res = [None] * number_groups + + total_items = sum( + len(levels[mol_id]) for mols in groups.values() for mol_id in mols + ) + + with Progress( + SpinnerColumn(), + TextColumn("[bold blue]{task.fields[title]}", justify="right"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.1f}%"), + TimeElapsedColumn(), + ) as progress: + + task = progress.add_task( + "[green]Building Conformational States...", + total=total_items, + title="Starting...", + ) + + for group_id in groups.keys(): + molecules = groups[group_id] + mol = self._universe_operations.get_molecule_container( + data_container, molecules[0] + ) + num_residues = len(mol.residues) + dihedrals_ua = [[] for _ in range(num_residues)] + peaks_ua = [{} for _ in range(num_residues)] + dihedrals_res = [] + peaks_res = {} + + # Identify dihedral AtomGroups + for level in levels[molecules[0]]: + if level == "united_atom": + for res_id in range(num_residues): + selection1 = mol.residues[res_id].atoms.indices[0] + selection2 = mol.residues[res_id].atoms.indices[-1] + res_container = self._universe_operations.new_U_select_atom( + mol, + f"index {selection1}:" f"{selection2}", + ) + heavy_res = self._universe_operations.new_U_select_atom( + res_container, "prop mass > 1.1" + ) + + dihedrals_ua[res_id] = self._get_dihedrals(heavy_res, level) + + elif level == "residue": + dihedrals_res = self._get_dihedrals(mol, level) + + # Identify peaks + for level in levels[molecules[0]]: + if level == "united_atom": + for res_id in range(num_residues): + if len(dihedrals_ua[res_id]) == 0: + # No dihedrals means no histogram or peaks + peaks_ua[res_id] = [] + else: + peaks_ua[res_id] = self._identify_peaks( + data_container, + molecules, + dihedrals_ua[res_id], + bin_width, + start, + end, + step, + ) + + elif level == "residue": + if len(dihedrals_res) == 0: + # No dihedrals means no histogram or peaks + peaks_res = [] + else: + peaks_res = self._identify_peaks( + data_container, + molecules, + dihedrals_res, + bin_width, + start, + end, + step, + ) + + # Assign states for each group + for level in levels[molecules[0]]: + if level == "united_atom": + for res_id in range(num_residues): + key = (group_id, res_id) + if len(dihedrals_ua[res_id]) == 0: + # No conformational states + states_ua[key] = [] + else: + states_ua[key] = self._assign_states( + data_container, + molecules, + dihedrals_ua[res_id], + peaks_ua[res_id], + start, + end, + step, + ) + + elif level == "residue": + if len(dihedrals_res) == 0: + # No conformational states + states_res[group_id] = [] + else: + states_res[group_id] = self._assign_states( + data_container, + molecules, + dihedrals_res, + peaks_res, + start, + end, + step, + ) + + progress.advance(task) + + return states_ua, states_res + + def _get_dihedrals(self, data_container, level): + """ + Define the set of dihedrals for use in the conformational entropy function. + If united atom level, the dihedrals are defined from the heavy atoms + (4 bonded atoms for 1 dihedral). + If residue level, use the bonds between residues to cast dihedrals. + Note: not using improper dihedrals only ones with 4 atoms/residues + in a linear arrangement. + + Args: + data_container (MDAnalysis.Universe): system information + level (str): level of the hierarchy (should be residue or polymer) + + Returns: + dihedrals (array): set of dihedrals + """ + # Start with empty array + dihedrals = [] + atom_groups = [] + + # if united atom level, read dihedrals from MDAnalysis universe + if level == "united_atom": + dihedrals = data_container.dihedrals + num_dihedrals = len(dihedrals) + for index in range(num_dihedrals): + atom_groups.append(dihedrals[index].atoms) + + # if residue level, looking for dihedrals involving residues + if level == "residue": + num_residues = len(data_container.residues) + logger.debug(f"Number Residues: {num_residues}") + if num_residues < 4: + logger.debug("no residue level dihedrals") + + else: + # find bonds between residues N-3:N-2 and N-1:N + for residue in range(4, num_residues + 1): + # Using MDAnalysis selection, + # assuming only one covalent bond between neighbouring residues + # TODO not written for branched polymers + atom_string = ( + "resindex " + + str(residue - 4) + + " and bonded resindex " + + str(residue - 3) + ) + atom1 = data_container.select_atoms(atom_string) + + atom_string = ( + "resindex " + + str(residue - 3) + + " and bonded resindex " + + str(residue - 4) + ) + atom2 = data_container.select_atoms(atom_string) + + atom_string = ( + "resindex " + + str(residue - 2) + + " and bonded resindex " + + str(residue - 1) + ) + atom3 = data_container.select_atoms(atom_string) + + atom_string = ( + "resindex " + + str(residue - 1) + + " and bonded resindex " + + str(residue - 2) + ) + atom4 = data_container.select_atoms(atom_string) + + atom_group = atom1 + atom2 + atom3 + atom4 + atom_groups.append(atom_group) + + logger.debug(f"Level: {level}, Dihedrals: {atom_groups}") + + return atom_groups + + def _identify_peaks( + self, + data_container, + molecules, + dihedrals, + bin_width, + start, + end, + step, + ): + """ + Build a histogram of the dihedral data and identify the peaks. + This is to give the information needed for the adaptive method + of identifying dihedral states. + """ + peak_values = [] * len(dihedrals) + for dihedral_index in range(len(dihedrals)): + phi = [] + # get the values of the angle for the dihedral + # loop over all molecules in the averaging group + # dihedral angle values have a range from -180 to 180 + for molecule in molecules: + mol = self._universe_operations.get_molecule_container( + data_container, molecule + ) + number_frames = len(mol.trajectory) + dihedral_results = Dihedral(dihedrals).run() + for timestep in range(number_frames): + value = dihedral_results.results.angles[timestep][dihedral_index] + + # We want postive values in range 0 to 360 to make + # the peak assignment. + # works using the fact that dihedrals have circular symetry + # (i.e. -15 degrees = +345 degrees) + if value < 0: + value += 360 + phi.append(value) + + # create a histogram using numpy + number_bins = int(360 / bin_width) + popul, bin_edges = np.histogram(a=phi, bins=number_bins, range=(0, 360)) + bin_value = [ + 0.5 * (bin_edges[i] + bin_edges[i + 1]) for i in range(0, len(popul)) + ] + + # identify "convex turning-points" and populate a list of peaks + # peak : a bin whose neighboring bins have smaller population + # NOTE might have problems if the peak is wide with a flat or + # sawtooth top in which case check you have a sensible bin width + + peaks = [] + for bin_index in range(number_bins): + # if there is no dihedrals in a bin then it cannot be a peak + if popul[bin_index] == 0: + pass + # being careful of the last bin + # (dihedrals have circular symmetry, the histogram does not) + elif ( + bin_index == number_bins - 1 + ): # the -1 is because the index starts with 0 not 1 + if ( + popul[bin_index] >= popul[bin_index - 1] + and popul[bin_index] >= popul[0] + ): + peaks.append(bin_value[bin_index]) + else: + if ( + popul[bin_index] >= popul[bin_index - 1] + and popul[bin_index] >= popul[bin_index + 1] + ): + peaks.append(bin_value[bin_index]) + + peak_values.append(peaks) + + logger.debug(f"Dihedral: {dihedral_index}, Peak Values: {peak_values}") + + return peak_values + + def _assign_states( + self, + data_container, + molecules, + dihedrals, + peaks, + start, + end, + step, + ): + """ + Turn the dihedral values into conformations based on the peaks + from the histogram. + Then combine these to form states for each molecule. + """ + states = None + + # get the values of the angle for the dihedral + # dihedral angle values have a range from -180 to 180 + for molecule in molecules: + conformations = [] + mol = self._universe_operations.get_molecule_container( + data_container, molecule + ) + number_frames = len(mol.trajectory) + dihedral_results = Dihedral(dihedrals).run() + for dihedral_index in range(len(dihedrals)): + conformation = [] + for timestep in range(number_frames): + value = dihedral_results.results.angles[timestep][dihedral_index] + + # We want postive values in range 0 to 360 to make + # the peak assignment. + # works using the fact that dihedrals have circular symetry + # (i.e. -15 degrees = +345 degrees) + if value < 0: + value += 360 + + # Find the turning point/peak that the snapshot is closest to. + distances = [abs(value - peak) for peak in peaks[dihedral_index]] + conformation.append(np.argmin(distances)) + + logger.debug( + f"Dihedral: {dihedral_index} Conformations: {conformation}" + ) + conformations.append(conformation) + + # for all the dihedrals available concatenate the label of each + # dihedral into the state for that frame + mol_states = [ + state + for state in ( + "".join( + str(int(conformations[d][f])) for d in range(len(dihedrals)) + ) + for f in range(number_frames) + ) + if state + ] + + if states is None: + states = mol_states + else: + states.extend(mol_states) + + logger.debug(f"States: {states}") + + return states diff --git a/CodeEntropy/entropy.py b/CodeEntropy/entropy.py index 2be1434..c5e72c1 100644 --- a/CodeEntropy/entropy.py +++ b/CodeEntropy/entropy.py @@ -27,7 +27,15 @@ class EntropyManager: """ def __init__( - self, run_manager, args, universe, data_logger, level_manager, group_molecules + self, + run_manager, + args, + universe, + data_logger, + level_manager, + group_molecules, + dihedral_analysis, + universe_operations, ): """ Initializes the EntropyManager with required components. @@ -47,6 +55,8 @@ def __init__( self._data_logger = data_logger self._level_manager = level_manager self._group_molecules = group_molecules + self._dihedral_analysis = dihedral_analysis + self._universe_operations = universe_operations self._GAS_CONST = 8.3144598484848 def execute(self): @@ -75,6 +85,8 @@ def execute(self): self._data_logger, self._level_manager, self._group_molecules, + self._dihedral_analysis, + self._universe_operations, ) ce = ConformationalEntropy( self._run_manager, @@ -83,6 +95,8 @@ def execute(self): self._data_logger, self._level_manager, self._group_molecules, + self._dihedral_analysis, + self._universe_operations, ) reduced_atom, number_molecules, levels, groups = self._initialize_molecules() @@ -124,17 +138,14 @@ def execute(self): # Identify the conformational states from dihedral angles for the # conformational entropy calculations - states_ua, states_res = self._level_manager.build_conformational_states( - self, + states_ua, states_res = self._dihedral_analysis.build_conformational_states( reduced_atom, levels, nonwater_groups, start, end, step, - number_frames, self._args.bin_width, - ce, ) # Complete the entropy calculations @@ -266,7 +277,9 @@ def _compute_entropies( ) for group_id in groups.keys(): - mol = self._get_molecule_container(reduced_atom, groups[group_id][0]) + mol = self._universe_operations.get_molecule_container( + reduced_atom, groups[group_id][0] + ) residue_group = "_".join( sorted(set(res.resname for res in mol.residues)) @@ -274,7 +287,9 @@ def _compute_entropies( group_residue_count = len(groups[group_id]) group_atom_count = 0 for mol_id in groups[group_id]: - each_mol = self._get_molecule_container(reduced_atom, mol_id) + each_mol = self._universe_operations.get_molecule_container( + reduced_atom, mol_id + ) group_atom_count += len(each_mol.atoms) self._data_logger.add_group_label( group_id, residue_group, group_residue_count, group_atom_count @@ -384,30 +399,13 @@ def _get_reduced_universe(self): return self._universe # Otherwise create a new (smaller) universe based on the selection - reduced = self._run_manager.new_U_select_atom( - self._universe, self._args.selection_string - ) + u = self._universe + selection_string = self._args.selection_string + reduced = self._universe_operations.new_U_select_atom(u, selection_string) name = f"{len(reduced.trajectory)}_frame_dump_atom_selection" self._run_manager.write_universe(reduced, name) - return reduced - - def _get_molecule_container(self, universe, molecule_id): - """ - Extracts the atom group corresponding to a single molecule from the universe. - - Args: - universe (MDAnalysis.Universe): The reduced universe. - molecule_id (int): Index of the molecule to extract. - - Returns: - MDAnalysis.Universe: Universe containing only the selected molecule. - """ - # Identify the atoms in the molecule - frag = universe.atoms.fragments[molecule_id] - selection_string = f"index {frag.indices[0]}:{frag.indices[-1]}" - # Build a new universe with only the one molecule - return self._run_manager.new_U_select_atom(universe, selection_string) + return reduced def _process_united_atom_entropy( self, @@ -476,7 +474,7 @@ def _process_united_atom_entropy( # If there are no conformational states (i.e. no dihedrals) # then the conformational entropy is zero S_conf_res = ( - ce.conformational_entropy_calculation(values, number_frames) + ce.conformational_entropy_calculation(values) if contains_non_empty_states else 0 ) @@ -607,7 +605,7 @@ def _process_conformational_entropy( # If there are no conformational states (i.e. no dihedrals) # then the conformational entropy is zero S_conf = ( - ce.conformational_entropy_calculation(group_states, number_frames) + ce.conformational_entropy_calculation(group_states) if contains_state_data else 0 ) @@ -807,14 +805,29 @@ class VibrationalEntropy(EntropyManager): """ def __init__( - self, run_manager, args, universe, data_logger, level_manager, group_molecules + self, + run_manager, + args, + universe, + data_logger, + level_manager, + group_molecules, + dihedral_analysis, + universe_operations, ): """ Initializes the VibrationalEntropy manager with all required components and defines physical constants used in vibrational entropy calculations. """ super().__init__( - run_manager, args, universe, data_logger, level_manager, group_molecules + run_manager, + args, + universe, + data_logger, + level_manager, + group_molecules, + dihedral_analysis, + universe_operations, ) self._PLANCK_CONST = 6.62607004081818e-34 @@ -942,104 +955,32 @@ class ConformationalEntropy(EntropyManager): """ def __init__( - self, run_manager, args, universe, data_logger, level_manager, group_molecules + self, + run_manager, + args, + universe, + data_logger, + level_manager, + group_molecules, + dihedral_analysis, + universe_operations, ): """ Initializes the ConformationalEntropy manager with all required components and sets the gas constant used in conformational entropy calculations. """ super().__init__( - run_manager, args, universe, data_logger, level_manager, group_molecules + run_manager, + args, + universe, + data_logger, + level_manager, + group_molecules, + dihedral_analysis, + universe_operations, ) - def assign_conformation( - self, data_container, dihedral, number_frames, bin_width, start, end, step - ): - """ - Create a state vector, showing the state in which the input dihedral is - as a function of time. The function creates a histogram from the timeseries of - the dihedral angle values and identifies points of dominant occupancy - (called CONVEX TURNING POINTS). - Based on the identified TPs, states are assigned to each configuration of the - dihedral. - - Args: - data_container (MDAnalysis Universe): data for the molecule/residue unit - dihedral (array): The dihedral angles in the unit - number_frames (int): number of frames in the trajectory - bin_width (int): the width of the histogram bit, default 30 degrees - start (int): starting frame, will default to 0 - end (int): ending frame, will default to -1 (last frame in trajectory) - step (int): spacing between frames, will default to 1 - - Returns: - conformations (array): A timeseries with integer labels describing the - state at each point in time. - - """ - conformations = np.zeros(number_frames) - phi = np.zeros(number_frames) - - # get the values of the angle for the dihedral - # dihedral angle values have a range from -180 to 180 - indices = list(range(number_frames)) - for timestep_index, _ in zip( - indices, data_container.trajectory[start:end:step] - ): - timestep_index = timestep_index - value = dihedral.value() - # we want postive values in range 0 to 360 to make the peak assignment - # works using the fact that dihedrals have circular symetry - # (i.e. -15 degrees = +345 degrees) - if value < 0: - value += 360 - phi[timestep_index] = value - - # create a histogram using numpy - number_bins = int(360 / bin_width) - popul, bin_edges = np.histogram(a=phi, bins=number_bins, range=(0, 360)) - bin_value = [ - 0.5 * (bin_edges[i] + bin_edges[i + 1]) for i in range(0, len(popul)) - ] - - # identify "convex turning-points" and populate a list of peaks - # peak : a bin whose neighboring bins have smaller population - # NOTE might have problems if the peak is wide with a flat or sawtooth - # top in which case check you have a sensible bin width - peak_values = [] - - for bin_index in range(number_bins): - # if there is no dihedrals in a bin then it cannot be a peak - if popul[bin_index] == 0: - pass - # being careful of the last bin - # (dihedrals have circular symmetry, the histogram does not) - elif ( - bin_index == number_bins - 1 - ): # the -1 is because the index starts with 0 not 1 - if ( - popul[bin_index] >= popul[bin_index - 1] - and popul[bin_index] >= popul[0] - ): - peak_values.append(bin_value[bin_index]) - else: - if ( - popul[bin_index] >= popul[bin_index - 1] - and popul[bin_index] >= popul[bin_index + 1] - ): - peak_values.append(bin_value[bin_index]) - - # go through each frame again and assign conformation state - for frame in range(number_frames): - # find the TP that the snapshot is least distant from - distances = [abs(phi[frame] - peak) for peak in peak_values] - conformations[frame] = np.argmin(distances) - - logger.debug(f"Final conformations: {conformations}") - - return conformations - - def conformational_entropy_calculation(self, states, number_frames): + def conformational_entropy_calculation(self, states): """ Function to calculate conformational entropies using eq. (7) in Higham, S.-Y. Chou, F. Gräter and R. H. Henchman, Molecular Physics, 2018, 116, @@ -1050,7 +991,6 @@ def conformational_entropy_calculation(self, states, number_frames): Args: states (array): Conformational states in the molecule - number_frames (int): The number of frames analysed Returns: S_conf_total (float) : conformational entropy @@ -1062,11 +1002,12 @@ def conformational_entropy_calculation(self, states, number_frames): # to get the entropy # entropy = sum over states p*ln(p) values, counts = np.unique(states, return_counts=True) + total_count = len(states) for state in range(len(values)): logger.debug(f"Unique states: {values}") logger.debug(f"Counts: {counts}") count = counts[state] - probability = count / number_frames + probability = count / total_count entropy = probability * np.log(probability) S_conf_total += entropy @@ -1086,14 +1027,29 @@ class OrientationalEntropy(EntropyManager): """ def __init__( - self, run_manager, args, universe, data_logger, level_manager, group_molecules + self, + run_manager, + args, + universe, + data_logger, + level_manager, + group_molecules, + dihedral_analysis, + universe_operations, ): """ Initializes the OrientationalEntropy manager with all required components and sets the gas constant used in orientational entropy calculations. """ super().__init__( - run_manager, args, universe, data_logger, level_manager, group_molecules + run_manager, + args, + universe, + data_logger, + level_manager, + group_molecules, + dihedral_analysis, + universe_operations, ) def orientational_entropy_calculation(self, neighbours_dict): diff --git a/CodeEntropy/group_molecules.py b/CodeEntropy/group_molecules.py index af91ffa..417f293 100644 --- a/CodeEntropy/group_molecules.py +++ b/CodeEntropy/group_molecules.py @@ -61,11 +61,6 @@ def _by_none(self, universe): for molecule_i in range(number_molecules): molecule_groups[molecule_i] = [molecule_i] - number_groups = len(molecule_groups) - - logger.debug(f"Number of molecule groups: {number_groups}") - logger.debug(f"Molecule groups are: {molecule_groups}") - return molecule_groups def _by_molecules(self, universe): @@ -108,9 +103,4 @@ def _by_molecules(self, universe): molecule_groups[molecule_j].append(molecule_i) break - number_groups = len(molecule_groups) - - logger.debug(f"Number of molecule groups: {number_groups}") - logger.debug(f"Molecule groups are: {molecule_groups}") - return molecule_groups diff --git a/CodeEntropy/levels.py b/CodeEntropy/levels.py index 89c185f..64768a6 100644 --- a/CodeEntropy/levels.py +++ b/CodeEntropy/levels.py @@ -22,7 +22,7 @@ class LevelManager: analysis. """ - def __init__(self): + def __init__(self, universe_operations): """ Initializes the LevelManager with placeholders for level-related data, including translational and rotational axes, number of beads, and a @@ -33,6 +33,7 @@ def __init__(self): self._trans_axes = None self._rot_axes = None self._number_of_beads = None + self._universe_operations = universe_operations def select_levels(self, data_container): """ @@ -186,140 +187,6 @@ def get_matrices( return force_matrix, torque_matrix - def get_dihedrals(self, data_container, level): - """ - Define the set of dihedrals for use in the conformational entropy function. - If united atom level, the dihedrals are defined from the heavy atoms - (4 bonded atoms for 1 dihedral). - If residue level, use the bonds between residues to cast dihedrals. - Note: not using improper dihedrals only ones with 4 atoms/residues - in a linear arrangement. - - Args: - data_container (MDAnalysis.Universe): system information - level (str): level of the hierarchy (should be residue or polymer) - - Returns: - dihedrals (array): set of dihedrals - """ - # Start with empty array - dihedrals = [] - - # if united atom level, read dihedrals from MDAnalysis universe - if level == "united_atom": - dihedrals = data_container.dihedrals - - # if residue level, looking for dihedrals involving residues - if level == "residue": - num_residues = len(data_container.residues) - logger.debug(f"Number Residues: {num_residues}") - if num_residues < 4: - logger.debug("no residue level dihedrals") - - else: - # find bonds between residues N-3:N-2 and N-1:N - for residue in range(4, num_residues + 1): - # Using MDAnalysis selection, - # assuming only one covalent bond between neighbouring residues - # TODO not written for branched polymers - atom_string = ( - "resindex " - + str(residue - 4) - + " and bonded resindex " - + str(residue - 3) - ) - atom1 = data_container.select_atoms(atom_string) - - atom_string = ( - "resindex " - + str(residue - 3) - + " and bonded resindex " - + str(residue - 4) - ) - atom2 = data_container.select_atoms(atom_string) - - atom_string = ( - "resindex " - + str(residue - 2) - + " and bonded resindex " - + str(residue - 1) - ) - atom3 = data_container.select_atoms(atom_string) - - atom_string = ( - "resindex " - + str(residue - 1) - + " and bonded resindex " - + str(residue - 2) - ) - atom4 = data_container.select_atoms(atom_string) - - atom_group = atom1 + atom2 + atom3 + atom4 - dihedrals.append(atom_group.dihedral) - - logger.debug(f"Level: {level}, Dihedrals: {dihedrals}") - - return dihedrals - - def compute_dihedral_conformations( - self, - selector, - level, - number_frames, - bin_width, - start, - end, - step, - ce, - ): - """ - Compute dihedral conformations for a given selector and entropy level. - - Parameters: - selector (AtomGroup): Atom selection to compute dihedrals for. - level (str): Entropy level ("united_atom" or "residue"). - number_frames (int): Number of frames to process. - bin_width (float): Bin width for dihedral angle discretization. - start (int): Start frame index. - end (int): End frame index. - step (int): Step size for frame iteration. - ce : Conformational Entropy class - - Returns: - states (list): List of conformation strings per frame. - """ - # Identify the dihedral angles in the residue/molecule - dihedrals = self.get_dihedrals(selector, level) - - # When there are no dihedrals, there is only one possible conformation - # so the conformational states are not relevant - if len(dihedrals) == 0: - logger.debug("No dihedrals found; skipping conformation assignment.") - states = [] - else: - # Identify the conformational label for each dihedral at each frame - num_dihedrals = len(dihedrals) - conformation = np.zeros((num_dihedrals, number_frames)) - - for i, dihedral in enumerate(dihedrals): - conformation[i] = ce.assign_conformation( - selector, dihedral, number_frames, bin_width, start, end, step - ) - - # for all the dihedrals available concatenate the label of each - # dihedral into the state for that frame - states = [ - state - for state in ( - "".join(str(int(conformation[d][f])) for d in range(num_dihedrals)) - for f in range(number_frames) - ) - if state - ] - - logger.debug(f"level: {level}, states: {states}") - return states - def get_beads(self, data_container, level): """ Function to define beads depending on the level in the hierarchy. @@ -831,14 +698,10 @@ def build_covariance_matrices( for time_index, _ in zip(indices, reduced_atom.trajectory[start:end:step]): for group_id, molecules in groups.items(): for mol_id in molecules: - mol = entropy_manager._get_molecule_container( + mol = self._universe_operations.get_molecule_container( reduced_atom, mol_id ) for level in levels[mol_id]: - mol = entropy_manager._get_molecule_container( - reduced_atom, mol_id - ) - resname = mol.atoms[0].resname resid = mol.atoms[0].resid segid = mol.atoms[0].segid @@ -940,7 +803,7 @@ def update_force_torque_matrices( if level == "united_atom": for res_id, residue in enumerate(mol.residues): key = (group_id, res_id) - res = entropy_manager._run_manager.new_U_select_atom( + res = self._universe_operations.new_U_select_atom( mol, f"index {residue.atoms.indices[0]}:{residue.atoms.indices[-1]}" ) @@ -1054,134 +917,3 @@ def filter_zero_rows_columns(self, arg_matrix): logger.debug(f"arg_matrix: {arg_matrix}") return arg_matrix - - def build_conformational_states( - self, - entropy_manager, - reduced_atom, - levels, - groups, - start, - end, - step, - number_frames, - bin_width, - ce, - ): - """ - Construct the conformational states for each molecule at - relevant levels. - - Parameters: - entropy_manager (EntropyManager): Instance of the EntropyManager - reduced_atom (Universe): The reduced atom selection. - levels (list): List of entropy levels per molecule. - groups (dict): Groups for averaging over molecules. - start (int): Start frame index. - end (int): End frame index. - step (int): Step size for frame iteration. - number_frames (int): Total number of frames to process. - bin_width (int): Width of histogram bins. - ce: Conformational Entropy object - - Returns: - tuple: A tuple containing: - - states_ua (dict): Conformational states at the united-atom level. - - states_res (list): Conformational states at the residue level. - """ - number_groups = len(groups) - states_ua = {} - states_res = [None] * number_groups - - total_items = sum( - len(levels[mol_id]) for mols in groups.values() for mol_id in mols - ) - - with Progress( - SpinnerColumn(), - TextColumn("[bold blue]{task.fields[title]}", justify="right"), - BarColumn(), - TextColumn("[progress.percentage]{task.percentage:>3.1f}%"), - TimeElapsedColumn(), - ) as progress: - - task = progress.add_task( - "[green]Building Conformational States...", - total=total_items, - title="Starting...", - ) - - for group_id in groups.keys(): - molecules = groups[group_id] - for mol_id in molecules: - mol = entropy_manager._get_molecule_container(reduced_atom, mol_id) - - resname = mol.atoms[0].resname - resid = mol.atoms[0].resid - segid = mol.atoms[0].segid - - mol_label = f"{resname}_{resid} (segid {segid})" - - for level in levels[mol_id]: - progress.update( - task, - title=f"Building conformational states | " - f"Molecule: {mol_label} | " - f"Level: {level}", - ) - - if level == "united_atom": - for res_id, residue in enumerate(mol.residues): - key = (group_id, res_id) - - res_container = ( - entropy_manager._run_manager.new_U_select_atom( - mol, - f"index {residue.atoms.indices[0]}:" - f"{residue.atoms.indices[-1]}", - ) - ) - heavy_res = ( - entropy_manager._run_manager.new_U_select_atom( - res_container, "prop mass > 1.1" - ) - ) - states = self.compute_dihedral_conformations( - heavy_res, - level, - number_frames, - bin_width, - start, - end, - step, - ce, - ) - - if key in states_ua: - states_ua[key].extend(states) - else: - states_ua[key] = states - - elif level == "residue": - states = self.compute_dihedral_conformations( - mol, - level, - number_frames, - bin_width, - start, - end, - step, - ce, - ) - - if states_res[group_id] is None: - states_res[group_id] = states - else: - states_res[group_id].extend(states) - - progress.advance(task) - - logger.debug(f"states_ua {states_ua}") - logger.debug(f"states_res {states_res}") - - return states_ua, states_res diff --git a/CodeEntropy/mda_universe_operations.py b/CodeEntropy/mda_universe_operations.py new file mode 100644 index 0000000..d7f739e --- /dev/null +++ b/CodeEntropy/mda_universe_operations.py @@ -0,0 +1,159 @@ +import logging + +import MDAnalysis as mda +from MDAnalysis.analysis.base import AnalysisFromFunction +from MDAnalysis.coordinates.memory import MemoryReader + +logger = logging.getLogger(__name__) + + +class UniverseOperations: + """ + Functions to create and manipulate MDAnalysis Universe objects. + """ + + def __init__(self): + """ + Initialise class + """ + self._universe = None + + def new_U_select_frame(self, u, start=None, end=None, step=1): + """Create a reduced universe by dropping frames according to + user selection. + + Parameters + ---------- + u : MDAnalyse.Universe + A Universe object will all topology, dihedrals,coordinates and force + information + start : int or None, Optional, default: None + Frame id to start analysis. Default None will start from frame 0 + end : int or None, Optional, default: None + Frame id to end analysis. Default None will end at last frame + step : int, Optional, default: 1 + Steps between frame. + + Returns + ------- + u2 : MDAnalysis.Universe + reduced universe + """ + if start is None: + start = 0 + if end is None: + end = len(u.trajectory) + select_atom = u.select_atoms("all", updating=True) + coordinates = ( + AnalysisFromFunction(lambda ag: ag.positions.copy(), select_atom) + .run() + .results["timeseries"][start:end:step] + ) + forces = ( + AnalysisFromFunction(lambda ag: ag.forces.copy(), select_atom) + .run() + .results["timeseries"][start:end:step] + ) + u2 = mda.Merge(select_atom) + u2.load_new(coordinates, format=MemoryReader, forces=forces) + logger.debug(f"MDAnalysis.Universe - reduced universe: {u2}") + + return u2 + + def new_U_select_atom(self, u, select_string="all"): + """Create a reduced universe by dropping atoms according to + user selection. + + Parameters + ---------- + u : MDAnalyse.Universe + A Universe object will all topology, dihedrals,coordinates and force + information + select_string : str, Optional, default: 'all' + MDAnalysis.select_atoms selection string. + + Returns + ------- + u2 : MDAnalysis.Universe + reduced universe + + """ + select_atom = u.select_atoms(select_string, updating=True) + coordinates = ( + AnalysisFromFunction(lambda ag: ag.positions.copy(), select_atom) + .run() + .results["timeseries"] + ) + forces = ( + AnalysisFromFunction(lambda ag: ag.forces.copy(), select_atom) + .run() + .results["timeseries"] + ) + u2 = mda.Merge(select_atom) + u2.load_new(coordinates, format=MemoryReader, forces=forces) + logger.debug(f"MDAnalysis.Universe - reduced universe: {u2}") + + return u2 + + def get_molecule_container(self, universe, molecule_id): + """ + Extracts the atom group corresponding to a single molecule from the universe. + + Args: + universe (MDAnalysis.Universe): The reduced universe. + molecule_id (int): Index of the molecule to extract. + + Returns: + MDAnalysis.Universe: Universe containing only the selected molecule. + """ + # Identify the atoms in the molecule + frag = universe.atoms.fragments[molecule_id] + selection_string = f"index {frag.indices[0]}:{frag.indices[-1]}" + + return self.new_U_select_atom(universe, selection_string) + + def merge_forces(self, tprfile, trrfile, forcefile, fileformat=None, kcal=False): + """ + Creates a universe by merging the coordinates and forces from + different input files. + + Args: + tprfile : Topology input file + trrfile : Coordinate trajectory file + forcefile : Force trajectory file + format : Optional string for MDAnalysis identifying the file format + kcal : Optional Boolean for when the forces are in kcal not kJ + + Returns: + MDAnalysis Universe object + """ + + logger.debug(f"Loading Universe with {trrfile}") + u = mda.Universe(tprfile, trrfile, format=fileformat) + + logger.debug(f"Loading Universe with {forcefile}") + u_force = mda.Universe(tprfile, forcefile, format=fileformat) + + select_atom = u.select_atoms("all") + select_atom_force = u_force.select_atoms("all") + + coordinates = ( + AnalysisFromFunction(lambda ag: ag.positions.copy(), select_atom) + .run() + .results["timeseries"] + ) + forces = ( + AnalysisFromFunction(lambda ag: ag.positions.copy(), select_atom_force) + .run() + .results["timeseries"] + ) + + if kcal: + # Convert from kcal to kJ + forces *= 4.184 + + logger.debug("Merging forces with coordinates universe.") + new_universe = mda.Merge(select_atom) + new_universe.load_new(coordinates, forces=forces) + + return new_universe diff --git a/CodeEntropy/run.py b/CodeEntropy/run.py index 71b55ad..20cbae6 100644 --- a/CodeEntropy/run.py +++ b/CodeEntropy/run.py @@ -6,8 +6,6 @@ import requests import yaml from art import text2art -from MDAnalysis.analysis.base import AnalysisFromFunction -from MDAnalysis.coordinates.memory import MemoryReader from rich.align import Align from rich.console import Group from rich.padding import Padding @@ -19,9 +17,11 @@ from CodeEntropy.config.arg_config_manager import ConfigManager from CodeEntropy.config.data_logger import DataLogger from CodeEntropy.config.logging_config import LoggingConfig +from CodeEntropy.dihedral_tools import DihedralAnalysis from CodeEntropy.entropy import EntropyManager from CodeEntropy.group_molecules import GroupMolecules from CodeEntropy.levels import LevelManager +from CodeEntropy.mda_universe_operations import UniverseOperations logger = logging.getLogger(__name__) console = LoggingConfig.get_console() @@ -247,52 +247,34 @@ def run_entropy_workflow(self): # Load MDAnalysis Universe tprfile = args.top_traj_file[0] trrfile = args.top_traj_file[1:] - fileformat = args.file_format - logger.debug(f"Loading Universe with {tprfile} and {trrfile}") - u = mda.Universe(tprfile, trrfile, format=fileformat) - - # If forces are in separate file merge them with the - # coordinates from the trajectory file forcefile = args.force_file - if forcefile is not None: - logger.debug(f"Loading Universe with {forcefile}") - u_force = mda.Universe(tprfile, forcefile, format=fileformat) - select_atom = u.select_atoms("all") - select_atom_force = u_force.select_atoms("all") - - coordinates = ( - AnalysisFromFunction( - lambda ag: ag.positions.copy(), select_atom - ) - .run() - .results["timeseries"] - ) - forces = ( - AnalysisFromFunction( - lambda ag: ag.positions.copy(), select_atom_force - ) - .run() - .results["timeseries"] - ) - - if args.kcal_force_units: - # Convert from kcal to kJ - forces *= 4.184 + fileformat = args.file_format + kcal_units = args.kcal_force_units - logger.debug("Merging forces with coordinates universe.") - new_universe = mda.Merge(select_atom) - new_universe.load_new(coordinates, forces=forces) + # Create shared UniverseOperations instance + universe_operations = UniverseOperations() - u = new_universe + if forcefile is None: + logger.debug(f"Loading Universe with {tprfile} and {trrfile}") + u = mda.Universe(tprfile, trrfile, format=fileformat) + else: + u = universe_operations.merge_forces( + tprfile, trrfile, forcefile, fileformat, kcal_units + ) self._config_manager.input_parameters_validation(u, args) # Create LevelManager instance - level_manager = LevelManager() + level_manager = LevelManager(universe_operations) # Create GroupMolecules instance group_molecules = GroupMolecules() + # Create shared DihedralAnalysis with injected universe_operations + dihedral_analysis = DihedralAnalysis( + universe_operations=universe_operations + ) + # Inject all dependencies into EntropyManager entropy_manager = EntropyManager( run_manager=self, @@ -301,6 +283,8 @@ def run_entropy_workflow(self): data_logger=self._data_logger, level_manager=level_manager, group_molecules=group_molecules, + dihedral_analysis=dihedral_analysis, + universe_operations=universe_operations, ) entropy_manager.execute() @@ -311,79 +295,6 @@ def run_entropy_workflow(self): logger.error(f"RunManager encountered an error: {e}", exc_info=True) raise - def new_U_select_frame(self, u, start=None, end=None, step=1): - """Create a reduced universe by dropping frames according to user selection - - Parameters - ---------- - u : MDAnalyse.Universe - A Universe object will all topology, dihedrals,coordinates and force - information - start : int or None, Optional, default: None - Frame id to start analysis. Default None will start from frame 0 - end : int or None, Optional, default: None - Frame id to end analysis. Default None will end at last frame - step : int, Optional, default: 1 - Steps between frame. - - Returns - ------- - u2 : MDAnalysis.Universe - reduced universe - """ - if start is None: - start = 0 - if end is None: - end = len(u.trajectory) - select_atom = u.select_atoms("all", updating=True) - coordinates = ( - AnalysisFromFunction(lambda ag: ag.positions.copy(), select_atom) - .run() - .results["timeseries"][start:end:step] - ) - forces = ( - AnalysisFromFunction(lambda ag: ag.forces.copy(), select_atom) - .run() - .results["timeseries"][start:end:step] - ) - u2 = mda.Merge(select_atom) - u2.load_new(coordinates, format=MemoryReader, forces=forces) - logger.debug(f"MDAnalysis.Universe - reduced universe: {u2}") - return u2 - - def new_U_select_atom(self, u, select_string="all"): - """Create a reduced universe by dropping atoms according to user selection - - Parameters - ---------- - u : MDAnalyse.Universe - A Universe object will all topology, dihedrals,coordinates and force - information - select_string : str, Optional, default: 'all' - MDAnalysis.select_atoms selection string. - - Returns - ------- - u2 : MDAnalysis.Universe - reduced universe - - """ - select_atom = u.select_atoms(select_string, updating=True) - coordinates = ( - AnalysisFromFunction(lambda ag: ag.positions.copy(), select_atom) - .run() - .results["timeseries"] - ) - forces = ( - AnalysisFromFunction(lambda ag: ag.forces.copy(), select_atom) - .run() - .results["timeseries"] - ) - u2 = mda.Merge(select_atom) - u2.load_new(coordinates, format=MemoryReader, forces=forces) - logger.debug(f"MDAnalysis.Universe - reduced universe: {u2}") - return u2 - def write_universe(self, u, name="default"): """Write a universe to working directories as pickle diff --git a/tests/test_CodeEntropy/test_dihedral_tools.py b/tests/test_CodeEntropy/test_dihedral_tools.py new file mode 100644 index 0000000..99071f9 --- /dev/null +++ b/tests/test_CodeEntropy/test_dihedral_tools.py @@ -0,0 +1,571 @@ +from unittest.mock import MagicMock, patch + +from CodeEntropy.dihedral_tools import DihedralAnalysis +from tests.test_CodeEntropy.test_base import BaseTestCase + + +class TestDihedralAnalysis(BaseTestCase): + """ + Unit tests for DihedralAnalysis. + """ + + def setUp(self): + super().setUp() + self.analysis = DihedralAnalysis() + + def test_get_dihedrals_united_atom(self): + """ + Test `_get_dihedrals` for 'united_atom' level. + + The function should: + - read dihedrals from `data_container.dihedrals` + - extract `.atoms` from each dihedral + - return a list of atom groups + + Expected behavior: + If dihedrals = [d1, d2, d3] and each dihedral has an `.atoms` + attribute, then the returned list must be: + [d1.atoms, d2.atoms, d3.atoms] + """ + data_container = MagicMock() + + # Mock dihedral objects with `.atoms` + d1 = MagicMock() + d1.atoms = "atoms1" + d2 = MagicMock() + d2.atoms = "atoms2" + d3 = MagicMock() + d3.atoms = "atoms3" + + data_container.dihedrals = [d1, d2, d3] + + result = self.analysis._get_dihedrals(data_container, level="united_atom") + + self.assertEqual(result, ["atoms1", "atoms2", "atoms3"]) + + def test_get_dihedrals_residue(self): + """ + Test `_get_dihedrals` for 'residue' level with 5 residues. + + The implementation: + - iterates over residues 4 → N + - for each, selects 4 bonded atom groups + - merges them using __add__ to form a single atom_group + - appends to result list + + For 5 residues (0–4), two dihedral groups should be created. + Expected: + - result of length 2 + - each item equal to the merged mock atom group + """ + data_container = MagicMock() + data_container.residues = [0, 1, 2, 3, 4] + + mock_atom_group = MagicMock() + mock_atom_group.__add__.return_value = mock_atom_group + + # Every MDAnalysis selection returns the same mock atom group + data_container.select_atoms.return_value = mock_atom_group + + result = self.analysis._get_dihedrals(data_container, level="residue") + + self.assertEqual(len(result), 2) + self.assertTrue(all(r is mock_atom_group for r in result)) + + def test_get_dihedrals_no_residue(self): + """ + Test `_get_dihedrals` for 'residue' level when fewer than + 4 residues exist (here: 3 residues). + + Expected: + - The function returns an empty list. + """ + data_container = MagicMock() + data_container.residues = [0, 1, 2] # Only 3 residues → too few + + result = self.analysis._get_dihedrals(data_container, level="residue") + + self.assertEqual(result, []) + + @patch("CodeEntropy.dihedral_tools.Dihedral") + def test_identify_peaks_empty_dihedrals(self, Dihedral_patch): + """ + Test `_identify_peaks` returns an empty list when the + input dihedral list is empty. + + Expected: + - No angle extraction occurs. + - No histograms computed. + - Return value is an empty list. + """ + universe_operations = MagicMock() + analysis = DihedralAnalysis(universe_operations) + + peaks = analysis._identify_peaks( + data_container=MagicMock(), + molecules=[0], + dihedrals=[], + bin_width=10, + start=0, + end=360, + step=1, + ) + + assert peaks == [] + + @patch("CodeEntropy.dihedral_tools.Dihedral") + def test_identify_peaks_negative_angles_become_positive(self, Dihedral_patch): + """ + Test that negative dihedral angles are converted into the + 0–360° range before histogramming. + + Scenario: + - A single dihedral produces a single angle: -15°. + - This should be converted to +345°. + - With 90° bins, it falls into the final bin → one peak. + + Expected: + - One peak detected. + - Peak center lies between 300° and 360°. + """ + universe_operations = MagicMock() + analysis = DihedralAnalysis(universe_operations) + + R = MagicMock() + R.results.angles = [[-15]] + Dihedral_patch.return_value.run.return_value = R + + mol = MagicMock() + mol.trajectory = [0] + universe_operations.get_molecule_container.return_value = mol + + peaks = analysis._identify_peaks( + MagicMock(), + [0], + dihedrals=[MagicMock()], + bin_width=90, + start=0, + end=360, + step=1, + ) + + assert len(peaks) == 1 + assert len(peaks[0]) == 1 + assert 300 <= peaks[0][0] <= 360 + + @patch("CodeEntropy.dihedral_tools.Dihedral") + def test_identify_peaks_internal_peak_detection(self, Dihedral_patch): + """ + Test the detection of a peak located in a middle histogram bin. + + Scenario: + - Angles fall into bin #1 (45°, 50°, 55°). + - Bin 1 has higher population than its neighbors. + + Expected: + - Exactly one peak is detected. + """ + universe_operations = MagicMock() + analysis = DihedralAnalysis(universe_operations) + + R = MagicMock() + R.results.angles = [[45], [50], [55]] + Dihedral_patch.return_value.run.return_value = R + + mol = MagicMock() + mol.trajectory = [0, 1, 2] + universe_operations.get_molecule_container.return_value = mol + + peaks = analysis._identify_peaks( + MagicMock(), + [0], + dihedrals=[MagicMock()], + bin_width=90, + start=0, + end=360, + step=1, + ) + + assert len(peaks[0]) == 1 + + @patch("CodeEntropy.dihedral_tools.Dihedral") + def test_identify_peaks_circular_boundary(self, Dihedral_patch): + """ + Test that `_identify_peaks` handles circular histogram boundaries + correctly when identifying peaks in the last bin. + + Setup: + - All angles are near 350°, falling into the final bin. + + Expected: + - The final bin is correctly identified as a peak. + """ + ops = MagicMock() + analysis = DihedralAnalysis(ops) + + R = MagicMock() + R.results.angles = [[350], [355], [349]] + Dihedral_patch.return_value.run.return_value = R + + mol = MagicMock() + mol.trajectory = [0, 1, 2] + ops.get_molecule_container.return_value = mol + + peaks = analysis._identify_peaks( + MagicMock(), + [0], + dihedrals=[MagicMock()], + bin_width=90, + start=0, + end=360, + step=1, + ) + + assert len(peaks[0]) == 1 + + @patch("CodeEntropy.dihedral_tools.Dihedral") + def test_identify_peaks_circular_last_bin(self, Dihedral_patch): + """ + Test peak detection for circular histogram boundaries, where the + last bin compares against the first bin. + + Scenario: + - All angles near 350° fall into the final bin. + - Final bin should be considered a peak if it exceeds both + previous and first bins. + + Expected: + - One peak detected in the last bin. + """ + universe_operations = MagicMock() + analysis = DihedralAnalysis(universe_operations) + + R = MagicMock() + R.results.angles = [[350], [355], [349]] + Dihedral_patch.return_value.run.return_value = R + + mol = MagicMock() + mol.trajectory = [0, 1, 2] + universe_operations.get_molecule_container.return_value = mol + + peaks = analysis._identify_peaks( + MagicMock(), + [0], + dihedrals=[MagicMock()], + bin_width=90, + start=0, + end=360, + step=1, + ) + + assert len(peaks[0]) == 1 + + @patch("CodeEntropy.dihedral_tools.Dihedral") + def test_assign_states_negative_angle_conversion(self, Dihedral_patch): + """ + Test `_assign_states` converts negative angles correctly and assigns + the dihedral to the nearest peak. + + Scenario: + - Angle returned = -10° → converted to 350°. + - Peak list contains [350]. + + Expected: + - Assigned state is "0". + """ + universe_operations = MagicMock() + analysis = DihedralAnalysis(universe_operations) + + R = MagicMock() + R.results.angles = [[-10]] + Dihedral_patch.return_value.run.return_value = R + + mol = MagicMock() + mol.trajectory = [0] + universe_operations.get_molecule_container.return_value = mol + + states = analysis._assign_states( + MagicMock(), + [0], + dihedrals=[MagicMock()], + peaks=[[350]], + start=0, + end=360, + step=1, + ) + + assert states == ["0"] + + @patch("CodeEntropy.dihedral_tools.Dihedral") + def test_assign_states_closest_peak_selection(self, Dihedral_patch): + """ + Test that `_assign_states` selects the peak nearest to each dihedral + angle. + + Setup: + - Angle = 30°. + - Peaks = [20, 100]. + - Nearest peak = 20 (index 0). + + Expected: + - Returned state is ["0"]. + """ + ops = MagicMock() + analysis = DihedralAnalysis(ops) + + R = MagicMock() + R.results.angles = [[30]] + Dihedral_patch.return_value.run.return_value = R + + mol = MagicMock() + mol.trajectory = [0] + ops.get_molecule_container.return_value = mol + + states = analysis._assign_states( + MagicMock(), + [0], + dihedrals=[MagicMock()], + peaks=[[20, 100]], + start=0, + end=360, + step=1, + ) + + assert states == ["0"] + + @patch("CodeEntropy.dihedral_tools.Dihedral") + def test_assign_states_closest_peak(self, Dihedral_patch): + """ + Test assignment to the correct peak based on minimum angular distance. + + Scenario: + - Angle = 30°. + - Peaks = [20, 100]. + - Closest peak is 20° → index 0. + + Expected: + - Returned state is "0". + """ + universe_operations = MagicMock() + analysis = DihedralAnalysis(universe_operations) + + R = MagicMock() + R.results.angles = [[30]] + Dihedral_patch.return_value.run.return_value = R + + mol = MagicMock() + mol.trajectory = [0] + universe_operations.get_molecule_container.return_value = mol + + states = analysis._assign_states( + MagicMock(), + [0], + dihedrals=[MagicMock()], + peaks=[[20, 100]], + start=0, + end=360, + step=1, + ) + + assert states == ["0"] + + @patch("CodeEntropy.dihedral_tools.Dihedral") + def test_assign_states_multiple_dihedrals(self, Dihedral_patch): + """ + Test concatenation of state labels across multiple dihedrals. + + Scenario: + - Two dihedrals, one frame: + dihedral 0 → 10° → closest peak 0 + dihedral 1 → 200° → closest peak 180 (index 0) + - Resulting frame state: "00". + + Expected: + - Returned list: ["00"]. + """ + universe_operations = MagicMock() + analysis = DihedralAnalysis(universe_operations) + + R = MagicMock() + R.results.angles = [[10, 200]] + Dihedral_patch.return_value.run.return_value = R + + mol = MagicMock() + mol.trajectory = [0] + universe_operations.get_molecule_container.return_value = mol + + peaks = [[0, 180], [180, 300]] + + states = analysis._assign_states( + MagicMock(), + [0], + dihedrals=[MagicMock(), MagicMock()], + peaks=peaks, + start=0, + end=360, + step=1, + ) + + assert states == ["00"] + + def test_assign_states_multiple_molecules(self): + """ + Test that `_assign_states` generates different conformational state + labels for different molecules when their dihedral angle trajectories + differ. + + Molecule 0 is mocked to produce an angle near peak 0. + Molecule 1 is mocked to produce an angle near peak 1. + + Expected: + The returned state list reflects these differences as + ["0", "1"]. + """ + + universe_operations = MagicMock() + analysis = DihedralAnalysis(universe_operations) + + mol1 = MagicMock() + mol1.trajectory = [0] + + mol2 = MagicMock() + mol2.trajectory = [0] + + universe_operations.get_molecule_container.side_effect = [mol1, mol2] + + # Two different R objects + R1 = MagicMock() + R1.results.angles = [[10]] # peak index 0 + + R2 = MagicMock() + R2.results.angles = [[200]] # peak index 1 + + peaks = [[0, 180]] + + # Patch where Dihedral is *used* + with patch("CodeEntropy.dihedral_tools.Dihedral") as Dihedral_patch: + instance = Dihedral_patch.return_value + instance.run.side_effect = [R1, R2] + + states = analysis._assign_states( + MagicMock(), + molecules=[0, 1], + dihedrals=[MagicMock()], + peaks=peaks, + start=0, + end=360, + step=1, + ) + + assert states == ["0", "1"] + + def test_build_states_united_atom_no_dihedrals(self): + """ + Test that UA-level state building produces empty state lists when no + dihedrals are found for any residue. + """ + ops = MagicMock() + analysis = DihedralAnalysis(ops) + + mol = MagicMock() + mol.residues = [MagicMock()] + ops.get_molecule_container.return_value = mol + ops.new_U_select_atom.return_value = MagicMock() + + analysis._get_dihedrals = MagicMock(return_value=[]) + analysis._identify_peaks = MagicMock(return_value=[]) + analysis._assign_states = MagicMock(return_value=[]) + + groups = {0: [0]} + levels = {0: ["united_atom"]} + + states_ua, states_res = analysis.build_conformational_states( + MagicMock(), levels, groups, start=0, end=360, step=1, bin_width=10 + ) + + assert states_ua[(0, 0)] == [] + + def test_build_states_united_atom_accumulate(self): + """ + Test that UA-level state building assigns states independently to each + residue and accumulates them correctly. + """ + ops = MagicMock() + analysis = DihedralAnalysis(ops) + + mol = MagicMock() + mol.residues = [MagicMock(), MagicMock()] + ops.get_molecule_container.return_value = mol + ops.new_U_select_atom.return_value = MagicMock() + + analysis._get_dihedrals = MagicMock(return_value=[1]) + analysis._identify_peaks = MagicMock(return_value=[[10]]) + analysis._assign_states = MagicMock(return_value=["A"]) + + groups = {0: [0]} + levels = {0: ["united_atom"]} + + states_ua, _ = analysis.build_conformational_states( + MagicMock(), levels, groups, start=0, end=360, step=1, bin_width=10 + ) + + assert states_ua[(0, 0)] == ["A"] + assert states_ua[(0, 1)] == ["A"] + + def test_build_states_residue_no_dihedrals(self): + """ + Test that residue-level state building returns an empty list when + `_get_dihedrals` reports no available dihedral groups. + """ + ops = MagicMock() + analysis = DihedralAnalysis(ops) + + mol = MagicMock() + mol.residues = [MagicMock()] + ops.get_molecule_container.return_value = mol + + analysis._get_dihedrals = MagicMock(return_value=[]) + analysis._identify_peaks = MagicMock(return_value=[]) + analysis._assign_states = MagicMock(return_value=[]) + + groups = {0: [0]} + levels = {0: ["residue"]} + + _, states_res = analysis.build_conformational_states( + MagicMock(), levels, groups, start=0, end=360, step=1, bin_width=10 + ) + + assert states_res[0] == [] + + def test_build_states_residue_accumulate(self): + """ + Test that residue-level state building delegates all molecules in a group + to a single `_assign_states` call, and stores its returned list directly. + + Expected: + _assign_states returns ["A", "B"], so states_res[0] == ["A", "B"]. + """ + ops = MagicMock() + analysis = DihedralAnalysis(ops) + + mol1 = MagicMock() + mol1.residues = [MagicMock()] + mol2 = MagicMock() + mol2.residues = [MagicMock()] + + ops.get_molecule_container.side_effect = [mol1, mol2] + + analysis._get_dihedrals = MagicMock(return_value=[1]) + analysis._identify_peaks = MagicMock(return_value=[[10]]) + + # One call for the whole group → one return value + analysis._assign_states = MagicMock(return_value=["A", "B"]) + + groups = {0: [0, 1]} + levels = {0: ["residue"], 1: ["residue"]} + + _, states_res = analysis.build_conformational_states( + MagicMock(), levels, groups, start=0, end=360, step=1, bin_width=10 + ) + + assert states_res[0] == ["A", "B"] diff --git a/tests/test_CodeEntropy/test_entropy.py b/tests/test_CodeEntropy/test_entropy.py index e6c09a1..8fce98f 100644 --- a/tests/test_CodeEntropy/test_entropy.py +++ b/tests/test_CodeEntropy/test_entropy.py @@ -20,6 +20,7 @@ ) from CodeEntropy.levels import LevelManager from CodeEntropy.main import main +from CodeEntropy.mda_universe_operations import UniverseOperations from CodeEntropy.run import ConfigManager, RunManager from tests.test_CodeEntropy.test_base import BaseTestCase @@ -47,11 +48,20 @@ def test_execute_full_workflow(self): bin_width=0.1, temperature=300, selection_string="all", water_entropy=False ) run_manager = RunManager("mock_folder/job001") - level_manager = LevelManager() + universe_operations = UniverseOperations() + level_manager = LevelManager(universe_operations) data_logger = DataLogger() group_molecules = MagicMock() + dihedral_analysis = MagicMock() entropy_manager = EntropyManager( - run_manager, args, u, data_logger, level_manager, group_molecules + run_manager, + args, + u, + data_logger, + level_manager, + group_molecules, + dihedral_analysis, + universe_operations, ) # Mocks for trajectory and molecules @@ -75,7 +85,7 @@ def test_execute_full_workflow(self): entropy_manager._level_manager.build_covariance_matrices = MagicMock( return_value=("force_matrices", "torque_matrices", "frame_counts") ) - entropy_manager._level_manager.build_conformational_states = MagicMock( + entropy_manager._dihedral_analysis.build_conformational_states = MagicMock( return_value=(["state_ua"], ["state_res"]) ) entropy_manager._compute_entropies = MagicMock() @@ -101,18 +111,15 @@ def test_execute_full_workflow(self): entropy_manager.execute() # Assert the key calls happened with expected arguments - build_states = entropy_manager._level_manager.build_conformational_states + build_states = entropy_manager._dihedral_analysis.build_conformational_states build_states.assert_called_once_with( - entropy_manager, mock_reduced_atom, mock_levels, mock_groups, 0, 10, 1, - 11, args.bin_width, - ce, ) entropy_manager._compute_entropies.assert_called_once_with( @@ -144,11 +151,20 @@ def test_execute_triggers_handle_water_entropy_minimal(self): bin_width=0.1, temperature=300, selection_string="all", water_entropy=True ) run_manager = RunManager("mock_folder/job001") - level_manager = LevelManager() + universe_operations = UniverseOperations() + level_manager = LevelManager(universe_operations) data_logger = DataLogger() group_molecules = MagicMock() + dihedral_analysis = MagicMock() entropy_manager = EntropyManager( - run_manager, args, u, data_logger, level_manager, group_molecules + run_manager, + args, + u, + data_logger, + level_manager, + group_molecules, + dihedral_analysis, + universe_operations, ) entropy_manager._get_trajectory_bounds = MagicMock(return_value=(0, 10, 1)) @@ -159,7 +175,7 @@ def test_execute_triggers_handle_water_entropy_minimal(self): entropy_manager._level_manager.build_covariance_matrices = MagicMock( return_value=("force_matrices", "torque_matrices", "frame_counts") ) - entropy_manager._level_manager.build_conformational_states = MagicMock( + entropy_manager._dihedral_analysis.build_conformational_states = MagicMock( return_value=(["state_ua"], ["state_res"]) ) entropy_manager._compute_entropies = MagicMock() @@ -194,7 +210,14 @@ def test_water_entropy_sets_selection_string_when_all(self): mock_universe = MagicMock() args = MagicMock(water_entropy=True, selection_string="all") manager = EntropyManager( - MagicMock(), args, mock_universe, DataLogger(), MagicMock(), MagicMock() + MagicMock(), + args, + mock_universe, + DataLogger(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) manager._calculate_water_entropy = MagicMock() @@ -215,7 +238,14 @@ def test_water_entropy_appends_to_custom_selection_string(self): mock_universe = MagicMock() args = MagicMock(water_entropy=True, selection_string="protein") manager = EntropyManager( - MagicMock(), args, mock_universe, DataLogger(), MagicMock(), MagicMock() + MagicMock(), + args, + mock_universe, + DataLogger(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) manager._calculate_water_entropy = MagicMock() @@ -237,7 +267,14 @@ def test_handle_water_entropy_returns_early(self): mock_universe = MagicMock() args = MagicMock(water_entropy=True, selection_string="protein") manager = EntropyManager( - MagicMock(), args, mock_universe, DataLogger(), MagicMock(), MagicMock() + MagicMock(), + args, + mock_universe, + DataLogger(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) # Patch _calculate_water_entropy to track if called @@ -270,11 +307,18 @@ def test_initialize_molecules(self): bin_width=0.1, temperature=300, selection_string="all", water_entropy=False ) run_manager = RunManager("mock_folder/job001") - level_manager = LevelManager() + level_manager = LevelManager(MagicMock()) data_logger = DataLogger() group_molecules = MagicMock() manager = EntropyManager( - run_manager, args, MagicMock(), data_logger, level_manager, group_molecules + run_manager, + args, + MagicMock(), + data_logger, + level_manager, + group_molecules, + MagicMock(), + MagicMock(), ) # Mock dependencies @@ -317,7 +361,14 @@ def test_get_trajectory_bounds(self): args, _ = parser.parse_known_args() entropy_manager = EntropyManager( - MagicMock(), args, MagicMock(), MagicMock(), MagicMock(), MagicMock() + MagicMock(), + args, + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) self.assertIsInstance(entropy_manager._args.start, int) @@ -350,7 +401,14 @@ def test_get_number_frames(self, mock_args): mock_universe.trajectory = range(10) entropy_manager = EntropyManager( - MagicMock(), args, mock_universe, MagicMock(), MagicMock(), MagicMock() + MagicMock(), + args, + mock_universe, + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) # Use _get_trajectory_bounds to convert end=-1 into the actual last frame @@ -384,7 +442,14 @@ def test_get_number_frames_sliced_trajectory(self, mock_args): mock_universe.trajectory = range(30) entropy_manager = EntropyManager( - MagicMock(), args, mock_universe, MagicMock(), MagicMock(), MagicMock() + MagicMock(), + args, + mock_universe, + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) start, end, step = entropy_manager._get_trajectory_bounds() @@ -416,7 +481,14 @@ def test_get_number_frames_sliced_trajectory_step(self, mock_args): mock_universe.trajectory = range(20) entropy_manager = EntropyManager( - MagicMock(), args, mock_universe, MagicMock(), MagicMock(), MagicMock() + MagicMock(), + args, + mock_universe, + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) start, end, step = entropy_manager._get_trajectory_bounds() @@ -425,94 +497,65 @@ def test_get_number_frames_sliced_trajectory_step(self, mock_args): # Expect 20 frames divided by step of 5 = 4 frames self.assertEqual(number_frames, 4) - @patch( - "argparse.ArgumentParser.parse_args", - return_value=MagicMock( - selection_string="all", - ), - ) - def test_get_reduced_universe_all(self, mock_args): - """ - Test `_get_reduced_universe` with 'all' selection. - - Verifies that the full universe is returned when the selection string - is set to 'all', and the number of atoms remains unchanged. - """ - # Load MDAnalysis Universe - tprfile = os.path.join(self.test_data_dir, "md_A4_dna.tpr") - trrfile = os.path.join(self.test_data_dir, "md_A4_dna_xf.trr") - u = mda.Universe(tprfile, trrfile) - - config_manager = ConfigManager() - - parser = config_manager.setup_argparse() - args = parser.parse_args() - - entropy_manager = EntropyManager( - MagicMock(), args, u, MagicMock(), MagicMock(), MagicMock() - ) - - entropy_manager._get_reduced_universe() - - self.assertEqual(entropy_manager._universe.atoms.n_atoms, 254) - @patch( "argparse.ArgumentParser.parse_args", return_value=MagicMock( - selection_string="resname DA", + selection_string="all", ), ) - def test_get_reduced_universe_reduced(self, mock_args): + def test_get_reduced_universe_all(self, mock_args): """ - Test `_get_reduced_universe` with a specific atom selection. + Test `_get_reduced_universe` with 'all' selection. - Ensures that the reduced universe contains fewer atoms than the original - when a specific selection string is used. + Verifies that the full universe is returned when the selection string + is set to 'all', and the number of atoms remains unchanged. """ - # Load MDAnalysis Universe tprfile = os.path.join(self.test_data_dir, "md_A4_dna.tpr") trrfile = os.path.join(self.test_data_dir, "md_A4_dna_xf.trr") u = mda.Universe(tprfile, trrfile) config_manager = ConfigManager() - run_manager = RunManager("mock_folder/job001") parser = config_manager.setup_argparse() args = parser.parse_args() entropy_manager = EntropyManager( - run_manager, args, u, MagicMock(), MagicMock(), MagicMock() + MagicMock(), + args, + u, + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) - reduced_u = entropy_manager._get_reduced_universe() + entropy_manager._get_reduced_universe() - # Assert that the reduced universe has fewer atoms - assert len(reduced_u.atoms) < len(u.atoms) + self.assertEqual(entropy_manager._universe.atoms.n_atoms, 254) @patch( "argparse.ArgumentParser.parse_args", return_value=MagicMock( - selection_string="all", + selection_string="resname DA", ), ) - def test_get_molecule_container(self, mock_args): + def test_get_reduced_universe_reduced(self, mock_args): """ - Test `_get_molecule_container` for extracting a molecule fragment. + Test `_get_reduced_universe` with a specific atom selection. - Verifies that the returned universe contains the correct atoms corresponding - to the specified molecule ID's fragment from the original universe. + Ensures that the reduced universe contains fewer atoms than the original + when a specific selection string is used. """ - # Load a test universe + # Load MDAnalysis Universe tprfile = os.path.join(self.test_data_dir, "md_A4_dna.tpr") trrfile = os.path.join(self.test_data_dir, "md_A4_dna_xf.trr") u = mda.Universe(tprfile, trrfile) - # Assume the universe has at least one fragment - assert len(u.atoms.fragments) > 0 + universe_operations = UniverseOperations() - # Setup managers config_manager = ConfigManager() run_manager = RunManager("mock_folder/job001") @@ -520,36 +563,48 @@ def test_get_molecule_container(self, mock_args): args = parser.parse_args() entropy_manager = EntropyManager( - run_manager, args, u, MagicMock(), MagicMock(), MagicMock() + run_manager, + args, + u, + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + universe_operations, ) - # Call the method - molecule_id = 0 - mol_universe = entropy_manager._get_molecule_container(u, molecule_id) - - # Get the original fragment - original_fragment = u.atoms.fragments[molecule_id] - - # Assert that the atoms in the returned universe match the fragment - selected_indices = mol_universe.atoms.indices - expected_indices = original_fragment.indices + reduced_u = entropy_manager._get_reduced_universe() - assert set(selected_indices) == set(expected_indices) - assert len(mol_universe.atoms) == len(original_fragment) + # Assert that the reduced universe has fewer atoms + assert len(reduced_u.atoms) < len(u.atoms) - def test_process_united_atom_entropy(self): + @patch( + "argparse.ArgumentParser.parse_args", + return_value=MagicMock( + selection_string="all", + ), + ) + def test_process_united_atom_entropy(self, selection_string_mock): """ Tests that `_process_united_atom_entropy` correctly logs global and residue-level entropy results for a mocked molecular system. """ # Setup managers and arguments args = MagicMock(bin_width=0.1, temperature=300, selection_string="all") - run_manager = MagicMock() + universe_operations = UniverseOperations() + run_manager = MagicMock(universe_operations) level_manager = MagicMock() data_logger = DataLogger() group_molecules = MagicMock() manager = EntropyManager( - run_manager, args, MagicMock(), data_logger, level_manager, group_molecules + run_manager, + args, + MagicMock(), + data_logger, + level_manager, + group_molecules, + MagicMock(), + universe_operations, ) # Mock molecule container with residues and atoms @@ -630,16 +685,24 @@ def test_process_vibrational_only_levels(self): # Setup managers and arguments args = MagicMock(bin_width=0.1, temperature=300, selection_string="all") run_manager = RunManager("mock_folder/job001") - level_manager = LevelManager() + universe_operations = UniverseOperations() + level_manager = LevelManager(universe_operations) data_logger = DataLogger() group_molecules = MagicMock() manager = EntropyManager( - run_manager, args, u, data_logger, level_manager, group_molecules + run_manager, + args, + u, + data_logger, + level_manager, + group_molecules, + MagicMock(), + universe_operations, ) # Prepare mock molecule container reduced_atom = manager._get_reduced_universe() - mol_container = manager._get_molecule_container(reduced_atom, 0) + mol_container = universe_operations.get_molecule_container(reduced_atom, 0) # Simulate trajectory length mol_container.trajectory = [None] * 10 # 10 frames @@ -683,11 +746,19 @@ def test_compute_entropies_polymer_branch(self): """ args = MagicMock(bin_width=0.1) run_manager = MagicMock() - level_manager = MagicMock() + universe_operations = UniverseOperations() + level_manager = LevelManager(universe_operations) data_logger = DataLogger() group_molecules = MagicMock() manager = EntropyManager( - run_manager, args, MagicMock(), data_logger, level_manager, group_molecules + run_manager, + args, + MagicMock(), + data_logger, + level_manager, + group_molecules, + MagicMock(), + universe_operations, ) reduced_atom = MagicMock() @@ -703,7 +774,7 @@ def test_compute_entropies_polymer_branch(self): mol_mock = MagicMock() mol_mock.residues = [] - manager._get_molecule_container = MagicMock(return_value=mol_mock) + universe_operations.get_molecule_container = MagicMock(return_value=mol_mock) manager._process_vibrational_entropy = MagicMock() ve = MagicMock() @@ -742,11 +813,19 @@ def test_process_conformational_residue_level(self): # Setup managers and arguments args = MagicMock(bin_width=0.1, temperature=300, selection_string="all") run_manager = RunManager("mock_folder/job001") - level_manager = LevelManager() + universe_operations = UniverseOperations() + level_manager = LevelManager(universe_operations) data_logger = DataLogger() group_molecules = MagicMock() manager = EntropyManager( - run_manager, args, u, data_logger, level_manager, group_molecules + run_manager, + args, + u, + data_logger, + level_manager, + group_molecules, + MagicMock(), + universe_operations, ) # Create dummy states @@ -787,12 +866,20 @@ def test_process_conformational_entropy_no_states_entry(self): # Setup managers and arguments args = MagicMock() + universe_operations = MagicMock() run_manager = MagicMock() level_manager = MagicMock() data_logger = DataLogger() group_molecules = MagicMock() manager = EntropyManager( - run_manager, args, u, data_logger, level_manager, group_molecules + run_manager, + args, + u, + data_logger, + level_manager, + group_molecules, + MagicMock(), + universe_operations, ) # States dict does NOT contain group_id=1 @@ -823,12 +910,20 @@ def test_compute_entropies_united_atom(self): level with highest=False when it's the only level. """ args = MagicMock(bin_width=0.1) + universe_operations = UniverseOperations() run_manager = MagicMock() level_manager = MagicMock() data_logger = DataLogger() group_molecules = MagicMock() manager = EntropyManager( - run_manager, args, MagicMock(), data_logger, level_manager, group_molecules + run_manager, + args, + MagicMock(), + data_logger, + level_manager, + group_molecules, + MagicMock(), + universe_operations, ) reduced_atom = MagicMock() @@ -844,7 +939,7 @@ def test_compute_entropies_united_atom(self): mol_mock = MagicMock() mol_mock.residues = [] - manager._get_molecule_container = MagicMock(return_value=mol_mock) + universe_operations.get_molecule_container = MagicMock(return_value=mol_mock) manager._process_united_atom_entropy = MagicMock() ve = MagicMock() @@ -886,12 +981,20 @@ def test_compute_entropies_residue(self): """ # Setup args = MagicMock(bin_width=0.1) + universe_operations = UniverseOperations() run_manager = MagicMock() level_manager = MagicMock() data_logger = DataLogger() group_molecules = MagicMock() manager = EntropyManager( - run_manager, args, MagicMock(), data_logger, level_manager, group_molecules + run_manager, + args, + MagicMock(), + data_logger, + level_manager, + group_molecules, + MagicMock(), + universe_operations, ) reduced_atom = MagicMock() @@ -910,7 +1013,7 @@ def test_compute_entropies_residue(self): # Mock molecule mol_mock = MagicMock() mol_mock.residues = [] - manager._get_molecule_container = MagicMock(return_value=mol_mock) + universe_operations.get_molecule_container = MagicMock(return_value=mol_mock) manager._process_vibrational_entropy = MagicMock() manager._process_conformational_entropy = MagicMock() @@ -939,12 +1042,21 @@ def test_compute_entropies_residue(self): def test_compute_entropies_polymer(self): args = MagicMock(bin_width=0.1) + universe_operations = UniverseOperations() run_manager = MagicMock() level_manager = MagicMock() data_logger = DataLogger() group_molecules = MagicMock() + dihedral_analysis = MagicMock() manager = EntropyManager( - run_manager, args, MagicMock(), data_logger, level_manager, group_molecules + run_manager, + args, + MagicMock(), + data_logger, + level_manager, + group_molecules, + dihedral_analysis, + universe_operations, ) reduced_atom = MagicMock() @@ -961,7 +1073,7 @@ def test_compute_entropies_polymer(self): mol_mock = MagicMock() mol_mock.residues = [] - manager._get_molecule_container = MagicMock(return_value=mol_mock) + universe_operations.get_molecule_container = MagicMock(return_value=mol_mock) manager._process_vibrational_entropy = MagicMock() ve = MagicMock() @@ -1009,7 +1121,7 @@ def test_finalize_molecule_results_aggregates_and_logs_total_entropy(self): ] data_logger.residue_data = [] - manager = EntropyManager(None, args, None, data_logger, None, None) + manager = EntropyManager(None, args, None, data_logger, None, None, None, None) # Patch save method data_logger.save_dataframes_as_json = MagicMock() @@ -1052,7 +1164,7 @@ def test_finalize_molecule_results_skips_invalid_entries(self, mock_logger): ] data_logger.residue_data = [] - manager = EntropyManager(None, args, None, data_logger, None, None) + manager = EntropyManager(None, args, None, data_logger, None, None, None, None) # Patch save method data_logger.save_dataframes_as_json = MagicMock() @@ -1091,7 +1203,14 @@ def setUp(self): os.chdir(self.test_dir) self.entropy_manager = EntropyManager( - MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock() + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) def tearDown(self): @@ -1117,13 +1236,22 @@ def test_vibrational_entropy_init(self): args.selection_string = "all" run_manager = RunManager("mock_folder/job001") - level_manager = LevelManager() + universe_operations = UniverseOperations() + level_manager = LevelManager(universe_operations) data_logger = DataLogger() group_molecules = MagicMock() + dihedral_analysis = MagicMock() # Instantiate VibrationalEntropy ve = VibrationalEntropy( - run_manager, args, universe, data_logger, level_manager, group_molecules + run_manager, + args, + universe, + data_logger, + level_manager, + group_molecules, + dihedral_analysis, + universe_operations, ) # Basic assertions to check initialization @@ -1144,7 +1272,14 @@ def test_frequency_calculation_0(self): run_manager = RunManager("mock_folder/job001") ve = VibrationalEntropy( - run_manager, MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock() + run_manager, + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) frequencies = ve.frequency_calculation(lambdas, temp) @@ -1165,7 +1300,14 @@ def test_frequency_calculation_positive(self): # Instantiate VibrationalEntropy with mocks ve = VibrationalEntropy( - run_manager, MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock() + run_manager, + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) # Call the method under test @@ -1193,7 +1335,14 @@ def test_frequency_calculation_filters_invalid(self): # Instantiate VibrationalEntropy with mocks ve = VibrationalEntropy( - run_manager, MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock() + run_manager, + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) # Call the method @@ -1226,7 +1375,14 @@ def test_frequency_calculation_filters_invalid_with_warning(self): run_manager.get_KT2J.return_value = 2.479e-21 # example value ve = VibrationalEntropy( - run_manager, MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock() + run_manager, + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) with self.assertLogs("CodeEntropy.entropy", level="WARNING") as cm: @@ -1260,7 +1416,14 @@ def test_vibrational_entropy_calculation_force_not_highest(self): # Instantiate VibrationalEntropy with mocks ve = VibrationalEntropy( - run_manager, MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock() + run_manager, + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) # Patch frequency_calculation to return known frequencies @@ -1305,7 +1468,14 @@ def test_vibrational_entropy_polymer_force(self): run_manager = RunManager("mock_folder/job001") ve = VibrationalEntropy( - run_manager, MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock() + run_manager, + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) S_vib = ve.vibrational_entropy_calculation( @@ -1335,7 +1505,14 @@ def test_vibrational_entropy_polymer_torque(self): run_manager = RunManager("mock_folder/job001") ve = VibrationalEntropy( - run_manager, MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock() + run_manager, + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), + MagicMock(), ) S_vib = ve.vibrational_entropy_calculation( @@ -1592,13 +1769,21 @@ def test_confirmational_entropy_init(self): args.selection_string = "all" run_manager = RunManager("mock_folder/job001") - level_manager = LevelManager() + universe_operations = UniverseOperations() + level_manager = LevelManager(universe_operations) data_logger = DataLogger() group_molecules = MagicMock() # Instantiate ConformationalEntropy ce = ConformationalEntropy( - run_manager, args, universe, data_logger, level_manager, group_molecules + run_manager, + args, + universe, + data_logger, + level_manager, + group_molecules, + MagicMock(), + universe_operations, ) # Basic assertions to check initialization @@ -1606,102 +1791,6 @@ def test_confirmational_entropy_init(self): self.assertEqual(ce._args.temperature, 300) self.assertEqual(ce._args.bin_width, 0.1) - def test_assign_conformation(self): - """ - Test the `assign_conformation` method for correct binning of dihedral angles. - - Mocks a dihedral angle with specific values across frames and checks that: - - The returned result is a NumPy array. - - The array has the expected length. - - All values are non-negative and of floating-point type. - """ - # Mock dihedral with predefined values - dihedral = MagicMock() - dihedral.value = MagicMock(side_effect=[-30, 350, 350, 250, 10, 10]) - - # Create a list of mock timesteps with frame numbers - mock_timesteps = [MagicMock(frame=i) for i in range(6)] - - # Mock data_container with a trajectory that returns the mock timesteps - data_container = MagicMock() - data_container.trajectory.__getitem__.return_value = mock_timesteps - - # Load test universe - tprfile = os.path.join(self.test_data_dir, "md_A4_dna.tpr") - trrfile = os.path.join(self.test_data_dir, "md_A4_dna_xf.trr") - u = mda.Universe(tprfile, trrfile) - - # Setup managers and arguments - args = MagicMock(bin_width=0.1, temperature=300, selection_string="all") - run_manager = RunManager("mock_folder/job001") - level_manager = LevelManager() - data_logger = DataLogger() - group_molecules = MagicMock() - - ce = ConformationalEntropy( - run_manager, args, u, data_logger, level_manager, group_molecules - ) - - result = ce.assign_conformation( - data_container=data_container, - dihedral=dihedral, - number_frames=6, - bin_width=60, - start=0, - end=6, - step=1, - ) - - assert isinstance(result, np.ndarray) - assert len(result) == 6 - assert np.all(result >= 0) - assert np.issubdtype(result.dtype, np.floating) - - def test_assign_conformation_last_bin_peak(self): - """ - Test that the last bin in the histogram is correctly evaluated as a peak - when its population is greater than or equal to its neighbors. - """ - - dihedral = MagicMock() - dihedral.value = MagicMock(side_effect=[5, 10, 250, 260, 350, 355]) - - # Mock trajectory frames - mock_timesteps = [MagicMock(frame=i) for i in range(6)] - data_container = MagicMock() - data_container.trajectory.__getitem__.return_value = mock_timesteps - - # Create dummy universe and managers - tprfile = os.path.join(self.test_data_dir, "md_A4_dna.tpr") - trrfile = os.path.join(self.test_data_dir, "md_A4_dna_xf.trr") - u = mda.Universe(tprfile, trrfile) - - args = MagicMock(bin_width=60, temperature=300, selection_string="all") - run_manager = RunManager("mock_folder/job001") - level_manager = LevelManager() - data_logger = DataLogger() - group_molecules = MagicMock() - - ce = ConformationalEntropy( - run_manager, args, u, data_logger, level_manager, group_molecules - ) - - result = ce.assign_conformation( - data_container=data_container, - dihedral=dihedral, - number_frames=6, - bin_width=60, - start=0, - end=6, - step=1, - ) - - # Basic checks - assert isinstance(result, np.ndarray) - assert len(result) == 6 - assert np.all(result >= 0) - assert np.issubdtype(result.dtype, np.floating) - def test_conformational_entropy_calculation(self): """ Test `conformational_entropy_calculation` method to verify @@ -1711,24 +1800,31 @@ def test_conformational_entropy_calculation(self): # Setup managers and arguments args = MagicMock(bin_width=0.1, temperature=300, selection_string="all") run_manager = RunManager("mock_folder/job001") - level_manager = LevelManager() + universe_operations = UniverseOperations() + level_manager = LevelManager(universe_operations) data_logger = DataLogger() group_molecules = MagicMock() ce = ConformationalEntropy( - run_manager, args, MagicMock(), data_logger, level_manager, group_molecules + run_manager, + args, + MagicMock(), + data_logger, + level_manager, + group_molecules, + MagicMock(), + universe_operations, ) # Create a simple array of states with known counts states = np.array([0, 0, 1, 1, 1, 2]) # 2x state 0, 3x state 1, 1x state 2 - number_frames = len(states) # Manually compute expected entropy probs = np.array([2 / 6, 3 / 6, 1 / 6]) expected_entropy = -np.sum(probs * np.log(probs)) * ce._GAS_CONST # Run the method under test - result = ce.conformational_entropy_calculation(states, number_frames) + result = ce.conformational_entropy_calculation(states) # Assert the result is close to expected entropy self.assertAlmostEqual(result, expected_entropy, places=6) @@ -1773,13 +1869,21 @@ def test_orientational_entropy_init(self): args.selection_string = "all" run_manager = RunManager("mock_folder/job001") - level_manager = LevelManager() + universe_operations = UniverseOperations() + level_manager = LevelManager(universe_operations) data_logger = DataLogger() group_molecules = MagicMock() # Instantiate OrientationalEntropy oe = OrientationalEntropy( - run_manager, args, universe, data_logger, level_manager, group_molecules + run_manager, + args, + universe, + data_logger, + level_manager, + group_molecules, + MagicMock(), + universe_operations, ) # Basic assertions to check initialization @@ -1800,7 +1904,7 @@ def test_orientational_entropy_calculation(self): } # Create an instance of OrientationalEntropy with dummy dependencies - oe = OrientationalEntropy(None, None, None, None, None, None) + oe = OrientationalEntropy(None, None, None, None, None, None, None, None) # Run the method result = oe.orientational_entropy_calculation(neighbours_dict) @@ -1821,7 +1925,7 @@ def test_orientational_entropy_water_branch_is_covered(self): """ neighbours_dict = {"H2O": 1} # Matches the condition exactly - oe = OrientationalEntropy(None, None, None, None, None, None) + oe = OrientationalEntropy(None, None, None, None, None, None, None, None) result = oe.orientational_entropy_calculation(neighbours_dict) # Since the logic is skipped, total entropy should be 0.0 diff --git a/tests/test_CodeEntropy/test_levels.py b/tests/test_CodeEntropy/test_levels.py index 08d83f8..dd751de 100644 --- a/tests/test_CodeEntropy/test_levels.py +++ b/tests/test_CodeEntropy/test_levels.py @@ -3,6 +3,7 @@ import numpy as np from CodeEntropy.levels import LevelManager +from CodeEntropy.mda_universe_operations import UniverseOperations from tests.test_CodeEntropy.test_base import BaseTestCase @@ -45,8 +46,10 @@ def test_select_levels(self): data_container.atoms.fragments = [fragment1, fragment2] + universe_operations = UniverseOperations() + # Import the class and call the method - level_manager = LevelManager() + level_manager = LevelManager(universe_operations) number_molecules, levels = level_manager.select_levels(data_container) # Assertions @@ -61,8 +64,10 @@ def test_get_matrices(self): Ensures that the method returns correctly shaped matrices after filtering. """ - # Create a mock LevelManager level_manager - level_manager = LevelManager() + # Create a mock UniverseOperations and LevelManager + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) # Mock internal methods level_manager.get_beads = MagicMock(return_value=["bead1", "bead2"]) @@ -113,7 +118,9 @@ def test_get_matrices_force_shape_mismatch(self): Test that get_matrices raises a ValueError when the provided force_matrix has a shape mismatch with the computed force block matrix. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) # Mock internal methods level_manager.get_beads = MagicMock(return_value=["bead1", "bead2"]) @@ -150,7 +157,9 @@ def test_get_matrices_torque_shape_mismatch(self): Test that get_matrices raises a ValueError when the provided torque_matrix has a shape mismatch with the computed torque block matrix. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) # Mock internal methods level_manager.get_beads = MagicMock(return_value=["bead1", "bead2"]) @@ -186,7 +195,9 @@ def test_get_matrices_torque_consistency(self): Test that get_matrices returns consistent torque and force matrices when called multiple times with the same inputs. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) level_manager.get_beads = MagicMock(return_value=["bead1", "bead2"]) level_manager.get_axes = MagicMock(return_value=("trans_axes", "rot_axes")) @@ -227,151 +238,14 @@ def test_get_matrices_torque_consistency(self): self.assertTrue(np.allclose(torque_matrix_1, torque_matrix_2, atol=1e-8)) self.assertTrue(np.allclose(force_matrix_1, force_matrix_2, atol=1e-8)) - def test_get_dihedrals_united_atom(self): - """ - Test `get_dihedrals` for 'united_atom' level. - Ensures it returns the dihedrals directly from the data container. - """ - level_manager = LevelManager() - - data_container = MagicMock() - mock_dihedrals = ["d1", "d2", "d3"] - data_container.dihedrals = mock_dihedrals - - result = level_manager.get_dihedrals(data_container, level="united_atom") - self.assertEqual(result, mock_dihedrals) - - def test_get_dihedrals_residue(self): - """ - Test `get_dihedrals` for 'residue' level with 5 residues. - Mocks bonded atom selections and verifies that dihedrals are constructed. - """ - level_manager = LevelManager() - - data_container = MagicMock() - data_container.residues = [0, 1, 2, 3, 4] # 5 residues - - # Mock select_atoms to return atom groups with .dihedral - mock_dihedral = MagicMock() - mock_atom_group = MagicMock() - mock_atom_group.__add__.return_value = mock_atom_group - mock_atom_group.dihedral = mock_dihedral - data_container.select_atoms.return_value = mock_atom_group - - result = level_manager.get_dihedrals(data_container, level="residue") - - # Should create 2 dihedrals for 5 residues (residues 0–3 and 1–4) - self.assertEqual(len(result), 2) - self.assertTrue(all(d == mock_dihedral for d in result)) - - def test_get_dihedrals_no_residue(self): - """ - Test `get_dihedrals` for 'residue' level with 3 residues. - Mocks bonded atom selections and verifies that dihedrals are constructed. - """ - level_manager = LevelManager() - - data_container = MagicMock() - data_container.residues = [0, 1, 2] # 3 residues - - # Mock select_atoms to return atom groups with .dihedral - mock_dihedral = MagicMock() - mock_atom_group = MagicMock() - mock_atom_group.__add__.return_value = mock_atom_group - mock_atom_group.dihedral = mock_dihedral - data_container.select_atoms.return_value = mock_atom_group - - result = level_manager.get_dihedrals(data_container, level="residue") - - # Should result in no resdies - self.assertEqual(result, []) - - def test_compute_dihedral_conformations(self): - """ - Test `compute_dihedral_conformations` to ensure it correctly calls - `assign_conformation` on each dihedral and returns the expected - list of conformation strings. - """ - - # Setup - level_manager = LevelManager() - - # Mock selector (can be anything since we're mocking internals) - selector = MagicMock() - - # Mock dihedrals: pretend we have 3 dihedrals - mocked_dihedrals = ["d1", "d2", "d3"] - level_manager.get_dihedrals = MagicMock(return_value=mocked_dihedrals) - - # Mock the conformation entropy (ce) object with assign_conformation method - ce = MagicMock() - # For each dihedral, assign_conformation returns a numpy array of ints - ce.assign_conformation = MagicMock( - side_effect=[ - np.array([0, 1, 2]), - np.array([1, 0, 1]), - np.array([2, 2, 0]), - ] - ) - - number_frames = 3 - bin_width = 10 - start = 0 - end = 3 - step = 1 - level = "residue" - - # Call the method - states = level_manager.compute_dihedral_conformations( - selector, level, number_frames, bin_width, start, end, step, ce - ) - - # Expected states per frame - expected_states = [ - "012", # frame 0: d1=0, d2=1, d3=2 - "102", # frame 1: d1=1, d2=0, d3=2 - "210", # frame 2: d1=2, d2=1, d3=0 - ] - - # Verify the call count matches the number of dihedrals - self.assertEqual(ce.assign_conformation.call_count, len(mocked_dihedrals)) - - # Verify returned states are as expected - self.assertEqual(states, expected_states) - - # Verify get_dihedrals was called once with correct arguments - level_manager.get_dihedrals.assert_called_once_with(selector, level) - - def test_compute_dihedral_conformations_no_dihedrals(self): - """ - Test `compute_dihedral_conformations` when no dihedrals are found. - Ensures it returns an empty list of states. - """ - level_manager = LevelManager() - - level_manager.get_dihedrals = MagicMock(return_value=[]) - - selector = MagicMock() - - result = level_manager.compute_dihedral_conformations( - selector=selector, - level="united_atom", - number_frames=10, - bin_width=10.0, - start=0, - end=10, - step=1, - ce=MagicMock(), - ) - - self.assertEqual(result, []) - def test_get_beads_polymer_level(self): """ Test `get_beads` for 'polymer' level. Should return a single atom group representing the whole system. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) data_container = MagicMock() mock_atom_group = MagicMock() @@ -389,7 +263,9 @@ def test_get_beads_residue_level(self): Test `get_beads` for 'residue' level. Should return one atom group per residue. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) data_container = MagicMock() data_container.residues = [0, 1, 2] # 3 residues @@ -407,7 +283,9 @@ def test_get_beads_united_atom_level(self): Test `get_beads` for 'united_atom' level. Should return one bead per heavy atom, including bonded hydrogens. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) data_container = MagicMock() heavy_atoms = [MagicMock(index=i) for i in range(3)] @@ -431,7 +309,9 @@ def test_get_beads_hydrogen_molecule(self): Test `get_beads` for 'united_atom' level. Should return one bead for molecule with no heavy atoms. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) data_container = MagicMock() heavy_atoms = [] @@ -453,7 +333,9 @@ def test_get_axes_united_atom_no_bonds(self): Test `get_axes` for 'united_atom' level when no bonded atoms are found. Ensures that rotational axes fall back to residues' principal axes. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) data_container = MagicMock() @@ -486,7 +368,9 @@ def test_get_axes_polymer_level(self): Should return principal axes of the full system for both translation and rotation. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) data_container = MagicMock() principal_axes = np.identity(3) @@ -502,7 +386,9 @@ def test_get_axes_residue_level_with_bonds(self): Test `get_axes` for 'residue' level with bonded neighbors. Should use spherical coordinate axes for rotation. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) data_container = MagicMock() data_container.atoms.principal_axes.return_value = "trans_axes" @@ -531,7 +417,9 @@ def test_get_axes_residue_level_without_bonds(self): Test `get_axes` for 'residue' level with no bonded neighbors. Should use principal axes of the residue for rotation. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) data_container = MagicMock() data_container.atoms.principal_axes.return_value = "trans_axes" @@ -555,7 +443,9 @@ def test_get_axes_united_atom_level(self): Should use residue principal axes for translation and spherical axes for rotation. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) data_container = MagicMock() data_container.residues.principal_axes.return_value = "trans_axes" @@ -583,7 +473,9 @@ def test_get_avg_pos_with_atoms(self): Test `get_avg_pos` with a non-empty atom set. Should return the average of atom positions minus the center. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) atom1 = MagicMock() atom1.position = np.array([1.0, 2.0, 3.0]) @@ -606,7 +498,9 @@ def test_get_avg_pos_empty(self, mock_random): Test `get_avg_pos` with an empty atom set. Should return a random vector minus the center. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) atom_set = MagicMock() atom_set.names = [] @@ -625,7 +519,9 @@ def test_get_sphCoord_axes_valid_vector(self): Test with a valid non-zero vector. Should return a 3x3 orthonormal basis matrix. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) vector = np.array([1.0, 1.0, 1.0]) result = level_manager.get_sphCoord_axes(vector) @@ -638,7 +534,9 @@ def test_get_sphCoord_axes_vector_on_z_axis_raises(self): Test with a vector along the z-axis (x2y2 == 0). Should raise ValueError due to undefined phi. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) vector = np.array([0.0, 0.0, 1.0]) with self.assertRaises(ValueError): @@ -648,7 +546,9 @@ def test_get_sphCoord_axes_negative_x2y2_div_r2(self): """ Test with a vector that would cause x2y2 / r2 < 0. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) vector = np.array([1e-10, 1e-10, 1e10]) # x2y2 is tiny, r2 is huge result = level_manager.get_sphCoord_axes(vector) @@ -659,7 +559,9 @@ def test_get_sphCoord_axes_zero_vector_raises(self): Test with a zero vector. Should raise ValueError due to r2 == 0. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) vector = np.array([0.0, 0.0, 0.0]) with self.assertRaises(ValueError) as context: @@ -671,7 +573,9 @@ def test_get_sphCoord_axes_x2y2_zero_raises(self): Test with a vector along the z-axis (x2y2 == 0, r2 != 0). Should raise ValueError due to undefined phi. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) vector = np.array([0.0, 0.0, 1.0]) # r2 = 1.0, x2y2 = 0.0 with self.assertRaises(ValueError) as context: @@ -682,7 +586,9 @@ def test_get_weighted_force_with_partitioning(self): """ Test correct weighted force calculation with partitioning enabled. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) atom = MagicMock() atom.index = 0 @@ -707,7 +613,9 @@ def test_get_weighted_force_without_partitioning(self): """ Test correct weighted force calculation with partitioning disabled. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) atom = MagicMock() atom.index = 0 @@ -736,7 +644,9 @@ def test_get_weighted_forces_zero_mass_raises_value_error(self): """ Test that a zero mass raises a ValueError. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) atom = MagicMock() atom.index = 0 @@ -763,7 +673,9 @@ def test_get_weighted_forces_negative_mass_raises_value_error(self): """ Test that a negative mass raises a ValueError. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) atom = MagicMock() atom.index = 0 @@ -790,7 +702,9 @@ def test_get_weighted_torques_weighted_torque_basic(self): """ Test basic torque calculation with non-zero moment of inertia and torques. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) # Mock atom atom = MagicMock() @@ -824,7 +738,9 @@ def test_get_weighted_torques_zero_torque_skips_division(self): """ Test that zero torque components skip division and remain zero. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) atom = MagicMock() atom.index = 0 @@ -854,7 +770,9 @@ def test_get_weighted_torques_zero_moi_raises(self): Should raise ZeroDivisionError when moment of inertia is zero in a dimension and torque in that dimension is non-zero. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) atom = MagicMock() atom.index = 0 @@ -889,7 +807,9 @@ def test_get_weighted_torques_negative_moi_raises(self): Should raise ValueError when moment of inertia is negative in a dimension and torque in that dimension is non-zero. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) atom = MagicMock() atom.index = 0 @@ -927,7 +847,9 @@ def test_create_submatrix_basic_outer_product(self): """ Test with known vectors to verify correct outer product. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) data_i = np.array([1, 0, 0]) data_j = np.array([0, 1, 0]) @@ -941,7 +863,9 @@ def test_create_submatrix_zero_vectors_returns_zero_matrix(self): """ Test that all-zero input vectors return a zero matrix. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) data_i = np.zeros(3) data_j = np.zeros(3) @@ -954,7 +878,9 @@ def test_create_submatrix_single_frame(self): Test that one frame should return the outer product of the single pair of vectors. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) vec_i = np.array([1, 2, 3]) vec_j = np.array([4, 5, 6]) @@ -967,7 +893,9 @@ def test_create_submatrix_symmetric_result_when_data_equal(self): """ Test that if data_i == data_j, the result is symmetric. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) data = np.array([1, 2, 3]) result = level_manager.create_submatrix(data, data) @@ -985,9 +913,11 @@ def test_build_covariance_matrices_atomic(self): """ # Instantiate your class (replace YourClass with actual class name) - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) - # Mock entropy_manager and _get_molecule_container + # Mock entropy_manager and get_molecule_container entropy_manager = MagicMock() # Fake atom with minimal attributes @@ -1000,8 +930,8 @@ def test_build_covariance_matrices_atomic(self): fake_mol = MagicMock() fake_mol.atoms = [atom] - # Always return fake_mol from _get_molecule_container - entropy_manager._get_molecule_container = MagicMock(return_value=fake_mol) + # Always return fake_mol from get_molecule_container + universe_operations.get_molecule_container = MagicMock(return_value=fake_mol) # Mock reduced_atom with trajectory yielding two timesteps timestep1 = MagicMock() @@ -1043,26 +973,35 @@ def test_build_covariance_matrices_atomic(self): self.assertEqual(len(force_matrices["res"]), len(groups)) self.assertEqual(len(force_matrices["poly"]), len(groups)) - # Check _get_molecule_container call count: 2 timesteps * 2 molecules = 4 calls - self.assertEqual(entropy_manager._get_molecule_container.call_count, 10) + # Check get_molecule_container call count: 2 timesteps * 2 molecules = 4 calls + self.assertEqual(universe_operations.get_molecule_container.call_count, 4) # Check update_force_torque_matrices call count: self.assertEqual(level_manager.update_force_torque_matrices.call_count, 6) def test_update_force_torque_matrices_united_atom(self): """ - Test that `update_force_torque_matrices` correctly updates force and torque - matrices for the 'united_atom' level, assigning per-residue matrices and - incrementing frame counts. + Test that update_force_torque_matrices() correctly initializes force and torque + matrices for the 'united_atom' level. + + Ensures: + - The matrices are initialized for each UA group key. + - Frame counts are incremented correctly. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + universe_operations.new_U_select_atom = MagicMock() + + level_manager = LevelManager(universe_operations) + entropy_manager = MagicMock() run_manager = MagicMock() entropy_manager._run_manager = run_manager - mock_residue_group = MagicMock() - mock_residue_group.trajectory.__getitem__.return_value = None - run_manager.new_U_select_atom.return_value = mock_residue_group + mock_res = MagicMock() + mock_res.trajectory = MagicMock() + mock_res.trajectory.__getitem__.return_value = None + + universe_operations.new_U_select_atom.return_value = mock_res mock_residue1 = MagicMock() mock_residue1.atoms.indices = [0, 2] @@ -1072,9 +1011,9 @@ def test_update_force_torque_matrices_united_atom(self): mol = MagicMock() mol.residues = [mock_residue1, mock_residue2] - f_mat_mock = np.array([[1]]) - t_mat_mock = np.array([[2]]) - level_manager.get_matrices = MagicMock(return_value=(f_mat_mock, t_mat_mock)) + f_mat = np.array([[1]]) + t_mat = np.array([[2]]) + level_manager.get_matrices = MagicMock(return_value=(f_mat, t_mat)) force_avg = {"ua": {}, "res": [None], "poly": [None]} torque_avg = {"ua": {}, "res": [None], "poly": [None]} @@ -1086,7 +1025,7 @@ def test_update_force_torque_matrices_united_atom(self): group_id=0, level="united_atom", level_list=["residue", "united_atom"], - time_index=5, + time_index=0, num_frames=10, force_avg=force_avg, torque_avg=torque_avg, @@ -1094,35 +1033,51 @@ def test_update_force_torque_matrices_united_atom(self): force_partitioning=0.5, ) - expected_keys = [(0, 0), (0, 1)] - for key in expected_keys: - np.testing.assert_array_equal(force_avg["ua"][key], f_mat_mock) - np.testing.assert_array_equal(torque_avg["ua"][key], t_mat_mock) - self.assertEqual(frame_counts["ua"][key], 1) + assert (0, 0) in force_avg["ua"] + assert (0, 1) in force_avg["ua"] + assert (0, 0) in torque_avg["ua"] + assert (0, 1) in torque_avg["ua"] + + np.testing.assert_array_equal(force_avg["ua"][(0, 0)], f_mat) + np.testing.assert_array_equal(force_avg["ua"][(0, 1)], f_mat) + np.testing.assert_array_equal(torque_avg["ua"][(0, 0)], t_mat) + np.testing.assert_array_equal(torque_avg["ua"][(0, 1)], t_mat) + + assert frame_counts["ua"][(0, 0)] == 1 + assert frame_counts["ua"][(0, 1)] == 1 def test_update_force_torque_matrices_united_atom_increment(self): """ - Test that `update_force_torque_matrices` correctly updates force and torque - matrices for the 'united_atom' level when the key already exists. + Test that update_force_torque_matrices() correctly updates (increments) + existing force and torque matrices for the 'united_atom' level. + + Confirms correct incremental averaging behavior. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + universe_operations.new_U_select_atom = MagicMock() + + level_manager = LevelManager(universe_operations) + entropy_manager = MagicMock() mol = MagicMock() - # Simulate one residue with two atoms residue = MagicMock() residue.atoms.indices = [0, 1] mol.residues = [residue] + mol.trajectory = MagicMock() mol.trajectory.__getitem__.return_value = None selected_atoms = MagicMock() - entropy_manager._run_manager.new_U_select_atom.return_value = selected_atoms + selected_atoms.trajectory = MagicMock() selected_atoms.trajectory.__getitem__.return_value = None - f_mat_1 = np.array([[1.0]], dtype=np.float64) - t_mat_1 = np.array([[2.0]], dtype=np.float64) - f_mat_2 = np.array([[3.0]], dtype=np.float64) - t_mat_2 = np.array([[4.0]], dtype=np.float64) + universe_operations.new_U_select_atom.return_value = selected_atoms + + f_mat_1 = np.array([[1.0]]) + t_mat_1 = np.array([[2.0]]) + + f_mat_2 = np.array([[3.0]]) + t_mat_2 = np.array([[4.0]]) level_manager.get_matrices = MagicMock(return_value=(f_mat_1, t_mat_1)) @@ -1130,7 +1085,6 @@ def test_update_force_torque_matrices_united_atom_increment(self): torque_avg = {"ua": {}, "res": [None], "poly": [None]} frame_counts = {"ua": {}, "res": [None], "poly": [None]} - # First call: initialize level_manager.update_force_torque_matrices( entropy_manager=entropy_manager, mol=mol, @@ -1145,7 +1099,6 @@ def test_update_force_torque_matrices_united_atom_increment(self): force_partitioning=0.5, ) - # Second call: update level_manager.get_matrices = MagicMock(return_value=(f_mat_2, t_mat_2)) level_manager.update_force_torque_matrices( @@ -1175,7 +1128,9 @@ def test_update_force_torque_matrices_residue(self): matrices for the 'residue' level, assigning whole-molecule matrices and incrementing frame counts. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) entropy_manager = MagicMock() mol = MagicMock() mol.trajectory.__getitem__.return_value = None @@ -1213,7 +1168,9 @@ def test_update_force_torque_matrices_incremental_average(self): Ensures that float precision is maintained and no casting errors occur. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) entropy_manager = MagicMock() mol = MagicMock() mol.trajectory.__getitem__.return_value = None @@ -1273,7 +1230,9 @@ def test_filter_zero_rows_columns_no_zeros(self): """ Test that matrix with no zero-only rows or columns should return unchanged. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) matrix = np.array([[1, 2], [3, 4]]) result = level_manager.filter_zero_rows_columns(matrix) @@ -1283,7 +1242,9 @@ def test_filter_zero_rows_columns_remove_rows_and_columns(self): """ Test that matrix with zero-only rows and columns should return reduced matrix. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) matrix = np.array([[0, 0, 0], [0, 5, 0], [0, 0, 0]]) expected = np.array([[5]]) @@ -1294,7 +1255,9 @@ def test_filter_zero_rows_columns_all_zeros(self): """ Test that matrix with all zeros should return an empty matrix. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) matrix = np.zeros((3, 3)) result = level_manager.filter_zero_rows_columns(matrix) @@ -1305,144 +1268,11 @@ def test_filter_zero_rows_columns_partial_zero_removal(self): """ Matrix with zeros in specific rows/columns should remove only those. """ - level_manager = LevelManager() + universe_operations = UniverseOperations() + + level_manager = LevelManager(universe_operations) matrix = np.array([[0, 0, 0], [1, 2, 3], [0, 0, 0]]) expected = np.array([[1, 2, 3]]) result = level_manager.filter_zero_rows_columns(matrix) np.testing.assert_array_equal(result, expected) - - def test_build_conformational_states_united_atom_accumulates_states(self): - """ - Test that the 'build_conformational_states' method correctly accumulates - united atom level conformational states for multiple molecules within the - same group. - - Specifically, when called with two molecules in the same group, the method - should append the states returned for the second molecule to the list of - states for the first molecule, resulting in a nested list structure. - - Verifies: - - The states_ua dictionary accumulates states as a nested list. - - The compute_dihedral_conformations method is called once per molecule. - """ - level_manager = LevelManager() - entropy_manager = MagicMock() - reduced_atom = MagicMock() - ce = MagicMock() - - # Setup mock residue for molecules - residue = MagicMock() - residue.atoms.indices = [10, 11, 12] - - # Setup two mock molecules with the same residue - mol_0 = MagicMock() - mol_0.residues = [residue] - mol_1 = MagicMock() - mol_1.residues = [residue] - - # entropy_manager returns different molecules by mol_id - entropy_manager._get_molecule_container.side_effect = [mol_0, mol_1] - - # new_U_select_atom returns dummy selections twice per molecule call - dummy_sel_1 = MagicMock() - dummy_sel_2 = MagicMock() - # For mol_0: light then heavy - # For mol_1: light then heavy - entropy_manager._run_manager.new_U_select_atom.side_effect = [ - dummy_sel_1, - dummy_sel_2, - dummy_sel_1, - dummy_sel_2, - ] - - # Mock compute_dihedral_conformations to return different states for each call - state_1 = ["ua_state_1"] - state_2 = ["ua_state_2"] - level_manager.compute_dihedral_conformations = MagicMock( - side_effect=[state_1, state_2] - ) - - groups = {0: [0, 1]} # Group 0 contains molecule 0 and molecule 1 - levels = [["united_atom"], ["united_atom"]] - start, end, step = 0, 10, 1 - number_frames = 10 - bin_width = 0.1 - - states_ua, states_res = level_manager.build_conformational_states( - entropy_manager, - reduced_atom, - levels, - groups, - start, - end, - step, - number_frames, - bin_width, - ce, - ) - - assert states_ua[(0, 0)] == ["ua_state_1", "ua_state_2"] - - # Confirm compute_dihedral_conformations was called twice (once per molecule) - assert level_manager.compute_dihedral_conformations.call_count == 2 - - def test_build_conformational_states_residue_level_accumulates_states(self): - """ - Test that the 'build_conformational_states' method correctly accumulates - residue level conformational states for multiple molecules within the - same group. - - When called with multiple molecules assigned to the same group at residue level, - the method should concatenate the returned states into a single flat list. - - Verifies: - - The states_res list contains concatenated residue states from all molecules. - - The states_ua dictionary remains empty for residue level. - - compute_dihedral_conformations is called once per molecule. - """ - level_manager = LevelManager() - entropy_manager = MagicMock() - reduced_atom = MagicMock() - ce = MagicMock() - - # Setup molecule with no residues - mol = MagicMock() - mol.residues = [] - entropy_manager._get_molecule_container.return_value = mol - - # Setup return values for compute_dihedral_conformations - states_1 = ["res_state1"] - states_2 = ["res_state2"] - level_manager.compute_dihedral_conformations = MagicMock( - side_effect=[states_1, states_2] - ) - - # Setup inputs with 2 molecules in same group - groups = {0: [0, 1]} # Both mol 0 and mol 1 are in group 0 - levels = [["residue"], ["residue"]] - start, end, step = 0, 10, 1 - number_frames = 10 - bin_width = 0.1 - - # Run - states_ua, states_res = level_manager.build_conformational_states( - entropy_manager, - reduced_atom, - levels, - groups, - start, - end, - step, - number_frames, - bin_width, - ce, - ) - - # Confirm accumulation occurred - assert states_ua == {} - assert states_res[0] == ["res_state1", "res_state2"] - assert states_res == [["res_state1", "res_state2"]] - - # Assert both calls to compute_dihedral_conformations happened - assert level_manager.compute_dihedral_conformations.call_count == 2 diff --git a/tests/test_CodeEntropy/test_main.py b/tests/test_CodeEntropy/test_main.py index 6db9dd8..76feb52 100644 --- a/tests/test_CodeEntropy/test_main.py +++ b/tests/test_CodeEntropy/test_main.py @@ -93,11 +93,11 @@ def test_main_entry_point_runs(self): config_path = os.path.join(self.test_dir, "config.yaml") with open(config_path, "w") as f: - f.write("run1:\n" " end: 60\n" " selection_string: resid 1\n") + f.write("run1:\n" " end: 1\n" " selection_string: resname DA\n") citation_path = os.path.join(self.test_dir, "CITATION.cff") with open(citation_path, "w") as f: - f.write("run1:\n" " end: 60\n" " selection_string: resid 1\n") + f.write("\n") result = subprocess.run( [ @@ -126,6 +126,7 @@ def test_main_entry_point_runs(self): with open(output_file) as f: content = f.read() + print(content) self.assertIn("DA", content) diff --git a/tests/test_CodeEntropy/test_mda_universe_operations.py b/tests/test_CodeEntropy/test_mda_universe_operations.py new file mode 100644 index 0000000..fb4c891 --- /dev/null +++ b/tests/test_CodeEntropy/test_mda_universe_operations.py @@ -0,0 +1,315 @@ +import logging +import os +from unittest.mock import MagicMock, patch + +import MDAnalysis as mda +import numpy as np + +import tests.data as data +from CodeEntropy.mda_universe_operations import UniverseOperations +from tests.test_CodeEntropy.test_base import BaseTestCase + + +class TestUniverseOperations(BaseTestCase): + """ + Unit tests for UniverseOperations. + """ + + def setUp(self): + super().setUp() + self.test_data_dir = os.path.dirname(data.__file__) + + # Disable MDAnalysis and commands file logging entirely + logging.getLogger("MDAnalysis").handlers = [logging.NullHandler()] + logging.getLogger("commands").handlers = [logging.NullHandler()] + + @patch("CodeEntropy.mda_universe_operations.AnalysisFromFunction") + @patch("CodeEntropy.mda_universe_operations.mda.Merge") + def test_new_U_select_frame(self, MockMerge, MockAnalysisFromFunction): + """ + Unit test for UniverseOperations.new_U_select_frame(). + + Verifies that: + - The Universe is queried with select_atoms("all", updating=True) + - AnalysisFromFunction is called to obtain coordinates and forces + - mda.Merge is called with the selected AtomGroup + - The new universe returned by Merge.load_new receives the correct arrays + - The method returns the merged universe + """ + # Mock Universe and its components + mock_universe = MagicMock() + mock_trajectory = MagicMock() + mock_trajectory.__len__.return_value = 10 + mock_universe.trajectory = mock_trajectory + + mock_select_atoms = MagicMock() + mock_universe.select_atoms.return_value = mock_select_atoms + + # Mock AnalysisFromFunction results for coordinates, forces, and dimensions + coords = np.random.rand(10, 100, 3) + forces = np.random.rand(10, 100, 3) + + mock_coords_analysis = MagicMock() + mock_coords_analysis.run.return_value.results = {"timeseries": coords} + + mock_forces_analysis = MagicMock() + mock_forces_analysis.run.return_value.results = {"timeseries": forces} + + # Set the side effects for the three AnalysisFromFunction calls + MockAnalysisFromFunction.side_effect = [ + mock_coords_analysis, + mock_forces_analysis, + ] + + # Mock the merge operation + mock_merged_universe = MagicMock() + MockMerge.return_value = mock_merged_universe + + ops = UniverseOperations() + result = ops.new_U_select_frame(mock_universe) + + mock_universe.select_atoms.assert_called_once_with("all", updating=True) + MockMerge.assert_called_once_with(mock_select_atoms) + + # Ensure the 'load_new' method was called with the correct arguments + mock_merged_universe.load_new.assert_called_once() + args, kwargs = mock_merged_universe.load_new.call_args + + # Assert that the arrays are passed correctly + np.testing.assert_array_equal(args[0], coords) + np.testing.assert_array_equal(kwargs["forces"], forces) + + # Check if format was included in the kwargs + self.assertIn("format", kwargs) + + # Ensure the result is the mock merged universe + self.assertEqual(result, mock_merged_universe) + + @patch("CodeEntropy.mda_universe_operations.AnalysisFromFunction") + @patch("CodeEntropy.mda_universe_operations.mda.Merge") + def test_new_U_select_atom(self, MockMerge, MockAnalysisFromFunction): + """ + Unit test for UniverseOperations.new_U_select_atom(). + + Ensures that: + - The Universe is queried with the correct selection string + - Coordinates and forces are extracted via AnalysisFromFunction + - mda.Merge receives the AtomGroup from select_atoms + - The new universe is populated with the expected data via load_new() + - The returned universe is the object created by Merge + """ + # Mock Universe and its components + mock_universe = MagicMock() + mock_select_atoms = MagicMock() + mock_universe.select_atoms.return_value = mock_select_atoms + + # Mock AnalysisFromFunction results for coordinates, forces, and dimensions + coords = np.random.rand(10, 100, 3) + forces = np.random.rand(10, 100, 3) + + mock_coords_analysis = MagicMock() + mock_coords_analysis.run.return_value.results = {"timeseries": coords} + + mock_forces_analysis = MagicMock() + mock_forces_analysis.run.return_value.results = {"timeseries": forces} + + # Set the side effects for the three AnalysisFromFunction calls + MockAnalysisFromFunction.side_effect = [ + mock_coords_analysis, + mock_forces_analysis, + ] + + # Mock the merge operation + mock_merged_universe = MagicMock() + MockMerge.return_value = mock_merged_universe + + ops = UniverseOperations() + + result = ops.new_U_select_atom(mock_universe, select_string="resid 1-10") + + mock_universe.select_atoms.assert_called_once_with("resid 1-10", updating=True) + MockMerge.assert_called_once_with(mock_select_atoms) + + # Ensure the 'load_new' method was called with the correct arguments + mock_merged_universe.load_new.assert_called_once() + args, kwargs = mock_merged_universe.load_new.call_args + + # Assert that the arrays are passed correctly + np.testing.assert_array_equal(args[0], coords) + np.testing.assert_array_equal(kwargs["forces"], forces) + + # Check if format was included in the kwargs + self.assertIn("format", kwargs) + + # Ensure the result is the mock merged universe + self.assertEqual(result, mock_merged_universe) + + def test_get_molecule_container(self): + """ + Integration test for UniverseOperations.get_molecule_container(). + + Uses a real MDAnalysis Universe loaded from test trajectory files. + Confirms that: + - The correct fragment for a given molecule index is selected + - The returned reduced Universe contains exactly the expected atom indices + - The number of atoms matches the original fragment + """ + tprfile = os.path.join(self.test_data_dir, "md_A4_dna.tpr") + trrfile = os.path.join(self.test_data_dir, "md_A4_dna_xf.trr") + + u = mda.Universe(tprfile, trrfile) + + ops = UniverseOperations() + + molecule_id = 0 + + fragment = u.atoms.fragments[molecule_id] + expected_indices = fragment.indices + + mol_u = ops.get_molecule_container(u, molecule_id) + + selected_indices = mol_u.atoms.indices + + self.assertSetEqual(set(selected_indices), set(expected_indices)) + self.assertEqual(len(selected_indices), len(expected_indices)) + + @patch("CodeEntropy.mda_universe_operations.AnalysisFromFunction") + @patch("CodeEntropy.mda_universe_operations.mda.Merge") + @patch("CodeEntropy.mda_universe_operations.mda.Universe") + def test_merge_forces(self, MockUniverse, MockMerge, MockAnalysisFromFunction): + """ + Unit test for UniverseOperations.merge_forces(). + + This test ensures that: + - Two MDAnalysis Universes are created: one for coordinates + (tprfile + trrfile) and one for forces (tprfile + forcefile). + - Both Universes correctly return AtomGroups via select_atoms("all"). + - Coordinates and forces are extracted using AnalysisFromFunction. + - mda.Merge is called with the coordinate AtomGroup. + - The merged Universe receives the correct coordinate and force arrays + through load_new(). + - When kcal=False, force values are passed through unchanged + (no kcal→kJ conversion). + - The returned universe is the same object returned by mda.Merge(). + """ + + mock_u_coords = MagicMock() + mock_u_force = MagicMock() + MockUniverse.side_effect = [mock_u_coords, mock_u_force] + + # Each universe returns a mock AtomGroup from select_atoms("all") + mock_ag_coords = MagicMock() + mock_ag_force = MagicMock() + mock_u_coords.select_atoms.return_value = mock_ag_coords + mock_u_force.select_atoms.return_value = mock_ag_force + + coords = np.random.rand(5, 10, 3) + forces = np.random.rand(5, 10, 3) + + mock_coords_analysis = MagicMock() + mock_coords_analysis.run.return_value.results = {"timeseries": coords} + + mock_forces_analysis = MagicMock() + mock_forces_analysis.run.return_value.results = {"timeseries": forces} + + # Two calls: first for coordinates, second for forces + MockAnalysisFromFunction.side_effect = [ + mock_coords_analysis, + mock_forces_analysis, + ] + + mock_merged = MagicMock() + MockMerge.return_value = mock_merged + + ops = UniverseOperations() + result = ops.merge_forces( + tprfile="topol.tpr", + trrfile="traj.trr", + forcefile="forces.trr", + fileformat=None, + kcal=False, + ) + + self.assertEqual(MockUniverse.call_count, 2) + MockUniverse.assert_any_call("topol.tpr", "traj.trr", format=None) + MockUniverse.assert_any_call("topol.tpr", "forces.trr", format=None) + + mock_u_coords.select_atoms.assert_called_once_with("all") + mock_u_force.select_atoms.assert_called_once_with("all") + + self.assertEqual(MockAnalysisFromFunction.call_count, 2) + + MockMerge.assert_called_once_with(mock_ag_coords) + + mock_merged.load_new.assert_called_once() + args, kwargs = mock_merged.load_new.call_args + + # Coordinates passed positionally + np.testing.assert_array_equal(args[0], coords) + + # Forces passed via kwargs + np.testing.assert_array_equal(kwargs["forces"], forces) + + # Finally the function returns the merged universe + self.assertEqual(result, mock_merged) + + @patch("CodeEntropy.mda_universe_operations.AnalysisFromFunction") + @patch("CodeEntropy.mda_universe_operations.mda.Merge") + @patch("CodeEntropy.mda_universe_operations.mda.Universe") + def test_merge_forces_kcal_conversion( + self, MockUniverse, MockMerge, MockAnalysisFromFunction + ): + """ + Unit test for UniverseOperations.merge_forces() covering the kcal→kJ + conversion branch. + + Verifies that: + - Two Universe objects are constructed for coords and forces. + - Each Universe returns an AtomGroup via select_atoms("all"). + - AnalysisFromFunction is called twice. + - Forces are multiplied EXACTLY once by 4.184 when kcal=True. + - Merge() is called with the coordinate AtomGroup. + - load_new() receives the correct coordinates and converted forces. + - The returned Universe is the Merge() result. + """ + mock_u_coords = MagicMock() + mock_u_force = MagicMock() + MockUniverse.side_effect = [mock_u_coords, mock_u_force] + + mock_ag_coords = MagicMock() + mock_ag_force = MagicMock() + mock_u_coords.select_atoms.return_value = mock_ag_coords + mock_u_force.select_atoms.return_value = mock_ag_force + + coords = np.ones((2, 3, 3)) + + original_forces = np.ones((2, 3, 3)) + mock_forces_array = original_forces.copy() + + # Mock AnalysisFromFunction return values + mock_coords_analysis = MagicMock() + mock_coords_analysis.run.return_value.results = {"timeseries": coords} + + mock_forces_analysis = MagicMock() + mock_forces_analysis.run.return_value.results = { + "timeseries": mock_forces_array + } + + MockAnalysisFromFunction.side_effect = [ + mock_coords_analysis, + mock_forces_analysis, + ] + + mock_merged = MagicMock() + MockMerge.return_value = mock_merged + + ops = UniverseOperations() + result = ops.merge_forces("t.tpr", "c.trr", "f.trr", kcal=True) + + _, kwargs = mock_merged.load_new.call_args + + expected_forces = original_forces * 4.184 + np.testing.assert_array_equal(kwargs["forces"], expected_forces) + np.testing.assert_array_equal(mock_merged.load_new.call_args[0][0], coords) + + self.assertEqual(result, mock_merged) diff --git a/tests/test_CodeEntropy/test_run.py b/tests/test_CodeEntropy/test_run.py index 42a6232..0174df1 100644 --- a/tests/test_CodeEntropy/test_run.py +++ b/tests/test_CodeEntropy/test_run.py @@ -3,7 +3,6 @@ from io import StringIO from unittest.mock import MagicMock, mock_open, patch -import numpy as np import requests import yaml from rich.console import Console @@ -360,6 +359,75 @@ def test_run_entropy_workflow(self): ) mock_entropy_manager.execute.assert_called_once() + def test_run_entropy_workflow_with_forcefile(self): + """ + Test the else-branch in run_entropy_workflow where forcefile is not None. + """ + run_manager = RunManager("mock_folder/job001") + run_manager._logging_config = MagicMock() + run_manager._config_manager = MagicMock() + run_manager.load_citation_data = MagicMock() + run_manager.show_splash = MagicMock() + run_manager._data_logger = MagicMock() + run_manager.folder = self.test_dir + + # Logger mock + mock_logger = MagicMock() + run_manager._logging_config.setup_logging.return_value = mock_logger + + # Config contains force_file + run_manager._config_manager.load_config.return_value = { + "test_run": { + "top_traj_file": ["/path/to/tpr", "/path/to/trr"], + "force_file": "/path/to/forces", + "file_format": "gro", + "kcal_force_units": "kcal", + "selection_string": "all", + "output_file": "output.json", + "verbose": False, + } + } + + # Parse args mock + mock_args = MagicMock() + mock_args.output_file = "output.json" + mock_args.verbose = False + mock_args.top_traj_file = ["/path/to/tpr", "/path/to/trr"] + mock_args.force_file = "/path/to/forces" + mock_args.file_format = "gro" + mock_args.kcal_force_units = "kcal" + mock_args.selection_string = "all" + + parser = run_manager._config_manager.setup_argparse.return_value + parser.parse_known_args.return_value = (mock_args, []) + run_manager._config_manager.merge_configs.return_value = mock_args + + # Mock UniverseOperations.merge_forces + with ( + unittest.mock.patch( + "CodeEntropy.run.EntropyManager", return_value=MagicMock() + ) as Entropy_patch, + unittest.mock.patch("CodeEntropy.run.UniverseOperations") as UOps_patch, + unittest.mock.patch("CodeEntropy.run.mda.Universe") as mock_universe, + ): + mock_universe_ops = UOps_patch.return_value + mock_universe_ops.merge_forces.return_value = MagicMock() + + run_manager.run_entropy_workflow() + + # Ensure merge_forces is used + mock_universe_ops.merge_forces.assert_called_once_with( + "/path/to/tpr", + ["/path/to/trr"], + "/path/to/forces", + "gro", + "kcal", + ) + + mock_universe.assert_not_called() + + Entropy_patch.return_value.execute.assert_called_once() + def test_run_configuration_warning(self): """ Test that a warning is logged when the config entry is not a dictionary. @@ -513,285 +581,6 @@ def test_run_entropy_workflow_missing_selection_string(self): with self.assertRaisesRegex(ValueError, "Missing 'selection_string' argument."): run_manager.run_entropy_workflow() - @patch("CodeEntropy.run.EntropyManager") - @patch("CodeEntropy.run.GroupMolecules") - @patch("CodeEntropy.run.LevelManager") - @patch("CodeEntropy.run.AnalysisFromFunction") - @patch("CodeEntropy.run.mda.Merge") - @patch("CodeEntropy.run.mda.Universe") - def test_merges_forcefile_correctly( - self, MockUniverse, MockMerge, MockAnalysis, *_mocks - ): - """ - Ensure that coordinates and forces are merged correctly - when a force file is provided. - """ - run_manager = RunManager("mock/job001") - mock_logger = MagicMock() - run_manager._logging_config = MagicMock( - setup_logging=MagicMock(return_value=mock_logger) - ) - run_manager._config_manager = MagicMock() - run_manager._data_logger = MagicMock() - run_manager.show_splash = MagicMock() - - args = MagicMock( - top_traj_file=["topol.tpr", "traj.xtc"], - force_file="forces.xtc", - file_format="xtc", - selection_string="all", - verbose=False, - output_file="output.json", - kcal_force_units=False, - ) - run_manager._config_manager.load_config.return_value = {"run1": {}} - parser = run_manager._config_manager.setup_argparse.return_value - parser.parse_known_args.return_value = (args, []) - run_manager._config_manager.merge_configs.return_value = args - run_manager._config_manager.input_parameters_validation = MagicMock() - - mock_u, mock_u_force = MagicMock(), MagicMock() - MockUniverse.side_effect = [mock_u, mock_u_force] - mock_atoms, mock_atoms_force = MagicMock(), MagicMock() - mock_u.select_atoms.return_value = mock_atoms - mock_u_force.select_atoms.return_value = mock_atoms_force - - coords, forces = np.random.rand(2, 3, 3), np.random.rand(2, 3, 3) - MockAnalysis.side_effect = [ - MagicMock( - run=MagicMock(return_value=MagicMock(results={"timeseries": coords})) - ), - MagicMock( - run=MagicMock(return_value=MagicMock(results={"timeseries": forces})) - ), - ] - mock_merge = MagicMock() - MockMerge.return_value = mock_merge - - run_manager.run_entropy_workflow() - - MockUniverse.assert_any_call("topol.tpr", ["traj.xtc"], format="xtc") - MockUniverse.assert_any_call("topol.tpr", "forces.xtc", format="xtc") - MockMerge.assert_called_once_with(mock_atoms) - mock_merge.load_new.assert_called_once_with(coords, forces=forces) - self.assertEqual(mock_logger.debug.call_count, 3) - - @patch("CodeEntropy.run.EntropyManager") - @patch("CodeEntropy.run.GroupMolecules") - @patch("CodeEntropy.run.LevelManager") - @patch("CodeEntropy.run.AnalysisFromFunction") - @patch("CodeEntropy.run.mda.Merge") - @patch("CodeEntropy.run.mda.Universe") - def test_converts_kcal_to_kj(self, MockUniverse, MockMerge, MockAnalysis, *_mocks): - """ - Ensure that forces are scaled by 4.184 when kcal_force_units=True. - """ - run_manager = RunManager("mock/job001") - mock_logger = MagicMock() - run_manager._logging_config = MagicMock( - setup_logging=MagicMock(return_value=mock_logger) - ) - run_manager._config_manager = MagicMock() - run_manager._data_logger = MagicMock() - run_manager.show_splash = MagicMock() - - args = MagicMock( - top_traj_file=["topol.tpr", "traj.xtc"], - force_file="forces.xtc", - file_format="xtc", - selection_string="all", - verbose=False, - output_file="output.json", - kcal_force_units=True, - ) - run_manager._config_manager.load_config.return_value = {"run1": {}} - parser = run_manager._config_manager.setup_argparse.return_value - parser.parse_known_args.return_value = (args, []) - run_manager._config_manager.merge_configs.return_value = args - run_manager._config_manager.input_parameters_validation = MagicMock() - - mock_u, mock_u_force = MagicMock(), MagicMock() - MockUniverse.side_effect = [mock_u, mock_u_force] - mock_atoms, mock_atoms_force = MagicMock(), MagicMock() - mock_u.select_atoms.return_value = mock_atoms - mock_u_force.select_atoms.return_value = mock_atoms_force - - coords = np.random.rand(2, 3, 3) - forces = np.random.rand(2, 3, 3) - forces_orig = forces.copy() - MockAnalysis.side_effect = [ - MagicMock( - run=MagicMock(return_value=MagicMock(results={"timeseries": coords})) - ), - MagicMock( - run=MagicMock(return_value=MagicMock(results={"timeseries": forces})) - ), - ] - mock_merge = MagicMock() - MockMerge.return_value = mock_merge - - run_manager.run_entropy_workflow() - _, kwargs = mock_merge.load_new.call_args - np.testing.assert_allclose(kwargs["forces"], forces_orig * 4.184) - - @patch("CodeEntropy.run.EntropyManager") - @patch("CodeEntropy.run.GroupMolecules") - @patch("CodeEntropy.run.LevelManager") - @patch("CodeEntropy.run.AnalysisFromFunction") - @patch("CodeEntropy.run.mda.Merge") - @patch("CodeEntropy.run.mda.Universe") - def test_logs_debug_messages(self, MockUniverse, MockMerge, MockAnalysis, *_mocks): - """ - Ensure that loading and merging steps produce debug logs. - """ - run_manager = RunManager("mock/job001") - mock_logger = MagicMock() - run_manager._logging_config = MagicMock( - setup_logging=MagicMock(return_value=mock_logger) - ) - run_manager._config_manager = MagicMock() - run_manager._data_logger = MagicMock() - run_manager.show_splash = MagicMock() - - args = MagicMock( - top_traj_file=["topol.tpr", "traj.xtc"], - force_file="forces.xtc", - file_format="xtc", - selection_string="all", - verbose=False, - output_file="output.json", - kcal_force_units=False, - ) - run_manager._config_manager.load_config.return_value = {"run1": {}} - parser = run_manager._config_manager.setup_argparse.return_value - parser.parse_known_args.return_value = (args, []) - run_manager._config_manager.merge_configs.return_value = args - run_manager._config_manager.input_parameters_validation = MagicMock() - - mock_u, mock_u_force = MagicMock(), MagicMock() - MockUniverse.side_effect = [mock_u, mock_u_force] - mock_atoms, mock_atoms_force = MagicMock(), MagicMock() - mock_u.select_atoms.return_value = mock_atoms - mock_u_force.select_atoms.return_value = mock_atoms_force - - arr = np.random.rand(2, 3, 3) - MockAnalysis.side_effect = [ - MagicMock( - run=MagicMock(return_value=MagicMock(results={"timeseries": arr})) - ), - MagicMock( - run=MagicMock(return_value=MagicMock(results={"timeseries": arr})) - ), - ] - MockMerge.return_value = MagicMock() - - run_manager.run_entropy_workflow() - debug_msgs = [c[0][0] for c in mock_logger.debug.call_args_list] - self.assertTrue(any("forces.xtc" in m for m in debug_msgs)) - self.assertTrue(any("Merging forces" in m for m in debug_msgs)) - - @patch("CodeEntropy.run.AnalysisFromFunction") - @patch("CodeEntropy.run.mda.Merge") - def test_new_U_select_frame(self, MockMerge, MockAnalysisFromFunction): - # Mock Universe and its components - mock_universe = MagicMock() - mock_trajectory = MagicMock() - mock_trajectory.__len__.return_value = 10 - mock_universe.trajectory = mock_trajectory - - mock_select_atoms = MagicMock() - mock_universe.select_atoms.return_value = mock_select_atoms - - # Mock AnalysisFromFunction results for coordinates, forces, and dimensions - coords = np.random.rand(10, 100, 3) - forces = np.random.rand(10, 100, 3) - - mock_coords_analysis = MagicMock() - mock_coords_analysis.run.return_value.results = {"timeseries": coords} - - mock_forces_analysis = MagicMock() - mock_forces_analysis.run.return_value.results = {"timeseries": forces} - - # Set the side effects for the three AnalysisFromFunction calls - MockAnalysisFromFunction.side_effect = [ - mock_coords_analysis, - mock_forces_analysis, - ] - - # Mock the merge operation - mock_merged_universe = MagicMock() - MockMerge.return_value = mock_merged_universe - - run_manager = RunManager("mock_folder/job001") - result = run_manager.new_U_select_frame(mock_universe) - - mock_universe.select_atoms.assert_called_once_with("all", updating=True) - MockMerge.assert_called_once_with(mock_select_atoms) - - # Ensure the 'load_new' method was called with the correct arguments - mock_merged_universe.load_new.assert_called_once() - args, kwargs = mock_merged_universe.load_new.call_args - - # Assert that the arrays are passed correctly - np.testing.assert_array_equal(args[0], coords) - np.testing.assert_array_equal(kwargs["forces"], forces) - - # Check if format was included in the kwargs - self.assertIn("format", kwargs) - - # Ensure the result is the mock merged universe - self.assertEqual(result, mock_merged_universe) - - @patch("CodeEntropy.run.AnalysisFromFunction") - @patch("CodeEntropy.run.mda.Merge") - def test_new_U_select_atom(self, MockMerge, MockAnalysisFromFunction): - # Mock Universe and its components - mock_universe = MagicMock() - mock_select_atoms = MagicMock() - mock_universe.select_atoms.return_value = mock_select_atoms - - # Mock AnalysisFromFunction results for coordinates, forces, and dimensions - coords = np.random.rand(10, 100, 3) - forces = np.random.rand(10, 100, 3) - - mock_coords_analysis = MagicMock() - mock_coords_analysis.run.return_value.results = {"timeseries": coords} - - mock_forces_analysis = MagicMock() - mock_forces_analysis.run.return_value.results = {"timeseries": forces} - - # Set the side effects for the three AnalysisFromFunction calls - MockAnalysisFromFunction.side_effect = [ - mock_coords_analysis, - mock_forces_analysis, - ] - - # Mock the merge operation - mock_merged_universe = MagicMock() - MockMerge.return_value = mock_merged_universe - - run_manager = RunManager("mock_folder/job001") - result = run_manager.new_U_select_atom( - mock_universe, select_string="resid 1-10" - ) - - mock_universe.select_atoms.assert_called_once_with("resid 1-10", updating=True) - MockMerge.assert_called_once_with(mock_select_atoms) - - # Ensure the 'load_new' method was called with the correct arguments - mock_merged_universe.load_new.assert_called_once() - args, kwargs = mock_merged_universe.load_new.call_args - - # Assert that the arrays are passed correctly - np.testing.assert_array_equal(args[0], coords) - np.testing.assert_array_equal(kwargs["forces"], forces) - - # Check if format was included in the kwargs - self.assertIn("format", kwargs) - - # Ensure the result is the mock merged universe - self.assertEqual(result, mock_merged_universe) - @patch("CodeEntropy.run.pickle.dump") @patch("CodeEntropy.run.open", create=True) def test_write_universe(self, mock_open, mock_pickle_dump):