Introduce support for slurm jobs with bad constraints #3403

Merged: 2 commits, Nov 21, 2023
123 changes: 82 additions & 41 deletions workflows/pipe-common/pipeline/hpc/engine/slurm.py
@@ -32,7 +32,7 @@ def get_jobs(self):
except ExecutionError:
Logger.warn('Slurm jobs listing has failed.')
return []
return self._parse_jobs(output)
return list(self._parse_jobs(output))

def disable_host(self, host):
self.cmd_executor.execute(SlurmGridEngine._SCONTROL_UPDATE_NODE_STATE % ("DRAIN", host))
@@ -96,38 +96,87 @@ def _get_host_state(self, host):
return "UNKNOWN"

def _parse_jobs(self, scontrol_jobs_output):
jobs = []
jobs_des_lines = [line for line in scontrol_jobs_output.splitlines() if "JobId=" in line]
for job_desc in jobs_des_lines:
for job_desc in scontrol_jobs_output.splitlines():
if 'JobId=' not in job_desc:
continue

job_dict = self._parse_dict(job_desc)
resources = self._parse_dict(job_dict.get("TRES"), line_sep=",")
general_resources = self._parse_dict(job_dict.get("GRES"), line_sep=",")
num_node = int(re.match("(\\d+)-?.*", job_dict.get("NumNodes", "1")).group(1))
# Splitting one job on 'num_node' ephemeral jobs. The idea is to instruct autoscaler that we need to spread
# this job to `num_node` nodes and provide portion of resources
# TODO maybe there is another way to achieve that?
for node_idx in range(num_node):
job_state = GridEngineJobState.from_letter_code(job_dict.get("JobState"))
if job_state == GridEngineJobState.PENDING:
# In certain cases pending job's start date can be estimated start date.
# It confuses autoscaler and therefore should be ignored.
job_dict["StartTime"] = "Unknown"
jobs.append(
GridEngineJob(
id=job_dict.get("JobId") + "_" + str(node_idx),
root_id=job_dict.get("JobId"),
name=job_dict.get("JobName"),
user=self._parse_user(job_dict.get("UserId")),
state=job_state,
datetime=self._parse_date(
job_dict.get("StartTime") if job_dict.get("StartTime") != "Unknown" else job_dict.get("SubmitTime")),
hosts=self._parse_nodelist(job_dict.get("NodeList")),
cpu=int(job_dict.get("NumCPUs", "1")) // num_node,
gpu=0 if "gpu" not in general_resources else int(general_resources.get("gpu")) // num_node,
mem=self._parse_mem(self._find_memory_value(job_dict, resources))
)

root_job_id = job_dict.get('JobId')
job_name = job_dict.get('JobName')
job_user = self._parse_user(job_dict.get('UserId'))
job_hosts = self._parse_nodelist(job_dict.get('NodeList'))

# -> NumTasks=1
# ? -> NumTasks=N/A
# -n 20 -> NumTasks=20
# -N 20 -> NumTasks=20 NumNodes=20
num_tasks_str = job_dict.get('NumTasks', '1')
num_tasks = int(num_tasks_str) if num_tasks_str.isdigit() else 1
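# e.g. sbatch -n 4 gives NumTasks=4 and the job is split below into 4 single-task entries,
# so the autoscaler can spread them across nodes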

job_state = GridEngineJobState.from_letter_code(job_dict.get('JobState'))
if job_state == GridEngineJobState.PENDING:
# In certain cases pending job's start date can be estimated start date.
# It confuses autoscaler and therefore should be ignored.
job_dict['StartTime'] = 'Unknown'
job_datetime = self._parse_date(job_dict.get('StartTime') if job_dict.get('StartTime') != 'Unknown'
else job_dict.get('SubmitTime'))

# -c 20 -> MinCPUsNode=20
cpu_per_node = int(job_dict.get('MinCPUsNode', '1'))
job_cpu = cpu_per_node

# --gpus 20 -> TresPerJob=gres:gpu:20
# --gpus-per-job 20 -> TresPerJob=gres:gpu:20
# --gpus-per-task 20 -> TresPerTask=gres:gpu:20
# --gpus-per-node 20 -> TresPerNode=gres:gpu:20
tres_per_job = self._parse_tres(job_dict.get('TresPerJob', 'gres:gpu:0'))
tres_per_node = self._parse_tres(job_dict.get('TresPerNode', 'gres:gpu:0'))
tres_per_task = self._parse_tres(job_dict.get('TresPerTask', 'gres:gpu:0'))
gpu_per_job = int(tres_per_job.get('gpu', '0'))
gpu_per_node = int(tres_per_node.get('gpu', '0'))
gpu_per_task = int(tres_per_task.get('gpu', '0'))
job_gpu = max(int(gpu_per_job / num_tasks), gpu_per_node, gpu_per_task)
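# e.g. sbatch -n 4 --gpus 8 gives NumTasks=4 and TresPerJob=gres:gpu:8, so job_gpu = max(8 / 4, 0, 0) = 2 per task,
# while sbatch --gpus-per-task 2 gives TresPerTask=gres:gpu:2 and job_gpu = 2 directly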

# --mem 200M -> MinMemoryNode=200M
# --mem-per-cpu 200M -> MinMemoryCPU=200M
# --mem-per-gpu 200M -> MemPerTres=gres:gpu:200
mem_per_tres = self._parse_tres(job_dict.get('MemPerTres', 'gres:gpu:0'))
mem_per_node = self._parse_mem(job_dict.get('MinMemoryNode', '0M'))
mem_per_cpu = self._parse_mem(job_dict.get('MinMemoryCPU', '0M'))
mem_per_gpu = self._parse_mem(mem_per_tres.get('gpu', '0') + 'M')
job_mem = max(mem_per_node, mem_per_cpu * job_cpu, mem_per_gpu * job_gpu)
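# e.g. sbatch -c 4 --mem-per-cpu 200M gives MinCPUsNode=4 and MinMemoryCPU=200M,
# so job_mem = max(0, 200M * 4, 0) = 800M for each task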

for task_idx in range(num_tasks):
job_id = root_job_id + '_' + str(task_idx)
yield GridEngineJob(
id=job_id,
root_id=root_job_id,
name=job_name,
user=job_user,
state=job_state,
datetime=job_datetime,
hosts=job_hosts,
cpu=job_cpu,
gpu=job_gpu,
mem=job_mem
)
return jobs

def _parse_tres(self, tres_str):
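# e.g. 'gres:gpu:4' -> {'gpu': '4'}, 'gres:gpu:tesla:2' -> {'gpu': '2'}; malformed items are skipped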
if not tres_str:
return {}

tres_dict = {}
for tres_str in tres_str.split(','):
tres_items = tres_str.split(':')
if len(tres_items) == 3:
tres_type, tres_group, tres_value = tres_items
elif len(tres_items) == 4:
tres_type, tres_group, tres_name, tres_value = tres_items
else:
continue
tres_dict[tres_group] = tres_value
return tres_dict

def _parse_date(self, date):
return datetime.strptime(date, SlurmGridEngine._SCONTROL_DATETIME_FORMAT)
@@ -142,20 +191,12 @@ def _parse_dict(self, text, line_sep=" ", value_sep="="):
]
}

def _find_memory_value(self, job_dict, resource_dict):
if "MinMemoryNode" in job_dict:
return job_dict.get("MinMemoryNode")
elif "mem" in resource_dict:
return resource_dict.get("mem")
else:
return "0M"

def _parse_mem(self, mem_request):
if not mem_request:
return 0
modifiers = {
'k': 1000, 'm': 1000 ** 2, 'g': 1000 ** 3,
'K': 1024, 'M': 1024 ** 2, 'G': 1024 ** 3
'k': 1000, 'm': 1000 ** 2, 'g': 1000 ** 3, 't': 1000 ** 4,
'K': 1024, 'M': 1024 ** 2, 'G': 1024 ** 3, 'T': 1024 ** 4,
}
if mem_request[-1] in modifiers:
number = int(mem_request[:-1])
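For context, here is a minimal standalone sketch (not part of the diff) of how the new parsing splits a single Slurm job into per-task entries. The helpers are simplified stand-ins for the class methods above, and the scontrol line is a hypothetical example; the resource math mirrors the diff: CPUs come from MinCPUsNode, GPUs from the TresPer* fields, with per-job GPUs divided across tasks.

def parse_dict(text, line_sep=' ', value_sep='='):
    # simplified stand-in for SlurmGridEngine._parse_dict
    return dict(item.split(value_sep, 1) for item in text.split(line_sep) if value_sep in item)

def parse_tres(tres_str):
    # simplified stand-in for SlurmGridEngine._parse_tres
    tres = {}
    for item in (tres_str or '').split(','):
        parts = item.split(':')
        if len(parts) in (3, 4):
            tres[parts[1]] = parts[-1]
    return tres

# hypothetical scontrol output for: sbatch -n 2 -c 4 --gpus 4 --mem-per-cpu 200M
job_line = ('JobId=42 JobName=demo UserId=alice(1000) JobState=PENDING '
            'NumTasks=2 MinCPUsNode=4 TresPerJob=gres:gpu:4 MinMemoryCPU=200M')

job = parse_dict(job_line)
num_tasks = int(job.get('NumTasks', '1'))
cpu = int(job.get('MinCPUsNode', '1'))
gpu = max(int(int(parse_tres(job.get('TresPerJob', 'gres:gpu:0')).get('gpu', '0')) / num_tasks),
          int(parse_tres(job.get('TresPerNode', 'gres:gpu:0')).get('gpu', '0')),
          int(parse_tres(job.get('TresPerTask', 'gres:gpu:0')).get('gpu', '0')))

for task_idx in range(num_tasks):
    print('%s_%s cpu=%s gpu=%s' % (job['JobId'], task_idx, cpu, gpu))
# prints: 42_0 cpu=4 gpu=2 and 42_1 cpu=4 gpu=2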
145 changes: 94 additions & 51 deletions workflows/pipe-common/shell/slurm_setup_master
@@ -19,6 +19,39 @@ SLURM_MASTER_SETUP_TASK="SLURMMasterSetup"
SLURM_MASTER_SETUP_TASK_WORKERS="SLURMMasterSetupWorkers"
CURRENT_PID=$$

resolve_node_resources() {
_TASK="$1"

export _NODE_CPU_COUNT=$(nproc)
pipe_log_info "$_NODE_CPU_COUNT CPUs found" "$_TASK"

export _NODE_RAM_COUNT=$(grep MemTotal /proc/meminfo | awk '{print int($2 / 1024)}')
pipe_log_info "${_NODE_RAM_COUNT}M RAM found" "$_TASK"

export _NODE_GPUS_COUNT=$(nvidia-smi -L 2>/dev/null | wc -l)
export _NODE_GPUS_COUNT="${_NODE_GPUS_COUNT:-0}"
pipe_log_info "$_NODE_GPUS_COUNT GPUs found" "$_TASK"
}

configure_consumable_gpu_resource() {
_RESOURCE_VALUE="$1"

if (( _RESOURCE_VALUE > 0 )); then
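# Register every visible NVIDIA device as a consumable gpu resource in gres.conf,
# e.g. a node with 2 GPUs gets: Name=gpu File=/dev/nvidia0 and Name=gpu File=/dev/nvidia1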
for device in $(ls /dev/ | grep -E "nvidia[0-9]+") ; do
echo "Name=gpu File=/dev/$device" >> "$_SLURM_CONFIG_LOCATION/gres.conf"
done
fi
}

resolve_node_names() {
if [ ! -f "$DEFAULT_HOSTFILE" ]; then
export _NODE_NAMES="$(hostname)"
else
IFS=$'\n' read -d '' -r -a _NODE_NAMES < "$DEFAULT_HOSTFILE"
export _NODE_NAMES
fi
}

configure_slurm() {
_SLURM_CONFIG_LOCATION=$( slurm_config_location )

@@ -38,14 +71,29 @@ configure_slurm() {
/var/log/slurm_jobacct.log \
/var/log/slurm_jobcomp.log

cat > ${SLURM_COMMON_CONFIG_DIR}/cgroup.conf <<EOL
dd if=/dev/urandom bs=1 count=1024 > ${SLURM_COMMON_CONFIG_DIR}/munge.key
cp ${SLURM_COMMON_CONFIG_DIR}/munge.key /etc/munge/
chown munge: /etc/munge/munge.key
chmod 400 /etc/munge/munge.key
su -c /usr/sbin/munged -s /bin/bash munge

touch ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
touch ${SLURM_COMMON_CONFIG_DIR}/cgroup.conf

ln -s ${SLURM_COMMON_CONFIG_DIR}/slurm.conf "$_SLURM_CONFIG_LOCATION"
ln -s ${SLURM_COMMON_CONFIG_DIR}/cgroup.conf "$_SLURM_CONFIG_LOCATION"

resolve_node_resources "$SLURM_MASTER_SETUP_TASK"
configure_consumable_gpu_resource "$_NODE_GPUS_COUNT"

cat >> ${SLURM_COMMON_CONFIG_DIR}/cgroup.conf <<EOL
CgroupAutomount=yes
CgroupMountpoint=/cgroup
ConstrainCores=no
ConstrainRAMSpace=no
EOL

cat > ${SLURM_COMMON_CONFIG_DIR}/slurm.conf <<EOL
cat >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf <<EOL
ControlMachine=$HOSTNAME
#
MpiDefault=none
@@ -72,65 +120,52 @@ JobAcctGatherType=jobacct_gather/none
SlurmctldLogFile=/var/log/slurmctld.log
SlurmdLogFile=/var/log/slurmd.log

#
# COMPUTE NODES
EOL

_WORKER_CORES=$(nproc)
_NODE_GPUS_COUNT=$(nvidia-smi -L 2>/dev/null | wc -l)
_NODE_RAM_COUNT=$(grep MemTotal /proc/meminfo | awk '{print int($2 / 1024)}')
CP_CAP_SGE_MASTER_CORES="${CP_CAP_SGE_MASTER_CORES:-999999999}"
_WORKER_CORES=$((_WORKER_CORES < CP_CAP_SGE_MASTER_CORES ? _WORKER_CORES : CP_CAP_SGE_MASTER_CORES))
# RESOURCES
GresTypes=gpu

if (( _NODE_GPUS_COUNT > 0 ))
then
echo "GresTypes=gpu" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
for device in $(ls /dev/ | grep -E "nvidia[0-9]+") ; do
echo "Name=gpu File=/dev/$device" >> /$_SLURM_CONFIG_LOCATION/gres.conf
done
fi

if [ ! -f "$DEFAULT_HOSTFILE" ]; then
_NODE_NAMES="$(hostname)"
else
IFS=$'\n' read -d '' -r -a _NODE_NAMES < "$DEFAULT_HOSTFILE"
fi

for _NODE in ${_NODE_NAMES[*]} ; do
if (( _NODE_GPUS_COUNT > 0 ))
then
echo "NodeName=$_NODE NodeHostname=$_NODE NodeAddr=$(getent hosts $_NODE | awk '{ print $1 }') CPUs=$_WORKER_CORES RealMemory=$_NODE_RAM_COUNT Gres=gpu:$_NODE_GPUS_COUNT State=UNKNOWN" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
else
echo "NodeName=$_NODE NodeHostname=$_NODE NodeAddr=$(getent hosts $_NODE | awk '{ print $1 }') CPUs=$_WORKER_CORES RealMemory=$_NODE_RAM_COUNT State=UNKNOWN" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
fi
done
echo "PartitionName=main.q Nodes=ALL Default=YES MaxTime=INFINITE State=UP" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
EOL

echo "# EXTRAS" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
if [ ! -z $CP_SLURM_LICENSES ]; then
echo "Licenses=$CP_SLURM_LICENSES" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
fi

if check_cp_cap CP_CAP_GRID_ENGINE_NOTIFICATIONS; then
echo "MailProg=$COMMON_REPO_DIR_MUTUAL_LOC/shell/pipe_mail" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
fi
if check_cp_cap CP_CAP_AUTOSCALE; then
echo -e "\n# Dynamic cluster configuration" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
echo "#DYNAMIC CLUSTER" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
echo "MaxNodeCount=$(( ${CP_CAP_AUTOSCALE_WORKERS:-0} + ${node_count:-0} + 1 ))" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
echo "MaxNodeCount=1000" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
echo "TreeWidth=65533" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
fi
echo "" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf

# Override default mail program for Slurm
if check_cp_cap CP_CAP_GRID_ENGINE_NOTIFICATIONS; then
echo -e "\n# Mail notification configuration" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
echo "MailProg=$COMMON_REPO_DIR_MUTUAL_LOC/shell/pipe_mail" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
echo "# PARTITIONS" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
echo "PartitionName=$CP_CAP_SGE_QUEUE_NAME Nodes=ALL Default=YES MaxTime=INFINITE State=UP" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
echo "" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf

echo "# NODES" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
if check_cp_cap CP_CAP_SLURM_SYNTHETIC_WORKER; then
echo "NodeName=synthetic-worker NodeHostname=synthetic-worker NodeAddr=127.0.0.1 CPUs=10000 Gres=gpu:10000 RealMemory=1073741824 State=DOWN Reason=\"Synthetic Worker\"" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
fi

dd if=/dev/urandom bs=1 count=1024 > ${SLURM_COMMON_CONFIG_DIR}/munge.key
cp ${SLURM_COMMON_CONFIG_DIR}/munge.key /etc/munge/
chown munge: /etc/munge/munge.key
chmod 400 /etc/munge/munge.key
su -c /usr/sbin/munged -s /bin/bash munge
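# CP_CAP_SGE_MASTER_CORES caps how many CPUs the master node advertises to the queue;
# setting it to 0 keeps the master host out of the compute node list entirely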
CP_CAP_SGE_MASTER_CORES="${CP_CAP_SGE_MASTER_CORES:-999999999}"
if [ "$CP_CAP_SGE_MASTER_CORES" == "0" ]; then
pipe_log_info "CP_CAP_SGE_MASTER_CORES is set to 0. Master host was disabled in $CP_CAP_SGE_QUEUE_NAME queue" "$SLURM_MASTER_SETUP_TASK"
else
_NODE="$(hostname)"
_NODE_ADDR="$(getent hosts $_NODE | awk '{ print $1 }')"
_MASTER_NODE_CPU_COUNT=$((_NODE_CPU_COUNT < CP_CAP_SGE_MASTER_CORES ? _NODE_CPU_COUNT : CP_CAP_SGE_MASTER_CORES))
echo "NodeName=$_NODE NodeHostname=$_NODE NodeAddr=$_NODE_ADDR CPUs=$_MASTER_NODE_CPU_COUNT RealMemory=$_NODE_RAM_COUNT Gres=gpu:$_NODE_GPUS_COUNT State=UNKNOWN" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
fi

ln -s ${SLURM_COMMON_CONFIG_DIR}/slurm.conf "$_SLURM_CONFIG_LOCATION"
ln -s ${SLURM_COMMON_CONFIG_DIR}/cgroup.conf "$_SLURM_CONFIG_LOCATION"
resolve_node_names
for _NODE in ${_NODE_NAMES[*]} ; do
if [ "$_NODE" == "$(hostname)" ]; then
continue
fi
_NODE_ADDR="$(getent hosts $_NODE | awk '{ print $1 }')"
echo "NodeName=$_NODE NodeHostname=$_NODE NodeAddr=$_NODE_ADDR CPUs=$_NODE_CPU_COUNT RealMemory=$_NODE_RAM_COUNT Gres=gpu:$_NODE_GPUS_COUNT State=UNKNOWN" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
done
echo "" >> ${SLURM_COMMON_CONFIG_DIR}/slurm.conf
}

check_last_exit_code() {
@@ -232,6 +267,8 @@ install_slurm_rhel() {

pipe_log_info "Installing SLURM master" "$SLURM_MASTER_SETUP_TASK"

export CP_CAP_SGE_QUEUE_NAME="${CP_CAP_SGE_QUEUE_NAME:-main.q}"
export CP_CAP_SLURM_SYNTHETIC_WORKER="${CP_CAP_SLURM_SYNTHETIC_WORKER:-true}"
export CP_SLURM_SOURCE_URL="${CP_SLURM_SOURCE_URL:-"${GLOBAL_DISTRIBUTION_URL}tools/slurm/slurm-22.05.5.tar.bz2"}"
export CP_SLURM_PACKAGE_DEB_URL="${CP_SLURM_PACKAGE_DEB_URL:-"${GLOBAL_DISTRIBUTION_URL}tools/slurm/deb/slurm_22.05.5_amd64.deb"}"
export CP_SLURM_PACKAGE_RPM_URL="${CP_SLURM_PACKAGE_RPM_URL:-"${GLOBAL_DISTRIBUTION_URL}tools/slurm/rpm/slurm-22.05.5-1.el7.x86_64.tar"}"
@@ -254,8 +291,14 @@ else
pipe_log_info "SLURM cluster has been configured" "$SLURM_MASTER_SETUP_TASK"
fi

slurmctld && slurmd
check_last_exit_code $? "SLURM daemons have started" "Fail to start SLURM daemons."
slurmctld
check_last_exit_code $? "SLURM control daemon has started" "Fail to start SLURM control daemon"

CP_CAP_SGE_MASTER_CORES="${CP_CAP_SGE_MASTER_CORES:-999999999}"
if [ "$CP_CAP_SGE_MASTER_CORES" != "0" ]; then
slurmd
check_last_exit_code $? "SLURM worker daemon has started" "Fail to start SLURM worker daemon"
fi

pipe_log_success "SLURM master node has been successfully configured" "$SLURM_MASTER_SETUP_TASK"
