
Commit f5c3cae

Rextilne authored and committed
Slurm plugin: add logic to prevent scale-up and to self-terminate POWER_SAVE instances when clustermgtd is down
* Computemgtd: Add logic to consider the self node down if it appears to be in power saving. When a node appears to be in power saving but the backing instance is up, the node is not correctly attached to the scheduler; computemgtd should consider the self node down and self-terminate the instance in this case.
* ResumeProgram: Add logic to prevent launching new instances when unable to retrieve a valid clustermgtd heartbeat.
* SuspendProgram: Print an additional warning message when unable to retrieve a valid clustermgtd heartbeat.
* Fix unit tests for the above changes.

Signed-off-by: Rex <shuningc@amazon.com>
1 parent d338e38 commit f5c3cae

14 files changed: +208 -99 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@ This file is used to list changes made in each version of the aws-parallelcluste
 
 **CHANGES**
 - Use inclusive language in internal naming convention.
+- Improve error handling in slurm plugin processes when clustermgtd is down.
 
 2.10.0
 -----

src/slurm_plugin/clustermgtd.py

Lines changed: 2 additions & 1 deletion
@@ -432,7 +432,8 @@ def manage_cluster(self):
 
     def _write_timestamp_to_file(self):
         """Write timestamp into shared file so compute nodes can determine if head node is online."""
-        with open(self._config.heartbeat_file_path, "w") as timestamp_file:
+        # Make clustermgtd heartbeat readable to all users
+        with open(os.open(self._config.heartbeat_file_path, os.O_WRONLY | os.O_CREAT, 0o644), "w") as timestamp_file:
             # Note: heartbeat must be written with datetime.strftime to convert localized datetime into str
             # datetime.strptime will not work with str(datetime)
             timestamp_file.write(datetime.now(tz=timezone.utc).strftime(TIMESTAMP_FORMAT))
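
The permission change above makes the heartbeat file readable by the other plugin processes, which do not necessarily run as the same user as clustermgtd. A minimal standalone sketch of the same write pattern; the path is a placeholder and the format string only stands in for the module's TIMESTAMP_FORMAT constant (both are assumptions for illustration):

import os
from datetime import datetime, timezone

HEARTBEAT_PATH = "/tmp/clustermgtd_heartbeat"  # placeholder path, not the plugin's real location
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f%z"    # assumed stand-in for the module constant

# os.open accepts an explicit creation mode (0o644), which plain open() does not;
# the mode is applied only when the file is first created and is still masked by umask.
fd = os.open(HEARTBEAT_PATH, os.O_WRONLY | os.O_CREAT, 0o644)
with open(fd, "w") as timestamp_file:
    timestamp_file.write(datetime.now(tz=timezone.utc).strftime(TIMESTAMP_FORMAT))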

src/slurm_plugin/common.py

Lines changed: 33 additions & 1 deletion
@@ -15,7 +15,7 @@
 import json
 import logging
 import subprocess
-from datetime import timezone
+from datetime import datetime, timezone
 
 import boto3
 from botocore.exceptions import ClientError
@@ -422,3 +422,35 @@ def retrieve_instance_type_mapping(file_path):
             e,
         )
         raise
+
+
+def _get_clustermgtd_heartbeat(clustermgtd_heartbeat_file_path):
+    """Get clustermgtd's last heartbeat."""
+    with open(clustermgtd_heartbeat_file_path, "r") as timestamp_file:
+        # Note: heartbeat must be written with datetime.strftime to convert localized datetime into str
+        # datetime.strptime will not work with str(datetime)
+        # Example timestamp written to heartbeat file: 2020-07-30 19:34:02.613338+00:00
+        return datetime.strptime(timestamp_file.read().strip(), TIMESTAMP_FORMAT)
+
+
+def _expired_clustermgtd_heartbeat(last_heartbeat, current_time, clustermgtd_timeout):
+    """Test if clustermgtd heartbeat is expired."""
+    if time_is_up(last_heartbeat, current_time, clustermgtd_timeout):
+        logger.error(
+            "Clustermgtd has been offline since %s. Current time is %s. Timeout of %s seconds has expired!",
+            last_heartbeat,
+            current_time,
+            clustermgtd_timeout,
+        )
+        return True
+    return False
+
+
+def is_clustermgtd_heartbeat_valid(current_time, clustermgtd_timeout, clustermgtd_heartbeat_file_path):
+    try:
+        last_heartbeat = _get_clustermgtd_heartbeat(clustermgtd_heartbeat_file_path)
+        logger.info("Latest heartbeat from clustermgtd: %s", last_heartbeat)
+        return not _expired_clustermgtd_heartbeat(last_heartbeat, current_time, clustermgtd_timeout)
+    except Exception as e:
+        logger.error("Unable to retrieve clustermgtd heartbeat with exception: %s", e)
+        return False
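
The new is_clustermgtd_heartbeat_valid helper is the piece shared by the three callers in the files below. A short usage sketch, assuming the function is imported exactly as in those diffs; the 300-second timeout mirrors the resume/suspend default added below, while the heartbeat path here is a placeholder:

from datetime import datetime, timezone

from slurm_plugin.common import is_clustermgtd_heartbeat_valid

current_time = datetime.now(tz=timezone.utc)
# The helper returns False both when the heartbeat is older than the timeout and when the
# heartbeat file cannot be read or parsed, so callers fail safe in either case.
if not is_clustermgtd_heartbeat_valid(current_time, 300, "/tmp/clustermgtd_heartbeat"):
    # e.g. skip launching instances, or start the self-termination checks
    print("clustermgtd heartbeat missing or stale")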

src/slurm_plugin/computemgtd.py

Lines changed: 21 additions & 46 deletions
@@ -12,6 +12,7 @@
 
 import logging
 import os
+import time
 from datetime import datetime, timezone
 from logging.config import fileConfig
 from subprocess import CalledProcessError
@@ -23,7 +24,7 @@
 from common.schedulers.slurm_commands import get_nodes_info
 from common.time_utils import seconds
 from common.utils import check_command_output, sleep_remaining_loop_time
-from slurm_plugin.common import CONFIG_FILE_DIR, TIMESTAMP_FORMAT, InstanceManager, log_exception, time_is_up
+from slurm_plugin.common import CONFIG_FILE_DIR, InstanceManager, is_clustermgtd_heartbeat_valid, log_exception
 
 LOOP_TIME = 60
 RELOAD_CONFIG_ITERATIONS = 10
@@ -38,8 +39,8 @@ class ComputemgtdConfig:
         "max_retry": 1,
         "loop_time": LOOP_TIME,
         "proxy": "NONE",
-        "clustermgtd_timeout": 600,
         "disable_computemgtd_actions": False,
+        "clustermgtd_timeout": 600,
         "slurm_nodename_file": os.path.join(CONFIG_FILE_DIR, "slurm_nodename"),
         "logging_config": os.path.join(
             os.path.dirname(__file__), "logging", "parallelcluster_computemgtd_logging.conf"
@@ -61,7 +62,7 @@ def _get_config(self, config_file_path):
         try:
             config.read_file(open(config_file_path, "r"))
         except IOError:
-            log.error(f"Cannot read cluster manager configuration file: {config_file_path}")
+            log.error(f"Cannot read computemgtd configuration file: {config_file_path}")
             raise
 
         # Get config settings
@@ -115,51 +116,34 @@ def _self_terminate(computemgtd_config):
         computemgtd_config.region, computemgtd_config.cluster_name, computemgtd_config.boto3_config
     )
     self_instance_id = check_command_output("curl -s http://169.254.169.254/latest/meta-data/instance-id", shell=True)
+    # Sleep for 10 seconds so termination log entries are uploaded to CW logs
+    log.info("Preparing to self terminate the instance %s in 10 seconds!", self_instance_id)
+    time.sleep(10)
     log.info("Self terminating instance %s now!", self_instance_id)
     instance_manager.delete_instances([self_instance_id], terminate_batch_size=1)
 
 
-def _get_clustermgtd_heartbeat(clustermgtd_heartbeat_file_path):
-    """Get clustermgtd's last heartbeat."""
-    with open(clustermgtd_heartbeat_file_path, "r") as timestamp_file:
-        # Note: heartbeat must be written with datetime.strftime to convert localized datetime into str
-        # datetime.strptime will not work with str(datetime)
-        # Example timestamp written to heartbeat file: 2020-07-30 19:34:02.613338+00:00
-        return datetime.strptime(timestamp_file.read().strip(), TIMESTAMP_FORMAT)
-
-
-def _expired_clustermgtd_heartbeat(last_heartbeat, current_time, clustermgtd_timeout):
-    """Test if clustermgtd heartbeat is expired."""
-    if time_is_up(last_heartbeat, current_time, clustermgtd_timeout):
-        log.error(
-            "Clustermgtd has been offline since %s. Current time is %s. Timeout of %s seconds has expired!",
-            last_heartbeat,
-            current_time,
-            clustermgtd_timeout,
-        )
-        return True
-    return False
-
-
 @retry(stop_max_attempt_number=3, wait_fixed=1500)
 def _get_nodes_info_with_retry(nodes):
     return get_nodes_info(nodes)
 
 
 def _is_self_node_down(self_nodename):
     """
-    Check if self node is down in slurm.
+    Check if self node is healthy according to the scheduler.
 
-    This check prevents termination of a node that is still well-attached to the scheduler.
-    Note: node that is not attached to the scheduler will be in DOWN* after SlurmdTimeout.
+    Node is considered healthy if:
+    1. Node is not in DOWN
+    2. Node is not in POWER_SAVE
+    Note: node that is incorrectly attached to the scheduler will be in DOWN* after SlurmdTimeout.
     """
     try:
         self_node = _get_nodes_info_with_retry(self_nodename)[0]
         log.info("Current self node state %s", self_node.__repr__())
-        if self_node.is_down():
-            log.warning("Node is in DOWN state, preparing for self termination...")
+        if self_node.is_down() or self_node.is_power():
+            log.warning("Node is incorrectly attached to scheduler, preparing for self termination...")
            return True
-        log.info("Node is not in a DOWN state and correctly attached to scheduler, not terminating...")
+        log.info("Node is correctly attached to scheduler, not terminating...")
         return False
     except Exception as e:
         # This could happen if slurmctld is down completely
@@ -168,13 +152,6 @@ def _is_self_node_down(self_nodename):
         return True
 
 
-def _fail_self_check(last_heartbeat, current_time, computemgtd_config):
-    """Determine if self checks are failing and if the node should self-terminate."""
-    return _expired_clustermgtd_heartbeat(
-        last_heartbeat, current_time, computemgtd_config.clustermgtd_timeout
-    ) and _is_self_node_down(computemgtd_config.nodename)
-
-
 def _load_daemon_config():
     # Get program config
     computemgtd_config = ComputemgtdConfig(os.path.join(COMPUTEMGTD_CONFIG_PATH))
@@ -208,17 +185,15 @@ def _run_computemgtd():
             reload_config_counter -= 1
 
         # Check heartbeat
-        try:
-            last_heartbeat = _get_clustermgtd_heartbeat(computemgtd_config.clustermgtd_heartbeat_file_path)
-            log.info("Latest heartbeat from clustermgtd: %s", last_heartbeat)
-        except Exception as e:
-            log.error("Unable to retrieve clustermgtd heartbeat with exception: %s", e)
-        finally:
+        if not is_clustermgtd_heartbeat_valid(
+            current_time, computemgtd_config.clustermgtd_timeout, computemgtd_config.clustermgtd_heartbeat_file_path
+        ):
            if computemgtd_config.disable_computemgtd_actions:
                 log.info("All computemgtd actions currently disabled")
-            elif _fail_self_check(last_heartbeat, current_time, computemgtd_config):
+            elif _is_self_node_down(computemgtd_config.nodename):
                 _self_terminate(computemgtd_config)
-            sleep_remaining_loop_time(computemgtd_config.loop_time, current_time)
+
+        sleep_remaining_loop_time(computemgtd_config.loop_time, current_time)
 
 
 @retry(wait_fixed=seconds(LOOP_TIME))
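
The net effect of the computemgtd changes is that self-termination now requires two independent failures: a stale (or unreadable) clustermgtd heartbeat and a node that looks detached from Slurm. A condensed sketch of that decision, reusing is_clustermgtd_heartbeat_valid from slurm_plugin.common and the _is_self_node_down helper defined in the diff above; _should_self_terminate is a hypothetical name introduced only for this illustration:

from datetime import datetime, timezone

from slurm_plugin.common import is_clustermgtd_heartbeat_valid


def _should_self_terminate(computemgtd_config):
    """Hypothetical summary of the loop body above: terminate only when BOTH checks fail."""
    current_time = datetime.now(tz=timezone.utc)
    if is_clustermgtd_heartbeat_valid(
        current_time,
        computemgtd_config.clustermgtd_timeout,
        computemgtd_config.clustermgtd_heartbeat_file_path,
    ):
        # clustermgtd is alive, so it stays responsible for handling unhealthy nodes
        return False
    if computemgtd_config.disable_computemgtd_actions:
        return False
    # Heartbeat is stale or unreadable: terminate only if the node also looks detached
    # from Slurm, i.e. it is reported as DOWN or POWER_SAVE (see _is_self_node_down above).
    return _is_self_node_down(computemgtd_config.nodename)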

src/slurm_plugin/resume.py

Lines changed: 35 additions & 7 deletions
@@ -12,14 +12,21 @@
 
 import logging
 import os
+from datetime import datetime, timezone
 from logging.config import fileConfig
 
 import argparse
 from botocore.config import Config
 from configparser import ConfigParser
 
 from common.schedulers.slurm_commands import get_nodes_info, set_nodes_down
-from slurm_plugin.common import CONFIG_FILE_DIR, InstanceManager, print_with_count, retrieve_instance_type_mapping
+from slurm_plugin.common import (
+    CONFIG_FILE_DIR,
+    InstanceManager,
+    is_clustermgtd_heartbeat_valid,
+    print_with_count,
+    retrieve_instance_type_mapping,
+)
 
 log = logging.getLogger(__name__)
 
@@ -29,6 +36,7 @@ class SlurmResumeConfig:
         "max_retry": 1,
         "max_batch_size": 500,
         "update_node_address": True,
+        "clustermgtd_timeout": 300,
         "proxy": "NONE",
         "logging_config": os.path.join(os.path.dirname(__file__), "logging", "parallelcluster_resume_logging.conf"),
         "hosted_zone": None,
@@ -79,6 +87,12 @@ def _get_config(self, config_file_path):
             "slurm_resume", "instance_type_mapping", fallback=self.DEFAULTS.get("instance_type_mapping")
         )
         self.instance_name_type_mapping = retrieve_instance_type_mapping(instance_name_type_mapping_file)
+        self.clustermgtd_timeout = config.getint(
+            "slurm_resume",
+            "clustermgtd_timeout",
+            fallback=self.DEFAULTS.get("clustermgtd_timeout"),
+        )
+        self.clustermgtd_heartbeat_file_path = config.get("slurm_resume", "clustermgtd_heartbeat_file_path")
 
         # Configure boto3 to retry 1 times by default
         self._boto3_retry = config.getint("slurm_resume", "boto3_retry", fallback=self.DEFAULTS.get("max_retry"))
@@ -115,6 +129,19 @@ def _handle_failed_nodes(node_list)
 
 def _resume(arg_nodes, resume_config):
     """Launch new EC2 nodes according to nodes requested by slurm."""
+    # Check heartbeat
+    current_time = datetime.now(tz=timezone.utc)
+    if not is_clustermgtd_heartbeat_valid(
+        current_time, resume_config.clustermgtd_timeout, resume_config.clustermgtd_heartbeat_file_path
+    ):
+        log.error(
+            "No valid clustermgtd heartbeat detected, clustermgtd is down!\n"
+            "Please check clustermgtd log for error.\n"
+            "Not launching nodes %s",
+            arg_nodes,
+        )
+        _handle_failed_nodes(arg_nodes)
+        return
     log.info("Launching EC2 instances for the following Slurm nodes: %s", arg_nodes)
     node_list = [node.name for node in get_nodes_info(arg_nodes)]
     log.debug("Retrieved nodelist: %s", node_list)
@@ -148,6 +175,13 @@
 
 
 def main():
+    default_log_file = "/var/log/parallelcluster/slurm_resume.log"
+    logging.basicConfig(
+        filename=default_log_file,
+        level=logging.INFO,
+        format="%(asctime)s - [%(name)s:%(funcName)s] - %(levelname)s - %(message)s",
+    )
+    log.info("ResumeProgram startup.")
     parser = argparse.ArgumentParser()
     parser.add_argument("nodes", help="Nodes to burst")
     args = parser.parse_args()
@@ -157,12 +191,6 @@ def main():
         # Configure root logger
         fileConfig(resume_config.logging_config, disable_existing_loggers=False)
     except Exception as e:
-        default_log_file = "/var/log/parallelcluster/slurm_resume.log"
-        logging.basicConfig(
-            filename=default_log_file,
-            level=logging.INFO,
-            format="%(asctime)s - [%(name)s:%(funcName)s] - %(levelname)s - %(message)s",
-        )
         log.warning(
             "Unable to configure logging from %s, using default settings and writing to %s.\nException: %s",
             resume_config.logging_config,
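
Moving logging.basicConfig to the top of main() means startup, argument parsing and config-loading errors are captured even before the packaged logging config is read; fileConfig then replaces the fallback configuration when it is available. A minimal sketch of that ordering, with a hypothetical _setup_logging helper and the default file path taken from the diff:

import logging
from logging.config import fileConfig

log = logging.getLogger(__name__)


def _setup_logging(logging_config_path, default_log_file="/var/log/parallelcluster/slurm_resume.log"):
    # Fallback configuration first, so anything logged from here on lands somewhere useful
    logging.basicConfig(
        filename=default_log_file,
        level=logging.INFO,
        format="%(asctime)s - [%(name)s:%(funcName)s] - %(levelname)s - %(message)s",
    )
    try:
        # Replace the fallback with the packaged logging config when it can be read
        fileConfig(logging_config_path, disable_existing_loggers=False)
    except Exception as e:
        log.warning("Unable to configure logging from %s, keeping default settings: %s", logging_config_path, e)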

src/slurm_plugin/suspend.py

Lines changed: 30 additions & 8 deletions
@@ -12,18 +12,20 @@
 
 import logging
 import os
+from datetime import datetime, timezone
 from logging.config import fileConfig
 
 import argparse
 from configparser import ConfigParser
 
-from slurm_plugin.common import CONFIG_FILE_DIR
+from slurm_plugin.common import CONFIG_FILE_DIR, is_clustermgtd_heartbeat_valid
 
 log = logging.getLogger(__name__)
 
 
 class SlurmSuspendConfig:
     DEFAULTS = {
+        "clustermgtd_timeout": 300,
         "logging_config": os.path.join(os.path.dirname(__file__), "logging", "parallelcluster_suspend_logging.conf"),
     }
 
@@ -35,13 +37,26 @@ def __init__(self, config_file_path):
             log.error(f"Cannot read slurm cloud bursting scripts configuration file: {config_file_path}")
             raise
 
+        self.clustermgtd_timeout = config.getint(
+            "slurm_suspend",
+            "clustermgtd_timeout",
+            fallback=self.DEFAULTS.get("clustermgtd_timeout"),
+        )
+        self.clustermgtd_heartbeat_file_path = config.get("slurm_suspend", "clustermgtd_heartbeat_file_path")
         self.logging_config = config.get(
             "slurm_suspend", "logging_config", fallback=self.DEFAULTS.get("logging_config")
         )
         log.info(self.__repr__())
 
 
 def main():
+    default_log_file = "/var/log/parallelcluster/slurm_suspend.log"
+    logging.basicConfig(
+        filename=default_log_file,
+        level=logging.INFO,
+        format="%(asctime)s - [%(name)s:%(funcName)s] - %(levelname)s - %(message)s",
+    )
+    log.info("SuspendProgram startup.")
     parser = argparse.ArgumentParser()
     parser.add_argument("nodes", help="Nodes to release")
     args = parser.parse_args()
@@ -50,20 +65,27 @@ def main():
         # Configure root logger
         fileConfig(suspend_config.logging_config, disable_existing_loggers=False)
     except Exception as e:
-        default_log_file = "/var/log/parallelcluster/slurm_suspend.log"
-        logging.basicConfig(
-            filename=default_log_file,
-            level=logging.INFO,
-            format="%(asctime)s - [%(name)s:%(funcName)s] - %(levelname)s - %(message)s",
-        )
         log.warning(
             "Unable to configure logging from %s, using default settings and writing to %s.\nException: %s",
             suspend_config.logging_config,
            default_log_file,
             e,
         )
+
     log.info("Suspending following nodes. Clustermgtd will cleanup orphaned instances: %s", args.nodes)
-    log.info("SuspendProgram finished. Nodes will be available after SuspendTimeout")
+    current_time = datetime.now(tz=timezone.utc)
+    if not is_clustermgtd_heartbeat_valid(
+        current_time, suspend_config.clustermgtd_timeout, suspend_config.clustermgtd_heartbeat_file_path
+    ):
+        log.error(
+            "No valid clustermgtd heartbeat detected, clustermgtd is down! "
+            "Please check clustermgtd log for error.\n"
+            "Nodes will be reset to POWER_SAVE state after SuspendTimeout. "
+            "The backing EC2 instances may not be correctly terminated.\n"
+            "Please check and terminate any orphaned instances in EC2!"
+        )
+    else:
+        log.info("SuspendProgram finished. Nodes will be available after SuspendTimeout")
 
 
 if __name__ == "__main__":
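
The two new options are read with ConfigParser like the existing ones: clustermgtd_timeout falls back to a 300-second default, while clustermgtd_heartbeat_file_path has no fallback and must be present in the config file. A small self-contained sketch of that lookup; the inline config content and path are made up for illustration:

from configparser import ConfigParser

config = ConfigParser()
config.read_string(
    "[slurm_suspend]\n"
    "clustermgtd_heartbeat_file_path = /tmp/clustermgtd_heartbeat\n"  # placeholder path
)
# clustermgtd_timeout is absent above, so getint falls back to the 300-second default
timeout = config.getint("slurm_suspend", "clustermgtd_timeout", fallback=300)
heartbeat_path = config.get("slurm_suspend", "clustermgtd_heartbeat_file_path")
print(timeout, heartbeat_path)  # -> 300 /tmp/clustermgtd_heartbeat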
