remove the learners_file and pass learners directly #39

Merged (27 commits, May 20, 2020)
Commits
All commits by basnijholt:

4b90053  Apr 9, 2020   remove the learners_file and pass learners directly
3046ad6  Apr 9, 2020   attempt to fix "zmq.error.Again: Resource temporarily unavailable"
5296cb4  Apr 9, 2020   use cloudpickle and send_serialized and recv_serialized
eb92364  Apr 9, 2020   use loky for process-pool
3494a1e  Apr 9, 2020   add loky as dependency
d84404e  Apr 14, 2020  fix doc-string and types of get_learner
b5c8831  Apr 16, 2020  use jinja2 for run_script.py
2212dd3  Apr 21, 2020  add check_goal_on_start argument to RunManager
0a0e939  Apr 22, 2020  only accept fnames to be of str because it failed with BalancingLearner
12b4ed7  Apr 23, 2020  add copy_from_sequence_learner
e156473  Apr 23, 2020  increase timeout to receive learner to 60s
6d305a6  Apr 23, 2020  avoid file note found error in log parser
24a7b8b  Apr 23, 2020  make sure _stop_request gets an existing fname
49703ed  Apr 23, 2020  always use strings for fnames
bc76101  May 14, 2020  fix _get_fnames(only_running=True)
b3e1d82  May 14, 2020  add _stop_requests to handle a list of fnames
68ff2ed  May 19, 2020  rename serialize, deserialize -> _serialize, _deserialize
848af86  May 19, 2020  use cloudpickle with mpi4py
b34bd2d  May 19, 2020  catch all exceptions
31df175  May 19, 2020  change example
6226706  May 19, 2020  remove learners_file.py from the docs
237f7ee  May 19, 2020  use order of arguments
026475a  May 19, 2020  catch fname is None
7bdb95d  May 19, 2020  import BaseLearner directly
35cf7e7  May 20, 2020  require adaptive>=0.11
858e59f  May 20, 2020  also update environment.yml
d241e95  May 20, 2020  add jinja2 and numpy to environment.yml
2 changes: 1 addition & 1 deletion .isort.cfg
@@ -1,2 +1,2 @@
[settings]
known_third_party = adaptive,dill,ipyparallel,ipywidgets,nbconvert,numpy,pandas,psutil,setuptools,structlog,tinydb,toolz,tqdm,zmq
known_third_party = adaptive,cloudpickle,ipyparallel,ipywidgets,jinja2,nbconvert,numpy,pandas,psutil,setuptools,structlog,tinydb,toolz,tqdm,zmq
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -1,4 +1,5 @@
recursive-include adaptive_scheduler *.py
include adaptive_scheduler/run_script.py.j2
include LICENSE
include README.rst
include requirements.txt
45 changes: 21 additions & 24 deletions README.rst
@@ -36,11 +36,10 @@ Design goals
How does it work?
-----------------

You create a file where you define a bunch of ``learners`` and corresponding ``fnames`` such that they can be imported, like:
You create a bunch of ``learners`` and corresponding ``fnames`` such that they can be loaded, like:

.. code-block:: python

# learners_file.py
import adaptive
from functools import partial

@@ -69,8 +68,9 @@ Then you start a process that creates and submits as many job-scripts as there a
scheduler = adaptive_scheduler.scheduler.SLURM(cores=10) # every learner gets this many cores

run_manager = adaptive_scheduler.server_support.RunManager(
scheduler=scheduler,
learners_file="learners_file.py",
scheduler,
learners,
fnames,
goal=goal,
log_interval=30, # write info such as npoints, cpu_usage, time, etc. to the job log file
save_interval=300, # save the data every 300 seconds
@@ -90,7 +90,7 @@ But how does it *really* work?
------------------------------

The `~adaptive_scheduler.server_support.RunManager` basically does the following.
So, *you* need to create a ``learners_file.py`` that defines ``N`` ``learners`` and ``fnames`` (like in the section above).
So, *you* need to create ``N`` ``learners`` and ``fnames`` (like in the section above).
Then a "job manager" writes and submits ``max(N, max_simultaneous_jobs)`` job scripts but *doesn't know* which learner it is going to run!
This is the responsibility of the "database manager", which keeps a database of ``job_id <--> learner``.
The job script starts a Python file ``run_learner.py`` in which the learner is run.
@@ -102,45 +102,42 @@ In a Jupyter notebook we can start the "job manager" and the "database manager",

import adaptive_scheduler
from adaptive_scheduler import server_support
from learners_file import learners, fnames

# create a scheduler
scheduler = adaptive_scheduler.scheduler.PBS(
cores=10,
run_script="run_learner.py",
)
scheduler = adaptive_scheduler.scheduler.SLURM(cores=10, run_script="run_learner.py",)

# create a new database that keeps track of job <-> learner
db_fname = "running.json"
url = server_support.get_allowed_url() # get a url where we can run the database_manager
database_manager = server_support.DatabaseManager(url, scheduler, db_fname, fnames)
url = (
server_support.get_allowed_url()
) # get a url where we can run the database_manager
database_manager = server_support.DatabaseManager(
url, scheduler, db_fname, learners, fnames
)
database_manager.start()

# create the Python script that runs a learner (run_learner.py)
server_support._make_default_run_script(
url=url,
learners_file="learners_file.py",
save_interval=300,
log_interval=30,
goal=None,
executor_type=scheduler.executor_type,
run_script_fname=scheduler.run_script,
url=url,
save_interval=300,
log_interval=30,
goal=None,
executor_type=scheduler.executor_type,
run_script_fname=scheduler.run_script,
)

# create unique names for the jobs
n_jobs = len(learners)
job_names = [f"test-job-{i}" for i in range(n_jobs)]

job_manager = server_support.JobManager(
job_names,
database_manager,
scheduler,
)
job_manager = server_support.JobManager(job_names, database_manager, scheduler)
job_manager.start()


Then when the jobs have been running for a while you can check ``server_support.parse_log_files(job_names, database_manager, scheduler)``.

And use ``scheduler.cancel(job_names)`` to cancel the jobs.

You don't actually ever have to leave the Jupyter notebook; take a look at the `example notebook <https://github.com/basnijholt/adaptive-scheduler/blob/master/example.ipynb>`_.

Jupyter notebook example
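A minimal end-to-end sketch of the new call signature introduced by this PR, pieced together from the README snippets above. The toy function ``h``, the ``offsets`` values, and the ``goal`` are made-up placeholders, and ``run_manager.start()`` is assumed to follow the usual adaptive-scheduler pattern rather than being shown in this diff:

.. code-block:: python

    from functools import partial

    import adaptive
    import adaptive_scheduler


    def h(x, offset=0):  # placeholder function, purely for illustration
        return x + offset


    offsets = [0.1, 0.2, 0.3]
    # one learner and one fname per parameter value; fnames must be plain strings
    learners = [
        adaptive.Learner1D(partial(h, offset=o), bounds=(-1, 1)) for o in offsets
    ]
    fnames = [f"data/learner_{i}" for i in range(len(offsets))]

    scheduler = adaptive_scheduler.scheduler.SLURM(cores=10)

    run_manager = adaptive_scheduler.server_support.RunManager(
        scheduler,
        learners,  # passed directly now, instead of via a learners_file.py
        fnames,
        goal=lambda learner: learner.npoints >= 200,
        log_interval=30,
        save_interval=300,
    )
    run_manager.start()

The point of the change is visible in the constructor: ``learners`` and ``fnames`` are ordinary Python objects passed positionally, so an importable ``learners_file.py`` is no longer needed.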
58 changes: 22 additions & 36 deletions adaptive_scheduler/client_support.py
@@ -10,7 +10,13 @@
import zmq
from adaptive import AsyncRunner, BaseLearner

from adaptive_scheduler.utils import _get_npoints, log_exception
from adaptive_scheduler.utils import (
_deserialize,
_get_npoints,
_serialize,
log_exception,
maybe_lst,
)

ctx = zmq.Context()
logger = logging.getLogger("adaptive_scheduler.client")
@@ -32,22 +38,13 @@ def _add_log_file_handler(log_fname):


def get_learner(
learners: List[BaseLearner],
fnames: List[str],
url: str,
log_fname: str,
job_id: str,
job_name: str,
) -> Tuple[str, str]:
url: str, log_fname: str, job_id: str, job_name: str,
) -> Tuple[BaseLearner, Union[str, List[str]]]:
"""Get a learner from the database running at `url` and this learner's
process will be logged in `log_fname` and running under `job_id`.

Parameters
----------
learners : list of `adaptive.BaseLearner` isinstances
List of `learners` corresponding to `fnames`.
fnames : list
List of `fnames` corresponding to `learners`.
url : str
The url of the database manager running via
(`adaptive_scheduler.server_support.manage_database`).
@@ -60,6 +57,8 @@ def get_learner(

Returns
-------
learner : `adaptive.BaseLearner`
Learner that is chosen.
fname : str
The filename of the learner that was chosen.
"""
@@ -68,11 +67,12 @@
"trying to get learner", job_id=job_id, log_fname=log_fname, job_name=job_name
)
with ctx.socket(zmq.REQ) as socket:
socket.setsockopt(zmq.LINGER, 0)
socket.connect(url)
socket.send_pyobj(("start", job_id, log_fname, job_name))
log.info(f"sent start signal, timeout after 10s.")
socket.setsockopt(zmq.RCVTIMEO, 10_000) # timeout after 10s
reply = socket.recv_pyobj()
socket.send_serialized(("start", job_id, log_fname, job_name), _serialize)
log.info(f"sent start signal, going to wait 60s for a reply.")
socket.setsockopt(zmq.RCVTIMEO, 60_000) # timeout after 60s
reply = socket.recv_serialized(_deserialize)
log.info("got reply", reply=str(reply))
if reply is None:
msg = f"No learners to be run."
@@ -83,25 +83,11 @@
log_exception(log, "got an exception", exception=reply)
raise reply
else:
fname = reply
log.info(f"got fname")

def maybe_lst(fname: Union[List[str], str]):
if isinstance(fname, tuple):
# TinyDB converts tuples to lists
fname = list(fname)
return fname

try:
learner = next(l for l, f in zip(learners, fnames) if maybe_lst(f) == fname)
except StopIteration:
msg = "Learner with this fname doesn't exist in the database."
exception = RuntimeError(msg)
log_exception(log, msg, exception)
raise exception
learner, fname = reply
log.info(f"got fname and learner")

log.info("picked a learner")
return learner, fname
return learner, maybe_lst(fname)


def tell_done(url: str, fname: str) -> None:
@@ -118,10 +104,10 @@ def tell_done(url: str, fname: str) -> None:
log.info("goal reached! 🎉🎊🥳")
with ctx.socket(zmq.REQ) as socket:
socket.connect(url)
socket.send_pyobj(("stop", fname))
socket.send_serialized(("stop", fname), _serialize)
socket.setsockopt(zmq.RCVTIMEO, 10_000) # timeout after 10s
log.info("sent stop signal, timeout after 10s", fname=fname)
socket.recv_pyobj() # Needed because of socket type
log.info("sent stop signal, going to wait 10s for a reply", fname=fname)
socket.recv_serialized(_deserialize) # Needed because of socket type


def _get_log_entry(runner: AsyncRunner, npoints_start: int) -> Dict[str, Any]:
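The client code above imports ``_serialize`` and ``_deserialize`` from ``adaptive_scheduler.utils``, but their bodies are not part of this diff. Since the PR switches to cloudpickle together with pyzmq's ``send_serialized``/``recv_serialized``, a minimal sketch of what such helpers could look like is given below; the implementations are an assumption, not code from the PR:

.. code-block:: python

    import cloudpickle
    import zmq


    def _serialize(obj):
        # send_serialized expects a list of frames (bytes-like objects)
        return [cloudpickle.dumps(obj)]


    def _deserialize(frames):
        # recv_serialized hands back the list of received frames
        return cloudpickle.loads(frames[0])


    # usage mirroring get_learner() above; the url is a placeholder
    ctx = zmq.Context()
    with ctx.socket(zmq.REQ) as socket:
        socket.connect("tcp://127.0.0.1:5555")
        socket.send_serialized(("start", "job-id", "log.txt", "job-name"), _serialize)
        socket.setsockopt(zmq.RCVTIMEO, 60_000)  # give up after 60 s, like the client
        reply = socket.recv_serialized(_deserialize)

Using cloudpickle here is what lets whole learner objects, including lambdas and closures inside them, travel over the ZMQ socket, which the previous ``send_pyobj``/``recv_pyobj`` calls (plain pickle) could not always handle.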
83 changes: 83 additions & 0 deletions adaptive_scheduler/run_script.py.j2
@@ -0,0 +1,83 @@
#!/usr/bin/env python3
# {{ run_script_fname }}, automatically generated
# by `adaptive_scheduler.server_support._make_default_run_script()`.
{% if executor_type == "dask-mpi" %}
from dask_mpi import initialize
initialize()
{% endif %}
import argparse
from contextlib import suppress

import adaptive
import cloudpickle
from adaptive_scheduler import client_support
{%- if executor_type == "mpi4py" %}
import cloudpickle
from mpi4py import MPI
from mpi4py.futures import MPIPoolExecutor
MPI.pickle.__init__(cloudpickle.dumps, cloudpickle.loads)
{% elif executor_type == "ipyparallel" %}
from adaptive_scheduler.utils import connect_to_ipyparallel
{% elif executor_type == "dask-mpi" %}
from distributed import Client
{% elif executor_type == "process-pool" %}
from loky import get_reusable_executor
{% endif %}
if __name__ == "__main__": # ← use this, see warning @ https://bit.ly/2HAk0GG

# parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("--profile", action="store", dest="profile", type=str)
parser.add_argument("--n", action="store", dest="n", type=int)
parser.add_argument("--log-fname", action="store", dest="log_fname", type=str)
parser.add_argument("--job-id", action="store", dest="job_id", type=str)
parser.add_argument("--name", action="store", dest="name", type=str)
args = parser.parse_args()

# the address of the "database manager"
url = "{{ url }}"

# ask the database for a learner that we can run which we log in `args.log_fname`
learner, fname = client_support.get_learner(
url, args.log_fname, args.job_id, args.name
)

# load the data
with suppress(Exception):
learner.load(fname)

# connect to the executor
{%- if executor_type == "mpi4py" %}
executor = MPIPoolExecutor()
{% elif executor_type == "ipyparallel" %}
executor = connect_to_ipyparallel(profile=args.profile, n=args.n)
{% elif executor_type == "dask-mpi" %}
executor = Client()
{% elif executor_type == "process-pool" %}
executor = get_reusable_executor(max_workers=args.n)
{% endif %}
# this is serialized by cloudpickle.dumps
runner_kwargs = cloudpickle.loads(
{{ serialized_runner_kwargs }}
)

# run until `some_goal` is reached with an `MPIPoolExecutor`
runner = adaptive.Runner(learner, executor=executor, **runner_kwargs)

# periodically save the data (in case the job dies)
runner.start_periodic_saving(dict(fname=fname), interval={{ save_interval }})

# log progress info in the job output script, optional
client_support.log_info(runner, interval={{ log_interval }})

# block until runner goal reached
runner.ioloop.run_until_complete(runner.task)

# save once more after the runner is done
learner.save(fname)

# tell the database that this learner has reached its goal
client_support.tell_done(url, fname)

# log once more after the runner is done
client_support.log_info(runner, interval=0)
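The README snippet above calls ``server_support._make_default_run_script(...)`` to turn this template into ``run_learner.py``. Its implementation is not shown in this diff, but the rendering step could look roughly like the sketch below; the ``url`` and executor settings are placeholder values, and embedding ``cloudpickle.dumps(...)`` relies on Jinja2 writing the bytes object as its ``b'...'`` literal so that the template's ``cloudpickle.loads({{ serialized_runner_kwargs }})`` line round-trips:

.. code-block:: python

    import cloudpickle
    import jinja2

    # placeholder inputs; in practice these come from the RunManager / scheduler
    url = "tcp://10.0.0.1:37371"
    executor_type = "process-pool"
    runner_kwargs = dict(goal=None)

    with open("adaptive_scheduler/run_script.py.j2") as f:
        template = jinja2.Template(f.read())

    run_script = template.render(
        run_script_fname="run_learner.py",
        executor_type=executor_type,
        url=url,
        serialized_runner_kwargs=cloudpickle.dumps(runner_kwargs),
        save_interval=300,
        log_interval=30,
    )

    with open("run_learner.py", "w") as f:
        f.write(run_script)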