diff --git a/bin/meshroom_batch b/bin/meshroom_batch index 3adcddfe00..5e1414955a 100755 --- a/bin/meshroom_batch +++ b/bin/meshroom_batch @@ -8,8 +8,8 @@ import meshroom meshroom.setupEnvironment() import meshroom.core.graph -import meshroom.core.taskManager from meshroom import multiview +import logging parser = argparse.ArgumentParser(description='Launch the full photogrammetry or Panorama HDR pipeline.') parser.add_argument('-i', '--input', metavar='SFM/FOLDERS/IMAGES', type=str, nargs='*', @@ -67,9 +67,24 @@ parser.add_argument('--submitter', default='SimpleFarm', help='Execute job with a specific submitter.') +parser.add_argument('-v', '--verbose', help="Verbosity level", default='', + choices=['', 'fatal', 'error', 'warning', 'info', 'debug', 'trace'],) + args = parser.parse_args() +logStringToPython = { + 'fatal': logging.FATAL, + 'error': logging.ERROR, + 'warning': logging.WARNING, + 'info': logging.INFO, + 'debug': logging.DEBUG, + 'trace': logging.DEBUG, +} +if args.verbose: + logging.getLogger().setLevel(logStringToPython[args.verbose]) + + def getOnlyNodeOfType(g, nodeType): """ Helper function to get a node of 'nodeType' in the graph 'g' and raise if no or multiple candidates. """ nodes = g.nodesOfType(nodeType) @@ -138,6 +153,9 @@ with multiview.GraphModification(graph): if not graph.canComputeLeaves: raise RuntimeError("Graph cannot be computed. Check for compatibility issues.") + if args.verbose: + graph.setVerbose(args.verbose) + if args.output: publish = getOnlyNodeOfType(graph, 'Publish') publish.output.value = args.output @@ -201,9 +219,8 @@ if args.submit: if not args.save: raise ValueError('Need to save the project to file to submit on renderfarm.') # submit on renderfarm - taskManager = meshroom.core.taskManager.TaskManager() - taskManager.submitFromFile(args.save, args.submitter, toNode=toNodes) + meshroom.core.graph.submit(args.save, args.submitter, toNode=toNodes) elif args.compute: # start computation - taskManager = meshroom.core.taskManager.TaskManager() - taskManager.compute(graph, toNodes=toNodes, forceCompute=args.forceCompute, forceStatus=args.forceStatus) + meshroom.core.graph.executeGraph(graph, toNodes=toNodes, forceCompute=args.forceCompute, forceStatus=args.forceStatus) + diff --git a/bin/meshroom_compute b/bin/meshroom_compute index 42c22b479a..7060c7dcf4 100755 --- a/bin/meshroom_compute +++ b/bin/meshroom_compute @@ -6,7 +6,6 @@ import meshroom meshroom.setupEnvironment() import meshroom.core.graph -import meshroom.core.taskManager from meshroom.core.node import Status @@ -67,6 +66,4 @@ else: if args.toNode: toNodes = graph.findNodes([args.toNode]) - taskManager = meshroom.core.taskManager.TaskManager() - taskManager.compute(graph, toNodes=toNodes, forceCompute=args.forceCompute, forceStatus=args.forceStatus) - + meshroom.core.graph.executeGraph(graph, toNodes=toNodes, forceCompute=args.forceCompute, forceStatus=args.forceStatus) diff --git a/bin/meshroom_submit b/bin/meshroom_submit index 8ee835245e..9606834546 100755 --- a/bin/meshroom_submit +++ b/bin/meshroom_submit @@ -5,7 +5,6 @@ import meshroom meshroom.setupEnvironment() import meshroom.core.graph -import meshroom.core.taskManager parser = argparse.ArgumentParser(description='Submit a Graph of processes on renderfarm.') parser.add_argument('meshroomFile', metavar='MESHROOMFILE.mg', type=str, @@ -19,5 +18,5 @@ parser.add_argument('--submitter', help='Execute job with a specific submitter.') args = parser.parse_args() -taskManager = meshroom.core.taskManager.TaskManager() -taskManager.submitFromFile(args.meshroomFile, args.submitter, toNode=args.toNode) + +meshroom.core.graph.submit(args.save, args.submitter, toNode=toNodes) diff --git a/meshroom/core/graph.py b/meshroom/core/graph.py index 0dee724555..9baf1a5960 100644 --- a/meshroom/core/graph.py +++ b/meshroom/core/graph.py @@ -1185,6 +1185,15 @@ def cacheDir(self, value): self.updateStatusFromCache(force=True) self.cacheDirChanged.emit() + def setVerbose(self, v): + with GraphModification(self): + for node in self._nodes: + if node.hasAttribute('verbose'): + try: + node.verbose.value = v + except: + pass + nodes = Property(BaseObject, nodes.fget, constant=True) edges = Property(BaseObject, edges.fget, constant=True) filepathChanged = Signal() @@ -1204,3 +1213,100 @@ def loadGraph(filepath): graph.load(filepath) graph.update() return graph + + +def getAlreadySubmittedChunks(nodes): + out = [] + for node in nodes: + for chunk in node.chunks: + if chunk.isAlreadySubmitted(): + out.append(chunk) + return out + + +def executeGraph(graph, toNodes=None, forceCompute=False, forceStatus=False): + """ + """ + if forceCompute: + nodes, edges = graph.dfsOnFinish(startNodes=toNodes) + else: + nodes, edges = graph.dfsToProcess(startNodes=toNodes) + chunksInConflict = getAlreadySubmittedChunks(nodes) + + if chunksInConflict: + chunksStatus = set([chunk.status.status.name for chunk in chunksInConflict]) + chunksName = [node.name for node in chunksInConflict] + msg = 'WARNING: Some nodes are already submitted with status: {}\nNodes: {}'.format( + ', '.join(chunksStatus), + ', '.join(chunksName) + ) + if forceStatus: + print(msg) + else: + raise RuntimeError(msg) + + print('Nodes to execute: ', str([n.name for n in nodes])) + + for node in nodes: + node.beginSequence(forceCompute) + + for n, node in enumerate(nodes): + try: + multiChunks = len(node.chunks) > 1 + for c, chunk in enumerate(node.chunks): + if multiChunks: + print('\n[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format( + node=n+1, nbNodes=len(nodes), + chunk=c+1, nbChunks=len(node.chunks), nodeName=node.nodeType)) + else: + print('\n[{node}/{nbNodes}] {nodeName}'.format( + node=n + 1, nbNodes=len(nodes), nodeName=node.nodeType)) + chunk.process(forceCompute) + except Exception as e: + logging.error("Error on node computation: {}".format(e)) + graph.clearSubmittedNodes() + raise + + for node in nodes: + node.endSequence() + + +def submitGraph(graph, submitter, toNodes=None): + nodesToProcess, edgesToProcess = graph.dfsToProcess(startNodes=toNodes) + flowEdges = graph.flowEdges(startNodes=toNodes) + edgesToProcess = set(edgesToProcess).intersection(flowEdges) + + if not nodesToProcess: + logging.warning('Nothing to compute') + return + + logging.info("Nodes to process: {}".format(edgesToProcess)) + logging.info("Edges to process: {}".format(edgesToProcess)) + + sub = None + if submitter: + sub = meshroom.core.submitters.get(submitter, None) + elif len(meshroom.core.submitters) == 1: + # if only one submitter available use it + sub = meshroom.core.submitters.values()[0] + if sub is None: + raise RuntimeError("Unknown Submitter: '{submitter}'. Available submitters are: '{allSubmitters}'.".format( + submitter=submitter, allSubmitters=str(meshroom.core.submitters.keys()))) + + try: + res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath) + if res: + for node in nodesToProcess: + node.submit() # update node status + except Exception as e: + logging.error("Error on submit : {}".format(e)) + + +def submit(graphFile, submitter, toNode=None): + """ + Submit the given graph via the given submitter. + """ + graph = loadGraph(graphFile) + toNodes = graph.findNodes([toNode]) if toNode else None + submitGraph(graph, submitter, toNodes) +