From 019659ef26f6d898c051b85a5841cc26fc97e1ed Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Thu, 23 Feb 2017 20:53:24 -0800 Subject: [PATCH] Added instrumentation timers around joins. --- .../adam/instrumentation/Timers.scala | 12 +++++++++++ .../org/bdgenomics/adam/rdd/GenomicRDD.scala | 21 ++++++++++--------- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala b/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala index b613dc7f7c..9d99bf3c40 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala @@ -87,4 +87,16 @@ object Timers extends Metrics { val SortingRightSide = timer("Sorting right side of join") val GrowingTrees = timer("Growing forest of trees") val RunningMapSideJoin = timer("Running map-side join") + + // org.bdgenomics.adam.rdd.GenomicRDD + val InnerBroadcastJoin = timer("Inner broadcast region join") + val RightOuterBroadcastJoin = timer("Right outer broadcast region join") + val BroadcastJoinAndGroupByRight = timer("Broadcast join followed by group-by on right") + val RightOuterBroadcastJoinAndGroupByRight = timer("Right outer broadcast join followed by group-by on right") + val InnerShuffleJoin = timer("Inner shuffle region join") + val RightOuterShuffleJoin = timer("Right outer shuffle region join") + val LeftOuterShuffleJoin = timer("Left outer shuffle region join") + val FullOuterShuffleJoin = timer("Full outer shuffle region join") + val ShuffleJoinAndGroupByLeft = timer("Shuffle join followed by group-by on left") + val RightOuterShuffleJoinAndGroupByLeft = timer("Right outer shuffle join followed by group-by on left") } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala index 3f85bad899..f166a09c8c 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala @@ -25,6 +25,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.spark.SparkFiles import org.apache.spark.api.java.JavaRDD import org.apache.spark.rdd.RDD +import org.bdgenomics.adam.instrumentation.Timers._ import org.bdgenomics.adam.models.{ RecordGroupDictionary, ReferenceRegion, @@ -403,7 +404,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] { */ def broadcastRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, X), Z]]( genomicRdd: GenomicRDD[X, Y])( - implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = { + implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = InnerBroadcastJoin.time { // key the RDDs and join GenericGenomicRDD[(T, X)](InnerTreeRegionJoin[T, X]().broadcastAndJoin( @@ -432,7 +433,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] { */ def rightOuterBroadcastRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], X), Z]]( genomicRdd: GenomicRDD[X, Y])( - implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = { + implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = RightOuterBroadcastJoin.time { // key the RDDs and join GenericGenomicRDD[(Option[T], X)](RightOuterTreeRegionJoin[T, X]().broadcastAndJoin( @@ -496,7 +497,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] { * overlapped in the genomic coordinate space. */ def broadcastRegionJoinAndGroupByRight[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Iterable[T], X), Z]](genomicRdd: GenomicRDD[X, Y])( - implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Iterable[T], X), Z] = { + implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Iterable[T], X), Z] = BroadcastJoinAndGroupByRight.time { // key the RDDs and join GenericGenomicRDD[(Iterable[T], X)](InnerTreeRegionJoinAndGroupByRight[T, X]().broadcastAndJoin( @@ -524,7 +525,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] { * right RDD that did not overlap a key in the left RDD. */ def rightOuterBroadcastRegionJoinAndGroupByRight[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Iterable[T], X), Z]](genomicRdd: GenomicRDD[X, Y])( - implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Iterable[T], X), Z] = { + implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Iterable[T], X), Z] = RightOuterBroadcastJoinAndGroupByRight.time { // key the RDDs and join GenericGenomicRDD[(Iterable[T], X)](RightOuterTreeRegionJoinAndGroupByRight[T, X]().broadcastAndJoin( @@ -554,7 +555,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] { def shuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, X), Z]]( genomicRdd: GenomicRDD[X, Y], optPartitions: Option[Int] = None)( - implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = { + implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, X), Z] = InnerShuffleJoin.time { val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd) @@ -589,7 +590,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] { def rightOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], X), Z]]( genomicRdd: GenomicRDD[X, Y], optPartitions: Option[Int] = None)( - implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = { + implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], X), Z] = RightOuterShuffleJoin.time { val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd) @@ -627,7 +628,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] { def leftOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, Option[X]), Z]]( genomicRdd: GenomicRDD[X, Y], optPartitions: Option[Int] = None)( - implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Option[X]), Z] = { + implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Option[X]), Z] = LeftOuterShuffleJoin.time { val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd) @@ -664,7 +665,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] { def fullOuterShuffleRegionJoin[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], Option[X]), Z]]( genomicRdd: GenomicRDD[X, Y], optPartitions: Option[Int] = None)( - implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Option[X]), Z] = { + implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Option[X]), Z] = FullOuterShuffleJoin.time { val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd) @@ -702,7 +703,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] { def shuffleRegionJoinAndGroupByLeft[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(T, Iterable[X]), Z]]( genomicRdd: GenomicRDD[X, Y], optPartitions: Option[Int] = None)( - implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Iterable[X]), Z] = { + implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(T, Iterable[X]), Z] = ShuffleJoinAndGroupByLeft.time { val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd) @@ -742,7 +743,7 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] { def rightOuterShuffleRegionJoinAndGroupByLeft[X, Y <: GenomicRDD[X, Y], Z <: GenomicRDD[(Option[T], Iterable[X]), Z]]( genomicRdd: GenomicRDD[X, Y], optPartitions: Option[Int] = None)( - implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Iterable[X]), Z] = { + implicit tTag: ClassTag[T], xTag: ClassTag[X]): GenomicRDD[(Option[T], Iterable[X]), Z] = RightOuterShuffleJoinAndGroupByLeft.time { val (partitionSize, endSequences) = joinPartitionSizeAndSequences(optPartitions, genomicRdd)