Skip to content

Commit

Permalink
[ADAM-1462] Add quality score binner.
Browse files Browse the repository at this point in the history
Resolves #1462. Adds an API for binning quality scores to `AlignmentRecordRDD`
and `FragmentRDD`. Exposes this on the `Transform` CLI.
  • Loading branch information
fnothaft committed May 12, 2017
1 parent ea9ce6c commit a089670
Show file tree
Hide file tree
Showing 9 changed files with 564 additions and 4 deletions.
21 changes: 19 additions & 2 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.bdgenomics.adam.models.SnpTable
import org.bdgenomics.adam.projections.{ AlignmentRecordField, Filter }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, QualityScoreBin }
import org.bdgenomics.adam.rich.RichVariant
import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
Expand Down Expand Up @@ -117,6 +117,8 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
var mdTagsFragmentSize: Long = 1000000L
@Args4jOption(required = false, name = "-md_tag_overwrite", usage = "When adding MD tags to reads, overwrite existing incorrect tags.")
var mdTagsOverwrite: Boolean = false
@Args4jOption(required = false, name = "-bin_quality_scores", usage = "Rewrites quality scores of reads into bins from a string of bin descriptions, e.g. 0,20,10;20,40,30.")
var binQualityScores: String = null
@Args4jOption(required = false, name = "-cache", usage = "Cache data to avoid recomputing between stages.")
var cache: Boolean = false
@Args4jOption(required = false, name = "-storage_level", usage = "Set the storage level to use for caching.")
Expand All @@ -128,6 +130,18 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans

val stringency = ValidationStringency.valueOf(args.stringency)

/**
* @param rdd An RDD of reads.
* @return If the binQualityScores argument is set, rewrites the quality scores of the
* reads into bins. Else, returns the original RDD.
*/
private def maybeBin(rdd: AlignmentRecordRDD): AlignmentRecordRDD = {
Option(args.binQualityScores).fold(rdd)(binDescription => {
val bins = QualityScoreBin(binDescription)
rdd.binQualityScores(bins)
})
}

/**
* @param rdd An RDD of reads.
* @return If the repartition argument is set, repartitions the input dataset
Expand Down Expand Up @@ -354,8 +368,11 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
// first repartition if needed
val initialRdd = maybeRepartition(rdd)

// then bin, if desired
val binnedRdd = maybeBin(rdd)

// then, mark duplicates, if desired
val maybeDedupedRdd = maybeDedupe(initialRdd)
val maybeDedupedRdd = maybeDedupe(binnedRdd)

// once we've deduped our reads, maybe realign them
val maybeRealignedRdd = maybeRealign(sc, maybeDedupedRdd, sl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.bdgenomics.adam.cli

import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.ADAMFunSuite

class TransformSuite extends ADAMFunSuite {
Expand Down Expand Up @@ -55,4 +56,18 @@ class TransformSuite extends ADAMFunSuite {
Transform(Array("-single", "-sort_reads", "-sort_lexicographically", intermediateAdamPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

sparkTest("put quality scores into bins") {
val inputPath = copyResource("bqsr1.sam")
val finalPath = tmpFile("binned.adam")
Transform(Array(inputPath, finalPath, "-bin_quality_scores", "0,20,10;20,40,30;40,60,50")).run(sc)
val qualityScoreCounts = sc.loadAlignments(finalPath)
.rdd
.flatMap(_.getQual)
.map(s => s.toInt - 33)
.countByValue

assert(qualityScoreCounts(30) === 92899)
assert(qualityScoreCounts(10) === 7101)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import org.bdgenomics.adam.models.{
import org.bdgenomics.adam.rdd.{ AvroReadGroupGenomicRDD, JavaSaveArgs }
import org.bdgenomics.adam.rdd.read.{
AlignmentRecordRDD,
MarkDuplicates
BinQualities,
MarkDuplicates,
QualityScoreBin
}
import org.bdgenomics.adam.serialization.AvroSerializer
import org.bdgenomics.formats.avro._
Expand Down Expand Up @@ -157,6 +159,21 @@ case class FragmentRDD(rdd: RDD[Fragment],
saveAsParquet(new JavaSaveArgs(filePath))
}

/**
* Rewrites the quality scores of fragments to place all quality scores in bins.
*
* Quality score binning maps all quality scores to a limited number of
* discrete values, thus reducing the entropy of the quality score
* distribution, and reducing the amount of space that fragments consume on disk.
*
* @param bins The bins to use.
* @return Fragments whose quality scores are binned.
*/
def binQualityScores(bins: Seq[QualityScoreBin]): FragmentRDD = {
AlignmentRecordRDD.validateBins(bins)
BinQualities(this, bins)
}

/**
* Returns the regions that this fragment covers.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,34 @@ object AlignmentRecordRDD {
SequenceDictionary.empty,
RecordGroupDictionary.empty)
}

/**
* Validates that there are no gaps in a set of quality score bins.
*
* @param bins Bins to validate.
*
* @throws IllegalArgumentException Throws exception if the bins are empty,
* there is a gap between bins, or two bins overlap.
*/
private[rdd] def validateBins(bins: Seq[QualityScoreBin]) {
require(bins.nonEmpty, "Bins cannot be empty.")

// if we have multiple bins, validate them
// - check that we don't have gaps between bins
// - check that we don't have overlapping bins
if (bins.size > 1) {
val sortedBins = bins.sortBy(_.low)
(0 until (sortedBins.size - 1)).foreach(idx => {
if (sortedBins(idx).high < sortedBins(idx + 1).low) {
throw new IllegalArgumentException("Gap between bins %s and %s (all bins: %s).".format(
sortedBins(idx), sortedBins(idx + 1), bins.mkString(",")))
} else if (sortedBins(idx).high > sortedBins(idx + 1).low) {
throw new IllegalArgumentException("Bins %s and %s overlap (all bins: %s).".format(
sortedBins(idx), sortedBins(idx + 1), bins.mkString(",")))
}
})
}
}
}

case class AlignmentRecordRDD(
Expand Down Expand Up @@ -967,4 +995,19 @@ case class AlignmentRecordRDD(
// return
replaceRdd(finalRdd)
}

/**
* Rewrites the quality scores of reads to place all quality scores in bins.
*
* Quality score binning maps all quality scores to a limited number of
* discrete values, thus reducing the entropy of the quality score
* distribution, and reducing the amount of space that reads consume on disk.
*
* @param bins The bins to use.
* @return Reads whose quality scores are binned.
*/
def binQualityScores(bins: Seq[QualityScoreBin]): AlignmentRecordRDD = {
AlignmentRecordRDD.validateBins(bins)
BinQualities(this, bins)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.read

import org.bdgenomics.adam.rdd.fragment.FragmentRDD
import org.bdgenomics.formats.avro.{ AlignmentRecord, Fragment }
import scala.collection.JavaConversions._

/**
* Companion object for creating quality score bins.
*/
object QualityScoreBin {

/**
* Creates multiple bins from a string of bin descriptions.
*
* The string defining the bins should have each bin separated by a semicolon
* (";"). Each bin should be a comma separated list of three digits. The first
* digit is the low end of the bin, the second digit is the high end of the
* bin, and the third digit is the quality score to assign to bases in the
* bin. E.g., to define a bin that runs from 0 to 20 (low end inclusive, high
* end exclusive), and that assigns a quality score of 10, we'd write:
*
* 0,20,10
*
* To define this bin and another bin that runs from 20 to 40 and assigns a
* score of 30, we would write:
*
* 0,20,10;20,40,30
*
* @param binString A string defining the bins to create.
* @return Returns a Seq of bins.
*/
def apply(binString: String): Seq[QualityScoreBin] = {
binString.split(";")
.map(parseBin)
}

/**
* Creates a single bin.
*
* For more details, see the bin documentation in the apply method. A single
* bin is defined as a triple of integers: low,high,score.
*
* @param bin The single bin to parse.
* @return Returns an object representing this bin.
*/
private[read] def parseBin(bin: String): QualityScoreBin = {
val splitString = bin.split(",")
require(splitString.size == 3,
"Bin string (%s) did not contain exactly 3 elements.".format(bin))
try {
QualityScoreBin(splitString(0).toInt,
splitString(1).toInt,
splitString(2).toInt)
} catch {
case nfe: NumberFormatException => {
throw new IllegalArgumentException("All elements in bin description (%s) must be integers.".format(bin))
}
}
}
}

/**
* A bin to put quality scores in.
*
* @param low The lowest quality score in the bin.
* @param high The highest quality score in the bin.
* @param score The score to assign to all these bases.
*/
case class QualityScoreBin(low: Int,
high: Int,
score: Int) {

require(low >= 0, "Low phred score (%d) must be greater than 0.".format(low))
require(high > low, "High score (%d) must be greater than the low score (%d).".format(
high, low))
require(high < 255, "High score (%d) must be below 255.".format(high))

private val lowPhred = (low + 33).toChar
private val highPhred = (high + 33).toChar
private val scoreToEmit = (score + 33).toChar
require(score >= low && score < high,
"Score to emit (%d) must be between high and low scores (%d–%d).".format(
score, low, high))

private[rdd] def optGetBase(phred: Char): Option[Char] = {
if ((phred >= lowPhred) && (phred < highPhred)) {
Some(scoreToEmit)
} else {
None
}
}
}

private[rdd] object BinQualities extends Serializable {

/**
* Rewrites the quality scores attached to a read into bins.
*
* @param reads The reads to bin the quality scores of.
* @param bins The bins to place the quality scores in.
* @return Returns a new RDD of reads were the quality scores of the read
* bases have been binned.
*/
def apply(reads: AlignmentRecordRDD,
bins: Seq[QualityScoreBin]): AlignmentRecordRDD = {

reads.transform(rdd => {
rdd.map(binRead(_, bins))
})
}

/**
* Rewrites the quality scores attached to a fragment into bins.
*
* @param fragments The fragments to bin the quality scores of.
* @param bins The bins to place the quality scores in.
* @return Returns a new RDD of fragments were the quality scores of the fragment
* bases have been binned.
*/
def apply(fragments: FragmentRDD,
bins: Seq[QualityScoreBin]): FragmentRDD = {

fragments.transform(rdd => {
rdd.map(binFragment(_, bins))
})
}

/**
* Rewrites the quality scores of a single fragment.
*
* @param fragment The fragment whose quality scores should be rewritten.
* @param bins The bins to place the quality scores in.
* @return Returns a new fragment whose quality scores have been updated.
*/
private[read] def binFragment(fragment: Fragment,
bins: Seq[QualityScoreBin]): Fragment = {
val reads: Seq[AlignmentRecord] = fragment.getAlignments
val binnedReads = reads.map(binRead(_, bins))
Fragment.newBuilder(fragment)
.setAlignments(binnedReads)
.build
}

/**
* Rewrites the quality scores of a single read.
*
* @param read The read whose quality scores should be rewritten.
* @param bins The bins to place the quality scores in.
* @return Returns a new read whose quality scores have been updated.
*/
private[read] def binRead(read: AlignmentRecord,
bins: Seq[QualityScoreBin]): AlignmentRecord = {
if (read.getQual != null) {
AlignmentRecord.newBuilder(read)
.setQual(binQualities(read.getQual, bins))
.build
} else {
read
}
}

/**
* Rewrites a string of quality scores.
*
* @param read The read whose quality scores should be rewritten.
* @param bins The bins to place the quality scores in.
* @return Returns a new read whose quality scores have been updated.
*/
private[read] def binQualities(quals: String,
bins: Seq[QualityScoreBin]): String = {
quals.map(q => {
val scores = bins.flatMap(_.optGetBase(q))

if (scores.size == 1) {
scores.head
} else if (scores.size > 1) {
throw new IllegalStateException("Quality score (%s) fell into multiple bins (from bins %s)".format(q,
bins.mkString(",")))
} else {
throw new IllegalStateException("Quality score (%s) fell into no bins (from bins %s).".format(q,
bins.mkString(",")))
}
}).mkString
}
}
Loading

0 comments on commit a089670

Please sign in to comment.