Commit

Worker launch CLI (#272)
* add password authentication to redis connections

* update tests to use the settings password

* use the username and password when connecting a celery worker

* use the password in the settings

* try without explicit username

* add cli point to create bespokefit workers

* expose the cli command

* add cli test

* update toolkit version and pin pint

* remove pint pin

* fix optimised parameter caching and added test

* update docs with launch-worker command
jthorton authored Jul 19, 2023
1 parent 656dca3 commit 4c4422f
Showing 15 changed files with 239 additions and 34 deletions.
2 changes: 1 addition & 1 deletion devtools/conda-envs/no_openeye.yaml
@@ -22,7 +22,7 @@ dependencies:
- rdkit >=22
- ambertools >=22
- openff-utilities
- openff-toolkit-base =0.11
- openff-toolkit-base >=0.11,<0.14
- openff-forcefields
- openff-interchange
- openff-qcsubmit
2 changes: 1 addition & 1 deletion devtools/conda-envs/test-env.yaml
@@ -22,7 +22,7 @@ dependencies:
- click-option-group
- rdkit
- openff-utilities
- openff-toolkit-base =0.11
- openff-toolkit-base >=0.11,<0.14
- openff-forcefields
- openff-interchange
- openff-units
45 changes: 45 additions & 0 deletions docs/users/bespoke-executor.md
@@ -64,7 +64,52 @@ different stages may run in parallel.

See the [quick start guide](quick_start_chapter) for details on submitting jobs to a running bespoke executor.

(executor_distributed_workers)=
## Distributed Workers

Bespokefit is able to make use of distributed resources across HPC clusters or multiple machines on the same network via
the [Celery] framework which underpins the workers. In this example we assume the workers and bespoke executor are on
different machines. First gather the IP address of the machine which will be running the bespoke executor

```shell
ifconfig -a
```

A bespoke executor with no local workers can then be launched using the `launch` command

```shell
openff-bespoke executor launch --directory "bespoke-executor" \
--n-fragmenter-workers 0 \
--n-optimizer-workers 0 \
--n-qc-compute-workers 0
```

We now need to provide the address of the executor in order to connect the remote workers. BespokeFit has a number of
runtime [settings] which can be configured via environment variables. The address of the executor should be set as
`BEFLOW_REDIS_ADDRESS` in the environment the workers will be launched from using

```shell
export BEFLOW_REDIS_ADDRESS="address"
```

Bespoke workers of a given type can then be launched using the `launch-worker` command. The following would start a
fragmentation worker.

```shell
openff-bespoke launch-worker --worker-type fragmenter
```

Provided the worker starts successfully, a log file called `celery-fragmenter.log` will be generated, which should be
checked to make sure the worker has connected to the executor.

:::{note}
The `launch-worker` command does not allow for configuration of the worker resources. It is recommended that the
corresponding environment variable [settings] are used instead.
:::
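
As a rough illustration of the steps documented above, the same launch could be scripted from Python. This is only a sketch and not part of the commit: the helper name `build_worker_launch` and the example address are hypothetical, while `BEFLOW_REDIS_ADDRESS`, the `openff-bespoke launch-worker` command and the worker type names are taken from this diff.

```python
import os
from typing import Dict, List, Tuple

# Worker type names as declared in openff/bespokefit/cli/worker.py in this commit.
WORKER_TYPES = ("fragmenter", "qc-compute", "optimizer")


def build_worker_launch(
    worker_type: str, redis_address: str
) -> Tuple[List[str], Dict[str, str]]:
    """Hypothetical helper: build the command and environment for one worker."""
    if worker_type not in WORKER_TYPES:
        raise ValueError(f"unknown worker type: {worker_type}")

    # Copy the current environment and point the worker at the remote executor.
    env = dict(os.environ)
    env["BEFLOW_REDIS_ADDRESS"] = redis_address

    command = ["openff-bespoke", "launch-worker", "--worker-type", worker_type]
    return command, env


# 192.0.2.10 is a documentation-only placeholder address.
command, env = build_worker_launch("fragmenter", "192.0.2.10")
```

Passing `command` and `env` to `subprocess.run(command, env=env, check=True)` would then start the worker, provided BespokeFit is installed on that machine.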


[QCEngine]: http://docs.qcarchive.molssi.org/projects/QCEngine/en/stable/
[settings]: openff.bespokefit.utilities.Settings

(executor_using_api)=
## Using the API
2 changes: 2 additions & 0 deletions openff/bespokefit/cli/cli.py
@@ -4,6 +4,7 @@
from openff.bespokefit.cli.combine import combine_cli
from openff.bespokefit.cli.executor import executor_cli
from openff.bespokefit.cli.prepare import prepare_cli
from openff.bespokefit.cli.worker import worker_cli


@click.group()
@@ -15,3 +16,4 @@ def cli():
cli.add_command(prepare_cli)
cli.add_command(cache_cli)
cli.add_command(combine_cli)
cli.add_command(worker_cli)
4 changes: 3 additions & 1 deletion openff/bespokefit/cli/executor/launch.py
@@ -112,7 +112,9 @@ def validate_redis_connection(console: "rich.Console", allow_existing: bool = Tr
settings = current_settings()

if not is_redis_available(
host=settings.BEFLOW_REDIS_ADDRESS, port=settings.BEFLOW_REDIS_PORT
host=settings.BEFLOW_REDIS_ADDRESS,
port=settings.BEFLOW_REDIS_PORT,
password=settings.BEFLOW_REDIS_PASSWORD,
):
return

62 changes: 62 additions & 0 deletions openff/bespokefit/cli/worker.py
@@ -0,0 +1,62 @@
import importlib

import click
import rich
from rich import pretty

from openff.bespokefit.cli.utilities import print_header
from openff.bespokefit.executor.services import current_settings
from openff.bespokefit.executor.utilities.celery import spawn_worker

worker_types = ["fragmenter", "qc-compute", "optimizer"]


@click.command("launch-worker")
@click.option(
"--worker-type",
type=click.Choice(worker_types),
help="The type of bespokefit worker to launch",
required=True,
)
def worker_cli(worker_type: str):
"""
Launch a single worker of the requested type in the main process.
Used to connect workers to a remote bespokefit server.
Note:
By default bespokefit will automatically use all cores and memory made available to the worker which should
be declared in the job submission script. To change these defaults see the settings `BEFLOW_QC_COMPUTE_WORKER_N_CORES` &
`BEFLOW_QC_COMPUTE_WORKER_MAX_MEM`.
Args:
worker_type: The alias name of the worker type which should be started.
"""

pretty.install()

console = rich.get_console()
print_header(console)

worker_status = console.status(f"launching {worker_type} worker")
worker_status.start()

settings = current_settings()

if worker_type == "fragmenter":
worker_settings = settings.fragmenter_settings
elif worker_type == "qc-compute":
worker_settings = settings.qc_compute_settings
else:
worker_settings = settings.optimizer_settings

worker_module = importlib.import_module(worker_settings.import_path)
importlib.reload(worker_module)
worker_app = getattr(worker_module, "celery_app")

worker_status.stop()
console.print(f"[[green]✓[/green]] bespoke {worker_type} worker launched")

spawn_worker(worker_app, concurrency=1, asynchronous=False)
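
The worker lookup in `worker_cli` above maps a worker type to a settings object, imports the module named by its `import_path`, and pulls the Celery app off that module. A minimal sketch of the same pattern using a lookup table instead of the `if`/`elif` chain is given below. The helper `load_worker_app`, the `app_attribute` parameter and the stand-in settings object are hypothetical and only exist so the example runs without BespokeFit installed; the sketch also skips the `importlib.reload` call made in the commit.

```python
import importlib
from types import SimpleNamespace

# Settings attribute names as they appear in the diff above.
SETTINGS_ATTRIBUTE = {
    "fragmenter": "fragmenter_settings",
    "qc-compute": "qc_compute_settings",
    "optimizer": "optimizer_settings",
}


def load_worker_app(settings, worker_type: str, app_attribute: str = "celery_app"):
    """Hypothetical helper: import the worker module and return its app object."""
    worker_settings = getattr(settings, SETTINGS_ATTRIBUTE[worker_type])
    worker_module = importlib.import_module(worker_settings.import_path)
    return getattr(worker_module, app_attribute)


# Stand-in settings: the stdlib `json` module plays the role of a worker module
# and `dumps` plays the role of `celery_app`.
fake_settings = SimpleNamespace(
    fragmenter_settings=SimpleNamespace(import_path="json"),
    qc_compute_settings=SimpleNamespace(import_path="json"),
    optimizer_settings=SimpleNamespace(import_path="json"),
)
loaded = load_worker_app(fake_settings, "fragmenter", app_attribute="dumps")
```

An unknown worker type raises `KeyError` here, whereas the `else` branch in the commit falls through to the optimizer settings; `click.Choice` already restricts the accepted values, so both behave the same from the CLI.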
4 changes: 3 additions & 1 deletion openff/bespokefit/executor/services/optimizer/worker.py
@@ -82,7 +82,9 @@ def optimize(self, optimization_input_json: str) -> str:
# cache the final parameters
if (
is_redis_available(
host=settings.BEFLOW_REDIS_ADDRESS, port=settings.BEFLOW_REDIS_PORT
host=settings.BEFLOW_REDIS_ADDRESS,
port=settings.BEFLOW_REDIS_PORT,
password=settings.BEFLOW_REDIS_PASSWORD,
)
and result.refit_force_field is not None
):
7 changes: 5 additions & 2 deletions openff/bespokefit/executor/utilities/celery.py
@@ -9,6 +9,7 @@

from openff.bespokefit.executor.services.models import Error
from openff.bespokefit.executor.utilities.typing import Status
from openff.bespokefit.utilities import current_settings


class TaskInformation(TypedDict):
@@ -33,14 +34,16 @@ def get_status(task_result: AsyncResult) -> Status:
def configure_celery_app(
app_name: str, redis_connection: Redis, include: List[str] = None
):
settings = current_settings()
redis_host_name = redis_connection.connection_pool.connection_kwargs["host"]
redis_port = redis_connection.connection_pool.connection_kwargs["port"]
redis_db = redis_connection.connection_pool.connection_kwargs["db"]
password = settings.BEFLOW_REDIS_PASSWORD

celery_app = Celery(
app_name,
backend=f"redis://{redis_host_name}:{redis_port}/{redis_db}",
broker=f"redis://{redis_host_name}:{redis_port}/{redis_db}",
backend=f"redis://:{password}@{redis_host_name}:{redis_port}/{redis_db}",
broker=f"redis://:{password}@{redis_host_name}:{redis_port}/{redis_db}",
include=include,
)
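
One thing to keep in mind with the URL built in `configure_celery_app` above: the password is interpolated into the `redis://` URL as-is, so a password containing characters such as `@` or `/` would likely be misparsed, and a `None` password would be rendered as the literal text `None`. The sketch below shows one possible way to guard against both, assuming the consumer of the URL percent-decodes the password (which URL parsers commonly do). The helper name `build_redis_url` is hypothetical and not part of the commit.

```python
from typing import Optional
from urllib.parse import quote


def build_redis_url(
    host: str, port: int, db: int, password: Optional[str] = None
) -> str:
    """Hypothetical helper: build a redis URL with a percent-encoded password."""
    if password is None:
        # No credentials section at all when no password is configured.
        return f"redis://{host}:{port}/{db}"

    # safe="" makes quote() escape "/" and "@" as well.
    return f"redis://:{quote(password, safe='')}@{host}:{port}/{db}"


url = build_redis_url("localhost", 6363, 0, password="p@ss/word")
```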

53 changes: 38 additions & 15 deletions openff/bespokefit/executor/utilities/redis.py
@@ -12,7 +12,9 @@
from openff.bespokefit.executor.services import current_settings

__REDIS_VERSION: int = 1
__CONNECTION_POOL: Dict[Tuple[str, int, Optional[int], bool], redis.Redis] = {}
__CONNECTION_POOL: Dict[
Tuple[str, int, Optional[int], Optional[str], bool], redis.Redis
] = {}


class RedisNotConfiguredError(BaseException):
@@ -33,30 +35,31 @@ def expected_redis_config_version() -> int:

def connect_to_default_redis(validate: bool = True) -> redis.Redis:
"""Connects to a redis server using the settings defined by the
`BEFLOW_REDIS_ADDRESS`, `BEFLOW_REDIS_PORT` and `BEFLOW_REDIS_PORT` settings.
`BEFLOW_REDIS_ADDRESS`, `BEFLOW_REDIS_PORT`, `BEFLOW_REDIS_DB` and `BEFLOW_REDIS_PASSWORD` settings.
"""

settings = current_settings()

return connect_to_redis(
settings.BEFLOW_REDIS_ADDRESS,
settings.BEFLOW_REDIS_PORT,
settings.BEFLOW_REDIS_DB,
host=settings.BEFLOW_REDIS_ADDRESS,
port=settings.BEFLOW_REDIS_PORT,
db=settings.BEFLOW_REDIS_DB,
password=settings.BEFLOW_REDIS_PASSWORD,
validate=validate,
)


def connect_to_redis(
host: str, port: int, db: int, validate: bool = True
host: str, port: int, db: int, validate: bool = True, password: Optional[str] = None
) -> redis.Redis:
"""Connects to a redis server using the specified settings."""

connection_key = (host, port, db, validate)
connection_key = (host, port, db, password, validate)

if connection_key in __CONNECTION_POOL:
return __CONNECTION_POOL[connection_key]

connection = redis.Redis(host=host, port=port, db=db)
connection = redis.Redis(host=host, port=port, db=db, password=password)

if validate:
version = connection.get("openff-bespokefit:redis-version")
@@ -82,12 +85,14 @@ def connect_to_redis(
return connection


def is_redis_available(host: str, port: int = 6363) -> bool:
def is_redis_available(
host: str, port: int = 6363, password: Optional[str] = None
) -> bool:
"""Returns whether a redis server running on the given host and port is
available.
"""

redis_client = redis.Redis(host=host, port=port)
redis_client = redis.Redis(host=host, port=port, password=password)

try:
redis_client.get("null")
@@ -110,6 +115,7 @@ def launch_redis(
persistent: bool = True,
terminate_at_exit: bool = True,
) -> subprocess.Popen:
settings = current_settings()
redis_server_path = shutil.which("redis-server")

if redis_server_path is None:
@@ -126,14 +132,17 @@ def launch_redis(
"correctly installed."
)

if is_redis_available("localhost", port):
if is_redis_available(
host="localhost", port=port, password=settings.BEFLOW_REDIS_PASSWORD
):
raise RuntimeError(f"There is already a server running at localhost:{port}")

redis_save_exists = os.path.isfile(
"redis.db" if not directory else os.path.join(directory, "redis.db")
)

redis_command = f"redis-server --port {str(port)} --dbfilename redis.db"
# to allow connections from other machines we need a default user password
redis_command = f"redis-server --port {str(port)} --dbfilename redis.db --requirepass {settings.BEFLOW_REDIS_PASSWORD}"

if directory:
redis_command = f"{redis_command} --dir {directory}"
@@ -156,7 +165,9 @@ def launch_redis(
timeout = True

for i in range(0, 60):
if is_redis_available("localhost", port):
if is_redis_available(
host="localhost", port=port, password=settings.BEFLOW_REDIS_PASSWORD
):
timeout = False
break

@@ -166,12 +177,24 @@ def launch_redis(
raise RuntimeError("The redis server failed to start.")

try:
connect_to_redis("localhost", port, 0, validate=True)
connect_to_redis(
host="localhost",
port=port,
db=settings.BEFLOW_REDIS_DB,
password=settings.BEFLOW_REDIS_PASSWORD,
validate=True,
)
except RedisNotConfiguredError:
if redis_save_exists:
raise

connection = connect_to_redis("localhost", port, 0, validate=False)
connection = connect_to_redis(
host="localhost",
port=port,
db=settings.BEFLOW_REDIS_DB,
password=settings.BEFLOW_REDIS_PASSWORD,
validate=False,
)
connection.set(
"openff-bespokefit:redis-version", expected_redis_config_version()
)
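
The `redis_command` string built in `launch_redis` above places the password directly after `--requirepass`. This diff does not show how the string is executed. Assuming it is eventually split by a shell, a password containing spaces or shell metacharacters would need quoting. The sketch below uses the standard library `shlex.quote` for that; the helper name `build_redis_command` is hypothetical and not part of the commit.

```python
import shlex
from typing import Optional


def build_redis_command(
    port: int, password: str, directory: Optional[str] = None
) -> str:
    """Hypothetical helper: build the redis-server command with quoted values."""
    command = (
        f"redis-server --port {port} --dbfilename redis.db "
        f"--requirepass {shlex.quote(password)}"
    )

    if directory:
        command = f"{command} --dir {shlex.quote(directory)}"

    return command


command = build_redis_command(6363, "my secret")
```

Splitting the result with `shlex.split(command)` recovers the password as a single argument.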
1 change: 0 additions & 1 deletion openff/bespokefit/tests/cli/executor/test_launch.py
@@ -57,7 +57,6 @@ def mock_start(self, *args, **kwargs):
"--no-launch-redis",
],
)
print(output.output)
assert output.exit_code == 0


27 changes: 27 additions & 0 deletions openff/bespokefit/tests/cli/test_worker.py
@@ -0,0 +1,27 @@
import pytest

from openff.bespokefit.cli.worker import worker_cli
from openff.bespokefit.executor.utilities import celery


@pytest.mark.parametrize(
"worker_type",
[
pytest.param("fragmenter", id="fragmenter"),
pytest.param("qc-compute", id="qc-compute"),
pytest.param("optimizer", id="optimizer"),
],
)
def test_launch_worker(worker_type, runner, monkeypatch):
"""Test launching a worker of the correct type. Note that the worker is not actually started here; that is tested
in test_celery/test_spawn_worker.
"""

def mock_spawn(*args):
return True

monkeypatch.setattr(celery, "_spawn_worker", mock_spawn)

output = runner.invoke(worker_cli, args=["--worker-type", worker_type])
assert output.exit_code == 0
assert worker_type in output.output