From 7fe60d99ae629000836e009e5607237904fb63ad Mon Sep 17 00:00:00 2001 From: Antonio Linares Date: Mon, 26 Aug 2024 22:20:55 -0400 Subject: [PATCH 1/4] Set specific memory for merge task if specified --- src/python/WMCore/WMSpec/WMTask.py | 15 +++++++++++---- src/python/WMCore/WMSpec/WMWorkload.py | 6 +++++- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/python/WMCore/WMSpec/WMTask.py b/src/python/WMCore/WMSpec/WMTask.py index cf262cf041..b39c673d3f 100644 --- a/src/python/WMCore/WMSpec/WMTask.py +++ b/src/python/WMCore/WMSpec/WMTask.py @@ -596,7 +596,7 @@ def jobSplittingParameters(self, performance=True): return splittingParams - def setJobResourceInformation(self, timePerEvent=None, sizePerEvent=None, memoryReq=None): + def setJobResourceInformation(self, timePerEvent=None, sizePerEvent=None, memoryReq=None, taskType=None): """ _setJobResourceInformation_ @@ -604,7 +604,11 @@ def setJobResourceInformation(self, timePerEvent=None, sizePerEvent=None, memory the three key values are main memory usage, time per processing unit (e.g. time per event) and disk usage per processing unit (e.g. size per event). """ - if self.taskType() in ["Merge", "Cleanup", "LogCollect"]: + + # If task type is specified, it may be that we dont want "Merge" tasks to be ignored, + # or that the task type is not one of the three to ignore + + if self.taskType() in ["Merge", "Cleanup", "LogCollect"] and taskType is None: # don't touch job requirements for these task types return @@ -1215,14 +1219,17 @@ def _setPerformanceMonitorConfig(self): self.monitoring.section_("PerformanceMonitor") return - def setMaxPSS(self, maxPSS): + def setMaxPSS(self, maxPSS, taskType=None): """ _setMaxPSS_ Set MaxPSS performance monitoring for this task. :param maxPSS: maximum Proportional Set Size (PSS) memory consumption in MiB """ - if self.taskType() in ["Merge", "Cleanup", "LogCollect"]: + # If task type is specified, it may be that we dont want "Merge" tasks to be ignored, + # or that the task type is not one of the three to ignore + + if self.taskType() in ["Merge", "Cleanup", "LogCollect"] and taskType is None: # keep the default settings (from StdBase) for these task types return diff --git a/src/python/WMCore/WMSpec/WMWorkload.py b/src/python/WMCore/WMSpec/WMWorkload.py index 7aee473d81..ec797321a2 100644 --- a/src/python/WMCore/WMSpec/WMWorkload.py +++ b/src/python/WMCore/WMSpec/WMWorkload.py @@ -893,9 +893,13 @@ def setMemory(self, memory, initialTask=None): for task in taskIterator: if isinstance(memory, dict): - mem = memory.get(task.name()) + if task.name() in memory: + mem = memory.get(task.name()) + else: + mem = memory.get("default") else: mem = memory + task.setJobResourceInformation(memoryReq=mem) self.setMemory(memory, task) From 797902dd656f4684561f64e522b98a12c68e26da Mon Sep 17 00:00:00 2001 From: Antonio Linares Date: Tue, 27 Aug 2024 16:46:04 -0400 Subject: [PATCH 2/4] revert change in setMemory() --- src/python/WMCore/WMSpec/WMWorkload.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/python/WMCore/WMSpec/WMWorkload.py b/src/python/WMCore/WMSpec/WMWorkload.py index ec797321a2..570549deff 100644 --- a/src/python/WMCore/WMSpec/WMWorkload.py +++ b/src/python/WMCore/WMSpec/WMWorkload.py @@ -893,13 +893,10 @@ def setMemory(self, memory, initialTask=None): for task in taskIterator: if isinstance(memory, dict): - if task.name() in memory: - mem = memory.get(task.name()) - else: - mem = memory.get("default") + mem = memory.get(task.name()) else: mem = memory - + task.setJobResourceInformation(memoryReq=mem) self.setMemory(memory, task) From 6d1a3274b400a6743acc33dbb123ddf067ebcfc2 Mon Sep 17 00:00:00 2001 From: Antonio Linares Date: Tue, 27 Aug 2024 16:49:21 -0400 Subject: [PATCH 3/4] revert WMWorkload.py --- src/python/WMCore/WMSpec/WMWorkload.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/python/WMCore/WMSpec/WMWorkload.py b/src/python/WMCore/WMSpec/WMWorkload.py index 570549deff..7aee473d81 100644 --- a/src/python/WMCore/WMSpec/WMWorkload.py +++ b/src/python/WMCore/WMSpec/WMWorkload.py @@ -896,7 +896,6 @@ def setMemory(self, memory, initialTask=None): mem = memory.get(task.name()) else: mem = memory - task.setJobResourceInformation(memoryReq=mem) self.setMemory(memory, task) From 59c5e3f2dc8eed6c3105a7be80bc7c986a816019 Mon Sep 17 00:00:00 2001 From: Antonio Linares Date: Tue, 27 Aug 2024 17:39:25 -0400 Subject: [PATCH 4/4] First draft support for setting memory by task --- src/python/WMCore/WMSpec/WMTask.py | 2 +- src/python/WMCore/WMSpec/WMWorkload.py | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/python/WMCore/WMSpec/WMTask.py b/src/python/WMCore/WMSpec/WMTask.py index b39c673d3f..29a2adb88f 100644 --- a/src/python/WMCore/WMSpec/WMTask.py +++ b/src/python/WMCore/WMSpec/WMTask.py @@ -625,7 +625,7 @@ def setJobResourceInformation(self, timePerEvent=None, sizePerEvent=None, memory if memoryReq or getattr(performanceParams, "memoryRequirement", None): performanceParams.memoryRequirement = memoryReq or getattr(performanceParams, "memoryRequirement") # if we change memory requirements, then we must change MaxPSS as well - self.setMaxPSS(performanceParams.memoryRequirement) + self.setMaxPSS(performanceParams.memoryRequirement, taskType) return diff --git a/src/python/WMCore/WMSpec/WMWorkload.py b/src/python/WMCore/WMSpec/WMWorkload.py index 7aee473d81..1af3ffc84e 100644 --- a/src/python/WMCore/WMSpec/WMWorkload.py +++ b/src/python/WMCore/WMSpec/WMWorkload.py @@ -896,7 +896,14 @@ def setMemory(self, memory, initialTask=None): mem = memory.get(task.name()) else: mem = memory - task.setJobResourceInformation(memoryReq=mem) + + # If a task is specified, the ResourceInformation should take into account that + # it may not want to be ignored, regardless of the task + if initialTask is not None: + task.setJobResourceInformation(memoryReq=mem, taskType=task.taskType()) # Or taskType=initialTask.taskType() ? + else: + task.setJobResourceInformation(memoryReq=mem) + self.setMemory(memory, task) return @@ -2002,6 +2009,10 @@ def updateArguments(self, kwargs): if kwargs.get("Memory") is not None: self.setMemory(kwargs.get("Memory")) + if kwargs.get("TaskMemory") is not None: + for taskName in kwargs.get("TaskMemory"): + task = self.getTaskByName(taskName) + self.setMemory(memory=kwargs.get("TaskMemory"), initialTask=task) if kwargs.get("Multicore") is not None: self.setCoresAndStreams(kwargs.get("Multicore"), kwargs.get("EventStreams"))