Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v6r20] MultiVO TransformationSystem #3723

Merged
merged 13 commits into from
Jul 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Core/DISET/TransferClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ def sendBulk( self, fileList, bulkId, token = "", compress = True, bulkSize = -1
retVal = fileHelper.bulkToNetwork( fileList, compress, onthefly )
if not retVal[ 'OK' ]:
return retVal
retVal = transport.receiveData() return retVal
retVal = transport.receiveData()
return retVal
finally:
self._disconnect( trid )

Expand Down
77 changes: 69 additions & 8 deletions TransformationSystem/Agent/RequestTaskAgent.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,64 @@
""" The Request Task Agent takes request tasks created in the transformation database
and submits to the request management system
"""The Request Task Agent takes request tasks created in the
TransformationDB and submits to the request management system.

The following options can be set for the RequestTaskAgent.

::

RequestTaskAgent
{
# Use a dedicated proxy to submit requests to the RMS
shifterProxy=DataManager
# Use delegated credentials. Use this instead of the shifterProxy option (New in v6r20p5)
ShifterCredentials=
# Transformation types to be taken into account by the agent
TransType=Replication,Removal
# Location of the transformation plugins
PluginLocation=DIRAC.TransformationSystem.Client.TaskManagerPlugin
# maximum number of threads to use in this agent
maxNumberOfThreads=15

# Give this option a value if the agent should submit Requests
SubmitTasks=
# Status of transformations for which to submit Requests
SubmitStatus=Active,Completing
# Number of tasks to submit in one execution cycle per transformation
TasksPerLoop=50

# Give this option a value if the agent should monitor tasks
MonitorTasks=
# Status of transformations for which to monitor tasks
UpdateTasksStatus = Active,Completing,Stopped
# Task statuses considered transient that should be monitored for updates
TaskUpdateStatus=Checking,Deleted,Killed,Staging,Stalled,Matched
TaskUpdateStatus+=Scheduled,Rescheduled,Completed,Submitted
TaskUpdateStatus+=Assigned,Received,Waiting,Running
# Number of tasks to be updated in one call
TaskUpdateChunkSize=0

# Give this option a value if the agent should monitor files
MonitorFiles=
# Status of transformations for which to monitor Files
UpdateFilesStatus=Active,Completing,Stopped

# Give this option a value if the agent should check Reserved tasks
CheckReserved=
# Status of transformations for which to check reserved tasks
CheckReservedStatus= Active,Completing,Stopped

}

* The options *SubmitTasks*, *MonitorTasks*, *MonitorFiles*, and *CheckReserved*
need to be assigned any non-empty value to be activated

* .. versionadded:: v6r20p5

It is possible to run the RequestTaskAgent without a *shifterProxy* or
*ShifterCredentials*, in this case the credentials of the authors of the
transformations are used to submit the jobs to the RMS. This enables the use of
a single RequestTaskAgent for multiple VOs. See also the section about the
:ref:`trans-multi-vo`.

"""

from DIRAC import S_OK
Expand Down Expand Up @@ -32,8 +91,6 @@ def initialize(self):
if not res['OK']:
return res

self.am_setOption('shifterProxy', 'DataManager')

# clients
self.taskManager = RequestTasks(transClient=self.transClient)

Expand All @@ -45,10 +102,14 @@ def initialize(self):

return S_OK()

def _getClients(self):
""" Here the taskManager becomes a RequestTasks object
def _getClients(self, ownerDN=None, ownerGroup=None):
"""Set the clients for task submission.

Here the taskManager becomes a RequestTasks object.

See :func:`DIRAC.TransformationSystem.TaskManagerAgentBase._getClients`.
"""
res = TaskManagerAgentBase._getClients(self)
threadTaskManager = RequestTasks()
res = super(RequestTaskAgent, self)._getClients(ownerDN=ownerDN, ownerGroup=ownerGroup)
threadTaskManager = RequestTasks(ownerDN=ownerDN, ownerGroup=ownerGroup)
res.update({'TaskManager': threadTaskManager})
return res
144 changes: 96 additions & 48 deletions TransformationSystem/Agent/TaskManagerAgentBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

from DIRAC import S_OK

from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername, getUsernameForDN
from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Utilities.ThreadPool import ThreadPool
Expand Down Expand Up @@ -47,10 +49,6 @@ def __init__(self, *args, **kwargs):

self.tasksPerLoop = 50

self.owner = ''
self.ownerGroup = ''
self.ownerDN = ''

self.pluginLocation = ''
self.bulkSubmissionFlag = False

Expand Down Expand Up @@ -81,6 +79,13 @@ def initialize(self):
# Bulk submission flag
self.bulkSubmissionFlag = self.am_getOption('BulkSubmission', False)

# Shifter credentials to use, could replace the use of shifterProxy eventually
self.shifterProxy = self.am_getOption('shifterProxy', None)
self.credentials = self.am_getOption('ShifterCredentials', None)
self.credTuple = (None, None, None)
resCred = self.__getCredentials()
if not resCred['OK']:
return resCred
# setting up the threading
maxNumberOfThreads = self.am_getOption('maxNumberOfThreads', 15)
threadPool = ThreadPool(maxNumberOfThreads, maxNumberOfThreads)
Expand Down Expand Up @@ -110,6 +115,19 @@ def execute(self):
"""

operationsOnTransformationDict = {}
owner, ownerGroup, ownerDN = None, None, None
# getting the credentials for submission
resProxy = getProxyInfo(proxy=False, disableVOMS=False)
if resProxy['OK']: # there is a shifterProxy
proxyInfo = resProxy['Value']
owner = proxyInfo['username']
ownerGroup = proxyInfo['group']
ownerDN = proxyInfo['identity']
self.log.info("ShifterProxy: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup))
elif self.credentials:
owner, ownerGroup, ownerDN = self.credTuple
else:
self.log.info("Using per Transformation Credentials!")

# Determine whether the task status is to be monitored and updated
enableTaskMonitor = self.am_getOption('MonitorTasks', '')
Expand All @@ -123,10 +141,8 @@ def execute(self):
if not transformations['OK']:
self.log.warn("Could not select transformations:", transformations['Message'])
else:
transformationIDsAndBodies = dict((transformation['TransformationID'],
transformation['Body']) for transformation in transformations['Value'])
for transID, body in transformationIDsAndBodies.iteritems():
operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['updateTaskStatus']}
self._addOperationForTransformations(operationsOnTransformationDict, 'updateTaskStatus', transformations,
owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN)

# Determine whether the task files status is to be monitored and updated
enableFileMonitor = self.am_getOption('MonitorFiles', '')
Expand All @@ -140,13 +156,8 @@ def execute(self):
if not transformations['OK']:
self.log.warn("Could not select transformations:", transformations['Message'])
else:
transformationIDsAndBodies = dict((transformation['TransformationID'],
transformation['Body']) for transformation in transformations['Value'])
for transID, body in transformationIDsAndBodies.iteritems():
if transID in operationsOnTransformationDict:
operationsOnTransformationDict[transID]['Operations'].append('updateFileStatus')
else:
operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['updateFileStatus']}
self._addOperationForTransformations(operationsOnTransformationDict, 'updateFileStatus', transformations,
owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN)

# Determine whether the checking of reserved tasks is to be performed
enableCheckReserved = self.am_getOption('CheckReserved', '')
Expand All @@ -160,29 +171,14 @@ def execute(self):
if not transformations['OK']:
self.log.warn("Could not select transformations:", transformations['Message'])
else:
transformationIDsAndBodies = dict((transformation['TransformationID'],
transformation['Body']) for transformation in transformations['Value'])
for transID, body in transformationIDsAndBodies.iteritems():
if transID in operationsOnTransformationDict:
operationsOnTransformationDict[transID]['Operations'].append('checkReservedTasks')
else:
operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['checkReservedTasks']}
self._addOperationForTransformations(operationsOnTransformationDict, 'checkReservedTasks', transformations,
owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN)

# Determine whether the submission of tasks is to be performed
enableSubmission = self.am_getOption('SubmitTasks', '')
if not enableSubmission:
self.log.verbose("Submission of tasks is disabled. To enable it, create the 'SubmitTasks' option")
else:
# getting the credentials for submission
res = getProxyInfo(False, False)
if not res['OK']:
self.log.error("Failed to determine credentials for submission", res['Message'])
return res
proxyInfo = res['Value']
self.owner = proxyInfo['username']
self.ownerGroup = proxyInfo['group']
self.ownerDN = proxyInfo['identity']
self.log.info("Tasks will be submitted with the credentials %s:%s" % (self.owner, self.ownerGroup))
# Get the transformations for which the check of reserved tasks have to be performed
status = self.am_getOption('SubmitTransformationStatus',
self.am_getOption('SubmitStatus', ['Active', 'Completing']))
Expand All @@ -192,13 +188,10 @@ def execute(self):
else:
# Get the transformations which should be submitted
self.tasksPerLoop = self.am_getOption('TasksPerLoop', self.tasksPerLoop)
transformationIDsAndBodies = dict((transformation['TransformationID'],
transformation['Body']) for transformation in transformations['Value'])
for transID, body in transformationIDsAndBodies.iteritems():
if transID in operationsOnTransformationDict:
operationsOnTransformationDict[transID]['Operations'].append('submitTasks')
else:
operationsOnTransformationDict[transID] = {'Body': body, 'Operations': ['submitTasks']}
self._addOperationForTransformations(operationsOnTransformationDict, 'submitTasks', transformations,
owner=owner, ownerGroup=ownerGroup, ownerDN=ownerDN)



self._fillTheQueue(operationsOnTransformationDict)

Expand Down Expand Up @@ -242,13 +235,21 @@ def _fillTheQueue(self, operationsOnTransformationsDict):

#############################################################################

def _getClients(self):
""" returns the clients used in the threads - this is another function that should be extended.
def _getClients(self, ownerDN=None, ownerGroup=None):
"""Returns the clients used in the threads

This is another function that should be extended.

The clients provided here are defaults, and should be adapted

If ownerDN and ownerGroup are not None the clients will delegate to these credentials

The clients provided here are defaults, and should be adapted
:param str ownerDN: DN of the owner of the submitted jobs
:param str ownerGroup: group of the owner of the submitted jobs
:returns: dict of Clients
"""
threadTransformationClient = TransformationClient()
threadTaskManager = WorkflowTasks() # this is for wms tasks, replace it with something else if needed
threadTaskManager = WorkflowTasks(ownerDN=ownerDN, ownerGroup=ownerGroup)
threadTaskManager.pluginLocation = self.pluginLocation

return {'TransformationClient': threadTransformationClient,
Expand All @@ -257,9 +258,12 @@ def _getClients(self):
def _execute(self, threadID):
""" This is what runs inside the threads, in practice this is the function that does the real stuff
"""
# Each thread will have its own clients
clients = self._getClients()
# Each thread will have its own clients if we use credentials/shifterProxy
clients = self._getClients() if self.shifterProxy else \
self._getClients(ownerGroup=self.credTuple[1], ownerDN=self.credTuple[2]) if self.credentials \
else None
method = '_execute'
operation = 'None'

while True:
startTime = time.time()
Expand All @@ -274,6 +278,9 @@ def _execute(self, threadID):
self._logWarn("Got a transf not in transInQueue...?",
method=method, transID=transID)
break
if not (self.credentials or self.shifterProxy):
ownerDN, group = transIDOPBody[transID]['OwnerDN'], transIDOPBody[transID]['OwnerGroup']
clients = self._getClients(ownerDN=ownerDN, ownerGroup=group)
self.transInThread[transID] = ' [Thread%d] [%s] ' % (threadID, str(transID))
self._logInfo("Start processing transformation", method=method, transID=transID)
clients['TaskManager'].transInThread = self.transInThread
Expand Down Expand Up @@ -511,6 +518,9 @@ def submitTasks(self, transIDOPBody, clients):
"""
transID = transIDOPBody.keys()[0]
transBody = transIDOPBody[transID]['Body']
owner = transIDOPBody[transID]['Owner']
ownerGroup = transIDOPBody[transID]['OwnerGroup']
ownerDN = transIDOPBody[transID]['OwnerDN']
method = 'submitTasks'

# Get all tasks to submit
Expand All @@ -532,9 +542,9 @@ def submitTasks(self, transIDOPBody, clients):
# Prepare tasks
preparedTransformationTasks = clients['TaskManager'].prepareTransformationTasks(transBody,
tasks,
self.owner,
self.ownerGroup,
self.ownerDN,
owner,
ownerGroup,
ownerDN,
self.bulkSubmissionFlag)
self._logDebug("prepareTransformationTasks return value:", preparedTransformationTasks,
method=method, transID=transID)
Expand Down Expand Up @@ -562,3 +572,41 @@ def submitTasks(self, transIDOPBody, clients):
return res

return S_OK()

@staticmethod
def _addOperationForTransformations(operationsOnTransformationDict, operation, transformations,
owner=None, ownerGroup=None, ownerDN=None):
"""Fill the operationsOnTransformationDict"""
transformationIDsAndBodies = [(transformation['TransformationID'],
transformation['Body'],
transformation['AuthorDN'],
transformation['AuthorGroup'],
) for transformation in transformations['Value']]
for transID, body, t_ownerDN, t_ownerGroup in transformationIDsAndBodies:
if transID in operationsOnTransformationDict:
operationsOnTransformationDict[transID]['Operations'].append(operation)
else:
operationsOnTransformationDict[transID] = {'Body': body, 'Operations': [operation],
'Owner': owner if owner else getUsernameForDN(t_ownerDN)['Value'],
'OwnerGroup': ownerGroup if owner else t_ownerGroup,
'OwnerDN': ownerDN if owner else t_ownerDN
}

def __getCredentials(self):
"""Get the credentials to use if ShifterCredentials are set, otherwise do nothing.

This function fills the self.credTuple tuple.
"""
if not self.credentials:
return S_OK()
resCred = Operations().getOptionsDict("/Shifter/%s" % self.credentials)
if not resCred['OK']:
self.log.error("Cred: Failed to find shifter credentials", self.credentials)
return resCred
owner = resCred['Value']['User']
ownerGroup = resCred['Value']['Group']
# returns a list
ownerDN = getDNForUsername(owner)['Value'][0]
self.credTuple = (owner, ownerGroup, ownerDN)
self.log.info("Cred: Tasks will be submitted with the credentials %s:%s" % (owner, ownerGroup))
return S_OK()
Loading