From 7fefe001d82a2f95a86f76f261594e2ab5f02ba1 Mon Sep 17 00:00:00 2001
From: Tristan Rice
Date: Thu, 14 Apr 2022 19:36:13 -0700
Subject: [PATCH] slurm_scheduler: autodetect nomem setting

---
 scripts/slurmtest.sh                         |  1 -
 torchx/schedulers/slurm_scheduler.py         | 59 +++++++++++--
 .../schedulers/test/slurm_scheduler_test.py  | 83 +++++++++++++++++--
 3 files changed, 126 insertions(+), 17 deletions(-)

diff --git a/scripts/slurmtest.sh b/scripts/slurmtest.sh
index 116f790ec..d57fe8261 100755
--- a/scripts/slurmtest.sh
+++ b/scripts/slurmtest.sh
@@ -33,7 +33,6 @@ cat <<EOT > .torchxconfig
 partition=compute
 time=10
 comment=hello
-nomem=true
 job_dir=$JOB_DIR
 EOT
diff --git a/torchx/schedulers/slurm_scheduler.py b/torchx/schedulers/slurm_scheduler.py
index 4a211e32a..b9637e7bf 100644
--- a/torchx/schedulers/slurm_scheduler.py
+++ b/torchx/schedulers/slurm_scheduler.py
@@ -94,7 +94,7 @@ class SlurmReplicaRequest:
 
     @classmethod
     def from_role(
-        cls, name: str, role: Role, cfg: Mapping[str, CfgVal]
+        cls, name: str, role: Role, cfg: Mapping[str, CfgVal], nomem: bool
     ) -> "SlurmReplicaRequest":
         """
         ``from_role`` creates a SlurmReplicaRequest for the specific role and
@@ -112,7 +112,7 @@ def from_role(
         if resource != NONE:
             if resource.cpu > 0:
                 sbatch_opts.setdefault("cpus-per-task", str(resource.cpu))
-            if not cfg.get("nomem") and resource.memMB > 0:
+            if not nomem and resource.memMB > 0:
                 sbatch_opts.setdefault("mem", str(resource.memMB))
             if resource.gpu > 0:
                 sbatch_opts.setdefault("gpus-per-task", str(resource.gpu))
@@ -272,6 +272,9 @@ class SlurmScheduler(Scheduler, DirWorkspace):
                 If ``job_dir`` is specified the DirWorkspace will create a new
                 isolated directory with a snapshot of the workspace.
             mounts: false
+
+    If a partition has less than 1GB of RealMemory configured, we disable memory
+    requests to work around https://github.com/aws/aws-parallelcluster/issues/2198.
     """
 
     def __init__(self, session_name: str) -> None:
@@ -293,12 +296,6 @@ def run_opts(self) -> runopts:
 "minutes", "minutes:seconds", "hours:minutes:seconds", "days-hours", \
 "days-hours:minutes" or "days-hours:minutes:seconds"',
         )
-        opts.add(
-            "nomem",
-            type_=bool,
-            default=False,
-            help="disables memory request to workaround https://github.com/aws/aws-parallelcluster/issues/2198",
-        )
         opts.add(
             "comment",
             type_=str,
@@ -350,12 +347,51 @@ def schedule(self, dryrun_info: AppDryRunInfo[SlurmBatchRequest]) -> str:
 
         return job_id
 
+    def _partition_memmb(self, partition: Optional[str]) -> Optional[int]:
+        """
+        _partition_memmb returns the configured memory (in MB) for the given
+        partition, or for the default partition if none is specified.
+ """ + try: + p = subprocess.run( + ["sinfo", "--format", "%P,%m", "--noconvert"], + stdout=subprocess.PIPE, + ) + except FileNotFoundError: + return None + if p.returncode != 0: + return None + output = p.stdout.decode("utf-8").strip().split("\n") + if len(output) <= 1: + return None + + reader = csv.DictReader(output, delimiter=",") + for row in reader: + part = row.get("PARTITION") + mem = row.get("MEMORY") + if part is None or mem is None: + continue + default = "*" in part + part = part.strip("*") + memmb = int(mem.strip("M+")) + if part == partition or (partition is None and default): + return memmb + return None + def _submit_dryrun( self, app: AppDef, cfg: Mapping[str, CfgVal] ) -> AppDryRunInfo[SlurmBatchRequest]: job_dir = cfg.get("job_dir") assert job_dir is None or isinstance(job_dir, str), "job_dir must be str" + partition = cfg.get("partition") + assert partition is None or isinstance(partition, str), "partition must be str" + + # check if the partition has at least 1GB memory, if we're not sure, + # default to using memory allocations + memmb = self._partition_memmb(partition) + nomem = memmb is not None and memmb <= 1000 + replicas = {} for role in app.roles: for replica_id in range(role.num_replicas): @@ -367,7 +403,12 @@ def _submit_dryrun( ) name = f"{role.name}-{replica_id}" replica_role = values.apply(role) - replicas[name] = SlurmReplicaRequest.from_role(name, replica_role, cfg) + replicas[name] = SlurmReplicaRequest.from_role( + name, + replica_role, + cfg, + nomem=nomem, + ) cmd = ["sbatch", "--parsable"] for k in SBATCH_JOB_OPTIONS: diff --git a/torchx/schedulers/test/slurm_scheduler_test.py b/torchx/schedulers/test/slurm_scheduler_test.py index 59439d83e..00dc48bd6 100644 --- a/torchx/schedulers/test/slurm_scheduler_test.py +++ b/torchx/schedulers/test/slurm_scheduler_test.py @@ -75,6 +75,15 @@ def simple_app() -> specs.AppDef: ) +def mem_app() -> specs.AppDef: + return specs.AppDef( + name="foo", + roles=[ + simple_role(), + ], + ) + + class SlurmSchedulerTest(unittest.TestCase): def test_create_scheduler(self) -> None: scheduler = create_scheduler("foo") @@ -83,7 +92,7 @@ def test_create_scheduler(self) -> None: def test_replica_request(self) -> None: role = simple_role() sbatch, srun = SlurmReplicaRequest.from_role( - "role-0", role, cfg={} + "role-0", role, cfg={}, nomem=False ).materialize() self.assertEqual( sbatch, @@ -109,7 +118,10 @@ def test_replica_request(self) -> None: def test_replica_request_nomem(self) -> None: sbatch, srun = SlurmReplicaRequest.from_role( - "role-name", simple_role(), cfg={"nomem": True} + "role-name", + simple_role(), + cfg={}, + nomem=True, ).materialize() self.assertEqual( sbatch, @@ -123,7 +135,10 @@ def test_replica_request_nomem(self) -> None: def test_replica_request_constraint(self) -> None: sbatch, srun = SlurmReplicaRequest.from_role( - "role-name", simple_role(), cfg={"constraint": "orange"} + "role-name", + simple_role(), + cfg={"constraint": "orange"}, + nomem=False, ).materialize() self.assertIn( "--constraint=orange", @@ -137,7 +152,9 @@ def test_replica_request_app_id(self) -> None: entrypoint="echo", args=[f"hello {specs.macros.app_id}"], ) - _, srun = SlurmReplicaRequest.from_role("role-name", role, cfg={}).materialize() + _, srun = SlurmReplicaRequest.from_role( + "role-name", role, cfg={}, nomem=False + ).materialize() self.assertIn( "echo 'hello '\"$SLURM_JOB_ID\"''", " ".join(srun), @@ -156,7 +173,9 @@ def test_replica_request_run_config(self) -> None: "time": "5:13", } - sbatch, _ = 
SlurmReplicaRequest.from_role("role-name", role, cfg).materialize() + sbatch, _ = SlurmReplicaRequest.from_role( + "role-name", role, cfg, nomem=False + ).materialize() run_opts = scheduler.run_opts() @@ -205,8 +224,12 @@ def test_dryrun_multi_role(self) -> None: """, ) + @patch( + "torchx.schedulers.slurm_scheduler.SlurmScheduler._partition_memmb", + return_value=2048, + ) @patch("subprocess.run") - def test_run_multi_role(self, run: MagicMock) -> None: + def test_run_multi_role(self, run: MagicMock, partition_memmb: MagicMock) -> None: run.return_value.stdout = b"1234" scheduler = create_scheduler("foo") app = specs.AppDef( @@ -402,6 +425,25 @@ def test_log_iter(self, run: MagicMock) -> None: with self.assertRaises(ValueError): scheduler.log_iter("54", "echo", 1, streams=Stream.COMBINED) + @patch("subprocess.run") + def test_dryrun_nomem(self, run: MagicMock) -> None: + run.return_value.returncode = 0 + + scheduler = create_scheduler("foo") + app = mem_app() + + run.return_value.stdout = b"PARTITION,MEMORY\nfoo*,5000" + info = scheduler.submit_dryrun(app, cfg={}) + self.assertIn("mem", info.request.replicas["foo-0"].sbatch_opts) + + run.return_value.stdout = b"PARTITION,MEMORY\nfoo*,1" + info = scheduler.submit_dryrun(app, cfg={}) + self.assertNotIn("mem", info.request.replicas["foo-0"].sbatch_opts) + + run.return_value.stdout = b"" + info = scheduler.submit_dryrun(app, cfg={}) + self.assertIn("mem", info.request.replicas["foo-0"].sbatch_opts) + def test_dryrun_comment(self) -> None: scheduler = create_scheduler("foo") app = simple_app() @@ -435,8 +477,14 @@ def test_dryrun_mail(self) -> None: info.request.cmd, ) + @patch( + "torchx.schedulers.slurm_scheduler.SlurmScheduler._partition_memmb", + return_value=2048, + ) @patch("subprocess.run") - def test_run_workspace_job_dir(self, run: MagicMock) -> None: + def test_run_workspace_job_dir( + self, run: MagicMock, partition_memmb: MagicMock + ) -> None: with tmp_cwd(): run.return_value.stdout = b"1234" scheduler = create_scheduler("foo") @@ -461,3 +509,24 @@ def test_run_workspace_job_dir(self, run: MagicMock) -> None: "dir/torchx-sbatch.sh", ], ) + + @patch("subprocess.run") + def test_partition_memmb(self, run: MagicMock) -> None: + scheduler = create_scheduler("foo") + + ret = run.return_value + ret.returncode = 0 + ret.stdout = b""" +PARTITION,MEMORY +scavenge,500000+ +compute*,1 +""" + self.assertEqual(scheduler._partition_memmb(None), 1) + self.assertEqual(scheduler._partition_memmb("compute"), 1) + self.assertEqual(scheduler._partition_memmb("nonexistant"), None) + self.assertEqual(scheduler._partition_memmb("scavenge"), 500000) + + ret.stdout = b""" +PARTITION,MEMORY +""" + self.assertEqual(scheduler._partition_memmb(None), None)
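Note (not part of the patch): the sketch below is a minimal, self-contained illustration of the autodetection behavior added above. It parses the same ``sinfo --format %P,%m --noconvert`` CSV output that ``_partition_memmb`` consumes (the sample rows mirror the ones stubbed out in ``test_partition_memmb``) and applies the same 1000MB threshold used in ``_submit_dryrun``. The helper name ``partition_memmb`` and the sample values are illustrative only, not code from this patch.

    import csv
    from typing import Optional

    # Example `sinfo --format %P,%m --noconvert` output (illustrative values).
    # "*" marks the default partition; MEMORY is the configured RealMemory in MB.
    SAMPLE = "PARTITION,MEMORY\nscavenge,500000+\ncompute*,1"

    def partition_memmb(output: str, partition: Optional[str] = None) -> Optional[int]:
        # Same parsing rules as SlurmScheduler._partition_memmb: return the memory
        # of the named partition, or of the default ("*") partition if none is given.
        for row in csv.DictReader(output.strip().split("\n"), delimiter=","):
            part, mem = row["PARTITION"], row["MEMORY"]
            is_default = "*" in part
            if part.strip("*") == partition or (partition is None and is_default):
                return int(mem.strip("M+"))
        return None

    memmb = partition_memmb(SAMPLE)              # default partition "compute" -> 1
    nomem = memmb is not None and memmb <= 1000  # same threshold as _submit_dryrun
    print(nomem)  # True: the sbatch "mem" option is omitted for this cluster

If ``sinfo`` is missing or returns no usable rows, ``_partition_memmb`` returns None and the scheduler keeps requesting memory, which is what the final ``stdout = b""`` assertion in ``test_dryrun_nomem`` checks.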