From c3655cc8246ece69b0b800b76bafd8fcb5c4936e Mon Sep 17 00:00:00 2001 From: Andre Sailer Date: Mon, 18 Jun 2018 10:45:47 +0200 Subject: [PATCH] TaskManagerAgents: fix logic for non-proxy based general credentials execute is running after _execute(?), so we need to find ShifterCredentials in initialize --- .../Agent/RequestTaskAgent.py | 4 +- .../Agent/TaskManagerAgentBase.py | 49 +++++++++++++------ 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/TransformationSystem/Agent/RequestTaskAgent.py b/TransformationSystem/Agent/RequestTaskAgent.py index 092c0457b9b..ce2b9f33aca 100644 --- a/TransformationSystem/Agent/RequestTaskAgent.py +++ b/TransformationSystem/Agent/RequestTaskAgent.py @@ -51,9 +51,9 @@ def _getClients(self): res.update({'TaskManager': threadTaskManager}) return res - def _getPerTransformationClients(self, ownerDN=None, ownerGroup=None): + def _getDelegatedClients(self, ownerDN=None, ownerGroup=None): """Set the clients for per transformation credentials.""" - res = super(RequestTaskAgent, self)._getPerTransformationClients(ownerDN=ownerDN, ownerGroup=ownerGroup) + res = super(RequestTaskAgent, self)._getDelegatedClients(ownerDN=ownerDN, ownerGroup=ownerGroup) threadTaskManager = RequestTasks(ownerDN=ownerDN, ownerGroup=ownerGroup) res.update({'TaskManager': threadTaskManager}) return res diff --git a/TransformationSystem/Agent/TaskManagerAgentBase.py b/TransformationSystem/Agent/TaskManagerAgentBase.py index 2b563bc4876..cad5f06515c 100644 --- a/TransformationSystem/Agent/TaskManagerAgentBase.py +++ b/TransformationSystem/Agent/TaskManagerAgentBase.py @@ -12,7 +12,7 @@ import datetime from Queue import Queue -from DIRAC import S_OK +from DIRAC import S_OK, S_ERROR from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername, getUsernameForDN @@ -80,8 +80,12 @@ def initialize(self): self.bulkSubmissionFlag = self.am_getOption('BulkSubmission', False) # Shifter credentials to use, could replace the use of shifterProxy eventually - self.credentials = self.am_getOption('ShifterCredentials', self.am_getOption('shifterProxy', None)) - + self.shifterProxy = self.am_getOption('shifterProxy', None) + self.credentials = self.am_getOption('ShifterCredentials', None) + self.credTuple = (None, None, None) + resCred = self.__getCredentials() + if not resCred['OK']: + return resCred # setting up the threading maxNumberOfThreads = self.am_getOption('maxNumberOfThreads', 15) threadPool = ThreadPool(maxNumberOfThreads, maxNumberOfThreads) @@ -121,15 +125,7 @@ def execute(self): ownerDN = proxyInfo['identity'] self.log.info("ShifterProxy: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup)) elif self.credentials: - resCred = Operations().getOptionsDict("/Shifter/%s" % self.credentials) - if resCred['OK']: - owner = resCred['Value']['User'] - ownerGroup = resCred['Value']['Group'] - ownerDN = getDNForUsername(owner)['Value'] - self.log.info("Cred: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup)) - else: - self.log.error("Cred: Failed to find shifter credentials", self.credentials) - return resCred + owner, ownerGroup, ownerDN = self.credTuple else: self.log.info("Using per Transformation Credentials!") @@ -251,7 +247,7 @@ def _getClients(self): return {'TransformationClient': threadTransformationClient, 'TaskManager': threadTaskManager} - def _getPerTransformationClients(self, ownerDN=None, ownerGroup=None): + def _getDelegatedClients(self, ownerDN=None, ownerGroup=None): """Set the clients for per transformation credentials. Returns the clients used in the threads - this is another function that should be extended. @@ -268,7 +264,9 @@ def _execute(self, threadID): """ This is what runs inside the threads, in practice this is the function that does the real stuff """ # Each thread will have its own clients if we use credentials/shifterProxy - clients = self._getClients() if self.credentials else None + clients = self._getClients() if self.shifterProxy else \ + self._getDelegatedClients(ownerGroup=self.credTuple[1], ownerDN=self.credTuple[2]) if self.credentials \ + else None method = '_execute' operation = 'None' @@ -285,9 +283,9 @@ def _execute(self, threadID): self._logWarn("Got a transf not in transInQueue...?", method=method, transID=transID) break - if not self.credentials: + if not (self.credentials or self.shifterProxy): ownerDN, group = transIDOPBody[transID]['OwnerDN'], transIDOPBody[transID]['OwnerGroup'] - clients = self._getPerTransformationClients(ownerDN=ownerDN, ownerGroup=group) + clients = self._getDelegatedClients(ownerDN=ownerDN, ownerGroup=group) self.transInThread[transID] = ' [Thread%d] [%s] ' % (threadID, str(transID)) self._logInfo("Start processing transformation", method=method, transID=transID) clients['TaskManager'].transInThread = self.transInThread @@ -598,3 +596,22 @@ def _addOperationForTransformations(operationsOnTransformationDict, operation, t 'OwnerGroup': ownerGroup if owner else t_ownerGroup, 'OwnerDN': ownerDN if owner else t_ownerDN } + + def __getCredentials(self): + """Get the credentials to use if ShifterCredentials are set, otherwise do nothing. + + This function fills the self.credTuple tuple. + """ + if not self.credentials: + return S_OK() + resCred = Operations().getOptionsDict("/Shifter/%s" % self.credentials) + if not resCred['OK']: + self.log.error("Cred: Failed to find shifter credentials", self.credentials) + return resCred + owner = resCred['Value']['User'] + ownerGroup = resCred['Value']['Group'] + # returns a list + ownerDN = getDNForUsername(owner)['Value'][0] + self.credTuple = (owner, ownerGroup, ownerDN) + self.log.info("Cred: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup)) + return S_OK()