Improved test coverage of RegionJoin code.
Added tests to all of the GenomicRDD test suites that run every available
broadcast and shuffle region join implementation across the various
GenomicRDD types. This caught several bugs, which were fixed in this commit.
fnothaft committed Dec 6, 2016
1 parent 63b42b8 commit 85f889a
Showing 10 changed files with 1,070 additions and 6 deletions.
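All of the new test suites follow the same pattern, which is easiest to see in the FeatureRDDSuite diff below: load two small feature sets, attach a sequence dictionary where the shuffle joins need one, and push the pair through each join implementation. A minimal sketch of that pattern, assembled from the tests in this commit (illustrative only, not verbatim from the diff; the paths, partition hint, and expected count of 5 are taken from the tests below):

// Illustrative sketch of the common test shape; runs inside an ADAMFunSuite.
sparkTest("region join smoke test (illustrative)") {
  // BED/narrowPeak loading yields an empty sequence dictionary, and the
  // require(...) added in the first hunk below rejects joins where both
  // inputs have empty dictionaries, so the shuffle join tests borrow one
  // from a BAM
  val sd = sc.loadBam(testFile("small.1.sam")).sequences

  val features = sc.loadFeatures(testFile("small.1.narrowPeak"))
    .transform(_.repartition(1))
    .copy(sequences = sd)
  val targets = sc.loadFeatures(testFile("small.1.bed"))
    .transform(_.repartition(1))

  // broadcast join: no repartitioning or sequence dictionary is required
  assert(features.broadcastRegionJoin(targets).rdd.count === 5L)

  // shuffle join: a partition count hint may be passed, but (as the tests
  // note) the partition count actually achieved is only approximate
  val shuffled = features.shuffleRegionJoin(targets, optPartitions = Some(4))
  assert(shuffled.rdd.count === 5L)
}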
@@ -452,14 +452,25 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int],
genomicRdd: GenomicRDD[X, Y]): (Long, SequenceDictionary) = {

require(!(sequences.isEmpty && genomicRdd.sequences.isEmpty),
"Both RDDs at input to join have an empty sequence dictionary!")

// what sequences do we wind up with at the end?
val finalSequences = sequences ++ genomicRdd.sequences

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
val estPartitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)

// if the user provides too high of a partition count, the estimated number
// of partitions can go to 0
val partitions = if (estPartitions >= 1) {
estPartitions
} else {
1
}

(finalSequences.records.map(_.length).sum / partitions,
finalSequences)
}
@@ -240,7 +240,7 @@ case class RightOuterShuffleRegionJoinAndGroupByLeft[T, U](sd: SequenceDictionar

protected def emptyFn(left: Iterator[((ReferenceRegion, Int), T)],
right: Iterator[((ReferenceRegion, Int), U)]): Iterator[(Option[T], Iterable[U])] = {
right.map(v => (None, Iterable(v._2)))
left.map(v => (Some(v._2), Iterable.empty)) ++ right.map(v => (None, Iterable(v._2)))
}
}

@@ -278,8 +278,9 @@ private trait SortedIntervalPartitionJoin[T, U, RT, RU] extends Iterator[(RT, RU
protected def pruneCache(to: Long)

private def getHits(): Unit = {
assert(!hits.hasNext)
// if there is nothing more in left, then I'm done
while (left.hasNext && hits.isEmpty) {
while (left.hasNext) {
// there is more in left...
val nl = left.next
val ((nextLeftRegion, _), nextLeft) = nl
@@ -298,7 +299,11 @@
// be improved by making cache a fancier data structure than just a list
// we filter for things that overlap, where at least one side of the join has a start position
// in this partition
hits = processHits(nextLeft, nextLeftRegion)
//
// also, see note "important: fun times with iterators" in this file, which explains
// that these must apparently be two lines
val newHits = processHits(nextLeft, nextLeftRegion)
hits = hits ++ newHits

assert(prevLeftRegion == null ||
(prevLeftRegion.referenceName == nextLeftRegion.referenceName &&
@@ -318,6 +323,11 @@
if (hits.isEmpty) {
getHits()
}
// if that fails, try advancing and pruning the cache
if (hits.isEmpty) {
advanceCache(binRegion.end)
pruneCache(binRegion.end)
}
// if hits is still empty, I must really be at the end
hits.hasNext
}
@@ -377,7 +387,11 @@ private case class SortedIntervalPartitionJoinAndGroupByLeft[T, U](

protected def postProcessHits(iter: Iterator[(T, U)],
currentLeft: T): Iterator[(T, Iterable[U])] = {
Iterator((currentLeft, iter.map(_._2).toIterable))
if (iter.hasNext) {
Iterator((currentLeft, iter.map(_._2).toIterable))
} else {
Iterator.empty
}
}
}

@@ -425,8 +439,40 @@ private trait SortedIntervalPartitionJoinWithVictims[T, U, RT, RU] extends Sorte
}

protected def pruneCache(to: Long) {

cache = cache.dropWhile(_._1.end <= to)
hits = hits ++ (victimCache.takeWhile(_._1.end <= to).map(u => postProcessPruned(u._2)))
cache = cache ++ victimCache.takeWhile(_._1.end > to)
victimCache = victimCache.dropWhile(_._1.end > to)

// important: fun times with iterators
//
// for reasons known only to God, if you combine these two lines down to a
// single line, it causes the hits iterator to be invalidated and become
// empty.
//
// MORE: it seems like there's some funniness with the underlying scala imp'l
// of append on two iterators. if the second line is written as:
//
// hits = hits ++ pped
//
// the line works as expected on scala 2.10. on scala 2.11, it occasionally
// fails. oddly enough, if you write the line above and then do a duplicate
// on the hits iterator (which you then reassign to hits), it works. i.e.,
//
// hits = hits ++ pped
// val (d, _) = hits.duplicate
// hits = d
//
// works on both scala 2.10 and 2.11 across all unit tests
//
// rewriting it as (pped ++ hits).toIterator seems to work all the time.
// that appends the hits iterator to a ListBuffer, and then returns an iterator
// over the list buffer. essentially, i think there's a bug in the Iterator.++
// method in scala that occasionally causes it to return an empty iterator, but
// i'm not sure why that is
val pped = (victimCache.takeWhile(_._1.end <= to).map(u => postProcessPruned(u._2)))
hits = (pped ++ hits).toIterator

victimCache = victimCache.dropWhile(_._1.end <= to)
}
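The workaround above boils down to: materialize the freshly pruned hits into a strict collection first, then append the existing iterator and re-wrap, so the combined value is backed by a built collection rather than a lazily chained Iterator.++. A standalone sketch of that pattern in plain Scala (illustrative only, not ADAM code; the names here are made up):

// Minimal sketch of the "materialize before appending" pattern described in
// the comment above.
object IteratorAppendSketch {

  def appendStrict[A](newHits: List[A], existing: Iterator[A]): Iterator[A] = {
    // List ++ Iterator drains `existing` into a new List (built via a
    // ListBuffer), and toIterator hands back a fresh iterator over that List,
    // so nothing downstream holds on to a half-consumed chained iterator
    (newHits ++ existing).toIterator
  }

  def main(args: Array[String]): Unit = {
    var hits: Iterator[Int] = Iterator.empty
    // simulate repeated pruneCache calls, each contributing a batch of hits
    for (batch <- Seq(List(1, 2), List(3), List(4, 5))) {
      hits = appendStrict(batch, hits)
    }
    println(hits.toList) // List(4, 5, 3, 1, 2)
  }
}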

@@ -142,6 +142,7 @@ class ADAMKryoRegistrator extends KryoRegistrator {

// org.bdgenomics.adam.rdd
kryo.register(classOf[org.bdgenomics.adam.rdd.GenomeBins])
kryo.register(Class.forName("org.bdgenomics.adam.rdd.SortedIntervalPartitionJoinAndGroupByLeft$$anonfun$postProcessHits$1"))

// org.bdgenomics.adam.rdd.read
kryo.register(classOf[org.bdgenomics.adam.rdd.read.FlagStatMetrics])
@@ -256,15 +257,23 @@ class ADAMKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[scala.Array[String]])
kryo.register(Class.forName("scala.Tuple2$mcCC$sp"))

// scala.collection
kryo.register(Class.forName("scala.collection.Iterator$$anon$11"))
kryo.register(Class.forName("scala.collection.Iterator$$anonfun$toStream$1"))

// scala.collection.convert
kryo.register(Class.forName("scala.collection.convert.Wrappers$"))

// scala.collection.immutable
kryo.register(classOf[scala.collection.immutable.::[_]])
kryo.register(classOf[scala.collection.immutable.Range])
kryo.register(Class.forName("scala.collection.immutable.Stream$Cons"))
kryo.register(Class.forName("scala.collection.immutable.Stream$Empty$"))

// scala.collection.mutable
kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
kryo.register(classOf[scala.collection.mutable.ListBuffer[_]])
kryo.register(Class.forName("scala.collection.mutable.ListBuffer$$anon$1"))
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofInt])
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofLong])
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofByte])
4 changes: 4 additions & 0 deletions adam-core/src/test/resources/small.1.bed
@@ -0,0 +1,4 @@
1 143 26423
1 14397230 26472788
1 169801934 169801939
1 240997788 240997796
20 changes: 20 additions & 0 deletions adam-core/src/test/resources/small.1.narrowPeak
@@ -0,0 +1,20 @@
1 26472784 26472859 simread:1:26472783:false 0 + 0 -1 -1 -1
1 240997788 240997863 simread:1:240997787:true 0 + 0 -1 -1 -1
1 189606654 189606729 simread:1:189606653:true 0 + 0 -1 -1 -1
1 207027739 207027814 simread:1:207027738:true 0 + 0 -1 -1 -1
1 14397234 14397309 simread:1:14397233:false 0 + 0 -1 -1 -1
1 240344443 240344518 simread:1:240344442:true 0 + 0 -1 -1 -1
1 153978725 153978800 simread:1:153978724:false 0 + 0 -1 -1 -1
1 237728410 237728485 simread:1:237728409:true 0 + 0 -1 -1 -1
1 231911907 231911982 simread:1:231911906:false 0 + 0 -1 -1 -1
1 50683372 50683447 simread:1:50683371:false 0 + 0 -1 -1 -1
1 37577446 37577521 simread:1:37577445:false 0 + 0 -1 -1 -1
1 195211966 195212041 simread:1:195211965:false 0 + 0 -1 -1 -1
1 163841414 163841489 simread:1:163841413:false 0 + 0 -1 -1 -1
1 101556379 101556454 simread:1:101556378:false 0 + 0 -1 -1 -1
1 20101801 20101876 simread:1:20101800:true 0 + 0 -1 -1 -1
1 186794284 186794359 simread:1:186794283:true 0 + 0 -1 -1 -1
1 165341383 165341458 simread:1:165341382:true 0 + 0 -1 -1 -1
1 5469107 5469182 simread:1:5469106:true 0 + 0 -1 -1 -1
1 89554253 89554328 simread:1:89554252:false 0 + 0 -1 -1 -1
1 169801934 169802009 simread:1:169801933:true 0 + 0 -1 -1 -1
@@ -538,5 +538,199 @@ class FeatureRDDSuite extends ADAMFunSuite with TypeCheckedTripleEquals {

assert(coverage.rdd.count == 19)
}

sparkTest("use broadcast join to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
val targets = sc.loadFeatures(targetsPath)

val jRdd = features.broadcastRegionJoin(targets)

assert(jRdd.rdd.count === 5L)
}

sparkTest("use right outer broadcast join to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
val targets = sc.loadFeatures(targetsPath)

val jRdd = features.rightOuterBroadcastRegionJoin(targets)

val c = jRdd.rdd.collect
assert(c.count(_._1.isEmpty) === 1)
assert(c.count(_._1.isDefined) === 5)
}

def sd = {
sc.loadBam(testFile("small.1.sam"))
.sequences
}

sparkTest("use shuffle join to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
.transform(_.repartition(1))
.copy(sequences = sd)
val targets = sc.loadFeatures(targetsPath)
.transform(_.repartition(1))

val jRdd = features.shuffleRegionJoin(targets)
val jRdd0 = features.shuffleRegionJoin(targets, optPartitions = Some(4))

// we can't guarantee that we get exactly the number of partitions requested,
// we get close though
assert(jRdd.rdd.partitions.length === 1)
assert(jRdd0.rdd.partitions.length === 5)

assert(jRdd.rdd.count === 5L)
assert(jRdd0.rdd.count === 5L)
}

sparkTest("use right outer shuffle join to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
.transform(_.repartition(1))
.copy(sequences = sd)
val targets = sc.loadFeatures(targetsPath)
.transform(_.repartition(1))

val jRdd = features.rightOuterShuffleRegionJoin(targets)
val jRdd0 = features.rightOuterShuffleRegionJoin(targets, optPartitions = Some(4))

// we can't guarantee that we get exactly the number of partitions requested,
// we get close though
assert(jRdd.rdd.partitions.length === 1)
assert(jRdd0.rdd.partitions.length === 5)

val c = jRdd.rdd.collect
val c0 = jRdd0.rdd.collect
assert(c.count(_._1.isEmpty) === 1)
assert(c0.count(_._1.isEmpty) === 1)
assert(c.count(_._1.isDefined) === 5)
assert(c0.count(_._1.isDefined) === 5)
}

sparkTest("use left outer shuffle join to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
.transform(_.repartition(1))
.copy(sequences = sd)
val targets = sc.loadFeatures(targetsPath)
.transform(_.repartition(1))

val jRdd = features.leftOuterShuffleRegionJoin(targets)
val jRdd0 = features.leftOuterShuffleRegionJoin(targets, optPartitions = Some(4))

// we can't guarantee that we get exactly the number of partitions requested,
// we get close though
assert(jRdd.rdd.partitions.length === 1)
assert(jRdd0.rdd.partitions.length === 5)

val c = jRdd.rdd.collect
val c0 = jRdd0.rdd.collect
assert(c.count(_._2.isEmpty) === 15)
assert(c0.count(_._2.isEmpty) === 15)
assert(c.count(_._2.isDefined) === 5)
assert(c0.count(_._2.isDefined) === 5)
}

sparkTest("use full outer shuffle join to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
.transform(_.repartition(1))
.copy(sequences = sd)
val targets = sc.loadFeatures(targetsPath)
.transform(_.repartition(1))

val jRdd = features.fullOuterShuffleRegionJoin(targets)
val jRdd0 = features.fullOuterShuffleRegionJoin(targets, optPartitions = Some(4))

// we can't guarantee that we get exactly the number of partitions requested,
// we get close though
assert(jRdd.rdd.partitions.length === 1)
assert(jRdd0.rdd.partitions.length === 5)

val c = jRdd.rdd.collect
val c0 = jRdd0.rdd.collect
assert(c.count(t => t._1.isEmpty && t._2.isEmpty) === 0)
assert(c0.count(t => t._1.isEmpty && t._2.isEmpty) === 0)
assert(c.count(t => t._1.isDefined && t._2.isEmpty) === 15)
assert(c0.count(t => t._1.isDefined && t._2.isEmpty) === 15)
assert(c.count(t => t._1.isEmpty && t._2.isDefined) === 1)
assert(c0.count(t => t._1.isEmpty && t._2.isDefined) === 1)
assert(c.count(t => t._1.isDefined && t._2.isDefined) === 5)
assert(c0.count(t => t._1.isDefined && t._2.isDefined) === 5)
}

sparkTest("use shuffle join with group by to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
.transform(_.repartition(1))
.copy(sequences = sd)
val targets = sc.loadFeatures(targetsPath)
.transform(_.repartition(1))

val jRdd = features.shuffleRegionJoinAndGroupByLeft(targets)
val jRdd0 = features.shuffleRegionJoinAndGroupByLeft(targets, optPartitions = Some(4))

// we can't guarantee that we get exactly the number of partitions requested,
// we get close though
assert(jRdd.rdd.partitions.length === 1)
assert(jRdd0.rdd.partitions.length === 5)

val c = jRdd.rdd.collect
val c0 = jRdd0.rdd.collect
assert(c.size === 5)
assert(c0.size === 5)
assert(c.forall(_._2.size == 1))
assert(c0.forall(_._2.size == 1))
}

sparkTest("use right outer shuffle join with group by to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
.transform(_.repartition(1))
.copy(sequences = sd)
val targets = sc.loadFeatures(targetsPath)
.transform(_.repartition(1))

val jRdd = features.rightOuterShuffleRegionJoinAndGroupByLeft(targets)
val jRdd0 = features.rightOuterShuffleRegionJoinAndGroupByLeft(targets, optPartitions = Some(4))

// we can't guarantee that we get exactly the number of partitions requested,
// we get close though
assert(jRdd.rdd.partitions.length === 1)
assert(jRdd0.rdd.partitions.length === 5)

val c = jRdd.rdd.collect
val c0 = jRdd0.rdd.collect

assert(c.count(_._1.isDefined) === 20)
assert(c0.count(_._1.isDefined) === 20)
assert(c.filter(_._1.isDefined).count(_._2.size == 1) === 5)
assert(c0.filter(_._1.isDefined).count(_._2.size == 1) === 5)
assert(c.filter(_._1.isDefined).count(_._2.isEmpty) === 15)
assert(c0.filter(_._1.isDefined).count(_._2.isEmpty) === 15)
assert(c.count(_._1.isEmpty) === 1)
assert(c0.count(_._1.isEmpty) === 1)
assert(c.filter(_._1.isEmpty).forall(_._2.size == 1))
assert(c0.filter(_._1.isEmpty).forall(_._2.size == 1))
}
}
