Parallel torsiondrive optimisations #277

Merged
merged 7 commits on Aug 4, 2023

openff/bespokefit/cli/worker.py (5 changes: 4 additions & 1 deletion)

@@ -45,10 +45,13 @@ def worker_cli(worker_type: str):

     settings = current_settings()

+    worker_kwargs = {}
+
     if worker_type == "fragmenter":
         worker_settings = settings.fragmenter_settings
     elif worker_type == "qc-compute":
         worker_settings = settings.qc_compute_settings
+        worker_kwargs["pool"] = "solo"
     else:
         worker_settings = settings.optimizer_settings

@@ -59,4 +62,4 @@ def worker_cli(worker_type: str):
         worker_status.stop()
     console.print(f"[[green]✓[/green]] bespoke {worker_type} worker launched")

-    spawn_worker(worker_app, concurrency=1, asynchronous=False)
+    spawn_worker(worker_app, concurrency=1, asynchronous=False, **worker_kwargs)
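
A note on the "solo" pool (illustration only, not part of the diff): Celery's default prefork pool runs each task in a daemonic child process, and daemonic processes are not allowed to start children of their own, so any multiprocessing.Pool opened inside a task would fail. Forcing the qc-compute worker onto the solo pool keeps the task in the main process, which is what allows the torsiondrive procedure to fan optimisations out. A minimal stdlib sketch of the failure mode:

from multiprocessing import Pool, Process, current_process


def task():
    # stand-in for a Celery task that wants to fan work out to a local pool
    name, daemon = current_process().name, current_process().daemon
    try:
        with Pool(processes=2) as pool:
            print(f"{name} (daemon={daemon}): {pool.map(abs, [-1, -2])}")
    except Exception as exc:
        # daemonic (prefork-style) children fail with
        # "daemonic processes are not allowed to have children"
        print(f"{name} (daemon={daemon}): nested pool failed: {exc}")


if __name__ == "__main__":
    task()  # main process, as under the solo pool: the nested pool works

    child = Process(target=task, daemon=True)  # mimics a prefork worker child
    child.start()
    child.join()
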
openff/bespokefit/executor/services/qcgenerator/qcengine.py (10 changes: 8 additions & 2 deletions)

@@ -1,4 +1,4 @@
-from multiprocessing import Pool
+from multiprocessing import current_process, get_context
 from typing import Dict, List, Union

 from qcelemental.models import FailedOperation

@@ -43,6 +43,10 @@ def _spawn_optimizations(
     settings = current_settings()
     program = input_model.optimization_spec.keywords["program"]
    opts_per_worker = settings.BEFLOW_QC_COMPUTE_WORKER_N_TASKS
+    # we can only split the tasks if the celery worker is the main process so if not set back to 1
+    if current_process().name != "MainProcess":
+        opts_per_worker = 1
+
     if program == "psi4" and opts_per_worker == "auto":
         # we recommend 8 cores per worker for psi4 from our qcfractal jobs
         opts_per_worker = max([int(config.ncores / 8), 1])

@@ -55,7 +59,9 @@ def _spawn_optimizations(
     # split the resources based on the number of tasks
     n_workers = int(min([n_jobs, opts_per_worker]))
     opt_config = _divide_config(config=config, n_workers=n_workers)
-    with Pool(processes=n_workers) as pool:
+
+    # Using fork can hang on our local HPC so pin to use spawn
+    with get_context("spawn").Pool(processes=n_workers) as pool:
         tasks = {
             grid_point: [
                 pool.apply_async(
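
For reference, the fan-out pattern above in a self-contained form (placeholder work function and inputs, not the repository's real objects): one spawn-context pool, one apply_async per optimisation, results gathered per grid point.

from multiprocessing import current_process, get_context
from typing import Dict, List


def run_fake_optimization(grid_point: str, conformer: int) -> str:
    # placeholder for a single constrained geometry optimisation
    return f"{grid_point}/conformer-{conformer} converged"


def spawn_optimizations(jobs: Dict[str, List[int]], n_workers: int) -> Dict[str, List[str]]:
    # only split tasks when running in the main process, mirroring the check in the diff
    if current_process().name != "MainProcess":
        n_workers = 1

    # "spawn" avoids the fork-related hangs mentioned in the diff comment
    with get_context("spawn").Pool(processes=n_workers) as pool:
        tasks = {
            grid_point: [
                pool.apply_async(run_fake_optimization, (grid_point, conformer))
                for conformer in conformers
            ]
            for grid_point, conformers in jobs.items()
        }
        return {point: [task.get() for task in handles] for point, handles in tasks.items()}


if __name__ == "__main__":
    print(spawn_optimizations({"(-60,)": [0, 1], "(60,)": [0, 1]}, n_workers=2))
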
openff/bespokefit/executor/services/qcgenerator/worker.py (11 changes: 9 additions & 2 deletions)

@@ -147,11 +147,18 @@ def compute_torsion_drive(task_json: str) -> TorsionDriveResult:
         ),
     )

+    # run all torsiondrives through our custom procedure which handles parallel optimisations
     return_value = qcengine.compute_procedure(
-        input_schema, "torsiondrive", raise_error=True, local_options=_task_config()
+        input_schema,
+        "TorsionDriveParallel",
+        raise_error=True,
+        task_config=_task_config(),
     )

     if isinstance(return_value, TorsionDriveResult):
+        _task_logger.info(
+            f"1D TorsionDrive successfully completed in {return_value.provenance.wall_time}"
+        )
         return_value = TorsionDriveResult(
             **return_value.dict(exclude={"optimization_history", "stdout", "stderr"}),
             optimization_history={},

@@ -204,7 +211,7 @@ def compute_optimization(
         input_schema,
         task.optimization_spec.program,
         raise_error=True,
-        local_options=_task_config(),
+        task_config=_task_config(),
     )

     if isinstance(return_value, OptimizationResult):
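
The TorsionDriveResult rebuild just after the logging call drops the per-grid-point optimisation trajectories so only the summary is stored. A toy version of that exclude-and-rebuild pattern (pydantic v1-style .dict(), as exposed by QCElemental models; the model below is a stand-in, not the real result class):

from pydantic import BaseModel


class FakeTorsionDriveResult(BaseModel):
    final_energies: dict
    optimization_history: dict
    stdout: str = ""


full = FakeTorsionDriveResult(
    final_energies={"(-60,)": -1.0},
    optimization_history={"(-60,)": ["...many optimisation records..."]},
    stdout="very long program log",
)

# copy everything except the heavy fields, then re-create the model with an empty history
slim = FakeTorsionDriveResult(
    **full.dict(exclude={"optimization_history", "stdout"}),
    optimization_history={},
)
print(slim)
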
openff/bespokefit/executor/utilities/celery.py (7 changes: 4 additions & 3 deletions)

@@ -55,19 +55,20 @@ def configure_celery_app(
     return celery_app


-def _spawn_worker(celery_app, concurrency: int = 1):
+def _spawn_worker(celery_app, concurrency: int = 1, **kwargs):
     worker = celery_app.Worker(
         concurrency=concurrency,
         loglevel="INFO",
         logfile=f"celery-{celery_app.main}.log",
         quiet=True,
         hostname=celery_app.main,
+        **kwargs,
     )
     worker.start()


 def spawn_worker(
-    celery_app, concurrency: int = 1, asynchronous: bool = True
+    celery_app, concurrency: int = 1, asynchronous: bool = True, **kwargs
 ) -> Optional[multiprocessing.Process]:
     if concurrency < 1:
         return

@@ -81,7 +82,7 @@ def spawn_worker(
         return worker_process

     else:
-        _spawn_worker(celery_app, concurrency)
+        _spawn_worker(celery_app, concurrency, **kwargs)


 def get_task_information(app: Celery, task_id: str) -> TaskInformation:
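
Usage-wise, the pass-through means anything extra handed to spawn_worker lands on the Worker constructor unchanged, which is how the CLI change above requests pool="solo" for the qc-compute worker. A stripped-down sketch of the forwarding (the Worker class here is a stand-in for Celery's):

class FakeWorker:
    # stand-in for celery_app.Worker; just records what it was given
    def __init__(self, concurrency: int, **kwargs):
        self.options = {"concurrency": concurrency, **kwargs}

    def start(self):
        print(f"worker started with {self.options}")


def _spawn_worker(concurrency: int = 1, **kwargs):
    FakeWorker(concurrency=concurrency, **kwargs).start()


def spawn_worker(concurrency: int = 1, asynchronous: bool = True, **kwargs):
    if not asynchronous:
        _spawn_worker(concurrency, **kwargs)


spawn_worker(concurrency=1, asynchronous=False, pool="solo")
# -> worker started with {'concurrency': 1, 'pool': 'solo'}
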
openff/bespokefit/tests/cli/test_worker.py (2 changes: 1 addition & 1 deletion)

@@ -17,7 +17,7 @@ def test_launch_worker(worker_type, runner, monkeypatch):
     in test_celery/test_spawn_worker
     """

-    def mock_spawn(*args):
+    def mock_spawn(*args, **kwargs):
        return True

     monkeypatch.setattr(celery, "_spawn_worker", mock_spawn)
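
Accepting **kwargs keeps the mock compatible with the new call signature. A kwargs-capturing variant (illustrative only, not the repository's test) would also let a test assert that the solo pool was requested:

captured = {}


def mock_spawn(*args, **kwargs):
    # record what the CLI forwarded so the test can inspect it
    captured.update(kwargs)
    return True


# e.g. what the qc-compute branch of the CLI would end up calling
mock_spawn("worker_app", concurrency=1, asynchronous=False, pool="solo")
assert captured.get("pool") == "solo"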