Commit 32abf4b

Tests pass.

fnothaft committed Dec 1, 2016
1 parent 63b42b8 commit 32abf4b

Showing 10 changed files with 1,070 additions and 6 deletions.
@@ -452,14 +452,25 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int],
genomicRdd: GenomicRDD[X, Y]): (Long, SequenceDictionary) = {

require(!(sequences.isEmpty && genomicRdd.sequences.isEmpty),
"Both RDDs at input to join have an empty sequence dictionary!")

// what sequences do we wind up with at the end?
val finalSequences = sequences ++ genomicRdd.sequences

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
val estPartitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)

// if the user provides too high of a partition count, the estimated number
// of partitions can go to 0
val partitions = if (estPartitions >= 1) {
estPartitions
} else {
1
}

(finalSequences.records.map(_.length).sum / partitions,
finalSequences)
}
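
For reference, a minimal standalone sketch of the estimate above, with hypothetical names rather than ADAM's API: the partition count defaults to the larger of the two inputs' partition counts and is clamped to at least 1 so the per-partition length estimate never divides by zero.

def estimatePartitionSize(totalLength: Long,
                          optPartitions: Option[Int],
                          leftPartitions: Int,
                          rightPartitions: Int): Long = {
  // default to the wider of the two inputs when no count is given
  val est = optPartitions.getOrElse(math.max(leftPartitions, rightPartitions))
  // clamp to >= 1 so the division below is always defined
  val partitions = math.max(est, 1)
  totalLength / partitions
}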
@@ -240,7 +240,7 @@ case class RightOuterShuffleRegionJoinAndGroupByLeft[T, U](sd: SequenceDictionar

protected def emptyFn(left: Iterator[((ReferenceRegion, Int), T)],
right: Iterator[((ReferenceRegion, Int), U)]): Iterator[(Option[T], Iterable[U])] = {
right.map(v => (None, Iterable(v._2)))
left.map(v => (Some(v._2), Iterable.empty)) ++ right.map(v => (None, Iterable(v._2)))
}
}
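
A toy illustration (hypothetical string data, not the real key types) of what the revised emptyFn yields when no regions overlap at all: each left record survives with an empty group and each right record is emitted under None.

val left = Iterator((("regionA", 0), "featureL"))
val right = Iterator((("regionB", 0), "featureR"))
val out = left.map(v => (Some(v._2), Iterable.empty[String])) ++
  right.map(v => (None, Iterable(v._2)))
// out: (Some("featureL"), Iterable()), (None, Iterable("featureR"))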

@@ -278,8 +278,9 @@ private trait SortedIntervalPartitionJoin[T, U, RT, RU] extends Iterator[(RT, RU
protected def pruneCache(to: Long)

private def getHits(): Unit = {
assert(!hits.hasNext)
// if there is nothing more in left, then I'm done
while (left.hasNext && hits.isEmpty) {
while (left.hasNext) {
// there is more in left...
val nl = left.next
val ((nextLeftRegion, _), nextLeft) = nl
@@ -298,7 +299,11 @@ private trait SortedIntervalPartitionJoin[T, U, RT, RU] extends Iterator[(RT, RU
// be improved by making cache a fancier data structure than just a list
// we filter for things that overlap, where at least one side of the join has a start position
// in this partition
hits = processHits(nextLeft, nextLeftRegion)
//
// also, see note "important: fun times with iterators" in this file, which explains
// that these must apparently be two lines
val newHits = processHits(nextLeft, nextLeftRegion)
hits = hits ++ newHits

assert(prevLeftRegion == null ||
(prevLeftRegion.referenceName == nextLeftRegion.referenceName &&
@@ -318,6 +323,11 @@ private trait SortedIntervalPartitionJoin[T, U, RT, RU] extends Iterator[(RT, RU
if (hits.isEmpty) {
getHits()
}
// if that fails, try advancing and pruning the cache
if (hits.isEmpty) {
advanceCache(binRegion.end)
pruneCache(binRegion.end)
}
// if hits is still empty, I must really be at the end
hits.hasNext
}
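
The hasNext logic above follows a refill-then-flush pattern. A generic sketch of that pattern, detached from ADAM's types and method names (all names below are illustrative):

abstract class RefillingIterator[A] extends Iterator[A] {
  protected var buffered: Iterator[A] = Iterator.empty
  // primary way of producing new elements
  protected def refill(): Unit
  // last-chance drain, e.g. advancing and pruning a cache at a bin boundary
  protected def finalFlush(): Unit

  override def hasNext: Boolean = {
    if (!buffered.hasNext) refill()
    if (!buffered.hasNext) finalFlush()
    buffered.hasNext
  }

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("iterator exhausted")
    buffered.next()
  }
}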
@@ -377,7 +387,11 @@ private case class SortedIntervalPartitionJoinAndGroupByLeft[T, U](

protected def postProcessHits(iter: Iterator[(T, U)],
currentLeft: T): Iterator[(T, Iterable[U])] = {
Iterator((currentLeft, iter.map(_._2).toIterable))
if (iter.hasNext) {
Iterator((currentLeft, iter.map(_._2).toIterable))
} else {
Iterator.empty
}
}
}

@@ -425,8 +439,40 @@ private trait SortedIntervalPartitionJoinWithVictims[T, U, RT, RU] extends Sorte
}

protected def pruneCache(to: Long) {

cache = cache.dropWhile(_._1.end <= to)
hits = hits ++ (victimCache.takeWhile(_._1.end <= to).map(u => postProcessPruned(u._2)))
cache = cache ++ victimCache.takeWhile(_._1.end > to)
victimCache = victimCache.dropWhile(_._1.end > to)

// important: fun times with iterators
//
// for reasons known only to God, if you combine these two lines down to a
// single line, it causes the hits iterator to be invalidated and become
// empty.
//
// MORE: it seems like there's some funniness with the underlying scala imp'l
// of append on two iterators. if the second line is written as:
//
// hits = hits ++ pped
//
// the line works as expected on scala 2.10. on scala 2.11, it occasionally
// fails. oddly enough, if you write the line above and then do a duplicate
// on the hits iterator (which you then reassign to hits), it works. i.e.,
//
// hits = hits ++ pped
// val (d, _) = hits.duplicate
// hits = d
//
// works on both scala 2.10 and 2.11 across all unit tests
//
// rewriting it as (pped ++ hits).toIterator seems to work all the time.
// that appends the hits iterator to a ListBuffer, and then returns an iterator
// over the list buffer. essentially, i think there's a bug in the Iterator.++
// method in scala that occasionally causes it to return an empty iterator, but
// i'm not sure why that is
val pped = (victimCache.takeWhile(_._1.end <= to).map(u => postProcessPruned(u._2)))
hits = (pped ++ hits).toIterator

victimCache = victimCache.dropWhile(_._1.end <= to)
}
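
A minimal standalone sketch (illustrative names, not ADAM's code) of the eager materialization the comment above describes: copy both iterators into a buffer and hand back a fresh iterator over it, instead of relying on a lazily chained Iterator.++ that may later appear empty.

import scala.collection.mutable.ListBuffer

def appendEagerly[A](newHits: Iterator[A], oldHits: Iterator[A]): Iterator[A] = {
  // materialize both sides once, in order, then expose a fresh iterator
  val buf = new ListBuffer[A]
  buf ++= newHits
  buf ++= oldHits
  buf.toIterator
}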

@@ -142,6 +142,7 @@ class ADAMKryoRegistrator extends KryoRegistrator {

// org.bdgenomics.adam.rdd
kryo.register(classOf[org.bdgenomics.adam.rdd.GenomeBins])
kryo.register(Class.forName("org.bdgenomics.adam.rdd.SortedIntervalPartitionJoinAndGroupByLeft$$anonfun$postProcessHits$1"))

// org.bdgenomics.adam.rdd.read
kryo.register(classOf[org.bdgenomics.adam.rdd.read.FlagStatMetrics])
@@ -256,15 +257,23 @@ class ADAMKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[scala.Array[String]])
kryo.register(Class.forName("scala.Tuple2$mcCC$sp"))

// scala.collection
kryo.register(Class.forName("scala.collection.Iterator$$anon$11"))
kryo.register(Class.forName("scala.collection.Iterator$$anonfun$toStream$1"))

// scala.collection.convert
kryo.register(Class.forName("scala.collection.convert.Wrappers$"))

// scala.collection.immutable
kryo.register(classOf[scala.collection.immutable.::[_]])
kryo.register(classOf[scala.collection.immutable.Range])
kryo.register(Class.forName("scala.collection.immutable.Stream$Cons"))
kryo.register(Class.forName("scala.collection.immutable.Stream$Empty$"))

// scala.collection.mutable
kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
kryo.register(classOf[scala.collection.mutable.ListBuffer[_]])
kryo.register(Class.forName("scala.collection.mutable.ListBuffer$$anon$1"))
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofInt])
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofLong])
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofByte])
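
These by-name registrations matter when Kryo registration is required. A sketch of the Spark configuration that exercises them; the registrator's package path below is assumed, not taken from this diff.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // fail fast on any class that is serialized without being registered
  .set("spark.kryo.registrationRequired", "true")
  // assumed package path for the registrator shown in this diff
  .set("spark.kryo.registrator", "org.bdgenomics.adam.serialization.ADAMKryoRegistrator")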
4 changes: 4 additions & 0 deletions adam-core/src/test/resources/small.1.bed
@@ -0,0 +1,4 @@
1 143 26423
1 14397230 26472788
1 169801934 169801939
1 240997788 240997796
20 changes: 20 additions & 0 deletions adam-core/src/test/resources/small.1.narrowPeak
@@ -0,0 +1,20 @@
1 26472784 26472859 simread:1:26472783:false 0 + 0 -1 -1 -1
1 240997788 240997863 simread:1:240997787:true 0 + 0 -1 -1 -1
1 189606654 189606729 simread:1:189606653:true 0 + 0 -1 -1 -1
1 207027739 207027814 simread:1:207027738:true 0 + 0 -1 -1 -1
1 14397234 14397309 simread:1:14397233:false 0 + 0 -1 -1 -1
1 240344443 240344518 simread:1:240344442:true 0 + 0 -1 -1 -1
1 153978725 153978800 simread:1:153978724:false 0 + 0 -1 -1 -1
1 237728410 237728485 simread:1:237728409:true 0 + 0 -1 -1 -1
1 231911907 231911982 simread:1:231911906:false 0 + 0 -1 -1 -1
1 50683372 50683447 simread:1:50683371:false 0 + 0 -1 -1 -1
1 37577446 37577521 simread:1:37577445:false 0 + 0 -1 -1 -1
1 195211966 195212041 simread:1:195211965:false 0 + 0 -1 -1 -1
1 163841414 163841489 simread:1:163841413:false 0 + 0 -1 -1 -1
1 101556379 101556454 simread:1:101556378:false 0 + 0 -1 -1 -1
1 20101801 20101876 simread:1:20101800:true 0 + 0 -1 -1 -1
1 186794284 186794359 simread:1:186794283:true 0 + 0 -1 -1 -1
1 165341383 165341458 simread:1:165341382:true 0 + 0 -1 -1 -1
1 5469107 5469182 simread:1:5469106:true 0 + 0 -1 -1 -1
1 89554253 89554328 simread:1:89554252:false 0 + 0 -1 -1 -1
1 169801934 169802009 simread:1:169801933:true 0 + 0 -1 -1 -1
@@ -538,5 +538,199 @@ class FeatureRDDSuite extends ADAMFunSuite with TypeCheckedTripleEquals {

assert(coverage.rdd.count == 19)
}

sparkTest("use broadcast join to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
val targets = sc.loadFeatures(targetsPath)

val jRdd = features.broadcastRegionJoin(targets)

assert(jRdd.rdd.count === 5L)
}

sparkTest("use right outer broadcast join to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
val targets = sc.loadFeatures(targetsPath)

val jRdd = features.rightOuterBroadcastRegionJoin(targets)

val c = jRdd.rdd.collect
assert(c.count(_._1.isEmpty) === 1)
assert(c.count(_._1.isDefined) === 5)
}

def sd = {
sc.loadBam(testFile("small.1.sam"))
.sequences
}

sparkTest("use shuffle join to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
.transform(_.repartition(1))
.copy(sequences = sd)
val targets = sc.loadFeatures(targetsPath)
.transform(_.repartition(1))

val jRdd = features.shuffleRegionJoin(targets)
val jRdd0 = features.shuffleRegionJoin(targets, optPartitions = Some(4))

// we can't guarantee that we get exactly the number of partitions requested,
// we get close though
assert(jRdd.rdd.partitions.length === 1)
assert(jRdd0.rdd.partitions.length === 5)

assert(jRdd.rdd.count === 5L)
assert(jRdd0.rdd.count === 5L)
}

sparkTest("use right outer shuffle join to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
.transform(_.repartition(1))
.copy(sequences = sd)
val targets = sc.loadFeatures(targetsPath)
.transform(_.repartition(1))

val jRdd = features.rightOuterShuffleRegionJoin(targets)
val jRdd0 = features.rightOuterShuffleRegionJoin(targets, optPartitions = Some(4))

// we can't guarantee that we get exactly the number of partitions requested,
// we get close though
assert(jRdd.rdd.partitions.length === 1)
assert(jRdd0.rdd.partitions.length === 5)

val c = jRdd.rdd.collect
val c0 = jRdd0.rdd.collect
assert(c.count(_._1.isEmpty) === 1)
assert(c0.count(_._1.isEmpty) === 1)
assert(c.count(_._1.isDefined) === 5)
assert(c0.count(_._1.isDefined) === 5)
}

sparkTest("use left outer shuffle join to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
.transform(_.repartition(1))
.copy(sequences = sd)
val targets = sc.loadFeatures(targetsPath)
.transform(_.repartition(1))

val jRdd = features.leftOuterShuffleRegionJoin(targets)
val jRdd0 = features.leftOuterShuffleRegionJoin(targets, optPartitions = Some(4))

// we can't guarantee that we get exactly the number of partitions requested,
// we get close though
assert(jRdd.rdd.partitions.length === 1)
assert(jRdd0.rdd.partitions.length === 5)

val c = jRdd.rdd.collect
val c0 = jRdd0.rdd.collect
assert(c.count(_._2.isEmpty) === 15)
assert(c0.count(_._2.isEmpty) === 15)
assert(c.count(_._2.isDefined) === 5)
assert(c0.count(_._2.isDefined) === 5)
}

sparkTest("use full outer shuffle join to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
.transform(_.repartition(1))
.copy(sequences = sd)
val targets = sc.loadFeatures(targetsPath)
.transform(_.repartition(1))

val jRdd = features.fullOuterShuffleRegionJoin(targets)
val jRdd0 = features.fullOuterShuffleRegionJoin(targets, optPartitions = Some(4))

// we can't guarantee that we get exactly the number of partitions requested,
// we get close though
assert(jRdd.rdd.partitions.length === 1)
assert(jRdd0.rdd.partitions.length === 5)

val c = jRdd.rdd.collect
val c0 = jRdd0.rdd.collect
assert(c.count(t => t._1.isEmpty && t._2.isEmpty) === 0)
assert(c0.count(t => t._1.isEmpty && t._2.isEmpty) === 0)
assert(c.count(t => t._1.isDefined && t._2.isEmpty) === 15)
assert(c0.count(t => t._1.isDefined && t._2.isEmpty) === 15)
assert(c.count(t => t._1.isEmpty && t._2.isDefined) === 1)
assert(c0.count(t => t._1.isEmpty && t._2.isDefined) === 1)
assert(c.count(t => t._1.isDefined && t._2.isDefined) === 5)
assert(c0.count(t => t._1.isDefined && t._2.isDefined) === 5)
}

sparkTest("use shuffle join with group by to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
.transform(_.repartition(1))
.copy(sequences = sd)
val targets = sc.loadFeatures(targetsPath)
.transform(_.repartition(1))

val jRdd = features.shuffleRegionJoinAndGroupByLeft(targets)
val jRdd0 = features.shuffleRegionJoinAndGroupByLeft(targets, optPartitions = Some(4))

// we can't guarantee that we get exactly the number of partitions requested,
// we get close though
assert(jRdd.rdd.partitions.length === 1)
assert(jRdd0.rdd.partitions.length === 5)

val c = jRdd.rdd.collect
val c0 = jRdd0.rdd.collect
assert(c.size === 5)
assert(c0.size === 5)
assert(c.forall(_._2.size == 1))
assert(c0.forall(_._2.size == 1))
}

sparkTest("use right outer shuffle join with group by to pull down features mapped to targets") {
val featuresPath = testFile("small.1.narrowPeak")
val targetsPath = testFile("small.1.bed")

val features = sc.loadFeatures(featuresPath)
.transform(_.repartition(1))
.copy(sequences = sd)
val targets = sc.loadFeatures(targetsPath)
.transform(_.repartition(1))

val jRdd = features.rightOuterShuffleRegionJoinAndGroupByLeft(targets)
val jRdd0 = features.rightOuterShuffleRegionJoinAndGroupByLeft(targets, optPartitions = Some(4))

// we can't guarantee that we get exactly the number of partitions requested,
// we get close though
assert(jRdd.rdd.partitions.length === 1)
assert(jRdd0.rdd.partitions.length === 5)

val c = jRdd.rdd.collect
val c0 = jRdd0.rdd.collect

assert(c.count(_._1.isDefined) === 20)
assert(c0.count(_._1.isDefined) === 20)
assert(c.filter(_._1.isDefined).count(_._2.size == 1) === 5)
assert(c0.filter(_._1.isDefined).count(_._2.size == 1) === 5)
assert(c.filter(_._1.isDefined).count(_._2.isEmpty) === 15)
assert(c0.filter(_._1.isDefined).count(_._2.isEmpty) === 15)
assert(c.count(_._1.isEmpty) === 1)
assert(c0.count(_._1.isEmpty) === 1)
assert(c.filter(_._1.isEmpty).forall(_._2.size == 1))
assert(c0.filter(_._1.isEmpty).forall(_._2.size == 1))
}
}
