Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate STDERR output on non-error jobs. #9

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 259 additions & 0 deletions jobTree/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@

import jobTree.manage
import jobTree.db
from jobTree.db import join as db_join
import sys
import os

class Stack(object):
    """
    Entry point that submits a single root target and waits for it.

    :param target:
        The root target passed to the manager by :meth:`startJobTree`.
    """

    def __init__(self, target):
        self.target = target

    @staticmethod
    def addJobTreeOptions(parser):
        """
        Register the jobTree command line options on ``parser``.

        :param parser:
            An ``optparse``-style parser, i.e. any object providing
            ``add_option``.
        """
        parser.add_option(
            "--batchSystem", dest="batchSystem",
            help="The type of batch system to run the job(s) with, currently can be "
                 "'singleMachine'/'drmaa'. default=%default",
            default="drmaa")

        # Adjacent string literals are used instead of backslash continuations
        # so that source indentation never leaks into the help text.
        parser.add_option(
            "--jobTree", dest="jobTree",
            help="Directory in which to place job management files "
                 "(this needs to be globally accessible by all machines running jobs).\n"
                 "If you pass an existing directory it will check if it's a valid "
                 "existing job tree, then try and restart the jobs in it",
            default=None)

    def startJobTree(self, options):
        """
        Submit the root target, block until it finishes, and report errors.

        The directory of the running script is appended to ``PYTHONPATH``
        in ``os.environ``. After the manager has finished waiting, every
        table of the instance whose name ends in ``@error`` is scanned and
        each key, together with its ``'error'`` value, is written to stderr.

        :param options:
            Parsed command line options; stored on ``self.options``.
        """
        self.options = options
        # NOTE(review): options.batchSystem / options.jobTree are stored but not
        # consulted below; the database path and work directory are hard-coded.

        extra_path = os.path.dirname(os.path.abspath(sys.argv[0]))
        current_path = os.environ.get('PYTHONPATH', "")
        if current_path:
            os.environ['PYTHONPATH'] = current_path + os.pathsep + extra_path
        else:
            # Avoid a dangling leading separator (an empty path entry) when
            # PYTHONPATH was not set beforehand.
            os.environ['PYTHONPATH'] = extra_path

        dbPath = "file://jobTree_workdir"
        config = jobTree.manage.Config(dbPath, 'auto', workdir="tmp_dir")
        manager = jobTree.manage.Manager(config)
        instance = manager.submit('root', self.target, {})
        manager.wait(instance)

        db = jobTree.db.connect(dbPath)
        for table in db.listTables(instance):
            if table.table.endswith("@error"):
                for key, value in db.listKeyValue(table):
                    sys.stderr.write("%s %s" % (key, value['error']))




class TargetBase(object):
    """
    Base Target Class

    Carries the bookkeeping attributes that are attached to a target:
    the manager, the instance / table path it is bound to, and a counter
    of children added so far.
    """

    def __init__(self):
        self.__manager__ = None
        # Defined here as well as in __setmanager__ so the attribute exists
        # on a freshly constructed target, before any manager is attached.
        self.child_count = 0

    def __setmanager__(self, manager):
        """
        Attach ``manager`` to this target and reset the child counter to 0.
        """
        self.__manager__ = manager
        self.child_count = 0

    def __setpath__(self, instance, tablePath):
        """
        Record the ``instance`` and ``tablePath`` this target is bound to.
        """
        self.__instance__ = instance
        self.__tablepath__ = tablePath

    def __close__(self):
        """
        Close hook; does nothing in the base class.
        """
        pass




class Target(TargetBase):
    """
    The target class describes a python class to be pickled and
    run as a remote process at a later time.
    """

    def __init__(self, **kw):
        # Keyword arguments are accepted and ignored. The base initialiser is
        # called so the manager bookkeeping attributes exist on new instances.
        TargetBase.__init__(self)

    def run(self):
        """
        The run method is user provided and run in a separate process,
        possibly on a remote node.

        :raises NotImplementedError:
            Always, unless overridden by a subclass.
        """
        raise NotImplementedError("Target subclasses must implement run()")

    def getcwd(self):
        """
        Return the joined instance / table path of this target.
        """
        return db_join(self.__instance__, self.__tablepath__)

    def addChildTarget(self, child_name, child=None, params=None, out_table=None, chdir=None):
        """
        Add child target to be executed

        :param child_name:
            Unique name of child to be executed. A target object may be
            passed here instead; it is then registered under the generated
            name ``child_<n>`` (``n`` being the current child count) and the
            ``child`` argument is ignored.

        :param child:
            Target object to be pickled and run as a remote

        :param params:
            Passed through to the manager; defaults to a new empty dict.

        :param out_table:
            Passed through to the manager.

        :param chdir:
            Passed through to the manager.

        Example::

            class MyWorker(jobTree.Target):

                def __init__(self, dataBlock):
                    self.dataBlock = dataBlock

                def run(self):
                    c = MyOtherTarget(self.dataBlock.pass_to_child)
                    self.addChildTarget('child', c)

        """
        if params is None:
            # A fresh dict per call; a shared mutable default would leak
            # state between unrelated calls.
            params = {}

        if isinstance(child_name, TargetBase):
            self.__manager__._addChild(
                self.__instance__, self.__tablepath__,
                "child_%d" % (self.child_count), child_name,
                params=params, out_table=out_table, chdir=chdir)
        else:
            self.__manager__._addChild(
                self.__instance__, self.__tablepath__,
                child_name, child,
                params=params, out_table=out_table, chdir=chdir)

        self.child_count += 1

    def runTarget(self, child):
        """
        Run a target directly (inline). This method gives the manager a chance
        to set up table and run path info for the target class before calling
        the 'run' method
        """
        self.__manager__._runTarget(self, child)

    def setFollowOnTarget(self, child_name, child=None, chdir=None):
        """
        Register a follow target, optionally without naming it.

        :param child_name:
            Name of the follow target, or the target object itself, in which
            case it is registered under the name ``"follow"``.

        :param child:
            Target object; only used when ``child_name`` is a name.

        :param chdir:
            Passed through to :meth:`addFollowTarget`.
        """
        # TargetBase (not Target) is tested so this agrees with the check
        # made in addChildTarget.
        if isinstance(child_name, TargetBase):
            self.addFollowTarget("follow", child_name, chdir=chdir)
        else:
            self.addFollowTarget(child_name, child, chdir=chdir)

    def addFollowTarget(self, child_name, child, depends=None, chdir=None):
        """
        A follow target is a delayed callback, that isn't run until all
        of a target's children have completed

        :param child_name:
            Name under which the follow target is registered.

        :param child:
            Target object to register.

        :param depends:
            ``None`` to depend on this target's own ``@follow`` marker, or a
            name / list of names that are resolved relative to this target's
            table path.

        :param chdir:
            Passed through to the manager.
        """
        if depends is None:
            self.__manager__._addChild(
                self.__instance__, self.__tablepath__, child_name, child,
                depends=self.__tablepath__ + "@follow", chdir=chdir)
        else:
            if isinstance(depends, str):
                # A single name is treated as a one-element list.
                depends = [depends]
            refs = [
                db_join(self.__instance__, self.__tablepath__, d).table
                for d in depends
            ]
            self.__manager__._addChild(
                self.__instance__, self.__tablepath__, child_name, child,
                depends=refs, chdir=chdir)

    def createTable(self, tableName, tableInfo=None):
        """
        Create a table to output to

        :param tableName: Name of the table to be created

        :param tableInfo: An object to describe the characteristics of the
            table; defaults to a new empty dict

        :return: :class:`jobTree.db.table.WriteTable`

        In the case of a target::

            e02d038d-98a3-494a-9a27-c04b4516ced4:/submit_20120110/the_child

        The call::

            self.createTable('output')

        Would create the table::

            e02d038d-98a3-494a-9a27-c04b4516ced4:/submit_20120110/the_child/output

        """
        if tableInfo is None:
            tableInfo = {}
        return self.__manager__._createTable(
            self.__instance__, self.__tablepath__ + "/" + tableName, tableInfo)

    def openTable(self, tableName):
        """
        Open a table to read input from. By default, tables belong to the
        parents, because they have already finished executing

        :param tableName: Name of the table to be opened

        :return: :class:`jobTree.db.table.ReadTable`

        In the case of a target::

            e02d038d-98a3-494a-9a27-c04b4516ced4:/submit_20120110/the_child

        The call::

            self.openTable('input')

        Would open the table::

            e02d038d-98a3-494a-9a27-c04b4516ced4:/submit_20120110/input

        Because openTable opens in the parent directory (because the parent
        has already completed)

        """
        tablePath = db_join(self.__instance__, self.__tablepath__, "..", tableName)
        return self.__manager__._openTable(tablePath.instance, tablePath.table)

    def createFile(self, dstPath, info=None):
        """
        Ask the manager to create a file at ``dstPath``, resolved relative to
        this target's table path.

        :param dstPath: Destination path relative to this target

        :param info: Passed to the manager; defaults to a new empty dict
        """
        if info is None:
            info = {}
        dstPath = db_join(self.__instance__, self.__tablepath__, dstPath)
        self.__manager__._createFile(dstPath, info)

    def copyTo(self, path, dstPath):
        """
        Ask the manager to copy ``path`` to ``dstPath``, the latter resolved
        relative to this target's table path.
        """
        dstPath = db_join(self.__instance__, self.__tablepath__, dstPath)
        self.__manager__._copyTo(path, dstPath)

    def copyFrom(self, path, srcPath):
        """
        Ask the manager to copy from ``srcPath``, resolved relative to this
        target's table path, to ``path``.
        """
        srcPath = db_join(self.__instance__, self.__tablepath__, srcPath)
        self.__manager__._copyFrom(path, srcPath)


class SubmitTarget(Target):
    """
    Submission target

    This is a target class, but unlike the original target class, which
    is generated from a stored pickle, the SubmitTarget receives information
    from a JSON-style data structure from a submission table.
    This allows for it to be instantiated outside of a python environment,
    i.e. from the command line or via web request.

    Example pipeline.py::

        class PipelineRoot(jobTree.SubmitTarget):

            def run(self, params):
                self.addChildTarget('tableScan', GenerateTable(params['tableBase']))

        if __name__ == '__main__':
            config = jobTree.manage.Config('tmpdir', 'datadir', 'processExecutor')
            manager = jobTree.manage.Manager(config)
            instance = manager.submit('test', 'pipeline.PipelineRoot', {'tableBase' : sys.argv[1]})
            manager.wait(instance)

    """

    def __init__(self):
        # Chain to the parent so initialisation done there is not skipped.
        Target.__init__(self)

    def run(self, params):
        """
        The run method is user provided and run in a separate process,
        possibly on a remote node.

        :param params:
            Data structure containing submission data

        :raises NotImplementedError:
            Always, unless overridden by a subclass.
        """
        raise NotImplementedError("SubmitTarget subclasses must implement run(params)")

    def openTable(self, tableName):
        """
        Open ``tableName`` resolved directly against this target's instance
        (not against the parent path, unlike ``Target.openTable``).

        The resolved path is printed to stdout before the table is opened.

        :param tableName: Name of the table to be opened
        """
        i = db_join(self.__instance__, tableName)
        # Single-argument call form: same output as the former print
        # statement, and valid syntax on both Python 2 and Python 3.
        print("open table: %s" % (i,))
        return self.__manager__._openTable(i.instance, i.table)

File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ def prepareQsub(cpu, mem):
def qsub(qsubline):
    """
    Run a submission command and return the numeric job id it prints.

    :param qsubline:
        Argument list handed to ``subprocess.Popen``.

    :return:
        The integer found before the first ``.`` on the first line of the
        command's standard output.

    :raises ValueError:
        If that text is not an integer (including when nothing was printed).
    """
    logger.debug("**"+" ".join(qsubline))
    process = subprocess.Popen(qsubline, stdout=subprocess.PIPE)
    # communicate() reads the pipe exactly once and waits for the child, so
    # the process is reaped and its stdout handle is closed.
    output = process.communicate()[0]
    if not isinstance(output, str):
        # Python 3 delivers bytes from the pipe; Python 2 already gives str.
        output = output.decode()
    lines = output.splitlines()
    res_line = lines[0].strip() if lines else ""
    result = int(res_line.split('.')[0])
    logger.debug("Got the job id: %s" % (str(result)))
    return result

Expand Down Expand Up @@ -127,7 +128,7 @@ class GridengineBatchSystem(AbstractBatchSystem):

def __init__(self, config):
AbstractBatchSystem.__init__(self, config) #Call the parent constructor
self.gridengineResultsFile = config.attrib["results_file"]
self.gridengineResultsFile = os.path.join(config.attrib["job_tree"], "results.txt")
#Reset the job queue and results (initially, we do this again once we've killed the jobs)
self.gridengineResultsFileHandle = open(self.gridengineResultsFile, 'w')
self.gridengineResultsFileHandle.close() #We lose any previous state in this file, and ensure the files existence
Expand Down Expand Up @@ -158,9 +159,10 @@ def addJob(self, command, sgeJobID, issuedJobs, index=None):
self.currentjobs.add(jobID)
self.newJobsQueue.put((sgeJobID, index))

def issueJob(self, command, memory, cpu, logFile):
def issueJob(self, command, memory, cpu): #, logFile):
qsubline = prepareQsub(cpu, memory)
qsubline.extend(['-o', logFile, '-e', logFile, command])
#qsubline.extend(['-o', logFile, '-e', logFile, command])
qsubline.extend([command])
result = qsub(qsubline)
jobs = dict()
self.addJob(command, result, jobs)
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading