From 40acc56f74eda652bc4185b57450d356da058c2b Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Fri, 6 Dec 2013 15:34:14 -0800 Subject: [PATCH] Adding rod functionality: a specialized grouping of pileup data. --- .../adam/commands/PileupAggregator.scala | 2 +- .../cs/amplab/adam/models/ADAMRod.scala | 46 +++++ .../cs/amplab/adam/rdd/AdamContext.scala | 5 +- .../cs/amplab/adam/rdd/AdamRDDFunctions.scala | 80 +++++++- .../cs/amplab/adam/rdd/PileupAggregator.scala | 171 ++++++++++++++++-- .../cs/amplab/adam/rdd/AdamContextSuite.scala | 2 +- .../adam/rdd/AdamRDDFunctionsSuite.scala | 166 +++++++++++++++++ .../adam/rdd/PileupAggregationSuite.scala | 145 +++++++++++++++ 8 files changed, 597 insertions(+), 20 deletions(-) create mode 100644 adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/models/ADAMRod.scala create mode 100644 adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctionsSuite.scala create mode 100644 adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/PileupAggregationSuite.scala diff --git a/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/commands/PileupAggregator.scala b/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/commands/PileupAggregator.scala index 3a438ab123..0495d2f30c 100644 --- a/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/commands/PileupAggregator.scala +++ b/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/commands/PileupAggregator.scala @@ -53,4 +53,4 @@ class PileupAggregator(protected val args: PileupAggregatorArgs) pileups.adamAggregatePileups().adamSave(args.pileupOutput, args) } -} \ No newline at end of file +} diff --git a/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/models/ADAMRod.scala b/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/models/ADAMRod.scala new file mode 100644 index 0000000000..1f5b8838e9 --- /dev/null +++ b/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/models/ADAMRod.scala @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2013. Regents of the University of California + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.berkeley.cs.amplab.adam.models + +import edu.berkeley.cs.amplab.adam.avro.ADAMPileup +import org.apache.spark.rdd.RDD + +/** + * Class representing a set of pileup bases at a specific locus. + * + * @param position Position on the reference genome. + * @param pileups A list representing the bases at this locus. + */ +case class ADAMRod (position: Long, pileups: List[ADAMPileup]) { + // all bases must be at the same position + require(pileups.forall(_.getPosition.toLong == position)) + + lazy val isSingleSample: Boolean = pileups.map(_.getRecordGroupSample).distinct.length == 1 + + /** + * Splits this rod out by samples. + * + * @return A list of rods, each corresponding to a single sample. 
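+   *
+   * @example A hypothetical sketch (the rod `rod` and its sample names are assumed, not part of this patch):
+   * {{{
+   * // rod holds pileups from samples "sampleA" and "sampleB" at one position
+   * val perSample: List[ADAMRod] = rod.splitBySamples()
+   *
+   * // each returned rod covers the same position, but holds a single sample
+   * perSample.foreach(r => assert(r.isSingleSample))
+   * }}}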
+   */
+  def splitBySamples (): List[ADAMRod] = {
+    if(isSingleSample) {
+      List(new ADAMRod(position, pileups))
+    } else {
+      pileups.groupBy(_.getRecordGroupSample).values.toList.map(pg => new ADAMRod(position, pg))
+    }
+  }
+
+}
diff --git a/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/AdamContext.scala b/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/AdamContext.scala
index cf37171c0f..c83a43ff22 100644
--- a/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/AdamContext.scala
+++ b/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/AdamContext.scala
@@ -30,7 +30,7 @@ import edu.berkeley.cs.amplab.adam.commands.{SAMRecordConverter, VariantContextC
 import org.apache.spark.rdd.RDD
 import org.apache.spark.{Logging, SparkContext}
 import scala.collection.JavaConversions._
-import edu.berkeley.cs.amplab.adam.models.{SequenceRecord, SequenceDictionary}
+import edu.berkeley.cs.amplab.adam.models.{SequenceRecord, SequenceDictionary, ADAMRod}
 import org.apache.hadoop.fs.Path
 import fi.tkk.ics.hadoop.bam.util.SAMHeaderReader
 import edu.berkeley.cs.amplab.adam.projections.{ADAMRecordField, Projection}
@@ -54,6 +54,9 @@ object AdamContext {
   // Add methods specific to ADAMVariantContext RDDs
   implicit def rddToAdamVariantContextRDD(rdd: RDD[ADAMVariantContext]) = new AdamVariantContextRDDFunctions(rdd)
 
+  // Add methods specific to the ADAMRod RDDs
+  implicit def rddToAdamRodRDD(rdd: RDD[ADAMRod]) = new AdamRodRDDFunctions(rdd)
+
   // Add generic RDD methods for all types of ADAM RDDs
   implicit def rddToAdamRDD[T <% SpecificRecord : Manifest](rdd: RDD[T]) = new AdamRDDFunctions(rdd)
 
diff --git a/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctions.scala b/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctions.scala
index 204a64019d..0d79cfb48e 100644
--- a/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctions.scala
+++ b/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctions.scala
@@ -23,7 +23,7 @@ import parquet.hadoop.util.ContextUtil
 import org.apache.avro.specific.SpecificRecord
 import edu.berkeley.cs.amplab.adam.avro.{ADAMPileup, ADAMRecord, ADAMVariant, ADAMGenotype, ADAMVariantDomain}
 import edu.berkeley.cs.amplab.adam.commands.ParquetArgs
-import edu.berkeley.cs.amplab.adam.models.{SequenceRecord, SequenceDictionary, SingleReadBucket, ReferencePosition}
+import edu.berkeley.cs.amplab.adam.models.{SequenceRecord, SequenceDictionary, SingleReadBucket, ReferencePosition, ADAMRod}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext._
 import org.apache.spark.Logging
@@ -144,13 +144,89 @@ class AdamRecordRDDFunctions(rdd: RDD[ADAMRecord]) extends Serializable with Log
   }
 }
 
-class AdamPileupRDDFunctions(rdd: RDD[ADAMPileup]) extends Serializable {
+class AdamPileupRDDFunctions(rdd: RDD[ADAMPileup]) extends Serializable with Logging {
+  initLogging()
 
+  /**
+   * Aggregates pileup bases together.
+   *
+   * @param coverage Expected average coverage; used to increase the number of reducers. Defaults to 30.
+   * @return RDD with aggregated bases.
+   *
+   * @see AdamRodRDDFunctions#adamAggregateRods
+   */
   def adamAggregatePileups(coverage: Int = 30): RDD[ADAMPileup] = {
     val helper = new PileupAggregator
     helper.aggregate(rdd, coverage)
   }
 
+  /**
+   * Groups ungrouped pileup bases into rods, keyed by reference position.
+   *
+   * @param coverage Expected average coverage; used to increase the number of reducers. Defaults to 30.
+   * @return RDD with rods grouped by reference position.
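+   *
+   * @example A hypothetical sketch; `pileups` is assumed to be an RDD[ADAMPileup]
+   * built elsewhere, with AdamContext._ imported to supply the implicit conversions:
+   * {{{
+   * import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
+   *
+   * val rods: RDD[ADAMRod] = pileups.adamPileupsToRods()
+   * val meanCoverage: Double = rods.adamRodCoverage()
+   * }}}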
+   */
+  def adamPileupsToRods(coverage: Int = 30): RDD[ADAMRod] = {
+    val groups = rdd.groupBy((p: ADAMPileup) => p.getPosition, coverage)
+
+    groups.map(kv => ADAMRod(kv._1, kv._2.toList))
+  }
+
+}
+
+class AdamRodRDDFunctions(rdd: RDD[ADAMRod]) extends Serializable with Logging {
+  initLogging()
+
+  /**
+   * Given an RDD of rods, splits each rod up by the sample it corresponds to.
+   * Returns a flat RDD.
+   *
+   * @return Rods split up by samples and _not_ grouped together.
+   */
+  def adamSplitRodsBySamples(): RDD[ADAMRod] = {
+    rdd.flatMap(_.splitBySamples)
+  }
+
+  /**
+   * Given an RDD of rods, splits each rod up by the sample it corresponds to.
+   * Returns an RDD where the per-sample rods are grouped by reference position.
+   *
+   * @return Rods split up by samples and grouped together by position.
+   */
+  def adamDivideRodsBySamples(): RDD[(Long, List[ADAMRod])] = {
+    rdd.keyBy(_.position).map(r => (r._1, r._2.splitBySamples))
+  }
+
+  /**
+   * Aggregates the pileup bases together within each rod.
+   *
+   * @return RDD with aggregated rods.
+   *
+   * @see AdamPileupRDDFunctions#adamAggregatePileups
+   */
+  def adamAggregateRods(): RDD[ADAMRod] = {
+    val helper = new PileupAggregator
+    rdd.map(r => (r.position, r.pileups))
+      .map(kv => (kv._1, helper.flatten(kv._2)))
+      .map(kv => new ADAMRod(kv._1, kv._2))
+  }
+
+  /**
+   * Returns the average coverage for all pileups.
+   *
+   * @note The coverage value does not include locus positions where no reads are mapped, as no rods exist for these positions.
+   * @note If run on an RDD with multiple samples where the rods have been split by sample, this returns the average
+   * coverage per sample, _averaged_ over all samples. If the RDD contains multiple samples and the rods have _not_ been split,
+   * it returns the average coverage per sample, _summed_ over all samples.
+   *
+   * @return Average coverage across mapped loci.
+   */
+  def adamRodCoverage(): Double = {
+    val totalBases: Long = rdd.map(_.pileups.length.toLong).reduce(_ + _)
+
+    // coverage is the total count of bases, over the total number of loci
+    totalBases.toDouble / rdd.count.toDouble
+  }
 }
 
 class AdamVariantContextRDDFunctions(rdd: RDD[ADAMVariantContext]) extends Serializable {
diff --git a/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/PileupAggregator.scala b/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/PileupAggregator.scala
index f6848af4ad..871ec87e18 100644
--- a/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/PileupAggregator.scala
+++ b/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/PileupAggregator.scala
@@ -17,30 +17,157 @@ package edu.berkeley.cs.amplab.adam.rdd
 
 import edu.berkeley.cs.amplab.adam.avro.{Base, ADAMPileup}
+import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.Logging
 
-private[rdd] class PileupAggregator extends Serializable with Logging {
+private[rdd] class PileupAggregator (validate: Boolean = false) extends Serializable with Logging {
 
+  /**
+   * Maps a pileup base to a grouping key composed of its read base, indel offset, and sample,
+   * so that equivalent pileup bases can be aggregated together.
+   *
+   * @param a Pileup base to generate a grouping key for.
+   * @return Key composed of read base, range offset, and record group sample.
+   */
   def mapPileup(a: ADAMPileup): (Option[Base], Option[java.lang.Integer], Option[CharSequence]) = {
     (Option(a.getReadBase), Option(a.getRangeOffset), Option(a.getRecordGroupSample))
   }
 
-  def aggregatePileup(pileupList: List[ADAMPileup]): ADAMPileup = {
+  /**
+   * Reduces a list of pileup bases that share a grouping key down to a single aggregated base.
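+   * Count-weighted map and Sanger qualities are summed during the reduction and divided
+   * by the total count afterwards, giving a mean quality in phred (log) space; counts,
+   * soft clip totals, and reverse strand totals are summed directly.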
+ * + * @param pileupList List of pileup bases to reduce. + * @return Single pileup base with bases reduced down. + * + * @note All bases are expected to be from the same sample, and to have the same base and indel location. + */ + protected def aggregatePileup(pileupList: List[ADAMPileup]): ADAMPileup = { def combineEvidence(pileupGroup: List[ADAMPileup]): ADAMPileup = { val pileup = pileupGroup.reduce((a: ADAMPileup, b: ADAMPileup) => { - a.setMapQuality(a.getMapQuality + b.getMapQuality) - a.setSangerQuality(a.getSangerQuality + b.getSangerQuality) - a.setCountAtPosition(a.getCountAtPosition + b.getCountAtPosition) - a.setNumSoftClipped(a.getNumSoftClipped + b.getNumSoftClipped) - a.setNumReverseStrand(a.getNumReverseStrand + b.getNumReverseStrand) - - a + if (validate) { + require(Option(a.getMapQuality).isDefined && + Option(a.getSangerQuality).isDefined && + Option(a.getCountAtPosition).isDefined && + Option(a.getNumSoftClipped).isDefined && + Option(a.getNumReverseStrand).isDefined && + Option(a.getReadName).isDefined && + Option(a.getReadStart).isDefined && + Option(a.getReadEnd).isDefined, + "Cannot aggregate pileup with required fields null: " + a) + + require(Option(b.getMapQuality).isDefined && + Option(b.getSangerQuality).isDefined && + Option(b.getCountAtPosition).isDefined && + Option(b.getNumSoftClipped).isDefined && + Option(b.getNumReverseStrand).isDefined && + Option(b.getReadName).isDefined && + Option(b.getReadStart).isDefined && + Option(b.getReadEnd).isDefined, + "Cannot aggregate pileup with required fields null: " + b) + } + + // set copied fields + val c = ADAMPileup.newBuilder() + .setReferenceName(a.getReferenceName) + .setReferenceId(a.getReferenceId) + .setPosition(a.getPosition) + .setRangeOffset(a.getRangeOffset) + .setRangeLength(a.getRangeLength) + .setReferenceBase(a.getReferenceBase) + .setReadBase(a.getReadBase) + + // merge read group fields + val rgsc = List(Option(a.getRecordGroupSequencingCenter), Option(b.getRecordGroupSequencingCenter)) + .flatMap((p: Option[CharSequence]) => p) + .distinct + if (rgsc.length != 0) { + c.setRecordGroupSequencingCenter(rgsc.reduce((a: CharSequence, b: CharSequence) => a + "," + b)) + } + + val rgd = List(Option(a.getRecordGroupDescription), Option(b.getRecordGroupDescription)) + .flatMap((p: Option[CharSequence]) => p) + .distinct + if (rgd.length != 0) { + c.setRecordGroupDescription(rgd.reduce((a: CharSequence, b: CharSequence) => a + "," + b)) + } + + val rgrde = List(Option(a.getRecordGroupRunDateEpoch), Option(b.getRecordGroupRunDateEpoch)) + .flatMap((p: Option[java.lang.Long]) => p) + .distinct + if (rgrde.length == 1) { + // cannot join multiple values together - behavior is not defined + c.setRecordGroupRunDateEpoch(rgrde.head) + } + + val rgfo = List(Option(a.getRecordGroupFlowOrder), Option(b.getRecordGroupFlowOrder)) + .flatMap((p: Option[CharSequence]) => p) + .distinct + if (rgfo.length != 0) { + c.setRecordGroupFlowOrder(rgfo.reduce((a: CharSequence, b: CharSequence) => a + "," + b)) + } + + val rgks = List(Option(a.getRecordGroupKeySequence), Option(b.getRecordGroupKeySequence)) + .flatMap((p: Option[CharSequence]) => p) + .distinct + if (rgks.length != 0) { + c.setRecordGroupKeySequence(rgks.reduce((a: CharSequence, b: CharSequence) => a + "," + b)) + } + + val rgl = List(Option(a.getRecordGroupLibrary), Option(b.getRecordGroupLibrary)) + .flatMap((p: Option[CharSequence]) => p) + .distinct + if (rgl.length != 0) { + c.setRecordGroupLibrary(rgl.reduce((a: CharSequence, b: CharSequence) => a + "," + 
b))
+      }
+
+      val rgpmis = List(Option(a.getRecordGroupPredictedMedianInsertSize), Option(b.getRecordGroupPredictedMedianInsertSize))
+        .flatMap((p: Option[java.lang.Integer]) => p)
+        .distinct
+      if (rgpmis.length == 1) {
+        // cannot combine two values here - behavior is not defined
+        c.setRecordGroupPredictedMedianInsertSize(rgpmis.head)
+      }
+
+      val rgp = List(Option(a.getRecordGroupPlatform), Option(b.getRecordGroupPlatform))
+        .flatMap((p: Option[CharSequence]) => p)
+        .distinct
+      if (rgp.length != 0) {
+        c.setRecordGroupPlatform(rgp.reduce((a: CharSequence, b: CharSequence) => a + "," + b))
+      }
+
+      val rgpu = List(Option(a.getRecordGroupPlatformUnit), Option(b.getRecordGroupPlatformUnit))
+        .flatMap((p: Option[CharSequence]) => p)
+        .distinct
+      if (rgpu.length != 0) {
+        c.setRecordGroupPlatformUnit(rgpu.reduce((a: CharSequence, b: CharSequence) => a + "," + b))
+      }
+
+      val rgs = List(Option(a.getRecordGroupSample), Option(b.getRecordGroupSample))
+        .flatMap((p: Option[CharSequence]) => p)
+        .distinct
+      if (rgs.length != 0) {
+        c.setRecordGroupSample(rgs.reduce((a: CharSequence, b: CharSequence) => a + "," + b))
+      }
+
+      // set new fields
+      c.setMapQuality(a.getMapQuality * a.getCountAtPosition +
+        b.getMapQuality * b.getCountAtPosition)
+        .setSangerQuality(a.getSangerQuality * a.getCountAtPosition +
+        b.getSangerQuality * b.getCountAtPosition)
+        .setCountAtPosition(a.getCountAtPosition + b.getCountAtPosition)
+        .setNumSoftClipped(a.getNumSoftClipped + b.getNumSoftClipped)
+        .setNumReverseStrand(a.getNumReverseStrand + b.getNumReverseStrand)
+        .setReadName(a.getReadName + "," + b.getReadName)
+        .setReadStart(a.getReadStart.toLong min b.getReadStart.toLong)
+        .setReadEnd(a.getReadEnd.toLong max b.getReadEnd.toLong)
+
+      c.build()
     })
 
     val num = pileup.getCountAtPosition
 
+    // phred scores are log-scaled, so dividing the count-weighted sum by the count
+    // gives a mean in phred space (a geometric mean of the underlying probabilities)
     pileup.setMapQuality(pileup.getMapQuality / num)
     pileup.setSangerQuality(pileup.getSangerQuality / num)
@@ -50,13 +177,27 @@ private[rdd] class PileupAggregator extends Serializable with Logging {
     combineEvidence(pileupList)
   }
 
-  def aggregate(pileups: RDD[ADAMPileup], coverage: Int = 30): RDD[ADAMPileup] = {
-    def flatten(kv: Seq[ADAMPileup]): List[ADAMPileup] = {
-      val splitUp = kv.toList.groupBy(mapPileup)
-
-      splitUp.map(kv => aggregatePileup(kv._2)).toList
-    }
+  /**
+   * Groups pileup bases by their aggregation key and reduces each group down to a single base.
+   *
+   * @param kv Group of pileups to aggregate.
+   * @return Aggregated pileups.
+   */
+  def flatten(kv: Seq[ADAMPileup]): List[ADAMPileup] = {
+    val splitUp = kv.toList.groupBy(mapPileup)
+
+    splitUp.map(kv => aggregatePileup(kv._2)).toList
+  }
 
+  /**
+   * Performs aggregation across an RDD.
+   *
+   * @param pileups RDD of pileup bases to aggregate.
+   * @param coverage Expected average coverage; used to increase the number of reducers. Defaults to 30.
+   * @return RDD of aggregated bases.
+   */
+  def aggregate(pileups: RDD[ADAMPileup], coverage: Int = 30): RDD[ADAMPileup] = {
+    log.info ("Aggregating " + pileups.count + " pileups.")
 
     /* need to manually set partitions here, as this is a large reduction if there are long reads, or there is high coverage.
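A hypothetical sketch of driving the aggregator through its public path, as the PileupAggregator command does (`pileups` is assumed to be an RDD[ADAMPileup] built elsewhere; the implicit wrapper comes from AdamContext._):

    import org.apache.spark.rdd.RDD
    import edu.berkeley.cs.amplab.adam.avro.ADAMPileup
    import edu.berkeley.cs.amplab.adam.rdd.AdamContext._

    // merges pileup bases that share a read base, indel offset, and sample;
    // coverage sizes the number of reducers used for the grouping
    val aggregated: RDD[ADAMPileup] = pileups.adamAggregatePileups(coverage = 30)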
diff --git a/adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/AdamContextSuite.scala b/adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/AdamContextSuite.scala index adb386496a..01625d227b 100644 --- a/adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/AdamContextSuite.scala +++ b/adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/AdamContextSuite.scala @@ -19,6 +19,7 @@ import parquet.filter.UnboundRecordFilter import edu.berkeley.cs.amplab.adam.avro.ADAMRecord import edu.berkeley.cs.amplab.adam.models.ADAMVariantContext import org.apache.spark.rdd.RDD +import edu.berkeley.cs.amplab.adam.avro.ADAMRecord import edu.berkeley.cs.amplab.adam.util.SparkFunSuite import edu.berkeley.cs.amplab.adam.rdd.AdamContext._ import edu.berkeley.cs.amplab.adam.util.PhredUtils._ @@ -50,4 +51,3 @@ class AdamContextSuite extends SparkFunSuite { } - diff --git a/adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctionsSuite.scala b/adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctionsSuite.scala new file mode 100644 index 0000000000..73bc1a3af5 --- /dev/null +++ b/adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/AdamRDDFunctionsSuite.scala @@ -0,0 +1,166 @@ +/** + * Copyright 2013 Genome Bridge LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package edu.berkeley.cs.amplab.adam.rdd + +import org.apache.spark.rdd.RDD +import edu.berkeley.cs.amplab.adam.avro.{ADAMPileup, Base} +import edu.berkeley.cs.amplab.adam.util.SparkFunSuite +import edu.berkeley.cs.amplab.adam.rdd.AdamContext._ +import edu.berkeley.cs.amplab.adam.models.ADAMRod + +class AdamRDDFunctionsSuite extends SparkFunSuite { + + sparkTest("can convert pileups to rods, bases at different pos") { + val p0 = ADAMPileup.newBuilder() + .setPosition(0L) + .setReadBase(Base.A) + .build() + val p1 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.C) + .build() + + val pileups: RDD[ADAMPileup] = sc.parallelize(List(p0, p1)) + + val rods = pileups.adamPileupsToRods(1) + + assert(rods.count === 2) + assert(rods.filter(_.position == 0L).count === 1) + assert(rods.filter(_.position == 0L).flatMap(_.pileups).filter(_.getReadBase == Base.A).count === 1) + assert(rods.filter(_.position == 1L).count === 1) + assert(rods.filter(_.position == 1L).flatMap(_.pileups).filter(_.getReadBase == Base.C).count === 1) + } + + sparkTest("can convert pileups to rods, bases at same pos") { + val p0 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.A) + .build() + val p1 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.C) + .build() + + val pileups: RDD[ADAMPileup] = sc.parallelize(List(p0, p1)) + + val rods = pileups.adamPileupsToRods(1) + + assert(rods.count === 1) + assert(rods.filter(_.position == 1L).count === 1) + assert(rods.filter(_.position == 1L).flatMap(_.pileups).count === 2) + assert(rods.filter(_.position == 1L).flatMap(_.pileups).filter(_.getReadBase == Base.A).count === 1) + assert(rods.filter(_.position == 1L).flatMap(_.pileups).filter(_.getReadBase == Base.C).count === 1) + } + + sparkTest("can convert pileups to rods, bases at same pos, split by different sample") { + val p0 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.A) + .setRecordGroupSample("0") + .build() + val p1 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.C) + .setRecordGroupSample("1") + .build() + + val pileups: RDD[ADAMPileup] = sc.parallelize(List(p0, p1)) + + val rods = pileups.adamPileupsToRods(1) + + assert(rods.count === 1) + assert(rods.filter(_.position == 1L).count === 1) + assert(rods.filter(_.position == 1L).flatMap(_.pileups).count === 2) + assert(rods.filter(_.position == 1L).flatMap(_.pileups).filter(_.getReadBase == Base.A).count === 1) + assert(rods.filter(_.position == 1L).flatMap(_.pileups).filter(_.getReadBase == Base.C).count === 1) + assert(rods.filter(_.isSingleSample).count === 0) + + val split = rods.adamSplitRodsBySamples() + + assert(split.count === 2) + assert(split.filter(_.position == 1L).count === 2) + assert(split.filter(_.isSingleSample).count === 2) + } + + sparkTest("can convert pileups to rods, bases at same pos, split by same sample") { + val p0 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.A) + .setRecordGroupSample("1") + .build() + val p1 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.C) + .setRecordGroupSample("1") + .build() + + val pileups: RDD[ADAMPileup] = sc.parallelize(List(p0, p1)) + + val rods = pileups.adamPileupsToRods(1) + + assert(rods.count === 1) + assert(rods.filter(_.position == 1L).count === 1) + assert(rods.filter(_.position == 1L).flatMap(_.pileups).count === 2) + assert(rods.filter(_.position == 1L).flatMap(_.pileups).filter(_.getReadBase == Base.A).count === 1) + assert(rods.filter(_.position == 
1L).flatMap(_.pileups).filter(_.getReadBase == Base.C).count === 1) + assert(rods.filter(_.isSingleSample).count === 1) + + val split = rods.adamSplitRodsBySamples() + + assert(split.count === 1) + assert(split.filter(_.isSingleSample).count === 1) + } + + + sparkTest("check coverage, bases at different pos") { + val p0 = ADAMPileup.newBuilder() + .setPosition(0L) + .setReadBase(Base.A) + .build() + val p1 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.C) + .build() + + val pileups: RDD[ADAMPileup] = sc.parallelize(List(p0, p1)) + + val coverage = pileups.adamPileupsToRods(1) + .adamRodCoverage() + + // floating point, so apply tolerance + assert(coverage > 0.99 && coverage < 1.01) + } + + sparkTest("check coverage, bases at same pos") { + val p0 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.A) + .build() + val p1 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.C) + .build() + + val pileups: RDD[ADAMPileup] = sc.parallelize(List(p0, p1)) + + val coverage = pileups.adamPileupsToRods(1) + .adamRodCoverage() + + // floating point, so apply tolerance + assert(coverage > 1.99 && coverage < 2.01) + } + +} diff --git a/adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/PileupAggregationSuite.scala b/adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/PileupAggregationSuite.scala new file mode 100644 index 0000000000..56b114fa78 --- /dev/null +++ b/adam-commands/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/PileupAggregationSuite.scala @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2013. Regents of the University of California + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package edu.berkeley.cs.amplab.adam.rdd + +import org.scalatest.FunSuite +import edu.berkeley.cs.amplab.adam.avro.{ADAMPileup, ADAMRecord, Base} + +class PileupAggregationSuite extends FunSuite { + + test("aggregating a pileup with two different bases does not change values") { + val p0 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.A) + .setMapQuality(10) + .setSangerQuality(30) + .setCountAtPosition(1) + .setNumSoftClipped(0) + .setNumReverseStrand(0) + .build() + val p1 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.C) + .setMapQuality(20) + .setSangerQuality(40) + .setCountAtPosition(1) + .setNumSoftClipped(1) + .setNumReverseStrand(1) + .build() + + val pileups = List(p0, p1) + + val aggregator = new PileupAggregator + + val aggregated = aggregator.flatten(pileups) + + assert(aggregated.length === 2) + assert(aggregated.filter(_.getReadBase == Base.C).head === pileups.filter(_.getReadBase == Base.C).head) + assert(aggregated.filter(_.getReadBase == Base.A).head === pileups.filter(_.getReadBase == Base.A).head) + } + + test("aggregating a pileup with a single base type") { + val p0 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.A) + .setMapQuality(9) + .setSangerQuality(31) + .setCountAtPosition(1) + .setNumSoftClipped(0) + .setNumReverseStrand(0) + .setReadName("read0") + .setReadStart(0L) + .setReadEnd(1L) + .build() + val p1 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.A) + .setMapQuality(11) + .setSangerQuality(29) + .setCountAtPosition(1) + .setNumSoftClipped(1) + .setNumReverseStrand(1) + .setReadName("read1") + .setReadStart(1L) + .setReadEnd(2L) + .build() + + val pileups = List(p0, p1) + + val aggregator = new PileupAggregator + + val aggregated = aggregator.flatten(pileups) + + assert(aggregated.length === 1) + assert(aggregated.head.getPosition === 1L) + assert(aggregated.head.getReadBase === Base.A) + assert(aggregated.head.getSangerQuality === 30) + assert(aggregated.head.getMapQuality === 10) + assert(aggregated.head.getCountAtPosition === 2) + assert(aggregated.head.getNumSoftClipped === 1) + assert(aggregated.head.getNumReverseStrand === 1) + assert(aggregated.head.getReadName === "read0,read1") + assert(aggregated.head.getReadStart === 0L) + assert(aggregated.head.getReadEnd === 2L) + } + + test("aggregating a pileup with a single base type, multiple bases at a position") { + val p0 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.A) + .setMapQuality(8) + .setSangerQuality(32) + .setCountAtPosition(1) + .setNumSoftClipped(0) + .setNumReverseStrand(0) + .setReadName("read0") + .setReadStart(0L) + .setReadEnd(1L) + .build() + val p1 = ADAMPileup.newBuilder() + .setPosition(1L) + .setReadBase(Base.A) + .setMapQuality(11) + .setSangerQuality(29) + .setCountAtPosition(2) + .setNumSoftClipped(2) + .setNumReverseStrand(2) + .setReadName("read1") + .setReadStart(1L) + .setReadEnd(2L) + .build() + + val pileups = List(p0, p1) + + val aggregator = new PileupAggregator + + val aggregated = aggregator.flatten(pileups) + + assert(aggregated.length === 1) + assert(aggregated.head.getPosition === 1L) + assert(aggregated.head.getReadBase === Base.A) + assert(aggregated.head.getSangerQuality === 30) + assert(aggregated.head.getMapQuality === 10) + assert(aggregated.head.getCountAtPosition === 3) + assert(aggregated.head.getNumSoftClipped === 2) + assert(aggregated.head.getNumReverseStrand === 2) + assert(aggregated.head.getReadName === "read0,read1") + assert(aggregated.head.getReadStart 
=== 0L) + assert(aggregated.head.getReadEnd === 2L) + } + +}
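Taken together, the rod API composes end to end as follows. This is a hypothetical sketch rather than part of the patch: `pileups` is assumed to be an RDD[ADAMPileup] obtained elsewhere, and the implicit RDD wrappers come from AdamContext._.

    import org.apache.spark.rdd.RDD
    import edu.berkeley.cs.amplab.adam.avro.ADAMPileup
    import edu.berkeley.cs.amplab.adam.models.ADAMRod
    import edu.berkeley.cs.amplab.adam.rdd.AdamContext._

    // group pileup bases into one rod per reference position
    val rods: RDD[ADAMRod] = pileups.adamPileupsToRods()

    // aggregate matching bases inside each rod, then split multi-sample
    // rods into per-sample rods
    val aggregated: RDD[ADAMRod] = rods.adamAggregateRods()
    val perSample: RDD[ADAMRod] = aggregated.adamSplitRodsBySamples()

    // per the adamRodCoverage notes, on sample-split rods this yields the
    // average per-sample coverage, averaged over all samples
    val coverage: Double = perSample.adamRodCoverage()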