Skip to content

Commit

Permalink
LocalDispatcher uses subproccesses to dispatch tasks via the execute …
Browse files Browse the repository at this point in the history
…app.

It saves the current script to a temporary location determined by its jobDirectory, and outputs progress as IECore messages. It passes the context entries for each task to the execute app via command line arguments.

Fixes #866.
  • Loading branch information
andrewkaufman committed Jun 25, 2014
1 parent 3f2bab2 commit 6e59343
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 50 deletions.
86 changes: 70 additions & 16 deletions python/Gaffer/LocalDispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
#
##########################################################################

import os
import errno
import subprocess

import Gaffer
import IECore

Expand All @@ -42,29 +46,79 @@ class LocalDispatcher( Gaffer.Dispatcher ) :
def __init__( self, name = "LocalDispatcher" ) :

Gaffer.Dispatcher.__init__( self, name )


def jobDirectory( self, context ) :

jobDirectory = Gaffer.Dispatcher.jobDirectory( self, context )
result = os.path.join( jobDirectory, "%06d" % self.__nextJobId( jobDirectory ) )

while True :
try :
os.makedirs( result )
break
except OSError, e :
if e.errno == errno.EEXIST :
result = os.path.join( jobDirectory, "%06d" % self.__nextJobId( jobDirectory ) )
continue
else :
raise e

return result

def _doDispatch( self, nodes ) :

if not nodes :
return


script = nodes[0].scriptNode()
if script is None :
c = Gaffer.Context()
else :
c = script.context()

taskList = map( lambda n: Gaffer.ExecutableNode.Task(n,c), nodes )

IECore.msg( IECore.MessageHandler.Level.Error, self.getName(), "Can only dispatch nodes which are part of a script." )
return

context = Gaffer.Context() if script is None else script.context()

with context :
scriptFileName = script["fileName"].getValue()
jobName = context.substitute( self["jobName"].getValue() )
jobDirectory = self.jobDirectory( context )

messageContext = "%s : Job %s %s" % ( self.getName(), jobName, os.path.basename( jobDirectory ) )
tmpScript = os.path.join( jobDirectory, os.path.basename( scriptFileName ) if scriptFileName else "untitled.gfr" )
script.serialiseToFile( tmpScript )

taskList = map( lambda node: Gaffer.ExecutableNode.Task( node, context ), nodes )
allTasksAndRequirements = Gaffer.Dispatcher._uniqueTasks( taskList )

for (task,requirements) in allTasksAndRequirements :

task.node.execute( [ task.context ] )


for ( task, requirements ) in allTasksAndRequirements :

frames = str(int(task.context.getFrame()))

cmd = [
"gaffer", "execute",
"-script", tmpScript,
"-nodes", task.node.relativeName( script ),
"-frames", frames,
"-context",
]

for entry in context.keys() :
if entry != "frame" :
cmd.extend( [ "-" + entry, repr(context[entry]) ] )

IECore.msg( IECore.MessageHandler.Level.Info, messageContext, " ".join( cmd ) )
result = subprocess.call( cmd )
if result :
IECore.msg( IECore.MessageHandler.Level.Error, messageContext, "Failed to execute " + task.node.getName() + " on frames " + frames )
return

IECore.msg( IECore.MessageHandler.Level.Info, messageContext, "Completed all tasks." )

def _doSetupPlugs( self, parentPlug ) :

pass

def __nextJobId( self, directory ) :

previousJobs = IECore.ls( directory, minSequenceSize = 1 )
nextJob = max( previousJobs[0].frameList.asList() ) + 1 if previousJobs else 0
return nextJob

IECore.registerRunTimeTyped( LocalDispatcher, typeName = "Gaffer::LocalDispatcher" )

Expand Down
13 changes: 1 addition & 12 deletions python/GafferTest/DispatcherTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,6 @@ def testDerivedClass( self ) :

self.assertEqual( op1.counter, 1 )

def testLocalDispatcher( self ) :

log = list()
op1 = TestOp("1", log)
n1 = Gaffer.ExecutableOpHolder()
n1.setParameterised( op1 )

Gaffer.Dispatcher.dispatcher('local').dispatch( [ n1 ] )

self.assertEqual( op1.counter, 1 )

def testDispatcherRegistration( self ) :

self.failUnless( "testDispatcher" in Gaffer.Dispatcher.dispatcherNames() )
Expand Down Expand Up @@ -139,7 +128,7 @@ def __slot( self, d, nodes ) :
op1 = TestOp("1", log)
n1 = Gaffer.ExecutableOpHolder()
n1.setParameterised( op1 )
dispatcher = Gaffer.Dispatcher.dispatcher('local')
dispatcher = DispatcherTest.MyDispatcher()
dispatcher.dispatch( [ n1 ] )

self.assertEqual( len( preCs ), 1 )
Expand Down
23 changes: 1 addition & 22 deletions python/GafferTest/ExecutableNodeTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,28 +125,7 @@ def testExecutionRequirements( self ) :
( Gaffer.ExecutableNode.Task( n2, c1 ), [ Gaffer.ExecutableNode.Task( n, c1 ) ] )

self.assertEqual( Gaffer.Dispatcher._uniqueTasks( [ Gaffer.ExecutableNode.Task( n2, c1 ) ] ), [ ( Gaffer.ExecutableNode.Task( n, c1 ), [] ), ( Gaffer.ExecutableNode.Task( n2, c1 ), [ Gaffer.ExecutableNode.Task( n, c1 ) ] ) ] )

def testExecute( self ):

n = ExecutableNodeTest.MyNode(False)

n2 = ExecutableNodeTest.MyNode(False)

# make n3 requiring n
r1 = Gaffer.Plug( name = "r1" )
n2['requirements'].addChild( r1 )
r1.setInput( n['requirement'] )

dispatcher = Gaffer.Dispatcher.dispatcher("local")

self.assertEqual( n2.executionCount, 0 )
self.assertEqual( n.executionCount, 0 )

dispatcher.dispatch( [ n2 ] )

self.assertEqual( n2.executionCount, 1 )
self.assertEqual( n.executionCount, 1 )


def testTaskConstructors( self ) :

c = Gaffer.Context()
Expand Down
211 changes: 211 additions & 0 deletions python/GafferTest/LocalDispatcherTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
##########################################################################
#
# Copyright (c) 2014, Image Engine Design Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided with
# the distribution.
#
# * Neither the name of John Haddon nor the names of
# any other contributors to this software may be used to endorse or
# promote products derived from this software without specific prior
# written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
##########################################################################

import os
import stat
import shutil
import unittest

import IECore

import Gaffer
import GafferTest

class LocalDispatcherTest( unittest.TestCase ) :

def setUp( self ) :

localDispatcher = Gaffer.Dispatcher.dispatcher( "local" )
localDispatcher["jobDirectory"].setValue( "/tmp/dispatcherTest" )

def testDispatcherRegistration( self ) :

self.failUnless( "local" in Gaffer.Dispatcher.dispatcherNames() )
self.failUnless( Gaffer.Dispatcher.dispatcher( "local" ).isInstanceOf( Gaffer.LocalDispatcher.staticTypeId() ) )

def testDispatch( self ) :

dispatcher = Gaffer.Dispatcher.dispatcher( "local" )

def createWriter( text ) :
node = GafferTest.TextWriter()
node["fileName"].setValue( "/tmp/dispatcherTest/%s_####.txt" % text )
node["text"].setValue( text + " on ${frame}" )
return node

# Create a tree of dependencies for execution:
# n1 requires:
# - n2 requires:
# -n2a
# -n2b
s = Gaffer.ScriptNode()
s["n1"] = createWriter( "n1" )
s["n2"] = createWriter( "n2" )
s["n2a"] = createWriter( "n2a" )
s["n2b"] = createWriter( "n2b" )
s["n1"]['requirements'][0].setInput( s["n2"]['requirement'] )
s["n2"]['requirements'][0].setInput( s["n2a"]['requirement'] )
s["n2"]['requirements'][1].setInput( s["n2b"]['requirement'] )

def verify( contexts, nodes = [ "n1", "n2", "n2a", "n2b" ], exist = True ) :

for context in contexts :

modTimes = {}

for node in s.children( Gaffer.Node ) :

fileName = context.substitute( node["fileName"].getValue() )
self.assertEqual( os.path.isfile( fileName ), exist )
if exist :
modTimes[node.getName()] = os.stat( fileName )[stat.ST_MTIME]
with file( fileName, "r" ) as f :
text = f.read()
self.assertEqual( text, "%s on %d" % ( node.getName(), context.getFrame() ) )

if exist :
if "n1" in nodes :
self.assertGreater( modTimes["n1"], modTimes["n2"] )
else :
self.assertLess( modTimes["n1"], modTimes["n2"] )

if "n2" in nodes :
self.assertGreater( modTimes["n2"], modTimes["n2a"] )
self.assertGreater( modTimes["n2"], modTimes["n2b"] )
else :
if "n2a" in nodes :
self.assertLess( modTimes["n2"], modTimes["n2a"] )
if "n2b" in nodes :
self.assertLess( modTimes["n2"], modTimes["n2b"] )

# No files should exist yet
verify( [ s.context() ], exist = False )

# Executing n1 should trigger execution of all of them
dispatcher.dispatch( [ s["n1"] ] )
verify( [ s.context() ] )

# Executing n1 and anything else, should be the same as just n1
dispatcher.dispatch( [ s["n2b"], s["n1"] ] )
verify( [ s.context() ] )

# Executing all nodes should be the same as just n1
dispatcher.dispatch( [ s["n2"], s["n2b"], s["n1"], s["n2a"] ] )
verify( [ s.context() ] )

# Executing a sub-branch (n2) should only trigger execution in that branch
dispatcher.dispatch( [ s["n2"] ] )
verify( [ s.context() ], nodes = [ "n2", "n2a", "n2b" ] )

# Executing a leaf node, should not trigger other executions.
dispatcher.dispatch( [ s["n2b"] ] )
verify( [ s.context() ], nodes = [ "n2b" ] )

def testContextVariation( self ) :

s = Gaffer.ScriptNode()
context = s.context()
context["script:name"] = "notTheRealScriptName"
context["textWriter:replace"] = IECore.StringVectorData( [ " ", "\n" ] )

s["n1"] = GafferTest.TextWriter()
s["n1"]["fileName"].setValue( "/tmp/dispatcherTest/${script:name}_####.txt" )
s["n1"]["text"].setValue( "${script:name} on ${frame}" )

fileName = context.substitute( s["n1"]["fileName"].getValue() )
self.assertFalse( os.path.isfile( fileName ) )

Gaffer.Dispatcher.dispatcher( "local" ).dispatch( [ s["n1"] ] )

self.assertTrue( os.path.isfile( fileName ) )
self.assertTrue( os.path.basename( fileName ).startswith( context["script:name"] ) )
with file( fileName, "r" ) as f :
text = f.read()
expected = "%s on %d" % ( context["script:name"], context.getFrame() )
expected = expected.replace( context["textWriter:replace"][0], context["textWriter:replace"][1] )
self.assertEqual( text, expected )

def testDispatcherSignals( self ) :

class CapturingSlot2( list ) :

def __init__( self, *signals ) :

self.__connections = []
for s in signals :
self.__connections.append( s.connect( Gaffer.WeakMethod( self.__slot ) ) )

def __slot( self, d, nodes ) :
self.append( (d,nodes) )

preCs = CapturingSlot2( Gaffer.Dispatcher.preDispatchSignal() )
self.assertEqual( len( preCs ), 0 )
postCs = GafferTest.CapturingSlot( Gaffer.Dispatcher.postDispatchSignal() )
self.assertEqual( len( postCs ), 0 )

s = Gaffer.ScriptNode()
s["n1"] = GafferTest.TextWriter()
s["n1"]["fileName"].setValue( "/tmp/dispatcherTest/n1_####.txt" )
s["n1"]["text"].setValue( "n1 on ${frame}" )

dispatcher = Gaffer.Dispatcher.dispatcher( "local" )
dispatcher.dispatch( [ s["n1"] ] )

self.assertEqual( len( preCs ), 1 )
self.failUnless( preCs[0][0].isSame( dispatcher ) )
self.assertEqual( preCs[0][1], [ s["n1"] ] )

self.assertEqual( len( postCs ), 1 )
self.failUnless( postCs[0][0].isSame( dispatcher ) )
self.assertEqual( postCs[0][1], [ s["n1"] ] )

def testBadJobDirectory( self ) :

dispatcher = Gaffer.LocalDispatcher()
self.assertEqual( dispatcher["jobName"].getValue(), "" )
self.assertEqual( dispatcher["jobDirectory"].getValue(), "" )
jobDir = dispatcher.jobDirectory( Gaffer.Context() )
self.assertNotEqual( jobDir, "" )
self.assertTrue( os.path.exists( jobDir ) )
shutil.rmtree( jobDir )

def tearDown( self ) :

shutil.rmtree( "/tmp/dispatcherTest", ignore_errors = True )

if __name__ == "__main__":
unittest.main()

Loading

0 comments on commit 6e59343

Please sign in to comment.