diff --git a/scripts/slurmtest.sh b/scripts/slurmtest.sh
index 83f8d9056..0f2fe347f 100755
--- a/scripts/slurmtest.sh
+++ b/scripts/slurmtest.sh
@@ -18,12 +18,12 @@ source "$VENV"/bin/activate
 python --version
 pip install "$REMOTE_WHEEL"
 
-APP_ID="$(torchx run --wait --scheduler slurm --scheduler_args partition=compute,time=10 utils.echo --image /tmp --num_replicas 3)"
+APP_ID="$(torchx run --wait --scheduler slurm --scheduler_args partition=compute,time=10 utils.echo --num_replicas 3)"
 torchx status "$APP_ID"
 torchx describe "$APP_ID"
-LOG_FILE="slurm-$(basename "$APP_ID").out"
-cat "$LOG_FILE"
-LINES="$(wc -l "$LOG_FILE" | cut -d' ' -f1)"
+sacct -j "$(basename "$APP_ID")"
+torchx log "$APP_ID"
+LINES="$(torchx log "$APP_ID" | wc -l)"
 
 if [ "$LINES" -ne 3 ]
 then
diff --git a/torchx/schedulers/local_scheduler.py b/torchx/schedulers/local_scheduler.py
index feaedf40a..d0734b01c 100644
--- a/torchx/schedulers/local_scheduler.py
+++ b/torchx/schedulers/local_scheduler.py
@@ -909,14 +909,19 @@ def __del__(self) -> None:
 
 class LogIterator:
     def __init__(
-        self, app_id: str, regex: str, log_file: str, scheduler: LocalScheduler
+        self,
+        app_id: str,
+        regex: str,
+        log_file: str,
+        scheduler: Scheduler,
+        should_tail: bool = True,
     ) -> None:
         self._app_id: str = app_id
         self._regex: Pattern[str] = re.compile(regex)
         self._log_file: str = log_file
         self._log_fp: Optional[TextIO] = None
-        self._scheduler: LocalScheduler = scheduler
-        self._app_finished: bool = False
+        self._scheduler: Scheduler = scheduler
+        self._app_finished: bool = not should_tail
 
     def _check_finished(self) -> None:
         # either the app (already finished) was evicted from the LRU cache
diff --git a/torchx/schedulers/slurm_scheduler.py b/torchx/schedulers/slurm_scheduler.py
index 7c3dbf1f8..704f708e4 100644
--- a/torchx/schedulers/slurm_scheduler.py
+++ b/torchx/schedulers/slurm_scheduler.py
@@ -15,10 +15,13 @@
 import shlex
 import subprocess
 import tempfile
+import warnings
 from dataclasses import dataclass
-from typing import Any, Dict, List, Mapping, Optional, Tuple
+from datetime import datetime
+from typing import Any, Dict, List, Mapping, Optional, Tuple, Iterable
 
-from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, Scheduler
+from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, Scheduler, Stream
+from torchx.schedulers.local_scheduler import LogIterator
 from torchx.specs import (
     NONE,
     AppDef,
@@ -100,15 +103,28 @@ def from_role(
         if resource.gpu > 0:
             sbatch_opts.setdefault("gpus-per-task", str(resource.gpu))
 
+        srun_opts = {
+            "output": f"slurm-{macros.app_id}-{name}.out",
+        }
+
         return cls(
             name=name,
             entrypoint=role.entrypoint,
             args=list(role.args),
             sbatch_opts=sbatch_opts,
-            srun_opts={},
+            srun_opts=srun_opts,
             env=dict(role.env),
         )
 
+    def _opts_to_strs(self, opts: Dict[str, str]) -> List[str]:
+        out = []
+        for key, value in opts.items():
+            if value is not None:
+                out.append(f"--{key}={value}")
+            else:
+                out.append(f"--{key}")
+        return out
+
     def materialize(self) -> Tuple[List[str], List[str]]:
         """
         materialize returns the sbatch and srun groups for this role. They
@@ -116,10 +132,12 @@ def materialize(self) -> Tuple[List[str], List[str]]:
         """
         sbatch_args = [
             f"--job-name={self.name}",
-        ] + [f"--{key}={value}" for key, value in self.sbatch_opts.items()]
-        srun_args = [f"--{key}={value}" for key, value in self.srun_opts.items()] + [
-            f"--export={key}={value}" for key, value in self.env.items()
-        ]
+        ] + self._opts_to_strs(self.sbatch_opts)
+        srun_args = self._opts_to_strs(self.srun_opts)
+
+        if len(self.env) > 0:
+            kvs = [f"{key}={value}" for key, value in self.env.items()]
+            srun_args += ["--export=ALL," + ",".join(kvs)]
 
         srun_group = srun_args + [self.entrypoint] + self.args
         srun_group = [_apply_app_id_env(arg) for arg in srun_group]
@@ -160,6 +178,9 @@ def materialize(self) -> str:
 # exit on error
 set -e
 
+export PYTHONUNBUFFERED=1
+export SLURM_UNBUFFEREDIO=1
+
 srun {" ".join(srun_groups)}
 """
         sbatch_cmd = self.cmd + sbatch_groups
@@ -176,7 +197,11 @@ class SlurmScheduler(Scheduler):
     resource allocations and args and then sbatch is used to launch all of
     them together.
 
-    Logs are written to the default slurm log file.
+    Logs are available in combined form via ``torchx log``, the programmatic
+    API, as well as in the job launch directory as
+    ``slurm-<app_id>-<role_name>-<replica_id>.out``. If TorchX is running in a
+    different directory than where the job was created, the logs won't be
+    found.
 
     Some of the config options passed to it are added as SBATCH arguments to
     each replica. See https://slurm.schedmd.com/sbatch.html#SECTION_OPTIONS for info
@@ -203,9 +228,7 @@ class SlurmScheduler(Scheduler):
         type: scheduler
         features:
             cancel: true
-            logs: |
-                Logs are accessible via the default slurm log file but not the
-                programmatic API.
+            logs: true
             distributed: true
             describe: |
                 Partial support. SlurmScheduler will return job and replica
@@ -262,7 +285,7 @@ def _submit_dryrun(
                     app_id=macros.app_id,
                     replica_id=str(replica_id),
                 )
-                name = f"{app.name}-{role.name}-{replica_id}"
+                name = f"{role.name}-{replica_id}"
                 replica_role = values.apply(role)
                 replicas[name] = SlurmReplicaRequest.from_role(name, replica_role, cfg)
         req = SlurmBatchRequest(
@@ -308,19 +331,19 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
                 ), f"failed to translate slurm state {state} to torchx state"
                 app_state = state_enum
 
-            name_parts = row["JobName"].split("-")
-            if len(name_parts) < 3:
+            role, _, replica_id = row["JobName"].rpartition("-")
+            if not replica_id or not role:
                 # name should always have at least 3 parts but sometimes sacct
                 # is slow to update
                 continue
-            role = name_parts[-2]
-            replica_id = int(name_parts[-1])
 
             if role not in roles:
                 roles[role] = Role(name=role, num_replicas=0, image="")
                 roles_statuses[role] = RoleStatus(role, [])
             roles[role].num_replicas += 1
             roles_statuses[role].replicas.append(
-                ReplicaStatus(id=replica_id, role=role, state=app_state, hostname=""),
+                ReplicaStatus(
+                    id=int(replica_id), role=role, state=app_state, hostname=""
+                ),
             )
         return DescribeAppResponse(
@@ -331,6 +354,34 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
             msg=msg,
         )
 
+    def log_iter(
+        self,
+        app_id: str,
+        role_name: str,
+        k: int = 0,
+        regex: Optional[str] = None,
+        since: Optional[datetime] = None,
+        until: Optional[datetime] = None,
+        should_tail: bool = False,
+        streams: Optional[Stream] = None,
+    ) -> Iterable[str]:
+        if since or until:
+            warnings.warn(
+                "since and/or until times specified for SlurmScheduler.log_iter."
+ " These will be ignored and all log lines will be returned" + ) + if streams is not None and streams != Stream.COMBINED: + warnings.warn( + "streams specified for SlurmScheduler.log_iter." + " These will be ignored and all log lines will be returned" + ) + + log_file = f"slurm-{app_id}-{role_name}-{k}.out" + + return LogIterator( + app_id, regex or ".*", log_file, self, should_tail=should_tail + ) + def create_scheduler(session_name: str, **kwargs: Any) -> SlurmScheduler: return SlurmScheduler( diff --git a/torchx/schedulers/test/slurm_scheduler_test.py b/torchx/schedulers/test/slurm_scheduler_test.py index 27d285d25..f645db215 100644 --- a/torchx/schedulers/test/slurm_scheduler_test.py +++ b/torchx/schedulers/test/slurm_scheduler_test.py @@ -4,12 +4,17 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. +import datetime +import os import subprocess +import tempfile import unittest +from contextlib import contextmanager +from typing import Generator from unittest.mock import MagicMock, call, patch from torchx import specs -from torchx.schedulers.api import DescribeAppResponse +from torchx.schedulers.api import DescribeAppResponse, Stream from torchx.schedulers.slurm_scheduler import ( SlurmBatchRequest, SlurmReplicaRequest, @@ -18,6 +23,17 @@ ) +@contextmanager +def tmp_cwd() -> Generator[None, None, None]: + with tempfile.TemporaryDirectory() as path: + cwd = os.getcwd() + os.chdir(path) + try: + yield + finally: + os.chdir(cwd) + + class SlurmSchedulerTest(unittest.TestCase): def test_create_scheduler(self) -> None: scheduler = create_scheduler("foo") @@ -40,12 +56,12 @@ def test_replica_request(self) -> None: ), ) sbatch, srun = SlurmReplicaRequest.from_role( - "role-name", role, cfg={} + "role-0", role, cfg={} ).materialize() self.assertEqual( sbatch, [ - "--job-name=role-name", + "--job-name=role-0", "--ntasks-per-node=1", "--cpus-per-task=2", "--mem=10", @@ -54,7 +70,13 @@ def test_replica_request(self) -> None: ) self.assertEqual( srun, - ["--export=FOO=bar", "echo", "'hello slurm'", "test"], + [ + '--output=slurm-"$SLURM_JOB_ID"-role-0.out', + "--export=ALL,FOO=bar", + "echo", + "'hello slurm'", + "test", + ], ) # test nomem option @@ -133,25 +155,28 @@ def test_dryrun_multi_role(self) -> None: self.assertEqual(req.cmd, ["sbatch", "--parsable"]) self.assertEqual( set(req.replicas.keys()), - {"foo-a-0", "foo-a-1", "foo-b-0"}, + {"a-0", "a-1", "b-0"}, ) script = req.materialize() self.assertEqual( script, """#!/bin/bash -#SBATCH --job-name=foo-a-0 --ntasks-per-node=1 +#SBATCH --job-name=a-0 --ntasks-per-node=1 #SBATCH hetjob -#SBATCH --job-name=foo-a-1 --ntasks-per-node=1 +#SBATCH --job-name=a-1 --ntasks-per-node=1 #SBATCH hetjob -#SBATCH --job-name=foo-b-0 --ntasks-per-node=1 +#SBATCH --job-name=b-0 --ntasks-per-node=1 # exit on error set -e -srun echo 0 'hello '"$SLURM_JOB_ID"'' :\\ - echo 1 'hello '"$SLURM_JOB_ID"'' :\\ - echo +export PYTHONUNBUFFERED=1 +export SLURM_UNBUFFEREDIO=1 + +srun --output=slurm-"$SLURM_JOB_ID"-a-0.out echo 0 'hello '"$SLURM_JOB_ID"'' :\\ + --output=slurm-"$SLURM_JOB_ID"-a-1.out echo 1 'hello '"$SLURM_JOB_ID"'' :\\ + --output=slurm-"$SLURM_JOB_ID"-b-0.out echo """, ) @@ -205,33 +230,80 @@ def test_cancel(self, run: MagicMock, describe: MagicMock) -> None: @patch("subprocess.run") def test_describe_completed(self, run: MagicMock) -> None: run.return_value.stdout = b""" -JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode| -176+0|echo-echo-0|compute||1|COMPLETED|0:0| 
-176+0.batch|batch|||1|COMPLETED|0:0|
-176+0.0|echo|||1|COMPLETED|0:0|
-176+1|echo-echo-1|compute||1|COMPLETED|0:0|
-176+1.0|echo|||1|COMPLETED|0:0|
-176+2|echo-echo-2|compute||1|COMPLETED|0:0|
-176+2.0|echo|||1|COMPLETED|0:0|
+JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode
+1853+0|echo-0|compute||1|COMPLETED|0:0
+1853+0.batch|batch|||1|COMPLETED|0:0
+1853+0.0|echo|||1|COMPLETED|0:0
+1853+1|echo-1|compute||1|COMPLETED|0:0
+1853+1.0|echo|||1|COMPLETED|0:0
+1853+2|echo-2|compute||1|COMPLETED|0:0
+1853+2.0|echo|||1|COMPLETED|0:0
 """.strip()
 
         scheduler = create_scheduler("foo")
-        out = scheduler.describe(app_id="176")
+        out = scheduler.describe(app_id="1853")
 
         self.assertEqual(run.call_count, 1)
         self.assertEqual(
             run.call_args,
             call(
-                ["sacct", "--parsable2", "-j", "176"],
+                ["sacct", "--parsable2", "-j", "1853"],
                 stdout=subprocess.PIPE,
                 check=True,
             ),
         )
 
         self.assertIsNotNone(out)
-        self.assertEqual(out.app_id, "176")
+        self.assertEqual(out.app_id, "1853")
         self.assertEqual(out.msg, "COMPLETED")
         self.assertEqual(out.state, specs.AppState.SUCCEEDED)
+        self.assertEqual(
+            out.roles,
+            [
+                specs.Role(
+                    name="echo",
+                    image="",
+                    num_replicas=3,
+                )
+            ],
+        )
+
+    @patch("subprocess.run")
+    def test_describe_single_replica(self, run: MagicMock) -> None:
+        run.return_value.stdout = b"""
+JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode
+1902|sh-0|compute||1|FAILED|2:0
+1902.batch|batch|||1|FAILED|2:0
+1902.0|sh|||1|FAILED|2:0
+""".strip()
+
+        scheduler = create_scheduler("foo")
+        out = scheduler.describe(app_id="1902")
+
+        self.assertEqual(run.call_count, 1)
+        self.assertEqual(
+            run.call_args,
+            call(
+                ["sacct", "--parsable2", "-j", "1902"],
+                stdout=subprocess.PIPE,
+                check=True,
+            ),
+        )
+
+        self.assertIsNotNone(out)
+        self.assertEqual(out.app_id, "1902")
+        self.assertEqual(out.msg, "FAILED")
+        self.assertEqual(out.state, specs.AppState.FAILED)
+        self.assertEqual(
+            out.roles,
+            [
+                specs.Role(
+                    name="sh",
+                    image="",
+                    num_replicas=1,
+                )
+            ],
+        )
 
     @patch("subprocess.run")
     def test_describe_running(self, run: MagicMock) -> None:
@@ -253,3 +325,22 @@ def test_describe_running(self, run: MagicMock) -> None:
         self.assertEqual(out.app_id, "54")
         self.assertEqual(out.msg, "RUNNING")
         self.assertEqual(out.state, specs.AppState.RUNNING)
+
+    @patch("subprocess.run")
+    def test_log_iter(self, run: MagicMock) -> None:
+        scheduler = create_scheduler("foo")
+
+        with tmp_cwd():
+            with open("slurm-54-echo-1.out", "wt") as f:
+                f.write("hello\nworld\n")
+
+            logs = list(
+                scheduler.log_iter(
+                    "54",
+                    "echo",
+                    1,
+                    streams=Stream.STDERR,
+                    since=datetime.datetime.now(),
+                )
+            )
+        self.assertEqual(logs, ["hello", "world"])
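
Usage sketch (not part of the diff): with the ``--output`` srun option and the ``log_iter`` method added above, Slurm job logs can be read programmatically the same way the new ``test_log_iter`` test exercises them. The session name "example" and app id "1853" below are placeholders, and the snippet assumes it runs from the directory the job was submitted from, since that is where the ``slurm-<app_id>-<role_name>-<replica_id>.out`` files are written.

    from torchx.schedulers.slurm_scheduler import create_scheduler

    # Placeholder session name and app id; log files are resolved relative
    # to the current working directory.
    scheduler = create_scheduler("example")
    for line in scheduler.log_iter("1853", "echo", k=0, should_tail=False):
        print(line)

Equivalently, the combined logs can be fetched from the CLI with ``torchx log "$APP_ID"``, as the updated scripts/slurmtest.sh does.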