diff --git a/TransformationSystem/Agent/RequestTaskAgent.py b/TransformationSystem/Agent/RequestTaskAgent.py index 2ebdc45999c..092c0457b9b 100644 --- a/TransformationSystem/Agent/RequestTaskAgent.py +++ b/TransformationSystem/Agent/RequestTaskAgent.py @@ -50,3 +50,10 @@ def _getClients(self): threadTaskManager = RequestTasks() res.update({'TaskManager': threadTaskManager}) return res + + def _getPerTransformationClients(self, ownerDN=None, ownerGroup=None): + """Set the clients for per transformation credentials.""" + res = super(RequestTaskAgent, self)._getPerTransformationClients(ownerDN=ownerDN, ownerGroup=ownerGroup) + threadTaskManager = RequestTasks(ownerDN=ownerDN, ownerGroup=ownerGroup) + res.update({'TaskManager': threadTaskManager}) + return res diff --git a/TransformationSystem/Agent/TaskManagerAgentBase.py b/TransformationSystem/Agent/TaskManagerAgentBase.py index bb676c9b2ae..2f324bd9a08 100644 --- a/TransformationSystem/Agent/TaskManagerAgentBase.py +++ b/TransformationSystem/Agent/TaskManagerAgentBase.py @@ -14,6 +14,8 @@ from DIRAC import S_OK +from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations +from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername, getUsernameForDN from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor from DIRAC.Core.Base.AgentModule import AgentModule from DIRAC.Core.Utilities.ThreadPool import ThreadPool @@ -47,10 +49,6 @@ def __init__(self, *args, **kwargs): self.tasksPerLoop = 50 - self.owner = '' - self.ownerGroup = '' - self.ownerDN = '' - self.pluginLocation = '' self.bulkSubmissionFlag = False @@ -81,6 +79,9 @@ def initialize(self): # Bulk submission flag self.bulkSubmissionFlag = self.am_getOption('BulkSubmission', False) + # Shifter credentials to use, could replace the use of shifterProxy eventually + self.credentials = self.am_getOption('ShifterCredentials', self.am_getOption('shifterProxy', None)) + # setting up the threading maxNumberOfThreads = self.am_getOption('maxNumberOfThreads', 15) threadPool = ThreadPool(maxNumberOfThreads, maxNumberOfThreads) @@ -110,6 +111,27 @@ def execute(self): """ operationsOnTransformationDict = {} + owner, ownerGroup, ownerDN = None, None, None + # getting the credentials for submission + if getProxyInfo(False, False)['OK']: # there is a shifterProxy + res = getProxyInfo(False, False) + proxyInfo = res['Value'] # must be there + owner = proxyInfo['username'] + ownerGroup = proxyInfo['group'] + ownerDN = proxyInfo['identity'] + self.log.info("ShifterProxy: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup)) + elif self.credentials: + resCred = Operations().getOptionsDict("/Shifter/%s" % self.credentials) + if resCred['OK']: + owner = resCred['Value']['User'] + ownerGroup = resCred['Value']['Group'] + ownerDN = getDNForUsername(owner)['Value'] + self.log.info("Cred: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup)) + else: + self.log.error("Cred: Failed to find shifter credentials", self.credentials) + return resCred + else: + self.log.info("Using per Transformation Credentials!") # Determine whether the task status is to be monitored and updated enableTaskMonitor = self.am_getOption('MonitorTasks', '') @@ -123,10 +145,8 @@ def execute(self): if not transformations['OK']: self.log.warn("Could not select transformations:", transformations['Message']) else: - transformationIDsAndBodies = dict((transformation['TransformationID'], - transformation['Body']) for transformation in transformations['Value']) - for transID, body in transformationIDsAndBodies.iteritems(): - operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['updateTaskStatus']} + self._addOperationForTransformations(operationsOnTransformationDict, 'updateTaskStatus', transformations, + owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN) # Determine whether the task files status is to be monitored and updated enableFileMonitor = self.am_getOption('MonitorFiles', '') @@ -140,13 +160,8 @@ def execute(self): if not transformations['OK']: self.log.warn("Could not select transformations:", transformations['Message']) else: - transformationIDsAndBodies = dict((transformation['TransformationID'], - transformation['Body']) for transformation in transformations['Value']) - for transID, body in transformationIDsAndBodies.iteritems(): - if transID in operationsOnTransformationDict: - operationsOnTransformationDict[transID]['Operations'].append('updateFileStatus') - else: - operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['updateFileStatus']} + self._addOperationForTransformations(operationsOnTransformationDict, 'updateFileStatus', transformations, + owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN) # Determine whether the checking of reserved tasks is to be performed enableCheckReserved = self.am_getOption('CheckReserved', '') @@ -160,29 +175,14 @@ def execute(self): if not transformations['OK']: self.log.warn("Could not select transformations:", transformations['Message']) else: - transformationIDsAndBodies = dict((transformation['TransformationID'], - transformation['Body']) for transformation in transformations['Value']) - for transID, body in transformationIDsAndBodies.iteritems(): - if transID in operationsOnTransformationDict: - operationsOnTransformationDict[transID]['Operations'].append('checkReservedTasks') - else: - operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['checkReservedTasks']} + self._addOperationForTransformations(operationsOnTransformationDict, 'checkReservedTasks', transformations, + owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN) # Determine whether the submission of tasks is to be performed enableSubmission = self.am_getOption('SubmitTasks', '') if not enableSubmission: self.log.verbose("Submission of tasks is disabled. To enable it, create the 'SubmitTasks' option") else: - # getting the credentials for submission - res = getProxyInfo(False, False) - if not res['OK']: - self.log.error("Failed to determine credentials for submission", res['Message']) - return res - proxyInfo = res['Value'] - self.owner = proxyInfo['username'] - self.ownerGroup = proxyInfo['group'] - self.ownerDN = proxyInfo['identity'] - self.log.info("Tasks will be submitted with the credentials %s:%s" % (self.owner, self.ownerGroup)) # Get the transformations for which the check of reserved tasks have to be performed status = self.am_getOption('SubmitTransformationStatus', self.am_getOption('SubmitStatus', ['Active', 'Completing'])) @@ -192,13 +192,10 @@ def execute(self): else: # Get the transformations which should be submitted self.tasksPerLoop = self.am_getOption('TasksPerLoop', self.tasksPerLoop) - transformationIDsAndBodies = dict((transformation['TransformationID'], - transformation['Body']) for transformation in transformations['Value']) - for transID, body in transformationIDsAndBodies.iteritems(): - if transID in operationsOnTransformationDict: - operationsOnTransformationDict[transID]['Operations'].append('submitTasks') - else: - operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['submitTasks']} + self._addOperationForTransformations(operationsOnTransformationDict, 'submitTasks', transformations, + owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN) + + self._fillTheQueue(operationsOnTransformationDict) @@ -254,11 +251,24 @@ def _getClients(self): return {'TransformationClient': threadTransformationClient, 'TaskManager': threadTaskManager} + def _getPerTransformationClients(self, ownerDN=None, ownerGroup=None): + """Set the clients for per transformation credentials. + + Returns the clients used in the threads - this is another function that should be extended. + The clients provided here are defaults, and should be adapted + """ + threadTransformationClient = TransformationClient() + threadTaskManager = WorkflowTasks(ownerDN=ownerDN, ownerGroup=ownerGroup) + threadTaskManager.pluginLocation = self.pluginLocation + + return {'TransformationClient': threadTransformationClient, + 'TaskManager': threadTaskManager} + def _execute(self, threadID): """ This is what runs inside the threads, in practice this is the function that does the real stuff """ - # Each thread will have its own clients - clients = self._getClients() + # Each thread will have its own clients if we use credentials/shifterProxy + clients = self._getClients() if self.credentials else None method = '_execute' operation = 'None' @@ -275,6 +285,9 @@ def _execute(self, threadID): self._logWarn("Got a transf not in transInQueue...?", method=method, transID=transID) break + if not self.credentials: + ownerDN, group = transIDOPBody[transID]['OwnerDN'], transIDOPBody[transID]['OwnerGroup'] + clients = self._getPerTransformationClients(ownerDN=ownerDN, ownerGroup=group) self.transInThread[transID] = ' [Thread%d] [%s] ' % (threadID, str(transID)) self._logInfo("Start processing transformation", method=method, transID=transID) clients['TaskManager'].transInThread = self.transInThread @@ -512,6 +525,9 @@ def submitTasks(self, transIDOPBody, clients): """ transID = transIDOPBody.keys()[0] transBody = transIDOPBody[transID]['Body'] + owner = transIDOPBody[transID]['Owner'] + ownerGroup = transIDOPBody[transID]['OwnerGroup'] + ownerDN = transIDOPBody[transID]['OwnerDN'] method = 'submitTasks' # Get all tasks to submit @@ -533,9 +549,9 @@ def submitTasks(self, transIDOPBody, clients): # Prepare tasks preparedTransformationTasks = clients['TaskManager'].prepareTransformationTasks(transBody, tasks, - self.owner, - self.ownerGroup, - self.ownerDN, + owner, + ownerGroup, + ownerDN, self.bulkSubmissionFlag) self._logDebug("prepareTransformationTasks return value:", preparedTransformationTasks, method=method, transID=transID) @@ -563,3 +579,22 @@ def submitTasks(self, transIDOPBody, clients): return res return S_OK() + + @staticmethod + def _addOperationForTransformations(operationsOnTransformationDict, operation, transformations, + owner=None, ownerGroup=None, ownerDN=None): + """Fill the operationsOnTransformationDict""" + transformationIDsAndBodies = [(transformation['TransformationID'], + transformation['Body'], + transformation['AuthorDN'], + transformation['AuthorGroup'], + ) for transformation in transformations['Value']] + for transID, body, t_ownerDN, t_ownerGroup in transformationIDsAndBodies: + if transID in operationsOnTransformationDict: + operationsOnTransformationDict[transID]['Operations'].append(operation) + else: + operationsOnTransformationDict[transID] = {'Body': body, 'Operations': [operation], + 'Owner': owner if owner else getUsernameForDN(t_ownerDN)['Value'], + 'OwnerGroup': ownerGroup if owner else t_ownerGroup, + 'OwnerDN': ownerDN if owner else t_ownerDN + } diff --git a/TransformationSystem/Agent/test/Test_Agent_TransformationSystem.py b/TransformationSystem/Agent/test/Test_Agent_TransformationSystem.py index 192f37ca8e7..3d06758de29 100644 --- a/TransformationSystem/Agent/test/Test_Agent_TransformationSystem.py +++ b/TransformationSystem/Agent/test/Test_Agent_TransformationSystem.py @@ -238,7 +238,8 @@ def test_checkReservedTasks( self ): def test_submitTasks( self ): clients = {'TransformationClient':self.tc_mock, 'TaskManager':self.tm_mock} - transIDOPBody = {1:{'Operations':['op1', 'op2'], 'Body':'veryBigBody'}} + transIDOPBody = {1: {'Operations': ['op1', 'op2'], 'Body': 'veryBigBody', + 'Owner': 'prodMan', 'OwnerDN': '/ca=man/user=prodMan', 'OwnerGroup': 'prodMans'}} # errors getting self.tc_mock.getTasksToSubmit.return_value = {'OK': False, 'Message': 'a mess'} diff --git a/TransformationSystem/Client/TaskManager.py b/TransformationSystem/Client/TaskManager.py index 3caa6f358aa..dd597fb9d01 100644 --- a/TransformationSystem/Client/TaskManager.py +++ b/TransformationSystem/Client/TaskManager.py @@ -1,4 +1,4 @@ -""" TaskManager contains WorkflowsTasks and RequestTasks modules, for managing jobs and requests tasks +""" TaskManager contains WorkflowTasks and RequestTasks modules, for managing jobs and requests tasks """ __RCSID__ = "$Id$" @@ -108,7 +108,9 @@ class RequestTasks(TaskBase): """ def __init__(self, transClient=None, logger=None, requestClient=None, - requestClass=None, requestValidator=None): + requestClass=None, requestValidator=None, + ownerDN=None, ownerGroup=None, + ): """ c'tor the requestClass is by default Request. @@ -120,9 +122,13 @@ def __init__(self, transClient=None, logger=None, requestClient=None, logger = gLogger.getSubLogger('RequestTasks') super(RequestTasks, self).__init__(transClient, logger) + useCertificates = True if (bool(ownerDN) and bool(ownerGroup)) else None if not requestClient: - self.requestClient = ReqClient() + self.requestClient = ReqClient(useCertificates=useCertificates, + delegatedDN=ownerDN, + delegatedGroup=ownerGroup, + ) else: self.requestClient = requestClient @@ -417,7 +423,9 @@ class WorkflowTasks(TaskBase): """ def __init__(self, transClient=None, logger=None, submissionClient=None, jobMonitoringClient=None, - outputDataModule=None, jobClass=None, opsH=None, destinationPlugin=None): + outputDataModule=None, jobClass=None, opsH=None, destinationPlugin=None, + ownerDN=None, ownerGroup=None, + ): """ Generates some default objects. jobClass is by default "DIRAC.Interfaces.API.Job.Job". An extension of it also works: VOs can pass in their job class extension, if present @@ -428,8 +436,12 @@ def __init__(self, transClient=None, logger=None, submissionClient=None, jobMoni super(WorkflowTasks, self).__init__(transClient, logger) + useCertificates = True if (bool(ownerDN) and bool(ownerGroup)) else None if not submissionClient: - self.submissionClient = WMSClient() + self.submissionClient = WMSClient(useCertificates=useCertificates, + delegatedDN=ownerDN, + delegatedGroup=ownerGroup, + ) else: self.submissionClient = submissionClient diff --git a/WorkloadManagementSystem/Client/WMSClient.py b/WorkloadManagementSystem/Client/WMSClient.py index 8728a767a83..f151cfd9cd5 100755 --- a/WorkloadManagementSystem/Client/WMSClient.py +++ b/WorkloadManagementSystem/Client/WMSClient.py @@ -30,15 +30,17 @@ class WMSClient(object): """ def __init__(self, jobManagerClient=None, sbRPCClient=None, sbTransferClient=None, - useCertificates=False, timeout=600): + useCertificates=False, timeout=600, delegatedDN=None, delegatedGroup=None): """ WMS Client constructor Here we also initialize the needed clients and connections """ self.useCertificates = useCertificates + self.delegatedDN = delegatedDN + self.delegatedGroup = delegatedGroup self.timeout = timeout - self.jobManager = jobManagerClient + self._jobManager = jobManagerClient self.operationsHelper = Operations() self.sandboxClient = None if sbRPCClient and sbTransferClient: @@ -46,6 +48,18 @@ def __init__(self, jobManagerClient=None, sbRPCClient=None, sbTransferClient=Non transferClient=sbTransferClient, useCertificates=useCertificates) + @property + def jobManager(self): + if not self._jobManager: + self._jobManager = RPCClient('WorkloadManagement/JobManager', + useCertificates=self.useCertificates, + delegatedDN=self.delegatedDN, + delegatedGroup=self.delegatedGroup, + timeout=self.timeout) + + return self._jobManager + + ############################################################################### def __getInputSandboxEntries(self, classAdJob): @@ -109,7 +123,9 @@ def __uploadInputSandbox(self, classAdJob, jobDescriptionObject=None): if okFiles: if not self.sandboxClient: - self.sandboxClient = SandboxStoreClient(useCertificates=self.useCertificates) + self.sandboxClient = SandboxStoreClient(useCertificates=self.useCertificates, + delegatedDN=self.delegatedDN, + delegatedGroup=self.delegatedGroup) result = self.sandboxClient.uploadFilesAsSandbox(okFiles) if not result['OK']: return result @@ -159,12 +175,6 @@ def submitJob(self, jdl, jobDescriptionObject=None): return result nJobs = result['Value'] parametricJob = nJobs > 0 - - if not self.jobManager: - self.jobManager = RPCClient('WorkloadManagement/JobManager', - useCertificates=self.useCertificates, - timeout=self.timeout) - result = self.jobManager.submitJob(classAdJob.asJDL()) if parametricJob: @@ -200,38 +210,22 @@ def killJob(self, jobID): """ Kill running job. jobID can be an integer representing a single DIRAC job ID or a list of IDs """ - if not self.jobManager: - self.jobManager = RPCClient('WorkloadManagement/JobManager', - useCertificates=self.useCertificates, - timeout=self.timeout) return self.jobManager.killJob(jobID) def deleteJob(self, jobID): """ Delete job(s) from the WMS Job database. jobID can be an integer representing a single DIRAC job ID or a list of IDs """ - if not self.jobManager: - self.jobManager = RPCClient('WorkloadManagement/JobManager', - useCertificates=self.useCertificates, - timeout=self.timeout) return self.jobManager.deleteJob(jobID) def rescheduleJob(self, jobID): """ Reschedule job(s) in WMS Job database. jobID can be an integer representing a single DIRAC job ID or a list of IDs """ - if not self.jobManager: - self.jobManager = RPCClient('WorkloadManagement/JobManager', - useCertificates=self.useCertificates, - timeout=self.timeout) return self.jobManager.rescheduleJob(jobID) def resetJob(self, jobID): """ Reset job(s) in WMS Job database. jobID can be an integer representing a single DIRAC job ID or a list of IDs """ - if not self.jobManager: - self.jobManager = RPCClient('WorkloadManagement/JobManager', - useCertificates=self.useCertificates, - timeout=self.timeout) return self.jobManager.resetJob(jobID)