Skip to content
This repository has been archived by the owner on Mar 17, 2023. It is now read-only.

Commit

Permalink
Merge pull request #12 from miguelpmachado/MPM_20200723
Browse files Browse the repository at this point in the history
Threads as consumables
Index reference fasta file
Fixes #9
  • Loading branch information
giesselmann authored Jul 23, 2020
2 parents 05e0f5a + 86a7bbc commit 92b7676
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 4 deletions.
3 changes: 2 additions & 1 deletion docs/usage/cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ Following the steps in the [general](general.md) workflow documentation, the usa
**-j or --jobs or --cores**
: In cluster mode the maximum number of jobs submitted to the queue.


**--resources threads=N**
: \[OPTIONAL\] In cluster mode, define threads as consumable resources that shall constrain the scheduling to a maximum N threads in usage. This can be specified in *config.yaml* file found inside the *profile* directory.
5 changes: 4 additions & 1 deletion profiles/slurm/config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
jobs: 8
resources: threads=8

## Uncomment to use threads/CPUs as consumable resources
# resources: threads=8

shadow-prefix: /tmp/nanopype_snakemake
latency-wait: 60
restart-times: 1
Expand Down
36 changes: 35 additions & 1 deletion rules/alignment.smk
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ rule minimap2:
threads: config['threads_alignment']
group: "minimap2"
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, threads, attempt: int((1.0 + (0.2 * (attempt - 1))) * (config['memory']['minimap2'][0] + config['memory']['minimap2'][1] * threads)),
time_min = lambda wildcards, threads, attempt: int((960 / threads) * attempt * config['runtime']['minimap2']) # 60 min / 16 threads
singularity:
Expand All @@ -91,6 +92,7 @@ rule graphmap2:
threads: config['threads_alignment']
group: "graphmap2"
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, threads, attempt: int((1.0 + (0.2 * (attempt - 1))) * (config['memory']['graphmap2'][0] + config['memory']['graphmap2'][1] * threads)),
time_min = lambda wildcards, threads, attempt: int((1440 / threads) * attempt * config['runtime']['graphmap2']), # 90 min / 16 threads
singularity:
Expand Down Expand Up @@ -119,12 +121,14 @@ rule ngmlr:
sequence = lambda wildcards: get_sequence_batch(wildcards, config),
reference = lambda wildcards: config['references'][wildcards.reference]['genome'],
index = lambda wildcards : directory(os.path.dirname(config['references'][wildcards.reference]['genome'])),
index_flag = lambda wildcards: config['references'][wildcards.reference]['genome'] + '.ngm'
index_flag = lambda wildcards: config['references'][wildcards.reference]['genome'] + '.ngm',
fasta_fai = lambda wildcards: config['references'][wildcards.reference]['genome'] + '.fai'
output:
pipe("alignments/ngmlr/{sequence_workflow}/batches/{tag, [^\/]*}/{runname, [^.\/]*}/{batch, [^.]*}.{reference}.sam")
threads: config['threads_alignment']
group: "ngmlr"
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, threads, attempt: int((1.0 + (0.2 * (attempt - 1))) * (config['memory']['ngmlr'][0] + config['memory']['ngmlr'][1] * threads)),
time_min = lambda wildcards, threads, attempt: int((5760 / threads) * attempt * config['runtime']['ngmlr']) # 360 min / 16 threads
singularity:
Expand All @@ -148,6 +152,25 @@ rule ngmlr_index:
touch {output.index}
"""


# Samtools index fasta
# Create a .fai index for a reference fasta via `samtools faidx`.
# Needed as an input of rules that random-access the reference
# (e.g. ngmlr, nanopolish), which declare the .fai as a dependency.
rule samtools_index_fasta:
input:
fasta = "{reference}.{ext}"
output:
# the wildcard constraint (fa|fasta) restricts this rule to
# fasta files so it cannot match arbitrary extensions
index = "{reference}.{ext, (fa|fasta)}.fai"
shadow: 'minimal'
# fall back to a single thread when 'threads_samtools' is unset
threads: config.get('threads_samtools') or 1
resources:
# expose threads as a consumable resource for cluster scheduling
# (constrained by `--resources threads=N`)
threads = lambda wildcards, threads: threads
singularity:
"docker://nanopype/alignment:{tag}".format(tag=config['version']['tag'])
shell:
"""
{config[bin_singularity][samtools]} faidx {input.fasta}
"""


# sam to bam conversion and RG tag
rule aligner_sam2bam:
input:
Expand All @@ -158,6 +181,7 @@ rule aligner_sam2bam:
shadow: "minimal"
threads: 1
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, attempt: int((1.0 + (0.2 * (attempt - 1))) * 5000)
singularity:
"docker://nanopype/alignment:{tag}".format(tag=config['version']['tag'])
Expand All @@ -184,6 +208,8 @@ rule aligner_merge_batches:
bam = "alignments/{aligner, [^.\/]*}/{sequence_workflow}/batches/{tag, [^\/]*}/{runname, [^.\/]*}.{reference, [^.]*}.bam",
bai = "alignments/{aligner, [^.\/]*}/{sequence_workflow}/batches/{tag, [^\/]*}/{runname, [^.\/]*}.{reference, [^.]*}.bam.bai"
threads: config.get('threads_samtools') or 1
resources:
threads = lambda wildcards, threads: threads,
params:
input_prefix = lambda wildcards, input : input.bam[:-4]
singularity:
Expand Down Expand Up @@ -221,6 +247,8 @@ rule aligner_merge_tag:
bam = "alignments/{aligner, [^.\/]*}/{sequence_workflow, ((?!batches).)*}/{tag, [^\/]*}.{reference, [^.]*}.bam",
bai = "alignments/{aligner, [^.\/]*}/{sequence_workflow, ((?!batches).)*}/{tag, [^\/]*}.{reference, [^.]*}.bam.bai"
threads: config.get('threads_samtools') or 1
resources:
threads = lambda wildcards, threads: threads,
params:
input_prefix = lambda wildcards, input : input.bam[:-4]
singularity:
Expand All @@ -241,6 +269,8 @@ rule aligner_1D2:
"alignments/{aligner}/{sequence_workflow}/batches/{tag}/{runname}.{reference}.bam"
output:
"alignments/{aligner, [^.\/]*}/{sequence_workflow}/batches/{tag, [^\/]*}/{runname, [^.\/]*}.{reference, [^.]*}.1D2.tsv"
resources:
threads = lambda wildcards, threads: threads,
params:
buffer = 200,
tolerance = 200
Expand All @@ -258,6 +288,8 @@ rule aligner_stats:
output:
"alignments/{aligner, [^.\/]*}/{sequence_workflow}/batches/{tag, [^\/]*}/{runname, [^.\/]*}.{reference, [^.]*}.hdf5"
threads: config.get('threads_samtools') or 1
resources:
threads = lambda wildcards, threads: threads,
singularity:
"docker://nanopype/alignment:{tag}".format(tag=config['version']['tag'])
shell:
Expand All @@ -274,6 +306,8 @@ rule aligner_coverage:
bedGraph = "alignments/{aligner, [^.\/]*}/{sequence_workflow, ((?!batches).)*}/{tag, [^\/]*}.{reference, [^.]*}.bedGraph",
bw = "alignments/{aligner, [^.\/]*}/{sequence_workflow, ((?!batches).)*}/{tag, [^\/]*}.{reference, [^.]*}.bw"
threads: config.get('threads_samtools') or 1
resources:
threads = lambda wildcards, threads: threads,
singularity:
"docker://nanopype/alignment:{tag}".format(tag=config['version']['tag'])
shell:
Expand Down
1 change: 1 addition & 0 deletions rules/asm.smk
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ rule flye:
fa = "assembly/flye/{sequence_workflow}/{tag}.fasta"
threads : config.get('threads_asm') or 1
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, threads, attempt: int((1.0 + (0.1 * (attempt - 1))) * (config['memory']['flye'][0] + config['memory']['flye'][1] * threads)),
time_min = lambda wildcards, threads, attempt: int((576000 / threads) * attempt * config['runtime']['flye']) # 120 h / 80 threads
params:
Expand Down
3 changes: 3 additions & 0 deletions rules/basecalling.smk
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ rule albacore:
shadow: "shallow"
threads: config['threads_basecalling']
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, threads, attempt: int((1.0 + (0.1 * (attempt - 1))) * (config['memory']['albacore'][0] + config['memory']['albacore'][1] * threads)),
time_min = lambda wildcards, threads, attempt: int((960 / threads) * attempt * config['runtime']['albacore']) # 60 min / 16 threads
params:
Expand Down Expand Up @@ -102,6 +103,7 @@ rule guppy:
shadow: "shallow"
threads: config['threads_basecalling']
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, threads, attempt: int((1.0 + (0.1 * (attempt - 1))) * (config['memory']['guppy_basecaller'][0] + config['memory']['guppy_basecaller'][1] * threads)),
time_min = lambda wildcards, threads, attempt: int((1440 / threads) * attempt * config['runtime']['guppy_basecaller']), # 90 min / 16 threads
GPU = 1
Expand Down Expand Up @@ -143,6 +145,7 @@ rule flappie:
shadow: "shallow"
threads: config['threads_basecalling']
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, threads, attempt: int((1.0 + (0.1 * (attempt - 1))) * (config['memory']['flappie'][0] + config['memory']['flappie'][1] * threads)),
time_min = lambda wildcards, threads, attempt: int((5760 / threads) * attempt * config['runtime']['flappie']) # 360 min / 16 threads
params:
Expand Down
3 changes: 3 additions & 0 deletions rules/demux.smk
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ rule deepbinner:
shadow: "minimal"
threads: config['threads_demux']
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, threads, attempt: int((1.0 + (0.1 * (attempt - 1))) * (config['memory']['deepbinner'][0] + config['memory']['deepbinner'][1] * threads)),
time_min = lambda wildcards, threads, attempt: int((960 / threads) * attempt * config['runtime']['deepbinner']) # 60 min / 16 threads
singularity:
Expand All @@ -79,6 +80,7 @@ checkpoint guppy_barcode_batches:
batches = directory("demux/guppy/batches/{runname}")
threads: config['threads_demux']
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, threads, attempt: int((1.0 + (0.1 * (attempt - 1))) * (config['memory']['guppy_barcoder'][0] + config['memory']['guppy_barcoder'][1] * threads)),
time_min = lambda wildcards, threads, attempt: int((960 / threads) * attempt * config['runtime']['guppy_barcoder']) # 60 min / 16 threads
params:
Expand All @@ -99,6 +101,7 @@ checkpoint guppy_barcode:
barcodes = directory("demux/guppy/barcodes/{runname}")
threads: config['threads_demux']
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, threads, attempt: int((1.0 + (0.1 * (attempt - 1))) * (config['memory']['guppy_barcoder'][0] + config['memory']['guppy_barcoder'][1] * threads)),
time_min = lambda wildcards, threads, attempt: int((960 / threads) * attempt * config['runtime']['guppy_barcoder']) # 60 min / 16 threads
params:
Expand Down
19 changes: 18 additions & 1 deletion rules/methylation.smk
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,14 @@ rule methylation_nanopolish:
sequences = lambda wildcards : get_sequence_batch(wildcards, config),
bam = lambda wildcards : get_alignment_batch(wildcards, config),
bai = lambda wildcards : get_alignment_batch(wildcards, config) + '.bai',
reference = lambda wildcards: config['references'][wildcards.reference]['genome']
reference = lambda wildcards: config['references'][wildcards.reference]['genome'],
fasta_fai = lambda wildcards: config['references'][wildcards.reference]['genome'] + '.fai'
output:
"methylation/nanopolish/{aligner, [^.\/]*}/{sequence_workflow, ((?!batches).)*}/batches/{tag, [^\/]*}/{runname, [^.\/]*}/{batch, [^.]*}.{reference, [^.\/]*}.tsv.gz"
shadow: "shallow"
threads: config['threads_methylation']
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, input, threads, attempt: int((1.0 + (0.1 * (attempt - 1))) * (config['memory']['nanopolish'][0] + config['memory']['nanopolish'][1] * threads)),
time_min = lambda wildcards, input, threads, attempt: int((960 / threads) * attempt * config['runtime']['nanopolish']) # 60 min / 16 threads
params:
Expand Down Expand Up @@ -113,6 +115,7 @@ rule methylation_flappie:
shadow: "minimal"
threads: 1
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, input, threads, attempt: int((1.0 + (0.1 * (attempt - 1))) * (8000 + 500 * threads)),
time_min = lambda wildcards, input, threads, attempt: int((15 / threads) * attempt) # 15 min / 1 thread
singularity:
Expand All @@ -133,6 +136,7 @@ rule methylation_guppy:
shadow: "minimal"
threads: 1
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, input, threads, attempt: int((1.0 + (0.1 * (attempt - 1))) * (5000 + 500 * threads)),
time_min = lambda wildcards, input, threads, attempt: int((15 / threads) * attempt) # 15 min / 1 thread
singularity:
Expand Down Expand Up @@ -178,6 +182,8 @@ rule methylation_frequencies:
"methylation/{methylation_caller}/{aligner}/{sequence_workflow}/{tag}.{reference}.fofn"
output:
"methylation/{methylation_caller, [^.\/]*}/{aligner, [^.\/]*}/{sequence_workflow, ((?!batches).)*}/{tag, [^\/]*}.{reference, [^.\/]*}.frequencies.tsv.gz"
resources:
threads = lambda wildcards, threads: threads,
params:
threshold = lambda wildcards : config['methylation_nanopolish_logp_threshold'] if wildcards.methylation_caller == 'nanopolish' else config['methylation_flappie_qval_threshold'] if wildcards.methylation_caller == 'flappie' else config['methylation_guppy_prob_threshold'] if wildcards.methylation_caller == 'guppy' else 0
singularity:
Expand All @@ -193,6 +199,8 @@ rule methylation_bedGraph:
"methylation/{methylation_caller}/{aligner}/{sequence_workflow}/{tag}.{reference}.frequencies.tsv.gz"
output:
"methylation/{methylation_caller, [^.\/]*}/{aligner, [^.\/]*}/{sequence_workflow, ((?!batches).)*}/{tag, [^\/]*}.{coverage, [^.\/]*}.{reference, [^.\/]*}.bedGraph"
resources:
threads = lambda wildcards, threads: threads,
params:
methylation_min_coverage = lambda wildcards : get_min_coverage(wildcards)
singularity:
Expand All @@ -209,6 +217,8 @@ rule methylation_bigwig:
chr_sizes = lambda wildcards : config["references"][wildcards.reference]["chr_sizes"]
output:
"methylation/{methylation_caller, [^.\/]*}/{aligner, [^.\/]*}/{sequence_workflow, ((?!batches).)*}/{tag, [^\/]*}.{coverage, [^.\/]*}.{reference, [^.\/]*}.bw"
resources:
threads = lambda wildcards, threads: threads,
singularity:
"docker://nanopype/methylation:{tag}".format(tag=config['version']['tag'])
shell:
Expand All @@ -226,6 +236,7 @@ rule methylation_single_read:
bai = "methylation/{methylation_caller, [^.\/]*}/{aligner, [^.\/]*}/{sequence_workflow, ((?!batches).)*}/batches/{tag, [^\/]*}/{runname, [^.\/]*}/{batch, [^.]*}.{reference, [^.\/]*}.bam.bai"
threads: 1
resources:
threads = lambda wildcards, threads: threads,
mem_mb = 16000,
time_min = 15
params:
Expand All @@ -246,6 +257,8 @@ rule methylation_single_read_run:
output:
bam = "methylation/{methylation_caller, [^.\/]*}/{aligner, [^.\/]*}/{sequence_workflow, ((?!batches).)*}/batches/{tag, [^\/]*}/{runname, [^.\/]*}.{reference, [^.\/]*}.bam"
threads: config.get('threads_samtools') or 1
resources:
threads = lambda wildcards, threads: threads,
params:
input_prefix = lambda wildcards, input : input.fofn[:-5]
singularity:
Expand All @@ -267,6 +280,8 @@ rule methylation_single_read_tag:
output:
bam = "methylation/{methylation_caller, [^.\/]*}/{aligner, [^.\/]*}/{sequence_workflow, ((?!batches).)*}/{tag, [^\/]*}.{reference, [^.\/]*}.bam"
threads: config.get('threads_samtools') or 1
resources:
threads = lambda wildcards, threads: threads,
params:
input_prefix = lambda wildcards, input : input.fofn[:-5]
singularity:
Expand All @@ -288,6 +303,8 @@ rule methylation_1D2:
pairs = "alignments/{aligner}/{sequence_workflow}/batches/{tag}/{runname}.{reference}.1D2.tsv"
output:
"methylation/{methylation_caller, [^.\/]*}/{aligner, [^.\/]*}/{sequence_workflow, ((?!batches).)*}/batches/{tag, [^\/]*}/{runname, [^.\/]*}.{reference, [^.\/]*}.1D2.tsv.gz"
resources:
threads = lambda wildcards, threads: threads,
singularity:
"docker://nanopype/methylation:{tag}".format(tag=config['version']['tag'])
shell:
Expand Down
1 change: 1 addition & 0 deletions rules/storage.smk
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ rule storage_index_batch:
shadow: "shallow"
threads: 1
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, attempt: int((1.0 + (0.1 * (attempt - 1))) * 4000),
time_min = 15
shell:
Expand Down
5 changes: 5 additions & 0 deletions rules/sv.smk
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ rule sniffles:
shadow: "minimal"
threads: config['threads_sv']
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, threads, attempt: int((1.0 + (0.1 * (attempt - 1))) * (config['memory']['sniffles'][0] + config['memory']['sniffles'][1] * threads)),
time_min = lambda wildcards, threads, attempt: int((3840 / threads) * attempt * config['runtime']['sniffles']) # 240 min / 16 threads
singularity:
Expand All @@ -77,6 +78,8 @@ rule sv_compress:
output:
"sv/sniffles/{aligner, [^.\/]*}/{sequence_workflow, [^.\/]*}/{tag, [^\/]*}.{reference, [^.\/]*}.vcf.gz"
threads: 1
resources:
threads = lambda wildcards, threads: threads,
singularity:
"docker://nanopype/sv:{tag}".format(tag=config['version']['tag'])
shell:
Expand All @@ -94,6 +97,8 @@ rule strique:
"sv/strique/{aligner, [^\/]*}/{sequence_workflow, ((?!batches).)*}/batches/{tag, [^\/]*}/{runname, [^\/]*}/{batch, [^.\/]*}.{reference}.tsv"
shadow: "minimal"
threads: config['threads_sv']
resources:
threads = lambda wildcards, threads: threads,
params:
model = config['sv_STRique_model'] if 'sv_STRique_model' in config else '',
mod_model = '--mod_model {}'.format(config['sv_STRique_mod_model']) if 'sv_STRique_mod_model' in config else ''
Expand Down
2 changes: 2 additions & 0 deletions rules/transcript.smk
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ checkpoint pychopper:
shadow: "minimal"
threads: 1
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, threads, attempt: int((1.0 + (0.1 * (attempt - 1))) * (3000 + 1000 * threads)),
time_min = lambda wildcards, threads, attempt: int((1440 / threads) * attempt) # 90 min / 16 threads
singularity:
Expand All @@ -66,6 +67,7 @@ rule pinfish:
threads: config['threads_transcript']
shadow: 'minimal'
resources:
threads = lambda wildcards, threads: threads,
mem_mb = lambda wildcards, threads, attempt: int((1.0 + (0.1 * (attempt - 1))) * (config['memory']['pinfish'][0] + config['memory']['pinfish'][1] * threads)),
time_min = lambda wildcards, threads, attempt: int((1440 / threads) * attempt * config['runtime']['pinfish']) # 90 min / 16 threads
singularity:
Expand Down

0 comments on commit 92b7676

Please sign in to comment.