Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bin] Avoid multi-threading in non-interactive computation #1553

Merged
merged 2 commits into from
Oct 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions bin/meshroom_batch
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import meshroom
meshroom.setupEnvironment()

import meshroom.core.graph
import meshroom.core.taskManager
from meshroom import multiview
import logging

parser = argparse.ArgumentParser(description='Launch the full photogrammetry or Panorama HDR pipeline.')
parser.add_argument('-i', '--input', metavar='SFM/FOLDERS/IMAGES', type=str, nargs='*',
Expand Down Expand Up @@ -67,9 +67,24 @@ parser.add_argument('--submitter',
default='SimpleFarm',
help='Execute job with a specific submitter.')

parser.add_argument('-v', '--verbose', help="Verbosity level", default='',
choices=['', 'fatal', 'error', 'warning', 'info', 'debug', 'trace'],)

args = parser.parse_args()


logStringToPython = {
'fatal': logging.FATAL,
'error': logging.ERROR,
'warning': logging.WARNING,
'info': logging.INFO,
'debug': logging.DEBUG,
'trace': logging.DEBUG,
}
if args.verbose:
logging.getLogger().setLevel(logStringToPython[args.verbose])


def getOnlyNodeOfType(g, nodeType):
""" Helper function to get a node of 'nodeType' in the graph 'g' and raise if no or multiple candidates. """
nodes = g.nodesOfType(nodeType)
Expand Down Expand Up @@ -138,6 +153,9 @@ with multiview.GraphModification(graph):
if not graph.canComputeLeaves:
raise RuntimeError("Graph cannot be computed. Check for compatibility issues.")

if args.verbose:
graph.setVerbose(args.verbose)

if args.output:
publish = getOnlyNodeOfType(graph, 'Publish')
publish.output.value = args.output
Expand Down Expand Up @@ -201,9 +219,8 @@ if args.submit:
if not args.save:
raise ValueError('Need to save the project to file to submit on renderfarm.')
# submit on renderfarm
taskManager = meshroom.core.taskManager.TaskManager()
taskManager.submitFromFile(args.save, args.submitter, toNode=toNodes)
meshroom.core.graph.submit(args.save, args.submitter, toNode=toNodes)
elif args.compute:
# start computation
taskManager = meshroom.core.taskManager.TaskManager()
taskManager.compute(graph, toNodes=toNodes, forceCompute=args.forceCompute, forceStatus=args.forceStatus)
meshroom.core.graph.executeGraph(graph, toNodes=toNodes, forceCompute=args.forceCompute, forceStatus=args.forceStatus)

5 changes: 1 addition & 4 deletions bin/meshroom_compute
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import meshroom
meshroom.setupEnvironment()

import meshroom.core.graph
import meshroom.core.taskManager
from meshroom.core.node import Status


Expand Down Expand Up @@ -67,6 +66,4 @@ else:
if args.toNode:
toNodes = graph.findNodes([args.toNode])

taskManager = meshroom.core.taskManager.TaskManager()
taskManager.compute(graph, toNodes=toNodes, forceCompute=args.forceCompute, forceStatus=args.forceStatus)

meshroom.core.graph.executeGraph(graph, toNodes=toNodes, forceCompute=args.forceCompute, forceStatus=args.forceStatus)
5 changes: 2 additions & 3 deletions bin/meshroom_submit
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import meshroom
meshroom.setupEnvironment()

import meshroom.core.graph
import meshroom.core.taskManager

parser = argparse.ArgumentParser(description='Submit a Graph of processes on renderfarm.')
parser.add_argument('meshroomFile', metavar='MESHROOMFILE.mg', type=str,
Expand All @@ -19,5 +18,5 @@ parser.add_argument('--submitter',
help='Execute job with a specific submitter.')

args = parser.parse_args()
taskManager = meshroom.core.taskManager.TaskManager()
taskManager.submitFromFile(args.meshroomFile, args.submitter, toNode=args.toNode)

meshroom.core.graph.submit(args.save, args.submitter, toNode=toNodes)
106 changes: 106 additions & 0 deletions meshroom/core/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,15 @@ def cacheDir(self, value):
self.updateStatusFromCache(force=True)
self.cacheDirChanged.emit()

def setVerbose(self, v):
with GraphModification(self):
for node in self._nodes:
if node.hasAttribute('verbose'):
try:
node.verbose.value = v
except:
pass

nodes = Property(BaseObject, nodes.fget, constant=True)
edges = Property(BaseObject, edges.fget, constant=True)
filepathChanged = Signal()
Expand All @@ -1204,3 +1213,100 @@ def loadGraph(filepath):
graph.load(filepath)
graph.update()
return graph


def getAlreadySubmittedChunks(nodes):
out = []
for node in nodes:
for chunk in node.chunks:
if chunk.isAlreadySubmitted():
out.append(chunk)
return out


def executeGraph(graph, toNodes=None, forceCompute=False, forceStatus=False):
"""
"""
if forceCompute:
nodes, edges = graph.dfsOnFinish(startNodes=toNodes)
else:
nodes, edges = graph.dfsToProcess(startNodes=toNodes)
chunksInConflict = getAlreadySubmittedChunks(nodes)

if chunksInConflict:
chunksStatus = set([chunk.status.status.name for chunk in chunksInConflict])
chunksName = [node.name for node in chunksInConflict]
msg = 'WARNING: Some nodes are already submitted with status: {}\nNodes: {}'.format(
', '.join(chunksStatus),
', '.join(chunksName)
)
if forceStatus:
print(msg)
else:
raise RuntimeError(msg)

print('Nodes to execute: ', str([n.name for n in nodes]))

for node in nodes:
node.beginSequence(forceCompute)

for n, node in enumerate(nodes):
try:
multiChunks = len(node.chunks) > 1
for c, chunk in enumerate(node.chunks):
if multiChunks:
print('\n[{node}/{nbNodes}]({chunk}/{nbChunks}) {nodeName}'.format(
node=n+1, nbNodes=len(nodes),
chunk=c+1, nbChunks=len(node.chunks), nodeName=node.nodeType))
else:
print('\n[{node}/{nbNodes}] {nodeName}'.format(
node=n + 1, nbNodes=len(nodes), nodeName=node.nodeType))
chunk.process(forceCompute)
except Exception as e:
logging.error("Error on node computation: {}".format(e))
graph.clearSubmittedNodes()
raise

for node in nodes:
node.endSequence()


def submitGraph(graph, submitter, toNodes=None):
nodesToProcess, edgesToProcess = graph.dfsToProcess(startNodes=toNodes)
flowEdges = graph.flowEdges(startNodes=toNodes)
edgesToProcess = set(edgesToProcess).intersection(flowEdges)

if not nodesToProcess:
logging.warning('Nothing to compute')
return

logging.info("Nodes to process: {}".format(edgesToProcess))
logging.info("Edges to process: {}".format(edgesToProcess))

sub = None
if submitter:
sub = meshroom.core.submitters.get(submitter, None)
elif len(meshroom.core.submitters) == 1:
# if only one submitter available use it
sub = meshroom.core.submitters.values()[0]
if sub is None:
raise RuntimeError("Unknown Submitter: '{submitter}'. Available submitters are: '{allSubmitters}'.".format(
submitter=submitter, allSubmitters=str(meshroom.core.submitters.keys())))

try:
res = sub.submit(nodesToProcess, edgesToProcess, graph.filepath)
if res:
for node in nodesToProcess:
node.submit() # update node status
except Exception as e:
logging.error("Error on submit : {}".format(e))


def submit(graphFile, submitter, toNode=None):
"""
Submit the given graph via the given submitter.
"""
graph = loadGraph(graphFile)
toNodes = graph.findNodes([toNode]) if toNode else None
submitGraph(graph, submitter, toNodes)