Skip to content

Commit 1aaa4b3

Browse files
committed
slurm_scheduler: add support for per replica log files and API access
1 parent 92e6897 commit 1aaa4b3

File tree

4 files changed

+126
-41
lines changed

4 files changed

+126
-41
lines changed

scripts/slurmtest.sh

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ source "$VENV"/bin/activate
1818
python --version
1919
pip install "$REMOTE_WHEEL"
2020

21-
APP_ID="$(torchx run --wait --scheduler slurm --scheduler_args partition=compute,time=10 utils.echo --image /tmp --num_replicas 3)"
21+
APP_ID="$(torchx run --wait --scheduler slurm --scheduler_args partition=compute,time=10 utils.echo --num_replicas 3)"
2222
torchx status "$APP_ID"
2323
torchx describe "$APP_ID"
24-
LOG_FILE="slurm-$(basename "$APP_ID").out"
25-
cat "$LOG_FILE"
26-
LINES="$(wc -l "$LOG_FILE" | cut -d' ' -f1)"
24+
sacct -j "$(basename "$APP_ID")"
25+
torchx log "$APP_ID"
26+
LINES="$(torchx log "$APP_ID" | wc -l)"
2727

2828
if [ "$LINES" -ne 3 ]
2929
then

torchx/schedulers/local_scheduler.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -909,14 +909,19 @@ def __del__(self) -> None:
909909

910910
class LogIterator:
911911
def __init__(
912-
self, app_id: str, regex: str, log_file: str, scheduler: LocalScheduler
912+
self,
913+
app_id: str,
914+
regex: str,
915+
log_file: str,
916+
scheduler: Scheduler,
917+
should_tail: bool = True,
913918
) -> None:
914919
self._app_id: str = app_id
915920
self._regex: Pattern[str] = re.compile(regex)
916921
self._log_file: str = log_file
917922
self._log_fp: Optional[TextIO] = None
918-
self._scheduler: LocalScheduler = scheduler
919-
self._app_finished: bool = False
923+
self._scheduler: Scheduler = scheduler
924+
self._app_finished: bool = not should_tail
920925

921926
def _check_finished(self) -> None:
922927
# either the app (already finished) was evicted from the LRU cache

torchx/schedulers/slurm_scheduler.py

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@
1515
import shlex
1616
import subprocess
1717
import tempfile
18+
import warnings
1819
from dataclasses import dataclass
19-
from typing import Any, Dict, List, Mapping, Optional, Tuple
20+
from datetime import datetime
21+
from typing import Any, Dict, List, Mapping, Optional, Tuple, Iterable
2022

21-
from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, Scheduler
23+
from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, Scheduler, Stream
24+
from torchx.schedulers.local_scheduler import LogIterator
2225
from torchx.specs import (
2326
NONE,
2427
AppDef,
@@ -100,12 +103,16 @@ def from_role(
100103
if resource.gpu > 0:
101104
sbatch_opts.setdefault("gpus-per-task", str(resource.gpu))
102105

106+
srun_opts = {
107+
"output": f"slurm-{macros.app_id}-{name}.out",
108+
}
109+
103110
return cls(
104111
name=name,
105112
entrypoint=role.entrypoint,
106113
args=list(role.args),
107114
sbatch_opts=sbatch_opts,
108-
srun_opts={},
115+
srun_opts=srun_opts,
109116
env=dict(role.env),
110117
)
111118

@@ -176,7 +183,11 @@ class SlurmScheduler(Scheduler):
176183
resource allocations and args and then sbatch is used to launch all of them
177184
together.
178185
179-
Logs are written to the default slurm log file.
186+
Logs are available in combined form via ``torchx log``, via the programmatic
187+
API, and in the job launch directory as
188+
``slurm-<jobid>-<role>-<replica_id>.out``. If TorchX is running in a
189+
different directory than the one where the job was created, the logs won't be
190+
found.
180191
181192
Some of the config options passed to it are added as SBATCH arguments to each
182193
replica. See https://slurm.schedmd.com/sbatch.html#SECTION_OPTIONS for info
@@ -203,9 +214,7 @@ class SlurmScheduler(Scheduler):
203214
type: scheduler
204215
features:
205216
cancel: true
206-
logs: |
207-
Logs are accessible via the default slurm log file but not the
208-
programmatic API.
217+
logs: true
209218
distributed: true
210219
describe: |
211220
Partial support. SlurmScheduler will return job and replica
@@ -262,7 +271,7 @@ def _submit_dryrun(
262271
app_id=macros.app_id,
263272
replica_id=str(replica_id),
264273
)
265-
name = f"{app.name}-{role.name}-{replica_id}"
274+
name = f"{role.name}-{replica_id}"
266275
replica_role = values.apply(role)
267276
replicas[name] = SlurmReplicaRequest.from_role(name, replica_role, cfg)
268277
req = SlurmBatchRequest(
@@ -308,19 +317,19 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
308317
), f"failed to translate slurm state {state} to torchx state"
309318
app_state = state_enum
310319

311-
name_parts = row["JobName"].split("-")
312-
if len(name_parts) < 3:
320+
role, _, replica_id = row["JobName"].rpartition("-")
321+
if not replica_id:
313322
# the job name should always end in a "-<replica_id>" suffix, but
314323
# sometimes sacct is slow to update
315324
continue
316-
role = name_parts[-2]
317-
replica_id = int(name_parts[-1])
318325
if role not in roles:
319326
roles[role] = Role(name=role, num_replicas=0, image="")
320327
roles_statuses[role] = RoleStatus(role, [])
321328
roles[role].num_replicas += 1
322329
roles_statuses[role].replicas.append(
323-
ReplicaStatus(id=replica_id, role=role, state=app_state, hostname=""),
330+
ReplicaStatus(
331+
id=int(replica_id), role=role, state=app_state, hostname=""
332+
),
324333
)
325334

326335
return DescribeAppResponse(
@@ -331,6 +340,34 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
331340
msg=msg,
332341
)
333342

343+
def log_iter(
344+
self,
345+
app_id: str,
346+
role_name: str,
347+
k: int = 0,
348+
regex: Optional[str] = None,
349+
since: Optional[datetime] = None,
350+
until: Optional[datetime] = None,
351+
should_tail: bool = False,
352+
streams: Optional[Stream] = None,
353+
) -> Iterable[str]:
354+
if since or until:
355+
warnings.warn(
356+
"since and/or until times specified for SlurmScheduler.log_iter."
357+
" These will be ignored and all log lines will be returned"
358+
)
359+
if streams is not None and streams != Stream.COMBINED:
360+
warnings.warn(
361+
"streams specified for SlurmScheduler.log_iter."
362+
" These will be ignored and all log lines will be returned"
363+
)
364+
365+
log_file = f"slurm-{app_id}-{role_name}-{k}.out"
366+
367+
return LogIterator(
368+
app_id, regex or ".*", log_file, self, should_tail=should_tail
369+
)
370+
334371

335372
def create_scheduler(session_name: str, **kwargs: Any) -> SlurmScheduler:
336373
return SlurmScheduler(

torchx/schedulers/test/slurm_scheduler_test.py

Lines changed: 64 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@
44
# This source code is licensed under the BSD-style license found in the
55
# LICENSE file in the root directory of this source tree.
66

7+
import os
78
import subprocess
9+
import tempfile
810
import unittest
11+
from contextlib import contextmanager
12+
from typing import Generator
913
from unittest.mock import MagicMock, call, patch
1014

1115
from torchx import specs
@@ -18,6 +22,18 @@
1822
)
1923

2024

25+
@contextmanager
26+
def tmp_cwd() -> Generator[None, None, None]:
27+
path = tempfile.TemporaryDirectory()
28+
cwd = os.getcwd()
29+
os.chdir(path.name)
30+
try:
31+
yield
32+
finally:
33+
os.chdir(cwd)
34+
path.cleanup()
35+
36+
2137
class SlurmSchedulerTest(unittest.TestCase):
2238
def test_create_scheduler(self) -> None:
2339
scheduler = create_scheduler("foo")
@@ -40,12 +56,12 @@ def test_replica_request(self) -> None:
4056
),
4157
)
4258
sbatch, srun = SlurmReplicaRequest.from_role(
43-
"role-name", role, cfg={}
59+
"role-0", role, cfg={}
4460
).materialize()
4561
self.assertEqual(
4662
sbatch,
4763
[
48-
"--job-name=role-name",
64+
"--job-name=role-0",
4965
"--ntasks-per-node=1",
5066
"--cpus-per-task=2",
5167
"--mem=10",
@@ -54,7 +70,13 @@ def test_replica_request(self) -> None:
5470
)
5571
self.assertEqual(
5672
srun,
57-
["--export=FOO=bar", "echo", "'hello slurm'", "test"],
73+
[
74+
'--output=slurm-"$SLURM_JOB_ID"-role-0.out',
75+
"--export=FOO=bar",
76+
"echo",
77+
"'hello slurm'",
78+
"test",
79+
],
5880
)
5981

6082
# test nomem option
@@ -133,25 +155,25 @@ def test_dryrun_multi_role(self) -> None:
133155
self.assertEqual(req.cmd, ["sbatch", "--parsable"])
134156
self.assertEqual(
135157
set(req.replicas.keys()),
136-
{"foo-a-0", "foo-a-1", "foo-b-0"},
158+
{"a-0", "a-1", "b-0"},
137159
)
138160

139161
script = req.materialize()
140162
self.assertEqual(
141163
script,
142164
"""#!/bin/bash
143-
#SBATCH --job-name=foo-a-0 --ntasks-per-node=1
165+
#SBATCH --job-name=a-0 --ntasks-per-node=1
144166
#SBATCH hetjob
145-
#SBATCH --job-name=foo-a-1 --ntasks-per-node=1
167+
#SBATCH --job-name=a-1 --ntasks-per-node=1
146168
#SBATCH hetjob
147-
#SBATCH --job-name=foo-b-0 --ntasks-per-node=1
169+
#SBATCH --job-name=b-0 --ntasks-per-node=1
148170
149171
# exit on error
150172
set -e
151173
152-
srun echo 0 'hello '"$SLURM_JOB_ID"'' :\\
153-
echo 1 'hello '"$SLURM_JOB_ID"'' :\\
154-
echo
174+
srun --output=slurm-"$SLURM_JOB_ID"-a-0.out echo 0 'hello '"$SLURM_JOB_ID"'' :\\
175+
--output=slurm-"$SLURM_JOB_ID"-a-1.out echo 1 'hello '"$SLURM_JOB_ID"'' :\\
176+
--output=slurm-"$SLURM_JOB_ID"-b-0.out echo
155177
""",
156178
)
157179

@@ -205,33 +227,43 @@ def test_cancel(self, run: MagicMock, describe: MagicMock) -> None:
205227
@patch("subprocess.run")
206228
def test_describe_completed(self, run: MagicMock) -> None:
207229
run.return_value.stdout = b"""
208-
JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode|
209-
176+0|echo-echo-0|compute||1|COMPLETED|0:0|
210-
176+0.batch|batch|||1|COMPLETED|0:0|
211-
176+0.0|echo|||1|COMPLETED|0:0|
212-
176+1|echo-echo-1|compute||1|COMPLETED|0:0|
213-
176+1.0|echo|||1|COMPLETED|0:0|
214-
176+2|echo-echo-2|compute||1|COMPLETED|0:0|
215-
176+2.0|echo|||1|COMPLETED|0:0|
230+
JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode
231+
1853+0|echo-0|compute||1|COMPLETED|0:0
232+
1853+0.batch|batch|||1|COMPLETED|0:0
233+
1853+0.0|echo|||1|COMPLETED|0:0
234+
1853+1|echo-1|compute||1|COMPLETED|0:0
235+
1853+1.0|echo|||1|COMPLETED|0:0
236+
1853+2|echo-2|compute||1|COMPLETED|0:0
237+
1853+2.0|echo|||1|COMPLETED|0:0
216238
""".strip()
217239

218240
scheduler = create_scheduler("foo")
219-
out = scheduler.describe(app_id="176")
241+
out = scheduler.describe(app_id="1853")
220242

221243
self.assertEqual(run.call_count, 1)
222244
self.assertEqual(
223245
run.call_args,
224246
call(
225-
["sacct", "--parsable2", "-j", "176"],
247+
["sacct", "--parsable2", "-j", "1853"],
226248
stdout=subprocess.PIPE,
227249
check=True,
228250
),
229251
)
230252

231253
self.assertIsNotNone(out)
232-
self.assertEqual(out.app_id, "176")
254+
self.assertEqual(out.app_id, "1853")
233255
self.assertEqual(out.msg, "COMPLETED")
234256
self.assertEqual(out.state, specs.AppState.SUCCEEDED)
257+
self.assertEqual(
258+
out.roles,
259+
[
260+
specs.Role(
261+
name="echo",
262+
image="",
263+
num_replicas=3,
264+
)
265+
],
266+
)
235267

236268
@patch("subprocess.run")
237269
def test_describe_running(self, run: MagicMock) -> None:
@@ -253,3 +285,14 @@ def test_describe_running(self, run: MagicMock) -> None:
253285
self.assertEqual(out.app_id, "54")
254286
self.assertEqual(out.msg, "RUNNING")
255287
self.assertEqual(out.state, specs.AppState.RUNNING)
288+
289+
@patch("subprocess.run")
290+
def test_log_iter(self, run: MagicMock) -> None:
291+
scheduler = create_scheduler("foo")
292+
293+
with tmp_cwd():
294+
with open("slurm-54-echo-1.out", "wt") as f:
295+
f.write("hello\nworld\n")
296+
297+
logs = list(scheduler.log_iter("54", "echo", 1))
298+
self.assertEqual(logs, ["hello", "world"])

0 commit comments

Comments
 (0)