From de2ff174db2fe7c871b5e1921095c882c39c3816 Mon Sep 17 00:00:00 2001 From: Lester Hedges Date: Wed, 19 Nov 2025 12:32:29 +0000 Subject: [PATCH 1/6] Add ReplicaSystem implementation for replica exchange simulations. --- .../Exscientia/_SireWrappers/__init__.py | 2 + .../_SireWrappers/_replica_system.py | 606 ++++++++++++++++++ python/BioSimSpace/_SireWrappers/__init__.py | 2 + .../_SireWrappers/_replica_system.py | 606 ++++++++++++++++++ tests/IO/test_perturbable.py | 11 - .../_SireWrappers/test_replica_system.py | 146 +++++ tests/_SireWrappers/test_replica_system.py | 126 ++++ tests/conftest.py | 2 +- 8 files changed, 1489 insertions(+), 12 deletions(-) create mode 100644 python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py create mode 100644 python/BioSimSpace/_SireWrappers/_replica_system.py create mode 100644 tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py create mode 100644 tests/_SireWrappers/test_replica_system.py diff --git a/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/__init__.py b/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/__init__.py index 0bc37219..6a9f7669 100644 --- a/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/__init__.py +++ b/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/__init__.py @@ -34,6 +34,7 @@ Molecules Residue System + ReplicaSystem SearchResult """ @@ -49,3 +50,4 @@ from ._residue import * from ._search_result import * from ._system import * +from ._replica_system import * diff --git a/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py b/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py new file mode 100644 index 00000000..ac8b7610 --- /dev/null +++ b/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py @@ -0,0 +1,606 @@ +###################################################################### +# BioSimSpace: Making biomolecular simulation a breeze! +# +# Copyright: 2017-2025 +# +# Authors: Lester Hedges +# +# BioSimSpace is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# BioSimSpace is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with BioSimSpace. If not, see . +##################################################################### + +""" +A thin wrapper around Sire.System.System and sire.mol._trjajectory.TrajectoryIterator. +This is an internal package and should not be directly exposed to the user. +""" + +__author__ = "Lester Hedges" +__email__ = "lester.hedges@gmail.com" + +__all__ = ["ReplicaSystem"] + + +class ReplicaSystem: + """ + A container class for storing molecular systems for replica dynamics simulations. + Here a system has a single topology, but multiple coordinate sets (replicas). + """ + + def __init__(self, system, trajectory=None, num_replicas=None): + """ + Constructor. + + Parameters + ---------- + + system : sire.system.System, sire.legacy.System.System, :class:`System ` + A Sire or BioSimSpace System object. + + trajectory : str, optional + The path to a trajectory file containing multiple coordinate sets. + + num_replicas : int, optional + The number of replicas (coordinate sets) to create. If provided, the + coordinate set in `system` will be duplicated this many times. This is + only used if the system does not already contain multiple frames or a + trajectory is not provided. + """ + + from sire.mol._trajectory import TrajectoryIterator as _TrajectoryIterator + from sire.system import System as _NewSireSystem + from sire.legacy.System import System as _SireSystem + from ._system import System as _System + + # Check that the system is valid. + if isinstance(system, _NewSireSystem): + self._new_sire_object = system + self._sire_object = system._system + elif isinstance(system, _SireSystem): + self._new_sire_object = _NewSireSystem(system) + self._sire_object = system + elif isinstance(system, _System): + self._new_sire_object = _NewSireSystem(system._sire_object) + self._sire_object = system._sire_object + else: + raise TypeError( + "'system' must be of type 'sire.system.System', " + "'sire.legacy.System.System' or 'BioSimSpace._SireWrappers.System'" + ) + + # Check if the system is perturbable. + try: + perturbable = self._new_sire_object["perturbable"] + self._is_perturbable = True + + from sire.morph import link_to_reference as _link_to_reference + + self._new_sire_object = _link_to_reference(self._new_sire_object) + except: + self._is_perturbable = False + + # If a trajectory is provided, make sure the file exists. + if trajectory is not None: + if self._new_sire_object.num_frames() > 1: + raise ValueError( + "Cannot provide a trajectory when 'system' already contains multiple frames." + ) + + is_traj_iterator = False + if isinstance(trajectory, _TrajectoryIterator): + self._trajectory = trajectory + is_traj_iterator = True + + if not is_traj_iterator: + import os as _os + from tempfile import NamedTemporaryFile as _NamedTemporaryFile + from .. import _isVerbose + + if not _os.path.isfile(trajectory): + raise IOError(f"'trajectory' does not exist: {trajectory}") + + # Get the extension of the file so that we can guess the format. + _, ext = _os.path.splitext(trajectory) + + # Use a GROMACS topology for XTC files. + if self._is_perturbable or ext == ".xtc": + tmp_top = _NamedTemporaryFile(delete=False, suffix=".top") + # For now, use AMBER PRM7 for all other formats. + else: + tmp_top = _NamedTemporaryFile(delete=False, suffix=".prm7") + + # Try to write out a temporary topology file. + try: + from sire import save as _save + + _save(self._new_sire_object, tmp_top.name, silent=True) + except Exception as e: + tmp_top.close() + _os.unlink(tmp_top.name) + + msg = "Failed to write temporary topology file for trajectory loading." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + # Now try to load the trajectory. + try: + from sire import load as _load + + sire_system = _load(tmp_top.name, trajectory, silent=True) + except Exception as e: + msg = "Failed to load trajectory file: %s" % trajectory + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + # Update the internal trajectory. + self._trajectory = sire_system.trajectory() + + # Clean up the temporary topology file. + tmp_top.close() + _os.unlink(tmp_top.name) + + # Store the trajectory if there are multiple frames in the system. + elif self._new_sire_object.num_frames() > 1: + self._trajectory = self._new_sire_object.trajectory() + # Otherwise, duplicate the system if num_replicas is provided. + elif num_replicas is not None: + if not isinstance(num_replicas, int) or num_replicas < 1: + raise ValueError("'num_replicas' must be a positive integer.") + + import os as _os + from tempfile import TemporaryDirectory as _TemporaryDirectory + from sire import load as _load + from sire import save as _save + from .. import _isVerbose + + with _TemporaryDirectory() as tmp_dir: + filenames = [] + # Write out the current coordinates the specified number of times. + try: + # Write the first file only. + tmp_file = _os.path.join(tmp_dir, f"replica_0000.gro") + _save(self._new_sire_object, tmp_file, silent=True) + filenames.append(tmp_file) + + # Copy the first file for the remaining replicas. + for i in range(1, num_replicas): + tmp_file = _os.path.join(tmp_dir, f"replica_{i:04d}.gro") + _os.link(_os.path.join(tmp_dir, f"replica_0000.gro"), tmp_file) + filenames.append(tmp_file) + except Exception as e: + msg = "Failed to write temporary files for replica duplication." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + if self._is_perturbable: + top_ext = "top" + else: + top_ext = "prm7" + + # Write out the topology once. + try: + tmp_top = _os.path.join(tmp_dir, f"topology.{top_ext}") + filenames.append(tmp_top) + _save(self._new_sire_object, tmp_top, silent=True) + except Exception as e: + msg = "Failed to write temporary topology file for replica duplication." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + # Now load them all back in as a single system with multiple frames. + try: + sire_system = _load(filenames, silent=True) + except Exception as e: + msg = "Failed to load temporary files for replica duplication." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + # Update the internal trajectory. + self._trajectory = sire_system.trajectory() + + else: + raise ValueError( + "Either 'trajectory' or 'num_replicas' must be provided " + "if 'system' does not already contain multiple frames." + ) + + # Whether the system is linked to the perturbed state. + self._is_perturbed = False + + def __str__(self): + """Return a human readable string representation of the object.""" + return "" % self.nReplicas() + + def __repr__(self): + """Return a string showing how to instantiate the object.""" + return "" % self.nReplicas() + + def __getitem__(self, index): + """ + Get a specific replica (coordinate set) from the system. + + Parameters + ---------- + + index : int + The index of the replica to retrieve (0-based). + + Returns + ------- + + :class:`System ` + The requested replica as a BioSimSpace System object. + """ + + return self.getReplica(index, is_lambda1=self._is_perturbed) + + def nReplicas(self): + """ + Get the number of replicas (coordinate sets) in the system. + + Returns + ------- + + int + The number of replicas. + """ + + return self._trajectory.num_frames() + + def save(self, filename, traj_format="dcd", save_velocities=False): + """ + Save the replica system to a stream and trajectory file. + + Parameters + ---------- + + filename : str + The base filename. + + traj_format : str, optional + The format to save the trajectory in. Default is 'dcd'. + Options are "dcd" or "xtc". + + save_velocities : bool, optional + Whether to save velocities along with coordinates. Default is False. + + Returns + ------- + + stream : str + The path to the saved stream file. + + trajectory : str + The path to the saved trajectory file. + """ + + from sire import save as _save + from sire.stream import save as _save_stream + from .. import _isVerbose + + if not isinstance(filename, str): + raise TypeError("'filename' must be of type 'str'.") + + # Validate the trajectory format. + valid_formats = ["dcd", "xtc"] + + if not isinstance(traj_format, str): + raise TypeError("'traj_format' must be of type 'str'.") + + traj_format = traj_format.lower().replace(" ", "") + if traj_format not in valid_formats: + raise ValueError( + f"'traj_format' must be one of: {', '.join(valid_formats)}" + ) + + if not isinstance(save_velocities, bool): + raise TypeError("'save_velocities' must be of type 'bool'.") + + # Save the trajectory first. + try: + traj_filename = f"{filename}.{traj_format}" + _save( + self._trajectory, + traj_filename, + save_velocities=save_velocities, + silent=True, + ) + except Exception as e: + msg = "Failed to save trajectory file." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + # Now clone the new Sire system so that we can remove the trajectory. + system = self._new_sire_object.clone() + system.delete_all_frames() + + # Stream the system to file. + try: + stream_filename = f"{filename}.bss" + _save_stream(system, stream_filename) + except Exception as e: + msg = "Failed to save stream file." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + return stream_filename, traj_filename + + @staticmethod + def load(stream, trajectory): + """ + Load a replica system from a stream and trajectory file. + + Parameters + ---------- + + stream : str + The path to the stream file. + + trajectory : str + The path to the trajectory file. + + Returns + ------- + + :class:`ReplicaSystem ` + The loaded replica system. + """ + + from sire.stream import load as _load + from .. import _isVerbose + + if not isinstance(stream, str): + raise TypeError("'stream' must be of type 'str'.") + if not isinstance(trajectory, str): + raise TypeError("'trajectory' must be of type 'str'.") + + # Try to load the system. + try: + sire_system = _load(stream) + except Exception as e: + msg = "Failed to load replica system from stream and trajectory." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + return ReplicaSystem(sire_system, trajectory=trajectory) + + def getReplica(self, index, is_lambda1=False, property_map={}): + """ + Get a specific replica (coordinate set) from the system. + + Parameters + ---------- + + index : int + The index of the replica to retrieve (0-based). + + is_lambda1 : bool, optional + Whether to return the system at lambda = 1. Default is False. + + property_map : dict, optional + A dictionary that maps system "properties" to their user defined + values. This allows the user to refer to properties with their + own naming scheme, e.g. { "charge" : "my-charge" } + + Returns + ------- + + :class:`System ` + The requested replica as a BioSimSpace System object. + """ + + from ._system import System as _System + from sire.morph import link_to_perturbed as _link_to_perturbed + + if not isinstance(index, int): + raise TypeError("'index' must be an integer.") + + if index < 0: + index += self.nReplicas() + + if index < 0 or index >= self.nReplicas(): + raise IndexError( + f"'index' {index} is out of range [0, {self.nReplicas()-1}]." + ) + + # Clone the new Sire system to avoid modifying the original. + replica_system = self._new_sire_object.clone() + + # Link to the perturbed state, if requested. + if is_lambda1: + try: + replica_system = _link_to_perturbed(replica_system) + except: + pass + + if not isinstance(property_map, dict): + raise TypeError("'property_map' must be of type 'dict'.") + + # Get the specific frame. + frame = self._trajectory[index].current() + + from sire.io import get_coords_array as _get_coords_array + from sire.legacy.IO import setCoordinates as _setCoordinates + + # Copy the frame coordinates into the system. + frame = _setCoordinates( + replica_system._system, + _get_coords_array(frame).tolist(), + is_lambda1=is_lambda1, + map=property_map, + ) + + return _System(frame) + + def saveReplicas(self, filenames, save_velocities=False, is_lambda1=False): + """ + Save each replica (coordinate set) to individual files. + + Parameters + ---------- + + filenames : list of str + A list of filenames to save each replica to. The extension will + determine the format, which must be the same for all replicas. + + save_velocities : bool, optional + Whether to save velocities along with coordinates. Default is False. + """ + + from sire import save as _save + from .. import _isVerbose + + if not isinstance(filenames, list): + raise TypeError("'filenames' must be a list of strings.") + if not all(isinstance(f, str) for f in filenames): + raise TypeError("All elements in 'filenames' must be of type 'str'.") + + if len(filenames) != self.nReplicas(): + raise ValueError("'filenames' length must match the number of replicas.") + + if not isinstance(save_velocities, bool): + raise TypeError("'save_velocities' must be of type 'bool'.") + + try: + _save( + self._trajectory, + filenames, + save_velocities=save_velocities, + silent=True, + ) + except Exception as e: + msg = "Failed to save replicas to individual files." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + @staticmethod + def loadReplicas(replica_system, filenames): + """ + Load multiple replicas (coordinate sets) from individual files. + + Parameters + ---------- + + replica_system : :class:`ReplicaSystem ` + A ReplicaSystem object to use as a template for loading the replicas. + + filenames : list of str + A list of filenames to load each replica from. The extension will + determine the format, which must be the same for all replicas. + + Returns + ------- + + :class:`ReplicaSystem ` + The loaded replica system. + """ + + import os as _os + from tempfile import NamedTemporaryFile as _NamedTemporaryFile + from sire import load as _load + from sire import save as _save + from .. import _isVerbose + + if not isinstance(replica_system, ReplicaSystem): + raise TypeError("'replica_system' must be of type 'ReplicaSystem'.") + + if not isinstance(filenames, list): + raise TypeError("'filenames' must be a list of strings.") + if not all(isinstance(f, str) for f in filenames): + raise TypeError("All elements in 'filenames' must be of type 'str'.") + + if len(filenames) == 0: + raise ValueError("'filenames' must contain at least one filename.") + + # Get the extension of the first file to determine format. + _, ext = _os.path.splitext(filenames[0]) + + # If this is a GRO file, then write a GROMACS topology. + if replica_system._is_perturbable or ext == ".gro": + top_suffix = ".top" + # Otherwise, use AMBER PRM7. + else: + top_suffix = ".prm7" + + # Write a temporary topology file using the first file. + try: + tmp_top = _NamedTemporaryFile(delete=False, suffix=top_suffix) + _save(replica_system._new_sire_object, tmp_top.name, silent=True) + except Exception as e: + msg = "Failed to write temporary topology file for replica loading." + tmp_top.close() + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + # Prepend the temporary topology file to the list of filenames. + filenames = [tmp_top.name] + filenames + + # Now try to load all the replicas. + try: + sire_system = _load(filenames, silent=True) + except Exception as e: + msg = "Failed to load replicas from individual files." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + # Clean up the temporary topology file. + tmp_top.close() + _os.unlink(tmp_top.name) + + return ReplicaSystem( + replica_system._new_sire_object, trajectory=sire_system.trajectory() + ) + + def setPerturbed(self): + """ + Set the system to the perturbed (lambda = 1) state. + """ + self._is_perturbed = True + + def setReference(self): + """ + Set the system to the reference (lambda = 0) state. + """ + self._is_perturbed = False diff --git a/python/BioSimSpace/_SireWrappers/__init__.py b/python/BioSimSpace/_SireWrappers/__init__.py index 0bc37219..6a9f7669 100644 --- a/python/BioSimSpace/_SireWrappers/__init__.py +++ b/python/BioSimSpace/_SireWrappers/__init__.py @@ -34,6 +34,7 @@ Molecules Residue System + ReplicaSystem SearchResult """ @@ -49,3 +50,4 @@ from ._residue import * from ._search_result import * from ._system import * +from ._replica_system import * diff --git a/python/BioSimSpace/_SireWrappers/_replica_system.py b/python/BioSimSpace/_SireWrappers/_replica_system.py new file mode 100644 index 00000000..ac8b7610 --- /dev/null +++ b/python/BioSimSpace/_SireWrappers/_replica_system.py @@ -0,0 +1,606 @@ +###################################################################### +# BioSimSpace: Making biomolecular simulation a breeze! +# +# Copyright: 2017-2025 +# +# Authors: Lester Hedges +# +# BioSimSpace is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# BioSimSpace is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with BioSimSpace. If not, see . +##################################################################### + +""" +A thin wrapper around Sire.System.System and sire.mol._trjajectory.TrajectoryIterator. +This is an internal package and should not be directly exposed to the user. +""" + +__author__ = "Lester Hedges" +__email__ = "lester.hedges@gmail.com" + +__all__ = ["ReplicaSystem"] + + +class ReplicaSystem: + """ + A container class for storing molecular systems for replica dynamics simulations. + Here a system has a single topology, but multiple coordinate sets (replicas). + """ + + def __init__(self, system, trajectory=None, num_replicas=None): + """ + Constructor. + + Parameters + ---------- + + system : sire.system.System, sire.legacy.System.System, :class:`System ` + A Sire or BioSimSpace System object. + + trajectory : str, optional + The path to a trajectory file containing multiple coordinate sets. + + num_replicas : int, optional + The number of replicas (coordinate sets) to create. If provided, the + coordinate set in `system` will be duplicated this many times. This is + only used if the system does not already contain multiple frames or a + trajectory is not provided. + """ + + from sire.mol._trajectory import TrajectoryIterator as _TrajectoryIterator + from sire.system import System as _NewSireSystem + from sire.legacy.System import System as _SireSystem + from ._system import System as _System + + # Check that the system is valid. + if isinstance(system, _NewSireSystem): + self._new_sire_object = system + self._sire_object = system._system + elif isinstance(system, _SireSystem): + self._new_sire_object = _NewSireSystem(system) + self._sire_object = system + elif isinstance(system, _System): + self._new_sire_object = _NewSireSystem(system._sire_object) + self._sire_object = system._sire_object + else: + raise TypeError( + "'system' must be of type 'sire.system.System', " + "'sire.legacy.System.System' or 'BioSimSpace._SireWrappers.System'" + ) + + # Check if the system is perturbable. + try: + perturbable = self._new_sire_object["perturbable"] + self._is_perturbable = True + + from sire.morph import link_to_reference as _link_to_reference + + self._new_sire_object = _link_to_reference(self._new_sire_object) + except: + self._is_perturbable = False + + # If a trajectory is provided, make sure the file exists. + if trajectory is not None: + if self._new_sire_object.num_frames() > 1: + raise ValueError( + "Cannot provide a trajectory when 'system' already contains multiple frames." + ) + + is_traj_iterator = False + if isinstance(trajectory, _TrajectoryIterator): + self._trajectory = trajectory + is_traj_iterator = True + + if not is_traj_iterator: + import os as _os + from tempfile import NamedTemporaryFile as _NamedTemporaryFile + from .. import _isVerbose + + if not _os.path.isfile(trajectory): + raise IOError(f"'trajectory' does not exist: {trajectory}") + + # Get the extension of the file so that we can guess the format. + _, ext = _os.path.splitext(trajectory) + + # Use a GROMACS topology for XTC files. + if self._is_perturbable or ext == ".xtc": + tmp_top = _NamedTemporaryFile(delete=False, suffix=".top") + # For now, use AMBER PRM7 for all other formats. + else: + tmp_top = _NamedTemporaryFile(delete=False, suffix=".prm7") + + # Try to write out a temporary topology file. + try: + from sire import save as _save + + _save(self._new_sire_object, tmp_top.name, silent=True) + except Exception as e: + tmp_top.close() + _os.unlink(tmp_top.name) + + msg = "Failed to write temporary topology file for trajectory loading." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + # Now try to load the trajectory. + try: + from sire import load as _load + + sire_system = _load(tmp_top.name, trajectory, silent=True) + except Exception as e: + msg = "Failed to load trajectory file: %s" % trajectory + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + # Update the internal trajectory. + self._trajectory = sire_system.trajectory() + + # Clean up the temporary topology file. + tmp_top.close() + _os.unlink(tmp_top.name) + + # Store the trajectory if there are multiple frames in the system. + elif self._new_sire_object.num_frames() > 1: + self._trajectory = self._new_sire_object.trajectory() + # Otherwise, duplicate the system if num_replicas is provided. + elif num_replicas is not None: + if not isinstance(num_replicas, int) or num_replicas < 1: + raise ValueError("'num_replicas' must be a positive integer.") + + import os as _os + from tempfile import TemporaryDirectory as _TemporaryDirectory + from sire import load as _load + from sire import save as _save + from .. import _isVerbose + + with _TemporaryDirectory() as tmp_dir: + filenames = [] + # Write out the current coordinates the specified number of times. + try: + # Write the first file only. + tmp_file = _os.path.join(tmp_dir, f"replica_0000.gro") + _save(self._new_sire_object, tmp_file, silent=True) + filenames.append(tmp_file) + + # Copy the first file for the remaining replicas. + for i in range(1, num_replicas): + tmp_file = _os.path.join(tmp_dir, f"replica_{i:04d}.gro") + _os.link(_os.path.join(tmp_dir, f"replica_0000.gro"), tmp_file) + filenames.append(tmp_file) + except Exception as e: + msg = "Failed to write temporary files for replica duplication." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + if self._is_perturbable: + top_ext = "top" + else: + top_ext = "prm7" + + # Write out the topology once. + try: + tmp_top = _os.path.join(tmp_dir, f"topology.{top_ext}") + filenames.append(tmp_top) + _save(self._new_sire_object, tmp_top, silent=True) + except Exception as e: + msg = "Failed to write temporary topology file for replica duplication." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + # Now load them all back in as a single system with multiple frames. + try: + sire_system = _load(filenames, silent=True) + except Exception as e: + msg = "Failed to load temporary files for replica duplication." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + # Update the internal trajectory. + self._trajectory = sire_system.trajectory() + + else: + raise ValueError( + "Either 'trajectory' or 'num_replicas' must be provided " + "if 'system' does not already contain multiple frames." + ) + + # Whether the system is linked to the perturbed state. + self._is_perturbed = False + + def __str__(self): + """Return a human readable string representation of the object.""" + return "" % self.nReplicas() + + def __repr__(self): + """Return a string showing how to instantiate the object.""" + return "" % self.nReplicas() + + def __getitem__(self, index): + """ + Get a specific replica (coordinate set) from the system. + + Parameters + ---------- + + index : int + The index of the replica to retrieve (0-based). + + Returns + ------- + + :class:`System ` + The requested replica as a BioSimSpace System object. + """ + + return self.getReplica(index, is_lambda1=self._is_perturbed) + + def nReplicas(self): + """ + Get the number of replicas (coordinate sets) in the system. + + Returns + ------- + + int + The number of replicas. + """ + + return self._trajectory.num_frames() + + def save(self, filename, traj_format="dcd", save_velocities=False): + """ + Save the replica system to a stream and trajectory file. + + Parameters + ---------- + + filename : str + The base filename. + + traj_format : str, optional + The format to save the trajectory in. Default is 'dcd'. + Options are "dcd" or "xtc". + + save_velocities : bool, optional + Whether to save velocities along with coordinates. Default is False. + + Returns + ------- + + stream : str + The path to the saved stream file. + + trajectory : str + The path to the saved trajectory file. + """ + + from sire import save as _save + from sire.stream import save as _save_stream + from .. import _isVerbose + + if not isinstance(filename, str): + raise TypeError("'filename' must be of type 'str'.") + + # Validate the trajectory format. + valid_formats = ["dcd", "xtc"] + + if not isinstance(traj_format, str): + raise TypeError("'traj_format' must be of type 'str'.") + + traj_format = traj_format.lower().replace(" ", "") + if traj_format not in valid_formats: + raise ValueError( + f"'traj_format' must be one of: {', '.join(valid_formats)}" + ) + + if not isinstance(save_velocities, bool): + raise TypeError("'save_velocities' must be of type 'bool'.") + + # Save the trajectory first. + try: + traj_filename = f"{filename}.{traj_format}" + _save( + self._trajectory, + traj_filename, + save_velocities=save_velocities, + silent=True, + ) + except Exception as e: + msg = "Failed to save trajectory file." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + # Now clone the new Sire system so that we can remove the trajectory. + system = self._new_sire_object.clone() + system.delete_all_frames() + + # Stream the system to file. + try: + stream_filename = f"{filename}.bss" + _save_stream(system, stream_filename) + except Exception as e: + msg = "Failed to save stream file." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + return stream_filename, traj_filename + + @staticmethod + def load(stream, trajectory): + """ + Load a replica system from a stream and trajectory file. + + Parameters + ---------- + + stream : str + The path to the stream file. + + trajectory : str + The path to the trajectory file. + + Returns + ------- + + :class:`ReplicaSystem ` + The loaded replica system. + """ + + from sire.stream import load as _load + from .. import _isVerbose + + if not isinstance(stream, str): + raise TypeError("'stream' must be of type 'str'.") + if not isinstance(trajectory, str): + raise TypeError("'trajectory' must be of type 'str'.") + + # Try to load the system. + try: + sire_system = _load(stream) + except Exception as e: + msg = "Failed to load replica system from stream and trajectory." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + return ReplicaSystem(sire_system, trajectory=trajectory) + + def getReplica(self, index, is_lambda1=False, property_map={}): + """ + Get a specific replica (coordinate set) from the system. + + Parameters + ---------- + + index : int + The index of the replica to retrieve (0-based). + + is_lambda1 : bool, optional + Whether to return the system at lambda = 1. Default is False. + + property_map : dict, optional + A dictionary that maps system "properties" to their user defined + values. This allows the user to refer to properties with their + own naming scheme, e.g. { "charge" : "my-charge" } + + Returns + ------- + + :class:`System ` + The requested replica as a BioSimSpace System object. + """ + + from ._system import System as _System + from sire.morph import link_to_perturbed as _link_to_perturbed + + if not isinstance(index, int): + raise TypeError("'index' must be an integer.") + + if index < 0: + index += self.nReplicas() + + if index < 0 or index >= self.nReplicas(): + raise IndexError( + f"'index' {index} is out of range [0, {self.nReplicas()-1}]." + ) + + # Clone the new Sire system to avoid modifying the original. + replica_system = self._new_sire_object.clone() + + # Link to the perturbed state, if requested. + if is_lambda1: + try: + replica_system = _link_to_perturbed(replica_system) + except: + pass + + if not isinstance(property_map, dict): + raise TypeError("'property_map' must be of type 'dict'.") + + # Get the specific frame. + frame = self._trajectory[index].current() + + from sire.io import get_coords_array as _get_coords_array + from sire.legacy.IO import setCoordinates as _setCoordinates + + # Copy the frame coordinates into the system. + frame = _setCoordinates( + replica_system._system, + _get_coords_array(frame).tolist(), + is_lambda1=is_lambda1, + map=property_map, + ) + + return _System(frame) + + def saveReplicas(self, filenames, save_velocities=False, is_lambda1=False): + """ + Save each replica (coordinate set) to individual files. + + Parameters + ---------- + + filenames : list of str + A list of filenames to save each replica to. The extension will + determine the format, which must be the same for all replicas. + + save_velocities : bool, optional + Whether to save velocities along with coordinates. Default is False. + """ + + from sire import save as _save + from .. import _isVerbose + + if not isinstance(filenames, list): + raise TypeError("'filenames' must be a list of strings.") + if not all(isinstance(f, str) for f in filenames): + raise TypeError("All elements in 'filenames' must be of type 'str'.") + + if len(filenames) != self.nReplicas(): + raise ValueError("'filenames' length must match the number of replicas.") + + if not isinstance(save_velocities, bool): + raise TypeError("'save_velocities' must be of type 'bool'.") + + try: + _save( + self._trajectory, + filenames, + save_velocities=save_velocities, + silent=True, + ) + except Exception as e: + msg = "Failed to save replicas to individual files." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + @staticmethod + def loadReplicas(replica_system, filenames): + """ + Load multiple replicas (coordinate sets) from individual files. + + Parameters + ---------- + + replica_system : :class:`ReplicaSystem ` + A ReplicaSystem object to use as a template for loading the replicas. + + filenames : list of str + A list of filenames to load each replica from. The extension will + determine the format, which must be the same for all replicas. + + Returns + ------- + + :class:`ReplicaSystem ` + The loaded replica system. + """ + + import os as _os + from tempfile import NamedTemporaryFile as _NamedTemporaryFile + from sire import load as _load + from sire import save as _save + from .. import _isVerbose + + if not isinstance(replica_system, ReplicaSystem): + raise TypeError("'replica_system' must be of type 'ReplicaSystem'.") + + if not isinstance(filenames, list): + raise TypeError("'filenames' must be a list of strings.") + if not all(isinstance(f, str) for f in filenames): + raise TypeError("All elements in 'filenames' must be of type 'str'.") + + if len(filenames) == 0: + raise ValueError("'filenames' must contain at least one filename.") + + # Get the extension of the first file to determine format. + _, ext = _os.path.splitext(filenames[0]) + + # If this is a GRO file, then write a GROMACS topology. + if replica_system._is_perturbable or ext == ".gro": + top_suffix = ".top" + # Otherwise, use AMBER PRM7. + else: + top_suffix = ".prm7" + + # Write a temporary topology file using the first file. + try: + tmp_top = _NamedTemporaryFile(delete=False, suffix=top_suffix) + _save(replica_system._new_sire_object, tmp_top.name, silent=True) + except Exception as e: + msg = "Failed to write temporary topology file for replica loading." + tmp_top.close() + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + # Prepend the temporary topology file to the list of filenames. + filenames = [tmp_top.name] + filenames + + # Now try to load all the replicas. + try: + sire_system = _load(filenames, silent=True) + except Exception as e: + msg = "Failed to load replicas from individual files." + + if _isVerbose(): + raise IOError(msg) from e + else: + raise IOError(msg) + + # Clean up the temporary topology file. + tmp_top.close() + _os.unlink(tmp_top.name) + + return ReplicaSystem( + replica_system._new_sire_object, trajectory=sire_system.trajectory() + ) + + def setPerturbed(self): + """ + Set the system to the perturbed (lambda = 1) state. + """ + self._is_perturbed = True + + def setReference(self): + """ + Set the system to the reference (lambda = 0) state. + """ + self._is_perturbed = False diff --git a/tests/IO/test_perturbable.py b/tests/IO/test_perturbable.py index d33646a8..d6bbaf2f 100644 --- a/tests/IO/test_perturbable.py +++ b/tests/IO/test_perturbable.py @@ -5,17 +5,6 @@ from tests.conftest import url -@pytest.fixture(scope="module") -def perturbable_molecule(): - """Re-use the same perturbable system for each test.""" - return BSS.IO.readPerturbableSystem( - f"{url}/perturbable_system0.prm7", - f"{url}/perturbable_system0.rst7", - f"{url}/perturbable_system1.prm7", - f"{url}/perturbable_system1.rst7", - ).getMolecules()[0] - - def test_connectivity(perturbable_molecule): """ Make sure there is a single connectivity property when the end states diff --git a/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py b/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py new file mode 100644 index 00000000..02e37416 --- /dev/null +++ b/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py @@ -0,0 +1,146 @@ +import os +import pytest +import tempfile + +import BioSimSpace as BSS +from BioSimSpace._SireWrappers import System, ReplicaSystem + +from tests.Sandpit.Exscientia.conftest import url + + +@pytest.fixture(scope="module") +def system(): + """Solvated alanine dipeptide system.""" + return BSS.IO.readMolecules(["tests/input/ala.top", "tests/input/ala.crd"]) + + +@pytest.fixture(scope="module") +def perturbable_system(): + """A vacuum perturbable system.""" + return BSS.IO.readPerturbableSystem( + f"{url}/perturbable_system0.prm7", + f"{url}/perturbable_system0.rst7", + f"{url}/perturbable_system1.prm7", + f"{url}/perturbable_system1.rst7", + ) + + +@pytest.fixture(scope="module") +def replica_system(system): + """A replica system for testing.""" + return ReplicaSystem(system, num_replicas=10) + + +@pytest.fixture(scope="module") +def perturbable_replica_system(perturbable_system): + """A perturbable replica system for testing.""" + return ReplicaSystem(perturbable_system, num_replicas=10) + + +@pytest.mark.parametrize("rs", ["replica_system", "perturbable_replica_system"]) +def test_num_replicas(rs, request): + """Test the number of replicas in the replica system.""" + replica_system = request.getfixturevalue(rs) + assert replica_system.nReplicas() == 10 + + +@pytest.mark.parametrize("rs", ["replica_system", "perturbable_replica_system"]) +def test_get_replica(rs, request): + """Test retrieving a replica system.""" + replica_system = request.getfixturevalue(rs) + + replica = replica_system.getReplica(0) + assert isinstance(replica, System) + + # Make sure negative indexing works. + replica_neg = replica_system.getReplica(-1) + assert isinstance(replica_neg, System) + + # Make sure __getitem__ works. + replica_item = replica_system[-1] + + # Make sure bounds checking works. + with pytest.raises(IndexError): + replica_system.getReplica(10) + with pytest.raises(IndexError): + replica_system.getReplica(-11) + + +@pytest.mark.parametrize("rs", ["replica_system", "perturbable_replica_system"]) +def test_stream(rs, request): + """Test streaming the replica system to a file.""" + replica_system = request.getfixturevalue(rs) + + with tempfile.TemporaryDirectory() as tmpdir: + stream, trajectory = replica_system.save(f"{tmpdir}/replica_system") + + # Check that files were created. + assert os.path.exists(f"{tmpdir}/replica_system.bss") + assert os.path.exists(f"{tmpdir}/replica_system.dcd") + + # Check that we can load the replica system back. + rs = ReplicaSystem.load( + f"{tmpdir}/replica_system.bss", f"{tmpdir}/replica_system.dcd" + ) + + # Check that the number of replicas matches. + assert rs.nReplicas() == replica_system.nReplicas() + + # Make sure we can stream using XTC trajectory format. + stream_xtc, trajectory_xtc = replica_system.save( + f"{tmpdir}/replica_system_xtc", traj_format="xtc" + ) + + assert os.path.exists(f"{tmpdir}/replica_system_xtc.bss") + assert os.path.exists(f"{tmpdir}/replica_system_xtc.xtc") + + # Check that we can load the replica system back. + rs_xtc = ReplicaSystem.load( + f"{tmpdir}/replica_system_xtc.bss", f"{tmpdir}/replica_system_xtc.xtc" + ) + + # Check that the number of replicas matches. + assert rs_xtc.nReplicas() == replica_system.nReplicas() + + +@pytest.mark.parametrize("rs", ["replica_system", "perturbable_replica_system"]) +def test_save_load_replicas(rs, request): + """Test saving/loading individual replicas to/from files.""" + replica_system = request.getfixturevalue(rs) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create the list of filenames, one for each replica. + filenames = [ + f"{tmpdir}/replica_{i:03d}.gro" for i in range(replica_system.nReplicas()) + ] + + # Save the replicas. + replica_system.saveReplicas(filenames) + + # Check that files were created. + for filename in filenames: + assert os.path.exists(filename) + + # Make sure an exception is raised if the number of filenames does not + # match the number of replicas. + with pytest.raises(ValueError): + replica_system.saveReplicas(filenames[:-1]) + + # Make sure an exception is raised the file names don't all have the same + # extension. + bad_filenames = filenames.copy() + bad_filenames[0] = f"{tmpdir}/replica_000.rst7" + with pytest.raises(IOError): + replica_system.saveReplicas(bad_filenames) + + # Try to load the replicas back. + rs = ReplicaSystem.loadReplicas(replica_system, filenames) + + # Check that the number of replicas matches. + assert rs.nReplicas() == replica_system.nReplicas() + + # Only load back a subset of replicas. + rs = ReplicaSystem.loadReplicas(replica_system, filenames[:5]) + + # Check that the number of replicas matches. + assert rs.nReplicas() == 5 diff --git a/tests/_SireWrappers/test_replica_system.py b/tests/_SireWrappers/test_replica_system.py new file mode 100644 index 00000000..c85658c8 --- /dev/null +++ b/tests/_SireWrappers/test_replica_system.py @@ -0,0 +1,126 @@ +import os +import pytest +import tempfile + +from BioSimSpace._SireWrappers import System, ReplicaSystem + + +@pytest.fixture(scope="module") +def replica_system(system): + """A replica system for testing.""" + return ReplicaSystem(system, num_replicas=10) + + +@pytest.fixture(scope="module") +def perturbable_replica_system(perturbable_system): + """A perturbable replica system for testing.""" + return ReplicaSystem(perturbable_system, num_replicas=10) + + +@pytest.mark.parametrize("rs", ["replica_system", "perturbable_replica_system"]) +def test_num_replicas(rs, request): + """Test the number of replicas in the replica system.""" + replica_system = request.getfixturevalue(rs) + assert replica_system.nReplicas() == 10 + + +@pytest.mark.parametrize("rs", ["replica_system", "perturbable_replica_system"]) +def test_get_replica(rs, request): + """Test retrieving a replica system.""" + replica_system = request.getfixturevalue(rs) + + replica = replica_system.getReplica(0) + assert isinstance(replica, System) + + # Make sure negative indexing works. + replica_neg = replica_system.getReplica(-1) + assert isinstance(replica_neg, System) + + # Make sure __getitem__ works. + replica_item = replica_system[-1] + + # Make sure bounds checking works. + with pytest.raises(IndexError): + replica_system.getReplica(10) + with pytest.raises(IndexError): + replica_system.getReplica(-11) + + +@pytest.mark.parametrize("rs", ["replica_system", "perturbable_replica_system"]) +def test_stream(rs, request): + """Test streaming the replica system to a file.""" + replica_system = request.getfixturevalue(rs) + + with tempfile.TemporaryDirectory() as tmpdir: + stream, trajectory = replica_system.save(f"{tmpdir}/replica_system") + + # Check that files were created. + assert os.path.exists(f"{tmpdir}/replica_system.bss") + assert os.path.exists(f"{tmpdir}/replica_system.dcd") + + # Check that we can load the replica system back. + rs = ReplicaSystem.load( + f"{tmpdir}/replica_system.bss", f"{tmpdir}/replica_system.dcd" + ) + + # Check that the number of replicas matches. + assert rs.nReplicas() == replica_system.nReplicas() + + # Make sure we can stream using XTC trajectory format. + stream_xtc, trajectory_xtc = replica_system.save( + f"{tmpdir}/replica_system_xtc", traj_format="xtc" + ) + + assert os.path.exists(f"{tmpdir}/replica_system_xtc.bss") + assert os.path.exists(f"{tmpdir}/replica_system_xtc.xtc") + + # Check that we can load the replica system back. + rs_xtc = ReplicaSystem.load( + f"{tmpdir}/replica_system_xtc.bss", f"{tmpdir}/replica_system_xtc.xtc" + ) + + # Check that the number of replicas matches. + assert rs_xtc.nReplicas() == replica_system.nReplicas() + + +@pytest.mark.parametrize("rs", ["replica_system", "perturbable_replica_system"]) +def test_save_load_replicas(rs, request): + """Test saving/loading individual replicas to/from files.""" + replica_system = request.getfixturevalue(rs) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create the list of filenames, one for each replica. + filenames = [ + f"{tmpdir}/replica_{i:03d}.gro" for i in range(replica_system.nReplicas()) + ] + + # Save the replicas. + replica_system.saveReplicas(filenames) + + # Check that files were created. + for filename in filenames: + assert os.path.exists(filename) + + # Make sure an exception is raised if the number of filenames does not + # match the number of replicas. + with pytest.raises(ValueError): + replica_system.saveReplicas(filenames[:-1]) + + # Make sure an exception is raised the file names don't all have the same + # extension. + bad_filenames = filenames.copy() + bad_filenames[0] = f"{tmpdir}/replica_000.rst7" + with pytest.raises(IOError): + replica_system.saveReplicas(bad_filenames) + + # Try to load the replicas back. + rs = ReplicaSystem.loadReplicas(replica_system, filenames) + + # Check that the number of replicas matches. + assert rs.nReplicas() == replica_system.nReplicas() + + # Only load back a subset of replicas. + rs = ReplicaSystem.loadReplicas(replica_system, filenames[:5]) + + # Check that the number of replicas matches. + assert rs.nReplicas() == 5 diff --git a/tests/conftest.py b/tests/conftest.py index 15ad5563..76223f9d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -69,7 +69,7 @@ def system(): return BSS.IO.readMolecules(["tests/input/ala.top", "tests/input/ala.crd"]) -@pytest.fixture(scope="module") +@pytest.fixture(scope="session") def perturbable_system(): """A vacuum perturbable system.""" return BSS.IO.readPerturbableSystem( From 62bc1c798becc81fa0656032eeb98fe83290aa88 Mon Sep 17 00:00:00 2001 From: Lester Hedges Date: Thu, 20 Nov 2025 09:50:08 +0000 Subject: [PATCH 2/6] Handle squashed AMBER FEP trajectories. --- .../_SireWrappers/_replica_system.py | 153 ++++++++++++------ .../_SireWrappers/_replica_system.py | 153 ++++++++++++------ .../_SireWrappers/test_replica_system.py | 25 +-- tests/_SireWrappers/test_replica_system.py | 25 +-- 4 files changed, 230 insertions(+), 126 deletions(-) diff --git a/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py b/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py index ac8b7610..1561cf46 100644 --- a/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py +++ b/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py @@ -36,7 +36,9 @@ class ReplicaSystem: Here a system has a single topology, but multiple coordinate sets (replicas). """ - def __init__(self, system, trajectory=None, num_replicas=None): + def __init__( + self, system, trajectory=None, num_replicas=None, is_squashed=None, **kwargs + ): """ Constructor. @@ -54,6 +56,13 @@ def __init__(self, system, trajectory=None, num_replicas=None): coordinate set in `system` will be duplicated this many times. This is only used if the system does not already contain multiple frames or a trajectory is not provided. + + is_squashed : bool, optional + Whether the trajectory is squashed (i.e. atoms for both the reference + and perturbed states are stored in a single frame). This will be true + for AMBER alchemical trajectories. Default is None, which will attempt to + automatically determine this based on the system properties and trajectory + format. """ from sire.mol._trajectory import TrajectoryIterator as _TrajectoryIterator @@ -88,6 +97,29 @@ def __init__(self, system, trajectory=None, num_replicas=None): except: self._is_perturbable = False + if is_squashed is not None: + if not isinstance(is_squashed, bool): + raise TypeError("'is_squashed' must be of type 'bool'.") + self._is_squashed = is_squashed + + # If this is a perturbable system and the trajectory is squashed, then + # we need to convert the system to squashed format. + if self._is_perturbable and self._is_squashed: + from ..Align._squash import _squash + + self._explicit_dummies = kwargs.get("explicit_dummies", False) + if not isinstance(self._explicit_dummies, bool): + self._explicit_dummies = False + + squashed_system, self._mapping = _squash( + _System(self._sire_object), explicit_dummies=self._explicit_dummies + ) + self._squashed_system = squashed_system._sire_object + else: + self._explicit_dummies = False + self._squashed_system = None + self._mapping = None + # If a trajectory is provided, make sure the file exists. if trajectory is not None: if self._new_sire_object.num_frames() > 1: @@ -111,8 +143,12 @@ def __init__(self, system, trajectory=None, num_replicas=None): # Get the extension of the file so that we can guess the format. _, ext = _os.path.splitext(trajectory) - # Use a GROMACS topology for XTC files. - if self._is_perturbable or ext == ".xtc": + # This is an AMBER DCD file, so it will be in squashed format. + if self._is_perturbable and ext == ".dcd": + self._is_squashed = True + tmp_top = _NamedTemporaryFile(delete=False, suffix=".prm7") + # Use a GROMACS topology for XTC files and non-AMBER perturbable systems. + elif self._is_perturbable or ext == ".xtc": tmp_top = _NamedTemporaryFile(delete=False, suffix=".top") # For now, use AMBER PRM7 for all other formats. else: @@ -122,7 +158,14 @@ def __init__(self, system, trajectory=None, num_replicas=None): try: from sire import save as _save - _save(self._new_sire_object, tmp_top.name, silent=True) + if self._is_perturbable and self._is_squashed: + _save( + _NewSireSystem(self._squashed_system), + tmp_top.name, + silent=True, + ) + else: + _save(self._new_sire_object, tmp_top.name, silent=True) except Exception as e: tmp_top.close() _os.unlink(tmp_top.name) @@ -132,7 +175,7 @@ def __init__(self, system, trajectory=None, num_replicas=None): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None # Now try to load the trajectory. try: @@ -145,7 +188,7 @@ def __init__(self, system, trajectory=None, num_replicas=None): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None # Update the internal trajectory. self._trajectory = sire_system.trajectory() @@ -174,7 +217,12 @@ def __init__(self, system, trajectory=None, num_replicas=None): try: # Write the first file only. tmp_file = _os.path.join(tmp_dir, f"replica_0000.gro") - _save(self._new_sire_object, tmp_file, silent=True) + if self._is_perturbable and self._is_squashed: + _save( + _NewSireSystem(self._squashed_system), tmp_file, silent=True + ) + else: + _save(self._new_sire_object, tmp_file, silent=True) filenames.append(tmp_file) # Copy the first file for the remaining replicas. @@ -188,9 +236,11 @@ def __init__(self, system, trajectory=None, num_replicas=None): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None - if self._is_perturbable: + if self._is_perturbable and self._is_squashed: + top_ext = "prm7" + elif self._is_perturbable: top_ext = "top" else: top_ext = "prm7" @@ -199,14 +249,21 @@ def __init__(self, system, trajectory=None, num_replicas=None): try: tmp_top = _os.path.join(tmp_dir, f"topology.{top_ext}") filenames.append(tmp_top) - _save(self._new_sire_object, tmp_top, silent=True) + if self._is_perturbable and self._is_squashed: + _save( + _NewSireSystem(self._squashed_system), + tmp_top, + silent=True, + ) + else: + _save(self._new_sire_object, tmp_top, silent=True) except Exception as e: msg = "Failed to write temporary topology file for replica duplication." if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None # Now load them all back in as a single system with multiple frames. try: @@ -217,7 +274,7 @@ def __init__(self, system, trajectory=None, num_replicas=None): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None # Update the internal trajectory. self._trajectory = sire_system.trajectory() @@ -271,7 +328,7 @@ def nReplicas(self): return self._trajectory.num_frames() - def save(self, filename, traj_format="dcd", save_velocities=False): + def save(self, filename, save_velocities=False): """ Save the replica system to a stream and trajectory file. @@ -281,10 +338,6 @@ def save(self, filename, traj_format="dcd", save_velocities=False): filename : str The base filename. - traj_format : str, optional - The format to save the trajectory in. Default is 'dcd'. - Options are "dcd" or "xtc". - save_velocities : bool, optional Whether to save velocities along with coordinates. Default is False. @@ -305,21 +358,17 @@ def save(self, filename, traj_format="dcd", save_velocities=False): if not isinstance(filename, str): raise TypeError("'filename' must be of type 'str'.") - # Validate the trajectory format. - valid_formats = ["dcd", "xtc"] - - if not isinstance(traj_format, str): - raise TypeError("'traj_format' must be of type 'str'.") - - traj_format = traj_format.lower().replace(" ", "") - if traj_format not in valid_formats: - raise ValueError( - f"'traj_format' must be one of: {', '.join(valid_formats)}" - ) - if not isinstance(save_velocities, bool): raise TypeError("'save_velocities' must be of type 'bool'.") + # Work out the trajectory format. + if self._is_perturbable and self._is_squashed: + traj_format = "dcd" + elif self._is_perturbable: + traj_format = "xtc" + else: + traj_format = "dcd" + # Save the trajectory first. try: traj_filename = f"{filename}.{traj_format}" @@ -335,7 +384,7 @@ def save(self, filename, traj_format="dcd", save_velocities=False): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None # Now clone the new Sire system so that we can remove the trajectory. system = self._new_sire_object.clone() @@ -351,7 +400,7 @@ def save(self, filename, traj_format="dcd", save_velocities=False): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None return stream_filename, traj_filename @@ -393,7 +442,7 @@ def load(stream, trajectory): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None return ReplicaSystem(sire_system, trajectory=trajectory) @@ -452,18 +501,32 @@ def getReplica(self, index, is_lambda1=False, property_map={}): # Get the specific frame. frame = self._trajectory[index].current() - from sire.io import get_coords_array as _get_coords_array - from sire.legacy.IO import setCoordinates as _setCoordinates + if self._is_perturbable and self._is_squashed: + from ..Align._squash import _unsquash + from ._system import System as _System - # Copy the frame coordinates into the system. - frame = _setCoordinates( - replica_system._system, - _get_coords_array(frame).tolist(), - is_lambda1=is_lambda1, - map=property_map, - ) + frame = _unsquash( + _System(self._sire_object), + _System(frame._system), + self._mapping, + explicit_dummies=self._explicit_dummies, + ) + + else: + from sire.io import get_coords_array as _get_coords_array + from sire.legacy.IO import setCoordinates as _setCoordinates + + # Copy the frame coordinates into the system. + frame = _setCoordinates( + replica_system._system, + _get_coords_array(frame).tolist(), + is_lambda1=is_lambda1, + map=property_map, + ) + + frame = _System(frame) - return _System(frame) + return frame def saveReplicas(self, filenames, save_velocities=False, is_lambda1=False): """ @@ -507,7 +570,7 @@ def saveReplicas(self, filenames, save_velocities=False, is_lambda1=False): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None @staticmethod def loadReplicas(replica_system, filenames): @@ -569,7 +632,7 @@ def loadReplicas(replica_system, filenames): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None # Prepend the temporary topology file to the list of filenames. filenames = [tmp_top.name] + filenames @@ -583,7 +646,7 @@ def loadReplicas(replica_system, filenames): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None # Clean up the temporary topology file. tmp_top.close() diff --git a/python/BioSimSpace/_SireWrappers/_replica_system.py b/python/BioSimSpace/_SireWrappers/_replica_system.py index ac8b7610..1561cf46 100644 --- a/python/BioSimSpace/_SireWrappers/_replica_system.py +++ b/python/BioSimSpace/_SireWrappers/_replica_system.py @@ -36,7 +36,9 @@ class ReplicaSystem: Here a system has a single topology, but multiple coordinate sets (replicas). """ - def __init__(self, system, trajectory=None, num_replicas=None): + def __init__( + self, system, trajectory=None, num_replicas=None, is_squashed=None, **kwargs + ): """ Constructor. @@ -54,6 +56,13 @@ def __init__(self, system, trajectory=None, num_replicas=None): coordinate set in `system` will be duplicated this many times. This is only used if the system does not already contain multiple frames or a trajectory is not provided. + + is_squashed : bool, optional + Whether the trajectory is squashed (i.e. atoms for both the reference + and perturbed states are stored in a single frame). This will be true + for AMBER alchemical trajectories. Default is None, which will attempt to + automatically determine this based on the system properties and trajectory + format. """ from sire.mol._trajectory import TrajectoryIterator as _TrajectoryIterator @@ -88,6 +97,29 @@ def __init__(self, system, trajectory=None, num_replicas=None): except: self._is_perturbable = False + if is_squashed is not None: + if not isinstance(is_squashed, bool): + raise TypeError("'is_squashed' must be of type 'bool'.") + self._is_squashed = is_squashed + + # If this is a perturbable system and the trajectory is squashed, then + # we need to convert the system to squashed format. + if self._is_perturbable and self._is_squashed: + from ..Align._squash import _squash + + self._explicit_dummies = kwargs.get("explicit_dummies", False) + if not isinstance(self._explicit_dummies, bool): + self._explicit_dummies = False + + squashed_system, self._mapping = _squash( + _System(self._sire_object), explicit_dummies=self._explicit_dummies + ) + self._squashed_system = squashed_system._sire_object + else: + self._explicit_dummies = False + self._squashed_system = None + self._mapping = None + # If a trajectory is provided, make sure the file exists. if trajectory is not None: if self._new_sire_object.num_frames() > 1: @@ -111,8 +143,12 @@ def __init__(self, system, trajectory=None, num_replicas=None): # Get the extension of the file so that we can guess the format. _, ext = _os.path.splitext(trajectory) - # Use a GROMACS topology for XTC files. - if self._is_perturbable or ext == ".xtc": + # This is an AMBER DCD file, so it will be in squashed format. + if self._is_perturbable and ext == ".dcd": + self._is_squashed = True + tmp_top = _NamedTemporaryFile(delete=False, suffix=".prm7") + # Use a GROMACS topology for XTC files and non-AMBER perturbable systems. + elif self._is_perturbable or ext == ".xtc": tmp_top = _NamedTemporaryFile(delete=False, suffix=".top") # For now, use AMBER PRM7 for all other formats. else: @@ -122,7 +158,14 @@ def __init__(self, system, trajectory=None, num_replicas=None): try: from sire import save as _save - _save(self._new_sire_object, tmp_top.name, silent=True) + if self._is_perturbable and self._is_squashed: + _save( + _NewSireSystem(self._squashed_system), + tmp_top.name, + silent=True, + ) + else: + _save(self._new_sire_object, tmp_top.name, silent=True) except Exception as e: tmp_top.close() _os.unlink(tmp_top.name) @@ -132,7 +175,7 @@ def __init__(self, system, trajectory=None, num_replicas=None): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None # Now try to load the trajectory. try: @@ -145,7 +188,7 @@ def __init__(self, system, trajectory=None, num_replicas=None): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None # Update the internal trajectory. self._trajectory = sire_system.trajectory() @@ -174,7 +217,12 @@ def __init__(self, system, trajectory=None, num_replicas=None): try: # Write the first file only. tmp_file = _os.path.join(tmp_dir, f"replica_0000.gro") - _save(self._new_sire_object, tmp_file, silent=True) + if self._is_perturbable and self._is_squashed: + _save( + _NewSireSystem(self._squashed_system), tmp_file, silent=True + ) + else: + _save(self._new_sire_object, tmp_file, silent=True) filenames.append(tmp_file) # Copy the first file for the remaining replicas. @@ -188,9 +236,11 @@ def __init__(self, system, trajectory=None, num_replicas=None): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None - if self._is_perturbable: + if self._is_perturbable and self._is_squashed: + top_ext = "prm7" + elif self._is_perturbable: top_ext = "top" else: top_ext = "prm7" @@ -199,14 +249,21 @@ def __init__(self, system, trajectory=None, num_replicas=None): try: tmp_top = _os.path.join(tmp_dir, f"topology.{top_ext}") filenames.append(tmp_top) - _save(self._new_sire_object, tmp_top, silent=True) + if self._is_perturbable and self._is_squashed: + _save( + _NewSireSystem(self._squashed_system), + tmp_top, + silent=True, + ) + else: + _save(self._new_sire_object, tmp_top, silent=True) except Exception as e: msg = "Failed to write temporary topology file for replica duplication." if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None # Now load them all back in as a single system with multiple frames. try: @@ -217,7 +274,7 @@ def __init__(self, system, trajectory=None, num_replicas=None): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None # Update the internal trajectory. self._trajectory = sire_system.trajectory() @@ -271,7 +328,7 @@ def nReplicas(self): return self._trajectory.num_frames() - def save(self, filename, traj_format="dcd", save_velocities=False): + def save(self, filename, save_velocities=False): """ Save the replica system to a stream and trajectory file. @@ -281,10 +338,6 @@ def save(self, filename, traj_format="dcd", save_velocities=False): filename : str The base filename. - traj_format : str, optional - The format to save the trajectory in. Default is 'dcd'. - Options are "dcd" or "xtc". - save_velocities : bool, optional Whether to save velocities along with coordinates. Default is False. @@ -305,21 +358,17 @@ def save(self, filename, traj_format="dcd", save_velocities=False): if not isinstance(filename, str): raise TypeError("'filename' must be of type 'str'.") - # Validate the trajectory format. - valid_formats = ["dcd", "xtc"] - - if not isinstance(traj_format, str): - raise TypeError("'traj_format' must be of type 'str'.") - - traj_format = traj_format.lower().replace(" ", "") - if traj_format not in valid_formats: - raise ValueError( - f"'traj_format' must be one of: {', '.join(valid_formats)}" - ) - if not isinstance(save_velocities, bool): raise TypeError("'save_velocities' must be of type 'bool'.") + # Work out the trajectory format. + if self._is_perturbable and self._is_squashed: + traj_format = "dcd" + elif self._is_perturbable: + traj_format = "xtc" + else: + traj_format = "dcd" + # Save the trajectory first. try: traj_filename = f"{filename}.{traj_format}" @@ -335,7 +384,7 @@ def save(self, filename, traj_format="dcd", save_velocities=False): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None # Now clone the new Sire system so that we can remove the trajectory. system = self._new_sire_object.clone() @@ -351,7 +400,7 @@ def save(self, filename, traj_format="dcd", save_velocities=False): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None return stream_filename, traj_filename @@ -393,7 +442,7 @@ def load(stream, trajectory): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None return ReplicaSystem(sire_system, trajectory=trajectory) @@ -452,18 +501,32 @@ def getReplica(self, index, is_lambda1=False, property_map={}): # Get the specific frame. frame = self._trajectory[index].current() - from sire.io import get_coords_array as _get_coords_array - from sire.legacy.IO import setCoordinates as _setCoordinates + if self._is_perturbable and self._is_squashed: + from ..Align._squash import _unsquash + from ._system import System as _System - # Copy the frame coordinates into the system. - frame = _setCoordinates( - replica_system._system, - _get_coords_array(frame).tolist(), - is_lambda1=is_lambda1, - map=property_map, - ) + frame = _unsquash( + _System(self._sire_object), + _System(frame._system), + self._mapping, + explicit_dummies=self._explicit_dummies, + ) + + else: + from sire.io import get_coords_array as _get_coords_array + from sire.legacy.IO import setCoordinates as _setCoordinates + + # Copy the frame coordinates into the system. + frame = _setCoordinates( + replica_system._system, + _get_coords_array(frame).tolist(), + is_lambda1=is_lambda1, + map=property_map, + ) + + frame = _System(frame) - return _System(frame) + return frame def saveReplicas(self, filenames, save_velocities=False, is_lambda1=False): """ @@ -507,7 +570,7 @@ def saveReplicas(self, filenames, save_velocities=False, is_lambda1=False): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None @staticmethod def loadReplicas(replica_system, filenames): @@ -569,7 +632,7 @@ def loadReplicas(replica_system, filenames): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None # Prepend the temporary topology file to the list of filenames. filenames = [tmp_top.name] + filenames @@ -583,7 +646,7 @@ def loadReplicas(replica_system, filenames): if _isVerbose(): raise IOError(msg) from e else: - raise IOError(msg) + raise IOError(msg) from None # Clean up the temporary topology file. tmp_top.close() diff --git a/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py b/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py index 02e37416..57d26efb 100644 --- a/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py +++ b/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py @@ -76,32 +76,21 @@ def test_stream(rs, request): # Check that files were created. assert os.path.exists(f"{tmpdir}/replica_system.bss") - assert os.path.exists(f"{tmpdir}/replica_system.dcd") + if replica_system._is_perturbable: + assert os.path.exists(f"{tmpdir}/replica_system.xtc") + traj_ext = "xtc" + else: + assert os.path.exists(f"{tmpdir}/replica_system.dcd") + traj_ext = "dcd" # Check that we can load the replica system back. rs = ReplicaSystem.load( - f"{tmpdir}/replica_system.bss", f"{tmpdir}/replica_system.dcd" + f"{tmpdir}/replica_system.bss", f"{tmpdir}/replica_system.{traj_ext}" ) # Check that the number of replicas matches. assert rs.nReplicas() == replica_system.nReplicas() - # Make sure we can stream using XTC trajectory format. - stream_xtc, trajectory_xtc = replica_system.save( - f"{tmpdir}/replica_system_xtc", traj_format="xtc" - ) - - assert os.path.exists(f"{tmpdir}/replica_system_xtc.bss") - assert os.path.exists(f"{tmpdir}/replica_system_xtc.xtc") - - # Check that we can load the replica system back. - rs_xtc = ReplicaSystem.load( - f"{tmpdir}/replica_system_xtc.bss", f"{tmpdir}/replica_system_xtc.xtc" - ) - - # Check that the number of replicas matches. - assert rs_xtc.nReplicas() == replica_system.nReplicas() - @pytest.mark.parametrize("rs", ["replica_system", "perturbable_replica_system"]) def test_save_load_replicas(rs, request): diff --git a/tests/_SireWrappers/test_replica_system.py b/tests/_SireWrappers/test_replica_system.py index c85658c8..c7927364 100644 --- a/tests/_SireWrappers/test_replica_system.py +++ b/tests/_SireWrappers/test_replica_system.py @@ -56,32 +56,21 @@ def test_stream(rs, request): # Check that files were created. assert os.path.exists(f"{tmpdir}/replica_system.bss") - assert os.path.exists(f"{tmpdir}/replica_system.dcd") + if replica_system._is_perturbable: + assert os.path.exists(f"{tmpdir}/replica_system.xtc") + traj_ext = "xtc" + else: + assert os.path.exists(f"{tmpdir}/replica_system.dcd") + traj_ext = "dcd" # Check that we can load the replica system back. rs = ReplicaSystem.load( - f"{tmpdir}/replica_system.bss", f"{tmpdir}/replica_system.dcd" + f"{tmpdir}/replica_system.bss", f"{tmpdir}/replica_system.{traj_ext}" ) # Check that the number of replicas matches. assert rs.nReplicas() == replica_system.nReplicas() - # Make sure we can stream using XTC trajectory format. - stream_xtc, trajectory_xtc = replica_system.save( - f"{tmpdir}/replica_system_xtc", traj_format="xtc" - ) - - assert os.path.exists(f"{tmpdir}/replica_system_xtc.bss") - assert os.path.exists(f"{tmpdir}/replica_system_xtc.xtc") - - # Check that we can load the replica system back. - rs_xtc = ReplicaSystem.load( - f"{tmpdir}/replica_system_xtc.bss", f"{tmpdir}/replica_system_xtc.xtc" - ) - - # Check that the number of replicas matches. - assert rs_xtc.nReplicas() == replica_system.nReplicas() - @pytest.mark.parametrize("rs", ["replica_system", "perturbable_replica_system"]) def test_save_load_replicas(rs, request): From b6f7296987350b28b57ef490c17c0bbafa2ef4e3 Mon Sep 17 00:00:00 2001 From: Lester Hedges Date: Thu, 20 Nov 2025 11:10:35 +0000 Subject: [PATCH 3/6] Add method to get all replicas as a list of System objects. --- .../_SireWrappers/_replica_system.py | 31 +++++++++++++++++++ .../_SireWrappers/_replica_system.py | 31 +++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py b/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py index 1561cf46..b37e86cb 100644 --- a/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py +++ b/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py @@ -572,6 +572,37 @@ def saveReplicas(self, filenames, save_velocities=False, is_lambda1=False): else: raise IOError(msg) from None + def getReplicas(self, is_lambda1=False, property_map={}): + """ + Get all replicas (coordinate sets) from the system. + + Parameters + ---------- + + is_lambda1 : bool, optional + Whether to return the systems at lambda = 1. Default is False. + + property_map : dict, optional + A dictionary that maps system "properties" to their user defined + values. This allows the user to refer to properties with their + own naming scheme, e.g. { "charge" : "my-charge" } + + Returns + ------- + + list of :class:`System ` + A list of the replicas as BioSimSpace System objects. + """ + + replicas = [] + for i in range(self.nReplicas()): + replica = self.getReplica( + i, is_lambda1=is_lambda1, property_map=property_map + ) + replicas.append(replica) + + return replicas + @staticmethod def loadReplicas(replica_system, filenames): """ diff --git a/python/BioSimSpace/_SireWrappers/_replica_system.py b/python/BioSimSpace/_SireWrappers/_replica_system.py index 1561cf46..b37e86cb 100644 --- a/python/BioSimSpace/_SireWrappers/_replica_system.py +++ b/python/BioSimSpace/_SireWrappers/_replica_system.py @@ -572,6 +572,37 @@ def saveReplicas(self, filenames, save_velocities=False, is_lambda1=False): else: raise IOError(msg) from None + def getReplicas(self, is_lambda1=False, property_map={}): + """ + Get all replicas (coordinate sets) from the system. + + Parameters + ---------- + + is_lambda1 : bool, optional + Whether to return the systems at lambda = 1. Default is False. + + property_map : dict, optional + A dictionary that maps system "properties" to their user defined + values. This allows the user to refer to properties with their + own naming scheme, e.g. { "charge" : "my-charge" } + + Returns + ------- + + list of :class:`System ` + A list of the replicas as BioSimSpace System objects. + """ + + replicas = [] + for i in range(self.nReplicas()): + replica = self.getReplica( + i, is_lambda1=is_lambda1, property_map=property_map + ) + replicas.append(replica) + + return replicas + @staticmethod def loadReplicas(replica_system, filenames): """ From d66853d6e3038678cd921f7b5ca7f60b5efd91ee Mon Sep 17 00:00:00 2001 From: Lester Hedges Date: Thu, 20 Nov 2025 11:52:37 +0000 Subject: [PATCH 4/6] Add unit test for squashed representation handling. --- .../_SireWrappers/_replica_system.py | 23 ++++++-- .../_SireWrappers/_replica_system.py | 23 ++++++-- .../_SireWrappers/test_replica_system.py | 59 +++++++++++++++---- tests/_SireWrappers/test_replica_system.py | 59 +++++++++++++++---- 4 files changed, 132 insertions(+), 32 deletions(-) diff --git a/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py b/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py index b37e86cb..c42a0a41 100644 --- a/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py +++ b/python/BioSimSpace/Sandpit/Exscientia/_SireWrappers/_replica_system.py @@ -102,21 +102,22 @@ def __init__( raise TypeError("'is_squashed' must be of type 'bool'.") self._is_squashed = is_squashed + # Check the kwargs to see whether explicit dummies should be used + # when generating squashed systems. + self._explicit_dummies = kwargs.get("explicit_dummies", False) + if not isinstance(self._explicit_dummies, bool): + self._explicit_dummies = False + # If this is a perturbable system and the trajectory is squashed, then # we need to convert the system to squashed format. if self._is_perturbable and self._is_squashed: from ..Align._squash import _squash - self._explicit_dummies = kwargs.get("explicit_dummies", False) - if not isinstance(self._explicit_dummies, bool): - self._explicit_dummies = False - squashed_system, self._mapping = _squash( _System(self._sire_object), explicit_dummies=self._explicit_dummies ) self._squashed_system = squashed_system._sire_object else: - self._explicit_dummies = False self._squashed_system = None self._mapping = None @@ -145,7 +146,17 @@ def __init__( # This is an AMBER DCD file, so it will be in squashed format. if self._is_perturbable and ext == ".dcd": - self._is_squashed = True + # If the user has not specified whether the trajectory is + # squashed, then we need to assume that it is. + if not self._is_squashed: + from ..Align._squash import _squash + + squashed_system, self._mapping = _squash( + _System(self._sire_object), + explicit_dummies=self._explicit_dummies, + ) + self._squashed_system = squashed_system._sire_object + self._is_squashed = True tmp_top = _NamedTemporaryFile(delete=False, suffix=".prm7") # Use a GROMACS topology for XTC files and non-AMBER perturbable systems. elif self._is_perturbable or ext == ".xtc": diff --git a/python/BioSimSpace/_SireWrappers/_replica_system.py b/python/BioSimSpace/_SireWrappers/_replica_system.py index b37e86cb..c42a0a41 100644 --- a/python/BioSimSpace/_SireWrappers/_replica_system.py +++ b/python/BioSimSpace/_SireWrappers/_replica_system.py @@ -102,21 +102,22 @@ def __init__( raise TypeError("'is_squashed' must be of type 'bool'.") self._is_squashed = is_squashed + # Check the kwargs to see whether explicit dummies should be used + # when generating squashed systems. + self._explicit_dummies = kwargs.get("explicit_dummies", False) + if not isinstance(self._explicit_dummies, bool): + self._explicit_dummies = False + # If this is a perturbable system and the trajectory is squashed, then # we need to convert the system to squashed format. if self._is_perturbable and self._is_squashed: from ..Align._squash import _squash - self._explicit_dummies = kwargs.get("explicit_dummies", False) - if not isinstance(self._explicit_dummies, bool): - self._explicit_dummies = False - squashed_system, self._mapping = _squash( _System(self._sire_object), explicit_dummies=self._explicit_dummies ) self._squashed_system = squashed_system._sire_object else: - self._explicit_dummies = False self._squashed_system = None self._mapping = None @@ -145,7 +146,17 @@ def __init__( # This is an AMBER DCD file, so it will be in squashed format. if self._is_perturbable and ext == ".dcd": - self._is_squashed = True + # If the user has not specified whether the trajectory is + # squashed, then we need to assume that it is. + if not self._is_squashed: + from ..Align._squash import _squash + + squashed_system, self._mapping = _squash( + _System(self._sire_object), + explicit_dummies=self._explicit_dummies, + ) + self._squashed_system = squashed_system._sire_object + self._is_squashed = True tmp_top = _NamedTemporaryFile(delete=False, suffix=".prm7") # Use a GROMACS topology for XTC files and non-AMBER perturbable systems. elif self._is_perturbable or ext == ".xtc": diff --git a/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py b/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py index 57d26efb..2a8d4ec5 100644 --- a/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py +++ b/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py @@ -37,6 +37,12 @@ def perturbable_replica_system(perturbable_system): return ReplicaSystem(perturbable_system, num_replicas=10) +@pytest.fixture(scope="module") +def squashed_perturbable_replica_system(perturbable_system): + """A squashed perturbable replica system for testing.""" + return ReplicaSystem(perturbable_system, num_replicas=10, is_squashed=True) + + @pytest.mark.parametrize("rs", ["replica_system", "perturbable_replica_system"]) def test_num_replicas(rs, request): """Test the number of replicas in the replica system.""" @@ -75,18 +81,11 @@ def test_stream(rs, request): stream, trajectory = replica_system.save(f"{tmpdir}/replica_system") # Check that files were created. - assert os.path.exists(f"{tmpdir}/replica_system.bss") - if replica_system._is_perturbable: - assert os.path.exists(f"{tmpdir}/replica_system.xtc") - traj_ext = "xtc" - else: - assert os.path.exists(f"{tmpdir}/replica_system.dcd") - traj_ext = "dcd" + assert os.path.exists(stream) + assert os.path.exists(trajectory) # Check that we can load the replica system back. - rs = ReplicaSystem.load( - f"{tmpdir}/replica_system.bss", f"{tmpdir}/replica_system.{traj_ext}" - ) + rs = ReplicaSystem.load(stream, trajectory) # Check that the number of replicas matches. assert rs.nReplicas() == replica_system.nReplicas() @@ -133,3 +132,43 @@ def test_save_load_replicas(rs, request): # Check that the number of replicas matches. assert rs.nReplicas() == 5 + + +def test_squashed_representation(squashed_perturbable_replica_system): + """Test that the internal squashed trajectory representation works.""" + # Check that the number of atoms in the internal system is less + # than the number of atoms in the internal trajectory. + assert ( + squashed_perturbable_replica_system._new_sire_object.num_atoms() + < squashed_perturbable_replica_system._trajectory[0].current().num_atoms() + ) + + # Check that we can retrieve a replica system. + replica = squashed_perturbable_replica_system.getReplica(0) + assert isinstance(replica, System) + + # Make sure that the replica has the correct number of atoms. + assert ( + replica.nAtoms() + == squashed_perturbable_replica_system._new_sire_object.num_atoms() + ) + + # Test saving and loading the squashed replica system. + with tempfile.TemporaryDirectory(delete=False) as tmpdir: + stream, trajectory = squashed_perturbable_replica_system.save( + f"{tmpdir}/squashed_replica_system" + ) + + # Check that files were created. + assert os.path.exists(stream) + assert os.path.exists(trajectory) + + # Check that we can load the replica system back. + rs = ReplicaSystem.load(stream, trajectory) + + # Check that the number of replicas matches. + assert rs.nReplicas() == squashed_perturbable_replica_system.nReplicas() + + # Make sure that the trajectory is still squashed. + assert rs._is_squashed is True + assert rs._new_sire_object.num_atoms() < rs._trajectory[0].current().num_atoms() diff --git a/tests/_SireWrappers/test_replica_system.py b/tests/_SireWrappers/test_replica_system.py index c7927364..6bc9c810 100644 --- a/tests/_SireWrappers/test_replica_system.py +++ b/tests/_SireWrappers/test_replica_system.py @@ -17,6 +17,12 @@ def perturbable_replica_system(perturbable_system): return ReplicaSystem(perturbable_system, num_replicas=10) +@pytest.fixture(scope="module") +def squashed_perturbable_replica_system(perturbable_system): + """A squashed perturbable replica system for testing.""" + return ReplicaSystem(perturbable_system, num_replicas=10, is_squashed=True) + + @pytest.mark.parametrize("rs", ["replica_system", "perturbable_replica_system"]) def test_num_replicas(rs, request): """Test the number of replicas in the replica system.""" @@ -55,18 +61,11 @@ def test_stream(rs, request): stream, trajectory = replica_system.save(f"{tmpdir}/replica_system") # Check that files were created. - assert os.path.exists(f"{tmpdir}/replica_system.bss") - if replica_system._is_perturbable: - assert os.path.exists(f"{tmpdir}/replica_system.xtc") - traj_ext = "xtc" - else: - assert os.path.exists(f"{tmpdir}/replica_system.dcd") - traj_ext = "dcd" + assert os.path.exists(stream) + assert os.path.exists(trajectory) # Check that we can load the replica system back. - rs = ReplicaSystem.load( - f"{tmpdir}/replica_system.bss", f"{tmpdir}/replica_system.{traj_ext}" - ) + rs = ReplicaSystem.load(stream, trajectory) # Check that the number of replicas matches. assert rs.nReplicas() == replica_system.nReplicas() @@ -113,3 +112,43 @@ def test_save_load_replicas(rs, request): # Check that the number of replicas matches. assert rs.nReplicas() == 5 + + +def test_squashed_representation(squashed_perturbable_replica_system): + """Test that the internal squashed trajectory representation works.""" + # Check that the number of atoms in the internal system is less + # than the number of atoms in the internal trajectory. + assert ( + squashed_perturbable_replica_system._new_sire_object.num_atoms() + < squashed_perturbable_replica_system._trajectory[0].current().num_atoms() + ) + + # Check that we can retrieve a replica system. + replica = squashed_perturbable_replica_system.getReplica(0) + assert isinstance(replica, System) + + # Make sure that the replica has the correct number of atoms. + assert ( + replica.nAtoms() + == squashed_perturbable_replica_system._new_sire_object.num_atoms() + ) + + # Test saving and loading the squashed replica system. + with tempfile.TemporaryDirectory(delete=False) as tmpdir: + stream, trajectory = squashed_perturbable_replica_system.save( + f"{tmpdir}/squashed_replica_system" + ) + + # Check that files were created. + assert os.path.exists(stream) + assert os.path.exists(trajectory) + + # Check that we can load the replica system back. + rs = ReplicaSystem.load(stream, trajectory) + + # Check that the number of replicas matches. + assert rs.nReplicas() == squashed_perturbable_replica_system.nReplicas() + + # Make sure that the trajectory is still squashed. + assert rs._is_squashed is True + assert rs._new_sire_object.num_atoms() < rs._trajectory[0].current().num_atoms() From 581013b9379d1654dfb227d132b9b8717efa3fa4 Mon Sep 17 00:00:00 2001 From: Lester Hedges Date: Fri, 21 Nov 2025 13:12:23 +0000 Subject: [PATCH 5/6] Remove debugging flag. --- tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py | 2 +- tests/_SireWrappers/test_replica_system.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py b/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py index 2a8d4ec5..f8ae6aa6 100644 --- a/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py +++ b/tests/Sandpit/Exscientia/_SireWrappers/test_replica_system.py @@ -154,7 +154,7 @@ def test_squashed_representation(squashed_perturbable_replica_system): ) # Test saving and loading the squashed replica system. - with tempfile.TemporaryDirectory(delete=False) as tmpdir: + with tempfile.TemporaryDirectory() as tmpdir: stream, trajectory = squashed_perturbable_replica_system.save( f"{tmpdir}/squashed_replica_system" ) diff --git a/tests/_SireWrappers/test_replica_system.py b/tests/_SireWrappers/test_replica_system.py index 6bc9c810..2bfa6030 100644 --- a/tests/_SireWrappers/test_replica_system.py +++ b/tests/_SireWrappers/test_replica_system.py @@ -134,7 +134,7 @@ def test_squashed_representation(squashed_perturbable_replica_system): ) # Test saving and loading the squashed replica system. - with tempfile.TemporaryDirectory(delete=False) as tmpdir: + with tempfile.TemporaryDirectory() as tmpdir: stream, trajectory = squashed_perturbable_replica_system.save( f"{tmpdir}/squashed_replica_system" ) From dccf1ae2642fbd7fd1189765b9925428603a6c41 Mon Sep 17 00:00:00 2001 From: Lester Hedges Date: Fri, 21 Nov 2025 13:14:47 +0000 Subject: [PATCH 6/6] Restore missing fixture. --- tests/IO/test_perturbable.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/IO/test_perturbable.py b/tests/IO/test_perturbable.py index d6bbaf2f..d33646a8 100644 --- a/tests/IO/test_perturbable.py +++ b/tests/IO/test_perturbable.py @@ -5,6 +5,17 @@ from tests.conftest import url +@pytest.fixture(scope="module") +def perturbable_molecule(): + """Re-use the same perturbable system for each test.""" + return BSS.IO.readPerturbableSystem( + f"{url}/perturbable_system0.prm7", + f"{url}/perturbable_system0.rst7", + f"{url}/perturbable_system1.prm7", + f"{url}/perturbable_system1.rst7", + ).getMolecules()[0] + + def test_connectivity(perturbable_molecule): """ Make sure there is a single connectivity property when the end states