diff --git a/CodeEntropy/calculations/ConformationFunctions.py b/CodeEntropy/calculations/ConformationFunctions.py deleted file mode 100644 index b51746a..0000000 --- a/CodeEntropy/calculations/ConformationFunctions.py +++ /dev/null @@ -1,89 +0,0 @@ -import logging - -import numpy as np - -logger = logging.getLogger(__name__) - -# from MDAnalysis.analysis.dihedrals import Dihedral - - -def assign_conformation( - data_container, dihedral, number_frames, bin_width, start, end, step -): - """ - Create a state vector, showing the state in which the input dihedral is - as a function of time. The function creates a histogram from the timeseries of the - dihedral angle values and identifies points of dominant occupancy - (called CONVEX TURNING POINTS). - Based on the identified TPs, states are assigned to each configuration of the - dihedral. - - Input - ----- - dihedral_atom_group : the group of 4 atoms defining the dihedral - number_frames : number of frames in the trajectory - bin_width : the width of the histogram bit, default 30 degrees - start : int, starting frame, will default to 0 - end : int, ending frame, will default to -1 (last frame in trajectory) - step : int, spacing between frames, will default to 1 - - Return - ------ - A timeseries with integer labels describing the state at each point in time. - - """ - conformations = np.zeros(number_frames) - phi = np.zeros(number_frames) - - # get the values of the angle for the dihedral - # dihedral angle values have a range from -180 to 180 - for timestep in data_container.trajectory[start:end:step]: - timestep_index = timestep.frame - start - value = dihedral.value() - # we want postive values in range 0 to 360 to make the peak assignment work - # using the fact that dihedrals have circular symetry - # (i.e. -15 degrees = +345 degrees) - if value < 0: - value += 360 - phi[timestep_index] = value - - # create a histogram using numpy - number_bins = int(360 / bin_width) - popul, bin_edges = np.histogram(a=phi, bins=number_bins, range=(0, 360)) - bin_value = [0.5 * (bin_edges[i] + bin_edges[i + 1]) for i in range(0, len(popul))] - - # identify "convex turning-points" and populate a list of peaks - # peak : a bin whose neighboring bins have smaller population - # NOTE might have problems if the peak is wide with a flat or sawtooth top - peak_values = [] - - for bin_index in range(number_bins): - # if there is no dihedrals in a bin then it cannot be a peak - if popul[bin_index] == 0: - pass - # being careful of the last bin - # (dihedrals have circular symmetry, the histogram does not) - elif ( - bin_index == number_bins - 1 - ): # the -1 is because the index starts with 0 not 1 - if ( - popul[bin_index] >= popul[bin_index - 1] - and popul[bin_index] >= popul[0] - ): - peak_values.append(bin_value[bin_index]) - else: - if ( - popul[bin_index] >= popul[bin_index - 1] - and popul[bin_index] >= popul[bin_index + 1] - ): - peak_values.append(bin_value[bin_index]) - - # go through each frame again and assign conformation state - for frame in range(number_frames): - # find the TP that the snapshot is least distant from - distances = [abs(phi[frame] - peak) for peak in peak_values] - conformations[frame] = np.argmin(distances) - - logger.debug(f"Final conformations: {conformations}") - - return conformations diff --git a/CodeEntropy/calculations/EntropyFunctions.py b/CodeEntropy/calculations/EntropyFunctions.py deleted file mode 100644 index aded08a..0000000 --- a/CodeEntropy/calculations/EntropyFunctions.py +++ /dev/null @@ -1,246 +0,0 @@ -import logging -import math - -# import matplotlib.pyplot as plt -# import MDAnalysis as mda -import numpy as np - -# import pandas as pd -from numpy import linalg as la - -from CodeEntropy.calculations import ConformationFunctions as CONF -from CodeEntropy.calculations import UnitsAndConversions as UAC - -logger = logging.getLogger(__name__) - -# import sys -# from ast import arg - - -# from CodeEntropy import NeighbourFunctions as NF - - -def frequency_calculation(lambdas, temp): - """ - Function to calculate an array of vibrational frequencies from the eigenvalues of - the covariance matrix. - - Calculated from eq. (3) in Higham, S.-Y. Chou, F. Gräter and R. H. Henchman, - Molecular Physics, 2018, 116, 1965–1976//eq. (3) in A. Chakravorty, J. Higham - and R. H. Henchman, J. Chem. Inf. Model., 2020, 60, 5540–5551 - - frequency=sqrt(λ/kT)/2π - - Input - ----- - lambdas : array of floats - eigenvalues of the covariance matrix - temp: float - temperature - - Returns - ------- - frequencies : array of floats - corresponding vibrational frequencies - """ - pi = np.pi - # get kT in Joules from given temperature - kT = UAC.get_KT2J(temp) - logger.debug(f"Temperature: {temp}, kT: {kT}") - - lambdas = np.array(lambdas) # Ensure input is a NumPy array - logger.debug(f"Eigenvalues (lambdas): {lambdas}") - - # Check for negatives and raise an error if any are found - if np.any(lambdas < 0): - logger.error(f"Negative eigenvalues encountered: {lambdas[lambdas < 0]}") - raise ValueError(f"Negative eigenvalues encountered: {lambdas[lambdas < 0]}") - - # Compute frequencies safely - frequencies = 1 / (2 * pi) * np.sqrt(lambdas / kT) - logger.debug(f"Calculated frequencies: {frequencies}") - - return frequencies - - -def vibrational_entropy(matrix, matrix_type, temp, highest_level): - """ - Function to calculate the vibrational entropy for each level calculated from eq. (4) - in J. Higham, S.-Y. Chou, F. Gräter and R. H. Henchman, Molecular Physics, 2018, - 116, 1965–1976 / eq. (2) in A. Chakravorty, J. Higham and R. H. Henchman, - J. Chem. Inf. Model., 2020, 60, 5540–5551. - - Input - ----- - matrix : matrix - force/torque covariance matrix - matrix_type: string - temp: float - temperature - highest_level: bool - is this the highest level of the heirarchy (whole molecule) - - Returns - ------- - S_vib_total : float - transvibrational/rovibrational entropy - """ - # N beads at a level => 3N x 3N covariance matrix => 3N eigenvalues - # Get eigenvalues of the given matrix and change units to SI units - lambdas = la.eigvals(matrix) - logger.debug(f"Eigenvalues (lambdas) before unit change: {lambdas}") - lambdas = UAC.change_lambda_units(lambdas) - logger.debug(f"Eigenvalues (lambdas) after unit change: {lambdas}") - - # Calculate frequencies from the eigenvalues - frequencies = frequency_calculation(lambdas, temp) - logger.debug(f"Calculated frequencies: {frequencies}") - - # Sort frequencies lowest to highest - frequencies = np.sort(frequencies) - logger.debug(f"Sorted frequencies: {frequencies}") - - kT = UAC.get_KT2J(temp) - logger.debug(f"Temperature: {temp}, kT: {kT}") - exponent = UAC.PLANCK_CONST * frequencies / kT - logger.debug(f"Exponent values: {exponent}") - power_positive = np.power(np.e, exponent) - power_negative = np.power(np.e, -exponent) - logger.debug(f"Power positive values: {power_positive}") - logger.debug(f"Power negative values: {power_negative}") - S_components = exponent / (power_positive - 1) - np.log(1 - power_negative) - S_components = ( - S_components * UAC.GAS_CONST - ) # multiply by R - get entropy in J mol^{-1} K^{-1} - logger.debug(f"Entropy components: {S_components}") - # N beads at a level => 3N x 3N covariance matrix => 3N eigenvalues - if matrix_type == "force": # force covariance matrix - if highest_level: # whole molecule level - we take all frequencies into account - S_vib_total = sum(S_components) - - # discard the 6 lowest frequencies to discard translation and rotation of the - # whole unit the overall translation and rotation of a unit is an internal - # motion of the level above - else: - S_vib_total = sum(S_components[6:]) - - else: # torque covariance matrix - we always take all values into account - S_vib_total = sum(S_components) - - logger.debug(f"Total vibrational entropy: {S_vib_total}") - - return S_vib_total - - -def conformational_entropy( - data_container, dihedrals, bin_width, start, end, step, number_frames -): - """ - Function to calculate conformational entropies using eq. (7) in Higham, S.-Y. Chou, - F. Gräter and R. H. Henchman, Molecular Physics, 2018, 116, 1965–1976 / eq. (4) in - A. Chakravorty, J. Higham and R. H. Henchman, J. Chem. Inf. Model., 2020, 60, - 5540–5551. - - Uses the adaptive enumeration method (AEM). - - Input - ----- - dihedrals : array - array of dihedrals in the molecule - Returns - ------- - S_conf_total : float - conformational entropy - """ - - S_conf_total = 0 - - # For each dihedral, identify the conformation in each frame - num_dihedrals = len(dihedrals) - conformation = np.zeros((num_dihedrals, number_frames)) - index = 0 - for dihedral in dihedrals: - conformation[index] = CONF.assign_conformation( - data_container, dihedral, number_frames, bin_width, start, end, step - ) - index += 1 - - logger.debug(f"Conformation matrix: {conformation}") - - # For each frame, convert the conformation of all dihedrals into a state string - states = ["" for x in range(number_frames)] - for frame_index in range(number_frames): - for index in range(num_dihedrals): - states[frame_index] += str(conformation[index][frame_index]) - - logger.debug(f"States: {states}") - - # Count how many times each state occurs, then use the probability to get the - # entropy - # entropy = sum over states p*ln(p) - values, counts = np.unique(states, return_counts=True) - for state in range(len(values)): - logger.debug(f"Unique states: {values}") - logger.debug(f"Counts: {counts}") - count = counts[state] - probability = count / number_frames - entropy = probability * np.log(probability) - S_conf_total += entropy - - # multiply by gas constant to get the units J/mol/K - S_conf_total *= -1 * UAC.GAS_CONST - - logger.debug(f"Total conformational entropy: {S_conf_total}") - - return S_conf_total - - -def orientational_entropy(neighbours_dict): - """ - Function to calculate orientational entropies from eq. (10) in J. Higham, S.-Y. - Chou, F. Gräter and R. H. Henchman, Molecular Physics, 2018, 116,3 1965–1976. - Number of orientations, Ω, is calculated using eq. (8) in J. Higham, - S.-Y. Chou, F. Gräter and R. H. Henchman, Molecular Physics, 2018, 116, - 3 1965–1976. - - σ is assumed to be 1 for the molecules we're concerned with and hence, - max {1, (Nc^3*π)^(1/2)} will always be (Nc^3*π)^(1/2). - - TODO future release - function for determing symmetry and symmetry numbers - maybe? - - Input - ----- - neighbours_dict : dictionary - dictionary of neighbours for the molecule - - should contain the type of neighbour molecule and the number of neighbour - molecules of that species - - Returns - ------- - S_or_total : float - orientational entropy - """ - - # Replaced molecule with neighbour as this is what the for loop uses - S_or_total = 0 - for neighbour in neighbours_dict: # we are going through neighbours - if neighbour in []: # water molecules - call POSEIDON functions - pass # TODO temporary until function is written - else: - # the bound ligand is always going to be a neighbour - omega = np.sqrt((neighbours_dict[neighbour] ** 3) * math.pi) - logger.debug(f"Omega for neighbour {neighbour}: {omega}") - # orientational entropy arising from each neighbouring species - # - we know the species is going to be a neighbour - S_or_component = math.log(omega) - logger.debug( - f"S_or_component (log(omega)) for neighbour {neighbour}: " - f"{S_or_component}" - ) - S_or_component *= UAC.GAS_CONST - logger.debug( - f"S_or_component after multiplying by GAS_CONST for neighbour " - f"{neighbour}: {S_or_component}" - ) - S_or_total += S_or_component - logger.debug( - f"S_or_total after adding component for neighbour {neighbour}: " - f"{S_or_total}" - ) - # TODO for future releases - # implement a case for molecules with hydrogen bonds but to a lesser - # extent than water - - logger.debug(f"Final total orientational entropy: {S_or_total}") - - return S_or_total diff --git a/CodeEntropy/calculations/GeometricFunctions.py b/CodeEntropy/calculations/GeometricFunctions.py deleted file mode 100644 index b63bf39..0000000 --- a/CodeEntropy/calculations/GeometricFunctions.py +++ /dev/null @@ -1,463 +0,0 @@ -import logging - -import numpy as np - -logger = logging.getLogger(__name__) - - -def get_beads(data_container, level): - """ - Function to define beads depending on the level in the hierarchy. - - Input - ----- - data_container : the MDAnalysis universe - level : the heirarchy level (polymer, residue, or united atom) - - Output - ------ - list_of_beads : the relevent beads - """ - - if level == "polymer": - list_of_beads = [] - atom_group = "all" - list_of_beads.append(data_container.select_atoms(atom_group)) - - if level == "residue": - list_of_beads = [] - num_residues = len(data_container.residues) - for residue in range(num_residues): - atom_group = "resindex " + str(residue) - list_of_beads.append(data_container.select_atoms(atom_group)) - - if level == "united_atom": - list_of_beads = [] - heavy_atoms = data_container.select_atoms("not name H*") - for atom in heavy_atoms: - atom_group = ( - "index " - + str(atom.index) - + " or (name H* and bonded index " - + str(atom.index) - + ")" - ) - list_of_beads.append(data_container.select_atoms(atom_group)) - - logger.debug(f"List of beads: {list_of_beads}") - - return list_of_beads - - -def get_axes(data_container, level, index=0): - """ - Function to set the translational and rotational axes. - The translational axes are based on the principal axes of the unit one level larger - than the level we are interested in (except for the polymer level where there is no - larger unit). The rotational axes use the covalent links between residues or atoms - where possible to define the axes, or if the unit is not bonded to others of the - same level the prinicpal axes of the unit are used. - - Input - ----- - data_container : the information about the molecule and trajectory - level : the level (united atom, residue, or polymer) of interest - index : residue index (integer) - - Output - ------ - trans_axes : translational axes - rot_axes : rotational axes - """ - index = int(index) - - if level == "polymer": - # for polymer use principle axis for both translation and rotation - trans_axes = data_container.atoms.principal_axes() - rot_axes = data_container.atoms.principal_axes() - - if level == "residue": - # Translation - # for residues use principal axes of whole molecule for translation - trans_axes = data_container.atoms.principal_axes() - - # Rotation - # find bonds between atoms in residue of interest and other residues - # we are assuming bonds only exist between adjacent residues - # (linear chains of residues) - # TODO refine selection so that it will work for branched polymers - index_prev = index - 1 - index_next = index + 1 - atom_set = data_container.select_atoms( - f"(resindex {index_prev} or resindex {index_next}) and bonded resid {index}" - ) - residue = data_container.select_atoms(f"resindex {index}") - - if len(atom_set) == 0: - # if no bonds to other residues use pricipal axes of residue - rot_axes = residue.atoms.principal_axes() - - else: - # set center of rotation to center of mass of the residue - center = residue.atoms.center_of_mass() - - # get vector for average position of bonded atoms - vector = get_avg_pos(atom_set, center) - - # use spherical coordinates function to get rotational axes - rot_axes = get_sphCoord_axes(vector) - - if level == "united_atom": - # Translation - # for united atoms use principal axes of residue for translation - trans_axes = data_container.residues.principal_axes() - - # Rotation - # for united atoms use heavy atoms bonded to the heavy atom - atom_set = data_container.select_atoms(f"not name H* and bonded index {index}") - - # center at position of heavy atom - atom_group = data_container.select_atoms(f"index {index}") - center = atom_group.positions[0] - - # get vector for average position of hydrogens - vector = get_avg_pos(atom_set, center) - - # use spherical coordinates function to get rotational axes - rot_axes = get_sphCoord_axes(vector) - - logger.debug(f"Translational Axes: {trans_axes}") - logger.debug(f"Rotational Axes: {rot_axes}") - - return trans_axes, rot_axes - - -def get_avg_pos(atom_set, center): - """ - Function to get the average position of a set of atoms. - - Input - ----- - atoms : MDAnalysis atom group - center : position for center of rotation - - Output - ------ - avg_position : three dimensional vector - """ - # start with an empty vector - avg_position = np.zeros((3)) - - # get number of atoms - number_atoms = len(atom_set.names) - - if number_atoms != 0: - # sum positions for all atoms in the given set - for atom_index in range(number_atoms): - atom_position = atom_set.atoms[atom_index].position - - avg_position += atom_position - - avg_position /= number_atoms # divide by number of atoms to get average - - else: - # if no atoms in set the unit has no bonds to restrict its rotational motion, - # so we can use a random vector to get the spherical coordinates axes - avg_position = np.random.random(3) - - # transform the average position to a coordinate system with the origin at center - avg_position = avg_position - center - - logger.debug(f"Average Position: {avg_position}") - - return avg_position - - -def get_sphCoord_axes(arg_r): - """ - For a given vector in space, treat it is a radial vector rooted at 0,0,0 and - derive a curvilinear coordinate system according to the rules of polar spherical - coordinates - """ - - x2y2 = arg_r[0] ** 2 + arg_r[1] ** 2 - r2 = x2y2 + arg_r[2] ** 2 - - # Check for division by zero - if r2 == 0.0: - raise ValueError("r2 is zero, cannot compute spherical coordinates.") - - if x2y2 == 0.0: - raise ValueError("x2y2 is zero, cannot compute sin_phi and cos_phi.") - - # Check for non-negative values inside the square root - if x2y2 / r2 < 0: - raise ValueError( - f"Negative value encountered for sin_theta calculation: {x2y2 / r2}. " - f"Cannot take square root." - ) - - if x2y2 < 0: - raise ValueError( - f"Negative value encountered for sin_phi and cos_phi calculation: {x2y2}. " - f"Cannot take square root." - ) - - if x2y2 != 0.0: - sin_theta = np.sqrt(x2y2 / r2) - cos_theta = arg_r[2] / np.sqrt(r2) - - sin_phi = arg_r[1] / np.sqrt(x2y2) - cos_phi = arg_r[0] / np.sqrt(x2y2) - - else: - sin_theta = 0.0 - cos_theta = 1 - - sin_phi = 0.0 - cos_phi = 1 - - # if abs(sin_theta) > 1 or abs(sin_phi) > 1: - # print('Bad sine : T {} , P {}'.format(sin_theta, sin_phi)) - - # cos_theta = np.sqrt(1 - sin_theta*sin_theta) - # cos_phi = np.sqrt(1 - sin_phi*sin_phi) - - # print('{} {} {}'.format(*arg_r)) - # print('Sin T : {}, cos T : {}'.format(sin_theta, cos_theta)) - # print('Sin P : {}, cos P : {}'.format(sin_phi, cos_phi)) - - spherical_basis = np.zeros((3, 3)) - - # r^ - spherical_basis[0, :] = np.asarray( - [sin_theta * cos_phi, sin_theta * sin_phi, cos_theta] - ) - - # Theta^ - spherical_basis[1, :] = np.asarray( - [cos_theta * cos_phi, cos_theta * sin_phi, -sin_theta] - ) - - # Phi^ - spherical_basis[2, :] = np.asarray([-sin_phi, cos_phi, 0.0]) - - logger.debug(f"Spherical Basis: {spherical_basis}") - - return spherical_basis - - -def get_weighted_forces( - data_container, bead, trans_axes, highest_level, force_partitioning=0.5 -): - """ - Function to calculate the mass weighted forces for a given bead. - - Input - ----- - bead : the part of the system to be considered - trans_axes : the axes relative to which the forces are located - - Output - ------ - weighted_force : the mass weighted sum of the forces in the bead - """ - - forces_trans = np.zeros((3,)) - - # Sum forces from all atoms in the bead - for atom in bead.atoms: - # update local forces in translational axes - forces_local = np.matmul(trans_axes, data_container.atoms[atom.index].force) - forces_trans += forces_local - - if highest_level: - # multiply by the force_partitioning parameter to avoid double counting - # of the forces on weakly correlated atoms - # the default value of force_partitioning is 0.5 (dividing by two) - forces_trans = force_partitioning * forces_trans - - # divide the sum of forces by the mass of the bead to get the weighted forces - mass = bead.total_mass() - - # Check that mass is positive to avoid division by 0 or negative values inside sqrt - if mass <= 0: - raise ValueError( - f"Invalid mass value: {mass}. Mass must be positive to compute the square " - f"root." - ) - - weighted_force = forces_trans / np.sqrt(mass) - - logger.debug(f"Weighted Force: {weighted_force}") - - return weighted_force - - -def get_weighted_torques(data_container, bead, rot_axes, force_partitioning=0.5): - """ - Function to calculate the moment of inertia weighted torques for a given bead. - - This function computes torques in a rotated frame and then weights them using - the moment of inertia tensor. To prevent numerical instability, it treats - extremely small diagonal elements of the moment of inertia tensor as zero - (since values below machine precision are effectively zero). This avoids - unnecessary use of extended precision (e.g., float128). - - Additionally, if the computed torque is already zero, the function skips - the division step, reducing unnecessary computations and potential errors. - - Parameters - ---------- - data_container : object - Contains atomic positions and forces. - bead : object - The part of the molecule to be considered. - rot_axes : np.ndarray - The axes relative to which the forces and coordinates are located. - force_partitioning : float, optional - Factor to adjust force contributions, default is 0.5. - - Returns - ------- - np.ndarray - The mass-weighted sum of the torques in the bead. - """ - - torques = np.zeros((3,)) - weighted_torque = np.zeros((3,)) - - for atom in bead.atoms: - - # update local coordinates in rotational axes - coords_rot = data_container.atoms[atom.index].position - bead.center_of_mass() - coords_rot = np.matmul(rot_axes, coords_rot) - # update local forces in rotational frame - forces_rot = np.matmul(rot_axes, data_container.atoms[atom.index].force) - - # multiply by the force_partitioning parameter to avoid double counting - # of the forces on weakly correlated atoms - # the default value of force_partitioning is 0.5 (dividing by two) - forces_rot = force_partitioning * forces_rot - - # define torques (cross product of coordinates and forces) in rotational axes - torques_local = np.cross(coords_rot, forces_rot) - torques += torques_local - - # divide by moment of inertia to get weighted torques - # moment of inertia is a 3x3 tensor - # the weighting is done in each dimension (x,y,z) using the diagonal elements of - # the moment of inertia tensor - moment_of_inertia = bead.moment_of_inertia() - - for dimension in range(3): - # Skip calculation if torque is already zero - if np.isclose(torques[dimension], 0): - weighted_torque[dimension] = 0 - continue - - # Check for zero moment of inertia - if np.isclose(moment_of_inertia[dimension, dimension], 0): - raise ZeroDivisionError( - f"Attempted to divide by zero moment of inertia in dimension " - f"{dimension}." - ) - - # Check for negative moment of inertia - if moment_of_inertia[dimension, dimension] < 0: - raise ValueError( - f"Negative value encountered for moment of inertia: " - f"{moment_of_inertia[dimension, dimension]} " - f"Cannot compute weighted torque." - ) - - # Compute weighted torque - weighted_torque[dimension] = torques[dimension] / np.sqrt( - moment_of_inertia[dimension, dimension] - ) - - logger.debug(f"Weighted Torque: {weighted_torque}") - - return weighted_torque - - -def create_submatrix(data_i, data_j, number_frames): - """ - Function for making covariance matrices. - - Input - ----- - data_i : values for bead i - data_j : valuees for bead j - - Output - ------ - submatrix : 3x3 matrix for the covariance between i and j - """ - - # Start with 3 by 3 matrix of zeros - submatrix = np.zeros((3, 3)) - - # For each frame calculate the outer product (cross product) of the data from the - # two beads and add the result to the submatrix - for frame in range(number_frames): - outer_product_matrix = np.outer(data_i[frame], data_j[frame]) - submatrix = np.add(submatrix, outer_product_matrix) - - # Divide by the number of frames to get the average - submatrix /= number_frames - - logger.debug(f"Submatrix: {submatrix}") - - return submatrix - - -def filter_zero_rows_columns(arg_matrix, verbose): - """ - function for removing rows and columns that contain only zeros from a matrix - - Input - ----- - arg_matrix : matrix - - Output - ------ - arg_matrix : the reduced size matrix - """ - - # record the initial size - init_shape = np.shape(arg_matrix) - - zero_indices = list( - filter( - lambda row: np.all(np.isclose(arg_matrix[row, :], 0.0)), - np.arange(np.shape(arg_matrix)[0]), - ) - ) - all_indices = np.ones((np.shape(arg_matrix)[0]), dtype=bool) - all_indices[zero_indices] = False - arg_matrix = arg_matrix[all_indices, :] - - all_indices = np.ones((np.shape(arg_matrix)[1]), dtype=bool) - zero_indices = list( - filter( - lambda col: np.all(np.isclose(arg_matrix[:, col], 0.0)), - np.arange(np.shape(arg_matrix)[1]), - ) - ) - all_indices[zero_indices] = False - arg_matrix = arg_matrix[:, all_indices] - - # get the final shape - final_shape = np.shape(arg_matrix) - - if init_shape != final_shape: - logger.debug( - "A shape change has occurred ({},{}) -> ({}, {})".format( - *init_shape, *final_shape - ) - ) - - logger.debug(f"arg_matrix: {arg_matrix}") - - return arg_matrix diff --git a/CodeEntropy/calculations/LevelFunctions.py b/CodeEntropy/calculations/LevelFunctions.py deleted file mode 100644 index a837e9f..0000000 --- a/CodeEntropy/calculations/LevelFunctions.py +++ /dev/null @@ -1,229 +0,0 @@ -# import MDAnalysis as mda -import logging - -import numpy as np - -from CodeEntropy.calculations import GeometricFunctions as GF - -logger = logging.getLogger(__name__) - - -def select_levels(data_container, verbose): - """ - Function to read input system and identify the number of molecules and the levels - (i.e. united atom, residue and/or polymer) that should be used. - The level refers to the size of the bead (atom or collection of atoms) that will be - used in the entropy calculations. - - Input - ----- - arg_DataContainer : MDAnalysis universe object containing the system of interest - - Returns - ------- - number_molecules : integer - levels : array of strings for each molecule - """ - - # fragments is MDAnalysis terminology for what chemists would call molecules - number_molecules = len(data_container.atoms.fragments) - logger.debug("The number of molecules is {}.".format(number_molecules)) - fragments = data_container.atoms.fragments - levels = [[] for _ in range(number_molecules)] - - for molecule in range(number_molecules): - levels[molecule].append("united_atom") # every molecule has at least one atom - - atoms_in_fragment = fragments[molecule].select_atoms("not name H*") - number_residues = len(atoms_in_fragment.residues) - - # if a fragment has more than one heavy atom assign residue level - if len(atoms_in_fragment) > 1: - levels[molecule].append("residue") - - # if assigned residue level and there is more than one residue assign - # polymer level - if number_residues > 1: - levels[molecule].append("polymer") - - logger.debug(f"levels {levels}") - - return number_molecules, levels - - -def get_matrices( - data_container, level, verbose, start, end, step, number_frames, highest_level -): - """ - Function to create the force matrix needed for the transvibrational entropy - calculation and the torque matrix for the rovibrational entropy calculation. - - Input - ----- - data_container : MDAnalysis universe type with the information on the - molecule of interest. - level : string, which of the polymer, residue, or united atom levels - are the matrices for. - verbose : true/false, controlls how much is printed - start : int, starting frame, default 0 (first frame) - end : int, ending frame, default -1 (last frame) - step : int, step for going through trajectories, default 1 - - Returns - ------- - force_matrix : force covariance matrix for transvibrational entropy - torque_matrix : torque convariance matrix for rovibrational entropy - """ - - # Make beads - list_of_beads = GF.get_beads(data_container, level) - - # number of beads and frames in trajectory - number_beads = len(list_of_beads) - - # initialize force and torque arrays - weighted_forces = [[0 for x in range(number_frames)] for y in range(number_beads)] - weighted_torques = [[0 for x in range(number_frames)] for y in range(number_beads)] - - # Calculate forces/torques for each bead - for bead_index in range(number_beads): - for timestep in data_container.trajectory[start:end:step]: - # Set up axes - # translation and rotation use different axes - # how the axes are defined depends on the level - trans_axes, rot_axes = GF.get_axes(data_container, level, bead_index) - - # Sort out coordinates, forces, and torques for each atom in the bead - timestep_index = timestep.frame - start - weighted_forces[bead_index][timestep_index] = GF.get_weighted_forces( - data_container, list_of_beads[bead_index], trans_axes, highest_level - ) - weighted_torques[bead_index][timestep_index] = GF.get_weighted_torques( - data_container, list_of_beads[bead_index], rot_axes - ) - - # Make covariance matrices - looping over pairs of beads - # list of pairs of indices - pair_list = [(i, j) for i in range(number_beads) for j in range(number_beads)] - - force_submatrix = [[0 for x in range(number_beads)] for y in range(number_beads)] - torque_submatrix = [[0 for x in range(number_beads)] for y in range(number_beads)] - - for i, j in pair_list: - # for each pair of beads - # reducing effort because the matrix for [i][j] is the transpose of the one for - # [j][i] - if i <= j: - # calculate the force covariance segment of the matrix - force_submatrix[i][j] = GF.create_submatrix( - weighted_forces[i], weighted_forces[j], number_frames - ) - force_submatrix[j][i] = np.transpose(force_submatrix[i][j]) - - # calculate the torque covariance segment of the matrix - torque_submatrix[i][j] = GF.create_submatrix( - weighted_torques[i], weighted_torques[j], number_frames - ) - torque_submatrix[j][i] = np.transpose(torque_submatrix[i][j]) - - # Tidy up - # use np.block to make submatrices into one matrix - force_matrix = np.block( - [ - [force_submatrix[i][j] for j in range(number_beads)] - for i in range(number_beads) - ] - ) - - torque_matrix = np.block( - [ - [torque_submatrix[i][j] for j in range(number_beads)] - for i in range(number_beads) - ] - ) - - # fliter zeros to remove any rows/columns that are all zero - force_matrix = GF.filter_zero_rows_columns(force_matrix, verbose) - torque_matrix = GF.filter_zero_rows_columns(torque_matrix, verbose) - - logger.debug(f"Force Matrix: {force_matrix}") - logger.debug(f"Torque Matrix: {torque_matrix}") - - return force_matrix, torque_matrix - - -def get_dihedrals(data_container, level): - """ - Define the set of dihedrals for use in the conformational entropy function. - If residue level, the dihedrals are defined from the atoms - (4 bonded atoms for 1 dihedral). - If polymer level, use the bonds between residues to cast dihedrals. - Note: not using improper dihedrals only ones with 4 atoms/residues - in a linear arrangement. - - Input - ----- - data_container : system information - level : level of the hierarchy (should be residue or polymer) - - Output - ------ - dihedrals : set of dihedrals - """ - # Start with empty array - dihedrals = [] - - # if united atom level, read dihedrals from MDAnalysis universe - if level == "united_atom": - dihedrals = data_container.dihedrals - - # if residue level, looking for dihedrals involving residues - if level == "residue": - num_residues = len(data_container.residues) - if num_residues < 4: - logger.debug("no residue level dihedrals") - - else: - # find bonds between residues N-3:N-2 and N-1:N - for residue in range(4, num_residues + 1): - # Using MDAnalysis selection, - # assuming only one covalent bond between neighbouring residues - # TODO not written for branched polymers - atom_string = ( - "resindex " - + str(residue - 4) - + " and bonded resindex " - + str(residue - 3) - ) - atom1 = data_container.select_atoms(atom_string) - - atom_string = ( - "resindex " - + str(residue - 3) - + " and bonded resindex " - + str(residue - 4) - ) - atom2 = data_container.select_atoms(atom_string) - - atom_string = ( - "resindex " - + str(residue - 2) - + " and bonded resindex " - + str(residue - 1) - ) - atom3 = data_container.select_atoms(atom_string) - - atom_string = ( - "resindex " - + str(residue - 1) - + " and bonded resindex " - + str(residue - 2) - ) - atom4 = data_container.select_atoms(atom_string) - - atom_group = atom1 + atom2 + atom3 + atom4 - dihedrals.append(atom_group.dihedral) - - logger.debug(f"Dihedrals: {dihedrals}") - - return dihedrals diff --git a/CodeEntropy/calculations/MDAUniverseHelper.py b/CodeEntropy/calculations/MDAUniverseHelper.py deleted file mode 100644 index 6a75fed..0000000 --- a/CodeEntropy/calculations/MDAUniverseHelper.py +++ /dev/null @@ -1,129 +0,0 @@ -import logging -import pickle - -import MDAnalysis as mda -from MDAnalysis.analysis.base import AnalysisFromFunction -from MDAnalysis.coordinates.memory import MemoryReader - -logger = logging.getLogger(__name__) - - -def new_U_select_frame(u, start=None, end=None, step=1): - """Create a reduced universe by dropping frames according to user selection - - Parameters - ---------- - u : MDAnalyse.Universe - A Universe object will all topology, dihedrals,coordinates and force information - start : int or None, Optional, default: None - Frame id to start analysis. Default None will start from frame 0 - end : int or None, Optional, default: None - Frame id to end analysis. Default None will end at last frame - step : int, Optional, default: 1 - Steps between frame. - - Returns - ------- - u2 : MDAnalysis.Universe - reduced universe - """ - if start is None: - start = 0 - if end is None: - end = len(u.trajectory) - select_atom = u.select_atoms("all", updating=True) - coordinates = ( - AnalysisFromFunction(lambda ag: ag.positions.copy(), select_atom) - .run() - .results["timeseries"][start:end:step] - ) - forces = ( - AnalysisFromFunction(lambda ag: ag.forces.copy(), select_atom) - .run() - .results["timeseries"][start:end:step] - ) - dimensions = ( - AnalysisFromFunction(lambda ag: ag.dimensions.copy(), select_atom) - .run() - .results["timeseries"][start:end:step] - ) - u2 = mda.Merge(select_atom) - u2.load_new(coordinates, format=MemoryReader, forces=forces, dimensions=dimensions) - logger.debug(f"MDAnalysis.Universe - reduced universe: {u2}") - return u2 - - -def new_U_select_atom(u, select_string="all"): - """Create a reduced universe by dropping atoms according to user selection - - Parameters - ---------- - u : MDAnalyse.Universe - A Universe object will all topology, dihedrals,coordinates and force information - select_string : str, Optional, default: 'all' - MDAnalysis.select_atoms selection string. - - Returns - ------- - u2 : MDAnalysis.Universe - reduced universe - - """ - select_atom = u.select_atoms(select_string, updating=True) - coordinates = ( - AnalysisFromFunction(lambda ag: ag.positions.copy(), select_atom) - .run() - .results["timeseries"] - ) - forces = ( - AnalysisFromFunction(lambda ag: ag.forces.copy(), select_atom) - .run() - .results["timeseries"] - ) - dimensions = ( - AnalysisFromFunction(lambda ag: ag.dimensions.copy(), select_atom) - .run() - .results["timeseries"] - ) - u2 = mda.Merge(select_atom) - u2.load_new(coordinates, format=MemoryReader, forces=forces, dimensions=dimensions) - logger.debug(f"MDAnalysis.Universe - reduced universe: {u2}") - return u2 - - -def write_universe(u, name="default"): - """Write a universe to working directories as pickle - - Parameters - ---------- - u : MDAnalyse.Universe - A Universe object will all topology, dihedrals,coordinates and force information - name : str, Optional. default: 'default' - The name of file with sub file name .pkl - - Returns - ------- - name : str - filename of saved universe - """ - filename = f"{name}.pkl" - pickle.dump(u, open(filename, "wb")) - return name - - -def read_universe(path): - """read a universe to working directories as pickle - - Parameters - ---------- - path : str - The path to file. - - Returns - ------- - u : MDAnalysis.Universe - A Universe object will all topology, dihedrals,coordinates and force - information. - """ - u = pickle.load(open(path, "rb")) - return u diff --git a/CodeEntropy/calculations/NeighbourFunctions.py b/CodeEntropy/calculations/NeighbourFunctions.py deleted file mode 100644 index d612f63..0000000 --- a/CodeEntropy/calculations/NeighbourFunctions.py +++ /dev/null @@ -1,96 +0,0 @@ -# import os -import logging -import sys - -# import matplotlib.pyplot as plt -import MDAnalysis as mda -import numpy as np - -logger = logging.getLogger(__name__) - - -# from ast import arg - - -# from numpy import linalg as la - -# import math - -# from CodeEntropy import CustomFunctions as CF -# from CodeEntropy import GeometricFunctions as GF -# from CodeEntropy import UnitsAndConversions as UAC -# from CodeEntropy import LevelFunctions as LF -# from CodeEntropy import Utils -# from CodeEntropy import Writer -# import multiprocessing as mp -# from functools import partial - - -# import pandas as pd -# from MDAnalysis.analysis import distances - -np.set_printoptions(threshold=sys.maxsize) - - -def get_neighbours(molecule_i, reduced_atom): - """ - Molecules in the coordination shell are obtained using the relative angular distance - (RAD) algorithm, as presented in J.Higham and R. H. Henchman, J. Chem. Phys., 2016, - 145, 084108. - - Input - ----- - molecule: DataContainer type with the information on the molecule of interest for - which are finding the neighbours - reduced_atom: MDAnalysis universe object containing the system of interest - Returns - ------- - neighbours_dict : dictionary - dictionary of neighbours for the molecule - should - contain the type of neighbour molecule and the number of neighbour - molecules of that species - neighbours_array: array containing all neighbouring molecules - """ - number_molecules = len(reduced_atom.atoms.fragments) - neighbours_dict = {} - neighbours_array = [] - # go through molecules to see if they're in the coordination shell of the molecule - # of interest - for j in range(0, number_molecules): - molecule_j = reduced_atom.atoms.fragments[j] - # to avoid adding a molecule as its own neighbour - if molecule_i != molecule_j: - # we have already found a neighbour - if len(neighbours_array) > 0: - for molecule_k in ( - neighbours_array - # check if every closer molecule that is already in the neighbour - # list is unblocked - ): - # distance between the molecule of interest and the molecule - # that we check if it's a neighbour - r_ij = mda.analysis.distances.dist(molecule_i, molecule_j) - # distance between the molecule of interest and the neighbours - # already identified - r_ik = mda.analysis.distances.dist(molecule_i, molecule_k) - r_jk = mda.analysis.distances.dist(molecule_j, molecule_k) - cos_theta_jik = (r_ij**2 + r_ik**2 - r_jk**2) / ( - 2 * r_ij * r_ik - ) # cosine of the angle subintended at i by j and k - if (r_ij) ** (-2) < (r_ik) ** (-2) * cos_theta_jik: - break # j is not a neighbour as it is blocked by k - else: - neighbours_array.append(molecule_j.atoms.resids[0]) - if molecule_j.smth in neighbours_dict.keys(): - neighbours_dict[molecule_j.atoms.resnames[0]] += 1 - else: - neighbours_dict[molecule_j.atoms.resnames[0]] = 1 - else: # first neighbour - # r_ij = mda.analysis.distances.dist(molecule_i, molecule_j) - # print(r_ij) - neighbours_array.append(molecule_j.atoms.resids[0]) - neighbours_dict[molecule_j.atoms.resnames[0]] = 1 - - logger.debug(f"Neighbours dictionary: {neighbours_dict}") - logger.debug(f"Neighbours array: {neighbours_array}") - - return neighbours_dict, neighbours_array diff --git a/CodeEntropy/calculations/UnitsAndConversions.py b/CodeEntropy/calculations/UnitsAndConversions.py deleted file mode 100644 index c24a1cf..0000000 --- a/CodeEntropy/calculations/UnitsAndConversions.py +++ /dev/null @@ -1,40 +0,0 @@ -# TEMPERATURE -DEF_TEMPER = 298 # K - -# PHYSICAL CONSTANTS -N_AVOGADRO = 6.0221415e23 -GAS_CONST = 8.3144598484848 # J/mol/K -PLANCK_CONST = 6.62607004081818e-34 # J s -C_LIGHT = 29979245800 # cm/s - -# LENGTH -NM2ANG = 1e1 -M2ANG = 1e10 -ANG2M = 1e-10 -NM2M = 1e-9 -ANG2NM = 1e-1 - - -# MASS -AMU2KG = 1.66053892110e-27 -KG2AMU = 6.022141289754078e26 -sqrtKG2AMU = 24540051527562.199 - -# ENERGY -KJ2KCAL = 0.239006 -KCAL2KJ = 4.184 - -# CHARMM units -AKMA2PS = 4.88882129e-02 - - -def change_lambda_units(arg_lambdas): - """Unit of lambdas : kJ2 mol-2 A-2 amu-1 - change units of lambda to J/s2""" - # return arg_lambdas * N_AVOGADRO * N_AVOGADRO * AMU2KG * 1e-26 - return arg_lambdas * 1e29 / N_AVOGADRO - - -def get_KT2J(arg_temper): - """A temperature dependent KT to Joule conversion""" - return 4.11e-21 * arg_temper / DEF_TEMPER diff --git a/CodeEntropy/config/__init__.py b/CodeEntropy/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/CodeEntropy/entropy.py b/CodeEntropy/entropy.py new file mode 100644 index 0000000..d4975cc --- /dev/null +++ b/CodeEntropy/entropy.py @@ -0,0 +1,718 @@ +import logging +import math + +import numpy as np +import pandas as pd +from numpy import linalg as la + +logger = logging.getLogger(__name__) + + +class EntropyManager: + """ + Manages entropy calculations at multiple molecular levels, based on a + molecular dynamics trajectory. + """ + + def __init__(self, run_manager, args, universe, data_logger, level_manager): + """ + Initializes the EntropyManager with required components. + + Args: + run_manager: Manager for universe and selection operations. + args: Argument namespace containing user parameters. + universe: MDAnalysis universe representing the simulation system. + data_logger: Logger for storing and exporting entropy data. + level_manager: Provides level-specific data such as matrices and dihedrals. + """ + self._run_manager = run_manager + self._args = args + self._universe = universe + self._data_logger = data_logger + self._level_manager = level_manager + self._GAS_CONST = 8.3144598484848 + + self._results_df = pd.DataFrame( + columns=["Molecule ID", "Level", "Type", "Result"] + ) + self._residue_results_df = pd.DataFrame( + columns=["Molecule ID", "Residue", "Type", "Result"] + ) + + @property + def results_df(self): + """Returns the dataframe containing entropy results at all levels.""" + return self._results_df + + @property + def residue_results_df(self): + """ + Returns the dataframe containing united-atom level results for each residue. + """ + return self._residue_results_df + + def execute(self): + """ + Executes the full entropy computation workflow over selected molecules and + levels. This includes both vibrational and conformational entropy, recorded + per molecule and residue. + """ + start, end, step = self._get_trajectory_bounds() + number_frames = self._get_number_frames(start, end, step) + reduced_atom = self._get_reduced_universe() + number_molecules, levels = self._level_manager.select_levels(reduced_atom) + + ve = VibrationalEntropy( + self._run_manager, + self._args, + self._universe, + self._data_logger, + self._level_manager, + ) + ce = ConformationalEntropy( + self._run_manager, + self._args, + self._universe, + self._data_logger, + self._level_manager, + ) + + for molecule_id in range(number_molecules): + mol_container = self._get_molecule_container(reduced_atom, molecule_id) + + for level in levels[molecule_id]: + highest_level = level == levels[molecule_id][-1] + if level == "united_atom": + self._process_united_atom_level( + molecule_id, + mol_container, + ve, + ce, + level, + start, + end, + step, + number_frames, + highest_level, + ) + elif level in ("polymer", "residue"): + self._process_vibrational_only_levels( + molecule_id, + mol_container, + ve, + level, + start, + end, + step, + number_frames, + highest_level, + ) + if level == "residue": + self._process_conformational_residue_level( + molecule_id, + mol_container, + ce, + level, + start, + end, + step, + number_frames, + ) + + self._finalize_molecule_results(molecule_id, level) + + self._data_logger.log_tables() + + def _get_trajectory_bounds(self): + """ + Returns the start, end, and step frame indices based on input arguments. + + Returns: + Tuple of (start, end, step) frame indices. + """ + start = self._args.start or 0 + end = self._args.end or -1 + step = self._args.step or 1 + + return start, end, step + + def _get_number_frames(self, start, end, step): + """ + Calculates the total number of trajectory frames used in the calculation. + + Args: + start (int): Start frame index. + end (int): End frame index. If -1, it refers to the end of the trajectory. + step (int): Frame step size. + + Returns: + int: Total number of frames considered. + """ + trajectory_length = len(self._universe.trajectory) + + if start == 0 and end == -1 and step == 1: + return trajectory_length + + if end == -1: + end = trajectory_length + else: + end += 1 + + return math.floor((end - start) / step) + + def _get_reduced_universe(self): + """ + Applies atom selection based on the user's input. + + Returns: + MDAnalysis.Universe: Selected subset of the system. + """ + if self._args.selection_string == "all": + return self._universe + reduced = self._run_manager.new_U_select_atom( + self._universe, self._args.selection_string + ) + name = f"{len(reduced.trajectory)}_frame_dump_atom_selection" + self._run_manager.write_universe(reduced, name) + return reduced + + def _get_molecule_container(self, universe, molecule_id): + """ + Extracts the atom group corresponding to a single molecule from the universe. + + Args: + universe (MDAnalysis.Universe): The reduced universe. + molecule_id (int): Index of the molecule to extract. + + Returns: + MDAnalysis.Universe: Universe containing only the selected molecule. + """ + frag = universe.atoms.fragments[molecule_id] + selection_string = f"index {frag.indices[0]}:{frag.indices[-1]}" + return self._run_manager.new_U_select_atom(universe, selection_string) + + def _process_united_atom_level( + self, mol_id, mol_container, ve, ce, level, start, end, step, n_frames, highest + ): + """ + Calculates translational, rotational, and conformational entropy at the + united-atom level. + + Args: + mol_id (int): ID of the molecule. + mol_container (Universe): Universe for the selected molecule. + ve: VibrationalEntropy object. + ce: ConformationalEntropy object. + level (str): Granularity level (should be 'united_atom'). + start, end, step (int): Trajectory frame parameters. + n_frames (int): Number of trajectory frames. + highest (bool): Whether this is the highest level of resolution for + the molecule. + """ + bin_width = self._args.bin_width + S_trans, S_rot, S_conf = 0, 0, 0 + for residue_id, residue in enumerate(mol_container.residues): + res_container = self._run_manager.new_U_select_atom( + mol_container, + f"index {residue.atoms.indices[0]}:{residue.atoms.indices[-1]}", + ) + heavy_res = self._run_manager.new_U_select_atom( + res_container, "not name H*" + ) + + force_matrix, torque_matrix = self._level_manager.get_matrices( + res_container, level, start, end, step, n_frames, highest + ) + + S_trans_res = ve.vibrational_entropy_calculation( + force_matrix, "force", self._args.temperature, highest + ) + S_rot_res = ve.vibrational_entropy_calculation( + torque_matrix, "torque", self._args.temperature, highest + ) + + dihedrals = self._level_manager.get_dihedrals(heavy_res, level) + S_conf_res = ce.conformational_entropy_calculation( + heavy_res, dihedrals, bin_width, start, end, step, n_frames + ) + + S_trans += S_trans_res + S_rot += S_rot_res + S_conf += S_conf_res + + self._log_residue_data(mol_id, residue_id, "Transvibrational", S_trans_res) + self._log_residue_data(mol_id, residue_id, "Rovibrational", S_rot_res) + self._log_residue_data(mol_id, residue_id, "Conformational", S_conf_res) + + self._log_result(mol_id, level, "Transvibrational", S_trans) + self._log_result(mol_id, level, "Rovibrational", S_rot) + self._log_result(mol_id, level, "Conformational", S_conf) + + def _process_vibrational_only_levels( + self, mol_id, mol_container, ve, level, start, end, step, n_frames, highest + ): + """ + Calculates vibrational entropy at levels where conformational entropy is + not considered. + + Args: + mol_id (int): Molecule ID. + mol_container (Universe): Selected molecule's universe. + ve: VibrationalEntropy object. + level (str): Current granularity level ('polymer' or 'residue'). + start, end, step (int): Trajectory frame parameters. + n_frames (int): Number of trajectory frames. + highest (bool): Flag indicating if this is the highest granularity + level. + """ + force_matrix, torque_matrix = self._level_manager.get_matrices( + mol_container, level, start, end, step, n_frames, highest + ) + S_trans = ve.vibrational_entropy_calculation( + force_matrix, "force", self._args.temperature, highest + ) + S_rot = ve.vibrational_entropy_calculation( + torque_matrix, "torque", self._args.temperature, highest + ) + + self._log_result(mol_id, level, "Transvibrational", S_trans) + self._log_result(mol_id, level, "Rovibrational", S_rot) + + def _process_conformational_residue_level( + self, mol_id, mol_container, ce, level, start, end, step, n_frames + ): + """ + Computes conformational entropy at the residue level (whole-molecule dihedral + analysis). + + Args: + mol_id (int): ID of the molecule. + mol_container (Universe): Selected molecule's universe. + ce: ConformationalEntropy object. + level (str): Level name (should be 'residue'). + start, end, step (int): Frame bounds. + n_frames (int): Number of frames used. + """ + bin_width = self._args.bin_width + dihedrals = self._level_manager.get_dihedrals(mol_container, level) + S_conf = ce.conformational_entropy_calculation( + mol_container, dihedrals, bin_width, start, end, step, n_frames + ) + self._log_result(mol_id, level, "Conformational", S_conf) + + def _finalize_molecule_results(self, mol_id, level): + """ + Summarizes entropy for a molecule and saves results to file. + + Args: + mol_id (int): ID of the molecule. + level (str): Current level name (used for tagging final results). + """ + S_total = self._results_df[self._results_df["Molecule ID"] == mol_id][ + "Result" + ].sum() + self._log_result(mol_id, "Molecule Total", "Molecule Total Entropy", S_total) + self._data_logger.save_dataframes_as_json( + self._results_df, self._residue_results_df, self._args.output_file + ) + + def _log_result(self, mol_id, level, entropy_type, value): + """ + Logs and stores a single entropy value in the global results dataframe. + + Args: + mol_id (int): Molecule ID. + level (str): Entropy level or type. + entropy_type (str): Type of entropy (e.g., 'Transvibrational'). + value (float): Entropy value. + """ + row = pd.DataFrame( + { + "Molecule ID": [mol_id], + "Level": [level], + "Type": [f"{entropy_type} (J/mol/K)"], + "Result": [value], + } + ) + self._results_df = pd.concat([self._results_df, row], ignore_index=True) + self._data_logger.add_results_data(mol_id, level, entropy_type, value) + + def _log_residue_data(self, mol_id, residue_id, entropy_type, value): + """ + Logs and stores per-residue entropy data. + + Args: + mol_id (int): Molecule ID. + residue_id (int): Residue index within the molecule. + entropy_type (str): Entropy category. + value (float): Entropy value. + """ + row = pd.DataFrame( + { + "Molecule ID": [mol_id], + "Residue": [residue_id], + "Type": [f"{entropy_type} (J/mol/K)"], + "Result": [value], + } + ) + self._residue_results_df = pd.concat( + [self._residue_results_df, row], ignore_index=True + ) + self._data_logger.add_residue_data(mol_id, residue_id, entropy_type, value) + + +class VibrationalEntropy(EntropyManager): + """ + Performs vibrational entropy calculations using molecular trajectory data. + Extends the base EntropyManager with constants and logic specific to + vibrational modes and thermodynamic properties. + """ + + def __init__(self, run_manager, args, universe, data_logger, level_manager): + """ + Initializes the VibrationalEntropy manager with all required components and + defines physical constants used in vibrational entropy calculations. + """ + super().__init__(run_manager, args, universe, data_logger, level_manager) + self._PLANCK_CONST = 6.62607004081818e-34 + + def frequency_calculation(self, lambdas, temp): + """ + Function to calculate an array of vibrational frequencies from the eigenvalues + of the covariance matrix. + + Calculated from eq. (3) in Higham, S.-Y. Chou, F. Gräter and R. H. Henchman, + Molecular Physics, 2018, 116, 1965–1976//eq. (3) in A. Chakravorty, J. Higham + and R. H. Henchman, J. Chem. Inf. Model., 2020, 60, 5540–5551 + + frequency=sqrt(λ/kT)/2π + + Input + ----- + lambdas : array of floats - eigenvalues of the covariance matrix + temp: float - temperature + + Returns + ------- + frequencies : array of floats - corresponding vibrational frequencies + """ + pi = np.pi + # get kT in Joules from given temperature + kT = self._run_manager.get_KT2J(temp) + logger.debug(f"Temperature: {temp}, kT: {kT}") + + lambdas = np.array(lambdas) # Ensure input is a NumPy array + logger.debug(f"Eigenvalues (lambdas): {lambdas}") + + # Check for negatives and raise an error if any are found + if np.any(lambdas < 0): + logger.error(f"Negative eigenvalues encountered: {lambdas[lambdas < 0]}") + raise ValueError( + f"Negative eigenvalues encountered: {lambdas[lambdas < 0]}" + ) + + # Compute frequencies safely + frequencies = 1 / (2 * pi) * np.sqrt(lambdas / kT) + logger.debug(f"Calculated frequencies: {frequencies}") + + return frequencies + + def vibrational_entropy_calculation(self, matrix, matrix_type, temp, highest_level): + """ + Function to calculate the vibrational entropy for each level calculated from + eq. (4) in J. Higham, S.-Y. Chou, F. Gräter and R. H. Henchman, Molecular + Physics, 2018, 116, 1965–1976 / eq. (2) in A. Chakravorty, J. Higham and + R. H. Henchman, J. Chem. Inf. Model., 2020, 60, 5540–5551. + + Input + ----- + matrix : matrix - force/torque covariance matrix + matrix_type: string + temp: float - temperature + highest_level: bool - is this the highest level of the heirarchy + + Returns + ------- + S_vib_total : float - transvibrational/rovibrational entropy + """ + # N beads at a level => 3N x 3N covariance matrix => 3N eigenvalues + # Get eigenvalues of the given matrix and change units to SI units + lambdas = la.eigvals(matrix) + logger.debug(f"Eigenvalues (lambdas) before unit change: {lambdas}") + + lambdas = self._run_manager.change_lambda_units(lambdas) + logger.debug(f"Eigenvalues (lambdas) after unit change: {lambdas}") + + # Calculate frequencies from the eigenvalues + frequencies = self.frequency_calculation(lambdas, temp) + logger.debug(f"Calculated frequencies: {frequencies}") + + # Sort frequencies lowest to highest + frequencies = np.sort(frequencies) + logger.debug(f"Sorted frequencies: {frequencies}") + + kT = self._run_manager.get_KT2J(temp) + logger.debug(f"Temperature: {temp}, kT: {kT}") + exponent = self._PLANCK_CONST * frequencies / kT + logger.debug(f"Exponent values: {exponent}") + power_positive = np.power(np.e, exponent) + power_negative = np.power(np.e, -exponent) + logger.debug(f"Power positive values: {power_positive}") + logger.debug(f"Power negative values: {power_negative}") + S_components = exponent / (power_positive - 1) - np.log(1 - power_negative) + S_components = ( + S_components * self._GAS_CONST + ) # multiply by R - get entropy in J mol^{-1} K^{-1} + logger.debug(f"Entropy components: {S_components}") + # N beads at a level => 3N x 3N covariance matrix => 3N eigenvalues + if matrix_type == "force": # force covariance matrix + if ( + highest_level + ): # whole molecule level - we take all frequencies into account + S_vib_total = sum(S_components) + + # discard the 6 lowest frequencies to discard translation and rotation of + # the whole unit the overall translation and rotation of a unit is an + # internal motion of the level above + else: + S_vib_total = sum(S_components[6:]) + + else: # torque covariance matrix - we always take all values into account + S_vib_total = sum(S_components) + + logger.debug(f"Total vibrational entropy: {S_vib_total}") + + return S_vib_total + + +class ConformationalEntropy(EntropyManager): + """ + Performs conformational entropy calculations based on molecular dynamics data. + Inherits from EntropyManager and includes constants specific to conformational + analysis using statistical mechanics principles. + """ + + def __init__(self, run_manager, args, universe, data_logger, level_manager): + """ + Initializes the ConformationalEntropy manager with all required components and + sets the gas constant used in conformational entropy calculations. + """ + super().__init__(run_manager, args, universe, data_logger, level_manager) + + def assign_conformation( + self, data_container, dihedral, number_frames, bin_width, start, end, step + ): + """ + Create a state vector, showing the state in which the input dihedral is + as a function of time. The function creates a histogram from the timeseries of + the dihedral angle values and identifies points of dominant occupancy + (called CONVEX TURNING POINTS). + Based on the identified TPs, states are assigned to each configuration of the + dihedral. + + Input + ----- + dihedral_atom_group : the group of 4 atoms defining the dihedral + number_frames : number of frames in the trajectory + bin_width : the width of the histogram bit, default 30 degrees + start : int, starting frame, will default to 0 + end : int, ending frame, will default to -1 (last frame in trajectory) + step : int, spacing between frames, will default to 1 + + Return + ------ + A timeseries with integer labels describing the state at each point in time. + + """ + conformations = np.zeros(number_frames) + phi = np.zeros(number_frames) + + # get the values of the angle for the dihedral + # dihedral angle values have a range from -180 to 180 + for timestep in data_container.trajectory[start:end:step]: + timestep_index = timestep.frame - start + value = dihedral.value() + # we want postive values in range 0 to 360 to make the peak assignment + # work using the fact that dihedrals have circular symetry + # (i.e. -15 degrees = +345 degrees) + if value < 0: + value += 360 + phi[timestep_index] = value + + # create a histogram using numpy + number_bins = int(360 / bin_width) + popul, bin_edges = np.histogram(a=phi, bins=number_bins, range=(0, 360)) + bin_value = [ + 0.5 * (bin_edges[i] + bin_edges[i + 1]) for i in range(0, len(popul)) + ] + + # identify "convex turning-points" and populate a list of peaks + # peak : a bin whose neighboring bins have smaller population + # NOTE might have problems if the peak is wide with a flat or sawtooth top + peak_values = [] + + for bin_index in range(number_bins): + # if there is no dihedrals in a bin then it cannot be a peak + if popul[bin_index] == 0: + pass + # being careful of the last bin + # (dihedrals have circular symmetry, the histogram does not) + elif ( + bin_index == number_bins - 1 + ): # the -1 is because the index starts with 0 not 1 + if ( + popul[bin_index] >= popul[bin_index - 1] + and popul[bin_index] >= popul[0] + ): + peak_values.append(bin_value[bin_index]) + else: + if ( + popul[bin_index] >= popul[bin_index - 1] + and popul[bin_index] >= popul[bin_index + 1] + ): + peak_values.append(bin_value[bin_index]) + + # go through each frame again and assign conformation state + for frame in range(number_frames): + # find the TP that the snapshot is least distant from + distances = [abs(phi[frame] - peak) for peak in peak_values] + conformations[frame] = np.argmin(distances) + + logger.debug(f"Final conformations: {conformations}") + + return conformations + + def conformational_entropy_calculation( + self, data_container, dihedrals, bin_width, start, end, step, number_frames + ): + """ + Function to calculate conformational entropies using eq. (7) in Higham, + S.-Y. Chou, F. Gräter and R. H. Henchman, Molecular Physics, 2018, 116, + 1965–1976 / eq. (4) in A. Chakravorty, J. Higham and R. H. Henchman, + J. Chem. Inf. Model., 2020, 60, 5540–5551. + + Uses the adaptive enumeration method (AEM). + + Input + ----- + dihedrals : array - array of dihedrals in the molecule + Returns + ------- + S_conf_total : float - conformational entropy + """ + + S_conf_total = 0 + + # For each dihedral, identify the conformation in each frame + num_dihedrals = len(dihedrals) + conformation = np.zeros((num_dihedrals, number_frames)) + index = 0 + for dihedral in dihedrals: + conformation[index] = self.assign_conformation( + data_container, dihedral, number_frames, bin_width, start, end, step + ) + index += 1 + + logger.debug(f"Conformation matrix: {conformation}") + + # For each frame, convert the conformation of all dihedrals into a + # state string + states = ["" for x in range(number_frames)] + for frame_index in range(number_frames): + for index in range(num_dihedrals): + states[frame_index] += str(conformation[index][frame_index]) + + logger.debug(f"States: {states}") + + # Count how many times each state occurs, then use the probability + # to get the entropy + # entropy = sum over states p*ln(p) + values, counts = np.unique(states, return_counts=True) + for state in range(len(values)): + logger.debug(f"Unique states: {values}") + logger.debug(f"Counts: {counts}") + count = counts[state] + probability = count / number_frames + entropy = probability * np.log(probability) + S_conf_total += entropy + + # multiply by gas constant to get the units J/mol/K + S_conf_total *= -1 * self._GAS_CONST + + logger.debug(f"Total conformational entropy: {S_conf_total}") + + return S_conf_total + + +class OrientationalEntropy(EntropyManager): + """ + Performs orientational entropy calculations using molecular dynamics data. + Inherits from EntropyManager and includes constants relevant to rotational + and orientational degrees of freedom. + """ + + def __init__(self, run_manager, args, universe, data_logger, level_manager): + """ + Initializes the OrientationalEntropy manager with all required components and + sets the gas constant used in orientational entropy calculations. + """ + super().__init__(run_manager, args, universe, data_logger, level_manager) + + def orientational_entropy_calculation(self, neighbours_dict): + """ + Function to calculate orientational entropies from eq. (10) in J. Higham, + S.-Y. Chou, F. Gräter and R. H. Henchman, Molecular Physics, 2018, 116, + 3 1965–1976. Number of orientations, Ω, is calculated using eq. (8) in + J. Higham, S.-Y. Chou, F. Gräter and R. H. Henchman, Molecular Physics, + 2018, 116, 3 1965–1976. + + σ is assumed to be 1 for the molecules we're concerned with and hence, + max {1, (Nc^3*π)^(1/2)} will always be (Nc^3*π)^(1/2). + + TODO future release - function for determing symmetry and symmetry numbers + maybe? + + Input + ----- + neighbours_dict : dictionary - dictionary of neighbours for the molecule - + should contain the type of neighbour molecule and the number of neighbour + molecules of that species + + Returns + ------- + S_or_total : float - orientational entropy + """ + + # Replaced molecule with neighbour as this is what the for loop uses + S_or_total = 0 + for neighbour in neighbours_dict: # we are going through neighbours + if neighbour in []: # water molecules - call POSEIDON functions + pass # TODO temporary until function is written + else: + # the bound ligand is always going to be a neighbour + omega = np.sqrt((neighbours_dict[neighbour] ** 3) * math.pi) + logger.debug(f"Omega for neighbour {neighbour}: {omega}") + # orientational entropy arising from each neighbouring species + # - we know the species is going to be a neighbour + S_or_component = math.log(omega) + logger.debug( + f"S_or_component (log(omega)) for neighbour {neighbour}: " + f"{S_or_component}" + ) + S_or_component *= self.GAS_CONST + logger.debug( + f"S_or_component after multiplying by GAS_CONST for neighbour " + f"{neighbour}: {S_or_component}" + ) + S_or_total += S_or_component + logger.debug( + f"S_or_total after adding component for neighbour {neighbour}: " + f"{S_or_total}" + ) + # TODO for future releases + # implement a case for molecules with hydrogen bonds but to a lesser + # extent than water + + logger.debug(f"Final total orientational entropy: {S_or_total}") + + return S_or_total diff --git a/CodeEntropy/levels.py b/CodeEntropy/levels.py new file mode 100644 index 0000000..5a8ed4b --- /dev/null +++ b/CodeEntropy/levels.py @@ -0,0 +1,717 @@ +import logging + +import numpy as np + +logger = logging.getLogger(__name__) + + +class LevelManager: + """ + Manages the structural and dynamic levels involved in entropy calculations. This + includes selecting relevant levels, computing axes for translation and rotation, + and handling bead-based representations of molecular systems. Provides utility + methods to extract averaged positions, convert coordinates to spherical systems, + compute weighted forces and torques, and manipulate matrices used in entropy + analysis. + """ + + def __init__(self): + """ + Initializes the LevelManager with placeholders for level-related data, + including translational and rotational axes, number of beads, and a + general-purpose data container. + """ + self.data_container = None + self._levels = None + self._trans_axes = None + self._rot_axes = None + self._number_of_beads = None + + def select_levels(self, data_container): + """ + Function to read input system and identify the number of molecules and + the levels (i.e. united atom, residue and/or polymer) that should be used. + The level refers to the size of the bead (atom or collection of atoms) + that will be used in the entropy calculations. + + Input + ----- + arg_DataContainer : MDAnalysis universe object containing the system of interest + + Returns + ------- + number_molecules : integer + levels : array of strings for each molecule + """ + + # fragments is MDAnalysis terminology for what chemists would call molecules + number_molecules = len(data_container.atoms.fragments) + logger.debug("The number of molecules is {}.".format(number_molecules)) + + fragments = data_container.atoms.fragments + levels = [[] for _ in range(number_molecules)] + + for molecule in range(number_molecules): + levels[molecule].append( + "united_atom" + ) # every molecule has at least one atom + + atoms_in_fragment = fragments[molecule].select_atoms("not name H*") + number_residues = len(atoms_in_fragment.residues) + + if len(atoms_in_fragment) > 1: + levels[molecule].append("residue") + + if number_residues > 1: + levels[molecule].append("polymer") + + logger.debug(f"levels {levels}") + + return number_molecules, levels + + def get_matrices( + self, data_container, level, start, end, step, number_frames, highest_level + ): + """ + Function to create the force matrix needed for the transvibrational entropy + calculation and the torque matrix for the rovibrational entropy calculation. + + Input + ----- + data_container : MDAnalysis universe type with the information on the + molecule of interest. + level : string, which of the polymer, residue, or united atom levels + are the matrices for. + start : int, starting frame, default 0 (first frame) + end : int, ending frame, default -1 (last frame) + step : int, step for going through trajectories, default 1 + + Returns + ------- + force_matrix : force covariance matrix for transvibrational entropy + torque_matrix : torque convariance matrix for rovibrational entropy + """ + + # Make beads + list_of_beads = self.get_beads(data_container, level) + + # number of beads and frames in trajectory + number_beads = len(list_of_beads) + + # initialize force and torque arrays + weighted_forces = [ + [0 for x in range(number_frames)] for y in range(number_beads) + ] + weighted_torques = [ + [0 for x in range(number_frames)] for y in range(number_beads) + ] + + # Calculate forces/torques for each bead + for bead_index in range(number_beads): + for timestep in data_container.trajectory[start:end:step]: + # Set up axes + # translation and rotation use different axes + # how the axes are defined depends on the level + trans_axes, rot_axes = self.get_axes(data_container, level, bead_index) + + # Sort out coordinates, forces, and torques for each atom in the bead + timestep_index = timestep.frame - start + weighted_forces[bead_index][timestep_index] = self.get_weighted_forces( + data_container, list_of_beads[bead_index], trans_axes, highest_level + ) + weighted_torques[bead_index][timestep_index] = ( + self.get_weighted_torques( + data_container, list_of_beads[bead_index], rot_axes + ) + ) + + # Make covariance matrices - looping over pairs of beads + # list of pairs of indices + pair_list = [(i, j) for i in range(number_beads) for j in range(number_beads)] + + force_submatrix = [ + [0 for x in range(number_beads)] for y in range(number_beads) + ] + torque_submatrix = [ + [0 for x in range(number_beads)] for y in range(number_beads) + ] + + for i, j in pair_list: + # for each pair of beads + # reducing effort because the matrix for [i][j] is the transpose of the one + # for [j][i] + if i <= j: + # calculate the force covariance segment of the matrix + force_submatrix[i][j] = self.create_submatrix( + weighted_forces[i], weighted_forces[j], number_frames + ) + force_submatrix[j][i] = np.transpose(force_submatrix[i][j]) + + # calculate the torque covariance segment of the matrix + torque_submatrix[i][j] = self.create_submatrix( + weighted_torques[i], weighted_torques[j], number_frames + ) + torque_submatrix[j][i] = np.transpose(torque_submatrix[i][j]) + + # use np.block to make submatrices into one matrix + force_matrix = np.block( + [ + [force_submatrix[i][j] for j in range(number_beads)] + for i in range(number_beads) + ] + ) + + torque_matrix = np.block( + [ + [torque_submatrix[i][j] for j in range(number_beads)] + for i in range(number_beads) + ] + ) + + # fliter zeros to remove any rows/columns that are all zero + force_matrix = self.filter_zero_rows_columns(force_matrix) + torque_matrix = self.filter_zero_rows_columns(torque_matrix) + + logger.debug(f"Force Matrix: {force_matrix}") + logger.debug(f"Torque Matrix: {torque_matrix}") + + return force_matrix, torque_matrix + + def get_dihedrals(self, data_container, level): + """ + Define the set of dihedrals for use in the conformational entropy function. + If residue level, the dihedrals are defined from the atoms + (4 bonded atoms for 1 dihedral). + If polymer level, use the bonds between residues to cast dihedrals. + Note: not using improper dihedrals only ones with 4 atoms/residues + in a linear arrangement. + + Input + ----- + data_container : system information + level : level of the hierarchy (should be residue or polymer) + + Output + ------ + dihedrals : set of dihedrals + """ + # Start with empty array + dihedrals = [] + + # if united atom level, read dihedrals from MDAnalysis universe + if level == "united_atom": + dihedrals = data_container.dihedrals + + # if residue level, looking for dihedrals involving residues + if level == "residue": + num_residues = len(data_container.residues) + if num_residues < 4: + logger.debug("no residue level dihedrals") + + else: + # find bonds between residues N-3:N-2 and N-1:N + for residue in range(4, num_residues + 1): + # Using MDAnalysis selection, + # assuming only one covalent bond between neighbouring residues + # TODO not written for branched polymers + atom_string = ( + "resindex " + + str(residue - 4) + + " and bonded resindex " + + str(residue - 3) + ) + atom1 = data_container.select_atoms(atom_string) + + atom_string = ( + "resindex " + + str(residue - 3) + + " and bonded resindex " + + str(residue - 4) + ) + atom2 = data_container.select_atoms(atom_string) + + atom_string = ( + "resindex " + + str(residue - 2) + + " and bonded resindex " + + str(residue - 1) + ) + atom3 = data_container.select_atoms(atom_string) + + atom_string = ( + "resindex " + + str(residue - 1) + + " and bonded resindex " + + str(residue - 2) + ) + atom4 = data_container.select_atoms(atom_string) + + atom_group = atom1 + atom2 + atom3 + atom4 + dihedrals.append(atom_group.dihedral) + + logger.debug(f"Dihedrals: {dihedrals}") + + return dihedrals + + def get_beads(self, data_container, level): + """ + Function to define beads depending on the level in the hierarchy. + + Input + ----- + data_container : the MDAnalysis universe + level : the heirarchy level (polymer, residue, or united atom) + + Output + ------ + list_of_beads : the relevent beads + """ + + if level == "polymer": + list_of_beads = [] + atom_group = "all" + list_of_beads.append(data_container.select_atoms(atom_group)) + + if level == "residue": + list_of_beads = [] + num_residues = len(data_container.residues) + for residue in range(num_residues): + atom_group = "resindex " + str(residue) + list_of_beads.append(data_container.select_atoms(atom_group)) + + if level == "united_atom": + list_of_beads = [] + heavy_atoms = data_container.select_atoms("not name H*") + for atom in heavy_atoms: + atom_group = ( + "index " + + str(atom.index) + + " or (name H* and bonded index " + + str(atom.index) + + ")" + ) + list_of_beads.append(data_container.select_atoms(atom_group)) + + logger.debug(f"List of beads: {list_of_beads}") + + return list_of_beads + + def get_axes(self, data_container, level, index=0): + """ + Function to set the translational and rotational axes. + The translational axes are based on the principal axes of the unit + one level larger than the level we are interested in (except for + the polymer level where there is no larger unit). The rotational + axes use the covalent links between residues or atoms where possible + to define the axes, or if the unit is not bonded to others of the + same level the prinicpal axes of the unit are used. + + Input + ----- + data_container : the information about the molecule and trajectory + level : the level (united atom, residue, or polymer) of interest + index : residue index (integer) + + Output + ------ + trans_axes : translational axes + rot_axes : rotational axes + """ + index = int(index) + + if level == "polymer": + # for polymer use principle axis for both translation and rotation + trans_axes = data_container.atoms.principal_axes() + rot_axes = data_container.atoms.principal_axes() + + if level == "residue": + # Translation + # for residues use principal axes of whole molecule for translation + trans_axes = data_container.atoms.principal_axes() + + # Rotation + # find bonds between atoms in residue of interest and other residues + # we are assuming bonds only exist between adjacent residues + # (linear chains of residues) + # TODO refine selection so that it will work for branched polymers + index_prev = index - 1 + index_next = index + 1 + atom_set = data_container.select_atoms( + f"(resindex {index_prev} or resindex {index_next}) " + f"and bonded resid {index}" + ) + residue = data_container.select_atoms(f"resindex {index}") + + if len(atom_set) == 0: + # if no bonds to other residues use pricipal axes of residue + rot_axes = residue.atoms.principal_axes() + + else: + # set center of rotation to center of mass of the residue + center = residue.atoms.center_of_mass() + + # get vector for average position of bonded atoms + vector = self.get_avg_pos(atom_set, center) + + # use spherical coordinates function to get rotational axes + rot_axes = self.get_sphCoord_axes(vector) + + if level == "united_atom": + # Translation + # for united atoms use principal axes of residue for translation + trans_axes = data_container.residues.principal_axes() + + # Rotation + # for united atoms use heavy atoms bonded to the heavy atom + atom_set = data_container.select_atoms( + f"not name H* and bonded index {index}" + ) + + # center at position of heavy atom + atom_group = data_container.select_atoms(f"index {index}") + center = atom_group.positions[0] + + # get vector for average position of hydrogens + vector = self.get_avg_pos(atom_set, center) + + # use spherical coordinates function to get rotational axes + rot_axes = self.get_sphCoord_axes(vector) + + logger.debug(f"Translational Axes: {trans_axes}") + logger.debug(f"Rotational Axes: {rot_axes}") + + return trans_axes, rot_axes + + def get_avg_pos(self, atom_set, center): + """ + Function to get the average position of a set of atoms. + + Input + ----- + atoms : MDAnalysis atom group + center : position for center of rotation + + Output + ------ + avg_position : three dimensional vector + """ + # start with an empty vector + avg_position = np.zeros((3)) + + # get number of atoms + number_atoms = len(atom_set.names) + + if number_atoms != 0: + # sum positions for all atoms in the given set + for atom_index in range(number_atoms): + atom_position = atom_set.atoms[atom_index].position + + avg_position += atom_position + + avg_position /= number_atoms # divide by number of atoms to get average + + else: + # if no atoms in set the unit has no bonds to restrict its rotational + # motion, so we can use a random vector to get the spherical + # coordinates axes + avg_position = np.random.random(3) + + # transform the average position to a coordinate system with the origin + # at center + avg_position = avg_position - center + + logger.debug(f"Average Position: {avg_position}") + + return avg_position + + def get_sphCoord_axes(self, arg_r): + """ + For a given vector in space, treat it is a radial vector rooted at + 0,0,0 and derive a curvilinear coordinate system according to the + rules of polar spherical coordinates + """ + + x2y2 = arg_r[0] ** 2 + arg_r[1] ** 2 + r2 = x2y2 + arg_r[2] ** 2 + + # Check for division by zero + if r2 == 0.0: + raise ValueError("r2 is zero, cannot compute spherical coordinates.") + + if x2y2 == 0.0: + raise ValueError("x2y2 is zero, cannot compute sin_phi and cos_phi.") + + # Check for non-negative values inside the square root + if x2y2 / r2 < 0: + raise ValueError( + f"Negative value encountered for sin_theta calculation: {x2y2 / r2}. " + f"Cannot take square root." + ) + + if x2y2 < 0: + raise ValueError( + f"Negative value encountered for sin_phi and cos_phi " + f"calculation: {x2y2}. " + f"Cannot take square root." + ) + + if x2y2 != 0.0: + sin_theta = np.sqrt(x2y2 / r2) + cos_theta = arg_r[2] / np.sqrt(r2) + + sin_phi = arg_r[1] / np.sqrt(x2y2) + cos_phi = arg_r[0] / np.sqrt(x2y2) + + else: + sin_theta = 0.0 + cos_theta = 1 + + sin_phi = 0.0 + cos_phi = 1 + + # if abs(sin_theta) > 1 or abs(sin_phi) > 1: + # print('Bad sine : T {} , P {}'.format(sin_theta, sin_phi)) + + # cos_theta = np.sqrt(1 - sin_theta*sin_theta) + # cos_phi = np.sqrt(1 - sin_phi*sin_phi) + + # print('{} {} {}'.format(*arg_r)) + # print('Sin T : {}, cos T : {}'.format(sin_theta, cos_theta)) + # print('Sin P : {}, cos P : {}'.format(sin_phi, cos_phi)) + + spherical_basis = np.zeros((3, 3)) + + # r^ + spherical_basis[0, :] = np.asarray( + [sin_theta * cos_phi, sin_theta * sin_phi, cos_theta] + ) + + # Theta^ + spherical_basis[1, :] = np.asarray( + [cos_theta * cos_phi, cos_theta * sin_phi, -sin_theta] + ) + + # Phi^ + spherical_basis[2, :] = np.asarray([-sin_phi, cos_phi, 0.0]) + + logger.debug(f"Spherical Basis: {spherical_basis}") + + return spherical_basis + + def get_weighted_forces( + self, data_container, bead, trans_axes, highest_level, force_partitioning=0.5 + ): + """ + Function to calculate the mass weighted forces for a given bead. + + Input + ----- + bead : the part of the system to be considered + trans_axes : the axes relative to which the forces are located + + Output + ------ + weighted_force : the mass weighted sum of the forces in the bead + """ + + forces_trans = np.zeros((3,)) + + # Sum forces from all atoms in the bead + for atom in bead.atoms: + # update local forces in translational axes + forces_local = np.matmul(trans_axes, data_container.atoms[atom.index].force) + forces_trans += forces_local + + if highest_level: + # multiply by the force_partitioning parameter to avoid double counting + # of the forces on weakly correlated atoms + # the default value of force_partitioning is 0.5 (dividing by two) + forces_trans = force_partitioning * forces_trans + + # divide the sum of forces by the mass of the bead to get the weighted forces + mass = bead.total_mass() + + # Check that mass is positive to avoid division by 0 or negative values inside + # sqrt + if mass <= 0: + raise ValueError( + f"Invalid mass value: {mass}. Mass must be positive to compute the " + f"square root." + ) + + weighted_force = forces_trans / np.sqrt(mass) + + logger.debug(f"Weighted Force: {weighted_force}") + + return weighted_force + + def get_weighted_torques( + self, data_container, bead, rot_axes, force_partitioning=0.5 + ): + """ + Function to calculate the moment of inertia weighted torques for a given bead. + + This function computes torques in a rotated frame and then weights them using + the moment of inertia tensor. To prevent numerical instability, it treats + extremely small diagonal elements of the moment of inertia tensor as zero + (since values below machine precision are effectively zero). This avoids + unnecessary use of extended precision (e.g., float128). + + Additionally, if the computed torque is already zero, the function skips + the division step, reducing unnecessary computations and potential errors. + + Parameters + ---------- + data_container : object + Contains atomic positions and forces. + bead : object + The part of the molecule to be considered. + rot_axes : np.ndarray + The axes relative to which the forces and coordinates are located. + force_partitioning : float, optional + Factor to adjust force contributions, default is 0.5. + + Returns + ------- + np.ndarray + The mass-weighted sum of the torques in the bead. + """ + + torques = np.zeros((3,)) + weighted_torque = np.zeros((3,)) + + for atom in bead.atoms: + + # update local coordinates in rotational axes + coords_rot = ( + data_container.atoms[atom.index].position - bead.center_of_mass() + ) + coords_rot = np.matmul(rot_axes, coords_rot) + # update local forces in rotational frame + forces_rot = np.matmul(rot_axes, data_container.atoms[atom.index].force) + + # multiply by the force_partitioning parameter to avoid double counting + # of the forces on weakly correlated atoms + # the default value of force_partitioning is 0.5 (dividing by two) + forces_rot = force_partitioning * forces_rot + + # define torques (cross product of coordinates and forces) in rotational + # axes + torques_local = np.cross(coords_rot, forces_rot) + torques += torques_local + + # divide by moment of inertia to get weighted torques + # moment of inertia is a 3x3 tensor + # the weighting is done in each dimension (x,y,z) using the diagonal + # elements of the moment of inertia tensor + moment_of_inertia = bead.moment_of_inertia() + + for dimension in range(3): + # Skip calculation if torque is already zero + if np.isclose(torques[dimension], 0): + weighted_torque[dimension] = 0 + continue + + # Check for zero moment of inertia + if np.isclose(moment_of_inertia[dimension, dimension], 0): + raise ZeroDivisionError( + f"Attempted to divide by zero moment of inertia in dimension " + f"{dimension}." + ) + + # Check for negative moment of inertia + if moment_of_inertia[dimension, dimension] < 0: + raise ValueError( + f"Negative value encountered for moment of inertia: " + f"{moment_of_inertia[dimension, dimension]} " + f"Cannot compute weighted torque." + ) + + # Compute weighted torque + weighted_torque[dimension] = torques[dimension] / np.sqrt( + moment_of_inertia[dimension, dimension] + ) + + logger.debug(f"Weighted Torque: {weighted_torque}") + + return weighted_torque + + def create_submatrix(self, data_i, data_j, number_frames): + """ + Function for making covariance matrices. + + Input + ----- + data_i : values for bead i + data_j : valuees for bead j + + Output + ------ + submatrix : 3x3 matrix for the covariance between i and j + """ + + # Start with 3 by 3 matrix of zeros + submatrix = np.zeros((3, 3)) + + # For each frame calculate the outer product (cross product) of the data from + # the two beads and add the result to the submatrix + for frame in range(number_frames): + outer_product_matrix = np.outer(data_i[frame], data_j[frame]) + submatrix = np.add(submatrix, outer_product_matrix) + + # Divide by the number of frames to get the average + submatrix /= number_frames + + logger.debug(f"Submatrix: {submatrix}") + + return submatrix + + def filter_zero_rows_columns(self, arg_matrix): + """ + function for removing rows and columns that contain only zeros from a matrix + + Input + ----- + arg_matrix : matrix + + Output + ------ + arg_matrix : the reduced size matrix + """ + + # record the initial size + init_shape = np.shape(arg_matrix) + + zero_indices = list( + filter( + lambda row: np.all(np.isclose(arg_matrix[row, :], 0.0)), + np.arange(np.shape(arg_matrix)[0]), + ) + ) + all_indices = np.ones((np.shape(arg_matrix)[0]), dtype=bool) + all_indices[zero_indices] = False + arg_matrix = arg_matrix[all_indices, :] + + all_indices = np.ones((np.shape(arg_matrix)[1]), dtype=bool) + zero_indices = list( + filter( + lambda col: np.all(np.isclose(arg_matrix[:, col], 0.0)), + np.arange(np.shape(arg_matrix)[1]), + ) + ) + all_indices[zero_indices] = False + arg_matrix = arg_matrix[:, all_indices] + + # get the final shape + final_shape = np.shape(arg_matrix) + + if init_shape != final_shape: + logger.debug( + "A shape change has occurred ({},{}) -> ({}, {})".format( + *init_shape, *final_shape + ) + ) + + logger.debug(f"arg_matrix: {arg_matrix}") + + return arg_matrix diff --git a/CodeEntropy/main.py b/CodeEntropy/main.py new file mode 100644 index 0000000..f88ed62 --- /dev/null +++ b/CodeEntropy/main.py @@ -0,0 +1,28 @@ +import logging +import sys + +from CodeEntropy.run import RunManager + +logger = logging.getLogger(__name__) + + +def main(): + """ + Main function for calculating the entropy of a system using the multiscale cell + correlation method. + """ + + # Setup initial services + folder = RunManager.create_job_folder() + + try: + run_manager = RunManager(folder=folder) + run_manager.run_entropy_workflow() + except Exception as e: + logger.critical(f"Fatal error during entropy calculation: {e}", exc_info=True) + sys.exit(1) + + +if __name__ == "__main__": + + main() diff --git a/CodeEntropy/main_mcc.py b/CodeEntropy/main_mcc.py deleted file mode 100644 index caeadb8..0000000 --- a/CodeEntropy/main_mcc.py +++ /dev/null @@ -1,473 +0,0 @@ -import logging -import math -import os -import re -import sys - -import MDAnalysis as mda -import pandas as pd - -from CodeEntropy.calculations import EntropyFunctions as EF -from CodeEntropy.calculations import LevelFunctions as LF -from CodeEntropy.calculations import MDAUniverseHelper as MDAHelper -from CodeEntropy.config.arg_config_manager import ConfigManager -from CodeEntropy.config.data_logger import DataLogger -from CodeEntropy.config.logging_config import LoggingConfig - - -def create_job_folder(): - """ - Create a new job folder with an incremented job number based on existing folders. - """ - # Get the current working directory - base_dir = os.getcwd() - - # List all folders in the base directory - existing_folders = [ - f for f in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, f)) - ] - - # Filter folders that match the pattern 'jobXXX' - job_folders = [f for f in existing_folders if re.match(r"job\d{3}", f)] - - # Determine the next job number - if job_folders: - max_job_number = max([int(re.search(r"\d{3}", f).group()) for f in job_folders]) - next_job_number = max_job_number + 1 - else: - next_job_number = 1 - - # Format the new job folder name - new_job_folder = f"job{next_job_number:03d}" - new_job_folder_path = os.path.join(base_dir, new_job_folder) - - # Create the new job folder - os.makedirs(new_job_folder_path, exist_ok=True) - - return new_job_folder_path - - -def main(): - """ - Main function for calculating the entropy of a system using the multiscale cell - correlation method. - """ - folder = create_job_folder() - data_logger = DataLogger() - arg_config = ConfigManager() - - # Load configuration - config = arg_config.load_config("config.yaml") - if config is None: - raise ValueError( - "No configuration file found, and no CLI arguments were provided." - ) - - parser = arg_config.setup_argparse() - args, unknown = parser.parse_known_args() - args.output_file = os.path.join(folder, args.output_file) - - try: - # Initialize the logging system once - logging_config = LoggingConfig(folder) - logger = logging_config.setup_logging() - - # Process each run in the YAML configuration - for run_name, run_config in config.items(): - if isinstance(run_config, dict): - # Merging CLI arguments with YAML configuration - args = arg_config.merge_configs(args, run_config) - - # Determine logging level - log_level = logging.DEBUG if args.verbose else logging.INFO - - # Update the logging level - logging_config.update_logging_level(log_level) - - # Capture and log the command-line invocation - command = " ".join(sys.argv) - logging.getLogger("commands").info(command) - - # Ensure necessary arguments are provided - if not getattr(args, "top_traj_file"): - raise ValueError( - "The 'top_traj_file' argument is required but not provided." - ) - if not getattr(args, "selection_string"): - raise ValueError( - "The 'selection_string' argument is required but not provided." - ) - - # Log all inputs for the current run - logger.info(f"All input for {run_name}") - for arg in vars(args): - logger.info(f" {arg}: {getattr(args, arg) or ''}") - else: - logger.warning(f"Run configuration for {run_name} is not a dictionary.") - except ValueError as e: - logger.error(e) - raise - - # Get topology and trajectory file names and make universe - tprfile = args.top_traj_file[0] - trrfile = args.top_traj_file[1:] - u = mda.Universe(tprfile, trrfile) - - # Define bin_width for histogram from inputs - bin_width = args.bin_width - - # Define trajectory slicing from inputs - start = args.start - if start is None: - start = 0 - end = args.end - if end is None: - end = -1 - step = args.step - if step is None: - step = 1 - # Count number of frames, easy if not slicing - # MDAnalysis trajectory slicing only includes up to end-1 - # This works the way we want it to if the whole trajectory is being included - if start == 0 and end == -1 and step == 1: - end = len(u.trajectory) - number_frames = len(u.trajectory) - elif end == -1: - end = len(u.trajectory) - number_frames = math.floor((end - start) / step) - else: - end = end + 1 - number_frames = math.floor((end - start) / step) - logger.debug(f"Number of Frames: {number_frames}") - - # Create pandas data frame for results - results_df = pd.DataFrame(columns=["Molecule ID", "Level", "Type", "Result"]) - residue_results_df = pd.DataFrame( - columns=["Molecule ID", "Residue", "Type", "Result"] - ) - - # Reduce number of atoms in MDA universe to selection_string arg - # (default all atoms included) - if args.selection_string == "all": - reduced_atom = u - else: - reduced_atom = MDAHelper.new_U_select_atom(u, args.selection_string) - reduced_atom_name = f"{len(reduced_atom.trajectory)}_frame_dump_atom_selection" - MDAHelper.write_universe(reduced_atom, reduced_atom_name) - - # Scan system for molecules and select levels (united atom, residue, polymer) - # for each - number_molecules, levels = LF.select_levels(reduced_atom, args.verbose) - - # Loop over molecules - for molecule in range(number_molecules): - # molecule data container of MDAnalysis Universe type for internal degrees - # of freedom getting indices of first and last atoms in the molecule - # assuming atoms are numbered consecutively and all atoms in a given - # molecule are together - index1 = reduced_atom.atoms.fragments[molecule].indices[0] - index2 = reduced_atom.atoms.fragments[molecule].indices[-1] - selection_string = f"index {index1}:{index2}" - molecule_container = MDAHelper.new_U_select_atom(reduced_atom, selection_string) - - # Calculate entropy for each relevent level - for level in levels[molecule]: - if level == levels[molecule][-1]: - highest_level = True - else: - highest_level = False - - if level == "united_atom": - # loop over residues, report results per residue + total united atom - # level. This is done per residue to reduce the size of the matrices - - # amino acid resiudes have tens of united atoms but a whole protein - # could have thousands. Doing the calculation per residue allows for - # comparisons of contributions from different residues - num_residues = len(molecule_container.residues) - S_trans = 0 - S_rot = 0 - S_conf = 0 - - for residue in range(num_residues): - # molecule data container of MDAnalysis Universe type for internal - # degrees of freedom getting indices of first and last atoms in the - # molecule assuming atoms are numbered consecutively and all atoms - # in a given molecule are together - index1 = molecule_container.residues[residue].atoms.indices[0] - index2 = molecule_container.residues[residue].atoms.indices[-1] - selection_string = f"index {index1}:{index2}" - residue_container = MDAHelper.new_U_select_atom( - molecule_container, selection_string - ) - residue_heavy_atoms_container = MDAHelper.new_U_select_atom( - residue_container, "not name H*" - ) # only heavy atom dihedrals are relevant - - # Vibrational entropy at every level - # Get the force and torque matrices for the beads at the relevant - # level - - force_matrix, torque_matrix = LF.get_matrices( - residue_container, - level, - args.verbose, - start, - end, - step, - number_frames, - highest_level, - ) - - # Calculate the entropy from the diagonalisation of the matrices - S_trans_residue = EF.vibrational_entropy( - force_matrix, "force", args.temperature, highest_level - ) - S_trans += S_trans_residue - logger.debug(f"S_trans_{level}_{residue} = {S_trans_residue}") - new_row = pd.DataFrame( - { - "Molecule ID": [molecule], - "Residue": [residue], - "Type": ["Transvibrational (J/mol/K)"], - "Result": [S_trans_residue], - } - ) - residue_results_df = pd.concat( - [residue_results_df, new_row], ignore_index=True - ) - data_logger.add_residue_data( - molecule, residue, "Transvibrational", S_trans_residue - ) - - S_rot_residue = EF.vibrational_entropy( - torque_matrix, "torque", args.temperature, highest_level - ) - S_rot += S_rot_residue - logger.debug(f"S_rot_{level}_{residue} = {S_rot_residue}") - new_row = pd.DataFrame( - { - "Molecule ID": [molecule], - "Residue": [residue], - "Type": ["Rovibrational (J/mol/K)"], - "Result": [S_rot_residue], - } - ) - residue_results_df = pd.concat( - [residue_results_df, new_row], ignore_index=True - ) - data_logger.add_residue_data( - molecule, residue, "Rovibrational", S_rot_residue - ) - - # Conformational entropy based on atom dihedral angle distributions - # Gives entropy of conformations within each residue - - # Get dihedral angle distribution - dihedrals = LF.get_dihedrals(residue_heavy_atoms_container, level) - - # Calculate conformational entropy - S_conf_residue = EF.conformational_entropy( - residue_heavy_atoms_container, - dihedrals, - bin_width, - start, - end, - step, - number_frames, - ) - S_conf += S_conf_residue - logger.debug(f"S_conf_{level}_{residue} = {S_conf_residue}") - new_row = pd.DataFrame( - { - "Molecule ID": [molecule], - "Residue": [residue], - "Type": ["Conformational (J/mol/K)"], - "Result": [S_conf_residue], - } - ) - residue_results_df = pd.concat( - [residue_results_df, new_row], ignore_index=True - ) - data_logger.add_residue_data( - molecule, residue, "Conformational", S_conf_residue - ) - - # Print united atom level results summed over all residues - logger.debug(f"S_trans_{level} = {S_trans}") - new_row = pd.DataFrame( - { - "Molecule ID": [molecule], - "Level": [level], - "Type": ["Transvibrational (J/mol/K)"], - "Result": [S_trans], - } - ) - - results_df = pd.concat([results_df, new_row], ignore_index=True) - - data_logger.add_results_data( - molecule, level, "Transvibrational", S_trans - ) - - logger.debug(f"S_rot_{level} = {S_rot}") - new_row = pd.DataFrame( - { - "Molecule ID": [molecule], - "Level": [level], - "Type": ["Rovibrational (J/mol/K)"], - "Result": [S_rot], - } - ) - results_df = pd.concat([results_df, new_row], ignore_index=True) - - data_logger.add_results_data(molecule, level, "Rovibrational", S_rot) - logger.debug(f"S_conf_{level} = {S_conf}") - - new_row = pd.DataFrame( - { - "Molecule ID": [molecule], - "Level": [level], - "Type": ["Conformational (J/mol/K)"], - "Result": [S_conf], - } - ) - results_df = pd.concat([results_df, new_row], ignore_index=True) - - data_logger.add_results_data(molecule, level, "Conformational", S_conf) - - if level in ("polymer", "residue"): - # Vibrational entropy at every level - # Get the force and torque matrices for the beads at the relevant level - force_matrix, torque_matrix = LF.get_matrices( - molecule_container, - level, - args.verbose, - start, - end, - step, - number_frames, - highest_level, - ) - - # Calculate the entropy from the diagonalisation of the matrices - S_trans = EF.vibrational_entropy( - force_matrix, "force", args.temperature, highest_level - ) - logger.debug(f"S_trans_{level} = {S_trans}") - - # Create new row as a DataFrame for Transvibrational - new_row_trans = pd.DataFrame( - { - "Molecule ID": [molecule], - "Level": [level], - "Type": ["Transvibrational (J/mol/K)"], - "Result": [S_trans], - } - ) - - # Concatenate the new row to the DataFrame - results_df = pd.concat([results_df, new_row_trans], ignore_index=True) - - # Calculate the entropy for Rovibrational - S_rot = EF.vibrational_entropy( - torque_matrix, "torque", args.temperature, highest_level - ) - logger.debug(f"S_rot_{level} = {S_rot}") - - # Create new row as a DataFrame for Rovibrational - new_row_rot = pd.DataFrame( - { - "Molecule ID": [molecule], - "Level": [level], - "Type": ["Rovibrational (J/mol/K)"], - "Result": [S_rot], - } - ) - - # Concatenate the new row to the DataFrame - results_df = pd.concat([results_df, new_row_rot], ignore_index=True) - - data_logger.add_results_data( - molecule, level, "Transvibrational", S_trans - ) - data_logger.add_results_data(molecule, level, "Rovibrational", S_rot) - - # Note: conformational entropy is not calculated at the polymer level, - # because there is at most one polymer bead per molecule so no dihedral - # angles. - - if level == "residue": - # Conformational entropy based on distributions of dihedral angles - # of residues. Gives conformational entropy of secondary structure - - # Get dihedral angle distribution - dihedrals = LF.get_dihedrals(molecule_container, level) - # Calculate conformational entropy - S_conf = EF.conformational_entropy( - molecule_container, - dihedrals, - bin_width, - start, - end, - step, - number_frames, - ) - logger.debug(f"S_conf_{level} = {S_conf}") - new_row = pd.DataFrame( - { - "Molecule ID": [molecule], - "Level": [level], - "Type": ["Conformational (J/mol/K)"], - "Result": [S_conf], - } - ) - results_df = pd.concat([results_df, new_row], ignore_index=True) - data_logger.add_results_data(molecule, level, "Conformational", S_conf) - - # Orientational entropy based on network of neighbouring molecules, - # only calculated at the highest level (whole molecule) - # if highest_level: - # neigbours = LF.get_neighbours(reduced_atom, molecule) - # S_orient = EF.orientational_entropy(neighbours) - # print(f"S_orient_{level} = {S_orient}") - # new_row = pd.DataFrame({ - # 'Molecule ID': [molecule], - # 'Level': [level], - # 'Type':['Orientational (J/mol/K)'], - # 'Result': [S_orient],}) - # results_df = pd.concat([results_df, new_row], ignore_index=True) - # with open(args.output_file, "a") as out: - # print(molecule, - # "\t", - # level, - # "\tOrientational\t", - # S_orient, - # file=out) - - # Report total entropy for the molecule - S_molecule = results_df[results_df["Molecule ID"] == molecule]["Result"].sum() - logger.debug(f"S_molecule = {S_molecule}") - new_row = pd.DataFrame( - { - "Molecule ID": [molecule], - "Level": ["Molecule Total"], - "Type": ["Molecule Total Entropy "], - "Result": [S_molecule], - } - ) - results_df = pd.concat([results_df, new_row], ignore_index=True) - - data_logger.add_results_data( - molecule, level, "Molecule Total Entropy", S_molecule - ) - data_logger.save_dataframes_as_json( - results_df, residue_results_df, args.output_file - ) - - logger.info("Molecules:") - data_logger.log_tables() - - -if __name__ == "__main__": - - main() diff --git a/CodeEntropy/run.py b/CodeEntropy/run.py new file mode 100644 index 0000000..5aa76ee --- /dev/null +++ b/CodeEntropy/run.py @@ -0,0 +1,288 @@ +import logging +import os +import pickle + +import MDAnalysis as mda +from MDAnalysis.analysis.base import AnalysisFromFunction +from MDAnalysis.coordinates.memory import MemoryReader + +from CodeEntropy.config.arg_config_manager import ConfigManager +from CodeEntropy.config.data_logger import DataLogger +from CodeEntropy.config.logging_config import LoggingConfig +from CodeEntropy.entropy import EntropyManager +from CodeEntropy.levels import LevelManager + +logger = logging.getLogger(__name__) + + +class RunManager: + """ + Handles the setup and execution of entropy analysis runs, including configuration + loading, logging, and access to physical constants used in calculations. + """ + + def __init__(self, folder): + """ + Initializes the RunManager with the working folder and sets up configuration, + data logging, and logging systems. Also defines physical constants used in + entropy calculations. + """ + self.folder = folder + self._config_manager = ConfigManager() + self._data_logger = DataLogger() + self._logging_config = LoggingConfig(folder) + self._N_AVOGADRO = 6.0221415e23 + self._DEF_TEMPER = 298 + + @property + def N_AVOGADRO(self): + """Returns Avogadro's number used in entropy calculations.""" + return self._N_AVOGADRO + + @property + def DEF_TEMPER(self): + """Returns the default temperature (in Kelvin) used in the analysis.""" + return self._DEF_TEMPER + + @staticmethod + def create_job_folder(): + """ + Create a new job folder with an incremented job number based on existing + folders. + """ + # Get the current working directory + current_dir = os.getcwd() + + # Get a list of existing folders that start with "job" + existing_folders = [f for f in os.listdir(current_dir) if f.startswith("job")] + + # Extract numbers from existing folder names + job_numbers = [] + for folder in existing_folders: + try: + # Assuming folder names are in the format "jobXXX" + job_number = int(folder[3:]) # Get the number part after "job" + job_numbers.append(job_number) + except ValueError: + continue # Ignore any folder names that don't follow the pattern + + # If no folders exist, start with job001 + if not job_numbers: + next_job_number = 1 + else: + next_job_number = max(job_numbers) + 1 + + # Create the new job folder name + new_job_folder = f"job{next_job_number:03d}" + + # Create the full path to the new folder + new_folder_path = os.path.join(current_dir, new_job_folder) + + # Create the directory + os.makedirs(new_folder_path, exist_ok=True) + + # Return the path of the newly created folder + return new_folder_path + + def run_entropy_workflow(self): + """ + Runs the entropy analysis workflow by setting up logging, loading configuration + files, parsing arguments, and executing the analysis for each configured run. + Initializes the MDAnalysis Universe and supporting managers, and logs all + relevant inputs and commands. + """ + try: + logger = self._logging_config.setup_logging() + + config = self._config_manager.load_config("config.yaml") + if config is None: + raise ValueError( + "No configuration file found, and no CLI arguments were provided." + ) + + parser = self._config_manager.setup_argparse() + args, _ = parser.parse_known_args() + args.output_file = os.path.join(self.folder, args.output_file) + + for run_name, run_config in config.items(): + if not isinstance(run_config, dict): + logger.warning( + f"Run configuration for {run_name} is not a dictionary." + ) + continue + + args = self._config_manager.merge_configs(args, run_config) + + log_level = logging.DEBUG if args.verbose else logging.INFO + self._logging_config.update_logging_level(log_level) + + command = " ".join(os.sys.argv) + logging.getLogger("commands").info(command) + + if not getattr(args, "top_traj_file", None): + raise ValueError("Missing 'top_traj_file' argument.") + if not getattr(args, "selection_string", None): + raise ValueError("Missing 'selection_string' argument.") + + # Log all inputs for the current run + logger.info(f"All input for {run_name}") + for arg in vars(args): + logger.info(f" {arg}: {getattr(args, arg) or ''}") + + # Load MDAnalysis Universe + tprfile = args.top_traj_file[0] + trrfile = args.top_traj_file[1:] + logger.debug(f"Loading Universe with {tprfile} and {trrfile}") + u = mda.Universe(tprfile, trrfile) + + # Create LevelManager instance + level_manager = LevelManager() + + # Inject all dependencies into EntropyManager + entropy_manager = EntropyManager( + run_manager=self, + args=args, + universe=u, + data_logger=self._data_logger, + level_manager=level_manager, + ) + + entropy_manager.execute() + + except Exception as e: + logger.error(f"RunManager encountered an error: {e}", exc_info=True) + raise + + def new_U_select_frame(self, u, start=None, end=None, step=1): + """Create a reduced universe by dropping frames according to user selection + + Parameters + ---------- + u : MDAnalyse.Universe + A Universe object will all topology, dihedrals,coordinates and force + information + start : int or None, Optional, default: None + Frame id to start analysis. Default None will start from frame 0 + end : int or None, Optional, default: None + Frame id to end analysis. Default None will end at last frame + step : int, Optional, default: 1 + Steps between frame. + + Returns + ------- + u2 : MDAnalysis.Universe + reduced universe + """ + if start is None: + start = 0 + if end is None: + end = len(u.trajectory) + select_atom = u.select_atoms("all", updating=True) + coordinates = ( + AnalysisFromFunction(lambda ag: ag.positions.copy(), select_atom) + .run() + .results["timeseries"][start:end:step] + ) + forces = ( + AnalysisFromFunction(lambda ag: ag.forces.copy(), select_atom) + .run() + .results["timeseries"][start:end:step] + ) + dimensions = ( + AnalysisFromFunction(lambda ag: ag.dimensions.copy(), select_atom) + .run() + .results["timeseries"][start:end:step] + ) + u2 = mda.Merge(select_atom) + u2.load_new( + coordinates, format=MemoryReader, forces=forces, dimensions=dimensions + ) + logger.debug(f"MDAnalysis.Universe - reduced universe: {u2}") + return u2 + + def new_U_select_atom(self, u, select_string="all"): + """Create a reduced universe by dropping atoms according to user selection + + Parameters + ---------- + u : MDAnalyse.Universe + A Universe object will all topology, dihedrals,coordinates and force + information + select_string : str, Optional, default: 'all' + MDAnalysis.select_atoms selection string. + + Returns + ------- + u2 : MDAnalysis.Universe + reduced universe + + """ + select_atom = u.select_atoms(select_string, updating=True) + coordinates = ( + AnalysisFromFunction(lambda ag: ag.positions.copy(), select_atom) + .run() + .results["timeseries"] + ) + forces = ( + AnalysisFromFunction(lambda ag: ag.forces.copy(), select_atom) + .run() + .results["timeseries"] + ) + dimensions = ( + AnalysisFromFunction(lambda ag: ag.dimensions.copy(), select_atom) + .run() + .results["timeseries"] + ) + u2 = mda.Merge(select_atom) + u2.load_new( + coordinates, format=MemoryReader, forces=forces, dimensions=dimensions + ) + logger.debug(f"MDAnalysis.Universe - reduced universe: {u2}") + return u2 + + def write_universe(self, u, name="default"): + """Write a universe to working directories as pickle + + Parameters + ---------- + u : MDAnalyse.Universe + A Universe object will all topology, dihedrals,coordinates and force + information + name : str, Optional. default: 'default' + The name of file with sub file name .pkl + + Returns + ------- + name : str + filename of saved universe + """ + filename = f"{name}.pkl" + pickle.dump(u, open(filename, "wb")) + return name + + def read_universe(self, path): + """read a universe to working directories as pickle + + Parameters + ---------- + path : str + The path to file. + + Returns + ------- + u : MDAnalysis.Universe + A Universe object will all topology, dihedrals,coordinates and force + information. + """ + u = pickle.load(open(path, "rb")) + return u + + def change_lambda_units(self, arg_lambdas): + """Unit of lambdas : kJ2 mol-2 A-2 amu-1 + change units of lambda to J/s2""" + # return arg_lambdas * N_AVOGADRO * N_AVOGADRO * AMU2KG * 1e-26 + return arg_lambdas * 1e29 / self.N_AVOGADRO + + def get_KT2J(self, arg_temper): + """A temperature dependent KT to Joule conversion""" + return 4.11e-21 * arg_temper / self.DEF_TEMPER diff --git a/pyproject.toml b/pyproject.toml index c1fc229..b30d0d6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -66,7 +66,7 @@ docs = [ ] [project.scripts] -CodeEntropy = "CodeEntropy.main_mcc:main" +CodeEntropy = "CodeEntropy.main:main" [tool.isort] profile = "black" diff --git a/tests/test_EntropyFunctions/test_arg_config_manager.py b/tests/test_CodeEntropy/test_arg_config_manager.py similarity index 88% rename from tests/test_EntropyFunctions/test_arg_config_manager.py rename to tests/test_CodeEntropy/test_arg_config_manager.py index 989e090..427275e 100644 --- a/tests/test_EntropyFunctions/test_arg_config_manager.py +++ b/tests/test_CodeEntropy/test_arg_config_manager.py @@ -1,4 +1,5 @@ import argparse +import logging import os import shutil import tempfile @@ -7,7 +8,7 @@ import tests.data as data from CodeEntropy.config.arg_config_manager import ConfigManager -from CodeEntropy.main_mcc import main +from CodeEntropy.main import main class test_arg_config_manager(unittest.TestCase): @@ -93,12 +94,9 @@ def test_load_config_file_not_found(self, mock_file): @patch.object(ConfigManager, "load_config", return_value=None) def test_no_cli_no_yaml(self, mock_load_config): """Test behavior when no CLI arguments and no YAML file are provided.""" - with self.assertRaises(ValueError) as context: + with self.assertRaises(SystemExit) as context: main() - self.assertEqual( - str(context.exception), - "No configuration file found, and no CLI arguments were provided.", - ) + self.assertEqual(context.exception.code, 1) def test_invalid_run_config_type(self): """ @@ -302,6 +300,49 @@ def test_merge_with_none_yaml(self): self.assertEqual(merged_args.top_traj_file, ["/cli/path"]) + @patch("CodeEntropy.config.arg_config_manager.logger") + def test_merge_configs_sets_debug_logging(self, mock_logger): + """ + Ensure logging is set to DEBUG when verbose=True. + """ + arg_config = ConfigManager() + args = argparse.Namespace(verbose=True) + for key in arg_config.arg_map: + if not hasattr(args, key): + setattr(args, key, None) + + # Mock logger handlers + mock_handler = MagicMock() + mock_logger.handlers = [mock_handler] + + arg_config.merge_configs(args, {}) + + mock_logger.setLevel.assert_called_with(logging.DEBUG) + mock_handler.setLevel.assert_called_with(logging.DEBUG) + mock_logger.debug.assert_called_with( + "Verbose mode enabled. Logger set to DEBUG level." + ) + + @patch("CodeEntropy.config.arg_config_manager.logger") + def test_merge_configs_sets_info_logging(self, mock_logger): + """ + Ensure logging is set to INFO when verbose=False. + """ + arg_config = ConfigManager() + args = argparse.Namespace(verbose=False) + for key in arg_config.arg_map: + if not hasattr(args, key): + setattr(args, key, None) + + # Mock logger handlers + mock_handler = MagicMock() + mock_logger.handlers = [mock_handler] + + arg_config.merge_configs(args, {}) + + mock_logger.setLevel.assert_called_with(logging.INFO) + mock_handler.setLevel.assert_called_with(logging.INFO) + @patch("argparse.ArgumentParser.parse_args") def test_default_values(self, mock_parse_args): """ diff --git a/tests/test_CodeEntropy/test_data_logger.py b/tests/test_CodeEntropy/test_data_logger.py new file mode 100644 index 0000000..803b95f --- /dev/null +++ b/tests/test_CodeEntropy/test_data_logger.py @@ -0,0 +1,139 @@ +import json +import os +import shutil +import tempfile +import unittest +from unittest.mock import patch + +import pandas as pd + +from CodeEntropy.config.data_logger import DataLogger +from CodeEntropy.main import main + + +class TestDataLogger(unittest.TestCase): + """ + Unit tests for the DataLogger class. These tests verify the + correct behavior of data logging, JSON export, and table + logging functionalities. + """ + + def setUp(self): + """ + Set up a temporary test environment before each test. + Creates a temporary directory and initializes a DataLogger instance. + """ + self.test_dir = tempfile.mkdtemp(prefix="CodeEntropy_") + self.code_entropy = main + + self._orig_dir = os.getcwd() + os.chdir(self.test_dir) + + self.logger = DataLogger() + self.output_file = "test_output.json" + + def tearDown(self): + """ + Clean up the test environment after each test. + Removes the temporary directory and restores the original working directory. + """ + os.chdir(self._orig_dir) + if os.path.exists(self.test_dir): + shutil.rmtree(self.test_dir) + + def test_init(self): + """ + Test that the DataLogger initializes with empty molecule and residue data lists. + """ + self.assertEqual(self.logger.molecule_data, []) + self.assertEqual(self.logger.residue_data, []) + + def test_add_results_data(self): + """ + Test that add_results_data correctly appends a molecule-level entry. + """ + self.logger.add_results_data( + 0, "united_atom", "Transvibrational (J/mol/K)", 653.404 + ) + self.assertEqual( + self.logger.molecule_data, + [[0, "united_atom", "Transvibrational (J/mol/K)", "653.404"]], + ) + + def test_add_residue_data(self): + """ + Test that add_residue_data correctly appends a residue-level entry. + """ + self.logger.add_residue_data(0, 0, "Transvibrational (J/mol/K)", 122.612) + self.assertEqual( + self.logger.residue_data, [[0, 0, "Transvibrational (J/mol/K)", "122.612"]] + ) + + def test_save_dataframes_as_json(self): + """ + Test that save_dataframes_as_json correctly writes molecule and residue data + to a JSON file with the expected structure and values. + """ + molecule_df = pd.DataFrame( + [ + { + "Molecule ID": 0, + "Level": "united_atom", + "Type": "Transvibrational (J/mol/K)", + "Result": 653.404, + }, + { + "Molecule ID": 1, + "Level": "united_atom", + "Type": "Rovibrational (J/mol/K)", + "Result": 236.081, + }, + ] + ) + residue_df = pd.DataFrame( + [ + { + "Molecule ID": 0, + "Residue": 0, + "Type": "Transvibrational (J/mol/K)", + "Result": 122.612, + }, + { + "Molecule ID": 1, + "Residue": 0, + "Type": "Conformational (J/mol/K)", + "Result": 6.845, + }, + ] + ) + + self.logger.save_dataframes_as_json(molecule_df, residue_df, self.output_file) + + with open(self.output_file, "r") as f: + data = json.load(f) + + self.assertIn("molecule_data", data) + self.assertIn("residue_data", data) + self.assertEqual(data["molecule_data"][0]["Type"], "Transvibrational (J/mol/K)") + self.assertEqual(data["residue_data"][0]["Residue"], 0) + + @patch("CodeEntropy.config.data_logger.logger") + def test_log_tables(self, mock_logger): + """ + Test that log_tables logs formatted molecule and residue tables using the + logger. + """ + self.logger.add_results_data( + 0, "united_atom", "Transvibrational (J/mol/K)", 653.404 + ) + self.logger.add_residue_data(0, 0, "Transvibrational (J/mol/K)", 122.612) + + self.logger.log_tables() + + calls = [call[0][0] for call in mock_logger.info.call_args_list] + self.assertTrue(any("Molecule Data Table:" in c for c in calls)) + self.assertTrue(any("Residue Data Table:" in c for c in calls)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_CodeEntropy/test_entropy.py b/tests/test_CodeEntropy/test_entropy.py new file mode 100644 index 0000000..7522cfe --- /dev/null +++ b/tests/test_CodeEntropy/test_entropy.py @@ -0,0 +1,179 @@ +import os +import shutil +import tempfile +import unittest +from unittest.mock import MagicMock + +import numpy as np +import pytest + +from CodeEntropy.entropy import VibrationalEntropy +from CodeEntropy.main import main +from CodeEntropy.run import RunManager + + +class TestVibrationalEntropy(unittest.TestCase): + """ + Unit tests for the functionality of Vibrational entropy calculations. + """ + + def setUp(self): + """ + Set up test environment. + """ + self.test_dir = tempfile.mkdtemp(prefix="CodeEntropy_") + self.code_entropy = main + + # Change to test directory + self._orig_dir = os.getcwd() + os.chdir(self.test_dir) + + def tearDown(self): + """ + Clean up after each test. + """ + os.chdir(self._orig_dir) + if os.path.exists(self.test_dir): + shutil.rmtree(self.test_dir) + + # test when lambda is zero + def test_frequency_calculation_0(self): + lambdas = 0 + temp = 298 + + run_manager = RunManager("mock_folder") + + ve = VibrationalEntropy( + run_manager, MagicMock(), MagicMock(), MagicMock(), MagicMock() + ) + frequencies = ve.frequency_calculation(lambdas, temp) + + assert frequencies == 0 + + # test when lambdas are positive + def test_frequency_calculation_pos(self): + lambdas = np.array([585495.0917897299, 658074.5130064893, 782425.305888707]) + temp = 298 + + # Create a mock RunManager and set return value for get_KT2J + run_manager = RunManager("mock_folder") + + # Instantiate VibrationalEntropy with mocks + ve = VibrationalEntropy( + run_manager, MagicMock(), MagicMock(), MagicMock(), MagicMock() + ) + + # Call the method under test + frequencies = ve.frequency_calculation(lambdas, temp) + + assert frequencies == pytest.approx( + [1899594266400.4016, 2013894687315.6213, 2195940987139.7097] + ) + + # TODO test for error handling when lambdas are negative + + # test for matrix_type force, highest level=yes + def test_vibrational_entropy_polymer_force(self): + matrix = np.array( + [ + [4.67476, -0.04069, -0.19714], + [-0.04069, 3.86300, -0.17922], + [-0.19714, -0.17922, 3.66307], + ] + ) + matrix_type = "force" + temp = 298 + highest_level = "yes" + + run_manager = RunManager("mock_folder") + ve = VibrationalEntropy( + run_manager, MagicMock(), MagicMock(), MagicMock(), MagicMock() + ) + + S_vib = ve.vibrational_entropy_calculation( + matrix, matrix_type, temp, highest_level + ) + + assert S_vib == pytest.approx(52.88123410327823) + + # test for matrix_type force, highest level=no + + # test for matrix_type torque + def test_vibrational_entropy_polymer_torque(self): + matrix = np.array( + [ + [6.69611, 0.39754, 0.57763], + [0.39754, 4.63265, 0.38648], + [0.57763, 0.38648, 6.34589], + ] + ) + matrix_type = "torque" + temp = 298 + highest_level = "yes" + + run_manager = RunManager("mock_folder") + ve = VibrationalEntropy( + run_manager, MagicMock(), MagicMock(), MagicMock(), MagicMock() + ) + + S_vib = ve.vibrational_entropy_calculation( + matrix, matrix_type, temp, highest_level + ) + + assert S_vib == pytest.approx(48.45003266069881) + + # TODO test for error handling on invalid inputs + + +class TestConformationalEntropy(unittest.TestCase): + """ + Unit tests for the functionality of conformational entropy calculations. + """ + + def setUp(self): + """ + Set up test environment. + """ + self.test_dir = tempfile.mkdtemp(prefix="CodeEntropy_") + self.code_entropy = main + + # Change to test directory + self._orig_dir = os.getcwd() + os.chdir(self.test_dir) + + def tearDown(self): + """ + Clean up after each test. + """ + os.chdir(self._orig_dir) + if os.path.exists(self.test_dir): + shutil.rmtree(self.test_dir) + + +class TestOrientationalEntropy(unittest.TestCase): + """ + Unit tests for the functionality of orientational entropy calculations. + """ + + def setUp(self): + """ + Set up test environment. + """ + self.test_dir = tempfile.mkdtemp(prefix="CodeEntropy_") + self.code_entropy = main + + # Change to test directory + self._orig_dir = os.getcwd() + os.chdir(self.test_dir) + + def tearDown(self): + """ + Clean up after each test. + """ + os.chdir(self._orig_dir) + if os.path.exists(self.test_dir): + shutil.rmtree(self.test_dir) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_CodeEntropy/test_logging_config.py b/tests/test_CodeEntropy/test_logging_config.py new file mode 100644 index 0000000..ac25ac9 --- /dev/null +++ b/tests/test_CodeEntropy/test_logging_config.py @@ -0,0 +1,79 @@ +import logging +import os +import tempfile +import unittest + +from CodeEntropy.config.logging_config import LoggingConfig + + +class TestLoggingConfig(unittest.TestCase): + + def setUp(self): + # Use a temporary directory for logs + self.temp_dir = tempfile.TemporaryDirectory() + self.log_dir = os.path.join(self.temp_dir.name, "logs") + self.logging_config = LoggingConfig(folder=self.temp_dir.name) + + def tearDown(self): + self.temp_dir.cleanup() + + def test_log_directory_created(self): + """Check if the log directory is created upon init""" + self.assertTrue(os.path.exists(self.log_dir)) + self.assertTrue(os.path.isdir(self.log_dir)) + + def test_setup_logging_returns_logger(self): + """Ensure setup_logging returns a logger instance""" + logger = self.logging_config.setup_logging() + self.assertIsInstance(logger, logging.Logger) + + def test_expected_log_files_created(self): + """Ensure log file paths are configured correctly in the logging config""" + self.logging_config.setup_logging() + + # Map actual output files to their corresponding handler keys + expected_handlers = { + "program.out": "stdout", + "program.log": "logfile", + "program.err": "errorfile", + "program.com": "commandfile", + "mdanalysis.log": "mdanalysis_log", + } + + for filename, handler_key in expected_handlers.items(): + expected_path = os.path.join(self.log_dir, filename) + actual_path = self.logging_config.LOGGING["handlers"][handler_key][ + "filename" + ] + self.assertEqual(actual_path, expected_path) + + def test_update_logging_level(self): + """Ensure logging levels are updated correctly""" + self.logging_config.setup_logging() + + # Update to DEBUG + self.logging_config.update_logging_level(logging.DEBUG) + root_logger = logging.getLogger() + self.assertEqual(root_logger.level, logging.DEBUG) + + # Check that at least one handler is DEBUG + handler_levels = [h.level for h in root_logger.handlers] + self.assertIn(logging.DEBUG, handler_levels) + + # Update to INFO + self.logging_config.update_logging_level(logging.INFO) + self.assertEqual(root_logger.level, logging.INFO) + + def test_mdanalysis_and_command_loggers_exist(self): + """Ensure specialized loggers are set up""" + self.logging_config.setup_logging() + mda_logger = logging.getLogger("MDAnalysis") + cmd_logger = logging.getLogger("commands") + self.assertEqual(mda_logger.level, logging.DEBUG) + self.assertEqual(cmd_logger.level, logging.INFO) + self.assertFalse(mda_logger.propagate) + self.assertFalse(cmd_logger.propagate) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_CodeEntropy/test_main.py b/tests/test_CodeEntropy/test_main.py new file mode 100644 index 0000000..42b9b79 --- /dev/null +++ b/tests/test_CodeEntropy/test_main.py @@ -0,0 +1,73 @@ +import unittest +from unittest.mock import MagicMock, patch + +from CodeEntropy.main import main + + +class TestMain(unittest.TestCase): + """ + Unit tests for the main functionality of CodeEntropy. + """ + + @patch("CodeEntropy.main.sys.exit") + @patch("CodeEntropy.main.RunManager") + def test_main_successful_run(self, mock_RunManager, mock_exit): + """ + Test that main runs successfully and does not call sys.exit. + """ + # Mock RunManager's methods to simulate successful execution + mock_run_manager_instance = MagicMock() + mock_RunManager.return_value = mock_run_manager_instance + + # Simulate that RunManager.create_job_folder returns a folder + mock_RunManager.create_job_folder.return_value = "dummy_folder" + + # Simulate the successful completion of the run_entropy_workflow method + mock_run_manager_instance.run_entropy_workflow.return_value = None + + # Run the main function + main() + + # Verify that sys.exit was not called + mock_exit.assert_not_called() + + # Verify that RunManager's methods were called correctly + mock_RunManager.create_job_folder.assert_called_once() + mock_run_manager_instance.run_entropy_workflow.assert_called_once() + + @patch("CodeEntropy.main.sys.exit") + @patch("CodeEntropy.main.RunManager") + @patch("CodeEntropy.main.logger") + def test_main_exception_triggers_exit( + self, mock_logger, mock_RunManager, mock_exit + ): + """ + Test that main logs a critical error and exits if RunManager + raises an exception. + """ + # Simulate an exception being raised in run_entropy_workflow + mock_run_manager_instance = MagicMock() + mock_RunManager.return_value = mock_run_manager_instance + + # Simulate that RunManager.create_job_folder returns a folder + mock_RunManager.create_job_folder.return_value = "dummy_folder" + + # Simulate an exception in the run_entropy_workflow method + mock_run_manager_instance.run_entropy_workflow.side_effect = Exception( + "Test exception" + ) + + # Run the main function and mock sys.exit to ensure it gets called + main() + + # Ensure sys.exit(1) was called due to the exception + mock_exit.assert_called_once_with(1) + + # Ensure that the logger logged the critical error with exception details + mock_logger.critical.assert_called_once_with( + "Fatal error during entropy calculation: Test exception", exc_info=True + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_CodeEntropy/test_run.py b/tests/test_CodeEntropy/test_run.py new file mode 100644 index 0000000..dd5178d --- /dev/null +++ b/tests/test_CodeEntropy/test_run.py @@ -0,0 +1,105 @@ +import os +import shutil +import tempfile +import unittest +from unittest.mock import patch + +from CodeEntropy.run import RunManager + + +class TestRunManager(unittest.TestCase): + """ + Unit tests for the RunManager class. These tests verify the + correct behavior of run manager. + """ + + def setUp(self): + """ + Set up a temporary directory as the working directory before each test. + """ + self.test_dir = tempfile.mkdtemp(prefix="CodeEntropy_") + self._orig_dir = os.getcwd() + os.chdir(self.test_dir) + + def tearDown(self): + """ + Clean up by removing the temporary directory and restoring the original working + directory. + """ + os.chdir(self._orig_dir) + shutil.rmtree(self.test_dir) + + @patch("os.makedirs") + @patch("os.listdir") + def test_create_job_folder_empty_directory(self, mock_listdir, mock_makedirs): + """ + Test that 'job001' is created when the directory is initially empty. + """ + mock_listdir.return_value = [] + new_folder_path = RunManager.create_job_folder() + expected_path = os.path.join(self.test_dir, "job001") + self.assertEqual(new_folder_path, expected_path) + mock_makedirs.assert_called_once_with(expected_path, exist_ok=True) + + @patch("os.makedirs") + @patch("os.listdir") + def test_create_job_folder_with_existing_folders(self, mock_listdir, mock_makedirs): + """ + Test that the next sequential job folder (e.g., 'job004') is created when + existing folders 'job001', 'job002', and 'job003' are present. + """ + mock_listdir.return_value = ["job001", "job002", "job003"] + new_folder_path = RunManager.create_job_folder() + expected_path = os.path.join(self.test_dir, "job004") + self.assertEqual(new_folder_path, expected_path) + mock_makedirs.assert_called_once_with(expected_path, exist_ok=True) + + @patch("os.makedirs") + @patch("os.listdir") + def test_create_job_folder_with_non_matching_folders( + self, mock_listdir, mock_makedirs + ): + """ + Test that 'job001' is created when the directory contains only non-job-related + folders. + """ + mock_listdir.return_value = ["folderA", "another_one"] + new_folder_path = RunManager.create_job_folder() + expected_path = os.path.join(self.test_dir, "job001") + self.assertEqual(new_folder_path, expected_path) + mock_makedirs.assert_called_once_with(expected_path, exist_ok=True) + + @patch("os.makedirs") + @patch("os.listdir") + def test_create_job_folder_mixed_folder_names(self, mock_listdir, mock_makedirs): + """ + Test that the correct next job folder (e.g., 'job003') is created when both + job and non-job folders exist in the directory. + """ + mock_listdir.return_value = ["job001", "abc", "job002", "random"] + new_folder_path = RunManager.create_job_folder() + expected_path = os.path.join(self.test_dir, "job003") + self.assertEqual(new_folder_path, expected_path) + mock_makedirs.assert_called_once_with(expected_path, exist_ok=True) + + @patch("os.makedirs") + @patch("os.listdir") + def test_create_job_folder_with_invalid_job_suffix( + self, mock_listdir, mock_makedirs + ): + """ + Test that invalid job folder names like 'jobABC' are ignored when determining + the next job number. + """ + # Simulate existing folders, one of which is invalid + mock_listdir.return_value = ["job001", "jobABC", "job002"] + + new_folder_path = RunManager.create_job_folder() + expected_path = os.path.join(self.test_dir, "job003") + + self.assertEqual(new_folder_path, expected_path) + mock_makedirs.assert_called_once_with(expected_path, exist_ok=True) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_EntropyFunctions/test_conformational_entropy.py b/tests/test_EntropyFunctions/test_conformational_entropy.py deleted file mode 100644 index 79b0cd9..0000000 --- a/tests/test_EntropyFunctions/test_conformational_entropy.py +++ /dev/null @@ -1,6 +0,0 @@ -# import numpy -# import pytest - -# from CodeEntropy import EntropyFunctions as EF - -# Test conformational_entropy diff --git a/tests/test_EntropyFunctions/test_frequency_calculation.py b/tests/test_EntropyFunctions/test_frequency_calculation.py deleted file mode 100644 index ed0a31d..0000000 --- a/tests/test_EntropyFunctions/test_frequency_calculation.py +++ /dev/null @@ -1,33 +0,0 @@ -import numpy -import pytest - -from CodeEntropy.calculations import EntropyFunctions as EF - -# Test frequency_calculation -# calculate vibrational frequencies from eigenvalues of covariance matrix -# Given lambda value(s) do you get the correct frequency - - -# test when lambda is zero -def test_frequency_calculation_0(): - lambdas = 0 - temp = 298 - - frequencies = EF.frequency_calculation(lambdas, temp) - - assert frequencies == 0 - - -# test when lambdas are positive -def test_frequency_calculation_pos(): - lambdas = numpy.array([585495.0917897299, 658074.5130064893, 782425.305888707]) - temp = 298 - - frequencies = EF.frequency_calculation(lambdas, temp) - - assert frequencies == pytest.approx( - [1899594266400.4016, 2013894687315.6213, 2195940987139.7097] - ) - - -# TODO test for error handling when lambdas are negative diff --git a/tests/test_EntropyFunctions/test_main_mcc.py b/tests/test_EntropyFunctions/test_main_mcc.py deleted file mode 100644 index c771685..0000000 --- a/tests/test_EntropyFunctions/test_main_mcc.py +++ /dev/null @@ -1,88 +0,0 @@ -import os -import shutil -import sys -import tempfile -import unittest -from unittest.mock import MagicMock, mock_open, patch - -from CodeEntropy.main_mcc import main - - -class TestMainMcc(unittest.TestCase): - """ - Unit tests for the main functionality of CodeEntropy. - """ - - def setUp(self): - """ - Set up test environment. - """ - self.test_dir = tempfile.mkdtemp(prefix="CodeEntropy_") - self.config_file = os.path.join(self.test_dir, "config.yaml") - self.code_entropy = main - - # Create a mock config file - with patch("builtins.open", new_callable=mock_open) as mock_file: - self.setup_file(mock_file) - with open(self.config_file, "w") as f: - f.write(mock_file.return_value.read()) - - # Change to test directory - self._orig_dir = os.getcwd() - os.chdir(self.test_dir) - - def tearDown(self): - """ - Clean up after each test. - """ - os.chdir(self._orig_dir) - if os.path.exists(self.test_dir): - shutil.rmtree(self.test_dir) - - def setup_file(self, mock_file): - """ - Mock the contents of a configuration file. - """ - mock_file.return_value = mock_open( - read_data="--- \n \nrun1:\n " - "top_traj_file: ['/path/to/tpr', '/path/to/trr']\n " - "selection_string: 'all'\n " - "start: 0\n " - "end: -1\n " - "step: 1\n " - "bin_width: 30\n " - "tempra: 298.0\n " - "verbose: False\n " - "thread: 1\n " - "output_file: 'output_file.json'\n " - "force_partitioning: 0.5\n " - "water_entropy: False" - ).return_value - - def test_CodeEntropy_imported(self): - """Sample test, will always pass so long as import statement worked.""" - assert "CodeEntropy" in sys.modules - - @patch( - "builtins.open", - new_callable=mock_open, - read_data=b"--- \n top_traj_file: ['/path/to/tpr', '/path/to/trr'] \n", - ) - @patch("os.path.exists", return_value=True) - @patch("MDAnalysis.Universe") - @patch("gettext.translation", return_value=MagicMock()) - def test_run(self, mock_translation, mock_universe, mock_exists, mock_file): - """ - Test the execution of the main function with the necessary CLI argument. - """ - with patch( - "sys.argv", - ["CodeEntropy", "--top_traj_file", "/path/to/tpr", "/path/to/trr"], - ): - self.setup_file(mock_file) - mock_universe.return_value.trajectory = MagicMock() - main() - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_EntropyFunctions/test_orientational_entropy.py b/tests/test_EntropyFunctions/test_orientational_entropy.py deleted file mode 100644 index fc0268f..0000000 --- a/tests/test_EntropyFunctions/test_orientational_entropy.py +++ /dev/null @@ -1,6 +0,0 @@ -# import numpy -# import pytest - -# from CodeEntropy import EntropyFunctions as EF - -# Test orientational_entropy diff --git a/tests/test_EntropyFunctions/test_vibrational_entropy.py b/tests/test_EntropyFunctions/test_vibrational_entropy.py deleted file mode 100644 index d29d432..0000000 --- a/tests/test_EntropyFunctions/test_vibrational_entropy.py +++ /dev/null @@ -1,49 +0,0 @@ -import numpy -import pytest - -from CodeEntropy.calculations import EntropyFunctions as EF - -# Test vibrational_entropy -# Given a matrix does the code calculate the correct entropy value - - -# test for matrix_type force, highest level=yes -def test_vibrational_entropy_polymer_force(): - matrix = numpy.array( - [ - [4.67476, -0.04069, -0.19714], - [-0.04069, 3.86300, -0.17922], - [-0.19714, -0.17922, 3.66307], - ] - ) - matrix_type = "force" - temp = 298 - highest_level = "yes" - - S_vib = EF.vibrational_entropy(matrix, matrix_type, temp, highest_level) - - assert S_vib == pytest.approx(52.88123410327823) - - -# test for matrix_type force, highest level=no - - -# test for matrix_type torque -def test_vibrational_entropy_polymer_torque(): - matrix = numpy.array( - [ - [6.69611, 0.39754, 0.57763], - [0.39754, 4.63265, 0.38648], - [0.57763, 0.38648, 6.34589], - ] - ) - matrix_type = "torque" - temp = 298 - highest_level = "yes" - - S_vib = EF.vibrational_entropy(matrix, matrix_type, temp, highest_level) - - assert S_vib == pytest.approx(48.45003266069881) - - -# TODO test for error handling on invalid inputs