Skip to content

Commit

Permalink
TransformationSystem: make things multiVO capable
Browse files Browse the repository at this point in the history
TaskManagerAgentBase: add option ShifterCredentials to set the credentials to use for all submissions, this is single VO only
TaskManagerAgentBase: WorkflowTasks/RequestTasks: pass ownerDN and ownerGroup parameter to all the submission clients if using shifterProxy ownerDN and ownerGroup are None thus reproducing the original behaviour
TaskManagerAgentBase: refactor adding operations for transformation to separate function to ensure presence of Owner/DN/Group in dict entries
  • Loading branch information
andresailer committed Jun 14, 2018
1 parent 98c757c commit d416f69
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 75 deletions.
7 changes: 7 additions & 0 deletions TransformationSystem/Agent/RequestTaskAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,10 @@ def _getClients(self):
threadTaskManager = RequestTasks()
res.update({'TaskManager': threadTaskManager})
return res

def _getPerTransformationClients(self, ownerDN=None, ownerGroup=None):
"""Set the clients for per transformation credentials."""
res = super(RequestTaskAgent, self)._getPerTransformationClients(ownerDN=ownerDN, ownerGroup=ownerGroup)
threadTaskManager = RequestTasks(ownerDN=ownerDN, ownerGroup=ownerGroup)
res.update({'TaskManager': threadTaskManager})
return res
123 changes: 79 additions & 44 deletions TransformationSystem/Agent/TaskManagerAgentBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

from DIRAC import S_OK

from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername, getUsernameForDN
from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Utilities.ThreadPool import ThreadPool
Expand Down Expand Up @@ -47,10 +49,6 @@ def __init__(self, *args, **kwargs):

self.tasksPerLoop = 50

self.owner = ''
self.ownerGroup = ''
self.ownerDN = ''

self.pluginLocation = ''
self.bulkSubmissionFlag = False

Expand Down Expand Up @@ -81,6 +79,9 @@ def initialize(self):
# Bulk submission flag
self.bulkSubmissionFlag = self.am_getOption('BulkSubmission', False)

# Shifter credentials to use, could replace the use of shifterProxy eventually
self.credentials = self.am_getOption('ShifterCredentials', self.am_getOption('shifterProxy', None))

# setting up the threading
maxNumberOfThreads = self.am_getOption('maxNumberOfThreads', 15)
threadPool = ThreadPool(maxNumberOfThreads, maxNumberOfThreads)
Expand Down Expand Up @@ -110,6 +111,27 @@ def execute(self):
"""

operationsOnTransformationDict = {}
owner, ownerGroup, ownerDN = None, None, None
# getting the credentials for submission
if getProxyInfo(False, False)['OK']: # there is a shifterProxy
res = getProxyInfo(False, False)
proxyInfo = res['Value'] # must be there
owner = proxyInfo['username']
ownerGroup = proxyInfo['group']
ownerDN = proxyInfo['identity']
self.log.info("ShifterProxy: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup))
elif self.credentials:
resCred = Operations().getOptionsDict("/Shifter/%s" % self.credentials)
if resCred['OK']:
owner = resCred['Value']['User']
ownerGroup = resCred['Value']['Group']
ownerDN = getDNForUsername(owner)['Value']
self.log.info("Cred: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup))
else:
self.log.error("Cred: Failed to find shifter credentials", self.credentials)
return resCred
else:
self.log.info("Using per Transformation Credentials!")

# Determine whether the task status is to be monitored and updated
enableTaskMonitor = self.am_getOption('MonitorTasks', '')
Expand All @@ -123,10 +145,8 @@ def execute(self):
if not transformations['OK']:
self.log.warn("Could not select transformations:", transformations['Message'])
else:
transformationIDsAndBodies = dict((transformation['TransformationID'],
transformation['Body']) for transformation in transformations['Value'])
for transID, body in transformationIDsAndBodies.iteritems():
operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['updateTaskStatus']}
self._addOperationForTransformations(operationsOnTransformationDict, 'updateTaskStatus', transformations,
owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN)

# Determine whether the task files status is to be monitored and updated
enableFileMonitor = self.am_getOption('MonitorFiles', '')
Expand All @@ -140,13 +160,8 @@ def execute(self):
if not transformations['OK']:
self.log.warn("Could not select transformations:", transformations['Message'])
else:
transformationIDsAndBodies = dict((transformation['TransformationID'],
transformation['Body']) for transformation in transformations['Value'])
for transID, body in transformationIDsAndBodies.iteritems():
if transID in operationsOnTransformationDict:
operationsOnTransformationDict[transID]['Operations'].append('updateFileStatus')
else:
operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['updateFileStatus']}
self._addOperationForTransformations(operationsOnTransformationDict, 'updateFileStatus', transformations,
owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN)

# Determine whether the checking of reserved tasks is to be performed
enableCheckReserved = self.am_getOption('CheckReserved', '')
Expand All @@ -160,29 +175,14 @@ def execute(self):
if not transformations['OK']:
self.log.warn("Could not select transformations:", transformations['Message'])
else:
transformationIDsAndBodies = dict((transformation['TransformationID'],
transformation['Body']) for transformation in transformations['Value'])
for transID, body in transformationIDsAndBodies.iteritems():
if transID in operationsOnTransformationDict:
operationsOnTransformationDict[transID]['Operations'].append('checkReservedTasks')
else:
operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['checkReservedTasks']}
self._addOperationForTransformations(operationsOnTransformationDict, 'checkReservedTasks', transformations,
owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN)

# Determine whether the submission of tasks is to be performed
enableSubmission = self.am_getOption('SubmitTasks', '')
if not enableSubmission:
self.log.verbose("Submission of tasks is disabled. To enable it, create the 'SubmitTasks' option")
else:
# getting the credentials for submission
res = getProxyInfo(False, False)
if not res['OK']:
self.log.error("Failed to determine credentials for submission", res['Message'])
return res
proxyInfo = res['Value']
self.owner = proxyInfo['username']
self.ownerGroup = proxyInfo['group']
self.ownerDN = proxyInfo['identity']
self.log.info("Tasks will be submitted with the credentials %s:%s" % (self.owner, self.ownerGroup))
# Get the transformations for which the check of reserved tasks have to be performed
status = self.am_getOption('SubmitTransformationStatus',
self.am_getOption('SubmitStatus', ['Active', 'Completing']))
Expand All @@ -192,13 +192,10 @@ def execute(self):
else:
# Get the transformations which should be submitted
self.tasksPerLoop = self.am_getOption('TasksPerLoop', self.tasksPerLoop)
transformationIDsAndBodies = dict((transformation['TransformationID'],
transformation['Body']) for transformation in transformations['Value'])
for transID, body in transformationIDsAndBodies.iteritems():
if transID in operationsOnTransformationDict:
operationsOnTransformationDict[transID]['Operations'].append('submitTasks')
else:
operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['submitTasks']}
self._addOperationForTransformations(operationsOnTransformationDict, 'submitTasks', transformations,
owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN)



self._fillTheQueue(operationsOnTransformationDict)

Expand Down Expand Up @@ -254,11 +251,24 @@ def _getClients(self):
return {'TransformationClient': threadTransformationClient,
'TaskManager': threadTaskManager}

def _getPerTransformationClients(self, ownerDN=None, ownerGroup=None):
"""Set the clients for per transformation credentials.
Returns the clients used in the threads - this is another function that should be extended.
The clients provided here are defaults, and should be adapted
"""
threadTransformationClient = TransformationClient()
threadTaskManager = WorkflowTasks(ownerDN=ownerDN, ownerGroup=ownerGroup)
threadTaskManager.pluginLocation = self.pluginLocation

return {'TransformationClient': threadTransformationClient,
'TaskManager': threadTaskManager}

def _execute(self, threadID):
""" This is what runs inside the threads, in practice this is the function that does the real stuff
"""
# Each thread will have its own clients
clients = self._getClients()
# Each thread will have its own clients if we use credentials/shifterProxy
clients = self._getClients() if self.credentials else None
method = '_execute'
operation = 'None'

Expand All @@ -275,6 +285,9 @@ def _execute(self, threadID):
self._logWarn("Got a transf not in transInQueue...?",
method=method, transID=transID)
break
if not self.credentials:
ownerDN, group = transIDOPBody[transID]['OwnerDN'], transIDOPBody[transID]['OwnerGroup']
clients = self._getPerTransformationClients(ownerDN=ownerDN, ownerGroup=group)
self.transInThread[transID] = ' [Thread%d] [%s] ' % (threadID, str(transID))
self._logInfo("Start processing transformation", method=method, transID=transID)
clients['TaskManager'].transInThread = self.transInThread
Expand Down Expand Up @@ -512,6 +525,9 @@ def submitTasks(self, transIDOPBody, clients):
"""
transID = transIDOPBody.keys()[0]
transBody = transIDOPBody[transID]['Body']
owner = transIDOPBody[transID]['Owner']
ownerGroup = transIDOPBody[transID]['OwnerGroup']
ownerDN = transIDOPBody[transID]['OwnerDN']
method = 'submitTasks'

# Get all tasks to submit
Expand All @@ -533,9 +549,9 @@ def submitTasks(self, transIDOPBody, clients):
# Prepare tasks
preparedTransformationTasks = clients['TaskManager'].prepareTransformationTasks(transBody,
tasks,
self.owner,
self.ownerGroup,
self.ownerDN,
owner,
ownerGroup,
ownerDN,
self.bulkSubmissionFlag)
self._logDebug("prepareTransformationTasks return value:", preparedTransformationTasks,
method=method, transID=transID)
Expand Down Expand Up @@ -563,3 +579,22 @@ def submitTasks(self, transIDOPBody, clients):
return res

return S_OK()

@staticmethod
def _addOperationForTransformations(operationsOnTransformationDict, operation, transformations,
owner=None, ownerGroup=None, ownerDN=None):
"""Fill the operationsOnTransformationDict"""
transformationIDsAndBodies = [(transformation['TransformationID'],
transformation['Body'],
transformation['AuthorDN'],
transformation['AuthorGroup'],
) for transformation in transformations['Value']]
for transID, body, t_ownerDN, t_ownerGroup in transformationIDsAndBodies:
if transID in operationsOnTransformationDict:
operationsOnTransformationDict[transID]['Operations'].append(operation)
else:
operationsOnTransformationDict[transID] = {'Body': body, 'Operations': [operation],
'Owner': owner if owner else getUsernameForDN(t_ownerDN)['Value'],
'OwnerGroup': ownerGroup if owner else t_ownerGroup,
'OwnerDN': ownerDN if owner else t_ownerDN
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ def test_checkReservedTasks( self ):
def test_submitTasks( self ):
clients = {'TransformationClient':self.tc_mock, 'TaskManager':self.tm_mock}

transIDOPBody = {1:{'Operations':['op1', 'op2'], 'Body':'veryBigBody'}}
transIDOPBody = {1: {'Operations': ['op1', 'op2'], 'Body': 'veryBigBody',
'Owner': 'prodMan', 'OwnerDN': '/ca=man/user=prodMan', 'OwnerGroup': 'prodMans'}}

# errors getting
self.tc_mock.getTasksToSubmit.return_value = {'OK': False, 'Message': 'a mess'}
Expand Down
22 changes: 17 additions & 5 deletions TransformationSystem/Client/TaskManager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" TaskManager contains WorkflowsTasks and RequestTasks modules, for managing jobs and requests tasks
""" TaskManager contains WorkflowTasks and RequestTasks modules, for managing jobs and requests tasks
"""

__RCSID__ = "$Id$"
Expand Down Expand Up @@ -108,7 +108,9 @@ class RequestTasks(TaskBase):
"""

def __init__(self, transClient=None, logger=None, requestClient=None,
requestClass=None, requestValidator=None):
requestClass=None, requestValidator=None,
ownerDN=None, ownerGroup=None,
):
""" c'tor
the requestClass is by default Request.
Expand All @@ -120,9 +122,13 @@ def __init__(self, transClient=None, logger=None, requestClient=None,
logger = gLogger.getSubLogger('RequestTasks')

super(RequestTasks, self).__init__(transClient, logger)
useCertificates = True if (bool(ownerDN) and bool(ownerGroup)) else None

if not requestClient:
self.requestClient = ReqClient()
self.requestClient = ReqClient(useCertificates=useCertificates,
delegatedDN=ownerDN,
delegatedGroup=ownerGroup,
)
else:
self.requestClient = requestClient

Expand Down Expand Up @@ -417,7 +423,9 @@ class WorkflowTasks(TaskBase):
"""

def __init__(self, transClient=None, logger=None, submissionClient=None, jobMonitoringClient=None,
outputDataModule=None, jobClass=None, opsH=None, destinationPlugin=None):
outputDataModule=None, jobClass=None, opsH=None, destinationPlugin=None,
ownerDN=None, ownerGroup=None,
):
""" Generates some default objects.
jobClass is by default "DIRAC.Interfaces.API.Job.Job". An extension of it also works:
VOs can pass in their job class extension, if present
Expand All @@ -428,8 +436,12 @@ def __init__(self, transClient=None, logger=None, submissionClient=None, jobMoni

super(WorkflowTasks, self).__init__(transClient, logger)

useCertificates = True if (bool(ownerDN) and bool(ownerGroup)) else None
if not submissionClient:
self.submissionClient = WMSClient()
self.submissionClient = WMSClient(useCertificates=useCertificates,
delegatedDN=ownerDN,
delegatedGroup=ownerGroup,
)
else:
self.submissionClient = submissionClient

Expand Down
44 changes: 19 additions & 25 deletions WorkloadManagementSystem/Client/WMSClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,36 @@ class WMSClient(object):
"""

def __init__(self, jobManagerClient=None, sbRPCClient=None, sbTransferClient=None,
useCertificates=False, timeout=600):
useCertificates=False, timeout=600, delegatedDN=None, delegatedGroup=None):
""" WMS Client constructor
Here we also initialize the needed clients and connections
"""

self.useCertificates = useCertificates
self.delegatedDN = delegatedDN
self.delegatedGroup = delegatedGroup
self.timeout = timeout
self.jobManager = jobManagerClient
self._jobManager = jobManagerClient
self.operationsHelper = Operations()
self.sandboxClient = None
if sbRPCClient and sbTransferClient:
self.sandboxClient = SandboxStoreClient(rpcClient=sbRPCClient,
transferClient=sbTransferClient,
useCertificates=useCertificates)

@property
def jobManager(self):
if not self._jobManager:
self._jobManager = RPCClient('WorkloadManagement/JobManager',
useCertificates=self.useCertificates,
delegatedDN=self.delegatedDN,
delegatedGroup=self.delegatedGroup,
timeout=self.timeout)

return self._jobManager


###############################################################################

def __getInputSandboxEntries(self, classAdJob):
Expand Down Expand Up @@ -109,7 +123,9 @@ def __uploadInputSandbox(self, classAdJob, jobDescriptionObject=None):

if okFiles:
if not self.sandboxClient:
self.sandboxClient = SandboxStoreClient(useCertificates=self.useCertificates)
self.sandboxClient = SandboxStoreClient(useCertificates=self.useCertificates,
delegatedDN=self.delegatedDN,
delegatedGroup=self.delegatedGroup)
result = self.sandboxClient.uploadFilesAsSandbox(okFiles)
if not result['OK']:
return result
Expand Down Expand Up @@ -159,12 +175,6 @@ def submitJob(self, jdl, jobDescriptionObject=None):
return result
nJobs = result['Value']
parametricJob = nJobs > 0

if not self.jobManager:
self.jobManager = RPCClient('WorkloadManagement/JobManager',
useCertificates=self.useCertificates,
timeout=self.timeout)

result = self.jobManager.submitJob(classAdJob.asJDL())

if parametricJob:
Expand Down Expand Up @@ -200,38 +210,22 @@ def killJob(self, jobID):
""" Kill running job.
jobID can be an integer representing a single DIRAC job ID or a list of IDs
"""
if not self.jobManager:
self.jobManager = RPCClient('WorkloadManagement/JobManager',
useCertificates=self.useCertificates,
timeout=self.timeout)
return self.jobManager.killJob(jobID)

def deleteJob(self, jobID):
""" Delete job(s) from the WMS Job database.
jobID can be an integer representing a single DIRAC job ID or a list of IDs
"""
if not self.jobManager:
self.jobManager = RPCClient('WorkloadManagement/JobManager',
useCertificates=self.useCertificates,
timeout=self.timeout)
return self.jobManager.deleteJob(jobID)

def rescheduleJob(self, jobID):
""" Reschedule job(s) in WMS Job database.
jobID can be an integer representing a single DIRAC job ID or a list of IDs
"""
if not self.jobManager:
self.jobManager = RPCClient('WorkloadManagement/JobManager',
useCertificates=self.useCertificates,
timeout=self.timeout)
return self.jobManager.rescheduleJob(jobID)

def resetJob(self, jobID):
""" Reset job(s) in WMS Job database.
jobID can be an integer representing a single DIRAC job ID or a list of IDs
"""
if not self.jobManager:
self.jobManager = RPCClient('WorkloadManagement/JobManager',
useCertificates=self.useCertificates,
timeout=self.timeout)
return self.jobManager.resetJob(jobID)

0 comments on commit d416f69

Please sign in to comment.