add CI for --skip_markduplicates #412

Merged: 13 commits, Aug 4, 2021
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -25,7 +25,7 @@ jobs:
# Nextflow versions: check pipeline minimum and current latest
nxf_ver: ['21.04.0', '']
engine: ['docker']
test: ['default', 'aligner', 'gatk4_spark', 'targeted', 'tumor_normal_pair', 'variant_calling', 'annotation']
test: ['default', 'aligner', 'gatk4_spark', 'targeted', 'skip_markduplicates', 'tumor_normal_pair', 'variant_calling', 'annotation']
steps:
- name: Check out pipeline code
uses: actions/checkout@v2
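For orientation: given the pytest-workflow tests at the bottom of this diff (tests/test_annotation.yml), the new matrix entry presumably boils down to running the matching test profile, along these lines (the exact flags CI adds are an assumption):

    nextflow run main.nf -profile test,skip_markduplicates,docker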
5 changes: 5 additions & 0 deletions conf/modules.config
@@ -97,6 +97,11 @@ params {
args2 = 'sort'
publish_files = false
}
'samtools_index_mapping' {
publish_by_meta = true
publish_files = ['bai':'mapped']
publish_dir = 'preprocessing'
}
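A note on these options for readers unfamiliar with the nf-core DSL2 conventions of this era: the values above feed the publishDir/saveAs helper, which is not shown in this diff, so the sketch below is an assumption about how they resolve, not pipeline code.

    // Rough equivalent inside the indexing process (a sketch, assuming the
    // standard nf-core functions.nf behaviour of this era):
    publishDir "${params.outdir}/preprocessing",
        mode: params.publish_dir_mode,          // assumed parameter name
        saveAs: { filename ->
            // publish_files = ['bai':'mapped'] -> keep only *.bai, under mapped/
            // publish_by_meta = true           -> prefix the path with the sample id
            filename.endsWith('.bai') ? "${meta.id}/mapped/${filename}" : null
        }

The net effect should be that only the .bai files land under results/preprocessing/&lt;sample&gt;/mapped/.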
// MARKDUPLICATES
'markduplicates' {
args = 'REMOVE_DUPLICATES=false VALIDATION_STRINGENCY=LENIENT'
7 changes: 7 additions & 0 deletions conf/test.config
@@ -44,9 +44,16 @@ profiles {
pair {
params.input = 'https://raw.githubusercontent.com/nf-core/test-datasets/sarek/testdata/csv/tiny-manta-https.csv'
}
prepare_recalibration {
params.input = 'https://raw.githubusercontent.com/nf-core/test-datasets/sarek/testdata/csv/tiny-mapped-normal-https.csv'
params.step = 'prepare_recalibration'
}
save_bam_mapped {
params.save_bam_mapped = true
}
skip_markduplicates {
params.skip_markduplicates = true
}
split_fastq {
params.split_fastq = 2
}
2 changes: 1 addition & 1 deletion modules/local/gatk4/baserecalibrator/main.nf
@@ -23,8 +23,8 @@ process GATK4_BASERECALIBRATOR {
path fasta
path fai
path dict
path knownSitesTBI
path knownSites
path knownSites_tbi

output:
tuple val(meta), path("*.table"), emit: table
8 changes: 4 additions & 4 deletions subworkflows/local/build_indices.nf
@@ -65,7 +65,7 @@ workflow BUILD_INDICES {

result_dbsnp_tbi = Channel.empty()
version_dbsnp_tbi = Channel.empty()
if (!(params.dbsnp_tbi) && params.dbsnp && ('mapping' in step || 'prepare_recalibration' in step || 'controlfreec' in tools || 'haplotypecaller' in tools|| 'mutect2' in tools || 'tnscope' in tools)) {
if (!(params.dbsnp_tbi) && params.dbsnp && ('mapping' in step || 'preparerecalibration' in step || 'controlfreec' in tools || 'haplotypecaller' in tools|| 'mutect2' in tools || 'tnscope' in tools)) {
dbsnp_id = dbsnp.map {it -> [[id:"$it.baseName"], it]}
(result_dbsnp_tbi, version_dbsnp_tbi) = TABIX_DBSNP(dbsnp_id)
result_dbsnp_tbi = result_dbsnp_tbi.map {meta, tbi -> [tbi]}
@@ -81,14 +81,14 @@

result_germline_resource_tbi = Channel.empty()
version_germline_resource_tbi = Channel.empty()
if (!(params.germline_resource_tbi) && params.germline_resource && 'mutect2' in tools){
if (!(params.germline_resource_tbi) && params.germline_resource && 'mutect2' in tools) {
germline_resource_id = germline_resource.map {it -> [[id:"$it.baseName"], it]}
(result_germline_resource_tbi, version_germline_resource_tbi) = TABIX_GERMLINE_RESOURCE(germline_resource_id)
}

result_known_indels_tbi = Channel.empty()
version_known_indels_tbi = Channel.empty()
if (!(params.known_indels_tbi) && params.known_indels && ('mapping' in step || 'prepare_recalibration' in step)){
if (!(params.known_indels_tbi) && params.known_indels && ('mapping' in step || 'preparerecalibration' in step)) {
known_indels_id = known_indels.map {it -> [[id:"$it.baseName"], it]}
(result_known_indels_tbi, version_known_indels_tbi) = TABIX_KNOWN_INDELS(known_indels_id)
result_known_indels_tbi = result_known_indels_tbi.map {meta, tbi -> [tbi]}
@@ -101,7 +101,7 @@

result_pon_tbi = Channel.empty()
version_pon_tbi = Channel.empty()
if (!(params.pon_tbi) && params.pon && ('tnscope' in tools || 'mutect2' in tools)){
if (!(params.pon_tbi) && params.pon && ('tnscope' in tools || 'mutect2' in tools)) {
pon_id = pon.map {it -> [[id:"$it.baseName"], it]}
(result_pon_tbi, version_pon_tbi) = TABIX_PON(pon_id)
}
6 changes: 3 additions & 3 deletions subworkflows/local/mapping_csv.nf
@@ -6,13 +6,13 @@

workflow MAPPING_CSV {
take:
bam_mapped // channel: [mandatory] meta, bam, bai
bam_indexed // channel: [mandatory] meta, bam, bai
save_bam_mapped // boolean: [mandatory] save_bam_mapped
skip_markduplicates // boolean: [mandatory] skip_markduplicates

main:
if (save_bam_mapped) {
csv_bam_mapped = bam_mapped.map { meta, bam, bai -> [meta] }
csv_bam_mapped = bam_indexed.map { meta, bam, bai -> [meta] }
// Creating csv files to restart from this step
csv_bam_mapped.collectFile(storeDir: "${params.outdir}/preprocessing/csv") { meta ->
patient = meta.patient[0]
@@ -26,7 +26,7 @@ workflow MAPPING_CSV {
}

if (skip_markduplicates) {
csv_bam_mapped = bam_mapped.map { meta, bam, bai -> [meta] }
csv_bam_mapped = bam_indexed.map { meta, bam, bai -> [meta] }
// Creating csv files to restart from this step
csv_bam_mapped.collectFile(storeDir: "${params.outdir}/preprocessing/csv") { meta ->
patient = meta.patient[0]
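The collectFile body is truncated above; to give the idea, it presumably continues roughly as below. The column set and file layout are assumptions based on sarek's mapped-BAM CSV convention, not the diff's actual hidden lines.

    // Illustrative sketch only; the real body is truncated in this diff
    csv_bam_mapped.collectFile(storeDir: "${params.outdir}/preprocessing/csv") { meta ->
        patient = meta.patient[0]
        sample  = meta.sample[0]
        gender  = meta.gender[0]
        status  = meta.status[0]
        bam = "${params.outdir}/preprocessing/${sample}/mapped/${sample}.bam"
        bai = "${params.outdir}/preprocessing/${sample}/mapped/${sample}.bam.bai"
        // one CSV per sample, usable to restart from prepare_recalibration
        ["mapped_${sample}.csv", "patient,gender,status,sample,bam,bai\n${patient},${gender},${status},${sample},${bam},${bai}\n"]
    }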
57 changes: 38 additions & 19 deletions subworkflows/nf-core/mapping.nf
@@ -4,44 +4,46 @@
========================================================================================
*/

params.bwamem1_mem_options = [:]
params.bwamem1_mem_tumor_options = [:]
params.bwamem2_mem_options = [:]
params.bwamem2_mem_tumor_options = [:]
params.merge_bam_options = [:]
params.samtools_index_options = [:]
params.seqkit_split2_options = [:]

include { BWA_MEM as BWAMEM1_MEM } from '../../modules/local/bwa/mem/main' addParams(options: params.bwamem1_mem_options)
include { BWA_MEM as BWAMEM1_MEM_T } from '../../modules/local/bwa/mem/main' addParams(options: params.bwamem1_mem_tumor_options)
include { BWAMEM2_MEM } from '../../modules/local/bwamem2/mem/main' addParams(options: params.bwamem2_mem_options)
include { BWAMEM2_MEM as BWAMEM2_MEM_T } from '../../modules/local/bwamem2/mem/main' addParams(options: params.bwamem2_mem_tumor_options)
include { SAMTOOLS_INDEX } from '../../modules/local/samtools/index/main' addParams(options: params.samtools_index_options)
include { SAMTOOLS_MERGE } from '../../modules/nf-core/modules/samtools/merge/main' addParams(options: params.merge_bam_options)
include { SEQKIT_SPLIT2 } from '../../modules/nf-core/modules/seqkit/split2/main.nf' addParams(options: params.seqkit_split2_options)

workflow MAPPING {
take:
aligner // string: [mandatory] "bwa-mem" or "bwa-mem2"
bwa // channel: [mandatory] bwa
fai // channel: [mandatory] fai
fasta // channel: [mandatory] fasta
reads_input // channel: [mandatory] meta, reads_input
skip_markduplicates // boolean: true/false

main:

bam_mapped_index = Channel.empty()
bam_reports = Channel.empty()

bam_indexed = Channel.empty()

if(params.split_fastq > 1){
if (params.split_fastq > 1) {
reads_input_split = SEQKIT_SPLIT2(reads_input).reads.map{
key, reads ->
//TODO maybe this can be replaced by a regex to include part_001 etc.

//sorts the list of split fastq files by part number:
//[R1.part_001, R2.part_001, R1.part_002, R2.part_002, R1.part_003, R2.part_003, ...]
//TODO: determine whether an uneven number of parts is possible, in which case remainder: true would be needed; this could presumably happen for unfiltered reads, reads without pairs, etc.
return [key, reads.sort{ a,b -> a.getName().tokenize('.')[ a.getName().tokenize('.').size() - 3] <=> b.getName().tokenize('.')[ b.getName().tokenize('.').size() - 3]}
.collate(2)]
return [key, reads.sort{ a,b -> a.getName().tokenize('.')[ a.getName().tokenize('.').size() - 3] <=> b.getName().tokenize('.')[ b.getName().tokenize('.').size() - 3]}.collate(2)]
}.transpose()
}else{
} else {
reads_input_split = reads_input
}
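About the sort key in the split branch above: assuming seqkit split2 output names like sample_R1.part_001.fastq.gz, tokenize('.') leaves the part number at index size - 3, so sorting on it and then collate(2) groups the list back into R1/R2 pairs:

    // 'sample_R1.part_001.fastq.gz'.tokenize('.')
    //   -> ['sample_R1', 'part_001', 'fastq', 'gz']   (size 4)
    // index size - 3 == 1 -> 'part_001', the seqkit split part number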

Expand Down Expand Up @@ -77,14 +79,31 @@ workflow MAPPING {
bam_bwa.map{ meta, bam ->
meta.remove('read_group')
meta.id = meta.sample
// groupKey makes sure that the correct group can advance as soon as it is complete
// and not stall the workflow until all pieces are mapped
def groupKey = groupKey(meta, meta.numLanes * params.split_fastq)
tuple(groupKey, bam)
[meta, bam]
}.groupTuple() //groupKey above somehow makes sure that the workflow doesn't stall until all pieces are mapped, and that the correct group can advance as soon as it is complete
}.groupTuple()
.set{bam_mapped}
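The groupKey trick deserves a word: a bare groupTuple() can only emit a group once the upstream channel is exhausted, whereas wrapping the key with Nextflow's built-in groupKey(key, size) declares how many items each group expects, so a sample is released downstream as soon as its own lanes or chunks have all arrived. A self-contained toy, with hypothetical sample names:

    workflow {
        Channel.of(
            [[id: 'sampleA', n: 2], 'A_lane1.bam'],
            [[id: 'sampleB', n: 1], 'B_lane1.bam'],
            [[id: 'sampleA', n: 2], 'A_lane2.bam'])
        .map { meta, bam ->
            // declare the expected group size up front
            [groupKey(meta, meta.n), bam]
        }
        .groupTuple()
        .view() // sampleB is emitted without waiting for sampleA's second lane
    }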

// STEP 1.5: MERGING AND INDEXING BAM FROM MULTIPLE LANES // MarkDuplicates can take care of this
// MarkDuplicates can handle multiple BAMs as input, so no merging/indexing is needed at this step,
// except when MarkDuplicates is skipped

if (skip_markduplicates) {
Review comment (Contributor):
Ah yes, adding it here makes sense. I struggled to find a logical way to sort out all the different routes through to base recalibration.

bam_mapped.branch{
single: it[1].size() == 1
multiple: it[1].size() > 1
}.set{ bam_to_merge }

SAMTOOLS_MERGE(bam_to_merge.multiple)
bam_merged = bam_to_merge.single.mix(SAMTOOLS_MERGE.out.bam)

SAMTOOLS_INDEX(bam_merged)
bam_indexed = bam_merged.join(SAMTOOLS_INDEX.out.bai)
}
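The shape of the block above: branch splits samples by number of lane-level BAMs, only the multi-lane ones go through SAMTOOLS_MERGE, mix reunites the two arms, and join pairs each BAM with its index on the meta key. A toy version with the process calls replaced by stand-in map operations:

    workflow {
        bam_mapped = Channel.of(
            [[id: 'multi'],  ['lane1.bam', 'lane2.bam']],
            [[id: 'single'], ['lane1.bam']])

        bam_mapped.branch {
            single:   it[1].size() == 1
            multiple: it[1].size() > 1
        }.set { bam_to_merge }

        // stand-ins for SAMTOOLS_MERGE and SAMTOOLS_INDEX
        merged  = bam_to_merge.multiple.map { meta, bams -> [meta, "${meta.id}.merged.bam"] }
        singles = bam_to_merge.single.map { meta, bams -> [meta, bams[0]] }
        bam_merged = singles.mix(merged)

        bais = bam_merged.map { meta, bam -> [meta, "${bam}.bai"] }
        bam_merged.join(bais).view() // -> [meta, bam, bai]
    }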

emit:
bam = bam_mapped
bam_indexed = bam_indexed
}
24 changes: 8 additions & 16 deletions subworkflows/nf-core/prepare_recalibration.nf
@@ -24,30 +24,23 @@ workflow PREPARE_RECALIBRATION {
known_sites // channel: [optional] known_sites
known_sites_tbi // channel: [optional] known_sites_tbi
no_intervals // value: [mandatory] no_intervals
known_indels
dbsnp

main:
cram_markduplicates.combine(intervals)
.map{ meta, cram, crai, intervals ->
new_meta = meta.clone()
new_meta.id = meta.sample + "_" + intervals.baseName
[new_meta, cram, crai, intervals]
}.set{cram_markduplicates_intervals}
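Two details in this chain are easy to miss: combine produces the Cartesian product, pairing every CRAM with every interval file, and meta.clone() matters because the same meta map object is shared across all interval copies, so mutating it in place would corrupt the sibling tuples. A toy illustration with hypothetical file names:

    workflow {
        crams     = Channel.of([[sample: 's1'], 's1.cram', 's1.crai'])
        intervals = Channel.of(file('chr1.bed'), file('chr2.bed'))

        crams.combine(intervals)
            .map { meta, cram, crai, bed ->
                def new_meta = meta.clone()   // copy first, then mutate
                new_meta.id = "${meta.sample}_${bed.baseName}"
                [new_meta, cram, crai, bed]
            }
            .view() // s1_chr1 and s1_chr2, each with its own meta copy
    }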

if(use_gatk_spark){
if (use_gatk_spark) {
BASERECALIBRATOR_SPARK(cram_markduplicates_intervals, fasta, fai, dict, known_sites, known_sites_tbi)
table_baserecalibrator = BASERECALIBRATOR_SPARK.out.table
}else{
BASERECALIBRATOR(cram_markduplicates_intervals, fasta, fai, dict, known_sites_tbi, known_sites)
} else {
BASERECALIBRATOR(cram_markduplicates_intervals, fasta, fai, dict, known_sites, known_sites_tbi)
table_baserecalibrator = BASERECALIBRATOR.out.table
}
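This swap pairs with the input reordering in modules/local/gatk4/baserecalibrator/main.nf at the top of the diff: DSL2 process inputs are positional, so the declaration and every call site must be flipped together. The PR normalizes both to the sites-then-index order already used by the Spark branch:

    // abridged from the two hunks in this PR; declaration and call now line up
    //   input: ... path knownSites
    //          ... path knownSites_tbi
    //   call:  BASERECALIBRATOR(cram_markduplicates_intervals, fasta, fai, dict,
    //                           known_sites, known_sites_tbi)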

//num_intervals = intervals.toList().size.view() //Integer.valueOf()
//.view()
//println(intervals.toList().getClass()) //.value.getClass())

//STEP 3.5: MERGING RECALIBRATION TABLES
if (no_intervals) {
table_baserecalibrator.map { meta, table ->
Expand All @@ -62,7 +55,6 @@ workflow PREPARE_RECALIBRATION {

GATHERBQSRREPORTS(recaltable)
table_bqsr = GATHERBQSRREPORTS.out.table

}

emit:
35 changes: 13 additions & 22 deletions subworkflows/nf-core/qc_markduplicates.nf
@@ -16,7 +16,6 @@ params.samtools_index_options = [:]
include { GATK4_MARKDUPLICATES } from '../../modules/local/gatk4/markduplicates/main' addParams(options: params.markduplicates_options)
include { GATK4_MARKDUPLICATES_SPARK } from '../../modules/local/gatk4/markduplicatesspark/main' addParams(options: params.markduplicatesspark_options)
include { GATK4_ESTIMATELIBRARYCOMPLEXITY } from '../../modules/local/gatk4/estimatelibrarycomplexity/main' addParams(options: params.estimatelibrarycomplexity_options)
include { SAMTOOLS_MERGE } from '../../modules/nf-core/modules/samtools/merge/main' addParams(options: params.merge_bam_options)
include { QUALIMAP_BAMQC } from '../../modules/local/qualimap/bamqc/main' addParams(options: params.qualimap_bamqc_options)
include { SAMTOOLS_STATS } from '../../modules/local/samtools/stats/main' addParams(options: params.samtools_stats_options)
include { SAMTOOLS_VIEW as SAMTOOLS_BAM_TO_CRAM } from '../../modules/local/samtools/view/main.nf' addParams(options: params.samtools_view_options)
@@ -25,51 +24,43 @@ include { SAMTOOLS_INDEX } from '../../modules/loca

workflow QC_MARKDUPLICATES {
take:
bam_mapped // channel: [mandatory] meta, bam, bai
bam_mapped // channel: [mandatory] meta, bam
bam_indexed // channel: [mandatory] meta, bam, bai
use_gatk_spark // value: [mandatory] use gatk spark
save_metrics // value: [mandatory] save metrics
fasta // channel: [mandatory] fasta
fai // channel: [mandatory] fai
dict // channel: [mandatory] dict
skip_markduplicates // boolean: true/false
skip_bamqc // boolean: true/false
skip_samtools // boolean: true/false
target_bed // channel: [optional] target_bed

main:

report_markduplicates = Channel.empty()
if(params.skip_markduplicates){

bam_mapped.branch{
single: it[1].size() == 1
multiple: it[1].size() > 1
}.set{ bam_to_merge }

SAMTOOLS_MERGE(bam_to_merge.multiple)
bam_merged = bam_to_merge.single.mix(SAMTOOLS_MERGE.out.bam)

SAMTOOLS_INDEX(bam_merged)
bam_markduplicates = bam_merged.join(SAMTOOLS_INDEX.out.bai)
cram_markduplicates = SAMTOOLS_BAM_TO_CRAM(bam_markduplicates, fasta, fai)
} else{

if (skip_markduplicates) {
bam_markduplicates = bam_indexed
SAMTOOLS_BAM_TO_CRAM(bam_markduplicates, fasta, fai)
cram_markduplicates = SAMTOOLS_BAM_TO_CRAM.out.cram
} else {
if (use_gatk_spark) {

//If BAMQC should run on the MarkDuplicates output, don't use MarkDuplicatesSpark to convert to CRAM; use its BAM output instead
if(!skip_bamqc){
if (!skip_bamqc) {
GATK4_MARKDUPLICATES_SPARK(bam_mapped, fasta, fai, dict, "bam")
SAMTOOLS_INDEX(GATK4_MARKDUPLICATES_SPARK.out.output)
bam_markduplicates = GATK4_MARKDUPLICATES_SPARK.out.output.join(SAMTOOLS_INDEX.out.bai)

SAMTOOLS_BAM_TO_CRAM_SPARK(bam_markduplicates, fasta, fai)
cram_markduplicates = SAMTOOLS_BAM_TO_CRAM_SPARK.out.cram
}else{
} else {
GATK4_MARKDUPLICATES_SPARK(bam_mapped, fasta, fai, dict, "cram")
SAMTOOLS_INDEX(GATK4_MARKDUPLICATES_SPARK.out.output)
cram_markduplicates = GATK4_MARKDUPLICATES_SPARK.out.output.join(SAMTOOLS_INDEX.out.crai)
}

if(save_metrics){
if (save_metrics) {
GATK4_ESTIMATELIBRARYCOMPLEXITY(bam_mapped, fasta, fai, dict)
report_markduplicates = GATK4_ESTIMATELIBRARYCOMPLEXITY.out.metrics
}
@@ -88,12 +79,12 @@ workflow QC_MARKDUPLICATES {
//if !skip_markduplicates, then QC tools are run on duplicate marked bams
//After bamqc finishes, convert to cram for further analysis
qualimap_bamqc = Channel.empty()
if(!skip_bamqc){
if (!skip_bamqc && !skip_markduplicates) {
//TODO: after adding CI tests, allow bamqc on mapped bams if no duplicate marking is done
QUALIMAP_BAMQC(bam_markduplicates, target_bed, params.target_bed)
qualimap_bamqc = QUALIMAP_BAMQC.out
}


samtools_stats = Channel.empty()
if (!skip_samtools) {
SAMTOOLS_STATS(cram_markduplicates, fasta)
3 changes: 3 additions & 0 deletions tests/test_annotation.yml
@@ -6,6 +6,7 @@
files:
- path: results/annotation/1234N/1234N_snpEff.ann.gz
- path: results/annotation/1234N/1234N_snpEff.ann.gz.tbi
- path: results/multiqc
- name: Run VEP
command: nextflow run main.nf -profile test,annotation,docker --tools vep
tags:
@@ -14,6 +15,7 @@
files:
- path: results/annotation/1234N/1234N_VEP.ann.gz
- path: results/annotation/1234N/1234N_VEP.ann.gz.tbi
- path: results/multiqc
- name: Run snpEff followed by VEP
command: nextflow run main.nf -profile test,annotation,docker --tools merge
tags:
@@ -26,3 +28,4 @@
- path: results/annotation/1234N/1234N_snpEff.ann.gz.tbi
- path: results/annotation/1234N/1234N_snpEff_VEP.ann.gz
- path: results/annotation/1234N/1234N_snpEff_VEP.ann.gz.tbi
- path: results/multiqc
Loading