diff --git a/jobTree/__init__.py b/jobTree/__init__.py new file mode 100644 index 0000000..f1367e7 --- /dev/null +++ b/jobTree/__init__.py @@ -0,0 +1,259 @@ + +import jobTree.manage +import jobTree.db +from jobTree.db import join as db_join +import sys +import os + +class Stack(object): + + def __init__(self, target): + self.target = target + + @staticmethod + def addJobTreeOptions(parser): + parser.add_option("--batchSystem", dest="batchSystem", + help="The type of batch system to run the job(s) with, currently can be 'singleMachine'/'drmma'. default=%default", + default="drmaa") + + + parser.add_option("--jobTree", dest="jobTree", + help="Directory in which to place job management files \ +(this needs to be globally accessible by all machines running jobs).\n\ +If you pass an existing directory it will check if it's a valid existin job tree, then\ +try and restart the jobs in it", + default=None) + + + + def startJobTree(self, options): + self.options = options + extra_path = os.path.dirname(os.path.abspath(sys.argv[0])) + os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', "") + ":" + extra_path + dbPath = "file://jobTree_workdir" + config = jobTree.manage.Config(dbPath, 'auto', workdir="tmp_dir") + manager = jobTree.manage.Manager(config) + instance = manager.submit('root', self.target, {}) + manager.wait(instance) + db = jobTree.db.connect(dbPath) + for table in db.listTables(instance): + if table.table.endswith("@error"): + errorFound = False + for key, value in db.listKeyValue(table): + sys.stderr.write("%s %s" % (key, value['error'])) + + + + +class TargetBase(object): + """ + Base Target Class + """ + def __init__(self): + self.__manager__ = None + + def __setmanager__(self, manager): + self.__manager__ = manager + self.child_count = 0 + + def __setpath__(self, instance, tablePath): + self.__instance__ = instance + self.__tablepath__ = tablePath + + def __close__(self): + pass + + + + +class Target(TargetBase): + """ + The target class describes a 
python class to be pickel'd and + run as a remote process at a later time. + """ + + def __init__(self, **kw): + pass + + def run(self): + """ + The run method is user provided and run in a seperate process, + possible on a remote node. + """ + raise Exception() + + def getcwd(self): + return db_join(self.__instance__, self.__tablepath__) + + def addChildTarget(self, child_name, child=None, params={}, out_table=None, chdir=None): + """ + Add child target to be executed + + :param child_name: + Unique name of child to be executed + + :param child: + Target object to be pickled and run as a remote + + + Example:: + + class MyWorker(jobtree.Target): + + def __init__(self, dataBlock): + self.dataBlock = dataBlock + + def run(self): + c = MyOtherTarget(dataBlock.pass_to_child) + self.addChildTarget('child', c ) + + + """ + if isinstance(child_name, TargetBase): + self.__manager__._addChild(self.__instance__, self.__tablepath__, "child_%d" % (self.child_count), child_name, params=params, out_table=out_table, chdir=chdir) + else: + self.__manager__._addChild(self.__instance__, self.__tablepath__, child_name, child, params=params, out_table=out_table, chdir=chdir) + + self.child_count += 1 + + def runTarget(self, child): + """ + Run a target directly (inline). 
This method give the manager a chance + to setup table and run path info for the target class before calling the 'run' method + + """ + + self.__manager__._runTarget(self, child) + + def setFollowOnTarget(self, child_name, child=None, chdir=None): + if isinstance(child_name, Target): + self.addFollowTarget("follow", child_name, chdir=chdir) + else: + self.addFollowTarget(child_name, child, chdir=chdir) + + def addFollowTarget(self, child_name, child, depends=None, chdir=None): + """ + A follow target is a delayed callback, that isn't run until all + of a targets children have complete + """ + if depends is None: + self.__manager__._addChild(self.__instance__, self.__tablepath__, child_name, child, depends=self.__tablepath__ + "@follow", chdir=chdir) + else: + refs = [] + if isinstance(depends, str): + depends = [depends] + for d in depends: + refs.append(db_join(self.__instance__, self.__tablepath__, d).table) + self.__manager__._addChild(self.__instance__, self.__tablepath__, child_name, child, depends=refs, chdir=chdir) + + def createTable(self, tableName, tableInfo={}): + """ + Create a table to output to + + :param tableName: Name of the table to be opened + + :param tableInfo: An object to descibe the characteristics of the table + + :return: :class:`jobTree.db.table.WriteTable` + + In the case of a target:: + + e02d038d-98a3-494a-9a27-c04b4516ced4:/submit_20120110/the_child + + The call:: + + self.openTable('output') + + Would create the table:: + + e02d038d-98a3-494a-9a27-c04b4516ced4:/submit_20120110/the_child/output + + + """ + return self.__manager__._createTable(self.__instance__, self.__tablepath__ + "/" + tableName, tableInfo) + + def openTable(self, tableName): + """ + Open a table input from. 
By default, tables belong to the parents, + because they have already finished executing + + :param tableName: Name of the table to be opened + + :return: :class:`jobTree.db.table.ReadTable` + + In the case of a target:: + + e02d038d-98a3-494a-9a27-c04b4516ced4:/submit_20120110/the_child + + The call:: + + self.openTable('input') + + Would open the table:: + + e02d038d-98a3-494a-9a27-c04b4516ced4:/submit_20120110/input + + Because openTable opens in the parent directory (because the parent has already completed) + + + """ + tablePath = db_join(self.__instance__, self.__tablepath__, "..", tableName) + return self.__manager__._openTable(tablePath.instance, tablePath.table) + + def createFile(self, dstPath, info = {}): + dstPath = db_join(self.__instance__, self.__tablepath__, dstPath) + self.__manager__._createFile(dstPath, info) + + def copyTo(self, path, dstPath): + dstPath = db_join(self.__instance__, self.__tablepath__, dstPath) + self.__manager__._copyTo(path, dstPath) + + + def copyFrom(self, path, srcPath): + srcPath = db_join(self.__instance__, self.__tablepath__, srcPath) + self.__manager__._copyFrom(path, srcPath) + + +class SubmitTarget(Target): + """ + Submission target + + This is a target class, but unlike the original target class, which + is generated from a stored pickle, the SubmitTarget receives information from + a JSON style'd data structure from a submission table. + This allows for it to be instantiated outside of a python environment, ie from the command + line or via web request. 
+ + Example pipeline.py:: + + class PipelineRoot(jobTree.SubmitTarget): + + def run(self, params): + self.addChildTarget( 'tableScan', GenerateTable(params['tableBase']) ) + + if __name__ == '__main__': + config = jobTree.manage.Config('tmpdir', 'datadir', 'processExecutor') + manager = jobTree.manage.Manager(config) + instance = manager.submit('test', 'pipeline.PipelineRoot', {'tableBase' : sys.argv[1]} ) + manager.wait() + + + """ + def __init__(self): + pass + + def run(self, params): + """ + The run method is user provided and run in a seperate process, + possible on a remote node. + + :param params: + Data structure containing submission data + """ + raise Exception() + + def openTable(self, tableName): + i = db_join(self.__instance__, tableName) + print "open table:", i + return self.__manager__._openTable(i.instance, i.table) + diff --git a/__init__.py b/jobTree/batchSystems/__init__.py similarity index 100% rename from __init__.py rename to jobTree/batchSystems/__init__.py diff --git a/batchSystems/abstractBatchSystem.py b/jobTree/batchSystems/abstractBatchSystem.py similarity index 100% rename from batchSystems/abstractBatchSystem.py rename to jobTree/batchSystems/abstractBatchSystem.py diff --git a/batchSystems/combinedBatchSystem.py b/jobTree/batchSystems/combinedBatchSystem.py similarity index 100% rename from batchSystems/combinedBatchSystem.py rename to jobTree/batchSystems/combinedBatchSystem.py diff --git a/batchSystems/gridengine.py b/jobTree/batchSystems/gridengine.py similarity index 97% rename from batchSystems/gridengine.py rename to jobTree/batchSystems/gridengine.py index dda633d..b9b6829 100644 --- a/batchSystems/gridengine.py +++ b/jobTree/batchSystems/gridengine.py @@ -79,7 +79,8 @@ def prepareQsub(cpu, mem): def qsub(qsubline): logger.debug("**"+" ".join(qsubline)) process = subprocess.Popen(qsubline, stdout=subprocess.PIPE) - result = int(process.stdout.readline().strip().split('.')[0]) + res_line = process.stdout.readline().strip() + 
result = int(res_line.split('.')[0]) logger.debug("Got the job id: %s" % (str(result))) return result @@ -127,7 +128,7 @@ class GridengineBatchSystem(AbstractBatchSystem): def __init__(self, config): AbstractBatchSystem.__init__(self, config) #Call the parent constructor - self.gridengineResultsFile = config.attrib["results_file"] + self.gridengineResultsFile = os.path.join(config.attrib["job_tree"], "results.txt") #Reset the job queue and results (initially, we do this again once we've killed the jobs) self.gridengineResultsFileHandle = open(self.gridengineResultsFile, 'w') self.gridengineResultsFileHandle.close() #We lose any previous state in this file, and ensure the files existence @@ -158,9 +159,10 @@ def addJob(self, command, sgeJobID, issuedJobs, index=None): self.currentjobs.add(jobID) self.newJobsQueue.put((sgeJobID, index)) - def issueJob(self, command, memory, cpu, logFile): + def issueJob(self, command, memory, cpu): #, logFile): qsubline = prepareQsub(cpu, memory) - qsubline.extend(['-o', logFile, '-e', logFile, command]) + #qsubline.extend(['-o', logFile, '-e', logFile, command]) + qsubline.extend([command]) result = qsub(qsubline) jobs = dict() self.addJob(command, result, jobs) diff --git a/batchSystems/multijob.py b/jobTree/batchSystems/multijob.py similarity index 100% rename from batchSystems/multijob.py rename to jobTree/batchSystems/multijob.py diff --git a/batchSystems/parasol.py b/jobTree/batchSystems/parasol.py similarity index 100% rename from batchSystems/parasol.py rename to jobTree/batchSystems/parasol.py diff --git a/batchSystems/singleMachine.py b/jobTree/batchSystems/singleMachine.py similarity index 100% rename from batchSystems/singleMachine.py rename to jobTree/batchSystems/singleMachine.py diff --git a/jobTree/db/__init__.py b/jobTree/db/__init__.py new file mode 100644 index 0000000..7dd32c2 --- /dev/null +++ b/jobTree/db/__init__.py @@ -0,0 +1,388 @@ +""" + +Example of database scanning:: + + #!/usr/bin/env python + + import 
sys + import jobTree.db + + db = jobTree.db.connect(sys.argv[1]) + + for inst in db.listInstances(): + for table in db.listTables(inst): + meta = db.getTableInfo(table) + if 'datatype' in meta and meta['datatype'] == 'cgdata': + for key in db.listKeys(table): + print key + +""" + + +import os +import json +import re +from glob import glob +import shutil +import tempfile +import socket +import threading +import time +import sys +import errno + +try: + from jobTree.net.ttypes import TableRef as TableRefBase +except ImportError: + TableRefBase = object + + class NotImplementedException(Exception): + def __init__(self): + Exception.__init__(self) + + class TableError(Exception): + def __init__(self, message): + Exception.__init__(self, message) + +class TableRef(TableRefBase): + """ + All tables in jobTree identified by two values: + + 1. The instance : A UUID code the identifies a specific pipeline run + 2. The TablePath : A path to a particular table + + In string form, there two items are seperated with a colin, ie:: + + e02d038d-98a3-494a-9a27-c04b4516ced4:/test/myTable + + The table path heiarchy represents the chain of targets and their + children. The top level directory is the original submission name. + Every time a 'addChildTarget' is called on a target a new directory + is created. 
+ + The submission code:: + + instance = manager.submit('submit_20120110', 'mymodule.Submit', {'count' : 5} ) + + Would create the table directory:: + + e02d038d-98a3-494a-9a27-c04b4516ced4:/submit_20120110 + + The target code in mymodule.py :: + + class Submit(jobTree.SubmitTarget): + def run(self, params): + self.addChildTarget('the_child', MyChildTarget(params['count'])) + + Would create the table directory:: + + e02d038d-98a3-494a-9a27-c04b4516ced4:/submit_20120110/the_child + + The child target call:: + + class MyChildTarget(jobTree.Target): + + def __init__(self, count): + self.count = count + + def run(self): + oTable = self.createTable('outTable') + + for i in range(self.count): + oTable.emit("test_%d" % (i), i) + + oTable.close() + + Would create the output table:: + + e02d038d-98a3-494a-9a27-c04b4516ced4:/submit_20120110/the_child/outTable + + And the table would have the key pairs emitted during the for loop + + ====== ===== + key value + ====== ===== + test_0 0 + test_1 1 + test_2 2 + test_3 3 + test_4 4 + ====== ===== + + A :class:`jobTree.db.TableRef` automatically parses table path strings + into a common class + + + Example 1:: + + t = jobTree.db.TableRef('e02d038d-98a3-494a-9a27-c04b4516ced4', '/submit_20120110/the_child/outTable') + + Example 2:: + + t = jobTree.db.TableRef('e02d038d-98a3-494a-9a27-c04b4516ced4:/submit_20120110/the_child/outTable') + + In both cases, t is the same + + """ + + + def __init__(self, instance, table=None): + if table is None and instance.count(":"): + tmp = instance.split(":") + self._instance = tmp[0] + self._table = tmp[1] + else: + self._instance = instance + self._table = table + if not self._table.startswith("/"): + self._table = "/" + self._table + + def __repr__(self): + return repr( (self._instance, self._table) ) + + def __cmp__(self, other): + return cmp(str(self), str(other)) + + def __hash__(self): + return hash(self.toPath()) + + def __eq__(self, other): + return str(self) == str(other) + + @property + 
def instance(self): + """ + Instance UUID string + """ + return self._instance + + @property + def table(self): + """ + Table path + """ + return self._table + + def __str__(self): + return "%s:%s" % (self._instance, self._table) + + def toPath(self): + """ + Get the string representation of the table reference + """ + return "%s:%s" % (self._instance, self._table) + +def join(*args): + """ + Join togeather series of strings and TableRefs to build absolute TableRef + + :params args: :class:`jobTree.db.TableRef` and strings + + If the first argument is a string, it is assumed to be the instance reference + + Valid examples + + Example 1:: + + > inst_a = "f227a0af-826a-4617-90ee-136acbd42715" + > ref_a = jobTree.db.TableRef(inst_a, "tableTest") + > str(jobTree.db.join(ref_a, "child_a")) == "f227a0af-826a-4617-90ee-136acbd42715:/tableTest/child_a" + True + + Example 2:: + + > inst_a = "f227a0af-826a-4617-90ee-136acbd42715" + > ref_a = jobTree.db.TableRef(inst_a, "tableTest") + str(jobTree.db.join(ref_a, "..", "child_a")) == "f227a0af-826a-4617-90ee-136acbd42715:/child_a" + + Example 3:: + + > inst_a = "f227a0af-826a-4617-90ee-136acbd42715" + > ref_a = jobTree.db.TableRef(inst_a, "tableTest") + > str(jobTree.db.join(ref_a, "/child_a")) == "f227a0af-826a-4617-90ee-136acbd42715:/child_a" + + Example 4:: + + > inst_a = "f227a0af-826a-4617-90ee-136acbd42715" + > inst_b = "a061bfac-987e-4aa8-a3d5-567352b09ed3" + > ref_a = jobTree.db.TableRef(inst_a, "tableTest") + > ref_b = jobTree.db.TableRef(inst_b, "child_1") + > jobTree.db.join(ref_a, ref_b) == "a061bfac-987e-4aa8-a3d5-567352b09ed3:/child_1" + True + > jobTree.db.join(ref_a, "a061bfac-987e-4aa8-a3d5-567352b09ed3:/child_1") == "a061bfac-987e-4aa8-a3d5-567352b09ed3:/child_1" + True + > jobTree.db.join("f227a0af-826a-4617-90ee-136acbd42715:/tableTest/child_a", "a061bfac-987e-4aa8-a3d5-567352b09ed3:/child_1") == "a061bfac-987e-4aa8-a3d5-567352b09ed3:/child_1" + True + + """ + inst = None + path = [] + for a in args: + if 
isinstance(a, TableRef): + inst = a.instance + path = a.table.split("/") + elif a.count(":"): + tmp = a.split(":") + inst = tmp[0] + path = tmp[1].split("/") + else: + if inst is None: + inst = a + else: + path.append(a) + return TableRef(inst, os.path.abspath(os.path.join("/", *path))) + +def basename(path): + return os.path.basename(str(path)) + +class DBBase: + """ + Base DB interface. This interface can be implemented via file system + ie :class:`jobTree.db.FileDB` or via network database interface + """ + + def getPath(self): + """ + Get the connection URL of the server + """ + raise NotImplementedException() + + + def createInstance(self, instance, instanceInfo): + """ + Create an instance in the database + """ + raise NotImplementedException() + + def getInstanceInfo(self, instance): + """ + Get info about an instance + """ + raise NotImplementedException() + + + def hasTable(self, tableRef): + """ + Check for existance of table + """ + raise NotImplementedException() + + def createTable(self, tableRef, tableInfo): + """ + Create a table in a given instance + """ + raise NotImplementedException() + + def createFile(self, path, fileInfo): + """ + Create a file in a given instance + """ + raise NotImplementedException() + + def getTableInfo(self, tableRef): + """ + Get table information + """ + raise NotImplementedException() + + def addData(self, table, key, value): + """ + Add data to table + """ + raise NotImplementedException() + + def getValue(self, table, key): + """ + Get values from table associated with key + """ + raise NotImplementedException() + + def listKeyValue(self, table): + """ + List the key value pairs stored in a table + """ + raise NotImplementedException() + + def listKeys(self, table): + """ + List all of the keys in a table + """ + raise NotImplementedException() + + def hasKey(self, table, key): + """ + Check if table has a key + """ + raise NotImplementedException() + + def listInstances(self): + """ + List instances found in a 
database + """ + raise NotImplementedException() + + def listTables(self, instance): + """ + List tables associated with an instance + """ + raise NotImplementedException() + + def hasAttachment(self, table, key, name): + """ + Check if named attachment for a key exists. + """ + raise NotImplementedException() + + def listAttachments(self, table, key): + """ + List attachments for a given key + """ + raise NotImplementedException() + + def copyTo(self, path, table, key=None, name=None): + """ + Copy file to attachment associated to a key + """ + raise NotImplementedException() + + def copyFrom(self, path, table, key=None, name=None): + """ + Copy file from attachment associated to a key + """ + raise NotImplementedException() + + def readAttachment(self, table, key, name): + """ + Get file like handle to read attachment + """ + raise NotImplementedException() + + + + + +dbType = { +"file" : "jobTree.db.filedb.FileDB", +"jobTree" : "jobTree.db.thrift_net.Client" +} + +def connect(path): + """ + Connect to a jobTree Database + + :param path: A string describing the address of the database, ie file://data_dir or jobTree://server01:16020 + + :returns: :class:`jobTree.db.DBBase` + + """ + tmp = path.split("://") + if tmp[0] in dbType: + className = dbType[tmp[0]] + tmp1 = className.split('.') + mod = __import__(".".join(tmp1[:-1])) + cls = mod + for a in tmp1[1:]: + cls = getattr(cls, a) + return cls(tmp[1]) + return None + diff --git a/jobTree/db/filedb.py b/jobTree/db/filedb.py new file mode 100644 index 0000000..2c5eaa0 --- /dev/null +++ b/jobTree/db/filedb.py @@ -0,0 +1,231 @@ + +import os +import re +import json +import shutil +import tempfile +from glob import glob +from jobTree.db import DBBase +from urllib import quote, unquote + +from jobTree.db.fslock import LockFile +from jobTree.db import TableRef + +def path_quote(word): + return quote(word).replace("/", '%2F') + + +class FileDB(DBBase): + """ + Implementation of DB API using a file system + """ + def 
__init__(self, basedir): + self.basedir = os.path.abspath(basedir) + self.out_handle = {} + + def flush(self): + for a in self.out_handle: + self.out_handle[a].close() + shutil.move( self.out_handle[a].name, self.out_handle[a].name.replace("@data_shard.", "@data.") ) + self.out_handle = {} + + def getPath(self): + return "file://" + self.basedir + + def createInstance(self, instance, instanceInfo): + instdir = os.path.join(self.basedir, instance) + if not os.path.exists(instdir): + os.makedirs(instdir) + handle = open( os.path.join(instdir, "@info"), "w") + handle.write(json.dumps(instanceInfo)) + handle.close() + + def getInstanceInfo(self, instance): + instdir = os.path.join(self.basedir, instance) + ipath = os.path.join(instdir, "@info") + if os.path.exists(ipath): + handle = open( ipath ) + info = json.loads(handle.read()) + handle.close() + return info + return {} + + def hasTable(self, tableRef): + fsPath = self._getFSPath(tableRef) + return os.path.exists(fsPath + "@info") + + def hasFile(self, tableRef): + fsPath = self._getFSPath(tableRef) + return os.path.exists(fsPath + "@finfo") + + def deleteTable(self, tableRef): + fsPath = self._getFSPath(tableRef) + for path in glob(fsPath + "@data.*"): + os.unlink(path) + os.unlink(fsPath + "@info") + if os.path.exists(fsPath + "@archive"): + shutil.rmtree(fsPath + "@archive") + + def deleteFile(self, tableRef): + fsPath = self._getFSPath(tableRef) + os.unlink(fsPath + "@finfo") + os.unlink(fsPath + "@file") + + def deleteInstance(self, instance): + fspath = os.path.join(self.basedir, instance) + shutil.rmtree(fspath) + + def getTableInfo(self, tableRef): + path = self._getFSPath(tableRef) + "@info" + if not os.path.exists(path): + return {} + handle = open(path) + data = json.loads(handle.read()) + handle.close() + return data + + def createTable(self, tableRef, tableInfo): + fsDir = os.path.dirname(self._getFSPath(tableRef)) + if not os.path.exists(fsDir): + try: + os.makedirs(fsDir) + except OSError: + pass + 
handle = open(self._getFSPath(tableRef) + "@info", "w") + handle.write(json.dumps(tableInfo)) + handle.close() + + def createFile(self, pathRef, fileInfo): + fsDir = os.path.dirname(self._getFSPath(pathRef)) + if not os.path.exists(fsDir): + try: + os.makedirs(fsDir) + except OSError: + pass + handle = open(self._getFSPath(pathRef) + "@finfo", "w") + handle.write(json.dumps(fileInfo)) + handle.close() + + def _getFSPath(self, table): + return os.path.join(self.basedir, table.instance, re.sub(r'^/', '', table.table)) + + def addData(self, table, key, value): + fspath = self._getFSPath(table) + if table not in self.out_handle: + self.out_handle[table] = tempfile.NamedTemporaryFile(dir=os.path.dirname(fspath), prefix=os.path.basename(table.table) + "@data_shard.", delete=False) + self.out_handle[table].write( key ) + self.out_handle[table].write( "\t" ) + self.out_handle[table].write(json.dumps(value)) + self.out_handle[table].write( "\n" ) + self.out_handle[table].flush() + + def getValue(self, table, key): + fsPath = self._getFSPath(table) + out = [] + for path in glob(fsPath + "@data.*"): + handle = open(path) + for line in handle: + tmp = line.split("\t") + if tmp[0] == key: + out.append(json.loads(tmp[1])) + handle.close() + return out + + def listKeyValue(self, table): + path = self._getFSPath(table) + out = [] + for path in glob(path + "@data.*"): + handle = open(path) + for line in handle: + tmp = line.split("\t") + out.append( (tmp[0], json.loads(tmp[1]) ) ) + handle.close() + return out + + def listKeys(self, table): + out = [] + fspath = self._getFSPath(table) + for path in glob(fspath + "@data.*"): + handle = open(path) + for line in handle: + tmp = line.split("\t") + out.append(tmp[0]) + handle.close() + return out + + def hasKey(self, table, key): + o = self.listKeys(table) + return key in o + + def listInstances(self): + out = [] + for path in glob(os.path.join(self.basedir, "*")): + out.append(os.path.basename(path)) + out.sort() + return out + + + def 
_dirscan(self, dir, inst): + out = {} + for path in glob(os.path.join(dir, "*")): + if path.count("@data"): + tableName = re.sub(os.path.join(self.basedir, inst), "", re.sub("@data.*", "", path)) + tableRef = TableRef(inst, tableName) + out[ tableRef ] = True + else: + if not path.endswith("@attach"): + if os.path.isdir(path): + for a in self._dirscan( path, inst ): + out[a] = True + return out.keys() + + + def listTables(self, instance): + out = self._dirscan( os.path.abspath(os.path.join(self.basedir, instance) ), instance ) + out.sort() + return out + + def hasAttachment(self, table, key, name): + path = self._getFSPath(table) + attachPath = os.path.join(path + "@attach", key, name) + return os.path.exists(attachPath) + + def listAttachments(self, table, key): + path = self._getFSPath(table) + out = [] + attachPath = os.path.join( path + "@attach", key) + for path in glob( os.path.join(attachPath, "*") ): + out.append(unquote(os.path.basename(path))) + return out + + def copyTo(self, path, table, key=None, name=None): + fspath = self._getFSPath(table) + with LockFile(fspath, lock_break=120): + if key is None: + attachPath = fspath + "@file" + shutil.copy( path, attachPath ) + else: + attachPath = os.path.join( fspath + "@attach", key, path_quote(name)) + keyDir = os.path.dirname(attachPath) + if not os.path.exists(keyDir): + try: + os.makedirs(keyDir) + except OSError: + pass + shutil.copy( path, attachPath ) + + def copyFrom(self, path, table, key=None, name=None): + fspath = self._getFSPath(table) + with LockFile(fspath, lock_break=120): + if key is None: + attachPath = fspath + "@file" + shutil.copy( attachPath, path ) + else: + attachPath = os.path.join( fspath + "@attach", key, path_quote(name)) + shutil.copy( attachPath, path ) + + def readAttachment(self, table, key, name): + fspath = self._getFSPath(table) + attachPath = os.path.join( fspath + "@attach", key, path_quote(name)) + return open(attachPath) + diff --git a/jobTree/db/fslock.py 
b/jobTree/db/fslock.py new file mode 100644 index 0000000..370cf74 --- /dev/null +++ b/jobTree/db/fslock.py @@ -0,0 +1,203 @@ + +import os +import socket +import threading +import time +import errno + +""" +NFS ready file lock based on Lockfile (http://code.google.com/p/pylockfile/) +""" + +class Error(Exception): + """ + Base class for other exceptions. + + >>> try: + ... raise Error + ... except Exception: + ... pass + """ + pass + +class LockError(Error): + """ + Base class for error arising from attempts to acquire the lock. + + >>> try: + ... raise LockError + ... except Error: + ... pass + """ + pass + + +class UnlockError(Error): + """ + Base class for errors arising from attempts to release the lock. + + >>> try: + ... raise UnlockError + ... except Error: + ... pass + """ + pass + +class LockFailed(LockError): + """Lock file creation failed for some other reason. + + >>> try: + ... raise LockFailed + ... except LockError: + ... pass + """ + pass + + +class NotLocked(UnlockError): + """Raised when an attempt is made to unlock an unlocked file. + + >>> try: + ... raise NotLocked + ... except UnlockError: + ... pass + """ + pass + + +class NotMyLock(UnlockError): + """Raised when an attempt is made to unlock an unlocked file. + + >>> try: + ... raise NotLocked + ... except UnlockError: + ... pass + """ + pass + + +class LockFile: + """Lock file by creating a directory.""" + def __init__(self, path, threaded=True, uniq_mux=None, lock_break=None): + """ + >>> lock = LinkLockFile('somefile') + >>> lock = LinkLockFile('somefile', threaded=False) + """ + self.path = path + self.lock_file = os.path.abspath(path) + "@lock" + self.hostname = socket.gethostname() + self.pid = os.getpid() + if threaded: + t = threading.current_thread() + # Thread objects in Python 2.4 and earlier do not have ident + # attrs. Worm around that. 
+ ident = getattr(t, "ident", hash(t)) + self.tname = "-%x" % (ident & 0xffffffff) + else: + self.tname = "" + + self.unique_name = "%s.%s.%s%s" % (self.lock_file, + self.hostname, + self.tname, + self.pid) + if uniq_mux is not None: + self.unique_name = self.unique_name + "." + uniq_mux + self.lock_break = lock_break + + def acquire(self, timeout=None, lock_break=None): + if lock_break is None: + lock_break = self.lock_break + end_time = time.time() + if timeout is not None and timeout > 0: + end_time += timeout + + if timeout is None: + wait = 0.1 + else: + wait = max(0, timeout / 10) + + last_atime = 0 + last_adiff = 0 + while True: + try: + fp = open(self.unique_name, 'w') + fp.write(self.unique_name) + finally: + fp.close() + try: + os.link(self.unique_name, self.lock_file) + return + except OSError as error: + if error.errno == errno.ENOENT: + if os.stat(self.unique_name).st_nlink == 2: + #we may have obtained the lock, despite the error + #release, and try again to make sure + try: + os.unlink(self.lock_file) + except OSError: + pass + + if lock_break is not None: + try: + cur_atime = os.stat(self.lock_file).st_atime + if last_atime != cur_atime: + last_atime = cur_atime + last_adiff = cur_atime - time.time() + else: + if last_adiff - (cur_atime - time.time()) > lock_break: + try: + os.unlink(self.lock_file) + except OSError: + pass + except OSError: + pass + if timeout is not None and time.time() > end_time: + if timeout > 0: + raise LockTimeout("Timeout waiting to acquire" + " lock for %s" % + self.path) + else: + raise AlreadyLocked("%s is already locked" % + self.path) + + time.sleep(timeout/10 if timeout is not None else 0.1) + + try: + os.unlink(self.unique_name) + except OSError: + pass + + + def release(self): + if not self.is_locked(): + raise NotLocked + elif not os.path.exists(self.unique_name): + raise NotMyLock + try: + os.unlink(self.lock_file) + except OSError: + pass + try: + os.unlink(self.unique_name) + except OSError: + pass + + def 
is_locked(self): + return os.path.exists(self.lock_file) + + def i_am_locking(self): + return (self.is_locked() and + os.path.exists(self.unique_name)) + + def __enter__(self): + """ + Context manager support. + """ + self.acquire() + return self + + def __exit__(self, *_exc): + """ + Context manager support. + """ + self.release() diff --git a/jobTree/db/table.py b/jobTree/db/table.py new file mode 100644 index 0000000..bc5f07b --- /dev/null +++ b/jobTree/db/table.py @@ -0,0 +1,129 @@ + +import os +import json +import jobTree.db + +class WriteTable(object): + """ + A WriteTable is used for output, and can only be written to. + """ + def __init__(self, db, table_ref): + self.db = db + self.table_ref = table_ref + + def close(self): + """ + Properly flush and close table + """ + self.db.flush() + self.handle = None + + def emit(self, key, value): + """ + Emit a key value pair + + :param key: + Key to be emited + + :param value: + Value to be emitted. Note, this value must be :func:`json.dumps` serialisable + """ + self.db.addData(self.table_ref, key, value) + + def copyTo(self, path, key, name): + """ + Copy a file to the output table + + :param path: + Path of the file to be copied + + :param key: + Key the file will be associated with + + :param name: + Name the attachment will be stored as + """ + self.db.copyTo(path, self.table_ref, key, name) + + + def getPath(self): + """ + Get absolute path of table + + :returns: Sting of table's absolute path + """ + + return self.table_ref.toPath() + +class ReadTable(object): + def __init__(self, db, table_ref): + self.db = db + self.table_ref = table_ref + if not self.db.hasTable(table_ref): + raise jobTree.db.TableError("Table not found:" + str(table_ref)) + + def __iter__(self): + """ + Iterator through stack and return sets of key, value pairs + """ + return self.db.listKeyValue(self.table_ref).__iter__() + + def get(self, key): + """ + Get values associated with key + + :param key: Key to get + + :returns: List of 
values associated with key + """ + return self.db.getValue(self.table_ref, key) + + def hasKey(self, key): + """ + Does table have the key? + + :param key: the key + + :returns: Boolean + """ + return self.db.hasKey(self.table_ref, key) + + def copyFrom(self, path, key, name): + """ + Copy file from table + + :param path: Path to store file at: + + :param key: Key to copy attachment from + + :param name: Name of the attachment + + """ + return self.db.copyFrom(path, self.table_ref, key, name) + + def readAttachment(self, key, name): + """ + Get file like handle to read from attachment + + :param path: Path to store file at: + + :param key: Key to copy attachment from + + :param name: Name of the attachment + + """ + return self.db.readAttachment(self.table_ref, key, name) + + def listKeys(self): + """ + List keys in table + """ + return self.db.listKeys(self.table_ref) + + def getPath(self): + """ + Get absolute path of table + + :returns: Sting of table's absolute path + """ + return self.table_ref.toPath() diff --git a/jobTree/db/util.py b/jobTree/db/util.py new file mode 100644 index 0000000..106f18f --- /dev/null +++ b/jobTree/db/util.py @@ -0,0 +1,18 @@ + +import tempfile +import os +def copy_instance(src_db, inst, dst_db): + dst_db.createInstance(inst, src_db.getInstanceInfo(inst)) + for table in src_db.listTables(inst): + dst_db.createTable(table, src_db.getTableInfo(table)) + for key, val in src_db.listKeyValue(table): + dst_db.addData(table, key, val) + for key in src_db.listKeys(table): + for name in src_db.listAttachments(table,key): + handle, tmp = tempfile.mkstemp() + os.close(handle) + src_db.copyFrom(tmp, table, key, name) + dst_db.copyTo(tmp, table, key, name) + os.unlink(tmp) + + diff --git a/jobTree/manage/__init__.py b/jobTree/manage/__init__.py new file mode 100644 index 0000000..80d8b4f --- /dev/null +++ b/jobTree/manage/__init__.py @@ -0,0 +1,705 @@ +""" +The jobTree.manage module provide classes related to job submission/control +and task 
def inherit_config():
    """
    Check the environment for configuration information left behind by a
    parent JobTree process.

    :returns: a :class:`Config` built from the ``JOBTREE_DB`` and
        ``JOBTREE_ENGINE`` variables, or ``None`` when no parent
        configuration is present.
    """
    dbpath = os.environ.get("JOBTREE_DB")
    if dbpath is None:
        return None
    return Config(dbpath, os.environ.get("JOBTREE_ENGINE", None))
class Config:
    """
    JobTree Manager Configuration.  Defines the database path, working
    directory and execution engine name (optional).

    :param dbpath: URL of the database, ie file://datadir or jobtree://server01:16016

    :param engineName: Name of the engine to do the work: ie 'auto', 'process', 'drmaa'

    :param workdir: Base temp directory for work
    """
    def __init__(self, dbpath, engineName=None, workdir="/tmp"):
        self.workdir = os.path.abspath(workdir)
        self.dbpath = dbpath
        if engineName is None:
            self.executor = None
            return
        try:
            # Well-known engine names map to dotted paths; anything else is
            # assumed to already be a dotted path to an executor class (or
            # factory function such as jobTree.manage.autoSelect).
            if engineName in executorMap:
                engineName = executorMap[engineName]
            parts = engineName.split('.')
            # __import__ returns the top-level package for a dotted name, so
            # walk the remaining attribute path down to the target object.
            obj = __import__(".".join(parts[:-1]))
            for attr in parts[1:]:
                obj = getattr(obj, attr)
            self.executor = obj()
        except Exception:
            # Preserve the original contract (callers expect
            # UnimplementedMethod), but log why resolution failed instead of
            # silently discarding the traceback as the original did.
            logging.error(traceback.format_exc())
            raise UnimplementedMethod()
copy: " + opath) + if not os.path.exists(os.path.dirname(opath)): + os.makedirs(os.path.dirname(opath)) + db.copyFrom(opath, envRef, env, name) + #add work dir to path list, so code can be used for pickel and object instances + sys.path.insert(0, tmpdir) + + + manager = jobTree.manage.Manager(self.config) + errorRef = jobTree.db.TableRef(instRef.instance, parentName + "@error") + + if '_submitInit' in runVal: + try: + runClass = runVal['_submitInit'] + sys.path.insert( 0, os.path.abspath(tmpdir) ) + tmp = runClass.split('.') + mod = __import__(tmp[0]) + cls = mod + for n in tmp[1:]: + cls = getattr(cls, n) + if issubclass(cls, jobTree.SubmitTarget): + obj = cls() + else: + obj = cls(runVal) + except Exception: + db.addData(errorRef, appName, {'error' : str(traceback.format_exc())}) + return + elif db.hasAttachment(instRef, appName, "pickle"): + db.copyFrom(tmpdir + "/pickle", instRef, appName, "pickle") + handle = open(tmpdir + "/pickle") + try: + obj = pickle.loads(handle.read()) + except Exception: + db.addData(errorRef, appName, {'error' : str(traceback.format_exc()), 'host' : socket.gethostname(), 'dir' : tmpdir}) + db.flush() + return + handle.close() + + cwd = os.getcwd() + os.chdir(tmpdir) + obj.__setpath__(instRef.instance, appPath) + obj.__setmanager__(manager) + self.inputList = [] + self.outputList = [] + manager._set_callback(self) + if '_outTable' in runVal: + obj.__outTableRef__ = jobTree.db.TableRef(instRef.instance, runVal['_outTable']) + obj.__outTable__ = jobTree.db.table.WriteTable(db, obj.__outTableRef__) + logging.debug("Starting user code") + + stdout_path = os.path.abspath(os.path.join(tmpdir, "jobTree.stdout")) + stderr_path = os.path.abspath(os.path.join(tmpdir, "jobTree.stderr")) + + child = os.fork() + if not child: + stdout_backup = os.dup(1) + stderr_backup = os.dup(2) + os.close(1) + os.open(stdout_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) + os.close(2) + os.open(stderr_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) + try: + """ + if 
isinstance(obj, jobTree.Target): + obj.run(runVal) + elif isinstance(obj, jobTree.MultiApplet): + obj.__run__() + else: + """ + if isinstance(obj, jobTree.SubmitTarget): + obj.run(runVal) + else: + obj.run() + os.close(1) + os.dup(stdout_backup) + os.close(2) + os.dup(stderr_backup) + except Exception: + os.close(1) + os.dup(stdout_backup) + os.close(2) + os.dup(stderr_backup) + logging.error("user code error") + db.addData(errorRef, appName, {'error' : str(traceback.format_exc()), 'time' : datetime.datetime.now().isoformat(), 'host' : socket.gethostname()}) + db.copyTo(stdout_path, errorRef, appName, "stdout") + db.copyTo(stderr_path, errorRef, appName, "stderr") + obj.__close__() + doneRef = jobTree.db.TableRef(instRef.instance, parentName + "@done") + db.addData(doneRef, appName, { 'time' : datetime.datetime.now().isoformat(), 'input' : self.inputList, 'output' : self.outputList }) + db.flush() + sys.exit(0) + else: + pid, stat = os.waitpid(child,0) + + + #send our writes to the DB + db.flush() + + logging.info("user code done") + + #clean up workspace + os.chdir(cwd) + #shutil.rmtree(tmpdir) + + + #if we have spawned local children, take care of them using the process manager + if self.local_children: + logging.info("Starting Local Process manager") + cConf = Config(self.config.dbpath, "process", workdir=self.config.workdir) + cManager = Manager(cConf) + cManager.wait(instRef.instance, appPath + "/@request") + + + def callback_openTable(self, ref): + self.inputList.append(str(ref)) + + def callback_createTable(self, ref): + self.outputList.append(str(ref)) + + def callback_addChild(self, child): + pass + """ + if isinstance(child, jobTree.LocalTarget): + self.local_children = True + """ + + +class TaskExecutor: + """ + The process executor handles the execution of child tasks. 
+ """ + + def runCmd(self, name, cmdline, stdin=None): + raise UnimplementedMethod() + + def getSlotCount(self): + raise UnimplementedMethod() + + def getActiveCount(self): + raise UnimplementedMethod() + + def poll(self): + raise UnimplementedMethod() + + +class Task: + """ + A task represents a target workload to be run by an executor + """ + def __init__(self, manager, tableRef, jobName, jobInfo): + """ + + :param manager: + The manager the task belongs to + + :param tableRef: + The tableRef of the @request table that holds the work request + + :param jobInfo: + Job Information + + :param key: + The name of the particular task + + """ + self.tableRef = tableRef + self.jobName = jobName + self.manager = manager + self.jobInfo = jobInfo + + def getName(self): + return "%s:%s" % (self.tableRef, self.jobName) + + + def run(self, taskExec): + logging.info("Task Start:" + self.getName()) + cmd = [ sys.executable, "-m", "jobTree.manage.worker", self.manager.db.getPath(), self.manager.config.workdir, str(self.tableRef) + ":" + self.jobName ] + taskExec.runCmd(self.getName(), cmd) + + +class TaskManager: + def __init__(self, manager, executor): + self.manager = manager + self.task_queue = {} + self.executor = executor + self.active_tasks = {} + + def addTask(self, task): + if task.getName() not in self.task_queue: + logging.debug("TaskManager Adding Task: " + task.getName()) + self.task_queue[ task.getName() ] = task + + def taskCount(self): + return len(self.task_queue) + + def isFull(self): + jMax = self.executor.getMaxJobs() + if jMax is None or len(self.task_queue) < jMax: + return False + return True + + def cycle(self): + change = False + jMax = self.executor.getMaxJobs() + for t in self.task_queue: + if t not in self.active_tasks: + if jMax is None or len(self.active_tasks) < jMax: + self.task_queue[t].run(self.executor) + self.active_tasks[t] = True + change = True + dmap = self.executor.poll() + for t in dmap: + logging.info("Task Complete: " + t) + del 
class Applet(str):
    """
    A string subclass naming an importable applet module, with helpers for
    inspecting that module's location, manifest and includes.
    """
    def __init__(self, applet):
        self.moduleName = applet
        # NOTE: __import__ returns the top-level package for dotted names.
        self.module = __import__(self.moduleName)

    def getBase(self):
        """Directory containing the applet module's source file."""
        return os.path.dirname(self.module.__file__)

    def getManifest(self):
        """
        Return the applet's file manifest: a ``__manifest__`` attribute on
        the module if present, else the ``manifest`` entry of a
        ``jobTree.info`` JSON file beside the module, else ``None``.
        """
        if hasattr(self.module, "__manifest__"):
            return self.module.__manifest__
        info_path = os.path.join(self.getBase(), "jobTree.info")
        if not os.path.exists(info_path):
            return None
        handle = open(info_path)
        try:
            meta = json.loads(handle.read())
        finally:
            handle.close()
        return meta['manifest']

    def getIncludes(self):
        """Modules this applet asks to have shipped alongside it."""
        return getattr(self.module, '__include__', [])

    def getName(self):
        """Canonical module name, as reported by the module itself."""
        return self.module.__name__
+ + :param className: A string defining the name of a :class:`jobTree.SubmitTarget` + that will be started with the submission data + + :param submitData: A block of data that can be processed via :func:`json.dumps` + and will be passed to an instance of the named class at run time + """ + + instRef = jobTree.db.TableRef(instance, "@request") + if instance is None: + instance = str(uuid.uuid4()) #Instance(str(uuid.uuid4())) + self.db.createInstance(instance, instanceData) + else: + if self.db.hasKey(instRef, submitName): + raise Exception("Submit Key Exists") + + submitData['_submitKey'] = submitName + submitData['_instance'] = instance + if depends is not None: + submitData['_depends'] = depends + if isinstance(className, str): + submitData['_submitInit'] = className + submitData['_environment'] = self.import_applet(instance, className.split('.')[0]) + instRef = jobTree.db.TableRef(instance, "@request") + self.db.createTable(instRef, {}) + self.db.createTable(jobTree.db.TableRef(instance, "@done"), {}) + self.db.createTable(jobTree.db.TableRef(instance, "@error"), {}) + self.db.addData( instRef, submitName, submitData) + self.db.flush() + elif isinstance(className, jobTree.Target): + className.__setpath__(instance, submitName) + self._addChild(instance, "", submitName, className) + + #elif isinstance(className, jobTree.SubmitTarget): + #self.import_applet(instance, className.__module__) + #className.__setpath__(instance, submitName) + #className.__setmanager__(self) + #className.run() + else: + raise InvalidScheduleRequest("Invalid Submission Class") + return instance + + def import_applet(self, inst, applet): + """ + Scan jobTree module and copy dependency files into environment table + """ + + a = Applet(applet) + + impList = [ a.getName() ] + added = { a.getName() : True } + + envRef = jobTree.db.TableRef(inst, "@environment") + self.db.createTable(envRef, {}) + while len(impList): + n = {} + for modName in impList: + added[modName] = True + a = Applet(modName) 
+ modbase = os.path.dirname(os.path.abspath(a.module.__file__)) + "/" + if a.module.__package__ is not None: + modbase = os.path.dirname(os.path.dirname( modbase )) + "/" + + self.db.addData(envRef, modName, {}) + dirname = os.path.abspath(a.getBase()) + print "applet base:", a.getBase(), dirname + for f in a.getManifest(): + logging.info("copy: " + modbase + " " + os.path.join( dirname, f ).replace(modbase, "")) + #print "copy: " + modbase + " " + os.path.join( dirname, f ).replace(modbase, "") + #self.db.copyTo( os.path.join(a.getBase(), f), envRef, modName, os.path.join( dirname, f ).replace(modbase, "") ) + self.db.flush() + for inc in a.getIncludes(): + n[inc] = True + + impList = [] + for inc in n: + if not self.db.hasKey( envRef, inc ): + impList.append(inc) + return added.keys() + + def scan(self, instance, table=None): + """ + Scan an instance, or just one table in an instance, looking for work requests + """ + found = False + jobMap = {} + if table is None: + logging.debug("START_TASKSCAN:" + datetime.datetime.now().isoformat()) + for tableRef in self.db.listTables(instance): + if tableRef.table.endswith("@request"): + jt = self.scan(instance, tableRef.table) + for k in jt: + jobMap[k] = jt[k] + logging.debug("END_TASKSCAN:" + datetime.datetime.now().isoformat()) + else: + tableRef = jobTree.db.TableRef(instance, table) + tableBase = re.sub(r'/@request$', '', tableRef.table) + doneRef = jobTree.db.TableRef(tableRef.instance, tableBase + "/@done") + errorRef = jobTree.db.TableRef(tableRef.instance, tableBase + "/@error") + + doneHash = {} + errorFound = False + for k in self.db.listKeys(doneRef): + doneHash[k] = True + for k in self.db.listKeys(errorRef): + errorFound = True + + if errorFound: + raise ErrorFound() + + has_children = False + for key, value in self.db.listKeyValue(tableRef): + if key not in doneHash: + #self.task_manager.addTask(Task(self, instance, table, key)) + if '_depends' not in value: + has_children = True + task = Task(self, 
tableRef, key, value) + jobMap[ tableRef.instance + ":" + os.path.join(tableBase, key) ] = task + found = True + if has_children: + jobMap[ tableRef.instance + ":" + tableBase + "@follow" ] = None + #return found + return jobMap + + def wait(self, instance, table=None): + """ + Wait for the completion of a pipeline. + + :param instance: The instance to wait for + :raises: If an exector has not been defined in the configuration, then + an exception will be raised + """ + if self.task_manager is None: + raise Exception("Executor not defined") + sleepTime = 1 + while 1: + added = False + if not self.task_manager.isFull(): + #this is the global parent, so scan the whole tree + if table is None: + jobMap = {} + try: + jobMap = self.scan(instance) + except ErrorFound: + break + if len(jobMap) == 0 and self.task_manager.taskCount() == 0: + break + + logging.debug(str(jobMap)) + for j in jobMap: + dfound = False + if jobMap[j] is not None and "_local" not in jobMap[j].jobInfo: + if "_depends" in jobMap[j].jobInfo: + for dname in jobMap[j].jobInfo["_depends"]: + dref = jobTree.db.join(jobMap[j].tableRef.instance, dname) + dpath = dref.instance + ":" + dref.table + logging.debug("dpath: " + j + " " + jobMap[j].tableRef.table + " "+ dpath) + if dpath in jobMap: + dfound = True + if not dfound: + if self.task_manager.addTask(jobMap[j]): + added = True + else: + logging.debug( "delay: " + j ) + else: + #this is a child (local) instance watcher + #NOTE: this doesn't handle followOn targets + try: + jobMap = self.scan(instance, table) + except ErrorFound: + break + if len(jobMap) == 0 and self.task_manager.taskCount() == 0: + break + for j in jobMap: + if jobMap[j] is not None and "_local" in jobMap[j].jobInfo: + if self.task_manager.addTask(jobMap[j]): + added = True + + if self.task_manager.cycle(): + added = True + if added: + sleepTime = 1 + else: + if sleepTime < 30: + sleepTime += 1 + logging.debug("SleepTime:" + str(sleepTime)) + time.sleep(sleepTime) + + def 
_createTable(self, inst, tablePath, tableInfo): + ref = jobTree.db.TableRef(inst, tablePath) + fs = jobTree.db.connect(self.config.dbpath) + logging.info("Creating Table %s:%s" % (inst, tablePath)) + fs.createTable(ref, tableInfo) + if self.callback is not None: + self.callback.callback_createTable(ref) + return jobTree.db.table.WriteTable(fs, ref) + + def _openTable(self, inst, tablePath): + ref = jobTree.db.TableRef(inst, tablePath) + fs = jobTree.db.connect(self.config.dbpath) + if self.callback is not None: + self.callback.callback_openTable(ref) + return jobTree.db.table.ReadTable(fs, ref) + + def _createFile(self, tablePath, tableInfo): + fs = jobTree.db.connect(self.config.dbpath) + fs.createFile(tablePath, tableInfo) + + def _copyTo(self, path, dstPath): + fs = jobTree.db.connect(self.config.dbpath) + fs.copyTo(path, dstPath) + + def _copyFrom(self, path, srcPath): + fs = jobTree.db.connect(self.config.dbpath) + fs.copyFrom(path, srcPath) + + def _runTarget(self, parent, obj): + obj.__setpath__(parent.__instance__, parent.__tablepath__) + obj.__setmanager__(self) + obj.run() + + def _addChild(self, obj_instance, obj_tablepath, child_name, child, depends=None, out_table=None, params={}, chdir=None): + if chdir is None: + instRef = jobTree.db.TableRef(obj_instance, obj_tablepath + "/@request") + else: + instRef = jobTree.db.TableRef(obj_instance, os.path.join(obj_tablepath, chdir) + "/@request") + + if not self.db.hasTable(instRef): + self.db.createTable(instRef, {}) + self.db.createTable(jobTree.db.TableRef(instRef.instance, instRef.table.replace("/@request", "/@done")), {}) + self.db.createTable(jobTree.db.TableRef(instRef.instance, instRef.table.replace("/@request", "/@error")), {}) + + logging.info("Adding Child %s" % (child_name)) + meta = params + if depends is not None: + if isinstance(depends, str): + meta["_depends"]= [ depends ] + else: + meta["_depends"]= depends + + """ + if isinstance(child, str): + meta['_submitInit'] = child + 
meta['_environment'] = self.import_applet(obj_instance, child.split('.')[0]) + + elif isinstance(child, jobTree.MultiApplet): + tableRef = jobTree.db.TableRef(obj.__instance__, os.path.abspath(os.path.join(obj.__tablepath__, child.__keyTable__))) + meta["_keyTable"] = tableRef.toPath() + meta["_environment"] = [child.__module__] + elif isinstance(child, jobTree.LocalTarget): + if isinstance(obj, jobTree.LocalSubmitTarget): + raise InvalidScheduleRequest("LocalSubmitTarget Cannot have LocalChildren") + meta["_local"] = jobTree.db.TableRef(obj.__instance__, obj.__tablepath__).toPath() + meta["_environment"] = [ child.__module__ ] + + else: + meta["_environment"] = [ child.__module__ ] + """ + + """ + if isinstance(child, jobTree.TableTarget) and out_table is None: + raise InvalidScheduleRequest("Must define output table for TableTarget") + """ + if out_table is not None: + meta["_outTable"] = out_table + tableRef = jobTree.db.TableRef(obj_instance, out_table) + if not self.db.hasTable(tableRef): + self.db.createTable(tableRef, {}) + + self.db.addData(instRef, child_name, meta) + if not isinstance(child, str): + tmp = tempfile.NamedTemporaryFile() + tmp.write(pickle.dumps(child)) + tmp.flush() + self.db.copyTo(tmp.name, instRef, child_name, "pickle") + tmp.close() + if self.callback is not None: + self.callback.callback_addChild(child) + + self.db.flush() diff --git a/jobTree/manage/applet.py b/jobTree/manage/applet.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/jobTree/manage/applet.py @@ -0,0 +1 @@ + diff --git a/jobTree/manage/drmaaExecutor.py b/jobTree/manage/drmaaExecutor.py new file mode 100644 index 0000000..8b4ad52 --- /dev/null +++ b/jobTree/manage/drmaaExecutor.py @@ -0,0 +1,63 @@ + +import jobTree.manage +import os +import logging +import traceback + +try: + import drmaa +except ImportError: + logging.error("Python DRMAA not installed") + drmaa = None +except RuntimeError: + drmaa = None + logging.error(traceback.format_exc()) + + +def 
class DRMAAExecutor(jobTree.manage.TaskExecutor):
    """
    TaskExecutor that submits work to a cluster scheduler through DRMAA.
    """

    def __init__(self):
        self.task_queue = {}
        self.sess = drmaa.Session()
        self.sess.initialize()

    def getMaxJobs(self):
        """No client-side cap; the scheduler enforces its own limits."""
        return None

    def runCmd(self, name, cmd):
        """
        Submit *cmd* (an argv list) as a cluster job recorded under *name*.
        """
        jt = self.sess.createJobTemplate()
        jt.remoteCommand = cmd[0]
        jt.args = cmd[1:]
        jt.joinFiles = True
        jt.outputPath = ":/dev/null"
        jt.errorPath = ":/dev/null"
        jt.jobEnvironment = os.environ
        # BUG: Need flexible native specifications parameter
        jt.nativeSpecification = "-l h_vmem=10G -pe smp 2"
        jt.workingDirectory = os.getcwd()
        jobid = self.sess.runJob(jt)
        self.task_queue[name] = jobid
        self.sess.deleteJobTemplate(jt)

    def poll(self):
        """
        Return {name: state} for jobs that have reached a terminal or
        suspended state, removing them from the active queue.  (The original
        re-checked ``ret is not None`` inside the membership test, which was
        always true since the state list contains no None.)
        """
        finished = {}
        for name in self.task_queue:
            state = self.sess.jobStatus(self.task_queue[name])
            if state in (drmaa.JobState.DONE, drmaa.JobState.FAILED,
                         drmaa.JobState.UNDETERMINED,
                         drmaa.JobState.SYSTEM_SUSPENDED,
                         drmaa.JobState.USER_SUSPENDED):
                finished[name] = state
        for name in finished:
            del self.task_queue[name]
        return finished
def system(cmd):
    """
    Run *cmd* through the shell, echoing the current working directory and
    the command first.

    :param cmd: Shell command string
    :raises subprocess.CalledProcessError: on a non-zero exit status
    """
    # Parenthesized print of a single expression behaves identically under
    # Python 2 (statement form) and Python 3 (function form); the original
    # py2-only statements made this helper unimportable under py3.
    print(os.getcwd())
    print(cmd)
    subprocess.check_call(cmd, shell=True)
logging.getLogger() + diff --git a/src/common.py b/jobTree/src/common.py similarity index 100% rename from src/common.py rename to jobTree/src/common.py diff --git a/src/job.py b/jobTree/src/job.py similarity index 100% rename from src/job.py rename to jobTree/src/job.py diff --git a/src/jobTreeKill.py b/jobTree/src/jobTreeKill.py similarity index 100% rename from src/jobTreeKill.py rename to jobTree/src/jobTreeKill.py diff --git a/src/jobTreeRun.py b/jobTree/src/jobTreeRun.py similarity index 100% rename from src/jobTreeRun.py rename to jobTree/src/jobTreeRun.py diff --git a/src/jobTreeSlave.py b/jobTree/src/jobTreeSlave.py similarity index 100% rename from src/jobTreeSlave.py rename to jobTree/src/jobTreeSlave.py diff --git a/src/jobTreeStats.py b/jobTree/src/jobTreeStats.py similarity index 100% rename from src/jobTreeStats.py rename to jobTree/src/jobTreeStats.py diff --git a/src/jobTreeStatus.py b/jobTree/src/jobTreeStatus.py similarity index 100% rename from src/jobTreeStatus.py rename to jobTree/src/jobTreeStatus.py diff --git a/src/master.py b/jobTree/src/master.py similarity index 99% rename from src/master.py rename to jobTree/src/master.py index 4f7f40d..975aeac 100644 --- a/src/master.py +++ b/jobTree/src/master.py @@ -567,7 +567,7 @@ def makeGreyAndReissueJob(job): logger.info("Finished the main loop, now must finish deleting files") startTimeForRemovingFiles = time.time() jobRemover.join() - logger.critical("It took %i seconds to finish deleting files" % (time.time() - startTimeForRemovingFiles)) + logger.info("It took %i seconds to finish deleting files" % (time.time() - startTimeForRemovingFiles)) if stats: fileHandle = open(getStatsFileName(config.attrib["job_tree"]), 'ab') diff --git a/scriptTree/stack.py b/scriptTree/stack.py deleted file mode 100644 index 0c55b47..0000000 --- a/scriptTree/stack.py +++ /dev/null @@ -1,222 +0,0 @@ -#!/usr/bin/env python - -#Copyright (C) 2011 by Benedict Paten (benedictpaten@gmail.com) -# -#Permission is hereby 
granted, free of charge, to any person obtaining a copy -#of this software and associated documentation files (the "Software"), to deal -#in the Software without restriction, including without limitation the rights -#to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -#copies of the Software, and to permit persons to whom the Software is -#furnished to do so, subject to the following conditions: -# -#The above copyright notice and this permission notice shall be included in -#all copies or substantial portions of the Software. -# -#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -#IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -#FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -#AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -#LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -#OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -#THE SOFTWARE. - -import sys -import os -import time -from optparse import OptionParser -try: - import cPickle -except ImportError: - import pickle as cPickle - -import xml.etree.cElementTree as ET - -from sonLib.bioio import logger -from sonLib.bioio import setLogLevel -from sonLib.bioio import setLoggingFromOptions -from sonLib.bioio import getTempFile -from sonLib.bioio import getTempDirectory -from sonLib.bioio import system -from sonLib.bioio import getTotalCpuTimeAndMemoryUsage, getTotalCpuTime - -from jobTree.src.jobTreeRun import addOptions -from jobTree.src.jobTreeRun import createJobTree -from jobTree.src.jobTreeRun import reloadJobTree -from jobTree.src.jobTreeRun import createFirstJob -from jobTree.src.jobTreeRun import loadEnvironment -from jobTree.src.master import mainLoop - -from jobTree.scriptTree.target import Target - -class Stack: - """Holds together a stack of targets and runs them. 
- The only public methods are documented at the top of this file.. - """ - def __init__(self, target): - self.target = target - self.verifyTargetAttributesExist(target) - - @staticmethod - def getDefaultOptions(): - """Returns am optparse.Values object name (string) : value - options used by job-tree. See the help string - of jobTree to see these options. - """ - parser = OptionParser() - Stack.addJobTreeOptions(parser) - options, args = parser.parse_args(args=[]) - assert len(args) == 0 - return options - - @staticmethod - def addJobTreeOptions(parser): - """Adds the default job-tree options to an optparse - parser object. - """ - addOptions(parser) - - def startJobTree(self, options): - """Runs jobtree using the given options (see Stack.getDefaultOptions - and Stack.addJobTreeOptions). - """ - self.verifyJobTreeOptions(options) - setLoggingFromOptions(options) - options.jobTree = os.path.abspath(options.jobTree) - if os.path.isdir(options.jobTree): - config, batchSystem = reloadJobTree(options.jobTree) - else: - config, batchSystem = createJobTree(options) - #Setup first job. 
- command = self.makeRunnable(options.jobTree) - memory = self.getMemory() - cpu = self.getCpu() - createFirstJob(command, config, memory=memory, cpu=cpu) - loadEnvironment(config) - return mainLoop(config, batchSystem) - -##### -#The remainder of the class is private to the user -#### - - def makeRunnable(self, tempDir): - pickleFile = getTempFile(".pickle", tempDir) - fileHandle = open(pickleFile, 'w') - cPickle.dump(self, fileHandle, cPickle.HIGHEST_PROTOCOL) - fileHandle.close() - i = set() - for importString in self.target.importStrings: - i.add(importString) - classNames = " ".join(i) - return "scriptTree %s %s" % (pickleFile, classNames) - - def getMemory(self, defaultMemory=sys.maxint): - memory = self.target.getMemory() - if memory == sys.maxint: - return defaultMemory - return memory - - def getCpu(self, defaultCpu=sys.maxint): - cpu = self.target.getCpu() - if cpu == sys.maxint: - return defaultCpu - return cpu - - def getLocalTempDir(self): - self.tempDirAccessed = True - return self.localTempDir - - def getGlobalTempDir(self): - return getTempDirectory(rootDir=self.globalTempDir) - - def execute(self, job, stats, localTempDir, globalTempDir, - memoryAvailable, cpuAvailable, - defaultMemory, defaultCpu): - self.tempDirAccessed = False - self.localTempDir = localTempDir - self.globalTempDir = globalTempDir - - if stats != None: - startTime = time.time() - startClock = getTotalCpuTime() - - baseDir = os.getcwd() - - self.target.setStack(self) - #Debug check that we have the right amount of CPU and memory for the job in hand - targetMemory = self.target.getMemory() - if targetMemory != sys.maxint: - assert targetMemory <= memoryAvailable - targetCpu = self.target.getCpu() - if targetCpu != sys.maxint: - assert targetCpu <= cpuAvailable - #Run the target, first cleanup then run. 
- self.target.run() - #Change dir back to cwd dir, if changed by target (this is a safety issue) - if os.getcwd() != baseDir: - os.chdir(baseDir) - #Cleanup after the target - if self.tempDirAccessed: - system("rm -rf %s/*" % self.localTempDir) - self.tempDirAccessed = False - #Handle the follow on - followOn = self.target.getFollowOn() - if followOn is not None: #Target to get rid of follow on when done. - if self.target.isGlobalTempDirSet(): - followOn.setGlobalTempDir(self.target.getGlobalTempDir()) - followOnStack = Stack(followOn) - job.addFollowOnCommand((followOnStack.makeRunnable(self.globalTempDir), - followOnStack.getMemory(defaultMemory), - followOnStack.getCpu(defaultCpu))) - - #Now add the children to the newChildren stack - newChildren = self.target.getChildren() - newChildren.reverse() - while len(newChildren) > 0: - childStack = Stack(newChildren.pop()) - job.addChildCommand((childStack.makeRunnable(self.globalTempDir), - childStack.getMemory(defaultMemory), - childStack.getCpu(defaultCpu))) - - #Now build jobs for each child command - for childCommand, runTime in self.target.getChildCommands(): - job.addChildCommand((childCommand, defaultMemory, defaultCpu)) - - for message in self.target.getMasterLoggingMessages(): - job.addMessage(message) - - #Finish up the stats - if stats != None: - stats = ET.SubElement(stats, "target") - stats.attrib["time"] = str(time.time() - startTime) - totalCpuTime, totalMemoryUsage = getTotalCpuTimeAndMemoryUsage() - stats.attrib["clock"] = str(totalCpuTime - startClock) - stats.attrib["class"] = ".".join((self.target.__class__.__name__,)) - stats.attrib["memory"] = str(totalMemoryUsage) - - def verifyJobTreeOptions(self, options): - """ verifyJobTreeOptions() returns None if all necessary values - are present in options, otherwise it raises an error. - It can also serve to validate the values of the options. 
- """ - required = ['logLevel', 'command', 'batchSystem', 'jobTree'] - for r in required: - if r not in vars(options): - raise RuntimeError("Error, there is a missing option (%s) from the scriptTree Stack, " - "did you remember to call Stack.addJobTreeOptions()?" % r) - if options.jobTree is None: - raise RuntimeError("Specify --jobTree") - - def verifyTargetAttributesExist(self, target): - """ verifyTargetAttributesExist() checks to make sure that the Target - instance has been properly instantiated. Returns None if instance is OK, - raises an error otherwise. - """ - required = ['_Target__followOn', '_Target__children', '_Target__childCommands', - '_Target__time', '_Target__memory', '_Target__cpu', 'globalTempDir'] - for r in required: - if r not in vars(target): - raise RuntimeError("Error, there is a missing attribute, %s, from a Target sub instance %s, " - "did you remember to call Target.__init__(self) in the %s " - "__init__ method?" % ( r, target.__class__.__name__, - target.__class__.__name__)) - diff --git a/scriptTree/target.py b/scriptTree/target.py deleted file mode 100644 index 10ff0bb..0000000 --- a/scriptTree/target.py +++ /dev/null @@ -1,136 +0,0 @@ -#!/usr/bin/env python - -#Copyright (C) 2011 by Benedict Paten (benedictpaten@gmail.com) -# -#Permission is hereby granted, free of charge, to any person obtaining a copy -#of this software and associated documentation files (the "Software"), to deal -#in the Software without restriction, including without limitation the rights -#to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -#copies of the Software, and to permit persons to whom the Software is -#furnished to do so, subject to the following conditions: -# -#The above copyright notice and this permission notice shall be included in -#all copies or substantial portions of the Software. 
-# -#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -#IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -#FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -#AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -#LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -#OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -#THE SOFTWARE. - -import sys -from sonLib.bioio import system - -class Target: - """Each job wrapper extends this class. - """ - - def __init__(self, time=sys.maxint, memory=sys.maxint, cpu=sys.maxint): - """This method must be called by any overiding constructor. - """ - self.__followOn = None - self.__children = [] - self.__childCommands = [] - self.__memory = memory - self.__time = time #This parameter is no longer used. - self.__cpu = cpu - self.globalTempDir = None - if self.__module__ == "__main__": - raise RuntimeError("The module name of class %s is __main__, which prevents us from serialising it properly, \ -please ensure you re-import targets defined in main" % self.__class__.__name__) - self.importStrings = set((".".join((self.__module__, self.__class__.__name__)),)) - self.loggingMessages = [] - - def run(self): - """Do user stuff here, including creating any follow on jobs. - This function must not re-pickle the pickle file, which is an input file. - """ - pass - - def setFollowOnTarget(self, followOn): - """Set the follow on target. - Will complain if follow on already set. - """ - assert self.__followOn == None - self.__followOn = followOn - - def addChildTarget(self, childTarget): - """Adds the child target to be run as child of this target. - """ - self.__children.append(childTarget) - - def addChildCommand(self, childCommand, runTime=sys.maxint): - """A command to be run as child of the job tree. 
- """ - self.__childCommands.append((str(childCommand), float(runTime))) - - def getRunTime(self): - """Get the time the target is anticipated to run. - """ - return self.__time - - def getGlobalTempDir(self): - """Get the global temporary directory. - """ - #Check if we have initialised the global temp dir - doing this - #just in time prevents us from creating temp directories unless we have to. - if self.globalTempDir == None: - self.globalTempDir = self.stack.getGlobalTempDir() - return self.globalTempDir - - def getLocalTempDir(self): - """Get the local temporary directory. - """ - return self.stack.getLocalTempDir() - - def getMemory(self): - """Returns the number of bytes of memory that were requested by the job. - """ - return self.__memory - - def getCpu(self): - """Returns the number of cpus requested by the job. - """ - return self.__cpu - - def getFollowOn(self): - """Get the follow on target. - """ - return self.__followOn - - def getChildren(self): - """Get the child targets. - """ - return self.__children[:] - - def getChildCommands(self): - """Gets the child commands, as a list of tuples of strings and floats, representing the run times. - """ - return self.__childCommands[:] - - def logToMaster(self, string): - """Send a logging message to the master. Will only reported if logging is set to INFO level in the master. - """ - self.loggingMessages.append(str(string)) - -#### -#Private functions -#### - - def setGlobalTempDir(self, globalTempDir): - """Sets the global temp dir. - """ - self.globalTempDir = globalTempDir - - def isGlobalTempDirSet(self): - return self.globalTempDir != None - - def setStack(self, stack): - """Sets the stack object that is calling the target. 
- """ - self.stack = stack - - def getMasterLoggingMessages(self): - return self.loggingMessages[:] diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..118c6ad --- /dev/null +++ b/setup.py @@ -0,0 +1,61 @@ + +import sys +import os + + +if sys.version_info[:2] < (2, 5): + print "Biopython requires Python 2.5 or better. Python %d.%d detected" % \ + sys.version_info[:2] + sys.exit(-1) + + +from distutils.core import setup +from distutils.core import Command +from distutils.command.install import install +from distutils.command.build_py import build_py +from distutils.command.build_ext import build_ext +from distutils.extension import Extension + +class test_jobTree(Command): + + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + + def run(self): + os.chdir("test") + sys.path.insert(0, '') + import runTests + runTests.main([]) + + +PACKAGES = [ + 'jobTree', + 'jobTree.db', + 'jobTree.manage', + 'jobTree.src', + 'jobTree.batchSystems', + 'jobTree.scriptTree' +] + +__version__ = "0.1a" + +setup( + name='jobTree', + version=__version__, + author='Benedict Paten', + author_email='', + url='http://hgwdev.cse.ucsc.edu/~benedict/code/jobTree.html', + description='System to organize .', + download_url='https://github.com/benedictpaten/jobTree', + cmdclass = { + "test" : test_jobTree, + }, + + packages=PACKAGES, +) diff --git a/src/__init__.py b/src/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/test/__init__.py b/test/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/test/jobTree/__init__.py b/test/jobTree/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/test/jobTree/jobTest.py b/test/jobTree/jobTest.py deleted file mode 100644 index b51ed24..0000000 --- a/test/jobTree/jobTest.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env python -"""Test Job class -""" - -import unittest -import os -import time -import sys - -from sonLib.bioio import 
parseSuiteTestOptions -from sonLib.bioio import logger - -from jobTree.src.job import Job, readJob - -class TestCase(unittest.TestCase): - def testJob(self): - tempJobFile = os.path.join(os.getcwd(), "jobTestTempJob") - command = "by your command" - memory = 2^32 - cpu = 1 - parentFile = "the parent file" - globalTempDir = "the_global_temp_dir" - retryCount = 100 - - for i in xrange(10): - startTime = time.time() - for j in xrange(1000): - j = Job(command, memory, cpu, parentFile, globalTempDir, retryCount) - j.write(tempJobFile) - j = readJob(tempJobFile) - print "It took %f seconds to load/unload jobs" % (time.time() - startTime) #We've just used it for benchmarking, so far - #Would be good to extend this trivial test - os.remove(tempJobFile) - -def main(): - parseSuiteTestOptions() - sys.argv = sys.argv[:1] - unittest.main() - -if __name__ == '__main__': - #import cProfile - #cProfile.run('main()', "fooprof") - #import pstats - #p = pstats.Stats('fooprof') - #p.strip_dirs().sort_stats(-1).print_stats() - #print p - main() diff --git a/test/jobTree/jobTreeTest.py b/test/jobTree/jobTreeTest.py deleted file mode 100644 index 1e84efd..0000000 --- a/test/jobTree/jobTreeTest.py +++ /dev/null @@ -1,79 +0,0 @@ -#!/usr/bin/env python -"""Tests jobTree with the single machine batch system. 
-""" - -import unittest -import os -import os.path -import sys -import random -import subprocess -import xml.etree.cElementTree as ET - -from sonLib.bioio import system -from sonLib.bioio import TestStatus -from sonLib.bioio import parseSuiteTestOptions -from sonLib.bioio import logger -from sonLib.bioio import TempFileTree -from sonLib.bioio import getTempFile - -from jobTree.src.common import parasolIsInstalled, gridEngineIsInstalled - -class TestCase(unittest.TestCase): - - def setUp(self): - unittest.TestCase.setUp(self) - self.jobTreeDir = os.path.join(os.getcwd(), "jobTree") #A directory for the job tree to be created in - self.tempFileTreeDir = os.path.join(os.getcwd(), "tempFileTree") #Ensures that file tree is visible - self.tempFileTree = TempFileTree(self.tempFileTreeDir) #A place to get temp files from - - def tearDown(self): - unittest.TestCase.tearDown(self) - self.tempFileTree.destroyTempFiles() - system("rm -rf %s %s" % (self.jobTreeDir, self.tempFileTreeDir)) #Cleanup the job tree in case it hasn't already been cleaned up. - - # only done in singleMachine for now. 
Experts can run manually on other systems if they choose - def dependenciesTest(self, batchSystem="singleMachine"): - def fn(tree, maxJobs, maxThreads, size, cpusPerJob, sleepTime): - system("rm -rf %s" % self.jobTreeDir) - logName = self.tempFileTree.getTempFile(suffix="_comblog.txt", makeDir=False) - commandLine = "jobTreeTest_Dependencies.py --jobTree %s --logFile %s --batchSystem '%s' --tree %s --maxJobs %s --maxThreads %s --size %s --cpusPerJob=%s --sleepTime %s" % \ - (self.jobTreeDir, logName, batchSystem, tree, maxJobs, maxThreads, size, cpusPerJob, sleepTime) - system(commandLine) - - fn("comb", 10, 100, 100, 1, 10) - fn("comb", 200, 100, 100, 20, 10) - - fn("fly", 10, 8, 100, 1, 10) - fn("fly", 10, 8, 100, 2, 10) - - fn("balanced", 5, 10, 100, 1, 10) - fn("balanced", 5, 10, 100, 3, 10) - - def testJobTree_dependencies_singleMachine(self): - self.dependenciesTest(batchSystem="singleMachine") - - def testJobTree_dependencies_combined(self): - self.dependenciesTest(batchSystem="singleMachine singleMachine 1000000") - - def testJobTree_dependencies_parasol(self): - return - if parasolIsInstalled(): - self.dependenciesTest(batchSystem="parasol") - - def testJobTree_dependencies_gridengine(self): - return - if gridEngineIsInstalled(): - self.dependenciesTest(batchSystem="gridengine") - -def runJobTreeStatusAndFailIfNotComplete(jobTreeDir): - command = "jobTreeStatus --jobTree %s --failIfNotComplete --verbose" % jobTreeDir - system(command) - -def main(): - parseSuiteTestOptions() - sys.argv = sys.argv[:1] - unittest.main() - -if __name__ == '__main__': - main() diff --git a/test/jobTree/jobTreeTest_Dependencies.py b/test/jobTree/jobTreeTest_Dependencies.py deleted file mode 100644 index 337b6f1..0000000 --- a/test/jobTree/jobTreeTest_Dependencies.py +++ /dev/null @@ -1,261 +0,0 @@ -#!/usr/bin/env python -""" Test program designed to catch two bugs encountered when developing -progressive cactus: -1) spawning a daemon process causes indefinite jobtree hangs -2) 
jobtree does not properly parallelize jobs in certain types of -recursions - -If jobs get hung up until the daemon process -finsishes, that would be a case of bug 1). If jobs do not -get issued in parallel (ie the begin UP does not happen -concurrently for the leaves of the comb tree), then that is -a case of bug 2). This is now verified if a log file is -specified (--logFile [path] option) - ---Glenn Hickey -""" - -import os -from time import sleep -import random -import datetime -import sys -import math - -from sonLib.bioio import system -from optparse import OptionParser -import xml.etree.cElementTree as ET - -from jobTree.src.bioio import getLogLevelString -from jobTree.src.bioio import logger -from jobTree.src.bioio import setLoggingFromOptions - -from jobTree.scriptTree.target import Target -from jobTree.scriptTree.stack import Stack - - -from sonLib.bioio import spawnDaemon - -def writeLog(self, msg, startTime): - timeStamp = str(datetime.datetime.now() - startTime) - self.logToMaster("%s %s" % (timeStamp, msg)) - -def balancedTree(): - t = dict() - t["Anc00"] = [1,2] - t[1] = [11, 12] - t[2] = [21, 22] - t[11] = [111, 112] - t[12] = [121, 122] - t[21] = [211, 212] - t[22] = [221, 222] - t[111] = [1111, 1112] - t[112] = [1121, 1122] - t[121] = [1211, 1212] - t[122] = [1221, 1222] - t[211] = [2111, 2112] - t[212] = [2121, 2122] - t[221] = [2211, 2212] - t[222] = [2221, 2222] - t[1111] = [] - t[1112] = [] - t[1121] = [] - t[1122] = [] - t[1211] = [] - t[1212] = [] - t[1221] = [] - t[1222] = [] - t[2111] = [] - t[2112] = [] - t[2121] = [] - t[2122] = [] - t[2211] = [] - t[2212] = [] - t[2221] = [] - t[2222] = [] - return t - -def starTree(n = 10): - t = dict() - t["Anc00"] = range(1,n) - for i in range(1,n): - t[i] = [] - return t - -# odd numbers are leaves -def combTree(n = 100): - t = dict() - for i in range(0,n): - if i % 2 == 0: - t[i] = [i+1, i+2] - else: - t[i] = [] - t[i+1] = [] - t[i+2] = [] - t["Anc00"] = t[0] - return t - -# dependencies of the 
internal nodes of the fly12 tree -# note that 2, 5, 8 and 10 have no dependencies -def flyTree(): - t = dict() - t["Anc00"] = ["Anc01", "Anc03"] - t["Anc02"] = [] - t["Anc01"] = ["Anc02"] - t["Anc03"] = ["Anc04"] - t["Anc04"] = ["Anc05", "Anc06"] - t["Anc05"] = [] - t["Anc06"] = ["Anc07"] - t["Anc07"] = ["Anc08", "Anc09"] - t["Anc08"] = [] - t["Anc09"] = ["Anc10"] - t["Anc10"] = [] - return t - -class FirstJob(Target): - def __init__(self, tree, event, sleepTime, startTime, cpu): - Target.__init__(self, cpu=cpu) - self.tree = tree - self.event = event - self.sleepTime = sleepTime - self.startTime = startTime - self.cpu = cpu - - def run(self): - sleep(1) - self.addChildTarget(DownJob(self.tree, self.event, - self.sleepTime, self.startTime, self.cpu)) - - self.setFollowOnTarget(LastJob()) - -class LastJob(Target): - def __init__(self): - Target.__init__(self) - - def run(self): - sleep(1) - pass - -class DownJob(Target): - def __init__(self, tree, event, sleepTime, startTime, cpu): - Target.__init__(self, cpu=cpu) - self.tree = tree - self.event = event - self.sleepTime = sleepTime - self.startTime = startTime - self.cpu = cpu - - def run(self): - writeLog(self, "begin Down: %s" % self.event, self.startTime) - children = self.tree[self.event] - for child in children: - writeLog(self, "add %s as child of %s" % (child, self.event), - self.startTime) - self.addChildTarget(DownJob(self.tree, child, - self.sleepTime, self.startTime, self.cpu)) - - if len(children) == 0: - self.setFollowOnTarget(UpJob(self.tree, self.event, - self.sleepTime, self.startTime, self.cpu)) - return 0 - -class UpJob(Target): - def __init__(self, tree, event, sleepTime, startTime, cpu): - Target.__init__(self, cpu=cpu) - self.tree = tree - self.event = event - self.sleepTime = sleepTime - self.startTime = startTime - self.cpu = cpu - - def run(self): - writeLog(self, "begin UP: %s" % self.event, self.startTime) - - sleep(self.sleepTime) - spawnDaemon("sleep %s" % str(int(self.sleepTime) * 10)) - 
writeLog(self, "end UP: %s" % self.event, self.startTime) - -# let k = maxThreads. we make sure that jobs are fired in batches of k -# so the first k jobs all happen within epsilon time of each other, -# same for the next k jobs and so on. we allow at most alpha time -# between the different batches (ie between k+1 and k). -def checkLog(options): - epsilon = float(options.sleepTime) / 2.0 - alpha = options.sleepTime * 2.0 - logFile = open(options.logFile, "r") - stamps = [] - for logLine in logFile: - if "begin UP" in logLine: - chunks = logLine.split() - assert len(chunks) == 10 - timeString = chunks[6] - timeObj = datetime.datetime.strptime(timeString, "%H:%M:%S.%f") - timeStamp = timeObj.hour * 3600. + timeObj.minute * 60. + \ - timeObj.second + timeObj.microsecond / 1000000. - stamps.append(timeStamp) - - stamps.sort() - - maxThreads = int(options.maxThreads) - maxCpus = int(options.maxJobs) - maxConcurrentJobs = min(maxThreads, maxCpus) - cpusPerThread = float(maxCpus) / maxConcurrentJobs - cpusPerJob = int(options.cpusPerJob) - assert cpusPerJob >= 1 - assert cpusPerThread >= 1 - threadsPerJob = 1 - if cpusPerJob > cpusPerThread: - threadsPerJob = math.ceil(cpusPerJob / cpusPerThread) - maxConcurrentJobs = int(maxConcurrentJobs / threadsPerJob) - #print "Info on jobs", cpusPerThread, cpusPerJob, threadsPerJob, maxConcurrentJobs - assert maxConcurrentJobs >= 1 - for i in range(1,len(stamps)): - delta = stamps[i] - stamps[i-1] - if i % maxConcurrentJobs != 0: - if delta > epsilon: - raise RuntimeError("jobs out of sync: i=%d delta=%f threshold=%f" % - (i, delta, epsilon)) - elif delta > alpha: - raise RuntimeError("jobs out of sync: i=%d delta=%f threshold=%f" % - (i, delta, alpha)) - - logFile.close() - -def main(): - parser = OptionParser() - Stack.addJobTreeOptions(parser) - parser.add_option("--sleepTime", dest="sleepTime", type="int", - help="sleep [default=5] seconds", default=5) - parser.add_option("--tree", dest="tree", - help="tree 
[balanced|comb|star|fly]", default="comb") - parser.add_option("--size", dest="size", type="int", - help="tree size (for comb or star) [default=10]", - default=10) - parser.add_option("--cpusPerJob", dest="cpusPerJob", - help="Cpus per job", default="1") - - options, args = parser.parse_args() - setLoggingFromOptions(options) - - startTime = datetime.datetime.now() - - if options.tree == "star": - tree = starTree(options.size) - elif options.tree == "balanced": - tree = balancedTree() - elif options.tree == "fly": - tree = flyTree() - else: - tree = combTree(options.size) - - baseTarget = FirstJob(tree, "Anc00", options.sleepTime, startTime, int(options.cpusPerJob)) - Stack(baseTarget).startJobTree(options) - - if options.logFile is not None: - checkLog(options) - -if __name__ == '__main__': - from jobTree.test.jobTree.jobTreeTest_Dependencies import * - main() - - diff --git a/test/jobTree/machineList b/test/jobTree/machineList deleted file mode 100644 index 76cc74e..0000000 --- a/test/jobTree/machineList +++ /dev/null @@ -1 +0,0 @@ -localhost 1 4096 /tmp /tmp 4096 r1 \ No newline at end of file diff --git a/src/bioio.py b/test/runTests.py similarity index 50% rename from src/bioio.py rename to test/runTests.py index b9f6b0d..eae4137 100644 --- a/src/bioio.py +++ b/test/runTests.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python #Copyright (C) 2011 by Benedict Paten (benedictpaten@gmail.com) # @@ -20,15 +19,49 @@ #OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN #THE SOFTWARE. + import sys import os +import distutils.util + + +import unittest + +testList = [ +#("jobTree", "jobTreeTest"), +#("scriptTree", "scriptTreeTest"), +#("sort", "sortTest"), +#("utilities", "statsTest") +('unit', 'test_chain'), +('unit', 'test_dbi') +] + +def main( names ): + + if len(names) == 0: + names = testList + + test_path = sys.path[0] or "." 
+ sys.path.insert(1, os.path.dirname( os.path.realpath( __file__ ) ) ) + build_path = os.path.abspath("%s/../build/lib.%s-%s" % ( + test_path, distutils.util.get_platform(), sys.version[:3])) + if os.access(build_path, os.F_OK): + sys.path.insert(1, build_path) + + os.environ[ "PYTHONPATH" ] = build_path + ":.:" + os.environ.get("PYTHONPATH", "") + -from sonLib.bioio import * + from sonLib.bioio import parseSuiteTestOptions -def workflowRootPath(): - """Function for finding external location. - """ - import jobTree.scriptTree.target - i = os.path.abspath(jobTree.scriptTree.target.__file__) - return os.path.split(os.path.split(i)[0])[0] + runner = unittest.TextTestRunner() + for moduleDir, moduleName in names: + os.chdir(moduleDir) + testClass = (__import__( moduleName )).TestCase + suite = unittest.makeSuite( testClass, "test" ) + i = runner.run(suite) + os.chdir("..") + +if __name__ == '__main__': + import sys + sys.exit(main( sys.argv[1:] )) diff --git a/test/scriptTree/scriptTreeTest.py b/test/scriptTree/scriptTreeTest.py index 9603db1..ee5085d 100644 --- a/test/scriptTree/scriptTreeTest.py +++ b/test/scriptTree/scriptTreeTest.py @@ -9,7 +9,7 @@ from sonLib.bioio import parseSuiteTestOptions from sonLib.bioio import system from sonLib.bioio import getTempDirectory -from jobTree.test.jobTree.jobTreeTest import runJobTreeStatusAndFailIfNotComplete +from jobTreeTest import runJobTreeStatusAndFailIfNotComplete class TestCase(unittest.TestCase): diff --git a/test/sort/sortTest.py b/test/sort/sortTest.py index 307826b..652a1ea 100644 --- a/test/sort/sortTest.py +++ b/test/sort/sortTest.py @@ -16,7 +16,7 @@ from jobTree.src.common import parasolIsInstalled, gridEngineIsInstalled -from jobTree.test.sort.lib import merge, sort, copySubRangeOfFile, getMidPoint +from sort.lib import merge, sort, copySubRangeOfFile, getMidPoint class TestCase(unittest.TestCase): def setUp(self): diff --git a/test/unit/config_test.py b/test/unit/config_test.py new file mode 100644 index 
0000000..c8a47d2 --- /dev/null +++ b/test/unit/config_test.py @@ -0,0 +1,11 @@ + + +#REMOTE_SSH is for network testing. Will assume the following: +# - Password free SSH +# - The testing directory is mounted on NFS +# - The testing directory has the same path on both machines +# - Both machine have the same Python install path +REMOTE_SSH=None + +DEFAULT_DB="file://data_dir" +DEFAULT_EXE="process" \ No newline at end of file diff --git a/test/unit/test_chain.py b/test/unit/test_chain.py new file mode 100644 index 0000000..6a7ce2a --- /dev/null +++ b/test/unit/test_chain.py @@ -0,0 +1,106 @@ +import unittest +import os +import sys +import time +import subprocess +import shutil +import jobTree.manage +import jobTree +import jobTree.db +import random +import config_test + +__manifest__ = [ "test_chain.py" ] + +class OPChild(jobTree.Target): + def __init__(self, val): + self.val = val + + def run(self): + time.sleep(random.randint(5,15)) + out = self.createTable("test") + out.emit(self.val, 'test_%s' % (self.val)) + out.close() + +class OPFollow(jobTree.Target): + def __init__(self, src, val): + self.src = src + self.val = val + + def run(self): + time.sleep(random.randint(5,15)) + table = self.openTable(os.path.join(self.src, "test")) + assert table.hasKey(self.val) + + out = self.createTable("test") + out.emit(self.val, 'test_%s' % (self.val)) + out.close() + +class OPFollow2(jobTree.Target): + def __init__(self, src1, val1, src2, val2): + self.src1 = src1 + self.val1 = val1 + self.src2 = src2 + self.val2 = val2 + + def run(self): + time.sleep(random.randint(5,15)) + table1 = self.openTable(os.path.join(self.src1, "test")) + assert table1.hasKey(self.val1) + + table2 = self.openTable(os.path.join(self.src2, "test")) + assert table2.hasKey(self.val2) + + + +class Submission(jobTree.SubmitTarget): + def run(self, params): + print "submitted:", params + + self.addChildTarget('child_a', OPChild('a')) + self.addChildTarget('child_b', OPChild('b')) + 
self.addChildTarget('child_c', OPChild('c')) + + self.addFollowTarget('child_a_a', OPFollow('child_a', 'a'), depends='child_a') + self.addFollowTarget('child_b_a', OPFollow('child_b', 'b'), depends='child_b') + + self.addFollowTarget('child_a_a_a', OPFollow('child_a_a', 'a'), depends="child_a_a") + self.addFollowTarget('child_b_a_a', OPFollow('child_b_a', 'b'), depends="child_b_a") + + self.addFollowTarget('follow_a', OPFollow2('child_a', 'a', 'child_a_a', 'a'), depends=['child_a', 'child_a_a']) + self.addFollowTarget('follow_b', OPFollow2('child_a', 'a', 'child_b_a', 'b'), depends=['child_a', 'child_b_a']) + + +class TestCase(unittest.TestCase): + def test_submit(self): + config = jobTree.manage.Config(config_test.DEFAULT_DB, 'process', workdir="tmp_dir") + manager = jobTree.manage.Manager(config) + instance = manager.submit('tableTest', 'test_chain.Submission', {}) + manager.wait(instance) + + db = jobTree.db.connect(config_test.DEFAULT_DB) + for table in db.listTables(instance): + if table.table.endswith("@error"): + hasError = False + for key, val in db.listKeyValue(table): + print key, val + hasError = True + assert not hasError + + def tearDown(self): + return + try: + shutil.rmtree( "tmp_dir" ) + except OSError: + pass + try: + shutil.rmtree( "data_dir" ) + except OSError: + pass + +def main(): + sys.argv = sys.argv[:1] + unittest.main() + +if __name__ == '__main__': + main() diff --git a/test/unit/test_dbi.py b/test/unit/test_dbi.py new file mode 100644 index 0000000..85dfce3 --- /dev/null +++ b/test/unit/test_dbi.py @@ -0,0 +1,63 @@ + +import unittest +import jobTree.db +import config_test +import uuid +import filecmp +import os + +class TestCase(unittest.TestCase): + def test_dbi(self): + print os.getcwd() + instance = str(uuid.uuid4()) + db = jobTree.db.connect(config_test.DEFAULT_DB) + + db.createInstance(instance, {"info" : "test"} ) + + table_a_ref = jobTree.db.TableRef(instance, "/test1") + db.createTable(table_a_ref, { "info" : "other"} ) + + 
db.addData(table_a_ref, "key_1", {"data" : "key_1"}) + db.addData(table_a_ref, "key_2", {"data" : "key_2"}) + + db.flush() + + i = {} + for key in db.listKeys(table_a_ref): + i[key] = True + + assert "key_1" in i + assert "key_2" in i + + count = 0 + for key, val in db.listKeyValue(table_a_ref): + assert val['data'] == key + count += 1 + assert count == 2 + + print db.hasTable(table_a_ref) + assert db.hasTable(table_a_ref) == True + + db.deleteTable(table_a_ref) + + assert db.hasTable(table_a_ref)==False + + table_b_ref = jobTree.db.TableRef(instance, "/test_b") + db.createTable(table_b_ref, {}) + db.addData(table_b_ref, "key_1", {}) + db.copyTo("test_file.data", table_b_ref, "key_1", "test_file.data") + + db.flush() + + db.copyFrom("tmp.out", table_b_ref, "key_1", "test_file.data") + + assert filecmp.cmp("tmp.out", "test_file.data") + + assert db.hasAttachment(table_b_ref, "key_1", "test_file.data") + assert db.hasAttachment(table_b_ref, "key_1", "blabla.py") == False + + assert db.listAttachments(table_b_ref, "key_1") == ["test_file.data"] + + + db.deleteInstance(instance) + diff --git a/test/unit/test_file.data b/test/unit/test_file.data new file mode 100644 index 0000000..557db03 --- /dev/null +++ b/test/unit/test_file.data @@ -0,0 +1 @@ +Hello World diff --git a/test/utilities/statsTest.py b/test/utilities/statsTest.py index f4f35cb..089a093 100644 --- a/test/utilities/statsTest.py +++ b/test/utilities/statsTest.py @@ -11,7 +11,7 @@ from sonLib.bioio import getTempDirectory from sonLib.bioio import getTempFile -from jobTree.test.sort.sortTest import makeFileToSort +from sort.sortTest import makeFileToSort class TestCase(unittest.TestCase): def setUp(self):