Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GPU support to the StepChain spec #11588

Merged
merged 2 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions src/python/WMCore/WMSpec/StdSpecs/StepChain.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
made available.
"""
from __future__ import division

import json

from future.utils import viewitems
from builtins import range

from Utils.Utilities import strToBool
import WMCore.WMSpec.Steps.StepFactory as StepFactory
from WMCore.Lexicon import primdataset, taskStepName
from WMCore.Lexicon import primdataset, taskStepName, gpuParameters
from WMCore.WMSpec.StdSpecs.StdBase import StdBase
from WMCore.WMSpec.WMWorkloadTools import (validateArgumentsCreate, parsePileupConfig,
checkMemCore, checkEventStreams, checkTimePerEvent)
Expand Down Expand Up @@ -321,9 +324,17 @@ def setupNextSteps(self, task, origArgs):
multicore = taskConf['Multicore']
if taskConf.get("EventStreams") is not None and taskConf['EventStreams'] >= 0:
eventStreams = taskConf['EventStreams']

currentCmsswStepHelper.setNumberOfCores(multicore, eventStreams)

# GPU settings
gpuRequired = self.requiresGPU
gpuParams = json.loads(taskConf.get('GPUParams', 'null'))
if taskConf.get('RequiresGPU', None):
gpuRequired = taskConf['RequiresGPU']
if "GPUParams" not in taskConf:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @amaltaro,
However, on this line you may still hit exactly the same set of exceptions, because the 'GPUParams' key may be present in taskConf but hold an empty string as its value. Maybe we rely on some earlier validation to rule out empty or misconfigured values for those parameters... I think the safer way would be to enclose all the gpuParams-related checks in a single if/else block. But anyway, up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be covered by this attribute definition:
https://github.com/dmwm/WMCore/blob/master/src/python/WMCore/WMSpec/StdSpecs/StdBase.py#L1228

which already does the data type validation as well.

gpuParams = json.loads(self.gPUParams)
currentCmsswStepHelper.setGPUSettings(gpuRequired, gpuParams)

# Pileup check
taskConf["PileupConfig"] = parsePileupConfig(taskConf["MCPileup"], taskConf["DataPileup"])
if taskConf["PileupConfig"]:
Expand Down Expand Up @@ -484,7 +495,11 @@ def getWorkloadCreateArgs():
"TimePerEvent": {"default": 12.0, "type": float, "validate": lambda x: x > 0},
"Memory": {"default": 2300.0, "type": float, "validate": lambda x: x > 0},
"Multicore": {"default": 1, "type": int, "validate": checkMemCore},
"EventStreams": {"type": int, "null": True, "default": 0, "validate": checkEventStreams}
"EventStreams": {"type": int, "null": True, "default": 0, "validate": checkEventStreams},
# no need for workload-level defaults, if task-level default is provided
"RequiresGPU": {"default": None, "null": True,
"validate": lambda x: x in ("forbidden", "optional", "required")},
"GPUParams": {"default": json.dumps(None), "validate": gpuParameters},
}
baseArgs.update(specArgs)
StdBase.setDefaultArgumentsProperty(baseArgs)
Expand Down Expand Up @@ -554,6 +569,11 @@ def validateSchema(self, schema):
msg += "You probably want to remove that step completely and try again."
self.raiseValidationException(msg=msg)

try:
StdBase.validateGPUSettings(schema)
except Exception as ex:
self.raiseValidationException(str(ex))

outputModTier = []
for i in range(1, numSteps + 1):
stepNumber = "Step%s" % i
Expand Down Expand Up @@ -603,6 +623,8 @@ def validateStep(self, taskConf, taskArgumentDefinition):
"""
try:
validateArgumentsCreate(taskConf, taskArgumentDefinition, checkInputDset=False)
# Validate GPU-related spec parameters
StdBase.validateGPUSettings(taskConf)
except WMSpecFactoryException:
# just re-raise it to keep the error message clear
raise
Expand Down
10 changes: 10 additions & 0 deletions src/python/WMCore/WMSpec/Steps/Templates/CMSSW.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,16 @@ def setGPUSettings(self, requiresGPU, gpuParams):
self.data.application.gpu.gpuRequired = requiresGPU
self.data.application.gpu.gpuRequirements = gpuParams

def getGPUSettings(self):
    """
    Fetch the GPU configuration stored for this CMSSW step

    :return: a 2-item tuple made of:
      * string whether GPU is required or not
        (value of application.gpu.gpuRequired)
      * dictionary with the GPU requirements, or None
        (value of application.gpu.gpuRequirements)
    """
    gpuSection = self.data.application.gpu
    requiredFlag = gpuSection.gpuRequired
    requirements = gpuSection.gpuRequirements
    return requiredFlag, requirements


class CMSSW(Template):
"""
Expand Down
159 changes: 159 additions & 0 deletions test/python/WMCore_t/WMSpec_t/StdSpecs_t/StepChain_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"""
from __future__ import print_function

import json

from future.utils import viewitems, listvalues

from builtins import range
Expand Down Expand Up @@ -2418,6 +2420,163 @@ def testRunlist(self):
with self.assertRaises(WMSpecFactoryException):
factory.factoryWorkloadConstruction("TestWorkload", arguments)

def testGPUStepChains(self):
    """
    Test GPU support in StepChains, top level settings only

    The request carries no explicit GPU arguments. The test asserts that:
      * the workload-level RequiresGPU/GPUParams arguments are None/"null"
        and that no step dictionary holds any GPU key;
      * every CMSSW step has GPU "forbidden" with no GPU requirements, and
        non-CMSSW steps have no "gpu" section at all;
      * the same holds after the workload gets assigned;
      * a request with RequiresGPU="required" and null GPUParams fails
        with WMSpecFactoryException.
    """
    stepKeys = ['Step1', 'Step2', 'Step3']

    testArguments = StepChainWorkloadFactory.getTestArguments()
    testArguments.update(deepcopy(REQUEST))

    configDocs = injectStepChainConfigMC(self.configDatabase)
    for s in stepKeys:
        testArguments[s]['ConfigCacheID'] = configDocs[s]
    testArguments['Step2']['KeepOutput'] = False

    factory = StepChainWorkloadFactory()
    testWorkload = factory.factoryWorkloadConstruction("TestWorkload", testArguments)

    def checkRequestArguments():
        """Top level GPU arguments are unset and steps have no GPU keys"""
        self.assertIsNone(testArguments['RequiresGPU'])
        self.assertEqual(testArguments['GPUParams'], json.dumps(None))
        for stepKey in stepKeys:
            self.assertNotIn("RequiresGPU", testArguments[stepKey])
            self.assertNotIn("GPUParams", testArguments[stepKey])

    def checkWorkloadSteps():
        """CMSSW steps forbid GPUs; other step types have no gpu section"""
        for taskName in testWorkload.listAllTaskNames():
            taskObj = testWorkload.getTaskByName(taskName)
            for stepName in taskObj.listAllStepNames():
                stepHelper = taskObj.getStepHelper(stepName)
                if stepHelper.stepType() == "CMSSW":
                    self.assertEqual(stepHelper.data.application.gpu.gpuRequired, "forbidden")
                    self.assertIsNone(stepHelper.data.application.gpu.gpuRequirements)
                else:
                    self.assertFalse(hasattr(stepHelper.data.application, "gpu"))

    # state right after the workload creation
    checkRequestArguments()
    checkWorkloadSteps()

    # assign the workload and make sure the GPU settings did not change
    assignDict = {"SiteWhitelist": ["T2_US_Nebraska"], "Team": "The-A-Team",
                  "RequestStatus": "assigned"}
    testWorkload.updateArguments(assignDict)

    checkRequestArguments()
    checkWorkloadSteps()

    # last but not least, test a failing case: GPU is required
    # but no GPU parameters are provided
    testArguments['RequiresGPU'] = "required"
    testArguments['GPUParams'] = json.dumps(None)
    with self.assertRaises(WMSpecFactoryException):
        factory.factoryWorkloadConstruction("PullingTheChain", testArguments)

def testGPUStepChainsTasks(self):
    """
    Test GPU support in StepChains, with task-level settings

    Step1 is configured with RequiresGPU="optional" and Step2 with
    RequiresGPU="required", both with the same GPUParams; Step3 carries no
    GPU arguments. The test asserts that:
      * the request arguments are left as provided (top level unset,
        Step1/Step2 with their own values, Step3 without GPU keys);
      * within the GENSIM task, cmsRun1 and cmsRun2 carry their step-level
        GPU settings while cmsRun3 is "forbidden" with no requirements;
      * CMSSW steps of Merge/Harvesting/Cleanup/LogCollect tasks are
        "forbidden", and non-CMSSW steps have no "gpu" section;
      * the request arguments and the values returned by getGPUSettings()
        are the same after the workload gets assigned.
    """
    testArguments = StepChainWorkloadFactory.getTestArguments()
    testArguments.update(deepcopy(REQUEST))

    configDocs = injectStepChainConfigMC(self.configDatabase)
    for s in ['Step1', 'Step2', 'Step3']:
        testArguments[s]['ConfigCacheID'] = configDocs[s]
    testArguments['Step2']['KeepOutput'] = False

    gpuParams = {"GPUMemoryMB": 1234, "CUDARuntime": "11.2.3", "CUDACapabilities": ["7.5", "8.0"]}
    testArguments['Step1'].update({"RequiresGPU": "optional", "GPUParams": json.dumps(gpuParams)})
    testArguments['Step2'].update({"RequiresGPU": "required", "GPUParams": json.dumps(gpuParams)})
    factory = StepChainWorkloadFactory()
    testWorkload = factory.factoryWorkloadConstruction("TestWorkload", testArguments)

    def checkRequestArguments():
        """Request arguments still hold exactly what was provided"""
        # validate requires GPU
        self.assertIsNone(testArguments['RequiresGPU'])
        self.assertEqual(testArguments["Step1"]['RequiresGPU'], "optional")
        self.assertEqual(testArguments["Step2"]['RequiresGPU'], "required")
        self.assertNotIn("RequiresGPU", testArguments["Step3"])

        # validate GPU parameters
        self.assertEqual(testArguments['GPUParams'], json.dumps(None))
        self.assertEqual(testArguments["Step1"]['GPUParams'], json.dumps(gpuParams))
        self.assertEqual(testArguments["Step2"]['GPUParams'], json.dumps(gpuParams))
        self.assertNotIn("GPUParams", testArguments["Step3"])

    def checkProductionSteps():
        """GPU settings of the GENSIM task, read through getGPUSettings()"""
        prodTask = testWorkload.getTask('GENSIM')
        for cmsRunName, stepKey in (('cmsRun1', "Step1"), ('cmsRun2', "Step2")):
            gpuRequired, gpuRequirements = prodTask.getStepHelper(cmsRunName).getGPUSettings()
            self.assertEqual(gpuRequired, testArguments[stepKey]['RequiresGPU'])
            # NOTE(review): if gpuRequirements is a plain dict, this looks like
            # it only compares the dictionary keys - confirm whether a
            # stricter assertEqual is wanted here
            self.assertItemsEqual(gpuRequirements, gpuParams)

        gpuRequired, gpuRequirements = prodTask.getStepHelper('cmsRun3').getGPUSettings()
        self.assertEqual(gpuRequired, testArguments["Step3"].get('RequiresGPU', "forbidden"))
        self.assertIsNone(gpuRequirements)

    checkRequestArguments()

    # now inspect the GPU section of every step in every task
    for taskName in testWorkload.listAllTaskNames():
        taskObj = testWorkload.getTaskByName(taskName)
        for stepName in taskObj.listAllStepNames():
            stepHelper = taskObj.getStepHelper(stepName)
            if taskObj.taskType() in ["Merge", "Harvesting", "Cleanup", "LogCollect"]:
                if stepHelper.stepType() == "CMSSW":
                    self.assertEqual(stepHelper.data.application.gpu.gpuRequired, "forbidden")
                    self.assertIsNone(stepHelper.data.application.gpu.gpuRequirements)
                else:
                    self.assertFalse(hasattr(stepHelper.data.application, "gpu"))
            elif stepHelper.stepType() == "CMSSW" and taskName == "GENSIM":
                if stepHelper.name() == "cmsRun1":
                    self.assertEqual(stepHelper.data.application.gpu.gpuRequired, testArguments["Step1"]['RequiresGPU'])
                    self.assertItemsEqual(stepHelper.data.application.gpu.gpuRequirements, gpuParams)
                elif stepHelper.name() == "cmsRun2":
                    self.assertEqual(stepHelper.data.application.gpu.gpuRequired, testArguments["Step2"]['RequiresGPU'])
                    self.assertItemsEqual(stepHelper.data.application.gpu.gpuRequirements, gpuParams)
                elif stepHelper.name() == "cmsRun3":
                    self.assertEqual(stepHelper.data.application.gpu.gpuRequired, "forbidden")
                    self.assertIsNone(stepHelper.data.application.gpu.gpuRequirements)
            elif stepHelper.stepType() == "CMSSW":
                # any other CMSSW step is unexpected for this workload
                raise RuntimeError("Should not reach this code")
            else:
                self.assertFalse(hasattr(stepHelper.data.application, "gpu"))

    checkProductionSteps()

    # assign the workload and make sure the GPU settings did not change
    assignDict = {"SiteWhitelist": ["T2_US_Nebraska"], "Team": "The-A-Team",
                  "RequestStatus": "assigned"}
    testWorkload.updateArguments(assignDict)

    checkRequestArguments()
    checkProductionSteps()


# Run the unit tests of this module when it is executed directly as a script
if __name__ == '__main__':
unittest.main()