From e69c5e7f1cc9340fd06d040d87d929449a8db725 Mon Sep 17 00:00:00 2001 From: fstagni Date: Tue, 19 Jan 2021 15:15:24 +0100 Subject: [PATCH 1/4] added config and instructions to the right place --- .../Systems/MonitoringSystem/index.rst | 55 +++++++++++++++---- src/DIRAC/MonitoringSystem/ConfigTemplate.cfg | 20 +------ src/DIRAC/MonitoringSystem/DB/MonitoringDB.py | 25 ++++++++- 3 files changed, 68 insertions(+), 32 deletions(-) diff --git a/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst b/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst index 32edb104aba..a5091e9e2c6 100644 --- a/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst +++ b/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst @@ -10,24 +10,25 @@ Monitoring System Overview ========= -The Monitoring system is used to monitor various components of DIRAC. Currently, we have two monitoring types: +The Monitoring system is used to monitor various components of DIRAC. Currently, we have three monitoring types: - WMSHistory: for monitoring the DIRAC WMS - - Component Monitoring: for monitoring DIRAC components such as services, agents, etc. + - Component Monitoring: for monitoring DIRAC components such as services, agents, etc. + - RMS Monitoring: for monitoring the DIRAC RequestManagement System (mostly the Request Executing Agent). -It is based on Elasticsearch distributed search and analytics NoSQL database. If you want to use it, you have to install the Monitoring service and -elasticsearch db. You can use a single node, if you do not have to store lot of data, otherwise you need a cluster (more than one node). +It is based on Elasticsearch distributed search and analytics NoSQL database. +If you want to use it, you have to install the Monitoring service, and of course connect to a ElasticSearch instance. Install Elasticsearch ====================== -You can found in https://www.elastic.co official web site. 
I propose to use standard tools to install for example: yum, rpm, etc. otherwise -you encounter some problems. If you are not familiar with managing linux packages, you have to ask your college or read some relevant documents. +This is not covered here, as installation and administration of ES are not part of DIRAC guide. +Just a note on the ES versions supported: ES7 and ES6 are supported, the support for ES5 is not assured. Configure the MonitoringSystem =============================== -You can run your El cluster without authentication or using User name and password. You have to add the following parameters: +You can run your Elastic cluster even without authentication, or using User name and password. You have to add the following parameters: - User - Password @@ -35,7 +36,7 @@ You can run your El cluster without authentication or using User name and passwo - Port The User name and Password must be added to the local cfg file while the other can be added to the CS using the Configuration web application. -You have to handle the EL secret information in a similar way to what is done for the other supported SQL databases, e.g. MySQL +You have to handle the ES secret information in a similar way to what is done for the other supported SQL databases, e.g. MySQL For example:: @@ -47,18 +48,48 @@ For example:: User = test Password = password } + } + + +The following option can be set in `Systems/Monitoring//Databases/MonitoringDB`: + + *IndexPrefix*: Prefix used to prepend to indexes created in the ES instance. If this + is not present in the CS, the indexes are prefixed with the setup name. +For each monitoring types managed, the Period (how often a new index is created) +can be defined with:: + + MonitoringTypes + { + ComponentMonitoring + { + # Indexing strategy. Possible values: day, week, month, year, null + Period = month + } + RMSMonitoring + { + # Indexing strategy. 
Possible values: day, week, month, year, null + Period = month + } + WMSHistory + { + # Indexing strategy. Possible values: day, week, month, year, null + Period = day + } } +The given periods above are also the default periods in the code. + Enable WMSHistory monitoring ============================ -You have to install the WorkloadManagemet/StatesMonitoringAgent. This agent is used to collect information using the JobDB and send it to the Elasticsearch database. -If you install this agent, you can stop the StatesAccounting agent. +You have to install the WorkloadManagemet/StatesMonitoringAgent. +This agent is used to collect information using the JobDB and send it to the Elasticsearch database. +If you install this agent, you can stop the StatesAccounting agent, that was reporting to the MySQL backend of the Accounting system. -Note: You can use RabbitMQ for failover. This is optional as the agent already has a failover mechanism. You can configure RabbitMQ in the local dirac.cfg file -where the agent is running:: +You can use RabbitMQ for failover. This is optional as the agent already has a failover mechanism. +You can configure RabbitMQ in the local dirac.cfg file where the agent is running:: Resources { diff --git a/src/DIRAC/MonitoringSystem/ConfigTemplate.cfg b/src/DIRAC/MonitoringSystem/ConfigTemplate.cfg index 33217659430..82a0e68a466 100644 --- a/src/DIRAC/MonitoringSystem/ConfigTemplate.cfg +++ b/src/DIRAC/MonitoringSystem/ConfigTemplate.cfg @@ -6,30 +6,12 @@ Services Port = 9137 Authorization { - Default = authenticated + Default = authenticated FileTransfer { Default = authenticated } } - MonitoringTypes - { - ComponentMonitoring - { - # Indexing strategy. Possible values: day, week, month, year, null - Period = month - } - RMSMonitoring - { - # Indexing strategy. Possible values: day, week, month, year, null - Period = month - } - WMSHistory - { - # Indexing strategy. 
Possible values: day, week, month, year, null - Period = day - } - } } ##END } \ No newline at end of file diff --git a/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py b/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py index 49f051a19c7..3872a171ca3 100644 --- a/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py +++ b/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py @@ -3,11 +3,34 @@ **Configuration Parameters**: -The following options can be set in ``Systems/Monitoring//Databases/MonitoringDB`` +The following option can be set in `Systems/Monitoring//Databases/MonitoringDB` * *IndexPrefix*: Prefix used to prepend to indexes created in the ES instance. If this is not present in the CS, the indexes are prefixed with the setup name. +For each monitoring type managed, the Period (how often a new index is created) +can be defined with:: + + MonitoringTypes + { + ComponentMonitoring + { + # Indexing strategy. Possible values: day, week, month, year, null + Period = month + } + RMSMonitoring + { + # Indexing strategy. Possible values: day, week, month, year, null + Period = month + } + WMSHistory + { + # Indexing strategy. 
Possible values: day, week, month, year, null + Period = day + } + } + + """ from __future__ import absolute_import from __future__ import division From cc206485b31736d4e0188df7f53731c90f05586b Mon Sep 17 00:00:00 2001 From: fstagni Date: Tue, 19 Jan 2021 15:33:07 +0100 Subject: [PATCH 2/4] fix: ported out of the inner cycle --- src/DIRAC/ResourceStatusSystem/Agent/SummarizeLogsAgent.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/DIRAC/ResourceStatusSystem/Agent/SummarizeLogsAgent.py b/src/DIRAC/ResourceStatusSystem/Agent/SummarizeLogsAgent.py index 4e90f0ea576..e552d8d14b4 100644 --- a/src/DIRAC/ResourceStatusSystem/Agent/SummarizeLogsAgent.py +++ b/src/DIRAC/ResourceStatusSystem/Agent/SummarizeLogsAgent.py @@ -101,8 +101,8 @@ def execute(self): self.log.error(deleteResult['Message']) continue - if self.months: - self._removeOldHistoryEntries(element, self.months) + if self.months: + self._removeOldHistoryEntries(element, self.months) return S_OK() @@ -252,6 +252,7 @@ def _removeOldHistoryEntries(self, element, months): :return: S_OK / S_ERROR """ toRemove = datetime.utcnow().replace(microsecond=0) - timedelta(days=30 * months) + self.log.info("Removing history entries", "older than %s" % toRemove) deleteResult = self.rsClient.deleteStatusElement(element, 'History', meta={'older': ['DateEffective', toRemove]}) From e1997049f6763b38de1916a68b516b0be3f533d8 Mon Sep 17 00:00:00 2001 From: fstagni Date: Wed, 20 Jan 2021 18:16:41 +0100 Subject: [PATCH 3/4] moving towards a single agent for WMS history --- .../Client/MonitoringReporter.py | 20 +-- src/DIRAC/MonitoringSystem/ConfigTemplate.cfg | 2 +- .../Agent/StatesAccountingAgent.py | 170 ++++++++---------- .../Agent/StatesMonitoringAgent.py | 44 ++--- .../ConfigTemplate.cfg | 8 +- 5 files changed, 111 insertions(+), 133 deletions(-) diff --git a/src/DIRAC/MonitoringSystem/Client/MonitoringReporter.py b/src/DIRAC/MonitoringSystem/Client/MonitoringReporter.py index cbc95409856..6957316f027 
100644 --- a/src/DIRAC/MonitoringSystem/Client/MonitoringReporter.py +++ b/src/DIRAC/MonitoringSystem/Client/MonitoringReporter.py @@ -37,12 +37,11 @@ class MonitoringReporter(object): """ .. class:: MonitoringReporter - This class is used to interact with the db using failover mechanism. + This class is used to interact with the ES DB, using a MQ as a failover mechanism. :param int __maxRecordsInABundle: limit the number of records to be inserted to the db. :param threading.RLock __documentLock: is used to lock the local store when it is being modified. - :param __documents: contains the recods which will be inserted to the db. - :type __documents: python:list + :param list __documents: contains the records which will be inserted to the db. :param str __monitoringType: type of the records which will be inserted to the db. For example: WMSHistory. :param str __failoverQueueName: the name of the messaging queue. For example: /queue/dirac.certification """ @@ -62,8 +61,8 @@ def __del__(self): def processRecords(self): """ - It consumes all messages from the MQ (these are failover messages). In case of failure, the messages - will be inserted to the MQ again. + It consumes all messages from the MQ (these are failover messages). + In case of failure, the messages will be inserted to the MQ again. 
""" retVal = monitoringDB.pingDB() # if the db is not accessible, the records will be not processed from MQ if retVal['OK']: @@ -74,10 +73,9 @@ def processRecords(self): result = createConsumer("Monitoring::Queues::%s" % self.__failoverQueueName) if not result['OK']: - gLogger.error("Fail to create Consumer: %s" % result['Message']) - return S_ERROR("Fail to create Consumer: %s" % result['Message']) - else: - mqConsumer = result['Value'] + gLogger.error("Fail to create Consumer", result['Message']) + return S_ERROR("Fail to create Consumer") + mqConsumer = result['Value'] result = S_OK() failedToProcess = [] @@ -128,8 +126,8 @@ def publishRecords(self, records, mqProducer=None): def commit(self): """ - It inserts the accumulated data to the db. In case of failure - it keeps in memory/MQ + It inserts the accumulated data to the db. + In case of failure it keeps in memory/MQ """ # before we try to insert the data to the db, we process all the data # which are already in the queue diff --git a/src/DIRAC/MonitoringSystem/ConfigTemplate.cfg b/src/DIRAC/MonitoringSystem/ConfigTemplate.cfg index 82a0e68a466..fddc3c67cb2 100644 --- a/src/DIRAC/MonitoringSystem/ConfigTemplate.cfg +++ b/src/DIRAC/MonitoringSystem/ConfigTemplate.cfg @@ -14,4 +14,4 @@ Services } } ##END -} \ No newline at end of file +} diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py index 978865ffbb3..9349e479cf7 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py @@ -1,34 +1,32 @@ -######################################################################## -# File : StatesAccountingAgent.py -# Author : A.T. 
-######################################################################## - """ StatesAccountingAgent sends periodically numbers of jobs in various states for various sites to the Monitoring system to create historical plots. + +.. literalinclude:: ../ConfigTemplate.cfg + :start-after: ##BEGIN StatesAccountingAgent + :end-before: ##END + :dedent: 2 + :caption: StatesAccountingAgent options """ + from __future__ import absolute_import from __future__ import division from __future__ import print_function -__RCSID__ = "$Id$" +__RCSID__ = "$Id$" -from DIRAC import gConfig, S_OK +from DIRAC import S_OK, S_ERROR from DIRAC.Core.Base.AgentModule import AgentModule -from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB +from DIRAC.Core.Utilities import Time from DIRAC.AccountingSystem.Client.Types.WMSHistory import WMSHistory from DIRAC.AccountingSystem.Client.DataStoreClient import DataStoreClient -from DIRAC.Core.Utilities import Time +from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter +from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB class StatesAccountingAgent(AgentModule): - """ - The specific agents must provide the following methods: - - initialize() for initial settings - - beginExecution() - - execute() - the main method called in the agent cycle - - endExecution() - - finalize() - the graceful exit of the method, this one is usually used - for the agent restart + """ Agent that every 15 minutes will report + to the AccountingDB (MySQL) or the Monitoring DB (ElasticSearch), or both, + a snapshot of the JobDB. 
""" __summaryKeyFieldsMapping = ['Status', @@ -37,23 +35,32 @@ class StatesAccountingAgent(AgentModule): 'UserGroup', 'JobGroup', 'JobType', - ] - __summaryDefinedFields = [('ApplicationStatus', 'unset'), ('MinorStatus', 'unset')] + 'ApplicationStatus', + 'MinorStatus'] + __summaryDefinedFields = [('ApplicationStatus', 'unset'), + ('MinorStatus', 'unset')] __summaryValueFieldsMapping = ['Jobs', - 'Reschedules', - ] + 'Reschedules'] __renameFieldsMapping = {'JobType': 'JobSplitType'} def initialize(self): - """ Standard constructor + """ Standard initialization """ - self.dsClients = {} - self.jobDB = JobDB() - self.retryOnce = False - self.retryValues = [] + # This agent will always loop every 15 minutes + self.am_setOption("PollingTime", 900) + + self.backends = self.am_getOption("Backends", "Accounting").replace(' ', '').split(',') + messageQueue = self.am_getOption("MessageQueue", "dirac.wmshistory") + + self.datastores = {} # For storing the clients to Accounting and Monitoring + + if 'Accounting' in self.backends: + self.datastores['Accounting'] = DataStoreClient(retryGraceTime=900) + if 'Monitoring' in self.backends: + self.datastores['Monitoring'] = MonitoringReporter( + monitoringType="WMSHistory", + failoverQueueName=messageQueue) - self.reportPeriod = 850 - self.am_setOption("PollingTime", self.reportPeriod) self.__jobDBFields = [] for field in self.__summaryKeyFieldsMapping: if field == 'User': @@ -66,76 +73,51 @@ def initialize(self): def execute(self): """ Main execution method """ - result = gConfig.getSections("/DIRAC/Setups") - if not result['OK']: - return result - validSetups = result['Value'] - self.log.info("Valid setups for this cycle are %s" % ", ".join(validSetups)) # Get the WMS Snapshot! 
- result = self.jobDB.getSummarySnapshot(self.__jobDBFields) + result = JobDB().getSummarySnapshot(self.__jobDBFields) now = Time.dateTime() if not result['OK']: self.log.error("Can't get the JobDB summary", "%s: won't commit at this cycle" % result['Message']) - else: - values = result['Value'][1] - - if self.retryOnce: - self.log.verbose("Adding to records to commit those not committed within the previous cycle") - acWMSListAdded = [] - - for record in values: - recordSetup = record[0] - if recordSetup not in validSetups: - self.log.error("Setup %s is not valid" % recordSetup) - continue - if recordSetup not in self.dsClients: - self.log.info("Creating DataStore client for %s" % recordSetup) - self.dsClients[recordSetup] = DataStoreClient(retryGraceTime=900) - record = record[1:] - rD = {} - for fV in self.__summaryDefinedFields: - rD[fV[0]] = fV[1] - for iP in range(len(self.__summaryKeyFieldsMapping)): - fieldName = self.__summaryKeyFieldsMapping[iP] - rD[self.__renameFieldsMapping.get(fieldName, fieldName)] = record[iP] - record = record[len(self.__summaryKeyFieldsMapping):] - for iP in range(len(self.__summaryValueFieldsMapping)): - rD[self.__summaryValueFieldsMapping[iP]] = int(record[iP]) - acWMS = WMSHistory() - acWMS.setStartTime(now) - acWMS.setEndTime(now) - acWMS.setValuesFromDict(rD) - retVal = acWMS.checkValues() - if not retVal['OK']: - self.log.error("Invalid accounting record ", "%s -> %s" % (retVal['Message'], rD)) - else: - self.dsClients[recordSetup].addRegister(acWMS) - acWMSListAdded.append(acWMS) - - if self.retryOnce and self.retryValues: - for acWMSCumulated in self.retryValues: - retVal = acWMSCumulated.checkValues() + return S_ERROR() + + # Now we try to commit + values = result['Value'][1] + + self.log.info("Start sending records") + for record in values: + record = record[1:] + rD = {} + for fV in self.__summaryDefinedFields: + rD[fV[0]] = fV[1] + for iP in range(len(self.__summaryKeyFieldsMapping)): + fieldName = 
self.__summaryKeyFieldsMapping[iP] + rD[self.__renameFieldsMapping.get(fieldName, fieldName)] = record[iP] + record = record[len(self.__summaryKeyFieldsMapping):] + for iP in range(len(self.__summaryValueFieldsMapping)): + rD[self.__summaryValueFieldsMapping[iP]] = int(record[iP]) + + for backend in self.datastores: + if backend.lower() == 'monitoring': + rD['timestamp'] = int(Time.toEpoch(now)) + self.datastores['Monitoring'].addRecord(rD) + + elif backend.lower() == 'accounting': + acWMS = WMSHistory() + acWMS.setStartTime(now) + acWMS.setEndTime(now) + acWMS.setValuesFromDict(rD) + retVal = acWMS.checkValues() if not retVal['OK']: - self.log.error("Invalid accounting record ", "%s" % (retVal['Message'])) - else: - self.dsClients[recordSetup].addRegister(acWMSCumulated) - - for setup in self.dsClients: - self.log.info("Sending records for setup %s" % setup) - result = self.dsClients[setup].commit() - if not result['OK']: - self.log.error("Couldn't commit wms history for setup %s" % setup, result['Message']) - # Re-creating the client: for new connection, and for avoiding accumulating too large of a backlog - self.dsClients[setup] = DataStoreClient(retryGraceTime=900) - if not self.retryOnce: - self.log.info("Will try again at next cycle") - self.retryOnce = True - self.retryValues = acWMSListAdded + self.log.error("Invalid accounting record ", "%s -> %s" % (retVal['Message'], rD)) else: - self.log.warn("Won't retry one more time") - self.retryOnce = False - self.retryValues = [] - else: - self.log.info("Sent %s records for setup %s" % (result['Value'], setup)) - self.retryOnce = False + self.datastores['Accounting'].addRegister(acWMS) + + for backend, datastore in self.datastores.items(): + self.log.info("Committing to %s backend" % backend) + result = datastore.commit() + if not result['OK']: + self.log.error("Couldn't commit WMS history to %s" % backend, result['Message']) + return S_ERROR() + self.log.verbose("Done committing to %s backend" % backend) + return 
S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StatesMonitoringAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StatesMonitoringAgent.py index c5e3456bac7..f7ddad71bb3 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StatesMonitoringAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StatesMonitoringAgent.py @@ -1,6 +1,11 @@ ''' StatesMonitoringAgent sends periodically numbers of jobs in various states for various - sites to the Monitoring system to create historical plots. + sites to the Monitoring system to create monitoring plots. + + As of DIRAC v7r2, this agent is an almost exact copy of StatesAccountingAgent, + the only difference being that it will only use the Monitoring System + as its backend (and so ElasticSearch). + This agent will be removed from DIRAC v7r3. .. literalinclude:: ../ConfigTemplate.cfg :start-after: ##BEGIN StatesMonitoringAgent @@ -15,7 +20,7 @@ __RCSID__ = "$Id$" -from DIRAC import gConfig, S_OK +from DIRAC import S_OK from DIRAC.Core.Base.AgentModule import AgentModule from DIRAC.Core.Utilities import Time from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter @@ -24,13 +29,6 @@ class StatesMonitoringAgent(AgentModule): """ - The specific agents must provide the following methods: - - initialize() for initial settings - - beginExecution() - - execute() - the main method called in the agent cycle - - endExecution() - - finalize() - the graceful exit of the method, this one is usually used - for the agent restart """ __summaryKeyFieldsMapping = ['Status', @@ -41,7 +39,8 @@ class StatesMonitoringAgent(AgentModule): 'JobType', 'ApplicationStatus', 'MinorStatus'] - __summaryDefinedFields = [('ApplicationStatus', 'unset'), ('MinorStatus', 'unset')] + __summaryDefinedFields = [('ApplicationStatus', 'unset'), + ('MinorStatus', 'unset')] __summaryValueFieldsMapping = ['Jobs', 'Reschedules'] __renameFieldsMapping = {'JobType': 'JobSplitType'} @@ -52,7 +51,7 @@ class 
StatesMonitoringAgent(AgentModule): monitoringReporter = None def initialize(self): - """ Standard constructor + """ Standard initialization """ self.jobDB = JobDB() @@ -60,7 +59,9 @@ def initialize(self): self.am_setOption("PollingTime", 900) self.messageQueue = self.am_getOption('MessageQueue', 'dirac.wmshistory') - self.monitoringReporter = MonitoringReporter(monitoringType="WMSHistory", failoverQueueName=self.messageQueue) + self.monitoringReporter = MonitoringReporter( + monitoringType="WMSHistory", + failoverQueueName=self.messageQueue) for field in self.__summaryKeyFieldsMapping: if field == 'User': @@ -74,24 +75,16 @@ def initialize(self): def execute(self): """ Main execution method """ - result = gConfig.getSections("/DIRAC/Setups") - if not result['OK']: - return result - validSetups = result['Value'] - self.log.info("Valid setups for this cycle are %s" % ", ".join(validSetups)) # Get the WMS Snapshot! result = self.jobDB.getSummarySnapshot(self.__jobDBFields) now = Time.dateTime() if not result['OK']: - self.log.error("Can't get the jobdb summary", result['Message']) + self.log.error("Can't get the JobDB summary", "%s: won't commit at this cycle" % result['Message']) else: values = result['Value'][1] - self.log.info("Start sending records!") + + self.log.info("Start sending records") for record in values: - recordSetup = record[0] - if recordSetup not in validSetups: - self.log.error("Setup %s is not valid" % recordSetup) - continue record = record[1:] rD = {} for fV in self.__summaryDefinedFields: @@ -104,10 +97,11 @@ def execute(self): rD[self.__summaryValueFieldsMapping[iP]] = int(record[iP]) rD['timestamp'] = int(Time.toEpoch(now)) self.monitoringReporter.addRecord(rD) + retVal = self.monitoringReporter.commit() if retVal['OK']: - self.log.info("The records are successfully sent to the Store!") + self.log.info("Records sent", "(%s)" % result['Value']) else: - self.log.warn("Faild to insert the records! 
It will be retried in the next iteration", retVal['Message']) + self.log.error("Failed to insert the records, it will be retried in the next iteration", retVal['Message']) return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index 22dd3a275af..cc3879bf94b 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -240,14 +240,18 @@ Agents SendPilotSubmissionAccounting = True } ##END + ##BEGIN StatesAccountingAgent StatesAccountingAgent { - PollingTime = 120 + # The backend used. Either "Accounting" or "Monitoring", or both + Backends = Accounting + # the name of the message queue used for the failover + MessageQueue = dirac.wmshistory } + ##END ##BEGIN StatesMonitoringAgent StatesMonitoringAgent { - PollingTime = 900 # the name of the message queue used for the failover MessageQueue = dirac.wmshistory } From 2d82e6729a9bd4924888a3a218a932f05dcb85f1 Mon Sep 17 00:00:00 2001 From: fstagni Date: Tue, 2 Feb 2021 18:16:07 +0100 Subject: [PATCH 4/4] fixed docs --- .../Systems/MonitoringSystem/index.rst | 68 ++++++++++++++----- .../Agent/StatesAccountingAgent.py | 48 ++++++------- .../Agent/StatesMonitoringAgent.py | 10 +-- tests/Jenkins/utilities.sh | 14 ++-- tests/System/random_files_creator.sh | 6 +- 5 files changed, 91 insertions(+), 55 deletions(-) diff --git a/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst b/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst index a5091e9e2c6..9d9f9370608 100644 --- a/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst +++ b/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst @@ -23,7 +23,8 @@ Install Elasticsearch ====================== This is not covered here, as installation and administration of ES are not part of DIRAC guide. 
-Just a note on the ES versions supported: ES7 and ES6 are supported, the support for ES5 is not assured. +Just a note on the ES versions supported: ES7 and ES6 are supported, the support for ES5 is not assured, +and the one for ES6 will be dropped in a future release. Configure the MonitoringSystem =============================== @@ -35,7 +36,7 @@ You can run your Elastic cluster even without authentication, or using User name - Host - Port -The User name and Password must be added to the local cfg file while the other can be added to the CS using the Configuration web application. +The *User* name and *Password* must be added to the local cfg file while the others can be added to the CS using the Configuration web application. You have to handle the ES secret information in a similar way to what is done for the other supported SQL databases, e.g. MySQL @@ -54,7 +55,7 @@ For example:: The following option can be set in `Systems/Monitoring//Databases/MonitoringDB`: *IndexPrefix*: Prefix used to prepend to indexes created in the ES instance. If this - is not present in the CS, the indexes are prefixed with the setup name. + is not present in the CS, the indices are prefixed with the setup name. For each monitoring types managed, the Period (how often a new index is created) can be defined with:: @@ -84,18 +85,18 @@ The given periods above are also the default periods in the code. Enable WMSHistory monitoring ============================ -You have to install the WorkloadManagemet/StatesMonitoringAgent. -This agent is used to collect information using the JobDB and send it to the Elasticsearch database. -If you install this agent, you can stop the StatesAccounting agent, that was reporting to the MySQL backend of the Accounting system. +You have to add ``Monitoring`` to the ``Backends`` option of WorkloadManagement/StatesAccountingAgent. +If you do so, this agent will collect information using the JobDB and send it to the Elasticsearch database. 
+This same agent can also report to the MySQL backend of the Accounting system (which is in fact the default). -You can use RabbitMQ for failover. This is optional as the agent already has a failover mechanism. -You can configure RabbitMQ in the local dirac.cfg file where the agent is running:: +Optionally, you can use an MQ system (like RabbitMQ) for failover, even though the agent already has a simple failover mechanism. +You can configure the MQ in the local dirac.cfg file where the agent is running:: Resources { MQServices { - hostname (for example lbvobox10.cern.ch) + hostname.some.where { MQType = Stomp Port = 61613 @@ -117,24 +118,59 @@ You can configure RabbitMQ in the local dirac.cfg file where the agent is runnin Enable Component monitoring =========================== -You have to set DynamicMonitoring=True in the CS:: +You have to set ``DynamicMonitoring=True`` in the CS:: Systems { - Framework - { - SystemAdministrator + Framework + { + + { + Services + { + SystemAdministrator { - ... - DynamicMonitoring = True - } + ... + DynamicMonitoring = True + } } } + } + } .. image:: cs.png :align: center + +Enable RMS Monitoring +===================== + +In order to enable RMSMonitoring we need to set value of ``EnableActivityMonitoring`` flag to yes/true in the CS:: + + + Systems + { + RequestManagement + { + + { + Agents + { + RequestExecutingAgent + { + ... + EnableActivityMonitoring = True + } + } + } + } + } + + +or inside the ``/Operations`` section as a general flag. 
+ + Accessing the Monitoring information ===================================== diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py index 9349e479cf7..bfb4d0c01c9 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py @@ -35,12 +35,12 @@ class StatesAccountingAgent(AgentModule): 'UserGroup', 'JobGroup', 'JobType', - 'ApplicationStatus', - 'MinorStatus'] + 'ApplicationStatus', + 'MinorStatus'] __summaryDefinedFields = [('ApplicationStatus', 'unset'), - ('MinorStatus', 'unset')] + ('MinorStatus', 'unset')] __summaryValueFieldsMapping = ['Jobs', - 'Reschedules'] + 'Reschedules'] __renameFieldsMapping = {'JobType': 'JobSplitType'} def initialize(self): @@ -58,8 +58,8 @@ def initialize(self): self.datastores['Accounting'] = DataStoreClient(retryGraceTime=900) if 'Monitoring' in self.backends: self.datastores['Monitoring'] = MonitoringReporter( - monitoringType="WMSHistory", - failoverQueueName=messageQueue) + monitoringType="WMSHistory", + failoverQueueName=messageQueue) self.__jobDBFields = [] for field in self.__summaryKeyFieldsMapping: @@ -88,36 +88,36 @@ def execute(self): record = record[1:] rD = {} for fV in self.__summaryDefinedFields: - rD[fV[0]] = fV[1] + rD[fV[0]] = fV[1] for iP in range(len(self.__summaryKeyFieldsMapping)): - fieldName = self.__summaryKeyFieldsMapping[iP] - rD[self.__renameFieldsMapping.get(fieldName, fieldName)] = record[iP] + fieldName = self.__summaryKeyFieldsMapping[iP] + rD[self.__renameFieldsMapping.get(fieldName, fieldName)] = record[iP] record = record[len(self.__summaryKeyFieldsMapping):] for iP in range(len(self.__summaryValueFieldsMapping)): - rD[self.__summaryValueFieldsMapping[iP]] = int(record[iP]) + rD[self.__summaryValueFieldsMapping[iP]] = int(record[iP]) for backend in self.datastores: - if backend.lower() == 'monitoring': - 
rD['timestamp'] = int(Time.toEpoch(now)) - self.datastores['Monitoring'].addRecord(rD) - - elif backend.lower() == 'accounting': - acWMS = WMSHistory() - acWMS.setStartTime(now) - acWMS.setEndTime(now) - acWMS.setValuesFromDict(rD) - retVal = acWMS.checkValues() + if backend.lower() == 'monitoring': + rD['timestamp'] = int(Time.toEpoch(now)) + self.datastores['Monitoring'].addRecord(rD) + + elif backend.lower() == 'accounting': + acWMS = WMSHistory() + acWMS.setStartTime(now) + acWMS.setEndTime(now) + acWMS.setValuesFromDict(rD) + retVal = acWMS.checkValues() if not retVal['OK']: - self.log.error("Invalid accounting record ", "%s -> %s" % (retVal['Message'], rD)) + self.log.error("Invalid accounting record ", "%s -> %s" % (retVal['Message'], rD)) else: - self.datastores['Accounting'].addRegister(acWMS) + self.datastores['Accounting'].addRegister(acWMS) for backend, datastore in self.datastores.items(): self.log.info("Committing to %s backend" % backend) result = datastore.commit() if not result['OK']: - self.log.error("Couldn't commit WMS history to %s" % backend, result['Message']) - return S_ERROR() + self.log.error("Couldn't commit WMS history to %s" % backend, result['Message']) + return S_ERROR() self.log.verbose("Done committing to %s backend" % backend) return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StatesMonitoringAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StatesMonitoringAgent.py index f7ddad71bb3..e756bb7eae8 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StatesMonitoringAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StatesMonitoringAgent.py @@ -40,7 +40,7 @@ class StatesMonitoringAgent(AgentModule): 'ApplicationStatus', 'MinorStatus'] __summaryDefinedFields = [('ApplicationStatus', 'unset'), - ('MinorStatus', 'unset')] + ('MinorStatus', 'unset')] __summaryValueFieldsMapping = ['Jobs', 'Reschedules'] __renameFieldsMapping = {'JobType': 'JobSplitType'} @@ -60,8 +60,8 @@ def initialize(self): self.messageQueue = 
self.am_getOption('MessageQueue', 'dirac.wmshistory') self.monitoringReporter = MonitoringReporter( - monitoringType="WMSHistory", - failoverQueueName=self.messageQueue) + monitoringType="WMSHistory", + failoverQueueName=self.messageQueue) for field in self.__summaryKeyFieldsMapping: if field == 'User': @@ -100,8 +100,8 @@ def execute(self): retVal = self.monitoringReporter.commit() if retVal['OK']: - self.log.info("Records sent", "(%s)" % result['Value']) + self.log.info("Records sent", "(%s)" % result['Value']) else: - self.log.error("Failed to insert the records, it will be retried in the next iteration", retVal['Message']) + self.log.error("Failed to insert the records, it will be retried in the next iteration", retVal['Message']) return S_OK() diff --git a/tests/Jenkins/utilities.sh b/tests/Jenkins/utilities.sh index 8e1bca3218f..b1d3731fa95 100644 --- a/tests/Jenkins/utilities.sh +++ b/tests/Jenkins/utilities.sh @@ -486,12 +486,12 @@ generateCA() { # Generate the CA certificate openssl req -config openssl_config_ca.cnf \ - -key ca.key.pem \ - -new -x509 \ - -days 7300 \ - -sha256 \ - -extensions v3_ca \ - -out ca.cert.pem + -key ca.key.pem \ + -new -x509 \ + -days 7300 \ + -sha256 \ + -extensions v3_ca \ + -out ca.cert.pem # Copy the CA to the list of trusted CA cp ca.cert.pem "${SERVERINSTALLDIR}/etc/grid-security/certificates/" @@ -864,7 +864,7 @@ diracAgents(){ python "${TESTCODE}/DIRAC/tests/Jenkins/dirac-cfg-add-option.py" "agent" "$agent" echo "==> calling dirac-agent $agent -o MaxCycles=1 ${DEBUG}" if ! dirac-agent "$agent" -o MaxCycles=1 "${DEBUG}"; then - echo 'ERROR: dirac-agent failed' >&2 + echo 'ERROR: dirac-agent failed' >&2 exit 1 fi fi diff --git a/tests/System/random_files_creator.sh b/tests/System/random_files_creator.sh index dbae937b4ee..02f42413eef 100755 --- a/tests/System/random_files_creator.sh +++ b/tests/System/random_files_creator.sh @@ -53,14 +53,14 @@ if [[ "${#}" -gt 0 ]]; then -p=*|--Path=*) temporaryPath="${i#*=}" - if [[ ! 
-d "${temporaryPath}" ]]; then - mkdir -p "${temporaryPath}" + if [[ ! -d "${temporaryPath}" ]]; then + mkdir -p "${temporaryPath}" fi shift # past argument=value ;; *) - echo -e "${helpmessage}" + echo -e "${helpmessage}" exit 0 # unknown option ;;