From d0a32a7321a5c1cfa5e13d9079a4bf0e1e1de677 Mon Sep 17 00:00:00 2001 From: Andre Sailer Date: Wed, 13 Jun 2018 10:06:20 +0200 Subject: [PATCH 01/13] TransferClient: remove weird ^M --- Core/DISET/TransferClient.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Core/DISET/TransferClient.py b/Core/DISET/TransferClient.py index fe482f02b2e..611f74fcd18 100755 --- a/Core/DISET/TransferClient.py +++ b/Core/DISET/TransferClient.py @@ -150,7 +150,8 @@ def sendBulk( self, fileList, bulkId, token = "", compress = True, bulkSize = -1 retVal = fileHelper.bulkToNetwork( fileList, compress, onthefly ) if not retVal[ 'OK' ]: return retVal - retVal = transport.receiveData() return retVal + retVal = transport.receiveData() + return retVal finally: self._disconnect( trid ) From 0e415b395102114c9a6a64571edecec32cbc1b2c Mon Sep 17 00:00:00 2001 From: Andre Sailer Date: Wed, 13 Jun 2018 15:08:47 +0200 Subject: [PATCH 02/13] TaskManagerAgentBase: have to define operation variable before try/except operation is used in the except block --- TransformationSystem/Agent/TaskManagerAgentBase.py | 1 + 1 file changed, 1 insertion(+) diff --git a/TransformationSystem/Agent/TaskManagerAgentBase.py b/TransformationSystem/Agent/TaskManagerAgentBase.py index c72d026db14..bb676c9b2ae 100644 --- a/TransformationSystem/Agent/TaskManagerAgentBase.py +++ b/TransformationSystem/Agent/TaskManagerAgentBase.py @@ -260,6 +260,7 @@ def _execute(self, threadID): # Each thread will have its own clients clients = self._getClients() method = '_execute' + operation = 'None' while True: startTime = time.time() From 54393ab260fc47a2ee92bb4ac94ca6a0e0a02751 Mon Sep 17 00:00:00 2001 From: Andre Sailer Date: Wed, 13 Jun 2018 10:08:05 +0200 Subject: [PATCH 03/13] RequestTaskAgent: drop default value for shifterProxy --- TransformationSystem/Agent/RequestTaskAgent.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/TransformationSystem/Agent/RequestTaskAgent.py b/TransformationSystem/Agent/RequestTaskAgent.py index 532cf3fc0cf..2ebdc45999c 100644 --- a/TransformationSystem/Agent/RequestTaskAgent.py +++ b/TransformationSystem/Agent/RequestTaskAgent.py @@ -32,8 +32,6 @@ def initialize(self): if not res['OK']: return res - self.am_setOption('shifterProxy', 'DataManager') - # clients self.taskManager = RequestTasks(transClient=self.transClient) From aac40ec9bd13e8a9c27cf827a01f5cefe3005508 Mon Sep 17 00:00:00 2001 From: Andre Sailer Date: Thu, 31 May 2018 16:51:03 +0200 Subject: [PATCH 04/13] TransformationSystem: make things multiVO capable TaskManagerAgentBase: add option ShifterCredentials to set the credentials to use for all submissions, this is single VO only TaskManagerAgentBase: WorkflowTasks/RequestTasks: pass ownerDN and ownerGroup parameter to all the submission clients if using shifterProxy ownerDN and ownerGroup are None thus reproducing the original behaviour TaskManagerAgentBase: refactor adding operations for transformation to separate function to ensure presence of Owner/DN/Group in dict entries --- .../Agent/RequestTaskAgent.py | 7 + .../Agent/TaskManagerAgentBase.py | 123 +++++++++++------- .../test/Test_Agent_TransformationSystem.py | 3 +- TransformationSystem/Client/TaskManager.py | 22 +++- WorkloadManagementSystem/Client/WMSClient.py | 44 +++---- 5 files changed, 124 insertions(+), 75 deletions(-) diff --git a/TransformationSystem/Agent/RequestTaskAgent.py b/TransformationSystem/Agent/RequestTaskAgent.py index 2ebdc45999c..092c0457b9b 100644 --- a/TransformationSystem/Agent/RequestTaskAgent.py +++ b/TransformationSystem/Agent/RequestTaskAgent.py @@ -50,3 +50,10 @@ def _getClients(self): threadTaskManager = RequestTasks() res.update({'TaskManager': threadTaskManager}) return res + + def _getPerTransformationClients(self, ownerDN=None, ownerGroup=None): + """Set the clients for per transformation credentials.""" + res = super(RequestTaskAgent, self)._getPerTransformationClients(ownerDN=ownerDN, ownerGroup=ownerGroup) + threadTaskManager = RequestTasks(ownerDN=ownerDN, ownerGroup=ownerGroup) + res.update({'TaskManager': threadTaskManager}) + return res diff --git a/TransformationSystem/Agent/TaskManagerAgentBase.py b/TransformationSystem/Agent/TaskManagerAgentBase.py index bb676c9b2ae..2f324bd9a08 100644 --- a/TransformationSystem/Agent/TaskManagerAgentBase.py +++ b/TransformationSystem/Agent/TaskManagerAgentBase.py @@ -14,6 +14,8 @@ from DIRAC import S_OK +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations +from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername, getUsernameForDN from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor from DIRAC.Core.Base.AgentModule import AgentModule from DIRAC.Core.Utilities.ThreadPool import ThreadPool @@ -47,10 +49,6 @@ def __init__(self, *args, **kwargs): self.tasksPerLoop = 50 - self.owner = '' - self.ownerGroup = '' - self.ownerDN = '' - self.pluginLocation = '' self.bulkSubmissionFlag = False @@ -81,6 +79,9 @@ def initialize(self): # Bulk submission flag self.bulkSubmissionFlag = self.am_getOption('BulkSubmission', False) + # Shifter credentials to use, could replace the use of shifterProxy eventually + self.credentials = self.am_getOption('ShifterCredentials', self.am_getOption('shifterProxy', None)) + # setting up the threading maxNumberOfThreads = self.am_getOption('maxNumberOfThreads', 15) threadPool = ThreadPool(maxNumberOfThreads, maxNumberOfThreads) @@ -110,6 +111,27 @@ def execute(self): """ operationsOnTransformationDict = {} + owner, ownerGroup, ownerDN = None, None, None + # getting the credentials for submission + if getProxyInfo(False, False)['OK']: # there is a shifterProxy + res = getProxyInfo(False, False) + proxyInfo = res['Value'] # must be there + owner = proxyInfo['username'] + ownerGroup = proxyInfo['group'] + ownerDN = proxyInfo['identity'] + self.log.info("ShifterProxy: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup)) + elif self.credentials: + resCred = Operations().getOptionsDict("/Shifter/%s" % self.credentials) + if resCred['OK']: + owner = resCred['Value']['User'] + ownerGroup = resCred['Value']['Group'] + ownerDN = getDNForUsername(owner)['Value'] + self.log.info("Cred: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup)) + else: + self.log.error("Cred: Failed to find shifter credentials", self.credentials) + return resCred + else: + self.log.info("Using per Transformation Credentials!") # Determine whether the task status is to be monitored and updated enableTaskMonitor = self.am_getOption('MonitorTasks', '') @@ -123,10 +145,8 @@ def execute(self): if not transformations['OK']: self.log.warn("Could not select transformations:", transformations['Message']) else: - transformationIDsAndBodies = dict((transformation['TransformationID'], - transformation['Body']) for transformation in transformations['Value']) - for transID, body in transformationIDsAndBodies.iteritems(): - operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['updateTaskStatus']} + self._addOperationForTransformations(operationsOnTransformationDict, 'updateTaskStatus', transformations, + owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN) # Determine whether the task files status is to be monitored and updated enableFileMonitor = self.am_getOption('MonitorFiles', '') @@ -140,13 +160,8 @@ def execute(self): if not transformations['OK']: self.log.warn("Could not select transformations:", transformations['Message']) else: - transformationIDsAndBodies = dict((transformation['TransformationID'], - transformation['Body']) for transformation in transformations['Value']) - for transID, body in transformationIDsAndBodies.iteritems(): - if transID in operationsOnTransformationDict: - operationsOnTransformationDict[transID]['Operations'].append('updateFileStatus') - else: - operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['updateFileStatus']} + self._addOperationForTransformations(operationsOnTransformationDict, 'updateFileStatus', transformations, + owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN) # Determine whether the checking of reserved tasks is to be performed enableCheckReserved = self.am_getOption('CheckReserved', '') @@ -160,29 +175,14 @@ def execute(self): if not transformations['OK']: self.log.warn("Could not select transformations:", transformations['Message']) else: - transformationIDsAndBodies = dict((transformation['TransformationID'], - transformation['Body']) for transformation in transformations['Value']) - for transID, body in transformationIDsAndBodies.iteritems(): - if transID in operationsOnTransformationDict: - operationsOnTransformationDict[transID]['Operations'].append('checkReservedTasks') - else: - operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['checkReservedTasks']} + self._addOperationForTransformations(operationsOnTransformationDict, 'checkReservedTasks', transformations, + owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN) # Determine whether the submission of tasks is to be performed enableSubmission = self.am_getOption('SubmitTasks', '') if not enableSubmission: self.log.verbose("Submission of tasks is disabled. To enable it, create the 'SubmitTasks' option") else: - # getting the credentials for submission - res = getProxyInfo(False, False) - if not res['OK']: - self.log.error("Failed to determine credentials for submission", res['Message']) - return res - proxyInfo = res['Value'] - self.owner = proxyInfo['username'] - self.ownerGroup = proxyInfo['group'] - self.ownerDN = proxyInfo['identity'] - self.log.info("Tasks will be submitted with the credentials %s:%s" % (self.owner, self.ownerGroup)) # Get the transformations for which the check of reserved tasks have to be performed status = self.am_getOption('SubmitTransformationStatus', self.am_getOption('SubmitStatus', ['Active', 'Completing'])) @@ -192,13 +192,10 @@ def execute(self): else: # Get the transformations which should be submitted self.tasksPerLoop = self.am_getOption('TasksPerLoop', self.tasksPerLoop) - transformationIDsAndBodies = dict((transformation['TransformationID'], - transformation['Body']) for transformation in transformations['Value']) - for transID, body in transformationIDsAndBodies.iteritems(): - if transID in operationsOnTransformationDict: - operationsOnTransformationDict[transID]['Operations'].append('submitTasks') - else: - operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['submitTasks']} + self._addOperationForTransformations(operationsOnTransformationDict, 'submitTasks', transformations, + owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN) + + self._fillTheQueue(operationsOnTransformationDict) @@ -254,11 +251,24 @@ def _getClients(self): return {'TransformationClient': threadTransformationClient, 'TaskManager': threadTaskManager} + def _getPerTransformationClients(self, ownerDN=None, ownerGroup=None): + """Set the clients for per transformation credentials. + + Returns the clients used in the threads - this is another function that should be extended. + The clients provided here are defaults, and should be adapted + """ + threadTransformationClient = TransformationClient() + threadTaskManager = WorkflowTasks(ownerDN=ownerDN, ownerGroup=ownerGroup) + threadTaskManager.pluginLocation = self.pluginLocation + + return {'TransformationClient': threadTransformationClient, + 'TaskManager': threadTaskManager} + def _execute(self, threadID): """ This is what runs inside the threads, in practice this is the function that does the real stuff """ - # Each thread will have its own clients - clients = self._getClients() + # Each thread will have its own clients if we use credentials/shifterProxy + clients = self._getClients() if self.credentials else None method = '_execute' operation = 'None' @@ -275,6 +285,9 @@ def _execute(self, threadID): self._logWarn("Got a transf not in transInQueue...?", method=method, transID=transID) break + if not self.credentials: + ownerDN, group = transIDOPBody[transID]['OwnerDN'], transIDOPBody[transID]['OwnerGroup'] + clients = self._getPerTransformationClients(ownerDN=ownerDN, ownerGroup=group) self.transInThread[transID] = ' [Thread%d] [%s] ' % (threadID, str(transID)) self._logInfo("Start processing transformation", method=method, transID=transID) clients['TaskManager'].transInThread = self.transInThread @@ -512,6 +525,9 @@ def submitTasks(self, transIDOPBody, clients): """ transID = transIDOPBody.keys()[0] transBody = transIDOPBody[transID]['Body'] + owner = transIDOPBody[transID]['Owner'] + ownerGroup = transIDOPBody[transID]['OwnerGroup'] + ownerDN = transIDOPBody[transID]['OwnerDN'] method = 'submitTasks' # Get all tasks to submit @@ -533,9 +549,9 @@ def submitTasks(self, transIDOPBody, clients): # Prepare tasks preparedTransformationTasks = clients['TaskManager'].prepareTransformationTasks(transBody, tasks, - self.owner, - self.ownerGroup, - self.ownerDN, + owner, + ownerGroup, + ownerDN, self.bulkSubmissionFlag) self._logDebug("prepareTransformationTasks return value:", preparedTransformationTasks, method=method, transID=transID) @@ -563,3 +579,22 @@ def submitTasks(self, transIDOPBody, clients): return res return S_OK() + + @staticmethod + def _addOperationForTransformations(operationsOnTransformationDict, operation, transformations, + owner=None, ownerGroup=None, ownerDN=None): + """Fill the operationsOnTransformationDict""" + transformationIDsAndBodies = [(transformation['TransformationID'], + transformation['Body'], + transformation['AuthorDN'], + transformation['AuthorGroup'], + ) for transformation in transformations['Value']] + for transID, body, t_ownerDN, t_ownerGroup in transformationIDsAndBodies: + if transID in operationsOnTransformationDict: + operationsOnTransformationDict[transID]['Operations'].append(operation) + else: + operationsOnTransformationDict[transID] = {'Body': body, 'Operations': [operation], + 'Owner': owner if owner else getUsernameForDN(t_ownerDN)['Value'], + 'OwnerGroup': ownerGroup if owner else t_ownerGroup, + 'OwnerDN': ownerDN if owner else t_ownerDN + } diff --git a/TransformationSystem/Agent/test/Test_Agent_TransformationSystem.py b/TransformationSystem/Agent/test/Test_Agent_TransformationSystem.py index 192f37ca8e7..3d06758de29 100644 --- a/TransformationSystem/Agent/test/Test_Agent_TransformationSystem.py +++ b/TransformationSystem/Agent/test/Test_Agent_TransformationSystem.py @@ -238,7 +238,8 @@ def test_checkReservedTasks( self ): def test_submitTasks( self ): clients = {'TransformationClient':self.tc_mock, 'TaskManager':self.tm_mock} - transIDOPBody = {1:{'Operations':['op1', 'op2'], 'Body':'veryBigBody'}} + transIDOPBody = {1: {'Operations': ['op1', 'op2'], 'Body': 'veryBigBody', + 'Owner': 'prodMan', 'OwnerDN': '/ca=man/user=prodMan', 'OwnerGroup': 'prodMans'}} # errors getting self.tc_mock.getTasksToSubmit.return_value = {'OK': False, 'Message': 'a mess'} diff --git a/TransformationSystem/Client/TaskManager.py b/TransformationSystem/Client/TaskManager.py index e71d80ddbab..9c048ca4743 100644 --- a/TransformationSystem/Client/TaskManager.py +++ b/TransformationSystem/Client/TaskManager.py @@ -1,4 +1,4 @@ -""" TaskManager contains WorkflowsTasks and RequestTasks modules, for managing jobs and requests tasks +""" TaskManager contains WorkflowTasks and RequestTasks modules, for managing jobs and requests tasks """ __RCSID__ = "$Id$" @@ -108,7 +108,9 @@ class RequestTasks(TaskBase): """ def __init__(self, transClient=None, logger=None, requestClient=None, - requestClass=None, requestValidator=None): + requestClass=None, requestValidator=None, + ownerDN=None, ownerGroup=None, + ): """ c'tor the requestClass is by default Request. @@ -120,9 +122,13 @@ def __init__(self, transClient=None, logger=None, requestClient=None, logger = gLogger.getSubLogger('RequestTasks') super(RequestTasks, self).__init__(transClient, logger) + useCertificates = True if (bool(ownerDN) and bool(ownerGroup)) else None if not requestClient: - self.requestClient = ReqClient() + self.requestClient = ReqClient(useCertificates=useCertificates, + delegatedDN=ownerDN, + delegatedGroup=ownerGroup, + ) else: self.requestClient = requestClient @@ -417,7 +423,9 @@ class WorkflowTasks(TaskBase): """ def __init__(self, transClient=None, logger=None, submissionClient=None, jobMonitoringClient=None, - outputDataModule=None, jobClass=None, opsH=None, destinationPlugin=None): + outputDataModule=None, jobClass=None, opsH=None, destinationPlugin=None, + ownerDN=None, ownerGroup=None, + ): """ Generates some default objects. jobClass is by default "DIRAC.Interfaces.API.Job.Job". An extension of it also works: VOs can pass in their job class extension, if present @@ -428,8 +436,12 @@ def __init__(self, transClient=None, logger=None, submissionClient=None, jobMoni super(WorkflowTasks, self).__init__(transClient, logger) + useCertificates = True if (bool(ownerDN) and bool(ownerGroup)) else None if not submissionClient: - self.submissionClient = WMSClient() + self.submissionClient = WMSClient(useCertificates=useCertificates, + delegatedDN=ownerDN, + delegatedGroup=ownerGroup, + ) else: self.submissionClient = submissionClient diff --git a/WorkloadManagementSystem/Client/WMSClient.py b/WorkloadManagementSystem/Client/WMSClient.py index 8728a767a83..f151cfd9cd5 100755 --- a/WorkloadManagementSystem/Client/WMSClient.py +++ b/WorkloadManagementSystem/Client/WMSClient.py @@ -30,15 +30,17 @@ class WMSClient(object): """ def __init__(self, jobManagerClient=None, sbRPCClient=None, sbTransferClient=None, - useCertificates=False, timeout=600): + useCertificates=False, timeout=600, delegatedDN=None, delegatedGroup=None): """ WMS Client constructor Here we also initialize the needed clients and connections """ self.useCertificates = useCertificates + self.delegatedDN = delegatedDN + self.delegatedGroup = delegatedGroup self.timeout = timeout - self.jobManager = jobManagerClient + self._jobManager = jobManagerClient self.operationsHelper = Operations() self.sandboxClient = None if sbRPCClient and sbTransferClient: @@ -46,6 +48,18 @@ def __init__(self, jobManagerClient=None, sbRPCClient=None, sbTransferClient=Non transferClient=sbTransferClient, useCertificates=useCertificates) + @property + def jobManager(self): + if not self._jobManager: + self._jobManager = RPCClient('WorkloadManagement/JobManager', + useCertificates=self.useCertificates, + delegatedDN=self.delegatedDN, + delegatedGroup=self.delegatedGroup, + timeout=self.timeout) + + return self._jobManager + + ############################################################################### def __getInputSandboxEntries(self, classAdJob): @@ -109,7 +123,9 @@ def __uploadInputSandbox(self, classAdJob, jobDescriptionObject=None): if okFiles: if not self.sandboxClient: - self.sandboxClient = SandboxStoreClient(useCertificates=self.useCertificates) + self.sandboxClient = SandboxStoreClient(useCertificates=self.useCertificates, + delegatedDN=self.delegatedDN, + delegatedGroup=self.delegatedGroup) result = self.sandboxClient.uploadFilesAsSandbox(okFiles) if not result['OK']: return result @@ -159,12 +175,6 @@ def submitJob(self, jdl, jobDescriptionObject=None): return result nJobs = result['Value'] parametricJob = nJobs > 0 - - if not self.jobManager: - self.jobManager = RPCClient('WorkloadManagement/JobManager', - useCertificates=self.useCertificates, - timeout=self.timeout) - result = self.jobManager.submitJob(classAdJob.asJDL()) if parametricJob: @@ -200,38 +210,22 @@ def killJob(self, jobID): """ Kill running job. jobID can be an integer representing a single DIRAC job ID or a list of IDs """ - if not self.jobManager: - self.jobManager = RPCClient('WorkloadManagement/JobManager', - useCertificates=self.useCertificates, - timeout=self.timeout) return self.jobManager.killJob(jobID) def deleteJob(self, jobID): """ Delete job(s) from the WMS Job database. jobID can be an integer representing a single DIRAC job ID or a list of IDs """ - if not self.jobManager: - self.jobManager = RPCClient('WorkloadManagement/JobManager', - useCertificates=self.useCertificates, - timeout=self.timeout) return self.jobManager.deleteJob(jobID) def rescheduleJob(self, jobID): """ Reschedule job(s) in WMS Job database. jobID can be an integer representing a single DIRAC job ID or a list of IDs """ - if not self.jobManager: - self.jobManager = RPCClient('WorkloadManagement/JobManager', - useCertificates=self.useCertificates, - timeout=self.timeout) return self.jobManager.rescheduleJob(jobID) def resetJob(self, jobID): """ Reset job(s) in WMS Job database. jobID can be an integer representing a single DIRAC job ID or a list of IDs """ - if not self.jobManager: - self.jobManager = RPCClient('WorkloadManagement/JobManager', - useCertificates=self.useCertificates, - timeout=self.timeout) return self.jobManager.resetJob(jobID) From 4e9a652976b39ba7adc36ae2f8430469c0d641b8 Mon Sep 17 00:00:00 2001 From: Andre Sailer Date: Thu, 14 Jun 2018 17:05:38 +0200 Subject: [PATCH 05/13] TaskManagerAgentBase: name getProxyInfo Parameters; store result --- TransformationSystem/Agent/TaskManagerAgentBase.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/TransformationSystem/Agent/TaskManagerAgentBase.py b/TransformationSystem/Agent/TaskManagerAgentBase.py index 2f324bd9a08..2b563bc4876 100644 --- a/TransformationSystem/Agent/TaskManagerAgentBase.py +++ b/TransformationSystem/Agent/TaskManagerAgentBase.py @@ -113,9 +113,9 @@ def execute(self): operationsOnTransformationDict = {} owner, ownerGroup, ownerDN = None, None, None # getting the credentials for submission - if getProxyInfo(False, False)['OK']: # there is a shifterProxy - res = getProxyInfo(False, False) - proxyInfo = res['Value'] # must be there + resProxy = getProxyInfo(proxy=False, disableVOMS=False) + if resProxy['OK']: # there is a shifterProxy + proxyInfo = resProxy['Value'] owner = proxyInfo['username'] ownerGroup = proxyInfo['group'] ownerDN = proxyInfo['identity'] From 332fb05268cbc0a00dbf57585f837dfce95ee9de Mon Sep 17 00:00:00 2001 From: Andre Sailer Date: Mon, 18 Jun 2018 10:45:47 +0200 Subject: [PATCH 06/13] TaskManagerAgents: fix logic for non-proxy based general credentials execute is running after _execute(?), so we need to find ShifterCredentials in initialize --- .../Agent/RequestTaskAgent.py | 4 +- .../Agent/TaskManagerAgentBase.py | 49 +++++++++++++------ 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/TransformationSystem/Agent/RequestTaskAgent.py b/TransformationSystem/Agent/RequestTaskAgent.py index 092c0457b9b..ce2b9f33aca 100644 --- a/TransformationSystem/Agent/RequestTaskAgent.py +++ b/TransformationSystem/Agent/RequestTaskAgent.py @@ -51,9 +51,9 @@ def _getClients(self): res.update({'TaskManager': threadTaskManager}) return res - def _getPerTransformationClients(self, ownerDN=None, ownerGroup=None): + def _getDelegatedClients(self, ownerDN=None, ownerGroup=None): """Set the clients for per transformation credentials.""" - res = super(RequestTaskAgent, self)._getPerTransformationClients(ownerDN=ownerDN, ownerGroup=ownerGroup) + res = super(RequestTaskAgent, self)._getDelegatedClients(ownerDN=ownerDN, ownerGroup=ownerGroup) threadTaskManager = RequestTasks(ownerDN=ownerDN, ownerGroup=ownerGroup) res.update({'TaskManager': threadTaskManager}) return res diff --git a/TransformationSystem/Agent/TaskManagerAgentBase.py b/TransformationSystem/Agent/TaskManagerAgentBase.py index 2b563bc4876..cad5f06515c 100644 --- a/TransformationSystem/Agent/TaskManagerAgentBase.py +++ b/TransformationSystem/Agent/TaskManagerAgentBase.py @@ -12,7 +12,7 @@ import datetime from Queue import Queue -from DIRAC import S_OK +from DIRAC import S_OK, S_ERROR from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername, getUsernameForDN @@ -80,8 +80,12 @@ def initialize(self): self.bulkSubmissionFlag = self.am_getOption('BulkSubmission', False) # Shifter credentials to use, could replace the use of shifterProxy eventually - self.credentials = self.am_getOption('ShifterCredentials', self.am_getOption('shifterProxy', None)) - + self.shifterProxy = self.am_getOption('shifterProxy', None) + self.credentials = self.am_getOption('ShifterCredentials', None) + self.credTuple = (None, None, None) + resCred = self.__getCredentials() + if not resCred['OK']: + return resCred # setting up the threading maxNumberOfThreads = self.am_getOption('maxNumberOfThreads', 15) threadPool = ThreadPool(maxNumberOfThreads, maxNumberOfThreads) @@ -121,15 +125,7 @@ def execute(self): ownerDN = proxyInfo['identity'] self.log.info("ShifterProxy: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup)) elif self.credentials: - resCred = Operations().getOptionsDict("/Shifter/%s" % self.credentials) - if resCred['OK']: - owner = resCred['Value']['User'] - ownerGroup = resCred['Value']['Group'] - ownerDN = getDNForUsername(owner)['Value'] - self.log.info("Cred: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup)) - else: - self.log.error("Cred: Failed to find shifter credentials", self.credentials) - return resCred + owner, ownerGroup, ownerDN = self.credTuple else: self.log.info("Using per Transformation Credentials!") @@ -251,7 +247,7 @@ def _getClients(self): return {'TransformationClient': threadTransformationClient, 'TaskManager': threadTaskManager} - def _getPerTransformationClients(self, ownerDN=None, ownerGroup=None): + def _getDelegatedClients(self, ownerDN=None, ownerGroup=None): """Set the clients for per transformation credentials. Returns the clients used in the threads - this is another function that should be extended. @@ -268,7 +264,9 @@ def _execute(self, threadID): """ This is what runs inside the threads, in practice this is the function that does the real stuff """ # Each thread will have its own clients if we use credentials/shifterProxy - clients = self._getClients() if self.credentials else None + clients = self._getClients() if self.shifterProxy else \ + self._getDelegatedClients(ownerGroup=self.credTuple[1], ownerDN=self.credTuple[2]) if self.credentials \ + else None method = '_execute' operation = 'None' @@ -285,9 +283,9 @@ def _execute(self, threadID): self._logWarn("Got a transf not in transInQueue...?", method=method, transID=transID) break - if not self.credentials: + if not (self.credentials or self.shifterProxy): ownerDN, group = transIDOPBody[transID]['OwnerDN'], transIDOPBody[transID]['OwnerGroup'] - clients = self._getPerTransformationClients(ownerDN=ownerDN, ownerGroup=group) + clients = self._getDelegatedClients(ownerDN=ownerDN, ownerGroup=group) self.transInThread[transID] = ' [Thread%d] [%s] ' % (threadID, str(transID)) self._logInfo("Start processing transformation", method=method, transID=transID) clients['TaskManager'].transInThread = self.transInThread @@ -598,3 +596,22 @@ def _addOperationForTransformations(operationsOnTransformationDict, operation, t 'OwnerGroup': ownerGroup if owner else t_ownerGroup, 'OwnerDN': ownerDN if owner else t_ownerDN } + + def __getCredentials(self): + """Get the credentials to use if ShifterCredentials are set, otherwise do nothing. + + This function fills the self.credTuple tuple. + """ + if not self.credentials: + return S_OK() + resCred = Operations().getOptionsDict("/Shifter/%s" % self.credentials) + if not resCred['OK']: + self.log.error("Cred: Failed to find shifter credentials", self.credentials) + return resCred + owner = resCred['Value']['User'] + ownerGroup = resCred['Value']['Group'] + # returns a list + ownerDN = getDNForUsername(owner)['Value'][0] + self.credTuple = (owner, ownerGroup, ownerDN) + self.log.info("Cred: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup)) + return S_OK() From b4e8f233db11de02130522a84c1593048055f497 Mon Sep 17 00:00:00 2001 From: Andre Sailer Date: Wed, 20 Jun 2018 10:30:22 +0200 Subject: [PATCH 07/13] Docs: TransformationSystem add MultiVO setup options --- .../Systems/Transformation/index.rst | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/docs/source/AdministratorGuide/Systems/Transformation/index.rst b/docs/source/AdministratorGuide/Systems/Transformation/index.rst index a61ffd60aa4..4bb5082067a 100644 --- a/docs/source/AdministratorGuide/Systems/Transformation/index.rst +++ b/docs/source/AdministratorGuide/Systems/Transformation/index.rst @@ -617,3 +617,46 @@ Actions on transformations * **Flush:** It has a meaning only depending on the plugin used, for example the 'BySize' plugin, used *e.g.* for merging productions, creates a task if there are enough files in input to have at least a certain size: 'flush' will make the 'BySize' plugin to ignore such requirement. When a transformation is flushed also its replica cache will be re-created (instead of after 24 hours). * **Complete:** The transformation can be archived by the TransformationCleaningAgent. Archived means that the data produced stay, but not the entries in the TransformationDB * **Clean:** The transformation is cleaned by the TransformationCleaningAgent: jobs are killed and removed from WMS. Produced and stored files are removed from the Storage Elements, when "OutputDirectories" parameter is set for the transformation. + + +---------------------- +Multi VO Configuration +---------------------- + +.. versionadded:: v6r21 + +There are two possibilities to configure the agents of the transformation system for the use in a multi VO installation. + + - Use the same WorkflowTaskAgent and RequestTaskAgents for multiple VOs, no + **shifterProxy** or **ShifterCredential** must be set for these agents. If + neither of those options are set the credentials of the owner of the + transformations are used to submit Jobs or Requests. + + - Use a set of WorkflowTaskAgent and RequestTaskAgent for each VO. This + requires that each VO uses a distinct set of Transformation Types, + e.g. MCSimulation_BigVO. This allows one to set VO specific + shifterProxies. This setup is recommended to create a dedicated + WorkflowTaskAgent or RequestTaskAgent for a VO that will create a large + number of jobs or requests. + + It is possible to mix the two configurations and have one WorkflowTaskAgent + treat transformations of many smaller VOs, while installing a dedicated + instance for the larger ones:: + + WorkflowTaskAgent + { + ... + TransType = MCSimulation + TransType += MCReconstruction + ... + #No shifterProxy / ShifterCredentials + } + WorkflowTaskAgent-BigVO + { + ... + TransType = MCSimulation_BigVO + TransType += MCReconstruction_BigVO + Module = WorkflowTaskAgent + ... + #shifterProxy / ShifterCredentials are optional + } From 143aebdc12e907123a849edbb16bf3163518816f Mon Sep 17 00:00:00 2001 From: Andre Sailer Date: Wed, 20 Jun 2018 13:55:42 +0200 Subject: [PATCH 08/13] WTA: Docs: move configuration parameter table to python file --- .../Agent/WorkflowTaskAgent.py | 42 ++++++++++++++++++- .../Systems/Transformation/Agents/index.rst | 4 +- .../Agents/workflowtaskagent.rst | 25 ----------- .../Systems/Transformation/index.rst | 5 ++- 4 files changed, 47 insertions(+), 29 deletions(-) diff --git a/TransformationSystem/Agent/WorkflowTaskAgent.py b/TransformationSystem/Agent/WorkflowTaskAgent.py index 22233021210..e6bf9d16697 100755 --- a/TransformationSystem/Agent/WorkflowTaskAgent.py +++ b/TransformationSystem/Agent/WorkflowTaskAgent.py @@ -1,5 +1,43 @@ -""" The Workflow Task Agent takes workflow tasks created in the - transformation database and submits to the workload management system. +"""The Workflow Task Agent takes workflow tasks created in the transformation +database and submits to the workload management system. + + +The WorkflowTaskAgent takes workflow tasks created in the TransformationDB and submits them to the +WMS. Since version v6r13 there are some new capabilities in the form of TaskManager plugins. + ++------------------------------+-------------------------------------------+-------------------------------------+ +| **Name** | **Description** | **Example** | ++------------------------------+-------------------------------------------+-------------------------------------+ +| *TransType* | | | ++------------------------------+-------------------------------------------+-------------------------------------+ +| *TaskUpdateStatus* | | | ++------------------------------+-------------------------------------------+-------------------------------------+ +| *shifterProxy* | Use a dedicated proxy to submit jobs to | | +| | the WMS | | ++------------------------------+-------------------------------------------+-------------------------------------+ +| *ShifterCredentials* | Use delegated credentials, same values as | | +| | for shifterProxy, but there will not be | | +| | any actual proxy used. (New in v6r21) | | ++------------------------------+-------------------------------------------+-------------------------------------+ +| *CheckReserved* | | | ++------------------------------+-------------------------------------------+-------------------------------------+ +| *MonitorFiles* | | | ++------------------------------+-------------------------------------------+-------------------------------------+ +| *SubmitTasks* | | | ++------------------------------+-------------------------------------------+-------------------------------------+ +| *TasksPerLoop* | | | ++------------------------------+-------------------------------------------+-------------------------------------+ +| *MonitorTasks* | | | ++------------------------------+-------------------------------------------+-------------------------------------+ + +.. versionadded:: v6r21 + + It is possible to run the WorkflowTaskAgent without a *shifterProxy* or + *ShifterCredentials*, in this case the credentials of the authors of the + transformations are used to submit the jobs to the WMS. This enables the use of + a single WorkflowTaskAgent for multiple VOs. See also the section about the + :ref:`trans-multi-vo`. + """ from DIRAC import S_OK diff --git a/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/index.rst b/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/index.rst index 053a0249b90..7128c64b6fc 100644 --- a/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/index.rst +++ b/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/index.rst @@ -3,6 +3,8 @@ Systems / Transformation / / Agents - Sub-subsection Agents associated with DataManagement System: +See also the sections in :mod:`TransformationSystem.Agent` + .. toctree:: :maxdepth: 2 @@ -12,4 +14,4 @@ Agents associated with DataManagement System: transformationagent transformationcleaningagent validateoutputdataagent - workflowtaskagent + diff --git a/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/workflowtaskagent.rst b/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/workflowtaskagent.rst index 6176c6b2dc6..e69de29bb2d 100644 --- a/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/workflowtaskagent.rst +++ b/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/workflowtaskagent.rst @@ -1,25 +0,0 @@ -Systems / Transformation / / Agents / WorkflowTaskAgent - Sub-subsection -=================================================================================== - -The WorkflowTaskAgent takes workflow tasks created in the TransformationDB and submits them to the -WMS. Since version v6r13 there are some new capabilities in the form of TaskManager plugins. - -+------------------------------+-------------------------------+------------------------------+ -| **Name** | **Description** | **Example** | -+------------------------------+-------------------------------+------------------------------+ -| *TransType* | | | -+------------------------------+-------------------------------+------------------------------+ -| *TaskUpdateStatus* | | | -+------------------------------+-------------------------------+------------------------------+ -| *shifterProxy* | | | -+------------------------------+-------------------------------+------------------------------+ -| *CheckReserved* | | | -+------------------------------+-------------------------------+------------------------------+ -| *MonitorFiles* | | | -+------------------------------+-------------------------------+------------------------------+ -| *SubmitTasks* | | | -+------------------------------+-------------------------------+------------------------------+ -| *TasksPerLoop* | | | -+------------------------------+-------------------------------+------------------------------+ -| *MonitorTasks* | | | -+------------------------------+-------------------------------+------------------------------+ diff --git a/docs/source/AdministratorGuide/Systems/Transformation/index.rst b/docs/source/AdministratorGuide/Systems/Transformation/index.rst index 4bb5082067a..6fe85037d6e 100644 --- a/docs/source/AdministratorGuide/Systems/Transformation/index.rst +++ b/docs/source/AdministratorGuide/Systems/Transformation/index.rst @@ -618,17 +618,20 @@ Actions on transformations * **Complete:** The transformation can be archived by the TransformationCleaningAgent. Archived means that the data produced stay, but not the entries in the TransformationDB * **Clean:** The transformation is cleaned by the TransformationCleaningAgent: jobs are killed and removed from WMS. Produced and stored files are removed from the Storage Elements, when "OutputDirectories" parameter is set for the transformation. +.. _trans-multi-vo: ---------------------- Multi VO Configuration ---------------------- + + .. versionadded:: v6r21 There are two possibilities to configure the agents of the transformation system for the use in a multi VO installation. - Use the same WorkflowTaskAgent and RequestTaskAgents for multiple VOs, no - **shifterProxy** or **ShifterCredential** must be set for these agents. If + *shifterProxy* or *ShifterCredential* must be set for these agents. If neither of those options are set the credentials of the owner of the transformations are used to submit Jobs or Requests. From 1ace00f495b1e84a989caeeca2d55ac576c01121 Mon Sep 17 00:00:00 2001 From: Andre Sailer Date: Wed, 20 Jun 2018 14:16:02 +0200 Subject: [PATCH 09/13] RequestTaskAgent: Docs: move configuration parameter table to python file --- .../Agent/RequestTaskAgent.py | 55 ++++++++++++++++++- .../Systems/Transformation/Agents/index.rst | 1 - .../Agents/requesttaskagent.rst | 39 ------------- .../Agents/workflowtaskagent.rst | 0 4 files changed, 53 insertions(+), 42 deletions(-) delete mode 100644 docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/requesttaskagent.rst delete mode 100644 docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/workflowtaskagent.rst diff --git a/TransformationSystem/Agent/RequestTaskAgent.py b/TransformationSystem/Agent/RequestTaskAgent.py index ce2b9f33aca..f9883c5be9b 100644 --- a/TransformationSystem/Agent/RequestTaskAgent.py +++ b/TransformationSystem/Agent/RequestTaskAgent.py @@ -1,5 +1,56 @@ -""" The Request Task Agent takes request tasks created in the transformation database - and submits to the request management system +"""The Request Task Agent takes request tasks created in the +TransformationDB and submits to the request management system. + ++----------------------+---------------------------------------+-------------------------------------------------------+ +| **Name** | **Description** | **Example** | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *shifterProxy* | Use a dedicated proxy to submit jobs | DataManager | +| | to the WMS | | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *ShifterCredentials* | Use delegated credentials, same values| | +| |as for | | +| | shifterProxy, but there will not be | | +| |any actual | | +| | proxy used. (New in v6r21) | | +| | | | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *TransType* | | | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *PluginLocation* | | DIRAC.TransformationSystem.Client.TaskManagerPlugin | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *maxNumberOfThreads* | | 15 | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *TasksPerLoop* | | | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *TaskUpdateStatus* | | Checking, Deleted, Killed, Staging, Stalled, Matched, | +| | | Scheduled, Rescheduled, Completed, Submitted, | +| | | Assigned, Received, Waiting, Running | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *SubmitTasks* | | | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *SubmitStatus* | | Active, Completing | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *MonitorTasks* | | | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *MonitorFiles* | | | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *CheckReserved* | | | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *CheckReservedStatus*| | Active, Completing, Stopped | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *UpdateTaskStatus* | | Active, Completing, Stopped | ++----------------------+---------------------------------------+-------------------------------------------------------+ +| *UpdateFileStatus* | | Active, Completing, Stopped | ++----------------------+---------------------------------------+-------------------------------------------------------+ + +.. versionadded:: v6r21 + + It is possible to run the RequestTaskAgent without a *shifterProxy* or + *ShifterCredentials*, in this case the credentials of the authors of the + transformations are used to submit the jobs to the RMS. This enables the use of + a single RequestTaskAgent for multiple VOs. See also the section about the + :ref:`trans-multi-vo`. + """ from DIRAC import S_OK diff --git a/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/index.rst b/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/index.rst index 7128c64b6fc..52bb8bf1e8e 100644 --- a/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/index.rst +++ b/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/index.rst @@ -10,7 +10,6 @@ See also the sections in :mod:`TransformationSystem.Agent` inputdataagent mcextensionagent - requesttaskagent transformationagent transformationcleaningagent validateoutputdataagent diff --git a/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/requesttaskagent.rst b/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/requesttaskagent.rst deleted file mode 100644 index 1e56dee5b1c..00000000000 --- a/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/requesttaskagent.rst +++ /dev/null @@ -1,39 +0,0 @@ -Systems / Transformation / / Agents / RequestTaskAgent - Sub-subsection -================================================================================== - -The Request Task Agent takes request tasks created in the transformation -database and submits to the request management system - -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| **Name** | **Description** | **Example** | -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| shifterProxy | | DataManager | -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| TransType | | | -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| PluginLocation | | DIRAC.TransformationSystem.Client.TaskManagerPlugin | -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| maxNumberOfThreads | | 15 | -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| TasksPerLoop | | | -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| TaskUpdateStatus | | Checking, Deleted, Killed, Staging, Stalled, Matched, | -| | | Scheduled, Rescheduled, Completed, Submitted, Assigned, | -| | | Received, Waiting, Running | -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| SubmitTasks | | | -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| SubmitStatus | | Active, Completing | -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| MonitorTasks | | | -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| MonitorFiles | | | -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| CheckReserved | | | -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| CheckReservedStatus | | Active, Completing, Stopped | -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| UpdateTaskStatus | | Active, Completing, Stopped | -+------------------------------+-------------------------------+-------------------------------------------------------------+ -| UpdateFileStatus | | Active, Completing, Stopped | -+------------------------------+-------------------------------+-------------------------------------------------------------+ diff --git a/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/workflowtaskagent.rst b/docs/source/AdministratorGuide/Configuration/ConfReference/Systems/Transformation/Agents/workflowtaskagent.rst deleted file mode 100644 index e69de29bb2d..00000000000 From 7aa0d354503afdddc8d6de060bf551c62b75df1b Mon Sep 17 00:00:00 2001 From: Andre Sailer Date: Thu, 21 Jun 2018 13:42:16 +0200 Subject: [PATCH 10/13] RequestTaskAgent and WorkflowTaskAgent: Docs: add parameters as config template like structure --- .../Agent/RequestTaskAgent.py | 71 ++++++++++++++++--- .../Agent/WorkflowTaskAgent.py | 60 ++++++++++++++-- TransformationSystem/ConfigTemplate.cfg | 2 +- 3 files changed, 115 insertions(+), 18 deletions(-) diff --git a/TransformationSystem/Agent/RequestTaskAgent.py b/TransformationSystem/Agent/RequestTaskAgent.py index f9883c5be9b..df5fb6bbd49 100644 --- a/TransformationSystem/Agent/RequestTaskAgent.py +++ b/TransformationSystem/Agent/RequestTaskAgent.py @@ -1,6 +1,8 @@ """The Request Task Agent takes request tasks created in the TransformationDB and submits to the request management system. +The following options can be set for the RequestTaskAgent. + +----------------------+---------------------------------------+-------------------------------------------------------+ | **Name** | **Description** | **Example** | +----------------------+---------------------------------------+-------------------------------------------------------+ @@ -8,11 +10,9 @@ | | to the WMS | | +----------------------+---------------------------------------+-------------------------------------------------------+ | *ShifterCredentials* | Use delegated credentials, same values| | -| |as for | | -| | shifterProxy, but there will not be | | -| |any actual | | -| | proxy used. (New in v6r21) | | -| | | | +| | as for shifterProxy, but there will | | +| | not be any actual proxy used. | | +| | (New in v6r21) | | +----------------------+---------------------------------------+-------------------------------------------------------+ | *TransType* | | | +----------------------+---------------------------------------+-------------------------------------------------------+ @@ -43,13 +43,62 @@ | *UpdateFileStatus* | | Active, Completing, Stopped | +----------------------+---------------------------------------+-------------------------------------------------------+ -.. versionadded:: v6r21 - It is possible to run the RequestTaskAgent without a *shifterProxy* or - *ShifterCredentials*, in this case the credentials of the authors of the - transformations are used to submit the jobs to the RMS. This enables the use of - a single RequestTaskAgent for multiple VOs. See also the section about the - :ref:`trans-multi-vo`. +:: + + RequestTaskAgent + { + # Use a dedicated proxy to submit requests to the RMS + shifterProxy=DataManager + # Use delegated credentials. Use this instead of the shifterProxy option (New in v6r21) + ShifterCredentials= + # Transformation types to be taken into account by the agent + TransType=Replication,Removal + # Location of the transformation plugins + PluginLocation=DIRAC.TransformationSystem.Client.TaskManagerPlugin + # maximum number of threads to use in this agent + maxNumberOfThreads=15 + + # Give this option a value if the agent should submit Requests + SubmitTasks= + # Status of transformations for which to submit Requests + SubmitStatus=Active,Completing + # Number of tasks to submit in one execution cycle per transformation + TasksPerLoop=50 + + # Give this option a value if the agent should monitor tasks + MonitorTasks= + # Status of transformations for which to monitor tasks + UpdateTasksStatus = Active,Completing,Stopped + # Task statuses considered transient that should be monitored for updates + TaskUpdateStatus=Checking,Deleted,Killed,Staging,Stalled,Matched + TaskUpdateStatus+=Scheduled,Rescheduled,Completed,Submitted + TaskUpdateStatus+=Assigned,Received,Waiting,Running + # Number of tasks to be updated in one call + TaskUpdateChunkSize=0 + + # Give this option a value if the agent should monitor files + MonitorFiles= + # Status of transformations for which to monitor Files + UpdateFilesStatus=Active,Completing,Stopped + + # Give this option a value if the agent should check Reserved tasks + CheckReserved= + # Status of transformations for which to check reserved tasks + CheckReservedStatus= Active,Completing,Stopped + + } + +* The options *SubmitTasks*, *MonitorTasks*, *MonitorFiles*, and *CheckReserved* + need to be assigned any non-empty value to be activated + +* .. versionadded:: v6r21 + + It is possible to run the RequestTaskAgent without a *shifterProxy* or + *ShifterCredentials*, in this case the credentials of the authors of the + transformations are used to submit the jobs to the RMS. This enables the use of + a single RequestTaskAgent for multiple VOs. See also the section about the + :ref:`trans-multi-vo`. """ diff --git a/TransformationSystem/Agent/WorkflowTaskAgent.py b/TransformationSystem/Agent/WorkflowTaskAgent.py index e6bf9d16697..6a99c213731 100755 --- a/TransformationSystem/Agent/WorkflowTaskAgent.py +++ b/TransformationSystem/Agent/WorkflowTaskAgent.py @@ -5,6 +5,8 @@ The WorkflowTaskAgent takes workflow tasks created in the TransformationDB and submits them to the WMS. Since version v6r13 there are some new capabilities in the form of TaskManager plugins. +The following options can be set for the WorkflowTaskAgent. + +------------------------------+-------------------------------------------+-------------------------------------+ | **Name** | **Description** | **Example** | +------------------------------+-------------------------------------------+-------------------------------------+ @@ -30,13 +32,59 @@ | *MonitorTasks* | | | +------------------------------+-------------------------------------------+-------------------------------------+ -.. versionadded:: v6r21 +:: + + WorkflowTaskAgent + { + # Use a dedicated proxy to submit jobs to the WMS + shifterProxy = ProductionManager + # Use delegated credentials. Use this instead of the shifterProxy option (New in v6r21) + ShifterCredentials = + # Transformation types to be taken into account by the agent + TransType = MCSimulation,DataReconstruction,DataStripping,MCStripping,Merge + # Location of the transformation plugins + PluginLocation = DIRAC.TransformationSystem.Client.TaskManagerPlugin + # maximum number of threads to use in this agent + maxNumberOfThreads = 15 + + # Give this option a value if the agent should submit Requests + SubmitTasks = yes + # Status of transformations for which to submit Requests + SubmitStatus = Active,Completing + # Number of tasks to submit in one execution cycle per transformation + TasksPerLoop = 50 + + # Give this option a value if the agent should monitor tasks + MonitorTasks = yes + # Status of transformations for which to monitor tasks + UpdateTasksStatus = Active,Completing,Stopped + # Task statuses considered transient that should be monitored for updates + TaskUpdateStatus = Submitted,Received,Waiting,Running,Matched,Completed,Failed + # Number of tasks to be updated in one call + TaskUpdateChunkSize = 0 + + # Give this option a value if the agent should monitor files + MonitorFiles = + # Status of transformations for which to monitor Files + UpdateFilesStatus = Active,Completing,Stopped + + # Give this option a value if the agent should check Reserved tasks + CheckReserved = + # Status of transformations for which to check reserved tasks + CheckReservedStatus = Active,Completing,Stopped + + } + +* The options *SubmitTasks*, *MonitorTasks*, *MonitorFiles*, and *CheckReserved* + need to be assigned any non-empty value to be activated + +* .. versionadded:: v6r21 - It is possible to run the WorkflowTaskAgent without a *shifterProxy* or - *ShifterCredentials*, in this case the credentials of the authors of the - transformations are used to submit the jobs to the WMS. This enables the use of - a single WorkflowTaskAgent for multiple VOs. See also the section about the - :ref:`trans-multi-vo`. + It is possible to run the RequestTaskAgent without a *shifterProxy* or + *ShifterCredentials*, in this case the credentials of the authors of the + transformations are used to submit the jobs to the RMS. This enables the use of + a single RequestTaskAgent for multiple VOs. See also the section about the + :ref:`trans-multi-vo`. """ diff --git a/TransformationSystem/ConfigTemplate.cfg b/TransformationSystem/ConfigTemplate.cfg index 5d563dce16f..10847571b37 100644 --- a/TransformationSystem/ConfigTemplate.cfg +++ b/TransformationSystem/ConfigTemplate.cfg @@ -44,7 +44,7 @@ Agents TransType = MCSimulation,DataReconstruction,DataStripping,MCStripping,Merge # Task statuses considered transient that should be monitored for updates TaskUpdateStatus = Submitted,Received,Waiting,Running,Matched,Completed,Failed - # Flag to eanble task submission + # Flag to enable task submission SubmitTasks = yes # Flag for checking reserved tasks that failed submission CheckReserved = yes From 3859ad3f9f055b2dd6fbcb13e991a228fa54b461 Mon Sep 17 00:00:00 2001 From: Andre Sailer Date: Thu, 21 Jun 2018 14:23:27 +0200 Subject: [PATCH 11/13] TaskManagerAgentBase: add ownerDN, ownerGroup parameters to _getClients --- .../Agent/RequestTaskAgent.py | 17 +++++------- .../Agent/TaskManagerAgentBase.py | 27 ++++++++----------- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/TransformationSystem/Agent/RequestTaskAgent.py b/TransformationSystem/Agent/RequestTaskAgent.py index df5fb6bbd49..8be7c6c2d65 100644 --- a/TransformationSystem/Agent/RequestTaskAgent.py +++ b/TransformationSystem/Agent/RequestTaskAgent.py @@ -143,17 +143,14 @@ def initialize(self): return S_OK() - def _getClients(self): - """ Here the taskManager becomes a RequestTasks object - """ - res = TaskManagerAgentBase._getClients(self) - threadTaskManager = RequestTasks() - res.update({'TaskManager': threadTaskManager}) - return res + def _getClients(self, ownerDN=None, ownerGroup=None): + """Set the clients for task submission. - def _getDelegatedClients(self, ownerDN=None, ownerGroup=None): - """Set the clients for per transformation credentials.""" - res = super(RequestTaskAgent, self)._getDelegatedClients(ownerDN=ownerDN, ownerGroup=ownerGroup) + Here the taskManager becomes a RequestTasks object. + + See :func:`DIRAC.TransformationSystem.TaskManagerAgentBase._getClients`. + """ + res = super(RequestTaskAgent, self)._getClients(ownerDN=ownerDN, ownerGroup=ownerGroup) threadTaskManager = RequestTasks(ownerDN=ownerDN, ownerGroup=ownerGroup) res.update({'TaskManager': threadTaskManager}) return res diff --git a/TransformationSystem/Agent/TaskManagerAgentBase.py b/TransformationSystem/Agent/TaskManagerAgentBase.py index cad5f06515c..4f546874ff4 100644 --- a/TransformationSystem/Agent/TaskManagerAgentBase.py +++ b/TransformationSystem/Agent/TaskManagerAgentBase.py @@ -12,7 +12,7 @@ import datetime from Queue import Queue -from DIRAC import S_OK, S_ERROR +from DIRAC import S_OK from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername, getUsernameForDN @@ -235,23 +235,18 @@ def _fillTheQueue(self, operationsOnTransformationsDict): ############################################################################# - def _getClients(self): - """ returns the clients used in the threads - this is another function that should be extended. + def _getClients(self, ownerDN=None, ownerGroup=None): + """Returns the clients used in the threads - The clients provided here are defaults, and should be adapted - """ - threadTransformationClient = TransformationClient() - threadTaskManager = WorkflowTasks() # this is for wms tasks, replace it with something else if needed - threadTaskManager.pluginLocation = self.pluginLocation + This is another function that should be extended. - return {'TransformationClient': threadTransformationClient, - 'TaskManager': threadTaskManager} + The clients provided here are defaults, and should be adapted - def _getDelegatedClients(self, ownerDN=None, ownerGroup=None): - """Set the clients for per transformation credentials. + If ownerDN and ownerGroup are not None the clients will delegate to these credentials - Returns the clients used in the threads - this is another function that should be extended. - The clients provided here are defaults, and should be adapted + :param str ownerDN: DN of the owner of the submitted jobs + :param str ownerGroup: group of the owner of the submitted jobs + :returns: dict of Clients """ threadTransformationClient = TransformationClient() threadTaskManager = WorkflowTasks(ownerDN=ownerDN, ownerGroup=ownerGroup) @@ -265,7 +260,7 @@ def _execute(self, threadID): """ # Each thread will have its own clients if we use credentials/shifterProxy clients = self._getClients() if self.shifterProxy else \ - self._getDelegatedClients(ownerGroup=self.credTuple[1], ownerDN=self.credTuple[2]) if self.credentials \ + self._getClients(ownerGroup=self.credTuple[1], ownerDN=self.credTuple[2]) if self.credentials \ else None method = '_execute' operation = 'None' @@ -285,7 +280,7 @@ def _execute(self, threadID): break if not (self.credentials or self.shifterProxy): ownerDN, group = transIDOPBody[transID]['OwnerDN'], transIDOPBody[transID]['OwnerGroup'] - clients = self._getDelegatedClients(ownerDN=ownerDN, ownerGroup=group) + clients = self._getClients(ownerDN=ownerDN, ownerGroup=group) self.transInThread[transID] = ' [Thread%d] [%s] ' % (threadID, str(transID)) self._logInfo("Start processing transformation", method=method, transID=transID) clients['TaskManager'].transInThread = self.transInThread From 830d86021cd726411bfb91f9ac46b4e10185d5df Mon Sep 17 00:00:00 2001 From: Andre Sailer Date: Thu, 28 Jun 2018 14:25:36 +0200 Subject: [PATCH 12/13] RequestTaskAgent and WorkflowTaskAgent: Docs: drop tables of parameters --- .../Agent/RequestTaskAgent.py | 41 ------------------- .../Agent/WorkflowTaskAgent.py | 25 ----------- 2 files changed, 66 deletions(-) diff --git a/TransformationSystem/Agent/RequestTaskAgent.py b/TransformationSystem/Agent/RequestTaskAgent.py index 8be7c6c2d65..a2a9c06c37d 100644 --- a/TransformationSystem/Agent/RequestTaskAgent.py +++ b/TransformationSystem/Agent/RequestTaskAgent.py @@ -3,47 +3,6 @@ The following options can be set for the RequestTaskAgent. -+----------------------+---------------------------------------+-------------------------------------------------------+ -| **Name** | **Description** | **Example** | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *shifterProxy* | Use a dedicated proxy to submit jobs | DataManager | -| | to the WMS | | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *ShifterCredentials* | Use delegated credentials, same values| | -| | as for shifterProxy, but there will | | -| | not be any actual proxy used. | | -| | (New in v6r21) | | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *TransType* | | | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *PluginLocation* | | DIRAC.TransformationSystem.Client.TaskManagerPlugin | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *maxNumberOfThreads* | | 15 | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *TasksPerLoop* | | | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *TaskUpdateStatus* | | Checking, Deleted, Killed, Staging, Stalled, Matched, | -| | | Scheduled, Rescheduled, Completed, Submitted, | -| | | Assigned, Received, Waiting, Running | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *SubmitTasks* | | | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *SubmitStatus* | | Active, Completing | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *MonitorTasks* | | | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *MonitorFiles* | | | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *CheckReserved* | | | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *CheckReservedStatus*| | Active, Completing, Stopped | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *UpdateTaskStatus* | | Active, Completing, Stopped | -+----------------------+---------------------------------------+-------------------------------------------------------+ -| *UpdateFileStatus* | | Active, Completing, Stopped | -+----------------------+---------------------------------------+-------------------------------------------------------+ - - :: RequestTaskAgent diff --git a/TransformationSystem/Agent/WorkflowTaskAgent.py b/TransformationSystem/Agent/WorkflowTaskAgent.py index 6a99c213731..3521f1fd4db 100755 --- a/TransformationSystem/Agent/WorkflowTaskAgent.py +++ b/TransformationSystem/Agent/WorkflowTaskAgent.py @@ -7,31 +7,6 @@ The following options can be set for the WorkflowTaskAgent. -+------------------------------+-------------------------------------------+-------------------------------------+ -| **Name** | **Description** | **Example** | -+------------------------------+-------------------------------------------+-------------------------------------+ -| *TransType* | | | -+------------------------------+-------------------------------------------+-------------------------------------+ -| *TaskUpdateStatus* | | | -+------------------------------+-------------------------------------------+-------------------------------------+ -| *shifterProxy* | Use a dedicated proxy to submit jobs to | | -| | the WMS | | -+------------------------------+-------------------------------------------+-------------------------------------+ -| *ShifterCredentials* | Use delegated credentials, same values as | | -| | for shifterProxy, but there will not be | | -| | any actual proxy used. (New in v6r21) | | -+------------------------------+-------------------------------------------+-------------------------------------+ -| *CheckReserved* | | | -+------------------------------+-------------------------------------------+-------------------------------------+ -| *MonitorFiles* | | | -+------------------------------+-------------------------------------------+-------------------------------------+ -| *SubmitTasks* | | | -+------------------------------+-------------------------------------------+-------------------------------------+ -| *TasksPerLoop* | | | -+------------------------------+-------------------------------------------+-------------------------------------+ -| *MonitorTasks* | | | -+------------------------------+-------------------------------------------+-------------------------------------+ - :: WorkflowTaskAgent From dc8ddf9a6a559e41e39fc769241c09feab8e40dc Mon Sep 17 00:00:00 2001 From: Andre Sailer Date: Thu, 5 Jul 2018 12:56:39 +0200 Subject: [PATCH 13/13] TransformationDocs: downgrade multiVO versionadded to v6r20p5 --- TransformationSystem/Agent/RequestTaskAgent.py | 4 ++-- TransformationSystem/Agent/WorkflowTaskAgent.py | 4 ++-- .../AdministratorGuide/Systems/Transformation/index.rst | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/TransformationSystem/Agent/RequestTaskAgent.py b/TransformationSystem/Agent/RequestTaskAgent.py index a2a9c06c37d..d6a22c7f706 100644 --- a/TransformationSystem/Agent/RequestTaskAgent.py +++ b/TransformationSystem/Agent/RequestTaskAgent.py @@ -9,7 +9,7 @@ { # Use a dedicated proxy to submit requests to the RMS shifterProxy=DataManager - # Use delegated credentials. Use this instead of the shifterProxy option (New in v6r21) + # Use delegated credentials. Use this instead of the shifterProxy option (New in v6r20p5) ShifterCredentials= # Transformation types to be taken into account by the agent TransType=Replication,Removal @@ -51,7 +51,7 @@ * The options *SubmitTasks*, *MonitorTasks*, *MonitorFiles*, and *CheckReserved* need to be assigned any non-empty value to be activated -* .. versionadded:: v6r21 +* .. versionadded:: v6r20p5 It is possible to run the RequestTaskAgent without a *shifterProxy* or *ShifterCredentials*, in this case the credentials of the authors of the diff --git a/TransformationSystem/Agent/WorkflowTaskAgent.py b/TransformationSystem/Agent/WorkflowTaskAgent.py index 3521f1fd4db..56d6ed5d640 100755 --- a/TransformationSystem/Agent/WorkflowTaskAgent.py +++ b/TransformationSystem/Agent/WorkflowTaskAgent.py @@ -13,7 +13,7 @@ { # Use a dedicated proxy to submit jobs to the WMS shifterProxy = ProductionManager - # Use delegated credentials. Use this instead of the shifterProxy option (New in v6r21) + # Use delegated credentials. Use this instead of the shifterProxy option (New in v6r20p5) ShifterCredentials = # Transformation types to be taken into account by the agent TransType = MCSimulation,DataReconstruction,DataStripping,MCStripping,Merge @@ -53,7 +53,7 @@ * The options *SubmitTasks*, *MonitorTasks*, *MonitorFiles*, and *CheckReserved* need to be assigned any non-empty value to be activated -* .. versionadded:: v6r21 +* .. versionadded:: v6r20p5 It is possible to run the RequestTaskAgent without a *shifterProxy* or *ShifterCredentials*, in this case the credentials of the authors of the diff --git a/docs/source/AdministratorGuide/Systems/Transformation/index.rst b/docs/source/AdministratorGuide/Systems/Transformation/index.rst index 6fe85037d6e..8c9aa0bb7f6 100644 --- a/docs/source/AdministratorGuide/Systems/Transformation/index.rst +++ b/docs/source/AdministratorGuide/Systems/Transformation/index.rst @@ -626,7 +626,7 @@ Multi VO Configuration -.. versionadded:: v6r21 +.. versionadded:: v6r20p5 There are two possibilities to configure the agents of the transformation system for the use in a multi VO installation.