Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc shuffle join fixes. #1253

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 78 additions & 70 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,9 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
* @return Returns a new genomic RDD containing all pairs of keys that
* overlapped in the genomic coordinate space.
*/
def broadcastRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, X), Z]](genomicRdd: GenomicRDD[X, Y])(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = {
def broadcastRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, X), Z]](
genomicRdd: GenomicRDD[X, Y])(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = {

// key the RDDs and join
GenericGenomicRDD[(T, X)](InnerBroadcastRegionJoin[T, X]().partitionAndJoin(flattenRddByRegions(),
Expand All @@ -423,8 +424,9 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
* overlapped in the genomic coordinate space, and all keys from the
* right RDD that did not overlap a key in the left RDD.
*/
def rightOuterBroadcastRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], X), Z]](genomicRdd: GenomicRDD[X, Y])(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = {
def rightOuterBroadcastRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], X), Z]](
genomicRdd: GenomicRDD[X, Y])(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = {

// key the RDDs and join
GenericGenomicRDD[(Option[T], X)](RightOuterBroadcastRegionJoin[T, X]().partitionAndJoin(flattenRddByRegions(),
Expand All @@ -437,6 +439,42 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
.asInstanceOf[GenomicRDD[(Option[T], X), Z]]
}

/**
 * Computes the partition size and final sequence dictionary for a join.
 *
 * @param optPartitions Optional user-requested number of partitions for the
 *   end of the shuffle.
 * @param genomicRdd The genomic RDD we are joining against.
 * @return Returns a tuple containing the (partition size, final sequence
 *   dictionary after the join).
 */
private[rdd] def joinPartitionSizeAndSequences[X, Y <: GenomicRDD[X, Y]](
  optPartitions: Option[Int],
  genomicRdd: GenomicRDD[X, Y]): (Long, SequenceDictionary) = {

  require(!(sequences.isEmpty && genomicRdd.sequences.isEmpty),
    "Both RDDs at input to join have an empty sequence dictionary!")

  // the dictionary covering the joined output is the union of both sides
  val mergedSequences = sequences ++ genomicRdd.sequences

  // honor an explicit partition count if the caller gave one; otherwise
  // default to the wider of the two input RDDs
  val requestedPartitions = optPartitions.getOrElse(
    math.max(rdd.partitions.length, genomicRdd.rdd.partitions.length))

  // clamp to at least one partition; an overly large user-supplied count
  // can drive the estimate down to zero, which would break the division below
  val partitionCount = math.max(1, requestedPartitions)

  // partition size = total reference length spread evenly over the partitions
  val totalLength = mergedSequences.records.map(_.length).sum
  (totalLength / partitionCount, mergedSequences)
}

/**
* Performs a sort-merge inner join between this RDD and another RDD.
*
Expand All @@ -450,22 +488,17 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
* @return Returns a new genomic RDD containing all pairs of keys that
* overlapped in the genomic coordinate space.
*/
def shuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, X), Z]](genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = {
def shuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, X), Z]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = {

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)

// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(T, X)](
InnerShuffleRegionJoin[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
Expand All @@ -489,22 +522,17 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
* overlapped in the genomic coordinate space, and all keys from the
* right RDD that did not overlap a key in the left RDD.
*/
def rightOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], X), Z]](genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = {

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)
def rightOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], X), Z]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = {

// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(Option[T], X)](
RightOuterShuffleRegionJoin[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
Expand All @@ -531,22 +559,17 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
* overlapped in the genomic coordinate space, and all keys from the
* left RDD that did not overlap a key in the right RDD.
*/
def leftOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, Option[X]), Z]](genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Option[X]), Z] = {

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)
def leftOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, Option[X]), Z]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Option[X]), Z] = {

// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(T, Option[X])](
LeftOuterShuffleRegionJoin[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
Expand All @@ -572,22 +595,17 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
* overlapped in the genomic coordinate space, and values that did not
* overlap will be paired with a `None`.
*/
def fullOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], Option[X]), Z]](genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Option[X]), Z] = {
def fullOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], Option[X]), Z]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Option[X]), Z] = {

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)

// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(Option[T], Option[X])](
FullOuterShuffleRegionJoin[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
Expand All @@ -614,22 +632,17 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
* overlapped in the genomic coordinate space, grouped together by
* the value they overlapped in the left RDD.
*/
def shuffleRegionJoinAndGroupByLeft[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, Iterable[X]), Z]](genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Iterable[X]), Z] = {

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)
def shuffleRegionJoinAndGroupByLeft[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, Iterable[X]), Z]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Iterable[X]), Z] = {

// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(T, Iterable[X])](
InnerShuffleRegionJoinAndGroupByLeft[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
Expand Down Expand Up @@ -658,22 +671,17 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
* the value they overlapped in the left RDD, and all values from the
* right RDD that did not overlap an item in the left RDD.
*/
def rightOuterShuffleRegionJoinAndGroupByLeft[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], Iterable[X]), Z]](genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Iterable[X]), Z] = {
def rightOuterShuffleRegionJoinAndGroupByLeft[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], Iterable[X]), Z]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Iterable[X]), Z] = {

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)

// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(Option[T], Iterable[X])](
RightOuterShuffleRegionJoinAndGroupByLeft[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/
package org.bdgenomics.adam.rdd

import org.apache.spark.SparkContext._
import org.apache.spark.{ Partitioner, SparkContext }
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.ReferenceRegion._
import org.bdgenomics.adam.models.{ SequenceDictionary, ReferenceRegion }
Expand Down Expand Up @@ -240,7 +240,7 @@ case class RightOuterShuffleRegionJoinAndGroupByLeft[T, U](sd: SequenceDictionar

protected def emptyFn(left: Iterator[((ReferenceRegion, Int), T)],
right: Iterator[((ReferenceRegion, Int), U)]): Iterator[(Option[T], Iterable[U])] = {
right.map(v => (None, Iterable(v._2)))
left.map(v => (Some(v._2), Iterable.empty)) ++ right.map(v => (None, Iterable(v._2)))
}
}

Expand Down Expand Up @@ -278,8 +278,9 @@ private trait SortedIntervalPartitionJoin[T, U, RT, RU] extends Iterator[(RT, RU
protected def pruneCache(to: Long)

private def getHits(): Unit = {
assert(!hits.hasNext)
// if there is nothing more in left, then I'm done
while (left.hasNext && hits.isEmpty) {
while (left.hasNext) {
// there is more in left...
val nl = left.next
val ((nextLeftRegion, _), nextLeft) = nl
Expand All @@ -298,7 +299,11 @@ private trait SortedIntervalPartitionJoin[T, U, RT, RU] extends Iterator[(RT, RU
// be improved by making cache a fancier data structure than just a list
// we filter for things that overlap, where at least one side of the join has a start position
// in this partition
hits = processHits(nextLeft, nextLeftRegion)
//
// also, see note "important: fun times with iterators" in this file, which explains
// that these must apparently be two lines
val newHits = processHits(nextLeft, nextLeftRegion)
hits = hits ++ newHits

assert(prevLeftRegion == null ||
(prevLeftRegion.referenceName == nextLeftRegion.referenceName &&
Expand All @@ -318,6 +323,11 @@ private trait SortedIntervalPartitionJoin[T, U, RT, RU] extends Iterator[(RT, RU
if (hits.isEmpty) {
getHits()
}
// if that fails, try advancing and pruning the cache
if (hits.isEmpty) {
advanceCache(binRegion.end)
pruneCache(binRegion.end)
}
// if hits is still empty, I must really be at the end
hits.hasNext
}
Expand Down Expand Up @@ -377,7 +387,11 @@ private case class SortedIntervalPartitionJoinAndGroupByLeft[T, U](

protected def postProcessHits(iter: Iterator[(T, U)],
currentLeft: T): Iterator[(T, Iterable[U])] = {
Iterator((currentLeft, iter.map(_._2).toIterable))
if (iter.hasNext) {
Iterator((currentLeft, iter.map(_._2).toIterable))
} else {
Iterator.empty
}
}
}

Expand Down Expand Up @@ -425,8 +439,40 @@ private trait SortedIntervalPartitionJoinWithVictims[T, U, RT, RU] extends Sorte
}

protected def pruneCache(to: Long) {

cache = cache.dropWhile(_._1.end <= to)
hits = hits ++ (victimCache.takeWhile(_._1.end <= to).map(u => postProcessPruned(u._2)))
cache = cache ++ victimCache.takeWhile(_._1.end > to)
victimCache = victimCache.dropWhile(_._1.end > to)

// important: fun times with iterators
//
// for reasons known only to God, if you combine these two lines down to a
// single line, it causes the hits iterator to be invalidated and become
// empty.
//
// MORE: it seems like there's some funniness with the underlying scala imp'l
// of append on two iterators. if the second line is written as:
//
// hits = hits ++ pped
//
// the line works as expected on scala 2.10. on scala 2.11, it occasionally
// fails. oddly enough, if you write the line above and then do a duplicate
// on the hits iterator (which you then reassign to hits), it works. i.e.,
//
// hits = hits ++ pped
// val (d, _) = hits.duplicate
// hits = d
//
// works on both scala 2.10 and 2.11 across all unit tests
//
// rewriting it as (pped ++ hits).toIterator seems to work all the time.
// that appends the hits iterator to a ListBuffer, and then returns an iterator
// over the list buffer. essentially, i think there's a bug in the Iterator.++
// method in scala that occasionally causes it to return an empty iterator, but
// i'm not sure why that is
val pped = (victimCache.takeWhile(_._1.end <= to).map(u => postProcessPruned(u._2)))
hits = (pped ++ hits).toIterator

victimCache = victimCache.dropWhile(_._1.end <= to)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class ADAMKryoRegistrator extends KryoRegistrator {

// org.bdgenomics.adam.rdd
kryo.register(classOf[org.bdgenomics.adam.rdd.GenomeBins])
kryo.register(Class.forName("org.bdgenomics.adam.rdd.SortedIntervalPartitionJoinAndGroupByLeft$$anonfun$postProcessHits$1"))

// org.bdgenomics.adam.rdd.read
kryo.register(classOf[org.bdgenomics.adam.rdd.read.FlagStatMetrics])
Expand Down Expand Up @@ -256,15 +257,23 @@ class ADAMKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[scala.Array[String]])
kryo.register(Class.forName("scala.Tuple2$mcCC$sp"))

// scala.collection
kryo.register(Class.forName("scala.collection.Iterator$$anon$11"))
kryo.register(Class.forName("scala.collection.Iterator$$anonfun$toStream$1"))

// scala.collection.convert
kryo.register(Class.forName("scala.collection.convert.Wrappers$"))

// scala.collection.immutable
kryo.register(classOf[scala.collection.immutable.::[_]])
kryo.register(classOf[scala.collection.immutable.Range])
kryo.register(Class.forName("scala.collection.immutable.Stream$Cons"))
kryo.register(Class.forName("scala.collection.immutable.Stream$Empty$"))

// scala.collection.mutable
kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
kryo.register(classOf[scala.collection.mutable.ListBuffer[_]])
kryo.register(Class.forName("scala.collection.mutable.ListBuffer$$anon$1"))
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofInt])
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofLong])
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofByte])
Expand Down
4 changes: 4 additions & 0 deletions adam-core/src/test/resources/small.1.bed
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
1 143 26423
1 14397230 26472788
1 169801934 169801939
1 240997788 240997796
Loading