Skip to content

Commit

Permalink
[ADAM-1220] Fix optPartitions parameter in shuffle region join hooks …
Browse files Browse the repository at this point in the history
…in GenomicRDD.

Resolves #1220. Adds a function called in each of the shuffle join implementations
that calculates the sequence dictionary after the join, as well as the partition sizes
to request.
  • Loading branch information
fnothaft committed Nov 29, 2016
1 parent ac2142a commit 11725a8
Showing 1 changed file with 37 additions and 48 deletions.
85 changes: 37 additions & 48 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,31 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
.asInstanceOf[GenomicRDD[(Option[T], X), Z]]
}

/**
* Computes the partition size and final sequence dictionary for a join.
*
* @param optPartitions Optional user-requested number of partitions for the
* end of the shuffle.
* @param genomicRdd The genomic RDD we are joining against.
* @return Returns a tuple containing the (partition size, final sequence
* dictionary after the join).
*/
private[rdd] def joinPartitionSizeAndSequences[X, Y <: GenomicRDD[X, Y]](
optPartitions: Option[Int],
genomicRdd: GenomicRDD[X, Y]): (Long, SequenceDictionary) = {

// what sequences do we wind up with at the end?
val finalSequences = sequences ++ genomicRdd.sequences

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)

(finalSequences.records.map(_.length).sum / partitions,
finalSequences)
}

/**
* Performs a sort-merge inner join between this RDD and another RDD.
*
Expand All @@ -454,18 +479,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = {

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)

// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(T, X)](
InnerShuffleRegionJoin[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
Expand Down Expand Up @@ -493,18 +512,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = {

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)

// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(Option[T], X)](
RightOuterShuffleRegionJoin[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
Expand Down Expand Up @@ -535,18 +548,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Option[X]), Z] = {

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)

// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(T, Option[X])](
LeftOuterShuffleRegionJoin[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
Expand Down Expand Up @@ -576,18 +583,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Option[X]), Z] = {

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)

// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(Option[T], Option[X])](
FullOuterShuffleRegionJoin[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
Expand Down Expand Up @@ -618,18 +619,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Iterable[X]), Z] = {

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)

// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(T, Iterable[X])](
InnerShuffleRegionJoinAndGroupByLeft[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
Expand Down Expand Up @@ -662,18 +657,12 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Iterable[X]), Z] = {

// did the user provide a set partition count?
// if no, take the max partition count from our rdds
val partitions = optPartitions.getOrElse(Seq(rdd.partitions.length,
genomicRdd.rdd.partitions.length).max)

// what sequences do we wind up with at the end?
val endSequences = sequences ++ genomicRdd.sequences
val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)

// key the RDDs and join
GenericGenomicRDD[(Option[T], Iterable[X])](
RightOuterShuffleRegionJoinAndGroupByLeft[T, X](endSequences,
partitions,
partitionSize,
rdd.context).partitionAndJoin(flattenRddByRegions(),
genomicRdd.flattenRddByRegions()),
endSequences,
Expand Down

0 comments on commit 11725a8

Please sign in to comment.