diff --git a/factory/glideFactoryMonitorAggregator.py b/factory/glideFactoryMonitorAggregator.py index 05f2a04ce..814e455d4 100644 --- a/factory/glideFactoryMonitorAggregator.py +++ b/factory/glideFactoryMonitorAggregator.py @@ -1,26 +1,14 @@ # SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC # SPDX-License-Identifier: Apache-2.0 -# -# Project: -# glideinWMS -# -# File Version: -# # Description: # This module implements the functions needed # to aggregate the monitoring fo the glidein factory -# -# Author: -# Igor Sfiligoi (May 23rd 2007) -# import copy import json import os.path import pickle -import shutil -import tempfile import time from glideinwms.factory import glideFactoryMonitoring @@ -89,76 +77,31 @@ def rrd_site(name): type_strings = {"Status": "Status", "Requested": "Req", "ClientMonitor": "Client"} ############################################################################## -rrd_problems_found = False +# Function used by Factory reconfig/upgrade +# No logging available, output is to stdout/err -def verifyHelper(filename, dict, fix_rrd=False): - """ - Helper function for verifyRRD. Checks one file, - prints out errors. if fix_rrd, will attempt to - dump out rrd to xml, add the missing attributes, - then restore. Original file is obliterated. - - @param filename: filename of rrd to check - @param dict: expected dictionary - @param fix_rrd: if true, will attempt to add missing attrs - """ - global rrd_problems_found - if not os.path.exists(filename): - print("WARNING: %s missing, will be created on restart" % (filename)) - return - rrd_obj = rrdSupport.rrdSupport() - (missing, extra) = rrd_obj.verify_rrd(filename, dict) - for attr in extra: - print(f"ERROR: {filename} has extra attribute {attr}") - if fix_rrd: - print("ERROR: fix_rrd cannot fix extra attributes") - if not fix_rrd: - for attr in missing: - print(f"ERROR: {filename} missing attribute {attr}") - if len(missing) > 0: - rrd_problems_found = True - if fix_rrd and (len(missing) > 0): - (f, tempfilename) = tempfile.mkstemp() - (out, tempfilename2) = tempfile.mkstemp() - (restored, restoredfilename) = tempfile.mkstemp() - backup_str = str(int(time.time())) + ".backup" - print(f"Fixing {filename}... (backed up to {filename + backup_str})") - os.close(out) - os.close(restored) - os.unlink(restoredfilename) - # Use exe version since dump, restore not available in rrdtool - dump_obj = rrdSupport.rrdtool_exe() - outstr = dump_obj.dump(filename) - for line in outstr: - os.write(f, "%s\n" % line) - os.close(f) - # Move file to backup location - shutil.move(filename, filename + backup_str) - rrdSupport.addDataStore(tempfilename, tempfilename2, missing) - dump_obj.restore(tempfilename2, restoredfilename) - os.unlink(tempfilename) - os.unlink(tempfilename2) - shutil.move(restoredfilename, filename) - if len(extra) > 0: - rrd_problems_found = True - - -def verifyRRD(fix_rrd=False): +def verifyRRD(fix_rrd=False, backup=True): """ Go through all known monitoring rrds and verify that they match existing schema (could be different if an upgrade happened) If fix_rrd is true, then also attempt to add any missing attributes. 
+ + Args: + fix_rrd (bool): if True, will attempt to add missing attrs + backup (bool): if True, backup the old RRD before fixing + + Returns: + bool: True if all OK, False if there is a problem w/ RRD files + """ - global rrd_problems_found - global monitorAggregatorConfig + rrd_problems_found = False mon_dir = monitorAggregatorConfig.monitor_dir - + # Factory monitoring dictionaries status_dict = {} completed_stats_dict = {} completed_waste_dict = {} counts_dict = {} - # initialize the RRD dictionaries to match the current schema for verification for tp in list(status_attributes.keys()): if tp in list(type_strings.keys()): @@ -166,64 +109,20 @@ def verifyRRD(fix_rrd=False): attributes_tp = status_attributes[tp] for a in attributes_tp: status_dict[f"{tp_str}{a}"] = None - for jobrange in glideFactoryMonitoring.getAllJobRanges(): completed_stats_dict[f"JobsNr_{jobrange}"] = None for timerange in glideFactoryMonitoring.getAllTimeRanges(): completed_stats_dict[f"Lasted_{timerange}"] = None completed_stats_dict[f"JobsLasted_{timerange}"] = None - for jobtype in glideFactoryMonitoring.getAllJobTypes(): for timerange in glideFactoryMonitoring.getAllMillRanges(): completed_waste_dict[f"{jobtype}_{timerange}"] = None - for jobtype in ("Entered", "Exited", "Status"): for jobstatus in ("Wait", "Idle", "Running", "Held"): counts_dict[f"{jobtype}{jobstatus}"] = None for jobstatus in ("Completed", "Removed"): - counts_dict["{}{}".format("Entered", jobstatus)] = None - # FROM: lib2to3.fixes.fix_ws_comma - # completed_waste_dict["%s_%s"%(jobtype, timerange)]=None - # - # for jobtype in ('Entered', 'Exited', 'Status'): - # for jobstatus in ('Wait', 'Idle', 'Running', 'Held'): - # counts_dict["%s%s"%(jobtype, jobstatus)]=None - # for jobstatus in ('Completed', 'Removed'): - # counts_dict["%s%s"%('Entered', jobstatus)]=None - # - # verifyHelper(os.path.join(total_dir, - # "Status_Attributes.rrd"), status_dict, fix_rrd) - # verifyHelper(os.path.join(total_dir, - # "Log_Completed.rrd"), - # glideFactoryMonitoring.getLogCompletedDefaults(), fix_rrd) - # verifyHelper(os.path.join(total_dir, - # "Log_Completed_Stats.rrd"), completed_stats_dict, fix_rrd) - # verifyHelper(os.path.join(total_dir, - # "Log_Completed_WasteTime.rrd"), completed_waste_dict, fix_rrd) - # verifyHelper(os.path.join(total_dir, - # "Log_Counts.rrd"), counts_dict, fix_rrd) - # for filename in os.listdir(dir): - # if filename[:6]=="entry_": - # entrydir=os.path.join(dir, filename) - # for subfilename in os.listdir(entrydir): - # if subfilename[:9]=="frontend_": - # current_dir=os.path.join(entrydir, subfilename) - # verifyHelper(os.path.join(current_dir, - # "Status_Attributes.rrd"), status_dict, fix_rrd) - # verifyHelper(os.path.join(current_dir, - # "Log_Completed.rrd"), - # glideFactoryMonitoring.getLogCompletedDefaults(), fix_rrd) - # verifyHelper(os.path.join(current_dir, - # "Log_Completed_Stats.rrd"), completed_stats_dict, fix_rrd) - # verifyHelper(os.path.join(current_dir, - # "Log_Completed_WasteTime.rrd"), - # completed_waste_dict, fix_rrd) - # verifyHelper(os.path.join(current_dir, - # "Log_Counts.rrd"), counts_dict, fix_rrd) - # return not rrd_problems_found - + counts_dict[f"Entered{jobstatus}"] = None completed_dict = glideFactoryMonitoring.getLogCompletedDefaults() - rrdict = { "Status_Attributes.rrd": status_dict, "Log_Completed.rrd": completed_dict, @@ -231,12 +130,15 @@ def verifyRRD(fix_rrd=False): "Log_Completed_WasteTime.rrd": completed_waste_dict, "Log_Counts.rrd": counts_dict, } - + # check all the existing files + if 
not os.path.isdir(mon_dir):
+        print(f"WARNING: monitor directory '{mon_dir}' does not exist, skipping rrd verification.")
+        return True
     for dir_name, sdir_name, f_list in os.walk(mon_dir):
         for file_name in f_list:
             if file_name in list(rrdict.keys()):
-                verifyHelper(os.path.join(dir_name, file_name), rrdict[file_name], fix_rrd)
-
+                if rrdSupport.verifyHelper(os.path.join(dir_name, file_name), rrdict[file_name], fix_rrd, backup):
+                    rrd_problems_found = True
     return not rrd_problems_found


@@ -280,11 +182,12 @@ def aggregateStatus(in_downtime):
     for entry in monitorAggregatorConfig.entries:
         # load entry status file
         status_fname = os.path.join(
-            os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry), monitorAggregatorConfig.status_relname
+            monitorAggregatorConfig.monitor_dir, f"entry_{entry}", monitorAggregatorConfig.status_relname
         )
         # load entry completed data file
         completed_data_fname = os.path.join(
-            os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry),
+            monitorAggregatorConfig.monitor_dir,
+            f"entry_{entry}",
             monitorAggregatorConfig.completed_data_relname,
         )
         completed_data_fp = None
@@ -377,7 +280,7 @@ def aggregateStatus(in_downtime):
                 else:
                     # All other fields could be numbers or something else
                     try:
-                        # if there already, sum
+                        # if it is there already, sum
                         if a in tela:
                             tela[a] += int(ela[a])
                         else:
@@ -501,11 +404,13 @@ def aggregateStatus(in_downtime):
                 a_el = int(tp_el[a])
                 val_dict[f"{tp_str}{a}"] = a_el

-    glideFactoryMonitoring.monitoringConfig.write_rrd_multi("total/Status_Attributes", "GAUGE", updated, val_dict)
+    glideFactoryMonitoring.monitoringConfig.write_rrd_multi(
+        os.path.join("total", "Status_Attributes"), "GAUGE", updated, val_dict
+    )

     # Frontend total rrds across all factories
     for fe in list(status_fe["frontends"].keys()):
-        glideFactoryMonitoring.monitoringConfig.establish_dir("total/%s" % ("frontend_" + fe))
+        glideFactoryMonitoring.monitoringConfig.establish_dir(os.path.join("total", f"frontend_{fe}"))
         for tp in list(status_fe["frontends"][fe].keys()):
             # values (RRD type) - Status or Requested
             if not (tp in list(type_strings.keys())):
@@ -520,7 +425,7 @@ def aggregateStatus(in_downtime):
                 a_el = int(tp_el[a])
                 val_dict[f"{tp_str}{a}"] = a_el
             glideFactoryMonitoring.monitoringConfig.write_rrd_multi(
-                "total/%s/Status_Attributes" % ("frontend_" + fe), "GAUGE", updated, val_dict
+                os.path.join("total", f"frontend_{fe}", "Status_Attributes"), "GAUGE", updated, val_dict
             )

     return status
@@ -547,13 +452,20 @@ def aggregateJobsSummary():
     for entry in monitorAggregatorConfig.entries:
         # load entry log summary file
         status_fname = os.path.join(
-            os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry),
+            monitorAggregatorConfig.monitor_dir,
+            f"entry_{entry}",
             monitorAggregatorConfig.jobsummary_relname,
         )
         try:
             with open(status_fname, "rb") as fd:
                 entry_joblist = pickle.load(fd)
         except OSError:
+            # Errors with the file, e.g. 
FileNotFoundError, IsADirectoryError, PermissionError
+            logSupport.log.debug(f"Missing file {status_fname}: ignoring and continuing")
+            continue
+        except (EOFError, pickle.UnpicklingError):
+            # Errors with the file content
+            logSupport.log.debug(f"Empty or corrupted pickle file {status_fname}: ignoring and continuing")
             continue
         schedd_name = entry_joblist.get("schedd_name", None)
         pool_name = entry_joblist.get("collector_name", None)
@@ -631,13 +543,15 @@ def aggregateLogSummary():
     for entry in monitorAggregatorConfig.entries:
         # load entry log summary file
         status_fname = os.path.join(
-            os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry),
+            monitorAggregatorConfig.monitor_dir,
+            f"entry_{entry}",
             monitorAggregatorConfig.logsummary_relname,
         )
         try:
             entry_data = xmlParse.xmlfile2dict(status_fname, always_singular_list=["Fraction", "TimeRange", "Range"])
         except OSError:
+            logSupport.log.debug(f"Missing file {status_fname}: ignoring and continuing")
             continue  # file not found, ignore

         # update entry
@@ -913,16 +827,18 @@ def aggregateRRDStats(log=logSupport.log):
         # assigns the data from every site to 'stats'
         stats = {}
         for entry in monitorAggregatorConfig.entries:
-            rrd_fname = os.path.join(os.path.join(monitorAggregatorConfig.monitor_dir, "entry_" + entry), rrd_site(rrd))
+            rrd_fname = os.path.join(monitorAggregatorConfig.monitor_dir, f"entry_{entry}", rrd_site(rrd))
             try:
                 stats[entry] = xmlParse.xmlfile2dict(rrd_fname, always_singular_list={"timezone": {}})
+            except FileNotFoundError:
+                log.debug(
+                    f"aggregateRRDStats {rrd_fname} exception: parse_xml, IOError, File not found (OK if first time)"
+                )
             except OSError:
-                if os.path.exists(rrd_fname):
-                    log.debug("aggregateRRDStats %s exception: parse_xml, IOError" % rrd_fname)
-                else:
+                log.debug(f"aggregateRRDStats {rrd_fname} exception: parse_xml, IOError")
+                if not os.path.exists(rrd_fname):
                     log.debug(
-                        "aggregateRRDStats %s exception: parse_xml, IOError, File not existing (OK if first time)"
-                        % rrd_fname
+                        f"aggregateRRDStats {rrd_fname} exception: parse_xml, IOError, File not found (OK if first time) - should have been FileNotFoundError"
                     )

         stats_entries = list(stats.keys())
diff --git a/factory/glideFactoryMonitoring.py b/factory/glideFactoryMonitoring.py
index 62a055a8f..47fcee987 100644
--- a/factory/glideFactoryMonitoring.py
+++ b/factory/glideFactoryMonitoring.py
@@ -1,19 +1,9 @@
 # SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
 # SPDX-License-Identifier: Apache-2.0
-#
-# Project:
-#   glideinWMS
-#
-# File Version:
-#
 # Description:
 #   This module implements the functions needed
 #   to monitor the glidein factory
-#
-# Author:
-#   Igor Sfiligoi (Dec 11th 2006)
-#

 import copy
 import json
@@ -193,8 +183,8 @@ def write_completed_json(self, relative_fname, time, val_dict):

     def establish_dir(self, relative_dname):
         dname = os.path.join(self.monitor_dir, relative_dname)
-        if not os.path.isdir(dname):
-            os.mkdir(dname)
+        # make the directory and its parents; raise no exception if already there
+        os.makedirs(dname, exist_ok=True)
         return

     def write_rrd_multi(self, relative_fname, ds_type, time, val_dict, min_val=None, max_val=None):
@@ -718,7 +708,7 @@ def write_file(self, monitoringConfig=None, alt_stats=None):
                 if not isinstance(a_el, dict):  # ignore subdictionaries
                     val_dict[f"{tp_str}{a}"] = a_el
-        monitoringConfig.write_rrd_multi("%s/Status_Attributes" % fe_dir, "GAUGE", self.updated, val_dict)
+        monitoringConfig.write_rrd_multi(os.path.join(fe_dir, "Status_Attributes"), "GAUGE", self.updated, val_dict)

         self.files_updated = 
self.updated return @@ -1347,22 +1337,28 @@ def write_file(self, monitoringConfig=None): # write the data to disk monitoringConfig.write_rrd_multi_hetero( - "%s/Log_Counts" % fe_dir, val_dict_counts_desc, self.updated, val_dict_counts + os.path.join(fe_dir, "Log_Counts"), val_dict_counts_desc, self.updated, val_dict_counts + ) + monitoringConfig.write_rrd_multi( + os.path.join(fe_dir, "Log_Completed"), "ABSOLUTE", self.updated, val_dict_completed + ) + monitoringConfig.write_completed_json( + os.path.join(fe_dir, "Log_Completed"), self.updated, val_dict_completed ) - monitoringConfig.write_rrd_multi("%s/Log_Completed" % fe_dir, "ABSOLUTE", self.updated, val_dict_completed) - monitoringConfig.write_completed_json("%s/Log_Completed" % fe_dir, self.updated, val_dict_completed) monitoringConfig.write_rrd_multi( - "%s/Log_Completed_Stats" % fe_dir, "ABSOLUTE", self.updated, val_dict_stats + os.path.join(fe_dir, "Log_Completed_Stats"), "ABSOLUTE", self.updated, val_dict_stats + ) + monitoringConfig.write_completed_json( + os.path.join(fe_dir, "Log_Completed_Stats"), self.updated, val_dict_stats ) - monitoringConfig.write_completed_json("%s/Log_Completed_Stats" % fe_dir, self.updated, val_dict_stats) # Disable Waste RRDs... WasteTime much more useful - # monitoringConfig.write_rrd_multi("%s/Log_Completed_Waste"%fe_dir, + # monitoringConfig.write_rrd_multi(os.path.join(fe_dir, "Log_Completed_Waste"), # "ABSOLUTE",self.updated,val_dict_waste) monitoringConfig.write_rrd_multi( - "%s/Log_Completed_WasteTime" % fe_dir, "ABSOLUTE", self.updated, val_dict_wastetime + os.path.join(fe_dir, "Log_Completed_WasteTime"), "ABSOLUTE", self.updated, val_dict_wastetime ) monitoringConfig.write_completed_json( - "%s/Log_Completed_WasteTime" % fe_dir, self.updated, val_dict_wastetime + os.path.join(fe_dir, "Log_Completed_WasteTime"), self.updated, val_dict_wastetime ) self.aggregate_frontend_data(self.updated, diff_summary) @@ -1382,10 +1378,10 @@ def aggregate_frontend_data(self, updated, diff_summary): for frontend in list(diff_summary.keys()): fe_dir = "frontend_" + frontend - completed_filename = os.path.join(monitoringConfig.monitor_dir, fe_dir) + "/Log_Completed.json" - completed_stats_filename = os.path.join(monitoringConfig.monitor_dir, fe_dir) + "/Log_Completed_Stats.json" - completed_wastetime_filename = ( - os.path.join(monitoringConfig.monitor_dir, fe_dir) + "/Log_Completed_WasteTime.json" + completed_filename = os.path.join(monitoringConfig.monitor_dir, fe_dir, "Log_Completed.json") + completed_stats_filename = os.path.join(monitoringConfig.monitor_dir, fe_dir, "Log_Completed_Stats.json") + completed_wastetime_filename = os.path.join( + monitoringConfig.monitor_dir, fe_dir, "Log_Completed_WasteTime.json" ) try: @@ -1920,28 +1916,5 @@ def get_completed_stats_xml_desc(): ################################################## -# def tmp2final(fname): -# """ -# KEL this exact method is also in glideinFrontendMonitoring.py -# """ -# try: -# os.remove(fname + "~") -# except: -# pass -# -# try: -# os.rename(fname, fname + "~") -# except: -# pass -# -# try: -# os.rename(fname + ".tmp", fname) -# except: -# print "Failed renaming %s.tmp into %s" % (fname, fname) -# return - - -################################################## - # global configuration of the module monitoringConfig = MonitoringConfig() diff --git a/frontend/glideinFrontendMonitorAggregator.py b/frontend/glideinFrontendMonitorAggregator.py index d93a729d8..23c89fb1e 100644 --- a/frontend/glideinFrontendMonitorAggregator.py +++ 
b/frontend/glideinFrontendMonitorAggregator.py @@ -1,26 +1,12 @@ # SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC # SPDX-License-Identifier: Apache-2.0 -# -# Project: -# glideinWMS -# -# File Version: -# # Description: # This module implements the functions needed # to aggregate the monitoring fo the frontend -# -# Author: -# Igor Sfiligoi (Mar 19th 2009) -# -import copy import os import os.path -import shutil -import string -import tempfile import time from glideinwms.frontend import glideinFrontendMonitoring @@ -90,76 +76,32 @@ def config_frontend(self, monitor_dir, groups): "Requested": "Req", } -#################################### -rrd_problems_found = False +################################################ +# Function used by Frontend reconfig/upgrade +# No logging available, output is to stdout/err -def verifyHelper(filename, dict, fix_rrd=False): - """ - Helper function for verifyRRD. Checks one file, - prints out errors. if fix_rrd, will attempt to - dump out rrd to xml, add the missing attributes, - then restore. Original file is obliterated. - - @param filename: filename of rrd to check - @param dict: expected dictionary - @param fix_rrd: if true, will attempt to add missing attrs - """ - global rrd_problems_found - if not os.path.exists(filename): - print("WARNING: %s missing, will be created on restart" % (filename)) - return - rrd_obj = rrdSupport.rrdSupport() - (missing, extra) = rrd_obj.verify_rrd(filename, dict) - for attr in extra: - print(f"ERROR: {filename} has extra attribute {attr}") - if fix_rrd: - print("ERROR: fix_rrd cannot fix extra attributes") - if not fix_rrd: - for attr in missing: - print(f"ERROR: {filename} missing attribute {attr}") - if len(missing) > 0: - rrd_problems_found = True - if fix_rrd and (len(missing) > 0): - (f, tempfilename) = tempfile.mkstemp() - (out, tempfilename2) = tempfile.mkstemp() - (restored, restoredfilename) = tempfile.mkstemp() - backup_str = str(int(time.time())) + ".backup" - print(f"Fixing {filename}... (backed up to {filename + backup_str})") - os.close(out) - os.close(restored) - os.unlink(restoredfilename) - # Use exe version since dump, restore not available in rrdtool - dump_obj = rrdSupport.rrdtool_exe() - outstr = dump_obj.dump(filename) - for line in outstr: - os.write(f, "%s\n" % line) - os.close(f) - rrdSupport.addDataStore(tempfilename, tempfilename2, missing) - os.unlink(filename) - dump_obj.restore(tempfilename2, restoredfilename) - os.unlink(tempfilename) - os.unlink(tempfilename2) - shutil.move(restoredfilename, filename) - if len(extra) > 0: - rrd_problems_found = True - - -def verifyRRD(fix_rrd=False): + +def verifyRRD(fix_rrd=False, backup=False): """ Go through all known monitoring rrds and verify that they match existing schema (could be different if an upgrade happened) If fix_rrd is true, then also attempt to add any missing attributes. 
+ + Args: + fix_rrd (bool): if True, will attempt to add missing attrs + backup (bool): if True, backup the old RRD before fixing + + Returns: + bool: True if all OK, False if there is a problem w/ RRD files + """ - global rrd_problems_found - global monitorAggregatorConfig - # FROM: migration_3_1 - # dir=monitorAggregatorConfig.monitor_dir - # total_dir=os.path.join(dir, "total") + rrd_problems_found = False mon_dir = monitorAggregatorConfig.monitor_dir - + # Frontend monitoring dictionaries status_dict = {} status_total_dict = {} + # initialize the RRD dictionaries to match the current schema for verification for tp in list(frontend_status_attributes.keys()): if tp in list(frontend_total_type_strings.keys()): tp_str = frontend_total_type_strings[tp] @@ -171,36 +113,19 @@ def verifyRRD(fix_rrd=False): attributes_tp = frontend_status_attributes[tp] for a in attributes_tp: status_dict[f"{tp_str}{a}"] = None - + # check all the existing files if not os.path.isdir(mon_dir): print("WARNING: monitor/ directory does not exist, skipping rrd verification.") return True - # FROM: migration_3_1 - # for filename in os.listdir(dir): - # if (filename[:6]=="group_") or (filename=="total"): - # current_dir=os.path.join(dir, filename) - # if filename=="total": - # verifyHelper(os.path.join(current_dir, - # "Status_Attributes.rrd"), status_total_dict, fix_rrd) - # for dirname in os.listdir(current_dir): - # current_subdir=os.path.join(current_dir, dirname) - # if dirname[:6]=="state_": - # verifyHelper(os.path.join(current_subdir, - # "Status_Attributes.rrd"), status_dict, fix_rrd) - # if dirname[:8]=="factory_": - # verifyHelper(os.path.join(current_subdir, - # "Status_Attributes.rrd"), status_dict, fix_rrd) - # if dirname=="total": - # verifyHelper(os.path.join(current_subdir, - # "Status_Attributes.rrd"), status_total_dict, fix_rrd) for dir_name, sdir_name, f_list in os.walk(mon_dir): for file_name in f_list: if file_name == "Status_Attributes.rrd": if os.path.basename(dir_name) == "total": - verifyHelper(os.path.join(dir_name, file_name), status_total_dict, fix_rrd) + if rrdSupport.verifyHelper(os.path.join(dir_name, file_name), status_total_dict, fix_rrd, backup): + rrd_problems_found = True else: - verifyHelper(os.path.join(dir_name, file_name), status_dict, fix_rrd) - + if rrdSupport.verifyHelper(os.path.join(dir_name, file_name), status_dict, fix_rrd, backup): + rrd_problems_found = True return not rrd_problems_found @@ -276,7 +201,7 @@ def aggregateStatus(): for group in monitorAggregatorConfig.groups: # load group status file status_fname = os.path.join( - os.path.join(monitorAggregatorConfig.monitor_dir, "group_" + group), monitorAggregatorConfig.status_relname + monitorAggregatorConfig.monitor_dir, f"group_{group}", monitorAggregatorConfig.status_relname ) try: group_data = xmlParse.xmlfile2dict(status_fname) @@ -292,7 +217,7 @@ def aggregateStatus(): for fos in ("factories", "states"): try: status["groups"][group][fos] = group_data[fos] - except KeyError as e: + except KeyError: # first time after upgrade factories may not be defined status["groups"][group][fos] = {} @@ -457,15 +382,15 @@ def aggregateStatus(): # Write rrds glideinFrontendMonitoring.monitoringConfig.establish_dir("total") - write_one_rrd("total/Status_Attributes", updated, global_total, 0) + write_one_rrd(os.path.join("total", "Status_Attributes"), updated, global_total, 0) for fact in list(global_fact_totals["factories"].keys()): - fe_dir = "total/factory_%s" % glideinFrontendMonitoring.sanitize(fact) + fe_dir = 
os.path.join("total", f"factory_{glideinFrontendMonitoring.sanitize(fact)}") glideinFrontendMonitoring.monitoringConfig.establish_dir(fe_dir) - write_one_rrd("%s/Status_Attributes" % fe_dir, updated, global_fact_totals["factories"][fact], 1) + write_one_rrd(os.path.join(fe_dir, "Status_Attributes"), updated, global_fact_totals["factories"][fact], 1) for fact in list(global_fact_totals["states"].keys()): - fe_dir = "total/state_%s" % glideinFrontendMonitoring.sanitize(fact) + fe_dir = os.path.join("total", f"state_{glideinFrontendMonitoring.sanitize(fact)}") glideinFrontendMonitoring.monitoringConfig.establish_dir(fe_dir) - write_one_rrd("%s/Status_Attributes" % fe_dir, updated, global_fact_totals["states"][fact], 1) + write_one_rrd(os.path.join(fe_dir, "Status_Attributes"), updated, global_fact_totals["states"][fact], 1) return status diff --git a/frontend/glideinFrontendMonitoring.py b/frontend/glideinFrontendMonitoring.py index e3be22a13..a4f5b5bef 100644 --- a/frontend/glideinFrontendMonitoring.py +++ b/frontend/glideinFrontendMonitoring.py @@ -1,32 +1,17 @@ # SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC # SPDX-License-Identifier: Apache-2.0 -# -# Project: -# glideinWMS -# -# File Version: -# # Description: # This module implements the functions needed # to monitor the VO frontend -# -# Author: -# Igor Sfiligoi (Mar 19th 2009) -# import copy -import fcntl -import math import os import os.path -import random -import re import string import time -import traceback -from glideinwms.lib import logSupport, rrdSupport, timeConversion, xmlFormat +from glideinwms.lib import logSupport, rrdSupport, timeConversion, util, xmlFormat from glideinwms.lib.defaults import BINARY_ENCODING ############################################################ @@ -57,19 +42,16 @@ def __init__(self): def write_file(self, relative_fname, output_str): fname = os.path.join(self.monitor_dir, relative_fname) - if not os.path.isdir(os.path.dirname(fname)): - os.makedirs(os.path.dirname(fname)) + os.makedirs(os.path.dirname(fname), exist_ok=True) # print "Writing "+fname with open(fname + ".tmp", "w") as fd: fd.write(output_str + "\n") - - tmp2final(fname) + util.file_tmp2final(fname, mask_exceptions=(logSupport.log.error, f"Failed rename/write into {fname}")) return def establish_dir(self, relative_dname): dname = os.path.join(self.monitor_dir, relative_dname) - if not os.path.isdir(dname): - os.mkdir(dname) + os.makedirs(dname, exist_ok=True) return def write_rrd_multi(self, relative_fname, ds_type, time, val_dict, min_val=None, max_val=None): @@ -396,7 +378,7 @@ def write_one_rrd(self, name, data, fact=0): val_dict[f"{tp_str}{a}"] = a_el monitoringConfig.establish_dir("%s" % name) - monitoringConfig.write_rrd_multi("%s/Status_Attributes" % name, "GAUGE", self.updated, val_dict) + monitoringConfig.write_rrd_multi(os.path.join(name, "Status_Attributes"), "GAUGE", self.updated, val_dict) ######################################################################## @@ -649,7 +631,7 @@ def write_file(self): if not isinstance(a_el, dict): # ignore subdictionaries val_dict[f"{tp_str}{a}"] = a_el - monitoringConfig.write_rrd_multi("%s/Status_Attributes" % fe_dir, "GAUGE", self.updated, val_dict) + monitoringConfig.write_rrd_multi(os.path.join(fe_dir, "Status_Attributes"), "GAUGE", self.updated, val_dict) self.files_updated = self.updated return @@ -658,29 +640,8 @@ def write_file(self): ############### P R I V A T E ################ ################################################## -def tmp2final(fname): - """ - This 
exact method is also in glideFactoryMonitoring.py
-    """
-    try:
-        os.remove(fname + "~")
-    except:
-        pass
-
-    try:
-        os.rename(fname, fname + "~")
-    except:
-        pass
-
-    try:
-        os.rename(fname + ".tmp", fname)
-    except:
-        print(f"Failed renaming {fname}.tmp into {fname}")
-        logSupport.log.error(f"Failed renaming {fname}.tmp into {fname}")
-    return
-
-
-##################################################
 def sanitize(name):
     good_chars = string.ascii_letters + string.digits + ".-"
     outarr = []
@@ -740,7 +701,10 @@ def write_frontend_descript_xml(frontendDescript, monitor_dir):
         with open(fname + ".tmp", "wb") as f:
             f.write(output.encode(BINARY_ENCODING))

-        tmp2final(fname)
+        util.file_tmp2final(
+            fname,
+            mask_exceptions=(logSupport.log.error, f"Failed rename/write of the frontend descript.xml: {fname}"),
+        )
     except OSError:
         logSupport.log.exception("Error writing out the frontend descript.xml: ")
diff --git a/lib/defaults.py b/lib/defaults.py
index 18ee774c2..fe905ad9d 100644
--- a/lib/defaults.py
+++ b/lib/defaults.py
@@ -17,6 +17,7 @@
 # Anyway M2Crypto uses 'utf_8' to implement AnyStr (union of bytes and str)
 BINARY_ENCODING_CRYPTO = "utf_8"  # valid aliases: utf-8, utf8
 BINARY_ENCODING_ASCII = "ascii"  # valid aliases: 646, us-ascii
+BINARY_ENCODING_DEFAULT = "utf_8"  # valid aliases: utf-8, utf8. Default Python 3 encoding


 def force_bytes(instr, encoding=BINARY_ENCODING_CRYPTO):
diff --git a/lib/rrdSupport.py b/lib/rrdSupport.py
index 3dd31a152..59e001a27 100644
--- a/lib/rrdSupport.py
+++ b/lib/rrdSupport.py
@@ -1,30 +1,21 @@
 # SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
 # SPDX-License-Identifier: Apache-2.0
-#
-# Project:
-#   glideinWMS
-#
-# File Version:
-#
 # Description:
 #   This module implements the basic functions needed
 #   to interface to rrdtool
-#
-# Author:
-#   Igor Sfiligoi
-#
-
-import shlex
-import string
-import subprocess
+
+import os
+import shutil
+import tempfile
 import time

-from . import subprocessSupport
+from . import defaults, subprocessSupport

 try:
     import rrdtool  # pylint: disable=import-error
 except:
+    # Will use the binary tools if the Python library is not available
     pass
@@ -538,10 +529,9 @@ def __init__(self):
 ##################################################################


-##################################
-# Dummy, do nothing
-# Used just to get a object
 class DummyDiskLock:
+    """Dummy, do nothing. Used just to get an object"""
+
     def close(self):
         return
@@ -561,6 +551,10 @@ def string_quote_join(arglist):
 class rrdtool_exe:
     """This class is a wrapper around the rrdtool client (binary)
     and is used in place of the rrdtool python module, if that one is not available
+
+    It also provides extra functions:
+    dump: returns an array of lines with the content instead of saving the RRD in an XML file
+    restore: allows restoring a DB
     """

     def __init__(self):
@@ -590,7 +584,9 @@ def dump(self, *args):
         """Run rrd_tool dump

         Input is usually just the file name.
-        Output is a list of lines, as returned from rrdtool.
+        Output is a list of lines, as returned from rrdtool dump. 
+        This is different from the `dump` method provided by the `rrdtool` package (Python binding),
+        which outputs to a file or stdout.

         Args:
             *args: rrdtool dump arguments, joined in single string for the command line
@@ -600,7 +596,7 @@
         """
         cmdline = f"{self.rrd_bin} dump {string_quote_join(args)}"
-        outstr = subprocessSupport.iexe_cmd(cmdline).decode("utf-8").split("\n")
+        outstr = subprocessSupport.iexe_cmd(cmdline).split("\n")
         return outstr

     def restore(self, *args):
@@ -672,3 +668,68 @@ def addDataStore(filenamein, filenameout, attrlist):
             out.write(line)
         if "" in line:
             parse = True
+
+
+# Function used by verifyRRD (in Factory and Frontend), invoked during reconfig/upgrade
+# No logging available, output is to stdout/err
+def verifyHelper(filename, data_dict, fix_rrd=False, backup=True):
+    """Helper function for verifyRRD.
+    Checks one file, prints out errors.
+    If fix_rrd, will attempt to dump out the rrd to XML, add the missing attributes, then restore it.
+    Original file is backed up with a timestamp if backup is True, obliterated otherwise.
+
+    Args:
+        filename(str): filename of rrd to check
+        data_dict(dict): expected dictionary
+        fix_rrd(bool): if True, will attempt to add missing attrs
+        backup(bool): if not True, skip the backup of the original rrd
+
+    Returns:
+        bool: True if there was a problem with the RRD file, False if all OK
+
+    """
+    rrd_problems_found = False
+    if not os.path.exists(filename):
+        print(f"WARNING: {filename} missing, will be created on restart")
+        return False
+    rrd_obj = rrdSupport()
+    (missing, extra) = rrd_obj.verify_rrd(filename, data_dict)
+    for attr in extra:
+        print(f"ERROR: {filename} has extra attribute {attr}")
+        if fix_rrd:
+            print("ERROR: fix_rrd cannot fix extra attributes")
+    if not fix_rrd:
+        for attr in missing:
+            print(f"ERROR: {filename} missing attribute {attr}")
+        if len(missing) > 0:
+            rrd_problems_found = True
+    if fix_rrd and (len(missing) > 0):
+        (f, tempfilename) = tempfile.mkstemp()
+        (out, tempfilename2) = tempfile.mkstemp()
+        (restored, restoredfilename) = tempfile.mkstemp()
+        os.close(out)
+        os.close(restored)
+        os.unlink(restoredfilename)
+        # Use exe version since dump, restore not available in rrdtool
+        dump_obj = rrdtool_exe()
+        outstr = dump_obj.dump(filename)
+        for line in outstr:
+            # dump returns an array of strings decoded w/ utf-8
+            os.write(f, f"{line}\n".encode(defaults.BINARY_ENCODING_DEFAULT))
+        os.close(f)
+        if backup:
+            backup_str = str(int(time.time())) + ".backup"
+            print(f"Fixing {filename}... (backed up to {filename + backup_str})")
+            # Move file to backup location
+            shutil.move(filename, filename + backup_str)
+        else:
+            print(f"Fixing {filename}... 
(no back up)") + os.unlink(filename) + addDataStore(tempfilename, tempfilename2, missing) + dump_obj.restore(tempfilename2, restoredfilename) + os.unlink(tempfilename) + os.unlink(tempfilename2) + shutil.move(restoredfilename, filename) + if len(extra) > 0: + rrd_problems_found = True + return rrd_problems_found diff --git a/lib/util.py b/lib/util.py index 9d2814ded..1ab1360bb 100644 --- a/lib/util.py +++ b/lib/util.py @@ -3,18 +3,8 @@ # SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC # SPDX-License-Identifier: Apache-2.0 -# -# Project: -# glideinWMS -# -# File Version: -# # Description: # This is a collection of utilities functions for file handling and other -# -# Author: -# Marco Mambelli (some functions are from other modules and hardened) -# import contextlib @@ -169,16 +159,20 @@ def dict_normalize(in_dict, keys=None, prefix="", suffix="", default=None): # Atomic writing of files ################################### +# TODO: to remove this comment block after all is OK (MM) +# Using Python >=3.6 as of 2023 +# os.replace() is available, leaving this commented to make sure no other module is importing replace() from util +# # Replace atomically the destination with the source # replace(source, destination) -try: - # Python 3.3+ provides os.replace which is guaranteed atomic and overwrites existing files - replace = os.replace # pylint: disable=no-member -except AttributeError: - # os.rename is atomic in POSIX systems (e.g. not on Windows or some non local file systems) - # This post covers at length the problem, including a working solution on Windows - # http://stupidpythonideas.blogspot.com/2014/07/getting-atomic-writes-right.html - replace = os.rename +# try: +# # Python 3.3+ provides os.replace which is guaranteed atomic and overwrites existing files +# replace = os.replace # pylint: disable=no-member +# except AttributeError: +# # os.rename is atomic in POSIX systems (e.g. not on Windows or some non local file systems) +# # This post covers at length the problem, including a working solution on Windows +# # http://stupidpythonideas.blogspot.com/2014/07/getting-atomic-writes-right.html +# replace = os.rename class ExpiredFileException(Exception): @@ -313,21 +307,28 @@ def file_pickle_load(fname, mask_exceptions=None, default=None, expiration=-1, r # TODO: replace all definitions with this one -def file_tmp2final(fname, tmp_fname=None, bck_fname=None, do_backup=True, mask_exceptions=None): +def file_tmp2final( + fname, tmp_fname=None, bck_fname=None, do_backup=True, mask_exceptions=None, log=None, do_print=False +): """Complete an atomic write by moving a file new version to its destination. If do_backup is True it removes the previous backup and copies the file to bak_fname. Moves tmp_fname to fname. - :param fname: name of the file - :param tmp_fname: name of the temporary file with the new version of the content (Default: .tmp) - :param bck_fname: name of a backup of the old version (Default: ~) - :param do_backup: do a backup of the old version only if True (Default: True) - :param mask_exceptions: callback function and arguments to use if an exception happens (Default: None) - The callback function can access the exception via sys.exc_info() - If a function is not provided, the exception is re-risen - if provided it is called using mask_exceptions[0](*mask_exceptions[1:]) - :return: False if the move caused an exception. 
True otherwise
+    Args:
+        fname(str): name of the file
+        tmp_fname(str|None): name of the temporary file with the new version of the content (Default: <fname>.tmp)
+        bck_fname(str|None): name of a backup of the old version (Default: <fname>~)
+        do_backup(bool): do a backup of the old version only if True (Default: True)
+        mask_exceptions: callback function and arguments to use if an exception happens (Default: None)
+            The callback function can access the exception via sys.exc_info()
+            If a function is not provided, the exception is re-raised;
+            if provided, it is called using mask_exceptions[0](*mask_exceptions[1:])
+        log:
+        do_print:
+
+    Returns:
+        bool: False if the move caused an exception. True otherwise
     """
     if tmp_fname is None:
         tmp_fname = fname + ".tmp"
@@ -341,7 +342,7 @@ def file_tmp2final(fname, tmp_fname=None, bck_fname=None, do_backup=True, mask_e
     except:
         pass
     try:
-        replace(tmp_fname, fname)
+        os.replace(tmp_fname, fname)
     except:
         # print "Failed renaming %s into %s" % (tmp_fname, fname)
         conditional_raise(mask_exceptions)
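
Note for reviewers: this patch consolidates the duplicated tmp2final()/verifyHelper() copies into util.file_tmp2final() and rrdSupport.verifyHelper(). Below is a minimal usage sketch of the two consolidated helpers, assuming only the signatures visible in this patch; the paths, logger name, and schema dict are illustrative placeholders, not project defaults.

    # Sketch only - exercises the consolidated helpers as they appear in this patch.
    import logging
    import os

    from glideinwms.lib import rrdSupport, util

    log = logging.getLogger("monitoring-demo")  # hypothetical logger
    fname = "/tmp/monitor/descript.xml"  # hypothetical path

    # Atomic write: write the new content aside, then file_tmp2final() backs up
    # any previous fname to fname~ and moves fname.tmp into place with os.replace();
    # on failure the exception is masked and routed to the provided callback.
    os.makedirs(os.path.dirname(fname), exist_ok=True)
    with open(fname + ".tmp", "w") as fd:
        fd.write("<frontend/>\n")
    util.file_tmp2final(fname, mask_exceptions=(log.error, f"Failed rename/write into {fname}"))

    # RRD schema check: verifyHelper() returns True when it finds a problem, so the
    # verifyRRD() wrappers above accumulate it and return `not rrd_problems_found`.
    expected = {"StatusIdle": None, "StatusRunning": None}  # hypothetical schema dict
    problems = rrdSupport.verifyHelper(
        "/tmp/monitor/Status_Attributes.rrd", expected, fix_rrd=False, backup=True
    )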