Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added instrumentation timers around joins. #1401

Merged
merged 1 commit into from
Feb 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,16 @@ object Timers extends Metrics {
val SortingRightSide = timer("Sorting right side of join")
val GrowingTrees = timer("Growing forest of trees")
val RunningMapSideJoin = timer("Running map-side join")

// org.bdgenomics.adam.rdd.GenomicRDD
val InnerBroadcastJoin = timer("Inner broadcast region join")
val RightOuterBroadcastJoin = timer("Right outer broadcast region join")
val BroadcastJoinAndGroupByRight = timer("Broadcast join followed by group-by on right")
val RightOuterBroadcastJoinAndGroupByRight = timer("Right outer broadcast join followed by group-by on right")
val InnerShuffleJoin = timer("Inner shuffle region join")
val RightOuterShuffleJoin = timer("Right outer shuffle region join")
val LeftOuterShuffleJoin = timer("Left outer shuffle region join")
val FullOuterShuffleJoin = timer("Full outer shuffle region join")
val ShuffleJoinAndGroupByLeft = timer("Shuffle join followed by group-by on left")
val RightOuterShuffleJoinAndGroupByLeft = timer("Right outer shuffle join followed by group-by on left")
}
21 changes: 11 additions & 10 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.SparkFiles
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models.{
RecordGroupDictionary,
ReferenceRegion,
Expand Down Expand Up @@ -403,7 +404,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
*/
def broadcastRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, X), Z]](
genomicRdd: GenomicRDD[X, Y])(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = InnerBroadcastJoin.time {

// key the RDDs and join
GenericGenomicRDD[(T, X)](InnerTreeRegionJoin[T, X]().broadcastAndJoin(
Expand Down Expand Up @@ -432,7 +433,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
*/
def rightOuterBroadcastRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], X), Z]](
genomicRdd: GenomicRDD[X, Y])(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = RightOuterBroadcastJoin.time {

// key the RDDs and join
GenericGenomicRDD[(Option[T], X)](RightOuterTreeRegionJoin[T, X]().broadcastAndJoin(
Expand Down Expand Up @@ -496,7 +497,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
* overlapped in the genomic coordinate space.
*/
def broadcastRegionJoinAndGroupByRight[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Iterable[T], X), Z]](genomicRdd: GenomicRDD[X, Y])(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Iterable[T], X), Z] = {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Iterable[T], X), Z] = BroadcastJoinAndGroupByRight.time {

// key the RDDs and join
GenericGenomicRDD[(Iterable[T], X)](InnerTreeRegionJoinAndGroupByRight[T, X]().broadcastAndJoin(
Expand Down Expand Up @@ -524,7 +525,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
* right RDD that did not overlap a key in the left RDD.
*/
def rightOuterBroadcastRegionJoinAndGroupByRight[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Iterable[T], X), Z]](genomicRdd: GenomicRDD[X, Y])(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Iterable[T], X), Z] = {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Iterable[T], X), Z] = RightOuterBroadcastJoinAndGroupByRight.time {

// key the RDDs and join
GenericGenomicRDD[(Iterable[T], X)](RightOuterTreeRegionJoinAndGroupByRight[T, X]().broadcastAndJoin(
Expand Down Expand Up @@ -554,7 +555,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
def shuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, X), Z]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = InnerShuffleJoin.time {

val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions,
genomicRdd)
Expand Down Expand Up @@ -589,7 +590,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
def rightOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], X), Z]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = RightOuterShuffleJoin.time {

val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions,
genomicRdd)
Expand Down Expand Up @@ -627,7 +628,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
def leftOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, Option[X]), Z]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Option[X]), Z] = {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Option[X]), Z] = LeftOuterShuffleJoin.time {

val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions,
genomicRdd)
Expand Down Expand Up @@ -664,7 +665,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
def fullOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], Option[X]), Z]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Option[X]), Z] = {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Option[X]), Z] = FullOuterShuffleJoin.time {

val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions,
genomicRdd)
Expand Down Expand Up @@ -702,7 +703,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
def shuffleRegionJoinAndGroupByLeft[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, Iterable[X]), Z]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Iterable[X]), Z] = {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Iterable[X]), Z] = ShuffleJoinAndGroupByLeft.time {

val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions,
genomicRdd)
Expand Down Expand Up @@ -742,7 +743,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] {
def rightOuterShuffleRegionJoinAndGroupByLeft[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], Iterable[X]), Z]](
genomicRdd: GenomicRDD[X, Y],
optPartitions: Option[Int] = None)(
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Iterable[X]), Z] = {
implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Iterable[X]), Z] = RightOuterShuffleJoinAndGroupByLeft.time {

val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions,
genomicRdd)
Expand Down