Skip to content

Commit

Permalink
[AVOCADO-293] Use Spark SQL to filter variants prior to squaring off.
Browse files Browse the repository at this point in the history
Resolves #293.
  • Loading branch information
Frank Austin Nothaft authored and fnothaft committed Feb 18, 2018
1 parent 78f71e1 commit e0979dd
Showing 1 changed file with 36 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
*/
package org.bdgenomics.avocado.genotyping

import org.apache.spark.sql.functions._
import org.bdgenomics.adam.models.{ ReferenceRegion, VariantContext }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.variant.{
GenotypeRDD,
VariantRDD,
VariantContextRDD
}
import org.bdgenomics.adam.sql.{ Variant => VariantProduct }
import org.bdgenomics.formats.avro.{ Genotype, GenotypeAllele, Variant }
import scala.annotation.tailrec

Expand Down Expand Up @@ -125,34 +127,40 @@ object SquareOffReferenceModel {
*/
def extractVariants(genotypes: GenotypeRDD): VariantRDD = {

  // string form of the ALT enum constant; the Dataset's "alleles" column is
  // an array of strings, so we compare against the stringified enum value
  val altString = GenotypeAllele.ALT.toString()

  // true if at least one called allele is an alternate allele
  def isVariant(alleles: Seq[String]): Boolean = {
    alleles.exists(_ == altString)
  }

  // null-safe: Catalyst may split/reorder deterministic predicates, so we
  // cannot rely on an isNotNull conjunct being evaluated before this UDF
  val isVariantUdf = udf((a: Seq[String]) => a != null && isVariant(a))
  // number of shared trailing bases between ref and alt alleles
  val trimUdf = udf((a: String, b: String) => trimRight(a, b))
  // drops the shared trailing bases from an allele string
  val trimmerUdf = udf((a: String, b: Int) => a.dropRight(b))

  genotypes.transmuteDataset(ds => {

    import ds.sparkSession.implicits._

    // keep only genotypes that called an ALT allele and that carry a
    // non-null alternate allele (reference-model/symbolic records have a
    // null alternateAllele and must not reach trimRight, which would NPE)
    val variants = ds.where(ds("alleles").isNotNull &&
      isVariantUdf(ds("alleles")) &&
      ds("variant.alternateAllele").isNotNull)
      .select($"variant.*")
      .as[VariantProduct]

    // compute how many trailing bases ref and alt share
    val variantsToTrim = variants.withColumn("trimBy", trimUdf($"referenceAllele",
      $"alternateAllele"))

    // trim the alleles so that we have a canonical result
    val trimmedVariants = variantsToTrim.withColumn("end", $"end" - $"trimBy")
      .withColumn("referenceAllele", trimmerUdf($"referenceAllele", $"trimBy"))
      .withColumn("alternateAllele", trimmerUdf($"alternateAllele", $"trimBy"))
      .drop($"trimBy")
      .as[VariantProduct]

    // multiple genotypes (one per sample) can describe the same site;
    // deduplicate on the canonical variant key
    trimmedVariants.dropDuplicates("start",
      "end",
      "contigName",
      "referenceAllele",
      "alternateAllele")
  })
}

/**
Expand Down

0 comments on commit e0979dd

Please sign in to comment.