diff --git a/examples/README.rst b/examples/README.rst index e64eb377..753a50f7 100644 --- a/examples/README.rst +++ b/examples/README.rst @@ -19,7 +19,7 @@ You can run the example via:: sbatch -A pymorize.slurm -The ``sample.yaml`` file shows a configuration for an ``AWI-CM 1`` +The ``sample.yaml`` file shows a configuration for an ``AWI-CM 1`` simulation, and processes one set of files, ``fgco2``, which was called ``CO2f`` in ``FESOM 1``. The default pipeline is used, and nothing special is done. @@ -35,17 +35,21 @@ or:: Monitoring the Dask Progress ============================ -``pymorize`` makes heavy use of ``dask``, and ``dask`` provides a dashboard to view the progress, however, you -need to set up SSH tunnels to properly see it. As a convenient shortcut, ``pymorize`` has tunneling built into -it's command line interface:: +``pymorize`` makes heavy use of ``dask``, and ``dask`` provides a dashboard to view the progress, however, you +need to set up SSH tunnels to properly see it from your local computer. As a convenient shortcut, ``pymorize`` +has tunneling built into its command line interface:: pymorize ssh-tunnel --gateway= --username= --compute-node= +**Or even more convenient!** Search for ``ssh-tunnel`` in your ``slurm-.out`` (or in the stdout if you +are running ``pymorize process`` directly from the login node). You should be able to find the precise +command you need to use in your local computer, matching the syntax above. + Note that ``JOB_NODE`` is where your main ``pymorize`` job starts, and **not** one of the dask worker jobs. You can also generate the required SSH tunnels by hand. 
On your local workstation:: - + ssh -L 8080:localhost:8080 -L 8080::8787 @ On the login node:: diff --git a/setup.py b/setup.py index bce002db..29837f83 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,7 @@ def read(filename): package_dir={"": "src"}, packages=find_packages(where="src", exclude=("tests",)), install_requires=[ + "bokeh", "cerberus", "cf_xarray", "cftime", diff --git a/src/pymorize/cluster.py b/src/pymorize/cluster.py new file mode 100644 index 00000000..b34d9759 --- /dev/null +++ b/src/pymorize/cluster.py @@ -0,0 +1,36 @@ +""" +This module contains the functions to manage the Dask cluster. +""" + +import dask + +from .logging import logger + + +def set_dashboard_link(cluster): + """ + Checks whether the default user configuration for the dashboard link is valid. + If the configuration is invalid it tries to catch the following errors: + + * ``KeyError``: 'JUPYTERHUB_SERVICE_PREFIX' -> The dashboard link is not valid because + the cluster was not launched from JupyterHub. In this case, the default dashboard + link is set to 'http://{host}:8787'. + + Parameters + ---------- + cluster : dask_jobqueue.SLURMCluster + The Dask cluster to set the dashboard link. + """ + try: + _ = cluster.dashboard_link + except KeyError as e: + if "JUPYTERHUB_SERVICE_PREFIX" in str(e): + logger.debug( + "Trying to use JupyterHub prefix for the dashboard link, but it " + "was not launched from JupyterHub. Falling back to the default " + "dashboard link." 
+ ) + default_dashboard_link = "http://{host}:8787" + dask.config.set({"distributed.dashboard.link": default_dashboard_link}) + else: + raise e diff --git a/src/pymorize/cmorizer.py b/src/pymorize/cmorizer.py index c7ca47f7..7c8567a5 100644 --- a/src/pymorize/cmorizer.py +++ b/src/pymorize/cmorizer.py @@ -1,4 +1,6 @@ import copy +import getpass +import os from importlib.resources import files from pathlib import Path @@ -14,7 +16,8 @@ from prefect.futures import wait from rich.progress import track -from .config import PymorizeConfig, PymorizeConfigManager, parse_bool +from .cluster import set_dashboard_link +from .config import PymorizeConfig, PymorizeConfigManager from .data_request import (DataRequest, DataRequestTable, DataRequestVariable, IgnoreTableFiles) from .filecache import fc @@ -22,7 +25,6 @@ from .pipeline import Pipeline from .rule import Rule from .timeaverage import _frequency_from_approx_interval -from .units import handle_unit_conversion from .utils import wait_for_workers from .validate import PIPELINES_VALIDATOR, RULES_VALIDATOR @@ -120,6 +122,7 @@ def _post_init_create_dask_cluster(self): # FIXME: In the future, we can support PBS, too. logger.info("Setting up SLURMCluster...") self._cluster = SLURMCluster() + set_dashboard_link(self._cluster) cluster_mode = self._pymorize_cfg.get("cluster_mode", "adapt") if cluster_mode == "adapt": min_jobs = self._pymorize_cfg.get("minimum_jobs", 1) @@ -136,6 +139,16 @@ def _post_init_create_dask_cluster(self): # FIXME: Client needs to be available here? logger.info(f"SLURMCluster can be found at: {self._cluster=}") logger.info(f"Dashboard {self._cluster.dashboard_link}") + # NOTE(PG): In CI context, os.getlogin and nodename may not be available (???) 
+ username = getpass.getuser() + nodename = getattr(os.uname(), "nodename", "UNKNOWN") + # FIXME: Include the gateway option if possible + logger.info( + "To see the dashboards run the following command in your computer's " + "terminal:\n" + f"\tpymorize ssh-tunnel --username {username} --compute-node " + f"{nodename}" + ) dask_extras = 0 logger.info("Importing Dask Extras...") @@ -392,7 +405,7 @@ def is_unit_scalar(value): if is_unit_scalar(cmor_units): if not is_unit_scalar(model_units): dimless = rule.get("dimensionless_unit_mappings", {}) - if not cmor_units in dimless.get(cmor_variable, {}): + if cmor_units not in dimless.get(cmor_variable, {}): errors.append( f"Missing mapping for dimensionless variable {cmor_variable}" ) diff --git a/src/pymorize/ssh_tunnel.py b/src/pymorize/ssh_tunnel.py index 08c04caf..58bb9674 100644 --- a/src/pymorize/ssh_tunnel.py +++ b/src/pymorize/ssh_tunnel.py @@ -43,6 +43,9 @@ def ssh_tunnel_cli( """ Create an SSH tunnel to access Prefect and Dask dashboards on a remote compute node. 
""" + dask_link = click.style(f"http://localhost:{local_dask_port}/status", fg='blue', underline=True) + prefect_link = click.style(f"http://localhost:{local_prefect_port}", fg='blue', underline=True) + ssh_command = f"ssh -nNT -L {local_dask_port}:{compute_node}:{remote_dask_port} -L {local_prefect_port}:{compute_node}:{remote_prefect_port} {username}@{gateway}" click.echo(f"Creating SSH tunnel via: {ssh_command}") @@ -53,10 +56,10 @@ def ssh_tunnel_cli( f"Port forwarding: localhost:{local_prefect_port} -> {gateway}:{remote_prefect_port} -> {compute_node}:{remote_prefect_port}" ) click.echo( - f"Dask Dashboard will be accessible at http://localhost:{local_dask_port}/status" + f"Dask Dashboard will be accessible at {dask_link}" ) click.echo( - f"Prefect Dashboard will be accessible at http://localhost:{local_prefect_port}" + f"Prefect Dashboard will be accessible at {prefect_link}" ) click.echo("Press Ctrl+C to close the tunnel") diff --git a/tests/meta/test_os_login_in_CI.py b/tests/meta/test_os_login_in_CI.py new file mode 100644 index 00000000..12780224 --- /dev/null +++ b/tests/meta/test_os_login_in_CI.py @@ -0,0 +1,17 @@ +import os +import warnings + + +def test_os_login(): + try: + assert os.getlogin() + except OSError: + warnings.warn("os.getlogin() failed") + + +def test_os_uname(): + assert os.uname() + + +def test_os_uname_nodename(): + assert os.uname().nodename