From 2bcabe1e3a46a8432e933a386a43b6a0d0640ecd Mon Sep 17 00:00:00 2001 From: fabien servant Date: Wed, 14 Dec 2022 09:18:03 +0100 Subject: [PATCH 1/5] [submitters] add Ripple --- meshroom/submitters/rippleSubmitter.py | 139 +++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 meshroom/submitters/rippleSubmitter.py diff --git a/meshroom/submitters/rippleSubmitter.py b/meshroom/submitters/rippleSubmitter.py new file mode 100644 index 0000000000..071d8cf610 --- /dev/null +++ b/meshroom/submitters/rippleSubmitter.py @@ -0,0 +1,139 @@ +import os +import json + +#meshroom modules +from meshroom.core.desc import Level +from meshroom.core.submitter import BaseSubmitter + +#mpc logging import +import mpc.logging + +#Ripple imports +from mpc.ripple.rippleConfig import RippleConfig as _RippleConfig +from mpc.ripple.rippleProcess import RippleProcess +from mpc.ripple.dispatcher import DefaultDispatcher +from mpc.ripple.rippleStorage import RippleStorage +from mpc.ripple.rippleUtilities import RippleGroup +from mpc.ripple.rippleAttribute import RippleAttribute + +#validators for numbers +from mpc.pyCore.validators import IntValidator + +_log = mpc.logging.getLogger() + +currentDir = os.path.dirname(os.path.realpath(__file__)) +binDir = os.path.dirname(os.path.dirname(os.path.dirname(currentDir))) + +# Give access to min/maxProcessors, which is an alias to slots +class RippleProcessWithSlots(RippleProcess): + minProcessors = RippleAttribute('', IntValidator(), 1, True) + maxProcessors = RippleAttribute('', IntValidator(), 1, True) + +class RippleSubmitter(BaseSubmitter): + def __init__(self, parent=None): + super(RippleSubmitter, self).__init__(name='Ripple', parent=parent) + + def createTask(self, meshroomFile, node, parents): + + nbBlocks = 1 + + #Map meshroom GPU modes to MPC services + gpudict = { + "NONE":"", + "NORMAL":",gpu8G", + "INTENSIVE":",gpu16G" + } + + #Specify some constraints + requirements = "!\"rs*\",@.mem>25{gpu}".format(gpu=gpudict[node.nodeDesc.gpu.name]) + + #specify which node to wait before launching the current one + waitsFor = [] + for parent in parents: + waitsFor.append(parent.name) + + #Basic command line for this node + command='meshroom_compute --node {nodeName} "{meshroomFile}" --extern'.format(nodeName=node.name, meshroomFile=meshroomFile) + + if node.isParallelized: + _, _, nbBlocks = node.nodeDesc.parallelization.getSizes(node) + + #Create as many process as iteration (or chunks) + rippleprocs = [] + for iteration in range(0, nbBlocks): + + #Add iteration number + commandext = '{cmd} --iteration {iter}'.format(cmd=command, iter=iteration) + + #Create process task with parameters + rippleproc = RippleProcessWithSlots(name='{name} iteration {iter}'.format(name=node.name, iter=iteration), discipline='ripple', appendKeys=True, keys=requirements, label=node.name, cmdList=[commandext], waitsFor=waitsFor, minProcessors=1, maxProcessors=1) + rippleprocs.append(rippleproc) + + rippleObj = RippleGroup(label="{name} Group".format(name=node.name), tasks=rippleprocs, name=node.name, waitsFor=waitsFor) + else: + rippleObj = RippleProcessWithSlots(name=node.name, discipline='ripple', appendKeys=True, keys=requirements, label=node.name, cmdList=[command], waitsFor=waitsFor, minProcessors=1, maxProcessors=1) + + return rippleObj + + def submit(self, nodes, edges, filepath, submitLabel): + + projectName = os.path.splitext(os.path.basename(filepath))[0] + label = submitLabel.format(projectName=projectName) + + #Build a tree + tree = {} + for node in nodes: + tree[node] = [] + + for end, start in edges: + tree[end].append(start) + + nodesDone = [] + hasChange = True + tasks = [] + + #As long as a valid node was found in the previous iteration + while hasChange: + + hasChange = False + toRemove = [] + + #Loop over all nodes in the graph + for node in tree.keys(): + + #Ignore a node already processed + found = False + for nodeDone in nodesDone: + if nodeDone == node.name: + found = True + + if found: + continue + + #Check if all parents are already visited + valid = True + for parent in tree[node]: + found = False + for nodeDone in nodesDone: + if nodeDone == parent.name: + found = True + if found is False: + valid = False + + if valid is False: + continue + + tasks.append(self.createTask(filepath, node, tree[node])) + + toRemove.append(node.name) + hasChange = True + + for itemRemove in toRemove: + nodesDone.append(itemRemove) + + if (len(tasks) == 0): + return True + + DefaultDispatcher(label=label, tasks=tasks, jobType='release', paused=False)() + + return True \ No newline at end of file From 5cd7b49a6402480d60e3351bcbc0fe2000a50936 Mon Sep 17 00:00:00 2001 From: fabien servant Date: Fri, 23 Dec 2022 07:38:30 +0100 Subject: [PATCH 2/5] ripple submitter python cosmetics --- meshroom/submitters/rippleSubmitter.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/meshroom/submitters/rippleSubmitter.py b/meshroom/submitters/rippleSubmitter.py index 071d8cf610..22d3fbe735 100644 --- a/meshroom/submitters/rippleSubmitter.py +++ b/meshroom/submitters/rippleSubmitter.py @@ -88,7 +88,7 @@ def submit(self, nodes, edges, filepath, submitLabel): for end, start in edges: tree[end].append(start) - nodesDone = [] + nodesDone = set() hasChange = True tasks = [] @@ -103,9 +103,8 @@ def submit(self, nodes, edges, filepath, submitLabel): #Ignore a node already processed found = False - for nodeDone in nodesDone: - if nodeDone == node.name: - found = True + if node.name in nodesDone: + found = True if found: continue @@ -114,9 +113,8 @@ def submit(self, nodes, edges, filepath, submitLabel): valid = True for parent in tree[node]: found = False - for nodeDone in nodesDone: - if nodeDone == parent.name: - found = True + if parent.name in nodesDone: + found = True if found is False: valid = False From 05e8b87200b33dd6a86dc87c2582ea644644e1a5 Mon Sep 17 00:00:00 2001 From: fabien servant Date: Fri, 23 Dec 2022 07:41:51 +0100 Subject: [PATCH 3/5] ripple submitter slots count selection heuristic --- meshroom/submitters/rippleSubmitter.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/meshroom/submitters/rippleSubmitter.py b/meshroom/submitters/rippleSubmitter.py index 22d3fbe735..7ba135555b 100644 --- a/meshroom/submitters/rippleSubmitter.py +++ b/meshroom/submitters/rippleSubmitter.py @@ -43,6 +43,18 @@ def createTask(self, meshroomFile, node, parents): "NORMAL":",gpu8G", "INTENSIVE":",gpu16G" } + + #decide if we need multiple slots + minProcessors = 1 + maxProcessors = 1 + if Level.INTENSIVE in (node.nodeDesc.ram, node.nodeDesc.cpu): + #at least 2 slots + minProcessors = 2 + #if more than 2 are available without waiting, use 3 or 4 + maxProcessors = 4 + elif Level.NORMAL in (node.nodeDesc.ram, node.nodeDesc.cpu): + #if 2 are available, otherwise 1 + maxProcessors = 2 #Specify some constraints requirements = "!\"rs*\",@.mem>25{gpu}".format(gpu=gpudict[node.nodeDesc.gpu.name]) @@ -66,12 +78,12 @@ def createTask(self, meshroomFile, node, parents): commandext = '{cmd} --iteration {iter}'.format(cmd=command, iter=iteration) #Create process task with parameters - rippleproc = RippleProcessWithSlots(name='{name} iteration {iter}'.format(name=node.name, iter=iteration), discipline='ripple', appendKeys=True, keys=requirements, label=node.name, cmdList=[commandext], waitsFor=waitsFor, minProcessors=1, maxProcessors=1) + rippleproc = RippleProcessWithSlots(name='{name} iteration {iter}'.format(name=node.name, iter=iteration), discipline='ripple', appendKeys=True, keys=requirements, label=node.name, cmdList=[commandext], waitsFor=waitsFor, minProcessors=minProcessors, maxProcessors=maxProcessors) rippleprocs.append(rippleproc) rippleObj = RippleGroup(label="{name} Group".format(name=node.name), tasks=rippleprocs, name=node.name, waitsFor=waitsFor) else: - rippleObj = RippleProcessWithSlots(name=node.name, discipline='ripple', appendKeys=True, keys=requirements, label=node.name, cmdList=[command], waitsFor=waitsFor, minProcessors=1, maxProcessors=1) + rippleObj = RippleProcessWithSlots(name=node.name, discipline='ripple', appendKeys=True, keys=requirements, label=node.name, cmdList=[command], waitsFor=waitsFor, minProcessors=minProcessors, maxProcessors=maxProcessors) return rippleObj From d04952c2761124bd87fbd3e67ff4b7fab296ccb1 Mon Sep 17 00:00:00 2001 From: "Fabien Servant @ TCS" <100348063+servantftechnicolor@users.noreply.github.com> Date: Fri, 23 Dec 2022 18:12:10 +0100 Subject: [PATCH 4/5] Python error fix --- meshroom/submitters/rippleSubmitter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/meshroom/submitters/rippleSubmitter.py b/meshroom/submitters/rippleSubmitter.py index 7ba135555b..e1db828726 100644 --- a/meshroom/submitters/rippleSubmitter.py +++ b/meshroom/submitters/rippleSubmitter.py @@ -139,11 +139,11 @@ def submit(self, nodes, edges, filepath, submitLabel): hasChange = True for itemRemove in toRemove: - nodesDone.append(itemRemove) + nodesDone.add(itemRemove) if (len(tasks) == 0): return True DefaultDispatcher(label=label, tasks=tasks, jobType='release', paused=False)() - return True \ No newline at end of file + return True From e355a34d67a75d82a631b31aa73d46abe19f0949 Mon Sep 17 00:00:00 2001 From: "Fabien Servant @ TCS" <100348063+servantftechnicolor@users.noreply.github.com> Date: Fri, 23 Dec 2022 18:20:28 +0100 Subject: [PATCH 5/5] add default parameter value for ripple submit --- meshroom/submitters/rippleSubmitter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/meshroom/submitters/rippleSubmitter.py b/meshroom/submitters/rippleSubmitter.py index e1db828726..d524ce7d55 100644 --- a/meshroom/submitters/rippleSubmitter.py +++ b/meshroom/submitters/rippleSubmitter.py @@ -87,7 +87,7 @@ def createTask(self, meshroomFile, node, parents): return rippleObj - def submit(self, nodes, edges, filepath, submitLabel): + def submit(self, nodes, edges, filepath, submitLabel="{projectName}"): projectName = os.path.splitext(os.path.basename(filepath))[0] label = submitLabel.format(projectName=projectName)