Skip to content

Commit

Permalink
use jinja2 for run_script.py
Browse files Browse the repository at this point in the history
  • Loading branch information
basnijholt committed Apr 16, 2020
1 parent 46cb844 commit 704a949
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 81 deletions.
2 changes: 1 addition & 1 deletion .isort.cfg
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[settings]
known_third_party = adaptive,cloudpickle,ipyparallel,ipywidgets,nbconvert,pandas,psutil,setuptools,structlog,tinydb,toolz,tqdm,zmq
known_third_party = adaptive,cloudpickle,ipyparallel,ipywidgets,jinja2,nbconvert,pandas,psutil,setuptools,structlog,tinydb,toolz,tqdm,zmq
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
recursive-include adaptive_scheduler *.py
include adaptive_scheduler/run_script.py.j2
include LICENSE
include README.rst
include requirements.txt
80 changes: 80 additions & 0 deletions adaptive_scheduler/run_script.py.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#!/usr/bin/env python3
# {{ run_script_fname }}, automatically generated
# by `adaptive_scheduler.server_support._make_default_run_script()`.
{#-
  Jinja2 template for the run script that each job executes.

  Variables supplied by the renderer:
    run_script_fname          name of the generated file (used in the header only)
    executor_type             "mpi4py", "ipyparallel", "dask-mpi" or "process-pool"
    url                       address of the database manager
    serialized_runner_kwargs  result of cloudpickle.dumps, embedded as a literal
    save_interval             interval passed to runner.start_periodic_saving
    log_interval              interval passed to client_support.log_info

  This comment block is stripped when rendering; it never reaches the script.
#}
{% if executor_type == "dask-mpi" %}
from dask_mpi import initialize
initialize()
{% endif %}
import argparse
from contextlib import suppress

import adaptive
import cloudpickle
from adaptive_scheduler import client_support
{%- if executor_type == "mpi4py" %}
from mpi4py.futures import MPIPoolExecutor
{% elif executor_type == "ipyparallel" %}
from adaptive_scheduler.utils import connect_to_ipyparallel
{% elif executor_type == "dask-mpi" %}
from distributed import Client
{% elif executor_type == "process-pool" %}
from loky import get_reusable_executor
{% endif %}
if __name__ == "__main__":  # ← use this, see warning @ https://bit.ly/2HAk0GG

    # parse arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--profile", action="store", dest="profile", type=str)
    parser.add_argument("--n", action="store", dest="n", type=int)
    parser.add_argument("--log-fname", action="store", dest="log_fname", type=str)
    parser.add_argument("--job-id", action="store", dest="job_id", type=str)
    parser.add_argument("--name", action="store", dest="name", type=str)
    args = parser.parse_args()

    # the address of the "database manager"
    url = "{{ url }}"

    # ask the database for a learner that we can run, which we log in `args.log_fname`
    learner, fname = client_support.get_learner(
        url, args.log_fname, args.job_id, args.name
    )

    # load the data (best effort: any error while loading is ignored)
    with suppress(Exception):
        learner.load(fname)

    # connect to the executor
    {%- if executor_type == "mpi4py" %}
    executor = MPIPoolExecutor()
    {% elif executor_type == "ipyparallel" %}
    executor = connect_to_ipyparallel(profile=args.profile, n=args.n)
    {% elif executor_type == "dask-mpi" %}
    executor = Client()
    {% elif executor_type == "process-pool" %}
    executor = get_reusable_executor(max_workers=args.n)
    {% endif %}
    # this is serialized by cloudpickle.dumps
    runner_kwargs = cloudpickle.loads(
        {{ serialized_runner_kwargs }}
    )

    # run with the executor created above, using the options in `runner_kwargs`
    runner = adaptive.Runner(learner, executor=executor, **runner_kwargs)

    # periodically save the data (in case the job dies)
    runner.start_periodic_saving(dict(fname=fname), interval={{ save_interval }})

    # log progress info in the job output script, optional
    client_support.log_info(runner, interval={{ log_interval }})

    # block until runner goal reached
    runner.ioloop.run_until_complete(runner.task)

    # save once more after the runner is done
    learner.save(fname)

    # tell the database that this learner has reached its goal
    client_support.tell_done(url, fname)

    # log once more after the runner is done
    client_support.log_info(runner, interval=0)
97 changes: 17 additions & 80 deletions adaptive_scheduler/server_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import os
import shutil
import socket
import textwrap
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import suppress
Expand All @@ -17,6 +16,7 @@

import adaptive
import cloudpickle
import jinja2
import pandas as pd
import structlog
import zmq
Expand Down Expand Up @@ -119,7 +119,7 @@ def __init__(
self.scheduler = scheduler
self.db_fname = db_fname
self.learners = learners
self.fnames = fnames
self.fnames = list(map(str, fnames)) # convert from pathlib.Path
self.overwrite_db = overwrite_db

self.defaults = dict(
Expand Down Expand Up @@ -523,92 +523,29 @@ def _make_default_run_script(
runner_kwargs = dict(default_runner_kwargs, goal=goal, **(runner_kwargs or {}))
serialized_runner_kwargs = cloudpickle.dumps(runner_kwargs)

if executor_type == "mpi4py":
import_line = "from mpi4py.futures import MPIPoolExecutor"
executor_line = "MPIPoolExecutor()"
elif executor_type == "ipyparallel":
import_line = "from adaptive_scheduler.utils import connect_to_ipyparallel"
executor_line = "connect_to_ipyparallel(profile=args.profile, n=args.n)"
elif executor_type == "dask-mpi":
if executor_type not in ("mpi4py", "ipyparallel", "dask-mpi", "process-pool"):
raise NotImplementedError(
"Use 'ipyparallel', 'dask-mpi', 'mpi4py' or 'process-pool'."
)

if executor_type == "dask-mpi":
try:
import dask_mpi # noqa: F401
except ModuleNotFoundError as e:
msg = "You need to have 'dask-mpi' installed to use `executor_type='dask-mpi'`."
raise Exception(msg) from e
import_line = "from distributed import Client"
executor_line = "Client()"
elif executor_type == "process-pool":
import_line = "from loky import get_reusable_executor"
executor_line = "get_reusable_executor(max_workers=args.n)"
else:
raise NotImplementedError(
"Use 'ipyparallel', 'dask-mpi', 'mpi4py' or 'process-pool'."
)

template = textwrap.dedent(
f"""\
#!/usr/bin/env python3
# {run_script_fname}, automatically generated
# by `adaptive_scheduler.server_support._make_default_run_script()`.
import argparse
from contextlib import suppress
import adaptive
import cloudpickle
from adaptive_scheduler import client_support
{import_line}
if __name__ == "__main__": # ← use this, see warning @ https://bit.ly/2HAk0GG
# parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("--profile", action="store", dest="profile", type=str)
parser.add_argument("--n", action="store", dest="n", type=int)
parser.add_argument("--log-fname", action="store", dest="log_fname", type=str)
parser.add_argument("--job-id", action="store", dest="job_id", type=str)
parser.add_argument("--name", action="store", dest="name", type=str)
args = parser.parse_args()
# the address of the "database manager"
url = "{url}"
# ask the database for a learner that we can run which we log in `args.log_fname`
learner, fname = client_support.get_learner(
url, args.log_fname, args.job_id, args.name
)
# load the data
with suppress(Exception):
learner.load(fname)
# connect to the executor
executor = {executor_line}
# this is serialized by cloudpickle.dumps
runner_kwargs = cloudpickle.loads({serialized_runner_kwargs})
# run until `some_goal` is reached with an `MPIPoolExecutor`
runner = adaptive.Runner(learner, executor=executor, **runner_kwargs)

# periodically save the data (in case the job dies)
runner.start_periodic_saving(dict(fname=fname), interval={save_interval})
with open(Path(__file__).parent / "run_script.py.j2") as f:
empty = "".join(f.readlines())

# log progress info in the job output script, optional
client_support.log_info(runner, interval={log_interval})
# block until runner goal reached
runner.ioloop.run_until_complete(runner.task)
# save once more after the runner is done
learner.save(fname)
# tell the database that this learner has reached its goal
client_support.tell_done(url, fname)
"""
template = jinja2.Template(empty).render(
run_script_fname=run_script_fname,
executor_type=executor_type,
url=url,
serialized_runner_kwargs=serialized_runner_kwargs,
save_interval=save_interval,
log_interval=log_interval,
)
if executor_type == "dask-mpi":
template = "from dask_mpi import initialize; initialize()\n" + template

with open(run_script_fname, "w") as f:
f.write(template)
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ adaptive
cloudpickle
dill
ipyparallel
jinja2
loky
mpi4py
pandas
Expand Down

0 comments on commit 704a949

Please sign in to comment.