diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/normalization/LengthNormalization.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/normalization/LengthNormalization.scala deleted file mode 100644 index f3a9d9f74c..0000000000 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/normalization/LengthNormalization.scala +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.bdgenomics.adam.rdd.normalization - -import org.apache.spark.Logging -import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.models.Interval - -object LengthNormalization extends Serializable with Logging { - - /** - * Normalizes an RDD that contains a double value and an interval by the width - * of the interval. - * - * @param rdd An RDD containing (a value to be normalized, an interval, and an - * additional data value), for normalization. - * @return Returns an RDD containing (the double normalized by the interval - * length, the original interval, the original data value) after normalization. - * - * @tparam T Datatype of additional value parameter to maintain. - * - * @see pkn - */ - def apply[I <: Interval, T](rdd: RDD[((Double, I), T)]): RDD[((Double, I), T)] = { - rdd.map(t => ((t._1._1 / t._1._2.width, t._1._2), t._2)) - } - - /** - * Normalizes an RDD that contains a double value and an interval by the width - * of the interval and the total aggregate value of all values. This is useful - * for calculating entities like reads/fragments per kilobase of transcript - * per million reads (RPKM/FPKM). - * - * @param rdd An RDD containing (a value to be normalized, an interval, and an - * additional data value), for normalization. - * @param n Global normalization factor. E.g., for RPKM, n = 1,000,000 (reads - * per kilobase transcript per _million_ reads). - * - * @return Returns an RDD containing (the double normalized by the interval - * length, the original interval, the original data value) after normalization. - * - * @tparam T Datatype of additional value parameter to maintain. - * - * @see apply - */ - def pkn[I <: Interval, T](rdd: RDD[((Double, I), T)], - k: Double = 1000.0, - n: Double = 1000000.0): RDD[((Double, I), T)] = { - val cachedRdd = rdd.cache - - // generate count - val norm = cachedRdd.map(kv => kv._1._1).reduce(_ + _) / n - - // normalize the RDD - val normalizedRdd = apply(cachedRdd).map(t => ((t._1._1 * norm * k, t._1._2), t._2)) - - // uncache - cachedRdd.unpersist() - - // return - normalizedRdd - } -} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/normalization/ZScoreNormalization.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/normalization/ZScoreNormalization.scala deleted file mode 100644 index a57ac7203b..0000000000 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/normalization/ZScoreNormalization.scala +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.bdgenomics.adam.rdd.normalization - -import org.apache.spark.Logging -import org.apache.spark.rdd.RDD -import scala.math.sqrt - -object ZScoreNormalization extends Serializable with Logging { - - /** - * Normalizes an RDD of double values by computing the Z score for each value. - * Per point, the Z score (also known as standard score) is computed by - * subtracting the mean across all values from the point, and then dividing - * by the standard deviation across all points. - * - * @param rdd RDD of (Double, Value) pairs to be normalized. - * @returns Returns an RDD where the original double value has been replaced - * by the Z score for that point. - * - * @tparam T Type of data passed along. - */ - def apply[T](rdd: RDD[(Double, T)]): RDD[(Double, T)] = { - val cachedRdd = rdd.cache - - // compute mean and standard deviation - val n = cachedRdd.count - val mu = mean(cachedRdd.map(kv => kv._1), n) - val sigma = sqrt(variance(cachedRdd.map(kv => kv._1), n, mu)) - - // update keys - log.info("Normalizing by z-score with µ: " + mu + " and σ: " + sigma) - val update = cachedRdd.map(kv => ((kv._1 - mu) / sigma, kv._2)) - - // unpersist rdd - cachedRdd.unpersist() - - // return - update - } - - /** - * Computes the mean of a set of samples. - * - * @param rdd An RDD of doubles. - * @param n The number of samples in the RDD. - * @return Returns the mean of the RDD of doubles. - */ - private[normalization] def mean(rdd: RDD[Double], n: Long): Double = { - rdd.reduce(_ + _) / n.toDouble - } - - /** - * Computes the variance of a set of samples. - * - * @param rdd An RDD of doubles. - * @param n The number of samples in the RDD. - * @param mu The mean of all the samples in the RDD. - * @return Returns the mean of the RDD of doubles. - */ - private[normalization] def variance(rdd: RDD[Double], n: Long, mu: Double): Double = { - rdd.map(d => (d - mu) * (d - mu)).reduce(_ + _) / n.toDouble - } -} diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/normalization/LengthNormalizationSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/normalization/LengthNormalizationSuite.scala deleted file mode 100644 index e186dde5a3..0000000000 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/normalization/LengthNormalizationSuite.scala +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.bdgenomics.adam.rdd.normalization - -import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.models.ReferenceRegion -import org.bdgenomics.adam.util.ADAMFunSuite -import scala.math.{ abs, sqrt } - -class LengthNormalizationSuite extends ADAMFunSuite { - def fpEquals(n1: Double, n2: Double, epsilon: Double = 1e-6): Boolean = { - abs(n1 - n2) < epsilon - } - - sparkTest("normalize a single targeted region") { - val rdd = sc.parallelize(Seq(((1000.0, ReferenceRegion("chr1", 0L, 1001L)), 1))) - - LengthNormalization(rdd) - .map(t => t._1._1) - .collect() - .foreach(fpEquals(_, 1.0)) - } - - sparkTest("normalize a set of targeted regions") { - val rddVals = sc.parallelize(Seq(1.0, 5.0, 3.0, 4.0, 2.0)) - - val rdd = rddVals.zip(sc.parallelize(Seq(1000.0, 500.0, 3215.0, 10000.0, 55000.0))) - .map(kv => ((kv._1 * kv._2, ReferenceRegion("", 0L, kv._2.toLong + 1)), 1)) - - LengthNormalization(rdd) - .map(t => t._1._1) - .zip(rddVals) - .collect() - .foreach(p => fpEquals(p._1, p._2)) - } - - sparkTest("calculate *pkm type normalization for a set of targeted regions") { - val rddVals = sc.parallelize(Seq(1.0, 5.0, 3.0, 4.0, 2.0)) - - val rdd = rddVals.map(_ * 100000.0) - .zip(sc.parallelize(Seq(1000.0, 500.0, 3215.0, 10000.0, 55000.0))) - .map(kv => ((kv._1 * kv._2, ReferenceRegion("", 0L, kv._2.toLong + 1)), 1)) - - LengthNormalization.pkn(rdd) - .map(t => t._1._1) - .zip(rddVals) - .collect() - .foreach(p => fpEquals(p._1, p._2)) - } -} diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/normalization/ZScoreNormalizationSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/normalization/ZScoreNormalizationSuite.scala deleted file mode 100644 index 66b3b7d8cc..0000000000 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/normalization/ZScoreNormalizationSuite.scala +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.bdgenomics.adam.rdd.normalization - -import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.util.ADAMFunSuite -import scala.math.{ abs, sqrt } - -class ZScoreNormalizationSuite extends ADAMFunSuite { - def fpEquals(n1: Double, n2: Double, epsilon: Double = 1e-6): Boolean = { - abs(n1 - n2) < epsilon - } - - sparkTest("compute mean of a set of samples") { - val rdd = sc.parallelize(Seq(3.0, 4.0, 5.0, 4.0, 5.0, 3.0, 2.0, 6.0)) - - assert(fpEquals(4.0, ZScoreNormalization.mean(rdd, rdd.count))) - } - - sparkTest("compute variance of a set of samples") { - val rdd = sc.parallelize(Seq(3.0, 4.0, 5.0, 4.0, 5.0, 3.0, 2.0, 6.0)) - - val expected = (4.0 * 1.0 + 2.0 * 4.0) / 8.0 - - assert(fpEquals(expected, ZScoreNormalization.variance(rdd, rdd.count, 4.0))) - } - - sparkTest("variance should be 0 if all elements are the same") { - val rdd = sc.parallelize(Seq(3.0, 3.0, 3.0, 3.0, 3.0)) - - assert(fpEquals(0.0, ZScoreNormalization.variance(rdd, rdd.count, - ZScoreNormalization.mean(rdd, rdd.count)))) - } - - sparkTest("check z-score for a varying rdd") { - // this rdd contains a set of values whose square roots are equal to their z-score - // for this rdd, µ = 0.0, σ = 2.0 - val rdd = sc.parallelize(Seq(-2.0, 0.0, 0.0, 2.0)) - - val r = ZScoreNormalization(rdd.map(v => (v, 1))) - .map(kv => kv._1) - .zip(rdd) - .collect() - - r.foreach(p => { - val p2 = if (p._2 != 0.0) { - sqrt(abs(p._2)) * p._2 / abs(p._2) - } else { - 0.0 // need this, else we try to div by 0 - } - assert(fpEquals(p._1, p2)) - }) - } -}