Merge pull request #311 from fnothaft/simple-normalizations
Adding several simple normalizations.
tdanford committed Jul 29, 2014
2 parents 58ec0b2 + 5d5e596 commit 70d81f8
Showing 6 changed files with 326 additions and 1 deletion.
34 changes: 34 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/models/Interval.scala
@@ -0,0 +1,34 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.models

/**
* An interval is a region on a coordinate space that has a defined width. This
* can be used to express a region of a genome, a transcript, a gene, etc.
*/
trait Interval {

  /**
   * The width is the key property of an interval; the interval itself can
   * represent a genomic region, a transcript, a gene, etc.
   *
   * @return The width of this interval.
   */
  def width: Long
}
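
A usage sketch, not part of this diff: any type with a well-defined extent can mix in Interval. The hypothetical Exon class below assumes 0-based, half-open [start, end) coordinates.

import org.bdgenomics.adam.models.Interval

// Hypothetical example type, not part of this commit: an exon in
// 0-based, half-open [start, end) coordinates.
case class Exon(transcriptId: String, start: Long, end: Long) extends Interval {
  // Number of bases covered by the half-open interval [start, end).
  def width: Long = end - start
}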
2 changes: 1 addition & 1 deletion adam-core/src/main/scala/org/bdgenomics/adam/models/ReferenceRegion.scala
@@ -161,7 +161,7 @@ object ReferenceRegion {
* which is <i>not</i> in the region -- i.e. [start, end) define a 0-based
* half-open interval.
*/
-case class ReferenceRegion(referenceName: String, start: Long, end: Long) extends Ordered[ReferenceRegion] {
+case class ReferenceRegion(referenceName: String, start: Long, end: Long) extends Ordered[ReferenceRegion] with Interval {

  assert(start >= 0)
  assert(end >= start)
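
The body of ReferenceRegion is collapsed in this diff. Given the 0-based, half-open [start, end) convention documented above, the width member required by Interval would presumably reduce to the one-line sketch below; this is an assumption, since the actual implementation sits outside the shown hunk.

  // Presumed implementation (not shown in this hunk): the width of a
  // 0-based, half-open [start, end) region is the number of bases it covers.
  def width: Long = end - start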
78 changes: 78 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/normalization/LengthNormalization.scala
@@ -0,0 +1,78 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.normalization

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.Interval

object LengthNormalization extends Serializable with Logging {

  /**
   * Normalizes an RDD that contains a double value and an interval by the
   * width of the interval.
   *
   * @param rdd An RDD of ((value to normalize, interval), additional data)
   *   tuples.
   * @return Returns an RDD of the same shape, where each double has been
   *   divided by the width of its interval.
   *
   * @tparam I Type of the interval.
   * @tparam T Datatype of additional value parameter to maintain.
   *
   * @see pkn
   */
  def apply[I <: Interval, T](rdd: RDD[((Double, I), T)]): RDD[((Double, I), T)] = {
    rdd.map(t => ((t._1._1 / t._1._2.width, t._1._2), t._2))
  }

  /**
   * Normalizes an RDD that contains a double value and an interval by the
   * width of the interval and by the aggregate of all values. This is useful
   * for calculating statistics like reads/fragments per kilobase of
   * transcript per million reads (RPKM/FPKM).
   *
   * @param rdd An RDD of ((value to normalize, interval), additional data)
   *   tuples.
   * @param k Per-length normalization factor. E.g., for RPKM, k = 1,000
   *   (reads per _kilobase_ of transcript per million reads).
   * @param n Global normalization factor. E.g., for RPKM, n = 1,000,000 (reads
   *   per kilobase transcript per _million_ reads).
   * @return Returns an RDD of the same shape, where each double has been
   *   normalized by its interval's width, by k, and by the aggregate of all
   *   values in units of n.
   *
   * @tparam I Type of the interval.
   * @tparam T Datatype of additional value parameter to maintain.
   *
   * @see apply
   */
  def pkn[I <: Interval, T](rdd: RDD[((Double, I), T)],
                            k: Double = 1000.0,
                            n: Double = 1000000.0): RDD[((Double, I), T)] = {
    val cachedRdd = rdd.cache()

    // aggregate all values, in units of n (e.g., millions of reads)
    val norm = cachedRdd.map(kv => kv._1._1).reduce(_ + _) / n

    // normalize by interval width, scale to per-k bases, and divide by
    // the aggregate count in units of n
    val normalizedRdd = apply(cachedRdd).map(t => ((t._1._1 * k / norm, t._1._2), t._2))

    // unpersist the cached input and return
    cachedRdd.unpersist()
    normalizedRdd
  }
}
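
A usage sketch, not part of this diff, assuming an active SparkContext named sc: apply divides each value by the width of its interval, while pkn additionally scales by the per-kilobase factor k and by the aggregate count in units of n, yielding RPKM/FPKM-style values (RPKM = 10^9 * count / (length * total count)).

import org.apache.spark.SparkContext
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.rdd.normalization.LengthNormalization

// Hypothetical driver code, assuming an active SparkContext `sc`.
def normalizeExample(sc: SparkContext): Unit = {
  // (read count, region) pairs; the Int payload is arbitrary metadata
  // that is carried through unchanged
  val counts = sc.parallelize(Seq(
    ((2000.0, ReferenceRegion("chr1", 0L, 1000L)), 1),
    ((500.0, ReferenceRegion("chr1", 1000L, 1500L)), 2)))

  // per-base values: 2000 / 1000 = 2.0 and 500 / 500 = 1.0
  val perBase = LengthNormalization(counts)

  // RPKM-style values, using the defaults k = 1,000 and n = 1,000,000
  val rpkm = LengthNormalization.pkn(counts)
}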
79 changes: 79 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/normalization/ZScoreNormalization.scala
@@ -0,0 +1,79 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.normalization

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import scala.math.sqrt

object ZScoreNormalization extends Serializable with Logging {

  /**
   * Normalizes an RDD of double values by computing the Z score for each
   * value. Per point, the Z score (also known as the standard score) is
   * computed by subtracting the mean across all values from the point, and
   * then dividing by the standard deviation across all points.
   *
   * @param rdd RDD of (Double, Value) pairs to be normalized.
   * @return Returns an RDD where the original double value has been replaced
   *   by the Z score for that point.
   *
   * @tparam T Type of data passed along.
   */
  def apply[T](rdd: RDD[(Double, T)]): RDD[(Double, T)] = {
    val cachedRdd = rdd.cache()

    // compute the mean and the standard deviation
    val n = cachedRdd.count
    val mu = mean(cachedRdd.map(kv => kv._1), n)
    val sigma = sqrt(variance(cachedRdd.map(kv => kv._1), n, mu))

    // update keys
    log.info("Normalizing by z-score with µ: " + mu + " and σ: " + sigma)
    val update = cachedRdd.map(kv => ((kv._1 - mu) / sigma, kv._2))

    // unpersist the cached input and return
    cachedRdd.unpersist()
    update
  }

  /**
   * Computes the mean of a set of samples.
   *
   * @param rdd An RDD of doubles.
   * @param n The number of samples in the RDD.
   * @return Returns the mean of the RDD of doubles.
   */
  private[normalization] def mean(rdd: RDD[Double], n: Long): Double = {
    rdd.reduce(_ + _) / n.toDouble
  }

  /**
   * Computes the (population) variance of a set of samples.
   *
   * @param rdd An RDD of doubles.
   * @param n The number of samples in the RDD.
   * @param mu The mean of all the samples in the RDD.
   * @return Returns the variance of the RDD of doubles.
   */
  private[normalization] def variance(rdd: RDD[Double], n: Long, mu: Double): Double = {
    rdd.map(d => (d - mu) * (d - mu)).reduce(_ + _) / n.toDouble
  }
}
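
A usage sketch, not part of this diff, again assuming an active SparkContext sc. For the four values below, µ = 3.0 and σ² = 2.0, so each z-score is (x − 3) / √2.

import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.normalization.ZScoreNormalization

// Hypothetical driver code, assuming an active SparkContext `sc`.
def zScoreExample(sc: SparkContext): Unit = {
  // values 1, 3, 3, 5 have mean 3.0 and variance 2.0, so the z-scores
  // are (x - 3) / sqrt(2): roughly -1.414, 0.0, 0.0, and 1.414
  val scored = ZScoreNormalization(
    sc.parallelize(Seq((1.0, "a"), (3.0, "b"), (3.0, "c"), (5.0, "d"))))

  scored.collect().foreach(println)
}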
65 changes: 65 additions & 0 deletions adam-core/src/test/scala/org/bdgenomics/adam/rdd/normalization/LengthNormalizationSuite.scala
@@ -0,0 +1,65 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.normalization

import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.adam.util.SparkFunSuite
import scala.math.{ abs, sqrt }

class LengthNormalizationSuite extends SparkFunSuite {
  def fpEquals(n1: Double, n2: Double, epsilon: Double = 1e-6): Boolean = {
    abs(n1 - n2) < epsilon
  }

  sparkTest("normalize a single targeted region") {
    // 1000.0 over a half-open region of width 1000 normalizes to 1.0
    val rdd = sc.parallelize(Seq(((1000.0, ReferenceRegion("chr1", 0L, 1000L)), 1)))

    LengthNormalization(rdd)
      .map(t => t._1._1)
      .collect()
      .foreach(v => assert(fpEquals(v, 1.0)))
  }

  sparkTest("normalize a set of targeted regions") {
    val rddVals = sc.parallelize(Seq(1.0, 5.0, 3.0, 4.0, 2.0))

    // each value is scaled up by the width of its region, so normalizing
    // should recover the original values
    val rdd = rddVals.zip(sc.parallelize(Seq(1000.0, 500.0, 3215.0, 10000.0, 55000.0)))
      .map(kv => ((kv._1 * kv._2, ReferenceRegion("", 0L, kv._2.toLong)), 1))

    LengthNormalization(rdd)
      .map(t => t._1._1)
      .zip(rddVals)
      .collect()
      .foreach(p => assert(fpEquals(p._1, p._2)))
  }

  sparkTest("calculate *pkm type normalization for a set of targeted regions") {
    val rddVals = sc.parallelize(Seq(1.0, 5.0, 3.0, 4.0, 2.0))

    val rdd = rddVals.map(_ * 100000.0)
      .zip(sc.parallelize(Seq(1000.0, 500.0, 3215.0, 10000.0, 55000.0)))
      .map(kv => ((kv._1 * kv._2, ReferenceRegion("", 0L, kv._2.toLong)), 1))

    // the aggregate count across all regions, in millions
    val norm = rdd.map(t => t._1._1).reduce(_ + _) / 1000000.0

    LengthNormalization.pkn(rdd)
      .map(t => t._1._1)
      .zip(rddVals)
      .collect()
      .foreach(p => assert(fpEquals(p._1, p._2 * 100000.0 * 1000.0 / norm)))
  }
}
69 changes: 69 additions & 0 deletions adam-core/src/test/scala/org/bdgenomics/adam/rdd/normalization/ZScoreNormalizationSuite.scala
@@ -0,0 +1,69 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.normalization

import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.util.SparkFunSuite
import scala.math.{ abs, sqrt }

class ZScoreNormalizationSuite extends SparkFunSuite {
  def fpEquals(n1: Double, n2: Double, epsilon: Double = 1e-6): Boolean = {
    abs(n1 - n2) < epsilon
  }

  sparkTest("compute mean of a set of samples") {
    val rdd = sc.parallelize(Seq(3.0, 4.0, 5.0, 4.0, 5.0, 3.0, 2.0, 6.0))

    assert(fpEquals(4.0, ZScoreNormalization.mean(rdd, rdd.count)))
  }

  sparkTest("compute variance of a set of samples") {
    val rdd = sc.parallelize(Seq(3.0, 4.0, 5.0, 4.0, 5.0, 3.0, 2.0, 6.0))

    // four points at squared distance 1 from the mean, two at squared
    // distance 4, and two at distance 0
    val expected = (4.0 * 1.0 + 2.0 * 4.0) / 8.0

    assert(fpEquals(expected, ZScoreNormalization.variance(rdd, rdd.count, 4.0)))
  }

  sparkTest("variance should be 0 if all elements are the same") {
    val rdd = sc.parallelize(Seq(3.0, 3.0, 3.0, 3.0, 3.0))

    assert(fpEquals(0.0, ZScoreNormalization.variance(rdd, rdd.count,
      ZScoreNormalization.mean(rdd, rdd.count))))
  }

  sparkTest("check z-score for a varying rdd") {
    // for this rdd, µ = 0.0 and σ = √2, so each value's z-score equals
    // the signed square root of its absolute value
    val rdd = sc.parallelize(Seq(-2.0, 0.0, 0.0, 2.0))

    val r = ZScoreNormalization(rdd.map(v => (v, 1)))
      .map(kv => kv._1)
      .zip(rdd)
      .collect()

    r.foreach(p => {
      val p2 = if (p._2 != 0.0) {
        sqrt(abs(p._2)) * p._2 / abs(p._2)
      } else {
        0.0 // guard against dividing by zero when computing the sign
      }
      assert(fpEquals(p._1, p2))
    })
  }
}
