Skip to content

Commit

Permalink
Making TreeRegionJoin consistent with ShuffleRegionJoin
Browse files — browse the repository at this point in the history
  • Loading branch information
devin-petersohn authored and fnothaft committed May 5, 2017
1 parent 36d8e0b commit c77c600
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.reflect.ClassTag
* The broadcast values are stored in a sorted array. It was going to be an
* ensemble of interval trees, but that didn't work out.
*/
trait TreeRegionJoin[T, U] {
trait TreeRegionJoin[T, U, RT, RU] extends RegionJoin[T, U, RT, RU] {

private[rdd] def runJoinAndGroupByRightWithTree(
tree: IntervalArray[ReferenceRegion, T],
Expand Down Expand Up @@ -84,7 +84,7 @@ trait TreeRegionJoin[T, U] {
/**
* Implements an inner region join where the left side of the join is broadcast.
*/
case class InnerTreeRegionJoin[T: ClassTag, U: ClassTag]() extends RegionJoin[T, U, T, U] with TreeRegionJoin[T, U] {
case class InnerTreeRegionJoin[T: ClassTag, U: ClassTag]() extends TreeRegionJoin[T, U, T, U] {

def broadcastAndJoin(tree: IntervalArray[ReferenceRegion, T],
joinedRDD: RDD[(ReferenceRegion, U)]): RDD[(T, U)] = {
Expand Down Expand Up @@ -119,8 +119,7 @@ case class InnerTreeRegionJoin[T: ClassTag, U: ClassTag]() extends RegionJoin[T,
* broadcast.
*/
case class RightOuterTreeRegionJoin[T: ClassTag, U: ClassTag]()
extends RegionJoin[T, U, Option[T], U]
with TreeRegionJoin[T, U] {
extends TreeRegionJoin[T, U, Option[T], U] {

def broadcastAndJoin(tree: IntervalArray[ReferenceRegion, T],
joinedRDD: RDD[(ReferenceRegion, U)]): RDD[(Option[T], U)] = {
Expand Down Expand Up @@ -168,8 +167,7 @@ case class RightOuterTreeRegionJoin[T: ClassTag, U: ClassTag]()
* values on the left grouped by the right value.
*/
case class InnerTreeRegionJoinAndGroupByRight[T: ClassTag, U: ClassTag]()
extends RegionJoin[T, U, Iterable[T], U]
with TreeRegionJoin[T, U] {
extends TreeRegionJoin[T, U, Iterable[T], U] {

def broadcastAndJoin(tree: IntervalArray[ReferenceRegion, T],
joinedRDD: RDD[(ReferenceRegion, U)]): RDD[(Iterable[T], U)] = {
Expand Down Expand Up @@ -203,8 +201,7 @@ case class InnerTreeRegionJoinAndGroupByRight[T: ClassTag, U: ClassTag]()
* collections on the left side of the join are kept.
*/
case class RightOuterTreeRegionJoinAndGroupByRight[T: ClassTag, U: ClassTag]()
extends RegionJoin[T, U, Iterable[T], U]
with TreeRegionJoin[T, U] {
extends TreeRegionJoin[T, U, Iterable[T], U] {

def broadcastAndJoin(tree: IntervalArray[ReferenceRegion, T],
joinedRDD: RDD[(ReferenceRegion, U)]): RDD[(Iterable[T], U)] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ import org.bdgenomics.formats.avro.{ AlignmentRecord, Variant }
import org.bdgenomics.utils.interval.array.IntervalArray
import scala.reflect.ClassTag

private case class ConcreteTreeRegionJoin[T: ClassTag, U]() extends TreeRegionJoin[T, U] {
}

class TreeRegionJoinSuite extends ADAMFunSuite {

sparkTest("run a join between data on a single contig") {
Expand Down Expand Up @@ -62,7 +59,7 @@ class TreeRegionJoinSuite extends ADAMFunSuite {
.build)
})

val joinData = ConcreteTreeRegionJoin().runJoinAndGroupByRightWithTree(tree,
val joinData = InnerTreeRegionJoin().runJoinAndGroupByRightWithTree(tree,
leftRdd)
.map(kv => {
val (k, v) = kv
Expand Down

0 comments on commit c77c600

Please sign in to comment.