Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package com.twitter.scalding.reducer_estimation

import scala.collection.JavaConverters._
import cascading.flow.FlowStep
import cascading.tap.{ Tap, CompositeTap }
import cascading.tap.hadoop.Hfs
import org.apache.hadoop.mapred.JobConf
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -193,15 +189,11 @@ trait RuntimeReducerEstimator extends HistoryReducerEstimator {
}

object MedianEstimationScheme extends RuntimeEstimationScheme {
  // Runtime-based estimation scheme that uses the median of observed runtimes
  // for both the job-level and task-level estimates; the median is robust to
  // outlier runs. `median` is provided by the enclosing `reducer_estimation`
  // package object, so no import is required.
  //
  // NOTE(review): the diff view showed both the removed `mean(times)` line and
  // the added `median(times)` line for estimateJobTime — keeping both would be
  // a duplicate-definition compile error. This is the resolved (merged) form,
  // with the stale root-package import also dropped per the PR.
  def estimateJobTime(times: Seq[Double]) = median(times)
  def estimateTaskTime(times: Seq[Double]) = median(times)
}

object MeanEstimationScheme extends RuntimeEstimationScheme {
  // Runtime-based estimation scheme that uses the arithmetic mean of observed
  // runtimes for both the job-level and task-level estimates. `mean` is
  // provided by the enclosing `reducer_estimation` package object, so no
  // import is required (the old `import reducer_estimation.{ mean, median }`
  // referenced the removed root-level package and also pulled in `median`,
  // which this object never uses).
  def estimateJobTime(times: Seq[Double]) = mean(times)
  def estimateTaskTime(times: Seq[Double]) = mean(times)
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
package com.twitter.scalding
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I wonder why the compile didn't fail before. Ahh, because it was a root package. :/ Good catch.


package object reducer_estimation {
def median(xs: Seq[Double]): Option[Double] = xs.sorted.lift(xs.length / 2)
def mean(xs: Seq[Double]): Option[Double] = if (xs.isEmpty) None else Some(xs.sum / xs.length)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.twitter.scalding.reducer_estimation.RuntimeReducerEstimator.{ Runtime
import com.twitter.scalding.platform.{ HadoopPlatformJobTest, HadoopSharedPlatformTest }
import org.scalatest.{ Matchers, WordSpec }
import scala.collection.JavaConverters._
import scala.util.{ Failure, Success, Try }
import scala.util.{ Success, Try }

object HistoryService1 extends HistoryServiceWithData {
import HistoryServiceWithData._
Expand Down Expand Up @@ -35,7 +35,6 @@ class DummyEstimator extends ReducerEstimator {
}

class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopSharedPlatformTest {
import HipJob._

"Single-step job with runtime-based reducer estimator" should {
"set reducers correctly with median estimation scheme" in {
Expand Down Expand Up @@ -63,7 +62,7 @@ class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopShar
// (1200 / inputSize) ms per byte
// (1800 / inputSize) ms per byte
//
// The mean of these is (1500 / inputSize) ms per byte,
// The median of these is (1500 / inputSize) ms per byte,
// so we anticipate that processing (inputSize bytes)
// will take 1500 ms total.
// To do this in 25 ms, we need 60 reducers.
Expand Down Expand Up @@ -158,11 +157,11 @@ class RuntimeReducerEstimatorTest extends WordSpec with Matchers with HadoopShar
//
// We don't scale by input size.
//
// The mean of these is 3600 ms, so we anticipate
// that the job will take 3600 ms total.
// The median of these is 3000 ms, so we anticipate
// that the job will take 3000 ms total.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scala> com.twitter.scalding.reducer_estimation.mean(Seq(1000.0D, 200.0D, 2400.0D).map(_ * 3))
res0: Option[Double] = Some(3600.0)

scala> com.twitter.scalding.reducer_estimation.median(Seq(1000.0D, 200.0D, 2400.0D).map(_ * 3))
res1: Option[Double] = Some(3000.0)

//
// To do this in 25 ms, we need 144 reducers.
assert(conf.getNumReduceTasks == 144)
// To do this in 25 ms, we need 120 reducers.
assert(conf.getNumReduceTasks == 120)
}
.run
}
Expand Down