slurm_scheduler: autodetect nomem setting #461

Closed · wants to merge 1 commit

1 change: 0 additions & 1 deletion scripts/slurmtest.sh
@@ -33,7 +33,6 @@ cat <<EOT > .torchxconfig
 partition=compute
 time=10
 comment=hello
-nomem=true
 job_dir=$JOB_DIR
 EOT

59 changes: 50 additions & 9 deletions torchx/schedulers/slurm_scheduler.py
@@ -94,7 +94,7 @@ class SlurmReplicaRequest:
 
     @classmethod
     def from_role(
-        cls, name: str, role: Role, cfg: Mapping[str, CfgVal]
+        cls, name: str, role: Role, cfg: Mapping[str, CfgVal], nomem: bool
     ) -> "SlurmReplicaRequest":
         """
         ``from_role`` creates a SlurmReplicaRequest for the specific role and
@@ -112,7 +112,7 @@ def from_role(
         if resource != NONE:
             if resource.cpu > 0:
                 sbatch_opts.setdefault("cpus-per-task", str(resource.cpu))
-            if not cfg.get("nomem") and resource.memMB > 0:
+            if not nomem and resource.memMB > 0:
                 sbatch_opts.setdefault("mem", str(resource.memMB))
             if resource.gpu > 0:
                 sbatch_opts.setdefault("gpus-per-task", str(resource.gpu))
@@ -272,6 +272,9 @@ class SlurmScheduler(Scheduler, DirWorkspace):
                 If ``job_dir`` is specified the DirWorkspace will create a new
                 isolated directory with a snapshot of the workspace.
             mounts: false
+
+    If a partition has less than 1GB of RealMemory configured we disable memory
+    requests to workaround https://github.com/aws/aws-parallelcluster/issues/2198.
     """
 
     def __init__(self, session_name: str) -> None:
@@ -293,12 +296,6 @@ def run_opts(self) -> runopts:
 "minutes", "minutes:seconds", "hours:minutes:seconds", "days-hours", \
 "days-hours:minutes" or "days-hours:minutes:seconds"',
         )
-        opts.add(
-            "nomem",
-            type_=bool,
-            default=False,
-            help="disables memory request to workaround https://github.com/aws/aws-parallelcluster/issues/2198",
-        )
         opts.add(
             "comment",
             type_=str,
@@ -350,12 +347,51 @@ def schedule(self, dryrun_info: AppDryRunInfo[SlurmBatchRequest]) -> str:
 
         return job_id
 
+    def _partition_memmb(self, partition: Optional[str]) -> Optional[int]:
+        """
+        _partition_memmb returns the memory allocation for the given partition
+        or the default partition if none is specified.
+        """
+        try:
+            p = subprocess.run(
+                ["sinfo", "--format", "%P,%m", "--noconvert"],
+                stdout=subprocess.PIPE,
+            )
+        except FileNotFoundError:
+            return None
+        if p.returncode != 0:
+            return None
+        output = p.stdout.decode("utf-8").strip().split("\n")
+        if len(output) <= 1:
+            return None
+
+        reader = csv.DictReader(output, delimiter=",")
+        for row in reader:
+            part = row.get("PARTITION")
+            mem = row.get("MEMORY")
+            if part is None or mem is None:
+                continue
+            default = "*" in part
+            part = part.strip("*")
+            memmb = int(mem.strip("M+"))
+            if part == partition or (partition is None and default):
+                return memmb
+        return None
+
     def _submit_dryrun(
         self, app: AppDef, cfg: Mapping[str, CfgVal]
     ) -> AppDryRunInfo[SlurmBatchRequest]:
         job_dir = cfg.get("job_dir")
         assert job_dir is None or isinstance(job_dir, str), "job_dir must be str"
 
+        partition = cfg.get("partition")
+        assert partition is None or isinstance(partition, str), "partition must be str"
+
+        # check if the partition has at least 1GB memory, if we're not sure,
+        # default to using memory allocations
+        memmb = self._partition_memmb(partition)
+        nomem = memmb is not None and memmb <= 1000
+
         replicas = {}
         for role in app.roles:
             for replica_id in range(role.num_replicas):
@@ -367,7 +403,12 @@ def _submit_dryrun(
                 )
                 name = f"{role.name}-{replica_id}"
                 replica_role = values.apply(role)
-                replicas[name] = SlurmReplicaRequest.from_role(name, replica_role, cfg)
+                replicas[name] = SlurmReplicaRequest.from_role(
+                    name,
+                    replica_role,
+                    cfg,
+                    nomem=nomem,
+                )
         cmd = ["sbatch", "--parsable"]
 
         for k in SBATCH_JOB_OPTIONS:
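
For context, the autodetection added above reduces to parsing the CSV that sinfo --format %P,%m --noconvert prints and comparing the reported RealMemory of the requested (or default) partition against 1000 MB. The snippet below is a standalone sketch of that decision, for illustration only: the partition_memmb helper and the SAMPLE_SINFO partition names and sizes are invented here; the real implementation is the _partition_memmb method in the diff above.

import csv
from typing import Optional

# Illustrative sinfo output; real data would come from
# `sinfo --format %P,%m --noconvert`. Partition names and sizes are made up.
SAMPLE_SINFO = """PARTITION,MEMORY
compute*,64000
scavenge,500000+
"""


def partition_memmb(output: str, partition: Optional[str] = None) -> Optional[int]:
    # Mirrors the parsing in the new _partition_memmb method: the default
    # partition is flagged with "*" and MEMORY may carry an "M" or "+" suffix.
    lines = output.strip().split("\n")
    if len(lines) <= 1:
        return None
    for row in csv.DictReader(lines, delimiter=","):
        part, mem = row.get("PARTITION"), row.get("MEMORY")
        if part is None or mem is None:
            continue
        is_default = "*" in part
        if part.strip("*") == partition or (partition is None and is_default):
            return int(mem.strip("M+"))
    return None


memmb = partition_memmb(SAMPLE_SINFO)  # 64000 for the default "compute" partition
nomem = memmb is not None and memmb <= 1000  # False, so --mem is still emitted
print(memmb, nomem)
print(partition_memmb(SAMPLE_SINFO, "scavenge"))  # 500000
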
83 changes: 76 additions & 7 deletions torchx/schedulers/test/slurm_scheduler_test.py
@@ -75,6 +75,15 @@ def simple_app() -> specs.AppDef:
     )
 
 
+def mem_app() -> specs.AppDef:
+    return specs.AppDef(
+        name="foo",
+        roles=[
+            simple_role(),
+        ],
+    )
+
+
 class SlurmSchedulerTest(unittest.TestCase):
     def test_create_scheduler(self) -> None:
         scheduler = create_scheduler("foo")
@@ -83,7 +92,7 @@ def test_create_scheduler(self) -> None:
     def test_replica_request(self) -> None:
         role = simple_role()
         sbatch, srun = SlurmReplicaRequest.from_role(
-            "role-0", role, cfg={}
+            "role-0", role, cfg={}, nomem=False
         ).materialize()
         self.assertEqual(
             sbatch,
@@ -109,7 +118,10 @@ def test_replica_request(self) -> None:
 
     def test_replica_request_nomem(self) -> None:
         sbatch, srun = SlurmReplicaRequest.from_role(
-            "role-name", simple_role(), cfg={"nomem": True}
+            "role-name",
+            simple_role(),
+            cfg={},
+            nomem=True,
         ).materialize()
         self.assertEqual(
             sbatch,
@@ -123,7 +135,10 @@ def test_replica_request_nomem(self) -> None:
 
     def test_replica_request_constraint(self) -> None:
         sbatch, srun = SlurmReplicaRequest.from_role(
-            "role-name", simple_role(), cfg={"constraint": "orange"}
+            "role-name",
+            simple_role(),
+            cfg={"constraint": "orange"},
+            nomem=False,
         ).materialize()
         self.assertIn(
             "--constraint=orange",
@@ -137,7 +152,9 @@ def test_replica_request_app_id(self) -> None:
             entrypoint="echo",
             args=[f"hello {specs.macros.app_id}"],
         )
-        _, srun = SlurmReplicaRequest.from_role("role-name", role, cfg={}).materialize()
+        _, srun = SlurmReplicaRequest.from_role(
+            "role-name", role, cfg={}, nomem=False
+        ).materialize()
         self.assertIn(
             "echo 'hello '\"$SLURM_JOB_ID\"''",
             " ".join(srun),
@@ -156,7 +173,9 @@ def test_replica_request_run_config(self) -> None:
             "time": "5:13",
         }
 
-        sbatch, _ = SlurmReplicaRequest.from_role("role-name", role, cfg).materialize()
+        sbatch, _ = SlurmReplicaRequest.from_role(
+            "role-name", role, cfg, nomem=False
+        ).materialize()
 
         run_opts = scheduler.run_opts()
 
@@ -205,8 +224,12 @@ def test_dryrun_multi_role(self) -> None:
             """,
         )
 
+    @patch(
+        "torchx.schedulers.slurm_scheduler.SlurmScheduler._partition_memmb",
+        return_value=2048,
+    )
     @patch("subprocess.run")
-    def test_run_multi_role(self, run: MagicMock) -> None:
+    def test_run_multi_role(self, run: MagicMock, partition_memmb: MagicMock) -> None:
         run.return_value.stdout = b"1234"
         scheduler = create_scheduler("foo")
         app = specs.AppDef(
@@ -402,6 +425,25 @@ def test_log_iter(self, run: MagicMock) -> None:
         with self.assertRaises(ValueError):
             scheduler.log_iter("54", "echo", 1, streams=Stream.COMBINED)
 
+    @patch("subprocess.run")
+    def test_dryrun_nomem(self, run: MagicMock) -> None:
+        run.return_value.returncode = 0
+
+        scheduler = create_scheduler("foo")
+        app = mem_app()
+
+        run.return_value.stdout = b"PARTITION,MEMORY\nfoo*,5000"
+        info = scheduler.submit_dryrun(app, cfg={})
+        self.assertIn("mem", info.request.replicas["foo-0"].sbatch_opts)
+
+        run.return_value.stdout = b"PARTITION,MEMORY\nfoo*,1"
+        info = scheduler.submit_dryrun(app, cfg={})
+        self.assertNotIn("mem", info.request.replicas["foo-0"].sbatch_opts)
+
+        run.return_value.stdout = b""
+        info = scheduler.submit_dryrun(app, cfg={})
+        self.assertIn("mem", info.request.replicas["foo-0"].sbatch_opts)
+
     def test_dryrun_comment(self) -> None:
         scheduler = create_scheduler("foo")
         app = simple_app()
@@ -435,8 +477,14 @@ def test_dryrun_mail(self) -> None:
             info.request.cmd,
         )
 
+    @patch(
+        "torchx.schedulers.slurm_scheduler.SlurmScheduler._partition_memmb",
+        return_value=2048,
+    )
     @patch("subprocess.run")
-    def test_run_workspace_job_dir(self, run: MagicMock) -> None:
+    def test_run_workspace_job_dir(
+        self, run: MagicMock, partition_memmb: MagicMock
+    ) -> None:
         with tmp_cwd():
             run.return_value.stdout = b"1234"
             scheduler = create_scheduler("foo")
@@ -461,3 +509,24 @@ def test_run_workspace_job_dir(self, run: MagicMock) -> None:
                     "dir/torchx-sbatch.sh",
                 ],
             )
+
+    @patch("subprocess.run")
+    def test_partition_memmb(self, run: MagicMock) -> None:
+        scheduler = create_scheduler("foo")
+
+        ret = run.return_value
+        ret.returncode = 0
+        ret.stdout = b"""
+PARTITION,MEMORY
+scavenge,500000+
+compute*,1
+"""
+        self.assertEqual(scheduler._partition_memmb(None), 1)
+        self.assertEqual(scheduler._partition_memmb("compute"), 1)
+        self.assertEqual(scheduler._partition_memmb("nonexistant"), None)
+        self.assertEqual(scheduler._partition_memmb("scavenge"), 500000)
+
+        ret.stdout = b"""
+PARTITION,MEMORY
+"""
+        self.assertEqual(scheduler._partition_memmb(None), None)
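
As a rough usage sketch (not part of this PR), the scheduler now decides about --mem on its own, so a dry run needs no nomem setting. The AppDef below is modeled on the test fixtures above; its field values and the "compute" partition are illustrative, and on a machine without sinfo the probe simply returns None and memory requests stay enabled.

from torchx import specs
from torchx.schedulers.slurm_scheduler import create_scheduler

# One-replica app roughly like mem_app()/simple_role() in the tests; values made up.
app = specs.AppDef(
    name="foo",
    roles=[
        specs.Role(
            name="foo",
            image="/tmp",
            entrypoint="echo",
            resource=specs.Resource(cpu=2, gpu=0, memMB=2048),
        ),
    ],
)

scheduler = create_scheduler("test")
info = scheduler.submit_dryrun(app, cfg={"partition": "compute"})

# "mem" appears in the generated sbatch options only when the partition reports
# more than 1000 MB of RealMemory, or when sinfo is unavailable or fails.
print(info.request.replicas["foo-0"].sbatch_opts)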