Skip to content

Commit

Permalink
TaskManagerAgents: fix logic for non-proxy based general credentials
Browse files Browse the repository at this point in the history
execute is running after _execute(?), so we need to find ShifterCredentials in initialize
  • Loading branch information
andresailer committed Jun 18, 2018
1 parent 306e662 commit c3655cc
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 18 deletions.
4 changes: 2 additions & 2 deletions TransformationSystem/Agent/RequestTaskAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ def _getClients(self):
res.update({'TaskManager': threadTaskManager})
return res

def _getPerTransformationClients(self, ownerDN=None, ownerGroup=None):
def _getDelegatedClients(self, ownerDN=None, ownerGroup=None):
"""Set the clients for per transformation credentials."""
res = super(RequestTaskAgent, self)._getPerTransformationClients(ownerDN=ownerDN, ownerGroup=ownerGroup)
res = super(RequestTaskAgent, self)._getDelegatedClients(ownerDN=ownerDN, ownerGroup=ownerGroup)
threadTaskManager = RequestTasks(ownerDN=ownerDN, ownerGroup=ownerGroup)
res.update({'TaskManager': threadTaskManager})
return res
49 changes: 33 additions & 16 deletions TransformationSystem/Agent/TaskManagerAgentBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import datetime
from Queue import Queue

from DIRAC import S_OK
from DIRAC import S_OK, S_ERROR

from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername, getUsernameForDN
Expand Down Expand Up @@ -80,8 +80,12 @@ def initialize(self):
self.bulkSubmissionFlag = self.am_getOption('BulkSubmission', False)

# Shifter credentials to use, could replace the use of shifterProxy eventually
self.credentials = self.am_getOption('ShifterCredentials', self.am_getOption('shifterProxy', None))

self.shifterProxy = self.am_getOption('shifterProxy', None)
self.credentials = self.am_getOption('ShifterCredentials', None)
self.credTuple = (None, None, None)
resCred = self.__getCredentials()
if not resCred['OK']:
return resCred
# setting up the threading
maxNumberOfThreads = self.am_getOption('maxNumberOfThreads', 15)
threadPool = ThreadPool(maxNumberOfThreads, maxNumberOfThreads)
Expand Down Expand Up @@ -121,15 +125,7 @@ def execute(self):
ownerDN = proxyInfo['identity']
self.log.info("ShifterProxy: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup))
elif self.credentials:
resCred = Operations().getOptionsDict("/Shifter/%s" % self.credentials)
if resCred['OK']:
owner = resCred['Value']['User']
ownerGroup = resCred['Value']['Group']
ownerDN = getDNForUsername(owner)['Value']
self.log.info("Cred: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup))
else:
self.log.error("Cred: Failed to find shifter credentials", self.credentials)
return resCred
owner, ownerGroup, ownerDN = self.credTuple
else:
self.log.info("Using per Transformation Credentials!")

Expand Down Expand Up @@ -251,7 +247,7 @@ def _getClients(self):
return {'TransformationClient': threadTransformationClient,
'TaskManager': threadTaskManager}

def _getPerTransformationClients(self, ownerDN=None, ownerGroup=None):
def _getDelegatedClients(self, ownerDN=None, ownerGroup=None):
"""Set the clients for per transformation credentials.
Returns the clients used in the threads - this is another function that should be extended.
Expand All @@ -268,7 +264,9 @@ def _execute(self, threadID):
""" This is what runs inside the threads, in practice this is the function that does the real stuff
"""
# Each thread will have its own clients if we use credentials/shifterProxy
clients = self._getClients() if self.credentials else None
clients = self._getClients() if self.shifterProxy else \
self._getDelegatedClients(ownerGroup=self.credTuple[1], ownerDN=self.credTuple[2]) if self.credentials \
else None
method = '_execute'
operation = 'None'

Expand All @@ -285,9 +283,9 @@ def _execute(self, threadID):
self._logWarn("Got a transf not in transInQueue...?",
method=method, transID=transID)
break
if not self.credentials:
if not (self.credentials or self.shifterProxy):
ownerDN, group = transIDOPBody[transID]['OwnerDN'], transIDOPBody[transID]['OwnerGroup']
clients = self._getPerTransformationClients(ownerDN=ownerDN, ownerGroup=group)
clients = self._getDelegatedClients(ownerDN=ownerDN, ownerGroup=group)
self.transInThread[transID] = ' [Thread%d] [%s] ' % (threadID, str(transID))
self._logInfo("Start processing transformation", method=method, transID=transID)
clients['TaskManager'].transInThread = self.transInThread
Expand Down Expand Up @@ -598,3 +596,22 @@ def _addOperationForTransformations(operationsOnTransformationDict, operation, t
'OwnerGroup': ownerGroup if owner else t_ownerGroup,
'OwnerDN': ownerDN if owner else t_ownerDN
}

def __getCredentials(self):
"""Get the credentials to use if ShifterCredentials are set, otherwise do nothing.
This function fills the self.credTuple tuple.
"""
if not self.credentials:
return S_OK()
resCred = Operations().getOptionsDict("/Shifter/%s" % self.credentials)
if not resCred['OK']:
self.log.error("Cred: Failed to find shifter credentials", self.credentials)
return resCred
owner = resCred['Value']['User']
ownerGroup = resCred['Value']['Group']
# returns a list
ownerDN = getDNForUsername(owner)['Value'][0]
self.credTuple = (owner, ownerGroup, ownerDN)
self.log.info("Cred: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup))
return S_OK()

0 comments on commit c3655cc

Please sign in to comment.