Commit 40acc56

Adding rod functionality: a specialized grouping of pileup data.

fnothaft committed Dec 9, 2013
1 parent 5a943b9 commit 40acc56

Showing 8 changed files with 597 additions and 20 deletions.
@@ -53,4 +53,4 @@ class PileupAggregator(protected val args: PileupAggregatorArgs)
pileups.adamAggregatePileups().adamSave(args.pileupOutput, args)
}

}
}
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2013. Regents of the University of California
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.berkeley.cs.amplab.adam.models

import edu.berkeley.cs.amplab.adam.avro.ADAMPileup
import org.apache.spark.rdd.RDD

/**
* Class representing a set of pileup bases at a specific locus.
*
* @param position Position on the reference genome.
* @param pileups A list representing the bases at this locus.
*/
case class ADAMRod (position: Long, pileups: List[ADAMPileup]) {
// all bases must be at the same position
require(pileups.forall(_.getPosition.toLong == position))

lazy val isSingleSample: Boolean = pileups.map(_.getRecordGroupSample).distinct.length == 1

/**
* Splits this rod out by samples.
*
* @return A list of rods, each corresponding to a single sample.
*/
def splitBySamples (): List[ADAMRod] = {
if(isSingleSample) {
List(new ADAMRod(position, pileups))
} else {
pileups.groupBy(_.getRecordGroupSample).values.toList.map(pg => new ADAMRod(position, pg))
}
}

}
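
A usage sketch of the new model (hypothetical locus and sample names; assumes the remaining ADAMPileup fields are nullable or defaulted in the Avro schema, so the builder can be called this sparsely):

  import edu.berkeley.cs.amplab.adam.avro.ADAMPileup
  import edu.berkeley.cs.amplab.adam.models.ADAMRod

  // Two bases piled up at locus 1000, from two different samples.
  val p1 = ADAMPileup.newBuilder().setPosition(1000L).setRecordGroupSample("sampleA").build()
  val p2 = ADAMPileup.newBuilder().setPosition(1000L).setRecordGroupSample("sampleB").build()

  val rod = ADAMRod(1000L, List(p1, p2))
  assert(!rod.isSingleSample)

  // splitBySamples yields one single-sample rod per sample at this locus.
  val perSample: List[ADAMRod] = rod.splitBySamples()
  assert(perSample.forall(_.isSingleSample))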
@@ -30,7 +30,7 @@ import edu.berkeley.cs.amplab.adam.commands.{SAMRecordConverter, VariantContextC
import org.apache.spark.rdd.RDD
import org.apache.spark.{Logging, SparkContext}
import scala.collection.JavaConversions._
import edu.berkeley.cs.amplab.adam.models.{SequenceRecord, SequenceDictionary}
import edu.berkeley.cs.amplab.adam.models.{SequenceRecord, SequenceDictionary, ADAMRod}
import org.apache.hadoop.fs.Path
import fi.tkk.ics.hadoop.bam.util.SAMHeaderReader
import edu.berkeley.cs.amplab.adam.projections.{ADAMRecordField, Projection}
@@ -54,6 +54,9 @@ object AdamContext {
// Add methods specific to ADAMVariantContext RDDs
implicit def rddToAdamVariantContextRDD(rdd: RDD[ADAMVariantContext]) = new AdamVariantContextRDDFunctions(rdd)

// Add methods specific to the ADAMRod RDDs
implicit def rddToAdamRodRDD(rdd: RDD[ADAMRod]) = new AdamRodRDDFunctions(rdd)

// Add generic RDD methods for all types of ADAM RDDs
implicit def rddToAdamRDD[T <% SpecificRecord : Manifest](rdd: RDD[T]) = new AdamRDDFunctions(rdd)

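With this conversion in implicit scope (via import AdamContext._), the rod-specific methods defined later in this commit become available directly on any RDD[ADAMRod]; a minimal sketch, assuming rods is such an RDD:

  import edu.berkeley.cs.amplab.adam.rdd.AdamContext._

  // rods: RDD[ADAMRod], e.g. built by adamPileupsToRods() below.
  val bySample = rods.adamSplitRodsBySamples() // resolved via rddToAdamRodRDD
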
@@ -23,7 +23,7 @@ import parquet.hadoop.util.ContextUtil
import org.apache.avro.specific.SpecificRecord
import edu.berkeley.cs.amplab.adam.avro.{ADAMPileup, ADAMRecord, ADAMVariant, ADAMGenotype, ADAMVariantDomain}
import edu.berkeley.cs.amplab.adam.commands.ParquetArgs
import edu.berkeley.cs.amplab.adam.models.{SequenceRecord, SequenceDictionary, SingleReadBucket, ReferencePosition}
import edu.berkeley.cs.amplab.adam.models.{SequenceRecord, SequenceDictionary, SingleReadBucket, ReferencePosition, ADAMRod}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.Logging
@@ -144,13 +144,89 @@ class AdamRecordRDDFunctions(rdd: RDD[ADAMRecord]) extends Serializable with Log
}
}

class AdamPileupRDDFunctions(rdd: RDD[ADAMPileup]) extends Serializable {
class AdamPileupRDDFunctions(rdd: RDD[ADAMPileup]) extends Serializable with Logging {
initLogging()

/**
* Aggregates pileup bases together.
*
* @param coverage Expected coverage; used to increase the number of reducers.
* @return RDD with aggregated bases.
*
* @see AdamRodRDDFunctions#adamAggregateRods
*/
def adamAggregatePileups(coverage: Int = 30): RDD[ADAMPileup] = {
val helper = new PileupAggregator
helper.aggregate(rdd, coverage)
}

/**
* Converts ungrouped pileup bases into rods grouped by reference position.
*
* @param coverage Expected coverage; used to increase the number of reducers.
* @return RDD with rods grouped by reference position.
*/
def adamPileupsToRods(coverage: Int = 30): RDD[ADAMRod] = {
val groups = rdd.groupBy((p: ADAMPileup) => p.getPosition, coverage)

groups.map(kv => ADAMRod(kv._1, kv._2.toList))
}

}
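
A sketch tying the two methods above together (assumes pileups: RDD[ADAMPileup] has already been loaded; coverage = 40 is a hypothetical hint for the reducer count):

  import org.apache.spark.rdd.RDD
  import edu.berkeley.cs.amplab.adam.avro.ADAMPileup
  import edu.berkeley.cs.amplab.adam.models.ADAMRod
  import edu.berkeley.cs.amplab.adam.rdd.AdamContext._

  // Group raw pileup bases into one rod per reference position...
  val rods: RDD[ADAMRod] = pileups.adamPileupsToRods(coverage = 40)

  // ...or collapse duplicate evidence at each position first.
  val aggregated: RDD[ADAMPileup] = pileups.adamAggregatePileups(coverage = 40)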

class AdamRodRDDFunctions(rdd: RDD[ADAMRod]) extends Serializable with Logging {
initLogging()

/**
* Given an RDD of rods, splits the rods up by the specific sample they correspond to.
* Returns a flat RDD.
*
* @return Rods split up by samples and _not_ grouped together.
*/
def adamSplitRodsBySamples(): RDD[ADAMRod] = {
rdd.flatMap(_.splitBySamples)
}

/**
* Given an RDD of rods, splits the rods up by the specific sample they correspond to.
* Returns an RDD where the samples are grouped by the reference position.
*
* @return Rods split up by samples and grouped together by position.
*/
def adamDivideRodsBySamples(): RDD[(Long, List[ADAMRod])] = {
rdd.keyBy(_.position).map(r => (r._1, r._2.splitBySamples))
}

/**
* Aggregates the pileup bases within each rod.
*
* @return RDD with aggregated rods.
*
* @see AdamPileupRDDFunctions#adamAggregatePileups
*/
def adamAggregateRods(): RDD[ADAMRod] = {
val helper = new PileupAggregator
rdd.map(r => (r.position, r.pileups))
.map(kv => (kv._1, helper.flatten(kv._2)))
.map(kv => new ADAMRod(kv._1, kv._2))
}

/**
* Returns the average coverage for all pileups.
*
* @note Coverage value does not include locus positions where no reads are mapped, as no rods exist for these positions.
* @note If running on an RDD with multiple samples where the rods have been split by sample, will return the average
* coverage per sample, _averaged_ over all samples. If the RDD contains multiple samples and the rods have _not_ been split,
* this will return the average coverage per sample, _summed_ over all samples.
*
* @return Average coverage across mapped loci.
*/
def adamRodCoverage(): Double = {
val totalBases: Long = rdd.map(_.pileups.length.toLong).reduce(_ + _)

// coverage is the total count of bases, over the total number of loci
totalBases.toDouble / rdd.count.toDouble
}
}
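
A worked example of the coverage semantics in the note above, with hypothetical counts: three mapped loci holding 10, 20, and 30 bases give (10 + 20 + 30) / 3 = 20.0 average coverage.

  // rods: RDD[ADAMRod] over three loci with 10, 20, and 30 bases.
  val cov = rods.adamRodCoverage() // 60 bases / 3 rods = 20.0

  // If those bases span two samples present at every locus and the rods are
  // split by sample first, the base total is unchanged but the rod count
  // doubles, so the result halves: per-sample coverage averaged over samples.
  val covPerSample = rods.adamSplitRodsBySamples().adamRodCoverage() // 10.0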

class AdamVariantContextRDDFunctions(rdd: RDD[ADAMVariantContext]) extends Serializable {
@@ -17,30 +17,157 @@
package edu.berkeley.cs.amplab.adam.rdd

import edu.berkeley.cs.amplab.adam.avro.{Base, ADAMPileup}
import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging

private[rdd] class PileupAggregator extends Serializable with Logging {
private[rdd] class PileupAggregator (validate: Boolean = false) extends Serializable with Logging {

/**
* Maps a pileup to a key that is unique per read base, indel offset, and sample.
*
* @param a Pileup to map to a key.
* @return Key composed of read base, indel offset, and sample.
*/
def mapPileup(a: ADAMPileup): (Option[Base], Option[java.lang.Integer], Option[CharSequence]) = {
(Option(a.getReadBase), Option(a.getRangeOffset), Option(a.getRecordGroupSample))
}
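// Example keys (hypothetical values): a match base from sample "NA12878" maps to
// (Some(Base.A), None, Some("NA12878")), while the second inserted base of an
// indel from the same sample maps to (Some(Base.A), Some(1), Some("NA12878")).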

def aggregatePileup(pileupList: List[ADAMPileup]): ADAMPileup = {
/**
* Reduces several pileup bases down to a single aggregated base.
*
* @param pileupList List of pileup bases to reduce.
* @return A single pileup base containing the combined evidence.
*
* @note All bases are expected to be from the same sample, and to have the same base and indel location.
*/
protected def aggregatePileup(pileupList: List[ADAMPileup]): ADAMPileup = {

def combineEvidence(pileupGroup: List[ADAMPileup]): ADAMPileup = {
val pileup = pileupGroup.reduce((a: ADAMPileup, b: ADAMPileup) => {
a.setMapQuality(a.getMapQuality + b.getMapQuality)
a.setSangerQuality(a.getSangerQuality + b.getSangerQuality)
a.setCountAtPosition(a.getCountAtPosition + b.getCountAtPosition)
a.setNumSoftClipped(a.getNumSoftClipped + b.getNumSoftClipped)
a.setNumReverseStrand(a.getNumReverseStrand + b.getNumReverseStrand)

a
if (validate) {
require(Option(a.getMapQuality).isDefined &&
Option(a.getSangerQuality).isDefined &&
Option(a.getCountAtPosition).isDefined &&
Option(a.getNumSoftClipped).isDefined &&
Option(a.getNumReverseStrand).isDefined &&
Option(a.getReadName).isDefined &&
Option(a.getReadStart).isDefined &&
Option(a.getReadEnd).isDefined,
"Cannot aggregate pileup with required fields null: " + a)

require(Option(b.getMapQuality).isDefined &&
Option(b.getSangerQuality).isDefined &&
Option(b.getCountAtPosition).isDefined &&
Option(b.getNumSoftClipped).isDefined &&
Option(b.getNumReverseStrand).isDefined &&
Option(b.getReadName).isDefined &&
Option(b.getReadStart).isDefined &&
Option(b.getReadEnd).isDefined,
"Cannot aggregate pileup with required fields null: " + b)
}

// set copied fields
val c = ADAMPileup.newBuilder()
.setReferenceName(a.getReferenceName)
.setReferenceId(a.getReferenceId)
.setPosition(a.getPosition)
.setRangeOffset(a.getRangeOffset)
.setRangeLength(a.getRangeLength)
.setReferenceBase(a.getReferenceBase)
.setReadBase(a.getReadBase)

// merge read group fields
val rgsc = List(Option(a.getRecordGroupSequencingCenter), Option(b.getRecordGroupSequencingCenter))
.flatMap((p: Option[CharSequence]) => p)
.distinct
if (rgsc.length != 0) {
c.setRecordGroupSequencingCenter(rgsc.reduce((a: CharSequence, b: CharSequence) => a + "," + b))
}

val rgd = List(Option(a.getRecordGroupDescription), Option(b.getRecordGroupDescription))
.flatMap((p: Option[CharSequence]) => p)
.distinct
if (rgd.length != 0) {
c.setRecordGroupDescription(rgd.reduce((a: CharSequence, b: CharSequence) => a + "," + b))
}

val rgrde = List(Option(a.getRecordGroupRunDateEpoch), Option(b.getRecordGroupRunDateEpoch))
.flatMap((p: Option[java.lang.Long]) => p)
.distinct
if (rgrde.length == 1) {
// cannot join multiple values together - behavior is not defined
c.setRecordGroupRunDateEpoch(rgrde.head)
}

val rgfo = List(Option(a.getRecordGroupFlowOrder), Option(b.getRecordGroupFlowOrder))
.flatMap((p: Option[CharSequence]) => p)
.distinct
if (rgfo.length != 0) {
c.setRecordGroupFlowOrder(rgfo.reduce((a: CharSequence, b: CharSequence) => a + "," + b))
}

val rgks = List(Option(a.getRecordGroupKeySequence), Option(b.getRecordGroupKeySequence))
.flatMap((p: Option[CharSequence]) => p)
.distinct
if (rgks.length != 0) {
c.setRecordGroupKeySequence(rgks.reduce((a: CharSequence, b: CharSequence) => a + "," + b))
}

val rgl = List(Option(a.getRecordGroupLibrary), Option(b.getRecordGroupLibrary))
.flatMap((p: Option[CharSequence]) => p)
.distinct
if (rgl.length != 0) {
c.setRecordGroupLibrary(rgl.reduce((a: CharSequence, b: CharSequence) => a + "," + b))
}

val rgpmis = List(Option(a.getRecordGroupPredictedMedianInsertSize), Option(b.getRecordGroupPredictedMedianInsertSize))
.flatMap((p: Option[java.lang.Integer]) => p)
.distinct
if (rgpmis.length == 1) {
// cannot combine two values here - behavior is not defined
c.setRecordGroupPredictedMedianInsertSize(rgpmis.head)
}

val rgp = List(Option(a.getRecordGroupPlatform), Option(b.getRecordGroupPlatform))
.flatMap((p: Option[CharSequence]) => p)
.distinct
if (rgp.length != 0) {
c.setRecordGroupPlatform(rgp.reduce((a: CharSequence, b: CharSequence) => a + "," + b))
}

val rgpu = List(Option(a.getRecordGroupPlatformUnit), Option(b.getRecordGroupPlatformUnit))
.flatMap((p: Option[CharSequence]) => p)
.distinct
if (rgpu.length != 0) {
c.setRecordGroupPlatformUnit(rgpu.reduce((a: CharSequence, b: CharSequence) => a + "," + b))
}

val rgs = List(Option(a.getRecordGroupSample), Option(b.getRecordGroupSample))
.flatMap((p: Option[CharSequence]) => p)
.distinct
if (rgs.length != 0) {
c.setRecordGroupSample(rgs.reduce((a: CharSequence, b: CharSequence) => a + "," + b))
}

// set new fields
c.setMapQuality(a.getMapQuality * a.getCountAtPosition +
b.getMapQuality * b.getCountAtPosition)
.setSangerQuality(a.getSangerQuality * a.getCountAtPosition +
b.getSangerQuality * b.getCountAtPosition)
.setCountAtPosition(a.getCountAtPosition + b.getCountAtPosition)
.setNumSoftClipped(a.getNumSoftClipped + b.getNumSoftClipped)
.setNumReverseStrand(a.getNumReverseStrand + b.getNumReverseStrand)
.setReadName(a.getReadName + "," + b.getReadName)
.setReadStart(a.getReadStart.toLong min b.getReadStart.toLong)
.setReadEnd(a.getReadEnd.toLong max b.getReadEnd.toLong)

c.build()
})

val num = pileup.getCountAtPosition

// Phred scores are log-scaled, so the arithmetic mean of the scores (sum / count)
// corresponds to the geometric mean of the underlying error probabilities.
pileup.setMapQuality(pileup.getMapQuality / num)
pileup.setSangerQuality(pileup.getSangerQuality / num)
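// Worked example with hypothetical values: bases at Phred 30 (p = 1e-3) and
// Phred 20 (p = 1e-2) sum to 50; 50 / 2 = Phred 25 = 10^-2.5, which is
// sqrt(1e-3 * 1e-2), the geometric mean of the two error probabilities.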

@@ -50,13 +177,27 @@ private[rdd] class PileupAggregator extends Serializable with Logging {
combineEvidence(pileupList)
}

def aggregate(pileups: RDD[ADAMPileup], coverage: Int = 30): RDD[ADAMPileup] = {
def flatten(kv: Seq[ADAMPileup]): List[ADAMPileup] = {
val splitUp = kv.toList.groupBy(mapPileup)

splitUp.map(kv => aggregatePileup(kv._2)).toList
}
/**
* Groups pileups by the key produced by mapPileup, then aggregates each group.
*
* @param kv Group of pileups to aggregate.
* @return Aggregated pileups.
*/
def flatten(kv: Seq[ADAMPileup]): List[ADAMPileup] = {
val splitUp = kv.toList.groupBy(mapPileup)

splitUp.map(kv => aggregatePileup(kv._2)).toList
}
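// For example (hypothetical counts): five pileups at one locus spanning two
// samples and two read bases collapse to at most four aggregated pileups, one
// per distinct key produced by mapPileup.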

/**
* Performs aggregation across an RDD.
*
* @param pileups RDD of pileup bases to aggregate.
* @param coverage Expected average coverage (default 30); used to increase the number of reducers.
* @return RDD of aggregated bases.
*/
def aggregate(pileups: RDD[ADAMPileup], coverage: Int = 30): RDD[ADAMPileup] = {

log.info ("Aggregating " + pileups.count + " pileups.")

/* need to manually set partitions here, as this is a large reduction if there are long reads, or there is high coverage.
@@ -19,6 +19,7 @@ import parquet.filter.UnboundRecordFilter
import edu.berkeley.cs.amplab.adam.avro.ADAMRecord
import edu.berkeley.cs.amplab.adam.models.ADAMVariantContext
import org.apache.spark.rdd.RDD
import edu.berkeley.cs.amplab.adam.avro.ADAMRecord
import edu.berkeley.cs.amplab.adam.util.SparkFunSuite
import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
import edu.berkeley.cs.amplab.adam.util.PhredUtils._
@@ -50,4 +51,3 @@ class AdamContextSuite extends SparkFunSuite {

}

