1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

+- all `asyncmd.utils` methods now require an MDEngine (class) to dispatch to the correct engine submodule methods
- GmxEngine `apply_constraints` and `generate_velocities` methods: rename `wdir` argument to `workdir` to make it consistent with `prepare` and `prepare_from_files` (also add the `workdir` argument to the MDEngine ABC).

### Fixed
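To make the first changelog entry concrete, here is a minimal usage sketch of the new dispatch. The run directory and deffnm are placeholder assumptions, not taken from this PR:

import asyncio
from asyncmd import utils
from asyncmd.gromacs import GmxEngine

async def main():
    # asyncmd.utils methods now take the MDEngine (class) and use it to
    # dispatch to the matching engine submodule, here asyncmd.gromacs
    trajs = await utils.get_all_traj_parts(folder="rundir", deffnm="md",
                                           engine=GmxEngine)
    print(trajs)

asyncio.run(main())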
7 changes: 4 additions & 3 deletions docs/source/conf.py
@@ -82,9 +82,10 @@
{
"name": "PyPI",
"url": "https://pypi.org/project/asyncmd/",
-# download stats are broken? (August 2025)
-#"icon": "https://img.shields.io/pypi/dm/asyncmd",
-"icon": "https://img.shields.io/pypi/v/asyncmd",
+"icon": "https://img.shields.io/pypi/dm/asyncmd?label=pypi%20downloads",
+# keep the replacement icon for PyPi link, but commented out
+# (for the next time download stats are broken)
+#"icon": "https://img.shields.io/pypi/v/asyncmd",
"type": "url",
},
{
13 changes: 8 additions & 5 deletions src/asyncmd/config.py
@@ -36,7 +36,7 @@
TrajectoryFunctionValueCacheInH5PY as _TrajectoryFunctionValueCacheInH5PY,
)
# pylint: disable-next=unused-import
from .slurm import set_slurm_settings, set_all_slurm_settings

if typing.TYPE_CHECKING: # pragma: no cover
@@ -64,8 +64,8 @@
spawning hundreds of processes.
"""
# NOTE: I think we should use a conservative default, e.g. 0.25*cpu_count()
-# pylint: disable-next=global-variable-not-assigned
+# pylint: disable-next=global-statement
global _SEMAPHORES
if num is None:
if (logical_cpu_count := os.cpu_count()) is not None:
num = max(1, int(logical_cpu_count / 4))
@@ -107,8 +107,8 @@
# semaphores from non-async code, but sometimes use the sync subprocess.run
# and subprocess.check_call [which also need files/pipes to work])
# also maybe we need other open files like a storage :)
-# pylint: disable-next=global-variable-not-assigned
+# pylint: disable-next=global-statement
global _SEMAPHORES
rlim_soft = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
if num is None:
num = rlim_soft
@@ -157,8 +157,8 @@
The maximum number of simultaneous SLURM jobs for this invocation of
python/asyncmd. `None` means do not limit the maximum number of jobs.
"""
-# pylint: disable-next=global-variable-not-assigned
+# pylint: disable-next=global-statement
global _OPT_SEMAPHORES
if num is None:
_OPT_SEMAPHORES[_OPT_SEMAPHORES_KEYS.SLURM_MAX_JOB] = None
else:
@@ -195,8 +195,8 @@
ValueError
Raised if ``cache_type`` is not one of the allowed values.
"""
-# pylint: disable-next=global-variable-not-assigned
+# pylint: disable-next=global-statement
global _GLOBALS
allowed_values = ["h5py", "npz", "memory"]
if (cache_type := cache_type.lower()) not in allowed_values:
raise ValueError(f"Given cache type must be one of {allowed_values}."
@@ -252,8 +252,8 @@
clear_old_cache : bool, optional
Whether to clear the old/previously set cache, by default False.
"""
-# pylint: disable-next=global-variable-not-assigned
+# pylint: disable-next=global-statement
global _GLOBALS
if _GLOBALS.get(_GLOBALS_KEYS.TRAJECTORY_FUNCTION_CACHE_TYPE, "not_set") != "h5py":
# nothing to copy as h5py was not the old cache type
if h5py_group.file.mode == "r":
@@ -315,6 +315,9 @@
_deregister_h5py_cache_for_all_trajectories(h5py_group=h5py_group)
if _GLOBALS.get(_GLOBALS_KEYS.H5PY_CACHE, None) is h5py_group:
_GLOBALS[_GLOBALS_KEYS.H5PY_CACHE] = None
+logger.warning("Deregistered global writeable h5py cache. No TrajectoryFunction"
+" values will be cached until a new h5py cache has been registered."
+)
if h5py_group in _GLOBALS.get(_GLOBALS_KEYS.H5PY_CACHE_READ_ONLY_FALLBACKS, []):
_GLOBALS[_GLOBALS_KEYS.H5PY_CACHE_READ_ONLY_FALLBACKS].remove(h5py_group)

@@ -325,7 +328,7 @@
"""
print(f"Values controlling caching: {_GLOBALS}")
# pylint: disable-next=protected-access
sem_print = {key: sem._value
for key, sem in {**_SEMAPHORES, **_OPT_SEMAPHORES}.items()
if sem is not None}
print(f"Semaphores controlling resource usage: {sem_print}")
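All setters above share one pattern: swap the pylint waiver from global-variable-not-assigned to global-statement and mutate a module-level registry (_SEMAPHORES, _OPT_SEMAPHORES, or _GLOBALS). A hedged usage sketch; the setter names set_max_processes and set_slurm_max_jobs are assumed from the surrounding docstrings, they are not spelled out in this diff:

import asyncmd

# None picks the conservative default of max(1, cpu_count // 4) processes
asyncmd.config.set_max_processes(None)
# None means: do not limit the number of simultaneous SLURM jobs
asyncmd.config.set_slurm_max_jobs(None)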
32 changes: 16 additions & 16 deletions src/asyncmd/gromacs/mdengine.py
@@ -211,7 +211,7 @@
top_file=top_file,
)
self._engine_state = _GmxEngineState()
# TODO: store a hash/the file contents for gro, top, ndx to check against
# when we load from storage/restart? if we do this, do it in the property!
self.gro_file = gro_file
self.top_file = top_file
@@ -508,8 +508,8 @@
k=6,
)
)
-swdir = os.path.join(wdir, run_name)
-await aiofiles.os.mkdir(swdir)
+wdir = os.path.join(wdir, run_name)
+await aiofiles.os.mkdir(wdir)
constraints_mdp = copy.deepcopy(self.mdp)
constraints_mdp["continuation"] = "no" if constraints else "yes"
constraints_mdp["gen-vel"] = "yes" if generate_velocities else "no"
@@ -523,12 +523,12 @@
# make sure we have draw a new/different random number for gen-vel
constraints_mdp["gen-seed"] = -1
constraints_mdp["nsteps"] = 0
-await self._run_grompp(workdir=swdir, deffnm=run_name,
+await self._run_grompp(workdir=wdir, deffnm=run_name,
trr_in=conf_in.trajectory_files[0],
-tpr_out=os.path.join(swdir, f"{run_name}.tpr"),
+tpr_out=os.path.join(wdir, f"{run_name}.tpr"),
mdp_obj=constraints_mdp)
-cmd_str = self._mdrun_cmd(tpr=os.path.join(swdir, f"{run_name}.tpr"),
-workdir=swdir,
+cmd_str = self._mdrun_cmd(tpr=os.path.join(wdir, f"{run_name}.tpr"),
+workdir=wdir,
deffnm=run_name)
logger.debug("About to execute gmx mdrun command for constraints and"
"/or velocity generation: %s",
@@ -537,9 +537,9 @@
stdout = bytes()
await self._acquire_resources_gmx_mdrun()
mdrun_proc = await self._start_gmx_mdrun(
-cmd_str=cmd_str, workdir=swdir,
+cmd_str=cmd_str, workdir=wdir,
run_name=run_name,
# TODO: we hardcode that the 0step MD runs can not be longer than 15 min
# (but i think this should be fine for randomizing velocities and/or
# applying constraints?!)
walltime=0.25,
@@ -563,25 +563,25 @@
# the FrameExtractor (i.e. MDAnalysis) handle any potential conversions
engine_traj = Trajectory(
trajectory_files=os.path.join(
-swdir, f"{run_name}{self._num_suffix(1)}.trr"
+wdir, f"{run_name}{self._num_suffix(1)}.trr"
),
structure_file=conf_in.structure_file,
)
extractor = NoModificationFrameExtractor()
# Note: we use extract (and not extract_async) because otherwise
# it can happen in super-rare circumstances that the Trajectory
# we just instantiated is "replaced" by a Trajectory with the
-# same hash but a different filename/path, then the extraction
+# same hash but a different filename/path. If then in addition
+# this trajectory is removed before extracting, the extraction
# fails. If we dont await this can not happen since we do not
# give up control in between.
-out_traj = extractor.extract(outfile=conf_out_name,
-traj_in=engine_traj,
-idx=len(engine_traj) - 1,
-)
-return out_traj
+return extractor.extract(outfile=conf_out_name,
+traj_in=engine_traj,
+idx=len(engine_traj) - 1,
+)
finally:
-await self._cleanup_gmx_mdrun(workdir=swdir, run_name=run_name)
-shutil.rmtree(swdir) # remove the whole directory we used as wdir
+await self._cleanup_gmx_mdrun(workdir=wdir, run_name=run_name)
+shutil.rmtree(wdir) # remove the whole directory we used as wdir

async def prepare(self, starting_configuration: Trajectory | None | str,
workdir: str, deffnm: str) -> None:
@@ -965,7 +965,7 @@
cmd += f" -n {ndx_file}"
if trr_in is not None:
# input trr is optional
# TODO /NOTE: currently we do not pass '-time', i.e. we just use the
# gmx default frame selection: last frame from trr
trr_in = os.path.relpath(trr_in, start=workdir)
cmd += f" -t {trr_in}"
@@ -996,7 +996,7 @@
# however gromacs -deffnm is deprecated (and buggy),
# so we just make our own 'deffnm', i.e. we name all files the same
# except for the ending but do so explicitly
# TODO /FIXME: we dont specify the names for e.g. pull outputfiles,
# so they will have their default names and will collide
# when running multiple engines in the same folder!
cmd = f"{self.mdrun_executable} -noappend -s {tpr}"
@@ -1026,7 +1026,7 @@
# purposes) be used as a drop-in replacement. Therefore we only need to
# reimplement `_start_gmx_mdrun()`, `_acquire_resources_gmx_mdrun()` and
# `_cleanup_gmx_mdrun()` to have a working SlurmGmxEngine.
# TODO: use SLURM also for grompp?! (would it make stuff faster?)
# I (hejung) think probably not by much because we already use
# asyncios subprocess for grompp (i.e. do it asynchronous) and grompp
# will most likely not take much resources on the login (local) node
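The swdir/wdir change above is internal only; the user-visible counterpart is the keyword rename from the changelog. A hedged call sketch (engine and input Trajectory setup are assumed; the apply_constraints argument names match the test helper further below, generate_velocities is assumed analogous):

# inside an async function, with `engine` a prepared GmxEngine
# and `conf` a Trajectory holding the input frame:
constrained = await engine.apply_constraints(conf_in=conf,
                                             conf_out_name="constrained.trr",
                                             workdir="rundir")  # was wdir=
randomized = await engine.generate_velocities(conf_in=conf,
                                              conf_out_name="randvel.trr",
                                              workdir="rundir")  # was wdir=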
13 changes: 7 additions & 6 deletions src/asyncmd/gromacs/utils.py
@@ -18,6 +18,7 @@
The general variants of these functions can be found in asyncmd.utils.
"""
import os
+import math
import logging
import aiofiles.os

@@ -58,12 +59,12 @@ def get_value_from_mdp(k):
v = mdp[k]
except KeyError:
# not set, defaults to 0
-v = float("inf")
+v = math.inf
else:
-# need to check for 0 (== no output!) in case somone puts the
+# need to check for 0 (== no output!) in case someone puts the
# defaults (or reads an mdout.mdp where gmx lists all the defaults)
if not v:
-v = float("inf")
+v = math.inf
return v

if traj_type.upper() == "TRR":
@@ -75,15 +76,15 @@
vals = []
for k in keys:
vals += [get_value_from_mdp(k=k)]
-if (nstout := min(vals)) == float("inf"):
+if (nstout := min(vals)) == math.inf:
raise ValueError(f"The MDP you passed results in no {traj_type} "
+ "trajectory output.")
if traj_type.upper == "TRR":
# additional checks that nstvout and nstfout are multiples of nstxout
# (if they are defined)
additional_keys = ["nstvout", "nstfout"]
for k in additional_keys:
-if (v := get_value_from_mdp(k=k)) != float("inf"):
+if (v := get_value_from_mdp(k=k)) != math.inf:
if v % nstout:
logger.warning("%s trajectory output is not a multiple of "
"the nstxout frequency (%s=%d, nstxout=%d).",
@@ -136,7 +137,7 @@ async def get_all_file_parts(folder: str, deffnm: str, file_ending: str) -> "lis
deffnm : str
deffnm (prefix of filenames) used in the simulation.
file_ending : str
-File ending of the requested filetype (with or without preceeding ".").
+File ending of the requested filetype (with or without preceding ".").

Returns
-------
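The math.inf sentinel above treats both an unset key and an explicit 0 as "no output". A standalone sketch of that lookup pattern; the plain dict stands in for asyncmd's parsed MDP object:

import math

mdp = {"nstxout": 0, "nstxout-compressed": 5000}  # hypothetical MDP values

def get_value_from_mdp(k):
    try:
        v = mdp[k]
    except KeyError:
        return math.inf  # unset: gromacs defaults to 0, i.e. no output
    return v if v else math.inf  # an explicit 0 also means no output

# the smallest finite output interval wins; inf for all keys means
# the MDP produces no trajectory output at all
nstout = min(get_value_from_mdp(k) for k in ("nstxout", "nstxout-compressed"))
assert nstout == 5000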
4 changes: 2 additions & 2 deletions src/asyncmd/slurm/config.py
@@ -69,7 +69,7 @@ def set_all_slurm_settings(*, sinfo_executable: str = "sinfo",
List of nodes to exclude in job submissions, by default None, which
results in no excluded nodes.
"""
-# pylint: disable-next=global-variable-not-assigned
+# pylint: disable-next=global-statement
global SlurmProcess
SlurmProcess.slurm_cluster_mediator = SlurmClusterMediator(
sinfo_executable=sinfo_executable,
@@ -124,7 +124,7 @@ def set_slurm_settings(*, sinfo_executable: str | None = None,
List of nodes to exclude in job submissions, by default None, which
results in no excluded nodes.
"""
-# pylint: disable-next=global-variable-not-assigned
+# pylint: disable-next=global-statement
global SlurmProcess
# collect options for slurm cluster mediator
mediator_options: dict[str, str | int | list[str]] = {}
2 changes: 2 additions & 0 deletions src/asyncmd/trajectory/propagate.py
@@ -253,6 +253,7 @@
folder=workdir,
deffnm=deffnm,
file_ending=ending.lower(),
+engine=self.engine_cls,
)
# make sure we dont miss anything because we have different
# capitalization
@@ -261,12 +262,13 @@
folder=workdir,
deffnm=deffnm,
file_ending=ending.upper(),
+engine=self.engine_cls,
)
await asyncio.gather(*(aiofiles.os.unlink(f)
for f in parts_to_remove
)
)
# TODO: address the note below?
# NOTE: this is a bit hacky: we just try to remove the offset and
# lock files for every file we remove (since we do not know
# if the file we remove is a trajectory [and therefore
24 changes: 17 additions & 7 deletions src/asyncmd/trajectory/trajectory_cache.py
@@ -500,9 +500,13 @@ def __init__(self, traj_hash: int, traj_files: list[str]) -> None:
)
# setup (writeable) main cache if we have it
if writeable_h5py_cache is None:
-logger.warning("Initializing a Trajectory cache in h5py with only "
-"read-only h5py.Groups associated. Newly calculated "
-"function values will not be cached!")
+# This can spam the log, because it happens once per trajectory that is
+# read from a read-only storage, e.g. for analysis
+logger.debug("Initializing h5py Trajectory cache with only "
+"read-only h5py.Groups associated. Newly calculated "
+"function values will not be cached! (trajectory files=%s)",
+traj_files,
+)
self._main_cache = None
else:
self._main_cache = OneH5PYGroupTrajectoryFunctionValueCache(
@@ -542,10 +546,16 @@ def deregister_h5py_cache(self, h5py_cache: "h5py.File | h5py.Group") -> None:
"""
if self._main_cache is not None:
if self._main_cache.h5py_cache is h5py_cache:
-logger.warning(
-"Deregistering the writeable (main) cache (%s). "
-"Newly calculated function values will not be cached!",
-h5py_cache
+# This can spam the log, because it happens once per trajectory
+# in existence when closing/deregistering a h5py cache.
+# We warn once when deregistering the h5py cache (in central config.py)
+# and use debug here for now
+logger.debug(
+"Deregistering the writeable (main) cache (%s) for Trajectory "
+"consisting of files %s. "
+"Newly calculated function values will not be cached until "
+"a new h5py cache group is registered or the cache type changed!",
+h5py_cache, self._traj_files,
)
self._main_cache = None
# found it so it can not be a fallback_cache, so get out of here
47 changes: 37 additions & 10 deletions src/asyncmd/utils.py
@@ -20,6 +20,8 @@
It also includes various functions to retrieve or ensure important parameters from
MDConfig/MDEngine combinations, such as nstout_from_mdconfig and ensure_mdconfig_options.
"""
+import logging
+
from .mdengine import MDEngine
from .mdconfig import MDConfig
from .trajectory.trajectory import Trajectory
@@ -28,7 +30,11 @@
from .gromacs import mdconfig as gmx_config


-async def get_all_traj_parts(folder: str, deffnm: str, engine: MDEngine) -> list[Trajectory]:
+logger = logging.getLogger(__name__)
+
+
+async def get_all_traj_parts(folder: str, deffnm: str, engine: MDEngine | type[MDEngine],
+) -> list[Trajectory]:
"""
List all trajectories in folder by given engine class with given deffnm.

@@ -37,10 +43,12 @@
folder : str
Absolute or relative path to a folder.
deffnm : str
-deffnm used by the engines simulation run from which we want the trajs.
-engine : MDEngine
-The engine that produced the trajectories
-(or one from the same class and with similar init args)
+deffnm used by the engine's simulation run from which we want the trajectories.
+engine : MDEngine | type[MDEngine]
+The engine that produced the trajectories (or one from the same class
+and with similar init args). Note that it is also possible to pass an
+uninitialized engine class, but then the default trajectory output type
+will be returned.

Returns
-------
@@ -52,7 +60,17 @@
ValueError
Raised when the engine class is unknown.
"""
-if isinstance(engine, (gmx_engine.GmxEngine, gmx_engine.SlurmGmxEngine)):
+# test for uninitialized engine classes, we warn but return the default traj type
+if isinstance(engine, type) and issubclass(engine, MDEngine):
+logger.warning("Engine %s is not initialized, i.e. it is an engine class. "
+"Returning the default output trajectory type for this "
+"engine class.", engine)
+if (
+isinstance(engine, (gmx_engine.GmxEngine, gmx_engine.SlurmGmxEngine))
+or (isinstance(engine, type) # check that it is a type otherwise issubclass might not work
+and issubclass(engine, (gmx_engine.GmxEngine, gmx_engine.SlurmGmxEngine))
+)
+):
return await gmx_utils.get_all_traj_parts(folder=folder, deffnm=deffnm,
traj_type=engine.output_traj_type,
)
Expand All @@ -61,6 +79,7 @@ async def get_all_traj_parts(folder: str, deffnm: str, engine: MDEngine) -> list


async def get_all_file_parts(folder: str, deffnm: str, file_ending: str,
+engine: MDEngine | type[MDEngine],
) -> list[str]:
"""
Find and return all files with given ending produced by a `MDEngine`.
@@ -75,16 +94,24 @@
deffnm (prefix of filenames) used in the simulation.
file_ending : str
File ending of the requested filetype (with or without preceding ".").
+engine : MDEngine | type[MDEngine]
+The engine or engine class that produced the file parts.

Returns
-------
list[str]
Ordered list of filepaths for files with given ending.
"""
-# TODO: we just use the function from the gromacs engines for now, i.e. we
-# assume that the filename scheme will be the same for other engines
-return await gmx_utils.get_all_file_parts(folder=folder, deffnm=deffnm,
-file_ending=file_ending)
+if (
+isinstance(engine, (gmx_engine.GmxEngine, gmx_engine.SlurmGmxEngine))
+or (isinstance(engine, type) # check that it is a type otherwise issubclass might not work
+and issubclass(engine, (gmx_engine.GmxEngine, gmx_engine.SlurmGmxEngine))
+)
+):
+return await gmx_utils.get_all_file_parts(folder=folder, deffnm=deffnm,
+file_ending=file_ending)
+raise ValueError(f"Engine {engine} is not a known MDEngine (class)."
++ " Maybe someone just forgot to add the function?")


def nstout_from_mdconfig(mdconfig: MDConfig, output_traj_type: str) -> int:
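A short sketch contrasting the two engine arguments the dispatch now accepts; the folder, deffnm, and file ending are placeholders:

from asyncmd import utils
from asyncmd.gromacs import GmxEngine

async def collect_parts(engine_or_cls):
    # both an engine instance and an uninitialized engine class are
    # accepted now; get_all_traj_parts additionally warns for a class
    # and falls back to the class default output trajectory type
    return await utils.get_all_file_parts(folder="rundir", deffnm="md",
                                          file_ending="trr",
                                          engine=engine_or_cls)

# e.g. await collect_parts(GmxEngine) or await collect_parts(engine_instance)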
5 changes: 3 additions & 2 deletions tests/helper_classes.py
@@ -28,13 +28,14 @@ class NoOpMDEngine(MDEngine):
current_trajectory = None
output_traj_type = "TEST"
steps_done = 0
-async def apply_constraints(self, conf_in: Trajectory, conf_out_name: str) -> Trajectory:
+async def apply_constraints(self, conf_in: Trajectory, conf_out_name: str,
+*, workdir: str = ".") -> Trajectory:
pass
async def prepare(self, starting_configuration: Trajectory, workdir: str, deffnm: str) -> None:
pass
async def prepare_from_files(self, workdir: str, deffnm: str) -> None:
pass
-async def run_walltime(self, walltime: float) -> Trajectory:
+async def run_walltime(self, walltime: float, max_steps: int | None = None) -> Trajectory:
pass
async def run_steps(self, nsteps: int, steps_per_part: bool = False) -> Trajectory:
pass
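A hedged sketch of how such a stub engine is typically exercised in a test; the pytest-asyncio marker and the zero-argument construction are assumptions, not shown in this PR:

import pytest

from helper_classes import NoOpMDEngine  # import path as used within tests/

@pytest.mark.asyncio
async def test_noop_engine_satisfies_abc():
    engine = NoOpMDEngine()
    # the stubbed coroutines fulfil the MDEngine ABC but do nothing
    assert await engine.prepare_from_files(workdir=".", deffnm="md") is None
    assert engine.output_traj_type == "TEST"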