From 823cfc3d4160be8280c98125c0c3cf2e47641379 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Mon, 16 Mar 2015 13:26:43 -0700 Subject: [PATCH] [ADAM-600] Adding RegionJoin trait. --- .../bdgenomics/adam/cli/CalculateDepth.scala | 2 +- .../adam/rdd/BroadcastRegionJoin.scala | 15 ++++--- .../org/bdgenomics/adam/rdd/RegionJoin.scala | 44 +++++++++++++++++++ .../adam/rdd/ShuffleRegionJoin.scala | 27 ++++++++++-- .../adam/rdd/BroadcastRegionJoinSuite.scala | 7 --- .../adam/rdd/ShuffleRegionJoinSuite.scala | 5 --- 6 files changed, 76 insertions(+), 24 deletions(-) create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rdd/RegionJoin.scala diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CalculateDepth.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CalculateDepth.scala index 9a7feb64e2..ab9bbf922d 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CalculateDepth.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CalculateDepth.scala @@ -90,7 +90,7 @@ class CalculateDepth(protected val args: CalculateDepthArgs) extends ADAMSparkCo val joinedRDD: RDD[(ReferenceRegion, AlignmentRecord)] = if (args.cartesian) BroadcastRegionJoin.cartesianFilter(variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_).get)) - else BroadcastRegionJoin.partitionAndJoin(sc, variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_).get)) + else BroadcastRegionJoin.partitionAndJoin(variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_).get)) val depths: RDD[(ReferenceRegion, Int)] = joinedRDD.map { case (region, record) => (region, 1) }.reduceByKey(_ + _).sortByKey() diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoin.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoin.scala index 23bc27e0a4..e41c4c77c9 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoin.scala +++ 
b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoin.scala @@ -31,7 +31,7 @@ import scala.reflect.ClassTag * Different implementations will have different performance characteristics -- and new implementations * will likely be added in the future, see the notes to each individual method for more details. */ -object BroadcastRegionJoin { +object BroadcastRegionJoin extends RegionJoin { /** * Performs a region join between two RDDs (broadcast join). @@ -62,11 +62,12 @@ object BroadcastRegionJoin { * @return An RDD of pairs (x, y), where x is from baseRDD, y is from joinedRDD, and the region * corresponding to x overlaps the region corresponding to y. */ - def partitionAndJoin[T, U](sc: SparkContext, - baseRDD: RDD[(ReferenceRegion, T)], + def partitionAndJoin[T, U](baseRDD: RDD[(ReferenceRegion, T)], joinedRDD: RDD[(ReferenceRegion, U)])(implicit tManifest: ClassTag[T], uManifest: ClassTag[U]): RDD[(T, U)] = { + val sc = baseRDD.context + /** * Original Join Design: * @@ -165,7 +166,7 @@ object BroadcastRegionJoin { * * @param regions The input-set of regions. */ -class NonoverlappingRegions(regions: Iterable[ReferenceRegion]) extends Serializable { +private[rdd] class NonoverlappingRegions(regions: Iterable[ReferenceRegion]) extends Serializable { assert(regions != null, "regions parameter cannot be null") @@ -299,7 +300,7 @@ class NonoverlappingRegions(regions: Iterable[ReferenceRegion]) extends Serializ "%s:%d-%d (%s)".format(referenceName, endpoints.head, endpoints.last, endpoints.mkString(",")) } -object NonoverlappingRegions { +private[rdd] object NonoverlappingRegions { def apply[T](values: Seq[(ReferenceRegion, T)]) = new NonoverlappingRegions(values.map(_._1)) @@ -322,7 +323,7 @@ object NonoverlappingRegions { * be valid reference names with respect to the sequence * dictionary. 
*/ -class MultiContigNonoverlappingRegions( +private[rdd] class MultiContigNonoverlappingRegions( regions: Seq[(String, Iterable[ReferenceRegion])]) extends Serializable { assert(regions != null, @@ -338,7 +339,7 @@ class MultiContigNonoverlappingRegions( regionMap.get(value._1.referenceName).fold(false)(_.hasRegionsFor(value)) } -object MultiContigNonoverlappingRegions { +private[rdd] object MultiContigNonoverlappingRegions { def apply[T](values: Seq[(ReferenceRegion, T)]): MultiContigNonoverlappingRegions = { new MultiContigNonoverlappingRegions( values.map(kv => (kv._1.referenceName, kv._1)) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/RegionJoin.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/RegionJoin.scala new file mode 100644 index 0000000000..930959a909 --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/RegionJoin.scala @@ -0,0 +1,44 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.bdgenomics.adam.rdd + +import org.bdgenomics.adam.models.{ SequenceDictionary, ReferenceRegion } +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ +import scala.Predef._ +import org.apache.spark.SparkContext +import scala.reflect.ClassTag + +trait RegionJoin { + /** + * Performs a region join between two RDDs. + * + * No SparkContext is passed in; implementations obtain it from the input RDDs. + * @param baseRDD The 'left' side of the join + * @param joinedRDD The 'right' side of the join + * @param tManifest implicit type of baseRDD + * @param uManifest implicit type of joinedRDD + * @tparam T type of baseRDD + * @tparam U type of joinedRDD + * @return An RDD of pairs (x, y), where x is from baseRDD, y is from joinedRDD, and the region + * corresponding to x overlaps the region corresponding to y. + */ + def partitionAndJoin[T, U](baseRDD: RDD[(ReferenceRegion, T)], + joinedRDD: RDD[(ReferenceRegion, U)])(implicit tManifest: ClassTag[T], + uManifest: ClassTag[U]): RDD[(T, U)] +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoin.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoin.scala index 35c4a8fe2c..f7ca6a3688 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoin.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoin.scala @@ -21,12 +21,30 @@ package org.bdgenomics.adam.rdd import org.apache.spark.{ Logging, Partitioner, SparkContext } import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.models.{ SequenceDictionary, ReferenceRegion } +import org.bdgenomics.adam.models.{ SequenceDictionary, SequenceRecord, ReferenceRegion } +import org.bdgenomics.adam.rdd.ADAMContext._ import scala.collection.mutable.ListBuffer import scala.math._ import scala.reflect.ClassTag -object ShuffleRegionJoin { +object ShuffleRegionJoin extends RegionJoin { + + private var sd = new
SequenceDictionary(Vector[SequenceRecord]()) + + def setSequenceDictionary(_sd: SequenceDictionary) { + sd = _sd + } + + def partitionAndJoin[T, U](leftRDD: RDD[(ReferenceRegion, T)], + rightRDD: RDD[(ReferenceRegion, U)])(implicit tManifest: ClassTag[T], + uManifest: ClassTag[U]): RDD[(T, U)] = { + + // bin size = summed record lengths of sd / the larger input partition count; NOTE(review): looks like this is 0 until setSequenceDictionary supplies a non-empty dictionary - confirm callers do so first + val maxPartitions = max(leftRDD.partitions.length.toLong, rightRDD.partitions.length.toLong) + val partitionSize = sd.records.map(_.length).sum / maxPartitions + + partitionAndJoin(leftRDD, rightRDD, sd, partitionSize) + } /** * Performs a region join between two RDDs (shuffle join). @@ -51,12 +69,13 @@ object ShuffleRegionJoin { * @return An RDD of pairs (x, y), where x is from leftRDD, y is from rightRDD, and the region * corresponding to x overlaps the region corresponding to y. */ - def partitionAndJoin[T, U](sc: SparkContext, - leftRDD: RDD[(ReferenceRegion, T)], + def partitionAndJoin[T, U](leftRDD: RDD[(ReferenceRegion, T)], rightRDD: RDD[(ReferenceRegion, U)], seqDict: SequenceDictionary, partitionSize: Long)(implicit tManifest: ClassTag[T], uManifest: ClassTag[U]): RDD[(T, U)] = { + val sc = leftRDD.context + // Create the set of bins across the genome for parallel processing val seqLengths = Map(seqDict.records.toSeq.map(rec => (rec.name.toString, rec.length)): _*) val bins = sc.broadcast(GenomeBins(partitionSize, seqLengths)) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoinSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoinSuite.scala index 3ce6d6af3a..f1cc3a11ab 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoinSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/BroadcastRegionJoinSuite.scala @@ -133,14 +133,12 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite { BroadcastRegionJoinSuite.getReferenceRegion(record2)) assert(BroadcastRegionJoin.partitionAndJoin[AlignmentRecord,
AlignmentRecord]( - sc, rdd1, rdd2).aggregate(true)( BroadcastRegionJoinSuite.merge, BroadcastRegionJoinSuite.and)) assert(BroadcastRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord]( - sc, rdd1, rdd2) .aggregate(0)( @@ -171,7 +169,6 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite { val recordsRdd = sc.parallelize(Seq(record1, record2)).keyBy(ReferenceRegion(_).get) assert(BroadcastRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord]( - sc, baseRdd, recordsRdd) .aggregate(true)( @@ -179,7 +176,6 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite { BroadcastRegionJoinSuite.and)) assert(BroadcastRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord]( - sc, baseRdd, recordsRdd).count() === 2) } @@ -222,7 +218,6 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite { val recordsRdd = sc.parallelize(Seq(record1, record2, record3)).keyBy(ReferenceRegion(_).get) assert(BroadcastRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord]( - sc, baseRdd, recordsRdd) .aggregate(true)( @@ -230,7 +225,6 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite { BroadcastRegionJoinSuite.and)) assert(BroadcastRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord]( - sc, baseRdd, recordsRdd).count() === 3) } @@ -277,7 +271,6 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite { recordsRdd) .leftOuterJoin( BroadcastRegionJoin.partitionAndJoin( - sc, baseRdd, recordsRdd)) .filter({ diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoinSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoinSuite.scala index 0c8d5aa70b..1282c94205 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoinSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ShuffleRegionJoinSuite.scala @@ -56,7 +56,6 @@ class ShuffleRegionJoinSuite extends ADAMFunSuite { val recordsRdd = sc.parallelize(Seq(record1, record2)).keyBy(ReferenceRegion(_).get) 
assert(ShuffleRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord]( - sc, baseRdd, recordsRdd, seqDict, @@ -66,7 +65,6 @@ class ShuffleRegionJoinSuite extends ADAMFunSuite { ShuffleRegionJoinSuite.and)) assert(ShuffleRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord]( - sc, baseRdd, recordsRdd, seqDict, @@ -111,7 +109,6 @@ class ShuffleRegionJoinSuite extends ADAMFunSuite { val recordsRdd = sc.parallelize(Seq(record1, record2, record3)).keyBy(ReferenceRegion(_).get) assert(ShuffleRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord]( - sc, baseRdd, recordsRdd, seqDict, @@ -121,7 +118,6 @@ class ShuffleRegionJoinSuite extends ADAMFunSuite { ShuffleRegionJoinSuite.and)) assert(ShuffleRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord]( - sc, baseRdd, recordsRdd, seqDict, @@ -170,7 +166,6 @@ class ShuffleRegionJoinSuite extends ADAMFunSuite { recordsRdd) .leftOuterJoin( ShuffleRegionJoin.partitionAndJoin( - sc, baseRdd, recordsRdd, seqDict,