Skip to content

Commit

Permalink
[AVOCADO-200] Feed assembled variants to INDEL realigner.
Browse files Browse the repository at this point in the history
Resolves bigdatagenomics#200. Depends on bigdatagenomics/adam#1363.

* Adds a ConsensusRealigner that uses the main Realigner to realign reads,
  which are then fed into the variant discovery engine with loose filters. Any
  INDEL variants that are discovered are then fed in to ADAM's INDEL realignment
  engine as known INDELs.
* Uses this realignment mode in the BiallelicGenotyperSuite to clean up a
  previously failing test.
* Adds this to the Reassemble CLI as a `-use_consensus_realigner` switch.
  • Loading branch information
fnothaft committed Jan 23, 2017
1 parent c9f8b7b commit 9c7094c
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 24 deletions.
12 changes: 10 additions & 2 deletions avocado-cli/src/main/scala/org/bdgenomics/avocado/Reassemble.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.read.{ AlignedReadRDD, AlignmentRecordRDD, MDTagging }
import org.bdgenomics.avocado.realigner.Realigner
import org.bdgenomics.avocado.realigner.{ ConsensusRealigner, Realigner }
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }

Expand Down Expand Up @@ -58,6 +58,10 @@ class ReassembleArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
name = "-defer_merging",
usage = "Defers merging single file output")
var deferMerging: Boolean = false
@Args4jOption(required = false,
name = "-use_consensus_realigner",
usage = "If provided, uses consensus-mode realigner.")
var useConsensusRealigner: Boolean = false

// required by ADAMSaveAnyArgs
var sortFastqOutput: Boolean = false
Expand All @@ -79,7 +83,11 @@ class Reassemble(
val reads = sc.loadAlignments(args.inputPath)

// realign the reads
val reassembledReads = Realigner.realign(reads, args.kmerLength)
val reassembledReads = if (args.useConsensusRealigner) {
ConsensusRealigner.realign(reads, args.kmerLength)
} else {
Realigner.realign(reads, args.kmerLength)
}

// save the reads
reassembledReads.save(args)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.avocado.realigner

import org.bdgenomics.adam.algorithms.consensus.ConsensusGenerator
import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
import org.bdgenomics.adam.rdd.variant.VariantRDD
import org.bdgenomics.avocado.genotyping.DiscoverVariants
import org.bdgenomics.formats.avro.Variant

object ConsensusRealigner {

/**
* Realigns a set of reads against the reference genome.
*
* @param reads Reads to realign.
* @param kmerLength The length k of the k-mers.
* @return Returns the realigned reads.
*/
def realign(reads: AlignmentRecordRDD,
kmerLength: Int): AlignmentRecordRDD = {

// use the realigner to make a first pass over the reads
val realignedReads = Realigner.realign(reads, kmerLength)

// from here, we'll discover any potential variants
val variants = filterIndels(DiscoverVariants(realignedReads))

// we'll pass these discovered variants to ADAM's indel realigner
reads.realignIndels(ConsensusGenerator.fromKnownIndels(variants))
}

/**
* @param v The variant to filter.
* @return Returns false if the variant is a SNV/MNV.
*/
private[realigner] def discardSnvs(v: Variant): Boolean = {
v.getReferenceAllele.length != v.getAlternateAllele.length
}

/**
* @param rdd The RDD of variants to filter.
* @return Returns a new RDD of variants where the SNVs & MNVs have been
* removed and only INDEL variants remain.
*/
private[realigner] def filterIndels(rdd: VariantRDD): VariantRDD = {
rdd.transform(r => r.filter(discardSnvs))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.bdgenomics.avocado.genotyping
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.avocado.AvocadoFunSuite
import org.bdgenomics.avocado.models.Observation
import org.bdgenomics.avocado.realigner.ConsensusRealigner
import org.bdgenomics.avocado.util.{
HardFilterGenotypes,
TestHardFilterGenotypesArgs
Expand Down Expand Up @@ -437,14 +438,16 @@ class BiallelicGenotyperSuite extends AvocadoFunSuite {
})
}

ignore("call hom alt C->CCCCT insertion at 1/866511") {
sparkTest("call hom alt C->CCCCT insertion at 1/866511") {
val readPath = resourceUrl("NA12878.chr1.866511.sam")
val reads = sc.loadAlignments(readPath.toString)
.transform(rdd => {
rdd.filter(_.getMapq > 0)
})

val gts = BiallelicGenotyper.discoverAndCall(reads,
val realignedReads = ConsensusRealigner.realign(reads, 20)

val gts = BiallelicGenotyper.discoverAndCall(realignedReads,
2,
optPhredThreshold = Some(30)).transform(rdd => {
rdd.filter(gt => gt.getAlleles.forall(_ == GenotypeAllele.ALT))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.avocado.realigner

import org.bdgenomics.adam.models.SequenceDictionary
import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
import org.bdgenomics.adam.rdd.variant.VariantRDD
import org.bdgenomics.formats.avro.Variant

class ConsensusRealignerSuite extends SparkRealignerSuite {

val allowLegacyCigars = true

def realign(rdd: AlignmentRecordRDD,
kmerLength: Int): AlignmentRecordRDD = {
ConsensusRealigner.realign(rdd, kmerLength)
}

val snv = Variant.newBuilder
.setStart(0L)
.setReferenceAllele("A")
.setAlternateAllele("G")
.build
val mnv = Variant.newBuilder
.setStart(1L)
.setReferenceAllele("AC")
.setAlternateAllele("GT")
.build
val ins = Variant.newBuilder
.setStart(2L)
.setReferenceAllele("A")
.setAlternateAllele("ACC")
.build
val del = Variant.newBuilder
.setStart(3L)
.setReferenceAllele("GCT")
.setAlternateAllele("G")
.build

test("filter out a snv") {
assert(!ConsensusRealigner.discardSnvs(snv))
}

test("filter out a mnv") {
assert(!ConsensusRealigner.discardSnvs(mnv))
}

test("keep an insert") {
assert(ConsensusRealigner.discardSnvs(ins))
}

test("keep a deletion") {
assert(ConsensusRealigner.discardSnvs(del))
}

sparkTest("filter snv/mnv variants out") {
val vRdd = VariantRDD(sc.parallelize(Seq(snv, mnv, ins, del)),
SequenceDictionary.empty)
val filteredVariants = ConsensusRealigner.filterIndels(vRdd)
.rdd
.collect

val variantIds = filteredVariants.map(_.getStart)
.toSet

assert(variantIds.size === 2)
assert(variantIds(2L))
assert(variantIds(3L))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import org.bdgenomics.formats.avro.AlignmentRecord

class RealignerSuite extends SparkRealignerSuite {

val allowLegacyCigars = false

def realign(rdd: AlignmentRecordRDD,
kmerLength: Int): AlignmentRecordRDD = {
Realigner.realign(rdd, kmerLength)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import org.bdgenomics.formats.avro.AlignmentRecord

trait SparkRealignerSuite extends AvocadoFunSuite {

val allowLegacyCigars: Boolean

def realign(rdd: AlignmentRecordRDD,
kmerLength: Int): AlignmentRecordRDD

Expand All @@ -47,11 +49,16 @@ trait SparkRealignerSuite extends AvocadoFunSuite {
}

sparkTest("realign a set of reads around an insert") {
// insertion sequence:
// true insertion sequence:
// ins: AATGAGACTTACATCATTAAAACCGTGTGGACACA
// ref: AATGAGACTTACATCATT__AACCGTGTGGACACA
//
// test alignment:
// X: |
// ins: AATGAGACTTACATCATTAAAACCGTGTGGACACA
// ref: AATGAGACTTACATCATTAA__CCGTGTGGACACA
// ref: AATGAGACTTACATCATTAAC__CGTGTGGACACA
val sequence = "AATGAGACTTACATCATTAAAACCGTGTGGACACA"
val insertStart = 20
val insertStart = 21
val readLength = insertStart + 6 + 2

// generate 7 reads with a 6bp flank
Expand All @@ -65,10 +72,13 @@ trait SparkRealignerSuite extends AvocadoFunSuite {
.setRecordGroupName("rg")
.setReadMapped(true)
.setSequence(sequence.drop(rId).take(readLength))
.setQual("*" * readLength)
.setStart(rId.toLong)
.setEnd((rId + readLength - 2 + 1).toLong)
.setEnd((rId + insertStart + 6).toLong)
.setCigar("%dM2I%dM".format(basesBeforeInsert, basesAfterInsert))
.setMismatchingPositions((readLength - 2).toString)
.setMismatchingPositions("%dC%d".format(basesBeforeInsert - 1,
basesAfterInsert))
.setMapq(50)
.build()
})

Expand All @@ -81,25 +91,34 @@ trait SparkRealignerSuite extends AvocadoFunSuite {

// these values are different from above because original alignments were
// not left justified
val basesBeforeInsert = insertStart - rId - 2
val basesAfterInsert = 8 + rId

assert(r.getCigar === "%d=2I%d=".format(basesBeforeInsert, basesAfterInsert))
val basesBeforeInsert = insertStart - rId - 3
val basesAfterInsert = 9 + rId

if (allowLegacyCigars) {
assert(r.getCigar === "%dM2I%dM".format(basesBeforeInsert, basesAfterInsert))
} else {
assert(r.getCigar === "%d=2I%d=".format(basesBeforeInsert, basesAfterInsert))
}
assert(r.getMismatchingPositions === (readLength - 2).toString)
})
}

sparkTest("realign a set of reads around a deletion") {
// deletion sequence:
// del: AGGTCTGAATGAGACTTA__TCATTAACCGTGTGGACACA
// true deletion sequence:
// del: AGGTCTGAATGAGACTT__ATCATTAACCGTGTGGACACA
// ref: AGGTCTGAATGAGACTTACATCATTAACCGTGTGGACACA
//
// test alignment:
// X: |
// del: AGGTCTGAATGAGACT__TATCATTAACCGTGTGGACACA
// ref: AGGTCTGAATGAGACTTACATCATTAACCGTGTGGACACA
val sequence = "AGGTCTGAATGAGACTTATCATTAACCGTGTGGACACA"
val deleteStart = 18
val readLength = deleteStart + 8
val deleteStart = 16
val readLength = deleteStart + 7

// generate 10 reads with a 8bp flank
val reads = (0 until 10).map(rId => {
val basesBeforeDelete = deleteStart - rId
val basesBeforeDelete = deleteStart - rId - 1
val basesAfterDelete = 8 + rId

AlignmentRecord.newBuilder()
Expand All @@ -108,10 +127,13 @@ trait SparkRealignerSuite extends AvocadoFunSuite {
.setRecordGroupName("rg")
.setReadMapped(true)
.setSequence(sequence.drop(rId).take(readLength))
.setQual("*" * readLength)
.setStart(rId.toLong)
.setEnd((rId + readLength + 2 + 1).toLong)
.setEnd((rId + readLength + 2).toLong)
.setCigar("%dM2D%dM".format(basesBeforeDelete, basesAfterDelete))
.setMismatchingPositions("%d^CA%d".format(basesBeforeDelete, basesAfterDelete))
.setMismatchingPositions("%d^TA0C%d".format(basesBeforeDelete,
basesAfterDelete - 1))
.setMapq(50)
.build()
})

Expand All @@ -124,23 +146,30 @@ trait SparkRealignerSuite extends AvocadoFunSuite {

// these values are different from above because original alignments were
// not left justified
val basesBeforeDelete = deleteStart - rId - 1
val basesAfterDelete = 9 + rId
val basesBeforeDelete = deleteStart - rId
val basesAfterDelete = 7 + rId

assert(r.getCigar === "%d=2D%d=".format(basesBeforeDelete, basesAfterDelete))
if (allowLegacyCigars) {
assert(r.getCigar === "%dM2D%dM".format(basesBeforeDelete, basesAfterDelete))
} else {
assert(r.getCigar === "%d=2D%d=".format(basesBeforeDelete, basesAfterDelete))
}
assert(r.getMismatchingPositions === "%d^AC%d".format(basesBeforeDelete, basesAfterDelete))
})
}

sparkTest("realigning a read with a repeat will return the original read") {
val read = AlignmentRecord.newBuilder()
.setContigName("ctg")
.setReadName("A_READ")
.setReadMapped(true)
.setStart(10L)
.setEnd(17L)
.setSequence("TCAAAAAAGG")
.setQual("**********")
.setCigar("3M4I3M")
.setMismatchingPositions("6")
.setMapq(50)
.build()

// make into a genomic rdd
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<name>avocado: A Variant Caller, Distributed</name>

<properties>
<adam.version>0.21.0</adam.version>
<adam.version>0.21.1-SNAPSHOT</adam.version>
<avro.version>1.8.0</avro.version>
<java.version>1.8</java.version>
<scala.version>2.10.4</scala.version>
Expand Down

0 comments on commit 9c7094c

Please sign in to comment.