Commit 470213e

slurm_scheduler: add support for per replica log files and API access

1 parent 92e6897
4 files changed: +120, -39 lines

scripts/slurmtest.sh (4 additions, 4 deletions)

```diff
@@ -18,12 +18,12 @@ source "$VENV"/bin/activate
 python --version
 pip install "$REMOTE_WHEEL"
 
-APP_ID="$(torchx run --wait --scheduler slurm --scheduler_args partition=compute,time=10 utils.echo --image /tmp --num_replicas 3)"
+APP_ID="$(torchx run --wait --scheduler slurm --scheduler_args partition=compute,time=10 utils.echo --num_replicas 3)"
 torchx status "$APP_ID"
 torchx describe "$APP_ID"
-LOG_FILE="slurm-$(basename "$APP_ID").out"
-cat "$LOG_FILE"
-LINES="$(wc -l "$LOG_FILE" | cut -d' ' -f1)"
+sacct -j "$(basename "$APP_ID")"
+torchx log "$APP_ID"
+LINES="$(torchx log "$APP_ID" | wc -l)"
 
 if [ "$LINES" -ne 3 ]
 then
```
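The smoke test now exercises the new log path end to end: instead of reading slurm's single default `slurm-<jobid>.out` file, it dumps `sacct` accounting output for the job and counts the lines returned by `torchx log` (one echo line per replica). Roughly the same check can be written against the scheduler API this commit adds; the session name and job id below are illustrative:

```python
from torchx.schedulers.slurm_scheduler import create_scheduler

scheduler = create_scheduler("smoke-test")
app_id = "1853"  # illustrative: the slurm job id printed by `torchx run`

# Mirrors `torchx log "$APP_ID" | wc -l`: each of the 3 echo replicas
# should contribute exactly one log line.
lines = sum(len(list(scheduler.log_iter(app_id, "echo", k))) for k in range(3))
assert lines == 3, f"expected 3 log lines, got {lines}"
```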

torchx/schedulers/local_scheduler.py (7 additions, 2 deletions)

```diff
@@ -909,14 +909,19 @@ def __del__(self) -> None:
 
 class LogIterator:
     def __init__(
-        self, app_id: str, regex: str, log_file: str, scheduler: LocalScheduler
+        self,
+        app_id: str,
+        regex: str,
+        log_file: str,
+        scheduler: LocalScheduler,
+        should_tail: bool = True,
     ) -> None:
         self._app_id: str = app_id
         self._regex: Pattern[str] = re.compile(regex)
         self._log_file: str = log_file
         self._log_fp: Optional[TextIO] = None
         self._scheduler: LocalScheduler = scheduler
-        self._app_finished: bool = False
+        self._app_finished: bool = not should_tail
 
     def _check_finished(self) -> None:
         # either the app (already finished) was evicted from the LRU cache
```
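`should_tail` defaults to `True` to preserve the local scheduler's tailing behavior; passing `False` seeds `_app_finished`, so the iterator drains whatever is already in the file and stops at EOF instead of polling for new lines. A stripped-down sketch of that control flow (not the actual class; `app_finished` stands in for the scheduler status check):

```python
import time
from typing import Callable, Iterator

def iter_log_lines(
    path: str,
    app_finished: Callable[[], bool],
    should_tail: bool = True,
    poll_interval_s: float = 0.1,
) -> Iterator[str]:
    # Mirrors `_app_finished = not should_tail`: if we aren't tailing,
    # pretend the app already finished so the first EOF ends iteration.
    finished = not should_tail
    with open(path, "rt") as f:
        while True:
            line = f.readline()
            if line:
                yield line.rstrip("\n")
            elif finished:
                return  # EOF on a finished app: nothing more will arrive
            else:
                finished = app_finished()  # re-check status, then wait for output
                time.sleep(poll_interval_s)
```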

torchx/schedulers/slurm_scheduler.py (47 additions, 12 deletions)

```diff
@@ -15,10 +15,13 @@
 import shlex
 import subprocess
 import tempfile
+import warnings
 from dataclasses import dataclass
-from typing import Any, Dict, List, Mapping, Optional, Tuple
+from datetime import datetime
+from typing import Any, Dict, List, Mapping, Optional, Tuple, Iterable
 
-from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, Scheduler
+from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, Scheduler, Stream
+from torchx.schedulers.local_scheduler import LogIterator
 from torchx.specs import (
     NONE,
     AppDef,
@@ -100,12 +103,16 @@ def from_role(
         if resource.gpu > 0:
             sbatch_opts.setdefault("gpus-per-task", str(resource.gpu))
 
+        srun_opts = {
+            "output": f"slurm-{macros.app_id}-{name}.out",
+        }
+
         return cls(
             name=name,
             entrypoint=role.entrypoint,
             args=list(role.args),
             sbatch_opts=sbatch_opts,
-            srun_opts={},
+            srun_opts=srun_opts,
             env=dict(role.env),
         )
```
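The `--output` flag is attached per replica via `srun_opts` because each replica is its own component of the heterogeneous job; previously all replicas shared slurm's default `slurm-<jobid>.out`. Judging by the test expectations in this commit, `macros.app_id` is substituted when the script is materialized, leaving a literal `"$SLURM_JOB_ID"` that slurm expands at runtime. A quick illustration of the resulting flag shape (the variables here are illustrative; the expected string comes from this commit's tests):

```python
# Illustrative: how the materialized per-replica srun flag ends up looking.
app_id_macro = '"$SLURM_JOB_ID"'  # what macros.app_id resolves to in the script
replica_name = "role-0"           # "<role>-<replica_id>"
flag = f"--output=slurm-{app_id_macro}-{replica_name}.out"
assert flag == '--output=slurm-"$SLURM_JOB_ID"-role-0.out'
```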

```diff
@@ -176,7 +183,11 @@ class SlurmScheduler(Scheduler):
     resource allocations and args and then sbatch is used to launch all of them
     together.
 
-    Logs are written to the default slurm log file.
+    Logs are available in combined form via ``torchx log``, the programmatic API
+    as well as in the job launch directory as
+    ``slurm-<jobid>-<role>-<replica_id>.out``. If TorchX is running in a
+    different directory than where the job was created the logs won't be able to
+    be found.
 
     Some of the config options passed to it are added as SBATCH arguments to each
     replica. See https://slurm.schedmd.com/sbatch.html#SECTION_OPTIONS for info
@@ -203,9 +214,7 @@ class SlurmScheduler(Scheduler):
         type: scheduler
         features:
             cancel: true
-            logs: |
-                Logs are accessible via the default slurm log file but not the
-                programmatic API.
+            logs: true
             distributed: true
             describe: |
                 Partial support. SlurmScheduler will return job and replica
@@ -262,7 +271,7 @@ def _submit_dryrun(
                 app_id=macros.app_id,
                 replica_id=str(replica_id),
             )
-            name = f"{app.name}-{role.name}-{replica_id}"
+            name = f"{role.name}-{replica_id}"
             replica_role = values.apply(role)
             replicas[name] = SlurmReplicaRequest.from_role(name, replica_role, cfg)
         req = SlurmBatchRequest(
@@ -308,13 +317,11 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
             ), f"failed to translate slurm state {state} to torchx state"
             app_state = state_enum
 
-            name_parts = row["JobName"].split("-")
-            if len(name_parts) < 3:
+            role, _, replica_id = row["JobName"].rpartition("-")
+            if not replica_id:
                 # name should always have at least 3 parts but sometimes sacct
                 # is slow to update
                 continue
-            role = name_parts[-2]
-            replica_id = int(name_parts[-1])
 if role not in roles:
                 roles[role] = Role(name=role, num_replicas=0, image="")
                 roles_statuses[role] = RoleStatus(role, [])
```
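Two things fall out of the `rpartition` rewrite: role names that themselves contain dashes parse correctly (the old `split("-")` indexing would truncate them to the last-but-one segment), and a `JobName` that sacct has not yet populated yields an empty `replica_id`, which the `if not replica_id: continue` guard skips. Plain-Python illustration (role names here are made up):

```python
# rpartition splits on the *last* dash, so dashed role names survive intact:
assert "dist-trainer-3".rpartition("-") == ("dist-trainer", "-", "3")

# an empty JobName (sacct slow to update) produces an empty replica_id,
# which the guard above skips:
assert "".rpartition("-") == ("", "", "")
```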
```diff
@@ -331,6 +338,34 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
                 msg=msg,
             )
 
+    def log_iter(
+        self,
+        app_id: str,
+        role_name: str,
+        k: int = 0,
+        regex: Optional[str] = None,
+        since: Optional[datetime] = None,
+        until: Optional[datetime] = None,
+        should_tail: bool = False,
+        streams: Optional[Stream] = None,
+    ) -> Iterable[str]:
+        if since or until:
+            warnings.warn(
+                "since and/or until times specified for SlurmScheduler.log_iter."
+                " These will be ignored and all log lines will be returned"
+            )
+        if streams is not None and streams != Stream.COMBINED:
+            warnings.warn(
+                "streams specified for SlurmScheduler.log_iter."
+                " These will be ignored and all log lines will be returned"
+            )
+
+        log_file = f"slurm-{app_id}-{role_name}-{k}.out"
+
+        return LogIterator(
+            app_id, regex or ".*", log_file, self, should_tail=should_tail
+        )
+
 
 def create_scheduler(session_name: str, **kwargs: Any) -> SlurmScheduler:
     return SlurmScheduler(
```
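A minimal sketch of the programmatic access this enables, assuming it runs from the directory the job was submitted from (the docstring caveat above) and using an illustrative job id. Note that `since`/`until` and non-combined `streams` are accepted for interface compatibility but are ignored with a warning:

```python
from torchx.schedulers.slurm_scheduler import create_scheduler

scheduler = create_scheduler("example")

# Read replica 1 of the "echo" role for (illustrative) job 1853.
# should_tail=False returns what is on disk and stops at EOF;
# should_tail=True would keep polling the file for new lines.
for line in scheduler.log_iter("1853", "echo", k=1, should_tail=False):
    print(line)
```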

torchx/schedulers/test/slurm_scheduler_test.py (62 additions, 21 deletions)

```diff
@@ -4,8 +4,11 @@
 # This source code is licensed under the BSD-style license found in the
 # LICENSE file in the root directory of this source tree.
 
+import os
 import subprocess
+import tempfile
 import unittest
+from contextlib import contextmanager
 from unittest.mock import MagicMock, call, patch
 
 from torchx import specs
@@ -18,6 +21,18 @@
 )
 
 
+@contextmanager
+def tmp_cwd():
+    path = tempfile.TemporaryDirectory()
+    cwd = os.getcwd()
+    os.chdir(path.name)
+    try:
+        yield
+    finally:
+        os.chdir(cwd)
+        path.cleanup()
+
+
 class SlurmSchedulerTest(unittest.TestCase):
     def test_create_scheduler(self) -> None:
         scheduler = create_scheduler("foo")
@@ -40,12 +55,12 @@ def test_replica_request(self) -> None:
             ),
         )
         sbatch, srun = SlurmReplicaRequest.from_role(
-            "role-name", role, cfg={}
+            "role-0", role, cfg={}
         ).materialize()
         self.assertEqual(
             sbatch,
             [
-                "--job-name=role-name",
+                "--job-name=role-0",
                 "--ntasks-per-node=1",
                 "--cpus-per-task=2",
                 "--mem=10",
@@ -54,7 +69,13 @@ def test_replica_request(self) -> None:
         )
         self.assertEqual(
             srun,
-            ["--export=FOO=bar", "echo", "'hello slurm'", "test"],
+            [
+                '--output=slurm-"$SLURM_JOB_ID"-role-0.out',
+                "--export=FOO=bar",
+                "echo",
+                "'hello slurm'",
+                "test",
+            ],
         )
 
         # test nomem option
@@ -133,25 +154,25 @@ def test_dryrun_multi_role(self) -> None:
         self.assertEqual(req.cmd, ["sbatch", "--parsable"])
         self.assertEqual(
             set(req.replicas.keys()),
-            {"foo-a-0", "foo-a-1", "foo-b-0"},
+            {"a-0", "a-1", "b-0"},
         )
 
         script = req.materialize()
         self.assertEqual(
             script,
             """#!/bin/bash
-#SBATCH --job-name=foo-a-0 --ntasks-per-node=1
+#SBATCH --job-name=a-0 --ntasks-per-node=1
 #SBATCH hetjob
-#SBATCH --job-name=foo-a-1 --ntasks-per-node=1
+#SBATCH --job-name=a-1 --ntasks-per-node=1
 #SBATCH hetjob
-#SBATCH --job-name=foo-b-0 --ntasks-per-node=1
+#SBATCH --job-name=b-0 --ntasks-per-node=1
 
 # exit on error
 set -e
 
-srun echo 0 'hello '"$SLURM_JOB_ID"'' :\\
-    echo 1 'hello '"$SLURM_JOB_ID"'' :\\
-    echo
+srun --output=slurm-"$SLURM_JOB_ID"-a-0.out echo 0 'hello '"$SLURM_JOB_ID"'' :\\
+    --output=slurm-"$SLURM_JOB_ID"-a-1.out echo 1 'hello '"$SLURM_JOB_ID"'' :\\
+    --output=slurm-"$SLURM_JOB_ID"-b-0.out echo
 """,
         )
@@ -205,33 +226,43 @@ def test_cancel(self, run: MagicMock, describe: MagicMock) -> None:
     @patch("subprocess.run")
     def test_describe_completed(self, run: MagicMock) -> None:
         run.return_value.stdout = b"""
-JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode|
-176+0|echo-echo-0|compute||1|COMPLETED|0:0|
-176+0.batch|batch|||1|COMPLETED|0:0|
-176+0.0|echo|||1|COMPLETED|0:0|
-176+1|echo-echo-1|compute||1|COMPLETED|0:0|
-176+1.0|echo|||1|COMPLETED|0:0|
-176+2|echo-echo-2|compute||1|COMPLETED|0:0|
-176+2.0|echo|||1|COMPLETED|0:0|
+JobID|JobName|Partition|Account|AllocCPUS|State|ExitCode
+1853+0|echo-0|compute||1|COMPLETED|0:0
+1853+0.batch|batch|||1|COMPLETED|0:0
+1853+0.0|echo|||1|COMPLETED|0:0
+1853+1|echo-1|compute||1|COMPLETED|0:0
+1853+1.0|echo|||1|COMPLETED|0:0
+1853+2|echo-2|compute||1|COMPLETED|0:0
+1853+2.0|echo|||1|COMPLETED|0:0
 """.strip()
 
         scheduler = create_scheduler("foo")
-        out = scheduler.describe(app_id="176")
+        out = scheduler.describe(app_id="1853")
 
         self.assertEqual(run.call_count, 1)
         self.assertEqual(
             run.call_args,
             call(
-                ["sacct", "--parsable2", "-j", "176"],
+                ["sacct", "--parsable2", "-j", "1853"],
                 stdout=subprocess.PIPE,
                 check=True,
             ),
         )
 
         self.assertIsNotNone(out)
-        self.assertEqual(out.app_id, "176")
+        self.assertEqual(out.app_id, "1853")
         self.assertEqual(out.msg, "COMPLETED")
         self.assertEqual(out.state, specs.AppState.SUCCEEDED)
+        self.assertEqual(
+            out.roles,
+            [
+                specs.Role(
+                    name="echo",
+                    image="",
+                    num_replicas=3,
+                )
+            ],
+        )
 
     @patch("subprocess.run")
     def test_describe_running(self, run: MagicMock) -> None:
@@ -253,3 +284,13 @@ def test_describe_running(self, run: MagicMock) -> None:
         self.assertEqual(out.app_id, "54")
         self.assertEqual(out.msg, "RUNNING")
         self.assertEqual(out.state, specs.AppState.RUNNING)
+
+    def test_log_iter(self) -> None:
+        scheduler = create_scheduler("foo")
+
+        with tmp_cwd():
+            with open("slurm-54-echo-1.out", "wt") as f:
+                f.write("hello\nworld\n")
+
+            logs = list(scheduler.log_iter("54", "echo", 1))
+            self.assertEqual(logs, ["hello", "world"])
```
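A note on the `tmp_cwd` helper: because `log_iter` resolves log files against the process working directory, the test has to chdir into a scratch directory and restore the original cwd before cleanup, otherwise later tests would run inside a deleted directory. On newer Python (3.11+, which postdates this commit) the same helper can be composed from the standard library; a sketch:

```python
import contextlib
import tempfile

# Rough equivalent of this commit's tmp_cwd() using contextlib.chdir (3.11+):
with tempfile.TemporaryDirectory() as tmp, contextlib.chdir(tmp):
    ...  # cwd is restored, then the directory is removed, on exit
```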
