Skip to content

Commit

Permalink
Merge pull request #78 from esm-tools/fix/dask_dashboard
Browse files Browse the repository at this point in the history
Improve user interface for the dask dashboard
  • Loading branch information
pgierz authored Nov 27, 2024
2 parents c48f078 + fe11f31 commit d1bea94
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 10 deletions.
14 changes: 9 additions & 5 deletions examples/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ You can run the example via::

sbatch -A <YOUR ACCOUNT> pymorize.slurm

The ``sample.yaml`` file shows a configuration for an ``AWI-CM 1``
The ``sample.yaml`` file shows a configuration for an ``AWI-CM 1``
simulation, and processes one set of files, ``fgco2``, which was
called ``CO2f`` in ``FESOM 1``. The default pipeline is used, and
nothing special is done.
Expand All @@ -35,17 +35,21 @@ or::
Monitoring the Dask Progress
============================

``pymorize`` makes heavy use of ``dask``, and ``dask`` provides a dashboard to view the progress, however, you
need to set up SSH tunnels to properly see it. As a convenient shortcut, ``pymorize`` has tunneling built into
it's command line interface::
``pymorize`` makes heavy use of ``dask``, and ``dask`` provides a dashboard to view the progress, however, you
need to set up SSH tunnels to properly see it from your local computer. As a convenient shortcut, ``pymorize``
has tunneling built into it's command line interface::

pymorize ssh-tunnel --gateway=<LOGIN_NODE> --username=<USER> --compute-node=<JOB_NODE>

**Or even more convenient!** Search for ``ssh-tunnel`` in your ``slurm-<JOB_ID>.out`` (or in the stdout if you
are running ``pymorize process`` directly from the login node). You should be able to find the precise
command you need to use in your local computer, matching the syntax above.

Note that ``JOB_NODE`` is where your main ``pymorize`` job starts, and **not** one of the dask worker
jobs.

You can also generate the required SSH tunnels by hand. On your local workstation::

ssh -L 8080:localhost:8080 -L 8080:<COMPUTE_NODE>:8787 <USER>@<SPECIFIC_LOGIN_NODE>

On the login node::
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def read(filename):
package_dir={"": "src"},
packages=find_packages(where="src", exclude=("tests",)),
install_requires=[
"bokeh",
"cerberus",
"cf_xarray",
"cftime",
Expand Down
36 changes: 36 additions & 0 deletions src/pymorize/cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""
This module contains the functions to manage the Dask cluster.
"""

import dask

from .logging import logger


def set_dashboard_link(cluster):
"""
Checks whether the default user configuration for the dashboard link is valid.
If the configuration is invalid it tried to catch the following errors:
* ``KeyError``: 'JUPYTERHUB_SERVICE_PREFIX' -> The dashboard link is not valid because
the cluster was not launched from JupyterHub. In this case, the default dashboard
link is set to 'http://{host}:8787'.
Parameters
----------
cluster : dask_jobqueue.SLURMCluster
The Dask cluster to set the dashboard link.
"""
try:
_ = cluster.dashboard_link
except KeyError as e:
if "JUPYTERHUB_SERVICE_PREFIX" in str(e):
logger.debug(
"Trying to use JupyterHub prefix for the dashboard link, but the it "
"was not launched from JupyterHub. Falling back to the default "
"dashboard link."
)
default_dashboard_link = "http://{host}:8787"
dask.config.set({"distributed.dashboard.link": default_dashboard_link})
else:
raise e
19 changes: 16 additions & 3 deletions src/pymorize/cmorizer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import copy
import getpass
import os
from importlib.resources import files
from pathlib import Path

Expand All @@ -14,15 +16,15 @@
from prefect.futures import wait
from rich.progress import track

from .config import PymorizeConfig, PymorizeConfigManager, parse_bool
from .cluster import set_dashboard_link
from .config import PymorizeConfig, PymorizeConfigManager
from .data_request import (DataRequest, DataRequestTable, DataRequestVariable,
IgnoreTableFiles)
from .filecache import fc
from .logging import logger
from .pipeline import Pipeline
from .rule import Rule
from .timeaverage import _frequency_from_approx_interval
from .units import handle_unit_conversion
from .utils import wait_for_workers
from .validate import PIPELINES_VALIDATOR, RULES_VALIDATOR

Expand Down Expand Up @@ -120,6 +122,7 @@ def _post_init_create_dask_cluster(self):
# FIXME: In the future, we can support PBS, too.
logger.info("Setting up SLURMCluster...")
self._cluster = SLURMCluster()
set_dashboard_link(self._cluster)
cluster_mode = self._pymorize_cfg.get("cluster_mode", "adapt")
if cluster_mode == "adapt":
min_jobs = self._pymorize_cfg.get("minimum_jobs", 1)
Expand All @@ -136,6 +139,16 @@ def _post_init_create_dask_cluster(self):
# FIXME: Client needs to be available here?
logger.info(f"SLURMCluster can be found at: {self._cluster=}")
logger.info(f"Dashboard {self._cluster.dashboard_link}")
# NOTE(PG): In CI context, os.getlogin and nodename may not be available (???)
username = getpass.getuser()
nodename = getattr(os.uname(), "nodename", "UNKNOWN")
# FIXME: Include the gateway option if possible
logger.info(
"To see the dashboards run the following command in your computer's "
"terminal:\n"
f"\tpymorize ssh-tunnel --username {username} --compute-node "
f"{nodename}"
)

dask_extras = 0
logger.info("Importing Dask Extras...")
Expand Down Expand Up @@ -392,7 +405,7 @@ def is_unit_scalar(value):
if is_unit_scalar(cmor_units):
if not is_unit_scalar(model_units):
dimless = rule.get("dimensionless_unit_mappings", {})
if not cmor_units in dimless.get(cmor_variable, {}):
if cmor_units not in dimless.get(cmor_variable, {}):
errors.append(
f"Missing mapping for dimensionless variable {cmor_variable}"
)
Expand Down
7 changes: 5 additions & 2 deletions src/pymorize/ssh_tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ def ssh_tunnel_cli(
"""
Create an SSH tunnel to access Prefect and Dask dashboards on a remote compute node.
"""
dask_link = click.style(f"http://localhost:{local_dask_port}/status", fg='blue', underline=True)
prefect_link = click.style(f"http://localhost:{local_prefect_port}", fg='blue', underline=True)

ssh_command = f"ssh -nNT -L {local_dask_port}:{compute_node}:{remote_dask_port} -L {local_prefect_port}:{compute_node}:{remote_prefect_port} {username}@{gateway}"

click.echo(f"Creating SSH tunnel via: {ssh_command}")
Expand All @@ -53,10 +56,10 @@ def ssh_tunnel_cli(
f"Port forwarding: localhost:{local_prefect_port} -> {gateway}:{remote_prefect_port} -> {compute_node}:{remote_prefect_port}"
)
click.echo(
f"Dask Dashboard will be accessible at http://localhost:{local_dask_port}/status"
f"Dask Dashboard will be accessible at {dask_link}"
)
click.echo(
f"Prefect Dashboard will be accessible at http://localhost:{local_prefect_port}"
f"Prefect Dashboard will be accessible at {prefect_link}"
)
click.echo("Press Ctrl+C to close the tunnel")

Expand Down
17 changes: 17 additions & 0 deletions tests/meta/test_os_login_in_CI.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import os
import warnings


def test_os_login():
try:
assert os.getlogin()
except OSError:
warnings.warn("os.getlogin() failed")


def test_os_uname():
assert os.uname()


def test_os_uname_nodename():
assert os.uname().nodename

0 comments on commit d1bea94

Please sign in to comment.