From 81c49ae67282f0101300eee53d40feb6a013c634 Mon Sep 17 00:00:00 2001 From: Michael L Heuer Date: Tue, 17 Jul 2018 10:51:55 -0500 Subject: [PATCH] Add copyVariantEndToAttribute method to support gVCF END attribute when not including nested variant annotations. --- .../adam/rdd/variant/GenotypeRDD.scala | 40 +++++++- .../adam/rdd/variant/GenotypeRDDSuite.scala | 97 +++++++++++++++++++ 2 files changed, 135 insertions(+), 2 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala index 43ee1e0a95..10f1a5993a 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDD.scala @@ -42,13 +42,16 @@ import org.bdgenomics.adam.rich.RichVariant import org.bdgenomics.adam.serialization.AvroSerializer import org.bdgenomics.adam.sql.{ Genotype => GenotypeProduct, - Variant => VariantProduct + Variant => VariantProduct, + VariantAnnotation => VariantAnnotationProduct } import org.bdgenomics.utils.interval.array.{ IntervalArray, IntervalArraySerializer } import org.bdgenomics.formats.avro.{ Genotype, GenotypeAllele, - Sample + Sample, + Variant, + VariantAnnotation } import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -195,6 +198,20 @@ case class DatasetBoundGenotypeRDD private[rdd] ( copy(samples = newSamples.toSeq) } + override def copyVariantEndToAttribute(): GenotypeRDD = { + def copyEnd(g: GenotypeProduct): GenotypeProduct = { + val variant = g.variant.getOrElse(VariantProduct()) + val annotation = variant.annotation.getOrElse(VariantAnnotationProduct()) + val attributes = annotation.attributes + ("END" -> g.end.toString) + val annotationCopy = annotation.copy(attributes = attributes) + val variantCopy = variant.copy(annotation = Some(annotationCopy)) + g.copy(variant = Some(variantCopy)) + } + val sqlContext = SQLContext.getOrCreate(rdd.context) + import sqlContext.implicits._ + transformDataset(dataset => dataset.map(copyEnd)) + } + override def filterToFiltersPassed(): GenotypeRDD = { transformDataset(dataset => dataset.filter(dataset.col("variantCallingAnnotations.filtersPassed"))) } @@ -374,6 +391,25 @@ sealed abstract class GenotypeRDD extends MultisampleAvroGenomicDataset[Genotype VariantRDD(maybeDedupedVariants, sequences, headerLines) } + /** + * Copy variant end to a variant attribute (VCF INFO field "END"). + * + * @return GenotypeRDD with variant end copied to a variant attribute. + */ + def copyVariantEndToAttribute(): GenotypeRDD = { + def copyEnd(g: Genotype): Genotype = { + val variant = Option(g.variant).getOrElse(new Variant()) + val annotation = Option(variant.annotation).getOrElse(new VariantAnnotation()) + val attributes = new java.util.HashMap[String, String]() + Option(annotation.attributes).map(attributes.putAll(_)) + attributes.put("END", g.end.toString) + val annotationCopy = VariantAnnotation.newBuilder(annotation).setAttributes(attributes).build() + val variantCopy = Variant.newBuilder(variant).setAnnotation(annotationCopy).build() + Genotype.newBuilder(g).setVariant(variantCopy).build() + } + transform(rdd => rdd.map(copyEnd)) + } + /** * Filter this GenotypeRDD to genotype filters passed (VCF FORMAT field "FT" value PASS). * diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala index 7c84a0e2d5..1a8c028049 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/variant/GenotypeRDDSuite.scala @@ -17,7 +17,10 @@ */ package org.bdgenomics.adam.rdd.variant +import com.google.common.io.Files +import htsjdk.samtools.ValidationStringency import htsjdk.variant.vcf.VCFHeaderLine +import java.io.File import org.apache.spark.rdd.RDD import org.apache.spark.sql.{ Dataset, SQLContext } import org.bdgenomics.adam.converters.VariantContextConverter @@ -100,6 +103,8 @@ object GenotypeRDDSuite extends Serializable { class GenotypeRDDSuite extends ADAMFunSuite { + val tempDir = Files.createTempDir() + sparkTest("union two genotype rdds together") { val genotype1 = sc.loadGenotypes(testFile("gvcf_dir/gvcf_multiallelic.g.vcf")) val genotype2 = sc.loadGenotypes(testFile("small.vcf")) @@ -684,4 +689,96 @@ class GenotypeRDDSuite extends ADAMFunSuite { val genotypesDs = genotypes.transformDataset(ds => ds) assert(genotypesDs.filterNoCalls().dataset.count() === 3) } + + sparkTest("round trip gVCF END attribute without nested variant annotations rdd bound") { + val genotypes = sc.loadGenotypes(testFile("gvcf_multiallelic/multiallelic.vcf")) + val first = genotypes.sort.rdd.collect.head + assert(first.end === 16157602L) + assert(first.variant.end === 16157602L) + assert(first.variant.annotation === null) + + val path = new File(tempDir, "test.gvcf.vcf") + genotypes.copyVariantEndToAttribute.toVariantContexts.saveAsVcf(path.getAbsolutePath, + asSingleFile = true, + deferMerging = false, + disableFastConcat = false, + ValidationStringency.SILENT) + + val vcfGenotypes = sc.loadGenotypes("%s/test.gvcf.vcf".format(tempDir)) + val firstVcfGenotype = vcfGenotypes.sort.rdd.collect.head + assert(firstVcfGenotype.end === 16157602L) + // variant end copied to END attribute before writing, so this is correct + assert(firstVcfGenotype.variant.end === 16157602L) + // ...but annotations are not populated on read + assert(firstVcfGenotype.variant.annotation === null) + } + + sparkTest("round trip gVCF END attribute without nested variant annotations dataset bound") { + val genotypes = sc.loadGenotypes(testFile("gvcf_multiallelic/multiallelic.vcf")) + val first = genotypes.sort.dataset.first + assert(first.end === Some(16157602L)) + assert(first.variant.get.end === Some(16157602L)) + assert(first.variant.get.annotation.isEmpty) + + val path = new File(tempDir, "test.gvcf.vcf") + genotypes.copyVariantEndToAttribute.toVariantContexts.saveAsVcf(path.getAbsolutePath, + asSingleFile = true, + deferMerging = false, + disableFastConcat = false, + ValidationStringency.SILENT) + + val vcfGenotypes = sc.loadGenotypes("%s/test.gvcf.vcf".format(tempDir)) + val firstVcfGenotype = vcfGenotypes.sort.dataset.first + assert(firstVcfGenotype.end === Some(16157602L)) + // variant end copied to END attribute before writing, so this is correct + assert(firstVcfGenotype.variant.get.end === Some(16157602L)) + // ...but annotations are not populated on read + assert(first.variant.get.annotation.isEmpty) + } + + sparkTest("round trip gVCF END attribute with nested variant annotations rdd bound") { + VariantContextConverter.setNestAnnotationInGenotypesProperty(sc.hadoopConfiguration, true) + + val genotypes = sc.loadGenotypes(testFile("gvcf_multiallelic/multiallelic.vcf")) + val first = genotypes.sort.rdd.collect.head + assert(first.end === 16157602L) + assert(first.variant.end === 16157602L) + assert(first.variant.annotation.attributes.get("END") === "16157602") + + val path = new File(tempDir, "test.gvcf.vcf") + genotypes.toVariantContexts.saveAsVcf(path.getAbsolutePath, + asSingleFile = true, + deferMerging = false, + disableFastConcat = false, + ValidationStringency.SILENT) + + val vcfGenotypes = sc.loadGenotypes("%s/test.gvcf.vcf".format(tempDir)) + val firstVcfGenotype = vcfGenotypes.sort.rdd.collect.head + assert(firstVcfGenotype.end === 16157602L) + assert(firstVcfGenotype.variant.end === 16157602L) + assert(firstVcfGenotype.variant.annotation.attributes.get("END") === "16157602") + } + + sparkTest("round trip gVCF END attribute with nested variant annotations dataset bound") { + VariantContextConverter.setNestAnnotationInGenotypesProperty(sc.hadoopConfiguration, true) + + val genotypes = sc.loadGenotypes(testFile("gvcf_multiallelic/multiallelic.vcf")) + val first = genotypes.sort.dataset.first + assert(first.end === Some(16157602L)) + assert(first.variant.get.end === Some(16157602L)) + assert(first.variant.get.annotation.get.attributes.get("END") === Some("16157602")) + + val path = new File(tempDir, "test.gvcf.vcf") + genotypes.toVariantContexts.saveAsVcf(path.getAbsolutePath, + asSingleFile = true, + deferMerging = false, + disableFastConcat = false, + ValidationStringency.SILENT) + + val vcfGenotypes = sc.loadGenotypes("%s/test.gvcf.vcf".format(tempDir)) + val firstVcfGenotype = vcfGenotypes.sort.dataset.first + assert(firstVcfGenotype.end === Some(16157602L)) + assert(firstVcfGenotype.variant.get.end === Some(16157602L)) + assert(firstVcfGenotype.variant.get.annotation.get.attributes.get("END") === Some("16157602")) + } }