Skip to content

Commit

Permalink
Merge pull request #2017 from heuermh/gvcf-end
Browse files Browse the repository at this point in the history
[ADAM-1988] Add copyVariantEndToAttribute method to support gVCF END attribute …
  • Loading branch information
akmorrow13 committed Aug 30, 2018
2 parents f7b51cc + 81c49ae commit 3542342
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ import org.bdgenomics.adam.rich.RichVariant
import org.bdgenomics.adam.serialization.AvroSerializer
import org.bdgenomics.adam.sql.{
Genotype => GenotypeProduct,
Variant => VariantProduct
Variant => VariantProduct,
VariantAnnotation => VariantAnnotationProduct
}
import org.bdgenomics.utils.interval.array.{ IntervalArray, IntervalArraySerializer }
import org.bdgenomics.formats.avro.{
Genotype,
GenotypeAllele,
Sample
Sample,
Variant,
VariantAnnotation
}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
Expand Down Expand Up @@ -195,6 +198,20 @@ case class DatasetBoundGenotypeRDD private[rdd] (
copy(samples = newSamples.toSeq)
}

override def copyVariantEndToAttribute(): GenotypeRDD = {
  // Copy the genotype end position into the nested variant's "END"
  // attribute (reserved VCF INFO key), creating the nested variant and
  // annotation products if they are absent.
  def copyEnd(g: GenotypeProduct): GenotypeProduct = {
    val variant = g.variant.getOrElse(VariantProduct())
    val annotation = variant.annotation.getOrElse(VariantAnnotationProduct())
    // g.end is an Option in the SQL product, so unwrap it explicitly;
    // calling toString on the Option itself would store "Some(16157602)"
    // rather than "16157602". If end is unset, leave attributes untouched.
    val attributes = g.end.fold(annotation.attributes) { end =>
      annotation.attributes + ("END" -> end.toString)
    }
    val annotationCopy = annotation.copy(attributes = attributes)
    val variantCopy = variant.copy(annotation = Some(annotationCopy))
    g.copy(variant = Some(variantCopy))
  }
  // Encoders for the product types come from the SQLContext implicits.
  val sqlContext = SQLContext.getOrCreate(rdd.context)
  import sqlContext.implicits._
  transformDataset(dataset => dataset.map(copyEnd))
}

override def filterToFiltersPassed(): GenotypeRDD = {
transformDataset(dataset => dataset.filter(dataset.col("variantCallingAnnotations.filtersPassed")))
}
Expand Down Expand Up @@ -374,6 +391,25 @@ sealed abstract class GenotypeRDD extends MultisampleAvroGenomicDataset[Genotype
VariantRDD(maybeDedupedVariants, sequences, headerLines)
}

/**
 * Copy variant end to a variant attribute (VCF INFO field "END").
 *
 * Creates the nested variant and variant annotation if absent, and
 * preserves any pre-existing annotation attributes. Genotypes whose end
 * position is null are returned with their attributes unchanged.
 *
 * @return GenotypeRDD with variant end copied to a variant attribute.
 */
def copyVariantEndToAttribute(): GenotypeRDD = {
  def copyEnd(g: Genotype): Genotype = {
    val variant = Option(g.variant).getOrElse(new Variant())
    val annotation = Option(variant.annotation).getOrElse(new VariantAnnotation())
    // Copy into a fresh map rather than mutating the Avro-owned map in place.
    val attributes = new java.util.HashMap[String, String]()
    // foreach (not map) — putAll is invoked purely for its side effect.
    Option(annotation.attributes).foreach(attributes.putAll(_))
    // Guard the nullable Avro end field instead of NPEing on toString.
    Option(g.end).foreach(end => attributes.put("END", end.toString))
    val annotationCopy = VariantAnnotation.newBuilder(annotation).setAttributes(attributes).build()
    val variantCopy = Variant.newBuilder(variant).setAnnotation(annotationCopy).build()
    Genotype.newBuilder(g).setVariant(variantCopy).build()
  }
  transform(rdd => rdd.map(copyEnd))
}

/**
* Filter this GenotypeRDD to genotype filters passed (VCF FORMAT field "FT" value PASS).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
*/
package org.bdgenomics.adam.rdd.variant

import com.google.common.io.Files
import htsjdk.samtools.ValidationStringency
import htsjdk.variant.vcf.VCFHeaderLine
import java.io.File
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ Dataset, SQLContext }
import org.bdgenomics.adam.converters.VariantContextConverter
Expand Down Expand Up @@ -100,6 +103,8 @@ object GenotypeRDDSuite extends Serializable {

class GenotypeRDDSuite extends ADAMFunSuite {

val tempDir = Files.createTempDir()

sparkTest("union two genotype rdds together") {
val genotype1 = sc.loadGenotypes(testFile("gvcf_dir/gvcf_multiallelic.g.vcf"))
val genotype2 = sc.loadGenotypes(testFile("small.vcf"))
Expand Down Expand Up @@ -684,4 +689,96 @@ class GenotypeRDDSuite extends ADAMFunSuite {
val genotypesDs = genotypes.transformDataset(ds => ds)
assert(genotypesDs.filterNoCalls().dataset.count() === 3)
}

sparkTest("round trip gVCF END attribute without nested variant annotations rdd bound") {
  // The end position is present on both the genotype and its nested
  // variant, and no variant annotation is attached on load.
  val gts = sc.loadGenotypes(testFile("gvcf_multiallelic/multiallelic.vcf"))
  val firstGt = gts.sort.rdd.collect.head
  assert(firstGt.end === 16157602L)
  assert(firstGt.variant.end === 16157602L)
  assert(firstGt.variant.annotation === null)

  // Copy the variant end into the END attribute, then write out as VCF.
  val outFile = new File(tempDir, "test.gvcf.vcf")
  gts.copyVariantEndToAttribute.toVariantContexts.saveAsVcf(outFile.getAbsolutePath,
    asSingleFile = true,
    deferMerging = false,
    disableFastConcat = false,
    ValidationStringency.SILENT)

  // Reload and verify the end position survived the round trip.
  val reloaded = sc.loadGenotypes("%s/test.gvcf.vcf".format(tempDir))
  val firstReloaded = reloaded.sort.rdd.collect.head
  assert(firstReloaded.end === 16157602L)
  // variant end copied to END attribute before writing, so this is correct
  assert(firstReloaded.variant.end === 16157602L)
  // ...but annotations are not populated on read
  assert(firstReloaded.variant.annotation === null)
}

sparkTest("round trip gVCF END attribute without nested variant annotations dataset bound") {
  // The end position is present on both the genotype and its nested
  // variant, and no variant annotation is attached on load.
  val genotypes = sc.loadGenotypes(testFile("gvcf_multiallelic/multiallelic.vcf"))
  val first = genotypes.sort.dataset.first
  assert(first.end === Some(16157602L))
  assert(first.variant.get.end === Some(16157602L))
  assert(first.variant.get.annotation.isEmpty)

  // Copy the variant end into the END attribute, then write out as VCF.
  val path = new File(tempDir, "test.gvcf.vcf")
  genotypes.copyVariantEndToAttribute.toVariantContexts.saveAsVcf(path.getAbsolutePath,
    asSingleFile = true,
    deferMerging = false,
    disableFastConcat = false,
    ValidationStringency.SILENT)

  // Reload and verify the end position survived the round trip.
  val vcfGenotypes = sc.loadGenotypes("%s/test.gvcf.vcf".format(tempDir))
  val firstVcfGenotype = vcfGenotypes.sort.dataset.first
  assert(firstVcfGenotype.end === Some(16157602L))
  // variant end copied to END attribute before writing, so this is correct
  assert(firstVcfGenotype.variant.get.end === Some(16157602L))
  // ...but annotations are not populated on read.
  // (Fixed copy-paste bug: this previously re-asserted on `first`, the
  // pre-write record, so the reloaded record was never checked.)
  assert(firstVcfGenotype.variant.get.annotation.isEmpty)
}

sparkTest("round trip gVCF END attribute with nested variant annotations rdd bound") {
  // With nesting enabled, the converter populates the END attribute on
  // the genotype's nested variant annotation at load time.
  VariantContextConverter.setNestAnnotationInGenotypesProperty(sc.hadoopConfiguration, true)

  val gts = sc.loadGenotypes(testFile("gvcf_multiallelic/multiallelic.vcf"))
  val firstGt = gts.sort.rdd.collect.head
  assert(firstGt.end === 16157602L)
  assert(firstGt.variant.end === 16157602L)
  assert(firstGt.variant.annotation.attributes.get("END") === "16157602")

  // Write out as VCF; no explicit copy is needed on this path.
  val outFile = new File(tempDir, "test.gvcf.vcf")
  gts.toVariantContexts.saveAsVcf(outFile.getAbsolutePath,
    asSingleFile = true,
    deferMerging = false,
    disableFastConcat = false,
    ValidationStringency.SILENT)

  // Reload and verify end position and END attribute both round-trip.
  val reloaded = sc.loadGenotypes("%s/test.gvcf.vcf".format(tempDir))
  val firstReloaded = reloaded.sort.rdd.collect.head
  assert(firstReloaded.end === 16157602L)
  assert(firstReloaded.variant.end === 16157602L)
  assert(firstReloaded.variant.annotation.attributes.get("END") === "16157602")
}

sparkTest("round trip gVCF END attribute with nested variant annotations dataset bound") {
  // With nesting enabled, the converter populates the END attribute on
  // the genotype's nested variant annotation at load time.
  VariantContextConverter.setNestAnnotationInGenotypesProperty(sc.hadoopConfiguration, true)

  val gts = sc.loadGenotypes(testFile("gvcf_multiallelic/multiallelic.vcf"))
  val firstGt = gts.sort.dataset.first
  assert(firstGt.end === Some(16157602L))
  assert(firstGt.variant.get.end === Some(16157602L))
  assert(firstGt.variant.get.annotation.get.attributes.get("END") === Some("16157602"))

  // Write out as VCF; no explicit copy is needed on this path.
  val outFile = new File(tempDir, "test.gvcf.vcf")
  gts.toVariantContexts.saveAsVcf(outFile.getAbsolutePath,
    asSingleFile = true,
    deferMerging = false,
    disableFastConcat = false,
    ValidationStringency.SILENT)

  // Reload and verify end position and END attribute both round-trip.
  val reloaded = sc.loadGenotypes("%s/test.gvcf.vcf".format(tempDir))
  val firstReloaded = reloaded.sort.dataset.first
  assert(firstReloaded.end === Some(16157602L))
  assert(firstReloaded.variant.get.end === Some(16157602L))
  assert(firstReloaded.variant.get.annotation.get.attributes.get("END") === Some("16157602"))
}
}

0 comments on commit 3542342

Please sign in to comment.