Skip to content

Commit

Permalink
[ADAM-600] Adding RegionJoin trait.
Browse files Browse the repository at this point in the history
  • Loading branch information
fnothaft committed Mar 16, 2015
1 parent 13d12b1 commit 823cfc3
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class CalculateDepth(protected val args: CalculateDepthArgs) extends ADAMSparkCo

val joinedRDD: RDD[(ReferenceRegion, AlignmentRecord)] =
if (args.cartesian) BroadcastRegionJoin.cartesianFilter(variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_).get))
else BroadcastRegionJoin.partitionAndJoin(sc, variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_).get))
else BroadcastRegionJoin.partitionAndJoin(variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_).get))

val depths: RDD[(ReferenceRegion, Int)] =
joinedRDD.map { case (region, record) => (region, 1) }.reduceByKey(_ + _).sortByKey()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import scala.reflect.ClassTag
* Different implementations will have different performance characteristics -- and new implementations
* will likely be added in the future, see the notes to each individual method for more details.
*/
object BroadcastRegionJoin {
object BroadcastRegionJoin extends RegionJoin {

/**
* Performs a region join between two RDDs (broadcast join).
Expand Down Expand Up @@ -62,11 +62,12 @@ object BroadcastRegionJoin {
* @return An RDD of pairs (x, y), where x is from baseRDD, y is from joinedRDD, and the region
* corresponding to x overlaps the region corresponding to y.
*/
def partitionAndJoin[T, U](sc: SparkContext,
baseRDD: RDD[(ReferenceRegion, T)],
def partitionAndJoin[T, U](baseRDD: RDD[(ReferenceRegion, T)],
joinedRDD: RDD[(ReferenceRegion, U)])(implicit tManifest: ClassTag[T],
uManifest: ClassTag[U]): RDD[(T, U)] = {

val sc = baseRDD.context

/**
* Original Join Design:
*
Expand Down Expand Up @@ -165,7 +166,7 @@ object BroadcastRegionJoin {
*
* @param regions The input-set of regions.
*/
class NonoverlappingRegions(regions: Iterable[ReferenceRegion]) extends Serializable {
private[rdd] class NonoverlappingRegions(regions: Iterable[ReferenceRegion]) extends Serializable {

assert(regions != null, "regions parameter cannot be null")

Expand Down Expand Up @@ -299,7 +300,7 @@ class NonoverlappingRegions(regions: Iterable[ReferenceRegion]) extends Serializ
"%s:%d-%d (%s)".format(referenceName, endpoints.head, endpoints.last, endpoints.mkString(","))
}

object NonoverlappingRegions {
private[rdd] object NonoverlappingRegions {

def apply[T](values: Seq[(ReferenceRegion, T)]) =
new NonoverlappingRegions(values.map(_._1))
Expand All @@ -322,7 +323,7 @@ object NonoverlappingRegions {
* be valid reference names with respect to the sequence
* dictionary.
*/
class MultiContigNonoverlappingRegions(
private[rdd] class MultiContigNonoverlappingRegions(
regions: Seq[(String, Iterable[ReferenceRegion])]) extends Serializable {

assert(regions != null,
Expand All @@ -338,7 +339,7 @@ class MultiContigNonoverlappingRegions(
regionMap.get(value._1.referenceName).fold(false)(_.hasRegionsFor(value))
}

object MultiContigNonoverlappingRegions {
private[rdd] object MultiContigNonoverlappingRegions {
def apply[T](values: Seq[(ReferenceRegion, T)]): MultiContigNonoverlappingRegions = {
new MultiContigNonoverlappingRegions(
values.map(kv => (kv._1.referenceName, kv._1))
Expand Down
44 changes: 44 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/RegionJoin.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd

import org.bdgenomics.adam.models.{ SequenceDictionary, ReferenceRegion }
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import scala.Predef._
import org.apache.spark.SparkContext
import scala.reflect.ClassTag

trait RegionJoin {
/**
* Performs a region join between two RDDs.
*
* @param sc A SparkContext for the cluster that will perform the join
* @param baseRDD The 'left' side of the join
* @param joinedRDD The 'right' side of the join
* @param tManifest implicit type of baseRDD
* @param uManifest implicit type of joinedRDD
* @tparam T type of baseRDD
* @tparam U type of joinedRDD
* @return An RDD of pairs (x, y), where x is from baseRDD, y is from joinedRDD, and the region
* corresponding to x overlaps the region corresponding to y.
*/
def partitionAndJoin[T, U](baseRDD: RDD[(ReferenceRegion, T)],
joinedRDD: RDD[(ReferenceRegion, U)])(implicit tManifest: ClassTag[T],
uManifest: ClassTag[U]): RDD[(T, U)]
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,30 @@ package org.bdgenomics.adam.rdd
import org.apache.spark.{ Logging, Partitioner, SparkContext }
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.{ SequenceDictionary, ReferenceRegion }
import org.bdgenomics.adam.models.{ SequenceDictionary, SequenceRecord, ReferenceRegion }
import org.bdgenomics.adam.rdd.ADAMContext._
import scala.collection.mutable.ListBuffer
import scala.math._
import scala.reflect.ClassTag

object ShuffleRegionJoin {
object ShuffleRegionJoin extends RegionJoin {

private var sd = new SequenceDictionary(Vector[SequenceRecord]())

def setSequenceDictionary(_sd: SequenceDictionary) {
sd = _sd
}

def partitionAndJoin[T, U](leftRDD: RDD[(ReferenceRegion, T)],
rightRDD: RDD[(ReferenceRegion, U)])(implicit tManifest: ClassTag[T],
uManifest: ClassTag[U]): RDD[(T, U)] = {

// we will compute these parameters from the datasets we've got
val maxPartitions = max(leftRDD.partitions.length.toLong, rightRDD.partitions.length.toLong)
val partitionSize = sd.records.map(_.length).sum / maxPartitions

partitionAndJoin(leftRDD, rightRDD, sd, partitionSize)
}

/**
* Performs a region join between two RDDs (shuffle join).
Expand All @@ -51,12 +69,13 @@ object ShuffleRegionJoin {
* @return An RDD of pairs (x, y), where x is from leftRDD, y is from rightRDD, and the region
* corresponding to x overlaps the region corresponding to y.
*/
def partitionAndJoin[T, U](sc: SparkContext,
leftRDD: RDD[(ReferenceRegion, T)],
def partitionAndJoin[T, U](leftRDD: RDD[(ReferenceRegion, T)],
rightRDD: RDD[(ReferenceRegion, U)],
seqDict: SequenceDictionary,
partitionSize: Long)(implicit tManifest: ClassTag[T],
uManifest: ClassTag[U]): RDD[(T, U)] = {
val sc = leftRDD.context

// Create the set of bins across the genome for parallel processing
val seqLengths = Map(seqDict.records.toSeq.map(rec => (rec.name.toString, rec.length)): _*)
val bins = sc.broadcast(GenomeBins(partitionSize, seqLengths))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,12 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite {
BroadcastRegionJoinSuite.getReferenceRegion(record2))

assert(BroadcastRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord](
sc,
rdd1,
rdd2).aggregate(true)(
BroadcastRegionJoinSuite.merge,
BroadcastRegionJoinSuite.and))

assert(BroadcastRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord](
sc,
rdd1,
rdd2)
.aggregate(0)(
Expand Down Expand Up @@ -171,15 +169,13 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite {
val recordsRdd = sc.parallelize(Seq(record1, record2)).keyBy(ReferenceRegion(_).get)

assert(BroadcastRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord](
sc,
baseRdd,
recordsRdd)
.aggregate(true)(
BroadcastRegionJoinSuite.merge,
BroadcastRegionJoinSuite.and))

assert(BroadcastRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord](
sc,
baseRdd,
recordsRdd).count() === 2)
}
Expand Down Expand Up @@ -222,15 +218,13 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite {
val recordsRdd = sc.parallelize(Seq(record1, record2, record3)).keyBy(ReferenceRegion(_).get)

assert(BroadcastRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord](
sc,
baseRdd,
recordsRdd)
.aggregate(true)(
BroadcastRegionJoinSuite.merge,
BroadcastRegionJoinSuite.and))

assert(BroadcastRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord](
sc,
baseRdd,
recordsRdd).count() === 3)
}
Expand Down Expand Up @@ -277,7 +271,6 @@ class BroadcastRegionJoinSuite extends ADAMFunSuite {
recordsRdd)
.leftOuterJoin(
BroadcastRegionJoin.partitionAndJoin(
sc,
baseRdd,
recordsRdd))
.filter({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class ShuffleRegionJoinSuite extends ADAMFunSuite {
val recordsRdd = sc.parallelize(Seq(record1, record2)).keyBy(ReferenceRegion(_).get)

assert(ShuffleRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord](
sc,
baseRdd,
recordsRdd,
seqDict,
Expand All @@ -66,7 +65,6 @@ class ShuffleRegionJoinSuite extends ADAMFunSuite {
ShuffleRegionJoinSuite.and))

assert(ShuffleRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord](
sc,
baseRdd,
recordsRdd,
seqDict,
Expand Down Expand Up @@ -111,7 +109,6 @@ class ShuffleRegionJoinSuite extends ADAMFunSuite {
val recordsRdd = sc.parallelize(Seq(record1, record2, record3)).keyBy(ReferenceRegion(_).get)

assert(ShuffleRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord](
sc,
baseRdd,
recordsRdd,
seqDict,
Expand All @@ -121,7 +118,6 @@ class ShuffleRegionJoinSuite extends ADAMFunSuite {
ShuffleRegionJoinSuite.and))

assert(ShuffleRegionJoin.partitionAndJoin[AlignmentRecord, AlignmentRecord](
sc,
baseRdd,
recordsRdd,
seqDict,
Expand Down Expand Up @@ -170,7 +166,6 @@ class ShuffleRegionJoinSuite extends ADAMFunSuite {
recordsRdd)
.leftOuterJoin(
ShuffleRegionJoin.partitionAndJoin(
sc,
baseRdd,
recordsRdd,
seqDict,
Expand Down

0 comments on commit 823cfc3

Please sign in to comment.