Skip to content

slurm_scheduler: add support for per replica log files and API access #373

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions scripts/slurmtest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ source "$VENV"/bin/activate
python --version
pip install "$REMOTE_WHEEL"

APP_ID="$(torchx run --wait --scheduler slurm --scheduler_args partition=compute,time=10 utils.echo --image /tmp --num_replicas 3)"
APP_ID="$(torchx run --wait --scheduler slurm --scheduler_args partition=compute,time=10 utils.echo --num_replicas 3)"
torchx status "$APP_ID"
torchx describe "$APP_ID"
LOG_FILE="slurm-$(basename "$APP_ID").out"
cat "$LOG_FILE"
LINES="$(wc -l "$LOG_FILE" | cut -d' ' -f1)"
sacct -j "$(basename "$APP_ID")"
torchx log "$APP_ID"
LINES="$(torchx log "$APP_ID" | wc -l)"

if [ "$LINES" -ne 3 ]
then
Expand Down
11 changes: 8 additions & 3 deletions torchx/schedulers/local_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -909,14 +909,19 @@ def __del__(self) -> None:

class LogIterator:
def __init__(
self, app_id: str, regex: str, log_file: str, scheduler: LocalScheduler
self,
app_id: str,
regex: str,
log_file: str,
scheduler: Scheduler,
should_tail: bool = True,
) -> None:
self._app_id: str = app_id
self._regex: Pattern[str] = re.compile(regex)
self._log_file: str = log_file
self._log_fp: Optional[TextIO] = None
self._scheduler: LocalScheduler = scheduler
self._app_finished: bool = False
self._scheduler: Scheduler = scheduler
self._app_finished: bool = not should_tail

def _check_finished(self) -> None:
# either the app (already finished) was evicted from the LRU cache
Expand Down
85 changes: 68 additions & 17 deletions torchx/schedulers/slurm_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
import shlex
import subprocess
import tempfile
import warnings
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Tuple
from datetime import datetime
from typing import Any, Dict, List, Mapping, Optional, Tuple, Iterable

from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, Scheduler
from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, Scheduler, Stream
from torchx.schedulers.local_scheduler import LogIterator
from torchx.specs import (
NONE,
AppDef,
Expand Down Expand Up @@ -100,26 +103,41 @@ def from_role(
if resource.gpu > 0:
sbatch_opts.setdefault("gpus-per-task", str(resource.gpu))

srun_opts = {
"output": f"slurm-{macros.app_id}-{name}.out",
}

return cls(
name=name,
entrypoint=role.entrypoint,
args=list(role.args),
sbatch_opts=sbatch_opts,
srun_opts={},
srun_opts=srun_opts,
env=dict(role.env),
)

def _opts_to_strs(self, opts: Dict[str, str]) -> List[str]:
out = []
for key, value in opts.items():
if value is not None:
out.append(f"--{key}={value}")
else:
out.append(f"--{key}")
return out

def materialize(self) -> Tuple[List[str], List[str]]:
"""
materialize returns the sbatch and srun groups for this role. They
should be combined using `:` per slurm heterogenous groups.
"""
sbatch_args = [
f"--job-name={self.name}",
] + [f"--{key}={value}" for key, value in self.sbatch_opts.items()]
srun_args = [f"--{key}={value}" for key, value in self.srun_opts.items()] + [
f"--export={key}={value}" for key, value in self.env.items()
]
] + self._opts_to_strs(self.sbatch_opts)
srun_args = self._opts_to_strs(self.srun_opts)

if len(self.env) > 0:
kvs = [f"{key}={value}" for key, value in self.env.items()]
srun_args += ["--export=ALL," + ",".join(kvs)]

srun_group = srun_args + [self.entrypoint] + self.args
srun_group = [_apply_app_id_env(arg) for arg in srun_group]
Expand Down Expand Up @@ -160,6 +178,9 @@ def materialize(self) -> str:
# exit on error
set -e

export PYTHONUNBUFFERED=1
export SLURM_UNBUFFEREDIO=1

srun {" ".join(srun_groups)}
"""
sbatch_cmd = self.cmd + sbatch_groups
Expand All @@ -176,7 +197,11 @@ class SlurmScheduler(Scheduler):
resource allocations and args and then sbatch is used to launch all of them
together.

Logs are written to the default slurm log file.
Logs are available in combined form via ``torchx log``, the programmatic API
as well as in the job launch directory as
``slurm-<jobid>-<role>-<replica_id>.out``. If TorchX is running in a
different directory than where the job was created the logs won't be able to
be found.

Some of the config options passed to it are added as SBATCH arguments to each
replica. See https://slurm.schedmd.com/sbatch.html#SECTION_OPTIONS for info
Expand All @@ -203,9 +228,7 @@ class SlurmScheduler(Scheduler):
type: scheduler
features:
cancel: true
logs: |
Logs are accessible via the default slurm log file but not the
programmatic API.
logs: true
distributed: true
describe: |
Partial support. SlurmScheduler will return job and replica
Expand Down Expand Up @@ -262,7 +285,7 @@ def _submit_dryrun(
app_id=macros.app_id,
replica_id=str(replica_id),
)
name = f"{app.name}-{role.name}-{replica_id}"
name = f"{role.name}-{replica_id}"
replica_role = values.apply(role)
replicas[name] = SlurmReplicaRequest.from_role(name, replica_role, cfg)
req = SlurmBatchRequest(
Expand Down Expand Up @@ -308,19 +331,19 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
), f"failed to translate slurm state {state} to torchx state"
app_state = state_enum

name_parts = row["JobName"].split("-")
if len(name_parts) < 3:
role, _, replica_id = row["JobName"].rpartition("-")
if not replica_id or not role:
# name should always have at least 3 parts but sometimes sacct
# is slow to update
continue
role = name_parts[-2]
replica_id = int(name_parts[-1])
if role not in roles:
roles[role] = Role(name=role, num_replicas=0, image="")
roles_statuses[role] = RoleStatus(role, [])
roles[role].num_replicas += 1
roles_statuses[role].replicas.append(
ReplicaStatus(id=replica_id, role=role, state=app_state, hostname=""),
ReplicaStatus(
id=int(replica_id), role=role, state=app_state, hostname=""
),
)

return DescribeAppResponse(
Expand All @@ -331,6 +354,34 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
msg=msg,
)

def log_iter(
    self,
    app_id: str,
    role_name: str,
    k: int = 0,
    regex: Optional[str] = None,
    since: Optional[datetime] = None,
    until: Optional[datetime] = None,
    should_tail: bool = False,
    streams: Optional[Stream] = None,
) -> Iterable[str]:
    """Iterate over the log lines of a single replica.

    Slurm is configured (via the per-role ``srun --output`` option) to
    write one file per replica named ``slurm-<app_id>-<role>-<k>.out``
    in the job's launch directory, so this simply tails/reads that file.

    Args:
        app_id: the slurm job id returned at submit time.
        role_name: name of the role whose logs to read.
        k: replica index within the role.
        regex: optional filter; only matching lines are yielded.
        since/until: unsupported — a warning is emitted and all lines
            are returned.
        should_tail: if True, keep following the file until the app
            finishes.
        streams: only combined stdout/stderr is available; any other
            selection triggers a warning and is ignored.

    Returns:
        An iterator over the matching log lines.
    """
    # Time-window filtering is not supported by the file-based reader.
    if since is not None or until is not None:
        warnings.warn(
            "since and/or until times specified for SlurmScheduler.log_iter."
            " These will be ignored and all log lines will be returned"
        )
    # Only the combined stream exists on disk; anything else is ignored.
    if not (streams is None or streams == Stream.COMBINED):
        warnings.warn(
            "streams specified for SlurmScheduler.log_iter."
            " These will be ignored and all log lines will be returned"
        )

    replica_log_path = f"slurm-{app_id}-{role_name}-{k}.out"
    pattern = ".*" if regex is None else regex

    return LogIterator(
        app_id, pattern, replica_log_path, self, should_tail=should_tail
    )


def create_scheduler(session_name: str, **kwargs: Any) -> SlurmScheduler:
return SlurmScheduler(
Expand Down
Loading