Skip to content

Commit

Permalink
Merge pull request #11588 from amaltaro/fix-10401
Browse files Browse the repository at this point in the history
Add GPU support to the StepChain spec
  • Loading branch information
amaltaro authored May 17, 2023
2 parents d64a483 + fb1397a commit 6bb9318
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 3 deletions.
28 changes: 25 additions & 3 deletions src/python/WMCore/WMSpec/StdSpecs/StepChain.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
made available.
"""
from __future__ import division

import json

from future.utils import viewitems
from builtins import range

from Utils.Utilities import strToBool
import WMCore.WMSpec.Steps.StepFactory as StepFactory
from WMCore.Lexicon import primdataset, taskStepName
from WMCore.Lexicon import primdataset, taskStepName, gpuParameters
from WMCore.WMSpec.StdSpecs.StdBase import StdBase
from WMCore.WMSpec.WMWorkloadTools import (validateArgumentsCreate, parsePileupConfig,
checkMemCore, checkEventStreams, checkTimePerEvent)
Expand Down Expand Up @@ -321,9 +324,17 @@ def setupNextSteps(self, task, origArgs):
multicore = taskConf['Multicore']
if taskConf.get("EventStreams") is not None and taskConf['EventStreams'] >= 0:
eventStreams = taskConf['EventStreams']

currentCmsswStepHelper.setNumberOfCores(multicore, eventStreams)

# GPU settings
gpuRequired = self.requiresGPU
gpuParams = json.loads(taskConf.get('GPUParams', 'null'))
if taskConf.get('RequiresGPU', None):
gpuRequired = taskConf['RequiresGPU']
if "GPUParams" not in taskConf:
gpuParams = json.loads(self.gPUParams)
currentCmsswStepHelper.setGPUSettings(gpuRequired, gpuParams)

# Pileup check
taskConf["PileupConfig"] = parsePileupConfig(taskConf["MCPileup"], taskConf["DataPileup"])
if taskConf["PileupConfig"]:
Expand Down Expand Up @@ -484,7 +495,11 @@ def getWorkloadCreateArgs():
"TimePerEvent": {"default": 12.0, "type": float, "validate": lambda x: x > 0},
"Memory": {"default": 2300.0, "type": float, "validate": lambda x: x > 0},
"Multicore": {"default": 1, "type": int, "validate": checkMemCore},
"EventStreams": {"type": int, "null": True, "default": 0, "validate": checkEventStreams}
"EventStreams": {"type": int, "null": True, "default": 0, "validate": checkEventStreams},
# no need for workload-level defaults, if task-level default is provided
"RequiresGPU": {"default": None, "null": True,
"validate": lambda x: x in ("forbidden", "optional", "required")},
"GPUParams": {"default": json.dumps(None), "validate": gpuParameters},
}
baseArgs.update(specArgs)
StdBase.setDefaultArgumentsProperty(baseArgs)
Expand Down Expand Up @@ -554,6 +569,11 @@ def validateSchema(self, schema):
msg += "You probably want to remove that step completely and try again."
self.raiseValidationException(msg=msg)

try:
StdBase.validateGPUSettings(schema)
except Exception as ex:
self.raiseValidationException(str(ex))

outputModTier = []
for i in range(1, numSteps + 1):
stepNumber = "Step%s" % i
Expand Down Expand Up @@ -603,6 +623,8 @@ def validateStep(self, taskConf, taskArgumentDefinition):
"""
try:
validateArgumentsCreate(taskConf, taskArgumentDefinition, checkInputDset=False)
# Validate GPU-related spec parameters
StdBase.validateGPUSettings(taskConf)
except WMSpecFactoryException:
# just re-raise it to keep the error message clear
raise
Expand Down
10 changes: 10 additions & 0 deletions src/python/WMCore/WMSpec/Steps/Templates/CMSSW.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,16 @@ def setGPUSettings(self, requiresGPU, gpuParams):
self.data.application.gpu.gpuRequired = requiresGPU
self.data.application.gpu.gpuRequirements = gpuParams

def getGPUSettings(self):
    """
    Retrieve the GPU configuration stored in this CMSSW step

    :return: a 2-item tuple made of:
      * the GPU requirement flag (string)
      * the GPU requirements dictionary (or None)
    """
    # both values live under the same "gpu" section of the application data
    gpuSection = self.data.application.gpu
    return gpuSection.gpuRequired, gpuSection.gpuRequirements


class CMSSW(Template):
"""
Expand Down
159 changes: 159 additions & 0 deletions test/python/WMCore_t/WMSpec_t/StdSpecs_t/StepChain_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"""
from __future__ import print_function

import json

from future.utils import viewitems, listvalues

from builtins import range
Expand Down Expand Up @@ -2418,6 +2420,163 @@ def testRunlist(self):
with self.assertRaises(WMSpecFactoryException):
factory.factoryWorkloadConstruction("TestWorkload", arguments)

def testGPUStepChains(self):
"""
Test GPU support in StepChains, top level settings only.

A 3-step workload is created without any GPU argument provided by
the user, then the following is verified:
* top level arguments: RequiresGPU is None and GPUParams is the JSON
encoded None
* the Step1/Step2/Step3 dictionaries carry no GPU argument
* every CMSSW step has gpuRequired set to "forbidden" and no GPU
requirements, while other step types have no "gpu" section
The same checks are repeated after updateArguments is called with
assignment parameters. Finally, creating a workload with
RequiresGPU="required" and null GPUParams must raise
WMSpecFactoryException.
"""
testArguments = StepChainWorkloadFactory.getTestArguments()
testArguments.update(deepcopy(REQUEST))

configDocs = injectStepChainConfigMC(self.configDatabase)
for s in ['Step1', 'Step2', 'Step3']:
testArguments[s]['ConfigCacheID'] = configDocs[s]
testArguments['Step2']['KeepOutput'] = False

factory = StepChainWorkloadFactory()
testWorkload = factory.factoryWorkloadConstruction("TestWorkload", testArguments)
# top level GPU arguments are expected to hold their default values
self.assertIsNone(testArguments['RequiresGPU'])
self.assertEqual(testArguments['GPUParams'], json.dumps(None))
# and no GPU argument is expected inside the step dictionaries
for stepKey in ['Step1', 'Step2', 'Step3']:
self.assertTrue("RequiresGPU" not in testArguments[stepKey])
self.assertTrue("GPUParams" not in testArguments[stepKey])

# only CMSSW steps carry a "gpu" section in their application data
for taskName in testWorkload.listAllTaskNames():
taskObj = testWorkload.getTaskByName(taskName)
for stepName in taskObj.listAllStepNames():
stepHelper = taskObj.getStepHelper(stepName)
if stepHelper.stepType() == "CMSSW":
self.assertEqual(stepHelper.data.application.gpu.gpuRequired, "forbidden")
self.assertIsNone(stepHelper.data.application.gpu.gpuRequirements)
else:
self.assertFalse(hasattr(stepHelper.data.application, "gpu"))

# apply assignment parameters and verify that GPU settings remain the same
assignDict = {"SiteWhitelist": ["T2_US_Nebraska"], "Team": "The-A-Team",
"RequestStatus": "assigned"}
testWorkload.updateArguments(assignDict)

self.assertIsNone(testArguments['RequiresGPU'])
self.assertEqual(testArguments['GPUParams'], json.dumps(None))
for stepKey in ['Step1', 'Step2', 'Step3']:
self.assertTrue("RequiresGPU" not in testArguments[stepKey])
self.assertTrue("GPUParams" not in testArguments[stepKey])

for taskName in testWorkload.listAllTaskNames():
taskObj = testWorkload.getTaskByName(taskName)
for stepName in taskObj.listAllStepNames():
stepHelper = taskObj.getStepHelper(stepName)
if stepHelper.stepType() == "CMSSW":
self.assertEqual(stepHelper.data.application.gpu.gpuRequired, "forbidden")
self.assertIsNone(stepHelper.data.application.gpu.gpuRequirements)
else:
self.assertFalse(hasattr(stepHelper.data.application, "gpu"))

# last but not least, test a failing case
# (GPU is required at the top level but no GPU parameters are provided)
testArguments['RequiresGPU'] = "required"
testArguments['GPUParams'] = json.dumps(None)
with self.assertRaises(WMSpecFactoryException):
factory.factoryWorkloadConstruction("PullingTheChain", testArguments)

def testGPUStepChainsTasks(self):
"""
Test GPU support in StepChains, with task-level settings.

Step1 is created with RequiresGPU="optional" and Step2 with
RequiresGPU="required", both sharing the same GPUParams; Step3 has
no GPU argument. The test verifies that:
* the top level arguments keep their defaults and the step
dictionaries are left as provided
* the cmsRun1 and cmsRun2 steps of the GENSIM task carry the settings
of Step1 and Step2, while cmsRun3 is "forbidden" with no GPU
requirements
* CMSSW steps of Merge/Harvesting/Cleanup/LogCollect tasks are
"forbidden", and non-CMSSW steps have no "gpu" section
The step helper checks are repeated after updateArguments is called
with assignment parameters.
"""
testArguments = StepChainWorkloadFactory.getTestArguments()
testArguments.update(deepcopy(REQUEST))

configDocs = injectStepChainConfigMC(self.configDatabase)
for s in ['Step1', 'Step2', 'Step3']:
testArguments[s]['ConfigCacheID'] = configDocs[s]
testArguments['Step2']['KeepOutput'] = False

# GPU settings are provided at the step level only (Step1 and Step2)
gpuParams = {"GPUMemoryMB": 1234, "CUDARuntime": "11.2.3", "CUDACapabilities": ["7.5", "8.0"]}
testArguments['Step1'].update({"RequiresGPU": "optional", "GPUParams": json.dumps(gpuParams)})
testArguments['Step2'].update({"RequiresGPU": "required", "GPUParams": json.dumps(gpuParams)})
factory = StepChainWorkloadFactory()
testWorkload = factory.factoryWorkloadConstruction("TestWorkload", testArguments)

# validate requires GPU
self.assertIsNone(testArguments['RequiresGPU'])
self.assertEqual(testArguments["Step1"]['RequiresGPU'], "optional")
self.assertEqual(testArguments["Step2"]['RequiresGPU'], "required")
self.assertTrue("RequiresGPU" not in testArguments["Step3"])

# validate GPU parameters
self.assertEqual(testArguments['GPUParams'], json.dumps(None))
self.assertEqual(testArguments["Step1"]['GPUParams'], json.dumps(gpuParams))
self.assertEqual(testArguments["Step2"]['GPUParams'], json.dumps(gpuParams))
self.assertTrue("GPUParams" not in testArguments["Step3"])

# NOTE(review): assertItemsEqual presumably maps to unittest's assertCountEqual,
# which for two dictionaries compares their keys only (values are not checked).
# Consider assertEqual if values must match as well - TODO confirm
for taskName in testWorkload.listAllTaskNames():
taskObj = testWorkload.getTaskByName(taskName)
for stepName in taskObj.listAllStepNames():
stepHelper = taskObj.getStepHelper(stepName)
if taskObj.taskType() in ["Merge", "Harvesting", "Cleanup", "LogCollect"]:
if stepHelper.stepType() == "CMSSW":
self.assertEqual(stepHelper.data.application.gpu.gpuRequired, "forbidden")
self.assertIsNone(stepHelper.data.application.gpu.gpuRequirements)
else:
self.assertFalse(hasattr(stepHelper.data.application, "gpu"))
elif stepHelper.stepType() == "CMSSW" and taskName == "GENSIM":
if stepHelper.name() == "cmsRun1":
self.assertEqual(stepHelper.data.application.gpu.gpuRequired, testArguments["Step1"]['RequiresGPU'])
self.assertItemsEqual(stepHelper.data.application.gpu.gpuRequirements, gpuParams)
elif stepHelper.name() == "cmsRun2":
self.assertEqual(stepHelper.data.application.gpu.gpuRequired, testArguments["Step2"]['RequiresGPU'])
self.assertItemsEqual(stepHelper.data.application.gpu.gpuRequirements, gpuParams)
elif stepHelper.name() == "cmsRun3":
self.assertEqual(stepHelper.data.application.gpu.gpuRequired, "forbidden")
self.assertIsNone(stepHelper.data.application.gpu.gpuRequirements)
elif stepHelper.stepType() == "CMSSW":
raise RuntimeError("Should not reach this code")
else:
self.assertFalse(hasattr(stepHelper.data.application, "gpu"))

# same validation, but now through the getGPUSettings step helper method
prodTask = testWorkload.getTask('GENSIM')
gpuRequired, gpuRequirements = prodTask.getStepHelper('cmsRun1').getGPUSettings()
self.assertEqual(gpuRequired, testArguments["Step1"]['RequiresGPU'])
self.assertItemsEqual(gpuRequirements, gpuParams)

gpuRequired, gpuRequirements = prodTask.getStepHelper('cmsRun2').getGPUSettings()
self.assertEqual(gpuRequired, testArguments["Step2"]['RequiresGPU'])
self.assertItemsEqual(gpuRequirements, gpuParams)

# Step3 has no RequiresGPU argument, hence the "forbidden" fallback
gpuRequired, gpuRequirements = prodTask.getStepHelper('cmsRun3').getGPUSettings()
self.assertEqual(gpuRequired, testArguments["Step3"].get('RequiresGPU', "forbidden"))
self.assertIsNone(gpuRequirements)



# apply assignment parameters and verify that GPU settings remain the same
assignDict = {"SiteWhitelist": ["T2_US_Nebraska"], "Team": "The-A-Team",
"RequestStatus": "assigned"}
testWorkload.updateArguments(assignDict)

# validate requires GPU
self.assertIsNone(testArguments['RequiresGPU'])
self.assertEqual(testArguments["Step1"]['RequiresGPU'], "optional")
self.assertEqual(testArguments["Step2"]['RequiresGPU'], "required")
self.assertTrue("RequiresGPU" not in testArguments["Step3"])

# validate GPU parameters
self.assertEqual(testArguments['GPUParams'], json.dumps(None))
self.assertEqual(testArguments["Step1"]['GPUParams'], json.dumps(gpuParams))
self.assertEqual(testArguments["Step2"]['GPUParams'], json.dumps(gpuParams))
self.assertTrue("GPUParams" not in testArguments["Step3"])

prodTask = testWorkload.getTask('GENSIM')
gpuRequired, gpuRequirements = prodTask.getStepHelper('cmsRun1').getGPUSettings()
self.assertEqual(gpuRequired, testArguments["Step1"]['RequiresGPU'])
self.assertItemsEqual(gpuRequirements, gpuParams)

gpuRequired, gpuRequirements = prodTask.getStepHelper('cmsRun2').getGPUSettings()
self.assertEqual(gpuRequired, testArguments["Step2"]['RequiresGPU'])
self.assertItemsEqual(gpuRequirements, gpuParams)

gpuRequired, gpuRequirements = prodTask.getStepHelper('cmsRun3').getGPUSettings()
self.assertEqual(gpuRequired, testArguments["Step3"].get('RequiresGPU', "forbidden"))
self.assertIsNone(gpuRequirements)


# Run the unit tests of this module when it is executed as a script
if __name__ == '__main__':
unittest.main()

0 comments on commit 6bb9318

Please sign in to comment.