Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Commit

Permalink
Drop URL/TaskType from PypeTask
Browse files Browse the repository at this point in the history
  • Loading branch information
cdunn2001 committed Nov 27, 2016
1 parent e238ea1 commit 87b6262
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 48 deletions.
12 changes: 3 additions & 9 deletions falcon_kit/mains/get_read_ctg_map.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
from __future__ import absolute_import
#from pypeflow.data import PypeLocalFile, makePypeLocalFile, fn
#from pypeflow.task import PypeTask, PypeThreadTaskBase, PypeTaskBase
#from pypeflow.controller import PypeWorkflow, PypeMPWorkflow, PypeThreadWorkflow
from pypeflow.simple_pwatcher_bridge import (PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
makePypeLocalFile, fn, PypeTask)
PypeThreadTaskBase = MyFakePypeThreadTaskBase
Expand Down Expand Up @@ -84,8 +81,7 @@ def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
task = PypeTask(
inputs = {'rawread_db': rawread_db},
outputs = {'rawread_id_file': rawread_id_file},
TaskType = PypeThreadTaskBase,
URL = 'task://localhost/dump_rawread_ids')
)
wf.addTask(task(dump_rawread_ids))

pread_db = makePypeLocalFile(os.path.join(pread_dir, 'preads.db'))
Expand All @@ -94,8 +90,7 @@ def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
task = PypeTask(
inputs = {'pread_db': pread_db},
outputs = {'pread_id_file': pread_id_file},
TaskType = PypeThreadTaskBase,
URL = 'task://localhost/dump_pread_ids' )
)
wf.addTask(task(dump_pread_ids))

wf.refreshTargets() # block
Expand All @@ -115,8 +110,7 @@ def get_read_ctg_map(rawread_dir, pread_dir, asm_dir):
task = PypeTask(
inputs = inputs,
outputs = {'read_to_contig_map': read_to_contig_map},
TaskType = PypeThreadTaskBase,
URL = 'task://localhost/get_ctg_read_map')
)
wf.addTask(task(generate_read_to_ctg_map))

wf.refreshTargets() # block
Expand Down
50 changes: 11 additions & 39 deletions falcon_kit/mains/run1.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from .. import run_support as support
from .. import bash, pype_tasks
from ..util.system import only_these_symlinks
#from pypeflow.pwatcher_bridge import PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase
#from pypeflow.data import makePypeLocalFile, fn
#from pypeflow.task import PypeTask
from pypeflow.simple_pwatcher_bridge import (PypeProcWatcherWorkflow, MyFakePypeThreadTaskBase,
makePypeLocalFile, fn, PypeTask)
import argparse
Expand Down Expand Up @@ -34,8 +31,6 @@ def create_daligner_tasks(basedir, scatter_fn):
make_daligner_task = PypeTask(inputs = inputs,
outputs = outputs,
parameters = parameters,
TaskType = MyFakePypeThreadTaskBase,
URL = URL,
wdir = wdir,
)
daligner_task = make_daligner_task(pype_tasks.task_run_daligner)
Expand All @@ -60,8 +55,6 @@ def create_merge_tasks(basedir, scatter_fn):
make_task = PypeTask(inputs = inputs,
outputs = outputs,
parameters = parameters,
TaskType = MyFakePypeThreadTaskBase,
URL = URL,
wdir = wdir,
)
task = make_task(pype_tasks.task_run_las_merge)
Expand All @@ -86,8 +79,6 @@ def create_consensus_tasks(basedir, scatter_fn):
make_c_task = PypeTask(inputs = inputs,
outputs = outputs,
parameters = parameters,
TaskType = MyFakePypeThreadTaskBase,
URL = URL,
wdir = wdir,
)
c_task = make_c_task(pype_tasks.task_run_consensus)
Expand All @@ -103,9 +94,7 @@ def create_merge_gather_task(wd, inputs):
outputs = {'las_fofn': las_fofn_plf,
'las_fopfn': las_fopfn_plf,
},
TaskType = MyFakePypeThreadTaskBase,
)
# URL = 'task://localhost/pmerge_gather')
task = make_task(pype_tasks.task_merge_gather)
return task, las_fofn_plf, las_fopfn_plf

Expand All @@ -116,8 +105,7 @@ def create_consensus_gather_task(wd, inputs):
make_cns_gather_task = PypeTask(
inputs = inputs, # consensus_out
outputs = {'preads_fofn': preads_fofn_plf},
TaskType = MyFakePypeThreadTaskBase,
URL = 'task://localhost/cns_gather' )
)
task = make_cns_gather_task(pype_tasks.task_cns_gather)
return task, preads_fofn_plf

Expand Down Expand Up @@ -170,7 +158,7 @@ def run(wf, config,
make_fofn_abs_task = PypeTask(inputs = {'i_fofn': input_fofn_plf},
outputs = {'o_fofn': rawread_fofn_plf},
parameters = {},
TaskType = MyFakePypeThreadTaskBase)
)
fofn_abs_task = make_fofn_abs_task(pype_tasks.task_make_fofn_abs_raw)

wf.addTasks([fofn_abs_task])
Expand All @@ -195,7 +183,7 @@ def run(wf, config,
'run_jobs': run_jobs,
},
parameters = parameters,
TaskType = MyFakePypeThreadTaskBase)
)
build_rdb_task = make_build_rdb_task(pype_tasks.task_build_rdb)

wf.addTasks([build_rdb_task])
Expand All @@ -218,8 +206,6 @@ def run(wf, config,
'pread_aln': False,
'config': config,
},
TaskType = MyFakePypeThreadTaskBase,
URL = 'task://localhost/raw-daligner-scatter'
)
task = make_daligner_scatter(pype_tasks.task_daligner_scatter)
wf.addTask(task)
Expand All @@ -237,8 +223,7 @@ def run(wf, config,
inputs = daligner_out,
outputs = {'gathered': r_gathered_las_plf},
parameters = parameters,
TaskType = MyFakePypeThreadTaskBase,
URL = 'task://localhost/rda_check' )
)
check_r_da_task = make_daligner_gather(pype_tasks.task_daligner_gather)
wf.addTask(check_r_da_task)
wf.refreshTargets(exitOnFailure=exitOnFailure)
Expand All @@ -257,8 +242,6 @@ def run(wf, config,
'db_prefix': 'raw_reads',
'config': config,
},
TaskType = MyFakePypeThreadTaskBase,
URL = 'task://localhost/raw-merge-scatter'
)
task = make_task(pype_tasks.task_merge_scatter)
wf.addTask(task)
Expand Down Expand Up @@ -286,8 +269,6 @@ def run(wf, config,
'db_prefix': 'raw_reads',
'config': config,
},
TaskType = MyFakePypeThreadTaskBase,
URL = 'task://localhost/raw-cns-scatter'
)
task = make_task(pype_tasks.task_consensus_scatter)
wf.addTask(task)
Expand All @@ -310,8 +291,7 @@ def run(wf, config,
'preads_fofn': preads_fofn_plf, },
outputs = {'pre_assembly_report': pre_assembly_report_plf, },
parameters = parameters,
TaskType = MyFakePypeThreadTaskBase,
URL = 'task://localhost/report_pre_assembly')
)
task = make_task(pype_tasks.task_report_pre_assembly)
wf.addTask(task)

Expand All @@ -330,7 +310,7 @@ def run(wf, config,
make_fofn_abs_task = PypeTask(inputs = {'i_fofn': rawread_fofn_plf},
outputs = {'o_fofn': preads_fofn_plf},
parameters = {},
TaskType = MyFakePypeThreadTaskBase)
)
fofn_abs_task = make_fofn_abs_task(pype_tasks.task_make_fofn_abs_preads)
wf.addTasks([fofn_abs_task])
wf.refreshTargets([fofn_abs_task])
Expand All @@ -349,8 +329,7 @@ def run(wf, config,
'run_jobs': run_jobs,
},
parameters = parameters,
TaskType = MyFakePypeThreadTaskBase,
URL = 'task://localhost/build_pdb')
)
build_pdb_task = make_build_pdb_task(pype_tasks.task_build_pdb)

wf.addTasks([build_pdb_task])
Expand All @@ -376,8 +355,6 @@ def run(wf, config,
'pread_aln': True,
'config': config,
},
TaskType = MyFakePypeThreadTaskBase,
URL = 'task://localhost/preads-daligner-scatter'
)
task = make_daligner_scatter(pype_tasks.task_daligner_scatter)
wf.addTask(task)
Expand All @@ -394,8 +371,7 @@ def run(wf, config,
inputs = daligner_out,
outputs = {'gathered': p_gathered_las_plf},
parameters = parameters,
TaskType = MyFakePypeThreadTaskBase,
URL = 'task://localhost/pda_check' )
)
check_p_da_task = make_daligner_gather(pype_tasks.task_daligner_gather)
wf.addTask(check_p_da_task)
wf.refreshTargets(exitOnFailure=exitOnFailure)
Expand All @@ -415,9 +391,7 @@ def run(wf, config,
'db_prefix': 'preads',
'config': config,
},
TaskType = MyFakePypeThreadTaskBase,
#URL = 'task://localhost/preads-merge-scatter'
)
)
task = make_task(pype_tasks.task_merge_scatter)
wf.addTask(task)
wf.refreshTargets(exitOnFailure=exitOnFailure)
Expand Down Expand Up @@ -447,8 +421,7 @@ def run(wf, config,
'config': config,
'sge_option': config['sge_option_fc'],
},
TaskType = MyFakePypeThreadTaskBase,
URL = 'task://localhost/db2falcon' )
)
wf.addTask(make_run_db2falcon(pype_tasks.task_run_db2falcon))

falcon_asm_done = makePypeLocalFile( os.path.join(falcon_asm_dir, 'falcon_asm_done'))
Expand All @@ -463,8 +436,7 @@ def run(wf, config,
'pread_dir': pread_dir,
'sge_option': config['sge_option_fc'],
},
TaskType = MyFakePypeThreadTaskBase,
URL = 'task://localhost/falcon_asm' )
)
wf.addTask(make_run_falcon_asm(pype_tasks.task_run_falcon_asm))
wf.refreshTargets()

Expand Down

0 comments on commit 87b6262

Please sign in to comment.