Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lustre jobstats on OzSTAR #47

Merged
merged 4 commits into from
May 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 44 additions & 11 deletions backend/backend_ozstar.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def init(self):

# Lustre jobstats
self.lustre_data = {}
self.lustre_data_rate = {}
self.previous_lustre_ts = time.time()

# Get hostnames for converting lustre client IPs to hostnames
self.get_etc_hostnames()
Expand Down Expand Up @@ -281,9 +283,8 @@ def query_influx_lustre(self):
# jobHarvest already reduces the data, so just query it by job
# the timestamp is the collection time, which is delayed by 20s
query = f'from(bucket: "{influx_config.BUCKET_LUSTRE_JOBSTATS}")\
|> range(start: -{influx_config.LUSTRE_JOBSTATS_DERIVATIVE_WINDOW}s,)\
|> range(start: -50s,)\
|> filter(fn: (r) => r["_field"] == "read_bytes" or r["_field"] == "write_bytes" or r["_field"] == "iops")\
|> derivative(nonNegative: true)\
|> last()\
|> drop(columns: ["_start", "_stop", "_time"])\
|> group(columns: ["job", "fs", "server"])'
Expand Down Expand Up @@ -867,14 +868,16 @@ def job_mem_request(self, job_id):
return job["min_memory_node"]

def job_lustre(self, job_id):
    """Return the Lustre I/O rates recorded for a job.

    The rates are read from ``self.lustre_data_rate``, which
    ``update_lustre_jobstats`` fills with derivatives computed between
    consecutive samples. ``self.lustre_data`` only holds the raw sampled
    values used as the "previous value" for that calculation, so it must
    not be returned here.

    Args:
        job_id: Identifier of the job, as used for the keys of
            ``self.lustre_data_rate``.

    Returns:
        The nested dict stored for the job (filesystem -> server -> field),
        or an empty dict when no rate has been recorded for ``job_id``.
    """
    # Only the rate table is consulted; falling back to the raw counters
    # would mix cumulative values with rates.
    return self.lustre_data_rate.get(job_id, {})

def update_lustre_jobstats(self):
now = time.time()
influx_result = self.query_influx_lustre()
lustre_data = {}
lustre_data_rate = {}

# Jobs with lustre stats found
jobs_with_stats = []
Expand All @@ -890,30 +893,60 @@ def update_lustre_jobstats(self):
if self.job_state(job_id) == "RUNNING":
jobs_with_stats += [job_id]

if job_id not in lustre_data:
lustre_data[job_id] = {}
run_time = self.job_run_time(job_id)

for d in [lustre_data, lustre_data_rate]:
if job_id not in d:
d[job_id] = {}

# Unpack values
for record in table.records:
assert record["job"] == job_id

fs = record.values["fs"]

if fs not in lustre_data[job_id]:
lustre_data[job_id][fs] = {
"mds": {"read_bytes": 0, "write_bytes": 0, "iops": 0},
"oss": {"read_bytes": 0, "write_bytes": 0, "iops": 0},
}
for d in [lustre_data, lustre_data_rate]:
if fs not in d[job_id]:
d[job_id][fs] = {
"mds": {"read_bytes": 0, "write_bytes": 0, "iops": 0},
"oss": {"read_bytes": 0, "write_bytes": 0, "iops": 0},
}

server = record.values["server"]
value = round(record.get_value(), 2)
field = record.get_field()

# Calculate derivative using previous value
if (
job_id in self.lustre_data
and fs in self.lustre_data[job_id]
and server in self.lustre_data[job_id][fs]
and field in self.lustre_data[job_id][fs][server]
):
prev_value = self.lustre_data[job_id][fs][server][field]
time_diff = now - self.previous_lustre_ts
if time_diff > 0:
derivative = (value - prev_value) / time_diff
else:
self.log.error(
f"Time difference between lustre jobstats is {time_diff} seconds"
)
elif run_time <= 1:
# The job has been running for less than a minute
# First value, so approximate derivative using sampling frequency
derivative = value / config.UPDATE_INTERVAL
else:
derivative = 0

lustre_data[job_id][fs][server][field] = value
lustre_data_rate[job_id][fs][server][field] = derivative

self.log.info(
f"Lustre stats found for {len(jobs_with_stats)}/{self.n_running_jobs} jobs"
)
self.lustre_data = lustre_data
self.lustre_data_rate = lustre_data_rate
self.previous_lustre_ts = now

def update_lustre_per_node(self):
influx_result = self.query_influx_lustre_per_node()
Expand Down
Loading