rrd improvements #326

Merged 1 commit on Aug 16, 2023
180 changes: 48 additions & 132 deletions factory/glideFactoryMonitorAggregator.py
@@ -1,26 +1,14 @@
# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0

#
# Project:
# glideinWMS
#
# File Version:
#
# Description:
# This module implements the functions needed
# to aggregate the monitoring of the glidein factory
#
# Author:
# Igor Sfiligoi (May 23rd 2007)
#

import copy
import json
import os.path
import pickle
import shutil
import tempfile
import time

from glideinwms.factory import glideFactoryMonitoring
@@ -89,154 +77,68 @@ def rrd_site(name):
type_strings = {"Status": "Status", "Requested": "Req", "ClientMonitor": "Client"}

##############################################################################
rrd_problems_found = False
# Function used by Factory reconfig/upgrade
# No logging available, output is to stdout/err


def verifyHelper(filename, dict, fix_rrd=False):
"""
Helper function for verifyRRD. Checks one file and
prints out errors. If fix_rrd is set, attempts to
dump the RRD to XML, add the missing attributes,
then restore it. The original file is replaced (a timestamped backup is kept).

@param filename: filename of rrd to check
@param dict: expected dictionary
@param fix_rrd: if true, will attempt to add missing attrs
"""
global rrd_problems_found
if not os.path.exists(filename):
print("WARNING: %s missing, will be created on restart" % (filename))
return
rrd_obj = rrdSupport.rrdSupport()
(missing, extra) = rrd_obj.verify_rrd(filename, dict)
for attr in extra:
print(f"ERROR: {filename} has extra attribute {attr}")
if fix_rrd:
print("ERROR: fix_rrd cannot fix extra attributes")
if not fix_rrd:
for attr in missing:
print(f"ERROR: {filename} missing attribute {attr}")
if len(missing) > 0:
rrd_problems_found = True
if fix_rrd and (len(missing) > 0):
(f, tempfilename) = tempfile.mkstemp()
(out, tempfilename2) = tempfile.mkstemp()
(restored, restoredfilename) = tempfile.mkstemp()
backup_str = str(int(time.time())) + ".backup"
print(f"Fixing {filename}... (backed up to {filename + backup_str})")
os.close(out)
os.close(restored)
os.unlink(restoredfilename)
# Use exe version since dump, restore not available in rrdtool
dump_obj = rrdSupport.rrdtool_exe()
outstr = dump_obj.dump(filename)
for line in outstr:
os.write(f, "%s\n" % line)
os.close(f)
# Move file to backup location
shutil.move(filename, filename + backup_str)
rrdSupport.addDataStore(tempfilename, tempfilename2, missing)
dump_obj.restore(tempfilename2, restoredfilename)
os.unlink(tempfilename)
os.unlink(tempfilename2)
shutil.move(restoredfilename, filename)
if len(extra) > 0:
rrd_problems_found = True
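# NOTE (editor's illustration, not part of this PR): a minimal sketch of how the
# (now removed) module-level verifyHelper was used, assuming a hypothetical RRD
# file and schema; the expected dictionary maps data-source names to None.
#
#   expected = {"StatusIdle": None, "StatusRunning": None}  # hypothetical DS names
#   verifyHelper("total/Status_Attributes.rrd", expected, fix_rrd=True)
#   if rrd_problems_found:  # module-level flag set by verifyHelper
#       print("RRD schema problems were found")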


def verifyRRD(fix_rrd=False):
def verifyRRD(fix_rrd=False, backup=True):
"""
Go through all known monitoring rrds and verify that they
match existing schema (could be different if an upgrade happened)
If fix_rrd is true, then also attempt to add any missing attributes.

Args:
fix_rrd (bool): if True, will attempt to add missing attrs
backup (bool): if True, backup the old RRD before fixing

Returns:
bool: True if all OK, False if there is a problem w/ RRD files

"""
global rrd_problems_found
global monitorAggregatorConfig
rrd_problems_found = False
mon_dir = monitorAggregatorConfig.monitor_dir

# Factory monitoring dictionaries
status_dict = {}
completed_stats_dict = {}
completed_waste_dict = {}
counts_dict = {}

# initialize the RRD dictionaries to match the current schema for verification
for tp in list(status_attributes.keys()):
if tp in list(type_strings.keys()):
tp_str = type_strings[tp]
attributes_tp = status_attributes[tp]
for a in attributes_tp:
status_dict[f"{tp_str}{a}"] = None

for jobrange in glideFactoryMonitoring.getAllJobRanges():
completed_stats_dict[f"JobsNr_{jobrange}"] = None
for timerange in glideFactoryMonitoring.getAllTimeRanges():
completed_stats_dict[f"Lasted_{timerange}"] = None
completed_stats_dict[f"JobsLasted_{timerange}"] = None

for jobtype in glideFactoryMonitoring.getAllJobTypes():
for timerange in glideFactoryMonitoring.getAllMillRanges():
completed_waste_dict[f"{jobtype}_{timerange}"] = None

for jobtype in ("Entered", "Exited", "Status"):
for jobstatus in ("Wait", "Idle", "Running", "Held"):
counts_dict[f"{jobtype}{jobstatus}"] = None
for jobstatus in ("Completed", "Removed"):
counts_dict["{}{}".format("Entered", jobstatus)] = None
# FROM: lib2to3.fixes.fix_ws_comma
# completed_waste_dict["%s_%s"%(jobtype, timerange)]=None
#
# for jobtype in ('Entered', 'Exited', 'Status'):
# for jobstatus in ('Wait', 'Idle', 'Running', 'Held'):
# counts_dict["%s%s"%(jobtype, jobstatus)]=None
# for jobstatus in ('Completed', 'Removed'):
# counts_dict["%s%s"%('Entered', jobstatus)]=None
#
# verifyHelper(os.path.join(total_dir,
# "Status_Attributes.rrd"), status_dict, fix_rrd)
# verifyHelper(os.path.join(total_dir,
# "Log_Completed.rrd"),
# glideFactoryMonitoring.getLogCompletedDefaults(), fix_rrd)
# verifyHelper(os.path.join(total_dir,
# "Log_Completed_Stats.rrd"), completed_stats_dict, fix_rrd)
# verifyHelper(os.path.join(total_dir,
# "Log_Completed_WasteTime.rrd"), completed_waste_dict, fix_rrd)
# verifyHelper(os.path.join(total_dir,
# "Log_Counts.rrd"), counts_dict, fix_rrd)
# for filename in os.listdir(dir):
# if filename[:6]=="entry_":
# entrydir=os.path.join(dir, filename)
# for subfilename in os.listdir(entrydir):
# if subfilename[:9]=="frontend_":
# current_dir=os.path.join(entrydir, subfilename)
# verifyHelper(os.path.join(current_dir,
# "Status_Attributes.rrd"), status_dict, fix_rrd)
# verifyHelper(os.path.join(current_dir,
# "Log_Completed.rrd"),
# glideFactoryMonitoring.getLogCompletedDefaults(), fix_rrd)
# verifyHelper(os.path.join(current_dir,
# "Log_Completed_Stats.rrd"), completed_stats_dict, fix_rrd)
# verifyHelper(os.path.join(current_dir,
# "Log_Completed_WasteTime.rrd"),
# completed_waste_dict, fix_rrd)
# verifyHelper(os.path.join(current_dir,
# "Log_Counts.rrd"), counts_dict, fix_rrd)
# return not rrd_problems_found

counts_dict[f"Entered{jobstatus}"] = None
completed_dict = glideFactoryMonitoring.getLogCompletedDefaults()

rrdict = {
"Status_Attributes.rrd": status_dict,
"Log_Completed.rrd": completed_dict,
"Log_Completed_Stats.rrd": completed_stats_dict,
"Log_Completed_WasteTime.rrd": completed_waste_dict,
"Log_Counts.rrd": counts_dict,
}

# check all the existing files
if not os.path.isdir(mon_dir):
print(f"WARNING: monitor directory '{mon_dir}' does not exist, skipping rrd verification.")
return True
for dir_name, sdir_name, f_list in os.walk(mon_dir):
for file_name in f_list:
if file_name in list(rrdict.keys()):
verifyHelper(os.path.join(dir_name, file_name), rrdict[file_name], fix_rrd)

if rrdSupport.verifyHelper(os.path.join(dir_name, file_name), rrdict[file_name], fix_rrd, backup):
rrd_problems_found = True
return not rrd_problems_found
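# NOTE (editor's illustration, not part of this PR): a sketch of the intended
# call from Factory reconfig/upgrade (no logging available there, output goes
# to stdout/err), assuming monitorAggregatorConfig is already configured.
#
#   ok = verifyRRD(fix_rrd=True, backup=True)  # fix schemas, keep .backup copies
#   if not ok:
#       print("ERROR: RRD verification found problems")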


@@ -280,11 +182,12 @@ def aggregateStatus(in_downtime):
for entry in monitorAggregatorConfig.entries:
# load entry status file
status_fname = os.path.join(
os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry), monitorAggregatorConfig.status_relname
monitorAggregatorConfig.monitor_dir, f"entry_{entry}", monitorAggregatorConfig.status_relname
)
# load entry completed data file
completed_data_fname = os.path.join(
os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry),
monitorAggregatorConfig.monitor_dir,
f"entry_{entry}",
monitorAggregatorConfig.completed_data_relname,
)
completed_data_fp = None
@@ -377,7 +280,7 @@ def aggregateStatus(in_downtime):
else:
# All other fields could be numbers or something else
try:
# if there already, sum
# if it is there already, sum
if a in tela:
tela[a] += int(ela[a])
else:
@@ -501,11 +404,13 @@ def aggregateStatus(in_downtime):
a_el = int(tp_el[a])
val_dict[f"{tp_str}{a}"] = a_el

glideFactoryMonitoring.monitoringConfig.write_rrd_multi("total/Status_Attributes", "GAUGE", updated, val_dict)
glideFactoryMonitoring.monitoringConfig.write_rrd_multi(
os.path.join("total", "Status_Attributes"), "GAUGE", updated, val_dict
)

# Frontend total rrds across all factories
for fe in list(status_fe["frontends"].keys()):
glideFactoryMonitoring.monitoringConfig.establish_dir("total/%s" % ("frontend_" + fe))
glideFactoryMonitoring.monitoringConfig.establish_dir(os.path.join("total", f"frontend_{fe}"))
for tp in list(status_fe["frontends"][fe].keys()):
# values (RRD type) - Status or Requested
if not (tp in list(type_strings.keys())):
@@ -520,7 +425,7 @@ def aggregateStatus(in_downtime):
a_el = int(tp_el[a])
val_dict[f"{tp_str}{a}"] = a_el
glideFactoryMonitoring.monitoringConfig.write_rrd_multi(
"total/%s/Status_Attributes" % ("frontend_" + fe), "GAUGE", updated, val_dict
os.path.join("total", f"frontend_{fe}", "Status_Attributes"), "GAUGE", updated, val_dict
)

return status
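# NOTE (editor's aside, not part of this PR): the changes above replace
# hand-built "total/..." strings with os.path.join for consistency; on POSIX
# the results are identical, e.g. with a hypothetical frontend name:
#
#   fe = "uclhc"  # hypothetical
#   os.path.join("total", f"frontend_{fe}", "Status_Attributes")
#   # -> "total/frontend_uclhc/Status_Attributes"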
@@ -547,13 +452,20 @@ def aggregateJobsSummary():
for entry in monitorAggregatorConfig.entries:
# load entry log summary file
status_fname = os.path.join(
os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry),
monitorAggregatorConfig.monitor_dir,
f"entry_{entry}",
monitorAggregatorConfig.jobsummary_relname,
)
try:
with open(status_fname, "rb") as fd:
entry_joblist = pickle.load(fd)
except OSError:
# Errors with the file, e.g. FileNotFoundError, IsADirectoryError, PermissionError
logSupport.log.debug(f"Missing file {status_fname}: ignoring and continuing")
continue
except (EOFError, pickle.UnpicklingError):
# Errors with the file content
logSupport.log.debug(f"Empty or corrupted pickle file {status_fname}: ignoring and continuing")
continue
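# NOTE (editor's aside, not part of this PR): OSError covers problems reaching
# the file (missing path, directory, permissions), while EOFError and
# pickle.UnpicklingError cover unreadable content; handling both groups means a
# single bad entry file cannot abort aggregation for the remaining entries.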
schedd_name = entry_joblist.get("schedd_name", None)
pool_name = entry_joblist.get("collector_name", None)
@@ -631,13 +543,15 @@ def aggregateLogSummary():
for entry in monitorAggregatorConfig.entries:
# load entry log summary file
status_fname = os.path.join(
os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry),
monitorAggregatorConfig.monitor_dir,
f"entry_{entry}",
monitorAggregatorConfig.logsummary_relname,
)

try:
entry_data = xmlParse.xmlfile2dict(status_fname, always_singular_list=["Fraction", "TimeRange", "Range"])
except OSError:
logSupport.log.debug(f"Missing file {status_fname}: ignoring and continuing")
continue # file not found, ignore

# update entry
@@ -913,16 +827,18 @@ def aggregateRRDStats(log=logSupport.log):
# assigns the data from every site to 'stats'
stats = {}
for entry in monitorAggregatorConfig.entries:
rrd_fname = os.path.join(os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry), rrd_site(rrd))
rrd_fname = os.path.join(monitorAggregatorConfig.monitor_dir, f"entry_{entry}", rrd_site(rrd))
try:
stats[entry] = xmlParse.xmlfile2dict(rrd_fname, always_singular_list={"timezone": {}})
except OSError:
if os.path.exists(rrd_fname):
log.debug("aggregateRRDStats %s exception: parse_xml, IOError" % rrd_fname)
else:
log.debug(
"aggregateRRDStats %s exception: parse_xml, IOError, File not existing (OK if first time)"
% rrd_fname
)
except FileNotFoundError:
log.debug(
f"aggregateRRDStats {rrd_fname} exception: parse_xml, IOError, File not found (OK if first time)"
)
except OSError:
log.debug(f"aggregateRRDStats {rrd_fname} exception: parse_xml, IOError")
if not os.path.exists(rrd_fname):
log.debug(
f"aggregateRRDStats {rrd_fname} exception: parse_xml, IOError, File not found (OK if first time) - should have been FileNotFoundError"
)
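# NOTE (editor's aside, not part of this PR): FileNotFoundError is a subclass
# of OSError, so the new, more specific handler must be listed first; the
# extra "should have been FileNotFoundError" debug line flags the odd case
# where a generic OSError was raised even though the file does not exist.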

stats_entries = list(stats.keys())